In [3]:
!pip install delta-spark

Collecting pyspark<3.2.0,>=3.1.0 (from delta-spark)
  Using cached pyspark-3.1.3-py2.py3-none-any.whl
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.3
    Can't uninstall 'pyspark'. No files were found to uninstall.
Successfully installed pyspark-3.1.3


In [4]:
import pyspark
from delta import *
from pyspark.sql import SparkSession

builder = pyspark.sql.SparkSession.builder.appName("jupyter-pyspark") \
        .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-avro_2.12:3.1.2")\
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "minio") \
        .config("spark.hadoop.fs.s3a.secret.key", "SU2orange!") \
        .config("spark.hadoop.fs.s3a.fast.upload", True) \
        .config("spark.hadoop.fs.s3a.path.style.access", True) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \

spark = configure_spark_with_delta_pip(builder).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")


AttributeError: module 'pyspark.sql.utils' has no attribute 'convert_exception'

In [None]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

In [None]:
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

In [None]:
df = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta-table")
df.show()

In [18]:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta-table`")
latest_version = history.selectExpr("max(version)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta-table")

In [19]:
df.show()

+---+
| id|
+---+
|  7|
|  6|
|  9|
|  5|
|  8|
+---+



In [20]:
history.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      1|2022-12-30 18:35:...|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          0|          null|        false|{numFiles -> 6, n...|        null|
|      0|2022-12-30 18:35:...|  null|    null|    WRITE|{mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|{numFiles -> 6, n...|        null|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--

## More Real-World Example

In [23]:
# Create initial Table
people = spark.read.json(f"s3a://delta/people/*.json") # Read From S3
people.write.format("delta").mode("overwrite").save(f"s3a://delta/people-table") # Write to DeltaTable on D3

In [26]:
people.toPandas()

Unnamed: 0,age,id,name,weight
0,22,3,Jingle,188.0
1,27,4,Heimer,201.0
2,35,2,Jacob,166.0
3,30,1,John,175.0
4,38,5,Smith,


In [27]:
history = spark.sql(f"DESCRIBE HISTORY delta.`s3a://delta/people-table`")
history.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      3|2022-12-30 18:36:33|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          2|          null|        false|{numFiles -> 5, n...|        null|
|      2|2022-12-30 18:36:05|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          1|          null|        false|{numFiles -> 5, n...|        null|
|      1|2022-12-30 18:33:24|  null|    null|   UPDATE|{predicate -> (na...|null|    null|     null|          0|        

In [28]:
peopleTable = DeltaTable.forPath(spark,f"s3a://delta/people-table") # Get as Table

In [29]:
peopleTable.toDF().printSchema()

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- weight: long (nullable = true)



In [30]:
peopleTable.update(condition="name='John'", set= { 'weight' : "177" }) #Update will re-write for you

## History of the table

In [46]:
peopleTable.history().show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      4|2022-12-30 18:37:14|  null|    null|   UPDATE|{predicate -> (na...|null|    null|     null|          3|          null|        false|{numRemovedFiles ...|        null|
|      3|2022-12-30 18:36:33|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          2|          null|        false|{numFiles -> 5, n...|        null|
|      2|2022-12-30 18:36:05|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          1|        

In [31]:
history = spark.sql(f"DESCRIBE HISTORY delta.`s3a://delta/people-table`")
history.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      4|2022-12-30 18:37:14|  null|    null|   UPDATE|{predicate -> (na...|null|    null|     null|          3|          null|        false|{numRemovedFiles ...|        null|
|      3|2022-12-30 18:36:33|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          2|          null|        false|{numFiles -> 5, n...|        null|
|      2|2022-12-30 18:36:05|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          1|        

## Direct delta query in Spark SQL, similar to apache drill

In [37]:
query = spark.sql("select * from delta.`s3a://delta/people-table`")
query.show()

+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 30|  1|  John|   177|
| 38|  5| Smith|  null|
+---+---+------+------+



In [44]:
peopleTable.merge(

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      4|2022-12-30 18:37:14|  null|    null|   UPDATE|{predicate -> (na...|null|    null|     null|          3|          null|        false|{numRemovedFiles ...|        null|
|      3|2022-12-30 18:36:33|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          2|          null|        false|{numFiles -> 5, n...|        null|
|      2|2022-12-30 18:36:05|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          1|        

## Use with Spark SQL by registering as a temp view

In [33]:
peopleTable.toDF().createOrReplaceTempView("peopleTable")

In [34]:
spark.sql("select * from peopleTable").show()

+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 30|  1|  John|   177|
| 38|  5| Smith|  null|
+---+---+------+------+



## Upsert / Merge

In [76]:
data = [(1, "Johnz", 31,179) ,(6, "His", 56, 156)]
cols = ["id","name","age","weight"]
changes = spark.createDataFrame(data = data, schema = cols)
changes.show()

+---+-----+---+------+
| id| name|age|weight|
+---+-----+---+------+
|  1|Johnz| 31|   179|
|  6|  His| 56|   156|
+---+-----+---+------+



In [77]:
peopleTable.alias("tgt").merge(changes.alias("src"), condition = peopleTable.toDF().id == changes.id) \
    .whenMatchedUpdate( set = { "name" : changes.name, "age" : "src.age", "weight" : changes['weight'] } ) \
    .whenNotMatchedInsert( values = { "id" : "src.id", "name": "src.name", "age": "src.age", "weight" : "src.weight" })\
    .execute()


In [78]:
peopleTable.toDF().show()

+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 31|  1| Johnz|   179|
| 56|  6|   His|   156|
| 38|  5| Smith|  null|
+---+---+------+------+



In [88]:
peopleTable.history().show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|     13|2022-12-30 19:04:26|  null|    null|    MERGE|{predicate -> (`i...|null|    null|     null|         12|          null|        false|{numTargetRowsCop...|        null|
|     12|2022-12-30 19:04:20|  null|    null|    MERGE|{predicate -> (`i...|null|    null|     null|         11|          null|        false|{numTargetRowsCop...|        null|
|     11|2022-12-30 19:03:56|  null|    null|    MERGE|{predicate -> (`i...|null|    null|     null|         10|        

## Time Travel

Important to know this is a delta feature and not a spark feature

In [98]:
df = spark.read.format("delta").option("timestampAsOf", "2022-12-30 18:30:26").load("s3a://delta/people-table")
df.show()

+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 30|  1|  John|   175|
| 38|  5| Smith|  null|
+---+---+------+------+



In [95]:
df = spark.read.format("delta").option("versionAsOf", 1).load("s3a://delta/people-table")
df.show()

+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 30|  1|  John|   177|
| 38|  5| Smith|  null|
+---+---+------+------+



In [99]:
df = spark.read.format("delta").option("timestampAfter", "2022-12-30 18:30:26").load("s3a://delta/people-table")
df.show()

+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 31|  1| Johnz|   179|
| 56|  6|   His|   156|
| 38|  5| Smith|  null|
+---+---+------+------+



In [104]:
history = peopleTable.history()

for row in history.collect():
    ver = row.version
    df = spark.read.format("delta").option("versionAsOf", ver).load("s3a://delta/people-table")
    print(f"VERSION: {ver}")
    df.show()

VERSION: 13
+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 31|  1| Johnz|   179|
| 56|  6|   His|   156|
| 38|  5| Smith|  null|
+---+---+------+------+

VERSION: 12
+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 31|  1| Johnz|   178|
| 35|  2| Jacob|   166|
| 56|  6|   His|   156|
| 38|  5| Smith|  null|
+---+---+------+------+

VERSION: 11
+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 35|  2| Jacob|   166|
| 31|  1| Johnz|   178|
| 56|  6|   His|   156|
| 38|  5| Smith|  null|
+---+---+------+------+

VERSION: 10
+---+---+------+------+
|age| id|  name|weight|
+---+---+------+------+
| 27|  4|Heimer|   201|
| 22|  3|Jingle|   188|
| 31|  1| Johns|   178|
| 35|  2| Jacob|   166|
| 56|  6|   His|   156|
| 38|  5| Smith|  null|
+---+---+----