**Copyright (c) 2021 Risklab Middle East - All Rights Reserved**

---


**Author: Mehrdad Moghimi**



# Imports libraries

In [1]:
%%capture
!pip install plotly -U

In [2]:
import pandas as pd
import numpy as np 
import datetime
import time
import sys
from scipy import stats
from statsmodels.stats import stattools
from statsmodels.tsa.stattools import adfuller
import statsmodels.api as sm

import plotly.express as px
import plotly.figure_factory as ff
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import multiprocessing as mp

pd.options.plotting.backend = "plotly"
np.seterr(divide='ignore', invalid='ignore')

  import pandas.util.testing as tm


{'divide': 'warn', 'invalid': 'warn', 'over': 'warn', 'under': 'ignore'}

# Import Tick data

In [3]:
dir = "https://raw.githubusercontent.com/risk-labratory/data/main/"
url = dir + "IVE_2020.csv"
df = pd.read_csv(url, header=0)
df['dates'] = pd.to_datetime(df['dates'])
df.set_index('dates', inplace=True, drop=True)
df.drop_duplicates(inplace=True)
df = df[(df.index.hour>=9) & (df.index.hour<16)]
df.head()

Unnamed: 0_level_0,price,bid,ask,size
dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-01-02 09:30:00,130.68,130.59,130.6,20625
2020-01-02 09:30:01,130.5,130.5,130.77,200
2020-01-02 09:30:04,130.53,130.52,130.78,100
2020-01-02 09:30:04,130.55,130.52,130.78,100
2020-01-02 09:30:04,130.53,130.52,130.78,200


# Functions

In [4]:
# SNIPPET 20.5 THE linParts FUNCTION
def linParts(numAtoms,numThreads):
  # partition of atoms with a single loop
  parts=np.linspace(0,numAtoms,min(numThreads,numAtoms)+1)
  parts=np.ceil(parts).astype(int)
  return parts

# SNIPPET 20.6 THE nestedParts FUNCTION
def nestedParts(numAtoms,numThreads,upperTriang=False):
  # partition of atoms with an inner loop
  parts,numThreads_=[0],min(numThreads,numAtoms)
  for num in range(numThreads_):
    part=1 + 4*(parts[-1]**2+parts[-1]+numAtoms*(numAtoms+1.)/numThreads_)
    part=(-1+part**.5)/2.
    parts.append(part)
  parts=np.round(parts).astype(int)
  if upperTriang: # the first rows are the heaviest
    parts=np.cumsum(np.diff(parts)[::-1])
    parts=np.append(np.array([0]),parts)
  return parts

# SNIPPET 20.7 THE mpPandasObj, USED AT VARIOUS POINTS IN THE BOOK
def mpPandasObj(func,pdObj,numThreads=24,mpBatches=1,linMols=True,**kargs):
  """
  Parallelize jobs, return a DataFrame or Series
  + func: function to be parallelized. Returns a DataFrame
  + pdObj[0]: Name of argument used to pass the molecule
  + pdObj[1]: List of atoms that will be grouped into molecules
  + kargs: any other argument needed by func
  Example: df1=mpPandasObj(func,(’molecule’,df0.index),24,**kargs)
  """
  argList = list(kargs.values()) #?
  if linMols:
    parts=linParts(len(argList[1]),numThreads*mpBatches)
  else:
    parts=nestedParts(len(argList[1]),numThreads*mpBatches)
  jobs=[] 
  for i in range(1,len(parts)):
    job={pdObj[0]:pdObj[1][parts[i-1]:parts[i]],'func':func}
    job.update(kargs)
    jobs.append(job)
  if numThreads==1:
    out=processJobs_(jobs)
  else:
    out=processJobs(jobs,numThreads=numThreads)
  if isinstance(out[0],pd.DataFrame):
    df0=pd.DataFrame()
  elif isinstance(out[0],pd.Series):
    df0=pd.Series()
  else:
    return out
  for i in out:
    df0=df0.append(i)
  df0=df0.sort_index()
  return df0

