In [None]:
pip install pyspark



In [None]:
# Importing required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, lag, avg, expr, desc
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.types import IntegerType
import os


In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Milestone3") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

In [None]:
from google.colab import files
uploaded = files.upload()

# Get the path of the uploaded file
dataset_path = list(uploaded.keys())[0]
print(f"Dataset uploaded: {dataset_path}")

Saving fintech_data_32_52_22725.parquet to fintech_data_32_52_22725.parquet
Dataset uploaded: fintech_data_32_52_22725.parquet


In [None]:
# 4.1 Loading the Dataset
df = spark.read.parquet(dataset_path)
print(f"Initial number of partitions: {df.rdd.getNumPartitions()}")
df = df.repartition(4)  # Adjust number of partitions to logical cores (saw from the Task Manager)
print(f"Updated number of partitions: {df.rdd.getNumPartitions()}")
df.show(20)

Initial number of partitions: 1
Updated number of partitions: 4
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+-----------------

In [None]:
# 4.2 Cleaning the Dataset

# Rename columns
df = df.select([col(c).alias(c.replace(" ", "_").lower()) for c in df.columns])

df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceDEyXHgwYk00X...|Medical support a.

In [None]:
# Detect missing values
def detect_missing(df):
    total_rows = df.count()
    missing_info = {col: (df.filter(df[col].isNull()).count() / total_rows) * 100 for col in df.columns}
    return missing_info

In [None]:
missing_info = detect_missing(df)
print("Missing Information:")
for col, missing_pct in missing_info.items():
    print(f"Column: {col}, Missing Percentage: {missing_pct:.2f}%")


Missing Information:
Column: customer_id, Missing Percentage: 0.00%
Column: emp_title, Missing Percentage: 8.82%
Column: emp_length, Missing Percentage: 6.99%
Column: home_ownership, Missing Percentage: 0.00%
Column: annual_inc, Missing Percentage: 0.00%
Column: annual_inc_joint, Missing Percentage: 92.94%
Column: verification_status, Missing Percentage: 0.00%
Column: zip_code, Missing Percentage: 0.00%
Column: addr_state, Missing Percentage: 0.00%
Column: avg_cur_bal, Missing Percentage: 0.00%
Column: tot_cur_bal, Missing Percentage: 0.00%
Column: loan_id, Missing Percentage: 0.00%
Column: loan_status, Missing Percentage: 0.00%
Column: loan_amount, Missing Percentage: 0.00%
Column: state, Missing Percentage: 0.00%
Column: funded_amount, Missing Percentage: 0.00%
Column: term, Missing Percentage: 0.00%
Column: int_rate, Missing Percentage: 4.61%
Column: grade, Missing Percentage: 0.00%
Column: issue_date, Missing Percentage: 0.00%
Column: pymnt_plan, Missing Percentage: 0.00%
Column: t

In [None]:
# Handling the missing values
for column, dtype in df.dtypes:
    if dtype == "string":
        # Get the mode value, with a fallback for empty columns
        mode_value_row = df.groupBy(column).count().orderBy("count", ascending=False).first()

        # If mode_value_row is None (empty column), assign mode as an empty string
        if mode_value_row is not None:
            mode_value = mode_value_row[0]
        else:
            mode_value = ""  # Fallback in case the column has only missing values

        # Ensure mode_value is not None before calling fillna
        if mode_value is not None:
            df = df.fillna({column: mode_value})
        else:
            print(f"Warning: Column '{column}' has no mode value. Skipping fillna.")
    else:
        # For numeric columns, fill missing values with 0
        df = df.fillna({column: 0})





In [None]:
from pyspark.sql.functions import col  # Make sure to import col explicitly

# emp title
mode_title = df.groupBy('emp_title').count().orderBy(col('count').desc()).limit(2) # Use the imported col function
mode_title.show()
mode_title = mode_title.select('emp_title').collect()[1][0]
print(mode_title)
df = df.fillna(value = mode_title,subset=['emp_title'])
df.filter(df.emp_title.isNull()).count()

+---------+-----+
|emp_title|count|
+---------+-----+
|     NULL| 2383|
|  Teacher|  467|
+---------+-----+

Teacher


0

