# Credit Risk Assessment

## Cleaning and Preprocessing

### Setting SparkContext and SparkSession

In [1]:
#Entrypoint 2.x
# One SparkSession (with Hive support enabled) is created, or reused if it
# already exists, and serves as the entry point for the rest of the notebook.
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("DataSet Cleaning and Preprocessing")
spark = session_builder.enableHiveSupport().getOrCreate()

# SparkContext that backs the session
sc = spark.sparkContext

### Loading csv file into spark dataframe

In [2]:
# Location of the raw training CSV
file_path = "file:////home/talentum/shared/Project/project/GiveMeSomeCredit-training.csv"

# First row is the header; column types are inferred from the data
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv(file_path)

### Understanding the data types and structure

In [3]:
# Shape of Data: number of rows, then number of columns
record_count = df.count()
column_count = len(df.columns)
print("Number of Records : ", record_count)
print("Number of Columns : ", column_count)

Number of Records :  150000
Number of Columns :  12


In [4]:
# Schema of Data
# printSchema() writes the schema tree to stdout itself and returns None, so
# wrapping it in print() only adds a stray "None" line to the output.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- SeriousDlqin2yrs: integer (nullable = true)
 |-- RevolvingUtilizationOfUnsecuredLines: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- NumberOfTime30-59DaysPastDueNotWorse: integer (nullable = true)
 |-- DebtRatio: double (nullable = true)
 |-- MonthlyIncome: string (nullable = true)
 |-- NumberOfOpenCreditLinesAndLoans: integer (nullable = true)
 |-- NumberOfTimes90DaysLate: integer (nullable = true)
 |-- NumberRealEstateLoansOrLines: integer (nullable = true)
 |-- NumberOfTime60-89DaysPastDueNotWorse: integer (nullable = true)
 |-- NumberOfDependents: string (nullable = true)

None


In [5]:
# Dropping the "_c0" column; the column count is printed before and after
columns_before = len(df.columns)
print(columns_before)

df = df.drop('_c0')

columns_after = len(df.columns)
print(columns_after)

12
11


### Data type conversion

In [6]:
# Columns MonthlyIncome and NumberOfDependents have String datatype because
# some rows hold the literal text "NA" (visible in the previews below).
# NumberOfDependents is cast to integer and MonthlyIncome to double.
from pyspark.sql.types import IntegerType, DoubleType

# show() prints the table itself and returns None, so it is called directly;
# wrapping it in print() only adds a stray "None" line after each table.
df.select(df.MonthlyIncome).show(10)
df.select(df.NumberOfDependents).show(10)

df = df.withColumn("NumberOfDependents", df["NumberOfDependents"].cast(IntegerType()))
df = df.withColumn("MonthlyIncome", df["MonthlyIncome"].cast(DoubleType()))

# Confirm the new column types
df.printSchema()

+-------------+
|MonthlyIncome|
+-------------+
|         9120|
|         2600|
|         3042|
|         3300|
|        63588|
|         3500|
|           NA|
|         3500|
|           NA|
|        23684|
+-------------+
only showing top 10 rows

None
+------------------+
|NumberOfDependents|
+------------------+
|                 2|
|                 1|
|                 0|
|                 0|
|                 0|
|                 1|
|                 0|
|                 0|
|                NA|
|                 2|
+------------------+
only showing top 10 rows

