This file contains all the functions implemented throughout the book.

In [46]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from typing import Tuple, List, Union

# Chapter 2. Financial Data Structures

In [47]:
def pcaWeights(cov: np.ndarray, riskDist: np.ndarray = None,
               riskTarget: float = 1.) -> np.ndarray:
    '''
    Computing weights from a risk distribution over the principal components of cov

        Parameters:
            cov (np.ndarray): square symmetric matrix (it is decomposed with np.linalg.eigh)
            riskDist (np.ndarray) (optional): risk share assigned to each principal component,
                                              ordered by decreasing eigenvalue (if None, all risk
                                              is assigned to the component with the smallest eigenvalue)
            riskTarget (float): factor that scales the loadings

        Returns:
            weights (np.ndarray): column vector of shape (n, 1)
    '''
    eVal, eVec = np.linalg.eigh(cov)
    indices = eVal.argsort()[::-1]
    eVal, eVec = eVal[indices], eVec[:, indices]    # sorting by decreasing eVal (i.e. decreasing variance)
    if riskDist is None:
        riskDist = np.zeros(cov.shape[0])
        riskDist[-1] = 1.                           # last position = smallest eigenvalue after sorting
    loads = riskTarget * (riskDist / eVal) ** 0.5
    weights = np.dot(eVec, np.reshape(loads, (-1, 1)))
    return weights

In [48]:
# symmetrical CUSUM filter
def getTEvents(gRaw: pd.Series, h: float) -> np.ndarray:
    '''
    Sampling events with a symmetrical CUSUM filter

        Parameters:
            gRaw (pd.Series): raw series to filter (duplicated index entries are dropped,
                              keeping the first occurrence)
            h (float): threshold that the running positive / negative sums must exceed

        Returns:
            pd.DatetimeIndex with the index values at which an event was triggered
    '''
    deduped = gRaw[~gRaw.index.duplicated(keep='first')]
    changes = deduped.diff().iloc[1:]    # the first difference is undefined, so it is skipped
    run_up, run_down = 0, 0
    stamps = []
    for stamp, delta in changes.items():
        run_up = max(0, run_up + delta)
        run_down = min(0, run_down + delta)
        # the downside is checked first; only the sum that fired is reset
        if run_down < -h:
            run_down = 0
            stamps.append(stamp)
        elif run_up > h:
            run_up = 0
            stamps.append(stamp)
    return pd.DatetimeIndex(stamps)

In [49]:
# based on https://towardsdatascience.com/advanced-candlesticks-for-machine-learning-i-tick-bars-a8b93728b4c5
def get_tick_bars(prices: np.ndarray, vols: np.ndarray,
                  times: np.ndarray, freq: int) -> np.ndarray:
    '''
    Aggregating ticks into bars of a fixed number of ticks

        Parameters:
            prices (np.ndarray): tick prices
            vols (np.ndarray): tick volumes
            times (np.ndarray): tick timestamps (each one is passed to pd.Timestamp)
            freq (int): number of ticks per bar

        Returns:
            bars (np.ndarray): object array with one row per complete bar and columns
                               [time of the last tick, open, high, low, close, volume];
                               trailing ticks that do not fill a whole bar are discarded
    '''
    # "+ 1" keeps the final bar when len(prices) is an exact multiple of freq
    bar_ends = range(freq, len(prices) + 1, freq)
    bars = np.zeros(shape=(len(bar_ends), 6), dtype=object)
    for ind, end in enumerate(bar_ends):
        start = end - freq
        bars[ind][0] = pd.Timestamp(times[end - 1])    # time
        bars[ind][1] = prices[start]                   # open
        bars[ind][2] = np.max(prices[start: end])      # high
        bars[ind][3] = np.min(prices[start: end])      # low
        bars[ind][4] = prices[end - 1]                 # close
        bars[ind][5] = np.sum(vols[start: end])        # volume
    return bars

