### Part 1: Loading the dataset

In [129]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, isnan, isnull, when, lit, count, desc, first,  regexp_extract, lag,to_date, avg, datediff, sum
from pyspark.sql.types import NumericType, StringType, FloatType, StructType, StructField
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.window import Window


In [130]:
import os
spark = SparkSession.builder.appName("MS3").getOrCreate()
sc = spark.sparkContext

df = spark.read.parquet("./fintech_data_43_52_0812.parquet")


In [131]:
df.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|       Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidPVlx4ZTdceDkwN.

In [132]:
num_partitions = df.rdd.getNumPartitions()
print("Number of partitions: ", num_partitions)

Number of partitions:  1


In [133]:
import psutil

logical_cores = psutil.cpu_count(logical=True)  # Logical cores
physical_cores = psutil.cpu_count(logical=False)  # Physical cores

print(f"Logical cores: {logical_cores}")
print(f"Physical cores: {physical_cores}")

df_repartitioned = df.repartition(logical_cores)
num_partitions = df_repartitioned.rdd.getNumPartitions()
print("Number of partitions after repartitioning: ", num_partitions)

Logical cores: 12
Physical cores: 10
Number of partitions after repartitioning:  12


### Part 2: Cleaning

##### 1. Rename All Columns

In [134]:
original_columns = df.columns

new_columns = [col.replace(' ', '_').lower() for col in original_columns]

df = df.toDF(*new_columns)


In [135]:
df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|       loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidPVlx4ZTdceDkwN.

##### 2. Detect Missing

In [136]:

def missing_percentage(df):
    total_count = df.count()
    missing_data = {}
    
    for column in df.columns:        
        condition = isnull(col(column))
        missing_count = df.filter(condition).count()
        
        missing_percentage = (missing_count / total_count) * 100
        missing_data[column] = missing_percentage
    
    return missing_data


In [137]:
missing_info = missing_percentage(df)
print("Missing data:")
for col_name, percentage in missing_info.items():
    print(f"{col_name}: {percentage:.2f}%")


Missing data:
customer_id: 0.00%
emp_title: 8.68%
emp_length: 6.93%
home_ownership: 0.00%
annual_inc: 0.00%
annual_inc_joint: 93.15%
verification_status: 0.00%
zip_code: 0.00%
addr_state: 0.00%
avg_cur_bal: 0.00%
tot_cur_bal: 0.00%
loan_id: 0.00%
loan_status: 0.00%
loan_amount: 0.00%
state: 0.00%
funded_amount: 0.00%
term: 0.00%
int_rate: 4.48%
grade: 0.00%
issue_date: 0.00%
pymnt_plan: 0.00%
type: 0.00%
purpose: 0.00%
description: 0.79%


##### 3. Handle Missing Values

In [138]:
# Identify numerical and categorical columns
numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

df = df.fillna(0, subset=numerical_cols)

mode_dict = {}
for col_name in categorical_cols:

    mode_row = (
        df.filter(col(col_name).isNotNull())
          .groupBy(col_name)
          .count()
          .orderBy(desc('count'))
          .first()
    )
    mode_value = mode_row[col_name]
    mode_dict[col_name] = mode_value

df = df.fillna(value = mode_dict)


In [139]:
missing_info = missing_percentage(df)
print("Missing data:")
for col_name, percentage in missing_info.items():
    print(f"{col_name}: {percentage:.2f}%")

Missing data:
customer_id: 0.00%
emp_title: 0.00%
emp_length: 0.00%
home_ownership: 0.00%
annual_inc: 0.00%
annual_inc_joint: 0.00%
verification_status: 0.00%
zip_code: 0.00%
addr_state: 0.00%
avg_cur_bal: 0.00%
tot_cur_bal: 0.00%
loan_id: 0.00%
loan_status: 0.00%
loan_amount: 0.00%
state: 0.00%
funded_amount: 0.00%
term: 0.00%
int_rate: 0.00%
grade: 0.00%
issue_date: 0.00%
pymnt_plan: 0.00%
type: 0.00%
purpose: 0.00%
description: 0.00%


### Part 3: Encoding

