### Load Static Data

In [0]:
from pyspark.sql.types import *

# Schema of the static 2024 UPI extract. Names mirror the CSV header verbatim, including
# the spaces and parentheses in "transaction id", "transaction type" and "amount (INR)".
upi_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("transaction id", StringType()),
        ("timestamp", TimestampType()),
        ("transaction type", StringType()),
        ("merchant_category", StringType()),
        ("amount (INR)", DoubleType()),
        ("transaction_status", StringType()),
        ("sender_age_group", StringType()),
        ("receiver_age_group", StringType()),
        ("sender_state", StringType()),
        ("sender_bank", StringType()),
        ("receiver_bank", StringType()),
        ("device_type", StringType()),
        ("network_type", StringType()),
        ("fraud_flag", IntegerType()),
        ("hour_of_day", IntegerType()),
        ("day_of_week", StringType()),
        ("is_weekend", IntegerType()),
    ]
])

# Static 2024 dataset, read straight from S3 (no credentials are set in this notebook).
s3_url = "s3a://upi-fraud-data-project-2025/upi_transactions_2024_.csv"

# header=True skips the header row; the explicit schema yields typed columns
# (timestamp / double / int) instead of all-string columns.
df = spark.read.csv(s3_url, schema=upi_schema, header=True)

# Preview the first 10 rows.
display(df.limit(10))

transaction id,timestamp,transaction type,merchant_category,amount (INR),transaction_status,sender_age_group,receiver_age_group,sender_state,sender_bank,receiver_bank,device_type,network_type,fraud_flag,hour_of_day,day_of_week,is_weekend
TXN0000000001,2024-10-08T15:17:28.000Z,P2P,Entertainment,868.0,SUCCESS,26-35,18-25,Delhi,Axis,SBI,Android,4G,0,15,Tuesday,0
TXN0000000002,2024-04-11T06:56:00.000Z,P2M,Grocery,1011.0,SUCCESS,26-35,26-35,Uttar Pradesh,ICICI,Axis,iOS,4G,0,6,Thursday,0
TXN0000000003,2024-04-02T13:27:18.000Z,P2P,Grocery,477.0,SUCCESS,26-35,36-45,Karnataka,Yes Bank,PNB,Android,4G,0,13,Tuesday,0
TXN0000000004,2024-01-07T10:09:17.000Z,P2P,Fuel,2784.0,SUCCESS,26-35,26-35,Delhi,ICICI,PNB,Android,5G,0,10,Sunday,1
TXN0000000005,2024-01-23T19:04:23.000Z,P2P,Shopping,990.0,SUCCESS,26-35,18-25,Delhi,Axis,Yes Bank,iOS,WiFi,0,19,Tuesday,0
TXN0000000006,2024-10-07T22:32:07.000Z,P2P,Food,91.0,SUCCESS,36-45,18-25,Karnataka,IndusInd,Yes Bank,Android,3G,0,22,Monday,0
TXN0000000007,2024-02-08T10:25:57.000Z,P2P,Other,314.0,SUCCESS,36-45,18-25,Telangana,HDFC,IndusInd,Android,4G,0,10,Thursday,0
TXN0000000008,2024-10-27T18:47:02.000Z,P2P,Utilities,264.0,SUCCESS,46-55,36-45,Maharashtra,Yes Bank,SBI,Android,5G,0,18,Sunday,1
TXN0000000009,2024-11-21T09:39:16.000Z,P2P,Other,887.0,SUCCESS,46-55,36-45,Maharashtra,Kotak,HDFC,Android,4G,0,9,Thursday,0
TXN0000000010,2024-11-11T15:58:56.000Z,P2M,Grocery,3260.0,SUCCESS,46-55,18-25,Delhi,SBI,HDFC,Android,4G,0,15,Monday,0


### LOAD LIVE DATA



In [0]:
# Every CSV file under live_data/ (the wildcard matches however many files have landed so far).
live_folder = "s3a://upi-fraud-data-project-2025/live_data/*.csv"

# Read all live files in one pass, reusing the static schema.
# header=True: each live file carries its own header row, which is skipped per file.
live_all_df = spark.read.csv(live_folder, schema=upi_schema, header=True)

# Expose the combined live data to %sql cells.
live_all_df.createOrReplaceTempView("live_upi_combined")

