In [1]:
import zlib

from pyspark import SparkContext
from pyspark.sql import SparkSession
# NOTE: this star import also rebinds Python builtins such as hash, sum, min,
# max, round and filter to their pyspark.sql.functions counterparts.
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType, IntegerType

In [2]:
# Plain local-mode SparkContext for the RDD-based partitioning task.
sc = SparkContext(master="local", appName="partitioning")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/26 03:30:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
# One RDD element per line of the departure-delays CSV (header line included).
data_rdd = sc.textFile(name="./data/task1/departuredelays.csv")

In [9]:
# Drop the CSV header: the first element of data_rdd is the file's first line.
# NOTE: data_rdd is rebound to the filtered RDD here, so re-running this cell on
# its own would take the first *data* row as the header and silently drop it.
# Re-run the textFile cell first.
header = data_rdd.first()
data_rdd = data_rdd.filter(lambda text: not (text == header))

def custom_partition(row):
    """Return the output bucket (0-3) for one line of departuredelays.csv.

    The bucket is chosen from the 4th comma-separated field (index 3, the
    origin). 'ATL' always goes to bucket 0; every other origin is spread over
    buckets 1-3.

    zlib.crc32 is used instead of the bare name ``hash``. In this notebook,
    ``from pyspark.sql.functions import *`` rebinds ``hash`` to
    ``pyspark.sql.functions.hash``. That is a Column function which needs the
    driver's SparkContext, so it fails inside RDD tasks. Even the builtin
    ``hash`` of a str is salted per Python process (PYTHONHASHSEED), so the
    same origin could land in different buckets on different workers or
    re-runs.

    Raises:
        IndexError: if ``row`` has fewer than four comma-separated fields.
    """
    origin = row.split(',')[3]
    if origin == 'ATL':
        return 0
    # crc32 is deterministic across processes and always non-negative.
    return zlib.crc32(origin.encode('utf-8')) % 3 + 1

# Tag each line with its bucket (custom_partition returns 0-3).
# The loop below runs four separate save jobs over this RDD. It is cached so
# that each job does not re-read and re-parse the whole CSV.
partitioned_rdd = data_rdd.map(lambda line: (custom_partition(line), line)).cache()

# One output directory per bucket. `bucket=i` binds the current loop value into
# the lambda explicitly instead of relying on late binding of `i`.
# NOTE: saveAsTextFile fails if the target directory already exists; clear
# ./output/task1 before re-running this cell.
for i in range(4):
    partitioned_rdd.filter(lambda x, bucket=i: x[0] == bucket) \
        .values() \
        .saveAsTextFile(f"./output/task1/partition_{i}")

# Release the cached blocks once all buckets are written.
partitioned_rdd.unpersist();


                                                                                

In [2]:
# Jar files providing the Kafka source used later in this notebook.
KAFKA_JARS = [
    "./kafka-clients-3.5.0.jar",
    "./spark-sql-kafka-0-10_2.12-3.5.0.jar",
    "./spark-token-provider-kafka-0-10_2.12-3.5.0.jar",
    "./commons-pool2-2.12.0.jar",
]

# NOTE: getOrCreate() reuses a SparkContext already running in this kernel (such
# as the `sc` created for task 1) and does not apply launch-time settings like
# spark.jars or spark.driver.memory to it. Run this section in a fresh kernel.
spark = (
    SparkSession.builder
    # Plain comma-separated list. The previous backslash-continued string literal
    # carried the continuation lines' indentation into the config value.
    .config("spark.jars", ",".join(KAFKA_JARS))
    .config("spark.sql.shuffle.partitions", "3")
    # NOTE(review): machine-specific absolute interpreter path.
    .config("spark.executorEnv.PYSPARK_PYTHON", "/root/anaconda3/bin/python")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.log.level", "ERROR")
    .appName("HW03")
    .getOrCreate()
)

24/05/10 02:40:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Setting Spark log level to "ERROR".


