In [1]:
from pyspark.sql import SparkSession
import getpass 
username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

#### 1. create a dataframe with proper datatypes 

In [2]:
customer_schema = 'member_id string, emp_title string, emp_length string, home_ownership string, annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string, verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float, verification_status_joint string'

In [3]:
customers_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customer_schema) \
.load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [4]:
customers_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [5]:
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



#### 2. Rename a few columns

In [6]:
customer_df_renamed = customers_raw_df.withColumnRenamed("annual_inc", "annual_income") \
.withColumnRenamed("addr_state", "address_state") \
.withColumnRenamed("zip_code", "address_zipcode") \
.withColumnRenamed("country", "address_country") \
.withColumnRenamed("tot_hi_credit_lim", "total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint", "join_annual_income")

In [7]:
customer_df_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [8]:
from pyspark.sql.functions import current_timestamp

#### 3. insert a new column named as ingestion date(current time)

In [9]:
customers_df_ingestd = customer_df_renamed.withColumn("ingest_date", current_timestamp())

In [10]:
customers_df_ingestd

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,,2023-10-04 05:37:...
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,,2023-10-04 05:37:...
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,,2023-10-04 05:37:...
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,,2023-10-04 05:37:...
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,,2023-10-04 05:37:...
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,,2023-10-04 05:37:...
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,,2023-10-04 05:37:...
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,,2023-10-04 05:37:...
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,,2023-10-04 05:37:...
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,,2023-10-04 05:37:...


#### 4. Remove complete duplicate rows

In [11]:
customers_df_ingestd.count()

2260701

In [12]:
customers_distinct = customers_df_ingestd.distinct()

In [13]:
customers_distinct.count()

2260638

In [14]:
customers_distinct.createOrReplaceTempView("customers")

