In [None]:

# Mount Google Drive: later cells read the dataset from a path under this mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:

# Preprocessing credit card transaction data.
# Prerequisite: Google Drive must be mounted (first cell) so the shared-drive path below exists.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Start (or reuse) the Spark session for the batch training pipeline.
spark = (
    SparkSession.builder
    .appName("CreditCardFraudDetection")
    .getOrCreate()
)

# Load the transactions CSV: the header row supplies column names, Spark infers column types.
file_path = '/content/drive/Shared drives/532_project/creditcard.csv'
raw_data = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

# Cleaning: drop every row that has a null in any column.
data = raw_data.dropna()


In [None]:
# Preview the cleaned Spark DataFrame (first 100 rows).
data.show(n=100)
# Model feature columns: V1..V28 followed by Time and Amount. "Class" is left out because it is the label.
input_cols = ["V%d" % i for i in range(1, 29)]
input_cols += ["Time", "Amount"]


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# One seed for both the split and the forest, so a fresh "Run All" reproduces the same model.
SEED = 42

# Prepare data for training: pack the feature columns into a single vector column,
# keeping only that vector and the label.
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
processed_data = assembler.transform(data).select("features", "Class")

# Split data 80/20.
# NOTE(review): randomSplit is not stratified. If fraud rows are rare (check with
# data.groupBy("Class").count()), the fraud count in `test` can vary noticeably;
# consider sampleBy on "Class" if that matters.
train, test = processed_data.randomSplit([0.8, 0.2], seed=SEED)

# Train the Random Forest classifier. The seed is set explicitly because PySpark's default
# seed is derived from hash() of the class name, which is not stable across Python processes,
# so bootstrap sampling and feature subsets would otherwise change between kernel restarts.
rf = RandomForestClassifier(labelCol="Class", featuresCol="features", numTrees=100, seed=SEED)
model = rf.fit(train)




In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Score the held-out split. The result is cached because every evaluator below scans
# `predictions`; without caching, each evaluate() call re-runs the forest over the test set.
predictions = model.transform(test).cache()

# 1. Area under the precision-recall curve, computed from the model's raw scores.
evaluator = BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderPR")
auprc = evaluator.evaluate(predictions)
print(f"AUPRC: {auprc}")


def _multiclass_metric(scored, metric_name, **extra_params):
    """Return one MulticlassClassificationEvaluator metric for a scored DataFrame.

    Args:
        scored: DataFrame with the "Class" label column and the model's "prediction" column.
        metric_name: evaluator metricName, e.g. "weightedRecall" or "recallByLabel".
        **extra_params: further evaluator params, e.g. metricLabel for the *ByLabel metrics.

    Returns:
        The metric value as a float.
    """
    metric_evaluator = MulticlassClassificationEvaluator(
        labelCol="Class", metricName=metric_name, **extra_params
    )
    return metric_evaluator.evaluate(scored)


# 2. Weighted precision / recall / F1: per-class scores averaged by class frequency, so
#    the most frequent class (presumably non-fraud) dominates them and they can look
#    excellent even when fraud is rarely caught.
precision = _multiclass_metric(predictions, "weightedPrecision")
recall = _multiclass_metric(predictions, "weightedRecall")
f1_score = _multiclass_metric(predictions, "f1")

# 3. Positive-class metrics for Class == 1 (assumed to mark fraud). These show how well
#    the fraud class itself is detected.
fraud_precision = _multiclass_metric(predictions, "precisionByLabel", metricLabel=1.0)
fraud_recall = _multiclass_metric(predictions, "recallByLabel", metricLabel=1.0)
fraud_f1 = _multiclass_metric(predictions, "fMeasureByLabel", metricLabel=1.0)

print(f"Weighted precision: {precision}")
print(f"Weighted recall: {recall}")
print(f"Weighted F1-Score: {f1_score}")
print(f"Fraud-class precision: {fraud_precision}")
print(f"Fraud-class recall: {fraud_recall}")
print(f"Fraud-class F1-Score: {fraud_f1}")

In [None]:
# Commands to set the Kafka server up (not executed by this notebook).
# Install Kafka locally, then run these in a terminal from the Kafka install directory
# (the paths are relative to it). Together they start ZooKeeper and the broker and
# create the topic the producer and the streaming job use:
#
#   bin/zookeeper-server-start.sh config/zookeeper.properties
#   bin/kafka-server-start.sh config/server.properties
#   bin/kafka-topics.sh --create --topic creditcard-transactions --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

In [None]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/246.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [None]:
!pip install pyngrok
from pyngrok import ngrok

Collecting pyngrok
  Downloading pyngrok-7.2.1-py3-none-any.whl.metadata (8.3 kB)
Downloading pyngrok-7.2.1-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.1


