# Introduction to Dataset and Backtrader

## 1. Dataset Introduction

The dataset we will be using is from AlgoSeek LLC, provided as part of the book Machine Learning for Trading by Stefan Jansen. It consists of data for the Nasdaq 100 stocks in their proprietary data format.

In [1]:
from urllib.request import Request, urlopen
from io import BytesIO
from zipfile import ZipFile


def download_and_unzip(url, extract_to='.'):
    http_response = urlopen(url)
    zipfile = ZipFile(BytesIO(http_response.read()))
    zipfile.extractall(path=extract_to)

In [2]:
import os
os.mkdir('data')
# download nasdaq100 data
download_and_unzip('https://algoseek-public.s3.amazonaws.com/nasdaq100-1min.zip', extract_to='data')


In [14]:
for f in tqdm(list(nasdaq_path.glob('2015/**/*.csv.gz'))):
    print(f)

100%|██████████| 27184/27184 [00:00<00:00, 484070.48it/s]

data/nasdaq100/2015/20151221/ALXN.csv.gz
data/nasdaq100/2015/20151221/CTSH.csv.gz
data/nasdaq100/2015/20151221/VIAB.csv.gz
data/nasdaq100/2015/20151221/VRSK.csv.gz
data/nasdaq100/2015/20151221/MYL.csv.gz
data/nasdaq100/2015/20151221/PCAR.csv.gz
data/nasdaq100/2015/20151221/FAST.csv.gz
data/nasdaq100/2015/20151221/ESRX.csv.gz
data/nasdaq100/2015/20151221/LLTC.csv.gz
data/nasdaq100/2015/20151221/NVDA.csv.gz
data/nasdaq100/2015/20151221/WFM.csv.gz
data/nasdaq100/2015/20151221/TMUS.csv.gz
data/nasdaq100/2015/20151221/ENDP.csv.gz
data/nasdaq100/2015/20151221/WDC.csv.gz
data/nasdaq100/2015/20151221/QCOM.csv.gz
data/nasdaq100/2015/20151221/LRCX.csv.gz
data/nasdaq100/2015/20151221/MAT.csv.gz
data/nasdaq100/2015/20151221/VOD.csv.gz
data/nasdaq100/2015/20151221/FOXA.csv.gz
data/nasdaq100/2015/20151221/ROST.csv.gz
data/nasdaq100/2015/20151221/NFLX.csv.gz
data/nasdaq100/2015/20151221/CA.csv.gz
data/nasdaq100/2015/20151221/CTXS.csv.gz
data/nasdaq100/2015/20151221/PYPL.csv.gz
data/nasdaq100/2015/201




In [21]:
from pathlib import Path
from tqdm import tqdm
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds 
import pyarrow.csv 
import pandas as pd

nasdaq_path = Path('data/nasdaq100')

def pyarrow_extract_and_combine_data():
    path = nasdaq_path
    data = []
    # ~80K files to process
    for f in tqdm(list(path.glob('*/**/*.csv.gz'))):
        data.append(pa.csv.read_csv(f))
    
    data = pa.concat_tables(data)
    
    return data

def inc_extract_and_combine_data():
    path = nasdaq_path
    data = []
    
    years = ['2015','2016','2017']
    
    for year in years:
        ydata = []
        tcols = ['OpenBarTime','FirstTradeTime',
                    'HighBidTime','HighAskTime',
                    'HighTradeTime','LowBidTime',
                    'LowAskTime','LowTradeTime',
                    'CloseBarTime','LastTradeTime']
        for f in tqdm(list(path.glob(f'{year}/**/*.csv.gz'))):
            ydata.append(pd.read_csv(f, parse_dates=[['Date', 'TimeBarStart']])
                        .set_index('Date_TimeBarStart')
                        .sort_index()
                        .drop(tcols, axis=1)
                        .set_index('Ticker', append=True)
                        .swaplevel())
        print('concat step')
        ydata = pd.concat(ydata)
        
        print('tcol step')
        # ydata[tcols] = ydata[tcols].apply(pd.to_datetime, errors='coerce')
        ydata = ydata.apply(pd.to_numeric, downcast='integer')
        ydata.index.rename(['Ticker', 'BarDateTime'], inplace=True)
        ydata['ticker'] = ydata.index.get_level_values('Ticker')
        ydata['datetime'] = ydata.index.get_level_values('BarDateTime')
        ydata['Year'] = ydata.index.get_level_values('BarDateTime').year
        ydata['Month'] = ydata.index.get_level_values('BarDateTime').month
        print(ydata.info(show_counts=True))
        # data.to_hdf(nasdaq_path / 'algoseek.h5', 'min_taq')
        ydata.to_parquet('data/1min_taq/',partition_cols=['Year','Month','ticker'], index=True)
        ypa_data = pa.Table.from_pandas(ydata)
        ypart = ds.partitioning(
            pa.schema([
                ('Year', pa.int64()),
                ('Month', pa.int64()),
                ('ticker', pa.string())
            ]),
            flavor='hive'
        )
        ds.write_dataset(ypa_data, 'data/taq1min', format='parquet', partitioning=ypart, existing_data_behavior='overwrite_or_ignore')    
        