# Total rows across all live files read so far (count() triggers a full scan of the folder,
# so the number grows as new files arrive).
total_live_rows = live_all_df.count()
print(f"Total rows currently processed from S3: {total_live_rows}")

display(live_all_df.limit(10))

Total rows currently processed from S3: 34740


transaction id,timestamp,transaction type,merchant_category,amount (INR),transaction_status,sender_age_group,receiver_age_group,sender_state,sender_bank,receiver_bank,device_type,network_type,fraud_flag,hour_of_day,day_of_week,is_weekend
TXN_LIVE_443044,2025-12-22T08:15:28.000Z,Bill Payment,Entertainment,22354.79,SUCCESS,26-35,18-25,Uttar Pradesh,SBI,PNB,Android,5G,0,8,Monday,0
TXN_LIVE_336124,2025-12-22T08:15:28.000Z,Bill Payment,Shopping,39647.7,SUCCESS,18-25,26-35,Uttar Pradesh,ICICI,ICICI,iOS,5G,0,8,Monday,0
TXN_LIVE_502377,2025-12-22T08:15:28.000Z,Bill Payment,Shopping,1421.03,SUCCESS,36-45,36-45,Delhi,HDFC,SBI,Android,5G,0,8,Monday,0
TXN_LIVE_411825,2025-12-22T08:15:28.000Z,Bill Payment,Entertainment,27776.51,SUCCESS,18-25,18-25,Uttar Pradesh,ICICI,SBI,Android,4G,0,8,Monday,0
TXN_LIVE_885361,2025-12-22T08:15:28.000Z,Bill Payment,Entertainment,3427.22,SUCCESS,26-35,26-35,Maharashtra,SBI,ICICI,Android,4G,0,8,Monday,0
TXN_LIVE_707807,2025-12-21T23:49:41.000Z,Bill Payment,Shopping,5181.14,SUCCESS,18-25,18-25,Uttar Pradesh,Axis,Axis,iOS,WiFi,0,23,Sunday,1
TXN_LIVE_140456,2025-12-21T23:49:41.000Z,Bill Payment,Grocery,25969.68,SUCCESS,18-25,26-35,Uttar Pradesh,HDFC,Axis,Android,4G,0,23,Sunday,1
TXN_LIVE_967969,2025-12-21T23:49:41.000Z,Bill Payment,Entertainment,4502.27,SUCCESS,36-45,18-25,Uttar Pradesh,HDFC,SBI,Android,WiFi,0,23,Sunday,1
TXN_LIVE_954923,2025-12-21T23:49:41.000Z,Bill Payment,Grocery,26855.05,SUCCESS,36-45,18-25,Uttar Pradesh,HDFC,ICICI,iOS,4G,0,23,Sunday,1
TXN_LIVE_455872,2025-12-21T23:49:41.000Z,P2M,Entertainment,24898.04,SUCCESS,36-45,26-35,Karnataka,HDFC,HDFC,Android,WiFi,0,23,Sunday,1


In [0]:
## RENAME COLUMNS IN LIVE DATA
# Swap the header names that contain spaces/parentheses for snake_case names,
# so they can be used in SQL without backticks.
# NOTE: live_df is reassigned further down, when the live CSVs are re-read with upi_live_schema.
live_df = (
    live_all_df
    .withColumnRenamed("transaction id", "transaction_id")
    .withColumnRenamed("transaction type", "transaction_type")
    .withColumnRenamed("amount (INR)", "amount_INR")
)
display(live_df.limit(10))

