<a href="https://colab.research.google.com/github/sanchitgarg2204/Encoding-Candlesticks-As-Images-For-Pattern-Classification-Using-CNN/blob/main/white_paper.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from sklearn.linear_model import LinearRegression
import numpy as np
import pickle


def load_pkl(pkl_name):
    '''
    Args:
        pkl_name (string): path for pickle.

    Returns:
        (dict): including following structure
            `raw time-series data` (N, 32, 4):
                'train_data', 'val_data', 'test_data'
            `gasf data` (N, 32, 32, 4):
                'train_gaf', 'val_gaf', 'test_gaf'
            `label data` (N, 3):
                'train_label', 'val_label', 'test_label',
            `one-hot label data` (N, 9):
                'train_label_arr', 'val_label_arr', 'test_label_arr'
    '''
    # load data from data folder
    with open(pkl_name, 'rb') as f:
        data = pickle.load(f)
    return data


def ts2gasf(ts, max_v, min_v):
    '''
    Args:
        ts (numpy): (N, )
        max_v (int): max value for normalization
        min_v (int): min value for normalization

    Returns:
        gaf_m (numpy): (N, N)
    '''
    # Normalization : 0 ~ 1
    if max_v == min_v:
        gaf_m = np.zeros((len(ts), len(ts)))
    else:
        ts_nor = np.array((ts-min_v) / (max_v-min_v))
        # Arccos
        ts_nor_arc = np.arccos(ts_nor)
        # GAF
        gaf_m = np.zeros((len(ts_nor), len(ts_nor)))
        for r in range(len(ts_nor)):
            for c in range(len(ts_nor)):
                gaf_m[r, c] = np.cos(ts_nor_arc[r] + ts_nor_arc[c])
    return gaf_m


def get_gasf(arr):
    '''Convert time-series to gasf
    Args:
        arr (numpy): (N, ts_n, 4)

    Returns:
        gasf (numpy): (N, ts_n, ts_n, 4)

    Todos:
        add normalization together version
    '''
    arr = arr.copy()
    gasf = np.zeros((arr.shape[0], arr.shape[1], arr.shape[1], arr.shape[2]))
    for i in range(arr.shape[0]):
        for c in range(arr.shape[2]):
            each_channel = arr[i, :, c]
            c_max = np.amax(each_channel)
            c_min = np.amin(each_channel)
            each_gasf = ts2gasf(each_channel, max_v=c_max, min_v=c_min)
            gasf[i, :, :, c] = each_gasf
    return gasf


def gasf2ts(arr):
    '''
    Args:
        arr (numpy array):  (32, 32)
    Returns:
        numpy.series: (1d)
    '''
    # Get element from diagonal
    diag_v = np.zeros((arr.shape[0],))
    for i in range(arr.shape[0]):
        diag_v[i] = arr[i, i]
    # Inverse to Arc
    diag_v_arc = np.arccos(diag_v) / 2
    # Inverse to Normalized ts
    ts = np.cos(diag_v_arc)
    return ts


def ohlc2culr(ohlc):
    '''
    Args:
        ohlc (numpy): (N, ts_n, 4)

    Returns:
        culr (numpy): (N, ts_n, 4)
    '''
    culr = np.zeros((ohlc.shape[0], ohlc.shape[1], ohlc.shape[2]))
    culr[:, :, 0] =  ohlc[:, :, -1]
    culr[:, :, 1] = ohlc[:, :, 1] - np.maximum(ohlc[:, :, 0], ohlc[:, :, -1])
    culr[:, :, 2] = np.minimum(ohlc[:, :, 0], ohlc[:, :, -1]) - ohlc[:, :, 2]
    culr[:, :, 3] = ohlc[:, :, -1] - ohlc[:, :, 0]
    return culr


def culr2ohlc(culr_n, culr):
    '''
    Args:
        culr_n (numpy): (N, ts_n, 4)
        culr (numpy): (N, ts_n, 4)

    Returns:
        ohlc (numpy): (N, ts_n, 4)
    '''
    ohlc = np.zeros((*culr_n.shape, ))
    for i in range(culr_n.shape[0]):
        for c in range(culr_n.shape[-1]):
            # get min & max from data before normalized
            each_culr = culr[i, :, c]
            min_v = np.amin(each_culr)
            max_v = np.amax(each_culr)
            # inverse normalization
            each_culr_n = culr_n[i, :, c]
            culr_n[i, :, c] = (each_culr_n * (max_v - min_v)) + min_v
        # convert culr to ohlc
        ohlc[i, :, -1] = culr_n[i, :, 0]
        ohlc[i, :, 0] = ohlc[i, :, -1] - culr_n[i, :, -1]
        ohlc[i, :, 1] = culr_n[i, :, 1] + np.maximum(ohlc[i, :, 0], ohlc[i, :, -1])
        ohlc[i, :, 2] = np.minimum(ohlc[i, :, 0], ohlc[i, :, -1]) - culr_n[i, :, 2]
    return ohlc


