
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Coupon Sales Lab
Process streaming transaction data and append the transactions that used coupons to a Delta table.
1. Read data stream
2. Filter for transactions with coupon codes
3. Write streaming query results to Delta
4. Monitor streaming query
5. Stop streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>

In [0]:
%run ../Includes/Classroom-Setup

### 1. Read data stream
- Set to process 1 file per trigger
- Read from Delta files in the source directory specified by **`sales_path`**

Assign the resulting DataFrame to **`df`**.

In [0]:
# TODO
df = (spark
      .readStream
      .option("maxFilesPerTrigger", 1)
      .format("delta")
      .load(sales_path)
)

df.isStreaming

**1.1: CHECK YOUR WORK**

In [0]:
assert df.isStreaming
assert df.columns == ["order_id", "email", "transaction_timestamp", "total_item_quantity", "purchase_revenue_in_usd", "unique_items", "items"]
print("All tests pass")

### 2. Filter for transactions with coupon codes
- Explode the **`items`** field in **`df`** with the results replacing the existing **`items`** field
- Filter for records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`coupon_sales_df`**.
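
To make the two steps above concrete, here is a plain-Python analogue (not Spark code) of what exploding **`items`** and then filtering on **`items.coupon`** does to each record. The sample orders and the `explode_items` helper are hypothetical and made up for illustration; the real records carry more fields.

```python
# Plain-Python analogue of explode + filter; illustrative only, not Spark code.
# The sample records and the helper name are hypothetical.

def explode_items(orders):
    """Yield one record per element of each order's "items" list.

    Like Spark's explode, an order whose items list is empty or null
    produces no output rows.
    """
    for order in orders:
        for item in order.get("items") or []:
            # Copy the order and replace the list with a single item
            yield {**order, "items": item}


orders = [
    {"order_id": 1, "items": [{"item_id": "A", "coupon": "SAVE10"},
                              {"item_id": "B", "coupon": None}]},
    {"order_id": 2, "items": [{"item_id": "C", "coupon": None}]},
]

# Step 1: one row per purchased item
exploded = list(explode_items(orders))

# Step 2: keep only the rows whose item has a coupon
coupon_sales = [row for row in exploded if row["items"]["coupon"] is not None]

print(len(exploded))   # 3
print(coupon_sales)
```

After the explode, **`items`** holds a single struct per row rather than an array, which is why the filter can address **`items.coupon`** directly.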

In [0]:
from pyspark.sql.functions import col, explode

# TODO
coupon_sales_df = (df
                   # Replace the items array with one row per item
                   .withColumn("items", explode(col("items")))
                   # Keep only items purchased with a coupon
                   .filter(col("items.coupon").isNotNull())
)

**2.1: CHECK YOUR WORK**

In [0]:
schema_str = str(coupon_sales_df.schema)
assert "StructField(items,StructType(List(StructField(coupon" in schema_str, "items column was not exploded"
print("All tests pass")

### 3. Write streaming query results to Delta
- Configure the streaming query to write Delta format files in "append" mode
- Set the query name to "coupon_sales"
- Set a trigger interval of 1 second
- Set the checkpoint location to **`coupons_checkpoint_path`**
- Set the output path to **`coupons_output_path`**

Start the streaming query and assign the resulting handle to **`coupon_sales_query`**.

In [0]:
# TODO
coupons_checkpoint_path = working_dir + "/coupon-sales/checkpoint"
coupons_output_path = working_dir + "/coupon-sales/output"

coupon_sales_query = (coupon_sales_df
                      .writeStream
                      .outputMode("append")
                      .format("delta")
                      .queryName("coupon_sales")
                      .trigger(processingTime="1 second")
                      .option("checkpointLocation", coupons_checkpoint_path)
                      .start(coupons_output_path)
)

**3.1: CHECK YOUR WORK**

In [0]:
until_stream_is_ready("coupon_sales")
assert coupon_sales_query.isActive
assert len(dbutils.fs.ls(coupons_output_path)) > 0
assert len(dbutils.fs.ls(coupons_checkpoint_path)) > 0
assert "coupon_sales" in coupon_sales_query.lastProgress["name"]
print("All tests pass")

### 4. Monitor streaming query
- Get the ID of the streaming query and store it in **`query_id`**
- Get the status of the streaming query and store it in **`query_status`**
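
As a sketch of how these two values might be used together, the hypothetical helper below turns a status dictionary into a one-line summary. It assumes the three keys that the check in 4.1 expects (`message`, `isDataAvailable`, `isTriggerActive`), and its wording for each state is only a rough interpretation. The sample dictionary and query ID are made up; in the notebook you would pass `coupon_sales_query.id` and `coupon_sales_query.status` instead.

```python
def summarize_status(query_id, status):
    """Return a one-line, human-readable summary of a streaming query status.

    status is expected to be a dict with the keys "message",
    "isDataAvailable", and "isTriggerActive".
    """
    if status["isTriggerActive"]:
        state = "trigger active"
    elif status["isDataAvailable"]:
        state = "data available, waiting for the next trigger"
    else:
        state = "idle, no new data"
    return f"{query_id}: {state} ({status['message']})"


# Hypothetical sample values, not output from a real query
sample_status = {
    "message": "Waiting for data to arrive",
    "isDataAvailable": False,
    "isTriggerActive": False,
}
print(summarize_status("hypothetical-query-id", sample_status))
```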

In [0]:
# TODO
query_id = coupon_sales_query.id
#print(query_id)

In [0]:
# TODO
query_status = coupon_sales_query.status

**4.1: CHECK YOUR WORK**

In [0]:
assert type(query_id) == str
assert list(query_status.keys()) == ["message", "isDataAvailable", "isTriggerActive"]
print("All tests pass")

### 5. Stop streaming query
- Stop the streaming query

In [0]:
# TODO
coupon_sales_query.stop()

**5.1: CHECK YOUR WORK**

In [0]:
assert not coupon_sales_query.isActive
print("All tests pass")

### 6. Verify the records were written in Delta format
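
One possible approach to this step, sketched under assumptions: a Delta table directory contains a `_delta_log` folder (the transaction log) alongside its Parquet data files, so a listing of the output path can be checked for that folder. The helper name `looks_like_delta_table` and the sample listing are hypothetical, chosen only for illustration.

```python
def looks_like_delta_table(entry_names):
    """Return True if a directory listing contains a Delta transaction log.

    entry_names: names of the files and directories directly under the
    table path. Directory names may carry a trailing slash, which is ignored.
    """
    names = {name.rstrip("/") for name in entry_names}
    return "_delta_log" in names


# Hypothetical listing of an output directory
sample_listing = ["_delta_log/", "part-00000-example.snappy.parquet"]

print(looks_like_delta_table(sample_listing))                         # True
print(looks_like_delta_table(["part-00000-example.snappy.parquet"]))  # False
```

In the notebook, the names could come from `[f.name for f in dbutils.fs.ls(coupons_output_path)]`. Reading the path back with `spark.read.format("delta").load(coupons_output_path)` is a more direct check, since the read fails if the path is not a Delta table.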

In [0]:
# TODO

### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
classroom_cleanup()

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>