In [29]:
import pandas as pd
from pyspark.sql import SparkSession
import mysql.connector

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("CreditCardApprovalPipeline") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Load Application Data from GCS
gcs_bucket = "dataproc-staging-us-central1-458263062208-tw36mmqt"
gcs_data_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/loan_prediction_data/application_record.csv"

application_df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_data_path)

# Load Credit Record Data from MySQL using Pandas
mysql_config = {
    "host": "34.46.229.160",
    "user": "root",
    "password": "b)p[/&GH-o|B]6u+",
    "database": "loan_data",
    "table": "credit_record_final"
}

mysql_conn = mysql.connector.connect(
    host=mysql_config["host"],
    user=mysql_config["user"],
    password=mysql_config["password"],
    database=mysql_config["database"]
)

query = f"SELECT * FROM {mysql_config['table']}"
pandas_df = pd.read_sql(query, mysql_conn)
mysql_conn.close()

credit_df = spark.createDataFrame(pandas_df)

# Data Exploration
application_df.printSchema()
credit_df.printSchema()

application_df.show(5)
credit_df.show(5)

print(f"Application Records: {application_df.count()}")
print(f"Credit Records: {credit_df.count()}")

application_df.describe().show()
credit_df.describe().show()


25/03/21 18:03:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- FLAG_WORK_PHONE: integer (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- FLAG_EMAIL: integer (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)

root
 |-- ID: string (nullable = true)
 |-- MONTHS_BALANCE: long (nullable = true)
 |-- STATUS: string (nullable = true)

+-------+-----------+------------+---------------+--

25/03/21 18:03:56 WARN TaskSetManager: Stage 21 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.


+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|            -3|     0|
|5001711|            -2|     0|
|5001711|            -1|     0|
|5001711|             0|     X|
|5001712|           -18|     0|
+-------+--------------+------+
only showing top 5 rows



25/03/21 18:03:58 WARN TaskSetManager: Stage 25 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.


Application Records: 438557


                                                                                

Credit Records: 1048575


                                                                                

+-------+-----------------+-----------+------------+---------------+-------------------+------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------------+----------+------------------+-------------------+-------------------+--------------------+-----------------+
|summary|               ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|       CNT_CHILDREN|  AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|         DAYS_BIRTH|    DAYS_EMPLOYED|FLAG_MOBIL|   FLAG_WORK_PHONE|         FLAG_PHONE|         FLAG_EMAIL|     OCCUPATION_TYPE|  CNT_FAM_MEMBERS|
+-------+-----------------+-----------+------------+---------------+-------------------+------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------------+----------+------------------+-------------------+-------------------+--------------------+-----------------

25/03/21 18:04:40 WARN TaskSetManager: Stage 31 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.
[Stage 31:>                                                         (0 + 2) / 2]

+-------+-----------------+-------------------+-------------------+
|summary|               ID|     MONTHS_BALANCE|             STATUS|
+-------+-----------------+-------------------+-------------------+
|  count|          1048575|            1048575|            1048575|
|   mean|5068286.424673486|-19.136998307226474|0.05824863961501482|
| stddev|46150.57850528666| 14.023497688326547| 0.3949878642052498|
|    min|          5001711|                -60|                  0|
|    max|          5150487|                  0|                  X|
+-------+-----------------+-------------------+-------------------+



                                                                                

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, trim, upper, lower, round, datediff, current_date
import pandas as pd
import mysql.connector

# ==============================
# 1. SETUP SPARK SESSION
# ==============================

spark = SparkSession.builder \
    .appName("CreditCardApproval_Cleaning") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# ==============================
# 2. LOAD DATA FROM GCS & MYSQL
# ==============================

# GCS Bucket Path
gcs_bucket = "dataproc-staging-us-central1-458263062208-tw36mmqt"
gcs_data_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/loan_prediction_data/application_record.csv"

# Read CSV from GCS
application_df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_data_path)

