<a href="https://colab.research.google.com/github/presmoore/Jun23-Ruby-Assignment/blob/master/dca_portfolio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Initial

In [None]:
!pip install financetoolkit



In [None]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from financetoolkit import Toolkit
from typing import List, Any
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import codecs
import base64
import csv
from io import StringIO

### Simulation code

In [None]:
def simulate(universe: pd.DataFrame, K: int, bet: int = 100, ranking_method: str = 'benchmark', loss_threshold: float = 0.0) -> pd.DataFrame:
    """
    Performs DCA investment simulation run.

    Every month except the first opens a new cohort: `bet` is split equally across the
    stocks selected from the previous month's returns, and the position is then compounded
    with each subsequent month's returns (starting with the cohort's own month).

    Parameters:
    universe (pd.DataFrame): DataFrame indexed by month with stock returns, one column per stock.
                             The index must be a PeriodIndex (it is converted with `to_timestamp()`).
    K (int):                 Number of stocks to invest in each month
                             (used by 'losers' and 'conditional_losers', ignored by 'benchmark')
    bet (int, optional):     Amount invested into every monthly cohort. Also used as the starting
                             'Total_Value' and as the value shown for a cohort before it is opened.
    ranking_method (str):    Method to rank stocks.
                             'losers' to invest in K worst based on previous month performance
                             'benchmark' equal investment into all stocks in universe
                             'conditional_losers' invest in K worst only when `loss_threshold` is breached
    loss_threshold (float, optional):
                             Minimum loss (as a fraction, e.g. 0.05 for 5%) to consider for
                             'conditional_losers' method.

    Returns:
    pd.DataFrame:            DataFrame with performance of individual cohorts as well as aggregate performance
                             across all cohorts ('Total_Value' column), indexed by timestamp.

    Raises:
    ValueError:              If `ranking_method` is unknown or `universe` has fewer than two rows.
    """

    if ranking_method not in ('losers', 'benchmark', 'conditional_losers'):
        raise ValueError("Invalid ranking_method. Choose 'losers', 'benchmark' or 'conditional_losers'.")
    if len(universe) < 2:
        # The first month only provides ranking data, so at least one more month is needed
        raise ValueError("universe must contain at least two months of returns.")

    cohort_dict = {}

    for start_month in range(1, len(universe)):
        previous_month = universe.index[start_month - 1]
        previous_returns = universe.loc[previous_month]

        # Select stocks based on the ranking method
        if ranking_method == 'losers':
            selected_stocks = previous_returns.nsmallest(K).index
        elif ranking_method == 'benchmark':
            selected_stocks = universe.columns
        else:
            # 'conditional_losers': worst performers that also breach the loss threshold
            losers = previous_returns.nsmallest(K * 2)  # Select more to ensure we get enough after filtering
            selected_stocks = losers[losers <= -loss_threshold].index[:K]

        # Performance of this cohort; months before the cohort starts stay NaN for now
        cohort_performance = pd.Series(np.nan, index=universe.index, dtype=float)

        if len(selected_stocks) > 0:
            # Split the bet equally across the selected stocks
            investment_value = pd.Series(bet / len(selected_stocks), index=selected_stocks, dtype=float)
            cohort_returns = universe[selected_stocks]

            # Track the performance of this cohort for each subsequent month
            for end_month in range(start_month, len(universe)):
                investment_value = investment_value * (1 + cohort_returns.iloc[end_month])
                cohort_performance.iloc[end_month] = investment_value.sum()
        else:
            # Zero investment for months with no qualifying stocks; 0.0 acts as the
            # "nothing invested" marker that is replaced further below
            cohort_performance.iloc[start_month:] = 0.0

        # Add this cohort's performance to the dictionary
        cohort_name = f'cohort_{previous_month.strftime("%Y-%m")}'
        cohort_dict[cohort_name] = cohort_performance

    # Concatenate all cohort Series to create a single DataFrame
    combined_cohorts = pd.concat(cohort_dict, axis=1)

    # Aggregate the performance of all cohorts. Months without any investment sum to 0.0;
    # they carry the previous total forward, and the very first month starts at `bet`.
    # Built as a standalone Series to avoid chained in-place assignment on a column.
    total_value = combined_cohorts.sum(axis=1).replace(0.0, np.nan)
    total_value.iloc[0] = bet
    combined_cohorts['Total_Value'] = total_value.ffill()

    # Fill individual cohorts: `bet` before the cohort is opened, previous value where
    # nothing was invested
    combined_cohorts = combined_cohorts.fillna(bet).replace(0.0, np.nan).ffill()

    # Convert indexes to proper timestamps
    combined_cohorts.index = combined_cohorts.index.to_timestamp()

    return combined_cohorts

