In [0]:
%fs ls /mnt/lending_club/landing/customers_df/csv/single

path,name,size,modificationTime
dbfs:/mnt/lending_club/landing/customers_df/csv/single/_SUCCESS,_SUCCESS,0,1702882677000
dbfs:/mnt/lending_club/landing/customers_df/csv/single/_committed_3236819910337846505,_committed_3236819910337846505,113,1702882677000
dbfs:/mnt/lending_club/landing/customers_df/csv/single/_started_3236819910337846505,_started_3236819910337846505,0,1702882668000
dbfs:/mnt/lending_club/landing/customers_df/csv/single/part-00000-tid-3236819910337846505-cec1c320-11df-4a4a-8114-f0d0d2bfc2c9-214-1-c000.csv,part-00000-tid-3236819910337846505-cec1c320-11df-4a4a-8114-f0d0d2bfc2c9-214-1-c000.csv,354975108,1702882677000


In [0]:
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType, FloatType
from pyspark.sql.functions import regexp_replace, concat_ws
from pyspark.sql.functions import when, col, length
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import avg

In [0]:
customers_schema = StructType([
    StructField("member_id", StringType(), True)
    ,StructField("emp_title", StringType(), True)
    ,StructField("emp_length", StringType(), True)
    ,StructField("home_ownership", StringType(), True)
    ,StructField("annual_inc", FloatType(), True)
    ,StructField("addr_state", StringType(), True)
    ,StructField("grade", StringType(), True)
    ,StructField("zip_code", StringType(), True)
    ,StructField("sub_grade", StringType(), True)
    ,StructField("verification_status", StringType(), True)
    ,StructField("tot_hi_cred_lim", FloatType(), True)
    ,StructField("application_type", StringType(), True)
    ,StructField("annual_inc_joint", FloatType(), True)
    ,StructField("verification_status_joint", StringType(), True)
    ,StructField("country", StringType(), True)

])

In [0]:
customers_df = spark.read\
    .option("header", True)\
    .format("csv")\
    .schema(customers_schema)\
    .load("dbfs:/mnt/lending_club/landing/customers_df/csv/single/part-00000-tid-3236819910337846505-cec1c320-11df-4a4a-8114-f0d0d2bfc2c9-214-1-c000.csv")
customers_df.show()

+--------------------+--------------------+----------+--------------+----------+----------+-----+--------+---------+-------------------+---------------+----------------+----------------+-------------------------+-------+
|           member_id|           emp_title|emp_length|home_ownership|annual_inc|addr_state|grade|zip_code|sub_grade|verification_status|tot_hi_cred_lim|application_type|annual_inc_joint|verification_status_joint|country|
+--------------------+--------------------+----------+--------------+----------+----------+-----+--------+---------+-------------------+---------------+----------------+----------------+-------------------------+-------+
|af2c01919a67ad070...|             leadman| 10+ years|      MORTGAGE|   55000.0|        PA|190xx|       C|       C4|       Not Verified|       178050.0|      Individual|            NULL|                     NULL|    USA|
|51cf0089ac3e1beeb...|            Engineer| 10+ years|      MORTGAGE|   65000.0|        SD|577xx|       C|       C1|

In [0]:
customers_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- country: string (nullable = true)




# Rename cols

In [0]:
customers_df_renamed = customers_df.withColumnRenamed("annual_inc", "annual_income") \
.withColumnRenamed("addr_state", "address_state") \
.withColumnRenamed("zip_code", "address_zipcode") \
.withColumnRenamed("country", "address_country") \
.withColumnRenamed("tot_hi_credit_lim", "total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint", "joint_annual_income")

customers_df_renamed.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+
|af2c01919a67ad070...|             leadman| 10+ years|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|                     NULL|            USA|
|51cf0089ac3e1beeb..


####  insert a new column named as ingestion date(current time)

In [0]:
customers_df_ingested_ts = customers_df_renamed.withColumn("ingest_date", current_timestamp())
customers_df_ingested_ts.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman| 10+ years|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_df_ingested_ts.count()

2260701

####  Remove complete duplicate rows