def extract_and_combine_data():
    path = nasdaq_path 

    data = []
    # ~80K files to process
    for f in tqdm(list(path.glob('*/**/*.csv.gz'))):
        data.append(pd.read_csv(f, parse_dates=[['Date', 'TimeBarStart']])
                    .set_index('Date_TimeBarStart')
                    .sort_index()
                    .set_index('Ticker', append=True)
                    .swaplevel())
    data = pd.concat(data).apply(pd.to_numeric, downcast='integer')
    data.index.rename(['Ticker', 'BarDateTime'])
    data['ticker'] = data.index.get_level_values('Ticker')
    data['datetime'] = data.index.get_level_values('BarDateTime')
    data['Year'] = data.index.get_level_values('BarDateTime').dt.year
    data['Month'] = data.index.get_level_values('BarDateTime').dt.month
    print(data.info(show_counts=True))
    data.to_hdf(nasdaq_path / 'algoseek.h5', 'min_taq')
    data.to_parquet('data/1min_taq/',partition_cols=['Year','Month','ticker'], index=True)
    pa_data = pa.Table.from_pandas(data)
    part = ds.partitioning(
        pa.schema([
            ('year', pa.int64()),
            ('month', pa.int64()),
            ('ticker', pa.string())
        ]),
        flavor='hive'
    )
    ds.write_dataset(pa_data, 'data/taq1min', format='parquet', partitioning=part)


In [22]:
inc_extract_and_combine_data()

100%|██████████| 26664/26664 [03:29<00:00, 127.15it/s]


concat step
tcol step
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 24872789 entries, ('ALXN', Timestamp('2016-04-28 04:00:00')) to ('GILD', Timestamp('2016-05-26 20:00:00'))
Data columns (total 52 columns):
 #   Column                       Non-Null Count     Dtype         
---  ------                       --------------     -----         
 0   OpenBidPrice                 24581638 non-null  float64       
 1   OpenBidSize                  24581638 non-null  float64       
 2   OpenAskPrice                 24011111 non-null  float64       
 3   OpenAskSize                  24011111 non-null  float64       
 4   FirstTradePrice              11043255 non-null  float64       
 5   FirstTradeSize               11043255 non-null  float64       
 6   HighBidPrice                 24608302 non-null  float64       
 7   HighBidSize                  24608302 non-null  float64       
 8   HighAskPrice                 24037775 non-null  float64       
 9   HighAskSize                  240377

100%|██████████| 26346/26346 [03:41<00:00, 118.86it/s]


concat step
tcol step
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 24654570 entries, ('ALXN', Timestamp('2017-08-24 04:00:00')) to ('GILD', Timestamp('2017-01-23 20:00:00'))
Data columns (total 52 columns):
 #   Column                       Non-Null Count     Dtype         
---  ------                       --------------     -----         
 0   OpenBidPrice                 24294076 non-null  float64       
 1   OpenBidSize                  24294076 non-null  float64       
 2   OpenAskPrice                 23775902 non-null  float64       
 3   OpenAskSize                  23775902 non-null  float64       
 4   FirstTradePrice              11153237 non-null  float64       
 5   FirstTradeSize               11153237 non-null  float64       
 6   HighBidPrice                 24320422 non-null  float64       
 7   HighBidSize                  24320422 non-null  float64       
 8   HighAskPrice                 23802248 non-null  float64       
 9   HighAskSize                  238022

In [23]:
tst_ds = ds.dataset('data/taq1min/', format='parquet', partitioning='hive')
tst_ds.schema 

OpenBidPrice: double
OpenBidSize: double
OpenAskPrice: double
OpenAskSize: double
FirstTradePrice: double
FirstTradeSize: double
HighBidPrice: double
HighBidSize: double
HighAskPrice: double
HighAskSize: double
HighTradePrice: double
HighTradeSize: double
LowBidPrice: double
LowBidSize: double
LowAskPrice: double
LowAskSize: double
LowTradePrice: double
LowTradeSize: double
CloseBidPrice: double
CloseBidSize: double
CloseAskPrice: double
CloseAskSize: double
LastTradePrice: double
LastTradeSize: double
MinSpread: double
MaxSpread: double
CancelSize: double
VolumeWeightPrice: double
NBBOQuoteCount: int32
TradeAtBid: int32
TradeAtBidMid: int32
TradeAtMid: int32
TradeAtMidAsk: int32
TradeAtAsk: int32
TradeAtCrossOrLocked: int32
Volume: int32
TotalTrades: int16
FinraVolume: int32
FinraVolumeWeightPrice: double
UptickVolume: int32
DowntickVolume: int32
RepeatUptickVolume: int32
RepeatDowntickVolume: int32
UnknownTickVolume: int32
TradeToMidVolWeight: double
TradeToMidVolWeightRelative: doub

