In [0]:
%restart_python

In [0]:
%sql
-- USERS TABLE
CREATE TABLE IF NOT EXISTS users (
    user_id STRING NOT NULL,
    default_city STRING,
    default_state STRING,
    CONSTRAINT pk_users PRIMARY KEY (user_id)
) USING DELTA;

-- DEVICES TABLE
CREATE TABLE IF NOT EXISTS devices (
    device_id STRING NOT NULL,
    device_os STRING,
    CONSTRAINT pk_devices PRIMARY KEY (device_id)
) USING DELTA;

-- RECIPIENTS TABLE (Merchants)
CREATE TABLE IF NOT EXISTS recipients (
    merchant_id STRING NOT NULL,
    merchant_category STRING,
    CONSTRAINT pk_recipients PRIMARY KEY (merchant_id)
) USING DELTA;

-- TRANSACTIONS TABLE
CREATE TABLE IF NOT EXISTS transactions (
    transaction_id STRING NOT NULL,
    transaction_date DATE,
    transaction_time STRING,
    user_id STRING NOT NULL,
    merchant_id STRING NOT NULL,
    device_id STRING NOT NULL,
    upi_channel STRING,
    transaction_city STRING,
    transaction_state STRING,
    ip_address STRING,
    transaction_status STRING,
    amount FLOAT,
    transaction_amount_deviation FLOAT,
    fraud INT,
    CONSTRAINT pk_transactions PRIMARY KEY (transaction_id),
    CONSTRAINT fk_txn_user FOREIGN KEY (user_id) REFERENCES users(user_id),
    CONSTRAINT fk_txn_device FOREIGN KEY (device_id) REFERENCES devices(device_id),
    CONSTRAINT fk_txn_merchant FOREIGN KEY (merchant_id) REFERENCES recipients(merchant_id)
) USING DELTA;




In [0]:
from faker import Faker
from datetime import datetime, timedelta
import random
from collections import defaultdict
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType, DateType, IntegerType
)
from pyspark.sql.functions import col, hour, unix_timestamp, when, date_format

