Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sse): sse()-id not propagated #5819

Merged
merged 14 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/next-sse-chat/.env.example
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
POSTGRES_URL=postgres://postgres:adminadmin@0.0.0.0:5432/db
POSTGRES_URL=postgres://postgres:@0.0.0.0:5432/sse-chat
NEXTAUTH_URL=http://localhost:3000
NEXTAUTH_SECRET=secret
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ export function useWhoIsTyping(channelId: string) {
trpc.channel.whoIsTyping.useSubscription(
{ channelId },
{
onData(event) {
setCurrentlyTyping(event.data);
onData(list) {
setCurrentlyTyping(list);
},
},
);
Expand Down
2 changes: 1 addition & 1 deletion examples/next-sse-chat/src/app/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default async function Home() {
<br />
<a
className="text-gray-700 underline dark:text-gray-400"
href="https://github.com/trpc/trpc/tree/05-10-subscriptions-sse/examples/next-sse-chat"
href="https://github.com/trpc/examples-next-sse-chat"
target="_blank"
rel="noreferrer"
>
Expand Down
28 changes: 4 additions & 24 deletions examples/next-sse-chat/src/server/routers/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,38 +96,18 @@ export const channelRouter = {
.input(
z.object({
channelId: z.string().uuid(),
lastEventId: z.string().optional(),
}),
)
.subscription(async function* (opts) {
const { channelId } = opts.input;
let lastEventId = opts?.input?.lastEventId ?? '';

if (!currentlyTyping[channelId]) {
currentlyTyping[channelId] = {};
// emit who is currently typing
if (currentlyTyping[channelId]) {
yield Object.keys(currentlyTyping[channelId]);
}

const maybeYield = function* (who: WhoIsTyping) {
const id = Object.keys(who).sort().toString();
if (lastEventId === id) {
return;
}
yield sse({
id,
data: Object.keys(who).filter(
(user) => user !== opts.ctx.session?.user?.name,
),
});

lastEventId = id;
};

// if someone is typing, emit event immediately
yield* maybeYield(currentlyTyping[channelId]);

for await (const [channelId, who] of ee.toIterable('isTypingUpdate')) {
if (channelId === opts.input.channelId) {
yield* maybeYield(who);
yield Object.keys(who);
}
}
}),
Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/links/httpSubscriptionLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type {
inferClientTypes,
InferrableClientTypes,
MaybePromise,
SSEvent,
SSEMessage,
} from '@trpc/server/unstable-core-do-not-import';
import {
run,
Expand Down Expand Up @@ -85,7 +85,7 @@ export function unstable_httpSubscriptionLink<
};
// console.log('starting', new Date());
eventSource.addEventListener('open', onStarted);
const iterable = sseStreamConsumer<SSEvent>({
const iterable = sseStreamConsumer<Partial<SSEMessage>>({
from: eventSource,
deserialize: transformer.input.deserialize,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EventSourcePolyfill, NativeEventSource } from 'event-source-polyfill';
import SuperJSON from 'superjson';
import type { Maybe } from '../types';
import {
isServerSentEventEnvelope,
isSSEMessageEnvelope,
sse,
sseHeaders,
sseStreamConsumer,
Expand Down Expand Up @@ -258,6 +258,8 @@ test('SSE on serverless - emit and disconnect early', async () => {
Object {
"lastEventId": null,
"written": Array [
": connected
",
"

",
Expand All @@ -280,6 +282,8 @@ test('SSE on serverless - emit and disconnect early', async () => {
Object {
"lastEventId": "2",
"written": Array [
": connected
",
"

",
Expand Down Expand Up @@ -311,5 +315,5 @@ test('sse()', () => {
id: 1,
data: { json: 1 },
});
expect(isServerSentEventEnvelope(event)).toBe(true);
expect(isSSEMessageEnvelope(event)).toBe(true);
});
93 changes: 45 additions & 48 deletions packages/server/src/unstable-core-do-not-import/stream/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import { createReadableStream } from './utils/createReadableStream';
type Serialize = (value: any) => any;
type Deserialize = (value: any) => any;

interface SSEventWithId {
/**
* Server-sent Event Message
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html
* @public
*/
export interface SSEMessage {
/**
* The data field of the message - this can be anything
*/
Expand All @@ -18,43 +23,34 @@ interface SSEventWithId {
* Passing this id will allow the client to resume the connection from this point if the connection is lost
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header
*/
id: string | number;
/**
* Event name for the message
*/
event?: string;
/**
* A comment for the event
*/
comment?: string;
id: string;
}

/**
* Server-sent Event
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html
* @public
*/
export type SSEvent = Partial<SSEventWithId>;

const sseSymbol = Symbol('SSEventEnvelope');
export type ServerSentEventEnvelope<TData> = [typeof sseSymbol, TData];
const sseSymbol = Symbol('SSEMessageEnvelope');
export type SSEMessageEnvelope<TData> = [typeof sseSymbol, TData];

/**
* Produce a typed server-sent event
* Produce a typed server-sent event message
*/
export function sse<TData extends SSEvent>(
event: ValidateShape<TData, SSEvent>,
): ServerSentEventEnvelope<TData> {
export function sse<TData extends SSEMessage>(
event: ValidateShape<TData, SSEMessage>,
): SSEMessageEnvelope<TData> {
if (event.id === '') {
// This could be removed by using different event names for `yield sse(x)`-emitted events and `yield y`-emitted events
throw new Error(
'`id` must not be an empty string as empty string is the same as not setting the id at all',
);
}
return [sseSymbol, event as TData];
}

export function isServerSentEventEnvelope<TData extends SSEvent>(
export function isSSEMessageEnvelope<TData extends SSEMessage>(
value: unknown,
): value is ServerSentEventEnvelope<TData> {
): value is SSEMessageEnvelope<TData> {
return Array.isArray(value) && value[0] === sseSymbol;
}

export type SerializedSSEvent = Omit<SSEvent, 'data'> & {
export type SerializedSSEvent = Omit<SSEMessage, 'data'> & {
data?: string;
};

Expand Down Expand Up @@ -92,12 +88,19 @@ export interface SSEStreamProducerOptions {
*/
emitAndEndImmediately?: boolean;
}

type SSEvent = Partial<
SSEMessage & {
comment: string;
event: string;
}
>;
/**
*
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html
*/
export function sseStreamProducer(opts: SSEStreamProducerOptions) {
const stream = createReadableStream<SerializedSSEvent>();
const stream = createReadableStream<SSEvent>();
stream.controller.enqueue({
comment: 'connected',
});
Expand Down Expand Up @@ -157,13 +160,11 @@ export function sseStreamProducer(opts: SSEStreamProducerOptions) {

const value = next.value;

const data: SSEvent = isServerSentEventEnvelope(value)
? value[1]
const chunk: SSEvent = isSSEMessageEnvelope(value)
? { ...value[1] }
: {
data: value,
};
const chunk: SerializedSSEvent = {};
Object.assign(chunk, data);
if ('data' in chunk) {
chunk.data = JSON.stringify(serialize(chunk.data));
}
Expand All @@ -187,7 +188,7 @@ export function sseStreamProducer(opts: SSEStreamProducerOptions) {
});

return stream.readable.pipeThrough(
new TransformStream<SerializedSSEvent, string>({
new TransformStream<SSEvent, string>({
transform(chunk, controller) {
if ('event' in chunk) {
controller.enqueue(`event: ${chunk.event}\n`);
Expand All @@ -198,12 +199,15 @@ export function sseStreamProducer(opts: SSEStreamProducerOptions) {
if ('id' in chunk) {
controller.enqueue(`id: ${chunk.id}\n`);
}
if ('comment' in chunk) {
controller.enqueue(`: ${chunk.comment}\n`);
}
controller.enqueue('\n\n');
},
}),
);
}
export type inferSSEOutput<TData> = TData extends ServerSentEventEnvelope<
export type inferSSEOutput<TData> = TData extends SSEMessageEnvelope<
infer $Data
>
? $Data
Expand All @@ -220,25 +224,18 @@ export function sseStreamConsumer<TData>(opts: {
const { deserialize = (v) => v } = opts;
const eventSource = opts.from;

const stream = createReadableStream<SerializedSSEvent>();
const stream = createReadableStream<MessageEvent>();

const transform = new TransformStream<
SerializedSSEvent,
inferSSEOutput<TData>
>({
const transform = new TransformStream<MessageEvent, inferSSEOutput<TData>>({
async transform(chunk, controller) {
if (chunk.data) {
const def: SSEvent = {};
def.data = deserialize(JSON.parse(chunk.data));
if ('id' in chunk) {
def.id = chunk.id;
}
if ('event' in chunk) {
def.event = chunk.event;
}
const def: Partial<SSEMessage> = {
data: deserialize(JSON.parse(chunk.data)),
};

controller.enqueue(def as inferSSEOutput<TData>);
if (chunk.lastEventId) {
def.id = chunk.lastEventId;
}
controller.enqueue(def as inferSSEOutput<TData>);
},
});

Expand Down
11 changes: 8 additions & 3 deletions packages/tests/server/httpSubscriptionLink.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const ctx = konn()
let idx = opts.input.lastEventId ?? 0;
while (true) {
yield sse({
id: idx,
id: String(idx),
data: idx,
});
idx++;
Expand Down Expand Up @@ -179,12 +179,12 @@ test('disconnect and reconnect with an event id', async () => {
},
]
>();
const onData = vi.fn<{ data: number }[]>();
const onData = vi.fn<{ data: number; id: string }[]>();
const subscription = client.sub.iterableInfinite.subscribe(
{},
{
onStarted: onStarted,
onData: onData,
onData,
},
);

Expand All @@ -200,6 +200,11 @@ test('disconnect and reconnect with an event id', async () => {
expect(onData.mock.calls.length).toBeGreaterThan(5);
});

expect(onData.mock.calls[0]![0]).toEqual({
data: 0,
id: '0',
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was missing

});

expect(ctx.onIterableInfiniteSpy).toHaveBeenCalledTimes(1);

expect(es.readyState).toBe(EventSource.OPEN);
Expand Down
2 changes: 1 addition & 1 deletion www/docs/client/links/httpSubscriptionLink.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const trpcClient = createTRPCClient<AppRouter>({
## Usage

:::tip
For a full example, see [our full-stack SSE example](https://github.com/trpc/next-sse-chat).
For a full example, see [our full-stack SSE example](https://github.com/trpc/examples-next-sse-chat).
:::

### Basic example
Expand Down
2 changes: 1 addition & 1 deletion www/docs/community/awesome-trpc.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ A collection of resources on tRPC.
| **Recommended:** Starter project with Prisma, Next.js, tRPC, E2E-testing | https://github.com/trpc/examples-next-prisma-starter |
| **create-t3-turbo** - Clean and simple starter repo using the T3 Stack along with Expo React Native | http://github.com/t3-oss/create-t3-turbo |
| **create-t3-app** - Scaffold a starter project using the T3 Stack (Next.js, tRPC, Tailwind CSS, Prisma) | https://create.t3.gg |
| Subscriptions Starter Project using SSE | https://github.com/trpc/next-sse-chat |
| Subscriptions Starter Project using SSE | https://github.com/trpc/examples-next-sse-chat |
| WebSockets Starter Project | https://github.com/trpc/examples-next-prisma-starter-websockets |
| tRPC Kitchen Sink - A collection of tRPC usage patterns. | https://github.com/trpc/examples-kitchen-sink |
| Turborepo + Expo + tRPC Starter | https://github.com/gunnnnii/turbo-expo-trpc-starter |
Expand Down
Loading