In [24]:
tst_ds.head(10)

pyarrow.Table
OpenBidPrice: double
OpenBidSize: double
OpenAskPrice: double
OpenAskSize: double
FirstTradePrice: double
FirstTradeSize: double
HighBidPrice: double
HighBidSize: double
HighAskPrice: double
HighAskSize: double
HighTradePrice: double
HighTradeSize: double
LowBidPrice: double
LowBidSize: double
LowAskPrice: double
LowAskSize: double
LowTradePrice: double
LowTradeSize: double
CloseBidPrice: double
CloseBidSize: double
CloseAskPrice: double
CloseAskSize: double
LastTradePrice: double
LastTradeSize: double
MinSpread: double
MaxSpread: double
CancelSize: double
VolumeWeightPrice: double
NBBOQuoteCount: int32
TradeAtBid: int32
TradeAtBidMid: int32
TradeAtMid: int32
TradeAtMidAsk: int32
TradeAtAsk: int32
TradeAtCrossOrLocked: int32
Volume: int32
TotalTrades: int16
FinraVolume: int32
FinraVolumeWeightPrice: double
UptickVolume: int32
DowntickVolume: int32
RepeatUptickVolume: int32
RepeatDowntickVolume: int32
UnknownTickVolume: int32
TradeToMidVolWeight: double
TradeToMidVolWeight

In [None]:
import pyarrow.compute as pc



## 2. Introduction to Backtrader

Backtrader is an extremely popular backtesting tool for testing trading strategies on historical stock data.

In [27]:
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.analyzers as btanalyzers
from backtrader.feed import DataBase
from backtrader import date2num
from backtrader import TimeFrame
import os
import pytz
from pytz import timezone
import json
import time
import itertools
import datetime

# More documentation about backtrader: https://www.backtrader.com/

class AlgoStrategy():

    def __init__(self,strategy):
        self.cerebro = bt.Cerebro()
        strategy.init_broker(self.cerebro.broker)
        data=strategy.add_data(self.cerebro)
        strategy.data=data

        self.cerebro.addstrategy(strategy)

        self.portfolioStartValue=self.cerebro.broker.getvalue()
        self.cerebro.addanalyzer(btanalyzers.DrawDown, _name='dd')
        self.cerebro.addanalyzer(btanalyzers.SharpeRatio_A, _name='sharpe')
        self.cerebro.addanalyzer(btanalyzers.SQN, _name='sqn')
        self.cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='ta')

    def performance(self):
        analyzer=self.thestrat.analyzers.ta.get_analysis()
        dd_analyzer=self.thestrat.analyzers.dd.get_analysis()

        #Get the results we are interested in
        total_open = analyzer.total.open
        total_closed = analyzer.total.closed
        total_won = analyzer.won.total
        total_lost = analyzer.lost.total
        win_streak = analyzer.streak.won.longest
        lose_streak = analyzer.streak.lost.longest
        pnl_net = round(analyzer.pnl.net.total,2)
        strike_rate=0
        if total_closed>0:
            strike_rate = (total_won / total_closed) * 100
        #Designate the rows
        h1 = ['Total Open', 'Total Closed', 'Total Won', 'Total Lost']
        h2 = ['Strike Rate','Win Streak', 'Losing Streak', 'PnL Net']
        h3 = ['DrawDown Pct','MoneyDown', '', '']
        self.total_closed=total_closed
        self.strike_rate=strike_rate
        self.max_drawdown=dd_analyzer.max.drawdown
        r1 = [total_open, total_closed,total_won,total_lost]
        r2 = [('%.2f%%' %(strike_rate)), win_streak, lose_streak, pnl_net]
        r3 = [('%.2f%%' %(dd_analyzer.max.drawdown)), dd_analyzer.max.moneydown, '', '']
        #Check which set of headers is the longest.
        header_length = max(len(h1),len(h2),len(h3))
        #Print the rows
        print_list = [h1,r1,h2,r2,h3,r3]
        row_format ="{:<15}" * (header_length + 1)
        print("Trade Analysis Results:")
        for row in print_list:
            print(row_format.format('',*row))

        analyzer=self.thestrat.analyzers.sqn.get_analysis()
        sharpe_analyzer=self.thestrat.analyzers.sharpe.get_analysis()
        self.sqn = analyzer.sqn
        self.sharpe_ratio = sharpe_analyzer['sharperatio']
        if self.sharpe_ratio is None:
            self.sharpe_ratio=0
        self.pnl = self.cerebro.broker.getvalue()-self.portfolioStartValue
        print('[SQN:%.2f, Sharpe Ratio:%.2f, Final Portfolio:%.2f, Total PnL:%.2f]' % (self.sqn,self.sharpe_ratio,self.cerebro.broker.getvalue(),self.pnl))

    def run(self):
        thestrats = self.cerebro.run()
        self.thestrat = thestrats[0]
        self.performance()

