1. Import Necessary Libraries and Initialize Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, stddev_samp, regexp_replace, to_date, count

# Build (or reuse) the Spark session for the cleaning pipeline.
# Settings applied: 16g executor and driver memory, the LEGACY time parser
# policy, and a custom scratch directory for Spark's local files.
# NOTE(review): no master is set on the builder, so it is taken from the
# environment - confirm this really runs in local mode as intended.
spark = (
    SparkSession.builder
    .appName("DataCleaning")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.local.dir", "C:/temp/spark")
    .getOrCreate()
)


2. Load the Dataset

In [None]:
# Location of the raw transactions CSV
file_path = 'C:/Users/a/Desktop/BD assignment/data/raw/ShopSpectra Transaction Dataset.csv'

# Read the CSV: first line is the header, column types are inferred by Spark
csv_reader = spark.read.option("header", True).option("inferSchema", True)
data = csv_reader.csv(file_path)

# Preview the first five rows of the loaded dataset
data.show(n=5)


+-------------+------+-----------------+--------------------+-------------+----------------+--------+-----------+-------------+----------+-----------------+-----------+--------+--------------+---------------+--------+---------+---------------+
|TransactionID|UserID|TransactionAmount|    TransactionDate1|PaymentMethod|MerchantCategory|Quantity|CustomerAge|     Location|DeviceType|TransactionStatus|Is_Declined|Is_Fraud|AccountAgeDays|TransactionDate|Latitude|Longitude|TransactionHour|
+-------------+------+-----------------+--------------------+-------------+----------------+--------+-----------+-------------+----------+-----------------+-----------+--------+--------------+---------------+--------+---------+---------------+
|            1| 29318|          1307.37|2023-12-19 19:47:...|       PayPal|   Entertainment|       5|         79|Oklahoma City|    Tablet|          Pending|          0|       0|            31|     2023-06-10| 35.4676| -97.5164|              8|
|            2| 33379|  

3. Handle Missing Values

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Remove rows with missing critical identifiers
data = data.filter(F.col('UserID').isNotNull() & F.col('TransactionID').isNotNull())

