In [1]:
import datetime
import random
import time
import backtrader as bt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from alpha_vantage.timeseries import TimeSeries
from deap import base
from deap import creator
from deap import tools
from tqdm import trange
from pathlib import Path
from IPython import get_ipython
get_ipython().run_line_magic('matplotlib', 'inline')


ALPHA_VANTAGE_DIR_PATH = Path("C:/Users/varan/Downloads")
SECRET = "UHZGIZAJSQ9RXRV0"


def get_alpha_vantage(key, ticker):
    """Given a key to Alpha Vantage and a valid ticker, this function will
    query alpha vantage and save the dataset into a csv in a predefined
    directory using ticker as the filename.
    """
    ts = TimeSeries(key=key, output_format="pandas", indexing_type="date")
    
    try:
        data, meta_data = ts.get_daily_adjusted(symbol=ticker, outputsize="full")
        data.to_csv(ALPHA_VANTAGE_DIR_PATH / f"{ticker}.csv")
        print(f"{ticker} has been downloaded to {ALPHA_VANTAGE_DIR_PATH}/{ticker}.csv")
    except:
        print(f"{ticker} Not found.")


def read_alpha_vantage(ticker):
    """If the ticker's csv has been downloaded with `get_alpha_vantage`,
    this function will return a pandas dataframe of adjusted open, adjusted
    high, adjusted low, adjusted close and volume rounded to 4 decimal places
    """
    if not (ALPHA_VANTAGE_DIR_PATH / f"{ticker}.csv").exists():
        return None

    df = pd.read_csv(
        ALPHA_VANTAGE_DIR_PATH / f"{ticker}.csv", index_col=0, parse_dates=True
    ).sort_index()
    df = df.rename(
        columns={
            "1. open": "Open",
            "2. high": "High",
            "3. low": "Low",
            "4. close": "Close",
            "5. adjusted close": "Adjusted Close",
            "6. volume": "Volume",
            "7. dividend amount": "Dividend",
            "8. split coefficient": "Split Coefficient",
        }
    )
    df["Unadjusted Open"] = df["Open"]
    df["Open"] = df["Close"] * df["Adjusted Close"] / df["Open"]
    df["High"] = df["High"] * df["Open"] / df["Unadjusted Open"]
    df["Low"] = df["Low"] * df["Open"] / df["Unadjusted Open"]
    df["Close"] = df["Adjusted Close"]
    return df[["Open", "High", "Low", "Close", "Volume"]].round(4)


In [2]:
get_alpha_vantage(key=SECRET, ticker="NVDA")
df = read_alpha_vantage(ticker="NVDA")

print(df.head())


NVDA has been downloaded to C:\Users\varan\Downloads/NVDA.csv
              Open    High     Low   Close     Volume
date                                                 
1999-11-01  1.9456  2.1808  1.9456  1.8007  1630300.0
1999-11-02  2.0005  2.0999  1.9846  1.9156  1744800.0
1999-11-03  2.2191  2.4008  2.2028  2.1026  4191000.0
1999-11-04  2.3528  2.5384  2.3528  2.2367  2625700.0
1999-11-05  2.0384  2.0554  1.8264  2.1647  1284100.0


In [4]:
class CrossoverStrategy(bt.Strategy):
    # list of parameters which are configurable for the strategy
    params = dict(fast_period=12, slow_period=26, signal_period=9)

    def __init__(self):

        self.fast_ma = bt.indicators.EMA(self.data.close, period=self.p.fast_period)
        self.slow_ma = bt.indicators.EMA(self.data.close, period=self.p.slow_period)
        self.macd_line = self.fast_ma - self.slow_ma
        self.signal_line = bt.indicators.EMA(self.macd_line, period=self.p.signal_period)
        self.macd_crossover = bt.indicators.CrossOver(self.macd_line, self.signal_line)

    def next(self):

        if self.macd_crossover > 0:
            self.close()  # close short position
            self.buy()  # enter long position
        elif self.macd_crossover < 0:
            self.close()  # close long position
            self.sell()  # enter short position


TICKER = "NVDA"
STRATEGY_PARAMS = dict(fast_period=12, slow_period=26, signal_period=9)

