---

# <span style="color: orange;"> 1. Setup Working Environment and constants </span>

In [1]:
!pwd

/home/jovyan


In [2]:
!ls data/

CARS.csv  CUSTOMERS.csv  HOUSEHOLDS.csv


In [3]:
# Declare constant paths
DATA_DIR        =   "/home/jovyan/data"
CARS_CSV        =   f"{DATA_DIR}/CARS.csv"
CUSTOMERS_CSV   =   f"{DATA_DIR}/CUSTOMERS.csv"
HOUSEHOLDS_CSV  =   f"{DATA_DIR}/HOUSEHOLDS.csv"
OUTPUT_DIR        =   "/home/jovyan/output"

In [4]:
# Initialize Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .appName("SF-Data-Engineering-Work-Sample") \
                    .master("local[*]") \
                    .config("spark.driver.memory", "4g") \
                    .config("spark.executor.memory", "8g") \
                    .getOrCreate()
spark

---

# <span style="color: orange;"> 2. Load all the Datasets </span>

In [5]:
# Load the CARS_CSV file into a Spark DataFrame
cars_df = spark.read.csv(CARS_CSV, header=True, inferSchema=True)

print('-'*30)
print('FIRST 5 Records from CARS:')
print('-'*30, '\n')
cars_df.show(5)

print('\n')
print('-'*30)
print('Schema for CARS:')
print('-'*30, '\n')
cars_df.printSchema()

------------------------------
FIRST 5 Records from CARS:
------------------------------ 

+------+--------+-----+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|Car ID|  Status|State|Model Year|         Make|Body Style|Vehicle Value|Annual Miles Driven|Business Use|Antique Vehicle|Lien|Lease|Driver Safety Discount|Vehicle Safety Discount|Claim Payout|6 Month Premium Amount|
+------+--------+-----+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+----------------------+
|844435|In Force|   OK|      1990|Manufacturer7|    4 door|      50000.0|                 56|           0|              1|   1|    0|                     0|                      1|           0|                 42.89|
|410619|In Force|   OK|      2019|Manufac

In [6]:
# Load the CUSTOMERS_CSV file into a Spark DataFrame
customers_df = spark.read.csv(CUSTOMERS_CSV, header=True, inferSchema=True)
print('-'*30)
print('FIRST 5 Records from CUSTOMERS:')
print('-'*30, '\n')
customers_df.show(5)

print('\n')
print('-'*30)
print('Schema for CUSTOMERS:')
print('-'*30, '\n')
customers_df.printSchema()

------------------------------
FIRST 5 Records from CUSTOMERS:
------------------------------ 

+---------+-------------+--------------+--------------------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  CUST_ID|Date of Birth|Marital Status|     Employment Type|Income| _c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|
+---------+-------------+--------------+--------------------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|801198110|      11/6/02|             D|             Student|     0|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|281855167|      6/23/55|             D|             Retired|     0|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|688373183|      8/16/96|             M|Office and Admini...| 42052|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|752746800|      8/22/36|             M|

In [7]:
# Load the HOUSEHOLDS_CSV file into a Spark DataFrame
households_df = spark.read.csv(HOUSEHOLDS_CSV, header=True, inferSchema=True)
print('-'*35)
print('FIRST 5 Records from HOUSEHOLDS:')
print('-'*35, '\n')
households_df.show(5)

print('\n')
print('-'*35)
print('Schema for HOUSEHOLDS:')
print('-'*35, '\n')
households_df.printSchema()

-----------------------------------
FIRST 5 Records from HOUSEHOLDS:
----------------------------------- 

+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|    HH_ID|  CUST_ID|CAR_ID|Active HH|HH Start Date|  Phone Number| ZIP |State|Country|Referral Source|
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|219790301|801198110|844435|        1|     11/18/22|(709) 379-9036|70442|   OK|    USA|          Other|
|219790301|281855167|410619|        1|     11/18/22|(740) 565-4060|70442|   OK|    USA|          Other|
|219790301|688373183|192812|        1|     11/18/22|(117) 457-9582|70442|   OK|    USA|          Other|
|219790301|752746800|752033|        1|     11/18/22|(536) 797-5920|70442|   OK|    USA|          Other|
|464806390|114187354| 23783|        1|      10/9/20|(152) 373-1773|42706|   NY|    USA|          Event|
+---------+---------+------+---------+-------------+---------

---

# <span style="color: orange;"> 3. Initial Data Cleaning </span>

> ***The column names contain spaces and are inconsistent across the datasets, not adhering to best practices. Fixing them below:***

In [8]:
# Renaming columns in CARS DataFrame following best practices
cars_df   =  cars_df.withColumnRenamed('Car ID', 'car_id') \
                    .withColumnRenamed('Status', 'status') \
                    .withColumnRenamed('State', 'state') \
                    .withColumnRenamed('Model Year', 'model_year') \
                    .withColumnRenamed('Make', 'make') \
                    .withColumnRenamed('Body Style', 'body_style') \
                    .withColumnRenamed('Vehicle Value', 'vehicle_value') \
                    .withColumnRenamed('Annual Miles Driven', 'annual_miles_driven') \
                    .withColumnRenamed('Business Use', 'business_use') \
                    .withColumnRenamed('Antique Vehicle', 'antique_vehicle') \
                    .withColumnRenamed('Lien', 'lien') \
                    .withColumnRenamed('Lease', 'lease') \
                    .withColumnRenamed('Driver Safety Discount', 'driver_safety_discount') \
                    .withColumnRenamed('Vehicle Safety Discount', 'vehicle_safety_discount') \
                    .withColumnRenamed('Claim Payout', 'claim_payout') \
                    .withColumnRenamed('6 Month Premium Amount', 'six_month_premium_amount')