In [140]:
schema = StructType([
    StructField('column_name', StringType(), True),
    StructField('original_value', StringType(), True),
    StructField('encoded_value', StringType(), True)
])

lookup = spark.createDataFrame([], schema)


In [141]:
df = df.withColumn(
    'emp_length',
    when(col('emp_length') == '< 1 year', 0.5)
    .when(col('emp_length') == '10+ years', 10)
    .otherwise(regexp_extract(col('emp_length'), r'(\d+)', 1).cast(FloatType()))
)

In [142]:
df = df.withColumn(
    'grade_letter',
    when((col('grade') >= 1) & (col('grade') <= 5), 'A')
    .when((col('grade') >= 6) & (col('grade') <= 10), 'B')
    .when((col('grade') >= 11) & (col('grade') <= 15), 'C')
    .when((col('grade') >= 16) & (col('grade') <= 20), 'D')
    .when((col('grade') >= 21) & (col('grade') <= 25), 'E')
    .when((col('grade') >= 26) & (col('grade') <= 30), 'F')
    .when((col('grade') >= 31) & (col('grade') <= 35), 'G')
    .otherwise('Unknown')
)

In [None]:
grade_mappings = []

def map_grade(grade):
    if 1 <= grade <= 5:
        return 'A'
    elif 6 <= grade <= 10:
        return 'B'
    elif 11 <= grade <= 15:
        return 'C'
    elif 16 <= grade <= 20:
        return 'D'
    elif 21 <= grade <= 25:
        return 'E'
    elif 26 <= grade <= 30:
        return 'F'
    elif 31 <= grade <= 35:
        return 'G'
    else:
        return 'Unknown'
    
for i in range(1, 36):
    letter = map_grade(i)
    grade_mappings.append(('grade', str(i), letter))

grade_lookup_df = spark.createDataFrame(grade_mappings, ['column_name', 'original_value', 'encoded_value'])

lookup = lookup.union(grade_lookup_df)
lookup.show()

+-----------+--------------+-------------+
|column_name|original_value|encoded_value|
+-----------+--------------+-------------+
|      grade|             1|            A|
|      grade|             2|            A|
|      grade|             3|            A|
|      grade|             4|            A|
|      grade|             5|            A|
|      grade|             6|            B|
|      grade|             7|            B|
|      grade|             8|            B|
|      grade|             9|            B|
|      grade|            10|            B|
|      grade|            11|            C|
|      grade|            12|            C|
|      grade|            13|            C|
|      grade|            14|            C|
|      grade|            15|            C|
|      grade|            16|            D|
|      grade|            17|            D|
|      grade|            18|            D|
|      grade|            19|            D|
|      grade|            20|            D|
+----------

In [144]:
state_indexer = StringIndexer(inputCol='state', outputCol='state_encoded', handleInvalid='keep')
state_indexer_model = state_indexer.fit(df)

purpose_indexer = StringIndexer(inputCol='purpose', outputCol='purpose_encoded', handleInvalid='keep')
purpose_indexer_model = purpose_indexer.fit(df)



In [145]:
state_labels = state_indexer_model.labels
purpose_labels = purpose_indexer_model.labels

state_mappings = [('state', label, float(index)) for index, label in enumerate(state_labels)]
purpose_mappings = [('purpose', label, float(index)) for index, label in enumerate(purpose_labels)]

state_lookup_df = spark.createDataFrame(state_mappings, ['column_name', 'original_value', 'encoded_value'])
purpose_lookup_df = spark.createDataFrame(purpose_mappings, ['column_name', 'original_value', 'encoded_value'])

lookup = lookup.union(state_lookup_df).union(purpose_lookup_df)


In [146]:
lookup.show(50)

+-----------+--------------+-------------+
|column_name|original_value|encoded_value|
+-----------+--------------+-------------+
|      grade|             1|            A|
|      grade|             2|            A|
|      grade|             3|            A|
|      grade|             4|            A|
|      grade|             5|            A|
|      grade|             6|            B|
|      grade|             7|            B|
|      grade|             8|            B|
|      grade|             9|            B|
|      grade|            10|            B|
|      grade|            11|            C|
|      grade|            12|            C|
|      grade|            13|            C|
|      grade|            14|            C|
|      grade|            15|            C|
|      grade|            16|            D|
|      grade|            17|            D|
|      grade|            18|            D|
|      grade|            19|            D|
|      grade|            20|            D|
|      grad

