In [None]:
import numpy as np
from scipy.fft import fft
import matplotlib.pyplot as plt
from PyEMD import EMD,CEEMDAN, EEMD
from sklearn.feature_selection import mutual_info_regression
import warnings
warnings.filterwarnings('ignore')

from ta.trend import ema_indicator

from modules.Utils.utils import loadFromDB, strategyTester
import pandas as pd


In [None]:
def emd(signal, plot=True):
    """Decompose a series into intrinsic mode functions (IMFs) with PyEMD's EMD.

    Parameters
    ----------
    signal : pandas.Series
        Series to decompose; its ``.values`` array is handed to the decomposer.
    plot : bool, default True
        When True, draw the original signal followed by one subplot per
        returned component. Pass False when calling this repeatedly (e.g. once
        per rolling window) so that no figure is created.

    Returns
    -------
    numpy.ndarray
        Array with one row per component returned by the decomposer.
    """
    # NOTE(review): float16 is kept from the original code; half precision
    # carries only ~3 significant digits -- confirm this is intended for prices.
    decomposer = EMD(DTYPE=np.float16, spline_kind='cubic')
    imfs = decomposer(signal.values)

    if plot:
        t = np.arange(len(signal))
        n_components = imfs.shape[0]
        fig, axs = plt.subplots(n_components + 1, 1, figsize=(25, 11))
        axs[0].plot(t, signal)
        axs[0].set_title('Original Signal')
        for n, imf in enumerate(imfs):
            axs[n + 1].plot(t, imf)
            axs[n + 1].set_title(f'IMF {n}')

    return imfs

In [None]:

SYMBOL = 'MATIC'
N_BARS = 2500  # number of most recent rows kept (the path points at the 1h dataset)

# Build a fresh frame from the tail of the loaded data: dropna() returns a new
# object, so later column assignments on `df` do not write into a slice of the
# frame returned by loadFromDB.
df = (
    loadFromDB(f'../backtest_tools/database/database/Binance/1h/{SYMBOL}-USDT.csv')
    .iloc[-N_BARS:]
    .dropna()
)
print(df.shape)
df.head()

In [None]:
# Decompose the closing prices into IMFs; emd() also plots the signal and each component.
imfs = emd(df["Close"])

In [None]:
def phase_spectrum(imfs, plot=True):
    """Compute the FFT phase spectrum of every IMF.

    The phase is taken with ``np.angle`` (i.e. ``arctan2(imag, real)``), which
    keeps the quadrant of each FFT coefficient and is well defined when the
    real part is zero. ``arctan(imag / real)`` would fold the result into
    (-pi/2, pi/2) and yield NaN for 0/0 bins.

    Parameters
    ----------
    imfs : sequence of 1-D arrays
        One array per IMF (iterating a 2-D array row by row also works).
    plot : bool, default True
        When True, scatter-plot each phase spectrum in its own subplot.

    Returns
    -------
    list of numpy.ndarray
        Phase (radians, in [-pi, pi]) of each IMF's FFT, in input order.
    """
    imfs_p = [np.angle(fft(imf)) for imf in imfs]

    if plot and imfs_p:
        # squeeze=False keeps `axs` two-dimensional, so a single IMF does not
        # come back as a bare Axes object that cannot be indexed.
        fig, axs = plt.subplots(len(imfs_p), 1, figsize=(25, 11), squeeze=False)
        for i, imf_p in enumerate(imfs_p):
            axs[i, 0].plot(imf_p, 'o')
            axs[i, 0].set_title(f'IMF {i}')

    return imfs_p

In [None]:
# Phase spectrum of every IMF (a list with one array per IMF); the call also plots them.
imfs_p = phase_spectrum(imfs)

