In [1]:
from typing import List, Tuple, Any
from datetime import datetime
import os
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
import gym
from gym import wrappers
import matplotlib.pyplot as plt
from pprint import pprint
from datetime import datetime, timedelta

In [2]:

def get_unix(date: str, time: str) -> int:
    format_date = "%d/%m/%Y"
    format_time = "%H:%M:%S.%f000"
    date_obj = datetime.strptime(date, format_date)
    time_obj = datetime.strptime(time, format_time)
    almost = datetime.combine(date_obj.date(), time_obj.time())
    return int(almost.timestamp())
class Tick:
    def __init__(self, line: str) -> None:
        splitted_line = line.split(',')
        self.date = splitted_line[0]
        self.time = splitted_line[1]
        self.bid = float(splitted_line[2].strip())
        self.ask = float(splitted_line[4].strip())
        self.unix_code = get_unix(splitted_line[0], splitted_line[1])
class Strategy:
    def _init_(self) -> None:
        self.num_strats = 4

    def bounce(self, prices: List[float]) -> bool:
        """
        Check if the average of the last 10 prices is greater than the average of the previous 10 prices.

        Parameters:
        prices (List[float]): List of price values.

        Returns:
        bool: True if the average of the last 10 prices is greater, False otherwise.
        """
        if np.mean(prices[-20:-10]) < np.mean(prices[-10:]):
            return True
        else:
            return False

    def calculate_ema_incremental(self, last_ema: float, new_price: float, alpha: float) -> float:
        """
        Calculate the EMA incrementally given the last EMA, the new price, and alpha.

        Parameters:
        last_ema (float): The last calculated EMA value.
        new_price (float): The new price to include in the EMA calculation.
        alpha (float): The smoothing factor.

        Returns:
        float: The updated EMA value.
        """
        return (new_price * alpha) + (last_ema * (1 - alpha))

    def calculate_macd_incremental(self, prices: List[float], short_period: int, long_period: int, signal_period: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Calculate the MACD and signal line incrementally.

        Parameters:
        prices (np.ndarray): Array of prices.

        Returns:
        Tuple[np.ndarray, np.ndarray]: The MACD values and the signal line values.
        """
        alpha_short = 2 / (short_period + 1)
        alpha_long = 2 / (long_period + 1)
        alpha_signal = 2 / (signal_period + 1)

        short_ema = np.zeros(len(prices))
        long_ema = np.zeros(len(prices))
        macd = np.zeros(len(prices))
        signal_line = np.zeros(len(prices))

        short_ema[0] = prices[0]
        long_ema[0] = prices[0]
        macd[0] = 0
        signal_line[0] = 0

        for i in range(1, len(prices)):
            short_ema[i] = self.calculate_ema_incremental(short_ema[i-1], prices[i], alpha_short)
            long_ema[i] = self.calculate_ema_incremental(long_ema[i-1], prices[i], alpha_long)
            macd[i] = short_ema[i] - long_ema[i]
            signal_line[i] = self.calculate_ema_incremental(signal_line[i-1], macd[i], alpha_signal)

        return short_ema, long_ema, macd, signal_line

    def check_last_macd_crossover(self, price_array: np.ndarray) -> int:
        """
        Check if there is a crossover in the last MACD value and the signal line, and evaluate the strategy.

        Parameters:
        price_array (np.ndarray): Array of prices.

        Returns:
        int: -1 for buy signal, 1 for sell signal, 0 for no action.
        """
        short_period = int(50000 * 2.5 / 12)
        long_period = int(50000 * 2.5 / 26)
        signal_period = int(50000 * 2.5 / 9)

        _, _, macd, signal_line = self.calculate_macd_incremental(price_array, short_period, long_period, signal_period)

        if len(macd) > 1 and np.sign(macd[-2] - signal_line[-2]) != np.sign(macd[-1] - signal_line[-1]):
            if macd[-1] < 0:
                return -1  # Buy signal
            else:
                return 1  # Sell signal
        return 0  # No action
                
def compute_balance(orders_list: List[Any], tick: Tick) -> float:
    open_balance = 0
    for line in orders_list:
        if len(line) > 2:
            side = line[ 0 ]
            number_units = line[1]
            bid_price, ask_price = line[2], line[3]
            last_bid, last_ask = tick.bid, tick.ask
            profit_loss = 0
            if side == -1:
                profit_loss = number_units * last_bid - number_units * ask_price
            elif side == 1:
                profit_loss = number_units * bid_price - number_units * last_ask
            open_balance += profit_loss
    return open_balance * 1
def compute_drawdown(path_to_use: str, orders_list: List[Any], tick: Tick) -> float:
    if len(orders_list) > 1 :
        last_balance = compute_balance(orders_list, tick)
        balances = []
        with open(path_to_use, 'r') as file_reader:
            for line in file_reader:
                balances.append(float(line.split(':')[6]))
        balances.append(last_balance)
        max_drawdown = 0
        drawdown = 0
        peak = balances[0]
        for balance in balances:
            if balance > peak:
                peak = balance
            drawdown = peak - balance
            if drawdown > max_drawdown:
                max_drawdown = drawdown
        return max_drawdown
    else:
        return 0
def Register(path_to_use: str, line: List[Any]) -> None:
    with open(path_to_use, 'a') as file_in_process:
        for value in line:
            file_in_process.writelines(f'{value}:')
        file_in_process.writelines('\n')
 
def file_content(path_to_use: str) -> str:
    if os.path.exists(path_to_use):
        with open(path_to_use, 'r') as file_in_process:
            return file_in_process.read()
    else:
        return ""
def time_signal_detector( current_tick: Tick, previous_tick: Tick, pivot_unix: int ) -> bool:
    current_unix = current_tick.unix_code
    previous_unix = previous_tick.unix_code
    if current_unix <= pivot_unix:
        return True
    elif previous_unix <= pivot_unix < current_unix:
        return False
def State_detector(acumulated_side) -> bool:
    if acumulated_side >= 0 :
          return False
    else :
          return True
time_frames = [7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 
               98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 
               175, 182, 189, 196, 203, 210, 217, 224, 231, 238, 245,
               252, 259, 266, 273, 280, 287, 294, 301, 308, 315, 322,
               329, 336, 343, 350, 357, 364, 371, 378, 385, 392, 399,
               406, 413, 420, 427, 434, 441, 448, 455, 462, 469, 476,
               483, 490, 497, 504, 511, 518, 525, 532, 539, 546, 553,
               560, 567, 574, 581, 588, 595, 602, 609, 616, 623, 630,
               637, 644, 651, 658, 665, 672, 679, 686, 693]
dict_for_tres_hold = {  0.07: 'A',
                        0.08: 'B',
                        0.09: 'C',
                        0.1: 'D',
                        0.15: 'E',
                        0.16: 'F',
                        0.17: 'G',
                        0.18: 'H',
                        0.19: 'I',
                        0.2: 'J',
                        0.25: 'K' }
dict_for_decision =  {  1: 'sell',
                       -1: 'buy'  }
dict_for_time_frames = {}
for i,x in enumerate(time_frames):
    dict_for_time_frames[f'{x}'] = f'Tf_{i}'
def Process_day(
    path_of_current_day: str, 
    day: str, 
    folder_for_results: str, 
    Strat: Strategy,
    sum_side_by_day: int = 0
) -> str:
    list_for_balance = []
    prices = []
    saving_path = 'Ind_' + day
    id = 0

    with open(path_of_current_day, 'r') as current_day:
        current_day_list = current_day.readlines()
        for index in range(len(current_day_list) - 2):
            tick_line = Tick(current_day_list[index + 1])  # Assuming Tick is a class you have defined
            prices.append(tick_line.ask)
            side = Strat.check_last_macd_crossover(np.array(prices))

            sum_side_by_day += side
            if side != 0:
                list_for_balance.append([side, 1, tick_line.bid, tick_line.ask])
                P_L = compute_balance(list_for_balance, tick_line)  # Assuming compute_balance is defined elsewhere
                drawdown = compute_drawdown(saving_path, list_for_balance, tick_line)  # Assuming compute_drawdown is defined elsewhere
                Register(saving_path, [id, dict_for_decision[side], 1, tick_line.bid, tick_line.ask, P_L, drawdown])  # Assuming Register is defined elsewhere
                id += 1

    print(os.path.join(folder_for_results, saving_path))
    if os.path.exists(saving_path):
        with open(os.path.join(folder_for_results, saving_path), 'w') as exit_file:
            exit_file.write(file_content(saving_path))  # Assuming file_content is defined elsewhere

    return saving_path

In [3]:
folder = "E:\\B-Module\\TXTs"
folder_for_results = [ "E:\\B-Module\\results\\Indcts" ]
archives_in_folder = os.listdir( folder )
for archive_name in archives_in_folder:
    if archive_name.endswith('.txt') :
        complete_route = os.path.join(folder, archive_name)
        prices_for_saving =[ ]
        print(f'{complete_route}')
        Strat = Strategy( )
        Process_day( complete_route, archive_name, folder_for_results[0], Strat)
        '''
            for path_of_analized_day, folder_for_results in zip(group_path_of_analized_day, group_folder_for_results ) :
                place_path = os.path.join(folder_for_results, path_of_analized_day)
                with open(place_path, 'w') as archive:
                        archive.write(file_content(path_of_analized_day))
                print(f'The following path has been saved : {place_path}')
                if os.path.exists(path_of_analized_day):
                    os.remove(path_of_analized_day)
                else:
                    pass
        '''

E:\B-Module\TXTs\01-02-2023.txt


In [11]:
import os
from typing import List, Tuple, Any
import numpy as np
from datetime import datetime

def get_unix(date: str, time: str) -> int:
    format_date = "%d/%m/%Y"
    format_time = "%H:%M:%S.%f000"
    date_obj = datetime.strptime(date, format_date)
    time_obj = datetime.strptime(time, format_time)
    almost = datetime.combine(date_obj.date(), time_obj.time())
    return int(almost.timestamp())

class Tick:
    def __init__(self, line: str) -> None:
        splitted_line = line.split(',')
        self.date = splitted_line[0]
        self.time = splitted_line[1]
        self.bid = float(splitted_line[2].strip())
        self.ask = float(splitted_line[4].strip())
        self.unix_code = get_unix(splitted_line[0], splitted_line[1])

class Strategy:
    def __init__(self) -> None:
        self.num_strats = 4

    def bounce(self, prices: List[float]) -> bool:
        if len(prices) < 20:
            return False
        return np.mean(prices[-20:-10]) < np.mean(prices[-10:])

    def calculate_ema_incremental(self, last_ema: float, new_price: float, alpha: float) -> float:
        return (new_price * alpha) + (last_ema * (1 - alpha))

    def calculate_macd_incremental(self, prices: List[float], short_period: int, long_period: int, signal_period: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        alpha_short = 2 / (short_period + 1)
        alpha_long = 2 / (long_period + 1)
        alpha_signal = 2 / (signal_period + 1)

        short_ema = np.zeros(len(prices))
        long_ema = np.zeros(len(prices))
        macd = np.zeros(len(prices))
        signal_line = np.zeros(len(prices))

        short_ema[0] = prices[0]
        long_ema[0] = prices[0]
        macd[0] = 0
        signal_line[0] = 0

        for i in range(1, len(prices)):
            short_ema[i] = self.calculate_ema_incremental(short_ema[i-1], prices[i], alpha_short)
            long_ema[i] = self.calculate_ema_incremental(long_ema[i-1], prices[i], alpha_long)
            macd[i] = short_ema[i] - long_ema[i]
            signal_line[i] = self.calculate_ema_incremental(signal_line[i-1], macd[i], alpha_signal)

        return short_ema, long_ema, macd, signal_line

    def check_last_macd_crossover(self, price_array: np.ndarray) -> int:
        short_period = int(50000 * 2.5 / 12)
        long_period = int(50000 * 2.5 / 26)
        signal_period = int(50000 * 2.5 / 9)

        _, _, macd, signal_line = self.calculate_macd_incremental(price_array, short_period, long_period, signal_period)

        if len(macd) > 1 and np.sign(macd[-2] - signal_line[-2]) != np.sign(macd[-1] - signal_line[-1]):
            if macd[-1] < 0:
                return -1  # Buy signal
            else:
                return 1  # Sell signal
        return 0  # No action

class TickProcessor:
    def __init__(self, strategy: Strategy):
        self.strategy = strategy
        self.prices = []
        self.list_for_balance = []
        self.sum_side_by_day = 0

    def process_tick(self, tick_line: str, id: int, saving_path: str) -> None:
        tick = Tick(tick_line)
        self.prices.append(tick.ask)

        if len(self.prices) < 26:  # Ensure enough data for all indicators
            return

        side = self.strategy.check_last_macd_crossover(np.array(self.prices))

        if side != 0:
            self.sum_side_by_day += side
            self.list_for_balance.append([side, 1, tick.bid, tick.ask])
            P_L = compute_balance(self.list_for_balance, tick)
            drawdown = compute_drawdown(saving_path, self.list_for_balance, tick)
            Register(saving_path, [id, dict_for_decision[side], 1, tick.bid, tick.ask, P_L, drawdown])

def compute_balance(orders_list: List[Any], tick: Tick) -> float:
    open_balance = 0
    for line in orders_list:
        if len(line) > 2:
            side = line[0]
            number_units = line[1]
            bid_price, ask_price = line[2], line[3]
            last_bid, last_ask = tick.bid, tick.ask
            profit_loss = 0
            if side == -1:
                profit_loss = number_units * last_bid - number_units * ask_price
            elif side == 1:
                profit_loss = number_units * bid_price - number_units * last_ask
            open_balance += profit_loss
    return open_balance

def compute_drawdown(path_to_use: str, orders_list: List[Any], tick: Tick) -> float:
    if len(orders_list) > 1:
        last_balance = compute_balance(orders_list, tick)
        balances = []
        with open(path_to_use, 'r') as file_reader:
            for line in file_reader:
                balances.append(float(line.split(':')[6]))
        balances.append(last_balance)
        max_drawdown = 0
        drawdown = 0
        peak = balances[0]
        for balance in balances:
            if balance > peak:
                peak = balance
            drawdown = peak - balance
            if drawdown > max_drawdown:
                max_drawdown = drawdown
        return max_drawdown
    else:
        return 0

def Register(path_to_use: str, line: List[Any]) -> None:
    with open(path_to_use, 'a') as file_in_process:
        for value in line:
            file_in_process.writelines(f'{value}:')
        file_in_process.writelines('\n')

def file_content(path_to_use: str) -> str:
    if os.path.exists(path_to_use):
        with open(path_to_use, 'r') as file_in_process:
            return file_in_process.read()
    else:
        return ""

def Process_day(path_of_current_day: str, day: str, folder_for_results: str, strat: Strategy) -> str:
    saving_path = os.path.join(folder_for_results, f'Ind_{day}')
    processor = TickProcessor(strat)
    id = 0

    with open(path_of_current_day, 'r') as current_day:
        current_day_list = current_day.readlines()
        for index in range(len(current_day_list) - 2):
            tick_line = current_day_list[index + 1]
            processor.process_tick(tick_line, id, saving_path)
            id += 1

    if os.path.exists(saving_path):
        with open(saving_path, 'w') as exit_file:
            exit_file.write(file_content(saving_path))

    return saving_path


In [13]:
folder = "E:\\B-Module\\TXTss"
folder_for_results = [ "E:\\B-Module\\results\\Indcts" ]
archives_in_folder = os.listdir( folder )
for archive_name in archives_in_folder:
    if archive_name.endswith('.txt') :
        complete_route = os.path.join(folder, archive_name)
        prices_for_saving =[ ]
        print(f'{complete_route}')
        Strat = Strategy( )
        Process_day( complete_route, archive_name, folder_for_results[0], Strat)
        '''
            for path_of_analized_day, folder_for_results in zip(group_path_of_analized_day, group_folder_for_results ) :
                place_path = os.path.join(folder_for_results, path_of_analized_day)
                with open(place_path, 'w') as archive:
                        archive.write(file_content(path_of_analized_day))
                print(f'The following path has been saved : {place_path}')
                if os.path.exists(path_of_analized_day):
                    os.remove(path_of_analized_day)
                else:
                    pass
        '''

E:\B-Module\TXTss\01-02-2023.txt


KeyboardInterrupt: 