In [147]:
# Create a list of pipeline stages
pipeline_stages = [
    state_indexer,
    purpose_indexer
]

pipeline = Pipeline(stages=pipeline_stages)

df = pipeline.fit(df).transform(df)


In [148]:
home_ownership_categories = [row[0] for row in df.select('home_ownership').distinct().collect()]
print("Home Ownership Categories:", home_ownership_categories)
for category in home_ownership_categories:
    column_name = f'home_ownership_{category}'
    df = df.withColumn(
        column_name,
        when(col('home_ownership') == category, 1).otherwise(0)
    )


Home Ownership Categories: ['OWN', 'RENT', 'MORTGAGE', 'ANY']


In [149]:
verification_status_categories = [row[0] for row in df.select('verification_status').distinct().collect()]
print("Verification Status Categories:", verification_status_categories)
for category in verification_status_categories:
    column_name = f'verification_status_{category}'
    df = df.withColumn(
        column_name,
        when(col('verification_status') == category, 1).otherwise(0)
    )


Verification Status Categories: ['Verified', 'Source Verified', 'Not Verified']


In [150]:
type_categories = [row[0] for row in df.select('type').distinct().collect()]
print("Type Categories:", type_categories)
for category in type_categories:
    column_name = f'type_{category}'
    df = df.withColumn(
        column_name,
        when(col('type') == category, 1).otherwise(0)
    )


Type Categories: ['Joint App', 'Individual', 'DIRECT_PAY', 'JOINT', 'INDIVIDUAL']


In [151]:
selected_columns = (
    ['emp_length', 'grade', 'grade_letter'] +
    ['home_ownership'] + [f'home_ownership_{cat}' for cat in home_ownership_categories] +
    ['verification_status'] + [f'verification_status_{cat}' for cat in verification_status_categories] +
    ['state', 'state_encoded'] +
    ['type'] + [f'type_{cat}' for cat in type_categories] +
    ['purpose', 'purpose_encoded']
)

df.select(*selected_columns).show(5, truncate=False)

+----------+-----+------------+--------------+------------------+-------------------+-----------------------+------------------+-------------------+----------------------------+-----------------------------------+--------------------------------+-----+-------------+----------+--------------+---------------+---------------+----------+---------------+------------------+---------------+
|emp_length|grade|grade_letter|home_ownership|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|verification_status|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state|state_encoded|type      |type_Joint App|type_Individual|type_DIRECT_PAY|type_JOINT|type_INDIVIDUAL|purpose           |purpose_encoded|
+----------+-----+------------+--------------+------------------+-------------------+-----------------------+------------------+-------------------+----------------------------+-----------------------------------+-------------

### Part 4: Feature Engineering

In [152]:
def add_previous_loan_features(df):
    df = df.withColumn('issue_date', to_date(col('issue_date'), 'dd MMMM yyyy'))


    grade_window = Window.partitionBy('grade').orderBy('issue_date')
    state_grade_window = Window.partitionBy('state', 'grade').orderBy('issue_date')

    # Compute previous loan features
    df = df.withColumn('prev_issue_date_same_grade', lag('issue_date').over(grade_window))
    df = df.withColumn('prev_loan_amount_same_grade', lag('loan_amount').over(grade_window))
    df = df.withColumn('prev_issue_date_same_state_grade', lag('issue_date').over(state_grade_window))
    df = df.withColumn('prev_loan_amount_same_state_grade', lag('loan_amount').over(state_grade_window))

    return df


In [153]:
df = add_previous_loan_features(df)

df.select(
    'issue_date', 'grade', 'state', 'loan_amount',
    'prev_issue_date_same_grade', 'prev_loan_amount_same_grade',
    'prev_issue_date_same_state_grade', 'prev_loan_amount_same_state_grade'
).show(10)