# Load Credit Record Data from MySQL
mysql_config = {
    "host": "34.46.229.160",
    "user": "root",
    "password": "b)p[/&GH-o|B]6u+",
    "database": "loan_data",
    "table": "credit_record_final"
}

mysql_conn = mysql.connector.connect(
    host=mysql_config["host"],
    user=mysql_config["user"],
    password=mysql_config["password"],
    database=mysql_config["database"]
)

query = f"SELECT * FROM {mysql_config['table']}"
pandas_df = pd.read_sql(query, mysql_conn)
mysql_conn.close()

credit_df = spark.createDataFrame(pandas_df)

# ==============================
# 3. HANDLING MISSING & INCONSISTENT DATA
# ==============================

# Drop completely empty rows
application_df = application_df.dropna(how="all")
credit_df = credit_df.dropna(how="all")

# Fill missing values
application_df = application_df.fillna({"OCCUPATION_TYPE": "Unknown"})
credit_df = credit_df.fillna({"STATUS": "X"})

# Handling incorrect data (negative days should be converted to positive)
application_df = application_df.withColumn("DAYS_BIRTH", col("DAYS_BIRTH") * -1)
application_df = application_df.withColumn("DAYS_EMPLOYED", when(col("DAYS_EMPLOYED") < 0, col("DAYS_EMPLOYED") * -1).otherwise(col("DAYS_EMPLOYED")))

# ==============================
# 4. STANDARDIZING DATA FORMATS
# ==============================

# Trim whitespace
application_df = application_df.select([trim(col(c)).alias(c) for c in application_df.columns])

# Standardize categorical values
application_df = application_df.withColumn("CODE_GENDER", upper(col("CODE_GENDER")))
application_df = application_df.withColumn("NAME_FAMILY_STATUS", lower(col("NAME_FAMILY_STATUS")))

# Convert amount fields to float & round to 2 decimals
application_df = application_df.withColumn("AMT_INCOME_TOTAL", round(col("AMT_INCOME_TOTAL"), 2))

# ==============================
# 5. CREATING NEW FEATURES
# ==============================

# Age in years
application_df = application_df.withColumn("AGE", round(col("DAYS_BIRTH") / 365, 0))

# Employment length in years
application_df = application_df.withColumn("EMPLOYMENT_YEARS", round(col("DAYS_EMPLOYED") / 365, 0))

# Customer's credit history length

# ==============================
# 6. SAVING CLEANED DATA
# ==============================

cleaned_gcs_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/cleaned_data/"
application_df.write.mode("overwrite").parquet(cleaned_gcs_path + "application_record_cleaned.parquet")
credit_df.write.mode("overwrite").parquet(cleaned_gcs_path + "credit_record_cleaned.parquet")

# ==============================
# 7. DATA EXPLORATION POST CLEANING
# ==============================

application_df.printSchema()
credit_df.printSchema()

application_df.show(5)
credit_df.show(5)

print(f"Cleaned Application Records: {application_df.count()}")
print(f"Cleaned Credit Records: {credit_df.count()}")

application_df.describe().show()
credit_df.describe().show()


25/03/21 18:09:41 WARN TaskSetManager: Stage 39 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