# SNIPPET 20.8 SINGLE-THREAD EXECUTION, FOR DEBUGGING
def processJobs_(jobs):
  # Run jobs sequentially, for debugging
  out=[]
  for job in jobs:
    out_=expandCall(job)
    out.append(out_)
  return out

# SNIPPET 20.9 EXAMPLE OF ASYNCHRONOUS CALL TO PYTHON’S MULTIPROCESSING LIBRARY
def reportProgress(jobNum,numJobs,time0,task):
  # Report progress as asynch jobs are completed
  msg=[float(jobNum)/numJobs,(time.time()-time0)/60.]
  msg.append(msg[1]*(1/msg[0]-1))
  timeStamp=str(datetime.datetime.fromtimestamp(time.time()))
  msg= timeStamp+' '+str(round(msg[0]*100,2))+'% '+task+' done after '+ str(round(msg[1],2))+' minutes. Remaining '+str(round(msg[2],2))+' minutes.'
  if jobNum<numJobs:
    sys.stderr.write(msg+'\r')
  else:
    sys.stderr.write(msg+'\n')
  return

def processJobs(jobs,task=None,numThreads=24):
  # Run in parallel.
  # jobs must contain a ’func’ callback, for expandCall
  if task is None:task=jobs[0]['func'].__name__
  pool=mp.Pool(processes=numThreads)
  outputs,out,time0=pool.imap_unordered(expandCall,jobs),[],time.time()
  # Process asynchronous output, report progress
  for i,out_ in enumerate(outputs,1):
    out.append(out_)
    reportProgress(i,len(jobs),time0,task)
  pool.close()
  pool.join() # this is needed to prevent memory leaks
  return out

# SNIPPET 20.10 PASSING THE JOB (MOLECULE) TO THE CALLBACK FUNCTION
def expandCall(kargs):
  # Expand the arguments of a callback function, kargs[’func’]
  func=kargs['func']
  del kargs['func']
  out=func(**kargs)
  return out

# SNIPPET 20.12 ENHANCING processJobs TO PERFORM ON-THE-FLY OUTPUT REDUCTION
def processJobsRedux(jobs,task=None,numThreads=24,redux=None,reduxArgs={}, reduxInPlace=False):
  '''
  Run in parallel
  jobs must contain a ’func’ callback, for expandCall
  redux prevents wasting memory by reducing output on the fly
  '''
  if task is None:
    task=jobs[0]['func'].__name__
  pool = mp.Pool(processes=numThreads)
  imap, out, time0 = pool.imap_unordered(expandCall,jobs),None,time.time()
  # Process asynchronous output, report progress
  for i,out_ in enumerate(imap,1):
    if out is None:
      if redux is None:
        out,redux,reduxInPlace=[out_],list.append,True
      else:
        out=copy.deepcopy(out_)
    else:
      if reduxInPlace:
        redux(out,out_,**reduxArgs)
      else:
        out=redux(out,out_,**reduxArgs)
    reportProgress(i,len(jobs),time0,task)
  pool.close();pool.join() # this is needed to prevent memory leaks
  if isinstance(out,(pd.Series,pd.DataFrame)):out=out.sort_index()
  return out

# SNIPPET 20.13 ENHANCING mpPandasObj TO PERFORM ON-THE-FLY OUTPUT REDUCTION
def mpJobList(func, argList, numThreads=24, mpBatches=1, linMols=True, redux=None, reduxArgs={}, reduxInPlace=False, **kargs):
  if linMols:
    parts=linParts(len(argList[1]),numThreads*mpBatches)
  else:
    parts=nestedParts(len(argList[1]),numThreads*mpBatches)
  jobs=[]
  for i in xrange(1,len(parts)):
    job={argList[0]:argList[1][parts[i-1]:parts[i]],'func':func}
    job.update(kargs)
    jobs.append(job)
  out=processJobsRedux(jobs,redux=redux,reduxArgs=reduxArgs, reduxInPlace=reduxInPlace,numThreads=numThreads)
  return out

