In [None]:
#!/usr/bin/env python
"""
Find missing trading days (as date ranges) in a Parquet dataset, per partition combination,
without ever loading the entire dataset into memory at once.

Key points:
- Dataset is a directory of Parquet files, possibly Hive-partitioned on multiple keys.
- One column is named `date`. It may be:
    * a partition key (e.g. date=2020-01-02/...), or
    * a column stored inside the Parquet files.
- Given [start_date, end_date] and a pandas_market_calendars calendar:
    * Build the trading-day index (days market is open).
    * Enumerate all combinations of non-date partition columns from the directory structure.
    * For each partition combination:
        - Collect the set of dates present for that combo
          (from directory names if date is a partition key, or by reading only that combo's data).
        - Compute missing trading-day ranges.
- Missing days are reported as contiguous ranges per partition combination.

Requirements:
    pip install pandas pyarrow pandas_market_calendars
"""

import argparse
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple, Any, Iterable

import os
import pandas as pd
import pyarrow.dataset as ds
import pandas_market_calendars as mcal
import sys
sys.path.append('../src')
import utils


# ----------------------------------------------------------------------
# Trading days utilities
# ----------------------------------------------------------------------

def get_trading_days(
    calendar_name: str, start_date: str, end_date: str
) -> pd.DatetimeIndex:
    """
    Return normalized (midnight), tz-naive trading days for a calendar and date range.

    Args:
        calendar_name: Name understood by ``pandas_market_calendars.get_calendar``.
        start_date: First day of the window, passed through to ``schedule``.
        end_date: Last day of the window, passed through to ``schedule``.

    Returns:
        The index of the calendar's schedule for the window, with any time zone
        removed and every timestamp floored to midnight.
    """
    cal = mcal.get_calendar(calendar_name)
    schedule = cal.schedule(start_date=start_date, end_date=end_date)
    idx = schedule.index

    if idx.tz is not None:
        # Drop the zone while keeping the local wall-clock date.  `tz_convert(None)`
        # would first shift to UTC, which moves a local midnight east of UTC onto
        # the previous calendar day once normalized.
        idx = idx.tz_localize(None)

    return idx.normalize()


def find_missing_ranges(
    expected_days: pd.DatetimeIndex,
    present_days: pd.DatetimeIndex,
) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
    """
    Collapse the expected days that are absent from the data into contiguous runs.

    Two missing days belong to the same run when they sit next to each other in
    ``expected_days`` (so a weekend or holiday between them does not split a run).

    Both inputs should be normalized DatetimeIndex objects.

    Returns:
        A list of ``(first_missing_day, last_missing_day)`` tuples; empty when
        nothing is expected or nothing is missing.
    """
    if expected_days.empty:
        return []

    seen = set(present_days)

    # One pass: remember each expected day's position and collect the absent ones.
    index_of = {}
    absent = []
    for position, day in enumerate(expected_days):
        index_of[day] = position
        if day not in seen:
            absent.append(day)

    if not absent:
        return []

    absent = sorted(absent)

    spans: List[Tuple[pd.Timestamp, pd.Timestamp]] = []
    run_start = run_end = absent[0]
    for day in absent[1:]:
        if index_of[day] - index_of[run_end] <= 1:
            # Adjacent in the expected calendar: extend the current run.
            run_end = day
            continue
        spans.append((run_start, run_end))
        run_start = run_end = day

    spans.append((run_start, run_end))
    return spans


# ----------------------------------------------------------------------
# Partition discovery from filesystem
# ----------------------------------------------------------------------

def parse_partition_components(parts: Iterable[str]) -> Dict[str, str]:
    """
    Turn Hive-style path components into a ``{key: value}`` mapping.

    For example ``['exchange=XNYS', 'ticker=AAPL']`` becomes
    ``{'exchange': 'XNYS', 'ticker': 'AAPL'}``.  Components without an ``=`` are
    skipped; only the first ``=`` splits key from value; a repeated key keeps
    the value of its last occurrence.
    """
    key_value_pairs = (
        component.split("=", 1) for component in parts if "=" in component
    )
    return {key: value for key, value in key_value_pairs}


