mcp-data-platform-v1.62.0
Postgres-backed embedding job queue
Replaces the synchronous re-embed admin endpoint shipped in v1.61.11 (#420) with a Postgres-backed job queue. Embedding work moves off the request path: spec writes enqueue jobs atomically, workers across every pod race for them via SELECT FOR UPDATE SKIP LOCKED, and a reconciler fills any gap on a 5-minute tick so embeddings converge without operator action.
Closes the design gap surfaced by a recent failure where clicking "Re-embed" on three specs produced one completion and two silent failures. The synchronous-in-HTTP-handler model had three failure modes the queue removes:
- Slow embed providers (Ollama at roughly 200ms per operation, so a 300-operation spec takes about 60s) timed out at the ingress.
- Pod restart mid-embed lost all progress with no checkpoint.
- Parallel re-embed requests contended on the provider's per-host concurrency and surfaced as random failures.
PR #421.
Schema (migration 000045)
CREATE TABLE api_catalog_embedding_jobs (
    id BIGSERIAL PRIMARY KEY,
    catalog_id TEXT NOT NULL,
    spec_name TEXT NOT NULL,
    kind TEXT NOT NULL, -- spec_write | reconciler | manual_retry
    status TEXT NOT NULL DEFAULT 'pending', -- pending|running|succeeded|failed
    attempts INT NOT NULL DEFAULT 0,
    last_error TEXT NOT NULL DEFAULT '',
    next_run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    worker_id TEXT NOT NULL DEFAULT '',
    lease_expires_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    FOREIGN KEY (catalog_id, spec_name)
        REFERENCES api_catalog_specs(catalog_id, spec_name) ON DELETE CASCADE
);
CREATE UNIQUE INDEX api_catalog_embedding_jobs_open
    ON api_catalog_embedding_jobs (catalog_id, spec_name)
    WHERE status IN ('pending', 'running');
ALTER TABLE api_catalog_specs ADD COLUMN operation_count INT NOT NULL DEFAULT 0;

The partial unique index makes producer enqueues idempotent: a spec edited twice in quick succession collapses to one open job. operation_count lets the reconciler detect gaps in pure SQL.
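The dedup behaviour of the partial unique index can be modelled without a database. The sketch below is a hypothetical in-memory stand-in, not the `PostgresStore` code: `queue`, `specKey`, `Enqueue`, and `Finish` are illustrative names, and the map plays the role of the index over rows whose status is `pending` or `running`.

```go
package main

import "fmt"

// specKey mirrors the (catalog_id, spec_name) pair covered by the partial unique index.
type specKey struct {
	CatalogID string
	SpecName  string
}

// queue is a hypothetical in-memory stand-in for api_catalog_embedding_jobs.
// open holds the keys of jobs that are 'pending' or 'running', which is the
// set of rows the partial index constrains.
type queue struct {
	open     map[specKey]bool
	enqueued int // total rows that would have been inserted
}

func newQueue() *queue { return &queue{open: map[specKey]bool{}} }

// Enqueue returns true when a new job row is inserted and false when an open
// job already exists for the spec (the ON CONFLICT DO NOTHING path).
func (q *queue) Enqueue(k specKey) bool {
	if q.open[k] {
		return false
	}
	q.open[k] = true
	q.enqueued++
	return true
}

// Finish models a job reaching 'succeeded' or 'failed': the row leaves the
// partial index, so a later write to the same spec can enqueue again.
func (q *queue) Finish(k specKey) { delete(q.open, k) }

func main() {
	q := newQueue()
	k := specKey{CatalogID: "petstore", SpecName: "v1"}
	fmt.Println(q.Enqueue(k)) // true: first edit inserts a job
	fmt.Println(q.Enqueue(k)) // false: second edit collapses into the open job
	q.Finish(k)
	fmt.Println(q.Enqueue(k)) // true: no open job remains
}
```

Because finished rows fall outside the index predicate, job history accumulates freely while at most one open job per spec exists at any time.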
Runtime
New pkg/toolkits/apigateway/embedjobs package with five components:
- `PostgresStore` wraps every queue operation. `Enqueue` issues `pg_notify`; `Claim` uses `SELECT FOR UPDATE SKIP LOCKED`; `Complete`, `Retry`, and `Fail` enforce a lease-ownership predicate so a crashed worker cannot stomp on a re-leased job; `ReleaseExpiredLeases` and `ReconcileGaps` are one-statement SQL; `SpecStatuses`, `Health`, and `SetOperationCount` back the read side.
- `Worker` claims one job at a time, calls `apigateway.ComputeOperationEmbeddings`, and persists vectors. Outcome routes through `Complete`, `Retry` (exponential backoff 5s, 10s, 20s, 40s, 80s), or `Fail` after 5 attempts.
- `Reaper` runs every 30s and flips `status='running' AND lease_expires_at <= NOW()` rows back to pending. Recovers from any pod crash.
- `Reconciler` runs on every pod boot AND every 5 minutes. Compares `s.operation_count <> COALESCE(e.embedded, 0)` and `ON CONFLICT DO NOTHING` enqueues a job for every gap.
- `Listener` uses `pq.Listener` on the `api_catalog_embedding_jobs` channel and degrades to poll-only if the role lacks `LISTEN` privilege.
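The worker's outcome routing can be sketched as a pure function. This is an illustration under stated assumptions, not the shipped code: `backoffFor`, `route`, and the `outcome` constants are hypothetical names, and mapping attempt n to a delay of 5s × 2^(n-1) is one reading of the schedule. The notes do not say whether the fifth failed attempt fails the job or schedules the 80s delay; this sketch fails it, so the 80s step is computed by `backoffFor` but never scheduled by `route`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	baseDelay   = 5 * time.Second // first retry delay from the release notes
	maxAttempts = 5               // assumed: the fifth failed attempt is terminal
)

// errProvider is a sample embed failure used by the demo below.
var errProvider = errors.New("provider timeout")

type outcome string

const (
	outcomeComplete outcome = "complete"
	outcomeRetry    outcome = "retry"
	outcomeFail     outcome = "fail"
)

// backoffFor returns the delay after the given 1-based attempt:
// 5s, 10s, 20s, 40s, 80s for attempts 1 through 5.
func backoffFor(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	return baseDelay << (attempt - 1)
}

// route decides which store call a finished attempt maps to and, for a
// retry, how far in the future next_run_at should be set.
func route(attempt int, err error) (outcome, time.Duration) {
	switch {
	case err == nil:
		return outcomeComplete, 0
	case attempt >= maxAttempts:
		return outcomeFail, 0
	default:
		return outcomeRetry, backoffFor(attempt)
	}
}

func main() {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		o, delay := route(attempt, errProvider)
		fmt.Println(attempt, o, delay)
	}
}
```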
Platform wiring in WireAPIGatewayEmbedJobsFromDB spawns the four background components (Worker, Reaper, Reconciler, Listener) via the existing lifecycle callbacks. Activates only when a database AND an embedding provider are configured.
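The interplay between the Reaper and the lease-ownership predicate on `Complete`, `Retry`, and `Fail` is easier to follow as a small state model. The sketch below is a hypothetical in-memory version: `job`, `claim`, `reap`, `complete`, and `runScenario` are illustrative names, and the one-minute lease is an assumption, since the notes do not state the lease duration.

```go
package main

import (
	"fmt"
	"time"
)

// job carries the three columns the lease logic depends on.
type job struct {
	Status         string // pending | running | succeeded | failed
	WorkerID       string
	LeaseExpiresAt time.Time
}

// claim leases a pending job to workerID until now+lease.
func claim(j *job, workerID string, now time.Time, lease time.Duration) bool {
	if j.Status != "pending" {
		return false
	}
	j.Status, j.WorkerID, j.LeaseExpiresAt = "running", workerID, now.Add(lease)
	return true
}

// reap mirrors the Reaper condition: status='running' AND lease_expires_at <= NOW().
func reap(j *job, now time.Time) bool {
	if j.Status == "running" && !j.LeaseExpiresAt.After(now) {
		j.Status, j.WorkerID = "pending", ""
		return true
	}
	return false
}

// complete applies the lease-ownership predicate: only the worker that
// currently holds the lease may mark the job succeeded.
func complete(j *job, workerID string) bool {
	if j.Status != "running" || j.WorkerID != workerID {
		return false
	}
	j.Status = "succeeded"
	return true
}

// runScenario replays a crash: pod-a claims and stalls, the lease expires,
// pod-b re-claims, then both try to complete.
func runScenario() (staleAccepted, ownerAccepted bool) {
	start := time.Unix(0, 0)
	j := &job{Status: "pending"}
	claim(j, "pod-a", start, time.Minute)
	later := start.Add(2 * time.Minute)
	reap(j, later)
	claim(j, "pod-b", later, time.Minute)
	staleAccepted = complete(j, "pod-a")
	ownerAccepted = complete(j, "pod-b")
	return staleAccepted, ownerAccepted
}

func main() {
	stale, owner := runScenario()
	fmt.Println("stale worker accepted:", stale)
	fmt.Println("current owner accepted:", owner)
}
```

In SQL terms the same guard is a `WHERE` clause on the update that matches both the job id and the caller's worker id, so a stale worker's write affects zero rows.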
Admin
Spec writes (upsert, upload, refresh, clone) now:
- Stamp `entry.OperationCount = apicatalog.CountOperations(content)`.
- Call `UpsertSpec`.
- Call `h.enqueueEmbedJob(...)` fire-and-forget; failure logs only.
The synchronous /reembed endpoint that v1.61.11 shipped is removed. Its URL is reused as the manual-retry entry point: POST /api/v1/admin/api-catalogs/{id}/specs/{spec}/reembed now enqueues a manual_retry job and returns 202 Accepted. The worker treats manual_retry specially by skipping ListExisting, so the operator's "model swapped externally" escape hatch actually replaces stale vectors.
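Why skipping the existing-vector lookup matters for `manual_retry` can be shown with a toy dedup model. This is a sketch under assumptions: `compute` is a hypothetical stand-in for the real embedding call (its actual signature is not shown in these notes), `existingFor` and `vecKey` are illustrative names, and the dimension and model string are placeholders.

```go
package main

import "fmt"

const (
	KindSpecWrite   = "spec_write"
	KindManualRetry = "manual_retry"
)

// vecKey is the dedup key: (text_hash, dim, model).
type vecKey struct {
	TextHash string
	Dim      int
	Model    string
}

// compute returns how many provider calls are needed: one per operation
// whose key is not already present in existing. Hypothetical stand-in.
func compute(ops []vecKey, existing map[vecKey]bool) int {
	calls := 0
	for _, k := range ops {
		if existing[k] { // reading a nil map is safe and yields false
			continue
		}
		calls++
	}
	return calls
}

// existingFor picks the dedup set the worker passes to compute. For a
// manual retry it returns nil so every operation is re-embedded.
func existingFor(kind string, stored map[vecKey]bool) map[vecKey]bool {
	if kind == KindManualRetry {
		return nil
	}
	return stored
}

func main() {
	ops := []vecKey{
		{TextHash: "a1", Dim: 768, Model: "model-x"},
		{TextHash: "b2", Dim: 768, Model: "model-x"},
	}
	// Both operations already have vectors under the same model name.
	stored := map[vecKey]bool{ops[0]: true, ops[1]: true}

	fmt.Println(compute(ops, existingFor(KindSpecWrite, stored)))   // 0
	fmt.Println(compute(ops, existingFor(KindManualRetry, stored))) // 2
}
```

When a model is swapped behind an unchanged name, the key still matches, so only the nil path produces fresh vectors.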
New read-only endpoints back the portal UI:
- `GET .../embedding-status` returns per-spec rows for the badges.
- `GET .../embedding-health` returns a catalog-level rollup (`{specs_total, specs_indexed, specs_pending, specs_running, specs_failed}`).
- `GET .../embedding-jobs` returns recent job history, filterable by status and spec_name.
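As a rough illustration of the health rollup shape, the sketch below aggregates per-spec states into the five fields listed above. The JSON field names come from these notes; the `Health` type name, the `rollup` function, and the per-spec state strings are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Health uses the field names listed for GET .../embedding-health.
type Health struct {
	SpecsTotal   int `json:"specs_total"`
	SpecsIndexed int `json:"specs_indexed"`
	SpecsPending int `json:"specs_pending"`
	SpecsRunning int `json:"specs_running"`
	SpecsFailed  int `json:"specs_failed"`
}

// rollup counts per-spec states. The state strings are hypothetical;
// unknown states only contribute to the total.
func rollup(states []string) Health {
	h := Health{SpecsTotal: len(states)}
	for _, s := range states {
		switch s {
		case "indexed":
			h.SpecsIndexed++
		case "pending":
			h.SpecsPending++
		case "running":
			h.SpecsRunning++
		case "failed":
			h.SpecsFailed++
		}
	}
	return h
}

func main() {
	h := rollup([]string{"indexed", "indexed", "pending", "failed"})
	b, err := json.Marshal(h)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"specs_total":4,"specs_indexed":2,"specs_pending":1,"specs_running":0,"specs_failed":1}
}
```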
Portal UI
The CatalogsPanel polls embedding state every 5 seconds and renders one of five badge states per spec:
| Badge | Meaning |
|---|---|
| N/M indexed (green) | Fully embedded; semantic ranking active. |
| indexing N/M (blue) | Worker is processing this spec right now. |
| queued (amber) | Job is in the queue waiting for a worker. |
| failed (red, with last-error tooltip) | Retries exhausted; Retry button appears. |
| empty (gray) | Spec has zero operations. |
The catalog header shows a one-line rollup: All N specs indexed (green) or K/M indexed (running/queued/failed) (amber/red). The operator no longer needs a button for the happy path.
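The badge choice can be expressed as a single decision function. The sketch below is written in Go for consistency with the backend examples rather than as portal code, and it rests on assumptions: the `specStatus` fields are hypothetical, and the precedence (empty, then running, then pending, then failed, then indexed) is a guess at how overlapping conditions resolve.

```go
package main

import "fmt"

// specStatus is a hypothetical per-spec row; the real embedding-status
// payload is not shown in these notes.
type specStatus struct {
	OperationCount int    // M: operations in the spec
	Embedded       int    // N: operations with stored vectors
	JobStatus      string // "", "pending", "running", or "failed"
}

// badge returns the label for one spec. A spec with no open or failed job
// falls through to "N/M indexed"; if N < M there, the reconciler is
// expected to enqueue a job on its next tick.
func badge(s specStatus) string {
	switch {
	case s.OperationCount == 0:
		return "empty"
	case s.JobStatus == "running":
		return fmt.Sprintf("indexing %d/%d", s.Embedded, s.OperationCount)
	case s.JobStatus == "pending":
		return "queued"
	case s.JobStatus == "failed":
		return "failed"
	default:
		return fmt.Sprintf("%d/%d indexed", s.Embedded, s.OperationCount)
	}
}

func main() {
	rows := []specStatus{
		{OperationCount: 12, Embedded: 12},
		{OperationCount: 300, Embedded: 40, JobStatus: "running"},
		{OperationCount: 8, JobStatus: "pending"},
		{OperationCount: 8, JobStatus: "failed"},
		{},
	}
	for _, r := range rows {
		fmt.Println(badge(r))
	}
}
```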
Bugs caught by the pre-commit adversarial review
Three substantive bugs were caught and fixed before commit:
- `ON CONFLICT ON CONSTRAINT api_catalog_embedding_jobs_open` would have failed at runtime because `CREATE UNIQUE INDEX` writes to `pg_index` but not `pg_constraint`. Fixed by switching to index-inference syntax (`ON CONFLICT (catalog_id, spec_name) WHERE status IN ('pending','running')`) in both `Enqueue` and `ReconcileGaps`.
- `manual_retry` was silently a no-op because the worker passed `existing` to `Compute` regardless of job kind. With a model swapped externally (same name, different behavior), every operation hit the `(text_hash, dim, model)` dedup and zero embed calls were made. Fixed by setting `existing=nil` when `job.Kind == KindManualRetry`. Regression test pins this.
- The reconciler re-enqueued legacy specs indefinitely because `operation_count` defaulted to `0` and the worker never stamped it, so a spec with prior vectors looked like a permanent gap. Fixed by having the worker call `Persister.StampOperationCount(catalogID, specName, len(rows))` after each successful `Upsert`. Regression test pins this.
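The third bug is a convergence failure, which a tiny simulation makes concrete. This is a model under assumptions, not the regression test itself: `spec`, `hasGap`, `runJob`, and `passesUntilConverged` are hypothetical names, and the `stamp` flag stands in for whether the worker calls `StampOperationCount` after a successful upsert.

```go
package main

import "fmt"

// spec holds the two numbers the reconciler compares.
type spec struct {
	OperationCount int // api_catalog_specs.operation_count
	Embedded       int // stored vectors for the spec
}

// hasGap mirrors s.operation_count <> COALESCE(e.embedded, 0).
func hasGap(s spec) bool { return s.OperationCount != s.Embedded }

// runJob models one successful worker pass that persists rows vectors.
// With stamp set, it also records rows as the spec's operation_count.
func runJob(s *spec, rows int, stamp bool) {
	s.Embedded = rows
	if stamp {
		s.OperationCount = rows
	}
}

// passesUntilConverged counts how many reconciler ticks enqueue a job
// before the gap closes, giving up at limit.
func passesUntilConverged(s spec, rows int, stamp bool, limit int) int {
	n := 0
	for hasGap(s) && n < limit {
		runJob(&s, rows, stamp)
		n++
	}
	return n
}

func main() {
	// A legacy spec: vectors exist, but operation_count still has its default of 0.
	legacy := spec{OperationCount: 0, Embedded: 12}

	fmt.Println(passesUntilConverged(legacy, 12, false, 10)) // 10: never converges, hits the limit
	fmt.Println(passesUntilConverged(legacy, 12, true, 10))  // 1: converges after one pass
}
```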
Upgrade notes
- Migration 000045 runs automatically. Adds the queue table and the `operation_count` column. No data backfill needed; the worker stamps the column on first reconciler pickup.
- Existing v1.61.11 catalogs work without operator action. On pod boot, the reconciler scans every spec and enqueues a `reconciler` job for any spec whose `operation_count <> embedding_count`. Within one worker pass the column converges and the row stops re-enqueueing.
- Multi-replica deployments benefit immediately. Two replicas running the same claim query each get different rows back via `SKIP LOCKED`, with no blocking and no duplicated work. The unique partial index handles producer-side deduplication.
- Operator-facing API changes:
  - `POST .../specs/{spec}/reembed` now returns `202 Accepted` (was blocking until embed completed). Clients that ignored the response body still work; clients that read `embedded_count` from the response should switch to `GET .../embedding-status`.
  - New GET endpoints for status, health, and job history (see Admin section).
- No embedding provider configured? Spec writes still succeed; nothing is enqueued. Ranking falls back to lexical with `errEmbeddingsNotIndexed`. Wire `embedding.ollama.url` and the reconciler will populate vectors on its next tick.
Installation
Homebrew (macOS)
brew install txn2/tap/mcp-data-platform
Claude Code CLI
claude mcp add mcp-data-platform -- mcp-data-platform
Docker
docker pull ghcr.io/txn2/mcp-data-platform:v1.62.0
Verification
All release artifacts are signed with Cosign. Verify with:
cosign verify-blob --bundle mcp-data-platform_1.62.0_linux_amd64.tar.gz.sigstore.json \
mcp-data-platform_1.62.0_linux_amd64.tar.gz