In [None]:
#| default_exp datastore

In [5]:
#| export
import os
import glob

import pandas as pd

from research.utils import (
    generate_weekday_dates, 
    find_list_overlap,
    get_home_directory
)

# Root of the on-disk datastore. Can be overridden through the DATASTORE_PATH
# environment variable instead of editing this user-specific default.
DATASTORE_PATH = os.environ.get("DATASTORE_PATH", "/home/pmk/datastore")
# asset_type values accepted by DataStore.get_asset_datastore_state.
ASSET_TYPES = ["PERPS", "EQUITY", "CRYPTO"]
# data_type values accepted by DataStore.get_asset_datastore_state.
DATA_TYPES = ["TRADE", "QUOTE"]

In [10]:
#| export
class DataStore:
    """
    A class that reads/writes time series data to partitioned files on disk.

    Designed primarily for financial time series. Every partition is a single
    parquet file laid out as::

        {path}/{asset_type}/{data_type}/{symbol}/{date}.parquet
    """
    def __init__(self, datastore_path: str = DATASTORE_PATH):
        # Root directory under which all partitions are stored.
        self.path = datastore_path

    def _symbol_directory(self, asset_type: str, data_type: str, symbol: str) -> str:
        """Return the directory holding every partition of one symbol."""
        return f"{self.path}/{asset_type}/{data_type}/{symbol}"

    def _partition_path(self, asset_type: str, data_type: str, symbol: str, date: str) -> str:
        """Return the parquet file path of a single (symbol, date) partition."""
        return f"{self._symbol_directory(asset_type, data_type, symbol)}/{date}.parquet"

    def _folder_exists_if_not_create(self, path: str) -> None:
        """Create ``path`` (including missing parents) if it does not exist.

        Best effort: an ``OSError`` is reported via ``print`` rather than raised.
        """
        if not os.path.exists(path):
            try:
                # exist_ok guards against another process creating it in between
                os.makedirs(path, exist_ok=True)
            except OSError as e:
                print(f"Error creating folder '{path}': {str(e)}")

    @property
    def available_files(self) -> pd.DataFrame:
        """List every partition file stored in the datastore.

        Returns:
            DataFrame with one row per ``.parquet`` file and the columns
            ``asset_type``, ``data_type``, ``symbol`` and ``file`` (the file
            name, e.g. ``"2023-01-03.parquet"``). Files that are not exactly
            four levels below ``self.path`` are ignored. An empty datastore
            yields an empty DataFrame with the same columns.
        """
        columns = ["asset_type", "data_type", "symbol", "file"]
        files = glob.glob(os.path.join(self.path, "**", "*.parquet"), recursive=True)
        rows = []
        for file_path in files:
            # relpath strips the root robustly (trailing slash, or the root
            # string re-appearing deeper in the path)
            parts = os.path.relpath(file_path, self.path).split(os.sep)
            if len(parts) == len(columns):
                rows.append(parts)
        return pd.DataFrame(rows, columns=columns)

    @property
    def available_partitions(self) -> pd.DataFrame:
        """Summarise stored partitions per (asset_type, data_type, symbol).

        Returns:
            DataFrame with the three grouping columns plus a ``file`` column
            group holding the ``min``/``max`` partition date and the ``count``
            of partitions.
        """
        partitions = self.available_files.copy()
        # file names are "<date>.parquet": drop the extension and parse the date
        partitions["file"] = pd.to_datetime(
            partitions["file"].str.replace(".parquet", "", regex=False)
        )
        return (
            partitions
            .groupby(["asset_type", "data_type", "symbol"])
            .agg({"file": ["min", "max", "count"]})
            .reset_index()
        )

    def get_asset_datastore_state(
                self,
                asset_type: str,
                data_type: str,
                symbols: list,
                dates: list
            ) -> dict:
        """Report which (symbol, date) partitions are missing on disk.

        For any asset_type other than ``"CRYPTO"`` the requested dates are first
        intersected with the weekdays between ``min(dates)`` and ``max(dates)``.
        Side effect: each symbol's directory is created if it does not exist.

        Args:
            asset_type: one of ``ASSET_TYPES``.
            data_type: one of ``DATA_TYPES``.
            symbols: symbols to check.
            dates: partition dates, formatted as they appear in file names.

        Returns:
            Mapping of every symbol to the list of dates lacking a partition file.

        Raises:
            AssertionError: on an unknown ``asset_type`` or ``data_type``.
        """
        assert asset_type in ASSET_TYPES, f"invalid asset_type: {asset_type}"
        assert data_type in DATA_TYPES, f"invalid data_type: {data_type}"

        # min()/max() raise on an empty sequence; with no dates nothing is missing
        if asset_type != 'CRYPTO' and dates:
            weekdays = generate_weekday_dates(min(dates), max(dates))
            dates = find_list_overlap(dates, weekdays)

        missing = {}
        for symbol in symbols:
            symbol_directory = self._symbol_directory(asset_type, data_type, symbol)
            self._folder_exists_if_not_create(symbol_directory)
            # a set gives O(1) membership checks per requested date
            existing_partitions = {
                os.path.basename(p) for p in glob.glob(f"{symbol_directory}/*.parquet")
            }
            missing[symbol] = [
                date for date in dates if f"{date}.parquet" not in existing_partitions
            ]
        return missing

    def write_asset_partition(
            self,
            df: pd.DataFrame,
            asset_type: str,
            data_type: str,
            symbol: str,
            date: str
            ) -> None:
        """Write ``df`` as the parquet file of the (symbol, date) partition.

        The symbol directory is created first, so writing does not depend on a
        prior ``get_asset_datastore_state`` call having created it.
        """
        self._folder_exists_if_not_create(
            self._symbol_directory(asset_type, data_type, symbol)
        )
        # index=None is pandas' default: a RangeIndex is kept as metadata only,
        # any other index is stored as column(s).
        df.to_parquet(
            self._partition_path(asset_type, data_type, symbol, date), index = None
        )

    def load_asset_partition(
            self,
            asset_type: str,
            data_type: str,
            symbol: str,
            date: str
            ) -> pd.DataFrame:
        """Read the parquet file of a single (symbol, date) partition."""
        return pd.read_parquet(self._partition_path(asset_type, data_type, symbol, date))

    def load_asset_parition_range(
            self,
            asset_type: str,
            data_type: str,
            symbol: list,
            dates: list
        ) -> pd.DataFrame:
        """Load and concatenate the partitions of ``symbol`` over ``dates``.

        Args:
            asset_type: asset type folder, e.g. ``"EQUITY"``.
            data_type: data type folder, e.g. ``"TRADE"``.
            symbol: a single symbol string, or a list of symbols. For a list,
                a ``symbol`` column is added (or overwritten) so rows stay
                attributable to their symbol.
            dates: partition dates; dates with no partition file are skipped.

        Returns:
            Partitions concatenated in (symbol, date) order with a fresh
            RangeIndex, or an empty DataFrame when no partition exists.
        """
        multiple = not isinstance(symbol, str)
        symbols = list(symbol) if multiple else [symbol]
        frames = []
        for sym in symbols:
            for date in dates:
                path = self._partition_path(asset_type, data_type, sym, date)
                if not os.path.exists(path):
                    continue
                frame = pd.read_parquet(path)
                if multiple:
                    frame = frame.assign(symbol=sym)
                frames.append(frame)
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, ignore_index=True)

    # Correctly spelled alias; the original (misspelled) name is kept for callers.
    load_asset_partition_range = load_asset_parition_range

In [8]:
# Point the datastore at "<home>/datastore" instead of the module default path.
ds = DataStore(datastore_path=f"{get_home_directory()}/datastore")

In [9]:
# Load the 2023-01-03 EQUITY/TRADE partition for AROW and preview its first rows.
partition = ds.load_asset_partition("EQUITY", "TRADE", "AROW", "2023-01-03")
partition.head()

Unnamed: 0,conditions,exchange,id,participant_timestamp,price,sequence_number,sip_timestamp,size,tape,trf_id,trf_timestamp
0,[12],4,319,1672779685399262000,33.98,6428681,1672779685400263036,338,3,12.0,1.67278e+18
1,"[12, 37]",4,318,1672779622094000000,33.98,6427419,1672779622109060727,83,3,12.0,1.67278e+18
2,[15],12,297,1672779601024156497,33.98,6424590,1672779601024173819,3023,3,,
3,"[8, 9, 41]",12,296,1672779601024130202,33.98,6424589,1672779601024151847,3023,3,,
4,[15],11,124,1672779600029275904,33.93,6422322,1672779600029627197,100,3,,