In [5]:
def run_backtest(plot=True, **strategy_params):

    cerebro = bt.Cerebro()
    cerebro.adddata(
        bt.feeds.PandasData(dataname=read_alpha_vantage(ticker=TICKER), name=TICKER)
    )

    # Remember to set it high enough or the strategy may not
    # be able to trade because of short of cash
    cerebro.broker.setcash(10000.0)

    # Print out the starting conditions
    print(f"Starting Portfolio Value: {cerebro.broker.getvalue():,.2f}")

    # Although we have defined some default params in the strategy,
    # we can override it by passing in keyword arguments here.
    cerebro.addstrategy(CrossoverStrategy, **strategy_params)

    cerebro.addobserver(bt.observers.Trades)
    cerebro.addobserver(bt.observers.DrawDown)
    cerebro.addanalyzer(bt.analyzers.SharpeRatio)

    # Let's say that we have 0.25% slippage and commission per trade,
    # that is 0.5% in total for a round trip.
    cerebro.broker.setcommission(commission=0.0025, margin=False)

    # Run over everything
    strats = cerebro.run()

    print(f"Final Portfolio Value:    {cerebro.broker.getvalue():,.2f}")

    if plot:
        cerebro.plot(iplot=False)




In [6]:
run_backtest(plot=True, **STRATEGY_PARAMS)

Starting Portfolio Value: 10,000.00
Final Portfolio Value:    9,714.77


In [7]:
run_backtest(**STRATEGY_PARAMS)


# fix the seed so that we will get the same results
# feel free to change it or comment out the line
random.seed(1)

# GA parameters
PARAM_NAMES = ["fast_period", "slow_period", "signal_period"]
NGEN = 20
NPOP = 100
CXPB = 0.5
MUTPB = 0.3

data = bt.feeds.PandasData(dataname=read_alpha_vantage(ticker=TICKER), name=TICKER)

Starting Portfolio Value: 10,000.00
Final Portfolio Value:    9,714.77


In [8]:
def evaluate(individual, plot=False, log=False):

    # convert list of parameter values into dictionary of kwargs
    strategy_params = {k: v for k, v in zip(PARAM_NAMES, individual)}

    # fast moving average by definition cannot be slower than the slow one
    if strategy_params["fast_period"] >= strategy_params["slow_period"]:
        return [-np.inf]

    # by setting stdstats to False, backtrader will not store the changes in
    # statistics like number of trades, buys & sells, etc.
    cerebro = bt.Cerebro(stdstats=False)
    cerebro.adddata(data)

    # Remember to set it high enough or the strategy may not
    # be able to trade because of short of cash
    initial_capital = 10_000.0
    cerebro.broker.setcash(initial_capital)

    # Pass in the genes of the individual as kwargs
    cerebro.addstrategy(CrossoverStrategy, **strategy_params)

    # This is needed for calculating our fitness score
    cerebro.addanalyzer(bt.analyzers.DrawDown)

    # Let's say that we have 0.25% slippage and commission per trade,
    # that is 0.5% in total for a round trip.
    cerebro.broker.setcommission(commission=0.0025, margin=False)

    # Run over everything
    strats = cerebro.run()

    profit = cerebro.broker.getvalue() - initial_capital
    max_dd = strats[0].analyzers.drawdown.get_analysis()["max"]["moneydown"]
    fitness = profit / (max_dd if max_dd > 0 else 1)

    if log:
        print(f"Starting Portfolio Value: {initial_capital:,.2f}")
        print(f"Final Portfolio Value:    {cerebro.broker.getvalue():,.2f}")
        print(f"Total Profit:             {profit:,.2f}")
        print(f"Maximum Drawdown:         {max_dd:,.2f}")
        print(f"Profit / Max DD:          {fitness}")

    if plot:
        cerebro.plot()

    return [fitness]


# our fitness score is supposed to be maximised and there is only 1 objective
creator.create("FitnessMax", base.Fitness, weights=(1.0,))

# our individual is a list of genes, with the fitness score the higher the better
creator.create("Individual", list, fitness=creator.FitnessMax)

