# Initialization

In [1]:
import json
import uuid
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep

In [2]:
from pyspark.sql import SparkSession

# Local Spark session for the Kafka demo.
# - `spark.jars.packages` pulls the spark-sql-kafka-0-10 connector that provides the "kafka"
#   source/sink used below. NOTE(review): connector version 3.3.2 / Scala 2.12 presumably has
#   to match the installed Spark build — confirm.
# - Shuffle partitions are 5, the same value the runtime override in the next cell sets,
#   so the builder and the override no longer disagree.
spark = (
    SparkSession
    .builder
    .appName("Dibimbing Spark-Kafka")
    # NOTE(review): this key is documented for the DStreams StreamingContext; it may have
    # no effect on the Structured Streaming queries in this notebook — confirm.
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2')
    .config("spark.sql.shuffle.partitions", 5)
    .master("local[*]")
    .getOrCreate()
)

spark

In [5]:
# Runtime override of the shuffle partition count (a SQL conf, so it can be changed on the live session).
SHUFFLE_PARTITIONS = 5
spark.conf.set('spark.sql.shuffle.partitions', SHUFFLE_PARTITIONS)

# Spark - Kafka Streaming

In [6]:
# Load the Kafka connection settings read in the next cell (KAFKA_HOST, KAFKA_TOPIC_NAME)
# from the mounted .env file. The cell echoes load_dotenv's boolean result.
dotenv_path = Path('/resources') / '.env'
load_dotenv(dotenv_path=dotenv_path)

True

In [7]:
def _require_env(name):
    """Return the value of environment variable `name`.

    Raises:
        RuntimeError: if the variable is unset or empty. Without this check, a missing host
            turns silently into the bootstrap string "None:9092", and a missing topic
            crashes later with an unrelated TypeError.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set "
            "(expected from the .env file or the process environment)"
        )
    return value


kafka_host = _require_env('KAFKA_HOST')
kafka_topic = _require_env('KAFKA_TOPIC_NAME')
# Derived from the already-validated topic instead of re-reading the env var.
# NOTE(review): not referenced by any visible cell — confirm it is still needed.
kafka_topic_partition = kafka_topic + "-1"

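## Batch Simulation

Read every message currently in the topic, starting from the earliest available offset, as a static DataFrame, then decode it step by step.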

In [8]:
# Batch read: load every message currently in the topic, starting at the earliest available offset.
kafka_batch_options = {
    "kafka.bootstrap.servers": f'{kafka_host}:9092',
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
}
kafka_df = spark.read.format("kafka").options(**kafka_batch_options).load()

In [9]:
# Inspect the Kafka source schema: `key`/`value` arrive as binary, alongside topic/partition/offset/timestamp metadata.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [10]:
# Peek at the raw records (show() prints up to 20 rows by default); `value` is still undecoded bytes here.
kafka_df.show()

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|[7B 22 6F 72 64 6...|test-topic|        0|     0|2025-04-12 03:37:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     1|2025-04-12 03:37:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     2|2025-04-12 03:38:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     3|2025-04-12 03:38:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     4|2025-04-12 03:38:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     5|2025-04-12 03:38:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     6|2025-04-12 03:38:...|            0|
|null|[7B 22 6F 72 64 6...|test-topic|        0|     7|2025-04-12 03:38:...|            0|

In [11]:
from pyspark.sql.functions import expr

# Kafka hands `value` over as raw bytes; cast it to a string so the JSON payload becomes readable.
as_string = expr("cast(value as string)")
kafka_json_df = kafka_df.withColumn("value", as_string)

In [12]:
kafka_json_df.show(5)

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|{"order_id": "0cc...|test-topic|        0|     0|2025-04-12 03:37:...|            0|
|null|{"order_id": "677...|test-topic|        0|     1|2025-04-12 03:37:...|            0|
|null|{"order_id": "87c...|test-topic|        0|     2|2025-04-12 03:38:...|            0|
|null|{"order_id": "dfc...|test-topic|        0|     3|2025-04-12 03:38:...|            0|
|null|{"order_id": "26c...|test-topic|        0|     4|2025-04-12 03:38:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+
only showing top 5 rows



In [13]:
# Pull the first five decoded payloads to the driver as Row objects, to eyeball the JSON keys before writing a schema.
kafka_json_df.select('value').take(5)

[Row(value='{"order_id": "0ccdfd7e-94ad-49d0-81b7-8699c08b2937", "customer_id": 75, "furniture": "Chair", "color": "teal", "price": 10451, "ts": 1744427936}'),
 Row(value='{"order_id": "677fd7f3-16f1-4ca2-aba4-aaa916993d7a", "customer_id": 74, "furniture": "Bed", "color": "fuchsia", "price": 27660, "ts": 1744425519}'),
 Row(value='{"order_id": "87c805bd-ba44-4347-9421-0e6cac9fc031", "customer_id": 65, "furniture": "Bed", "color": "silver", "price": 141173, "ts": 1744426902}'),
 Row(value='{"order_id": "dfc9a66a-6e03-466e-994d-cb88eb512844", "customer_id": 67, "furniture": "Bed", "color": "lime", "price": 102250, "ts": 1744427274}'),
 Row(value='{"order_id": "26c88389-e767-4fa0-b229-c6dbb962033e", "customer_id": 86, "furniture": "Bed", "color": "navy", "price": 95197, "ts": 1744426702}')]

In [14]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Schema of the order events in the topic, mirroring the JSON keys shown above.
# Every field is declared nullable (third argument True).
# NOTE(review): `ts` looks like epoch seconds (values ~1.744e9 line up with the Kafka timestamps) — confirm with the producer.
schema = (
    StructType()
    .add("order_id", StringType(), True)
    .add("customer_id", IntegerType(), True)
    .add("furniture", StringType(), True)
    .add("color", StringType(), True)
    .add("price", LongType(), True)
    .add("ts", LongType(), True)
)

In [15]:
from pyspark.sql.functions import from_json, col

# Parse each JSON payload with `schema`, then flatten the resulting struct into one column per order field.
orders_batch_df = (
    kafka_json_df
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)
orders_batch_df.show()

+--------------------+-----------+---------+-------+------+----------+
|            order_id|customer_id|furniture|  color| price|        ts|
+--------------------+-----------+---------+-------+------+----------+
|0ccdfd7e-94ad-49d...|         75|    Chair|   teal| 10451|1744427936|
|677fd7f3-16f1-4ca...|         74|      Bed|fuchsia| 27660|1744425519|
|87c805bd-ba44-434...|         65|      Bed| silver|141173|1744426902|
|dfc9a66a-6e03-466...|         67|      Bed|   lime|102250|1744427274|
|26c88389-e767-4fa...|         86|      Bed|   navy| 95197|1744426702|
|0cedd4ed-900b-4f8...|          2|      Bed| yellow|135914|1744427945|
|11522abc-e521-4b0...|         35|      Bed|  white| 81696|1744425941|
|cdd3bb61-ddeb-4f6...|          8|     Sofa| purple| 68734|1744428778|
|e60e20e2-f8d5-42c...|         18|    Chair| maroon|110162|1744428290|
|0a115a2b-65ae-4df...|         33|     Desk|   aqua|149617|1744425783|
|4ae30e22-e9f1-4ec...|         36|      Bed|  black| 66634|1744426457|
|4e25d

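## Stream Simulation

Subscribe to the same topic as a streaming source, apply the same decoding, and print newly arriving orders to the console.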

In [24]:
# Streaming read of the same topic.
# - startingOffsets is left at its streaming default ("latest"): a fresh query only sees messages
#   produced after it starts. Add "startingOffsets": "earliest" to replay the whole topic instead.
# - failOnDataLoss=False keeps the query running when offsets may have been lost
#   (e.g. deleted topic or out-of-range offsets) instead of failing it.
kafka_stream_options = {
    "kafka.bootstrap.servers": f'{kafka_host}:9092',
    "subscribe": kafka_topic,
    "failOnDataLoss": False,
}
kafka_df = spark.readStream.format("kafka").options(**kafka_stream_options).load()

In [25]:
from pyspark.sql.functions import from_json, col, expr

# Decode the streaming records exactly like the batch simulation:
# binary `value` -> string -> struct via `schema` -> one column per order field.
value_as_text = expr("cast(value as string)")
order_struct = from_json(col("value"), schema).alias("data")

parsed_df = (
    kafka_df
    .withColumn("value", value_as_text)
    .select(order_struct)
    .select("data.*")
)

In [27]:
# Print each micro-batch of newly parsed orders to the console.
# No trigger is set, so Spark starts the next micro-batch as soon as the previous one finishes;
# chain e.g. `.trigger(processingTime='5 seconds')` before `.start()` for fixed-interval batches.
# The checkpoint directory records the consumed offsets, so a restarted query resumes from there.
console_writer = (
    parsed_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir_1")
)
streaming_obj = console_writer.start()

In [23]:
# Stop the console streaming query started by the writeStream cell.
# NOTE(review): the saved execution count ([23]) is lower than the cells above, so this cell was run out of order.
streaming_obj.stop()