In [2]:
import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, when
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType


In [3]:
# One SparkSession for the whole notebook; getOrCreate() hands back the already
# running session when this cell is re-executed.
spark = (
    SparkSession.builder
    .appName("Data Engineering")
    .getOrCreate()
)

In [4]:
# Explicit schema for HOUSEHOLDS.csv. With a user-supplied schema, Spark's CSV
# reader (default enforceSchema=true) takes column names from this list rather
# than from the header row, so the order here must match the file's column order.
# NOTE(review): "ZIP " carries a trailing space; the column-name normalisation step
# later strips it. ZIP is parsed as an integer, which would drop leading zeros if
# these are US ZIP codes — consider StringType (would change the output schema).
house_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("HH_ID", IntegerType()),
            ("CUST_ID", IntegerType()),
            ("CAR_ID", IntegerType()),
            ("Active HH", IntegerType()),
            ("HH Start Date", StringType()),
            ("Phone Number", StringType()),
            ("ZIP ", IntegerType()),
            ("State", StringType()),
            ("Country", StringType()),
            ("Referral Source", StringType()),
        )
    ]
)

In [5]:
# Explicit schema for CARS.csv, in file column order (names are applied
# positionally when a schema is supplied to the CSV reader).
# NOTE(review): the premium is a FloatType (single precision, ~7 significant
# digits); DoubleType/DecimalType may be safer for monetary amounts — confirm.
car_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("Car ID", IntegerType()),
            ("Status", StringType()),
            ("State", StringType()),
            ("Model Year", IntegerType()),
            ("Make", StringType()),
            ("Body Style", StringType()),
            ("Vehicle Value", IntegerType()),
            ("Annual Miles Driven", IntegerType()),
            ("Business Use", IntegerType()),
            ("Antique Vehicle", IntegerType()),
            ("Lien", IntegerType()),
            ("Lease", IntegerType()),
            ("Driver Safety Discount", IntegerType()),
            ("Vehicle Safety Discount", IntegerType()),
            ("Claim Payout", IntegerType()),
            ("6 Month Premium Amount", FloatType()),
        )
    ]
)

In [6]:
# Load CARS.csv (first line is a header row) using the explicit car schema.
car_df = spark.read.csv("./CARS.csv", header=True, schema=car_schema)

In [7]:
# Load HOUSEHOLDS.csv (first line is a header row) using the explicit household schema.
household_df = spark.read.csv("./HOUSEHOLDS.csv", header=True, schema=house_schema)

In [8]:
# Load CUSTOMERS.csv without a schema: Spark's CSV reader does not infer types
# unless inferSchema is enabled, so every customer column arrives as a string.
customer_df = spark.read.csv("./CUSTOMERS.csv", header=True)

In [9]:
# Number of raw customer rows (triggers a full scan of CUSTOMERS.csv).
n_customer_rows = customer_df.count()
n_customer_rows

499999

## Clean the customer DataFrame: drop columns that contain only null values

In [10]:
# Drop columns that are entirely null.
# count(<column>) counts only non-null values. Computing it for every column in
# ONE aggregation gives a single Spark job. The previous version launched one
# filter+count job per column, i.e. one full scan of the CSV per column.
non_null_counts = customer_df.select(
    [count(customer_df[name]).alias(name) for name in customer_df.columns]
).first()
customer_df1 = customer_df.select(
    [name for name in customer_df.columns if non_null_counts[name] > 0]
)


## Impute malformed `#REF!` values in the customer data (`Income`: mean, `Employment Type`: most frequent value)

In [11]:
# Replace "#REF!" placeholders (spreadsheet formula errors) with a typical value:
#   * Income          -> mean of the remaining values
#   * Employment Type -> most frequent remaining value
# Fixes compared with the earlier version of this cell:
#   - the Employment Type step now builds on the income-imputed frame; it used to
#     restart from customer_df1, silently discarding the Income imputation;
#   - the "valid value" filters use AND. `x != "#REF!" | x != ""` is true for every
#     non-null value, so "#REF!" (or null) could be picked as the mode.
# Comparisons with null are never true, so null values are excluded by these filters too.
income_is_valid = (col("Income") != "#REF!") & (col("Income") != "")
# avg() casts the string column to double. NOTE(review): this assumes the remaining
# Income values are numeric strings. The mean is cast back to string so the
# replacement matches Income's string type (CUSTOMERS.csv is read without a schema).
income_mean = (
    customer_df1.where(income_is_valid)
    .agg(avg("Income").cast("string"))
    .first()[0]
)
customer_income_imputed = customer_df1.withColumn(
    "Income",
    when(col("Income") == "#REF!", income_mean).otherwise(col("Income")),
)

employment_is_valid = (col("Employment Type") != "#REF!") & (col("Employment Type") != "")
# Most frequent valid value; ties are broken arbitrarily by Spark.
employment_mode = (
    customer_income_imputed.where(employment_is_valid)
    .groupBy("Employment Type").count()
    .orderBy("count", ascending=False)
    .first()[0]
)
customer_df2 = customer_income_imputed.withColumn(
    "Employment Type",
    when(col("Employment Type") == "#REF!", employment_mode).otherwise(col("Employment Type")),
).withColumnRenamed("Employment Type", "Employment_Type")

In [12]:
# Column names of the household frame as loaded (before normalisation).
household_df.schema.fieldNames()

['HH_ID',
 'CUST_ID',
 'CAR_ID',
 'Active HH',
 'HH Start Date',
 'Phone Number',
 'ZIP ',
 'State',
 'Country',
 'Referral Source']

