In [0]:
from pyspark.sql import functions as F

# Read the file
customers_df = spark.table("cip.customers")

# Doing quick sanity checks (schema, few rows, etc)
customers_df.printSchema()
display(customers_df.limit(10))
print("Row Count:", customers_df.count())

root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- customer_segment: string (nullable = true)



customer_id,first_name,last_name,email,gender,date_of_birth,signup_date,region,customer_segment
1adf1f4d-87f2-431e-90d4-2bd1c8ad3971,Zachary,Jackson,gdavis@salazar.com,Other,1990-10-30,2025-08-03,South,Gold
b006f784-c562-4e8d-a830-c43fc1603096,Jill,Day,nunezlance@yahoo.com,Other,1993-04-12,2023-08-26,South,Gold
a528db6f-4f0c-42c9-9abe-d480cc5247dc,Caroline,Franklin,dianaflores@ruiz.info,Male,1999-10-03,2024-04-23,North,Silver
47edf296-13b8-4ff1-a436-ab6c98797df8,Deborah,Stone,edwardsricky@gmail.com,Male,1994-06-06,2023-03-23,East,Platinum
64f9c9e1-b0bb-4f56-af8c-00dc625f12c6,Alexander,Campbell,kimberly82@harris.com,Other,1988-10-18,2025-06-28,East,Silver
5d868d06-3fa3-4eca-8de4-4c54d0ea0e74,Beverly,Ramirez,jennifer66@gmail.com,Other,1976-10-12,2025-06-06,North,Platinum
9691c7cd-1d77-4053-a62c-876d461ab8f5,Ashley,Carlson,warnergeorge@hotmail.com,Male,1995-09-12,2023-02-04,West,Silver
5af5d6e5-b22e-45a3-8bed-2819edef3134,Charles,Rose,susan33@ramos.com,Male,1957-08-16,2024-03-09,South,Silver
4af30879-0276-4d31-a721-b296141d031e,Nicholas,Villarreal,itodd@hotmail.com,Other,1999-07-06,2023-02-28,South,Silver
aa4a4caa-1895-40b0-a5c1-fe818b431912,Kristin,Salazar,samantha74@wade-davis.net,Male,1970-02-03,2022-12-06,North,Platinum


Row Count: 10000


In [0]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DateType

# Add full name column first
customers_df = customers_df.withColumn(
    "name", F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
)

# 1) Data Audit Functions

def get_row_counts(df: DataFrame, id_col: str = None):
    total = df.count()
    distinct = df.select(id_col).distinct().count() if id_col else None
    return total, distinct


def get_null_counts(df: DataFrame):
    null_exprs = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    return (
        df.select(null_exprs)
          .toPandas()
          .T.rename(columns={0: "null_count"})
          .sort_values("null_count", ascending=False)
    )


def get_duplicate_rows(df: DataFrame, id_col: str):
    return (
        df.groupBy(id_col)
          .count()
          .filter(F.col("count") > 1)
          .orderBy(F.col("count").desc())
    )


def get_date_summary(df: DataFrame, date_col: str):
    return df.select(date_col).summary()


# 2) Data Cleaning Functions

def drop_duplicates(df: DataFrame, id_col: str):
    return df.dropDuplicates([id_col])


def fill_missing_values(df: DataFrame, strategy: dict):
    return df.fillna(strategy)


def standardize_strings(df: DataFrame, string_cols: list):
    for col in string_cols:
        df = df.withColumn(col, F.trim(F.col(col)))
    return df


def cast_to_date(df: DataFrame, date_cols: list, format="yyyy-MM-dd"):
    for col in date_cols:
        df = df.withColumn(col, F.to_date(F.col(col), format))
    return df

# 3) Apply on Customers Dataset

# --- Audit ---
total_rows, distinct_ids = get_row_counts(customers_df, "customer_id")
print(f"Total Rows: {total_rows}, Distinct IDs: {distinct_ids}")

display(get_null_counts(customers_df))
display(get_duplicate_rows(customers_df, "customer_id"))
display(get_date_summary(customers_df, "date_of_birth"))

