# Channel Break Out Detection

## Import Data

In [1]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from scipy import stats

In [5]:
df = pd.read_csv("BTCUSD.csv")
df

Unnamed: 0,time,open,high,low,close,tick_volume,MA_daily,MA_weekly,RSI,BB_High,...,BBW,MACD,RSI_signal,BB_signal,MACD_signal,MA_signal,Engulfing,Consolidation,Breakout,ATR
0,2017-03-28 03:00:00,1041.319,1041.320,1041.220,1041.222,68,,,,,...,,,0,0,0,0,0,0,0,
1,2017-03-28 04:00:00,1041.222,1042.778,1041.220,1042.000,185,,,,,...,,,0,0,0,0,0,0,0,
2,2017-03-28 05:00:00,1043.000,1043.000,1042.007,1042.007,32,,,,,...,,,0,0,0,0,0,0,0,
3,2017-03-28 06:00:00,1042.989,1042.989,1042.000,1042.003,50,,,,,...,,,0,0,0,0,0,0,0,
4,2017-03-28 07:00:00,1043.000,1043.000,1041.504,1042.007,162,,,,,...,,,0,0,0,0,0,0,0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
53629,2023-07-08 09:00:00,30260.738,30271.838,30209.738,30235.738,820,30293.99028,30564.08469,49.037318,30394.861410,...,260.166820,-10.482079,0,0,-1,-1,0,0,0,80.315071
53630,2023-07-08 10:00:00,30235.738,30240.938,30204.838,30209.338,573,30271.69028,30561.89719,47.285800,30373.158793,...,235.261587,-11.954919,0,0,-1,-1,0,0,0,76.079357
53631,2023-07-08 11:00:00,30209.338,30216.738,30141.638,30182.738,1043,30254.08428,30559.14519,45.521522,30354.172892,...,216.439784,-15.094552,0,0,-1,-1,0,0,0,75.150786
53632,2023-07-08 12:00:00,30184.038,30198.738,30131.338,30153.538,950,30232.73628,30555.57669,43.598460,30342.690476,...,212.224952,-19.711706,0,0,-1,-1,0,0,0,75.443643


## Detect Pivots/Fractals

In [3]:
def isPivot(candle, window):
    """
    This function detects if a candle is a fractal point
    args: candle index, window before and after candle to test if pivot
    returns: 1 if pivot high, 2 if pivot low, 3 if both and 0 default
    """
    if candle-window < 0 or candle+window >= len(df):
        return 0
    
    pivotHigh = 1
    pivotLow = 2
    for i in range(candle-window, candle+window+1):
        if df.iloc[candle].Low > df.iloc[i].Low:
            pivotLow=0
        if df.iloc[candle].High < df.iloc[i].High:
            pivotHigh=0
    if (pivotHigh and pivotLow):
        return 3
    elif pivotHigh:
        return pivotHigh
    elif pivotLow:
        return pivotLow
    else:
        return 0
    

In [4]:
window=3
df['isPivot'] = df.apply(lambda x: isPivot(x.name,window), axis=1)

AttributeError: 'Series' object has no attribute 'Low'

In [6]:
def pointpos(x):
    if x['isPivot']==2:
        return x['Low']-1e-3
    elif x['isPivot']==1:
        return x['High']+1e-3
    else:
        return np.nan
df['pointpos'] = df.apply(lambda row: pointpos(row), axis=1)

KeyError: 'isPivot'

In [None]:
dfpl = df[0:100]
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['Open'],
                high=dfpl['High'],
                low=dfpl['Low'],
                close=dfpl['Close'])])

fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
                marker=dict(size=5, color="MediumPurple"),
                name="pivot")
#fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

## Detect Price Channel

