In [1]:
import pandas as pd
import numpy as np
import matplotlib.finance as finplt
import matplotlib.pyplot as plt

from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw
from os import listdir
import datetime

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.utils import to_categorical
from keras.layers.advanced_activations import LeakyReLU

from sklearn.model_selection import train_test_split

%matplotlib inline

Using TensorFlow backend.


In [2]:
# Load the 4-hour EURUSD bid candles. The first column is used as the index and
# only its first 13 characters ("dd.mm.YYYY HH") are parsed into a timestamp.
# datetime.datetime is called directly because the pd.datetime alias was
# deprecated and later removed from pandas.
df = pd.read_csv(r'dukascopy - EURUSD_Candlestick_4_Hour_BID_31.12.2015-30.12.2016.csv',
                 parse_dates=[0], index_col=0,
                 date_parser=lambda d: datetime.datetime.strptime(d[:13], '%d.%m.%Y %H'))

# Skip the first 40 candles.
# NOTE(review): the reason for the offset of 40 is not visible in this notebook - confirm.
df_window = df.iloc[40:, :]

In [3]:
# Load the per-candle labels, indexed by timestamp (first 13 characters,
# "YYYY-mm-dd HH"). datetime.datetime replaces the pd.datetime alias, which was
# deprecated and later removed from pandas.
y = pd.read_csv(r'y candles.csv',
                parse_dates=[0], index_col=0,
                date_parser=lambda d: datetime.datetime.strptime(d[:13], '%Y-%m-%d %H'))

In [4]:
# Keep only the candles whose timestamp also appears in the label table `y`.
# Index.isin builds the boolean mask in one vectorised call instead of a
# Python-level membership test per row.
ind = df_window.index.isin(y.index)
df_window = df_window.loc[ind]

In [5]:
# Preview the last rows of the filtered window.
df_window.tail()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Gmt time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2016-07-08 00:00:00,1.10634,1.10815,1.10622,1.10814,17745.2401
2016-07-08 04:00:00,1.10816,1.10899,1.10722,1.10764,28404.7499
2016-07-08 08:00:00,1.10764,1.10811,1.10572,1.10627,37149.1504
2016-07-08 12:00:00,1.10626,1.11201,1.10018,1.1036,78897.1592
2016-07-08 16:00:00,1.10361,1.10594,1.10352,1.10515,26419.8901


### Create image files
Skip this section if the candle images already exist in `./candles/`; running it again re-renders and re-saves every image.

In [6]:
# Save plots for individual candlesticks in window
#
# Each row of df_window is drawn as a single candle on a tiny axis-free figure
# and written to ./candles/<timestamp>h.jpg.

# Figure size (inches) is set once; plt.subplots() below picks it up.
plt.rcParams['figure.figsize'] = (0.4, 0.8)

for i in range(len(df_window)):
    fig, ax = plt.subplots()
    try:
        to_plot = df_window.iloc[i:i+1]
        finplt.candlestick2_ohlc(ax, to_plot.Open, to_plot.High, to_plot.Low, to_plot.Close,
                                 width=0.6, colorup='g', colordown='r', alpha=1)
        ax.axis('off')
        # str(timestamp)[:-6] drops the trailing ":MM:SS" part
        fig.savefig('./candles/' + str(df_window.index[i])[:-6] + 'h.jpg')
    except Exception as exc:
        # Still best-effort, but report the failure instead of hiding it
        # (a bare except would also swallow KeyboardInterrupt).
        print('Skipping candle ' + str(df_window.index[i]) + ': ' + repr(exc))
    finally:
        # Always release the figure, even when plotting or saving failed.
        plt.close(fig)

In [7]:
# Convert plots to greyscale and Keras-ready
#
# Every file in ./candles/ is rewritten in place as a single-channel image.

files = sorted(listdir('./candles/'))

for file in files:
    path = './candles/' + file
    # convert('L') already produces 8-bit greyscale pixels, so the former
    # float64 -> uint8 -> Image.fromarray round trip added nothing.
    with Image.open(path) as colour:
        grey = colour.convert('L')
    # Save only after the source file handle has been closed.
    grey.save(path)

### Let us have a CNN

In [5]:
# Build X: one array holding every candle image, in sorted filename order.

files = sorted(listdir('./candles/'))

# One 2-D pixel array per image, stacked along a new leading axis.
pixels = np.stack([np.array(Image.open('./candles/' + name)) for name in files])

# Cast to float32 and scale pixel values by 1/255.
X = pixels.astype('float32') / 255

