Download the Kaggle dataset and store it in this repo

In [None]:
# Install in a Jupyter cell (inside the Python 3.10 kernel).
# Requirement specifiers containing `>` or `[` are quoted so the shell does not treat them as a redirect or a glob:
# %pip install pyspark pandas "kagglehub[pandas-datasets]" matplotlib PyArrow plotly "nbformat>=4.2.0" pgeocode geopy geopandas

# If the IDE still underlines imports as unresolved, these are most likely lint errors only.
# Simple fix: press Ctrl+Shift+P, run "Python: Select Interpreter", and select the matching Python version.
# Don't forget to install "nbformat>=4.2.0" to avoid a flood of needless warnings when calling plotly.

In [2]:
# Requires kagglehub; if it is not installed yet, run: pip install "kagglehub[pandas-datasets]"

import os
import shutil

import kagglehub

# Download the latest version of the dataset and report where it was stored.
path = kagglehub.dataset_download("jinquan/cc-sample-data")
print("Path to dataset files:", path)

# Repo-relative folder that receives a copy of the downloaded files.
target_path = "./datasets/cc-sample-data"
os.makedirs(target_path, exist_ok=True)

# Copy only the regular files found directly in the download folder;
# sub-directories are skipped.
downloaded_entries = (os.path.join(path, entry) for entry in os.listdir(path))
for downloaded_file in filter(os.path.isfile, downloaded_entries):
    shutil.copy(downloaded_file, target_path)

print("Files moved to:", target_path)

  from .autonotebook import tqdm as notebook_tqdm


Path to dataset files: C:\Users\uadrian\.cache\kagglehub\datasets\jinquan\cc-sample-data\versions\1
Files moved to: ./datasets/cc-sample-data


Setting Up Spark

In [3]:
import sys
# Show the path of the Python interpreter this kernel is running on.
print(sys.executable)

c:\Users\uadrian\AppData\Local\Programs\Python\Python310\python.exe


In [4]:
import os
import sys

# Keep a HADOOP_HOME that is already configured on this machine and fall back
# to the notebook's default location otherwise.
os.environ.setdefault("HADOOP_HOME", "C:/hadoop")

# Point both PySpark interpreter settings at the interpreter running this
# kernel instead of a user-specific absolute path, so the notebook also runs
# on other machines.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [5]:
import pyspark
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession

# Build (or reuse) a session named "CreditCardFraud" with master "local[*]";
# the ANSI SQL setting is switched off explicitly.
session_builder = SparkSession.builder.appName("CreditCardFraud").master("local[*]")
session_builder = session_builder.config("spark.sql.ansi.enabled", "false")
spark = session_builder.getOrCreate()

# Fun fact: after running this cell, try opening http://localhost:4040/ in your browser.

Dynamically get the downloaded file path and store in variable

In [6]:
# Collect the JSON files in the dataset folder. os.listdir() returns names in
# arbitrary order, so they are sorted to make the pick below deterministic
# when more than one JSON file is present.
json_files = sorted(f for f in os.listdir(target_path) if f.endswith(".json"))
if not json_files:
    raise FileNotFoundError("No JSON file found in downloaded dataset")

json_path = os.path.join(target_path, json_files[0])  # first JSON file in sorted order
print("Reading JSON from:", json_path)

Reading JSON from: ./datasets/cc-sample-data\cc_sample_transaction.json


Read json file using Spark

In [7]:
from pyspark.sql.functions import col

# Load the JSON file; no schema is passed, so Spark derives one from the data.
df_raw = spark.read.json(json_path)  # json_path is resolved dynamically in the previous cell
# Inspect the resulting schema and the first five rows.
df_raw.printSchema()
df_raw.show(5)


root
 |-- Unnamed: 0: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cc_bic: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_eff_time: string (nullable = true)
 |-- merch_last_update_time: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- personal_detail: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- trans_num: string (nullable = true)

+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|  merch_eff_time|merch_last_update_time

JSON Flattening

In [8]:
from pyspark.sql.functions import col

# Target type per column; None means the column is selected unchanged.
# The dict order is the column order of the resulting DataFrame.
column_casts = {
    "Unnamed: 0": "int",
    "amt": "double",
    "category": None,
    "cc_bic": None,
    "cc_num": "long",
    "is_fraud": "int",
    "merch_eff_time": "bigint",
    "merch_last_update_time": "bigint",
    "merch_lat": "double",
    "merch_long": "double",
    "merch_zipcode": None,
    "merchant": None,
    "personal_detail": None,
    "trans_date_trans_time": None,
    "trans_num": None,
}

df = df_raw.select(
    [
        col(column_name) if target_type is None else col(column_name).cast(target_type)
        for column_name, target_type in column_casts.items()
    ]
)


In [9]:
# df_raw.select("personal_detail").show(truncate=False)
# After viewing a sample of the data, the schema is redefined so that both JSON
# levels are flattened: personal_detail and the address nested inside it.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType

# Fields promoted to top-level columns, in output order.
personal_fields = ["person_name", "dob", "gender", "job", "lat", "long", "city_pop"]
address_fields = ["street", "city", "state", "zip"]

# Schema of the nested address object (every value is read as a string).
address_schema = StructType([StructField(name, StringType()) for name in address_fields])

# Schema of personal_detail. "address" stays a plain string at this level and
# is parsed in a second step.
personal_schema = StructType(
    [
        StructField(name, StringType())
        for name in ["person_name", "dob", "gender", "job", "address", "lat", "long", "city_pop"]
    ]
)

# Parse personal_detail first, then the stringified address inside it.
df = (
    df.withColumn("personal_json", from_json(col("personal_detail"), personal_schema))
      .withColumn("address_json", from_json(col("personal_json.address"), address_schema))
)

# Copy every parsed field into its own column: personal fields first, then address fields.
for struct_name, field_names in (("personal_json", personal_fields), ("address_json", address_fields)):
    for field_name in field_names:
        df = df.withColumn(field_name, col(struct_name)[field_name])

# The raw JSON string and the two helper structs are no longer needed.
df = df.drop("personal_detail", "personal_json", "address_json")


The DataFrame is now flattened successfully; first and last names are not handled yet

In [10]:
# Preview the flattened DataFrame (show() prints the first 20 rows by default).
df.show()

+----------+------+-------------+-----------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+--------------------+---------------------+--------------------+--------------------+----------+------+--------------------+-------+------------------+--------+--------------------+--------------------+-----+-----+
|Unnamed: 0|   amt|     category|     cc_bic|             cc_num|is_fraud|  merch_eff_time|merch_last_update_time|         merch_lat|        merch_long|merch_zipcode|            merchant|trans_date_trans_time|           trans_num|         person_name|       dob|gender|                 job|    lat|              long|city_pop|              street|                city|state|  zip|
+----------+------+-------------+-----------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+--------------------+---------------------+--------------------+--------

Let's handle the First/Last Name

In [11]:
df.select("person_name").show(truncate=False) # To view more rows use show(100, truncate=False); to show everything (CAUTION: large output) replace 100 with df.count()

# Or, if you prefer Spark SQL:
# df.createOrReplaceTempView("transactions")
# spark.sql("SELECT person_name FROM transactions").show(truncate=False)

+-------------------------+
|person_name              |
+-------------------------+
|Jennifer,Banks,eeeee     |
|Stephanie,Gill,eeeee     |
|Edward@Sanchez           |
|Jeremy/White, !          |
|Tyler@Garcia             |
|Jennifer,Conner,eeeee    |
|Kelsey, , Richards NOOOO |
|Steven, Williams         |
|Heather, , Chase NOOOO   |
|Melissa@Aguilar          |
|Eddie|Mendez!!!          |
|Theresa@Blackwell        |
|Charles|Robles!!!        |
|Jack@Hill                |
|Christopher@Castaneda    |
|Ronald@Carson            |
|Lisa, Mendez             |
|Nathan,Thomas,eeeee      |
|Justin, Gay              |
|Kenneth, , Robinson NOOOO|
+-------------------------+
only showing top 20 rows


Convert to a pandas DataFrame, because the built-in Data Wrangler feature makes it easy to browse all records

In [12]:
# Convert only the person_name column to a pandas DataFrame so every record can
# be browsed with the built-in Data Wrangler feature.
# Note: toPandas() collects the selected column into the driver's memory.
df_pd = df.select("person_name").toPandas()
df_pd

Unnamed: 0,person_name
0,"Jennifer,Banks,eeeee"
1,"Stephanie,Gill,eeeee"
2,Edward@Sanchez
3,"Jeremy/White, !"
4,Tyler@Garcia
...,...
1296670,Erik@Patterson
1296671,"Jeffrey, , White NOOOO"
1296672,"Christopher/Castaneda, !"
1296673,"Joseph,Murray,eeeee"


Based on the sample data, first and last names are joined with several different delimiters, and some values carry noise at the end

So, let's handle it by using Regular Expressions to tackle them

In [13]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, StringType
import re

# Define a function to clean and split names
def clean_split_name(name):
    """Split a noisy ``person_name`` value into ``(first_name, last_name)``.

    The value may join the two names with ``,``, ``|``, ``/`` or ``@`` and may
    carry noise such as ``eeeee``, ``NOOOO`` or ``!!!``.

    Args:
        name: Raw name string; may be ``None`` or empty.

    Returns:
        A ``(first_name, last_name)`` tuple of strings. A missing part is
        returned as ``""``; when more than two tokens remain, only the first
        two are used.
    """
    if not name:
        return ("", "")

    # Drop whole-word noise tokens such as "NOOOO" or "eeeee" (any letter case).
    name = re.sub(r"\b(?:NO+|e{3,}|!+)\b", "", name, flags=re.IGNORECASE)
    # Drop stray punctuation such as "!", but keep hyphens and apostrophes so
    # names like "Mary-Ann" or "O'Neil" are not mangled.
    name = re.sub(r"[^\w\s,|/@'\-]", "", name)
    name = re.sub(r"\s+", " ", name).strip()  # Normalize spaces

    # Split on any run of delimiters, trim spaces and dangling hyphens or
    # apostrophes left at the token edges, and discard empty pieces.
    pieces = (piece.strip(" '-") for piece in re.split(r"[,|/@]+", name))
    tokens = [piece for piece in pieces if piece]

    if len(tokens) >= 2:
        return (tokens[0], tokens[1])
    if len(tokens) == 1:
        return (tokens[0], "")
    return ("", "")

# Struct returned by the UDF: two nullable string fields.
name_schema = (
    StructType()
    .add(StructField("first_name", StringType(), True))
    .add(StructField("last_name", StringType(), True))
)

# Wrap the Python function as a Spark UDF that returns the struct above.
split_name_udf = udf(clean_split_name, returnType=name_schema)

# Parse each name into a helper struct column, copy its two fields into their
# own columns, then drop the helper.
with_name_struct = df.withColumn("name_struct", split_name_udf(col("person_name")))
for name_part in ("first_name", "last_name"):
    with_name_struct = with_name_struct.withColumn(name_part, col("name_struct")[name_part])
df_cleaned = with_name_struct.drop("name_struct")

# View result
df_cleaned.select("person_name", "first_name", "last_name").show(truncate=False)


+-------------------------+-----------+---------+
|person_name              |first_name |last_name|
+-------------------------+-----------+---------+
|Jennifer,Banks,eeeee     |Jennifer   |Banks    |
|Stephanie,Gill,eeeee     |Stephanie  |Gill     |
|Edward@Sanchez           |Edward     |Sanchez  |
|Jeremy/White, !          |Jeremy     |White    |
|Tyler@Garcia             |Tyler      |Garcia   |
|Jennifer,Conner,eeeee    |Jennifer   |Conner   |
|Kelsey, , Richards NOOOO |Kelsey     |Richards |
|Steven, Williams         |Steven     |Williams |
|Heather, , Chase NOOOO   |Heather    |Chase    |
|Melissa@Aguilar          |Melissa    |Aguilar  |
|Eddie|Mendez!!!          |Eddie      |Mendez   |
|Theresa@Blackwell        |Theresa    |Blackwell|
|Charles|Robles!!!        |Charles    |Robles   |
|Jack@Hill                |Jack       |Hill     |
|Christopher@Castaneda    |Christopher|Castaneda|
|Ronald@Carson            |Ronald     |Carson   |
|Lisa, Mendez             |Lisa       |Mendez   |


Let's do a sanity check: sample 0.1% of the data to see whether any issues can be spotted

In [14]:
# A fixed seed makes the sanity-check sample the same on every run, so the
# rows discussed next to this cell can be reproduced.
df_cleaned.select("person_name", "first_name", "last_name").sample(fraction=0.001, seed=42).show(50, truncate=False)

+--------------------------+----------+---------+
|person_name               |first_name|last_name|
+--------------------------+----------+---------+
|Vincent/Waller, !         |Vincent   |Waller   |
|Terrance|Mckinney!!!      |Terrance  |Mckinney |
|Phillip, Delacruz         |Phillip   |Delacruz |
|James@Baldwin             |James     |Baldwin  |
|Frank@Key                 |Frank     |Key      |
|April@Johnson             |April     |Johnson  |
|Lawrence, Davis           |Lawrence  |Davis    |
|Jennifer@Black            |Jennifer  |Black    |
|Nathan, , Mendoza NOOOO   |Nathan    |Mendoza  |
|Sharon,Johnson,eeeee      |Sharon    |Johnson  |
|Steven, Brooks            |Steven    |Brooks   |
|Joseph/Morgan, !          |Joseph    |Morgan   |
|Jackie,Davis,eeeee        |Jackie    |Davis    |
|Amanda,Smith,eeeee        |Amanda    |Smith    |
|Gregory, , Graham NOOOO   |Gregory   |Graham   |
|Benjamin, Kim             |Benjamin  |Kim      |
|Veronica@Burton           |Veronica  |Burton   |


Just in case, for fun, let's view whether there are some rows with missing first/last names

In [15]:
# Check whether every record has a proper first and last name, i.e. count the
# rows where either one is NULL or an empty string.
from pyspark.sql.functions import col, count, when


def _blank_count(column_name):
    """Return an aggregate counting rows where `column_name` is NULL or ""."""
    is_blank = col(column_name).isNull() | (col(column_name) == "")
    # when() without otherwise() yields NULL for non-matching rows, and
    # count() skips NULLs, so only the blank rows are counted.
    return count(when(is_blank, column_name)).alias(f"null_or_empty_{column_name}")


df_cleaned.select([_blank_count(name_col) for name_col in ("first_name", "last_name")]).show()


+------------------------+-----------------------+
|null_or_empty_first_name|null_or_empty_last_name|
+------------------------+-----------------------+
|                       0|                      0|
+------------------------+-----------------------+



All seems good, Assessment check:

✅JSON Flattening
✅Name Derivation

In [16]:
# Preview the cleaned DataFrame, now including first_name and last_name.
df_cleaned.show()

+----------+------+-------------+-----------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+--------------------+---------------------+--------------------+--------------------+----------+------+--------------------+-------+------------------+--------+--------------------+--------------------+-----+-----+-----------+---------+
|Unnamed: 0|   amt|     category|     cc_bic|             cc_num|is_fraud|  merch_eff_time|merch_last_update_time|         merch_lat|        merch_long|merch_zipcode|            merchant|trans_date_trans_time|           trans_num|         person_name|       dob|gender|                 job|    lat|              long|city_pop|              street|                city|state|  zip| first_name|last_name|
+----------+------+-------------+-----------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+--------------------+-------

Now let's handle for all date and timestamps columns

In [None]:
# when i run this script and set the TZ to UTC+0 (Africa/Abidjan), i noticed gaps in between the 3 date time columns
# logical transaction processing flow
# 1. merch_eff_time           → When merchant/service became effective 
# 2. trans_date_trans_time    → When the actual transaction happened (however, the original format doesn't provide in epoch, hence no milliseconds can be recorded)
# 3. merch_last_update_time   → When merchant details were last updated

# THE GAPS #1 - Year Gap too far apart; by plus 7 years minus 8 hours
# trans_date_trans_time_ts_formatted    2019-01-01 00:00:18.000000 +08:00
# merch_eff_time_ts_formatted           2012-01-01 08:00:18.798532 +08:00 (original value - 1325376018798532)
# merch_last_update_time_ts_formatted   2012-01-01 08:00:18.666000 +08:00 (original value - 1325376018666)

# THE GAPS #2 - last update time became 1974, TZ = 7:30
# trans_date_trans_time_ts_formatted    2019-01-01 00:00:44.000000 +08:00
# merch_eff_time_ts_formatted           2012-01-01 08:00:44.867960 +08:00 (original value - 1325376044867960)
# merch_last_update_time_ts_formatted   1974-03-15 07:30:04.479000 +07:30 (original value - 132537604479)

# THE GAPS #2.1 - epoch value is short by 1 digit, this is also noticed in col merch_eff_time
# merch_last_update_time_ts_formatted   2012-01-01 08:00:18.666000 +08:00 (original value - 1325376018666)
# merch_last_update_time_ts_formatted   1974-03-15 07:30:04.479000 +07:30 (original value - 132537604479)

# Assumptions for Gap #1 - may be due to merchant's machine/terminal is not calibrated to the actual year/time
# Assumptions for Gap #2 - shortness of single digit may be due to missing zeroes 


from pyspark.sql.functions import col, from_unixtime, to_utc_timestamp, date_format, when,length,expr

from pyspark.sql.types import TimestampType

# Constants
# IANA time zone in which the source timestamps are interpreted by
# to_utc_timestamp below. America/New_York is US Eastern time (UTC-5, or UTC-4
# during daylight saving time). Any value from the "TZ identifier" column of
# https://en.wikipedia.org/wiki/List_of_tz_database_time_zones can be used.
TARGET_TZ = "America/New_York"

# Manual correction added to both merchant timestamps so that they line up with
# trans_date_trans_time (see the gap analysis at the top of this cell).
# NOTE(review): the 8-hour part presumably compensates for the Spark session
# time zone of the machine this was developed on (the recorded output is
# rendered with +08:00) -- confirm before running in another time zone.
MERCHANT_TS_SHIFT = "INTERVAL 7 YEARS - INTERVAL 8 HOURS"


def epoch_to_seconds(column_name):
    """Return a Column holding the epoch value of `column_name` in seconds.

    The unit is guessed from the number of digits of the value:
      13 digits -> milliseconds
      12 digits -> milliseconds that are one digit short (scaled by 10 first)
      16 digits -> microseconds
      15 digits -> microseconds that are one digit short (scaled by 10 first)
    Any other length yields NULL.
    """
    raw = col(column_name)
    digits = length(raw)
    return (
        when(digits == 13, raw / 1000)  # ms
        .when(digits == 12, raw * 10 / 1000)  # likely truncated ms
        .when(digits == 16, raw / 1_000_000)  # µs
        .when(digits == 15, raw * 10 / 1_000_000)  # truncated µs
        .otherwise(None)
    )


# 1. Parse `trans_date_trans_time`: the string is treated as wall-clock time in
#    TARGET_TZ and converted to UTC.
df_cleaned = df_cleaned.withColumn(
    "trans_date_trans_time_ts",
    to_utc_timestamp(col("trans_date_trans_time"), TARGET_TZ)
)

# 2. Normalise both merchant epoch columns the same way: epoch -> seconds ->
#    timestamp (same TARGET_TZ conversion) -> manual alignment shift.
for epoch_col in ["merch_eff_time", "merch_last_update_time"]:
    seconds_col = f"{epoch_col}_fixed"
    shifted_ts_col = f"{epoch_col}_fixed_ts"
    df_cleaned = (
        df_cleaned
        .withColumn(seconds_col, epoch_to_seconds(epoch_col))
        .withColumn(shifted_ts_col, to_utc_timestamp(col(seconds_col).cast("timestamp"), TARGET_TZ))
        .withColumn(shifted_ts_col, expr(f"{shifted_ts_col} + {MERCHANT_TS_SHIFT}"))
    )

# 3. Format timestamps (optional, but gives a clear human-readable string).
# NOTE(review): the offset printed by "XXX" does not follow TARGET_TZ (the
# recorded output shows +08:00); it presumably reflects the Spark session time
# zone -- confirm.
for ts_col in ["trans_date_trans_time_ts", "merch_eff_time_fixed_ts", "merch_last_update_time_fixed_ts"]:
    df_cleaned = df_cleaned.withColumn(
        f"{ts_col}_formatted",
        date_format(col(ts_col), "yyyy-MM-dd HH:mm:ss.SSSSSS XXX")
    )

# Preview
df_cleaned.select(
    "trans_date_trans_time", "trans_date_trans_time_ts_formatted",
    "merch_eff_time", "merch_eff_time_fixed_ts_formatted",
    "merch_last_update_time", "merch_last_update_time_fixed_ts_formatted"
).show(20, truncate=False)


+---------------------+----------------------------------+----------------+---------------------------------+----------------------+-----------------------------------------+
|trans_date_trans_time|trans_date_trans_time_ts_formatted|merch_eff_time  |merch_eff_time_fixed_ts_formatted|merch_last_update_time|merch_last_update_time_fixed_ts_formatted|
+---------------------+----------------------------------+----------------+---------------------------------+----------------------+-----------------------------------------+
|2019-01-01 00:00:18  |2019-01-01 05:00:18.000000 +08:00 |1325376018798532|2019-01-01 05:00:18.798532 +08:00|1325376018666         |2019-01-01 05:00:18.666000 +08:00        |
|2019-01-01 00:00:44  |2019-01-01 05:00:44.000000 +08:00 |1325376044867960|2019-01-01 05:00:44.867960 +08:00|132537604479          |2019-01-01 05:00:44.790000 +08:00        |
|2019-01-01 00:00:51  |2019-01-01 05:00:51.000000 +08:00 |1325376051506840|2019-01-01 05:00:51.506840 +08:00|1325376051286   

Assessment check:

✅JSON Flattening
✅Name Derivation
✅Timestamp Conversion

In [41]:
# Intermediate timestamp columns; only their *_formatted counterparts are kept.
helper_columns = [
    "trans_date_trans_time_ts",
    "merch_eff_time_fixed",
    "merch_eff_time_fixed_ts",
    "merch_last_update_time_fixed",
    "merch_last_update_time_fixed_ts",
]
df_cleaned = df_cleaned.drop(*helper_columns)
df_cleaned.show(truncate=False)

+----------+------+-------------+-----------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+----------------------------------------+---------------------+--------------------------------+-------------------------+----------+------+---------------------------------------------+-------+------------------+--------+------------------------------+------------------------+-----+-----+-----------+---------+----------------------------------+---------------------------------+-----------------------------------------+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num             |is_fraud|merch_eff_time  |merch_last_update_time|merch_lat         |merch_long        |merch_zipcode|merchant                                |trans_date_trans_time|trans_num                       |person_name              |dob       |gender|job                                          |lat    |long              |city_pop|street                

Passing to pandas dataframe [COMMENTED DUE TO VERY SLOW PROCESS, PLEASE PROCEED TO NEXT CELL]

In [19]:
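# # Kept for reference only (commented out) because it is VERY SLOW.
# # NOTE: monotonically_increasing_id() yields unique but non-consecutive ids, so the
# # index-range chunking below can miss rows once the data spans several partitions.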
# from pyspark.sql.functions import monotonically_increasing_id, col
# import math

# # 💡 Step 1: Add a unique row number to simulate index
# print("🔢 Adding row index using `monotonically_increasing_id`...")
# df_indexed = df_cleaned.withColumn("index", monotonically_increasing_id())

# # 💡 Step 2: Define chunk size
# chunk_size = 10000

# # 💡 Step 3: Count total rows and compute number of chunks
# total_rows = df_indexed.count()
# num_chunks = math.ceil(total_rows / chunk_size)
# print(f"📊 Total rows: {total_rows}, Chunk size: {chunk_size}, Total chunks: {num_chunks}")

# # 💡 Step 4: Process each chunk
# for i in range(num_chunks):
#     print(f"\n🧩 Processing chunk {i+1}/{num_chunks} (Rows {i*chunk_size} to {(i+1)*chunk_size - 1})")

#     # Use index to filter rows for current chunk
#     start = i * chunk_size
#     end = start + chunk_size
#     chunk_df = df_indexed.filter((col("index") >= start) & (col("index") < end)).drop("index")

#     # Convert to Pandas for downstream processing
#     chunk_pd = chunk_df.toPandas()

#     # Example processing: save as CSV
#     # chunk_pd.to_csv(f"output_chunk_{i+1}.csv", index=False)
#     print(f"✅ Finished processing chunk {i+1}/{num_chunks} with {len(chunk_pd)} rows")


Opting for direct PySpark aggregation and Visualization instead

In [42]:
# Quick look at the cleaned data through the pandas-on-Spark API (first five rows).
ps_df = df_cleaned.pandas_api()
ps_df.head(5)

Unnamed: 0.1,Unnamed: 0,amt,category,cc_bic,cc_num,is_fraud,merch_eff_time,merch_last_update_time,merch_lat,merch_long,merch_zipcode,merchant,trans_date_trans_time,trans_num,person_name,dob,gender,job,lat,long,city_pop,street,city,state,zip,first_name,last_name,trans_date_trans_time_ts_formatted,merch_eff_time_fixed_ts_formatted,merch_last_update_time_fixed_ts_formatted
0,0,4.97,misc_net,CITIUS33CHI,2703186189652095,0,1325376018798532,1325376018666,36.011293,-82.048315,28705.0,"fraud_Rippin, Kub and Mann",2019-01-01 00:00:18,0b242abb623afc578575680df30655b9,"Jennifer,Banks,eeeee",1988-03-09,F,"Psychologist, counselling",36.0788,-81.1781,3495,561 Perry Cove,Moravian Falls,NC,28654,Jennifer,Banks,2019-01-01 05:00:18.000000 +08:00,2019-01-01 05:00:18.798532 +08:00,2019-01-01 05:00:18.666000 +08:00
1,1,107.23,grocery_pos,ADMDUS41,630423337322,0,1325376044867960,132537604479,49.159047,-118.186462,,"fraud_Heller, Gutmann and Zieme",2019-01-01 00:00:44,1f76529f8574734946361c461b024d99,"Stephanie,Gill,eeeee",1978-06-21,F,Special educational needs teacher,48.8878,-118.2105,149,43039 Riley Greens Suite 393,Orient,WA,99160,Stephanie,Gill,2019-01-01 05:00:44.000000 +08:00,2019-01-01 05:00:44.867960 +08:00,2019-01-01 05:00:44.790000 +08:00
2,2,220.11,entertainment,Null,38859492057661,0,1325376051506840,1325376051286,43.150704,-112.154481,83236.0,fraud_Lind-Buckridge,2019-01-01 00:00:51,a1a22d70485983eac12b5b88dad1cf95,Edward@Sanchez,1962-01-19,M,Nature conservation officer,42.1808,-112.262,4154,594 White Dale Suite 530,Malad City,ID,83252,Edward,Sanchez,2019-01-01 05:00:51.000000 +08:00,2019-01-01 05:00:51.506840 +08:00,2019-01-01 05:00:51.286000 +08:00
3,3,45.0,gas_transport,DEUTUS33TRF,3534093764340240,0,1325376076794698,1325376076365,47.034331,-112.561071,,"fraud_Kutch, Hermiston and Farrell",2019-01-01 00:01:16,6b849c168bdad6f867558c3793159a81,"Jeremy/White, !",1967-01-12,M,Patent attorney,46.2306,-112.1138,1939,9443 Cynthia Court Apt. 038,Boulder,MT,59632,Jeremy,White,2019-01-01 05:01:16.000000 +08:00,2019-01-01 05:01:16.794698 +08:00,2019-01-01 05:01:16.365000 +08:00
4,4,41.96,misc_pos,APBCUS61,375534208663984,0,1325376186746376,132537618681,38.674999,-78.632459,22844.0,fraud_Keeling-Crist,2019-01-01 00:03:06,a41d7549acf90789359a9aa5346dcb46,Tyler@Garcia,1986-03-28,M,Dance movement psychotherapist,38.4207,-79.4629,99,408 Bradley Rest,Doe Hill,VA,24433,Tyler,Garcia,2019-01-01 05:03:06.000000 +08:00,2019-01-01 05:03:06.746376 +08:00,2019-01-01 05:03:06.810000 +08:00


In [21]:
# Per-column count of non-missing values via the pandas-on-Spark API.
ps_df = df_cleaned.pandas_api()
ps_df.count()

Unnamed: 0                                   1296675
amt                                          1296675
category                                     1296675
cc_bic                                       1296675
cc_num                                       1296675
is_fraud                                     1296675
merch_eff_time                               1296675
merch_last_update_time                       1296675
merch_lat                                    1296675
merch_long                                   1296675
merch_zipcode                                1100702
merchant                                     1296675
trans_date_trans_time                        1296675
trans_num                                    1296675
person_name                                  1296675
dob                                          1296675
gender                                       1296675
job                                          1296675
lat                                          1

1st Visualization & Insights: Category vs Total Amount and Transaction Count

In [22]:
import pyspark.pandas as ps
from pyspark.sql import functions as F

# 1. Per category: summed amount and number of transactions
category_metrics = [
    F.sum("amt").alias("total_amt"),
    F.count("*").alias("transaction_count"),
]
df_grouped = df_cleaned.groupBy("category").agg(*category_metrics)

# 2. Hand the aggregate over to the pandas-on-Spark API
ps_df = df_grouped.pandas_api()

# 3. Largest total amount first (optional)
ps_df_sorted = ps_df.sort_values(by='total_amt', ascending=False)

# 4. Index by category and draw both metrics as side-by-side bars
metrics_by_category = ps_df_sorted.set_index('category')[['total_amt', 'transaction_count']]
metrics_by_category.plot.bar()


# MY RAW ANALYSIS ANSWER
# From below bar plot figure, we can see that grocery_pos had the highest sum aggregate of $14.46082M. However, gas_transport wins with the highest transaction counts of 131.659k.
# Depending on the offered payment model of the payment gateway, these insights should bring profit for targeting the merchant's nature of business


# AI-POWERED ANSWER BASED ON MY ANALYSIS (I'm not afraid of exposing myself using AI for my answers, we should ride(utilize) with them, not compete with them)
# From the bar plot, we observe that grocery_pos had the highest total transaction amount, aggregating to approximately $14.46M.
# On the other hand, gas_transport recorded the highest number of transactions, totaling around 131.66k.

# These insights are significant when considering payment gateway pricing models.
#    If the gateway charges based on transaction volume (count), merchants in the gas_transport category would generate more fees.
#   Conversely, if the pricing is based on transaction value, grocery_pos would be more lucrative for the gateway.

# Thus, understanding the merchant's nature of business can help in optimizing profit strategies, 
# both for the payment gateway provider and for negotiating rates with high-volume or high-value merchants.


2nd Visualization & Insights: Time difference between merchant eff_time and merchant last_update_time

In [43]:
from pyspark.sql.functions import col


def _as_epoch_seconds(column_name):
    """Return `column_name` cast to timestamp and then to double."""
    return col(column_name).cast("timestamp").cast("double")


# Seconds between the merchant's last update time and its effective time.
df_with_diff = df_cleaned.withColumn(
    "time_diff_sec",
    _as_epoch_seconds("merch_last_update_time_fixed_ts_formatted")
    - _as_epoch_seconds("merch_eff_time_fixed_ts_formatted"),
)

# Show the 20 rows with the largest difference.
diff_columns = [
    "category",
    "merch_eff_time_fixed_ts_formatted",
    "merch_last_update_time_fixed_ts_formatted",
    "time_diff_sec",
]
largest_diff_first = df_with_diff.orderBy(col("time_diff_sec").desc())
largest_diff_first.select(*diff_columns).show(20, truncate=False)

# since the highest time difference is only 0.89 seconds, i wouldn't dive deeper as i think this is already a good time difference (i may be very wrong, please don't hurt me)

+--------------+---------------------------------+-----------------------------------------+------------------+
|category      |merch_eff_time_fixed_ts_formatted|merch_last_update_time_fixed_ts_formatted|time_diff_sec     |
+--------------+---------------------------------+-----------------------------------------+------------------+
|grocery_pos   |2019-02-02 15:24:36.100351 +08:00|2019-02-02 15:24:36.998000 +08:00        |0.897648811340332 |
|personal_care |2019-10-10 16:49:37.100663 +08:00|2019-10-10 16:49:37.998000 +08:00        |0.8973369598388672|
|kids_pets     |2019-11-23 23:40:13.100198 +08:00|2019-11-23 23:40:13.997000 +08:00        |0.8968019485473633|
|food_dining   |2020-05-26 01:26:14.100532 +08:00|2020-05-26 01:26:14.997000 +08:00        |0.896467924118042 |
|misc_net      |2019-11-15 14:16:40.100542 +08:00|2019-11-15 14:16:40.997000 +08:00        |0.8964579105377197|
|entertainment |2019-07-18 16:41:58.100922 +08:00|2019-07-18 16:41:58.997000 +08:00        |0.8960778713

3rd Visualization & Insights: Fraud by targeted category, preferred time of day, and most-hit states

In [None]:
from pyspark.sql.functions import col, when, hour, avg, count, expr
import plotly.express as px

# 1. Add the transaction hour and a readable fraud label.
# NOTE(review): trans_hour is read from the formatted timestamp produced in the
# timestamp cell (i.e. after the TARGET_TZ -> UTC conversion), not from the raw
# trans_date_trans_time -- confirm this is the clock intended for
# time-of-day conclusions.
df = df_cleaned.withColumn("trans_hour", hour("trans_date_trans_time_ts_formatted")) \
               .withColumn("is_fraud_label", when(col("is_fraud") == 1, "Fraud").otherwise("Non-Fraud"))

# 2. Transaction count by category and fraud label
category_stats = df.groupBy("category", "is_fraud_label") \
                   .agg(count("*").alias("txn_count")) \
                   .orderBy("txn_count", ascending=False)

# The aggregate holds one row per (category, label) pair, so it is collected
# straight into pandas; the detour through pandas_api().to_pandas() is not needed.
category_stats_pd = category_stats.toPandas()

# 3. Grouped bar chart: transaction count by category and fraud label
fig2 = px.bar(category_stats_pd, x="category", y="txn_count", color="is_fraud_label",
              title="Transaction Count by Category and Fraud", barmode="group")
fig2.update_layout(xaxis_tickangle=-45)
fig2.show()

# Looking at the fraud transactions, grocery_pos and shopping_net are hit most often, so the next cell filters down to those categories only.

In [None]:
from pyspark.sql.functions import col, count
import plotly.express as px

# 1. Filter to the relevant merchant categories before aggregation
filtered_df = df.filter(col("category").isin("grocery_pos", "shopping_net"))

# 2. Group by transaction hour and fraud label
hourly_stats = filtered_df.groupBy("trans_hour", "is_fraud_label") \
                          .agg(count("*").alias("txn_count")) \
                          .orderBy("trans_hour", "is_fraud_label")

# 3. Convert to pandas for visualization. The aggregate holds one row per
#    (hour, label) pair, so it is collected directly with toPandas().
hourly_stats_pd = hourly_stats.toPandas()

# 4. Plot using Plotly
fig3 = px.line(hourly_stats_pd,
               x="trans_hour",
               y="txn_count",
               color="is_fraud_label",
               title="Hourly Transaction Volume (Filtered: Grocery POS & Shopping Net)",
               markers=True)

fig3.update_layout(xaxis=dict(dtick=1))
fig3.show()

# From here, we can see that a spike of frauds happening between 2AM - 7AM with 3AM at its peak. 
# Human monitoring/alertness may be low as this time of the day is usually when people go to sleep. 
# People naturally pay lesser attention at this time of day


`to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.



In [None]:
import pandas as pd
import plotly.express as px
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

# Geocode the top fraud zipcodes with Nominatim (OpenStreetMap).
geolocator = Nominatim(user_agent="geoapi")
# Rate-limited wrappers: consecutive calls are spaced at least one second apart.
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)  # not used in this cell

zipcode_df = pd.DataFrame(top_zipcodes_list, columns=["merch_zipcode"])
zipcode_df["merch_zipcode"] = zipcode_df["merch_zipcode"].astype(str)

# Call the rate-limited `geocode` wrapper rather than `geolocator.geocode`, so the
# one-second delay is actually applied to every request.
# addressdetails=True asks Nominatim to include the "address" dict in the raw
# response; without it the state lookup below always falls back to "Unknown".
zipcode_df["location"] = zipcode_df["merch_zipcode"].apply(
    lambda zipcode: geocode({"postalcode": zipcode, "country": "USA"}, addressdetails=True)
)

# Drop zipcodes that could not be resolved. `.copy()` gives an independent frame,
# so the column assignments below do not trigger pandas' SettingWithCopyWarning.
zipcode_df = zipcode_df[zipcode_df["location"].notnull()].copy()

# Extract lat/lon and state from the geocoding result.
zipcode_df["latitude"] = zipcode_df["location"].apply(lambda loc: loc.latitude)
zipcode_df["longitude"] = zipcode_df["location"].apply(lambda loc: loc.longitude)
zipcode_df["state"] = zipcode_df["location"].apply(
    lambda loc: loc.raw.get("address", {}).get("state", "Unknown")
)

# Merge with fraud counts.
zipcode_counts = zipcode_stats_pd.groupby("merch_zipcode")["txn_count"].sum().reset_index()
# The key was cast to str on the zipcode_df side above; cast it here as well so the
# merge does not fail or miss on mismatched dtypes.
# NOTE(review): dtype of zipcode_stats_pd["merch_zipcode"] is not visible in this cell — confirm.
zipcode_counts["merch_zipcode"] = zipcode_counts["merch_zipcode"].astype(str)
zipcode_df = zipcode_df.merge(zipcode_counts, on="merch_zipcode", how="left")
# The left merge leaves NaN where no count matched; plotly cannot map NaN to a marker size.
zipcode_df["txn_count"] = zipcode_df["txn_count"].fillna(0)

# Plot: one marker per zipcode, sized by fraud count and coloured by state.
fig = px.scatter_geo(
    zipcode_df,
    lat="latitude",
    lon="longitude",
    text="merch_zipcode",
    color="state",
    size="txn_count",
    title="Top 10 Fraud Merchant Zipcodes on US Map",
    scope="usa",
    projection="albers usa",
)

fig.update_layout(legend_title_text='State')
fig.show()

# Additional insight: the top 10 fraud zipcodes are mostly located in Midwestern states.

PII Data Handling

In [None]:
# `when` is imported here explicitly so this cell does not rely on an earlier cell's import.
from pyspark.sql.functions import (
    col, concat_ws, current_date, lit, sha2, to_date, when, year,
)

# 1. Mask credit card number (keep last 4 digits only).
#    substr(-4, 4) takes the 4 characters counted from the end of the value.
df_anonymized = df_cleaned.withColumn(
    "cc_num_masked",
    concat_ws("", lit("XXXX-XXXX-XXXX-"), col("cc_num").substr(-4, 4)),
)

# 2. Mask full name (keep initials only).
df_anonymized = df_anonymized.withColumn(
    "name_initials",
    concat_ws("", col("first_name").substr(1, 1), col("last_name").substr(1, 1)),
)

# 3. Age bucket from date of birth.
#    Age is the difference in calendar years relative to the current date, so the
#    buckets stay correct when the notebook is re-run in later years (no hard-coded year).
df_anonymized = df_anonymized.withColumn("dob", to_date("dob"))
df_anonymized = df_anonymized.withColumn("age", year(current_date()) - year(col("dob")))
df_anonymized = df_anonymized.withColumn(
    "age_group",
    # A null age (missing/unparseable dob) fails every comparison below and would
    # otherwise fall through to "60+", so label it explicitly first.
    when(col("age").isNull(), "Unknown")
    # Branches are evaluated in order, so each one only needs its upper bound.
    .when(col("age") < 20, "<20")
    .when(col("age") < 30, "20-29")
    .when(col("age") < 40, "30-39")
    .when(col("age") < 50, "40-49")
    .when(col("age") < 60, "50-59")
    .otherwise("60+"),
)

# 4. Hash transaction ID (optional, keeps uniqueness).
df_anonymized = df_anonymized.withColumn("trans_num_hash", sha2(col("trans_num"), 256))

# 5. Drop sensitive PII columns.
#    DataFrame.drop ignores any listed name that is not present in the schema.
pii_columns_to_drop = [
    "person_name", "first_name", "last_name", "dob", "cc_num", "trans_num", "job", "street"
]

df_anonymized = df_anonymized.drop(*pii_columns_to_drop)

# 6. Final selection of relevant and anonymized columns.
#    The intermediate "age" column is not selected, so only the bucket is kept.
df_anonymized = df_anonymized.select(
    "amt", "category", "cc_bic", "cc_num_masked", "is_fraud",
    "merch_lat", "merch_long", "merch_zipcode", "merchant",
    "gender", "age_group", "name_initials",
    "trans_num_hash", "city", "state", "zip",
    "trans_date_trans_time_ts_formatted", "merch_eff_time_fixed_ts_formatted", "merch_last_update_time_fixed_ts_formatted"
)
df_anonymized.show(truncate=False)


# Notes on the PII data handling:
# - It was done last because I expected some of the raw fields to be useful for the
#   analysis before redaction (in the end, handling it earlier would have made no difference).
# - Credit card number: masked, showing only the last 4 digits.
# - First/last name: only the initials are kept.
# - Age: grouped into age buckets.
# - Transaction ID: SHA-256 hashed; a custom key-based encryption method would be
#   preferable for better control and potential reversibility.
# - Other sensitive PII columns are dropped.

+------+-------------+-----------+-------------------+--------+------------------+------------------+-------------+----------------------------------------+------+---------+-------------+----------------------------------------------------------------+------------------------+-----+-----+----------------------------------+---------------------------------+-----------------------------------------+
|amt   |category     |cc_bic     |cc_num_masked      |is_fraud|merch_lat         |merch_long        |merch_zipcode|merchant                                |gender|age_group|name_initials|trans_num_hash                                                  |city                    |state|zip  |trans_date_trans_time_ts_formatted|merch_eff_time_fixed_ts_formatted|merch_last_update_time_fixed_ts_formatted|
+------+-------------+-----------+-------------------+--------+------------------+------------------+-------------+----------------------------------------+------+---------+-------------+-----------

Testing Grounds

In [25]:
# Debug helper: uncomment the two lines below to check that JAVA_HOME is set
# import os
# print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