In [None]:
# Verify missing values are handled
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+---------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----+--------+-----+----------+----------+----+-------+-----------+
|customer_id|emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|term|int_rate|grade|issue_date|pymnt_plan|type|purpose|description|
+-----------+---------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----+--------+-----+----------+----------+----+-------+-----------+
|          0|        0|         0|             0|         0|               0|                  0|       0|         0|          0|          0|      0|          0|          0|    0|            0|   0|       0|    0|    

In [None]:
def one_hot_encoding(df,col_name):
    # Get the unique categories in the column
    unique_values = df.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()

    # Create a new column for each unique value in the column
    for value in unique_values:
        new_col_name = f"{col_name}_{value}"
        df = df.withColumn(new_col_name, F.when(F.col(col_name) == value, 1).otherwise(0))

    #df = df.drop(col_name)
    return df

In [None]:
from pyspark.ml.feature import StringIndexer

def label_encoding(df, col_name):
    # Create a StringIndexer
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_encoded")

    # Fit the indexer to the data and transform the dataframe
    indexer_model = indexer.fit(df)
    df_encoded = indexer_model.transform(df)

    # Convert the encoded column to an integer type to avoid float encoding
    df_encoded = df_encoded.withColumn(f"{col_name}_encoded", df_encoded[f"{col_name}_encoded"].cast("int"))

    # Drop the original column if you don't need it anymore
    # df_encoded = df_encoded.drop(col_name)

    # Extract the mapping from StringIndexerModel
    mapping = indexer_model.labels  # This will give the categories in the order they are encoded

    # Create the mapping table as a DataFrame
    mapping_table = [(label, idx) for idx, label in enumerate(mapping)]
    mapping_df = spark.createDataFrame(mapping_table, [f"{col_name}_category", f"{col_name}_encoded_value"])

    return df_encoded, mapping_df


    # Create a OneHotEncoder
    # encoder = OneHotEncoder(inputCols=[f"{col_name}_encoded"], outputCols=[f"{col_name}_onehot"])
    # df = indexer.fit(df).transform(df)


In [None]:
# Use the label encoding
df_encoded, mapping_df1 = label_encoding(df, "emp_length")
df_encoded, mapping_df2 = label_encoding(df_encoded, "state")
df_encoded, mapping_df3 = label_encoding(df_encoded, "purpose")

# Combine the mapping DataFrames into one
mapping_df = mapping_df1.union(mapping_df2).union(mapping_df3)

# Rename the columns in the final combined DataFrame
mapping_df = mapping_df.withColumnRenamed("home_ownership_category", "original_value") \
                                     .withColumnRenamed("home_ownership_encoded_value", "encoded_value")

# Show the final combined mapping DataFrame
mapping_df.show()


+-------------------+------------------------+
|emp_length_category|emp_length_encoded_value|
+-------------------+------------------------+
|          10+ years|                       0|
|           < 1 year|                       1|
|            2 years|                       2|
|            3 years|                       3|
|             1 year|                       4|
|            5 years|                       5|
|            4 years|                       6|
|            6 years|                       7|
|            8 years|                       8|
|            7 years|                       9|
|            9 years|                      10|
|                 CA|                       0|
|                 TX|                       1|
|                 NY|                       2|
|                 FL|                       3|
|                 IL|                       4|
|                 NJ|                       5|
|                 OH|                       6|
|            

In [None]:
df_encoded.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------+---------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|state_encoded|purpose_encoded|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+-----

In [None]:
# Use the one hot encoding
df_encoded = one_hot_encoding(df_encoded, "home_ownership")
df_encoded = one_hot_encoding(df_encoded, "verification_status")
df_encoded = one_hot_encoding(df_encoded, "term")

In [None]:
df_encoded.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------+---------------+------------------+-----------------------+-------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+---------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|state_encoded|purpose_encoded|home_ownership_OWN|home_ownership_MORTGAGE|home_own

In [None]:
# Discretize Grade to only letters
df_encoded = df_encoded.withColumn("grade", expr("substring(grade, 1, 1)"))

In [None]:
df_encoded.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------+---------------+------------------+-----------------------+-------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+---------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|state_encoded|purpose_encoded|home_ownership_OWN|home_ownership_MORTGAGE|home_own

In [None]:
# 4.4 Feature Engineering

# Define windows for calculations
window_grade = Window.partitionBy("grade").orderBy("issue_date")
window_state_grade = Window.partitionBy("addr_state", "grade").orderBy("issue_date")

