# Producer (30 points)

The producer is implemented in a separate file in this repository.

# Consumer

## Dataset and Stream creation (10 points)

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Build (or reuse) the session on the master at spark://spark-master:7077.
# The Kafka source connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Final project: Structured Streaming")
    .master("spark://spark-master:7077")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("INFO")

# Optimization: lower the number of shuffle partitions to 5.
spark.conf.set("spark.sql.shuffle.partitions", "5")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2.5.2/cache
The jars for the packages stored in: /root/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ce8f9f10-77fa-45dc-86e8-60d3a7a65e25;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.7 in central
	found org.slf4j#slf4j-api;2.0.16 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.1 in central
	found org.apache.hadoop#hadoop-client-api;3.4.1 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collections_2.13;1.2.0

In [2]:
# Open a streaming reader subscribed to the "telemetry-project" Kafka topic.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "telemetry-project")
    .load()
)

# Preview only: this projection is not assigned to anything, so kafka_df keeps
# the raw Kafka columns. As the last expression of the cell it is displayed.
kafka_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "offset as row_index",
    "timestamp as source_timestamp",
)


DataFrame[key: string, value: string, row_index: bigint, source_timestamp: timestamp]

In [3]:
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
from regalado_floriano.spark_utils import SparkUtils

In [5]:
from pyspark.sql import functions as F

In [6]:
# (field name, type tag) pairs describing the JSON payload, in the order they
# are handed to SparkUtils.generate_schema.
schema_keys = (
    ("property_id", "int"),
    ("country", "string"),
    ("city", "string"),
    ("property_type", "string"),
    ("furnishing_status", "string"),
    ("property_size_sqft", "int"),
    ("price", "int"),
    ("constructed_year", "int"),
    ("previous_owners", "int"),
    ("rooms", "int"),
    ("bathrooms", "int"),
    ("garage", "bool"),
    ("garden", "bool"),
    ("crime_cases_reported", "int"),
    ("legal_cases_on_property", "bool"),
    ("customer_salary", "int"),
    ("loan_amount", "int"),
    ("loan_tenure_years", "int"),
    ("monthly_expenses", "int"),
    ("down_payment", "int"),
    ("emi_to_income_ratio", "float"),
    ("satisfaction_score", "int"),
    ("neighbourhood_rating", "int"),
    ("connectivity_score", "int"),
    ("decision", "bool"),
)

# Schema object later passed to from_json to parse the Kafka message value.
houses_schema = SparkUtils.generate_schema(schema_keys)

## Transformations and Actions (15 points)

In [7]:
from pyspark.sql.functions import explode, split, window

In [8]:
from pyspark.sql.functions import year, month, day, from_json, col 
from pyspark.sql.types import StructField, StringType, ArrayType
from regalado_floriano.spark_utils import SparkUtils

## Transformation 1: Reading JSON

In [9]:
from pyspark.sql.functions import year, month, day, from_json, col 
from pyspark.sql.types import StructField, StringType
from regalado_floriano.spark_utils import SparkUtils

# Decode the Kafka payload into a string column named "value_str".
raw_string_column = kafka_df["value"].cast("string").alias("value_str")
raw_string_df = kafka_df.select(raw_string_column)
raw_telemetry_df = kafka_df.select(raw_string_column, "offset", "timestamp")

# Parse the JSON text into a struct column ("telemetry") using houses_schema.
vg_telemetry_df = raw_telemetry_df.withColumn(
    "telemetry",
    from_json("value_str", houses_schema),
)

# Derive calendar parts from the Kafka record timestamp.
vg_extracted_df = (
    vg_telemetry_df
    .withColumn("year", year(vg_telemetry_df["timestamp"]))
    .withColumn("month", month(vg_telemetry_df["timestamp"]))
    .withColumn("day", day(vg_telemetry_df["timestamp"]))
)

In [10]:
# Pull every schema field out of the "telemetry" struct, then keep the Kafka
# metadata, the raw JSON string and the derived date columns alongside them.
to_select = [f"telemetry.{field_name}" for field_name, _ in schema_keys]
to_select += ["timestamp", "offset", "value_str", "year", "month", "day"]
house_df = vg_extracted_df.select(to_select)

In [11]:
house_df.printSchema()

