# T27 + T28 Ops Validation Notebook

Validates paper controller + operations harness behaviors without live broker credentials.


In [None]:
import json
import os
import shutil
from datetime import datetime
from pathlib import Path

from research.utils.validation_harness import (
    discover_run_artifacts,
    hash_json_normalized,
    hash_jsonl_normalized,
    run_cmd,
)

# --- Isolated workspace ------------------------------------------------------
# Copy the project's data root into a timestamped scratch directory and point
# QUANTO_DATA_ROOT at the copy, so everything below reads/writes the copy.
# NOTE(review): presumably the research.* modules resolve their data root from
# QUANTO_DATA_ROOT -- confirm.
PROJECT_ROOT = Path('.').resolve()
BASE_DATA_ROOT = PROJECT_ROOT / '.quanto_data'
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
TMP_DATA_ROOT = PROJECT_ROOT / '.quanto_data_notebook_tmp' / f'T27_T28_{timestamp}'
TMP_DATA_ROOT.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(BASE_DATA_ROOT, TMP_DATA_ROOT, dirs_exist_ok=True)
os.environ['QUANTO_DATA_ROOT'] = str(TMP_DATA_ROOT)

# --- Promoted candidate -------------------------------------------------------
# The first promotion record (in sorted filename order) supplies the candidate.
PROMOTION_DIR = TMP_DATA_ROOT / 'promotions' / 'candidate'
promotions = sorted(PROMOTION_DIR.glob('*.json'))
if not promotions:
    raise RuntimeError(f'No promotions found under {PROMOTION_DIR}')
PROMOTION_PAYLOAD = json.loads(promotions[0].read_text(encoding='utf-8'))
CANDIDATE_ID = PROMOTION_PAYLOAD['experiment_id']

# --- Unpromoted counterpart ---------------------------------------------------
# Only directories count as experiments; stray files in the experiments root
# (e.g. editor or OS metadata) must not be mistaken for an experiment id.
EXPERIMENTS_ROOT = TMP_DATA_ROOT / 'experiments'
all_experiments = sorted(p.name for p in EXPERIMENTS_ROOT.iterdir() if p.is_dir())
UNPROMOTED_ID = next(
    (
        exp
        for exp in all_experiments
        if exp != CANDIDATE_ID and not (PROMOTION_DIR / f'{exp}.json').exists()
    ),
    None,
)
if UNPROMOTED_ID is None:
    # Fail with an actionable message instead of a bare StopIteration.
    raise RuntimeError(
        f'No unpromoted experiment found under {EXPERIMENTS_ROOT}; '
        'the promotion-enforcement check needs at least one.'
    )

# --- Trading universe ---------------------------------------------------------
# The spec may list its symbols under either 'symbols' or 'universe'; only the
# first two symbols are used for the paper validation configs.
SPEC_PATH = EXPERIMENTS_ROOT / CANDIDATE_ID / 'spec' / 'experiment_spec.json'
SPEC_PAYLOAD = json.loads(SPEC_PATH.read_text(encoding='utf-8'))
UNIVERSE = SPEC_PAYLOAD.get('symbols') or SPEC_PAYLOAD.get('universe') or []
if len(UNIVERSE) < 2:
    raise RuntimeError('Candidate spec must list at least two symbols for paper validation.')
UNIVERSE = UNIVERSE[:2]
CONFIG_ROOT = TMP_DATA_ROOT / 'notebook_configs'
CONFIG_ROOT.mkdir(parents=True, exist_ok=True)

def make_paper_config(experiment_id: str, path: Path) -> Path:
    """Write an Alpaca paper-trading config for *experiment_id* to *path*.

    The config uses the module-level ``UNIVERSE`` and places run artifacts
    under ``TMP_DATA_ROOT / 'paper' / experiment_id / 'runs'``.

    Returns:
        The same *path* that was written, for convenient chaining.
    """
    broker_section = {'alpaca_base_url': 'https://paper-api.alpaca.markets'}
    risk_section = {'max_gross_exposure': 0.5, 'max_turnover': 0.1, 'max_drawdown': 0.05}
    polling_section = {'max_poll_seconds': 5, 'poll_interval_seconds': 1}
    reconciliation_section = {'position_tolerance_shares': 0.5, 'cash_tolerance_usd': 10.0}
    artifacts_section = {'output_root': str(TMP_DATA_ROOT / 'paper' / experiment_id / 'runs')}

    # Key order is kept as-is: the JSON is dumped without sort_keys.
    document = json.dumps(
        {
            'experiment_id': experiment_id,
            'execution_mode': 'alpaca_paper',
            'universe': UNIVERSE,
            'broker': broker_section,
            'risk_limits': risk_section,
            'polling': polling_section,
            'reconciliation': reconciliation_section,
            'artifacts': artifacts_section,
        },
        indent=2,
    )
    path.write_text(document, encoding='utf-8')
    return path