def discover_partition_combos_from_paths(
    dataset_path: str,
    combo_cols: List[str],
    date_column: str,
) -> Tuple[List[Tuple[Any, ...]], Dict[Tuple[Any, ...], List[pd.Timestamp]]]:
    """
    Discover partition combinations (and their dates) from directory names only.

    Every directory below ``dataset_path`` is visited with ``os.walk`` and its
    relative path is parsed as Hive-style ``key=value`` components, e.g.
        root/exchange=ARCX.PILLAR/ticker=AEG/date=2020-01-02/part-0.parquet
    No Parquet file is opened.

    Args:
        dataset_path: Root directory of the dataset.
        combo_cols: Non-date partition keys; their values form the combo key.
        date_column: Name of the date partition key, if present in the paths.

    Returns:
        combos: sorted list of combo-key tuples, ordered like ``combo_cols``.
        combo_dates: combo key -> sorted list of normalized ``pd.Timestamp``,
            only for combos under which ``date_column`` appears as a path key.
    """
    root = os.path.abspath(dataset_path)

    seen_combos = set()
    # Dates are kept as raw strings while walking and parsed once per combo afterwards.
    date_strings_by_combo: Dict[Tuple[Any, ...], set] = defaultdict(set)

    for current_dir, _subdirs, _files in os.walk(root):
        relative = os.path.relpath(current_dir, root)
        if relative == ".":
            # The root carries no partition components.
            continue

        components = parse_partition_components(relative.split(os.sep))
        # A key not present at this depth shows up as None in the combo key.
        key = tuple(components.get(name) for name in combo_cols)
        has_date = date_column in components

        if has_date or None not in key:
            seen_combos.add(key)
        if has_date:
            date_strings_by_combo[key].add(components[date_column])

    combos = sorted(seen_combos)

    combo_dates: Dict[Tuple[Any, ...], List[pd.Timestamp]] = {
        key: list(pd.to_datetime(sorted(date_strings)).normalize())
        for key, date_strings in date_strings_by_combo.items()
    }

    return combos, combo_dates

# ----------------------------------------------------------------------
# Main computation: missing days by partition combo
# ----------------------------------------------------------------------
def compute_missing_by_partition_streaming(
    dataset_path: str,
    date_column: str,
    trading_days: pd.DatetimeIndex,
) -> pd.DataFrame:
    """
    Compute missing trading-day ranges per partition combination, one combo at a time.

    Strategy:
    - Open the dataset with Hive partitioning to learn the partition column names.
    - The partition columns other than ``date_column`` define a "combo".
    - Walk the filesystem to find all combos and, when ``date_column`` is a
      partition key, the dates present for each combo.
    - For each combo:
        * dates known from the paths -> use them, no file body is read;
        * otherwise -> read only that combo's ``date_column`` values.
    - Compare against ``trading_days`` and collect the missing ranges.

    Args:
        dataset_path: Root directory of the Parquet dataset.
        date_column: Name of the date column (partition key or stored column).
        trading_days: Expected trading days (normalized DatetimeIndex).

    Returns:
        DataFrame with columns ``combo_cols..., "missing_start", "missing_end"``
        (ISO date strings).  The columns are present even when nothing is missing.
    """
    dataset = ds.dataset(dataset_path, format="parquet", partitioning="hive")

    # Partition columns discovered by Arrow; may be empty.
    if dataset.partitioning is not None and dataset.partitioning.schema is not None:
        partition_cols = [field.name for field in dataset.partitioning.schema]
    else:
        partition_cols = []

    combo_cols = [c for c in partition_cols if c != date_column]
    result_columns = combo_cols + ["missing_start", "missing_end"]

    combos, combo_dates_from_paths = discover_partition_combos_from_paths(
        dataset_path=dataset_path,
        combo_cols=combo_cols,
        date_column=date_column,
    )

    date_is_partition = date_column in partition_cols
    min_td, max_td = trading_days.min(), trading_days.max()

    def build_partition_filter(combo_key: Tuple[Any, ...]):
        """AND together one equality term per combo column; None when there are none."""
        expr = None
        for col_name, value in zip(combo_cols, combo_key):
            # NOTE(review): values come from directory names (strings); if Arrow
            # inferred a non-string partition type this comparison may need a cast.
            term = ds.field(col_name) == value
            expr = term if expr is None else (expr & term)
        return expr

    def restrict_to_window(days: pd.DatetimeIndex) -> pd.DatetimeIndex:
        """Keep only the days inside [min_td, max_td]."""
        return days[(days >= min_td) & (days <= max_td)]

    results_rows: List[Dict[str, Any]] = []

    # With no discovered combos, still evaluate the dataset once as a single unkeyed combo.
    for combo_key in combos or [()]:
        if date_is_partition and combo_key in combo_dates_from_paths:
            # Dates come from folder names; file bodies are not touched.
            present_days = pd.DatetimeIndex(combo_dates_from_paths[combo_key]).normalize()
        else:
            # Read only this combo's rows, and only the date column.
            table = dataset.to_table(
                columns=[date_column],
                filter=build_partition_filter(combo_key),
            )
            if table.num_rows == 0:
                present_days = pd.DatetimeIndex([])
            else:
                df_dates = table.to_pandas()
                present_days = pd.DatetimeIndex(
                    pd.to_datetime(df_dates[date_column]).dt.normalize().unique()
                )

        present_days = restrict_to_window(present_days)

        for start, end in find_missing_ranges(trading_days, present_days):
            row: Dict[str, Any] = dict(zip(combo_cols, combo_key))
            row["missing_start"] = start.date().isoformat()
            row["missing_end"] = end.date().isoformat()
            results_rows.append(row)

    # Explicit columns keep the documented schema when there are no gaps at all.
    return pd.DataFrame(results_rows, columns=result_columns)

