In [11]:
# Hard-coded list of ticker symbols (presumably a manual shortlist of NSE stocks — confirm).
# NOTE(review): not referenced by any other cell visible in this notebook export.
sel_stick = ['JAIBALAJI', 'SBILIFE', 'QUESS', 'ICICIGI', 'BBTC', 'ERIS', 'HDFCLIFE', 'ICICIPRULI', 'JMFINANCIL', 'PNBHOUSING', 'JINDALSAW', 'OFSS', 'BAJAJFINSV', 'GODREJIND', 'BAJAJHLDNG', 'MANYAVAR', 'POLYMED', 'SBICARD', 'HFCL', 'JKCEMENT', 'SHRIRAMFIN', 'CHOLAFIN', 'KALYANKJIL', 'PERSISTENT', 'M&MFIN']


In [12]:
import pandas as pd
import numpy as np
import yfinance as yf
import openpyxl
from openpyxl.styles import PatternFill
import mplfinance as mpf
from typing import Optional
import os

In [13]:
class TurtleTrading:
    """
    Turtle Trading workflow for a single ticker.

    Downloads price history, derives the 55-day high / 20-day low / ATR
    indicators, flags breakout signals, and exports the result to an Excel
    workbook (one sheet per ticker) with signal rows highlighted.

    Expected call order:
        fetch_data -> calculate_indicators -> generate_signals
        -> save_to_excel -> generate_format
    """

    # Price columns rounded after download; columns absent from the
    # downloaded frame are skipped.
    _PRICE_COLUMNS = ('Open', 'High', 'Low', 'Close', 'Adj Close')

    def __init__(self, ticker: str, period: str = "1mo", nse: bool = True, rounding: int = 2):
        """
        Store the ticker configuration; no data is downloaded here.

        Args:
            ticker: Symbol as used for sheet names and messages.
            period: History period string forwarded to ``yf.download``.
            nse: When True, ".NS" is appended to the symbol for the download.
            rounding: Decimal places used when rounding prices and ATR.
        """
        stock_exchange = ".NS" if nse else ""
        self.ticker = ticker
        self.mod_ticker = ticker + stock_exchange
        self.period = period
        self.rounding = rounding
        self.data: Optional[pd.DataFrame] = None

    def fetch_data(self) -> pd.DataFrame:
        """
        Fetch historical stock data and round the price columns.

        Returns:
            The downloaded DataFrame (also stored on ``self.data``).

        Raises:
            RuntimeError: If the download or post-processing fails; the
                original exception is attached as ``__cause__``.
        """
        try:
            data = yf.download(self.mod_ticker, period=self.period)
            # Recent yfinance releases return (field, ticker) MultiIndex columns
            # even for one symbol. Flatten to plain field names so data['High']
            # is a Series and the Excel export has a single header row.
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = list(data.columns.get_level_values(0))
            # 'Adj Close' is not present when yfinance auto-adjusts prices, so
            # round only the price columns that actually exist.
            present = [col for col in self._PRICE_COLUMNS if col in data.columns]
            if present:
                data[present] = data[present].round(self.rounding)
            self.data = data
        except Exception as e:
            raise RuntimeError(f"Failed to fetch data for {self.ticker}: {e}") from e
        return self.data

    def calculate_indicators(self) -> pd.DataFrame:
        """
        Calculate technical indicators: 55-day High, 20-day Low, True Range and
        its 20-day rolling mean (ATR, rounded to ``self.rounding`` places).

        Returns:
            ``self.data`` with the columns '55d_High', '20d_Low', 'True_Range'
            and 'Avg_True_Range(N)' added.

        Raises:
            ValueError: If no data has been loaded yet.
        """
        if self.data is None:
            raise ValueError("Data not loaded. Call fetch_data first.")

        high = self.data['High']
        low = self.data['Low']
        prev_close = self.data['Close'].shift(1)

        self.data['55d_High'] = high.rolling(window=55).max()
        self.data['20d_Low'] = low.rolling(window=20).min()
        # True range is the largest of: today's range, and the distance from
        # yesterday's close to today's high or low. The row-wise max skips the
        # NaNs produced by the shift on the first row.
        true_range = pd.concat([
            high - low,
            (high - prev_close).abs(),
            (prev_close - low).abs()
        ], axis=1).max(axis=1)
        self.data['True_Range'] = true_range
        self.data['Avg_True_Range(N)'] = true_range.rolling(window=20).mean().round(self.rounding)
        return self.data

    def generate_signals(self) -> pd.DataFrame:
        """
        Generate buy/sell signals based on Turtle Trading strategy.

        Buy_Signal is True when the day's High exceeds the previous row's
        55-day High; Sell_Signal is True when the day's Low drops below the
        previous row's 20-day Low.

        Returns:
            ``self.data`` with boolean 'Buy_Signal' and 'Sell_Signal' columns.

        Raises:
            ValueError: If data is not loaded or the indicator columns are
                missing (calculate_indicators was not called).
        """
        if self.data is None:
            raise ValueError("Data not loaded. Call fetch_data first.")

        missing = [col for col in ('55d_High', '20d_Low') if col not in self.data.columns]
        if missing:
            raise ValueError(
                f"Indicators not calculated (missing {missing}). Call calculate_indicators first."
            )

        self.data['Buy_Signal'] = self.data['High'] > self.data['55d_High'].shift(1)
        self.data['Sell_Signal'] = self.data['Low'] < self.data['20d_Low'].shift(1)
        return self.data

    def save_to_excel(self, filename: str):
        """
        Write ``self.data`` to a sheet named after the ticker in ``filename``.

        An existing workbook is appended to (a sheet with the same name is
        replaced); otherwise a new workbook is created.

        Raises:
            ValueError: If no data has been loaded yet.
        """
        if self.data is None:
            raise ValueError("Data not loaded. Call fetch_data first.")

        if os.path.exists(filename):
            # Append mode keeps the other tickers' sheets intact.
            writer_options = {"mode": "a", "if_sheet_exists": "replace"}
        else:
            # Write mode creates the workbook directly, without the empty
            # default sheet a pre-created openpyxl workbook would leave behind.
            # (if_sheet_exists is only accepted in append mode.)
            writer_options = {"mode": "w"}

        with pd.ExcelWriter(filename, engine="openpyxl", **writer_options) as writer:
            self.data.to_excel(writer, sheet_name=self.ticker)

    def generate_format(self, filename: str):
        """
        Highlight Buy_Signal / Sell_Signal rows in the ticker's sheet and freeze panes.

        Rows with a truthy Buy_Signal are filled green, rows with a truthy
        Sell_Signal red; when both are set the red fill wins.

        Raises:
            ValueError: If data is not loaded or the ticker's sheet is missing.
            FileNotFoundError: If ``filename`` does not exist.
        """
        if self.data is None:
            raise ValueError("Data not loaded. Call fetch_data first.")

        # Load the workbook and worksheet
        try:
            workbook = openpyxl.load_workbook(filename)
        except FileNotFoundError as e:
            raise FileNotFoundError(f"The file {filename} does not exist. Save data first.") from e

        if self.ticker not in workbook.sheetnames:
            raise ValueError(f"Sheet for ticker {self.ticker} not found in {filename}. Ensure data is saved.")

        worksheet = workbook[self.ticker]

        # Define fill styles for Buy_Signal and Sell_Signal
        green_fill = PatternFill(start_color='C6EFCE', end_color='C6EFCE', fill_type='solid')
        red_fill = PatternFill(start_color='FFC7CE', end_color='FFC7CE', fill_type='solid')

        # Worksheet columns are 1-based and column 1 holds the DataFrame index,
        # so DataFrame column i (0-based) sits in worksheet column i + 2.
        buy_signal_col = self.data.columns.get_loc('Buy_Signal') + 2
        sell_signal_col = self.data.columns.get_loc('Sell_Signal') + 2

        # Row 1 is the header; data rows start at 2.
        for row in range(2, len(self.data) + 2):
            if worksheet.cell(row, sell_signal_col).value:
                fill = red_fill  # sell takes precedence when both signals are set
            elif worksheet.cell(row, buy_signal_col).value:
                fill = green_fill
            else:
                continue
            for cell in worksheet[row]:
                cell.fill = fill

        # Freeze the first row and first column
        worksheet.freeze_panes = 'B2'

        # Save changes to the workbook
        workbook.save(filename)
        print(f"Formatting applied and saved to {filename}.")

    def plot_candlestick(self):
        """
        Plot a candlestick chart of ``self.data`` with mplfinance.

        Raises:
            ValueError: If no data has been loaded yet.
        """
        if self.data is None:
            raise ValueError("Data not loaded. Call fetch_data first.")
        mpf.plot(self.data, type="candle", volume=False, style="charles", title=self.ticker)


