# Frame Library Sandbox

A temporal dataframe wrapper with parquet caching and concurrent processing.

In [None]:
import shutil
import zlib
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import polars as pl

from frame import Frame, PandasBackend, PolarsBackend, LazyFrame, CacheMode, CacheMissError, ChunkGranularity

## Basic Usage

Create a data-fetching function that accepts `start_dt` and `end_dt` parameters.

In [None]:
# Sample data function - simulates fetching price data
def fetch_prices(start_dt: datetime, end_dt: datetime, ticker: str = "AAPL"):
    """Simulate fetching daily price data for a ticker.

    Args:
        start_dt: First day of the requested range (inclusive).
        end_dt: Last day of the requested range (inclusive).
        ticker: Symbol to generate data for. It determines both the random
            seed and the base price level.

    Returns:
        A pandas DataFrame indexed by ``(as_of_date, id)`` with ``price`` and
        ``volume`` columns, one row per calendar day. When ``end_dt`` is
        before ``start_dt`` the frame is empty but keeps the same columns and
        index levels.
    """
    print(f"Fetching {ticker} from {start_dt.date()} to {end_dt.date()}")

    dates = pd.date_range(start_dt, end_dt, freq="D")
    n_days = len(dates)

    # hash() of a str is salted per interpreter process, so it cannot provide
    # a seed that is reproducible across sessions; CRC32 of the bytes is stable.
    ticker_code = zlib.crc32(ticker.encode("utf-8"))

    # A private Generator leaves NumPy's global RNG untouched, so fetches that
    # this notebook runs concurrently cannot disturb each other's sequences.
    rng = np.random.default_rng(ticker_code)

    base_price = 100 + ticker_code % 100
    price = base_price + rng.standard_normal(n_days) * 5 + np.arange(n_days) * 0.1
    volume = (1e6 + rng.standard_normal(n_days) * 1e5).astype("int64")

    # Building column-wise keeps the column names even for an empty range, so
    # set_index() below never fails on missing keys.
    df = pd.DataFrame(
        {
            "as_of_date": dates,
            "id": [ticker] * n_days,
            "price": price,
            "volume": volume,
        }
    )
    return df.set_index(["as_of_date", "id"])

In [None]:
# Clean up any existing cache for fresh start
cache_dir = Path(".frame_cache")
if cache_dir.exists():
    shutil.rmtree(cache_dir)

# Create a Frame wrapping the fetch function
prices = Frame(
    fetch_prices,
    {"ticker": "AAPL"},
    backend="pandas",
    chunk_granularity="month",  # Cache in monthly chunks (also supports "day", "week", "year")
)

In [None]:
df = prices.get(datetime(2025, 1, 1))

In [None]:
df

In [None]:
# Fetch data for a date range
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 15)

df = prices.get_range(start, end)
df.head(10)

In [None]:
# Fetch again - this time from cache (no "Fetching..." print)
df2 = prices.get_range(start, end)
print("Data fetched from cache!")
df2.head()

In [None]:
# Get data for a single date (drops as_of_date from index)
single_day = prices.get(datetime(2024, 1, 10))
single_day

## Caching Behavior

Data is cached on disk as Parquet chunks. Extending the date range fetches only the chunks that are missing.

In [None]:
# Check cache directory
cache_files = list(cache_dir.rglob("*.parquet"))
for f in cache_files:
    print(f"  {f.relative_to(cache_dir)}")

In [None]:
# Extend the date range - only fetches the new chunk
extended = prices.get_range(datetime(2024, 1, 1), datetime(2024, 2, 15))
print(f"\nTotal rows: {len(extended)}")

## Cache Modes

Control cache behavior per-call with the `cache_mode` parameter:

| Mode | Name | Read Cache | Write Cache | Fetch Live |
|------|------|------------|-------------|------------|
| `"a"` | Append (default) | Yes | Yes (new data only) | Yes (for missing) |
| `"l"` | Live | No | No | Always |
| `"r"` | Read-only | Yes | No | No (raise error if missing) |
| `"w"` | Write/Refresh | No | Yes (overwrite) | Always |

In [None]:
# Clean cache for fresh demo
if cache_dir.exists():
    shutil.rmtree(cache_dir)

