In [1]:
import vectorbt as vbt
from vectorbt.signals.factory import SignalFactory
import numpy as np
from numba import njit
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from abc import abstractmethod, ABCMeta
import time
from pathlib import Path
from itertools import product, combinations, permutations 
from typing import List, Optional, Union
import time
import yfinance as yf

## Top 30 Stocks in India

In [2]:
# Yahoo Finance symbols of the 30 Indian stocks analysed in this notebook.
tickers = [
    "NTPC.NS", "HINDALCO.NS", "BHARTIARTL.NS", "SHREECEM.NS", "TCS.NS",
    "HDFCLIFE.NS", "CIPLA.NS", "LT.NS", "ULTRACEMCO.NS", "WIPRO.NS",
    "KOTAKBANK.NS", "BAJFINANCE.NS", "TATACONSUM.NS", "BAJAJFINSV.NS", "MARUTI.NS",
    "TITAN.NS", "ICICIBANK.NS", "ONGC.NS", "ITC.NS", "APOLLOHOSP.NS",
    "BRITANNIA.NS", "BAJAJ-AUTO.NS", "TECHM.NS", "COALINDIA.NS", "TATASTEEL.NS",
    "HEROMOTOCO.NS", "INDUSINDBK.NS", "NESTLEIND.NS", "M&M.NS", "RELIANCE.NS",
]

### Utilities

In [3]:
# Absolute location of the `data` folder (relative to the current working
# directory) where the per-ticker parquet files are stored.
DATA_PATH = Path("data").resolve()

def download_data(
    ticker: str,
    period: Optional[str] = "max",
    interval: Optional[str] = "1d",
    auto_adjust: Optional[bool] = True,
    path: Union[str, Path] = DATA_PATH,
):
    """
    Download the price history of ``ticker`` from Yahoo Finance and store it
    as ``<path>/<ticker>.parquet``.

    The work is IO bound, so it is meant to be mapped over many tickers with
    a thread pool, e.g.::

        with ThreadPoolExecutor() as executor:
            executor.map(download_data, tickers)

    Parameters
    ----------
    ticker : symbol handed to ``yf.Ticker``.
    period, interval, auto_adjust : forwarded unchanged to ``Ticker.history``.
    path : directory (``str`` or ``Path``) that receives the parquet file;
        it is created when missing.

    Returns
    -------
    None. Failures are reported on stdout instead of being raised so that one
    bad ticker does not abort a whole batch; tickers that come back without
    any rows are skipped and no file is written for them.
    """
    # Accept both str and Path, as the annotation promises.
    target = Path(path) / f"{ticker}.parquet"
    try:
        history = yf.Ticker(ticker).history(
            period=period, interval=interval, auto_adjust=auto_adjust
        )
        if history.empty:
            # Nothing to persist: avoids leaving empty parquet files behind.
            print(f"{ticker} returned no data, nothing was saved")
            return
        target.parent.mkdir(parents=True, exist_ok=True)
        history.to_parquet(path=target, engine="pyarrow", index=True)
    except Exception as exc:
        # Best effort per ticker, but say what went wrong. `Exception` (not a
        # bare except) lets KeyboardInterrupt / SystemExit propagate.
        print(f"{ticker} could not be downloaded: {exc!r}")
    else:
        print(f"{ticker} was successfully downloaded")
        # Pause after a successful download before the worker takes the next ticker.
        time.sleep(2)


def delete_empty_parquet(path):
    """
    Remove every ``*.parquet`` file directly inside ``path`` that holds no rows.

    Parameters
    ----------
    path : directory to scan (``str`` or ``Path``).

    Only regular files with a ``.parquet`` suffix are inspected; other entries
    (sub-directories, checkpoints, text files, ...) are ignored instead of
    being fed to ``pd.read_parquet``. Each removed file is reported on stdout.
    """
    # Materialise the listing first so files are not deleted while the
    # directory is still being iterated.
    candidates = sorted(Path(path).glob("*.parquet"))
    for parquet_file in candidates:
        if not parquet_file.is_file():
            continue
        if len(pd.read_parquet(parquet_file)) == 0:
            print(f"deleting {parquet_file}")
            parquet_file.unlink()