In [50]:
def get_volume_bars(prices: np.ndarray, vols: np.ndarray,
                    times: np.ndarray, bar_vol: int) -> np.ndarray:
    '''
    Aggregating ticks into bars that close once the accumulated volume reaches bar_vol

        Parameters:
            prices (np.ndarray): tick prices
            vols (np.ndarray): tick volumes
            times (np.ndarray): tick timestamps (each one is passed to pd.Timestamp)
            bar_vol (int): volume threshold that closes a bar

        Returns:
            bars (np.ndarray): object array with one row per closed bar and columns
                               [time of the closing tick, open, high, low, close, volume];
                               ticks after the last closed bar are discarded
    '''
    bars = np.zeros(shape=(len(prices), 6), dtype=object)    # upper bound: one bar per tick
    ind = 0
    last_tick = 0
    cur_volume = 0
    for i in range(len(prices)):
        cur_volume += vols[i]
        if cur_volume >= bar_vol:
            # the bar closes on tick i, so it carries that tick's timestamp
            bars[ind][0] = pd.Timestamp(times[i])                # time
            bars[ind][1] = prices[last_tick]                     # open
            bars[ind][2] = np.max(prices[last_tick: i + 1])      # high
            bars[ind][3] = np.min(prices[last_tick: i + 1])      # low
            bars[ind][4] = prices[i]                             # close
            bars[ind][5] = np.sum(vols[last_tick: i + 1])        # volume
            cur_volume = 0
            last_tick = i + 1
            ind += 1
    return bars[:ind]

In [51]:
def get_dollar_bars(prices: np.ndarray, vols: np.ndarray,
                    times: np.ndarray, bar_sum: int) -> np.ndarray:
    '''
    Aggregating ticks into bars that close once the accumulated traded value
    (price * volume) reaches bar_sum

        Parameters:
            prices (np.ndarray): tick prices
            vols (np.ndarray): tick volumes
            times (np.ndarray): tick timestamps (each one is passed to pd.Timestamp)
            bar_sum (int): traded-value threshold that closes a bar

        Returns:
            bars (np.ndarray): object array with one row per closed bar and columns
                               [time of the closing tick, open, high, low, close, volume];
                               ticks after the last closed bar are discarded
    '''
    bars = np.zeros(shape=(len(prices), 6), dtype=object)    # upper bound: one bar per tick
    ind = 0
    last_tick = 0
    cur_sum = 0
    for i in range(len(prices)):
        cur_sum += vols[i] * prices[i]
        if cur_sum >= bar_sum:
            # the bar closes on tick i, so it carries that tick's timestamp
            bars[ind][0] = pd.Timestamp(times[i])                # time
            bars[ind][1] = prices[last_tick]                     # open
            bars[ind][2] = np.max(prices[last_tick: i + 1])      # high
            bars[ind][3] = np.min(prices[last_tick: i + 1])      # low
            bars[ind][4] = prices[i]                             # close
            bars[ind][5] = np.sum(vols[last_tick: i + 1])        # volume
            cur_sum = 0
            last_tick = i + 1
            ind += 1
    return bars[:ind]

In [52]:
def get_bollinger_bands(dollar_bars: np.ndarray, alpha: float) -> np.ndarray:
    '''
    Computing Bollinger bands on the close prices of the bars

        Parameters:
            dollar_bars (np.ndarray): 2-D array of bars whose column 4 holds close prices
            alpha (float): number of rolling standard deviations between the average and each band

        Returns:
            np.ndarray with three rows: 20-bar moving average, upper band, lower band
            (entries are NaN until 20 bars are available)
    '''
    window = pd.Series(dollar_bars[:, 4]).rolling(20, min_periods=20)
    center = window.mean()
    spread = window.std()
    upper = center + alpha * spread
    lower = center - alpha * spread
    return np.array([center, upper, lower])

In [53]:
def get_returns(bars: np.ndarray) -> np.ndarray:
    '''
    Computing bar-to-bar relative changes of the close price

        Parameters:
            bars (np.ndarray): 2-D array of bars with times in column 0 and close prices in column 4

        Returns:
            pd.Series of floats indexed by bar time, without the first bar; each value is the
            change in close divided by the close of the same bar (not of the previous bar)
    '''
    closes = pd.Series(data=bars[:, 4], index=bars[:, 0])
    relative_change = closes.diff() / closes
    trimmed = relative_change[1:]    # the first difference is undefined
    return trimmed.astype(float)

# Chapter 3. Labelling

