Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/internal/monitoring/otel-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ import { FastifyReply, FastifyRequest } from 'fastify'
import * as os from 'os'
import { getConfig } from '../../config'

const { version, otelMetricsExportIntervalMs, otelMetricsEnabled, otelMetricsTemporality, region } =
getConfig()
const {
version,
otelMetricsExportIntervalMs,
otelMetricsEnabled,
otelMetricsTemporality,
prometheusMetricsEnabled,
region,
} = getConfig()

let prometheusExporter: PrometheusExporter | undefined
let meterProvider: MeterProvider | undefined
Expand Down Expand Up @@ -229,12 +235,14 @@ if (otelMetricsEnabled) {
)
}

prometheusExporter = new PrometheusExporter({
prefix: 'storage_api',
preventServerStart: true,
withResourceConstantLabels: /^(region|instance|metric\.version)$/,
})
readers.push(prometheusExporter)
if (prometheusMetricsEnabled) {
prometheusExporter = new PrometheusExporter({
prefix: 'storage_api',
preventServerStart: true,
withResourceConstantLabels: /^(region|instance|metric\.version)$/,
})
readers.push(prometheusExporter)
}

meterProvider = new MeterProvider({
resource,
Expand Down
3 changes: 3 additions & 0 deletions src/test/jest.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
declare namespace jest {
function isolateModulesAsync(fn: () => Promise<void>): Promise<void>
}
86 changes: 85 additions & 1 deletion src/test/otel-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ interface OTelGlobalState {
__otelMetricsShutdown?: () => Promise<void>
}

describe('otel metrics shutdown', () => {
describe('otel metrics', () => {
const originalOtelExporterEndpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT
const originalOtelMetricsEndpoint = process.env.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
const originalOtelMetricsHeaders = process.env.OTEL_EXPORTER_OTLP_METRICS_HEADERS
Expand Down Expand Up @@ -34,6 +34,7 @@ describe('otel metrics shutdown', () => {
}

jest.restoreAllMocks()
jest.resetModules()
})

test('still shuts down meter provider when unregister throws', async () => {
Expand Down Expand Up @@ -68,6 +69,7 @@ describe('otel metrics shutdown', () => {
otelMetricsExportIntervalMs: 1000,
otelMetricsEnabled: true,
otelMetricsTemporality: 'CUMULATIVE',
prometheusMetricsEnabled: true,
region: 'local',
})),
}))
Expand Down Expand Up @@ -135,4 +137,86 @@ describe('otel metrics shutdown', () => {
expect.objectContaining({ type: 'otel-metrics', error: unregisterError })
)
})

test('does not create a Prometheus reader when Prometheus metrics are disabled', async () => {
delete process.env.OTEL_EXPORTER_OTLP_ENDPOINT
delete process.env.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT

const registerInstrumentations = jest.fn(() => jest.fn())
const HostMetrics = jest.fn().mockImplementation(() => ({
start: jest.fn(),
}))
const MeterProvider = jest.fn().mockImplementation(() => ({
shutdown: jest.fn().mockResolvedValue(undefined),
}))
const PrometheusExporter = jest.fn().mockImplementation(() => ({
getMetricsRequestHandler: jest.fn(),
}))
const RuntimeNodeInstrumentation = jest.fn().mockImplementation(() => ({}))
const StorageNodeInstrumentation = jest.fn().mockImplementation(() => ({}))

jest.doMock('../config', () => ({
getConfig: jest.fn(() => ({
version: 'test-version',
otelMetricsExportIntervalMs: 1000,
otelMetricsEnabled: true,
otelMetricsTemporality: 'CUMULATIVE',
prometheusMetricsEnabled: false,
region: 'local',
})),
}))
jest.doMock('@internal/monitoring/logger', () => ({
logger: { info: jest.fn() },
logSchema: { error: jest.fn(), info: jest.fn() },
}))
jest.doMock('@internal/monitoring/system', () => ({
StorageNodeInstrumentation,
}))
jest.doMock('@opentelemetry/api', () => ({
metrics: {
setGlobalMeterProvider: jest.fn(),
},
}))
jest.doMock('@opentelemetry/exporter-metrics-otlp-grpc', () => ({
OTLPMetricExporter: jest.fn(),
}))
jest.doMock('@opentelemetry/exporter-prometheus', () => ({
PrometheusExporter,
}))
jest.doMock('@opentelemetry/host-metrics', () => ({
HostMetrics,
}))
jest.doMock('@opentelemetry/instrumentation', () => ({
registerInstrumentations,
}))
jest.doMock('@opentelemetry/instrumentation-runtime-node', () => ({
RuntimeNodeInstrumentation,
}))
jest.doMock('@opentelemetry/resources', () => ({
resourceFromAttributes: jest.fn(() => ({})),
}))
jest.doMock('@opentelemetry/sdk-metrics', () => ({
AggregationTemporality: {
CUMULATIVE: 'CUMULATIVE',
DELTA: 'DELTA',
},
AggregationType: {
DROP: 'DROP',
EXPLICIT_BUCKET_HISTOGRAM: 'EXPLICIT_BUCKET_HISTOGRAM',
},
MeterProvider,
PeriodicExportingMetricReader: jest.fn(),
}))

await jest.isolateModulesAsync(async () => {
await import('../internal/monitoring/otel-metrics')
})

expect(PrometheusExporter).not.toHaveBeenCalled()
expect(MeterProvider).toHaveBeenCalledWith(
expect.objectContaining({
readers: [],
})
)
})
})
23 changes: 8 additions & 15 deletions src/test/otel-tracing.test.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
import { createDeferred } from './utils/promise'