prices = Frame(fetch_prices, {"ticker": "AAPL"}, chunk_granularity="month")

# Default mode "a" (append): read from cache, fetch missing, cache new data
start, end = datetime(2024, 1, 1), datetime(2024, 1, 10)
df1 = prices.get_range(start, end)  # Fetches and caches
print("First call (mode='a'): fetched from source, cached")

df2 = prices.get_range(start, end)  # From cache
print("Second call (mode='a'): served from cache (no fetch)")
df2.head()

In [None]:
# Live mode "l": bypass cache entirely, always fetch fresh
df_live = prices.get_range(start, end, cache_mode="l")
print("Live mode: always fetches fresh data (even if cached)")
df_live.head()

In [None]:
# Write mode "w": force refresh cache (fetch live and overwrite)
df_write = prices.get_range(start, end, cache_mode="w")
print("Write mode: fetched fresh and overwrote cache")
df_write.head()

In [None]:
# Read-only mode "r": only read from cache, raise error if missing
try:
    # This date range is not cached, so it will raise CacheMissError
    uncached_range = prices.get_range(datetime(2024, 6, 1), datetime(2024, 6, 10), cache_mode="r")
except CacheMissError as e:
    print(f"CacheMissError: {e}")

# But this works because Jan 1-10 was cached above
df_readonly = prices.get_range(start, end, cache_mode="r")
print("\nRead-only mode: served from cache successfully")
df_readonly.head()

## Hierarchical Cache Levels

A Frame supports multiple read-only parent cache directories alongside one primary read/write cache.
Useful for team/shared caches where you want to read from a shared cache but write to a personal one.

In [None]:
# Setup: Create a "shared" cache with some data
shared_cache = Path(".frame_cache_shared")
personal_cache = Path(".frame_cache_personal")

# Clean both caches
for c in [shared_cache, personal_cache]:
    if c.exists():
        shutil.rmtree(c)

# First, populate the "shared" cache with January data
shared_frame = Frame(fetch_prices, {"ticker": "AAPL"}, cache_dir=shared_cache, chunk_granularity="month")
_ = shared_frame.get_range(datetime(2024, 1, 1), datetime(2024, 1, 31))
print("Shared cache populated with January 2024 data")

In [None]:
# Create a Frame with hierarchical caching:
# - Primary cache (read/write): personal_cache
# - Parent cache (read-only): shared_cache
hierarchical_frame = Frame(
    fetch_prices,
    {"ticker": "AAPL"},
    cache_dir=personal_cache,
    parent_cache_dirs=[shared_cache],  # Read-only parent caches
    chunk_granularity="month",
)

# Request January data - should come from shared_cache (no fetch!)
jan_data = hierarchical_frame.get_range(datetime(2024, 1, 1), datetime(2024, 1, 15))
print("January: served from shared cache (no fetch)")
print(f"Got {len(jan_data)} rows")

# Request February data - not in any cache, so fetch and write to personal_cache
feb_data = hierarchical_frame.get_range(datetime(2024, 2, 1), datetime(2024, 2, 15))
# Plain string: there is nothing to interpolate, so no f-prefix is needed
print("\nFebruary: fetched and cached to personal cache")
print(f"Got {len(feb_data)} rows")

In [None]:
# Verify cache structure - February was written to personal cache, not shared
print("Shared cache files:")
for f in shared_cache.rglob("*.parquet"):
    print(f"  {f.relative_to(shared_cache)}")

print("\nPersonal cache files:")
for f in personal_cache.rglob("*.parquet"):
    print(f"  {f.relative_to(personal_cache)}")

# Cleanup
for c in [shared_cache, personal_cache]:
    if c.exists():
        shutil.rmtree(c)

## Chunk Granularity

Control how data is chunked in the cache with `chunk_granularity`:

| Granularity | Chunk Key Format | Example Path |
|-------------|------------------|--------------|
| `"day"` | `YYYY/MM/DD` | `cache/<key>/2024/01/15.parquet` |
| `"week"` | `YYYY/W##` | `cache/<key>/2024/W03.parquet` |
| `"month"` | `YYYY/MM` | `cache/<key>/2024/01.parquet` |
| `"year"` | `YYYY` | `cache/<key>/2024.parquet` |