# Add features
df_encoded = df_encoded.withColumn("prev_loan_date_same_grade", lag("issue_date", 1).over(window_grade))
df_encoded = df_encoded.withColumn("prev_loan_amount_same_grade", lag("loan_amount", 1).over(window_grade))
df_encoded = df_encoded.withColumn("prev_loan_date_same_state_grade", lag("issue_date", 1).over(window_state_grade))
df_encoded = df_encoded.withColumn("prev_loan_amount_same_state_grade", lag("loan_amount", 1).over(window_state_grade))

df_encoded.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+------------------+-------------+---------------+------------------+-----------------------+-------------------+------------------+----------------------------+-----------------------------------+--------------------------------+---------------+---------------+-------------------------+---------------------------+-------------------------------+---------------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|   

In [None]:
# To convert data columns from string to date format
from pyspark.sql.functions import to_date
df_encoded = df_encoded.withColumn("issue_date", to_date("issue_date", "d MMMM yyyy")) \
                                   .withColumn("prev_loan_date_same_grade", to_date("prev_loan_date_same_grade", "d MMMM yyyy"))\
                                   .withColumn("prev_loan_date_same_state_grade", to_date("prev_loan_date_same_state_grade", "d MMMM yyyy"))

In [None]:
# Create a temporary SQL view
df_encoded.createOrReplaceTempView("loan_data")

### 4.5.1 Average Loan Amount and Interest Rate for "Default" Loans by Emp Length and Income Ranges
# SQL Query
query1 = """
SELECT emp_length,
       CASE
           WHEN annual_inc < 40000 THEN 'Low'
           WHEN annual_inc BETWEEN 40000 AND 80000 THEN 'Medium'
           ELSE 'High'
       END AS income_range,
       AVG(loan_amount) AS avg_loan_amount,
       AVG(int_rate) AS avg_interest_rate
FROM loan_data
WHERE loan_status = 'Current'
GROUP BY emp_length, income_range
"""
sql_result1 = spark.sql(query1)
sql_result1.show()



+----------+------------+------------------+-------------------+
|emp_length|income_range|   avg_loan_amount|  avg_interest_rate|
+----------+------------+------------------+-------------------+
|  < 1 year|        High| 21292.85714285714|0.11409457504520797|
|  < 1 year|      Medium|14542.793367346938|0.12136454081632658|
|   3 years|        High|20623.275862068964|0.11100000000000003|
|   7 years|      Medium|14080.877483443708| 0.1286278145695364|
|   6 years|        High|22403.212851405624|0.11505020080321282|
|   9 years|      Medium|13750.086505190311| 0.1239719723183391|
|   5 years|      Medium|13371.512770137524|0.13101886051080552|
|   6 years|      Medium|14383.701657458563|0.12888314917127078|
|    1 year|        High| 21542.50700280112|0.11958935574229691|
| 10+ years|      Medium| 14449.71368294153|0.12425780590717299|
|   5 years|        High|19831.936813186814|0.11540082417582417|
|   5 years|         Low|10416.313559322034|0.13538050847457628|
|   9 years|         Low|

In [None]:
mode = df_encoded.groupBy('loan_status').count().orderBy(col('count').desc())
mode.show()

+------------------+-----+
|       loan_status|count|
+------------------+-----+
|           Current|16977|
|        Fully Paid| 7692|
|       Charged Off| 1758|
|Late (31-120 days)|  340|
|   In Grace Period|  168|
| Late (16-30 days)|   95|
+------------------+-----+



In [None]:
# PySpark Equivalent
df_spark_result1 = df_encoded.filter(F.col("loan_status") == "Current") \
    .withColumn("income_range", F.when(F.col("annual_inc") < 40000, "Low")
                .when((F.col("annual_inc") >= 40000) & (F.col("annual_inc") <= 80000), "Medium")
                .otherwise("High")) \
    .groupBy("emp_length", "income_range") \
    .agg(F.avg("loan_amount").alias("avg_loan_amount"),
         F.avg("int_rate").alias("avg_interest_rate"))
df_spark_result1.show()

