In [1]:
# Create the spark session
from pyspark.sql import SparkSession

In [2]:
spark = (
    SparkSession
    .builder
    .appName("Process json from kafka")
    .config("spark.sql.streaming.stopGracefullyOnShutdown", True)
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)
spark

In [3]:
orderjson_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "custords")
    .option("startingOffsets", "earliest") #earliest/latest
    .load()
)

In [4]:
orderjson_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
#orderjson_df.show()

In [6]:
from pyspark.sql.functions import expr, col

stringjson_df = orderjson_df.withColumn("value", expr("cast(value as string)"))

In [7]:
#stringjson_df.show(truncate=False)

In [8]:
json_schema = """
struct<
  event_id:string,
  event_type:string,
  event_timestamp:string,
  order:struct<
    order_id:string,
    customer_id:string,
    currency:string,
    product_id:string,
    product_name:string,
    category:string,
    quantity:int,
    unit_price:int,
    total_price:int
  >,
  payment:struct<
    payment_method:string,
    status:string,
    transaction_id:string
  >,
  customer_context:struct<
    device:string,
    location:string,
    ip_address:string,
    session_id:string
  >
>
"""
from pyspark.sql.functions import from_json
df_with_schema = stringjson_df.select(
    from_json(col("value").cast("string"), json_schema).alias("data")
)


In [9]:
#df_with_schema.show(truncate=False)

In [10]:
#df_with_schema.printSchema()

In [11]:
Bronze_df = df_with_schema.select("data.*")

In [12]:
#Bronze_df.show()

In [14]:
from pyspark.sql.functions import explode

Bronze_df_final = Bronze_df.select("event_id", "event_type", "event_timestamp", "order.*", "payment.*", "customer_context.*")

In [15]:
#Bronze_df_final.printSchema()

In [16]:
#running in once/available now and processingTime mode
#write the output to sink to check output

(
    Bronze_df_final
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(once=True) #processingTime="10 seconds"
    .option("checkpointLocation", "checkpoint_dir_custords_1")
    .start()
    .awaitTermination()
)

In [18]:
#running in continuous mode
#write the output to memory sink to check output

(
    Bronze_df_final
    .writeStream
    .format("memory")
    .queryName("kafka_table")
    .outputMode("append")
    .trigger(continuous="10 seconds")
    .option("checkpointLocation", "checkpoint_dir_custords_2")
    .start()
    .awaitTermination()
^)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [17]:
# write to parquet

bronze_data_write = (
    Bronze_df_final.writeStream
    .format("parquet")     # Delta if available
    .option("checkpointLocation", "chk/bronze")
    .option("path", "data/bronze/orders")
    .outputMode("append")
    .start()
)

In [18]:
Bronze_df_final.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- total_price: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- status: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- device: string (nullable = true)
 |-- location: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- session_id: string (nullable = true)



In [23]:
from pyspark.sql.functions import col, to_timestamp

silver_df = (
    Bronze_df_final
    .withColumn("event_ts", to_timestamp("event_timestamp"))
    .dropDuplicates(["event_id"])
    .filter(col("status") == "PAID")
    .select(
        "event_id",
        "event_type",
        "event_ts",
        "order_id",
        "customer_id",
        "product_id",
        "product_name",
        "category",
        "quantity",
        "unit_price",
        "total_price",
        "payment_method",
        "location"
    )
)


silver_write = (
    silver_df.writeStream
    .format("parquet")
    .option("checkpointLocation", "chk/silver")
    .option("path", "data/silver/orders")
    .outputMode("append")
    .start()
)


In [24]:
# view data in memory sink

spark.sql("select * from kafka_table").show()

AnalysisException: Table or view not found: kafka_table; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [kafka_table], [], false


In [38]:
df_from_kafka_batch = spark.sql("select * from kafka_table")

In [39]:
df_from_kafka_batch.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- total_price: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- status: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- device: string (nullable = true)
 |-- location: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- session_id: string (nullable = true)



In [41]:
df_from_kafka_batch.select("product_name").distinct().show()

+-------------------+
|       product_name|
+-------------------+
|Mechanical Keyboard|
|       Office Chair|
|      Running Shoes|
|     Wireless Mouse|
|         Coffee Mug|
+-------------------+



In [43]:
#check the best sellling products by number of orders

df_best_products = df_from_kafka_batch.selectExpr("product_name", "quantity").groupBy("product_name").agg(sum("quantity").alias("total_qty")).orderBy(col("total_qty").desc())

df_best_products.show()

+-------------------+---------+
|       product_name|total_qty|
+-------------------+---------+
|     Wireless Mouse|     5801|
|      Running Shoes|     5745|
|       Office Chair|     5676|
|         Coffee Mug|     5650|
|Mechanical Keyboard|     5626|
+-------------------+---------+



In [29]:
# get the customers with the most spend

from pyspark.sql.functions import col, sum, asc, desc
Gold_df_1 = silver_df.selectExpr("customer_id", "cast((unit_price * quantity) as double) as order_price")\
.groupBy("customer_id").agg(sum("order_price").alias("total_order_amt")).orderBy(col("total_order_amt").desc()).limit(5)
                                                                                                                     
#Gold_df_1.show()
                                                                                                                      

In [31]:
silver_df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- total_price: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)



In [32]:
from pyspark.sql.functions import window, sum

gold_df_2 = (
    silver_df
    .withWatermark("event_ts", "10 minutes")
    .groupBy(
        window("event_ts", "10 minutes"),
        col("category")
    )
    .agg(
        sum("total_price").alias("revenue")
    )
)

gold_write = (
    gold_df_2.writeStream
    .format("parquet")
    .option("checkpointLocation", "chk/gold")
    .option("path", "data/gold/revenue_by_category")
    .outputMode("append")
    .start()
)


In [62]:
#spark.stop()