## Kafka Real-Time Streaming Project (Author: Pawonrat Khumngoen)

### This Jupyter notebook demonstrates real-time data stream processing with Kafka. Once the Kafka producer starts sending messages based on the schema defined in schema.py, the messages are consumed from the Kafka topic.

### These messages are read with PySpark for some basic analysis of the incoming data, as shown below, and exported to Parquet files for downstream use.

##### Reference: Structured Streaming + Kafka Integration Guide: https://spark.apache.org/docs/latest/streaming/structured-streaming-kafka-integration.html

In [None]:
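from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
# Note: importing `sum` shadows Python's built-in sum() inside this notebook
from pyspark.sql.functions import col, from_json, sum, count, desc, asc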

In [2]:
spark = SparkSession \
        .builder \
        .appName('Kafka_finance_transaction') \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0") \
        .getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2.5.2/cache
The jars for the packages stored in: /root/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7336ccfb-f1e4-44c8-bf52-1f7be9945cf6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.7 in central
	found org.slf4j#slf4j-api;2.0.16 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.1 in central
	found org.apache.hadoop#hadoop-client-api;3.4.1 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collections_2.13;1.2.0

In [None]:
# Check the Spark version: it should be 4.0.0, as set in the Dockerfile and in the spark-sql-kafka package above
spark.version

'4.0.0'
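The `spark.jars.packages` value in the session builder has to agree with this version: the artifact suffix is the Scala binary version (2.13 for Spark 4.0) and the trailing number is the Spark release. A small sketch that builds the coordinate from those two parts; `kafka_connector_coordinate` is a hypothetical helper, not part of PySpark:

```python
def kafka_connector_coordinate(spark_version: str, scala_binary: str = "2.13") -> str:
    """Build the Maven coordinate of Spark's Kafka connector (hypothetical helper).

    The artifact name ends with the Scala binary version, and the connector
    version should equal the Spark version the session runs on.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary}:{spark_version}"


# Compare against the package configured in the SparkSession builder above.
configured = "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0"
print(kafka_connector_coordinate("4.0.0") == configured)  # True
```

A mismatch (for example a 3.5.x connector on a 4.0.0 cluster) typically shows up as class-loading errors when the Kafka source is first used, so checking `spark.version` against the package string is a cheap sanity check.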

In [None]:
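bootstrap_server = 'broker:29092'
topic_name = 'financial_transaction_topic'

# spark.read runs a one-off batch query over the topic, from the earliest to the latest
# available offsets (the batch defaults, spelled out below). A continuously running query
# would use spark.readStream with the same bootstrap-server and subscribe options.
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", bootstrap_server) \
  .option("subscribe", topic_name) \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()

# Kafka delivers key and value as binary, so cast them to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")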

In [5]:
df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
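Kafka itself stores each record's key and value as raw bytes; casting them to STRING in Spark decodes those bytes as UTF-8 text, which is why both columns now appear as `string`. The same step in plain Python, on a hypothetical message whose field values are placeholders rather than data from the topic:

```python
import json

# Hypothetical raw Kafka record. The key may be absent if the producer sends none.
raw_key = None
raw_value = json.dumps({"transaction_id": "txn-example-1", "currency": "THB"}).encode("utf-8")

# CAST(... AS STRING) corresponds to decoding the bytes as UTF-8; a null key stays null.
key = raw_key.decode("utf-8") if raw_key is not None else None
value = raw_value.decode("utf-8")

print(key)    # None
print(value)  # {"transaction_id": "txn-example-1", "currency": "THB"}
```

After this cast, `value` is still plain JSON text; turning it into typed columns is the job of `from_json` with an explicit schema.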



In [6]:
# Schema of the JSON message value (it should mirror the producer's schema.py)
schema = StructType([
    StructField("transaction_id", StringType(), nullable = False),
    StructField("user_id", StringType(), nullable = False),
    StructField("account_id", StringType(), nullable = False),
    StructField("transaction_type", StringType(), nullable = False),
    StructField("original_amount", DecimalType(10,2), nullable = False),
    StructField("amount", DecimalType(10,2), nullable = False),
    StructField("currency", StringType(), nullable = False),
    StructField("status", StringType(), nullable = False),
    StructField("timestamp", TimestampType(), nullable = False),
    StructField("description", StringType(), nullable = False),
    StructField("recipient_account_id", StringType(), nullable = True),
    StructField("recipient_user_id", StringType(), nullable = True),
    StructField("fee", DecimalType(3,2), nullable = False),
    StructField("currency_rate", DecimalType(5,2), nullable = False),
    StructField("location", StringType(), nullable = True),
    StructField("merchant_id", StringType(), nullable = True),
    StructField("category", StringType(), nullable = True),
    StructField("reference_id", StringType(), nullable = True)])
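The decimal columns put hard limits on the values they can hold: `DecimalType(p, s)` keeps `s` digits after the decimal point and at most `p - s` before it, so its largest magnitude is 10^(p - s) - 10^(-s). A quick sketch that computes those limits for the schema's decimal columns (`max_decimal` and `fits` are illustrative helpers):

```python
from decimal import Decimal


def max_decimal(precision: int, scale: int) -> Decimal:
    """Largest value DecimalType(precision, scale) can hold."""
    # p - s integer digits and s fraction digits, e.g. (5, 2) -> 999.99
    return Decimal(10) ** (precision - scale) - Decimal(10) ** -scale


def fits(value: Decimal, precision: int, scale: int) -> bool:
    """True if a value already at the column's scale is within the DecimalType range."""
    return abs(value) <= max_decimal(precision, scale)


# Decimal columns from the schema above: name -> (precision, scale)
decimal_columns = {
    "original_amount": (10, 2),
    "amount": (10, 2),
    "fee": (3, 2),
    "currency_rate": (5, 2),
}

for name, (p, s) in decimal_columns.items():
    print(name, max_decimal(p, s))
# original_amount 99999999.99
# amount 99999999.99
# fee 9.99
# currency_rate 999.99
```

With these types a `fee` of 10.00 or more, or a `currency_rate` of 1,000 or more, would not fit its column, so the producer's value ranges are worth checking against them.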

In [7]:
df_parsed = df.select(from_json(col('value'), schema).alias('data'))
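`from_json` turns each JSON string into a struct that follows `schema`: fields missing from a message come back as null, and JSON keys that are not in the schema are ignored. A rough pure-Python analogue, for intuition only; `parse_transaction` is a hypothetical helper, it assumes ISO-8601 timestamps, and unlike Spark it does no handling of malformed input:

```python
import json
from datetime import datetime
from decimal import Decimal

# Which schema fields need a non-string type (taken from the StructType above).
DECIMAL_FIELDS = {"original_amount", "amount", "fee", "currency_rate"}
TIMESTAMP_FIELDS = {"timestamp"}


def parse_transaction(value: str, field_names: list[str]) -> dict:
    """Hypothetical analogue of from_json(col('value'), schema) for one message."""
    payload = json.loads(value)
    record = {}
    for name in field_names:  # only schema fields are kept
        raw = payload.get(name)
        if raw is None:
            record[name] = None  # missing field -> null
        elif name in DECIMAL_FIELDS:
            record[name] = Decimal(str(raw))
        elif name in TIMESTAMP_FIELDS:
            record[name] = datetime.fromisoformat(raw)  # assumes ISO-8601 text
        else:
            record[name] = raw
    return record


fields = ["transaction_id", "amount", "timestamp", "merchant_id"]
message = '{"transaction_id": "txn-example-1", "amount": 1234.5, "timestamp": "2025-07-20T07:23:00", "extra": 1}'
print(parse_transaction(message, fields))
```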

In [8]:
# Expand the parsed `data` struct into top-level columns and preview 10 rows
df_parsed.select(col('data.*')).limit(10).show()

+--------------------+-----------+------------+----------------+---------------+---------+--------+---------+--------------------+--------------------+--------------------+-----------------+----+-------------+-------------------+-------------+---------------+------------+
|      transaction_id|    user_id|  account_id|transaction_type|original_amount|   amount|currency|   status|           timestamp|         description|recipient_account_id|recipient_user_id| fee|currency_rate|           location|  merchant_id|       category|reference_id|
+--------------------+-----------+------------+----------------+---------------+---------+--------+---------+--------------------+--------------------+--------------------+-----------------+----+-------------+-------------------+-------------+---------------+------------+
|1da7438b-8bb6-415...|user_008329|acc_00035075|        transfer|         234.47|  6904.58|     TWD|completed|2025-07-20 07:23:...|Transfer to Grego...|        acc_00015196|      use

### Total amount and number of transactions by status and transaction type

In [9]:
df_parsed.groupby(col('data.status'), col('data.transaction_type')) \
         .agg(
             sum(col('data.amount')).alias('Total_amount'),
             count(col('*')).alias('number_of_transaction')
         ) \
         .orderBy(asc(col('data.status')), asc(col('data.transaction_type'))).show()

+---------+----------------+------------+---------------------+
|   status|transaction_type|Total_amount|number_of_transaction|
+---------+----------------+------------+---------------------+
|cancelled|         deposit|  1467481.71|                    7|
|cancelled|         payment|   451646.78|                   11|
|cancelled|          refund|   105034.78|                   12|
|cancelled|        transfer|   961087.12|                   10|
|cancelled|      withdrawal|    51728.05|                   10|
|completed|         deposit| 72231264.17|                  385|
|completed|         payment| 10723204.05|                  340|
|completed|          refund|  3824873.98|                  411|
|completed|        transfer| 43046836.99|                  405|
|completed|      withdrawal|  7652298.55|                  386|
|   failed|         deposit|  2297040.56|                   12|
|   failed|         payment|   513908.00|                   13|
|   failed|          refund|   139913.27
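`amount` is expressed in each transaction's own currency (TWD, JPY, SGD, ...), so `Total_amount` above adds up different units. Summing the USD column `original_amount` instead gives totals that can be compared across groups; in the notebook that would mean `sum(col('data.original_amount'))` in the aggregation. A pure-Python sketch of the same group-by on a few hypothetical rows (local amounts computed with rates that appear in the output further below) shows the difference:

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical parsed rows: (status, transaction_type, original_amount in USD, amount, currency)
rows = [
    ("completed", "deposit", Decimal("100.00"), Decimal("2945.00"), "TWD"),
    ("completed", "deposit", Decimal("100.00"), Decimal("14775.00"), "JPY"),
    ("failed", "refund", Decimal("20.00"), Decimal("25.60"), "SGD"),
]

usd_totals = defaultdict(Decimal)    # comparable: everything in USD
local_totals = defaultdict(Decimal)  # mixes TWD, JPY, SGD, ... (what Total_amount does)
counts = defaultdict(int)

for status, tx_type, usd, local, _currency in rows:
    key = (status, tx_type)
    usd_totals[key] += usd
    local_totals[key] += local
    counts[key] += 1

for key in sorted(usd_totals):
    print(key, counts[key], usd_totals[key], local_totals[key])
# ('completed', 'deposit') 2 200.00 17720.00
# ('failed', 'refund') 1 20.00 25.60
```

The 17720.00 figure is 2,945 TWD plus 14,775 JPY, a number with no single unit, while 200.00 is a meaningful USD total.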


### Number of transactions by currency

In [10]:
df_parsed.groupby(col('data.currency')).count() \
         .withColumnRenamed('count', 'number_of_transaction') \
         .orderBy(desc('number_of_transaction')) \
         .show()

+--------+---------------------+
|currency|number_of_transaction|
+--------+---------------------+
|     TWD|                  387|
|     THB|                  365|
|     CNY|                  358|
|     SGD|                  358|
|     HKD|                  344|
|     JPY|                  343|
+--------+---------------------+
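`orderBy(desc('number_of_transaction'))` alone leaves the order of equal counts unspecified: CNY and SGD both have 358 here, and Spark does not guarantee which comes first from run to run. Adding the currency as a secondary key, e.g. `.orderBy(desc('number_of_transaction'), asc('currency'))`, makes the order deterministic. The same two-key ordering in plain Python, using the counts shown above:

```python
# Counts copied from the output above; CNY and SGD are tied.
currency_counts = {"TWD": 387, "THB": 365, "CNY": 358, "SGD": 358, "HKD": 344, "JPY": 343}

# Sort by count descending, then by currency code so ties always come out in the same order.
ranked = sorted(currency_counts.items(), key=lambda kv: (-kv[1], kv[0]))

for currency, n in ranked:
    print(currency, n)
```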



### Transactions of more than 5,000 USD

In [11]:
df_parsed.select(col('data.original_amount').alias('transaction_per_usd'),
                 col('data.amount').alias('transaction_per_currency'),
                 col('data.currency').alias('currency_type'),
                 col('data.currency_rate').alias('current_rate_per_usd'),
                 col('data.transaction_type')) \
         .filter(col('transaction_per_usd') > 5000) \
         .show()

+-------------------+------------------------+-------------+--------------------+----------------+
|transaction_per_usd|transaction_per_currency|currency_type|current_rate_per_usd|transaction_type|
+-------------------+------------------------+-------------+--------------------+----------------+
|            9791.11|                76858.77|          HKD|                7.85|         deposit|
|            5548.55|                43555.30|          HKD|                7.85|         deposit|
|            5125.17|               150923.50|          TWD|               29.45|         deposit|
|            7660.35|              1131778.80|          JPY|              147.75|         deposit|
|            8251.21|                10573.11|          SGD|                1.28|         deposit|
|            6102.81|                 7820.15|          SGD|                1.28|         deposit|
|            9410.97|              1390423.77|          JPY|              147.75|         deposit|
|         
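Filtering on `original_amount` applies one USD threshold to every transaction; the same rule on the local `amount` column would need a different cutoff per currency. Using the `currency_rate` values visible in the output above, a short sketch of what 5,000 USD corresponds to in each currency (`rates_per_usd` is just a hand-copied lookup):

```python
from decimal import Decimal

# Rates per USD copied from the current_rate_per_usd column above.
rates_per_usd = {
    "HKD": Decimal("7.85"),
    "TWD": Decimal("29.45"),
    "JPY": Decimal("147.75"),
    "SGD": Decimal("1.28"),
}

threshold_usd = Decimal("5000")
for currency, rate in rates_per_usd.items():
    # Local-currency amount equivalent to the USD threshold at this rate.
    print(currency, threshold_usd * rate)
# HKD 39250.00
# TWD 147250.00
# JPY 738750.00
# SGD 6400.00
```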

### Export data to Parquet files

In [14]:
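# mergeSchema is a Parquet read option; to merge schemas across files, pass it when loading:
# spark.read.option("mergeSchema", "true").parquet('transactions/')
df_parsed.select(col('data.*')) \
         .write \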
         .mode('append') \
         .parquet('transactions/')
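Because the batch read starts again from the earliest offsets, each run of this cell in `append` mode writes every transaction still in the topic, including ones exported by earlier runs. One way around it is to append only transactions whose `transaction_id` is not in `transactions/` yet; in PySpark that could be a `left_anti` join of the new rows against `spark.read.parquet('transactions/')` on `transaction_id`. Alternatively, a `readStream` query with a `checkpointLocation` keeps track of consumed offsets itself. The de-duplication logic, sketched in plain Python with a hypothetical `new_transactions` helper:

```python
def new_transactions(batch: list[dict], already_exported: set[str]) -> list[dict]:
    """Keep rows whose transaction_id was not exported yet (hypothetical helper).

    Duplicates inside the batch itself are dropped as well.
    """
    seen = set(already_exported)
    fresh = []
    for row in batch:
        tx_id = row["transaction_id"]
        if tx_id not in seen:
            seen.add(tx_id)
            fresh.append(row)
    return fresh


exported_ids = {"txn-a", "txn-b"}
batch = [{"transaction_id": "txn-b"}, {"transaction_id": "txn-c"}, {"transaction_id": "txn-c"}]
print(new_transactions(batch, exported_ids))  # [{'transaction_id': 'txn-c'}]
```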