In [0]:
from pyspark.sql.functions import when

# Binary target: default_flag = 1 when loan_status is one of the statuses
# below, otherwise 0. A null loan_status also ends up as 0, because isin()
# yields null and the when() falls through to otherwise().
DEFAULT_STATUSES = [
    "Charged Off",
    "Default",
    "Late (31-120 days)",
    "Late (16-30 days)",
]

df = spark.table("silver_loans")
is_default = df["loan_status"].isin(DEFAULT_STATUSES)
df = df.withColumn("default_flag", when(is_default, 1).otherwise(0))


In [0]:
# Inspect the loans frame after the default_flag column has been added.
df.display()

In [0]:
from pyspark.sql.functions import year, month, to_date

# Parse issue_d into a date, then derive the calendar year/month used both as
# features and for the time-based train/test split.
# NOTE(review): assumes issue_d strings in silver_loans are formatted
# yyyy-MM-dd — confirm; rows that fail to parse will not land in either split.
issue_date_col = to_date("issue_d", "yyyy-MM-dd")
df = (
    df
    .withColumn("issue_date", issue_date_col)
    .withColumn("issue_year", year("issue_date"))
    .withColumn("issue_month", month("issue_date"))
)

In [0]:
# Inspect the frame with the derived issue_date / issue_year / issue_month columns.
df.display()

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Categorical inputs: each is string-indexed, then one-hot encoded.
categorical_cols = [
    "term", "grade", "emp_length", "home_ownership",
    "verification_status", "purpose", "application_type", "interest_bucket",
]

# Build one StringIndexer (<col>_idx) and one OneHotEncoder (<col>_vec) per
# column, in the same order as categorical_cols. handleInvalid="keep" gives
# labels unseen during fitting their own index instead of failing.
indexers = []
encoders = []
for cat_col in categorical_cols:
    idx_col = f"{cat_col}_idx"
    indexers.append(
        StringIndexer(inputCol=cat_col, outputCol=idx_col, handleInvalid="keep")
    )
    encoders.append(OneHotEncoder(inputCol=idx_col, outputCol=f"{cat_col}_vec"))


In [0]:
from pyspark.ml.feature import VectorAssembler

# Numeric features passed straight into the assembler.
# NOTE(review): last_pymnt_amnt has been removed from this list. It appears
# to be a post-origination field (the most recent payment received), which
# would leak the default outcome into the features and inflate model
# quality. Confirm that no other post-origination columns remain.
# NOTE: issue_year is 2015 for every row in both the train and test splits,
# so it carries no signal. It is kept only to preserve the feature layout.
numerical_cols = [
    "loan_amnt", "int_rate", "installment", "annual_inc", "dti",
    "delinq_2yrs", "tot_cur_bal", "tot_hi_cred_lim",
    "issue_month", "issue_year",
]

# Concatenate the numeric columns and the one-hot vectors into the single
# `features` column consumed by the classifier. VectorAssembler's default
# handleInvalid="error" makes the job fail on null/NaN numeric values.
assembler = VectorAssembler(
    inputCols=numerical_cols + [c + "_vec" for c in categorical_cols],
    outputCol="features",
)


In [0]:
# Exploratory: all loans issued in 2015. A later cell reassigns train_df
# with a month-based train/test split, which supersedes this.
train_df = df.where(df["issue_year"] == 2015)

In [0]:
# Preview the 2015 subset currently bound to train_df.
train_df.display()

In [0]:
# Out-of-time split within 2015: January–November trains, December tests.
in_2015 = df.issue_year == 2015
train_df = df.filter(in_2015 & (df.issue_month <= 11))
test_df = df.filter(in_2015 & (df.issue_month == 12))

In [0]:
# Sanity check: loan counts per issue month in each split
# (expect months 1-11 in train, month 12 in test).
for split_df in (train_df, test_df):
    split_df.groupBy("issue_month").count().orderBy("issue_month").show()

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Logistic regression predicting default_flag from the assembled features.
lr = LogisticRegression(
    labelCol="default_flag",
    featuresCol="features",
    probabilityCol="probability",
)

# Stage order matters: index -> one-hot encode -> assemble -> classify.
stages = [*indexers, *encoders, assembler, lr]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(train_df)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the training months and the held-out test split separately.
# `predictions` (training rows) is the frame consumed by the later
# risk-scoring and gold-table cells.
# NOTE(review): that means the gold table is built from in-sample scores.
predictions = model.transform(train_df)
test_predictions = model.transform(test_df)