def find_first_file_directory(root: str):
    """
    Return the first directory, in ``os.walk`` order, that directly contains a file.

    Returns None when no directory under ``root`` (including ``root`` itself)
    holds any file.
    """
    dirs_with_files = (
        current for current, _subdirs, files in os.walk(root) if files
    )
    return next(dirs_with_files, None)


def get_partition_field_vals(partition_path, field_names=('exchange', 'ticker', 'date')):
    """
    Collect the distinct values of each partition field from fragment metadata.

    The dataset is opened with Hive partitioning and the string form of every
    fragment's ``partition_expression`` is scanned for ``(<field> == "<value>"``.

    Args:
        partition_path: Root directory of the Hive-partitioned Parquet dataset.
        field_names: Partition field names to extract (any iterable of str).

    Returns:
        Dict mapping each field name to the sorted list of distinct values found.
        Only values rendered in double quotes by the expression are captured.
    """
    # Local import: the imports visible above this cell do not include `re`.
    import re

    dset = ds.dataset(partition_path, format="parquet", partitioning="hive")
    expressions = ''.join(str(frag.partition_expression) for frag in dset.get_fragments())

    field_vals = {}
    for field_name in field_names:
        # Escape the name so regex metacharacters in it are matched literally.
        pattern = re.compile(rf"\({re.escape(field_name)} == \"([^\"]+)")
        field_vals[field_name] = sorted(set(pattern.findall(expressions)))

    return field_vals
# ----------------------------------------------------------------------
# CLI
# ----------------------------------------------------------------------
# Expected trading days from 2019-12-15 through today, stepping by the XNYS offset.
# NOTE(review): `utils.get_market_business_days` is project-local; it is used as a
# `freq=` for `pd.date_range`, so it presumably returns a business-day offset - confirm.
cbday = utils.get_market_business_days('XNYS')
trading_days = pd.date_range('2019-12-15', pd.Timestamp.today().strftime('%Y-%m-%d'), freq=cbday)


# Stop before scanning when the window contains no trading days.
if trading_days.empty:
    raise SystemExit("No trading days in the specified range; check dates/calendar.")

# IPython `%prun` magic: profiles the scan and binds the result to `missing_df`.
# This line is notebook-only syntax and is not valid in a plain .py file.
%prun missing_df = compute_missing_by_partition_streaming(dataset_path='../data/raw/adrs/bbo-1m/by_exchange/',date_column='date', trading_days=trading_days, )

In [49]:
from collections import defaultdict
from typing import List, Tuple
import pandas as pd

def rectangular_groups_by_ticker(df: pd.DataFrame) -> List[Tuple[List[str], List[str]]]:
    """
    Split the (exchange, ticker) pairs of ``df`` into disjoint rectangles.

    Tickers that appear on exactly the same set of exchanges are bundled, so
    each group is a full cross product of its exchanges and tickers.

    Returns a list of (exchanges, tickers) groups, both lists sorted.
    """
    unique_pairs = df[["exchange", "ticker"]].drop_duplicates()

    # Bucket tickers under the frozenset of exchanges they appear on.
    tickers_by_signature = defaultdict(list)
    for ticker, listed_on in unique_pairs.groupby("ticker")["exchange"]:
        tickers_by_signature[frozenset(listed_on)].append(ticker)

    return [
        (sorted(signature), sorted(members))
        for signature, members in tickers_by_signature.items()
    ]


def rectangular_groups_by_exchange(df: pd.DataFrame) -> List[Tuple[List[str], List[str]]]:
    """
    Mirror image of the per-ticker grouping: bundle exchanges that carry exactly
    the same set of tickers.

    Returns a list of (exchanges, tickers) groups, both lists sorted.
    """
    unique_pairs = df[["exchange", "ticker"]].drop_duplicates()

    # Bucket exchanges under the frozenset of tickers they carry.
    exchanges_by_signature = defaultdict(list)
    for exchange, carried in unique_pairs.groupby("exchange")["ticker"]:
        exchanges_by_signature[frozenset(carried)].append(exchange)

    return [
        (sorted(members), sorted(signature))
        for signature, members in exchanges_by_signature.items()
    ]