### Performance visualization and auxiliary functions

In [None]:
def calculate_cagr(series: pd.DataFrame) -> float:
    """
    Compound annual growth rate between the first and the last observation.

    The elapsed time is taken from the (datetime-like) index and expressed in
    years of 365.25 days.
    """
    first_date, last_date = series.index[0], series.index[-1]
    elapsed_years = (last_date - first_date).days / 365.25
    growth_factor = series.iloc[-1] / series.iloc[0]
    return growth_factor ** (1 / elapsed_years) - 1

def calculate_max_drawdown(series: pd.DataFrame) -> float:
    """
    Largest relative drop from a running peak.

    The result is zero or negative, e.g. -0.25 for a 25% drawdown.
    """
    running_peak = series.cummax()
    relative_drop = (series - running_peak) / running_peak
    return relative_drop.min()

def calculate_sr(series: pd.DataFrame, risk_free_rate: float = .01) -> float:
    """
    Sharpe-style ratio of the period-over-period returns of `series`.

    `risk_free_rate` is subtracted from the mean per-period return as-is and the
    result is divided by the standard deviation of those returns; nothing is annualized.
    """
    period_returns = series.pct_change()
    excess_return = period_returns.mean() - risk_free_rate
    return excess_return / period_returns.std()

def perf_sheet(strategy: pd.Series, benchmark: pd.Series):
    """
    Visualize performance of strategy vs. benchmark.

    Plots both 'Total_Value' equity curves on a single chart and prints a table with
    CAGR, maximum drawdown and Sharpe ratio for each of them.

    Parameters:
    strategy (pd.DataFrame):  Simulated strategy, must contain a 'Total_Value' column
    benchmark (pd.DataFrame): Simulated benchmark, must contain a 'Total_Value' column
    """

    # Equity curves keyed by the label used in both the chart legend and the metrics table
    equity_curves = {
        'Strategy': strategy['Total_Value'],
        'Benchmark': benchmark['Total_Value'],
    }

    # Risk adjusted performance metrics, one column per metric and one row per curve
    metric_functions = {
        'CAGR': calculate_cagr,
        'Max_DD': calculate_max_drawdown,
        'SR': calculate_sr,
    }
    metrics_data = {
        metric_name: [metric(curve) for curve in equity_curves.values()]
        for metric_name, metric in metric_functions.items()
    }

    # Plot equity growth
    plt.figure(figsize=(14, 5))
    for curve_label, curve in equity_curves.items():
        plt.plot(curve, label=curve_label)
    plt.title('Equity Growth Over Time')
    plt.xlabel('Date')
    plt.ylabel('Total Portfolio Value')
    plt.legend()
    plt.show()

    # Render every metric with four decimals before printing the table
    metrics_df = pd.DataFrame(metrics_data, index=list(equity_curves))
    for metric_name in metrics_df.columns:
        metrics_df[metric_name] = metrics_df[metric_name].map('{:.4f}'.format)

    print("Performance metrics:")
    print(metrics_df)

### UI

In [None]:
# Module-level state shared between button clicks (rebound inside on_button_clicked)
tickers_list = np.empty(0, dtype=str)  # tickers of the most recently uploaded universe
returns = None                         # return data used for the simulations
universe = None                        # historical data for the current tickers

# --- Input widgets ---------------------------------------------------------

# Upload of a single .txt file with the list of tickers
universe_upload = widgets.FileUpload(
    description='Click to upload list of tickers',
    accept='.txt',
    multiple=False,
    layout=widgets.Layout(height='auto', width='auto')
)

# API key passed on to the data provider
fmp_key = widgets.Text(
    description='FMP Key:',
    placeholder='Insert FMP key here',
    value='',
    disabled=False
)

# Amount invested into each monthly cohort
inp_dca_amount = widgets.BoundedFloatText(
    description='$ :',
    value=100.0,
    min=1.0,
    max=1000000.0,
    step=1.0,
    disabled=False
)

# Number of stocks picked each month
k_param = widgets.BoundedIntText(
    description='K:',
    value=5,
    min=1,
    max=1000,
    step=1,
    disabled=False
)

# Strategy ranking method (the benchmark run is always added by the click handler)
sel_ranking_method = widgets.Dropdown(
    description='Ranking Method:',
    options=['losers', 'conditional_losers'],
    value='losers',
    disabled=False,
)

# Loss threshold in percent; the click handler divides it by 100
threshold_param = widgets.BoundedFloatText(
    description='Loss threshold:',
    value=1.0,
    min=0.01,
    max=99.9,
    step=0.1,
    disabled=False
)