# Materialise one config per experiment: the promoted candidate and the
# unpromoted control used by the enforcement check.
CANDIDATE_CONFIG = make_paper_config(CANDIDATE_ID, CONFIG_ROOT / 'candidate_paper.json')
UNPROMOTED_CONFIG = make_paper_config(UNPROMOTED_ID, CONFIG_ROOT / 'unpromoted_paper.json')

# Echo the resolved setup so the notebook output records what was validated.
for setup_label, setup_value in (
    ('temp data root:', TMP_DATA_ROOT),
    ('candidate:', CANDIDATE_ID),
    ('unpromoted:', UNPROMOTED_ID),
    ('candidate config:', CANDIDATE_CONFIG),
):
    print(setup_label, setup_value)


In [None]:
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List

from research.ops.alerts import AlertEmitter
from research.ops.config import AlertingConfig, BackoffPolicyConfig, OpsConfig, PaperOpsConfig
from research.ops.lifecycle import RunLifecycleTracker
from research.ops.runtime import BrokerRetryableError, RunExecutor
from research.ops.scheduler import MissedRun, ScheduleDecision
from research.ops.service import PaperRunOrchestrator
from research.paper.config import load_paper_config
from research.paper.run import PaperRunner, derive_run_id

# Scratch dict shared between cells; later cells store run metadata here
# (the 'success_run' and 'failure_run_id' keys) and read it back.
NOTEBOOK_STATE: Dict[str, Any] = {}

class StaticScheduler:
    """Scheduler stand-in that always reports one fixed slot as due.

    Every ``evaluate`` call returns a due decision for ``scheduled_for`` with
    no resume id and no missed runs. ``mark_active`` / ``mark_terminal`` calls
    are appended to ``history`` so a test can inspect them afterwards.
    """

    def __init__(self, experiment_id: str, scheduled_for: datetime):
        self.experiment_id = experiment_id
        self.scheduled_for = scheduled_for
        # Chronological log of ('active' | 'terminal', run_id, detail) tuples.
        self.history: List[Any] = []
        # Run id produced by the most recent evaluate(); None until then.
        self.run_id: str | None = None

    def evaluate(self, *, base_run_id: str, now: datetime | None = None) -> ScheduleDecision:
        """Return a due decision whose run id is *base_run_id* plus the slot minute.

        *now* is accepted but not consulted: the slot is fixed.
        """
        slot_stamp = self.scheduled_for.strftime('%Y%m%d%H%M')
        self.run_id = f"{base_run_id}_{slot_stamp}"
        return ScheduleDecision(
            run_id=self.run_id,
            scheduled_for=self.scheduled_for,
            due=True,
            resume_run_id=None,
            missed=[],
        )

    def mark_active(self, run_id: str, scheduled_for: datetime) -> None:
        """Record that *run_id* became active for the given slot."""
        event = ('active', run_id, scheduled_for.isoformat())
        self.history.append(event)

    def mark_terminal(self, run_id: str, state: str) -> None:
        """Record that *run_id* reached the terminal *state*."""
        event = ('terminal', run_id, state)
        self.history.append(event)

