In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
from typing import  List

In [2]:
# Download daily AAPL price history.
# auto_adjust is passed explicitly so that the (dividend/split adjusted) prices do not depend on
# which default the installed yfinance release happens to use.
apple = yf.download("AAPL", start="2020-01-01", end="2025-01-01", auto_adjust=True)

  apple = yf.download("AAPL", start="2020-01-01", end="2025-01-01")
[*********************100%***********************]  1 of 1 completed


In [4]:
# Flatten the column labels by name rather than overwriting them by position: a hard-coded list
# silently mislabels the price fields if the downloaded column order or count ever changes.
# NOTE(review): assumes level 0 of the columns holds the field names (Close, High, ...) — confirm
# against the yfinance version in use.
apple.columns = list(apple.columns.get_level_values(0))
assert "Close" in apple.columns, "expected a 'Close' column in the downloaded data"

In [5]:
# Display the price frame to check its columns and date range.
apple

Unnamed: 0_level_0,Close,High,Low,Open,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-01-02,72.468254,72.528574,71.223252,71.476592,135480400
2020-01-03,71.763725,72.523754,71.539337,71.696167,146322800
2020-01-06,72.335556,72.374162,70.634539,70.885472,118387200
2020-01-07,71.995354,72.600960,71.775789,72.345204,108872000
2020-01-08,73.153496,73.455095,71.698581,71.698581,132079200
...,...,...,...,...,...
2024-12-24,257.037506,257.047440,254.140589,254.339701,23234700
2024-12-26,257.853760,258.928914,256.470034,257.027510,27237100
2024-12-27,254.439224,257.535238,251.920617,256.669129,42355300
2024-12-30,251.064484,252.358634,249.621015,251.094347,35557500


## Event-based labeling

In [8]:
# Closing-price series; the volatility estimate and the CUSUM filter are computed from it.
prices = apple["Close"]

In [None]:
def cusum_filter_events_dynamic_threshold(
        prices: pd.Series,
        threshold: pd.Series
) -> pd.DatetimeIndex:
    """
    Detect events using the Symmetric Cumulative Sum (CUSUM) filter with a time-varying threshold.

    The first difference of ``prices`` is accumulated separately on the upside and on the downside. Whenever one
    of the two running sums moves beyond the threshold valid at that timestamp, the timestamp is recorded as an
    event and that running sum is reset to zero.

    Only timestamps present in both the differenced prices and ``threshold`` are processed (inner join). A NaN
    threshold never triggers an event, because both comparisons against NaN are false.

    :param prices: A pandas Series of prices (pass log prices to filter on log returns).
    :param threshold: A pandas Series with the threshold to use at each timestamp.
    :return: A pandas DatetimeIndex containing timestamps of detected events.

    References:
    - De Prado, M. (2018) Advances in financial machine learning. John Wiley & Sons. (Methodology: 39)
    """
    price_delta = prices.diff().dropna()
    # Neither input is mutated, so no defensive copy of `threshold` is needed and the
    # deprecated `copy=` keyword of `align` is not passed.
    price_delta, thresholds = price_delta.align(threshold, join="inner")

    time_events = []
    shift_positive, shift_negative = 0, 0

    # Iterate the aligned series directly; going through `to_dict()` would silently
    # collapse rows that share a label.
    for (index, value), threshold_ in zip(price_delta.items(), thresholds):
        shift_positive = max(0, shift_positive + value)
        shift_negative = min(0, shift_negative + value)

        if shift_negative < -threshold_:
            shift_negative = 0
            time_events.append(index)

        elif shift_positive > threshold_:
            shift_positive = 0
            time_events.append(index)

    return pd.DatetimeIndex(time_events)

In [None]:
def daily_volatility_with_log_returns(
        close: pd.Series,
        span: int = 100
) -> pd.Series:
    """
    Estimate volatility as the exponentially weighted standard deviation of lagged log returns.

    For every timestamp ``t`` in ``close`` the position of the first observation at or after ``t - 1 day`` is
    looked up. Timestamps for which that position is 0 are discarded. The log return between the price at that
    position and the price one position earlier is computed, and an EWMA standard deviation is taken over
    those returns.

    :param close: A pandas Series of close prices indexed by timestamp.
    :param span: The span parameter for the Exponentially Weighted Moving Average (EWMA).
    :return: A pandas Series named "std", indexed by the trailing timestamps of ``close`` that were kept.

    References:
    - De Prado, M. (2018) Advances in financial machine learning. John Wiley & Sons. (Methodology: Page 44)
    """
    one_day = pd.Timedelta(days=1)
    lookback_positions = close.index.searchsorted(close.index - one_day)
    # Position 0 has no earlier observation to form a return with.
    lookback_positions = lookback_positions[lookback_positions > 0]

    kept_index = close.index[len(close) - len(lookback_positions):]
    later_prices = close.iloc[lookback_positions].values
    earlier_prices = close.iloc[lookback_positions - 1].values

    log_returns = pd.Series(np.log(later_prices / earlier_prices), index=kept_index)
    return log_returns.ewm(span=span).std().rename("std")

