Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions drizzle/0006_capabilities_and_stream.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- P1.3: Add capabilities, stream cursor, and stream timestamps to runtime_sessions
ALTER TABLE "runtime_sessions" ADD COLUMN "capabilities" JSONB NOT NULL DEFAULT '{}';
ALTER TABLE "runtime_sessions" ADD COLUMN "last_stream_cursor" INTEGER;
ALTER TABLE "runtime_sessions" ADD COLUMN "stream_connected_at" TIMESTAMP WITH TIME ZONE;
ALTER TABLE "runtime_sessions" ADD COLUMN "stream_disconnected_at" TIMESTAMP WITH TIME ZONE;
2 changes: 2 additions & 0 deletions drizzle/0007_canonical_schema_version.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- P1.4: Add schema_version to canonical events
ALTER TABLE "run_events_canonical" ADD COLUMN "schema_version" INTEGER NOT NULL DEFAULT 3;
3 changes: 3 additions & 0 deletions drizzle/0008_webhook_active_boolean.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- P1.5: Fix webhook active column from integer to boolean
ALTER TABLE "webhooks" ALTER COLUMN "active" TYPE BOOLEAN USING (active = 1);
ALTER TABLE "webhooks" ALTER COLUMN "active" SET DEFAULT true;
21 changes: 21 additions & 0 deletions drizzle/0009_outbound_messages.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- P2.2: Outbound message tracking table
CREATE TABLE "run_outbound_messages" (
"id" UUID PRIMARY KEY,
"run_id" UUID NOT NULL REFERENCES "runs"("id") ON DELETE CASCADE,
"runtime_session_id" VARCHAR(255) NOT NULL,
"message_id" VARCHAR(255) NOT NULL,
"message_type" VARCHAR(128) NOT NULL,
"category" VARCHAR(32) NOT NULL,
"sender" VARCHAR(255) NOT NULL,
"recipients" JSONB NOT NULL DEFAULT '[]',
"status" VARCHAR(32) NOT NULL DEFAULT 'queued',
"payload_descriptor" JSONB NOT NULL DEFAULT '{}',
"ack" JSONB,
"error_message" TEXT,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"accepted_at" TIMESTAMP WITH TIME ZONE,
"updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX ON "run_outbound_messages" ("message_id");
CREATE INDEX ON "run_outbound_messages" ("run_id");
CREATE INDEX ON "run_outbound_messages" ("status");
18 changes: 18 additions & 0 deletions drizzle/0010_webhook_deliveries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- P4.2: Durable webhook delivery tracking
CREATE TABLE "webhook_deliveries" (
"id" UUID PRIMARY KEY,
"webhook_id" UUID NOT NULL REFERENCES "webhooks"("id") ON DELETE CASCADE,
"event" VARCHAR(128) NOT NULL,
"run_id" UUID NOT NULL,
"payload" JSONB NOT NULL,
"status" VARCHAR(32) NOT NULL DEFAULT 'pending',
"attempts" INTEGER NOT NULL DEFAULT 0,
"last_attempt_at" TIMESTAMP WITH TIME ZONE,
"response_status" INTEGER,
"error_message" TEXT,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"delivered_at" TIMESTAMP WITH TIME ZONE
);
CREATE INDEX ON "webhook_deliveries" ("webhook_id");
CREATE INDEX ON "webhook_deliveries" ("status");
CREATE INDEX ON "webhook_deliveries" ("run_id");
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { EventRepository } from './storage/event.repository';
import { ArtifactRepository } from './storage/artifact.repository';
import { MetricsRepository } from './storage/metrics.repository';
import { ProjectionRepository } from './storage/projection.repository';
import { OutboundMessageRepository } from './storage/outbound-message.repository';
import { RunRepository } from './storage/run.repository';
import { RuntimeSessionRepository } from './storage/runtime-session.repository';
import { InstrumentationService } from './telemetry/instrumentation.service';
Expand All @@ -42,6 +43,7 @@ import { RunManagerService } from './runs/run-manager.service';
import { RunRecoveryService } from './runs/run-recovery.service';
import { StreamConsumerService } from './runs/stream-consumer.service';
import { WebhookController } from './controllers/webhook.controller';
import { WebhookDeliveryRepository } from './webhooks/webhook-delivery.repository';
import { WebhookRepository } from './webhooks/webhook.repository';
import { WebhookService } from './webhooks/webhook.service';

Expand All @@ -68,6 +70,7 @@ import { WebhookService } from './webhooks/webhook.service';
ProjectionRepository,
ArtifactRepository,
MetricsRepository,
OutboundMessageRepository,
StreamHubService,
EventNormalizerService,
ProjectionService,
Expand All @@ -82,6 +85,7 @@ import { WebhookService } from './webhooks/webhook.service';
RunRecoveryService,
RunInsightsService,
WebhookRepository,
WebhookDeliveryRepository,
WebhookService
]
})
Expand Down
7 changes: 7 additions & 0 deletions src/config/app-config.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ describe('AppConfigService', () => {
});
});

