<a href="https://colab.research.google.com/github/mdias23i/DE-DataProcessing/blob/main/spark_streaming/challenges/final_challenges.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [None]:
%pip install pyspark



# Context
Message events are coming from platform message broker (kafka, pubsub, kinesis...).
You need to process the data according to the requirements.

Message schema:
- timestamp
- value
- event_type
- message_id
- country_id
- user_id



# Challenge 1

Step 1
- Change exising producer
	- Change parquet location to "/content/lake/bronze/messages/data"
	- Add checkpoint (/content/lake/bronze/messages/checkpoint)
	- Delete /content/lake/bronze/messages and reprocess data
	- For reprocessing, run the streaming for at least 1 minute, then stop it

Step 2
- Implement new stream job to read from messages in bronze layer and split result in two locations
	- "messages_corrupted"
		- logic: event_status is null, empty or equal to "NONE"
    - extra logic: add country name by joining message with countries dataset
		- partition by "date" -extract it from timestamp
		- location: /content/lake/silver/messages_corrupted/data

	- "messages"
		- logic: not corrupted data
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" -extract it from timestamp
		- location: /content/lake/silver/messages/data

	- technical requirements
		- add checkpint (choose location)
		- use StructSchema
		- Set trigger interval to 5 seconds
		- run streaming for at least 20 seconds, then stop it

	- alternatives
		- implementing single streaming job with foreach/- foreachBatch logic to write into two locations
		- implementing two streaming jobs, one for messages and another for messages_corrupted
		- (paying attention on the paths and checkpoints)


  - Check results:
    - results from messages in bronze layer should match with the sum of messages+messages_corrupted in the silver layer

In [1]:
%pip install faker

Collecting faker
  Downloading Faker-33.1.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-33.1.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-33.1.0


# Producer

In [14]:
import shutil

shutil.rmtree("content/lake/bronze/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages_corrupted", ignore_errors=True)
shutil.rmtree("content/lake/silver/checkpoint", ignore_errors=True)

In [15]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession
import shutil

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

shutil.rmtree("content/lake/bronze/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages_corrupted", ignore_errors=True)


def enrich_data(df, messages=messages):
  fake = Faker()
  new_columns = {
      'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
      'message_id': F.lit(fake.random_element(elements=messages)),
      'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
      'country_id': F.lit(fake.random_int(min=2000, max=2015)),
      'user_id': F.lit(fake.random_int(min=1000, max=1050)),
      'event_status': F.lit(fake.random_element(elements=('SUCCESS', 'FAILED', 'PENDING', 'IN_PROGRESS', 'CANCELLED', "NONE", ""))),
  }
  df = df.withColumns(new_columns)
  return df

def insert_messages(df: DataFrame, batch_id):
  enrich = enrich_data(df)
  enrich.write.mode("append").format("parquet").save("content/lake/bronze/messages/data")

# read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# write stream
query = (df_stream.writeStream
.outputMode('append')
.option('checkpointLocation', 'content/lake/bronze/messages/checkpoint')
.trigger(processingTime='1 seconds')
.foreachBatch(insert_messages)
.start()
)

# Run for at least 1 minute
query.awaitTermination(60)

# Stop the query
query.stop()


In [22]:
for stream in spark.streams.active:
    stream.stop()