transaction_id,timestamp,transaction_type,merchant_category,amount_INR,transaction_status,sender_age_group,receiver_age_group,sender_state,sender_bank,receiver_bank,device_type,network_type,fraud_flag,hour_of_day,day_of_week,is_weekend
TXN_LIVE_443044,2025-12-22T08:15:28.000Z,Bill Payment,Entertainment,22354.79,SUCCESS,26-35,18-25,Uttar Pradesh,SBI,PNB,Android,5G,0,8,Monday,0
TXN_LIVE_336124,2025-12-22T08:15:28.000Z,Bill Payment,Shopping,39647.7,SUCCESS,18-25,26-35,Uttar Pradesh,ICICI,ICICI,iOS,5G,0,8,Monday,0
TXN_LIVE_502377,2025-12-22T08:15:28.000Z,Bill Payment,Shopping,1421.03,SUCCESS,36-45,36-45,Delhi,HDFC,SBI,Android,5G,0,8,Monday,0
TXN_LIVE_411825,2025-12-22T08:15:28.000Z,Bill Payment,Entertainment,27776.51,SUCCESS,18-25,18-25,Uttar Pradesh,ICICI,SBI,Android,4G,0,8,Monday,0
TXN_LIVE_885361,2025-12-22T08:15:28.000Z,Bill Payment,Entertainment,3427.22,SUCCESS,26-35,26-35,Maharashtra,SBI,ICICI,Android,4G,0,8,Monday,0
TXN_LIVE_707807,2025-12-21T23:49:41.000Z,Bill Payment,Shopping,5181.14,SUCCESS,18-25,18-25,Uttar Pradesh,Axis,Axis,iOS,WiFi,0,23,Sunday,1
TXN_LIVE_140456,2025-12-21T23:49:41.000Z,Bill Payment,Grocery,25969.68,SUCCESS,18-25,26-35,Uttar Pradesh,HDFC,Axis,Android,4G,0,23,Sunday,1
TXN_LIVE_967969,2025-12-21T23:49:41.000Z,Bill Payment,Entertainment,4502.27,SUCCESS,36-45,18-25,Uttar Pradesh,HDFC,SBI,Android,WiFi,0,23,Sunday,1
TXN_LIVE_954923,2025-12-21T23:49:41.000Z,Bill Payment,Grocery,26855.05,SUCCESS,36-45,18-25,Uttar Pradesh,HDFC,ICICI,iOS,4G,0,23,Sunday,1
TXN_LIVE_455872,2025-12-21T23:49:41.000Z,P2M,Entertainment,24898.04,SUCCESS,36-45,26-35,Karnataka,HDFC,HDFC,Android,WiFi,0,23,Sunday,1


In [0]:
### DEFINE SCHEMA FOR LIVE DATA
# Same 17 columns and types as the static schema, but with snake_case names
# (transaction_id, transaction_type, amount_INR) so SQL needs no backticks.

from pyspark.sql.types import *

upi_live_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("transaction_id", StringType()),
        ("timestamp", TimestampType()),
        ("transaction_type", StringType()),
        ("merchant_category", StringType()),
        ("amount_INR", DoubleType()),
        ("transaction_status", StringType()),
        ("sender_age_group", StringType()),
        ("receiver_age_group", StringType()),
        ("sender_state", StringType()),
        ("sender_bank", StringType()),
        ("receiver_bank", StringType()),
        ("device_type", StringType()),
        ("network_type", StringType()),
        ("fraud_flag", IntegerType()),
        ("hour_of_day", IntegerType()),
        ("day_of_week", StringType()),
        ("is_weekend", IntegerType()),
    )
])

In [0]:
## LOAD THE DATA WITH SCHEMA
# Re-read the live CSVs with the snake_case schema. With Spark's default enforceSchema,
# columns are matched by position, so the files' original header names are just skipped.
live_df = spark.read.csv(live_folder, schema=upi_live_schema, header=True)

In [0]:
# REGISTER VIEW FOR SQL ANALYSIS
# All %sql analysis cells below query the live data through this view name.
live_view_name = "live_upi_table"
live_df.createOrReplaceTempView(live_view_name)

print("Schema applied and data loaded successfully!")
live_df.printSchema()

