In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
def moving_average(data, window):
    """Return the simple moving average of the 'Open' column.

    Args:
        data: DataFrame that contains an 'Open' column.
        window: Window size forwarded to ``Series.rolling``.

    Returns:
        Series aligned with ``data``. With an integer window the first
        ``window - 1`` entries are NaN because the window is not yet full.
    """
    open_prices = data['Open']
    rolling_view = open_prices.rolling(window=window)
    return rolling_view.mean()

In [3]:
def momentum(data, window):
    """Return the change in 'Open' relative to ``window - 1`` rows earlier.

    Args:
        data: DataFrame that contains an 'Open' column.
        window: Number of rows spanned by the momentum window; the lag used
            for the difference is ``window - 1``.

    Returns:
        Series aligned with ``data``; the first ``window - 1`` entries are NaN.
    """
    lag = window - 1
    open_prices = data['Open']
    return open_prices.diff(lag)

In [4]:
def calculate_rsi(data, window=14):
    """Compute a Relative Strength Index from the 'Close' column.

    Gains and losses are averaged with a plain rolling mean over ``window``
    rows (not Wilder's exponential smoothing).

    Args:
        data: DataFrame that contains a 'Close' column.
        window: Number of rows in each rolling average.

    Returns:
        Series of RSI values aligned with ``data``. Rows before the first full
        window are NaN, as are rows whose window has no gains and no losses.
    """
    change = data['Close'].diff()

    # Split the changes into upward and (positive-valued) downward moves.
    # The first row has no previous close; its undefined change counts as 0.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    avg_gain = ups.rolling(window=window).mean()
    avg_loss = downs.rolling(window=window).mean()

    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))

