In [1]:
# Install Additional Python Libraries
!pip install -r requirements.txt

In [2]:
# Spark needs extra connector packages, including the PostgreSQL JDBC driver:
# https://mvnrepository.com/artifact/org.postgresql/postgresql
from spark_libs import spark_submit
# Maven coordinates (group:artifact:version) handed to spark_submit, one per line.
packages = [
    "com.databricks:spark-csv_2.11:1.5.0",
    "org.postgresql:postgresql:42.2.9",
    "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1",
    "io.delta:delta-core_2.11:0.4.0",
]
spark_submit(packages=packages)

Adding environment variable `PYSPARK_SUBMIT_ARGS`
--packages com.databricks:spark-csv_2.11:1.5.0,org.postgresql:postgresql:42.2.9,org.mongodb.spark:mongo-spark-connector_2.11:2.4.1,io.delta:delta-core_2.11:0.4.0 pyspark-shell


In [4]:
from pyspark.sql import SparkSession

In [5]:
# Reuse the active Spark session if one exists, otherwise start a new one
# under the application name below.

app_name = "spark-structured-streaming"
session_builder = SparkSession.builder.appName(app_name)
spark = session_builder.getOrCreate()

In [6]:
from pyspark import SparkFiles
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from delta.tables import *
import random
import shutil

In [7]:
# Clear the delta tables left over from a previous run so the notebook can be re-run.

files = ["delta/delta-table1", "delta/delta-table2"]
for table_path in files:
    try:
        shutil.rmtree(table_path)
    except FileNotFoundError:
        # Nothing to clear: first run, or the table was already removed.
        pass
    except OSError as err:
        # Stay best-effort, but report the failure instead of hiding it; a table
        # that could not be removed will still be on disk for the cells below.
        print("Could not remove {}: {}".format(table_path, err))

In [None]:
# Seed the first Delta table with eight rows (ids 0-7).
# The random offset is drawn once, so every row gets value = id + the same offset.
base_ids = spark.range(8)
value_offset = random.randint(0, 5000)
data = base_ids.withColumn("value", base_ids.id + value_offset)
data.write.format("delta").save(files[0])


In [None]:
# Stream writes to the table

print("####### Streaming write ######")
# The "rate" source supplies the `value` column that becomes `id` in the table.
streamingDf = spark.readStream.format("rate").load()
stream = (
    streamingDf.selectExpr("value as id")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start(files[1])
)
# Let the query run for at most 10 seconds, then shut it down.
stream.awaitTermination(10)
stream.stop()

In [None]:
# Stream reads from a table

print("##### Reading from stream ######")
# Read the second Delta table as a stream and echo it to the console sink.
stream2 = (
    spark.readStream.format("delta").load(files[1])
    .writeStream
    .format("console")
    .start()
)
# Let the query run for at most 10 seconds, then shut it down.
stream2.awaitTermination(10)
stream2.stop()

In [None]:
# Streaming aggregates in Update mode

# Section banner for the cell output.
print("####### Streaming aggregates in update mode ########")


# Function to upsert microBatchOutputDF into Delta Lake table using merge
def upsertToDelta(microBatchOutputDF, batchId):
    """Merge one streaming micro-batch into the Delta table on the ``id`` column.

    Batch rows whose ``id`` matches an existing table row update that row;
    batch rows with no match are inserted.

    Relies on the module-level ``deltaTable``, which is looked up at call time
    and therefore must be assigned before the streaming query is started.

    Args:
        microBatchOutputDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch; not used here.
    """
    # "t" is the target table, "s" the incoming batch, as named in the join condition.
    (
        deltaTable.alias("t")
        .merge(microBatchOutputDF.alias("s"), "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


# Rate-source stream keyed by id = value % 10, with the timestamp column dropped.
streamingAggregatesDF = (
    spark.readStream.format("rate").load()
    .withColumn("id", F.col("value") % 10)
    .drop("timestamp")
)

In [None]:
# Write the output of a streaming aggregation query into Delta Lake table

deltaTable = DeltaTable.forPath(spark, files[0])
print("#############  Original Delta Table ###############")
deltaTable.toDF().show()
# foreachBatch is itself the sink: it overrides any previously set format, so
# no .format(...) is given here. The Delta write happens inside upsertToDelta.
stream3 = (
    streamingAggregatesDF.writeStream
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .start()
)
# Let the query run for at most 10 seconds, then shut it down.
stream3.awaitTermination(10)
stream3.stop()
print("########### DeltaTable after streaming upsert #########")
deltaTable.toDF().show()