# **Delta Lake**: Stream Processing with Spark and Delta Lake

In [None]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [None]:
%%sh

# Abort the cell on the first failing command so a failed download is not
# silently ignored by the cells that follow.
set -e

mkdir -p /opt/workspace/data/loans/
# -f: exit non-zero on an HTTP error instead of saving the error page as the
#     .parquet file; -L: follow redirects to the actual file location.
curl -fL -o /opt/workspace/data/loans/SAISEU19-loan-risks.snappy.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("pyspark-delta-lake-notebook")
    .master("spark://spark-master:7077")
    # Delta Lake package plus the SQL extension and catalog classes it provides.
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

In [None]:
# Read from the same absolute path the download cell writes to, so the load
# does not depend on the notebook's current working directory.
loans_data = spark.read.format("parquet").load("/opt/workspace/data/loans/SAISEU19-loan-risks.snappy.parquet")

In [None]:
# Number of rows loaded from the Parquet file.
loans_data.count()

In [None]:
# Preview the first five rows.
loans_data.show(n=5)

In [None]:
# Register the DataFrame as the temp view `loans_parquet` so it can be queried with SQL.
loans_data.createOrReplaceTempView("loans_parquet")

In [None]:
# Query the Parquet-backed view through Spark SQL and print the result.
spark.sql("select * from loans_parquet").show()

In [None]:
# Row count of the Parquet-backed view, computed via SQL.
spark.sql("select count(*) from loans_parquet").show()

In [None]:
# Location of the Delta table; reused by the read and streaming cells below.
# NOTE(review): relative path — presumably resolved against the notebook's
# working directory; confirm it matches where the source data lives.
delta_path = "data/loans/loans_delta"

# Write the loans data out in Delta format, replacing whatever is at delta_path.
(
    loans_data.write
    .format("delta")
    .mode("overwrite")
    .save(delta_path)
)

In [None]:
# Load the Delta table and register it as the temp view `loans_delta`.
(
    spark.read
    .format("delta")
    .load(delta_path)
    .createOrReplaceTempView("loans_delta")
)

In [None]:
# Row count of the Delta-backed view, for comparison with the Parquet count.
spark.sql("select count(*) from loans_delta").show()

In [None]:
# Show rows of the Delta-backed view, highest loan_id first.
spark.sql("select * from loans_delta order by loan_id desc").show()

In [None]:
import random

def random_checkpoint_dir():
    """Return a fresh, unique checkpoint directory path for a streaming query.

    The path is always located under ``/tmp/delta_demo/chkpt/``. A UUID is
    used for the final component instead of a small random integer, so two
    streams started in the same session cannot be handed the same checkpoint
    location by chance.

    Returns:
        str: ``/tmp/delta_demo/chkpt/<32 hex characters>``.
    """
    # Local import keeps this cell self-contained in the notebook.
    import uuid

    return "/tmp/delta_demo/chkpt/%s" % uuid.uuid4().hex

def random_state(states=("CA", "TX", "NY", "IA")):
    """Return one entry picked at random from ``states``, as a string.

    Args:
        states: Non-empty sequence of candidate values. The default is an
            immutable tuple so the default cannot be altered between calls.

    Returns:
        str: The chosen element, converted with ``str``.

    Raises:
        IndexError: If ``states`` is empty (raised by ``random.choice``).
    """
    return str(random.choice(states))

def generate_and_append_data_stream_fixed(table_format, table_path):
    """Start a streaming query that appends synthetic loan rows to a table.

    Rows come from Spark's ``rate`` source at 50 rows per second and are
    shaped into the columns ``loan_id``, ``funded_amnt``, ``paid_amnt`` and
    ``addr_state`` before being written out every 10 seconds.

    Args:
        table_format: Sink format passed to ``writeStream.format``.
        table_path: Destination path passed to ``start``.

    Returns:
        The streaming query object returned by ``start``.
    """
    rate_source = (
        spark.readStream
        .format("rate")
        .option("rowsPerSecond", 50)
        .load()
    )

    synthetic_loans = (
        rate_source
        # Offset the rate source's counter column to form the loan id.
        .withColumn("loan_id", 10000 + col("value"))
        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer"))
        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000))
        # random_state() runs once here, so every row of this stream carries
        # the same literal state value.
        .withColumn("addr_state", lit(random_state()))
        .select("loan_id", "funded_amnt", "paid_amnt", "addr_state")
    )

    sink = (
        synthetic_loans.writeStream
        .format(table_format)
        .option("checkpointLocation", random_checkpoint_dir())
        .trigger(processingTime="10 seconds")
    )
    return sink.start(table_path)

In [None]:
# Start two independent streaming writers that append to the same Delta table.
stream_query_1 = generate_and_append_data_stream_fixed(table_format="delta", table_path=delta_path)
stream_query_2 = generate_and_append_data_stream_fixed(table_format="delta", table_path=delta_path)

In [None]:
# List the streaming queries currently active in this session.
spark.streams.active

In [None]:
# Re-run the count on the Delta-backed view after the streams were started.
# NOTE(review): presumably this cell is re-executed to watch the count grow — confirm.
spark.sql("select count(*) from loans_delta").show()

In [None]:
import shutil

def stop_all_streams():
    """Stop every active streaming query, then delete the checkpoint tree.

    Progress messages are printed before and after each of the two phases.
    """
    print("Stopping all streams...")
    for active_query in spark.streams.active:
        active_query.stop()
    print("Stopped all streams.")

    print("Deleting checkpoints...")
    # ignore_errors=True: a missing or partially removed directory is not an error here.
    shutil.rmtree("/tmp/delta_demo/chkpt/", ignore_errors=True)
    print("Deleted checkpoints.")

# Stop the running streams and remove their checkpoint data.
stop_all_streams()

In [None]:
# Stop the Spark session now that the demo is finished.
spark.stop()