apps/webapp/app/env.server.ts (2 changes: 1 addition & 1 deletion)
@@ -1150,7 +1150,7 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760),
EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000),
EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse", "clickhouse_v2"]).default("postgres"),
EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),
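The env.server.ts change above only widens the enum: EVENT_REPOSITORY_DEFAULT_STORE can now be set to "clickhouse_v2" in addition to "postgres" and "clickhouse". The routing that consumes this value is not part of the diff; the sketch below is a hypothetical illustration of how the setting could map onto the repository singletons added later in this PR. The import paths and the postgresEventRepository name are assumptions, not code from this change.

```ts
// Hypothetical sketch, not part of this diff. Import paths and
// postgresEventRepository are assumed names for illustration only.
import { env } from "~/env.server";
import {
  clickhouseEventRepository,
  clickhouseEventRepositoryV2,
} from "~/v3/eventRepository/clickhouseEventRepository.server"; // assumed path
import { postgresEventRepository } from "~/v3/eventRepository/postgresEventRepository.server"; // assumed path

export function resolveDefaultEventRepository() {
  switch (env.EVENT_REPOSITORY_DEFAULT_STORE) {
    case "clickhouse":
      return clickhouseEventRepository; // backed by task_events_v1
    case "clickhouse_v2":
      return clickhouseEventRepositoryV2; // backed by task_events_v2
    case "postgres":
    default:
      return postgresEventRepository;
  }
}
```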
@@ -4,6 +4,7 @@ import type {
TaskEventDetailsV1Result,
TaskEventSummaryV1Result,
TaskEventV1Input,
TaskEventV2Input,
} from "@internal/clickhouse";
import { Attributes, startSpan, trace, Tracer } from "@internal/tracing";
import { createJsonErrorObject } from "@trigger.dev/core/v3/errors";
@@ -72,6 +73,12 @@ export type ClickhouseEventRepositoryConfig = {
maximumTraceSummaryViewCount?: number;
maximumTraceDetailedSummaryViewCount?: number;
maximumLiveReloadingSetting?: number;
/**
* The version of the ClickHouse task_events table to use.
* - "v1": Uses task_events_v1 (partitioned by start_time)
* - "v2": Uses task_events_v2 (partitioned by inserted_at to avoid "too many parts" errors)
*/
version?: "v1" | "v2";
};

/**
@@ -81,13 +88,15 @@ export type ClickhouseEventRepositoryConfig = {
export class ClickhouseEventRepository implements IEventRepository {
private _clickhouse: ClickHouse;
private _config: ClickhouseEventRepositoryConfig;
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input>;
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input | TaskEventV2Input>;
private _tracer: Tracer;
private _version: "v1" | "v2";

constructor(config: ClickhouseEventRepositoryConfig) {
this._clickhouse = config.clickhouse;
this._config = config;
this._tracer = config.tracer ?? trace.getTracer("clickhouseEventRepo", "0.0.1");
this._version = config.version ?? "v1";

this._flushScheduler = new DynamicFlushScheduler({
batchSize: config.batchSize ?? 1000,
@@ -99,31 +108,42 @@ export class ClickhouseEventRepository implements IEventRepository {
memoryPressureThreshold: 10000,
loadSheddingThreshold: 10000,
loadSheddingEnabled: false,
isDroppableEvent: (event: TaskEventV1Input) => {
isDroppableEvent: (event: TaskEventV1Input | TaskEventV2Input) => {
// Only drop LOG events during load shedding
return event.kind === "DEBUG_EVENT";
},
});
}

get version() {
return this._version;
}

get maximumLiveReloadingSetting() {
return this._config.maximumLiveReloadingSetting ?? 1000;
}

async #flushBatch(flushId: string, events: TaskEventV1Input[]) {
async #flushBatch(flushId: string, events: (TaskEventV1Input | TaskEventV2Input)[]) {
await startSpan(this._tracer, "flushBatch", async (span) => {
span.setAttribute("flush_id", flushId);
span.setAttribute("event_count", events.length);
span.setAttribute("version", this._version);

const firstEvent = events[0];

if (firstEvent) {
logger.debug("ClickhouseEventRepository.flushBatch first event", {
event: firstEvent,
version: this._version,
});
}

const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events, {
const insertFn =
this._version === "v2"
? this._clickhouse.taskEventsV2.insert
: this._clickhouse.taskEvents.insert;

const [insertError, insertResult] = await insertFn(events, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
@@ -136,6 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository {
logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", {
events: events.length,
insertResult,
version: this._version,
});

this.#publishToRedis(events);
@@ -155,7 +176,7 @@ export class ClickhouseEventRepository implements IEventRepository {
}
}

async #publishToRedis(events: TaskEventV1Input[]) {
async #publishToRedis(events: (TaskEventV1Input | TaskEventV2Input)[]) {
if (events.length === 0) return;
await tracePubSub.publish(events.map((e) => e.trace_id));
}
@@ -960,7 +981,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<TraceSummary | undefined> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder()
: this._clickhouse.taskEvents.traceSummaryQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -974,6 +998,14 @@
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
// No upper bound on inserted_at - we want all events inserted up to now
}

if (options?.includeDebugLogs === false) {
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
}
@@ -1058,7 +1090,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<SpanDetail | undefined> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.spanDetailsQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.spanDetailsQueryBuilder()
: this._clickhouse.taskEvents.spanDetailsQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -1073,6 +1108,13 @@
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
}

queryBuilder.orderBy("start_time ASC");

const [queryError, records] = await queryBuilder.execute();
@@ -1477,7 +1519,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<TraceDetailedSummary | undefined> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.traceDetailedSummaryQueryBuilder()
: this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -1491,6 +1536,13 @@
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
}

if (options?.includeDebugLogs === false) {
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
}
@@ -1675,7 +1727,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<RunPreparedEvent[]> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder()
: this._clickhouse.taskEvents.traceSummaryQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -1690,6 +1745,13 @@
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
}

queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
queryBuilder.orderBy("start_time ASC");

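The class above repeats the same three-line v2-only filter in every query path (trace summary, span details, detailed summary, and run events). Below is a minimal sketch of that pattern as a shared helper, assuming the query builders expose a where(clause, params) method and that convertDateToClickhouseDateTime is exported from @internal/clickhouse as the surrounding code suggests.

```ts
import { convertDateToClickhouseDateTime } from "@internal/clickhouse"; // assumed export

// Structural stand-in for the builders returned by taskEventsV2.*QueryBuilder().
type QueryBuilderLike = {
  where(clause: string, params?: Record<string, unknown>): unknown;
};

// task_events_v2 is partitioned by inserted_at, so a lower bound on inserted_at
// lets ClickHouse prune old partitions; v1 queries are left untouched.
function applyInsertedAtPruning(
  queryBuilder: QueryBuilderLike,
  version: "v1" | "v2",
  startCreatedAt: Date,
  bufferMs = 1000
) {
  if (version !== "v2") return;

  queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
    insertedAtStart: convertDateToClickhouseDateTime(
      new Date(startCreatedAt.getTime() - bufferMs)
    ),
  });
  // Intentionally no upper bound: include everything inserted up to now.
}
```

A call such as applyInsertedAtPruning(queryBuilder, this._version, startCreatedAt) at each query site would then replace the duplicated blocks; the diff keeps them inline, which is equivalent.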
@@ -8,20 +8,20 @@ export const clickhouseEventRepository = singleton(
initializeClickhouseRepository
);

function initializeClickhouseRepository() {
export const clickhouseEventRepositoryV2 = singleton(
"clickhouseEventRepositoryV2",
initializeClickhouseRepositoryV2
);

function getClickhouseClient() {
if (!env.EVENTS_CLICKHOUSE_URL) {
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
}

const url = new URL(env.EVENTS_CLICKHOUSE_URL);
url.searchParams.delete("secure");

const safeUrl = new URL(url.toString());
safeUrl.password = "redacted";

console.log("🗃️ Initializing Clickhouse event repository", { url: safeUrl.toString() });

const clickhouse = new ClickHouse({
return new ClickHouse({
url: url.toString(),
name: "task-events",
keepAlive: {
@@ -34,6 +34,55 @@
},
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
}

function initializeClickhouseRepository() {
if (!env.EVENTS_CLICKHOUSE_URL) {
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
}

const url = new URL(env.EVENTS_CLICKHOUSE_URL);
url.searchParams.delete("secure");

const safeUrl = new URL(url.toString());
safeUrl.password = "redacted";

console.log("🗃️ Initializing Clickhouse event repository (v1)", { url: safeUrl.toString() });

const clickhouse = getClickhouseClient();

const repository = new ClickhouseEventRepository({
clickhouse: clickhouse,
batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE,
flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS,
maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT,
maximumTraceDetailedSummaryViewCount:
env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT,
maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING,
insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY,
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
version: "v1",
});

return repository;
}

function initializeClickhouseRepositoryV2() {
if (!env.EVENTS_CLICKHOUSE_URL) {
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
}

const url = new URL(env.EVENTS_CLICKHOUSE_URL);
url.searchParams.delete("secure");

const safeUrl = new URL(url.toString());
safeUrl.password = "redacted";

console.log("🗃️ Initializing Clickhouse event repository (v2)", { url: safeUrl.toString() });

const clickhouse = getClickhouseClient();

const repository = new ClickhouseEventRepository({
clickhouse: clickhouse,
@@ -47,6 +96,7 @@ function initializeClickhouseRepository() {
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
version: "v2",
});

return repository;
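Both repositories are registered through the webapp's singleton helper, whose implementation sits outside this diff. Note that each initializer calls getClickhouseClient() itself, so the v1 and v2 repositories each hold their own ClickHouse client built from the same connection settings. The sketch below shows an assumed minimal shape for singleton, only to make the once-per-process construction explicit; the real helper may differ.

```ts
// Assumed shape of the `singleton` helper used above; illustration only.
function singleton<T>(name: string, initializer: () => T): T {
  // Memoize on globalThis so a module re-evaluation (e.g. in dev) still
  // reuses the instance that was already constructed for this name.
  const store = globalThis as typeof globalThis & {
    __singletons?: Map<string, unknown>;
  };
  store.__singletons ??= new Map();

  if (!store.__singletons.has(name)) {
    store.__singletons.set(name, initializer());
  }

  return store.__singletons.get(name) as T;
}
```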