In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
# from matplotlib import pyplot as plt
from PIL import Image
# import scipy.misc as smp

In [None]:
dataPath = '../../../dataset/market_data/output/nikkei_225/CONSTITUENTS_DAILY_9984.T.csv'
# Read the daily records, discard every row that holds a null value and
# order what remains chronologically (oldest first).
df = (
    pd.read_csv(dataPath, parse_dates=['date'])
    .dropna(how='any')
    .sort_values(by=['date'], ascending=True)
)
df.head(3)

In [None]:
# Image geometry per look-back window, listed as
# (total image height, volume panel height, gap height) in pixels.
# The "H x W" notes give the full image size for that window.

# 5 days -> 32 x 15
image_height_5d, volume_image_height_5d, gap_height_5d = 32, 6, 2

# 20 days -> 64 x 60
image_height_20d, volume_image_height_20d, gap_height_20d = 64, 13, 1

# 60 days -> 96 x 180
image_height_60d, volume_image_height_60d, gap_height_60d = 96, 19, 1

# 252 days -> 256 x 756
image_height_252d, volume_image_height_252d, gap_height_252d = 256, 51, 1


In [None]:
# add return_date and return_value

def add_return_data(
        raw_dataset: pd.DataFrame,
        return_days: int,
    ) -> pd.DataFrame:
    """Return a copy of ``raw_dataset`` with forward-return columns appended.

    The horizon is counted in rows (not calendar days): for the row at
    position ``i`` the reference row is the one at position ``i + return_days``.

    Two columns are added:

    * ``return_{return_days}_date``  - the ``date`` of the reference row
    * ``return_{return_days}_value`` - ``close[i + return_days] / close[i] - 1``

    The last ``return_days`` rows have no reference row and therefore hold
    missing values (NaT / NaN) in both columns.

    Parameters
    ----------
    raw_dataset : DataFrame with at least ``date`` and ``close`` columns,
        already in the row order the horizon should be measured in.
        It is not modified.
    return_days : number of rows to look ahead; must not be negative.

    Raises
    ------
    ValueError
        If ``return_days`` is negative.
    """
    if return_days < 0:
        raise ValueError(f'return_days must be non-negative, got {return_days}')

    dataset = raw_dataset.copy()
    close = dataset['close']
    # shift(-k) moves every value k rows up, so row i sees the value of
    # row i + k; rows without a successor are filled with NaT / NaN.
    dataset[f'return_{return_days}_date'] = dataset['date'].shift(-return_days)
    dataset[f'return_{return_days}_value'] = close.shift(-return_days) / close - 1

    return dataset

# Attach the 5-, 20- and 60-row forward returns (one date/value column pair
# per horizon), then persist the enriched table.
for horizon in (5, 20, 60):
    df = add_return_data(df, horizon)
df.to_csv("enhanced_df.csv")
df.head(5)

In [None]:
# Function to generate daily ohlc chart
# https://stackoverflow.com/questions/434583/what-is-the-fastest-way-to-draw-an-image-from-discrete-pixel-values-in-python
# https://stackoverflow.com/questions/57545125/attributeerror-module-scipy-misc-has-no-attribute-toimage

def _ohlc_row_index(value, lower, step, size):
    """Map ``value`` to a row index in ``[0, size - 1]`` (row 0 = lowest price)."""
    if step == 0:
        # Flat price range: every price is identical, so there is no scale
        # to project onto; place the mark on the middle row.
        return size // 2
    row = int((value - lower) / step)  # int() truncates toward zero
    # Clamp so that the maximum value (which lands exactly on ``size``) and
    # any out-of-range input stay inside the bitmap instead of wrapping
    # around (negative index) or raising IndexError.
    if row < 0:
        return 0
    if row >= size:
        return size - 1
    return row