class MyFeed(DataBase):
    def __init__(self):
        super(MyFeed, self).__init__()
        self.list=testData.select("start", "open", "high", "low", "close", "volume", "vwap", "exponential_moving_average").collect()
        # AlgoSeek Columns:
        # Date,Ticker,TimeBarStart,OpenBarTime,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradeTime,FirstTradePrice,FirstTradeSize,HighBidTime,HighBidPrice,HighBidSize,HighAskTime,HighAskPrice,HighAskSize,HighTradeTime,HighTradePrice,HighTradeSize,LowBidTime,LowBidPrice,LowBidSize,LowAskTime,LowAskPrice,LowAskSize,LowTradeTime,LowTradePrice,LowTradeSize,CloseBarTime,CloseBidPrice,CloseBidSize,CloseAskPrice,CloseAskSize,LastTradeTime,LastTradePrice,LastTradeSize,MinSpread,MaxSpread,CancelSize,VolumeWeightPrice,NBBOQuoteCount,TradeAtBid,TradeAtBidMid,TradeAtMid,TradeAtMidAsk,TradeAtAsk,TradeAtCrossOrLocked,Volume,TotalTrades,FinraVolume,FinraVolumeWeightPrice,UptickVolume,DowntickVolume,RepeatUptickVolume,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk

        self.n=0

        self.fromdate=self.list[0]['start']
        self.todate=self.list[len(self.list)-1]['start']
        self.timeframe=bt.TimeFrame.Minutes
        print("from=%s,to=%s" % (self.fromdate,self.todate))

        self.m={}
        #print(self.list)

    def start(self):
        # Nothing to do for this data feed type
        pass

    def stop(self):
        # Nothing to do for this data feed type
        pass

    def _load(self):
        if self.n>=len(self.list):
            return False

        r=self.list[self.n]
        self.lines.datetime[0] = date2num(r['start'])

        self.lines.open[0] = r['open']
        self.lines.high[0] = r['high']
        self.lines.low[0] = r['low']
        self.lines.close[0] = r['close']
        self.lines.volume[0] = r['volume']
        self.m[r['start']]=r

        self.n=self.n+1
        return True

class StrategyTemplate(bt.Strategy):

    def __init__(self):
        self.lastDay=-1
        self.lastMonth=-1
        self.dataclose = self.datas[0].close

    @staticmethod
    def init_broker(broker):
        pass

    @staticmethod
    def add_data(cerebro):
        pass

    def next(self):
        dt=self.datas[0].datetime.datetime(0)
        #print("[NEXT]:%s:close=%s" % (dt,self.dataclose[0]))

        #SOM
        if self.lastMonth!=dt.month:
            if self.lastMonth!=-1:
                chg=self.broker.getvalue()-self.monthCash
                #print("[%s] SOM:chg=%.2f,cash=%.2f" % (dt,chg,self.broker.getvalue()))
            self.lastMonth=dt.month
            self.monthCash=self.broker.getvalue()

        #SOD
        if self.lastDay!=dt.day:
            self.lastDay=dt.day
            #print("[%s] SOD:cash=%.2f" % (dt,self.broker.getvalue()))