# For demonstration, using a static list:
city_state_dict = {
    "Mumbai": "Maharashtra",
    "Delhi": "Delhi",
    "Bangalore": "Karnataka",
    "Hyderabad": "Telangana",
    "Ahmedabad": "Gujarat",
    "Chennai": "Tamil Nadu",
    "Kolkata": "West Bengal",
    "Pune": "Maharashtra",
    "Jaipur": "Rajasthan",
    "Surat": "Gujarat",
    "Lucknow": "Uttar Pradesh",
    "Kanpur": "Uttar Pradesh",
    "Nagpur": "Maharashtra",
    "Indore": "Madhya Pradesh",
    "Thane": "Maharashtra",
    "Bhopal": "Madhya Pradesh",
    "Visakhapatnam": "Andhra Pradesh",
    "Pimpri-Chinchwad": "Maharashtra",
    "Patna": "Bihar",
    "Vadodara": "Gujarat",
    "Ghaziabad": "Uttar Pradesh",
    "Ludhiana": "Punjab",
    "Agra": "Uttar Pradesh",
    "Nashik": "Maharashtra",
    "Faridabad": "Haryana",
    "Meerut": "Uttar Pradesh",
    "Rajkot": "Gujarat",
    "Kalyan-Dombivli": "Maharashtra",
    "Vasai-Virar": "Maharashtra",
    "Varanasi": "Uttar Pradesh",
    "Srinagar": "Jammu and Kashmir",
    "Aurangabad": "Maharashtra",
    "Dhanbad": "Jharkhand",
    "Amritsar": "Punjab",
    "Navi Mumbai": "Maharashtra",
    "Prayagraj": "Uttar Pradesh",
    "Howrah": "West Bengal",
    "Ranchi": "Jharkhand",
    "Jabalpur": "Madhya Pradesh",
    "Gwalior": "Madhya Pradesh",
    "Coimbatore": "Tamil Nadu",
    "Vijayawada": "Andhra Pradesh",
    "Jodhpur": "Rajasthan",
    "Madurai": "Tamil Nadu",
    "Raipur": "Chhattisgarh",
    "Kota": "Rajasthan",
    "Guwahati": "Assam",
    "Chandigarh": "Chandigarh",
    "Solapur": "Maharashtra",
    "Hubli-Dharwad": "Karnataka",
    "Tiruchirappalli": "Tamil Nadu",
    "Bareilly": "Uttar Pradesh",
    "Mysore": "Karnataka",
    "Tiruppur": "Tamil Nadu",
    "Gurgaon": "Haryana",
    "Aligarh": "Uttar Pradesh",
    "Jalandhar": "Punjab",
    "Bhubaneswar": "Odisha",
    "Salem": "Tamil Nadu",
    "Mira-Bhayandar": "Maharashtra",
    "Warangal": "Telangana",
    "Thiruvananthapuram": "Kerala",
    "Guntur": "Andhra Pradesh",
    "Bhiwandi": "Maharashtra",
    "Saharanpur": "Uttar Pradesh",
    "Gorakhpur": "Uttar Pradesh",
    "Bikaner": "Rajasthan",
    "Amravati": "Maharashtra",
    "Noida": "Uttar Pradesh",
    "Jamshedpur": "Jharkhand",
    "Bhilai": "Chhattisgarh",
    "Cuttack": "Odisha",
    "Firozabad": "Uttar Pradesh",
    "Kochi": "Kerala",
    "Nellore": "Andhra Pradesh",
    "Bhavnagar": "Gujarat",
    "Dehradun": "Uttarakhand",
    "Durgapur": "West Bengal",
    "Asansol": "West Bengal",
    "Nanded": "Maharashtra",
    "Kolhapur": "Maharashtra",
    "Ajmer": "Rajasthan",
    "Gulbarga": "Karnataka",
    "Jamnagar": "Gujarat",
    "Ujjain": "Madhya Pradesh",
    "Loni": "Uttar Pradesh",
    "Siliguri": "West Bengal",
    "Jhansi": "Uttar Pradesh",
    "Ulhasnagar": "Maharashtra",
    "Jammu": "Jammu and Kashmir",
    "Sangli-Miraj & Kupwad": "Maharashtra",
    "Mangalore": "Karnataka",
    "Erode": "Tamil Nadu",
    "Belgaum": "Karnataka",
    "Ambattur": "Tamil Nadu",
    "Tirunelveli": "Tamil Nadu",
    "Malegaon": "Maharashtra",
    "Gaya": "Bihar",
    "Jalgaon": "Maharashtra",
    "Udaipur": "Rajasthan",
    "Maheshtala": "West Bengal",
    "Davanagere": "Karnataka",
    "Kozhikode": "Kerala",
    "Kurnool": "Andhra Pradesh",
    "Akola": "Maharashtra",
    "Rajpur Sonarpur": "West Bengal",
    "Rajahmundry": "Andhra Pradesh",
    "Bokaro": "Jharkhand",
    "South Dumdum": "West Bengal",
    "Bellary": "Karnataka",
    "Patiala": "Punjab",
    "Gopalpur": "Odisha",
    "Agartala": "Tripura",
    "Bhagalpur": "Bihar",
    "Muzaffarnagar": "Uttar Pradesh",
    "Bhatpara": "West Bengal",
    "Panihati": "West Bengal",
    "Latur": "Maharashtra",
    "Dhule": "Maharashtra",
    "Tirupati": "Andhra Pradesh",
    "Rohtak": "Haryana",
    "Korba": "Chhattisgarh",
    "Bhilwara": "Rajasthan",
    "Berhampur": "Odisha",
    "Muzaffarpur": "Bihar",
    "Ahmednagar": "Maharashtra",
    "Mathura": "Uttar Pradesh",
    "Kollam": "Kerala",
    "Avadi": "Tamil Nadu",
    "Kadapa": "Andhra Pradesh",
    "Kamarhati": "West Bengal",
    "Bilaspur": "Chhattisgarh",
    "Shahjahanpur": "Uttar Pradesh",
    "Bijapur": "Karnataka",
    "Rampur": "Uttar Pradesh",
    "Shivamogga": "Karnataka",
    "Chandrapur": "Maharashtra",
    "Junagadh": "Gujarat",
    "Thrissur": "Kerala",
    "Alwar": "Rajasthan",
    "Bardhaman": "West Bengal",
    "Kulti": "West Bengal",
    "Kakinada": "Andhra Pradesh",
    "Nizamabad": "Telangana",
    "Parbhani": "Maharashtra",
    "Tumkur": "Karnataka",
    "Khammam": "Telangana",
    "Ozhukarai": "Puducherry",
    "Bihar Sharif": "Bihar",
    "Panipat": "Haryana",
    "Darbhanga": "Bihar",
    "Bally": "West Bengal",
    "Aizawl": "Mizoram",
    "Dewas": "Madhya Pradesh",
    "Ichalkaranji": "Maharashtra",
    "Karnal": "Haryana",
    "Bathinda": "Punjab",
    "Jalna": "Maharashtra",
    "Eluru": "Andhra Pradesh",
    "Kirari Suleman Nagar": "Delhi",
    "Barasat": "West Bengal",
    "Purnia": "Bihar",
    "Satna": "Madhya Pradesh",
    "Mau": "Uttar Pradesh",
    "Sonipat": "Haryana",
    "Farrukhabad": "Uttar Pradesh",
    "Sagar": "Madhya Pradesh",
    "Rourkela": "Odisha",
    "Durg": "Chhattisgarh",
    "Imphal": "Manipur",
    "Ratlam": "Madhya Pradesh",
    "Hapur": "Uttar Pradesh",
    "Arrah": "Bihar",
    "Karimnagar": "Telangana",
    "Anantapur": "Andhra Pradesh",
    "Etawah": "Uttar Pradesh",
    "Ambarnath": "Maharashtra",
    "North Dumdum": "West Bengal",
    "Bharatpur": "Rajasthan",
    "Begusarai": "Bihar",
    "New Delhi": "Delhi",
    "Gandhidham": "Gujarat",
    "Baranagar": "West Bengal",
    "Tiruvottiyur": "Tamil Nadu",
    "Puducherry": "Puducherry",
    "Sikar": "Rajasthan",
    "Thoothukudi": "Tamil Nadu",
    "Rewa": "Madhya Pradesh",
    "Mirzapur": "Uttar Pradesh",
    "Raichur": "Karnataka",
    "Pali": "Rajasthan"
}
# (Paste the above dictionary here)

