<a href="https://colab.research.google.com/github/whyrv/AI-Chip/blob/master/Parquet_cleanup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install and load the ipython-autotime extension for this session.
!pip install ipython-autotime
%load_ext autotime
import time
# Wall-clock start; the notebook's final cell subtracts this from its own reading.
t1 = time.perf_counter()

In [None]:
import pandas as pd
import numpy as np
import datetime
from matplotlib import pyplot as plt
from pandas.tseries.frequencies import to_offset
from google.colab import files

In [None]:
symbol = 'AAPL'

In [None]:
df = pd.read_parquet('{}.parquet'.format(symbol))

In [None]:
df.columns = ['datetime', 'open', 'high', 'low', 'close', 'vol']

## Only keep market hour data and discard pre and post market

* Drop everything before 09:30 and after 16:01 (4:01 p.m.).

In [None]:
# Pre-market filter, step 1: discard every bar stamped before 09:00.
too_early = df['datetime'].dt.hour < 9
df.drop(df.index[too_early], inplace=True)

In [None]:
# Pre-market filter, step 2: inside the 09:xx hour, discard bars before 09:30.
bar_time = df['datetime'].dt
before_open = (bar_time.hour == 9) & (bar_time.minute < 30)
df.drop(df.index[before_open], inplace=True)

In [None]:
# Post-market filter, step 1: discard every bar stamped 17:00 or later.
too_late = df['datetime'].dt.hour > 16
df.drop(df.index[too_late], inplace=True)

In [None]:
# Post-market filter, step 2: inside the 16:xx hour, keep 16:00 and 16:01 only.
bar_time = df['datetime'].dt
after_close = (bar_time.hour == 16) & (bar_time.minute > 1)
df.drop(df.index[after_close], inplace=True)

In [None]:
df[df['datetime'].dt.year == 2007]

In [None]:
df.info()

## Auto detect splits and add as a column
* Figure out the split factor
* For every bar before a split, divide the prices by the split factor and multiply the volume by it.

In [None]:
# Plain numpy inputs for the split-detection loop.
# `times` and `close` are kept 2-D (n, 1) because the loop indexes them as
# times[i][0] and close[i].item().
times = df[['datetime']].to_numpy()
# Compare full calendar dates (timestamps truncated to midnight) instead of the
# day-of-month: two consecutive rows that are a whole month apart share the same
# day-of-month and would otherwise not register as a day boundary.
days = df['datetime'].dt.normalize()
daysnumpy = days.to_numpy()
close = df[['close']].to_numpy()

In [None]:
assert len(close) == len(days)

In [None]:
# Build `splitf`, a per-row multiplier that back-adjusts prices for splits.
# At every day boundary the close-to-close ratio is compared with its nearest
# integer; a ratio of about 2x or more that is within 10% of an integer is
# treated as a split (price dropped) or a reverse split (price jumped), and
# every earlier row is scaled so the series is continuous across the boundary.
splitf = np.ones(len(close))
for i in range(1, len(close)):
  # only check for splits on day boundaries
  if daysnumpy[i-1] == daysnumpy[i]:
    continue
  prev_close = close[i-1].item()
  new_close = close[i].item()
  # A ratio is meaningless for missing or non-positive prices; skipping them
  # also avoids ZeroDivisionError and round(nan).
  if not (prev_close > 0 and new_close > 0):
    continue
  # a jump up across the boundary is a candidate reverse split
  is_rev_split = new_close > prev_close
  if is_rev_split:
    q = new_close/prev_close
  else:
    q = prev_close/new_close
  # use round to get the integer not floor or int
  qint = round(q)
  # make sure that this is a real split. the rounded value should be very close
  # to the unrounded ratio, like 6.93 rounded up to 7 in the case of AAPL
  diff = 100*abs(q-qint)/q
  # so the split (or reverse) should be 2x or more and the round-error < 10%
  if qint >= 2 and diff <= 10:
    if is_rev_split:
      # earlier prices are scaled up by the integer ratio
      fac = qint
    else:
      # earlier prices are scaled down by the integer ratio
      fac = 1.0 / qint
    t = pd.to_datetime(times[i][0]).strftime('%m-%d-%Y')
    # factors compound: rows before several splits carry the product of all of them
    splitf[:i] *= fac
    print('{}: stock split on {} by {} == {}-->{} ({})'.format(i, t, fac, close[i-1], close[i], splitf[0]))


