### Leveraging Delta Lake with Synapse Analytics

This Python notebook showcases Delta Lake capabilities inside a Synapse Spark pool. As the link below highlights, Delta Lake can also be used with streaming data.

https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-delta-lake-overview?pivots=programming-language-python

Another great resource is the Delta Lake Python API reference:
https://docs.delta.io/latest/api/python/index.html


In [23]:
import random

# To use the pool's default storage, keep the relative path;
# otherwise provide a full location in the data lake.

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
# This assignment overrides the relative path above with an explicit ADLS Gen2 location.
delta_table_path = "abfss://delta-example@mmxsynanapsexadlsge2.dfs.core.windows.net/delta/delta-table-{0}".format(session_id)
delta_table_path

StatementMeta(mmsparkpool, 10, 23, Finished, Available)

'abfss://delta-example@mmxsynanapsexadlsge2.dfs.core.windows.net/delta/delta-table-899014'
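The path construction above can be wrapped in a small helper so the container and storage account are not repeated in a string literal. This is only a sketch: `build_delta_path` and its parameters are hypothetical names introduced for illustration, and the container and account values are the ones used in this notebook.

```python
import random


def build_delta_path(container, account, session_id, folder="delta"):
    """Return an abfss:// URI for a per-session Delta table folder.

    Hypothetical helper; the URI layout mirrors the path used in this notebook.
    """
    return (
        f"abfss://{container}@{account}.dfs.core.windows.net/"
        f"{folder}/delta-table-{session_id}"
    )


# A random suffix keeps each notebook session writing to its own folder.
session_id = random.randint(0, 1000000)
delta_table_path = build_delta_path("delta-example", "mmxsynanapsexadlsge2", session_id)
print(delta_table_path)
```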

In [24]:
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)

StatementMeta(mmsparkpool, 10, 24, Finished, Available)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

In [25]:
df = spark.read.format("delta").load(delta_table_path)
df.show()

StatementMeta(mmsparkpool, 10, 25, Finished, Available)

+---+
| id|
+---+
|  1|
|  2|
|  4|
|  3|
|  0|
+---+

### Update table data
Delta Lake supports several operations to modify tables using standard DataFrame APIs; this is one of the big enhancements that the Delta format added.

In [26]:
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()

StatementMeta(mmsparkpool, 10, 26, Finished, Available)

+---+
| id|
+---+
|  7|
|  5|
|  6|
|  8|
|  9|
+---+

### Save as catalog tables
Delta Lake can write to managed or external catalog tables.

In [6]:
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()

StatementMeta(mmsparkpool, 10, 6, Finished, Available)

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| default| manageddeltatable|      false|
| default|externaldeltatable|      false|
+--------+------------------+-----------+

With this code, you created a new table in the catalog from an existing DataFrame, referred to as a managed table. Then you defined a new table in the catalog that uses an existing location, referred to as an external table. In the output you can see that both tables are listed in the catalog, no matter how they were created.
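Re-running the cell above will typically fail once the tables exist, because `saveAsTable` defaults to the error-if-exists save mode and the `CREATE TABLE` statement has no `IF NOT EXISTS` clause. One possible way to make the external table definition repeatable is sketched below; `external_table_ddl` is a hypothetical helper name, not part of Spark or Delta Lake.

```python
def external_table_ddl(table_name, location):
    """Build a repeatable CREATE TABLE statement for an external Delta table.

    Hypothetical helper: it only formats the SQL string; nothing is executed here.
    """
    return "CREATE TABLE IF NOT EXISTS {0} USING DELTA LOCATION '{1}'".format(
        table_name, location
    )


ddl = external_table_ddl("ExternalDeltaTable", "/delta/delta-table-899014")
print(ddl)

# In the notebook this would be executed with the existing session and path:
# spark.sql(external_table_ddl("ExternalDeltaTable", delta_table_path))
```

For the managed table, passing an explicit mode such as `data.write.format("delta").mode("overwrite").saveAsTable("ManagedDeltaTable")` is one option, assuming replacing the table contents is acceptable.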

Now let's look at the extended properties of both of these tables.

In [17]:
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)

StatementMeta(mmsparkpool, 10, 17, Finished, Available)

+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                            |comment|
+----------------------------+-------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |bigint                                                                                                                               |null   |
|                            |                                                                                                                                     |       |
|# Detailed Table Information|                                                                                                         

In [8]:
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)

StatementMeta(mmsparkpool, 10, 8, Finished, Available)

+----------------------------+---------------------------------------------------------------------+-------+
|col_name                    |data_type                                                            |comment|
+----------------------------+---------------------------------------------------------------------+-------+
|id                          |bigint                                                               |null   |
|                            |                                                                     |       |
|# Detailed Table Information|                                                                     |       |
|Database                    |default                                                              |       |
|Table                       |externaldeltatable                                                   |       |
|Owner                       |trusted-service-user                                                 |       |
|Created Time      

### Conditional update without overwrite

Delta Lake provides programmatic APIs to conditionally update, delete, and merge (that is, upsert) data into tables.

In [9]:
from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()

StatementMeta(mmsparkpool, 10, 9, Finished, Available)

+---+
| id|
+---+
|  9|
|106|
|108|
|  5|
|  7|
+---+
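To check which rows the condition touches, the same predicate and expression can be applied to a plain Python list. This is only a model of the semantics on the ids 5 to 9 that the table held before the update; it does not call the Delta Lake API.

```python
# Ids in the table before the update (written by the overwrite cell).
ids = list(range(5, 10))

# Same rule as the update: rows where id % 2 == 0 become id + 100,
# all other rows are left unchanged.
updated = [i + 100 if i % 2 == 0 else i for i in ids]
print(updated)  # [5, 106, 7, 108, 9]
```

The result holds the same values as the table output above; Spark simply returns them in a different order.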

### Delete rows
Let's use Delta Lake to delete the rows that match a condition.

In [10]:
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()

StatementMeta(mmsparkpool, 10, 10, Finished, Available)

+---+
| id|
+---+
|  9|
|  5|
|  7|
+---+

### Upsert (merge) new data

The cell below merges 20 new rows (ids 0 to 19) into the table. Existing rows that match a new row on id are assigned the value -1 in the update (whenMatchedUpdate) code path. New rows that have no match in the table are added through the insert (whenNotMatchedInsert) code path.

In [11]:
# New rows to merge; the alias is applied once, inside the merge call.
new_data = spark.range(0,20)

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit(-1)})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)

StatementMeta(mmsparkpool, 10, 11, Finished, Available)

+---+
| id|
+---+
| 15|
|  1|
| 13|
|  2|
| 10|
|  4|
|  0|
|  3|
| 11|
| -1|
| 17|
| 12|
| 14|
| 18|
|  6|
| 19|
|  8|
| -1|
| 16|
| -1|
+---+
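The row counts in this output can be checked with a plain-Python model of the merge. It is a sketch of the matched and not-matched rules only, assuming the table held ids 5, 7, and 9 before the merge (the result of the delete step); it is not how Delta Lake executes a merge.

```python
old_ids = [5, 7, 9]            # rows in the table before the merge
new_ids = list(range(0, 20))   # rows in the source DataFrame

old_set = set(old_ids)
new_set = set(new_ids)

# Matched path: every existing row that has a source row with the same id is set to -1.
merged = [-1 if i in new_set else i for i in old_ids]

# Not-matched path: every source row without an existing row is inserted.
merged += [i for i in new_ids if i not in old_set]

print(len(merged))        # 20
print(merged.count(-1))   # 3
```

Three existing rows become -1 and 17 new rows are inserted, which accounts for the 20 rows shown above.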

### History
Delta Lake lets you look into the history of a table, that is, the changes that were made to the underlying Delta table. Let's inspect the history.

In [15]:
# show() prints the table itself and returns None, so it is not wrapped in display().
delta_table.history().show(20, 1000, False)

StatementMeta(mmsparkpool, 10, 15, Finished, Available)

+-------+-------------------+------+--------+---------+-------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|          timestamp|userId|userName|operation|                                                operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|                                                                                                                                                                                              operationMetrics|
+-------+-------------------+------+--------+---------+-------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+--------------------

### Read older versions of data using Time Travel

It's possible to query previous snapshots of your Delta Lake table by using a feature called Time Travel. If you want to access the data that you overwrote, you can query a snapshot of the table before you overwrote the first set of data using the versionAsOf option.

Once you run the cell below, you should see the first set of data from before you overwrote it. Time Travel is an extremely powerful feature that takes advantage of the Delta Lake transaction log to access data that is no longer in the table. Removing the versionAsOf option returns the latest version of the table again, and specifying version 1 returns the data written by the overwrite.

In [18]:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()

StatementMeta(mmsparkpool, 10, 18, Finished, Available)

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  0|
+---+
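Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option. The helper below is a minimal sketch that wraps both; `read_delta_snapshot` is a hypothetical name, and the function only assumes it is given a Spark session like the `spark` object in this notebook.

```python
def read_delta_snapshot(spark, path, version=None, timestamp=None):
    """Load a Delta table at a given version or timestamp (latest if neither is set).

    Hypothetical helper: `spark` is expected to be an active SparkSession.
    """
    if version is not None and timestamp is not None:
        raise ValueError("Pass either version or timestamp, not both")

    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)


# Usage in the notebook (the timestamp is taken from the table history):
# read_delta_snapshot(spark, delta_table_path, version=0).show()
# read_delta_snapshot(spark, delta_table_path, timestamp="2021-09-14 16:30:20").show()
```

Rejecting the combination of both arguments keeps the call unambiguous about which snapshot is requested.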

In [19]:
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()

StatementMeta(mmsparkpool, 10, 19, Finished, Available)

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      4|2021-09-14 16:32:56|  null|    null|    MERGE|[predicate -> (ol...|null|    null|     null|          3|          null|        false|[numTargetRowsCop...|
|      3|2021-09-14 16:31:20|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|          null|        false|[numRemovedFiles ...|
|      2|2021-09-14 16:30:20|  null|    null|   UPDATE|[predicate -> ((i...|null|    null|     null|          1|          null|        false|[numRemovedFiles ...|
|      1|2021-09-14 16

### Vacuum Data

In [20]:
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()

StatementMeta(mmsparkpool, 10, 20, Finished, Available)

+--------------------+
|                path|
+--------------------+
|abfss://delta-exa...|
+--------------------+
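`VACUUM` removes data files that the table no longer references and that are older than the retention threshold (7 days by default), so it also limits how far back Time Travel can reach. The statement accepts optional `RETAIN ... HOURS` and `DRY RUN` clauses; the sketch below only builds the SQL string, and `vacuum_sql` is a hypothetical helper name.

```python
def vacuum_sql(path, retain_hours=None, dry_run=False):
    """Build a VACUUM statement for a path-based Delta table.

    Hypothetical helper: it formats the SQL only; nothing is executed here.
    """
    stmt = "VACUUM delta.`{0}`".format(path)
    if retain_hours is not None:
        stmt += " RETAIN {0} HOURS".format(retain_hours)
    if dry_run:
        # DRY RUN lists the files that would be deleted without removing them.
        stmt += " DRY RUN"
    return stmt


print(vacuum_sql("/delta/delta-table-899014", retain_hours=168, dry_run=True))

# In the notebook this would be executed with the existing session and path:
# spark.sql(vacuum_sql(delta_table_path, retain_hours=168, dry_run=True)).show(truncate=False)
```

Running with `DRY RUN` first is a cautious way to review what would be removed before any files are deleted.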