# Initialization

In [1]:
import json
import uuid
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Ruang Data Project Spark-Kafka") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2')
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]") 
    .getOrCreate()
)

spark

In [9]:
streaming = (
    spark
    .readStream
    .schema(dataSchema)
    .option('maxFilesPerTrigger', 1)
    .json('/resources/data/activity-data/')
)

In [10]:
# set partitions
spark.conf.set('spark.sql.shuffle.partitions', 5)

In [11]:
activityCounts = streaming.select('index').distinct()
activityQuery = (
    activityCounts.writeStream
    .queryName('activity_counts_3')
    .format('memory')
    .outputMode('append')
    .start()
)

# activityQuery.awaitTermination()

In [13]:
# activityQuery.awaitTermination()
activityQuery.stop()

In [12]:
from time import sleep
for x in range(5):
    spark.sql("SELECT COUNT(*) FROM activity_counts_3").show()
    sleep(1)

+--------+
|count(1)|
+--------+
|  290684|
+--------+

+--------+
|count(1)|
+--------+
|  320527|
+--------+

+--------+
|count(1)|
+--------+
|  340258|
+--------+

+--------+
|count(1)|
+--------+
|  353626|
+--------+

+--------+
|count(1)|
+--------+
|  362586|
+--------+



# Spark - Kafka Streaming

In [3]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [4]:
kafka_host = os.getenv('KAFKA_HOST')
kafka_topic = os.getenv('KAFKA_TOPIC_NAME')
kafka_topic_partition = os.getenv('KAFKA_TOPIC_NAME')+"-1"

## Batch Simulation

In [5]:
kafka_df = (
    spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{kafka_host}:9092')
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [6]:
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
kafka_df.show()

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16381|2025-01-19 00:42:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16382|2025-01-19 00:42:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16383|2025-01-19 00:43:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16384|2025-01-19 00:43:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16385|2025-01-19 00:43:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16386|2025-01-19 00:44:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16387|2025-01-19 00:44:...|            0|
|null|[7B 22 65 6D 70 5...|test-topic|        0| 16388|2025-01-19 00:44:...|            0|

In [8]:
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [11]:
kafka_json_df.show(5)

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|{"emp_id": "b6b86...|test-topic|        0| 16381|2025-01-19 00:42:...|            0|
|null|{"emp_id": "9aa3c...|test-topic|        0| 16382|2025-01-19 00:42:...|            0|
|null|{"emp_id": "2df54...|test-topic|        0| 16383|2025-01-19 00:43:...|            0|
|null|{"emp_id": "53661...|test-topic|        0| 16384|2025-01-19 00:43:...|            0|
|null|{"emp_id": "dcdf9...|test-topic|        0| 16385|2025-01-19 00:43:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+
only showing top 5 rows



In [16]:
(
    kafka_json_df
    .select('value')
    .limit(5)
    .collect()
)

[Row(value='{"emp_id": "b6b86170-bdc0-4ffc-8eac-ad050509bf0b", "employee_name": "Charles Burgess", "department": "Marketing", "state": "IL", "salary": 16180, "age": 21, "bonus": 71805, "ts": 1639047747}'),
 Row(value='{"emp_id": "9aa3c7c3-4c65-457f-808a-02da577361e3", "employee_name": "Paula Haynes", "department": "Marketing", "state": "CA", "salary": 36555, "age": 24, "bonus": 60555, "ts": 1071562864}'),
 Row(value='{"emp_id": "2df54166-8549-4aa5-a0cf-ae9b58d0e6fa", "employee_name": "Leslie Carson", "department": "Marketing", "state": "IL", "salary": 128007, "age": 60, "bonus": 18210, "ts": 1102291925}'),
 Row(value='{"emp_id": "53661acb-0696-4534-a3f4-259b3d034c95", "employee_name": "Mrs. Cathy Fernandez", "department": "Marketing", "state": "NY", "salary": 91764, "age": 46, "bonus": 10094, "ts": 1198743107}'),
 Row(value='{"emp_id": "dcdf90ae-6acc-44c8-b958-3a83d19ac60a", "employee_name": "Sara Mcdonald", "department": "Sales", "state": "TX", "salary": 117205, "age": 37, "bonus": 96

In [19]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

schema = StructType(
    [
        StructField("emp_id", StringType(), True),
        StructField("employee_name", StringType(), True),
        StructField("department", StringType(), True),
        StructField("state", StringType(), True),
        StructField("salary", LongType(), True),
        StructField("age", IntegerType(), True),
        StructField("bonus", LongType(), True),
        StructField("ts", LongType(), True),
    ]
)

In [20]:
from pyspark.sql.functions import from_json, col

(
    kafka_json_df
    .select(
        from_json(col("value"), schema)
        .alias("data")
    )
    .select("data.*")
    .show()
)

+--------------------+--------------------+----------+-----+------+---+-----+----------+
|              emp_id|       employee_name|department|state|salary|age|bonus|        ts|
+--------------------+--------------------+----------+-----+------+---+-----+----------+
|b6b86170-bdc0-4ff...|     Charles Burgess| Marketing|   IL| 16180| 21|71805|1639047747|
|9aa3c7c3-4c65-457...|        Paula Haynes| Marketing|   CA| 36555| 24|60555|1071562864|
|2df54166-8549-4aa...|       Leslie Carson| Marketing|   IL|128007| 60|18210|1102291925|
|53661acb-0696-453...|Mrs. Cathy Fernandez| Marketing|   NY| 91764| 46|10094|1198743107|
|dcdf90ae-6acc-44c...|       Sara Mcdonald|     Sales|   TX|117205| 37|96215| 811795364|
|72cbfc4e-d9c8-44b...|         Brian Henry|        IT|   RJ| 26584| 20|20444|1102492295|
|95f8944b-48a5-406...|      Evelyn Hampton|        IT|   RJ| 27185| 40|14750|1507670466|
|a60e913d-f217-4e2...|      Joseph Griffin|        IT|   CA| 63632| 54|33758|1026610775|
|d929b74e-325b-40f...

## Stream Simulation

In [21]:
kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{kafka_host}:9092')
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [22]:
from pyspark.sql.functions import from_json, col

parsed_df = (
    kafka_df
    .withColumn("value", expr("cast(value as string)"))
    .select(
        from_json(col("value"), schema)
        .alias("data")
    )
    .select("data.*")
)

In [None]:
(
    parsed_df
    .writeStream
    .format("console")
    .outputMode("append")
    # .trigger(processingTime='5 seconds')
    # .trigger(continuous='1 second')
    # .trigger(once=true)
    .option("checkpointLocation", "checkpoint_dir")
    .start()
    .awaitTermination()
)