In [54]:
def get_daily_vol(close: pd.Series, span0: int = 20) -> pd.Series:
    '''
    Estimating volatility as the exponentially weighted standard deviation of returns
    measured against a bar located about one day earlier

        Parameters:
            close (pd.Series): close prices with a sorted datetime index
            span0 (int): span of the exponentially weighted window

        Returns:
            pd.Series with the volatility estimate for every bar that has a reference bar
    '''
    stamps = close.index
    # insertion position of "timestamp minus one day" for every bar
    prior_pos = stamps.searchsorted(stamps - pd.Timedelta(days=1))
    prior_pos = prior_pos[prior_pos > 0]    # bars without an earlier reference are dropped
    n_valid = prior_pos.shape[0]
    anchors = pd.Series(stamps[prior_pos - 1], index=stamps[close.shape[0] - n_valid:])
    rets = close.loc[anchors.index] / close.loc[anchors.values].values - 1    # daily returns
    return rets.ewm(span=span0).std()

In [55]:
def apply_tripple_barrier(close: pd.Series, events: pd.DataFrame,
                                   pt_sl: List, molecule: np.ndarray) -> pd.DataFrame:
    '''
    Labeling observations using tripple-barrier method

        Parameters:
            close (pd.Series): close prices of bars
            events (pd.DataFrame): dataframe with columns:
                                   - t1: The timestamp of vertical barrier (if missing, the last
                                         timestamp of close is used as the end of the path)
                                   - trgt: The unit width of the horizontal barriers
                                   - side: The side of the bet; path returns are multiplied by it
            pt_sl (list): list of two non-negative float values:
                          - pt_sl[0]: The factor that multiplies trgt to set the width of the upper barrier.
                                      If 0, there will not be an upper barrier.
                          - pt_sl[1]: The factor that multiplies trgt to set the width of the lower barrier.
                                      If 0, there will not be a lower barrier.
            molecule (np.ndarray):  subset of event indices that will be processed by a
                                    single thread (will be used later)

        Returns:
            out (pd.DataFrame): dataframe with columns [t1, sl, pt] corresponding to timestamps at which
                                each barrier was touched (if it happened)
    '''
    events_ = events.loc[molecule]
    out = events_[['t1']].copy(deep=True)
    if pt_sl[0] > 0:
        pt = pt_sl[0] * events_['trgt']
    else:
        pt = pd.Series(np.nan, index=events_.index)    # NaNs: comparisons never succeed
    if pt_sl[1] > 0:
        sl = -pt_sl[1] * events_['trgt']
    else:
        sl = pd.Series(np.nan, index=events_.index)    # NaNs: comparisons never succeed

    # .items() is used because Series.iteritems() was removed in pandas 2.0
    for loc, t1 in events_['t1'].fillna(close.index[-1]).items():
        df0 = close.loc[loc: t1]                                       # path prices
        df0 = (df0 / close.loc[loc] - 1) * events_.at[loc, 'side']     # path returns
        out.loc[loc, 'sl'] = df0[df0 < sl[loc]].index.min()            # earliest stop loss
        out.loc[loc, 'pt'] = df0[df0 > pt[loc]].index.min()            # earliest profit taking
    return out

In [56]:
# including metalabeling possibility
def get_events_tripple_barrier(
    close: pd.Series, tEvents: np.ndarray, pt_sl: Union[float, List[float]], trgt: pd.Series, minRet: float,
    numThreads: int = 1, t1: Union[pd.Series, bool] = False, side: pd.Series = None
) -> pd.DataFrame:
    '''
    Getting times of the first barrier touch

        Parameters:
            close (pd.Series): close prices of bars
            tEvents (np.ndarray): np.ndarray of timestamps that seed every barrier (they can be generated
                                  by CUSUM filter for example)
            pt_sl (float or list): non-negative width factor(s) of the horizontal barriers (0 disables
                                   a barrier). A single float is used for both barriers. For a list,
                                   pt_sl[0] is used for both barriers when side is None; otherwise
                                   pt_sl[0] is the upper and pt_sl[1] the lower factor
            trgt (pd.Series): a series of targets expressed in terms of absolute returns
            minRet (float): minimum target return required for running a triple barrier search
            numThreads (int): number of threads to use concurrently (currently unused: events are
                              processed serially)
            t1 (pd.Series): series with the timestamps of the vertical barriers (pass False
                            to disable vertical barriers)
            side (pd.Series) (optional): metalabels containing sides of bets

        Returns:
            events (pd.DataFrame): dataframe with columns:
                                       - t1: timestamp of the first barrier touch
                                       - trgt: target that was used to generate the horizontal barriers
                                       - side (optional): side of bets
    '''
    if np.isscalar(pt_sl):
        # the scalar form is the documented one; expand it so that indexing below works
        pt_sl = [pt_sl, pt_sl]
    trgt = trgt.loc[trgt.index.intersection(tEvents)]
    trgt = trgt[trgt > minRet]
    if t1 is False:
        t1 = pd.Series(pd.NaT, index=tEvents)
    if side is None:
        # no side information: assume long bets and symmetric barriers
        side_ = pd.Series(np.array([1.] * len(trgt.index)), index=trgt.index)
        pt_sl_ = [pt_sl[0], pt_sl[0]]
    else:
        side_ = side.loc[trgt.index.intersection(side.index)]
        pt_sl_ = pt_sl[:2]
    events = pd.concat({'t1': t1, 'trgt': trgt, 'side': side_}, axis=1).dropna(subset=['trgt'])
    df0 = apply_tripple_barrier(close, events, pt_sl_, events.index)
    events['t1'] = df0.dropna(how='all').min(axis=1)    # earliest of the touched barriers
    if side is None:
        events = events.drop('side', axis=1)
    return events