def get_slope(series):
    y = series.values.reshape(-1, 1)
    x = np.array(range(1, series.shape[0] + 1)).reshape(-1,1)
    model = LinearRegression()
    model.fit(x, y)
    slope = model.coef_
    return slope


def get_trend(slope):
    '''Need to run `process_data` first with slope only, then calculate by yourself.
    25 percentile: 7.214285714286977e-05
    '''
    slope = np.array(slope)
    thres = 7.214285714286977e-05
    if (slope >= thres):
        return 1
    elif (slope <= -thres):
        return -1
    else:
        return 0

In [None]:
from tqdm import trange, tqdm
from scipy import stats
import pandas as pd
import time

def rename(data):
    rename_dc = {'Date': 'timestamp', 'Price':'close'}
    data.rename(columns=rename_dc, inplace=True)
    data.columns = [c.lower() for c in data.columns]
    return data


def process_data(data, slope=True):
    '''Including calculation of CLUR, Quartiles, and cus trend
    Args:
        data (dataframe): csv data from assets. With column names open, high, low, close.

    Returns:
        dataframe.
    '''
    if slope:
        # process slpoe
        data['diff'] = data['close'] - data['open']
        data = data.query('diff != 0').reset_index(drop=True)
        data['direction'] = np.sign(data['diff'])
        data['ushadow_width'] = 0
        data['lshadow_width'] = 0

        for idx in trange(len(data)):
            if data.loc[idx, 'direction'] == 1:
                data.loc[idx, 'ushadow_width'] = data.loc[idx, 'high'] - data.loc[idx, 'close']
                data.loc[idx, 'lshadow_width'] = data.loc[idx, 'open'] - data.loc[idx, 'low']
            else:
                data.loc[idx, 'ushadow_width'] = data.loc[idx, 'high'] - data.loc[idx, 'open']
                data.loc[idx, 'lshadow_width'] = data.loc[idx, 'close'] - data.loc[idx, 'low']

            if idx <= 50:
                data.loc[idx, 'body_per'] = stats.percentileofscore(abs(data['diff']), abs(data.loc[idx,'diff']), 'rank')
                data.loc[idx, 'upper_per'] = stats.percentileofscore(data['ushadow_width'], data.loc[idx,'ushadow_width'], 'rank')
                data.loc[idx, 'lower_per'] = stats.percentileofscore(data['lshadow_width'], data.loc[idx,'lshadow_width'], 'rank')
            else:
                data.loc[idx, 'body_per'] = stats.percentileofscore(abs(data.loc[idx-50:idx, 'diff']),abs(data.loc[idx, 'diff']), 'rank')
                data.loc[idx, 'upper_per'] = stats.percentileofscore(data.loc[idx-50:idx, 'ushadow_width'], data.loc[idx, 'ushadow_width'], 'rank')
                data.loc[idx, 'lower_per'] = stats.percentileofscore(data.loc[idx-50:idx, 'lshadow_width'], data.loc[idx, 'lshadow_width'], 'rank')

        data['slope'] = data['close'].rolling(7).apply(get_slope, raw=False)
        data.dropna(inplace=True)
    else:
        # process trend
        data['trend'] = data['slope'].rolling(1).apply(get_trend, raw=False)
        data['previous_trend'] = data['trend'].shift(1).fillna(0)
    return data