def get_file_name(path: Path, output: Optional[list] = None):
    """
    Collect the stem (file name without its last suffix) of every entry in ``path``.

    Parameters
    ----------
    path : directory to list.
    output : optional list to append to. When omitted a fresh list is created
        on every call, so results no longer leak from one call into the next.

    Returns
    -------
    The list holding the stems: ``output`` itself when it was supplied,
    otherwise the newly created list.
    """
    if output is None:
        output = []
    output.extend(entry.stem for entry in Path(path).iterdir())
    return output

def permute_data(data: pd.Series, seed: Optional[int] = 1730) -> pd.Series:
    """
    Return a new series holding the values of ``data`` in random order.

    The index of ``data`` is kept as is; only the values move. ``seed`` feeds
    ``np.random.default_rng`` so the same seed always yields the same
    reordering. ``data`` itself is not modified.
    """
    shuffled_values = np.random.default_rng(seed).permutation(data.values)
    return pd.Series(shuffled_values, index=data.index)


def random_like(
    data: pd.Series,
    seed: Optional[int] = 1730,
    p: List[float] = [
        0.5,
    ],
    binomial: Optional[bool] = True,
) -> pd.Series:
    """
    Generate random boolean signals with the same length and index as ``data``.

    Parameters
    ----------
    data : series whose size and index are copied; its values are not read.
    seed : seed for ``np.random.default_rng`` (same seed -> same signals).
    p : probabilities. ``p[0]`` is the probability of ``True``. A single
        value is enough; for the ``choice`` branch it is expanded to
        ``[p[0], 1 - p[0]]``. A two-element list ``[p_true, p_false]`` is
        passed to ``choice`` unchanged.
    binomial : draw with ``Generator.binomial`` when truthy, otherwise with
        ``Generator.choice``.

    Returns
    -------
    Boolean ``pd.Series`` indexed like ``data``.

    Raises
    ------
    ValueError : if ``p`` is empty.
    """
    rng = np.random.default_rng(seed)
    # Work on a copy so the (shared) default list is never touched.
    probabilities = [float(value) for value in np.atleast_1d(p)]
    if not probabilities:
        raise ValueError("p must contain at least one probability")
    p_true = probabilities[0]

    if binomial:
        # Honour the requested probability instead of a hard-coded 0.5.
        random_data = rng.binomial(n=1, p=p_true, size=data.size).astype("bool")
    else:
        if len(probabilities) == 1:
            # `choice` needs one probability per option ([True, False]).
            probabilities = [p_true, 1.0 - p_true]
        random_data = rng.choice([True, False], size=data.size, p=probabilities)
    return pd.Series(data=random_data, index=data.index)

## Download Data

In [4]:
# with ThreadPoolExecutor() as executor:
#     executor.map(download_data, tickers)

In [5]:
# Date window used to slice each price series before it is backtested.
START_DATE = pd.Timestamp("2009-10-01")
END_DATE = pd.Timestamp("2020-01-31")

## SETTINGS

### Base strategy class

In [6]:
class Strategy(metaclass=ABCMeta):
    """
    Abstract interface shared by every strategy in this notebook.

    A concrete strategy has to provide both ``get_entries`` and ``get_exits``;
    the class cannot be instantiated until both are overridden.
    """

    @abstractmethod
    def get_entries(self):
        """Produce the entry signals of the strategy."""

    @abstractmethod
    def get_exits(self):
        """Produce the exit signals of the strategy."""

### Backtest

