In [0]:
#stress scenarios (baseline, adverse, and extreme) 

'''
Define Mortgage Data: Import and create a DataFrame that holds sample mortgage data, including loan balances, interest rates, loan-to-value (LTV) ratios, credit scores, and property values.

Define Scenarios: Create different economic stress scenarios (e.g., baseline, adverse, extreme) that apply varying levels of unemployment increase and property value decline.

Model Losses under Stress: For each scenario, apply adjustments to the mortgage data (e.g., adjusting property values, recalculating LTV ratios, and increasing the probability of default) to simulate the effects of the stress scenario on the loans.

Calculate Expected Losses: For each loan, calculate the expected loss based on the adjusted data (using parameters such as stress-tested property values, probability of default, and loss given default).

Aggregate Results: Aggregate the expected losses across all loans for each scenario to calculate the total financial loss for each scenario.

Store and Display Results: Store the total loss for each scenario in a DataFrame and display the results to help understand the impact of different stress conditions on the mortgage portfolio'''

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Importing required types for defining the schema of the DataFrame.

# Define schema for scenario results
results_schema = StructType([ 
    StructField("scenario", StringType(), nullable=False),  # Defines the 'scenario' field of type String
    StructField("total_loss", DoubleType(), nullable=False)  # Defines the 'total_loss' field of type Double
])

# Initialize empty DataFrame with the schema
results_df = spark.createDataFrame([], schema=results_schema) 
# Creates an empty DataFrame 'results_df' with the defined schema 'results_schema'.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, udf
from pyspark.sql.types import DoubleType, StringType, StructType, StructField
# Importing additional necessary functions and types from PySpark.

# Initialize Spark session
spark = SparkSession.builder.appName("MultiScenarioAnalysis").getOrCreate() 
# Initializes a Spark session, which is necessary for creating and processing DataFrames.

# Sample mortgage data
data = [
    (1, 500000, 0.04, 0.75, 700, 300000),  # Sample data for mortgage (loan_id, loan_balance, interest_rate, ltv, credit_score, property_value)
    (2, 750000, 0.035, 0.85, 680, 600000), 
    (3, 300000, 0.05, 0.65, 720, 200000)
]
columns = ["loan_id", "loan_balance", "interest_rate", "ltv", "credit_score", "property_value"]
mortgage_df = spark.createDataFrame(data, columns) 
# Creates a DataFrame 'mortgage_df' from the sample data and assigns appropriate column names.

# Define scenarios
scenarios = {
    "baseline": {"unemployment_increase": 0.0, "property_value_decline": 0.0},  # Baseline scenario (no change)
    "adverse": {"unemployment_increase": 0.10, "property_value_decline": 0.15},  # Adverse scenario (with certain stress factors)
    "extreme": {"unemployment_increase": 0.20, "property_value_decline": 0.30}  # Extreme scenario (greater stress factors)
}

# Define UDF for base PD calculation
@udf(DoubleType())  # Declares a User Defined Function (UDF) to calculate base Probability of Default (PD)
def calculate_base_pd(credit_score):
    return 1 / (1 + (2.718 ** (-0.02 * (credit_score - 650)))) 
    # The UDF calculates the base PD based on a credit score using the logistic function.

# Initialize results DataFrame with explicit schema
results_schema = StructType([ 
    StructField("scenario", StringType(), nullable=False), 
    StructField("total_loss", DoubleType(), nullable=False) 
]) 
# Re-defining the schema for results (it was already defined earlier, but this line restates it).

results_df = spark.createDataFrame([], schema=results_schema) 
# Re-initializes an empty DataFrame 'results_df' to store the results of each scenario.

# Iterate through scenarios
for scenario_name, params in scenarios.items():  # Iterating over each defined scenario (baseline, adverse, extreme)
    # Apply scenario parameters
    mortgage_stressed = mortgage_df \
        .withColumn("stressed_property_value", col("property_value") * (1 - params["property_value_decline"])) \
        .withColumn("stressed_ltv", col("loan_balance") / col("stressed_property_value")) \
        .withColumn("lgd", when(col("stressed_ltv") > 0.8, 0.4).otherwise(0.25)) \
        .withColumn("base_pd", calculate_base_pd(col("credit_score"))) \
        .withColumn("stress_pd", col("base_pd") * (1 + params["unemployment_increase"])) \
        .withColumn("expected_loss", col("stress_pd") * col("lgd") * col("loan_balance"))
    # The code creates new columns based on the scenario parameters, including 'stressed_property_value', 
    # 'stressed_ltv', 'lgd' (Loss Given Default), 'base_pd' (base Probability of Default), 'stress_pd' (stressed PD), 
    # and 'expected_loss' for each mortgage in the dataset.

    # Calculate total loss for the scenario
    total_loss = mortgage_stressed.agg({"expected_loss": "sum"}).collect()[0][0]
    # Sums the 'expected_loss' column across all mortgages to get the total loss for the scenario.

    # Append results to results_df
    new_row = spark.createDataFrame(
        [(scenario_name, float(total_loss))],  # Creates a new row for the result
        schema=results_schema  # Ensures the new row follows the defined schema
    )
    results_df = results_df.union(new_row)  # Appends the new row to the results DataFrame.

# Show results
results_df.show()  # Displays the final DataFrame containing the total loss for each scenario.


+--------+------------------+
|scenario|        total_loss|
+--------+------------------+
|baseline| 436159.5628857756|
| adverse|479775.51917435334|
| extreme| 523391.4754629308|
+--------+------------------+