In [None]:
def collect_channel(candle, backcandles, window):
    localdf = df[candle-backcandles-window:candle-window]
    localdf['isPivot'] = localdf.apply(lambda x: isPivot(x.name,window), axis=1)
    highs = localdf[localdf['isPivot']==1].High.values
    idxhighs = localdf[localdf['isPivot']==1].High.index
    lows = localdf[localdf['isPivot']==2].Low.values
    idxlows = localdf[localdf['isPivot']==2].Low.index
    
    if len(lows)>=2 and len(highs)>=2:
        sl_lows, interc_lows, r_value_l, _, _ = stats.linregress(idxlows,lows)
        sl_highs, interc_highs, r_value_h, _, _ = stats.linregress(idxhighs,highs)
    
        return(sl_lows, interc_lows, sl_highs, interc_highs, r_value_l**2, r_value_h**2)
    else:
        return(0,0,0,0,0,0)
    

In [None]:
candle = 75
backcandles = 40
window = 3

#dfpl = df[candle-backcandles-window-5:candle+200]

fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['Open'],
                high=dfpl['High'],
                low=dfpl['Low'],
                close=dfpl['Close'])])

fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
                marker=dict(size=5, color="MediumPurple"),
                name="pivot")

sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandles, window)
print(r_sq_l, r_sq_h)
x = np.array(range(candle-backcandles-window, candle+1))
fig.add_trace(go.Scatter(x=x, y=sl_lows*x + interc_lows, mode='lines', name='lower slope'))
fig.add_trace(go.Scatter(x=x, y=sl_highs*x + interc_highs, mode='lines', name='max slope'))
#fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

## Detect Break Out

In [None]:
def isBreakOut(candle, backcandles, window):
    if (candle-backcandles-window)<0:
        return 0
    
    sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, 
                                                                                   backcandles, 
                                                                                   window)
    
    prev_idx = candle-1
    prev_high = df.iloc[candle-1].High
    prev_low = df.iloc[candle-1].Low
    prev_close = df.iloc[candle-1].Close
    
    curr_idx = candle
    curr_high = df.iloc[candle].High
    curr_low = df.iloc[candle].Low
    curr_close = df.iloc[candle].Close
    curr_open = df.iloc[candle].Open

    if ( prev_high > (sl_lows*prev_idx + interc_lows) and
        prev_close < (sl_lows*prev_idx + interc_lows) and
        curr_open < (sl_lows*curr_idx + interc_lows) and
        curr_close < (sl_lows*prev_idx + interc_lows)): #and r_sq_l > 0.9
        return 1
    
    elif ( prev_low < (sl_highs*prev_idx + interc_highs) and
        prev_close > (sl_highs*prev_idx + interc_highs) and
        curr_open > (sl_highs*curr_idx + interc_highs) and
        curr_close > (sl_highs*prev_idx + interc_highs)): #and r_sq_h > 0.9
        return 2
    
    else:
        return 0

In [None]:
def breakpointpos(x):
    if x['isBreakOut']==2:
        return x['Low']-3e-3
    elif x['isBreakOut']==1:
        return x['High']+3e-3
    else:
        return np.nan

candle = 75
backcandles = 40
window = 3
dfpl = df[candle-backcandles-window-5:candle+20]
dfpl["isBreakOut"] = [isBreakOut(candle, backcandles, window) for candle in dfpl.index]
dfpl['breakpointpos'] = dfpl.apply(lambda row: breakpointpos(row), axis=1)

In [None]:
candle = 59

fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['Open'],
                high=dfpl['High'],
                low=dfpl['Low'],
                close=dfpl['Close'])])

fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
                marker=dict(size=5, color="MediumPurple"),
                name="pivot")

fig.add_scatter(x=dfpl.index, y=dfpl['breakpointpos'], mode="markers",
                marker=dict(size=8, color="Black"), marker_symbol="hexagram",
                name="pivot")

sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandles, window)
print(r_sq_l, r_sq_h)
x = np.array(range(candle-backcandles-window, candle+1))
fig.add_trace(go.Scatter(x=x, y=sl_lows*x + interc_lows, mode='lines', name='lower slope'))
fig.add_trace(go.Scatter(x=x, y=sl_highs*x + interc_highs, mode='lines', name='max slope'))
#fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()