Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ updates:
schedule:
interval: "weekly"
open-pull-requests-limit: 0
security-updates-only: true
security-updates-only: true
44 changes: 41 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,16 @@ EventProcessor(client, handlerMap, {
error: (msg, ...args) => console.error(msg, ...args),
},

// OpenTelemetry-compatible tracing and metrics (default: undefined)
telemetry: {
tracer: trace.getTracer("txob"),
meter: metrics.getMeter("txob"),
attributes: {
"service.name": "orders-worker",
"deployment.environment": "production",
},
},

// Hook called when max errors reached (default: undefined)
onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
// Create a dead-letter event, send alerts, etc.
Expand All @@ -724,6 +734,7 @@ EventProcessor(client, handlerMap, {
| `wakeupTimeoutMs` | `number` | `30000` | Fallback poll if no wakeup signal received (only used with wakeupEmitter) |
| `wakeupThrottleMs` | `number` | `1000` | Throttle wakeup signals to prevent excessive polling (only used with wakeupEmitter) |
| `logger` | `Logger` | `undefined` | Custom logger interface |
| `telemetry` | `TxOBTelemetry` | `undefined` | OpenTelemetry-compatible tracer, meter, and shared attributes |
| `onEventMaxErrorsReached` | `function` | `undefined` | Hook for max errors |

## Usage Examples
Expand Down Expand Up @@ -1568,15 +1579,42 @@ EventProcessor(client, handlers, {

### How do I monitor event processing?

**1. Use the logger option:**
**1. Enable OpenTelemetry-compatible telemetry:**

txob can emit spans and metrics without depending on a specific telemetry SDK. Install and configure OpenTelemetry in your application, then pass a `Tracer` and/or `Meter` to opt in:

```typescript
import { metrics, trace } from "@opentelemetry/api";

const processor = new EventProcessor({
client: createProcessorClient({ querier: client }),
handlerMap: handlers,
telemetry: {
tracer: trace.getTracer("txob"),
meter: metrics.getMeter("txob"),
attributes: {
"service.name": "orders-worker",
"deployment.environment": process.env.NODE_ENV ?? "development",
},
},
});
```

This records `txob.poll`, `txob.event.process`, and `txob.handler.process` spans plus `txob.poll.count`, `txob.poll.duration`, `txob.event.processing.count`, `txob.event.processing.duration`, `txob.handler.processing.count`, and `txob.handler.processing.duration` metrics. Metrics use low-cardinality attributes such as event type, handler name, and outcome; event IDs and correlation IDs are only attached to spans.

The full set of telemetry names is exported as constants from `txob`: `TxOBTelemetrySpanName`, `TxOBTelemetryMetricName`, `TxOBTelemetryAttributeKey`, `TxOBTelemetryEventOutcome`, `TxOBTelemetryHandlerOutcome`, and `TxOBTelemetryPollOutcome`.

txob surfaces failures while creating metric instruments during processor construction so misconfigured telemetry is visible at startup. Runtime telemetry operations, including span creation and metric recording, are best-effort and will not interrupt event processing if an exporter or SDK callback fails.

**2. Use the logger option:**

```typescript
EventProcessor(client, handlers, {
logger: myLogger, // Logs all processing activity
});
```

**2. Query the events table:**
**3. Query the events table:**

```sql
-- Pending events
Expand All @@ -1594,7 +1632,7 @@ FROM events WHERE processed_at IS NOT NULL
GROUP BY type;
```

**3. Create monitoring events:**
**4. Create monitoring events:**

```typescript
onEventMaxErrorsReached: async ({ event, txClient }) => {
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./processor.js";
export * from "./error.js";
export * from "./telemetry.js";
227 changes: 227 additions & 0 deletions src/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import { describe, it, expect, vi, afterEach } from "vitest";
import { EventProcessor, TxOBEvent, defaultBackoff } from "./processor.js";
import { TxOBError, ErrorUnprocessableEventHandler } from "./error.js";
import { sleep } from "./sleep.js";
import {
TxOBTelemetryAttributeKey,
TxOBTelemetryEventOutcome,
TxOBTelemetryHandlerOutcome,
TxOBTelemetryMetricName,
TxOBTelemetrySpanName,
} from "./telemetry.js";

const mockTxClient = {
getEventByIdForUpdateSkipLocked: vi.fn(),
Expand Down Expand Up @@ -1253,3 +1260,223 @@ describe("EventProcessor - basic", () => {
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});
});

describe("EventProcessor - telemetry", () => {
it("records OpenTelemetry-compatible spans and metrics when enabled", async () => {
const counters = new Map<string, { add: ReturnType<typeof vi.fn> }>();
const histograms = new Map<string, { record: ReturnType<typeof vi.fn> }>();
const spans: {
name: string;
setAttributes: ReturnType<typeof vi.fn>;
recordException: ReturnType<typeof vi.fn>;
setStatus: ReturnType<typeof vi.fn>;
end: ReturnType<typeof vi.fn>;
}[] = [];
const meter = {
createCounter: vi.fn((name: string) => {
const counter = { add: vi.fn() };
counters.set(name, counter);
return counter;
}),
createHistogram: vi.fn((name: string) => {
const histogram = { record: vi.fn() };
histograms.set(name, histogram);
return histogram;
}),
};
const tracer = {
startSpan: vi.fn((name: string) => {
const span = {
name,
setAttributes: vi.fn(),
recordException: vi.fn(),
setStatus: vi.fn(),
end: vi.fn(),
};
spans.push(span);
return span;
}),
};
const handlerMap = {
evtType1: {
handler1: vi.fn(() => Promise.resolve()),
},
};
const evt1: TxOBEvent<keyof typeof handlerMap> = {
type: "evtType1",
id: "1",
timestamp: now,
data: {},
correlation_id: "abc123",
handler_results: {},
errors: 0,
};
let callCount = 0;
mockClient.getEventsToProcess.mockImplementation(() => {
callCount++;
return Promise.resolve(callCount === 1 ? [evt1] : []);
});
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation(() =>
Promise.resolve(evt1),
);
mockTxClient.updateEvent.mockImplementation(() => Promise.resolve());

const processor = new EventProcessor({
client: mockClient,
handlerMap,
pollingIntervalMs: 10,
telemetry: {
meter,
tracer,
attributes: {
"service.name": "txob-test",
},
},
});
processor.start();
await sleep(50);
await processor.stop();

expect(meter.createCounter).toHaveBeenCalledWith(
TxOBTelemetryMetricName.EventProcessingCount,
expect.any(Object),
);
expect(meter.createHistogram).toHaveBeenCalledWith(
TxOBTelemetryMetricName.HandlerProcessingDuration,
expect.any(Object),
);
expect(tracer.startSpan).toHaveBeenCalledWith(
TxOBTelemetrySpanName.EventProcess,
expect.objectContaining({
attributes: expect.objectContaining({
"service.name": "txob-test",
[TxOBTelemetryAttributeKey.EventId]: "1",
[TxOBTelemetryAttributeKey.EventType]: "evtType1",
[TxOBTelemetryAttributeKey.EventCorrelationId]: "abc123",
}),
}),
);
expect(spans.map((span) => span.name)).toEqual(
expect.arrayContaining([
TxOBTelemetrySpanName.Poll,
TxOBTelemetrySpanName.EventProcess,
TxOBTelemetrySpanName.HandlerProcess,
]),
);
expect(
counters.get(TxOBTelemetryMetricName.EventProcessingCount)?.add,
).toHaveBeenCalledWith(
1,
expect.objectContaining({
"service.name": "txob-test",
[TxOBTelemetryAttributeKey.EventType]: "evtType1",
[TxOBTelemetryAttributeKey.EventOutcome]:
TxOBTelemetryEventOutcome.Success,
}),
);
expect(
counters.get(TxOBTelemetryMetricName.HandlerProcessingCount)?.add,
).toHaveBeenCalledWith(
1,
expect.objectContaining({
"service.name": "txob-test",
[TxOBTelemetryAttributeKey.EventType]: "evtType1",
[TxOBTelemetryAttributeKey.HandlerName]: "handler1",
[TxOBTelemetryAttributeKey.HandlerOutcome]:
TxOBTelemetryHandlerOutcome.Success,
}),
);
expect(
histograms.get(TxOBTelemetryMetricName.EventProcessingDuration)?.record,
).toHaveBeenCalledWith(
expect.any(Number),
expect.objectContaining({
[TxOBTelemetryAttributeKey.EventType]: "evtType1",
[TxOBTelemetryAttributeKey.EventOutcome]:
TxOBTelemetryEventOutcome.Success,
}),
);
expect(
spans.find((span) => span.name === TxOBTelemetrySpanName.EventProcess)
?.setStatus,
).toHaveBeenCalledWith({ code: 1 });
});

it("surfaces failures when creating metric instruments", () => {
const metricError = new Error("counter failed");

expect(
() =>
new EventProcessor({
client: mockClient,
handlerMap: {},
pollingIntervalMs: 10,
telemetry: {
meter: {
createCounter: vi.fn(() => {
throw metricError;
}),
createHistogram: vi.fn(),
},
},
}),
).toThrow(metricError);
});

it("does not let runtime telemetry failures interrupt processing", async () => {
const handlerMap = {
evtType1: {
handler1: vi.fn(() => Promise.resolve()),
},
};
const evt1: TxOBEvent<keyof typeof handlerMap> = {
type: "evtType1",
id: "1",
timestamp: now,
data: {},
correlation_id: "abc123",
handler_results: {},
errors: 0,
};
let callCount = 0;
mockClient.getEventsToProcess.mockImplementation(() => {
callCount++;
return Promise.resolve(callCount === 1 ? [evt1] : []);
});
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation(() =>
Promise.resolve(evt1),
);
mockTxClient.updateEvent.mockImplementation(() => Promise.resolve());

const processor = new EventProcessor({
client: mockClient,
handlerMap,
pollingIntervalMs: 10,
telemetry: {
meter: {
createCounter: vi.fn(() => ({
add: vi.fn(() => {
throw new Error("counter add failed");
}),
})),
createHistogram: vi.fn(() => ({
record: vi.fn(() => {
throw new Error("histogram record failed");
}),
})),
},
tracer: {
startSpan: vi.fn(() => {
throw new Error("span failed");
}),
},
},
});
processor.start();
await sleep(50);
await processor.stop();

expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
expect(mockTxClient.updateEvent).toHaveBeenCalledOnce();
});
});
Loading
Loading