# --- Cleaning ---
customers_clean_df = customers_df.transform(lambda df: drop_duplicates(df, "customer_id")) \
                                 .transform(lambda df: fill_missing_values(df, {"name": "Unknown"})) \
                                 .transform(lambda df: standardize_strings(df, ["name", "region"])) \
                                 .transform(lambda df: cast_to_date(df, ["date_of_birth", "signup_date"]))

display(customers_clean_df.limit(10))


Total Rows: 10000, Distinct IDs: 10000


null_count
0
0
0
0
0
0
0
0
0
0


customer_id,count


summary
count
mean
stddev
min
25%
50%
75%
max


customer_id,first_name,last_name,email,gender,date_of_birth,signup_date,region,customer_segment,name
1adf1f4d-87f2-431e-90d4-2bd1c8ad3971,Zachary,Jackson,gdavis@salazar.com,Other,1990-10-30,2025-08-03,South,Gold,Zachary Jackson
b006f784-c562-4e8d-a830-c43fc1603096,Jill,Day,nunezlance@yahoo.com,Other,1993-04-12,2023-08-26,South,Gold,Jill Day
a528db6f-4f0c-42c9-9abe-d480cc5247dc,Caroline,Franklin,dianaflores@ruiz.info,Male,1999-10-03,2024-04-23,North,Silver,Caroline Franklin
47edf296-13b8-4ff1-a436-ab6c98797df8,Deborah,Stone,edwardsricky@gmail.com,Male,1994-06-06,2023-03-23,East,Platinum,Deborah Stone
64f9c9e1-b0bb-4f56-af8c-00dc625f12c6,Alexander,Campbell,kimberly82@harris.com,Other,1988-10-18,2025-06-28,East,Silver,Alexander Campbell
5d868d06-3fa3-4eca-8de4-4c54d0ea0e74,Beverly,Ramirez,jennifer66@gmail.com,Other,1976-10-12,2025-06-06,North,Platinum,Beverly Ramirez
9691c7cd-1d77-4053-a62c-876d461ab8f5,Ashley,Carlson,warnergeorge@hotmail.com,Male,1995-09-12,2023-02-04,West,Silver,Ashley Carlson
5af5d6e5-b22e-45a3-8bed-2819edef3134,Charles,Rose,susan33@ramos.com,Male,1957-08-16,2024-03-09,South,Silver,Charles Rose
4af30879-0276-4d31-a721-b296141d031e,Nicholas,Villarreal,itodd@hotmail.com,Other,1999-07-06,2023-02-28,South,Silver,Nicholas Villarreal
aa4a4caa-1895-40b0-a5c1-fe818b431912,Kristin,Salazar,samantha74@wade-davis.net,Male,1970-02-03,2022-12-06,North,Platinum,Kristin Salazar


Duplicates after cleaning: 


customer_id,count


Null Counts after cleaning: 


null_count
0
0
0
0
0
0
0
0
0
0


Schema after cleaning: 
root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- customer_segment: string (nullable = true)
 |-- name: string (nullable = false)



In [0]:
# Ingesting Product Dataset
products_df = spark.table("cip.products") 

# Audit Products Dataset
total_rows, distinct_ids = get_row_counts(products_df, "product_id")
print(f"Total Rows: {total_rows}, Distinct IDs: {distinct_ids}")

display(get_null_counts(products_df))
display(get_duplicate_rows(products_df, "product_id"))

# Clean Products dataset
products_clean_df = products_df.transform(lambda df: drop_duplicates(df, "product_id")) \
                               .transform(lambda df: fill_missing_values(df, {"category" : "Unknown", "brand" : "Unknown"})) \
                               .transform(lambda df:standardize_strings(df, ["category", "brand"]))

# Final audit after cleaning
print("Products dataset after cleaning: ")
display(get_null_counts(products_clean_df))
products_clean_df.printSchema()

Total Rows: 500, Distinct IDs: 500


null_count
0
0
0
0
0
0