+----------+-----+-----+-----------+--------------------------+---------------------------+--------------------------------+---------------------------------+
|issue_date|grade|state|loan_amount|prev_issue_date_same_grade|prev_loan_amount_same_grade|prev_issue_date_same_state_grade|prev_loan_amount_same_state_grade|
+----------+-----+-----+-----------+--------------------------+---------------------------+--------------------------------+---------------------------------+
|2017-11-17|    1|   AK|    21000.0|                2017-11-17|                    12000.0|                            NULL|                             NULL|
|2018-08-18|    1|   AK|    35000.0|                2018-08-18|                     4000.0|                      2017-11-17|                          21000.0|
|2019-01-19|    1|   AK|    10000.0|                2019-01-19|                    16000.0|                      2018-08-18|                          35000.0|
|2017-04-17|    2|   AK|    30000.0|          

### Part 5: Analysis SQL vs Spark


1. Identify the average loan amount and interest rate for loans marked as
"Default" in the Loan Status, grouped by Emp Length and annual income ranges.

In [154]:
df.show(2)
loan_status_categories = [row[0] for row in df.select('loan_status').distinct().collect()]
print("Loan Status Categories:", loan_status_categories)


+--------------------+---------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+------------------+------------+-------------+---------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+--------------------------+---------------------------+--------------------------------+---------------------------------+
|         customer_id|      emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|     

In [155]:
df.createOrReplaceTempView("fintech")

query = """
SELECT
    emp_length,
    CASE
        WHEN annual_inc < 20000 THEN 'Under $20k'
        WHEN annual_inc >= 20000 AND annual_inc < 40000 THEN '$20k - $40k'
        WHEN annual_inc >= 40000 AND annual_inc < 60000 THEN '$40k - $60k'
        WHEN annual_inc >= 60000 AND annual_inc < 80000 THEN '$60k - $80k'
        WHEN annual_inc >= 80000 AND annual_inc < 100000 THEN '$80k - $100k'
        ELSE '$100k and above'
    END AS income_range,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_int_rate
FROM
    fintech
WHERE
    loan_status = 'Current'
GROUP BY
    emp_length,
    income_range
"""

result_df = spark.sql(query)
result_df.show()


+----------+---------------+------------------+-------------------+
|emp_length|   income_range|   avg_loan_amount|       avg_int_rate|
+----------+---------------+------------------+-------------------+
|       8.0|   $80k - $100k|16931.052631578947| 0.1269084210526316|
|       2.0|    $40k - $60k| 12700.37688442211|0.12375603015075379|
|       4.0|    $40k - $60k|    12542.08984375|0.13204453125000004|
|       3.0|    $40k - $60k|11908.090185676392|0.12477055702917764|
|       9.0|$100k and above|23534.601449275364| 0.1145789855072464|
|       0.5|    $60k - $80k| 16628.10650887574|0.12550857988165676|
|       8.0|    $20k - $40k| 10079.62962962963|0.12387592592592592|
|       7.0|$100k and above|21381.969696969696|0.11129090909090919|
|       8.0|    $40k - $60k|13156.612903225807| 0.1329161290322581|
|       5.0|   $80k - $100k|18131.676136363636| 0.1242278409090909|
|       5.0|$100k and above|22477.911646586344| 0.1163646586345382|
|       1.0|    $40k - $60k|11677.083333333334|0

In [156]:
df_current = df.filter(df.loan_status == 'Current')

df_current = df_current.withColumn(
    'income_range',
    when(df.annual_inc < 20000, 'Under $20k')
    .when((df.annual_inc >= 20000) & (df.annual_inc < 40000), '$20k - $40k')
    .when((df.annual_inc >= 40000) & (df.annual_inc < 60000), '$40k - $60k')
    .when((df.annual_inc >= 60000) & (df.annual_inc < 80000), '$60k - $80k')
    .when((df.annual_inc >= 80000) & (df.annual_inc < 100000), '$80k - $100k')
    .otherwise('$100k and above')
)

result = df_current.groupBy('emp_length', 'income_range').agg(
    avg('loan_amount').alias('avg_loan_amount'),
    avg('int_rate').alias('avg_int_rate')
)