In [41]:
class AlgoSeekPandas(btfeeds.PandasData):
    lines = ('datetime',
            'open',
            'high',
            'low',
            'close',
            'volume',
            'OpenBidPrice',
            'OpenBidSize',
            'OpenAskPrice',
            'OpenAskSize',
            'FirstTradePrice',
            'FirstTradeSize', 
            'HighBidPrice', 
            'HighBidSize',
            'HighAskPrice', 
            'HighAskSize', 
            'HighTradePrice', 
            'HighTradeSize', 
            'LowBidPrice', 
            'LowBidSize', 
            'LowAskPrice', 
            'LowAskSize', 
            'LowTradePrice', 
            'LowTradeSize', 
            'CloseBidPrice', 
            'CloseBidSize', 
            'CloseAskPrice', 
            'CloseAskSize', 
            'LastTradePrice', 
            'LastTradeSize', 
            'MinSpread', 
            'MaxSpread', 
            'CancelSize', 
            'VolumeWeightPrice', 
            'NBBOQuoteCount', 
            'TradeAtBid', 
            'TradeAtBidMid', 
            'TradeAtMid', 
            'TradeAtMidAsk', 
            'TradeAtAsk', 
            'TradeAtCrossOrLocked', 
            'Volume', 
            'TotalTrades', 
            'FinraVolume', 
            'FinraVolumeWeightPrice',
            'UptickVolume', 
            'DowntickVolume', 
            'RepeatUptickVolume', 
            'RepeatDowntickVolume', 
            'UnknownTickVolume', 
            'TradeToMidVolWeight', 
            'TradeToMidVolWeightRelative', 
            'TimeWeightBid', 
            'TimeWeightAsk', 
        )
    params = (
            ('datetime', 49),
            ('dtformat', '%Y-%m-%d %H:%M:%S'),
            ('open', 5),
            ('high', 11),
            ('low', 17),
            ('close', 23),
            ('volume', 36),
            ('TimeBarStart', 3),
            ('OpenBarTime',4),
            ('OpenBidPrice',1),
            ('OpenBidSize', 2),
            ('OpenAskPrice', 3),
            ('OpenAskSize', 4),
            ('FirstTradePrice',5),
            ('FirstTradeSize', 6),
            ('HighBidPrice', 7),
            ('HighBidSize',8),
            ('HighAskPrice', 9),
            ('HighAskSize', 10),
            ('HighTradePrice', 11),
            ('HighTradeSize', 12),
            ('LowBidPrice', 13),
            ('LowBidSize', 14),
            ('LowAskPrice', 15),
            ('LowAskSize', 16),
            ('LowTradePrice', 17),
            ('LowTradeSize', 18),
            ('CloseBidPrice', 19),
            ('CloseBidSize', 20),
            ('CloseAskPrice', 21),
            ('CloseAskSize', 22),
            ('LastTradePrice', 23),
            ('LastTradeSize', 24),
            ('MinSpread', 25),
            ('MaxSpread', 26),
            ('CancelSize', 27),
            ('VolumeWeightPrice', 28),
            ('NBBOQuoteCount', 29),
            ('TradeAtBid', 30),
            ('TradeAtBidMid', 31),
            ('TradeAtMid', 32),
            ('TradeAtMidAsk', 33),
            ('TradeAtAsk', 34),
            ('TradeAtCrossOrLocked', 35),
            ('Volume', 36),
            ('TotalTrades', 37),
            ('FinraVolume', 38),
            ('FinraVolumeWeightPrice', 39),
            ('UptickVolume', 40),
            ('DowntickVolume', 41),
            ('RepeatUptickVolume', 42),
            ('RepeatDowntickVolume', 43),
            ('UnknownTickVolume', 44),
            ('TradeToMidVolWeight', 45),
            ('TradeToMidVolWeightRelative', 46),
            ('TimeWeightBid', 47),
            ('TimeWeightAsk', 48)
        )

In [34]:
class AlgoSeekDataFeed(DataBase):
    def __init__(self):
        # super(MyFeed, self).__init__()
        # self.list=testData.select("start", "open", "high", "low", "close", "volume", "vwap", "exponential_moving_average").collect()
        params = (
            ('datetime', 49),
            ('open', 5),
            ('high', 11),
            ('low', 17),
            ('close', 23),
            ('volume', 36),
            ('TimeBarStart', 3),
            ('OpenBarTime',4),
            ('OpenBidPrice',1),
            ('OpenBidSize', 2),
            ('OpenAskPrice', 3),
            ('OpenAskSize', 4),
            ('FirstTradePrice',5),
            ('FirstTradeSize', 6),
            ('HighBidPrice', 7),
            ('HighBidSize',8),
            ('HighAskPrice', 9),
            ('HighAskSize', 10),
            ('HighTradePrice', 11),
            ('HighTradeSize', 12),
            ('LowBidPrice', 13),
            ('LowBidSize', 14),
            ('LowAskPrice', 15),
            ('LowAskSize', 16),
            ('LowTradePrice', 17),
            ('LowTradeSize', 18),
            ('CloseBidPrice', 19),
            ('CloseBidSize', 20),
            ('CloseAskPrice', 21),
            ('CloseAskSize', 22),
            ('LastTradePrice', 23),
            ('LastTradeSize', 24),
            ('MinSpread', 25),
            ('MaxSpread', 26),
            ('CancelSize', 27),
            ('VolumeWeightPrice', 28),
            ('NBBOQuoteCount', 29),
            ('TradeAtBid', 30),
            ('TradeAtBidMid', 31),
            ('TradeAtMid', 32),
            ('TradeAtMidAsk', 33),
            ('TradeAtAsk', 34),
            ('TradeAtCrossOrLocked', 35),
            ('Volume', 36),
            ('TotalTrades', 37),
            ('FinraVolume', 38),
            ('FinraVolumeWeightPrice', 39),
            ('UptickVolume', 40),
            ('DowntickVolume', 41),
            ('RepeatUptickVolume', 42),
            ('RepeatDowntickVolume', 43),
            ('UnknownTickVolume', 44),
            ('TradeToMidVolWeight', 45),
            ('TradeToMidVolWeightRelative', 46),
            ('TimeWeightBid', 47),
            ('TimeWeightAsk', 48)
        )
        # AlgoSeek Columns:
        # Date,Ticker,TimeBarStart,OpenBarTime,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradeTime,FirstTradePrice,FirstTradeSize,HighBidTime,HighBidPrice,HighBidSize,HighAskTime,HighAskPrice,HighAskSize,HighTradeTime,HighTradePrice,HighTradeSize,LowBidTime,LowBidPrice,LowBidSize,LowAskTime,LowAskPrice,LowAskSize,LowTradeTime,LowTradePrice,LowTradeSize,CloseBarTime,CloseBidPrice,CloseBidSize,CloseAskPrice,CloseAskSize,LastTradeTime,LastTradePrice,LastTradeSize,MinSpread,MaxSpread,CancelSize,VolumeWeightPrice,NBBOQuoteCount,TradeAtBid,TradeAtBidMid,TradeAtMid,TradeAtMidAsk,TradeAtAsk,TradeAtCrossOrLocked,Volume,TotalTrades,FinraVolume,FinraVolumeWeightPrice,UptickVolume,DowntickVolume,RepeatUptickVolume,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk

        # self.n=0

        # self.fromdate=self.list[0]['start']
        # self.todate=self.list[len(self.list)-1]['start']
        # self.timeframe=bt.TimeFrame.Minutes
        # print("from=%s,to=%s" % (self.fromdate,self.todate))

        #self.m={}
        #print(self.list)