def detect_evening_star(data, short_per=35, long_per=65):
    '''Detect evening star pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting evening star')
    temp = data[(data['previous_trend'] == 1) & (data['direction'] == 1)].index
    data['evening'] = 0
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'body_per'] >= long_per)
            cond2 = (data.loc[idx+1, 'body_per'] <= short_per)
            cond3 = (data.loc[idx+2, 'direction'] == -1)
            cond4 = (data.loc[idx+1, 'close'] + data.loc[idx+1, 'open'])/2 >= data.loc[idx, 'close']
            cond5 = data.loc[idx+2, 'close'] <= ((data.loc[idx, 'open'] + data.loc[idx, 'close'])/2)
            # cond6 = (data.loc[idx+2, 'body_per'] >= long_per)
            cond7 = (data.loc[idx+2, 'open'] <= (data.loc[idx+1, 'open'] + data.loc[idx+1, 'close'])/2)
            if cond1 & cond2 & cond3 & cond4 & cond5 & cond7:
                data.loc[idx+2, 'evening'] = 1
    except:
        pass


    return data


def detect_morning_star(data, short_per=35, long_per=65):
    '''Detect morning star pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting morning star')
    temp = data[(data['previous_trend'] == -1) & (data['direction'] == -1)].index
    data['morning'] = 0
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'body_per'] >= long_per)
            cond2 = (data.loc[idx+1, 'body_per'] <= short_per)
            cond3 = (data.loc[idx+2, 'direction'] == 1)
            # cond4 = max(data.loc[idx+1, 'close'], data.loc[idx+1, 'open']) <= data.loc[idx, 'close']
            cond4 = (data.loc[idx+1, 'close'] + data.loc[idx+1, 'open'])/2 <= data.loc[idx, 'close']
            cond5 = data.loc[idx+2, 'close'] >= ((data.loc[idx, 'open'] + data.loc[idx, 'close'])/2)
            # cond6 = (data.loc[idx+2, 'body_per'] >= long_per)
            cond7 = (data.loc[idx+2, 'open'] >= (data.loc[idx+1, 'open'] + data.loc[idx+1, 'close'])/2)
            if cond1 & cond2 & cond3 & cond4 & cond5 & cond7:
                data.loc[idx+2, 'morning'] = 1
    except:
        pass

    return data


def detect_shooting_star(data, short_per=35, long_per=65):
    '''Detect shooting star pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting shooting star')
    data['shooting_star'] = 0
    temp = data[(data['previous_trend'] == 1) & (data['direction'] == 1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'body_per'] >= long_per)
            cond2 = (data.loc[idx, 'direction'] == 1)
            cond3 = (data.loc[idx+1, 'ushadow_width'] > 2 * abs(data.loc[idx+1, 'diff']))
            cond4 = (min(data.loc[idx+1, 'open'], data.loc[idx+1, 'close']) > ((data.loc[idx, 'close'] + data.loc[idx, 'open']) / 2))
            cond5 = (data.loc[idx+1, 'lower_per'] <= short_per - 10)  # 25
            cond6 = (data.loc[idx+1, 'upper_per'] >= long_per)
            if cond1 & cond2 & cond3 & cond4 & cond5 & cond6:
                data.loc[idx+1, 'shooting_star'] = 1
    except:
        pass

    return data


def detect_hanging_man(data, short_per=35, long_per=65):
    '''Detect hanging man pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting hanging man')
    data['hanging_man'] = 0
    temp = data[(data['previous_trend'] == 1) & (data['direction'] == 1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'lshadow_width'] > 2 * abs(data.loc[idx, 'diff']))
            cond2 = (data.loc[idx, 'body_per'] <= short_per)
            cond3 = (data.loc[idx, 'upper_per'] <= (short_per - 10))
            cond4 = (data.loc[idx, 'lower_per'] >= long_per)
            if cond1 & cond2 & cond3 & cond4:
                data.loc[idx, 'hanging_man'] = 1
    except:
        pass


    return data


def detect_bullish_engulfing(data, short_per=35, long_per=65):
    '''Detect bullish engulfing pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting bullish engulfing')
    data['bullish_engulfing'] = 0
    temp = data[(data['previous_trend'] == -1) & (data['direction'] == -1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'direction'] == -1)
            cond2 = (data.loc[idx, 'body_per'] >= long_per)
            cond3 = (data.loc[idx+1, 'direction'] == 1)
            cond4 = (data.loc[idx+1, 'close'] > data.loc[idx, 'open'])
            cond5 = (data.loc[idx+1, 'open'] < data.loc[idx, 'close'])
            if cond1 & cond2 & cond3 & cond4 & cond5:
                data.loc[idx+1, 'bullish_engulfing'] = 1
    except:
        pass

    return data


def detect_bearish_engulfing(data, short_per=35, long_per=65):
    '''Detect bearish engulfing pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting bearish engulfing')
    data['bearish_engulfing'] = 0
    temp = data[(data['previous_trend'] == 1) & (data['direction'] == 1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'direction'] == 1)
            cond2 = (data.loc[idx, 'body_per'] >= long_per)
            cond3 = (data.loc[idx+1, 'direction'] == -1)
            cond4 = (data.loc[idx+1, 'close'] < data.loc[idx, 'open'])
            cond5 = (data.loc[idx+1, 'open'] > data.loc[idx, 'close'])
            if cond1 & cond2 & cond3 & cond4 & cond5:
                data.loc[idx+1, 'bearish_engulfing'] = 1
    except:
        pass


    return data