In [14]:
def process_tickers(tickers: list, filename: str, days: int = 1, period: str = "1y"):
    """
    Run the Turtle pipeline sequentially over ``tickers`` and export the hits.

    For every ticker the data is fetched, indicators and signals are computed,
    and — if any of the last ``days`` rows carries a Buy_Signal — the data is
    saved to ``filename`` and formatted.

    Returns:
        Dict mapping each exported ticker to its latest ATR as a percentage of
        the latest Close (rounded to 4 places), sorted in descending order.
    """
    atr_pct_by_ticker = {}
    for symbol in tickers:
        turtle = TurtleTrading(symbol, period=period)
        turtle.fetch_data()
        turtle.calculate_indicators()
        frame = turtle.generate_signals()

        # Skip tickers without a buy signal in the most recent rows.
        if not frame['Buy_Signal'].tail(days).any():
            continue

        turtle.save_to_excel(filename)
        turtle.generate_format(filename)

        latest = frame.iloc[-1]
        atr_pct_by_ticker[symbol] = round(
            (latest['Avg_True_Range(N)'] / latest['Close']) * 100, 4
        )

    ranked = sorted(atr_pct_by_ticker.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked)


# Example usage
# NOTE(review): the guard compares __name__ with "__main1__" rather than the
# usual "__main__", so this example is effectively disabled — presumably on
# purpose to avoid processing the full ticker list on every run; confirm.
if __name__ == "__main1__":
    # Project-local module providing the ticker list.
    from nifty_50_tickers import nifty_500

    tickers = nifty_500
    filename = "Turtle1_Trading.xlsx"
    # Sequential run; prints {ticker: ATR %} for tickers with a recent buy signal.
    top_tickers = process_tickers(tickers, filename, days=1, period="1y")
    print(top_tickers)