evaluator = BinaryClassificationEvaluator(
    labelCol="default_flag",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Report AUC on the held-out test_df, because AUC measured on the rows the
# model was fit on is optimistic. The in-sample value is printed only for
# comparison; a large gap hints at overfitting or leakage. AUC is reported
# unmodified, in [0, 1].
train_auc = evaluator.evaluate(predictions)
auc = evaluator.evaluate(test_predictions)
print(f"Train AUC: {train_auc:.4f}")
print(f"Test AUC:  {auc:.4f}")


In [0]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# For LogisticRegression, `rawPrediction` holds raw margins (log-odds), not
# probabilities, so none of its entries can be read as a default
# probability. Take P(default_flag = 1) from element 1 of the `probability`
# vector instead.
predictions = predictions.withColumn(
    "default_probability",
    vector_to_array(col("probability"))[1]
)

In [0]:
# Remove the intermediate `rawPredictionArr` array column from `predictions`
# so it is not carried into later cells.
predictions = predictions.drop("rawPredictionArr")

In [0]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Unpack the probability vector into an array column (kept on the frame as
# `probability_arr`). Then take index 1, the probability of class
# default_flag = 1, as default_probability.
prob_as_array = vector_to_array(col("probability"))
predictions = (
    predictions
    .withColumn("probability_arr", prob_as_array)
    .withColumn("default_probability", col("probability_arr")[1])
)


In [0]:
from pyspark.sql.functions import when

# Bucket the predicted default probability into risk tiers:
#   p >= 0.7 -> "High", 0.4 <= p < 0.7 -> "Medium", otherwise "Low".
# A null probability fails both comparisons and therefore lands in "Low".
HIGH_RISK_THRESHOLD = 0.7
MEDIUM_RISK_THRESHOLD = 0.4

p_default = col("default_probability")
risk_tier = (
    when(p_default >= HIGH_RISK_THRESHOLD, "High")
    .when(p_default >= MEDIUM_RISK_THRESHOLD, "Medium")
    .otherwise("Low")
)
predictions = predictions.withColumn("risk_category", risk_tier)

In [0]:
# Quick look at predicted probabilities alongside their assigned tier.
preview_cols = ["default_probability", "risk_category"]
predictions.select(*preview_cols).show()

In [0]:
# Persist the scored loans to the gold table, replacing any previous contents.
# NOTE(review): `predictions` is derived from model.transform(train_df), so
# these are in-sample scores for the training months. Confirm that this is
# the intended population for reporting.
GOLD_COLUMNS = [
    "id",
    "loan_amnt",
    "grade",
    "issue_year",
    "issue_month",
    "default_flag",
    "default_probability",
    "risk_category",
]

(
    predictions
    .select(*GOLD_COLUMNS)
    .write
    .mode("overwrite")
    .saveAsTable("gold_loan_risk_predictions")
)

In [0]:
%sql
-- Full contents of the gold predictions table.
SELECT *
FROM gold_loan_risk_predictions

In [0]:
%sql
-- Headline count and exposure for loans placed in the 'High' risk tier.
-- NOTE(review): COUNT counts loan rows, not distinct customers. Confirm that
-- the "customers" label is intended (e.g. one loan per customer).
SELECT
  COUNT(1)         AS high_risk_customers,
  SUM(g.loan_amnt) AS total_risky_exposure
FROM gold_loan_risk_predictions AS g
WHERE g.risk_category = 'High';

In [0]:
%sql
-- High-risk loan count and exposure per grade, largest exposure first.
SELECT
  g.grade,
  COUNT(1)         AS high_risk_loans,
  SUM(g.loan_amnt) AS risky_exposure
FROM gold_loan_risk_predictions AS g
WHERE g.risk_category = 'High'
GROUP BY g.grade
ORDER BY risky_exposure DESC;

In [0]:
%sql
-- Loan count per risk tier and each tier's share of all loans (percent, 2 dp).
WITH tier_counts AS (
  SELECT
    risk_category,
    COUNT(*) AS loan_count
  FROM gold_loan_risk_predictions
  GROUP BY risk_category
)
SELECT
  risk_category,
  loan_count,
  ROUND(loan_count * 100.0 / SUM(loan_count) OVER (), 2) AS percentage
FROM tier_counts;


In [0]:
%sql
-- Total loan amount held in each risk tier.
SELECT
  g.risk_category,
  SUM(g.loan_amnt) AS total_exposure
FROM gold_loan_risk_predictions AS g
GROUP BY g.risk_category;

In [0]:
%sql
-- Percentage of loans per issue month whose observed label default_flag = 1.
-- These are actual outcomes from the label, not model predictions.
SELECT
  issue_year,
  issue_month,
  SUM(CASE WHEN default_flag = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)
    AS delinquency_rate
FROM gold_loan_risk_predictions
GROUP BY issue_year, issue_month
ORDER BY issue_year, issue_month;


In [0]:
%sql
-- Loan counts per issue month and risk tier.
SELECT
  g.issue_month,
  g.risk_category,
  COUNT(1) AS loans
FROM gold_loan_risk_predictions AS g
GROUP BY g.issue_month, g.risk_category
ORDER BY issue_month

In [0]:
%sql
-- Loan amount in the 'High' tier per issue month. Rows in other tiers add 0.
SELECT
  issue_month,
  SUM(IF(risk_category = 'High', loan_amnt, 0)) AS high_risk_exposure
FROM gold_loan_risk_predictions
GROUP BY issue_month
ORDER BY issue_month;

In [0]:
%sql
-- NOTE(review): loan_portfolio_risk is not created anywhere in this notebook.
-- It is presumably built upstream; confirm that it exists and is current.
SELECT
  p.interest_bucket,
  p.total_loans
FROM loan_portfolio_risk AS p;


Databricks visualization. Run in Databricks to view.