cities = list(city_state_dict.keys())
states = list(set(city_state_dict.values()))




fake = Faker('en_IN')
random.seed(42)
Faker.seed(42)

NUM_USERS = 2000
NUM_DEVICES = 1000
NUM_RECIPIENTS = 1500
NUM_TRANSACTIONS = 20000

merchant_categories = [
    "Mobile Recharge", "Electricity Bill", "Water Bill", "Gas Bill", "DTH Recharge",
    "Online Shopping", "Travel Booking", "Food Delivery", "Education Fees", "Healthcare Services"
]
device_oses = ["Android", "iOS", "Windows", "MacOS"]
upi_channels = ["GPay", "PhonePe", "Paytm", "BHIM"]
transaction_statuses = ["Completed", "Pending", "Failed"]

def random_date(start, end):
    return start + timedelta(days=random.randint(0, (end - start).days))

# USERS TABLE
# Prepare the list of cities from the city_state_dict
cities = list(city_state_dict.keys())

user_rows, user_ids, user_city_map = [], [], {}
for i in range(1, NUM_USERS + 1):
    user_id = str(i)
    default_city = random.choice(cities)
    default_state = city_state_dict[default_city]

    user_rows.append((user_id, default_city, default_state))
    user_ids.append(user_id)
    # Sample 3 known cities for the user from the same city list
    user_city_map[user_id] = set(random.sample(cities, min(3, len(cities))))
user_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("default_city", StringType(), True),
    StructField("default_state", StringType(), True)
])
users_df = spark.createDataFrame(user_rows, schema=user_schema)
users_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("upi_fraud_schema.users")

# DEVICES TABLE
device_rows, device_ids, user_to_devices = [], [], defaultdict(list)
for i in range(1, NUM_DEVICES + 1):
    device_id = str(i)
    device_os = random.choice(device_oses)
    user_id = random.choice(user_ids)
    device_rows.append((device_id, device_os))
    device_ids.append(device_id)
    user_to_devices[user_id].append(device_id)
device_schema = StructType([
    StructField("device_id", StringType(), False),
    StructField("device_os", StringType(), True)
])
devices_df = spark.createDataFrame(device_rows, schema=device_schema)
devices_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("upi_fraud_schema.devices")

# RECIPIENTS TABLE (Merchants)
recipient_rows, merchant_ids = [], []
for i in range(1, NUM_RECIPIENTS + 1):
    merchant_id = str(i)
    merchant_category = random.choice(merchant_categories)
    recipient_rows.append((merchant_id, merchant_category))
    merchant_ids.append(merchant_id)