In [7]:
class Backtest:
    """
    Backtest one strategy on one ticker and compare it with two baselines.

    The strategy's own entries are compared against randomly generated entries
    (``random_like``) and against a shuffled copy of the entries
    (``permute_data``); all three are paired with the strategy's exits.

    Parameters
    ----------
    ticker : label used as the key of the result dictionary.
    strategy : object exposing ``data``, ``get_entries()`` and ``get_exits()``.
    fees, size, freq : forwarded to ``vbt.Portfolio.from_signals``.
    """

    def __init__(
        self,
        ticker: str,
        strategy: Strategy,
        fees: float = 0.001,
        size: float = np.inf,
        freq: str = "1D",
    ):
        self.ticker = ticker
        self.strategy = strategy
        self.fees = fees
        self.size = size
        self.freq = freq

    def __repr__(self):
        return f"<Backtest {self.strategy}>"

    def _get_portfolio(self, entries, exits, **kwargs):
        """
        Build a portfolio from ``entries``/``exits`` on the strategy's prices.

        Extra keyword arguments are passed straight to
        ``vbt.Portfolio.from_signals``; ``run`` relies on this to forward its
        own ``**kwargs``.
        """
        return vbt.Portfolio.from_signals(
            close=self.strategy.data,
            entries=entries,
            exits=exits,
            fees=self.fees,
            size=self.size,
            freq=self.freq,
            **kwargs,
        )

    def run(self, **kwargs):
        """
        Run the strategy and both baselines and summarise trade expectancy.

        ``**kwargs`` are forwarded to ``vbt.Portfolio.from_signals`` for all
        three portfolios.

        Returns
        -------
        ``{ticker: {...}}`` with the keys ``mean_expectancy``,
        ``max_expectancy``, ``mean_random_expectancy`` and
        ``mean_permuted_expectancy``.
        """
        entries = self.strategy.get_entries()
        exits = self.strategy.get_exits()

        # NOTE: both helpers run column by column with their default seed, so
        # every column receives the same random draw / the same reordering.
        random_entries = entries.apply(random_like)
        permuted_entries = entries.apply(permute_data)

        portfolio = self._get_portfolio(entries, exits, **kwargs)
        random_portfolio = self._get_portfolio(random_entries, exits, **kwargs)
        permuted_portfolio = self._get_portfolio(permuted_entries, exits, **kwargs)

        # Computed once and reused for both the mean and the max.
        expectancy = portfolio.trades.expectancy()

        output = {
            self.ticker: dict(
                mean_expectancy=expectancy.mean(),
                max_expectancy=expectancy.max(),
                mean_random_expectancy=random_portfolio.trades.expectancy().mean(),
                mean_permuted_expectancy=permuted_portfolio.trades.expectancy().mean(),
            )
        }
        return output

