In [1]:
%matplotlib inline
import matplotlib

import numpy as np
import pandas as pd
from datetime import datetime
from fastparquet import write
import time

## Helper Functions

In [2]:
def get_ts_lag(time_series, lag=pd.Timedelta(hours=1)):
    """
    Return the lagged values of a time series, aligned to the current timestamps.

    For each timestamp t the value taken is the one observed at the last
    timestamp strictly earlier than t - lag. Leading timestamps that have no
    such earlier observation are dropped, so the result is indexed by a suffix
    of the original index. The index must be sorted (searchsorted requirement).
    """
    stamps = time_series.index
    # first position whose timestamp is >= t - lag; the entry just before it is the lag point
    pos = stamps.searchsorted(stamps - lag)
    # positions are non-decreasing, so the zeros (no earlier observation) form a leading run
    pos = pos[pos > 0]
    lagged_stamps = stamps[pos - 1]
    current_stamps = stamps[len(stamps) - len(pos):]
    return pd.Series(time_series.loc[lagged_stamps].to_numpy(), index=current_stamps)
    
def get_volatility(prices, span=100, delta=pd.Timedelta(hours=1)):
    """
    Estimate volatility as the exponentially weighted standard deviation of
    lagged price returns r[t] = p[t] / p[t - delta] - 1.

    p[t - delta] is the price at the last timestamp strictly earlier than
    t - delta; timestamps with no such earlier observation are dropped.

    Input:  prices :: pd series of prices indexed by a sorted DatetimeIndex
            span   :: span of the ewm() filter applied to the returns
            delta  :: lag over which each return is computed
    Output: pd series of volatility estimates indexed by t. Entries equal to 0
            are removed; NaN entries (e.g. the first one, where a one-point std
            is undefined) are kept, so callers typically dropna().
    """

    # find p[t-1] integer positions given delta (searchsorted -> first stamp >= t - delta)
    df0 = prices.index.searchsorted(prices.index - delta)
    # zero positions (no earlier observation) form a leading run; drop them
    df0 = df0[df0 > 0]

    # align p[t-1] timestamps to p[t] timestamps (the surviving stamps are a suffix)
    df0 = pd.Series(prices.index[df0 - 1],
                    index=prices.index[prices.shape[0] - df0.shape[0]:])

    # get values for each timestamp then compute returns
    df0 = prices.loc[df0.index] / prices.loc[df0.values].values - 1

    # estimate rolling (exponentially weighted) standard deviation
    df0 = df0.ewm(span=span).std()
    # drop exact-zero volatility (NaN != 0, so NaN entries survive this filter)
    df0 = df0[df0 != 0]

    return df0


def get_verticals(prices, delta=pd.Timedelta(hours=1)):
    """
    Compute the vertical-barrier timestamp for every observation.

    For each timestamp t the barrier is the first timestamp >= t + delta.
    Observations whose barrier would fall past the end of the data are dropped,
    so the result is indexed by a prefix of prices.index.

    Input:  prices :: pd series (or dataframe) of prices indexed by a sorted DatetimeIndex
            delta  :: strategy's holding period
    Output: pd Series of vertical-barrier timestamps

    Implements code snippet 3.4 in "Advances in Financial Machine Learning"
    by Marcos Lopez de Prado.
    """
    n_obs = prices.shape[0]
    barrier_pos = prices.index.searchsorted(prices.index + delta)
    # positions are non-decreasing, so the out-of-range ones form a trailing run
    barrier_pos = barrier_pos[barrier_pos < n_obs]
    barrier_stamps = prices.index[barrier_pos]
    return pd.Series(barrier_stamps, index=prices.index[:len(barrier_stamps)])