In [25]:
aapl = tst_ds.to_table(filter=ds.field('ticker')=='AAPL').to_pandas()
aapl.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradePrice,FirstTradeSize,HighBidPrice,HighBidSize,HighAskPrice,HighAskSize,...,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk,datetime,Year,Month,ticker
Ticker,BarDateTime,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
AAPL,2015-01-07 04:00:00,,,,,106.75,100.0,106.54,100.0,107.2,700.0,...,0,200,0.49801,0.166,106.42,106.95,2015-01-07 04:00:00,2015,1,AAPL
AAPL,2015-01-07 04:01:00,106.42,100.0,106.95,200.0,,,106.47,500.0,106.95,200.0,...,0,0,,,106.47,106.95,2015-01-07 04:01:00,2015,1,AAPL
AAPL,2015-01-07 04:02:00,106.47,500.0,106.95,200.0,,,106.47,500.0,106.95,200.0,...,0,0,,,106.47,106.95,2015-01-07 04:02:00,2015,1,AAPL
AAPL,2015-01-07 04:03:00,106.47,500.0,106.95,200.0,,,106.48,100.0,106.95,200.0,...,0,0,,,106.48,106.95,2015-01-07 04:03:00,2015,1,AAPL
AAPL,2015-01-07 04:04:00,106.48,100.0,106.95,200.0,,,106.48,100.0,106.95,200.0,...,0,0,,,106.48,106.95,2015-01-07 04:04:00,2015,1,AAPL


In [32]:
aapl.reset_index(drop=False, inplace=True)

In [40]:
aapl

Unnamed: 0,Ticker,BarDateTime,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradePrice,FirstTradeSize,HighBidPrice,HighBidSize,...,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk,datetime,Year,Month,ticker
0,AAPL,2015-01-07 04:00:00,,,,,106.75,100.0,106.54,100.0,...,0,200,0.49801,0.16600,106.42000,106.95000,2015-01-07 04:00:00,2015,1,AAPL
1,AAPL,2015-01-07 04:01:00,106.42,100.0,106.95,200.0,,,106.47,500.0,...,0,0,,,106.47000,106.95000,2015-01-07 04:01:00,2015,1,AAPL
2,AAPL,2015-01-07 04:02:00,106.47,500.0,106.95,200.0,,,106.47,500.0,...,0,0,,,106.47000,106.95000,2015-01-07 04:02:00,2015,1,AAPL
3,AAPL,2015-01-07 04:03:00,106.47,500.0,106.95,200.0,,,106.48,100.0,...,0,0,,,106.48000,106.95000,2015-01-07 04:03:00,2015,1,AAPL
4,AAPL,2015-01-07 04:04:00,106.48,100.0,106.95,200.0,,,106.48,100.0,...,0,0,,,106.48000,106.95000,2015-01-07 04:04:00,2015,1,AAPL
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
724235,AAPL,2017-09-13 19:56:00,159.43,100.0,159.52,2300.0,159.52,200.0,159.44,1000.0,...,0,0,,,159.43016,159.52000,2017-09-13 19:56:00,2017,9,AAPL
724236,AAPL,2017-09-13 19:57:00,159.44,1000.0,159.52,2100.0,159.44,1000.0,159.44,1000.0,...,0,0,-4.00000,-0.50000,159.43046,159.51611,2017-09-13 19:57:00,2017,9,AAPL
724237,AAPL,2017-09-13 19:58:00,159.43,100.0,159.50,1000.0,159.50,100.0,159.44,400.0,...,0,0,-1.57895,-0.33835,159.43258,159.49309,2017-09-13 19:58:00,2017,9,AAPL
724238,AAPL,2017-09-13 19:59:00,159.44,300.0,159.48,900.0,159.44,1993.0,159.44,300.0,...,2427,0,-0.54608,-0.18203,159.43844,159.46258,2017-09-13 19:59:00,2017,9,AAPL