In [57]:
def add_vertical_barrier(close: pd.Series, tEvents: np.ndarray, numDays: int) -> pd.Series:
    '''
    Building vertical barriers located numDays after each event

        Parameters:
            close (pd.Series): close prices of bars with a sorted datetime index
            tEvents (np.ndarray): timestamps of the events
            numDays (int): number of days between an event and its vertical barrier

        Returns:
            pd.Series indexed by event timestamp holding the timestamp of the first bar at or after
            the barrier time; events whose barrier falls beyond the last bar are left out
    '''
    horizon = pd.Timedelta(days=numDays)
    positions = close.index.searchsorted(tEvents + horizon)
    in_range = positions[positions < close.shape[0]]
    barrier_stamps = close.index[in_range]
    return pd.Series(barrier_stamps, index=tEvents[:in_range.shape[0]])

In [58]:
# including metalabeling possibility & modified to generate 0 labels
def get_bins(close: pd.Series, events: pd.DataFrame, t1: Union[pd.Series, bool] = False) -> pd.DataFrame:
    '''
    Generating labels with possibility of knowing the side (metalabeling)

        Parameters:
            close (pd.Series): close prices of bars
            events (pd.DataFrame): dataframe returned by 'get_events' with columns:
                                   - index: event starttime
                                   - t1: event endtime
                                   - trgt: event target
                                   - side (optional): position side
            t1 (pd.Series): series with the timestamps of the vertical barriers (pass False or None
                            to disable vertical barriers)

        Returns:
            out (pd.DataFrame): dataframe with columns:
                                       - ret: return realized at the time of the first touched barrier
                                       - bin: if metalabeling ('side' in events), then {0, 1} (take the bet or pass)
                                              if no metalabeling, then the sign of the return, set to 0
                                              when the event ended on a vertical barrier
    '''
    events_ = events.dropna(subset=['t1'])
    px = events_.index.union(events_['t1'].values).drop_duplicates()
    px = close.reindex(px, method='bfill')
    out = pd.DataFrame(index=events_.index)
    out['ret'] = px.loc[events_['t1'].values].values / px.loc[events_.index] - 1
    has_side = 'side' in events_
    if has_side:
        out['ret'] *= events_['side']
    out['bin'] = np.sign(out['ret'])
    if has_side:
        out.loc[out['ret'] <= 0, 'bin'] = 0
    elif t1 is not None and t1 is not False:
        # the default t1=False has no .values, so both "disabled" spellings are skipped
        vertical_first_touch_idx = events_[events_['t1'].isin(t1.values)].index
        out.loc[vertical_first_touch_idx, 'bin'] = 0
    return out

