In [40]:
# Environment bootstrap for a Google Colab runtime: installs Java, downloads
# Spark and the PostgreSQL JDBC driver, mounts Google Drive, uploads the
# database config file, and starts a SparkSession.
import os
# Find the latest version of Spark 3.x at http://www.apache.org/dist/spark/ and enter it as the Spark version.
# For example:
# spark_version = 'spark-3.5.5'
spark_version = 'spark-3.5.5'
# Exported so the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java, fetch the PostgreSQL JDBC driver and the Spark distribution, and install findspark.
# The jar download uses -nc, so it is skipped when the file is already present.
# NOTE(review): the downloads land in the current working directory, while the
# paths used further down are under /content — assumes the cwd is /content; confirm.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -nc https://jdbc.postgresql.org/download/postgresql-42.7.3.jar
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Point JAVA_HOME and SPARK_HOME at the Java install and the unpacked Spark directory.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Imports needed to locate Spark (findspark) and to parse the config file (json).
import findspark
import json

# Mount Google Drive (the input CSV is read from it later) and prompt for a file upload.
# db_config.json, which a later cell opens, is expected to be uploaded here.
from google.colab import drive,files
drive.mount('/content/drive')
uploaded = files.upload()

# NOTE(review): pyspark is imported before findspark.init() runs, so these
# imports presumably resolve to an already-installed pyspark rather than the
# copy under SPARK_HOME — confirm this is intended.
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, row_number
from pyspark.sql.window import Window

findspark.init()

# Create the SparkSession with the PostgreSQL JDBC driver jar on the classpath.
spark = SparkSession.builder \
    .appName("ExportToAWSPostgres") \
    .config("spark.jars", "/content/postgresql-42.7.3.jar") \
    .config("spark.driver.extraClassPath", "/content/postgresql-42.7.3.jar") \
    .getOrCreate()



0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:2 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
File ‘postgresql-42.7.3.jar’ already there;

Saving db_config.json to db_config.json


In [None]:
# For privacy reasons the database connection details are not written in this notebook.
# They are kept in db_config.json, which is uploaded at the start of every run and
# removed from the Colab runtime once the work is finished, so that sensitive
# information is never exposed in the code.
# Read the connection settings from the uploaded file.
with open("db_config.json", "r") as config_file:
    db_config = json.load(config_file)


In [24]:
# psycopg2 is the PostgreSQL database adapter for Python.
import psycopg2

# Open the connection with the settings read from db_config.json;
# each listed key is passed to connect() as the keyword argument of the same name.
connection_keys = ("host", "port", "database", "user", "password")
conn = psycopg2.connect(**{key: db_config[key] for key in connection_keys})

# Cursor reused by the SQL cells further down.
cursor = conn.cursor()
print("Connected!")


Connected!


In [25]:
# Location of the loan subset CSV on the mounted Google Drive
file_path = '/content/drive/MyDrive/Project4-Loan_Analysis/data/df_subset.csv'

# Read the CSV into a Spark DataFrame: the first row is the header and column types are inferred
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(file_path)

# Preview the data
df.show()

+---------+--------+-----------+-----------+-----------+--------------+--------+----------+---------------+---------+-----------------+--------------+-----------+-----+-----------------+----------+--------------+------------------+-------------------------+-------------+-----------------------+--------------------+-------------------------+-------------------------------+-----------------+-----------------------+-------------------------+-----------------------------+
|member_id|all_util|avg_cur_bal|delinq_amnt|funded_amnt|home_ownership|int_rate|      term|tot_hi_cred_lim|total_acc|total_bal_ex_mort|total_bc_limit|loan_status|grade|merged_annual_inc|merged_dti|merged_purpose|emp_length_grouped|clean_verification_status|delinq_bucket|acc_now_delinq_bucketed|delinq_2yrs_bucketed|accounts_90d_past_due_24m|total_chargeoffs_within_12_mths|open_acc_bucketed|pct_tl_nvr_dlq_bucketed|percent_bc_gt_75_bucketed|pub_rec_bankruptcies_bucketed|
+---------+--------+-----------+-----------+----------

