# Import

In [None]:

!python3 -m pip install tensorflow==2.13.0rc1
!python3 -m pip install finta
!python3 -m pip install -q --upgrade keras-nlp
!python3 -m pip install numpy
!python3 -m pip install pandas
!python3 -m pip install matplotlib
!python3 -m pip install plotly==5.3.1
!python3 -m pip install opencv-python
!python3 -m pip install -U kaleido
!python3 -m pip install autokeras
!python3 -m pip install keras-tuner

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from tensorflow.keras.models import load_model
import keras_tuner
import pandas as pd
import time
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from PIL import Image
import io
from finta import TA
import os
import autokeras as ak

np.random.seed(0)
tf.random.set_seed(0)

print(tf.version.VERSION)
print(ak.__version__)
print(keras_tuner.__version__)

## create metrics

In [None]:
def win_rate(y_true, y_pred):

    # convert y_pred to 0 or 1
    y_pred = tf.where(y_pred < 0.5, 0, 1)

    # convert the values to float32
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    correct_count = tf.math.reduce_sum(tf.where(tf.math.logical_and(tf.math.equal(y_true, y_pred), tf.math.not_equal(y_pred, 0)), 1.0, 0.0))
    total_count = tf.math.reduce_sum(tf.where(tf.math.not_equal(y_pred, 0), 1.0, 0.0))
    win_rate = tf.math.divide_no_nan(correct_count, total_count)
    return win_rate

# Evaluate the model

In [None]:
# if the path "evaluation" does not exist, create it
if not os.path.exists("evaluation"):
    os.makedirs("evaluation")

def create_evaluation_folder(path):
    # if "evaluation/path" does not exist, create it
    if not os.path.exists("evaluation/" + path):
        os.makedirs("evaluation/" + path)

def record_stat(pair,stat):
    # create the stat txt file if it does not exist
    if not os.path.exists("evaluation/" + pair + "/stats.txt"):
        with open("evaluation/" + pair + "/stats.txt", "w") as f:
            f.write("")
    # add the stat as a new line in the txt file
    with open("evaluation/" + pair + "/stats.txt", "a") as f:
        f.write(str(stat) + "\n")

def record_graph(pair,graph_name,graph):
    # save the graph as a png
    plt.savefig("evaluation/" + pair + "/" + graph_name + ".png", format='png')

def predicted_wins_and_losses_amount(pair,overall_predictions):
    # the amount of wins and losses predicted
    win_count = np.count_nonzero(overall_predictions == 1)
    loss_count = np.count_nonzero(overall_predictions == 0)
    # record these stats
    record_stat(pair,"amount of wins predicted = " + str(win_count))
    record_stat(pair,"amount of losses predicted = " + str(loss_count))

def accuracy(pair,overall_predictions,results):
    correct_count = np.count_nonzero(overall_predictions == results)
    total_count = len(results)
    accuracy = correct_count / total_count
    record_stat(pair,"accuracy = " + str(accuracy))

def win_rate(pair,overall_predictions,results):
    # calculate the win rate where overall_predictions == 1 and results == 1
    wins = 0
    amount_of_trades = 0
    for i in range(len(overall_predictions)):
        if overall_predictions[i] == 1:
            amount_of_trades += 1
            if results[i] == 1:
                wins += 1
    try:
        win_rate = wins / amount_of_trades
    except:
        win_rate = 0
    record_stat(pair,"win rate = " + str(win_rate))

def accuracy_confidence_level(pair,predictions,overall_predictions,results):
    # confidence level for the accuracy
    win_confidence_tracker = []
    loss_confidence_tracker = []
    for i in range(len(predictions)):
        confidence = ((0.5-predictions[i]) * 2) if overall_predictions[i] == 0 else ((predictions[i]-0.5) * 2)
        if overall_predictions[i] == results[i]:
            win_confidence_tracker.append(confidence)
        else:
            loss_confidence_tracker.append(confidence)
    plt.clf()
    plt.hist(win_confidence_tracker, bins=100, alpha=0.5, label='Correct')
    plt.hist(loss_confidence_tracker, bins=100, alpha=0.5, label='Incorrect')
    plt.legend(loc='upper right')
    record_graph(pair,"model_accuracy_confidence",plt)

def win_rate_confidence_level(pair,predictions,overall_predictions,results):
    # confidence level for the win rate
    win_confidence_tracker = []
    loss_confidence_tracker = []
    # for each prediction, if the prediction is correct, add the confidence to win_confidence_tracker, else add the confidence to loss_confidence_tracker
    for i in range(len(predictions)):
        if overall_predictions[i] != 0:
            confidence = ((0.5-predictions[i]) * 2) if overall_predictions[i] == 0 else ((predictions[i]-0.5) * 2)
            if overall_predictions[i] == results[i]:
                win_confidence_tracker.append(confidence)
            else:
                loss_confidence_tracker.append(confidence)
    plt.clf()
    plt.hist(win_confidence_tracker, bins=100, alpha=0.5, label='Correct')
    plt.hist(loss_confidence_tracker, bins=100, alpha=0.5, label='Incorrect')
    plt.legend(loc='upper right')
    record_graph(pair,"model_winrate_confidence",plt)