recipient_schema = StructType([
    StructField("merchant_id", StringType(), False),
    StructField("merchant_category", StringType(), True)
])
recipients_df = spark.createDataFrame(recipient_rows, schema=recipient_schema)
recipients_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("upi_fraud_schema.recipients")

# TRANSACTIONS TABLE
transaction_rows = []
for i in range(1, NUM_TRANSACTIONS + 1):
    transaction_id = str(i)
    date_obj = random_date(datetime(2023,1,1), datetime(2024,6,30))
    transaction_date = date_obj.date()
    transaction_time = fake.time(pattern="%I:%M:%S %p")
    user_id = random.choice(user_ids)
    merchant_id = random.choice(merchant_ids)
    possible_devices = user_to_devices[user_id]
    device_id = random.choice(possible_devices) if possible_devices else random.choice(device_ids)
    upi_channel = random.choice(upi_channels)
    transaction_city = random.choice(list(city_state_dict.keys()))
    transaction_state = city_state_dict[transaction_city]
    ip_address = fake.ipv4()
    transaction_status = random.choice(transaction_statuses)
    amount = round(random.uniform(1, 5000), 2)
    transaction_amount_deviation = round(random.uniform(-100, 100), 2)
# --- Improved Risk Score Calculation ---
    risk_score = 0

    # High amount buckets
    if amount > 5000:
        risk_score += 40
    elif amount > 3000:
        risk_score += 25
    elif amount > 1000:
        risk_score += 10

    # Large deviation from user's normal transaction amount
    if abs(transaction_amount_deviation) > 100:
        risk_score += 25
    elif abs(transaction_amount_deviation) > 50:
        risk_score += 10

    # Location anomaly: city or state not in user's known set
    if transaction_city not in user_city_map[user_id]:
        risk_score += 15
    if transaction_state not in [city_state_dict[city] for city in user_city_map[user_id]]:
        risk_score += 10

    # Device anomaly: device rarely used by this user
    if device_id not in user_to_devices[user_id][:2]:  # e.g., not among user's 2 most common devices
        risk_score += 10

    # Odd hour: between 12 AM and 5 AM
    txn_hour = int(transaction_time[:2])
    if transaction_time.endswith('AM') and 0 <= txn_hour <= 5:
        risk_score += 15

    # Status and amount interaction
    if transaction_status == 'Failed' and amount > 3000:
        risk_score += 20
    elif transaction_status == 'Pending' and amount > 3000:
        risk_score += 10

    # Multiple risk factors: bonus for stacking
    if risk_score >= 60:
        risk_score += 10

    # Normalize and assign fraud label
    risk_score = min(risk_score / 100, 1.0)
    # fraud = 1 if random.random() < risk_score else 0
    fraud = 1 if risk_score > 0.7 else 0   # 0.5 is just an example; you can choose any threshold

    transaction_rows.append((
        transaction_id, transaction_date, transaction_time, user_id, merchant_id, device_id, upi_channel,
        transaction_city, transaction_state, ip_address, transaction_status, amount, transaction_amount_deviation, fraud
    ))

transaction_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("transaction_date", DateType(), True),
    StructField("transaction_time", StringType(), True),
    StructField("user_id", StringType(), False),
    StructField("merchant_id", StringType(), False),
    StructField("device_id", StringType(), False),
    StructField("upi_channel", StringType(), True),
    StructField("transaction_city", StringType(), True),
    StructField("transaction_state", StringType(), True),
    StructField("ip_address", StringType(), True),
    StructField("transaction_status", StringType(), True),
    StructField("amount", FloatType(), True),
    StructField("transaction_amount_deviation", FloatType(), True),
    StructField("fraud", IntegerType(), True)
])
transactions_df = spark.createDataFrame(transaction_rows, schema=transaction_schema)
transactions_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("upi_fraud_schema.transactions")


In [0]:
from pyspark.sql.functions import hour, unix_timestamp, when, date_format, col

# Read tables
users_df = spark.table("upi_fraud_schema.users")
devices_df = spark.table("upi_fraud_schema.devices")
recipients_df = spark.table("upi_fraud_schema.recipients")
transactions_df = spark.table("upi_fraud_schema.transactions")