In [None]:
plt.plot(splitf)

In [None]:
assert len(df['close']) == len(splitf)

In [None]:
# Preserve the unadjusted close, then scale every price column by the per-row
# split factor; volume is scaled the opposite way.
df['realClose'] = df['close']
for price_col in ('close', 'open', 'high', 'low'):
  df[price_col] *= splitf
df['vol'] /= splitf

In [None]:
df.head()

In [None]:
df.tail()

## next

In [None]:
# Display only: set_index returns a new frame and the result is not assigned, so
# df keeps 'datetime' as column 0, which the positional df.iat[i,0] lookups in
# the next code cell rely on.
df.set_index('datetime')

Drop the initial rows of the dataframe till we get to a 9:30 start. Sometimes the df for a symbol starts in the middle of the day.

In [None]:
# Trim leading rows until the first bar stamped exactly 09:30.
# Column 0 is the 'datetime' column, so df.iat[i,0] is the row's timestamp.
# The scan is bounded by the frame length so that a frame with no 09:30 bar
# fails with a clear message instead of an out-of-bounds IndexError.
i = 0
n_rows = len(df.index)
while i < n_rows and not (df.iat[i,0].hour == 9 and df.iat[i,0].minute == 30):
  i += 1
if i == n_rows:
  raise ValueError('no 09:30 bar found; cannot align the data to a session open')
df = df.iloc[i: , :]
df.head()

In [None]:
df.info()

## Now aggregate the 1-min to 5-min bars. 

In [None]:
# Defines how to aggregate pricing when downsampling OHLC
# data into a broader time period. e.g. 1-min data -> 5 min data.
# Reducer applied to each column when several bars are merged into one
# (e.g. 1-min data -> 5-min data): first open, highest high, lowest low,
# last close, summed volume, last unadjusted close.
price_mapping = dict(
    open="first",
    high="max",
    low="min",
    close="last",
    vol="sum",
    realClose="last",
)

In [None]:
# Aggregate the 1-minute bars into 5-minute bars using price_mapping.
# '5min' is the frequency alias accepted by both old and new pandas; the
# single-letter 'T' alias for minutes is deprecated in recent releases.
xx = df.set_index('datetime').resample('5min').agg(price_mapping)

* We need to shift the index so that the aggregation appears on the close of the bar, not the start.

In [None]:
# Move each label forward by one bar width so a bar is stamped with its close
# time rather than its open time.  '5min' replaces the deprecated 'T' alias.
loffset = '5min'
xx.index = xx.index + to_offset(loffset)

In [None]:
len(xx.index)

In [None]:
# Remove every 5-minute row that has a NaN in any column.
# NOTE(review): presumably these are the overnight/weekend slots that resample
# creates between sessions -- confirm against the row counts shown around this cell.
xx = xx.dropna()

In [None]:
xx.head()

In [None]:
len(xx.index)

## Aggregate to daily bars and calculate pivot points

In [None]:
# Collapse the 1-minute bars into one bar per calendar day with the same
# per-column reducers used for the 5-minute bars.
by_timestamp = df.set_index('datetime')
zz = by_timestamp.resample('D').agg(price_mapping)

In [None]:
zz.tail()

In [None]:
zz = zz.dropna()

### Calculate pivot points

In [None]:
zz = zz.copy() # avoid the infamous set on slice warning

In [None]:
# Pivot point (PP) plus three resistance (R1-R3) and three support (S1-S3)
# levels, computed from each daily bar's high, low and close.
# The columns are appended in the order PP, R1, S1, R2, S2, R3, S3; a later
# cell asserts that they sit next to each other in exactly this order.
day_high, day_low, day_close = zz['high'], zz['low'], zz['close']
day_range = day_high - day_low
pp_level = (day_high + day_low + day_close)/3.0
zz['PP'] = pp_level
zz['R1'] = 2*pp_level - day_low
zz['S1'] = 2*pp_level - day_high
zz['R2'] = pp_level + day_range
zz['S2'] = pp_level - day_range
zz['R3'] = day_high + 2*(pp_level - day_low)
zz['S3'] = day_low - 2*(day_high - pp_level)

