In [1]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from lib.zigzag import zig_zag
import lib.bars as bars

In [112]:
# Load the 5-minute bar dataset and keep only the columns used below.
# The path is a raw string: it denotes exactly the same Windows-style relative
# path as before, but no longer relies on "\.", "\D" and "\W" being passed
# through as invalid escape sequences (a SyntaxWarning on recent Pythons).
data = pd.read_csv(r"..\..\Datasets\WDO@_M5-with-zz.csv")
# columns = ['time', 'open', 'high', 'low', 'close']
columns = ['time', 'open', 'high', 'low', 'close', 'zz', 'highs', 'lows']
data = data.loc[:, columns]
# data['time']=pd.to_datetime(data['time'], unit='ms')
data.tail()

Unnamed: 0,time,open,high,low,close,zz,highs,lows
96519,2021-04-05 17:35:00,5677.5,5685.0,5677.5,5685.0,,,
96520,2021-04-05 17:40:00,5685.0,5685.5,5682.5,5684.0,,,
96521,2021-04-05 17:45:00,5684.5,5686.0,5676.5,5679.5,5686.0,5686.0,
96522,2021-04-05 17:50:00,5679.0,5680.5,5676.0,5677.0,5676.0,,5676.0
96523,2021-04-05 17:55:00,5676.5,5679.0,5672.0,5672.0,,,


In [71]:
# Select the time window to analyse and give it a fresh 0..n-1 index.
# reset_index is chained (not inplace) so parsed_ds is a new, independent frame:
# re-running the cell is idempotent and later column assignments on parsed_ds
# do not operate on a slice of `data`.
# parsed_ds=data.loc[(data['time']>='2021-03-19 8:00:00')&(data['time']<='2021-03-19 19:00:00')]
in_window = (data['time']>='2021-03-19 12:55:00')&(data['time']<='2021-03-22 19:00:00')
parsed_ds = data.loc[in_window].reset_index(drop=True)
parsed_ds.head()

Unnamed: 0,time,open,high,low,close,zz,highs,lows,leg_start
0,2021-03-19 12:55:00,5478.5,5482.0,5470.0,5478.0,5482.0,5482.0,,27510.0
1,2021-03-19 13:00:00,5477.5,5478.5,5470.0,5473.5,,,,
2,2021-03-19 13:05:00,5473.5,5476.5,5468.0,5468.0,,,,
3,2021-03-19 13:10:00,5468.5,5469.5,5464.5,5465.0,,,,
4,2021-03-19 13:15:00,5464.5,5466.0,5458.5,5462.5,,,5458.5,


In [11]:
# Run the project's zig_zag helper over the full `data` frame and unpack its
# three results.
# NOTE(review): the meaning of the second argument (3) is defined in
# lib.zigzag and is not visible here - confirm.
zz, lows, highs = zig_zag(data,3)

In [5]:
# Attach the zig-zag results to the windowed frame as columns zz / lows / highs.
# NOTE(review): zz, lows and highs were computed from the full `data` frame,
# while parsed_ds is a re-indexed time window of it - confirm that the lengths
# and index actually line up before relying on these columns.
parsed_ds=parsed_ds.assign(zz=zz,lows=lows,highs=highs)

In [27]:
# Wider time window of `data` (2021-03-19 09:00 through 2021-03-23 19:00, inclusive).
# NOTE(review): `ds` is not referenced by any other visible cell.
ds=data.loc[(data['time']>='2021-03-19 09:00:00')&(data['time']<='2021-03-23 19:00:00')]

In [80]:
# Candlestick chart of the selected window with the zig-zag pivots drawn on top.
fig = go.Figure(
    data=[
        go.Candlestick(
            x=parsed_ds['time'],
            open=parsed_ds['open'],
            high=parsed_ds['high'],
            low=parsed_ds['low'],
            close=parsed_ds['close'],
        )
    ]
)

# Rows that carry a zig-zag pivot value.
pivots = parsed_ds.loc[parsed_ds['zz'].notnull()]
fig.add_trace(go.Scatter(x=pivots['time'], y=pivots['zz']))

fig.update_xaxes(
    rangebreaks=[
        {"bounds": ["sat", "mon"]},  # hide weekends
        {"bounds": [19, 9], "pattern": "hour"},  # hide hours outside of 9am-7pm
    ]
)
fig.update_layout(xaxis_rangeslider_visible=False)