def detect_hammer(data,short_per=35, long_per=65):
    '''Detect hammer pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting hammer')
    data['hammer'] = 0
    temp = data[(data['previous_trend'] == -1) & (data['direction'] == -1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'lshadow_width'] > 2 * abs(data.loc[idx, 'diff']))
            cond2 = (data.loc[idx, 'body_per'] <= short_per)
            cond3 = (data.loc[idx, 'upper_per'] <= (short_per - 15))
            cond4 = (data.loc[idx, 'lower_per'] >= long_per)
            if cond1 & cond2 & cond3 & cond4:
                data.loc[idx, 'hammer'] = 1
    except:
        pass


    return data


def detect_inverted_hammer(data, short_per=35, long_per=65):
    '''Detect inverted hammer pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting inverted hammer')
    data['inverted_hammer'] = 0
    temp = data[(data['previous_trend'] == -1) & (data['direction'] == -1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'direction'] == -1)
            cond2 = (data.loc[idx, 'body_per'] >= long_per)
            cond3 = (data.loc[idx+1, 'ushadow_width'] > 2 * abs(data.loc[idx+1, 'diff']))
            cond4 = (max(data.loc[idx+1, 'open'], data.loc[idx+1, 'close']) < ((data.loc[idx, 'close'] + data.loc[idx, 'open']) / 2))
            cond5 = (data.loc[idx+1, 'lower_per'] <= short_per)
            cond6 = (data.loc[idx+1, 'upper_per'] >= long_per)
            if cond1 & cond2 & cond3 & cond4 & cond5 & cond6:
                data.loc[idx+1, 'inverted_hammer'] = 1
    except:
        pass

    return data