print('-'*35)
print(f"Updated CARS Schema:")
print('-'*35, '\n')

cars_df.printSchema()

-----------------------------------
Updated CARS Schema:
----------------------------------- 

root
 |-- car_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- state: string (nullable = true)
 |-- model_year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- body_style: string (nullable = true)
 |-- vehicle_value: double (nullable = true)
 |-- annual_miles_driven: integer (nullable = true)
 |-- business_use: integer (nullable = true)
 |-- antique_vehicle: integer (nullable = true)
 |-- lien: integer (nullable = true)
 |-- lease: integer (nullable = true)
 |-- driver_safety_discount: integer (nullable = true)
 |-- vehicle_safety_discount: integer (nullable = true)
 |-- claim_payout: integer (nullable = true)
 |-- six_month_premium_amount: double (nullable = true)



In [9]:
customers_df = customers_df.withColumnRenamed('CUST_ID', 'cust_id') \
                            .withColumnRenamed('Date of Birth', 'date_of_birth') \
                            .withColumnRenamed('Marital Status', 'marital_status') \
                            .withColumnRenamed('Employment Type', 'employment_type') \
                            .withColumnRenamed('Income', 'income')

print('-'*35)
print(f"Updated CUSTOMERS Schema:")
print('-'*35, '\n')

customers_df.printSchema()

-----------------------------------
Updated CUSTOMERS Schema:
----------------------------------- 

