# Delta Lake PySpark Quickstart

The following setup steps from https://docs.delta.io/latest/quick-start.html are already taken care of by the Docker image.

> Note: You do **NOT** need to run these 2 commands.

## Python Notes
```bash
pip install pyspark==<compatible-spark-version>

$SPARK_HOME/bin/pyspark --packages io.delta:<compatible-delta-version> --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
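
Outside the Docker image, the same settings could also be applied from Python instead of the command line. The following is a minimal sketch, assuming PySpark is installed and that the caller supplies the Delta package coordinate (kept as a placeholder above). The names `DELTA_CONF` and `build_delta_session` are made up for this example; only the two config keys and values come from the command above.

```python
# Hypothetical helper names; the two config entries mirror the --conf flags above.
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def build_delta_session(delta_package, app_name="delta-quickstart"):
    """Create a SparkSession with the Delta extension and catalog enabled.

    `delta_package` is the Maven coordinate otherwise passed to --packages,
    i.e. "io.delta:<compatible-delta-version>" with the placeholder filled in.
    """
    # Imported lazily so this snippet loads even where PySpark is not installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    # spark.jars.packages is the configuration equivalent of --packages.
    builder = builder.config("spark.jars.packages", delta_package)
    for key, value in DELTA_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Inside the Docker image none of this is needed, because the notebook already provides a configured `spark` session.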

## Scala Notes
If you would like to follow the Scala version, open a terminal and follow the Scala instructions, starting with:

```bash
$SPARK_HOME/bin/spark-shell --packages io.delta:<compatible-delta-version> --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```

## Display versions of `python` and `spark`

In [1]:
import sys
print(sys.version)

3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]


In [2]:
spark.version

'3.5.1'

## Write to and read from a Delta Lake table

### Write a Spark DataFrame to a Delta Lake table

In [3]:
data = spark.range(0, 5)  # integer bounds; produces ids 0 through 4

(data
  .write
  .mode('overwrite')
  .format("delta")
  .save("/data/delta-table")
)

24/06/26 17:30:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

### Read the above Delta Lake table to a Spark DataFrame and display the DataFrame

In [6]:
import pyspark.sql.functions as F
df = (spark
        .read
        .format("delta")
        .load("/data/delta-table")
        .orderBy("id")
      )
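# Add a constant ISO-8601 timestamp as a string column
df = df.withColumn('foo', F.lit('2023-09-26T09:27:53.700Z'))
# month() implicitly casts the string to a date: month 9 * 1000 = 9000
df = df.withColumn('bar', F.expr('month(foo) * 1000'))
# Copy the id column
df = df.withColumn('car', F.expr('id'))
df.show()
# Type of the 'foo' column ('string'); not displayed because it is not the cell's last expression
df.schema['foo'].simpleString().split(':')[1]
display(df)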

(df
  .write
  .mode('overwrite')
  .format("delta")
  .save("/data/delta-table2")
)

+---+--------------------+----+---+
| id|                 foo| bar|car|
+---+--------------------+----+---+
|  0|2023-09-26T09:27:...|9000|  0|
|  1|2023-09-26T09:27:...|9000|  1|
|  2|2023-09-26T09:27:...|9000|  2|
|  3|2023-09-26T09:27:...|9000|  3|
|  4|2023-09-26T09:27:...|9000|  4|
+---+--------------------+----+---+



DataFrame[id: bigint, foo: string, bar: int, car: bigint]
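
The `bar` values can be checked by hand: `month(foo)` yields 9 for the September timestamp, and 9 * 1000 = 9000. The plain-Python sketch below reproduces that arithmetic without Spark. It parses the string with `datetime.strptime`, which only approximates Spark's own string-to-date cast, so treat it as a sanity check rather than a description of Spark internals.

```python
from datetime import datetime

foo = "2023-09-26T09:27:53.700Z"

# Parse the ISO-8601 string; the trailing "Z" is matched literally by the format.
parsed = datetime.strptime(foo, "%Y-%m-%dT%H:%M:%S.%fZ")

# Same arithmetic as the Spark expression 'month(foo) * 1000'.
bar = parsed.month * 1000
print(bar)  # 9000
```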

In [11]:
spark.sql("""
SELECT count(*) FROM delta.`/data/delta-table`
""").collect()

24/06/26 17:58:02 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException


[Row(count(1)=5)]

## Overwrite a Delta Lake table

### Overwrite the Delta Lake table written in the above step

In [None]:
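# Note: this cell targets /tmp/delta-table, not the /data/delta-table path written earlier;
# overwrite mode creates the table if it does not exist yet.
data = spark.range(5, 10)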

(data
  .write
  .format("delta")
  .mode("overwrite")
  .save("/tmp/delta-table")
)

### Read the above overwritten Delta Lake table to a Spark DataFrame and display the DataFrame

In [None]:
df = (spark
        .read
        .format("delta")
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

## Delta Lake and [ACID](https://en.wikipedia.org/wiki/ACID)

### Showcase `update` feature of Delta Lake and display the resulting DataFrame

In [None]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
(delta_table
  .update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") }
  )
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)
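
Since the cell above has no recorded output, the expected result can be worked out in plain Python. This is a sketch of the update semantics only, assuming the table holds the ids 5 through 9 written by the overwrite step; it does not touch Delta Lake.

```python
# Assumed starting contents of the table: 5, 6, 7, 8, 9
ids = list(range(5, 10))

# Mirror of update(condition="id % 2 == 0", set={"id": "id + 100"}):
# even ids get 100 added, odd ids are left unchanged.
updated = sorted(i + 100 if i % 2 == 0 else i for i in ids)
print(updated)  # [5, 7, 9, 106, 108]
```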

### Showcase `delete` feature of Delta Lake and display the resulting DataFrame

In [None]:
# Delete every even value
(delta_table
  .delete(
    condition = expr("id % 2 == 0")
  )
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)
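
The effect of the delete can be sketched the same way in plain Python, assuming the table contents left by the update step (5, 7, 9, 106, 108). Rows matching the condition are removed, so only the odd ids should remain.

```python
# Assumed contents after the update step
ids = [5, 7, 9, 106, 108]

# Mirror of delete(condition="id % 2 == 0"): drop every even id.
remaining = [i for i in ids if i % 2 != 0]
print(remaining)  # [5, 7, 9]
```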

### Showcase `merge` feature of Delta Lake and display the resulting DataFrame

In [None]:
# Upsert (merge) new data
new_data = spark.range(0, 20)

(delta_table.alias("old_data")
  .merge(
      new_data.alias("new_data"),
      "old_data.id = new_data.id"
      )
  .whenMatchedUpdate(set = { "id": col("new_data.id") })
  .whenNotMatchedInsert(values = { "id": col("new_data.id") })
  .execute()
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)
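
To see what the merge should produce, the upsert can be modelled with Python sets. This is an illustration under the assumption that the table holds 5, 7 and 9 before the merge: ids present on both sides are updated (to the same value here), and ids found only in `new_data` are inserted.

```python
old_ids = {5, 7, 9}          # assumed table contents before the merge
new_ids = set(range(0, 20))  # same values as spark.range(0, 20)

matched = old_ids & new_ids   # rows handled by whenMatchedUpdate
inserted = new_ids - old_ids  # rows handled by whenNotMatchedInsert

# Rows of the old table are kept (matched ones are rewritten with the same id),
# and the unmatched new rows are added.
merged = sorted(old_ids | inserted)
print(len(matched), len(inserted))  # 3 17
print(merged == list(range(20)))    # True
```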

## Time travel feature of Delta Lake

### Display the entire history of the above Delta Lake table

In [None]:
# get the full history of the table
delta_table_history = (DeltaTable
                        .forPath(spark, "/tmp/delta-table")
                        .history()
                      )

(delta_table_history
   .select("version", "timestamp", "operation", "operationParameters", "operationMetrics", "engineInfo")
   .show()
)

### Latest version of the Delta Lake table

In [None]:
df = (spark
        .read
        .format("delta")
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

### Time travel to version `0` of the Delta Lake table using Delta Lake's history feature

In [None]:
df = (spark
        .read
        .format("delta")
        .option("versionAsOf", 0) # we pass an option `versionAsOf` with the required version number we are interested in
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

### Time travel to version `3` of the Delta Lake table using Delta Lake's history feature

In [None]:
df = (spark
        .read
        .format("delta")
        .option("versionAsOf", 3) # we pass an option `versionAsOf` with the required version number we are interested in
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()
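
Which data each version number returns depends on the commits made so far. The sketch below models the table as a list of snapshots, one per commit, assuming `/tmp/delta-table` did not exist before the overwrite step and that the streaming cells have not run yet. The operation labels are what `history()` would be expected to report for these steps; they are an assumption here, not recorded output.

```python
# One entry per commit; the list index plays the role of the version number.
snapshots = [
    ("WRITE", [5, 6, 7, 8, 9]),       # version 0: overwrite with range(5, 10)
    ("UPDATE", [5, 7, 9, 106, 108]),  # version 1: even ids + 100
    ("DELETE", [5, 7, 9]),            # version 2: even ids removed
    ("MERGE", list(range(20))),       # version 3: upsert of range(0, 20)
]


def read_version(version):
    """Return the ids visible at the given version (a stand-in for versionAsOf)."""
    return snapshots[version][1]


print(read_version(0))       # [5, 6, 7, 8, 9]
print(len(read_version(3)))  # 20
```

Under these assumptions version `3` is also the latest version, so the two reads should show the same rows.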

## A little bit of Streaming

In [None]:
streaming_df = (spark
                 .readStream
                 .format("rate")
                 .load()
               )

stream = (streaming_df
            .selectExpr("value as id")
            .writeStream
            .format("delta")
            .option("checkpointLocation", "/tmp/checkpoint")
            .start("/tmp/delta-table")
          )

In [None]:
# To view the results of this step, follow your container logs after execution: docker logs --follow <first 4 characters of container id>

stream2 = (spark
            .readStream
            .format("delta")
            .load("/tmp/delta-table")
            .writeStream
            .format("console")
            .start()
          )