In [None]:
def vertical_barrier(
    close: pd.Series,
    time_events: pd.DatetimeIndex,
    number_days: int
) -> pd.Series:
    """
    Compute a vertical (time-limit) barrier for each event.

    Each event is mapped to the first timestamp of ``close`` that lies at or after the event time plus
    ``number_days`` calendar days. Barriers that would fall beyond the last observation are dropped, and the
    result is indexed by the leading events, as many as there are remaining barriers.

    :param close: A pandas Series of prices indexed by timestamp.
    :param time_events: A vector of event timestamps.
    :param number_days: A number of days for the vertical barrier.
    :return: A pandas series with the timestamps of the vertical barriers.
    """
    horizon = pd.Timedelta(days=number_days)
    barrier_positions = close.index.searchsorted(time_events + horizon)

    # Keep only positions that point at an existing observation.
    inside = barrier_positions[barrier_positions < len(close)]

    return pd.Series(close.index[inside], index=time_events[:len(inside)])


In [None]:

def triple_barrier(
    close: pd.Series,
    events: pd.DataFrame,
    profit_taking_stop_loss: List[float],
    molecule: list
) -> pd.DataFrame:
    """
    Find the first touch of the horizontal barriers for a subset of events.

    For every event in ``molecule`` the side-adjusted log-return path of ``close`` is followed from the event
    timestamp up to its 'End Time' (or up to the last timestamp of ``close`` when 'End Time' is missing). The
    earliest timestamps at which the path falls below the stop-loss level and rises above the profit-taking
    level are recorded.

    :param close: A pandas Series of prices indexed by timestamp.
    :param events: DataFrame indexed by event timestamp with columns 'End Time', 'Base Width' and 'Side'.
    :param profit_taking_stop_loss: Two multipliers ``[profit_taking, stop_loss]`` applied to 'Base Width';
        a multiplier that is not positive disables the corresponding barrier.
    :param molecule: Event timestamps to process; entries that are not in ``events.index`` are ignored.
    :return: DataFrame indexed by the processed events with columns 'End Time', 'stop_loss' and
        'profit_taking'. A barrier that is never touched is left missing (NaT for a datetime index). The
        two touch columns are always present, even when no event is processed.
    """
    # Filter molecule to ensure all timestamps exist in events
    molecule = [m for m in molecule if m in events.index]

    events_filtered = events.loc[molecule]
    output = events_filtered[['End Time']].copy(deep=True)

    profit_multiplier, stop_multiplier = profit_taking_stop_loss[0], profit_taking_stop_loss[1]

    # A disabled barrier is an all-NaN level: every comparison against it is false,
    # so it can never be touched. The dtype is explicit to avoid an object-dtype Series.
    if profit_multiplier > 0:
        profit_taking = profit_multiplier * events_filtered['Base Width']
    else:
        profit_taking = pd.Series(np.nan, index=events_filtered.index, dtype=float)

    if stop_multiplier > 0:
        stop_loss = -stop_multiplier * events_filtered['Base Width']
    else:
        stop_loss = pd.Series(np.nan, index=events_filtered.index, dtype=float)

    # Collect the touches first and assign each column once, instead of growing the
    # columns cell by cell through `.loc`.
    stop_loss_hits, profit_taking_hits = [], []
    for location, timestamp in events_filtered['End Time'].fillna(close.index[-1]).items():
        path = close.loc[location:timestamp]
        path_returns = np.log(path / close.loc[location]) * events_filtered.at[location, 'Side']
        stop_loss_hits.append(path_returns[path_returns < stop_loss[location]].index.min())
        profit_taking_hits.append(path_returns[path_returns > profit_taking[location]].index.min())

    # Touch times are labels of `close`, so reuse its index dtype when it is datetime-like;
    # otherwise let pandas infer the dtype.
    touch_dtype = close.index.dtype if isinstance(close.index, pd.DatetimeIndex) else None
    output['stop_loss'] = pd.Series(stop_loss_hits, index=output.index, dtype=touch_dtype)
    output['profit_taking'] = pd.Series(profit_taking_hits, index=output.index, dtype=touch_dtype)

    return output