class NotebookStubRunner(PaperRunner):
    """Runner stub that writes deterministic artifacts for validation.

    After the base ``PaperRunner`` initialiser has run, construction writes
    three fixed-content files under ``self.run_dir``:

    * ``logs/steps.jsonl`` -- two step rows (an equal-weight ``risk_on`` step
      followed by an all-zero ``defensive`` step), one JSON object per line.
    * ``metrics.json`` -- an all-zero summary/execution payload.
    * ``execution_metrics.json`` -- a fixed latency/fees payload.

    The only config-dependent content is the symbol list (``config.universe``).
    """

    def __init__(self, config, *, run_id: str | None = None, scheduled_for: str | None = None):
        # NOTE(review): self.run_dir is presumably set by PaperRunner.__init__ -- confirm.
        super().__init__(config, run_id=run_id, scheduled_for=scheduled_for)
        logs = self.run_dir / 'logs'
        logs.mkdir(parents=True, exist_ok=True)
        step_rows = [
            {
                'idx': 0,
                'timestamp': '2024-01-02T13:30:00+00:00',
                'mode': 'risk_on',
                'weights': {symbol: 1.0 / len(config.universe) for symbol in config.universe},
            },
            {
                'idx': 1,
                'timestamp': '2024-01-03T13:30:00+00:00',
                'mode': 'defensive',
                'weights': {symbol: 0.0 for symbol in config.universe},
            },
        ]
        steps_path = logs / 'steps.jsonl'
        # JSON Lines: one object per line. The separator must be the escaped
        # '\n'; a raw line break inside the quotes is a syntax error.
        steps_path.write_text(
            '\n'.join(json.dumps(row, sort_keys=True) for row in step_rows),
            encoding='utf-8',
        )
        metrics_payload = {
            'summary': {'pnl': 0.0, 'turnover': 0.0, 'reject_rate': 0.0, 'execution_halts': 0},
            'execution': {'fills': 0, 'orders_submitted': 0},
        }
        (self.run_dir / 'metrics.json').write_text(
            json.dumps(metrics_payload, indent=2, sort_keys=True), encoding='utf-8'
        )
        exec_payload = {'latency_ms': 12, 'fees': 0.12}
        (self.run_dir / 'execution_metrics.json').write_text(
            json.dumps(exec_payload, indent=2, sort_keys=True), encoding='utf-8'
        )

class ExplodingRunner(PaperRunner):
    """Runner stub that fails immediately to trigger hard alerts.

    Construction always raises ``RuntimeError('forced_runner_failure')``; the
    base ``PaperRunner`` initialiser is never reached.
    """

    def __init__(self, config, *, run_id: str | None = None, scheduled_for: str | None = None):
        # Raise before any base-class setup so no run state is created here.
        raise RuntimeError('forced_runner_failure')


def clear_alpaca_creds() -> None:
    """Remove the Alpaca credential variables from ``os.environ``.

    Variables that are not set are ignored, so the call is safe to repeat.
    """
    credential_vars = ('ALPACA_API_KEY', 'ALPACA_SECRET_KEY', 'ALPACA_BASE_URL')
    for var_name in credential_vars:
        if var_name in os.environ:
            del os.environ[var_name]


def ensure_dummy_creds() -> None:
    """Set placeholder Alpaca credentials in ``os.environ``.

    Any existing values for the three variables are overwritten.
    """
    os.environ.update(
        {
            'ALPACA_API_KEY': 'demo-key',
            'ALPACA_SECRET_KEY': 'demo-secret',
            'ALPACA_BASE_URL': 'https://paper-api.alpaca.markets',
        }
    )


def build_ops_config(expect_trades: bool = True) -> OpsConfig:
    """Build the ops configuration used by the orchestrator-driven cells.

    Args:
        expect_trades: Forwarded to ``PaperOpsConfig.expect_trades``.

    Returns:
        An ``OpsConfig`` with an every-minute UTC cron, zero grace minutes, a
        single-attempt backoff policy, and both hard and soft alerts routed to
        ``'ops/oncall'``.
    """
    retry_policy = BackoffPolicyConfig(initial_seconds=0.1, max_seconds=1.0, multiplier=2, max_attempts=1)
    alert_routing = AlertingConfig(hard_channels=['ops/oncall'], soft_channels=['ops/oncall'])
    paper_ops = PaperOpsConfig(
        cron='* * * * *',
        timezone='UTC',
        grace_minutes=0,
        expect_trades=expect_trades,
        runbook_url='ops/runbook.md',
        backoff=retry_policy,
        alerts=alert_routing,
    )
    return OpsConfig(paper_trading=paper_ops)


