Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/webapp/app/eventLoopMonitor.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { env } from "./env.server";
import { context, Context } from "@opentelemetry/api";
import { performance } from "node:perf_hooks";
import { logger } from "./services/logger.server";
import { signalsEmitter } from "./services/signals.server";

const THRESHOLD_NS = env.EVENT_LOOP_MONITOR_THRESHOLD_MS * 1e6;

Expand Down Expand Up @@ -110,6 +111,13 @@ function startEventLoopUtilizationMonitoring() {
lastEventLoopUtilization = currentEventLoopUtilization;
}, env.EVENT_LOOP_MONITOR_UTILIZATION_INTERVAL_MS);

signalsEmitter.on("SIGTERM", () => {
clearInterval(interval);
});
signalsEmitter.on("SIGINT", () => {
clearInterval(interval);
});

return () => {
clearInterval(interval);
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AuthenticatedEnvironment } from "../apiAuth.server";
import { logger } from "../logger.server";
import { signalsEmitter } from "../signals.server";
import { StreamIngestor, StreamResponder } from "./types";
import { LineTransformStream } from "./utils.server";
import { v1RealtimeStreams } from "./v1StreamsGlobal.server";
Expand Down Expand Up @@ -243,12 +244,17 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
}

function initializeRelayRealtimeStreams() {
return new RelayRealtimeStreams({
const service = new RelayRealtimeStreams({
ttl: 1000 * 60 * 5, // 5 minutes
cleanupInterval: 1000 * 60, // 1 minute
fallbackIngestor: v1RealtimeStreams,
fallbackResponder: v1RealtimeStreams,
});

signalsEmitter.on("SIGTERM", service.close.bind(service));
signalsEmitter.on("SIGINT", service.close.bind(service));

return service;
}

export const relayRealtimeStreams = singleton(
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { provider } from "~/v3/tracer.server";
import { logger } from "./logger.server";
import { RunsReplicationService } from "./runsReplicationService.server";
import { signalsEmitter } from "./signals.server";

export const runsReplicationInstance = singleton(
"runsReplicationInstance",
Expand Down Expand Up @@ -80,8 +80,8 @@ function initializeRunsReplicationInstance() {
});
});

process.on("SIGTERM", service.shutdown.bind(service));
process.on("SIGINT", service.shutdown.bind(service));
signalsEmitter.on("SIGTERM", service.shutdown.bind(service));
signalsEmitter.on("SIGINT", service.shutdown.bind(service));
}

return service;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ export class RunsReplicationService {
}