+----------+------------+------------------+-------------------+
|emp_length|income_range|   avg_loan_amount|  avg_interest_rate|
+----------+------------+------------------+-------------------+
|  < 1 year|        High| 21292.85714285714|0.11409457504520797|
|  < 1 year|      Medium|14542.793367346938|0.12136454081632658|
|   3 years|        High|20623.275862068964|0.11100000000000003|
|   7 years|      Medium|14080.877483443708| 0.1286278145695364|
|   6 years|        High|22403.212851405624|0.11505020080321282|
|   9 years|      Medium|13750.086505190311| 0.1239719723183391|
|   5 years|      Medium|13371.512770137524|0.13101886051080552|
|   6 years|      Medium|14383.701657458563|0.12888314917127078|
|    1 year|        High| 21542.50700280112|0.11958935574229691|
| 10+ years|      Medium| 14449.71368294153|0.12425780590717299|
|   5 years|        High|19831.936813186814|0.11540082417582417|
|   5 years|         Low|10416.313559322034|0.13538050847457628|
|   9 years|         Low|

In [None]:
### 4.5.2 Average Difference Between Loan Amount and Funded Amount for Each Grade
# SQL Query
query2 = """
SELECT grade,
       AVG(loan_amount - funded_amount) AS avg_difference
FROM loan_data
GROUP BY grade
ORDER BY avg_difference DESC
"""
sql_result2 = spark.sql(query2)
sql_result2.show()

# PySpark Equivalent
df_spark_result2 = df.groupBy("grade") \
    .agg(F.avg(F.col("loan_amount") - F.col("funded_amount")).alias("avg_difference")) \
    .orderBy(F.col("avg_difference").desc())
df_spark_result2.show()

+-----+--------------+
|grade|avg_difference|
+-----+--------------+
|    1|           0.0|
|    2|           0.0|
|    5|           0.0|
|    8|           0.0|
|    4|           0.0|
|    3|           0.0|
|    9|           0.0|
|    7|           0.0|
|    6|           0.0|
+-----+--------------+

+-----+--------------+
|grade|avg_difference|
+-----+--------------+
|    8|           0.0|
|    4|           0.0|
|   23|           0.0|
|   15|           0.0|
|   25|           0.0|
|   19|           0.0|
|   12|           0.0|
|    2|           0.0|
|   29|           0.0|
|    5|           0.0|
|   30|           0.0|
|   26|           0.0|
|   34|           0.0|
|   28|           0.0|
|   33|           0.0|
|   31|           0.0|
|   11|           0.0|
|    9|           0.0|
|    1|           0.0|
|    6|           0.0|
+-----+--------------+
only showing top 20 rows



In [None]:
### 4.5.3 Total Loan Amount for Verified vs Not Verified Across Each State
# SQL Query
query3 = """
SELECT addr_state,
       verification_status,
       SUM(loan_amount) AS total_loan_amount
FROM loan_data
GROUP BY addr_state, verification_status
"""
sql_result3 = spark.sql(query3)
sql_result3.show()

# PySpark Equivalent
df_spark_result3 = df.groupBy("addr_state", "verification_status") \
    .agg(F.sum("loan_amount").alias("total_loan_amount"))
df_spark_result3.show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        FL|           Verified|        8182775.0|
|        KS|    Source Verified|        1478100.0|
|        NY|    Source Verified|      1.2939025E7|
|        FL|       Not Verified|        9621875.0|
|        WI|           Verified|         956575.0|
|        IL|    Source Verified|        6437925.0|
|        MI|    Source Verified|        3715650.0|
|        AZ|    Source Verified|        4280700.0|
|        MS|    Source Verified|         880550.0|
|        AR|    Source Verified|        1058400.0|
|        VT|    Source Verified|         326725.0|
|        PA|           Verified|        3855500.0|
|        UT|    Source Verified|        1038500.0|
|        IN|           Verified|        1879200.0|
|        VA|           Verified|        3593675.0|
|        MD|       Not Verified|        2961800.0|
|        IN|       Not Verified

In [None]:
### 4.5.4 Average Time Gap (in Days) Between Consecutive Loans for Each Grade
# SQL Query
query4 = """
SELECT grade,
       AVG(DATEDIFF(issue_date, prev_loan_date_same_grade)) AS avg_time_gap
FROM loan_data
WHERE prev_loan_date_same_grade IS NOT NULL
GROUP BY grade
"""
sql_result4 = spark.sql(query4)
sql_result4.show()

