In [9]:
print(
    "################################################################################"
)
print("Use standard python libraries to do the transformations")
print(
    "################################################################################"
)
import csv

# Question: How do you read data from a CSV file into a list of dictionaries?
data = []
with open("./sample_data.csv", "r", newline="") as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        data.append(row)

# Question: How do you remove duplicate rows based on customer ID?
data_unique = []
customer_ids_seen = set()
for row in data:
    if row["Customer_ID"] not in customer_ids_seen:
        data_unique.append(row)
        customer_ids_seen.add(row["Customer_ID"])
    else:
        print(f'duplicate customer id {row["Customer_ID"]}')

# Question: How do you handle missing values by replacing them with 0?
for row in data_unique:
    if not row["Age"]:
        print(f'Customer {row["Customer_Name"]} does not have Age value')
        row["Age"] = 0
    if not row["Purchase_Amount"]:
        row["Purchase_Amount"] = 0.0

# Question: How do you remove outliers such as age > 100 or purchase amount > 1000?
data_cleaned = [
    row
    for row in data_unique
    if int(row["Age"]) <= 100 and float(row["Purchase_Amount"]) <= 1000
]

# Question: How do you convert the Gender column to a binary format (0 for Female, 1 for Male)?
for row in data_cleaned:
    if row["Gender"] == "Female":
        row["Gender"] = 0
    elif row["Gender"] == "Male":
        row["Gender"] = 1

# Question: How do you split the Customer_Name column into separate First_Name and Last_Name columns?
for row in data_cleaned:
    first_name, last_name = row["Customer_Name"].split(" ", 1)
    row["First_Name"] = first_name
    row["Last_Name"] = last_name
    del row["Customer_Name"]

# Question: How do you calculate the total purchase amount by Gender?
from collections import defaultdict
total_purchase_by_gender = defaultdict(float)
for row in data_cleaned:
    total_purchase_by_gender[row["Gender"]] += float(row["Purchase_Amount"])

# Question: How do you calculate the average purchase amount by Age group?
# assume age_groups is the grouping we want
# hint: Why do we convert to float?
age_groups = {"18-30": [], "31-40": [], "41-50": [], "51-60": [], "61-70": []}
for row in data_cleaned:
    age = int(row["Age"])
    if age <= 30:
        age_groups["18-30"].append(float(row["Purchase_Amount"]))
    elif age <= 40:
        age_groups["31-40"].append(float(row["Purchase_Amount"]))
    elif age <= 50:
        age_groups["41-50"].append(float(row["Purchase_Amount"]))
    elif age <= 60:
        age_groups["51-60"].append(float(row["Purchase_Amount"]))
    else:
        age_groups["61-70"].append(float(row["Purchase_Amount"]))

average_purchase_by_age_group = {
    group: sum(amounts) / len(amounts) for group, amounts in age_groups.items()
}

# Question: How do you print the results for total purchase amount by Gender and average purchase amount by Age group?
print("Total purchase amount by Gender:", total_purchase_by_gender)
print("Average purchase amount by Age group:", average_purchase_by_age_group)


################################################################################
Use standard python libraries to do the transformations
################################################################################
duplicate customer id 84
duplicate customer id 85
duplicate customer id 86
duplicate customer id 87
duplicate customer id 88
duplicate customer id 89
duplicate customer id 90
duplicate customer id 91
Customer Alice Johnson does not have Age value
Customer Jack Garcia does not have Age value
Total purchase amount by Gender: defaultdict(<class 'float'>, {1: 24599.890000000003, 0: 28215.780000000002})
Average purchase amount by Age group: {'18-30': 567.9048387096775, '31-40': 555.2423529411764, '41-50': 493.946, '51-60': 494.51882352941175, '61-70': 534.6971428571428}


In [12]:
print(
    "################################################################################"
)
print("Use PySpark DataFrame API to do the transformations")
print(
    "################################################################################"
)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit, when, split, sum as spark_sum, avg, regexp_replace
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType

# Initialize Spark session
# spark = SparkSession.builder \
#     .appName("DataTransformation") \
#     .config("spark.sql.adaptive.enabled", "true") \
#     .getOrCreate()

# Question: How do you connect to Spark and load data from a CSV file into a DataFrame?
# Define schema (equivalent to CREATE TABLE in DuckDB)
schema = StructType([
    StructField("Customer_ID", IntegerType(), True),
    StructField("Customer_Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Purchase_Amount", FloatType(), True),
    StructField("Purchase_Date", DateType(), True)
])

# Read data from CSV file into DataFrame
data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .csv("./sample_data.csv")

# Question: How do you remove duplicate rows based on customer ID in PySpark?
data_unique = data.dropDuplicates()