def rectangular_groups(df: pd.DataFrame, mode: str = "auto") -> List[Tuple[List[str], List[str]]]:
    """
    Entry point for rectangle grouping of (exchange, ticker) pairs.

    - mode="ticker": group tickers by their exchange sets.
    - mode="exchange": group exchanges by their ticker sets.
    - mode="auto": run both and return whichever yields fewer rectangles
      (the per-ticker grouping wins ties).

    Raises:
        ValueError: if ``mode`` is none of the three values above.
    """
    if mode not in ("ticker", "exchange", "auto"):
        raise ValueError("mode must be 'ticker', 'exchange', or 'auto'")

    if mode == "ticker":
        return rectangular_groups_by_ticker(df)
    if mode == "exchange":
        return rectangular_groups_by_exchange(df)

    by_ticker = rectangular_groups_by_ticker(df)
    by_exchange = rectangular_groups_by_exchange(df)
    return by_exchange if len(by_exchange) < len(by_ticker) else by_ticker

In [None]:
import os
import re
import sys
import pandas as pd
from glob import glob
sys.path.append('../src')
import utils

def find_first_file_directory(root: str):
    """
    Return the first directory, in ``os.walk`` order, that directly contains a file.

    Returns None when no directory under ``root`` (including ``root`` itself)
    holds any file.
    """
    dirs_with_files = (
        current for current, _subdirs, files in os.walk(root) if files
    )
    return next(dirs_with_files, None)

def get_partition_field_keys(partition_path):
    """
    Infer the Hive partition key names, in directory order, below ``partition_path``.

    The first directory that contains a file is located and the ``key=value``
    components of its path *relative to* ``partition_path`` are read.  Components
    of ``partition_path`` itself are ignored, so the number of keys equals the
    number of partition levels beneath the root.

    Returns:
        List of key names (outermost first); empty when no file exists under
        ``partition_path`` or the path has no ``key=value`` components.
    """
    first_dir = find_first_file_directory(partition_path)
    if first_dir is None:
        # No files at all: nothing to infer.
        return []

    relative = os.path.relpath(first_dir, partition_path)

    field_keys = []
    for component in relative.split(os.sep):
        key, equals, _value = component.partition("=")
        if equals and key:
            field_keys.append(key)

    return field_keys

def get_partition_field_vals(partition_path, field_keys=('exchange', 'ticker', 'date')):
    """
    List the partition values of every directory ``len(field_keys)`` levels deep.

    Directories are found with a glob of one ``*`` per key under
    ``partition_path``; each path must end in ``key1=<v1>/key2=<v2>/...`` in the
    order of ``field_keys`` to be included.  Each path is parsed on its own, so
    no result is lost (a previous version joined the paths with ``,`` and
    required a trailing ``,``, which dropped the last path).

    Args:
        partition_path: Root directory of the partitioned dataset.
        field_keys: Partition key names, outermost first (any sequence of str).

    Returns:
        A list in glob order: tuples of values when there are several keys, or
        plain strings when there is exactly one key (same shape as
        ``re.findall``).  Empty when ``field_keys`` is empty.
    """
    field_keys = list(field_keys)
    if not field_keys:
        return []

    sep = re.escape(os.sep)
    # Anchor the key=value chain to a component boundary and to the end of the path.
    pattern = re.compile(
        rf"(?:^|{sep})"
        + sep.join(rf"{re.escape(name)}=([^{sep}]+)" for name in field_keys)
        + r"\Z"
    )

    glob_str = os.path.join(partition_path, *('*',) * len(field_keys))

    field_vals = []
    for path in glob(glob_str):
        match = pattern.search(path)
        if match is None:
            continue
        values = match.groups()
        field_vals.append(values if len(values) > 1 else values[0])

    return field_vals


def find_missing_ranges(
    expected_days: pd.DatetimeIndex,
    present_days: pd.DatetimeIndex,
):
    """
    Collapse the expected days that are absent from the data into contiguous runs.

    Two missing days belong to the same run when they sit next to each other in
    ``expected_days`` (so a weekend or holiday between them does not split a run).

    Both inputs should be normalized DatetimeIndex objects.

    Returns:
        A list of ``(first_missing_day, last_missing_day)`` tuples; empty when
        nothing is expected or nothing is missing.
    """
    if expected_days.empty:
        return []

    seen = set(present_days)

    # One pass: remember each expected day's position and collect the absent ones.
    index_of = {}
    absent = []
    for position, day in enumerate(expected_days):
        index_of[day] = position
        if day not in seen:
            absent.append(day)

    if not absent:
        return []

    absent = sorted(absent)

    spans = []
    run_start = run_end = absent[0]
    for day in absent[1:]:
        if index_of[day] - index_of[run_end] <= 1:
            # Adjacent in the expected calendar: extend the current run.
            run_end = day
            continue
        spans.append((run_start, run_end))
        run_start = run_end = day

    spans.append((run_start, run_end))
    return spans

