In [1]:
import pandas as pd
import os

In [2]:
def get_stock_data_from_excel(file_path):
    """Load every sheet of an Excel workbook as a date-indexed DataFrame.

    Args:
        file_path: Path of the workbook, handed to ``pd.read_excel`` with
            ``sheet_name=None`` so that all sheets are read.

    Returns:
        dict mapping each sheet name to its DataFrame. The ``Date`` column of
        every sheet is parsed with ``pd.to_datetime`` and becomes the index.

    Raises:
        KeyError: if a sheet has no ``Date`` column.
    """
    sheets = pd.read_excel(file_path, sheet_name=None)

    return {
        name: frame.assign(Date=pd.to_datetime(frame['Date'])).set_index('Date')
        for name, frame in sheets.items()
    }

In [3]:
def calculate_ema(data, period):
    """Return the exponential moving average of the ``Close`` column.

    Args:
        data: DataFrame with a ``Close`` column.
        period: Span passed to ``ewm``; the average is computed recursively
            (``adjust=False``).

    Returns:
        Series of EMA values sharing the index of ``data``.
    """
    closing_prices = data['Close']
    weighted_window = closing_prices.ewm(span=period, adjust=False)
    return weighted_window.mean()

In [4]:
def calculate_atr(data, atr_period):
    """Add an Average True Range column to ``data`` and return the frame.

    The true range of a bar is the largest of ``High - Low``,
    ``|High - previous Close|`` and ``|Low - previous Close|``. The first bar
    has no previous close, so only ``High - Low`` contributes there (missing
    values are skipped when taking the maximum). ATR is the simple rolling
    mean of the true range over ``atr_period`` bars, so the first
    ``atr_period - 1`` rows are NaN.

    The intermediate series are computed off-frame: no scratch columns are
    written to ``data``, so existing columns that happen to be called
    'High-Low', 'High-Close', 'Low-Close' or 'True Range' are left intact.

    Args:
        data: DataFrame with ``High``, ``Low`` and ``Close`` columns.
            It is modified in place (one column is added).
        atr_period: Rolling window length in rows.

    Returns:
        The same ``data`` object with an ``'ATR_<atr_period>'`` column.
    """
    previous_close = data['Close'].shift()

    # Build the candidates positionally (to_numpy) so that no index alignment
    # is involved, then take the row-wise maximum.
    range_candidates = pd.DataFrame(
        {
            'high_low': (data['High'] - data['Low']).to_numpy(),
            'high_close': (data['High'] - previous_close).abs().to_numpy(),
            'low_close': (data['Low'] - previous_close).abs().to_numpy(),
        },
        index=data.index,
    )
    true_range = range_candidates.max(axis=1)

    data['ATR_' + str(atr_period)] = true_range.rolling(window=atr_period).mean()
    return data

In [5]:
def create_scrip_dataframe_with_atr(data, fast_ema_period, slow_ema_period, atr_period, atr_threshold):
    """Build the SCRIP frame: EMAs, ATR, Long/Short state and signal numbers.

    A row is 'Long' when the fast EMA is above the slow EMA and the ATR is
    above ``atr_threshold``, 'Short' when the fast EMA is below the slow EMA
    and the ATR is above the threshold, and 'NA' otherwise (including rows
    where the ATR is still NaN).

    'Signal No.' numbers the signals from 1. A new number starts whenever a
    row is Long/Short and differs from the state of the row before it, so a
    run interrupted by 'NA' rows resumes under a new number. 'NA' rows always
    get signal number 0.

    The computation is vectorised and positional, so it does not rely on the
    index labels being unique.

    Args:
        data: DataFrame with ``Close``, ``High`` and ``Low`` columns.
            Modified in place (columns are added or overwritten).
        fast_ema_period: Span of the fast EMA.
        slow_ema_period: Span of the slow EMA.
        atr_period: Rolling window of the ATR.
        atr_threshold: The ATR must exceed this value for a row to carry a
            signal. It is compared directly with the ATR, i.e. it is in the
            same units as the prices.

    Returns:
        ``data`` with the columns ``EMA_<fast>``, ``EMA_<slow>``,
        ``ATR_<atr_period>``, ``Long / Short`` and ``Signal No.``.
    """
    fast_col = 'EMA_' + str(fast_ema_period)
    slow_col = 'EMA_' + str(slow_ema_period)
    atr_col = 'ATR_' + str(atr_period)

    # Calculate EMAs and ATR
    data[fast_col] = calculate_ema(data, fast_ema_period)
    data[slow_col] = calculate_ema(data, slow_ema_period)
    data = calculate_atr(data, atr_period)

    # Comparisons against NaN are False, so rows without an ATR end up 'NA'.
    volatile_enough = data[atr_col] > atr_threshold
    is_long = (data[fast_col] > data[slow_col]) & volatile_enough
    is_short = (data[fast_col] < data[slow_col]) & volatile_enough

    # Encode the state numerically: +1 Long, -1 Short, 0 NA.
    direction = is_long.astype(int) - is_short.astype(int)
    data['Long / Short'] = direction.map({1: 'Long', -1: 'Short', 0: 'NA'})

    # A signal starts on every Long/Short row whose state differs from the
    # previous row's state; the state before the first row counts as NA.
    has_position = direction != 0
    previous_direction = direction.shift(fill_value=0)
    starts_signal = has_position & (direction != previous_direction)

    # Running count of signal starts, reset to 0 on rows without a position.
    data['Signal No.'] = starts_signal.cumsum().where(has_position, 0)

    return data