In [59]:
def drop_labels(labels: pd.DataFrame, min_pct: float = 0.05) -> pd.DataFrame:
    '''
    Dropping observations of under-represented labels

        Parameters:
            labels (pd.DataFrame): dataframe with a 'bin' column holding the labels
            min_pct (float): minimum share of observations a label must have to be kept

        Returns:
            labels (pd.DataFrame): input rows without the dropped labels; the rarest label is
                                   removed repeatedly until every remaining share exceeds min_pct
                                   or fewer than three labels are left
    '''
    while True:
        shares = labels['bin'].value_counts(normalize=True)
        if shares.min() > min_pct or shares.shape[0] < 3:
            break
        rarest = shares.idxmin()    # the label itself, not its position in value_counts
        print('dropped label', rarest, shares.min())
        labels = labels[labels['bin'] != rarest]
    return labels

# Chapter 4. Sample Weights

In [60]:
def num_conc_events(closeIdx: np.ndarray, t1: pd.Series, molecule: np.ndarray) -> pd.Series:
    '''
    Computing the number of concurrent events per bar

        Parameters:
            closeIdx (np.ndarray): sorted timestamps of close prices (must support searchsorted
                                   and slicing, e.g. a pd.DatetimeIndex)
            t1 (pd.Series): series with the timestamps of the vertical barriers, indexed by
                            event start (missing values are replaced by the last timestamp)
            molecule (np.ndarray): dates of events on which weights are computed

        Returns:
            pd.Series with number of labels concurrent at each timestamp
    '''
    t1 = t1.fillna(closeIdx[-1])
    t1 = t1[t1 >= molecule[0]]              # events that end at or after the first molecule date
    t1 = t1.loc[:t1[molecule].max()]        # events that start at or before the last molecule end
    iloc = closeIdx.searchsorted(pd.DatetimeIndex([t1.index[0], t1.max()]))
    count = pd.Series(0, index=closeIdx[iloc[0]: iloc[1] + 1])
    # .items() is used because Series.iteritems() was removed in pandas 2.0
    for tIn, tOut in t1.items():
        count.loc[tIn: tOut] += 1
    return count.loc[molecule[0]: t1[molecule].max()]

In [61]:
def sample_weights(t1: pd.Series, num_conc_events: pd.Series, molecule: np.ndarray) -> pd.Series:
    '''
    Computing average uniqueness over the event's lifespan

        Parameters:
            t1 (pd.Series): series with the timestamps of the vertical barriers
            num_conc_events (pd.Series): number of concurrent events per bar
            molecule (np.ndarray): dates of events on which weights are computed

        Returns:
            weights (pd.Series): float weights that represent the average uniqueness
    '''
    # float container: the averages written below are fractional
    weights = pd.Series(0.0, index=molecule)
    # .items() is used because Series.iteritems() was removed in pandas 2.0
    for tIn, tOut in t1.loc[weights.index].items():
        weights.loc[tIn] = (1.0 / num_conc_events.loc[tIn: tOut]).mean()
    return weights

In [62]:
def get_ind_matrix(barIdx: np.ndarray, t1: pd.Series) -> pd.DataFrame:
    '''
    Deriving indicator matrix

        Parameters:
            barIdx (np.ndarray): indexes of bars
            t1 (pd.Series): series with the timestamps of the vertical barriers, indexed by
                            the start of each observation

        Returns:
            indM (pd.DataFrame): binary matrix indicating what bars influence the label for each
                                 observation (rows are bars, column i is the i-th entry of t1)
    '''
    indM = pd.DataFrame(0, index=barIdx, columns=range(t1.shape[0]))
    # .items() is used because Series.iteritems() was removed in pandas 2.0;
    # distinct loop names avoid rebinding the t1 argument while iterating over it
    for i, (start, end) in enumerate(t1.items()):
        indM.loc[start:end, i] = 1.0
    return indM


def get_avg_uniqueness(indM: pd.DataFrame) -> pd.Series:
    '''
    Compute average uniqueness from indicator matrix

        Parameters:
            indM (pd.DataFrame): binary matrix indicating what bars influence the label for each observation

        Returns:
            avg_uniq (pd.Series): average uniqueness of each column of indM, taken over the bars
                                  where that column is active
    '''
    c = indM.sum(axis=1)            # number of active columns per bar
    u = indM.div(c, axis=0)         # each active column's share of the bar
    avg_uniq = u[u > 0].mean()      # masking leaves inactive bars out of the average
    return avg_uniq