# Plan download batches: each job is a dict with 'datasets', 'tickers',
# 'start_date' and 'end_date' ('YYYY-MM-DD' strings).
#   1. datasets with nothing on disk      -> request every ticker, full window
#   2. tickers with nothing on disk       -> request every dataset, full window
#   3. combos already on disk             -> request only their missing day ranges

# Dataset code -> calendar name used to build that dataset's expected trading days.
calendar_map = {'ARCX.PILLAR':'XNYS',
                'IFLL.IMPACT':'XLON',
                'XEUR.EOBI':'EUREX',
                'XNAS.BASIC':'XNYS',
                'XNAS.ITCH':'XNYS',
                'XNYS.PILLAR':'XNYS',
                'GLBX.MDP3':'us_futures',
}

partition_path = '../data/raw/adrs/bbo-1m/by_exchange/'
datasets = ['ARCX.PILLAR','XNAS.ITCH','XNYS.PILLAR']
dataset_partition_name = 'exchange'
tickers = ['AZN','BP','GSK','AEG','SHEL','WPP','HLN']
start_date = '2019-12-15'
end_date = pd.Timestamp.today().strftime('%Y-%m-%d')

# --- 1. Datasets that have no (non-empty) partition directory yet -------------
new_datasets = []
existing_datasets = []
for dataset in datasets:
    dataset_path = os.path.join(partition_path, f"{dataset_partition_name}={dataset}")
    if os.path.isdir(dataset_path) and os.listdir(dataset_path):
        existing_datasets.append(dataset)
    else:
        new_datasets.append(dataset)

batch_jobs = []
if new_datasets:
    batch_jobs.append({
        'datasets': new_datasets,
        'tickers': tickers,
        'start_date': start_date,
        'end_date': end_date,
    })

# --- 2. Tickers that appear nowhere on disk -----------------------------------
# NOTE(review): the trailing '*' also matches longer symbols sharing the prefix
# (e.g. 'BP' matches 'ticker=BPX'); confirm whether an exact match is intended.
# NOTE(review): a (new dataset, new ticker) pair is requested by both job 1 and job 2.
new_tickers = []
existing_tickers = []
for ticker in tickers:
    ticker_paths = (glob(os.path.join(partition_path, f"*={ticker}*")) +
                    glob(os.path.join(partition_path, '*', f"*={ticker}*")))
    if ticker_paths:
        existing_tickers.append(ticker)
    else:
        new_tickers.append(ticker)

if new_tickers:
    batch_jobs.append({
        'datasets': datasets,
        'tickers': new_tickers,
        'start_date': start_date,
        'end_date': end_date,
    })

# --- 3. Missing ranges for combos that already exist on disk ------------------
# NOTE(review): only combos present on disk are inspected; a ticker that exists
# for one dataset but not another is not requested for the latter.
if existing_datasets and existing_tickers:
    field_keys = get_partition_field_keys(partition_path)
    field_vals = get_partition_field_vals(partition_path, field_keys=field_keys)
    df = pd.DataFrame(field_vals, columns=field_keys)

    date_is_partition = 'date' in field_keys
    if date_is_partition:
        partition_df = df

    # Keys identifying a combo: every partition key except the date, in path order.
    group_cols = [key for key in field_keys if key != 'date']
    result_cols = group_cols + ['missing_start', 'missing_end']

    if 'ticker' in group_cols:
        symbol_idx = group_cols.index('ticker')
    elif 'code' in group_cols:
        symbol_idx = group_cols.index('code')
    else:
        raise ValueError("No ticker or code field found in partition keys.")

    if date_is_partition:
        iterator = df.groupby(group_cols)['date']
    else:
        iterator = field_vals

    # Expected trading days per requested dataset, from its mapped calendar.
    trading_days_by_calendar = {}
    for dataset in datasets:
        calendar_name = calendar_map[dataset]
        cbday = utils.get_market_business_days(calendar_name)
        trading_days_by_calendar[dataset] = pd.date_range(start_date,
                                                          end_date,
                                                          freq=cbday)

    all_range_df = []
    for x in iterator:
        if date_is_partition:
            comb, present_dates = x
        else:
            comb, present_dates = x, None

        # A single grouping key may arrive as a scalar; always work with tuples.
        comb = comb if isinstance(comb, tuple) else (comb,)

        # Filter before any file is read.
        if comb[symbol_idx] not in tickers:
            continue

        if dataset_partition_name in group_cols:
            dataset = comb[group_cols.index(dataset_partition_name)]
        else:
            if len(datasets) > 1:
                raise ValueError("Dataset partition name not in field keys but multiple datasets were given.")
            dataset = datasets[0]

        if dataset not in trading_days_by_calendar:
            # On-disk dataset that was not requested in `datasets`.
            continue

        if date_is_partition:
            present_dates = pd.to_datetime(present_dates).tolist()
        else:
            d = [f"{key}={val}" for key, val in zip(group_cols, comb)]
            comb_path = os.path.join(partition_path, *d)
            stored_dates = pd.read_parquet(comb_path, columns=['date'])['date'].unique()
            # Normalize so intraday timestamps still match the midnight trading days.
            present_dates = pd.to_datetime(stored_dates).normalize().unique().tolist()

        trading_days = trading_days_by_calendar[dataset]
        missing_ranges = find_missing_ranges(trading_days, present_dates)
        if not missing_ranges:
            continue

        range_df = pd.DataFrame([comb + r for r in missing_ranges],
                                columns=result_cols)
        all_range_df.append(range_df)

    # pd.concat raises on an empty list, so fall back to an empty, correctly shaped frame.
    if all_range_df:
        missing_df = pd.concat(all_range_df, ignore_index=True)
    else:
        missing_df = pd.DataFrame(columns=result_cols)

    # One job per rectangle of (datasets x tickers) sharing the same missing range.
    # Loop variables are named distinctly so the configuration above is not overwritten.
    # NOTE(review): `rectangular_groups` reads the 'exchange' and 'ticker' columns.
    for (gap_start, gap_end), group in missing_df.groupby(['missing_start','missing_end']):
        for gap_datasets, gap_tickers in rectangular_groups(group, mode="auto"):
            batch_jobs.append({
                'datasets': gap_datasets,
                'tickers': gap_tickers,
                'start_date': pd.Timestamp(gap_start).strftime('%Y-%m-%d'),
                'end_date': pd.Timestamp(gap_end).strftime('%Y-%m-%d'),
            })