In [6]:
def create_trade_results(scrip_df):
    """Turn the signals of a SCRIP frame into one row per trade.

    For every signal number N >= 3 except the highest one, a trade is opened
    at the ``Open`` of the bar after the first bar of signal N and closed at
    the ``Open`` of the bar after the first bar of signal N + 1. Signals whose
    entry or exit bar would fall past the end of the data are skipped.

    Signal numbers below 3 are ignored; this also drops group 0, which holds
    the rows without a signal. NOTE(review): presumably the first two signals
    are skipped as EMA warm-up -- confirm the intent.

    The trade direction is taken from the bar that raised the signal, not
    from the entry bar one row later. The entry bar can already carry a
    different 'Long / Short' value (for example after a one-bar signal).

    Args:
        scrip_df: Date-indexed DataFrame with the columns ``Signal No.``,
            ``Long / Short``, ``Open``, ``High`` and ``Low``.

    Returns:
        DataFrame with one row per trade. The equity-related columns are
        zero/empty placeholders to be filled by a later step. The full set of
        columns is present even when there are no trades.

    Raises:
        KeyError: if signal N + 1 does not exist for a processed signal N
            (signal numbers are expected to be consecutive).
    """
    result_columns = [
        'Signal No.',
        'Type of Signal',
        'Entry Date',
        'Exit Date',
        'Entry Price',
        'Exit Price',
        'Days in Trade',
        'Pure Signal P&L',
        'Pure Signal Won/Lost',
        'Highest Point/Lowest Point',
        'Highest Point %',
        'Opening = Highest',
        'Whipsaw Signal',
        'Opening Equity',
        'Shares Bought',
        'Total Price Paid',
        'Total Price Got',
        'Transaction Cost',
        'Closing Equity',
    ]

    trades = []
    grouped = scrip_df.groupby('Signal No.')
    last_signal_no = scrip_df['Signal No.'].max()  # loop-invariant, computed once
    total_rows = len(scrip_df)

    for signal_no, group in grouped:
        if signal_no < 3:
            continue

        # The last signal has no successor to close it, so it (and nothing
        # after it, groups being sorted) produces a trade.
        if signal_no == last_signal_no:
            break

        signal_bar = group.iloc[0]
        next_signal_bar = grouped.get_group(signal_no + 1).iloc[0]

        # Direction belongs to the bar that raised the signal.
        signal_type = signal_bar['Long / Short']

        # Trades are executed on the bar following the signal bar.
        entry_index = scrip_df.index.get_loc(signal_bar.name) + 1
        exit_index = scrip_df.index.get_loc(next_signal_bar.name) + 1

        # Skip if the execution bars are out of bounds
        if entry_index >= total_rows or exit_index >= total_rows:
            continue

        entry_bar = scrip_df.iloc[entry_index]
        exit_bar = scrip_df.iloc[exit_index]

        entry_date = entry_bar.name
        exit_date = exit_bar.name
        entry_price = entry_bar['Open']
        exit_price = exit_bar['Open']
        days_in_trade = (exit_date - entry_date).days

        # A signal that lasted a single bar is a whipsaw.
        whipsaw_signal = 'Yes' if len(group) == 1 else 'No'

        # Bars held: from the entry date (inclusive) up to the exit date (exclusive).
        period_data = scrip_df[(scrip_df.index >= entry_date) & (scrip_df.index < exit_date)]

        # Best price reached during the trade and its distance from the entry
        # price, expressed as a favourable percentage move.
        if signal_type == 'Long':
            highest_lowest_point = period_data['High'].max()
            highest_point_percent = (highest_lowest_point - entry_price) / entry_price * 100
        else:
            highest_lowest_point = period_data['Low'].min()
            highest_point_percent = (entry_price - highest_lowest_point) / entry_price * 100

        # 'Yes' when the trade never moved in its favour beyond the entry price.
        opening_highest = 'Yes' if highest_point_percent <= 0.0 else 'No'

        trades.append({
            'Signal No.': signal_no,
            'Type of Signal': signal_type,
            'Entry Date': entry_date,
            'Exit Date': exit_date,
            'Entry Price': entry_price,
            'Exit Price': exit_price,
            'Days in Trade': days_in_trade,
            'Pure Signal P&L': 0.0,  # Placeholder, will be calculated separately
            'Pure Signal Won/Lost': '',  # Placeholder, will be calculated separately
            'Highest Point/Lowest Point': highest_lowest_point,
            'Highest Point %': highest_point_percent,
            'Opening = Highest': opening_highest,
            'Whipsaw Signal': whipsaw_signal,
            'Opening Equity': 0.0,  # Placeholder, will be calculated separately
            'Shares Bought': 0,  # Placeholder, will be calculated separately
            'Total Price Paid': 0.0,  # Placeholder, will be calculated separately
            'Total Price Got': 0.0,  # Placeholder, will be calculated separately
            'Transaction Cost': 0.0,  # Placeholder, will be calculated separately
            'Closing Equity': 0.0,  # Placeholder, will be calculated separately
        })

    # Passing the column list keeps the schema stable when no trade was found.
    trade_results_df = pd.DataFrame(trades, columns=result_columns)
    return trade_results_df

