In [1]:
from pyspark.sql import SparkSession
import getpass 
username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

#### 1. create a dataframe with proper datatypes 

In [2]:
customer_schema = 'member_id string, emp_title string, emp_length string, home_ownership string, annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string, verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float, verification_status_joint string'

In [3]:
customers_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customer_schema) \
.load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [4]:
customers_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [5]:
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



#### 2. Rename a few columns

In [6]:
customer_df_renamed = customers_raw_df.withColumnRenamed("annual_inc", "annual_income") \
.withColumnRenamed("addr_state", "address_state") \
.withColumnRenamed("zip_code", "address_zipcode") \
.withColumnRenamed("country", "address_country") \
.withColumnRenamed("tot_hi_credit_lim", "total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint", "join_annual_income")

In [7]:
customer_df_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [8]:
from pyspark.sql.functions import current_timestamp

#### 3. insert a new column named as ingestion date(current time)

In [9]:
customers_df_ingestd = customer_df_renamed.withColumn("ingest_date", current_timestamp())

In [10]:
customers_df_ingestd

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,,2024-02-11 00:13:...
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,,2024-02-11 00:13:...
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,,2024-02-11 00:13:...
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,,2024-02-11 00:13:...
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,,2024-02-11 00:13:...
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,,2024-02-11 00:13:...
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,,2024-02-11 00:13:...
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,,2024-02-11 00:13:...
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,,2024-02-11 00:13:...
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,,2024-02-11 00:13:...


#### 4. Remove complete duplicate rows

In [11]:
customers_df_ingestd.count()

2260701

In [12]:
customers_distinct = customers_df_ingestd.distinct()

In [13]:
customers_distinct.count()

2260638

In [14]:
customers_distinct.createOrReplaceTempView("customers")

