feat: postgres migration#3
Merged
Merged
Conversation
First production-binary code path on the new PG substrate. Builds a local tokio runtime for this single subcommand; main() stays sync until the rest of the call sites migrate. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
36 async methods ported from legacy Store, covering every call site in the codebase: reads (runs/artifacts/pipelines/eval/tracking/events), writes (insert_run, insert_artifact, update_status, link_run_output, copy_run_outputs, backfill_stage_consumers, set_alias, set_tracking, pipeline + eval lifecycle). Add coalesce_claims(cache_key PK, producer_run_id, claimed_at). Replaces the legacy NFS-mkdir leader-election scheme: PG's PRIMARY KEY + INSERT ... ON CONFLICT DO NOTHING RETURNING is genuinely atomic across all clients, whereas mkdir-on-NFS has no atomicity guarantee across NFS clients. The 2 live PG smoke tests still pass. Call-site migration follows in Phase 4 commits. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
src/store.rs goes from 3500 → 615 lines. The in-memory SQLite cache, the filesystem rebuild walker, the per-mutation FS sidecar writes (inputs.json, outputs.json, events/*.jsonl) — all gone. Every Store method now bridges to the async PgStore via block_on_pg, which uses block_in_place + Handle::current() when called from inside a tokio runtime (server/agent) or self.rt.block_on otherwise (CLI sync entry). What stays vs goes on disk: - status.json: kept. Compute→login bridge. - context.json: kept. Compute consumes via $LABCTL_CONTEXT env var. - artifact .meta.json: kept. Lives next to bytes in _objects/. - alias symlinks: kept. add_user_alias creates them. - inputs.json / outputs.json / events/*.jsonl: dropped. - filesystem rebuild walker: dropped. PG is truth, period. Phase 5 deliverable (agent slim): periodic_refresh + cache rebuild deleted from agent.rs and server.rs. Agent owns reconcile, gc, evald, throttle — nothing else. Phase 6 deliverable (UI repoint): server.rs's AppState still holds the same Store API, but Store is now PG-backed. UI reads pass through unchanged; PgPool's internal sync handles concurrent requests, the outer Mutex is now redundant but kept for signature compat. runner.rs cleanup: drop the context.json filesystem-fallback in resolve_from. The 'no registry row but on-disk context.json exists' case can no longer happen with PG as truth. pg_store.rs gains: rehydrate_inputs_by_path, add_user_alias, artifacts_by_kind, artifacts_by_kind_for_producer_user, recipe_history. Plus coalesce slot semantics now use a coalesce_claims table with INSERT...ON CONFLICT DO NOTHING — genuinely atomic across all PG clients, replacing the legacy NFS-mkdir scheme. All 21 unit tests + 2 live PG smoke tests pass. `labctl status` end-to-end against live PG returns 950 rows (matches importer count). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Side-effect of the PG-as-truth Store rewrite: the per-run inputs.json / outputs.json / tracking.json / pipeline.json sidecars, their typed counterparts (RunSidecar, InputSidecar, OutputLink, TrackingSidecar, EvalRequestSidecar, PipelineSidecar, EventLine), the path computers that built their paths, plus the legacy read_json / claim_dir / append_event primitives — none of these are referenced anymore. Build goes from 30 warnings to 0. Kept: STATUS_JSON, CONTEXT_JSON (compute → login bridge — compute nodes have no PG access), LAB_DIRNAME, SUBMIT_SH, ARTIFACT_META, ALIAS_TARGET, ArtifactSidecar, AliasTargetSidecar, CoalesceClaimSidecar, atomic_write_json, ClaimOutcome (still used by the PG-backed coalesce slot's outward API), content_addressed_dir, alias_symlink_path, create_alias_symlink, the shared-group mode bits, plus the trimmed test set. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Symptom: UI loaded forever, /api/runs timed out, /api/stream couldn't establish connection. Process had 130 threads in futex_wait_queue. Root cause: AppState held Arc<Mutex<Store>>. Each handler did state.store.lock().unwrap() and called a sync Store method, which internally bridges to async PgStore via block_on_pg — block_in_place + Handle::current().block_on(...). The std::sync::Mutex guard is held across the block_on (across an await point under the hood), and tokio cannot move that future to another worker. With many concurrent handlers + the events_tailer polling every 500ms, the worker pool serialized on the Mutex and the runtime livelocked. Fix: drop the Mutex. PgPool is already internally synchronized and Send + Sync; PgStore over it is too; the Store sync-facade adds only a Runtime (Send + Sync) and a couple of plain paths/maps. All Store methods now take &self (legacy &mut self was rusqlite-mutability residue, never load-bearing on the PG facade). server.rs and agent.rs hold Arc<Store> directly, no lock. runner / evald / artifacts / tracking signatures move from &mut Store to &Store accordingly. Verified: /api/runs responds instantly, /api/stream handshakes 200 text/event-stream. 19 unit tests + 2 live PG smoke tests still pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The cross-cluster import path (Command::ImportFromCluster +
artifacts::import_from_cluster + remote::{read_json, rsync_dir,
probe_reachability} + RemoteConfig + AliasTargetSidecar +
fs_layout::{aliases_root, alias_dir, alias_target, ALIAS_TARGET}) was
unused. Removed end-to-end. Foreign-cluster migration, when needed,
is now: pg_dump + rsync of content-addressed _objects/ trees — no
SSH read-meta-then-rsync dance required.
Store::open no longer pre-creates aliases/, eval_state/, events/,
pipelines/, coalesce_claims/ under runs_base. None of those are written
to since the PG cutover; pre-creating them just produced empty dirs the
cleanup pass kept reaping. The corresponding fs_layout constants
(ALIASES_DIR, EVAL_STATE_DIR, PIPELINES_DIR, EVENTS_DIR,
COALESCE_CLAIMS_DIR) and the run_lab_dir / event_log helpers are gone.
runs/<user>/ dirs are still created lazily at submission time.
validate_user now only rejects "runs" (the one remaining reserved
top-level under runs_base); the other historical reservations are
moot once the dirs themselves don't exist.
-485 lines net. Build + 19 unit tests + 2 live PG smokes all green;
UI restarted on hai-login2 and serving.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The bash wrapper around the user's command was writing status.json with
{state, job_id, rc, updated_at} via tmp+rename, and login-side code
fell back to reading it when sacct didn't yet have a record. sacct is
a strict superset:
- State (RUNNING/COMPLETED/FAILED/CANCELLED/TIMEOUT/OOM/NODE_FAIL/...)
- exit code, signal, Start, End — all on the slurmctld side
- reported regardless of whether the bash wrapper's exit trap fires
(status.json silently misses NODE_FAIL, hard SIGKILL, preemption)
Cuts:
- fs_layout::STATUS_JSON constant.
- runner::status_file_outcome + StatusFile struct.
- The two `.or_else(status_file_outcome(...))` fallbacks in
reconcile_one and repair_finish_times.
- The write_status bash function + entry/exit calls in render_script
and render_follower_script.
bash wrapper is now: `set -uo pipefail` + module loads + env exports +
the user's command. No status sidecar. Exits with the user command's
rc; sacct picks it up.
19 unit tests + 2 live PG smokes pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Three additions to support agent-driven pipeline submission (no behavior change in this commit — submit_pipeline still uses the old path): - insert_run becomes an UPSERT (ON CONFLICT DO UPDATE) on the runs PK + cleanup-then-rebuild of run_inputs. Lets the agent later "complete" a pending placeholder by re-calling insert_run on the same id. - insert_pending_pipeline_stage(run_id, recipe, run_dir, source_path, submitted_by, pipeline_id, stage_name, dependency_on): inserts the minimal placeholder for a dependent stage. status='created', no job_id, no cache_key, empty inputs. Recipe is serialised so the agent can deserialise it later, render submit.sh, sbatch. - pending_children_of(parent_run_id): returns RunRows with status 'created' and job_id NULL whose dependency_on->'afterok' references parent_run_id. Uses PG's @> JSONB containment. The agent uses this to find downstream stages to advance when a parent reaches terminal- succeeded. 19 tests + 2 live PG smokes still pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Pipeline stages with intra-pipeline parents are no longer submitted at `labctl run-pipeline` time. Instead they land in PG as `status='created', job_id=NULL` placeholders via insert_pending_pipeline_stage. The agent's reconcile loop completes their submission once every parent reaches terminal-success. What changes: submit_pipeline: Topologically iterate stages. Roots (no intra-pipeline parents) submit immediately. A stage whose parents are not yet "materialised" — i.e. cache-hit in this same call, or pinned via `from` — becomes a pending placeholder. Cache-hit chains still propagate synchronously inside one call. SubmittedStage.job_id is "" for pending stages. reconcile_one: After a run's status flips to a terminal value, calls try_submit_pending_children(parent). Succeeded / cache_hit parents trigger normal submission of children whose full parent set is now satisfied. Non-success terminals cascade-fail every pending child blocked on this parent. complete_pending_submission: Deserialises the placeholder's recipe_json, rebuilds the StageContext from PG sibling rows, and calls submit_recipe_inner with the existing run_id. The earlier insert_run UPSERT replaces the placeholder atomically — recipe_json, status, cache_key, context_json, run_inputs all become the resolved values. render_script: No more `#SBATCH --dependency=afterok:...` for pipeline stages. By the time a stage's script renders, every parent is terminal-succeeded on PG and on disk. The coalesce-follower trampoline still uses afterok via render_follower_script for its peer job. Net effect: no resolved_paths.env, no resolved_outputs.json, no late-path bridge of any kind between compute and login. submit.sh is self-contained with concrete `_objects/<hash>/` paths. sacct is sole state source. The only labctl-owned NFS files per run are submit.sh, context.json (recipe context for compute), and the SLURM stdout log. Tests: 19 unit + 2 live PG smoke pass. UI restarted, /api/runs serving. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Two changes:
1. Producer-side output hashing. New `labctl hash-outputs --run-dir`
subcommand reads <run_dir>/.lab/context.json, walks each declared
output path on the compute node, computes util::dir_content_hash
(same function login uses), and atomically writes <run_dir>/.lab/
output_hashes.json as {role: hash}. Streaming outputs (checkpoint_
stream) are skipped — those are decomposed into per-step artifacts
by register_outputs and hashed by path identity, not content.
No PG access needed; compute reads context.json directly.
render_script appends the call after the user command exits with
rc=0:
<labctl_bin> hash-outputs --run-dir <run_dir> || true
Best-effort: any failure leaves the manifest absent and login-side
register_outputs falls back to its existing walk-and-hash. The
user's rc is preserved in either case.
register_outputs now reads the manifest first and only falls back
on cache miss. Hash function is unchanged, so a manifest entry is
bit-for-bit interchangeable with a fresh walk.
Win: dir_content_hash on multi-TB checkpoint dirs was the dominant
register_outputs cost. Producer-side runs on compute node with
bytes hot in page cache; login agent reads from cold NFS. The
manifest hop trades NFS read amplification for one cheap producer-
side walk.
2. Restore conditional afterok in render_script. The agent-driven
commit dropped afterok entirely with `let _ = parent_job_ids`, but
sweep-array → sweep-aggregate dependency legitimately uses the
parameter: when a sweep declares an aggregate stage, it submits
with `afterok:<array_jobid>` so SLURM gates it on every array
element succeeding. Restore the `if !parent_job_ids.is_empty()`
guard — pipeline-stage callers always pass &[], sweep-aggregate
passes a non-empty list.
19 tests + 2 live PG smokes pass. UI restarted, /api/runs serving.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.