In [85]:
import json

import talib as ta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.shell import spark
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
from pyspark.sql.functions import col

# OHLCV data
input_file = open('../../data/futures/ADA_USDT-1h-futures.json')
json_array = json.load(input_file)

In [86]:
# data frame "df"
df = pd.DataFrame(json_array)
df.columns = ['Time', 'Open', 'High', 'Low', 'Close', 'Volume']
df.reset_index(drop=True, inplace=True)

In [87]:
# Ema 89 strategy
df['EMA89High']=ta.EMA(df.High, timeperiod=89)
df['EMA89Low']=ta.EMA(df.Low, timeperiod=89)

df['SMA5'] = ta.SMA(df.Close, timeperiod=5)
df['SMA10'] = ta.SMA(df.Close, timeperiod=10)
df['SMA20'] = ta.SMA(df.Close, timeperiod=20)
df['SMA50'] = ta.SMA(df.Close, timeperiod=50)
df['SMA100'] = ta.SMA(df.Close, timeperiod=100)
df['SMA200'] = ta.SMA(df.Close, timeperiod=200)

df['EMA200']=ta.EMA(df.Close, timeperiod=200)
df['EMA150']=ta.EMA(df.Close, timeperiod=150)
df['EMA100']=ta.EMA(df.Close, timeperiod=100)
df['EMA50']=ta.EMA(df.Close, timeperiod=50)

df['EMA32']=ta.EMA(df.Close, timeperiod=32)
df['EMA21']=ta.EMA(df.Close, timeperiod=21)
df['EMA13']=ta.EMA(df.Close, timeperiod=13)

df['RSI']=ta.RSI(df.Close, timeperiod=14)


# patterns
df['CDL_MARUBOZU'] = ta.CDLMARUBOZU(df.Open, df.High, df.Low, df.Close)
df['CDL_DOJI'] = ta.CDLDOJI(df.Open, df.High, df.Low, df.Close)
df['CDL_ENGULFING'] = ta.CDLENGULFING(df.Open, df.High, df.Low, df.Close)
df['CDL_EVENINGSTAR']  = ta.CDLEVENINGSTAR(df.Open, df.High, df.Low, df.Close)
df['CDL_EVENINGSTAR'] = ta.CDLEVENINGSTAR(df.Open, df.High, df.Low, df.Close)


# MACD
MACD_FAST_EMA = 12
MACD_SLOW_EMA = 26
MACD_SIGNAL_PERIOD = 9
df['MACD'],df['MACD_SIGNAL'],df['MACD_HIST'] = ta.MACD(df.Close, fastperiod=MACD_FAST_EMA, slowperiod=MACD_SLOW_EMA, signalperiod=MACD_SIGNAL_PERIOD)

# BB
df['SMA_BB'] = df.Close.rolling(window=20).mean()

# upper and lower bollinger bands: SMA +/- 2 * standard deviation
df['STD_DEV'] = df.Close.rolling(window=20).std()
df['UPPER_BB'] = df['SMA_BB'] + (2 * df['STD_DEV'])
df['LOWER_BB'] = df['SMA_BB'] - (2 * df['STD_DEV'])

# volume
df['OBV'] = ta.OBV(df.Close, df.Volume)

df['Time'] = pd.to_datetime(df['Time'], unit='ms')
# df['VOL1h%'] = (df.Volume.pct_change(periods=1).mul(100))
# df['VOL4h%'] = (df.Volume.pct_change(periods=4).mul(100))
# df['VOL8h%'] = (df.Volume.pct_change(periods=8).mul(100))
# df['VOL12h%'] = (df.Volume.pct_change(periods=12).mul(100))
# df['VOL24h%'] = (df.Volume.pct_change(periods=24).mul(100))

df['1h%'] = (df.Close.pct_change(periods=1).mul(100))
df['4h%'] = (df.Close.pct_change(periods=4).mul(100))
df['8h%'] = (df.Close.pct_change(periods=8).mul(100))
df['12h%'] = (df.Close.pct_change(periods=12).mul(100))
df['24h%'] = (df.Close.pct_change(periods=24).mul(100))

df['NEXT_1h%'] = (df.Close.pct_change(periods=-1).mul(100))
df['NEXT_4h%'] = (df.Close.pct_change(periods=-4).mul(100))
df['NEXT_8h%'] = (df.Close.pct_change(periods=-8).mul(100))
df['NEXT_12h%'] = (df.Close.pct_change(periods=-12).mul(100))
df['NEXT_24h%'] = (df.Close.pct_change(periods=-24).mul(100))

df = df.assign(Long1h = lambda x: (x['NEXT_1h%'] > 2))
df = df.assign(Short1h = lambda x: (x['NEXT_1h%'] < 2))
df = df.assign(Long4h = lambda x: (x['NEXT_4h%'] > 5))
df = df.assign(Short4h = lambda x: (x['NEXT_4h%'] < 5))

# df = df.dropna()
# df=df[df['Volume']!=0]
# df.isna().sum()

# assert that we do not have any nan left
# assert df.isnull().sum().sum() == 0