In [None]:
zz.tail()

##### Store the indices of the day-start and day-end bars. Because the start times are not guaranteed to be at 9:30, we are not guaranteed to have the same number of bars per day; hence the exhaustive search.

In [None]:
# Record the row position of every session's last bar (end_ids) and of every
# following session's first bar (start_ids).  Values are placeholders (0) that
# remap_dates later overwrites with (segment, remapped date) tuples.
row_hours = xx.index.hour
start_ids, end_ids = {}, {}
last_row = len(xx.index) - 1
for i in range(last_row):
  nxt = i + 1
  # 16:x > 9:x quick way to check for date rollover
  if row_hours[nxt] < row_hours[i]:
    end_ids[i] = 0
    start_ids[nxt] = 0

# add the last row anyway
end_ids[last_row] = 0
# one recorded session end per daily bar
assert len(zz.index) == len(end_ids)

# Date Generation
Convert the datetime index back to a column,
then remap the rows to new dates.

* First reset the index

In [None]:
xx = xx.reset_index()

In [None]:
xx[xx.isnull().any(axis=1)]

In [None]:
# Pool of synthetic target dates: every business day (Mon-Fri) in the range,
# formatted as YYYY-MM-DD strings.
business_days = pd.bdate_range(start='1/1/1900', end='1/01/2021')
valid_dates = list(business_days.strftime("%Y-%m-%d"))

In [None]:
len(xx.index)/len(valid_dates)

In [None]:
def remap_dates(datelist, num_rows):
    """Assign each of ``num_rows`` rows a date from ``datelist`` and a segment id.

    Rows consume ``datelist`` sequentially, one entry per row.  When a row is a
    recorded day start (a key of the module-level ``start_ids``) and the cursor
    has passed ``len(datelist) - 79``, the segment id is incremented and the
    cursor restarts at the first date.
    NOTE(review): 79 is presumably the maximum number of 5-minute bars in one
    session, so a day never runs off the end of ``datelist`` -- confirm.

    Side effects: for every row that is a key of the module-level ``start_ids``
    the value is replaced by ``(segment, date)``; otherwise, if the row is a key
    of ``end_ids``, that value is replaced instead.

    Returns:
        A two-entry ``pd.Series``: ``'D'`` holds the list of assigned dates and
        ``'I'`` the list of segment ids, both of length ``num_rows``.
    """
    rollover_after = len(datelist) - 79
    remapped, segments = [], []
    segment = 0
    cursor = 0
    for row in range(num_rows):
        day_start = row in start_ids
        # only roll over to a new segment on a day boundary
        if day_start and cursor > rollover_after:
            segment += 1
            cursor = 0
        # record the remapped date
        stamp = datelist[cursor]
        cursor += 1
        remapped.append(stamp)
        segments.append(segment)
        if day_start:
            start_ids[row] = (segment, stamp)
        elif row in end_ids:
            end_ids[row] = (segment, stamp)
    return pd.Series({'D': remapped,
                      'I': segments})

In [None]:
# Unpacking the returned two-entry Series yields its values in order: the list
# of remapped dates, then the list of segment ids.
# NOTE(review): a and b are not referenced again in the visible notebook, and
# the next code cell recomputes the same mapping.
a,b = remap_dates(valid_dates, len(xx.index))

In [None]:
# remap_dates returns a two-entry Series ('D' -> list of dates, 'I' -> list of
# segment ids).  Assign each list to its column by label rather than handing the
# label-indexed Series to a multi-column assignment, which makes pandas pull the
# entries out by integer position (a deprecated Series lookup).
remapped = remap_dates(valid_dates, len(xx.index))
xx['D'] = remapped['D']
xx['I'] = remapped['I']

In [None]:
xx.head(80)

In [None]:
xx[xx['datetime'].dt.year == 2007]

In [None]:
xx[xx['I'] == 1]

## Now create the event lists which hold the pivot points

In [None]:
# Spot check of one recorded day end: once remap_dates has run, the entry is a
# (segment id, remapped date) tuple.  Raises KeyError if row 78 is not a day end.
end_ids[78]

In [None]:
assert len(end_ids) == len(zz.index)

