In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

# Root of the ADLS Gen2 container; every streaming location below lives under it.
adls_root = 'abfss://<adls-container>@<adls-account>.dfs.core.windows.net'

# Folder Auto Loader watches for newly uploaded CSV files.
upload_path = f'{adls_root}/iowa_population/'
# Checkpoint location for the Delta-writing streaming query.
checkpoint_path = f'{adls_root}/iowa_population_checkpoint/'
# Destination of the streaming Delta output.
write_path = f'{adls_root}/iowa_population_stream_write/'

# Expected columns of the incoming CSV files (both nullable).
iowa_population_schema = StructType([
    StructField("County", StringType(), True),
    StructField("Population", IntegerType(), True),
])

In [None]:
# Incrementally ingest new CSV files from `upload_path` with Databricks Auto Loader
# (`cloudFiles` source), using the explicit schema instead of inference.
# Two processing-time columns are appended (they are not read from the files):
#   event_timestamp -- F.current_timestamp() when the micro-batch runs
#   event_date      -- F.current_date(); the Delta writer partitions on it
iowa_population_stream = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'csv')
    .option('header', 'true')
    # Auto Loader's per-trigger file limit is the `cloudFiles.`-prefixed option;
    # the plain file-source key `maxFilesPerTrigger` is not the documented Auto Loader setting.
    .option('cloudFiles.maxFilesPerTrigger', 10)
    .schema(iowa_population_schema)
    .load(upload_path)
    .withColumn("event_timestamp", F.current_timestamp())
    .withColumn("event_date", F.current_date())
)

In [None]:
# Static lookup table joined onto the stream: County -> Country and an integer `Group`.
# NOTE(review): the meaning of `Group` is not documented here -- confirm with the data owner.
# County must be unique: the stream is left-joined on County, so a repeated key
# would emit every matching stream row once per duplicate.
df_data = [
    ("Adair County, Iowa", "USA", 5),
    ("Appanoose County, Iowa", "USA", 6),
    ("Audubon County, Iowa", "USA", 6),
    ("Benton County, Iowa", "USA", 5),
    ("Black Hawk County, Iowa", "USA", 7),
    ("Boone County, Iowa", "USA", 7),
    ("Clarke County, Iowa", "USA", 7),
    ("Clay County, Iowa", "USA", 6),
    ("Clayton County, Iowa", "USA", 6),
    ("Clinton County, Iowa", "USA", 5),
    ("Crawford County, Iowa", "USA", 5),
]
assert len({county for county, _, _ in df_data}) == len(df_data), "duplicate County keys in df_data"

df_schema = StructType([
    StructField("County", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Group", IntegerType(), True),
])

df_country = spark.createDataFrame(data=df_data, schema=df_schema)

In [None]:
# Enrich each streamed county row with Country/Group from the static lookup and
# continuously append the result to the Delta location `write_path`, partitioned by event_date.
# The StreamingQuery handle is kept so the query can be monitored or stopped later
# (e.g. population_join_query.status, population_join_query.stop()).
population_join_query = (
    iowa_population_stream
    # Joining on the column name keeps a single `County` column (the stream's),
    # equivalent to the explicit equality condition followed by dropping the lookup's County.
    .join(df_country, on="County", how="left")
    .writeStream
    .format('delta')
    .option('checkpointLocation', checkpoint_path)
    .outputMode("append")
    .partitionBy("event_date")
    .start(write_path)
)

In [None]:
# Batch-read the Delta output of the streaming join and show it, highest Group first.
# The read uses the configured `write_path` rather than a hard-coded storage URL.
# A hard-coded URL can point at a different account than the stream writes to,
# and it exposes real account/container names.
# NOTE(review): if the stream has not committed its first micro-batch yet, the location
# is likely not a Delta table yet and this read will fail -- re-run once the query has progressed.
delta_location_data = spark.read.format("delta").load(write_path)
delta_loaction_data = delta_location_data  # alias kept for the original (misspelled) name
display(delta_location_data.orderBy(F.col("Group").desc()))

In [None]:
# Total Population per event_date and per 10-second window over the processing-time
# column event_timestamp, published to the in-memory table `iowa_population_query_memory`.
# Complete output mode rewrites the whole aggregate table on every trigger.
# NOTE(review): complete mode retains state for every window seen, so memory grows
# with each new window -- acceptable for a demo, not for a long-running job.
event_window = (
    iowa_population_stream
    .groupBy(F.col("event_date"), F.window("event_timestamp", "10 seconds"))
    # alias() names the aggregate directly instead of renaming the generated "sum(Population)".
    .agg(F.sum("Population").alias("sum_population"))
)

# Keep the handle so the memory-sink query can be stopped (population_window_query.stop()).
population_window_query = (
    event_window.writeStream
    .format("memory")
    .queryName("iowa_population_query_memory")
    .outputMode("complete")
    .start()
)

In [None]:
%sql

-- Windowed population totals from the in-memory streaming sink.
-- ORDER BY makes the windows display chronologically; the memory table has no row order.
SELECT
  event_date,
  window.start,
  window.end,
  sum_population
FROM iowa_population_query_memory
ORDER BY event_date, window.start