In [3]:
# Stream the JSON activity records under the directory, using an explicit
# DDL-string schema.
streaming_df = (
    spark.readStream
    .schema("Arrival_Time long, Creation_Time long, Device string, Index long, Model string, User string, gt string, x double, y double, z double")
    .format("json")
    .load("./data/task2/activity-data/")
)
streaming_df.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [4]:
# Per-user counts over 6-minute windows sliding every 3 minutes, written to the
# in-memory table "activity_query".
# Creation_Time is divided by 1e9 to get seconds, i.e. it is treated as an
# epoch in nanoseconds (NOTE(review): confirm against the dataset).
# timestamp_seconds yields a TimestampType column directly. The previous
# from_unixtime() produced a second-truncated *string* in the session time zone
# that window() then had to parse back, which is ambiguous around DST changes.
window_query = (
    streaming_df
    .withColumn("Creation_Time_seconds", col("Creation_Time") / 1000000000)
    .withColumn("Creation_Time_timestamp", timestamp_seconds(col("Creation_Time_seconds")))
    .groupBy("User", window("Creation_Time_timestamp", "6 minute", "3 minute"))
    .count()
    .writeStream
    .queryName("activity_query")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="2 seconds")
    .option("checkpointLocation", "./checkpoint")
    .start()
)

24/05/08 00:54:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/05/08 00:54:29 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 5800 milliseconds


In [5]:
# Read the in-memory table maintained by the "activity_query" memory sink.
window_df = spark.table("activity_query")
window_df.show(n=3, truncate=False)

+----+------------------------------------------+-----+
|User|window                                    |count|
+----+------------------------------------------+-----+
|g   |{2015-02-23 10:45:00, 2015-02-23 10:51:00}|62201|
|g   |{2015-02-23 10:15:00, 2015-02-23 10:21:00}|18846|
|g   |{2015-02-23 10:21:00, 2015-02-23 10:27:00}|57165|
+----+------------------------------------------+-----+
only showing top 3 rows



In [8]:
# Accumulates every micro-batch emitted by the foreachBatch query
# (None until the first batch arrives).
df_updated = None

def write_to_memory_and_parquet(df, epoch_id):
    """foreachBatch sink: append one micro-batch to parquet and to ``df_updated``.

    Args:
        df: the micro-batch DataFrame of windowed counts (User, window, count).
            The query runs in "update" mode, so each batch holds only the rows
            whose count changed. ``df_updated`` can therefore contain several
            rows for the same (User, window).
        epoch_id: micro-batch id supplied by foreachBatch; unused here.

    ``localCheckpoint()`` materializes the batch once and truncates its lineage.
    The parquet write and later actions on ``df_updated`` then read the stored
    rows instead of re-running this micro-batch's plan (including the stateful
    aggregation) after the batch has finished. It replaces the previous
    ``persist()``, which was lazy and never released.
    """
    global df_updated
    snapshot = df.localCheckpoint()
    snapshot.write.format("parquet").mode("append").save("./output/task2/parquet")
    batch = snapshot.toDF("User", "window", "count")
    df_updated = batch if df_updated is None else df_updated.union(batch)

# Same windowed count as window_query, but delivered through foreachBatch so
# each micro-batch goes to parquet and to df_updated.
# The query needs its own name: "activity_query" belongs to window_query, which
# is still running, and Spark refuses to start a second active query with a
# name already in use in the same session.
# timestamp_seconds builds the event time as a real timestamp. Round-tripping
# through the from_unixtime() string is lossy and time-zone dependent.
window_query_updated = (
    streaming_df
    .withColumn("Creation_Time_seconds", col("Creation_Time") / 1000000000)
    .withColumn("Creation_Time_timestamp", timestamp_seconds(col("Creation_Time_seconds")))
    .groupBy("User", window("Creation_Time_timestamp", "6 minute", "3 minute"))
    .count()
    .writeStream
    .queryName("activity_query_parquet")
    .outputMode("update")
    .foreachBatch(write_to_memory_and_parquet)
    .trigger(processingTime='2 seconds')
    .option("checkpointLocation", "./ckpt_parquet/")
    .start()
)

                                                                                

In [10]:
# Show the first 20 accumulated rows without truncating the window column.
df_updated.show(n=20, truncate=False)