def seq_bootstrap(indM: pd.DataFrame, sLength: int = None) -> np.ndarray:
    '''
    Generate a sample via sequential bootstrap

        Parameters:
            indM (pd.DataFrame): binary matrix indicating what bars influence the label for each observation
            sLength (int) (optional): sample length (if None, equals number of columns in indM)

        Returns:
            phi (np.ndarray): array with indexes of the features sampled by sequential bootstrap
    '''
    if sLength is None:
        sLength = indM.shape[1]
    phi = []
    while len(phi) < sLength:
        # uniqueness every candidate would have if appended to the current sample;
        # an explicit float dtype keeps the probabilities numeric
        avg_uniq = pd.Series(
            {i: get_avg_uniqueness(indM[phi + [i]]).iloc[-1] for i in indM},
            dtype=float
        )
        prob = avg_uniq / avg_uniq.sum()    # draw probabilities
        phi += [np.random.choice(indM.columns, p=prob.values)]
    return np.array(phi)

In [63]:
def gen_rand_t1(numObs: int, numBars: int, maxH: int) -> pd.Series:
    '''
    Generate random t1 series

        Parameters:
            numObs (int): number of observations for which t1 is generated
            numBars (int): number of bars
            maxH (int): upper bound for uniform distribution to determine the number of bars spanned by observation
        Returns:
            t1 (pd.Series): integer series sorted by index; the index is the starting bar and the
                            value is the ending bar. Observations drawn with the same starting bar
                            overwrite each other, so the series may hold fewer than numObs entries
    '''
    ends = {}
    for _ in range(numObs):
        idx = np.random.randint(0, numBars)
        ends[idx] = idx + np.random.randint(1, maxH)    # the last draw wins for a repeated start
    # built in one go with an explicit dtype instead of growing a dtype-less Series
    return pd.Series(ends, dtype='int64').sort_index()


def aux_MC(numObs: int, numBars: int, maxH: int) -> dict:
    '''
    Run one Monte Carlo trial comparing standard and sequential bootstraps

        Parameters:
            numObs (int): number of observations for which t1 is generated
            numBars (int): number of bars
            maxH (int): upper bound for uniform distribution to determine the number of bars spanned by observation
        Returns:
            dict with average uniqueness derived by standard ('stdU') and sequential ('seqU')
            bootstrap algorithms
    '''
    spans = gen_rand_t1(numObs, numBars, maxH)
    ind_matrix = get_ind_matrix(range(spans.max() + 1), spans)
    # standard bootstrap: columns drawn uniformly with replacement
    draw = np.random.choice(ind_matrix.columns, size=ind_matrix.shape[1])
    result = {'stdU': get_avg_uniqueness(ind_matrix[draw]).mean()}
    # sequential bootstrap on the same indicator matrix
    draw = seq_bootstrap(ind_matrix)
    result['seqU'] = get_avg_uniqueness(ind_matrix[draw]).mean()
    return result


def main_MC(numObs: int, numBars: int, maxH: int, numIters: int) -> pd.DataFrame:
    '''
    Run MC simulation for comparing standard and sequential bootstraps

        Parameters:
            numObs (int): number of observations for which t1 is generated
            numBars (int): number of bars
            maxH (int): upper bound for uniform distribution to determine the number of bars spanned by observation
            numIters (int): number of MC iterations
        Returns:
            out (pd.DataFrame): dataframe containing uniqueness obtained by standard and sequential
                                bootstraps, one row per iteration (every row keeps index label 0)
    '''
    frames = [pd.DataFrame([aux_MC(numObs, numBars, maxH)]) for _ in range(numIters)]
    if not frames:
        return pd.DataFrame()
    # one concat at the end instead of re-copying the accumulated frame on every iteration
    return pd.concat(frames)

In [64]:
def sample_return_weights(
    t1: pd.Series, num_conc_events: pd.Series, close: pd.Series, molecule: np.ndarray
) -> pd.Series:
    '''
    Determination of sample weights by absolute return attribution

        Parameters:
            t1 (pd.Series): series with the timestamps of the vertical barriers
            num_conc_events (pd.Series): number of concurrent events per bar
            close (pd.Series): close prices
            molecule (np.ndarray): dates of events on which weights are computed

        Returns:
            weights (pd.Series): float weights equal to the absolute value of the sum, over each
                                 event's lifespan, of log-returns divided by the number of
                                 concurrent events
    '''
    ret = np.log(close).diff()
    weights = pd.Series(index=molecule, dtype=float)    # numeric result instead of object dtype
    # .items() is used because Series.iteritems() was removed in pandas 2.0
    for tIn, tOut in t1.loc[weights.index].items():
        weights.loc[tIn] = (ret.loc[tIn: tOut] / num_conc_events.loc[tIn: tOut]).sum()
    return weights.abs()

