# Exercise #4 - Streaming Orders

With our four historical datasets properly loaded, we can now begin to process the "current" orders.

In this case, the new "system" is landing one JSON file per order into cloud storage.

We can process these JSON files as a stream of orders under the assumption that new orders are continually added to this dataset.

To keep this project simple, we have reduced the "stream" of orders to just the first few hours of 2020, and we will throttle that stream to only one file per iteration.

This exercise is broken up into 3 steps:
* Exercise 4.A - Use Database
* Exercise 4.B - Stream-Append Orders
* Exercise 4.C - Stream-Append Line Items

## Some Friendly Advice...

Each record is a JSON object with roughly the following structure:

* **`customerID`**
* **`orderId`**
* **`products`**
  * array
    * **`productId`**
    * **`quantity`**
    * **`soldPrice`**
* **`salesRepId`**
* **`shippingAddress`**
  * **`address`**
  * **`attention`**
  * **`city`**
  * **`state`**
  * **`zip`**
* **`submittedAt`**

As you ingest this data, it will need to be transformed to match the existing **`orders`** table's schema and the **`line_items`** table's schema.
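To make the target shape concrete before writing any Spark code, here is a plain-Python sketch (not Spark) applied to one hypothetical order. The field values are invented, `submittedAt` is assumed to be an ISO-8601 string, and the snake_case target names such as `customer_id`, `shipping_address_city`, and `product_sold_price` are assumptions to check against the real **`orders`** and **`line_items`** schemas.

```python
import json
from datetime import datetime

# Hypothetical order record; values and the submittedAt format are assumptions.
raw = """
{"customerID": "c-1", "orderId": "o-1", "salesRepId": "r-1",
 "submittedAt": "2020-01-01T00:30:00",
 "shippingAddress": {"address": "1 Main St", "attention": "Ann",
                     "city": "Springfield", "state": "IL", "zip": "62701"},
 "products": [{"productId": "p-1", "quantity": 2, "soldPrice": 9.99},
              {"productId": "p-2", "quantity": 1, "soldPrice": 19.5}]}
"""


def to_order_row(order):
    """Flatten one order into a single row (target column names are assumed)."""
    submitted_at = datetime.fromisoformat(order["submittedAt"])
    address = order["shippingAddress"]
    return {
        "submitted_at": submitted_at,
        "submitted_yyyy_mm": submitted_at.strftime("%Y-%m"),
        "order_id": order["orderId"],
        "customer_id": order["customerID"],
        "sales_rep_id": order["salesRepId"],
        # Nested shippingAddress fields become top-level, prefixed columns.
        **{f"shipping_address_{key}": value for key, value in address.items()},
    }


def to_line_item_rows(order):
    """'Explode' the products array: one output row per product."""
    return [
        {
            "order_id": order["orderId"],
            "product_id": product["productId"],
            "product_quantity": product["quantity"],
            "product_sold_price": product["soldPrice"],
        }
        for product in order["products"]
    ]


order = json.loads(raw)
print(to_order_row(order)["submitted_yyyy_mm"])  # 2020-01
print(len(to_line_item_rows(order)))  # 2
```

One input record yields one **`orders`** row but several **`line_items`** rows; that one-to-many step is what `explode` does in Spark.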

Before attempting to ingest the data as a stream, we highly recommend that you start with a static **`DataFrame`** so that you can iron out the various kinks:
* Renaming and flattening columns
* Exploding the products array
* Parsing the **`submittedAt`** column into a **`timestamp`**
* Conforming to the **`orders`** and **`line_items`** schemas - because these are Delta tables, appending to them will fail if the schemas do not match

Furthermore, creating a stream from JSON files will first require you to specify the schema - you can "cheat" and infer that schema from some of the JSON files before starting the stream.

```python
user_db = "learning_db"
orders_table = "orders"
products_table = "products"
line_items_table = "line_items"
stream_path = "data/orders/stream"
orders_checkpoint_path = "checkpoint/orders"
line_items_checkpoint_path = "checkpoint/line_items"
```

```python
# Reset the environment: make sure the checkpoint root exists and clear
# any checkpoints left over from a previous run of this notebook.
import os
import shutil

os.makedirs("checkpoint", exist_ok=True)

for path in (orders_checkpoint_path, line_items_checkpoint_path):
    # ignore_errors=True: a missing directory is silently skipped.
    shutil.rmtree(path, ignore_errors=True)
```

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #4.A - Use Database</h2>

Each notebook uses a different Spark session and will initially use the **`default`** database.

As in the previous exercise, we can avoid contention over commonly named tables by using our user-specific database.

**In this step you will need to:**
* Use the database identified by the variable **`user_db`** so that any tables created in this notebook are **NOT** added to the **`default`** database

```python
# TODO
# Use this cell to complete your solution
```

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #4.B - Stream-Append Orders</h2>

Every JSON file ingested by our stream represents one order and the enumerated list of products purchased in that order.

Our goal is simple: ingest the data, transform it as required by the **`orders`** table's schema, and append these new records to our existing table.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Parse **`submittedAt`** into a valid **`timestamp`** column named **`submitted_at`**
  * Add the column **`submitted_yyyy_mm`** using the format "**yyyy-MM**"
  * Make any other changes required to the column names and data types so that they conform to the **`orders`** table's schema

* Write the stream to a Delta **table**:
  * The table's format should be "**delta**"
  * Partition the data by the column **`submitted_yyyy_mm`**
  * Records must be appended to the table identified by the variable **`orders_table`**
  * The query must be named the same as the table, identified by the variable **`orders_table`**
  * The query must use the checkpoint location identified by the variable **`orders_checkpoint_path`**

### Implement Exercise #4.B

Implement your solution in the following cell:

```python
# TODO
# Use this cell to complete your solution
```

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #4.C - Stream-Append Line Items</h2>

The same JSON file we processed in the previous stream also contains the line items which we now need to extract and append to the existing **`line_items`** table.

Just like before, our goal is simple: ingest the data, transform it as required by the **`line_items`** table's schema, and append these new records to our existing table.

Note: we are processing the same stream twice - there are other patterns to do this more efficiently, but for this exercise, we want to keep the design simple.<br/>
The good news is that you can copy most of the code from the previous step to get started.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Make any other changes required to the column names and data types so that they conform to the **`line_items`** table's schema
    * The most significant transformation will be to the **`products`** column.
    * The **`products`** column is an array of elements and needs to be exploded (see **`explode`** in **`pyspark.sql.functions`**)
    * One solution would include:
      1. Select **`orderId`** as **`order_id`** and explode **`products`**, renaming the exploded column to **`product`**.
      2. Flatten the **`product`** column's nested values.
      3. Add the ingest metadata (**`ingest_file_name`** and **`ingested_at`**).
      4. Convert data types as required by the **`line_items`** table's schema.

* Write the stream to a Delta sink:
  * The sink's format should be "**delta**"
  * Records must be appended to the table identified by the variable **`line_items_table`**
  * The query must be named the same as the table, identified by the variable **`line_items_table`**
  * The query must use the checkpoint location identified by the variable **`line_items_checkpoint_path`**

### Implement Exercise #4.C

Implement your solution in the following cell:

```python
# TODO
# Use this cell to complete your solution
```