In [None]:
# !pip install binance-historical-data mplfinance pandera

In [None]:
import datetime as dt

import mplfinance as mpf
import pandas as pd
import numpy as np
import pandera as pa
import matplotlib.pyplot as plt

import shared

In [None]:
# Notebook-wide logger obtained from the project-local `shared` module; used
# below through `logger.info(...)`.
# NOTE(review): presumably a configured `logging.Logger` -- confirm in `shared`.
logger = shared.get_logger()

In [None]:
# Dump BTCUSDT klines through the project-local `shared` helper.
# NOTE(review): presumably downloads/stores the raw data that is read back in
# the next cell -- confirm against `shared.dump_binance_klines`.
shared.dump_binance_klines(tickers=['BTCUSDT'])

In [None]:
# Raw kline frame; later cells read its `open_time`, `open`, `low` and `high`
# columns and treat `open_time` as epoch milliseconds.
# NOTE(review): no ticker is passed -- presumably defaults to the data dumped
# above; confirm against `shared.read_binance_ticker_data`.
btc_df = shared.read_binance_ticker_data()

In [None]:
# Candle interval label -> interval length in seconds.
# Each row is (label, how many units, seconds per unit).
interval_secs_map = {
    label: count * unit_secs
    for label, count, unit_secs in (
        ('1m', 1, 60),
        ('5m', 5, 60),
        ('15m', 15, 60),
        ('1h', 1, 3600),
        ('4h', 4, 3600),
        ('1d', 1, 86400),
        ('1w', 7, 86400),
    )
}
# Candle colours: explicit up/down fills, with edges and wicks inheriting them.
mc = mpf.make_marketcolors(
    up='#459782',
    down='#df484c',
    edge='inherit',
    wick='inherit',
)
# Shared chart style (dark background) passed to every `mpf.plot` call below.
s = mpf.make_mpf_style(
    marketcolors=mc,
    facecolor='#181b25',
)

In [None]:
def split_df(df: pd.DataFrame, interval_str: str) -> pd.DataFrame:
    """Aggregate fine-grained klines into OHLC candles of ``interval_str``.

    Rows are bucketed in their existing order: a bucket opens at a row's
    ``open_time`` and absorbs the following rows until one starts at least a
    full interval later; that row opens the next bucket. Buckets are therefore
    anchored to the data (the first row, and the first row after any gap), not
    to wall-clock boundaries.

    Args:
        df: Klines with ``open_time`` (epoch milliseconds), ``open``,
            ``close``, ``low`` and ``high`` columns. Rows are expected to be
            ordered by ``open_time``.
        interval_str: Key of ``interval_secs_map`` (e.g. ``'15m'``).

    Returns:
        One row per bucket with ``open_time`` (the bucket start), ``open``
        (first), ``close`` (last), ``low`` (min) and ``high`` (max), indexed by
        ``date`` -- the bucket start converted with
        ``datetime.fromtimestamp``, i.e. a naive local-time datetime.

    Raises:
        KeyError: If ``interval_str`` is not a key of ``interval_secs_map``.
    """
    interval_millis = interval_secs_map[interval_str] * 1000

    # Read the column once instead of building a row Series per step; this
    # also keeps the original integer dtype of `open_time`.
    open_times = df['open_time'].to_numpy()
    bucket_starts = open_times.copy()

    # Label every row with the start of its bucket. The trailing bucket is
    # labelled like any other, so the most recent candle is part of the output.
    bucket_count = 0
    current_start = None
    for pos, open_time in enumerate(open_times):
        if current_start is None or open_time >= current_start + interval_millis:
            current_start = open_time
            bucket_count += 1
        bucket_starts[pos] = current_start
    logger.info('len(out_dfs): %s', bucket_count)

    out_df = (
        df.assign(open_time=bucket_starts)
        .groupby(by=['open_time'], as_index=False)
        .agg(
            open=('open', 'first'),
            # The candle closes at the last row's close price.
            close=('close', 'last'),
            low=('low', 'min'),
            high=('high', 'max'),
        )
        .assign(
            date=lambda candles: candles.open_time.apply(
                lambda millis: dt.datetime.fromtimestamp(millis / 1000)
            )
        )
        .set_index('date')
    )
    return out_df


def find_double_tops_from_to_linear(
    df: pd.DataFrame,
) -> dict[int, list[tuple[dt.datetime, float]]]:
    """Trace each running high forward to the candle that first exceeds it.

    Starting at the first candle, the scan looks for the first later candle
    whose ``high`` is strictly greater, records a horizontal segment between
    the two at the earlier candle's ``high``, and then restarts from that
    breakout candle. Candles in between are never used as starting points, so
    each row is visited once.

    Args:
        df: Candle frame with ``open_time`` and ``high`` columns. Index labels
            become the x-coordinates of the segments (datetimes when the frame
            comes from ``split_df``).

    Returns:
        Mapping from the starting candle's ``open_time`` to
        ``[(start_label, level), (breakout_label, level)]`` where ``level`` is
        the starting candle's ``high``. A final high that is never exceeded
        produces no entry.

    Examples:
        alines_map = find_double_tops_from_to_linear(df=slice_df)
    """
    if df.empty:
        return {}

    # Column arrays avoid materialising a Series per row as `iterrows` does.
    highs = df['high'].to_numpy()
    open_times = df['open_time'].to_numpy()
    labels = df.index
    row_count = len(df)

    out = {}
    i = 0
    while i < row_count:
        level = highs[i]
        # Start one past `i` so the scan always moves forward.
        j = i + 1
        while j < row_count and level >= highs[j]:
            j += 1
        if j == row_count:
            break
        out[open_times[i]] = [(labels[i], level), (labels[j], level)]
        i = j
    return out