In [15]:
spark.sql("select * from customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
b9bb6a9d166023d70...,ACCOUNTANT,10+ years,MORTGAGE,221000.0,TX,770xx,USA,D,D4,Verified,340215.0,Individual,,,2023-10-04 05:37:...
2f6abb5a7514011a6...,Registered Nurse,2 years,MORTGAGE,81000.0,TX,763xx,USA,B,B4,Verified,318415.0,Individual,,,2023-10-04 05:37:...
639fb9944b0256b29...,Teacher/Counselor,8 years,MORTGAGE,86000.0,KY,410xx,USA,D,D3,Source Verified,321739.0,Individual,,,2023-10-04 05:37:...
86c0c2ba01550347e...,Science teacher,< 1 year,RENT,47000.0,IL,606xx,USA,C,C3,Verified,69342.0,Individual,,,2023-10-04 05:37:...
396e35c3ecb4ab284...,Service truck driver,10+ years,RENT,60000.0,CA,953xx,USA,C,C4,Not Verified,57892.0,Individual,,,2023-10-04 05:37:...
d2b06d63808cbb7a1...,Sales,10+ years,OWN,45000.0,CO,800xx,USA,C,C2,Verified,288418.0,Individual,,,2023-10-04 05:37:...
697c3b8db22e0207a...,Staff Accountant,2 years,MORTGAGE,69000.0,GA,305xx,USA,A,A5,Source Verified,241934.0,Individual,,,2023-10-04 05:37:...
98245e415107a8ac8...,Solutions Principal,3 years,MORTGAGE,204000.0,GA,300xx,USA,A,A5,Source Verified,382772.0,Individual,,,2023-10-04 05:37:...
3ccb0a480cb9f58de...,Managing Squisher,5 years,MORTGAGE,45000.0,MI,496xx,USA,B,B3,Source Verified,154653.0,Individual,,,2023-10-04 05:37:...
cc4643e1e1b29f8c4...,Software Engineer,2 years,MORTGAGE,124000.0,PA,168xx,USA,C,C1,Verified,182271.0,Individual,,,2023-10-04 05:37:...


#### 5. Remove the rows where annual_income is null

In [16]:
spark.sql("select count(*) from customers where annual_income is null")

count(1)
5


In [17]:
customers_income_filtered = spark.sql("select * from customers where annual_income is not null")

In [18]:
customers_income_filtered.createOrReplaceTempView("customers")

In [19]:
spark.sql("select count(*) from customers where annual_income is null")

count(1)
0


### 6. convert emp_length to integer

In [20]:
spark.sql("select distinct(emp_length) from customers")

emp_length
9 years
5 years
""
1 year
2 years
7 years
8 years
4 years
6 years
3 years


In [21]:
from pyspark.sql.functions import regexp_replace, col

In [22]:
customers_emplength_cleaned = customers_income_filtered.withColumn("emp_length", regexp_replace(col("emp_length"), "(\D)",""))

In [23]:
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
7138261c576d8d781...,Intelligence Spec...,5.0,MORTGAGE,132000.0,GA,309xx,USA,C,C3,Not Verified,280896.0,Individual,,,2023-10-04 05:38:...
1089bb397d2a3b34a...,Production Coordi...,2.0,RENT,55000.0,CA,900xx,USA,C,C1,Verified,115457.0,Individual,,,2023-10-04 05:38:...
cd4c19c9b39b37285...,Truck Driver,1.0,MORTGAGE,46600.0,TX,751xx,USA,C,C2,Verified,199950.0,Individual,,,2023-10-04 05:38:...
88fa93d11f86cf27c...,Customer Engageme...,5.0,RENT,50000.0,CA,905xx,USA,C,C4,Verified,76510.0,Individual,,,2023-10-04 05:38:...
ff6ec051c988dc700...,,,MORTGAGE,52000.0,MD,211xx,USA,B,B5,Verified,286938.0,Individual,,,2023-10-04 05:38:...
da9f01cc0239b88cb...,Baker/Maintenance,3.0,RENT,32400.0,NY,139xx,USA,C,C2,Source Verified,6100.0,Individual,,,2023-10-04 05:38:...
91b5b6c7dbec66919...,RN,7.0,MORTGAGE,94624.0,GA,300xx,USA,B,B5,Source Verified,361104.0,Individual,,,2023-10-04 05:38:...
4b3a6ec7b33357738...,Managing Director...,4.0,RENT,60000.0,FL,334xx,USA,A,A4,Source Verified,31341.0,Individual,,,2023-10-04 05:38:...
5dfc356747ea4e4ac...,Annual Fund Coord...,1.0,MORTGAGE,51200.0,NH,037xx,USA,F,F5,Not Verified,220461.0,Joint App,83200.0,Not Verified,2023-10-04 05:38:...
31903319d781198f7...,Ramper,10.0,MORTGAGE,100000.0,GA,300xx,USA,D,D1,Source Verified,26800.0,Individual,,,2023-10-04 05:38:...


In [24]:
customers_emplength_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [25]:
customers_emplength_casted = customers_emplength_cleaned.withColumn("emp_length", customers_emplength_cleaned.emp_length.cast('int'))

In [26]:
customers_emplength_casted

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
c7dacd496816500fa...,Truck driver,4,OWN,58000.0,TX,751xx,USA,A,A4,Not Verified,38695.0,Individual,,,2023-10-04 05:38:...
2ad83feafde39dc4c...,Sales Supervisor,2,RENT,58000.0,CO,800xx,USA,B,B1,Source Verified,50390.0,Individual,,,2023-10-04 05:38:...
c864e7909219b482c...,Commission Sales,10,RENT,25000.0,WI,546xx,USA,A,A4,Source Verified,45372.0,Individual,,,2023-10-04 05:38:...
5e16287e06f0ddc3a...,Sales Representative,2,OWN,80000.0,FL,331xx,USA,A,A3,Source Verified,51806.0,Individual,,,2023-10-04 05:38:...
31dc64a9f38709240...,Admin. Assistant,10,RENT,50000.0,NY,100xx,USA,C,C2,Not Verified,53523.0,Individual,,,2023-10-04 05:38:...
f85b6a6082b447681...,Engineer,4,RENT,78000.0,VA,244xx,USA,C,C3,Not Verified,34457.0,Individual,,,2023-10-04 05:38:...
8c7b00e2211cf7ac4...,Technician,2,RENT,72000.0,IN,460xx,USA,C,C2,Verified,40231.0,Individual,,,2023-10-04 05:38:...
320471b0deb720d9f...,Financial Analyst,8,MORTGAGE,250000.0,VA,221xx,USA,A,A1,Source Verified,508035.0,Individual,,,2023-10-04 05:38:...
8fc5c7a7321081dd6...,Lead Mechanic,10,RENT,120000.0,VA,201xx,USA,B,B1,Not Verified,81700.0,Individual,,,2023-10-04 05:38:...
4bc27de17c6494e02...,Registered Nurse,1,MORTGAGE,75000.0,NV,891xx,USA,B,B1,Source Verified,68999.0,Individual,,,2023-10-04 05:38:...


In [27]:
customers_emplength_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



#### 7. we need to replace all the nulls in emp_length column with average of this column

In [28]:
customers_emplength_casted.filter("emp_length is null").count()

146903

In [29]:
customers_emplength_casted.createOrReplaceTempView("customers")

In [30]:
avg_emp_length = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers").collect()

In [31]:
print(avg_emp_length)

[Row(avg_emp_length=6)]


In [32]:
avg_emp_duration = avg_emp_length[0][0]

In [33]:
print(avg_emp_duration)

6


In [34]:
customers_emplength_replaced = customers_emplength_casted.na.fill(avg_emp_duration, subset=['emp_length'])

In [35]:
customers_emplength_replaced

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
e5324e6ec6668c91a...,forman,10,RENT,100000.0,NJ,074xx,USA,C,C2,Verified,50640.0,Individual,,,2023-10-04 05:38:...
a9a8fcbcde5d446bb...,Lead clerk,10,MORTGAGE,55000.0,CA,958xx,USA,D,D3,Not Verified,266596.0,Individual,,,2023-10-04 05:38:...
b711748d9cacaeb06...,SECONDARY MARKETI...,1,RENT,35360.0,CA,928xx,USA,B,B5,Source Verified,16000.0,Individual,,,2023-10-04 05:38:...
00b2d0e6089cb33ed...,Police Sergeant,9,RENT,62000.0,VA,236xx,USA,C,C4,Not Verified,174524.0,Individual,,,2023-10-04 05:38:...
7f0f9bb5da43380c6...,laborer,1,RENT,38400.0,IN,461xx,USA,D,D3,Source Verified,7300.0,Individual,,,2023-10-04 05:38:...
13663735f3a3e3c03...,Health Unit Coord...,10,RENT,50000.0,OH,441xx,USA,B,B1,Not Verified,67458.0,Individual,,,2023-10-04 05:38:...
f7d087e0bd0e26a95...,Compliance Manager,2,MORTGAGE,61200.0,OK,731xx,USA,C,C2,Verified,277246.0,Individual,,,2023-10-04 05:38:...
bbaf77dea69ad59ba...,DIRECT CARE WORKER,3,OWN,25000.0,MO,654xx,USA,C,C3,Source Verified,29198.0,Individual,,,2023-10-04 05:38:...
3d5e1eb7b1a075d7c...,Office Manager,6,RENT,37900.0,HI,967xx,USA,D,D2,Verified,72700.0,Individual,,,2023-10-04 05:38:...
612623c0c3a9d059b...,,6,MORTGAGE,65000.0,WV,249xx,USA,C,C3,Source Verified,42392.0,Individual,,,2023-10-04 05:38:...


In [36]:
customers_emplength_replaced.filter("emp_length is null").count()

0

#### 8. Clean the address_state(it should be 2 characters only),replace all others with NA

In [37]:
customers_emplength_replaced.createOrReplaceTempView("customers")

In [38]:
spark.sql("select distinct(address_state) from customers")

address_state
Helping Kenya's D...
223xx
175 (total projec...
AZ
SC
I am 56 yrs. old ...
"so Plan """"C"""" is ..."
financially I mad...
but no one will l...
LA


In [39]:
spark.sql("select count(address_state) from customers where length(address_state)>2")

count(address_state)
254


In [40]:
from pyspark.sql.functions import when, col, length

In [41]:
customers_state_cleaned = customers_emplength_replaced.withColumn(
    "address_state",
    when(length(col("address_state"))> 2, "NA").otherwise(col("address_state"))
)

In [42]:
customers_state_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
d16e2f99a726e8326...,Sr. Network Desig...,8,MORTGAGE,82000.0,GA,301xx,USA,B,B4,Source Verified,427673.0,Individual,,,2023-10-04 05:38:...
39c7ce3e4838895b4...,VP Marketing,10,RENT,155000.0,NY,105xx,USA,B,B3,Not Verified,92200.0,Individual,,,2023-10-04 05:38:...
b061c1f3197c674d1...,Driver,9,RENT,93000.0,HI,967xx,USA,C,C5,Verified,130019.0,Individual,,,2023-10-04 05:38:...
c728d0bd111b0240e...,Transport Driver ...,10,MORTGAGE,130000.0,TX,775xx,USA,C,C5,Source Verified,172012.0,Individual,,,2023-10-04 05:38:...
4fe2c555c12ec7e02...,Office Manager,10,MORTGAGE,62000.0,WA,985xx,USA,F,F1,Verified,333602.0,Individual,,,2023-10-04 05:38:...
1e4d2744d484898a7...,Senior Accountant,10,OWN,74000.0,KS,662xx,USA,B,B3,Verified,324392.0,Individual,,,2023-10-04 05:38:...
4888e607570b21f6c...,5th Grade Teacher,10,MORTGAGE,72000.0,GA,398xx,USA,A,A2,Verified,213654.0,Individual,,,2023-10-04 05:38:...
03dc367806c43d74d...,purchaser,10,RENT,59000.0,CA,921xx,USA,B,B3,Verified,75835.0,Individual,,,2023-10-04 05:38:...
59af1f6dae2947b4a...,Engineer,10,MORTGAGE,110000.0,OH,453xx,USA,C,C4,Source Verified,296094.0,Individual,,,2023-10-04 05:38:...
4ec343b2e7c664944...,teachers aide,10,MORTGAGE,35000.0,MO,641xx,USA,F,F3,Verified,65428.0,Individual,,,2023-10-04 05:38:...


In [43]:
customers_state_cleaned.select("address_state").distinct()

address_state
AZ
SC
LA
MN
NJ
DC
OR
""
VA
""


In [44]:
customers_state_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv006277/lendingclubproject/raw/cleaned/customers_parquet") \
.save()

In [45]:
customers_state_cleaned.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv006277/lendingclubproject/raw/cleaned/customers_csv") \
.save()