# Unifying Structured Streaming with Batch Jobs using Delta Lake

Explore combining streaming and batch processing with a single pipeline.

- ingest streaming JSON data from Azure Blob Storage and write it to the Delta Lake table `/activity/Bronze`
- ingest dimension table data from storage and create a static DataFrame
- perform a stream-static join on the streamed data to add geographic data
- transform and load the data, saving it to the Delta Lake table `/activity/Silver`
- summarize the data through aggregation into the Delta Lake table `/activity/Gold/groupedCount`
- query the Gold table with static SQL queries and build a streaming temporary view over the Silver table

## 1. Configure and set up relevant Delta Lake paths

These paths will serve as the file locations for our Delta Lake tables.

- Each streaming write has its own checkpoint directory.
- Delta tables cannot be nested: you cannot write new Delta files into a directory that already contains a Delta table. The hierarchy here isolates each Delta table in its own directory.

In [0]:
username = "cbdesh2013@gmail.com"
userhome = "dbfs:/user/" + username

# Set the user's name and home directory
spark.conf.set("com.databricks.training.username", username)
spark.conf.set("com.databricks.training.userhome", userhome)

In [0]:
activityPath = userhome + "/activity"

activityBronzePath = activityPath + "/Bronze"
activityBronzeCheckpoint = activityBronzePath + "/checkpoint"

activitySilverPath = activityPath + "/Silver"
activitySilverCheckpoint = activitySilverPath + "/checkpoint"

activityGoldPath = activityPath + "/Gold"
groupedCountPath = activityGoldPath + "/groupedCount"
groupedCountCheckpoint = groupedCountPath + "/checkpoint"
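The no-nesting rule can be checked on the path strings before any stream starts. The sketch below is a hypothetical helper (`find_nested_tables` is not a Databricks or Delta Lake API) that only compares strings and never touches storage. It treats only the table paths as Delta tables, because the checkpoint directories hold streaming metadata rather than Delta files. The home directory is a placeholder.

```python
import posixpath
from itertools import permutations


def find_nested_tables(table_paths):
    """Return (outer, inner) pairs where one Delta table path lies inside another.

    Hypothetical helper: it compares normalized path strings only.
    """
    normalized = [posixpath.normpath(p) for p in table_paths]
    nested = []
    for outer, inner in permutations(normalized, 2):
        # "inner" is nested if it starts with "outer/" (the slash avoids matching /a/tab vs /a/t).
        if inner.startswith(outer + "/"):
            nested.append((outer, inner))
    return nested


# Placeholder layout mirroring the notebook's table paths.
example_activity = "dbfs:/user/someone@example.com/activity"
example_tables = [
    example_activity + "/Bronze",
    example_activity + "/Silver",
    example_activity + "/Gold/groupedCount",
]
print(find_nested_tables(example_tables))  # []
```

An empty list means every table sits in its own directory; `Gold` itself is only a parent folder, not a Delta table, so `Gold/groupedCount` does not violate the rule.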

## 2. Reset Pipeline

In [0]:
dbutils.fs.rm(activityPath, True)

## 3. Configure Data Source

In [0]:
storeName = "functionsb8e5"
container = "data"
path = f"wasbs://{container}@{storeName}.blob.core.windows.net/"
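# Configure the key for the same storage account used in `path`, read from a Databricks secret scope
# instead of hard-coding it. The scope and key names below are placeholders; substitute your own.
spark.conf.set(f"fs.azure.account.key.{storeName}.blob.core.windows.net",
               dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key>"))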

## 4. Define schema for the data set.

In [0]:
from pyspark.sql.types import StructField, StructType, LongType, StringType, DoubleType

schema = StructType([
  StructField("Arrival_Time",LongType()),
  StructField("Creation_Time",LongType()),
  StructField("Device",StringType()),
  StructField("Index",LongType()),
  StructField("Model",StringType()),
  StructField("User",StringType()),
  StructField("geolocation",StructType([
    StructField("city",StringType()),
    StructField("country",StringType())
  ])),
  StructField("gt",StringType()),
  StructField("id",LongType()),
  StructField("x",DoubleType()),
  StructField("y",DoubleType()),
  StructField("z",DoubleType())
])

## 5. Load the stream data and create a streaming DataFrame
- The format is JSON.
- File-based streaming sources require a schema by default; because we supply one, Spark does not infer it.
- The option `maxFilesPerTrigger` limits how many new files each micro-batch reads. If omitted, each micro-batch consumes all new files that have arrived since the stream last processed data.
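The effect of `maxFilesPerTrigger` can be sketched as splitting the pending files into micro-batches. This is a simplified model, not Spark's implementation: `plan_microbatches` and the file names are hypothetical, the input list is assumed to be already in arrival order, and real Spark also records processed files in the checkpoint. Note that this notebook loads a single file, so with `maxFilesPerTrigger` set to 1 that whole file forms one micro-batch.

```python
from typing import List, Optional


def plan_microbatches(new_files: List[str],
                      max_files_per_trigger: Optional[int] = None) -> List[List[str]]:
    """Split pending files into micro-batches, oldest first (simplified model)."""
    if not new_files:
        return []
    if max_files_per_trigger is None:
        # Option omitted: a single micro-batch takes every new file.
        return [list(new_files)]
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    # Option set: each micro-batch takes at most max_files_per_trigger files.
    return [new_files[i:i + max_files_per_trigger]
            for i in range(0, len(new_files), max_files_per_trigger)]


print(plan_microbatches(["part-0.json", "part-1.json", "part-2.json"], 1))
# [['part-0.json'], ['part-1.json'], ['part-2.json']]
```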

In [0]:
rawEventsDF = (spark
  .readStream
  .format("json")
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .load(path+"deltadata.json"))

## 6. The Delta Lake architecture
- The Bronze layer holds raw data exactly as it arrives from the data source, before any transformation.
- The Silver layer holds transformed data: the raw data after cleaning, filtering, augmenting, and joining.
- The Gold layer holds business-level aggregates. Data in this layer is ready to be queried for visualization and reporting.

<img alt="Delta Lake Layers" title="Delta Lake Layers" src="https://databricks.com/wp-content/uploads/2021/07/Improving-Patient-Insights-with-Textual-ETL-blog-img-1.jpg"/>

## 7. Write the streaming DataFrame to the Bronze table in Delta format
#### Output Modes
Besides the format, checkpoint location, and target path, specify `outputMode`, which can take these values:
* `append`: add only new records to the output sink
* `complete`: rewrite the full output on every trigger; applicable to aggregation operations
* `update`: write only the rows that changed since the last trigger. At present, the Delta sink does not support this mode for streaming jobs.
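The difference between `append` and `complete` can be illustrated with a toy sink in plain Python. This is a simplified model, not Spark code: `run_stream` is hypothetical, `append` here stores each batch's raw rows (as the Bronze and Silver writes do), and `complete` stores a count per key over everything seen so far (as the Gold write does).

```python
from collections import Counter


def run_stream(micro_batches, mode):
    """Return the sink contents after each trigger (simplified model).

    - "append": the sink keeps everything written before and adds the new rows.
    - "complete": the sink is overwritten with the full aggregate over all rows seen so far.
    """
    sink, state, snapshots = [], Counter(), []
    for batch in micro_batches:
        if mode == "append":
            sink = sink + list(batch)        # only new records are added
        elif mode == "complete":
            state.update(batch)              # aggregation state spans all triggers
            sink = sorted(state.items())     # the whole result table is rewritten
        else:
            raise ValueError(f"unsupported mode: {mode}")
        snapshots.append(sink)
    return snapshots


batches = [["bike", "sit"], ["bike"]]
print(run_stream(batches, "append")[-1])    # ['bike', 'sit', 'bike']
print(run_stream(batches, "complete")[-1])  # [('bike', 2), ('sit', 1)]
```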

#### Important points to note
* `checkpointLocation` is a Structured Streaming option that points to the directory where the query stores its progress and state. If the streaming job stops for any reason and you restart it, it continues from where it left off.
* Without a checkpoint directory, all state of the streaming job is lost when it stops, and on restart it starts from scratch.
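A toy model makes the restart behavior concrete. Here a plain `dict` stands in for the checkpoint directory and records which input files have been processed; `process_new_files` and the file names are hypothetical. Real checkpoints store source offsets, commit logs, and aggregation state as files, so treat this only as an illustration of the idea.

```python
def process_new_files(available_files, checkpoint):
    """Process files not yet recorded in `checkpoint`, then record them.

    `checkpoint` is a dict standing in for the checkpoint directory;
    passing a fresh dict mimics restarting without one.
    """
    done = checkpoint.setdefault("processed", set())
    new = [f for f in available_files if f not in done]
    done.update(new)
    return new


files = ["f1.json", "f2.json"]
ckpt = {}
print(process_new_files(files, ckpt))                # ['f1.json', 'f2.json']
print(process_new_files(files + ["f3.json"], ckpt))  # ['f3.json'] (resumed from the checkpoint)
print(process_new_files(files + ["f3.json"], {}))    # ['f1.json', 'f2.json', 'f3.json'] (no checkpoint)
```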

In [0]:
# The Bronze table holds completely raw data.
(rawEventsDF
  .writeStream
  .format("delta")
  .option("checkpointLocation", activityBronzeCheckpoint)
  .outputMode("append") # Append each micro-batch of raw events to the Bronze Delta table.
  .start(activityBronzePath))

## 8. Load dimension data and create a static data frame
- Create a static DataFrame from the Parquet file. Any file format supported by Spark would work.

In [0]:
# A static lookup DataFrame that maps each country name (`country`) to its three-letter country code (`countryCode3`).
from pyspark.sql.functions import col

geoForLookupDF = (spark
  .read
  .format("parquet")
  .load(path + "GeoLookup.parquet")
  .select(col("EnglishShortName").alias("country"), col("alpha3Code").alias("countryCode3")))

## 9. Create the Silver table: data enriched through transformations

* Load the Bronze table created earlier and join it with the dimension DataFrame on the common `country` column to add each country's three-letter code (`countryCode3`).
* We do not need to specify a schema when loading Delta files: it is inferred from the table metadata.
* The bronze table contains nested fields, as well as time data that has been encoded in non-standard unix time (Arrival_Time is encoded as milliseconds from epoch, while Creation_Time records nanoseconds between record creation and receipt).
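The two time encodings can be checked in plain Python, mirroring the divisions in the Spark code (`/1000` for milliseconds, `/1E9` for nanoseconds). The helper names are hypothetical. One observation, to be verified against your Spark version: `from_unixtime` formats its result to whole seconds, so the Spark `Arrival_Time` column appears to drop the millisecond part, whereas this sketch keeps it. Integer arithmetic is used here to avoid floating-point rounding.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def arrival_to_datetime(arrival_ms: int) -> datetime:
    """Arrival_Time: milliseconds since the Unix epoch."""
    return EPOCH + timedelta(milliseconds=arrival_ms)


def creation_to_datetime(creation_ns: int) -> datetime:
    """Creation_Time: nanoseconds, scaled like the notebook's /1E9 (truncated to microseconds)."""
    return EPOCH + timedelta(microseconds=creation_ns // 1_000)


print(arrival_to_datetime(1_500))            # 1970-01-01 00:00:01.500000+00:00
print(creation_to_datetime(2_000_000_000))   # 1970-01-01 00:00:02+00:00
```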

In [0]:
# Perform transformations on the raw data.
from pyspark.sql.functions import col, from_unixtime

parsedEventsDF = (spark.readStream
  .format("delta")
  .load(activityBronzePath)
  # Alias after the cast so the resulting columns are named Arrival_Time and Creation_Time.
  .select(from_unixtime(col("Arrival_Time")/1000).cast("timestamp").alias("Arrival_Time"),  # milliseconds -> timestamp
          (col("Creation_Time")/1E9).cast("timestamp").alias("Creation_Time"),              # nanoseconds -> timestamp
          col("Device"),
          col("Index"),
          col("Model"),
          col("User"),
          col("gt"),
          col("x"),
          col("y"),
          col("z"),
          col("geolocation.country").alias("country"),
          col("geolocation.city").alias("city"))
  .join(geoForLookupDF, ["country"], "left"))
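The `.join(geoForLookupDF, ["country"], "left")` step can be pictured as a left outer join of each micro-batch against the static lookup. The plain-Python sketch below is an illustration only: `enrich_batch` and the sample rows are hypothetical, and it assumes country names are unique in the lookup (a dict keeps one code per name, whereas a real join would emit one row per match). Unmatched rows keep `countryCode3 = None`, as a left outer join leaves the column null.

```python
def enrich_batch(batch_rows, lookup_rows):
    """Left-join one micro-batch against a static lookup on `country`."""
    code_by_country = {r["country"]: r["countryCode3"] for r in lookup_rows}
    # Every streamed row is kept; rows without a match get None for countryCode3.
    return [{**row, "countryCode3": code_by_country.get(row["country"])}
            for row in batch_rows]


lookup = [{"country": "India", "countryCode3": "IND"},
          {"country": "Germany", "countryCode3": "DEU"}]
batch = [{"gt": "bike", "country": "India"},
         {"gt": "sit", "country": "Atlantis"}]
for row in enrich_batch(batch, lookup):
    print(row)
# {'gt': 'bike', 'country': 'India', 'countryCode3': 'IND'}
# {'gt': 'sit', 'country': 'Atlantis', 'countryCode3': None}
```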

## 10. Persist the transformed data into Silver Layer.

In [0]:
# Persist the transformed data into Silver layer.
(parsedEventsDF
  .writeStream
  .format("delta")
  .option("checkpointLocation", activitySilverCheckpoint)
  .outputMode("append")
  .start(activitySilverPath))

In [0]:
# Observe the content of Silver Directory
dbutils.fs.ls(activitySilverPath)

## 11. The active streams
Two write streams (Bronze and Silver) are running at this point. Each call to `display()` on a streaming DataFrame starts one more streaming query that feeds the notebook output, so after the two `display()` calls below there will be four active streams.

In [0]:
for s in spark.streams.active:
  print(s.id)

In [0]:
display(rawEventsDF)
display(parsedEventsDF)

In [0]:
for s in spark.streams.active:
  print(s.id)

## 12. Apply aggregation to create gold tables.
* The Gold table holds the total count of events, grouped by `hour`, `gt`, and `countryCode3`.
* This aggregation reduces the table from hundreds of thousands of rows (or millions, once we've loaded our batch data) to dozens.
* We read a stream of data from `activitySilverPath` and write another stream to `groupedCountPath` (`activityGoldPath + "/groupedCount"`).
* **Important**: In complete output mode, we rewrite the entire state of the table each time our logic runs. While this is ideal for calculating aggregates, we cannot read a stream from this directory, because Structured Streaming assumes the upstream logic only appends data.
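As a plain-Python illustration of the `groupBy(window("Arrival_Time", "60 minute"), "gt", "countryCode3")` step, the sketch below assigns each event to a 60-minute tumbling window, counts per window, `gt`, and `countryCode3`, and then keeps only the window's starting hour. It assumes timestamps are UTC `datetime` values (Spark aligns windows to the epoch, so an hourly window starts on the hour in UTC); `hourly_counts` and the sample events are hypothetical. Because the window is dropped after extracting the hour, windows from different days with the same hour remain separate rows, as in the Gold table.

```python
from collections import Counter
from datetime import datetime


def hourly_counts(events):
    """Count events per 60-minute tumbling window, gt, and countryCode3.

    Returns (gt, countryCode3, count, hour) tuples, mirroring the Gold table columns.
    """
    counts = Counter()
    for e in events:
        # Each event falls in exactly one window, starting at the top of its hour.
        window_start = e["Arrival_Time"].replace(minute=0, second=0, microsecond=0)
        counts[(window_start, e["gt"], e["countryCode3"])] += 1
    # Like .withColumn("hour", hour(col("window.start"))).drop("window").
    return [(gt, code, n, start.hour) for (start, gt, code), n in counts.items()]


events = [
    {"Arrival_Time": datetime(2015, 2, 23, 11, 5), "gt": "bike", "countryCode3": "NGA"},
    {"Arrival_Time": datetime(2015, 2, 23, 11, 40), "gt": "bike", "countryCode3": "NGA"},
    {"Arrival_Time": datetime(2015, 2, 23, 12, 1), "gt": "sit", "countryCode3": "IND"},
]
print(hourly_counts(events))  # [('bike', 'NGA', 2, 11), ('sit', 'IND', 1, 12)]
```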

In [0]:
# Apply aggregation operations.
from pyspark.sql.functions import window, hour

(spark.readStream
  .format("delta")
  .load(activitySilverPath)
  .groupBy(window("Arrival_Time", "60 minute"),"gt", "countryCode3")
  .count()
  .withColumn("hour",hour(col("window.start")))
  .drop("window")
  .writeStream
  .format("delta")
  .option("checkpointLocation", groupedCountCheckpoint)
  .outputMode("complete")
  .start(groupedCountPath))

In [0]:
# Observe the contents of the groupedCountPath directory
dbutils.fs.ls(groupedCountPath)

## 13. Create a table of Gold/aggregated data
- The table `grouped_count` created here becomes visible in Azure Databricks SQL, where it can be used to build a dashboard.

In [0]:
spark.sql("""
  DROP TABLE IF EXISTS grouped_count
""")
spark.sql("""
  CREATE TABLE grouped_count
  USING DELTA
  LOCATION '{}'
""".format(groupedCountPath))

## 14. Query a table and observe its visualization.
* A query on the Gold Delta table we have just registered performs a static read of the current state of the data each time it runs.
* We need to re-run the query each time we want updated results. Run the query below now, and again after the streams have processed more data.
* A query on a registered Delta table always reflects the most recent valid state of its files.
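Delta Lake provides this by recording every write as a commit in the table's transaction log (the `_delta_log` directory); a reader builds its snapshot from committed entries only, so a half-finished write is never visible. The class below is a hypothetical toy model of that idea, not the Delta Lake API, and the sample rows are made up.

```python
class ToyDeltaTable:
    """Toy table whose readers only ever see committed versions."""

    def __init__(self):
        self._committed = []   # committed versions, oldest first
        self._staged = None    # data written by an in-progress trigger, not yet committed

    def stage(self, rows):
        self._staged = list(rows)

    def commit(self):
        if self._staged is None:
            raise RuntimeError("nothing staged")
        self._committed.append(self._staged)
        self._staged = None

    def read_latest(self):
        """A static query: the newest committed version, never a half-written one."""
        return self._committed[-1] if self._committed else []


t = ToyDeltaTable()
t.stage([("bike", "NGA", 1, 11)])
t.commit()
t.stage([("bike", "NGA", 2, 11)])  # a complete-mode trigger is still writing
print(t.read_latest())             # [('bike', 'NGA', 1, 11)]
t.commit()
print(t.read_latest())             # [('bike', 'NGA', 2, 11)]
```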

In [0]:
%sql
SELECT * FROM grouped_count

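| gt | countryCode3 | count | hour |
|---|---|---|---|
| stairsdown | IND | 440 | 12 |
| stairsdown | DEU | 2542 | 13 |
| bike | NGA | 3204 | 11 |
| sit | NGA | 2970 | 11 |
| | BRA | 4110 | 14 |
| | IND | 685 | 13 |
| stairsdown | NGA | 2810 | 11 |
| stairsdown | USA | 2468 | 12 |
| stairsdown | AUS | 2461 | 13 |
| stairsdown | IND | 1752 | 13 |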


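## 15. Create a temporary view on the Silver stream
- We cannot define a streaming plot or a streaming temporary view on the Gold table: it is written in `complete` mode, so each trigger rewrites its files, while a streaming read of a Delta table expects the upstream writes to be appends only.
- Instead, we create a temporary view over a stream of the Silver table and aggregate it with SQL.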
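The failure mode can be sketched with a toy reader that only accepts upstream changes that add files, which is how a Delta streaming source behaves by default. `AppendOnlyReader` and the file names are hypothetical; real Delta Lake detects rewrites from the remove actions in its transaction log rather than by comparing file lists.

```python
class AppendOnlyReader:
    """Toy streaming reader that rejects any upstream change other than added files."""

    def __init__(self):
        self.seen = set()

    def poll(self, current_files):
        current = set(current_files)
        removed = self.seen - current
        if removed:
            # A complete-mode writer replaces old files, which this reader cannot handle.
            raise RuntimeError(f"upstream removed or rewrote files: {sorted(removed)}")
        new = sorted(current - self.seen)
        self.seen |= current
        return new


silver = AppendOnlyReader()           # Silver is written in append mode
print(silver.poll(["p0"]))            # ['p0']
print(silver.poll(["p0", "p1"]))      # ['p1']

gold = AppendOnlyReader()             # Gold is rewritten in complete mode
gold.poll(["g0"])
try:
    gold.poll(["g1"])                 # the next trigger replaced g0 with g1
except RuntimeError as err:
    print(err)                        # upstream removed or rewrote files: ['g0']
```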

In [0]:
(spark.readStream
  .format("delta")
  .load(activitySilverPath)
  .createOrReplaceTempView("query_table")
)

In [0]:
%sql
SELECT gt, HOUR(Arrival_Time) hour, COUNT(*) total_events
FROM query_table
GROUP BY gt, HOUR(Arrival_Time)
ORDER BY hour

## 16. Wrapping up
* Stop all the streams.

In [0]:
for s in spark.streams.active:
    s.stop()