
# Setting up PySpark

In [1]:
%pip install pyspark



In [2]:
%pip install faker

Collecting faker
  Downloading Faker-33.1.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-33.1.0-py3-none-any.whl (1.9 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.9/1.9 MB 13.8 MB/s eta 0:00:00
Installing collected packages: faker
Successfully installed faker-33.1.0


# Context
Message events come from the platform's message broker (Kafka, Pub/Sub, Kinesis, ...).
You need to process the data according to the requirements.

Message schema:
- timestamp
- value
- event_type
- message_id
- channel
- country_id
- user_id



# Challenge 1

Step 1
- Change existing producer
	- Change parquet location to "/content/lake/bronze/messages/data"
	- Add checkpoint (/content/lake/bronze/messages/checkpoint)
	- Delete /content/lake/bronze/messages and reprocess data
	- For reprocessing, run the streaming for at least 1 minute, then stop it

Step 2
- Implement a new streaming job that reads messages from the bronze layer and splits the result into two locations
	- "messages_corrupted"
		- logic: event_type is null, empty or equal to "NONE"
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" (extracted from timestamp)
		- location: /content/lake/silver/messages_corrupted/data

	- "messages"
		- logic: not corrupted data
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" (extracted from timestamp)
		- location: /content/lake/silver/messages/data

	- technical requirements
		- add checkpoint (choose location)
		- use an explicit StructType schema
		- Set trigger interval to 5 seconds
		- run streaming for at least 20 seconds, then stop it

	- alternatives
		- implement a single streaming job with foreach/foreachBatch logic that writes to two locations
		- implement two streaming jobs, one for messages and another for messages_corrupted
		- (paying attention to the paths and checkpoints)


  - Check results:
    - the number of messages in the bronze layer should match the sum of messages + messages_corrupted in the silver layer
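The Step 2 split rule can be checked in plain Python before wiring it into Spark. This is a minimal sketch, not the notebook's implementation: `is_corrupted`, `split_by_rule` and the sample rows are hypothetical, and `None` stands in for a null `event_type`. It also shows why the final check should hold: every row lands in exactly one of the two outputs, under its date partition.

```python
from datetime import datetime

# event_type values that mark a message as corrupted (None stands for null).
CORRUPTED_EVENT_TYPES = {None, "", "NONE"}


def is_corrupted(event_type):
    """Return True if the message belongs in messages_corrupted."""
    return event_type in CORRUPTED_EVENT_TYPES


def split_by_rule(rows):
    """Route each row to one of the two silver outputs, grouped by its date partition."""
    out = {"messages": {}, "messages_corrupted": {}}
    for row in rows:
        target = "messages_corrupted" if is_corrupted(row["event_type"]) else "messages"
        date = row["timestamp"].date().isoformat()  # partition value taken from timestamp
        out[target].setdefault(date, []).append(row)
    return out


# Made-up rows for illustration only.
rows = [
    {"timestamp": datetime(2024, 12, 11, 17, 45, 1), "event_type": "OPEN"},
    {"timestamp": datetime(2024, 12, 11, 17, 45, 2), "event_type": "NONE"},
    {"timestamp": datetime(2024, 12, 11, 17, 45, 3), "event_type": ""},
    {"timestamp": datetime(2024, 12, 12, 0, 0, 1), "event_type": None},
    {"timestamp": datetime(2024, 12, 12, 0, 0, 2), "event_type": "SENT"},
]
result = split_by_rule(rows)
total = sum(len(part) for output in result.values() for part in output.values())
print(total == len(rows))  # every bronze row lands in exactly one output
```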

## Producer

In [3]:
import os
import shutil
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

# Create instance Faker
fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

def enrich_data(df, messages=messages):
    fake = Faker()
    new_columns = {
        'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
        'message_id': F.lit(fake.random_element(elements=messages)),
        'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
        'country_id': F.lit(fake.random_int(min=2000, max=2015)),
        'user_id': F.lit(fake.random_int(min=1000, max=1050)),
    }


    for col_name, col_value in new_columns.items():
        df = df.withColumn(col_name, col_value)

    return df


def insert_messages(df: DataFrame, batch_id):
    enrich = enrich_data(df)
    enrich.write.mode("append").format("parquet").save("/content/lake/bronze/messages/data")


def clean_directories_with_shutil():
    parquet_location = "/content/lake/bronze/messages/data"
    checkpoint_location = "/content/lake/bronze/messages/checkpoint"

    # Check and remove directories
    if os.path.exists(parquet_location):
        shutil.rmtree(parquet_location)  # Remove any old Parquet directory
    if os.path.exists(checkpoint_location):
        shutil.rmtree(checkpoint_location)  # Remove any old Checkpoint directory

# Remove any old data
clean_directories_with_shutil()

# readStream the data
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# writeStream the data and create the Checkpoint
query = (df_stream.writeStream
    .outputMode('append')
    .trigger(processingTime='1 seconds')
    .foreachBatch(insert_messages)
    .option("checkpointLocation", "/content/lake/bronze/messages/checkpoint")
    .start()
)

# Run streaming for 1 minute (60 seconds)
query.awaitTermination(60)

# Stop streaming
query.stop()
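In `enrich_data`, Faker draws each random value once on the driver and `F.lit` turns it into a constant column, so all rows of one micro-batch share the same `event_type`, `message_id`, `channel`, `country_id` and `user_id`. This is consistent with the bronze rows with `value` 0 to 5 all being `OPEN`/`CHAT`/2011/1022. The plain-Python model below illustrates that behavior only; `enrich_batch`, `EVENT_TYPES` and the seed are hypothetical.

```python
import random

# Event types offered by enrich_data in the producer.
EVENT_TYPES = ("OPEN", "RECEIVED", "SENT", "CREATED", "CLICKED", "", "NONE")


def enrich_batch(values, rng):
    """Plain-Python model of enrich_data for one micro-batch.

    The random choice is made once per batch and copied to every row,
    mirroring a driver-side value wrapped in F.lit.
    """
    event_type = rng.choice(EVENT_TYPES)
    return [{"value": v, "event_type": event_type} for v in values]


rng = random.Random(42)  # fixed seed so the sketch is reproducible
batches = [enrich_batch(range(i * 3, i * 3 + 3), rng) for i in range(4)]
for batch in batches:
    # Each set has exactly one element: the value is constant within a batch.
    print(sorted({row["event_type"] for row in batch}))
```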

In [4]:
df_bronze = spark.read.format("parquet").load("/content/lake/bronze/messages/data")
df_bronze.show()

+--------------------+-----+----------+--------------------+-------+----------+-------+
|           timestamp|value|event_type|          message_id|channel|country_id|user_id|
+--------------------+-----+----------+--------------------+-------+----------+-------+
|2024-12-11 17:45:...|    1|      OPEN|34b86fce-5f61-4c3...|   CHAT|      2011|   1022|
|2024-12-11 17:45:...|    3|      OPEN|34b86fce-5f61-4c3...|   CHAT|      2011|   1022|
|2024-12-11 17:45:...|    5|      OPEN|34b86fce-5f61-4c3...|   CHAT|      2011|   1022|
|2024-12-11 17:45:...|    0|      OPEN|34b86fce-5f61-4c3...|   CHAT|      2011|   1022|
|2024-12-11 17:45:...|    2|      OPEN|34b86fce-5f61-4c3...|   CHAT|      2011|   1022|
|2024-12-11 17:45:...|    4|      OPEN|34b86fce-5f61-4c3...|   CHAT|      2011|   1022|
|2024-12-11 17:45:...|   25|  RECEIVED|8239fc3e-91ba-4cb...|  EMAIL|      2014|   1010|
|2024-12-11 17:45:...|   29|  RECEIVED|f4b7d7d6-86ad-429...|  OTHER|      2000|   1009|
|2024-12-11 17:45:...|   32|  RE

## Additional datasets

In [5]:
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries)

In [6]:
# Check the schema of df_bronze
df_bronze.schema

StructType([StructField('timestamp', TimestampType(), True), StructField('value', LongType(), True), StructField('event_type', StringType(), True), StructField('message_id', StringType(), True), StructField('channel', StringType(), True), StructField('country_id', IntegerType(), True), StructField('user_id', IntegerType(), True)])

In [7]:
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType, IntegerType

# Define schema of df_bronze
message_schema = StructType([StructField('timestamp', TimestampType(), True),
                             StructField('value', LongType(), True),
                             StructField('event_type', StringType(), True),
                             StructField('message_id', StringType(), True),
                             StructField('channel', StringType(), True),
                             StructField('country_id', IntegerType(), True),
                             StructField('user_id', IntegerType(), True)])

# readStream from bronze layer
df_bronze_stream = spark.readStream.schema(message_schema).parquet("/content/lake/bronze/messages/data")

# Join of df_bronze with countries dataset
df_bronze_with_country = df_bronze_stream.join(countries, on='country_id', how='left')
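In this stream-static join, each micro-batch read from bronze is joined against the static `countries` DataFrame. The left join keeps messages whose `country_id` has no match (with a null `country`), whereas an inner join would silently drop them and break the later bronze = messages + messages_corrupted check. The producer only draws ids 2000 to 2015, which all exist in `countries`, so both joins give the same result in this run. A plain-Python sketch of the left-join semantics, with hypothetical names and a made-up unmatched id:

```python
def left_join_country(messages, countries):
    """Attach the country name by country_id, keeping unmatched messages (left join)."""
    names = {c["country_id"]: c["country"] for c in countries}  # static lookup side
    return [{**m, "country": names.get(m["country_id"])} for m in messages]


countries_sample = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2011, "country": "China"},
]
batch = [
    {"value": 0, "country_id": 2011},
    {"value": 1, "country_id": 9999},  # hypothetical id with no match
]
for row in left_join_country(batch, countries_sample):
    print(row)
```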

## Streaming Messages x Messages Corrupted

This solution implements two streaming jobs: one for messages and another for messages_corrupted.

In [None]:
# Filter corrupted data (corrupted messages)
df_corrupted = df_bronze_with_country.filter(
    (F.col('event_type').isNull()) |
    (F.col('event_type') == '') |
    (F.col('event_type') == 'NONE')
)

# Filter non corrupted data (valid messages)
df_valid = df_bronze_with_country.filter(
    ~((F.col('event_type').isNull()) |
      (F.col('event_type') == '') |
      (F.col('event_type') == 'NONE'))
)

# Add column 'date' and write with partition by "date"
def insert_corrupted_messages(df, batch_id):
    df_with_date = df.withColumn("date", F.to_date(F.col("timestamp"))) # Add column 'date'
    df_with_date.write.mode("append").partitionBy("date").format("parquet").save("/content/lake/silver/messages_corrupted/data") # partitionBy 'date'

def insert_valid_messages(df, batch_id):
    df_with_date = df.withColumn("date", F.to_date(F.col("timestamp")))
    df_with_date.write.mode("append").partitionBy("date").format("parquet").save("/content/lake/silver/messages/data")


# writeStream for corrupted messages
query_corrupted = (df_corrupted.writeStream
    .outputMode('append')
    .trigger(processingTime='5 seconds')  # trigger interval to 5 seconds
    .foreachBatch(insert_corrupted_messages)
    .option("checkpointLocation", "/content/lake/silver/messages_corrupted/checkpoint")
    .start()
)

# writeStream for valid messages
query_valid = (df_valid.writeStream
    .outputMode('append')
    .trigger(processingTime='5 seconds')  # trigger interval to 5 seconds
    .foreachBatch(insert_valid_messages)
    .option("checkpointLocation", "/content/lake/silver/messages/checkpoint")
    .start()
)

# Run the two streams
query_corrupted.awaitTermination(20)  # run streaming for at least 20 seconds
query_valid.awaitTermination(20)  # run streaming for at least 20 seconds

# Stop the two streams
query_corrupted.stop()
query_valid.stop()
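The requirements also allow a single streaming job with foreachBatch that writes to both locations. A minimal sketch under assumptions: `write_split_batch`, `SILVER_ROUTES` and the checkpoint path in the usage comment are hypothetical, and the conditions are written as Spark SQL strings. The batch is persisted so the stream-static join is not recomputed for each of the two writes. Note that foreachBatch only gives at-least-once guarantees by default, so a retried batch may append rows twice.

```python
# event_type rule for corrupted messages, as a Spark SQL expression.
CORRUPTED_SQL = "event_type IS NULL OR event_type = '' OR event_type = 'NONE'"
SILVER_ROUTES = [
    (CORRUPTED_SQL, "/content/lake/silver/messages_corrupted/data"),
    (f"NOT ({CORRUPTED_SQL})", "/content/lake/silver/messages/data"),
]


def write_split_batch(batch_df, batch_id):
    """foreachBatch handler: append one micro-batch to both silver locations."""
    # Add the date partition column and cache the batch for the two writes.
    batch_df = batch_df.selectExpr("*", "to_date(`timestamp`) AS `date`").persist()
    try:
        for condition, path in SILVER_ROUTES:
            (batch_df.filter(condition)
                .write.mode("append")
                .partitionBy("date")
                .format("parquet")
                .save(path))
    finally:
        batch_df.unpersist()


# Usage (hypothetical checkpoint path; run instead of the two queries above):
# query_split = (df_bronze_with_country.writeStream
#     .trigger(processingTime="5 seconds")
#     .foreachBatch(write_split_batch)
#     .option("checkpointLocation", "/content/lake/silver/messages_split/checkpoint")
#     .start())
```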

### Checking data

The number of messages in the bronze layer should match the sum of messages + messages_corrupted in the silver layer.

In [None]:
# Count records in Bronze Layer
bronze_count = df_bronze.count()
print(f"Number of records in the bronze layer: {bronze_count}")

# Count records of valid messages in Silver Layer
df_valid_messages = spark.read.format("parquet").load("/content/lake/silver/messages/data")
valid_count = df_valid_messages.count()
print(f"Number of records in silver/messages (valid): {valid_count}")

# Count records of corrupted messages in Silver Layer
df_corrupted_messages = spark.read.format("parquet").load("/content/lake/silver/messages_corrupted/data")
corrupted_count = df_corrupted_messages.count()
print(f"Number of records in silver/messages_corrupted (corrupted): {corrupted_count}")

# Check if total records in bronze layer = valid messages in silver + corrupted messages in silver
if bronze_count == (valid_count + corrupted_count):
    print("The sum of messages in silver layer matches the number of records in the bronze layer.")
else:
    print("The sum of messages in silver layer does not match the number of records in the bronze layer.")
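Equal counts do not prove that every bronze row arrived exactly once: a row appended twice by a retried batch and a row not yet processed would offset each other. Assuming the rate source's `value` column (an increasing counter) is unique after a clean producer run, it can serve as a row key. In PySpark the same idea could use `df_bronze.join(silver_df, on="value", how="left_anti")` for missing rows and `silver_df.groupBy("value").count().filter("count > 1")` for duplicates, where `silver_df` is a hypothetical `df_valid_messages.unionByName(df_corrupted_messages)`. A plain-Python sketch of the comparison, with made-up keys:

```python
def reconcile(bronze_values, valid_values, corrupted_values):
    """Compare bronze and silver by row key instead of by count alone."""
    silver = list(valid_values) + list(corrupted_values)
    bronze_set, silver_set = set(bronze_values), set(silver)
    return {
        "counts_match": len(bronze_values) == len(silver),
        "missing_in_silver": sorted(bronze_set - silver_set),
        "unexpected_in_silver": sorted(silver_set - bronze_set),
        "duplicated_in_silver": len(silver) - len(silver_set),
    }


# Counts match (3 == 3) even though value 1 is missing and value 2 is duplicated.
print(reconcile([0, 1, 2], valid_values=[0, 2], corrupted_values=[2]))
```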


In [None]:
df_valid_messages.show()

In [None]:
df_corrupted_messages.show()

# Challenge 2

- Run business report
- But first, a bug in the system is causing some duplicated messages, and we need to exclude these rows from the report

- Removing duplicates logic:
  - Identify possible duplicates based on message_id, event_type and channel
  - In case of duplicates, keep only the first message (earliest timestamp)
  - Example:
    In the table below, the correct message to keep is the second row

```
    message_id | channel | event_type | timestamp
    123        | CHAT    | CREATED    | 10:10:01
    123        | CHAT    | CREATED    | 07:56:45 (first occurrence)
    123        | CHAT    | CREATED    | 08:13:33
```

- After cleaning the data, we're able to create the business report
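The deduplication rule can be checked against the example table in plain Python. The notebook does this in Spark with `row_number()` over a window partitioned by the three key columns and ordered by `timestamp`, keeping row 1; `first_occurrences` below is a hypothetical helper that models the same rule. With tied timestamps, Spark's `row_number` picks one of the tied rows arbitrarily, and `dropDuplicates` on the three columns would remove duplicates but does not guarantee which occurrence survives.

```python
def first_occurrences(rows, keys=("message_id", "channel", "event_type")):
    """Keep, for each duplicate key, the row with the earliest timestamp."""
    earliest = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        # Strict < keeps the first row seen when two timestamps tie.
        if key not in earliest or row["timestamp"] < earliest[key]["timestamp"]:
            earliest[key] = row
    return list(earliest.values())


# The three rows from the example table; zero-padded HH:MM:SS strings sort correctly.
example = [
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "10:10:01"},
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "07:56:45"},
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "08:13:33"},
]
print(first_occurrences(example))
```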

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_valid_messages = spark.read.format("parquet").load("/content/lake/silver/messages/data")

In [None]:
# Group by message_id, event_type, and channel to check duplicates
duplicates = df_valid_messages.groupBy("message_id", "event_type", "channel").count().filter("count > 1")

duplicates.show(10,False)

In [None]:
# Show records with message_id 'af7712d1-6823-4c3a-8ff5-6b7f064b59bb' (duplicated record)
df_valid_messages.filter(df_valid_messages.message_id == "af7712d1-6823-4c3a-8ff5-6b7f064b59bb").show(10, False)

In [None]:
# Count total records before dedup
total_before_dedup = df_valid_messages.count()
print(f"Total before deduplication: {total_before_dedup}")

In [None]:
# Remove duplicated records
window_spec = Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp")
dedup = (df_valid_messages
    .withColumn("row_number", F.row_number().over(window_spec))
    .filter("row_number = 1")
    .drop("row_number"))

In [None]:
# Count total records after dedup
total_after_dedup = dedup.count()
print(f"Total after deduplication: {total_after_dedup}")

In [None]:
# Group by message_id, event_type, and channel to check duplicates
duplicates_check = dedup.groupBy("message_id", "event_type", "channel").count().filter("count > 1")

duplicates_check.show()


### Report 1
  - Aggregate data by date, event_type and channel
  - Count number of messages
  - Pivot event_type from rows into columns
  - Schema expected:
  
```
+----------+-------+-------+-------+----+--------+----+
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-03|    SMS|      4|      4|   1|       1|   5|
|2024-12-03|   CHAT|      3|      7|   5|       8|   4|
|2024-12-03|   PUSH|   NULL|      3|   4|       3|   4|
```
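A plain-Python model of Report 1: count per (date, channel, event_type), then turn each event type into a column, leaving missing combinations empty the way Spark's pivot leaves NULL (as in the PUSH/CLICKED cell of the expected schema). `pivot_counts` and the sample rows are hypothetical.

```python
from collections import Counter


def pivot_counts(rows):
    """Count messages per (date, channel, event_type), then pivot event_type into columns."""
    counts = Counter((r["date"], r["channel"], r["event_type"]) for r in rows)
    event_types = sorted({event_type for _, _, event_type in counts})
    table = {}
    for (date, channel, event_type), n in counts.items():
        # Combinations with no messages stay None, like NULL in the Spark pivot.
        table.setdefault((date, channel), dict.fromkeys(event_types))[event_type] = n
    return event_types, table


sample = [  # made-up rows
    {"date": "2024-12-03", "channel": "SMS", "event_type": "SENT"},
    {"date": "2024-12-03", "channel": "SMS", "event_type": "SENT"},
    {"date": "2024-12-03", "channel": "PUSH", "event_type": "OPEN"},
]
columns, report = pivot_counts(sample)
print(columns)
print(report[("2024-12-03", "SMS")])
```

In PySpark, the two steps of the aggregation cell can also be written as `dedup.groupBy("date", "channel").pivot("event_type").count()`. Passing the expected values, e.g. `.pivot("event_type", ["CLICKED", "CREATED", "OPEN", "RECEIVED", "SENT"])`, avoids the extra job Spark otherwise runs to collect the distinct event types and fixes the column order.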

In [None]:
dedup.show(10,False)

In [None]:
# Aggregate data and count messages by date, event_type and channel
qty_messages = dedup.groupBy("date", "channel", "event_type").count()

# Pivot event_type from rows into columns
pivoted_event_type = qty_messages.groupBy("date", "channel") \
    .pivot("event_type") \
    .agg(F.sum("count"))

pivoted_event_type.show(10, False)


### Report 2

- Identify the most active users by channel (sorted by number of iterations)
- Schema expected:

```
+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|   1022|         5|   2|    0|    1|   0|  2|
|   1004|         4|   1|    1|    1|   1|  0|
|   1013|         4|   0|    0|    2|   1|  1|
|   1020|         4|   2|    0|    1|   1|  0|
```
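A plain-Python model of Report 2, assuming the channel list is the fixed set the producer emits in `enrich_data`. `most_active_users` and the sample events are hypothetical. Sorting only by descending `iterations` leaves tied users in no particular order; adding `user_id` as a secondary key makes the order deterministic and matches the order shown for the tied users 1004, 1013 and 1020.

```python
from collections import Counter, defaultdict

# Channels the producer can emit (see enrich_data); assumed fixed for this report.
CHANNELS = ("CHAT", "EMAIL", "OTHER", "PUSH", "SMS")


def most_active_users(rows):
    """Per-user message counts by channel plus a total, most active first."""
    per_user = defaultdict(Counter)
    for row in rows:
        per_user[row["user_id"]][row["channel"]] += 1
    report = []
    for user_id, counts in per_user.items():
        entry = {"user_id": user_id, "iterations": sum(counts.values())}
        entry.update({channel: counts[channel] for channel in CHANNELS})  # 0 if absent
        report.append(entry)
    # Descending by iterations; ties broken by user_id so the order is deterministic.
    return sorted(report, key=lambda e: (-e["iterations"], e["user_id"]))


sample = [  # made-up (user_id, channel) events
    (1022, "CHAT"), (1022, "CHAT"), (1022, "SMS"),
    (1004, "EMAIL"), (1004, "PUSH"),
    (1013, "OTHER"), (1013, "SMS"),
]
rows = [{"user_id": u, "channel": c} for u, c in sample]
for entry in most_active_users(rows):
    print(entry)
```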


In [None]:
# Aggregate data and count by user_id and channel
user_channel = dedup.groupBy("user_id", "channel").count()

# Pivot channel from rows into columns
pivoted_channel = user_channel.groupBy("user_id").pivot("channel").sum("count")

# NULL substituted by 0
pivoted_channel = pivoted_channel.fillna(0)

# Total of iterations per user_id (iterations will be the sum of messages by channel per user_id)
pivoted_channel = pivoted_channel.withColumn("iterations", sum(F.col(c) for c in pivoted_channel.columns if c != "user_id"))

# Reorder columns so iterations appears after user_id
columns = ["user_id", "iterations"] + [col for col in pivoted_channel.columns if col not in ["user_id", "iterations"]]
pivoted_channel = pivoted_channel.select(*columns)

# Order by iterations (desc) to see the most active users; ties broken by user_id
most_active_users = pivoted_channel.orderBy(F.desc("iterations"), F.asc("user_id"))

most_active_users.show()


# Challenge 3

In [None]:
# Theoretical question:

# A new use case requires the message data to be aggregated in near real time
# They want to build a dashboard embedded in the platform website to analyze message data with low latency (a few minutes)
# This application will directly access the data aggregated by the streaming process

# Q1:
- What would be your suggestion to achieve that using Spark Structured Streaming?
Or would you choose a different data processing tool?

- Which storage would you use and why? (database?, data lake?, kafka?)



Since the new use case requires near-real-time results, we could opt for a tool that provides micro-batch processing. Because we need to aggregate data (a stateful operation), we can use **Spark Structured Streaming**. Its latency (from a few seconds to a few minutes of delay, depending on the trigger interval) is higher than that of a true event-at-a-time streaming tool, but it fits the "few minutes" requirement. For this use case, Spark Structured Streaming seems to serve the purpose.
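As a sketch of the aggregation such a job would maintain: in Spark Structured Streaming this would typically be `groupBy(F.window("timestamp", "1 minute"), "event_type").count()` on the message stream, with `withWatermark("timestamp", ...)` to bound how late events may arrive and how much state is kept. The plain-Python model below only shows tumbling-window bucketing; the 1-minute window size, the helper names and the events are assumptions, and watermarking and late data are not modeled.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Floor a timestamp to the start of its tumbling window of the given size."""
    return EPOCH + ((ts - EPOCH) // size) * size


def tumbling_counts(events, size=timedelta(minutes=1)):
    """Count (window_start, event_type) pairs, like a windowed groupBy().count()."""
    return Counter((window_start(ts, size), event_type) for ts, event_type in events)


events = [  # made-up events
    (datetime(2024, 12, 11, 17, 45, 10), "OPEN"),
    (datetime(2024, 12, 11, 17, 45, 50), "OPEN"),
    (datetime(2024, 12, 11, 17, 46, 5), "OPEN"),
    (datetime(2024, 12, 11, 17, 45, 30), "SENT"),
]
for (start, event_type), n in sorted(tumbling_counts(events).items()):
    print(start, event_type, n)
```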

For storage, I would say the best option is a **data lake**. It can handle the aggregated data that will be visualized in the dashboard, something a database would not handle as well. As for Kafka, it is generally not designed for long-term data storage or complex querying, and retaining data in it long term can be expensive.