describe('clientVersion', () => {
it('should read version from package.json', () => {
const config = new AppConfigService();
expect(config.clientVersion).toBe('0.3.0');
});
});

describe('readNumber edge cases', () => {
it('should handle a valid number string for STREAM_BACKOFF_BASE_MS', () => {
process.env.STREAM_BACKOFF_BASE_MS = '500';
Expand Down
13 changes: 13 additions & 0 deletions src/config/app-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ export class AppConfigService implements OnModuleInit {
readonly nodeEnv = process.env.NODE_ENV ?? 'development';
readonly isDevelopment = this.nodeEnv === 'development';

/** Read from package.json at startup */
readonly clientVersion: string = (() => {
try {
// eslint-disable-next-line @typescript-eslint/no-require-imports
return require('../../package.json').version as string;
} catch {
return '0.0.0';
}
})();

readonly port = readNumber('PORT', 3001);
readonly host = process.env.HOST ?? '0.0.0.0';
readonly corsOrigin = process.env.CORS_ORIGIN ?? 'http://localhost:3000';
Expand Down Expand Up @@ -68,6 +78,9 @@ export class AppConfigService implements OnModuleInit {
readonly dbPoolIdleTimeout = readNumber('DB_POOL_IDLE_TIMEOUT', 30000);
readonly dbPoolConnectionTimeout = readNumber('DB_POOL_CONNECTION_TIMEOUT', 5000);

readonly streamHubStrategy = process.env.STREAM_HUB_STRATEGY ?? 'memory';
readonly redisUrl = process.env.REDIS_URL ?? '';

readonly logLevel = process.env.LOG_LEVEL ?? 'info';
readonly otelEnabled = readBoolean('OTEL_ENABLED', false);
readonly otelServiceName = process.env.OTEL_SERVICE_NAME ?? 'macp-control-plane';
Expand Down
8 changes: 8 additions & 0 deletions src/contracts/control-plane.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ export interface ProgressProjection {
}>;
}

export interface OutboundMessageSummary {
total: number;
queued: number;
accepted: number;
rejected: number;
}

export interface RunStateProjection {
run: RunSummaryProjection;
participants: ParticipantProjection[];
Expand All @@ -245,6 +252,7 @@ export interface RunStateProjection {
progress: ProgressProjection;
timeline: TimelineProjection;
trace: TraceSummary;
outboundMessages: OutboundMessageSummary;
}

export interface ReplayRequest {
Expand Down
1 change: 1 addition & 0 deletions src/contracts/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface RuntimeInitializeResult {
websiteUrl?: string;
};
supportedModes: string[];
capabilities?: RuntimeCapabilities;
}

export interface RuntimeStartSessionRequest {
Expand Down
21 changes: 21 additions & 0 deletions src/controllers/run-insights.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ import {
Body,
Controller,
Get,
Header,
Param,
ParseUUIDPipe,
Post,
Query,
Res,
ValidationPipe
} from '@nestjs/common';
import type { Response } from 'express';
import { ApiBody, ApiOkResponse, ApiOperation, ApiTags } from '@nestjs/swagger';
import { CompareRunsDto } from '../dto/compare-runs.dto';
import { ExportRunQueryDto } from '../dto/export-run-query.dto';
Expand Down Expand Up @@ -45,6 +48,24 @@ export class RunInsightsController {
});
}

@Get(':id/export/stream')
@ApiOperation({ summary: 'Stream export as JSONL (newline-delimited JSON).' })
@Header('Content-Type', 'application/x-ndjson')
async streamExport(
@Param('id', new ParseUUIDPipe()) id: string,
@Query(new ValidationPipe({ transform: true, whitelist: true })) query: ExportRunQueryDto,
@Res() res: Response
) {
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Transfer-Encoding', 'chunked');
for await (const line of this.insightsService.exportRunStream(id, {
includeRaw: query.includeRaw
})) {
res.write(line);
}
res.end();
}

@Post('compare')
@ApiOperation({ summary: 'Compare two runs side-by-side.' })
@ApiBody({ type: CompareRunsDto })
Expand Down
55 changes: 55 additions & 0 deletions src/controllers/runs.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import { EventRepository } from '../storage/event.repository';
import { ReplayService } from '../replay/replay.service';
import { StreamHubService } from '../events/stream-hub.service';
import { AppConfigService } from '../config/app-config.service';
import { ProjectionService } from '../projection/projection.service';
import { OutboundMessageRepository } from '../storage/outbound-message.repository';

describe('RunsController', () => {
let controller: RunsController;
let mockRunExecutor: {
launch: jest.Mock;
cancel: jest.Mock;
sendSignal: jest.Mock;
updateContext: jest.Mock;
};
let mockRunManager: {
listRuns: jest.Mock;
Expand All @@ -26,12 +29,19 @@ describe('RunsController', () => {
let mockReplayService: Partial<ReplayService>;
let mockStreamHub: Partial<StreamHubService>;
let mockConfig: Partial<AppConfigService>;
let mockProjectionService: {
rebuild: jest.Mock;
};
let mockOutboundMessageRepository: {
listByRunId: jest.Mock;
};

beforeEach(() => {
mockRunExecutor = {
launch: jest.fn(),
cancel: jest.fn(),
sendSignal: jest.fn(),
updateContext: jest.fn(),
};
mockRunManager = {
listRuns: jest.fn(),
Expand All @@ -48,6 +58,12 @@ describe('RunsController', () => {
mockConfig = {
streamSseHeartbeatMs: 15000,
};
mockProjectionService = {
rebuild: jest.fn(),
};
mockOutboundMessageRepository = {
listByRunId: jest.fn(),
};

controller = new RunsController(
mockRunExecutor as unknown as RunExecutorService,
Expand All @@ -56,6 +72,8 @@ describe('RunsController', () => {
mockReplayService as unknown as ReplayService,
mockStreamHub as unknown as StreamHubService,
mockConfig as unknown as AppConfigService,
mockProjectionService as unknown as ProjectionService,
mockOutboundMessageRepository as unknown as OutboundMessageRepository,
);
});

Expand Down Expand Up @@ -249,4 +267,41 @@ describe('RunsController', () => {
});
});

// ===========================================================================
// updateContext
// ===========================================================================
describe('updateContext', () => {
it('delegates to runExecutor.updateContext', async () => {
const contextResult = { messageId: 'msg-ctx', ack: { ok: true } };
mockRunExecutor.updateContext.mockResolvedValue(contextResult);

const body = { from: 'agent-1', context: { key: 'value' } };
const result = await controller.updateContext('run-1', body as any);

expect(mockRunExecutor.updateContext).toHaveBeenCalledWith('run-1', body);
expect(result).toEqual(contextResult);
});
});

// ===========================================================================
// rebuildProjection
// ===========================================================================
describe('rebuildProjection', () => {
it('fetches events and delegates to projectionService.rebuild', async () => {
const fakeRun = { id: 'run-1', status: 'completed' };
mockRunManager.getRun.mockResolvedValue(fakeRun);
const events = [{ id: 'e1', seq: 1, type: 'run.created' }];
mockEventRepository.listCanonicalByRun.mockResolvedValue(events);
const projection = { run: { runId: 'run-1', status: 'completed' } };
mockProjectionService.rebuild.mockResolvedValue(projection);

const result = await controller.rebuildProjection('run-1');

expect(mockRunManager.getRun).toHaveBeenCalledWith('run-1');
expect(mockEventRepository.listCanonicalByRun).toHaveBeenCalledWith('run-1', 0, 100000);
expect(mockProjectionService.rebuild).toHaveBeenCalledWith('run-1', events);
expect(result).toEqual(projection);
});
});

});
40 changes: 39 additions & 1 deletion src/controllers/runs.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import { ReplayRequestDto } from '../dto/replay-request.dto';
import { CloneRunDto } from '../dto/clone-run.dto';
import { SendSignalDto } from '../dto/send-signal.dto';
import { StreamRunQueryDto } from '../dto/stream-run-query.dto';
import { UpdateContextDto } from '../dto/update-context.dto';
import { ProjectionService } from '../projection/projection.service';
import { OutboundMessageRepository } from '../storage/outbound-message.repository';
import {
CanonicalEventDto,
CreateRunResponseDto,
Expand All @@ -52,9 +55,20 @@ export class RunsController {
private readonly eventRepository: EventRepository,
private readonly replayService: ReplayService,
private readonly streamHub: StreamHubService,
private readonly config: AppConfigService
private readonly config: AppConfigService,
private readonly projectionService: ProjectionService,
private readonly outboundMessageRepository: OutboundMessageRepository
) {}

@Post('validate')
@ApiOperation({ summary: 'Preflight validation of an execution request without creating a run.' })
@ApiBody({ type: ExecutionRequestDto })
async validateRequest(
@Body(new ValidationPipe({ transform: true, whitelist: true })) body: ExecutionRequestDto
) {
return this.runExecutor.validate(body);
}

@Get()
@ApiOperation({ summary: 'List runs with optional filtering and pagination.' })
async listRuns(
Expand Down Expand Up @@ -293,6 +307,30 @@ export class RunsController {
return this.runManager.deleteRun(id);
}

@Get(':id/messages')
@ApiOperation({ summary: 'List outbound messages for a run.' })
async getRunMessages(@Param('id', new ParseUUIDPipe()) id: string) {
return this.outboundMessageRepository.listByRunId(id);
}

@Post(':id/context')
@ApiOperation({ summary: 'Update context during a running session.' })
@ApiBody({ type: UpdateContextDto })
async updateContext(
@Param('id', new ParseUUIDPipe()) id: string,
@Body(new ValidationPipe({ transform: true, whitelist: true })) body: UpdateContextDto
) {
return this.runExecutor.updateContext(id, body);
}

@Post(':id/projection/rebuild')
@ApiOperation({ summary: 'Rebuild the projection from persisted canonical events.' })
async rebuildProjection(@Param('id', new ParseUUIDPipe()) id: string) {
await this.runManager.getRun(id);
const events = await this.eventRepository.listCanonicalByRun(id, 0, 100000);
return this.projectionService.rebuild(id, events as any);
}

@Post(':id/archive')
@ApiOperation({ summary: 'Archive a run, excluding it from default listings.' })
async archiveRun(@Param('id', new ParseUUIDPipe()) id: string) {
Expand Down
Loading
Loading