# --- Feedback and action widgets --------------------------------------------

# Status line shown below the button
message_label = widgets.Label(value='', layout=widgets.Layout(height='auto', width='auto'))

button = widgets.Button(
    description='Run Simulation',
    tooltip='Run Simulation',
    icon='check',
    button_style='',
    disabled=False
)

# --- Output areas, one per result tab ---------------------------------------
perf_output = widgets.Output()
df_output = widgets.Output()
down_output = widgets.Output()

# Button click function
def on_button_clicked(b):
    """
    Handle a click on the 'Run Simulation' button.

    Reads the uploaded ticker list and the UI parameters, downloads historical data when
    the ticker list changed (or no data has been loaded yet), runs the strategy and the
    benchmark simulation and fills the three output tabs (equity graph, cohort table,
    CSV download link).

    Parameters:
    b: The clicked button widget (unused, required by the `on_click` callback signature).
    """
    global tickers_list
    global universe
    global returns

    # Start every run with empty output tabs
    for output in (df_output, perf_output, down_output):
        with output:
            clear_output()

    # read uploaded ticker list
    files = universe_upload.value
    if not files:
        message_label.value = 'Please upload a list of tickers first'
        return
    # ipywidgets 7 exposes uploads as a dict keyed by file name, ipywidgets 8 as a tuple of dicts
    uploaded_file = next(iter(files.values())) if isinstance(files, dict) else files[0]
    # 'utf-8-sig' drops a leading byte order mark that would otherwise stick to the first ticker
    raw_tickers = bytes(uploaded_file['content']).decode('utf-8-sig')
    # Blank lines (e.g. a trailing newline) are not tickers
    tickers = np.array([line.strip() for line in raw_tickers.splitlines() if line.strip()], dtype=str)
    if len(tickers) == 0:
        message_label.value = 'The uploaded file does not contain any tickers'
        return
    message_label.value = f'Running with universe of {len(tickers)} tickers'

    # Download again when the ticker list changed or a previous download never completed
    reload = returns is None or not np.array_equal(tickers_list, tickers)

    # read UI parameters
    api_key = fmp_key.value
    dca_amount = float(inp_dca_amount.value)
    K = int(k_param.value)
    loss_threshold = float(threshold_param.value) / 100  # UI value is in percent
    ranking_method = sel_ranking_method.value

    # establish universe
    if reload:
        toolkit = Toolkit(
            tickers=tickers.tolist(),
            api_key=api_key,
        )

        with perf_output:
            print("Downloading historical data. This might take a while...")
        historical_data = toolkit.get_historical_data(progress_bar=False, period='monthly')
        loaded_returns = historical_data['Return'].drop(columns=['Benchmark'], errors='ignore')

        # Commit the shared state only after the download succeeded, so that a failed
        # download is retried on the next click instead of leaving `returns` unset
        universe = historical_data
        returns = loaded_returns
        tickers_list = tickers

    # run simulation
    with perf_output:
        print("Performing strategy backtest. This might take a while...")
    strategy = simulate(returns, K=K, bet=dca_amount, ranking_method=ranking_method, loss_threshold=loss_threshold)

    with perf_output:
        print("Creating benchmark. This might take a while...")
    benchmark = simulate(returns, K=K, bet=dca_amount, ranking_method='benchmark')

    # visualize everything
    with perf_output:
        perf_sheet(strategy, benchmark)
    with df_output:
        display(strategy, clear=True)
    with down_output:
        # Embed the CSV into the link itself as a base64 data URI
        filename = 'strategy.csv'
        payload = base64.b64encode(strategy.to_csv().encode('utf-8')).decode()
        html = f'<a download="{filename}" href="data:text/csv;base64,{payload}">Download results</a>'
        display(widgets.HTML(html), clear=True)


# Run the simulation whenever the button is clicked
button.on_click(on_button_clicked)

# Structure the UI: one tab per output area
children = [perf_output, df_output, down_output]
tab = widgets.Tab()
tab.children = children
for tab_index, tab_title in enumerate(['Equity Graph', 'Cohorts and Strategy', 'Download results']):
    tab.set_title(tab_index, tab_title)

# Display the UI (must stay the last expression of the cell so the notebook renders it)
widgets.VBox([
    universe_upload,
    widgets.HBox([fmp_key, inp_dca_amount, k_param]),
    widgets.HBox([sel_ranking_method, threshold_param]),
    button,
    message_label,
    tab,
])

VBox(children=(FileUpload(value={}, accept='.txt', description='Click to upload list of tickers', layout=Layou…