# Algorithmic Trading: From Technical Analysis to a PyTorch Neural Network (Refined)

This notebook walks through a complete workflow for developing and evaluating algorithmic trading strategies. This version includes **refined strategy logic** to ensure more realistic trade signal generation and a **more robust backtesting engine** for accurate performance metrics.

**Workflow:**
1.  **Refined Strategy Definition:** Create 5 complex, rule-based trading strategies with improved logic.
2.  **Data Fetching:** Download historical data for the top 10 NSE stocks.
3.  **Backtesting & Comparison:** Systematically backtest the 5 strategies using a state-based backtester and generate performance comparison tables.
4.  **Visual Analysis:** Plot strategy signals on a price chart to visually inspect performance.
5.  **Final DNN Model:** Train a robust DNN on combined data from multiple stocks and save it for future predictions.

## Step 1: Setup and Installations

In [None]:
!pip install pandas_ta tvdatafeed scikit-learn -q

In [None]:
import pandas as pd
import pandas_ta as ta
import numpy as np
import matplotlib.pyplot as plt
from tvDatafeed import TvDatafeed, Interval
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import joblib # For saving the scaler

# Suppress pandas warnings
pd.options.mode.chained_assignment = None

## Step 2: Refined Rule-Based Strategy Definitions

These functions have been refined to generate more consistent signals. Logic for S3 (Bollinger Squeeze) and S5 (ADX Trend) has been relaxed to be more adaptive.

In [None]:
def add_s1_triple_threat(df):
    """S1: Triple Threat (Supertrend, EMA, MACD).

    Works on a copy of ``df``. Supertrend(10, 3), EMA(50) and MACD(12, 26, 9)
    are appended through the pandas_ta accessor, then ``S1_Signal`` is set to:

    *  1 when Supertrend direction is 1, close is above EMA_50 and the MACD
       line has just crossed above its signal line on this bar;
    * -1 for the mirrored bearish case;
    *  0 otherwise.
    """
    out = df.copy()
    out.ta.supertrend(length=10, multiplier=3, append=True,
                      col_names=('SUPERT', 'SUPERTd', 'SUPERTl', 'SUPERTs'))
    out.ta.ema(length=50, append=True, col_names='EMA_50')
    out.ta.macd(fast=12, slow=26, signal=9, append=True)

    macd_line = out['MACD_12_26_9']
    macd_signal = out['MACDs_12_26_9']
    prev_line = macd_line.shift(1)
    prev_signal = macd_signal.shift(1)

    # A cross needs the current ordering AND the opposite (or equal) ordering
    # on the previous bar.
    bullish_cross = (macd_line > macd_signal) & (prev_line <= prev_signal)
    bearish_cross = (macd_line < macd_signal) & (prev_line >= prev_signal)

    go_long = (out['SUPERTd'] == 1) & (out['close'] > out['EMA_50']) & bullish_cross
    go_short = (out['SUPERTd'] == -1) & (out['close'] < out['EMA_50']) & bearish_cross

    out['S1_Signal'] = np.select([go_long, go_short], [1, -1], default=0)
    return out

def add_s2_rsi_peak(df):
    """S2: RSI Peak/Trough with MACD Filter.

    Works on a copy of ``df``. RSI(14) and MACD(12, 26, 9) are appended
    through the pandas_ta accessor, then ``S2_Signal`` is set to:

    *  1 when the MACD histogram is positive and RSI has just dropped below 35
       (previous bar was at or above 35);
    * -1 when the MACD histogram is negative and RSI has just risen above 65
       (previous bar was at or below 65);
    *  0 otherwise.
    """
    out = df.copy()
    out.ta.rsi(length=14, append=True, col_names='RSI_14')
    out.ta.macd(fast=12, slow=26, signal=9, append=True)

    rsi_now = out['RSI_14']
    rsi_prev = rsi_now.shift(1)
    histogram = out['MACDh_12_26_9']

    go_long = (histogram > 0) & (rsi_now < 35) & (rsi_prev >= 35)
    go_short = (histogram < 0) & (rsi_now > 65) & (rsi_prev <= 65)

    out['S2_Signal'] = np.select([go_long, go_short], [1, -1], default=0)
    return out

def add_s3_bollinger_squeeze(df):
    """S3: Bollinger Band Squeeze Breakout (Refined).

    Works on a copy of ``df``. Bollinger Bands (20, 2) are computed with
    pandas_ta and joined onto the frame. ``S3_Signal`` is 1 / -1 when, during
    a squeeze (band width below its own 120-bar rolling mean) and on volume
    above 1.5x its 20-bar rolling mean, close is above the upper band / below
    the lower band. If the bands cannot be computed, or the expected width
    column is absent, ``S3_Signal`` is 0 on every row.
    """
    out = df.copy()
    bands = out.ta.bbands(length=20, std=2)

    # No usable band data: emit a neutral signal and leave the frame as is.
    if bands is None or bands.empty:
        out['S3_Signal'] = 0
        return out

    out = out.join(bands)
    if 'BBB_20_2.0' not in out.columns:
        out['S3_Signal'] = 0
        return out

    # REFINED LOGIC: Squeeze is when width is below its moving average
    band_width = out['BBB_20_2.0']
    in_squeeze = band_width < band_width.rolling(window=120).mean()
    volume_surge = out['volume'] > (out['volume'].rolling(window=20).mean() * 1.5)

    breakout_up = in_squeeze & (out['close'] > out['BBU_20_2.0']) & volume_surge
    breakout_down = in_squeeze & (out['close'] < out['BBL_20_2.0']) & volume_surge

    out['S3_Signal'] = np.select([breakout_up, breakout_down], [1, -1], default=0)
    return out

def add_s4_ema_stoch(df):
    """S4: Multi-Timeframe EMA with Stochastic Entry.

    Works on a copy of ``df``. EMA(20/50/100) are appended and Stochastic
    (14, 3, 3) is joined via pandas_ta. ``S4_Signal`` is:

    *  1 when EMA_20 > EMA_50 > EMA_100 and %K is below 20 and has just
       crossed above %D;
    * -1 when EMA_20 < EMA_50 < EMA_100 and %K is above 80 and has just
       crossed below %D;
    *  0 otherwise, including when the stochastic cannot be computed or its
       %K column is absent.
    """
    out = df.copy()
    for span in (20, 50, 100):
        out.ta.ema(length=span, append=True, col_names=f'EMA_{span}')

    stoch = out.ta.stoch(k=14, d=3, smooth_k=3)
    if stoch is None or stoch.empty:
        out['S4_Signal'] = 0
        return out

    out = out.join(stoch)
    if 'STOCHk_14_3_3' not in out.columns:
        out['S4_Signal'] = 0
        return out

    fast, mid, slow = out['EMA_20'], out['EMA_50'], out['EMA_100']
    uptrend = (fast > mid) & (mid > slow)
    downtrend = (fast < mid) & (mid < slow)

    pct_k = out['STOCHk_14_3_3']
    pct_d = out['STOCHd_14_3_3']
    prev_k = pct_k.shift(1)
    prev_d = pct_d.shift(1)

    oversold_cross_up = (pct_k < 20) & (pct_k > pct_d) & (prev_k <= prev_d)
    overbought_cross_down = (pct_k > 80) & (pct_k < pct_d) & (prev_k >= prev_d)

    out['S4_Signal'] = np.select(
        [uptrend & oversold_cross_up, downtrend & overbought_cross_down],
        [1, -1],
        default=0,
    )
    return out

def add_s5_adx_trend(df):
    """S5: ADX + Keltner Channel Trend Strategy (Refined).

    Works on a copy of ``df``. ADX(14) is appended and Keltner Channels
    (20, scalar 2) are joined via pandas_ta. ``S5_Signal`` is:

    *  1 when the trend filter passes and close has just broken above the
       upper Keltner band (previous close was at or below it);
    * -1 when the trend filter passes and close has just broken below the
       lower Keltner band (previous close was at or above it);
    *  0 otherwise, including when ADX or the channel cannot be computed.

    The trend filter passes when ADX > 25, or when ADX > 20 and rising.

    The Keltner band columns are resolved by prefix from the frame pandas_ta
    returns instead of by a hard-coded name. pandas_ta embeds the MA mode and
    the (float) scalar in the name (e.g. ``KCUe_20_2.0``), so an exact match
    on ``KCu_20_2`` never succeeds and the strategy would silently emit 0 on
    every bar.
    """
    df_strat = df.copy()
    df_strat.ta.adx(length=14, append=True)
    kc = df_strat.ta.kc(length=20, scalar=2)

    if kc is None or kc.empty:
        df_strat['S5_Signal'] = 0
        return df_strat

    df_strat = df_strat.join(kc)

    # Case-insensitive prefix match keeps any existing 'KCu_*'/'KCl_*' naming
    # working while also accepting pandas_ta's 'KCU*'/'KCL*' names.
    upper_col = next((c for c in kc.columns if str(c).upper().startswith('KCU')), None)
    lower_col = next((c for c in kc.columns if str(c).upper().startswith('KCL')), None)

    if 'ADX_14' not in df_strat.columns or upper_col is None or lower_col is None:
        df_strat['S5_Signal'] = 0
        return df_strat

    adx = df_strat['ADX_14']
    close = df_strat['close']
    upper_band = df_strat[upper_col]
    lower_band = df_strat[lower_col]

    # REFINED LOGIC: Trigger on strong OR developing trends
    strong_trend = (adx > 25) | ((adx > 20) & (adx > adx.shift(1)))
    long_cond = strong_trend & (close > upper_band) & (close.shift(1) <= upper_band.shift(1))
    short_cond = strong_trend & (close < lower_band) & (close.shift(1) >= lower_band.shift(1))

    df_strat['S5_Signal'] = np.select([long_cond, short_cond], [1, -1], default=0)
    return df_strat

## Step 3: Data Fetching

In [None]:
tv = TvDatafeed()
top_stocks_nse = [
    "RELIANCE", "TCS", "HDFCBANK", "ICICIBANK", "INFY",
    "HINDUNILVR", "SBIN", "BHARTIARTL", "ITC", "LT"
]
# Maps symbol -> OHLCV DataFrame; only symbols that actually returned data
# are stored, so later cells can iterate it without None checks.
stock_dataframes = {}

print("Fetching 500 days of daily data for top NSE stocks...")
for stock_symbol in top_stocks_nse:
    try:
        data = tv.get_hist(
            symbol=stock_symbol, exchange="NSE",
            interval=Interval.in_daily, n_bars=500
        )
    except Exception as e:
        print(f"Could not fetch data for {stock_symbol}: {e}")
        continue

    # get_hist can come back with None (or an empty frame) instead of raising
    # when no data is available; storing that would report a false success
    # here and crash later at df.copy().
    if data is None or data.empty:
        print(f"Could not fetch data for {stock_symbol}: no data returned")
        continue

    stock_dataframes[stock_symbol] = data
    print(f"Successfully fetched data for {stock_symbol}.")

print("\nData fetching complete.")

## Step 4: Backtesting and Comparing Rule-Based Strategies

This section uses the **refined backtesting loop** to ensure accurate trade counts and performance metrics.

In [None]:
def get_performance_metrics(trades_df, stock_symbol, strat_name, risk_free_rate=0.07):
    """Summarise a trade log as a dictionary of performance metrics.

    Args:
        trades_df: DataFrame with one row per closed trade and the columns
            ``return`` (fractional, e.g. 0.05 for +5%) and ``holding_days``.
        stock_symbol: Label stored under the ``Stock`` key.
        strat_name: Label stored under the ``Strategy`` key.
        risk_free_rate: Annual risk-free rate subtracted from the annualised
            mean return when computing the Sharpe ratio.

    Returns:
        Dict with keys ``Stock``, ``Strategy``, ``Sharpe Ratio``,
        ``Avg Return (%)``, ``Strike Rate (%)`` and ``Num Trades``.
        With fewer than two trades the three statistics are NaN, but
        ``Num Trades`` still reports the real count (0 or 1).
    """
    num_trades = 0 if trades_df.empty else len(trades_df)

    if num_trades < 2:
        # Too few trades for meaningful statistics; still report how many
        # trades there were rather than a hard-coded 0.
        return {
            'Stock': stock_symbol,
            'Strategy': strat_name,
            'Sharpe Ratio': np.nan,
            'Avg Return (%)': np.nan,
            'Strike Rate (%)': np.nan,
            'Num Trades': num_trades,
        }

    trade_returns = trades_df['return']
    avg_holding_period = trades_df['holding_days'].mean()
    avg_return_pct = trade_returns.mean()
    strike_rate = ((trade_returns > 0).sum() / num_trades) * 100

    sharpe_ratio = np.nan
    return_std = trade_returns.std()
    if return_std > 0 and avg_holding_period > 0:
        # Scale per-trade statistics to a year using the number of
        # average-length trades that fit into 252 trading days.
        trades_per_year = 252 / avg_holding_period
        avg_excess_return = avg_return_pct * trades_per_year - risk_free_rate
        annualized_volatility = return_std * np.sqrt(trades_per_year)
        sharpe_ratio = avg_excess_return / annualized_volatility

    return {
        'Stock': stock_symbol,
        'Strategy': strat_name,
        'Sharpe Ratio': sharpe_ratio,
        'Avg Return (%)': avg_return_pct * 100,
        'Strike Rate (%)': strike_rate,
        'Num Trades': num_trades
    }

def _simulate_trades(signals_df, signal_col):
    """Replay one signal column as a flat/long/short state machine.

    Bars are visited in index order. A position is opened at the close of
    the first non-zero signal seen while flat, and closed at the close of the
    first opposite signal. The closing bar immediately opens a position in
    the new direction (stop-and-reverse). A position still open after the
    last bar is not recorded.

    Returns:
        List of dicts with ``return`` (fractional P&L of the trade) and
        ``holding_days`` (calendar days between entry and exit).
    """
    trade_log = []
    position = 0  # 0: flat, 1: long, -1: short
    entry_price = 0
    entry_date = None

    # Zipping the three needed series avoids building a Series per row,
    # which is what iterrows() does.
    bars = zip(signals_df.index, signals_df[signal_col], signals_df['close'])
    for date, signal, close in bars:
        # Exit: only an opposite signal closes an open position.
        if position != 0 and signal == -position:
            if position == 1:
                trade_return = (close - entry_price) / entry_price
            else:
                trade_return = (entry_price - close) / entry_price
            holding_days = (date - entry_date).days
            if holding_days > 0:
                trade_log.append({'return': trade_return, 'holding_days': holding_days})
            position = 0

        # Entry: deliberately a separate `if`, so a bar that just closed a
        # trade also opens the reverse one.
        if position == 0 and signal != 0:
            position = signal
            entry_price = close
            entry_date = date

    return trade_log


print("Backtesting all rule-based strategies...")
all_results = []
strategies = {
    "S1_TripleThreat": add_s1_triple_threat,
    "S2_RsiPeak": add_s2_rsi_peak,
    "S3_BollingerSqueeze": add_s3_bollinger_squeeze,
    "S4_EmaStoch": add_s4_ema_stoch,
    "S5_AdxTrend": add_s5_adx_trend
}

# symbol -> original OHLCV frame plus one '<Sx>_Signal' column per strategy
dataframes_with_signals = {}

for stock_symbol, df in stock_dataframes.items():
    # A failed download can leave None/empty behind; skip it rather than
    # crash the whole backtest on df.copy().
    if df is None or df.empty:
        print(f"Skipping {stock_symbol}: no price data available.")
        continue

    df_with_all_signals = df.copy()
    for strat_name, strat_func in strategies.items():
        df_with_signals = strat_func(df)
        # "S1_TripleThreat" -> "S1_Signal"
        signal_col_name = strat_name.split('_')[0] + '_Signal'
        df_with_all_signals[signal_col_name] = df_with_signals[signal_col_name]

        # --- REFINED STATE-BASED BACKTESTING LOOP ---
        trade_log = _simulate_trades(df_with_signals, signal_col_name)

        trades_df = pd.DataFrame(trade_log)
        metrics = get_performance_metrics(trades_df, stock_symbol, strat_name)
        all_results.append(metrics)
    dataframes_with_signals[stock_symbol] = df_with_all_signals

print("Backtesting complete.")


## Step 4a: Visualizing Strategy Signals on RELIANCE

Before looking at the statistical tables, let's visualize how each strategy's signals look on the price chart for a single stock. We will use the last 200 daily bars (i.e. the 200 most recent trading days) of data for RELIANCE.

In [None]:
def plot_strategy_signals(df, strat_name, signal_col, symbol="RELIANCE", n_bars=200):
    """Plot the close price with buy/sell markers for one strategy.

    Args:
        df: DataFrame with a ``close`` column and the signal column.
        strat_name: Strategy label used in the chart title.
        signal_col: Name of the column holding 1 (buy) / -1 (sell) / 0.
        symbol: Ticker shown in the title. Defaults to "RELIANCE" so existing
            three-argument calls produce the same chart as before.
        n_bars: Number of most recent rows to plot (default 200).
    """
    plot_df = df.tail(n_bars).copy()

    buy_signals = plot_df[plot_df[signal_col] == 1]
    sell_signals = plot_df[plot_df[signal_col] == -1]

    plt.figure(figsize=(15, 7))
    plt.plot(plot_df.index, plot_df['close'], label='Close Price', color='skyblue', linewidth=2)

    # zorder=5 keeps the markers on top of the price line.
    plt.scatter(buy_signals.index, buy_signals['close'], label='Buy Signal', marker='^', color='green', s=150, zorder=5)
    plt.scatter(sell_signals.index, sell_signals['close'], label='Sell Signal', marker='v', color='red', s=150, zorder=5)

    plt.title(f'{strat_name} Strategy Signals on {symbol} (Last {n_bars} Days)')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

print("Generating strategy performance plots for RELIANCE...")
# .get() yields None when RELIANCE was never fetched, in which case nothing is plotted.
reliance_df_with_signals = dataframes_with_signals.get("RELIANCE")
if reliance_df_with_signals is not None:
    for strat_name in strategies:
        # "S1_TripleThreat" -> "S1_Signal"
        plot_strategy_signals(
            reliance_df_with_signals,
            strat_name,
            f"{strat_name.split('_')[0]}_Signal",
        )


## Step 4b: Final Performance Comparison Tables

In [None]:
results_df = pd.DataFrame(all_results)

# Strategy (rows) x Stock (columns) grid of Sharpe ratios, plus the
# per-strategy mean across stocks.
sharpe_pivot = results_df.pivot_table(values='Sharpe Ratio', index='Strategy', columns='Stock')
sharpe_pivot = sharpe_pivot.assign(Average=sharpe_pivot.mean(axis=1))
print("--- Comparison Table: Sharpe Ratio ---")
print(sharpe_pivot.round(2))
print(f"\n{'=' * 80}\n")

# Same layout for the mean per-trade return (already expressed in percent).
avg_return_pivot = results_df.pivot_table(values='Avg Return (%)', index='Strategy', columns='Stock')
avg_return_pivot = avg_return_pivot.assign(Average=avg_return_pivot.mean(axis=1))
print("--- Comparison Table: Average Return per Trade (%) ---")
print(avg_return_pivot.round(2))

## Step 5: Neural Network Implementation (Single Stock Demo)

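This section builds the "Expert Committee" model — a classifier whose inputs combine raw indicators (RSI, MACD, ATR, Stochastic %K) with the signals of the five rule-based strategies — on a single stock (RELIANCE) for a clear demonstration.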

In [None]:
print("Preparing data for the Neural Network...")

# The demo uses a single stock; its frame already carries the S1..S5 signal columns.
symbol = "RELIANCE"
nn_df = dataframes_with_signals[symbol].copy()

# 1. Raw indicator features, appended via the pandas_ta accessor
nn_df.ta.rsi(length=14, append=True, col_names='RSI')
nn_df.ta.macd(fast=12, slow=26, signal=9, append=True)
nn_df.ta.atr(length=14, append=True, col_names='ATR')
stoch = nn_df.ta.stoch(k=14, d=3, smooth_k=3)
if not (stoch is None or stoch.empty):
    nn_df = nn_df.join(stoch)

# 2. Model inputs (X): raw indicators followed by the five strategy signals
features = [
    'RSI', 'MACD_12_26_9', 'MACDh_12_26_9', 'ATR', 'STOCHk_14_3_3',
    'S1_Signal', 'S2_Signal', 'S3_Signal',
    'S4_Signal', 'S5_Signal'
]

# 3. Multi-class target (y) from the next bar's close:
#    2 = Long  (next close more than `threshold` above today's close)
#    0 = Short (next close more than `threshold` below today's close)
#    1 = Hold  (anything else)
threshold = 0.0075  # 0.75% move for a Long/Short signal
conditions = [
    nn_df['close'].shift(-1) > nn_df['close'] * (1 + threshold),
    nn_df['close'].shift(-1) < nn_df['close'] * (1 - threshold),
]
choices = [2, 0]  # 2 for Long, 0 for Short
nn_df['y'] = np.select(conditions, choices, default=1)  # 1 for Hold

# 4. Prepare data for PyTorch
nn_df = nn_df.dropna()
# The final row has no next close to compare against, so it is excluded.
X = nn_df[features].iloc[:-1]
y = nn_df['y'].values[:-1]

# shuffle=False keeps the split chronological: the last 20% of rows are the test set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# The scaler is fitted on the training rows only and then applied to both splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

print("Data preparation complete.")

In [None]:
# 5. Define the Neural Network Model
class MultiClassNN(nn.Module):
    """Feed-forward classifier: input -> 128 -> 64 -> ``num_classes`` logits.

    Both hidden layers use ReLU followed by dropout (p=0.4). ``forward``
    returns raw logits; no softmax is applied inside the module.
    """

    def __init__(self, input_features, num_classes):
        super().__init__()
        self.layer1 = nn.Linear(input_features, 128)
        self.layer2 = nn.Linear(128, 64)
        self.output_layer = nn.Linear(64, num_classes)
        # Parameter-free modules, shared by both hidden layers.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.4)

    def forward(self, x):
        """Return class logits for a batch ``x``."""
        hidden = self.dropout(self.relu(self.layer1(x)))
        hidden = self.dropout(self.relu(self.layer2(hidden)))
        return self.output_layer(hidden)

# 6. Train the Model
input_dim = X_train_scaled.shape[1]
num_classes = 3 # (Short, Hold, Long)
model = MultiClassNN(input_features=input_dim, num_classes=num_classes)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("\nStarting model training...")
epochs = 300
# Training mode (dropout active) only needs to be set once: nothing inside
# the loop switches the model to eval mode.
model.train()
for epoch in range(epochs):
    # Full-batch update: every epoch is a single step over the whole training set.
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 50 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# 7. Evaluate the Model
print("\nEvaluating model on test data...")
model.eval()
with torch.no_grad():
    test_outputs = model(X_test_tensor)
    # The arg-max over the class dimension is the predicted label.
    _, predicted_signals = torch.max(test_outputs, 1)

    print(f'\nModel Accuracy on Test Data: {accuracy_score(y_test_tensor.numpy(), predicted_signals.numpy()) * 100:.2f}%')
    print("\nClassification Report:")
    # labels= pins the report to all three classes. Without it the report
    # raises when a class is missing from both the test labels and the
    # predictions, because target_names would no longer match. zero_division=0
    # reports 0 instead of warning for a class that is never predicted.
    print(classification_report(
        y_test_tensor.numpy(),
        predicted_signals.numpy(),
        labels=[0, 1, 2],
        target_names=['Short (0)', 'Hold (1)', 'Long (2)'],
        zero_division=0,
    ))

## Step 6: Final DNN Model with Combined Data

This is the final, most robust model. It's trained on a large, combined dataset from the top 5 stocks to learn more generalized patterns. The features are the signals from our 5 rule-based strategies. The trained model and scaler are saved so they can be loaded later to make predictions on new data.

In [None]:
print("--- Building Final DNN Model on Combined Data ---")

# 1. Fetch larger dataset for top 5 stocks
top_5_stocks = ["RELIANCE", "TCS", "HDFCBANK", "ICICIBANK", "INFY"]
combined_df_list = []
train_df_list = []
test_df_list = []

features = ['S1_Signal', 'S2_Signal', 'S3_Signal', 'S4_Signal', 'S5_Signal']
threshold = 0.0075
train_fraction = 0.8

# Supertrend's long/short line columns are NaN whenever the trend points the
# other way, so on every row one of the two is NaN. Including them in the
# NaN filter would delete the entire dataset.
sparse_by_design = {'SUPERTl', 'SUPERTs'}

print("\nFetching 2500 bars for each of the top 5 stocks...")
for symbol in top_5_stocks:
    try:
        data = tv.get_hist(symbol=symbol, exchange="NSE", interval=Interval.in_daily, n_bars=2500)
        # get_hist can return None/empty instead of raising when no data is available.
        if data is None or data.empty:
            print(f"Could not process {symbol}: no data returned")
            continue
        print(f"Fetched data for {symbol}")

        # 2. Add all strategy signals
        data = add_s1_triple_threat(data)
        data = add_s2_rsi_peak(data)
        data = add_s3_bollinger_squeeze(data)
        data = add_s4_ema_stoch(data)
        data = add_s5_adx_trend(data)

        # 3. Create multi-class target variable from the next bar's close
        conditions = [
            (data['close'].shift(-1) > data['close'] * (1 + threshold)),
            (data['close'].shift(-1) < data['close'] * (1 - threshold))
        ]
        choices = [2, 0] # Long, Short
        data['y'] = np.select(conditions, choices, default=1) # Hold

        # The last bar has no next close: both comparisons are False there,
        # so it would be labelled "Hold" without any evidence. Drop it.
        data = data.iloc[:-1]

        # Remove indicator warm-up rows (NaN in any indicator column).
        warmup_cols = [c for c in data.columns if c not in sparse_by_design]
        data = data.dropna(subset=warmup_cols)
        if data.empty:
            print(f"Could not process {symbol}: no rows left after removing NaNs")
            continue

        # Split each stock chronologically (first 80% train, last 20% test).
        # Slicing the concatenated frame by position would instead put whole
        # tickers into the test set and train on dates later than test dates.
        split_index = int(len(data) * train_fraction)
        train_df_list.append(data.iloc[:split_index])
        test_df_list.append(data.iloc[split_index:])
        combined_df_list.append(data)
    except Exception as e:
        print(f"Could not process {symbol}: {e}")

if not combined_df_list:
    raise RuntimeError("No stock data could be processed; cannot build the combined dataset.")

# 4. Combine all dataframes into one
final_df = pd.concat(combined_df_list)

print("\nCombined dataset created.")

# 5. Prepare data for PyTorch
X = final_df[features]
y = final_df['y'].values

# 80% train / 20% test, taken per stock in time order (see loop above)
train_df = pd.concat(train_df_list)
test_df = pd.concat(test_df_list)
X_train, X_test = train_df[features], test_df[features]
y_train, y_test = train_df['y'].values, test_df['y'].values

# Fit the scaler on training rows only, then apply it to both splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

X_train_tensor = torch.FloatTensor(X_train_scaled)
X_test_tensor = torch.FloatTensor(X_test_scaled)
y_train_tensor = torch.LongTensor(y_train)
y_test_tensor = torch.LongTensor(y_test)

print("Data prepared for final DNN.")

# 6. Define and Train the Final DNN
class FinalDNN(nn.Module):
    """Feed-forward classifier: input -> 64 -> 32 -> ``num_classes`` logits.

    Both hidden layers use ReLU followed by dropout (p=0.5). ``forward``
    returns raw logits; no softmax is applied inside the module.
    """

    def __init__(self, input_features, num_classes):
        super().__init__()
        self.layer1 = nn.Linear(input_features, 64)
        self.layer2 = nn.Linear(64, 32)
        self.output_layer = nn.Linear(32, num_classes)
        # Parameter-free modules, shared by both hidden layers.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Return class logits for a batch ``x``."""
        hidden = self.dropout(self.relu(self.layer1(x)))
        hidden = self.dropout(self.relu(self.layer2(hidden)))
        return self.output_layer(hidden)

input_dim = X_train_scaled.shape[1]
final_model = FinalDNN(input_features=input_dim, num_classes=3)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(final_model.parameters(), lr=0.001)

print("\nStarting final model training...")
epochs = 400 # Train for more epochs on the larger dataset
# Training mode (dropout active) only needs to be set once: nothing inside
# the loop switches the model to eval mode.
final_model.train()
for epoch in range(epochs):
    # Full-batch update: every epoch is a single step over the whole training set.
    optimizer.zero_grad()
    outputs = final_model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# 7. Final Model Evaluation (Rating)
print("\n--- Final Model Rating ---")
final_model.eval()
with torch.no_grad():
    test_outputs = final_model(X_test_tensor)
    # The arg-max over the class dimension is the predicted label.
    _, predicted_signals = torch.max(test_outputs, 1)
    print(f'\nModel Accuracy on Unseen Test Data: {accuracy_score(y_test_tensor.numpy(), predicted_signals.numpy()) * 100:.2f}%')
    print("\nClassification Report:")
    # labels= pins the report to all three classes. Without it the report
    # raises when a class is missing from both the test labels and the
    # predictions, because target_names would no longer match. zero_division=0
    # reports 0 instead of warning for a class that is never predicted.
    print(classification_report(
        y_test_tensor.numpy(),
        predicted_signals.numpy(),
        labels=[0, 1, 2],
        target_names=['Short (0)', 'Hold (1)', 'Long (2)'],
        zero_division=0,
    ))

# 8. Save the Model and Scaler for Future Use
# Only the weights (state_dict) are stored, so loading requires the FinalDNN
# class definition; the scaler must be reused to transform new inputs.
torch.save(final_model.state_dict(), 'final_dnn_model.pth')
joblib.dump(scaler, 'final_dnn_scaler.gz')
print("\nFinal model and data scaler have been saved.")
print("You can now load 'final_dnn_model.pth' and 'final_dnn_scaler.gz' to make predictions on new data.")