In [0]:
import random
from pyspark.sql.functions import *
from pyspark.sql.types import *

delta_table_path = "/mnt/filesystem/stream_batch_unification_demo/loans_delta/"

def random_checkpoint_dir(root="/mnt/filesystem/stream_batch_unification_demo/chkpt"):
  """Return a fresh, unique checkpoint directory path under `root`.

  Spark requires each streaming query to have its own checkpoint location, and
  this helper is called once per started stream. A UUID suffix avoids the
  collisions that a small random-integer range allows.

  Args:
    root: parent directory for checkpoints (a trailing "/" is tolerated).

  Returns:
    A path string of the form "<root>/<32 hex chars>".
  """
  import uuid  # local import so the notebook's top import cell stays unchanged
  return "%s/%s" % (root.rstrip("/"), uuid.uuid4().hex)

# Generate a stream of randomly generated loan data and append it to the table at `table_path`
def generate_and_append_data_stream(table_format, table_path, rows_per_second=50, trigger_interval="10 seconds"):
  """Start a streaming query that appends synthetic loan rows to a table.

  Rows come from Spark's built-in "rate" source. Each row gets a `loan_id`
  (10000 + the source's `value` counter) and a random integer `funded_amnt`
  in [5000, 10000).

  Args:
    table_format: sink format passed to `writeStream.format` (e.g. "delta").
    table_path: destination path the query writes to.
    rows_per_second: rate-source throughput (default 50).
    trigger_interval: processing-time trigger for micro-batches (default "10 seconds").

  Returns:
    The started StreamingQuery.
  """
  stream_data = spark.readStream.format("rate").option("rowsPerSecond", rows_per_second).load() \
    .withColumn("loan_id", 10000 + col("value")) \
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
    .select("loan_id", "funded_amnt")

  # Every query gets its own checkpoint directory; queries must not share one.
  checkpoint_location = random_checkpoint_dir()

  query = stream_data.writeStream \
    .format(table_format) \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(processingTime=trigger_interval) \
    .start(table_path)  # previously ignored this argument and wrote to the global path

  return query


# Tear down the demo: halt every running streaming query, then wipe the checkpoints.
def stop_all_streams():
  """Stop all active streaming queries and recursively delete the checkpoint root.

  Progress messages are printed before and after each phase.
  """
  print("Stopping all streams")
  for active_query in spark.streams.active:
    active_query.stop()
  print("Stopped all streams")

  checkpoint_root = "/mnt/filesystem/stream_batch_unification_demo/chkpt/"
  print("Deleting checkpoints")
  dbutils.fs.rm(checkpoint_root, True)  # True -> recursive delete
  print("Deleted checkpoints")


In [0]:
# First writer: continuously append synthetic loan rows to the Delta table.
stream_query_1 = generate_and_append_data_stream("delta", delta_table_path)

In [0]:
# Second writer: a concurrent stream appending to the same Delta table path.
stream_query_2 = generate_and_append_data_stream("delta", delta_table_path)

In [0]:
%sql
-- Batch read of the Delta table the streaming cells above write to
-- (assumes both stream queries are still running at this point).
SELECT * FROM delta.`/mnt/filesystem/stream_batch_unification_demo/loans_delta/`

In [0]:
%sql
-- Row count of the table; re-running this while the streams are active should show it increasing.
SELECT count(*) FROM delta.`/mnt/filesystem/stream_batch_unification_demo/loans_delta/`

In [0]:
%sh

# Count the entries in the table directory (data files plus the _delta_log folder);
# this is expected to grow as the streams commit more micro-batches.
ls /dbfs/mnt/filesystem/stream_batch_unification_demo/loans_delta/ | wc -l

In [0]:
%sql
-- Delta Lake OPTIMIZE: compact the small files written by the streaming appends into larger ones.
OPTIMIZE delta.`/mnt/filesystem/stream_batch_unification_demo/loans_delta/`

In [0]:
# Stop every active streaming query (both writers above) and delete the checkpoint directory.
stop_all_streams()

In [0]:
%fs

rm -r /mnt/filesystem/stream_batch_unification_demo/loans_delta/