fig.show()

In [127]:
'''
Cria uma Series com o início de cada perna,
contando que cada perna começa na primeira
barra com direção igual à do ZigZag ou doji.

A perna na coluna leg_start pode
terminar uma linha antes, excluindo a última
barra da perna. Logo, a maneira correta de
achar a perna completa é verificar da primeira
barra da perna até a próxima ocorrência não nula
da coluna "zz".

É preciso saber o começo e o fim da perna para
extrair as features.
'''


def zz_direction(row):
    """Return the direction of the leg that starts at a zig-zag pivot row.

    Parameters
    ----------
    row : mapping-like (e.g. a DataFrame row)
        Must provide the keys ``'zz'`` and ``'highs'``.

    Returns
    -------
    int
        ``-1`` when the pivot value equals the row's ``'highs'`` value,
        ``1`` for any other pivot, and ``0`` when the row has no pivot
        (``'zz'`` is NaN).
    """
    pivot = row['zz']
    # NaN never compares equal to anything, itself included, so the former
    # `pivot != np.nan` test was always True and the 0 branch was unreachable.
    if np.isnan(pivot):
        return 0
    return -1 if pivot == row['highs'] else 1

# Versão lenta:
# def leg_start(_parsed_ds):
#     leg_start=np.full(_data.shape[0], np.nan)
#     leg_counter=0
#     for idx in range(_data.shape[0]):
#         if not np.isnan(_data.iloc[idx]["zz"]):
#             zz_direc=zz_direction(_data.iloc[idx])
#             found_start_bar=False
#             forward_idx=0
#             while not found_start_bar and idx+forward_idx<_data.shape[0]:
#                 bar_dir = bars.bar_direction(_data.iloc[idx+forward_idx])
#                 if zz_direc == bar_dir or \
#                     0 == bar_dir:
#                     leg_start[idx+forward_idx]=leg_counter
#                     leg_counter+=1
#                     found_start_bar=True
#                 forward_idx+=1
#     return leg_start

# def leg_start(_data):
#     leg_start=np.full(_data.shape[0], np.nan)
#     leg_start_direction=0
#     leg_counter=0
#     for idx, row in _data.iterrows():
#         bar_dir=bars.bar_direction(row)
#         if not np.isnan(row['zz']):
#             leg_start_direction=zz_direction(row)
#         if leg_start_direction!=0 and (bar_dir==leg_start_direction or bar_dir==0):
#             leg_start[idx]=leg_counter
#             leg_counter+=1
#             leg_start_direction=0
#     return leg_start

def leg_start2(row):
    """Stateless per-row variant of the leg-start detector.

    Because no state is kept between calls, the result is either ``1``
    (the row is a zig-zag pivot and its bar direction equals the pivot's
    direction or is 0) or ``None`` (every other row).

    NOTE(review): bars.bar_direction presumably returns +1 / -1 / 0 for
    up / down / doji bars - confirm in lib.bars.
    """
    bar_dir = bars.bar_direction(row)
    if np.isnan(row['zz']):
        return None
    pivot_dir = zz_direction(row)
    if pivot_dir == 0:
        return None
    if bar_dir == pivot_dir or bar_dir == 0:
        return 1
    return None

class leg_counter():
    """Row-by-row leg numbering that keeps its state on the class itself.

    ``leg_start_direction`` holds the direction of the most recent zig-zag
    pivot whose leg has not been started yet (0 when nothing is pending);
    ``leg_counter`` is the number of legs started so far. Both live at class
    level, so they persist across calls until the class is defined again.
    """
    leg_start_direction=0
    leg_counter=0

    @classmethod
    def leg_start(cls,row):
        """Return the new leg number if ``row`` starts a leg, else NaN.

        A pivot row (non-NaN ``'zz'``) arms the detector with the pivot's
        direction. The leg starts on the first row, possibly the pivot row
        itself, whose bar direction equals the armed direction or is 0.
        """
        bar_dir = bars.bar_direction(row)
        if not np.isnan(row['zz']):
            cls.leg_start_direction = zz_direction(row)

        pending = cls.leg_start_direction
        if pending == 0:
            # No pivot is waiting for its first bar.
            return np.nan
        if bar_dir != pending and bar_dir != 0:
            # Bar moves against the pending leg; keep waiting.
            return np.nan

        cls.leg_start_direction = 0
        cls.leg_counter += 1
        return cls.leg_counter


