In [1]:
#Import Statements
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import shutil
import zipfile


In [2]:
## Start SparkSession
spark=SparkSession.builder.appName("CreditCardFraudDetection").getOrCreate()

In [3]:
dataset_name = "mlg-ulb/creditcardfraud"
download_path = "data/"

# Create the directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)

# Use the Kaggle API to download the dataset
os.system(f"kaggle datasets download -d {dataset_name} -p {download_path}")

0

In [4]:
zip_file = f"{download_path}creditcardfraud.zip"
with zipfile.ZipFile(zip_file, "r") as zip_ref:
    zip_ref.extractall(download_path)

# Remove the zip file
os.remove(zip_file)

In [5]:
input_path = "data/"
original_file = "data/creditcard.csv"
#Duplicate the file
for i in range(1, 6):
    shutil.copy(original_file, f"{input_path}creditcard_{i}.csv")

# Load all files into a single DataFrame
df = spark.read.csv(f"{input_path}/*.csv", header=True, inferSchema=True)

In [6]:
#Deduplicate the data in Dataframe
df=df.dropDuplicates()

In [7]:
#Get the count of unique rows in the Dataframe
df.count()

283726

In [8]:
# Check the schema
df.printSchema()

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla

In [9]:
# Dataset shape
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")

Rows: 283726, Columns: 31


In [10]:
#Check for nulls in the Dataframe
df.select([F.count(F.when(F.col(c).isNull(), c).alias(c)) for c in df.columns]).show()

+-----------------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+---

In [11]:
# Descriptive statistics
df.describe().show()


+-------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|             Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V14|                 V15|  

In [12]:
# Print the mean of the Class column
df.select(F.mean(df['Class'])).show()

+--------------------+
|          avg(Class)|
+--------------------+
|0.001667101358352777|
+--------------------+



In [13]:
# Print the median of Amount column
df.select(F.median(df['Amount'])).show()

+--------------+
|median(Amount)|
+--------------+
|          22.0|
+--------------+



In [14]:
#Print the mode of the Time column
df.select(F.mode(df['Time'])).show()

+----------+
|mode(Time)|
+----------+
|    3767.0|
+----------+



In [15]:
# Rename columns for consistency
df = df.toDF(*[col.lower().replace(" ", "_") for col in df.columns])

In [16]:
# Find the mean amount
meanAmount=df.select(F.mean(F.col("amount"))).collect()[0][0]
print("Mean of Amounts is: ",meanAmount)

Mean of Amounts is:  88.47268731100479


In [None]:
# Find the Standard deviation of Amount
sdvAmount=df.select(F.stddev(F.col("amount"))).collect()[0][0]
print("Standard devation of Amounts is: ", sdvAmount)

In [None]:
# Add NormalizedAmount column
df=df.withColumn("NormalizedAmount",(F.col("amount")-meanAmount)/sdvAmount).withColumn("AmountLog",F.log("amount")+1)

In [None]:
# Save the df in parquet format
output_path = "output_data/"
df.write.parquet(output_path, mode="overwrite")
print(f"Data saved to {output_path} in Parquet format.")

In [None]:
#Create a Temp View
df.createOrReplaceTempView("creditcard_data")

# SQL queries
query1 = "SELECT AVG(NormalizedAmount) AS avg_norm_amount FROM creditcard_data WHERE class = 1"
query2 = "SELECT MAX(NormalizedAmount) AS max_norm_amount FROM creditcard_data WHERE class = 0"
query3 = "SELECT class, AVG(AmountLog) AS avg_amountlog FROM creditcard_data GROUP BY class"

# Execute queries
avg_norm_amount = spark.sql(query1).show()
max_norm_amount = spark.sql(query2).show()
avg_amountlog = spark.sql(query3).show()