In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from delta.tables import DeltaTable
from delta import configure_spark_with_delta_pip
import shutil

# https://docs.delta.io/latest/index.html

# Delta Lake essentially sits on top of Apache Spark and helps you manage your data lakes.
# It stores data as versioned, immutable Parquet files in cloud storage (e.g. S3) and keeps a transaction log.
# You have to specify a storage location. Data can be copied from one location to another, and you can stream directly to a Delta table.
# The code is portable across other Spark platforms.

# I don't think this is worth exploring in any further depth unless a role revolves around using Spark almost exclusively.

# Start from a clean slate: remove whatever an earlier run left at the table
# path (best-effort; a missing directory is not an error).
shutil.rmtree("/tmp/delta-table", ignore_errors=True)

# Session builder with the Delta SQL extension and the Delta catalog registered.
builder = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Let the delta helper finish configuring the builder, then create (or reuse)
# the session that every later cell refers to as `spark`.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

22/04/04 13:49:37 WARN Utils: Your hostname, jacob-BigOtisLinux resolves to a loopback address: 127.0.1.1; using 192.168.50.142 instead (on interface enp6s0)
22/04/04 13:49:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/jacob/.ivy2/cache
The jars for the packages stored in: /home/jacob/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-742f1e45-388b-45ef-835f-dccb5afae528;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.1.0 in central


:: loading settings :: url = jar:file:/home/jacob/.local/share/virtualenvs/pyspark_prac-QhYfwHaC/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 90ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-core_2.12;1.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-742f1e45-388b-45ef-835f-dccb5afae528
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/4ms)
22/04/04 13:49:37 WARN NativeCodeLoader: 

In [2]:
print("############# Creating a table ###############")
# Seed rows: a single `id` column holding 0..4, saved in delta format
# at the table path.
data = spark.range(0, 5)
(
    data.write
    .format("delta")
    .save("/tmp/delta-table")
)



############# Creating a table ###############


In [3]:
print("############ Reading the table ###############")
# Load the table from its path as a DataFrame and print the rows.
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-table")
)
df.show()

############ Reading the table ###############
+---+
| id|
+---+
|  3|
|  0|
|  1|
|  4|
|  2|
+---+



In [4]:
print("########### Upsert new data #############")
# Incoming batch for the merge: ids 0..19.
newData = spark.range(start=0, end=20)

########### Upsert new data #############


In [5]:
# Handle on the table stored at the path; the merge/update/delete calls in the
# following cells go through this object.
deltaTable = DeltaTable.forPath(sparkSession=spark, path="/tmp/delta-table")

In [6]:
# Upsert `newData` into the table, matching rows on `id`:
#   - matched rows have `id` set from the incoming row,
#   - unmatched incoming rows are inserted.
(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)


In [7]:
# View the table handle as a DataFrame and print it to check the merge result.
merged_view = deltaTable.toDF()
merged_view.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [8]:
print("########## Overwrite the table ###########")
# Replace the table contents wholesale with ids 5..9.
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

# Re-bind the handle *before* displaying, so the rows shown (and every later
# cell) go through a handle created after the overwrite rather than the one
# bound before it.
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
deltaTable.toDF().show()

########## Overwrite the table ###########
+---+
| id|
+---+
|  8|
|  9|
|  6|
|  7|
|  5|
+---+



In [9]:
print("########### Update to the table(add 100 to every even value) ##############")
# Both the row filter and the new column value are SQL expression strings
# wrapped with `expr`.
even_ids = expr("id % 2 == 0")
plus_hundred = {"id": expr("id + 100")}
deltaTable.update(condition=even_ids, set=plus_hundred)

deltaTable.toDF().show()


########### Update to the table(add 100 to every even value) ##############
+---+
| id|
+---+
|  9|
|106|
|108|
|  7|
|  5|
+---+



In [10]:
print("######### Delete every even value ##############")
# Remove every row whose id is even, then print what is left.
deltaTable.delete(condition=expr("id % 2 == 0"))
deltaTable.toDF().show()

# Time travel: request table version 0 from the reader instead of the
# latest state of the table.
print("######## Read old data using time travel ############")
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("/tmp/delta-table")
)
df.show()


######### Delete every even value ##############
+---+
| id|
+---+
|  9|
|  7|
|  5|
+---+

######## Read old data using time travel ############
+---+
| id|
+---+
|  3|
|  0|
|  1|
|  4|
|  2|
+---+



In [11]:
# Cleanup: remove the table directory. Best-effort (same as the removal at the
# top of the notebook) so re-running this cell, or running it when the
# directory is already gone, does not raise FileNotFoundError.
shutil.rmtree("/tmp/delta-table", ignore_errors=True)