diff --git a/packages/otel/src/instrumentations/fetch.test.ts b/packages/otel/src/instrumentations/fetch.test.ts index ca216149..504dc2d0 100644 --- a/packages/otel/src/instrumentations/fetch.test.ts +++ b/packages/otel/src/instrumentations/fetch.test.ts @@ -1,8 +1,4 @@ -import { http, HttpResponse } from 'msw' -import { setupServer } from 'msw/node' -import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, test } from 'vitest' -import { createTracerProvider } from '../bootstrap/main.ts' -import { shutdownTracers } from '../main.ts' +import { describe, expect, test } from 'vitest' import { FetchInstrumentation } from './fetch.ts' describe('header exclusion', () => { @@ -14,11 +10,7 @@ describe('header exclusion', () => { // eslint-disable-next-line @typescript-eslint/dot-notation const attributes = instrumentation['prepareHeaders']( 'request', - new Headers({ - a: 'a', - b: 'b', - authorization: 'secret', - }), + ['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)), ) expect(attributes).toEqual({ 'http.request.header.a': 'a', @@ -33,11 +25,7 @@ describe('header exclusion', () => { // eslint-disable-next-line @typescript-eslint/dot-notation const empty = everything['prepareHeaders']( 'request', - new Headers({ - a: 'a', - b: 'b', - authorization: 'secret', - }), + ['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)), ) expect(empty).toEqual({}) }) @@ -50,13 +38,9 @@ describe('header exclusion', () => { // eslint-disable-next-line @typescript-eslint/dot-notation const attributes = instrumentation['prepareHeaders']( 'request', - new Headers({ - a: 'a', - b: 'b', - authorization: 'a secret', - }), + ['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)), ) - expect(attributes['http.request.header.authorization']).not.toBe('a secret') + expect(attributes['http.request.header.authorization']).not.toBe('secret') expect(attributes['http.request.header.authorization']).toBeTypeOf('string') expect(attributes['http.request.header.a']).toBe('a') expect(attributes['http.request.header.b']).toBe('b') @@ -70,13 +54,9 @@ describe('header exclusion', () => { // eslint-disable-next-line @typescript-eslint/dot-notation const attributes = instrumentation['prepareHeaders']( 'request', - new Headers({ - a: 'a', - b: 'b', - authorization: 'a secret', - }), + ['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)), ) - expect(attributes['http.request.header.authorization']).not.toBe('a secret') + expect(attributes['http.request.header.authorization']).not.toBe('secret') expect(attributes['http.request.header.a']).not.toBe('a') expect(attributes['http.request.header.b']).not.toBe('b') expect(attributes['http.request.header.authorization']).toBeTypeOf('string') @@ -84,77 +64,3 @@ describe('header exclusion', () => { expect(attributes['http.request.header.b']).toBeTypeOf('string') }) }) - -describe('patched fetch', () => { - const server = setupServer( - http.get('http://localhost:3000/ok', () => HttpResponse.json({ message: 'ok' })), - http.post('http://localhost:3000/ok', () => HttpResponse.json({ message: 'ok' })), - ) - - beforeAll(() => { - server.listen({ onUnhandledRequest: 'error' }) - }) - - beforeEach(() => { - createTracerProvider({ - serviceName: 'test-service', - serviceVersion: '1.0.0', - deploymentEnvironment: 'test', - siteUrl: 'https://example.com', - siteId: '12345', - siteName: 'example', - instrumentations: [new FetchInstrumentation()], - }) - }) - - afterEach(async () => { - server.resetHandlers() - await shutdownTracers() - }) - - afterAll(() => { - server.close() - }) - - it('can GET url', async () => { - createTracerProvider({ - serviceName: 'test-service', - serviceVersion: '1.0.0', - deploymentEnvironment: 'test', - siteUrl: 'https://example.com', - siteId: '12345', - siteName: 'example', - instrumentations: [new FetchInstrumentation()], - }) - - await expect(fetch('http://localhost:3000/ok').then((r) => r.json())).resolves.toEqual({ message: 'ok' }) - }) - - it('can POST url', async () => { - await expect( - fetch('http://localhost:3000/ok', { - method: 'POST', - body: JSON.stringify({ hello: 'rabbit' }), - headers: { - 'Content-Type': 'application/json', - }, - }).then((r) => r.json()), - ).resolves.toEqual({ message: 'ok' }) - }) - - it('can GET request', async () => { - const req = new Request('http://localhost:3000/ok') - await expect(fetch(req).then((r) => r.json())).resolves.toEqual({ message: 'ok' }) - }) - - it('can POST request', async () => { - const req = new Request('http://localhost:3000/ok', { - method: 'POST', - body: JSON.stringify({ hello: 'rabbit' }), - headers: { - 'Content-Type': 'application/json', - }, - }) - await expect(fetch(req).then((r) => r.json())).resolves.toEqual({ message: 'ok' }) - }) -}) diff --git a/packages/otel/src/instrumentations/fetch.ts b/packages/otel/src/instrumentations/fetch.ts index 33c592a9..5305c480 100644 --- a/packages/otel/src/instrumentations/fetch.ts +++ b/packages/otel/src/instrumentations/fetch.ts @@ -1,11 +1,13 @@ +import * as diagnosticsChannel from 'diagnostics_channel' + import * as api from '@opentelemetry/api' import { SugaredTracer } from '@opentelemetry/api/experimental' import { _globalThis } from '@opentelemetry/core' import { InstrumentationConfig, type Instrumentation } from '@opentelemetry/instrumentation' export interface FetchInstrumentationConfig extends InstrumentationConfig { - getRequestAttributes?(headers: Request): api.Attributes - getResponseAttributes?(response: Response): api.Attributes + getRequestAttributes?(request: FetchRequest): api.Attributes + getResponseAttributes?(response: FetchResponse): api.Attributes skipURLs?: (string | RegExp)[] skipHeaders?: (string | RegExp)[] | true redactHeaders?: (string | RegExp)[] | true @@ -14,12 +16,15 @@ export interface FetchInstrumentationConfig extends InstrumentationConfig { export class FetchInstrumentation implements Instrumentation { instrumentationName = '@netlify/otel/instrumentation-fetch' instrumentationVersion = '1.0.0' - private originalFetch: typeof fetch | null = null private config: FetchInstrumentationConfig private provider?: api.TracerProvider + declare private _channelSubs: ListenerRecord[] + private _recordFromReq = new WeakMap() + constructor(config: FetchInstrumentationConfig = {}) { this.config = config + this._channelSubs = [] } getConfig(): FetchInstrumentationConfig { @@ -36,9 +41,10 @@ export class FetchInstrumentation implements Instrumentation { return this.provider } - private annotateFromRequest(span: api.Span, request: Request): void { + private annotateFromRequest(span: api.Span, request: FetchRequest): void { const extras = this.config.getRequestAttributes?.(request) ?? {} - const url = new URL(request.url) + const url = new URL(request.path, request.origin) + // these are based on @opentelemetry/semantic-convention 1.36 span.setAttributes({ ...extras, @@ -52,19 +58,25 @@ export class FetchInstrumentation implements Instrumentation { }) } - private annotateFromResponse(span: api.Span, response: Response): void { + private annotateFromResponse(span: api.Span, response: FetchResponse): void { const extras = this.config.getResponseAttributes?.(response) ?? {} // these are based on @opentelemetry/semantic-convention 1.36 span.setAttributes({ ...extras, - 'http.response.status_code': response.status, + 'http.response.status_code': response.statusCode, ...this.prepareHeaders('response', response.headers), }) - span.setStatus({ code: response.status >= 400 ? api.SpanStatusCode.ERROR : api.SpanStatusCode.UNSET }) + + span.setStatus({ + code: response.statusCode >= 400 ? api.SpanStatusCode.ERROR : api.SpanStatusCode.UNSET, + }) } - private prepareHeaders(type: 'request' | 'response', headers: Headers): api.Attributes { + private prepareHeaders( + type: 'request' | 'response', + headers: FetchRequest['headers'] | FetchResponse['headers'], + ): api.Attributes { if (this.config.skipHeaders === true) { return {} } @@ -74,8 +86,9 @@ export class FetchInstrumentation implements Instrumentation { const everythingSkipped = skips.some((skip) => everything.includes(skip.toString())) const attributes: api.Attributes = {} if (everythingSkipped) return attributes - const entries = headers.entries() - for (const [key, value] of entries) { + for (let idx = 0; idx < headers.length; idx = idx + 2) { + const key = headers[idx].toString().toLowerCase() + const value = headers[idx + 1].toString() if (skips.some((skip) => (typeof skip == 'string' ? skip == key : skip.test(key)))) { continue } @@ -92,6 +105,16 @@ export class FetchInstrumentation implements Instrumentation { return attributes } + private getRequestMethod(original: string): string { + const acceptedMethods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'] + + if (acceptedMethods.includes(original.toUpperCase())) { + return original.toUpperCase() + } + + return '_OTHER' + } + private getTracer(): SugaredTracer | undefined { if (!this.provider) { return undefined @@ -105,39 +128,110 @@ export class FetchInstrumentation implements Instrumentation { return new SugaredTracer(tracer) } - /** - * patch global fetch - */ enable(): void { - const originalFetch = _globalThis.fetch - this.originalFetch = originalFetch - _globalThis.fetch = async (resource: RequestInfo | URL, options?: RequestInit): Promise => { - const url = typeof resource === 'string' ? resource : resource instanceof URL ? resource.href : resource.url - const tracer = this.getTracer() - if ( - !tracer || - this.config.skipURLs?.some((skip) => (typeof skip == 'string' ? url.startsWith(skip) : skip.test(url))) - ) { - return await originalFetch(resource, options) - } + // Avoid to duplicate subscriptions + if (this._channelSubs.length > 0) return + + // https://undici.nodejs.org/#/docs/api/DiagnosticsChannel?id=diagnostics-channel-support + this.subscribe('undici:request:create', this.onRequestCreate.bind(this)) + this.subscribe('undici:request:headers', this.onRequestHeaders.bind(this)) + this.subscribe('undici:request:trailers', this.onRequestEnd.bind(this)) + this.subscribe('undici:request:error', this.onRequestError.bind(this)) + } - return tracer.withActiveSpan('fetch', async (span) => { - const request = new Request(resource, options) - this.annotateFromRequest(span, request) - const response = await originalFetch(request, options) - this.annotateFromResponse(span, response) - return response - }) - } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private subscribe(channelName: string, onMessage: (message: any, name: string | symbol) => void) { + diagnosticsChannel.subscribe(channelName, onMessage) + + const unsubscribe = () => diagnosticsChannel.unsubscribe(channelName, onMessage) + this._channelSubs.push({ name: channelName, unsubscribe }) } - /** - * unpatch global fetch - */ - disable(): void { - if (this.originalFetch) { - _globalThis.fetch = this.originalFetch - this.originalFetch = null + disable() { + this._channelSubs.forEach((sub) => { + sub.unsubscribe() + }) + this._channelSubs.length = 0 + } + + private onRequestCreate({ request }: { request: FetchRequest }): void { + const tracer = this.getTracer() + const url = new URL(request.path, request.origin) + + if ( + !tracer || + request.method === 'CONNECT' || + this.config.skipURLs?.some((skip) => (typeof skip == 'string' ? url.href.startsWith(skip) : skip.test(url.href))) + ) { + return } + + const span = tracer.startSpan( + this.getRequestMethod(request.method), + { + kind: api.SpanKind.CLIENT, + }, + api.context.active(), + ) + + this.annotateFromRequest(span, request) + + this._recordFromReq.set(request, span) + } + + private onRequestHeaders({ request, response }: { request: FetchRequest; response: FetchResponse }): void { + const span = this._recordFromReq.get(request) + if (!span) return + + this.annotateFromResponse(span, response) + } + + private onRequestError({ request, error }: { request: FetchRequest; error: Error }): void { + const span = this._recordFromReq.get(request) + if (!span) return + + span.recordException(error) + span.setStatus({ + code: api.SpanStatusCode.ERROR, + message: error.message, + }) + + span.end() + this._recordFromReq.delete(request) } + + private onRequestEnd({ request }: { request: FetchRequest; response: FetchResponse }): void { + const span = this._recordFromReq.get(request) + if (!span) return + + span.end() + this._recordFromReq.delete(request) + } +} + +interface ListenerRecord { + name: string + unsubscribe: () => void +} + +// https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/packages/instrumentation-undici/src/types.ts +interface FetchRequest { + origin: string + method: string + path: string + headers: string | (string | string[])[] + addHeader: (name: string, value: string) => void + throwOnError: boolean + completed: boolean + aborted: boolean + idempotent: boolean + contentLength: number | null + contentType: string | null + body: unknown +} + +interface FetchResponse { + headers: Buffer[] + statusCode: number + statusText: string }