def get_horizontals(prices, events, factors=(2, 2)):
    """
    Apply profit taking / stop loss based on a volatility estimate, if it takes
    place before t1 (vertical barrier). Return the timestamp of the earliest
    stop-loss and of the earliest profit taking for each event.

    Input: prices  ::  pd series of prices indexed by timestamp
           events  ::  pd dataframe indexed by event start timestamp, with columns:
                       t1          ::: timestamp of the vertical barrier; if NaN/NaT the
                                       path runs to the last available price
                                       (no vertical barrier)
                       threshold   ::: unit height of the top and bottom barriers
                       bet         ::: side of each bet; 1 is long, -1 is short
           factors ::  pair [pt, sl] of multipliers applied to `threshold` to set the
                       upper / lower barrier heights; a value <= 0 disables that barrier

    Output: copy of events[['t1']] (t1 left as given) with two added columns:
            stop_loss   :: timestamp of the first path return below the lower barrier (NaT if none)
            take_profit :: timestamp of the first path return above the upper barrier (NaT if none)

    The result is then passed to get_labels() to assign labels based on which
    barrier is hit first.

    NOTE: to get "events", first add "threshold" (get_volatility) and "t1"
    (get_verticals) columns to the data, e.g.:

           data = data.assign(threshold=get_volatility(data.price),
                              t1=get_verticals(data.price)).dropna()
           events = data[['t1', 'threshold']]
           events = events.assign(bet=pd.Series(1., events.index))

    Implements code snippet 3.2 in "Advances in Financial Machine Learning"
    by Marcos Lopez de Prado.

    A good read to understand the AFML implementation is by Maks Ivanov on
    https://towardsdatascience.com/financial-machine-learning-part-1-labels-7eeed050f32e.
    """

    out = events[['t1']].copy(deep=True)

    # barrier heights; an all-NaN series disables a barrier because every
    # comparison against NaN is False, which yields NaT below
    if factors[0] > 0:
        thresh_upper = factors[0] * events['threshold']
    else:
        thresh_upper = pd.Series(np.nan, index=events.index)
    if factors[1] > 0:
        thresh_lower = -factors[1] * events['threshold']
    else:
        thresh_lower = pd.Series(np.nan, index=events.index)

    # a missing vertical barrier means the path runs to the end of the prices
    vertical = events['t1']
    if not prices.empty:
        vertical = vertical.fillna(prices.index[-1])

    bets = events['bet']
    stop_loss = []
    take_profit = []
    # Series.items() replaces iteritems(), which was removed in pandas 2.0
    for loc, t1 in vertical.items():
        path = prices.loc[loc:t1]                            # path prices (inclusive)
        returns = (path / prices.loc[loc] - 1) * bets[loc]   # path returns, signed by bet side
        stop_loss.append(returns[returns < thresh_lower[loc]].index.min())     # earliest stop loss
        take_profit.append(returns[returns > thresh_upper[loc]].index.min())   # earliest profit taking

    # build each column once instead of enlarging the frame row by row with .loc
    out['stop_loss'] = stop_loss
    out['take_profit'] = take_profit
    return out

def get_labels(touches):
    """
    Assign a label in {-1, 0, 1} depending on which of the three barriers is
    hit first.

    Input: touches :: dataframe as returned by get_horizontals(), with columns
                      t1          :: timestamp of the vertical barrier
                      stop_loss   :: timestamp of the first lower-barrier touch (NaT if none)
                      take_profit :: timestamp of the first upper-barrier touch (NaT if none)
    Output: copy of `touches` with an added float column `label`:
            -1.0 if the stop loss is hit first (also when both are hit at the same time),
             1.0 if the profit target is hit first,
             0.0 if neither horizontal barrier is hit.

    Based on Maks Ivanov's implementation of MLDP's triple-barrier labeling method.
    """
    out = touches.copy(deep=True)
    # DataFrame.min(axis=1) skips NaT, giving the earliest horizontal touch (NaT if none)
    first_touch = touches[['stop_loss', 'take_profit']].min(axis=1)
    no_touch = first_touch.isna()
    stop_first = ~no_touch & first_touch.eq(touches['stop_loss'])
    # vectorised replacement of the per-row .loc loop; it also guarantees that the
    # `label` column exists (as float) when `touches` is empty
    out['label'] = np.select([no_touch, stop_first], [0.0, -1.0], default=1.0)
    return out