def generate_daily_ohlc_chart(
        size: int,          # height of the bitmap
        min: float,         # the minimum value in that period (5d etc)
        max: float,         # the maximum value in that period (5d etc)
        o: float,           # open
        h: float,           # high
        l: float,           # low
        c: float,           # close
        color = 255,        # 255 or [255, 255, 255] if we want to use RGB
        bgColor = 0         # 0 or [0, 0, 0] if we want to use RGB
    ):
    """Draw one day's OHLC bar as a ``(size, 3)`` uint8 bitmap.

    The three pixel columns are, left to right: the open tick, the vertical
    high-low bar and the close tick. Prices are scaled linearly so that
    ``min`` maps to the bottom row and ``max`` to the top row of the
    returned array (the array is flipped vertically before being returned).

    Values outside ``[min, max]`` are clamped to the nearest edge row. If
    ``min == max`` all marks are drawn on the middle row.
    """
    # (size, 3), or (size, 3, 3) if we want to use RGB
    data = np.full((size, 3), bgColor, dtype=np.uint8)
    # price covered by a single pixel row
    _step = (max - min) / size

    # open tick (left column)
    _open = _ohlc_row_index(o, min, _step, size)
    data[_open, 0] = color
    # high-low bar (middle column)
    _high = _ohlc_row_index(h, min, _step, size)
    _low = _ohlc_row_index(l, min, _step, size)
    data[_low:_high + 1, 1] = color
    # close tick (right column)
    _close = _ohlc_row_index(c, min, _step, size)
    data[_close, 2] = color

    # rows were filled bottom-up; flip so that higher prices are at the top
    return np.flip(data, 0)


In [None]:
# Function to generate daily volume barchart
def generate_daily_volume_chart(
        size: int,          # height of the bitmap
        min: float,         # the minimum value in that period (5d etc)
        max: float,         # the maximum value in that period (5d etc)
        v: float,           # volume of the day
        color = 255,
        bgColor = 0
    ):
    """Draw one day's volume as a ``(size, 3)`` uint8 bitmap.

    A vertical bar is drawn in the middle pixel column, growing upwards from
    the bottom row of the returned array. Its height is ``v`` scaled
    linearly between ``min`` (no bar) and ``max`` (full height), truncated
    to whole pixels and limited to ``[0, size]``.

    If ``min == max`` there is no scale to draw against and a bitmap filled
    with ``bgColor`` is returned.
    """
    data = np.full((size, 3), bgColor, dtype=np.uint8)
    # volume covered by a single pixel row
    _step = (max - min) / size
    # no volume data
    if _step == 0:
        return data
    # bar height in pixels; int() truncates toward zero
    _volume = int((v - min) / _step)
    if _volume > size:
        _volume = size
    # Only draw for a positive height: a negative value would otherwise act
    # as a negative slice end and paint almost the whole column.
    if _volume > 0:
        data[0:_volume, 1] = color

    # the bar was filled from row 0; flip so that it rises from the bottom
    return np.flip(data, 0)

In [None]:
# Function to generate a gap
def generate_gap(
        height: int,          # height of the bitmap
        width: int,           # width of the bitmap
        bgColor = 0
    ):
    """Return a ``height`` x ``width`` uint8 bitmap filled with ``bgColor``."""
    return np.full((height, width), bgColor, dtype=np.uint8)

