In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import os

# Request the Kafka connector packages through the submit arguments. This is
# set before the session is created below so the arguments are in place when
# the session starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,'
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 '
    'pyspark-shell'
)

# Session-wide default checkpoint location for the streaming queries.
checkpoint_dir = "spark_streaming_check"

# Local session with 4 worker threads; see each config key for what it tunes.
spark = (
    SparkSession.builder
    .appName("FraudDetection")
    .master("local[4]")
    .config("spark.sql.files.maxPartitionBytes", "16MB")
    .config("spark.sql.session.timeZone", "Australia/Melbourne")
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .config("spark.sql.streaming.checkpointLocation", checkpoint_dir)
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)


In [2]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType, ArrayType

# Explicit schemas for the static CSV inputs. Every field is nullable.

# Customer Schema
customer_schema = (
    StructType()
    .add("customer_id", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("username", StringType(), True)
    .add("email", StringType(), True)
    .add("gender", StringType(), True)
    .add("birthdate", DateType(), True)
    .add("first_join_date", DateType(), True)
)

# Category Schema
category_schema = (
    StructType()
    .add("category_id", IntegerType(), True)
    .add("cat_level1", StringType(), True)
    .add("cat_level2", StringType(), True)
    .add("cat_level3", StringType(), True)
)

# Product Schema
product_schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("gender", StringType(), True)
    .add("baseColour", StringType(), True)
    .add("season", StringType(), True)
    .add("year", IntegerType(), True)
    .add("usage", StringType(), True)
    .add("productDisplayName", StringType(), True)
    .add("category_id", IntegerType(), True)
)

# Customer Session Schema (maps a browsing session to a customer)
customer_session_schema = (
    StructType()
    .add("session_id", StringType(), True)
    .add("customer_id", IntegerType(), True)
)

# Fraud Transaction Schema
fraud_transaction_schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add("Is_fraud", StringType(), True)
)

# Load each CSV (first row is a header) with its explicit schema.
# Paths are relative to the notebook's working directory.
category_df = spark.read.schema(category_schema).option("header", True).csv("category.csv")
customer_df = spark.read.schema(customer_schema).option("header", True).csv("customer.csv")
product_df = spark.read.schema(product_schema).option("header", True).csv("product.csv")
customer_session_df = spark.read.schema(customer_session_schema).option("header", True).csv("customer_session.csv")
fraud_transaction_df = spark.read.schema(fraud_transaction_schema).option("header", True).csv("fraud_transaction.csv")

# Sanity check: print every schema first, then the first five rows of each frame.
_static_frames = (customer_df, category_df, product_df, customer_session_df, fraud_transaction_df)

for _frame in _static_frames:
    _frame.printSchema()

for _frame in _static_frames:
    _frame.show(5)

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- first_join_date: date (nullable = true)

root
 |-- category_id: integer (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- baseColour: string (nullable = true)
 |-- season: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- usage: string (nullable = true)
 |-- productDisplayName: string (nullable = true)
 |-- category_id: integer (nullable = true)

root
 |-- session_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)

root
 |-- transaction_id: string (nullable = true)
 |-- Is_fraud: string 

In [3]:
from pyspark.sql.functions import from_json

# Kafka connection settings: broker host and the two source topics.
hostip = "kafka"
topic = "browsing-behavior-topic"
transaction_topic = "transactions-topic"  # Kafka topic for transaction data

# Streaming source for browsing-behaviour events.
browsing_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{hostip}:9092')
    .option("subscribe", topic)
    .load()
)

# Streaming source for transactions (same broker, different topic).
transaction_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{hostip}:9092')
    .option("subscribe", transaction_topic)
    .load()
)

In [4]:
# Inspect the raw Kafka source schemas (key/value are still binary at this point).
browsing_df.printSchema()
transaction_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Browsing stream: keep only key/value, decoded from binary to text.
browsing_df = browsing_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Schema of the JSON payload carried in the browsing 'value' column:
# six nullable string fields followed by an integer 'ts'.
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "session_id",
            "event_type",
            "event_time",
            "traffic_source",
            "device_type",
            "customer_id",
        )
    ]
    + [StructField("ts", IntegerType(), True)]
)

