# Setup Basic Imports
> * pip install ipympl
> * jupyter nbextension install --py --symlink --sys-prefix --overwrite ipympl
> * jupyter nbextension enable --py --sys-prefix ipy

In [12]:
import os, sys
from datetime import date, datetime, timedelta
import time
from time import sleep, perf_counter
from typing import Any
import traceback

import pandas as pd
pd.options.plotting.backend = "plotly"

import numpy as np

pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)
pd.set_option('display.width',1000)
pd.set_option('display.colheader_justify','center')
pd.set_option('display.precision',3)

import matplotlib.pyplot as plt

import matplotlib.gridspec as gridspec

%matplotlib widget
%matplotlib inline

import warnings
warnings.filterwarnings("ignore")
from tqdm.notebook import tqdm_notebook as tqdm

from IPython.display import display, clear_output, HTML

# This allows multiple outputs from a single jupyter notebook cell:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Import application libraries

In [13]:
# %pip install nsetools
# %pip install nsepy
clear_output()

from nsetools import Nse
import asyncio
from reportlab.pdfgen import canvas
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfbase import pdfmetrics
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.utils import ImageReader
from cmath import isnan
from backtesting import Backtest, Strategy
import plotly.graph_objects as go
# from scipy.stats import linregress

nse = Nse()

from nsepy import get_history

# Utility functions

In [14]:
# converts seconds to readable format
def get_time_hh_mm_ss(sec: int) -> None:
    td = str(timedelta(seconds=sec))
    # print('Time in hh:mm:ss:', td)
    # split string into individual component
    x = td.split(':')
    print('Time in hh:mm:ss:', x[0], 'Hours', x[1], 'Minutes', x[2], 'Seconds')
    
# Create new pdf report
def create_report_file(ticker: str, textList: list, ImagePath: str):
    w, h = A4
    
    # print(f'Canvas size: width={w}, height={h}')
    date_string = date.today().strftime("%Y%m%d")
    cwd = os.getcwd()
    fol_path = f'{cwd}\\reports\\{date_string}'
    if not os.path.exists(fol_path):
        os.mkdir(fol_path)
    filepath = f'{fol_path}\\{ticker}.pdf'
    
    # creating a pdf object
    pdf = canvas.Canvas(filepath)
    pdf.setTitle(f'Stock Analysis {date.today()}')
    
    # textline and for loop
    text = pdf.beginText(10, h-30)
    text.setFont("Courier", 18)
    text.setFillColor(colors.red)
    text.textLine(ticker)
    text.textLine(f'Stock Analysis {date.today().strftime("%d %B %Y")}')
    for line in textList:
        text.textLine(line)
    pdf.drawText(text)
    # drawing a line
    pdf.line(30, 710, 550, 710)
    
    # drawing a image at the 
    # specified (x.y) position
    img = ImageReader(ImagePath)
    img_w, img_h = img.getSize()
    pdf.drawInlineImage(ImagePath, 10, 20, width=w-10, preserveAspectRatio=True)
    
    # saving the pdf
    pdf.save()

# Download Nifty Stock Codes

In [15]:
# It fetches all stock codes from NSE
def fetch_stock_codes():
    _stock_codes = nse.get_stock_codes()
    stock_data = {'ticker':_stock_codes.keys(), 'company_name': _stock_codes.values()}

    df_stock_codes = pd.DataFrame(stock_data)
    df_stock_codes = df_stock_codes[1:]
    df_stock_codes.to_excel('data\\stock_codes.xlsx', index=False, header=True)
    return df_stock_codes

# it provides a list of tickers that we are interested in
def get_stock_codes() -> list:
    cwd = os.getcwd()
    filepath = f'{cwd}\\data\\stock_codes.xlsx'
    if os.path.exists(filepath):
        df_stock_codes = pd.read_excel(filepath)
        return df_stock_codes['ticker'].values.tolist()
    else:
        print('stock codes file missing')
        return []

# df_stock_codes = fetch_stock_codes()

# Get Historical data for stocks

> * https://dev.to/shahstavan/how-to-fetch-stock-data-using-api-in-python-5e15
> * https://algotrading101.com/learn/yahoo-finance-api-guide/
> * https://towardsdatascience.com/how-to-get-stock-data-using-python-c0de1df17e75
> * https://www.quora.com/Where-can-I-get-free-NSE-stock-price-data-for-a-personal-Python-project
> * !pip install yahoo_fin
> *  https://algotrading101.com/learn/yahoo-finance-api-guide/

