In [2]:
%run ./creating_arrays.ipynb

D:\KISHORE\Binance-Data-Downloader\data\downloaded_data
D:\KISHORE\Binance-Data-Downloader\data\extracted_data
['1000BONKUSDC', '1000BONKUSDT', '1000BTTCUSDT', '1000FLOKIUSDT', '1000LUNCBUSD', '1000LUNCUSDT', '1000PEPEUSDC', '1000PEPEUSDT', '1000RATSUSDT', '1000SATSUSDT', '1000SHIBBUSD', '1000SHIBUSDC', '1000SHIBUSDT', '1000XECUSDT', '1INCHUSDT', 'AAVEUSDT', 'ACEUSDT', 'ACHUSDT', 'ADABUSD', 'ADAUSDT', 'AEVOUSDT', 'AGIXBUSD', 'AGIXUSDT', 'AGLDUSDT', 'AIUSDT', 'AKROUSDT', 'ALGOUSDT', 'ALICEUSDT', 'ALPACAUSDT', 'ALPHAUSDT', 'ALTUSDT', 'AMBBUSD', 'AMBUSDT', 'ANCBUSD', 'ANCUSDT', 'ANKRUSDT', 'ANTUSDT', 'APEBUSD', 'APEUSDT', 'API3USDT', 'APTBUSD', 'APTUSDT', 'ARBUSDC', 'ARBUSDT', 'ARKMUSDT', 'ARKUSDT', 'ARPAUSDT', 'ARUSDT', 'ASTRUSDT', 'ATAUSDT', 'ATOMUSDT', 'AUCTIONBUSD', 'AUCTIONUSDT', 'AUDIOUSDT', 'AVAXBUSD', 'AVAXUSDC', 'AVAXUSDT', 'AXLUSDT', 'AXSUSDT', 'BADGERUSDT', 'BAKEUSDT', 'BALUSDT', 'BANANAUSDT', 'BANDUSDT', 'BATUSDT', 'BBUSDT', 'BCHUSDC', 'BCHUSDT', 'BEAMXUSDT', 'BELUSDT', 'BICOU

In [2]:
import base64
import bz2
import datetime
import glob
import gzip
import hashlib
import inspect
import json
import os
import pickle
import re
import shutil
import time
import time
import urllib.request
import zipfile
import zlib
from decimal import Decimal, ROUND_DOWN
from pathlib import Path

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests
import talib
import talib
from numba import njit, prange
from pandas.api.types import is_numeric_dtype
from tqdm import tqdm


# Small modular helper functions

## download data, concatenate data utils

In [8]:
def download_monthly_data(month_array, symbol, chart_time):
    """Download the monthly kline ZIP archives for one symbol/interval pair.

    Archives are stored under ``<download_dir>/<symbol>-<chart_time>-monthly_data``.
    ``download_dir`` and ``BINANCE_MONTHLY_URL`` are module-level names that are
    not defined in this cell (presumably they come from the ``%run`` of
    ``creating_arrays.ipynb`` -- TODO confirm).

    Args:
        month_array: Sequence of month labels used in the archive names
            (``<symbol>-<chart_time>-<month>.zip``). The LAST entry is skipped
            (presumably the current, still-incomplete month -- verify against
            the code that builds the array). The sequence is not modified.
        symbol: Trading pair, used in both the URL and the file name.
        chart_time: Kline interval, used in both the URL and the file name.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    # Create the destination folder for this symbol/interval
    folder_path = Path(download_dir) / f"{symbol}-{chart_time}-monthly_data"
    folder_path.mkdir(parents=True, exist_ok=True)
    count = 0
    # Slice instead of pop(): the caller reuses the same list for every
    # symbol/interval, so popping here would drop one more month per call
    # (and raise IndexError on an empty list).
    for month in month_array[:-1]:
        symbol_object = f"{symbol}-{chart_time}-{month}.zip"
        link = f"{BINANCE_MONTHLY_URL}{symbol}/{chart_time}/{symbol_object}"
        file_path = folder_path / symbol_object
        if file_path.exists():
            # Already downloaded on a previous run
            continue
        try:
            urllib.request.urlretrieve(link, file_path)
        except Exception as e:
            print(f"Failed to download {link}: {e}")
            # A failed transfer may leave a truncated archive behind; remove
            # it so the next run retries instead of treating it as complete.
            if file_path.exists():
                file_path.unlink()
            continue
        count += 1
        print(f"Downloaded monthly data for {symbol_object}")
    if count > 0:
        print(f"Monthly Data Downloaded for {symbol},{chart_time}")
    else:
        print(f"You're already up to date for monthly data for {symbol},{chart_time}")


def download_daily_data(day_array, symbol, chart_time):
    """Download the daily kline ZIP archives for one symbol/interval pair.

    Archives are stored under ``<download_dir>/<symbol>-<chart_time>-daily_data``.
    ``download_dir`` and ``BINANCE_DAILY_URL`` are module-level names that are
    not defined in this cell (presumably they come from the ``%run`` of
    ``creating_arrays.ipynb`` -- TODO confirm).

    Args:
        day_array: Sequence of day labels used in the archive names
            (``<symbol>-<chart_time>-<day>.zip``). The LAST entry is skipped
            (presumably the current, still-incomplete day -- verify against
            the code that builds the array). The sequence is not modified.
        symbol: Trading pair, used in both the URL and the file name.
        chart_time: Kline interval, used in both the URL and the file name.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    # Create the destination folder for this symbol/interval
    folder_path = Path(download_dir) / f"{symbol}-{chart_time}-daily_data"
    folder_path.mkdir(parents=True, exist_ok=True)
    count = 0
    # Slice instead of pop(): the caller reuses the same list for every
    # symbol/interval, so popping here would drop one more day per call
    # (and raise IndexError on an empty list).
    for day in day_array[:-1]:
        symbol_object = f"{symbol}-{chart_time}-{day}.zip"
        link = f"{BINANCE_DAILY_URL}{symbol}/{chart_time}/{symbol_object}"
        file_path = folder_path / symbol_object
        if file_path.exists():
            # Already downloaded on a previous run
            continue
        try:
            urllib.request.urlretrieve(link, file_path)
        except Exception as e:
            print(f"Failed to download {link}: {e}")
            # A failed transfer may leave a truncated archive behind; remove
            # it so the next run retries instead of treating it as complete.
            if file_path.exists():
                file_path.unlink()
            continue
        count += 1
        print(f"Downloaded daily data for {symbol_object}")
    if count > 0:
        print(f"Daily Data Downloaded for {symbol},{chart_time}")
    else:
        print(f"You're already up to date for daily data for {symbol},{chart_time}")


def construct_csv_file_path(folder_path,
                            symbol,
                            chart_time,
                            file,
                            is_daily=False):
    """Build the path of the CSV that was extracted from a kline ZIP archive.

    Args:
        folder_path: Directory the archive was extracted into.
        symbol: Trading pair used as the file-name prefix.
        chart_time: Kline interval, second component of the file name.
        file: Name of the ZIP archive; its date part is reused for the CSV.
        is_daily: When true, ``file`` is parsed as ``...-YYYY-MM-DD.zip``;
            otherwise as ``...-YYYY-MM.zip``.

    Returns:
        ``folder_path`` joined with ``<symbol>-<chart_time>-<date>.csv``.
    """
    if not is_daily:
        # Monthly archives: the 8 characters before the 4-character extension
        # are "-YYYY-MM" (leading dash included).
        month_suffix = file[-12:-4]
        csv_name = f"{symbol}-{chart_time}{month_suffix}.csv"
        return os.path.join(folder_path, csv_name)

    # Daily archives: the last three dash-separated pieces carry the date;
    # the final piece still has its 4-character extension attached.
    pieces = file.split('-')
    year = pieces[-3]
    month = pieces[-2]
    day = pieces[-1][:-4]
    csv_name = f"{symbol}-{chart_time}-{year}-{month}-{day}.csv"
    return os.path.join(folder_path, csv_name)


def _first_row_is_header(df):
    """Return True when the first cell of ``df`` is not a number, i.e. a header label."""
    if df.empty:
        return False
    try:
        float(df.iloc[0, 0])
    except (TypeError, ValueError):
        return True
    return False


def process_zip_folder(folder_path,
                       pattern,
                       new_csv_folder_path,
                       symbol,
                       chart_time,
                       df_list,
                       daily_array=None,
                       is_daily=False):
    """Extract matching ZIP archives and append their CSV contents to ``df_list``.

    Args:
        folder_path: Directory containing the downloaded ZIP archives.
        pattern: Compiled regular expression; only file names it matches are
            processed.
        new_csv_folder_path: Directory the archives are extracted into.
        symbol: Trading pair, forwarded to ``construct_csv_file_path``.
        chart_time: Kline interval, forwarded to ``construct_csv_file_path``.
        df_list: List that receives one DataFrame per archive (mutated in
            place and also returned).
        daily_array: Optional date strings; when ``is_daily`` is true and this
            is non-empty, archives whose name contains none of them are skipped.
        is_daily: Whether the archives follow the daily naming scheme.

    Returns:
        ``df_list``. Each appended frame was read with ``header=None`` (integer
        column labels) and has its header row, if any, removed.
    """
    if not os.path.exists(folder_path):
        print(f"Folder not found: {folder_path}")
        return df_list

    # Sorted so frames are appended in file-name order. The archive names
    # embed the period after a fixed "<symbol>-<chart_time>-" prefix, so this
    # is chronological; os.listdir() alone gives no ordering guarantee.
    for file_name in sorted(os.listdir(folder_path)):
        item_path = os.path.join(folder_path, file_name)

        # Only regular files whose name matches the expected archive pattern
        if not (os.path.isfile(item_path) and pattern.match(file_name)):
            continue

        if is_daily and daily_array:
            # Keep only archives for one of the requested dates
            if not any(date in file_name for date in daily_array):
                continue

        # Extract the ZIP file
        try:
            with zipfile.ZipFile(item_path, 'r') as zip_ref:
                zip_ref.extractall(new_csv_folder_path)
        except Exception as e:
            print(f"Error extracting file {file_name}: {e}")
            continue

        csv_file_path = construct_csv_file_path(new_csv_folder_path,
                                                symbol, chart_time,
                                                file_name, is_daily)

        try:
            df = pd.read_csv(csv_file_path, header=None)
            # Some archives ship without a header row; dropping the first row
            # unconditionally would discard a real candle from those files.
            if _first_row_is_header(df):
                df = df.iloc[1:]
            df_list.append(df)
        except Exception as e:
            print(f"Error reading file {csv_file_path}: {e}")
            continue

    return df_list


def get_correct_headers(new_csv_folder_path):
    """Return the header row of the first file that looks like a kline CSV.

    Every entry of ``new_csv_folder_path`` is opened with ``pandas.read_csv``
    and its column labels are compared against the known kline column names.
    The first file with at least two recognised labels wins, and its full list
    of labels (recognised or not) is returned.

    Args:
        new_csv_folder_path: Directory to scan.

    Returns:
        list: Column labels of the first matching file.

    Raises:
        ValueError: If no file in the directory has two or more known labels.
            Per-file read errors are printed and the scan continues.
    """
    known_columns = {
        'open_time', 'open', 'high', 'low', 'close', 'volume', 'close_time',
        'quote_volume', 'count', 'taker_buy_volume', 'taker_buy_quote_volume',
        'ignore',
    }

    for entry in os.listdir(new_csv_folder_path):
        candidate = os.path.join(new_csv_folder_path, entry)
        try:
            # One data row is enough; only the parsed column labels matter
            header_row = pd.read_csv(candidate, nrows=1).columns.tolist()
            recognised = sum(1 for label in header_row if label in known_columns)
            if recognised >= 2:
                return header_row
        except Exception as e:
            print(f"Error processing file {entry}: {e}")

    raise ValueError("Could not find matching headers in any CSV files.")


def concatenate_data_frames(df_list, new_csv_folder_path, symbol, chart_time):
    """Merge per-archive frames into ``<symbol>-<chart_time>.csv`` and clean up.

    Steps: look up the column names via ``get_correct_headers``, concatenate
    ``df_list``, convert ``open_time``/``close_time`` from epoch milliseconds
    to ``Asia/Kolkata`` timestamps, drop the ``ignore`` column, add an
    ``entry`` column, write the result into ``new_csv_folder_path`` and delete
    the individual ``<symbol>-<chart_time>-*.csv`` files.

    Args:
        df_list: DataFrames to concatenate (one per extracted archive).
        new_csv_folder_path: Folder holding the extracted CSVs; the merged
            file is written here as well.
        symbol: Trading pair, used in file names.
        chart_time: Kline interval, used in file names.

    Returns:
        str: A status message. ``"Data concatenated, individual CSVs deleted"``
        on success; ``"Error finding headers"``, ``"No data to concatenate"``
        or ``"Error in date conversion"`` when the corresponding step fails.
    """
    # Get correct headers from a CSV file
    try:
        correct_headers = get_correct_headers(new_csv_folder_path)
    except ValueError as e:
        print(e)
        return "Error finding headers"

    # pd.concat raises on an empty list; report it like the other failures
    if not df_list:
        print("No data frames to concatenate")
        return "No data to concatenate"

    df_final = pd.concat(df_list, ignore_index=True)

    # Frames read with header=None carry integer labels, so their CSVs may
    # need a header row written back.
    if df_final.columns[0] not in correct_headers:
        for file_name in os.listdir(new_csv_folder_path):
            file_path = os.path.join(new_csv_folder_path, file_name)
            if os.path.isfile(file_path):
                try:
                    df_old = pd.read_csv(file_path, header=None)
                    # Only touch files with the expected number of columns
                    if len(df_old.columns) != len(correct_headers):
                        continue
                    # Skip files that already start with a header row;
                    # rewriting them would add a second one.
                    if not df_old.empty and str(df_old.iloc[0, 0]) in correct_headers:
                        continue
                    df_old.columns = correct_headers
                    df_old.to_csv(file_path, index=False)
                except Exception as e:
                    print(f"Error updating file {file_name}: {e}")

    # Set the headers as the column names of the final dataframe
    df_final.columns = correct_headers

    # Convert 'open_time' and 'close_time' from epoch milliseconds to
    # timezone-aware timestamps.
    try:
        for time_column in ('open_time', 'close_time'):
            # Values can be strings when the source CSV carried a header row;
            # cast first because to_datetime(unit=...) on strings is
            # deprecated in recent pandas.
            epoch_ms = pd.to_numeric(df_final[time_column])
            df_final[time_column] = pd.to_datetime(
                epoch_ms,
                unit='ms').dt.tz_localize('UTC').dt.tz_convert('Asia/Kolkata')
    except KeyError as e:
        print(f"Column not found for conversion: {e}")
        return "Error in date conversion"

    # Delete the 'ignore' column if it exists
    if 'ignore' in df_final.columns:
        df_final = df_final.drop(['ignore'], axis=1)

    # 'entry' is a copy of the candle's own open price
    df_final['entry'] = df_final['open']

    concatenated_file_name = f"{symbol}-{chart_time}.csv"
    concatenated_file_path = os.path.join(new_csv_folder_path,
                                          concatenated_file_name)

    # Write the data frame to the CSV file
    df_final.to_csv(concatenated_file_path, index=False)

    directory_final = Path(concatenated_file_path).parent

    # Delete the per-archive CSVs; the merged file has no trailing "-<date>"
    # part, so the prefix test leaves it in place.
    for file_path in directory_final.iterdir():
        if file_path.is_file():
            if file_path.name.startswith(
                    f"{symbol}-{chart_time}-") and file_path.name.endswith(
                        '.csv') and file_path.name != concatenated_file_name:
                file_path.unlink()

    return "Data concatenated, individual CSVs deleted"


## calculate indicators util

In [None]:
# Number of decimal places used wherever an indicator is rounded
round_off = 5


def calculate_indicators_using_talib(timeperiods, df):
    """Compute a fixed set of TA-Lib indicators for an OHLCV frame.

    Args:
        timeperiods: Iterable of window lengths; every period-based indicator
            is computed once per entry and named ``<INDICATOR>_<period>``.
        df: DataFrame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        pandas.DataFrame: One column per indicator output. The input columns
        are NOT included. Rounded outputs use ``round_off`` decimals.
    """
    open_ = df['open']
    high = df['high']
    low = df['low']
    close = df['close']
    volume = df['volume']

    # Indicators with fixed parameters
    indicator_columns = [
        ('HT_TRENDLINE', talib.HT_TRENDLINE(close)),
        ('SAR', talib.SAR(high, low)),
        ('SAREXT', talib.SAREXT(high, low)),
        ('T3', talib.T3(close, timeperiod=5)),
        ('APO', np.round(talib.APO(close, fastperiod=12, slowperiod=26), round_off)),
        ('BOP', talib.BOP(open_, high, low, close)),
        *zip(['MACD', 'MACD_signal', 'MACD_hist'], talib.MACD(close)),
        ('PPO', np.round(talib.PPO(close), round_off)),
        ('TRIX', talib.TRIX(close)),
        ('ULTOSC', np.round(talib.ULTOSC(high, low, close), round_off)),
        ('WILLR', talib.WILLR(high, low, close)),
        ('AD', np.round(talib.AD(high, low, close, volume), round_off)),
        ('ADOSC', np.round(talib.ADOSC(high, low, close, volume), round_off)),
        ('OBV', np.round(talib.OBV(close, volume), round_off)),
        ('HT_DCPERIOD', np.round(talib.HT_DCPERIOD(close), round_off)),
        ('HT_DCPHASE', np.round(talib.HT_DCPHASE(close), round_off)),
        *zip(['HT_PHASOR_inphase', 'HT_PHASOR_quadrature'], np.round(talib.HT_PHASOR(close), round_off)),
        ('HT_TRENDMODE', talib.HT_TRENDMODE(close)),
        ('AVGPRICE', talib.AVGPRICE(open_, high, low, close)),
        ('MEDPRICE', talib.MEDPRICE(high, low)),
        ('TYPPRICE', talib.TYPPRICE(high, low, close)),
        ('WCLPRICE', talib.WCLPRICE(high, low, close)),
        ('TRANGE', talib.TRANGE(high, low, close)),
    ]

    # Candlestick pattern recognition: every talib function named CDL*
    pattern_funcs = [getattr(talib, f) for f in dir(talib) if f.startswith('CDL')]
    indicator_columns.extend(
        (f.__name__, f(open_, high, low, close)) for f in pattern_funcs)

    # Statistic functions on the close price
    stat_funcs = [talib.LINEARREG, talib.LINEARREG_ANGLE, talib.LINEARREG_INTERCEPT, talib.LINEARREG_SLOPE, talib.TSF, talib.VAR]
    indicator_columns.extend(
        (f.__name__, np.round(f(close), round_off)) for f in stat_funcs)

    # Period-based indicators: (callable taking the period, output column templates)
    timeperiod_funcs = [
        (lambda t: talib.BBANDS(close, timeperiod=t), ['BB_upper_{}', 'BB_middle_{}', 'BB_lower_{}']),
        (lambda t: talib.DEMA(close, timeperiod=t), ['DEMA_{}']),
        (lambda t: talib.EMA(close, timeperiod=t), ['EMA_{}']),
        (lambda t: talib.KAMA(close, timeperiod=t), ['KAMA_{}']),
        (lambda t: talib.MA(close, timeperiod=t), ['MA_{}']),
        (lambda t: talib.MIDPOINT(close, timeperiod=t), ['MIDPOINT_{}']),
        (lambda t: talib.MIDPRICE(high, low, timeperiod=t), ['MIDPRICE_{}']),
        (lambda t: talib.SMA(close, timeperiod=t), ['SMA_{}']),
        (lambda t: talib.TEMA(close, timeperiod=t), ['TEMA_{}']),
        (lambda t: talib.TRIMA(close, timeperiod=t), ['TRIMA_{}']),
        (lambda t: talib.WMA(close, timeperiod=t), ['WMA_{}']),
        (lambda t: talib.ADX(high, low, close, timeperiod=t), ['ADX_{}']),
        (lambda t: talib.ADXR(high, low, close, timeperiod=t), ['ADXR_{}']),
        # talib.AROON returns (aroondown, aroonup); reverse the pair so each
        # series lands in the column that carries its name.
        (lambda t: talib.AROON(high, low, timeperiod=t)[::-1], ['AROON_up_{}', 'AROON_down_{}']),
        (lambda t: talib.AROONOSC(high, low, timeperiod=t), ['AROONOSC_{}']),
        (lambda t: talib.CCI(high, low, close, timeperiod=t), ['CCI_{}']),
        (lambda t: talib.CMO(close, timeperiod=t), ['CMO_{}']),
        (lambda t: talib.DX(high, low, close, timeperiod=t), ['DX_{}']),
        (lambda t: talib.MFI(high, low, close, volume, timeperiod=t), ['MFI_{}']),
        (lambda t: talib.MINUS_DI(high, low, close, timeperiod=t), ['MINUS_DI_{}']),
        (lambda t: talib.MINUS_DM(high, low, timeperiod=t), ['MINUS_DM_{}']),
        (lambda t: talib.MOM(close, timeperiod=t), ['MOM_{}']),
        (lambda t: talib.PLUS_DI(high, low, close, timeperiod=t), ['PLUS_DI_{}']),
        (lambda t: talib.PLUS_DM(high, low, timeperiod=t), ['PLUS_DM_{}']),
        (lambda t: talib.ROC(close, timeperiod=t), ['ROC_{}']),
        (lambda t: talib.ROCP(close, timeperiod=t), ['ROCP_{}']),
        (lambda t: talib.ROCR(close, timeperiod=t), ['ROCR_{}']),
        (lambda t: talib.ROCR100(close, timeperiod=t), ['ROCR100_{}']),
        (lambda t: talib.RSI(close, timeperiod=t), ['RSI_{}']),
        (lambda t: talib.ATR(high, low, close, timeperiod=t), ['ATR_{}']),
        (lambda t: talib.NATR(high, low, close, timeperiod=t), ['NATR_{}']),
        (lambda t: talib.BETA(high, low, timeperiod=t), ['BETA_{}']),
        (lambda t: talib.CORREL(high, low, timeperiod=t), ['CORREL_{}']),
    ]

    for func_with_timeperiod, column_names in timeperiod_funcs:
        for timeperiod in timeperiods:
            result = func_with_timeperiod(timeperiod)
            # Multi-output indicators return a tuple; wrap single outputs so
            # both cases share one code path.
            outputs = result if isinstance(result, tuple) else (result,)
            for res, col_name in zip(outputs, column_names):
                # Round to `round_off` decimal places
                indicator_columns.append(
                    (col_name.format(timeperiod), np.round(res, round_off)))

    # One single-column frame per indicator, joined side by side
    df_indicators = pd.concat([pd.DataFrame(data, columns=[name]) for name, data in indicator_columns], axis=1)

    return df_indicators

In [None]:
def calculate_indicators_using_talib_new(timeperiods, df):
    """Compute a fixed set of TA-Lib indicators from NumPy arrays, with progress bars.

    Args:
        timeperiods: Sized collection of window lengths (``len()`` is taken
            for the progress bar); every period-based indicator is computed
            once per entry and named ``<INDICATOR>_<period>``.
        df: DataFrame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        pandas.DataFrame: One column per indicator output, built from plain
        arrays (default integer index). The input columns are NOT included.
    """
    round_off = 5
    # Plain arrays are passed to talib instead of Series
    open_arr = df['open'].values
    high_arr = df['high'].values
    low_arr = df['low'].values
    close_arr = df['close'].values
    volume_arr = df['volume'].values

    # Column name -> indicator values
    indicators = {}

    # Single-output indicators: (column name, callable, positional inputs)
    single_indicators = [
        ('HT_TRENDLINE', talib.HT_TRENDLINE, [close_arr]),
        ('SAR', talib.SAR, [high_arr, low_arr]),
        ('SAREXT', talib.SAREXT, [high_arr, low_arr]),
        ('T3', lambda x: talib.T3(x, timeperiod=5), [close_arr]),
        ('APO', lambda x: np.round(talib.APO(x, fastperiod=12, slowperiod=26), round_off), [close_arr]),
        ('BOP', talib.BOP, [open_arr, high_arr, low_arr, close_arr]),
        ('PPO', lambda x: np.round(talib.PPO(x), round_off), [close_arr]),
        ('TRIX', talib.TRIX, [close_arr]),
        ('ULTOSC', lambda x, y, z: np.round(talib.ULTOSC(x, y, z), round_off), [high_arr, low_arr, close_arr]),
        ('WILLR', talib.WILLR, [high_arr, low_arr, close_arr]),
        ('AD', lambda w, x, y, z: np.round(talib.AD(w, x, y, z), round_off), [high_arr, low_arr, close_arr, volume_arr]),
        ('ADOSC', lambda w, x, y, z: np.round(talib.ADOSC(w, x, y, z), round_off), [high_arr, low_arr, close_arr, volume_arr]),
        ('OBV', lambda x, y: np.round(talib.OBV(x, y), round_off), [close_arr, volume_arr]),
        ('HT_DCPERIOD', lambda x: np.round(talib.HT_DCPERIOD(x), round_off), [close_arr]),
        ('HT_DCPHASE', lambda x: np.round(talib.HT_DCPHASE(x), round_off), [close_arr]),
        ('HT_TRENDMODE', talib.HT_TRENDMODE, [close_arr]),
        ('AVGPRICE', talib.AVGPRICE, [open_arr, high_arr, low_arr, close_arr]),
        ('MEDPRICE', talib.MEDPRICE, [high_arr, low_arr]),
        ('TYPPRICE', talib.TYPPRICE, [high_arr, low_arr, close_arr]),
        ('WCLPRICE', talib.WCLPRICE, [high_arr, low_arr, close_arr]),
        ('TRANGE', talib.TRANGE, [high_arr, low_arr, close_arr]),
    ]

    for name, func, args in tqdm(single_indicators, desc="Calculating single-output indicators"):
        indicators[name] = func(*args)

    # Multi-output indicators: (callable, positional inputs, output column names)
    multi_indicators = [
        (talib.MACD, [close_arr], ['MACD', 'MACD_signal', 'MACD_hist']),
        (lambda x: np.round(talib.HT_PHASOR(x), round_off), [close_arr], ['HT_PHASOR_inphase', 'HT_PHASOR_quadrature']),
    ]

    for func, args, output_names in tqdm(multi_indicators, desc="Calculating multi-output indicators"):
        results = func(*args)
        for name, values in zip(output_names, results):
            indicators[name] = values

    # Candlestick pattern recognition: every talib function named CDL*
    pattern_funcs = [getattr(talib, f) for f in dir(talib) if f.startswith('CDL')]
    for func in tqdm(pattern_funcs, desc="Calculating pattern recognition indicators"):
        indicators[func.__name__] = func(open_arr, high_arr, low_arr, close_arr)

    # Statistic functions on the close price
    stat_funcs = [talib.LINEARREG, talib.LINEARREG_ANGLE, talib.LINEARREG_INTERCEPT, talib.LINEARREG_SLOPE, talib.TSF, talib.VAR]
    for func in tqdm(stat_funcs, desc="Calculating statistic functions"):
        indicators[func.__name__] = np.round(func(close_arr), round_off)

    # Period-based indicators: (callable taking the period, output column templates)
    timeperiod_funcs = [
        (lambda t: talib.BBANDS(close_arr, timeperiod=t), ['BB_upper_{}', 'BB_middle_{}', 'BB_lower_{}']),
        (lambda t: talib.DEMA(close_arr, timeperiod=t), ['DEMA_{}']),
        (lambda t: talib.EMA(close_arr, timeperiod=t), ['EMA_{}']),
        (lambda t: talib.KAMA(close_arr, timeperiod=t), ['KAMA_{}']),
        (lambda t: talib.MA(close_arr, timeperiod=t), ['MA_{}']),
        (lambda t: talib.MIDPOINT(close_arr, timeperiod=t), ['MIDPOINT_{}']),
        (lambda t: talib.MIDPRICE(high_arr, low_arr, timeperiod=t), ['MIDPRICE_{}']),
        (lambda t: talib.SMA(close_arr, timeperiod=t), ['SMA_{}']),
        (lambda t: talib.TEMA(close_arr, timeperiod=t), ['TEMA_{}']),
        (lambda t: talib.TRIMA(close_arr, timeperiod=t), ['TRIMA_{}']),
        (lambda t: talib.WMA(close_arr, timeperiod=t), ['WMA_{}']),
        (lambda t: talib.ADX(high_arr, low_arr, close_arr, timeperiod=t), ['ADX_{}']),
        (lambda t: talib.ADXR(high_arr, low_arr, close_arr, timeperiod=t), ['ADXR_{}']),
        # talib.AROON returns (aroondown, aroonup); reverse the pair so each
        # series lands in the column that carries its name.
        (lambda t: talib.AROON(high_arr, low_arr, timeperiod=t)[::-1], ['AROON_up_{}', 'AROON_down_{}']),
        (lambda t: talib.AROONOSC(high_arr, low_arr, timeperiod=t), ['AROONOSC_{}']),
        (lambda t: talib.CCI(high_arr, low_arr, close_arr, timeperiod=t), ['CCI_{}']),
        (lambda t: talib.CMO(close_arr, timeperiod=t), ['CMO_{}']),
        (lambda t: talib.DX(high_arr, low_arr, close_arr, timeperiod=t), ['DX_{}']),
        (lambda t: talib.MFI(high_arr, low_arr, close_arr, volume_arr, timeperiod=t), ['MFI_{}']),
        (lambda t: talib.MINUS_DI(high_arr, low_arr, close_arr, timeperiod=t), ['MINUS_DI_{}']),
        (lambda t: talib.MINUS_DM(high_arr, low_arr, timeperiod=t), ['MINUS_DM_{}']),
        (lambda t: talib.MOM(close_arr, timeperiod=t), ['MOM_{}']),
        (lambda t: talib.PLUS_DI(high_arr, low_arr, close_arr, timeperiod=t), ['PLUS_DI_{}']),
        (lambda t: talib.PLUS_DM(high_arr, low_arr, timeperiod=t), ['PLUS_DM_{}']),
        (lambda t: talib.ROC(close_arr, timeperiod=t), ['ROC_{}']),
        (lambda t: talib.ROCP(close_arr, timeperiod=t), ['ROCP_{}']),
        (lambda t: talib.ROCR(close_arr, timeperiod=t), ['ROCR_{}']),
        (lambda t: talib.ROCR100(close_arr, timeperiod=t), ['ROCR100_{}']),
        (lambda t: talib.RSI(close_arr, timeperiod=t), ['RSI_{}']),
        (lambda t: talib.ATR(high_arr, low_arr, close_arr, timeperiod=t), ['ATR_{}']),
        (lambda t: talib.NATR(high_arr, low_arr, close_arr, timeperiod=t), ['NATR_{}']),
        (lambda t: talib.BETA(high_arr, low_arr, timeperiod=t), ['BETA_{}']),
        (lambda t: talib.CORREL(high_arr, low_arr, timeperiod=t), ['CORREL_{}']),
    ]

    # One progress tick per (indicator, period) combination
    total_iterations = len(timeperiod_funcs) * len(timeperiods)
    with tqdm(total=total_iterations, desc="Time-based indicators", unit="calculation", dynamic_ncols=True) as pbar:
        for func_with_timeperiod, column_names in timeperiod_funcs:
            for timeperiod in timeperiods:
                result = func_with_timeperiod(timeperiod)
                # Multi-output indicators return a tuple; wrap single outputs
                # so both cases share one code path.
                outputs = result if isinstance(result, tuple) else (result,)
                for res, col_name in zip(outputs, column_names):
                    indicators[col_name.format(timeperiod)] = np.round(res, round_off)
                pbar.update(1)

    # Convert the indicators dictionary to a DataFrame
    df_indicators = pd.DataFrame(indicators)

    return df_indicators

In [None]:
# round_off = 5

# def calculate_indicators_using_talib(timeperiods, df):
#     def apply_indicator(func, *args, **kwargs):
#         return func(*args, **kwargs)

#     indicator_columns = [
#         ('HT_TRENDLINE', talib.HT_TRENDLINE(df['close'])),
#         ('SAR', talib.SAR(df['high'], df['low'])),
#         ('SAREXT', talib.SAREXT(df['high'], df['low'])),
#         ('T3', talib.T3(df['close'], timeperiod=5)),
#         ('APO', np.round(talib.APO(df['close'], fastperiod=12, slowperiod=26), round_off)),
#         ('BOP', talib.BOP(df['open'], df['high'], df['low'], df['close'])),
#         *zip(['MACD', 'MACD_signal', 'MACD_hist'], talib.MACD(df['close'])),
#         ('PPO', np.round(talib.PPO(df['close']), round_off)),
#         ('TRIX', talib.TRIX(df['close'])),
#         ('ULTOSC', np.round(talib.ULTOSC(df['high'], df['low'], df['close']), round_off)),
#         ('WILLR', talib.WILLR(df['high'], df['low'], df['close'])),
#         ('AD', np.round(talib.AD(df['high'], df['low'], df['close'], df['volume']), round_off)),
#         ('ADOSC', np.round(talib.ADOSC(df['high'], df['low'], df['close'], df['volume']), round_off)),
#         ('OBV', np.round(talib.OBV(df['close'], df['volume']), round_off)),
#         ('HT_DCPERIOD', np.round(talib.HT_DCPERIOD(df['close']), round_off)),
#         ('HT_DCPHASE', np.round(talib.HT_DCPHASE(df['close']), round_off)),
#         *zip(['HT_PHASOR_inphase', 'HT_PHASOR_quadrature'], np.round(talib.HT_PHASOR(df['close']), round_off)),
#         ('HT_TRENDMODE', talib.HT_TRENDMODE(df['close'])),
#         ('AVGPRICE', talib.AVGPRICE(df['open'], df['high'], df['low'], df['close'])),
#         ('MEDPRICE', talib.MEDPRICE(df['high'], df['low'])),
#         ('TYPPRICE', talib.TYPPRICE(df['high'], df['low'], df['close'])),
#         ('WCLPRICE', talib.WCLPRICE(df['high'], df['low'], df['close'])),
#         ('TRANGE', talib.TRANGE(df['high'], df['low'], df['close'])),
#     ]

#     # Add pattern recognition indicators
#     pattern_funcs = [getattr(talib, f) for f in dir(talib) if f.startswith('CDL')]
#     indicator_columns.extend([
#         (f.__name__, apply_indicator(f, df['open'], df['high'], df['low'], df['close']))
#         for f in pattern_funcs
#     ])

#     # Add statistic functions
#     stat_funcs = [talib.LINEARREG, talib.LINEARREG_ANGLE, talib.LINEARREG_INTERCEPT, talib.LINEARREG_SLOPE, talib.TSF, talib.VAR]
#     indicator_columns.extend([
#         (f.__name__, np.round(apply_indicator(f, df['close']), round_off))
#         for f in stat_funcs
#     ])

#     # Add indicators with timeperiods
#     timeperiod_funcs = [
#         (talib.BBANDS, lambda t: talib.BBANDS(df['close'], timeperiod=t), ['BB_upper_{}', 'BB_middle_{}', 'BB_lower_{}']),
#         (talib.DEMA, lambda t: talib.DEMA(df['close'], timeperiod=t), ['DEMA_{}']),
#         (talib.EMA, lambda t: talib.EMA(df['close'], timeperiod=t), ['EMA_{}']),
#         (talib.KAMA, lambda t: talib.KAMA(df['close'], timeperiod=t), ['KAMA_{}']),
#         (talib.MA, lambda t: talib.MA(df['close'], timeperiod=t), ['MA_{}']),
#         (talib.MIDPOINT, lambda t: talib.MIDPOINT(df['close'], timeperiod=t), ['MIDPOINT_{}']),
#         (talib.MIDPRICE, lambda t: talib.MIDPRICE(df['high'], df['low'], timeperiod=t), ['MIDPRICE_{}']),
#         (talib.SMA, lambda t: talib.SMA(df['close'], timeperiod=t), ['SMA_{}']),
#         (talib.TEMA, lambda t: talib.TEMA(df['close'], timeperiod=t), ['TEMA_{}']),
#         (talib.TRIMA, lambda t: talib.TRIMA(df['close'], timeperiod=t), ['TRIMA_{}']),
#         (talib.WMA, lambda t: talib.WMA(df['close'], timeperiod=t), ['WMA_{}']),
#         (talib.ADX, lambda t: talib.ADX(df['high'], df['low'], df['close'], timeperiod=t), ['ADX_{}']),
#         (talib.ADXR, lambda t: talib.ADXR(df['high'], df['low'], df['close'], timeperiod=t), ['ADXR_{}']),
#         (talib.AROON, lambda t: talib.AROON(df['high'], df['low'], timeperiod=t), ['AROON_up_{}', 'AROON_down_{}']),
#         (talib.AROONOSC, lambda t: talib.AROONOSC(df['high'], df['low'], timeperiod=t), ['AROONOSC_{}']),
#         (talib.CCI, lambda t: talib.CCI(df['high'], df['low'], df['close'], timeperiod=t), ['CCI_{}']),
#         (talib.CMO, lambda t: talib.CMO(df['close'], timeperiod=t), ['CMO_{}']),
#         (talib.DX, lambda t: talib.DX(df['high'], df['low'], df['close'], timeperiod=t), ['DX_{}']),
#         (talib.MFI, lambda t: talib.MFI(df['high'], df['low'], df['close'], df['volume'], timeperiod=t), ['MFI_{}']),
#         (talib.MINUS_DI, lambda t: talib.MINUS_DI(df['high'], df['low'], df['close'], timeperiod=t), ['MINUS_DI_{}']),
#         (talib.MINUS_DM, lambda t: talib.MINUS_DM(df['high'], df['low'], timeperiod=t), ['MINUS_DM_{}']),
#         (talib.MOM, lambda t: talib.MOM(df['close'], timeperiod=t), ['MOM_{}']),
#         (talib.PLUS_DI, lambda t: talib.PLUS_DI(df['high'], df['low'], df['close'], timeperiod=t), ['PLUS_DI_{}']),
#         (talib.PLUS_DM, lambda t: talib.PLUS_DM(df['high'], df['low'], timeperiod=t), ['PLUS_DM_{}']),
#         (talib.ROC, lambda t: talib.ROC(df['close'], timeperiod=t), ['ROC_{}']),
#         (talib.ROCP, lambda t: talib.ROCP(df['close'], timeperiod=t), ['ROCP_{}']),
#         (talib.ROCR, lambda t: talib.ROCR(df['close'], timeperiod=t), ['ROCR_{}']),
#         (talib.ROCR100, lambda t: talib.ROCR100(df['close'], timeperiod=t), ['ROCR100_{}']),
#         (talib.RSI, lambda t: talib.RSI(df['close'], timeperiod=t), ['RSI_{}']),
#         (talib.ATR, lambda t: talib.ATR(df['high'], df['low'], df['close'], timeperiod=t), ['ATR_{}']),
#         (talib.NATR, lambda t: talib.NATR(df['high'], df['low'], df['close'], timeperiod=t), ['NATR_{}']),
#         (talib.BETA, lambda t: talib.BETA(df['high'], df['low'], timeperiod=t), ['BETA_{}']),
#         (talib.CORREL, lambda t: talib.CORREL(df['high'], df['low'], timeperiod=t), ['CORREL_{}']),
#     ]

#     # Initialize the progress bar
#     total_steps = len(pattern_funcs) + len(stat_funcs) + len(timeperiod_funcs) * len(timeperiods)
#     with tqdm(total=total_steps, desc="Calculating Indicators") as pbar:

#         # Pattern recognition indicators
#         for f in pattern_funcs:
#             indicator_columns.append((f.__name__, apply_indicator(f, df['open'], df['high'], df['low'], df['close'])))
#             pbar.update(1)  # Update progress bar for each function

#         # Statistic functions
#         for f in stat_funcs:
#             result = np.round(apply_indicator(f, df['close']), round_off)
#             indicator_columns.append((f.__name__, result))
#             pbar.update(1)  # Update progress bar for each function

#         # Timeperiod functions
#         for func, func_with_timeperiod, column_names in timeperiod_funcs:
#             for timeperiod in timeperiods:
#                 result = func_with_timeperiod(timeperiod)
#                 if isinstance(result, tuple):
#                     for res, col_name in zip(result, column_names):
#                         res_rounded = np.round(res, round_off)
#                         indicator_columns.append((col_name.format(timeperiod), res_rounded))
#                 else:
#                     result_rounded = np.round(result, round_off)
#                     indicator_columns.append((column_names[0].format(timeperiod), result_rounded))
#                 pbar.update(1)  # Update progress bar for each time period

#     # Concatenate the new indicator columns with the original DataFrame
#     df_indicators = pd.concat([pd.DataFrame(data, columns=[name]) for name, data in indicator_columns], axis=1)
    
#     return df_indicators
# #     # Return the original DataFrame with the new indicator columns
# #     return pd.concat([df, df_indicators], axis=1)



# Download and concatenate data (modular version)

In [9]:
def download_data_and_concatenate(master_dictionary, month_array, day_array):
    """Download, extract and concatenate the data for every symbol / chart time.

    For each combination of ``master_dictionary["symbols"]`` and
    ``master_dictionary["chart_times"]`` the monthly and daily archives are
    downloaded, both ZIP folders are run through ``process_zip_folder`` into a
    single list, and that list is handed to ``concatenate_data_frames``, whose
    return value is printed.

    Relies on the notebook-level globals ``download_dir`` and ``output_dir``.

    Args:
        master_dictionary: Mapping providing the "symbols" and "chart_times"
            iterables.
        month_array: Months to download. It is no longer modified by this
            function (see the comment at the download call).
        day_array: Days to download; forwarded to ``download_daily_data`` and
            to both ``process_zip_folder`` calls.

    Returns:
        The string "Data downloaded and concatenated".
    """
    for symbol in master_dictionary["symbols"]:
        for chart_time in master_dictionary["chart_times"]:
            print(f"Setting up things for {symbol}, {chart_time}")

            # Set up an empty list for the data frames
            df_list = []

            # File-name pattern of the monthly ZIP archives
            pattern = re.compile(rf"^{symbol}-{chart_time}-\d{{4}}-\d{{2}}\.zip$")

            # File-name pattern of the daily ZIP archives
            pattern_daily = re.compile(
                rf"^{symbol}-{chart_time}-\d{{4}}-\d{{2}}-\d{{2}}\.zip$")

            # Folder holding the daily ZIP files
            new_daily_zip_folder_path = os.path.join(
                download_dir, f"{symbol}-{chart_time}-daily_data")

            # Folder holding the monthly ZIP files
            new_monthly_zip_folder_path = os.path.join(
                download_dir, f"{symbol}-{chart_time}-monthly_data")

            # Folder receiving the extracted CSV files
            new_csv_folder_path = os.path.join(output_dir,
                                               f"{symbol}-{chart_time}")

            # download_monthly_data() pops the last entry of the list it is
            # given. Passing the shared list would therefore drop one more
            # month on every symbol / chart-time pass, so hand it a copy.
            download_monthly_data(list(month_array), symbol, chart_time)
            # NOTE(review): download_daily_data is not visible from here;
            # confirm it does not mutate day_array in the same way.
            download_daily_data(day_array, symbol, chart_time)

            # Process the monthly ZIP folder and add to df_list
            df_list = process_zip_folder(
                new_monthly_zip_folder_path,
                pattern,
                new_csv_folder_path,
                symbol,
                chart_time,
                df_list,
                day_array,
            )

            # Process the daily ZIP folder and add to df_list
            df_list = process_zip_folder(
                new_daily_zip_folder_path,
                pattern_daily,
                new_csv_folder_path,
                symbol,
                chart_time,
                df_list,
                day_array,
                is_daily=True
            )

            # Concatenate and process the collected data frames
            print(concatenate_data_frames(df_list, new_csv_folder_path, symbol, chart_time))
    return "Data downloaded and concatenated"


In [1]:
def _first_true_position(mask):
    """Return the position of the first True entry of ``mask``, or NaN if there is none."""
    if mask.size == 0:
        return np.nan
    position = mask.argmax()
    # argmax yields 0 for an all-False mask, so confirm the slot really is True.
    return position if mask[position] else np.nan


def _resolve_trade(target_mask, stop_mask):
    """Return ``(outcome, win_position)`` for one trade over its lookahead window.

    The outcome is 1 when the target is reached strictly before the stop loss
    (or the stop loss is never reached); otherwise it is -1 and the position
    is NaN.
    """
    target_at = _first_true_position(target_mask)
    stop_at = _first_true_position(stop_mask)
    target_first = not np.isnan(target_at) and (np.isnan(stop_at) or target_at < stop_at)
    if target_first:
        return 1, target_at
    return -1, np.nan


def calculate_wins_losses(master_dictionary, win_perc=0.73, loss_perc=0.4, lookahead_window=10000):
    """Label every row of each symbol / chart-time CSV as a winning or losing long and short.

    Reads ``<output_dir>/<symbol>-<chart_time>/<symbol>-<chart_time>.csv``
    (columns entry, high, low, open, close, open_time), derives target and
    stop-loss prices from ``entry``, and scans the highs / lows of the next
    ``lookahead_window`` rows, starting at the row itself. The result is
    written to the ``processed_data`` sub-folder.

    ``if_long`` / ``if_short`` hold 1 when the target is reached before the
    stop loss and -1 otherwise (including a tie on the same row or neither
    level being reached). ``longs_win_after`` / ``shorts_win_after`` hold the
    row offset of the winning hit, NaN for losses.

    Any exception raised for a pair is caught and printed, and processing
    continues with the next pair.

    Args:
        master_dictionary: Mapping providing "symbols" and "chart_times".
        win_perc: Target distance from entry, in percent.
        loss_perc: Stop-loss distance from entry, in percent.
        lookahead_window: Number of rows scanned per entry.
    """
    for symbol in master_dictionary["symbols"]:
        for chart_time in master_dictionary["chart_times"]:
            try:
                print(f"Calculating for {symbol} {chart_time}, {win_perc} : {loss_perc}, with a lookahead window of {lookahead_window}")

                # Input / output locations for this pair
                pair_dir = Path(output_dir) / f"{symbol}-{chart_time}"
                processed_data_dir = pair_dir / "processed_data"
                processed_data_dir.mkdir(parents=True, exist_ok=True)

                source_csv = pair_dir / f"{symbol}-{chart_time}.csv"
                new_file_name = f"{symbol}-{chart_time}_W{win_perc}_L{loss_perc}_{lookahead_window}cdls.csv"
                target_csv = processed_data_dir / new_file_name

                candles = pd.read_csv(source_csv, usecols=['entry', 'high', 'low', 'open', 'close', 'open_time'])
                row_count = len(candles)

                # Price levels derived from the entry price
                entry = candles["entry"]
                long_target = entry * (1 + win_perc / 100)
                short_target = entry * (1 - win_perc / 100)
                long_stop_loss = entry * (1 - loss_perc / 100)
                short_stop_loss = entry * (1 + loss_perc / 100)

                # Plain numpy views keep the row loop cheap
                highs = candles['high'].values
                lows = candles['low'].values
                long_targets = long_target.values
                short_targets = short_target.values
                long_stop_losses = long_stop_loss.values
                short_stop_losses = short_stop_loss.values

                # Result buffers, NaN until a row is evaluated
                if_long_results = np.full(row_count, np.nan)
                if_short_results = np.full(row_count, np.nan)
                longs_win_after = np.full(row_count, np.nan)
                shorts_win_after = np.full(row_count, np.nan)

                for row in tqdm(range(row_count), desc="Processing Rows", unit='row'):
                    # The window starts at the row itself and is clipped at the end of the data
                    window_end = min(row + lookahead_window, row_count)
                    window_highs = highs[row:window_end]
                    window_lows = lows[row:window_end]

                    # Long: target sits above entry, stop loss below
                    if_long_results[row], longs_win_after[row] = _resolve_trade(
                        window_highs >= long_targets[row],
                        window_lows <= long_stop_losses[row],
                    )

                    # Short: target sits below entry, stop loss above
                    if_short_results[row], shorts_win_after[row] = _resolve_trade(
                        window_lows <= short_targets[row],
                        window_highs >= short_stop_losses[row],
                    )

                # Attach the new columns in the established output order
                new_columns = {
                    "if_short": if_short_results,
                    "if_long": if_long_results,
                    "long_target": long_target,
                    "short_target": short_target,
                    "long_stop_loss": long_stop_loss,
                    "short_stop_loss": short_stop_loss,
                    "shorts_win_after": shorts_win_after,
                    "longs_win_after": longs_win_after,
                }
                for column_name, column_values in new_columns.items():
                    candles[column_name] = column_values

                # Save the processed data
                candles.to_csv(target_csv, index=False)
                print(f"Processed file saved as {new_file_name}")

            except Exception as e:
                print(f"Error processing {symbol} {chart_time}: {e}")

In [2]:
def calculate_indicator_values(master_dictionary, win_perc=0.73, loss_perc=0.4):
    """Append TA-Lib indicator columns to every symbol / chart-time CSV.

    For each pair, ``<output_dir>/<symbol>-<chart_time>/<symbol>-<chart_time>.csv``
    is read, ``calculate_indicators_using_talib`` is called with
    ``master_dictionary["timeperiods"]``, and the original columns plus the
    returned ones are written next to the input as ``..._indicators.csv``.
    Pairs whose input file is missing are reported and skipped.

    Args:
        master_dictionary: Mapping providing "symbols", "chart_times" and
            "timeperiods".
        win_perc: Not used by this function.
        loss_perc: Not used by this function.

    Returns:
        The string "Indicators are added to the CSV".
    """
    pairs = (
        (symbol, chart_time)
        for symbol in master_dictionary["symbols"]
        for chart_time in master_dictionary["chart_times"]
    )

    for symbol, chart_time in pairs:
        pair_name = f"{symbol}-{chart_time}"
        data_dir = Path(output_dir) / pair_name
        file_name = f"{pair_name}.csv"
        source_csv = data_dir / file_name

        if source_csv.exists():
            candles = pd.read_csv(source_csv)
            print(candles.dtypes)

            # Indicator columns come back as a separate frame
            indicators = calculate_indicators_using_talib(master_dictionary["timeperiods"], candles)

            # Place them to the right of the original columns and persist
            combined = pd.concat([candles, indicators], axis=1)
            combined.to_csv(data_dir / f"{pair_name}_indicators.csv", index=False)
        else:
            print(f"File path for {file_name} doesn't exist. Skipping.")

    return "Indicators are added to the CSV"

In [None]:
def optimized_file_writing(df, file_path, chunk_size=20000):
    """Write ``df`` to ``file_path`` as Parquet or CSV, chosen by the file extension.

    ``.parquet`` targets are written in a single call with snappy compression.
    ``.csv`` targets are written in slices of ``chunk_size`` rows: the first
    slice creates the file with the header, later slices are appended, and a
    tqdm bar reports progress. An empty frame still produces a header-only CSV.

    Args:
        df: DataFrame to persist.
        file_path: Destination as ``pathlib.Path`` or ``str``. The extension
            is matched case-insensitively.
        chunk_size: Rows per CSV write; must be positive. Ignored for Parquet.

    Raises:
        ValueError: If the extension is neither ``.parquet`` nor ``.csv``, or
            if ``chunk_size`` is not positive for a CSV target.
    """
    # Accept plain strings as well as Path objects.
    file_path = Path(file_path)
    file_extension = file_path.suffix.lower()

    if file_extension == '.parquet':
        table = pa.Table.from_pandas(df)
        pq.write_table(table, str(file_path), compression='snappy')
    elif file_extension == '.csv':
        if chunk_size <= 0:
            # A negative step would make the chunk loop silently write nothing.
            raise ValueError("chunk_size must be a positive integer")

        total_rows = len(df)
        if total_rows == 0:
            # The chunk loop would not run at all, leaving no file (or a stale
            # one) behind; write the header so the output matches the frame.
            df.to_csv(str(file_path), mode='w', header=True, index=False)
            return

        with tqdm(total=total_rows, desc="Writing file") as pbar:
            for i in range(0, total_rows, chunk_size):
                chunk = df.iloc[i:i+chunk_size]
                # First chunk creates / truncates the file, the rest append.
                mode = 'w' if i == 0 else 'a'
                chunk.to_csv(str(file_path), mode=mode, header=(i == 0), index=False)
                pbar.update(len(chunk))
    else:
        raise ValueError("Unsupported file format. Use .parquet or .csv")

def calculate_indicator_values_new(master_dictionary, win_perc=0.73, loss_perc=0.4, use_parquet=False):
    """Compute indicator columns for every symbol / chart time and save them.

    For each pair, ``<output_dir>/<symbol>-<chart_time>/<symbol>-<chart_time>.csv``
    is read, ``calculate_indicators_using_talib_new`` is called with
    ``master_dictionary["timeperiods"]``, and the original plus returned
    columns are passed to ``optimized_file_writing`` with a target named
    ``..._indicators.parquet`` or ``..._indicators.csv``. Pairs whose input
    file is missing are reported and skipped. Progress is shown with tqdm bars.

    Args:
        master_dictionary: Mapping providing "symbols", "chart_times" and
            "timeperiods".
        win_perc: Not used by this function.
        loss_perc: Not used by this function.
        use_parquet: Selects the ``.parquet`` output name instead of ``.csv``.

    Returns:
        The string "Indicators are added and saved to files".
    """
    for symbol in tqdm(master_dictionary["symbols"], desc="Processing symbols"):
        chart_time_bar = tqdm(
            master_dictionary["chart_times"],
            desc=f"Processing chart times for {symbol}",
            leave=False,
        )
        for chart_time in chart_time_bar:
            pair_name = f"{symbol}-{chart_time}"
            data_dir = Path(output_dir) / pair_name
            file_name = f"{pair_name}.csv"
            source_csv = data_dir / file_name

            # Nothing to do for pairs that were never downloaded
            if not source_csv.exists():
                print(f"File path for {file_name} doesn't exist. Skipping.")
                continue

            candles = pd.read_csv(source_csv)
            print(f"Data types for {symbol}-{chart_time}:")
            print(candles.dtypes)

            print(f"Calculating indicators for {symbol}-{chart_time}")
            indicators = calculate_indicators_using_talib_new(master_dictionary["timeperiods"], candles)

            # Indicator columns go to the right of the original ones
            candles_with_indicators = pd.concat([candles, indicators], axis=1)

            # The extension decides which writer optimized_file_writing uses
            extension = "parquet" if use_parquet else "csv"
            new_file_name = f"{symbol}-{chart_time}_indicators.{extension}"

            print(f"Writing data for {symbol}-{chart_time} to {new_file_name}")
            optimized_file_writing(candles_with_indicators, data_dir / new_file_name)

            print(f"Completed processing {symbol}-{chart_time}")

    return "Indicators are added and saved to files"


In [None]:
def read_data_between_dates(master_dictionary, start_date, end_date, data_path):
    """Return the date-filtered rows for the first symbol / chart-time pair.

    Only the first combination of ``master_dictionary["symbols"]`` and
    ``master_dictionary["chart_times"]`` is processed; its result from
    ``filter_data_for_symbol_charttime`` is returned immediately. When there
    is no combination at all, ``None`` is returned.

    NOTE(review): returning on the first pair looks unintended given the
    nested iteration; confirm with callers before changing the return shape.

    Args:
        master_dictionary: Mapping providing "symbols" and "chart_times".
        start_date: Forwarded unchanged to ``filter_data_for_symbol_charttime``.
        end_date: Forwarded unchanged to ``filter_data_for_symbol_charttime``.
        data_path: Forwarded unchanged to ``filter_data_for_symbol_charttime``.
    """
    combinations = (
        (symbol, chart_time)
        for symbol in master_dictionary["symbols"]
        for chart_time in master_dictionary["chart_times"]
    )

    first_combination = next(combinations, None)
    if first_combination is None:
        return None

    symbol, chart_time = first_combination
    print(f"displaying filtered for {symbol}-{chart_time}")
    return filter_data_for_symbol_charttime(symbol, chart_time, start_date, end_date, data_path)

def filter_data_for_symbol_charttime(symbol, chart_time, start_date, end_date, data_path):
    """Return the rows of one symbol / chart-time CSV whose day lies in a date range.

    Reads ``<data_path>/<symbol>-<chart_time>/<symbol>-<chart_time>.csv``,
    replaces ``open_time`` with its calendar date, and keeps the rows whose
    date is between ``start_date`` and ``end_date``, both inclusive.

    Args:
        symbol: Symbol part of the folder and file name.
        chart_time: Chart-time part of the folder and file name.
        start_date: First day to keep, formatted as ``YYYYMMDD``.
        end_date: Last day to keep, formatted as ``YYYYMMDD``.
        data_path: Root folder containing the per-pair folders.

    Returns:
        The filtered DataFrame; its ``open_time`` column holds date objects.
    """
    pair_name = f"{symbol}-{chart_time}"
    candles = pd.read_csv(Path(data_path) / pair_name / f"{pair_name}.csv")

    # Only the calendar day matters for the comparison below
    candles['open_time'] = pd.to_datetime(candles['open_time']).dt.date

    # Range bounds as plain date objects
    first_day = pd.to_datetime(start_date, format='%Y%m%d').date()
    last_day = pd.to_datetime(end_date, format='%Y%m%d').date()

    # Both bounds are inclusive
    on_or_after_start = candles['open_time'] >= first_day
    on_or_before_end = candles['open_time'] <= last_day

    return candles[on_or_after_start & on_or_before_end]


# Sample call: read_data_between_dates(master_dictionary, "20231206", "20231206", output_dir)
# output_dir is initialized by running the creating_arrays notebook.

In [None]:
# Confirmation message printed when this final cell runs.
print("data creation utilities successfully initialized")
