# OHLCV Pipeline Integration Test

This notebook verifies the end-to-end OHLCV pipeline:
1. Data Loading & Splitting with `OHLCVLoader`
2. Normalization with `OHLCVPackedScaler`
3. Minimal fine-tuning run with `MoiraiFinetune`

## Step 1: Data Loading & Splitting

Use `OHLCVLoader` to create train/test datasets for AAPL.

In [45]:
import os
import torch
import numpy as np
import pandas as pd
from pathlib import Path
from datasets import load_from_disk
from uni2ts.data.ohlcvloader import OHLCVLoader

# Constants
DATA_PATH = "/opt/uni2ts/data/processed_equities/5m/"
OUTPUT_DIR = "/opt/uni2ts/data/test_splits/"
SYMBOL = "AAPL"
SPLIT_DATE = "2025-01-01"

# Initialize Loader
loader = OHLCVLoader(
    data_path=DATA_PATH,
    freq="5min",
    verbose=True
)

# Create splits
train_ds, test_ds = loader.create_train_test_splits(
    symbol=SYMBOL,
    output_dir=OUTPUT_DIR,
    split_date=SPLIT_DATE,
    gap_fill_strategy="fill_weekend"
)

print(f"\nâœ“ Train dataset size: {len(train_ds)}")
print(f"âœ“ Test dataset size: {len(test_ds)}")


OHLCVLoader Initialized
  Data path: /opt/uni2ts/data/processed_equities/5m
  Frequency: 5min
  Timezone: America/New_York
  Market Hours: AUTO-DETECT


Creating Train/Test Splits for AAPL
  Split date: 2025-01-01
  Train: < 2025-01-01
  Test: >= 2025-01-01
  âœ“ Validating DataFrame structure...
    âœ“ All validations passed
    âœ“ 500586 rows validated

ðŸ“Š Data split:
  Total rows: 500586
  Train split: 489666 rows
  Test split: 10920 rows
  Train: 97.8%
  Test: 2.2%

ðŸš€ Building TRAIN dataset...


Saving the dataset (0/1 shards):   0%|          | 0/1 [00:00<?, ? examples/s]

  âœ“ Train dataset saved: /opt/uni2ts/data/test_splits/AAPL_train
     Time steps: 2629230
     Features: (6, 2629230)

ðŸš€ Building TEST dataset...


Saving the dataset (0/1 shards):   0%|          | 0/1 [00:00<?, ? examples/s]

  âœ“ Test dataset saved: /opt/uni2ts/data/test_splits/AAPL_test
     Time steps: 58818
     Features: (6, 58818)

âœ“ Done! Train: /opt/uni2ts/data/test_splits/AAPL_train, Test: /opt/uni2ts/data/test_splits/AAPL_test
  Combined features: [open, high, low, volume, min_since_open, day_of_week]

âœ“ Train dataset size: 1
âœ“ Test dataset size: 1


## Step 2: Verification of Scaler on Loaded Data

Simulate a batch and verify `OHLCVPackedScaler` statistics.

In [46]:
from einops import rearrange, repeat
from uni2ts.module.packed_scaler import OHLCVPackedScaler

# Load sample from train dataset
entry = train_ds[0]
entry

{'item_id': 'AAPL',
 'start': 946909800,
 'freq': '5min',
 'target': [0.9336000084877014,
  0.9247000217437744,
  0.9262999892234802,
  0.9341999888420105,
  0.9311000108718872,
  0.923799991607666,
  0.9279999732971191,
  0.9279999732971191,
  0.9352999925613403,
  0.9350000023841858,
  0.9319000244140625,
  0.9419999718666077,
  0.9358000159263611,
  0.9262999892234802,
  0.9240999817848206,
  0.9240999817848206,
  0.9196000099182129,
  0.909600019454956,
  0.9174000024795532,
  0.9179999828338623,
  0.9146000146865845,
  0.9135000109672546,
  0.9162999987602234,
  0.9275000095367432,
  0.9262999892234802,
  0.9275000095367432,
  0.9262999892234802,
  0.928600013256073,
  0.929099977016449,
  0.9279999732971191,
  0.9301999807357788,
  0.9258000254631042,
  0.9279999732971191,
  0.9297000169754028,
  0.9275000095367432,
  0.9279999732971191,
  0.9319000244140625,
  0.9308000206947327,
  0.9319000244140625,
  0.9347000122070312,
  0.9358000159263611,
  0.9397000074386597,
  0.94749999

In [22]:
target = torch.tensor(entry['target']).unsqueeze(-1)  # [time, 1]
target

tensor([[  0.9336],
        [  0.9247],
        [  0.9263],
        ...,
        [251.0100],
        [250.0800],
        [250.3800]])

In [23]:
past_feat = torch.tensor(entry['past_feat_dynamic_real'])
past_feat

tensor([[9.3640e-01, 9.3750e-01, 9.2410e-01,  ..., 2.5082e+02, 2.5099e+02,
         2.5007e+02],
        [9.4080e-01, 9.3860e-01, 9.3020e-01,  ..., 2.5108e+02, 2.5100e+02,
         2.5056e+02],
        [9.3360e-01, 9.2410e-01, 9.2080e-01,  ..., 2.5050e+02, 2.4987e+02,
         2.4950e+02],
        [2.2075e+07, 9.3968e+06, 7.5152e+06,  ..., 4.4469e+05, 8.5454e+05,
         2.4652e+06],
        [0.0000e+00, 5.0000e+00, 1.0000e+01,  ..., 3.7500e+02, 3.8000e+02,
         3.8500e+02],
        [0.0000e+00, 0.0000e+00, 0.0000e+00,  ..., 1.0000e+00, 1.0000e+00,
         1.0000e+00]])

In [25]:
obs_mask = torch.tensor(entry['observed_mask']).unsqueeze(-1)  # [time, 1]
obs_mask

tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]])