# Join and engineer features
features_df = transactions_df.alias("t") \
    .join(users_df.alias("u"), col("t.user_id") == col("u.user_id")) \
    .join(devices_df.alias("d"), col("t.device_id") == col("d.device_id")) \
    .join(recipients_df.alias("r"), col("t.merchant_id") == col("r.merchant_id")) \
    .withColumn("hour", hour(unix_timestamp(col("t.transaction_time"), "hh:mm:ss a").cast("timestamp"))) \
    .withColumn("day_of_week", date_format(col("t.transaction_date"), "EEEE")) \
    .withColumn("is_high_value", (col("t.amount") > 3000).cast("int")) \
    .withColumn("is_odd_hour", when((col("hour") < 6) | (col("hour") > 22), 1).otherwise(0)) \
    .select(
        "t.transaction_id", "t.user_id", "t.merchant_id", "t.device_id",
        "t.amount", "hour", "day_of_week",
        "t.transaction_amount_deviation", "is_high_value", "is_odd_hour",
        "t.upi_channel", "t.transaction_status",
        "r.merchant_category", "d.device_os", "u.default_city", "u.default_state",
        "t.transaction_city", "t.transaction_state", "t.fraud"
    )

features_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("upi_fraud_schema.features")


In [0]:
%sql
SELECT * FROM features;


Transaction_ID,Date,Time,Merchant_ID,Customer_ID,Device_ID,Transaction_Type,Payment_Gateway,Transaction_City,Transaction_State,IP_Address,Transaction_Status,Device_OS,Transaction_Frequency,Merchant_Category,Transaction_Channel,Transaction_Amount_Deviation,Days_Since_Last_Transaction,amount,fraud
T25247038,25/07/23,04:53:50 AM,31fbcb06-d53b-4163-9bba-ba631fc7c325,cd46bbaa-be64-4444-9377-661737f017e1,2422bda8-2103-406b-b4c3-a94904b840d5,Bank Transfer,Bank of Data,South Jeffrey,Rajasthan,147.99.110.163,Completed,iOS,22,Home delivery,Online,-29.36,15,877.92,0
T42135383,28/12/23,05:55:50 AM,ec95a781-f6cc-499a-89df-3e75ea6df8e0,0939f532-8daa-4356-9e9c-7d118ee2e529,bfd6eb2c-b2be-47f1-9300-a6d417865040,Purchase,Other,South Jason,Jharkhand,12.109.89.16,Completed,Windows,34,Donations and Devotion,Online,-29.76,20,497.34,1
T25969695,20/05/24,12:06:43 AM,34ef83b8-dd2b-406f-a1e6-ec0ba03338ff,ecb3af97-5874-441a-873f-d35751708c09,5ac434e2-f8b4-4e87-abe4-f6a4d5aa7059,Refund,Gamma Bank,Port Michelleville,Himachal Pradesh,78.43.152.230,Failed,MacOS,2,Investment,Online,-57.31,24,61.63,0
T62055641,06/06/23,11:37:09 AM,1ebbc5b9-4ed6-4733-886d-3e8269ea0246,674b9838-a35d-4ab8-b782-e1572cc497eb,aa9370b8-e811-48c0-afaf-3237862e555e,Purchase,Sigma Bank,Lake Debbie,West Bengal,81.150.235.134,Pending,iOS,14,Other,In-store,79.16,3,1209.02,0
T16341247,26/10/23,06:04:27 PM,12a72829-5f2e-4cde-9867-d0171f84eb3d,99f5154e-2846-4bee-bdfb-35a2e04cf991,8222908f-8c57-481a-8923-962a11af1427,Other,Other,Shawhaven,Kerala,88.185.135.153,Failed,MacOS,34,Other,In-store,-28.32,4,4462.02,1
T61341313,02/04/23,07:12:22 AM,0c96b4fb-53f0-484d-9cbf-ec163fc2cbe2,9bbc2c8b-37fd-4c02-9643-183cdca6b404,94856a70-f51e-4fed-92a8-5f7776ab31a5,Bill Payment,Other,Cassandraton,Sikkim,164.30.255.113,Pending,MacOS,8,More Services,Mobile,33.99,18,2617.84,1
T50942554,17/05/24,10:49:40 AM,ae1d579e-a23a-49a0-9b0a-b02a39ab26bc,920800d1-8638-4bb8-85ac-c3d5958cec9e,8addc177-efe2-4ac4-b06a-58ea11892142,Subscription,Bank of Data,Lake Larry,Maharashtra,34.239.218.198,Failed,Android,34,Utilities,In-store,87.79,1,529.11,0
T31728214,05/05/24,08:59:21 AM,7542ad5e-85da-4776-aba2-db8dcf298e78,1c83b3a1-3c5c-47c9-b3ac-d85b3ac31008,43ff2277-9697-438c-8cc1-0c6ede79a2f5,Purchase,Other,Karenchester,Kerala,221.11.38.221,Pending,Android,3,Utilities,Mobile,-33.32,4,164.06,0
T27273137,25/05/24,12:12:10 PM,97c30750-c5eb-4cbe-8c7a-186645492623,8d740be9-f3e6-4f4d-87a2-78739ae08952,e4635fb0-615a-4c12-8f50-ee1967ee1a8a,Investment,SamplePay,Sandersborough,Telangana,119.59.53.212,Completed,Android,30,Donations and Devotion,In-store,-78.19,28,2995.34,0
T86377170,17/12/23,10:13:48 PM,820db287-d1ed-4397-8e40-080df9c3273b,424befaf-54c8-44b1-9f39-827b2902b9ff,80f08e45-3c31-4963-b09f-dff8e0b3ab7f,Purchase,SamplePay,Port Anthonybury,Nagaland,22.119.135.128,Pending,MacOS,22,Purchases,Mobile,-63.27,0,2601.75,0