In [None]:
def meta_events(
    close: pd.Series,
    time_events: pd.DatetimeIndex,
    ptsl: List[float],
    target: pd.Series,
    return_min: float,
    num_threads: int,
    timestamp: pd.Series = False,
    side: pd.Series = None
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Build the events table for triple-barrier labeling and resolve the first barrier touched by each event.

    :param close: A pandas Series of prices indexed by timestamp.
    :param time_events: Timestamps of the candidate events.
    :param ptsl: Barrier multipliers. When ``side`` is None only ``ptsl[0]`` is used, for both horizontal
        barriers; otherwise ``ptsl[:2]`` is used as ``[profit_taking, stop_loss]``.
    :param target: Series with the unit width of the horizontal barriers. Events without a target, or whose
        target is not greater than ``return_min``, are dropped.
    :param return_min: Minimum target required to keep an event.
    :param num_threads: Number of chunks ``time_events`` is split into. The chunks are processed one after
        another with ``map``; no threads are started here.
    :param timestamp: Series of vertical-barrier timestamps indexed by event. ``False`` (the default) or
        ``None`` means no vertical barrier. Events missing from the Series get no vertical barrier.
    :param side: Optional Series with the position side of each event.
    :return: A tuple ``(events, df0)``: the events table with 'End Time' set to the earliest barrier touch,
        and the per-event barrier touch times returned by ``triple_barrier``.
    """
    event_index = pd.Index(time_events)

    # `.loc` with a list of labels raises KeyError as soon as one label is missing, so restrict
    # the lookup to the events that actually have a target.
    target = target.loc[event_index[event_index.isin(target.index)]]
    target = target[target > return_min]

    # Ensure timestamp is correctly initialized
    if timestamp is False or timestamp is None:
        timestamp = pd.Series(pd.NaT, index=time_events)
    else:
        # A vertical-barrier Series may not cover every event (e.g. barriers falling past the end
        # of the data were dropped); uncovered events get NaT when the columns are concatenated.
        timestamp = timestamp.loc[event_index[event_index.isin(timestamp.index)]]

    if side is None:
        side_position, profit_loss = pd.Series(1., index=target.index), [ptsl[0], ptsl[0]]
    else:
        side_position, profit_loss = side.loc[target.index], ptsl[:2]

    # Include 'target' and 'timestamp' in the events DataFrame
    events = pd.concat(
        {
            'End Time': timestamp,
            'Base Width': target,
            'Side': side_position,
            'target': target,
            'timestamp': timestamp,
        },
        axis=1,
    ).dropna(subset=['Base Width'])

    df0 = list(map(
        triple_barrier,
        [close] * num_threads,
        [events] * num_threads,
        [profit_loss] * num_threads,
        np.array_split(time_events, num_threads)
    ))
    df0 = pd.concat(df0, axis=0)

    # The event ends at whichever barrier (vertical, stop-loss, profit-taking) is reached first.
    events['End Time'] = df0.dropna(how='all').min(axis=1)

    if side is None:
        events = events.drop('Side', axis=1)

    # Return events including the 'target' and 'timestamp' columns
    return events, df0

In [None]:
def meta_labeling(
    events: pd.DataFrame,
    close: pd.Series
) -> pd.DataFrame:
    """
    Turn resolved events into returns and labels, with optional meta-labeling.

    Events without an 'End Time' are ignored. The return of an event is the price at its 'End Time' divided
    by the price at the event timestamp, minus one, multiplied by 'Side' when that column is present. The
    label is the sign of the return, forced to 0 when 'End Time' equals the 'timestamp' column. With a 'Side'
    column, every non-positive return is additionally labeled 0.

    :param events: DataFrame indexed by event timestamp with columns 'End Time' and 'timestamp', and
        optionally 'Side'.
    :param close: Series of close prices with date indices.
    :return: DataFrame with columns 'End Time', 'Return of Label', 'Label' and, if provided, 'Side'.

    Reference:
    De Prado, M. (2018) Advances in financial machine learning. John Wiley & Sons.
    Methodology: 51
    """
    resolved = events.dropna(subset=['End Time'])
    start_times = resolved.index
    end_times = resolved['End Time'].values
    has_side = 'Side' in resolved

    # Prices at every date that is needed, back-filled for dates missing from `close`.
    needed_dates = start_times.union(end_times).drop_duplicates()
    close_at = close.reindex(needed_dates, method='bfill')

    out = pd.DataFrame(index=start_times)
    out['End Time'] = events['End Time']
    out['Return of Label'] = close_at.loc[end_times].values / close_at.loc[start_times] - 1
    if has_side:
        out['Return of Label'] *= resolved['Side']

    ended_at_timestamp = events['End Time'] == events['timestamp']
    out['Label'] = np.sign(out['Return of Label']) * (1 - ended_at_timestamp)

    if has_side:
        out.loc[out['Return of Label'] <= 0, 'Label'] = 0
        out['Side'] = resolved['Side']

    return out

In [None]:
# EWMA standard deviation of log returns (span 30), used as the scale of the CUSUM threshold.
volatility = daily_volatility_with_log_returns(prices, 30)
# Multiplier applied to the volatility to obtain the per-timestamp threshold.
filter_threshold = 1.5
# Event timestamps from the CUSUM filter on log prices, with threshold = filter_threshold * volatility.
# NOTE(review): `moelcules` looks like a typo of `molecules`; the name is kept as-is because other
# cells may reference it.
moelcules = cusum_filter_events_dynamic_threshold(np.log(prices), filter_threshold * volatility)