In [7]:
def calculate_financial_metrics(trade_results_df, starting_equity, transaction_cost_rate=0.002):
    """Fill in the equity-related columns of a trade results frame.

    Trades are processed in row order and compounded: the closing equity of
    one trade is the opening equity of the next, which is why this is a
    sequential loop rather than a vectorised computation.

    For each trade:
      * shares bought = ``round(opening equity / entry price)``
      * transaction cost = ``transaction_cost_rate`` x total price paid
        (charged once, on the entry-side value)
      * P&L = price move in the trade's direction x shares - transaction cost
      * 'Won' if the P&L is strictly positive, otherwise 'Lost'

    NOTE(review): ``round`` can round the share count up, in which case the
    total price paid slightly exceeds the available equity. Left unchanged to
    keep existing results reproducible -- confirm whether flooring is wanted.

    Args:
        trade_results_df: Frame with ``Entry Price``, ``Exit Price`` and
            ``Type of Signal`` columns. Modified in place.
        starting_equity: Equity available before the first trade.
        transaction_cost_rate: Cost as a fraction of the total price paid.
            Defaults to 0.002, the value that used to be hard-coded.

    Returns:
        The same ``trade_results_df`` with ``Opening Equity``,
        ``Shares Bought``, ``Total Price Paid``, ``Total Price Got``,
        ``Transaction Cost``, ``Pure Signal P&L``, ``Pure Signal Won/Lost``
        and ``Closing Equity`` filled in.
    """
    # Snapshot the inputs up front so the frame can be written to while looping.
    trade_inputs = list(zip(
        trade_results_df.index,
        trade_results_df['Entry Price'],
        trade_results_df['Exit Price'],
        trade_results_df['Type of Signal'],
    ))

    previous_equity = starting_equity
    for i, entry_price, exit_price, signal_type in trade_inputs:
        shares_bought = round(previous_equity / entry_price)
        total_price_paid = shares_bought * entry_price
        total_price_got = shares_bought * exit_price
        transaction_cost = total_price_paid * transaction_cost_rate

        # Longs gain when the price rises, shorts when it falls.
        if signal_type == 'Long':
            price_move = exit_price - entry_price
        else:
            price_move = entry_price - exit_price
        pnl = (price_move * shares_bought) - transaction_cost

        won_lost = 'Won' if pnl > 0 else 'Lost'
        closing_equity = previous_equity + pnl

        # Update the dataframe
        trade_results_df.at[i, 'Opening Equity'] = previous_equity
        trade_results_df.at[i, 'Shares Bought'] = shares_bought
        trade_results_df.at[i, 'Total Price Paid'] = total_price_paid
        trade_results_df.at[i, 'Total Price Got'] = total_price_got
        trade_results_df.at[i, 'Transaction Cost'] = transaction_cost
        trade_results_df.at[i, 'Pure Signal P&L'] = pnl
        trade_results_df.at[i, 'Pure Signal Won/Lost'] = won_lost
        trade_results_df.at[i, 'Closing Equity'] = closing_equity

        previous_equity = closing_equity

    return trade_results_df