root
 |-- cust_id: integer (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- income: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



In [10]:
# Renaming columns in HOUSEHOLDS DataFrame following best practices
households_df = households_df.withColumnRenamed('HH_ID', 'hh_id') \
                                .withColumnRenamed('CUST_ID', 'cust_id') \
                                .withColumnRenamed('CAR_ID', 'car_id') \
                                .withColumnRenamed('Active HH', 'active_hh') \
                                .withColumnRenamed('HH Start Date', 'hh_start_date') \
                                .withColumnRenamed('Phone Number', 'phone_number') \
                                .withColumnRenamed('ZIP ', 'zip') \
                                .withColumnRenamed('State', 'state') \
                                .withColumnRenamed('Country', 'country') \
                                .withColumnRenamed('Referral Source', 'referral_source')

print('-'*35)
print(f"Updated HOUSEHOLDS Schema:")
print('-'*35, '\n')

households_df.printSchema()

-----------------------------------
Updated HOUSEHOLDS Schema:
----------------------------------- 

root
 |-- hh_id: integer (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- car_id: integer (nullable = true)
 |-- active_hh: integer (nullable = true)
 |-- hh_start_date: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- referral_source: string (nullable = true)



***<u style="color: green;">OBSERVATION:</u> &nbsp;***
- ***From the sample records in Section 2, the CUSTOMERS DataFrame seems to have unnecessary (blank) columns. Checking if they can be dropped to reduce data size before combining the datasets:***

In [11]:
from pyspark.sql.functions import col, sum as spark_sum

cust_columns_to_check = customers_df.columns
cust_null_count_expressions = [spark_sum(col(column).isNull().cast("int")).alias(column) for column in cust_columns_to_check]
cust_null_counts_df = customers_df.select(cust_null_count_expressions)

print('-'*35)
print("Null counts for each column:")
print('-'*35, '\n')

cust_null_counts_df.show()

print('-'*35)
print("Total rows in CUSTOMERS DF:")
print('-'*35, '\n')

total_cust_rows = customers_df.count()
print(total_cust_rows)

-----------------------------------
Null counts for each column:
----------------------------------- 

+-------+-------------+--------------+---------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|cust_id|date_of_birth|marital_status|employment_type|income|   _c5|   _c6|   _c7|   _c8|   _c9|  _c10|  _c11|  _c12|  _c13|  _c14|  _c15|  _c16|  _c17|  _c18|  _c19|
+-------+-------------+--------------+---------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|      0|            0|             0|              0|     0|499999|499999|499999|499999|499999|499999|499999|499999|499999|499999|499999|499999|499999|499999|499999|
+-------+-------------+--------------+---------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+

-----------------------------------
Total rows in CUSTOMERS D

***<u style="color: green;">OBSERVATION:</u> &nbsp;***
- ***All the records have null values in columns _c5 to _c19, so they can be dropped safely.***


In [12]:
columns_to_drop = [f'_c{i}' for i in range(5, 20)]     # all the columns from _c5 to _c19 only have NULL values.
customers_df = customers_df.drop(*columns_to_drop)
customers_df.printSchema()

root
 |-- cust_id: integer (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- income: string (nullable = true)



***<u style="color: green;">OBSERVATION:</u> &nbsp;***
- ***Now, all the columns in the CARS, CUSTOMERS, and HOUSEHOLDS DataFrames have some values.***
  
- ***We can handle any more NULL values after combining the DataFrames to avoid premature dropping of records or missing value imputation.***

> ***Here is an overview of the number of NULL values per column in each DataFrame:***

In [13]:
from pyspark.sql.functions import col, sum as spark_sum

def calculate_null_counts(df, df_name):
    print("#"*20)
    print(f"{df_name} DataFrame:")
    print("#"*20, "\n")

    null_count_expressions = [spark_sum(col(column).isNull().cast("int")).alias(column) for column in df.columns]
    null_counts_df = df.select(null_count_expressions)
    total_rows = df.count()
    
    print('-'*35)
    print(f"Null counts for {df_name} DataFrame:")
    print('-'*35, '\n')
    null_counts_df.show()

    print('-'*35)
    print(f"Total rows in {df_name} DataFrame:")
    print('-'*35, '\n')
    print(total_rows)

    print("\n\n")

# Calculate null counts for CARS DataFrame
calculate_null_counts(cars_df, "CARS")

# Calculate null counts for CUSTOMERS DataFrame
calculate_null_counts(customers_df, "CUSTOMERS")

# Calculate null counts for HOUSEHOLDS DataFrame
calculate_null_counts(households_df, "HOUSEHOLDS")

####################
CARS DataFrame:
#################### 

-----------------------------------
Null counts for CARS DataFrame:
----------------------------------- 

+------+------+-----+----------+----+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|car_id|status|state|model_year|make|body_style|vehicle_value|annual_miles_driven|business_use|antique_vehicle|lien|lease|driver_safety_discount|vehicle_safety_discount|claim_payout|six_month_premium_amount|
+------+------+-----+----------+----+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|     0|     0|    0|         0|   0|         0|            0|                  0|           0|              0|   0|    0|                     0|                      0|           0|                       0|
+-

***<u style="color: green;">OBSERVATION:</u> &nbsp;***
- ***There aren't any NULL values in the datasets now.***

In [14]:
from pyspark.sql.functions import col, sum as spark_sum

def show_sample_records(df, df_name):
    print("#"*40)
    print(f"Sample records from {df_name} DataFrame:")
    print("#"*40, "\n")
    df.show(5)
    print("\n")

show_sample_records(cars_df, "CARS")
show_sample_records(customers_df, "CUSTOMERS")
show_sample_records(households_df, "HOUSEHOLDS")

########################################
Sample records from CARS DataFrame:
######################################## 

+------+--------+-----+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|car_id|  status|state|model_year|         make|body_style|vehicle_value|annual_miles_driven|business_use|antique_vehicle|lien|lease|driver_safety_discount|vehicle_safety_discount|claim_payout|six_month_premium_amount|
+------+--------+-----+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|844435|In Force|   OK|      1990|Manufacturer7|    4 door|      50000.0|                 56|           0|              1|   1|    0|                     0|                      1|           0|                   42.89|
|410

> ***Checking for duplicate records before merging the DataFrames to avoid unintended joins.***

In [15]:
def check_for_full_duplicates(df, df_name):
    total_count = df.count()
    unique_count = df.dropDuplicates().count()
    duplicate_count = total_count - unique_count
    if duplicate_count > 0:
        print(f"{df_name} DataFrame has has {duplicate_count} fully duplicated rows.")
    else:
        print(f"No fully duplicated rows found in {df_name} DataFrame.")

check_for_full_duplicates(cars_df, "CARS")
check_for_full_duplicates(customers_df, "CUSTOMERS")
check_for_full_duplicates(households_df, "HOUSEHOLDS")

No fully duplicated rows found in CARS DataFrame.
No fully duplicated rows found in CUSTOMERS DataFrame.
No fully duplicated rows found in HOUSEHOLDS DataFrame.


In [16]:
def check_duplicates(df, primary_key, df_name):
    duplicate_count = df.groupBy(primary_key).count().filter("count > 1").count()
    if duplicate_count > 0:
        print(f"{df_name} DataFrame has {duplicate_count} duplicate records based on {primary_key}.")
    else:
        print(f"No duplicate records found in {df_name}  DataFrame based on {primary_key}.")

check_duplicates(cars_df, "car_id", "CARS")
check_duplicates(customers_df, "cust_id", "CUSTOMERS")
check_duplicates(households_df, "hh_id", "HOUSEHOLDS")

CARS DataFrame has 95051 duplicate records based on car_id.
CUSTOMERS DataFrame has 149 duplicate records based on cust_id.
HOUSEHOLDS DataFrame has 131352 duplicate records based on hh_id.


***<u style="color: green;">OBSERVATION:</u> &nbsp;***
- ***Approximately 20% of records in CARS, 0.03% of records in CUSTOMERS, and 26% of records in HOUSEHOLDS have duplicate IDs, which is not ideal.***

- ***Inspect the duplicate records to take a strategic approach in handling them without losing valuable information.***


In [17]:
from pyspark.sql.functions import col

# Function to inspect duplicate records based on 'id' columns
def inspect_duplicates(df, primary_key, df_name):
    duplicate_keys = df.groupBy(primary_key).count().filter("count > 1").select(primary_key).limit(5)  # Limiting to 5 for inspection
    duplicates = df.join(duplicate_keys, on=primary_key, how='inner').orderBy(primary_key)

    print(f"Duplicate records in {df_name} DataFrame based on {primary_key}:")
    duplicates.show(truncate=False)

# Inspect duplicates in CARS DataFrame
inspect_duplicates(cars_df, "car_id", "CARS")

Duplicate records in CARS DataFrame based on car_id:
+------+---------------------+-----+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|car_id|status               |state|model_year|make         |body_style|vehicle_value|annual_miles_driven|business_use|antique_vehicle|lien|lease|driver_safety_discount|vehicle_safety_discount|claim_payout|six_month_premium_amount|
+------+---------------------+-----+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|7982  |Customer Cancellation|HI   |2018      |Manufacturer7|2 door    |27416.9      |5898               |0           |0              |1   |0    |0                     |1                      |0           |156.8435672             |
|7982  |Customer Ca

In [18]:
distinct_status_values = cars_df.select("status").distinct()
distinct_status_values.show(truncate=False)

+---------------------+
|status               |
+---------------------+
|In Force             |
|Company Cancellation |
|Non Pay              |
|Customer Cancellation|
+---------------------+



In [19]:
# Define a function to check and print duplicate count based on a combination of columns
def check_combination_duplicates(df, columns, combination_name):
    duplicate_count = df.groupBy(columns).count().filter("count > 1").count()
    print(f"Duplicate count for {combination_name}: {duplicate_count}")

# Checking duplicates for different combinations of columns
check_combination_duplicates(cars_df, ["car_id", "status"], "car_id + status")
check_combination_duplicates(cars_df, ["car_id", "state"], "car_id + state")
check_combination_duplicates(cars_df, ["car_id", "model_year"], "car_id + model_year")
check_combination_duplicates(cars_df, ["car_id", "make"], "car_id + make")


Duplicate count for car_id + status: 78948
Duplicate count for car_id + state: 0
Duplicate count for car_id + model_year: 4252
Duplicate count for car_id + make: 24730


***<u style="color: green;">OBSERVATION:</u> &nbsp;***

 ***The above stats indicate that (car_id, state) form a unique combination in the CARS dataset, meaning that car_ids may have been assigned uniquely for each state instead of globally.***

> ***DQ Checks for CARS DataFrame:***

In [20]:
from pyspark.sql import functions as F
from pyspark.sql.functions import year, current_date

# Check if state is valid
valid_states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", 
                "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
                "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
                "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
                "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

invalid_states_df = cars_df.filter(~col("state").isin(valid_states))
invalid_state_count = invalid_states_df.count()
print(f"Count of invalid state records: {invalid_state_count}")
invalid_states_df.show(5)

# Model year not in a valid range
model_year_outliers = cars_df.filter((F.col("model_year") < 1900) | (F.col("model_year") > 2024)).count()
print(f"Records with invalid model_year: {model_year_outliers}")

# Check if 'Make' values are valid
distinct_make_values = cars_df.select("make").distinct()
distinct_make_values.show(truncate=False)

# Check if 'body_style' values are valid
distinct_body_style_values = cars_df.select("body_style").distinct()
distinct_body_style_values.show(truncate=False)

# Check if Vehicle value is negative
vehicle_value_outliers = cars_df.filter(F.col("vehicle_value") < 0).count()
print(f"Records with negative vehicle_value: {vehicle_value_outliers}")

# Check if Annual miles driven is negative
miles_driven_outliers = cars_df.filter(F.col("annual_miles_driven") < 0).count()
print(f"Records with negative annual_miles_driven: {miles_driven_outliers}")


# Check if Business Use is non-binary:
invalid_business_use = cars_df.filter((F.col("business_use") != 0) & (F.col("business_use") != 1)).count()
print(f"Records with invalid business_use values: {invalid_business_use}")

# Antique vehicle should be older than 25 years
invalid_antique_vehicle = cars_df.filter((F.col("antique_vehicle") == 1) & (F.col("model_year") >= (year(current_date()) - 25))).count()
print(f"Records with invalid antique_vehicle: {invalid_antique_vehicle}")

# Check if both lien and lease are 1
invalid_lien_lease = cars_df.filter((F.col("lease") == 1) & (F.col("lien") == 1) ).count()
print(f"Records where lien and lease are 1: {invalid_lien_lease}")

# Check for discrepancy in Discount Logic (discounts should be 0 if status="Customer Cancelled")
invalid_discount_status = cars_df.filter((F.col("status") == "Customer Cancellation") & ((F.col("driver_safety_discount") == 1) | (F.col("vehicle_safety_discount") == 1))).count()
print(f"Records with invalid discount for canceled vehicles: {invalid_discount_status}")

# Check if six_month_premium_amount is negative
amount_outliers = cars_df.filter(F.col("six_month_premium_amount") < 0).count()
print(f"Records with negative annual_miles_driven: {amount_outliers}")

Count of invalid state records: 0
+------+------+-----+----------+----+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|car_id|status|state|model_year|make|body_style|vehicle_value|annual_miles_driven|business_use|antique_vehicle|lien|lease|driver_safety_discount|vehicle_safety_discount|claim_payout|six_month_premium_amount|
+------+------+-----+----------+----+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
+------+------+-----+----------+----+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+

Records with invalid model_year: 0
+-------------+
|make         |
+-------------+
|Manufacturer5|
|Manufacturer7|
|Manufacturer1|
|M

In [21]:
# Inspect duplicates in CUSTOMERS DataFrame
inspect_duplicates(customers_df, "cust_id", "CUSTOMERS")

Duplicate records in CUSTOMERS DataFrame based on cust_id:
+---------+-------------+--------------+---------------------------------+------+
|cust_id  |date_of_birth|marital_status|employment_type                  |income|
+---------+-------------+--------------+---------------------------------+------+
|227538866|2/18/49      |S             |Retired                          |0     |
|227538866|4/7/92       |M             |Information Technology           |94119 |
|312991279|1/26/51      |S             |Retired                          |0     |
|312991279|5/6/50       |M             |Retired                          |0     |
|461580480|3/26/99      |M             |Sales and Related                |44900 |
|461580480|9/23/91      |M             |Education, Training, and Library |51942 |
|471247910|2/28/08      |S             |Healthcare Practitioner          |203297|
|471247910|6/21/94      |S             |Management                       |146625|
|602067883|2/20/74      |M             

In [22]:
# Check number of cust_ids having count > 1
from pyspark.sql import functions as F

cust_id_counts = customers_df.groupBy("cust_id").agg(F.count("*").alias("count_of_records"))
record_count_summary = cust_id_counts.groupBy("count_of_records").agg(F.count("*").alias("number_of_cust_ids"))
record_count_summary.orderBy("count_of_records").show(truncate=False)

+----------------+------------------+
|count_of_records|number_of_cust_ids|
+----------------+------------------+
|1               |499701            |
|2               |149               |
+----------------+------------------+



In [23]:
# Inspect the age difference between records having same cust_ids to gauge possible relationships
age_diff_df = (
    customers_df
    .groupBy("cust_id")
    .agg(
        F.collect_list("date_of_birth").alias("dob_list")
    )
    .filter(F.size("dob_list") == 2)  # To only consider the 149 cust_ids with exactly 2 DOBs
)

# Calculate the approximate age difference in years
age_diff_result = age_diff_df.select(
    "cust_id",
    F.abs(F.year(F.to_date(F.col("dob_list")[0], 'M/d/yy')) - F.year(F.to_date(F.col("dob_list")[1], 'M/d/yy'))).alias("age_diff")
)

age_diff_result.show(truncate=False)

+---------+--------+
|cust_id  |age_diff|
+---------+--------+
|109866437|0       |
|168700118|26      |
|227538866|43      |
|312991279|1       |
|420542258|22      |
|440410970|39      |
|461580480|8       |
|471247910|86      |
|519811680|28      |
|536960589|25      |
|547773814|81      |
|574537471|60      |
|602067883|10      |
|693618114|85      |
|738641435|52      |
|764819779|44      |
|770469307|42      |
|810195055|22      |
|932153522|12      |
|949920394|65      |
+---------+--------+
only showing top 20 rows



***<u style="color: green;">OBSERVATION:</u> &nbsp;***
- **The above results indicate that 0.06% of the cust_ids are duplicates, with a count of 2.**

- **Given the age gaps (in years) as seen above, the duplicates could represent family members (e.g., spouses, siblings, children, or grandchildren) added to the same insurance account.**

- **These individuals are ideally living in the same household as the primary insured.**


In [24]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_spec = Window.partitionBy("cust_id").orderBy("date_of_birth")

ranked_customers_df = (customers_df.withColumn("row_number", F.row_number().over(window_spec)))

new_cust_df = ranked_customers_df.filter(F.col("row_number") == 1).drop("row_number")

new_cust_df.show(truncate=False)

+---------+-------------+--------------+---------------------------------+------+
|cust_id  |date_of_birth|marital_status|employment_type                  |income|
+---------+-------------+--------------+---------------------------------+------+
|100000879|4/5/34       |M             |Retired                          |0     |
|100031685|10/3/01      |S             |Management                       |105253|
|100043441|5/2/97       |M             |Student                          |0     |
|100052059|4/18/80      |M             |Business and Financial Operations|95633 |
|100090208|12/12/03     |S             |Student                          |0     |
|100150473|4/9/31       |M             |Retired                          |0     |
|100151275|5/6/97       |W             |Information Technology           |98536 |
|100152292|9/18/00      |M             |Information Technology           |138003|
|100154073|1/24/43      |M             |Retired                          |0     |
|100175646|4/17/

In [25]:
new_cust_df.count()

499850

In [26]:
# Inspect duplicates in the new DataFrame
inspect_duplicates(new_cust_df, "cust_id", "CUSTOMERS")

Duplicate records in CUSTOMERS DataFrame based on cust_id:
+-------+-------------+--------------+---------------+------+
|cust_id|date_of_birth|marital_status|employment_type|income|
+-------+-------------+--------------+---------------+------+
+-------+-------------+--------------+---------------+------+



> ***DQ Checks for CUSTOMERS DataFrame:***

In [27]:
from pyspark.sql.functions import col, regexp_extract, when, isnan, length, count, sum as spark_sum, to_date, regexp_extract

# Check for invalid date format or invalid date
invalid_dob_count = new_cust_df.filter(
                        # Either the format is incorrect
                        (regexp_extract(col("date_of_birth"), r"^\d{1,2}/\d{1,2}/\d{2}$", 0) == "") | 
                        # Or the date is not valid
                        (to_date(col("date_of_birth"), "M/d/yy").isNull())
                    ).count()
print(f"Invalid date_of_birth count: {invalid_dob_count}")

# Check for valid marital_status values (S: Single, M: Married, D: Divorced, W: Widowed)
valid_marital_status_values = ["S", "M", "D", "W"]
invalid_marital_status_count = new_cust_df.filter(~col("marital_status").isin(valid_marital_status_values)).count()
print(f"Invalid marital_status count: {invalid_marital_status_count}")

# Check for valid employment_type (should not be null or empty)
unique_employment_types = new_cust_df.select("employment_type").distinct()
print("Unique Employment types from CUSTOMERS DataFrame are:")
unique_employment_types.show(truncate=False)

invalid_employment_type_df = new_cust_df.filter((col("employment_type").isNull()) | (length(col("employment_type")) == 0))
invalid_employment_type_count = invalid_employment_type_df.count()
print(f"Invalid employment_type count: {invalid_employment_type_count}")

# Check for invalid income
invalid_income_df = new_cust_df.filter(
    col("income").cast("float").isNull() | (col("income").cast("float") < 0)
)
invalid_income_count = invalid_income_df.count()
print(f"Invalid income count: {invalid_income_count}")
invalid_income_df.show(n=5, truncate=False)

Invalid date_of_birth count: 0
Invalid marital_status count: 0
Unique Employment types from CUSTOMERS DataFrame are:
+---------------------------------+
|employment_type                  |
+---------------------------------+
|Healthcare Practitioner          |
|#REF!                            |
|Student                          |
|Management                       |
|Information Technology           |
|Education, Training, and Library |
|Office and Administrative Support|
|Business and Financial Operations|
|Sales and Related                |
|Retired                          |
+---------------------------------+

Invalid employment_type count: 0
Invalid income count: 5464
+---------+-------------+--------------+---------------+------+
|cust_id  |date_of_birth|marital_status|employment_type|income|
+---------+-------------+--------------+---------------+------+
|100578501|10/24/88     |D             |#REF!          |#REF! |
|101470240|1/22/88      |M             |#REF!          |#REF! 

***<u style="color: green;">OBSERVATION:</u> &nbsp;***

- **The employment_type and income columns contain a few undesirable values (#REF!).**

- **Replace #REF! with NULL instead of dropping the records and potentially losing valuable information.**


In [28]:
# Replace "#REF!" with NULL in both employment_type and income columns
from pyspark.sql.functions import when, col

cleaned_cust_df = new_cust_df.withColumn(
                                "employment_type", when(col("employment_type") == "#REF!", None).otherwise(col("employment_type"))
                            ).withColumn(
                                "income", when(col("income") == "#REF!", None).otherwise(col("income"))
                            )

null_counts = cleaned_cust_df.select(
                                count(when(col("employment_type").isNull() | isnan(col("employment_type")), True)).alias("null_employment_type_count"),
                                count(when(col("income").isNull() | isnan(col("income")), True)).alias("null_income_count")
                            )
null_counts.show()

+--------------------------+-----------------+
|null_employment_type_count|null_income_count|
+--------------------------+-----------------+
|                      5464|             5464|
+--------------------------+-----------------+



In [29]:
# Inspect duplicates in HOUSEHOLDS DataFrame
inspect_duplicates(households_df, "hh_id", "HOUSEHOLDS")

Duplicate records in HOUSEHOLDS DataFrame based on hh_id:
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|hh_id    |cust_id  |car_id|active_hh|hh_start_date|phone_number  |zip  |state|country|referral_source|
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|219883100|276802922|15066 |1        |4/21/14      |(925) 972-9067|42362|TN   |USA    |Event          |
|219883100|857918981|678149|1        |4/21/14      |(551) 512-6795|42362|TN   |USA    |Event          |
|219883100|677506232|598410|1        |4/21/14      |(477) 286-9633|42362|TN   |USA    |Event          |
|219883100|793331621|326708|1        |4/21/14      |(544) 387-2179|42362|TN   |USA    |Event          |
|219883100|399267806|94351 |1        |4/21/14      |(186) 201-9229|42362|TN   |USA    |Event          |
|394554865|797030169|3026  |0        |8/9/01       |(463) 615-3414|58861|OH   |USA    |Friend         |
|39455

In [30]:
# Check and print duplicate count based on a combination of columns
def check_combination_duplicates(df, columns, combination_name):
    duplicate_count = df.groupBy(columns).count().filter("count > 1").count()
    print(f"Duplicate count for {combination_name}: {duplicate_count}")

# Checking duplicates for different combinations of columns
check_combination_duplicates(households_df, ["hh_id"], "hh_id")
check_combination_duplicates(households_df, ["hh_id", "cust_id"], "hh_id + cust_id")
check_combination_duplicates(households_df, ["hh_id", "car_id"], "hh_id + car_id")
check_combination_duplicates(households_df, ["hh_id", "cust_id", "car_id"], "hh_id + cust_id + car_id")

Duplicate count for hh_id: 131352
Duplicate count for hh_id + cust_id: 0
Duplicate count for hh_id + car_id: 0
Duplicate count for hh_id + cust_id + car_id: 0


***<u style="color: green;">OBSERVATION:</u> &nbsp;***

- **The above results indicate that 26% of the hh_ids are duplicates, as each household may have more than one car and insurance policy.**

- **A combination of hh_id and cust_id could be used as the primary key.**

> ***DQ Checks for HOUSEHOLDS DataFrame:***

In [31]:
from pyspark.sql.functions import date_format

# cust_id - Null check
cust_id_null_count = households_df.filter(col("cust_id").isNull()).count()
invalid_cust_id_df = households_df.join(customers_df, on="cust_id", how="left_anti")
print("Invalid cust_id:")
invalid_cust_id_df.show()
invalid_cust_id_count = invalid_cust_id_df.count()

# car_id - Null check
car_id_null_count = households_df.filter(col("car_id").isNull()).count()
invalid_car_id_count = households_df.join(cars_df, on="car_id", how="left_anti").count()

# Check for invalid active_hh values
invalid_active_hh_count = households_df.filter(~col("active_hh").isin([0, 1])).count()

# Check for invalid date format or invalid date
invalid_hh_start_date_df = households_df.filter(
    (regexp_extract(col("hh_start_date"), r"^\d{1,2}/\d{1,2}/\d{2}$", 0) == "") |
    (to_date(col("hh_start_date"), "M/d/yy").isNull()) |
    (to_date(col("hh_start_date"), "M/d/yy") > date_format(current_date(), "M/d/yy"))
)
print("Sample Invalid hh_start_dates:")
invalid_hh_start_date_df.show(5)
invalid_hh_start_date_count = invalid_hh_start_date_df.count()

# Check for invalid phone number format
invalid_phone_number_count = households_df.filter(~col("phone_number").rlike(r"^\(\d{3}\) \d{3}-\d{4}$")).count()

# Check for invalid zip code format
invalid_zip_count = households_df.filter(~col("zip").rlike(r"^\d{5}$")).count()

# List of valid U.S. states
valid_states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", 
                "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
                "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
                "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
                "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

# Check for invalid state codes
invalid_state_count = households_df.filter(~col("state").isin(valid_states)).count()

# Check for invalid country value (assuming only 'USA' is valid)
invalid_country_count = households_df.filter(col("country") != "USA").count()

# Check for null values in referral_source
referral_source_null_count = households_df.filter(col("referral_source").isNull()).count()
distinct_referral_source_values = households_df.select("referral_source").distinct()
distinct_referral_source_values.show(truncate=False)


print(f"Null cust_id count: {cust_id_null_count}")
print(f"Invalid cust_id count: {invalid_cust_id_count}")
print(f"Null car_id count: {car_id_null_count}")
print(f"Invalid car_id count: {invalid_car_id_count}")
print(f"Invalid active_hh count: {invalid_active_hh_count}")
print(f"Invalid hh_start_date count: {invalid_hh_start_date_count}")
print(f"Invalid phone number count: {invalid_phone_number_count}")
print(f"Invalid zip count: {invalid_zip_count}")
print(f"Invalid state count: {invalid_state_count}")
print(f"Invalid country count: {invalid_country_count}")
print(f"Null referral_source count: {referral_source_null_count}")

Invalid cust_id:
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|  cust_id|    hh_id|car_id|active_hh|hh_start_date|  phone_number|  zip|state|country|referral_source|
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+
|774461007|739569792|640160|        0|       8/1/81|(678) 123-4567|54321|   GA|    USA|          Other|
+---------+---------+------+---------+-------------+--------------+-----+-----+-------+---------------+

Sample Invalid hh_start_dates:
+-----+-------+------+---------+-------------+------------+---+-----+-------+---------------+
|hh_id|cust_id|car_id|active_hh|hh_start_date|phone_number|zip|state|country|referral_source|
+-----+-------+------+---------+-------------+------------+---+-----+-------+---------------+
+-----+-------+------+---------+-------------+------------+---+-----+-------+---------------+

+---------------+
|referral_source|
+---------------+


***<u style="color: green;">OBSERVATION:</u> &nbsp;***

- **One cust_id from the HOUSEHOLD DataFrame is not present in the CUSTOMERS DataFrame.**
- **This record will be excluded from the HOUSEHOLD DataFrame, as there is no information about that customer.**


In [32]:
new_households_df = households_df.filter(households_df.cust_id != 774461007)
calculate_null_counts(new_households_df, "new HOUSEHOLDS")

####################
new HOUSEHOLDS DataFrame:
#################### 

-----------------------------------
Null counts for new HOUSEHOLDS DataFrame:
----------------------------------- 

+-----+-------+------+---------+-------------+------------+---+-----+-------+---------------+
|hh_id|cust_id|car_id|active_hh|hh_start_date|phone_number|zip|state|country|referral_source|
+-----+-------+------+---------+-------------+------------+---+-----+-------+---------------+
|    0|      0|     0|        0|            0|           0|  0|    0|      0|              0|
+-----+-------+------+---------+-------------+------------+---+-----+-------+---------------+

-----------------------------------
Total rows in new HOUSEHOLDS DataFrame:
----------------------------------- 

499999





---

# <span style="color: orange;"> 4. Combine the Datasets into One DataFrame </span>

In [33]:
# Join households_df with new_cust_df on cust_id
households_customers_df = new_households_df.join(new_cust_df, on="cust_id", how="inner")

# Join the result with cars_df on [car_id, state]
final_df = households_customers_df.join(cars_df, on=["car_id", "state"], how="inner")

final_df.show(5, truncate=False)

+------+-----+---------+---------+---------+-------------+--------------+-----+-------+---------------+-------------+--------------+---------------------------------+------+--------+----------+-------------+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|car_id|state|cust_id  |hh_id    |active_hh|hh_start_date|phone_number  |zip  |country|referral_source|date_of_birth|marital_status|employment_type                  |income|status  |model_year|make         |body_style|vehicle_value|annual_miles_driven|business_use|antique_vehicle|lien|lease|driver_safety_discount|vehicle_safety_discount|claim_payout|six_month_premium_amount|
+------+-----+---------+---------+---------+-------------+--------------+-----+-------+---------------+-------------+--------------+---------------------------------+------+--------+----------+-------------+----------+-------------+------------

In [34]:
calculate_null_counts(final_df, "FINAL")

####################
FINAL DataFrame:
#################### 

-----------------------------------
Null counts for FINAL DataFrame:
----------------------------------- 

+------+-----+-------+-----+---------+-------------+------------+---+-------+---------------+-------------+--------------+---------------+------+------+----------+----+----------+-------------+-------------------+------------+---------------+----+-----+----------------------+-----------------------+------------+------------------------+
|car_id|state|cust_id|hh_id|active_hh|hh_start_date|phone_number|zip|country|referral_source|date_of_birth|marital_status|employment_type|income|status|model_year|make|body_style|vehicle_value|annual_miles_driven|business_use|antique_vehicle|lien|lease|driver_safety_discount|vehicle_safety_discount|claim_payout|six_month_premium_amount|
+------+-----+-------+-----+---------+-------------+------------+---+-------+---------------+-------------+--------------+---------------+------+------+--

In [35]:
from pyspark.sql import DataFrame
import sys

# Function to calculate the size of a DataFrame in memory
def get_df_size_in_memory(df: DataFrame) -> float:
    partition_sizes = df.rdd.mapPartitions(lambda iter: [sum(sys.getsizeof(row) for row in iter)]).collect()    # Calculate the size of each partition
    total_size = sum(partition_sizes)               # Total size in bytes
    total_size_in_mb = total_size / (1024 * 1024)   # Convert size to megabytes
    return total_size_in_mb

df_size_in_mb = get_df_size_in_memory(final_df)
print(f"DataFrame size: {df_size_in_mb} MB")

DataFrame size: 129.69944763183594 MB


In [36]:
spark

In [38]:
# Inspecting count of distinct values in each column to determine potential partitioning columns
columns_to_check = ['car_id', 'state', 'cust_id', 'hh_id', 'active_hh', 'hh_start_date', 'phone_number', 'zip', 
                    'country', 'referral_source', 'date_of_birth', 'marital_status', 'employment_type', 'income', 
                    'status', 'model_year', 'make', 'body_style', 'vehicle_value', 'annual_miles_driven', 'business_use',
                    'antique_vehicle', 'lien', 'lease', 'driver_safety_discount', 'vehicle_safety_discount', 'claim_payout', 
                    'six_month_premium_amount']
                    
for column in columns_to_check:
    distinct_count = final_df.select(column).distinct().count()
    print(f"Distinct count for column '{column}': {distinct_count}")

Distinct count for column 'car_id': 372536
Distinct count for column 'state': 50
Distinct count for column 'cust_id': 499850
Distinct count for column 'hh_id': 131352
Distinct count for column 'active_hh': 2
Distinct count for column 'hh_start_date': 12774
Distinct count for column 'phone_number': 499984
Distinct count for column 'zip': 69148
Distinct count for column 'country': 1
Distinct count for column 'referral_source': 7
Distinct count for column 'date_of_birth': 28560
Distinct count for column 'marital_status': 4
Distinct count for column 'employment_type': 10
Distinct count for column 'income': 119552
Distinct count for column 'status': 4
Distinct count for column 'model_year': 73
Distinct count for column 'make': 7
Distinct count for column 'body_style': 4
Distinct count for column 'vehicle_value': 209693
Distinct count for column 'annual_miles_driven': 92634
Distinct count for column 'business_use': 2
Distinct count for column 'antique_vehicle': 2
Distinct count for column 'l

In [39]:
# For "state"
state_partition_count = final_df.groupBy("state").agg(F.count("*").alias("record_count")).orderBy("record_count", ascending=False)
state_partition_count.show(50)

+-----+------------+
|state|record_count|
+-----+------------+
|   KY|       10519|
|   SC|       10453|
|   AK|       10401|
|   HI|       10356|
|   WY|       10327|
|   CT|       10326|
|   NE|       10295|
|   MI|       10272|
|   MA|       10266|
|   AR|       10207|
|   MN|       10204|
|   GA|       10185|
|   WA|       10184|
|   AL|       10175|
|   NY|       10154|
|   NV|       10148|
|   CA|       10110|
|   MS|       10061|
|   NJ|       10058|
|   MO|       10044|
|   NM|       10040|
|   IN|       10030|
|   MT|       10029|
|   AZ|        9988|
|   UT|        9986|
|   FL|        9963|
|   SD|        9961|
|   OH|        9945|
|   RI|        9936|
|   ND|        9921|
|   IL|        9920|
|   MD|        9911|
|   OR|        9908|
|   TX|        9901|
|   NC|        9878|
|   IA|        9855|
|   WV|        9849|
|   NH|        9810|
|   VA|        9808|
|   LA|        9802|
|   KS|        9775|
|   ME|        9748|
|   OK|        9740|
|   DE|        9712|
|   VT|      

In [40]:
# For "model_year"
model_year_partition_count = final_df.groupBy("model_year").agg(F.count("*").alias("record_count")).orderBy("model_year", ascending=False)
model_year_partition_count.show(73)

+----------+------------+
|model_year|record_count|
+----------+------------+
|      2024|       16405|
|      2023|       16148|
|      2022|       16212|
|      2021|       16477|
|      2020|       16065|
|      2019|       16074|
|      2018|       16143|
|      2017|       16393|
|      2016|       16445|
|      2015|       16478|
|      2014|       16268|
|      2013|       16258|
|      2012|       16114|
|      2011|       16522|
|      2010|       16108|
|      2009|       16389|
|      2008|       16334|
|      2007|       16269|
|      2006|       16220|
|      2005|       16345|
|      2004|       16464|
|      2003|       16194|
|      2002|       16298|
|      2001|        2489|
|      2000|        2450|
|      1999|        2523|
|      1998|        2521|
|      1997|        2533|
|      1996|        2497|
|      1995|        2532|
|      1994|        2522|
|      1993|        2456|
|      1992|        2446|
|      1991|        2498|
|      1990|        2516|
|      1989|

In [41]:
make_partition_count = final_df.groupBy("make").agg(F.count("*").alias("record_count")).orderBy("record_count", ascending=False)
make_partition_count.show(7)

+-------------+------------+
|         make|record_count|
+-------------+------------+
|Manufacturer2|      100020|
|Manufacturer3|       99986|
|Manufacturer1|       99772|
|Manufacturer6|       65297|
|Manufacturer7|       65243|
|Manufacturer5|       34850|
|Manufacturer4|       34831|
+-------------+------------+



***<u style="color: green;">OBSERVATION:</u> &nbsp;***

- **'state' appears to be a good partitioning column, as the data is almost evenly distributed across the states.**


In [37]:
# Save DataFrame to parquet
final_df.coalesce(1).write.partitionBy("state") \
        .parquet(f"{OUTPUT_DIR}/final_df.parquet", mode="overwrite", compression="snappy")