# info

In [1]:
import pyspark

In [2]:
pyspark.__version__

'3.5.2'

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.getOrCreate()

25/01/25 22:22:18 WARN Utils: Your hostname, Yanellys-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.14 instead (on interface en0)
25/01/25 22:22:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/25 22:22:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/25 22:22:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
data = spark.read.csv('LoansTrainingSet.csv', header=True)
data.show()

                                                                                

+--------------------+--------------------+-----------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|             Loan ID|         Customer ID|Loan Status|Current Loan Amount|      Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+--------------------+--------------------+-----------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+--------------

In [7]:
data.count()

                                                                                

256984

In [8]:
data.columns

['Loan ID',
 'Customer ID',
 'Loan Status',
 'Current Loan Amount',
 'Term',
 'Credit Score',
 'Years in current job',
 'Home Ownership',
 'Annual Income',
 'Purpose',
 'Monthly Debt',
 'Years of Credit History',
 'Months since last delinquent',
 'Number of Open Accounts',
 'Number of Credit Problems',
 'Current Credit Balance',
 'Maximum Open Credit',
 'Bankruptcies',
 'Tax Liens']

In [9]:
data.printSchema()

root
 |-- Loan ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Loan Status: string (nullable = true)
 |-- Current Loan Amount: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Credit Score: string (nullable = true)
 |-- Years in current job: string (nullable = true)
 |-- Home Ownership: string (nullable = true)
 |-- Annual Income: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Monthly Debt: string (nullable = true)
 |-- Years of Credit History: string (nullable = true)
 |-- Months since last delinquent: string (nullable = true)
 |-- Number of Open Accounts: string (nullable = true)
 |-- Number of Credit Problems: string (nullable = true)
 |-- Current Credit Balance: string (nullable = true)
 |-- Maximum Open Credit: string (nullable = true)
 |-- Bankruptcies: string (nullable = true)
 |-- Tax Liens: string (nullable = true)



In [10]:
data.write.csv('Revised_Loans', header=True)

                                                                                

In [12]:
df = spark.read.csv('Revised_Loans', header=True)
df.show()

+--------------------+--------------------+-----------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|             Loan ID|         Customer ID|Loan Status|Current Loan Amount|      Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+--------------------+--------------------+-----------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+--------------

In [13]:
from pyspark.sql import functions as f

# missing values

In [14]:
df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+-------+-----------+-----------+-------------------+----+------------+--------------------+--------------+-------------+-------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|Loan ID|Customer ID|Loan Status|Current Loan Amount|Term|Credit Score|Years in current job|Home Ownership|Annual Income|Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+-------+-----------+-----------+-------------------+----+------------+--------------------+--------------+-------------+-------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|      0|          0|          0|                  0|   0|      

                                                                                

In [15]:
from pyspark.sql.functions import skewness

In [16]:
df.select(skewness('Credit Score')).show()



+----------------------+
|skewness(Credit Score)|
+----------------------+
|    3.0383081629821693|
+----------------------+



                                                                                

In [17]:
# imputing Credit Score and Annual Income with median Imputation for null values
from pyspark.sql.functions import col, when, median
# Calculate the median for Credit Score and Annual Income
median_credit_score = df.select(median(col('Credit Score'))).collect()[0][0]
median_annual_income = df.select(median(col('Annual Income'))).collect()[0][0]

# Imputing the null values with the median
df = df.fillna({'Credit Score': median_credit_score, 'Annual Income': median_annual_income})

# Verifying if null values are imputed
df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+-------+-----------+-----------+-------------------+----+------------+--------------------+--------------+-------------+-------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|Loan ID|Customer ID|Loan Status|Current Loan Amount|Term|Credit Score|Years in current job|Home Ownership|Annual Income|Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+-------+-----------+-----------+-------------------+----+------------+--------------------+--------------+-------------+-------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|      0|          0|          0|                  0|   0|      

                                                                                

# spelling differences

In [18]:
# spelling differences
df.groupBy('Years in current job').count().show()

[Stage 23:>                                                         (0 + 8) / 8]

+--------------------+-----+
|Years in current job|count|
+--------------------+-----+
|             5 years|17864|
|             9 years| 9932|
|              1 year|16746|
|                 n/a|11476|
|             2 years|23462|
|             7 years|13968|
|             8 years|12206|
|             4 years|16166|
|             6 years|14597|
|             3 years|20659|
|           10+ years|78896|
|            < 1 year|21012|
+--------------------+-----+



                                                                                

In [19]:
df.filter(df['Years in current job'] == 'n/a').count()