In [None]:
# Function to generate one image
def generate_image(
        dataset: pd.DataFrame,
        ohlc_height: int,
        volume_height: int, # set this to 0 if we don't want volume barchart
        gag_height: int, # height of the blank band between the OHLC and volume panels
    ):
    """Render the rows of ``dataset`` as one image, 3 pixel columns per row.

    Each row becomes an OHLC bar scaled against the lowest ``low`` and the
    highest ``high`` of the whole ``dataset``. When ``volume_height`` is not
    0, a blank gap of ``gag_height`` rows and a volume bar (scaled from 0 to
    the largest ``volume`` in ``dataset``) are stacked below every OHLC bar.

    Parameters
    ----------
    dataset : rows to draw, in display order; needs ``open``, ``high``,
        ``low`` and ``close`` columns, plus ``volume`` when the volume
        panel is enabled.
    ohlc_height : pixel height of the OHLC panel.
    volume_height : pixel height of the volume panel, 0 to leave it out.
    gag_height : pixel height of the gap; only used with the volume panel.

    Returns
    -------
    The image built by ``Image.fromarray`` from the assembled bitmap.

    Raises
    ------
    ValueError
        If ``dataset`` has no rows.
    """
    if len(dataset) == 0:
        raise ValueError('dataset must contain at least one row')

    # The scaling bounds are the same for every row, so compute them once
    # from the relevant columns instead of reducing the whole frame per row.
    price_min = float(dataset['low'].min())
    price_max = float(dataset['high'].max())
    with_volume = volume_height != 0
    if with_volume:
        volume_max = float(dataset['volume'].max())
        gap = generate_gap(gag_height, 3)

    columns = []
    for _, row in dataset.iterrows():
        day = generate_daily_ohlc_chart(
            ohlc_height,
            price_min,
            price_max,
            float(row['open']),
            float(row['high']),
            float(row['low']),
            float(row['close']),
        )
        if with_volume:
            volume = generate_daily_volume_chart(
                volume_height,
                0, # the volume axis starts at 0 rather than at the window minimum
                volume_max,
                float(row['volume']),
            )
            # stack top to bottom: OHLC panel, gap, volume panel
            day = np.concatenate([day, gap, volume], axis=0)
        columns.append(day)

    # place the per-day strips side by side, oldest on the left
    imgData = np.concatenate(columns, axis=1)
    return Image.fromarray(imgData)

In [None]:
# Demo: 5-row window drawn as OHLC bars only.
subset = df.iloc[500:505]
img = generate_image(
    subset,
    image_height_5d,
    0,  # no volume panel
    0,  # hence no gap either
)
img.save('demo-5d-ohlc.png')

In [None]:
# Demo: 5-row window with OHLC bars, gap and volume panel.
# The OHLC panel takes whatever height is left after volume and gap.
subset = df.iloc[500:505]
img = generate_image(
    subset,
    image_height_5d - volume_image_height_5d - gap_height_5d,
    volume_image_height_5d,
    gap_height_5d,
)
img.save('demo-5d-ohlc-volume.png')

In [None]:
# Demo: 20-row window drawn as OHLC bars only.
subset = df.iloc[480:500]
img = generate_image(
    subset,
    image_height_20d,
    0,  # no volume panel
    0,  # hence no gap either
)
img.save('demo-20d-ohlc.png')

In [None]:
# Demo: 20-row window with OHLC bars, gap and volume panel.
# The OHLC panel takes whatever height is left after volume and gap.
subset = df.iloc[480:500]
img = generate_image(
    subset,
    image_height_20d - volume_image_height_20d - gap_height_20d,
    volume_image_height_20d,
    gap_height_20d,
)
img.save('demo-20d-ohlc-volume.png')

In [None]:
# Demo: 60-row window drawn as OHLC bars only.
subset = df.iloc[1000:1060]
img = generate_image(
    subset,
    image_height_60d,
    0,  # no volume panel
    0,  # hence no gap either
)
img.save('demo-60d-ohlc.png')

In [None]:
# Demo: 60-row window with OHLC bars, gap and volume panel.
# The OHLC panel takes whatever height is left after volume and gap.
subset = df.iloc[1000:1060]
img = generate_image(
    subset,
    image_height_60d - volume_image_height_60d - gap_height_60d,
    volume_image_height_60d,
    gap_height_60d,
)
img.save('demo-60d-ohlc-volume.png')

In [None]:
# Demo: 252-row window drawn as OHLC bars only.
subset = df.iloc[1000:1252]
img = generate_image(
    subset,
    image_height_252d,
    0,  # no volume panel
    0,  # hence no gap either
)
img.save('demo-252d-ohlc.png')

In [None]:
# Demo: 252-row window with OHLC bars, gap and volume panel.
# The OHLC panel takes whatever height is left after volume and gap.
subset = df.iloc[1000:1252]
img = generate_image(
    subset,
    image_height_252d - volume_image_height_252d - gap_height_252d,
    volume_image_height_252d,
    gap_height_252d,
)
img.save('demo-252d-ohlc-volume.png')

In [None]:
# generate images dataset with labels