result.show()


+----------+---------------+------------------+-------------------+
|emp_length|   income_range|   avg_loan_amount|       avg_int_rate|
+----------+---------------+------------------+-------------------+
|       8.0|   $80k - $100k|16931.052631578947| 0.1269084210526316|
|       2.0|    $40k - $60k| 12700.37688442211|0.12375603015075379|
|       4.0|    $40k - $60k|    12542.08984375|0.13204453125000004|
|       3.0|    $40k - $60k|11908.090185676392|0.12477055702917764|
|       9.0|$100k and above|23534.601449275364| 0.1145789855072464|
|       0.5|    $60k - $80k| 16628.10650887574|0.12550857988165676|
|       8.0|    $20k - $40k| 10079.62962962963|0.12387592592592592|
|       7.0|$100k and above|21381.969696969696|0.11129090909090919|
|       8.0|    $40k - $60k|13156.612903225807| 0.1329161290322581|
|       5.0|   $80k - $100k|18131.676136363636| 0.1242278409090909|
|       5.0|$100k and above|22477.911646586344| 0.1163646586345382|
|       1.0|    $40k - $60k|11677.083333333334|0

2. Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.


In [157]:
query = """
SELECT
    grade,
    AVG(loan_amount - funded_amount) AS avg_diff
FROM
    fintech
GROUP BY
    grade
ORDER BY
    avg_diff DESC
"""
result_df = spark.sql(query)
result_df.show()

+-----+--------+
|grade|avg_diff|
+-----+--------+
|   26|     0.0|
|   29|     0.0|
|   19|     0.0|
|   22|     0.0|
|    7|     0.0|
|   34|     0.0|
|   32|     0.0|
|   31|     0.0|
|   25|     0.0|
|    6|     0.0|
|    9|     0.0|
|   27|     0.0|
|   17|     0.0|
|   28|     0.0|
|   33|     0.0|
|    5|     0.0|
|    1|     0.0|
|   10|     0.0|
|    3|     0.0|
|   12|     0.0|
+-----+--------+
only showing top 20 rows



In [158]:
result = df.groupBy('grade') \
    .agg(avg(col('loan_amount') - col('funded_amount')).alias('avg_diff')) \
    .orderBy(col('avg_diff').desc())

result.show()

+-----+--------+
|grade|avg_diff|
+-----+--------+
|   26|     0.0|
|   29|     0.0|
|   19|     0.0|
|   22|     0.0|
|    7|     0.0|
|   34|     0.0|
|   32|     0.0|
|   31|     0.0|
|   25|     0.0|
|    6|     0.0|
|    9|     0.0|
|   27|     0.0|
|   17|     0.0|
|   28|     0.0|
|   33|     0.0|
|    5|     0.0|
|    1|     0.0|
|   10|     0.0|
|    3|     0.0|
|   12|     0.0|
+-----+--------+
only showing top 20 rows



3. Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state (Addr State).


In [159]:
query = """
SELECT
    addr_state,
    SUM(CASE WHEN verification_status = 'Verified' THEN loan_amount ELSE 0 END) AS total_verified,
    SUM(CASE WHEN verification_status = 'Not Verified' THEN loan_amount ELSE 0 END) AS total_not_verified
FROM
    fintech
GROUP BY
    addr_state
"""
result_df = spark.sql(query)
result_df.show()

+----------+--------------+------------------+
|addr_state|total_verified|total_not_verified|
+----------+--------------+------------------+
|        AZ|     2732350.0|         2761125.0|
|        SC|     1385325.0|         1549875.0|
|        LA|     1442725.0|          904850.0|
|        MN|     1642100.0|         2179400.0|
|        NJ|     4472550.0|         4470675.0|
|        DC|      252850.0|          171400.0|
|        OR|     1375500.0|         1742500.0|
|        VA|     3724325.0|         4232675.0|
|        RI|      510675.0|          488400.0|
|        WY|      363600.0|          205075.0|
|        KY|      871800.0|         1207050.0|
|        NH|      480175.0|          788725.0|
|        MI|     2970650.0|         3393675.0|
|        NV|     2077775.0|         2109600.0|
|        WI|     1622475.0|         1823100.0|
|        ID|      271900.0|          256475.0|
|        CA|     1.66511E7|       1.8556975E7|
|        CT|     1624875.0|         2336375.0|
|        NE| 