11476

In [20]:
# replace n/a with mode imputation
from pyspark.sql.functions import mode
# Find the mode of the 'Years in current job' column
mode_value = df.select(mode(col('Years in current job'))).collect()[0][0]

# Replace 'n/a' with the mode
df = df.withColumn('Years in current job', when(col('Years in current job') == 'n/a', mode_value).otherwise(col('Years in current job')))

# Verify the replacement
df.groupBy('Years in current job').count().show()
df.filter(df['Years in current job'] == 'n/a').count()

                                                                                

+--------------------+-----+
|Years in current job|count|
+--------------------+-----+
|             5 years|17864|
|             9 years| 9932|
|              1 year|16746|
|             2 years|23462|
|             7 years|13968|
|             8 years|12206|
|             4 years|16166|
|             6 years|14597|
|             3 years|20659|
|           10+ years|90372|
|            < 1 year|21012|
+--------------------+-----+



0

In [21]:
df.groupBy('Home Ownership').count().show()

+--------------+------+
|Home Ownership| count|
+--------------+------+
|          Rent|109010|
|      Own Home| 22923|
| Home Mortgage|124477|
|  HaveMortgage|   574|
+--------------+------+



In [22]:
df = df.replace('HaveMortgage', 'Home Mortgage')
df.groupBy('Home Ownership').count().show()

[Stage 41:>                                                         (0 + 8) / 8]

+--------------+------+
|Home Ownership| count|
+--------------+------+
|          Rent|109010|
|      Own Home| 22923|
| Home Mortgage|125051|
+--------------+------+



                                                                                

In [23]:
df.groupBy('Purpose').count().show()

+--------------------+------+
|             Purpose| count|
+--------------------+------+
|           Buy a Car|  3276|
|  Debt Consolidation|203911|
|       Business Loan|  4712|
|   Home Improvements| 14915|
|               other| 14268|
|               Other|  9667|
|Educational Expenses|   267|
|       Medical Bills|  2868|
|         Take a Trip|  1570|
|           Buy House|  1530|
+--------------------+------+



In [24]:
df = df.replace('other', 'Other')
df.groupBy('Purpose').count().show()

+--------------------+------+
|             Purpose| count|
+--------------------+------+
|           Buy a Car|  3276|
|  Debt Consolidation|203911|
|       Business Loan|  4712|
|   Home Improvements| 14915|
|               Other| 23935|
|Educational Expenses|   267|
|       Medical Bills|  2868|
|         Take a Trip|  1570|
|           Buy House|  1530|
+--------------------+------+



In [25]:
df.groupBy('Months since last delinquent').count().show()

+----------------------------+-----+
|Months since last delinquent|count|
+----------------------------+-----+
|                          51| 1080|
|                           7| 2116|
|                          54| 1086|
|                          15| 2189|
|                          11| 1933|
|                          29| 1756|
|                          69|  916|
|                          42| 1601|
|                          73|  969|
|                          87|    9|
|                          64|  975|
|                           3| 1129|
|                          30| 1872|
|                          34| 1694|
|                          59|  995|
|                           8| 2164|
|                          28| 1856|
|                          22| 1857|
|                          85|   12|
|                          35| 1666|
+----------------------------+-----+
only showing top 20 rows



In [26]:
df.select(skewness('Months since last delinquent')).show()



+--------------------------------------+
|skewness(Months since last delinquent)|
+--------------------------------------+
|                    0.4270324234279137|
+--------------------------------------+



                                                                                

In [27]:
# replace NA with mean imputation for Months since last delinquent
from pyspark.sql.functions import mean
# Calculate the mean of the 'Months since last delinquent' column
mean_value = df.select(mean(col('Months since last delinquent'))).collect()[0][0]

# Replace 'NA' with the mean
df = df.withColumn('Months since last delinquent', when(col('Months since last delinquent') == 'NA', mean_value).otherwise(col('Months since last delinquent')))

In [28]:
df.groupBy('Months since last delinquent').count().show()

[Stage 59:>                                                         (0 + 8) / 8]

+----------------------------+-----+
|Months since last delinquent|count|
+----------------------------+-----+
|                          51| 1080|
|                           7| 2116|
|                          54| 1086|
|                          15| 2189|
|                          11| 1933|
|                          29| 1756|
|                          69|  916|
|                          42| 1601|
|                          73|  969|
|                          87|    9|
|                          64|  975|
|                           3| 1129|
|                          30| 1872|
|                          34| 1694|
|                          59|  995|
|                           8| 2164|
|                          28| 1856|
|                          22| 1857|
|                          85|   12|
|                          35| 1666|
+----------------------------+-----+
only showing top 20 rows



                                                                                