# Number the first bar of every leg; all other rows get NaN.
parsed_ds['leg_start'] = parsed_ds.apply(leg_counter.leg_start, axis=1)
# parsed_ds=parsed_ds.assign(leg_start=leg_start2(parsed_ds))
# parsed_ds.head(30)

In [42]:
# Spot check: ask the project's bars.doji helper about the bar at position 23.
bars.doji(parsed_ds.iloc[23])

True

In [129]:
# Verificar se o idx está no começo da perna
# Passar um loop até o final
# O final é o primeiro não nulo após o começo da perna
# Atualizar a variavel começo_perna para o final_da_perna 
# leg_start_idx+leg_end_idx

# Versão Lenta:
# def iter_legs(_data, feature_fn=None):
#     leg_start_idx=0
#     while leg_start_idx < _data.shape[0]-1:
#         if np.isnan(_data.iloc[leg_start_idx]['leg_start']):
#             leg_start_idx+=1
#             continue
#         else:
#             for leg_end_idx in range(1,_data.shape[0]):
#                 if not np.isnan(_data.iloc[leg_start_idx+leg_end_idx]['zz']):
#                     if leg_end_idx>0:
#                         leg_start_idx+=leg_end_idx
#                     else:
#                         leg_start_idx+=1
#                     break

'''
Toda perna termina antes de a próxima começar.
O if que acha o começo vem por último porque,
se ele achar o começo da perna, a procura do
final dela tem que começar em leg_start_idx+1.
'''

'''
A feature_fn sempre retorna um array do tamanho
do número de pernas.
'''
def iter_legs(_data, feature):
    """Walk the frame leg by leg and collect one feature value per closed leg.

    Parameters
    ----------
    _data : pandas.DataFrame
        Needs the columns ``'zz'`` (non-NaN on pivot rows) and ``'leg_start'``
        (non-NaN on the first row of a leg).
    feature : object
        Must offer ``run(start_idx, current_idx)`` and ``result()``.

    Returns
    -------
    list
        ``feature.result()`` for every leg that was closed by a pivot row.
        A leg still open when the data ends contributes nothing.
    """
    collected = []
    open_leg = None
    for idx, row in _data.iterrows():
        if open_leg is not None:
            # Feed every row of the open leg, the closing pivot row included.
            feature.run(open_leg, idx)
            if not np.isnan(row['zz']):
                open_leg = None
                collected.append(feature.result())

        # Checked last: a row can close one leg and open the next.
        if not np.isnan(row['leg_start']):
            open_leg = idx
    return collected


# Bar count of every closed leg in parsed_ds.
# NOTE(review): bar_countage is defined in a cell further down the notebook,
# so this line fails on a fresh top-to-bottom run unless that cell is moved up.
print("arr: ", iter_legs(parsed_ds, bar_countage()))



arr:  [8, 13, 6, 2, 3, 3, 4, 6, 7, 3, 7, 3, 4, 4, 8, 6, 6, 9, 3, 6, 8, 4, 7, 7, 5, 6, 5, 6, 4, 13, 9]


In [132]:
# leg=leg()
# Run leg.loop over every row; it prints the start/end index of each leg.
# NOTE(review): `leg` is defined in a cell further down the notebook, and the
# result is bound to a throwaway name that no other visible cell reads.
iahiuhd=parsed_ds.apply(lambda row: leg.loop(row), axis=1)

leg_start_idx:  0 end 7
leg_start_idx:  7 end 19
leg_start_idx:  20 end 25
leg_start_idx:  25 end 26
leg_start_idx:  26 end 28
leg_start_idx:  29 end 31
leg_start_idx:  32 end 35
leg_start_idx:  36 end 41
leg_start_idx:  42 end 48
leg_start_idx:  49 end 51
leg_start_idx:  51 end 57
leg_start_idx:  57 end 59
leg_start_idx:  59 end 62
leg_start_idx:  62 end 65
leg_start_idx:  65 end 72
leg_start_idx:  72 end 77
leg_start_idx:  77 end 82
leg_start_idx:  82 end 90
leg_start_idx:  90 end 92
leg_start_idx:  92 end 97
leg_start_idx:  98 end 105
leg_start_idx:  105 end 108
leg_start_idx:  109 end 115
leg_start_idx:  116 end 122
leg_start_idx:  123 end 127
leg_start_idx:  128 end 133
leg_start_idx:  134 end 138
leg_start_idx:  140 end 145
leg_start_idx:  145 end 148
leg_start_idx:  148 end 160
leg_start_idx:  160 end 168


