In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from delta.tables import DeltaTable
import shutil

In [0]:
# Start from a clean slate: remove any table directory left over from a
# previous run. Errors (e.g. the directory not existing yet) are ignored.
shutil.rmtree(path="/tmp/delta-table", ignore_errors=True)

In [0]:
import pyspark
from delta import *
# Build a Spark session with Delta Lake's SQL extension and catalog enabled.
#
# "spark.jars.packages" is deliberately NOT set on the builder:
# configure_spark_with_delta_pip() sets that key itself, using the Delta
# artifact that matches the pip-installed delta-spark version, so a
# hard-coded version here would be silently overwritten and only mislead
# the reader about which Delta release is actually loaded.
builder = (
    SparkSession.builder.appName("DeltaLakeApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [0]:
# Create the Delta table at /tmp/delta-table from a one-column (`id`)
# DataFrame holding the values 0..4.
data = spark.range(5)
(
    data.write
    .format("delta")
    .save("/tmp/delta-table")
)

In [0]:
# Load the Delta table back from its path and print its rows.
df = spark.read.load("/tmp/delta-table", format="delta")
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
# Upsert ids 0..19 into the table, matching rows on `id`:
#   - ids already in the table are updated (to the same id value),
#   - ids not yet in the table are inserted.
newData = spark.range(20)

# Handle to the existing table; it is used for the merge here and for the
# update/delete/show calls in the cells that follow.
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

deltaTable.toDF().show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  7|
|  8|
|  9|
| 12|
| 13|
| 14|
| 17|
| 18|
| 19|
|  0|
|  1|
|  5|
|  6|
| 10|
| 11|
| 15|
| 16|
+---+



In [0]:
# Replace the whole table content with ids 5..9 (mode "overwrite"), then
# show the table through the existing DeltaTable handle.
data = spark.range(5, 10)
(
    data.write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/delta-table")
)
deltaTable.toDF().show()

+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [0]:
# Add 100 to every even id; rows with odd ids are left unchanged.
even_id = expr("id % 2 == 0")
shifted_id = {"id": expr("id + 100")}
deltaTable.update(condition=even_id, set=shifted_id)

deltaTable.toDF().show()

+---+
| id|
+---+
|  5|
|  7|
|  9|
|106|
|108|
+---+



In [0]:
# Delete every row whose id is even, then show what is left.
even_id = expr("id % 2 == 0")
deltaTable.delete(condition=even_id)
deltaTable.toDF().show()

+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+



In [0]:
# Time travel: read the table as it was at version 0 (the first write),
# regardless of the merge/overwrite/update/delete performed since.
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("/tmp/delta-table")
)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
# Reference - https://github.com/deepavasanthkumar/spark_delta_lake/blob/main/delta_quickstart.ipynb