In [None]:
import pandas as pd
import numpy as np
import re
import math

In [None]:
# Rolling-window lengths (in rows) that add_movings iterates over.
MSAD_PERIODS = [3, 5, 10, 25, 50]

In [None]:
# Load the 'Si' sheet of ../src/stock_data.xlsx (path is relative to the working directory).
df = pd.read_excel('../src/stock_data.xlsx', sheet_name='Si')

In [None]:
def prepare_columns(df: pd.DataFrame) -> pd.DataFrame:
    '''
    Builds a cleaned copy of the raw quotes table:
    - strips "<" and ">" from every column name;
    - parses DATE ("%y%m%d") and TIME ("%H%M%S") and joins them
      into a single DATETIME column;
    - drops DATE, TIME, DATETIME_KEY, TICKER and PER
      (KeyError if any of them is missing);
    - casts OPEN, CLOSE, HIGH, LOW and VOL to float32.
    Returns the new dataframe.
    '''

    dropped = ["DATE", "TIME", "DATETIME_KEY", "TICKER", "PER"]
    price_volume = ["OPEN", "CLOSE", "HIGH", "LOW", "VOL"]

    stripped_names = {name: re.sub("[<>]", "", name) for name in df.columns}
    result = df.rename(columns=stripped_names)

    # Date and time are parsed separately, then glued back together as text
    # so that a single timestamp can be parsed from them.
    day = pd.to_datetime(result["DATE"], format="%y%m%d")
    clock = pd.to_datetime(result["TIME"], format="%H%M%S").dt.time
    result["DATETIME"] = pd.to_datetime(day.astype(str) + ' ' + clock.astype(str))

    result = result.drop(columns=dropped)
    return result.astype({name: 'float32' for name in price_volume})

def add_pivot(df: pd.DataFrame) -> pd.DataFrame:
    '''
    Adds a PIVOT column, the arithmetic mean of HIGH, LOW and CLOSE,
    to the given dataframe (the column is written in place) and
    returns that same dataframe.
    '''

    price_sum = df['HIGH'].add(df['LOW']).add(df['CLOSE'])
    df['PIVOT'] = price_sum.div(3)
    return df

def add_movings(df: pd.DataFrame, periods: "list[int] | None" = None) -> pd.DataFrame:
    '''
    Function adds rolling statistics to dataframe.
    For every window length the rolling mean and rolling standard
    deviation are calculated for PIVOT (columns MSAD_<n>_mean,
    MSAD_<n>_std) and for VOL (columns VOL_MA_<n>_mean,
    VOL_MA_<n>_std).

    periods: window lengths in rows; when omitted, the module-level
        MSAD_PERIODS list is used.

    Columns are added in place; the same dataframe is returned.
    '''

    if periods is None:
        periods = MSAD_PERIODS

    for per in periods:
        # Build each rolling window once and reuse it for both statistics.
        pivot_window = df['PIVOT'].rolling(per)
        volume_window = df['VOL'].rolling(per)

        df[f'MSAD_{per}_mean'] = pivot_window.mean()
        df[f'MSAD_{per}_std'] = pivot_window.std()
        df[f'VOL_MA_{per}_mean'] = volume_window.mean()
        df[f'VOL_MA_{per}_std'] = volume_window.std()

    return df

