A utility library providing a powerful wrapper around TypedEventEmitter. It enhances standard event emitters with promise-based event waiting, cancellation support, and a fluent API.
- Promise-based Event Handling: Wait for specific events using `waitFor(event)`, which returns a Promise so you can `await` it.
- Cancellation: Built-in `AbortController` support to cancel pending operations with `cancel()`.
- Type Safety: Built on top of `typed-emitter` for full TypeScript support.
- Fluent API: Chainable `on`, `off`, and `once` methods.
- Completion Helpers: `finish()` and `done()` methods to easily await the end of a stream.
import { createStream } from "@feelai/stream";
import { EventEmitter } from "events";
import TypedEventEmitter from "typed-emitter";
// Define your events
type MyEvents = {
  data: (chunk: string) => void;
  error: (err: Error) => void;
  end: () => void;
};
// Create a typed emitter
const myEmitter = new EventEmitter() as TypedEventEmitter<MyEvents>;
// Wrap it with createStream
const stream = createStream(myEmitter);
// Using standard event listeners (chainable)
stream.on("data", (chunk) => console.log("Received:", chunk)).once("end", () => console.log("Stream ended"));
// Using async/await
async function processStream() {
  try {
    // Wait for a specific event
    const [chunk] = await stream.waitFor("data");
    console.log("First chunk:", chunk);
    // Wait for stream completion
    await stream.finish();
    console.log("Done!");
  } catch (err) {
    console.error("Stream error or cancelled:", err);
  }
}
processStream();
// Cancellation
// stream.cancel(); // This will reject any pending waitFor promises

Creates a new Stream instance wrapping the provided TypedEventEmitter.

constructor(emitter: TypedEventEmitter<EM>)

- `on(event, listener)`: Register a listener. Returns `this`.
- `off(event, listener)`: Remove a listener. Returns `this`.
- `once(event, listener)`: Register a one-time listener. Returns `this`.
- `waitFor(event)`: Returns a `Promise` that resolves with the event arguments when the event is emitted. Rejects if the stream emits "error" or is cancelled.
- `cancel()`: Aborts the internal controller and rejects all pending `waitFor` promises.
- `finish()` / `done()`: Returns a `Promise` that resolves when the "end" event is emitted.
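To make the `waitFor` and `cancel()` semantics described above more concrete, here is a minimal sketch of how a promise-based wait with `AbortController` cancellation could be built on a plain Node.js `EventEmitter`. It is not the library's actual implementation: the helper name `waitForEvent`, its signature, and the `"cancelled"` error message are assumptions made for illustration only.

```typescript
import { EventEmitter } from "events";

// Hypothetical helper (not part of @feelai/stream): resolves with the event
// arguments, rejects if "error" is emitted first or if the signal is aborted.
function waitForEvent(
  emitter: EventEmitter,
  event: string,
  signal: AbortSignal,
): Promise<unknown[]> {
  return new Promise((resolve, reject) => {
    // Reject immediately if cancellation already happened.
    if (signal.aborted) {
      reject(new Error("cancelled"));
      return;
    }

    // Remove every listener this call registered, whichever path settles first.
    const cleanup = () => {
      emitter.off(event, onEvent);
      emitter.off("error", onError);
      signal.removeEventListener("abort", onAbort);
    };
    const onEvent = (...args: unknown[]) => {
      cleanup();
      resolve(args);
    };
    const onError = (err: unknown) => {
      cleanup();
      reject(err);
    };
    const onAbort = () => {
      cleanup();
      reject(new Error("cancelled"));
    };

    emitter.once(event, onEvent);
    // When the caller waits for "error" itself, the event should resolve, not reject.
    if (event !== "error") emitter.once("error", onError);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// Usage: a single AbortController can cancel every pending wait that shares its signal.
const demoEmitter = new EventEmitter();
const demoController = new AbortController();
const pending = waitForEvent(demoEmitter, "data", demoController.signal);
pending.catch((err: Error) => console.error("wait rejected:", err.message));
demoController.abort();
```

Sharing one signal across all pending waits is what lets a single `cancel()` call reject them together. The cleanup step matters as well: without it, each settled wait would leave stale listeners on the emitter.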