def win_rate_confidence_level_with_RR(pair,predictions,overall_predictions,results):
    # confidence level for the win rate including the risk to reward ratio
    win_confidence_tracker = []
    loss_confidence_tracker = []
    # for each prediction, if the prediction is correct, add the confidence to win_confidence_tracker, else add the confidence to loss_confidence_tracker
    for i in range(len(predictions)):
        if overall_predictions[i] != 0:
            confidence = ((0.5-predictions[i]) * 2) if overall_predictions[i] == 0 else ((predictions[i]-0.5) * 2)
            if overall_predictions[i] == results[i]:
                win_confidence_tracker.append(confidence)
                win_confidence_tracker.append(confidence)
            else:
                loss_confidence_tracker.append(confidence)
    plt.clf()
    plt.hist(win_confidence_tracker, bins=100, alpha=0.5, label='Correct')
    plt.hist(loss_confidence_tracker, bins=100, alpha=0.5, label='Incorrect')
    plt.legend(loc='upper right')
    record_graph(pair,"model_winrate_confidence_RR",plt)

minimum_confidence = 0.9
def win_rate_confidence_level_with_RR_over_90_confidence(pair,predictions,overall_predictions,results):
    win_confidence_tracker = []
    loss_confidence_tracker = []
    wins = 0
    amount_of_trades = 0
    # for each prediction, if the prediction is correct, add the confidence to win_confidence_tracker, else add the confidence to loss_confidence_tracker
    for i in range(len(predictions)):
        if overall_predictions[i] != 0:
            confidence = ((0.5-predictions[i]) * 2) if overall_predictions[i] == 0 else ((predictions[i]-0.5) * 2)
            if confidence >= minimum_confidence:
                amount_of_trades += 1
                if overall_predictions[i] == results[i]:
                    win_confidence_tracker.append(confidence)
                    win_confidence_tracker.append(confidence)
                    wins += 1
                else:
                    loss_confidence_tracker.append(confidence)
    plt.clf()
    plt.hist(win_confidence_tracker, bins=100, alpha=0.5, label='Correct')
    plt.hist(loss_confidence_tracker, bins=100, alpha=0.5, label='Incorrect')
    plt.legend(loc='upper right')
    record_graph(pair,"win_rate_confidence_level_with_RR_over_90_confidence",plt)
    try:
        win_rate = wins / amount_of_trades
    except:
        win_rate = 0
    record_stat(pair,"win rate using over 0.9 confidence = " + str(win_rate))

def over_time_balance_graph(pair,overall_predictions,results):
    trade_tracker = [0]
    for i in range(len(predictions)):
        if overall_predictions[i] != 0:
            confidence = ((0.5-predictions[i]) * 2) if overall_predictions[i] == 0 else ((predictions[i]-0.5) * 2)
            if confidence >= minimum_confidence:
                if overall_predictions[i] == results[i]:
                    trade_tracker.append(trade_tracker[-1] + 2)
                else:
                    trade_tracker.append(trade_tracker[-1] - 1)
    plt.clf()
    plt.plot(trade_tracker)
    record_graph(pair,"over_time_balance_graph",plt)
    record_stat(pair,"amount_of_trades = " + str(len(trade_tracker)-1))
    record_stat(pair,"total_oppertunities = " + str(len(predictions)))

def evaluation(pair,predictions,overall_predictions,results):
    create_evaluation_folder(pair)
    predicted_wins_and_losses_amount(pair,overall_predictions)
    accuracy(pair,overall_predictions,results)
    win_rate(pair,overall_predictions,results)
    accuracy_confidence_level(pair,predictions,overall_predictions,results)
    win_rate_confidence_level(pair,predictions,overall_predictions,results)
    win_rate_confidence_level_with_RR(pair,predictions,overall_predictions,results)
    win_rate_confidence_level_with_RR_over_90_confidence(pair,predictions,overall_predictions,results)
    over_time_balance_graph(pair,overall_predictions,results)


# Make predictions on test data

In [None]:
# load the best model
clf = load_model("model/best_model/", custom_objects={"win_rate": win_rate})

# remove .DS_Store from "image_data/validation"
if os.path.exists("image_data/validation/.DS_Store"):
    os.remove("image_data/validation/.DS_Store")

# for each folder in image_data/validation
for folder in os.listdir("image_data/validation"):
    pair = folder
    predictions = []
    results = []
    files = os.listdir("image_data/validation/" + folder + "/win") + os.listdir("image_data/validation/" + folder + "/loss")
    files.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))

    # reduce the amount of files to 10
    # files = files[:10]

    for image in files:
        win = 0
        try:
            image = tf.keras.preprocessing.image.load_img(
                "image_data/validation/" + folder + "/win/" + image, target_size=(200, 50), color_mode="grayscale"
            )
            win = 1
        except:
            image = tf.keras.preprocessing.image.load_img(
                "image_data/validation/" + folder + "/loss/" + image, target_size=(200, 50), color_mode="grayscale"
            )

        image = tf.keras.preprocessing.image.img_to_array(image)
        image = np.expand_dims(image, axis=0)
        prediction = clf.predict(image, verbose=0)
        predictions.append(prediction[0][0])
        results.append(win)
        # print the percentage of images that have been predicted on the same line
        print(str(round(((len(predictions) / len(files))/len(os.listdir("image_data/validation"))) * 100, 2)) + "%", end="\r")
    # convert predictions to np array
    predictions = np.array(predictions)
    # create a list overall_predictions that converts the predictions to 0 or 1
    overall_predictions = np.where(predictions < 0.5, 0, 1)
    # evaluate the model
    evaluation(pair,predictions,overall_predictions,results)