def labeling(df, h=pd.Timedelta(hours=1), v=pd.Timedelta(hours=1), m=(2, 2)):
    """
    Apply the Triple-Barrier labeling method from "Advances in Financial
    Machine Learning" by Marcos Lopez de Prado, with long-only bets.

    Parameters
    ----------
    df:  pd.DataFrame indexed by timestamp with a `price` column
    h:   Timedelta for the holding period (vertical barrier); default 1 hour
    v:   Timedelta lag of the returns used for the volatility estimate; default 1 hour
    m:   pair [pt, sl] of volatility multipliers for the profit-taking and
         stop-loss barriers; default (2, 2)

    Returns
    -------
    pd.DataFrame with df's columns from `price` through the added `threshold`
    (relies on column order), followed by `t1`, `stop_loss`, `take_profit` and
    `label`. Rows with any NaN after adding `threshold` and `t1` are dropped.
    """

    # add thresholds and vertical barrier (t1) columns; dropna removes rows
    # where either could not be computed
    df = df.assign(threshold=get_volatility(df.price, delta=v),
                   t1=get_verticals(df.price, delta=h)).dropna()

    # events are [t1, threshold, bet]
    events = df[['t1', 'threshold']]
    events = events.assign(bet=pd.Series(1., events.index))  # long only

    # get the timestamps for [t1, stop_loss, take_profit]
    touches = get_horizontals(df.price, events, m)
    # assign labels based on which barrier is hit first
    touches = get_labels(touches)

    # add touches timestamps and label
    df = pd.concat([df.loc[:, 'price':'threshold'],
                    touches.loc[:, 't1':'label']], axis=1)

    return df

## Load Data and Aggregate by Timestamp

In [136]:
## --- Read csv files and reformat ---

start = time.perf_counter()
data = pd.read_csv('data/20190722.csv')
data = data[data.symbol == 'XBTUSD']

# one row per timestamp: mean price, total size, first side / tickDirection
data = (data.groupby(['timestamp'], as_index=False)
        .agg({'price': 'mean', 'size': 'sum', 'side': 'first', 'tickDirection': 'first'}))
# Vectorised parse replaces the per-row datetime.strptime map; [:-3] drops the
# last three characters before parsing, exactly as before.
data['timestamp'] = pd.to_datetime(data['timestamp'].str[:-3], format="%Y-%m-%dD%H:%M:%S.%f")
# no inplace=True: re-running the cell is safe and the chain reads top-down
data = data.set_index('timestamp').sort_index()
print(time.perf_counter() - start)

8.053701162338257


timestamp
2019-07-22 00:00:03.660879    10591.500000
2019-07-22 00:00:03.742163    10591.500000
2019-07-22 00:00:03.750367    10591.500000
2019-07-22 00:00:03.767575    10591.500000
2019-07-22 00:00:03.824915    10591.500000
2019-07-22 00:00:04.527047    10591.000000
2019-07-22 00:00:04.654762    10591.888889
2019-07-22 00:00:04.672471    10592.000000
2019-07-22 00:00:04.725460    10592.875000
2019-07-22 00:00:04.815028    10594.000000
Name: price, dtype: float64

In [174]:
# NOTE(review): no earlier cell defines `df` when the notebook runs top to bottom
# (it is assigned in the In [148] cell below), so this output comes from
# out-of-order execution / leftover kernel state.
df

Unnamed: 0_level_0,price,size,side,tickDirection,threshold,t1
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2019-07-22 01:00:04.518521,10594.261905,29458,Buy,ZeroPlusTick,0.000010,2019-07-22 02:00:04.608794
2019-07-22 01:00:04.533937,10594.500000,503,Buy,ZeroPlusTick,0.000034,2019-07-22 02:00:04.608794
2019-07-22 01:00:04.542574,10594.500000,62315,Buy,ZeroPlusTick,0.000042,2019-07-22 02:00:04.608794
2019-07-22 01:00:04.561420,10594.500000,5000,Buy,ZeroPlusTick,0.000045,2019-07-22 02:00:04.608794
2019-07-22 01:00:04.573490,10594.500000,894,Buy,ZeroPlusTick,0.000047,2019-07-22 02:00:04.608794
2019-07-22 01:00:04.621292,10594.500000,30567,Buy,ZeroPlusTick,0.000048,2019-07-22 02:00:04.622871
2019-07-22 01:00:04.645765,10594.000000,1,Sell,MinusTick,0.000045,2019-07-22 02:00:04.681933
2019-07-22 01:00:04.668162,10594.500000,30000,Buy,PlusTick,0.000044,2019-07-22 02:00:04.681933
2019-07-22 01:00:04.678730,10594.500000,1400,Buy,ZeroPlusTick,0.000044,2019-07-22 02:00:04.681933
2019-07-22 01:00:04.734848,10594.500000,50,Buy,ZeroPlusTick,0.000053,2019-07-22 02:00:05.109812