In [5]:
def adx(data, window=14):
    """Compute ADX, +DI and -DI from High/Low/Close prices.

    All smoothing uses a plain rolling mean over ``window`` rows. The input
    frame is only read; no working columns are added to it.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        window: Number of rows used for every rolling average.

    Returns:
        Tuple ``(adx_values, di_plus, di_minus)`` of Series aligned with
        ``data`` and named 'DX', 'DIplus' and 'DIminus' respectively.
    """
    high = data['High']
    low = data['Low']
    prev_close = data['Close'].shift()

    # True range: the largest of the bar's range and the two gaps from the
    # previous close (NaN gaps on the first row are skipped by max()).
    true_range = pd.concat(
        [(high - low).abs(), (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    up_move = high - high.shift()
    down_move = low.shift() - low

    # Directional movement counts only when the move is the larger of the two
    # AND positive; otherwise an inside bar (lower high, higher low) would
    # contribute a negative value and could drive DI below zero.
    dm_plus = pd.Series(
        np.where((up_move > down_move) & (up_move > 0), up_move, 0.0),
        index=data.index,
    )
    dm_minus = pd.Series(
        np.where((down_move > up_move) & (down_move > 0), down_move, 0.0),
        index=data.index,
    )

    atr = true_range.rolling(window=window).mean()
    smoothed_plus = dm_plus.rolling(window=window).mean()
    smoothed_minus = dm_minus.rolling(window=window).mean()

    di_plus = ((smoothed_plus / atr) * 100).rename('DIplus')
    di_minus = ((smoothed_minus / atr) * 100).rename('DIminus')

    # DX is NaN when both DI values are zero (0 / 0).
    dx = (di_plus - di_minus).abs() / (di_plus + di_minus) * 100
    adx_values = dx.rolling(window=window).mean().rename('DX')

    return (adx_values, di_plus, di_minus)

In [6]:
def trading_strategy(data, rsi_window=None, overbought_threshold=None, oversold_threshold=None, adx_window=None, adx_threshold=None, short_window=None, long_window=None, momentum_window=None):
    """Build buy/sell signals from any combination of four indicators.

    An indicator is enabled by passing its (truthy) window argument. The
    indicators are applied in a fixed order -- moving-average crossover,
    momentum, RSI, ADX/DI crossover -- and each one overwrites the 'signal'
    value on the rows where it fires, so later indicators take precedence.

    Args:
        data: DataFrame with a 'Date' column plus the price columns the
            enabled indicators read ('Open', 'Close', 'High', 'Low').
        rsi_window: Enables RSI; requires both thresholds.
        overbought_threshold: RSI above this value produces a sell (-1.0).
        oversold_threshold: RSI below this value produces a buy (1.0).
        adx_window: Enables the +DI/-DI crossover; requires ``adx_threshold``.
        adx_threshold: Crossovers only count while ADX is above this value.
        short_window: Enables the MA crossover; requires ``long_window``.
        long_window: Window of the slower moving average.
        momentum_window: Enables momentum (positive -> buy, negative -> sell).

    Returns:
        DataFrame indexed like ``data`` with 'Date', 'signal' (1.0 buy,
        -1.0 sell, 0.0 none) and one or more columns per enabled indicator.

    Raises:
        ValueError: If an indicator is enabled without its companion
            parameters.
    """
    def _require(indicator, **params):
        # Fail early with a readable message instead of an opaque error from
        # pandas when a companion parameter was left as None.
        missing = [name for name, value in params.items() if value is None]
        if missing:
            raise ValueError(f"{indicator} requires {', '.join(missing)} to be set")

    signals = pd.DataFrame(index=data.index)
    signals['Date'] = pd.to_datetime(data['Date'])
    signals['signal'] = 0.0

    # Moving average crossover
    if short_window:
        _require('Moving average crossover', long_window=long_window)
        signals['short_mavg'] = moving_average(data, short_window)
        signals['long_mavg'] = moving_average(data, long_window)

        was_below = signals['short_mavg'].shift(1) < signals['long_mavg'].shift(1)
        was_above = signals['short_mavg'].shift(1) > signals['long_mavg'].shift(1)
        is_above = signals['short_mavg'] > signals['long_mavg']
        is_below = signals['short_mavg'] < signals['long_mavg']

        # Buy when the short average crosses above the long one, sell when it
        # crosses below.
        signals.loc[was_below & is_above, 'signal'] = 1.0
        signals.loc[was_above & is_below, 'signal'] = -1.0

    # Momentum
    if momentum_window:
        signals['momentum'] = momentum(data, momentum_window)
        signals.loc[(signals['momentum'] > 0), 'signal'] = 1.0
        signals.loc[(signals['momentum'] < 0), 'signal'] = -1.0

    # RSI
    if rsi_window:
        _require('RSI', overbought_threshold=overbought_threshold, oversold_threshold=oversold_threshold)
        signals['rsi'] = calculate_rsi(data, window=rsi_window)
        signals.loc[signals['rsi'] > overbought_threshold, 'signal'] = -1.0
        signals.loc[signals['rsi'] < oversold_threshold, 'signal'] = 1.0

    # ADX-filtered +DI / -DI crossover
    if adx_window:
        _require('ADX', adx_threshold=adx_threshold)
        adx_values = adx(data, window=adx_window)
        signals['adx'] = adx_values[0]
        signals['+DI'] = adx_values[1]
        signals['-DI'] = adx_values[2]

        strong_trend = signals['adx'] > adx_threshold
        plus_crossed_up = (signals['+DI'].shift(1) < signals['-DI'].shift(1)) & (signals['+DI'] > signals['-DI'])
        plus_crossed_down = (signals['+DI'].shift(1) > signals['-DI'].shift(1)) & (signals['+DI'] < signals['-DI'])

        # Buy when +DI crosses above -DI, sell when it crosses below, but only
        # while ADX is above the threshold.
        signals.loc[plus_crossed_up & strong_trend, 'signal'] = 1.0
        signals.loc[plus_crossed_down & strong_trend, 'signal'] = -1.0

    return signals

In [60]:
# Load the MSFT price history and keep rows dated 2015-01-01 or later.
# NOTE(review): 'Date' is compared as a string, which presumably relies on
# ISO-formatted (YYYY-MM-DD...) values in the CSV -- confirm.
df = pd.read_csv("MSFT_hist.csv")
# Take an explicit copy so the filtered frame is independent of the raw one;
# assigning columns to a filtered slice raises SettingWithCopyWarning.
df = df[df['Date'] >= '2015-01-01'].copy()

In [61]:
# Run the strategy under several parameter sets, writing each result to its own
# CSV. After the loop `signals` holds the last (all-indicator) run.
# Single-indicator variants that can be added the same way:
#   dict(adx_window=14, adx_threshold=25)
#   dict(rsi_window=14, overbought_threshold=80, oversold_threshold=40)
signal_runs = [
    ("MSFT_signal.csv", dict(short_window=10, long_window=20)),
    ("MSFT__momentum_signal.csv", dict(momentum_window=5)),
    (
        "MSFT_complete_signal.csv",
        dict(
            short_window=10,
            long_window=30,
            momentum_window=5,
            adx_window=14,
            adx_threshold=25,
            rsi_window=14,
            overbought_threshold=80,
            oversold_threshold=40,
        ),
    ),
]
for csv_path, strategy_kwargs in signal_runs:
    signals = trading_strategy(df, **strategy_kwargs)
    signals.to_csv(csv_path)

In [66]:
# Legacy module-level names. backtest_strategy keeps its trade state in local
# variables, so these are retained only for cells that may still reference them.
sl = 0
tp = 0
buy = False
sell = False
pnl = 0
amount = 0


def backtest_strategy(data, initial_cash, position_size, transaction_cost, stop_loss, take_profit, **kwargs):
    """Backtest ``trading_strategy`` with fixed-distance stop-loss / take-profit.

    At most one position is open at a time. A buy (1.0) or sell (-1.0) signal
    opens a long or short position at the bar's close if the account is flat
    and holds at least the position's notional in cash; the notional is set
    aside from cash while the position is open. The position is closed when
    the close reaches the take-profit or stop-loss level, and the booked
    profit/loss is the fixed ``take_profit`` / ``stop_loss`` distance times
    ``position_size`` (the actual close is not used). A position still open
    on the last bar is closed at that bar's close.

    All trade state is local, so repeated calls are independent.

    Args:
        data: Price DataFrame passed to ``trading_strategy``; 'Close' is used
            as the trade price.
        initial_cash: Starting cash balance.
        position_size: Number of units per trade.
        transaction_cost: Fraction of notional charged when a position is
            opened (nothing is charged on exit).
        stop_loss: Adverse price distance that closes a position at a loss.
        take_profit: Favourable price distance that closes it at a profit.
        **kwargs: Indicator settings forwarded to ``trading_strategy``.

    Returns:
        DataFrame indexed like the signals with per-bar 'Cash', 'Position'
        (non-zero only on entry bars), cumulative 'pnl', the active stop and
        target levels ('g_sl', 'g_tp') and 'close'. Recorded values other
        than 'Position' are truncated to whole numbers.
    """
    signals = trading_strategy(data, **kwargs)

    # Initialize portfolio
    portfolio = pd.DataFrame(index=signals.index)
    portfolio['Cash'] = initial_cash
    portfolio['Position'] = 0.0
    portfolio['pnl'] = 0
    portfolio['g_sl'] = 0
    portfolio['g_tp'] = 0
    portfolio['close'] = 0

    if portfolio.empty:
        # Nothing to simulate (and no first/last row to report on).
        return portfolio

    cash_balance = initial_cash
    realized_pnl = 0
    direction = 0        # +1 long, -1 short, 0 flat
    entry_price = 0
    stop_level = 0
    target_level = 0
    last_index = signals.index[-1]

    for index, row in signals.iterrows():
        signal = row['signal']
        price = data.loc[index, 'Close']
        notional = price * position_size

        # Entry: only when flat and the notional can be covered by cash.
        if direction == 0 and signal in (1.0, -1.0) and cash_balance >= notional:
            direction = 1 if signal == 1.0 else -1
            entry_price = price
            # The fee is always a deduction, for longs and shorts alike.
            cash_balance -= notional + notional * transaction_cost
            # Longs profit above the entry price, shorts below it.
            target_level = price + direction * take_profit
            stop_level = price - direction * stop_loss
            portfolio.loc[index, 'Position'] = direction * position_size

        # Exit: take-profit, stop-loss, or forced close on the final bar.
        if direction != 0:
            if direction == 1:
                hit_target = price >= target_level
                hit_stop = price <= stop_level
            else:
                hit_target = price <= target_level
                hit_stop = price >= stop_level

            trade_pnl = None
            if hit_target:
                trade_pnl = take_profit * position_size
            elif hit_stop:
                trade_pnl = -(stop_loss * position_size)
            elif index == last_index:
                # Mark the still-open position to the last close.
                trade_pnl = direction * (price - entry_price) * position_size

            if trade_pnl is not None:
                # Release the notional set aside at entry plus the result.
                cash_balance += entry_price * position_size + trade_pnl
                realized_pnl += trade_pnl
                direction = 0
                entry_price = 0
                stop_level = 0
                target_level = 0

        # Update portfolio value
        portfolio.loc[index, 'Cash'] = int(cash_balance)
        portfolio.loc[index, 'pnl'] = int(realized_pnl)
        portfolio.loc[index, 'g_sl'] = int(stop_level)
        portfolio.loc[index, 'g_tp'] = int(target_level)
        portfolio.loc[index, 'close'] = int(price)

    print("Starting Balance:",portfolio.iloc[0]['Cash'],", Final Balance:",portfolio.iloc[-1]['Cash'],", total Profit/Loss:",portfolio.iloc[-1]['pnl'])
    return portfolio

# Example: backtest the momentum-only strategy on the loaded prices and save
# the per-bar portfolio to CSV.
backtest_params = dict(
    initial_cash=10000,
    position_size=1,
    transaction_cost=0,
    momentum_window=5,
    stop_loss=4,
    take_profit=4,
)
portfolio = backtest_strategy(df, **backtest_params)
portfolio.to_csv("MSFT_portfolio.csv")

Starting Balance: 10000.0 , Final Balance: 10052.0 , total Profit/Loss: 52.0


In [None]:
# Plot the Open price together with the moving averages held in `signals`.
# The averages are computed from the 'Open' column, so the price series shown
# is 'Open' as well. `signals` is expected to come from a run with
# short_window=10 and long_window=30 (the all-indicator run above).
fig, ax = plt.subplots(figsize=(26, 12))
dates = signals['Date'].values
ax.plot(dates, df['Open'].values, label='Open')
ax.plot(dates, signals['short_mavg'].values, label='10-day Moving Average')
ax.plot(dates, signals['long_mavg'].values, label='30-day Moving Average')
ax.set_title('Stock Open Price and Moving Averages')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()
plt.show()

In [None]:
# Plot the Close price (left axis) against the RSI and its thresholds (right
# axis). RSI is bounded to 0-100, so it gets its own axis instead of sharing
# the price scale.
fig, price_ax = plt.subplots(figsize=(26, 12))
dates = signals['Date'].values
price_ax.plot(dates, df['Close'].values, label='Close')
price_ax.set_xlabel('Date')
price_ax.set_ylabel('Price')

rsi_ax = price_ax.twinx()
rsi_ax.plot(dates, signals['rsi'].values, color='tab:orange', label='rsi')
rsi_ax.axhline(y=80, color='r', linestyle='--', label='Overbought')
rsi_ax.axhline(y=40, color='g', linestyle='--', label='Oversold')
rsi_ax.set_ylabel('RSI')

# Merge the legend entries of both axes into a single legend.
legend_handles, legend_labels = [], []
for axis in (price_ax, rsi_ax):
    handles, labels = axis.get_legend_handles_labels()
    legend_handles.extend(handles)
    legend_labels.extend(labels)
price_ax.legend(legend_handles, legend_labels)

price_ax.set_title('Stock Close Price and RSI')
plt.show()

In [None]:
# Plot the directional indicators and ADX with the ADX threshold line.
fig, ax = plt.subplots(figsize=(14, 7))
dates = signals['Date'].values
ax.plot(dates, signals['+DI'].values, label='+DI')
ax.plot(dates, signals['-DI'].values, label='-DI')
ax.plot(dates, signals['adx'].values, label='ADX')
ax.axhline(y=25, color='r', linestyle='--', label='Threshold')
ax.set_xlabel('Date')
ax.set_ylabel('Indicator value')
ax.set_title('ADX, +DI, -DI')
ax.legend()
plt.show()