In [None]:
import pyspark
from delta import *
from delta.tables import *
from pyspark.sql.functions import *

In [None]:
# Session builder with the Delta Lake SQL extension and catalog registered;
# the session itself is created in the next cell.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

In [None]:
# Create (or reuse) the Spark session from the Delta-enabled builder above.
# Starting Spark can take a little while: wait until this cell has finished
# and printed its output before running the next one.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
# Load the Delta table stored under ./out/delta-table.
# If no Delta table exists there yet (e.g. a fresh checkout), seed one with the
# ids 0..4 first, so the notebook runs top to bottom without a separate setup step.
# An existing table is loaded as-is.
DELTA_TABLE_PATH = "./out/delta-table"

if not DeltaTable.isDeltaTable(spark, DELTA_TABLE_PATH):
    spark.range(0, 5).write.format("delta").save(DELTA_TABLE_PATH)

deltaTable = DeltaTable.forPath(spark, DELTA_TABLE_PATH)
# Loading can take a moment; wait for the cell (and any progress bar) to finish.

In [None]:
# Display the DeltaTable object itself (its repr), not the rows it contains.
deltaTable

In [23]:
# Print the table's schema as a tree.
(
    deltaTable
    .toDF()
    .printSchema()
)

root
 |-- id: long (nullable = true)



In [None]:
# Show the table's current contents (first 20 rows).
deltaTable.toDF().show(n=20)

In [None]:
# Add 100 to every even id; odd ids are left untouched.
deltaTable.update(
    condition="id % 2 == 0",
    set={"id": "id + 100"},
)

In [None]:
# Contents after the update: even ids should now be 100 larger.
deltaTable.toDF().show(n=20)

At this point, open the Spark UI at http://localhost:4040 to observe the Spark jobs and workloads (if port 4040 is already taken, Spark uses the next free port, e.g. 4041).

In [None]:
# Remove every row whose id is even.
deltaTable.delete("id % 2 == 0")

In [None]:
# Contents after the delete: only odd ids should remain.
deltaTable.toDF().show(n=20)

In [None]:
# Upsert (merge) the ids 0..19 into the table: rows whose id already exists
# are updated from the source, every other id is inserted as a new row.
newData = spark.range(0, 20)

(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": "newData.id"})
    .whenNotMatchedInsert(values={"id": "newData.id"})
    .execute()
)

In [None]:
# Contents after the merge.
deltaTable.toDF().show(n=20)

In [None]:
# Time travel: read the table as it was at version 0 (its first commit),
# regardless of the changes committed since.
df = (
    spark.read
    .option("versionAsOf", 0)
    .format("delta")
    .load("./out/delta-table")
)
df.show()

In [None]:
# Stream rows into the Delta table. The built-in "rate" source generates rows
# continuously; its `value` column is renamed to `id` to match the table.
streamingDf = spark.readStream.format("rate").load()

stream = (
    streamingDf.selectExpr("value as id")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "./out/checkpoint")
    .start("./out/delta-table")
)

# The query keeps running in the background until it is stopped: give it
# a few seconds (up to a minute) to write some data before moving on.

In [None]:
# Contents while the stream is running: rows written by the stream should appear here.
deltaTable.toDF().show(n=20)

In [None]:
# Now let's stop the streaming query, so the table stops changing.
stream.stop()

In [None]:
# Total number of rows currently in the table.
(
    deltaTable
    .toDF()
    .count()
)

In [None]:
# Schema after streaming: rate `value` was written as `id`, so the schema is unchanged.
(
    deltaTable
    .toDF()
    .printSchema()
)

In [None]:
# First 5 rows as a list of Row objects. Without an orderBy, which rows
# come back is not guaranteed.
deltaTable.toDF().take(5)

In [None]:
# Last 5 rows as a list of Row objects; tail() brings these rows to the driver.
deltaTable.toDF().tail(num=5)

In [None]:
# Largest ids first. The table's size depends on how long the stream above
# was left running, so fetch only the top TOP_N rows instead of .collect()-ing
# every row onto the driver.
TOP_N = 20
deltaTable.toDF().orderBy("id", ascending=False).take(TOP_N)

In [19]:
# Working with Spark SQL: register the Delta table's DataFrame as the temp view
# "tableA" so the following cells can query it with SQL.
# https://spark.apache.org/docs/3.1.1/api/python/getting_started/quickstart.html#Working-with-SQL
df = deltaTable.toDF()
df.createOrReplaceTempView(name="tableA")

In [20]:
# Row count of the temp view, via SQL.
spark.sql(sqlQuery="SELECT count(*) FROM tableA").show()

+--------+
|count(1)|
+--------+
|     114|
+--------+



In [21]:
# Column names and types of the temp view.
spark.sql(sqlQuery="DESCRIBE tableA").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|   bigint|   null|
+--------+---------+-------+



In [22]:
# All rows sorted by id, largest first (show() prints the first 20).
spark.sql(sqlQuery="SELECT * FROM tableA ORDER BY id DESC").show(n=20)

+---+
| id|
+---+
| 93|
| 92|
| 91|
| 90|
| 89|
| 88|
| 87|
| 86|
| 85|
| 84|
| 83|
| 82|
| 81|
| 80|
| 79|
| 78|
| 77|
| 76|
| 75|
| 74|
+---+
only showing top 20 rows