In [8]:
class Backtest_V2:
    """
    Extended backtest: strategy entries versus several exit rules and baselines.

    On top of the strategy's own entry/exit pair this compares:

    * random entries (``random_like``) and shuffled entries (``permute_data``)
      against the strategy exits,
    * strategy entries and random entries against time-based exits produced by
      ``TimeBasedSignals`` for holding periods of 50, 100 and 200 bars,
    * strategy entries against trailing stops produced by ``vbt.STCX``.

    Parameters
    ----------
    ticker : label used as the key of the result dictionary.
    strategy : object exposing ``data``, ``get_entries()`` and ``get_exits()``.
    fees, size, freq : forwarded to ``vbt.Portfolio.from_signals``.
    """

    def __init__(
        self,
        ticker: str,
        strategy: Strategy,
        fees: float = 0.001,
        size: float = np.inf,
        freq: str = "1D",
    ):
        self.ticker = ticker
        self.strategy = strategy
        self.fees = fees
        self.size = size
        self.freq = freq
        self.data = self.strategy.data

    def __repr__(self):
        return f"<Backtest {self.strategy}>"

    def _get_portfolio(self, entries, exits, **kwargs):
        """Build a portfolio on ``self.data`` with this backtest's fees, size and freq."""
        return vbt.Portfolio.from_signals(
            close=self.data,
            entries=entries,
            exits=exits,
            fees=self.fees,
            size=self.size,
            freq=self.freq,
            **kwargs
        )

    def run(self, **kwargs):
        """
        Run every comparison and summarise the trade expectancy of each.

        ``**kwargs`` are forwarded to ``vbt.Portfolio.from_signals`` for every
        portfolio that is built.

        Returns
        -------
        ``{ticker: {...}}`` holding one expectancy figure per comparison
        (mean over all parameter columns, plus the max for the plain
        entry/exit pair).
        """
        entries = self.strategy.get_entries()
        exits = self.strategy.get_exits()

        # NOTE: both helpers run column by column with their default seed, so
        # every column receives the same random draw / the same reordering.
        random_entries = entries.apply(random_like)
        permuted_entries = entries.apply(permute_data)

        # Time-based exits: one parameter column per holding period.
        hold_days = [50, 100, 200]
        time_based_signal = TimeBasedSignals.run(entries, hold_days)
        random_vs_time_based_signal = TimeBasedSignals.run(random_entries, hold_days)

        # Trailing stops from 0.01 to 1.0 in steps of 0.01
        # (presumably fractions of price, i.e. 1% .. 100% -- confirm against vbt.STCX).
        step = 0.01
        stops = np.arange(step, 1 + step, step)
        entry_vs_trailing_sl = vbt.STCX.run(
            entries, self.data, stop=list(stops), trailing=True
        )

        portfolio = self._get_portfolio(entries, exits, **kwargs)
        random_portfolio = self._get_portfolio(random_entries, exits, **kwargs)
        permuted_portfolio = self._get_portfolio(permuted_entries, exits, **kwargs)
        # `new_entries` are the entries re-aligned to the generated exits.
        time_based_pf = self._get_portfolio(
            time_based_signal.new_entries, time_based_signal.exits, **kwargs
        )
        random_vs_time_based_pf = self._get_portfolio(
            random_vs_time_based_signal.new_entries,
            random_vs_time_based_signal.exits,
            **kwargs
        )
        # Built through _get_portfolio like every other portfolio so that the
        # same fees, size and freq apply and the figures are comparable.
        entry_vs_trailing_sl_pf = self._get_portfolio(
            entry_vs_trailing_sl.new_entries, entry_vs_trailing_sl.exits, **kwargs
        )

        # Mean expectancy per holding period, indexed by the number of days.
        time_based_expectancy = (
            time_based_pf.trades.expectancy().groupby("custom_num_days").mean()
        )
        random_vs_time_based_expectancy = (
            random_vs_time_based_pf.trades.expectancy().groupby("custom_num_days").mean()
        )
        expectancy = portfolio.trades.expectancy()

        summary = dict(
            entry_vs_exit_expectancy=expectancy.mean(),
            max_entry_expectancy=expectancy.max(),
            random_vs_exit_expectancy=random_portfolio.trades.expectancy().mean(),
            mean_permuted_expectancy=permuted_portfolio.trades.expectancy().mean(),
        )
        # Look results up by holding period rather than by position.
        for days in hold_days:
            summary[f"entry_vs_{days}D_expectancy"] = time_based_expectancy.loc[days]
        for days in hold_days:
            summary[f"random_vs_{days}D_expectancy"] = random_vs_time_based_expectancy.loc[days]
        summary["entry_vs_trailing_sl_expectancy"] = (
            entry_vs_trailing_sl_pf.trades.expectancy().mean()
        )

        return {self.ticker: summary}

## Strategies

## SMA

In [9]:
class SMAStrategy(Strategy):
    """
    Moving-average crossover: enter when the fast SMA crosses above the slow
    SMA, exit when it crosses below.

    ``data`` is the price series handed to the indicator.
    """

    def __init__(self, data):
        self.data = data

    def init(self):
        """
        Compute the fast/slow SMA pairs for windows 10, 15, ..., 95.

        ``run_combs`` presumably evaluates every pair of windows -- confirm
        against the vectorbt documentation.
        """
        windows = np.arange(10, 100, 5)
        indicator = vbt.IndicatorFactory.from_pandas_ta("SMA")
        self.fast_sma, self.slow_sma = indicator.run_combs(
            self.data, windows, short_names=["fast", "slow"]
        )

    def get_entries(self):
        """Recompute the indicators and return the fast-above-slow crossings."""
        self.init()
        return self.fast_sma.sma_crossed_above(self.slow_sma)

    def get_exits(self):
        """
        Return the fast-below-slow crossings.

        The indicators are computed here when ``get_entries`` has not run yet,
        so the two getters can be called in any order.
        """
        if not hasattr(self, "fast_sma"):
            self.init()
        return self.fast_sma.sma_crossed_below(self.slow_sma)

