In [1]:
%load_ext autoreload
%autoreload 2
import sys; sys.path.append('..')  # makes the `lib` package importable from this notebook's folder (sys.path does not expand wildcards)
import alpaca_trade_api as alp
import matplotlib.pyplot as plt
import lib.TimeKeeper as tk
import lib.Toolbox as tb
import glob
import lib.Broker as br
import pandas as pd
from lib.Config import Config
import numpy as np
import os
import os.path as p
import pytz
from scipy import stats

api = br.paper_api()



Activating with Paper


# Observation
During training we make predictions on when to buy and sell, and live trading broadly mirrors them, but not always. Further, even when they are mirrored, there is variance in whether the trade actually goes through: we record it going through in training, but then it does not in trading. How do we reconcile these distinct discrepancies? Let us state them more plainly:
1. *Incomplete Parity*: Parity between training and trading is not perfect; we must create perfect parity in all operational markers.
2. *Trade Completion Parity*: Once we have perfect parity of information, we must still achieve perfect parity of outcome. We must be able to tell when a trade will actually have gone through.
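To make *Trade Completion Parity* measurable, training could decide from the recorded trade tape alone whether a simulated limit order would have filled. The sketch below assumes a conservative rule that the notebook does not currently implement: a buy counts as filled only if a later trade printed strictly below the limit, and a sell only if one printed strictly above it, within a fixed window. `would_fill`, its parameters and the prices in the example are hypothetical.

```python
import pandas as pd

def would_fill(trades: pd.DataFrame, side: str, limit: float,
               placed_at: pd.Timestamp, window_s: int = 60) -> bool:
    '''Hypothetical conservative fill check against a recorded trade tape.

    trades: DataFrame indexed by timestamp with a 'price' column.
    A buy counts as filled only if some trade printed strictly below `limit` in the
    `window_s` seconds after `placed_at`; a sell only if one printed strictly above.
    Prints exactly at the limit are ignored, since our order may have been queued
    behind others at that price.'''
    end = placed_at + pd.Timedelta(seconds=window_s)
    window = trades.loc[(trades.index > placed_at) & (trades.index <= end), 'price']
    if side == 'buy':
        return bool((window < limit).any())
    if side == 'sell':
        return bool((window > limit).any())
    raise ValueError(f"side must be 'buy' or 'sell', got {side!r}")

# Illustrative tape (made-up prices)
idx = pd.DatetimeIndex(['2024-05-02 12:27:11', '2024-05-02 12:27:15',
                        '2024-05-02 12:27:40']).tz_localize('America/New_York')
tape = pd.DataFrame({'price': [847.40, 847.39, 847.30]}, index=idx)
t0 = pd.Timestamp('2024-05-02 12:27:10', tz='America/New_York')
print(would_fill(tape, 'buy', 847.39, t0))   # True: 847.30 printed below the limit
print(would_fill(tape, 'sell', 847.40, t0))  # False: nothing printed above 847.40
```

Requiring a print *through* the limit is a deliberately pessimistic choice; if live fills turn out to happen more often than this predicts, the rule can be relaxed to `<=`/`>=`.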

## Incomplete Parity
Ultimately we need to produce better records of what is actually being recorded and ensure extremely precise timekeeping. Honestly, since we can make so many queries, it may make sense to do a complete overhaul of our record collection program and instead literally pull each period as though we were stepping through it in real time... although that's also a crazy thing to do...

We just need to show that, 100% of the time, we have the exact same trades pulled AND we are able to get the exact same *history* as we have in training. After all, all of our metrics rely on the past 20 to 40 ticks.

Ok, so we need to shore up how we pull trades. Do we go so far as to keep records of ALL trades ever made so that they are easier to pull? Possibly, though it may lead to severe git data bloat; then again, we may be able to store them efficiently, so I'm not sure.
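On the storage worry: pandas can compress pickles transparently, which may take some of the sting out of archiving every trade. A minimal sketch with synthetic data (the file names are placeholders); `to_pickle`/`read_pickle` infer the compression from the file extension by default, and `compression='gzip'` is spelled out here only for clarity.

```python
import os
import tempfile

import numpy as np
import pandas as pd

# Synthetic trade tape: a small set of repeating prices and sizes (not real data).
idx = pd.date_range('2024-04-24 09:30', periods=50_000, freq='10ms', tz='America/New_York')
rng = np.random.default_rng(0)
trades = pd.DataFrame({'price': 792.0 + rng.integers(0, 50, len(idx)) / 100,
                       'size': rng.choice([1, 5, 10, 100], len(idx))}, index=idx)

with tempfile.TemporaryDirectory() as d:
    plain, gz = os.path.join(d, 'day.pkl'), os.path.join(d, 'day.pkl.gz')
    trades.to_pickle(plain)
    trades.to_pickle(gz, compression='gzip')
    restored = pd.read_pickle(gz)  # compression inferred from the .gz extension
    sizes = (os.path.getsize(plain), os.path.getsize(gz))

print(sizes)  # (plain bytes, gzip bytes)
```

How much this saves on real NVDA ticks is something to measure on the archive itself rather than assume from this toy tape.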

Specific next steps:
1. Pull all trades for NVDA from roughly the past three months and store them in pickle form.
2. Make a function to check whether the history and the current data are the same.
3. Make the logs keep track of all of the data we're taking in, but in a compressed and simplified form (basically, we feed a set of dataframes to it and it serializes them into a single string, which we store under a column called "bulk data" or some such).
4. Run an experiment on the current data being pulled by the robotrader.
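Step 3 could look something like the following minimal sketch, assuming pickle is acceptable as the serialization format: it round-trips dtypes and time zones exactly, but it is only readable from Python and can break across pandas versions. Base64 output never contains tabs or newlines, so the string fits in one TSV cell. `pack_frames` and `unpack_frames` are hypothetical names, not existing `lib` functions.

```python
import base64
import pickle
import zlib

import pandas as pd

def pack_frames(frames: dict) -> str:
    '''Hypothetical helper: squash {name: DataFrame} into one ASCII string
    (pickle -> zlib -> base64), safe to store in a single TSV cell.'''
    return base64.b64encode(zlib.compress(pickle.dumps(frames), 9)).decode('ascii')

def unpack_frames(blob: str) -> dict:
    '''Inverse of pack_frames. Only unpickle blobs we wrote ourselves.'''
    return pickle.loads(zlib.decompress(base64.b64decode(blob)))

# Example: two illustrative bars go in, the same frame comes back out
bars = pd.DataFrame({'close': [847.39, 847.44], 'volume': [140, 68]},
                    index=pd.DatetimeIndex(['2024-05-02 12:27:10', '2024-05-02 12:27:20'],
                                           tz='America/New_York'))
blob = pack_frames({'bars': bars})
restored = unpack_frames(blob)['bars']
```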

In [33]:
# Pull trade info for archiving
api = br.paper_api()

trades = api.get_trades('NVDA', start=tk.to_time_s(2,1), feed='sip').df

Activating with Paper


In [19]:
api = br.paper_api()
entries = [base.split('/')[-1] for base in glob.glob('../archive_data/raw_trades/NVDA/*')]
print(entries)
for date in (pd.date_range(tk.to_time(2,1),tk.to_time(4,24), freq='B')):
    if str(date)+'.pkl' in entries: continue
    # No feed= here (the cell above passes feed='sip'), so the server's default feed applies.
    trades = api.get_trades('NVDA', start=tk.dto_time(tk.get_market_open(date)), end=tk.dto_time(tk.get_market_close(date))).df
    if len(trades) == 0: print(date); continue
    trades[['price','size']].to_pickle('../archive_data/raw_trades/NVDA/'+str(date)+'.pkl')
    #print(pd.read_pickle('../archive_data/raw_trades/NVDA/'+str(date)+'.pkl'))

Activating with Paper
['2024-02-15 00:00:00-04:00.pkl', '2024-02-08 00:00:00-04:00.pkl', '2024-02-01 00:00:00-04:00.pkl', '2024-02-06 00:00:00-04:00.pkl', '2024-02-12 00:00:00-04:00.pkl', '2024-02-16 00:00:00-04:00.pkl', '2024-02-13 00:00:00-04:00.pkl', '2024-02-14 00:00:00-04:00.pkl', '2024-02-07 00:00:00-04:00.pkl', '2024-02-09 00:00:00-04:00.pkl', '2024-02-02 00:00:00-04:00.pkl', '2024-02-05 00:00:00-04:00.pkl']
2024-02-19 00:00:00-04:00
2024-03-29 00:00:00-04:00


                                        price  size
timestamp
2024-04-24 19:00:00.010579607+00:00  792.3650     1
2024-04-24 19:00:00.071332957+00:00  792.4818     1
2024-04-24 19:00:00.153252316+00:00  792.3200    10
2024-04-24 19:00:00.185888115+00:00  792.2300     8
2024-04-24 19:00:00.185890631+00:00  792.2300     5
...                                       ...   ...
2024-04-24 19:47:09.508482317+00:00  795.8750    85
2024-04-24 19:47:09.567990839+00:00  795.8440   300
2024-04-24 19:47:09.607895009+00:00  795.9900     5
2024-04-24 19:47:09.613854625+00:00  795.8750     1


### Our Next Steps
Ok, so, what are our next steps? We need to build a trade-gathering archive (basically, we need to store both the trades and the processed data, and be able to update both with each tick). We need a function that compares the historical trades with those we collect in the present, specifically looking for discontinuities. So what do we do?
1. We update get_data to both collect and store trade information and data information
2. We update get_segment to both collect and store trade information and data information
3. We update the strategy's get-data method (strat_get_data) to both collect and store trade information and data information
4. We create a function which checks whether the collected data is the same as what we've got in the archives (and indeed, whether the archive is accurate/keeps continuity)

What would this change even look like? Well, if we're keeping track of trades, we need to decide how. I think each day of trades will be its own pickle file; there's so much data that it would be silly to splice the days together. Speaking of splicing, we need a general-purpose function for splicing two sets together that only adds the relevant parts and, ideally, doesn't have to sort everything every time. To that end, we should also reduce the amount of data loaded at any given time, so that if we do have to sort every time, we can do so intelligently.
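A sketch of such a splice, assuming both frames are already sorted by timestamp: in the common case the new chunk starts after the archive ends, so a plain append keeps the order and no sort is needed. Where they overlap, the new chunk wins, in the spirit of `keep='last'` in `easy_concat`; unlike `easy_concat`, it keeps every row of the new chunk that shares a timestamp, which matters for raw trades. `splice` is a hypothetical name.

```python
import pandas as pd

def splice(archive: pd.DataFrame, new: pd.DataFrame) -> pd.DataFrame:
    '''Hypothetical splice: add `new` to `archive`, letting `new` win where timestamps overlap.
    Assumes both frames are already sorted by index. A full sort only happens when
    `new` reaches back before rows the archive already holds.'''
    if new is None or len(new) == 0:
        return archive
    if archive is None or len(archive) == 0:
        return new.copy()
    if new.index[0] > archive.index[-1]:
        # Fast path: strictly later data, so a plain append keeps the order.
        return pd.concat([archive, new])
    # Overlap: drop every archive row whose timestamp the new chunk covers, then
    # append; only sort if the chunk reached back past the surviving rows.
    out = pd.concat([archive[~archive.index.isin(new.index)], new])
    return out if out.index.is_monotonic_increasing else out.sort_index(kind='stable')

idx = pd.date_range('2024-05-02 09:30', periods=5, freq='10s', tz='America/New_York')
archive = pd.DataFrame({'price': [1.0, 2.0, 3.0]}, index=idx[:3])
print(splice(archive, pd.DataFrame({'price': [30.0, 40.0]}, index=idx[2:4])).price.tolist())
# [1.0, 2.0, 30.0, 40.0]
```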

What are we envisioning? get_segment fires every 10 seconds (or whatever the tick duration is), and every minute we update the archive with what we've gathered. This will only work once we're certain that parity between backtesting and live trading is perfect.
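The cadence above (one bar per tick, buffered bars folded into the archive once a minute) can be sketched without sleeping or calling the broker by injecting the two actions. This is a hypothetical loop shape assuming a 10 s tick and a 60 s flush; `run_ticks`, `pull_bar` and `flush` are placeholder names, not existing TimeKeeper or Toolbox functions. In the notebook, `pull_bar` would be something like `get_segment` and `flush` a write to the archive.

```python
import pandas as pd

def run_ticks(start: pd.Timestamp, n_ticks: int, pull_bar, flush,
              tick_s: int = 10, flush_s: int = 60) -> None:
    '''Simulates n_ticks of the live loop without sleeping or calling a broker.
    pull_bar(tick) returns one bar; flush(bars) receives the buffered bars whenever a
    tick lands on a flush_s boundary of the wall clock, plus once more at the end.'''
    if flush_s % tick_s:
        raise ValueError('flush_s must be a multiple of tick_s')
    buffer = []
    tick = start.floor(f'{tick_s}s')
    for _ in range(n_ticks):
        tick += pd.Timedelta(seconds=tick_s)
        buffer.append(pull_bar(tick))
        # Align flushes to clock minutes (seconds since local midnight), not to
        # "every 6th iteration", so a mid-minute start still matches the archive.
        if (tick - tick.normalize()).total_seconds() % flush_s == 0:
            flush(buffer)
            buffer = []
    if buffer:
        flush(buffer)  # keep the partial minute instead of dropping it on exit

flushed = []
run_ticks(pd.Timestamp('2024-05-02 12:27:13', tz='America/New_York'), 8,
          pull_bar=lambda t: t, flush=flushed.append)
print([len(b) for b in flushed])  # [5, 3]
```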

In [2]:
def strat_get_data(self, update:bool = False):
    '''Pulls the given day's data. If update then we only pull a single minute and update our existing data'''
    if update and hasattr(self, 'data') and tk.in_tick(self.data.index[-1]):
        bar = tb.get_segment(self.symbol, self.api)
        self.data.loc[bar.name] = bar
    else:
        s, e = tk.get_market_open(self.date), tk.get_market_close(self.date)
        tb.update_archive(self.symbol)
        self.data = tb.get_archive(self.symbol)[s:e]

In [3]:
def get_data(symbol:str, start:pd.Timestamp = None, end:pd.Timestamp = None, api:alp.REST = None):
    '''Returns TICK_DURATION bars for the symbol between start and end (default: today's open until now).
    Returns None if there were no trades.'''
    # Defaults are resolved at call time; writing end=tk.dto_time(tk.now()) in the signature
    # would freeze "now" at the moment this cell was run.
    if not api: api = br.paper_api()
    if start is None: start = tk.get_market_open()
    if end is None: end = tk.now()

    s, e = start if isinstance(start, str) else tk.dto_time(start), end if isinstance(end, str) else tk.dto_time(end)
    df = api.get_trades(symbol, start=s, end=e, feed='sip').df
    if len(df) == 0: return None
    df = df.price

    # Round the DatetimeIndex before wrapping it (Series.round is numeric rounding, not time rounding)
    ts = pd.Series(pd.date_range(df.index[0], end=df.index[-1], freq=str(tk.TICK_DURATION)+"s").round('s'))
    data = []
    for i in range(len(ts) - 1):
        f = df[ts.loc[i]:ts.loc[i+1]]
        if len(f) > 0:
            # Drop prices more than 1 std from the window mean. Zero-variance windows give NaN
            # z-scores (treated as 0); if the filter would drop every trade, keep the window as-is.
            keep = np.abs(np.nan_to_num(stats.zscore(f.to_numpy()))) < 1
            if keep.any(): f = f[keep]
            data.append({'date':ts.loc[i+1], 'open': f.iloc[0], 'close': f.iloc[-1], 'high':f.max(), 'low':f.min(), 'volume':len(f)})
        elif data:
            # No trades in this window: repeat the previous bar with zero volume
            data.append({**data[-1], 'date':ts.loc[i+1], 'volume':0})
    if len(data) == 0: return None
    data = pd.DataFrame(data).set_index('date')
    data.index = data.index.tz_convert(pytz.timezone('America/New_York'))
    return data

# What we Really Need
What we really need is a function called "get_data_up_to_now" that does exactly what it says: get the data from the beginning of the day up to this minute.
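A sketch of the core of such a function: take the day's raw trades, cut them off at the last *completed* tick, and bucket them into half-open windows so that a trade stamped exactly on a boundary lands in one bar only (label slicing like `df[a:b]` includes both endpoints, so the manual loops count such a trade twice). It swaps the manual loop for pandas `resample`, leaves out the z-score filter, and fills empty windows with the previous close, which differs from `get_data` copying the whole previous bar. Bars are labelled by their end time, matching `'date': ts.loc[i+1]`. `bars_up_to` is a hypothetical name.

```python
import pandas as pd

def bars_up_to(trades: pd.DataFrame, now: pd.Timestamp, tick_s: int = 10) -> pd.DataFrame:
    '''OHLC + trade-count bars for every tick window that has fully closed by `now`.
    trades: DataFrame indexed by tz-aware timestamp with a 'price' column.
    Windows are half-open (start, end] and labelled by their end time.'''
    last_closed = now.floor(f'{tick_s}s')
    prices = trades.loc[trades.index <= last_closed, 'price']
    windows = prices.resample(f'{tick_s}s', closed='right', label='right')
    bars = windows.ohlc()
    bars['volume'] = windows.count()  # number of trades, like len(f) in get_data
    # Empty windows: carry the previous close forward and report zero volume.
    bars['close'] = bars['close'].ffill()
    for col in ('open', 'high', 'low'):
        bars[col] = bars[col].fillna(bars['close'])
    return bars

ny = 'America/New_York'
idx = pd.DatetimeIndex(['2024-05-02 12:27:01', '2024-05-02 12:27:10', '2024-05-02 12:27:15',
                        '2024-05-02 12:27:33', '2024-05-02 12:27:45'], tz=ny)
trades = pd.DataFrame({'price': [1.0, 2.0, 3.0, 4.0, 5.0]}, index=idx)
bars = bars_up_to(trades, pd.Timestamp('2024-05-02 12:27:44', tz=ny))
print(bars)
```

Here the trade at 12:27:10 belongs only to the bar labelled 12:27:10, the empty 12:27:30 window repeats the previous close with zero volume, and the 12:27:45 trade is excluded because its window has not closed yet.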

In [4]:
tk.sync()
get_data()

sleeping for 7


TypeError: get_data() missing 1 required positional argument: 'symbol'

In [None]:
# How will this actually work? Make a function that updates the trade pickle for a given day (or creates it if none exists).
'''
    calls data_up_to_now
        check whether there's a data archive for today
            if yes, grab it
            if no:
                check whether there is a trades pickle for today; if not, create one up to now
        if there is, pull it; if it is up to now (within one tick), then bam, that's your 


'''
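The pseudocode above boils down to "load today's trade pickle if it exists (otherwise create it), then top it up from its last timestamp". A sketch with the broker call injected as `fetch(start, end)` so it runs offline; `load_or_update` and `fetch` are hypothetical, and it assumes the fetch's `start` is inclusive. That is why rows at the last cached timestamp are replaced rather than de-duplicated: de-duplicating on the timestamp alone can also discard distinct trades that share a timestamp.

```python
import os
import tempfile

import pandas as pd

def load_or_update(path: str, fetch, day_open: pd.Timestamp, now: pd.Timestamp) -> pd.DataFrame:
    '''Returns the day's trades from day_open up to now, cached as a pickle at `path`.
    fetch(start, end) stands in for the broker call and must return a DataFrame
    indexed by timestamp, with `start` inclusive (possibly empty).'''
    cached = pd.read_pickle(path) if os.path.isfile(path) else None
    start = cached.index[-1] if cached is not None and len(cached) else day_open
    fresh = fetch(start, now)
    if cached is None:
        merged = fresh
    else:
        # Re-pulling from the last stored timestamp returns those rows again (and any other
        # trades sharing that timestamp), so drop them from the cache and keep the fresh copy.
        merged = pd.concat([cached[cached.index < start], fresh])
    os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
    merged.to_pickle(path)
    return merged

# Offline demo: a fake tape stands in for api.get_trades
tape = pd.DataFrame({'price': [1.0, 2.0, 2.5, 3.0, 4.0]},
                    index=pd.date_range('2024-05-02 09:30', periods=5, freq='1s', tz='America/New_York'))

def fetch(start, end):
    return tape[(tape.index >= start) & (tape.index <= end)]

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, 'NVDA', '2024-05-02.pkl')
    first = load_or_update(path, fetch, tape.index[0], tape.index[2])    # creates the cache
    second = load_or_update(path, fetch, tape.index[0], tape.index[-1])  # tops it up
```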

In [2]:
import os
import os.path as p
import shutil

import alpaca_trade_api as alp
import numpy as np
import pandas as pd
import pytz
from scipy import stats

import lib.Broker as br
import lib.TimeKeeper as tk
import lib.Toolbox as tb

NY = pytz.timezone('America/New_York')

def filter_outliers(prices:pd.Series, max_z:float = 1.0):
    '''Keeps trades whose price lies within max_z standard deviations of the window mean.
    A zero-variance window (one trade, or identical prices) makes zscore return NaN; treat that
    as z = 0. If the filter would drop every trade (e.g. exactly two distinct prices, whose
    z-scores are +/-1), keep the window as-is rather than produce an empty bar.'''
    if len(prices) == 0: return prices
    keep = np.abs(np.nan_to_num(stats.zscore(prices.to_numpy()))) < max_z
    return prices[keep] if keep.any() else prices

def process_trades(df:pd.DataFrame):
    '''Buckets raw trades (price/size, timestamp index) into TICK_DURATION bars labelled by window end.
    Empty windows repeat the previous bar with volume 0, mirroring get_data. Returns None if there are no bars.'''
    ts, data = pd.Series(pd.date_range(df.index[0], end=df.index[-1], freq=f'{tk.TICK_DURATION}s').round('s')), []
    for i in range(1, len(ts)):
        # NOTE: label slicing includes both endpoints, so a trade stamped exactly on a
        # boundary is counted in both adjacent bars.
        f = filter_outliers(df[ts.loc[i-1]:ts.loc[i]].price)
        if len(f) > 0: data.append({'date':ts.loc[i], 'open': f.iloc[0], 'close': f.iloc[-1], 'high':f.max(), 'low':f.min(), 'volume':len(f)})
        elif data: data.append({**data[-1], 'date':ts.loc[i], 'volume':0})
    if len(data) == 0: return None
    data = pd.DataFrame(data).set_index('date')
    data.index = data.index.tz_convert(NY)
    return data

def update_archive(symbol:str, date:pd.Timestamp = None):
    '''Refreshes the day's raw trades, then brings that day's processed bar archive up to date.
    Returns the processed bars, or None if there are no trades yet.'''
    date = tk.today() if date is None else date  # resolved per call, not once at definition time
    trades = update_trades(symbol, date)
    if trades is None: return None
    print(trades.index[-1] + pd.DateOffset(seconds=tk.TICK_DURATION))  # debug: one tick after the last stored trade

    datename = date.strftime("%Y-%m-%d"); ppath = f'../archive_data/processed/{symbol}'
    os.makedirs(ppath, exist_ok=True)
    if not p.isfile(f'{ppath}/{datename}.pkl'):
        bars = process_trades(trades)
        if bars is None: return None
        bars.to_pickle(f'{ppath}/{datename}.pkl')
    data = pd.read_pickle(f'{ppath}/{datename}.pkl')
    # Nothing new to add before the open, or while the last bar is less than one tick old;
    # return the stored bars so callers always get a DataFrame back.
    if tk.get_market_open(date) > tk.now() or (data.index[-1] + pd.DateOffset(seconds=tk.TICK_DURATION) > tk.now()): return data
    data = easy_concat(data, process_trades(trades))
    data.to_pickle(f'{ppath}/{datename}.pkl')
    return data

def update_trades(symbol:str, date:pd.Timestamp = None):
    '''Updates (or creates) the raw trade pickle of the given day for the given symbol.'''
    date = tk.today() if date is None else date
    datename = date.strftime("%Y-%m-%d"); rpath = f'../archive_data/raw/{symbol}'
    os.makedirs(rpath, exist_ok=True)
    if not p.isfile(f'{rpath}/{datename}.pkl'):
        trades = get_days_trades(symbol, date)
        if trades is None: return None
        trades.to_pickle(f'{rpath}/{datename}.pkl')
    trades = pd.read_pickle(f'{rpath}/{datename}.pkl')
    # Re-pull from the last stored trade onwards. NOTE: easy_concat de-duplicates on the
    # timestamp alone, which can also drop distinct trades that share a timestamp.
    trades = easy_concat(trades, get_trades(symbol, trades.index[-1], tk.get_market_close(date)))
    trades.to_pickle(f'{rpath}/{datename}.pkl')
    return trades

def get_trades(symbol:str, s, e, api:alp.REST = None, feed:str = None):
    '''Works like api.get_trades() but accepts datetimes and returns a price/size DataFrame in New York time.
    NOTE: get_data passes feed='sip'; feed=None leaves the choice to the server, which may differ.'''
    if api is None: api = br.paper_api()
    if not isinstance(s, str): s = tk.dto_time(s)
    if not isinstance(e, str): e = tk.dto_time(e)
    trades = api.get_trades(symbol, start=s, end=e, feed=feed).df
    if len(trades) == 0: return None
    trades.index = trades.index.tz_convert(NY)
    return trades[['price','size']]

def get_days_trades(symbol:str, date:pd.Timestamp = None):
    '''Returns the trades of a single trading day (today by default).'''
    date = tk.today() if date is None else date
    return get_trades(symbol, tk.get_market_open(date), tk.get_market_close(date))

def get_week(symbol:str, date:pd.Timestamp = None, api = None):
    '''Returns the bars of the last week as a DataFrame (business days only).'''
    date = tk.today() if date is None else date
    api = br.paper_api() if api is None else api
    business_days = pd.date_range(date-pd.DateOffset(weeks=1)+pd.DateOffset(days=1), date, freq='B')
    print(business_days)
    df = None
    for day in business_days:
        print(day)
        pdf = get_data(symbol, tk.get_market_open(day), tk.get_market_close(day), api=api)
        df = pdf if df is None else easy_concat(df, pdf)
    return df

def construct_archive(symbol:str, weeks=1):
    '''Builds an archive of bars for the symbol covering the last `weeks` weeks plus the current one.'''
    df, api = None, br.paper_api()
    if weeks > 0:
        for i in range(weeks, -1, -1):  # oldest week first; i == 0 is the current week
            pdf = get_week(symbol, tk.today()-pd.DateOffset(weeks=i), api)
            df = pdf if df is None else easy_concat(df, pdf)
    else: df = get_data(symbol)
    symbol_dir = './archive_data/'+symbol.upper()
    os.makedirs(symbol_dir, exist_ok=True)
    pd.to_pickle(df, symbol_dir+'/main_archive.pkl')

def update_tracked_archives():
    '''Rebuilds the archive of every ticker listed in tracked_tickers.txt.'''
    for symbol in tb.get_symbols('tracked_tickers.txt'):
        construct_archive(symbol)

def save_archive(symbol:str, name:str, data:pd.DataFrame):
    '''Merges `data` into the named archive of the symbol (new rows win on duplicate timestamps),
    backing up the previous file first. Returns True if the archive gained rows.'''
    symbol_dir = './archive_data/'+symbol.upper()
    archive_path = symbol_dir+'/'+name+'_main_archive.pkl'
    os.makedirs(symbol_dir, exist_ok=True)
    if not p.isfile(archive_path): data.to_pickle(archive_path); return True
    archive_data = pd.read_pickle(archive_path)
    joined = pd.concat([data, archive_data])
    joined = joined[~joined.index.duplicated(keep='first')]
    shutil.copyfile(archive_path, archive_path+'.bckp')
    joined.to_pickle(archive_path)
    return len(archive_data) != len(joined)

def get_archive(symbol:str):
    '''Retrieves an archive of bar data from our stores and returns it as a normal datasource'''
    symbol_dir = './archive_data/'+symbol.upper()
    archive_path = symbol_dir+'/main_archive.pkl'
    if not p.isfile(archive_path): archive_path = '.'+archive_path
    archive_data = pd.read_pickle(archive_path)
    archive_data = archive_data[~archive_data.index.duplicated(keep='last')]
    archive_data.columns = archive_data.columns.str.lower()
    return archive_data

def easy_concat(df1:pd.DataFrame, df2:pd.DataFrame):
    '''Quick and dirty wrapper to concat two dataframes, drop duplicate timestamps (keeping df2's) and sort.
    pd.concat silently ignores a None argument, so df2 may be None.'''
    df = pd.concat([df1,df2])
    df = df[~df.index.duplicated(keep='last')]
    return df.sort_index()

def get_segment(symbol:str, api:alp.REST):
    '''Builds the bar for the most recently completed tick. Returns None if no trades came in.'''
    # floor, not round: rounding can land on a tick that has not finished yet
    tick = tk.now().floor(f'{tk.TICK_DURATION}s')
    trades = get_trades(symbol, tick-pd.DateOffset(seconds=tk.TICK_DURATION), tick, api=api)
    if trades is None: return None
    bar = filter_outliers(trades.price)
    return pd.Series({'open': bar.iloc[0], 'close': bar.iloc[-1], 'high':bar.max(), 'low':bar.min(), 'volume':len(bar)}, name=tick)



In [10]:

# The actual cycle of keeping a vigilant eye on the market and seeing if we should trade
open_date = tk.get_trade_open()
if tk.is_after(open_date.hour, open_date.minute): tk.sync()
#self.post_to_slack()
data = update_archive('NVDA')
while True:
    # if tk.keep_time(self.port):
    #     self.update_tickers()
    #     tk.sync()
    #     self.post_to_slack()
    print(tk.now())
    # self.trade(tk.now())
    bar = get_segment('NVDA', api)
    if bar is not None:  # skip ticks that produced no bar (e.g. no trades came in)
        data.loc[bar.name] = bar
    tk.sync()

sleeping for 3
2024-05-02 12:27:21.055577832-04:00
2024-05-02 12:27:13-04:00
open      847.6724
close     847.3900
high      847.7300
low       847.3900
volume    140.0000
Name: 2024-05-02 12:27:10-04:00, dtype: float64
sleeping for 8
2024-05-02 12:27:21-04:00
open      847.300
close     847.440
high      847.455
low       847.275
volume     68.000
Name: 2024-05-02 12:27:20-04:00, dtype: float64
sleeping for 10
2024-05-02 12:27:31-04:00
open      847.4324
close     847.4000
high      847.5200
low       847.3000
volume     60.0000
Name: 2024-05-02 12:27:30-04:00, dtype: float64
sleeping for 10
2024-05-02 12:27:41-04:00
open      847.400
close     847.480
high      847.485
low       847.120
volume    104.000
Name: 2024-05-02 12:27:40-04:00, dtype: float64
sleeping for 10
2024-05-02 12:27:51-04:00
open      847.390
close     847.265
high      847.420
low       847.160
volume     61.000
Name: 2024-05-02 12:27:50-04:00, dtype: float64
sleeping for 10
2024-05-02 12:28:01-04:00
open      847.

KeyboardInterrupt: 

In [24]:
def compare_logs(symbol:str, date:pd.Timestamp = None):
    '''Prints every index present in both logs where the brokerage and training records
    disagree on close, bollinger_high or volume.'''
    date = tk.today() if date is None else date
    entry_name = date.strftime("%Y-%m-%d-%a")
    brokerage_p, training_p = p.join('logs','brokerage',entry_name,symbol.upper()+'_BR_OPS.tsv'),  p.join('logs','training',entry_name,symbol.upper()+'_TR_OPS.tsv')
    if not p.isfile(brokerage_p): brokerage_p = '../'+brokerage_p
    if not p.isfile(training_p): training_p = '../'+training_p
    broker, training = pd.read_csv(brokerage_p, delimiter='\t').set_index('index'), pd.read_csv(training_p, delimiter='\t').set_index('index')
    same = lambda a, b: a == b or (pd.isna(a) and pd.isna(b))  # treat NaN vs NaN as a match
    for index in broker.index:
        if index in training.index:
            be, te = broker.loc[index], training.loc[index]
            # Compare the values themselves, column by column
            if same(be.close, te.close) and same(be.bollinger_high, te.bollinger_high) and same(be.volume, te.volume):
                continue
            print(index, 'DISCREPANCY')


compare_logs('NVDA')
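A vectorised variant of this check, which also lists rows that appear in only one log and compares with an absolute tolerance instead of exact equality. It assumes the same TSV layout as above (an `index` column plus `close`, `bollinger_high` and `volume`) and unique indexes; `diff_logs` is a hypothetical name and `atol=1e-6` is an arbitrary choice.

```python
import numpy as np
import pandas as pd

def diff_logs(broker: pd.DataFrame, training: pd.DataFrame,
              cols=('close', 'bollinger_high', 'volume'), atol: float = 1e-6):
    '''Returns (only_in_broker, only_in_training, mismatched) lists of index labels.
    Assumes both frames have unique indexes; NaN vs NaN counts as a match.'''
    only_b = broker.index.difference(training.index)
    only_t = training.index.difference(broker.index)
    common = broker.index.intersection(training.index)
    b = broker.loc[common, list(cols)].to_numpy(dtype=float)
    t = training.loc[common, list(cols)].to_numpy(dtype=float)
    same = np.isclose(b, t, rtol=0.0, atol=atol, equal_nan=True).all(axis=1)
    return list(only_b), list(only_t), list(common[~same])

# Illustrative frames: 'b' differs on close, 'c' and 'd' exist in only one log
broker = pd.DataFrame({'close': [1.0, 2.0, 3.0], 'bollinger_high': [1.5, np.nan, 3.5],
                       'volume': [10, 20, 30]}, index=['a', 'b', 'c'])
training = pd.DataFrame({'close': [1.0, 2.1, 3.0], 'bollinger_high': [1.5, np.nan, 3.5],
                         'volume': [10, 20, 30]}, index=['a', 'b', 'd'])
print(diff_logs(broker, training))  # (['c'], ['d'], ['b'])
```

In `compare_logs` this would be called on the two frames read from the TSVs, in place of the per-row loop.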

In [25]:
''' 
Every 10 seconds we pull data
Every 60 seconds we check it against our archive (after updating of course)

'''