In [None]:
# Example: Day-level granularity (one file per day)
if cache_dir.exists():
    shutil.rmtree(cache_dir)

daily_frame = Frame(
    fetch_prices,
    {"ticker": "AAPL"},
    chunk_granularity="day",  # One parquet file per day
)

_ = daily_frame.get_range(datetime(2024, 3, 1), datetime(2024, 3, 5))

print("Day granularity cache structure:")
for f in sorted(cache_dir.rglob("*.parquet")):
    print(f"  {f.relative_to(cache_dir)}")

In [None]:
# Example: Year-level granularity (one file per year)
if cache_dir.exists():
    shutil.rmtree(cache_dir)

yearly_frame = Frame(
    fetch_prices,
    {"ticker": "AAPL"},
    chunk_granularity="year",  # One parquet file per year
)

_ = yearly_frame.get_range(datetime(2024, 1, 1), datetime(2024, 12, 31))

print("Year granularity cache structure:")
for f in sorted(cache_dir.rglob("*.parquet")):
    print(f"  {f.relative_to(cache_dir)}")

# Cleanup
if cache_dir.exists():
    shutil.rmtree(cache_dir)

## Nested Frames with Lazy Batching

When a Frame's function calls another Frame's `get_range()`, the calls are batched and executed concurrently.

In [None]:
# Clean cache for demonstration
if cache_dir.exists():
    shutil.rmtree(cache_dir)

In [None]:
# Define a function that depends on another Frame
def compute_returns(start_dt: datetime, end_dt: datetime, price_frame: Frame):
    """Return the price data for the range with an added ``return`` column.

    The prices are requested from ``price_frame`` and copied, so the object
    handed back by ``get_range()`` is not modified. ``return`` holds the
    row-over-row percentage change of ``price``.
    """
    print(f"Computing returns from {start_dt.date()} to {end_dt.date()}")

    # Per the library's lazy-batching design, this call yields a LazyFrame
    # when it is made from within another Frame.
    result = price_frame.get_range(start_dt, end_dt).copy()
    result["return"] = result["price"].pct_change()
    return result

# Create the nested Frame
prices = Frame(fetch_prices, {"ticker": "AAPL"})
returns = Frame(compute_returns, {"price_frame": prices})

In [None]:
# Fetch returns - the nested price fetch is batched
result = returns.get_range(datetime(2024, 1, 1), datetime(2024, 1, 10))
result

In [None]:
# Multiple nested dependencies - all batched together
def compute_spread(start_dt: datetime, end_dt: datetime, frame1: Frame, frame2: Frame):
    """Return ``frame1``'s data with a ``spread`` column added.

    ``spread`` is ``frame1``'s price minus ``frame2``'s price, subtracted
    position by position on the raw values (the two inputs are not aligned
    by index).
    """
    print(f"Computing spread from {start_dt.date()} to {end_dt.date()}")

    # Per the library's lazy-batching design, both requests become LazyFrames
    # that are resolved concurrently.
    left = frame1.get_range(start_dt, end_dt)
    right = frame2.get_range(start_dt, end_dt)

    result = left.copy()
    price_gap = left["price"].values - right["price"].values
    result["spread"] = price_gap
    return result

aapl = Frame(fetch_prices, {"ticker": "AAPL"})
googl = Frame(fetch_prices, {"ticker": "GOOGL"})
spread = Frame(compute_spread, {"frame1": aapl, "frame2": googl})

In [None]:
# Both AAPL and GOOGL fetches happen concurrently
spread_data = spread.get_range(datetime(2024, 2, 1), datetime(2024, 2, 10))
spread_data

## Async API

Use `aget_range()` and `aget()` for async operations.

In [None]:
import asyncio

# Clean cache
if cache_dir.exists():
    shutil.rmtree(cache_dir)

In [None]:
# Fetch multiple frames concurrently with asyncio.gather
aapl = Frame(fetch_prices, {"ticker": "AAPL"})
googl = Frame(fetch_prices, {"ticker": "GOOGL"})
msft = Frame(fetch_prices, {"ticker": "MSFT"})