In [15]:
import multiprocessing
import pandas as pd
from typing import List, Dict, Tuple

def log_and_process(args):
    """
    Pool-friendly wrapper around ``process_single_ticker``.

    ``args`` is a 4-item sequence ``(ticker, period, filename, days)``; progress
    is printed before and after the call and the worker's result is returned.
    """
    symbol, history_period, workbook_path, recent_days = args
    print(f"Starting processing for {symbol}")
    outcome = process_single_ticker(symbol, history_period, workbook_path, recent_days)
    print(f"Finished processing for {symbol}")
    return outcome


def process_single_ticker(ticker: str, period: str, filename: str, days: int) -> Tuple[str, Optional[float]]:
    """
    Process a single ticker: fetch data, calculate indicators, and check buy signals.

    If any of the last ``days`` rows has a Buy_Signal, the data is saved to
    ``filename`` (no formatting is applied here) and the latest ATR as a
    percentage of the latest Close is returned.

    Returns:
        ``(ticker, atr_percentage)`` when a recent buy signal exists, otherwise
        ``(ticker, None)``. Any exception is reported via print and also yields
        ``(ticker, None)`` so one bad ticker does not abort a batch.

    NOTE(review): when run from several pool workers at once, every worker
    writes to the same workbook without synchronisation — verify this is safe
    for the intended usage.
    """
    try:
        # Initialize TurtleTrading for the ticker
        trader = TurtleTrading(ticker, period=period)
        trader.fetch_data()
        trader.calculate_indicators()
        data = trader.generate_signals()

        # No recent buy signal: nothing to save or report.
        if not data['Buy_Signal'].tail(days).any():
            return ticker, None

        # Save data to Excel
        trader.save_to_excel(filename)
        last_row = data.iloc[-1]
        atr = last_row['Avg_True_Range(N)']
        close_price = last_row['Close']
        return ticker, round((atr / close_price) * 100, 4)

    except Exception as e:
        # Deliberate best-effort: report and skip this ticker.
        print(f"Error processing {ticker}: {e}")
        return ticker, None