Schema applied and data loaded successfully!
root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- amount_INR: double (nullable = true)
 |-- transaction_status: string (nullable = true)
 |-- sender_age_group: string (nullable = true)
 |-- receiver_age_group: string (nullable = true)
 |-- sender_state: string (nullable = true)
 |-- sender_bank: string (nullable = true)
 |-- receiver_bank: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- network_type: string (nullable = true)
 |-- fraud_flag: integer (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- is_weekend: integer (nullable = true)



### ANALYSIS

1. Geographical "Hotspot" Analysis (The "Where")
- Which Indian states have the highest fraud-to-transaction ratio?
- Are certain states hotspots for "High-Value" fraud? (Hypothetical example: frauds in Maharashtra are typically ₹50k+, while those in Bihar are around ₹500.)
- Is fraud migrating? (Compare last 10 minutes vs. previous 10 minutes to see if a fraud ring is moving across states).

2. Temporal "Danger Zone" Analysis (The "When")
- What are the "Danger Hours"? (e.g., Is 3:00 AM the peak for automated fraud?)
- Do weekends (Saturday/Sunday) see more fraud attempts than weekdays?
- Is there a specific "Night-Time" risk? (Are victims more vulnerable when they are likely sleeping?)

3. Technical & Environmental Profiling (The "How")
- Is Public/Private WiFi riskier than 4G/5G?
- Which Device OS is most targeted? (Does the Android-to-iOS fraud ratio match the general market share, or is one more vulnerable?)
- The "Tech-Risk" combo: Which combination (e.g., Android + WiFi) has the absolute highest probability of fraud?

4. Merchant & Spending Analysis (The "Money Trail")
- What are the "Cash-Out" categories? (Are fraudsters spending money on Shopping, Gaming, or Bill Payments to hide the trail?)
- Which merchant categories have the highest financial loss in INR?
- Are "Success Rates" lower for fraud? (Do bank filters catch more fraud in 'Electronics' than in 'Groceries'?)

5. Banking & Ecosystem Risk
- Which sender_bank is the most "under attack" right now?
- Which receiver_bank is being used as a "Mule Bank" to receive stolen funds?
- Is there an "Inter-Bank" failure pattern? (e.g., Transactions from Bank A to Bank B failing more often during fraud attacks).

6. Demographic & Behavioral Analysis (The "Who")
- Which age group is the most frequent victim of fraud? (e.g., 18-25 vs. 60+).
- Is there an "Age Gap" in fraud? (e.g., Do younger senders get defrauded more by older receivers?)
- Does the "Transaction Type" (Bill Pay vs. P2P) change the risk level?

7. Financial Impact & KPIs
- What is the "Total Economic Loss" in real-time?
- What is the "Average Fraud Amount"? (Useful for setting "Auto-Block" thresholds).
- Detection Accuracy: Of all "Flagged" transactions, how many were actually blocked vs. how many were successful?

In [0]:
%sql
-- Fraud-to-transaction ratio per sender state, plus a label flagging states whose
-- transaction volume is too small (<= 500) for the ratio to be trusted.
WITH state_counts AS (
    SELECT
        sender_state,
        COUNT(*)                     AS n_txns,
        SUM(CAST(fraud_flag AS INT)) AS n_frauds
    FROM live_upi_table
    GROUP BY sender_state
)
SELECT
    sender_state,
    n_txns   AS total_txns,
    n_frauds AS total_frauds,
    ROUND((n_frauds / n_txns) * 100, 2) AS fraud_ratio_percentage,
    CASE
        WHEN n_txns > 500 THEN 'Significant Sample'
        ELSE 'Small Sample (Unreliable)'
    END AS data_reliability
FROM state_counts
ORDER BY fraud_ratio_percentage DESC;

sender_state,total_txns,total_frauds,fraud_ratio_percentage,data_reliability
Uttar Pradesh,7129,363,5.09,Significant Sample
Maharashtra,7066,358,5.07,Significant Sample
Delhi,6941,325,4.68,Significant Sample
Karnataka,6943,311,4.48,Significant Sample
Telangana,6956,310,4.46,Significant Sample


In [0]:
%sql
-- Which states are currently under the highest fraud threat? Each state's fraud share is the
-- number of fraud-flagged transactions divided by its total live volume, as a percentage, so
-- security alerts and bank monitoring can be prioritised for the high-risk zones.
WITH per_state AS (
  SELECT
    sender_state,
    COUNT(*)        AS txn_total,
    SUM(fraud_flag) AS fraud_total
  FROM live_upi_table
  GROUP BY sender_state
)
SELECT
  sender_state,
  txn_total   AS total_transaction,
  fraud_total AS total_fraud_cases,
  ROUND((fraud_total / txn_total) * 100, 2) AS fraud_percentage
FROM per_state
ORDER BY fraud_percentage DESC

sender_state,total_transaction,total_fraud_cases,fraud_percentage
Uttar Pradesh,7129,363,5.09
Maharashtra,7066,358,5.07
Delhi,6941,325,4.68
Karnataka,6943,311,4.48
Telangana,6956,310,4.46


In [0]:
# Spark SQL column functions for DataFrame-API work in later cells.
# NOTE(review): this star import shadows Python built-ins such as sum, round, min and max for
# the rest of the session; `from pyspark.sql import functions as F` would avoid that.
# TODO: this cell was left with an unfinished `live_df = ` assignment (a SyntaxError that
# stopped Run All); the intended live_df transformation still has to be written.
from pyspark.sql.functions import *

In [0]:
%sql
-- Fraud volume per hour of day, busiest fraud hour first, to find the windows in which
-- real-time monitoring should switch to a 'High Alert' mode.
-- NOTE(review): ranks raw fraud counts, not fraud rates; this is only comparable while
-- hourly transaction volumes stay roughly equal.
WITH hourly AS (
  SELECT
    hour_of_day,
    COUNT(*)        AS txn_count,
    SUM(fraud_flag) AS fraud_count
  FROM live_upi_table
  GROUP BY hour_of_day
)
SELECT
  hour_of_day,
  txn_count   AS total_transaction,
  fraud_count AS total_fraud_cases
FROM hourly
ORDER BY total_fraud_cases DESC

hour_of_day,total_transaction,total_fraud_cases
12,1655,102
0,1660,98
17,1660,86
5,1655,86
22,1655,84
8,1655,83
23,1655,81
20,1655,79
6,1655,78
21,1660,77


In [0]:
%sql
-- Per-state fraud percentage (same metric as the cells above), used as the source of the
-- Databricks chart attached to this cell.
WITH state_totals AS (
    SELECT
        sender_state,
        COUNT(*)        AS txn_cnt,
        SUM(fraud_flag) AS fraud_cnt
    FROM live_upi_table
    GROUP BY sender_state
)
SELECT
    sender_state,
    txn_cnt   AS total_transactions,
    fraud_cnt AS fraud_cases,
    ROUND((fraud_cnt / txn_cnt) * 100, 2) AS fraud_percentage
FROM state_totals
ORDER BY fraud_percentage DESC;

sender_state,total_transactions,fraud_cases,fraud_percentage
Uttar Pradesh,7129,363,5.09
Maharashtra,7066,358,5.07
Delhi,6941,325,4.68
Karnataka,6943,311,4.48
Telangana,6956,310,4.46


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Financial impact of fraud per state: number of fraud-flagged transactions, their total
-- amount and the average amount per transaction, largest total first.
-- NOTE(review): every fraud_flag = 1 row is counted regardless of transaction_status; if
-- failed/blocked fraud attempts exist, "money lost" overstates realised losses. Confirm.
SELECT
    sender_state,
    COUNT(*)                  AS fraud_txn_count,
    ROUND(SUM(amount_INR), 2) AS total_money_lost,
    ROUND(AVG(amount_INR), 2) AS avg_stolen_per_txn
FROM (
    SELECT sender_state, amount_INR
    FROM live_upi_table
    WHERE fraud_flag = 1
) AS fraud_txns
GROUP BY sender_state
ORDER BY total_money_lost DESC;

sender_state,fraud_txn_count,total_money_lost,avg_stolen_per_txn
Uttar Pradesh,363,9206058.67,25361.04
Maharashtra,358,8617668.72,24071.7
Delhi,325,8581957.27,26406.02
Telangana,310,7791490.47,25133.84
Karnataka,311,7782887.29,25025.36


Databricks visualization. Run in Databricks to view.

In [0]:
%sql

-- Label each state by its absolute number of fraud-flagged live transactions:
-- more than 100 -> CRITICAL, 50..100 -> WARNING, otherwise (including no data) -> SAFE.
WITH state_fraud AS (
    SELECT
        sender_state,
        SUM(fraud_flag) AS fraud_count
    FROM live_upi_table
    GROUP BY sender_state
)
SELECT
    sender_state,
    fraud_count,
    CASE
        WHEN fraud_count > 100 THEN 'CRITICAL ZONE'
        WHEN fraud_count BETWEEN 50 AND 100 THEN 'WARNING ZONE'
        ELSE 'SAFE ZONE'
    END AS state_status
FROM state_fraud
ORDER BY fraud_count DESC;

sender_state,fraud_count,state_status
Uttar Pradesh,363,CRITICAL ZONE
Maharashtra,358,CRITICAL ZONE
Delhi,325,CRITICAL ZONE
Karnataka,311,CRITICAL ZONE
Telangana,310,CRITICAL ZONE


Databricks visualization. Run in Databricks to view.

COMBINING BOTH TABLES (STATIC 2024 + LIVE) TO GET COMBINED INSIGHTS

In [0]:
# Expose the static 2024 DataFrame (df) to %sql cells under the view name below.
static_view_name = "static_upi_table"
df.createOrReplaceTempView(static_view_name)
print("✅ Static 2024 data is now ready for SQL analysis!")

✅ Static 2024 data is now ready for SQL analysis!


In [0]:
%sql
-- Baseline: per-state fraud rate (%) in the static 2024 data, saved as the temp view
-- "static_stats" so it can be joined against the live rates.
CREATE OR REPLACE TEMPORARY VIEW static_stats AS
SELECT
    sender_state,
    ROUND(100 * AVG(fraud_flag), 2) AS fraud_rate_2024
FROM static_upi_table
GROUP BY sender_state;

In [0]:
%sql
-- Live per-state fraud rate (%), saved as the temp view "live_stats" (the counterpart of
-- static_stats). The year-over-year comparison cells join live_stats with static_stats,
-- so this view must exist before they run.
CREATE OR REPLACE TEMP VIEW live_stats AS
SELECT 
    sender_state, 
    ROUND(AVG(fraud_flag) * 100, 2) AS fraud_rate_live
FROM live_upi_table
GROUP BY sender_state;

-- Show the live rates (the last statement's result is what the cell displays).
SELECT sender_state, fraud_rate_live
FROM live_stats
ORDER BY sender_state;

sender_state,fraud_rate_live
Delhi,4.68
Maharashtra,5.07
Telangana,4.46
Karnataka,4.48
Uttar Pradesh,5.09


In [0]:
%sql

-- States whose live fraud rate exceeds their 2024 baseline, ranked by the increase
-- (in percentage points, since both rates are percentages).
SELECT
    cur.sender_state,
    base.fraud_rate_2024,
    cur.fraud_rate_live,
    ROUND(cur.fraud_rate_live - base.fraud_rate_2024, 2) AS rate_increase
FROM live_stats AS cur
INNER JOIN static_stats AS base
    ON cur.sender_state = base.sender_state
WHERE cur.fraud_rate_live > base.fraud_rate_2024
ORDER BY rate_increase DESC;

sender_state,fraud_rate_2024,fraud_rate_live,rate_increase
Uttar Pradesh,0.17,5.09,4.92
Maharashtra,0.19,5.07,4.88
Delhi,0.2,4.68,4.48
Telangana,0.17,4.46,4.29
Karnataka,0.23,4.48,4.25


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

#### VISUALISATIONS

In [0]:
# Per-state comparison of the 2024 baseline against the live fraud rate. Unlike the %sql
# version above, this keeps every state (there is no rate_increase > 0 filter).
STATE_RATE_COMPARISON_SQL = """
    SELECT 
        l.sender_state,
        s.fraud_rate_2024,
        l.fraud_rate_live,
        ROUND(l.fraud_rate_live - s.fraud_rate_2024, 2) AS rate_increase
    FROM live_stats l
    JOIN static_stats s ON l.sender_state = s.sender_state
    ORDER BY rate_increase DESC
"""

# spark.sql returns a lazy DataFrame over the live_stats / static_stats temp views,
# usable as input for Python-side charts.
comparison_df = spark.sql(STATE_RATE_COMPARISON_SQL)

# Rendering the result also confirms that both views resolve.
display(comparison_df)

sender_state,fraud_rate_2024,fraud_rate_live,rate_increase
Uttar Pradesh,0.17,5.09,4.92
Maharashtra,0.19,5.07,4.88
Delhi,0.2,4.68,4.48
Telangana,0.17,4.46,4.29
Karnataka,0.23,4.48,4.25


> **Why SQL for this analysis?** Spark SQL runs on Databricks' optimized query engine, so grouping and filtering millions of live records stays fast, and `%sql` results can be visualized immediately with the built-in charts.