# Data Selection and Joining

In this section, we perform data selection and joining operations on the ingested datasets to form the gold layer. We select the relevant columns from the cleaned (silver) tables, aggregate repayment history per loan, and join everything into a consolidated view of the credit scoring data. This step prepares the data for further analysis and modeling.

In [0]:
# Table locations: tables are addressed as "{CATALOG}.{<layer>_SCHEMA}.<table>",
# with one schema per data layer.
CATALOG = "workspace"
BRONZE_SCHEMA, SILVER_SCHEMA, GOLD_SCHEMA = "bronze", "silver", "gold"

In [0]:
# Ensure the gold schema exists before any gold tables are written to it.
gold_schema_ddl = f"""
    CREATE SCHEMA IF NOT EXISTS {CATALOG}.{GOLD_SCHEMA}
    """
display(spark.sql(gold_schema_ddl))


# Select and Join Required Columns for the Gold Schema

In [0]:
# Aggregate repayment (pembayaran) data per loan

from pyspark.sql.functions import col, avg, sum as _sum, count, when
from pyspark.sql import functions as F


repayments_df = spark.table(f"{CATALOG}.{SILVER_SCHEMA}.repayments")

repayments_df.limit(5).display()

# Reusable aggregate inputs:
# - on_time_flag: 1 for a row whose status_pembayaran is "on time", else 0.
# - payment_count: number of rows with a non-null payment_id in the loan group.
on_time_flag = F.when(F.col("status_pembayaran") == "on time", 1).otherwise(0)
payment_count = F.count("payment_id")

# One row per loan_id summarising that loan's repayment history.
# Ratio columns are NULL when their denominator is 0. This avoids a
# divide-by-zero error under ANSI mode and avoids a meaningless value.
gold_repayments_df = (
    repayments_df
    .groupBy("loan_id")
    .agg(
        # Total number of payments made (non-null payment_id values)
        payment_count.alias("total_payments"),

        # Count by payment status
        F.sum(on_time_flag).alias("count_tepat_waktu"),
        F.sum(F.when(F.col("status_pembayaran") == "late", 1).otherwise(0)).alias("count_terlambat"),
        F.sum(F.when(F.col("status_pembayaran") == "default", 1).otherwise(0)).alias("count_gagal_bayar"),

        # Payment on-time ratio (0.0 to 1.0); NULL when no payments were counted
        F.round(
            F.when(payment_count != 0, F.sum(on_time_flag) / payment_count),
            4,
        ).alias("rasio_tepat_waktu"),

        # Days past due statistics
        F.round(F.avg("hari_keterlambatan"), 2).alias("avg_hari_keterlambatan"),
        F.max("hari_keterlambatan").alias("max_hari_keterlambatan"),
        F.sum("hari_keterlambatan").alias("total_hari_keterlambatan"),

        # Total penalty fees paid
        F.sum("denda").alias("total_denda"),

        # Total amount paid vs total amount due
        F.sum("jumlah_dibayar").alias("total_dibayar"),
        F.sum("jumlah_angsuran").alias("total_angsuran"),
    )
    # Payment ratio: total paid / total due. When nothing was due (0), the
    # ratio is NULL. Dividing by 1 instead would put a raw amount
    # (total_dibayar) into a ratio column.
    .withColumn(
        "rasio_pembayaran",
        F.round(
            F.when(F.col("total_angsuran") != 0, F.col("total_dibayar") / F.col("total_angsuran")),
            4,
        ),
    )
)

display(gold_repayments_df.limit(5))

In [0]:
# Applicant attributes carried into the gold layer: a plain column projection
# of silver.applicants (no filtering or transformation).
applicant_columns = [
    "applicant_id",
    "usia",
    "pendidikan_terakhir",
    "jenis_pekerjaan",
    "lama_bekerja_tahun",
    "pendapatan_bulanan",
    "jumlah_tanggungan",
    "hutang_bulanan_existing",
    "status_pernikahan",
]
gold_applicants_df = spark.table(f"{CATALOG}.{SILVER_SCHEMA}.applicants").select(*applicant_columns)

gold_applicants_df.limit(3).display()

In [0]:
# Loan-level columns for the gold layer; flag_default is the target variable.
loan_columns = [
    "loan_id",
    "applicant_id",
    "jumlah_pinjaman",
    "tenor_bulan",
    "suku_bunga_persen",
    "angsuran_bulanan",
    "skor_kredit",
    "agunan",
    "nilai_agunan",
    "tujuan_pinjaman",
    "flag_default",
]

# Keep only rows whose flag_default is 0 or 1 (NULL and any other value are
# dropped). This is intended to select approved loans; presumably
# non-approved loans carry no 0/1 label. TODO confirm against silver.loans.
gold_loans_df = (
    spark.table(f"{CATALOG}.{SILVER_SCHEMA}.loans")
    .where(F.col("flag_default").isin(0, 1))
    .select(*loan_columns)
)

gold_loans_df.limit(3).display()

In [0]:
# Join all gold inputs into the final feature table.
# Both joins are left joins, so every filtered loan is kept. Loans without a
# matching applicant or without repayment rows get NULLs in those columns.
# NOTE(review): repayment aggregates (e.g. count_gagal_bayar) come from the
# same loan whose flag_default is the target. Confirm they don't leak the label.
loans_with_applicants_df = gold_loans_df.join(gold_applicants_df, on="applicant_id", how="left")
gold_final_features_df = loans_with_applicants_df.join(gold_repayments_df, on="loan_id", how="left")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 3. Feature Engineering

# COMMAND ----------

def _safe_ratio(numerator, denominator, scale=4):
    """Return ``round(numerator / denominator, scale)`` as a Column, or NULL when the denominator is 0 or NULL."""
    # F.when without .otherwise yields NULL when the condition is false or NULL,
    # and the division is only evaluated when the denominator is non-zero.
    return F.round(F.when(denominator != 0, numerator / denominator), scale)


# Derived affordability / collateral features. A zero income (or zero
# collateral value) yields NULL rather than substituting 1 as the divisor.
# Substituting 1 would make the "ratio" equal to the raw numerator amount.
gold_final_features_df = (
    gold_final_features_df
    .withColumn(
        "debt_to_income_ratio",
        _safe_ratio(F.col("angsuran_bulanan"), F.col("pendapatan_bulanan")),
    )
    .withColumn(
        "loan_to_collateral_ratio",
        _safe_ratio(F.col("jumlah_pinjaman"), F.col("nilai_agunan")),
    )
    .withColumn(
        "total_debt_to_income_ratio",
        _safe_ratio(
            F.col("angsuran_bulanan") + F.col("hutang_bulanan_existing"),
            F.col("pendapatan_bulanan"),
        ),
    )
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## 4. Preview & Save

# COMMAND ----------

# Preview the first 20 feature rows, ordered by loan_id.
feature_preview_df = gold_final_features_df.orderBy("loan_id").limit(20)
feature_preview_df.display()


In [0]:
# Persist every gold DataFrame into the gold schema, overwriting existing tables.
# Tables are written in this order: repayments, applicants, loans, final_features.
gold_tables = {
    "repayments": gold_repayments_df,
    "applicants": gold_applicants_df,
    "loans": gold_loans_df,
    "final_features": gold_final_features_df,
}
for table_name, table_df in gold_tables.items():
    table_df.write.mode("overwrite").saveAsTable(f"{CATALOG}.{GOLD_SCHEMA}.{table_name}")