In [65]:
def get_time_decay(tW: pd.Series, clfLastW: float = 1.0) -> pd.Series:
    '''
    Apply piecewise-linear decay to observed uniqueness. The decay factor is a linear function
    of the cumulative uniqueness (in index order) that equals 1 at the newest observation;
    negative factors are set to 0.

        Parameters:
            tW (pd.Series): observed uniqueness
            clfLastW (float): controls the slope of the decay: for values >= 0 the slope is
                              (1 - clfLastW) / total uniqueness, for negative values it is
                              1 / ((clfLastW + 1) * total uniqueness)

        Returns:
            clfW (pd.Series): series with time-decay factors
    '''
    cum_uniq = tW.sort_index().cumsum()
    total = cum_uniq.iloc[-1]
    slope = (1.0 - clfLastW) / total if clfLastW >= 0 else 1. / ((clfLastW + 1) * total)
    intercept = 1.0 - slope * total    # pins the newest observation at 1
    decayed = intercept + slope * cum_uniq
    return decayed.mask(decayed < 0, 0)

# Chapter 5. Fractionally Differentiated Features

In [66]:
def get_weights(d: float, size: int) -> np.ndarray:
    '''
    Computing the weights for differentiating the series

        Parameters:
            d (float): differentiating factor
            size (int): length of weights array

        Returns:
            w (np.ndarray): column vector of weights; the last entry is the lag-0 weight (1.0)
    '''
    coeffs = [1.0]
    k = 1
    while k < size:
        # each weight is derived from the previous one
        coeffs.append(-coeffs[-1] / k * (d - k + 1))
        k += 1
    return np.array(coeffs[::-1]).reshape(-1, 1)


def plot_weights(dRange: list, nPlots: int, size: int) -> None:
    '''
    Generating plots for weights arrays for different differentiating factors

        Parameters:
            dRange (list): list with 2 floats - bounds of the interval
            nPlots (int): number of plots
            size(int): length of each weights array

        Returns:
            None (the figure is shown with plt.show())
    '''
    d_values = np.linspace(dRange[0], dRange[1], nPlots)
    table = pd.DataFrame()
    for d in d_values:
        column = get_weights(d, size)
        lags = range(column.shape[0])[::-1]    # get_weights puts the lag-0 weight last
        table = table.join(pd.DataFrame(column, index=lags, columns=[d]), how='outer')
    fig, ax = plt.subplots(figsize=(11, 7))
    ax.plot(table)
    ax.set_xlabel('$k$')
    ax.set_ylabel('$w_k$')
    ax.legend(np.round(d_values, 2), loc='lower right')
    plt.show()

In [67]:
def frac_diff(series: pd.DataFrame, d: float, thres: float = 0.01) -> pd.DataFrame:
    '''
    Fractional differentiation with increasing width window
    Note 1: For thres=1, nothing is skipped
    Note 2: d can be any positive fractional, not necessarily bounded [0,1]

        Parameters:
            series (pd.DataFrame): dataframe with time series
            d (float): differentiating factor
            thres (float): threshold for skipping some of the first observations

        Returns:
            df (pd.DataFrame): dataframe with differentiated series
    '''
    w = get_weights(d, series.shape[0])
    w_ = np.cumsum(abs(w))
    w_ /= w_[-1]
    skip = w_[w_ > thres].shape[0]    # number of initial observations left out

    df = {}
    for name in series.columns:
        # .ffill() replaces the deprecated fillna(method='ffill')
        seriesF = series[[name]].ffill().dropna()
        values = {}
        for iloc in range(skip, seriesF.shape[0]):
            loc = seriesF.index[iloc]
            if not np.isfinite(series.loc[loc, name]):
                continue    # exclude NAs
            values[loc] = np.dot(w[-(iloc + 1):, :].T, seriesF.loc[:loc])[0, 0]
        # keyed by the original labels, so the result keeps the index type of the input
        df[name] = pd.Series(values, dtype=float)
    df = pd.concat(df, axis=1)
    return df