In [26]:
time_steps = target.shape[0]
num_variates = 7  # 4 OHLC + 1 Vol + 2 Time

# Format: [open, high, low, close, volume, min_since_open, dow]
# past_feat layout: [open, high, low, volume, min_since_open, dow]
full_data = torch.cat([
    past_feat[0:3].permute(1, 0), # Open, High, Low [time, 3]
    target,                       # Close [time, 1]
    past_feat[3:6].permute(1, 0)  # Volume, Min, DoW [time, 3]
], dim=1).unsqueeze(-1) # [time, 7, 1]
full_data

tensor([[[9.3640e-01],
         [9.4080e-01],
         [9.3360e-01],
         ...,
         [2.2075e+07],
         [0.0000e+00],
         [0.0000e+00]],

        [[9.3750e-01],
         [9.3860e-01],
         [9.2410e-01],
         ...,
         [9.3968e+06],
         [5.0000e+00],
         [0.0000e+00]],

        [[9.2410e-01],
         [9.3020e-01],
         [9.2080e-01],
         ...,
         [7.5152e+06],
         [1.0000e+01],
         [0.0000e+00]],

        ...,

        [[2.5082e+02],
         [2.5108e+02],
         [2.5050e+02],
         ...,
         [4.4469e+05],
         [3.7500e+02],
         [1.0000e+00]],

        [[2.5099e+02],
         [2.5100e+02],
         [2.4987e+02],
         ...,
         [8.5454e+05],
         [3.8000e+02],
         [1.0000e+00]],

        [[2.5007e+02],
         [2.5056e+02],
         [2.4950e+02],
         ...,
         [2.4652e+06],
         [3.8500e+02],
         [1.0000e+00]]])

In [28]:
target_packed = rearrange(full_data, "t v p -> (v t) p")
observed_mask_packed = repeat(obs_mask, "t p -> (v t) p", v=num_variates)
sample_id = torch.ones(target_packed.shape[0], dtype=torch.long)
variate_id = repeat(torch.arange(num_variates), "v -> (v t)", t=time_steps)
variate_id

tensor([0, 0, 0,  ..., 6, 6, 6])

In [30]:
observed_mask_packed

tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]])

In [29]:
sample_id 

tensor([1, 1, 1,  ..., 1, 1, 1])

In [34]:
from einops import rearrange, repeat
from uni2ts.module.packed_scaler import OHLCVPackedScaler

# Load sample from train dataset
entry = train_ds[0]

# Fix: Correctly convert lists to tensors and handle dimensions
target = torch.tensor(entry['target']).unsqueeze(-1)  # [time, 1]
past_feat = torch.tensor(entry['past_feat_dynamic_real'])  # [6, time]
obs_mask = torch.tensor(entry['observed_mask']).unsqueeze(-1)  # [time, 1]

time_steps = target.shape[0]
num_variates = 7  # 4 OHLC + 1 Vol + 2 Time

# Format: [open, high, low, close, volume, min_since_open, dow]
# past_feat layout: [open, high, low, volume, min_since_open, dow]
full_data = torch.cat([
    past_feat[0:3].permute(1, 0), # Open, High, Low [time, 3]
    target,                       # Close [time, 1]
    past_feat[3:6].permute(1, 0)  # Volume, Min, DoW [time, 3]
], dim=1).unsqueeze(-1) # [time, 7, 1]

target_packed = rearrange(full_data, "t v p -> (v t) p")
observed_mask_packed = repeat(obs_mask, "t p -> (v t) p", v=num_variates)
sample_id = torch.ones(target_packed.shape[0], dtype=torch.long)
variate_id = repeat(torch.arange(num_variates), "v -> (v t)", t=time_steps)

print(f"Packed target shape: {target_packed.shape}")

# Initialize Scaler
scaler = OHLCVPackedScaler(verbose=True)


