From 461afb9ead93862b864749487dc0b86ae5af9cd6 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 1 Dec 2025 15:38:37 +0000 Subject: [PATCH] fix(clickhouse): ensure start_time is never older than X ms to prevent old partition merge issues --- apps/webapp/app/env.server.ts | 8 +- .../clickhouseEventRepository.server.ts | 161 +++++++++++++++--- ...lickhouseEventRepositoryInstance.server.ts | 1 + references/hello-world/src/trigger/example.ts | 2 +- 4 files changed, 145 insertions(+), 27 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 41dcd620a2..4a56bf37be 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1149,8 +1149,14 @@ const EnvironmentSchema = z EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT: z.string().default("1"), EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760), EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000), + EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS: z.coerce + .number() + .int() + .default(60_000 * 5), // 5 minutes EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(), - EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse", "clickhouse_v2"]).default("postgres"), + EVENT_REPOSITORY_DEFAULT_STORE: z + .enum(["postgres", "clickhouse", "clickhouse_v2"]) + .default("postgres"), EVENT_REPOSITORY_DEBUG_LOGS_DISABLED: BoolEnv.default(false), EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000), EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000), diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index 1731937605..f3c0762e65 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -73,6 +73,12 @@ export type ClickhouseEventRepositoryConfig = { maximumTraceSummaryViewCount?: number; maximumTraceDetailedSummaryViewCount?: number; maximumLiveReloadingSetting?: number; + /** + * Maximum age in milliseconds for start_time. If start_time is older than this threshold, + * it will be clamped to the current time when creating events. + * If not provided, no clamping will be done. + */ + startTimeMaxAgeMs?: number; /** * The version of the ClickHouse task_events table to use. * - "v1": Uses task_events_v1 (partitioned by start_time) @@ -123,6 +129,54 @@ export class ClickhouseEventRepository implements IEventRepository { return this._config.maximumLiveReloadingSetting ?? 1000; } + /** + * Clamps a start time (in nanoseconds) to now if it's too far in the past. + * Returns the clamped value as a bigint. + */ + #clampStartTimeNanoseconds(startTimeNs: bigint): bigint { + if (!this._config.startTimeMaxAgeMs) { + return startTimeNs; + } + + const nowNs = getNowInNanoseconds(); + const maxAgeNs = BigInt(this._config.startTimeMaxAgeMs) * 1_000_000n; // ms to ns + const minAllowedStartTime = nowNs - maxAgeNs; + + if (startTimeNs < minAllowedStartTime) { + return nowNs; + } + + return startTimeNs; + } + + /** + * Clamps a start time string (nanoseconds as string) to now if it's too far in the past. + * Returns the formatted string for ClickHouse. + */ + #clampAndFormatStartTime(startTimeNsString: string): string { + const startTimeNs = BigInt(startTimeNsString); + const clampedNs = this.#clampStartTimeNanoseconds(startTimeNs); + return formatClickhouseDate64NanosecondsEpochString(clampedNs.toString()); + } + + /** + * Clamps a Date start time to now if it's too far in the past. + */ + #clampStartTimeDate(startTime: Date): Date { + if (!this._config.startTimeMaxAgeMs) { + return startTime; + } + + const now = new Date(); + const minAllowedStartTime = new Date(now.getTime() - this._config.startTimeMaxAgeMs); + + if (startTime < minAllowedStartTime) { + return now; + } + + return startTime; + } + async #flushBatch(flushId: string, events: (TaskEventV1Input | TaskEventV2Input)[]) { await startSpan(this._tracer, "flushBatch", async (span) => { span.setAttribute("flush_id", flushId); @@ -197,7 +251,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: formatClickhouseDate64NanosecondsEpochString(event.startTime.toString()), + start_time: this.#clampAndFormatStartTime(event.startTime.toString()), duration: formatClickhouseUnsignedIntegerString(event.duration ?? 0), trace_id: event.traceId, span_id: event.spanId, @@ -266,7 +320,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: formatClickhouseDate64NanosecondsEpochString( + start_time: this.#clampAndFormatStartTime( convertDateToNanoseconds(spanEvent.time).toString() ), duration: "0", // Events have no duration @@ -302,7 +356,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: formatClickhouseDate64NanosecondsEpochString( + start_time: this.#clampAndFormatStartTime( convertDateToNanoseconds(spanEvent.time).toString() ), duration: "0", // Events have no duration @@ -332,7 +386,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: formatClickhouseDate64NanosecondsEpochString( + start_time: this.#clampAndFormatStartTime( convertDateToNanoseconds(spanEvent.time).toString() ), duration: "0", // Events have no duration @@ -366,7 +420,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: formatClickhouseDate64NanosecondsEpochString( + start_time: this.#clampAndFormatStartTime( convertDateToNanoseconds(spanEvent.time).toString() ), duration: "0", // Events have no duration @@ -534,7 +588,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: options.environment.projectId, task_identifier: options.taskSlug, run_id: options.attributes.runId, - start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), + start_time: this.#clampAndFormatStartTime(startTime.toString()), duration: formatClickhouseUnsignedIntegerString(duration), trace_id: traceId, span_id: spanId, @@ -635,7 +689,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: options.environment.projectId, task_identifier: options.taskSlug, run_id: options.attributes.runId, - start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), + start_time: this.#clampAndFormatStartTime(startTime.toString()), duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration), trace_id: traceId, span_id: spanId, @@ -669,7 +723,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: options.environment.projectId, task_identifier: options.taskSlug, run_id: options.attributes.runId, - start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), + start_time: this.#clampAndFormatStartTime(startTime.toString()), duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration), trace_id: traceId, span_id: spanId, @@ -707,7 +761,8 @@ export class ClickhouseEventRepository implements IEventRepository { return; } - const startTime = convertDateToNanoseconds(run.createdAt); + const clampedCreatedAt = this.#clampStartTimeDate(run.createdAt); + const startTime = convertDateToNanoseconds(clampedCreatedAt); const expiresAt = convertDateToClickhouseDateTime( new Date(run.createdAt.getTime() + 30 * 24 * 60 * 60 * 1000) ); @@ -757,7 +812,8 @@ export class ClickhouseEventRepository implements IEventRepository { return; } - const startTime = convertDateToNanoseconds(spanCreatedAt); + const clampedSpanCreatedAt = this.#clampStartTimeDate(spanCreatedAt); + const startTime = convertDateToNanoseconds(clampedSpanCreatedAt); const expiresAt = convertDateToClickhouseDateTime( new Date(run.createdAt.getTime() + 30 * 24 * 60 * 60 * 1000) ); @@ -799,7 +855,8 @@ export class ClickhouseEventRepository implements IEventRepository { return; } - const startTime = convertDateToNanoseconds(run.createdAt); + const clampedCreatedAt = this.#clampStartTimeDate(run.createdAt); + const startTime = convertDateToNanoseconds(clampedCreatedAt); const expiresAt = convertDateToClickhouseDateTime( new Date(run.createdAt.getTime() + 30 * 24 * 60 * 60 * 1000) ); @@ -847,7 +904,8 @@ export class ClickhouseEventRepository implements IEventRepository { return; } - const startTime = convertDateToNanoseconds(run.createdAt); + const clampedCreatedAt = this.#clampStartTimeDate(run.createdAt); + const startTime = convertDateToNanoseconds(clampedCreatedAt); const expiresAt = convertDateToClickhouseDateTime( new Date(run.createdAt.getTime() + 30 * 24 * 60 * 60 * 1000) ); @@ -895,7 +953,8 @@ export class ClickhouseEventRepository implements IEventRepository { return; } - const startTime = convertDateToNanoseconds(endTime ?? new Date()); + const clampedEndTime = this.#clampStartTimeDate(endTime ?? new Date()); + const startTime = convertDateToNanoseconds(clampedEndTime); const expiresAt = convertDateToClickhouseDateTime( new Date(run.createdAt.getTime() + 30 * 24 * 60 * 60 * 1000) ); @@ -939,7 +998,8 @@ export class ClickhouseEventRepository implements IEventRepository { return; } - const startTime = convertDateToNanoseconds(run.createdAt); + const clampedCreatedAt = this.#clampStartTimeDate(run.createdAt); + const startTime = convertDateToNanoseconds(clampedCreatedAt); const expiresAt = convertDateToClickhouseDateTime( new Date(run.createdAt.getTime() + 30 * 24 * 60 * 60 * 1000) ); @@ -980,6 +1040,9 @@ export class ClickhouseEventRepository implements IEventRepository { options?: { includeDebugLogs?: boolean } ): Promise { const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); + const endCreatedAtWithBuffer = endCreatedAt + ? new Date(endCreatedAt.getTime() + 60_000) + : undefined; const queryBuilder = this._version === "v2" @@ -992,9 +1055,9 @@ export class ClickhouseEventRepository implements IEventRepository { startCreatedAt: convertDateToNanoseconds(startCreatedAtWithBuffer).toString(), }); - if (endCreatedAt) { + if (endCreatedAtWithBuffer) { queryBuilder.where("start_time <= {endCreatedAt: String}", { - endCreatedAt: convertDateToNanoseconds(endCreatedAt).toString(), + endCreatedAt: convertDateToNanoseconds(endCreatedAtWithBuffer).toString(), }); } @@ -1155,8 +1218,20 @@ export class ClickhouseEventRepository implements IEventRepository { } let span: SpanDetail | undefined; + let earliestStartTime: Date | undefined; for (const record of records) { + const recordStartTime = convertClickhouseDateTime64ToJsDate(record.start_time); + + // Track the earliest start time across all records + if ( + record.kind !== "ANCESTOR_OVERRIDE" && + record.kind !== "SPAN_EVENT" && + (!earliestStartTime || recordStartTime < earliestStartTime) + ) { + earliestStartTime = recordStartTime; + } + if (!span) { span = { spanId: spanId, @@ -1166,7 +1241,7 @@ export class ClickhouseEventRepository implements IEventRepository { isPartial: true, // Partial by default, can only be set to false isCancelled: false, level: kindToLevel(record.kind), - startTime: convertClickhouseDateTime64ToJsDate(record.start_time), + startTime: recordStartTime, duration: typeof record.duration === "number" ? record.duration : Number(record.duration), events: [], style: {}, @@ -1193,7 +1268,7 @@ export class ClickhouseEventRepository implements IEventRepository { // We need to add an event to the span span.events.push({ name: record.message, - time: convertClickhouseDateTime64ToJsDate(record.start_time), + time: recordStartTime, properties: parsedMetadata ?? {}, }); } @@ -1241,7 +1316,6 @@ export class ClickhouseEventRepository implements IEventRepository { span.duration = typeof record.duration === "number" ? record.duration : Number(record.duration); } else { - span.startTime = convertClickhouseDateTime64ToJsDate(record.start_time); span.message = record.message; } } @@ -1262,6 +1336,11 @@ export class ClickhouseEventRepository implements IEventRepository { } } + // Always use the earliest start time found across all records + if (span && earliestStartTime) { + span.startTime = earliestStartTime; + } + return span; } @@ -1403,8 +1482,20 @@ export class ClickhouseEventRepository implements IEventRepository { } let span: SpanSummary | undefined; + let earliestStartTime: Date | undefined; for (const record of records) { + const recordStartTime = convertClickhouseDateTime64ToJsDate(record.start_time); + + // Track the earliest start time across all records, except for ancestor overrides and span events + if ( + record.kind !== "ANCESTOR_OVERRIDE" && + record.kind !== "SPAN_EVENT" && + (!earliestStartTime || recordStartTime < earliestStartTime) + ) { + earliestStartTime = recordStartTime; + } + if (!span) { span = { id: spanId, @@ -1419,7 +1510,7 @@ export class ClickhouseEventRepository implements IEventRepository { isPartial: true, // Partial by default, can only be set to false isCancelled: false, isDebug: record.kind === "DEBUG_EVENT", - startTime: convertClickhouseDateTime64ToJsDate(record.start_time), + startTime: recordStartTime, level: kindToLevel(record.kind), events: [], }, @@ -1446,7 +1537,7 @@ export class ClickhouseEventRepository implements IEventRepository { // We need to add an event to the span span.data.events.push({ name: record.message, - time: convertClickhouseDateTime64ToJsDate(record.start_time), + time: recordStartTime, properties: parsedMetadata ?? {}, }); } @@ -1472,12 +1563,16 @@ export class ClickhouseEventRepository implements IEventRepository { span.data.duration = typeof record.duration === "number" ? record.duration : Number(record.duration); } else { - span.data.startTime = convertClickhouseDateTime64ToJsDate(record.start_time); span.data.message = record.message; } } } + // Always use the earliest start time found across all records + if (span && earliestStartTime) { + span.data.startTime = earliestStartTime; + } + return span; } @@ -1643,8 +1738,20 @@ export class ClickhouseEventRepository implements IEventRepository { } let span: SpanDetailedSummary | undefined; + let earliestStartTime: Date | undefined; for (const record of records) { + const recordStartTime = convertClickhouseDateTime64ToJsDate(record.start_time); + + // Track the earliest start time across all records + if ( + record.kind !== "ANCESTOR_OVERRIDE" && + record.kind !== "SPAN_EVENT" && + (!earliestStartTime || recordStartTime < earliestStartTime) + ) { + earliestStartTime = recordStartTime; + } + if (!span) { span = { id: spanId, @@ -1658,7 +1765,7 @@ export class ClickhouseEventRepository implements IEventRepository { isError: false, isPartial: true, // Partial by default, can only be set to false isCancelled: false, - startTime: convertClickhouseDateTime64ToJsDate(record.start_time), + startTime: recordStartTime, level: kindToLevel(record.kind), events: [], }, @@ -1686,7 +1793,7 @@ export class ClickhouseEventRepository implements IEventRepository { // We need to add an event to the span span.data.events.push({ name: record.message, - time: convertClickhouseDateTime64ToJsDate(record.start_time), + time: recordStartTime, properties: parsedMetadata ?? {}, }); } @@ -1708,12 +1815,16 @@ export class ClickhouseEventRepository implements IEventRepository { span.data.duration = typeof record.duration === "number" ? record.duration : Number(record.duration); } else { - span.data.startTime = convertClickhouseDateTime64ToJsDate(record.start_time); span.data.message = record.message; } } } + // Always use the earliest start time found across all records + if (span && earliestStartTime) { + span.data.startTime = earliestStartTime; + } + return span; } diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts index db9d0bfb37..36fb13c6e9 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts @@ -63,6 +63,7 @@ function initializeClickhouseRepository() { waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, version: "v1", }); diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 4deb1476f7..8ecfdb033e 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -30,7 +30,7 @@ export const helloWorldTask = task({ logger.debug("some log", { span }); }); - await setTimeout(payload.sleepFor ?? 180_000); + await setTimeout(payload.sleepFor ?? 5_000); if (payload.throwError) { throw new Error("Forced error to cause a retry");