In [44]:
import matplotlib.pyplot as plt

In [48]:
class maCross(bt.Strategy):
    '''
    For an official backtrader blog on this topic please take a look at:

    https://www.backtrader.com/blog/posts/2017-04-09-multi-example/multi-example.html

    oneplot = Force all datas to plot on the same master.
    '''
    params = (
    ('sma1', 40),
    ('sma2', 200),
    ('oneplot', True)
    )

    def __init__(self):
        '''
        Create an dictionary of indicators so that we can dynamically add the
        indicators to the strategy using a loop. This mean the strategy will
        work with any numner of data feeds. 
        '''
        self.inds = dict()
        for i, d in enumerate(self.datas):
            self.inds[d] = dict()
            self.inds[d]['sma1'] = bt.indicators.SimpleMovingAverage(
                d.close, period=self.params.sma1)
            self.inds[d]['sma2'] = bt.indicators.SimpleMovingAverage(
                d.close, period=self.params.sma2)
            self.inds[d]['cross'] = bt.indicators.CrossOver(self.inds[d]['sma1'],self.inds[d]['sma2'])

            if i > 0: #Check we are not on the first loop of data feed:
                if self.p.oneplot == True:
                    d.plotinfo.plotmaster = self.datas[0]

    def next(self):
        for i, d in enumerate(self.datas):
            dt, dn = self.datetime.date(), d._name
            pos = self.getposition(d).size
            if not pos:  # no market / no orders
                if d.UptickVolume > d.DowntickVolume:
                    print(d.UptickVolume)
                    self.buy(data=d, size=1000)
                elif d.UptickVolume < d.DowntickVolume:
                    self.sell(data=d, size=1000)
                # if self.inds[d]['cross'][0] == 1:
                #     self.buy(data=d, size=1000)
                # elif self.inds[d]['cross'][0] == -1:
                #     self.sell(data=d, size=1000)
            else:
                if self.inds[d]['cross'][0] == 1:
                    self.close(data=d)
                    self.buy(data=d, size=1000)
                elif self.inds[d]['cross'][0] == -1:
                    self.close(data=d)
                    self.sell(data=d, size=1000)

    def notify_trade(self, trade):
        dt = self.data.datetime.date()
        if trade.isclosed:
            print('{} {} Closed: PnL Gross {}, Net {}'.format(
                                                dt,
                                                trade.data._name,
                                                round(trade.pnl,2),
                                                round(trade.pnlcomm,2)))
            

#Variable for our starting cash
startcash = 10000

#Create an instance of cerebro
cerebro = bt.Cerebro()

#Add our strategy
cerebro.addstrategy(maCross, oneplot=False)

data = AlgoSeekPandas(dataname=aapl.dropna().set_index('BarDateTime'))
cerebro.adddata(data, name='AAPL')
cerebro.broker.setcash(startcash)
# Run over everything
cerebro.run()

#Get final portfolio Value
portvalue = cerebro.broker.getvalue()
pnl = portvalue - startcash

#Print out the final result
print('Final Portfolio Value: ${}'.format(portvalue))
print('P/L: ${}'.format(pnl))

#Finally plot the end results
# cerebro.plot(style='candlestick')