# Add the trailing channel axis expected by Conv2D.
# NOTE(review): np.array(PIL image) is laid out (height, width); if the saved
# images are 28 px wide and 57 px tall, this reshape reinterprets the buffer
# rather than transposing it - confirm the intended axis order.
X = X.reshape(len(files), 28, 57, 1)
X.shape

(603, 28, 57, 1)

In [6]:
# One-hot encode the 'Category' column.
# num_classes is pinned to 8 so the width always matches the 8-unit softmax
# output layer, even if the highest category is absent from the labels.
# The isinstance guard keeps the cell re-runnable: `y` is rebound from the
# label DataFrame to the one-hot array, so a second run must not index it again.
if isinstance(y, pd.DataFrame):
    y = to_categorical(y['Category'], num_classes=8)

In [7]:
# Train on the complete labelled set; nothing is held back in this cell.
# (Disabled alternative: first 35 samples for training, the rest for testing:
#   X_train, y_train = X[:35], y[:35]
#   X_test,  y_test  = X[35:], y[35:])
X_train, y_train = X, y

In [None]:
# Baseline CNN: one convolution + pooling stage feeding three dense layers.
# no good? use more advanced CNN model following below

model = Sequential([
    Conv2D(32, (4, 4), activation='relu', input_shape=(28, 57, 1)),
    MaxPooling2D(pool_size=(2, 2)),
    # Dropout(0.15) at this position is currently disabled.
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.1),
    Dense(128, activation='relu'),
    Dropout(0.1),
    Dense(64, activation='relu'),
    # One output unit per candle category.
    Dense(8, activation='softmax'),
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=25, batch_size=32, verbose=1)

# Evaluation is currently disabled:
# score = model.evaluate(X_test, y_test, verbose=0)

In [8]:
# Deeper CNN: three conv -> LeakyReLU -> max-pool stages (32, 64, 128 filters),
# then a 128-unit dense layer and an 8-way softmax.
model = Sequential()

for depth, n_filters in enumerate((32, 64, 128)):
    # Only the first layer of a Sequential model declares the input shape.
    first_layer_args = {'input_shape': (28, 57, 1)} if depth == 0 else {}
    model.add(Conv2D(n_filters, kernel_size=(3, 3), activation='linear',
                     padding='same', **first_layer_args))
    # The convolution is linear; the non-linearity is this separate layer.
    model.add(LeakyReLU(alpha=0.1))
    model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Flatten())
model.add(Dense(128, activation='linear'))
model.add(LeakyReLU(alpha=0.1))
model.add(Dense(8, activation='softmax'))

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=25, batch_size=32, verbose=1)

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


<keras.callbacks.History at 0x7fa9b9d15e48>

Categories:
```
0: "No category",
1: "Hammer with body near high",
2: "Hammer with body near low",
3: "Spinning top",
4: "Doji with close near high",
5: "Doji with close near low",
6: "Doji with close near middle",
7: "Marubozu"
```

In [None]:
# Predicted class probabilities: one row per image, one column per category.
ey = model.predict(X)
# Minimum probability for a pattern category to count as detected.
tol = 0.45

# Indices of the samples where any pattern category (columns 1-7; column 0 is
# "No category") reaches the tolerance. Computed in one vectorised pass rather
# than growing an array with np.append inside the loop.
predictions = np.flatnonzero((ey[:, 1:8] >= tol).any(axis=1))

for i in predictions:
    print(str(i) + ': ' + str(np.round(ey[i], 2)))

In [None]:
# For every flagged sample, show its index next to its one-hot label row.
for sample in predictions:
    print(sample, y[sample], sep=': ')

In [None]:
# Indices of the samples whose label is a real pattern (categories 1-7).
# Category 7 is included so this list covers the same categories as the
# `predictions` filter it is compared against.
# NOTE(review): confirm that leaving out category 7 was not intentional.
# flatnonzero also yields integer indices; the previous np.append onto an empty
# list produced a float array.
actuals = np.flatnonzero((y[:, 1:8] == 1).any(axis=1))

for i in actuals:
    print(str(i) + ': ' + str(y[i]))

In [None]:
# Compare the flagged samples with the labelled ones as sets of sample indices.
# A position-by-position comparison raises IndexError when `predictions` is
# shorter than `actuals`, and after the first extra or missing entry every
# later position differs, so it says nothing about which samples disagree.
predicted_set = set(int(p) for p in predictions)
actual_set = set(int(a) for a in actuals)

