# Delta Lake In A Nutshell

- __[History / Temporality / Time Travel](#History-/-Temporality-/-Time-Travel)__
- __[Schema Validation](#Schema-Validation)__
- __[Vacuum](#Vacuum)__
- __[Delta Lake API](#Delta-Lake-API-(Update,-Merge,-Delete))__

## Preparations

In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, expr, lit

In [2]:
spark = SparkSession.builder.appName('delta_fun').getOrCreate()

In [3]:
data1 = [ ['Maciej', 29], ['John', 31], ['Jack',35]]
data2 = [ ['Beck', 24], ['Wick', 21], ['Samantha',35]]
data3 = [ ['Homer', 23, 9], ['Barney', 24, 7], ['Edward', 45, 8]]
dataSchema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
])
differentSchema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("foot_size", IntegerType()),
])
df1 = spark.sparkContext.parallelize(data1).toDF(dataSchema)
df2 = spark.sparkContext.parallelize(data2).toDF(dataSchema)
df3 = spark.sparkContext.parallelize(data3).toDF(differentSchema)

## History / Temporality / Time Travel

### Old way

In [4]:
path = "./data/example_1/old"

In [5]:
df1.show()
df2.show()

+------+---+
|  name|age|
+------+---+
|Maciej| 29|
|  John| 31|
|  Jack| 35|
+------+---+

+--------+---+
|    name|age|
+--------+---+
|    Beck| 24|
|    Wick| 21|
|Samantha| 35|
+--------+---+



In [6]:
df1.write.save(path)

In [7]:
spark.read.load(path).show()

+------+---+
|  name|age|
+------+---+
|Maciej| 29|
|  John| 31|
|  Jack| 35|
+------+---+



In [8]:
df2.write.mode("overwrite").save(path)

In [9]:
spark.read.load(path).show()

+--------+---+
|    name|age|
+--------+---+
|Samantha| 35|
|    Wick| 21|
|    Beck| 24|
+--------+---+



### Problems:
- The previous values are lost
- We can't go back to an earlier state
- If the overwrite was a mistake, we have no way to recover

### Delta Lake way

In [10]:
path = "./data/example_1/delta"

In [11]:
df1.write.format("delta").save(path)

In [12]:
spark.read.format("delta").load(path).show()

+------+---+
|  name|age|
+------+---+
|Maciej| 29|
|  Jack| 35|
|  John| 31|
+------+---+



### Spark streaming part 1 - come back here later

In [13]:
streaming_path = './data/example_1/delta_stream'
streaming_checkpoint = './data/example_1/delta_stream_checkpount'

In [15]:
stream = spark.readStream \
  .format("delta") \
  .option("ignoreChanges", "true") \
  .load(path) \
  .agg(avg(col('age')).alias("average")) \
  .writeStream \
  .format("delta") \
  .outputMode("complete") \
  .option("checkpointLocation", streaming_checkpoint) \
  .start(streaming_path)

### Ok, you can continue

In [16]:
!ls ./data/example_1/delta | wc -l

5


In [17]:
df2.write.mode("overwrite").format("delta").save(path)

In [18]:
spark.read.format("delta").load(path).show()

+--------+---+
|    name|age|
+--------+---+
|Samantha| 35|
|    Beck| 24|
|    Wick| 21|
+--------+---+



In [19]:
!ls ./data/example_1/delta | wc -l

9


In [20]:
df1.write.mode("append").format("delta").save(path)

In [21]:
spark.read.format("delta").load(path).show()

+--------+---+
|    name|age|
+--------+---+
|Samantha| 35|
|  Maciej| 29|
|    Jack| 35|
|    Beck| 24|
|    John| 31|
|    Wick| 21|
+--------+---+



In [232]:
!ls ./data/example_1/delta | wc -l

13


In [233]:
spark.read.format("parquet").load(path).show()

+--------+---+
|    name|age|
+--------+---+
|Samantha| 35|
|  Maciej| 29|
|  Maciej| 29|
|    Jack| 35|
|    Jack| 35|
|    John| 31|
|    John| 31|
|    Wick| 21|
|    Beck| 24|
+--------+---+



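Reading the same path as plain Parquet also works, but it returns the rows from every data file in the directory, including files that belong only to old versions of the table.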
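
The difference comes from the transaction log. A Delta table directory contains a `_delta_log` folder of JSON commit files, and a Delta reader only loads the data files that the log still lists as added, while a plain Parquet reader picks up every Parquet file it finds. The sketch below is a simplified, standard-library-only replay of that log. The function name `active_files` is made up for illustration, and the sketch ignores the Parquet checkpoint files that a real reader also uses.

```python
import json
from pathlib import Path


def active_files(table_path, version=None):
    """Replay the JSON commits in _delta_log and return the live data files.

    Simplified sketch: ignores checkpoint files and every action type
    other than `add` and `remove`.
    """
    log_dir = Path(table_path) / "_delta_log"
    live = set()
    # Commit files are named by zero-padded version number, so sorting
    # by name replays them in commit order.
    for commit in sorted(log_dir.glob("*.json")):
        if version is not None and int(commit.stem) > version:
            break
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live
```

Calling `active_files(path)` on the table above should list only the files of the current version. The overwrite presumably logged `remove` actions for the files of version 0 without deleting them from disk, which is why the Parquet read returns `Maciej`, `Jack` and `John` twice.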

In [234]:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, path)

In [235]:
deltaTable.history().toPandas()

| | version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2020-02-06 20:56:18 | | | WRITE | {'mode': 'Append', 'partitionBy': '[]'} | | | | 1.0 | | True |
| 1 | 1 | 2020-02-06 20:56:14 | | | WRITE | {'mode': 'Overwrite', 'partitionBy': '[]'} | | | | 0.0 | | False |
| 2 | 0 | 2020-02-06 20:56:10 | | | WRITE | {'mode': 'ErrorIfExists', 'partitionBy': '[]'} | | | | | | True |


Here is the first version (version 0), read back with the `versionAsOf` option:

In [236]:
spark.read.format("delta").option("versionAsOf", 0).load(path).show()

+------+---+
|  name|age|
+------+---+
|Maciej| 29|
|  Jack| 35|
|  John| 31|
+------+---+
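
Besides `versionAsOf`, Delta Lake also accepts a `timestampAsOf` read option. A small helper, sketched here under the hypothetical name `read_delta_at`, keeps the two in one place. It only assumes that `spark` is an active `SparkSession` with Delta Lake available.

```python
def read_delta_at(spark, path, version=None, timestamp=None):
    """Return a DataFrame for the Delta table at `path` as of a version or time.

    `version` maps to the `versionAsOf` read option and `timestamp`
    (for example "2020-02-06 20:56:15") to `timestampAsOf`.
    With neither set, the latest version is read.
    """
    if version is not None and timestamp is not None:
        raise ValueError("pass either version or timestamp, not both")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)
```

With this helper, `read_delta_at(spark, path, version=0).show()` is equivalent to the cell above.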



## Schema Validation

We would like to append a DataFrame with a different schema.

In [237]:
df3.show()

+------+---+---------+
|  name|age|foot_size|
+------+---+---------+
| Homer| 23|        9|
|Barney| 24|        7|
|Edward| 45|        8|
+------+---+---------+



In [238]:
spark.read.format("delta").load(path).printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [239]:
df3.write.mode("append").format("delta").save(path)

AnalysisException: 'A schema mismatch detected when writing to the Delta table.\nTo enable schema migration, please set:\n\'.option("mergeSchema", "true")\'.\n\nTable schema:\nroot\n-- name: string (nullable = true)\n-- age: integer (nullable = true)\n\n\nData schema:\nroot\n-- name: string (nullable = true)\n-- age: integer (nullable = true)\n-- foot_size: integer (nullable = true)\n\n         \nIf Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE\ncommand for changing the schema.\n        ;'

The append is rejected unless we set the `mergeSchema` option to `true`:

In [240]:
df3.write.mode("append").option("mergeSchema", "true").format("delta").save(path)
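
The enforcement rule seen in the last two cells can be sketched in plain Python. This is a simplified model, not Delta Lake's implementation: schemas are plain dicts of column name to type name, the function name `check_append_schema` is made up, and every type change is rejected, which is stricter than the real behaviour.

```python
def check_append_schema(table_schema, data_schema, merge_schema=False):
    """Return the table schema after an append, or raise on a mismatch."""
    for name, dtype in data_schema.items():
        if name in table_schema and table_schema[name] != dtype:
            raise ValueError(f"type mismatch for column {name!r}")
    new_columns = {n: t for n, t in data_schema.items() if n not in table_schema}
    if new_columns and not merge_schema:
        raise ValueError(f"schema mismatch, unexpected columns: {sorted(new_columns)}")
    # With merge_schema, new columns are added at the end of the table
    # schema; rows written earlier read back as null for them.
    return {**table_schema, **new_columns}


table = {"name": "string", "age": "integer"}
data = {"name": "string", "age": "integer", "foot_size": "integer"}
merged = check_append_schema(table, data, merge_schema=True)
```

Without `merge_schema=True` the same call raises, which mirrors the `AnalysisException` above.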

In [241]:
deltaTable.history().select("version","timestamp","operation","operationParameters").show(100,False)

+-------+-------------------+---------+------------------------------------------+
|version|timestamp          |operation|operationParameters                       |
+-------+-------------------+---------+------------------------------------------+
|3      |2020-02-06 20:57:46|WRITE    |[mode -> Append, partitionBy -> []]       |
|2      |2020-02-06 20:56:18|WRITE    |[mode -> Append, partitionBy -> []]       |
|1      |2020-02-06 20:56:14|WRITE    |[mode -> Overwrite, partitionBy -> []]    |
|0      |2020-02-06 20:56:10|WRITE    |[mode -> ErrorIfExists, partitionBy -> []]|
+-------+-------------------+---------+------------------------------------------+



In [242]:
spark.read.format("delta").load(path).printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- foot_size: integer (nullable = true)



It looks like the existing `deltaTable` object still holds the old metadata, so it has to be created again to see the new column:

In [243]:
deltaTable.toDF().show()

+--------+---+
|    name|age|
+--------+---+
|  Edward| 45|
|  Barney| 24|
|   Homer| 23|
|Samantha| 35|
|  Maciej| 29|
|    Wick| 21|
|    Beck| 24|
|    Jack| 35|
|    John| 31|
+--------+---+



In [244]:
deltaTable = DeltaTable.forPath(spark, path)
deltaTable.toDF().show()

+--------+---+---------+
|    name|age|foot_size|
+--------+---+---------+
|  Edward| 45|        8|
|  Barney| 24|        7|
|   Homer| 23|        9|
|Samantha| 35|     null|
|  Maciej| 29|     null|
|    Wick| 21|     null|
|    Beck| 24|     null|
|    Jack| 35|     null|
|    John| 31|     null|
+--------+---+---------+



Historical versions are not affected:

In [245]:
spark.read.format("delta").option("versionAsOf", 0).load(path).printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



## Vacuum

Let's clear the history we no longer need.

In [246]:
!ls ./data/example_1/delta | wc -l

17


In [247]:
deltaTable.vacuum()

DataFrame[]

In [248]:
!ls ./data/example_1/delta | wc -l

17


In [249]:
deltaTable.history().select("version","timestamp","operation","operationParameters").show(100,False)

+-------+-------------------+---------+------------------------------------------+
|version|timestamp          |operation|operationParameters                       |
+-------+-------------------+---------+------------------------------------------+
|3      |2020-02-06 20:57:46|WRITE    |[mode -> Append, partitionBy -> []]       |
|2      |2020-02-06 20:56:18|WRITE    |[mode -> Append, partitionBy -> []]       |
|1      |2020-02-06 20:56:14|WRITE    |[mode -> Overwrite, partitionBy -> []]    |
|0      |2020-02-06 20:56:10|WRITE    |[mode -> ErrorIfExists, partitionBy -> []]|
+-------+-------------------+---------+------------------------------------------+



Nothing has changed, because the default retention period is 7 days and all of our files are only a few minutes old.
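
The retention rule can be modelled in a few lines of plain Python. This is a simplified sketch rather than Delta Lake's actual vacuum logic, and the name `vacuum_candidates` as well as the file names are made up: a file becomes a candidate only once it has been out of the table for longer than the retention period (168 hours by default).

```python
from datetime import datetime, timedelta


def vacuum_candidates(removed_at, now, retention_hours=168):
    """Return the files that are old enough to delete.

    `removed_at` maps a data file path to the time it was removed from
    the table (by an overwrite, update, delete, ...). Files that are
    still part of the current version are never candidates, so they do
    not appear in this mapping.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(path for path, ts in removed_at.items() if ts < cutoff)


now = datetime(2020, 2, 6, 21, 0, 0)
removed_at = {
    "part-0.parquet": datetime(2020, 2, 6, 20, 56, 14),  # dropped minutes ago
    "part-1.parquet": datetime(2020, 1, 20, 12, 0, 0),   # hypothetical older file
}
print(vacuum_candidates(removed_at, now))                     # ['part-1.parquet']
print(vacuum_candidates(removed_at, now, retention_hours=0))  # both files
```

On the real table, `deltaTable.vacuum(retentionHours)` accepts the retention period in hours. Values below the default are refused unless the safety check `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`, and vacuumed versions can no longer be read with time travel.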

## Delta Lake API (Update, Merge, Delete)

### Delete

In [250]:
deltaTable.toDF().show()

+--------+---+---------+
|    name|age|foot_size|
+--------+---+---------+
|  Edward| 45|        8|
|  Barney| 24|        7|
|   Homer| 23|        9|
|Samantha| 35|     null|
|  Maciej| 29|     null|
|    Wick| 21|     null|
|    Beck| 24|     null|
|    Jack| 35|     null|
|    John| 31|     null|
+--------+---+---------+



In [251]:
deltaTable.delete("name = 'Edward'")

In [252]:
deltaTable.toDF().show()

+--------+---+---------+
|    name|age|foot_size|
+--------+---+---------+
|  Barney| 24|        7|
|   Homer| 23|        9|
|Samantha| 35|     null|
|  Maciej| 29|     null|
|    Wick| 21|     null|
|    Beck| 24|     null|
|    Jack| 35|     null|
|    John| 31|     null|
+--------+---+---------+



### Update

In [253]:
deltaTable.update(
    condition = "foot_size is null",
    set = { "foot_size": "6" } )

In [254]:
deltaTable.toDF().show()

+--------+---+---------+
|    name|age|foot_size|
+--------+---+---------+
|Samantha| 35|        6|
|  Maciej| 29|        6|
|  Barney| 24|        7|
|    John| 31|        6|
|    Jack| 35|        6|
|    Beck| 24|        6|
|    Wick| 21|        6|
|   Homer| 23|        9|
+--------+---+---------+



### Merge

Let's make this example a little less synthetic and assume that `name` is unique.

In [256]:
deltaTable.toDF().withColumn("count",lit(1))\
    .write.option("mergeSchema", "true").mode("overwrite").format("delta").save(path)

In [257]:
deltaTable = DeltaTable.forPath(spark, path)

In [258]:
deltaTable.toDF().show()

+--------+---+---------+-----+
|    name|age|foot_size|count|
+--------+---+---------+-----+
|Samantha| 35|        6|    1|
|  Maciej| 29|        6|    1|
|  Barney| 24|        7|    1|
|    John| 31|        6|    1|
|    Jack| 35|        6|    1|
|    Beck| 24|        6|    1|
|    Wick| 21|        6|    1|
|   Homer| 23|        9|    1|
+--------+---+---------+-----+



In [259]:
deltaTable.alias("clients").merge(
    source = df3.alias("new_clients"),
    condition = expr("clients.name == new_clients.name")
  ).whenMatchedUpdate(set =
    {
      "count": col("clients.count") + 1
    }
  ).whenNotMatchedInsert(values =
    {
      "name": col("new_clients.name"),
      "age": col("new_clients.age"),
      "foot_size": col("new_clients.foot_size"),
      "count": lit(1)  # integer literal, matching the type of the count column
    }
  ).execute()

In [260]:
deltaTable.toDF().show()

+--------+---+---------+-----+
|    name|age|foot_size|count|
+--------+---+---------+-----+
|Samantha| 35|        6|    1|
|  Maciej| 29|        6|    1|
|  Edward| 45|        8|    1|
|  Barney| 24|        7|    2|
|   Homer| 23|        9|    2|
|    Jack| 35|        6|    1|
|    Beck| 24|        6|    1|
|    Wick| 21|        6|    1|
|    John| 31|        6|    1|
+--------+---+---------+-----+
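
The merge semantics can be checked against a plain-Python model. This is only an illustration of the upsert logic on a subset of the rows above, not how Delta Lake executes a merge, and `merge_clients` is a made-up name.

```python
def merge_clients(clients, new_clients):
    """Upsert `new_clients` into `clients`, keyed by the unique `name`.

    Both arguments are lists of dicts. Matched rows get count + 1
    (the whenMatchedUpdate branch); unmatched source rows are inserted
    with count = 1 (the whenNotMatchedInsert branch).
    """
    result = {row["name"]: dict(row) for row in clients}
    for new in new_clients:
        name = new["name"]
        if name in result:
            result[name]["count"] += 1
        else:
            result[name] = {**new, "count": 1}
    return list(result.values())


clients = [
    {"name": "Barney", "age": 24, "foot_size": 7, "count": 1},
    {"name": "Homer", "age": 23, "foot_size": 9, "count": 1},
    {"name": "Wick", "age": 21, "foot_size": 6, "count": 1},
]
new_clients = [
    {"name": "Homer", "age": 23, "foot_size": 9},
    {"name": "Barney", "age": 24, "foot_size": 7},
    {"name": "Edward", "age": 45, "foot_size": 8},
]
merged = merge_clients(clients, new_clients)
```

As in the table above, `Barney` and `Homer` end up with a count of 2, `Edward` is inserted with a count of 1, and `Wick` is left untouched.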



## Delta as Streaming Sink - part 2

In [264]:
streaming_path = './data/example_1/delta_stream'
streaming_checkpoint = './data/example_1/delta_stream_checkpount'

In [267]:
# Same query as in part 1. Parentheses replace the backslash continuations,
# which raise a SyntaxError if anything follows a backslash on its line.
stream = (
    spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load(path)
    .agg(avg(col("age")).alias("average"))
    .writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", streaming_checkpoint)
    .start(streaming_path)
)

In [266]:
spark.streams

<pyspark.sql.streaming.StreamingQueryManager at 0x7faa713938d0>