def process_tickers_multiprocessing(
    tickers: List[str],
    filename: str,
    days: int = 1,
    period: str = "1y",
    processes: int = 2,
) -> Dict[str, float]:
    """
    Process multiple tickers in parallel using a multiprocessing pool.

    Args:
        tickers: Symbols to process.
        filename: Workbook path handed to every worker.
        days: Number of most recent rows checked for a buy signal.
        period: History period string handed to every worker.
        processes: Number of worker processes in the pool (default 2).

    Returns:
        Dict of tickers that met the buy-signal criteria mapped to their ATR
        percentage, sorted by ATR percentage in descending order.

    NOTE(review): each worker saves to the same ``filename`` via
    ``process_single_ticker``; those writes are not synchronised across
    processes — confirm this is acceptable or serialise the saving.
    """
    jobs = [(ticker, period, filename, days) for ticker in tickers]
    if not jobs:
        # Nothing to do; avoid spawning worker processes for an empty batch.
        return {}

    with multiprocessing.Pool(processes=processes) as pool:
        results = pool.map(log_and_process, jobs)

    # Filter out None results and sort by ATR percentage
    filtered_results = {ticker: atr for ticker, atr in results if atr is not None}
    return dict(sorted(filtered_results.items(), key=lambda x: x[1], reverse=True))





In [16]:
# Example Usage
# NOTE(review): the guard compares __name__ with "__main1__" rather than the
# usual "__main__", so this example is effectively disabled — presumably on
# purpose; confirm.
if __name__ == "__main1__":
    # Project-local module providing the ticker list.
    from nifty_50_tickers import nifty_500

    # Only the first five tickers are used for this parallel run.
    tickers = nifty_500[:5]
    filename = "Turtle_Trading1.xlsx"
    top_tickers = process_tickers_multiprocessing(tickers, filename, days=1, period="1y")
    print(top_tickers)

In [17]:
# class AccountHandler:
#     def __init__(self, risk_percentage: float, account_size: float = 100000, stock_types: int = 2):
#         """
#         Handles risk management and position sizing.
#         """
#         self.account_size = account_size
#         self.risk_percentage = risk_percentage / 100
#         self.trading_amt_per_stock = self.account_size / stock_types
#         self.risk = self.trading_amt_per_stock * self.risk_percentage

#     def calculate_position_size(self, atr: float) -> int:
#         """
#         Calculate the position size (number of shares).
#         """
#         return int(self.risk // atr)

#     @staticmethod
#     def calculate_stop_loss(entry_price: float, atr: float) -> float:
#         """
#         Calculate the stop-loss price.
#         """
#         return entry_price - 2 * atr

#     @staticmethod
#     def calculate_first_pyramid_price(entry_price: float, atr: float) -> float:
#         """
#         Calculate the price for the first pyramid signal.
#         """
#         return entry_price + 0.5 * atr
    