root
 |-- ID: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: string (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- DAYS_BIRTH: string (nullable = true)
 |-- DAYS_EMPLOYED: string (nullable = true)
 |-- FLAG_MOBIL: string (nullable = true)
 |-- FLAG_WORK_PHONE: string (nullable = true)
 |-- FLAG_PHONE: string (nullable = true)
 |-- FLAG_EMAIL: string (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = false)
 |-- CNT_FAM_MEMBERS: string (nullable = true)
 |-- AGE: double (nullable = true)
 |-- EMPLOYMENT_YEARS: double (nullable = true)

root
 |-- ID: string (nullable = true)
 |-- MONTHS_BALANCE: long (nullable = true)
 |-- STATUS: st

25/03/21 18:09:44 WARN TaskSetManager: Stage 41 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.


+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|            -3|     0|
|5001711|            -2|     0|
|5001711|            -1|     0|
|5001711|             0|     X|
|5001712|           -18|     0|
+-------+--------------+------+
only showing top 5 rows



25/03/21 18:09:45 WARN TaskSetManager: Stage 45 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.


Cleaned Application Records: 438557


                                                                                

Cleaned Credit Records: 1048575


25/03/21 18:10:29 WARN TaskSetManager: Stage 51 contains a task of very large size (9759 KiB). The maximum recommended task size is 1000 KiB.


+-------+-----------------+-----------+------------+---------------+-------------------+------------------+--------------------+--------------------+------------------+-----------------+------------------+------------------+----------+------------------+-------------------+-------------------+--------------------+-----------------+------------------+------------------+
|summary|               ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|       CNT_CHILDREN|  AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|        DAYS_BIRTH|     DAYS_EMPLOYED|FLAG_MOBIL|   FLAG_WORK_PHONE|         FLAG_PHONE|         FLAG_EMAIL|     OCCUPATION_TYPE|  CNT_FAM_MEMBERS|               AGE|  EMPLOYMENT_YEARS|
+-------+-----------------+-----------+------------+---------------+-------------------+------------------+--------------------+--------------------+------------------+-----------------+------------------+------------------+----------+------------------+--

[Stage 51:>                                                         (0 + 2) / 2]

+-------+-----------------+-------------------+-------------------+
|summary|               ID|     MONTHS_BALANCE|             STATUS|
+-------+-----------------+-------------------+-------------------+
|  count|          1048575|            1048575|            1048575|
|   mean|5068286.424673486|-19.136998307226474|0.05824863961501482|
| stddev|46150.57850528666| 14.023497688326547| 0.3949878642052498|
|    min|          5001711|                -60|                  0|
|    max|          5150487|                  0|                  X|
+-------+-----------------+-------------------+-------------------+



                                                                                

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ==============================
# 1. SETUP SPARK SESSION
# ==============================

spark = SparkSession.builder \
    .appName("CreditCardApproval_Integration_Modeling") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# ==============================
# 2. LOAD CLEANED DATA FROM GCS
# ==============================

gcs_bucket = "dataproc-staging-us-central1-458263062208-tw36mmqt"
application_cleaned_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/cleaned_data/application_record_cleaned.parquet"
credit_cleaned_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/cleaned_data/credit_record_cleaned.parquet"

# Read cleaned data from GCS
application_df = spark.read.parquet(application_cleaned_path)
credit_df = spark.read.parquet(credit_cleaned_path)

# ==============================
# 3. DATA INTEGRATION
# ==============================

# Join on 'ID'
integrated_df = application_df.join(credit_df, on='ID', how='inner')

# ==============================
# 4. BASIC MACHINE LEARNING MODEL
# ==============================

# Define label (Creating a target variable if not present)
if 'TARGET' not in integrated_df.columns:
    integrated_df = integrated_df.withColumn('TARGET', when(col('STATUS') == '2', 1).otherwise(0))

# Select a few numerical columns for ML model
feature_columns = ['AMT_INCOME_TOTAL', 'AGE']

# Assemble features into a single column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
integrated_df = assembler.transform(integrated_df)

# Split data into training and testing sets
train_df, test_df = integrated_df.randomSplit([0.8, 0.2], seed=42)

# Train a basic Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='TARGET')
lr_model = lr.fit(train_df)

# Make predictions
predictions = lr_model.transform(test_df)

# Evaluate using AUC
evaluator = BinaryClassificationEvaluator(labelCol='TARGET', metricName='areaUnderROC')
roc_auc = evaluator.evaluate(predictions)

print(f"ROC-AUC: {roc_auc:.4f}")

# ==============================
# 5. SAVE INTEGRATED DATA
# ==============================

integrated_data_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/integrated_data/"
integrated_df.write.mode("overwrite").parquet(integrated_data_path + "integrated_data.parquet")


                                                                                

ROC-AUC: 0.5221


                                                                                

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import mysql.connector

# ==============================
# 1. SETUP SPARK SESSION
# ==============================

spark = SparkSession.builder \
    .appName("CreditCardApproval_Optimization_Serving") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# ==============================
# 2. LOAD INTEGRATED DATA
# ==============================

gcs_bucket = "dataproc-staging-us-central1-458263062208-tw36mmqt"
integrated_data_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/integrated_data/integrated_data.parquet"

# Read integrated data
df = spark.read.parquet(integrated_data_path)

# ==============================
# 3. PERFORMANCE OPTIMIZATION
# ==============================

# Repartition for better parallel processing
df = df.repartition(10, "ID")

# Cache the dataset for faster reuse
df.cache()

# Enable Predicate Pushdown for optimized filtering
df = df.filter(col("AMT_INCOME_TOTAL") > 10000)

# ==============================
# 4. DATA SERVING (WRITE TO GCS AND MYSQL)
# ==============================

# Save final data to GCS in Parquet format
final_gcs_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/final_data/"
df.write.mode("overwrite").parquet(final_gcs_path + "final_data.parquet")

# Save data to MySQL
mysql_config = {
    "host": "34.46.229.160",
    "user": "root",
    "password": "b)p[/&GH-o|B]6u+",
    "database": "loan_data",
    "table": "final_credit_data"
}

# Convert Spark DataFrame to Pandas
pandas_df = df.toPandas()
pandas_df = pandas_df[:10]

# Connect to MySQL
mysql_conn = mysql.connector.connect(
    host=mysql_config["host"],
    user=mysql_config["user"],
    password=mysql_config["password"],
    database=mysql_config["database"]
)

cursor = mysql_conn.cursor()

# Create table if not exists
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {mysql_config['table']} (
    ID VARCHAR(50),
    AMT_INCOME_TOTAL FLOAT,
    AGE INT,
    CNT_FAM_MEMBERS INT,
    TARGET INT
);
"""
cursor.execute(create_table_query)

# Insert data
for _, row in pandas_df.iterrows():
    cursor.execute(f"""
        INSERT INTO {mysql_config['table']} (ID, AMT_INCOME_TOTAL, AGE, CNT_FAM_MEMBERS, TARGET)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE TARGET=VALUES(TARGET);
    """, (row["ID"], row["AMT_INCOME_TOTAL"], row["AGE"], row["CNT_FAM_MEMBERS"], row["TARGET"]))

mysql_conn.commit()
cursor.close()
mysql_conn.close()

print("✅ Data Optimization & Serving Completed.")


25/03/21 18:20:13 WARN CacheManager: Asked to cache already cached data.
                                                                                

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# ==============================
# 1. SETUP SPARK SESSION
# ==============================

spark = SparkSession.builder \
    .appName("CreditCardApproval_Optimization_Serving") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# ==============================
# 2. LOAD INTEGRATED DATA
# ==============================

gcs_bucket = "dataproc-staging-us-central1-458263062208-tw36mmqt"
integrated_data_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/integrated_data/integrated_data.parquet"

# Read integrated data
df = spark.read.parquet(integrated_data_path)

# ==============================
# 3. PERFORMANCE OPTIMIZATION
# ==============================

# Repartition for better parallel processing
df = df.repartition(10, "ID")

# Cache the dataset for faster reuse
df.cache()

# Enable Predicate Pushdown for optimized filtering
df = df.filter(col("AMT_INCOME_TOTAL") > 10000)

# ==============================
# 4. SAVE FINAL DATA TO GCS
# ==============================

final_gcs_path = f"gs://{gcs_bucket}/notebooks/jupyter/Data/final_data/"
df.write.mode("overwrite").parquet(final_gcs_path + "final_data.parquet")

print("✅ Data Optimization & Serving Completed. Final data saved to GCS.")


25/03/21 18:29:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

✅ Data Optimization & Serving Completed. Final data saved to GCS.
