# Using Linux foundation Delta Lake in Synapse Spark
In this notebook, how to read the delta table, how to write to delta table and timetravel is demonstrated

In [None]:
# Set the strorage path info
account_name = '<storage_account_name>' # fill in your primary storage account name
container_name = '<container_name>' # fill in your container name
relative_delta_path='<folder_name>' # fill in your relative delta lake folder path

adls_path = 'abfss://%s@%s.dfs.core.windows.net/' % (container_name, account_name)
print('Primary storage account path: ' + adls_path)

# Delta Lake relative path
deltatablepath = adls_path + relative_delta_path + '/'
print('Delta Lake path: ' + deltatablepath)

### Read data in delta format



In [26]:
df_hr = spark.read.format("delta").load(deltatablepath)
df_hr.show(10)

+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+
|       Employee_Name|     EmpID|MarriedID|DeptID|PayRate|PositionID|            Position|State|       DOB|Sex|DateofHire|   Department|Age|
+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+
|          Brown, Mia|1103024456|     true|     1|   28.5|         1|        Accountant I|   MA|11/24/1987|  F|2008-10-27|Admin Offices| 33|
|LaRotonda, William  |1106026572|    false|     1|     23|         1|        Accountant I|   MA| 4/26/1984| M |2014-01-06|Admin Offices| 36|
|    Steans, Tyrone  |1302053333|    false|     1|     29|         1|        Accountant I|   MA|  9/1/1986| M |2014-09-29|Admin Offices| 34|
|     Howard, Estelle|1211050782|     true|     1|   21.5|         2|Administrative As...|   MA| 9/16/1985|  F|2015-02-16|Admin Offices| 35|
|         Sin

### Add New Column YearsOfService



In [27]:
import pyspark.sql.functions as f

df_hr_service = df_hr.withColumn('YearsOfService',2020-f.year(f.to_timestamp('DateofHire', 'MM/dd/yyyy')))
df_hr_service.show(5)

+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+--------------+
|       Employee_Name|     EmpID|MarriedID|DeptID|PayRate|PositionID|            Position|State|       DOB|Sex|DateofHire|   Department|Age|YearsOfService|
+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+--------------+
|          Brown, Mia|1103024456|     true|     1|   28.5|         1|        Accountant I|   MA|11/24/1987|  F|2008-10-27|Admin Offices| 33|            12|
|LaRotonda, William  |1106026572|    false|     1|     23|         1|        Accountant I|   MA| 4/26/1984| M |2014-01-06|Admin Offices| 36|             6|
|    Steans, Tyrone  |1302053333|    false|     1|     29|         1|        Accountant I|   MA|  9/1/1986| M |2014-09-29|Admin Offices| 34|             6|
|     Howard, Estelle|1211050782|     true|     1|   21.5|      

### Overwrite the entire delta table


In [29]:
df_hr_service.write.mode("overwrite").format("delta").option("mergeSchema","true").save(deltatablepath)

### Validate delta table is updated with new column


In [30]:
hrdataframe = spark.read.format("delta").load(deltatablepath)
hrdataframe.show(10)

+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+--------------+
|       Employee_Name|     EmpID|MarriedID|DeptID|PayRate|PositionID|            Position|State|       DOB|Sex|DateofHire|   Department|Age|YearsOfService|
+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+--------------+
|          Brown, Mia|1103024456|     true|     1|   28.5|         1|        Accountant I|   MA|11/24/1987|  F|2008-10-27|Admin Offices| 33|            12|
|LaRotonda, William  |1106026572|    false|     1|     23|         1|        Accountant I|   MA| 4/26/1984| M |2014-01-06|Admin Offices| 36|             6|
|    Steans, Tyrone  |1302053333|    false|     1|     29|         1|        Accountant I|   MA|  9/1/1986| M |2014-09-29|Admin Offices| 34|             6|
|     Howard, Estelle|1211050782|     true|     1|   21.5|      

### Check version with timetravel

we can see here yearsOfService column is not present in original delta table

In [32]:
hrdataoriginal = (spark
                    .read
                    .format("delta")
                    .option("versionAsOf",0)
                    .load(deltatablepath)
                    )
hrdataoriginal.show(10)

+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+
|       Employee_Name|     EmpID|MarriedID|DeptID|PayRate|PositionID|            Position|State|       DOB|Sex|DateofHire|   Department|Age|
+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+
|          Brown, Mia|1103024456|     true|     1|   28.5|         1|        Accountant I|   MA|11/24/1987|  F|2008-10-27|Admin Offices| 33|
|LaRotonda, William  |1106026572|    false|     1|     23|         1|        Accountant I|   MA| 4/26/1984| M |2014-01-06|Admin Offices| 36|
|    Steans, Tyrone  |1302053333|    false|     1|     29|         1|        Accountant I|   MA|  9/1/1986| M |2014-09-29|Admin Offices| 34|
|     Howard, Estelle|1211050782|     true|     1|   21.5|         2|Administrative As...|   MA| 9/16/1985|  F|2015-02-16|Admin Offices| 35|
|         Sin