In [None]:
df_filtered = df.select('addr_state', 'verification_status', 'loan_amount')

df_filtered = df_filtered.withColumn('verified_status',
    when(col('verification_status') == 'Verified', 'Verified')
    .when(col('verification_status') == 'Not Verified', 'Not Verified')
    .otherwise('Other')
)

df_filtered = df_filtered.filter(col('verified_status') != 'Other')

df_grouped = df_filtered.groupBy('addr_state', 'verified_status') \
    .agg(sum('loan_amount').alias('total_loan_amount')) \
    .orderBy('addr_state')

df_grouped.show()


+----------+---------------+-----------------+
|addr_state|verified_status|total_loan_amount|
+----------+---------------+-----------------+
|        AK|   Not Verified|         377800.0|
|        AK|       Verified|         447375.0|
|        AL|   Not Verified|        1529775.0|
|        AL|       Verified|        1464600.0|
|        AR|   Not Verified|         961100.0|
|        AR|       Verified|         799400.0|
|        AZ|   Not Verified|        2761125.0|
|        AZ|       Verified|        2732350.0|
|        CA|   Not Verified|      1.8556975E7|
|        CA|       Verified|        1.66511E7|
|        CO|       Verified|        2388425.0|
|        CO|   Not Verified|        2852300.0|
|        CT|   Not Verified|        2336375.0|
|        CT|       Verified|        1624875.0|
|        DC|       Verified|         252850.0|
|        DC|   Not Verified|         171400.0|
|        DE|       Verified|         345325.0|
|        DE|   Not Verified|         475500.0|
|        FL| 

4. Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase

In [122]:
query = """
SELECT
    grade,
    AVG(DATEDIFF(issue_date, prev_issue_date_same_grade)) AS avg_time_gap_days
FROM
    fintech
WHERE
    prev_issue_date_same_grade IS NOT NULL
GROUP BY
    grade
"""
result_df = spark.sql(query)
result_df.show()

+-----+------------------+
|grade| avg_time_gap_days|
+-----+------------------+
|    1|2.3186528497409324|
|    2|2.2664389410760033|
|    3|2.4171220400728597|
|    4| 2.228215767634855|
|    5|2.2800687285223367|
|    6|1.6016898008449003|
|    7|1.7222578576010263|
|    8|1.7067524115755628|
|    9|1.7503259452411994|
|   10|1.6656327543424319|
|   11|1.7884097035040432|
|   12|1.6958466453674121|
|   13|1.7769688947716744|
|   14| 1.855563234277816|
|   15| 1.789615643964936|
|   16| 3.325814536340852|
|   17|  3.39386189258312|
|   18| 3.437900128040973|
|   19| 3.446753246753247|
|   20|3.3354037267080745|
+-----+------------------+
only showing top 20 rows



In [123]:
from pyspark.sql.functions import col, datediff, avg

df_filtered = df.filter(col('prev_issue_date_same_grade').isNotNull())

df_with_gap = df_filtered.withColumn(
    'time_gap_days',
    datediff(col('issue_date'), col('prev_issue_date_same_grade'))
)

result = df_with_gap.groupBy('grade').agg(
    avg('time_gap_days').alias('avg_time_gap_days')
)

result.show()


+-----+------------------+
|grade| avg_time_gap_days|
+-----+------------------+
|    1|2.3186528497409324|
|    2|2.2664389410760033|
|    3|2.4171220400728597|
|    4| 2.228215767634855|
|    5|2.2800687285223367|
|    6|1.6016898008449003|
|    7|1.7222578576010263|
|    8|1.7067524115755628|
|    9|1.7503259452411994|
|   10|1.6656327543424319|
|   11|1.7884097035040432|
|   12|1.6958466453674121|
|   13|1.7769688947716744|
|   14| 1.855563234277816|
|   15| 1.789615643964936|
|   16| 3.325814536340852|
|   17|  3.39386189258312|
|   18| 3.437900128040973|
|   19| 3.446753246753247|
|   20|3.3354037267080745|
+-----+------------------+
only showing top 20 rows