# print(df.shape[0])
# print('Long1h', df['Long1h'].value_counts()[True])
# print('Short1h', df['Short1h'].value_counts()[True])
# print('Long4h', df['Long4h'].value_counts()[True])
# print('Short4h', df['Short4h'].value_counts()[True])
# print(df.columns[:-1])

In [88]:
# Support and resistances
n1=3
n2=2

def support(df1, l, n1, n2): #n1 n2 before and after candle l
    for i in range(l-n1+1, l+1):
        if(df1.Low[i]>df1.Low[i-1]):
            return 0
    for i in range(l+1,l+n2+1):
        if(df1.Low[i]<df1.Low[i-1]):
            return 0
    return 1

def resistance(df1, l, n1, n2): #n1 n2 before and after candle l
    for i in range(l-n1+1, l+1):
        if(df1.High[i]<df1.High[i-1]):
            return 0
    for i in range(l+1,l+n2+1):
        if(df1.High[i]>df1.High[i-1]):
            return 0
    return 1

df['SUPPORT'] = False
df['RESISTANCE'] = False

for i in range(n1, len(list(df.index))-n2):
    # print(i)
    df.loc[i, "SUPPORT"] = True if support(df, i, n1, n2) else False
    df.loc[i, "RESISTANCE"] = True if resistance(df, i, n1, n2) else False

SUPPORT 250
RESISTANCE 274


Unnamed: 0,Time,Open,High,Low,Close,Volume,EMA89High,EMA89Low,SMA5,SMA10,...,NEXT_4h%,NEXT_8h%,NEXT_12h%,NEXT_24h%,Long1h,Short1h,Long4h,Short4h,SUPPORT,RESISTANCE
7259,2022-10-30 11:00:00,0.4135,0.4156,0.41,0.413,31190329.0,0.406293,0.400604,0.41508,0.41635,...,2.608696,,,,False,True,False,True,False,False
7260,2022-10-30 12:00:00,0.413,0.4143,0.4084,0.4125,28175004.0,0.406471,0.400778,0.41346,0.41589,...,,,,,False,True,False,False,False,False
7261,2022-10-30 13:00:00,0.4124,0.4132,0.4055,0.4072,42844919.0,0.40662,0.400883,0.41184,0.41475,...,,,,,False,True,False,False,False,False
7262,2022-10-30 14:00:00,0.4071,0.4097,0.4053,0.4084,28265306.0,0.406689,0.400981,0.41092,0.41408,...,,,,,False,True,False,False,False,False
7263,2022-10-30 15:00:00,0.4084,0.4092,0.4012,0.4025,39189085.0,0.406744,0.400986,0.40872,0.41257,...,,,,,False,False,False,False,False,False


In [None]:
print('SUPPORT', df['SUPPORT'].value_counts()[True])
print('RESISTANCE', df['RESISTANCE'].value_counts()[True])
df.tail()

In [None]:
# RSI Divergence
def pivotid(df1, l, n1, n2): #n1 n2 before and after candle l
    if l-n1 < 0 or l+n2 >= len(df1):
        return 0
    
    pividlow=1
    pividhigh=1
    for i in range(l-n1, l+n2+1):
        if(df1.low[l]>df1.low[i]):
            pividlow=0
        if(df1.high[l]<df1.high[i]):
            pividhigh=0
    if pividlow and pividhigh:
        return 3
    elif pividlow:
        return 1
    elif pividhigh:
        return 2
    else:
        return 0

def RSIpivotid(df1, l, n1, n2): #n1 n2 before and after candle l
    if l-n1 < 0 or l+n2 >= len(df1):
        return 0

    pividlow=1
    pividhigh=1
    for i in range(l-n1, l+n2+1):
        if(df1.RSI[l]>df1.RSI[i]):
            pividlow=0
        if(df1.RSI[l]<df1.RSI[i]):
            pividhigh=0
    if pividlow and pividhigh:
        return 3
    elif pividlow:
        return 1
    elif pividhigh:
        return 2
    else:
        return 0 


def divsignal(x, nbackcandles):
    backcandles=nbackcandles 
    candleid = int(x.name)

    maxim = np.array([])
    minim = np.array([])
    xxmin = np.array([])
    xxmax = np.array([])

    maximRSI = np.array([])
    minimRSI = np.array([])
    xxminRSI = np.array([])
    xxmaxRSI = np.array([])

    for i in range(candleid-backcandles, candleid+1):
        if df.iloc[i].pivot == 1:
            minim = np.append(minim, df.iloc[i].low)
            xxmin = np.append(xxmin, i) #could be i instead df.iloc[i].name
        if df.iloc[i].pivot == 2:
            maxim = np.append(maxim, df.iloc[i].high)
            xxmax = np.append(xxmax, i) # df.iloc[i].name
        if df.iloc[i].RSIpivot == 1:
            minimRSI = np.append(minimRSI, df.iloc[i].RSI)
            xxminRSI = np.append(xxminRSI, df.iloc[i].name)
        if df.iloc[i].RSIpivot == 2:
            maximRSI = np.append(maximRSI, df.iloc[i].RSI)
            xxmaxRSI = np.append(xxmaxRSI, df.iloc[i].name)

    if maxim.size<2 or minim.size<2 or maximRSI.size<2 or minimRSI.size<2:
        return 0
    
    slmin, intercmin = np.polyfit(xxmin, minim,1)
    slmax, intercmax = np.polyfit(xxmax, maxim,1)
    slminRSI, intercminRSI = np.polyfit(xxminRSI, minimRSI,1)
    slmaxRSI, intercmaxRSI = np.polyfit(xxmaxRSI, maximRSI,1)
    
    
    if slmin > 1e-4 and slmax > 1e-4 and slmaxRSI <-0.1:
        return 1
    elif slmin < -1e-4 and slmax < -1e-4 and slminRSI > 0.1:
        return 2
    else:
        return 0


