Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 120 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ const client = new pg.Client({
await client.connect();

const processor = new EventProcessor({
client: createProcessorClient({ querier: client }),
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
// Handlers are processed concurrently and independently with retries
Expand Down Expand Up @@ -301,11 +301,11 @@ For a detailed architecture diagram, see [architecture.mmd](./architecture.mmd).
Every event in txob follows this structure:

```typescript
interface TxOBEvent<EventType extends string> {
interface TxOBEvent<EventType extends string, EventData = Record<string, unknown>> {
id: string; // Unique event identifier (UUID recommended)
timestamp: Date; // When the event was created
type: EventType; // Event type (e.g., "UserCreated", "OrderPlaced")
data: Record<string, unknown>; // Event payload - your custom data
data: EventData; // Event payload - can be strongly typed per event type
correlation_id: string; // For tracing requests across services
handler_results: Record<string, TxOBEventHandlerResult>; // Results from each handler
errors: number; // Number of processing attempts
Expand All @@ -326,8 +326,11 @@ interface TxOBEvent<EventType extends string> {
Handlers are async functions that execute your side-effects:

```typescript
type TxOBEventHandler = (
event: TxOBEvent,
type TxOBEventHandler<
EventType extends string = string,
EventData = Record<string, unknown>,
> = (
event: TxOBEvent<EventType, EventData>,
opts: { signal?: AbortSignal },
) => Promise<void>;
```
Expand All @@ -349,6 +352,57 @@ const incrementCounter: TxOBEventHandler = async (event) => {
};
```

#### Typed payloads with Standard Schema (Zod/ArkType/Valibot/etc.)

txob uses schema-driven event payload typing through the Standard Schema interface.
`eventSchemas` is required by both `createProcessorClient(...)` and `createEventProcessor(...)`.
Use any validator that implements Standard Schema (for example Zod, ArkType, or Valibot).

```typescript
import { z } from "zod";
import {
createEventHandlerMap,
createEventProcessor,
type TxOBEventSchemaMap,
} from "txob";
import { createProcessorClient } from "txob/pg";

const eventSchemas = {
UserCreated: z.object({
userId: z.string().uuid(),
email: z.string().email(),
}),
OrderPlaced: z.object({
orderId: z.string().uuid(),
amount: z.number().positive(),
}),
} satisfies TxOBEventSchemaMap<"UserCreated" | "OrderPlaced">;

const handlerMap = createEventHandlerMap({
eventSchemas,
handlerMap: {
UserCreated: {
sendWelcomeEmail: async (event) => {
await emailService.send(event.data.email); // typed as string
},
},
OrderPlaced: {
sendReceipt: async (event) => {
await receipts.send(event.data.orderId, event.data.amount); // strongly typed
},
},
},
});

const processor = createEventProcessor({
eventSchemas,
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap,
});
```

Schema-first inference via `eventSchemas` is the standard and required approach.

### Handler Results

Each handler's execution is tracked independently:
Expand Down Expand Up @@ -792,7 +846,7 @@ await client.connect();

// 3. Create and start the processor
const processor = new EventProcessor<EventType>({
client: createProcessorClient<EventType>({ querier: client }),
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
sendEmail: async (event, { signal }) => {
Expand Down Expand Up @@ -912,7 +966,7 @@ gracefulShutdown(server, {

```typescript
const processor = new EventProcessor({
client: createProcessorClient({ querier: client }),
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
sendWelcomeEmail: async (event) => {
Expand Down Expand Up @@ -1010,7 +1064,7 @@ const producer = kafka.producer();
await producer.connect();

const processor = new EventProcessor({
client: createProcessorClient({ querier: client }),
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
// Publish to Kafka with guaranteed consistency
Expand Down Expand Up @@ -1067,7 +1121,7 @@ const client = new pg.Client({
await client.connect();

const processor = new EventProcessor({
client: createProcessorClient({ querier: client }),
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
// All your handlers...
},
Expand Down Expand Up @@ -1140,11 +1194,12 @@ Creates a PostgreSQL processor client.
```typescript
import { createProcessorClient } from "txob/pg";

createProcessorClient<EventType>(opts: {
createProcessorClient(opts: {
querier: pg.Client;
eventSchemas: Record<string, StandardSchemaV1<unknown, unknown>>;
table?: string; // Default: "events"
limit?: number; // Default: 100
}): TxOBProcessorClient<EventType>
}): TxOBProcessorClient<...inferred from eventSchemas...>
```

### `createProcessorClient` (MongoDB)
Expand All @@ -1154,12 +1209,13 @@ Creates a MongoDB processor client.
```typescript
import { createProcessorClient } from "txob/mongodb";

createProcessorClient<EventType>(opts: {
createProcessorClient(opts: {
mongo: mongodb.MongoClient;
db: string; // Database name
eventSchemas: Record<string, StandardSchemaV1<unknown, unknown>>;
collection?: string; // Default: "events"
limit?: number; // Default: 100
}): TxOBProcessorClient<EventType>
}): TxOBProcessorClient<...inferred from eventSchemas...>
```

### `TxOBError`
Expand Down Expand Up @@ -1233,11 +1289,11 @@ If the database is not configured for Change Streams, an error will be emitted v

```typescript
// Main event type
type TxOBEvent<EventType extends string> = {
type TxOBEvent<EventType extends string, EventData = Record<string, unknown>> = {
id: string;
timestamp: Date;
type: EventType;
data: Record<string, unknown>;
data: EventData;
correlation_id: string;
handler_results: Record<string, TxOBEventHandlerResult>;
errors: number;
Expand All @@ -1246,16 +1302,32 @@ type TxOBEvent<EventType extends string> = {
};

// Handler function signature
type TxOBEventHandler = (
event: TxOBEvent,
type TxOBEventHandler<
EventType extends string = string,
EventData = Record<string, unknown>,
> = (
event: TxOBEvent<EventType, EventData>,
opts: { signal?: AbortSignal },
) => Promise<void>;

// Handler map structure
type TxOBEventHandlerMap<EventType extends string> = Record<
type TxOBEventDataMap<EventType extends string> = Record<
EventType,
Record<string, TxOBEventHandler>
Record<string, unknown>
>;
type TxOBEventHandlerMap<
EventType extends string,
EventDataMap extends TxOBEventDataMap<EventType>,
> = {
[TType in EventType]: Record<string, TxOBEventHandler<TType, EventDataMap[TType]>>;
};

// Schema-first convenience API
createEventProcessor({
eventSchemas,
client: createProcessorClient({ querier, eventSchemas }),
handlerMap,
});

// Handler result tracking
type TxOBEventHandlerResult = {
Expand Down Expand Up @@ -1395,7 +1467,7 @@ If using `FOR UPDATE SKIP LOCKED` properly (which txob does), stuck events are n
- Lower `maxEventConcurrency`
- Profile handlers for memory leaks
- Archive old events
- Reduce `limit` in `createProcessorClient({ querier: client, table, limit })`
- Reduce `limit` in `createProcessorClient({ querier: client, eventSchemas, table, limit })`

### Duplicate handler executions

Expand Down Expand Up @@ -1700,7 +1772,7 @@ txob can emit spans and metrics without depending on a specific telemetry SDK. I
import { metrics, trace } from "@opentelemetry/api";

const processor = new EventProcessor({
client: createProcessorClient({ querier: client }),
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: handlers,
telemetry: {
tracer: trace.getTracer("txob"),
Expand Down Expand Up @@ -1767,30 +1839,44 @@ onEventMaxErrorsReached: async ({ event, txClient }) => {

### Can I use this with TypeScript?

Yes! txob is written in TypeScript and provides full type safety:
Yes! txob is written in TypeScript and provides full type safety, including typed event payloads:

```typescript
// Define your event types
const eventTypes = {
UserCreated: "UserCreated",
OrderPlaced: "OrderPlaced",
} as const;
import { z } from "zod";
import {
createEventHandlerMap,
createEventProcessor,
type TxOBEventSchemaMap,
} from "txob";
import { createProcessorClient } from "txob/pg";

type EventType = keyof typeof eventTypes;
const eventSchemas = {
UserCreated: z.object({ userId: z.string().uuid(), email: z.string().email() }),
OrderPlaced: z.object({ orderId: z.string().uuid(), amount: z.number() }),
} satisfies TxOBEventSchemaMap<"UserCreated" | "OrderPlaced">;

// TypeScript will enforce all event types have handlers
const processor = new EventProcessor<EventType>({
client: createProcessorClient<EventType>({ querier: client }),
const handlers = createEventHandlerMap({
eventSchemas,
handlerMap: {
UserCreated: {
/* handlers */
sendWelcomeEmail: async (event) => {
event.data.email; // string
},
},
OrderPlaced: {
/* handlers */
sendReceipt: async (event) => {
event.data.amount; // number
},
},
// Missing an event type? TypeScript error!
// Missing an event type? TypeScript error
},
});

const processor = createEventProcessor({
eventSchemas,
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: handlers,
});
```

### What's the performance impact?
Expand Down
21 changes: 21 additions & 0 deletions examples/pg/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { z } from "zod";
import type { TxOBEventSchemaMap } from "../../src/index.js";

export const eventTypes = {
ResourceSaved: "ResourceSaved",
EventMaxErrorsReached: "EventMaxErrorsReached",
} as const;

export type EventType = keyof typeof eventTypes;

export const eventSchemas = {
[eventTypes.ResourceSaved]: z.object({
type: z.literal("activity"),
id: z.uuid(),
}),
[eventTypes.EventMaxErrorsReached]: z.object({
failedEventId: z.uuid(),
failedEventType: z.string(),
failedEventCorrelationId: z.uuid(),
}),
} satisfies TxOBEventSchemaMap<EventType>;
3 changes: 2 additions & 1 deletion examples/pg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"dependencies": {
"http-graceful-shutdown": "^3.1.13",
"pg": "^8.11.3"
"pg": "^8.11.3",
"zod": "^4.1.12"
},
"devDependencies": {
"@types/node": "^20.10.5",
Expand Down
Loading
Loading