In [5]:
def progressBar(value, end_value, start_time, bar_length=20):
    percent = float(value) / end_value
    arrow = '-' * int(round(percent * bar_length)-1) + '>'
    spaces = ' ' * (bar_length - len(arrow))
    remaining = int(((time.time()-start_time)/value)*(end_value-value)/60)
    sys.stdout.write("\rCompleted: [{0}] {1}% - {2} minutes remaining.".format(arrow + spaces, int(round(percent * 100)), remaining))
    sys.stdout.flush()

In [6]:
def get_ohlcv(df_group):
  ohlc = df_group['price'].ohlc()
  ohlc['volume'] = df_group['size'].sum()
  ohlc['vwap'] = df_group.apply(lambda x: (x['price']*x['size']).sum()/x['size'].sum())
  ohlc['twap'] = df_group['price'].mean()
  ohlc['tick_count'] = df_group['price'].count()
  ohlc['twap_logr'] = np.log(ohlc['twap']) - np.log(ohlc['twap'].shift(1))
  return ohlc

def get_time_bar(df, freq="5Min"):
  df_group = df.groupby(pd.Grouper(freq=freq))
  ohlcv = get_ohlcv(df_group)
  return ohlcv

def get_tick_bar(df, tick_per_bar=10, num_of_bars=None):
  if not tick_per_bar:
    tick_per_bar = int(df.shape[0] / num_of_bars)
  tick_group = df.reset_index().assign(grpId=lambda x: x.index // tick_per_bar)
  dates = tick_group.groupby('grpId', as_index=False).first()['dates']
  df_group =  tick_group.groupby('grpId')
  ohlcv = get_ohlcv(df_group)
  ohlcv.set_index(dates, drop=True, inplace=True)
  return ohlcv

def get_volume_bar(df, volume_per_bar=10000, num_of_bars=None):
  df['cum_size'] = df['size'].cumsum() 
  if not volume_per_bar:
    total_vol = df['cum_size'].values[-1]
    volume_per_bar = total_vol / num_of_bars
    volume_per_bar = round(volume_per_bar, -2) # round to the nearest hundred
  tick_group = df.reset_index().assign(grpId=lambda x: x.cum_size // volume_per_bar)
  dates = tick_group.groupby('grpId', as_index=False).first()['dates']
  df_group =  tick_group.groupby('grpId')
  ohlcv = get_ohlcv(df_group)
  ohlcv.set_index(dates, drop=True, inplace=True)
  return ohlcv

def get_dollar_bar(df, dollar_per_bar=100000, num_of_bars=None):
  df['dollar'] = df['price']*df['size']
  df['cum_dv'] = df['dollar'].cumsum() 
  if not dollar_per_bar:
    total_dvol = df['cum_dv'].values[-1]
    dollar_per_bar = total_dvol / num_of_bars
    dollar_per_bar = round(dollar_per_bar, -2) # round to the nearest hundred
  tick_group = df.reset_index().assign(grpId=lambda x: x.cum_dv // dollar_per_bar)
  dates = tick_group.groupby('grpId', as_index=False).first()['dates']
  df_group =  tick_group.groupby('grpId')
  ohlcv = get_ohlcv(df_group)
  ohlcv.set_index(dates, drop=True, inplace=True)
  return ohlcv

In [7]:
def plot_ohlcv(ohlcv):
  dt_all = pd.date_range(start=ohlcv.index[0],end=ohlcv.index[-1])
  dt_obs = [d.strftime("%Y-%m-%d") for d in ohlcv.index]
  dt_breaks = [d for d in dt_all.strftime("%Y-%m-%d").tolist() if not d in dt_obs]
  fig = make_subplots(rows=3, cols=1,
                      shared_xaxes=True,
                      vertical_spacing=0.05, specs=[[{"rowspan": 2}], 
                                                  [{}], 
                                                  [{}]])
  fig.add_trace(go.Candlestick(x=ohlcv.index, 
                              open=ohlcv.open, 
                              high=ohlcv.high,
                              low=ohlcv.low, 
                              close=ohlcv.close, name='Candlestick'), row=1, col=1)
  fig.add_trace(go.Bar(x=ohlcv.index, y=ohlcv.volume, marker_color='rgba(255, 100, 100, 0.7)', name='volume'), row=3, col=1)
  fig.update_yaxes(title_text="Price", row=1, col=1)
  fig.update_yaxes(title_text="Volume", row=3, col=1)
  fig.update_xaxes(
          rangeslider_visible=False,
          rangebreaks=[
              dict(bounds=["sat", "mon"]),  # hide weekends, eg. hide sat to before mon
              dict(bounds=[16, 9.5], pattern="hour"),  # hide hours outside of 9.30am-4pm
              dict(values=dt_breaks)  # hide empty dates
          ]
      )
  fig.update_layout(xaxis_rangeslider_visible=False)
  fig.show()

# Code Snippets

In [8]:
ohlcv = get_time_bar(df, freq="1B")
ohlcv.dropna(inplace=True)
close = ohlcv.close
ohlcv.head()

Unnamed: 0_level_0,open,high,low,close,volume,vwap,twap,tick_count,twap_logr
dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2020-01-03,129.33,129.9874,129.2912,129.64,526340,129.751774,129.719157,922,-0.003845
2020-01-06,129.0,129.8952,128.93,129.8952,655431,129.548003,129.493223,770,-0.001743
2020-01-07,129.52,129.58,129.1405,129.38,413423,129.376731,129.357347,908,-0.00105
2020-01-08,129.38,130.2999,129.24,129.76,449383,129.881903,129.858126,1028,0.003864
2020-01-09,130.3,130.38,129.92,130.3168,376142,130.161216,130.161563,614,0.002334


SNIPPET 5.1 WEIGHTING FUNCTION

In [9]:
def getWeights(d, size):
    # thres>0 drops insignificant weights
    w = [1.]
    for k in range(1, size):
        w_ = -w[-1] / k * (d - k + 1)
        # w = np.append(w, w_) # duno why w_ or w or something turns to np.ndarray suddenly, should be a list somewhat, may give a bug if d is not a np. float
        w.append(w_)
    w = np.array(w[ : : -1]).reshape(-1, 1)
    return w


def plotWeights(dRange, nPlots, size):
    w = pd.DataFrame()
    for d in np.linspace(dRange[0], dRange[1], nPlots):
        d = np.round(d,2)
        w_ = getWeights(d, size = size)
        w_ = pd.DataFrame(w_, index = range(w_.shape[0])[ : : -1], columns = [d])
        w = w.join(w_, how = 'outer')
    fig = w.plot()
    fig.show()
    return

In [10]:
plotWeights(dRange = [0, 1], nPlots = 11, size = 6)

In [11]:
plotWeights(dRange = [1, 2], nPlots = 11, size = 6)

SNIPPET 5.2 STANDARD FRACDIFF (EXPANDING WINDOW)

In [12]:
def fracDiff(series, d, thres = .01):
    """
    Increasing width window, with treatment of NaNs (Standard Fracdiff, expanding window)
    Note 1: For thres=1, nothing is skipped.
    Note 2: d can be any positive fractional, not necessarily bounded [0,1].
    """
    #1) Compute weights for the longest series
    w = getWeights(d, series.shape[0]) # each obs has a weight
    #2) Determine initial calcs to be skipped based on weight-loss threshold
    w_ = np.cumsum(abs(w)) # cumulative weights
    w_ /= w_[-1] # determine the relative weight-loss
    skip = w_[w_ > thres].shape[0]  # the no. of results where the weight-loss is beyond the acceptable value
    #3) Apply weights to values
    df = {} # empty dictionary
    for name in series.columns:
        # fill the na prices
        seriesF = series[[name]].fillna(method = 'ffill').dropna()
        df_ = pd.Series(dtype="float64") # create a pd series
        for iloc in range(skip, seriesF.shape[0]):
            loc = seriesF.index[iloc] # find the iloc th obs 
            
            test_val = series.loc[loc,name] # must resample if duplicate index
            if isinstance(test_val, (pd.Series, pd.DataFrame)):
                test_val = test_val.resample('1m').mean()
            
            if not np.isfinite(test_val).any():
                 continue # exclude NAs
            try: # the (iloc)^th obs will use all the weights from the start to the (iloc)^th
                df_.loc[loc]=np.dot(w[-(iloc+1):,:].T, seriesF.loc[:loc])[0,0]
            except:
                continue
        df[name] = df_.copy(deep = True)
    df = pd.concat(df, axis = 1)
    return df

In [13]:
close_fd = fracDiff(pd.DataFrame(close), d=0.4, thres = 1).iloc[:,0]
close_fd_c = fracDiff(pd.DataFrame(close), d=0.4, thres = 0.01).iloc[:,0]
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=close.index, y=close, name="close"), secondary_y=False)
fig.add_trace(go.Scatter(x=close_fd.index, y=close_fd, name="Diff"), secondary_y=True)
fig.add_trace(go.Scatter(x=close_fd_c.index, y=close_fd_c, name="Diff Controlled"), secondary_y=True)
fig.update_yaxes(title_text="Price", secondary_y=False)
fig.update_yaxes(title_text="Diff Ret", secondary_y=True)
fig.show()

SNIPPET 5.3 THE NEW FIXED-WIDTH WINDOWFRACDIFF METHOD

In [14]:
def getWeights_FFD(d, thres):
    # thres>0 drops insignificant weights
    w = [1.]
    k = 1
    while abs(w[-1]) >= thres:  
        w_ = -w[-1] / k * (d - k + 1)
        w.append(w_)
        k += 1
    w = np.array(w[ : : -1]).reshape(-1, 1)[1 : ]  
    return w

def fracDiff_FFD(series, d, thres = 1e-5):
    """
    Constant width window (new solution)
    Note 1: thres determines the cut-off weight for the window
    Note 2: d can be any positive fractional, not necessarily bounded [0,1].
    """
    #1) Compute weights for the longest series
    w = getWeights_FFD(d, thres)
    # w = getWeights(d, series.shape[0])
    #w=getWeights_FFD(d,thres)
    width = len(w) - 1
    #2) Apply weights to values
    df = {} # empty dict
    for name in series.columns:
        seriesF = series[[name]].fillna(method = 'ffill').dropna()
        df_ = pd.Series(dtype="float64") # empty pd.series
        for iloc1 in range(width, seriesF.shape[0]):
            loc0 = seriesF.index[iloc1 - width]
            loc1 = seriesF.index[iloc1]
            if not np.isfinite(series.loc[loc1,name]):
                continue # exclude NAs
            #try: # the (iloc)^th obs will use all the weights from the start to the (iloc)^th
            df_[loc1] = np.dot(w.T, seriesF.loc[loc0 : loc1])[0, 0]
            # except:
            #     continue
      
        df[name] = df_.copy(deep = True)
    df = pd.concat(df, axis = 1)
    return df

In [15]:
close_ffd = fracDiff_FFD(pd.DataFrame(close), d=0.3, thres = 1e-3).iloc[:,0]
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=close.index, y=close, name="close"), secondary_y=False)
fig.add_trace(go.Scatter(x=close_ffd.index, y=close_ffd, name="FFD"), secondary_y=True)
fig.update_yaxes(title_text="Price", secondary_y=False)
fig.update_yaxes(title_text="Diff Ret", secondary_y=True)
fig.show()

