# Hitchhiker's Guide to Delta Lake (Python)

This tutorial is adapted, for clarity, from the original [Delta Lake quick start](https://docs.delta.io/latest/quick-start.html). This notebook helps you quickly explore the main features of [Delta Lake](https://github.com/delta-io/delta). It provides code snippets that show how to read from and write to Delta Lake tables from interactive, batch, and streaming queries.

Here's what we will cover:
* Create a table
* Understand metadata
* Read data
* Update (overwrite) table data
* Save as catalog tables
* Conditional update without overwrite
* Inspect table history
* Read older versions of data using Time Travel
* Write a stream of data to a table
* Read a stream of changes from a table
* Convert Parquet to Delta
* SQL support

## Configuration
Modify the table path below as appropriate for your environment.

In [3]:
import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path

'/delta/delta-table-587152'

## Create a table
To create a Delta Lake table, write a DataFrame out in the **delta** format. You can use existing Spark SQL code and change the format from parquet, csv, json, and so on, to delta.

This operation creates a new Delta Lake table using the schema inferred from your DataFrame. For the full set of options available when you create a new Delta Lake table, see Create a table and Write to a table (subsequent cells in this notebook).

In [4]:
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

## Understanding metadata

In Delta Lake, metadata is stored next to the data, in a `_delta_log` directory under the table path. A useful side effect is that you can peek into the metadata using regular Spark APIs.

In [5]:
[log_line.value for log_line in spark.read.text(delta_table_path + "/_delta_log/").collect()]

['{"commitInfo":{"timestamp":1587774874100,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}', '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}', '{"metaData":{"id":"3c38c9c7-0e45-4cc1-86c2-ce61b24cd80a","format":{"provider":"parquet","options":{}},"schemaString":"{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\",\\"nullable\\":true,\\"metadata\\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587774867458}}', '{"add":{"path":"part-00000-fd3075c0-15dd-4aca-a68f-039171f1145e-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587774873000,"dataChange":true}}', '{"add":{"path":"part-00001-104b0f1c-ec0f-4b45-af2d-8a2a24851881-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587774872000,"dataChange":true}}', '{"add":{"path":"part-00003-9cf556cb-0588-4a32-b100-d7f3fe71099d-c000.snappy.parquet","partitionValues":{},"size":429,"modif
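Each line in the log is a JSON object, so it can be parsed for easier inspection. The following is a minimal sketch in plain Python (no Spark needed) that uses three of the complete log lines from the output above. The helper name `action_type` is an illustrative assumption and not part of any Delta Lake API.

```python
import json
from collections import Counter

# Three complete log lines copied from the output above (the rest are omitted).
log_lines = [
    '{"commitInfo":{"timestamp":1587774874100,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}',
    '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}',
    '{"add":{"path":"part-00000-fd3075c0-15dd-4aca-a68f-039171f1145e-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587774873000,"dataChange":true}}',
]


def action_type(line):
    """Return the single top-level key of a log line, which names the action."""
    return next(iter(json.loads(line)))


# Count how many actions of each kind appear in the sample.
action_counts = Counter(action_type(line) for line in log_lines)

# Collect the data files that the "add" actions refer to.
added_files = [
    json.loads(line)["add"]["path"]
    for line in log_lines
    if action_type(line) == "add"
]

print(action_counts)
print(added_files)
```

In the notebook, the same parsing could be applied to the list produced by the cell above instead of the hard-coded sample.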

## Read data

You read data in your Delta Lake table by specifying the path to the files.

In [6]:
df = spark.read.format("delta").load(delta_table_path)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  4|
|  3|
|  2|
+---+

## Update table data

Delta Lake supports several operations to modify tables using standard DataFrame APIs. This example runs a batch job to overwrite the data in the table.


In [7]:
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()

+---+
| id|
+---+
|  6|
|  8|
|  5|
|  7|
|  9|
+---+

If you inspect the metadata now, you will notice that the original data appears to have been overwritten. The original files are not physically deleted, though: the overwrite adds entries to Delta's transaction log that mark them as removed, which gives the "illusion" that the original data was deleted. We can verify this by re-inspecting the metadata. You will see several entries that remove the references to the original data files.

In [8]:
[log_line.value for log_line in spark.read.text(delta_table_path + "/_delta_log/").collect()]

['{"commitInfo":{"timestamp":1587774905262,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isBlindAppend":false}}', '{"add":{"path":"part-00000-4c6812ff-adf7-4c2f-b75d-251237d65a5e-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587774902000,"dataChange":true}}', '{"add":{"path":"part-00001-12acbec0-9aa2-4fd0-884f-be6dff640f1a-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587774902000,"dataChange":true}}', '{"add":{"path":"part-00003-032e011b-2041-4685-a780-252ee8734bd6-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587774903000,"dataChange":true}}', '{"add":{"path":"part-00004-c9b0845d-8f9b-4800-ac55-dcf51f2d3cc3-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587774902000,"dataChange":true}}', '{"add":{"path":"part-00006-2ef15d13-ff27-4044-8f79-adcbb3bc1aa7-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587
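The way `add` and `remove` entries combine into the current table contents can be modelled as a replay of the log. The sketch below is a simplified plain-Python model, not Delta Lake's implementation: the file names are hypothetical, real actions carry additional fields, and the lines are assumed to be in commit order.

```python
import json


def active_files(log_lines):
    """Replay add/remove actions in order and return the files still referenced."""
    files = set()
    for line in log_lines:
        action = json.loads(line)
        if "add" in action:
            files.add(action["add"]["path"])
        elif "remove" in action:
            # The file stays in storage; only the reference to it is dropped.
            files.discard(action["remove"]["path"])
    return files


# Hypothetical log: an initial write of two files, then an overwrite.
sample_log = [
    '{"add":{"path":"old-0.parquet"}}',
    '{"add":{"path":"old-1.parquet"}}',
    '{"remove":{"path":"old-0.parquet"}}',
    '{"remove":{"path":"old-1.parquet"}}',
    '{"add":{"path":"new-0.parquet"}}',
]

current = active_files(sample_log)
print(current)
```

Replaying only the first two lines yields the original files, which is the idea behind reading older versions of a table.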

## Save as catalog tables

Delta Lake can write to managed or external catalog tables.

In [9]:
# Write data to a new managed catalog table.
data.write.format("delta").saveAsTable("ManagedDeltaTable")

In [10]:
# Define an external catalog table that points to the existing Delta Lake data in storage.
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))

DataFrame[]

In [11]:
# List the 2 new tables.
spark.sql("SHOW TABLES").show()

# Explore their properties.
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| default|externaldeltatable|      false|
| default| manageddeltatable|      false|
+--------+------------------+-----------+

+----------------------------+-------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                              |comment|
+----------------------------+-------------------------------------------------------------------------------------------------------+-------+
|id                          |bigint                                                                                                 |null   |
|                            |                                                                                                       |       |
|# Detailed Table Information|  

## Conditional update without overwrite

Delta Lake provides programmatic APIs to conditionally update, delete, and merge (upsert) data into tables. For more information on these operations, see [Table Deletes, Updates, and Merges](https://docs.delta.io/latest/delta-update.html).

In [12]:
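# Import only the names used in this notebook; wildcard imports from
# pyspark.sql.functions shadow Python built-ins such as sum, min, and max.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr, lit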

delta_table = DeltaTable.forPath(spark, delta_table_path)

In [13]:
# Update every even value by adding 100 to it
delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()

+---+
| id|
+---+
|106|
|108|
|  5|
|  7|
|  9|
+---+

In [14]:
# Delete every even value
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()

+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+

In [15]:
# Upsert (merge) new data
new_data = spark.range(0,20)

# Rows whose id already exists in the table are set to -1;
# rows whose id does not exist yet are inserted.
delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit(-1)})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)

+---+
| id|
+---+
| 18|
| 15|
| 19|
|  2|
|  1|
|  6|
|  8|
|  3|
| -1|
| 10|
| 13|
|  0|
| 16|
|  4|
| -1|
| 12|
| 11|
| 14|
| -1|
| 17|
+---+
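To see why the merged table contains three rows with `-1`, the merge can be modelled in plain Python. This is only an illustrative model of the matched/not-matched logic, assuming the table held ids 5, 7, and 9 before the merge (as shown after the delete) and the source held ids 0 to 19. The function name `merge_ids` is hypothetical, and row order in the real table is not guaranteed.

```python
def merge_ids(existing_ids, incoming_ids):
    """Model the merge: matched ids become -1, unmatched incoming ids are inserted."""
    existing = list(existing_ids)
    incoming = list(incoming_ids)
    incoming_set = set(incoming)
    existing_set = set(existing)

    # whenMatchedUpdate: an existing row with a matching incoming id is set to -1.
    result = [-1 if row_id in incoming_set else row_id for row_id in existing]

    # whenNotMatchedInsert: incoming ids without a match are appended.
    result += [row_id for row_id in incoming if row_id not in existing_set]
    return result


merged = merge_ids([5, 7, 9], range(0, 20))
print(sorted(merged))
```

The three matched ids (5, 7, 9) turn into `-1`, and the remaining 17 ids from the source are inserted, which gives the 20 rows shown above.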

## History
One of Delta's most powerful features is the ability to look into a table's history, that is, the changes that were made to the underlying Delta table. The cell below shows how simple it is to inspect the history.

In [16]:
delta_table.history().show(20, 1000, False)

+-------+-------------------+------+--------+---------+-------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation|                                                operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+-------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+
|      4|2020-04-25 00:36:27|  null|    null|    MERGE|                       [predicate -> (oldData.`id` = newData.`id`)]|null|    null|     null|          3|          null|        false|
|      3|2020-04-25 00:36:08|  null|    null|   DELETE|[predicate -> ["((`id` % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]]|null|    null|     null|          2|          null|        false|
|      2|2020-04-25 00:35:51|  null|    null|   UPDATE|

## Read older versions of data using Time Travel

You can query previous snapshots of your Delta Lake table by using a feature called Time Travel. If you want to access the data that you overwrote, you can query a snapshot of the table before you overwrote the first set of data using the versionAsOf option.

Once you run the cell below, you should see the first set of data, from before you overwrote it. Time Travel is an extremely powerful feature that uses the Delta Lake transaction log to access data that is no longer in the current version of the table. Removing the `versionAsOf` option returns the latest version of the table again, and specifying a later version (for example, 1) returns the table as it was after that commit. For more information, see [Query an older snapshot of a table (time travel)](https://docs.delta.io/latest/delta-batch.html#deltatimetravel).

In [17]:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  4|
|  3|
|  2|
+---+
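Besides `versionAsOf`, Delta Lake also accepts a `timestampAsOf` reader option. The helper below is a small sketch that wraps both; the function name `load_snapshot` and its parameters are assumptions made for illustration, and it expects the notebook's existing `spark` session to be passed in.

```python
def load_snapshot(spark, table_path, version=None, timestamp=None):
    """Load a past snapshot of a Delta table, or the latest one if neither is given."""
    if version is not None and timestamp is not None:
        raise ValueError("Pass either version or timestamp, not both")

    reader = spark.read.format("delta")
    if version is not None:
        # Select the snapshot by commit version, as in the cell above.
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        # Select the snapshot by time, e.g. a timestamp string taken from the history.
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(table_path)
```

For example, `load_snapshot(spark, delta_table_path, version=0).show()` should behave like the cell above, while calling it without `version` or `timestamp` reads the latest data.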

## Write a stream of data to a table

You can also write to a Delta Lake table using Spark's Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.

For more information about Delta Lake integration with Structured Streaming, see [Table Streaming Reads and Writes](https://docs.delta.io/latest/delta-streaming.html).

In the cells below, here's what we are doing:

1. *Cell 18* Set up a simple Spark Structured Streaming job to generate a sequence and make the job write into our Delta table
2. *Cell 19* Show the newly appended data
3. *Cell 20* Inspect history
4. *Cell 21* Stop the structured streaming job
5. *Cell 22* Inspect history <-- You'll notice appends have stopped

In [18]:
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)

## Read a stream of changes from a table

While the stream is writing to the Delta Lake table, you can also read from that table as a streaming source. For example, you can start another streaming query that prints all the changes made to the Delta Lake table.
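A second streaming query of that kind could look like the sketch below. It assumes the notebook's `spark` session and `delta_table_path`; the function name `start_change_stream` is hypothetical. The console sink writes to the driver's standard output, so depending on your notebook environment the rows may appear in the driver logs instead of the cell output.

```python
def start_change_stream(spark, table_path):
    """Start a streaming query that prints rows arriving in the Delta table."""
    return (
        spark.readStream.format("delta")   # use the Delta table as a streaming source
        .load(table_path)
        .writeStream.format("console")     # print each micro-batch to the console
        .start()
    )
```

A possible usage is `change_stream = start_change_stream(spark, delta_table_path)`, followed by `change_stream.stop()` once you are done.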

In [19]:
delta_table.toDF().sort(col("id").desc()).show(100)

+---+
| id|
+---+
| 19|
| 18|
| 17|
| 16|
| 15|
| 14|
| 13|
| 12|
| 11|
| 10|
|  8|
|  6|
|  4|
|  3|
|  2|
|  1|
|  0|
| -1|
| -1|
| -1|
+---+

In [20]:
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)

+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-----------+
|version|          timestamp|       operation|                                                                  operationParameters|readVersion|
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-----------+
|      5|2020-04-25 00:37:09|STREAMING UPDATE|[outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0]|          4|
|      4|2020-04-25 00:36:27|           MERGE|                                         [predicate -> (oldData.`id` = newData.`id`)]|          3|
|      3|2020-04-25 00:36:08|          DELETE|                  [predicate -> ["((`id` % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]]|          2|
|      2|2020-04-25 00:35:51|          UPDATE|                   [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))

In [21]:
stream.stop()

In [22]:
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)

+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-----------+
|version|          timestamp|       operation|                                                                  operationParameters|readVersion|
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-----------+
|      5|2020-04-25 00:37:09|STREAMING UPDATE|[outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0]|          4|
|      4|2020-04-25 00:36:27|           MERGE|                                         [predicate -> (oldData.`id` = newData.`id`)]|          3|
|      3|2020-04-25 00:36:08|          DELETE|                  [predicate -> ["((`id` % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]]|          2|
|      2|2020-04-25 00:35:51|          UPDATE|                   [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))

## Convert Parquet to Delta
You can do an in-place conversion from the Parquet format to Delta.

In [23]:
parquet_path = "/parquet/parquet-table-{0}".format(session_id)

data = spark.range(0,5)
data.write.parquet(parquet_path)

# Confirm that the data isn't in the Delta format
DeltaTable.isDeltaTable(spark, parquet_path)

False

In [24]:
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))

# Confirm that the converted data is now in the Delta format
DeltaTable.isDeltaTable(spark, parquet_path)

True

## SQL Support
Delta supports table utility commands through SQL. You can use SQL to:
* Get a Delta table's history
* Vacuum a Delta table
* Convert a Parquet table to Delta


In [25]:
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()

+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|       operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      5|2020-04-25 00:37:09|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          4|          null|         true|
|      4|2020-04-25 00:36:27|  null|    null|           MERGE|[predicate -> (ol...|null|    null|     null|          3|          null|        false|
|      3|2020-04-25 00:36:08|  null|    null|          DELETE|[predicate -> ["(...|null|    null|     null|          2|          null|        false|
|      2|2020-04-25 00:35:51|  null|    null|          UPDATE|[predicate -> ((i...|null|    null|     null

In [26]:
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()

+--------------------+
|                path|
+--------------------+
|abfss://data@arca...|
+--------------------+
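`VACUUM` removes data files that are no longer referenced by the table and are older than the retention threshold (7 days by default), so time travel to versions that depend on those files stops working afterwards. The SQL command also accepts optional `RETAIN <n> HOURS` and `DRY RUN` clauses. The helper below is a small sketch that builds such a statement; the function name `vacuum_sql` and its parameters are assumptions for illustration.

```python
def vacuum_sql(table_path, retain_hours=None, dry_run=False):
    """Build a VACUUM statement for a path-based Delta table."""
    statement = "VACUUM delta.`{0}`".format(table_path)
    if retain_hours is not None:
        # Keep unreferenced files that are newer than this many hours.
        statement += " RETAIN {0} HOURS".format(retain_hours)
    if dry_run:
        # List the files that would be deleted without deleting them.
        statement += " DRY RUN"
    return statement


print(vacuum_sql("/delta/example-table", retain_hours=168, dry_run=True))
```

In the notebook, the resulting string could be passed to `spark.sql(...)`, for example `spark.sql(vacuum_sql(delta_table_path, dry_run=True)).show()`.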

In [27]:
# Use a fresh random suffix so this path differs from the table converted earlier.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)

data = spark.range(0,5)
data.write.parquet(parquet_path)

# Confirm that the data isn't in the Delta format
DeltaTable.isDeltaTable(spark, parquet_path)

# Use SQL to convert the parquet table to Delta
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))

DeltaTable.isDeltaTable(spark, parquet_path)

True