def collect_alerts() -> list[dict[str, Any]]:
    """Gather all alert entries stored under the current data root.

    Reads every ``*.json`` file in ``$QUANTO_DATA_ROOT/monitoring/alerts`` in
    sorted path order and concatenates their contents.

    Returns:
        The combined entries, or an empty list when the directory is absent.

    Raises:
        KeyError: If ``QUANTO_DATA_ROOT`` is not set.
    """
    alerts_dir = Path(os.environ['QUANTO_DATA_ROOT']) / 'monitoring' / 'alerts'
    collected: list[dict[str, Any]] = []
    if alerts_dir.exists():
        for alert_file in sorted(alerts_dir.glob('*.json')):
            file_entries = json.loads(alert_file.read_text(encoding='utf-8'))
            collected.extend(file_entries)
    return collected

# Marker in the cell output showing that the helper definitions above ran.
print('helper scaffolding ready')


In [None]:
# T27: Promotion-only enforcement (unpromoted experiment must fail)
# Credentials are cleared first so the run is made without broker access.
clear_alpaca_creds()
unpromoted_cmd = ['python', 'scripts/run_paper.py', '--config', str(UNPROMOTED_CONFIG)]
result = run_cmd(unpromoted_cmd, check=False)
print('return code:', result.returncode)
for captured_stream in (result.stdout, result.stderr):
    print(captured_stream)
# The script must exit non-zero and say why on stderr.
assert result.returncode != 0
assert 'not promoted' in result.stderr


In [None]:
# T27: Paper run rejects missing credentials even for promoted experiments
# Same invocation as the unpromoted check, but with the promoted candidate's
# config and no Alpaca variables in the environment.
clear_alpaca_creds()
candidate_cmd = ['python', 'scripts/run_paper.py', '--config', str(CANDIDATE_CONFIG)]
result = run_cmd(candidate_cmd, check=False)
print('return code:', result.returncode)
for captured_stream in (result.stdout, result.stderr):
    print(captured_stream)
# The failure must name the missing variable on stderr.
assert result.returncode != 0
assert 'ALPACA_API_KEY' in result.stderr


In [None]:
# T27: Mocked success path using stub runner + orchestrator
# Placeholder credentials are set so the config loads; NotebookStubRunner only
# writes canned artifacts.
ensure_dummy_creds()
paper_config = load_paper_config(Path(CANDIDATE_CONFIG))
ops_config = build_ops_config(expect_trades=True)
scheduled_for = datetime(2024, 1, 2, 13, 0, tzinfo=timezone.utc)
base_run_id = derive_run_id(paper_config)
stub_scheduler = StaticScheduler(paper_config.experiment_id, scheduled_for)
orchestrator = PaperRunOrchestrator(
    paper_config=paper_config,
    ops_config=ops_config,
    runner_factory=NotebookStubRunner,
    scheduler=stub_scheduler,
)
report = orchestrator.run(now=scheduled_for)
print('run id:', report.run_id)

# Check the report *before* dereferencing it. With the checks at the end of
# the cell, a missing summary or run directory would surface as an unrelated
# TypeError / lookup failure further up and the asserts would never fire.
assert report.summary_json is not None, 'orchestrator report has no summary_json path'
run_dir = Path(report.details['run_dir'])
assert run_dir.exists(), f'run directory missing: {run_dir}'
assert (run_dir / 'metrics.json').exists(), f'metrics.json missing under {run_dir}'

artifacts = discover_run_artifacts(run_dir)
print('artifacts:', artifacts)
print('summary:', report.summary_json)
print('markdown:', report.summary_markdown)

# Hand the run details to later cells (lifecycle, alerts, final summary).
NOTEBOOK_STATE['success_run'] = {
    'run_id': report.run_id,
    'run_dir': run_dir,
    'summary_json': Path(report.summary_json),
    'scheduled_for': scheduled_for.isoformat(),
}

# Content hashes of the artifacts, printed for comparison across runs.
print('steps hash:', hash_jsonl_normalized(artifacts['steps']))
if 'metrics' in artifacts:
    print('metrics hash:', hash_json_normalized(artifacts['metrics']))
if 'execution_metrics' in artifacts:
    print('execution metrics hash:', hash_json_normalized(artifacts['execution_metrics']))


