In [1]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
# Create (or fetch the already running) Hive-enabled session used by all later cells.
spark = (
    SparkSession.builder
    .appName("ETL Pipeline Project")
    .enableHiveSupport()
    .getOrCreate()
)


In [5]:
# Load the raw loan CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
loan_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/Loan_Default.csv")
)

In [6]:
# Print a tabular preview of the loaded DataFrame as a quick check on the read.
loan_df.show()

+-----+----+----------+-----------------+-------------+---------+------------+-----------------+-----------+----------------------+-----------+----------------+--------------------+---------------+----+-----------------+-------------+----------------+--------------+-----------------+--------------+----------+-----------+------+-----------+------------+------------------------+-----+-------------------------+-----------+-------+-------------+------+-----+
|   ID|year|loan_limit|           Gender|approv_in_adv|loan_type|loan_purpose|Credit_Worthiness|open_credit|business_or_commercial|loan_amount|rate_of_interest|Interest_rate_spread|Upfront_charges|term|Neg_ammortization|interest_only|lump_sum_payment|property_value|construction_type|occupancy_type|Secured_by|total_units|income|credit_type|Credit_Score|co-applicant_credit_type|  age|submission_of_application|        LTV| Region|Security_Type|Status|dtir1|
+-----+----+----------+-----------------+-------------+---------+------------+----