None
root
 |-- SeriousDlqin2yrs: integer (nullable = true)
 |-- RevolvingUtilizationOfUnsecuredLines: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- NumberOfTime30-59DaysPastDueNotWorse: integer (nullable = true)
 |-- DebtRatio: double (nullable = true)
 |-- MonthlyIncome: double (nullable = true)
 |-- NumberOfOpenCreditLinesAndLoans: integer (nullable = true)
 |-- NumberOfTimes90DaysLate: integer (null

### Missing values

In [7]:
# The raw data marks missing entries with the text "NA".
# Replace any 'NA' values with null.
from pyspark.sql.functions import col, when, isnull, count

df = df.replace('NA', None)

# Detect missing values: build one null-counting aggregate per column,
# aliased back to the column's own name
null_count_exprs = []
for column_name in df.columns:
    is_missing = col(column_name).isNull()
    null_count_exprs.append(count(when(is_missing, column_name)).alias(column_name))
missing_values = df.select(null_count_exprs)

# Show the counts in three column slices
for start, stop in [(0, 5), (5, 9), (9, 12)]:
    missing_values.select(missing_values.columns[start:stop]).show()

+----------------+------------------------------------+---+------------------------------------+---------+
|SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30-59DaysPastDueNotWorse|DebtRatio|
+----------------+------------------------------------+---+------------------------------------+---------+
|               0|                                   0|  0|                                   0|        0|
+----------------+------------------------------------+---+------------------------------------+---------+

+-------------+-------------------------------+-----------------------+----------------------------+
|MonthlyIncome|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|
+-------------+-------------------------------+-----------------------+----------------------------+
|        29731|                              0|                      0|                           0|
+-------------+-------------------------------+-------------

### Handling missing values

In [8]:
# MonthlyIncome and NumberOfDependents features have null values
# Impute missing values
from pyspark.sql import Window

# 1. Impute MonthlyIncome with median
# A relative error of 0.0 asks approxQuantile for the exact median.
median_income = df.approxQuantile("MonthlyIncome", [0.5], 0.0)[0]
print(f"Median MonthlyIncome: {median_income}")
df = df.withColumn("MonthlyIncome", when(col("MonthlyIncome").isNull(), median_income).otherwise(col("MonthlyIncome")))

# 2. Impute NumberOfDependents with mode
# Nulls are excluded before counting so that the "most frequent value" can
# never be null itself (which would turn the imputation into a no-op).
# The secondary sort key makes the result deterministic when counts tie.
mode_dependents = (
    df.filter(col("NumberOfDependents").isNotNull())
      .groupBy("NumberOfDependents")
      .count()
      .orderBy(col("count").desc(), col("NumberOfDependents").asc())
      .first()[0]
)
df = df.withColumn("NumberOfDependents", when(col("NumberOfDependents").isNull(), mode_dependents).otherwise(col("NumberOfDependents")))
print(f"Mode NumberOfDependents: {mode_dependents}")

# Display the cleaned and transformed data
df.select('MonthlyIncome','NumberOfDependents').show(10)

Median MonthlyIncome: 5400.0
Mode NumberOfDependents: 0
+-------------+------------------+
|MonthlyIncome|NumberOfDependents|
+-------------+------------------+
|       9120.0|                 2|
|       2600.0|                 1|
|       3042.0|                 0|
|       3300.0|                 0|
|      63588.0|                 0|
|       3500.0|                 1|
|       5400.0|                 0|
|       3500.0|                 0|
|       5400.0|                 0|
|      23684.0|                 2|
+-------------+------------------+
only showing top 10 rows



### Summary of data

In [9]:
# Summary statistics, displayed in slices of four columns
for start in (0, 4, 8):
    column_slice = df.columns[start:start + 4]
    df.select(column_slice).summary().show()


+-------+-------------------+------------------------------------+------------------+------------------------------------+
|summary|   SeriousDlqin2yrs|RevolvingUtilizationOfUnsecuredLines|               age|NumberOfTime30-59DaysPastDueNotWorse|
+-------+-------------------+------------------------------------+------------------+------------------------------------+
|  count|             150000|                              150000|            150000|                              150000|
|   mean|            0.06684|                   6.048438054666792|52.295206666666665|                  0.4210333333333333|
| stddev|0.24974553092871946|                  249.75537062544046|14.771865863100334|                    4.19278127201834|
|    min|                  0|                                 0.0|                 0|                                   0|
|    25%|                  0|                         0.029859118|                41|                                   0|
|    50%|       

### Storing Dataframe With Outliers (Cleaned) into HDFS

In [10]:
# Target directory in HDFS for the cleaned data (outliers still present)
hdfs_output_dir = "hdfs:///user/talentum/processed_data/cleaned_data_with_outliers"

# Collapse to a single partition, then write CSV with a header row,
# replacing whatever is already at the target directory
single_partition_df = df.coalesce(1)
single_partition_df.write.mode("overwrite").option("header", True).csv(hdfs_output_dir)


### Storing Dataframe With Outliers (Cleaned) into HIVE

In [12]:
# Save the DataFrame as a table, overwriting any existing table of that name
table_writer = df.write.mode("overwrite")
table_writer.saveAsTable("cleaned_data_with_outliers")

# Query the Hive table to verify
table_description = spark.sql("describe cleaned_data_with_outliers")
table_description.show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|    SeriousDlqin2yrs|      int|   null|
|RevolvingUtilizat...|   double|   null|
|                 age|      int|   null|
|NumberOfTime30-59...|      int|   null|
|           DebtRatio|   double|   null|
|       MonthlyIncome|   double|   null|
|NumberOfOpenCredi...|      int|   null|
|NumberOfTimes90Da...|      int|   null|
|NumberRealEstateL...|      int|   null|
|NumberOfTime60-89...|      int|   null|
|  NumberOfDependents|      int|   null|
+--------------------+---------+-------+



### Outlier detection and handling

In [None]:

from pyspark.sql.functions import col, when, round


# Function to cap outliers using IQR
def cap_outliers(col_name, df, relative_error=0.05, whisker=1.5):
    """Clamp the values of one numeric column to its IQR fences.

    The first and third quartiles are estimated with ``approxQuantile``.
    Values below ``Q1 - whisker * IQR`` are replaced by that lower fence and
    values above ``Q3 + whisker * IQR`` by that upper fence; everything else
    is left unchanged.

    Args:
        col_name: Name of the numeric column to cap.
        df: DataFrame containing ``col_name``.
        relative_error: Relative error passed to ``approxQuantile``
            (0.0 requests exact quantiles). Defaults to 0.05.
        whisker: Multiple of the IQR that defines the fences. Defaults to 1.5.

    Returns:
        A new DataFrame with ``col_name`` replaced by its capped values, or
        ``df`` unchanged when no quartiles could be computed.

    Note:
        The fences are Python floats, so an integer column may come back
        with a floating-point type.
    """
    quantiles = df.approxQuantile(col_name, [0.25, 0.75], relative_error)
    if len(quantiles) < 2:
        # approxQuantile returns an empty list when the column has no
        # non-null values; there is nothing to cap in that case.
        return df

    q1, q3 = quantiles
    iqr = q3 - q1
    lower_bound = q1 - whisker * iqr
    upper_bound = q3 + whisker * iqr

    value = col(col_name)
    capped = (
        when(value < lower_bound, lower_bound)
        .when(value > upper_bound, upper_bound)
        .otherwise(value)
    )
    return df.withColumn(col_name, capped)


# Keep only records whose age is at least 21
df = df.where(col("age") >= 21)

# Cap each column with potential outliers using IQR, one column at a time
iqr_columns = ["age", "DebtRatio", "MonthlyIncome", "RevolvingUtilizationOfUnsecuredLines"]
for iqr_column in iqr_columns:
    df = cap_outliers(iqr_column, df)

# After capping, round age to zero decimals and convert it back to integer
rounded_age = round(col("age"), 0)
df = df.withColumn("age", rounded_age.cast("integer"))

# Handle the remaining outliers by keeping only records at or below the
# specified threshold for each past-due counter
df = (
    df.filter(col("NumberOfTimes90DaysLate") <= 8)
      .filter(col("NumberOfTime60-89DaysPastDueNotWorse") <= 12)
      .filter(col("NumberOfTime30-59DaysPastDueNotWorse") <= 24)
)





In [None]:
# Summary statistics of the current DataFrame, displayed in slices of four columns
for start in (0, 4, 8):
    column_slice = df.columns[start:start + 4]
    df.select(column_slice).summary().show()


### Feature engineering

In [None]:
# Create a new column 'DebtRatioCategory' by bucketing DebtRatio:
#   below 0.2 -> "Low", below 0.5 -> "Medium", anything else -> "High"
debt_ratio = col("DebtRatio")
debt_ratio_category = (
    when(debt_ratio < 0.2, "Low")
    .when(debt_ratio < 0.5, "Medium")
    .otherwise("High")
)
df = df.withColumn("DebtRatioCategory", debt_ratio_category)

print("Number of Columns : ",len(df.columns))

# Inspect the type and a few values of the new column
category_df = df.select('DebtRatioCategory')
category_df.printSchema()
category_df.show(5)

### Storing DataFrame WITHOUT OUTLIERS in HDFS

In [None]:
# Target directory in HDFS for the data after outlier handling
hdfs_output_dir = "hdfs:///user/talentum/processed_data/cleaned_data_without_outliers"

# Collapse to a single partition, then write CSV with a header row,
# replacing whatever is already at the target directory
single_partition_df = df.coalesce(1)
single_partition_df.write.mode("overwrite").option("header", True).csv(hdfs_output_dir)


### Storing DataFrame WITHOUT OUTLIERS in Hive

In [None]:
# Save the DataFrame as a table, overwriting any existing table of that name
table_writer = df.write.mode("overwrite")
table_writer.saveAsTable("cleaned_data_without_outliers")

# Query the Hive table to verify
table_description = spark.sql("describe cleaned_data_without_outliers")
table_description.show()

### Normalization/Scaling

In [None]:
# Normalization/Scaling (example: Min-Max Scaling for numerical columns)
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# List of numerical columns to scale
numerical_cols = ["RevolvingUtilizationOfUnsecuredLines", "age", "DebtRatio", "MonthlyIncome", "NumberOfOpenCreditLinesAndLoans"]

# MinMaxScaler works on a single vector column, so pack the numeric columns
# into one "features" vector first
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
assembled_df = assembler.transform(df)

# Fit the scaler on the assembled vectors and add the rescaled vectors as
# "scaledFeatures"
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)

