diff --git a/.server-changes/prisma-span-datasource-attribute.md b/.server-changes/prisma-span-datasource-attribute.md new file mode 100644 index 00000000000..86507b89790 --- /dev/null +++ b/.server-changes/prisma-span-datasource-attribute.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Tag Prisma spans with `db.datasource: "writer" | "replica"` so monitors and trace queries can distinguish the writer pool from the replica pool. Applies to all `prisma:engine:*` spans (including `prisma:engine:connection` used by the connection-pool monitors) and the outer `prisma:client:operation` span. diff --git a/apps/webapp/app/db.server.ts b/apps/webapp/app/db.server.ts index 4668b58fb02..96f6307f576 100644 --- a/apps/webapp/app/db.server.ts +++ b/apps/webapp/app/db.server.ts @@ -13,8 +13,8 @@ import { env } from "./env.server"; import { logger } from "./services/logger.server"; import { isValidDatabaseUrl } from "./utils/db"; import { singleton } from "./utils/singleton"; -import { startActiveSpan } from "./v3/tracer.server"; -import { Span } from "@opentelemetry/api"; +import { DATASOURCE_CONTEXT_KEY, startActiveSpan } from "./v3/tracer.server"; +import { context, Span, trace } from "@opentelemetry/api"; import { queryPerformanceMonitor } from "./utils/queryPerformanceMonitor.server"; export type { @@ -98,12 +98,30 @@ export async function $transaction( export { Prisma }; -export const prisma = singleton("prisma", getClient); +function tagDatasource( + datasource: "writer" | "replica", + client: T +): T { + return client.$extends({ + name: "datasource-tagger", + query: { + $allOperations: ({ query, args }) => { + trace.getActiveSpan()?.setAttribute("db.datasource", datasource); + return context.with( + context.active().setValue(DATASOURCE_CONTEXT_KEY, datasource), + async () => await query(args) + ); + }, + }, + }) as unknown as T; +} -export const $replica: PrismaReplicaClient = singleton( - "replica", - () => getReplicaClient() ?? prisma -); +export const prisma = singleton("prisma", () => tagDatasource("writer", getClient())); + +export const $replica: PrismaReplicaClient = singleton("replica", () => { + const replica = getReplicaClient(); + return replica ? tagDatasource("replica", replica) : prisma; +}); function getClient() { const { DATABASE_URL } = process.env; diff --git a/apps/webapp/app/v3/tracer.server.ts b/apps/webapp/app/v3/tracer.server.ts index 71e14521e50..2ce5aa275c7 100644 --- a/apps/webapp/app/v3/tracer.server.ts +++ b/apps/webapp/app/v3/tracer.server.ts @@ -1,6 +1,7 @@ import { type Attributes, type Context, + createContextKey, DiagConsoleLogger, DiagLogLevel, type Link, @@ -61,6 +62,24 @@ import { performance } from "node:perf_hooks"; export const SEMINTATTRS_FORCE_RECORDING = "forceRecording"; +export const DATASOURCE_CONTEXT_KEY = createContextKey("trigger.db.datasource"); + +class DatasourceAttributeSpanProcessor implements SpanProcessor { + onStart(span: Span, parentContext: Context): void { + const ds = parentContext.getValue(DATASOURCE_CONTEXT_KEY); + if (typeof ds === "string") { + span.setAttribute("db.datasource", ds); + } + } + onEnd(): void {} + shutdown(): Promise { + return Promise.resolve(); + } + forceFlush(): Promise { + return Promise.resolve(); + } +} + class CustomWebappSampler implements Sampler { constructor(private readonly _baseSampler: Sampler) {} @@ -205,7 +224,7 @@ function setupTelemetry() { const samplingRate = 1.0 / Math.max(parseInt(env.INTERNAL_OTEL_TRACE_SAMPLING_RATE, 10), 1); - const spanProcessors: SpanProcessor[] = []; + const spanProcessors: SpanProcessor[] = [new DatasourceAttributeSpanProcessor()]; if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) { const headers = parseInternalTraceHeaders() ?? {}; diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 5e04f57144f..a3070d56339 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -1,4 +1,4 @@ -import { batch, logger, task, tasks, timeout, wait } from "@trigger.dev/sdk"; +import { batch, logger, task, tasks, timeout, wait, waitUntil } from "@trigger.dev/sdk"; import { setTimeout } from "timers/promises"; import { ResourceMonitor } from "../resourceMonitor.js"; import { fixedLengthTask } from "./batches.js"; @@ -21,6 +21,10 @@ export const helloWorldTask = task({ env: process.env, }); + waitUntil((async () => { + logger.info("Hello, world from the waitUntil hook", { payload }); + })()); + logger.debug("debug: Hello, worlds!", { payload }); logger.info("info: Hello, world!", { payload }); logger.log("log: Hello, world!", { payload });