In [4]:
df = spark.read.format("parquet").load("content/lake/silver/messages/data/*")
df.show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/content/lake/silver/messages/data/*.

# Additional datasets

In [16]:
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries)

In [7]:
countries.show()

+--------------+----------+
|       country|country_id|
+--------------+----------+
|        Brazil|      2000|
|      Portugal|      2001|
|         Spain|      2002|
|       Germany|      2003|
|        France|      2004|
|         Italy|      2005|
|United Kingdom|      2006|
| United States|      2007|
|        Canada|      2008|
|     Australia|      2009|
|         Japan|      2010|
|         China|      2011|
|         India|      2012|
|   South Korea|      2013|
|        Russia|      2014|
|     Argentina|      2015|
+--------------+----------+



# Streaming Messages x Messages Corrupted

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

#Setting schemas
message_schema = StructType([
    StructField("event_status", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])

countries_schema = StructType([
    StructField("country", StringType(), True),
    StructField("country_id", IntegerType(), True)
])


# reading from Bronze layer
bronze_stream = (spark.readStream
                 .schema(message_schema)
                 .format("parquet")
                 .load("content/lake/bronze/messages/data"))


#Function to process each batch
def process_batch(df, batch_id):

  df_with_country = df.join(countries, on="country_id")

  df_with_country = df_with_country.withColumn("date", F.to_date("timestamp"))

  df_country_corrupted = df_with_country.filter(
        (F.col("event_status").isNull()) |
        (F.col("event_status") == "") |
        (F.col("event_status") == "NONE")
    )
  df_country_non_corrupted = df_with_country.filter(
        ~((F.col("event_status").isNull()) |
          (F.col("event_status") == "") |
          (F.col("event_status") == "NONE"))
    )

    # Write corrupted data
  (df_country_corrupted.write
     .mode("append")
     .partitionBy("date")
     .format("parquet")
     .save("content/lake/silver/messages_corrupted/data"))

    # Write not corrupted data
  (df_country_non_corrupted.write
     .mode("append")
     .partitionBy("date")
     .format("parquet")
     .save("content/lake/silver/messages/data"))



query2 = (bronze_stream.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .option("checkpointLocation", "content/lake/silver/checkpoint")
         .trigger(processingTime="5 seconds")
         .start())


query2.awaitTermination(20)

query2.stop()

## Checking data

In [18]:
bronze_df = spark.read.format("parquet").load("content/lake/bronze/messages/data")
bronze_count = bronze_df.count()

messages_df = spark.read.format("parquet").load("content/lake/silver/messages/data")
silver_messages_count = messages_df.count()

corrupted_df = spark.read.format("parquet").load("content/lake/silver/messages_corrupted/data")
silver_corrupted_count = corrupted_df.count()

total_silver_count = silver_messages_count + silver_corrupted_count

if bronze_count == total_silver_count:
    print("Results match")
else:
    print("Error, results do not match")


Results match


# Challenge 2

- Run business report
- But first, there is a bug in the system which is causing some duplicated messages, we need to exclude these lines from the report

- removing duplicates logic:
  - Identify possible duplicates on message_id, event_type and channel
  - in case of duplicates, consider only the first message (occurrence by timestamp)
  - Ex:
    In table below, the correct message to consider is the second line

```
    message_id | channel | event_type | timestamp
    123        | CHAT    | CREATED    | 10:10:01
    123        | CHAT    | CREATED    | 07:56:45 (first occurrence)
    123        | CHAT    | CREATED    | 08:13:33
```

- After cleaning the data we're able to create the busines report

In [19]:
# dedup data
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = spark.read.format("parquet").load("content/lake/silver/messages/data")
dedup = df.withColumn("row_number", F.row_number().over(Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp"))).filter("row_number = 1").drop("row_number")

### Report 1
  - Aggregate data by date, event_type and channel
  - Count number of messages
  - pivot event_type from rows into columns
  - schema expected:
  
```
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-03|    SMS|      4|      4|   1|       1|   5|
|2024-12-03|   CHAT|      3|      7|   5|       8|   4|
|2024-12-03|   PUSH|   NULL|      3|   4|       3|   4|
```

In [20]:
# report 1
report1 = (
    dedup.groupBy(F.col("date"), F.col("channel"))
    .pivot("event_type", ["CLICKED", "CREATED", "OPEN", "RECEIVED", "SENT"])
    .count()
    .fillna(0)
)

# Mostrar o relatório 1
report1.show()

+----------+-------+-------+-------+----+--------+----+
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-13|   PUSH|      0|      1|   2|       0|   2|
|2024-12-13|    SMS|      2|      1|   1|       0|   1|
|2024-12-13|   CHAT|      4|      1|   0|       0|   2|
|2024-12-13|  EMAIL|      1|      0|   0|       2|   0|
|2024-12-13|  OTHER|      2|      0|   0|       3|   4|
+----------+-------+-------+-------+----+--------+----+



## Report 2

- Identify the most active users by channel (sorted by number of iterations)
- schema expected:

```
+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|   1022|         5|   2|    0|    1|   0|  2|
|   1004|         4|   1|    1|    1|   1|  0|
|   1013|         4|   0|    0|    2|   1|  1|
|   1020|         4|   2|    0|    1|   1|  0|
```


In [21]:
# report 2
report2 = (
    dedup.groupBy(F.col("user_id"), F.col("channel"))
    .count()
    .groupBy("user_id")
    .pivot("channel", ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"])
    .sum("count")
    .fillna(0)
)

# Adding iterations column
report2 = report2.withColumn("iterations", sum(report2[col] for col in ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"]))

# Ordering by iteration
report2 = report2.orderBy(F.desc("iterations"))

report2.show()

+-------+----+-----+-----+----+---+----------+
|user_id|CHAT|EMAIL|OTHER|PUSH|SMS|iterations|
+-------+----+-----+-----+----+---+----------+
|   1010|   0|    0|    1|   1|  1|         3|
|   1049|   1|    0|    2|   0|  0|         3|
|   1014|   0|    0|    1|   1|  1|         3|
|   1025|   0|    0|    1|   0|  1|         2|
|   1021|   1|    0|    1|   0|  0|         2|
|   1026|   1|    0|    0|   1|  0|         2|
|   1032|   1|    0|    0|   1|  0|         2|
|   1020|   1|    0|    0|   0|  1|         2|
|   1003|   2|    0|    0|   0|  0|         2|
|   1040|   0|    1|    0|   0|  1|         2|
|   1005|   0|    0|    1|   0|  0|         1|
|   1047|   0|    0|    0|   1|  0|         1|
|   1028|   0|    1|    0|   0|  0|         1|
|   1050|   1|    0|    0|   0|  0|         1|
|   1035|   0|    0|    0|   0|  1|         1|
|   1045|   0|    0|    0|   1|  0|         1|
|   1017|   1|    0|    0|   0|  0|         1|
|   1036|   0|    1|    0|   0|  0|         1|
|   1015|   0

# Challenge 3

In [None]:
# Theoretical question:

# A new usecase requires the message data to be aggregate in near real time
# They want to build a dashboard embedded in the platform website to analyze message data in low latency (few minutes)
# This application will access directly the data aggregated by streaming process

# Q1:
- What would be your suggestion to achieve that using Spark Structure Streaming?
Or would you choose a different data processing tool?

- Which storage would you use and why? (database?, data lake?, kafka?)


I would use Spark Structured Streaming to process data in time windows, for example, every minute.
Kafka would be used to ingest the messages, and Spark would aggregate the data in real time using groupBy and window functions.
The processed data would then be written to a database for quick access and real-time analytics, while the raw data would be stored in a data lake for potential future use.




In [23]:
import shutil

shutil.rmtree("content/lake/bronze/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages_corrupted", ignore_errors=True)
shutil.rmtree("content/lake/silver/checkpoint", ignore_errors=True)


import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

shutil.rmtree("content/lake/bronze/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages", ignore_errors=True)
shutil.rmtree("content/lake/silver/messages_corrupted", ignore_errors=True)


def enrich_data(df, messages=messages):
  fake = Faker()
  new_columns = {
      'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
      'message_id': F.lit(fake.random_element(elements=messages)),
      'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
      'country_id': F.lit(fake.random_int(min=2000, max=2015)),
      'user_id': F.lit(fake.random_int(min=1000, max=1050)),
      'event_status': F.lit(fake.random_element(elements=('SUCCESS', 'FAILED', 'PENDING', 'IN_PROGRESS', 'CANCELLED', "NONE", ""))),
  }
  df = df.withColumns(new_columns)
  return df

def insert_messages(df: DataFrame, batch_id):
  enrich = enrich_data(df)
  enrich.write.mode("append").format("parquet").save("content/lake/bronze/messages/data")

# read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# write stream
query = (df_stream.writeStream
.outputMode('append')
.option('checkpointLocation', 'content/lake/bronze/messages/checkpoint')
.trigger(processingTime='1 seconds')
.foreachBatch(insert_messages)
.start()
)

# Run for at least 1 minute
query.awaitTermination(60)

# Stop the query
query.stop()


countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries)


#Setting schemas
message_schema = StructType([
    StructField("event_status", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])

countries_schema = StructType([
    StructField("country", StringType(), True),
    StructField("country_id", IntegerType(), True)
])


# reading from Bronze layer
bronze_stream = (spark.readStream
                 .schema(message_schema)
                 .format("parquet")
                 .load("content/lake/bronze/messages/data"))


#Function to process each batch
def process_batch(df, batch_id):

  df_with_country = df.join(countries, on="country_id")

  df_with_country = df_with_country.withColumn("date", F.to_date("timestamp"))

  df_country_corrupted = df_with_country.filter(
        (F.col("event_status").isNull()) |
        (F.col("event_status") == "") |
        (F.col("event_status") == "NONE")
    )
  df_country_non_corrupted = df_with_country.filter(
        ~((F.col("event_status").isNull()) |
          (F.col("event_status") == "") |
          (F.col("event_status") == "NONE"))
    )

    # Write corrupted data
  (df_country_corrupted.write
     .mode("append")
     .partitionBy("date")
     .format("parquet")
     .save("content/lake/silver/messages_corrupted/data"))

    # Write not corrupted data
  (df_country_non_corrupted.write
     .mode("append")
     .partitionBy("date")
     .format("parquet")
     .save("content/lake/silver/messages/data"))



query2 = (bronze_stream.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .option("checkpointLocation", "content/lake/silver/checkpoint")
         .trigger(processingTime="5 seconds")
         .start())


query2.awaitTermination(20)

query2.stop()


bronze_df = spark.read.format("parquet").load("content/lake/bronze/messages/data")
bronze_count = bronze_df.count()

messages_df = spark.read.format("parquet").load("content/lake/silver/messages/data")
silver_messages_count = messages_df.count()

corrupted_df = spark.read.format("parquet").load("content/lake/silver/messages_corrupted/data")
silver_corrupted_count = corrupted_df.count()

total_silver_count = silver_messages_count + silver_corrupted_count

if bronze_count == total_silver_count:
    print("Results match")
else:
    print("Error, results do not match")


# dedup data

df = spark.read.format("parquet").load("content/lake/silver/messages/data")
dedup = df.withColumn("row_number", F.row_number().over(Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp"))).filter("row_number = 1").drop("row_number")


# report 1
report1 = (
    dedup.groupBy(F.col("date"), F.col("channel"))
    .pivot("event_type", ["CLICKED", "CREATED", "OPEN", "RECEIVED", "SENT"])
    .count()
    .fillna(0)
)

# Mostrar o relatório 1
report1.show()

# report 2
report2 = (
    dedup.groupBy(F.col("user_id"), F.col("channel"))
    .count()
    .groupBy("user_id")
    .pivot("channel", ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"])
    .sum("count")
    .fillna(0)
)

# Adding iterations column
report2 = report2.withColumn("iterations", sum(report2[col] for col in ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"]))

# Ordering by iteration
report2 = report2.orderBy(F.desc("iterations"))

report2.show()

Results match
+----------+-------+-------+-------+----+--------+----+
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-13|   PUSH|      2|      0|   2|       0|   0|
|2024-12-13|    SMS|      0|      2|   0|       1|   2|
|2024-12-13|   CHAT|      3|      1|   3|       2|   2|
|2024-12-13|  EMAIL|      0|      2|   1|       3|   1|
|2024-12-13|  OTHER|      3|      3|   0|       2|   2|
+----------+-------+-------+-------+----+--------+----+

+-------+----+-----+-----+----+---+----------+
|user_id|CHAT|EMAIL|OTHER|PUSH|SMS|iterations|
+-------+----+-----+-----+----+---+----------+
|   1021|   1|    1|    1|   0|  0|         3|
|   1037|   1|    1|    1|   0|  0|         3|
|   1024|   2|    0|    0|   0|  1|         3|
|   1009|   1|    0|    0|   1|  1|         3|
|   1046|   2|    0|    0|   0|  0|         2|
|   1008|   0|    0|    2|   0|  0|         2|
|   1047|   0|    0|    1|   1|  0|         2|
|   1048|  