5. Identify the average difference in loan amounts between consecutive loans
within the same state and grade combination.

In [124]:
query = """
SELECT
    state,
    grade,
    AVG(loan_amount - prev_loan_amount_same_state_grade) AS avg_diff_in_loan_amounts
FROM
    fintech
WHERE
    prev_loan_amount_same_state_grade IS NOT NULL
GROUP BY
    state,
    grade
"""
result_df = spark.sql(query)
result_df.show()

+-----+-----+------------------------+
|state|grade|avg_diff_in_loan_amounts|
+-----+-----+------------------------+
|   AK|    1|                 -5500.0|
|   AK|    2|      -4383.333333333333|
|   AK|    3|                 26000.0|
|   AK|    4|                  8400.0|
|   AK|    5|                  2387.5|
|   AK|    6|                  1250.0|
|   AK|    7|                  7000.0|
|   AK|    8|                  1937.5|
|   AK|    9|                     0.0|
|   AK|   10|                     0.0|
|   AK|   11|                  -875.0|
|   AK|   12|                 17000.0|
|   AK|   14|     -3466.6666666666665|
|   AK|   15|      -5333.333333333333|
|   AK|   16|                  9475.0|
|   AK|   17|                  2300.0|
|   AK|   19|                   275.0|
|   AK|   20|                 -4100.0|
|   AL|    1|     -1333.3333333333333|
|   AL|    2|      2555.5555555555557|
+-----+-----+------------------------+
only showing top 20 rows



In [125]:
df_filtered = df.filter(col('prev_loan_amount_same_state_grade').isNotNull())

df_with_diff = df_filtered.withColumn(
    'loan_amount_diff',
    col('loan_amount') - col('prev_loan_amount_same_state_grade')
)

result = df_with_diff.groupBy('state', 'grade').agg(
    avg('loan_amount_diff').alias('avg_loan_amount_diff')
)

result.show()


+-----+-----+--------------------+
|state|grade|avg_loan_amount_diff|
+-----+-----+--------------------+
|   AK|    1|             -5500.0|
|   AK|    2|  -4383.333333333333|
|   AK|    3|             26000.0|
|   AK|    4|              8400.0|
|   AK|    5|              2387.5|
|   AK|    6|              1250.0|
|   AK|    7|              7000.0|
|   AK|    8|              1937.5|
|   AK|    9|                 0.0|
|   AK|   10|                 0.0|
|   AK|   11|              -875.0|
|   AK|   12|             17000.0|
|   AK|   14| -3466.6666666666665|
|   AK|   15|  -5333.333333333333|
|   AK|   16|              9475.0|
|   AK|   17|              2300.0|
|   AK|   19|               275.0|
|   AK|   20|             -4100.0|
|   AL|    1| -1333.3333333333333|
|   AL|    2|  2555.5555555555557|
+-----+-----+--------------------+
only showing top 20 rows



### Part 6: Lookup Table & Saving the dataset


In [162]:
df = df.repartition(1)
lookup = lookup.repartition(1)

df.write.parquet("./fintech_spark_52_0812_clean.parquet", mode="overwrite")
lookup.write.parquet("./lookup_spark_52_0812.parquet", mode="overwrite")

In [127]:
from sqlalchemy import create_engine

engine = create_engine('postgresql://root:root@localhost:5432/testdb')

def save_to_db(cleaned, name):
    if(engine.connect()):
        print('Connected to Database')
        try:
            cleaned = cleaned.toPandas()
            print('Writing dataset to database')
            cleaned.to_sql(name, con=engine, if_exists='fail')
            print('Done writing to database')
        except ValueError as vx:
            print('Table already exists.')
        except Exception as ex:
            print(ex)
    else:
        print('Failed to connect to Database')

save_to_db(df,'cleaned_fintech')
save_to_db(lookup,'lookup_data')

Connected to Database
Writing dataset to database
Table already exists.
Connected to Database
Writing dataset to database
Table already exists.


In [128]:
sc.stop()