# Cleaning Customer Data

## Data Cleaning Actions to Take

1. Create a DataFrame with proper datatypes

2. Rename a few columns:
   - annual_inc -> annual_income
   - addr_state -> address_state
   - zip_code -> address_zipcode
   - country -> address_country
   - tot_hi_cred_lim -> total_high_credit_limit
   - annual_inc_joint -> join_annual_income

3. Insert a new column, ingest_date, holding the ingestion time (current timestamp)

4. Remove completely duplicate rows

5. Remove rows where annual_income is null

6. Convert emp_length to integer

7. Replace all nulls in the emp_length column with the average of that column

8. Clean address_state: it should be 2 characters only, so replace longer values with NA

9. Write the cleaned customer data to the cleaned folder in HDFS

In [2]:
# Create spark session
import getpass
from pyspark.sql import SparkSession


username = getpass.getuser()
spark = SparkSession.builder.\
    config('spark.shuffle.useOldFetchProtocol','true').\
    config('spark.ui.port', '0').\
    config('spark.sql.warehouse.dir', f'/user/{username}/warehouse').\
    enableHiveSupport().\
    master('yarn').\
    getOrCreate()

## 1. Handle datatypes

In [5]:
customer_schema = """member_id string, emp_title string, emp_length string, home_ownership string, 
annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string, 
verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float, 
verification_status_joint string"""
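Spark parses this DDL-style string (`name type, name type, ...`) on its own. As a quick sanity check of the column-to-type mapping, the sketch below splits the same string with a hypothetical helper, `parse_ddl_schema`, that is not part of PySpark. It assumes a flat schema; a type containing a comma, such as `decimal(10,2)`, would need a real parser.

```python
# Hypothetical helper, not a PySpark API: split a flat "name type, name type"
# DDL string into (column, type) pairs so the schema can be checked by eye.
customer_schema = """member_id string, emp_title string, emp_length string, home_ownership string,
annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string,
verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float,
verification_status_joint string"""


def parse_ddl_schema(ddl: str) -> list[tuple[str, str]]:
    """Return (column_name, type_name) pairs from a flat DDL schema string."""
    pairs = []
    for field in ddl.split(","):
        name, type_name = field.split()  # each field is "<name> <type>"; newlines are whitespace
        pairs.append((name, type_name))
    return pairs


columns = parse_ddl_schema(customer_schema)
float_columns = [name for name, type_name in columns if type_name == "float"]
print(len(columns))   # 15
print(float_columns)  # ['annual_inc', 'tot_hi_cred_lim', 'annual_inc_joint']
```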

In [7]:
# Load the raw customer data using the schema defined above

customers_raw_df = spark.read\
.format("csv")\
.option("header", True)\
.schema(customer_schema)\
.load("/user/itv008299/lendingclubproject/raw/customers_data_csv")

In [8]:
customers_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
6d5091b3fcaaeb4ea...,leadman,10+ years,MORTGAGE,55000.0,PA,190xx,USA,C,C4,Not Verified,178050.0,Individual,,
b5e7938b0a2da4cea...,Engineer,10+ years,MORTGAGE,65000.0,SD,577xx,USA,C,C1,Not Verified,314017.0,Individual,,
91060b858433e8a61...,truck driver,10+ years,MORTGAGE,63000.0,IL,605xx,USA,B,B4,Not Verified,218418.0,Joint App,71000.0,Not Verified
cab1fa9f533688b0a...,Information Syste...,10+ years,MORTGAGE,110000.0,NJ,076xx,USA,C,C5,Source Verified,381215.0,Individual,,
f74e401c1ab0adf78...,Contract Specialist,3 years,MORTGAGE,104433.0,PA,174xx,USA,F,F1,Source Verified,439570.0,Individual,,
8aef4bb29d609d8d6...,Veterinary Tecnician,4 years,RENT,34000.0,GA,300xx,USA,C,C3,Source Verified,16900.0,Individual,,
538b4653da3b1e814...,Vice President of...,10+ years,MORTGAGE,180000.0,MN,550xx,USA,B,B2,Not Verified,388852.0,Individual,,
b24d55f21390533c5...,road driver,10+ years,MORTGAGE,85000.0,SC,293xx,USA,B,B1,Not Verified,193390.0,Individual,,
1035c5401b0ca76d0...,SERVICE MANAGER,6 years,RENT,85000.0,PA,160xx,USA,A,A2,Not Verified,61099.0,Individual,,
cb0f1777593e77909...,Vendor liaison,10+ years,MORTGAGE,42000.0,RI,029xx,USA,B,B5,Not Verified,256513.0,Individual,,


In [9]:
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



## 2. Rename a few columns

In [13]:
customers_raw_df_renamed = customers_raw_df.withColumnRenamed("annual_inc", "annual_income")\
.withColumnRenamed("addr_state", "address_state")\
.withColumnRenamed("zip_code", "address_zipcode")\
.withColumnRenamed("country", "address_country")\
.withColumnRenamed("tot_hi_cred_lim", "total_high_credit_limit")\
.withColumnRenamed("annual_inc_joint", "join_annual_income")
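The six renames can also be kept in a single dict, so the action list and the code cannot drift apart. In Spark the dict could drive a loop such as `for old, new in RENAMES.items(): df = df.withColumnRenamed(old, new)` (`withColumnRenamed` is a no-op when the old name is absent), and PySpark 3.4+ also provides `withColumnsRenamed`, which takes the dict directly. The plain-Python sketch below applies the same mapping to one row; the `RENAMES` name and the sample row are illustrative assumptions.

```python
# Old name -> new name, exactly as in the withColumnRenamed chain above
RENAMES = {
    "annual_inc": "annual_income",
    "addr_state": "address_state",
    "zip_code": "address_zipcode",
    "country": "address_country",
    "tot_hi_cred_lim": "total_high_credit_limit",
    "annual_inc_joint": "join_annual_income",
}


def rename_columns(row: dict, renames: dict) -> dict:
    """Return a copy of row with keys renamed; keys not in the mapping are kept as-is."""
    return {renames.get(key, key): value for key, value in row.items()}


# Hypothetical sample row
row = {"member_id": "m1", "annual_inc": 55000.0, "addr_state": "PA", "zip_code": "190xx"}
print(rename_columns(row, RENAMES))
# {'member_id': 'm1', 'annual_income': 55000.0, 'address_state': 'PA', 'address_zipcode': '190xx'}
```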

In [14]:
customers_raw_df_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint
6d5091b3fcaaeb4ea...,leadman,10+ years,MORTGAGE,55000.0,PA,190xx,USA,C,C4,Not Verified,178050.0,Individual,,
b5e7938b0a2da4cea...,Engineer,10+ years,MORTGAGE,65000.0,SD,577xx,USA,C,C1,Not Verified,314017.0,Individual,,
91060b858433e8a61...,truck driver,10+ years,MORTGAGE,63000.0,IL,605xx,USA,B,B4,Not Verified,218418.0,Joint App,71000.0,Not Verified
cab1fa9f533688b0a...,Information Syste...,10+ years,MORTGAGE,110000.0,NJ,076xx,USA,C,C5,Source Verified,381215.0,Individual,,
f74e401c1ab0adf78...,Contract Specialist,3 years,MORTGAGE,104433.0,PA,174xx,USA,F,F1,Source Verified,439570.0,Individual,,
8aef4bb29d609d8d6...,Veterinary Tecnician,4 years,RENT,34000.0,GA,300xx,USA,C,C3,Source Verified,16900.0,Individual,,
538b4653da3b1e814...,Vice President of...,10+ years,MORTGAGE,180000.0,MN,550xx,USA,B,B2,Not Verified,388852.0,Individual,,
b24d55f21390533c5...,road driver,10+ years,MORTGAGE,85000.0,SC,293xx,USA,B,B1,Not Verified,193390.0,Individual,,
1035c5401b0ca76d0...,SERVICE MANAGER,6 years,RENT,85000.0,PA,160xx,USA,A,A2,Not Verified,61099.0,Individual,,
cb0f1777593e77909...,Vendor liaison,10+ years,MORTGAGE,42000.0,RI,029xx,USA,B,B5,Not Verified,256513.0,Individual,,


## 3. Insert a new column with the ingestion date (current time)

This records when the data was ingested into our data lake.

In [15]:
from pyspark.sql.functions import current_timestamp

customers_raw_df_ingested = customers_raw_df_renamed.withColumn("ingest_date", current_timestamp())
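`current_timestamp()` is evaluated once at the start of each query, so every row in one query gets the same `ingest_date`. One consequence: the DataFrame is not cached, so each action (each `count()`, each `write`) re-runs the query and stamps a new value. The Parquet and CSV outputs written at the end can therefore carry slightly different timestamps. The sketch below mimics the "one value per batch" behaviour in plain Python; `stamp_batch` is a hypothetical helper.

```python
from datetime import datetime


def stamp_batch(rows, now=None):
    """Attach one ingest timestamp, taken once per call, to every row in the batch."""
    # Taken once, like current_timestamp() once per Spark query, not once per row
    ingest_date = now if now is not None else datetime.now()
    return [{**row, "ingest_date": ingest_date} for row in rows]


batch = stamp_batch([{"member_id": "m1"}, {"member_id": "m2"}])
print(batch[0]["ingest_date"] == batch[1]["ingest_date"])  # True
```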

In [16]:
customers_raw_df_ingested

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
6d5091b3fcaaeb4ea...,leadman,10+ years,MORTGAGE,55000.0,PA,190xx,USA,C,C4,Not Verified,178050.0,Individual,,,2023-11-08 16:07:...
b5e7938b0a2da4cea...,Engineer,10+ years,MORTGAGE,65000.0,SD,577xx,USA,C,C1,Not Verified,314017.0,Individual,,,2023-11-08 16:07:...
91060b858433e8a61...,truck driver,10+ years,MORTGAGE,63000.0,IL,605xx,USA,B,B4,Not Verified,218418.0,Joint App,71000.0,Not Verified,2023-11-08 16:07:...
cab1fa9f533688b0a...,Information Syste...,10+ years,MORTGAGE,110000.0,NJ,076xx,USA,C,C5,Source Verified,381215.0,Individual,,,2023-11-08 16:07:...
f74e401c1ab0adf78...,Contract Specialist,3 years,MORTGAGE,104433.0,PA,174xx,USA,F,F1,Source Verified,439570.0,Individual,,,2023-11-08 16:07:...
8aef4bb29d609d8d6...,Veterinary Tecnician,4 years,RENT,34000.0,GA,300xx,USA,C,C3,Source Verified,16900.0,Individual,,,2023-11-08 16:07:...
538b4653da3b1e814...,Vice President of...,10+ years,MORTGAGE,180000.0,MN,550xx,USA,B,B2,Not Verified,388852.0,Individual,,,2023-11-08 16:07:...
b24d55f21390533c5...,road driver,10+ years,MORTGAGE,85000.0,SC,293xx,USA,B,B1,Not Verified,193390.0,Individual,,,2023-11-08 16:07:...
1035c5401b0ca76d0...,SERVICE MANAGER,6 years,RENT,85000.0,PA,160xx,USA,A,A2,Not Verified,61099.0,Individual,,,2023-11-08 16:07:...
cb0f1777593e77909...,Vendor liaison,10+ years,MORTGAGE,42000.0,RI,029xx,USA,B,B5,Not Verified,256513.0,Individual,,,2023-11-08 16:07:...


## 4. Remove completely duplicate rows

In [18]:
# count total records in customer df

customers_raw_df_ingested.count()

2260701

In [21]:
# Keep only distinct records and store them in a new DataFrame

customers_distinct = customers_raw_df_ingested.distinct()

customers_distinct.count()

2260638
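`distinct()` compares whole rows: a row is dropped only when every column matches an earlier one, and nulls in the same column count as equal for this purpose. Because `ingest_date` holds a single value per query, it does not make otherwise identical rows distinct. Rows that share a `member_id` but differ in any other column all survive, so `member_id` is not guaranteed to be unique after this step. A plain-Python sketch of the rule, with hypothetical sample rows (unlike Spark, it keeps first-seen order):

```python
def distinct_rows(rows):
    """Drop rows that match an earlier row in every column; keep the first occurrence."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))  # whole-row key: all columns must match
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


rows = [
    {"member_id": "m1", "annual_income": 55000.0, "join_annual_income": None},
    {"member_id": "m1", "annual_income": 55000.0, "join_annual_income": None},  # exact duplicate
    {"member_id": "m1", "annual_income": 56000.0, "join_annual_income": None},  # differs in one column
]
print(len(distinct_rows(rows)))  # 2

# Rows removed by distinct() in the cell above
print(2260701 - 2260638)  # 63
```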

In [23]:
customers_distinct.createOrReplaceTempView("customers")

## 5. Remove rows where annual_income is null

In [24]:
# Check if there is any row with null annual_income

spark.sql("select count(*) from customers where annual_income is null")

count(1)
5


In [25]:
# Keep only records where annual_income is not null
customers_income_filtered_df = spark.sql("select * from customers where annual_income is not null")

In [26]:
customers_income_filtered_df.createOrReplaceTempView("customers")

In [27]:
spark.sql("select count(*) from customers where annual_income is null")

count(1)
0


## 6. Convert emp_length to integer

In [29]:
# Check what kind of data is present in the column

spark.sql("select distinct(emp_length) from customers")

emp_length
9 years
5 years
""
1 year
2 years
7 years
8 years
4 years
6 years
3 years


In [31]:
from pyspark.sql.functions import regexp_replace, col

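# Remove every non-digit character, e.g. "10+ years" -> "10"; the raw string keeps the \D escape intact
customers_emplength_cleaned = customers_income_filtered_df.withColumn("emp_length", regexp_replace(col("emp_length"), r"\D", ""))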
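The pattern `\D` matches any non-digit, so the replacement keeps only the digits of each value. This has a side effect on the buckets: "10+ years" (seen in the output earlier) becomes "10", and a value such as "< 1 year", assumed here to occur in the raw data even though the truncated listing above doesn't show it, would become "1", the same as "1 year". A value with no digits at all becomes the empty string. The plain-Python mirror below uses Python's `re`, whose `\D` behaves the same as Java's on ASCII text; `strip_non_digits` is a hypothetical name.

```python
import re

NON_DIGIT = re.compile(r"\D")  # same pattern as the Spark cell


def strip_non_digits(value):
    """Mirror of regexp_replace(col, r"\\D", ""): remove every non-digit; None stays None."""
    if value is None:
        return None
    return NON_DIGIT.sub("", value)


samples = ["10+ years", "3 years", "< 1 year", ""]
print([strip_non_digits(s) for s in samples])  # ['10', '3', '1', '']
```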

In [32]:
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
b8a34df9ade163507...,Driver,1,OWN,55000.0,AL,355xx,USA,B,B4,Not Verified,118711.0,Individual,,,2023-11-08 16:17:...
4efe779cdacc58ed8...,Mechanic,10,MORTGAGE,80000.0,PA,172xx,USA,A,A5,Not Verified,125210.0,Individual,,,2023-11-08 16:17:...
48270bc4aaaa14ef4...,Lecturer and Curr...,3,RENT,56457.0,MA,021xx,USA,B,B5,Verified,26400.0,Individual,,,2023-11-08 16:17:...
8a3e78e0ba855d3c8...,Sr Finance Projec...,10,MORTGAGE,210000.0,TX,773xx,USA,C,C4,Source Verified,464642.0,Individual,,,2023-11-08 16:17:...
780a3c00b828d1b29...,Manager,10,MORTGAGE,127000.0,GA,300xx,USA,A,A3,Not Verified,341600.0,Individual,,,2023-11-08 16:17:...
e6319fced1584ee09...,Contracting Officer,10,RENT,132700.0,PA,172xx,USA,D,D1,Source Verified,123715.0,Individual,,,2023-11-08 16:17:...
b37f7aecd4d486a4f...,President,10,MORTGAGE,85000.0,NJ,080xx,USA,B,B2,Not Verified,599850.0,Individual,,,2023-11-08 16:17:...
edbdbb631c5321a02...,stacker operator,10,OWN,43000.0,MS,392xx,USA,D,D1,Source Verified,56210.0,Individual,,,2023-11-08 16:17:...
333d2d7fc3f6e5484...,President,6,RENT,88000.0,FL,331xx,USA,D,D4,Source Verified,137996.0,Individual,,,2023-11-08 16:17:...
60f8478c32f8b12b0...,Merchandizer,10,RENT,65000.0,CA,910xx,USA,D,D3,Verified,60620.0,Individual,,,2023-11-08 16:17:...


In [33]:
customers_emplength_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [34]:
# emp_length is still a string after the regex cleanup, so cast it to int

customers_emplength_casted = customers_emplength_cleaned.withColumn("emp_length", customers_emplength_cleaned.emp_length.cast("int"))
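With ANSI mode off (`spark.sql.ansi.enabled=false`, the Spark 3.x default), a string that cannot be parsed as an integer is cast to null instead of raising an error. Every `emp_length` left empty by the regex step therefore becomes null, and those rows are counted in step 7. Spark 4.0 enables ANSI mode by default, and there the same cast would fail. A rough plain-Python stand-in for the non-ANSI cast on digit-only strings (`cast_to_int` is hypothetical):

```python
def cast_to_int(value):
    """Rough stand-in for Spark's non-ANSI CAST(value AS INT) on digit-only strings."""
    if value is None:
        return None
    try:
        return int(value)
    except ValueError:
        return None  # e.g. the "" left behind by the regex step becomes null


print([cast_to_int(v) for v in ["10", "3", "", None]])  # [10, 3, None, None]
```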

In [35]:
customers_emplength_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



## 7. Replace nulls in emp_length with the average of this column

In [36]:
# Count rows where emp_length is null (including values that had no digits to cast)
customers_emplength_casted.filter("emp_length is null").count()

146903

In [37]:
customers_emplength_casted.createOrReplaceTempView("customers")

In [38]:
avg_emp_length_rows = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers").collect()

In [39]:
avg_emp_length_rows

[Row(avg_emp_length=6)]

In [40]:
avg_emp_duration = avg_emp_length_rows[0][0]

In [41]:
avg_emp_duration

6

In [42]:
customers_emplength_replaced = customers_emplength_casted.na.fill(avg_emp_duration, subset=["emp_length"])

In [43]:
customers_emplength_replaced.filter("emp_length is null").count()

0
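The imputation above has two parts. `avg()` ignores nulls, so the mean covers only the rows that have a value. `floor()` then turns it into a whole number of years that fits the integer column, and `na.fill` writes that value into every null. The plain-Python sketch below reproduces both parts on hypothetical sample values; `fill_with_floor_mean` is not from the notebook.

```python
import math


def fill_with_floor_mean(values):
    """Replace None with floor(mean of non-null values), like floor(avg(col)) + na.fill."""
    present = [v for v in values if v is not None]  # SQL avg() skips nulls
    if not present:
        return list(values), None  # avg over only nulls is null: nothing to fill with
    fill = math.floor(sum(present) / len(present))
    return [fill if v is None else v for v in values], fill


filled, fill = fill_with_floor_mean([10, 3, None, 1, 6, None])
print(fill)    # 5  (floor(20 / 4))
print(filled)  # [10, 3, 5, 1, 6, 5]
```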

## 8. Clean the address_state column (it should be only 2 characters); replace longer values with NA

In [45]:
customers_emplength_replaced.createOrReplaceTempView("customers")

In [46]:
spark.sql("select distinct(address_state) from customers")

address_state
Helping Kenya's D...
223xx
175 (total projec...
AZ
SC
I am 56 yrs. old ...
"so Plan """"C"""" is ..."
financially I mad...
but no one will l...
LA


In [47]:
spark.sql("select count(address_state) from customers where length(address_state) > 2")

count(address_state)
254


In [48]:
from pyspark.sql.functions import when, length, col

customers_state_cleaned_df = customers_emplength_replaced.withColumn(
    "address_state",
    when(length(col("address_state")) > 2, "NA").otherwise(col("address_state"))
)
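The condition only catches values longer than 2 characters. In Spark, `length(null)` is null and a null `when` condition counts as not matching, so nulls fall through to `otherwise()` unchanged, as do 1-character values. If a strict "exactly two uppercase letters" rule were wanted, a condition such as `col("address_state").rlike("^[A-Z]{2}$")` is one option; it is an alternative, not what this notebook does. A plain-Python mirror of the rule as written (`clean_state` is hypothetical):

```python
def clean_state(value):
    """Mirror of when(length(col("address_state")) > 2, "NA").otherwise(col("address_state"))."""
    if value is not None and len(value) > 2:
        return "NA"
    # Two-character codes, shorter strings and None fall through to otherwise()
    return value


print([clean_state(v) for v in ["PA", "223xx", "I am 56 yrs. old ...", None]])
# ['PA', 'NA', 'NA', None]
```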

In [49]:
customers_state_cleaned_df

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
3abadaef9c7a5a1ec...,Account Manager,2,MORTGAGE,42773.0,TX,787xx,USA,B,B4,Source Verified,30716.0,Individual,,,2023-11-08 16:30:...
7c7a9bead42ae25ac...,Bank Manager,10,MORTGAGE,70000.0,GA,306xx,USA,E,E3,Not Verified,274281.0,Individual,,,2023-11-08 16:30:...
86e690df18febd790...,Outreach Director...,2,RENT,50000.0,CT,069xx,USA,C,C1,Source Verified,31184.0,Individual,,,2023-11-08 16:30:...
ba0550e40ab96ffcb...,,6,OWN,39000.0,TX,766xx,USA,B,B3,Verified,44200.0,Joint App,81000.0,Not Verified,2023-11-08 16:30:...
28bbe5a9e79077e5b...,Programmer analyst,5,RENT,120000.0,TX,750xx,USA,A,A3,Verified,54100.0,Individual,,,2023-11-08 16:30:...
aee02b340a06c557c...,Director of Infor...,1,MORTGAGE,88000.0,NY,104xx,USA,C,C2,Source Verified,94052.0,Individual,,,2023-11-08 16:30:...
88490f684ea2f7c41...,Executive,10,RENT,180000.0,FL,334xx,USA,C,C2,Source Verified,29400.0,Individual,,,2023-11-08 16:30:...
97a32fc937abdd9d2...,Supervisor,8,MORTGAGE,85000.0,NY,115xx,USA,B,B4,Source Verified,423844.0,Individual,,,2023-11-08 16:30:...
c90944cb2620fc932...,Senior IT Analyst,6,RENT,82000.0,OR,972xx,USA,B,B5,Not Verified,62614.0,Individual,,,2023-11-08 16:30:...
3956822f951035df8...,Server,1,RENT,60000.0,NV,891xx,USA,B,B5,Source Verified,34203.0,Individual,,,2023-11-08 16:30:...


## 9. Write the final cleaned customer DataFrame to the cleaned folder (Parquet and CSV)

In [50]:
customers_state_cleaned_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008299/lendingclubproject/cleaned/customers_parquet") \
.save()

In [51]:
customers_state_cleaned_df.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv008299/lendingclubproject/cleaned/customers_csv") \
.save()