From 9ea26921bfe20282c53d0d202e7f78fcfcd33d5c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 1 Dec 2025 11:27:45 +0000 Subject: [PATCH] fix(clickhouse): partition by insertion date to prevent "Too many parts" errors when partitioning by start time --- apps/webapp/app/env.server.ts | 2 +- .../clickhouseEventRepository.server.ts | 80 +++++++++++++-- ...lickhouseEventRepositoryInstance.server.ts | 64 ++++++++++-- .../app/v3/eventRepository/index.server.ts | 33 ++++++- apps/webapp/app/v3/featureFlags.server.ts | 2 +- apps/webapp/app/v3/otlpExporter.server.ts | 11 ++- .../schema/010_add_task_events_v2.sql | 55 +++++++++++ .../011_add_task_events_v2_table_mvs.sql | 19 ++++ internal-packages/clickhouse/src/index.ts | 13 +++ .../clickhouse/src/taskEvents.ts | 99 +++++++++++++++++++ 10 files changed, 356 insertions(+), 22 deletions(-) create mode 100644 internal-packages/clickhouse/schema/010_add_task_events_v2.sql create mode 100644 internal-packages/clickhouse/schema/011_add_task_events_v2_table_mvs.sql diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 207a2bcfab..c1cf4f22ad 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1150,7 +1150,7 @@ const EnvironmentSchema = z EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760), EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000), EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(), - EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"), + EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse", "clickhouse_v2"]).default("postgres"), EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000), EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000), EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000), diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index 8bf841c0bc..1731937605 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -4,6 +4,7 @@ import type { TaskEventDetailsV1Result, TaskEventSummaryV1Result, TaskEventV1Input, + TaskEventV2Input, } from "@internal/clickhouse"; import { Attributes, startSpan, trace, Tracer } from "@internal/tracing"; import { createJsonErrorObject } from "@trigger.dev/core/v3/errors"; @@ -72,6 +73,12 @@ export type ClickhouseEventRepositoryConfig = { maximumTraceSummaryViewCount?: number; maximumTraceDetailedSummaryViewCount?: number; maximumLiveReloadingSetting?: number; + /** + * The version of the ClickHouse task_events table to use. + * - "v1": Uses task_events_v1 (partitioned by start_time) + * - "v2": Uses task_events_v2 (partitioned by inserted_at to avoid "too many parts" errors) + */ + version?: "v1" | "v2"; }; /** @@ -81,13 +88,15 @@ export type ClickhouseEventRepositoryConfig = { export class ClickhouseEventRepository implements IEventRepository { private _clickhouse: ClickHouse; private _config: ClickhouseEventRepositoryConfig; - private readonly _flushScheduler: DynamicFlushScheduler; + private readonly _flushScheduler: DynamicFlushScheduler; private _tracer: Tracer; + private _version: "v1" | "v2"; constructor(config: ClickhouseEventRepositoryConfig) { this._clickhouse = config.clickhouse; this._config = config; this._tracer = config.tracer ?? trace.getTracer("clickhouseEventRepo", "0.0.1"); + this._version = config.version ?? "v1"; this._flushScheduler = new DynamicFlushScheduler({ batchSize: config.batchSize ?? 1000, @@ -99,31 +108,42 @@ export class ClickhouseEventRepository implements IEventRepository { memoryPressureThreshold: 10000, loadSheddingThreshold: 10000, loadSheddingEnabled: false, - isDroppableEvent: (event: TaskEventV1Input) => { + isDroppableEvent: (event: TaskEventV1Input | TaskEventV2Input) => { // Only drop LOG events during load shedding return event.kind === "DEBUG_EVENT"; }, }); } + get version() { + return this._version; + } + get maximumLiveReloadingSetting() { return this._config.maximumLiveReloadingSetting ?? 1000; } - async #flushBatch(flushId: string, events: TaskEventV1Input[]) { + async #flushBatch(flushId: string, events: (TaskEventV1Input | TaskEventV2Input)[]) { await startSpan(this._tracer, "flushBatch", async (span) => { span.setAttribute("flush_id", flushId); span.setAttribute("event_count", events.length); + span.setAttribute("version", this._version); const firstEvent = events[0]; if (firstEvent) { logger.debug("ClickhouseEventRepository.flushBatch first event", { event: firstEvent, + version: this._version, }); } - const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events, { + const insertFn = + this._version === "v2" + ? this._clickhouse.taskEventsV2.insert + : this._clickhouse.taskEvents.insert; + + const [insertError, insertResult] = await insertFn(events, { params: { clickhouse_settings: this.#getClickhouseInsertSettings(), }, @@ -136,6 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository { logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", { events: events.length, insertResult, + version: this._version, }); this.#publishToRedis(events); @@ -155,7 +176,7 @@ export class ClickhouseEventRepository implements IEventRepository { } } - async #publishToRedis(events: TaskEventV1Input[]) { + async #publishToRedis(events: (TaskEventV1Input | TaskEventV2Input)[]) { if (events.length === 0) return; await tracePubSub.publish(events.map((e) => e.trace_id)); } @@ -960,7 +981,10 @@ export class ClickhouseEventRepository implements IEventRepository { ): Promise { const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); - const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder(); + const queryBuilder = + this._version === "v2" + ? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder() + : this._clickhouse.taskEvents.traceSummaryQueryBuilder(); queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); queryBuilder.where("trace_id = {traceId: String}", { traceId }); @@ -974,6 +998,14 @@ export class ClickhouseEventRepository implements IEventRepository { }); } + // For v2, add inserted_at filtering for partition pruning + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + // No upper bound on inserted_at - we want all events inserted up to now + } + if (options?.includeDebugLogs === false) { queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" }); } @@ -1058,7 +1090,10 @@ export class ClickhouseEventRepository implements IEventRepository { ): Promise { const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); - const queryBuilder = this._clickhouse.taskEvents.spanDetailsQueryBuilder(); + const queryBuilder = + this._version === "v2" + ? this._clickhouse.taskEventsV2.spanDetailsQueryBuilder() + : this._clickhouse.taskEvents.spanDetailsQueryBuilder(); queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); queryBuilder.where("trace_id = {traceId: String}", { traceId }); @@ -1073,6 +1108,13 @@ export class ClickhouseEventRepository implements IEventRepository { }); } + // For v2, add inserted_at filtering for partition pruning + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + } + queryBuilder.orderBy("start_time ASC"); const [queryError, records] = await queryBuilder.execute(); @@ -1477,7 +1519,10 @@ export class ClickhouseEventRepository implements IEventRepository { ): Promise { const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); - const queryBuilder = this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder(); + const queryBuilder = + this._version === "v2" + ? this._clickhouse.taskEventsV2.traceDetailedSummaryQueryBuilder() + : this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder(); queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); queryBuilder.where("trace_id = {traceId: String}", { traceId }); @@ -1491,6 +1536,13 @@ export class ClickhouseEventRepository implements IEventRepository { }); } + // For v2, add inserted_at filtering for partition pruning + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + } + if (options?.includeDebugLogs === false) { queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" }); } @@ -1675,7 +1727,10 @@ export class ClickhouseEventRepository implements IEventRepository { ): Promise { const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); - const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder(); + const queryBuilder = + this._version === "v2" + ? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder() + : this._clickhouse.taskEvents.traceSummaryQueryBuilder(); queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); queryBuilder.where("trace_id = {traceId: String}", { traceId }); @@ -1690,6 +1745,13 @@ export class ClickhouseEventRepository implements IEventRepository { }); } + // For v2, add inserted_at filtering for partition pruning + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + } + queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" }); queryBuilder.orderBy("start_time ASC"); diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts index 4c7945e804..db9d0bfb37 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts @@ -8,7 +8,12 @@ export const clickhouseEventRepository = singleton( initializeClickhouseRepository ); -function initializeClickhouseRepository() { +export const clickhouseEventRepositoryV2 = singleton( + "clickhouseEventRepositoryV2", + initializeClickhouseRepositoryV2 +); + +function getClickhouseClient() { if (!env.EVENTS_CLICKHOUSE_URL) { throw new Error("EVENTS_CLICKHOUSE_URL is not set"); } @@ -16,12 +21,7 @@ function initializeClickhouseRepository() { const url = new URL(env.EVENTS_CLICKHOUSE_URL); url.searchParams.delete("secure"); - const safeUrl = new URL(url.toString()); - safeUrl.password = "redacted"; - - console.log("🗃️ Initializing Clickhouse event repository", { url: safeUrl.toString() }); - - const clickhouse = new ClickHouse({ + return new ClickHouse({ url: url.toString(), name: "task-events", keepAlive: { @@ -34,6 +34,55 @@ function initializeClickhouseRepository() { }, maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS, }); +} + +function initializeClickhouseRepository() { + if (!env.EVENTS_CLICKHOUSE_URL) { + throw new Error("EVENTS_CLICKHOUSE_URL is not set"); + } + + const url = new URL(env.EVENTS_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + const safeUrl = new URL(url.toString()); + safeUrl.password = "redacted"; + + console.log("🗃️ Initializing Clickhouse event repository (v1)", { url: safeUrl.toString() }); + + const clickhouse = getClickhouseClient(); + + const repository = new ClickhouseEventRepository({ + clickhouse: clickhouse, + batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, + flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, + maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, + maximumTraceDetailedSummaryViewCount: + env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, + maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, + insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, + waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", + asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, + asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + version: "v1", + }); + + return repository; +} + +function initializeClickhouseRepositoryV2() { + if (!env.EVENTS_CLICKHOUSE_URL) { + throw new Error("EVENTS_CLICKHOUSE_URL is not set"); + } + + const url = new URL(env.EVENTS_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + const safeUrl = new URL(url.toString()); + safeUrl.password = "redacted"; + + console.log("🗃️ Initializing Clickhouse event repository (v2)", { url: safeUrl.toString() }); + + const clickhouse = getClickhouseClient(); const repository = new ClickhouseEventRepository({ clickhouse: clickhouse, @@ -47,6 +96,7 @@ function initializeClickhouseRepository() { waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + version: "v2", }); return repository; diff --git a/apps/webapp/app/v3/eventRepository/index.server.ts b/apps/webapp/app/v3/eventRepository/index.server.ts index 9f06ec1bc3..e4ef0a333e 100644 --- a/apps/webapp/app/v3/eventRepository/index.server.ts +++ b/apps/webapp/app/v3/eventRepository/index.server.ts @@ -1,6 +1,9 @@ import { env } from "~/env.server"; import { eventRepository } from "./eventRepository.server"; -import { clickhouseEventRepository } from "./clickhouseEventRepositoryInstance.server"; +import { + clickhouseEventRepository, + clickhouseEventRepositoryV2, +} from "./clickhouseEventRepositoryInstance.server"; import { IEventRepository, TraceEventOptions } from "./eventRepository.types"; import { $replica, prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; @@ -10,6 +13,10 @@ import { getTaskEventStore } from "../taskEventStore.server"; export function resolveEventRepositoryForStore(store: string | undefined): IEventRepository { const taskEventStore = store ?? env.EVENT_REPOSITORY_DEFAULT_STORE; + if (taskEventStore === "clickhouse_v2") { + return clickhouseEventRepositoryV2; + } + if (taskEventStore === "clickhouse") { return clickhouseEventRepository; } @@ -22,6 +29,9 @@ export async function getEventRepository( parentStore: string | undefined ): Promise<{ repository: IEventRepository; store: string }> { if (typeof parentStore === "string") { + if (parentStore === "clickhouse_v2") { + return { repository: clickhouseEventRepositoryV2, store: "clickhouse_v2" }; + } if (parentStore === "clickhouse") { return { repository: clickhouseEventRepository, store: "clickhouse" }; } else { @@ -31,6 +41,10 @@ export async function getEventRepository( const taskEventRepository = await resolveTaskEventRepositoryFlag(featureFlags); + if (taskEventRepository === "clickhouse_v2") { + return { repository: clickhouseEventRepositoryV2, store: "clickhouse_v2" }; + } + if (taskEventRepository === "clickhouse") { return { repository: clickhouseEventRepository, store: "clickhouse" }; } @@ -42,6 +56,9 @@ export async function getV3EventRepository( parentStore: string | undefined ): Promise<{ repository: IEventRepository; store: string }> { if (typeof parentStore === "string") { + if (parentStore === "clickhouse_v2") { + return { repository: clickhouseEventRepositoryV2, store: "clickhouse_v2" }; + } if (parentStore === "clickhouse") { return { repository: clickhouseEventRepository, store: "clickhouse" }; } else { @@ -49,7 +66,9 @@ export async function getV3EventRepository( } } - if (env.EVENT_REPOSITORY_DEFAULT_STORE === "clickhouse") { + if (env.EVENT_REPOSITORY_DEFAULT_STORE === "clickhouse_v2") { + return { repository: clickhouseEventRepositoryV2, store: "clickhouse_v2" }; + } else if (env.EVENT_REPOSITORY_DEFAULT_STORE === "clickhouse") { return { repository: clickhouseEventRepository, store: "clickhouse" }; } else { return { repository: eventRepository, store: getTaskEventStore() }; @@ -58,13 +77,17 @@ export async function getV3EventRepository( async function resolveTaskEventRepositoryFlag( featureFlags: Record | undefined -): Promise<"clickhouse" | "postgres"> { +): Promise<"clickhouse" | "clickhouse_v2" | "postgres"> { const flag = await flags({ key: FEATURE_FLAG.taskEventRepository, defaultValue: env.EVENT_REPOSITORY_DEFAULT_STORE, overrides: featureFlags, }); + if (flag === "clickhouse_v2") { + return "clickhouse_v2"; + } + if (flag === "clickhouse") { return "clickhouse"; } @@ -75,6 +98,10 @@ async function resolveTaskEventRepositoryFlag( const randomNumber = Math.random(); if (randomNumber < rolloutPercent) { + // Use the default store when rolling out (could be clickhouse or clickhouse_v2) + if (env.EVENT_REPOSITORY_DEFAULT_STORE === "clickhouse_v2") { + return "clickhouse_v2"; + } return "clickhouse"; } } diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts index 72c4f67593..6e00e7772c 100644 --- a/apps/webapp/app/v3/featureFlags.server.ts +++ b/apps/webapp/app/v3/featureFlags.server.ts @@ -10,7 +10,7 @@ export const FEATURE_FLAG = { const FeatureFlagCatalog = { [FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(), [FEATURE_FLAG.runsListRepository]: z.enum(["clickhouse", "postgres"]), - [FEATURE_FLAG.taskEventRepository]: z.enum(["clickhouse", "postgres"]), + [FEATURE_FLAG.taskEventRepository]: z.enum(["clickhouse", "clickhouse_v2", "postgres"]), }; type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index 8c60e72ac4..f7337b3b16 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -17,7 +17,10 @@ import { } from "@trigger.dev/otlp-importer"; import { logger } from "~/services/logger.server"; import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server"; -import { clickhouseEventRepository } from "./eventRepository/clickhouseEventRepositoryInstance.server"; +import { + clickhouseEventRepository, + clickhouseEventRepositoryV2, +} from "./eventRepository/clickhouseEventRepositoryInstance.server"; import { generateSpanId } from "./eventRepository/common.server"; import { EventRepository, eventRepository } from "./eventRepository/eventRepository.server"; import type { @@ -38,6 +41,7 @@ class OTLPExporter { constructor( private readonly _eventRepository: EventRepository, private readonly _clickhouseEventRepository: ClickhouseEventRepository, + private readonly _clickhouseEventRepositoryV2: ClickhouseEventRepository, private readonly _verbose: boolean, private readonly _spanAttributeValueLengthLimit: number ) { @@ -111,6 +115,10 @@ class OTLPExporter { return this._clickhouseEventRepository; } + if (store === "clickhouse_v2") { + return this._clickhouseEventRepositoryV2; + } + return this._eventRepository; } @@ -886,6 +894,7 @@ function initializeOTLPExporter() { return new OTLPExporter( eventRepository, clickhouseEventRepository, + clickhouseEventRepositoryV2, process.env.OTLP_EXPORTER_VERBOSE === "1", process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT ? parseInt(process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, 10) diff --git a/internal-packages/clickhouse/schema/010_add_task_events_v2.sql b/internal-packages/clickhouse/schema/010_add_task_events_v2.sql new file mode 100644 index 0000000000..04279b80cb --- /dev/null +++ b/internal-packages/clickhouse/schema/010_add_task_events_v2.sql @@ -0,0 +1,55 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS trigger_dev.task_events_v2 +( + -- This the main "tenant" ID + environment_id String, + -- The organization ID here so we can do MV rollups of usage + organization_id String, + -- The project ID here so we can do MV rollups of usage + project_id String, + -- The task slug (e.g. "my-task") + task_identifier String CODEC(ZSTD(1)), + -- The non-friendly ID for the run + run_id String CODEC(ZSTD(1)), + -- nanoseconds since the epoch + start_time DateTime64(9) CODEC(Delta(8), ZSTD(1)), + trace_id String CODEC(ZSTD(1)), + span_id String CODEC(ZSTD(1)), + -- will be an empty string for root spans + parent_span_id String CODEC(ZSTD(1)), + -- Log body, event name, or span name + message String CODEC(ZSTD(1)), + -- this is the new level column, can be + -- SPAN, SPAN_EVENT, DEBUG_EVENT, LOG_DEBUG, LOG_LOG, LOG_SUCCESS, LOG_INFO, LOG_WARN, LOG_ERROR, ANCESTOR_OVERRIDE + kind LowCardinality(String) CODEC(ZSTD(1)), + -- isError, isPartial, isCancelled will now be in this status column + -- OK, ERROR, PARTIAL, CANCELLED + status LowCardinality(String) CODEC(ZSTD(1)), + -- span/log/event attributes and resource attributes + -- includes error attributes, gen_ai attributes, and other attributes + attributes JSON CODEC(ZSTD(1)), + attributes_text String MATERIALIZED toJSONString(attributes), + -- This is the metadata column, includes style for styling the event in the UI + -- is a JSON stringified object, e.g. {"style":{"icon":"play","variant":"primary"},"error":{"message":"Error message","attributes":{"error.type":"ErrorType","error.code":"123"}}} + metadata String CODEC(ZSTD(1)), + -- nanoseconds since the start time, only non-zero for spans + duration UInt64 CODEC(ZSTD(1)), + -- The TTL for the event, will be deleted 7 days after the event expires + expires_at DateTime64(3), + -- NEW: Insert timestamp for partitioning (avoids "too many parts" errors from late-arriving events) + inserted_at DateTime64(3) DEFAULT now64(3), + + INDEX idx_run_id run_id TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_span_id span_id TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_duration duration TYPE minmax GRANULARITY 1, + INDEX idx_attributes_text attributes_text TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 +) +ENGINE = MergeTree +PARTITION BY toDate(inserted_at) +ORDER BY (environment_id, toUnixTimestamp(start_time), trace_id) +TTL toDateTime(expires_at) + INTERVAL 7 DAY +SETTINGS ttl_only_drop_parts = 1; + +-- +goose Down +DROP TABLE IF EXISTS trigger_dev.task_events_v2; + diff --git a/internal-packages/clickhouse/schema/011_add_task_events_v2_table_mvs.sql b/internal-packages/clickhouse/schema/011_add_task_events_v2_table_mvs.sql new file mode 100644 index 0000000000..33f19926a3 --- /dev/null +++ b/internal-packages/clickhouse/schema/011_add_task_events_v2_table_mvs.sql @@ -0,0 +1,19 @@ +-- +goose Up +-- Create materialized views for task_events_v2 table (partitioned by inserted_at) +-- These write to the same target tables as the v1 MVs so usage is aggregated across both tables + +CREATE MATERIALIZED VIEW IF NOT EXISTS trigger_dev.mv_task_event_v2_usage_by_minute +TO trigger_dev.task_event_usage_by_minute_v1 AS +SELECT + organization_id, + project_id, + environment_id, + toStartOfMinute(start_time) AS bucket_start, + count() AS event_count +FROM trigger_dev.task_events_v2 +WHERE kind != 'DEBUG_EVENT' AND kind != 'ANCESTOR_OVERRIDE' AND status != 'PARTIAL' +GROUP BY organization_id, project_id, environment_id, bucket_start; + +-- +goose Down +DROP VIEW IF EXISTS trigger_dev.mv_task_event_v2_usage_by_minute; + diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 294c11867f..4aceeb92d8 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -15,9 +15,13 @@ import { } from "./taskRuns.js"; import { getSpanDetailsQueryBuilder, + getSpanDetailsQueryBuilderV2, getTraceDetailedSummaryQueryBuilder, + getTraceDetailedSummaryQueryBuilderV2, getTraceSummaryQueryBuilder, + getTraceSummaryQueryBuilderV2, insertTaskEvents, + insertTaskEventsV2, } from "./taskEvents.js"; import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import type { Agent as HttpAgent } from "http"; @@ -171,4 +175,13 @@ export class ClickHouse { spanDetailsQueryBuilder: getSpanDetailsQueryBuilder(this.reader), }; } + + get taskEventsV2() { + return { + insert: insertTaskEventsV2(this.writer), + traceSummaryQueryBuilder: getTraceSummaryQueryBuilderV2(this.reader), + traceDetailedSummaryQueryBuilder: getTraceDetailedSummaryQueryBuilderV2(this.reader), + spanDetailsQueryBuilder: getSpanDetailsQueryBuilderV2(this.reader), + }; + } } diff --git a/internal-packages/clickhouse/src/taskEvents.ts b/internal-packages/clickhouse/src/taskEvents.ts index dae2e4fbd0..d8c1b8b7f6 100644 --- a/internal-packages/clickhouse/src/taskEvents.ts +++ b/internal-packages/clickhouse/src/taskEvents.ts @@ -131,3 +131,102 @@ export function getSpanDetailsQueryBuilder(ch: ClickhouseReader, settings?: Clic settings, }); } + +// ============================================================================ +// V2 Table Functions (partitioned by inserted_at instead of start_time) +// ============================================================================ + +export const TaskEventV2Input = z.object({ + environment_id: z.string(), + organization_id: z.string(), + project_id: z.string(), + task_identifier: z.string(), + run_id: z.string(), + start_time: z.string(), + duration: z.string(), + trace_id: z.string(), + span_id: z.string(), + parent_span_id: z.string(), + message: z.string(), + kind: z.string(), + status: z.string(), + attributes: z.unknown(), + metadata: z.string(), + expires_at: z.string(), + // inserted_at has a default value in the table, so it's optional for inserts + inserted_at: z.string().optional(), +}); + +export type TaskEventV2Input = z.input; + +export function insertTaskEventsV2(ch: ClickhouseWriter, settings?: ClickHouseSettings) { + return ch.insertUnsafe({ + name: "insertTaskEventsV2", + table: "trigger_dev.task_events_v2", + settings: { + enable_json_type: 1, + type_json_skip_duplicated_paths: 1, + input_format_json_throw_on_bad_escape_sequence: 0, + input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects: 1, + ...settings, + }, + }); +} + +export function getTraceSummaryQueryBuilderV2( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilderFast({ + name: "getTraceEventsV2", + table: "trigger_dev.task_events_v2", + columns: [ + "span_id", + "parent_span_id", + "run_id", + "start_time", + "duration", + "status", + "kind", + "metadata", + { name: "message", expression: "LEFT(message, 256)" }, + ], + settings, + }); +} + +export function getTraceDetailedSummaryQueryBuilderV2( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilderFast({ + name: "getTaskEventDetailedSummaryV2", + table: "trigger_dev.task_events_v2", + columns: [ + "span_id", + "parent_span_id", + "run_id", + "start_time", + "duration", + "status", + "kind", + "metadata", + { name: "message", expression: "LEFT(message, 256)" }, + "attributes_text", + ], + settings, + }); +} + +export function getSpanDetailsQueryBuilderV2( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getSpanDetailsV2", + baseQuery: + "SELECT span_id, parent_span_id, start_time, duration, status, kind, metadata, message, attributes_text FROM trigger_dev.task_events_v2", + schema: TaskEventDetailsV1Result, + settings, + }); +}