SSE cleanup function never runs #654

Closed
pawelblaszczyk5 opened this issue Jan 17, 2023 · 3 comments

@pawelblaszczyk5
Contributor

Hello,
Basically, I'm trying to implement something similar to this, but in SolidStart. There is a built-in function, eventStream, but its cleanup function never runs.

It seems like a problem with some internals, because the exact same function works in Remix without problems. I tried to debug why the abort controller assigned to the request is never aborted, unfortunately without success.

Here is a reproduction: https://github.com/pawelblaszczyk5/solid-start-sse

@lordnox

lordnox commented Mar 7, 2023

After failing to find a working way to implement SSE, I found this issue.

Here is another instance where the code is not working as expected:

        const interval = setInterval(() => {
          send('chat', 'Hi :-D ' + x++)
          console.log(x)
        }, 2000)
        return () => {
          console.log('disconnect')
          clearInterval(interval)
        }

With that change the file looks like this: https://gist.github.com/lordnox/39691516d24d90ee5d92c8279cb18c19

The issue is that eventStream never receives any signal indicating that the connection to the client has closed. eventStream seems to expect the request's abort signal to fire, so something upstream should be triggering it.
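
To make that dependency concrete, here is a minimal sketch of an event stream whose cleanup is driven purely by an AbortSignal. It is not SolidStart's eventStream: the name eventStreamFromSignal and the SignalInit type are hypothetical, and it assumes a runtime with global ReadableStream, TextEncoder and AbortController (for example Node.js 18+).

```typescript
type SignalInit = (send: (event: string, data: string) => void) => () => void;

// Hypothetical helper for illustration only (not the SolidStart API).
function eventStreamFromSignal(
  signal: AbortSignal,
  init: SignalInit
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();

  return new ReadableStream<Uint8Array>({
    start(controller) {
      const send = (event: string, data: string) => {
        controller.enqueue(encoder.encode(`event: ${event}\ndata: ${data}\n\n`));
      };
      const cleanup = init(send);

      let closed = false;
      const close = () => {
        if (closed) return; // run cleanup at most once
        closed = true;
        signal.removeEventListener('abort', close);
        cleanup();
        controller.close();
      };

      // The only path to cleanup: something upstream must abort the signal.
      signal.addEventListener('abort', close);
      if (signal.aborted) close();
    },
  });
}
```

If nothing ever calls abort() on the controller that owns the signal, close never runs, which would match the behaviour reported here.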

Another possibility would be to extend Request with an event handler that is notified when the request closes. That would move away from the Web API Request, however…

Deno triggers the cancel method of the ReadableStream. Maybe we could allow the API handler to accept a ReadableStream as well, which might allow solid-start to call the cancel method.

These are just some ideas
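
As a rough illustration of the ReadableStream idea, the sketch below puts the cleanup in the underlying source's cancel method, which the Streams API invokes when the consumer cancels the stream. The names eventStreamWithCancel and CancelInit are hypothetical, and whether a given server actually cancels the response body when the client disconnects is an assumption that would need checking per runtime.

```typescript
type CancelInit = (send: (event: string, data: string) => void) => () => void;

// Hypothetical helper for illustration only (not the SolidStart API).
function eventStreamWithCancel(init: CancelInit): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let cleanup: (() => void) | undefined;

  return new ReadableStream<Uint8Array>({
    start(controller) {
      cleanup = init((event, data) => {
        controller.enqueue(encoder.encode(`event: ${event}\ndata: ${data}\n\n`));
      });
    },
    cancel() {
      // Invoked when whoever reads the stream cancels it.
      cleanup?.();
      cleanup = undefined;
    },
  });
}
```

With this shape no abort signal is needed: the host only has to call cancel() on the stream (or its reader) once it notices that the client has gone away.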

@peerreynders

Workaround. See also: SolidStart SSE Demo

Obstacles (on Node.js):

  • Internally, the SolidStart server copies the native Node.js request to an undici Request.
    Consequently, events such as the request's close event don't propagate to the copied request, so while the (SolidStart) server can open an event stream, it has no idea when the client closes it.
  • undici never supported SSE in the first place.

TL;DR:

  • In development, configure Vite with middleware that tags every text/event-stream request with an ID header and attaches a close handler that dispatches an event through a BroadcastChannel to the SolidStart server (which runs on a separate thread). Any subscribers to that ID are then notified that the client closed the request.
  • In production, patch dist/server.js to include the same middleware in polka's middleware chain (no need for a BroadcastChannel).

Vite configuration:

// file: vite.config.ts
import solid from "solid-start/vite";
import solidStartSsePlugin from './src/server/solid-start-sse-plugin';
import { defineConfig } from "vite";

export default defineConfig(({ mode }) => ({
  plugins: [
    solidStartSsePlugin(), 
    solid()
  ],
  define: {
    'process.env.NODE_ENV': JSON.stringify(mode),
  }
}));

The Plugin

// file: src/server/solid-start-sse-plugin
import { solidStartSseSupport } from './solid-start-sse-support';

import type { ViteDevServer } from 'vite';

export default function solidStartSsePlugin() {
  return {
    name: 'solid-start-sse-support',
    configureServer(server: ViteDevServer) {
      // Pre-internal middleware here:
      server.middlewares.use(solidStartSseSupport);

      // Post internal middleware should be registered
      // in a returned thunk, e.g.:
      // return () => {
      //   server.middlewares.use(middleware);
      // };
      return;
    },
  };
}

Middleware and Event Handling

// file: src/server/solid-start-sse-support
// (excerpt: imports, constants and sendEvent are in the full listing)

function solidStartSseSupport(
  request: http.IncomingMessage,
  _response: http.ServerResponse,
  next: NextFunction
) {
  const accept = request.headers.accept;
  if (
    request.method !== 'GET' ||
    !accept ||
    0 > accept.indexOf('text/event-stream')
  )
    return next();

  // tag request with a unique header
  // which will get copied
  const id = nanoid();
  request.headers[SSE_CORRELATE] = id;

  // send event when request closes
  const close = () => {
    request.removeListener('close', close);
    sendEvent(id, REQUEST_CLOSE);
  };
  request.addListener('close', close);

  return next();
}

Full Listing

// file: src/server/solid-start-sse-support
import { nanoid } from 'nanoid';

import type http from 'node:http';

// track closed requests

let lastPurge = performance.now();
const closedIds = new Map<string, number>();

function purgeClosedIds(now: number) {
  const cutOff = now - 120_000; // 2 minutes
  if (lastPurge > cutOff) return;

  for (const [id, time] of closedIds) if (time < cutOff) closedIds.delete(id);

  lastPurge = now;
}

function addClosedId(id: string) {
  const now = performance.now();
  purgeClosedIds(now);
  closedIds.set(id, now);
}

const REQUEST_CLOSE = {
  source: 'request',
  name: 'close',
} as const;

type Info = typeof REQUEST_CLOSE;
type Notify = (n: Info) => void;

const subscribers = new Map<string, Set<Notify>>();

function removeSubscriber(id: string, notify: Notify) {
  const all = subscribers.get(id);
  if (!all) return false;

  const result = all.delete(notify);
  if (all.size < 1) subscribers.delete(id);

  return result;
}

function addSubscriber(id: string, notify: Notify) {
  const remove = () => removeSubscriber(id, notify);
  const found = subscribers.get(id);
  if (found) {
    found.add(notify);
    return remove;
  }

  subscribers.set(id, new Set<Notify>().add(notify));
  return remove;
}

function notifySubscribers(id: string, info: Info) {
  const all = subscribers.get(id);
  if (!all) return;

  for (const notify of all) notify(info);

  if (info.name === 'close') {
    subscribers.delete(id);
    addClosedId(id);
  }
}

const SSE_CORRELATE = 'x-solid-start-sse-support';
const channel = process.env.NODE_ENV?.startsWith('dev')
  ? new BroadcastChannel('solid-start-sse-support')
  : undefined;

type EventInfo = {
  id: string;
  info: Info;
};
// Parenthesized so that `undefined` applies to the handler itself, not to its return type
let receive: ((event: MessageEvent<EventInfo>) => void) | undefined;
let listening = false;

// Start listening as soon as possible
function listen() {
  if (channel && !receive) {
    receive = (event: MessageEvent<EventInfo>) =>
      notifySubscribers(event.data.id, event.data.info);

    channel.addEventListener('message', receive);
  }
  listening = true;
}

function subscribe(request: Request, notify: Notify) {
  if (!listening)
    throw Error(
      'Call `listen()` at application start up to avoid missing events'
    );

  const id = request.headers.get(SSE_CORRELATE);
  if (!id) return;
  if (closedIds.has(id)) return;

  return addSubscriber(id, notify);
}

export type EventStreamInit<T> = (
  send: (event: string, data: T) => void
) => () => void;

function eventStream<T>(request: Request, init: EventStreamInit<T>) {
  const stream = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder();
      const send = (event: string, data: T) => {
        controller.enqueue(encoder.encode('event: ' + event + '\n'));
        controller.enqueue(encoder.encode('data: ' + data + '\n\n'));
      };
      let unsubscribe: (() => boolean) | undefined = undefined;

      let cleanup: (() => void) | undefined = init(send);
      const close = () => {
        if (!cleanup) return;
        cleanup();
        cleanup = undefined;
        unsubscribe?.();
        controller.close();
      };

      unsubscribe = subscribe(request, (info) => {
        if (info.source === 'request' && info.name === 'close') {
          close();
          return;
        }
      });

      if (!unsubscribe) {
        close();
        return;
      }
    },
  });

  return new Response(stream, {
    headers: { 'Content-Type': 'text/event-stream' },
  });
}

// --- Middleware ---

function sendEvent(id: string, info: Info) {
  if (!channel) {
    notifySubscribers(id, info);
    return;
  }

  channel.postMessage({
    id,
    info,
  });
}

type NextFunction = (err?: unknown) => void;

function solidStartSseSupport(
  request: http.IncomingMessage,
  _response: http.ServerResponse,
  next: NextFunction
) {
  const accept = request.headers.accept;
  if (
    request.method !== 'GET' ||
    !accept ||
    0 > accept.indexOf('text/event-stream')
  )
    return next();

  // tag request with a unique header
  // which will get copied
  const id = nanoid();
  request.headers[SSE_CORRELATE] = id;

  // send event when request closes
  const close = () => {
    request.removeListener('close', close);
    sendEvent(id, REQUEST_CLOSE);
  };
  request.addListener('close', close);

  return next();
}

// Want to protect middleware from tree shaking
declare global {
  var __no_tree_shaking: Record<string, unknown> | undefined;
}

if (globalThis.__no_tree_shaking) {
  globalThis.__no_tree_shaking.solidStartSseSupport = solidStartSseSupport;
} else {
  globalThis.__no_tree_shaking = { solidStartSseSupport };
}

export { eventStream, listen, solidStartSseSupport };
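
One detail of the send helper in the listing: it concatenates data straight into a single data: line, which is fine for single-line strings. In the SSE wire format a payload containing line breaks has to be sent as one data: line per payload line, otherwise the client sees a truncated or malformed event. A minimal sketch of a frame formatter that handles this, where formatSseFrame is a hypothetical helper and not part of the listing above:

```typescript
// Hypothetical helper: builds one SSE frame as a string.
// Assumes `event` itself contains no line breaks.
function formatSseFrame(event: string, data: string): string {
  // Each line of the payload gets its own `data:` field.
  const dataLines = data
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`);

  // The two trailing empty entries produce the blank line that ends the frame.
  return [`event: ${event}`, ...dataLines, '', ''].join('\n');
}
```

Non-string payloads would need to be serialized first, for example with JSON.stringify, before being passed in.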

Example use

In src/components/sse-counter.tsx:

// file: src/components/sse-counter.tsx
import {
  createContext,
  createEffect,
  useContext,
  createSignal,
  onCleanup,
  type ParentProps,
} from 'solid-js';
import server$, { ServerFunctionEvent } from 'solid-start/server';
import { formatTime } from '~/helpers';

// --- START server side ---

import { eventStream } from '~/server/solid-start-sse-support';
// NOTE: call `listen()` in `entry-server.tsx`

async function connectServerSource(this: ServerFunctionEvent) {
  const init = (send: (event: string, data: string) => void) => {
    const interval = setInterval(() => {
      send('message', formatTime(new Date()));
    }, 1000);

    return () => {
      console.log('disconnect');
      clearInterval(interval);
    };
  };

  return eventStream(this.request, init);
}

// --- END server side ---

const [serverTime, setServerTime] = createSignal(formatTime(new Date()));

const ServerTimeContext = createContext(serverTime);

let started = false;

function startServerTime() {
  if (started) return;

  const handle = server$(connectServerSource);
  const href = handle.url;

  // Runs only once but also registers for clean up
  createEffect(() => {
    const onMessage = (message: MessageEvent<string>) => {
      setServerTime(message.data);
    };
    const eventSource = new EventSource(href);
    eventSource.addEventListener('message', onMessage);

    onCleanup(() => {
      eventSource.removeEventListener('message', onMessage);
      eventSource.close();
    });
  });

  started = true;
}

function ServerTimeProvider(props: ParentProps) {
  startServerTime();

  return (
    <ServerTimeContext.Provider value={serverTime}>
      {props.children}
    </ServerTimeContext.Provider>
  );
}

const useServerTime = () => useContext(ServerTimeContext);

export { ServerTimeProvider, useServerTime };

Production patch

Just a postbuild script in the package.json:

  "scripts": {
    
    "postbuild": "sed -i 's/assets_handler).use(comp/assets_handler).use(solidStartSseSupport).use(comp/g' dist/server.js",
    
  },
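
Note that sed -i is not portable: BSD/macOS sed expects a backup-suffix argument after -i, and sed is usually absent on Windows. Where that matters, the same substitution could be done from a small Node.js script. A minimal sketch of the string transformation follows; patchServerSource and both constants are hypothetical names, and the search string is copied from the sed expression above.

```typescript
const NEEDLE = 'assets_handler).use(comp';
const PATCHED = 'assets_handler).use(solidStartSseSupport).use(comp';

// Hypothetical helper: returns the patched source of dist/server.js.
function patchServerSource(source: string): string {
  // split/join replaces every literal occurrence (like sed's `g` flag)
  // without having to escape the parentheses for a regular expression.
  return source.split(NEEDLE).join(PATCHED);
}
```

A postbuild script would read dist/server.js with readFileSync from node:fs, pass the contents through patchServerSource, and write the result back with writeFileSync. Running it twice is harmless because the patched text no longer contains the search string.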

@ryansolid
Member

In setting up for SolidStart's next Beta Phase, built on Nitro and Vinxi, we are closing all PRs/Issues that will not be merged due to the system changing. If you feel your issue was closed by mistake, feel free to re-open it after updating/testing against the 0.4.x release. Thank you for your patience.

See #1139 for more details.