#### TODO TEST ON PARTITIONS OF DATA

## MACD

In [10]:
class MACDStrategy(Strategy):
    """
    MACD strategy evaluated over a grid of fast/slow/signal windows.

    Entry: MACD above zero AND above its signal line.
    Exit:  MACD below zero OR below its signal line.

    ``data`` is the price series handed to ``vbt.MACD``.
    """

    def __init__(self, data):
        self.data = data

    def init(self):
        """
        Run ``vbt.MACD`` over the parameter grid and keep it in ``self.indicator``.

        The grid combines every 2-combination of the windows 2..50 (fast, slow)
        with every signal window 2..20.
        """
        fast_windows, slow_windows, signal_windows = vbt.utils.params.create_param_combs(
            (product, (combinations, np.arange(2, 51, 1), 2), np.arange(2, 21, 1))
        )

        self.indicator = vbt.MACD.run(
            self.data,
            fast_window=fast_windows,
            slow_window=slow_windows,
            signal_window=signal_windows,
        )

    def get_entries(self):
        """Recompute the indicator and return the entry signals."""
        self.init()
        return self.indicator.macd_above(0) & self.indicator.macd_above(self.indicator.signal)

    def get_exits(self):
        """
        Return the exit signals.

        The indicator is computed here when ``get_entries`` has not run yet,
        so the two getters can be called in any order.
        """
        if not hasattr(self, "indicator"):
            self.init()
        return self.indicator.macd_below(0) | self.indicator.macd_below(self.indicator.signal)

In [11]:
## TODO Refactor the code
def run_backtest(ticker:str):
    """
    Backtest the MACD strategy on the stored ``Close`` prices of ``ticker``.

    Reads ``<DATA_PATH>/<ticker>.parquet``, runs ``Backtest`` with a
    ``MACDStrategy`` and returns the result as a one-row DataFrame indexed by
    the ticker.

    NOTE: a later cell defines ``run_backtest(ticker, strategy_object)``; once
    that cell has run, it replaces this definition.
    """
    print(f"backtesting on {ticker}")
    close_prices = pd.read_parquet(f"{DATA_PATH/ticker}.parquet").get("Close")
    summary = Backtest(ticker, MACDStrategy(close_prices)).run()
    return pd.DataFrame.from_dict(summary, orient="index")

## BBANDS

In [12]:
class BBANDStrategy(Strategy):
    """
    Bollinger-band strategy evaluated over a grid of parameters.

    Entry: close below the ``bbl`` band. Exit: close above the ``bbu`` band
    (presumably the lower and upper band of pandas-ta's BBANDS -- confirm).

    ``data`` is the price series handed to the indicator.
    """

    def __init__(self, data):
        self.data = data

    def init(self):
        """
        Run BBANDS over the parameter grid and keep it in ``self.indicator``.

        The grid is the product of the lengths 10, 15, ..., 50 with the
        standard deviations 2 and 3 and the moving-average modes "sma"/"ema".
        """
        lengths, stds, mas = vbt.utils.params.create_param_combs(
            (product, np.arange(10, 55, 5), (product, [2, 3], ["sma", "ema"]))
        )
        self.indicator = vbt.IndicatorFactory.from_pandas_ta("BBANDS").run(
            self.data,
            length=lengths,
            std=stds,
            mamode=mas,
        )

    def get_entries(self):
        """Recompute the indicator and return the entry signals."""
        self.init()
        return self.indicator.close_below(self.indicator.bbl)

    def get_exits(self):
        """
        Return the exit signals.

        The indicator is computed here when ``get_entries`` has not run yet,
        so the two getters can be called in any order.
        """
        if not hasattr(self, "indicator"):
            self.init()
        return self.indicator.close_above(self.indicator.bbu)

In [13]:
## ENTRY BUT EXIT AFTER 50, 100, 200 DAYS
#adapted from the documentation