In [131]:
'''
FEATURE: Contagem de barras
Diferença entre leg_start_idx e leg_end_idx
'''

class bar_countage():
    """Leg feature: number of bars between a leg's start and current index.

    Meant to be driven through ``run(start, end)`` followed by ``result()``.
    """

    def __init__(self):
        # Defined up front so result() is safe to call before the first run();
        # previously the attribute only came into existence inside run().
        self.countage = 0

    def run(self,start,end):
        """Store the inclusive bar count between ``start`` and ``end``."""
        self.countage=abs(end-start)+1

    def result(self):
        """Return the count stored by the latest ``run`` (0 if never run)."""
        return self.countage

class directional_bar_count():
    # Placeholder for a future leg feature; it has no behaviour yet.
    pass

class leg():
    """Row-by-row leg tracker that prints the start and end index of each leg.

    ``leg_start_idx`` is class-level state: the index label of the row that
    opened the current leg, or ``None`` while no leg is open.
    """
    leg_start_idx=None

    @classmethod
    def loop(cls, row):
        """Process one DataFrame row (``row.name`` is used as its index label).

        The end-of-leg check runs first so that a single row can close one
        leg and open the next.
        """
        if cls.leg_end(row):
            print('leg_start_idx: ', cls.leg_start_idx, 'end', row.name)
            cls.leg_start_idx=None
        if cls.leg_start(row):
            cls.leg_start_idx=row.name

    @staticmethod
    def leg_start(row):
        """Return True when ``row`` is marked as the first bar of a leg."""
        return not np.isnan(row['leg_start'])

    @classmethod
    def leg_end(cls, row):
        """Return True when a leg is open and ``row`` carries a zig-zag pivot."""
        return cls.leg_start_idx is not None and not np.isnan(row['zz'])

In [42]:
# Plain candlestick chart of the selected window (no zig-zag overlay).
fig = go.Figure(
    data=[
        go.Candlestick(
            x=parsed_ds['time'],
            open=parsed_ds['open'],
            high=parsed_ds['high'],
            low=parsed_ds['low'],
            close=parsed_ds['close'],
        )
    ]
)

# fig.add_trace(go.Scatter(
#     x=data.loc[(data['zz'].notnull())]['time'],
#     y=data.loc[(data['zz'].notnull())]['zz']
# ))

fig.update_xaxes(
    rangebreaks=[
        {"bounds": ["sat", "mon"]},  # hide weekends
        {"bounds": [18, 9], "pattern": "hour"},  # hide hours outside of 9am-6pm
    ]
)

fig.update_layout(xaxis_rangeslider_visible=False)

fig.show()

In [6]:
# Total "expanded" leg (the sum of every bar's high-low range) compared with
# the leg as it appears on the chart (last bar's high minus first bar's low).
# Continuous overlap relation between the bars.

contracted_leg = parsed_ds['high'].iloc[-1] - parsed_ds['low'].iloc[0]
# Vectorised replacement for the row-by-row .iloc loop; skipna=False keeps the
# loop's behaviour of letting a NaN bar range propagate into the total.
expanded_leg = (parsed_ds['high'] - parsed_ds['low']).sum(skipna=False)

print('expanded_leg: {}, contracted_leg: {}'.format(expanded_leg,contracted_leg))

expanded_leg: 93.0, contracted_leg: 32.5


In [7]:
# Direction flag read by the overlap count in the next cell: that cell tests
# `> 0` and `< 0`, so -1 selects its second branch.
leg_direction = -1

In [8]:
'''
Bars that trade below 50% of the bar before
'''

# Midpoint of each bar, shifted down one row so that every row sees the
# midpoint of the bar before it (vectorised form of the former .iloc loop).
prev_mid = (data['low'] + (data['high'] - data['low']) / 2).shift(1)

# Positive direction: count bars whose low is under the previous midpoint.
# Negative direction: count bars whose high is over the previous midpoint.
# A direction of 0 counts nothing.
# NOTE(review): the first two rows are excluded, mirroring the original
# `idx <= 1` guard; row 1 does have a previous bar, so this may be an
# off-by-one - confirm the intent before changing it.
overlap_count = 0
if leg_direction > 0:
    overlap_count = int((prev_mid > data['low']).iloc[2:].sum())