# Parse the JSON text and promote its fields to top-level columns.
parsed_browsing_df = (
    browsing_df
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Transaction stream: same key/value decoding.
transaction_df = transaction_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Schema of the transaction JSON payload. Every field is read as a nullable
# string here; 'product_metadata' is parsed a second time further down.
transaction_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "created_at",
            "customer_id",
            "transaction_id",
            "session_id",
            "product_metadata",
            "payment_method",
            "payment_status",
            "promo_amount",
            "promo_code",
            "shipment_fee",
            "shipment_location_lat",
            "shipment_location_long",
            "total_amount",
            "clear_payment",
        )
    ]
)

# Parse the JSON text and promote its fields to top-level columns.
parsed_transaction_df = (
    transaction_df
    .select(from_json(col("value"), transaction_schema).alias("data"))
    .select("data.*")
)

# 'product_metadata' is itself JSON text holding an array of line items,
# each with an integer product id, quantity and item price.
schema_product_metadata = ArrayType(
    StructType(
        [
            StructField("product_id", IntegerType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("item_price", IntegerType(), True),
        ]
    )
)

# Use from_json with the schema above to turn the text into array<struct>.
parsed_transaction_df = parsed_transaction_df.withColumn(
    "product_metadata",
    from_json(col("product_metadata"), schema_product_metadata),
)


In [6]:
# Output schema for browsing data after JSON parsing
parsed_browsing_df.printSchema()
# Output schema for transaction data
parsed_transaction_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- ts: integer (nullable = true)

root
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_price: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: string (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: string (nullable = true)
 |-- shipment_location_lat: string (nullable = tr

In [7]:
from pyspark.sql.functions import col, from_unixtime, current_timestamp, unix_timestamp

# Browsing stream: event_time becomes a timestamp and ts is cast to integer.
# customer_id is left as a string on this stream.
parsed_browsing_df = (
    parsed_browsing_df
    .withColumn("event_time", col("event_time").cast(TimestampType()))
    .withColumn("ts", col("ts").cast(IntegerType()))
)

# Transaction stream: timestamp and integer id first ...
parsed_transaction_df = (
    parsed_transaction_df
    .withColumn("created_at", col("created_at").cast(TimestampType()))
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
)

# ... then every monetary / coordinate column becomes a double.
for _double_col in (
    "promo_amount",
    "shipment_fee",
    "shipment_location_lat",
    "shipment_location_long",
    "total_amount",
):
    parsed_transaction_df = parsed_transaction_df.withColumn(
        _double_col, col(_double_col).cast(DoubleType())
    )


In [8]:
# Derive the timestamp column 'event_ts' from the unix-time column 'ts', then
# keep only rows whose event_ts is at most 120 seconds behind the current
# timestamp at evaluation time.
parsed_browsing_df = (
    parsed_browsing_df
    .withColumn("event_ts", from_unixtime(col("ts")).cast(TimestampType()))
    .where((unix_timestamp(current_timestamp()) - unix_timestamp(col("event_ts"))) <= 120)
)

In [9]:
# Start an append-mode query that sinks the browsing stream into the
# in-memory table "browsing_query" so it can be inspected with SQL.
query_browsing = (
    parsed_browsing_df.writeStream
    .queryName("browsing_query")
    .format("memory")
    .outputMode("append")
    .start()
)

In [10]:
# Display the rows collected so far in the in-memory browsing table.
spark.table("browsing_query").show(truncate=False)

+------------------------------------+----------+-----------------------+--------------+-----------+-----------+----------+-------------------+
|session_id                          |event_type|event_time             |traffic_source|device_type|customer_id|ts        |event_ts           |
+------------------------------------+----------+-----------------------+--------------+-----------+-----------+----------+-------------------+
|cae032eb-9e06-4657-be66-8eb549bfe938|CL        |2024-01-01 06:32:36.459|MOBILE        |Android    |63151      |1728818038|2024-10-13 22:13:58|
|79883a8d-083d-471a-b537-f470ed8d42b6|HP        |2024-01-01 06:32:37.163|MOBILE        |iOS        |72212      |1728818038|2024-10-13 22:13:58|
|aea9b7f8-806a-4780-8758-a4ac58e107a2|SCR       |2024-01-01 06:32:52.867|MOBILE        |Android    |79087      |1728818038|2024-10-13 22:13:58|
|9ae8dcd6-4338-4675-adc8-9189505be3fd|VI        |2024-01-01 06:33:14.705|MOBILE        |iOS        |35603      |1728818038|2024-10-13 22

In [11]:
# Stop the in-memory browsing preview query started above.
query_browsing.stop()

In [12]:
# Start an append-mode query that sinks the transaction stream into the
# in-memory table "transaction_query" so it can be inspected with SQL.
query_transaction = (
    parsed_transaction_df.writeStream
    .queryName("transaction_query")
    .format("memory")
    .outputMode("append")
    .start()
)

In [13]:
# Display the rows collected so far in the in-memory transaction table.
spark.table("transaction_query").show(truncate=False)

+-----------------------+-----------+------------------------------------+------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+--------------+------------+-------------+------------+---------------------+----------------------+------------+-------------+
|created_at             |customer_id|transaction_id                      |session_id                          |product_metadata                                                                                                                                                                                                                                                                                          |payment_method|payment_status|promo_a

In [14]:
# Stop the in-memory transaction preview query started above.
query_transaction.stop()

In [15]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.functions import when, unix_timestamp

# Feature engineering for the fraud model.
# Steps: per-session action counts in 5-second windows -> customer lookup ->
# action ratios and time-of-day bucket -> customer demographics ->
# transaction geolocation / product metadata -> fraud label -> join back onto
# the individual browsing events (final_df).

# Define the actions by category
l1_actions = ['AP', 'ATC', 'CO']  # L1 actions
l2_actions = ['VC', 'VP', 'VI', 'SER']  # L2 actions
l3_actions = ['SCR', 'HP', 'CL']  # L3 actions

# Adding watermark to allow late data but manage memory usage
parsed_browsing_df = parsed_browsing_df.withWatermark("event_ts", "2 minutes")

time_window = F.window("event_ts", "5 seconds")

# Count the L1/L2/L3 actions per (window, session) and keep the earliest and
# latest event_time seen in that group.
aggregated_browsing_df = parsed_browsing_df.groupBy(time_window, "session_id").agg(
    F.sum(F.when(F.col("event_type").isin(l1_actions), 1).otherwise(0)).alias("L1_count"),
    F.sum(F.when(F.col("event_type").isin(l2_actions), 1).otherwise(0)).alias("L2_count"),
    F.sum(F.when(F.col("event_type").isin(l3_actions), 1).otherwise(0)).alias("L3_count"),
    F.max("event_time").alias("max_event_time"),
    F.min("event_time").alias("min_event_time")
)

# Attach the (integer) customer_id from the static session lookup table.
aggregated_browsing_df = aggregated_browsing_df.join(
    customer_session_df.select("session_id", "customer_id"),
    on="session_id",
    how="left"
)

# Midpoint between the earliest and latest event_time of the group. The column
# is called "median_event_time", but it is the min/max midpoint, not a median.
aggregated_browsing_df = aggregated_browsing_df.withColumn(
    "median_event_time",
    ((unix_timestamp(F.col("max_event_time")) + unix_timestamp(F.col("min_event_time"))) / 2).cast(TimestampType())
)

# Calculate the action ratios as percentages, rounded to 2 decimals
# (0 when the group contains none of the listed actions).
aggregated_browsing_df = aggregated_browsing_df.withColumn(
    "total_actions", F.col("L1_count") + F.col("L2_count") + F.col("L3_count")
).withColumn(
    "L1_ratio", F.round(F.when(F.col("total_actions") > 0, (F.col("L1_count") / F.col("total_actions")) * 100).otherwise(0), 2)
).withColumn(
    "L2_ratio", F.round(F.when(F.col("total_actions") > 0, (F.col("L2_count") / F.col("total_actions")) * 100).otherwise(0), 2)
)

# To drop intermediate columns
aggregated_browsing_df = aggregated_browsing_df.drop("total_actions", "max_event_time", "min_event_time")

# Extract the hour from the midpoint timestamp
aggregated_browsing_df = aggregated_browsing_df.withColumn("extracted_hour", F.hour(F.col("median_event_time")))

# Classify the time of day: [6,12) morning, [12,18) afternoon, [18,24) evening,
# everything else (hours 0-5, or a null hour) night.
aggregated_browsing_df = aggregated_browsing_df.withColumn(
    "time_of_day",
    F.when((F.col("extracted_hour") >= 6) & (F.col("extracted_hour") < 12), "morning")
     .when((F.col("extracted_hour") >= 12) & (F.col("extracted_hour") < 18), "afternoon")
     .when((F.col("extracted_hour") >= 18) & (F.col("extracted_hour") < 24), "evening")
     .otherwise("night")
)

aggregated_browsing_df = aggregated_browsing_df.drop("extracted_hour", "median_event_time")

# Age is the difference of calendar years only (month/day are ignored).
current_year = F.year(F.current_date())
customer_df = customer_df.withColumn("age", (current_year - F.year("birthdate")).cast(IntegerType()))
customer_df = customer_df.withColumn("first_join_year", F.year("first_join_date"))

# Join customer demographics onto the aggregated browsing data
aggregated_browsing_df = aggregated_browsing_df.join(
    customer_df.select("customer_id", "gender", "age", "first_join_year"),
    on="customer_id",
    how="left"
)

# Join transaction data with browsing data, creating the geolocation field
# ("lat, long" as text). The key column is selected as "customer_id" so its
# spelling matches the join keys below instead of relying on case-insensitive
# column resolution.
aggregated_browsing_df = aggregated_browsing_df.join(
    parsed_transaction_df.select(
        "session_id", "customer_id", "shipment_location_lat", "shipment_location_long",
        "transaction_id", "product_metadata"
    ).withColumn(
        "geolocation",
        F.concat_ws(", ", F.col("shipment_location_lat"), F.col("shipment_location_long"))
    ),
    on=["session_id", "customer_id"],
    how="inner"
)

# Drop latitude and longitude after creating geolocation
aggregated_browsing_df = aggregated_browsing_df.drop("shipment_location_lat", "shipment_location_long")

# Label transactions: any transaction_id present in fraud_transaction_df is
# tagged "fraud".
# NOTE(review): the Is_fraud column of fraud_transaction_df is not consulted
# here - confirm the CSV only lists fraudulent transactions.
aggregated_browsing_df = aggregated_browsing_df.join(
    fraud_transaction_df.select("transaction_id")
    .withColumn("fraud_status", F.lit("fraud")),
    on="transaction_id",
    how="left"
)

# Fill fraud_status for non-fraud transactions
aggregated_browsing_df = aggregated_browsing_df.withColumn(
    "fraud_status", F.when(F.col("fraud_status").isNull(), "non-fraud").otherwise(F.col("fraud_status"))
)

# Drop transaction_id if not needed
aggregated_browsing_df = aggregated_browsing_df.drop("transaction_id")

# Final selection: individual browsing events enriched with the per-session
# features computed above.
# NOTE(review): customer_id is a string on the browsing side and an integer on
# the aggregated side; this join relies on Spark's implicit cast.
final_df = parsed_browsing_df.select(
    "session_id", "event_type", "event_time", "device_type", "customer_id", "event_ts"
).join(
    aggregated_browsing_df.select(
        "session_id", "customer_id", "L1_count", "L2_count", "L3_count", "L1_ratio", "L2_ratio", "time_of_day",
        "gender", "age", "geolocation", "first_join_year", "fraud_status", "product_metadata"
    ),
    on=["session_id", "customer_id"],
    how="inner"
)


In [16]:
from pyspark.sql import functions as F

# Pair every browsing event with the transactions of the same customer and session.
joined_stream = parsed_browsing_df.join(
    parsed_transaction_df, ["customer_id", "session_id"], "inner"
)

# Per customer and 5-second event_ts window, count the non-null transaction_id
# values among the joined rows.
# NOTE(review): this counts joined (event x transaction) rows, so a transaction
# is counted once per matching browsing event - confirm this is the intended
# definition of "num_purchases".
num_purchase_stream = (
    joined_stream
    .groupBy(F.window("event_ts", "5 seconds"), "customer_id")
    .agg(F.count("transaction_id").alias("num_purchases"))
)

# Attach the purchase count to the enriched browsing events.
final_df_with_purchases = final_df.join(
    num_purchase_stream.select("customer_id", "num_purchases"),
    "customer_id",
    "inner",
)

# Keep a single row per session.
# NOTE(review): which row survives is not controlled here, and the
# deduplication key does not include an event-time column - verify state growth.
final_df_with_purchases_no_duplicates = final_df_with_purchases.dropDuplicates(["session_id"])

In [17]:
# Inspect the schema of the feature frame that is passed to the model.
final_df_with_purchases_no_duplicates.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- L1_count: long (nullable = true)
 |-- L2_count: long (nullable = true)
 |-- L3_count: long (nullable = true)
 |-- L1_ratio: double (nullable = true)
 |-- L2_ratio: double (nullable = true)
 |-- time_of_day: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- geolocation: string (nullable = false)
 |-- first_join_year: integer (nullable = true)
 |-- fraud_status: string (nullable = true)
 |-- product_metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_price: integer (nullable = true)
 |-- num_purchases: long (nullable = 

In [26]:
from pyspark.ml import PipelineModel
# Load the saved pipeline from "best_rf_model/".
# NOTE(review): the variable is named best_gbt_model but the path says "rf" and
# later cells read a column called rf_prediction - confirm which model this is.
best_gbt_model = PipelineModel.load("best_rf_model/")
# Apply the loaded pipeline model to the streaming feature DataFrame
predicted_stream = best_gbt_model.transform(final_df_with_purchases_no_duplicates)

In [27]:
# Inspect the schema of the model output stream.
predicted_stream.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- L1_count: long (nullable = true)
 |-- L2_count: long (nullable = true)
 |-- L3_count: long (nullable = true)
 |-- L1_ratio: double (nullable = true)
 |-- L2_ratio: double (nullable = true)
 |-- time_of_day: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- geolocation: string (nullable = false)
 |-- first_join_year: integer (nullable = true)
 |-- fraud_status: string (nullable = true)
 |-- product_metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_price: integer (nullable = true)
 |-- num_purchases: long (nullable = 

In [28]:
# Persist the prediction stream as parquet files under "Predicted_Stream"
# (append mode). No checkpoint option is given here; the session-level
# spark.sql.streaming.checkpointLocation setting is the only one configured.
count_stream_save = (
    predicted_stream.writeStream
    .format("parquet")
    .option("path", "Predicted_Stream")
    .outputMode("append")
    .start()
)

In [29]:
# Stop the parquet sink query.
# NOTE(review): nothing in the notebook waits for batches to complete before
# this call - confirm enough data was written before stopping.
count_stream_save.stop()

In [30]:
# Read back, as a static DataFrame, the parquet files written by the prediction sink.
fraud_count_data = spark.read.format("parquet").load("Predicted_Stream")

In [31]:
# Rows whose rf_prediction equals 1.
rf_prediction_1_df = fraud_count_data.where(col("rf_prediction") == 1)

# Number of such rows per session_id.
session_id_count_df = rf_prediction_1_df.groupBy("session_id").count()
session_id_count_df.show()

# One row per session_id, so the row count is the number of distinct sessions.
unique_session_id_count = session_id_count_df.count()
print(f"Total session_ids where rf_prediction == 1: {unique_session_id_count}")


+--------------------+-----+
|          session_id|count|
+--------------------+-----+
|a55a4726-91e8-493...|    1|
|a48d2d01-5dc6-4b1...|    1|
|4adf12a5-8243-448...|    1|
|22f59ba7-bb41-446...|    1|
|edd6bdcb-b4f0-4b0...|    1|
|3f29b3de-cf2b-4a4...|    1|
|a461249e-454a-443...|    1|
+--------------------+-----+

Total session_ids where rf_prediction == 1: 7


In [32]:
# Save the rows with rf_prediction == 1 as parquet, replacing any previous output.
# NOTE(review): this writes the full prediction rows, not the per-session
# counts in session_id_count_df; the streaming reader later declares a
# (session_id, count) schema for this path - confirm which frame was intended.
rf_prediction_1_df.write.mode("overwrite").parquet("fraudCount_parquet")

In [33]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode

# Top 20 products by total quantity among rows predicted as non-fraud.
# The Spark aggregate is referenced as F.sum so that Python's builtin sum() is
# not shadowed for the rest of the notebook.

# Filter for non-fraud transactions (rf_prediction == 0)
non_fraud_transactions_df = fraud_count_data.filter(col("rf_prediction") == 0)

# Filter for ADD_TO_CART (ATC) events
add_to_cart_events_df = non_fraud_transactions_df.filter(col("event_type") == "ATC")

# Explode the product_metadata array so that each line item gets its own row
exploded_cart_items_df = add_to_cart_events_df.withColumn("product", explode("product_metadata"))

# Select session_id plus the product id, item price and quantity of each line item
product_details_df = exploded_cart_items_df.select(
    "session_id",
    col("product.product_id").alias("product_id"),
    col("product.item_price").alias("item_price"),
    col("product.quantity").alias("quantity")
)

# Look up the display name; the inner join drops line items whose product_id
# has no match in product_df.
product_details_df = product_details_df.join(
    product_df.select(col("id").alias("product_id"), "productDisplayName"),
    on="product_id",
    how="inner"
)

# Group by product id and display name and total the quantities for each product
aggregated_product_quantity_df = product_details_df.groupBy(
    "product_id",
    "productDisplayName"
).agg(
    F.sum("quantity").alias("total_quantity")
)

# Order by total_quantity in descending order and limit to the top 20 products
top_20_products_df = aggregated_product_quantity_df.orderBy(col("total_quantity").desc()).limit(20)

# Show the top 20 products with their product ID, name, and total quantity
top_20_products_df.show(truncate=False)

+----------+------------------------------------------------------+--------------+
|product_id|productDisplayName                                    |total_quantity|
+----------+------------------------------------------------------+--------------+
|44170     |Nike Fragrances Men Green Storm Deo                   |12            |
|34314     |Myntra Men Orange Urban T-shirt                       |9             |
|19837     |U.S. Polo Assn. Men Check Green Shirt                 |9             |
|56226     |SDL by Sweet Dreams Men Green & Grey Pyjama Set       |9             |
|58591     |ToniQ Women Coral Necklace                            |8             |
|54619     |Fastrack Men Black Dial Watch                         |7             |
|12463     |Basics Men Blue Checked Stoles                        |7             |
|24583     |Lee Men Roadie Red T-shirt                            |6             |
|13078     |Numero Uno Men White Casual Shoes                     |6             |
|551

In [34]:
# Save the top 20 products DataFrame as parquet under "top_20_products_parquet",
# replacing any previous output at that path.
top_20_products_df.write.mode("overwrite").parquet("top_20_products_parquet")

In [35]:
# Inspect the schema of the saved top-products frame.
top_20_products_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- productDisplayName: string (nullable = true)
 |-- total_quantity: long (nullable = true)



In [36]:
from pyspark.sql.functions import current_timestamp, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Re-read the two parquet outputs as file streams and publish them to Kafka.

# Schema declared for the fraud-count files. "count" is a long because that is
# the type produced by groupBy(...).count().
# NOTE(review): the files under "fraudCount_parquet" are written from
# rf_prediction_1_df, which has no "count" column, so this field is expected to
# come back null - confirm which frame should have been written.
fraud_count_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("count", LongType(), True)
])

# Schema of the top-products files. total_quantity is the result of a sum over
# an integer column and is stored as a long (see the printed schema of
# top_20_products_df), so it has to be declared as LongType to match the files.
product_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("total_quantity", LongType(), True)
])

# Read fraudCount parquet as streaming dataframe
fraud_count_stream = spark.readStream \
    .schema(fraud_count_schema) \
    .parquet("fraudCount_parquet")

# Read top_20_products parquet as streaming dataframe
product_stream = spark.readStream \
    .schema(product_schema) \
    .parquet("top_20_products_parquet")

# Cast product_id and total_quantity to strings before publishing
product_stream = product_stream.withColumn("product_id", col("product_id").cast(StringType()))
product_stream = product_stream.withColumn("total_quantity", col("total_quantity").cast(StringType()))

# Publish each row as JSON keyed by session_id. The query handle is kept so the
# query can be monitored or stopped later with fraud_count_query.stop().
fraud_count_query = fraud_count_stream.selectExpr(
    "CAST(session_id AS STRING) AS key",
    "to_json(struct(*)) AS value"
).writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "fraud_count_topic") \
    .start()

fraud_count_query


<pyspark.sql.streaming.query.StreamingQuery at 0x7fdda58b9660>

In [37]:
# Publish each top-product row to Kafka as JSON keyed by product_id (no
# timestamp column is added). The query handle is kept so the query can be
# monitored or stopped later with top_products_query.stop().
top_products_query = product_stream.selectExpr(
    "CAST(product_id AS STRING) AS key",
    "to_json(struct(*)) AS value"
).writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "top_products_topic") \
    .start()

top_products_query


<pyspark.sql.streaming.query.StreamingQuery at 0x7fdd37eb37f0>