<backtrader.linebuffer.LineBuffer object at 0x7f3ca0a9f1f0>
2015-12-29 AAPL Closed: PnL Gross 9212.6, Net 9212.6
2015-03-16 AAPL Closed: PnL Gross -10759.0, Net -10759.0
2015-04-30 AAPL Closed: PnL Gross -1349.8, Net -1349.8
2015-06-29 AAPL Closed: PnL Gross 370.0, Net 370.0
2015-06-15 AAPL Closed: PnL Gross -1840.0, Net -1840.0
2015-07-31 AAPL Closed: PnL Gross 5440.0, Net 5440.0
2016-10-10 AAPL Closed: PnL Gross 5036.1, Net 5036.1
<backtrader.linebuffer.LineBuffer object at 0x7f3ca0a9f1f0>
2016-11-17 AAPL Closed: PnL Gross 7310.0, Net 7310.0
2016-12-21 AAPL Closed: PnL Gross -7030.0, Net -7030.0
<backtrader.linebuffer.LineBuffer object at 0x7f3ca0a9f1f0>
2016-02-16 AAPL Closed: PnL Gross 20800.1, Net 20800.1
2016-04-11 AAPL Closed: PnL Gross -13780.1, Net -13780.1
2016-04-29 AAPL Closed: PnL Gross 15982.2, Net 15982.2
2016-04-07 AAPL Closed: PnL Gross -15660.1, Net -15660.1
2016-05-17 AAPL Closed: PnL Gross 14960.1, Net 14960.1
2016-07-11 AAPL Closed: PnL Gross -3590.1, Net -3590.1
F

In [None]:
class AlgoSeekDataFeed(DataBase):
    def __init__(self):
        super(MyFeed, self).__init__()
        self.list=testData.select("start", "open", "high", "low", "close", "volume", "vwap", "exponential_moving_average").collect()

        # AlgoSeek Columns:
        # Date,Ticker,TimeBarStart,OpenBarTime,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradeTime,FirstTradePrice,FirstTradeSize,HighBidTime,HighBidPrice,HighBidSize,HighAskTime,HighAskPrice,HighAskSize,HighTradeTime,HighTradePrice,HighTradeSize,LowBidTime,LowBidPrice,LowBidSize,LowAskTime,LowAskPrice,LowAskSize,LowTradeTime,LowTradePrice,LowTradeSize,CloseBarTime,CloseBidPrice,CloseBidSize,CloseAskPrice,CloseAskSize,LastTradeTime,LastTradePrice,LastTradeSize,MinSpread,MaxSpread,CancelSize,VolumeWeightPrice,NBBOQuoteCount,TradeAtBid,TradeAtBidMid,TradeAtMid,TradeAtMidAsk,TradeAtAsk,TradeAtCrossOrLocked,Volume,TotalTrades,FinraVolume,FinraVolumeWeightPrice,UptickVolume,DowntickVolume,RepeatUptickVolume,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk

        self.n=0

        self.fromdate=self.list[0]['start']
        self.todate=self.list[len(self.list)-1]['start']
        self.timeframe=bt.TimeFrame.Minutes
        print("from=%s,to=%s" % (self.fromdate,self.todate))

        self.m={}
        #print(self.list)

    def start(self):
        # Nothing to do for this data feed type
        pass

    def stop(self):
        # Nothing to do for this data feed type
        pass

    def _load(self):
        if self.n>=len(self.list):
            return False

        r=self.list[self.n]
        self.lines.datetime[0] = date2num(r['start'])

        self.lines.open[0] = r['open']
        self.lines.high[0] = r['high']
        self.lines.low[0] = r['low']
        self.lines.close[0] = r['close']
        self.lines.volume[0] = r['volume']
        self.m[r['start']]=r

        self.n=self.n+1
        return True


In [None]:
class MyStrategy(StrategyTemplate):

    def __init__(self):  # Initiation
        super(MyStrategy, self).__init__()

    def init_broker(broker):
        broker.setcash(1000000.0)
        broker.setcommission(commission=0.0)

    def add_data(cerebro):
        data = MyFeed()
        cerebro.adddata(data)
        return data

    def next(self):  # Processing
        super(MyStrategy, self).next()
        dt=self.datas[0].datetime.datetime(0)
        r=self.data.m[dt]
        #print(r)
        size=self.cerebro.strat_params['size']
        threshold_PctChg=self.cerebro.strat_params['pct_chg']

        model=self.cerebro.strat_params['model']
        df=spark.createDataFrame([r])
        VWAP=r['vwap']
        predictedVWAP = model.transform(df).collect()[0]['prediction']
        expectedPctChg=(predictedVWAP-VWAP)/VWAP*100.0

        goLong=expectedPctChg>threshold_PctChg
        goShort=expectedPctChg<-threshold_PctChg
        #print("expectedPctChg=%s,goLong=%s,goShort=%s" % (expectedPctChg,goLong,goShort))

        if not self.position:
            if goLong:
                print("%s:%s x BUY @ %.2f" % (dt,size,r['close']))
                self.buy(size=size) # Go long
            else:
                print("%s:%s x SELL @ %.2f" % (dt,size,r['close']))
                self.sell(size=size) # Go short
        elif self.position.size>0 and goShort:
            print("%s:%s x SELL @ %.2f" % (dt,size*2,r['close']))
            self.sell(size=size*2)
        elif self.position.size<0 and goLong:
            print("%s:%s x BUY @ %.2f" % (dt,size*2,r['close']))
            self.buy(size=size*2)