elif leg_direction < 0:
    overlap_count = int((prev_mid < data['high']).iloc[2:].sum())


In [18]:
def between(value, bar, body=True):
    """Tell whether ``value`` lies strictly inside a bar.

    Parameters
    ----------
    value : number
        Price level to test.
    bar : mapping-like
        Bar with ``'open'``, ``'high'``, ``'low'`` and ``'close'``.
    body : bool, default True
        When True, test against the open/close body; a bar whose direction is
        neither positive nor negative never matches. When False, test against
        the full high/low range.

    Returns
    -------
    bool
    """
    direction = bars.bar_direction(bar)
    if not body:
        return bool(bar['low'] < value < bar['high'])

    if direction > 0:
        floor, ceiling = bar['open'], bar['close']
    elif direction < 0:
        floor, ceiling = bar['close'], bar['open']
    else:
        return False
    return bool(floor < value < ceiling)

def inside_bar(outside, inside, inside_body=False):
    """Tell whether ``inside`` is contained in the high/low range of ``outside``.

    Parameters
    ----------
    outside : mapping-like
        Containing bar; its ``'high'`` and ``'low'`` are used.
    inside : mapping-like
        Candidate bar.
    inside_body : bool, default False
        When False, the candidate's high/low must fit inside ``outside``.
        When True, only the candidate's open/close body must fit; a candidate
        whose direction is neither positive nor negative never matches.

    Returns
    -------
    bool
        Bounds are inclusive.
    """
    if inside_body:
        direction = bars.bar_direction(inside)
        if direction > 0:
            top, bottom = inside['close'], inside['open']
        elif direction < 0:
            top, bottom = inside['open'], inside['close']
        else:
            return False
    else:
        top, bottom = inside['high'], inside['low']
    return bool(outside['high'] >= top and outside['low'] <= bottom)

In [57]:
# Spot check of the overlap test used in the next cell: does bar 7 overlap bar 6,
# either by containment (range or body) or by bar 6's close/open falling
# inside bar 7's body?
inside_bar(parsed_ds.iloc[6],parsed_ds.iloc[7]) or \
inside_bar(parsed_ds.iloc[6],parsed_ds.iloc[7], inside_body=True) or \
between(parsed_ds.iloc[6]['close'],parsed_ds.iloc[7]) or \
between(parsed_ds.iloc[6]['open'],parsed_ds.iloc[7])

True

In [63]:
'''
Bars that overlap with the body of bars before
- The first bar of the range does not count
- The body overlaps with two bars before or more
'''

overlaping_count=bar_before=last_overlaping=0
range_starts=[]
while bar_before<parsed_ds.shape[0]-2:
    # The anchor bar and its doji status do not change while scanning forward,
    # so they are looked up once per anchor instead of once per candidate.
    anchor_bar=parsed_ds.iloc[bar_before]
    doji=bars.doji(anchor_bar)
    # A doji anchor is compared through its high/low, any other bar through
    # its close/open; this was the only difference between the two
    # formerly duplicated branches.
    level_a, level_b = ('high', 'low') if doji else ('close', 'open')

    for overlap_bar in range(bar_before+2, parsed_ds.shape[0]):
        candidate_bar=parsed_ds.iloc[overlap_bar]
        overlap_flag = inside_bar(anchor_bar,candidate_bar) or \
                    inside_bar(anchor_bar,candidate_bar, inside_body=True) or \
                    between(anchor_bar[level_a],candidate_bar) or \
                    between(anchor_bar[level_b],candidate_bar)
        if not overlap_flag:
            # The bar two positions after the anchor must overlap,
            # otherwise this anchor starts no range at all.
            if overlap_bar == bar_before+2:
                break
            continue
        overlaping_count+=1
        last_overlaping=overlap_bar

    if last_overlaping>bar_before:
        # A range was found: record its start and resume from its last bar.
        range_starts.append(bar_before)
        bar_before=last_overlaping
    else:
        bar_before+=1

overlaping_count+=len(range_starts)
print('overlaping_count: ', overlaping_count)
print('range_starts: ', range_starts)

last_overlaping: 4 bar_before: 0
last_overlaping: 10 bar_before: 6
overlaping_count:  8
range_starts:  [0, 6]