def feature_engineering(
        dataset: pd.DataFrame, # expect to load the whole dataset (clean data, no null)
        lookback_days: int, # how many days are included in the image (eg: 5, 20, 60, 252)
        return_days: int, # which return value is used for the label (5 or 20 or 60 only)
        ohlc_image_height: int,
        volume_image_height: int, # put 0 if we don't want volume barchart
        gap_height: int, # put 0 if we don't want volume barchart
        label_negitive_margin: float,
        label_positive_margin: float,
        path: str,
    ):
    """Generate one labelled chart image per sliding window of ``dataset``.

    For every start position ``i`` the rows ``i .. i + lookback_days - 1``
    are rendered with ``generate_image`` and saved as ``<path>/<i>.png``.
    The label is derived from the ``return_{return_days}_value`` of the last
    row in the window:

    * 0 - value <  ``label_negitive_margin``
    * 1 - ``label_negitive_margin`` <= value < 0
    * 2 - 0 <= value < ``label_positive_margin``
    * 3 - value >= ``label_positive_margin``

    Processing stops at the first window that is shorter than
    ``lookback_days`` or whose return value is missing.

    Returns
    -------
    DataFrame with the columns ``id`` (window start position), ``img_data``
    (image as a numpy array), ``return_value`` and ``label``. The same
    table is written to ``<path>/meta.csv``.

    Raises
    ------
    ValueError
        If ``return_days`` is not 5, 20 or 60, or ``lookback_days`` < 1.
    """
    if return_days not in [5, 20, 60]:
        raise ValueError(f'Invalid return_days: {return_days}')
    if lookback_days < 1:
        raise ValueError(f'lookback_days must be at least 1, got {lookback_days}')

    return_column = f'return_{return_days}_value'
    out_dir = Path(path)
    # make sure the target directory exists before the first image is saved
    out_dir.mkdir(parents=True, exist_ok=True)

    meta = {
        'id': [],
        'img_data': [],
        'return_value': [],
        'label': []
    }

    # a simple function to map return value to label class
    def label_funt(value: float):
        if value < label_negitive_margin:
            return 0
        if value < 0:
            return 1
        if value < label_positive_margin:
            return 2
        return 3

    for i in range(len(dataset)):
        subset = dataset.iloc[i:i + lookback_days] # retrieve lookback data
        # break if the subset does not have enough data (almost the end of the raw data)
        if len(subset) != lookback_days:
            print(f'End at i: {i} - len(subset) != lookback_days')
            break
        # the label comes from the return following the last day shown in the image
        return_value = subset.iloc[-1][return_column]
        # pd.isna also copes with None / object values, where np.isnan raises
        if pd.isna(return_value):
            print(f'End at i: {i} - return value nan')
            break
        img = generate_image(subset, ohlc_image_height, volume_image_height, gap_height)
        img.save(out_dir / f'{i}.png')

        meta['id'].append(i)
        meta['img_data'].append(np.array(img))
        meta['return_value'].append(return_value)
        meta['label'].append(label_funt(float(return_value)))

    meta_df = pd.DataFrame(meta)
    # NOTE(review): img_data holds numpy arrays, which end up in the CSV as
    # their text representation only - presumably the PNG files are the
    # intended way to reload the pixels; confirm with the consumers.
    meta_df.to_csv(out_dir / 'meta.csv')
    return meta_df

In [None]:
# Build the demo image set and its metadata from a 1000-row slice.
test_set_raw = df.iloc[1500:2500]
path = '../../../images/demo'
Path(path).mkdir(parents=True, exist_ok=True)
# NOTE(review): return values are fractions (close ratio - 1), so margins of
# -0.5 / 0.5 correspond to -50% / +50% - confirm these thresholds are intended.
test_set = feature_engineering(
    test_set_raw,
    20,    # lookback_days
    20,    # return_days
    image_height_20d - volume_image_height_20d - gap_height_20d,
    volume_image_height_20d,
    gap_height_20d,
    -0.5,  # label_negitive_margin
    0.5,   # label_positive_margin
    path,
)

In [None]:
# Preview the first rows of the metadata table; as the last expression of the
# cell it is rendered with the notebook's rich display instead of plain text.
test_set.head()