A durable, versioned, replayable, single-publisher, multi-subscriber stream implementation written in TypeScript.
Streaming responses from an AI model (or any long-running process) over HTTP is fragile. If the server crashes mid-stream, the client loses everything. If the user refreshes the page, they start over. If two tabs are open, they each need their own stream.
monostream solves this by persisting every chunk as it's written. A subscriber that connects at any point -- before, during, or after the stream is written -- replays from storage and then seamlessly tails live writes. Multiple subscribers all receive the same chunks, in order, exactly once.
If a publisher crashes without finishing, a new publisher can start a new version of the same stream. Subscribers always see only the latest version. Streams close deterministically when a terminal chunk (e.g. finish, error) is written.
A `DurableStreamService` manages named streams. You provide three things:
- Store -- persists chunks to a backend (PostgreSQL, in-memory, or your own). This is how replay works: late-joining subscribers read from the store, then switch to live delivery.
- Notifier (optional) -- broadcasts "chunk persisted" events across service instances. This is how multiple server processes can subscribe to the same stream. Without a notifier, cross-instance subscribers only see chunks on replay, not in real time.
- Protocol -- defines which chunk types are terminal. When a terminal chunk is written, the stream closes and all subscribers complete.
Each stream has exactly one publisher and any number of subscribers.
```sh
npm install monostream
```

```ts
import { DurableStreamService } from "monostream";
import { MemDurableStreamStore } from "monostream/mem";

type Chunk = { type: "text-delta"; delta: string } | { type: "finish" };

const store = new MemDurableStreamStore();
const svc = new DurableStreamService<Chunk>({
  store,
  notifier: store,
  protocol: {
    isTerminal: (chunk) => chunk.type === "finish",
  },
});

// publish
const pub = await svc.createPublisher("my-stream", 0);
await pub.write({ type: "text-delta", delta: "Hello " });
await pub.write({ type: "text-delta", delta: "world!" });
await pub.write({ type: "finish" });

// subscribe (replays from store, then tails live)
for await (const chunk of svc.subscribe("my-stream")) {
  console.log(chunk);
}
```

`subscribe` returns a `ReadableStream` and accepts an optional `AbortSignal`. Without one, the subscriber blocks until a terminal chunk arrives -- if the publisher crashes or hangs, that may be indefinitely.
```ts
const ac = new AbortController();
for await (const chunk of svc.subscribe("my-stream", ac.signal)) {
  console.log(chunk);
}
```

Multiple subscribers all receive the same chunks, whether they joined before, during, or after the stream was written.
```ts
const pub = await svc.createPublisher("fanout", 0);

const p1 = svc.subscribe("fanout");
const p2 = svc.subscribe("fanout");
const p3 = svc.subscribe("fanout");

await pub.write({ type: "text-delta", delta: "shared" });
await pub.write({ type: "finish" });

// all three receive: [{ type: "text-delta", delta: "shared" }, { type: "finish" }]
```

Streams are versioned. If a publisher crashes without writing a terminal chunk, a new publisher can start a new version. Subscribers always see only the latest version.
```ts
// publisher crashes mid-stream
{
  using pub0 = await svc.createPublisher("chat:123", 0);
  await pub0.write({ type: "text-delta", delta: "partial..." });
  // pub0 is disposed at end of block without writing a terminal chunk
}

// new publisher starts at version 1
const pub1 = await svc.createPublisher("chat:123", 1);
await pub1.write({ type: "text-delta", delta: "retry from scratch" });
await pub1.write({ type: "finish" });

// subscribers see only version 1
for await (const chunk of svc.subscribe("chat:123")) {
  console.log(chunk);
}
// { type: "text-delta", delta: "retry from scratch" }
// { type: "finish" }
```

You can also auto-resolve the next version:

```ts
const pub = await svc.createPublisher("chat:123", "next");
console.log(pub.version); // max(existing versions) + 1, or 0 if new
```

`monostream/ai` provides a pre-configured service and stream adapter for the Vercel AI SDK's `UIMessage` chunk types. It treats `finish`, `error`, and `abort` as terminal chunks.
```ts
import { createAIDurableStreamService, subscribeToUIMessageStream } from "monostream/ai";
import { MemDurableStreamStore } from "monostream/mem";

const store = new MemDurableStreamStore();
const svc = createAIDurableStreamService({ store, notifier: store });

// server: publish AI response chunks
const pub = await svc.createPublisher("chat:123:stream", "next");

// client: subscribe as a UIMessage stream
const uiStream = subscribeToUIMessageStream(svc, "chat:123:stream");
```

For production use, persist chunks to PostgreSQL and use LISTEN/NOTIFY for real-time cross-instance delivery.
```ts
import pg from "pg";
import { drizzle } from "drizzle-orm/node-postgres";
import { DurableStreamService } from "monostream";
import { DrizzleDurableStreamStore } from "monostream/drizzle";
import { PgDurableStreamNotifier } from "monostream/pg";

const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const db = drizzle({ client: pool });

const store = new DrizzleDurableStreamStore(db);
const notifier = new PgDurableStreamNotifier(pool);

const svc = new DurableStreamService({
  store,
  notifier,
  protocol: {
    isTerminal: (chunk) => chunk.type === "finish" || chunk.type === "error",
  },
});
```

The Drizzle adapter expects a `stream_chunk` table:
```sql
CREATE TABLE stream_chunk (
  stream_id  TEXT NOT NULL,
  version    BIGINT NOT NULL DEFAULT 0,
  id         BIGINT NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  chunk      JSONB NOT NULL,
  PRIMARY KEY (stream_id, version, id)
);
```

The Drizzle schema is exported for use with drizzle-kit migrations:

```ts
import { streamChunks } from "monostream/drizzle";
```

A PGlite notifier is also available at `monostream/pglite`.
The store must satisfy these invariants:

- `insertChunk` must be idempotent on `(streamId, version, id)`. If a row with the same key already exists, it should be overwritten (upsert).
- `listChunks` must return rows ordered by `(version asc, id asc)`. The remote tailer advances its cursor based on this ordering.
- `getLatestVersion` must return the highest version number across all chunks for a given stream, or `null` if none exist.
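These invariants can be illustrated with a minimal in-memory sketch. The row and query shapes below are inlined assumptions for the example, not monostream's exact types:

```ts
// Minimal sketch of the store invariants; shapes are assumptions, not monostream's types.
type ChunkRow = { streamId: string; version: number; id: number; chunk: unknown };

class SketchStore {
  private rows = new Map<string, ChunkRow>();

  // Idempotent on (streamId, version, id): writing the same key again overwrites (upsert).
  async insertChunk(p: ChunkRow): Promise<{ id: number }> {
    this.rows.set(`${p.streamId}/${p.version}/${p.id}`, p);
    return { id: p.id };
  }

  // Rows come back ordered by (version asc, id asc), so a tailer can advance its cursor.
  async listChunks(q: { streamId: string }): Promise<ChunkRow[]> {
    return [...this.rows.values()]
      .filter((r) => r.streamId === q.streamId)
      .sort((a, b) => a.version - b.version || a.id - b.id);
  }

  // Highest version across all chunks for the stream, or null if none exist.
  async getLatestVersion(streamId: string): Promise<number | null> {
    const rows = await this.listChunks({ streamId });
    return rows.length ? rows[rows.length - 1].version : null;
  }
}
```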
Implement `DurableStreamStore`:

```ts
import type { DurableStreamStore } from "monostream";

class MyStore implements DurableStreamStore {
  async insertChunk(params: {
    streamId: string;
    version: number;
    id: number;
    chunk: unknown;
  }): Promise<{ id: number }> {
    /* ... */
  }

  async getLatestVersion(streamId: string): Promise<number | null> {
    /* ... */
  }

  async listChunks(query: DurableStreamListChunksQuery): Promise<DurableStreamChunkRow[]> {
    /* ... */
  }
}
```

The notifier has relaxed delivery guarantees:
- `notifyChunkPersisted` must make a best-effort attempt to deliver the notification. At-least-once delivery is sufficient -- the service deduplicates and ignores stale or out-of-order notifications.
- `onChunkPersisted` must return an unsubscribe function that stops future deliveries when called.
- Notification failures are non-fatal. The service catches errors from `notifyChunkPersisted` and emits them as `"error"` events. Missed notifications are recovered on the next successful one, since the service always fetches all chunks after its current cursor.
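A minimal in-process notifier sketch satisfying these guarantees (the notification shape is an assumption for the example):

```ts
// In-process notifier sketch; the notification shape is an assumption.
type Notification = { streamId: string; version: number; id: number };
type Callback = (n: Notification) => void;

class SketchNotifier {
  private callbacks = new Set<Callback>();

  // Best-effort, at-least-once: deliver synchronously to every registered callback.
  async notifyChunkPersisted(n: Notification): Promise<void> {
    for (const cb of this.callbacks) cb(n);
  }

  // Returns an unsubscribe function that stops future deliveries when called.
  async onChunkPersisted(cb: Callback): Promise<() => Promise<void>> {
    this.callbacks.add(cb);
    return async () => {
      this.callbacks.delete(cb);
    };
  }
}
```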
Implement `DurableStreamNotifier`:

```ts
import type { DurableStreamNotifier } from "monostream";

class MyNotifier implements DurableStreamNotifier {
  async notifyChunkPersisted(notification: DurableStreamNotification): Promise<void> {
    /* ... */
  }

  async onChunkPersisted(
    callback: (notification: DurableStreamNotification) => void,
  ): Promise<() => Promise<void>> {
    /* ... */
  }
}
```

| Adapter | Store | Notifier | Import |
|---|---|---|---|
| In-memory | Yes | Yes | `monostream/mem` |
| Drizzle (PostgreSQL) | Yes | - | `monostream/drizzle` |
| node-postgres | - | Yes | `monostream/pg` |
| PGlite | - | Yes | `monostream/pglite` |
```sh
bun test --coverage
```

```
----------------|---------|---------|-------------------
File            | % Funcs | % Lines | Uncovered Line #s
----------------|---------|---------|-------------------
All files       |   94.44 |   99.58 |
 adapter-mem.ts |   85.71 |   98.75 |
 index.ts       |   97.62 |  100.00 |
 protocol-ai.ts |  100.00 |  100.00 |
----------------|---------|---------|-------------------
```
28 unit tests cover the core publish/subscribe contract: live delivery, late-join replay, fan-out to multiple subscribers, crash recovery across versions, abort semantics, sequential chunk IDs, and stream cleanup.
A continuous fuzzer randomly selects from 9 scenarios (live, replay, crash-replay, crash-live, late-join, mixed cross-instance, abort, pre-aborted, reuse) with randomized chunk counts, payload sizes, subscriber counts, and version histories. Each iteration verifies that all subscribers receive the correct chunks in order and that the store contains exactly the expected rows.
Two backends are fuzzed:
```sh
# in-memory (no external dependencies)
bun run fuzz/mem.ts

# against PostgreSQL (spins up a Docker container automatically)
bun run fuzz/pg.ts
```

Runs continuously until interrupted with Ctrl+C. Seeds are deterministic and printed at startup for reproduction:

```sh
bun run fuzz/mem.ts --seed 42
```

The PostgreSQL fuzzer has been run for 80 million+ scenarios over ~48 hours with zero failures at a sustained rate of ~460 scenarios/second (see `fuzz/pg.log`):
```
[5s]      ok=1986     fail=0 rate=396.9/s peak=0.6s heap=16MB
[85s]     ok=35976    fail=0 rate=423.1/s peak=0.5s heap=35MB
[3601s]   ok=1365256  fail=0 rate=379.1/s peak=0.5s heap=18MB
[86402s]  ok=38910097 fail=0 rate=450.3/s peak=0.5s heap=26MB
[174727s] ok=80674106 fail=0 rate=461.7/s peak=0.8s heap=51MB
```
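Seed determinism (the `--seed` flag above) can be sketched with a small seeded PRNG: the same seed always yields the same scenario sequence. This is illustrative only, not the fuzzer's actual generator:

```ts
// Illustrative seeded PRNG (mulberry32) -- NOT the fuzzer's actual generator.
// Same seed => same sequence, so any failing run can be replayed exactly.
function mulberry32(seed: number): () => number {
  let a = seed >>> 0;
  return () => {
    a = (a + 0x6d2b79f5) | 0;
    let t = Math.imul(a ^ (a >>> 15), 1 | a);
    t = (t + Math.imul(t ^ (t >>> 7), 61 | t)) ^ t;
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

const scenarios = ["live", "replay", "crash-replay", "late-join", "abort"];

// Two independent generators with the same seed pick identical scenario sequences.
const pickA = mulberry32(42);
const runA = Array.from({ length: 5 }, () => scenarios[Math.floor(pickA() * scenarios.length)]);
const pickB = mulberry32(42);
const runB = Array.from({ length: 5 }, () => scenarios[Math.floor(pickB() * scenarios.length)]);
```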