In [16]:
async def fetch_stock_historical(ticker):
    try:
        _today = date.today()
        _start_date = _today - timedelta(days=365*5)
        cwd = os.getcwd()
        filepath = f'{cwd}\\historical\\{ticker}.xlsx'
        # if os.path.isfile(filepath):
        #     history_df = pd.read_excel(filepath)
        #     # fetch last date for which data is available
        #     if len(history_df)  > 200:
        #         _start_date = history_df['Date'].max().date()
        #         # if latest data is already available, then do nothing
        #         if _today == _start_date:
        #             print(f'latest data already available, skipping for {ticker}')
        #             return        
            
        _d = get_history(symbol=ticker, start=_start_date, end=_today)
        # Move date columns to first
        _d.insert(loc=0, column='Date', value=_d.index)
        
        # print(f'extracted {_d.shape[0]} historical records for {ticker}')
        _d.to_excel(filepath, index=False)
    except Exception as e:
        print(f'Error: skipped historical data extraction for {ticker}')
        print(e)
        
# fetch_stock_historical('ACC')

In [18]:
# loop through all stock codes and fetch_stock_historical
async def collect_all_hostoricals():
    stock_codes = get_stock_codes()
    if len(stock_codes) > 0:
        tasks = []
        print('Creating async requests')
        for ticker in tqdm(stock_codes):
            tasks.append(
                asyncio.create_task(
                    fetch_stock_historical(ticker)
                    )
                )
        # print('fetching historicals from NSE')
        pbar = tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Downloading stock historicals")
        res = [await t for t in pbar]
        # results = await asyncio.gather(*tasks, return_exceptions=False )
    else:
        print("Stock codes not found")
        
def read_ticker_file(ticker: str):
    isValid = False
    result = None
    cwd = os.getcwd()
    filepath = f'{cwd}\\historical\\{ticker}.xlsx'
    if os.path.exists(filepath):
        df_ticker = pd.read_excel(filepath)
        if len(df_ticker) > 200:
            df_ticker.set_index('Date', inplace=True, )
            result = df_ticker
            isValid = True
    else:
        print(f'No data found for {ticker}')
    return isValid, result
    
await collect_all_hostoricals()

Creating async requests


  0%|          | 0/1870 [00:00<?, ?it/s]

Downloading stock historicals:   0%|          | 0/1870 [00:00<?, ?it/s]

# Stock Analysis