SNIPPET 5.4 FINDING THE MINIMUM D VALUE THAT PASSES THE ADF TEST

In [16]:
def plotMinFFD(df0):
    #path = './'
    #instName ='ES1_Index_Method12'
    out = pd.DataFrame(columns= ['adfStat', 'pVal', 'lags', 'nObs', '95% conf', 'corr'])
    #df0 = pd.read_csv(path + instName +'.csv',index_col = 0, parse_dates = True)
    for d in np.linspace(0, 1, 11):
        df1 = np.log(df0[['close']]).resample('1D').last() # downcast to daily obs
        df2 = fracDiff_FFD(df1, d, thres = .01)
        corr = np.corrcoef(df1.loc[df2.index, 'close'], df2['close'])[0, 1]
        df2 = adfuller(df2['close'], maxlag = 1, regression = 'c', autolag = None)
        out.loc[d] = list(df2[ : 4]) + [df2[4]['5%']] + [corr] # with critical value
    #out.to_csv(path + instName + '_testMinFFD.csv')
    #out[['adfStat', 'corr']].plot(secondary_y = 'adfStat')
    #plt.axhline(out['95% conf'].mean(), linewidth = 1, color = 'r', linestyle = 'dotted')
    #plt.savefig(path + instName + '_testMinFFD.png')
    return out