# PySpark Equivalent
df_spark_result4 = df_encoded.filter(F.col("prev_loan_date_same_grade").isNotNull()) \
    .groupBy("grade") \
    .agg(F.avg(F.datediff("issue_date", "prev_loan_date_same_grade")).alias("avg_time_gap"))
df_spark_result4.show()

+-----+-------------------+
|grade|       avg_time_gap|
+-----+-------------------+
|    1|0.19343773303504846|
|    2| 0.7195561719833564|
|    3| 1.7758620689655173|
|    4|  2.201246660730187|
|    5| 2.1367380560131797|
|    6|  1.628374136848713|
|    7| 1.6800518134715026|
|    8| 1.5775366943203575|
|    9| 1.5715193897012079|
+-----+-------------------+

+-----+-------------------+
|grade|       avg_time_gap|
+-----+-------------------+
|    1|0.19343773303504846|
|    2| 0.7195561719833564|
|    3| 1.7758620689655173|
|    4|  2.201246660730187|
|    5| 2.1367380560131797|
|    6|  1.628374136848713|
|    7| 1.6800518134715026|
|    8| 1.5775366943203575|
|    9| 1.5715193897012079|
+-----+-------------------+



In [None]:
### 4.5.5 Average Difference in Loan Amounts Between Consecutive Loans Within Same State and Grade
# SQL Query
query5 = """
SELECT addr_state, grade,
       AVG(loan_amount - prev_loan_amount_same_state_grade) AS avg_amount_difference
FROM loan_data
WHERE prev_loan_amount_same_state_grade IS NOT NULL
GROUP BY addr_state, grade
"""
sql_result5 = spark.sql(query5)
sql_result5.show()

# PySpark Equivalent
df_spark_result5 = df_encoded.filter(F.col("prev_loan_amount_same_state_grade").isNotNull()) \
    .groupBy("addr_state", "grade") \
    .agg(F.avg(F.col("loan_amount") - F.col("prev_loan_amount_same_state_grade")).alias("avg_amount_difference"))
df_spark_result5.show()

+----------+-----+---------------------+
|addr_state|grade|avg_amount_difference|
+----------+-----+---------------------+
|        AK|    1|                625.0|
|        AK|    2|              -1600.0|
|        AK|    3|               5762.5|
|        AK|    4|               6000.0|
|        AK|    5|               5000.0|
|        AK|    6|               -630.0|
|        AK|    8|               5760.0|
|        AK|    9|              -1050.0|
|        AL|    1|   51.470588235294116|
|        AL|    2|   -42.72727272727273|
|        AL|    3|   194.11764705882354|
|        AL|    4|   1120.8333333333333|
|        AL|    5|   -266.6666666666667|
|        AL|    6|    917.6470588235294|
|        AL|    7|    388.8888888888889|
|        AL|    8|   41.666666666666664|
|        AL|    9|               -562.5|
|        AR|    1|   -136.4795918367347|
|        AR|    2|   1294.1176470588234|
|        AR|    3|   -333.3333333333333|
+----------+-----+---------------------+
only showing top

In [None]:
# Here before saving i should remove the columns of the one hot encoding and label encoding
# the 6 columns.
columns_to_drop = ["emp_length", "state", "home_ownership", "verification_status", "type", "purpose"]
for col_name in columns_to_drop:
    if col_name in df_encoded.columns:
       df_encoded = df_encoded.drop(col_name)
       print(f"Column '{col_name}' dropped successfully.")
    else:
       print(f"Column '{col_name}' not found in the DataFrame.")


Column 'emp_length' dropped successfully.
Column 'state' dropped successfully.
Column 'home_ownership' dropped successfully.
Column 'verification_status' dropped successfully.
Column 'type' dropped successfully.
Column 'purpose' dropped successfully.


In [None]:
# 4.6 Lookup Table and Saving
# from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# Save the mapping _df
mapping_df.write.parquet("/content/drive/MyDrive/DEW24 Youssef Hamed 52-22725 P2 Networks/Milestone 3/lookup_spark_52_22725.parquet")

# Save cleaned dataset
df_encoded.write.parquet("/content/drive/MyDrive/DEW24 Youssef Hamed 52-22725 P2 Networks/Milestone 3/fintech_spark_52_22725_clean.parquet")