### Processing Incremental Updates with Structured Streaming and Delta Lake
In this lab you'll apply your knowledge of Structured Streaming and Auto Loader to implement a simple multi-hop (bronze, silver, gold) architecture.

#### 1.0. Import Shared Utilities and Data Files
Run the following cell to set up the necessary variables and clear out past runs of this notebook. Re-executing this cell lets you start the lab over.

In [0]:
%run ./Includes/5.1-Lab-setup

Dropping the database "dbacademy_dxr9fv_virginia_edu_dewd_5_1"
Removing the working directory "dbfs:/user/dxr9fv@virginia.edu/dbacademy/dewd/5.1"

Creating the database "dbacademy_dxr9fv_virginia_edu_dewd_5_1"

Predefined Paths:
  DA.paths.working_dir: dbfs:/user/dxr9fv@virginia.edu/dbacademy/dewd/5.1
  DA.paths.user_db:     dbfs:/user/dxr9fv@virginia.edu/dbacademy/dewd/5.1/5_1.db
  DA.paths.checkpoints: dbfs:/user/dxr9fv@virginia.edu/dbacademy/dewd/5.1/_checkpoints

Predefined tables in dbacademy_dxr9fv_virginia_edu_dewd_5_1:
  -none-

Setup completed in 16 seconds



#### 2.0. Bronze Table: Ingest data
This lab uses a collection of customer-related CSV data from DBFS found in *`dbfs:/FileStore/lab_data/retail-org/customers/`*.
- Read this data with Auto Loader, relying on its schema inference (use **`DA.paths.checkpoints`** to store the schema information in a dedicated folder for **`customers`**).
- Stream the raw data to a Delta table called **`bronze`** using the **`append`** output mode.
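
The Auto Loader settings named in the bullets above can be collected in a plain dictionary and handed to the reader in one call. The following is a minimal sketch under that assumption; the helper name `auto_loader_options` and the literal base path are illustrative and are not part of the lab's `DA` utilities.

```python
def auto_loader_options(file_format: str, schema_location: str) -> dict:
    """Return the reader options Auto Loader uses for schema inference."""
    return {
        "cloudFiles.format": file_format,              # format of the incoming files
        "cloudFiles.schemaLocation": schema_location,  # where the inferred schema is stored
    }

# Hypothetical stand-in for DA.paths.checkpoints
checkpoints = "dbfs:/user/someone/_checkpoints"
opts = auto_loader_options("csv", f"{checkpoints}/customers")
print(opts)

# In a Databricks notebook the dictionary could then be applied roughly as:
# spark.readStream.format("cloudFiles").options(**opts).load(source_path)
```

Keeping the options in one place makes it easier to see which folder holds the schema information for each source.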

In [0]:
# TODO:
customers_checkpoint_path = f"{DA.paths.checkpoints}/customers"
# Auto Loader ("cloudFiles") infers the CSV schema and records it under schemaLocation
query = (spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "csv")
         .option("cloudFiles.schemaLocation", customers_checkpoint_path)
         .load("dbfs:/FileStore/lab_data/retail-org/customers/")
         # Append each micro-batch of raw records to the bronze Delta table
         .writeStream
         .format("delta")
         .option("checkpointLocation", customers_checkpoint_path)
         .outputMode("append")
         .table("bronze"))


In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 3 batchs


##### 2.1. Create a Streaming Temporary View
Create a streaming temporary view named **`bronze_temp`** on top of the **`bronze`** table so we can perform transformations using SQL.

In [0]:
(spark
  .readStream
  .table("bronze")
  .createOrReplaceTempView("bronze_temp"))

##### 2.2. Clean and Enhance the Data
Use a CTAS-style statement (**`CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT`**) to define a new streaming view called **`bronze_enhanced_temp`** that does the following:
* Skips records with a missing **`postcode`** (null, or recorded as zero)
* Inserts a column called **`receipt_time`** containing a current timestamp
* Inserts a column called **`source_file`** containing the input filename
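
The row-level effect of the three requirements above can be sketched in plain Python. This is only an illustration of the intended semantics, not how Spark evaluates the view; the function name `enhance`, the sample rows, and the file name are made up for the example.

```python
from datetime import datetime, timezone

def enhance(rows, source_file):
    """Drop rows whose postcode is missing or not > 0, then add two columns."""
    receipt_time = datetime.now(timezone.utc)  # one timestamp per processed batch
    enhanced = []
    for row in rows:
        postcode = row.get("postcode")
        try:
            keep = postcode is not None and float(postcode) > 0
        except ValueError:
            keep = False  # a non-numeric postcode cannot satisfy postcode > 0
        if keep:
            enhanced.append({**row, "receipt_time": receipt_time, "source_file": source_file})
    return enhanced

# Made-up sample rows; values are strings, as they would be when read from CSV
sample = [
    {"customer_id": "1", "postcode": "22903"},
    {"customer_id": "2", "postcode": "0"},
    {"customer_id": "3", "postcode": None},
]
result = enhance(sample, "customers-part-000.csv")
print([r["customer_id"] for r in result])
```

Only the first row survives: a postcode of zero fails the comparison, and a null postcode never satisfies it.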

In [0]:
%sql
-- TODO:
CREATE OR REPLACE TEMPORARY VIEW bronze_enhanced_temp AS
SELECT
  *,
  current_timestamp() AS receipt_time,
  input_file_name() AS source_file
FROM bronze_temp
WHERE postcode > 0

#### 3.0. Silver Table
Stream the data from **`bronze_enhanced_temp`** to a Delta table named **`silver`** using the **`append`** output mode. Use **`DA.paths.checkpoints`** with a dedicated folder for **`silver`** as the checkpoint path.
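
Each streaming write tracks its progress in its own checkpoint directory, which is why every stream in this lab gets a dedicated folder. The sketch below shows one way to guard against accidentally reusing a folder; the class `CheckpointPaths` and the base path are hypothetical and not part of the lab's `DA` object.

```python
class CheckpointPaths:
    """Hands out one checkpoint folder per stream name under a common base path."""

    def __init__(self, base: str):
        self.base = base.rstrip("/")
        self._issued = set()

    def for_stream(self, name: str) -> str:
        # Refuse to hand out the same folder twice, since two queries must not share it
        if name in self._issued:
            raise ValueError(f"checkpoint folder for '{name}' is already in use")
        self._issued.add(name)
        return f"{self.base}/{name}"

paths = CheckpointPaths("dbfs:/user/someone/_checkpoints")  # hypothetical base path
silver_checkpoint_path = paths.for_stream("silver")
print(silver_checkpoint_path)
```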

In [0]:
# TODO:
silver_checkpoint_path = f"{DA.paths.checkpoints}/silver"

query = (spark.table("bronze_enhanced_temp")
         .writeStream
         .format("delta")
         .option("checkpointLocation", silver_checkpoint_path)
         .outputMode("append")
         .table("silver"))

In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 2 batchs


##### 3.1. Create a Streaming Temporary View
Create another streaming temporary view named **`silver_temp`** for the **`silver`** table so we can perform business-level queries using SQL.

In [0]:
(spark
  .readStream
  .table("silver")
  .createOrReplaceTempView("silver_temp"))


#### 4.0. Gold Table
Use the CTAS syntax to define a new streaming view called **`customer_count_by_state_temp`** that counts customers per state.

In [0]:
%sql
-- TODO:
CREATE OR REPLACE TEMPORARY VIEW customer_count_by_state_temp AS
SELECT state
    , count(state) AS customer_count
    FROM silver_temp
    GROUP BY state


Finally, stream the data from the **`customer_count_by_state_temp`** view to a Delta table called **`gold_customer_count_by_state`**. Use the **`complete`** output mode: the per-state counts change as new data arrives, so the whole result table is rewritten on every trigger, and a streaming aggregation without a watermark cannot be written in **`append`** mode. Also, use **`DA.paths.checkpoints`** with a dedicated folder for **`customer_counts`** as the checkpoint path.
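
To see why an aggregation is written as a full table each time, the following plain-Python sketch simulates two micro-batches of state values and the result table that a complete-mode write would produce after each one. The function name and the sample data are made up for illustration; this models the behavior and is not Spark code.

```python
from collections import Counter

def run_complete_mode(micro_batches):
    """Yield the full per-state count table after each micro-batch."""
    totals = Counter()
    for batch in micro_batches:
        totals.update(batch)   # fold the new records into the running counts
        yield dict(totals)     # complete mode: the entire table is emitted again

# Made-up micro-batches of customer states
batches = [["TX", "WI", "TX"], ["WI", "UT"]]
snapshots = list(run_complete_mode(batches))
for number, table in enumerate(snapshots):
    print(number, table)
```

The count for `WI` differs between the two snapshots, so a row that was already written has to change. Appending new rows alone could not express that, which is why the target table is overwritten with the complete result.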

In [0]:
# TODO:
customers_count_checkpoint_path = f"{DA.paths.checkpoints}/customer_counts"
query = (spark.table("customer_count_by_state_temp")
         .writeStream
         .format("delta")
         .option("checkpointLocation", customers_count_checkpoint_path)
         .outputMode("complete")
         .table("gold_customer_count_by_state"))

In [0]:
DA.block_until_stream_is_ready(query)

The stream has processed 15 batchs


#### 5.0. Query the Results
Query the **`gold_customer_count_by_state`** table (this will not be a streaming query).

In [0]:
%sql
SELECT * FROM gold_customer_count_by_state

| state | customer_count |
|-------|----------------|
| MT    | 203            |
| TX    | 564            |
| NV    | 40             |
| AR    | 11             |
| NH    | 2              |
| WI    | 938            |
| VT    | 157            |
| HI    | 65             |
| UT    | 416            |
| AL    | 65             |


#### 6.0. Clean Up
Run the following cell to remove the database and all data associated with this lab.

In [0]:
DA.cleanup()

By completing this lab, you should now feel comfortable:
* Using PySpark to configure Auto Loader for incremental data ingestion
* Using Spark SQL to aggregate streaming data
* Streaming data to a Delta table