### Processing Incremental Updates with Structured Streaming and Delta Lake
In this lab, you'll apply your knowledge of Structured Streaming and Auto Loader to implement a simple multi-hop architecture.

#### 1.0. Import Shared Utilities and Data Files
Run the following cell to set up the necessary variables and clear out past runs of this notebook. Re-executing this cell lets you start the lab over.

In [0]:
%run ./Includes/5.1-Lab-setup

Dropping the database "dbacademy_zta9cq_virginia_edu_dewd_5_1"
Removing the working directory "dbfs:/user/zta9cq@virginia.edu/dbacademy/dewd/5.1"

Creating the database "dbacademy_zta9cq_virginia_edu_dewd_5_1"

Predefined Paths:
  DA.paths.working_dir: dbfs:/user/zta9cq@virginia.edu/dbacademy/dewd/5.1
  DA.paths.user_db:     dbfs:/user/zta9cq@virginia.edu/dbacademy/dewd/5.1/5_1.db
  DA.paths.checkpoints: dbfs:/user/zta9cq@virginia.edu/dbacademy/dewd/5.1/_checkpoints

Predefined tables in dbacademy_zta9cq_virginia_edu_dewd_5_1:
  -none-

Setup completed in 7 seconds


#### 2.0. Bronze Table: Ingest data
This lab uses a collection of customer-related CSV data from DBFS, found in */databricks-datasets/retail-org/customers/*. Read this data with Auto Loader, relying on its schema inference (use **`customersCheckpointPath`** to store the schema information). Stream the raw data to a Delta table called **`bronze`**.

In [0]:
# TODO: 
customersCheckpointPath = f"{DA.paths.checkpoints}/customers"

query = (spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "csv")
              .option("cloudFiles.schemaLocation", customersCheckpointPath)
              .load("/databricks-datasets/retail-org/customers/")
              .writeStream
              .format("delta")
              .option("checkpointLocation", customersCheckpointPath)
              .outputMode("append")
              .table("bronze"))

In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 20 batchs
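Auto Loader avoids re-reading input by recording which files it has already ingested in the stream's checkpoint location. That is also why the setup cell, which removes the working directory holding the checkpoints, lets you start the lab over. The plain-Python sketch below models only that bookkeeping: `discover_new_files` is a hypothetical helper, an in-memory set stands in for the checkpoint, and the file names are made up. It does not call the `cloudFiles` API.

```python
import os
import tempfile


def discover_new_files(input_dir, processed):
    """Return CSV files in input_dir that are not yet recorded in `processed`.

    Hypothetical helper: a plain-Python model of the file tracking Auto Loader
    keeps in its checkpoint, not the cloudFiles API itself.
    """
    new_files = sorted(
        name for name in os.listdir(input_dir)
        if name.endswith(".csv") and name not in processed
    )
    processed.update(new_files)  # "commit" the newly ingested files to the checkpoint
    return new_files


with tempfile.TemporaryDirectory() as landing_dir:
    checkpoint = set()  # stands in for the checkpoint's record of ingested files

    # Two files land before the first run.
    for name in ["part-0.csv", "part-1.csv"]:
        with open(os.path.join(landing_dir, name), "w") as f:
            f.write("customer_id,state\n1,TX\n")
    first_run = discover_new_files(landing_dir, checkpoint)

    # Nothing new has arrived, so nothing is re-read.
    second_run = discover_new_files(landing_dir, checkpoint)

    # A third file lands; only it is picked up.
    with open(os.path.join(landing_dir, "part-2.csv"), "w") as f:
        f.write("customer_id,state\n2,WI\n")
    third_run = discover_new_files(landing_dir, checkpoint)

print(first_run, second_run, third_run)
# ['part-0.csv', 'part-1.csv'] [] ['part-2.csv']
```

Clearing `checkpoint` in this model would make the next run return every file again, which mirrors what happens when the real checkpoint directory is deleted.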


##### 2.1. Create a Streaming Temporary View
Create a streaming temporary view into the bronze table so that we can perform transformations using SQL.

In [0]:
(spark
  .readStream
  .table("bronze")
  .createOrReplaceTempView("bronze_temp"))

##### 2.2. Clean and Enhance the Data
Use the `CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT` syntax to define a new streaming view called **`bronze_enhanced_temp`** that does the following:
* Skips records whose **`postcode`** is null or zero
* Inserts a column called **`receipt_time`** containing a current timestamp
* Inserts a column called **`source_file`** containing the input filename

In [0]:
%sql
-- TODO:
CREATE OR REPLACE TEMPORARY VIEW bronze_enhanced_temp AS
SELECT *
  , current_timestamp() receipt_time
  , input_file_name() source_file
  FROM bronze_temp
  WHERE postcode > 0
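The `WHERE postcode > 0` predicate drops both zero and null postcodes: a SQL comparison involving NULL evaluates to NULL, and `WHERE` keeps only rows whose predicate is TRUE. The sketch below reproduces that filter and the two added columns in plain Python. `sql_greater_than`, `enhance`, the sample rows, and the file name are hypothetical stand-ins, not Spark functions or real dataset contents.

```python
from datetime import datetime, timezone


def sql_greater_than(value, threshold):
    """Mimic SQL three-valued logic: NULL > x evaluates to NULL (None here)."""
    if value is None:
        return None
    return value > threshold


def enhance(rows, source_file):
    """Hypothetical plain-Python analogue of the bronze_enhanced_temp view."""
    receipt_time = datetime.now(timezone.utc)  # one timestamp for the whole call
    enhanced_rows = []
    for row in rows:
        # WHERE keeps a row only when the predicate is TRUE; FALSE and NULL are dropped.
        if sql_greater_than(row["postcode"], 0) is True:
            enhanced_rows.append(
                {**row, "receipt_time": receipt_time, "source_file": source_file}
            )
    return enhanced_rows


SOURCE_FILE = "example.csv"  # hypothetical input file name
rows = [
    {"customer_id": 1, "postcode": 78701},
    {"customer_id": 2, "postcode": 0},     # zero placeholder: dropped
    {"customer_id": 3, "postcode": None},  # NULL: dropped, since NULL > 0 is not TRUE
]
enhanced = enhance(rows, SOURCE_FILE)
print([r["customer_id"] for r in enhanced])  # [1]
```

The sketch stamps every row of a call with one timestamp, loosely like Spark's `current_timestamp()`, which returns the same value for all calls within a single query evaluation.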

#### 3.0. Silver Table
Stream the data from **`bronze_enhanced_temp`** to a table called **`silver`**.

In [0]:
# TODO:

silverCheckpointPath = f"{DA.paths.checkpoints}/silver"

query = (spark.table("bronze_enhanced_temp")
              .writeStream
              .format("delta")
              .option("checkpointLocation", silverCheckpointPath)
              .outputMode("append")
              .table("silver"))

In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 13 batchs
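In append mode, each trigger of the silver stream writes only the rows that reached **`bronze`** since the last committed offset, and that progress is stored under **`silverCheckpointPath`**; each streaming query needs its own checkpoint directory for this reason. The sketch below models the idea with Python lists. `run_micro_batch` and the integer list offset are hypothetical simplifications: Spark records source-specific offsets, not list indices.

```python
def run_micro_batch(source, sink, checkpoint):
    """Hypothetical model of one append-mode trigger over a growing source table."""
    start = checkpoint.get("offset", 0)
    new_rows = source[start:]           # only rows added since the last commit
    sink.extend(new_rows)               # append mode: existing sink rows are never rewritten
    checkpoint["offset"] = len(source)  # commit progress so a restart does not re-read rows
    return len(new_rows)


bronze = [{"id": 1}, {"id": 2}]
silver, silver_checkpoint = [], {}

first = run_micro_batch(bronze, silver, silver_checkpoint)   # picks up ids 1 and 2
second = run_micro_batch(bronze, silver, silver_checkpoint)  # no new input: appends nothing
bronze.append({"id": 3})
third = run_micro_batch(bronze, silver, silver_checkpoint)   # picks up id 3 only

print(first, second, third)      # 2 0 1
print([r["id"] for r in silver])  # [1, 2, 3]
```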


##### 3.1. Create a Streaming Temporary View
Create another streaming temporary view for the silver table so that we can perform business-level queries using SQL.

In [0]:
(spark
  .readStream
  .table("silver")
  .createOrReplaceTempView("silver_temp"))

#### 4.0. Gold Table
Use the `CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT` syntax to define a new streaming view called **`customer_count_temp`** that counts customers per state.

In [0]:
%sql
-- TODO:
CREATE OR REPLACE TEMPORARY VIEW customer_count_temp AS
  SELECT state
  , count(state) AS customer_count
  FROM silver_temp
  GROUP BY state
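Because **`customer_count_temp`** aggregates a stream, Spark keeps running per-state counts between micro-batches (in a state store saved alongside the query's checkpoint) and folds each new batch into them. The sketch below models that with a dictionary. It also reflects two SQL details: `GROUP BY state` still forms a group for NULL states, and `count(state)` ignores NULL values, so that group's count stays at 0 (`count(*)` would count those rows instead). `update_counts` and the sample batches are hypothetical.

```python
def update_counts(state_store, batch):
    """Fold one micro-batch into the running per-state counts (hypothetical helper)."""
    for row in batch:
        key = row["state"]
        state_store.setdefault(key, 0)  # GROUP BY creates a group even for NULL
        if key is not None:
            state_store[key] += 1       # count(state) skips NULL values
    return state_store


running = {}
update_counts(running, [{"state": "TX"}, {"state": "WI"}, {"state": None}])
update_counts(running, [{"state": "TX"}])
print(running)  # {'TX': 2, 'WI': 1, None: 0}
```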


Finally, stream the data from the **`customer_count_temp`** view to a Delta table called **`gold_customer_count_by_state`**.

In [0]:
# TODO:
customerCountCheckpointPath = f"{DA.paths.checkpoints}/customers_counts"

query = (spark.table("customer_count_temp")
              .writeStream
              .format("delta")
              .option("checkpointLocation", customerCountCheckpointPath)
              .outputMode("complete")
              .table("gold_customer_count_by_state")
        )

In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 10 batchs
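The gold stream uses `outputMode("complete")` because its counts keep changing as new customers arrive: on every trigger, complete mode rewrites **`gold_customer_count_by_state`** with the full, current result. Spark rejects append mode for a streaming aggregation like this one when no watermark is defined, since rows it has already emitted could still change. The plain-Python sketch below contrasts the two behaviors; the batches are made up, and `naive_append_sink` is a hypothetical illustration of what appending each batch's updated rows would produce, not a Spark output mode.

```python
batches = [["TX", "WI"], ["TX"]]  # hypothetical micro-batches of customer states

counts = {}
complete_sink = []      # what the gold table holds under complete mode
naive_append_sink = []  # what appending each batch's updated rows would accumulate

for batch in batches:
    updated = set()
    for state in batch:
        counts[state] = counts.get(state, 0) + 1
        updated.add(state)
    # Complete mode: overwrite the table with the full, current aggregate.
    complete_sink = sorted(counts.items())
    # Naive append: add the updated rows without removing their stale versions.
    naive_append_sink.extend(sorted((s, counts[s]) for s in updated))

print(complete_sink)      # [('TX', 2), ('WI', 1)]
print(naive_append_sink)  # [('TX', 1), ('WI', 1), ('TX', 2)]
```

The naive sink ends up with two conflicting rows for TX, which is exactly the ambiguity complete mode avoids.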


#### 5.0. Query the Results
Query the **`gold_customer_count_by_state`** table (this is not a streaming query). Plot the results as a bar chart and as a map.

In [0]:
%sql
SELECT * FROM gold_customer_count_by_state

| state | customer_count |
|-------|---------------:|
| MT | 203 |
| TX | 564 |
| NV | 40 |
| AR | 11 |
| NH | 2 |
| WI | 938 |
| VT | 157 |
| HI | 65 |
| UT | 416 |
| AL | 65 |



#### 6.0. Clean Up
Run the following cell to remove the database and all data associated with this lab.

In [0]:
DA.cleanup()

By completing this lab, you should now feel comfortable with:
* Using PySpark to configure Auto Loader for incremental data ingestion
* Using Spark SQL to aggregate streaming data
* Streaming data to a Delta table