In [8]:
def create_analysis_dataframe(trade_results_df, signal_type, stock_symbol, fast_ema, slow_ema, starting_equity):
    """Summarise a set of trades as a one-row analysis DataFrame.

    Args:
        trade_results_df: Trades with the columns ``Pure Signal Won/Lost``,
            ``Whipsaw Signal``, ``Opening = Highest``, ``Highest Point %``,
            ``Days in Trade`` and ``Closing Equity``.
        signal_type: Label stored in the 'Signal Type' column.
        stock_symbol: Label stored in the 'Scrip Name' column.
        fast_ema: Value stored in the 'Fast EMA' column.
        slow_ema: Value stored in the 'Slow EMA' column.
        starting_equity: Equity before the first trade, used for the CAGR.

    Returns:
        Single-row DataFrame with trade count, win/loss percentages, whipsaw
        and 'Opening = Highest' counts, the 30th percentile of
        'Highest Point %' and the CAGR in percent. The CAGR annualises the
        growth from ``starting_equity`` to the last closing equity over the
        summed days in trade divided by 365. All statistics are 0 when there
        are no trades.
    """
    total_trades = len(trade_results_df)
    has_trades = total_trades > 0

    def _count_rows(column, value):
        # Number of trades whose `column` equals `value`.
        return int((trade_results_df[column] == value).sum())

    def _as_percentage(count):
        return (count / total_trades) * 100 if has_trades else 0

    winning_percentage = _as_percentage(_count_rows('Pure Signal Won/Lost', 'Won'))
    losing_percentage = _as_percentage(_count_rows('Pure Signal Won/Lost', 'Lost'))
    num_whipsaws = _count_rows('Whipsaw Signal', 'Yes')
    num_opening_highest = _count_rows('Opening = Highest', 'Yes')

    if has_trades:
        thirtieth_percentile = trade_results_df['Highest Point %'].quantile(0.3)
    else:
        thirtieth_percentile = 0

    # Calculate CAGR (Compound Annual Growth Rate)
    cagr = 0
    if has_trades:
        num_years = trade_results_df['Days in Trade'].sum() / 365
        ending_equity = trade_results_df.iloc[-1]['Closing Equity']
        if num_years > 0:
            growth_factor = ending_equity / starting_equity
            cagr = ((growth_factor ** (1 / num_years)) - 1) * 100

    summary_row = {
        'Scrip Name': stock_symbol,
        'Fast EMA': fast_ema,
        'Slow EMA': slow_ema,
        '# Signals': total_trades,
        'Signal Type': signal_type,
        'CAGR- Return': cagr,
        'Winning %': winning_percentage,
        'Losing %': losing_percentage,
        '# Whipsaws': num_whipsaws,
        '# Opening = Highest': num_opening_highest,
        '30th Percentile': thirtieth_percentile,
    }
    return pd.DataFrame([summary_row])

In [9]:
# Path to the Excel workbook containing the stock data.
# get_stock_data_from_excel reads every sheet of this workbook and the run loop looks
# stocks up by sheet name, so each sheet is expected to be named after its stock symbol.
# NOTE(review): hard-coded absolute path (looks like a Colab Google Drive mount -- confirm).
file_path = '/content/drive/MyDrive/Data/ABB_Data.xlsx'

In [10]:
# ATR (Average True Range) volatility filter settings.
# NOTE: the run loop unpacks each stock's tuple into variables with these same names,
# so both names are rebound while the loop runs.
atr_period = 14  # Rolling window (in rows) used to average the true range
atr_threshold = 1.5  # A row only carries a signal when its ATR is above this value; compared directly with the ATR, so it is in price units

In [11]:
# Load the workbook: a dict mapping each sheet name to a DataFrame indexed by its 'Date' column
stock_data = get_stock_data_from_excel(file_path)

In [12]:
# Stocks to analyse, keyed by stock symbol. Each value is the tuple
# (fast_ema_period, slow_ema_period, atr_period, atr_threshold), unpacked in that order
# by the run loop. The key must match a sheet name in stock_data, otherwise the run
# loop skips the stock.
selected_stocks = {
    'ABB': (8, 21, atr_period, atr_threshold)
}

In [13]:
starting_equity = 100000  # Initial equity; only referenced by the trade-metrics steps that are currently commented out in the run loop