async def fetch_all():
    """Request the same date window from the three ticker Frames at once.

    Returns the list produced by ``asyncio.gather``, ordered AAPL, GOOGL, MSFT.
    """
    window = (datetime(2024, 4, 1), datetime(2024, 4, 10))

    # gather() awaits all three aget_range() coroutines together
    return await asyncio.gather(
        aapl.aget_range(*window),
        googl.aget_range(*window),
        msft.aget_range(*window),
    )

aapl_df, googl_df, msft_df = await fetch_all()
print(f"AAPL: {len(aapl_df)} rows")
print(f"GOOGL: {len(googl_df)} rows")
print(f"MSFT: {len(msft_df)} rows")

## Polars Backend

Switch to Polars by setting `backend="polars"`.

In [None]:
# Clean cache
if cache_dir.exists():
    shutil.rmtree(cache_dir)

In [None]:
# Data function that returns Polars DataFrame
def fetch_prices_polars(start_dt: datetime, end_dt: datetime, ticker: str = "AAPL"):
    """Simulate fetching daily price data for a ticker (Polars version).

    Args:
        start_dt: First day of the requested range (inclusive).
        end_dt: Last day of the requested range (inclusive).
        ticker: Symbol to generate data for. It determines both the random
            seed and the base price level.

    Returns:
        A Polars DataFrame with ``as_of_date``, ``id``, ``price`` and
        ``volume`` columns, one row per calendar day.
    """
    print(f"Fetching {ticker} from {start_dt.date()} to {end_dt.date()}")

    dates = pd.date_range(start_dt, end_dt, freq="D")
    n_days = len(dates)

    # hash() of a str is salted per interpreter process, so it cannot provide
    # a seed that is reproducible across sessions; CRC32 of the bytes is stable.
    ticker_code = zlib.crc32(ticker.encode("utf-8"))

    # A private Generator leaves NumPy's global RNG untouched, so fetches that
    # this notebook runs concurrently cannot disturb each other's sequences.
    rng = np.random.default_rng(ticker_code)

    base_price = 100 + ticker_code % 100
    price = base_price + rng.standard_normal(n_days) * 5 + np.arange(n_days) * 0.1
    volume = (1e6 + rng.standard_normal(n_days) * 1e5).astype("int64")

    # Plain Python lists keep the column construction independent of how
    # Polars interprets NumPy arrays.
    return pl.DataFrame({
        "as_of_date": [dt.to_pydatetime() for dt in dates],
        "id": [ticker] * n_days,
        "price": price.tolist(),
        "volume": volume.tolist(),
    })

In [None]:
# Create Frame with Polars backend
prices_pl = Frame(
    fetch_prices_polars,
    {"ticker": "AAPL"},
    backend="polars",
)

df = prices_pl.get_range(datetime(2024, 5, 1), datetime(2024, 5, 10))
print(type(df))
df

## Operations Layer

Operations are Frame-like objects that wrap Frames and apply transformations declaratively.
They preserve concurrent data fetching when used alongside Frames.

In [None]:
# Import operations
from frame import Rolling, Shift, Diff, Abs, Pct, Add, Sub, Mul, Div

# Clean cache for fresh start
if cache_dir.exists():
    shutil.rmtree(cache_dir)

# Create a base prices Frame
prices = Frame(fetch_prices, {"ticker": "AAPL"})

### Unary Operations

Operations that transform a single Frame.

In [None]:
# Rolling window average (5-day moving average)
rolling_avg = Rolling(prices, window=5, func="mean")

start = datetime(2024, 1, 1)
end = datetime(2024, 1, 15)

result = rolling_avg.get_range(start, end)
result

In [None]:
# Shift/Lag operation - shift data by N periods
shifted = Shift(prices, periods=1)
shifted.get_range(start, end)

In [None]:
# Diff - compute difference between consecutive values
diff = Diff(prices, periods=1)
diff.get_range(start, end)

In [None]:
# Pct - percentage change
pct_change = Pct(prices, periods=1)
pct_change.get_range(start, end)

### Binary Operations

Operations that combine two Frames (or a Frame and a scalar).

In [None]:
# Create a second prices Frame
googl = Frame(fetch_prices, {"ticker": "GOOGL"})

# Add two Frames element-wise
combined = Add(prices, googl)
combined.get_range(start, end)