In [None]:
ticker = 'AZN'

ticker_paths

['../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=AZN',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=ARCX.PILLAR/ticker=AZN',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNYS.PILLAR/ticker=AZN']

In [71]:
ticker_paths

['../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=AZN',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=ARCX.PILLAR/ticker=AZN',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNYS.PILLAR/ticker=AZN']

In [None]:


groups = rectangular_groups(missing_df, mode="auto")

In [47]:
df = pd.DataFrame({
    'exchange': ['ARCX.PILLAR', 'ARCX.PILLAR', 'XNAS.ITCH', 'XNAS.ITCH', 'XNYS.PILLAR'],
    'ticker': ['AZN', 'BP', 'AZN', 'BP', 'AZN'],
})

groups = rectangular_groups(df, mode="auto")

[(['ARCX.PILLAR', 'XNAS.ITCH', 'XNYS.PILLAR'], ['AZN']),
 (['ARCX.PILLAR', 'XNAS.ITCH'], ['BP'])]

In [12]:
partition_df = pd.DataFrame(field_vals,columns=field_keys)
partition_df[dataset_partition_name].unique()

array(['XNAS.ITCH', 'ARCX.PILLAR', 'XNYS.PILLAR'], dtype=object)

In [None]:
cbday = utils.get_market_business_days(calendar_map[comb[0]])

<CustomBusinessDay>

In [None]:
cbday = utils.get_market_business_days('XNYS')
trading_days = pd.date_range('2019-12-15', 
                            pd.Timestamp.today().strftime('%Y-%m-%d'),
                            freq=cbday)

Unnamed: 0,exchange,ticker,missing_start,missing_end


In [23]:
missing_df[missing_df['exchange']=='ARCX.PILLAR'][['missing_start','missing_end']].value_counts()

missing_start  missing_end
2019-12-16     2019-12-31     48
2025-11-10     2025-12-02     20
2025-11-03     2025-12-02     18
2025-09-15     2025-12-02     17
2020-08-12     2020-08-12      8
2020-08-25     2020-09-24      8
2022-11-21     2022-11-21      3
2022-11-23     2022-11-23      3
2022-11-22     2022-11-23      2
2022-12-01     2022-12-02      2
2022-12-08     2022-12-09      2
2022-12-06     2022-12-08      2
2022-12-02     2022-12-02      1
               2022-12-05      1
               2022-12-09      1
2019-12-16     2021-03-31      1
2022-12-05     2022-12-05      1
               2022-12-06      1
2022-12-06     2022-12-06      1
2022-11-29     2022-12-06      1
2022-12-07     2022-12-07      1
2022-12-09     2022-12-09      1
2022-12-01     2022-12-01      1
2022-11-28     2022-11-29      1
2022-11-29     2022-11-29      1
2022-11-28     2022-12-08      1
2019-12-16     2021-06-10      1
               2020-07-22      1
2022-11-23     2022-11-29      1
2022-11-21     2

In [19]:
missing_df[['missing_start','missing_end']].value_counts()

