In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'

In [3]:
spark = SparkSession.builder.appName('real_time_fraud').master('local[3]').getOrCreate()

:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ayushjeet38medu/.ivy2/cache
The jars for the packages stored in: /home/ayushjeet38medu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4e89a728-102c-4a1e-af84-89e76f6ee3f6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 891ms :: artifacts dl 38ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;

In [4]:
spark

In [5]:
spark.sparkContext.setLogLevel("Error")

In [None]:
# bank_transactions_df = spark.read.format('csv').options(inferschema = True, header = True).load('cleaned_banking_transaction.csv')

In [None]:
# bank_transactions_df.count()

In [None]:
# len(bank_transactions_df.columns)

In [None]:
# bank_transactions_df.printSchema()

In [None]:
# ! pip install kafka-python

In [None]:
# import csv
# import time
# from kafka import KafkaProducer

In [None]:
# producer  = KafkaProducer(bootstrap_servers = 'master:9092', value_serializer = lambda v:str(v).encode('utf-8'))

# topic_n = 'assessment2_aj'

# with open('banking_transaction.csv', 'r')as f:
#     reader = csv.DictReader(f)
#     for row in reader:
#         msg = ','.join([row['transaction_id'], row['customer_id'], row['timestamp'], row['amount'], row['transaction_type'], row['channel'],row['channel'], row['status']])
#         producer.send(topic_n, value=msg)
#         producer.flush()
#         time.sleep(0.5)
# producer.close()

### Bash Command to stimulate the stream data 

In [None]:
# while IFS= read -r line;do echo $line;sleep 5;done <banking_transaction.csv | kafka-console-producer.sh --bootstrap-server master:9092 --topic assessment2_aj

# jupyter-nbconvert real_time_fraud.ipynb --to python

In [6]:
transaction_df = spark.readStream.format('kafka').option('kafka.bootstrap.servers', 'master:9092').option('subscribe', 'assessment2_aj').option('checkpointLocation', 'checkpoint_8').option("startingOffsets","latest").load()

In [7]:
transaction_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
raw_df = transaction_df.select(col('value'))

In [9]:
from pyspark.sql.functions import col, split, trim, regexp_extract

cleaned_data = transaction_df.selectExpr("CAST(value AS STRING)") \
    .withColumn("parts", split(col("value"), ",")) \
    .withColumn("transaction_id", regexp_extract(col("parts")[0], r'\d+', 0).cast("int")) \
    .withColumn("customer_id", col("parts")[1].cast("int")) \
    .withColumn("timestamp", col("parts")[2].cast("timestamp")) \
    .withColumn("amount", col("parts")[3].cast("double")) \
    .withColumn("transaction_type", trim(col("parts")[4])) \
    .withColumn("channel", trim(col("parts")[5])) \
    .withColumn("status", trim(col("parts")[6])) \
    .drop("parts")

In [10]:
cleaned_data.printSchema()

root
 |-- value: string (nullable = true)
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- status: string (nullable = true)



In [None]:
# custom_schema = "transaction_id int, customer_id int, timestamp timestamp, amount double, transaction_type string, channel string, status string"
# cleaned_data = raw_df.select(from_csv(col('value').cast('string') ,custom_schema).alias('value'))
# cleaned_data.printSchema()

In [None]:
# cleaned_data = cleaned_data.select('value.*')
# cleaned_data.printSchema()

- “If a customer performs more than two withdrawals over ₹400 within a 1-minute window,flag the behavior as suspicious.”

In [11]:
filtered_df = cleaned_data.filter((cleaned_data.transaction_type == 'withdrawal') & (cleaned_data.amount >400) & (cleaned_data.status == 'success'))

In [12]:
fraud_customers = filtered_df\
.groupBy(window(cleaned_data.timestamp, "1 minutes"), cleaned_data.customer_id).agg(count('*').alias('transaction_count'))\
.withColumn('customer_behaviour', when(col('transaction_count')>2, "Suspicious").otherwise("Innocent"))

In [None]:
# spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled",False)

In [None]:
# fraud_customers.writeStream.format('console').outputMode('update').start().awaitTermination()

In [None]:
# suspicious_customers = filtered_df\
# .groupBy(window(cleaned_data.timestamp, "1 minutes"), cleaned_data.customer_id).agg(count('*').alias('transaction_count'))\
# .filter(col('transaction_count')>2)

## kafka to Hbase

In [13]:
filtered_df_window = filtered_df.withWatermark("timestamp", "1 minutes").groupBy(window("timestamp", "1 minute", "1 second"),"customer_id")\
.agg(collect_list(struct("*")).alias("transactions"),count("*").alias("transaction_count"))

suspicious_transactions = filtered_df_window.filter(col("transaction_count") > 2).select(explode("transactions").alias("txn"),"transaction_count").select("txn.*","transaction_count")

In [None]:
# 5. Real-Time Alert Storage (HBase Integration)
# • Store flagged records into an HBase table.
# • Each record should include: transaction_id, customer_id, timestamp, amount, channel, and
# reason.

In [None]:
# create 'ayushjee_tcs.suspicious_customer',{NAME=>'info', VERSIONS=>4}

In [14]:
import happybase

def write_to_hbase(rows, batch):
    data = rows.collect()
    connection = happybase.Connection('master')  
    table = connection.table('ayushjee_tcs.suspicious_customer')

    for row in data:
        row_key = f"{row['customer_id']}_{row['transaction_id']}"
        data = {
            b'info:transaction_id': str(row['transaction_id']).encode(),
            b'info:customer_id': str(row['customer_id']).encode(),
            b'info:timestamp': str(row['timestamp']).encode(),
            b'info:amount': str(row['amount']).encode(),
            b'info:channel': str(row['channel']).encode(),
            b'info:reason': str('Alert: Suspicious Customer').encode(),
        }
        table.put(row_key, data)

    connection.close()



In [15]:
suspicious_transactions.printSchema()

root
 |-- value: string (nullable = true)
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- status: string (nullable = true)
 |-- transaction_count: long (nullable = false)



In [16]:
flat_suspicious_transactions = suspicious_transactions.select(['transaction_id', 'customer_id', 'timestamp', 'amount', 'channel'])

In [None]:
flat_suspicious_transactions.writeStream.foreachBatch(write_to_hbase).outputMode("append").start().awaitTermination()

25/06/26 04:22:10 ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2;Lorg/apache/spark/unsafe/types/UTF8String;ZIZIZJDZLorg/apache/spark/unsafe/types/UTF8String;ZLorg/apache/spark/unsafe/types/UTF8String;ZLorg/apache/spark/unsafe/types/UTF8String;Z)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2" grows beyond 64 KB
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2;Lorg/apache/spark/unsafe/types/UTF8String;ZIZIZJDZLorg/apache/spark/unsafe/types/UTF8String;ZLorg/apache/spark/unsafe/types/UTF8String;ZLorg/apache/spark/unsafe/types/UTF8String;Z)V" of class 