In [1]:
import numpy as np
import pandas as pd

In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("BankFraud")
    .getOrCreate()
)

In [3]:
# Load the transaction table: first row is the header, column types are inferred.
df1 = spark.read.options(header=True, inferSchema=True).csv('train_transaction.csv')

In [4]:
# Load the identity table with the same reader settings as the transaction table.
df2 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('train_identity.csv')
)

# get the shape of df1

In [None]:
# Shape of df1: the row count triggers a Spark job, the column count comes from the schema.
num_rows, num_columns = df1.count(), len(df1.columns)

print("Number of Rows:", num_rows)
print("Number of Columns:", num_columns)

# get the shape of df2

In [None]:
# Shape of df2 (columns from the schema, rows from a Spark count).
num_columns = len(df2.columns)
num_rows = df2.count()

print("Number of Rows:", num_rows)
print("Number of Columns:", num_columns)

# join both dataframes

In [5]:
# Left join: keep every transaction row and attach identity columns where TransactionID matches.
df3 = df1.join(other=df2, on=["TransactionID"], how="left")

In [None]:
# Shape of the joined frame.
joined_column_names = df3.columns
num_rows = df3.count()
num_columns = len(joined_column_names)

print("Number of Rows:", num_rows)
print("Number of Columns:", num_columns)

# Find missing values and drop columns that are more than 40% null

In [6]:
from pyspark.sql.functions import col, sum

In [7]:
# Build (column_name, null_fraction) pairs for every column of the joined frame.
#
# All null counts come from ONE aggregation job instead of one filter+count job
# per column, so the data is scanned once rather than once per column.
# `sum` here is pyspark.sql.functions.sum (imported in the previous cell), not the builtin.
total_rows = df3.count()
null_counts = df3.select(
    *[sum(col(col_name).isNull().cast("int")).alias(col_name) for col_name in df3.columns]
).collect()[0]

# Guard the division: an empty frame yields 0.0 for every column instead of
# raising ZeroDivisionError (the aggregate values are None in that case).
null_percentages = [
    (col_name, (null_count / total_rows) if total_rows else 0.0)
    for col_name, null_count in zip(df3.columns, null_counts)
]

In [8]:
# Names of the columns whose null fraction is strictly greater than 0.4.
columns_to_drop = [
    name
    for name, null_fraction in null_percentages
    if null_fraction > 0.4
]

In [9]:
# Remove every column listed in columns_to_drop; `df` is the working frame from here on.
df = df3.drop(*columns_to_drop)

In [10]:
# Number of columns left in df after the drop (shown as the cell output).
len(df.columns)

202

# Replace missing values with the column mean

In [11]:
# Columns whose nulls are filled with the column mean in the following cells.
columns_to_replace = [
    "card2",
    "addr1",
    "addr2",
    "D4",
]

In [12]:
from pyspark.sql.functions import col, mean

In [13]:
# Single aggregate row holding the mean of each column in columns_to_replace,
# with every field aliased to its source column name.
mean_exprs = [mean(col(name)).alias(name) for name in columns_to_replace]
means = df.select(*mean_exprs).collect()[0]

In [14]:
# Pair each column name with its mean (the row fields are in the same order as the names).
mean_values = dict(zip(columns_to_replace, means))

In [15]:
# Impute the selected columns with their means in a single fillna call.
# A {column: value} mapping makes Spark fill each column with its own value at once,
# instead of chaining one fillna call per column.
# Indexing mean_values by columns_to_replace keeps the KeyError if the two ever disagree.
df = df.fillna({col_name: mean_values[col_name] for col_name in columns_to_replace})

In [None]:
# Verify the imputation: count the remaining nulls of all listed columns in ONE
# aggregation job instead of launching a separate Spark job per column.
# `sum` is pyspark.sql.functions.sum (imported earlier), not the Python builtin.
null_counts_row = df.select(
    *[sum(col(i).isNull().cast("int")).alias(i) for i in columns_to_replace]
).collect()[0]

for i, null_count in zip(columns_to_replace, null_counts_row):
    # Display the null count
    print(f"Null count for {i}: {null_count}")

# Replace missing values in card3 and card5 with the median

In [16]:
from pyspark.sql.functions import median

In [17]:
# Columns whose nulls are filled with the column median in the following cells.
columns_to_replace = [
    "card3",
    "card5",
]

In [18]:
# Single aggregate row with the median of each column in columns_to_replace.
median_exprs = [median(col(name)).alias(name) for name in columns_to_replace]
medians = df.select(*median_exprs).collect()[0]

In [19]:
# Map each column name to its median; names and row fields share the same order.
median_values = dict(zip(columns_to_replace, medians))

In [20]:
# Impute the selected columns with their medians in a single fillna call.
# The {column: value} mapping fills every column with its own value at once rather
# than chaining one fillna call per column.
# Indexing median_values by columns_to_replace keeps the KeyError if the two ever disagree.
df = df.fillna({col_name: median_values[col_name] for col_name in columns_to_replace})

In [None]:
# Verify the imputation: one aggregation job returns the remaining null count of
# every listed column, instead of one Spark job per column.
# `sum` is pyspark.sql.functions.sum (imported earlier), not the Python builtin.
null_counts_row = df.select(
    *[sum(col(i).isNull().cast("int")).alias(i) for i in columns_to_replace]
).collect()[0]