def detect_bullish_harami(data, short_per=35, long_per=65):
    '''Detect inverted bullish harami pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting bullish harami')
    data['bullish_harami'] = 0
    temp = data[(data['previous_trend'] == -1) & (data['direction'] == -1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'direction'] == -1)
            cond2 = (data.loc[idx, 'body_per'] >= long_per)
            cond3 = (data.loc[idx+1, 'direction'] == 1)
            cond4 = (data.loc[idx+1, 'close'] >= ((data.loc[idx, 'open'] + data.loc[idx, 'close'])/2))
            cond5 = (data.loc[idx+1, 'close'] < data.loc[idx, 'open'])
            cond6 = (data.loc[idx+1, 'open'] > data.loc[idx, 'close'])
            cond7 = (data.loc[idx+1, 'open'] <= ((data.loc[idx, 'open'] + data.loc[idx, 'close'])/2))
            cond8 = (data.loc[idx+1, 'body_per'] >= long_per)
            if cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7 & cond8:
                data.loc[idx+1, 'bullish_harami'] = 1
    except:
        pass


    return data


def detect_bearish_harami(data, short_per=35, long_per=65):
    '''Detect inverted bearish harami pattern
    Args:
        short_per (int): percentile for determination.
        long_per (int): percentile for determination.

    Returns:
        dataframe.
    '''
    print('[ Info ] : detecting bearish harami')
    data['bearish_harami'] = 0
    temp = data[(data['previous_trend'] == 1) & (data['direction'] == 1)].index
    try:
        for idx in tqdm(temp):
            cond1 = (data.loc[idx, 'direction'] == 1)
            cond2 = (data.loc[idx, 'body_per'] >= long_per)
            cond3 = (data.loc[idx+1, 'direction'] == -1)
            cond4 = (data.loc[idx+1, 'close'] <= ((data.loc[idx, 'open'] + data.loc[idx, 'close'])/2))
            cond5 = (data.loc[idx+1, 'close'] > data.loc[idx, 'open'])
            cond6 = (data.loc[idx+1, 'open'] < data.loc[idx, 'close'])
            cond7 = (data.loc[idx+1, 'open'] >= ((data.loc[idx, 'open'] + data.loc[idx, 'close'])/2))
            cond8 = (data.loc[idx+1, 'body_per'] >= long_per)
            if cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7 & cond8:
                data.loc[idx+1, 'bearish_harami'] = 1
    except:
        pass


    return data


def detect_all(data, tasks_ls=None):
    '''
    Args:
        data (dataframe): csv data after `process_data` function.
        multi (bool): use multiprocessing or not.
        pro_num (int): how many processes to be used.

    Returns:
        data (dataframe): dataframe with detections.
    '''

    data = detect_evening_star(data)
    data = detect_morning_star(data)
    data = detect_shooting_star(data)
    data = detect_hanging_man(data)
    data = detect_bullish_engulfing(data)
    data = detect_bearish_engulfing(data)
    data = detect_hammer(data)
    data = detect_inverted_hammer(data)
    data = detect_bullish_harami(data)
    data = detect_bearish_harami(data)
    return data


def detection_result(data):
    '''Print numbers of detection
    Args:
        data (dataframe): csv data after `process_data` function.

    Returns:
        data (dataframe): dataframe with detections.
    '''
    print('\n[ Info ] : number of evening star is %s' % np.sum(data['evening']))
    print('[ Info ] : number of morning star is %s' % np.sum(data['morning']))
    print('[ Info ] : number of shooting star is %s' % np.sum(data['shooting_star']))
    print('[ Info ] : number of hanging man is %s' % np.sum(data['hanging_man']))
    print('[ Info ] : number of bullish engulfing is %s' % np.sum(data['bullish_engulfing']))
    print('[ Info ] : number of bearish engulfing is %s' % np.sum(data['bearish_engulfing']))
    print('[ Info ] : number of hammer is %s' % np.sum(data['hammer']))
    print('[ Info ] : number of inverted hammer is %s' % np.sum(data['inverted_hammer']))
    print('[ Info ] : number of bullish harami is %s' % np.sum(data['bullish_harami']))
    print('[ Info ] : number of bearish harami is %s' % np.sum(data['bearish_harami']))


if __name__ == "__main__":
    TASLS_LS = [detect_evening_star, detect_morning_star, detect_shooting_star,
                detect_hanging_man, detect_bullish_engulfing, detect_bearish_engulfing,
                detect_hammer, detect_inverted_hammer, detect_bullish_harami,
                detect_bearish_harami]



In [None]:
from google.colab import drive
import sys
drive.mount('/content/drive/', force_remount = True)

Mounted at /content/drive/


In [None]:
df=pd.read_excel('/content/drive/MyDrive/whitepaper/eur-usd-2010-17.xlsx', header=0)
df.to_csv('data1.csv', index=False)
data=pd.read_csv('data1.csv')

In [None]:
data=rename(data)
data.head()
# calculate features & slope
data = process_data(data, slope=True)

# calculate trend (depend on slopes)
data = process_data(data, slope=False)

100%|██████████| 2064/2064 [00:05<00:00, 382.02it/s]


In [None]:
detect_data = detect_all(data, TASLS_LS)
#data.to_csv('./data/eurusd_2010_2017_patterns.csv', index=False)
detection_result(detect_data)

[ Info ] : detecting evening star


100%|██████████| 522/522 [00:00<00:00, 6913.21it/s]


[ Info ] : detecting morning star


100%|█████████▉| 489/490 [00:00<00:00, 6623.18it/s]


[ Info ] : detecting shooting star


100%|██████████| 522/522 [00:00<00:00, 8315.62it/s]


[ Info ] : detecting hanging man


100%|██████████| 522/522 [00:00<00:00, 7803.52it/s]


[ Info ] : detecting bullish engulfing


100%|█████████▉| 489/490 [00:00<00:00, 10285.62it/s]


[ Info ] : detecting bearish engulfing


100%|██████████| 522/522 [00:00<00:00, 9908.88it/s]


[ Info ] : detecting hammer


100%|██████████| 490/490 [00:00<00:00, 13391.86it/s]


[ Info ] : detecting inverted hammer


100%|█████████▉| 489/490 [00:00<00:00, 7635.52it/s]


[ Info ] : detecting bullish harami


100%|█████████▉| 489/490 [00:00<00:00, 6065.27it/s]


[ Info ] : detecting bearish harami


100%|██████████| 522/522 [00:00<00:00, 6350.08it/s]


[ Info ] : number of evening star is 0
[ Info ] : number of morning star is 0
[ Info ] : number of shooting star is 0
[ Info ] : number of hanging man is 20
[ Info ] : number of bullish engulfing is 8
[ Info ] : number of bearish engulfing is 1
[ Info ] : number of hammer is 6
[ Info ] : number of inverted hammer is 0
[ Info ] : number of bullish harami is 6
[ Info ] : number of bearish harami is 4





In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np

from keras import backend as K
from keras import optimizers
import keras.utils
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, Activation



def get_model(params):
    model = Sequential()

    # Conv1
    model.add(Conv2D(16, (2, 2), input_shape=(10, 10, 4), padding='same', strides=(1, 1)))
    model.add(Activation('sigmoid'))

    # Conv2
    model.add(Conv2D(16, (2, 2), padding='same', strides=(1, 1)))
    model.add(Activation('sigmoid'))

    # FC
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))

    model.add(Dense(params['classes']))
    model.add(Activation('softmax'))
    model.summary()

    return model


def train_model(params, data):
    model = get_model(params)
    model.compile(loss='categorical_crossentropy', optimizer=params['optimizer'], metrics=['accuracy'])
    hist = model.fit(x=data['train_gaf'], y=data['train_label_arr'],
                     validation_data=(data['val_gaf'], data['val_label_arr']),
                     batch_size=params['batch_size'], epochs=params['epochs'], verbose=2)

    return (model, hist)



if __name__ == "__main__":
    PARAMS = {}
    PARAMS['pkl_name'] = '/content/drive/MyDrive/whitepaper/label8_eurusd_10bar_1500_500_val200_gaf_culr.pkl'
    PARAMS['model_name'] = 'cnn_model_10bar.h5'
    PARAMS['classes'] = 9
    PARAMS['learning_rate'] = 0.01
    PARAMS['epochs'] = 50
    PARAMS['batch_size'] = 64
    PARAMS['optimizer'] = optimizers.SGD(lr=PARAMS['learning_rate'])

    # ---------------------------------------------------------
    # load data & keras model

    data =load_pkl(PARAMS['pkl_name'])

    # train cnn model
    model, hist = train_model(PARAMS, data)
    model.save(PARAMS['model_name'])






Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 10, 10, 16)        272       
                                                                 
 activation (Activation)     (None, 10, 10, 16)        0         
                                                                 
 conv2d_1 (Conv2D)           (None, 10, 10, 16)        1040      
                                                                 
 activation_1 (Activation)   (None, 10, 10, 16)        0         
                                                                 
 flatten (Flatten)           (None, 1600)              0         
                                                                 
 dense (Dense)               (None, 128)               204928    
                                                                 
 dense_1 (Dense)             (None, 9)                 1

  saving_api.save_model(


In [None]:
# train & test result
from sklearn.metrics import classification_report
def print_result(data, model):
    # get train & test pred-labels
    train_pred = np.argmax(model.predict(data['train_gaf']),axis=-1)
    #train_pred = model.predict_classes(data['train_gaf'])
    test_pred = np.argmax(model.predict(data['test_gaf']),axis=-1)
    #test_pred = model.predict_classes(data['test_gaf'])
    # get train & test true-labels
    train_label = data['train_label'][:, 0]
    test_label = data['test_label'][:, 0]
    # train & test confusion matrix
    train_result_cm = confusion_matrix(train_label, train_pred, labels=range(9))
    test_result_cm = confusion_matrix(test_label, test_pred, labels=range(9))

    print(train_result_cm, '\n', test_result_cm)
    print()
    print(classification_report(train_label,train_pred))
    print()
    print(classification_report(test_label,test_pred))
print_result(data, model)

[[2098  101  170   89  141   96  117  117   71]
 [  69 1418    0   11    0    0    2    0    0]
 [  45    0 1440    0   15    0    0    0    0]
 [  58   38    0 1185    0    1    0  218    0]
 [  28    0   41    0 1344    0    4    0   83]
 [ 111    2    0    1    0 1346    1   39    0]
 [ 104    2    6    0    3    0 1321    0   64]
 [  42    6    0  102    0   12    0 1338    0]
 [  41    0    3    0  244    0   41    0 1171]] 
 [[706  26  65  32  46  33  23  41  28]
 [ 17 478   0   5   0   0   0   0   0]
 [ 10   0 489   0   1   0   0   0   0]
 [ 21   8   0 405   0   0   0  66   0]
 [ 11   0  18   0 442   0   1   0  28]
 [ 41   1   0   0   0 448   0  10   0]
 [ 34   0   2   0   1   0 453   0  10]
 [ 13   1   0  15   0   5   0 466   0]
 [ 19   0   0   0  63   0  11   0 407]]

              precision    recall  f1-score   support

         0.0       0.81      0.70      0.75      3000
         1.0       0.90      0.95      0.92      1500
         2.0       0.87      0.96      0.91      