In [None]:
# Re-bucket the raw BTC klines into 15-minute and 1-hour candles.
btc_15m_df = split_df(btc_df, '15m')
btc_1h_df = split_df(btc_df, '1h')

In [None]:
# Sanity check: candlestick chart of the first 50 fifteen-minute candles.
mpf.plot(
    btc_15m_df.head(50),
    type='candle',
    style=s,
    figsize=(14, 4),
)

In [None]:
# Full 1h history on one very wide canvas. `warn_too_much_data` is set to the
# row count so the frame never exceeds mplfinance's "too much data" limit.
mpf.plot(
    btc_1h_df,
    type='candle',
    style=s,
    figsize=(200, 200 / 3.5),
    warn_too_much_data=len(btc_1h_df),
)

In [None]:
class DoubleTopSchema(pa.DataFrameModel):
    """Pandera model describing the candle frames checked by `@pa.check_types`.

    Declares a datetime index plus an integer `open_time` column and float
    `open`/`close`/`low`/`high` columns -- the same column layout that
    `split_df` returns.
    """
    # Index field: one datetime per candle.
    date: pa.typing.Index[dt.datetime]
    # Candle start; the functions in this notebook treat it as epoch milliseconds.
    open_time: pa.typing.Series[int]
    # OHLC prices of the candle.
    open: pa.typing.Series[float]
    close: pa.typing.Series[float]
    low: pa.typing.Series[float]
    high: pa.typing.Series[float]


@pa.check_types
def find_double_tops_from_to_lth(
    df: pa.typing.DataFrame[DoubleTopSchema],
    gain_threshold: float,
    interval_threshold: str,
) -> dict[int, list[tuple[dt.datetime, float]]]:
    """Find candles whose high is exceeded soon afterwards by a large enough move.

    For every candle ``i`` the first later candle ``j`` with a strictly higher
    ``high`` is located. The pair is kept when both hold:

    * ``(high_j - close_i) / close_i >= gain_threshold``
    * ``open_time_j - open_time_i`` is at most the ``interval_threshold``
      duration (in milliseconds).

    Args:
        df: Candle frame validated against ``DoubleTopSchema``.
        gain_threshold: Minimum relative gain from candle ``i``'s close to
            candle ``j``'s high (``0.003`` means 0.3%).
        interval_threshold: Key of ``interval_secs_map`` giving the maximum
            allowed distance between the two candles.

    Returns:
        Mapping from candle ``i``'s ``open_time`` to
        ``[(label_i, close_i), (label_j, close_i)]`` -- a horizontal segment at
        candle ``i``'s close, from candle ``i`` to the breakout candle.

    Raises:
        KeyError: If ``interval_threshold`` is not a key of
            ``interval_secs_map``.
    """
    max_interval_millis = interval_secs_map[interval_threshold] * 1000

    # Column arrays avoid materialising a Series per row as `iterrows` does.
    highs = df['high'].to_numpy()
    closes = df['close'].to_numpy()
    open_times = df['open_time'].to_numpy()
    labels = df.index
    row_count = len(df)

    # When rows are time-ordered, any breakout found after the window has
    # elapsed would be discarded anyway, so the forward scan can stop there.
    # Without that guarantee the full scan is kept so results do not change.
    time_ordered = bool(df['open_time'].is_monotonic_increasing)

    out = {}
    for i in range(row_count):
        breakout = None
        for j in range(i + 1, row_count):
            if time_ordered and open_times[j] - open_times[i] > max_interval_millis:
                break
            if not highs[i] >= highs[j]:
                breakout = j
                break
        if breakout is None:
            continue
        if open_times[breakout] - open_times[i] > max_interval_millis:
            continue
        level = closes[i]
        max_gain = (highs[breakout] - level) / level
        if max_gain < gain_threshold:
            continue
        out[open_times[i]] = [(labels[i], level), (labels[breakout], level)]
    return out


# Detect breakouts on the 15-minute candles: at least a 0.3% move above the
# candle's close, reached within one hour.
slice_df = btc_15m_df
alines_map = find_double_tops_from_to_lth(
    df=slice_df,
    gain_threshold=0.003,
    interval_threshold='1h',
)

# Draw every candle with the detected segments overlaid in orange.
fig, _ = mpf.plot(
    slice_df,
    type='candle',
    style=s,
    figsize=(14 * 15, 4 * 15),
    warn_too_much_data=len(slice_df),
    alines=dict(
        alines=list(alines_map.values()),
        colors=['#f19d38'],
    ),
    returnfig=True,
)