# Display the cleaned and transformed data
scaled_df.select(scaled_df.columns[0:5]).show(5)
scaled_df.select(scaled_df.columns[5:9]).show(5)
scaled_df.select(scaled_df.columns[9:12]).show(5)

# The slices above stop at the 12th column, so the two vector columns added
# in this cell would never be displayed; show them explicitly.
scaled_df.select("features", "scaledFeatures").show(5, truncate=False)

### Storing SCALED DataFrame in HDFS

In [None]:
# Imports needed to unpack the scaled vector into plain numeric columns
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, DoubleType

# Define the output directory in HDFS
hdfs_output_dir = "hdfs:///user/talentum/processed_data/cleaned_data_scalled"

# The scaled values live in scaled_df (not df). The CSV writer cannot store
# ML vector columns, so "scaledFeatures" is unpacked into an array and each
# element is written back over the numeric column it was derived from
# (same order as numerical_cols, which fed the VectorAssembler).
vector_to_list = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))

scaled_flat_df = scaled_df.withColumn("scaled_values", vector_to_list(col("scaledFeatures")))
for position, column_name in enumerate(numerical_cols):
    scaled_flat_df = scaled_flat_df.withColumn(column_name, col("scaled_values")[position])

# Drop the helper and vector columns so only CSV-compatible columns remain
scaled_flat_df = scaled_flat_df.drop("features", "scaledFeatures", "scaled_values")

# Save the scaled DataFrame to HDFS in overwrite mode
scaled_flat_df.coalesce(1).write.mode("overwrite").csv(hdfs_output_dir, header=True)

### Storing SCALED DataFrame in Hive

In [None]:
# Save the scaled DataFrame (scaled_df, not the unscaled df) so the table
# actually contains the "features" and "scaledFeatures" columns.
# NOTE(review): those two columns are ML vectors; confirm that downstream
# consumers of this table can read that column type.
scaled_df.write.mode("overwrite").saveAsTable("cleaned_data_scalled")

# Query the Hive table to verify
spark.sql("describe cleaned_data_scalled").show()