Skip to content
Merged
108 changes: 7 additions & 101 deletions packages/otel/src/instrumentations/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import { http, HttpResponse } from 'msw'
import { setupServer } from 'msw/node'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, test } from 'vitest'
import { createTracerProvider } from '../bootstrap/main.ts'
import { shutdownTracers } from '../main.ts'
import { describe, expect, test } from 'vitest'
import { FetchInstrumentation } from './fetch.ts'

describe('header exclusion', () => {
Expand All @@ -14,11 +10,7 @@ describe('header exclusion', () => {
// eslint-disable-next-line @typescript-eslint/dot-notation
const attributes = instrumentation['prepareHeaders'](
'request',
new Headers({
a: 'a',
b: 'b',
authorization: 'secret',
}),
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
)
expect(attributes).toEqual({
'http.request.header.a': 'a',
Expand All @@ -33,11 +25,7 @@ describe('header exclusion', () => {
// eslint-disable-next-line @typescript-eslint/dot-notation
const empty = everything['prepareHeaders'](
'request',
new Headers({
a: 'a',
b: 'b',
authorization: 'secret',
}),
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
)
expect(empty).toEqual({})
})
Expand All @@ -50,13 +38,9 @@ describe('header exclusion', () => {
// eslint-disable-next-line @typescript-eslint/dot-notation
const attributes = instrumentation['prepareHeaders'](
'request',
new Headers({
a: 'a',
b: 'b',
authorization: 'a secret',
}),
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
)
expect(attributes['http.request.header.authorization']).not.toBe('a secret')
expect(attributes['http.request.header.authorization']).not.toBe('secret')
expect(attributes['http.request.header.authorization']).toBeTypeOf('string')
expect(attributes['http.request.header.a']).toBe('a')
expect(attributes['http.request.header.b']).toBe('b')
Expand All @@ -70,91 +54,13 @@ describe('header exclusion', () => {
// eslint-disable-next-line @typescript-eslint/dot-notation
const attributes = instrumentation['prepareHeaders'](
'request',
new Headers({
a: 'a',
b: 'b',
authorization: 'a secret',
}),
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
)
expect(attributes['http.request.header.authorization']).not.toBe('a secret')
expect(attributes['http.request.header.authorization']).not.toBe('secret')
expect(attributes['http.request.header.a']).not.toBe('a')
expect(attributes['http.request.header.b']).not.toBe('b')
expect(attributes['http.request.header.authorization']).toBeTypeOf('string')
expect(attributes['http.request.header.a']).toBeTypeOf('string')
expect(attributes['http.request.header.b']).toBeTypeOf('string')
})
})

describe('patched fetch', () => {
const server = setupServer(
http.get('http://localhost:3000/ok', () => HttpResponse.json({ message: 'ok' })),
http.post('http://localhost:3000/ok', () => HttpResponse.json({ message: 'ok' })),
)

beforeAll(() => {
server.listen({ onUnhandledRequest: 'error' })
})

beforeEach(() => {
createTracerProvider({
serviceName: 'test-service',
serviceVersion: '1.0.0',
deploymentEnvironment: 'test',
siteUrl: 'https://example.com',
siteId: '12345',
siteName: 'example',
instrumentations: [new FetchInstrumentation()],
})
})

afterEach(async () => {
server.resetHandlers()
await shutdownTracers()
})

afterAll(() => {
server.close()
})

it('can GET url', async () => {
createTracerProvider({
serviceName: 'test-service',
serviceVersion: '1.0.0',
deploymentEnvironment: 'test',
siteUrl: 'https://example.com',
siteId: '12345',
siteName: 'example',
instrumentations: [new FetchInstrumentation()],
})

await expect(fetch('http://localhost:3000/ok').then((r) => r.json())).resolves.toEqual({ message: 'ok' })
})

it('can POST url', async () => {
await expect(
fetch('http://localhost:3000/ok', {
method: 'POST',
body: JSON.stringify({ hello: 'rabbit' }),
headers: {
'Content-Type': 'application/json',
},
}).then((r) => r.json()),
).resolves.toEqual({ message: 'ok' })
})

it('can GET request', async () => {
const req = new Request('http://localhost:3000/ok')
await expect(fetch(req).then((r) => r.json())).resolves.toEqual({ message: 'ok' })
})

it('can POST request', async () => {
const req = new Request('http://localhost:3000/ok', {
method: 'POST',
body: JSON.stringify({ hello: 'rabbit' }),
headers: {
'Content-Type': 'application/json',
},
})
await expect(fetch(req).then((r) => r.json())).resolves.toEqual({ message: 'ok' })
})
})
174 changes: 134 additions & 40 deletions packages/otel/src/instrumentations/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import * as diagnosticsChannel from 'diagnostics_channel'

import * as api from '@opentelemetry/api'
import { SugaredTracer } from '@opentelemetry/api/experimental'
import { _globalThis } from '@opentelemetry/core'
import { InstrumentationConfig, type Instrumentation } from '@opentelemetry/instrumentation'

export interface FetchInstrumentationConfig extends InstrumentationConfig {
getRequestAttributes?(headers: Request): api.Attributes
getResponseAttributes?(response: Response): api.Attributes
getRequestAttributes?(request: FetchRequest): api.Attributes
getResponseAttributes?(response: FetchResponse): api.Attributes
skipURLs?: (string | RegExp)[]
skipHeaders?: (string | RegExp)[] | true
redactHeaders?: (string | RegExp)[] | true
Expand All @@ -14,12 +16,15 @@ export interface FetchInstrumentationConfig extends InstrumentationConfig {
export class FetchInstrumentation implements Instrumentation {
instrumentationName = '@netlify/otel/instrumentation-fetch'
instrumentationVersion = '1.0.0'
private originalFetch: typeof fetch | null = null
private config: FetchInstrumentationConfig
private provider?: api.TracerProvider

declare private _channelSubs: ListenerRecord[]
private _recordFromReq = new WeakMap<FetchRequest, api.Span>()

constructor(config: FetchInstrumentationConfig = {}) {
this.config = config
this._channelSubs = []
}

getConfig(): FetchInstrumentationConfig {
Expand All @@ -36,9 +41,10 @@ export class FetchInstrumentation implements Instrumentation {
return this.provider
}

private annotateFromRequest(span: api.Span, request: Request): void {
private annotateFromRequest(span: api.Span, request: FetchRequest): void {
const extras = this.config.getRequestAttributes?.(request) ?? {}
const url = new URL(request.url)
const url = new URL(request.path, request.origin)

// these are based on @opentelemetry/semantic-convention 1.36
span.setAttributes({
...extras,
Expand All @@ -52,19 +58,25 @@ export class FetchInstrumentation implements Instrumentation {
})
}

private annotateFromResponse(span: api.Span, response: Response): void {
private annotateFromResponse(span: api.Span, response: FetchResponse): void {
const extras = this.config.getResponseAttributes?.(response) ?? {}

// these are based on @opentelemetry/semantic-convention 1.36
span.setAttributes({
...extras,
'http.response.status_code': response.status,
'http.response.status_code': response.statusCode,
...this.prepareHeaders('response', response.headers),
})
span.setStatus({ code: response.status >= 400 ? api.SpanStatusCode.ERROR : api.SpanStatusCode.UNSET })

span.setStatus({
code: response.statusCode >= 400 ? api.SpanStatusCode.ERROR : api.SpanStatusCode.UNSET,
})
}

private prepareHeaders(type: 'request' | 'response', headers: Headers): api.Attributes {
private prepareHeaders(
type: 'request' | 'response',
headers: FetchRequest['headers'] | FetchResponse['headers'],
): api.Attributes {
if (this.config.skipHeaders === true) {
return {}
}
Expand All @@ -74,8 +86,9 @@ export class FetchInstrumentation implements Instrumentation {
const everythingSkipped = skips.some((skip) => everything.includes(skip.toString()))
const attributes: api.Attributes = {}
if (everythingSkipped) return attributes
const entries = headers.entries()
for (const [key, value] of entries) {
for (let idx = 0; idx < headers.length; idx = idx + 2) {
const key = headers[idx].toString().toLowerCase()
const value = headers[idx + 1].toString()
if (skips.some((skip) => (typeof skip == 'string' ? skip == key : skip.test(key)))) {
continue
}
Expand All @@ -92,6 +105,16 @@ export class FetchInstrumentation implements Instrumentation {
return attributes
}

private getRequestMethod(original: string): string {
const acceptedMethods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE']

if (acceptedMethods.includes(original.toUpperCase())) {
return original.toUpperCase()
}

return '_OTHER'
}

private getTracer(): SugaredTracer | undefined {
if (!this.provider) {
return undefined
Expand All @@ -105,39 +128,110 @@ export class FetchInstrumentation implements Instrumentation {
return new SugaredTracer(tracer)
}

/**
* patch global fetch
*/
enable(): void {
const originalFetch = _globalThis.fetch
this.originalFetch = originalFetch
_globalThis.fetch = async (resource: RequestInfo | URL, options?: RequestInit): Promise<Response> => {
const url = typeof resource === 'string' ? resource : resource instanceof URL ? resource.href : resource.url
const tracer = this.getTracer()
if (
!tracer ||
this.config.skipURLs?.some((skip) => (typeof skip == 'string' ? url.startsWith(skip) : skip.test(url)))
) {
return await originalFetch(resource, options)
}
// Avoid to duplicate subscriptions
if (this._channelSubs.length > 0) return

// https://undici.nodejs.org/#/docs/api/DiagnosticsChannel?id=diagnostics-channel-support
this.subscribe('undici:request:create', this.onRequestCreate.bind(this))
this.subscribe('undici:request:headers', this.onRequestHeaders.bind(this))
this.subscribe('undici:request:trailers', this.onRequestEnd.bind(this))
this.subscribe('undici:request:error', this.onRequestError.bind(this))
}

return tracer.withActiveSpan('fetch', async (span) => {
const request = new Request(resource, options)
this.annotateFromRequest(span, request)
const response = await originalFetch(request, options)
this.annotateFromResponse(span, response)
return response
})
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private subscribe(channelName: string, onMessage: (message: any, name: string | symbol) => void) {
diagnosticsChannel.subscribe(channelName, onMessage)

const unsubscribe = () => diagnosticsChannel.unsubscribe(channelName, onMessage)
this._channelSubs.push({ name: channelName, unsubscribe })
}

/**
* unpatch global fetch
*/
disable(): void {
if (this.originalFetch) {
_globalThis.fetch = this.originalFetch
this.originalFetch = null
disable() {
this._channelSubs.forEach((sub) => {
sub.unsubscribe()
})
this._channelSubs.length = 0
}

private onRequestCreate({ request }: { request: FetchRequest }): void {
const tracer = this.getTracer()
const url = new URL(request.path, request.origin)

if (
!tracer ||
request.method === 'CONNECT' ||
this.config.skipURLs?.some((skip) => (typeof skip == 'string' ? url.href.startsWith(skip) : skip.test(url.href)))
) {
return
}

const span = tracer.startSpan(
this.getRequestMethod(request.method),
{
kind: api.SpanKind.CLIENT,
},
api.context.active(),
)

this.annotateFromRequest(span, request)

this._recordFromReq.set(request, span)
}

private onRequestHeaders({ request, response }: { request: FetchRequest; response: FetchResponse }): void {
const span = this._recordFromReq.get(request)
if (!span) return

this.annotateFromResponse(span, response)
}

private onRequestError({ request, error }: { request: FetchRequest; error: Error }): void {
const span = this._recordFromReq.get(request)
if (!span) return

span.recordException(error)
span.setStatus({
code: api.SpanStatusCode.ERROR,
message: error.message,
})

span.end()
this._recordFromReq.delete(request)
}

private onRequestEnd({ request }: { request: FetchRequest; response: FetchResponse }): void {
const span = this._recordFromReq.get(request)
if (!span) return

span.end()
this._recordFromReq.delete(request)
}
}

interface ListenerRecord {
name: string
unsubscribe: () => void
}

// https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/packages/instrumentation-undici/src/types.ts
interface FetchRequest {
origin: string
method: string
path: string
headers: string | (string | string[])[]
addHeader: (name: string, value: string) => void
throwOnError: boolean
completed: boolean
aborted: boolean
idempotent: boolean
contentLength: number | null
contentType: string | null
body: unknown
}

interface FetchResponse {
headers: Buffer[]
statusCode: number
statusText: string
}