In [30]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()

# Build (or reuse) a Hive-enabled Spark session on YARN. The warehouse directory
# is derived from the current OS user instead of a hardcoded account, so the
# notebook works for whoever runs it (previously `username` was computed but
# the f-string had no placeholder).
spark = (
    SparkSession.builder
    .appName("Practice-itv021172-2 ")
    # NOTE(review): '0' presumably lets Spark bind the UI to any free port on a
    # shared gateway — confirm.
    .config('spark.ui.port', '0')
    .config('spark.sql.warehouse.dir', f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [31]:
# DDL schema for the raw customers CSV: one "<name> <type>" entry per column, in
# file order. Because this schema is passed to the reader, these names (not the
# file's header row) become the DataFrame column names.
# NOTE(review): the money columns use 32-bit `float`; consider `double` or
# `decimal` if exact amounts matter downstream.
schema = ', '.join([
    'member_id string',
    'emp_title string',
    'emp_length string',
    'home_ownership string',
    'annual_income float',
    'addr_state string',
    'zip_code string',
    'country string',
    'grade string',
    'sub_grade string',
    'verification_status string',
    'tot_hi_cred_lim float',
    'application_type string',
    'annual_inc_joint float',
    'verification_status_joint string',
])

In [32]:
# Load the single raw customers part-file using the explicit DDL `schema`;
# header=True tells Spark the first line is a header row, not data.
df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('/public/trendytech/lendingclubproject/raw/customers_data_csv/part-00000-694b65c5-0b8f-4760-ac12-2b59978292e0-c000.csv')
)

In [33]:
# Rename terse source columns to descriptive names. Parenthesized chaining
# replaces the old backslash continuations, whose trailing `\` on the last line
# would silently swallow any statement added directly after it.
col_df = (
    df
    .withColumnRenamed('addr_state', 'address_state')
    .withColumnRenamed('country', 'address_country')
    .withColumnRenamed('tot_hi_cred_lim', 'total_high_credit_limit')
    .withColumnRenamed('annual_inc_joint', 'annual_joint_income')
)


In [34]:
# Preview the renamed frame. The saved output is a rendered table, which
# suggests spark.sql.repl.eagerEval is enabled here — NOTE(review): confirm;
# each display runs a Spark job.
col_df

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,zip_code,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,annual_joint_income,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [35]:
from pyspark.sql.functions import *

In [36]:
# Keep every existing column and append `Ingest_date`, the query's current timestamp.
cusst_df = col_df.select('*', current_timestamp().alias('Ingest_date'))

In [37]:
# Preview the frame with the appended Ingest_date column (rendered table output
# below — NOTE(review): relies on eager evaluation being enabled).
cusst_df

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,zip_code,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,annual_joint_income,verification_status_joint,Ingest_date
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,,2025-08-25 10:07:...
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,,2025-08-25 10:07:...
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,,2025-08-25 10:07:...
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,,2025-08-25 10:07:...
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,,2025-08-25 10:07:...
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,,2025-08-25 10:07:...
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,,2025-08-25 10:07:...
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,,2025-08-25 10:07:...
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,,2025-08-25 10:07:...
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,,2025-08-25 10:07:...


In [38]:
# Remove exact duplicate rows, comparing all columns. Ingest_date does not break
# this: current_timestamp() returns the same value for every row of one query.
cust_distinct = cusst_df.dropDuplicates()

In [39]:
# Discard rows whose annual_income is missing (null/NaN); other columns may stay null.
cust_nondup = cust_distinct.na.drop(subset=['annual_income'])

In [40]:
# Expose the deduplicated, income-non-null frame to Spark SQL as temp view `Customer`.
# NOTE(review): the view is registered before the emp_length / address_state
# cleanup in the following cells, so SQL against `Customer` sees uncleaned
# values — confirm that is intended.
cust_nondup.createOrReplaceTempView('Customer')

In [41]:
# Strip every non-digit character from emp_length (e.g. '10+ years' -> '10',
# '7 years' -> '7'); nulls stay null. The pattern is a raw string: the old
# '\D' literal is an invalid Python escape sequence that only worked by
# accident and raises a Deprecation/SyntaxWarning on recent Python versions.
cust_mod = cust_nondup.withColumn('emp_length', regexp_replace(col('emp_length'), r'\D', ''))

In [42]:
# Turn the digits-only emp_length strings into integers. An empty string becomes
# null (NOTE(review): under non-ANSI mode; ANSI mode would raise instead).
custfin = cust_mod.withColumn('emp_length', col('emp_length').cast('int'))

In [43]:
# One-row aggregate: mean of the non-null integer emp_length values.
custfin.agg(avg('emp_length'))

avg(emp_length)
6.021258628112389


In [44]:
# Impute missing emp_length with the mean of the known values, truncated to an
# int. This replaces a hardcoded 6 copied from the previous cell's output, so
# the fill stays correct if the input changes. On this data the mean is ~6.02,
# so the filled value is still 6.
emp_length_mean = custfin.select(avg('emp_length')).first()[0]
cust_null = (
    custfin.fillna(int(emp_length_mean), subset=['emp_length'])
    if emp_length_mean is not None
    else custfin  # every emp_length is null: there is no mean to impute with
)

In [45]:
# State codes longer than two characters are treated as invalid and replaced by
# 'NA'. Every other value, including null, passes through unchanged.
state_too_long = length(col('address_state')) > 2
cust_statefilter = cust_null.withColumn(
    'address_state',
    when(state_too_long, 'NA').otherwise(col('address_state')),
)

In [46]:
# Print the first 20 rows with long values truncated (Spark's show() defaults, spelled out).
cust_statefilter.show(n=20, truncate=True)

In [49]:
# Persist the cleaned customers as parquet. Overwrite mode deletes whatever is
# already at 'data/cleaned', so no other writer should target this path.
cust_statefilter.write.mode('overwrite').format('parquet').save('data/cleaned')

In [48]:
# Persist a CSV copy in its own directory. Writing with mode('overwrite') to the
# same 'data/cleaned' path as the parquet output would wipe that output (or be
# wiped by it), depending on run order. header=True keeps the column names,
# which the CSV writer omits by default.
cust_statefilter.write.format('csv').mode('overwrite').option('header', True).option('path', 'data/cleaned_csv').save()