# Milestone 3: Spark Analysis


## Objectives

1. Load the dataset: 5%
2. Perform some simple cleaning: 30%
   - Column renaming: 10%
   - Detect missing: 35%
   - Handle missing: 35%
   - Check missing: 20%
3. Perform some analysis on the dataset: 30%
4. Add new columns with feature engineering: 15%
5. Encode categorical columns: 10%
6. Create a lookup table for encoding only: 5%
7. Save the cleaned dataset and lookup table: 5%
8. Bonus: Save the output to a PostgreSQL database: 5%


#### Importing the required libraries


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.functions import col, udf, lit, when, create_map, lag, lead
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, ArrayType, StructType, StructField, DateType
from pyspark.sql.window import Window
import psutil

#### Initialize the Spark session


In [2]:
spark = SparkSession.builder.appName("milestone3").getOrCreate()
sc = spark.sparkContext

## Part 1: Load the dataset


In [3]:
df = spark.read.parquet("data/fintech_data_22_52_14669.parquet")

#### Print the schema of the dataset


In [4]:
df.printSchema()

root
 |-- Customer Id: string (nullable = true)
 |-- Emp Title: string (nullable = true)
 |-- Emp Length: string (nullable = true)
 |-- Home Ownership: string (nullable = true)
 |-- Annual Inc: double (nullable = true)
 |-- Annual Inc Joint: double (nullable = true)
 |-- Verification Status: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Addr State: string (nullable = true)
 |-- Avg Cur Bal: double (nullable = true)
 |-- Tot Cur Bal: double (nullable = true)
 |-- Loan Id: long (nullable = true)
 |-- Loan Status: string (nullable = true)
 |-- Loan Amount: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Funded Amount: double (nullable = true)
 |-- Term: string (nullable = true)
 |-- Int Rate: double (nullable = true)
 |-- Grade: long (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Pymnt Plan: boolean (nullable = true)
 |-- Type: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Description: string (nullable = true)

#### Display the first 20 rows of the dataset


In [5]:
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|      Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+
|YidceGZkXHgxZmtce...|         Lieutenant |

In [6]:
df.show(20, vertical=True)

-RECORD 0-----------------------------------
 Customer Id         | YidceGZkXHgxZmtce... 
 Emp Title           | Lieutenant           
 Emp Length          | 10+ years            
 Home Ownership      | MORTGAGE             
 Annual Inc          | 200000.0             
 Annual Inc Joint    | NULL                 
 Verification Status | Source Verified      
 Zip Code            | 117xx                
 Addr State          | NY                   
 Avg Cur Bal         | 19404.0              
 Tot Cur Bal         | 329872.0             
 Loan Id             | 197838               
 Loan Status         | Current              
 Loan Amount         | 20000.0              
 State               | NY                   
 Funded Amount       | 20000.0              
 Term                |  60 months           
 Int Rate            | 0.124                
 Grade               | 6                    
 Issue Date          | 19 August 2019       
 Pymnt Plan          | false                
 Type     

#### How many partitions is this dataframe split into?

- Number of partitions: 1


In [7]:
number_of_partitions = df.rdd.getNumPartitions()
print(f"Number of partitions: {number_of_partitions}")

Number of partitions: 1


#### Number of logical cores on this machine


In [8]:
logical_cores = psutil.cpu_count(logical=True)
print(f"Logical cores: {logical_cores}")

physical_cores = psutil.cpu_count(logical=False)
print(f"Physical cores: {physical_cores}")

Logical cores: 16
Physical cores: 8


#### Repartition to match the number of logical cores


In [9]:
df = df.repartition(logical_cores)

number_of_partitions = df.rdd.getNumPartitions()
print(f"Number of partitions: {number_of_partitions}")

Number of partitions: 16
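Calling `repartition(n)` without partitioning columns shuffles rows with round-robin partitioning, so the 16 partitions should end up roughly the same size. The pure-Python sketch below only mimics that idea (it is not Spark's implementation); `round_robin_sizes` is a hypothetical helper, and 27,030 is the row count reported by the one-hot checks later in this notebook.

```python
def round_robin_sizes(num_rows: int, num_partitions: int) -> list:
    """Count how many rows land in each partition when rows are dealt out in turn."""
    sizes = [0] * num_partitions
    for row_index in range(num_rows):
        # Row i goes to partition i mod n, like dealing cards around a table.
        sizes[row_index % num_partitions] += 1
    return sizes


# 27,030 rows spread over 16 partitions.
sizes = round_robin_sizes(27030, 16)
print(sizes)
print(max(sizes) - min(sizes))  # imbalance between the largest and smallest partition
```

To inspect the real per-partition row counts in Spark, `df.rdd.glom().map(len).collect()` returns one count per partition.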


## Part 2: Cleaning


#### Rename all columns (replacing a space with an underscore, and making it lowercase)


In [10]:
def rename_columns(df): 
    """
    Renames the columns of a DataFrame by converting them to lowercase, 
    stripping leading and trailing whitespace, replacing spaces with underscores, 
    and removing any non-alphanumeric characters except underscores.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame whose columns need to be renamed.
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with renamed columns.
    Example:
        >>> df = spark.createDataFrame([(1, 2)], ["Column A", "Column B"])
        >>> df = rename_columns(df)
        >>> df.columns
        ['column_a', 'column_b']
    """
    new_columns = {}
    for column in df.columns:
        new_column = column.lower().strip().replace(" ", "_")
        new_column = ''.join(e for e in new_column if e.isalnum() or e == '_')
        new_columns[column] = new_column

    for column in new_columns:
        df = df.withColumnRenamed(column, new_columns[column])

    print(f"Columns renamed: {new_columns}")

    return df

df = rename_columns(df)

Columns renamed: {'Customer Id': 'customer_id', 'Emp Title': 'emp_title', 'Emp Length': 'emp_length', 'Home Ownership': 'home_ownership', 'Annual Inc': 'annual_inc', 'Annual Inc Joint': 'annual_inc_joint', 'Verification Status': 'verification_status', 'Zip Code': 'zip_code', 'Addr State': 'addr_state', 'Avg Cur Bal': 'avg_cur_bal', 'Tot Cur Bal': 'tot_cur_bal', 'Loan Id': 'loan_id', 'Loan Status': 'loan_status', 'Loan Amount': 'loan_amount', 'State': 'state', 'Funded Amount': 'funded_amount', 'Term': 'term', 'Int Rate': 'int_rate', 'Grade': 'grade', 'Issue Date': 'issue_date', 'Pymnt Plan': 'pymnt_plan', 'Type': 'type', 'Purpose': 'purpose', 'Description': 'description'}
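Because `rename_columns` calls `withColumnRenamed` once per column, an alternative is to compute every new name first and apply them in one `df.toDF(*new_names)` call. The sketch below is a hypothetical helper, `clean_column_names`, that applies the same cleaning rules and additionally raises an error when two original names normalize to the same result (for example `"Loan Id"` and `"loan_id"`), which would otherwise produce duplicate, ambiguous column names.

```python
def clean_column_names(columns: list) -> list:
    """Normalize names the same way as rename_columns and reject collisions."""
    cleaned = []
    for name in columns:
        new_name = name.lower().strip().replace(" ", "_")
        # Keep only letters, digits and underscores (mirrors the isalnum filter).
        new_name = "".join(ch for ch in new_name if ch.isalnum() or ch == "_")
        cleaned.append(new_name)
    duplicates = sorted({n for n in cleaned if cleaned.count(n) > 1})
    if duplicates:
        raise ValueError(f"Renaming would create duplicate columns: {duplicates}")
    return cleaned


print(clean_column_names(["Customer Id", "Emp Title", " Int Rate "]))
```

With such a helper, the renaming step could become `df = df.toDF(*clean_column_names(df.columns))`.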


In [11]:
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- annual_inc_joint: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- avg_cur_bal: double (nullable = true)
 |-- tot_cur_bal: double (nullable = true)
 |-- loan_id: long (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- state: string (nullable = true)
 |-- funded_amount: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- grade: long (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- pymnt_plan: boolean (nullable = true)
 |-- type: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- description: string (nullable = true)

#### Detect Missing Values


In [12]:
def calculate_missing_percentages(df):
    """
    Calculates the percentage of missing values in each column of a DataFrame.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame whose missing values need to be calculated.
    Returns:
        dict: A dictionary containing the column names as keys and the percentage of missing values as values.
    Example:
        >>> df = spark.createDataFrame([(None, 2), (1, 3)], ["Column A", "Column B"])
        >>> missing_percentages = calculate_missing_percentages(df)
        >>> missing_percentages
        {'Column A': 50.0, 'Column B': 0.0}
    """
    missing_percentages = {}
    total_rows = df.count()

    for column in df.columns:
        missing_rows = df.filter(fn.col(column).isNull()).count()
        missing_percentage = (missing_rows / total_rows) * 100
        missing_percentages[column] = missing_percentage

    print(f"Missing percentages calculated: {missing_percentages}")

    return missing_percentages

missing_percentages = calculate_missing_percentages(df)
missing_percentages

Missing percentages calculated: {'customer_id': 0.0, 'emp_title': 8.790233074361819, 'emp_length': 6.870144284128746, 'home_ownership': 0.0, 'annual_inc': 0.0, 'annual_inc_joint': 92.89678135405104, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'int_rate': 4.480207177210507, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0, 'description': 0.8065112837587864}


{'customer_id': 0.0,
 'emp_title': 8.790233074361819,
 'emp_length': 6.870144284128746,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 92.89678135405104,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 4.480207177210507,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.8065112837587864}
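`calculate_missing_percentages` launches one Spark job per column (24 `filter(...).count()` calls plus the initial row count). A common alternative counts every column's nulls in a single aggregation, for example `df.select([fn.count(fn.when(fn.col(c).isNull(), c)).alias(c) for c in df.columns])`, which works because `count` skips the nulls that `when` returns for non-matching rows. The pure-Python sketch below only illustrates that single-pass idea on a few toy rows; `missing_percentages_single_pass` is a hypothetical name.

```python
def missing_percentages_single_pass(rows: list, columns: list) -> dict:
    """Count missing (None) values for every column while scanning the rows once."""
    missing = {column: 0 for column in columns}
    total = 0
    for row in rows:
        total += 1
        for column in columns:
            if row.get(column) is None:
                missing[column] += 1
    if total == 0:
        return {column: 0.0 for column in columns}
    # Same formula as the notebook: (missing_rows / total_rows) * 100.
    return {column: missing[column] / total * 100 for column in columns}


# Toy rows for illustration only.
rows = [
    {"emp_title": "Teacher", "int_rate": 0.124},
    {"emp_title": None, "int_rate": 0.2499},
    {"emp_title": "Lieutenant", "int_rate": None},
    {"emp_title": None, "int_rate": 0.1},
]
print(missing_percentages_single_pass(rows, ["emp_title", "int_rate"]))
```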

#### Handle Missing Values


In [13]:
def impute_with_arbitrary_value(df, column, value):
    """
    Imputes missing values in a column of a DataFrame with an arbitrary value.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame whose missing values need to be imputed.
        column (str): The name of the column to impute missing values.
        value (str, int, float): The arbitrary value to impute missing values.
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with missing values imputed.
    Example:
        >>> df = spark.createDataFrame([(None, 2), (1, 3)], ["Column A", "Column B"])
        >>> df = impute_with_arbitrary_value(df, "Column A", 0)
        >>> df.show()
        +--------+--------+
        |Column A|Column B|
        +--------+--------+
        |       0|       2|
        |       1|       3|
        +--------+--------+
    """
    df = df.fillna(value, subset=[column])

    print(f"Missing values imputed in column '{column}' with value '{value}'")

    return df

def calculate_mode(df, column):
    """
    Calculates the mode of a column in a DataFrame.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame whose mode needs to be calculated.
        column (str): The name of the column to calculate the mode.
    Returns:
        str, int, float: The mode of the column.
    Example:
        >>> df = spark.createDataFrame([(1, 2), (1, 2), (2, 2)], ["Column A", "Column B"])
        >>> mode = calculate_mode(df, "Column A")
        >>> mode
        1
    """

    df_without_nulls = df.filter(fn.col(column).isNotNull())
    mode = df_without_nulls.groupBy(column).count().sort(fn.desc("count")).first()[column]

    return mode

def impute_with_mode(df, column):
    """
    Imputes missing values in a column of a DataFrame with the mode.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame whose missing values need to be imputed.
        column (str): The name of the column to impute missing values.
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with missing values imputed.
    Example:
        >>> df = spark.createDataFrame([(None, 2), (1, 2), (1, 2)], ["Column A", "Column B"])
        >>> df = impute_with_mode(df, "Column A")
        >>> df.show()
        +--------+--------+
        |Column A|Column B|
        +--------+--------+
        |       1|       2|
        |       1|       2|
        |       1|       2|
        +--------+--------+
    """
    mode = calculate_mode(df, column)
    df = df.fillna(mode, subset=[column])

    print(f"Missing values imputed in column '{column}' with mode '{mode}'")

    return df

# Numerical columns: replace missing values with 0.
# Categorical/string columns: replace missing values with the mode.
def handle_missing_values(df):
    """
    Handles missing values in a DataFrame by imputing with an arbitrary value or the mode.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame whose missing values need to be handled.
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with missing values imputed.
    Example:
        >>> df = spark.createDataFrame([(None, 2), (1, 2), (1, 2)], ["Column A", "Column B"])
        >>> df = handle_missing_values(df)
        >>> df.show()
        +--------+--------+
        |Column A|Column B|
        +--------+--------+
        |       0|       2|
        |       1|       2|
        |       1|       2|
        +--------+--------+
    """
    for column in df.columns:
        if df.filter(fn.col(column).isNull()).count() > 0:
            if isinstance(df.schema[column].dataType, StringType):
                df = impute_with_mode(df, column)
            else:
                df = impute_with_arbitrary_value(df, column, 0)

    return df

In [14]:
df_imputed = handle_missing_values(df)

Missing values imputed in column 'emp_title' with mode 'Teacher'
Missing values imputed in column 'emp_length' with mode '10+ years'
Missing values imputed in column 'annual_inc_joint' with value '0'
Missing values imputed in column 'int_rate' with value '0'
Missing values imputed in column 'description' with mode 'Debt consolidation'
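`calculate_mode` sorts only by count, so when two values tie for most frequent, which one `first()` returns is not guaranteed and may depend on partitioning. The pure-Python sketch below keeps the notebook's rule (strings get the mode, everything else gets 0) but breaks ties by taking the smallest value. The tie-break rule and the helper names `deterministic_mode` and `impute` are assumptions, not part of the original notebook; in Spark the same tie-break could be expressed as `.sort(fn.desc("count"), fn.asc(column))`.

```python
from collections import Counter


def deterministic_mode(values: list):
    """Most frequent non-missing value; ties go to the smallest value."""
    counts = Counter(v for v in values if v is not None)
    if not counts:
        return None
    # Sort key: highest count first, then the value itself for a stable tie-break.
    return min(counts, key=lambda v: (-counts[v], v))


def impute(values: list, is_string: bool) -> list:
    """Fill None with the mode for string columns and with 0 otherwise."""
    fill_value = deterministic_mode(values) if is_string else 0
    return [fill_value if v is None else v for v in values]


print(impute(["Teacher", None, "Nurse", "Teacher", "Nurse"], is_string=True))
print(impute([0.124, None, 0.2499], is_string=False))
```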


#### Check Missing Values


In [15]:
calculate_missing_percentages(df_imputed)

Missing percentages calculated: {'customer_id': 0.0, 'emp_title': 0.0, 'emp_length': 0.0, 'home_ownership': 0.0, 'annual_inc': 0.0, 'annual_inc_joint': 0.0, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'int_rate': 0.0, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0, 'description': 0.0}


{'customer_id': 0.0,
 'emp_title': 0.0,
 'emp_length': 0.0,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.0}

In [16]:
def check_missing_values(df):
    """
    Prints the number of missing values in each column of a DataFrame.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to check for missing values.
    Returns:
        None: The counts are printed, one line per column.
    Example:
        >>> df = spark.createDataFrame([(1, 2), (1, None)], ["Column A", "Column B"])
        >>> check_missing_values(df)
        Column 'Column A' has 0 missing values
        Column 'Column B' has 1 missing values
    """
    missing_values_dict = {column: df.filter(fn.col(column).isNull()).count() for column in df.columns}

    for column, missing_values in missing_values_dict.items():
        print(f"Column '{column}' has {missing_values} missing values")

check_missing_values(df_imputed)


Column 'customer_id' has 0 missing values
Column 'emp_title' has 0 missing values
Column 'emp_length' has 0 missing values
Column 'home_ownership' has 0 missing values
Column 'annual_inc' has 0 missing values
Column 'annual_inc_joint' has 0 missing values
Column 'verification_status' has 0 missing values
Column 'zip_code' has 0 missing values
Column 'addr_state' has 0 missing values
Column 'avg_cur_bal' has 0 missing values
Column 'tot_cur_bal' has 0 missing values
Column 'loan_id' has 0 missing values
Column 'loan_status' has 0 missing values
Column 'loan_amount' has 0 missing values
Column 'state' has 0 missing values
Column 'funded_amount' has 0 missing values
Column 'term' has 0 missing values
Column 'int_rate' has 0 missing values
Column 'grade' has 0 missing values
Column 'issue_date' has 0 missing values
Column 'pymnt_plan' has 0 missing values
Column 'type' has 0 missing values
Column 'purpose' has 0 missing values
Column 'description' has 0 missing values


In [17]:
df_imputed.show(20, vertical=True)

-RECORD 0-----------------------------------
 customer_id         | YidhaVIvXHhlMVx4Z... 
 emp_title           | Owner/President      
 emp_length          | 10+ years            
 home_ownership      | MORTGAGE             
 annual_inc          | 273800.0             
 annual_inc_joint    | 0.0                  
 verification_status | Source Verified      
 zip_code            | 730xx                
 addr_state          | OK                   
 avg_cur_bal         | 26239.0              
 tot_cur_bal         | 577250.0             
 loan_id             | 261943               
 loan_status         | Current              
 loan_amount         | 35000.0              
 state               | OK                   
 funded_amount       | 35000.0              
 term                |  60 months           
 int_rate            | 0.2499               
 grade               | 29                   
 issue_date          | 14 October 2014      
 pymnt_plan          | false                
 type     

## Creating Lookup Table for Encoding


In [18]:
lookup_table_schema = StructType([
    StructField("Column", StringType(), True),
    StructField("Original", StringType(), True),
    StructField("Encoded", StringType(), True)
])

lookup_table = spark.createDataFrame([], lookup_table_schema)

In [19]:
lookup_table.show()

+------+--------+-------+
|Column|Original|Encoded|
+------+--------+-------+
+------+--------+-------+



In [20]:
lookup_table.printSchema()

root
 |-- Column: string (nullable = true)
 |-- Original: string (nullable = true)
 |-- Encoded: string (nullable = true)



## Part 3: Encoding Columns


In [21]:
def label_encode(df, column, mapping, lookup_table):
    """
    Label encodes a categorical column in a DataFrame using a mapping and records the mapping in the lookup table.
    Every value in the column must be a key of `mapping`; otherwise the UDF raises a KeyError.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to label encode.
        column (str): The name of the column to label encode.
        mapping (dict): A dictionary containing the mapping of category to label.
        lookup_table (pyspark.sql.DataFrame): The lookup table to append the (Column, Original, Encoded) rows to.
    Returns:
        tuple: The DataFrame with the column label encoded (as doubles) and the updated lookup table.
    Example:
        >>> df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "A")], ["Column A", "Column B"])
        >>> mapping = {"A": 0, "B": 1}
        >>> df, lookup_table = label_encode(df, "Column B", mapping, lookup_table)
        >>> df.show()
        +--------+--------+
        |Column A|Column B|
        +--------+--------+
        |       1|     0.0|
        |       2|     1.0|
        |       3|     0.0|
        +--------+--------+
    """
    mapping = {key: float(value) for key, value in mapping.items()}
    mapping_expr = udf(lambda category: mapping[category], DoubleType())
    df = df.withColumn(column, mapping_expr(fn.col(column)))
    
    print(f"Column '{column}' label encoded with mapping '{mapping}'")

    lookup_table = lookup_table.union(spark.createDataFrame([(column, key, value) for key, value in mapping.items()], lookup_table_schema))
    lookup_table = lookup_table.coalesce(16)

    print(f"Lookup table updated with column '{column}'")

    return df, lookup_table
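`label_encode` looks every value up in a Python UDF, which sends each row to a Python worker and raises `KeyError` for any category missing from the mapping. Spark can do the lookup natively with the `create_map` function imported at the top of the notebook, typically written as `create_map([lit(x) for x in chain(*mapping.items())])[fn.col(column)]`, which usually returns null for unknown keys (behavior can differ when ANSI mode is enabled). The pure-Python sketch below, with hypothetical helpers `flatten_mapping` and `encode_or_none`, shows the flattened key/value argument list that idiom builds and the null-for-unknown lookup.

```python
from itertools import chain


def flatten_mapping(mapping: dict) -> list:
    """Interleave keys and values: {k1: v1, k2: v2} -> [k1, v1, k2, v2].

    This is the argument order create_map expects (key, value, key, value, ...).
    """
    return list(chain(*mapping.items()))


def encode_or_none(value, mapping: dict):
    """Look a category up, returning None (like a SQL null) for unknown keys."""
    return mapping.get(value)


emp_length_mapping = {"10+ years": 10.0, "1 year": 1.0, "< 1 year": 0.5}
print(flatten_mapping(emp_length_mapping))
print([encode_or_none(v, emp_length_mapping) for v in ["1 year", "n/a"]])
```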


In [23]:
emp_length_mapping = {
    '10+ years': 10,
    '9 years': 9,
    '8 years': 8,
    '7 years': 7,
    '6 years': 6,
    '5 years': 5,
    '4 years': 4,
    '3 years': 3,
    '2 years': 2,
    '1 year': 1,
    '< 1 year': 0.5
}

df_encoded, lookup_table = label_encode(df_imputed, "emp_length", emp_length_mapping, lookup_table)
df_encoded.show()

lookup_table.show()

Column 'emp_length' label encoded with mapping '{'10+ years': 10.0, '9 years': 9.0, '8 years': 8.0, '7 years': 7.0, '6 years': 6.0, '5 years': 5.0, '4 years': 4.0, '3 years': 3.0, '2 years': 2.0, '1 year': 1.0, '< 1 year': 0.5}'
Lookup table updated with column 'emp_length'
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+--
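The hand-written `emp_length_mapping` can also be derived from the strings themselves: take the leading number of years and treat `< 1 year` as 0.5, which reproduces the dictionary used above. The sketch below is a hypothetical `parse_emp_length` helper in plain Python; in Spark a similar rule could be built from `fn.regexp_extract` and `when`. Spelling the mapping out explicitly, as the notebook does, keeps every accepted value visible in one place.

```python
import re


def parse_emp_length(text: str) -> float:
    """Convert strings like '10+ years', '3 years' or '< 1 year' to a number of years."""
    text = text.strip()
    if text.startswith("<"):
        # The notebook maps '< 1 year' to 0.5.
        return 0.5
    match = re.match(r"(\d+)\+? years?$", text)
    if match is None:
        raise ValueError(f"Unrecognized employment length: {text!r}")
    return float(match.group(1))


print([parse_emp_length(s) for s in ["10+ years", "3 years", "1 year", "< 1 year"]])
```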

In [25]:
df_encoded.select("home_ownership").distinct().show()
df_encoded.select("verification_status").distinct().show()
df_encoded.select("state").distinct().show()
df_encoded.select("type").distinct().show()
df_encoded.select("purpose").distinct().show()

+--------------+
|home_ownership|
+--------------+
|           OWN|
|          RENT|
|      MORTGAGE|
|           ANY|
+--------------+

+-------------------+
|verification_status|
+-------------------+
|           Verified|
|    Source Verified|
|       Not Verified|
+-------------------+

+-----+
|state|
+-----+
|   AZ|
|   SC|
|   LA|
|   MN|
|   NJ|
|   DC|
|   OR|
|   VA|
|   RI|
|   WY|
|   KY|
|   NH|
|   MI|
|   NV|
|   WI|
|   ID|
|   CA|
|   CT|
|   NE|
|   MT|
+-----+
only showing top 20 rows

+----------+
|      type|
+----------+
| Joint App|
|Individual|
|DIRECT_PAY|
|     JOINT|
|INDIVIDUAL|
+----------+

+------------------+
|           purpose|
+------------------+
|             other|
|    small_business|
|debt_consolidation|
|       credit_card|
|            moving|
|          vacation|
|  renewable_energy|
|             house|
|               car|
|    major_purchase|
|           medical|
|  home_improvement|
|           wedding|
+------------------+



The `type` column spells the same category in different ways: `Joint App` and `JOINT`, `Individual` and `INDIVIDUAL`. Before encoding, we normalize these to consistent labels by mapping `Joint App` to `JOINT`, `Individual` to `INDIVIDUAL`, and `DIRECT_PAY` to `DIRECT PAY`.


In [26]:
def mapping(df, column, mapping_dict):
    """
    Maps the values of a column in a DataFrame using a dictionary.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to map values.
        column (str): The name of the column to map values.
        mapping_dict (dict): A dictionary containing the mapping of original values to new values.
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with the column values mapped.
    Example:
        >>> df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "A")], ["Column A", "Column B"])
        >>> mapping_dict = {"A": "X", "B": "Y"}
        >>> df = mapping(df, "Column B", mapping_dict)
        >>> df.show()
        +--------+--------+
        |Column A|Column B|
        +--------+--------+
        |       1|       X|
        |       2|       Y|
        |       3|       X|
        +--------+--------+
    """

    mapping_expr = col(column)
    for old_value, new_value in mapping_dict.items():
        mapping_expr = when(col(column) == old_value, new_value).otherwise(mapping_expr)
    
    return df.withColumn(column, mapping_expr)
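The chained `when(...).otherwise(...)` expression built by `mapping` amounts to a dictionary lookup that falls back to the original value. The sketch below reproduces that rule in plain Python on the five distinct `type` values shown earlier and confirms that only three labels remain; `normalize` is a hypothetical helper. PySpark's built-in `df.replace(type_mapping, subset=["type"])` performs the same kind of value-for-value substitution without building the expression by hand; it is mentioned as an alternative, not as what the notebook used.

```python
def normalize(value: str, mapping: dict) -> str:
    """Replace a value if it appears in the mapping, otherwise keep it unchanged."""
    return mapping.get(value, value)


type_mapping = {
    "Joint App": "JOINT",
    "Individual": "INDIVIDUAL",
    "DIRECT_PAY": "DIRECT PAY",
}

# Distinct values of the type column before cleaning (from the output above).
raw_types = ["Joint App", "Individual", "DIRECT_PAY", "JOINT", "INDIVIDUAL"]
print(sorted({normalize(t, type_mapping) for t in raw_types}))
```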

In [27]:
type_mapping = {
    "Joint App": "JOINT",
    "Individual": "INDIVIDUAL",
    "DIRECT_PAY": "DIRECT PAY",
}

df_encoded = mapping(df_encoded, "type", type_mapping)
df_encoded.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidhaVIvXHhlMVx4Z.

In [28]:
df_encoded.select("type").distinct().show()

+----------+
|      type|
+----------+
|DIRECT PAY|
|     JOINT|
|INDIVIDUAL|
+----------+



#### The values in the type column are now consistent, so we can encode them


In [30]:
def clean_column_name(column):
    """
    Cleans a column name by converting it to lowercase, stripping leading and trailing whitespace, 
    replacing spaces with underscores, and removing any non-alphanumeric characters except underscores.
    Args:
        column (str): The column name to clean.
    Returns:
        str: The cleaned column name.
    Example:
        >>> clean_column_name("  Column A  ")
        'column_a'
    """
    column = column.lower().strip().replace(" ", "_")
    column = ''.join(e for e in column if e.isalnum() or e == '_')

    return column


def one_hot_encode(df, column):
    """
    One-hot encodes the specified column in the given DataFrame.
    Parameters:
    df (pyspark.sql.DataFrame): The input DataFrame.
    column (str): The name of the column to be one-hot encoded.
    Returns:
    pyspark.sql.DataFrame: A new DataFrame with the specified column one-hot encoded and the original column dropped.
    Example:
    >>> df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "a")], ["id", "category"])
    >>> one_hot_encoded_df = one_hot_encode(df, "category")
    >>> one_hot_encoded_df.show()
    +---+----------+----------+
    | id|category_a|category_b|
    +---+----------+----------+
    |  0|         1|         0|
    |  1|         0|         1|
    |  2|         1|         0|
    +---+----------+----------+
    """
    
    unique_values = df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
    print(f"Unique values in column '{column}': {unique_values}")

    for value in unique_values:
        new_column_name = clean_column_name(column + "_" + str(value))
        df = df.withColumn(new_column_name, when(col(column) == value, 1).otherwise(0))

    df = df.drop(column)

    return df
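`one_hot_encode` creates the indicator columns in whatever order `distinct()` returns, so the column order can differ between runs. The plain-Python sketch below shows the same 0/1 encoding with the categories sorted first, reusing the naming rule of `clean_column_name`; the sorting step and the helper name `one_hot_rows` are assumptions added for illustration.

```python
def clean_column_name(column: str) -> str:
    """Same rule as the notebook: lowercase, strip, spaces to underscores, drop other symbols."""
    column = column.lower().strip().replace(" ", "_")
    return "".join(ch for ch in column if ch.isalnum() or ch == "_")


def one_hot_rows(values: list, column: str) -> tuple:
    """Return the new column names and one 0/1 row per input value."""
    categories = sorted(set(values))
    names = [clean_column_name(f"{column}_{category}") for category in categories]
    rows = [[1 if value == category else 0 for category in categories] for value in values]
    return names, rows


names, rows = one_hot_rows(["Verified", "Not Verified", "Source Verified", "Verified"], "verification_status")
print(names)
print(rows)
```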

#### Let's encode the following columns using one-hot encoding:

1. Home Ownership column
2. Verification Status column
3. Type column


In [31]:
df_encoded = one_hot_encode(df_encoded, "home_ownership")
df_encoded = one_hot_encode(df_encoded, "verification_status")
df_encoded = one_hot_encode(df_encoded, "type")
df_encoded.show()

Unique values in column 'home_ownership': ['OWN', 'RENT', 'MORTGAGE', 'ANY']
Unique values in column 'verification_status': ['Verified', 'Source Verified', 'Not Verified']
Unique values in column 'type': ['DIRECT PAY', 'JOINT', 'INDIVIDUAL']
+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|           pur

In [32]:
def check_one_hot_encoding(df, columns):
    """
    Checks the one-hot encoding consistency for the specified columns in the given DataFrame.
    Parameters:
    df (pyspark.sql.DataFrame): The DataFrame to check for one-hot encoding.
    columns (list of str): The list of column prefixes to check for one-hot encoding.
    Returns:
    dict: A dictionary containing the count of ones, zeros, and total for each one-hot encoded column.
    Example:
    >>> df = spark.createDataFrame([(1, 0, 0), (0, 1, 0), (0, 0, 1)], ["A_1", "A_2", "A_3"])
    >>> check_one_hot_encoding(df, ["A"])
    One-hot encoded columns: ['A_1', 'A_2', 'A_3']
    One-hot encoding consistency: {'A_1_ones': 1, 'A_1_zeros': 2, 'A_1_total': 3, 'A_2_ones': 1, 'A_2_zeros': 2, 'A_2_total': 3, 'A_3_ones': 1, 'A_3_zeros': 2, 'A_3_total': 3}
    {'A_1_ones': 1, 'A_1_zeros': 2, 'A_1_total': 3, 'A_2_ones': 1, 'A_2_zeros': 2, 'A_2_total': 3, 'A_3_ones': 1, 'A_3_zeros': 2, 'A_3_total': 3}
    """

    one_hot_columns = []
    for column in columns:
        # Use `c` so the comprehension does not shadow the imported `col` function.
        one_hot_columns += [c for c in df.columns if c.startswith(column + "_")]

    print(f"One-hot encoded columns: {one_hot_columns}")

    output_dict = {}
    for column in one_hot_columns:
        output_dict[column + "_ones"] = df.filter(fn.col(column) == 1).count()
        output_dict[column + "_zeros"] = df.filter(fn.col(column) == 0).count()
        output_dict[column + "_total"] = output_dict[column + "_ones"] + output_dict[column + "_zeros"]

    print(f"One-hot encoding consistency: {output_dict}")

    return output_dict

check_one_hot_encoding(df_encoded, ["home_ownership", "verification_status", "type"])

One-hot encoded columns: ['home_ownership_own', 'home_ownership_rent', 'home_ownership_mortgage', 'home_ownership_any', 'verification_status_verified', 'verification_status_source_verified', 'verification_status_not_verified', 'type_direct_pay', 'type_joint', 'type_individual']
One-hot encoding consistency: {'home_ownership_own_ones': 3127, 'home_ownership_own_zeros': 23903, 'home_ownership_own_total': 27030, 'home_ownership_rent_ones': 10517, 'home_ownership_rent_zeros': 16513, 'home_ownership_rent_total': 27030, 'home_ownership_mortgage_ones': 13349, 'home_ownership_mortgage_zeros': 13681, 'home_ownership_mortgage_total': 27030, 'home_ownership_any_ones': 37, 'home_ownership_any_zeros': 26993, 'home_ownership_any_total': 27030, 'verification_status_verified_ones': 6683, 'verification_status_verified_zeros': 20347, 'verification_status_verified_total': 27030, 'verification_status_source_verified_ones': 10606, 'verification_status_source_verified_zeros': 16424, 'verification_status_sou

{'home_ownership_own_ones': 3127,
 'home_ownership_own_zeros': 23903,
 'home_ownership_own_total': 27030,
 'home_ownership_rent_ones': 10517,
 'home_ownership_rent_zeros': 16513,
 'home_ownership_rent_total': 27030,
 'home_ownership_mortgage_ones': 13349,
 'home_ownership_mortgage_zeros': 13681,
 'home_ownership_mortgage_total': 27030,
 'home_ownership_any_ones': 37,
 'home_ownership_any_zeros': 26993,
 'home_ownership_any_total': 27030,
 'verification_status_verified_ones': 6683,
 'verification_status_verified_zeros': 20347,
 'verification_status_verified_total': 27030,
 'verification_status_source_verified_ones': 10606,
 'verification_status_source_verified_zeros': 16424,
 'verification_status_source_verified_total': 27030,
 'verification_status_not_verified_ones': 9741,
 'verification_status_not_verified_zeros': 17289,
 'verification_status_not_verified_total': 27030,
 'type_direct_pay_ones': 19,
 'type_direct_pay_zeros': 27011,
 'type_direct_pay_total': 27030,
 'type_joint_ones': 1
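Summing the ones within each group is a quick sanity check: 3127 + 10517 + 13349 + 37 = 27030 for `home_ownership` and 6683 + 10606 + 9741 = 27030 for `verification_status`, which matches the row count. That is necessary but not sufficient for a valid one-hot encoding, because it would also hold if some rows had two flags set and others had none. A row-level check catches that case. The sketch below is plain Python over rows represented as dictionaries (for example, from a small `collect()`). The helper `rows_violating_one_hot` is hypothetical and not part of the notebook.

```python
from typing import Dict, Iterable, List, Sequence


def rows_violating_one_hot(rows: Iterable[Dict[str, int]],
                           groups: Dict[str, Sequence[str]]) -> List[int]:
    """Return the indices of rows whose indicator columns, within any group,
    do not sum to exactly 1 (hypothetical helper for illustration)."""
    bad = []
    for i, row in enumerate(rows):
        for indicator_cols in groups.values():
            if sum(row[c] for c in indicator_cols) != 1:
                bad.append(i)
                break  # one violating group is enough to flag the row
    return bad


groups = {
    "home_ownership": ["home_ownership_own", "home_ownership_rent",
                       "home_ownership_mortgage", "home_ownership_any"],
}
rows = [
    {"home_ownership_own": 1, "home_ownership_rent": 0,
     "home_ownership_mortgage": 0, "home_ownership_any": 0},
    {"home_ownership_own": 0, "home_ownership_rent": 1,
     "home_ownership_mortgage": 1, "home_ownership_any": 0},  # two flags set
]
print(rows_violating_one_hot(rows, groups))  # [1]
```

In PySpark, the same row-level test could probably be written as a filter on the sum of one group's indicator columns, such as `df_encoded.filter(sum(col(c) for c in cols) != 1).count()`, which should return 0. Treat this as an untested sketch.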

#### Let's encode the following columns using label encoding based on lexicographical order:

    1. State Column
    2. Purpose Column


In [35]:
def label_encode_lexicographically(df, column, lookup_table):
    """
    Label encodes a categorical column lexicographically: the distinct values are
    sorted and assigned the codes 1, 2, 3, ... in that order.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to label encode.
        column (str): The name of the column to label encode.
        lookup_table (pyspark.sql.DataFrame): The lookup table that the column's
            mapping is appended to.
    Returns:
        tuple: (encoded DataFrame, updated lookup table), as returned by `label_encode`.
    Example:
        >>> df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "A")], ["Column A", "Column B"])
        >>> df, lookup_table = label_encode_lexicographically(df, "Column B", lookup_table)
        # Mapping: 'A' -> 1, 'B' -> 2, so "Column B" becomes 1, 2, 1
    """
    categories = df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
    categories.sort()

    mapping = {category: index + 1 for index, category in enumerate(categories)}
    
    print(f"Mapping for column '{column}': {mapping}")
    df, lookup_table = label_encode(df, column, mapping, lookup_table)

    return df, lookup_table
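The mapping step can be checked without Spark. The following plain-Python sketch (with a hypothetical helper name) builds the same 1-based lexicographic mapping. It drops `None` explicitly, because in Python 3 sorting a list that mixes `None` and `str` raises `TypeError`. The notebook instead relies on missing values having been handled in an earlier step. Python compares strings by code point, so uppercase letters sort before lowercase letters.

```python
from typing import Dict, Iterable, Optional


def lexicographic_mapping(values: Iterable[Optional[str]]) -> Dict[str, int]:
    """Map each distinct non-null value to its 1-based rank in sorted order."""
    distinct = sorted({v for v in values if v is not None})
    return {category: index + 1 for index, category in enumerate(distinct)}


print(lexicographic_mapping(["TX", "CA", "AK", "CA", None]))
# {'AK': 1, 'CA': 2, 'TX': 3}
```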

In [40]:
df_encoded, lookup_table = label_encode_lexicographically(df_encoded, "state", lookup_table)

Mapping for column 'state': {'AK': 1, 'AL': 2, 'AR': 3, 'AZ': 4, 'CA': 5, 'CO': 6, 'CT': 7, 'DC': 8, 'DE': 9, 'FL': 10, 'GA': 11, 'HI': 12, 'ID': 13, 'IL': 14, 'IN': 15, 'KS': 16, 'KY': 17, 'LA': 18, 'MA': 19, 'MD': 20, 'ME': 21, 'MI': 22, 'MN': 23, 'MO': 24, 'MS': 25, 'MT': 26, 'NC': 27, 'ND': 28, 'NE': 29, 'NH': 30, 'NJ': 31, 'NM': 32, 'NV': 33, 'NY': 34, 'OH': 35, 'OK': 36, 'OR': 37, 'PA': 38, 'RI': 39, 'SC': 40, 'SD': 41, 'TN': 42, 'TX': 43, 'UT': 44, 'VA': 45, 'VT': 46, 'WA': 47, 'WI': 48, 'WV': 49, 'WY': 50}
Column 'state' label encoded with mapping '{'AK': 1.0, 'AL': 2.0, 'AR': 3.0, 'AZ': 4.0, 'CA': 5.0, 'CO': 6.0, 'CT': 7.0, 'DC': 8.0, 'DE': 9.0, 'FL': 10.0, 'GA': 11.0, 'HI': 12.0, 'ID': 13.0, 'IL': 14.0, 'IN': 15.0, 'KS': 16.0, 'KY': 17.0, 'LA': 18.0, 'MA': 19.0, 'MD': 20.0, 'ME': 21.0, 'MI': 22.0, 'MN': 23.0, 'MO': 24.0, 'MS': 25.0, 'MT': 26.0, 'NC': 27.0, 'ND': 28.0, 'NE': 29.0, 'NH': 30.0, 'NJ': 31.0, 'NM': 32.0, 'NV': 33.0, 'NY': 34.0, 'OH': 35.0, 'OK': 36.0, 'OR': 37.0, '

In [41]:
df_encoded.show()
lookup_table.show(n=lookup_table.count(), truncate=False)

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_verified|verification_status_source_verified|verification_status_not_verified|type_direct_pay|type_joint|type_indi

In [42]:
df_encoded, lookup_table = label_encode_lexicographically(df_encoded, "purpose", lookup_table)

Mapping for column 'purpose': {'car': 1, 'credit_card': 2, 'debt_consolidation': 3, 'home_improvement': 4, 'house': 5, 'major_purchase': 6, 'medical': 7, 'moving': 8, 'other': 9, 'renewable_energy': 10, 'small_business': 11, 'vacation': 12, 'wedding': 13}
Column 'purpose' label encoded with mapping '{'car': 1.0, 'credit_card': 2.0, 'debt_consolidation': 3.0, 'home_improvement': 4.0, 'house': 5.0, 'major_purchase': 6.0, 'medical': 7.0, 'moving': 8.0, 'other': 9.0, 'renewable_energy': 10.0, 'small_business': 11.0, 'vacation': 12.0, 'wedding': 13.0}'
Lookup table updated with column 'purpose'


In [43]:
df_encoded.show()
lookup_table.show(n=lookup_table.count(), truncate=False)

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_verified|verification_status_source_verified|verification_status_not_verified|type_direct_pay|type_joint|type_individual|
+-------------

In [46]:
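def discretize_grade_to_letter_grade(df, column):
    """
    Adds a 'letter_grade' column by discretizing a numeric grade (1-35) into
    bands of five: 1-5 -> A, 6-10 -> B, ..., 31-35 -> G.
    Grades outside 1-35 (or null) get the literal string "None", not a null value.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame.
        column (str): The name of the numeric grade column.
    Returns:
        pyspark.sql.DataFrame: The DataFrame with the new 'letter_grade' column.
    """
    # between() is inclusive on both ends
    df = df.withColumn(
        "letter_grade",
        when(col(column).between(1, 5), "A")
        .when(col(column).between(6, 10), "B")
        .when(col(column).between(11, 15), "C")
        .when(col(column).between(16, 20), "D")
        .when(col(column).between(21, 25), "E")
        .when(col(column).between(26, 30), "F")
        .when(col(column).between(31, 35), "G")
        .otherwise("None")
    )

    print(f"Column '{column}' discretized to 'letter_grade'")

    return df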
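Because the bands are all five grades wide, the chain of `when` calls is equivalent to an integer division for integer grades. The plain-Python sketch below (with a hypothetical function name) mirrors the logic, including the string `"None"` fallback, and can be used to spot-check the band edges.

```python
def grade_to_letter(grade: int) -> str:
    """Map an integer grade 1..35 to 'A'..'G' in bands of five; anything else
    becomes the string "None", like the .otherwise("None") branch."""
    if 1 <= grade <= 35:
        return "ABCDEFG"[(grade - 1) // 5]
    return "None"


print([grade_to_letter(g) for g in (1, 5, 6, 10, 11, 35, 36)])
# ['A', 'A', 'B', 'B', 'C', 'G', 'None']
```

The explicit `when` chain in the notebook keeps each band boundary visible, which is arguably easier to audit than the arithmetic form.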

    

In [47]:
df_encoded = discretize_grade_to_letter_grade(df_encoded, "grade")

Column 'grade' discretized to 'letter_grade'


In [48]:
df_encoded.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_verified|verification_status_source_verified|verification_status_not_verified|type_direct_pay|type_joint|type_individual|le

## Part 4: Feature Engineering


Write a function that adds the following 4 features to the dataset:

    - Previous loan issue date from the same grade
    - Previous loan amount from the same grade
    - Previous loan issue date from the same state and grade combined
    - Previous loan amount from the same state and grade combined
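Each of these features is a lag over a window. Rows are partitioned by a key (the grade, or the state and grade together), ordered by issue date, and each row takes the previous row's value. The plain-Python sketch below shows this behaviour. It assumes that rows are dictionaries with the keys `loan_id`, `grade`, `issue_date` and `loan_amount`, and the function name is hypothetical. Unlike the Spark window used later, it also sorts by `loan_id` as a tie-breaker, so loans sharing an issue date are always ordered the same way.

```python
from datetime import date
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Sequence


def add_prev_features(rows: List[Dict], partition_keys: Sequence[str]) -> List[Dict]:
    """Return copies of rows with prev_issue_date / prev_loan_amount taken from the
    previous row in the same partition (None for the first row), like lag(col, 1)."""
    key = itemgetter(*partition_keys)
    # Sort by partition, then issue date, then loan_id so ties are deterministic.
    ordered = sorted(rows, key=lambda r: (key(r), r["issue_date"], r["loan_id"]))
    out = []
    for _, group in groupby(ordered, key=key):
        prev = None
        for row in group:
            new = dict(row)
            new["prev_issue_date"] = prev["issue_date"] if prev is not None else None
            new["prev_loan_amount"] = prev["loan_amount"] if prev is not None else None
            out.append(new)
            prev = row
    return out


rows = [
    {"loan_id": 3, "grade": "A", "issue_date": date(2015, 1, 15), "loan_amount": 5000.0},
    {"loan_id": 1, "grade": "A", "issue_date": date(2013, 8, 13), "loan_amount": 10000.0},
    {"loan_id": 2, "grade": "B", "issue_date": date(2013, 8, 13), "loan_amount": 7000.0},
]
result = add_prev_features(rows, ["grade"])
print([(r["loan_id"], r["prev_loan_amount"]) for r in result])
# [(1, None), (3, 10000.0), (2, None)]
```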


#### Let's first standardize the `issue_date` column by parsing its strings (e.g. "17 September 2017") into a date-type column


In [50]:
df_encoded.select("issue_date").distinct().show()   

+-----------------+
|       issue_date|
+-----------------+
|17 September 2017|
|  16 October 2016|
|      18 May 2018|
| 19 December 2019|
|   19 August 2019|
| 15 December 2015|
|    14 April 2014|
|     16 June 2016|
|18 September 2018|
|  13 January 2013|
|  18 October 2018|
|   16 August 2016|
|     19 July 2019|
| 16 February 2016|
|  17 January 2017|
|     13 June 2013|
| 19 November 2019|
| 15 February 2015|
|  15 January 2015|
|  15 October 2015|
+-----------------+
only showing top 20 rows



In [51]:
def standardize_date(df, column):
    """
    Standardizes the date format of a specified column in a DataFrame to ISO format.
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame containing the date column to be standardized.
        column (str): The name of the column in the DataFrame that contains the date values to be standardized.
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with an additional column 'clean_<column>' containing the parsed values as a date-type column (displayed in ISO format, yyyy-MM-dd).
    Example:
        df = standardize_date(df, 'date_column')
    """
    
    df = df.withColumn("clean_" + column, fn.to_date(fn.col(column), "dd MMMM yyyy"))

    print(f"Column '{column}' date format standardized to ISO format")

    return df
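The Spark pattern `dd MMMM yyyy` (two-digit day, full month name, four-digit year) corresponds to Python's `%d %B %Y`. The plain-Python sketch below (hypothetical helper name) is a quick way to check what a given string should parse to. `%B` depends on the locale, and the sketch assumes English month names, as in the default C locale.

```python
from datetime import datetime


def parse_issue_date(raw: str) -> str:
    """Parse strings such as '17 September 2017' and return the ISO date string."""
    return datetime.strptime(raw, "%d %B %Y").date().isoformat()


print(parse_issue_date("17 September 2017"))  # 2017-09-17
print(parse_issue_date("13 August 2013"))     # 2013-08-13
```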

In [52]:
df_encoded = standardize_date(df_encoded, "issue_date")
df_encoded.show()

Column 'issue_date' date format standardized to ISO format
+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+------------+----------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_verified|verification_status_source_verified|ve

#### Check that the parsed dates match the original strings


In [53]:
df_encoded.select("issue_date", "clean_issue_date").show(50)

+----------------+----------------+
|      issue_date|clean_issue_date|
+----------------+----------------+
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|  13 August 2013|      2013-08-13|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016-12-16|
|16 December 2016|      2016

In [54]:
def feature_engineering(df, issue_date_column, grade_column, loan_amount_column, state_column):
    """
    Perform feature engineering on the given DataFrame by adding new columns that provide information
    about previous loan dates and amounts based on grade and state-grade combinations.
    Parameters:
    df (DataFrame): The input Spark DataFrame containing loan data.
    issue_date_column (str): The name of the column containing the issue date of the loans.
    grade_column (str): The name of the column containing the grade of the loans.
    loan_amount_column (str): The name of the column containing the loan amounts.
    state_column (str): The name of the column containing the state information.
    Returns:
    DataFrame: A new DataFrame with additional columns:
        - "prev_loan_date_same_grade": The issue date of the previous loan with the same grade.
        - "prev_loan_amount_same_grade": The amount of the previous loan with the same grade.
        - "prev_loan_date_same_state_grade": The issue date of the previous loan with the same state and grade.
        - "prev_loan_amount_same_state_grade": The amount of the previous loan with the same state and grade.
    """

    # Window over loans of the same grade, ordered by issue date.
    # Loans that share an issue date have no defined order among themselves,
    # so which of them counts as "previous" may differ between runs.
    grade_window = Window.partitionBy(grade_column).orderBy(issue_date_column)

    df = df.withColumn("prev_loan_date_same_grade", fn.lag(issue_date_column, 1).over(grade_window))
    print("Previous loan issue date from the same grade is added")
    df.show()

    df = df.withColumn("prev_loan_amount_same_grade", fn.lag(loan_amount_column, 1).over(grade_window))
    print("Previous loan amount from the same grade is added")
    df.show()

    # Same idea, partitioned by both state and grade.
    state_grade_window = Window.partitionBy(state_column, grade_column).orderBy(issue_date_column)

    df = df.withColumn("prev_loan_date_same_state_grade", fn.lag(issue_date_column, 1).over(state_grade_window))
    print("Previous loan issue date from the same state and grade is added")
    df.show()

    df = df.withColumn("prev_loan_amount_same_state_grade", fn.lag(loan_amount_column, 1).over(state_grade_window))
    print("Previous loan amount from the same state and grade is added")
    df.show()

    return df

In [55]:
df_encoded = feature_engineering(df_encoded, "clean_issue_date", "letter_grade", "loan_amount", "state")

Previous loan issue date from the same grade is added
+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+------------+----------------+-------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_verified|verification_status_source_veri

In [56]:
check_missing_values(df_encoded)

Column 'customer_id' has 0 missing values
Column 'emp_title' has 0 missing values
Column 'emp_length' has 0 missing values
Column 'annual_inc' has 0 missing values
Column 'annual_inc_joint' has 0 missing values
Column 'zip_code' has 0 missing values
Column 'addr_state' has 0 missing values
Column 'avg_cur_bal' has 0 missing values
Column 'tot_cur_bal' has 0 missing values
Column 'loan_id' has 0 missing values
Column 'loan_status' has 0 missing values
Column 'loan_amount' has 0 missing values
Column 'state' has 0 missing values
Column 'funded_amount' has 0 missing values
Column 'term' has 0 missing values
Column 'int_rate' has 0 missing values
Column 'grade' has 0 missing values
Column 'issue_date' has 0 missing values
Column 'pymnt_plan' has 0 missing values
Column 'purpose' has 0 missing values
Column 'description' has 0 missing values
Column 'home_ownership_own' has 0 missing values
Column 'home_ownership_rent' has 0 missing values
Column 'home_ownership_mortgage' has 0 missing value

In [57]:
df_encoded.rdd.getNumPartitions()

3

## Part 5: Analysis: SQL vs Spark


In [58]:
df_encoded.createOrReplaceTempView("loans")

1.  Identify the average loan amount and interest rate for loans marked as "Fully Paid" in the Loan Status, grouped by Emp Length and annual income ranges.


In [66]:
df_encoded.describe("annual_inc").show()

+-------+------------------+
|summary|        annual_inc|
+-------+------------------+
|  count|             27030|
|   mean| 80384.02745567886|
| stddev|105958.59546508544|
|    min|            2000.0|
|    max|         7691779.0|
+-------+------------------+



In [67]:
# Exact (relativeError=0) 25th, 50th, 75th and 90th percentiles of annual income, plus the IQR outlier fences
percentiles = df_encoded.approxQuantile("annual_inc", [0.25, 0.50, 0.75, 0.90], 0)
iqr = percentiles[2] - percentiles[0]
lower_bound = percentiles[0] - 1.5 * iqr
upper_bound = percentiles[2] + 1.5 * iqr

print(f"Percentiles: {percentiles}")
print(f"Lower bound: {lower_bound}, Upper bound: {upper_bound}")


Percentiles: [47840.0, 66300.0, 95000.0, 133000.0]
Lower bound: -22900.0, Upper bound: 165740.0
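The fences follow the usual rule of Q1 - 1.5 * IQR and Q3 + 1.5 * IQR. Recomputing them from the printed percentiles shows where the numbers come from. Because the lower fence is negative, no income can be a low outlier. Only incomes above 165,740 are flagged, and the maximum of 7,691,779 is far above that.

```python
# Percentiles printed above for annual_inc: 25th, 50th, 75th, 90th.
q1, median, q3, p90 = 47840.0, 66300.0, 95000.0, 133000.0

iqr = q3 - q1                 # 47160.0
lower_fence = q1 - 1.5 * iqr  # -22900.0
upper_fence = q3 + 1.5 * iqr  # 165740.0
print(iqr, lower_fence, upper_fence)  # 47160.0 -22900.0 165740.0
```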


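We bin the annual income using cut points that roughly track the percentiles above (Q1 = 47,840, Q3 = 95,000, 90th percentile = 133,000). Each band includes its lower bound and excludes its upper bound:

- Low: < 45,000
- Medium: 45,000 to < 95,000
- High: 95,000 to < 135,000
- Very High: >= 135,000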
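Half-open bands like these map directly onto a sorted list of cut points. The plain-Python sketch below uses `bisect_right`, so a value equal to a cut point falls into the higher band, matching the `>=` / `<` tests in the queries. `BREAKS`, `LABELS` and `income_range` are illustrative names, not part of the notebook.

```python
from bisect import bisect_right

# Cut points between the four income bands (illustrative constants).
BREAKS = [45_000, 95_000, 135_000]
LABELS = ["Low Income", "Medium Income", "High Income", "Very High Income"]


def income_range(annual_inc: float) -> str:
    """Assign the half-open band [lo, hi) that contains annual_inc."""
    return LABELS[bisect_right(BREAKS, annual_inc)]


print([income_range(x) for x in (2_000, 45_000, 94_999.99, 95_000, 135_000)])
# ['Low Income', 'Medium Income', 'Medium Income', 'High Income', 'Very High Income']
```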


#### SQL Query


In [74]:
q1_sql = """
        SELECT emp_length,
        CASE
            WHEN annual_inc < 45000 THEN 'Low Income'
            WHEN annual_inc >= 45000 AND annual_inc < 95000 THEN 'Medium Income'
            WHEN annual_inc >= 95000 AND annual_inc < 135000 THEN 'High Income'
            ELSE 'Very High Income'
        END AS income_range,
        AVG(loan_amount) AS avg_loan_amount,
        AVG(int_rate) AS avg_int_rate
        FROM loans
        WHERE loan_status = 'Fully Paid'
        GROUP BY emp_length, income_range
        ORDER BY emp_length, income_range
        """

spark.sql(q1_sql).show(44)

+----------+----------------+------------------+-------------------+
|emp_length|    income_range|   avg_loan_amount|       avg_int_rate|
+----------+----------------+------------------+-------------------+
|       0.5|     High Income|18976.785714285714|0.12183809523809523|
|       0.5|      Low Income| 8521.337579617835|0.13147707006369427|
|       0.5|   Medium Income| 13423.27868852459| 0.1181554098360656|
|       0.5|Very High Income|22304.166666666668|0.10875151515151515|
|       1.0|     High Income| 18556.73076923077| 0.1199576923076923|
|       1.0|      Low Income| 7868.840579710145|0.12871884057971014|
|       1.0|   Medium Income|12712.240663900415|0.12310995850622407|
|       1.0|Very High Income|24143.243243243243|0.09983243243243242|
|       2.0|     High Income|18204.320987654322|0.11271358024691358|
|       2.0|      Low Income| 7876.541095890411| 0.1278527397260274|
|       2.0|   Medium Income| 12854.40340909091|0.11735426136363637|
|       2.0|Very High Income|22360

In [75]:
df_encoded.filter(col("loan_status") == 'Fully Paid').show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+------------+----------------+-------------------------+---------------------------+-------------------------------+---------------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_

#### Spark Query


In [76]:
q1_spark = df_encoded.withColumn("income_range", 
    when(col("annual_inc") < 45000, "Low Income")
    .when((col("annual_inc") >= 45000) & (col("annual_inc") < 95000), "Medium Income")
    .when((col("annual_inc") >= 95000) & (col("annual_inc") < 135000), "High Income")
    .otherwise("Very High Income")
).filter(col("loan_status") == 'Fully Paid').groupBy("emp_length", "income_range").agg(
    fn.avg("loan_amount").alias("avg_loan_amount"),
    fn.avg("int_rate").alias("avg_int_rate")
).orderBy("emp_length", "income_range")

q1_spark.show(44)


+----------+----------------+------------------+-------------------+
|emp_length|    income_range|   avg_loan_amount|       avg_int_rate|
+----------+----------------+------------------+-------------------+
|       0.5|     High Income|18976.785714285714|0.12183809523809523|
|       0.5|      Low Income| 8521.337579617835|0.13147707006369427|
|       0.5|   Medium Income| 13423.27868852459| 0.1181554098360656|
|       0.5|Very High Income|22304.166666666668|0.10875151515151515|
|       1.0|     High Income| 18556.73076923077| 0.1199576923076923|
|       1.0|      Low Income| 7868.840579710145|0.12871884057971014|
|       1.0|   Medium Income|12712.240663900415|0.12310995850622407|
|       1.0|Very High Income|24143.243243243243|0.09983243243243242|
|       2.0|     High Income|18204.320987654322|0.11271358024691358|
|       2.0|      Low Income| 7876.541095890411| 0.1278527397260274|
|       2.0|   Medium Income| 12854.40340909091|0.11735426136363637|
|       2.0|Very High Income|22360

2.  Calculate the average difference between Loan Amount and Funded Amount for each loan Grade and sort by the grades with the largest differences.


#### SQL Query


In [77]:
q2_sql = """
SELECT letter_grade, AVG(loan_amount - funded_amount) AS avg_diff
FROM loans
GROUP BY letter_grade
ORDER BY avg_diff DESC
"""

spark.sql(q2_sql).show()

+------------+--------+
|letter_grade|avg_diff|
+------------+--------+
|           F|     0.0|
|           E|     0.0|
|           B|     0.0|
|           D|     0.0|
|           C|     0.0|
|           A|     0.0|
|           G|     0.0|
+------------+--------+



#### Spark Query


In [78]:
q2_spark = df_encoded.select("letter_grade", (col("loan_amount") - col("funded_amount"))
                             .alias("diff")).groupBy("letter_grade").agg(fn.avg("diff").alias("avg_diff")).orderBy("avg_diff", ascending=False)

q2_spark.show()

+------------+--------+
|letter_grade|avg_diff|
+------------+--------+
|           F|     0.0|
|           E|     0.0|
|           B|     0.0|
|           D|     0.0|
|           C|     0.0|
|           A|     0.0|
|           G|     0.0|
+------------+--------+



3. Compare the total Loan Amount for loans with "Verified" and "Not Verified" Verification Status across each state (Addr State).


#### SQL Query


In [79]:
q3_sql = """
SELECT addr_state,
       SUM(verification_status_verified * loan_amount) AS total_verified_loan_amount,
       SUM(verification_status_not_verified * loan_amount) AS total_not_verified_loan_amount
FROM loans
GROUP BY addr_state
ORDER BY addr_state
"""

spark.sql(q3_sql).show(50)

+----------+--------------------------+------------------------------+
|addr_state|total_verified_loan_amount|total_not_verified_loan_amount|
+----------+--------------------------+------------------------------+
|        AK|                  180850.0|                      260900.0|
|        AL|                 1657500.0|                     1204100.0|
|        AR|                 1113625.0|                      895850.0|
|        AZ|                 2831050.0|                     3068500.0|
|        CA|               1.5920375E7|                   1.8114825E7|
|        CO|                 2237400.0|                     2705500.0|
|        CT|                 1768750.0|                     2524225.0|
|        DC|                  322325.0|                      410825.0|
|        DE|                  391275.0|                      375325.0|
|        FL|                 7655725.0|                     9928375.0|
|        GA|                 3580500.0|                     4335850.0|
|     

#### Spark Query


In [80]:
# A 0/1 indicator times loan_amount sums the amount only over the matching rows
q3_spark = df_encoded.groupBy("addr_state").agg(
    fn.sum(col("verification_status_verified") * col("loan_amount")).alias("total_verified_loan_amount"),
    fn.sum(col("verification_status_not_verified") * col("loan_amount")).alias("total_not_verified_loan_amount")
).orderBy("addr_state")

q3_spark.show(50)

+----------+--------------------------+------------------------------+
|addr_state|total_verified_loan_amount|total_not_verified_loan_amount|
+----------+--------------------------+------------------------------+
|        AK|                  180850.0|                      260900.0|
|        AL|                 1657500.0|                     1204100.0|
|        AR|                 1113625.0|                      895850.0|
|        AZ|                 2831050.0|                     3068500.0|
|        CA|               1.5920375E7|                   1.8114825E7|
|        CO|                 2237400.0|                     2705500.0|
|        CT|                 1768750.0|                     2524225.0|
|        DC|                  322325.0|                      410825.0|
|        DE|                  391275.0|                      375325.0|
|        FL|                 7655725.0|                     9928375.0|
|        GA|                 3580500.0|                     4335850.0|
|     
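Both queries use a compact conditional sum. Multiplying a 0/1 indicator by `loan_amount` inside `SUM` means that rows where the indicator is 0 contribute nothing, so the result equals summing only the rows where the indicator is 1. Loans in the third category, `verification_status_source_verified`, are therefore excluded from both totals. A small plain-Python check of the equivalence, using made-up numbers:

```python
def conditional_totals(indicators, amounts):
    """Sum amounts two ways: indicator-weighted product vs. explicit filter."""
    by_product = sum(flag * amount for flag, amount in zip(indicators, amounts))
    by_filter = sum(amount for flag, amount in zip(indicators, amounts) if flag == 1)
    return by_product, by_filter


verified_flag = [1, 0, 1, 0]                    # hypothetical indicator values
loan_amount = [10000.0, 5000.0, 2500.0, 7000.0]  # hypothetical amounts
print(conditional_totals(verified_flag, loan_amount))  # (12500.0, 12500.0)
```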

4.  Calculate the average time gap (in days) between consecutive loans for each grade using the new features you added in the feature engineering phase.
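When a grade's loans are sorted by issue date, the consecutive gaps telescope. Their average is simply (latest date - earliest date) / (number of loans - 1). This average does not depend on how loans that share an issue date are ordered, because those ties contribute 0-day gaps. Those ties are also consistent with the averages of less than one day reported for grades A to D. A plain-Python sketch with made-up dates (hypothetical helper name):

```python
from datetime import date


def average_gap_days(issue_dates):
    """Average days between consecutive loans once sorted by date; the first
    loan has no predecessor and is skipped, like the IS NOT NULL filter."""
    ordered = sorted(issue_dates)
    gaps = [(cur - prev).days for prev, cur in zip(ordered, ordered[1:])]
    return sum(gaps) / len(gaps) if gaps else None


dates = [date(2020, 1, 11), date(2020, 1, 1), date(2020, 1, 1)]
print(average_gap_days(dates))  # 5.0  (gaps of 0 and 10 days)
```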


#### SQL Query


In [81]:
q4_sql = """
    SELECT letter_grade, AVG(DATEDIFF(clean_issue_date, prev_loan_date_same_grade)) AS avg_time_gap_days
    FROM loans
    WHERE prev_loan_date_same_grade IS NOT NULL
    GROUP BY letter_grade
    ORDER BY letter_grade
"""

spark.sql(q4_sql).show()

+------------+-------------------+
|letter_grade|  avg_time_gap_days|
+------------+-------------------+
|           A| 0.4526692819375746|
|           B|0.33811862485833016|
|           C| 0.3557705048363588|
|           D| 0.6900539707016191|
|           E| 1.8901960784313725|
|           F| 6.2936507936507935|
|           G|         17.1640625|
+------------+-------------------+



#### Spark Query


In [82]:
q4_spark = df_encoded.filter(col("prev_loan_date_same_grade").isNotNull()).groupBy("letter_grade")\
                    .agg(fn.avg(fn.datediff(col("clean_issue_date"), col("prev_loan_date_same_grade"))).alias("avg_time_gap_days"))\
                    .orderBy("letter_grade")

q4_spark.show()

+------------+-------------------+
|letter_grade|  avg_time_gap_days|
+------------+-------------------+
|           A| 0.4526692819375746|
|           B|0.33811862485833016|
|           C| 0.3557705048363588|
|           D| 0.6900539707016191|
|           E| 1.8901960784313725|
|           F| 6.2936507936507935|
|           G|         17.1640625|
+------------+-------------------+



5. Identify the average difference in loan amounts between consecutive loans within the same state and grade combination.
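Unlike the time gap, this average can change depending on how loans with the same issue date are ordered, because absolute amount differences do not telescope. The feature-engineering window orders only by `clean_issue_date`, so Spark may place tied loans in either order. The plain-Python illustration below uses made-up amounts, and the helper name is hypothetical.

```python
def avg_abs_consecutive_diff(amounts_in_window_order):
    """Mean of |amount - previous amount| over consecutive rows of one partition."""
    pairs = zip(amounts_in_window_order, amounts_in_window_order[1:])
    diffs = [abs(cur - prev) for prev, cur in pairs]
    return sum(diffs) / len(diffs) if diffs else None


# Two loans share the earliest issue date (1000 and 9000); a later loan is 2000.
print(avg_abs_consecutive_diff([1000.0, 9000.0, 2000.0]))  # 7500.0
print(avg_abs_consecutive_diff([9000.0, 1000.0, 2000.0]))  # 4500.0
```

Adding a unique column such as `loan_id` as a secondary sort key in the window would make the result reproducible.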


#### SQL Query


In [83]:
q5_sql = """
    SELECT addr_state, letter_grade, AVG(ABS(loan_amount - prev_loan_amount_same_state_grade)) AS avg_loan_amount_diff
    FROM loans
    WHERE prev_loan_amount_same_state_grade IS NOT NULL
    GROUP BY addr_state, letter_grade
    ORDER BY addr_state, letter_grade
"""

spark.sql(q5_sql).show(350)

+----------+------------+--------------------+
|addr_state|letter_grade|avg_loan_amount_diff|
+----------+------------+--------------------+
|        AK|           A|   6164.285714285715|
|        AK|           B|  12660.227272727272|
|        AK|           C|  11964.285714285714|
|        AK|           D|             11175.0|
|        AK|           E|              9500.0|
|        AL|           A|   8478.365384615385|
|        AL|           B|  10426.898734177215|
|        AL|           C|  10857.926829268292|
|        AL|           D|   9186.538461538461|
|        AL|           E|              8290.0|
|        AL|           F|             15200.0|
|        AL|           G|              5075.0|
|        AR|           A|  10522.916666666666|
|        AR|           B|   8684.154929577464|
|        AR|           C|              9157.0|
|        AR|           D|   9905.357142857143|
|        AR|           E|            13093.75|
|        AR|           F|  10033.333333333334|
|        AZ| 

#### Spark Query


In [84]:
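# Note: prev_loan_amount_same_state_grade was computed over partitions of `state`,
# while this query groups by `addr_state`; the groups line up only where both columns refer to the same state.
q5_spark = df_encoded.filter(col("prev_loan_amount_same_state_grade").isNotNull()).groupBy("addr_state", "letter_grade")\
                    .agg(fn.avg(fn.abs(col("loan_amount") - col("prev_loan_amount_same_state_grade"))).alias("avg_loan_amount_diff"))\
                    .orderBy("addr_state", "letter_grade")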


q5_spark.show(350)

+----------+------------+--------------------+
|addr_state|letter_grade|avg_loan_amount_diff|
+----------+------------+--------------------+
|        AK|           A|   6164.285714285715|
|        AK|           B|  12660.227272727272|
|        AK|           C|  11964.285714285714|
|        AK|           D|             11175.0|
|        AK|           E|              9500.0|
|        AL|           A|   8478.365384615385|
|        AL|           B|  10426.898734177215|
|        AL|           C|  10857.926829268292|
|        AL|           D|   9186.538461538461|
|        AL|           E|              8290.0|
|        AL|           F|             15200.0|
|        AL|           G|              5075.0|
|        AR|           A|  10522.916666666666|
|        AR|           B|   8684.154929577464|
|        AR|           C|              9157.0|
|        AR|           D|   9905.357142857143|
|        AR|           E|            13093.75|
|        AR|           F|  10033.333333333334|
|        AZ| 

In [103]:
df_encoded = df_encoded.repartition(16)

In [104]:
df_encoded.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+----------+---------------+------------+----------------+-------------------------+---------------------------+-------------------------------+---------------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_

In [105]:
df_encoded.rdd.getNumPartitions()

16

In [107]:
lookup_table.show(n = lookup_table.count(), truncate=False)

+----------+------------------+-------+
|Column    |Original          |Encoded|
+----------+------------------+-------+
|emp_length|10+ years         |10.0   |
|emp_length|9 years           |9.0    |
|emp_length|8 years           |8.0    |
|emp_length|7 years           |7.0    |
|emp_length|6 years           |6.0    |
|emp_length|5 years           |5.0    |
|emp_length|4 years           |4.0    |
|emp_length|3 years           |3.0    |
|emp_length|2 years           |2.0    |
|emp_length|1 year            |1.0    |
|emp_length|< 1 year          |0.5    |
|state     |AK                |1.0    |
|state     |AL                |2.0    |
|state     |AR                |3.0    |
|state     |AZ                |4.0    |
|state     |CA                |5.0    |
|state     |CO                |6.0    |
|state     |CT                |7.0    |
|state     |DC                |8.0    |
|state     |DE                |9.0    |
|state     |FL                |10.0   |
|state     |GA                |11.0   |


In [108]:
lookup_table.rdd.getNumPartitions()

16

## Part 6: Saving the Cleaned Dataset and Lookup Table


In [109]:
df_file_path = "fintech_spark_52_14669_clean.parquet"
lookup_table_file_path = "lookup_spark_52_14669.parquet"

df_encoded.write.mode("overwrite").parquet(df_file_path)
lookup_table.write.mode("overwrite").parquet(lookup_table_file_path)


## Bonus: Saving the output into a postgres database


In [110]:
df_pandas = df_encoded.toPandas()

In [111]:
df_pandas.head()

,customer_id,emp_title,emp_length,annual_inc,annual_inc_joint,zip_code,addr_state,avg_cur_bal,tot_cur_bal,loan_id,...,verification_status_not_verified,type_direct_pay,type_joint,type_individual,letter_grade,clean_issue_date,prev_loan_date_same_grade,prev_loan_amount_same_grade,prev_loan_date_same_state_grade,prev_loan_amount_same_state_grade
0,YicuXHhlYn1RNUJceGRiXHhjNFx4ZWRceGFmSTFceGE1XH...,Teacher,10.0,60000.0,0.0,851xx,AZ,14860.0,237754.0,216186,...,1,0,0,1,B,2019-04-19,2019-04-19,11000.0,2019-04-19,25000.0
1,YidceGYxXHhhNlx4OGFceDA1XHhmMVx4YTRceDhmMSp9XH...,Technician,1.0,100000.0,0.0,286xx,NC,5444.0,65331.0,205735,...,0,0,0,1,B,2018-11-18,2018-11-18,25000.0,2018-10-18,25000.0
2,YidceGRkTkNtXHhlMFx4ZjBoXHhjYndcXGBceDE4IFx4MG...,Customs analyst,2.0,50000.0,0.0,480xx,MI,2470.0,14819.0,67155,...,1,0,0,1,B,2019-08-19,2019-08-19,2200.0,2019-08-19,18000.0
3,YidceDFhflx4ZGJceGQ4XHhkMypZO3NDXHhmN1x4ZjkoMV...,cna,6.0,30000.0,0.0,633xx,MO,10238.0,71665.0,103074,...,0,0,0,1,E,2015-01-15,2015-01-15,24000.0,2014-11-14,20000.0
4,YidceGU0XHgwMFx4OTJceDFlbExbXHhhMVVceGNhXHgxM1...,Program Manager,10.0,100000.0,0.0,604xx,IL,16789.0,285409.0,205051,...,0,0,0,1,B,2013-11-13,2013-11-13,20000.0,2013-10-13,17500.0


In [112]:
lookup_table_pandas = lookup_table.toPandas()

In [113]:
lookup_table_pandas.head()

,Column,Original,Encoded
0,emp_length,10+ years,10.0
1,emp_length,9 years,9.0
2,emp_length,8 years,8.0
3,emp_length,7 years,7.0
4,emp_length,6 years,6.0


In [114]:
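import os

from sqlalchemy import create_engine, text

# Read the connection string from an environment variable instead of hard-coding
# credentials in the notebook. DATABASE_URL is an assumed name; its value has the form
# postgresql://<user>:<password>@<host>:<port>/<database>
engine = create_engine(os.environ["DATABASE_URL"])

# engine.connect() raises an exception on failure (it never returns a falsy value),
# so test the connection with a trivial query and close it afterwards.
try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("Connected to database")
except Exception as exc:
    print(f"Connection failed: {exc}")
    raise

df_pandas.to_sql('fintech_spark_52_14669_clean_table', engine, if_exists='replace', index=False)
lookup_table_pandas.to_sql('lookup_spark_52_14669_table', engine, if_exists='replace', index=False)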

Connected to database


74
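If the connection URL is assembled from separate parts rather than stored whole, the user name and password must be percent-encoded. Otherwise characters such as `@`, `:` or `/` in a password break URL parsing. SQLAlchemy's documentation recommends `urllib.parse.quote_plus` for this. The sketch below uses a hypothetical helper and made-up credentials.

```python
from urllib.parse import quote_plus


def build_postgres_url(user: str, password: str, host: str, port: int, dbname: str) -> str:
    """Build a PostgreSQL URL for create_engine, percent-encoding the credentials."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{dbname}"


url = build_postgres_url("postgres", "p@ss:word/1", "localhost", 5432, "postgres")
print(url)  # postgresql://postgres:p%40ss%3Aword%2F1@localhost:5432/postgres
```

In practice, the parts would come from environment variables or a secrets store rather than literals in the notebook.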