# Detecting EMA cross traps by using NEAT

### Import Library

In [1]:
import numpy as np
import pandas as pd
import numpy as np
import pandas_ta as ta
import seaborn as sns
import os

import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [12, 6]
plt.rcParams['figure.dpi'] = 120
import warnings
warnings.filterwarnings('ignore')

In [2]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import neat

### Load Price Data

In [3]:
import os
from pathlib import Path
notebook_path = os.getcwd()
current_dir = Path(notebook_path)
csv_file = str(current_dir.parent) + '/VN30F1M_5minutes.csv'
is_file = os.path.isfile(csv_file)
if is_file:
    dataset = pd.read_csv(csv_file, index_col='Date', parse_dates=True)
else:
    print(csv_file)
    print('remote')
    dataset = pd.read_csv("https://raw.githubusercontent.com/zuongthaotn/vn-stock-data/main/VN30ps/VN30F1M_5minutes.csv", index_col='Date', parse_dates=True)

In [4]:
data = dataset.copy()

In [5]:
data = data[data.index > '2020-11-01 00:00:00']

In [6]:
data["ema_fast"] = ta.ema(data["Close"], length=20)
data["ema_low"] = ta.ema(data["Close"], length=250)
data["ema_cross"] = ((data["ema_fast"] > data["ema_low"]) & (data["ema_fast"].shift(1) <= data["ema_low"].shift(1)) | (data["ema_fast"] < data["ema_low"]) & (data["ema_fast"].shift(1) >= data["ema_low"].shift(1)))

## Calculate some common features

In [7]:
data["ATR"] = ta.atr(data["High"], data["Low"], data["Close"], length=14)  # Volatility
data["RSI"] = ta.rsi(data["Close"], length=14)  # Momentum indicator

## TRAP labeling

In [8]:
def is_trap(r):
    trap = ''
    if r['ema_cross'] == True:
        if r['ema_fast'] > r['ema_low']:
            # Cross up
            if r['min_low_1dlater'] < r['Close'] - 3.5:
                trap = 1
            else:
                trap = 0
        else:
            # Cross down
            if r['max_high_1dlater'] > r['Close'] + 3.5:
                trap = 1
            else:
                trap = 0
    return trap

In [9]:
data['max_high_1dlater'] = data['High'].shift(-51).rolling(51).max()
data['min_low_1dlater'] = data['Low'].shift(-51).rolling(51).min()
data['trap'] = data.apply(lambda r: is_trap(r), axis=1)

In [10]:
# cross_data = data[data.ema_cross == True]
# len(cross_data[cross_data.trap == 0]) / len(cross_data['trap'])

## Features

In [11]:
data['max_5'] = data['High'].shift(1).rolling(5).max()
data['max_10'] = data['High'].shift(1).rolling(10).max()
data['max_20'] = data['High'].shift(1).rolling(20).max()
data['max_50'] = data['High'].shift(1).rolling(50).max()
data['min_5'] = data['Low'].shift(1).rolling(5).min()
data['min_10'] = data['Low'].shift(1).rolling(10).min()
data['min_20'] = data['Low'].shift(1).rolling(20).min()
data['min_50'] = data['Low'].shift(1).rolling(50).min()
cross_up_data = data[(data.ema_cross == True) & (data.ema_fast > data.ema_low)]
cross_up_data.dropna(inplace=True)

In [12]:
len(cross_up_data)

181

In [13]:
X = cross_up_data[['max_5', 'max_10', 'max_20', 'max_50', 'min_5', 'min_10', 'min_20', 'min_50', 'Close', "trap"]]

# Train-Test Split
X_train, X_test = train_test_split(X, test_size=0.25, random_state=42)

In [14]:
len(X_train)

135

In [15]:
len(X_test)

46

In [16]:
def eval_genomes(genomes, config):
    for genome_id, genome in genomes:
        genome.fitness = 4.0
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        for move_index, row in X_train.iterrows():
            inputs = [row['max_5'], row['max_10'], row['max_20'], row['max_50'], row['min_5'], row["min_10"], row["min_20"], row["min_50"], row["Close"]]
            expected_output = row['trap']
            output = net.activate(inputs)
            genome.fitness -= (output[0] - expected_output) ** 2


def run(config_file):
    # Load configuration.
    config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                         neat.DefaultSpeciesSet, neat.DefaultStagnation,
                         config_file)

    # Create the population, which is the top-level object for a NEAT run.
    p = neat.Population(config)

    # Add a stdout reporter to show progress in the terminal.
    # p.add_reporter(neat.StdOutReporter(True))
    # stats = neat.StatisticsReporter()
    # p.add_reporter(stats)

    # Run for up to 100 generations.
    winner = p.run(eval_genomes, 100)

    # Display the winning genome.
    print('\nBest genome:\n{!s}'.format(winner))
    return neat.nn.FeedForwardNetwork.create(winner, config)

In [17]:
%%time
config_path = os.path.join(current_dir, 'style-mix-1.cfg')
best_brain = run(config_path)


Best genome:
Key: 11818
Fitness: -26.636905284468224
Nodes:
	0 DefaultNodeGene(key=0, bias=0.1251539056674364, response=1.0, activation=sigmoid, aggregation=sum)
	1170 DefaultNodeGene(key=1170, bias=3.579639197725395, response=1.0, activation=sigmoid, aggregation=sum)
	1825 DefaultNodeGene(key=1825, bias=-1.3072526637417603, response=1.0, activation=sigmoid, aggregation=sum)
	1849 DefaultNodeGene(key=1849, bias=-2.105861080798479, response=1.0, activation=sigmoid, aggregation=sum)
Connections:
	DefaultConnectionGene(key=(-6, 0), weight=-1.6271834508356111, enabled=False)
	DefaultConnectionGene(key=(-6, 1170), weight=-0.0027919498146826704, enabled=True)
	DefaultConnectionGene(key=(-2, 1849), weight=0.48518399138702295, enabled=True)
	DefaultConnectionGene(key=(1170, 0), weight=1.3287850027837258, enabled=False)
	DefaultConnectionGene(key=(1170, 1825), weight=-0.6497542247250939, enabled=True)
	DefaultConnectionGene(key=(1825, 0), weight=0.03571812400749974, enabled=True)
	DefaultConne

In [18]:
best_brain

<neat.nn.feed_forward.FeedForwardNetwork at 0x75337f832ed0>

In [19]:
# Show output of the most fit genome against training data.
outputs = []
for i, row in X_test.iterrows():
    inputs = [row['max_5'], row['max_10'], row['max_20'], row['max_50'], row['min_5'], row["min_10"], row["min_20"], row["min_50"], row["Close"]]
    expected_output = row['trap']
    output = best_brain.activate(inputs)
    outputs.append(round(output[0]))
    # print("input {!r}, expected output {!r}, got {!r}".format(inputs, expected_output, output))

In [20]:
expected_outputs = X_test['trap'].to_list()
# Evaluate Performance
print("Accuracy:", accuracy_score(expected_outputs, outputs))

Accuracy: 0.7391304347826086