def divsignal2(x, nbackcandles):
    backcandles=nbackcandles 
    candleid = int(x.name)

    closp = np.array([])
    xxclos = np.array([])
    
    maxim = np.array([])
    minim = np.array([])
    xxmin = np.array([])
    xxmax = np.array([])

    maximRSI = np.array([])
    minimRSI = np.array([])
    xxminRSI = np.array([])
    xxmaxRSI = np.array([])

    for i in range(candleid-backcandles, candleid+1):
        closp = np.append(closp, df.iloc[i].close)
        xxclos = np.append(xxclos, i)
        if df.iloc[i].pivot == 1:
            minim = np.append(minim, df.iloc[i].low)
            xxmin = np.append(xxmin, i) #could be i instead df.iloc[i].name
        if df.iloc[i].pivot == 2:
            maxim = np.append(maxim, df.iloc[i].high)
            xxmax = np.append(xxmax, i) # df.iloc[i].name
        if df.iloc[i].RSIpivot == 1:
            minimRSI = np.append(minimRSI, df.iloc[i].RSI)
            xxminRSI = np.append(xxminRSI, df.iloc[i].name)
        if df.iloc[i].RSIpivot == 2:
            maximRSI = np.append(maximRSI, df.iloc[i].RSI)
            xxmaxRSI = np.append(xxmaxRSI, df.iloc[i].name)

    slclos, interclos = np.polyfit(xxclos, closp,1)
    
    if slclos > 1e-4 and (maximRSI.size<2 or maxim.size<2):
        return 0
    if slclos < -1e-4 and (minimRSI.size<2 or minim.size<2):
        return 0
# signal decisions here !!!

    if slclos > 1e-4:
        if maximRSI[-1]<maximRSI[-2] and maxim[-1]>maxim[-2]:
            return 1
    elif slclos < -1e-4:
        if minimRSI[-1]>minimRSI[-2] and minim[-1]<minim[-2]:
            return 2
    else:
        return 0

def pointpos(x):
    if x['pivot']==1:
        return x['Low']-1e-3
    elif x['pivot']==2:
        return x['High']+1e-3
    else:
        return np.nan

def RSIpointpos(x):
    if x['RSIpivot']==1:
        return x['RSI']-1
    elif x['RSIpivot']==2:
        return x['RSI']+1
    else:
        return np.nan
    
def pivotid(df1, l, n1, n2): #n1 n2 before and after candle l
    if l-n1 < 0 or l+n2 >= len(df1):
        return 0
    
    pividlow=1
    pividhigh=1
    for i in range(l-n1, l+n2+1):
        if(df1.Low[l]>df1.Low[i]):
            pividlow=0
        if(df1.High[l]<df1.High[i]):
            pividhigh=0
    if pividlow and pividhigh:
        return 3
    elif pividlow:
        return 1
    elif pividhigh:
        return 2
    else:
        return 0

def RSIpivotid(df1, l, n1, n2): #n1 n2 before and after candle l
    if l-n1 < 0 or l+n2 >= len(df1):
        return 0

    pividlow=1
    pividhigh=1
    for i in range(l-n1, l+n2+1):
        if(df1.RSI[l]>df1.RSI[i]):
            pividlow=0
        if(df1.RSI[l]<df1.RSI[i]):
            pividhigh=0
    if pividlow and pividhigh:
        return 3
    elif pividlow:
        return 1
    elif pividhigh:
        return 2
    else:
        return 0 


df['pivot'] = df.apply(lambda x: pivotid(df, x.name,5,5), axis=1)
df['RSIpivot'] = df.apply(lambda x: RSIpivotid(df, x.name, 5, 5), axis=1)

df['pointpos'] = df.apply(lambda row: pointpos(row), axis=1)
df['RSIpointpos'] = df.apply(lambda row: RSIpointpos(row), axis=1)

df['pivot'] = df.apply(lambda x: pivotid(df, x.name,5,5), axis=1)
df['RSIpivot'] = df.apply(lambda x: RSIpivotid(df, x.name, 5, 5), axis=1)

df['divSignal'] = df.apply(lambda row: divsignal(row,30), axis=1)
df['divSignal2'] = df.apply(lambda row: divsignal2(row,30), axis=1)
