In [1]:
import os
import sys

# Make `radarange_orchestrator` importable from the source tree two directories
# up (presumably the repository root — confirm). "../../" is relative to the
# current working directory (in Jupyter normally the notebook's own folder), so
# resolve it once to an absolute path: a later `%cd` then cannot silently break
# imports. Append only when missing so re-running this cell does not keep
# growing `sys.path` with duplicates.
REPO_ROOT = os.path.abspath("../../")
if REPO_ROOT not in sys.path:
    sys.path.append(REPO_ROOT)

from radarange_orchestrator import llm

In [2]:
from radarange_orchestrator.tools import all_tools

# Model handle; every later `m.act(...)` cell drives the conversation through it.
m = llm()
# Conversation state; the full tool set from `all_tools` is attached to it.
chat = m.chat(tools=all_tools)

llama_context: n_ctx_per_seq (80000) < n_ctx_train (131072) -- the full capacity of the model will not be utilized


In [3]:
# First prompt: inline copies of storage.py, storage_features.py, storage_md.py,
# process.py, export_data.py and the setup code of two research notebooks
# (research_base.py, fit_research.py), followed by the actual request: describe
# what the Storage module currently does. The embedded code is a snapshot typed
# into the string, so it will not track later edits to those files.
prompt = """
Here's some examples of usage of Storage:

# storage.py
from __future__ import annotations

import os
import typing
from concurrent.futures import Executor, Future
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, TypeVar

from pydantic import BaseModel, ConfigDict, field_validator

from mariotte import config, date, instrument

T = TypeVar("T")


class StorageMode(str, Enum):
    # try read, write on error, ignore write error, after ignore read errors
    Default = "Default"
    Write = "Write"  # fail on write
    WriteIgnoreErrors = "WriteIgnoreErrors"
    Read = "Read"  # fail on read
    ReadIgnoreErrors = "ReadIgnoreErrors"


class StorageRequest(BaseModel):
    base_path: str
    instr: instrument.Instrument
    start: datetime
    end: datetime
    name: str
    addt: Any

    def full_path(self) -> str:
        return os.path.join(
            self.base_path,
            self.name,
            self.instr.market,
            self.instr.pair,
            date.to_str(self.start),
        )

    def info(self):
        return (self.instr.market, self.instr.pair, self.start, self.end)

    def __str__(self) -> str:
        return f"{self.instr.market}/{self.instr.pair} {self.start}"


class InstrumentInterval(BaseModel):
    instrument: instrument.Instrument
    start: datetime
    end: datetime

    @field_validator("start", "end", mode="before")
    @classmethod
    def date_from_str(cls, value: str | datetime) -> datetime:
        if isinstance(value, str):
            return date.from_str(value)
        return value

    @staticmethod
    def from_reader(reader: Any) -> list[InstrumentInterval]:
        return typing.cast(
            list[InstrumentInterval],
            config.load(reader, InstrumentInterval),
        )

    @staticmethod
    def from_file(path: str) -> list[InstrumentInterval]:
        with open(path, "r") as f:
            return InstrumentInterval.from_reader(f)


class Storage[T](BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    mode: StorageMode
    executor: Executor
    base_path: str
    name: str

    def load(
        self,
        instrument: instrument.Instrument,
        start: datetime,
        end: datetime,
    ) -> list[tuple[T, StorageRequest]]:
        return self.load_multi(
            [
                InstrumentInterval(
                    instrument=instrument,
                    start=start,
                    end=end,
                )
            ]
        )

    def load_multi(
        self,
        instrs: list[InstrumentInterval],
    ) -> list[tuple[T, StorageRequest]]:
        return self.wait(self.load_async(instrs))

    def load_async(
        self,
        instrs: list[InstrumentInterval],
    ) -> list[tuple[StorageRequest, Future[T | None]]]:
        requests: list[StorageRequest] = []

        for instr in instrs:
            intervals = date.split(instr.start, instr.end, timedelta(hours=12))

            for interval in intervals:
                req = StorageRequest(
                    base_path=self.base_path,
                    instr=instr.instrument,
                    start=interval[0],
                    end=interval[1],
                    name=self.name,
                    addt=None,
                )
                requests.append(req)
        return list(
            zip(
                requests,
                map(lambda x: self.executor.submit(self.load_req, x), requests),
            )
        )

    def wait(
        self, tasks: list[tuple[StorageRequest, Future[T | None]]]
    ) -> list[tuple[T, StorageRequest]]:
        res: list[tuple[T, StorageRequest]] = []
        for req, task in tasks:
            cur_res = task.result()
            if cur_res is None:
                continue
            res.append((cur_res, req))

        print(
            f"Read {len(res)} out of {len(tasks)} ({len(res) / max(len(tasks), 1) * 100:.2f}%)"
        )
        return res

    def load_req(self, req: StorageRequest) -> T | None:
        match self.mode:
            case StorageMode.Default:
                return self.default_req(req)
            case StorageMode.Read:
                return self.read_req(req)
            case StorageMode.ReadIgnoreErrors:
                return self.read_ignore_missing_req(req)
            case StorageMode.Write:
                return self.write_req(req)
            case StorageMode.WriteIgnoreErrors:
                self.try_write_req(req)
                return None

    def default_req(self, req: StorageRequest) -> T | None:
        try:
            return self.read_req(req)
        except Exception as e:
            print(f"Could not read {req}:\n{e}")
            self.try_write_req(req)
            return self.read_ignore_missing_req(req)

    def read_ignore_missing_req(self, req: StorageRequest) -> T | None:
        try:
            return self.read_req(req)
        except Exception as _:
            return None

    def read_req(self, req: StorageRequest) -> T:
        raise NotImplementedError("virtual method")

    def try_write_req(self, req: StorageRequest) -> bool:
        try:
            self.write_req(req)
            return True
        except Exception as e:
            print(f"Could not write {req}:\n{e}")
            return False

    def write_req(self, req: StorageRequest) -> None:
        raise NotImplementedError("virtual method")


# storage_features.py
from __future__ import annotations

import os
from concurrent.futures import ThreadPoolExecutor

import polars as pl
from pydantic import BaseModel, ConfigDict

from mariotte import instrument, paths
from mariotte.backtest import Backtest
from mariotte.research.storage import Storage, StorageMode, StorageRequest


class FeaturesConfig(BaseModel):
    traded_ob_window_size: int
    sided_ob_window_size: int


class FeaturesExporterModelConfig(BaseModel):
    instrument: instrument.InfraInstrument
    features_file: str
    features: FeaturesConfig


class FeaturesData(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    ft: pl.LazyFrame


FEATURES_STORAGE_PATH = "/ft_storage/"


class FeaturesStorage(Storage[FeaturesData]):
    executable: str = os.path.join(paths.boyle_path(), "build/allin_features_exporter")
    features_config: FeaturesConfig
    run_config_path: str = "./configs/run_config.json"
    sim_config_path: str = "./configs/simulation_config.json"
    verbose: int = 1

    @staticmethod
    def default(jobs: int) -> FeaturesStorage:
        return FeaturesStorage(
            features_config=FeaturesConfig(
                traded_ob_window_size=500_000_000,
                sided_ob_window_size=1_000_000_000,
            ),
            mode=StorageMode.ReadIgnoreErrors,
            executor=ThreadPoolExecutor(max_workers=jobs),
            base_path=FEATURES_STORAGE_PATH,
            name="main",
        )

    def paths(self, req: StorageRequest) -> tuple[str, str, str, str]:
        path = req.full_path()

        features = os.path.join(path, "features.parquet")

        stdout = os.path.join(path, "stdout.txt")
        stderr = os.path.join(path, "stderr.txt")

        return path, features, stdout, stderr

    def read_req(self, req: StorageRequest) -> FeaturesData:
        _, features, _, _ = self.paths(req)

        ft = pl.scan_parquet(features)
        ft.collect_schema()

        return FeaturesData(ft=ft)

    def write_req(self, req: StorageRequest) -> None:
        path, features, stdout, stderr = self.paths(req)

        os.makedirs(path, exist_ok=True)

        model_config = FeaturesExporterModelConfig(
            instrument=req.instr.to_infra(),
            features_file=features,
            features=self.features_config,
        )

        backtest = Backtest(
            executable=self.executable,
            model=model_config,
            run=self.run_config_path,
            simulation=self.sim_config_path,
            start=req.start,
            end=req.end,
            orders="/dev/null",
            stdout=stdout,
            stderr=stderr,
            verbose=self.verbose,
        )
        backtest.cmd().local_execute()

# storage_md.py
from __future__ import annotations

import os
from concurrent.futures import ThreadPoolExecutor

import polars as pl
from pydantic import BaseModel, ConfigDict

from mariotte import instrument, paths
from mariotte.backtest import Backtest
from mariotte.research.storage import Storage, StorageMode, StorageRequest


class MdExporterModelConfig(BaseModel):
    orderbook_file: str
    trades_file: str
    orderbook_levels: int
    skip_empty_levels: bool
    instrument: instrument.InfraInstrument


class MarketData(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    ob: pl.LazyFrame
    tr: pl.LazyFrame


MD_STORAGE_PATH = "/md_storage/"


class MdStorage(Storage[MarketData]):
    executable: str = os.path.join(paths.boyle_path(), "build/md_exporter")
    orderbook_levels: int = 40
    skip_empty_levels: bool = True
    run_config_path: str = "./configs/run_config.json"
    sim_config_path: str = "./configs/simulation_config.json"
    verbose: int = 1

    @staticmethod
    def default(jobs: int) -> MdStorage:
        return MdStorage(
            mode=StorageMode.ReadIgnoreErrors,
            executor=ThreadPoolExecutor(max_workers=jobs),
            base_path=MD_STORAGE_PATH,
            name="main",
        )

    def paths(self, req: StorageRequest) -> tuple[str, str, str, str, str]:
        path = req.full_path()

        orderbook = os.path.join(path, "orderbook.parquet")
        trades = os.path.join(path, "trades.parquet")

        stdout = os.path.join(path, "stdout.txt")
        stderr = os.path.join(path, "stderr.txt")

        return path, orderbook, trades, stdout, stderr

    def read_req(self, req: StorageRequest) -> MarketData:
        _, orderbook, trades, _, _ = self.paths(req)

        ob = pl.scan_parquet(orderbook)
        ob.collect_schema()
        tr = pl.scan_parquet(trades)
        tr.collect_schema()

        return MarketData(ob=ob, tr=tr)

    def write_req(self, req: StorageRequest) -> None:
        path, orderbook, trades, stdout, stderr = self.paths(req)

        os.makedirs(path, exist_ok=True)

        model_config = MdExporterModelConfig(
            orderbook_file=orderbook,
            trades_file=trades,
            orderbook_levels=self.orderbook_levels,
            skip_empty_levels=self.skip_empty_levels,
            instrument=req.instr.to_infra(),
        )

        backtest = Backtest(
            executable=self.executable,
            model=model_config,
            run=self.run_config_path,
            simulation=self.sim_config_path,
            start=req.start,
            end=req.end,
            orders="/dev/null",
            stdout=stdout,
            stderr=stderr,
            verbose=self.verbose,
        )
        backtest.cmd().local_execute()

# process.py

from concurrent.futures import ThreadPoolExecutor

from tap import Tap

from mariotte.research import process, research
from mariotte.research.storage import InstrumentInterval, StorageMode
from mariotte.research.storage_features import FeaturesStorage
from mariotte.research.storage_md import MdStorage
from mariotte.research.storage_processed import ProcessedStorage


class ProcessArgs(Tap):
    intervals: str
    src_name: str = "main"
    dst_name: str = "main"
    mode: StorageMode = StorageMode.Default
    jobs: int = 1
    verbose: int = 2
    md_storage: str | None = None
    ft_storage: str | None = None
    df_storage: str | None = None


def run(args: ProcessArgs):
    intervals = InstrumentInterval.from_file(args.intervals)

    executor = ThreadPoolExecutor(max_workers=args.jobs)

    mds = MdStorage.default(jobs=args.jobs)
    if args.md_storage is not None:
        mds.base_path = args.md_storage
    mds.mode = StorageMode.ReadIgnoreErrors
    mds.executor = executor
    mds.verbose = args.verbose
    mds.name = args.src_name

    fts = FeaturesStorage.default(jobs=args.jobs)
    if args.ft_storage is not None:
        fts.base_path = args.ft_storage
    fts.mode = StorageMode.ReadIgnoreErrors
    fts.executor = executor
    fts.verbose = args.verbose
    fts.name = args.src_name

    md = mds.load_multi(intervals)
    ft = fts.load_multi(intervals)

    data = research.merge_feeds(md, ft, force=False)

    print(f"Found {len(data)} intervals, md: {len(md)}, ft: {len(ft)}")
    dfs = ProcessedStorage.default(process.process_single)
    if args.df_storage is not None:
        dfs.base_path = args.df_storage
    dfs.mode = args.mode
    dfs.executor = executor
    dfs.verbose = args.verbose
    dfs.name = args.dst_name
    df = dfs.wait(dfs.write_async(data))

    total = len(data)
    loaded = len(df)
    print(f"Read {loaded} out of {total} ({(loaded / total * 100):.2f}%)")


def main():
    args = ProcessArgs(underscores_to_dashes=True).parse_args()
    run(args)


if __name__ == "__main__":
    main()

# export_data.py
from concurrent.futures import Future, ThreadPoolExecutor

from tap import Tap

from mariotte import paths
from mariotte.build import build_default
from mariotte.research.storage import InstrumentInterval, StorageMode, StorageRequest
from mariotte.research.storage_features import FeaturesData, FeaturesStorage
from mariotte.research.storage_md import MarketData, MdStorage


class ExportDataArgs(Tap):
    intervals: str
    marketdata: bool = False
    features: bool = False
    no_build: bool = False
    name: str = "main"
    mode: StorageMode = StorageMode.Default
    jobs: int = 10
    verbose: int = 2
    md_storage: str | None = None
    ft_storage: str | None = None


def run(args: ExportDataArgs):
    intervals = InstrumentInterval.from_file(args.intervals)

    if not args.no_build:
        build_default(verbose=args.verbose)

    executor = ThreadPoolExecutor(max_workers=args.jobs)

    md = MdStorage.default(jobs=args.jobs)
    if args.md_storage is not None:
        md.base_path = args.md_storage
    md.mode = args.mode
    md.executor = executor
    md.verbose = args.verbose
    md.name = args.name

    md.executable = paths.copy_binary(md.executable, None)

    ft = FeaturesStorage.default(jobs=args.jobs)
    if args.ft_storage is not None:
        ft.base_path = args.ft_storage
    ft.mode = args.mode
    ft.executor = executor
    ft.verbose = args.verbose
    ft.name = args.name

    ft.executable = paths.copy_binary(ft.executable, None)

    wait_md: list[tuple[StorageRequest, Future[MarketData | None]]] = []
    wait_ft: list[tuple[StorageRequest, Future[FeaturesData | None]]] = []
    for interval in intervals:
        if args.marketdata:
            wait_md = wait_md + md.load_async([interval])
        if args.features:
            wait_ft = wait_ft + ft.load_async([interval])

    md_res = md.wait(wait_md)
    ft_res = ft.wait(wait_ft)

    total = max(1, len(wait_md) + len(wait_ft))
    loaded = len(md_res) + len(ft_res)
    print(f"Read {loaded} out of {total} ({(loaded / total * 100):.2f}%)")


def main():
    args = ExportDataArgs(underscores_to_dashes=True).parse_args()
    run(args)


if __name__ == "__main__":
    main()

# research_base.py

import gc
import os
import typing
from datetime import datetime, timedelta
from glob import glob
from typing import Any

import catboost
import numpy as np
import numpy.typing as npt
import plotly.express as px
import plotly.graph_objects as go
import polars as pl
import polars.selectors as cs
from polars import col
from tqdm.notebook import tqdm

from mariotte import date, exchange_info, instrument, logs, plot, util
from mariotte.research import research
from mariotte.research.storage import StorageMode
from mariotte.research.storage_features import FeaturesStorage
from mariotte.research.storage_md import MdStorage

%load_ext autoreload
%autoreload 2

# Configure Polars
pl.Config().set_tbl_rows(50).set_fmt_str_lengths(100)

md = MdStorage.default(jobs=1)
data = md.load(
    instrument=INSTRUMENT,
    start=START,
    end=END,
)

LVLS = md.orderbook_levels

# fit_research.py

import gc
import os
import typing
from datetime import datetime, timedelta
from glob import glob
from typing import Any

import catboost
import numpy as np
import numpy.typing as npt
import plotly.express as px
import plotly.graph_objects as go
import polars as pl
import polars.selectors as cs
from polars import col
from tqdm.notebook import tqdm

from mariotte import date, exchange_info, instrument, logs, plot, util
from mariotte.exchange_info import ExchangeInfo
from mariotte.research import research
from mariotte.research.storage import InstrumentInterval, StorageMode, StorageRequest
from mariotte.research.storage_features import FeaturesData, FeaturesStorage
from mariotte.research.storage_md import MarketData, MdStorage
from mariotte.train import Pool, Trainer

%load_ext autoreload
%autoreload 2

# Configure Polars
pl.Config().set_tbl_rows(50).set_fmt_str_lengths(100)

mds = MdStorage.default(jobs=1)
fts = FeaturesStorage.default(jobs=1)
md = mds.load_multi(intervals)
ft = fts.load_multi(intervals)
data = research.merge_feeds(md, ft, force=False)
print(len(data))

I need to write a new module, that takes place instead of Storage, and covers same tasks
For now just describe, what this module does now, try to simplify things, but not too much
"""

# Prompt size in characters, not tokens. This is only a rough gauge against the
# context window reported when the model was loaded (n_ctx_per_seq = 80000
# tokens). TODO: measure in tokens if the prompt keeps growing.
len(prompt)

17425

In [4]:
# Queue the storage-module prompt, let the model work through it (messages it
# produces are handed to `chat.append` via `on_message`), then render only the
# final answer.
chat.add_user_message(prompt)
m.act(
    chat,
    max_tokens_per_message=50_000,
    temperature=0.7,
    on_message=chat.append,
    max_prediction_rounds=100,
)
chat.show_final_answer()



The existing Storage module manages data storage/retrieval for Market Data, Features, and Processed Data. Key tasks:

1. **Core Functionality**:
- Implements a generic Storage base class with mode-driven read/write logic
- Provides specialized subclasses (MdStorage, FeaturesStorage) for different data types
- Handles parallel operations via executors
- Integrates with backtest/exporter binaries to generate data
- Manages file paths and error handling through modes (Read/Write/Ignore)

2. **Key Components**:
- **Storage Modes**: Control behavior for read/write operations (e.g., fail on error or ignore)
- **Request Handling**: Uses StorageRequest objects to track data requests
- **Data Formats**: Works with Parquet files and polars LazyFrames
- **External Tools**: Executes binaries (md_exporter, features_exporter) to generate data

Proposed Simplified Module Structure:

```python
from typing import Any, Dict
import os
import polars as pl
from concurrent.futures import ThreadPoolExecutor

class BaseStorage:
    def __init__(
        self,
        base_path: str,
        mode: str = "default",
        executor: ThreadPoolExecutor = None,
        name: str = "main",
        verbose: int = 1
    ):
        self.base_path = base_path
        self.mode = mode  # "read", "write", "default", "ignore"
        self.executor = executor or ThreadPoolExecutor()
        self.name = name
        self.verbose = verbose

    def _get_path(self, req: Dict[str, Any]) -> str:
        """Construct full path for given request parameters"""
        return os.path.join(
            self.base_path,
            self.name,
            req['market'],
            req['pair'],
            req['start_date_str']
        )

    def read(self, req: Dict[str, Any]) -> Any:
        """Generic read logic implementing mode behavior"""
        path = self._get_path(req)
        if not os.path.exists(path) and self.mode in ("default", "read"):
            raise FileNotFoundError(f"Path {path} doesn't exist")
        elif self.mode == "ignore":
            return None if not os.path.exists(path) else self._read_data(path)
        else:
            return self._read_data(path)

    def write(self, req: Dict[str, Any]) -> None:
        """Generic write logic implementing mode behavior"""
        path = self._get_path(req)
        if self.mode in ("write", "default"):
            self._generate_data(path, req)
            self._save_data(path)
        elif self.mode == "ignore":
            try:
                self._generate_data(path, req)
                self._save_data(path)
            except Exception as e:
                print(f"Warning: Failed to write {path}: {str(e)}")

    def _generate_data(self, path: str, req: Dict[str, Any]) -> None:
        """Implementation-specific data generation (to be overridden)"""
        raise NotImplementedError

    def _read_data(self, path: str) -> Any:
        """Implementation-specific data reading (to be overridden)"""
        raise NotImplementedError

    def _save_data(self, path: str) -> None:
        """Implementation-specific data saving (to be overridden)"""
        raise NotImplementedError

class MdStorage(BaseStorage):
    def __init__(
        self,
        orderbook_levels: int = 40,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.orderbook_levels = orderbook_levels

    def _generate_data(self, path, req):
        # Execute md_exporter binary using req parameters
        # Example: run_backtest_command(req, output_path=path)
        pass

    def _read_data(self, path):
        ob_path = os.path.join(path, "orderbook.parquet")
        tr_path = os.path.join(path, "trades.parquet")
        return {
            "orderbook": pl.scan_parquet(ob_path),
            "trades": pl.scan_parquet(tr_path)
        }

    def _save_data(self, path):
        # Actual saving logic if needed
        pass

class FeaturesStorage(BaseStorage):
    def _generate_data(self, path, req):
        # Execute features_exporter binary
        pass

    def _read_data(self, path):
        return pl.scan_parquet(os.path.join(path, "features.parquet"))

# Usage example:
storage = MdStorage(base_path="/md_storage/")
data = storage.read({
    "market": "binance",
    "pair": "BTC-USDT",
    "start_date_str": "2023-01-01"
})
```

**Key Simplifications:**

1. **Unified Base Class**: Centralizes common logic for:
   - Path construction
   - Mode-based error handling
   - Executor management

2. **Simplified Mode Handling**:
   - 4 modes instead of 5 (removed ReadIgnoreErrors/WriteIgnoreErrors distinction)
   - Clear behavior:
     - "read": Fail if missing
     - "write": Only create new data
     - "default": Read or write if missing
     - "ignore": Silently handle errors

3. **Request Abstraction**:
   - Uses simple dictionaries instead of Pydantic models for requests
   - Removes the need for StorageRequest class

4. **Streamlined Workflow**:
   - Separates generation, reading, and saving into distinct methods
   - Makes extension for new storage types easier

5. **Reduced Overhead**:
   - Eliminates template methods and complex mode enums
   - Simplifies executor usage through default parameters

6. **Clear API**:
   - Public methods (read/write) handle mode logic internally
   - Private methods handle storage-specific operations

**Backward Compatibility Notes**:
- Maintains core functionality while removing unused features
- Can wrap Pydantic models in adapters if needed for existing code
- The simplified mode system is compatible with previous behavior through mapping:
  ```python
  old_mode            new_mode
  -------------------------------
  Default             default
  Write               write
  WriteIgnoreErrors   ignore
  Read                read
  ReadIgnoreErrors    ignore
  ```

This approach maintains all core capabilities while reducing complexity by:
1. Removing unnecessary abstractions
2. Simplifying mode management
3. Using standard Python constructs over heavy type systems
4. Making the storage implementation more intuitive through inheritance hierarchy

In [5]:
# Follow-up: keep the model to describing the existing module, with no redesign.
prompt = """
No, don't propose new changes yet. Also, backwards compatibility is unnecessary. Describe only existing functionality.
"""
chat.add_user_message(prompt)
m.act(
    chat,
    max_tokens_per_message=50_000,
    temperature=0.7,
    on_message=chat.append,
    max_prediction_rounds=100,
)
chat.show_final_answer()



Here's a detailed description of the existing Storage module's functionality based on the provided code:

### 1. Core Abstraction: `Storage` Base Class
```python
class Storage[T](BaseModel):
    ...
```
- **Purpose**: Generic data storage interface for different data types (market data, features, processed data)
- **Key Features**:
  - **Mode-driven operations**: Uses `StorageMode` enum to control behavior:
    ```python
    class StorageMode(str, Enum):
        Default = "Default"
        Write = "Write"
        WriteIgnoreErrors = "WriteIgnoreErrors"
        Read = "Read"
        ReadIgnoreErrors = "ReadIgnoreErrors"
    ```
  - **Parallel processing**: Uses `Executor` (thread/process pool) for async operations
  - **Request handling**:
    - `load` methods split time ranges into hourly intervals (via `date.split()`)
    - Uses `StorageRequest` objects to track data requests
  - **Lifecycle methods**:
    - `load_req()`: Actual data loading implementation
    - `write_req()`: Actual data generation/write implementation
    - Mode-specific variants (e.g., `read_req()`, `try_write_req()`)
  
### 2. Market Data Storage (`MdStorage`)
```python
class MdStorage(Storage[MarketData]):
    ...
```
- **Purpose**: Store/produce market data (order books and trades)
- **Data format**:
  - Orderbook data: Parquet files (`orderbook.parquet`)
  - Trades data: Parquet files (`trades.parquet`)
- **Implementation details**:
  - Depends on external `md_exporter` binary (C++ executable)
  - Uses `Backtest` class to execute data generation:
    ```python
    backtest = Backtest(
        executable=self.executable,
        model=...,
        start=req.start,
        end=req.end,
        ...
    )
    backtest.cmd().local_execute()
    ```
  - Path structure:
    ```text
    <base_path>/<storage_name>/<market>/<pair>/<date_str>/
      ├── orderbook.parquet
      ├── trades.parquet
      ├── stdout.txt
      └── stderr.txt
    ```
- **Reading data**:
  - Uses `pl.scan_parquet()` for lazy loading of Parquet files
  - Returns `MarketData` object with Polars LazyFrames

### 3. Features Storage (`FeaturesStorage`)
```python
class FeaturesStorage(Storage[FeaturesData]):
    ...
```
- **Purpose**: Store/produce derived features data
- **Data format**:
  - Features data: Parquet files (`features.parquet`)
- **Implementation details**:
  - Depends on external `allin_features_exporter` binary
  - Uses similar `Backtest`-based execution as `MdStorage`
  - Path structure:
    ```text
    <base_path>/<storage_name>/<market>/<pair>/<date_str>/
      ├── features.parquet
      ├── stdout.txt
      └── stderr.txt
    ```
- **Reading data**:
  - Returns `FeaturesData` with Polars LazyFrame

### 4. Processed Data Storage (`ProcessedStorage`)
```python
class ProcessedStorage(Storage[ProcessedData]):
    ...
```
- **Purpose**: Store final processed/merged datasets
- **Data flow**:
  1. Merges market data and features data using `research.merge_feeds()`
  2. Saves the combined dataset
- **Implementation details**:
  - Uses `process.process_single()` for data transformation
  - Relies on `Executor` for parallel processing

### 5. Key Supporting Components
#### a. `StorageRequest` Object
```python
class StorageRequest(BaseModel):
    base_path: str
    instr: instrument.Instrument
    start: datetime
    end: datetime
    name: str
    addt: Any  # additional data
```
- Encapsulates all parameters needed to locate/produce data:
  - Path components (market, pair, date)
  - Time range (start/end)
  - Storage configuration (name, base path)

#### b. Path Construction
```python
def full_path(self) -> str:
    return os.path.join(
        self.base_path,
        self.name,
        self.instr.market,
        self.instr.pair,
        date.to_str(self.start)
    )
```
- Generates standardized paths for data storage locations
- Uses date formatting from `date.to_str()`

#### c. Asynchronous Operations
- **`load_async()`**: Returns list of `(StorageRequest, Future)` tuples
- **`wait()`**: Waits for async operations to complete and aggregates results
- Uses Python `concurrent.futures` for parallelism

### 6. External Dependencies
- **Binaries**:
  - `md_exporter` (market data generation)
  - `allin_features_exporter` (features generation)
- **Execution Environment**:
  - Requires compiled binaries in `./build/` directory
  - Depends on `Backtest` class for command execution
- **Data Infrastructure**:
  - Relies on Parquet file format
  - Uses Polars library for data handling

### 7. Usage Patterns
#### a. Data Export (via `export_data.py`)
```python
def run(args):
    intervals = InstrumentInterval.from_file(...)
    md = MdStorage(...)
    ft = FeaturesStorage(...)
    md_res = md.wait(md.load_async(...))
    ft_res = ft.wait(ft.load_async(...))
```
- Parallel execution of data generation across intervals
- Handles both market data and features in separate workflows

#### b. Research/Processing (via `fit_research.py`)
```python
md = mds.load(...)
ft = fts.load(...)
data = research.merge_feeds(md, ft, force=False)
dfs = ProcessedStorage(...)
dfs.write_async(data)
```
- Combines raw data from multiple sources
- Persists final processed datasets

#### c. Storage Mode Usage
```python
# Read-only mode (fail on missing data)
storage = MdStorage(mode=StorageMode.Read)

# Write mode (only generate new data)
storage = FeaturesStorage(mode=StorageMode.Write)

# Default mode (read or generate if missing)
storage = ProcessedStorage(mode=StorageMode.Default)
```

### 8. Key Design Choices
- **Layered architecture**: Clear separation between:
  1. Core storage logic (Storage base class)
  2. Data type implementations (MdStorage/FeaturesStorage)
  3. Workflow coordination (process/export scripts)
- **External tool integration**: Uses compiled binaries for performance-critical data generation
- **Lazy evaluation**: Uses Polars LazyFrames for deferred execution
- **Error handling**: Mode system allows flexible failure handling (e.g., ignore missing data in research workflows)

This module forms the core data infrastructure for handling:
1. Raw market data ingestion
2. Feature engineering
3. Final dataset preparation
4. Parallel processing of large time ranges
5. Production-quality data generation via compiled tools

In [6]:
# Second follow-up: request a generic concept skeleton for the replacement
# module (load / generate modes, local or Ray-cluster execution, worker limits),
# with structure and signatures only.
prompt = """
Ok, now keep in mind the existing structure, and let's create a concept of a new one. Firstly - ignore all different formats (md, features, etc.), the concept only covers generic case.
As the Storage thing generally is used for asynchronously loading data, this is the default behaviour. Skipping errors (if something is not found) is a separate argument.
So, modes (can be named differently):
Load data. Variants: generate if empty | raise error if empty | skip if empty
Generate and save data: Variants: raise errors or skip if generator is working unexpected

Both should utilize parallel execution, with two variants: local, or on passed ray cluster (don't dive into details, assume it's Cluster() object)
Also, resources allocation control should be present, perhaps, number of concurrent workers.

With this in mind, give me a concept code sample. Just structure of methods, arguments (you still can write pydantic or tap classes, if needed) with brief descriptions - no real logic.
"""
chat.add_user_message(prompt)
m.act(
    chat,
    max_tokens_per_message=50_000,
    temperature=0.7,
    on_message=chat.append,
    max_prediction_rounds=100,
)
chat.show_final_answer()




Here's a concept design for a generic, mode-driven storage system with parallel execution control:

```python
from enum import Enum
from typing import Any, Dict, Optional, Callable, Union
from dataclasses import dataclass
from concurrent.futures import Executor, Future

class LoadMode(str, Enum):
    """What ``Storage`` does when data for a requested key does not exist.

    Mixes in ``str`` (like the original ``StorageMode``) so modes can be
    parsed from config/CLI strings and compare equal to their values.
    """

    GENERATE = "generate"  # generate the data, save it, and return it
    ERROR = "error"  # raise KeyError
    SKIP = "skip"  # return None

class GenerateMode(str, Enum):
    """What ``Storage.generate`` does when generating or saving data fails.

    Mixes in ``str`` (like the original ``StorageMode``) so modes can be
    parsed from config/CLI strings and compare equal to their values.
    """

    RAISE = "raise"  # re-raise the exception
    IGNORE = "ignore"  # log a warning and return None

@dataclass(frozen=True)
class DataKey:
    """Identifies one unit of stored data.

    Frozen so instances are hashable: ``Storage`` uses keys as dictionary
    keys (``key_to_future``). A plain ``@dataclass`` sets ``__hash__`` to
    ``None``, which would make ``Storage.load`` fail with ``unhashable type``.
    """

    identifier: str  # e.g., "BTC-USD-2023-01-01"

class Storage:
    """Generic, mode-driven storage that loads and (optionally) generates data in parallel.

    Subclasses provide the format-specific hooks ``_read_data``, ``_save_data``
    and ``_generate_data``. This class handles path resolution, the load and
    generate modes, deduplication of in-flight loads, and the executor.

    Args:
        base_path: Root directory; each key is stored at ``base_path/identifier``.
        load_mode: What to do when data for a key is missing: ``GENERATE`` it,
            raise ``KeyError`` on ``ERROR``, or return ``None`` on ``SKIP``.
        generate_mode: What to do when generation fails: ``RAISE`` re-raises,
            ``IGNORE`` logs a warning and returns ``None``.
        executor: Executor for asynchronous loads. If given, ``cluster`` and
            ``max_workers`` are not used and the caller keeps ownership of it.
        max_workers: Concurrency limit for the executor this class creates.
        cluster: Optional Ray cluster object. If given (and ``executor`` is
            not), ``_make_cluster_executor`` must turn it into an Executor.

    An executor created by this class is released by ``shutdown()`` or by
    using the storage as a context manager.
    """

    def __init__(
        self,
        base_path: str,
        load_mode: LoadMode = LoadMode.GENERATE,
        generate_mode: GenerateMode = GenerateMode.RAISE,
        executor: Optional[Executor] = None,
        max_workers: int = 5,
        cluster: Optional[Any] = None,  # Assume Ray Cluster object
    ):
        self.base_path = base_path
        self.load_mode = load_mode
        self.generate_mode = generate_mode
        self.max_workers = max_workers
        # Only an executor created here is shut down by `shutdown()`.
        self._owns_executor = executor is None
        self.executor = executor if executor is not None else self._setup_executor(cluster)
        # In-flight tasks, so repeated `load()` calls for one key share a Future.
        self.key_to_future: Dict[DataKey, Future] = {}

    def _setup_executor(self, cluster) -> Executor:
        """Create the executor owned by this storage.

        Uses ``_make_cluster_executor`` when a cluster is given, otherwise a
        local thread pool with ``max_workers`` threads.
        """
        if cluster is not None:
            return self._make_cluster_executor(cluster)
        from concurrent.futures import ThreadPoolExecutor
        return ThreadPoolExecutor(max_workers=self.max_workers)

    def _make_cluster_executor(self, cluster) -> Executor:
        """Wrap ``cluster`` into an Executor limited to ``max_workers`` tasks.

        Placeholder in this concept: override (or implement) with a Ray-backed
        ``concurrent.futures.Executor`` adapter.
        """
        raise NotImplementedError(
            "Ray cluster execution is not implemented; "
            "override _make_cluster_executor or pass an `executor`"
        )

    def shutdown(self, wait: bool = True) -> None:
        """Shut down the executor if this storage created it.

        Caller-supplied executors are left running.
        """
        if self._owns_executor:
            self.executor.shutdown(wait=wait)

    def __enter__(self) -> "Storage":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.shutdown()

    def get_path(self, key: DataKey) -> str:
        """Return the location of ``key``'s data: ``base_path/identifier``."""
        import os  # local import: this sample has no module-level `os` import
        return os.path.join(self.base_path, key.identifier)

    def exists(self, key: DataKey) -> bool:
        """Return True if something exists at ``get_path(key)``."""
        import os
        return os.path.exists(self.get_path(key))

    def load_sync(self, key: DataKey) -> Optional[Any]:
        """Synchronous, blocking load of ``key``.

        Reads existing data. Otherwise it applies ``load_mode`` via
        ``_handle_missing``, which may generate the data, raise ``KeyError``,
        or return ``None``.
        """
        if self.exists(key):
            return self._read_data(key)
        return self._handle_missing(key)

    def load(self, key: DataKey) -> Future:
        """Asynchronously load ``key``; returns a Future resolving to the data.

        While a load for ``key`` is still in flight, further calls return the
        same Future instead of submitting a second task. Outcomes match
        ``load_sync``, and exceptions surface from ``Future.result()``.
        """
        if key in self.key_to_future:
            return self.key_to_future[key]

        future = self.executor.submit(self._load_task, key)
        self.key_to_future[key] = future
        # Forget the task once it finishes so later calls re-check storage.
        future.add_done_callback(lambda _done: self.key_to_future.pop(key, None))
        return future

    def _load_task(self, key: DataKey) -> Optional[Any]:
        """Executor task body; same semantics as ``load_sync``.

        ``generate_mode`` only governs generation failures, and ``generate``
        already applies it. Re-applying it here would swallow every error,
        including the ``KeyError`` required by ``LoadMode.ERROR``.
        """
        return self.load_sync(key)

    def _handle_missing(self, key: DataKey) -> Optional[Any]:
        """Apply ``load_mode`` to a key whose data does not exist.

        Raises:
            KeyError: if ``load_mode`` is ``LoadMode.ERROR``.
        """
        if self.load_mode == LoadMode.GENERATE:
            return self.generate(key)
        if self.load_mode == LoadMode.ERROR:
            raise KeyError(f"Data not found for {key}")
        return None  # SKIP

    def generate(self, key: DataKey) -> Optional[Any]:
        """Generate, save and return data for ``key``, regardless of ``load_mode``.

        On failure, re-raises when ``generate_mode`` is ``RAISE``. Otherwise
        it logs a warning and returns ``None``.
        """
        try:
            data = self._generate_data(key)
            self._save_data(key, data)
            return data
        except Exception as e:
            if self.generate_mode == GenerateMode.RAISE:
                raise
            import logging  # local import: this sample defines no module-level logger
            logging.getLogger(__name__).warning("Generation failed for %s: %s", key, e)
            return None

    def _generate_data(self, key: DataKey) -> Any:
        """Implement actual generation logic here"""
        raise NotImplementedError

    def _read_data(self, key: DataKey) -> Any:
        """Implement actual read logic here"""
        raise NotImplementedError

    def _save_data(self, key: DataKey, data: Any) -> None:
        """Implement actual save logic here"""
        raise NotImplementedError

# Example subclass: only the three format-specific hooks are overridden.
class MyStorage(Storage):
    """Example concrete storage that wires the hooks to helper functions.

    ``generate_data_func``, ``read_from_disk`` and ``save_to_disk`` are
    placeholders for real implementations; they are not defined in this sample.
    """

    def _read_data(self, key):
        source = self.get_path(key)
        return read_from_disk(source)

    def _save_data(self, key, data):
        target = self.get_path(key)
        save_to_disk(data, target)

    def _generate_data(self, key):
        produced = generate_data_func(key)
        return produced

# Configuration options: modes, concurrency limit and an optional Ray cluster.
my_storage = MyStorage(
    base_path="/data/storage",
    # Missing data is generated; generation failures are logged and yield None.
    load_mode=LoadMode.GENERATE,
    generate_mode=GenerateMode.IGNORE,
    # Concurrency limit for the executor the storage creates itself.
    max_workers=10,
    # Optional Ray cluster; omit it to run on a local thread pool.
    cluster=RayCluster(address="auto"),
)
```

### Key Components & Features:
1. **Mode System**:
   - `LoadMode`: Controls missing data handling
     - GENERATE: Attempt to generate missing data
     - ERROR: Raise exception if missing
     - SKIP: Return None for missing data
   - `GenerateMode`: Controls generation error handling
     - RAISE: Propagate exceptions from generation
     - IGNORE: Suppress exceptions (return None)

2. **Parallel Execution**:
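   - Accepts any `concurrent.futures.Executor` and defaults to a `ThreadPoolExecutor`. A `ProcessPoolExecutor` would also require the storage instance to be picklable; as written it is not, because it holds its executor and a futures cache.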
   - Ray integration via `cluster` parameter
   - Automatic task tracking with `key_to_future` cache
   - Limits concurrent tasks via `max_workers`. This applies to the executor the class creates; a caller-supplied `executor` keeps its own limit.

3. **Abstraction Layers**:
   - `_generate_data()`: Core data creation logic (to be implemented)
   - `_read_data()/_save_data()`: I/O operations (format-specific)
   - Path management via `get_path()`

4. **Asynchronous API**:
   - `load()` returns `Future` for async operations
   - `load_sync()` provides blocking access
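   - Deduplication of in-progress tasks: calling `load()` again for a key that is still being processed returns the same `Future`. No retries are performed.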

5. **Error Handling**:
   - Separation between load/generation error handling
   - Graceful error suppression with logging
   - Clear exception propagation options

### Usage Patterns:
```python
# Single asynchronous load: `load()` returns a Future and `.result()` blocks until it is done.
future = my_storage.load(DataKey("BTC-USD-2023-01"))
data = future.result()

# Bulk load: submit every key first so the loads run concurrently, then collect them in order.
keys = [DataKey(f"BTC-USD-{day}") for day in date_list]
futures = list(map(my_storage.load, keys))
results = [pending.result() for pending in futures]

# Direct generation: skips the existence check and load_mode (generate_mode still applies).
data = my_storage.generate(DataKey("ETH-USD-2023-02"))
```

### Extensibility:
- Subclasses implement only the data-specific hooks (`_generate_data`, `_read_data`, `_save_data`)
- Works with any data formats (Parquet, HDF5, etc.)
- Can be adapted to cloud storage (S3/GCS) by overriding `get_path()` and `exists()`, which currently rely on local-filesystem `os.path` calls
- Executor pluggability allows different parallelism strategies

This design maintains the core functionality of the original system while:
1. Simplifying the mode system into orthogonal dimensions
2. Providing clear separation between core logic and I/O
3. Supporting Future-based asynchronous loading (`concurrent.futures`)
4. Allowing integration with distributed computing frameworks
5. Improving maintainability through better abstraction boundaries