# Forward-fill missing TransactionDate1 values within each user.
# The window must not be ordered by TransactionDate1 itself: Spark sorts nulls
# first in ascending order, so a null row would never have a non-null
# predecessor and nothing would be filled. An explicit row frame is used so
# each row only looks at the rows before it.
# NOTE(review): assumes TransactionID reflects the order in which a user's
# transactions happened - confirm against the data source.
window_spec = (
    Window.partitionBy('UserID')
    .orderBy('TransactionID')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
data = data.withColumn('TransactionDate1', F.last('TransactionDate1', ignorenulls=True).over(window_spec))

# Numeric columns, excluding UserID. The name is kept because the formatting
# cell further down reuses `numerical_cols`. Filtering in the comprehension
# (instead of list.remove) avoids a ValueError when UserID is not int/double.
numerical_cols = [name for name, dtype in data.dtypes if dtype in ['int', 'double'] and name != 'UserID']

# Columns that must not receive a median: the transaction identifier and the
# 0/1 flags, which get an explicit domain default below.
flag_cols = ['Is_Declined', 'Is_Fraud']
median_cols = [name for name in numerical_cols if name != 'TransactionID' and name not in flag_cols]

# Fill missing numerical values with the (approximate) median.
# All medians are computed in one approxQuantile call; a column that contains
# only nulls yields an empty list and is skipped instead of raising IndexError.
if median_cols:
    median_lists = data.approxQuantile(median_cols, [0.5], 0.01)
    median_fill = {name: quantiles[0] for name, quantiles in zip(median_cols, median_lists) if quantiles}
    if median_fill:
        data = data.fillna(median_fill)

# Fill missing categorical values with mode
categorical_cols = [name for name, dtype in data.dtypes if dtype == 'string' and name != 'TransactionDate1']

mode_fill = {}
for cat_col in categorical_cols:
    # Nulls are excluded before counting: otherwise null itself can be the
    # most frequent "value" and cannot be used as a fill value.
    # The secondary sort key makes the choice deterministic when counts tie.
    top_row = (
        data.filter(F.col(cat_col).isNotNull())
        .groupBy(cat_col)
        .count()
        .orderBy(F.desc('count'), F.asc(cat_col))
        .first()
    )
    if top_row is not None:  # None when the column has no non-null values
        mode_fill[cat_col] = top_row[0]

if mode_fill:
    data = data.fillna(mode_fill)

# Handle critical columns with domain knowledge: a missing flag means
# "not declined" / "not fraud". These columns are deliberately kept out of the
# median imputation above so this default is what actually gets applied.
for flag_col in flag_cols:
    data = data.withColumn(flag_col, F.coalesce(F.col(flag_col), F.lit(0)))

# Final cleanup: drop rows with remaining missing values in critical columns
data = data.dropna(subset=['TransactionID', 'TransactionAmount', 'Quantity', 'CustomerAge', 'Location'])


4. Remove Duplicates

In [5]:
# Keep a single copy of each fully identical row (all columns compared)
data = data.distinct()


5. Correct Formatting Errors

In [6]:
# Standardize date formats: every column whose name contains "date"
# (case-insensitive) is parsed with the yyyy-MM-dd pattern.
date_cols = [name for name in data.columns if 'date' in name.lower()]

for date_col in date_cols:
    data = data.withColumn(date_col, to_date(col(date_col), 'yyyy-MM-dd'))

# Normalize numerical columns to double.
# Columns that are already numeric are cast directly. Routing them through a
# string and stripping everything except digits and dots would silently drop
# the minus sign (e.g. negative longitudes) and mangle values rendered in
# scientific notation. Only string-typed columns are regex-cleaned, and the
# pattern keeps '-' so negative values survive.
column_types = dict(data.dtypes)
for num_col in numerical_cols:
    if column_types.get(num_col) == 'string':
        cleaned = regexp_replace(col(num_col), '[^0-9.-]', '')
    else:
        cleaned = col(num_col)
    data = data.withColumn(num_col, cleaned.cast('double'))


6. Column Renaming

In [7]:
# Original column name -> snake_case replacement.
# NOTE(review): 'TransactionDate' is not part of this mapping, so that column
# keeps its original name - confirm whether that is intentional.
column_renames = {
    'TransactionID': 'transaction_id',
    'UserID': 'user_id',
    'TransactionAmount': 'transaction_amount',
    'TransactionDate1': 'transaction_date1',
    'PaymentMethod': 'payment_method',
    'MerchantCategory': 'merchant_category',
    'Quantity': 'quantity',
    'CustomerAge': 'customer_age',
    'Location': 'location',
    'DeviceType': 'device_type',
    'TransactionStatus': 'transaction_status',
    'Is_Declined': 'is_declined',
    'Is_Fraud': 'is_fraud',
    'AccountAgeDays': 'account_age_days',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
    'TransactionHour': 'transaction_hour',
}

# Apply the renames one at a time, in mapping order
for old_name, new_name in column_renames.items():
    data = data.withColumnRenamed(old_name, new_name)


7. Feature Engineering

In [8]:
# Per-user aggregates: number of transactions and mean transaction amount
per_user_metrics = [
    count("transaction_id").alias("total_transactions_per_user"),
    avg("transaction_amount").alias("avg_transaction_amount_per_user"),
]
user_transactions = data.groupBy("user_id").agg(*per_user_metrics)

# Attach the per-user aggregates to every transaction row (left join on user_id)
data = data.join(user_transactions, "user_id", "left")


8. Save the Cleaned Data

In [9]:
import pandas as pd

# Destination of the cleaned CSV file
new_output_path = 'C:/Users/a/Desktop/BD assignment/data/processed/clean_data.csv'

# Collect the Spark DataFrame into a pandas DataFrame on the driver
pandas_df = data.toPandas()

# Write the CSV without the pandas index column
pandas_df.to_csv(path_or_buf=new_output_path, index=False)

print(f"Cleaned data successfully loaded and saved to {new_output_path}")


Cleaned data successfully loaded and saved to C:/Users/a/Desktop/BD assignment/data/processed/clean_data.csv