In [0]:
import pandas as pd
df=features_df
pdf=features_df.toPandas()


pdf.to_csv('upi_transactions_2024.csv', index=False)
print("Dataset saved as 'upi_transactions_2024.csv'")

Dataset saved as 'upi_transactions_2024.csv'


IMPORTING LIBRARIES


In [0]:
# importing libraries
# %pip install pandas
import pandas as pd
import numpy as np
import matplotlib.pyplot as px
import seaborn as sns

In [0]:
# df.head()

df = spark.read.table("features")
df.show(5, truncate=False)


+--------------+--------+-----------+------------------------------------+------------------------------------+------------------------------------+----------------+---------------+-----------------+-----------------+-------------+------------------+---------+---------------------+----------------------------+-------------------+----------------------------+---------------------------+-------+-----+
|Transaction_ID|Date    |Time       |Merchant_ID                         |Customer_ID                         |Device_ID                           |Transaction_Type|Payment_Gateway|Transaction_City |Transaction_State|IP_Address   |Transaction_Status|Device_OS|Transaction_Frequency|Merchant_Category           |Transaction_Channel|Transaction_Amount_Deviation|Days_Since_Last_Transaction|amount |fraud|
+--------------+--------+-----------+------------------------------------+------------------------------------+------------------------------------+----------------+---------------+-------------

In [0]:
df.columns
# df.printSchema

['Transaction_ID',
 'Date',
 'Time',
 'Merchant_ID',
 'Customer_ID',
 'Device_ID',
 'Transaction_Type',
 'Payment_Gateway',
 'Transaction_City',
 'Transaction_State',
 'IP_Address',
 'Transaction_Status',
 'Device_OS',
 'Transaction_Frequency',
 'Merchant_Category',
 'Transaction_Channel',
 'Transaction_Amount_Deviation',
 'Days_Since_Last_Transaction',
 'amount',
 'fraud']

Data Preparation
-duplicate, missing, unique, removing or dropping unique values

DUPLICATE VALUES


In [0]:
from pyspark.sql.functions import count

# Find all rows that are duplicated across all columns
df.groupBy(df.columns) \
    .count() \
    .filter("count > 1") \
    .show(truncate=False)
    #  .count()


# df.duplictaed()->pandas


+--------------+----+----+-----------+-----------+---------+----------------+---------------+----------------+-----------------+----------+------------------+---------+---------------------+-----------------+-------------------+----------------------------+---------------------------+------+-----+-----+
|Transaction_ID|Date|Time|Merchant_ID|Customer_ID|Device_ID|Transaction_Type|Payment_Gateway|Transaction_City|Transaction_State|IP_Address|Transaction_Status|Device_OS|Transaction_Frequency|Merchant_Category|Transaction_Channel|Transaction_Amount_Deviation|Days_Since_Last_Transaction|amount|fraud|count|
+--------------+----+----+-----------+-----------+---------+----------------+---------------+----------------+-----------------+----------+------------------+---------+---------------------+-----------------+-------------------+----------------------------+---------------------------+------+-----+-----+
+--------------+----+----+-----------+-----------+---------+----------------+--------