In [29]:
df.groupBy('Bankruptcies').count().show()


+------------+------+
|Bankruptcies| count|
+------------+------+
|           3|   180|
|           0|229661|
|          NA|   529|
|           1| 25605|
|           4|    33|
|           2|   957|
|           5|    15|
|           6|     3|
|           7|     1|
+------------+------+



In [30]:
# Find the mode of the 'Bankruptcies' column
mode_bankruptcies = df.select(mode(col('Bankruptcies'))).collect()[0][0]

# Replace 'NA' with the mode
df = df.withColumn('Bankruptcies', when(col('Bankruptcies') == 'NA', mode_bankruptcies).otherwise(col('Bankruptcies')))

In [31]:
df.groupBy('Bankruptcies').count().show(30)

+------------+------+
|Bankruptcies| count|
+------------+------+
|           3|   180|
|           0|230190|
|           1| 25605|
|           4|    33|
|           2|   957|
|           5|    15|
|           6|     3|
|           7|     1|
+------------+------+



In [32]:
# do val counts on Tax Liens
df.groupBy('Tax Liens').count().show()


+---------+------+
|Tax Liens| count|
+---------+------+
|       11|     2|
|        3|   247|
|        0|252322|
|       NA|    23|
|        5|    61|
|        6|    30|
|        1|  3276|
|        4|   124|
|        2|   872|
|        7|     6|
|        8|     8|
|        9|    10|
|       10|     3|
+---------+------+



In [33]:
# Calculate the mode of the 'Tax Liens' column
mode_tax_liens = df.select(mode(col('Tax Liens'))).collect()[0][0]
# Replace 'NA' with the mode
df = df.withColumn('Tax Liens', when(col('Tax Liens') == 'NA', mode_tax_liens).otherwise(col('Tax Liens')))

In [34]:
df.groupBy('Tax Liens').count().show()



+---------+------+
|Tax Liens| count|
+---------+------+
|       11|     2|
|        3|   247|
|        0|252345|
|        5|    61|
|        6|    30|
|        1|  3276|
|        4|   124|
|        2|   872|
|        7|     6|
|        8|     8|
|        9|    10|
|       10|     3|
+---------+------+



                                                                                

In [35]:
df.show()

+--------------------+--------------------+-----------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|             Loan ID|         Customer ID|Loan Status|Current Loan Amount|      Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+--------------------+--------------------+-----------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+--------------

# some enc to 0 and 1's for loan status and term

In [36]:
df.groupBy("Loan Status").count().show()

+-----------+------+
|Loan Status| count|
+-----------+------+
| Fully Paid|176191|
|Charged Off| 80793|
+-----------+------+



In [37]:
from pyspark.sql import functions as f
loan_func = f.udf(lambda x: 1 if x=='Fully Paid' else 0) # 1 write func

In [38]:
df = df.withColumn('New_Loanstatus', loan_func(f.col('Loan Status'))).drop('Loan Status').withColumnRenamed('new_Loanstatus', 'Loan Status')
df.show()

[Stage 84:>                                                         (0 + 1) / 1]

+--------------------+--------------------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-----------+
|             Loan ID|         Customer ID|Current Loan Amount|      Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|Loan Status|
+--------------------+--------------------+-------------------+----------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+---

                                                                                

In [39]:
df.groupBy("Loan Status").count().show()

[Stage 85:>                                                         (0 + 8) / 8]

+-----------+------+
|Loan Status| count|
+-----------+------+
|          0| 80793|
|          1|176191|
+-----------+------+



                                                                                

In [40]:
df.groupBy("Term").count().show()

+----------+------+
|      Term| count|
+----------+------+
| Long Term| 64352|
|Short Term|192632|
+----------+------+



In [41]:
term_func = f.udf(lambda x: 1 if x=='Short Term' else 0)

In [42]:
df = df.withColumn('New_Term', term_func(f.col('Term'))).drop('Term').withColumnRenamed('new_Term', 'Term')
df.show()

+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-----------+----+
|             Loan ID|         Customer ID|Current Loan Amount|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|Loan Status|Term|
+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------

In [43]:
df.groupBy("Term").count().show()

[Stage 92:>                                                         (0 + 8) / 8]

+----+------+
|Term| count|
+----+------+
|   0| 64352|
|   1|192632|
+----+------+



                                                                                

# Data types: Changing strings to int for numeric cols

