# Initialization

In [1]:
import json
import uuid
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Dibimbing Spark-Kafka") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2')
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]") 
    .getOrCreate()
)

spark

In [9]:
streaming = (
    spark
    .readStream
    .schema(dataSchema)
    .option('maxFilesPerTrigger', 1)
    .json('/resources/data/activity-data/')
)

In [10]:
# set partitions
spark.conf.set('spark.sql.shuffle.partitions', 5)

In [11]:
activityCounts = streaming.select('index').distinct()
activityQuery = (
    activityCounts.writeStream
    .queryName('activity_counts_3')
    .format('memory')
    .outputMode('append')
    .start()
)

# activityQuery.awaitTermination()

In [13]:
# activityQuery.awaitTermination()
activityQuery.stop()

In [12]:
from time import sleep
for x in range(5):
    spark.sql("SELECT COUNT(*) FROM activity_counts_3").show()
    sleep(1)

+--------+
|count(1)|
+--------+
|  290684|
+--------+

+--------+
|count(1)|
+--------+
|  320527|
+--------+

+--------+
|count(1)|
+--------+
|  340258|
+--------+

+--------+
|count(1)|
+--------+
|  353626|
+--------+

+--------+
|count(1)|
+--------+
|  362586|
+--------+



# Spark - Kafka Streaming

In [3]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [4]:
kafka_host = os.getenv('KAFKA_HOST')
kafka_topic = os.getenv('KAFKA_TOPIC_NAME')
kafka_topic_partition = os.getenv('KAFKA_TOPIC_NAME')+"-1"

## Batch Simulation

In [5]:
kafka_df = (
    spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{kafka_host}:9092')
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [6]:
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
kafka_df.show()

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|[7B 22 65 6D 70 5...|test-topic|        0|     0|2025-01-26 06:47:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     1|2025-01-26 06:47:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     2|2025-01-26 06:47:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     3|2025-01-26 06:49:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     4|2025-01-26 06:49:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     5|2025-01-26 07:04:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     6|2025-01-26 07:04:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0|     7|2025-01-26 07:04:...|            0|

In [8]:
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [9]:
kafka_json_df.show(5)

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|{"emp_id": "ea349...|test-topic|        0|     0|2025-01-26 06:47:...|            0|
|null|{"emp_id": "60039...|test-topic|        0|     1|2025-01-26 06:47:...|            0|
|null|{"emp_id": "e88bb...|test-topic|        0|     2|2025-01-26 06:47:...|            0|
|null|{"emp_id": "e3938...|test-topic|        0|     3|2025-01-26 06:49:...|            0|
|null|{"emp_id": "4df30...|test-topic|        0|     4|2025-01-26 06:49:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+
only showing top 5 rows



In [10]:
(
    kafka_json_df
    .select('value')
    .limit(5)
    .collect()
)

[Row(value='{"emp_id": "ea3492b4-cfb6-41e2-bc01-df488dffce97", "employee_name": "Mike Russell", "department": "Sales", "state": "RJ", "salary": 111203, "age": 25, "bonus": 99728, "ts": 1094101705}'),
 Row(value='{"emp_id": "600396e6-9532-4afd-9369-6063bdc1d1d4", "employee_name": "Eugene Welch", "department": "Marketing", "state": "TX", "salary": 31921, "age": 35, "bonus": 30321, "ts": 1527044999}'),
 Row(value='{"emp_id": "e88bb36b-a839-4e2b-8620-ee2d99b7332c", "employee_name": "Adam Johnson", "department": "Marketing", "state": "NY", "salary": 86465, "age": 58, "bonus": 75891, "ts": 1080657041}'),
 Row(value='{"emp_id": "e39388e1-0ed6-46ee-a220-5b7226cd3a23", "employee_name": "Deanna Burton", "department": "IT", "state": "RJ", "salary": 85386, "age": 30, "bonus": 20336, "ts": 169859000}'),
 Row(value='{"emp_id": "4df300f1-0e27-4e69-b5b9-ba17a51bcb9e", "employee_name": "Catherine Stewart", "department": "HR", "state": "CA", "salary": 128457, "age": 27, "bonus": 84444, "ts": 575151972}'

In [11]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

schema = StructType(
    [
        StructField("emp_id", StringType(), True),
        StructField("employee_name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("state", StringType(), True),
        StructField("salary", LongType(), True),
        StructField("age", IntegerType(), True),
        StructField("bonus", LongType(), True),
        StructField("ts", LongType(), True),
    ]
)

In [12]:
from pyspark.sql.functions import from_json, col

(
    kafka_json_df
    .select(
        from_json(col("value"), schema)
        .alias("data")
    )
    .select("data.*")
    .show()
)

+--------------------+------------------+----------+-----+------+---+-----+----------+
|              emp_id|     employee_name|department|state|salary|age|bonus|        ts|
+--------------------+------------------+----------+-----+------+---+-----+----------+
|ea3492b4-cfb6-41e...|      Mike Russell|     Sales|   RJ|111203| 25|99728|1094101705|
|600396e6-9532-4af...|      Eugene Welch| Marketing|   TX| 31921| 35|30321|1527044999|
|e88bb36b-a839-4e2...|      Adam Johnson| Marketing|   NY| 86465| 58|75891|1080657041|
|e39388e1-0ed6-46e...|     Deanna Burton|        IT|   RJ| 85386| 30|20336| 169859000|
|4df300f1-0e27-4e6...| Catherine Stewart|        HR|   CA|128457| 27|84444| 575151972|
|02d0fc67-c985-4d0...|   Deanna Gonzalez|        HR|   NY|110263| 25|42047| 264348802|
|d8eecc84-d825-4cc...|      Sara Salazar| Marketing|   FL| 75972| 41|54943| 212073133|
|bbd316e4-685c-4c4...|       Erik Murphy| Marketing|   NY| 13751| 51|30377|1736559455|
|23462f41-4360-4d3...|     Crystal Evans| M

## Stream Simulation

In [31]:
kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{kafka_host}:9092')
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [32]:
from pyspark.sql.functions import from_json, col

parsed_df = (
    kafka_df
    .withColumn("value", expr("cast(value as string)"))
    .select(
        from_json(col("value"), schema)
        .alias("data")
    )
    .select("data.*")
)

In [33]:
record_count = parsed_df.selectExpr("COUNT(*) AS total_records")

In [34]:
(
    record_count
    .writeStream
    .outputMode("complete")  # Use "complete" to show the aggregation result
    .format("console")
    .start()
    .awaitTermination()
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [30]:
(
    parsed_df
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime='5 seconds')
    # .trigger(continuous='1 second')
    # .trigger(once=true)
    .option("checkpointLocation", "checkpoint_dir_3")
    .start()
    .awaitTermination()
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [22]:
activityCounts = parsed_df.select('emp_id').distinct()
activityQuery = (
    activityCounts.writeStream
    .queryName('activity_counts_5')
    .format('memory')
    .outputMode('append')
    .start()
)

In [None]:
# activityQuery.awaitTermination()
activityQuery.stop()

In [24]:
from time import sleep
for x in range(5):
    spark.sql("SELECT COUNT(*) FROM activity_counts_5").show()
    sleep(1)

+--------+
|count(1)|
+--------+
|     651|
+--------+

+--------+
|count(1)|
+--------+
|     653|
+--------+

+--------+
|count(1)|
+--------+
|     653|
+--------+

+--------+
|count(1)|
+--------+
|     655|
+--------+

+--------+
|count(1)|
+--------+
|     657|
+--------+