product_id,count


Products dataset after cleaning: 


null_count
0
0
0
0
0
0


root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = false)
 |-- brand: string (nullable = false)
 |-- cost_price: double (nullable = true)
 |-- selling_price: double (nullable = true)



In [0]:
# Ingesting Transactions Dataset
transactions_df = spark.table("cip.transactions")

# Audit Transaction Datasets
total_rows, distinct_ids = get_row_counts(transactions_df, "transaction_id")
print(f"Transactions --> Total Rows: {total_rows}, Distinct IDs: {distinct_ids}")

display(get_null_counts(transactions_df))
display(get_duplicate_rows(transactions_df, "transaction_id"))
display(get_date_summary(transactions_df, "transaction_date"))

# Clean Transactions Dataset
transactions_clean_df = transactions_df.transform(lambda df: drop_duplicates(df, "transaction_id")) \
                                       .transform(lambda df: fill_missing_values(df, {"payment_method" : "Unknown",
                                                                                      "product_id" : "Unknown"})) \
                                       .transform(lambda df:standardize_strings(df, ["payment_method"])) \
                                       .transform(lambda df: cast_to_date(df, ["transaction_date"]))

# Final Audit afterr cleaning
print("Transactions dataset after cleaning: ")
display(get_null_counts(transactions_clean_df))
transactions_clean_df.printSchema()

Transactions --> Total Rows: 50000, Distinct IDs: 50000


null_count
0
0
0
0
0
0
0
0


transaction_id,count


summary
count
mean
stddev
min
25%
50%
75%
max


Transactions dataset after cleaning: 


null_count
0
0
0
0
0
0
0
0


root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = false)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_method: string (nullable = false)
 |-- transaction_date: date (nullable = true)



In [0]:
# Ingesting Returns Dataset
returns_df = spark.table("cip.returns")

# Audit Returns Dataset
total_rows, distinct_ids = get_row_counts(returns_df, "return_id")
print(f"Returns --> Total Rows: {total_rows}, Distinct IDs: {distinct_ids}")

display(get_null_counts(returns_df))
display(get_duplicate_rows(returns_df, "return_id"))
display(get_date_summary(returns_df, "return_date"))

# Clean Returns Dataset
returns_clean_df = returns_df.transform(lambda df: drop_duplicates(df, "return_id")) \
                              .transform(lambda df: fill_missing_values(df, {
                                  "return_reason" : "Not Specified", 
                                  "product_id" : "Unknown"
                              })) \
                              .transform(lambda df:standardize_strings(df, ["return_reason"])) \
                              .transform(lambda df: cast_to_date(df, ["return_date"]))

# Final Audit after cleaning
print("Returns dataset after cleaning: ")
display(get_null_counts(returns_clean_df))
returns_clean_df.printSchema()

Returns --> Total Rows: 1500, Distinct IDs: 1500


null_count
0
0
0
0
0
0
0


return_id,count


summary
count
mean
stddev
min
25%
50%
75%
max


Returns dataset after cleaning: 


null_count
0
0
0
0
0
0
0


root
 |-- return_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = false)
 |-- amount: double (nullable = true)
 |-- return_reason: string (nullable = false)
 |-- return_date: date (nullable = true)



In [0]:
# Ingesting Support Tickets Dataset
support_df = spark.table("cip.support_tickets")

# Audit Support Tickets dataset
total_rows, distinct_ids = get_row_counts(support_df, "ticket_id")
print(f"Support Tickets → Total Rows: {total_rows}, Distinct IDs: {distinct_ids}")

display(get_null_counts(support_df))
display(get_duplicate_rows(support_df, "ticket_id"))
display(get_date_summary(support_df, "opened_date"))
display(get_date_summary(support_df, "resolved_date"))

# Clean Support Tickets dataset
support_clean_df = support_df.transform(lambda df: drop_duplicates(df, "ticket_id")) \
                             .transform(lambda df: fill_missing_values(df, {
                                 "priority": "Medium",
                                 "status": "Open",
                                 "ticket_category": "General",
                                 "customer_id": "Unknown"
                             })) \
                             .transform(lambda df: standardize_strings(df, ["priority", "status", "ticket_category"])) \
                             .transform(lambda df: cast_to_date(df, ["opened_date", "resolved_date"]))

