In [None]:
import numpy as np
import pandas as pd

from tqdm import tqdm

import matplotlib.pyplot as plt
import seaborn as sns

# Apply matplotlib's 'dark_background' style sheet to every figure drawn after this line.
plt.style.use('dark_background')

In [None]:
# Load the Sensex CSV export and normalise it into the frame used by the rest of
# the notebook:
#   * the stray "Unnamed: 5" column is discarded,
#   * "Date" is parsed and re-rendered as an ISO "YYYY-MM-DD" string,
#   * the index is named "data-point",
#   * the five remaining columns are relabelled date/open/high/low/close.
# NOTE(review): the path is absolute and machine-specific, and `pd.to_datetime`
# is called without an explicit format -- confirm the CSV's date layout parses
# the way you expect.
df = (
    pd.read_csv(r"//home/poggerez/codebase/python/stock-analysis/sensex-01041979-31122023d.csv")
    .drop(columns="Unnamed: 5")
    .assign(Date=lambda raw: pd.to_datetime(raw["Date"]).dt.strftime("%Y-%m-%d"))
    .rename_axis("data-point")
    .set_axis(["date", "open", "high", "low", "close"], axis="columns")
)

In [None]:
def _relative_step(price: pd.Series) -> pd.Series:
    """Absolute relative move from each price to the next row's price (NaN on the last row)."""
    return price.diff(-1).divide(price).round(6).abs()


def find_extrema(
    x: pd.Series,
    window: int = 21,
    pct_change: float = 0.05,
) -> pd.DataFrame:
    """
    Returns a dataframe of alternating local peaks and troughs of `x`.

    The result is indexed by the labels of `x` at the retained extrema and has
    three columns:

    * `extrema`    -- +1 for a peak, -1 for a trough.
    * `price`      -- the value of `x` at that row.
    * `pct_change` -- absolute relative move from this extremum to the next
      retained one, rounded to 6 decimals; NaN on the last row.

    Args:
        x (pd.Series): time series data of a stock price.
        window (int, optional): size, in observations, of the centered rolling
            window used to detect local extrema. Defaults to 21.
        pct_change (float, optional): minimum relative move (as a fraction, e.g.
            0.05 = 5%) required between two neighbouring extrema. A pair that is
            closer than this is discarded together. Defaults to 0.05.

    Returns:
        pd.DataFrame: the retained extrema, in the original order of `x`.
    """

    # A row is a peak (trough) when it equals the rolling max (min) of its window.
    # Rows that are both (flat window) or neither get extrema == 0 and are dropped.
    # NOTE(review): with closed="both" a fixed centered window appears to pick up
    # one extra observation on its left side -- confirm against the pandas rolling
    # docs if a strictly symmetric window is required.
    y = (
        x.rolling(window, center=True, closed="both", min_periods=1)
        .agg(["max", "min"])
        .eq(x, axis=0)
        .astype(int)
        .assign(extrema=lambda k: k["max"] - k["min"])
        .assign(price=x)
        .query("extrema != 0")[["extrema", "price"]]
    )

    # Phase 1: collapse every run of same-type extrema to a single row -- the
    # highest peak / the lowest trough, first occurrence winning on ties.
    # Survivors are tracked by position and selected once at the end, instead of
    # dropping rows one at a time by label (quadratic, and it would also remove
    # unrelated rows if the index carried repeated labels).
    kinds = y["extrema"].to_numpy()
    prices = y["price"].to_numpy()
    keep = []
    for pos in range(len(y)):
        if not keep or kinds[keep[-1]] != kinds[pos]:
            keep.append(pos)
            continue
        held = keep[-1]
        if kinds[pos] == 1:
            newcomer_wins = prices[held] < prices[pos]
        else:
            newcomer_wins = prices[held] > prices[pos]
        if newcomer_wins:
            keep[-1] = pos
    y = y.iloc[keep]

    # Phase 2: enforce a minimum move of `pct_change` between neighbouring extrema.
    y = y.assign(pct_change=_relative_step(y["price"]))

    # Removing a pair changes the step of the row just before it, so repeat the
    # left-to-right sweep until no remaining step is below the threshold.
    while y["pct_change"].min() < pct_change:

        too_small = (y["pct_change"] < pct_change).to_numpy()
        n_rows = len(y)
        keep = []
        pos = 0

        # The last row has no successor (NaN step) and is never tested itself.
        while pos < n_rows - 1:
            if too_small[pos]:
                # discard this extremum together with the one that follows it,
                # which keeps the peak/trough alternation intact
                pos += 2
            else:
                keep.append(pos)
                pos += 1

        # keep the final row unless it was consumed as the second half of a pair
        if pos == n_rows - 1:
            keep.append(pos)

        y = y.iloc[keep]
        y = y.assign(pct_change=_relative_step(y["price"]))

    return y

# Run the detector on the closing prices with its default `window` and `pct_change`;
# being the last expression of the cell, the resulting frame is shown as the cell output.
find_extrema(df["close"])

In [None]:
""" Note we can have two consecutive troughs or peaks """

# Keep only the rows whose `date` string starts with `year_plot`.
# NOTE(review): `year_plot` is not defined anywhere in the visible part of the
# notebook -- confirm an earlier cell sets it, otherwise this fails on a fresh kernel.
df_plot = df.query("date.str.startswith(@year_plot)")
# Of those rows, keep the ones flagged as a local peak or a local trough.
# NOTE(review): `peak_local` / `trough_local` are not among the columns assigned
# when `df` is loaded (date/open/high/low/close); they presumably come from an
# earlier approach -- verify they exist, or derive the points from `find_extrema`.
df_extreme = df_plot.query("peak_local == True | trough_local == True")

# Closing-price line for the selected period ...
plt.plot(df_plot["date"], df_plot["close"], color="#008686")
# ... overlaid with a second line that connects only the flagged extrema.
plt.plot(df_extreme["date"], df_extreme["close"], color="#860086")

plt.show()