In [131]:
import yfinance as yf
import pandas as pd
import plotly.graph_objs as go

In [262]:
def fetch_data(ticker, start_date='2021-01-01', end_date='2024-01-01'):
    """Fetch historical data for the given ticker."""
    return yf.download(ticker, start=start_date, end=end_date)


def ewmac(price, vol, Lfast, Lslow):
    """
    Calculate the ewmac trading rule forecast, given a price, volatility and EWMA speeds Lfast and Lslow

    Assumes that 'price' and vol is daily data

    This version uses a precalculated price volatility, and does not do capping or scaling

    :param price: The price or other series to use (assumed Tx1)
    :type price: pd.Series

    :param vol: The daily price unit volatility (NOT % vol)
    :type vol: pd.Series aligned to price

    :param Lfast: Lookback for fast in days
    :type Lfast: int

    :param Lslow: Lookback for slow in days
    :type Lslow: int

    :returns: pd.Series -- unscaled, uncapped forecast
    """
    fast_ewma = price.ewm(span=Lfast, min_periods=1).mean()
    slow_ewma = price.ewm(span=Lslow, min_periods=1).mean()
    raw_ewmac = fast_ewma - slow_ewma
    return raw_ewmac / vol.ffill()


def calculate_ewmac_signals(price, volatility, parameters):
    """
    Calculate EWMAC signals for a given price series, volatility, and a list of parameters.

    :param price: The price series
    :type price: pd.Series

    :param volatility: The volatility series
    :type volatility: pd.Series

    :param parameters: List of tuples containing fast and slow parameters
    :type parameters: list of tuples

    :returns: pd.DataFrame -- DataFrame containing EWMAC signals for each parameter pair
    """
    signals = pd.DataFrame(index=price.index)
    for i, (Lfast, Lslow) in enumerate(parameters):
        signals[f'EWMAC_{Lfast}'] = ewmac(price, volatility, Lfast, Lslow)
    return signals


def scale_cap_forecast(signals, cap=20):
    """
    Scale the forecast so it has an expected absolute average value of 10.

    :param signals: EWMAC signals.
    :return: Scaled signals.
    """
    signals = signals * 10 / signals.abs().mean()
    return signals.clip(-cap, cap)

def calculate_position_size(capped_signals, risk_target, volatility, price, capital, instrument_weights, fx_rate, IDM):
    """
    Calculate the position size based on capped signals, risk target, and other parameters.

    :param capped_signals: DataFrame containing capped EWMAC signals.
    :param risk_target: Annualized risk target.
    :param volatility: Current volatility of the instrument.
    :param price: Current price of the instrument.
    :param capital: Total capital available for trading.
    :param instrument_weights: Weights of instruments in the portfolio.
    :param fx_rate: Exchange rate if applicable.
    :param IDM: Instrument Diversification Multiplier.
    :return: Position size.
    """
    # Calculate the average of capped signals
    average_signal = capped_signals.mean(axis=1)
    
    # Calculate risk-adjusted unrounded position
    position = average_signal * (risk_target / volatility) * (capital * instrument_weights * fx_rate) / (price * IDM)
    
    return position

def plot_position_size(data, position_size):
    """
    Plot position size against underlying price with buy/sell signals.

    :param data: DataFrame containing price data.
    :type data: pd.DataFrame

    :param position_size: Position size data.
    :type position_size: pd.Series
    """
    # Calculate the range for the position size and the underlying price
    position_size_range = position_size.max() - position_size.min()
    price_range = data['Close'].max() - data['Close'].min()

    # Determine the scaling factor for the position size
    position_size_scaling_factor = price_range / position_size_range

    # Plot position size against underlying price
    fig = go.Figure()

    # Add underlying price trace
    fig.add_trace(go.Scatter(x=data.index, y=data['Close'], mode='lines', name='Underlying Price'))

    # Add position size trace
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size * position_size_scaling_factor, mode='lines', name='Position Size'))

    # Add buy and sell signals
    buys = position_size[position_size > 0]
    sells = position_size[position_size < 0]
    fig.add_trace(go.Scatter(x=buys.index, y=data.loc[buys.index]['Close'], mode='markers', name='Buy Signal', marker=dict(color='green', size=8)))
    fig.add_trace(go.Scatter(x=sells.index, y=data.loc[sells.index]['Close'], mode='markers', name='Sell Signal', marker=dict(color='red', size=8)))

    # Update layout
    fig.update_layout(
        title='Position Size vs Underlying Price with Buy/Sell Signals',
        xaxis_title='Date',
        yaxis_title='Price/Position Size',
        yaxis=dict(tickformat=".2f"),
        yaxis2=dict(
            title='Position Size',
            overlaying='y',
            side='right',
            tickformat=".2f"
        ),
        margin=dict(l=0, r=0, t=50, b=0),
    )

    # Show plot
    fig.show()