interface OTelGlobalState {
__otelTracingShutdown?: () => Promise<void>
}

interface Deferred<T> {
promise: Promise<T>
resolve: (value: T) => void
}

function createDeferred<T>(): Deferred<T> {
let resolve!: (value: T) => void
const promise = new Promise<T>((resolvePromise) => {
resolve = resolvePromise
})

return { promise, resolve }
}

describe('otel tracing bootstrap', () => {
const originalTracingEnabled = process.env.TRACING_ENABLED
const originalTraceEndpoint = process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
Expand Down Expand Up @@ -188,6 +176,7 @@ describe('otel tracing bootstrap', () => {
config,
}))
const OTLPTraceExporter = jest.fn().mockImplementation(() => ({}))
const classInstrumentationsDeferred = createDeferred<unknown[]>()
const logSchema = {
error: jest.fn(),
info: jest.fn(),
Expand All @@ -214,7 +203,7 @@ describe('otel tracing bootstrap', () => {
logSchema,
}))
jest.doMock('../internal/monitoring/otel-class-instrumentations', () => ({
loadClassInstrumentations: jest.fn(async () => []),
loadClassInstrumentations: jest.fn(() => classInstrumentationsDeferred.promise),
}))

let shutdownOtelTracing: (() => Promise<void>) | undefined
Expand All @@ -225,6 +214,10 @@ describe('otel tracing bootstrap', () => {
.__otelTracingShutdown
})

classInstrumentationsDeferred.resolve([])
await classInstrumentationsDeferred.promise
await Promise.resolve()

await expect(shutdownOtelTracing?.()).resolves.toBeUndefined()

expect(unregisterClassInstrumentations).toHaveBeenCalledTimes(1)
Expand Down
22 changes: 6 additions & 16 deletions src/test/progressive-migrations.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { createDeferred } from './utils/promise'

const mockBatchSend = jest.fn()
const mockWarning = jest.fn()
const mockError = jest.fn()
Expand Down Expand Up @@ -57,18 +59,6 @@ class TestProgressiveMigrations extends ProgressiveMigrations {
}
}

function createDeferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void
let reject!: (reason?: unknown) => void

const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})

return { promise, resolve, reject }
}

const mockGetTenantConfig = jest.mocked(getTenantConfig)
const mockAreMigrationsUpToDate = jest.mocked(areMigrationsUpToDate)
const mockRunMigrationsBatchSend = jest.mocked(RunMigrationsOnTenants.batchSend)
Expand Down Expand Up @@ -133,7 +123,7 @@ describe('ProgressiveMigrations', () => {
})