In [None]:
def phase_mi(phases, random_state=0):
    """Mutual information between the phase spectra of adjacent IMFs.

    Parameters
    ----------
    phases : sequence of 1-D numpy arrays
        Phase spectrum of each IMF, in IMF order.
    random_state : int, RandomState instance or None, default 0
        Forwarded to ``mutual_info_regression``. The estimator adds a small
        random noise to its inputs, so a fixed seed is needed for the values
        (and any cut-off decision based on them) to be reproducible. Pass None
        for the unseeded behaviour.

    Returns
    -------
    numpy.ndarray
        ``len(phases) - 1`` values; entry ``i`` relates ``phases[i]`` to
        ``phases[i + 1]``. Empty when fewer than two spectra are given.
    """
    return np.array([
        mutual_info_regression(
            phases[i].reshape(-1, 1), phases[i + 1], random_state=random_state
        )[0]
        for i in range(len(phases) - 1)
    ])
# Mutual information between the phase spectra of each pair of adjacent IMFs.
mis = phase_mi(imfs_p)

In [None]:
def divide_signal(signal, imfs, mis, cutoff=0.75, plot=True):
    """Split the IMFs into a stochastic and a deterministic sum.

    The cut point is the index of the first value in `mis` that is strictly
    greater than `cutoff`. IMFs before the cut point are summed into the
    stochastic component, the remaining ones into the deterministic component.

    Parameters
    ----------
    signal : pandas.Series
        Original series; only used for plotting (``signal.values``).
    imfs : numpy.ndarray
        Array with one IMF per row.
    mis : array-like
        One value per pair of adjacent IMFs.
    cutoff : float, default 0.75
        Threshold applied to `mis`.
    plot : bool, default True
        When True, draw the signal and both components in three subplots.

    Returns
    -------
    tuple of numpy.ndarray
        ``(stochastic_component, deterministic_component)``.

    Raises
    ------
    IndexError
        If no value in `mis` exceeds `cutoff`, so no cut point exists.
    """
    above_cutoff = np.flatnonzero(np.asarray(mis) > cutoff)
    if above_cutoff.size == 0:
        # Same exception type as the bare [0][0] lookup would raise, but with
        # a message that says what actually went wrong.
        raise IndexError(
            f'no mutual-information value exceeds cutoff={cutoff}; '
            'lower the cutoff or inspect `mis`'
        )
    cut_point = above_cutoff[0]

    # With cut_point == 0 the first slice is empty and sums to zeros.
    stochastic_component = np.sum(imfs[:cut_point], axis=0)
    deterministic_component = np.sum(imfs[cut_point:], axis=0)

    if plot:
        t = np.arange(len(signal))

        fig, axs = plt.subplots(3, 1, figsize=(25, 11))
        axs[0].plot(t, signal.values)
        axs[0].set_title('Original Signal')

        axs[1].plot(t, stochastic_component)
        axs[1].set_title('Stochastic Component')

        axs[2].plot(t, deterministic_component)
        axs[2].set_title('Deterministic Component')

    return stochastic_component, deterministic_component

In [None]:
# Split the IMFs at the first adjacent pair whose phase mutual information exceeds 0.96.
stochastic_component, deterministic_component = divide_signal(df["Close"], imfs, mis,cutoff=0.96)

In [None]:
# Attach both components to the price frame, add the row-to-row change of the
# deterministic part, then discard every row that contains a NaN (the first
# row always does, because diff() has no predecessor there).
df = df.assign(
    deterministic_component=deterministic_component,
    stochastic_component=stochastic_component,
)
df = df.assign(
    deterministic_component_diff=df['deterministic_component'].diff()
).dropna()

In [None]:
def buyCondition(row:pd.Series, previous_row:pd.Series=None)->bool:
    """Return True when the row's deterministic component increased.

    `previous_row` is accepted but not used.
    """
    return bool(row.deterministic_component_diff > 0)

def sellCondition(row:pd.Series, previous_row:pd.Series=None)->bool:
    """Return True when the row's deterministic component decreased.

    `previous_row` is accepted but not used.
    """
    return bool(row.deterministic_component_diff < 0)