def add_fractals(df: pd.DataFrame) -> pd.DataFrame:
    '''
    Function calculates fractals Up and Down.
    Candle[0] is the previous row, Candle[1] the row being marked,
    Candle[2] the next row.
    - Fractal Up, var.1:
        Candle[1] High > Candle[0] High and
        Candle[1] High > Candle[2] High.
    - Fractal Up, var.2:
        Candle[1] High == Candle[0] High and
        Candle[2] High < Candle[1] High and
        Candle[2] Close < Candle[2] Open (red candle).
    - Fractal Down, var.1:
        Candle[1] Low < Candle[0] Low and
        Candle[1] Low < Candle[2] Low.
    - Fractal Down, var.2:
        Candle[1] Low == Candle[0] Low and
        Candle[1] Low < Candle[2] Low and
        Candle[2] Close > Candle[2] Open (green candle).

    Rows are compared by position, so the dataframe index may be
    anything. The first and the last row lack a neighbour and are
    never marked. Columns IS_FRACTAL_DOWN and IS_FRACTAL_UP (1 / 0)
    are added in place; the same dataframe is returned.
    '''

    row_count = df.shape[0]
    fractals_down = np.zeros(row_count, dtype='int64')
    fractals_up = np.zeros(row_count, dtype='int64')

    # A fractal needs both neighbours, so fewer than three rows yields none.
    if row_count >= 3:
        low = df['LOW'].to_numpy()
        high = df['HIGH'].to_numpy()
        next_open = df['OPEN'].to_numpy()[2:]
        next_close = df['CLOSE'].to_numpy()[2:]

        # Three aligned views: previous / current / next candle for rows 1..n-2.
        prev_low, cur_low, next_low = low[:-2], low[1:-1], low[2:]
        prev_high, cur_high, next_high = high[:-2], high[1:-1], high[2:]

        # Both variants require the extremum to beat the next candle; they
        # differ only in how the previous candle is treated.
        is_down = (cur_low < next_low) & (
            (cur_low < prev_low) |
            ((cur_low == prev_low) & (next_close > next_open))
        )
        is_up = (cur_high > next_high) & (
            (cur_high > prev_high) |
            ((cur_high == prev_high) & (next_close < next_open))
        )

        fractals_down[1:-1] = is_down
        fractals_up[1:-1] = is_up

    df['IS_FRACTAL_DOWN'] = fractals_down
    df['IS_FRACTAL_UP'] = fractals_up
    return df

def calc_end_correction(df: pd.DataFrame) -> pd.DataFrame:
    '''
    Function calculates end correction.
    End correction is upper shadow (High - Close) for green candles
    and lower shadow (Close - Low) for red candles.
    If Open == Close then the direction is taken from the nearest
    previous candle with Open != Close; when there is no such candle
    both values stay 0.
    END_CORRECTION_PERC is the end correction divided by the candle
    body size; it is 0 for candles with Open == Close.

    Rows are processed by position, so the dataframe index may be
    anything. Columns END_CORRECTION and END_CORRECTION_PERC are
    added in place; the same dataframe is returned.
    '''

    opens = df['OPEN'].to_numpy()
    closes = df['CLOSE'].to_numpy()
    highs = df['HIGH'].to_numpy()
    lows = df['LOW'].to_numpy()

    len_df = df.shape[0]
    end_corr, end_corr_perc = [0] * len_df, [0] * len_df

    # Direction of the latest candle with a non-zero body:
    # 1 = green, -1 = red, 0 = none seen yet. Carrying it forward replaces
    # a backward search from every Open == Close candle.
    direction = 0
    for i in range(len_df):
        body = closes[i] - opens[i]
        if body > 0:
            direction = 1
        elif body < 0:
            direction = -1

        if direction > 0:
            end_corr[i] = highs[i] - closes[i]
        elif direction < 0:
            end_corr[i] = closes[i] - lows[i]
        else:
            continue

        # Percent is only defined when the candle has a body to divide by.
        if opens[i] != closes[i]:
            end_corr_perc[i] = end_corr[i] / abs(body)

    df['END_CORRECTION'] = end_corr
    df['END_CORRECTION_PERC'] = end_corr_perc
    return df

