# Monte Carlo Simulation of CPPI with Interactive Reporting of Floor Violations and Terminal Wealth Statistics

In [74]:
import ipywidgets as widgets
from IPython.display import display
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


def gbm(n_years = 10 , n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
    
    """ Geometric Brownian Motion trajectories for Stock Prices through Monte Carlo
        n_years:  The number of years to generate data for
        n_paths: The number of scenarios/trajectories
        mu: Annualized Drift, e.g. Market Return
        sigma: Annualized Volatility
        steps_per_year: granularity of the simulation
        s_0: initial value
        return: numpy array of n_paths columns and n_years*steps_per_year rows
    """
    dt = 1/steps_per_year
    n_steps = int(n_years*steps_per_year) + 1
    rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
    rets_plus_1[0] = 1
    ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
    return ret_val


def show_gbm (n_scenarios, mu, sigma):
    """ Simulates and visualizes the evolution of stock prices under the Geometric Brownian Motion (GBM) model
        n_scenarios : Number of simulation paths (scenarios) to generate   
        assumptions:
            - GBM models stock prices as lognormally distributed.
            - It captures both drift (expected return) and random fluctuations (volatility)
    """
    s_0 = 100
    prices = gbm(n_scenarios = n_scenarios, mu = mu, sigma = sigma, s_0 = s_0)
    ax = prices.plot( legend = False, alpha = 0.5, linewidth = 2, figsize = (12,5))
    ax.axhline( y=s_0, ls=":", color ="black")
    ax.set_ylim(top=500) # y axis
    ax.plot(0, s_0, marker='o', alpha = 0.2)

def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
    """
    Runs a backtest for the Constant Proportion Portfolio Insurance (CPPI) strategy and simulates
    the portfolio’s performance over time based on the historical returns of a risky asset and a risk-free asset,
    adjusting the portfolio’s exposure to risk based on the cushion between the current 
    account value and the floor value.

    risky_r : A DataFrame or Series containing the returns of the risky asset. 
                                           Each row corresponds to one time step.
    safe_r (pd.DataFrame or pd.Series, optional): A DataFrame or Series containing the returns of the safe (risk-free) asset. 
                                                  Defaults to None, in which case the risk-free rate is used.
    m (float, optional): The multiplier used to adjust the risky asset allocation. Defaults to 3.
    start (float, optional): The initial account value. Defaults to 1000.
    floor (float, optional): The floor value as a proportion of the initial account value. Defaults to 0.8.
    riskfree_rate (float, optional): The annual risk-free rate, used if `safe_r` is not provided. Defaults to 0.03.
    drawdown (float, optional): A maximum allowed drawdown, as a percentage of the peak value. Defaults to None (no drawdown limit).

    Returns:
    
        "Wealth": A DataFrame with the value of the portfolio at each time step.
        "Risky Wealth": A DataFrame with the wealth of the risky asset alone.
        "Risk Budget": A DataFrame with the cushion value (account value above the floor).
        "Risky Allocation": A DataFrame with the percentage of wealth allocated to the risky asset.
        "m": The multiplier used in the strategy.
        "start": The starting account value.
        "floor": The floor value used to compute the cushion.
        "risky_r": The returns of the risky asset.
        "safe_r": The returns of the safe asset.
        "drawdown": The maximum drawdown (if provided).
        "peak": A DataFrame with the peak value of the portfolio at each time step.
        "floor": A DataFrame with the floor value at each time step.
    """
    
    dates = risky_r.index
    n_steps = len(dates)
    account_value = start
    floor_value = start*floor
    peak = account_value
    if isinstance(risky_r, pd.Series): 
        risky_r = pd.DataFrame(risky_r, columns=["R"])

    if safe_r is None:
        safe_r = pd.DataFrame().reindex_like(risky_r)
        safe_r.values[:] = riskfree_rate/12 
    account_history = pd.DataFrame().reindex_like(risky_r)
    risky_w_history = pd.DataFrame().reindex_like(risky_r)
    cushion_history = pd.DataFrame().reindex_like(risky_r)
    floorval_history = pd.DataFrame().reindex_like(risky_r)
    peak_history = pd.DataFrame().reindex_like(risky_r)

    for step in range(n_steps):
        if drawdown is not None:
            peak = np.maximum(peak, account_value)
            floor_value = peak*(1-drawdown)
        cushion = (account_value - floor_value)/account_value
        risky_w = m*cushion
        risky_w = np.minimum(risky_w, 1)
        risky_w = np.maximum(risky_w, 0)
        safe_w = 1-risky_w
        risky_alloc = account_value*risky_w
        safe_alloc = account_value*safe_w
        account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
        cushion_history.iloc[step] = cushion
        risky_w_history.iloc[step] = risky_w
        account_history.iloc[step] = account_value
        floorval_history.iloc[step] = floor_value
        peak_history.iloc[step] = peak
    risky_wealth = start*(1+risky_r).cumprod()
    backtest_result = {
        "Wealth": account_history,
        "Risky Wealth": risky_wealth, 
        "Risk Budget": cushion_history,
        "Risky Allocation": risky_w_history,
        "m": m,
        "start": start,
        "floor": floor,
        "risky_r":risky_r,
        "safe_r": safe_r,
        "drawdown": drawdown,
        "peak": peak_history,
        "floor": floorval_history
    }
    return backtest_result



## Interactive Constant Proportion Portfolio Insurance

In [81]:
def show_cppi(n_scenarios=1000, mu=0.07, sigma=0.15, m=3, floor=0., riskfree_rate=0.03, steps_per_year=12, y_max=100):
    """
    Runs a Monte Carlo Simulation of CPPI and generates two types of plots:
	1.	Wealth Path Plot: Shows the simulated wealth over time for multiple scenarios.
	2.	Terminal Wealth Histogram: Shows the distribution of terminal wealth at the end of the simulation.

        n_scenarios: The number of scenarios to simulate  
    	mu: The expected return of the risky asset 
    	sigma: The volatility (standard deviation) of the risky asset 
    	m: The multiplier for the CPPI strategy
    	floor: minimum acceptable wealth level
    	riskfree_rate: The risk-free rate of return 
    	steps_per_year: The number of time steps per year for the simulation 
    	y_max: The factor by which the maximum y-axis value for the wealth plot is scaled
    
    """
    start = 100
    sim_rets = gbm(n_scenarios=n_scenarios, mu=mu, sigma=sigma, prices=False, steps_per_year=steps_per_year)
    risky_r = pd.DataFrame(sim_rets)
    btr = run_cppi(risky_r=pd.DataFrame(risky_r),riskfree_rate=riskfree_rate,m=m, start=start, floor=floor)
    wealth = btr["Wealth"]

    y_max=wealth.values.max()*y_max/100
    terminal_wealth = wealth.iloc[-1]
    
    tw_mean = terminal_wealth.mean()
    tw_median = terminal_wealth.median()
    failure_mask = np.less(terminal_wealth, start*floor)
    n_failures = failure_mask.sum()
    p_fail = n_failures/n_scenarios

    e_shortfall = np.dot(tv-start*floor, failure_mask)/n_failures if n_failures > 0 else 0.0

    fig, (wealth_ax, hist_ax) = plt.subplots(nrows=1, ncols=2, sharey=True, gridspec_kw={'width_ratios':[3,2]}, figsize=(24, 9))
    plt.subplots_adjust(wspace=0.0)
    
    wealth.plot(ax=wealth_ax, legend=False, alpha=0.3)
    wealth_ax.axhline(y=start, ls=":", color="black")
    wealth_ax.axhline(y=start*floor, ls="--")
    wealth_ax.set_ylim(top=y_max)
    
    terminal_wealth.plot.hist(ax=hist_ax, bins=50, ec='w', orientation='horizontal')
    hist_ax.axhline(y=start, ls=":", color="black")
    hist_ax.axhline(y=tw_mean, ls=":", color="blue")
    hist_ax.axhline(y=tw_median, ls=":", color="purple")
    hist_ax.annotate(f"Mean: ${int(tw_mean)}", xy=(.7, .9),xycoords='axes fraction', fontsize=24)
    hist_ax.annotate(f"Median: ${int(tw_median)}", xy=(.7, .85),xycoords='axes fraction', fontsize=24)
    if (floor > 0.01):
        hist_ax.axhline(y=start*floor, ls="--", color="red", linewidth=3)
        hist_ax.annotate(f"Violations: {n_failures} ({p_fail*100:2.2f}%)\nE(shortfall)=${e_shortfall:2.2f}", xy=(.7, .7), xycoords='axes fraction', fontsize=24)

cppi_controls = widgets.interactive(show_cppi,
                                   n_scenarios=widgets.IntSlider(min=1, max=1000, step=5, value=50), 
                                   mu=(-0.2, 1, .01),
                                   sigma=(0, .5, .05),
                                   floor=(0, 2, .01),
                                   m=(1, 5, .5),
                                   riskfree_rate=(0, .1 , .01),
                                   steps_per_year=widgets.IntSlider(min=1, max=12, step=1, value=12,
                                                          description="Rebals/Year"),
                                   y_max=widgets.IntSlider(min=0, max=100, step=1, value=100,
                                                          description="Zoom Y Axis")
)
display(cppi_controls)

interactive(children=(IntSlider(value=50, description='n_scenarios', max=1000, min=1, step=5), FloatSlider(val…