In [None]:
# T28: Lifecycle + summary validation
# Relies on `paper_config` and NOTEBOOK_STATE['success_run'] from the mocked
# success cell, so that cell must have run first.
success_info = NOTEBOOK_STATE['success_run']
# state.json is looked up under <data root>/paper/<experiment>/runs/<run id>/,
# the same layout make_paper_config uses for 'output_root'.
state_path = Path(os.environ['QUANTO_DATA_ROOT']) / 'paper' / paper_config.experiment_id / 'runs' / success_info['run_id'] / 'state.json'
state_payload = json.loads(state_path.read_text(encoding='utf-8'))
transition_states = [entry['state'] for entry in state_payload['transitions']]
print('transitions:', transition_states)
# Only the first three states and the final state are pinned; anything in
# between is left unconstrained.
assert transition_states[:3] == ['SCHEDULED', 'STARTING', 'RUNNING']
assert transition_states[-1] == 'COMPLETED'
summary_payload = json.loads(success_info['summary_json'].read_text(encoding='utf-8'))
print('daily summary keys:', sorted(summary_payload))
# The summary must identify the same run and experiment that produced it.
assert summary_payload['run_id'] == success_info['run_id']
assert summary_payload['experiment_id'] == paper_config.experiment_id


In [None]:
# T28: Forced failure -> hard alert
# The failing run is scheduled 30 minutes after the successful one;
# StaticScheduler embeds the scheduled minute in the run id, so the two runs
# get different id suffixes.
failure_scheduler = StaticScheduler(paper_config.experiment_id, scheduled_for + timedelta(minutes=30))
orchestrator_failure = PaperRunOrchestrator(
    paper_config=paper_config,
    ops_config=build_ops_config(expect_trades=False),
    runner_factory=ExplodingRunner,
    scheduler=failure_scheduler,
)
# ExplodingRunner raises RuntimeError('forced_runner_failure') on construction.
# If the orchestrator lets it propagate, it is printed and swallowed here; if
# the orchestrator handles it internally, nothing is printed.
# NOTE(review): the hard alert is presumably emitted by the orchestrator in
# either case -- the alerts inspection cell checks for it.
try:
    orchestrator_failure.run(now=scheduled_for + timedelta(minutes=30))
except RuntimeError as exc:
    print('expected failure:', exc)
# run_id stays None if the orchestrator never called evaluate() on the stub.
NOTEBOOK_STATE['failure_run_id'] = failure_scheduler.run_id


In [None]:
# T28: Alerts inspection (soft + hard entries must exist)
alerts = collect_alerts()
print('alerts:', alerts)

# Soft alert: must belong to the successful run (which ran with
# expect_trades=True while the stub reported no fills).
soft = [entry for entry in alerts if entry['severity'] == 'soft' and entry.get('run_id') == NOTEBOOK_STATE['success_run']['run_id']]
hard = [entry for entry in alerts if entry['severity'] == 'hard']
assert soft, 'soft alert missing (expect_trades triggered no-trade alert)'

# Hard alert: must belong to the forced-failure run. Require a real run id
# first -- if it were None, any hard alert lacking a 'run_id' key would satisfy
# the comparison (None == None) and the check would pass vacuously.
failure_run_id = NOTEBOOK_STATE.get('failure_run_id')
assert failure_run_id is not None, 'failure run id missing; run the forced-failure cell first'
assert any(entry.get('run_id') == failure_run_id for entry in hard), (
    f'hard alert missing for failure run {failure_run_id}'
)


In [None]:
# T28: Missed run detection via real scheduler + orchestrator
from research.ops.scheduler import PaperRunScheduler