def calc_active_impulses(df: pd.DataFrame) -> pd.DataFrame:
    '''
    Function calculates active impulses and adds data to dataframe.
    Candle[0] is the previous row, Candle[1] is the current row.
    Both candles must have the same colour (both green or both red),
    otherwise the current candle is not an active impulse.
    Upgoing active impulse (both candles green):
        - Candle[1] Open >= Candle[0] Open
        - Candle[1] Close > Candle[0] Close
        - Candle[1] Body (Close - Open) > Candle[0] Body
        - End Correction is the upper shadow (High - Close)
    Downgoing active impulse (both candles red):
        - Candle[1] Open <= Candle[0] Open
        - Candle[1] Close < Candle[0] Close
        - Candle[1] Body (Open - Close) > Candle[0] Body
        - End Correction is the lower shadow (Close - Low)
    Common features for both directions:
        - End Correction Percent is End Correction / Amplitude (High - Low)
        - Candle[1] End Correction Percent < Candle[0] End Correction Percent
        - If End Correction Percent of both candles is equal: Candle[1] End Correction < Candle[0] End Correction
        - Candle[1] End Correction <= Candle[1] Body * 0.2
        - Candle[1] Body >= Candle[1] Amplitude * 0.7

    Rows are processed by position, so the dataframe index may be
    anything. The first row has no previous candle and is always 0.
    Column IS_ACTIVE_IMPULSE (1 / 0) is added in place; the same
    dataframe is returned.
    '''

    opens = df['OPEN'].to_numpy()
    closes = df['CLOSE'].to_numpy()
    highs = df['HIGH'].to_numpy()
    lows = df['LOW'].to_numpy()

    len_df = df.shape[0]
    res = [0] * len_df

    for i in range(1, len_df):
        # Signed bodies: positive for green candles, negative for red ones.
        signed_body_i = closes[i] - opens[i]
        signed_body_prev = closes[i-1] - opens[i-1]

        # NOTE(review): Close and Body are compared strictly (> / <); an
        # earlier description of this rule used >= for some of them -
        # confirm which is intended.
        if signed_body_i > 0 and signed_body_prev > 0:
            # upgoing active impulse
            has_higher_activity = (
                opens[i] >= opens[i-1] and
                closes[i] > closes[i-1] and
                signed_body_i > signed_body_prev
            )
            end_corr_i = highs[i] - closes[i]
            end_corr_prev = highs[i-1] - closes[i-1]
        elif signed_body_i < 0 and signed_body_prev < 0:
            # downgoing active impulse; body sizes are compared as
            # Open - Close so that a longer red candle counts as bigger
            has_higher_activity = (
                opens[i] <= opens[i-1] and
                closes[i] < closes[i-1] and
                -signed_body_i > -signed_body_prev
            )
            end_corr_i = closes[i] - lows[i]
            end_corr_prev = closes[i-1] - lows[i-1]
        else:
            # Mixed colours or a candle with Open == Close: not an impulse.
            continue

        amplitude_i = abs(highs[i] - lows[i])
        amplitude_prev = abs(highs[i-1] - lows[i-1])
        # Guard against malformed rows where High == Low despite a non-zero body.
        if amplitude_i == 0 or amplitude_prev == 0:
            continue

        body_i = abs(signed_body_i)
        end_corr_i_perc = end_corr_i / amplitude_i
        end_corr_prev_perc = end_corr_prev / amplitude_prev

        # Equal percentages are resolved by the absolute correction size.
        if end_corr_i_perc == end_corr_prev_perc:
            has_lower_correction = end_corr_i < end_corr_prev
        else:
            has_lower_correction = end_corr_i_perc < end_corr_prev_perc

        is_low_end_correction = end_corr_i <= body_i * 0.2
        is_big_body = body_i >= amplitude_i * 0.7

        if (
            has_higher_activity and
            has_lower_correction and
            is_low_end_correction and
            is_big_body
        ):
            res[i] = 1

    df['IS_ACTIVE_IMPULSE'] = res
    return df


In [None]:
# Build the cleaned frame returned by prepare_columns and preview its first rows.
d1 = prepare_columns(df)
d1.head()

In [None]:
# Label every candle: "green" when CLOSE > OPEN, "red" when CLOSE < OPEN,
# None when the two are equal.
d1["CANDLE_COLOR"] = np.where(d1["CLOSE"] > d1["OPEN"], "green", np.where(d1["CLOSE"] < d1["OPEN"], "red", None))
# Forward-fill so that a candle without a colour takes the colour of the
# previous candle.
# NOTE(review): ffill() is applied to the whole frame, so missing values in
# every other column are forward-filled as well - confirm that is intended.
d1 = d1.ffill()

In [None]:
d1

In [None]:
df

In [None]:
# Feature pipeline: pivot price, rolling statistics, fractal flags, end correction.
# NOTE(review): these calls run on `df`, not on the `d1` frame returned by
# prepare_columns; the helpers look up OPEN / HIGH / LOW / CLOSE / VOL by
# those exact names - confirm `df` is the intended input here.
df = add_pivot(df)
df = add_movings(df)
df = add_fractals(df)
df = calc_end_correction(df)

In [None]:
df.shape

In [None]:
df.head(10)

In [None]:
df['END_CORRECTION'].value_counts()