def calculate_backtest_metrics(position_size, price_data, capital):
    """
    Calculate various backtest metrics based on position size and price data.

    :param position_size: Series containing position size data.
    :param price_data: Series containing price data.

    :return: Dictionary containing backtest metrics.
    """
    # Join the position size and price data on their date index
    data = pd.concat([position_size, price_data], axis=1, join="inner")
    
    # Drop any rows with missing values
    data = data.dropna()

    # Calculate P&L (Profit and Loss)
    pnl = position_size * data['Close'].pct_change()
    
    # Calculate cumulative P&L
    cumulative_pnl = pnl.cumsum()

    # Calculate Sharpe ratio
    sharpe_ratio = pnl.mean() / pnl.std() * (252 ** 0.5)

    # Calculate annualized return
    annualized_return = pnl.mean() * 252

    annualized_return_pct = annualized_return*100/capital

    # Additional metrics
    drawdowns = (cumulative_pnl - cumulative_pnl.expanding().max())
    avg_drawdown = drawdowns.mean()

    # Calculate average position size
    avg_position = position_size.abs().mean()
    turnover = (position_size.diff().abs() / avg_position).sum()
    skew = pnl.skew()
    lower_tail = pnl.quantile(0.05)
    upper_tail = pnl.quantile(0.95)
    alpha = (cumulative_pnl.iloc[-1] / cumulative_pnl.std()) * (252 ** 0.5)
    
    return {
        "Initial Capital": capital,
        "Mean Annualised Return  %": annualized_return_pct,
        "Avg Drawdown %": avg_drawdown,
        "Standard Deviation %": pnl.std(),
        "Sharpe Ratio": sharpe_ratio,
        "Turnover": turnover,
        "Skew": skew,
        "Lower Tail (5th percentile)": lower_tail,
        "Upper Tail (95th percentile)": upper_tail,
        "Alpha": alpha,
        "Cumulative PnL": cumulative_pnl[-1]
    }
    
def plot_position_size(position_size, price):
    """
    Plot position size against underlying price with vertical lines indicating periods of long and short positions.

    :param position_size: Position size data.
    :param price: Underlying price data.
    """
    fig = go.Figure()

    # Plot underlying price
    fig.add_trace(go.Scatter(x=price.index, y=price, mode='lines', name='Price', line=dict(color='blue')))

    # Create a separate y-axis for position size
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size, mode='lines', name='Position Size', line=dict(color='red'), yaxis='y2'))

    # Add fill areas for long and short positions
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size * (position_size > 0), mode='lines', fill='tozeroy', name='Long Position', fillcolor='rgba(0,255,0,0.3)', line=dict(width=0), hoverinfo='skip', yaxis='y2'))
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size * (position_size < 0), mode='lines', fill='tozeroy', name='Short Position', fillcolor='rgba(255,0,0,0.3)', line=dict(width=0), hoverinfo='skip', yaxis='y2'))

    # Add vertical lines for direction change
    for i in range(1, len(position_size)):
        if position_size[i] != 0 and position_size[i - 1] == 0:
            fig.add_shape(type="line", xref="x", yref="paper", x0=position_size.index[i], x1=position_size.index[i], y0=0, y1=1, line=dict(color="green" if position_size[i] > 0 else "red", width=1), opacity=0.1)

    # Set layout
    fig.update_layout(title='Position Size vs Underlying Price with Direction Change Indicators', xaxis_title='Date')

    # Ensure both y-axes share y=0
    fig.update_layout(yaxis=dict(title='Price', color='blue'),
                      yaxis2=dict(title='Position Size', color='red', overlaying='y', side='right'))
    
    fig.show()