class Account_Handler:
    """
    Handles risk management, position sizing, and trade levels for the Turtle Trading strategy.

    The account is split evenly across ``NO_OF_STOCK_TYPE`` stocks; the amount
    risked per stock is ``risk_percentage`` percent of that per-stock share.
    """
    NO_OF_STOCK_TYPE = 2  # Number of different stocks in the account
    account_size = 100000  # Total account size in currency units

    def __init__(self, risk_percentage: float):
        """
        Initialize the account handler with risk management parameters.

        Args:
            risk_percentage: Risk per stock in percent (e.g. 1 for 1%).
        """
        self.risk_percentage = risk_percentage / 100
        # Capital allocated to each stock (attribute name kept for compatibility).
        self.trading_amt_per_share = self.account_size / self.NO_OF_STOCK_TYPE
        self.risk = self.trading_amt_per_share * self.risk_percentage

    @staticmethod
    def _read_last_value(filename: str, ticker: str, column: str, label: str):
        """
        Return the value of ``column`` in the last row of the ticker's sheet.

        The ticker is upper-cased before the sheet lookup. ``label`` is the
        human-readable column name used in the missing-column error.

        Raises:
            ValueError: If the workbook has no sheet for the ticker.
            KeyError: If the sheet lacks ``column``.
        """
        ticker = ticker.upper()
        # The context manager closes the workbook handle; the same handle is
        # used for both the sheet lookup and the read.
        with pd.ExcelFile(filename) as xls:
            if ticker not in xls.sheet_names:
                raise ValueError(f"Ticker {ticker} not found in the Excel file.")
            df = xls.parse(ticker)

        if column not in df.columns:
            raise KeyError(f"{label} column not found for {ticker}.")
        # Positional lookup: the last row as written in the sheet.
        return df.iloc[-1][column]

    def fetch_atr(self, filename: str, ticker: str) -> float:
        """
        Retrieve the ATR (Average True Range) value from the last row of the
        ticker's sheet in the Excel file.

        Raises:
            ValueError: If the ticker's sheet is missing.
            KeyError: If the 'Avg_True_Range(N)' column is missing.
        """
        return self._read_last_value(filename, ticker, 'Avg_True_Range(N)', "ATR")

    def calculate_position_size(self, atr: float) -> int:
        """
        Calculate the position size (number of shares) as risk floor-divided by ATR.

        Raises:
            ValueError: If ``atr`` is not a positive number (including NaN).
        """
        # "not atr > 0" also rejects NaN, which "atr <= 0" would let through.
        if not atr > 0:
            raise ValueError("ATR must be greater than 0.")
        return int(self.risk // atr)

    def fetch_entry_price(self, filename: str, ticker: str) -> float:
        """
        Retrieve the entry price — the 'Close' value in the last row of the
        ticker's sheet in the Excel file.

        Raises:
            ValueError: If the ticker's sheet is missing.
            KeyError: If the 'Close' column is missing.
        """
        return self._read_last_value(filename, ticker, 'Close', "Close price")

    def calculate_stop_loss(self, entry_price: float, atr: float, multiplier: float = 2.0) -> float:
        """
        Calculate the stop-loss price: ``entry_price - multiplier * atr``.
        """
        return entry_price - (multiplier * atr)

    def calculate_pyramid_price(self, entry_price: float, atr: float, level: int) -> float:
        """
        Calculate the pyramid price for ``level``: ``entry_price + level * 0.5 * atr``.

        Levels:
        - Level 1: 0.5 * ATR
        - Level 2: 1.0 * ATR
        - Level 3: 1.5 * ATR (extendable)
        """
        return entry_price + (level * 0.5 * atr)

    def calculate_stop_loss_pyramid(self, stop_loss_price: float, atr: float, level: int) -> float:
        """
        Calculate the stop-loss for a pyramid level:
        ``stop_loss_price + level * 0.5 * atr``.
        """
        return stop_loss_price + (level * 0.5 * atr)


In [20]:
# Example: derive position size, stop-loss and pyramid prices for one ticker.
# The workbook must already contain a sheet for the ticker; fetch_atr and
# fetch_entry_price raise ValueError otherwise.
filename = "Turtle1_Trading.xlsx"
ticker = "FORTIS"
account_handler = Account_Handler(risk_percentage=1)

# Fetch ATR and calculate position size
atr = account_handler.fetch_atr(filename, ticker)
position_size = account_handler.calculate_position_size(atr)

# Fetch entry price and calculate stop-loss
entry_price = account_handler.fetch_entry_price(filename, ticker)
stop_loss_price = account_handler.calculate_stop_loss(entry_price, atr)

# Calculate pyramid levels
first_pyr_price = account_handler.calculate_pyramid_price(entry_price, atr, level=1)
second_pyr_price = account_handler.calculate_pyramid_price(entry_price, atr, level=2)

print(f"ATR: {atr}, Position Size: {position_size}, Entry Price: {entry_price}")
print(f"Stop Loss: {stop_loss_price}, First Pyramid: {first_pyr_price}, Second Pyramid: {second_pyr_price}")


ATR: 21.26, Position Size: 23, Entry Price: 685.85
Stop Loss: 643.33, First Pyramid: 696.48, Second Pyramid: 707.11