In [None]:
#Kafka Integration
import os

from kafka import KafkaProducer
import pandas as pd
import json
import time

# Load the dataset with pandas so it can be replayed into Kafka. It gets its own name so
# that the Spark DataFrame `data` from the preprocessing cell is not overwritten: the
# training cell calls assembler.transform(data), which needs the Spark version.
file_path = '/content/drive/Shared drives/532_project/creditcard.csv'  # for now, replay the static dataset
transactions_pdf = pd.read_csv(file_path)

# Broker address. An ngrok TCP tunnel address changes whenever the tunnel is reopened, so
# it is read from the environment, falling back to the last address used in this notebook.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "0.tcp.ngrok.io:16640")
TRANSACTIONS_TOPIC = "creditcard-transactions"

# Kafka producer setup. Only a single producer is built. The constructor contacts the
# bootstrap servers immediately and raises NoBrokersAvailable when none answer, so extra
# producers pointed at stale addresses only add failures.
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    retries=5,
    request_timeout_ms=120000,  # Timeout after 2 minutes
)


def send_transactions(kafka_producer, frame, topic=TRANSACTIONS_TOPIC, delay_s=0.01):
    """Publish every row of `frame` as one JSON message on `topic`.

    Args:
        kafka_producer: producer whose value_serializer accepts a dict.
        frame: pandas DataFrame of transactions; each row is sent as a column->value dict.
        topic: Kafka topic to publish to.
        delay_s: pause between sends, to simulate a live transaction stream (0 disables it).

    Returns:
        The number of rows sent.
    """
    sent = 0
    for record in frame.to_dict(orient="records"):
        kafka_producer.send(topic, value=record)
        sent += 1
        if delay_s:
            time.sleep(delay_s)
    # send() only buffers; flush so every message is delivered before returning.
    kafka_producer.flush()
    return sent


# With a per-row delay, replaying the whole file is slow, so sending is opt-in.
SEND_TRANSACTIONS = False
if SEND_TRANSACTIONS:
    send_transactions(producer, transactions_pdf)


NoBrokersAvailable: NoBrokersAvailable

In [None]:
import os

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, DoubleType

# NOTE(review): format("kafka") needs the spark-sql-kafka-0-10 connector on the classpath
# (e.g. through spark.jars.packages, matching the installed Spark/Scala version).
# getOrCreate() below reuses a session created earlier in the notebook if one exists, so
# such config must be in place before the first session is built. Confirm how the
# connector is provided.

# Broker and topic settings (the env var lets this point at a tunnel instead of localhost).
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
TRANSACTIONS_TOPIC = "creditcard-transactions"
ALERTS_TOPIC = "fraud-alerts"
# The Kafka sink requires a checkpoint directory to record streaming progress.
ALERTS_CHECKPOINT_DIR = os.environ.get(
    "FRAUD_ALERTS_CHECKPOINT_DIR", "/content/checkpoints/fraud-alerts"
)

# Define transaction schema: Time, V1..V28, Amount and Class, all as doubles. This covers
# every feature named in input_cols, which VectorAssembler below requires, plus the label.
TRANSACTION_COLUMNS = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount", "Class"]
schema = StructType([StructField(name, DoubleType(), True) for name in TRANSACTION_COLUMNS])

# Initialize (or reuse) the Spark session.
spark = SparkSession.builder \
    .appName("FraudDetectionStreaming") \
    .getOrCreate()

# Read raw transaction messages from Kafka.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", TRANSACTIONS_TOPIC)
    .load()
)

# Parse each message value from JSON into one column per schema field.
parsed_stream = (
    kafka_stream.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Assemble features the same way as in training. handleInvalid="skip" drops a message whose
# features came out null (e.g. malformed JSON) instead of failing the whole streaming query.
assembler = VectorAssembler(inputCols=input_cols, outputCol="features", handleInvalid="skip")
processed_stream = assembler.transform(parsed_stream)

# Apply the trained model. A separate name keeps the batch `predictions` intact.
stream_predictions = model.transform(processed_stream)

# Keep only transactions flagged as fraud (prediction == 1). Select the plain transaction
# columns plus the prediction, leaving out the model's vector-valued columns, so the
# JSON payload is flat.
fraud_alerts = (
    stream_predictions
    .filter(col("prediction") == 1.0)
    .select(*TRANSACTION_COLUMNS, "prediction")
)

# Write flagged fraud alerts to another Kafka topic. The query handle is kept so the
# stream can be monitored or stopped (fraud_alert_query.stop()).
fraud_alert_query = (
    fraud_alerts.selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", ALERTS_TOPIC)
    .option("checkpointLocation", ALERTS_CHECKPOINT_DIR)
    .outputMode("append")
    .start()
)