In [14]:
# Results of the run, keyed by stock symbol; each value is a dict of result DataFrames
# (currently only 'scrip_df') filled in by the run loop.
all_results = {}

In [17]:
# Run the analysis for each selected stock with ATR filtering.
# Note: the loop variables atr_period / atr_threshold rebind the module-level names of
# the same name, and create_scrip_dataframe_with_atr adds its columns to the frame
# stored in stock_data (the frame is modified in place).
for stock_symbol, (fast_ema_period, slow_ema_period, atr_period, atr_threshold) in selected_stocks.items():
    # A configured symbol without a matching sheet cannot be analysed. Say so instead
    # of skipping silently, otherwise the only symptom is an empty all_results later on.
    if stock_symbol not in stock_data:
        print(f"Skipping {stock_symbol}: no sheet with that name (available sheets: {list(stock_data)})")
        continue

    # Fetch the stock data
    data = stock_data[stock_symbol]

    # Run the SCRIP dataframe creation with ATR filtering
    scrip_df = create_scrip_dataframe_with_atr(data, fast_ema_period, slow_ema_period, atr_period, atr_threshold)

    # --- Trade generation and analysis: currently disabled ---------------------------
    # Generate trade results
    #trade_results_df = create_trade_results(scrip_df)

    # Separate the trade results into Long and Short trade results
    #long_trade_results_df = trade_results_df[trade_results_df['Type of Signal'] == 'Long'].reset_index(drop=True)
    #short_trade_results_df = trade_results_df[trade_results_df['Type of Signal'] == 'Short'].reset_index(drop=True)

    # Calculate metrics separately for long and short trade results
    #long_trade_results_df = calculate_financial_metrics(long_trade_results_df, starting_equity)
    #short_trade_results_df = calculate_financial_metrics(short_trade_results_df, starting_equity)

    # Generate analysis for Long and Short trade results separately
    #long_analysis_df = create_analysis_dataframe(long_trade_results_df, 'Long', stock_symbol, fast_ema_period, slow_ema_period, starting_equity)
    #short_analysis_df = create_analysis_dataframe(short_trade_results_df, 'Short', stock_symbol, fast_ema_period, slow_ema_period, starting_equity)

    # Store the results
    all_results[stock_symbol] = {
        'scrip_df': scrip_df,
        #'long_trade_results_df': long_trade_results_df,
        #'short_trade_results_df': short_trade_results_df,
        #'long_analysis_df': long_analysis_df,
        #'short_analysis_df': short_analysis_df
    }

In [18]:
# Destination workbook for the results. It is opened in write mode ('w') by the
# ExcelWriter cell, and one '<symbol>_SCRIP' sheet is written per analysed stock.
# NOTE(review): hard-coded absolute path (looks like a Colab Google Drive mount -- confirm).
output_file_path = '/content/drive/MyDrive/Analysis/ATR/ABB_ATR_With_DoubleEMA.xlsx'

In [19]:
# A workbook needs at least one sheet; fail early with a readable message instead of
# an engine-level error when no stock was analysed.
if not all_results:
    raise ValueError(
        "No results to save: none of the selected stocks matched a sheet in the input workbook."
    )

# Create the destination folder if it does not exist yet.
output_dir = os.path.dirname(output_file_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

# Write one '<symbol>_SCRIP' sheet per stock, keeping the Date index as the first column.
with pd.ExcelWriter(output_file_path, engine='openpyxl', mode='w') as writer:
    for stock_symbol, results in all_results.items():
        results['scrip_df'].to_excel(writer, sheet_name=f'{stock_symbol}_SCRIP', index=True)
        # Trade and analysis sheets: disabled together with the steps that produce them.
        #results['long_trade_results_df'].to_excel(writer, sheet_name=f'{stock_symbol}_Long Trades', index=False)
        #results['short_trade_results_df'].to_excel(writer, sheet_name=f'{stock_symbol}_Short Trades', index=False)
        #results['long_analysis_df'].to_excel(writer, sheet_name=f'{stock_symbol}_Analysis Long', index=False)
        #results['short_analysis_df'].to_excel(writer, sheet_name=f'{stock_symbol}_Analysis Short', index=False)

In [20]:
# Report the destination path. This only echoes output_file_path; it does not check
# that the write in the previous cell succeeded.
print(f"Data saved to {output_file_path}")

Data saved to /content/drive/MyDrive/Analysis/ATR/ABB_ATR_With_DoubleEMA.xlsx