> * [mplfinance plotting](https://nbviewer.org/github/matplotlib/mplfinance/blob/master/examples/addplot.ipynb)
> * [Backtesting](https://kernc.github.io/backtesting.py/doc/backtesting/backtesting.html#backtesting.backtesting.Strategy.I)
> * see within comments for some links
<!-- https://awesomeopensource.com/projects/python/stock-analysis -->
<!-- https://awesomeopensource.com/projects/python/stock-market-analysis -->
<!-- https://medium.com/geekculture/top-4-python-libraries-for-technical-analysis-db4f1ea87e09 -->
<!-- https://dev.to/sewinter/8-best-python-libraries-for-algorithmic-trading-1af8 -->
<!-- https://financetrain.com/best-python-librariespackages-finance-financial-data-scientists -->
<!-- https://analyticsindiamag.com/top-python-libraries-to-get-historical-stock-data-with-code/ -->
<!-- https://medium.com/analytics-vidhya/8-popular-python-libraries-in-finance-industry-29d936c40ca4 -->
<!-- https://techflare.blog/simple-backtesting-for-trading-in-python/ -->


## Basic returns analysis

In [97]:
class BasicReturns():
    def __init__(self, ticker, df_stock, durationDays: int):
        self.ticker = ticker
        self.durationDays = durationDays
        
        _today = date.today()
        _start_date = _today - timedelta(days=self.durationDays)
        self.df_stock = df_stock[df_stock.index.date >= _start_date]
        # print(self.df_stock.index.tolist()[0])
        
    def durationReturns(self):
        start_value = self.df_stock['Close'].iloc[0]
        end_value = self.df_stock['Close'].iloc[len(self.df_stock)-1]
        pct_change = round((end_value - start_value)*100/start_value, 2)
        _statement = f'Last {self.durationDays} days returns is {pct_change}%'
        print(_statement)
        return pct_change
    
    def trendline_slope(self):
        result = linregress(np.arange(len(self.df_stock)), self.df_stock.Close)
        slope = round(result.slope, 2)
        # print(f'Trendline slope for {self.durationDays} is {slope}')
        return result.slope
        
        
        

## Calculate stop loss

In [15]:
def buy_stoploss(ticker, df_ticker):
    df_ticker['diff'] = (df_ticker['High']-df_ticker['Open'])*100/df_ticker['Open']
    percentile_60 = round(np.percentile(df_ticker['diff'], 60), 2)
    return percentile_60

def sell_stoploss(ticker, df_ticker, buy_price=0):
    last_close = df_ticker.tail(1).iloc['Close']
    x = ((last_close - buy_price)*100)/buy_price
    # df_ticker['diff'] = (df_ticker['Low']-df_ticker['Open'])*100/df_ticker['Open']
    # percentile_50 = round(np.percentile(df_ticker['diff'], 50), 2)
    
    y = -6.95 * 10^{-7} * x^{7}+4.7 * 10^{-5} *  x^{6}-1.17 * 10^{-3} *  x^{5}+1.22 * 10^{-2} *  x^{4}-3.35 * 10^{-2} *  x^{3}-2.14 * 10^{-1} *  x^{2}+1.18 *  x-5
    return y
    
    

## Exponential Moving Averages

<!-- https://github.com/matplotlib/mplfinance#newapi -->
<!-- https://www.quantstart.com/articles/Backtesting-a-Moving-Average-Crossover-in-Python-with-pandas/ -->

In [57]:
# To do: Implement backtesting

# Calculate exponential moving average
def EMA(values: pd.Series, n: int):
        result = values.ewm(span=n, adjust=False, min_periods=1).mean()
        return result

# Check Moving Average Strategy
class MovingAverageCrossStrategy():
    def __init__(self, df_stock, short_window=100, long_window=400):
        self.df_stock = df_stock.copy()
        _today = date.today()
        
        if long_window < 365:
            _start_date = _today - timedelta(days=365)
        else:
            _start_date = _today - timedelta(days=long_window * 2)
        temp_df = self.df_stock[self.df_stock.index.date >= _start_date]
        
        self.short_window = short_window
        self.long_window = long_window

    def generate_signals(self):
        temp_df = pd.DataFrame()
        temp_df['Close'] = self.df_stock['Close']
        
        temp_df['short_mavg'] = EMA(temp_df.Close, self.short_window)
        temp_df['long_mavg'] = EMA(temp_df.Close, self.long_window)
        temp_df = temp_df.dropna()
        
        temp_df['short_shifted'] = temp_df['short_mavg'].shift(1)
        temp_df['long_shifted'] = temp_df['long_mavg'].shift(1)
        
        temp_df['buy_signal'] = np.where((temp_df['short_mavg'] > temp_df['long_mavg']) & (temp_df['short_shifted'] < temp_df['long_shifted']), temp_df['Close']*.97, None)
        
        temp_df['sell_signal'] = np.where((temp_df['short_mavg'] < temp_df['long_mavg']) & (temp_df['short_shifted'] > temp_df['long_shifted']), temp_df['Close']*1.03, None)

        temp_df['buy_signal'].iloc[:self.long_window] = np.nan
        temp_df['sell_signal'].iloc[:self.long_window] = np.nan
        return temp_df[['Close', 'short_mavg', 'long_mavg' ,'buy_signal','sell_signal']]
    
    def trade_signal(self):
        mavg_diff = None
        isValid = False
        
        trade_signals = self.generate_signals()
        self.signals = trade_signals.copy()
        # calculate % diff between long and short moving averages
        trade_signals['mavg_diff'] = (trade_signals['long_mavg'] - trade_signals['short_mavg'])*100/trade_signals['short_mavg']
        # check if short ma is greater than long ma. If yes; skip becuse its not ready for buy.
        last_record = trade_signals.tail(1).to_dict('records')[0]
        mavg_diff = round(last_record['mavg_diff'],2)
        
        # proceed only if there is both buy and sell signal
        if trade_signals['sell_signal'].count() == 0 or trade_signals['buy_signal'].count() == 0:
            return isValid, mavg_diff
        
        # Find out last sell signal date
        last_sell_signal_date = trade_signals[trade_signals['sell_signal'].notnull()].index.max()
        # Find out last buy signal date
        last_buy_signal_date = trade_signals[trade_signals['buy_signal'].notnull()].index.max()
        
        # proceed only if last signal is for sell and not buy
        if last_sell_signal_date > last_buy_signal_date:
            # print('Last Sell signal date: ', last_sell_signal_date)
            trade_signals = trade_signals[trade_signals.index >= last_sell_signal_date]
            # find max draw down. After max draw down; stock will start coming up
            min_short_mavg_date = trade_signals['short_mavg'].idxmin()
            # print('Max draw down date: ', min_short_mavg_date)
            trade_signals = trade_signals[trade_signals.index > min_short_mavg_date]
            # trade_signals = trade_signals[trade_signals['mavg_diff'].between(-1,2, inclusive=True)]
            if len(trade_signals) > 0:
                isValid = True
                
        return isValid, mavg_diff

    def generate_graph(self, ticker) -> str:
        if not isinstance(self.signals, pd.DataFrame) | len(self.signals) == 0:
            signals = self.generate_signals()
        else:
            signals = self.signals
        buy_markers = mpl.make_addplot(signals['buy_signal'].tolist(), type='scatter', markersize=120, marker='^', title='Buy')
        sell_markers = mpl.make_addplot(signals['sell_signal'].tolist(), type='scatter', markersize=120, marker='v', title='Sell')
        apds = [buy_markers, sell_markers]

        result = self.df_stock[['Open', 'High', 'Low', 'Close']]
        
        fig = go.Figure(data=[go.Candlestick(x=result.index,
                                     open=result.Open, 
                                     high=result.High,
                                     low=result.Low,
                                     close=result.Close,
                                     name='Close Price'), 
                    # add moving average lines
                      go.Scatter(x=signals.index, y=signals.short_mavg, line=dict(color='orange', width=1), name=str(self.short_window)+' MA'),
                      go.Scatter(x=signals.index, y=signals.long_mavg, line=dict(color='purple', width=1), name=str(self.long_window)+' MA'),
                    #   add buy and sell signal markers
                      go.Scatter(x=signals.index, y=signals.buy_signal, mode='markers', marker_symbol='arrow-up',
                                 marker=dict(color='DarkGreen', 
                                             size=15), 
                                 name='Buy'),
                      go.Scatter(x=signals.index, y=signals.sell_signal, mode='markers', marker_symbol='arrow-down',
                                 marker=dict(color='DarkRed', 
                                             size=15), 
                                 name='Sell'),
                      ])
        
        fig.update_layout(
            autosize=False,
            width=1000,
            height=700,
            title=ticker,
            xaxis_rangeslider_visible=True,
            )
        # fig.show()
        
        cwd = os.getcwd()
        filepath = f'{cwd}\\tmp\\{ticker}.jpeg'
        fig.write_image(filepath)
        return filepath

# Run Strategy

> * [progress bar asyncio](https://stackoverflow.com/questions/61041214/making-a-tqdm-progress-bar-for-asyncio)
> * [create pdf with python](https://www.geeksforgeeks.org/creating-pdf-documents-with-python/)
> * [reportlab](https://pythonassets.com/posts/create-pdf-documents-in-python-with-reportlab/)

In [99]:
# generate actual leads
async def generate_leads(ticker, df_ticker) -> None:
    try:
        mac = MovingAverageCrossStrategy(df_ticker, short_window=20, long_window=50)
        isValid, mavg_diff = mac.trade_signal()
        if isValid:
            if -1 <= mavg_diff <= 2:
                
                print('########################################')
                print(f'{ticker} => ready for buy review : {isValid}, MA Difference :{mavg_diff}%')
                # print('----------------------------------------')
                
                texts = []
                slope_multiple = 1
                for duration in [365*5, 365, 180, 30]:
                    br = BasicReturns(ticker, df_ticker, duration)
                    slope = br.trendline_slope()
                    slope_multiple += slope
                    _return = br.durationReturns()
                    texts.append(f'Last {duration} days return is {_return}%')
                buy_sltp = buy_stoploss(ticker, df_ticker)
                print(f'Overall slope additive is {round(slope_multiple, 2)}')
                print(f'Buy stop loss @ {buy_sltp}%')
                
                graphpath = mac.generate_graph(ticker)
                create_report_file(ticker, texts, graphpath)
    except Exception as e:
        print(f'Analysis failed for {ticker}')
        print(traceback.format_exc())
        # print(e)
            
# trigger function
async def strategy_run(stock_codes: list) -> None:
    start_time = perf_counter()

    tasks = []
    active_tasks = 0
    if isinstance(stock_codes, list) & len(stock_codes) == 0:
        stock_codes = get_stock_codes()
    
    for ticker in tqdm(stock_codes, desc="Submitting analysis task"):
        try:
            isValid, df_ticker = read_ticker_file(ticker)
            if isValid:
                task = asyncio.create_task(generate_leads(ticker, df_ticker))
                task.name = ticker
                tasks.append(task)
        except Exception as e:
            print('Error', e)
            continue
        
    await asyncio.gather(*tasks, return_exceptions=False)
    
    total_time = perf_counter() - start_time
    get_time_hh_mm_ss(total_time)
    


In [11]:

# sell stop loss calculation formula
# https://tools.timodenk.com/polynomial-interpolation
# stop loss% %change
# -5.00 0
# 0.00 -5.00
# 5.00	-4.00
# 10.00 -3.00
# 12.00 -2.00
# 15.00 -1.50
# 18 -1.50
# 20.00 -1.00
# -6.95 * 10^{-7} * x^{7}+4.7 * 10^{-5} *  x^{6}-1.17 * 10^{-3} *  x^{5}+1.22 * 10^{-2} *  x^{4}-3.35 * 10^{-2} *  x^{3}-2.14 * 10^{-1} *  x^{2}+1.18 *  x-5

In [101]:
# await collect_all_hostoricals()
await strategy_run(stock_codes=[])

# sell_stoploss('ABOTTINDIA')