@njit
def wait_choice_nb(from_i, to_i, col, num_days, temp_idx_arr):
    """
    Pick the exit index ``num_days`` bars after ``from_i`` (counting ``from_i`` itself).

    The candidate index ``from_i + num_days - 1`` is written into
    ``temp_idx_arr[0]``. A one-element view of that buffer is returned when the
    candidate lies before ``to_i``; otherwise an empty view is returned, so an
    array comes back in both cases. ``col`` is accepted but not used.
    """
    temp_idx_arr[0] = from_i + num_days - 1
    if temp_idx_arr[0] >= to_i:
        # Candidate falls outside the searched range: no exit.
        return temp_idx_arr[:0]
    return temp_idx_arr[:1]
            
    
# Signal generator in "chain" mode whose exits are chosen by wait_choice_nb;
# `num_days` is its only parameter and is handed to the choice function
# together with the `temp_idx_arr` keyword argument.
TimeBasedSignals = SignalFactory(mode="chain", param_names=["num_days"]).from_choice_func(
    exit_choice_func=wait_choice_nb,
    exit_settings={
        "pass_params": ["num_days"],
        "pass_kwargs": ["temp_idx_arr"],
    },
)

In [14]:
def run_backtest(ticker:str, strategy_object:Strategy):
    """
    Backtest one strategy on the stored ``Close`` prices of ``ticker``.

    ``strategy_object`` is called with the price series to build the strategy
    (the notebook passes a strategy class such as ``SMAStrategy``). Prices are
    read from ``<DATA_PATH>/<ticker>.parquet`` and restricted to
    ``START_DATE``..``END_DATE`` before ``Backtest_V2`` is run.

    Returns a one-row DataFrame indexed by the ticker. The function sleeps for
    five seconds after each run before returning.
    """
    close_prices = pd.read_parquet(f"{DATA_PATH/ticker}.parquet").get("Close")
    windowed_prices = close_prices.loc[START_DATE:END_DATE]
    backtest = Backtest_V2(ticker, strategy_object(windowed_prices))
    print(f"backtesting on {ticker}")
    summary = backtest.run()
    time.sleep(5)
    return pd.DataFrame.from_dict(summary, orient="index")

## Test run

In [15]:
# START_DATE_1  = pd.to_datetime("2010-01-01")
# END_DATE_1   = pd.to_datetime("2013-01-01")

# START_DATE_2 = pd.to_datetime("2013-01-01")
# END_DATE_2 = pd.to_datetime("2016-01-01")

# START_DATE_3 = pd.to_datetime("2016-01-01")
# END_DATE_3 = pd.to_datetime("2019-01-01")

# PARTITION_1 = titan_price.loc[START_DATE_1 : END_DATE_1]
# PARTITION_2 = titan_price.loc[START_DATE_2 : END_DATE_2]
# PARTITION_3 = titan_price.loc[START_DATE_3 : END_DATE_3]

In [16]:
from functools import partial

In [17]:
# Pre-bind the SMA strategy class so the runner only needs a ticker per call.
sma_strategy = SMAStrategy
sma_backtest = partial(run_backtest, strategy_object=sma_strategy)

In [18]:
# Number of batches of 3 tickers, derived from the ticker list itself
# (ceil division, so a trailing partial batch is not dropped if the list changes).
num_batches = -(-len(tickers) // 3)

In [20]:
# for batch in range(num_batches):
#     with ProcessPoolExecutor() as executor:
#         result = executor.map(sma_backtest, tickers[3 * batch: 3 * (batch + 1) ])
#     time.sleep(10)
#     output = pd.concat(list(result))
#     output.to_csv(f"SMA_FULL_{batch}")

In [None]:
# Backtest the tickers three at a time and write one CSV per batch.
for batch in range(num_batches):
    batch_tickers = tickers[3 * batch : 3 * (batch + 1)]
    results = pd.concat([sma_backtest(ticker) for ticker in batch_tickers])
    print(f"saving for batch {batch}")
    results.to_csv(f"SMA_{batch}.csv")
    # Pause between batches.
    time.sleep(5)

backtesting on NTPC.NS
backtesting on HINDALCO.NS
backtesting on BHARTIARTL.NS
saving for batch 0
backtesting on SHREECEM.NS
backtesting on TCS.NS
backtesting on HDFCLIFE.NS
saving for batch 1
backtesting on CIPLA.NS
backtesting on LT.NS
