Documentation/Examples for streaming requests and responses #5344

Open
dodiameer opened this issue Jul 2, 2022 · 22 comments

@dodiameer

Describe the problem

As of #5291, SvelteKit supports using a ReadableStream to read the request body in endpoints and returning one as a response body. However, the docs currently have no examples of common tasks, such as reading files submitted in a multipart/form-data request as a stream and writing them to disk, or reading files from disk and returning them as a stream.
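
As a minimal sketch of the request side, assuming a Node 18+ runtime where Request and ReadableStream are globals, a handler can consume request.body chunk by chunk instead of buffering it. The helper name countBytes is hypothetical; in a +server.js file it would be called from an exported POST handler.

```javascript
// Hypothetical helper: walks the request body one chunk at a time,
// so memory use stays bounded by the chunk size, not the upload size.
async function countBytes(request) {
	let total = 0;
	if (!request.body) return total; // requests without a body have body === null

	const reader = request.body.getReader();
	while (true) {
		const { value, done } = await reader.read();
		if (done) break;
		total += value.byteLength; // value is a Uint8Array
	}
	return total;
}

// In a SvelteKit endpoint this might be used as:
//   export async function POST({ request }) {
//   	return new Response(String(await countBytes(request)));
//   }
```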

Describe the proposed solution

Adding examples for common use-cases of streaming in the docs

Alternatives considered

No response

Importance

nice to have

Additional Information

I would help with this myself, but I honestly couldn't figure it out, and I wouldn't trust the code I write to be up to the standard of the docs. I specifically wanted to handle very large (over 1 GB) file uploads sent in a multipart/form-data request alongside other fields, but couldn't figure it out.

Also feel like I should mention this: in #3419, Rich mentions having utility functions for handling multipart/octet streams, so if the maintainers are planning on adding them then it would make sense to not add documentation until they are added, as they would simplify any code that could be written without them.

@benmccann benmccann added the documentation Improvements or additions to documentation label Jul 2, 2022
@rmunn
Contributor

rmunn commented Jul 4, 2022

Don't have time to write the docs myself, at least right now — but here are a few things to know about readable streams and multipart/form-data:

  1. The ReadableStream interface doesn't provide a read() method directly; instead, you call getReader() to get a reader object that has a read() method.
  2. There are two reader modes: "default" mode, in which each read() hands you a newly allocated chunk (a Uint8Array for request and response bodies), and "bring-your-own-buffer" or "BYOB" mode, in which you pass a typed-array view such as a Uint8Array to the reader's read() method and the data is written into its underlying buffer. Default mode is easier to handle; BYOB mode can save memory in some situations, but it is only available on byte streams. If you already have a buffer that you want the data to go into, use BYOB mode; otherwise just use default mode.
  3. Each time you call read(), you get a "chunk". Chunks are not guaranteed to end at a convenient place. They could end right in the middle of a UTF-8 byte sequence, e.g. the last byte of one chunk could be 0xE2, the next chunk could be a single byte containing 0x80, and the last chunk could start with 0xA2. If you assemble those properly, you'll be able to reassemble the 0xE2 0x80 0xA2 sequence, which is UTF-8 for the • character. Do it wrong and you could end up with two or three � characters instead.
  4. When processing multipart/form-data content, you're looking for the sequence CRLF, two hyphens, the boundary marker, optionally two more hyphens to signal "end of multipart data", optionally a sequence of spaces, and another CRLF. So you're looking for either 0x0D 0x0A 0x2D 0x2D (boundary) (zero or more 0x20) 0x0D 0x0A, or else 0x0D 0x0A 0x2D 0x2D (boundary) 0x2D 0x2D (zero or more 0x20) 0x0D 0x0A (assuming your incoming data is in UTF-8). Remember that chunks can be split anywhere, so don't assume that the boundary sequence will be found in a single chunk. It could even be split across multiple chunks.
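
For point 3, here is a minimal sketch of decoding text from a stream without corrupting characters that straddle chunk boundaries. It relies on the standard TextDecoder with the `stream: true` option, which keeps incomplete byte sequences until the next call. The function names readText and bulletInThreeChunks are made up for this illustration.

```javascript
// Reads a byte stream into a string. TextDecoder in streaming mode
// buffers a trailing partial UTF-8 sequence until the rest arrives.
async function readText(stream) {
	const reader = stream.getReader(); // default mode: each chunk is a Uint8Array
	const decoder = new TextDecoder('utf-8');
	let text = '';
	while (true) {
		const { value, done } = await reader.read();
		if (done) break;
		text += decoder.decode(value, { stream: true });
	}
	text += decoder.decode(); // flush whatever is still buffered
	return text;
}

// Demo source: "a•b" with the three bytes of • spread over three chunks.
function bulletInThreeChunks() {
	const chunks = [
		new Uint8Array([0x61, 0xe2]),
		new Uint8Array([0x80]),
		new Uint8Array([0xa2, 0x62])
	];
	return new ReadableStream({
		start(controller) {
			for (const chunk of chunks) controller.enqueue(chunk);
			controller.close();
		}
	});
}
```

Decoding each chunk separately with a fresh decoder, or without `stream: true`, is what produces the replacement characters mentioned above.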

Whoever produces sample code for this will need to make sure it doesn't fall into the most common pitfall of stream handling, which is assuming that chunk boundaries will be nice. A chunk boundary that splits a UTF-8 sequence or a multipart/form-data boundary marker is going to cause issues for naive code, and the examples should probably show how to do it right.

@rmunn
Contributor

rmunn commented Jul 4, 2022

Having said that, here's a general guideline to handling large file uploads in a form with other data:

  1. Create a stream reader.
  2. Start searching chunks for the boundary sequence (obtained from the headers)
  3. Every time you encounter a CRLF during the search, and you have not found a boundary sequence yet, you can safely consider the data up to that CRLF to be part of one section, rather than a potential boundary, and handle it appropriately (write it to the file, save it as a form value, whatever the previous boundary header said to do).
  4. When you encounter a CRLF followed immediately by a couple of hyphens, don't just blindly compare the next bytes to the boundary sequence. They might be truncated. Instead, start comparing those bytes one-by-one to the boundary sequence, with one of three outcomes:
    i. They match a boundary and are complete. The previous section is complete; you can close the WritableStream to the file, or save the form data, or whatever.
    ii. You find a character that doesn't belong to the boundary sequence. Everything from the CRLF to that character is therefore part of the previous section, which is not yet complete. Stop matching against the boundary sequence, add what you've found so far to the previous section, and go back to scanning for CRLF characters.
    iii. You run out of characters before completing the boundary sequence. Read the next chunk and continue this step, until either the boundary sequence is a complete match (so the previous section is complete, as in step i) or the boundary sequence fails (in which case this data is part of the previous section, as in step ii).
  5. After a boundary marker is found, you're at the start of a new MIME section, so expect some headers followed by a blank line. Those headers will tell you what kind of thing is coming up and how to handle it. E.g., if it's a file, create a WritableStream to write the contents somewhere, and so on.
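
The core of steps 2 to 4 can be sketched as follows. This is not the byte-by-byte comparison described above but a simpler variant with the same effect: after each chunk it holds back the last (delimiter length - 1) bytes, since those could be the start of a delimiter that continues in the next chunk. It is only a splitter, not a multipart parser: it assumes the caller passes CRLF + "--" + boundary as the delimiter, and it does not parse headers, the closing "--", or trailing padding. The names splitOnDelimiter, indexOfBytes and concat are hypothetical.

```javascript
// Returns the index of `needle` in `haystack` (both Uint8Array), or -1.
function indexOfBytes(haystack, needle) {
	outer: for (let i = 0; i <= haystack.length - needle.length; i++) {
		for (let j = 0; j < needle.length; j++) {
			if (haystack[i + j] !== needle[j]) continue outer;
		}
		return i;
	}
	return -1;
}

function concat(a, b) {
	const out = new Uint8Array(a.length + b.length);
	out.set(a, 0);
	out.set(b, a.length);
	return out;
}

// Yields { part, data, end } records: `data` is a piece of section number
// `part`, and `end` is true when that section is complete.
async function* splitOnDelimiter(stream, delimiter) {
	const reader = stream.getReader();
	let pending = new Uint8Array(0);
	let part = 0;
	while (true) {
		const { value, done } = await reader.read();
		if (done) break;
		pending = concat(pending, value);

		// Emit every complete section found so far.
		let at;
		while ((at = indexOfBytes(pending, delimiter)) !== -1) {
			yield { part, data: pending.slice(0, at), end: true };
			part++;
			pending = pending.slice(at + delimiter.length);
		}

		// Everything except the last (delimiter.length - 1) bytes cannot
		// be part of a delimiter, so it is safe to hand over now.
		const safe = Math.max(0, pending.length - (delimiter.length - 1));
		if (safe > 0) {
			yield { part, data: pending.slice(0, safe), end: false };
			pending = pending.slice(safe);
		}
	}
	yield { part, data: pending, end: true }; // the stream ended: flush the tail
}
```

A consumer would write each `data` piece to the current destination (a file, a form field) and switch destinations whenever `end` is true, so a large upload never has to fit in memory.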

@Rich-Harris
Member

I'd love to use an existing library for this rather than rolling our own multipart form data parser, even if we abstract it somehow. Unfortunately the ones that I know of (busboy, formidable) are wedded to Node AFAICT.

Anyone know of something more modern, that can take a Response object (or a Headers and a ReadableStream, I guess) and do something useful with it?

@half-metal

An example of streaming a large dataset by piping it to a writable response would be handy. A common use case is an endpoint that reads a large file, converts it to a readable stream, and has it fetched by a page, without filling up memory. I tried the usual code that works in Express or Hyper-Express for piping a readable stream, but I get a "dest.on is not a function" TypeError.

TypeError: dest.on is not a function
at Stream.pipe (node:internal/streams/legacy:33:8)

Something like how Express or Hyper-express gives examples for piping would be great.
https://github.com/kartikk221/hyper-express/blob/master/docs/Examples.md

That makes it easy to pipe one stream into another, and so on.

readStream.pipe(transformStream).pipe(response).on('finish', () => {
    console.log(`Finished transforming the contents of ${filePath} and piping the output to the response.`);
});

This seems like a common use case.
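
The TypeError above probably comes from Node's pipe() expecting a Node Writable as its destination, which a web Response is not. With web streams, the rough equivalent of the pipe chain is pipeThrough() on the source, with the result passed to the Response constructor. The sketch below assumes a Node 18+ runtime with global web streams; upperCaseTransform and streamResponse are hypothetical names, and uppercasing stands in for whatever transform is needed.

```javascript
// A TransformStream that uppercases UTF-8 text as it flows through.
function upperCaseTransform() {
	const decoder = new TextDecoder();
	const encoder = new TextEncoder();
	return new TransformStream({
		transform(chunk, controller) {
			const text = decoder.decode(chunk, { stream: true });
			controller.enqueue(encoder.encode(text.toUpperCase()));
		},
		flush(controller) {
			const rest = decoder.decode(); // bytes held back from the last chunk
			if (rest) controller.enqueue(encoder.encode(rest.toUpperCase()));
		}
	});
}

// `source` is a web ReadableStream of Uint8Array chunks. The Response
// pulls from it lazily, so the whole dataset is never held in memory.
function streamResponse(source) {
	return new Response(source.pipeThrough(upperCaseTransform()), {
		headers: { 'content-type': 'text/plain; charset=utf-8' }
	});
}
```

For a file on disk, Node's Readable.toWeb(createReadStream(path)) from node:stream should give a web ReadableStream that can serve as `source`; that part is not shown here.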

@xpluscal

I hope this is relevant here.
I am using an npm package in a server endpoint and getting the error
response.body.getReader is not a function.

From the conversations here, this seems related. With the new feature, how would I enable the package to use ReadableStream?

@JuliaBonita

I've been testing Server Sent Events (SSEs) with Kit recently. It's almost working properly, but the non-streamable Response objects obviously prevent the connections from staying open.

In a comment on a related issue, Rich said SSEs are a special case of ReadableStreams. Now that ReadableStreams appear to be implemented, how can we use them to make SSE connections stay open?

Ideally, we would be able to have access to all the res and req methods so that we can properly detect and respond to stream states, too, but simply keeping the connection open would be a great leap forward.

Thank you @Rich-Harris and the rest of the team for all the work you've done on this project.

@repsac-by
Contributor

@JuliaBonita
For SSE it is enough to return a ReadableStream as the response body. Here is a more complex example based on TransformStream, which subscribes to an EventEmitter and unsubscribes when the connection is closed.

+server.js

import { createSSE } from './sse';
import { bus } from './bus';

/** @type {import('./$types').RequestHandler} */
export async function GET({ request }) {
	// does not have to be a number, this is just an example
	const last_event_id = Number(request.headers.get('last-event-id')) || 0;

	const { readable, subscribe } = createSSE(last_event_id);

	subscribe(bus, 'time');
	subscribe(bus, 'date');

	return new Response(readable, {
		headers: {
			'cache-control': 'no-cache',
			'content-type': 'text/event-stream',
		}
	});
}

sse.js

export function createSSE(last_id = 0, retry = 0) {
	let id = last_id;
	const { readable, writable } = new TransformStream({
		start(controller) {
			controller.enqueue(': hello\n\n');

			if (retry > 0) controller.enqueue(`retry: ${retry}\n\n`);
		},
		transform({ event, data }, controller) {
			let msg = `id: ${++id}\n`;
			if (event) msg += `event: ${event}\n`;
			if (typeof data === 'string') {
				msg += 'data: ' + data.trim().replace(/\n+/gm, '\ndata: ') + '\n';
			} else {
				msg += `data: ${JSON.stringify(data)}\n`;
			}
			controller.enqueue(msg + '\n');
		}
	});

	const writer = writable.getWriter();

	return {
		readable,
		/**
		 * @param {import('node:events').EventEmitter} eventEmitter
		 * @param {string} event
		 */
		async subscribe(eventEmitter, event) {
			function listener(/** @type {any} */ data) {
				writer.write({ event, data });
			}

			eventEmitter.on(event, listener);
			await writer.closed.catch(() => { });
			eventEmitter.off(event, listener);
		}
	};
}

bus.js

import { EventEmitter } from 'node:events';

export const bus = new EventEmitter();

setInterval(() => {
	bus.emit('time', new Date().toLocaleTimeString());
}, 2e3);

setInterval(() => {
	bus.emit('date', new Date().toLocaleDateString());
}, 5e3);

@JuliaBonita

JuliaBonita commented Oct 4, 2022

@repsac-by Thank you very much for your help. I appreciate this and will spend more time learning the Streams API if necessary, but it's a totally different API and much more complex than the simple SSE API. The simplicity of the SSE API for simple server-push applications is the most significant feature that differentiates it from WebSockets. If we have to use the more complicated Streams API in Kit, then it seems we might as well use WebSockets and assume our apps will use bi-directional channels even if we only need data pushed from the server to the client.

My assumption (maybe incorrect) was that Kit would include a way to use the actual SSE API with the same simple functionality that we have with vanilla Node/Express. Your example is excellent for the Streams API, but it still requires a significant learning curve to achieve the equivalent functionality and reliability that is trivially easy to achieve with SSEs, which is sufficient for the vast majority of text-based server-push applications.

So before I go down the more complicated Streams API path, can you please confirm that there is no efficient way to implement SSE in Kit to keep connections open and manage the connections like we can easily do in Node/Express?

@JuliaBonita

I just looked at the source code for the Streams API "Hello World" example. It requires 80 lines of code (including markup) just to display "Hello World" on the page. Swimming through all the Streams API docs is a deep rabbit hole. Clearly the Streams API is not ergonomic, which is why very few people actually understand or use it. It's also overkill for virtually all SSE text-based applications.

I bet over 95% of the Svelte community will never take the time to learn the Streams API well enough to use it effectively, which makes Kit much less likely to be used in large-scale apps. So I hope there's a way to use actual SSEs in Kit (including proper event handling on the server like with Express).

@repsac-by
Contributor

@JuliaBonita
So, here is the ReadableStream for SSE in a few lines.

export async function GET() {
	const readable = new ReadableStream({
		start(ctr) {
			this.interval = setInterval(() => ctr.enqueue('data: ping\n\n'), 3000);
		},
		cancel() {
			clearInterval(this.interval);
		}
	});

	return new Response(readable, {
		headers: {
			'content-type': 'text/event-stream',
		}
	});
}

@JuliaBonita

@repsac-by Thank you. The second example helps to show that the Streams API is basically a transport mechanism to stream any data flow, including higher level protocols like SSEs. This should be enough for me to figure out how to apply it to my app.

In the future, I suspect Svelte devs will ask for a more simplified interface for streams and SSEs like Express provides, but for now, at least I can see a path forward. Thank you.
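
Until something like that exists, a simpler interface can be approximated in user code. The sketch below wraps a ReadableStream so the caller only deals with a send() function, roughly like writing to the response in Express. It is not part of SvelteKit: sseResponse, setup, send and close are names invented for this illustration, and it assumes a Node 18+ runtime with a global Response.

```javascript
// Hypothetical helper: `setup(send, close)` is called once when the stream
// starts and may return a cleanup function, which runs when the client
// disconnects (the stream is cancelled).
function sseResponse(setup) {
	const encoder = new TextEncoder();
	let cleanup;
	const body = new ReadableStream({
		start(controller) {
			const send = (data, event) => {
				let msg = event ? `event: ${event}\n` : '';
				// Each line of the payload needs its own "data:" field.
				for (const line of String(data).split('\n')) msg += `data: ${line}\n`;
				controller.enqueue(encoder.encode(msg + '\n'));
			};
			cleanup = setup(send, () => controller.close());
		},
		cancel() {
			if (typeof cleanup === 'function') cleanup();
		}
	});
	return new Response(body, {
		headers: {
			'cache-control': 'no-cache',
			'content-type': 'text/event-stream'
		}
	});
}

// In a +server.js file this might be used as:
//   export function GET() {
//   	return sseResponse((send) => {
//   		const timer = setInterval(() => send('ping'), 3000);
//   		return () => clearInterval(timer);
//   	});
//   }
```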

@voscausa

voscausa commented Nov 6, 2022

@repsac-by Thnx. Your "SSE in a few lines" comment helped me to push ticker data from server to client.
More here: https://stackoverflow.com/questions/74330190/how-to-respond-with-a-stream-in-a-sveltekit-server-load-function/74336207#74336207

@repsac-by
Contributor

repsac-by commented Nov 6, 2022

@voscausa you forgot something

/** @type {import('./$types').RequestHandler} */
export function GET({ request }) {
+  const ac = new AbortController();

  console.log("GET api: yahoo-finance-ticker")
  const stream = new ReadableStream({
    start(controller) {
      tickerListener.on("ticker", (ticker) => {
        console.log(ticker.price);
        controller.enqueue(String(ticker.price));
-      })
+      }, { signal: ac.signal });
+    },
+    cancel() {
+        ac.abort();
    },
  })

  return new Response(stream, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}

@voscausa

voscausa commented Nov 7, 2022

OK. Thanks for the update. It solved the exception when changing page route.

@voscausa

voscausa commented Nov 7, 2022

@repsac-by Now it's working fine. I also included an abort in the +page.svelte route.
But I still do not understand the { signal: ac.signal } argument in the API endpoint below.

routes/yahoo/+page.svelte

<script>
  import { onDestroy } from "svelte";

  let result = "";
  const ac = new AbortController();
  const signal = ac.signal;

  async function getStream() {
    const pairs = ["BTC-USD", "EURUSD=X"].join(",");
    const response = await fetch(`/api/yahoo-finance-ticker?pairs=${pairs}&logging=true`, {
      signal,
    });
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
    while (true) {
      const { value, done } = await reader.read();
      console.log("resp", done, value);
      if (done) break;
      result += `${value}<br>`;
    }
  }
  getStream();

  onDestroy(() => {
    ac.abort();
    console.log("ticker fetch aborted");
  });
</script>

routes/api/yahoo-finance-ticker/+server.js

import YahooFinanceTicker from "yahoo-finance-ticker";

/** @type {import('./$types').RequestHandler} */
export function GET({ url }) {

  const ac = new AbortController();
  const ticker = new YahooFinanceTicker();

  // Boolean('false') is true, so compare against the string instead
  ticker.setLogging(url.searchParams.get('logging') === 'true');
  const pairs = (url.searchParams.get('pairs') ?? '').split(",");

  console.log("GET api: yahoo-finance-ticker")
  const stream = new ReadableStream({
    start(controller) {
      (async () => {
        const tickerListener = await ticker.subscribe(pairs);
        tickerListener.on("ticker", (ticker) => {
          console.log(ticker.price);
          controller.enqueue(String(ticker.price));
        }, { signal: ac.signal });                       // ?? { signal: ac.signal }   @repsac-by
      })().catch(err => console.error(err));
    },
    cancel() {
      console.log("cancel and abort");
      ticker.unsubscribe();
      ac.abort();
    },
  })

  return new Response(stream, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}

@vcheeze

vcheeze commented Jan 9, 2023

@voscausa you can read the MDN docs here on the AbortController interface to understand what the signal does. Passing the signal in to your fetch call essentially associates your call with your AbortController instance so that ac.abort() will abort your fetch call.
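
The same idea applies to listeners: an API that accepts a `signal` option can detach the listener when the controller aborts. Here is a small sketch using the standard EventTarget, whose addEventListener() supports that option. Whether the `{ signal: ac.signal }` argument does anything in the ticker endpoint above depends on how that library's on() method is implemented; as far as I know, Node's own EventEmitter on() only takes an event name and a listener. The function name countUntilAborted is made up for this illustration.

```javascript
// Registers a listener tied to an AbortSignal, fires the event before
// and after aborting, and returns how many times the listener ran.
function countUntilAborted() {
	const target = new EventTarget();
	const ac = new AbortController();
	let count = 0;

	target.addEventListener('ticker', () => { count++; }, { signal: ac.signal });

	target.dispatchEvent(new Event('ticker')); // listener runs
	ac.abort();                                // listener is removed
	target.dispatchEvent(new Event('ticker')); // nobody is listening

	return count;
}
```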

@david-plugge
Contributor

Just found out about unstable_parseMultipartFormData in the Remix docs.
The implementation can be found here: https://github.com/remix-run/remix/blob/main/packages/remix-server-runtime/formData.ts
Pretty sure that can be used in SvelteKit as well.

@dodiameer
Author

unstable_parseMultipartFormData looks like what was needed. I'll fiddle around and see if I can come up with something that works.

@fnfbraga

fnfbraga commented Jun 7, 2023

+1 for this feature

@gterras

gterras commented Jul 20, 2023

Would be really great to have a small consensual example here while waiting for an official doc!

@PerchunPak

Here is how I got SSE working for me:

routes/sse/+server.ts

function delay(ms: number): Promise<void> {
  return new Promise((res) => setTimeout(res, ms));
}

export function GET() {
  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      for (let i = 0; i < 20; i++) {
        controller.enqueue(encoder.encode('hello'));
        await delay(1000)
      }
      controller.close()
    }
  });

  return new Response(readable, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}

routes/sse/+page.svelte

<script lang="ts">
  import { onMount } from 'svelte';

  // must be `let`: it is reassigned below to trigger Svelte's reactivity
  let result: string[] = [];

  async function subscribe() {
    const response = await fetch('/sse');
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      result.push(value)
      result = result
    }
  }

  onMount(subscribe);
</script>

{#each result as str}
  <p>{str}</p>
{/each}
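
One caveat when reading an event stream with fetch like this: the chunks returned by reader.read() do not necessarily line up with the messages the server enqueued. If the server sends proper SSE frames ("data: ...\n\n", as in the earlier examples in this thread, unlike the bare 'hello' above), the client can buffer text and split on blank lines. A minimal sketch, with the hypothetical name sseEventSplitter; it only handles `data:` fields and "\n" line endings, not `event:`, `id:` or "\r\n".

```javascript
// TransformStream from decoded text chunks to one string per SSE message.
function sseEventSplitter() {
	let buffer = '';
	return new TransformStream({
		transform(text, controller) {
			buffer += text;
			let end;
			// A blank line ("\n\n") terminates a message.
			while ((end = buffer.indexOf('\n\n')) !== -1) {
				const block = buffer.slice(0, end);
				buffer = buffer.slice(end + 2);
				const data = block
					.split('\n')
					.filter((line) => line.startsWith('data:'))
					.map((line) => line.slice(5).replace(/^ /, ''))
					.join('\n');
				if (data) controller.enqueue(data);
			}
		}
	});
}

// Usage in the page above would be along the lines of:
//   response.body
//   	.pipeThrough(new TextDecoderStream())
//   	.pipeThrough(sseEventSplitter())
//   	.getReader();
```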

@cleytoncarvalho

This comment was marked as duplicate.