In [15]:
spark.sql("select * from customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
d0171cf38c58c37cd...,JL transport,< 1 year,RENT,33600.0,CA,921xx,USA,B,B2,Not Verified,,Individual,,,2024-02-11 00:14:...
20943584b28ed9af5...,CLEVELAND REGIONA...,10+ years,MORTGAGE,58000.0,OH,441xx,USA,A,A1,Not Verified,,Individual,,,2024-02-11 00:14:...
df67ac77e6b65dd4b...,JSTOR/Ithaka,1 year,RENT,45000.0,NY,100xx,USA,C,C1,Source Verified,,Individual,,,2024-02-11 00:14:...
5a9213685bdb032c8...,Howe Wood Products,8 years,MORTGAGE,57000.0,PA,174xx,USA,A,A2,Not Verified,,Individual,,,2024-02-11 00:14:...
107b062a4631d7a58...,Rich's,5 years,OWN,20000.0,MO,631xx,USA,B,B5,Not Verified,,Individual,,,2024-02-11 00:14:...
70d7bf8aec58ef52e...,Chevron phillips,10+ years,RENT,85000.0,TX,790xx,USA,B,B5,Source Verified,,Individual,,,2024-02-11 00:14:...
ce0f8bc1db222c878...,The Buffalo Group,< 1 year,MORTGAGE,120000.0,PA,170xx,USA,E,E5,Verified,,Individual,,,2024-02-11 00:14:...
bfe20161a22d9aafb...,25 years,10+ years,MORTGAGE,56000.0,AZ,853xx,USA,D,D5,Verified,,Individual,,,2024-02-11 00:14:...
669eb4a3ca55ba2a3...,ST JOSEPH'S MEDIC...,10+ years,RENT,38000.0,NY,104xx,USA,C,C5,Not Verified,,Individual,,,2024-02-11 00:14:...
ee7935d00acfda580...,State of CA Dept....,10+ years,RENT,77592.0,CA,917xx,USA,D,D3,Source Verified,,Individual,,,2024-02-11 00:14:...


#### 5. Remove the rows where annual_income is null

In [16]:
spark.sql("select count(*) from customers where annual_income is null")

count(1)
5


In [17]:
customers_income_filtered = spark.sql("select * from customers where annual_income is not null")

In [18]:
customers_income_filtered.createOrReplaceTempView("customers")

In [19]:
spark.sql("select count(*) from customers where annual_income is null")

count(1)
0


### 6. convert emp_length to integer

In [20]:
spark.sql("select distinct(emp_length) from customers")

emp_length
5 years
9 years
""
1 year
2 years
7 years
8 years
4 years
6 years
3 years


In [21]:
from pyspark.sql.functions import regexp_replace, col

In [22]:
customers_emplength_cleaned = customers_income_filtered.withColumn("emp_length", regexp_replace(col("emp_length"), "(\D)",""))

In [23]:
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
7544103829c3082b7...,technician,10.0,MORTGAGE,36000.0,OH,433xx,USA,B,B2,Not Verified,23820.0,Individual,,,2024-02-11 00:14:...
c25963d7a66a7abc6...,Claims adjuster,7.0,MORTGAGE,57500.0,OK,741xx,USA,B,B5,Source Verified,109500.0,Individual,,,2024-02-11 00:14:...
6c32bb4a54a6b52f8...,Clinical Director,7.0,RENT,78250.0,FL,325xx,USA,D,D1,Not Verified,62519.0,Individual,,,2024-02-11 00:14:...
6d04824f59eb43a2e...,OTR Driver,1.0,OWN,60000.0,NC,287xx,USA,F,F3,Source Verified,9000.0,Individual,,,2024-02-11 00:14:...
5674e80b7cc0315da...,Safety managerr,1.0,RENT,195000.0,UT,840xx,USA,A,A4,Source Verified,129849.0,Individual,,,2024-02-11 00:14:...
dbb33bb6f937e49ec...,Administrative Of...,10.0,OWN,153510.0,VA,234xx,USA,A,A4,Not Verified,512205.0,Individual,,,2024-02-11 00:14:...
1eb4c05e05010c9cd...,Sales Manager,1.0,MORTGAGE,205000.0,NY,115xx,USA,E,E4,Verified,589964.0,Individual,,,2024-02-11 00:14:...
6ea4a54bbf43cc2f5...,Business Consultant,10.0,OWN,86000.0,OH,451xx,USA,D,D2,Not Verified,242788.0,Individual,,,2024-02-11 00:14:...
a9d2af1b8e43f4f73...,housekeeping,10.0,RENT,39000.0,NY,107xx,USA,C,C2,Source Verified,15000.0,Individual,,,2024-02-11 00:14:...
ac8452e99a0540b43...,Met Associate,5.0,OWN,64000.0,CA,945xx,USA,C,C1,Source Verified,25200.0,Individual,,,2024-02-11 00:14:...


In [24]:
customers_emplength_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [25]:
customers_emplength_casted = customers_emplength_cleaned.withColumn("emp_length", customers_emplength_cleaned.emp_length.cast('int'))

In [26]:
customers_emplength_casted

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
e80b2072937e13019...,Automobile Techni...,8,RENT,83000.0,IL,606xx,USA,B,B2,Not Verified,50922.0,Individual,,,2024-02-11 00:14:...
1162a5aef0355f5f4...,teacher,10,MORTGAGE,65000.0,AZ,851xx,USA,B,B3,Source Verified,169436.0,Individual,,,2024-02-11 00:14:...
18f2a9414e873e2d1...,Registered Nurse,3,OWN,60000.0,FL,322xx,USA,C,C2,Source Verified,77604.0,Individual,,,2024-02-11 00:14:...
d9af6aba979703a44...,Automation Engineer,2,MORTGAGE,60000.0,OH,452xx,USA,A,A5,Verified,235000.0,Individual,,,2024-02-11 00:14:...
fb570e5cf8208009a...,Director of sales,10,MORTGAGE,93600.0,CA,956xx,USA,B,B3,Verified,559343.0,Individual,,,2024-02-11 00:14:...
d752764467c1c65b3...,Medical Technician,1,RENT,30000.0,NV,897xx,USA,C,C2,Not Verified,22300.0,Individual,,,2024-02-11 00:14:...
ce25e0ea5bfde5727...,plant operator,10,RENT,45000.0,CA,928xx,USA,B,B2,Not Verified,23074.0,Individual,,,2024-02-11 00:14:...
76af651bf2507dc9f...,Principal,6,MORTGAGE,97000.0,OH,440xx,USA,E,E2,Verified,357201.0,Individual,,,2024-02-11 00:14:...
5fcddfe82ad68b035...,Class A CDL Driver,10,OWN,45000.0,TX,761xx,USA,B,B5,Not Verified,53101.0,Individual,,,2024-02-11 00:14:...
8f85013a66fc4853d...,President/CEO,1,MORTGAGE,125000.0,TX,792xx,USA,A,A5,Verified,430208.0,Individual,,,2024-02-11 00:14:...


In [27]:
customers_emplength_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



#### 7. we need to replace all the nulls in emp_length column with average of this column

In [28]:
customers_emplength_casted.filter("emp_length is null").count()

146903

In [29]:
customers_emplength_casted.createOrReplaceTempView("customers")

In [30]:
avg_emp_length = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers").collect()

In [31]:
print(avg_emp_length)

[Row(avg_emp_length=6)]


In [32]:
avg_emp_duration = avg_emp_length[0][0]

In [33]:
print(avg_emp_duration)

6


In [34]:
customers_emplength_replaced = customers_emplength_casted.na.fill(avg_emp_duration, subset=['emp_length'])

In [35]:
customers_emplength_replaced

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
2dcb219f160d64890...,Registered Nurse,10,MORTGAGE,80000.0,TX,774xx,USA,B,B1,Source Verified,281142.0,Individual,,,2024-02-11 00:15:...
dcad4689b72372f81...,registered nurse,5,MORTGAGE,126000.0,MA,027xx,USA,D,D1,Verified,360331.0,Individual,,,2024-02-11 00:15:...
c04b81d4c045e4273...,Teamster Driver,10,RENT,200000.0,NJ,074xx,USA,C,C2,Source Verified,40190.0,Individual,,,2024-02-11 00:15:...
646124ce14ff72463...,Customer Service Rep,10,RENT,35000.0,CA,916xx,USA,B,B2,Verified,86659.0,Individual,,,2024-02-11 00:15:...
2af49d3c92e632687...,Receiving Coordin...,6,MORTGAGE,45000.0,SC,291xx,USA,D,D2,Not Verified,129573.0,Individual,,,2024-02-11 00:15:...
027761fcee352424c...,General Sales Man...,2,MORTGAGE,120000.0,CA,902xx,USA,A,A5,Verified,113296.0,Individual,,,2024-02-11 00:15:...
b0db9762e193d119f...,home health aide,7,OWN,24000.0,NJ,077xx,USA,C,C4,Not Verified,11280.0,Individual,,,2024-02-11 00:15:...
f0bc0568272301f29...,Director,10,MORTGAGE,100000.0,MI,496xx,USA,B,B1,Source Verified,336983.0,Individual,,,2024-02-11 00:15:...
3a070f8a5fe4397cf...,Sales,3,RENT,40000.0,NJ,070xx,USA,E,E1,Source Verified,26200.0,Individual,,,2024-02-11 00:15:...
ca9bd73de2552366c...,Teacher,7,MORTGAGE,42000.0,FL,342xx,USA,C,C1,Source Verified,53064.0,Individual,,,2024-02-11 00:15:...


In [36]:
customers_emplength_replaced.filter("emp_length is null").count()

0

#### 8. Clean the address_state(it should be 2 characters only),replace all others with NA

In [37]:
customers_emplength_replaced.createOrReplaceTempView("customers")

In [38]:
spark.sql("select distinct(address_state) from customers")

address_state
Helping Kenya's D...
175 (total projec...
223xx
AZ
SC
"so Plan """"C"""" is ..."
I am 56 yrs. old ...
financially I mad...
but no one will l...
LA


In [39]:
spark.sql("select count(address_state) from customers where length(address_state)>2")

count(address_state)
254


In [40]:
from pyspark.sql.functions import when, col, length

In [41]:
customers_state_cleaned = customers_emplength_replaced.withColumn(
    "address_state",
    when(length(col("address_state"))> 2, "NA").otherwise(col("address_state"))
)

In [42]:
customers_state_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
7986fdf230717025a...,Family Support Wo...,1,RENT,32000.0,KY,421xx,USA,B,B1,Source Verified,57322.0,Individual,,,2024-02-11 00:15:...
1ec06e28c1b24ee32...,Teacher,7,RENT,64000.0,CA,952xx,USA,D,D3,Source Verified,16300.0,Individual,,,2024-02-11 00:15:...
3426f48d03064b3b9...,CEO,10,OWN,110000.0,MN,551xx,USA,B,B5,Not Verified,387565.0,Joint App,170000.0,Not Verified,2024-02-11 00:15:...
7883d38ce92329af1...,Wellness Nurse,6,MORTGAGE,60000.0,CA,923xx,USA,A,A1,Source Verified,388900.0,Individual,,,2024-02-11 00:15:...
77689cc8225a60f1e...,Medical doctor,3,MORTGAGE,110000.0,NV,891xx,USA,B,B4,Source Verified,565401.0,Individual,,,2024-02-11 00:15:...
504c4dd65674ed6ce...,Director of Techn...,5,MORTGAGE,136500.0,TX,751xx,USA,D,D3,Not Verified,187300.0,Individual,,,2024-02-11 00:15:...
a4e19e56298e28e0b...,Vice President,10,MORTGAGE,165000.0,WA,980xx,USA,A,A1,Verified,950663.0,Individual,,,2024-02-11 00:15:...
2e9901edcadab0a32...,police officer,6,RENT,100000.0,NY,117xx,USA,B,B2,Not Verified,41797.0,Individual,,,2024-02-11 00:15:...
5169bacc7091e2385...,Driver,1,MORTGAGE,60000.0,MO,657xx,USA,D,D5,Not Verified,143537.0,Joint App,104200.0,Not Verified,2024-02-11 00:15:...
2849046e8825b583f...,Lab tech,10,OWN,90000.0,MD,209xx,USA,A,A4,Source Verified,66800.0,Individual,,,2024-02-11 00:15:...


In [43]:
customers_state_cleaned.select("address_state").distinct()

address_state
AZ
SC
LA
MN
NJ
DC
OR
""
VA
""


In [45]:
customers_state_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv008041/lendingclubproject/raw/cleaned/customers_parquet") \
.save()

In [46]:
customers_state_cleaned.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv008041/lendingclubproject/raw/cleaned/customers_csv") \
.save()