In [44]:
df.groupBy("Years in current job").count().show()

[Stage 95:>                                                         (0 + 8) / 8]

+--------------------+-----+
|Years in current job|count|
+--------------------+-----+
|             5 years|17864|
|             9 years| 9932|
|              1 year|16746|
|             2 years|23462|
|             7 years|13968|
|             8 years|12206|
|             4 years|16166|
|             6 years|14597|
|             3 years|20659|
|           10+ years|90372|
|            < 1 year|21012|
+--------------------+-----+



                                                                                

In [45]:
df = df.replace('10+ years', '10').replace('9 years', '9').replace('8 years', '8').replace('7 years', '7').replace(
    '6 years', '6').replace('5 years', '5').replace('4 years', '4').replace('3 years', '3').replace('2 years', '2').replace(
    '1 year', '1').replace('< 1 year', '0')

df.show()

+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-----------+----+
|             Loan ID|         Customer ID|Current Loan Amount|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|Loan Status|Term|
+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------

In [46]:
df.printSchema()


root
 |-- Loan ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Current Loan Amount: string (nullable = true)
 |-- Credit Score: string (nullable = false)
 |-- Years in current job: string (nullable = true)
 |-- Home Ownership: string (nullable = true)
 |-- Annual Income: string (nullable = false)
 |-- Purpose: string (nullable = true)
 |-- Monthly Debt: string (nullable = true)
 |-- Years of Credit History: string (nullable = true)
 |-- Months since last delinquent: string (nullable = true)
 |-- Number of Open Accounts: string (nullable = true)
 |-- Number of Credit Problems: string (nullable = true)
 |-- Current Credit Balance: string (nullable = true)
 |-- Maximum Open Credit: string (nullable = true)
 |-- Bankruptcies: string (nullable = true)
 |-- Tax Liens: string (nullable = true)
 |-- Loan Status: string (nullable = true)
 |-- Term: string (nullable = true)



In [47]:
# change the data types that need to change to int
df = df.withColumn('Current Loan Amount', col('Current Loan Amount').cast('int'))
df = df.withColumn('Credit Score', col('Credit Score').cast('int'))
df = df.withColumn('Years in current job', col('Years in current job').cast('int'))
df = df.withColumn('Annual Income', col('Annual Income').cast('int'))
df = df.withColumn('Years of Credit History', col('Years of Credit History').cast('int'))
df = df.withColumn('Months since las delinquent', col('Months since last delinquent').cast('int'))
df = df.withColumn('Number of Open Accounts', col('Number of Open Accounts').cast('int'))
df = df.withColumn('Number of Credit Problems', col('Number of Credit Problems').cast('int'))
df = df.withColumn('Current Credit Balance', col('Current Credit Balance').cast('int'))
df = df.withColumn('Maximum Open Credit', col('Maximum Open Credit').cast('int'))
df = df.withColumn('Bankruptcies', col('Bankruptcies').cast('int'))
df = df.withColumn('Tax Liens', col('Tax Liens').cast('int'))
df = df.withColumn('Loan Status', col('Loan Status').cast('int'))
df = df.withColumn('Term', col('Term').cast('int'))

In [48]:
from pyspark.sql.functions import col, regexp_replace
# Remove '$' and ',' from 'Monthly Debt' and then cast to integer
df = df.withColumn('Monthly Debt', regexp_replace(col('Monthly Debt'), '[$,]', '').cast('int'))

In [49]:
df.show()

+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-----------+----+---------------------------+
|             Loan ID|         Customer ID|Current Loan Amount|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|Loan Status|Term|Months since las delinquent|
+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------

# Check for Duplicated Loan ID

In [50]:
from pyspark.sql.functions import col, count
df.groupBy('Loan ID').agg(count("*").alias("count")).filter(col("count") > 1).show()
# # drop duplicated Loan ID without



+--------------------+-----+
|             Loan ID|count|
+--------------------+-----+
|aec59c68-7343-4d6...|    2|
|af26c079-de36-401...|    2|
|af4756d1-1ae2-415...|    2|
|b026e161-8610-4d8...|    2|
|b3283146-3f46-462...|    2|
|b46e333a-6b4e-404...|    2|
|b503c204-d286-47a...|    2|
|b70e8680-99b7-462...|    2|
|b8371de0-7f29-43f...|    2|
|b89de3b7-309c-40f...|    2|
|b9ffd985-11b9-4b7...|    2|
|ba47e175-4bc0-48f...|    2|
|bc8a6586-b467-449...|    2|
|bc9fde3c-fa8b-4b8...|    2|
|c131c0b6-83c6-4c1...|    2|
|c145e11d-f975-420...|    2|
|c331bc9d-5916-417...|    2|
|c37b979f-3bf3-4bf...|    2|
|c3986284-5ab5-463...|    2|
|c3f0f698-b042-4bb...|    2|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [51]:
df.show()