root
 |-- property_id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- furnishing_status: string (nullable = true)
 |-- property_size_sqft: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- constructed_year: integer (nullable = true)
 |-- previous_owners: integer (nullable = true)
 |-- rooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- garage: boolean (nullable = true)
 |-- garden: boolean (nullable = true)
 |-- crime_cases_reported: integer (nullable = true)
 |-- legal_cases_on_property: boolean (nullable = true)
 |-- customer_salary: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- loan_tenure_years: integer (nullable = true)
 |-- monthly_expenses: integer (nullable = true)
 |-- down_payment: integer (nullable = true)
 |-- emi_to_income_ratio: float (nullable = true)
 |-- satisfaction_score: integer (nullable 

In [12]:
from pyspark.sql.functions import year, month, day, from_json, col

## Transformation 2: Add sale info

In [13]:
# decision is a boolean column; cast to int it becomes 1/0, so sale_total is
# the price when decision is true and 0 when it is false.
houses_with_sales = house_df.withColumn(
    "sale_total",
    house_df["price"] * house_df["decision"].cast("int"),
)

## Transformation 3: Window and Group By Countries

In [18]:
# Declare a 1-minute watermark on the record timestamp.
watermarked_houses = houses_with_sales.withWatermark("timestamp", "1 minute")

# Group by country within 30-second windows that slide every 15 seconds.
windowed_houses = watermarked_houses.groupBy(
    F.window("timestamp", "30 seconds", "15 seconds"),
    F.col("country"),
)


## Transformation 4: Aggregate Sales

In [23]:
# Per (window, country) sale statistics. average_sale is taken over every row
# in the group, including rows whose sale_total is 0.
aggregated_houses = (
    windowed_houses
    .agg(
        F.sum("sale_total").alias("sale_total"),
        F.max("sale_total").alias("biggest_sale"),
        F.avg("sale_total").alias("average_sale"),
        F.avg("satisfaction_score").alias("average_satisfaction"),
    )
    # Partition keys for the file sink, taken from the start of each window.
    .withColumn("year", F.year("window.start"))
    .withColumn("month", F.month("window.start"))
)

# Data Persistence

In [25]:
# Persist the windowed aggregates as Parquet files, partitioned by country,
# year and month, with a micro-batch every 10 seconds.
# - The output mode is stated explicitly; "append" is the default and the mode
#   the file sink runs in.
# - The former "header" option was dropped: it is a CSV option and has no
#   effect on Parquet output.
query_files = (
    aggregated_houses.writeStream
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .partitionBy("country", "year", "month")
    .format("parquet")
    .option("path", "/opt/spark/work-dir/data/final_streaming/")
    # Offsets and aggregation state are tracked here so the query can resume.
    .option("checkpointLocation", "/opt/spark/work-dir/final_streaming")
    .start()
)


25/11/24 03:48:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 28
-------------------------------------------
+------+-------+----------+------------+------------+--------------------+
|window|country|sale_total|biggest_sale|average_sale|average_satisfaction|
+------+-------+----------+------------+------------+--------------------+
+------+-------+----------+------------+------------+--------------------+

-------------------------------------------
Batch: 37
-------------------------------------------
+------+-------+----+-----+----------+------------+------------+--------------------+
|window|country|year|month|sale_total|biggest_sale|average_sale|average_satisfaction|
+------+-------+----+-----+----------+------------+------------+--------------------+
+------+-------+----+-----+----------+------------+------------+--------------------+

-------------------------------------------
Batch: 30
-------------------------------------------
+------+-------+----+-----+----------+------------+---------

                                                                                

-------------------------------------------
Batch: 39
-------------------------------------------
+--------------------+------------+----+-----+----------+------------+------------------+--------------------+
|              window|     country|year|month|sale_total|biggest_sale|      average_sale|average_satisfaction|
+--------------------+------------+----+-----+----------+------------+------------------+--------------------+
|{2025-11-24 03:47...|   Singapore|2025|   11|  77217466|     4111516| 977436.2784810127|  5.6835443037974684|
|{2025-11-24 03:47...|       China|2025|   11| 254132079|     4155170| 1182009.669767442|   5.944186046511628|
|{2025-11-24 03:47...|         UAE|2025|   11| 140424034|     4019121|1032529.6617647059|   5.617647058823529|
|{2025-11-24 03:47...|      Canada|2025|   11| 183521485|     4164028|  890880.995145631|   5.805825242718447|
|{2025-11-24 03:47...|      France|2025|   11| 241316860|     4196135| 1237522.358974359|   5.671794871794872|
|{2025-11-24 0

In [None]:
# Block this cell until the streaming query stops or fails.
query_files.awaitTermination()

                                                                                

-------------------------------------------
Batch: 50
-------------------------------------------
+------+-------+----+-----+----------+------------+------------+--------------------+
|window|country|year|month|sale_total|biggest_sale|average_sale|average_satisfaction|
+------+-------+----+-----+----------+------------+------------+--------------------+
+------+-------+----+-----+----------+------------+------------+--------------------+



                                                                                

-------------------------------------------
Batch: 57
-------------------------------------------
+------+-------+----+-----+----------+------------+------------+--------------------+
|window|country|year|month|sale_total|biggest_sale|average_sale|average_satisfaction|
+------+-------+----+-----+----------+------------+------------+--------------------+
+------+-------+----+-----+----------+------------+------------+--------------------+

-------------------------------------------
Batch: 48
-------------------------------------------
+------+-------+----------+------------+------------+--------------------+
|window|country|sale_total|biggest_sale|average_sale|average_satisfaction|
+------+-------+----------+------------+------------+--------------------+
+------+-------+----------+------------+------------+--------------------+

-------------------------------------------
Batch: 49
-------------------------------------------
+--------------------+------------+----------+------------+-