In [13]:
# Column names of the car frame as loaded (before normalisation).
car_df.schema.fieldNames()

['Car ID',
 'Status',
 'State',
 'Model Year',
 'Make',
 'Body Style',
 'Vehicle Value',
 'Annual Miles Driven',
 'Business Use',
 'Antique Vehicle',
 'Lien',
 'Lease',
 'Driver Safety Discount',
 'Vehicle Safety Discount',
 'Claim Payout',
 '6 Month Premium Amount']

In [14]:
# Column names of the cleaned customer frame (before normalisation).
customer_df2.schema.fieldNames()

['CUST_ID', 'Date of Birth', 'Marital Status', 'Employment_Type', 'Income']

In [15]:
def convert_to_snakecase(col_list):
    """Build a rename map from each column name to an UPPER_SNAKE_CASE form.

    Surrounding whitespace is stripped, the name is upper-cased, and every run
    of internal whitespace becomes a single underscore, e.g.
    ``"6 Month Premium Amount" -> "6_MONTH_PREMIUM_AMOUNT"`` and ``"ZIP " -> "ZIP"``.

    Args:
        col_list: iterable of column names.

    Returns:
        dict mapping each original name to its normalised name.
    """
    return {
        original: "_".join(original.strip().upper().split())
        for original in col_list
    }

def clean_column_names(df):
    """Return a new DataFrame whose columns are renamed by ``convert_to_snakecase``."""
    rename_map = convert_to_snakecase(df.columns)
    return df.withColumnsRenamed(rename_map)

In [16]:
# Normalise the column names of all three source frames to UPPER_SNAKE_CASE.
car_df, household_df, customer_df2 = map(
    clean_column_names, (car_df, household_df, customer_df2)
)


In [17]:
# Column names of the car frame after normalisation.
car_df.schema.fieldNames()

['CAR_ID',
 'STATUS',
 'STATE',
 'MODEL_YEAR',
 'MAKE',
 'BODY_STYLE',
 'VEHICLE_VALUE',
 'ANNUAL_MILES_DRIVEN',
 'BUSINESS_USE',
 'ANTIQUE_VEHICLE',
 'LIEN',
 'LEASE',
 'DRIVER_SAFETY_DISCOUNT',
 'VEHICLE_SAFETY_DISCOUNT',
 'CLAIM_PAYOUT',
 '6_MONTH_PREMIUM_AMOUNT']

In [18]:
# Both frames carry a STATE column. Give each a source-specific name so the
# joined result has no ambiguous column names.
car_df = car_df.withColumnRenamed("STATE", "CAR_STATE")
household_df = household_df.withColumnRenamed("STATE", "HOUSEHOLD_STATE")

In [19]:
# Assemble households with their customer and car details:
#   household --CUST_ID--> customer
#   household --(CAR_ID, state)--> car
# Both joins are inner joins (the default), so households without a matching
# customer or car row are dropped. The right-hand copies of CUST_ID and CAR_ID
# are dropped so each key appears once in the result.
# NOTE(review): CUSTOMERS.csv is read without a schema, so customer_df2.CUST_ID is
# presumably a string compared against an integer key here — confirm the implicit cast.
# NOTE(review): the result has more rows (500297) than CUSTOMERS.csv (499999);
# presumably some customers map to several households/cars — verify no unintended fan-out.
customer_match = household_df.CUST_ID == customer_df2.CUST_ID
car_match = (car_df.CAR_ID == household_df.CAR_ID) & (car_df.CAR_STATE == household_df.HOUSEHOLD_STATE)

combined_df = (
    household_df
    .join(customer_df2, on=customer_match)
    .join(car_df, on=car_match)
    .drop(customer_df2.CUST_ID, car_df.CAR_ID)
)

In [20]:
# Row count of the joined frame (executes the full join).
n_combined_rows = combined_df.count()
n_combined_rows

500297

In [22]:
# Persist the combined data as parquet, replacing any previous output directory.
combined_df.write.mode("overwrite").parquet("./final_data.parquet")

In [23]:
# Read the written parquet output back and keep only its schema for inspection.
parquet_schema = spark.read.format("parquet").load("./final_data.parquet/").schema

In [24]:
# Show the schema read back from final_data.parquet to confirm the final
# column names and types.
parquet_schema

StructType([StructField('HH_ID', IntegerType(), True), StructField('CUST_ID', IntegerType(), True), StructField('CAR_ID', IntegerType(), True), StructField('ACTIVE_HH', IntegerType(), True), StructField('HH_START_DATE', StringType(), True), StructField('PHONE_NUMBER', StringType(), True), StructField('ZIP', IntegerType(), True), StructField('HOUSEHOLD_STATE', StringType(), True), StructField('COUNTRY', StringType(), True), StructField('REFERRAL_SOURCE', StringType(), True), StructField('DATE_OF_BIRTH', StringType(), True), StructField('MARITAL_STATUS', StringType(), True), StructField('EMPLOYMENT_TYPE', StringType(), True), StructField('INCOME', StringType(), True), StructField('STATUS', StringType(), True), StructField('CAR_STATE', StringType(), True), StructField('MODEL_YEAR', IntegerType(), True), StructField('MAKE', StringType(), True), StructField('BODY_STYLE', StringType(), True), StructField('VEHICLE_VALUE', IntegerType(), True), StructField('ANNUAL_MILES_DRIVEN', IntegerType(),

## Data analysis

In [None]:
intial_df = spark.read.