In [None]:
def symbol_from_level(sym, level):
  """Return the name for one segment of a symbol, formatted as '<sym>-<level>'."""
  template = "{}-{}"
  return template.format(sym, level)

In [None]:
# Sanity check: the pivot columns must be laid out contiguously in the order
# PP, R1, S1, R2, S2, R3, S3, because the event-list cell reads them by
# position as range(pp, pp+7).
pivot_order = ('R1', 'S1', 'R2', 'S2', 'R3', 'S3')
pp = zz.columns.get_loc('PP')
prevloc = pp
for pivot in pivot_order:
  newloc = zz.columns.get_loc(pivot)
  expected = prevloc + 1
  assert newloc == expected, "prev-{} newloc-{} pivot-{}".format(prevloc, newloc, pivot)
  prevloc = newloc

In [None]:
xx.columns.get_loc('D')

In [None]:
# Build the event list: for every session end (in row order) emit seven rows,
# one per pivot column, tagged with the segment symbol and the remapped date.
# The k-th session end is paired with the k-th daily bar in zz.
rows = []
remap_loc = xx.columns.get_loc('D')
for k, i in enumerate(sorted(end_ids)):
  q, remapped_date = end_ids[i]
  # the date stored for this session end must match the row's remapped date
  assert xx.iat[i, remap_loc] == remapped_date
  sym = symbol_from_level(symbol, q)
  # for each pivot point add sym,date,type,value in respective lists
  # need one more assert that the zz's datetime matches
  # Type is 1..7 for PP, R1, S1, R2, S2, R3, S3 (column order checked earlier)
  for piv, pivotcol in enumerate(range(pp, pp+7), start=1):
    rows.append((sym, remapped_date, piv, zz.iat[k,pivotcol]))

ef = pd.DataFrame(rows, columns=['Symbol', 'Date', 'Type', 'Value'])

In [None]:
ef.head(9)

In [None]:
ef.info()

In [None]:
assert len(ef.index) == len(zz.index)*7

In [None]:
xx.head(80)

In [None]:
zz.head()

## Post Process
* Drop the original datetime (minute bars)
* convert the 'D' dates to the new index

In [None]:
xx = xx.drop('datetime', axis=1)

In [None]:
# The remapped date column becomes the new 'datetime'.  Reassign the result, as
# the neighbouring cells do, instead of passing an int for the boolean
# `inplace` flag.
xx = xx.rename(columns={'D':'datetime'})

In [None]:
xx = xx.set_index('datetime')

In [None]:
xx.head()

## Finalize to CSVs and download
* create a new symbol for each date-year-range
* drop the I column and write csv

In [None]:
mkdir -p "/content/CSV"

In [None]:
mkdir -p "/content/events"

In [None]:
xx['I'].max()

In [None]:
# Write one CSV per remap segment (I = 0 .. max), named '<symbol>-<segment>'.
# The helper column 'I' is left out by listing the exported columns explicitly.
export_columns = ['open', 'high', 'low', 'close', 'vol', 'realClose']
maxi = xx['I'].max()+1
for i in range(maxi):
  segment_rows = xx[xx['I'] == i]
  csv_name = '/content/CSV/{}.csv'.format(symbol_from_level(symbol, i))
  segment_rows.to_csv(csv_name, mode='w', columns=export_columns)

In [None]:
# Bundle every per-segment CSV into one archive.
# NOTE(review): the archive name is hard-coded to AAPL rather than derived from
# `symbol`; the download cell uses the same literal, so change both together.
!zip -r /content/AAPL.CSV.zip /content/CSV

In [None]:
# Persist the pivot-point event list next to the bar CSVs, without the row index.
event_list_file = '/content/events/{}.csv'.format(symbol)
ef.to_csv(event_list_file, index=False, mode='w')

In [None]:

# Send both artefacts to the browser (Colab only).
# NOTE(review): the archive path is hard-coded to AAPL and must match the name
# used by the zip cell; it does not follow `symbol`.
files.download('/content/AAPL.CSV.zip')
files.download(event_list_file)

In [None]:
# Total wall-clock time since the first cell recorded t1.
t2 = time.perf_counter()
elapsed = t2 - t1
print('time taken to run:', elapsed)