for i, null_count in zip(columns_to_replace, null_counts_row):
    # Display the null count
    print(f"Null count for {i}: {null_count}")

# Replace missing values in card4, card6, P_emaildomain, and M6 with the mode

In [21]:
from pyspark.sql.functions import mode

### card4

In [22]:
# Column whose nulls are filled with its most frequent value in the next cells.
column_to_replace = "card4"

In [23]:
# Most frequent value of the selected column: first field of the single aggregate row.
mode_row = df.select(mode(col(column_to_replace))).collect()[0]
mode_value = mode_row[0]

In [24]:
# Fill nulls in the selected column with its mode (df.na.fill is the same operation as fillna).
df = df.na.fill(mode_value, subset=[column_to_replace])

In [None]:
# Sum a 0/1 null indicator over card4 to get the number of nulls left in that column.
card4_null_flag = col('card4').isNull().cast("int")
null_row = df.select(sum(card4_null_flag).alias("null_count")).collect()[0]
null_count = null_row["null_count"]

# Display the null count
print(f"Null count for card4: {null_count}")

### card6

In [25]:
# Column whose nulls are filled with its most frequent value in the next cells.
column_to_replace = "card6"

In [26]:
# Most frequent value of the selected column, taken from the one-row aggregate result.
mode_expr = mode(col(column_to_replace))
mode_value = df.select(mode_expr).collect()[0][0]

In [27]:
# Replace nulls in the selected column with the mode computed in the previous cell.
df = df.fillna(value=mode_value, subset=[column_to_replace])

In [None]:
# Sum a 0/1 null indicator over card6 to get the number of nulls left in that column.
card6_null_flag = col('card6').isNull().cast("int")
null_count = df.select(sum(card6_null_flag).alias("null_count")).collect()[0]["null_count"]

# Display the null count
print(f"Null count for card6: {null_count}")

### P_emaildomain

In [28]:
# Column whose nulls are filled with its most frequent value in the next cells.
column_to_replace = "P_emaildomain"

In [29]:
# Most frequent value of the selected column: first field of the single aggregate row.
mode_row = df.select(mode(col(column_to_replace))).collect()[0]
mode_value = mode_row[0]

In [30]:
# Fill nulls in the selected column with its mode (df.na.fill is the same operation as fillna).
df = df.na.fill(mode_value, subset=[column_to_replace])

### M6

In [31]:
# Column whose nulls are filled with its most frequent value in the next cells.
column_to_replace = "M6"

In [32]:
# Most frequent value of the selected column, taken from the one-row aggregate result.
mode_expr = mode(col(column_to_replace))
mode_value = df.select(mode_expr).collect()[0][0]

In [33]:
# Replace nulls in the selected column with the mode computed in the previous cell.
df = df.fillna(value=mode_value, subset=[column_to_replace])

# Replace missing values in D1, D10, and D15 with the median

In [34]:
# Columns whose nulls are filled with the column median in the following cells.
columns_to_replace = [
    "D1",
    "D10",
    "D15",
]

In [35]:
# Single aggregate row with the median of each column in columns_to_replace.
medians = df.select(
    *(median(col(name)).alias(name) for name in columns_to_replace)
).collect()[0]

In [36]:
# Map each column name to its median; names and row fields share the same order.
median_values = dict(zip(columns_to_replace, medians))

In [37]:
# Impute the selected columns with their medians in a single fillna call.
# The {column: value} mapping fills every column with its own value at once rather
# than chaining one fillna call per column.
# Indexing median_values by columns_to_replace keeps the KeyError if the two ever disagree.
df = df.fillna({col_name: median_values[col_name] for col_name in columns_to_replace})

# Replace missing values in the V columns with the median

In [38]:
# Every column of df whose name begins with "V".
columns_to_replace = [
    name
    for name in df.columns
    if name.startswith("V")
]

In [39]:
# Single aggregate row with the median of every V column, each aliased to its column name.
median_exprs = [median(col(name)).alias(name) for name in columns_to_replace]
medians = df.select(*median_exprs).collect()[0]

In [40]:
# Map each V column name to its median; names and row fields share the same order.
median_values = dict(zip(columns_to_replace, medians))

In [41]:
# Impute every V column with its own median in a single fillna call.
# The list of V columns is built dynamically and may be long; a {column: value}
# mapping applies all replacements at once instead of stacking one fillna call
# per column onto the query plan.
# Indexing median_values by columns_to_replace keeps the KeyError if the two ever disagree.
df = df.fillna({col_name: median_values[col_name] for col_name in columns_to_replace})

# Check whether any null values remain

In [None]:
# Final sanity check: report how many nulls remain in every column of df.
# One aggregation job produces all counts in a single pass over the data, instead
# of launching a separate Spark job for each column.
# `sum` is pyspark.sql.functions.sum (imported earlier), not the Python builtin.
null_counts_row = df.select(
    *[sum(col(i).isNull().cast("int")).alias(i) for i in df.columns]
).collect()[0]

for i, null_count in zip(df.columns, null_counts_row):
    # Display the null count
    print(f"Null count for {i}: {null_count}")

In [42]:
# Remove the TransactionID column (used earlier as the join key between the two tables).
df = df.drop('TransactionID')

In [None]:
# Stop the Spark session that was created at the top of the notebook.
spark.stop()