+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-----------+----+---------------------------+
|             Loan ID|         Customer ID|Current Loan Amount|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|Loan Status|Term|Months since las delinquent|
+--------------------+--------------------+-------------------+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------

In [52]:
from pyspark.sql.functions import max

# Group by 'Loan ID' and select the row w the max val to deal w the duplicated Loan ID
df = df.groupBy('Loan ID').agg(max('Loan Status').alias('Loan Status'),
                               max('Current Loan Amount').alias('Current Loan Amount'),
                               max('Term').alias('Term'),
                               max('Credit Score').alias('Credit Score'),
                               max('Years in current job').alias('Years in current job'),
                               max('Home Ownership').alias('Home Ownership'),
                               max('Annual Income').alias('Annual Income'),
                               max('Purpose').alias('Purpose'),
                               max('Monthly Debt').alias('Monthly Debt'),
                               max('Years of Credit History'). alias('Years of Credit History'),
                               max('Months since last delinquent').alias('Months since last delinquent'),
                               max('Number of Open Accounts').alias('Number of Open Accounts'),
                               max('Number of Credit Problems').alias('Number of Credit Problems'),
                               max('Current Credit Balance').alias('Current Credit Balance'),
                               max('Maximum Open Credit').alias('Maximum Open Credit'),
                               max('Bankruptcies').alias('Bankruptcies'),
                               max('Tax Liens').alias('Tax Liens'))

In [53]:
df.count()

                                                                                

215700

In [54]:
df.groupBy('Loan ID').agg(count("*").alias("count")).filter(col("count") > 1).show()

+-------+-----+
|Loan ID|count|
+-------+-----+
+-------+-----+



In [55]:
df.show()



+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|             Loan ID|Loan Status|Current Loan Amount|Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|00035328-2636-439...|      

                                                                                

# Credit Score Higher Than 800

In [56]:
# Credit Score higher than 800 needs to be divided by 10
df = df.withColumn('Credit Score', when(col('Credit Score') > 800, col('Credit Score') / 10).otherwise(col('Credit Score')))
df.show()




+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|             Loan ID|Loan Status|Current Loan Amount|Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens|
+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+
|00035328-2636-439...|      

                                                                                

In [57]:
df.groupBy('Credit Score').count().show()

[Stage 123:>                                                        (0 + 8) / 8]

+------------+-----+
|Credit Score|count|
+------------+-----+
|       692.0|  695|
|       596.0|   28|
|       720.0| 1840|
|       735.0| 3477|
|       608.0|   42|
|       702.0|  962|
|       650.0|  160|
|       729.0| 2827|
|       594.0|   25|
|       677.0|  378|
|       699.0|  963|
|       724.0| 2155|
|       663.0|  277|
|       611.0|   44|
|       708.0| 1195|
|       643.0|  127|
|       681.0|  454|
|       685.0|  475|
|       625.0|   82|
|       741.0| 5343|
+------------+-----+
only showing top 20 rows



                                                                                

# ft engineering

In [58]:
# ft enginneering: Credit Utilization
df = df.withColumn('Credit Utilization', df['Current Credit Balance'] / df['Maximum Open Credit'])
df.show()



+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-------------------+
|             Loan ID|Loan Status|Current Loan Amount|Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens| Credit Utilization|
+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------

                                                                                

In [59]:
# ft enginneering: Credit % of Annual Income
df = df.withColumn('Credit % of Annual Income', (df['Current Credit Balance'] / df['Annual Income']) * 100)
df.show()

[Stage 132:>                                                        (0 + 8) / 8]

+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+----------------------+-------------------+------------+---------+-------------------+-------------------------+
|             Loan ID|Loan Status|Current Loan Amount|Term|Credit Score|Years in current job|Home Ownership|Annual Income|           Purpose|Monthly Debt|Years of Credit History|Months since last delinquent|Number of Open Accounts|Number of Credit Problems|Current Credit Balance|Maximum Open Credit|Bankruptcies|Tax Liens| Credit Utilization|Credit % of Annual Income|
+--------------------+-----------+-------------------+----+------------+--------------------+--------------+-------------+------------------+------------+-----------------------+----------------------------+-----------------------+-------------------------+---

                                                                                