In [68]:
def get_weights_ffd(d: float, thres: float) -> np.ndarray:
    '''
    Computing the weights for differentiating the series with fixed window size

        Parameters:
            d (float): differentiating factor
            thres (float): threshold for cutting off weights (must be positive)

        Returns:
            w (np.ndarray): column vector of weights; the last entry is the lag-0 weight (1.0)

        Raises:
            ValueError: if thres is not a positive number (the cut-off would never be reached)
    '''
    if not thres > 0:    # also rejects NaN
        raise ValueError('thres must be positive')
    w, k = [1.0], 1
    while True:
        w_ = -w[-1] / k * (d - k + 1)
        if abs(w_) < thres:
            break    # first weight below the threshold ends the window
        w.append(w_)
        k += 1
    w = np.array(w[::-1]).reshape(-1, 1)
    return w

In [69]:
def frac_diff_ffd(series: pd.DataFrame, d: float, thres: float = 1e-5) -> pd.DataFrame:
    '''
    Fractional differentiation with constant width window
    Note 1: thres determines the cut-off weight for the window
    Note 2: d can be any positive fractional, not necessarily bounded [0,1]

        Parameters:
            series (pd.DataFrame): dataframe with time series
            d (float): differentiating factor
            thres (float): threshold for cutting off weights

        Returns:
            df (pd.DataFrame): dataframe with differentiated series
    '''
    w = get_weights_ffd(d, thres)
    width = len(w) - 1

    df = {}
    for name in series.columns:
        # .ffill() replaces the deprecated fillna(method='ffill')
        seriesF = series[[name]].ffill().dropna()
        values = {}
        for iloc1 in range(width, seriesF.shape[0]):
            loc0, loc1 = seriesF.index[iloc1 - width], seriesF.index[iloc1]
            if not np.isfinite(series.loc[loc1, name]):
                continue    # exclude NAs
            values[loc1] = np.dot(w.T, seriesF.loc[loc0:loc1])[0, 0]
        # keyed by the original labels, so the result keeps the index type of the input
        df[name] = pd.Series(values, dtype=float)
    df = pd.concat(df, axis=1)
    return df

In [70]:
def plot_min_ffd(process: Union[np.ndarray, pd.Series, pd.DataFrame],
                 apply_constant_width: bool = True, thres: float = 0.01) -> None:
    '''
    Finding the minimum differentiating factor that passes the ADF test

        Parameters:
            process (np.ndarray): array with random process values
            apply_constant_width (bool): flag that shows whether to use constant width window (if True)
                                         or increasing width window (if False)
            thres (float): threshold for cutting off weights
    '''
    # NOTE(review): adfuller is not imported in the visible part of this file - it must be in scope
    differentiate = frac_diff_ffd if apply_constant_width else frac_diff
    stats = pd.DataFrame(columns=['adfStat', 'pVal', 'lags', 'nObs', '95% conf'], dtype=object)
    announced = False

    for d in np.linspace(0, 2, 21):
        differenced = differentiate(pd.DataFrame(process), d, thres)
        adf_out = adfuller(differenced, maxlag=1, regression='c', autolag=None)
        stats.loc[d] = list(adf_out[:4]) + [adf_out[4]['5%']]
        # only the first d whose p-value is at most 5% is reported
        if not announced and adf_out[1] <= 0.05:
            print(f'Minimum d required: {d}')
            announced = True

    fig, ax = plt.subplots(figsize=(11, 7))
    ax.plot(stats['adfStat'])
    ax.axhline(stats['95% conf'].mean(), linewidth=1, color='r', linestyle='dotted')
    ax.set_title('Searching for minimum $d$')
    ax.set_xlabel('$d$')
    ax.set_ylabel('ADF statistics')
    plt.show()

In [71]:
def print_adf_results(process: np.ndarray) -> None:
    '''
    Printing the statistic and p-value of the Augmented Dickey-Fuller test

        Parameters:
            process (np.ndarray): array with process values passed to adfuller
    '''
    # NOTE(review): adfuller is not imported in the visible part of this file - it must be in scope
    stat, pval, _lags, _nobs, _crit = adfuller(process, maxlag=1, regression='c', autolag=None)
    print(f'ADF statistics: {stat}')
    print(f'p-value: {pval}')