Packed target shape: torch.Size([18404610, 1])

OHLCVPackedScaler Initialization
  Open index: 0 â†’ Group 0 (OHL collective z-score)
  High index: 1 â†’ Group 0 (OHL collective z-score)
  Low index: 2 â†’ Group 0 (OHL collective z-score)
  Volume index: 3 â†’ Group 1 (individual z-score)
  Minutes index: 4 â†’ Mid-range (195.0 Â± 97.5)
  Day of Week index: 5 â†’ Mid-range (2.0 Â± 1.0)
  Correction: 1
  Minimum scale: 1e-05



In [44]:
len(train_ds[0]['target'])

2629230

In [40]:
target_packed.shape

torch.Size([18404610, 1])

In [None]:

# Compute Loc/Scale
loc, scale = scaler(target_packed, observed_mask_packed, sample_id, variate_id)

# Verification
print("\n--- Scaler Verification ---")
for v_id in range(num_variates):
    mask = (variate_id == v_id)
    v_loc = torch.unique(loc[mask])
    v_scale = torch.unique(scale[mask])
    print(f"Variate {v_id}: loc={v_loc.tolist()}, scale={v_scale.tolist()}")

# Assertions
assert torch.allclose(loc[variate_id == 0], loc[variate_id == 3]) # Open vs Close loc
assert not torch.allclose(loc[variate_id == 0], loc[variate_id == 4]) # Open vs Volume loc
print("\nâœ“ Scaler statistics verified successfully!")

Packed target shape: torch.Size([18404610, 1])

OHLCVPackedScaler Initialization
  Open index: 0 â†’ Group 0 (OHL collective z-score)
  High index: 1 â†’ Group 0 (OHL collective z-score)
  Low index: 2 â†’ Group 0 (OHL collective z-score)
  Volume index: 3 â†’ Group 1 (individual z-score)
  Minutes index: 4 â†’ Mid-range (195.0 Â± 97.5)
  Day of Week index: 5 â†’ Mid-range (2.0 Â± 1.0)
  Correction: 1
  Minimum scale: 1e-05


OHLCVPackedScaler: Computing Normalization Statistics (Vectorized)
  Input shape: torch.Size([18404610, 1])
  Unique sample_ids: [1]
  Unique variate_ids: [0, 1, 2, 3, 4, 5, 6]

  Step 1: Create group mapping for OHL collective normalization
    OHLC mask count: 7887690
    Volume mask count: 2629230
    Other mask count: 7887690

  Step 2: Compute OHL collective statistics using vectorized operations


RuntimeError: [enforce fail at alloc_cpu.cpp:117] err == 0. DefaultCPUAllocator: can't allocate memory: you tried to allocate 338729669252100 bytes. Error code 12 (Cannot allocate memory)

In [None]:
# it should do window level 


## Step 3: Minimal Fine-Tuning Run

Configure `MoiraiFinetune` and run for 1 step.

In [None]:
from uni2ts.model.moirai import MoiraiFinetune, MoiraiModule
from uni2ts.distribution import MixtureOutput, StudentTOutput, NormalFixedScaleOutput, NegativeBinomialOutput, LogNormalOutput
import lightning as L
from torch.utils.data import DataLoader

# 1. Setup Model
model_name = "Salesforce/moirai-1.1-R-small"
module = MoiraiModule.from_pretrained(model_name)

# Override scaler to use OHLCVPackedScaler
module.scaler = OHLCVPackedScaler()

finetune_model = MoiraiFinetune(
    module=module,
    min_patches=2,
    min_mask_ratio=0.15,
    max_mask_ratio=0.5,
    max_dim=128,
    num_training_steps=10,
    num_warmup_steps=0,
    lr=1e-5
)

print("âœ“ Model initialized with OHLCVPackedScaler")

# 2. Prepare Data (Dummy batch for now as full dataloader requires more setup)
batch = {
    "target": target_packed.unsqueeze(0).to(torch.float32), # [batch, seq, patch]
    "observed_mask": observed_mask_packed.unsqueeze(0).bool(),
    "sample_id": sample_id.unsqueeze(0),
    "time_id": torch.arange(target_packed.shape[0]).unsqueeze(0),
    "variate_id": variate_id.unsqueeze(0),
    "prediction_mask": torch.zeros(target_packed.shape[0]).unsqueeze(0).bool(),
    "patch_size": torch.ones(target_packed.shape[0], dtype=torch.long).unsqueeze(0) * 8
}

# Move to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
finetune_model.to(device)
for k in batch: batch[k] = batch[k].to(device)

# 3. Test forward pass
with torch.no_grad():
    distr = finetune_model(**batch)
    print(f"\nâœ“ Forward pass successful. Output distribution type: {type(distr)}")

# 4. Test training step
loss = finetune_model.training_step(batch, 0)
print(f"âœ“ Training step successful. Loss: {loss.item()}")