In [None]:
# Subtract: compute price spread between two stocks
spread = Sub(prices, googl)
spread.get_range(start, end)

In [None]:
# Scalar operations - multiply prices by a constant
scaled = Mul(prices, 1.1)  # 10% markup
scaled.get_range(start, end)

### Chaining Operations

Operations can be chained together - an operation can wrap another operation.

In [None]:
# Chain: prices -> daily returns -> rolling average of returns
returns = Pct(prices, periods=1)
smoothed_returns = Rolling(returns, window=3, func="mean")

smoothed_returns.get_range(start, end)

In [None]:
# Complex pipeline: price - lagged_price (equivalent to diff)
lagged = Shift(prices, periods=1)
daily_change = Sub(prices, lagged)

daily_change.get_range(start, end)

### Concurrent Resolution

Operations preserve the concurrent fetching behavior. When multiple operations share
the same underlying Frame, the data is fetched only once.

In [None]:
# Executor helpers used below to drive a batch by hand
from frame.executor import set_batch_context, reset_batch_context, resolve_batch_sync

# Clean cache to see fetch behavior
if cache_dir.exists():
    shutil.rmtree(cache_dir)

# Create multiple operations on the same Frame
prices = Frame(fetch_prices, {"ticker": "AAPL"})
rolling_5 = Rolling(prices, window=5)
rolling_10 = Rolling(prices, window=10)
shifted = Shift(prices, periods=1)

# When used in a batch context, prices is fetched only once
# and shared by all operations
batch = []
token = set_batch_context(batch)

try:
    # All three operations add their lazy inputs to the batch
    lazy_r5 = rolling_5.get_range(start, end)
    lazy_r10 = rolling_10.get_range(start, end)
    lazy_s = shifted.get_range(start, end)

    print(f"Items in batch: {len(batch)}")
    # Plain strings: nothing is interpolated, so no f-prefix is needed
    print("  - LazyOperations (rolling_5, rolling_10, shifted)")
    print("  - LazyFrames (prices - shared dependency)")

    # Resolve all at once
    resolve_batch_sync(batch)

    print("\nAll resolved successfully!")
finally:
    # Always restore the previous batch context, even if resolution fails
    reset_batch_context(token)

### Single Date Access

Operations also support `get()` for single date access.

In [None]:
# Get rolling average for a single date
rolling_avg = Rolling(prices, window=5)
single_day = rolling_avg.get(datetime(2024, 1, 15))
single_day

### Async Operations

Operations support async methods: `aget_range()` and `aget()`.

In [None]:
# Async fetch of operation results
rolling = Rolling(prices, window=5)

async def fetch_rolling():
    """Await ``rolling.aget_range()`` over the notebook-level ``start``/``end`` window and return its result."""
    return await rolling.aget_range(start, end)

result = await fetch_rolling()
result.head()

## In-Memory Cache Layer

The Frame library includes a module-level, thread-safe, LRU-based in-memory cache that stores DataFrame chunks after the first disk read. This eliminates redundant disk I/O for temporal access patterns.

In [None]:
# Import memory cache utilities
from frame import (
    get_memory_cache_stats,
    clear_memory_cache,
    configure_memory_cache,
    CacheStats,
    CacheConfig,
)

# Clean up cache and reset memory cache
if cache_dir.exists():
    shutil.rmtree(cache_dir)
clear_memory_cache()

print("Memory cache cleared. Starting fresh demo...")

In [None]:
# Use CacheManager directly to demonstrate memory cache
from frame.cache import CacheManager
from frame.backends.pandas import PandasBackend

backend = PandasBackend()
cache = CacheManager(fetch_prices, {"ticker": "AAPL"}, backend, cache_dir)

# Write a chunk to disk
chunk_start = datetime(2024, 6, 1)
chunk_end = datetime(2024, 6, 30)

df = fetch_prices(chunk_start, chunk_end, ticker="AAPL")
cache.write_chunk(df, chunk_start, chunk_end)
print(f"Wrote chunk with {len(df)} rows to disk")

In [None]:
# First read - memory cache miss, disk read, populates memory cache
print("First read from disk:")
result1 = cache.read_chunk(chunk_start, chunk_end)