In [0]:
customers_df_ingested_ts_distinct = customers_df_ingested_ts.distinct()
customers_df_ingested_ts_distinct_count = customers_df_ingested_ts_distinct.count()
customers_df_ingested_ts_count = customers_df_ingested_ts.count()
duplicates = customers_df_ingested_ts_count - customers_df_ingested_ts_distinct_count

In [0]:
print(f"Total no of rows : {customers_df_ingested_ts_count}")
print(f"Total no of distinct rows : {customers_df_ingested_ts_distinct_count}")
print(f"Total no of duplicate rows : {duplicates} \n")
print(f"\n ************************** no of duplicate rows are {duplicates}  **************************")

Total no of rows : 2260701
Total no of distinct rows : 2260638
Total no of duplicate rows : 63 


 ************************** no of duplicate rows are 63  **************************


In [0]:
customers_filtered = customers_df_ingested_ts.filter("annual_income is not null")
customers_filtered.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman| 10+ years|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_filtered.filter("annual_income is null").count()

0

### convert emp_length to integer

In [0]:
customers_emp_length_cleaned = customers_filtered.withColumn("emp_length", regexp_replace(col("emp_length"), "(\D)", ""))
customers_emp_length_cleaned.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman|        10|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_emp_length_cleaned = customers_emp_length_cleaned.withColumn("emp_length", customers_emp_length_cleaned.emp_length.cast("int"))
customers_emp_length_cleaned.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman|        10|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_emp_length_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- joint_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)




# Replace al the nulls in emp_length with average of the column

In [0]:
customers_emp_length_cleaned.filter("emp_length is null").count()

146907

In [0]:
average = customers_emp_length_cleaned.select(avg("emp_length")).collect()[0][0]

print(f"The average of column_name is: {average}")

The average of column_name is: 6.021252216433685


In [0]:
customers_emp_length_cleaned_average = customers_emp_length_cleaned.na.fill(average, subset=["emp_length"])
customers_emp_length_cleaned_average.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman|        10|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_emp_length_cleaned_average.filter("emp_length is null").count()

0


####  Clean the address_state(it should be 2 characters only),replace all others with NA

In [0]:
customers_emp_length_cleaned_average.filter(length(col("address_state"))>2).show()

+--------------------+--------------------+----------+--------------+-------------+--------------------+--------------------+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|       address_state|               grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+--------------------+--------------------+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|7a3dcd449914b0ab5...|          hr manager|         2|          RENT|      65000.0|  debt_consolidation|as long as it doe...|           

In [0]:
customers_state_cleaned = customers_emp_length_cleaned_average.withColumn(
    "address_state",
    when(length(col("address_state"))> 2, "NA").otherwise(col("address_state"))
)
customers_state_cleaned.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman|        10|      MORTGAGE|      55000.0|           PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_state_cleaned = customers_state_cleaned.withColumn("address_state", concat_ws("||", col("address_country"), col("address_state")))
customers_state_cleaned.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|grade|address_zipcode|sub_grade|verification_status|tot_hi_cred_lim|application_type|joint_annual_income|verification_status_joint|address_country|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+-----+---------------+---------+-------------------+---------------+----------------+-------------------+-------------------------+---------------+--------------------+
|af2c01919a67ad070...|             leadman|        10|      MORTGAGE|      55000.0|      USA||PA|190xx|              C|       C4|       Not Verified|       178050.0|      Individual|               NULL|

In [0]:
customers_state_cleaned.select("verification_status").distinct().show()

+-------------------+
|verification_status|
+-------------------+
|           Verified|
|    Source Verified|
|       Not Verified|
+-------------------+



In [0]:
customers_state_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/mnt/lending_club/staging/customers/partitioned/parquet") \
.partitionBy("verification_status")\
.save()

In [0]:
customers_state_cleaned.write \
.format("csv") \
.mode("overwrite") \
.option("path", "/mnt/lending_club/staging/customers/partitioned/csv") \
.partitionBy("verification_status")\
.save()

In [0]:
customers_state_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/mnt/lending_club/staging/customers/non_partitioned/parquet") \
.save()

In [0]:
customers_state_cleaned.write \
.format("csv") \
.mode("overwrite") \
.option("path", "/mnt/lending_club/staging/customers/non_partitioned/csv") \
.save()