# Coupon Sales Lab
Process and append streaming data on transactions using coupons.
1. Read data stream
2. Filter for transactions with coupon codes
3. Write streaming query results to Delta
4. Monitor streaming query
5. Stop streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html" target="_blank">StreamingQuery</a>

In [0]:
%run ../Includes/Classroom-Setup-5.1a

Python interpreter will be restarted.



Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03"


Validating the locally installed datasets...(3 seconds)

Creating & using the schema "da_jtschopp_1061_asp"...(0 seconds)

Predefined tables in "da_jtschopp_1061_asp":
  -none-

Predefined paths variables:
  DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03
  DA.paths.user_db:     dbfs:/mnt/dbacademy-users/jtschopp@u.rochester.edu/apache-spark-programming-with-databricks/database.db
  DA.paths.working_dir: dbfs:/mnt/dbacademy-users/jtschopp@u.rochester.edu/apache-spark-programming-with-databricks
  DA.paths.checkpoints: dbfs:/mnt/dbacademy-users/jtschopp@u.rochester.edu/apache-spark-programming-with-databricks/_checkpoints
  DA.paths.sales:       dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03/ecommerce/sales/sales.delta
  DA.paths.users:       dbfs:/mnt/dbacademy-datasets/apache-spark-programming

### 1. Read data stream
- Set to process 1 file per trigger
- Read from Delta files in the source directory specified by **`DA.paths.sales`**

Assign the resulting DataFrame to **`df`**.
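
To make the effect of `maxFilesPerTrigger` concrete, here is a plain-Python sketch (not Spark code) of how a backlog of source files would be divided into micro-batches when a fixed number of files is admitted per trigger. The function name `plan_micro_batches` and the file names are hypothetical, and the model ignores details such as file ordering and trigger timing that Spark handles internally.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split a backlog of files into micro-batches of bounded size (toy model)."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical backlog of three files in the source directory.
backlog = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]
batches = plan_micro_batches(backlog, max_files_per_trigger=1)
print(batches)
```

With a limit of 1, each file becomes its own micro-batch, which is why a small limit is convenient for watching a stream advance step by step in a lab setting.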

In [0]:
# Stream the sales Delta table, admitting one file per micro-batch
df = (spark
      .readStream
      .option("maxFilesPerTrigger", 1)
      .format("delta")
      .load(DA.paths.sales)
)

**1.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_1_1(df)

| Points | Test | Result |
|---|---|---|
| 1 | The query is streaming | |
| 1 | DataFrame contains all 7 columns | |


### 2. Filter for transactions with coupon codes
- Explode the **`items`** field in **`df`** with the results replacing the existing **`items`** field
- Filter for records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`coupon_sales_df`**.
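
The two steps above can be modeled in plain Python to show what they do to the row count: exploding produces one output row per array element, and the filter then drops rows whose item has no coupon. This is only an illustration of the semantics, not Spark code. The helper names and the sample orders are hypothetical, and `item_id` is an assumed field name (only `coupon` is named in this lab).

```python
from typing import Any, Dict, Iterable, Iterator, List

Row = Dict[str, Any]


def explode_items(rows: Iterable[Row]) -> Iterator[Row]:
    """Yield one row per element of `items`, replacing the array with that element."""
    for row in rows:
        # A null or empty array contributes no output rows.
        for item in row["items"] or []:
            yield {**row, "items": item}


def with_coupon(rows: Iterable[Row]) -> List[Row]:
    """Keep only rows whose exploded item carries a non-null coupon."""
    return [r for r in rows if r["items"].get("coupon") is not None]


# Hypothetical sample: order 1 has one couponed item and one without; order 2 has none.
orders = [
    {"order_id": 1, "items": [{"coupon": "NEWBED10", "item_id": "M_STAN_Q"},
                              {"coupon": None, "item_id": "P_FOAM_S"}]},
    {"order_id": 2, "items": [{"coupon": None, "item_id": "M_STAN_T"}]},
]
coupon_rows = with_coupon(explode_items(orders))
print(coupon_rows)
```

Because the exploded element replaces the original array, the number of columns stays the same while `items` changes from an array to a single struct.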

In [0]:
# Explode items into one row per item, then keep rows that used a coupon

from pyspark.sql.functions import col, explode

coupon_sales_df = (df
                   .withColumn("items", explode(col("items")))
                   .filter(col("items.coupon").isNotNull())
                  )

**2.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_2_1(coupon_sales_df.schema)

| Points | Test | Result |
|---|---|---|
| 1 | Schema is of type StructType | |
| 1 | Schema contains seven fields | |
| 1 | Schema contains "order_id" of type LongType (nullable=None) | |
| 1 | Schema contains "email" of type StringType (nullable=None) | |
| 1 | Schema contains "transaction_timestamp" of type LongType (nullable=None) | |
| 1 | Schema contains "total_item_quantity" of type LongType (nullable=None) | |
| 1 | Schema contains "purchase_revenue_in_usd" of type DoubleType (nullable=None) | |
| 1 | Schema contains "unique_items" of type LongType (nullable=None) | |
| 1 | Schema contains "items" of type StructType (nullable=None) | |


### 3. Write streaming query results to Delta
- Configure the streaming query to write Delta format files in "append" mode
- Set the query name to "coupon_sales"
- Set a trigger interval of 1 second
- Set the checkpoint location to **`coupons_checkpoint_path`**
- Set the output path to **`coupons_output_path`**

Start the streaming query and assign the resulting handle to **`coupon_sales_query`**.
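
As a rough intuition for why an append-mode query is paired with a checkpoint location, the toy model below keeps the ID of the last committed batch next to the output and skips any batch that is replayed after a restart. It is a simplified sketch under that assumption, not a description of how Spark or Delta implement this internally. The class `CheckpointedAppendSink` and the sample rows are hypothetical.

```python
from typing import List


class CheckpointedAppendSink:
    """Toy append-only sink that remembers the last committed batch ID."""

    def __init__(self) -> None:
        self.rows: List[dict] = []        # stands in for the output path
        self.last_committed_batch = -1    # stands in for the checkpoint

    def write_batch(self, batch_id: int, rows: List[dict]) -> bool:
        """Append rows unless this batch was already committed; return True if written."""
        if batch_id <= self.last_committed_batch:
            return False  # replayed batch: skip it to avoid duplicate rows
        self.rows.extend(rows)
        self.last_committed_batch = batch_id
        return True


sink = CheckpointedAppendSink()
sink.write_batch(0, [{"order_id": 1}])
sink.write_batch(1, [{"order_id": 2}])
replayed = sink.write_batch(1, [{"order_id": 2}])  # simulated retry of batch 1
print(len(sink.rows), replayed)
```

In the lab itself, keeping the checkpoint and output directories separate and reusing the same checkpoint path across restarts is what lets the query resume rather than start over.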

In [0]:
# Write coupon sales to Delta in append mode, checkpointing progress
coupons_checkpoint_path = f"{DA.paths.checkpoints}/coupon-sales"
coupons_output_path = f"{DA.paths.working_dir}/coupon-sales/output"

coupon_sales_query = (coupon_sales_df.writeStream
                      .outputMode("append")
                      .format("delta")
                      .queryName("coupon_sales")
                      .trigger(processingTime="1 second")
                      .option("checkpointLocation", coupons_checkpoint_path)
                      .start(coupons_output_path))

DA.block_until_stream_is_ready(coupon_sales_query)

Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 1 of 2 batches...
Processed 1 of 2 batches...
Processed 2 of 2 batches...
The stream is now active with 2 batches having been processed.


**3.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_3_1(coupon_sales_query)

| Points | Test | Result |
|---|---|---|
| 1 | The query is active | |
| 1 | The query name is "coupon_sales". | |
| 1 | Found at least one file in .../coupon-sales/output | |
| 1 | Found at least one file in .../coupon-sales | |


### 4. Monitor streaming query
- Get the ID of the streaming query and store it in **`query_id`**
- Get the status of the streaming query and store it in **`query_status`**
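
Beyond `id` and `status`, a query handle exposes a few other attributes that are handy when monitoring, such as `name`, `isActive`, and `lastProgress`. The sketch below gathers them into one dictionary. The helper `summarize_query` is hypothetical, and the stand-in object with made-up values exists only so the example runs without a cluster; in the lab you would pass `coupon_sales_query` instead. The exact keys in `status` and `lastProgress` may vary by Spark version, so treat the ones used here as assumptions.

```python
from types import SimpleNamespace
from typing import Any, Dict


def summarize_query(query: Any) -> Dict[str, Any]:
    """Collect a few monitoring fields from a StreamingQuery-like object."""
    progress = query.lastProgress  # None until the first micro-batch completes
    return {
        "id": str(query.id),
        "name": query.name,
        "is_active": query.isActive,
        "status_message": query.status["message"],
        "last_batch_id": None if progress is None else progress["batchId"],
    }


# Stand-in object with made-up values (not a real StreamingQuery).
fake_query = SimpleNamespace(
    id="00000000-0000-0000-0000-000000000000",
    name="coupon_sales",
    isActive=True,
    status={"message": "Waiting for data to arrive",
            "isDataAvailable": False,
            "isTriggerActive": False},
    lastProgress={"batchId": 1, "numInputRows": 0},
)
summary = summarize_query(fake_query)
print(summary)
```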

In [0]:
# Unique ID of the streaming query
query_id = coupon_sales_query.id

In [0]:
# Current status of the streaming query
query_status = coupon_sales_query.status

**4.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_4_1(query_id, query_status)

| Points | Test | Result |
|---|---|---|
| 1 | Valid status value. | |
| 1 | Valid query_id value. | |


### 5. Stop streaming query
- Stop the streaming query
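
One way to stop a query and confirm that it has actually shut down is to call `stop()`, wait with a bounded `awaitTermination` timeout, and then check `isActive`. The sketch below wraps that pattern in a hypothetical helper, `stop_and_confirm`. `FakeQuery` is a minimal stand-in (not a Spark class) so the example runs without a cluster, and the 30-second default is an arbitrary assumption.

```python
from typing import Any, Optional


def stop_and_confirm(query: Any, timeout_s: int = 30) -> bool:
    """Stop a StreamingQuery-like object and report whether it terminated in time."""
    query.stop()
    # With a timeout, awaitTermination reports whether the query has terminated.
    terminated = query.awaitTermination(timeout_s)
    return bool(terminated) and not query.isActive


class FakeQuery:
    """Minimal stand-in so the sketch runs without a cluster (not a Spark class)."""

    def __init__(self) -> None:
        self.isActive = True

    def stop(self) -> None:
        self.isActive = False

    def awaitTermination(self, timeout: Optional[int] = None) -> bool:
        return not self.isActive


fake = FakeQuery()
stopped = stop_and_confirm(fake)
print(stopped)
```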

In [0]:
# Stop the query, then wait until it has fully terminated
coupon_sales_query.stop()
coupon_sales_query.awaitTermination()

**5.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_5_1(coupon_sales_query)

| Points | Test | Result |
|---|---|---|
| 1 | The query is not active | |


### 6. Verify the records were written in Delta format

In [0]:
# Read the Delta output back as a static DataFrame to confirm what was written
display(spark.read.format("delta").load(coupons_output_path))

| order_id | email | transaction_timestamp | total_item_quantity | purchase_revenue_in_usd | unique_items | items |
|---|---|---|---|---|---|---|
| 282611 | bmurillo@hotmail.com | 1592504237604072 | 1 | 940.5 | 1 | List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1) |
| 283949 | whardin@hotmail.com | 1592510720760323 | 1 | 535.5 | 1 | List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1) |
| 264191 | maxwelltara@edwards.com | 1592306255847870 | 2 | 993.6 | 2 | List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1) |
| 264191 | maxwelltara@edwards.com | 1592306255847870 | 2 | 993.6 | 2 | List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1) |
| 286727 | rojasjorge@yahoo.com | 1592533048926949 | 1 | 535.5 | 1 | List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1) |
| 287573 | marmstrong46@hotmail.com | 1592547727220317 | 1 | 53.1 | 1 | List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1) |
| 271597 | johnsonderrick@yahoo.com | 1592399808941985 | 5 | 2846.7 | 5 | List(NEWBED10, M_STAN_K, Standard King Mattress, 1075.5, 1195.0, 1) |
| 271597 | johnsonderrick@yahoo.com | 1592399808941985 | 5 | 2846.7 | 5 | List(NEWBED10, P_DOWN_K, King Down Pillow, 143.1, 159.0, 1) |
| 271597 | johnsonderrick@yahoo.com | 1592399808941985 | 5 | 2846.7 | 5 | List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1) |
| 271597 | johnsonderrick@yahoo.com | 1592399808941985 | 5 | 2846.7 | 5 | List(NEWBED10, M_PREM_T, Premium Twin Mattress, 985.5, 1095.0, 1) |


### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
DA.cleanup()

Resetting the learning environment...
...dropping the schema "da_jtschopp_1061_asp"...(0 seconds)
...removing the working directory "dbfs:/mnt/dbacademy-users/jtschopp@u.rochester.edu/apache-spark-programming-with-databricks"...(0 seconds)


Validating the locally installed datasets...(4 seconds)