### Update records that match the given condition 
Lets update here PayRate for employees whose payroll is less than 20 to make it lowest payrate value above 20.


In [35]:
from pyspark.sql.functions import *
from delta.tables import *

deltaTable = DeltaTable.forPath(spark,deltatablepath)

minPayRateAbove20 = hrdataframe.filter("PayRate>20").agg({"PayRate":"min"}).collect()[0]["min(PayRate)"]

print(minPayRateAbove20)

#Number of records that will be updated
deltaTable.toDF().filter(col("PayRate")<minPayRateAbove20).count()

21
178

In [36]:
# Update PayRate to 21 for employees whose PayRate is below 20 
deltaTable.update(
    condition = (col("PayRate")<minPayRateAbove20),
    set = {"PayRate":minPayRateAbove20}
)
deltaTable.toDF().filter(col("PayRate")<minPayRateAbove20).show()

+-------------+-----+---------+------+-------+----------+--------+-----+---+---+----------+----------+---+--------------+
|Employee_Name|EmpID|MarriedID|DeptID|PayRate|PositionID|Position|State|DOB|Sex|DateofHire|Department|Age|YearsOfService|
+-------------+-----+---------+------+-------+----------+--------+-----+---+---+----------+----------+---+--------------+
+-------------+-----+---------+------+-------+----------+--------+-----+---+---+----------+----------+---+--------------+

### Validate changes by filtering records on condition
Validate no employees have PayRate less than or equal to 20


In [37]:
deltaTableAfterUpdate = DeltaTable.forPath(spark,deltatablepath)
deltaTableAfterUpdate.toDF().filter(col("PayRate")<minPayRateAbove20).count()

0

## Audit data changes
or Check Version history


In [38]:
#get version history
deltaTable.history().show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      2|2020-08-12 07:53:51|  null|    null|   UPDATE|[predicate -> (Pa...|null|    null|     null|          1|          null|        false|[numRemovedFiles ...|
|      1|2020-08-12 07:47:14|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|          0|          null|        false|[numFiles -> 2, n...|
|      0|2020-08-12 07:44:49|  null|    null|    WRITE|[mode -> Append, ...|null|    null|     null|       null|          null|         true|[numFiles -> 2, n...|
+-------+-------------

In [39]:
deltaTable.history(1).show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      2|2020-08-12 07:53:51|  null|    null|   UPDATE|[predicate -> (Pa...|null|    null|     null|          1|          null|        false|[numRemovedFiles ...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+

### Undo changes for DeltaTable by restoring previous version
Lets set the PayRate as it was in previous version 

In [40]:
#Get verison 0 details
hrdataversion0 = spark\
                        .read\
                        .format("delta")\
                        .option("versionAsOF",0)\
                        .load(deltatablepath)
print("HR Dataframe as of version 0: ")
hrdataversion0.show(10)

print("In version 0 count of employees who have PayRate less than or equal to 20 are:%d" % hrdataversion0.filter(col("PayRate")<minPayRateAbove20).count())

# Revert changes
hrdataversion0.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(deltatablepath)

#read data and check count of employees again
finalversion = spark.read.format("delta").load(deltatablepath)
print("In latest version count of employees who have PayRate less than or equal to 20 are: %d" % finalversion.filter(col("PayRate")<minPayRateAbove20).count())
finalversion.show(10)

HR Dataframe as of version 0: 
+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+
|       Employee_Name|     EmpID|MarriedID|DeptID|PayRate|PositionID|            Position|State|       DOB|Sex|DateofHire|   Department|Age|
+--------------------+----------+---------+------+-------+----------+--------------------+-----+----------+---+----------+-------------+---+
|          Brown, Mia|1103024456|     true|     1|   28.5|         1|        Accountant I|   MA|11/24/1987|  F|2008-10-27|Admin Offices| 33|
|LaRotonda, William  |1106026572|    false|     1|     23|         1|        Accountant I|   MA| 4/26/1984| M |2014-01-06|Admin Offices| 36|
|    Steans, Tyrone  |1302053333|    false|     1|     29|         1|        Accountant I|   MA|  9/1/1986| M |2014-09-29|Admin Offices| 34|
|     Howard, Estelle|1211050782|     true|     1|   21.5|         2|Administrative As...|   MA| 9/16/1985|  F|2015-02-16|A