### Processing Incremental Updates with Structured Streaming and Delta Lake
In this lab you'll apply your knowledge of structured streaming and Auto Loader to implement a simple multi-hop architecture.

#### 1.0. Import Shared Utilities and Data Files
Run the following cell to set up the necessary variables and clear out past runs of this notebook. Re-executing this cell lets you start the lab over.

In [0]:
%run ./Includes/5.1-Lab-setup

Dropping the database "dbacademy_ttd6as_virginia_edu_dewd_5_1"
Removing the working directory "dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1"

Creating the database "dbacademy_ttd6as_virginia_edu_dewd_5_1"

Predefined Paths:
  DA.paths.working_dir: dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1
  DA.paths.user_db:     dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db
  DA.paths.checkpoints: dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/_checkpoints

Predefined tables in dbacademy_ttd6as_virginia_edu_dewd_5_1:
  -none-

Setup completed in 7 seconds


#### 2.0. Bronze Table: Ingest data
This lab uses a collection of customer-related CSV data from DBFS found in */databricks-datasets/retail-org/customers/*. Read this data with Auto Loader, using its schema inference (use **`customers_checkpoint_path`** to store the schema info). Stream the raw data to a Delta table called **`bronze`**.

In [0]:
# TODO:
customers_checkpoint_path = f"{DA.paths.checkpoints}/customers"  # holds the inferred schema and the stream checkpoint

query = (spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "csv")
              .option("cloudFiles.schemaLocation", customers_checkpoint_path)
              .load("/databricks-datasets/retail-org/customers/")
              .writeStream
              # stream raw data to delta table called bronze
              .format("delta")
              .option("checkpointLocation", customers_checkpoint_path)
              .outputMode("append") 
              .table("bronze"))
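
The Auto Loader settings in the cell above can be gathered in one place. The following is a minimal sketch, not part of the lab: `auto_loader_options` is a hypothetical helper that only builds the option dictionary, so it runs without a Spark session. The option keys `cloudFiles.format` and `cloudFiles.schemaLocation` are the ones used above; the example path is illustrative.

```python
def auto_loader_options(file_format: str, schema_location: str) -> dict:
    """Build the Auto Loader reader options used in this lab (hypothetical helper)."""
    if not schema_location:
        # Schema inference persists what it learns, so it needs somewhere to write.
        raise ValueError("schema inference needs a schema location")
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
    }


# Illustrative path; in the lab this would be customers_checkpoint_path.
options = auto_loader_options("csv", "dbfs:/tmp/example/_checkpoints/customers")
print(options)
```

In a notebook, the dictionary could then be passed to the reader with `spark.readStream.format("cloudFiles").options(**options)`.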


In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 7 batchs


##### 2.1. Create a Streaming Temporary View
Create a streaming temporary view into the bronze table so that we can perform transformations using SQL.

In [0]:
(spark
  .readStream
  .table("bronze")
  .createOrReplaceTempView("bronze_temp"))

In [0]:
%sql
SELECT * FROM bronze_temp

customer_id,tax_id,tax_code,customer_name,state,city,postcode,street,number,unit,region,district,lon,lat,ship_to_address,valid_from,valid_to,units_purchased,loyalty_segment,_rescued_data
11123757,,,"SMITH, SHIRLEY",IN,BREMEN,46506.0,N CENTER ST,521.0,,Indiana,50.0,-86.1465825,41.4507625,"IN, 46506.0, N CENTER ST, 521.0",1532824233,1548137353.0,34.0,3,
30585978,,,"STEPHENS, GERALDINE M",OR,ADDRESS,0,NO SITUS,,,,,-122.1055158,45.374317,"OR, 0, NO SITUS, nan",1523100473,,18.0,3,
349822,,,"GUZMAN, CARMEN",VA,VIENNA,22181,HILL RD,2860,,VA,,-77.2941261,38.88303270000001,"VA, 22181, HILL RD, 2860",1522922493,,5.0,0,
27652636,,,"HASSETT, PATRICK J",WI,VILLAGE OF NASHOTAH,53058.0,IVY LANE,W333N 5591,,,,-88.40951700000002,43.1213789,"WI, 53058.0, IVY LANE, W333N 5591",1531834357,1558052195.0,7.0,1,
14437343,,,"HENTZ, DIANA L",OH,COLUMBUS,43228.0,ALLIANCE WAY,5706,,OH,FRA,-83.158438,39.97821810000001,"OH, 43228.0, ALLIANCE WAY, 5706",1517227530,,0.0,0,
20441596,,,"TIRADO, MARCO A",NY,Otselic,13072,County Road 16,2792,,NY,Chenango,-75.7505808,42.7172722,"NY, 13072, County Road 16, 2792",1519335250,,24.0,3,
5945686,,,"SKORA, BRIAN S",MI,,48205.0,E 8 MILE RD,16414.0,,,,-82.950874,42.4499233,"MI, 48205.0, E 8 MILE RD, 16414.0",1518988242,,7.0,1,
5385771,,,"SLAWEK, DEAN J",PA,,19147-3204,FITZWATER ST,328,,,,-75.14920550000002,39.9389473,"PA, 19147-3204, FITZWATER ST, 328",1518239268,,18.0,3,
1427940,,,"REAVES, LIONEL C",VA,HOT SPRINGS,24445.0,HOT SPRINGS RD,6419.0,,,,-79.90497859999998,37.8949737,"VA, 24445.0, HOT SPRINGS RD, 6419.0",1529087690,,10.0,2,
10457387,,,"BONGIOVANNI, KELLY M",IN,VINCENNES,47591,JERRY ST,2006.0,,Indiana,42.0,-87.519002,38.662178,"IN, 47591, JERRY ST, 2006.0",1535887733,,9.0,2,


##### 2.2. Clean and Enhance the Data
Use the CTAS syntax to define a new streaming view called **`bronze_enhanced_temp`** that does the following:
* Skips records with a null **`postcode`** (recorded as zero in the data)
* Inserts a column called **`receipt_time`** containing a current timestamp
* Inserts a column called **`source_file`** containing the input filename

In [0]:
%sql
-- TODO:
CREATE OR REPLACE TEMPORARY VIEW bronze_enhanced_temp AS
SELECT
  *, current_timestamp() receipt_time, input_file_name() source_file
  FROM bronze_temp
  WHERE postcode > 0
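
To see which records the `WHERE postcode > 0` filter keeps, the sketch below models the predicate in plain Python. It assumes, based on the sample output in this lab, that `postcode` arrives as text and that a value which cannot be read as a number (such as `19147-3204`) behaves like a null and is dropped. `keep_postcode` is a hypothetical name, and this is an approximation of the SQL behavior rather than Spark's actual casting logic.

```python
from typing import Optional


def keep_postcode(postcode: Optional[str]) -> bool:
    """Approximate `postcode > 0`: nulls and non-numeric values are filtered out."""
    if postcode is None:
        return False
    try:
        value = float(postcode)
    except ValueError:
        return False  # assumption: an unparseable value compares like a null
    return value > 0


# Postcodes taken from the sample rows shown earlier in this lab.
samples = ["46506.0", "0", "22181", "19147-3204", None]
kept = [p for p in samples if keep_postcode(p)]
print(kept)
```

Under these assumptions only `46506.0` and `22181` survive, which is consistent with the rows missing from the `bronze_enhanced_temp` output.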

In [0]:
%sql
SELECT * FROM bronze_enhanced_temp

customer_id,tax_id,tax_code,customer_name,state,city,postcode,street,number,unit,region,district,lon,lat,ship_to_address,valid_from,valid_to,units_purchased,loyalty_segment,_rescued_data,receipt_time,source_file
11123757,,,"SMITH, SHIRLEY",IN,BREMEN,46506.0,N CENTER ST,521.0,,Indiana,50.0,-86.1465825,41.4507625,"IN, 46506.0, N CENTER ST, 521.0",1532824233,1548137353.0,34.0,3,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
349822,,,"GUZMAN, CARMEN",VA,VIENNA,22181.0,HILL RD,2860,,VA,,-77.2941261,38.88303270000001,"VA, 22181, HILL RD, 2860",1522922493,,5.0,0,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
27652636,,,"HASSETT, PATRICK J",WI,VILLAGE OF NASHOTAH,53058.0,IVY LANE,W333N 5591,,,,-88.40951700000002,43.1213789,"WI, 53058.0, IVY LANE, W333N 5591",1531834357,1558052195.0,7.0,1,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
14437343,,,"HENTZ, DIANA L",OH,COLUMBUS,43228.0,ALLIANCE WAY,5706,,OH,FRA,-83.158438,39.97821810000001,"OH, 43228.0, ALLIANCE WAY, 5706",1517227530,,0.0,0,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
20441596,,,"TIRADO, MARCO A",NY,Otselic,13072.0,County Road 16,2792,,NY,Chenango,-75.7505808,42.7172722,"NY, 13072, County Road 16, 2792",1519335250,,24.0,3,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
5945686,,,"SKORA, BRIAN S",MI,,48205.0,E 8 MILE RD,16414.0,,,,-82.950874,42.4499233,"MI, 48205.0, E 8 MILE RD, 16414.0",1518988242,,7.0,1,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
1427940,,,"REAVES, LIONEL C",VA,HOT SPRINGS,24445.0,HOT SPRINGS RD,6419.0,,,,-79.90497859999998,37.8949737,"VA, 24445.0, HOT SPRINGS RD, 6419.0",1529087690,,10.0,2,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
10457387,,,"BONGIOVANNI, KELLY M",IN,VINCENNES,47591.0,JERRY ST,2006.0,,Indiana,42.0,-87.519002,38.662178,"IN, 47591, JERRY ST, 2006.0",1535887733,,9.0,2,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
19154815,,,"VERTA, RAYMOND J",OH,CANTON,44710.0,WERTZ AVE SW,1224,,OH,STA,-81.40792359999998,40.7893091,"OH, 44710.0, WERTZ AVE SW, 1224",1544554334,,27.0,3,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet
2469887,,,"BRYANT, KATHRYN I",MA,WHITMAN,2382.0,JACOB LANE,11,,MA,PLYMOUTH,-70.942509,42.0936276,"MA, 2382.0, JACOB LANE, 11",1516971623,,7.0,1,,2023-04-04T15:11:11.804+0000,dbfs:/user/ttd6as@virginia.edu/dbacademy/dewd/5.1/5_1.db/bronze/part-00000-d4c4bb6d-5f3d-43ee-b610-bf95c4dc407b-c000.snappy.parquet


#### 3.0. Silver Table
Stream the data from **`bronze_enhanced_temp`** to a table called **`silver`**.

In [0]:
# TODO:
silver_checkpoint_path = f"{DA.paths.checkpoints}/silver"

query = (spark.table("bronze_enhanced_temp")
              .writeStream
              .format("delta")
              .option("checkpointLocation", silver_checkpoint_path)
              .outputMode("append")
              .table("silver"))

In [0]:
DA.block_until_stream_is_ready(query)

##### 3.1. Create a Streaming Temporary View
Create another streaming temporary view for the silver table so that we can perform business-level queries using SQL.

In [0]:
(spark
  .readStream
  .table("silver")
  .createOrReplaceTempView("silver_temp"))

#### 4.0. Gold Table
Use the CTAS syntax to define a new streaming view called **`customer_count_by_state_temp`** that counts customers per state.

In [0]:
%sql
-- TODO:
CREATE OR REPLACE TEMPORARY VIEW customer_count_by_state_temp AS
  SELECT state, count(state) AS customer_count
  FROM silver_temp
  GROUP BY state
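
The aggregation above can be pictured as a running count that is updated as each micro-batch arrives. The sketch below is a plain-Python model, not Spark code: the batches and state values are made up for illustration, and `update_counts` is a hypothetical name. It mirrors one detail of the SQL, namely that `GROUP BY state` keeps a group for null states while `count(state)` counts only non-null values.

```python
from typing import Dict, Iterable, Optional


def update_counts(totals: Dict[Optional[str], int],
                  batch: Iterable[Optional[str]]) -> Dict[Optional[str], int]:
    """Fold one micro-batch of `state` values into the running per-state totals."""
    for state in batch:
        # The null group exists, but count(state) adds nothing for it.
        totals[state] = totals.get(state, 0) + (0 if state is None else 1)
    return totals


# Two made-up micro-batches of `state` values.
batches = [["IN", "VA", "IN", None], ["VA", "OH"]]
totals: Dict[Optional[str], int] = {}
for batch in batches:
    update_counts(totals, batch)
    print(totals)  # the full result table after each batch
```

Printing the whole dictionary after every batch is the analogue of writing the aggregate with the `complete` output mode, where the entire result is rewritten on each update.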

Finally, stream the data from the **`customer_count_by_state_temp`** view to a Delta table called **`gold_customer_count_by_state`**.

In [0]:
# TODO:
gold_checkpoint_path = f"{DA.paths.checkpoints}/customer_counts"

query = (spark.table("customer_count_by_state_temp")
              .writeStream
              .format("delta")
              .option("checkpointLocation", gold_checkpoint_path)  # each stream needs its own checkpoint
              .outputMode("complete")
              .table("gold_customer_count_by_state"))
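
Each streaming write tracks its progress in its checkpoint directory, so every query in the pipeline should point at a directory of its own. The sketch below is a small plain-Python check under that assumption. `checkpoint_path` and `assert_distinct` are hypothetical helpers, and the base path is illustrative rather than the value of `DA.paths.checkpoints`.

```python
def checkpoint_path(base: str, stream_name: str) -> str:
    """Return a per-stream checkpoint directory under a shared base path."""
    return f"{base.rstrip('/')}/{stream_name}"


def assert_distinct(paths: dict) -> None:
    """Raise if two streams would share a checkpoint directory."""
    seen = {}
    for stream, path in paths.items():
        if path in seen:
            raise ValueError(f"{stream} and {seen[path]} share {path}")
        seen[path] = stream


base = "dbfs:/tmp/example/_checkpoints"  # illustrative base path
paths = {name: checkpoint_path(base, name)
         for name in ("customers", "silver", "customer_counts")}
assert_distinct(paths)  # passes: three streams, three directories
print(paths["customer_counts"])
```

A check like this could run before the streams are started, so that a copy-pasted checkpoint variable is caught early.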


In [0]:
DA.block_until_stream_is_ready(query)

#### 5.0. Query the Results
Query the **`gold_customer_count_by_state`** table (this will not be a streaming query). Plot the results as a bar graph and also as a map plot.

In [0]:
%sql
SELECT * FROM gold_customer_count_by_state

#### 6.0. Clean Up
Run the following cell to remove the database and all data associated with this lab.

In [0]:
DA.cleanup()

By completing this lab, you should now feel comfortable:
* Using PySpark to configure Auto Loader for incremental data ingestion
* Using Spark SQL to aggregate streaming data
* Streaming data to a Delta table