In [17]:
out = plotMinFFD(ohlcv)
out.head()

Unnamed: 0,adfStat,pVal,lags,nObs,95% conf,corr
0.0,-1.569758,0.498765,1.0,242.0,-2.873559,1.0
0.1,-1.859458,0.351367,1.0,237.0,-2.873814,0.996445
0.2,-2.36427,0.152076,1.0,236.0,-2.873866,0.974398
0.3,-3.0468,0.030749,1.0,235.0,-2.873919,0.92069
0.4,-3.846095,0.002469,1.0,236.0,-2.873866,0.840757


In [18]:
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=out.index, y=out['corr'], name="corr"), secondary_y=False)
fig.add_trace(go.Scatter(x=out.index, y=out['adfStat'], name="adfStat"), secondary_y=True)
fig.add_hline(y=out['95% conf'].mean(), line_width=3, line_dash="dash", line_color="green", secondary_y=True)
fig.update_yaxes(title_text="corr", secondary_y=False)
fig.update_yaxes(title_text="adfStat", secondary_y=True)
fig.show()

In [19]:
def get_optimal_ffd(data, start = 0, end = 1, interval = 10, t=1e-5):
  d = np.linspace(start,end,interval)
  out = mpJobList(mp_get_optimal_ffd, ('molecules', d), redux = pd.DataFrame.append, data = data)
  return out

def mp_get_optimal_ffd(data, molecules, t = 1e-5):
  cols = ['adfStat','pVal','lags','nObs','95% conf']
  out = pd.DataFrame(columns=cols)
  for d in molecules:
    try:
      dfx = fracDiff_FFD(data.to_frame(),d,thres=t)
      dfx = sm.tsa.stattools.adfuller(dfx['close'], maxlag=1,regression='c',autolag=None)
      out.loc[d]=list(dfx[:4])+[dfx[4]['5%']]
    except Exception as e:
      print(f'{d} error: {e}')
  return out

def optimal_ffd(data, start = 0, end = 1, interval = 10, t=1e-5):
  for d in np.linspace(start, end, interval):    
    d = np.round(d, 2)
    dfx = fracDiff_FFD(data.to_frame(), d, thres = t)
    if sm.tsa.stattools.adfuller(dfx['close'], maxlag=1,regression='c',autolag=None)[1] < 0.05:
        return d
  print('no optimal d')
  return d

In [20]:
d = optimal_ffd(close, start = 0, end = 1, interval = 11, t=1e-2)
print("The optimal d is {}".format(d))

The optimal d is 0.3
