In [29]:
import pandas as pd
import numpy as np
import re
import os
import pdb
import dask
from datetime import datetime
import dask.dataframe as dd

In [2]:
dask.config.set(scheduler="processes")

@dask.delayed
def load_TRTH_trade(filename,
             tz_exchange="America/New_York",
             only_non_special_trades=True,
             only_regular_trading_hours=True,
             open_time="09:30:00",
             close_time="16:00:00",
             merge_sub_trades=True,
             drop_columns = True ):
    try:
        if re.search('(csv|csv\\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search('parquet$',filename):
            DF = pd.read_parquet(filename)

    except Exception as e:
     #   print("load_TRTH_trade could not load "+filename)
     #   print(e)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    
    if DF.shape[0]==0:
        return None
    
    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"]=="uncategorized"]
    if drop_columns==True: 
        DF.drop(columns=["trade-rawflag","trade-stringflag"],axis=1,inplace=True)
    
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time(open_time,close_time)    # warning: ever heard e.g. about Thanksgivings?
    
    if merge_sub_trades:
           DF=DF.groupby(DF.index).agg(trade_price=pd.NamedAgg(column='trade-price', aggfunc='mean'),
                                       trade_volume=pd.NamedAgg(column='trade-volume', aggfunc='sum'))
    
    return DF



@dask.delayed
def load_TRTH_bbo(filename,
             tz_exchange="America/New_York",
             only_regular_trading_hours=True,
             merge_sub_trades=True):
    try:
        if re.search(r'(csv|csv\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search(r'parquet$',filename):
            DF = pd.read_parquet(filename) 
    except Exception as e:
       # print("load_TRTH_bbo could not load "+filename)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")


In [22]:
def extract_files(base_dir, ticker, start_date, end_date):
    # Convert start and end dates to datetime objects for comparison
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    # Paths to search
    paths_to_search = [os.path.join(base_dir, ticker, 'bbo'), os.path.join(base_dir, ticker, 'trade')]

    # List to hold the paths of the files that match the criteria
    matching_files = []

    for path in paths_to_search:
        for root, dirs, files in os.walk(path):
            # Check if current directory matches the required format 'yyyy_mm'
            current_dir = os.path.basename(root)
            try:
                dir_year, dir_month = current_dir.split('_')
                dir_date = datetime(int(dir_year), int(dir_month), 1)
                if start_date <= dir_date <= end_date:
                    for file in files:
                        # Parse the date from the file name and check if it falls within the date range
                        try:
                            file_date_str = '-'.join(file.split('-')[:3])
                            file_date = datetime.strptime(file_date_str, "%Y-%m-%d")
                            if start_date <= file_date <= end_date:
                                full_path = os.path.join(root, file)
                                matching_files.append(full_path)
                        except ValueError:
                            # Skip files that do not match the expected date format
                            pass
            except ValueError:
                # Skip directories that do not match the expected format 'yyyy_mm'
                pass

    return matching_files


In [25]:
def extract_files(base_dir, ticker, start_date, end_date):
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    bbo_files = []
    trade_files = []

    paths_to_search = {
        'bbo': os.path.join(base_dir, ticker, 'bbo'),
        'trade': os.path.join(base_dir, ticker, 'trade')
    }

    for category, path in paths_to_search.items():
        for root, dirs, files in os.walk(path):
            current_dir = os.path.basename(root)
            try:
                dir_year, dir_month = current_dir.split('_')
                dir_date = datetime(int(dir_year), int(dir_month), 1)
                if start_date <= dir_date <= end_date:
                    for file in files:
                        file_date_str = '-'.join(file.split('-')[:3])
                        try:
                            file_date = datetime.strptime(file_date_str, "%Y-%m-%d")
                            if start_date <= file_date <= end_date:
                                full_path = os.path.join(root, file)
                                if category == 'bbo':
                                    bbo_files.append(full_path)
                                else:
                                    trade_files.append(full_path)
                        except ValueError:
                            pass
            except ValueError:
                pass

    return bbo_files, trade_files



In [28]:
current_dir = os.getcwd()
loading_dir = os.path.join(current_dir,"data","raw",
                                    "flash_crash_DJIA","csv_files")
#print(os.listdir(loading_dir))
example_ticker = "AAPL.OQ-2010"
bbo_files, trade_files = extract_files(loading_dir,example_ticker,"2010-01-01","2010-01-06")
bbo_files

['/Users/ilyesbenayed/Desktop/Big data/data/raw/flash_crash_DJIA/csv_files/AAPL.OQ-2010/bbo/2010_01/2010-01-05-AAPL.OQ-bbo.csv.gz',
 '/Users/ilyesbenayed/Desktop/Big data/data/raw/flash_crash_DJIA/csv_files/AAPL.OQ-2010/bbo/2010_01/2010-01-06-AAPL.OQ-bbo.csv.gz',
 '/Users/ilyesbenayed/Desktop/Big data/data/raw/flash_crash_DJIA/csv_files/AAPL.OQ-2010/bbo/2010_01/2010-01-04-AAPL.OQ-bbo.csv.gz',
 '/Users/ilyesbenayed/Desktop/Big data/data/raw/flash_crash_DJIA/csv_files/AAPL.OQ-2010/bbo/2010_01/2010-01-01-AAPL.OQ-bbo.csv.gz']

In [None]:
delayed_dfs = []

for file in os.listdir(stock_dir_ex):
    path_file = os.path.join(stock_dir_ex, file)
    # Delay computation
    delayed_df = delayed(load_TRTH_trade)(path_file,only_regular_trading_hours=False,merge_sub_trades=False,drop_columns=False)
    delayed_dfs.append(delayed_df)

# Compute all DataFrames in parallel and concatenate
dfs = compute(*delayed_dfs)
final_df = pd.concat(dfs, ignore_index=False)
final_df = final_df.sort_index()

In [None]:
@dask.delayed
def load_merge_trade_bbo(ticker,date_start,
                         date_end,
                         dirBase,
                         suffix="parquet",
                         suffix_save=None,
                         dirSaveBase = None ,
                         saveOnly=False,
                         doSave=False
                        ):
    """"
    This function allows to merge trade and bbo together of a given ticker, 
    We should precise the destination to save the merged data 
    The date merged can be done in any period we would like to test on
    By default the method returns the merged series, but can save or only save the merged series 
    Gien a location of the saving directory, the suffix is by default a parquet file. 
    """"
    bbo_files,trade_files  = extract_files(dirBase,ticker,date_start,date_end)
    trade_dfs = []

    for file in trade_files:
        # Delay computation
        trade_df = delayed(load_TRTH_trade)(file,only_regular_trading_hours=False,merge_sub_trades=False,drop_columns=False)
        trade_dfs.append(trade_df)
    # Compute all DataFrames in parallel and concatenate
    dfs = compute(*trade_dfs)
    trades = pd.concat(dfs, ignore_index=False)
    trades = trades.sort_index()

    bbo_dfs = []
    for file in bbo_files:
        bbo_df =  delayed(load_TRTH_bbo)(file)
        bbo_dfs.append(bbo_df)
    dfs = compute(*bbo_dfs)
    bbos  = pd.concat(dfs,ignore_index= False)
    bbos = bbos.sort_index()
    try:
        trades.shape + bbos.shape
    except:
        return None
    
    events=trades.join(bbos,how="outer")
    
    if doSave:
        dirSave=os.path.join(dirSaveBase,ticker)
        if not os.path.isdir(dirSave):
            os.makedirs(dirSave)

        if suffix_save:
            suffix=suffix_save
        
        file_events=os.path.join(dirSave,"events."+suffix)
       # pdb.set_trace()

        saved=False
        if suffix == "arrow":
            # Convert Pandas DataFrame to Dask DataFrame
            dask_df = dd.from_pandas(events, npartitions=1)  # Adjust npartitions based on your dataset size and memory
            # Dask doesn't directly support exporting to Arrow, so you would need to use PyArrow separately if needed
            # For now, you can save it as Parquet which is compatible with Arrow
            dask_df.to_parquet(file_events, write_index=True)  # Parquet format is compatible with Arrow
            saved = True
        
        elif suffix == "parquet":
            dask_df = dd.from_pandas(events, npartitions=1)  # Convert Pandas DataFrame to Dask DataFrame
            dask_df.to_parquet(file_events, write_index=True, engine='pyarrow', write_metadata_file=True)
            saved = True
        
        if not saved:
            print(f"suffix {suffix}: format not recognized")

        if saveOnly:
            return saved
    return events


""""
To be tested and completed 

""""