In [148]:
# holding period (vertical barrier) and volatility lag; previously only
# defined in a later cell
h = pd.Timedelta(hours=1)
v = pd.Timedelta(hours=1)

# add thresholds and vertical barrier (t1) columns.
# Built from `data` rather than `df = df.assign(...)` so the cell is idempotent
# and does not depend on a `df` left in the kernel. NOTE(review): assumes the
# intended input is the full `data` frame (consistent with the displayed `df`
# starting one hour after the data) -- confirm.
df = data.assign(threshold=get_volatility(data.price, delta=v),
                 t1=get_verticals(data.price, delta=h)).dropna()

# events are [t1, threshold, bet]
events = df[['t1', 'threshold']]
events = events.assign(bet=pd.Series(1., events.index))  # long only

In [179]:
# NOTE(review): `prices` and `factors` are not defined by any earlier cell on a
# fresh kernel. `data.price` matches the `prices[:20]` output further down and
# [2, 2] is the labeling() default -- both are assumptions to confirm.
prices = data.price
factors = [2, 2]

out = events[['t1']].copy(deep=True)
thresh_upper = factors[0] * events['threshold']
thresh_lower = -factors[1] * events['threshold']

# return the timestamp of earliest stop-loss or profit taking (first 3000 events, timed).
# Series.iteritems() was removed in pandas 2.0; .items() is the replacement.
start = time.perf_counter()
for loc, t1 in events['t1'].iloc[:3000].items():
    path_returns = (prices.loc[loc:t1] / prices.loc[loc] - 1) * events['bet'][loc]
    out.loc[loc, 'stop_loss'] = path_returns[path_returns < thresh_lower[loc]].index.min()     # earliest stop loss
    out.loc[loc, 'take_profit'] = path_returns[path_returns > thresh_upper[loc]].index.min()  # earliest profit taking
print(time.perf_counter() - start)

### TRY ROLLING WINDOW IN PANDAS ###

38.2390079498291


In [None]:
# Same experiment as the previous cell, building the path return by compounding
# pct_change(). NOTE(review): relies on `prices` and `factors` from that cell.
out = events[['t1']].copy(deep=True)
thresh_upper = factors[0] * events['threshold']
thresh_lower = -factors[1] * events['threshold']

start = time.perf_counter()
# .items() replaces Series.iteritems(), which was removed in pandas 2.0
for loc, t1 in events['t1'].iloc[:3000].items():
    # (1 + r).cumprod() - 1 equals p[t]/p[loc] - 1 except for the leading NaN, which
    # never crosses a barrier; multiplying by the bet side keeps shorts consistent
    # with the previous cell
    df0 = prices.loc[loc:t1].pct_change().add(1).cumprod().sub(1) * events['bet'][loc]
    out.loc[loc, 'stop_loss'] = df0[df0 < thresh_lower[loc]].index.min()   # earliest stop loss
    out.loc[loc, 'take_profit'] = df0[df0 > thresh_upper[loc]].index.min() # earliest profit taking
print(time.perf_counter() - start)

In [175]:
# compounded return of the first 20 prices relative to the first one
# (the leading entry is NaN: pct_change has no prior price)
(prices.iloc[:20]
    .pct_change()
    .add(1)
    .cumprod()
    .sub(1))

timestamp
2019-07-22 00:00:03.660879         NaN
2019-07-22 00:00:03.742163    0.000000
2019-07-22 00:00:03.750367    0.000000
2019-07-22 00:00:03.767575    0.000000
2019-07-22 00:00:03.824915    0.000000
2019-07-22 00:00:04.527047   -0.000047
2019-07-22 00:00:04.654762    0.000037
2019-07-22 00:00:04.672471    0.000047
2019-07-22 00:00:04.725460    0.000130
2019-07-22 00:00:04.815028    0.000236
2019-07-22 00:00:05.145412    0.000236
2019-07-22 00:00:08.290639    0.000236
2019-07-22 00:00:08.790551    0.000551
2019-07-22 00:00:09.106780    0.000803
2019-07-22 00:00:09.117630    0.000803
2019-07-22 00:00:09.281116    0.000803
2019-07-22 00:00:09.307383    0.000803
2019-07-22 00:00:09.351334    0.000803
2019-07-22 00:00:09.472615    0.000803
2019-07-22 00:00:09.485726    0.000803
Name: price, dtype: float64