def plot_position_size_vs_pnl(position_size, price):
    """
    Plot position size against PNL with vertical lines indicating periods of long and short positions.

    :param position_size: Position size data.
    :param pnl: Profit and Loss data.
    """
    pnl = (position_size * price.pct_change()).cumsum()
    fig = go.Figure()

    # Plot PNL
    fig.add_trace(go.Scatter(x=pnl.index, y=pnl, mode='lines', name='PNL', line=dict(color='blue')))

    # Create a separate y-axis for position size
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size, mode='lines', name='Position Size', line=dict(color='red'), yaxis='y2'))

    # Add fill areas for long and short positions
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size * (position_size > 0), mode='lines', fill='tozeroy', name='Long Position', fillcolor='rgba(0,255,0,0.3)', line=dict(width=0), hoverinfo='skip', yaxis='y2'))
    fig.add_trace(go.Scatter(x=position_size.index, y=position_size * (position_size < 0), mode='lines', fill='tozeroy', name='Short Position', fillcolor='rgba(255,0,0,0.3)', line=dict(width=0), hoverinfo='skip', yaxis='y2'))

    # Add vertical lines for direction change
    for i in range(1, len(position_size)):
        if position_size[i] != 0 and position_size[i - 1] == 0:
            fig.add_shape(type="line", xref="x", yref="paper", x0=position_size.index[i], x1=position_size.index[i], y0=0, y1=1, line=dict(color="green" if position_size[i] > 0 else "red", width=1), opacity=0.1)

    # Set layout
    fig.update_layout(title='Position Size vs PNL with Direction Change Indicators', xaxis_title='Date')

    # Ensure both y-axes share y=0
    fig.update_layout(yaxis=dict(title='PNL', color='blue'),
                      yaxis2=dict(title='Position Size', color='red', overlaying='y', side='right'))
    
    fig.show()


In [263]:
def backtest(ticker = 'BZ=F', start_date = '2021-01-01', end_date = '2024-01-01', parameters = [(8, 32), (16, 64), (32, 128)],
             risk_target = 0.25, capital = 10000, plot=True):

    instrument_weights = 1 # Example instrument weights
    fx_rate = 1.0  # Example exchange rate if applicable
    idm = 2  # Example Instrument Diversification Multiplier

    # Fetch data
    data = fetch_data(ticker, start_date, end_date)

    # Calculate daily returns
    data['Daily_Returns'] = data['Close'].pct_change()

    # Calculate volatility
    data['Volatility'] = data['Daily_Returns'].ewm(span=30, min_periods=0).std()

    # Calculate EWMAC signals
    signals = calculate_ewmac_signals(data['Close'], data['Volatility'], parameters)

    # Scale and cap the signals
    scaled_signals = scale_cap_forecast(signals)

    # Calculate position size
    position_size = calculate_position_size(scaled_signals, risk_target, data['Volatility'], data['Close'], capital, instrument_weights, fx_rate, idm)

    backtest_metrics = calculate_backtest_metrics(position_size, data['Close'], capital)
    
    # Print the calculated metrics
    for metric, value in backtest_metrics.items():
        print(f"{metric}: {round(value,2)}")

    if plot==True:
        plot_position_size(position_size, data['Close'])
        plot_position_size_vs_pnl(position_size, data['Close'])

In [264]:
backtest()

[*********************100%***********************]  1 of 1 completed
Initial Capital: 10000
Mean Annualised Return  %: 26.37
Avg Drawdown %: -804.54
Standard Deviation %: 149.29
Sharpe Ratio: 1.11
Turnover: 75.51
Skew: -0.03
Lower Tail (5th percentile): -237.4
Upper Tail (95th percentile): 269.63
Alpha: 56.29
Cumulative PnL: 7849.05


In [258]:
import plotly.graph_objects as go

def plot_pnl_subplots(pnl):
    """
    Plot pnl and cumulative pnl in separate subplots.

    :param pnl: PnL data.
    :param cumulative_pnl: Cumulative PnL data.
    """
    cumulative_pnl=pnl.cumsum()
    # Create figure with subplots
    fig = go.Figure()

    # Add trace for PnL
    fig.add_trace(go.Scatter(x=pnl.index, y=pnl, mode='lines', name='PnL', line=dict(color='blue')))

    # Add trace for Cumulative PnL
    fig.add_trace(go.Scatter(x=cumulative_pnl.index, y=cumulative_pnl, mode='lines', name='Cumulative PnL', line=dict(color='red')))

    # Update layout for subplot
    fig.update_layout(title='PnL and Cumulative PnL Subplots',
                      xaxis_title='Date',
                      yaxis_title='PnL',
                      yaxis=dict(title='PnL', color='blue'),
                      yaxis2=dict(title='Cumulative PnL', overlaying='y', side='right', color='red'))

    # Show plot
    fig.show()

# Call the function with your pnl and cumulative pnl data
plot_pnl_subplots(pnl)