In [9]:
# Print each column's name, inferred type and nullability.
loan_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- loan_limit: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- approv_in_adv: string (nullable = true)
 |-- loan_type: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- Credit_Worthiness: string (nullable = true)
 |-- open_credit: string (nullable = true)
 |-- business_or_commercial: string (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- rate_of_interest: double (nullable = true)
 |-- Interest_rate_spread: double (nullable = true)
 |-- Upfront_charges: double (nullable = true)
 |-- term: integer (nullable = true)
 |-- Neg_ammortization: string (nullable = true)
 |-- interest_only: string (nullable = true)
 |-- lump_sum_payment: string (nullable = true)
 |-- property_value: integer (nullable = true)
 |-- construction_type: string (nullable = true)
 |-- occupancy_type: string (nullable = true)
 |-- Secured_by: string (nullable = true)
 |-- total_un

In [8]:
# Import Spark's aggregate under an alias: a bare `sum` import would shadow
# Python's built-in sum() for every later cell in the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Checking null values column-wise: isNull() yields a boolean per row, which is
# cast to 0/1 and summed to give the null count of each column.
null_counts = [spark_sum(col(c).isNull().cast("int")).alias(c) for c in loan_df.columns]
loan_df.select(null_counts).show()

+---+----+----------+------+-------------+---------+------------+-----------------+-----------+----------------------+-----------+----------------+--------------------+---------------+----+-----------------+-------------+----------------+--------------+-----------------+--------------+----------+-----------+------+-----------+------------+------------------------+---+-------------------------+-----+------+-------------+------+-----+
| ID|year|loan_limit|Gender|approv_in_adv|loan_type|loan_purpose|Credit_Worthiness|open_credit|business_or_commercial|loan_amount|rate_of_interest|Interest_rate_spread|Upfront_charges|term|Neg_ammortization|interest_only|lump_sum_payment|property_value|construction_type|occupancy_type|Secured_by|total_units|income|credit_type|Credit_Score|co-applicant_credit_type|age|submission_of_application|  LTV|Region|Security_Type|Status|dtir1|
+---+----+----------+------+-------------+---------+------------+-----------------+-----------+----------------------+--------

In [15]:
# Collect the distinct values of the `year` column and display them.
no_years = (
    loan_df
    .select("year")
    .distinct()
)
no_years.show()

+----+
|year|
+----+
|2019|
+----+



In [16]:
from pyspark.sql import functions as F

In [18]:
# Largest single loan amount per region, sorted from the highest maximum down.
max_loan = F.max("loan_amount").alias("max_loan_amount")
region_highest_loan = (
    loan_df
    .groupBy("region")
    .agg(max_loan)
    .orderBy(F.col("max_loan_amount").desc())
)
region_highest_loan.show()

+----------+---------------+
|    region|max_loan_amount|
+----------+---------------+
|     North|        3576500|
|     south|        3346500|
|   central|        1916500|
|North-East|        1276500|
+----------+---------------+



In [19]:
numeric_cols = ['loan_amount', 'rate_of_interest', 'income']

# Calculate the mean of all numeric columns in one aggregation, so Spark scans
# the data once instead of running a separate job per column.
mean_row = loan_df.agg(*[F.mean(name).alias(name) for name in numeric_cols]).first()

# A mean comes back as None when a column has no non-null values; such a column
# has nothing sensible to impute, so it is left out of the fill mapping.
mean_dict = {name: mean_row[name] for name in numeric_cols if mean_row[name] is not None}

# Fill null values in multiple columns with their column mean
if mean_dict:
    loan_df = loan_df.fillna(mean_dict)

loan_df.show()


+-----+----+----------+-----------------+-------------+---------+------------+-----------------+-----------+----------------------+-----------+-----------------+--------------------+---------------+----+-----------------+-------------+----------------+--------------+-----------------+--------------+----------+-----------+------+-----------+------------+------------------------+-----+-------------------------+-----------+-------+-------------+------+-----+
|   ID|year|loan_limit|           Gender|approv_in_adv|loan_type|loan_purpose|Credit_Worthiness|open_credit|business_or_commercial|loan_amount| rate_of_interest|Interest_rate_spread|Upfront_charges|term|Neg_ammortization|interest_only|lump_sum_payment|property_value|construction_type|occupancy_type|Secured_by|total_units|income|credit_type|Credit_Score|co-applicant_credit_type|  age|submission_of_application|        LTV| Region|Security_Type|Status|dtir1|
+-----+----+----------+-----------------+-------------+---------+------------+--

In [20]:
# Re-count nulls in the imputed numeric columns; each count is the sum of a
# 0/1 "is null" flag over all rows.
remaining_nulls = [
    F.sum(F.col(c).isNull().cast("int")).alias(c)
    for c in numeric_cols
]
loan_df.select(remaining_nulls).show()


+-----------+----------------+------+
|loan_amount|rate_of_interest|income|
+-----------+----------------+------+
|          0|               0|     0|
+-----------+----------------+------+



In [22]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define UDF Function
def loan_income_ratio(loan_amount, income):
    """Return loan_amount divided by income as a float.

    Args:
        loan_amount: Loan amount for the row, or None when missing.
        income: Income for the row, or None when missing.

    Returns:
        None when loan_amount is missing, 0.0 when income is missing or zero,
        otherwise loan_amount / income.
    """
    if loan_amount is None:
        # No amount to divide: return None instead of raising TypeError.
        return None
    if income is None or income == 0:
        # Must be a float: the UDF is registered with DoubleType, and a Python
        # int returned for a double column is turned into null by Spark.
        return 0.0
    return loan_amount / income

# Wrap the Python function as a Spark UDF whose result column is a double.
udf_loan_income_ratio = udf(loan_income_ratio, DoubleType())

# Build the ratio expression from the two input columns, then attach it to the
# frame as the new `loan_income_ratio` column.
ratio_column = udf_loan_income_ratio(loan_df["loan_amount"], loan_df["income"])
loan_df = loan_df.withColumn('loan_income_ratio', ratio_column)

# Preview the inputs next to the derived ratio (first 5 rows).
loan_df.select("loan_amount", "income", "loan_income_ratio").show(5)


+-----------+------+------------------+
|loan_amount|income| loan_income_ratio|
+-----------+------+------------------+
|     116500|  1740| 66.95402298850574|
|     206500|  4980| 41.46586345381526|
|     406500|  9480|42.879746835443036|
|     456500| 11880|38.425925925925924|
|     696500| 10440| 66.71455938697318|
+-----------+------+------------------+
only showing top 5 rows



In [23]:
# Print the physical plan for loan_df; the Python UDF shows up as a
# BatchEvalPython step in the plan.
loan_df.explain()

== Physical Plan ==
*(2) Project [ID#17, year#18, loan_limit#19, Gender#20, approv_in_adv#21, loan_type#22, loan_purpose#23, Credit_Worthiness#24, open_credit#25, business_or_commercial#26, loan_amount#867, rate_of_interest#868, Interest_rate_spread#29, Upfront_charges#30, term#31, Neg_ammortization#32, interest_only#33, lump_sum_payment#34, property_value#35, construction_type#36, occupancy_type#37, Secured_by#38, total_units#39, income#869, ... 11 more fields]
+- BatchEvalPython [loan_income_ratio(loan_amount#867, income#869)#1109], [pythonUDF0#1165]
   +- *(1) Project [ID#17, year#18, loan_limit#19, Gender#20, approv_in_adv#21, loan_type#22, loan_purpose#23, Credit_Worthiness#24, open_credit#25, business_or_commercial#26, coalesce(loan_amount#27, 331117) AS loan_amount#867, coalesce(nanvl(rate_of_interest#28, null), 4.045475804367209) AS rate_of_interest#868, Interest_rate_spread#29, Upfront_charges#30, term#31, Neg_ammortization#32, interest_only#33, lump_sum_payment#34, property_v

In [24]:
# Persist the transformed frame as Parquet. Overwrite mode keeps the cell
# re-runnable: the default save mode raises an error when the path already exists.
loan_df.write.mode("overwrite").parquet('/content/loan_data.parquet')

In [25]:
# Read the Parquet output back and preview it to confirm the write round-trips.
df = spark.read.format("parquet").load('/content/loan_data.parquet')
df.show()

+-----+----+----------+-----------------+-------------+---------+------------+-----------------+-----------+----------------------+-----------+-----------------+--------------------+---------------+----+-----------------+-------------+----------------+--------------+-----------------+--------------+----------+-----------+------+-----------+------------+------------------------+-----+-------------------------+-----------+-------+-------------+------+-----+------------------+
|   ID|year|loan_limit|           Gender|approv_in_adv|loan_type|loan_purpose|Credit_Worthiness|open_credit|business_or_commercial|loan_amount| rate_of_interest|Interest_rate_spread|Upfront_charges|term|Neg_ammortization|interest_only|lump_sum_payment|property_value|construction_type|occupancy_type|Secured_by|total_units|income|credit_type|Credit_Score|co-applicant_credit_type|  age|submission_of_application|        LTV| Region|Security_Type|Status|dtir1| loan_income_ratio|
+-----+----+----------+-----------------+-