In [176]:
prices.head(20)

timestamp
2019-07-22 00:00:03.660879    10591.500000
2019-07-22 00:00:03.742163    10591.500000
2019-07-22 00:00:03.750367    10591.500000
2019-07-22 00:00:03.767575    10591.500000
2019-07-22 00:00:03.824915    10591.500000
2019-07-22 00:00:04.527047    10591.000000
2019-07-22 00:00:04.654762    10591.888889
2019-07-22 00:00:04.672471    10592.000000
2019-07-22 00:00:04.725460    10592.875000
2019-07-22 00:00:04.815028    10594.000000
2019-07-22 00:00:05.145412    10594.000000
2019-07-22 00:00:08.290639    10594.000000
2019-07-22 00:00:08.790551    10597.333333
2019-07-22 00:00:09.106780    10600.000000
2019-07-22 00:00:09.117630    10600.000000
2019-07-22 00:00:09.281116    10600.000000
2019-07-22 00:00:09.307383    10600.000000
2019-07-22 00:00:09.351334    10600.000000
2019-07-22 00:00:09.472615    10600.000000
2019-07-22 00:00:09.485726    10600.000000
Name: price, dtype: float64

In [67]:
# Spark imports first; the SQL context is needed to convert RDDs into DataFrames
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# local Spark context ("local[*]" master) plus a SQLContext wrapping it
sc = SparkContext("local[*]", "temp")
sqlContext = SQLContext(sc)

In [71]:
# Despite the name this is a Spark DataFrame; reset_index() turns the
# timestamp index back into a regular column.
data_rdd = sqlContext.createDataFrame(data.reset_index())

In [72]:
data_rdd.show(n=20)

+--------------------+------------------+------+----+-------------+
|           timestamp|             price|  size|side|tickDirection|
+--------------------+------------------+------+----+-------------+
|2019-07-22 00:00:...|           10591.5|  3000| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10591.5|     2| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10591.5|   437| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10591.5|     2| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10591.5|   763| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10591.0|    25|Sell|    MinusTick|
|2019-07-22 00:00:...|10591.888888888889| 15000| Buy|     PlusTick|
|2019-07-22 00:00:...|           10592.0|  3000| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|         10592.875| 75000| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10594.0| 12000| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10594.0|  5000| Buy| ZeroPlusTick|
|2019-07-22 00:00:...|           10594.0|  1000|

In [74]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [None]:
# Forward-looking range window ordered by epoch seconds, covering each row's
# holding period. The original `Window().rangeBetween()` raised TypeError because
# `start` and `end` are required. NOTE(review): the (0, 3600) bounds assume a
# 1-hour holding period (h) measured from the current row -- confirm intent.
w = Window.orderBy(F.col('timestamp').cast('long')).rangeBetween(0, 3600)

## Testing labeling() function speed

In [None]:
h = pd.Timedelta(hours=1)
v = pd.Timedelta(hours=1)

# Time labeling() on growing prefixes of the data (20k, 40k, 60k rows).
# perf_counter is monotonic and high-resolution, unlike time.time().
for n in range(20000, 80000, 20000):
    start = time.perf_counter()
    x = labeling(data.iloc[:n], h=h, v=v)
    print(n, time.perf_counter() - start)

In [None]:
# start = time.time()
# data = pd.read_csv('data/20190722.csv')
# data = data[data.symbol == 'XBTUSD']
# data['timestamp'] = data.timestamp.map(lambda t: datetime.strptime(t[:-3], "%Y-%m-%dD%H:%M:%S.%f")) 
# # data = (data.groupby(pd.Grouper(key='timestamp', freq='1S'))
# #         .agg({'price': 'mean', 'size': 'sum', 'side': 'first', 'tickDirection': 'first'}))
# data.sort_index(inplace=True)
# print(time.time() - start)