# DS-610 Week 8 Homework: Structured Streaming API in Apache Spark
This homework is adapted from:
https://docs.databricks.com/_extras/notebooks/source/stream-stream-joins-scala.html

In this homework we practice the Structured Streaming API in Apache Spark through a case study on simulated data. The data consists of simulated advertisement impressions and clicks on a mobile device. The goal is to find the set of advertisements that received a *meaningful click*. The word *meaningful* is defined loosely, because it is hard to guess the intentions of a user on a mobile device: some clicks are mistakes, and some clicks occur too long after the impression to be meaningful. For this homework we assume that each advertisement is shown exactly once in a sequence.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import pyspark
from pyspark.streaming import StreamingContext
import time

spark = SparkSession \
    .builder \
    .appName("Simulated Ad Impression and Click") \
    .getOrCreate()

# Starting Simulation Data Streams
We will use the `rate` source in Apache Spark to generate two streams of timestamped data. The first stream represents ad impressions: pairs of an advertisement ID and the timestamp at which the ad was displayed to the user. The second stream represents ad clicks: pairs of an advertisement ID and the timestamp at which the ad was clicked by the user.
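
To make the shape of the generated data concrete, here is a minimal pure-Python sketch of what a `rate` stream produces: a `value` counter starting at 0 and a `timestamp` that advances by `1 / rowsPerSecond` per row. The function name `simulate_rate_source` and the fixed start time are assumptions made for illustration; this is not how Spark generates the rows internally.

```python
from datetime import datetime, timedelta


def simulate_rate_source(start, rows_per_second, num_rows):
    """Return (timestamp, value) pairs resembling the rows of a `rate` stream."""
    # Time between consecutive rows, e.g. 200 ms at 5 rows per second.
    step = timedelta(seconds=1) / rows_per_second
    return [(start + i * step, i) for i in range(num_rows)]


# Hypothetical start time, chosen only for the example.
rows = simulate_rate_source(datetime(2025, 1, 20, 1, 18, 0), rows_per_second=5, num_rows=5)
for timestamp, value in rows:
    print(value, timestamp.time())
```

At 5 rows per second, consecutive rows are 200 ms apart, which matches the spacing of `impressionTime` in the tables printed later in this notebook.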

The goal of computational advertising is to maximize the chance that a user clicks on the advertisements that are displayed.

In [0]:
impressions = spark.readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
impressions = impressions.select(impressions.value.alias("shownAdId"), impressions.timestamp.alias("impressionTime"))

In [0]:
from pyspark.sql import functions as F

# Keep roughly 10% of the generated rows, so about 10 out of every 100 impressions result in a click
clicks = spark.readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load().where((F.rand() * 100).cast("integer") < 10)

# Subtract 50 so that the click for a given ad ID is generated about 10 seconds
# (50 rows at 5 rows per second) after the impression with the same ID.
clicks = clicks.select((clicks.value - 50).alias("clickedAdId"), clicks.timestamp.alias("clickTime")).where("clickedAdId > 0")
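
The effect of the `- 50` offset can be checked with a little arithmetic. The sketch below assumes that both streams start at about the same moment and emit rows at the same rate; the helper names are hypothetical and exist only for this illustration.

```python
ROWS_PER_SECOND = 5  # same rate for both streams, as configured above
ID_OFFSET = 50       # the clicks stream uses `value - 50` as the ad ID


def expected_click_delay_seconds(rows_per_second=ROWS_PER_SECOND, id_offset=ID_OFFSET):
    """Approximate gap between an impression and the click with the same ad ID."""
    return id_offset / rows_per_second


def click_ad_id(value, id_offset=ID_OFFSET):
    """Map a raw `value` of the clicks stream to an ad ID, or None if it is filtered out."""
    ad_id = value - id_offset
    return ad_id if ad_id > 0 else None


print(expected_click_delay_seconds())  # gap in seconds under the assumptions above
print(click_ad_id(63))                 # raw value 63 becomes ad ID 13
```

In an actual run the gap will differ slightly from this estimate, because the two streaming sources are not started at exactly the same instant.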

Below we inspect the contents of `impressions`. The first column is the ID of the advertisement (`shownAdId`) and the second column is the time at which it was displayed on the screen (`impressionTime`).

In [0]:
impressions_query = impressions.writeStream.queryName("impressions").format("memory").outputMode("append").start()
non_empty_count = 0
while non_empty_count < 3:
    result = spark.sql("SELECT * FROM impressions")
    row_count = result.count()
    if row_count == 0:
        continue
    non_empty_count += 1
    result.show(row_count, truncate=False)
    time.sleep(3)
impressions_query.stop()

+---------+-----------------------+
|shownAdId|impressionTime         |
+---------+-----------------------+
|0        |2025-01-20 01:18:00.983|
|1        |2025-01-20 01:18:01.183|
|2        |2025-01-20 01:18:01.383|
|3        |2025-01-20 01:18:01.583|
|4        |2025-01-20 01:18:01.783|
+---------+-----------------------+

+---------+-----------------------+
|shownAdId|impressionTime         |
+---------+-----------------------+
|0        |2025-01-20 01:18:00.983|
|1        |2025-01-20 01:18:01.183|
|2        |2025-01-20 01:18:01.383|
|3        |2025-01-20 01:18:01.583|
|4        |2025-01-20 01:18:01.783|
|5        |2025-01-20 01:18:01.983|
|6        |2025-01-20 01:18:02.183|
|7        |2025-01-20 01:18:02.383|
|8        |2025-01-20 01:18:02.583|
|9        |2025-01-20 01:18:02.783|
|10       |2025-01-20 01:18:02.983|
|11       |2025-01-20 01:18:03.183|
|12       |2025-01-20 01:18:03.383|
|13       |2025-01-20 01:18:03.583|
|14       |2025-01-20 01:18:03.783|
+---------+-----------------------+

Similarly, we inspect `clicks`, where the first column is the advertisement ID (`clickedAdId`) and the second column is the time at which the advertisement was clicked (`clickTime`).

In [0]:
clicks_query = clicks.writeStream.queryName("clicks").format("memory").outputMode("append").start()
non_empty_count = 0
while non_empty_count < 3:
    result = spark.sql("select * from clicks")
    row_count = result.count()
    if row_count == 0:
        continue
    non_empty_count += 1
    result.show(row_count, truncate=False)
    time.sleep(3)
clicks_query.stop()

+-----------+-----------------------+
|clickedAdId|clickTime              |
+-----------+-----------------------+
|13         |2025-01-20 01:18:25.955|
+-----------+-----------------------+

+-----------+-----------------------+
|clickedAdId|clickTime              |
+-----------+-----------------------+
|13         |2025-01-20 01:18:25.955|
+-----------+-----------------------+

+-----------+-----------------------+
|clickedAdId|clickTime              |
+-----------+-----------------------+
|13         |2025-01-20 01:18:25.955|
|31         |2025-01-20 01:18:29.555|
|33         |2025-01-20 01:18:29.955|
+-----------+-----------------------+



## Part 1: Inner Join of Streaming DataFrames
### Part 1a
Write code that joins the streaming DataFrame `impressions` with the streaming DataFrame `clicks` on the condition `shownAdId = clickedAdId`. Then create a `StreamingQuery` by calling `writeStream`. Use `append` as the `outputMode` and `memory` as the `format`. Set the query name via the `queryName` method. Finally, fire off the query by calling the `start` method on it. For reference, the Scala version of the code is available at:
https://docs.databricks.com/_extras/notebooks/source/stream-stream-joins-scala.html

For Python API, please refer to:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Part 1a requires joining on two differently named columns, `shownAdId` and `clickedAdId`, so the following may be useful:
https://stackoverflow.com/questions/43675465/how-to-pass-join-condition-as-a-parameter-to-spark-dataframe-joins

**Note.** For this homework, the result of the inner join is written to memory, but other sink types are also available. For reference, please see the links in this cell.
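
To build intuition for what an inner join of two streams has to do, here is a conceptual pure-Python sketch: each arriving event is buffered on its own side and immediately matched against everything buffered on the other side. This is only an illustration of the idea that past input must be kept as state; the function `stream_stream_inner_join` and the event tuples are assumptions made for this example, not Spark's implementation.

```python
from collections import defaultdict


def stream_stream_inner_join(events):
    """Join interleaved impression and click events on ad ID.

    `events` is an iterable of (side, ad_id, timestamp) tuples, where side is
    "impression" or "click". Yields (ad_id, impression_time, click_time) as soon
    as both sides of a match have been seen.
    """
    # Buffered past input for each side, keyed by ad ID.
    buffers = {"impression": defaultdict(list), "click": defaultdict(list)}
    for side, ad_id, timestamp in events:
        other = "click" if side == "impression" else "impression"
        buffers[side][ad_id].append(timestamp)
        # Probe the other side's buffer for rows with the same ad ID.
        for other_timestamp in buffers[other].get(ad_id, []):
            if side == "impression":
                yield (ad_id, timestamp, other_timestamp)
            else:
                yield (ad_id, other_timestamp, timestamp)


events = [
    ("impression", 1, 0.0),
    ("impression", 2, 0.2),
    ("click", 1, 10.0),
    ("click", 7, 11.0),  # no matching impression, so it never appears in the output
]
matches = list(stream_stream_inner_join(events))
print(matches)
```

Because nothing is ever removed from the buffers in this sketch, the state grows without bound, which is one reason to add time constraints to a stream-stream join.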

In [0]:
# Part 1a: start a StreamingQuery that is the result of an inner join
# between the two streaming DataFrames, impressions and clicks.
from pyspark.sql.functions import expr
joined_stream_name = 'inner_join_impression_click'

inner_join_query = (
    impressions.join(clicks, expr("shownAdId = clickedAdId"))
    .writeStream
    .outputMode('append')
    .format('memory')
    .queryName(joined_stream_name)
    .start()
)


In [0]:
# Poll the in-memory table of the running query and print its contents three times
non_empty_count = 0
while non_empty_count < 3:
    result = spark.sql("SELECT * FROM inner_join_impression_click")
    row_count = result.count()
    if row_count == 0:
        continue
    non_empty_count += 1
    result.show(row_count, truncate=False)
    time.sleep(3)

+---------+-----------------------+-----------+----------------------+
|shownAdId|impressionTime         |clickedAdId|clickTime             |
+---------+-----------------------+-----------+----------------------+
|29       |2025-01-20 01:18:44.045|29         |2025-01-20 01:18:54.33|
|54       |2025-01-20 01:18:49.045|54         |2025-01-20 01:18:59.33|
|95       |2025-01-20 01:18:57.245|95         |2025-01-20 01:19:07.53|
|3        |2025-01-20 01:18:38.845|3          |2025-01-20 01:18:49.13|
|37       |2025-01-20 01:18:45.645|37         |2025-01-20 01:18:55.93|
|12       |2025-01-20 01:18:40.645|12         |2025-01-20 01:18:50.93|
|66       |2025-01-20 01:18:51.445|66         |2025-01-20 01:19:01.73|
|13       |2025-01-20 01:18:40.845|13         |2025-01-20 01:18:51.13|
|36       |2025-01-20 01:18:45.445|36         |2025-01-20 01:18:55.73|
|14       |2025-01-20 01:18:41.045|14         |2025-01-20 01:18:51.33|
|38       |2025-01-20 01:18:45.845|38         |2025-01-20 01:18:56.13|
|53   

In [0]:
# Stop the query; ignore errors if it has already terminated
try:
    inner_join_query.stop()
    inner_join_query.awaitTermination()
except Exception:
    pass

### Part 1b
Inspect the output of your streaming query. For the moment, please ignore any warning messages in the cell output. Instead, examine the outputs of the SELECT statements. What do you notice about the tables that are printed?

**Answer to Part 1b**: 
The streaming query shows new rows being added over time while older rows stay unchanged. In every row `shownAdId` equals `clickedAdId`, which confirms that each click is matched to its impression, and each `clickTime` is roughly 10 seconds after the corresponding `impressionTime`. This is a simple example of how append-mode streaming output works: new results are added while past results are kept intact.
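
The append-only behaviour described in this answer can be mimicked with a small pure-Python sketch. The class `AppendOnlyTable` is a hypothetical stand-in for the in-memory table; the rows are taken from the `clicks` output shown earlier.

```python
class AppendOnlyTable:
    """Toy stand-in for an in-memory table that is written in append mode."""

    def __init__(self):
        self._rows = []

    def append_batch(self, rows):
        # New rows are added at the end; existing rows are never modified.
        self._rows.extend(rows)

    def snapshot(self):
        # What a `SELECT *` would return at this moment.
        return list(self._rows)


table = AppendOnlyTable()
snapshots = []
batches = [
    [(13, "01:18:25.955")],
    [],  # a polling interval in which no new row arrived
    [(31, "01:18:29.555"), (33, "01:18:29.955")],
]
for batch in batches:
    table.append_batch(batch)
    snapshots.append(table.snapshot())

for snapshot in snapshots:
    print(snapshot)
```

Each snapshot starts with the rows of the previous one, which is the pattern visible in the printed tables.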

## Part 2: Adding More Logic to Inner Join
### Part 2a
Note that a plain inner join will happily join an impression with a click that occurs arbitrarily far away in time. Normally in computational advertising, we would want to constrain the impression/click join to within a minute or two. This is especially true on a mobile device.

Write code that enforces that a click must occur within 0 seconds to 1 minute after the corresponding impression. Specify this as a join condition between `impressionTime` and `clickTime`.
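
The time-range requirement can be stated as a simple predicate. The sketch below expresses it in plain Python with `datetime` values; the function name `is_meaningful_click` and the constant `MAX_CLICK_DELAY` are assumptions made for illustration. In Spark SQL, one plausible way to write the same bounds is `clickTime >= impressionTime AND clickTime <= impressionTime + interval 1 minute`.

```python
from datetime import datetime, timedelta

MAX_CLICK_DELAY = timedelta(minutes=1)  # upper bound stated in the task


def is_meaningful_click(impression_time, click_time, max_delay=MAX_CLICK_DELAY):
    """True if the click happens 0 seconds to `max_delay` after the impression."""
    return impression_time <= click_time <= impression_time + max_delay


shown = datetime(2025, 1, 20, 1, 36, 19)
print(is_meaningful_click(shown, shown + timedelta(seconds=10)))  # inside the window
print(is_meaningful_click(shown, shown + timedelta(minutes=5)))   # too late
print(is_meaningful_click(shown, shown - timedelta(seconds=1)))   # click before impression
```

Both bounds matter: the lower bound rejects clicks that precede the impression, and the upper bound rejects clicks that arrive too late to be attributed to it.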

In [0]:
# Part 2a: inner join restricted to clicks that occur within
# 0 seconds to 1 minute after the corresponding impression.
from pyspark.sql.functions import expr
restricted_joined_stream_name = 'rest_inner_join_impression_click'

join_condition = expr("""
    shownAdId = clickedAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 minute
""")

rest_inner_join_query = (
    impressions.join(clicks, join_condition)
    .writeStream
    .outputMode('append')
    .format('memory')
    .queryName(restricted_joined_stream_name)
    .start()
)

In [0]:
# Poll the in-memory table of the running query and print its contents three times
non_empty_count = 0
while non_empty_count < 3:
    result = spark.sql("SELECT * FROM rest_inner_join_impression_click")
    row_count = result.count()
    if row_count == 0:
        continue
    non_empty_count += 1
    result.show(row_count, truncate=False)
    time.sleep(3)

+---------+-----------------------+-----------+-----------------------+
|shownAdId|impressionTime         |clickedAdId|clickTime              |
+---------+-----------------------+-----------+-----------------------+
|19       |2025-01-20 01:36:19.606|19         |2025-01-20 01:36:29.956|
|95       |2025-01-20 01:36:34.806|95         |2025-01-20 01:36:45.156|
|6        |2025-01-20 01:36:17.006|6          |2025-01-20 01:36:27.356|
|88       |2025-01-20 01:36:33.406|88         |2025-01-20 01:36:43.756|
|1        |2025-01-20 01:36:16.006|1          |2025-01-20 01:36:26.356|
|69       |2025-01-20 01:36:29.606|69         |2025-01-20 01:36:39.956|
|59       |2025-01-20 01:36:27.606|59         |2025-01-20 01:36:37.956|
|47       |2025-01-20 01:36:25.206|47         |2025-01-20 01:36:35.556|
+---------+-----------------------+-----------+-----------------------+

+---------+-----------------------+-----------+-----------------------+
|shownAdId|impressionTime         |clickedAdId|clickTime       

In [0]:
# Stop the query; ignore errors if it has already terminated
try:
    rest_inner_join_query.stop()
    rest_inner_join_query.awaitTermination()
except Exception:
    pass