In [26]:
# Print the column names and the types Spark inferred while reading the CSV.
df.printSchema()


root
 |-- member_id: integer (nullable = true)
 |-- all_util: double (nullable = true)
 |-- avg_cur_bal: double (nullable = true)
 |-- delinq_amnt: double (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- term: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- total_acc: double (nullable = true)
 |-- total_bal_ex_mort: double (nullable = true)
 |-- total_bc_limit: double (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- merged_annual_inc: double (nullable = true)
 |-- merged_dti: double (nullable = true)
 |-- merged_purpose: string (nullable = true)
 |-- emp_length_grouped: string (nullable = true)
 |-- clean_verification_status: string (nullable = true)
 |-- delinq_bucket: string (nullable = true)
 |-- acc_now_delinq_bucketed: string (nullable = true)
 |-- delinq_2yrs_bucketed: integer (nullable = true)
 

In [27]:
# Give every row a sequential, 1-based loan_id of the form "loan<N>".
#
# The window used to be ordered by the constant lit(1), which leaves the row
# order undefined: Spark is free to number the rows differently each time the
# lazy plan is re-evaluated, so one loan_id could end up attached to different
# rows in the tables derived from df. Ordering by monotonically_increasing_id()
# gives the window a real sort key based on each row's partition and position.
from pyspark.sql.functions import monotonically_increasing_id

# NOTE(review): a window without partitionBy still moves all rows into a single
# partition; revisit if the dataset grows.
windowSpec = Window.orderBy(monotonically_increasing_id())

# Add row number starting from 1
df = df.withColumn("row_num", row_number().over(windowSpec))

# Create loan_id by concatenating 'loan' with the row number
df = df.withColumn("loan_id", concat(lit("loan"), df["row_num"].cast("string")))

# Drop the helper column; only loan_id is kept
df = df.drop("row_num")

# Show result
df.show()


+---------+--------+-----------+-----------+-----------+--------------+--------+----------+---------------+---------+-----------------+--------------+-----------+-----+-----------------+----------+--------------+------------------+-------------------------+-------------+-----------------------+--------------------+-------------------------+-------------------------------+-----------------+-----------------------+-------------------------+-----------------------------+-------+
|member_id|all_util|avg_cur_bal|delinq_amnt|funded_amnt|home_ownership|int_rate|      term|tot_hi_cred_lim|total_acc|total_bal_ex_mort|total_bc_limit|loan_status|grade|merged_annual_inc|merged_dti|merged_purpose|emp_length_grouped|clean_verification_status|delinq_bucket|acc_now_delinq_bucketed|delinq_2yrs_bucketed|accounts_90d_past_due_24m|total_chargeoffs_within_12_mths|open_acc_bucketed|pct_tl_nvr_dlq_bucketed|percent_bc_gt_75_bucketed|pub_rec_bankruptcies_bucketed|loan_id|
+---------+--------+-----------+------

In [None]:
# Print the schema again now that the loan_id column has been added.
df.printSchema()

root
 |-- member_id: integer (nullable = true)
 |-- all_util: double (nullable = true)
 |-- avg_cur_bal: double (nullable = true)
 |-- delinq_amnt: double (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- term: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- total_acc: double (nullable = true)
 |-- total_bal_ex_mort: double (nullable = true)
 |-- total_bc_limit: double (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- merged_annual_inc: double (nullable = true)
 |-- merged_dti: double (nullable = true)
 |-- merged_purpose: string (nullable = true)
 |-- emp_length_grouped: string (nullable = true)
 |-- clean_verification_status: string (nullable = true)
 |-- delinq_bucket: string (nullable = true)
 |-- acc_now_delinq_bucketed: string (nullable = true)
 |-- delinq_2yrs_bucketed: integer (nullable = true)
 

In [28]:
# Build the loan_info DataFrame from the columns that describe the loan itself
loan_info_columns = [
    "loan_id",      # Primary Key
    "member_id",    # Foreign key
    "funded_amnt",
    "term",
    "int_rate",
    "grade",
    "loan_status",
]
loan_info = df.select(*loan_info_columns)
loan_info.show()


+-------+---------+-----------+----------+--------+-----+-----------+
|loan_id|member_id|funded_amnt|      term|int_rate|grade|loan_status|
+-------+---------+-----------+----------+--------+-----+-----------+
|  loan1|        1|       2500| 36 months|   13.56|    C|    Current|
|  loan2|        2|      30000| 60 months|   18.94|    D|    Current|
|  loan3|        3|       5000| 36 months|   17.97|    D|    Current|
|  loan4|        4|       4000| 36 months|   18.94|    D|    Current|
|  loan5|        5|      30000| 60 months|   16.14|    C|    Current|
|  loan6|        6|       5550| 36 months|   15.02|    C|    Current|
|  loan7|        7|       2000| 36 months|   17.97|    D|    Current|
|  loan8|        8|       6000| 36 months|   13.56|    C|    Current|
|  loan9|        9|       5000| 36 months|   17.97|    D|    Current|
| loan10|       10|       6000| 36 months|   14.47|    C|    Current|
| loan11|       11|       5500| 36 months|   22.35|    D|    Current|
| loan12|       12| 

In [29]:
# Build the borrower_info DataFrame from the borrower attributes
borrower_columns = [
    "member_id",    # Primary key
    "home_ownership",
    "merged_annual_inc",
    "merged_dti",
    "emp_length_grouped",
    "clean_verification_status",
    "merged_purpose",
]
borrower_rows = df.select(*borrower_columns)

# Keep one row per member_id
borrower_info = borrower_rows.dropDuplicates(["member_id"])

borrower_info.show()


+---------+--------------+-----------------+----------+------------------+-------------------------+-----------------+
|member_id|home_ownership|merged_annual_inc|merged_dti|emp_length_grouped|clean_verification_status|   merged_purpose|
+---------+--------------+-----------------+----------+------------------+-------------------------+-----------------+
|        1|          RENT|          55000.0|     18.24|         10+ years|             Not Verified|     Debt-related|
|        3|      MORTGAGE|          59280.0|     10.51|         4-6 years|                 Verified|     Debt-related|
|        5|      MORTGAGE|          57250.0|     26.35|         10+ years|             Not Verified|     Debt-related|
|        6|      MORTGAGE|         152500.0|     37.94|         10+ years|             Not Verified|     Debt-related|
|        9|      MORTGAGE|          53580.0|     21.16|         10+ years|                 Verified|     Debt-related|
|       12|      MORTGAGE|          70000.0|    

In [30]:
# Build the credit_history DataFrame from the credit utilisation and balance columns
credit_history_columns = [
    "loan_id",      # Foreign key
    "all_util",
    "avg_cur_bal",
    "tot_hi_cred_lim",
    "total_acc",
    "total_bal_ex_mort",
    "total_bc_limit",
]
credit_history = df.select(*credit_history_columns)
credit_history.show()

+-------+--------+-----------+---------------+---------+-----------------+--------------+
|loan_id|all_util|avg_cur_bal|tot_hi_cred_lim|total_acc|total_bal_ex_mort|total_bc_limit|
+-------+--------+-----------+---------------+---------+-----------------+--------------+
|  loan1|    28.0|     1878.0|        60124.0|     34.0|          16901.0|       36500.0|
|  loan2|    57.0|    24763.0|       372872.0|     44.0|          99468.0|       15000.0|
|  loan3|    35.0|    18383.0|       136927.0|     13.0|          11749.0|       13800.0|
|  loan4|    70.0|    30505.0|       385183.0|     13.0|          36151.0|        5000.0|
|  loan5|    54.0|     9667.0|       157548.0|     26.0|          29674.0|        9300.0|
|  loan6|    58.0|    40338.0|       831687.0|     44.0|         185378.0|       65900.0|
|  loan7|   100.0|      854.0|          854.0|      9.0|            854.0|           0.0|
|  loan8|    74.0|     5085.0|       117242.0|     37.0|          91535.0|       33100.0|
|  loan9| 

In [31]:
# Build the delinquency_info DataFrame, keyed by loan_id
delinquency_columns = [
    "loan_id",
    "delinq_amnt",
    "delinq_bucket",
    "acc_now_delinq_bucketed",
    "delinq_2yrs_bucketed",
    "accounts_90d_past_due_24m",
    "total_chargeoffs_within_12_mths",
    "pub_rec_bankruptcies_bucketed",
    "percent_bc_gt_75_bucketed",
    "pct_tl_nvr_dlq_bucketed",
    "open_acc_bucketed",
]
delinquency_info = df.select(*delinquency_columns)
delinquency_info.show()

+-------+-----------+-------------+-----------------------+--------------------+-------------------------+-------------------------------+-----------------------------+-------------------------+-----------------------+-----------------+
|loan_id|delinq_amnt|delinq_bucket|acc_now_delinq_bucketed|delinq_2yrs_bucketed|accounts_90d_past_due_24m|total_chargeoffs_within_12_mths|pub_rec_bankruptcies_bucketed|percent_bc_gt_75_bucketed|pct_tl_nvr_dlq_bucketed|open_acc_bucketed|
+-------+-----------+-------------+-----------------------+--------------------+-------------------------+-------------------------------+-----------------------------+-------------------------+-----------------------+-----------------+
|  loan1|        0.0|      Unknown|       No Delinquencies|                   0|                        0|                              0|                            1|                        0|                  100.0|             6-10|
|  loan2|        0.0|   61+ months|       No Delinqu

In [32]:
# Print the column names and types of the delinquency_info DataFrame.
delinquency_info.printSchema()

root
 |-- loan_id: string (nullable = false)
 |-- delinq_amnt: double (nullable = true)
 |-- delinq_bucket: string (nullable = true)
 |-- acc_now_delinq_bucketed: string (nullable = true)
 |-- delinq_2yrs_bucketed: integer (nullable = true)
 |-- accounts_90d_past_due_24m: integer (nullable = true)
 |-- total_chargeoffs_within_12_mths: integer (nullable = true)
 |-- pub_rec_bankruptcies_bucketed: integer (nullable = true)
 |-- percent_bc_gt_75_bucketed: string (nullable = true)
 |-- pct_tl_nvr_dlq_bucketed: string (nullable = true)
 |-- open_acc_bucketed: string (nullable = true)



In [None]:
# Compare the number of distinct loan_id values with the total row count;
# the two numbers are equal when every loan_id is unique.
distinct_loan_ids = loan_info.select("loan_id").distinct().count()
total_loans = loan_info.count()
distinct_loan_ids, total_loans

(2260668, 2260668)

In [33]:
# JDBC configuration for PySpark to connect to AWS PostgreSQL (RDS).
#
# The endpoint and credentials are taken from db_config (loaded from the
# uploaded db_config.json) instead of being written into the notebook, the
# same way the psycopg2 connection is configured.
# NOTE(review): assumes db_config describes the same database and account the
# previously hard-coded values pointed at — confirm before re-enabling the
# JDBC writes.
url = f"jdbc:postgresql://{db_config['host']}:{db_config['port']}/{db_config['database']}"

# str() guards against non-string values in the JSON file.
properties = {
    "user": str(db_config["user"]),
    "password": str(db_config["password"]),
    "driver": "org.postgresql.Driver"
}

In [None]:
# schema_sql = """
# CREATE TABLE IF NOT EXISTS borrower_info (
#     member_id INT PRIMARY KEY,
#     home_ownership VARCHAR(50),
#     merged_annual_inc DOUBLE PRECISION,
#     merged_dti DOUBLE PRECISION,
#     emp_length_grouped VARCHAR(50),
#     clean_verification_status VARCHAR(50),
#     merged_purpose VARCHAR(100)
# );

# CREATE TABLE IF NOT EXISTS loan_info (
#     loan_id VARCHAR(50) PRIMARY KEY,
#     member_id INT REFERENCES borrower_info(member_id),
#     funded_amnt INT,
#     term VARCHAR(20),
#     int_rate DOUBLE PRECISION,
#     grade VARCHAR(5),
#     loan_status VARCHAR(50)
# );

# CREATE TABLE IF NOT EXISTS credit_history (
#     loan_id VARCHAR(50) PRIMARY KEY REFERENCES loan_info(loan_id),
#     all_util DOUBLE PRECISION,
#     avg_cur_bal DOUBLE PRECISION,
#     tot_hi_cred_lim DOUBLE PRECISION,
#     total_acc DOUBLE PRECISION,
#     total_bal_ex_mort DOUBLE PRECISION,
#     total_bc_limit DOUBLE PRECISION
# );

# CREATE TABLE IF NOT EXISTS delinquency_info (
#     loan_id VARCHAR(50) PRIMARY KEY REFERENCES loan_info(loan_id),
#     delinq_amnt DOUBLE PRECISION,
#     delinq_bucket VARCHAR(50),
#     acc_now_delinq_bucketed VARCHAR(50),
#     delinq_2yrs_bucketed INT,
#     accounts_90d_past_due_24m INT,
#     total_chargeoffs_within_12_mths INT,
#     pub_rec_bankruptcies_bucketed INT,
#     pct_tl_nvr_dlq_bucketed VARCHAR(50),
#     percent_bc_gt_75_bucketed VARCHAR(50),
#     open_acc_bucketed VARCHAR(50)
# );
# """

# cursor.execute(schema_sql)
# conn.commit()
# # closed at bottom
# # closed at bottom


In [34]:
# Ask the PostgreSQL catalog for every table in the public schema
cursor.execute("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public';
""")

# Each fetched row carries a single column: the table name
tables = cursor.fetchall()
for (table_name,) in tables:
    print(table_name)


loan_info
borrower_info
credit_history
delinquency_info


In [None]:
# # Use append to avoid table-exists error
# borrower_info.repartition(1).write.jdbc(
#     url=url,
#     table="borrower_info",
#     mode="append",
#     properties=properties
# )


In [None]:
# loan_info.repartition(1).write.jdbc(
#                     url=url,
#                     table="loan_info",
#                     mode="append",
#                     properties=properties)

In [None]:
# credit_history.repartition(1).write.jdbc(
#                     url=url,
#                     table="credit_history",
#                     mode="append",
#                     properties=properties)

In [None]:
# delinquency_info.repartition(1).write.jdbc(
#                     url=url,
#                     table="delinquency_info",
#                     mode="append",
#                     properties=properties)

In [35]:
# Sanity check: count the rows currently stored in the delinquency_info table.
# The printed label names the table that is actually queried (it used to say
# borrower_info, which did not match the SQL).
cursor.execute("SELECT COUNT(*) FROM delinquency_info;")
count = cursor.fetchone()[0]
print(f"Rows in delinquency_info: {count}")

Rows in borrower_info: 2260668


In [36]:
# Close database connection.
# `except Exception` (rather than a bare `except:`) lets KeyboardInterrupt and
# SystemExit propagate, and the caught error is printed instead of being hidden.
try:
    try:
        cursor.close()
    finally:
        # Always attempt to close the connection, even if closing the cursor failed.
        conn.close()
    print("PostgreSQL connection closed.")
except Exception as exc:
    print("PostgreSQL connection was already closed or not established.")
    print(f"  reason: {exc!r}")

# Stop Spark session
try:
    spark.stop()
    print("Spark session stopped.")
except Exception as exc:
    print("ℹSpark session was already stopped or not initialized.")
    print(f"  reason: {exc!r}")


PostgreSQL connection closed.
Spark session stopped.


In [42]:
import os

# Remove the uploaded credentials file so it does not linger in the Colab runtime.
# A missing file (for example when this cell is run a second time) is not an
# error, so FileNotFoundError is handled instead of aborting the cell.
try:
    os.remove("db_config.json")
    print("db_config.json removed.")
except FileNotFoundError:
    print("db_config.json was already removed.")