MISSING VALUES

In [0]:

from pyspark.sql.functions import col, sum as spark_sum
from functools import reduce

# This will show you how many nulls are present in each column:
missing_counts = df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
])
missing_counts.show()


# This will display all rows that have at least one null value
df.filter(
    reduce(lambda a, b: a | b, (col(c).isNull() for c in df.columns))
).show()


# This gives you the number of rows with at least one null:
num_missing_rows = df.filter(
    reduce(lambda a, b: a | b, (col(c).isNull() for c in df.columns))
).count()
print("Rows with any missing value:", num_missing_rows)

+--------------+----+----+-----------+-----------+---------+----------------+---------------+----------------+-----------------+----------+------------------+---------+---------------------+-----------------+-------------------+----------------------------+---------------------------+------+-----+
|Transaction_ID|Date|Time|Merchant_ID|Customer_ID|Device_ID|Transaction_Type|Payment_Gateway|Transaction_City|Transaction_State|IP_Address|Transaction_Status|Device_OS|Transaction_Frequency|Merchant_Category|Transaction_Channel|Transaction_Amount_Deviation|Days_Since_Last_Transaction|amount|fraud|
+--------------+----+----+-----------+-----------+---------+----------------+---------------+----------------+-----------------+----------+------------------+---------+---------------------+-----------------+-------------------+----------------------------+---------------------------+------+-----+
|             0|   0|   0|          0|          0|        0|               0|              0|          

How to deal with missing values if any
-drop the missing values ( only if proportion is very less)
-fill th emissing values
-froward fill
-back fill
-linear regression
-mean values (but it is sensitive to outliers)
-median values ( not sensitive to outliers)

In [0]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape: ({num_rows}, {num_cols})")


Shape: (20000, 20)


DEALING WITH UNIQUES VALUES


In [0]:
from pyspark.sql.functions import countDistinct
import pandas as pd

# Compute unique counts for each column
unique_counts = [(c, df.select(countDistinct(c)).first()[0]) for c in df.columns]

# Convert to Spark DataFrame for display
unique_counts_df = spark.createDataFrame(unique_counts, ["column_name", "unique_count"])
unique_counts_df.show(truncate=False)


+----------------------------+------------+
|column_name                 |unique_count|
+----------------------------+------------+
|Transaction_ID              |19997       |
|Date                        |547         |
|Time                        |17817       |
|Merchant_ID                 |20000       |
|Customer_ID                 |20000       |
|Device_ID                   |20000       |
|Transaction_Type            |7           |
|Payment_Gateway             |9           |
|Transaction_City            |197         |
|Transaction_State           |27          |
|IP_Address                  |20000       |
|Transaction_Status          |3           |
|Device_OS                   |4           |
|Transaction_Frequency       |51          |
|Merchant_Category           |10          |
|Transaction_Channel         |3           |
|Transaction_Amount_Deviation|12663       |
|Days_Since_Last_Transaction |31          |
|amount                      |19618       |
|fraud                       |2 

Drop the unique values from the data



In [0]:

df.show(5, truncate=False)

+--------------+--------+-----------+------------------------------------+------------------------------------+------------------------------------+----------------+---------------+-----------------+-----------------+-------------+------------------+---------+---------------------+----------------------------+-------------------+----------------------------+---------------------------+-------+-----+
|Transaction_ID|Date    |Time       |Merchant_ID                         |Customer_ID                         |Device_ID                           |Transaction_Type|Payment_Gateway|Transaction_City |Transaction_State|IP_Address   |Transaction_Status|Device_OS|Transaction_Frequency|Merchant_Category           |Transaction_Channel|Transaction_Amount_Deviation|Days_Since_Last_Transaction|amount |fraud|
+--------------+--------+-----------+------------------------------------+------------------------------------+------------------------------------+----------------+---------------+-------------

In [0]:
from pyspark.sql.functions import col

# For fraud transactions (fraud = 1)
fraud_df = df.filter(col('fraud') == 1)

# For legitimate transactions (fraud = 0)
normal_df = df.filter(col('fraud') == 0)

print(f"Fraud cases: {fraud_df.count()}")
print(f"Legitimate cases: {normal_df.count()}")