# Daily-at-midnight UTC schedule with a 5 minute grace window. The orchestrator
# is then evaluated at 15:00 UTC on 2024-01-02, well after that day's slot.
missed_config = PaperOpsConfig(cron='0 0 * * *', timezone='UTC', grace_minutes=5)
missed_ops = OpsConfig(paper_trading=missed_config)
# Scheduler state goes to its own 'ops_missed' directory under the temp data
# root, separate from the other cells' scheduler state.
missed_scheduler = PaperRunScheduler(
    paper_config.experiment_id,
    missed_config,
    data_root=Path(os.environ['QUANTO_DATA_ROOT']) / 'ops_missed'
)
# No runner_factory is passed here.
# NOTE(review): presumably no runner is constructed when nothing is due -- confirm.
missed_orchestrator = PaperRunOrchestrator(
    paper_config=paper_config,
    ops_config=missed_ops,
    scheduler=missed_scheduler,
)
missed_report = missed_orchestrator.run(now=datetime(2024, 1, 2, 15, 0, tzinfo=timezone.utc))
print('report status:', missed_report.status)
print('missed runs:', missed_report.details.get('missed_runs'))
# Expected outcome: nothing is executed (IDLE) and at least one missed slot is reported.
assert missed_report.status == 'IDLE'
assert missed_report.details['missed_runs'], 'missed run list should not be empty'


In [None]:
# T28: Recovery supervisor / resume safety via RunExecutor
from research.ops.scheduler import PaperRunScheduler

# A dedicated experiment/run id pair ('exp_recovery' / 'recovery_run') keeps
# this scenario separate from the candidate experiment's runs.
recovery_tracker = RunLifecycleTracker('exp_recovery', 'recovery_run')
recovery_scheduler = PaperRunScheduler(
    'exp_recovery',
    PaperOpsConfig(cron='* * * * *', timezone='UTC', grace_minutes=0),
    data_root=Path(os.environ['QUANTO_DATA_ROOT']) / 'ops_recovery'
)
# Register the run as active with the scheduler before the executor is built.
recovery_scheduler.mark_active('recovery_run', datetime(2024, 1, 2, 13, 0, tzinfo=timezone.utc))
# Mutable call counter shared with flaky_callable (a dict so the function can
# update it without a `global` statement).
attempts = {'count': 0}


def flaky_callable() -> dict[str, Any]:
    """Fail with a retryable broker error twice, then succeed on the third call.

    Each call increments the shared ``attempts['count']``. On success the
    recovery run directory is created under ``QUANTO_DATA_ROOT``.

    Returns:
        A dict with the run directory path (as ``str``) and a zero-PnL metrics
        payload.

    Raises:
        BrokerRetryableError: On the first and second calls.
    """
    attempts['count'] = attempts['count'] + 1
    still_failing = attempts['count'] < 3
    if still_failing:
        raise BrokerRetryableError('alpaca_down')
    data_root = Path(os.environ['QUANTO_DATA_ROOT'])
    recovery_dir = data_root / 'paper' / 'exp_recovery' / 'runs' / 'recovery_run'
    recovery_dir.mkdir(parents=True, exist_ok=True)
    outcome = {'run_dir': str(recovery_dir), 'metrics': {'pnl': 0.0}}
    return outcome

# Drive the flaky callable through RunExecutor. max_attempts=3 matches the
# callable, which succeeds on its third call; the backoff delays are kept to
# hundredths of a second so the cell stays fast.
executor = RunExecutor(
    experiment_id='exp_recovery',
    run_id='recovery_run',
    lifecycle=recovery_tracker,
    scheduler=recovery_scheduler,
    alert_emitter=AlertEmitter(),
    backoff_config=BackoffPolicyConfig(initial_seconds=0.01, max_seconds=0.05, multiplier=2, max_attempts=3),
)
# NOTE: rebinds the notebook-level `result` used by the earlier run_cmd cells.
result = executor.execute(flaky_callable)
print('attempts:', attempts['count'])
print('final state:', recovery_tracker.current_state)
# Exactly three calls (two retryable failures + one success) and a COMPLETED
# lifecycle are expected.
assert attempts['count'] == 3
assert recovery_tracker.current_state == 'COMPLETED'


In [None]:
# Final summary
# Recap where the validation artifacts ended up.
success_info = NOTEBOOK_STATE['success_run']
alerts_location = Path(os.environ['QUANTO_DATA_ROOT']) / 'monitoring' / 'alerts'
for recap_label, recap_value in (
    ('temporary data root:', TMP_DATA_ROOT),
    ('successful run dir:', success_info['run_dir']),
    ('summary artifact:', success_info['summary_json']),
    ('alerts stored under:', alerts_location),
):
    print(recap_label, recap_value)
print('Notebook validation complete.')