missing_start  missing_end
2019-12-16     2019-12-31     48
2025-11-10     2025-12-02     20
2025-11-03     2025-12-02     18
2025-09-15     2025-12-02     17
2020-08-12     2020-08-12      8
2020-08-25     2020-09-24      8
2022-11-21     2022-11-21      3
2022-11-23     2022-11-23      3
2022-11-22     2022-11-23      2
2022-12-01     2022-12-02      2
2022-12-08     2022-12-09      2
2022-12-06     2022-12-08      2
2022-12-02     2022-12-02      1
               2022-12-05      1
               2022-12-09      1
2019-12-16     2021-03-31      1
2022-12-05     2022-12-05      1
               2022-12-06      1
2022-12-06     2022-12-06      1
2022-11-29     2022-12-06      1
2022-12-07     2022-12-07      1
2022-12-09     2022-12-09      1
2022-12-01     2022-12-01      1
2022-11-28     2022-11-29      1
2022-11-29     2022-11-29      1
2022-11-28     2022-12-08      1
2019-12-16     2021-06-10      1
               2020-07-22      1
2022-11-23     2022-11-29      1
2022-11-21     2

In [11]:
present_dates

['2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-02',
 '2020-01-

exchange     ticker
ARCX.PILLAR  AEG       2
             ARGX      9
             ASML      7
             AZN       6
             BBVA      2
                      ..
XNYS.PILLAR  TTE       2
             UL        2
             VOD       2
             WDS       2
             WPP       2
Length: 168, dtype: int64

In [29]:
pattern = re.compile(''.join([rf"{sep}(?P<{name}>[^/=]+)=" for name in field_names]))
pattern.search('../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2022-01-11')

In [23]:
s[:100]

'../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2022-01-11,../data/raw/adrs/b'

In [24]:
# Scratch cell: prototype of the regex that extracts partition values from globbed paths.
sep = re.escape(os.sep)  # cross-platform

field_names = ["exchange", "ticker", "date"]

# Resulting pattern (POSIX), note the trailing comma:
#   exchange=(?P<exchange>[^/]+)/ticker=(?P<ticker>[^/]+)/date=(?P<date>[^/]+),
pattern_str = sep.join(
    rf"{name}=(?P<{name}>[^{sep}]+)"
    for name in field_names
) + ','
pattern = re.compile(pattern_str)

# `field_keys` comes from an earlier cell; only its length (glob depth) is used here.
glob_str = os.path.join('../data/raw/adrs/bbo-1m/by_exchange/', *('*',)*len(field_keys))
# All matching paths joined by ','.  Because the pattern ends with ',', the final
# path in `s` (which has no trailing comma) is not matched by findall below.
s = ','.join(glob(glob_str))
#s = "../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2022-01-11"

pattern.findall(s)



[('XNAS.ITCH', 'SAP', '2022-01-11'),
 ('XNAS.ITCH', 'SAP', '2025-02-28'),
 ('XNAS.ITCH', 'SAP', '2024-08-29'),
 ('XNAS.ITCH', 'SAP', '2022-08-01'),
 ('XNAS.ITCH', 'SAP', '2023-01-24'),
 ('XNAS.ITCH', 'SAP', '2025-08-07'),
 ('XNAS.ITCH', 'SAP', '2022-04-18'),
 ('XNAS.ITCH', 'SAP', '2024-01-19'),
 ('XNAS.ITCH', 'SAP', '2021-11-24'),
 ('XNAS.ITCH', 'SAP', '2022-09-02'),
 ('XNAS.ITCH', 'SAP', '2021-02-03'),
 ('XNAS.ITCH', 'SAP', '2021-09-02'),
 ('XNAS.ITCH', 'SAP', '2025-10-02'),
 ('XNAS.ITCH', 'SAP', '2025-03-06'),
 ('XNAS.ITCH', 'SAP', '2021-08-27'),
 ('XNAS.ITCH', 'SAP', '2023-02-02'),
 ('XNAS.ITCH', 'SAP', '2025-07-17'),
 ('XNAS.ITCH', 'SAP', '2024-10-17'),
 ('XNAS.ITCH', 'SAP', '2021-08-02'),
 ('XNAS.ITCH', 'SAP', '2024-07-02'),
 ('XNAS.ITCH', 'SAP', '2024-04-03'),
 ('XNAS.ITCH', 'SAP', '2020-12-21'),
 ('XNAS.ITCH', 'SAP', '2020-08-13'),
 ('XNAS.ITCH', 'SAP', '2020-01-22'),
 ('XNAS.ITCH', 'SAP', '2024-02-15'),
 ('XNAS.ITCH', 'SAP', '2020-05-07'),
 ('XNAS.ITCH', 'SAP', '2025-05-14'),
 

In [19]:
pattern_str

'exchange=(?P<exchange>[^/]+)/ticker=(?P<ticker>[^/]+)/date=(?P<date>[^/]+)/'

In [None]:
from glob import glob
s_all = ','.join(glob(os.path.join('../data/raw/adrs/bbo-1m/by_exchange/','*','*','*')))

In [12]:
# Scratch cell: pull (ticker, date) pairs out of the three-level directory names.
# NOTE: `os` is used below but not imported in this cell; it must already be in scope.
import re
from glob import glob

partition_path = '../data/raw/adrs/bbo-1m/by_exchange/'
# Paths three levels deep whose last component starts with 'date=', joined by ','.
s = ','.join(glob(os.path.join(partition_path,'*=*','*=*','date=*')))
pattern = re.compile(r"ticker=(?P<ticker>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})")
# Two capture groups, so findall returns a list of (ticker, date) string tuples.
dates = pattern.findall(s)
# Earlier two-level draft, kept for reference (disabled):
# def get_partition_dates(partition_path):
#     s = ','.join(glob(os.path.join(partition_path,'*=*','date=*')))
#     pattern = re.compile(r"date=(?P<date>\d{4}-\d{2}-\d{2})")
#     dates = pattern.findall(s)
    
#     return sorted(list(set(dates)))


In [17]:
glob(os.path.join(partition_path,'*','*','*','*.parquet'))

['../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2022-01-11/da95c6f5e9184a22806b9370501695c9-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2025-02-28/92ae0e78677740f7a13d5ff1b7eef3d0-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2024-08-29/e131693509ea409dbccf448454b144a4-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2022-08-01/9c536d0a889e4987bad7c0929cad5734-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2023-01-24/37c1ddf4963f4e30acd4b8429a5e6fe3-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2025-08-07/05df8db79d6348e18b15cea545a0737f-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2022-04-18/0116623f8c15438788fdcb04e4728511-0.parquet',
 '../data/raw/adrs/bbo-1m/by_exchange/exchange=XNAS.ITCH/ticker=SAP/date=2024-01-19/acae8849c65d4

In [6]:
dataset_path='../data/raw/adrs/bbo-1m/none/'
dataset = ds.dataset(dataset_path, format="parquet", partitioning="hive")

In [29]:
combos, combo_dates = discover_partition_combos_from_paths(
    dataset_path='../data/raw/adrs/bbo-1m/by_exchange/',date_column='date',combo_cols=['exchange','ticker']
)

In [None]:
import os

def discover_partition_combos_from_paths(
    dataset_path: str,
    combo_cols: List[str],
    date_column: str,
) -> Tuple[List[Tuple[Any, ...]], Dict[Tuple[Any, ...], List[pd.Timestamp]]]:
    """
    Discover partition combinations (and their dates) from directory names only.

    Every directory below ``dataset_path`` is visited with ``os.walk`` and its
    relative path is parsed as Hive-style ``key=value`` components, e.g.
        root/exchange=ARCX.PILLAR/ticker=AEG/date=2020-01-02/part-0.parquet
    No Parquet file is opened.

    Args:
        dataset_path: Root directory of the dataset.
        combo_cols: Non-date partition keys; their values form the combo key.
        date_column: Name of the date partition key, if present in the paths.

    Returns:
        combos: sorted list of combo-key tuples, ordered like ``combo_cols``.
        combo_dates: combo key -> sorted list of normalized ``pd.Timestamp``,
            only for combos under which ``date_column`` appears as a path key.
    """
    root = os.path.abspath(dataset_path)

    seen_combos = set()
    # Dates are kept as raw strings while walking and parsed once per combo afterwards.
    date_strings_by_combo: Dict[Tuple[Any, ...], set] = defaultdict(set)

    for current_dir, _subdirs, _files in os.walk(root):
        relative = os.path.relpath(current_dir, root)
        if relative == ".":
            # The root carries no partition components.
            continue

        components = parse_partition_components(relative.split(os.sep))
        # A key not present at this depth shows up as None in the combo key.
        key = tuple(components.get(name) for name in combo_cols)
        has_date = date_column in components

        if has_date or None not in key:
            seen_combos.add(key)
        if has_date:
            date_strings_by_combo[key].add(components[date_column])

    combos = sorted(seen_combos)

    combo_dates: Dict[Tuple[Any, ...], List[pd.Timestamp]] = {
        key: list(pd.to_datetime(sorted(date_strings)).normalize())
        for key, date_strings in date_strings_by_combo.items()
    }

    return combos, combo_dates

In [12]:
import sys
sys.path.append('../src')
import utils

In [None]:
cbday = utils.get_market_business_days('XNYS')
present_days = pd.date_range('2020-01-02','2025-11-07',freq=cbday)
expected_days = trading_days


In [35]:
ranges

[(Timestamp('2019-12-16 00:00:00'), Timestamp('2019-12-31 00:00:00')),
 (Timestamp('2025-11-10 00:00:00'), Timestamp('2025-12-02 00:00:00'))]