Fraud cases: 5929
Legitimate cases: 14071


Analysing the fraud df


In [0]:
# pdf = df.toPandas()
print(pdf.columns.tolist())


['transaction_id', 'user_id', 'merchant_id', 'device_id', 'amount', 'hour', 'day_of_week', 'transaction_amount_deviation', 'is_high_value', 'is_odd_hour', 'upi_channel', 'transaction_status', 'merchant_category', 'device_os', 'default_city', 'default_state', 'transaction_city', 'transaction_state', 'fraud']


In [0]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler

# Assume pdf is your pandas DataFrame

# Categorical columns to encode
categorical_cols = [
    'user_id',
    'merchant_id',
    'device_id',
    'day_of_week',
    'upi_channel',
    'transaction_status',
    'merchant_category',
    'device_os',
    'default_city',
    'default_state',
    'transaction_city',
    'transaction_state'
]

# Numerical columns to scale
numeric_cols = [
    'amount',
    'hour',
    'transaction_amount_deviation',
    'is_high_value',
    'is_odd_hour'
]

# 2. Label Encoding for Categorical Columns
for col in categorical_cols:
    le = LabelEncoder()
    pdf[col + '_enc'] = le.fit_transform(pdf[col].astype(str))


# 3. Standard Scaling for Numerical Columns
scaler = StandardScaler()
pdf[numeric_cols] = scaler.fit_transform(pdf[numeric_cols])

# 4. Prepare final feature list for modeling
feature_cols = [col + '_enc' for col in categorical_cols] + numeric_cols

# 5. X and y for modeling
X = pdf[feature_cols]
y = pdf['fraud']

# (Optional) Show the first few rows of the processed DataFrame
print(pdf[feature_cols + ['fraud']].head())


   user_id_enc  merchant_id_enc  ...  is_odd_hour  fraud
0          853              338  ...    -0.638714      0
1          670             1108  ...    -0.638714      0
2          762             1193  ...     1.565647      1
3         1967              279  ...    -0.638714      0
4           99             1095  ...     1.565647      0

[5 rows x 18 columns]


In [0]:
from sklearn.model_selection import train_test_split

# X = your features DataFrame
# y = your target variable (e.g., pdf['fraud'])

X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.3,         # 20% for testing, 80% for training
    random_state=42,       # for reproducibility
    stratify=y             # keeps the fraud ratio similar in both sets
)

print("Train shape:", X_train.shape, y_train.shape)
print("Test shape:", X_test.shape, y_test.shape)


Train shape: (14000, 17) (14000,)
Test shape: (6000, 17) (6000,)


In [0]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score

# 1. Initialize the Random Forest model
clf = RandomForestClassifier(
    n_estimators=100,        # Number of trees
    random_state=42,         # For reproducibility
    class_weight='balanced'  # Helps with class imbalance
)

# 2. Train the model
clf.fit(X_train, y_train)

# 3. Make predictions
y_pred = clf.predict(X_test)
y_proba = clf.predict_proba(X_test)[:, 1]  # Probability of class 1 (fraud)

# 4. Evaluate the model
print("Classification Report:\n", classification_report(y_test, y_pred))
print(f"ROC AUC Score: {roc_auc_score(y_test, y_proba):.4f}")


Classification Report:
               precision    recall  f1-score   support

           0       0.94      0.94      0.94      4164
           1       0.86      0.86      0.86      1836

    accuracy                           0.91      6000
   macro avg       0.90      0.90      0.90      6000
weighted avg       0.91      0.91      0.91      6000

ROC AUC Score: 0.9782


In [0]:
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score


# Set scale_pos_weight to balance classes (optional, recommended for imbalanced data)
scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()

# Initialize and train XGBoost


model = xgb.XGBClassifier(
    objective='binary:logistic',
    scale_pos_weight=scale_pos_weight,
    eval_metric='auc',
    random_state=42
)

model.fit(X_train, y_train)

# Predict and evaluate
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)[:, 1]
print(classification_report(y_test, y_pred))
print("ROC AUC Score:", roc_auc_score(y_test, y_proba))


              precision    recall  f1-score   support

           0       0.96      0.92      0.94      4164
           1       0.83      0.92      0.87      1836

    accuracy                           0.92      6000
   macro avg       0.90      0.92      0.91      6000
weighted avg       0.92      0.92      0.92      6000

ROC AUC Score: 0.9791346200130174
