In [None]:
# %pip install gdown
# %pip install pyspark
# %pip install matplotlib
# %pip install numpy
# %pip install pandas
# %pip install seaborn
# %pip install scikit-learn
# %pip install scipy
# %pip install statsmodels

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Fraud Detection Analysis") \
    .getOrCreate()

In [15]:
import os
import gdown

file_name = "Final Transaction.csv"

if not os.path.exists(file_name):
    url = "https://drive.google.com/file/d/1qJtOG4ReSYyaHn0xbitvXqESQS94OkqP/view?usp=drive_link"
    gdown.download(url, file_name, quiet=True, fuzzy=True)

df = spark.read.csv(file_name, header=True, inferSchema=True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- TRANSACTION_ID: integer (nullable = true)
 |-- TX_DATETIME: timestamp (nullable = true)
 |-- CUSTOMER_ID: integer (nullable = true)
 |-- TERMINAL_ID: integer (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: integer (nullable = true)
 |-- TX_TIME_DAYS: integer (nullable = true)
 |-- TX_FRAUD: integer (nullable = true)
 |-- TX_FRAUD_SCENARIO: integer (nullable = true)



In [16]:
df.show(10)

+---+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|_c0|TRANSACTION_ID|        TX_DATETIME|CUSTOMER_ID|TERMINAL_ID|TX_AMOUNT|TX_TIME_SECONDS|TX_TIME_DAYS|TX_FRAUD|TX_FRAUD_SCENARIO|
+---+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|  0|             0|2023-01-01 00:00:31|        596|       3156|   533.07|             31|           0|       0|                0|
|  1|             1|2023-01-01 00:02:10|       4961|       3412|   808.56|            130|           0|       0|                0|
|  2|             2|2023-01-01 00:07:56|          2|       1365|  1442.94|            476|           0|       1|                1|
|  3|             3|2023-01-01 00:09:29|       4128|       8737|   620.65|            569|           0|       0|                0|
|  4|             4|2023-01-01 00:10:34|        927|       9906|   490.66|         

26/01/01 13:47:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , TRANSACTION_ID, TX_DATETIME, CUSTOMER_ID, TERMINAL_ID, TX_AMOUNT, TX_TIME_SECONDS, TX_TIME_DAYS, TX_FRAUD, TX_FRAUD_SCENARIO
 Schema: _c0, TRANSACTION_ID, TX_DATETIME, CUSTOMER_ID, TERMINAL_ID, TX_AMOUNT, TX_TIME_SECONDS, TX_TIME_DAYS, TX_FRAUD, TX_FRAUD_SCENARIO
Expected: _c0 but found: 
CSV file: file:///Users/bl-sw/code/Fraud-Detection-Kaggle/Final%20Transaction.csv


In [43]:
remove_cols = ['_c0', 'TRANSACTION_ID', 'CUSTOMER_ID', 'TERMINAL_ID', 'TX_FRAUD_SCENARIO']
df = df.drop(*remove_cols)
df.show(5)

+-------------------+---------+---------------+------------+--------+
|        TX_DATETIME|TX_AMOUNT|TX_TIME_SECONDS|TX_TIME_DAYS|TX_FRAUD|
+-------------------+---------+---------------+------------+--------+
|2023-01-01 00:00:31|   533.07|             31|           0|       0|
|2023-01-01 00:02:10|   808.56|            130|           0|       0|
|2023-01-01 00:07:56|  1442.94|            476|           0|       1|
|2023-01-01 00:09:29|   620.65|            569|           0|       0|
|2023-01-01 00:10:34|   490.66|            634|           0|       0|
+-------------------+---------+---------------+------------+--------+
only showing top 5 rows


# Exploratory Data Analysis

## 1. Dataset Overview


In [44]:
print(df.describe())
df.printSchema()

DataFrame[summary: string, TX_AMOUNT: string, TX_TIME_SECONDS: string, TX_TIME_DAYS: string, TX_FRAUD: string]
root
 |-- TX_DATETIME: timestamp (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: integer (nullable = true)
 |-- TX_TIME_DAYS: integer (nullable = true)
 |-- TX_FRAUD: integer (nullable = true)



In [45]:
numerical_cols = ['TX_AMOUNT']
categorical_cols = ['']
target_cols = ['TX_FRAUD']
timestamp_cols = ['TX_DATETIME']
relative_time_cols = [
    "TX_TIME_SECONDS",
    "TX_TIME_DAYS"
]

## 2. Missing Values Check


In [46]:
from pyspark.sql.functions import col, isnan, count, when
from pyspark.sql.types import DoubleType, FloatType

exprs = []

for field in df.schema.fields:
    c = field.name
    dtype = field.dataType

    # Numeric → check NULL + NaN
    if isinstance(dtype, (DoubleType, FloatType)):
        exprs.append(
            count(
                when(col(c).isNull() | isnan(col(c)), c)
            ).alias(c)
        )
    # Non-numeric → chỉ check NULL
    else:
        exprs.append(
            count(
                when(col(c).isNull(), c)
            ).alias(c)
        )

missing_df = df.select(exprs)
missing_df.show(truncate=False)


+-----------+---------+---------------+------------+--------+
|TX_DATETIME|TX_AMOUNT|TX_TIME_SECONDS|TX_TIME_DAYS|TX_FRAUD|
+-----------+---------+---------------+------------+--------+
|0          |0        |0              |0           |0       |
+-----------+---------+---------------+------------+--------+



## 3. Descriptive Statistics


In [48]:
df.select(
    *numerical_cols,
    *target_cols,
    *timestamp_cols,
    *relative_time_cols
).describe().show()


+-------+------------------+-------------------+-----------------+-----------------+
|summary|         TX_AMOUNT|           TX_FRAUD|  TX_TIME_SECONDS|     TX_TIME_DAYS|
+-------+------------------+-------------------+-----------------+-----------------+
|  count|           1754155|            1754155|          1754155|          1754155|
|   mean|  539.681997280744| 0.1345200395632085|7903233.708571933|90.97260276315377|
| stddev|1179.7105939984608|0.34121029423187604|4565172.383899659|52.83709109961424|
|    min|               0.0|                  0|               31|                0|
|    max|          647837.5|                  1|         15811197|              182|
+-------+------------------+-------------------+-----------------+-----------------+



## 4. Target Variable Distribution (TX_FRAUD)


In [52]:
total = df.count()

fraud_dist = (
    df.groupBy("TX_FRAUD")
      .count()
      .withColumn("ratio", col("count") / total * 100)
      .orderBy("TX_FRAUD")
)

fraud_dist.show(truncate=False)


+--------+-------+-----------------+
|TX_FRAUD|count  |ratio            |
+--------+-------+-----------------+
|0       |1518186|86.54799604367915|
|1       |235969 |13.45200395632085|
+--------+-------+-----------------+



## 5. Important Features Analysis

### 5.1. TX_AMOUNT Analysis (Transaction Amount)


In [None]:
from pyspark.sql.functions import col, mean, stddev, min as spark_min, max as spark_max, count

print("=" * 60)
print("TX_AMOUNT ANALYSIS BY TX_FRAUD")
print("=" * 60)

amount_stats = (
    df.groupBy("TX_FRAUD")
      .agg(
          mean("TX_AMOUNT").alias("mean_amount"),
          stddev("TX_AMOUNT").alias("stddev_amount"),
          spark_min("TX_AMOUNT").alias("min_amount"),
          spark_max("TX_AMOUNT").alias("max_amount"),
          count("*").alias("count")
      )
      .orderBy("TX_FRAUD")
)


amount_stats.show(truncate=False)

# Display detailed statistics
print("\nDetailed statistics:")
for row in amount_stats.collect():
    fraud_label = "Fraud" if row['TX_FRAUD'] == 1 else "Non-Fraud"
    print(f"\n{fraud_label}:")
    print(f"  Mean:    {row['mean_amount']:.2f}")
    print(f"  StdDev:  {row['stddev_amount']:.2f}")
    print(f"  Min:     {row['min_amount']:.2f}")
    print(f"  Max:     {row['max_amount']:.2f}")
    print(f"  Count:   {row['count']:,}")

import matplotlib.pyplot as plt
import numpy as np

plt.figure(figsize=(8,5))

for label, name in [(0, "Non-Fraud"), (1, "Fraud")]:
    subset = sample_df[sample_df["TX_FRAUD"] == label]["TX_AMOUNT"]
    plt.hist(
        np.log1p(subset),
        bins=50,
        alpha=0.6,
        label=name
    )

plt.xlabel("log(1 + TX_AMOUNT)")
plt.ylabel("Frequency")
plt.title("TX_AMOUNT Distribution (Log Scale)")
plt.legend()
plt.show()


TX_AMOUNT ANALYSIS BY TX_FRAUD
+--------+------------------+------------------+----------+----------+-------+
|TX_FRAUD|mean_amount       |stddev_amount     |min_amount|max_amount|count  |
+--------+------------------+------------------+----------+----------+-------+
|0       |393.90920306207437|275.34467418810556|0.0       |1000.0    |1518186|
|1       |1477.5603430111585|2973.50249720581  |0.2       |647837.5  |235969 |
+--------+------------------+------------------+----------+----------+-------+


Detailed statistics:

Non-Fraud:
  Mean:    393.91
  StdDev:  275.34
  Min:     0.00
  Max:     1000.00
  Count:   1,518,186

Fraud:
  Mean:    1477.56
  StdDev:  2973.50
  Min:     0.20
  Max:     647837.50
  Count:   235,969


### 5.2. Analysis of Unique Customers and Terminals


In [31]:
from pyspark.sql.functions import countDistinct, col

print("=" * 60)
print("ANALYSIS OF UNIQUE CUSTOMERS AND TERMINALS")
print("=" * 60)

# Recalculate total_rows to ensure we have the value
total_rows = df.count()

# Number of unique customers and terminals
unique_customers = df.select(countDistinct("CUSTOMER_ID")).collect()[0][0]
unique_terminals = df.select(countDistinct("TERMINAL_ID")).collect()[0][0]

print(f"\nNumber of unique customers: {unique_customers:,}")
print(f"Number of unique terminals: {unique_terminals:,}")

# Average number of transactions per customer
avg_tx_per_customer = total_rows / unique_customers if unique_customers > 0 else 0
print(f"Average number of transactions per customer: {avg_tx_per_customer:.2f}")

# Average number of transactions per terminal
avg_tx_per_terminal = total_rows / unique_terminals if unique_terminals > 0 else 0
print(f"Average number of transactions per terminal: {avg_tx_per_terminal:.2f}")

# Top 10 customers with most transactions
print("\nTop 10 customers with most transactions:")
top_customers = df.groupBy("CUSTOMER_ID").count().orderBy(col("count").desc()).limit(10)
top_customers.show()

# Top 10 terminals with most transactions
print("\nTop 10 terminals with most transactions:")
top_terminals = df.groupBy("TERMINAL_ID").count().orderBy(col("count").desc()).limit(10)
top_terminals.show()


ANALYSIS OF UNIQUE CUSTOMERS AND TERMINALS

Number of unique customers: 4,990
Number of unique terminals: 10,000
Average number of transactions per customer: 351.53
Average number of transactions per terminal: 175.42

Top 10 customers with most transactions:
+-----------+-----+
|CUSTOMER_ID|count|
+-----------+-----+
|        382|  767|
|       3864|  762|
|       2891|  761|
|        775|  754|
|       3651|  752|
|       1411|  752|
|        149|  749|
|       2932|  747|
|        732|  746|
|        379|  743|
+-----------+-----+


Top 10 terminals with most transactions:
+-----------+-----+
|TERMINAL_ID|count|
+-----------+-----+
|       4018|  376|
|        692|  372|
|       5295|  368|
|       8130|  360|
|        872|  356|
|       8670|  354|
|       7884|  346|
|       8606|  338|
|       2475|  334|
|       7798|  333|
+-----------+-----+



### 5.3. Time-based Analysis


In [32]:
from pyspark.sql.functions import to_date, hour, dayofweek, count as spark_count, col, when, min as spark_min, max as spark_max

print("=" * 60)
print("TIME-BASED ANALYSIS")
print("=" * 60)

# Convert TX_DATETIME to date and extract time features
df_time = df.withColumn("TX_DATE", to_date("TX_DATETIME")) \
            .withColumn("TX_HOUR", hour("TX_DATETIME")) \
            .withColumn("TX_DAYOFWEEK", dayofweek("TX_DATETIME"))

# Time range of the dataset
date_range = df_time.select(
    spark_min("TX_DATE").alias("min_date"),
    spark_max("TX_DATE").alias("max_date")
).collect()[0]

print(f"\nTime range:")
print(f"  Start date: {date_range['min_date']}")
print(f"  End date: {date_range['max_date']}")

# Transaction distribution by day
print("\nTop 10 days with most transactions:")
daily_tx = df_time.groupBy("TX_DATE").agg(
    spark_count("*").alias("total_tx"),
    spark_count(when(col("TX_FRAUD") == 1, 1)).alias("fraud_tx")
).orderBy(col("total_tx").desc()).limit(10)
daily_tx.show(truncate=False)

# Transaction distribution by hour
print("\nTransaction distribution by hour of day:")
hourly_tx = df_time.groupBy("TX_HOUR").agg(
    spark_count("*").alias("total_tx"),
    spark_count(when(col("TX_FRAUD") == 1, 1)).alias("fraud_tx")
).orderBy("TX_HOUR")
hourly_tx.show(24, truncate=False)

# Transaction distribution by day of week
print("\nTransaction distribution by day of week:")
# 1 = Sunday, 2 = Monday, ..., 7 = Saturday
day_names = {1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday", 
             5: "Thursday", 6: "Friday", 7: "Saturday"}
dow_tx = df_time.groupBy("TX_DAYOFWEEK").agg(
    spark_count("*").alias("total_tx"),
    spark_count(when(col("TX_FRAUD") == 1, 1)).alias("fraud_tx")
).orderBy("TX_DAYOFWEEK")

print("Day        | Total Transactions | Fraud Transactions")
print("-" * 50)
for row in dow_tx.collect():
    day_name = day_names.get(row['TX_DAYOFWEEK'], f"Day {row['TX_DAYOFWEEK']}")
    print(f"{day_name:10s} | {row['total_tx']:>15,} | {row['fraud_tx']:>15,}")


TIME-BASED ANALYSIS

Time range:
  Start date: 2023-01-01
  End date: 2023-07-02

Top 10 days with most transactions:
+----------+--------+--------+
|TX_DATE   |total_tx|fraud_tx|
+----------+--------+--------+
|2023-04-01|9789    |1295    |
|2023-04-27|9787    |1344    |
|2023-04-06|9784    |1313    |
|2023-02-02|9771    |1414    |
|2023-02-15|9767    |1319    |
|2023-06-11|9762    |1367    |
|2023-01-30|9760    |1244    |
|2023-01-13|9753    |1349    |
|2023-02-09|9749    |1312    |
|2023-01-03|9747    |1309    |
+----------+--------+--------+


Transaction distribution by hour of day:
+-------+--------+--------+
|TX_HOUR|total_tx|fraud_tx|
+-------+--------+--------+
|0      |15137   |2031    |
|1      |21861   |2968    |
|2      |30140   |4051    |
|3      |40380   |5306    |
|4      |52403   |6887    |
|5      |65168   |8811    |
|6      |79880   |10925   |
|7      |94030   |12795   |
|8      |106617  |14411   |
|9      |116864  |15573   |
|10     |125804  |16793   |
|11     |1289

### 5.4. TX_FRAUD_SCENARIO Analysis


In [33]:
from pyspark.sql.functions import count as spark_count, mean, stddev, min as spark_min, max as spark_max

print("=" * 60)
print("TX_FRAUD_SCENARIO ANALYSIS")
print("=" * 60)

# Recalculate total_rows to ensure we have the value
total_rows = df.count()

# Distribution of fraud scenario types
scenario_dist = df.groupBy("TX_FRAUD_SCENARIO").agg(
    spark_count("*").alias("count")
).orderBy("TX_FRAUD_SCENARIO")

scenario_dist.show()

# Scenario details
print("\nFraud scenario details:")
for row in scenario_dist.collect():
    scenario = row['TX_FRAUD_SCENARIO']
    count = row['count']
    percentage = (count / total_rows) * 100
    scenario_name = "Non-Fraud" if scenario == 0 else f"Fraud Scenario {scenario}"
    print(f"  {scenario_name:20s}: {count:>12,} ({percentage:>6.2f}%)")

# Scenario analysis by TX_AMOUNT
print("\nTX_AMOUNT statistics by scenario:")
scenario_amount_stats = df.groupBy("TX_FRAUD_SCENARIO").agg(
    mean("TX_AMOUNT").alias("mean_amount"),
    stddev("TX_AMOUNT").alias("stddev_amount"),
    spark_min("TX_AMOUNT").alias("min_amount"),
    spark_max("TX_AMOUNT").alias("max_amount")
).orderBy("TX_FRAUD_SCENARIO")

scenario_amount_stats.show(truncate=False)


TX_FRAUD_SCENARIO ANALYSIS
+-----------------+-------+
|TX_FRAUD_SCENARIO|  count|
+-----------------+-------+
|                0|1518186|
|                1| 222261|
|                2|   9077|
|                3|   4631|
+-----------------+-------+


Fraud scenario details:
  Non-Fraud           :    1,518,186 ( 86.55%)
  Fraud Scenario 1    :      222,261 ( 12.67%)
  Fraud Scenario 2    :        9,077 (  0.52%)
  Fraud Scenario 3    :        4,631 (  0.26%)

TX_AMOUNT statistics by scenario:
+-----------------+------------------+------------------+----------+----------+
|TX_FRAUD_SCENARIO|mean_amount       |stddev_amount     |min_amount|max_amount|
+-----------------+------------------+------------------+----------+----------+
|0                |393.90920306207437|275.34467418810556|0.0       |1000.0    |
|1                |1281.7617225694128|242.2168450569149 |1000.01   |3094.88   |
|2                |515.0349080092544 |403.09473553799927|0.2       |2346.56   |
|3                |1

## 6. Summary and Insights


In [34]:
from pyspark.sql.functions import col, mean, countDistinct

print("=" * 60)
print("SUMMARY AND INSIGHTS")
print("=" * 60)

# Recalculate values to ensure we have data
total_rows = df.count()
total_cols = len(df.columns)
unique_customers = df.select(countDistinct("CUSTOMER_ID")).collect()[0][0]
unique_terminals = df.select(countDistinct("TERMINAL_ID")).collect()[0][0]

print("\n1. DATASET STRUCTURE:")
print(f"   - Total number of transactions: {total_rows:,}")
print(f"   - Number of features: {total_cols}")
print(f"   - Number of unique customers: {unique_customers:,}")
print(f"   - Number of unique terminals: {unique_terminals:,}")

print("\n2. TARGET DISTRIBUTION:")
fraud_info = df.filter(col("TX_FRAUD") == 1).count()
non_fraud_info = df.filter(col("TX_FRAUD") == 0).count()
fraud_pct = (fraud_info / total_rows) * 100
print(f"   - Fraud transactions: {fraud_info:,} ({fraud_pct:.2f}%)")
print(f"   - Non-Fraud transactions: {non_fraud_info:,} ({100-fraud_pct:.2f}%)")
if fraud_info > 0:
    print(f"   - Class imbalance ratio: {(non_fraud_info/fraud_info):.2f}:1")

print("\n3. KEY CHARACTERISTICS:")
# Calculate some important metrics
avg_amount = df.select(mean("TX_AMOUNT")).collect()[0][0]
avg_amount_fraud = df.filter(col("TX_FRAUD") == 1).select(mean("TX_AMOUNT")).collect()[0][0]
avg_amount_non_fraud = df.filter(col("TX_FRAUD") == 0).select(mean("TX_AMOUNT")).collect()[0][0]

print(f"   - Average transaction amount: {avg_amount:.2f}")
if avg_amount_fraud:
    print(f"   - Average amount (Fraud): {avg_amount_fraud:.2f}")
if avg_amount_non_fraud:
    print(f"   - Average amount (Non-Fraud): {avg_amount_non_fraud:.2f}")

print("\n4. RECOMMENDATIONS:")
print("   - Dataset has severe class imbalance, needs to be handled when training model")
print("   - Need deeper analysis on relationship between TX_AMOUNT and TX_FRAUD")
print("   - Should create additional features from TX_DATETIME (hour, day of week, etc.)")
print("   - Can create aggregate features for each CUSTOMER_ID and TERMINAL_ID")


SUMMARY AND INSIGHTS

1. DATASET STRUCTURE:
   - Total number of transactions: 1,754,155
   - Number of features: 8
   - Number of unique customers: 4,990
   - Number of unique terminals: 10,000

2. TARGET DISTRIBUTION:
   - Fraud transactions: 235,969 (13.45%)
   - Non-Fraud transactions: 1,518,186 (86.55%)
   - Class imbalance ratio: 6.43:1

3. KEY CHARACTERISTICS:
   - Average transaction amount: 539.68
   - Average amount (Fraud): 1477.56
   - Average amount (Non-Fraud): 393.91

4. RECOMMENDATIONS:
   - Dataset has severe class imbalance, needs to be handled when training model
   - Need deeper analysis on relationship between TX_AMOUNT and TX_FRAUD
   - Should create additional features from TX_DATETIME (hour, day of week, etc.)
   - Can create aggregate features for each CUSTOMER_ID and TERMINAL_ID