# Question: How do you handle missing values by replacing them with 0 in PySpark?
data_cleaned_missing = data_unique.select(
    col("Customer_ID"),
    col("Customer_Name"),
    coalesce(col("Age"), lit(0)).alias("Age"),
    col("Gender"),
    coalesce(col("Purchase_Amount"), lit(0.0)).alias("Purchase_Amount"),
    col("Purchase_Date")
)

# Question: How do you remove outliers (e.g., age > 100 or purchase amount > 1000) in PySpark?
data_cleaned_outliers = data_cleaned_missing.filter(
    (col("Age") <= 100) & (col("Purchase_Amount") <= 1000)
)

# Question: How do you convert the Gender column to a binary format (0 for Female, 1 for Male) in PySpark?
data_cleaned_gender = data_cleaned_outliers.withColumn(
    "Gender_Binary",
    when(col("Gender") == "Female", 0).otherwise(1)
)

# Question: How do you split the Customer_Name column into separate First_Name and Last_Name columns in PySpark?
data_cleaned = data_cleaned_gender.select(
    col("Customer_ID"),
    split(col("Customer_Name"), " ").getItem(0).alias("First_Name"),
    split(col("Customer_Name"), " ").getItem(1).alias("Last_Name"),
    col("Age"),
    col("Gender_Binary"),
    col("Purchase_Amount"),
    col("Purchase_Date")
)

# Question: How do you calculate the total purchase amount by Gender in PySpark?
total_purchase_by_gender = data_cleaned_gender.groupBy("Gender_Binary") \
    .agg(spark_sum("Purchase_Amount").alias("Total_Purchase_Amount")) \
    .collect()

# Question: How do you calculate the average purchase amount by Age group in PySpark?
average_purchase_by_age_group = data_cleaned.withColumn(
    "Age_Group",
    when((col("Age") >= 18) & (col("Age") <= 30), "18-30")
    .when((col("Age") >= 31) & (col("Age") <= 40), "31-40")
    .when((col("Age") >= 41) & (col("Age") <= 50), "41-50")
    .when((col("Age") >= 51) & (col("Age") <= 60), "51-60")
    .otherwise("61-70")
).groupBy("Age_Group") \
    .agg(avg("Purchase_Amount").alias("Average_Purchase_Amount")) \
    .collect()

# Question: How do you print the results for total purchase amount by Gender and average purchase amount by Age group in PySpark?
print("====================== Results ======================")
print("Total purchase amount by Gender:")
for row in total_purchase_by_gender:
    print(f"Gender_Binary: {row['Gender_Binary']}, Total_Purchase_Amount: {row['Total_Purchase_Amount']}")

print("Average purchase amount by Age group:")
for row in average_purchase_by_age_group:
    print(f"Age_Group: {row['Age_Group']}, Average_Purchase_Amount: {row['Average_Purchase_Amount']}")

# Optional: Show DataFrame contents for verification
print("\n====================== Data Preview ======================")
print("Final cleaned data:")
data_cleaned.show(10)

print("Total purchase by gender:")
data_cleaned_gender.groupBy("Gender_Binary") \
    .agg(spark_sum("Purchase_Amount").alias("Total_Purchase_Amount")) \
    .show()

print("Average purchase by age group:")
data_cleaned.withColumn(
    "Age_Group",
    when((col("Age") >= 18) & (col("Age") <= 30), "18-30")
    .when((col("Age") >= 31) & (col("Age") <= 40), "31-40")
    .when((col("Age") >= 41) & (col("Age") <= 50), "41-50")
    .when((col("Age") >= 51) & (col("Age") <= 60), "51-60")
    .otherwise("61-70")
).groupBy("Age_Group") \
    .agg(avg("Purchase_Amount").alias("Average_Purchase_Amount")) \
    .show()

# Clean up
spark.stop()

################################################################################
Use PySpark DataFrame API to do the transformations
################################################################################


                                                                                

Total purchase amount by Gender:
Gender_Binary: 1, Total_Purchase_Amount: 24599.89005279541
Gender_Binary: 0, Total_Purchase_Amount: 28215.77996635437
Average purchase amount by Age group:
Age_Group: 18-30, Average_Purchase_Amount: 570.8131050899111
Age_Group: 41-50, Average_Purchase_Amount: 493.946000289917
Age_Group: 31-40, Average_Purchase_Amount: 555.2423526539523
Age_Group: 51-60, Average_Purchase_Amount: 494.51882250168745
Age_Group: 61-70, Average_Purchase_Amount: 533.576874256134

Final cleaned data:
+-----------+----------+---------+---+-------------+------------------+-------------+
|Customer_ID|First_Name|Last_Name|Age|Gender_Binary|   Purchase_Amount|Purchase_Date|
+-----------+----------+---------+---+-------------+------------------+-------------+
|         35|     Grace| Williams| 58|            1|139.00999450683594|   2024-04-23|
|         93|       Bob|Rodriguez| 26|            1| 891.4600219726562|   2024-01-22|
|         58|     Alice|    Jones| 50|            1| 397