public async shutdown() {
if (this._isShuttingDown) return;

this._isShuttingDown = true;

this.logger.info("Initiating shutdown of runs replication service");
Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/services/signals.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { EventEmitter } from "events";
import { singleton } from "~/utils/singleton";

export type SignalsEvents = {
SIGTERM: [
{
time: Date;
signal: NodeJS.Signals;
}
];
SIGINT: [
{
time: Date;
signal: NodeJS.Signals;
}
];
};

export type SignalsEventArgs<T extends keyof SignalsEvents> = SignalsEvents[T];

export type SignalsEmitter = EventEmitter<SignalsEvents>;

function initializeSignalsEmitter() {
const emitter = new EventEmitter<SignalsEvents>();

process.on("SIGTERM", () => emitter.emit("SIGTERM", { time: new Date(), signal: "SIGTERM" }));
process.on("SIGINT", () => emitter.emit("SIGINT", { time: new Date(), signal: "SIGINT" }));

return emitter;
}

export const signalsEmitter = singleton("signalsEmitter", initializeSignalsEmitter);
40 changes: 36 additions & 4 deletions apps/webapp/app/v3/dynamicFlushScheduler.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from "@trigger.dev/core/logger";
import { nanoid } from "nanoid";
import pLimit from "p-limit";
import { signalsEmitter } from "~/services/signals.server";

export type DynamicFlushSchedulerConfig<T> = {
batchSize: number;
Expand All @@ -22,6 +23,7 @@ export class DynamicFlushScheduler<T> {
private readonly BATCH_SIZE: number;
private readonly FLUSH_INTERVAL: number;
private flushTimer: NodeJS.Timeout | null;
private metricsReporterTimer: NodeJS.Timeout | undefined;
private readonly callback: (flushId: string, batch: T[]) => Promise<void>;

// New properties for dynamic scaling
Expand All @@ -41,6 +43,7 @@ export class DynamicFlushScheduler<T> {
droppedEvents: 0,
droppedEventsByKind: new Map<string, number>(),
};
private isShuttingDown: boolean = false;

// New properties for load shedding
private readonly loadSheddingThreshold: number;
Expand Down Expand Up @@ -75,6 +78,7 @@ export class DynamicFlushScheduler<T> {

this.startFlushTimer();
this.startMetricsReporter();
this.setupShutdownHandlers();
}

addToBatch(items: T[]): void {
Expand Down Expand Up @@ -119,8 +123,8 @@ export class DynamicFlushScheduler<T> {
this.currentBatch.push(...itemsToAdd);
this.totalQueuedItems += itemsToAdd.length;

// Check if we need to create a batch
if (this.currentBatch.length >= this.currentBatchSize) {
// Check if we need to create a batch (if we are shutting down, create a batch immediately because the flush timer is stopped)
if (this.currentBatch.length >= this.currentBatchSize || this.isShuttingDown) {
this.createBatch();
}

Expand All @@ -137,6 +141,23 @@ export class DynamicFlushScheduler<T> {
this.resetFlushTimer();
}

private setupShutdownHandlers(): void {
signalsEmitter.on("SIGTERM", () =>
this.shutdown().catch((error) => {
this.logger.error("Error shutting down dynamic flush scheduler", {
error,
});
})
);
signalsEmitter.on("SIGINT", () =>
this.shutdown().catch((error) => {
this.logger.error("Error shutting down dynamic flush scheduler", {
error,
});
})
);
}

private startFlushTimer(): void {
this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL);
}
Expand All @@ -145,6 +166,9 @@ export class DynamicFlushScheduler<T> {
if (this.flushTimer) {
clearInterval(this.flushTimer);
}

if (this.isShuttingDown) return;

this.startFlushTimer();
}

Expand Down Expand Up @@ -226,7 +250,7 @@ export class DynamicFlushScheduler<T> {
}

private lastConcurrencyAdjustment: number = Date.now();

private adjustConcurrency(backOff: boolean = false): void {
const currentConcurrency = this.limiter.concurrency;
let newConcurrency = currentConcurrency;
Expand Down Expand Up @@ -281,7 +305,7 @@ export class DynamicFlushScheduler<T> {

private startMetricsReporter(): void {
// Report metrics every 30 seconds
setInterval(() => {
this.metricsReporterTimer = setInterval(() => {
const droppedByKind: Record<string, number> = {};
this.metrics.droppedEventsByKind.forEach((count, kind) => {
droppedByKind[kind] = count;
Expand Down Expand Up @@ -356,10 +380,18 @@ export class DynamicFlushScheduler<T> {

// Graceful shutdown
async shutdown(): Promise<void> {
if (this.isShuttingDown) return;

this.isShuttingDown = true;

if (this.flushTimer) {
clearInterval(this.flushTimer);
}

if (this.metricsReporterTimer) {
clearInterval(this.metricsReporterTimer);
}

// Flush any remaining items
if (this.currentBatch.length > 0) {
this.createBatch();
Expand Down
9 changes: 7 additions & 2 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import z from "zod";
import { env } from "~/env.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { signalsEmitter } from "~/services/signals.server";
import { singleton } from "~/utils/singleton";
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server";
Expand Down Expand Up @@ -112,6 +113,7 @@ export class MarQS {
private queueDequeueCooloffPeriod: Map<string, number> = new Map();
private queueDequeueCooloffCounts: Map<string, number> = new Map();
private clearCooloffPeriodInterval: NodeJS.Timeout;
isShuttingDown: boolean = false;

constructor(private readonly options: MarQSOptions) {
this.redis = options.redis;
Expand Down Expand Up @@ -151,11 +153,14 @@ export class MarQS {
}

#setupShutdownHandlers() {
process.on("SIGTERM", () => this.shutdown("SIGTERM"));
process.on("SIGINT", () => this.shutdown("SIGINT"));
signalsEmitter.on("SIGTERM", () => this.shutdown("SIGTERM"));
signalsEmitter.on("SIGINT", () => this.shutdown("SIGINT"));
}

async shutdown(signal: NodeJS.Signals) {
if (this.isShuttingDown) return;
this.isShuttingDown = true;

console.log("👇 Shutting down marqs", this.name, signal);
clearInterval(this.clearCooloffPeriodInterval);
this.#rebalanceWorkers.forEach((worker) => worker.stop());
Expand Down
33 changes: 7 additions & 26 deletions apps/webapp/app/v3/tracing.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,14 @@ export async function startSpanWithEnv<T>(
fn: (span: Span) => Promise<T>,
options?: SpanOptions
): Promise<T> {
return startSpan(
tracer,
name,
async (span) => {
try {
return await fn(span);
} catch (e) {
if (e instanceof Error) {
span.recordException(e);
} else {
span.recordException(new Error(String(e)));
}

throw e;
} finally {
span.end();
}
return startSpan(tracer, name, fn, {
...options,
attributes: {
...attributesFromAuthenticatedEnv(env),
...options?.attributes,
},
{
attributes: {
...attributesFromAuthenticatedEnv(env),
...options?.attributes,
},
kind: SpanKind.SERVER,
...options,
}
);
kind: SpanKind.SERVER,
});
}

export async function emitDebugLog(
Expand Down
Loading
Loading