it('keeps new tenants queued while a batch is in flight and ignores duplicate adds', async () => {
const deferredBatch = createDeferred<void>()
const deferredBatch = createDeferred()
mockRunMigrationsBatchSend.mockReturnValueOnce(deferredBatch.promise as never)

const migrations = new TestProgressiveMigrations({
Expand All @@ -155,15 +145,15 @@ describe('ProgressiveMigrations', () => {

expect(migrations.pending()).toEqual(['tenant-a', 'tenant-b'])

deferredBatch.resolve(undefined)
deferredBatch.resolve()

await expect(flushPromise).resolves.toBeUndefined()
expect(migrations.pending()).toEqual(['tenant-b'])
expect(migrations.isEmitting()).toBe(false)
})

it('serializes drain with an in-flight batch and drains the remaining tenants after it finishes', async () => {
const deferredBatch = createDeferred<void>()
const deferredBatch = createDeferred()
mockRunMigrationsBatchSend
.mockReturnValueOnce(deferredBatch.promise as never)
.mockResolvedValueOnce(undefined as never)
Expand All @@ -184,7 +174,7 @@ describe('ProgressiveMigrations', () => {
expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1)
expect(migrations.isEmitting()).toBe(true)

deferredBatch.resolve(undefined)
deferredBatch.resolve()

await expect(Promise.all([flushPromise, drainPromise])).resolves.toEqual([undefined, undefined])

Expand Down
25 changes: 25 additions & 0 deletions src/test/utils/promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
export interface Deferred<T> {
promise: Promise<T>
resolve(value: T | PromiseLike<T>): void
reject(reason?: unknown): void
}

export interface VoidDeferred {
promise: Promise<void>
resolve(value?: void | PromiseLike<void>): void
reject(reason?: unknown): void
}

type DeferredFor<T> = [T] extends [void] ? VoidDeferred : Deferred<T>

export function createDeferred<T = void>(): DeferredFor<T> {
let resolve!: (value: T | PromiseLike<T>) => void
let reject!: (reason?: unknown) => void

const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})

return { promise, resolve, reject } as DeferredFor<T>
}
22 changes: 7 additions & 15 deletions src/test/vector-store-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@ import {
VectorStore,
VectorStoreManager,
} from '@storage/protocols/vector'

function deferred() {
let resolve!: () => void
const promise = new Promise<void>((res) => {
resolve = res
})

return { promise, resolve }
}
import { createDeferred } from './utils/promise'

function createMockVectorStore(): jest.Mocked<VectorStore> {
return {
Expand Down Expand Up @@ -194,8 +186,8 @@ function createDeterministicVectorDb(options: {

describe('VectorStoreManager bucket lifecycle', () => {
it('serializes concurrent creates for the final bucket slot', async () => {
const releaseFirstCreate = deferred()
const firstCreateStarted = deferred()
const releaseFirstCreate = createDeferred()
const firstCreateStarted = createDeferred()

const db = createDeterministicVectorDb({
bucketCount: 1,
Expand Down Expand Up @@ -250,8 +242,8 @@ describe('VectorStoreManager bucket lifecycle', () => {
})

it('shares the bucket-count lock between delete and create so capacity is observed after delete commits', async () => {
const releaseDelete = deferred()
const deleteReachedRemoval = deferred()
const releaseDelete = createDeferred()
const deleteReachedRemoval = createDeferred()

const db = createDeterministicVectorDb({
bucketCount: 1,
Expand Down Expand Up @@ -281,8 +273,8 @@ describe('VectorStoreManager bucket lifecycle', () => {
})

it('does not block unrelated creates while delete waits on the target bucket lock', async () => {
const releaseBucketLock = deferred()
const deleteWaitingOnBucketLock = deferred()
const releaseBucketLock = createDeferred()
const deleteWaitingOnBucketLock = createDeferred()

const db = createDeterministicVectorDb({
bucketCount: 1,
Expand Down
Loading