Skip to content

Commit cdd1a88

Browse files
authored
fix(otel): prevent spans with negative durations (#2582)
1 parent 200b735 commit cdd1a88

File tree

4 files changed

+55
-15
lines changed

4 files changed

+55
-15
lines changed

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository {
156156
task_identifier: event.taskSlug,
157157
run_id: event.runId,
158158
start_time: formatClickhouseDate64NanosecondsEpochString(event.startTime.toString()),
159-
duration: (event.duration ?? 0).toString(),
159+
duration: formatClickhouseUnsignedIntegerString(event.duration ?? 0),
160160
trace_id: event.traceId,
161161
span_id: event.spanId,
162162
parent_span_id: event.parentId ?? "",
@@ -432,7 +432,9 @@ export class ClickhouseEventRepository implements IEventRepository {
432432
const startTime = options.startTime ?? getNowInNanoseconds();
433433
const duration =
434434
options.duration ??
435-
(options.endTime ? calculateDurationFromStart(startTime, options.endTime) : 100);
435+
(options.endTime
436+
? calculateDurationFromStart(startTime, options.endTime, 100 * 1_000_000)
437+
: 100);
436438

437439
const traceId = propagatedContext?.traceparent?.traceId ?? generateTraceId();
438440
const parentId = options.parentId ?? propagatedContext?.traceparent?.spanId;
@@ -460,7 +462,7 @@ export class ClickhouseEventRepository implements IEventRepository {
460462
task_identifier: options.taskSlug,
461463
run_id: options.attributes.runId,
462464
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
463-
duration: duration.toString(),
465+
duration: formatClickhouseUnsignedIntegerString(duration),
464466
trace_id: traceId,
465467
span_id: spanId,
466468
parent_span_id: parentId ?? "",
@@ -561,7 +563,7 @@ export class ClickhouseEventRepository implements IEventRepository {
561563
task_identifier: options.taskSlug,
562564
run_id: options.attributes.runId,
563565
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
564-
duration: String(options.incomplete ? 0 : duration),
566+
duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration),
565567
trace_id: traceId,
566568
span_id: spanId,
567569
parent_span_id: parentId ?? "",
@@ -595,7 +597,7 @@ export class ClickhouseEventRepository implements IEventRepository {
595597
task_identifier: options.taskSlug,
596598
run_id: options.attributes.runId,
597599
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
598-
duration: String(options.incomplete ? 0 : duration),
600+
duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration),
599601
trace_id: traceId,
600602
span_id: spanId,
601603
parent_span_id: parentId ?? "",
@@ -644,7 +646,9 @@ export class ClickhouseEventRepository implements IEventRepository {
644646
task_identifier: run.taskIdentifier,
645647
run_id: run.friendlyId,
646648
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
647-
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
649+
duration: formatClickhouseUnsignedIntegerString(
650+
calculateDurationFromStart(startTime, endTime ?? new Date())
651+
),
648652
trace_id: run.traceId,
649653
span_id: run.spanId,
650654
parent_span_id: run.parentSpanId ?? "",
@@ -692,7 +696,9 @@ export class ClickhouseEventRepository implements IEventRepository {
692696
task_identifier: run.taskIdentifier,
693697
run_id: blockedRun.friendlyId,
694698
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
695-
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
699+
duration: formatClickhouseUnsignedIntegerString(
700+
calculateDurationFromStart(startTime, endTime ?? new Date())
701+
),
696702
trace_id: blockedRun.traceId,
697703
span_id: spanId,
698704
parent_span_id: parentSpanId,
@@ -732,7 +738,9 @@ export class ClickhouseEventRepository implements IEventRepository {
732738
task_identifier: run.taskIdentifier,
733739
run_id: run.friendlyId,
734740
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
735-
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
741+
duration: formatClickhouseUnsignedIntegerString(
742+
calculateDurationFromStart(startTime, endTime ?? new Date())
743+
),
736744
trace_id: run.traceId,
737745
span_id: run.spanId,
738746
parent_span_id: run.parentSpanId ?? "",
@@ -778,7 +786,9 @@ export class ClickhouseEventRepository implements IEventRepository {
778786
task_identifier: run.taskIdentifier,
779787
run_id: run.friendlyId,
780788
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
781-
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
789+
duration: formatClickhouseUnsignedIntegerString(
790+
calculateDurationFromStart(startTime, endTime ?? new Date())
791+
),
782792
trace_id: run.traceId,
783793
span_id: run.spanId,
784794
parent_span_id: run.parentSpanId ?? "",
@@ -868,7 +878,9 @@ export class ClickhouseEventRepository implements IEventRepository {
868878
task_identifier: run.taskIdentifier,
869879
run_id: run.friendlyId,
870880
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
871-
duration: calculateDurationFromStart(startTime, cancelledAt).toString(),
881+
duration: formatClickhouseUnsignedIntegerString(
882+
calculateDurationFromStart(startTime, cancelledAt)
883+
),
872884
trace_id: run.traceId,
873885
span_id: run.spanId,
874886
parent_span_id: run.parentSpanId ?? "",
@@ -1841,3 +1853,15 @@ function convertClickhouseDate64NanosecondsEpochStringToBigInt(date: string): bi
18411853
const parts = date.split(".");
18421854
return BigInt(parts.join(""));
18431855
}
1856+
1857+
function formatClickhouseUnsignedIntegerString(value: number | bigint): string {
1858+
if (value < 0) {
1859+
return "0";
1860+
}
1861+
1862+
if (typeof value === "bigint") {
1863+
return value.toString();
1864+
}
1865+
1866+
return Math.floor(value).toString();
1867+
}

apps/webapp/app/v3/eventRepository/common.server.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,20 @@ export function getDateFromNanoseconds(nanoseconds: bigint): Date {
2828
return new Date(Number(nanoseconds) / 1_000_000);
2929
}
3030

31-
export function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) {
31+
export function calculateDurationFromStart(
32+
startTime: bigint,
33+
endTime: Date = new Date(),
34+
minimumDuration?: number
35+
) {
3236
const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime;
3337

34-
return Number(BigInt($endtime.getTime() * 1_000_000) - startTime);
38+
const duration = Number(BigInt($endtime.getTime() * 1_000_000) - startTime);
39+
40+
if (minimumDuration && duration < minimumDuration) {
41+
return minimumDuration;
42+
}
43+
44+
return duration;
3545
}
3646

3747
export function calculateDurationFromStartJsDate(startTime: Date, endTime: Date = new Date()) {

apps/webapp/app/v3/eventRepository/index.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { env } from "~/env.server";
22
import { eventRepository } from "./eventRepository.server";
33
import { clickhouseEventRepository } from "./clickhouseEventRepositoryInstance.server";
44
import { IEventRepository, TraceEventOptions } from "./eventRepository.types";
5-
import { $replica } from "~/db.server";
5+
import { $replica, prisma } from "~/db.server";
66
import { logger } from "~/services/logger.server";
77
import { FEATURE_FLAG, flags } from "../featureFlags.server";
88
import { getTaskEventStore } from "../taskEventStore.server";
@@ -145,7 +145,7 @@ async function recordRunEvent(
145145
}
146146

147147
async function findRunForEventCreation(runId: string) {
148-
return $replica.taskRun.findFirst({
148+
return prisma.taskRun.findFirst({
149149
where: {
150150
id: runId,
151151
},

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,13 @@ export function registerRunEngineEventBusHandlers() {
403403

404404
engine.eventBus.on("runRetryScheduled", async ({ time, run, environment, retryAt }) => {
405405
try {
406-
let retryMessage = `Retry #${run.attemptNumber} delay`;
406+
if (retryAt && time && time >= retryAt) {
407+
return;
408+
}
409+
410+
let retryMessage = `Retry ${
411+
typeof run.attemptNumber === "number" ? `#${run.attemptNumber - 1}` : ""
412+
} delay`;
407413

408414
if (run.nextMachineAfterOOM) {
409415
retryMessage += ` after OOM`;

0 commit comments

Comments
 (0)