# Run the backtest on every row except the last 1500, passing equity=1000.
# NOTE(review): 1500 matches the rolling-window size used later -- confirm the two are meant to stay in sync.
strategyTester(df.iloc[:len(df)-1500],buyCondition,sellCondition,equity=1000)

In [None]:
from bokeh.plotting import figure
from bokeh.io import push_notebook,show, output_notebook
from bokeh.models import HoverTool, Band, ColumnDataSource
from bokeh.layouts import row, column
# Configure bokeh so that subsequent show() calls render inside the notebook.
output_notebook()

In [None]:

import matplotlib.pyplot as plt

def _hover_figure(y_axis_label, price_tooltip):
    """Build a 1500x600 "Comparaison" figure with a vline-mode hover tool."""
    fig = figure(
        title="Comparaison",
        x_axis_label='time',
        y_axis_label=y_axis_label,
        width=1500,
        height=600,
    )
    hover = HoverTool(
        tooltips=[("Price", price_tooltip)],
        formatters={'$x': 'datetime', '$y': 'printf'},
        mode='vline',
    )
    fig.add_tools(hover)
    return fig


# Both panels are drawn against the positional row number, not the frame index.
bar_positions = list(range(len(df)))

# Upper panel: closing price against the deterministic component.
p = _hover_figure('value ($)', "@y{0.00} $")
p.line(bar_positions, df.Close.values, legend_label="Close", line_width=2)
p.line(
    bar_positions,
    df.deterministic_component.values,
    legend_label="Deterministic",
    line_width=2,
    color='orange',
)
p.legend.location = "top_left"

# Lower panel: row-to-row change of the deterministic component.
q = _hover_figure('value', "@y{0.00}")
q.line(
    bar_positions,
    df.deterministic_component_diff.values,
    legend_label="Deterministic",
    line_width=2,
    color='orange',
)
q.legend.location = "top_left"

# Stack the two panels vertically.
show(column(p, q))

# Wrap it up

In [None]:
# Start from an empty frame and register every column of `df` on it; the frame
# has no rows, so each assignment adds the column name without any data.
df_final = pd.DataFrame()
for col in df.columns:
    df_final[col]=np.nan
# Display the (still empty) frame.
df_final

In [None]:
kucoin_range = 1500  # number of rows in each rolling window

print('Before inloop :')

# Collect one row per window and build the frame once at the end:
# DataFrame.append was removed in pandas 2.0 and re-copies the whole frame on
# every call, which makes the loop quadratic.
window_rows = []

# NOTE(review): this visits len(df) - kucoin_range windows, so the last row of
# `df` never ends a window; use `+ 1` in the range if the final row should be included.
for i in range(len(df)-kucoin_range):
    # Copy the slice so the column assignments below write to an independent
    # frame instead of a view of `df`.
    df_i = df.iloc[i:i+kucoin_range].copy()
    imfs = emd(df_i["Close"])
    # The phase spectra must be computed from THIS window's IMFs before the
    # mutual information is evaluated; doing it the other way round scores the
    # spectra left over from the previous window.
    imfs_p = phase_spectrum(imfs)
    mis = phase_mi(imfs_p)
    stochastic_component, deterministic_component = divide_signal(df_i["Close"], imfs, mis,cutoff=0.96)
    df_i['deterministic_component'] = deterministic_component
    df_i['stochastic_component'] = stochastic_component
    df_i['deterministic_component_diff'] = df_i['deterministic_component'].diff()
    # Only the most recent row of each window is kept.
    window_rows.append(df_i.iloc[-1])
    # The helpers open matplotlib figures on every call; release them so the
    # loop does not accumulate open figures.
    plt.close('all')
    print(f'{len(window_rows)} records')

# reset_index(drop=True) reproduces the 0..n-1 index that ignore_index=True gave.
df_final = (
    pd.DataFrame(window_rows).reset_index(drop=True).infer_objects()
    if window_rows
    else pd.DataFrame(columns=df.columns)
)