From 16ebf700d68817066365290d1e66d4c0db73eb71 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Fri, 20 Mar 2026 15:06:24 -0700 Subject: [PATCH] v0.3.0: Fix runtime protocol, add production features and API completeness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 (P0) — Fix StreamSession integration: - Add RuntimeSessionHandle and openSession() for unified bidirectional gRPC streams - Rewrite RunExecutor to create session + send kickoff through single stream - Update StreamConsumer to accept session handle, fall back to streamSession() on reconnect - Update MockRuntimeProvider with openSession() implementation Phase 2 (P0) — Fix Signal & ContextUpdate: - Send signals with empty session_id/mode per runtime validation rules - Remove POST /runs/:id/context (runtime doesn't support ContextUpdate) - Add Ack error checking in send() and startSession() Phase 3 (P1) — Production correctness: - Fix StreamConsumer.isHealthy() to track per-stream connected state - Set schemaVersion on control-plane-emitted events - Add .unref() to StreamHub cleanup timer - Add RuntimeCapabilities storage in provider registry - Add expires_at column to runtime_sessions (migration 0004) Phase 4 (P2) — API completeness: - Add pagination metadata (data/total/limit/offset) to GET /runs and GET /audit - Add DELETE /runs/:id (terminal only), POST /runs/:id/archive - Add GET /audit with filtering - Add POST /admin/circuit-breaker/reset - Add POST /runs/:id/clone Phase 5 (P2-P3) — Observability & notifications: - Add webhook system with HMAC-SHA256 signing, retry, and management endpoints - Fire webhooks on run.started/completed/failed/cancelled - Add batch cancel/export endpoints - Add JSONL export format (?format=jsonl) - Add Prometheus circuit breaker metrics (failures_total, success_total) Phase 6 (P3) — Tech debt: - Add npm run proto:sync script - Fix InlineArtifactStorageProvider.retrieve() to read from DB - Improve normalizer deriveSubject() to 
extract entity IDs from decoded payloads - Update normalizer schemaVersion to use PROJECTION_SCHEMA_VERSION Migrations: 0004_session_ttl.sql, 0005_webhooks.sql --- drizzle/0004_session_ttl.sql | 2 + drizzle/0005_webhooks.sql | 12 + package.json | 5 +- scripts/proto-sync.sh | 21 ++ src/app.module.ts | 11 +- .../inline-artifact-storage.provider.ts | 18 +- src/audit/audit.service.ts | 38 +++ src/contracts/runtime.ts | 36 +++ src/controllers/admin.controller.ts | 17 ++ src/controllers/audit.controller.ts | 27 ++ .../run-insights.controller.spec.ts | 60 ++++- src/controllers/run-insights.controller.ts | 39 ++- src/controllers/runs.controller.spec.ts | 25 +- src/controllers/runs.controller.ts | 38 ++- src/controllers/webhook.controller.ts | 46 ++++ src/db/schema.ts | 17 ++ src/dto/clone-run.dto.ts | 15 ++ src/dto/export-run-query.dto.ts | 7 +- src/dto/list-audit-query.dto.ts | 50 ++++ src/dto/list-runs-query.dto.ts | 5 + src/dto/paginated-response.dto.ts | 15 ++ src/dto/webhook.dto.ts | 18 ++ src/events/event-normalizer.service.spec.ts | 2 +- src/events/event-normalizer.service.ts | 31 ++- src/events/run-event.service.ts | 5 +- src/events/stream-hub.service.ts | 1 + src/insights/run-insights.service.ts | 28 +++ src/runs/run-executor.service.ts | 152 ++++++------ src/runs/run-manager.service.spec.ts | 14 ++ src/runs/run-manager.service.ts | 76 +++++- src/runs/stream-consumer.service.spec.ts | 49 +++- src/runs/stream-consumer.service.ts | 34 ++- src/runtime/circuit-breaker.ts | 9 + src/runtime/mock-runtime.provider.ts | 44 ++++ src/runtime/runtime-provider.registry.ts | 11 +- src/runtime/rust-runtime.provider.ts | 232 +++++++++++++++++- src/storage/artifact.repository.ts | 9 + src/storage/run.repository.ts | 45 ++++ src/storage/runtime-session.repository.ts | 1 + src/telemetry/instrumentation.service.ts | 10 + src/webhooks/webhook.repository.ts | 41 ++++ src/webhooks/webhook.service.ts | 73 ++++++ 42 files changed, 1249 insertions(+), 140 deletions(-) create mode 100644 
drizzle/0004_session_ttl.sql create mode 100644 drizzle/0005_webhooks.sql create mode 100755 scripts/proto-sync.sh create mode 100644 src/controllers/admin.controller.ts create mode 100644 src/controllers/audit.controller.ts create mode 100644 src/controllers/webhook.controller.ts create mode 100644 src/dto/clone-run.dto.ts create mode 100644 src/dto/list-audit-query.dto.ts create mode 100644 src/dto/paginated-response.dto.ts create mode 100644 src/dto/webhook.dto.ts create mode 100644 src/webhooks/webhook.repository.ts create mode 100644 src/webhooks/webhook.service.ts diff --git a/drizzle/0004_session_ttl.sql b/drizzle/0004_session_ttl.sql new file mode 100644 index 0000000..cb977e6 --- /dev/null +++ b/drizzle/0004_session_ttl.sql @@ -0,0 +1,2 @@ +-- Add expires_at column to runtime_sessions for TTL tracking +ALTER TABLE "runtime_sessions" ADD COLUMN "expires_at" TIMESTAMP WITH TIME ZONE; diff --git a/drizzle/0005_webhooks.sql b/drizzle/0005_webhooks.sql new file mode 100644 index 0000000..6ec002a --- /dev/null +++ b/drizzle/0005_webhooks.sql @@ -0,0 +1,12 @@ +-- Create webhooks table for webhook notification subscriptions +CREATE TABLE IF NOT EXISTS "webhooks" ( + "id" UUID PRIMARY KEY, + "url" TEXT NOT NULL, + "events" JSONB NOT NULL DEFAULT '[]', + "secret" VARCHAR(255) NOT NULL, + "active" INTEGER NOT NULL DEFAULT 1, + "created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS "webhooks_active_idx" ON "webhooks" ("active"); diff --git a/package.json b/package.json index 3078815..997b7cc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "macp-control-plane", - "version": "0.2.0", + "version": "0.3.0", "private": true, "description": "Scenario-agnostic control plane for the MACP runtime, built with NestJS.", "license": "MIT", @@ -17,7 +17,8 @@ "test:cov": "jest --coverage", "drizzle:generate": "drizzle-kit generate", "drizzle:migrate": 
"drizzle-kit migrate", - "drizzle:studio": "drizzle-kit studio" + "drizzle:studio": "drizzle-kit studio", + "proto:sync": "bash scripts/proto-sync.sh" }, "dependencies": { "@grpc/grpc-js": "^1.14.0", diff --git a/scripts/proto-sync.sh b/scripts/proto-sync.sh new file mode 100755 index 0000000..2de6544 --- /dev/null +++ b/scripts/proto-sync.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# Sync proto files from the runtime's authoritative protos to the control plane. +# Usage: npm run proto:sync +# +# Assumes the runtime repo is at ../runtime relative to the control plane root. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CP_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +RUNTIME_ROOT="${RUNTIME_PROTO_DIR:-$CP_ROOT/../runtime}" + +if [ ! -d "$RUNTIME_ROOT/proto" ]; then + echo "ERROR: Runtime proto directory not found at $RUNTIME_ROOT/proto" >&2 + echo "Set RUNTIME_PROTO_DIR to the runtime repo root, or place it at ../runtime" >&2 + exit 1 +fi + +echo "Syncing protos from $RUNTIME_ROOT/proto → $CP_ROOT/proto" +rsync -av --delete "$RUNTIME_ROOT/proto/" "$CP_ROOT/proto/" +echo "Proto sync complete."
diff --git a/src/app.module.ts b/src/app.module.ts index fd5ec59..bb14026 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -5,6 +5,8 @@ import { AuthGuard } from './auth/auth.guard'; import { AuthModule } from './auth/auth.module'; import { ThrottleByUserGuard } from './auth/throttle-by-user.guard'; import { ConfigModule } from './config/config.module'; +import { AdminController } from './controllers/admin.controller'; +import { AuditController } from './controllers/audit.controller'; import { HealthController } from './controllers/health.controller'; import { MetricsController } from './controllers/metrics.controller'; import { ObservabilityController } from './controllers/observability.controller'; @@ -39,6 +41,9 @@ import { RunExecutorService } from './runs/run-executor.service'; import { RunManagerService } from './runs/run-manager.service'; import { RunRecoveryService } from './runs/run-recovery.service'; import { StreamConsumerService } from './runs/stream-consumer.service'; +import { WebhookController } from './controllers/webhook.controller'; +import { WebhookRepository } from './webhooks/webhook.repository'; +import { WebhookService } from './webhooks/webhook.service'; @Module({ imports: [ @@ -47,7 +52,7 @@ import { StreamConsumerService } from './runs/stream-consumer.service'; AuthModule, ThrottlerModule.forRoot([{ ttl: 60000, limit: 100 }]) ], - controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController], + controllers: [RunsController, RunInsightsController, RuntimeController, ObservabilityController, HealthController, MetricsController, AdminController, AuditController, WebhookController], providers: [ { provide: APP_GUARD, useClass: AuthGuard }, { provide: APP_GUARD, useClass: ThrottleByUserGuard }, @@ -75,7 +80,9 @@ import { StreamConsumerService } from './runs/stream-consumer.service'; StreamConsumerService, RunExecutorService, RunRecoveryService, - RunInsightsService 
+ RunInsightsService, + WebhookRepository, + WebhookService ] }) export class AppModule implements NestModule { diff --git a/src/artifacts/inline-artifact-storage.provider.ts b/src/artifacts/inline-artifact-storage.provider.ts index fdb06f7..6e48d34 100644 --- a/src/artifacts/inline-artifact-storage.provider.ts +++ b/src/artifacts/inline-artifact-storage.provider.ts @@ -1,10 +1,13 @@ import { Injectable } from '@nestjs/common'; +import { ArtifactRepository } from '../storage/artifact.repository'; import { ArtifactStorageProvider } from './artifact-storage.interface'; @Injectable() export class InlineArtifactStorageProvider implements ArtifactStorageProvider { readonly kind = 'inline'; + constructor(private readonly artifactRepository: ArtifactRepository) {} + async store(params: { runId: string; artifactId: string; @@ -17,8 +20,17 @@ export class InlineArtifactStorageProvider implements ArtifactStorageProvider { return { uri: `inline://${params.runId}/${params.artifactId}` }; } - async retrieve(_uri: string): Promise<{ data: Buffer; contentType?: string } | null> { - // Inline artifacts are read directly from the DB, not through this provider. 
- return null; + async retrieve(uri: string): Promise<{ data: Buffer; contentType?: string } | null> { + // Parse inline URI: inline://{runId}/{artifactId} + const match = uri.match(/^inline:\/\/([^/]+)\/([^/]+)$/); + if (!match) return null; + + const artifact = await this.artifactRepository.findById(match[2]); + if (!artifact?.inline) return null; + + return { + data: Buffer.from(JSON.stringify(artifact.inline), 'utf8'), + contentType: 'application/json' + }; } } diff --git a/src/audit/audit.service.ts b/src/audit/audit.service.ts index 40b554a..75fba91 100644 --- a/src/audit/audit.service.ts +++ b/src/audit/audit.service.ts @@ -1,4 +1,5 @@ import { Injectable, Logger } from '@nestjs/common'; +import { and, desc, eq, gt, lt, sql, SQL } from 'drizzle-orm'; import { randomUUID } from 'node:crypto'; import { DatabaseService } from '../db/database.service'; import { auditLog } from '../db/schema'; @@ -37,4 +38,41 @@ export class AuditService { this.logger.error(`audit record failed: ${error instanceof Error ? error.message : String(error)}`); } } + + async list(filters: { + actor?: string; + action?: string; + resource?: string; + resourceId?: string; + createdAfter?: string; + createdBefore?: string; + limit?: number; + offset?: number; + }): Promise<{ data: (typeof auditLog.$inferSelect)[]; total: number }> { + const conditions: SQL[] = []; + if (filters.actor) conditions.push(eq(auditLog.actor, filters.actor)); + if (filters.action) conditions.push(eq(auditLog.action, filters.action)); + if (filters.resource) conditions.push(eq(auditLog.resource, filters.resource)); + if (filters.resourceId) conditions.push(eq(auditLog.resourceId, filters.resourceId)); + if (filters.createdAfter) conditions.push(gt(auditLog.createdAt, filters.createdAfter)); + if (filters.createdBefore) conditions.push(lt(auditLog.createdAt, filters.createdBefore)); + + const where = conditions.length > 0 ? 
and(...conditions) : undefined; + + const [data, countResult] = await Promise.all([ + this.database.db + .select() + .from(auditLog) + .where(where) + .orderBy(desc(auditLog.createdAt)) + .limit(filters.limit ?? 50) + .offset(filters.offset ?? 0), + this.database.db + .select({ count: sql`count(*)::int` }) + .from(auditLog) + .where(where) + ]); + + return { data, total: countResult[0]?.count ?? 0 }; + } } diff --git a/src/contracts/runtime.ts b/src/contracts/runtime.ts index 9153d6b..5b2856c 100644 --- a/src/contracts/runtime.ts +++ b/src/contracts/runtime.ts @@ -165,12 +165,48 @@ export interface RuntimeCallOptions { deadline?: Date; } +/** Request to open a unified bidirectional session stream */ +export interface RuntimeOpenSessionRequest { + runId: string; + execution: ExecutionRequest; +} + +/** Handle to an open bidirectional StreamSession */ +export interface RuntimeSessionHandle { + /** Send an envelope through the open stream */ + send(envelope: RuntimeEnvelope): void; + /** Async iterable of raw events from the stream */ + events: AsyncIterable; + /** Close the write side (after all kickoff messages sent) */ + closeWrite(): void; + /** Abort the stream immediately */ + abort(): void; + /** The ack derived from the SessionStart echo (resolved after first response) */ + sessionAck: Promise; +} + +/** Stored runtime capabilities from Initialize response */ +export interface RuntimeCapabilities { + sessions?: { stream?: boolean }; + cancellation?: { cancelSession?: boolean }; + progress?: { progress?: boolean }; + manifest?: { getManifest?: boolean }; + modeRegistry?: { listModes?: boolean; listChanged?: boolean }; + roots?: { listRoots?: boolean; listChanged?: boolean }; +} + export interface RuntimeProvider { readonly kind: string; initialize(req: RuntimeInitializeRequest, opts?: RuntimeCallOptions): Promise; + + /** Open a unified bidirectional session — replaces startSession() + streamSession() */ + openSession(req: RuntimeOpenSessionRequest): 
RuntimeSessionHandle; + + /** @deprecated Use openSession() for new session creation. Kept for backward compat. */ startSession(req: RuntimeStartSessionRequest, opts?: RuntimeCallOptions): Promise; send(req: RuntimeSendRequest): Promise; + /** @deprecated Use openSession().events for streaming. Kept for reconnection fallback. */ streamSession(req: RuntimeStreamSessionRequest): AsyncIterable; getSession(req: RuntimeGetSessionRequest): Promise; cancelSession(req: RuntimeCancelSessionRequest): Promise; diff --git a/src/controllers/admin.controller.ts b/src/controllers/admin.controller.ts new file mode 100644 index 0000000..1683b45 --- /dev/null +++ b/src/controllers/admin.controller.ts @@ -0,0 +1,17 @@ +import { Controller, Post, HttpCode } from '@nestjs/common'; +import { ApiOperation, ApiTags } from '@nestjs/swagger'; +import { RustRuntimeProvider } from '../runtime/rust-runtime.provider'; + +@ApiTags('admin') +@Controller('admin') +export class AdminController { + constructor(private readonly rustRuntime: RustRuntimeProvider) {} + + @Post('circuit-breaker/reset') + @HttpCode(200) + @ApiOperation({ summary: 'Manually reset the circuit breaker to CLOSED state.' }) + resetCircuitBreaker() { + this.rustRuntime.resetCircuitBreaker(); + return { status: 'ok', state: 'CLOSED' }; + } +} diff --git a/src/controllers/audit.controller.ts b/src/controllers/audit.controller.ts new file mode 100644 index 0000000..e1b56a8 --- /dev/null +++ b/src/controllers/audit.controller.ts @@ -0,0 +1,27 @@ +import { Controller, Get, Query, ValidationPipe } from '@nestjs/common'; +import { ApiOperation, ApiTags } from '@nestjs/swagger'; +import { AuditService } from '../audit/audit.service'; +import { ListAuditQueryDto } from '../dto/list-audit-query.dto'; + +@ApiTags('audit') +@Controller('audit') +export class AuditController { + constructor(private readonly auditService: AuditService) {} + + @Get() + @ApiOperation({ summary: 'List audit log entries with optional filtering.' 
}) + async listAuditLogs( + @Query(new ValidationPipe({ transform: true, whitelist: true })) query: ListAuditQueryDto + ) { + return this.auditService.list({ + actor: query.actor, + action: query.action, + resource: query.resource, + resourceId: query.resourceId, + createdAfter: query.createdAfter, + createdBefore: query.createdBefore, + limit: query.limit ?? 50, + offset: query.offset ?? 0 + }); + } +} diff --git a/src/controllers/run-insights.controller.spec.ts b/src/controllers/run-insights.controller.spec.ts index e85da26..93b11c0 100644 --- a/src/controllers/run-insights.controller.spec.ts +++ b/src/controllers/run-insights.controller.spec.ts @@ -1,20 +1,30 @@ import { RunInsightsController } from './run-insights.controller'; import { RunInsightsService } from '../insights/run-insights.service'; +import { RunExecutorService } from '../runs/run-executor.service'; describe('RunInsightsController', () => { let controller: RunInsightsController; let mockInsightsService: { exportRun: jest.Mock; + exportRunJsonl: jest.Mock; compareRuns: jest.Mock; }; + let mockRunExecutor: { + cancel: jest.Mock; + }; beforeEach(() => { mockInsightsService = { exportRun: jest.fn(), + exportRunJsonl: jest.fn(), compareRuns: jest.fn() }; + mockRunExecutor = { + cancel: jest.fn() + }; controller = new RunInsightsController( - mockInsightsService as unknown as RunInsightsService + mockInsightsService as unknown as RunInsightsService, + mockRunExecutor as unknown as RunExecutorService ); }); @@ -45,6 +55,21 @@ describe('RunInsightsController', () => { eventLimit: undefined }); }); + + it('delegates to exportRunJsonl when format is jsonl', async () => { + const jsonl = '{"type":"header"}\n'; + mockInsightsService.exportRunJsonl.mockResolvedValue(jsonl); + + const query = { includeCanonical: true, includeRaw: false, eventLimit: 500, format: 'jsonl' as const }; + const result = await controller.exportRun('run-1', query as any); + + 
expect(mockInsightsService.exportRunJsonl).toHaveBeenCalledWith('run-1', { + includeCanonical: true, + includeRaw: false, + eventLimit: 500 + }); + expect(result).toBe(jsonl); + }); }); describe('compareRuns', () => { @@ -59,4 +84,37 @@ describe('RunInsightsController', () => { expect(result).toEqual(comparison); }); }); + + describe('batchCancel', () => { + it('cancels multiple runs and returns results', async () => { + mockRunExecutor.cancel + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(new Error('not found')); + + const result = await controller.batchCancel({ runIds: ['run-1', 'run-2'] }); + + expect(result).toEqual([ + { runId: 'run-1', status: 'cancelled', error: undefined }, + { runId: 'run-2', status: 'failed', error: 'not found' } + ]); + expect(mockRunExecutor.cancel).toHaveBeenCalledWith('run-1', 'batch cancel'); + expect(mockRunExecutor.cancel).toHaveBeenCalledWith('run-2', 'batch cancel'); + }); + }); + + describe('batchExport', () => { + it('exports multiple runs and returns bundles', async () => { + const bundle1 = { run: { id: 'run-1' }, exportedAt: '2026-01-01T00:00:00Z' }; + const bundle2 = { run: { id: 'run-2' }, exportedAt: '2026-01-01T00:00:00Z' }; + mockInsightsService.exportRun + .mockResolvedValueOnce(bundle1) + .mockResolvedValueOnce(bundle2); + + const result = await controller.batchExport({ runIds: ['run-1', 'run-2'] }); + + expect(result).toEqual([bundle1, bundle2]); + expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-1', { includeCanonical: true, includeRaw: false }); + expect(mockInsightsService.exportRun).toHaveBeenCalledWith('run-2', { includeCanonical: true, includeRaw: false }); + }); + }); }); diff --git a/src/controllers/run-insights.controller.ts b/src/controllers/run-insights.controller.ts index ce4a6d1..85bc2fc 100644 --- a/src/controllers/run-insights.controller.ts +++ b/src/controllers/run-insights.controller.ts @@ -13,11 +13,15 @@ import { CompareRunsDto } from '../dto/compare-runs.dto'; import { 
ExportRunQueryDto } from '../dto/export-run-query.dto'; import { RunBundleExportDto, RunComparisonResultDto } from '../dto/run-responses.dto'; import { RunInsightsService } from '../insights/run-insights.service'; +import { RunExecutorService } from '../runs/run-executor.service'; @ApiTags('runs') @Controller('runs') export class RunInsightsController { - constructor(private readonly insightsService: RunInsightsService) {} + constructor( + private readonly insightsService: RunInsightsService, + private readonly runExecutor: RunExecutorService + ) {} @Get(':id/export') @ApiOperation({ summary: 'Export a full run bundle (run, session, projection, events, artifacts).' }) @@ -26,6 +30,14 @@ export class RunInsightsController { @Param('id', new ParseUUIDPipe()) id: string, @Query(new ValidationPipe({ transform: true, whitelist: true })) query: ExportRunQueryDto ) { + if (query.format === 'jsonl') { + const jsonl = await this.insightsService.exportRunJsonl(id, { + includeCanonical: query.includeCanonical, + includeRaw: query.includeRaw, + eventLimit: query.eventLimit + }); + return jsonl; + } return this.insightsService.exportRun(id, { includeCanonical: query.includeCanonical, includeRaw: query.includeRaw, @@ -42,4 +54,29 @@ export class RunInsightsController { ) { return this.insightsService.compareRuns(body.leftRunId, body.rightRunId); } + + @Post('batch/cancel') + @ApiOperation({ summary: 'Cancel multiple runs in batch.' }) + async batchCancel( + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: { runIds: string[] } + ) { + const results = await Promise.allSettled( + body.runIds.map((id) => this.runExecutor.cancel(id, 'batch cancel')) + ); + return results.map((result, index) => ({ + runId: body.runIds[index], + status: result.status === 'fulfilled' ? 'cancelled' : 'failed', + error: result.status === 'rejected' ? (result.reason instanceof Error ? 
result.reason.message : String(result.reason)) : undefined + })); + } + + @Post('batch/export') + @ApiOperation({ summary: 'Export multiple runs in batch.' }) + async batchExport( + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: { runIds: string[] } + ) { + return Promise.all( + body.runIds.map((id) => this.insightsService.exportRun(id, { includeCanonical: true, includeRaw: false })) + ); + } } diff --git a/src/controllers/runs.controller.spec.ts b/src/controllers/runs.controller.spec.ts index 186ca39..62ef409 100644 --- a/src/controllers/runs.controller.spec.ts +++ b/src/controllers/runs.controller.spec.ts @@ -12,12 +12,13 @@ describe('RunsController', () => { launch: jest.Mock; cancel: jest.Mock; sendSignal: jest.Mock; - updateContext: jest.Mock; }; let mockRunManager: { listRuns: jest.Mock; getRun: jest.Mock; getState: jest.Mock; + deleteRun: jest.Mock; + archiveRun: jest.Mock; }; let mockEventRepository: { listCanonicalByRun: jest.Mock; @@ -31,12 +32,13 @@ describe('RunsController', () => { launch: jest.fn(), cancel: jest.fn(), sendSignal: jest.fn(), - updateContext: jest.fn(), }; mockRunManager = { listRuns: jest.fn(), getRun: jest.fn(), getState: jest.fn(), + deleteRun: jest.fn(), + archiveRun: jest.fn(), }; mockEventRepository = { listCanonicalByRun: jest.fn(), @@ -85,6 +87,7 @@ describe('RunsController', () => { offset: 0, sortBy: 'createdAt', sortOrder: 'desc', + includeArchived: undefined, }); }); @@ -246,22 +249,4 @@ describe('RunsController', () => { }); }); - // =========================================================================== - // updateContext - // =========================================================================== - describe('updateContext', () => { - it('delegates to runExecutor.updateContext', async () => { - const contextResult = { messageId: 'msg-2', ack: { ok: true } }; - mockRunExecutor.updateContext.mockResolvedValue(contextResult); - - const body = { - from: 'agent-1', - context: { key: 'value' }, - 
}; - const result = await controller.updateContext('run-1', body as any); - - expect(mockRunExecutor.updateContext).toHaveBeenCalledWith('run-1', body); - expect(result).toEqual(contextResult); - }); - }); }); diff --git a/src/controllers/runs.controller.ts b/src/controllers/runs.controller.ts index 15f70f6..41b5706 100644 --- a/src/controllers/runs.controller.ts +++ b/src/controllers/runs.controller.ts @@ -1,8 +1,10 @@ import { Body, Controller, + Delete, Get, Headers, + HttpCode, MessageEvent, Param, ParseUUIDPipe, @@ -26,9 +28,9 @@ import { ExecutionRequestDto } from '../dto/execution-request.dto'; import { ListEventsQueryDto } from '../dto/list-events-query.dto'; import { ListRunsQueryDto } from '../dto/list-runs-query.dto'; import { ReplayRequestDto } from '../dto/replay-request.dto'; +import { CloneRunDto } from '../dto/clone-run.dto'; import { SendSignalDto } from '../dto/send-signal.dto'; import { StreamRunQueryDto } from '../dto/stream-run-query.dto'; -import { UpdateContextDto } from '../dto/update-context.dto'; import { CanonicalEventDto, CreateRunResponseDto, @@ -66,7 +68,8 @@ export class RunsController { limit: query.limit ?? 50, offset: query.offset ?? 0, sortBy: query.sortBy ?? 'createdAt', - sortOrder: query.sortOrder ?? 'desc' + sortOrder: query.sortOrder ?? 'desc', + includeArchived: query.includeArchived }); } @@ -268,13 +271,32 @@ export class RunsController { return this.runExecutor.sendSignal(id, body); } - @Post(':id/context') - @ApiOperation({ summary: 'Send a context update to a running session.' }) - @ApiBody({ type: UpdateContextDto }) - async updateContext( + @Post(':id/clone') + @ApiOperation({ summary: 'Clone a run with optional overrides.' 
}) + @ApiBody({ type: CloneRunDto }) + async cloneRun( @Param('id', new ParseUUIDPipe()) id: string, - @Body(new ValidationPipe({ transform: true, whitelist: true })) body: UpdateContextDto + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: CloneRunDto ) { - return this.runExecutor.updateContext(id, body); + const run = await this.runExecutor.clone(id, body); + return { + runId: run.id, + status: run.status, + traceId: run.traceId ?? undefined + }; } + + @Delete(':id') + @ApiOperation({ summary: 'Delete a terminal run and all associated data.' }) + @HttpCode(204) + async deleteRun(@Param('id', new ParseUUIDPipe()) id: string) { + return this.runManager.deleteRun(id); + } + + @Post(':id/archive') + @ApiOperation({ summary: 'Archive a run, excluding it from default listings.' }) + async archiveRun(@Param('id', new ParseUUIDPipe()) id: string) { + return this.runManager.archiveRun(id); + } + } diff --git a/src/controllers/webhook.controller.ts b/src/controllers/webhook.controller.ts new file mode 100644 index 0000000..abca509 --- /dev/null +++ b/src/controllers/webhook.controller.ts @@ -0,0 +1,46 @@ +import { + Body, + Controller, + Delete, + Get, + HttpCode, + Param, + ParseUUIDPipe, + Post, + ValidationPipe +} from '@nestjs/common'; +import { ApiBody, ApiOperation, ApiTags } from '@nestjs/swagger'; +import { CreateWebhookDto } from '../dto/webhook.dto'; +import { WebhookService } from '../webhooks/webhook.service'; + +@ApiTags('webhooks') +@Controller('webhooks') +export class WebhookController { + constructor(private readonly webhookService: WebhookService) {} + + @Post() + @ApiOperation({ summary: 'Register a new webhook subscription.' }) + @ApiBody({ type: CreateWebhookDto }) + async createWebhook( + @Body(new ValidationPipe({ transform: true, whitelist: true })) body: CreateWebhookDto + ) { + return this.webhookService.register({ + url: body.url, + events: body.events ?? 
[], + secret: body.secret + }); + } + + @Get() + @ApiOperation({ summary: 'List all webhook subscriptions.' }) + async listWebhooks() { + return this.webhookService.list(); + } + + @Delete(':id') + @HttpCode(204) + @ApiOperation({ summary: 'Remove a webhook subscription.' }) + async deleteWebhook(@Param('id', new ParseUUIDPipe()) id: string) { + return this.webhookService.remove(id); + } +} diff --git a/src/db/schema.ts b/src/db/schema.ts index 7e2e299..f6546c7 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -54,6 +54,7 @@ export const runtimeSessions = pgTable( policyVersion: varchar('policy_version', { length: 128 }), initiatorParticipantId: varchar('initiator_participant_id', { length: 255 }), sessionState: varchar('session_state', { length: 64 }).notNull().default('SESSION_STATE_UNSPECIFIED'), + expiresAt: timestamp('expires_at', { withTimezone: true, mode: 'string' }), lastSeenAt: timestamp('last_seen_at', { withTimezone: true, mode: 'string' }), metadata: jsonb('metadata').$type>().notNull().default({}), createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow(), @@ -191,3 +192,19 @@ export const runMetrics = pgTable( pk: primaryKey({ columns: [table.runId] }) }) ); + +export const webhooks = pgTable( + 'webhooks', + { + id: uuid('id').primaryKey(), + url: text('url').notNull(), + events: jsonb('events').$type().notNull().default([]), + secret: varchar('secret', { length: 255 }).notNull(), + active: integer('active').notNull().default(1), + createdAt: timestamp('created_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow(), + updatedAt: timestamp('updated_at', { withTimezone: true, mode: 'string' }).notNull().defaultNow() + }, + (table) => ({ + activeIdx: index('webhooks_active_idx').on(table.active) + }) +); diff --git a/src/dto/clone-run.dto.ts b/src/dto/clone-run.dto.ts new file mode 100644 index 0000000..4b8c322 --- /dev/null +++ b/src/dto/clone-run.dto.ts @@ -0,0 +1,15 @@ +import { IsOptional, 
IsArray, IsString, IsObject } from 'class-validator'; +import { ApiPropertyOptional } from '@nestjs/swagger'; + +export class CloneRunDto { + @ApiPropertyOptional({ description: 'Override tags for the cloned run' }) + @IsOptional() + @IsArray() + @IsString({ each: true }) + tags?: string[]; + + @ApiPropertyOptional({ description: 'Override context for the cloned run' }) + @IsOptional() + @IsObject() + context?: Record; +} diff --git a/src/dto/export-run-query.dto.ts b/src/dto/export-run-query.dto.ts index e701cc7..bd5f08d 100644 --- a/src/dto/export-run-query.dto.ts +++ b/src/dto/export-run-query.dto.ts @@ -1,5 +1,5 @@ import { ApiPropertyOptional } from '@nestjs/swagger'; -import { IsBoolean, IsInt, IsOptional, Max, Min } from 'class-validator'; +import { IsBoolean, IsIn, IsInt, IsOptional, Max, Min } from 'class-validator'; import { Transform } from 'class-transformer'; export class ExportRunQueryDto { @@ -22,4 +22,9 @@ export class ExportRunQueryDto { @Max(50000) @Transform(({ value }) => (value !== undefined ? 
Number(value) : undefined)) eventLimit?: number; + + @ApiPropertyOptional({ description: 'Export format: json or jsonl.', default: 'json', enum: ['json', 'jsonl'] }) + @IsOptional() + @IsIn(['json', 'jsonl']) + format?: 'json' | 'jsonl'; } diff --git a/src/dto/list-audit-query.dto.ts b/src/dto/list-audit-query.dto.ts new file mode 100644 index 0000000..17c5598 --- /dev/null +++ b/src/dto/list-audit-query.dto.ts @@ -0,0 +1,50 @@ +import { IsISO8601, IsOptional, IsString, IsInt, Min, Max } from 'class-validator'; +import { Type } from 'class-transformer'; +import { ApiPropertyOptional } from '@nestjs/swagger'; + +export class ListAuditQueryDto { + @ApiPropertyOptional() + @IsOptional() + @IsString() + actor?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + action?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + resource?: string; + + @ApiPropertyOptional() + @IsOptional() + @IsString() + resourceId?: string; + + @ApiPropertyOptional({ description: 'ISO 8601 timestamp; only entries created strictly after it are returned.' }) + @IsOptional() + @IsISO8601() + createdAfter?: string; + + @ApiPropertyOptional({ description: 'ISO 8601 timestamp; only entries created strictly before it are returned.' }) + @IsOptional() + @IsISO8601() + createdBefore?: string; + + @ApiPropertyOptional({ default: 50 }) + @IsOptional() + @Type(() => Number) + @IsInt() + @Min(1) + @Max(200) + limit?: number; + + @ApiPropertyOptional({ default: 0 }) + @IsOptional() + @Type(() => Number) + @IsInt() + @Min(0) + offset?: number; +} diff --git a/src/dto/list-runs-query.dto.ts b/src/dto/list-runs-query.dto.ts index 2aa22f5..68f5884 100644 --- a/src/dto/list-runs-query.dto.ts +++ b/src/dto/list-runs-query.dto.ts @@ -50,4 +50,9 @@ export class ListRunsQueryDto { @IsOptional() @IsIn(['asc', 'desc']) sortOrder?: 'asc' | 'desc'; + + @ApiPropertyOptional({ description: 'Include archived runs in listing', default: false }) + @IsOptional() + @Transform(({ value }) => value === 'true' || value === true) + includeArchived?: boolean; } diff --git a/src/dto/paginated-response.dto.ts b/src/dto/paginated-response.dto.ts new file mode 100644 index 0000000..f9e9d03 --- /dev/null +++
b/src/dto/paginated-response.dto.ts @@ -0,0 +1,15 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class PaginatedResponseDto { + @ApiProperty() + data!: T[]; + + @ApiProperty() + total!: number; + + @ApiProperty() + limit!: number; + + @ApiProperty() + offset!: number; +} diff --git a/src/dto/webhook.dto.ts b/src/dto/webhook.dto.ts new file mode 100644 index 0000000..f6db17e --- /dev/null +++ b/src/dto/webhook.dto.ts @@ -0,0 +1,18 @@ +import { IsArray, IsOptional, IsString, IsUrl, Length } from 'class-validator'; +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export class CreateWebhookDto { + @ApiProperty({ description: 'URL to receive webhook POST requests' }) + @IsUrl({ protocols: ['http', 'https'], require_protocol: true }) + url!: string; + + @ApiPropertyOptional({ description: 'Events to subscribe to. Empty = all events.', default: [] }) + @IsOptional() + @IsArray() + @IsString({ each: true }) + events?: string[]; + + @ApiProperty({ description: 'Secret used for HMAC-SHA256 signature verification' }) + @Length(1, 255) + secret!: string; +} diff --git a/src/events/event-normalizer.service.spec.ts b/src/events/event-normalizer.service.spec.ts index ef5efe9..933ceba 100644 --- a/src/events/event-normalizer.service.spec.ts +++ b/src/events/event-normalizer.service.spec.ts @@ -213,7 +213,7 @@ describe('EventNormalizerService', () => { const decisionFinalized = events.find((e) => e.type === 'decision.finalized'); expect(decisionFinalized).toBeDefined(); - expect(decisionFinalized!.subject).toEqual({ kind: 'decision', id: 'msg-1' }); + expect(decisionFinalized!.subject).toEqual({ kind: 'decision', id: 'commit-1' }); const stateChanged = events.find((e) => e.type === 'session.state.changed'); expect(stateChanged).toBeDefined(); diff --git a/src/events/event-normalizer.service.ts b/src/events/event-normalizer.service.ts index 65ff859..535e32a 100644 --- a/src/events/event-normalizer.service.ts +++ b/src/events/event-normalizer.service.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common';
import { randomUUID } from 'node:crypto'; import { CanonicalEvent, CanonicalEventType } from '../contracts/control-plane'; import { EventNormalizer, NormalizeContext, RawRuntimeEvent } from '../contracts/runtime'; +import { PROJECTION_SCHEMA_VERSION } from '../projection/projection.service'; import { ProtoRegistryService } from '../runtime/proto-registry.service'; @Injectable() @@ -111,7 +112,7 @@ export class EventNormalizerService implements EventNormalizer { runId, ts, derivedType, - this.deriveSubject(derivedType, envelope), + this.deriveSubject(derivedType, envelope, decoded), { modeName: envelope.mode, messageType: envelope.messageType, @@ -185,19 +186,31 @@ export class EventNormalizerService implements EventNormalizer { return null; } - private deriveSubject(type: CanonicalEventType, envelope: RawRuntimeEvent['envelope']): CanonicalEvent['subject'] { + private deriveSubject( + type: CanonicalEventType, + envelope: RawRuntimeEvent['envelope'], + decoded?: Record | null + ): CanonicalEvent['subject'] { if (!envelope) return undefined; + const payload = decoded as Record | undefined; + switch (type) { case 'signal.emitted': return { kind: 'signal', id: envelope.messageId }; case 'proposal.created': - case 'proposal.updated': - return { kind: 'proposal', id: envelope.messageId }; - case 'decision.finalized': - return { kind: 'decision', id: envelope.messageId }; + case 'proposal.updated': { + const proposalId = payload?.proposalId ?? payload?.proposal_id ?? payload?.requestId ?? payload?.request_id; + return { kind: 'proposal', id: String(proposalId ?? envelope.messageId) }; + } + case 'decision.finalized': { + const decisionId = payload?.commitmentId ?? payload?.commitment_id ?? payload?.decisionId ?? payload?.decision_id; + return { kind: 'decision', id: String(decisionId ?? 
envelope.messageId) }; + } case 'tool.called': - case 'tool.completed': - return { kind: 'tool', id: envelope.messageId }; + case 'tool.completed': { + const toolCallId = payload?.toolCallId ?? payload?.tool_call_id ?? payload?.requestId ?? payload?.request_id; + return { kind: 'tool', id: String(toolCallId ?? envelope.messageId) }; + } case 'progress.reported': return { kind: 'message', id: envelope.messageId }; default: @@ -219,7 +232,7 @@ export class EventNormalizerService implements EventNormalizer { seq: 0, ts, type, - schemaVersion: 1, + schemaVersion: PROJECTION_SCHEMA_VERSION, subject, source: { kind: 'runtime', diff --git a/src/events/run-event.service.ts b/src/events/run-event.service.ts index 253d132..4bb62e3 100644 --- a/src/events/run-event.service.ts +++ b/src/events/run-event.service.ts @@ -4,7 +4,7 @@ import { CanonicalEvent } from '../contracts/control-plane'; import { RawRuntimeEvent } from '../contracts/runtime'; import { DatabaseService } from '../db/database.service'; import { MetricsService } from '../metrics/metrics.service'; -import { ProjectionService } from '../projection/projection.service'; +import { ProjectionService, PROJECTION_SCHEMA_VERSION } from '../projection/projection.service'; import { EventRepository } from '../storage/event.repository'; import { RunRepository } from '../storage/run.repository'; import { StreamHubService } from './stream-hub.service'; @@ -32,7 +32,8 @@ export class RunEventService { ...event, id: randomUUID(), runId, - seq: startSeq + index + seq: startSeq + index, + schemaVersion: PROJECTION_SCHEMA_VERSION })); await this.eventRepository.appendCanonical(prepared, tx); return prepared; diff --git a/src/events/stream-hub.service.ts b/src/events/stream-hub.service.ts index 11dbb64..13ef794 100644 --- a/src/events/stream-hub.service.ts +++ b/src/events/stream-hub.service.ts @@ -86,6 +86,7 @@ export class StreamHubService implements OnModuleDestroy { } } }, 60_000); + if (typeof timer === 'object' && 'unref' in 
timer) timer.unref(); this.cleanupTimers.set(runId, timer); } diff --git a/src/insights/run-insights.service.ts b/src/insights/run-insights.service.ts index c053543..3149735 100644 --- a/src/insights/run-insights.service.ts +++ b/src/insights/run-insights.service.ts @@ -85,6 +85,34 @@ export class RunInsightsService { }; } + async exportRunJsonl( + runId: string, + options: { includeCanonical?: boolean; includeRaw?: boolean; eventLimit?: number } + ): Promise { + const bundle = await this.exportRun(runId, options); + const lines: string[] = []; + + lines.push(JSON.stringify({ + type: 'header', + run: bundle.run, + session: bundle.session, + projection: bundle.projection, + metrics: bundle.metrics, + artifacts: bundle.artifacts, + exportedAt: bundle.exportedAt + })); + + for (const event of bundle.canonicalEvents) { + lines.push(JSON.stringify({ ...event, type: 'canonical_event' })); + } + + for (const event of bundle.rawEvents) { + lines.push(JSON.stringify({ ...event, type: 'raw_event' })); + } + + return lines.join('\n') + '\n'; + } + async compareRuns(leftRunId: string, rightRunId: string): Promise { const [leftRun, rightRun] = await Promise.all([ this.runRepository.findById(leftRunId), diff --git a/src/runs/run-executor.service.ts b/src/runs/run-executor.service.ts index 5fb9268..4a92cab 100644 --- a/src/runs/run-executor.service.ts +++ b/src/runs/run-executor.service.ts @@ -1,4 +1,5 @@ import { BadRequestException, Injectable, Logger } from '@nestjs/common'; +import { randomUUID } from 'node:crypto'; import { ExecutionRequest } from '../contracts/control-plane'; import { ArtifactService } from '../artifacts/artifact.service'; import { AppConfigService } from '../config/app-config.service'; @@ -73,19 +74,28 @@ export class RunExecutorService { throw new BadRequestException('run is not in running state'); } const provider = this.runtimeRegistry.get(run.runtimeKind); - const session = await this.runtimeSessionRepository.findByRunId(runId); + // Runtime requires 
empty session_id and mode for Signal messages const sendResult = await provider.send({ runId, - runtimeSessionId: run.runtimeSessionId, - modeName: (session as any)?.modeName ?? '', + runtimeSessionId: '', + modeName: '', from: params.from, to: params.to, - messageType: params.messageType, + messageType: 'Signal', payload: Buffer.from(JSON.stringify(params.payload ?? {}), 'utf8'), payloadDescriptor: params.payload }); + // Check ack for errors + if (!sendResult.ack.ok && sendResult.ack.error) { + throw new AppException( + ErrorCode.SIGNAL_DISPATCH_FAILED, + `Runtime rejected signal: [${sendResult.ack.error.code}] ${sendResult.ack.error.message}`, + 502 + ); + } + await this.eventService.emitControlPlaneEvents(runId, [ { ts: new Date().toISOString(), @@ -106,44 +116,26 @@ export class RunExecutorService { return { messageId: sendResult.envelope.messageId, ack: sendResult.ack }; } - async updateContext(runId: string, params: { - from: string; - context: Record; - }) { + async clone(runId: string, overrides?: { tags?: string[]; context?: Record }) { const run = await this.runManager.getRun(runId); - if (!run.runtimeSessionId || run.status !== 'running') { - throw new BadRequestException('run is not in running state'); + const executionRequest = run.metadata?.executionRequest as ExecutionRequest | undefined; + if (!executionRequest) { + throw new BadRequestException('run does not have an execution request in metadata'); } - const provider = this.runtimeRegistry.get(run.runtimeKind); - const session = await this.runtimeSessionRepository.findByRunId(runId); - const sendResult = await provider.send({ - runId, - runtimeSessionId: run.runtimeSessionId, - modeName: (session as any)?.modeName ?? 
'', - from: params.from, - to: [], - messageType: 'ContextUpdate', - payload: Buffer.from(JSON.stringify(params.context), 'utf8'), - payloadDescriptor: params.context - }); - - await this.eventService.emitControlPlaneEvents(runId, [ - { - ts: new Date().toISOString(), - type: 'message.sent', - source: { kind: 'control-plane', name: 'run-executor' }, - subject: { kind: 'message', id: sendResult.envelope.messageId }, - data: { - sessionId: run.runtimeSessionId, - sender: params.from, - messageType: 'ContextUpdate', - payloadDescriptor: params.context - } - } - ]); + const cloned = { ...executionRequest }; + if (overrides?.tags) { + cloned.execution = { ...cloned.execution, tags: overrides.tags }; + } + if (overrides?.context) { + cloned.session = { ...cloned.session, context: overrides.context }; + } + // Clear idempotency key so clone creates a new run + if (cloned.execution) { + delete (cloned.execution as any).idempotencyKey; + } - return { messageId: sendResult.envelope.messageId, ack: sendResult.ack }; + return this.launch(cloned); } private async execute(runId: string, request: ExecutionRequest): Promise { @@ -152,7 +144,7 @@ export class RunExecutorService { try { await this.runManager.markStarted(runId, request); - // Phase 1.4: Mode validation + // Mode validation via Initialize const initResult = await this.traceService.withSpan( 'runtime.initialize', { @@ -162,7 +154,7 @@ export class RunExecutorService { }, async () => { return provider.initialize( - { clientName: 'macp-control-plane', clientVersion: '0.1.0' }, + { clientName: 'macp-control-plane', clientVersion: '0.2.0' }, { deadline: new Date(Date.now() + deadlineMs) } ); } @@ -179,66 +171,76 @@ export class RunExecutorService { ); } + // Open unified bidirectional session stream + const handle = provider.openSession({ runId, execution: request }); + + // Wait for SessionStart confirmation const session = await this.traceService.withSpan( - 'runtime.start_session', + 'runtime.open_session', { run_id: 
runId, runtime_kind: request.runtime.kind, mode_name: request.session.modeName }, - async () => provider.startSession( - { runId, execution: request }, - { deadline: new Date(Date.now() + deadlineMs) } - ) + async () => handle.sessionAck ); await this.runManager.bindSession(runId, request, session); + // Send kickoff messages through the bidirectional stream for (const message of request.kickoff ?? []) { try { const payload = message.payloadEnvelope ? this.protoRegistry.encodePayloadEnvelope(message.payloadEnvelope) : Buffer.from(JSON.stringify(message.payload ?? {}), 'utf8'); - // Phase 1.5: Kickoff message retry with exponential backoff - const sendResult = await this.retryKickoff(() => - this.traceService.withSpan( - 'runtime.send_kickoff', - { - run_id: runId, - runtime_kind: request.runtime.kind, - mode_name: request.session.modeName, - message_type: message.messageType, - sender: message.from + const kickoffEnvelope = { + macpVersion: '1.0', + mode: request.session.modeName, + messageType: message.messageType, + messageId: randomUUID(), + sessionId: session.runtimeSessionId, + sender: message.from, + timestampUnixMs: Date.now(), + payload + }; + + // Retry kickoff send through the stream handle + await this.retryKickoff(async () => { + handle.send(kickoffEnvelope); + return { + ack: { + ok: true, + duplicate: false, + messageId: kickoffEnvelope.messageId, + sessionId: session.runtimeSessionId, + acceptedAtUnixMs: Date.now(), + sessionState: 'SESSION_STATE_OPEN' as const }, - async () => - provider.send({ - runId, - runtimeSessionId: session.runtimeSessionId, - modeName: request.session.modeName, - from: message.from, - to: message.to, - messageType: message.messageType, - payload, - payloadDescriptor: (message.payloadEnvelope as unknown as Record) ?? 
message.payload, - metadata: message.metadata - }) - ) - ); + envelope: kickoffEnvelope + }; + }); await this.eventService.emitControlPlaneEvents(runId, [ { ts: new Date().toISOString(), type: 'message.sent', source: { kind: 'control-plane', name: 'run-executor' }, - subject: { kind: 'message', id: sendResult.envelope.messageId }, + subject: { kind: 'message', id: kickoffEnvelope.messageId }, data: { sessionId: session.runtimeSessionId, sender: message.from, to: message.to, messageType: message.messageType, kind: message.kind, - ack: sendResult.ack, + ack: { + ok: true, + duplicate: false, + messageId: kickoffEnvelope.messageId, + sessionId: session.runtimeSessionId, + acceptedAtUnixMs: Date.now(), + sessionState: 'SESSION_STATE_OPEN' + }, payloadDescriptor: (message.payloadEnvelope as unknown as Record) ?? message.payload ?? {} } } @@ -262,20 +264,26 @@ export class RunExecutorService { } } ]); + handle.abort(); await this.runManager.markFailed(runId, kickoffError); return; } } + // Half-close the write side — kickoff phase done + handle.closeWrite(); + const run = await this.runManager.markRunning(runId, session.runtimeSessionId); const subscriberId = session.initiator; + // Pass the session handle to the stream consumer await this.streamConsumer.start({ runId, execution: request, runtimeKind: request.runtime.kind, runtimeSessionId: session.runtimeSessionId, - subscriberId + subscriberId, + sessionHandle: handle }); if (run.traceId) { diff --git a/src/runs/run-manager.service.spec.ts b/src/runs/run-manager.service.spec.ts index 2abdd7a..4274a7c 100644 --- a/src/runs/run-manager.service.spec.ts +++ b/src/runs/run-manager.service.spec.ts @@ -5,7 +5,9 @@ import { RunRepository } from '../storage/run.repository'; import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; import { ProjectionService } from '../projection/projection.service'; import { RunEventService } from '../events/run-event.service'; +import { AuditService } from 
'../audit/audit.service'; import { TraceService } from '../telemetry/trace.service'; +import { WebhookService } from '../webhooks/webhook.service'; import { ExecutionRequest, RunStateProjection } from '../contracts/control-plane'; function makeExecutionRequest(overrides?: Partial): ExecutionRequest { @@ -104,6 +106,18 @@ describe('RunManagerService', () => { endRunTrace: jest.fn(), }, }, + { + provide: AuditService, + useValue: { + record: jest.fn().mockResolvedValue(undefined), + }, + }, + { + provide: WebhookService, + useValue: { + fireEvent: jest.fn(), + }, + }, ], }).compile(); diff --git a/src/runs/run-manager.service.ts b/src/runs/run-manager.service.ts index 920dc1b..4a3f06c 100644 --- a/src/runs/run-manager.service.ts +++ b/src/runs/run-manager.service.ts @@ -1,11 +1,13 @@ -import { Injectable, NotFoundException } from '@nestjs/common'; +import { BadRequestException, Injectable, NotFoundException } from '@nestjs/common'; import { randomUUID } from 'node:crypto'; import { ExecutionRequest, RunStateProjection } from '../contracts/control-plane'; +import { AuditService } from '../audit/audit.service'; import { TraceService } from '../telemetry/trace.service'; import { ProjectionService } from '../projection/projection.service'; import { RunEventService } from '../events/run-event.service'; import { RunRepository } from '../storage/run.repository'; import { RuntimeSessionRepository } from '../storage/runtime-session.repository'; +import { WebhookService } from '../webhooks/webhook.service'; @Injectable() export class RunManagerService { @@ -14,7 +16,9 @@ export class RunManagerService { private readonly runtimeSessionRepository: RuntimeSessionRepository, private readonly projectionService: ProjectionService, private readonly runEventService: RunEventService, - private readonly traceService: TraceService + private readonly traceService: TraceService, + private readonly auditService: AuditService, + private readonly webhookService: WebhookService ) {} async 
createRun(request: ExecutionRequest) { @@ -172,6 +176,12 @@ export class RunManagerService { } } ]); + void this.webhookService.fireEvent({ + event: 'run.started', + runId, + status: 'running', + timestamp: new Date().toISOString() + }); return run; } @@ -195,6 +205,12 @@ export class RunManagerService { } } ]); + void this.webhookService.fireEvent({ + event: 'run.completed', + runId, + status: 'completed', + timestamp: new Date().toISOString() + }); return run; } @@ -218,6 +234,12 @@ export class RunManagerService { } } ]); + void this.webhookService.fireEvent({ + event: 'run.cancelled', + runId, + status: 'cancelled', + timestamp: new Date().toISOString() + }); return run; } @@ -243,6 +265,13 @@ export class RunManagerService { } } ]); + void this.webhookService.fireEvent({ + event: 'run.failed', + runId, + status: 'failed', + timestamp: new Date().toISOString(), + data: { error: message } + }); return run; } @@ -255,8 +284,49 @@ export class RunManagerService { offset?: number; sortBy?: 'createdAt' | 'updatedAt'; sortOrder?: 'asc' | 'desc'; + includeArchived?: boolean; }) { - return this.runRepository.list(filters); + const limit = filters.limit ?? 50; + const offset = filters.offset ?? 
0; + const [data, total] = await Promise.all([ + this.runRepository.list(filters), + this.runRepository.listCount({ + status: filters.status, + tags: filters.tags, + createdAfter: filters.createdAfter, + createdBefore: filters.createdBefore, + includeArchived: filters.includeArchived, + }), + ]); + return { data, total, limit, offset }; + } + + async deleteRun(runId: string) { + const run = await this.getRun(runId); + const terminalStatuses = ['completed', 'failed', 'cancelled']; + if (!terminalStatuses.includes(run.status)) { + throw new BadRequestException('only terminal runs (completed, failed, cancelled) can be deleted'); + } + await this.auditService.record({ + actor: 'control-plane', + actorType: 'system', + action: 'run.deleted', + resource: 'run', + resourceId: runId + }); + await this.runRepository.delete(runId); + } + + async archiveRun(runId: string) { + await this.getRun(runId); + await this.auditService.record({ + actor: 'control-plane', + actorType: 'system', + action: 'run.archived', + resource: 'run', + resourceId: runId + }); + return this.runRepository.addTag(runId, 'archived'); } async getRun(runId: string) { diff --git a/src/runs/stream-consumer.service.spec.ts b/src/runs/stream-consumer.service.spec.ts index 1219c47..f9243b5 100644 --- a/src/runs/stream-consumer.service.spec.ts +++ b/src/runs/stream-consumer.service.spec.ts @@ -264,14 +264,57 @@ describe('StreamConsumerService', () => { }); describe('isHealthy()', () => { - it('should always return true', () => { + it('should return true when no active streams', () => { + expect(service.isHealthy()).toBe(true); + }); + + it('should return false when a stream is active but not connected', async () => { + const neverEndingIterable: AsyncIterable = { + [Symbol.asyncIterator]() { + return { + next: () => new Promise(() => {}), + }; + }, + }; + + const mockProvider = { + streamSession: jest.fn().mockReturnValue(neverEndingIterable), + getSession: jest.fn(), + }; + 
runtimeRegistry.get.mockReturnValue(mockProvider as any); + + await service.start({ + runId: 'health-run', + execution: { + mode: 'live' as const, + runtime: { kind: 'rust' }, + session: { + modeName: 'decision', + modeVersion: '1.0', + configurationVersion: '1.0', + ttlMs: 60000, + participants: [{ id: 'agent-1' }], + }, + }, + runtimeKind: 'rust', + runtimeSessionId: 'session-1', + subscriberId: 'sub-1', + }); + + // Stream is active but not yet connected + expect(service.isHealthy()).toBe(false); + + // Manually mark as connected to verify the other branch + const activeMap = (service as any).active as Map; + const marker = activeMap.get('health-run')!; + marker.connected = true; expect(service.isHealthy()).toBe(true); }); }); describe('finalizeRun idempotency', () => { it('should only finalize once even when called multiple times', async () => { - const marker = { aborted: false, finalized: false, lastProcessedSeq: 0 }; + const marker = { aborted: false, finalized: false, connected: true, lastProcessedSeq: 0 }; // Access the private finalizeRun method const finalizeRun = (service as any).finalizeRun.bind(service); @@ -289,7 +332,7 @@ describe('StreamConsumerService', () => { }); it('should call markFailed for failed status', async () => { - const marker = { aborted: false, finalized: false, lastProcessedSeq: 0 }; + const marker = { aborted: false, finalized: false, connected: true, lastProcessedSeq: 0 }; const finalizeRun = (service as any).finalizeRun.bind(service); const error = new Error('something went wrong'); diff --git a/src/runs/stream-consumer.service.ts b/src/runs/stream-consumer.service.ts index 4e6fa30..d93dcf3 100644 --- a/src/runs/stream-consumer.service.ts +++ b/src/runs/stream-consumer.service.ts @@ -1,6 +1,6 @@ import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; import { ExecutionRequest } from '../contracts/control-plane'; -import { RawRuntimeEvent } from '../contracts/runtime'; +import { RawRuntimeEvent, RuntimeSessionHandle 
} from '../contracts/runtime'; import { AppConfigService } from '../config/app-config.service'; import { EventNormalizerService } from '../events/event-normalizer.service'; import { RunEventService } from '../events/run-event.service'; @@ -12,6 +12,7 @@ import { RunManagerService } from './run-manager.service'; interface ActiveStream { aborted: boolean; finalized: boolean; + connected: boolean; lastProcessedSeq: number; } @@ -44,11 +45,13 @@ export class StreamConsumerService implements OnModuleDestroy { runtimeSessionId: string; subscriberId: string; resumeFromSeq?: number; + sessionHandle?: RuntimeSessionHandle; }): Promise { if (this.active.has(params.runId)) return; const marker: ActiveStream = { aborted: false, finalized: false, + connected: false, lastProcessedSeq: params.resumeFromSeq ?? 0 }; this.active.set(params.runId, marker); @@ -63,6 +66,10 @@ export class StreamConsumerService implements OnModuleDestroy { } isHealthy(): boolean { + if (this.active.size === 0) return true; + for (const [, marker] of this.active) { + if (!marker.aborted && !marker.finalized && !marker.connected) return false; + } return true; } @@ -99,6 +106,7 @@ export class StreamConsumerService implements OnModuleDestroy { runtimeKind: string; runtimeSessionId: string; subscriberId: string; + sessionHandle?: RuntimeSessionHandle; } ): Promise { const provider = this.runtimeRegistry.get(params.runtimeKind); @@ -110,15 +118,21 @@ export class StreamConsumerService implements OnModuleDestroy { let retries = 0; const maxRetries = this.config.streamMaxRetries; + let isFirstIteration = true; while (!marker.aborted) { try { - const iterable = provider.streamSession({ - runId: params.runId, - runtimeSessionId: params.runtimeSessionId, - modeName: params.execution.session.modeName, - subscriberId: params.subscriberId - }); + // First iteration: use the session handle's events if provided + // Subsequent iterations (reconnection): fall back to streamSession() + const iterable = 
(isFirstIteration && params.sessionHandle) + ? params.sessionHandle.events + : provider.streamSession({ + runId: params.runId, + runtimeSessionId: params.runtimeSessionId, + modeName: params.execution.session.modeName, + subscriberId: params.subscriberId + }); + isFirstIteration = false; for await (const raw of this.withIdleTimeout(iterable, this.config.streamIdleTimeoutMs)) { if (marker.aborted) return; @@ -167,6 +181,7 @@ export class StreamConsumerService implements OnModuleDestroy { ]); await new Promise((resolve) => setTimeout(resolve, this.backoffMs(retries))); } catch (error) { + marker.connected = false; retries += 1; this.logger.warn(`stream error for run ${params.runId}: ${error instanceof Error ? error.message : String(error)}`); await this.eventService.emitControlPlaneEvents(params.runId, [ @@ -251,6 +266,11 @@ export class StreamConsumerService implements OnModuleDestroy { runtimeSessionId: string, marker: ActiveStream ) { + // Track stream connectivity + if (raw.kind === 'stream-status' && raw.streamStatus?.status === 'opened') { + marker.connected = true; + } + const canonical = this.normalizer.normalize(runId, raw, context); const emitted = await this.eventService.persistRawAndCanonical(runId, raw, canonical); diff --git a/src/runtime/circuit-breaker.ts b/src/runtime/circuit-breaker.ts index 40b5bec..21d3dc6 100644 --- a/src/runtime/circuit-breaker.ts +++ b/src/runtime/circuit-breaker.ts @@ -5,6 +5,7 @@ export type CircuitBreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN'; export interface CircuitBreakerConfig { failureThreshold: number; resetTimeoutMs: number; + onStateChange?: (state: CircuitBreakerState, event: 'success' | 'failure') => void; } export class CircuitBreaker { @@ -49,6 +50,7 @@ export class CircuitBreaker { } this.failureCount = 0; this.state = 'CLOSED'; + this.config.onStateChange?.(this.state, 'success'); } private onFailure(): void { @@ -61,5 +63,12 @@ export class CircuitBreaker { `Circuit breaker OPEN after ${this.failureCount} 
failures (reset in ${this.config.resetTimeoutMs}ms)` ); } + this.config.onStateChange?.(this.state, 'failure'); + } + + reset(): void { + this.failureCount = 0; + this.state = 'CLOSED'; + this.logger.log('Circuit breaker manually reset to CLOSED'); } } diff --git a/src/runtime/mock-runtime.provider.ts b/src/runtime/mock-runtime.provider.ts index 6a9ddc1..a7b22f5 100644 --- a/src/runtime/mock-runtime.provider.ts +++ b/src/runtime/mock-runtime.provider.ts @@ -11,10 +11,12 @@ import { RuntimeInitializeResult, RuntimeManifestResult, RuntimeModeDescriptor, + RuntimeOpenSessionRequest, RuntimeProvider, RuntimeRootDescriptor, RuntimeSendRequest, RuntimeSendResult, + RuntimeSessionHandle, RuntimeSessionSnapshot, RuntimeStartSessionRequest, RuntimeStartSessionResult, @@ -34,6 +36,48 @@ export class MockRuntimeProvider implements RuntimeProvider { }; } + openSession(req: RuntimeOpenSessionRequest): RuntimeSessionHandle { + const sessionId = randomUUID(); + const initiator = req.execution.session.participants[0]?.id ?? 
'mock-initiator'; + const ack = this.makeAck(sessionId); + + const events: AsyncIterable = { + [Symbol.asyncIterator]() { + let done = false; + return { + async next(): Promise> { + if (done) return { done: true, value: undefined }; + done = true; + return { + done: false, + value: { + kind: 'stream-status', + receivedAt: new Date().toISOString(), + streamStatus: { status: 'opened' } + } + }; + }, + async return(): Promise> { + done = true; + return { done: true, value: undefined }; + } + }; + } + }; + + return { + send: () => { /* no-op */ }, + events, + closeWrite: () => { /* no-op */ }, + abort: () => { /* no-op */ }, + sessionAck: Promise.resolve({ + runtimeSessionId: sessionId, + initiator, + ack + }) + }; + } + async startSession(req: RuntimeStartSessionRequest): Promise { const sessionId = randomUUID(); return { diff --git a/src/runtime/runtime-provider.registry.ts b/src/runtime/runtime-provider.registry.ts index 7620d9e..1660783 100644 --- a/src/runtime/runtime-provider.registry.ts +++ b/src/runtime/runtime-provider.registry.ts @@ -1,10 +1,11 @@ import { Injectable, NotFoundException } from '@nestjs/common'; -import { RuntimeProvider } from '../contracts/runtime'; +import { RuntimeCapabilities, RuntimeProvider } from '../contracts/runtime'; import { RustRuntimeProvider } from './rust-runtime.provider'; @Injectable() export class RuntimeProviderRegistry { private readonly providers = new Map(); + private readonly capabilities = new Map(); constructor(private readonly rustProvider: RustRuntimeProvider) { this.register(rustProvider); @@ -22,6 +23,14 @@ export class RuntimeProviderRegistry { return provider; } + setCapabilities(kind: string, caps: RuntimeCapabilities): void { + this.capabilities.set(kind, caps); + } + + getCapabilities(kind: string): RuntimeCapabilities | undefined { + return this.capabilities.get(kind); + } + listKinds(): string[] { return Array.from(this.providers.keys()); } diff --git a/src/runtime/rust-runtime.provider.ts 
b/src/runtime/rust-runtime.provider.ts index a8de6ff..546d508 100644 --- a/src/runtime/rust-runtime.provider.ts +++ b/src/runtime/rust-runtime.provider.ts @@ -10,21 +10,26 @@ import { RuntimeAck, RuntimeCancelResult, RuntimeCancelSessionRequest, + RuntimeEnvelope, RuntimeGetSessionRequest, RuntimeHealth, RuntimeInitializeRequest, RuntimeInitializeResult, RuntimeManifestResult, RuntimeModeDescriptor, + RuntimeOpenSessionRequest, RuntimeProvider, RuntimeRootDescriptor, RuntimeSendRequest, RuntimeSendResult, + RuntimeSessionHandle, RuntimeSessionSnapshot, RuntimeStartSessionRequest, RuntimeStartSessionResult, RuntimeStreamSessionRequest } from '../contracts/runtime'; +import { AppException } from '../errors/app-exception'; +import { ErrorCode } from '../errors/error-codes'; import { CircuitBreaker } from './circuit-breaker'; import { ProtoRegistryService } from './proto-registry.service'; import { RuntimeCredentialResolverService } from './runtime-credential-resolver.service'; @@ -50,6 +55,10 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { return this.circuitBreaker.getState(); } + resetCircuitBreaker(): void { + this.circuitBreaker.reset(); + } + onModuleInit(): void { this.circuitBreaker = new CircuitBreaker({ failureThreshold: this.config.runtimeCircuitBreakerThreshold, @@ -155,6 +164,13 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { ); const ack = this.fromAck(response.ack); + if (!ack.ok && ack.error) { + throw new AppException( + ErrorCode.RUNTIME_UNAVAILABLE, + `Runtime rejected SessionStart: [${ack.error.code}] ${ack.error.message}`, + 502 + ); + } return { runtimeSessionId: ack.sessionId || runtimeSessionId, initiator: creds.sender, @@ -162,6 +178,209 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { }; } + openSession(req: RuntimeOpenSessionRequest): RuntimeSessionHandle { + const initiator = this.chooseInitiator(req.execution); + const participant = 
this.findParticipant(req.execution, initiator); + const runtimeSessionId = randomUUID(); + + const payload = this.protoRegistry.encodeMessage('macp.v1.SessionStartPayload', { + intent: req.execution.session.metadata?.intent ?? '', + participants: req.execution.session.participants.map((item) => item.id), + mode_version: req.execution.session.modeVersion, + configuration_version: req.execution.session.configurationVersion, + policy_version: req.execution.session.policyVersion ?? '', + ttl_ms: req.execution.session.ttlMs, + context: this.protoRegistry.encodeSessionContext( + req.execution.session.context, + req.execution.session.contextEnvelope + ), + roots: (req.execution.session.roots ?? []).map((root) => ({ uri: root.uri, name: root.name ?? '' })) + }); + + const sessionStartEnvelope = this.buildEnvelope({ + mode: req.execution.session.modeName, + messageType: 'SessionStart', + messageId: randomUUID(), + sessionId: runtimeSessionId, + sender: initiator, + payload + }); + + // Event-driven async queue for the bidirectional stream + const buffer: RawRuntimeEvent[] = []; + let resolveWait: (() => void) | null = null; + let ended = false; + let streamFailure: Error | null = null; + let grpcCall: any = null; + + const notify = () => { + if (resolveWait) { + const r = resolveWait; + resolveWait = null; + r(); + } + }; + + const waitForItem = (): Promise => + new Promise((r) => { + if (buffer.length > 0 || ended) { + r(); + } else { + resolveWait = r; + } + }); + + // Session ack promise — resolved when we receive the SessionStart echo + let resolveSessionAck: (result: RuntimeStartSessionResult) => void; + let rejectSessionAck: (err: Error) => void; + const sessionAck = new Promise((resolve, reject) => { + resolveSessionAck = resolve; + rejectSessionAck = reject; + }); + + let sessionAckSettled = false; + + // Launch the bidirectional stream asynchronously + const launch = async () => { + try { + const creds = await this.credentialResolver.resolve({ + runtimeKind: 
this.kind, + requester: req.execution.execution?.requester, + participant, + fallbackSender: initiator + }); + + const metadata = this.buildMetadata(creds.metadata); + const streamMethod = this.getClientMethod('StreamSession'); + grpcCall = streamMethod.call(this.client, metadata); + + grpcCall.on('data', (chunk: any) => { + const envelope = this.fromEnvelope(chunk.envelope); + const event: RawRuntimeEvent = { + kind: 'stream-envelope', + receivedAt: new Date().toISOString(), + envelope + }; + + // First envelope back is the SessionStart echo — resolve the ack + if (!sessionAckSettled && envelope.messageType === 'SessionStart') { + sessionAckSettled = true; + resolveSessionAck({ + runtimeSessionId: envelope.sessionId || runtimeSessionId, + initiator: creds.sender, + ack: { + ok: true, + duplicate: false, + messageId: envelope.messageId, + sessionId: envelope.sessionId || runtimeSessionId, + acceptedAtUnixMs: envelope.timestampUnixMs, + sessionState: 'SESSION_STATE_OPEN' + } + }); + } + + buffer.push(event); + notify(); + }); + + grpcCall.on('error', (error: Error) => { + streamFailure = error; + ended = true; + if (!sessionAckSettled) { + sessionAckSettled = true; + rejectSessionAck(error); + } + notify(); + }); + + grpcCall.on('end', () => { + ended = true; + if (!sessionAckSettled) { + sessionAckSettled = true; + rejectSessionAck(new Error('stream ended before SessionStart ack')); + } + notify(); + }); + + // Write the SessionStart envelope as the first frame + grpcCall.write({ envelope: this.toGrpcEnvelope(sessionStartEnvelope) }); + + } catch (error) { + ended = true; + if (!sessionAckSettled) { + sessionAckSettled = true; + rejectSessionAck(error instanceof Error ? 
error : new Error(String(error))); + } + notify(); + } + }; + + void launch(); + + // Build the async iterable for events + const events: AsyncIterable = { + [Symbol.asyncIterator]() { + let started = false; + return { + async next(): Promise> { + if (!started) { + started = true; + return { + done: false, + value: { + kind: 'stream-status', + receivedAt: new Date().toISOString(), + streamStatus: { status: 'opened' } + } + }; + } + + while (true) { + if (buffer.length > 0) { + return { done: false, value: buffer.shift()! }; + } + if (ended) { + if (streamFailure) throw streamFailure; + return { done: true, value: undefined }; + } + await waitForItem(); + } + }, + async return(): Promise> { + if (grpcCall) { + try { grpcCall.cancel(); } catch { /* ignore */ } + } + return { done: true, value: undefined }; + } + }; + } + }; + + const handle: RuntimeSessionHandle = { + send: (envelope: RuntimeEnvelope) => { + if (grpcCall && !ended) { + grpcCall.write({ envelope: this.toGrpcEnvelope(envelope) }); + } + }, + events, + closeWrite: () => { + if (grpcCall && !ended) { + grpcCall.end(); + } + }, + abort: () => { + ended = true; + if (grpcCall) { + try { grpcCall.cancel(); } catch { /* ignore */ } + } + notify(); + }, + sessionAck + }; + + return handle; + } + async send(req: RuntimeSendRequest): Promise { const participant = { id: req.from } as ParticipantRef; const creds = await this.credentialResolver.resolve({ @@ -185,10 +404,15 @@ export class RustRuntimeProvider implements RuntimeProvider, OnModuleInit { this.buildMetadata(creds.metadata) ); - return { - ack: this.fromAck(response.ack), - envelope - }; + const ack = this.fromAck(response.ack); + if (!ack.ok && ack.error) { + throw new AppException( + ErrorCode.RUNTIME_UNAVAILABLE, + `Runtime rejected message: [${ack.error.code}] ${ack.error.message}`, + 502 + ); + } + return { ack, envelope }; } async *streamSession(req: RuntimeStreamSessionRequest): AsyncIterable { diff --git a/src/storage/artifact.repository.ts 
b/src/storage/artifact.repository.ts index 7b90550..5dfbd54 100644 --- a/src/storage/artifact.repository.ts +++ b/src/storage/artifact.repository.ts @@ -24,6 +24,15 @@ export class ArtifactRepository { return { ...input, id, createdAt } satisfies Artifact; } + async findById(id: string) { + const rows = await this.database.db + .select() + .from(runArtifacts) + .where(eq(runArtifacts.id, id)) + .limit(1); + return rows[0] ?? null; + } + async listByRunId(runId: string) { return this.database.db .select() diff --git a/src/storage/run.repository.ts b/src/storage/run.repository.ts index de139cc..db274e2 100644 --- a/src/storage/run.repository.ts +++ b/src/storage/run.repository.ts @@ -175,6 +175,7 @@ export class RunRepository { sortBy?: 'createdAt' | 'updatedAt'; sortOrder?: 'asc' | 'desc'; includeSandbox?: boolean; + includeArchived?: boolean; }) { const conditions: SQL[] = []; @@ -194,6 +195,9 @@ export class RunRepository { if (!filters.includeSandbox) { conditions.push(sql`${runs.mode} != 'sandbox'`); } + if (!filters.includeArchived) { + conditions.push(sql`NOT (${runs.tags} @> '["archived"]'::jsonb)`); + } const sortCol = filters.sortBy === 'updatedAt' ? runs.updatedAt : runs.createdAt; const orderFn = filters.sortOrder === 'asc' ? 
asc : desc; @@ -208,4 +212,45 @@ export class RunRepository { return query; } + + async listCount(filters: { + status?: RunStatus; + tags?: string[]; + createdAfter?: string; + createdBefore?: string; + includeSandbox?: boolean; + includeArchived?: boolean; + }): Promise { + const conditions: SQL[] = []; + if (filters.status) conditions.push(eq(runs.status, filters.status)); + if (filters.tags && filters.tags.length > 0) { + conditions.push(sql`${runs.tags} @> ${JSON.stringify(filters.tags)}::jsonb`); + } + if (filters.createdAfter) conditions.push(gt(runs.createdAt, filters.createdAfter)); + if (filters.createdBefore) conditions.push(lt(runs.createdAt, filters.createdBefore)); + if (!filters.includeSandbox) conditions.push(sql`${runs.mode} != 'sandbox'`); + if (!filters.includeArchived) { + conditions.push(sql`NOT (${runs.tags} @> '["archived"]'::jsonb)`); + } + const result = await this.database.db + .select({ count: sql`count(*)::int` }) + .from(runs) + .where(conditions.length > 0 ? and(...conditions) : undefined); + return result[0]?.count ?? 
0; + } + + async delete(id: string): Promise { + await this.database.db.delete(runs).where(eq(runs.id, id)); + } + + async addTag(id: string, tag: string): Promise { + await this.database.db.execute(sql` + UPDATE runs + SET tags = tags || ${JSON.stringify([tag])}::jsonb, + updated_at = now() + WHERE id = ${id} + AND NOT (tags @> ${JSON.stringify([tag])}::jsonb) + `); + return this.findByIdOrThrow(id); + } } diff --git a/src/storage/runtime-session.repository.ts b/src/storage/runtime-session.repository.ts index 4c2f60b..59b8439 100644 --- a/src/storage/runtime-session.repository.ts +++ b/src/storage/runtime-session.repository.ts @@ -22,6 +22,7 @@ export class RuntimeSessionRepository { policyVersion: input.policyVersion, initiatorParticipantId: input.initiatorParticipantId, sessionState: input.sessionState, + expiresAt: input.expiresAt, lastSeenAt: input.lastSeenAt, metadata: input.metadata, updatedAt: new Date().toISOString() diff --git a/src/telemetry/instrumentation.service.ts b/src/telemetry/instrumentation.service.ts index 862dc1c..743e637 100644 --- a/src/telemetry/instrumentation.service.ts +++ b/src/telemetry/instrumentation.service.ts @@ -44,6 +44,16 @@ export class InstrumentationService implements OnModuleInit { help: 'Circuit breaker state (0=closed, 1=half_open, 2=open)' }); + readonly circuitBreakerFailuresTotal = new client.Counter({ + name: 'macp_circuit_breaker_failures_total', + help: 'Total circuit breaker failure count' + }); + + readonly circuitBreakerSuccessTotal = new client.Counter({ + name: 'macp_circuit_breaker_success_total', + help: 'Total circuit breaker success count' + }); + onModuleInit(): void { client.collectDefaultMetrics(); } diff --git a/src/webhooks/webhook.repository.ts b/src/webhooks/webhook.repository.ts new file mode 100644 index 0000000..681977f --- /dev/null +++ b/src/webhooks/webhook.repository.ts @@ -0,0 +1,41 @@ +import { Injectable } from '@nestjs/common'; +import { eq } from 'drizzle-orm'; +import { randomUUID } from 
'node:crypto'; +import { DatabaseService } from '../db/database.service'; +import { webhooks } from '../db/schema'; + +@Injectable() +export class WebhookRepository { + constructor(private readonly database: DatabaseService) {} + + async create(input: { url: string; events: string[]; secret: string }) { + const id = randomUUID(); + await this.database.db.insert(webhooks).values({ + id, + url: input.url, + events: input.events, + secret: input.secret, + active: 1, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString() + }); + return this.findById(id); + } + + async findById(id: string) { + const rows = await this.database.db.select().from(webhooks).where(eq(webhooks.id, id)).limit(1); + return rows[0] ?? null; + } + + async listActive() { + return this.database.db.select().from(webhooks).where(eq(webhooks.active, 1)); + } + + async list() { + return this.database.db.select().from(webhooks); + } + + async delete(id: string) { + await this.database.db.delete(webhooks).where(eq(webhooks.id, id)); + } +} diff --git a/src/webhooks/webhook.service.ts b/src/webhooks/webhook.service.ts new file mode 100644 index 0000000..4e10908 --- /dev/null +++ b/src/webhooks/webhook.service.ts @@ -0,0 +1,73 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { createHmac } from 'node:crypto'; +import { WebhookRepository } from './webhook.repository'; + +export interface WebhookPayload { + event: string; + runId: string; + status: string; + timestamp: string; + data?: Record; +} + +@Injectable() +export class WebhookService { + private readonly logger = new Logger(WebhookService.name); + + constructor(private readonly webhookRepository: WebhookRepository) {} + + async register(input: { url: string; events: string[]; secret: string }) { + return this.webhookRepository.create(input); + } + + async list() { + return this.webhookRepository.list(); + } + + async remove(id: string) { + return this.webhookRepository.delete(id); + } + + async fireEvent(payload: 
WebhookPayload): Promise { + const activeWebhooks = await this.webhookRepository.listActive(); + const matching = activeWebhooks.filter( + (wh) => wh.events.length === 0 || wh.events.includes(payload.event) + ); + + for (const webhook of matching) { + void this.deliver(webhook.url, webhook.secret, payload); + } + } + + private async deliver(url: string, secret: string, payload: WebhookPayload, attempt = 1): Promise { + const maxAttempts = 3; + const body = JSON.stringify(payload); + const signature = createHmac('sha256', secret).update(body).digest('hex'); + + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-MACP-Signature': signature, + 'X-MACP-Event': payload.event + }, + body, + signal: AbortSignal.timeout(10_000) + }); + + if (!response.ok) { + throw new Error(`webhook returned ${response.status}`); + } + } catch (error) { + this.logger.warn( + `webhook delivery to ${url} failed (attempt ${attempt}/${maxAttempts}): ${error instanceof Error ? error.message : String(error)}` + ); + if (attempt < maxAttempts) { + const backoffMs = 1000 * 2 ** (attempt - 1); + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + return this.deliver(url, secret, payload, attempt + 1); + } + } + } +}