print('Labelled but not predicted:', sorted(actual_set - predicted_set))
print('Predicted but not labelled:', sorted(predicted_set - actual_set))

### For a test split, define bigger X_test set from df, and generate images for them. Predict their shape, and save images with text of what is predicted

In [17]:
# Slice of the raw candles used to inspect the model's predictions.
df_test = df.iloc[1000:1200, :]

# Save plots for individual candlesticks in window

# Figure size (inches) is set once; plt.subplots() below picks it up.
plt.rcParams['figure.figsize'] = (0.4, 0.8)

for i in range(len(df_test)):
    fig, ax = plt.subplots()
    try:
        to_plot = df_test.iloc[i:i+1]
        finplt.candlestick2_ohlc(ax, to_plot.Open, to_plot.High, to_plot.Low, to_plot.Close,
                                 width=0.6, colorup='g', colordown='r', alpha=1)
        ax.axis('off')
        # str(timestamp)[:-6] drops the trailing ":MM:SS" part
        fig.savefig('./test/' + str(df_test.index[i])[:-6] + 'h.jpg')
    except Exception as exc:
        # Still best-effort, but report the failure instead of hiding it
        # (a bare except would also swallow KeyboardInterrupt).
        print('Skipping candle ' + str(df_test.index[i]) + ': ' + repr(exc))
    finally:
        # Always release the figure, even when plotting or saving failed.
        plt.close(fig)

# NOTE(review): listdir returns every file in ./test/, including images left
# over from earlier runs (the recorded output shows 338 images for this 200-row
# slice) - confirm that is intended.
test_files = sorted(listdir('./test/'))

# Convert plots to greyscale and Keras-ready

for file in test_files:
    path = './test/' + file
    # convert('L') already produces 8-bit greyscale pixels; no numpy round trip needed.
    with Image.open(path) as colour:
        grey = colour.convert('L')
    # Save only after the source file handle has been closed.
    grey.save(path)

# Define and preprocess X multi-dimensional array of all images

X_test = np.stack([np.array(Image.open('./test/' + file)) for file in test_files])
# Cast to float32 and scale pixel values by 1/255.
X_test = X_test.astype('float32') / 255
# Same reshape as used for the training array.
X_test = X_test.reshape(X_test.shape[0], 28, 57, 1)
print(X_test.shape)

# Despite the name, y_test holds the model's predicted class probabilities.
y_test = model.predict(X_test)

(338, 28, 57, 1)


In [18]:
# Category index -> human-readable pattern name.
cat = {1: 'Hammer with body near high', 2: 'Hammer with body near low', 3: 'Spinning top',
       4: 'Doji with close near high', 5: 'Doji with close near low', 6: 'Doji with close near middle',
       7: 'Marubozu', 0: 'No category'}

# Name of the most probable category for each test image.
text = [cat[int(np.argmax(probs))] for probs in y_test]

# Loop-invariant settings: load the font once instead of once per image.
font = ImageFont.truetype("font.ttf", 14)
basewidth = 95

for file, label in zip(test_files, text):
    candle = Image.open('./test/' + file)
    # White greyscale canvas, 193 px wide and 200 px tall.
    canvas = Image.new('L', (193, 200), 255)

    # Scale the candle to `basewidth` pixels wide, keeping its aspect ratio.
    wpercent = basewidth / float(candle.size[0])
    hsize = int(float(candle.size[1]) * wpercent)
    # Image.LANCZOS is the same filter as Image.ANTIALIAS, which was removed in Pillow 10.
    candle = candle.resize((basewidth, hsize), Image.LANCZOS)
    canvas.paste(candle, (0, 0))

    # Write the predicted category in black at the top-left corner.
    ImageDraw.Draw(canvas).text((0, 0), label, 0, font)
    canvas.save('./test_result/' + file)

In [None]:
# List the test samples where any of categories 1-6 reaches the tolerance `tol`,
# printing the probabilities rounded to two decimals.
# NOTE(review): category 7 (Marubozu) is not checked here - confirm intent.
for idx, probs in enumerate(y_test):
    if (probs[1:7] >= tol).any():
        print(str(idx) + ': ' + str(np.round(probs, 2)))

### Save the model
For loading, use
```
from keras.models import load_model
model = load_model('An eye for an eye - a CNN model.h5')
```

In [19]:
# Persist the trained model to disk.
model.save(r'An eye for an eye - a CNN model.h5')  # creates the HDF5 file 'An eye for an eye - a CNN model.h5'