+----+------------------------------------------+-----+
|User|window                                    |count|
+----+------------------------------------------+-----+
|g   |{2015-02-23 10:45:00, 2015-02-23 10:51:00}|62201|
|g   |{2015-02-23 10:15:00, 2015-02-23 10:21:00}|18846|
|g   |{2015-02-23 10:21:00, 2015-02-23 10:27:00}|57165|
|g   |{2015-02-23 10:54:00, 2015-02-23 11:00:00}|92614|
|g   |{2015-02-23 10:24:00, 2015-02-23 10:30:00}|64541|
|g   |{2015-02-23 11:03:00, 2015-02-23 11:09:00}|67425|
|g   |{2015-02-23 10:36:00, 2015-02-23 10:42:00}|59742|
|g   |{2015-02-23 11:09:00, 2015-02-23 11:15:00}|55430|
|g   |{2015-02-23 11:12:00, 2015-02-23 11:18:00}|53499|
|g   |{2015-02-23 10:42:00, 2015-02-23 10:48:00}|55498|
|g   |{2015-02-23 11:18:00, 2015-02-23 11:24:00}|57777|
|g   |{2015-02-23 11:21:00, 2015-02-23 11:27:00}|55715|
|c   |{2015-02-23 12:42:00, 2015-02-23 12:48:00}|65718|
|c   |{2015-02-23 12:09:00, 2015-02-23 12:15:00}|14805|
|c   |{2015-02-23 12:18:00, 2015-02-23 12:24:00}

In [12]:
# Raw Kafka records from topic "q3", starting at the earliest available offset.
q3_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "q3",
        "startingOffsets": "earliest",
    })
    .load()
)
q3_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [14]:
# Kafka stream for topic "q3". failOnDataLoss=false keeps the query running when
# previously seen offsets are gone (e.g. after the topic is deleted/recreated).
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "q3")
    .option("failOnDataLoss", "false")
    .load()
)

# Keep only the message payload, decoded from binary to string.
streaming_df = streaming_df.select(col("value").cast("string").alias("value"))

# Treat the payload as a CSV line and pick the fields at positions 7 and 9.
parsed_df = streaming_df \
    .select(split(col("value"), ",").alias("columns")) \
    .select(col("columns")[7].alias("action"), col("columns")[9].alias("gender"))

# Keep rows with action "2" and gender "0" or "1".
filtered_df = parsed_df.filter((col("action") == "2") & col("gender").isin("0", "1"))

watermark_delay = "10 seconds"

# 5-second tumbling windows over processing time (current_timestamp), counted
# per gender.
# NOTE: in complete output mode every window's aggregate is kept, so this
# watermark does not evict any state (watermark-based cleanup only applies to
# append and update modes).
windowed_df = (
    filtered_df
    .withColumn("timestamp", current_timestamp())
    .withWatermark("timestamp", watermark_delay)
    .groupBy(window("timestamp", "5 seconds"), "gender")
    .count()
)

# Publish the full result table to the in-memory table "transaction_count".
query = (
    windowed_df.writeStream
    .queryName("transaction_count")
    .outputMode("complete")
    .format("memory")
    .option("checkpointLocation", "./q3_checkpoint")
    .start()
)


24/05/05 03:09:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [15]:
# import os
# os.system("kafka-topics.sh --bootstrap-server localhost:9092 --topic q3 --delete")

In [21]:
# Inspect the in-memory "transaction_count" table (row order is not guaranteed).
spark.table("transaction_count").show(n=20, truncate=False)

+------------------------------------------+------+-----+
|window                                    |gender|count|
+------------------------------------------+------+-----+
|{2024-05-05 03:10:00, 2024-05-05 03:10:05}|0     |161  |
|{2024-05-05 03:14:30, 2024-05-05 03:14:35}|0     |119  |
|{2024-05-05 03:18:00, 2024-05-05 03:18:05}|0     |133  |
|{2024-05-05 03:17:45, 2024-05-05 03:17:50}|1     |150  |
|{2024-05-05 03:12:40, 2024-05-05 03:12:45}|0     |104  |
|{2024-05-05 03:12:30, 2024-05-05 03:12:35}|1     |117  |
|{2024-05-05 03:13:55, 2024-05-05 03:14:00}|1     |107  |
|{2024-05-05 03:11:20, 2024-05-05 03:11:25}|0     |150  |
|{2024-05-05 03:10:25, 2024-05-05 03:10:30}|0     |165  |
|{2024-05-05 03:11:35, 2024-05-05 03:11:40}|0     |153  |
|{2024-05-05 03:11:50, 2024-05-05 03:11:55}|1     |137  |
|{2024-05-05 03:13:00, 2024-05-05 03:13:05}|0     |137  |
|{2024-05-05 03:09:15, 2024-05-05 03:09:20}|0     |117  |
|{2024-05-05 03:14:00, 2024-05-05 03:14:05}|1     |149  |
|{2024-05-05 0