<a href="https://colab.research.google.com/github/CPO-atu/Big-Data-Project/blob/main/BIG_DATA_SOLUTION.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("sriharshaeedala/financial-fraud-detection-dataset")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/financial-fraud-detection-dataset


In [None]:
import os
import pandas as pd

# Check what's inside the downloaded directory
dataset_dir = path  # directory returned by kagglehub.dataset_download above
print("Files in dataset directory:", os.listdir(dataset_dir))

# Build the path to the data file (adjust the filename if the listing differs)
csv_path = os.path.join(dataset_dir, "Synthetic_Financial_datasets_log.csv")

# Load into DataFrame
df = pd.read_csv(csv_path)  # Use pd.read_excel() for Excel files

df.head()

Files in dataset directory: ['Synthetic_Financial_datasets_log.csv']


Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
3,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0


In [None]:
sample_df = df.sample(n=100000, random_state=42)


**STEP 5: SIMULATE STREAMING DATA TO PREDICT FRAUDULENT TRANSACTIONS IN REAL-TIME**

In [None]:
!pip install kafka-python

!curl -sSOL https://downloads.apache.org/kafka/3.7.2/kafka_2.12-3.7.2.tgz
!ls
!tar -xzf kafka_2.12-3.7.2.tgz

!./kafka_2.12-3.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.7.2/config/zookeeper.properties
!./kafka_2.12-3.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.7.2/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

!ps -ef | grep kafka
!ps -ef | grep zookeeper

Collecting kafka-python
  Downloading kafka_python-2.1.5-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading kafka_python-2.1.5-py2.py3-none-any.whl (285 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 285.4/285.4 kB 4.4 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.1.5
kafka_2.12-3.7.2.tgz  sample_data
Waiting for 10 secs until kafka and zookeeper services are up and running
root        1042       1 21 16:27 ?        00:00:02 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/content/kafka_2.12-3.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/kafka_2.12-3.7.2/bin/../logs -
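
The fixed 10-second wait above assumes that both services come up within that time. As a hedged alternative, the sketch below polls a TCP port until it accepts connections. The helper name `wait_for_port` and its parameters are illustrative and not part of the original notebook, and an open port is only a rough readiness signal, not proof that Kafka is ready to serve requests.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connection means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or unreachable); wait briefly and retry.
            time.sleep(interval)
    return False
```

In this notebook it could be called as `wait_for_port("127.0.0.1", 2181)` for ZooKeeper and `wait_for_port("127.0.0.1", 9092)` for the Kafka broker, assuming the default ports from the bundled config files.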

#### Produce Kafka Message

Here we define a producer function that sends records to a Kafka topic in batches. The records sent later in the notebook are a sample of the transaction data. To regulate the transfer rate, the function sleeps between batches, and after each batch it flushes the producer so that all buffered messages are delivered to Kafka before the next batch starts.

In [None]:
import json
import time

from kafka import KafkaProducer


def error_callback(exc):
    # Errbacks run on the producer's background I/O thread, so raising here
    # would not stop the send loop below; report the failure instead.
    print(f"Error while sending data to Kafka: {exc}")


def write_to_kafka(topic_name, items, batch_size=1000, delay_seconds=1):
    """Send (message, key) pairs to a Kafka topic in batches, pausing between batches."""
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    total_messages = len(items)
    sent = 0

    for i in range(0, total_messages, batch_size):
        batch = items[i:i + batch_size]
        for message, key in batch:
            producer.send(
                topic_name,
                key=str(key).encode('utf-8'),
                value=json.dumps(message).encode('utf-8')
            ).add_errback(error_callback)
            sent += 1

        # Block until every buffered message in this batch has been sent
        producer.flush()
        print(f"Sent {sent}/{total_messages} messages (batch {i//batch_size + 1})")
        if i + batch_size < total_messages:
            time.sleep(delay_seconds)

    producer.close()
    print(f"Finished sending {total_messages} messages to topic: {topic_name}")
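
The wire format used by `write_to_kafka` can be checked without a running broker. The following sketch mirrors the key/value encoding and the batch slicing used above and adds the matching decode step. The helper names (`encode_record`, `decode_record`, `iter_batches`) are hypothetical and exist only for this illustration.

```python
import json


def encode_record(record, key):
    """Encode a (record, key) pair as UTF-8 bytes: string key, JSON value."""
    return str(key).encode("utf-8"), json.dumps(record).encode("utf-8")


def decode_record(key_bytes, value_bytes):
    """Reverse encode_record: the key comes back as a string, the value as a dict."""
    return key_bytes.decode("utf-8"), json.loads(value_bytes.decode("utf-8"))


def iter_batches(items, batch_size):
    """Yield consecutive slices of items, each holding at most batch_size entries."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]
```

Note that the key does not survive the round trip with its original type: an integer index is sent as text, so a consumer that needs it as a number has to convert it back.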


#### Spark Streaming

Here we demonstrate a real-time processing pipeline for fraud detection built on Apache Kafka and PySpark. A Kafka producer (the `write_to_kafka` function) sends batches of transaction records to a Kafka topic named `fraud-test`. Each record is serialized to JSON and sent with a controlled delay between batches to simulate real-time streaming.

Simultaneously, a Kafka consumer runs in a separate thread using PySpark's Structured Streaming. The consumer subscribes to the same Kafka topic, deserializes incoming messages, and processes them in micro-batches. Each micro-batch is parsed against a fixed schema and converted into a Pandas DataFrame. The features and the actual label (`isFraud`) both come from the message value, while the message key carries only the row index.

The pre-trained Random Forest model (loaded from `rf_model.pkl`) is then used to predict fraud labels for the incoming data, and the results, including features, actual labels, and predictions, are displayed for analysis.

This design keeps data production and consumption separate, so the stream is processed as it arrives and each side can be scaled independently.
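
As a rough, Spark-free illustration of what happens to each micro-batch, the sketch below deserializes JSON message values and computes per-`isFraud` summaries of the kind the consumer prints (row counts, total amount, type counts, and the number of high-value transactions). The function name `summarize_batch` and its `amount_threshold` parameter are assumptions made for this example and are not part of the notebook's consumer.

```python
import json
from collections import Counter, defaultdict


def summarize_batch(values, amount_threshold=10000):
    """Summarize one micro-batch of JSON-encoded transaction messages."""
    records = [json.loads(value) for value in values]

    count_by_label = Counter()
    amount_by_label = defaultdict(float)
    type_count_by_label = Counter()
    high_amount_by_label = Counter()

    for record in records:
        label = record["isFraud"]
        count_by_label[label] += 1
        amount_by_label[label] += record["amount"]
        type_count_by_label[(label, record["type"])] += 1
        if record["amount"] > amount_threshold:
            high_amount_by_label[label] += 1

    return {
        "count": dict(count_by_label),
        "total_amount": dict(amount_by_label),
        "type_count": dict(type_count_by_label),
        "transactions_gt_threshold": dict(high_amount_by_label),
    }
```

Working through a handful of rows by hand like this is a cheap way to sanity-check the numbers that the streaming job reports for a batch.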


In [None]:



# Define the Spark version to install
spark_version = "3.5.2"

# Install OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

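!pip install pyspark=={spark_version}
# Note: spark-streaming-kafka-0-10-assembly_2.12 is a JVM artifact, not a pip package,
# so it cannot be installed with pip. The Kafka connector is instead resolved through
# spark.jars.packages when the SparkSession is created.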

# Download and extract Spark
!wget -q https://archive.apache.org/dist/spark/spark-{spark_version}/spark-{spark_version}-bin-hadoop3.tgz
!tar xf spark-{spark_version}-bin-hadoop3.tgz

import os

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{spark_version}-bin-hadoop3"

# Install findspark
!pip install -q findspark


Collecting pyspark==3.5.2
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812369 sha256=6c9fb0fefa8a26175fe57334dbba9d57d57d08dae471962a64749da857e7dd3a
  Stored in directory: /root/.cache/pip/wheels/9d/29/ee/3a756632ca3f0a6870933bac1c9db6e4af2c068f019aba0ee1
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.5
    Uninstalling pyspark-3.5.5:
      Successfully uninstalled pyspark-3.5.5
Successfully installed pyspark-3.5.2
ERROR: Could not find a version that satisfies the requirement spark-streaming-kafka-0-10-assembly_2.12 (fro

In [None]:
!./kafka_2.12-3.7.2/bin/kafka-topics.sh --delete --if-exists --topic fraud-test --bootstrap-server 127.0.0.1:9092

!./kafka_2.12-3.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic fraud-test

!./kafka_2.12-3.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic fraud-test

Created topic fraud-test.
Topic: fraud-test	TopicId: q-jvrqfQRECstGxH7TR2eg	PartitionCount: 2	ReplicationFactor: 1	Configs: 
	Topic: fraud-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: fraud-test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0


In [None]:
import threading
from pyspark.sql import SparkSession
import findspark
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType

def run_consumer():
    findspark.init()

    spark = SparkSession.builder \
        .appName("KafkaConsumer") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2") \
        .getOrCreate()

    kafka_topic_name = "fraud-test"
    kafka_bootstrap_servers = 'localhost:9092'

    # Read from Kafka
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .load()

    # Cast key and value to STRING
    kafka_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # Define the schema based on your data
    schema = StructType([
        StructField("step", IntegerType(), True),
        StructField("type", StringType(), True),
        StructField("amount", FloatType(), True),
        StructField("nameOrig", StringType(), True),
        StructField("oldbalanceOrg", FloatType(), True),
        StructField("newbalanceOrig", FloatType(), True),
        StructField("nameDest", StringType(), True),
        StructField("oldbalanceDest", FloatType(), True),
        StructField("newbalanceDest", FloatType(), True),
        StructField("isFraud", IntegerType(), True),
        StructField("isFlaggedFraud", IntegerType(), True),
    ])

    def process_batch(batch_df, batch_id):
        print(f"Processing batch {batch_id}")
        if not batch_df.rdd.isEmpty():
            # Parse the JSON messages using the defined schema
            parsed_df = batch_df.withColumn("json", from_json(col("value"), schema)).select("json.*")

            # Aggregate count based on isFraud values
            agg_df = parsed_df.groupBy("isFraud").count()
            agg_df.show(truncate=False)

            # Get the total amount for each isFraud group
            sum_amount_df = parsed_df.groupBy("isFraud").agg({'amount': 'sum'}) \
                                     .withColumnRenamed("sum(amount)", "total_amount")
            print("Total sum of amount for each isFraud group:")
            sum_amount_df.show(truncate=False)

            # Get the count of each type (e.g., CASH_IN, CASH_OUT) grouped by isFraud
            type_count_df = parsed_df.groupBy("isFraud", "type").count()
            print("Count of each type for each isFraud group:")
            type_count_df.show(truncate=False)

            # Filter transactions with amount greater than 10000 and group by isFraud
            high_amount_df = parsed_df.filter(col("amount") > 10000) \
                                      .groupBy("isFraud").count() \
                                      .withColumnRenamed("count", "transactions_gt_10000")
            print("Transactions with amount > 10000 for each isFraud group:")
            high_amount_df.show(truncate=False)

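            # Preprocess data and predict.
            # preprocess_and_predict is defined in a later cell, which must have
            # been run before the first non-empty batch arrives.
            df_with_predictions, target = preprocess_and_predict(parsed_df.toPandas(), model_path='rf_model.pkl')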

            # Display the first few rows of the resulting DataFrame with predictions
            print("DataFrame with predictions appended:")
            print(df_with_predictions.head())
        else:
            print("No data in batch")

    query = kafka_df.writeStream \
        .foreachBatch(process_batch) \
        .outputMode("append") \
        .start()

    query.awaitTermination()
    spark.stop()

# Run the consumer in a separate thread
consumer_thread = threading.Thread(target=run_consumer)
consumer_thread.start()

import time
time.sleep(10)
print("Kafka producer is ready")

# Convert DataFrame rows to a list of tuples (message, key)
# Here, we are using the index as the key. Adjust as needed.
records = sample_df.to_dict('records')
items = [(record, idx) for idx, record in enumerate(records)]

# Call the producer function with a desired batch_size and delay
write_to_kafka("fraud-test", items, batch_size=10000, delay_seconds=1)


Processing batch 0
No data in batch
Kafka producer is ready
Processing batch 1
Sent 10000/100000 messages (batch 1)
+-------+-----+
|isFraud|count|
+-------+-----+
|0      |126  |
+-------+-----+

Total sum of amount for each isFraud group:
+-------+--------------------+
|isFraud|total_amount        |
+-------+--------------------+
|0      |1.6802423145348072E7|
+-------+--------------------+

Count of each type for each isFraud group:
+-------+--------+-----+
|isFraud|type    |count|
+-------+--------+-----+
|0      |CASH_IN |28   |
|0      |CASH_OUT|39   |
|0      |TRANSFER|8    |
|0      |DEBIT   |2    |
|0      |PAYMENT |49   |
+-------+--------+-----+

Sent 20000/100000 messages (batch 2)
Transactions with amount > 10000 for each isFraud group:
+-------+---------------------+
|isFraud|transactions_gt_10000|
+-------+---------------------+
|0      |96                   |
+-------+---------------------+

DataFrame with predictions appended:
   step      type         amount     nameO
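
The cell above waits a fixed ten seconds between starting the consumer thread and producing messages. One hedged alternative is to have the consumer thread signal when it has started, using `threading.Event`. In the sketch below, `run_consumer_stub` is a hypothetical stand-in for `run_consumer`; in the real function the event would be set right after the streaming query's `.start()` call. The signal only says that the query has been started, not that Spark has already begun reading from the topic, so a short extra wait may still be needed.

```python
import threading
import time


def run_consumer_stub(ready, stop):
    """Stand-in for run_consumer: do the setup, signal readiness, then block."""
    time.sleep(0.1)   # placeholder for building the SparkSession and starting the query
    ready.set()       # tell the main thread that the consumer has started
    stop.wait()       # placeholder for query.awaitTermination()


def start_consumer(timeout=60.0):
    """Start the consumer thread and wait until it reports that it has started."""
    ready = threading.Event()
    stop = threading.Event()
    thread = threading.Thread(target=run_consumer_stub, args=(ready, stop), daemon=True)
    thread.start()
    started = ready.wait(timeout)  # True if the event was set before the timeout
    return started, thread, stop
```

The producer call would then run only if `started` is true, and `stop.set()` gives the main thread a way to end the stub.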

In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
import joblib

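def preprocess_and_predict(dataframe, model_path='rf_model.pkl'):
    """Preprocess a batch of transactions and append the model's predictions.

    Returns the input DataFrame with a 'prediction' column added, together
    with the actual 'isFraud' labels.
    """
    # One-hot encode the 'type' column.
    # Note: a batch that lacks some transaction types produces fewer dummy
    # columns than a batch containing all of them.
    df_onehot = pd.get_dummies(dataframe, columns=["type"])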

    # Drop unnecessary columns
    df_onehot = df_onehot.drop(columns=['nameOrig', 'nameDest', 'newbalanceDest'])

    # Split features and target
    X = df_onehot.drop(columns=['isFraud'])
    y = df_onehot['isFraud']

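    # Normalize features.
    # Note: this fits a new scaler on every batch, so the scaling matches the
    # training-time scaling only if the batch statistics resemble the training data.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)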

    # Load the pre-trained model
    model = joblib.load(model_path)

    # Predict using the loaded model
    predictions = model.predict(X_scaled)

    # Append predictions to the original DataFrame as a new column 'prediction'
    df_with_preds = dataframe.copy()
    df_with_preds['prediction'] = predictions

    return df_with_preds, y
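
One detail worth guarding against: `pd.get_dummies` creates a column only for the transaction types present in the batch, so a small micro-batch can yield fewer feature columns than a larger one. The sketch below shows the idea of a fixed feature layout in plain Python. The category list is taken from the types that appear in the streaming output above, and the feature order is an assumption; it would have to match whatever order was used when `rf_model.pkl` was trained. In pandas, a similar effect can be obtained with `DataFrame.reindex(columns=..., fill_value=0)` after one-hot encoding.

```python
# Assumed layout: numeric features first, then one slot per transaction type.
NUMERIC_FEATURES = [
    "step", "amount", "oldbalanceOrg", "newbalanceOrig",
    "oldbalanceDest", "isFlaggedFraud",
]
TYPE_CATEGORIES = ["CASH_IN", "CASH_OUT", "DEBIT", "PAYMENT", "TRANSFER"]


def to_feature_vector(record, numeric=NUMERIC_FEATURES, categories=TYPE_CATEGORIES):
    """Build a fixed-length feature list from one transaction record (a dict)."""
    row = [float(record[name]) for name in numeric]
    # Exactly one slot per known category, whether or not the batch contains it.
    row.extend(1.0 if record["type"] == category else 0.0 for category in categories)
    return row
```

Because the vector length depends only on the two lists, every batch produces the same number of features regardless of which transaction types it happens to contain.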

