In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report

In [0]:
# Source tables, read from the Hive metastore.
application_train = spark.table("hive_metastore.default.application_train")
credit_card_balance = spark.table("hive_metastore.default.credit_card_balance")
pos_cash_balance = spark.table("hive_metastore.default.pos_cash_balance")
installments_payments = spark.table("hive_metastore.default.installments_payments")

# Register each table under a "<name>_temp" view so the SQL cells can query it.
for view_name, source_df in (
    ("application_train_temp", application_train),
    ("installments_payments_temp", installments_payments),
    ("pos_cash_balance_temp", pos_cash_balance),
    ("credit_card_balance_temp", credit_card_balance),
):
    source_df.createOrReplaceTempView(view_name)
 

In [0]:
# Keep only the POS/cash balance rows whose SK_ID_CURR also exists in
# application_train, and tag every surviving row with a constant contract type.
pos_cash_matched = pos_cash_balance.alias("a").join(
    application_train.alias("b"), on="SK_ID_CURR", how="inner"
)
pos_cash_balance_reduced = (
    pos_cash_matched
    .withColumn("NAME_CONTRACT_TYPE", F.lit("Cash loans"))
    .select("a.*", "NAME_CONTRACT_TYPE")
)

In [0]:
# Prints True when the inner join above dropped at least one POS/cash row, else False.
print(pos_cash_balance_reduced.count() < pos_cash_balance.count())

In [0]:
# Keep only the credit-card balance rows whose SK_ID_CURR also exists in
# application_train, and tag every surviving row with a constant contract type.
credit_card_matched = credit_card_balance.alias("a").join(
    application_train.alias("b"), on="SK_ID_CURR", how="inner"
)
credit_card_balance_reduced = (
    credit_card_matched
    .withColumn("NAME_CONTRACT_TYPE", F.lit("Revolving loans"))
    .select("a.*", "NAME_CONTRACT_TYPE")
)

In [0]:
# Prints True when the inner join above dropped at least one credit-card row, else False.
print(credit_card_balance_reduced.count() < credit_card_balance.count())

In [0]:
# Build installments_payments_reduced:
#   1. keep installment rows whose SK_ID_CURR exists in application_train;
#   2. label each row 'Cash loans' when its (SK_ID_PREV, SK_ID_CURR) pair occurs in
#      the POS/cash balance view, otherwise 'Revolving loans' when it occurs in the
#      credit-card balance view, otherwise NULL;
#   3. drop exact duplicate rows (SELECT DISTINCT).
# The balance views are reduced to distinct key pairs before the LEFT JOINs. The
# label only depends on whether a pair exists, so joining the raw balance rows
# (which this notebook later aggregates per pair, i.e. several rows per pair) would
# multiply every installment row only for DISTINCT to collapse the copies again.
sql_query = """
SELECT DISTINCT
    a.*,
    CASE
        WHEN c.SK_ID_PREV IS NOT NULL AND c.SK_ID_CURR IS NOT NULL THEN 'Cash loans'
        WHEN d.SK_ID_PREV IS NOT NULL AND d.SK_ID_CURR IS NOT NULL THEN 'Revolving loans'
        ELSE NULL
    END AS NAME_CONTRACT_TYPE
FROM (
    SELECT a.*
    FROM installments_payments_temp a
    JOIN application_train_temp b
        ON a.SK_ID_CURR = b.SK_ID_CURR
) a
LEFT JOIN (
    SELECT DISTINCT SK_ID_PREV, SK_ID_CURR FROM pos_cash_balance_temp
) c
    ON a.SK_ID_PREV = c.SK_ID_PREV AND a.SK_ID_CURR = c.SK_ID_CURR
LEFT JOIN (
    SELECT DISTINCT SK_ID_PREV, SK_ID_CURR FROM credit_card_balance_temp
) d
    ON a.SK_ID_PREV = d.SK_ID_PREV AND a.SK_ID_CURR = d.SK_ID_CURR
"""
installments_payments_reduced = spark.sql(sql_query)

In [0]:
# Ad-hoc check: list SK_ID_CURR values that occur in both the POS/cash and the
# credit-card balance views, and print the first ten matches.
# NOTE(review): the join key is SK_ID_CURR only, so each POS/cash row is paired with
# every credit-card row of the same applicant, and both output columns are named
# SK_ID_CURR. `temp` is not referenced again in the visible part of the notebook.
sql_query="""
Select a.SK_ID_CURR, d.SK_ID_CURR from
    pos_cash_balance_temp a
JOIN 
    credit_card_balance_temp d 
ON 
     a.SK_ID_CURR = d.SK_ID_CURR
"""
temp=spark.sql(sql_query)
temp.show( 10)

In [0]:
# Compare row counts before and after the filtering/labelling query. Each count()
# is a full Spark job, so both are computed once and reused in the report below.
installments_total_rows = installments_payments.count()
installments_reduced_rows = installments_payments_reduced.count()
if installments_reduced_rows < installments_total_rows:
    print(True)
else:
    print(False, installments_total_rows, installments_reduced_rows)

In [0]:
# Preview ten rows of the filtered POS/cash balance data.
pos_cash_balance_reduced.show(10)
                                      

In [0]:
# Per-contract (SK_ID_CURR, SK_ID_PREV) summary of the filtered POS/cash balance rows.
_pos_status = F.col("NAME_CONTRACT_STATUS")
_pos_month = F.col("MONTHS_BALANCE")
_pos_rows = F.count("*")


def _pos_count_where(condition):
    """Number of rows in the group for which `condition` holds."""
    return F.sum(F.when(condition, 1).otherwise(0))


def _pos_months_where(condition):
    """Sum of MONTHS_BALANCE over the rows for which `condition` holds (0 for the rest)."""
    return F.sum(F.when(condition, _pos_month).otherwise(0))


pos_cash_aggregates = [
    # Snapshot-month statistics
    F.max("MONTHS_BALANCE").alias("amtf_months_balance_max"),
    F.min("MONTHS_BALANCE").alias("amtf_months_balance_min"),
    F.avg("MONTHS_BALANCE").alias("amtf_months_balance_avg"),
    F.stddev("MONTHS_BALANCE").alias("amtf_months_balance_stddev"),
    # Span between the first and last snapshot month, inclusive
    (F.max("MONTHS_BALANCE") - F.min("MONTHS_BALANCE") + 1).alias("amtf_loan_duration_months"),

    # Installment counts (total and still outstanding)
    F.max("CNT_INSTALMENT").alias("amtf_cnt_instalment_max"),
    F.min("CNT_INSTALMENT").alias("amtf_cnt_instalment_min"),
    F.avg("CNT_INSTALMENT").alias("amtf_cnt_instalment_avg"),
    F.stddev("CNT_INSTALMENT").alias("amtf_cnt_instalment_stddev"),
    F.min("CNT_INSTALMENT_FUTURE").alias("amtf_cnt_instalment_future_min"),
    F.avg("CNT_INSTALMENT_FUTURE").alias("amtf_cnt_instalment_future_avg"),
    F.max("CNT_INSTALMENT_FUTURE").alias("amtf_cnt_instalment_future_max"),
    F.stddev("CNT_INSTALMENT_FUTURE").alias("amtf_cnt_instalment_future_stddev"),
    (F.sum("CNT_INSTALMENT") - F.sum("CNT_INSTALMENT_FUTURE")).alias("amtf_instalments_completed"),
    (F.sum("CNT_INSTALMENT_FUTURE") / F.sum("CNT_INSTALMENT")).alias("amtf_future_installment_ratio"),

    # Days past due (SK_DPD) and its SK_DPD_DEF variant
    F.max("SK_DPD").alias("amtf_max_dpd"),
    F.avg("SK_DPD").alias("amtf_avg_dpd"),
    F.stddev("SK_DPD").alias("amtf_stddev_dpd"),
    _pos_count_where(F.col("SK_DPD") > 0).alias("amtf_overdue_count"),
    F.max("SK_DPD_DEF").alias("amtf_max_dpd_def"),
    F.avg("SK_DPD_DEF").alias("amtf_avg_dpd_def"),
    F.stddev("SK_DPD_DEF").alias("amtf_stddev_dpd_def"),
    _pos_count_where(F.col("SK_DPD_DEF") > 0).alias("amtf_deferral_count"),
    (_pos_count_where(F.col("SK_DPD_DEF") > 0) / _pos_rows).alias("amtf_deferral_proportion"),

    # Number of monthly rows per contract status
    _pos_count_where(_pos_status == "Active").alias("amtf_active_status_count"),
    _pos_count_where(_pos_status == "Completed").alias("amtf_completed_status_count"),
    _pos_count_where(_pos_status == "Approved").alias("amtf_approved_status_count"),
    _pos_count_where(_pos_status == "Cancelled").alias("amtf_cancelled_status_count"),
    _pos_count_where(_pos_status == "Demand").alias("amtf_demand_status_count"),

    # Share of monthly rows per status
    (_pos_count_where(_pos_status == "Active") / _pos_rows).alias("amtf_active_status_proportion"),
    (_pos_count_where(_pos_status == "Completed") / _pos_rows).alias("amtf_completed_status_proportion"),

    # Sum of MONTHS_BALANCE over the rows in each status
    _pos_months_where(_pos_status == "Active").alias("amtf_active_status_duration"),
    _pos_months_where(_pos_status == "Completed").alias("amtf_completed_status_duration"),
]

pos_cash_features = (
    pos_cash_balance_reduced
    .groupBy("SK_ID_CURR", "SK_ID_PREV")
    .agg(*pos_cash_aggregates)
)

# Re-attach NAME_CONTRACT_TYPE, which the aggregation above does not carry.
pos_cash_contract_keys = pos_cash_balance_reduced.select(
    "SK_ID_CURR", "SK_ID_PREV", "NAME_CONTRACT_TYPE"
).distinct()
pos_cash_features_final = pos_cash_contract_keys.join(
    pos_cash_features, on=["SK_ID_CURR", "SK_ID_PREV"], how="inner"
)
pos_cash_features_final.show(truncate=False)


In [0]:
# Number of distinct (SK_ID_CURR, SK_ID_PREV) pairs in the filtered POS/cash rows.
pos_cash_balance_reduced.select("SK_ID_CURR", "SK_ID_PREV").distinct().count()

In [0]:
# Row count of the per-contract POS/cash feature table, for comparison with the
# distinct-pair count of the filtered rows.
pos_cash_features_final.count()

In [0]:
# Preview the filtered credit-card balance rows (show() prints 20 rows by default).
credit_card_balance_reduced.show()

In [0]:

from pyspark.sql import functions as F

# Per-contract summary of the filtered credit-card balance rows, grouped by
# (SK_ID_CURR, SK_ID_PREV, NAME_CONTRACT_TYPE).
_cc_group_keys = ["SK_ID_CURR", "SK_ID_PREV", "NAME_CONTRACT_TYPE"]
_cc_status = F.col("NAME_CONTRACT_STATUS")
_cc_rows = F.count("*")


def _cc_count_where(condition):
    """Number of rows in the group for which `condition` holds."""
    return F.sum(F.when(condition, 1).otherwise(0))


credit_card_aggregates = [
    # Balance
    F.max("AMT_BALANCE").alias("amtf_amt_balance_max"),
    F.min("AMT_BALANCE").alias("amtf_amt_balance_min"),
    F.avg("AMT_BALANCE").alias("amtf_amt_balance_avg"),
    F.stddev("AMT_BALANCE").alias("amtf_amt_balance_stddev"),
    F.sum("AMT_BALANCE").alias("amtf_amt_balance_sum"),

    # Credit limit and utilisation (balance relative to limit)
    F.max("AMT_CREDIT_LIMIT_ACTUAL").alias("amtf_credit_limit_max"),
    F.min("AMT_CREDIT_LIMIT_ACTUAL").alias("amtf_credit_limit_min"),
    F.avg("AMT_CREDIT_LIMIT_ACTUAL").alias("amtf_credit_limit_avg"),
    (F.sum("AMT_BALANCE") / F.sum("AMT_CREDIT_LIMIT_ACTUAL")).alias("amtf_credit_utilization_ratio"),
    (F.max("AMT_BALANCE") / F.max("AMT_CREDIT_LIMIT_ACTUAL")).alias("amtf_credit_utilization_max"),

    # Payments
    F.max("AMT_PAYMENT_CURRENT").alias("amtf_amt_payment_current_max"),
    F.min("AMT_PAYMENT_CURRENT").alias("amtf_amt_payment_current_min"),
    F.avg("AMT_PAYMENT_CURRENT").alias("amtf_amt_payment_current_avg"),
    F.sum("AMT_PAYMENT_CURRENT").alias("amtf_amt_payment_current_sum"),
    F.max("AMT_PAYMENT_TOTAL_CURRENT").alias("amtf_amt_payment_total_current_max"),
    F.sum("AMT_PAYMENT_TOTAL_CURRENT").alias("amtf_amt_payment_total_current_sum"),

    # Receivables (the source column really is spelled AMT_RECIVABLE)
    F.max("AMT_RECEIVABLE_PRINCIPAL").alias("amtf_receivable_principal_max"),
    F.min("AMT_RECEIVABLE_PRINCIPAL").alias("amtf_receivable_principal_min"),
    F.avg("AMT_RECEIVABLE_PRINCIPAL").alias("amtf_receivable_principal_avg"),
    F.sum("AMT_RECIVABLE").alias("amtf_amt_receivable_sum"),
    F.avg("AMT_RECIVABLE").alias("amtf_amt_receivable_avg"),

    # Drawing amounts and their split by channel
    F.sum("AMT_DRAWINGS_ATM_CURRENT").alias("amtf_amt_drawings_atm_sum"),
    F.sum("AMT_DRAWINGS_POS_CURRENT").alias("amtf_amt_drawings_pos_sum"),
    F.sum("AMT_DRAWINGS_OTHER_CURRENT").alias("amtf_amt_drawings_other_sum"),
    F.sum("AMT_DRAWINGS_CURRENT").alias("amtf_amt_drawings_total_sum"),
    (F.sum("AMT_DRAWINGS_ATM_CURRENT") / F.sum("AMT_DRAWINGS_CURRENT")).alias("amtf_atm_to_total_drawings_ratio"),
    (F.sum("AMT_DRAWINGS_POS_CURRENT") / F.sum("AMT_DRAWINGS_CURRENT")).alias("amtf_pos_to_total_drawings_ratio"),

    # Drawing counts
    F.avg("CNT_DRAWINGS_ATM_CURRENT").alias("amtf_cnt_drawings_atm_avg"),
    F.avg("CNT_DRAWINGS_POS_CURRENT").alias("amtf_cnt_drawings_pos_avg"),
    F.avg("CNT_DRAWINGS_CURRENT").alias("amtf_cnt_drawings_total_avg"),

    # Days past due (SK_DPD) and its SK_DPD_DEF variant
    F.max("SK_DPD").alias("amtf_dpd_max"),
    F.avg("SK_DPD").alias("amtf_dpd_avg"),
    _cc_count_where(F.col("SK_DPD") > 0).alias("amtf_dpd_overdue_count"),
    (_cc_count_where(F.col("SK_DPD") > 0) / _cc_rows).alias("amtf_dpd_proportion"),
    F.max("SK_DPD_DEF").alias("amtf_dpd_def_max"),
    F.avg("SK_DPD_DEF").alias("amtf_dpd_def_avg"),

    # Monthly rows per contract status, as counts and as shares of all rows
    _cc_count_where(_cc_status == "Active").alias("amtf_active_contract_count"),
    _cc_count_where(_cc_status == "Completed").alias("amtf_completed_contract_count"),
    (_cc_count_where(_cc_status == "Active") / _cc_rows).alias("amtf_active_status_proportion"),
    (_cc_count_where(_cc_status == "Completed") / _cc_rows).alias("amtf_completed_status_proportion"),
]

credit_card_features = (
    credit_card_balance_reduced
    .groupBy(*_cc_group_keys)
    .agg(*credit_card_aggregates)
)

# Join the distinct contract keys back onto the aggregated features.
credit_card_contract_keys = credit_card_balance_reduced.select(*_cc_group_keys).distinct()
credit_card_features_final = credit_card_contract_keys.join(
    credit_card_features, on=_cc_group_keys, how="inner"
)

# Print the per-contract feature table without truncating cell contents.
credit_card_features_final.show(truncate=False)



In [0]:
# Number of distinct (SK_ID_CURR, SK_ID_PREV) pairs in the filtered credit-card rows.
credit_card_balance_reduced.select("SK_ID_CURR", "SK_ID_PREV").distinct().count()

In [0]:
# Row count of the per-contract credit-card feature table, for comparison with the
# distinct-pair count of the filtered rows.
credit_card_features_final.count()

In [0]:
# Preview the filtered, contract-type-labelled installment rows (20 rows by default).
installments_payments_reduced.show()

In [0]:
# Per-contract summary of the filtered installment rows, grouped by
# (SK_ID_CURR, SK_ID_PREV, NAME_CONTRACT_TYPE).
installments_features = installments_payments_reduced.groupBy("SK_ID_CURR", "SK_ID_PREV", "NAME_CONTRACT_TYPE").agg(
    # Amount-Financed Features
    F.sum("AMT_INSTALMENT").alias("amtf_amt_instalment_total"),                # Total amount due
    F.avg("AMT_INSTALMENT").alias("amtf_amt_instalment_avg"),                  # Average amount due
    F.max("AMT_INSTALMENT").alias("amtf_amt_instalment_max"),                  # Max amount due
    F.min("AMT_INSTALMENT").alias("amtf_amt_instalment_min"),                  # Min amount due
    F.sum("AMT_PAYMENT").alias("amtf_amt_payment_total"),                      # Total amount paid
    F.avg("AMT_PAYMENT").alias("amtf_amt_payment_avg"),                        # Average amount paid
    F.max("AMT_PAYMENT").alias("amtf_amt_payment_max"),                        # Max amount paid
    F.min("AMT_PAYMENT").alias("amtf_amt_payment_min"),                        # Min amount paid
    F.stddev("AMT_PAYMENT").alias("amtf_amt_payment_stddev"),                  # Stddev of payment amounts
    (F.sum("AMT_PAYMENT") / F.sum("AMT_INSTALMENT")).alias("amtf_payment_to_instalment_ratio"),  # Payment-to-installment ratio

    # Timing Features
    # NOTE(review): "late" and "early" are decided by the sign of DAYS_ENTRY_PAYMENT
    # alone. If that column is an offset from the application date (as the
    # DAYS_INSTALMENT column used below appears to be), lateness should instead
    # compare DAYS_ENTRY_PAYMENT with DAYS_INSTALMENT — confirm against the data
    # dictionary before relying on these features.
    F.sum(F.when(F.col("DAYS_ENTRY_PAYMENT") > 0, 1).otherwise(0)).alias("amtf_late_payment_count"),  # Count of late payments
    F.avg(F.when(F.col("DAYS_ENTRY_PAYMENT") > 0, F.col("DAYS_ENTRY_PAYMENT")).otherwise(None)).alias("amtf_avg_days_late"),  # Avg days late
    F.max(F.when(F.col("DAYS_ENTRY_PAYMENT") > 0, F.col("DAYS_ENTRY_PAYMENT")).otherwise(None)).alias("amtf_max_days_late"),  # Max days late
    F.min(F.when(F.col("DAYS_ENTRY_PAYMENT") > 0, F.col("DAYS_ENTRY_PAYMENT")).otherwise(None)).alias("amtf_min_days_late"),  # Min days late
    F.sum(F.when(F.col("DAYS_ENTRY_PAYMENT") < 0, 1).otherwise(0)).alias("amtf_early_payment_count"),  # Count of early payments
    F.avg(F.when(F.col("DAYS_ENTRY_PAYMENT") < 0, F.col("DAYS_ENTRY_PAYMENT")).otherwise(None)).alias("amtf_avg_days_early"),  # Avg days early
    F.max(F.when(F.col("DAYS_ENTRY_PAYMENT") < 0, F.col("DAYS_ENTRY_PAYMENT")).otherwise(None)).alias("amtf_max_days_early"),  # Max days early
    F.min(F.when(F.col("DAYS_ENTRY_PAYMENT") < 0, F.col("DAYS_ENTRY_PAYMENT")).otherwise(None)).alias("amtf_min_days_early"),  # Min days early
    F.stddev("DAYS_ENTRY_PAYMENT").alias("amtf_days_entry_payment_stddev"),    # Stddev of payment timing

    # Installment-Specific Features
    F.countDistinct("NUM_INSTALMENT_VERSION").alias("amtf_distinct_version_count"),  # Distinct installment plan versions
    F.countDistinct("NUM_INSTALMENT_NUMBER").alias("amtf_distinct_installment_count"),  # Distinct installment numbers
    F.sum(F.when(F.col("AMT_PAYMENT") >= F.col("AMT_INSTALMENT"), 1).otherwise(0)).alias("amtf_exact_or_over_payment_count"),  # Count of exact or overpayments
    F.sum(F.when(F.col("AMT_PAYMENT") < F.col("AMT_INSTALMENT"), 1).otherwise(0)).alias("amtf_underpayment_count"),  # Count of underpayments
    F.sum(F.when(F.col("AMT_PAYMENT") < F.col("AMT_INSTALMENT"), F.col("AMT_INSTALMENT") - F.col("AMT_PAYMENT")).otherwise(0)).alias("amtf_underpayment_amount_total"),  # Total underpayment amount

    # Days-Installment Features
    F.max("DAYS_INSTALMENT").alias("amtf_days_installment_max"),               # Largest DAYS_INSTALMENT value
    F.min("DAYS_INSTALMENT").alias("amtf_days_installment_min"),               # Smallest DAYS_INSTALMENT value
    F.avg("DAYS_INSTALMENT").alias("amtf_days_installment_avg"),               # Average DAYS_INSTALMENT value

    # Loan Type-Specific Aggregates
    # The value passed to F.when must be a Column: a bare string such as "AMT_PAYMENT"
    # is treated as a string literal, not as a column reference, so the payment
    # totals below would never contain real sums without F.col.
    F.sum(F.when(F.col("NAME_CONTRACT_TYPE") == "Cash loans", F.col("AMT_PAYMENT")).otherwise(0)).alias("amtf_cash_loans_total_payment"),  # Total payment for cash loans
    F.sum(F.when(F.col("NAME_CONTRACT_TYPE") == "Revolving loans", F.col("AMT_PAYMENT")).otherwise(0)).alias("amtf_revolving_loans_total_payment"),  # Total payment for revolving loans
    F.count(F.when(F.col("NAME_CONTRACT_TYPE") == "Cash loans", 1).otherwise(None)).alias("amtf_cash_loans_count"),  # Count for cash loans
    F.count(F.when(F.col("NAME_CONTRACT_TYPE") == "Revolving loans", 1).otherwise(None)).alias("amtf_revolving_loans_count")  # Count for revolving loans
)

# Late/early payment counts relative to the number of distinct installment numbers.
installments_features = installments_features.withColumn(
    "amtf_late_payment_proportion",
    F.col("amtf_late_payment_count") / F.col("amtf_distinct_installment_count")  # Proportion of late payments
).withColumn(
    "amtf_early_payment_proportion",
    F.col("amtf_early_payment_count") / F.col("amtf_distinct_installment_count")  # Proportion of early payments
)

# Final DataFrame: distinct contract keys joined back onto the aggregated features.
installments_features_final = installments_payments_reduced.select("SK_ID_CURR", "SK_ID_PREV", "NAME_CONTRACT_TYPE").distinct() \
    .join(installments_features, on=["SK_ID_CURR", "SK_ID_PREV", "NAME_CONTRACT_TYPE"], how="inner")

# Show the final result
installments_features_final.show(truncate=False)


In [0]:
# Number of distinct (SK_ID_CURR, SK_ID_PREV) pairs in the filtered installment rows.
installments_payments_reduced.select("SK_ID_CURR", "SK_ID_PREV").distinct().count()

In [0]:
# Row count of the per-contract installment feature table, for comparison with the
# distinct-pair count of the filtered rows.
installments_features_final.count()

In [0]:
# Full outer join of the three per-contract feature tables on (SK_ID_CURR, SK_ID_PREV).
# Non-key columns that occur in more than one table (NAME_CONTRACT_TYPE is selected
# into all three) are prefixed with their source alias; without that the joined
# frame would contain several columns with the same name, which cannot be
# referenced unambiguously afterwards.
join_keys = ["SK_ID_CURR", "SK_ID_PREV"]
feature_sources = [
    ("pos", pos_cash_features_final),
    ("cc", credit_card_features_final),
    ("inst", installments_features_final),
]

# How many of the three tables carry each non-key column name.
column_occurrences = {}
for _, source_df in feature_sources:
    for column_name in source_df.columns:
        if column_name not in join_keys:
            column_occurrences[column_name] = column_occurrences.get(column_name, 0) + 1

combined_features = None
for source_alias, source_df in feature_sources:
    uniquely_named = source_df.select(
        *[
            F.col(column_name)
            if column_name in join_keys or column_occurrences[column_name] == 1
            else F.col(column_name).alias(f"{source_alias}_{column_name}")
            for column_name in source_df.columns
        ]
    ).alias(source_alias)
    combined_features = (
        uniquely_named
        if combined_features is None
        else combined_features.join(uniquely_named, join_keys, "full_outer")
    )

# Fill missing values (optional, based on requirements). A numeric fill value only
# touches numeric columns; string columns such as the contract types keep their NULLs.
combined_amtf_features = combined_features.fillna(0)

# Show the combined result
combined_amtf_features.limit(10).display()

# Count the number of rows and columns to verify completeness
print(f"Total Rows: {combined_amtf_features.count()}")
print(f"Total Features (Columns): {len(combined_amtf_features.columns)}")

In [0]:
# Render ten rows of the combined feature table (same preview as the previous cell).
combined_amtf_features.limit(10).display()

In [0]:
# Expose the labelled installment rows to SQL cells as a temporary view.
installments_payments_reduced.createOrReplaceTempView('installments_payments_reduced_temp')


In [0]:
%sql
-- Number of labelled installment rows per derived NAME_CONTRACT_TYPE value.
select NAME_CONTRACT_TYPE,count(*) from installments_payments_reduced_temp group by 1

In [0]:
# Level 1 (per contract): credit-card features left-joined with the installment
# features of the same (SK_ID_CURR, SK_ID_PREV). Keys and contract type are taken
# from the credit-card side only.
_cc_l1_skipped = ["NAME_CONTRACT_TYPE", "SK_ID_CURR", "SK_ID_PREV"]
_cc_l1_installment_columns = [
    F.col(f"b.{name}")
    for name in installments_features_final.columns
    if name not in _cc_l1_skipped
]
_cc_l1_same_contract = (
    (F.col("a.SK_ID_CURR") == F.col("b.SK_ID_CURR"))
    & (F.col("a.SK_ID_PREV") == F.col("b.SK_ID_PREV"))
)
credit_card_installment_features_level_1 = (
    credit_card_features_final.alias("a")
    .join(installments_features_final.alias("b"), _cc_l1_same_contract, how="left")
    .select("a.*", *_cc_l1_installment_columns)
)

In [0]:
# Level 1 (per contract): POS/cash features left-joined with the installment
# features of the same (SK_ID_CURR, SK_ID_PREV). Keys and contract type are taken
# from the POS/cash side only.
_pos_l1_skipped = ["NAME_CONTRACT_TYPE", "SK_ID_CURR", "SK_ID_PREV"]
_pos_l1_installment_columns = [
    F.col(f"b.{name}")
    for name in installments_features_final.columns
    if name not in _pos_l1_skipped
]
_pos_l1_same_contract = (
    (F.col("a.SK_ID_CURR") == F.col("b.SK_ID_CURR"))
    & (F.col("a.SK_ID_PREV") == F.col("b.SK_ID_PREV"))
)
pos_cash_installment_features_level_1 = (
    pos_cash_features_final.alias("a")
    .join(installments_features_final.alias("b"), _pos_l1_same_contract, how="left")
    .select("a.*", *_pos_l1_installment_columns)
)

In [0]:
# Render ten rows of the level-1 POS/cash + installment feature table.
pos_cash_installment_features_level_1.limit(10).display()

In [0]:
# Number of distinct (SK_ID_CURR, SK_ID_PREV) pairs in the POS/cash feature table.
pos_cash_features_final.select("SK_ID_CURR", "SK_ID_PREV").distinct().count()

In [0]:
# Total number of rows in the POS/cash feature table.
pos_cash_features_final.count()

In [0]:
# Total number of rows in the level-1 POS/cash + installment table; the left join
# should leave it equal to the POS/cash feature-table row count.
pos_cash_installment_features_level_1.count()

In [0]:
# Persist the level-1 POS/cash table, replacing any existing table of that name.
pos_cash_installment_features_level_1.write.mode("overwrite").saveAsTable("pos_cash_installment_features_level_1")

In [0]:
# Count installment feature rows whose contract type is set and is not "Cash loans".
# Rows with a NULL contract type are not counted: the comparison evaluates to NULL
# for them, and filter() only keeps rows where the condition is true.
installments_features_final.filter(F.col("NAME_CONTRACT_TYPE") != "Cash loans").count()

In [0]:
# Row count of the per-contract credit-card feature table.
credit_card_features_final.count()

In [0]:
# Row count of the level-1 credit-card + installment table, for comparison with the
# credit-card feature-table row count.
credit_card_installment_features_level_1.count()

In [0]:
# Persist the level-1 credit-card table, replacing any existing table of that name.
credit_card_installment_features_level_1.write.mode("overwrite").saveAsTable("credit_card_installment_features_level_1")

In [0]:
# Level 2: roll the per-contract POS/cash + installment features up to one row per
# applicant (SK_ID_CURR). Each spec is (aggregate function, level-1 column, output name).
_pos_level2_specs = [
    # Loan duration
    (F.max, "amtf_loan_duration_months", "amtf_loan_duration_max"),
    (F.min, "amtf_loan_duration_months", "amtf_loan_duration_min"),
    (F.avg, "amtf_loan_duration_months", "amtf_loan_duration_avg"),
    (F.stddev, "amtf_loan_duration_months", "amtf_loan_duration_stddev"),

    # Installment amounts due
    (F.sum, "amtf_amt_instalment_total", "amtf_amt_instalment_total_sum"),
    (F.avg, "amtf_amt_instalment_avg", "amtf_amt_instalment_avg"),
    (F.max, "amtf_amt_instalment_max", "amtf_amt_instalment_max"),
    (F.min, "amtf_amt_instalment_min", "amtf_amt_instalment_min"),
    (F.stddev, "amtf_amt_instalment_avg", "amtf_amt_instalment_avg_stddev"),

    # Amounts paid
    (F.sum, "amtf_amt_payment_total", "amtf_amt_payment_total_sum"),
    (F.avg, "amtf_amt_payment_avg", "amtf_amt_payment_avg"),
    (F.max, "amtf_amt_payment_max", "amtf_amt_payment_max"),
    (F.min, "amtf_amt_payment_min", "amtf_amt_payment_min"),
    (F.stddev, "amtf_amt_payment_avg", "amtf_amt_payment_avg_stddev"),

    # Payment timing
    (F.avg, "amtf_avg_days_late", "amtf_avg_days_late_avg"),
    (F.max, "amtf_max_days_late", "amtf_max_days_late_max"),
    (F.min, "amtf_min_days_late", "amtf_min_days_late_min"),
    (F.avg, "amtf_avg_days_early", "amtf_avg_days_early_avg"),
    (F.max, "amtf_max_days_early", "amtf_max_days_early_max"),
    (F.min, "amtf_min_days_early", "amtf_min_days_early_min"),
    (F.sum, "amtf_late_payment_count", "amtf_late_payment_total"),
    (F.sum, "amtf_early_payment_count", "amtf_early_payment_total"),
    (F.avg, "amtf_days_entry_payment_stddev", "amtf_days_entry_payment_stddev_avg"),

    # Contract status
    (F.sum, "amtf_active_status_count", "amtf_active_status_total"),
    (F.sum, "amtf_completed_status_count", "amtf_completed_status_total"),
    (F.avg, "amtf_active_status_proportion", "amtf_active_status_proportion_avg"),
    (F.avg, "amtf_completed_status_proportion", "amtf_completed_status_proportion_avg"),

    # Payment-to-installment ratio
    (F.avg, "amtf_payment_to_instalment_ratio", "amtf_payment_to_instalment_ratio_avg"),
    (F.sum, "amtf_payment_to_instalment_ratio", "amtf_payment_to_instalment_ratio_sum"),

    # Overdue / deferral indicators
    (F.sum, "amtf_deferral_count", "amtf_deferral_count_total"),
    (F.avg, "amtf_deferral_proportion", "amtf_deferral_proportion_avg"),
    (F.sum, "amtf_overdue_count", "amtf_overdue_count_total"),
    (F.avg, "amtf_stddev_dpd", "amtf_stddev_dpd_avg"),

    # Installment row counts per contract type
    (F.sum, "amtf_cash_loans_count", "amtf_cash_loans_total"),
    (F.avg, "amtf_cash_loans_count", "amtf_cash_loans_avg"),
    (F.sum, "amtf_revolving_loans_count", "amtf_revolving_loans_total"),
    (F.avg, "amtf_revolving_loans_count", "amtf_revolving_loans_avg"),

    # Installment progress
    (F.sum, "amtf_instalments_completed", "amtf_instalments_completed_total"),
    (F.avg, "amtf_future_installment_ratio", "amtf_future_installment_ratio_avg"),

    # DAYS_INSTALMENT range
    (F.max, "amtf_days_installment_max", "amtf_days_installment_latest"),
    (F.min, "amtf_days_installment_min", "amtf_days_installment_earliest"),
    (F.avg, "amtf_days_installment_avg", "amtf_days_installment_avg"),

    # Overall totals (same inputs as the *_total_sum columns above)
    (F.sum, "amtf_amt_instalment_total", "amtf_total_financed_amount"),
    (F.sum, "amtf_amt_payment_total", "amtf_total_repaid_amount"),
]
_pos_level2_exprs = [
    aggregate(source_column).alias(output_name)
    for aggregate, source_column, output_name in _pos_level2_specs
]
# Total paid divided by total due across all of the applicant's contracts.
_pos_level2_exprs.append(
    (F.sum("amtf_amt_payment_total") / F.sum("amtf_amt_instalment_total")).alias("amtf_total_repayment_ratio")
)

pos_cash_installments_aggregated = (
    pos_cash_installment_features_level_1
    .groupBy("SK_ID_CURR")
    .agg(*_pos_level2_exprs)
)

# Re-attach the contract type(s) seen for each applicant at level 1.
_pos_level2_contract_types = (
    pos_cash_installment_features_level_1
    .select("SK_ID_CURR", "NAME_CONTRACT_TYPE")
    .distinct()
    .alias("b")
)
pos_cash_installment_features_level_2 = (
    pos_cash_installments_aggregated.alias("a")
    .join(
        _pos_level2_contract_types,
        on=F.col("a.SK_ID_CURR") == F.col("b.SK_ID_CURR"),
        how="left",
    )
    .select("a.*", "b.NAME_CONTRACT_TYPE")
)

# Render the first 10 rows.
pos_cash_installment_features_level_2.limit(10).display()

In [0]:
# Total number of rows in the level-2 (per-applicant) POS/cash table.
pos_cash_installment_features_level_2.count()

In [0]:
# Number of distinct applicants in the level-1 POS/cash table, for comparison with
# the level-2 row count.
pos_cash_installment_features_level_1.select("SK_ID_CURR").distinct().count()

In [0]:
# Persist the level-2 POS/cash table, replacing any existing table of that name.
pos_cash_installment_features_level_2.write.mode("overwrite").saveAsTable("pos_cash_installment_features_level_2")

In [0]:
# Render ten rows of the level-1 credit-card + installment feature table.
credit_card_installment_features_level_1.limit(10).display()

In [0]:
# Level 2: roll the per-contract credit-card + installment features up to one row
# per applicant (SK_ID_CURR).

# Rows that belong to a revolving-loan contract, used by the loan-type-specific
# aggregates at the end of the list.
is_revolving_contract = F.col("NAME_CONTRACT_TYPE") == "Revolving loans"

credit_card_installments_aggregated = credit_card_installment_features_level_1.groupBy("SK_ID_CURR").agg(

    # **Amount-Financed Features**
    F.sum("amtf_amt_balance_sum").alias("amtf_total_balance"),
    F.avg("amtf_amt_balance_avg").alias("amtf_avg_balance"),
    F.max("amtf_amt_balance_max").alias("amtf_max_balance"),
    F.min("amtf_amt_balance_min").alias("amtf_min_balance"),
    F.stddev("amtf_amt_balance_stddev").alias("amtf_balance_stddev"),
    F.sum("amtf_amt_payment_current_sum").alias("amtf_total_current_payment"),
    F.avg("amtf_amt_payment_current_avg").alias("amtf_avg_current_payment"),
    F.max("amtf_amt_payment_current_max").alias("amtf_max_current_payment"),
    F.min("amtf_amt_payment_current_min").alias("amtf_min_current_payment"),
    F.stddev("amtf_amt_payment_current_avg").alias("amtf_current_payment_stddev"),

    # **Credit Utilization**
    F.avg("amtf_credit_utilization_ratio").alias("amtf_avg_credit_utilization_ratio"),
    F.max("amtf_credit_utilization_ratio").alias("amtf_max_credit_utilization_ratio"),
    F.avg("amtf_credit_limit_avg").alias("amtf_avg_credit_limit"),
    F.max("amtf_credit_limit_max").alias("amtf_max_credit_limit"),
    F.min("amtf_credit_limit_min").alias("amtf_min_credit_limit"),

    # **Receivable and Drawing Features**
    F.sum("amtf_amt_receivable_sum").alias("amtf_total_receivable"),
    F.avg("amtf_amt_receivable_avg").alias("amtf_avg_receivable"),
    F.sum("amtf_amt_drawings_total_sum").alias("amtf_total_drawings"),
    F.avg("amtf_amt_drawings_total_sum").alias("amtf_avg_drawings"),
    F.sum("amtf_amt_drawings_pos_sum").alias("amtf_total_pos_drawings"),
    F.sum("amtf_amt_drawings_atm_sum").alias("amtf_total_atm_drawings"),
    F.sum("amtf_amt_drawings_other_sum").alias("amtf_total_other_drawings"),
    F.avg("amtf_atm_to_total_drawings_ratio").alias("amtf_avg_atm_to_total_drawings_ratio"),
    F.avg("amtf_pos_to_total_drawings_ratio").alias("amtf_avg_pos_to_total_drawings_ratio"),

    # **Installments and Payment Ratios**
    F.sum("amtf_amt_instalment_total").alias("amtf_total_instalment"),
    F.avg("amtf_amt_instalment_avg").alias("amtf_avg_instalment"),
    F.sum("amtf_amt_payment_total").alias("amtf_total_payment"),
    F.avg("amtf_payment_to_instalment_ratio").alias("amtf_avg_payment_to_instalment_ratio"),
    F.sum("amtf_underpayment_amount_total").alias("amtf_total_underpayment"),
    F.avg("amtf_underpayment_count").alias("amtf_avg_underpayment_count"),
    F.sum("amtf_late_payment_count").alias("amtf_total_late_payments"),
    F.avg("amtf_late_payment_proportion").alias("amtf_avg_late_payment_proportion"),

    # **Timing Features**
    F.max("amtf_max_days_late").alias("amtf_max_days_late"),
    F.avg("amtf_avg_days_late").alias("amtf_avg_days_late"),
    F.min("amtf_min_days_late").alias("amtf_min_days_late"),
    F.max("amtf_max_days_early").alias("amtf_max_days_early"),
    F.avg("amtf_avg_days_early").alias("amtf_avg_days_early"),
    F.stddev("amtf_days_entry_payment_stddev").alias("amtf_payment_timing_stddev"),

    # **Status Features**
    F.sum("amtf_active_contract_count").alias("amtf_total_active_contracts"),
    F.sum("amtf_completed_contract_count").alias("amtf_total_completed_contracts"),
    F.avg("amtf_active_status_proportion").alias("amtf_avg_active_status_proportion"),
    F.avg("amtf_completed_status_proportion").alias("amtf_avg_completed_status_proportion"),

    # **Loan Duration and Version Features**
    # NOTE(review): the two countDistinct features count distinct per-contract
    # *count values*, not distinct versions/installments across contracts — confirm
    # that this is the intended meaning of "total distinct".
    F.sum("amtf_days_installment_max").alias("amtf_total_days_installment_max"),
    F.min("amtf_days_installment_min").alias("amtf_total_days_installment_min"),
    F.avg("amtf_days_installment_avg").alias("amtf_avg_days_installment"),
    F.countDistinct("amtf_distinct_version_count").alias("amtf_total_distinct_versions"),
    F.countDistinct("amtf_distinct_installment_count").alias("amtf_total_distinct_installments"),

    # **Derived Ratios**
    (F.sum("amtf_amt_payment_current_sum") / F.sum("amtf_amt_balance_sum")).alias("amtf_payment_to_balance_ratio"),
    (F.sum("amtf_amt_drawings_total_sum") / F.sum("amtf_amt_balance_sum")).alias("amtf_drawings_to_balance_ratio"),
    F.avg(F.when(F.col("amtf_dpd_max") > 0, 1).otherwise(0)).alias("amtf_overdue_proportion"),
    F.avg(F.when(F.col("amtf_dpd_def_max") > 0, 1).otherwise(0)).alias("amtf_deferral_proportion"),

    # **Loan Type-Specific Features**
    # The value given to F.when must be a Column: a bare string is treated as a string
    # literal rather than a column reference, so the aggregates would never see the
    # numeric values. The level-1 table has no "amtf_amt_payment_total_current_avg"
    # column, so the payment average is taken over the per-contract payment sums,
    # mirroring the drawings sum/avg pair.
    F.sum(F.when(is_revolving_contract, F.col("amtf_amt_payment_total_current_sum"))).alias("amtf_revolving_loans_payment_sum"),
    F.avg(F.when(is_revolving_contract, F.col("amtf_amt_payment_total_current_sum"))).alias("amtf_revolving_loans_payment_avg"),
    F.sum(F.when(is_revolving_contract, F.col("amtf_amt_drawings_total_sum"))).alias("amtf_revolving_loans_drawings_sum"),
    F.avg(F.when(is_revolving_contract, F.col("amtf_amt_drawings_total_sum"))).alias("amtf_revolving_loans_drawings_avg"),
)

# Re-attach the contract type(s) seen for each applicant at level 1.
credit_card_installment_features_level_2 = credit_card_installments_aggregated.alias("a") \
    .join(
        credit_card_installment_features_level_1.select("SK_ID_CURR", "NAME_CONTRACT_TYPE").distinct().alias("b"),
        on=F.col("a.SK_ID_CURR") == F.col("b.SK_ID_CURR"),
        how="left"
    ) \
    .select("a.*", "b.NAME_CONTRACT_TYPE")

# Display the first 10 rows
credit_card_installment_features_level_2.limit(10).display()

In [0]:
# Total number of rows in the level-2 (per-applicant) credit-card table.
credit_card_installment_features_level_2.count()

In [0]:
# Number of distinct applicants in the level-1 credit-card table, for comparison
# with the level-2 row count.
credit_card_installment_features_level_1.select("SK_ID_CURR").distinct().count()

In [0]:
# Number of level-2 rows labelled as revolving loans.
credit_card_installment_features_level_2.filter("NAME_CONTRACT_TYPE = 'Revolving loans'").count()

In [0]:
# Persist the level-2 credit-card table, replacing any existing table of that name.
credit_card_installment_features_level_2.write.mode("overwrite").saveAsTable("credit_card_installment_features_level_2")

In [0]:
# Step 1: Demographic features that are always pulled from application_train.
demographic_features = [
    "CODE_GENDER",
    "FLAG_OWN_CAR",
    "FLAG_OWN_REALTY",
    "CNT_CHILDREN",
    "NAME_EDUCATION_TYPE",
    "NAME_INCOME_TYPE",
    "NAME_FAMILY_STATUS",
    "REGION_POPULATION_RELATIVE",
    "DAYS_BIRTH",
    "DAYS_EMPLOYED"
]

# Step 2: Correlate every remaining int/double column with TARGET.
# Loop variables are deliberately not called `col`: a top-level `for col in ...`
# would rebind the `col` function imported from pyspark.sql.functions.
numeric_cols = [name for name, dtype in application_train.dtypes if dtype in ['int', 'double']]

# Columns that are selected regardless of correlation must not be candidates,
# otherwise one of them could be picked a second time and the final select would
# produce duplicate column names.
always_selected = set(demographic_features) | {"TARGET", "SK_ID_CURR"}
candidate_cols = [name for name in numeric_cols if name not in always_selected]

# One aggregation pass computes all correlations instead of one Spark job per column.
correlation_pairs = []
if candidate_cols:
    correlation_row = application_train.select(
        [F.corr("TARGET", name).alias(name) for name in candidate_cols]
    ).first()
    correlation_pairs = [
        (name, float(value))
        for name, value in zip(candidate_cols, correlation_row)
        if value is not None  # undefined correlations are skipped
    ]

# Step 3: Rank by absolute correlation and keep the ten strongest features.
# The explicit schema keeps createDataFrame working when no correlation is defined.
correlation_df = spark.createDataFrame(correlation_pairs, "feature string, correlation double")
top_numeric_features = (
    correlation_df
    .withColumn("abs_corr", F.abs(F.col("correlation")))
    .orderBy(F.col("abs_corr").desc())
    .limit(10)
)

numeric_features = [row["feature"] for row in top_numeric_features.collect()]

# Step 4: Combine demographic and numeric features with the label and the join key.
selected_features = demographic_features + numeric_features + ["TARGET", "SK_ID_CURR"]

# Step 5: Restrict application_train to the selected features.
application_selected = application_train.select(selected_features)

# Display the selected features
application_selected.show()


In [0]:
# Application attributes to append: everything in application_selected except
# the join key, which the left side already provides through "a.*".
application_columns = [name for name in application_selected.columns if name != "SK_ID_CURR"]

# Left-join the level-2 credit-card features ("a") to the selected application
# attributes ("b") on SK_ID_CURR, keeping every column of "a" followed by the
# application attributes.
credit_card_installment_features_level_3 = (
    credit_card_installment_features_level_2.alias("a")
    .join(
        application_selected.alias("b"),
        F.col("a.SK_ID_CURR") == F.col("b.SK_ID_CURR"),
        "left",
    )
    .select("a.*", *(F.col("b." + name) for name in application_columns))
)

# Preview the first 10 rows of the joined result
credit_card_installment_features_level_3.limit(10).display()

In [0]:
# Row count of the level-3 credit-card feature DataFrame.
(
    credit_card_installment_features_level_3
    .count()
)

In [0]:
# Left-join the level-2 POS-cash features ("a") to the selected application
# attributes ("b") on SK_ID_CURR, keeping every column of "a" followed by the
# columns listed in application_columns.
pos_cash_installment_features_level_3 = (
    pos_cash_installment_features_level_2.alias("a")
    .join(
        application_selected.alias("b"),
        F.col("a.SK_ID_CURR") == F.col("b.SK_ID_CURR"),
        "left",
    )
    .select("a.*", *(F.col("b." + name) for name in application_columns))
)

# Preview the first 10 rows of the joined result
pos_cash_installment_features_level_3.limit(10).display()

In [0]:
# Row count of the level-3 POS-cash feature DataFrame.
(
    pos_cash_installment_features_level_3
    .count()
)

In [0]:
# Persist the level-3 POS-cash features as a table, replacing any existing contents.
(
    pos_cash_installment_features_level_3
    .write
    .mode("overwrite")
    .saveAsTable("pos_cash_installment_features_level_3")
)


In [0]:
# Persist the level-3 credit-card features as a table, replacing any existing contents.
(
    credit_card_installment_features_level_3
    .write
    .mode("overwrite")
    .saveAsTable("credit_card_installment_features_level_3")
)

In [0]:

# # Step 1: Handle missing values
# pos_cash_installment_features_level_3_temp = pos_cash_installment_features_level_3.fillna(0)
# credit_card_installment_features_level_3_temp = credit_card_installment_features_level_3.fillna(0)

# # Step 2: Identify numeric columns for scaling
# numeric_columns_pos = [col for col, dtype in pos_cash_installment_features_level_3_temp.dtypes if dtype in ('double', 'int') and col != "TARGET"]
# numeric_columns_credit = [col for col, dtype in credit_card_installment_features_level_3_temp.dtypes if dtype in ('double', 'int') and col != "TARGET"]

# # Step 3: Assemble features
# assembler_pos = VectorAssembler(inputCols=numeric_columns_pos, outputCol="features_assembled_pos")
# assembler_credit = VectorAssembler(inputCols=numeric_columns_credit, outputCol="features_assembled_credit")

# assembled_pos = assembler_pos.transform(pos_cash_installment_features_level_3_temp)
# assembled_credit = assembler_credit.transform(credit_card_installment_features_level_3_temp)

# # Step 4: Scale features
# scaler_pos = StandardScaler(inputCol="features_assembled_pos", outputCol="features_scaled_pos", withStd=True, withMean=True)
# scaler_credit = StandardScaler(inputCol="features_assembled_credit", outputCol="features_scaled_credit", withStd=True, withMean=True)

# scaled_pos = scaler_pos.fit(assembled_pos).transform(assembled_pos)
# scaled_credit = scaler_credit.fit(assembled_credit).transform(assembled_credit)

In [0]:
# # Step 5: Split data into train and test
# train_pos, test_pos = scaled_pos.randomSplit([0.8, 0.2], seed=42)
# train_credit, test_credit = scaled_credit.randomSplit([0.8, 0.2], seed=42)

In [0]:
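# NOTE(review): disabled cell, kept for reference. Every estimator below is
# bound to featuresCol="features_scaled_pos", while the credit-card data is
# scaled into "features_scaled_credit". Passing this same dict to the
# credit-card evaluation would therefore not find its feature column; make
# featuresCol per-dataset before re-enabling these cells.
# models = {
#     "Logistic Regression": LogisticRegression(labelCol="TARGET", featuresCol="features_scaled_pos", maxIter=100),  # Spark ML Logistic Regression
#     "Random Forest": RandomForestClassifier(labelCol="TARGET", featuresCol="features_scaled_pos", numTrees=50, maxDepth=10, seed=42),  # Spark ML Random Forest
#     "Gradient Boosted Trees": GBTClassifier(labelCol="TARGET", featuresCol="features_scaled_pos", maxIter=50, maxDepth=5, seed=42),  # Spark ML Gradient Boosted Trees
# }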

In [0]:
# # Function to train and evaluate models
# def evaluate_models(train_data, test_data, models):
#     results = []
#     for model_name, model in models.items():
#         print(f"Training {model_name}...")
#         trained_model = model.fit(train_data)
        
#         # Make predictions
#         predictions = trained_model.transform(test_data)
        
#         # Evaluate using ROC-AUC
#         evaluator = BinaryClassificationEvaluator(labelCol="TARGET", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
#         roc_auc = evaluator.evaluate(predictions)
        
#         # Store results
#         results.append((model_name, trained_model, roc_auc))
#         print(f"{model_name} ROC-AUC: {roc_auc:.4f}")
    
#     return results


In [0]:
# # Step 7: Train and evaluate models on POS Cash and Installments
# print("\nEvaluating models for POS Cash and Installments...")
# results_pos = evaluate_models(train_pos, test_pos, models)

# # Step 8: Train and evaluate models on Credit Card and Installments
# print("\nEvaluating models for Credit Card and Installments...")
# results_credit = evaluate_models(train_credit, test_credit, models)