# register some handy functions for calling
toolbox = base.Toolbox()
toolbox.register("indices", random.sample, range(NPOP), NPOP)
# crossover strategy
toolbox.register("mate", tools.cxUniform, indpb=CXPB)
# mutation strategy
toolbox.register("mutate", tools.mutUniformInt, low=1, up=151, indpb=0.2)
# selection strategy
toolbox.register("select", tools.selTournament, tournsize=3)
# fitness function
toolbox.register("evaluate", evaluate)

# definition of an individual & a population
toolbox.register("attr_fast_period", random.randint, 1, 51)
toolbox.register("attr_slow_period", random.randint, 10, 151)
toolbox.register("attr_signal_period", random.randint, 1, 101)
toolbox.register(
    "individual",
    tools.initCycle,
    creator.Individual,
    (
        toolbox.attr_fast_period,
        toolbox.attr_slow_period,
        toolbox.attr_signal_period,
    ),
)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

mean = np.ndarray(NGEN)
best = np.ndarray(NGEN)
hall_of_fame = tools.HallOfFame(maxsize=3)

t = time.perf_counter()
pop = toolbox.population(n=NPOP)
for g in trange(NGEN):
    # Select the next generation individuals
    offspring = toolbox.select(pop, len(pop))
    # Clone the selected individuals
    offspring = list(map(toolbox.clone, offspring))

    # Apply crossover on the offspring
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CXPB:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    # Apply mutation on the offspring
    for mutant in offspring:
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    # The population is entirely replaced by the offspring
    pop[:] = offspring
    hall_of_fame.update(pop)
    print(
        "HALL OF FAME:\n"
        + "\n".join(
            [
                f"    {_}: {ind}, Fitness: {ind.fitness.values[0]}"
                for _, ind in enumerate(hall_of_fame)
            ]
        )
    )

    fitnesses = [
        ind.fitness.values[0] for ind in pop if not np.isinf(ind.fitness.values[0])
    ]
    mean[g] = np.mean(fitnesses)
    best[g] = np.max(fitnesses)

end_t = time.perf_counter()
print(f"Time Elapsed: {end_t - t:,.2f}")

fig, ax = plt.subplots(sharex=True, figsize=(16, 9))

sns.lineplot(x=range(NGEN), y=mean, ax=ax, label="Average Fitness Score")
sns.lineplot(x=range(NGEN), y=best, ax=ax, label="Best Fitness Score")
ax.set_title("Fitness Score")
ax.set_xticks(range(NGEN))
ax.set_xlabel("Iteration")

plt.tight_layout()
plt.show()

OPTIMISED_STRATEGY_PARAMS = {k: v for k, v in zip(PARAM_NAMES, hall_of_fame[0])}
run_backtest(**OPTIMISED_STRATEGY_PARAMS)


  5%|▌         | 1/20 [07:56<2:31:00, 476.86s/it]HALL OF FAME:
    0: [34, 62, 55], Fitness: 1.5277777358006763
    1: [36, 75, 5], Fitness: 1.489520339567398
    2: [33, 57, 78], Fitness: 1.489295087459557
 10%|█         | 2/20 [12:51<1:50:57, 369.84s/it]HALL OF FAME:
    0: [9, 19, 56], Fitness: 2.051923707694525
    1: [50, 55, 5], Fitness: 1.8431648597139578
    2: [27, 140, 5], Fitness: 1.7677367814597615
 15%|█▌        | 3/20 [17:51<1:35:46, 338.02s/it]HALL OF FAME:
    0: [5, 143, 44], Fitness: 2.1630765521085213
    1: [9, 19, 56], Fitness: 2.051923707694525
    2: [50, 55, 5], Fitness: 1.8431648597139578
 20%|██        | 4/20 [21:53<1:19:55, 299.74s/it]HALL OF FAME:
    0: [5, 143, 44], Fitness: 2.1630765521085213
    1: [9, 19, 56], Fitness: 2.051923707694525
    2: [5, 62, 44], Fitness: 1.8665515029706001
 25%|██▌       | 5/20 [26:54<1:15:05, 300.35s/it]HALL OF FAME:
    0: [5, 143, 44], Fitness: 2.1630765521085213
    1: [9, 19, 56], Fitness: 2.051923707694525
    2: [5, 62