Consider SSE to enable server->client http subscriptions #544

Open
delaneyj opened this issue Jun 16, 2021 · 35 comments · May be fixed by #5713
Labels
✅ accepted-PRs-welcome Feature proposal is accepted and ready to work on 💸 Get paid! I'll send you money through your GitHub sponsors for addressing this

Comments

@delaneyj

delaneyj commented Jun 16, 2021

Note from @KATT: the core team is not looking at solving this ourselves, but we're happy to receive PRs and provide some guidance to potential contributors.


Reading through the #532 PR, it occurred to me that the JSON-RPC notification system only works one way, from client to server. Most of the time you want it to be from server to client. Although nothing in the spec says which side is which, in a browser this basically means the browser is the client.

One interesting approach could be to use Server-Sent Events. This would allow for stateless, load-balanceable subscriptions that could work with HTTP/2. From what I'm seeing, though, I'm not sure it would technically fall under a JSON-RPC-compliant spec, though you could probably treat it as a server-to-client notification. The nice thing is that this is already built into browsers, with polyfills available for Node and most languages.

I think you'd create/cancel as you do currently, but the client would also fetch on a new route /subscription/3nxu4ex234. I know this is separate from the WebSocket implementation, but SSE isn't well known. The major downside of the SSE spec as it stands is the text-only protocol (compared to binary for WebSockets), but in a JSON-RPC context the point is moot.
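To make the wire format concrete: each JSON-RPC notification would map to one SSE frame. A rough sketch of the serialization, with illustrative names only (not an actual tRPC API):

```typescript
// Serialize a JSON-RPC notification as a Server-Sent Events frame.
// SSE frames are plain text: optional "id:"/"event:" fields, one or more
// "data:" lines, terminated by a blank line.
interface JsonRpcNotification {
  jsonrpc: '2.0';
  method: string;
  params?: unknown;
}

function toSseFrame(msg: JsonRpcNotification, id?: string): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  lines.push('event: message');
  // JSON.stringify never emits raw newlines, so one data: line suffices.
  lines.push(`data: ${JSON.stringify(msg)}`);
  return lines.join('\n') + '\n\n';
}

const frame = toSseFrame(
  { jsonrpc: '2.0', method: 'subscription.data', params: { text: 'hi' } },
  '42',
);
// The browser's built-in EventSource parses frames like this back into
// MessageEvents on the client side.
```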

TRP-63

Funding

  • You can sponsor this specific effort via a Polar.sh pledge below
  • We receive the pledge once the issue is completed & verified
Fund with Polar
@KATT
Member

KATT commented Jun 16, 2021

SSE should be possible to add alongside WebSockets. I prefer WS myself, as I don't have to think about the max concurrent connection limit, and my application doesn't have user numbers that make it hard to scale anyway.

The way our HTTP subscriptions work right now is through long polling: you pass a cursor of the current state, and the server lets the request hang until there's new data or it times out. Inspect https://chat.trpc.io/ to see how the long polling works.
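The cursor-based long-polling scheme described above can be sketched with an in-memory stand-in for the server (names and shapes are hypothetical, not the actual tRPC wire format):

```typescript
// Rough sketch of cursor-based long polling: the client repeatedly asks
// "anything after cursor N?", and the server answers with everything the
// client hasn't seen yet plus the new cursor.
type PollResponse = { cursor: number; items: string[] };

// Stand-in for the server side; the real server would hang the request
// until log.length > cursor or a timeout elapses.
function poll(log: string[], cursor: number): PollResponse {
  return { cursor: log.length, items: log.slice(cursor) };
}

// One client iteration: fetch new items, advance the cursor.
function step(log: string[], cursor: number): { cursor: number; got: string[] } {
  const res = poll(log, cursor);
  return { cursor: res.cursor, got: res.items };
}

const log = ['hello', 'world'];
let { cursor, got } = step(log, 0);    // got = ['hello', 'world'], cursor = 2
log.push('again');
({ cursor, got } = step(log, cursor)); // got = ['again'], cursor = 3
```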

@delaneyj
Author

I too love WebSockets, when they work. At scale I've had OOM issues causing a host of problems. The major issues at scale have been token-based access timers, sticky load balancing, and the fact that if a server goes down it takes everyone with it. I'll dig into the examples more, but this was just to plant the idea in your head. 😃

@KATT
Member

KATT commented Jun 16, 2021

Isn't scaling with SSE similar, though, or am I missing something? It's still a persistent connection between a client and server(s) that also needs to be scaled.

@KATT
Member

KATT commented Jun 16, 2021

Scaling is not my area of expertise, but with WS and the current setup it should scale seamlessly on things like k8s/render.com: we listen for a SIGTERM and broadcast a reconnect notification to all clients, which will create an overlapping connection and send a ReconnectError to any listeners.

SIGTERM

process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});

ReconnectError

test('ability to do overlapping connects', async () => {
  const { client, close, wsClient, ee, wssHandler, wss } = factory();
  ee.once('subscription:created', () => {
    setImmediate(() => {
      ee.emit('server:msg', {
        id: '1',
      });
    });
  });
  function createSub() {
    const onNext = jest.fn();
    const onError = jest.fn();
    const onDone = jest.fn();
    const unsub = client.$subscription('onMessage', undefined, {
      onNext,
      onError,
      onDone,
    });
    return { onNext, onDone, onError, unsub };
  }
  const sub1 = createSub();
  await waitFor(() => {
    expect(sub1.onNext).toHaveBeenCalledTimes(2);
  });
  wssHandler.broadcastReconnectNotification();
  await waitFor(() => {
    expect(sub1.onError.mock.calls[0][0].originalError).toBeInstanceOf(
      ReconnectError,
    );
    expect(wss.clients.size).toBe(2);
  });
  const sub2 = createSub();
  await waitFor(() => {
    expect(sub2.onNext).toHaveBeenCalledWith({
      type: 'started',
    });
  });
  sub1.unsub();
  await waitFor(() => {
    expect(wss.clients.size).toBe(1);
  });
  wsClient.close();
  close();
});

@delaneyj
Author

If you are handling reconnects, that's great; the SSE spec has it built in. Since you are relying on ws, it could be more reliable than an earlier socket.io-based approach.
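For reference, the reconnect support mentioned here is part of the SSE wire format itself: the server sends a retry: field to set the client's reconnection delay, and tags events with id: values that the browser echoes back in a Last-Event-ID request header when it reconnects. A small illustrative sketch (helper names are made up):

```typescript
// SSE reconnection is declared in-band. "retry:" sets the client's
// reconnect delay in milliseconds; "id:" sets the value the browser will
// send back in the Last-Event-ID header after a dropped connection.
function reconnectPreamble(retryMs: number): string {
  return `retry: ${retryMs}\n\n`;
}

function eventWithId(id: string, data: string): string {
  return `id: ${id}\ndata: ${data}\n\n`;
}

// On a fresh request, the server can resume from where the client left off.
// EventSource sets this header automatically on reconnect.
function resumeCursor(headers: Record<string, string>): string | undefined {
  return headers['last-event-id'];
}
```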

@delaneyj
Author

I'll close for now, will do more testing with the websocket approach until I have better reason to seek alternatives.

@thelinuxlich

graphql-yoga 2 added subscriptions exclusively through SSE; with HTTP/2 you don't have to care about max concurrent connections as you did in the past.

@KATT
Member

KATT commented Apr 25, 2022

I'll reopen this and see if someone wants to work on it for the next major :)

@KATT KATT reopened this Apr 25, 2022
@KATT KATT added next-major/maybe Things that are nice to have in the next version 💸 Get paid! I'll send you money through your GitHub sponsors for addressing this labels Apr 25, 2022
@neronim1141

I'm writing a map app for a game, where data for map tiles is sent by HTTP request, and I want to update the client map in real time.
I managed to do that with graphql-yoga and SSE because they operate in one Node process,
whereas your example needs to set up an additional process for WebSockets. That works between tRPC requests, but I can't get it to work when an HTTP request tries to push data to tRPC subscriptions. While I love how everything else turned out, this one thing totally blocks me from converting to tRPC. I may just be too inexperienced and not know a way to get it to work like that.

@punkpeye

Very similar situation to @neronim1141. Had everything migrated and then realized SSE is not working.

@LucasAlda

It would be really awesome to have SSE. It's great for situations where you want to send partial responses without setting up a full and expensive WS connection. Really want this to be a thing.

@AlaaZorkane

I believe a lot of cases don't really need bidirectional communication nowadays. SSE is a perfect fit and plays well with HTTP/2, and a lot of the GraphQL community is also leaning toward this approach of HTTP/2 queries/mutations combined with SSE.

A good read on the matter: https://wundergraph.com/blog/deprecate_graphql_subscriptions_over_websockets

Would really like to see this worked on in the context of tRPC 🙏

@elderapo

Before going with SSE I suggest reading https://dev.to/miketalbot/server-sent-events-are-still-not-production-ready-after-a-decade-a-lesson-for-me-a-warning-for-you-2gie. It describes pitfalls that cannot be solved at the code level, only at the infrastructure level, to which we (devs) don't always have full access.

@thelinuxlich

It can be solved on your side: you close connections going to proxy servers that don't support HTTP/2 or higher.

@elderapo

It can be solved on your side: you close connections going to proxy servers that don't support HTTP/2 or higher.

Huh? Then the tRPC client is never going to receive subscription events?

@thelinuxlich

No, you close after the event is sent

@thelinuxlich

Look at this: https://mercure.rocks/

@fyyyyy

fyyyyy commented Feb 27, 2023

Good comparison of polling vs SSE vs web sockets ( at scale )
https://www.youtube.com/watch?v=6QnTNKOJk5A

@partmor

partmor commented Mar 24, 2023

Isn't scaling with SSE similar though, or am I missing something? It's still a persistent connection between a client and server(s) that also needs to be scaled.

One huge advantage of using SSE over WS (assuming you only need server->client events, not bidirectional) is that you don't need to spin up an additional server. This is a big deal in terms of infra complexity.

And you’re not using a different protocol.

@carere

carere commented Apr 6, 2023

@KATT Hello, first of all, thanks for this awesome package !!

I'd be happy to try implementing SSE in tRPC. Do you have some insight or a starting point for implementing such a feature?
When using SSE, we need to send headers (with specific values) first to keep the connection open. It seems that's the main problem with tRPC: I tried to create an endpoint with tRPC and fastify-sse, and I received an error that headers were already sent.

Anyway, would be happy to help 😃
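For anyone picking this up: the "headers already sent" error typically arises because an SSE endpoint has to write its response headers exactly once, up front, and then keep the body open while frames stream out. A sketch of the fixed headers involved, independent of tRPC internals (the helper name is made up):

```typescript
// The headers an SSE response needs. Writing anything else to the
// response first (as a normal request handler would) triggers the
// "headers already sent" error described above.
function sseHeaders(): Record<string, string> {
  return {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    Connection: 'keep-alive',
  };
}

// With Node's http module, the endpoint would look roughly like:
//
//   res.writeHead(200, sseHeaders()); // headers go out exactly once
//   res.write(': connected\n\n');     // SSE comment frame flushes the stream
//   const ping = setInterval(() => res.write(': ping\n\n'), 15_000);
//   req.on('close', () => clearInterval(ping));
```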

@leonwilly

I know @carere volunteered to tackle this, but we couldn't wait at my company, so I went ahead and implemented it. I'll be submitting a PR here shortly.

@yacoubb

yacoubb commented Apr 22, 2023

@leonwilly looking forward to it!

@tarekwiz

@leonwilly Should we expect that anytime soon? No pressure just tryna get an ETA :)

@leonwilly

Hopefully the end of this sprint (Monday); I'm swamped right now.

@jaivinwylde

@leonwilly when you have time it would be amazing to get that pr, it would be great for the trpc community.

@KATT KATT added ✅ accepted-PRs-welcome Feature proposal is accepted and ready to work on linear and removed 💬 discussion next-major/maybe Things that are nice to have in the next version labels May 24, 2023
@KATT KATT changed the title Consider SSE to enable server->client http subscriptions [TRP-63] Consider SSE to enable server->client http subscriptions May 25, 2023
@KATT KATT added linear and removed linear labels May 25, 2023
@KATT KATT changed the title [TRP-63] Consider SSE to enable server->client http subscriptions Consider SSE to enable server->client http subscriptions May 25, 2023
@StringKe

I'd like to help test the SSE functionality, any progress so far?

@OutdatedVersion

OutdatedVersion commented Jun 2, 2023

Hey all! I was in a similar situation as leonwilly and implemented this for work. Bare-boned and extracted, it's available at https://github.com/OutdatedVersion/trpc-sse-link.

recording.mp4

Since I've been telling myself for a month that I'd figure out the best way to get this into mainline tRPC, please feel free to take any of that code for a PR. ❤️

@timcole

timcole commented Jun 3, 2023

Nice work @OutdatedVersion, this is good stuff! I've been waiting/wanting SSE in tRPC for a while, and this is an amazing start. However, I'm not sure a stream for every subscription like this is the best approach. Instead, I would expect a shared stream with an accompanying endpoint for posting sub/unsub topics to be better. That accompanying endpoint would still just be handled by useSubscription or whatnot.

Similar to how the Twitter web client connects to https://api.twitter.com/live_pipeline/events, which starts an event stream with an optional topic parameter of initial topics, then posts to https://api.twitter.com/1.1/live_pipeline/update_subscriptions with comma-separated sub_topics and unsub_topics lists to adjust the listening topics.

There are many reasons why a single stream is best here, but one main reason is the default limit of 100 connections per domain with HTTP/2 and 6 connections with HTTP/1.1. If you have multiple things that need events and the user has more than one tab open, you'll hit the limit pretty quickly with multiple streams per event.
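The shared-stream design described above reduces to one connection plus a mutable topic set per client. A minimal in-memory sketch of that bookkeeping (class and method names are invented, not a proposed tRPC API):

```typescript
// One SSE stream per client, many logical subscriptions multiplexed over
// it. A side-channel POST adjusts the topic set, mirroring the
// live_pipeline/update_subscriptions pattern described above.
class TopicStream {
  private topics = new Set<string>();

  // Handler for the sub/unsub endpoint: apply both lists atomically.
  updateSubscriptions(subTopics: string[], unsubTopics: string[]): void {
    for (const t of subTopics) this.topics.add(t);
    for (const t of unsubTopics) this.topics.delete(t);
  }

  // Called when an event arrives on the server-side bus; only events for
  // subscribed topics get written to this client's single stream.
  shouldDeliver(topic: string): boolean {
    return this.topics.has(topic);
  }
}

const stream = new TopicStream();
stream.updateSubscriptions(['posts.onAdd', 'users.onLogin'], []);
stream.updateSubscriptions([], ['users.onLogin']);
```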

@matannahmani

matannahmani commented Jun 4, 2023

@OutdatedVersion did you manage to make it work with a Route Handler (Next 13 app -> API)?
I can't manage to convert it:
https://github.com/OutdatedVersion/trpc-sse-link/blob/main/example/src/pages/api/trpc/%5Btrpc%5D.ts

The events aren't sent and the request finishes immediately. Here is my attempt:

const handler = async (
  req: Request,
  context: {
    params: { trpc: string | string[] };
  }
) => {
  if (Array.isArray(context.params.trpc)) {
    return fetchRequestHandler({
      endpoint: "/api/trpc",
      req,
      router: appRouter,
      createContext: createTRPCContext,
      onError:
        env.NODE_ENV === "development"
          ? ({ path, error }) => {
              console.error(
                `❌ tRPC failed on ${path ?? "<no-path>"}: ${error.message}`
              );
            }
          : undefined,
    });
  }
  // @ts-expect-error @todo to fix it
  const procedure = appRouter?._def?.procedures?.[context.params.trpc] as
    | AnyProcedure
    | undefined;
  if (req.method === "GET" && procedure?._def.subscription) {
    const resHeaders = new Headers();
    const ctx = await createTRPCContext({
      req,
      resHeaders,
    });
    // Create a TransformStream for writing the response as the tokens as generated
    const stream = new TransformStream();
    const writer = stream.writable.getWriter();
    try {
      // TODO: support POST
      // https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L25
      // TODO https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L141-L145
      // https://gist.github.com/OutdatedVersion/8ea31e6790d6514094487e2f76e1b652
      const urlParams = new URL(req.url).searchParams;
      const inputParams = urlParams.get("input");
      const input = inputParams ? JSON.parse(inputParams) : undefined;

      const call = {
        type: "subscription",
        ctx,
        path: context.params.trpc,
        input,
        rawInput: input,
      } as const;

      const res = await procedure(call);
      if (!isObservable(res)) {
        // eslint-disable-next-line @typescript-eslint/no-unsafe-call
        await writer.close();
        throw new Error(`subscription must return observable`);
      }

      // https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L189-L193
      const subscription = res.subscribe({
        next(value) {
          // https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
          console.log("server subscription next", value);
          void writer.write(`event:data\ndata: ${JSON.stringify(value)}\n\n`);
        },
        error(err) {
          console.log("server subscription error", err);
          void writer.abort(err);
          subscription.unsubscribe();
        },
        complete() {
          console.log("server subscription complete");
          void writer.close();
          subscription.unsubscribe();
        },
      });
      subscription.unsubscribe();

      // req.on("close", () => {
      //   console.log("unsubscribe: req closed");
      //   subscription.unsubscribe();
      // });
      // req.on("end", () => {
      //   console.log("unsubscribe: req end");
      //   subscription.unsubscribe();
      // });
      // req.on("error", () => console.log("req error"));
      // req.on("pause", () => console.log("req paused"));
    } catch (error) {
      // https://github.com/trpc/trpc/blob/7ad695ea33810a162808c43b6fba1fb920e05325/packages/server/src/http/resolveHTTPResponse.ts#L198-L202
      console.error("Uncaught subscription error", error);
      void writer.abort(error);
    }
    return new Response(stream.readable, {
      status: 200,
      headers: {
        Connection: "keep-alive",
        "Cache-Control": "no-cache, no-transform",
        "Content-Type": "text/event-stream;charset=utf-8",
        "Access-Control-Allow-Origin": "*",
      },
    });
  }
  return fetchRequestHandler({
    endpoint: "/api/trpc",
    req,
    router: appRouter,
    createContext: createTRPCContext,
    onError:
      env.NODE_ENV === "development"
        ? ({ path, error }) => {
            console.error(
              `❌ tRPC failed on ${path ?? "<no-path>"}: ${error.message}`
            );
          }
        : undefined,
  });
};

@StringKe

StringKe commented Jun 5, 2023

@OutdatedVersion unsubscribing doesn't actually stop the Observable from running, does it?

@matannahmani

matannahmani commented Jun 5, 2023

For people who may be interested: I managed to make a production-ready version of SSE using app dir route handlers.
You can view the code here:
https://github.com/matannahmani/ada-trpc-sse-link
https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/stream-link.ts
https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/client.ts
https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts
Or view live here (integration using LangChain and TRPC SSE LINK):
https://preview.im-ada.ai

@StringKe

StringKe commented Jun 6, 2023

For people who may be interested I managed to make a production-ready version of SSE using app dir route handlers. You can view the code here: https://github.com/matannahmani/ada-ai-prototype https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/stream-link.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/client.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts Or test the demo here (It's not perfect but working quite well): https://ada-ai-prototype.vercel.app/

Will SSE continue to run on the back end when the user actively shuts it down?

@matannahmani

matannahmani commented Jun 6, 2023

@StringKe to my understanding, yes. As far as I know, AbortSignal is bugged/doesn't exist in the Next 13.4.4 app route API handlers; I also noted this in one of the comments in the code.
@see vercel/next.js#48682

@marschr

marschr commented Jul 21, 2023

For people who may be interested I managed to make a production-ready version of SSE using app dir route handlers. You can view the code here: https://github.com/matannahmani/ada-ai-prototype https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/stream-link.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/trpc/client.ts https://github.com/matannahmani/ada-ai-prototype/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts Or test the demo here (It's not perfect but working quite well): https://ada-ai-prototype.vercel.app/

Hey @matannahmani, I was using your repo's implementation of tRPC + SSE as a reference for a tRPC + SSE + Svelte version, but as of a few hours ago the repo is gone and all the links are broken. Is there any way we can still access it as a reference? Thanks.

@matannahmani

matannahmani commented Jul 22, 2023

@marschr for sure, here is a new repo dedicated to SSE-Link. I will try to do some work this weekend to showcase ways you can use it and ways we are using it (LangChain integration).
For now, you can see a snapshot of my earliest stable POC.
https://github.com/matannahmani/ada-trpc-sse-link
https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/stream-link.ts
https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/trpc/client.ts
https://github.com/matannahmani/ada-trpc-sse-link/blob/main/src/app/api/trpc/%5Btrpc%5D/route.ts
Or view live here (integration using LangChain and TRPC SSE LINK)
https://preview.im-ada.ai

On a side note, I will try to make a pull request to the tRPC main repo with the link and docs, so people can reference it in the future instead of having to search the issues every time. I see the progress on the SSELink has halted: #4477

Projects
Status: Potential breaking changes