# Final audit after cleaning
print("Support Tickets dataset after cleaning:")
display(get_null_counts(support_clean_df))
support_clean_df.printSchema()

Support Tickets → Total Rows: 10000, Distinct IDs: 10000


null_count
5019
0
0
0
0
0
0


ticket_id,count


summary
count
mean
stddev
min
25%
50%
75%
max


summary
count
mean
stddev
min
25%
50%
75%
max


Support Tickets dataset after cleaning:


null_count
5019
0
0
0
0
0
0


root
 |-- ticket_id: string (nullable = true)
 |-- customer_id: string (nullable = false)
 |-- ticket_category: string (nullable = false)
 |-- priority: string (nullable = false)
 |-- status: string (nullable = false)
 |-- opened_date: date (nullable = true)
 |-- resolved_date: date (nullable = true)



In [0]:
# Ingesting Web Activity Dataset Tickets Dataset
web_df = spark.table("cip.web_activity")

# Audit Web Activity dataset
total_rows, distinct_ids = get_row_counts(web_df, "session_id")
print(f"Web Activity → Total Rows: {total_rows}, Distinct IDs: {distinct_ids}")

display(get_null_counts(web_df))
display(get_duplicate_rows(web_df, "session_id"))
display(get_date_summary(web_df, "timestamp"))

# Clean Web Activity dataset
web_clean_df = web_df.transform(lambda df: drop_duplicates(df, "session_id")) \
                     .transform(lambda df: fill_missing_values(df, {
                         "activity_type": "Unknown",
                         "product_id": "Unknown"
                     })) \
                     .transform(lambda df: standardize_strings(df, ["activity_type"])) \
                     .transform(lambda df: cast_to_date(df, ["timestamp"]))

# Final audit after cleaning
print("Web Activity dataset after cleaning:")
display(get_null_counts(web_clean_df))
web_clean_df.printSchema()


Web Activity → Total Rows: 30000, Distinct IDs: 30000


null_count
9022
0
0
0
0


session_id,count


summary
count
mean
stddev
min
25%
50%
75%
max


Web Activity dataset after cleaning:


null_count
0
0
0
0
0


root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- activity_type: string (nullable = false)
 |-- product_id: string (nullable = false)
 |-- timestamp: date (nullable = true)



In [0]:
# null count with column names preserved
null_counts_named = web_df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in web_df.columns]
)
display(null_counts_named)


session_id,customer_id,activity_type,product_id,timestamp
0,0,0,9022,0


In [0]:
# Handling Null in Web_df
web_clean_df = web_df.transform(lambda df: drop_duplicates(df, "session_id")) \
                     .transform(lambda df: fill_missing_values(df, {
                         "product_id": "Unknown_Product"
                     })) \
                     .transform(lambda df: standardize_strings(df, ["activity_type"])) \
                     .transform(lambda df: cast_to_date(df, ["timestamp"]))

# Final audit after cleaning
print("Web Activity dataset after cleaning:")
display(get_null_counts(web_clean_df))
web_clean_df.printSchema()

Web Activity dataset after cleaning:


null_count
0
0
0
0
0


root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- activity_type: string (nullable = true)
 |-- product_id: string (nullable = false)
 |-- timestamp: date (nullable = true)



In [0]:
# Saving all the cleaned files 
customers_clean_df.toPandas().to_csv("customers_clean.csv", index=False)
products_clean_df.toPandas().to_csv("products_clean.csv", index=False)
transactions_clean_df.toPandas().to_csv("transactions_clean.csv", index=False)
returns_clean_df.toPandas().to_csv("returns_clean.csv", index=False)
support_clean_df.toPandas().to_csv("support_clean.csv", index=False)
web_clean_df.toPandas().to_csv("web_clean.csv", index=False)