stats = get_memory_cache_stats()
print(f"  Memory cache stats: hits={stats.hits}, misses={stats.misses}, entries={stats.current_entries}")
print(f"  Memory usage: {stats.current_memory_bytes:,} bytes")

In [None]:
# Second read - memory cache HIT (no disk read!)
print("Second read (should hit memory cache):")
result2 = cache.read_chunk(chunk_start, chunk_end)

stats = get_memory_cache_stats()
print(f"  Memory cache stats: hits={stats.hits}, misses={stats.misses}")
print(f"  Hit rate: {stats.hit_rate:.1f}%")

In [None]:
# Configure memory cache limits.
# Both limits are passed in one call so the result does not depend on whether
# a later partial call preserves settings made by an earlier one.
configure_memory_cache(
    max_entries=256,  # Increase max cached chunks
    max_memory_bytes=100 * 1024 * 1024,  # 100 MB limit
)

print("Cache configured with:")
print("  - max_entries: 256")
print("  - max_memory_bytes: 100 MB")

In [None]:
# Cache invalidation on writes - writing to disk invalidates memory cache
print("Before write - cache entries:", get_memory_cache_stats().current_entries)

# Write new data to the same chunk (simulates data update)
df_updated = fetch_prices(chunk_start, chunk_end, ticker="AAPL")
cache.write_chunk(df_updated, chunk_start, chunk_end)

print("After write - cache entries:", get_memory_cache_stats().current_entries)
print("  -> Memory cache invalidated on write (write-through policy)")

In [None]:
# Clear the entire memory cache
# Read to populate cache again first
_ = cache.read_chunk(chunk_start, chunk_end)
print("After read - entries:", get_memory_cache_stats().current_entries)

# Now clear
cleared_count = clear_memory_cache()
print(f"Cleared {cleared_count} entries from memory cache")
print("After clear - entries:", get_memory_cache_stats().current_entries)

In [None]:
# Memory cache uses composite keys: (path, columns, filters)
# Different column selections create separate cache entries
clear_memory_cache()

# Read all columns
df_all = cache.read_chunk(chunk_start, chunk_end)
print(f"Read all columns - entries: {get_memory_cache_stats().current_entries}")

# Read only specific columns (different cache key!)
df_price_only = cache.read_chunk(chunk_start, chunk_end, columns=["price"])
print(f"Read 'price' column only - entries: {get_memory_cache_stats().current_entries}")

# Read with different columns (another cache key)
df_volume_only = cache.read_chunk(chunk_start, chunk_end, columns=["volume"])
print(f"Read 'volume' column only - entries: {get_memory_cache_stats().current_entries}")

print("\nEach unique (path, columns, filters) combo gets its own cache entry!")

In [None]:
# Disable memory cache if needed (e.g., for debugging or memory-constrained environments)
configure_memory_cache(enabled=False)
print("Memory cache disabled")

try:
    # Reads now always go to disk
    _ = cache.read_chunk(chunk_start, chunk_end)
    print(f"After read with cache disabled - entries: {get_memory_cache_stats().current_entries}")
finally:
    # Re-enable for normal operation. The cache is module-level state, so it
    # must be restored even if the read above raises; otherwise it would stay
    # disabled for the rest of the session.
    configure_memory_cache(enabled=True)
    print("\nMemory cache re-enabled")

### Memory Cache API Summary

| Function | Description |
|----------|-------------|
| `get_memory_cache_stats()` | Get current cache statistics (hits, misses, entries, memory) |
| `clear_memory_cache()` | Clear all entries, returns count cleared |
| `configure_memory_cache(**kwargs)` | Configure cache settings |

**Configuration options:**
- `enabled` (bool): Enable/disable cache (default: True)
- `max_entries` (int): Max cached chunks, 0 = unlimited (default: 128)
- `max_memory_bytes` (int): Max memory usage, 0 = unlimited (default: 0)

**Key features:**
- Thread-safe with fine-grained locking
- LRU eviction when limits exceeded
- Composite keys: (path, columns, filters)
- Write-through invalidation on disk writes
- Works with both pandas and polars DataFrames

In [None]:
# Remove cache directory when done experimenting
if cache_dir.exists():
    shutil.rmtree(cache_dir)
    print("Cache cleaned up!")