Skip to content

Commit

Permalink
feat: streaming responses
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkurt committed May 16, 2019
1 parent 7696fa5 commit 8ca8e43
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 8 deletions.
12 changes: 8 additions & 4 deletions packages/core/src/bridge/text-encoding.ts
Expand Up @@ -6,10 +6,14 @@ import { transferInto } from "../utils/buffer"
import { Runtime } from "../runtime"

registerBridge("TextDecoder.decode", async (rt: Runtime, bridge: Bridge, buf: ArrayBuffer, encoding?: string) => {
// const txt = await new TextDecoderProxy(encoding).decode(buf)
const txt2 = Buffer.from(buf).toString(encoding)
// console.log("Got string:", txt.length, txt2.length, txt == txt2, txt[txt.length - 1], txt2[txt2.length - 1])
return txt2
try {
// const txt = await new TextDecoderProxy(encoding).decode(buf)
const txt2 = Buffer.from(buf).toString(encoding)
// console.log("Got string:", txt.length, txt2.length, txt == txt2, txt[txt.length - 1], txt2[txt2.length - 1])
return txt2
} catch (err) {
console.error("crash:", err)
}
})

registerBridge("TextEncoder.encode", async (rt: Runtime, bridge: Bridge, data: string) => {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/server.ts
Expand Up @@ -285,6 +285,7 @@ function handleResponse(rt: Runtime, src: V8ResponseBody, res: http.ServerRespon
}

if (typeof src === "number") {
res.flushHeaders()
return handleResponseStream(rt, src, res, dst)
}

Expand Down
55 changes: 52 additions & 3 deletions packages/core/src/stream_manager.ts
@@ -1,9 +1,10 @@
import { Runtime } from "./runtime"
import { Readable, Writable, PassThrough } from "stream"
import { Readable, Writable, PassThrough, Duplex } from "stream"
import { ivm } from "."
import { transferInto } from "./utils/buffer"
import log from "./log"
import { setTimeout } from "timers"
import { registerBridge } from "./bridge"

export interface StreamInfo {
stream: Readable
Expand Down Expand Up @@ -31,7 +32,10 @@ export const streamManager = {
if (!streamInfo) {
throw new Error(`stream ${key} not found`)
}
return streamInfo.stream
if (streamInfo.stream instanceof Readable) {
return streamInfo.stream
}
throw new Error("Stream is not readable")
},

add(rt: Runtime, stream: Readable, opts: StreamOptions = {}): number {
Expand All @@ -45,6 +49,7 @@ export const streamManager = {
streams[key] = { stream, readLength: 0, addedAt: Date.now(), endedAt: 0, readTimeout }
return id
},

subscribe(rt: Runtime, id: number | string, cb: ivm.Reference<() => void>) {
const key = streamKey(rt, id)
log.debug("stream subscribe id:", id)
Expand Down Expand Up @@ -81,6 +86,7 @@ export const streamManager = {
endStream(key, cb)
})
},

read(rt: Runtime, id: number | string, cb: ivm.Reference<() => void>) {
const key = streamKey(rt, id)
log.debug("stream:read id:", id)
Expand All @@ -96,7 +102,7 @@ export const streamManager = {
function tryRead() {
attempts += 1
try {
const chunk = info.stream.read(1024 * 1024)
const chunk = info.stream.read()
log.debug("chunk is null? arraybuffer?", !chunk, chunk instanceof Buffer)

if (chunk) {
Expand Down Expand Up @@ -161,9 +167,52 @@ export const streamManager = {
info.stream.pipe(stream2)

return [stream1Id, stream2Id]
},
write(rt: Runtime, id: number | string, chunk: any): void {
const key = streamKey(rt, id)
const info = streams[key]
if (!info) {
throw new Error("stream closed, not found or destroyed")
}
if (info.stream instanceof Writable) {
info.stream.write(chunk)
} else {
throw new Error("Stream cannot be written to")
}
},
end(rt: Runtime, id: number | string, chunk: any): void {
const key = streamKey(rt, id)
const info = streams[key]
if (!info) {
throw new Error("stream closed, not found or destroyed")
}
if (info.stream instanceof Writable) {
//@ts-ignore
setImmediate(() => info.stream.end(chunk))
} else {
throw new Error("Stream cannot be written to")
}
}
}

registerBridge("stream.create", rt => {
const stream = new PassThrough()
return Promise.resolve(streamManager.add(rt, stream))
})

registerBridge("stream.push", (rt, _, id: number | string, chunk: any) => {
return Promise.resolve(streamManager.write(rt, id, chunk))
})

registerBridge("stream.end", (rt, _, id: number | string, chunk: any) => {
try {
streamManager.end(rt, id, chunk)
} catch (err) {
console.error("stream end failed:", err)
}
return Promise.resolve(true)
})

function streamKey(rt: Runtime, id: number | string) {
return `${rt.app.name}:${id}`
}
Expand Down
1 change: 1 addition & 0 deletions packages/v8env/package.json
Expand Up @@ -22,6 +22,7 @@
"doc": "./publish-docs.sh"
},
"dependencies": {
"@stardazed/streams-text-encoding": "^1.0.2",
"better-assert": "^1.0.2",
"cookie": "^0.3.1",
"css-select": "^1.3.0-rc0",
Expand Down
33 changes: 32 additions & 1 deletion packages/v8env/src/events.ts
Expand Up @@ -4,7 +4,7 @@
*/
import { logger } from "./logger"
import { EventEmitter2 as EventEmitter } from "eventemitter2"
import refToStream, { isFlyStream, isFlyStreamId } from "./fly/streams"
import refToStream, { isFlyStream, makeFlyStream, writeToFlyStream, endFlyStream } from "./fly/streams"

declare var bridge: any

Expand Down Expand Up @@ -134,9 +134,14 @@ export function fireFetchEvent(url, req, body, callback) {
}

let b = null
let streamID: number | undefined

if (isFlyStream(res.body)) {
b = res.body.flyStreamId
} else if (res.bodySource instanceof ReadableStream) {
// get response id
b = streamID = makeFlyStream()
// streaming happens below, we just need an ID to send to the bridge
} else {
logger.debug("body source type:", res.bodySource.constructor.name)
if (typeof res.bodySource === "string") {
Expand All @@ -156,6 +161,32 @@ export function fireFetchEvent(url, req, body, callback) {
}),
b
])

// if body is a stream, send it over
if (streamID) {
// stream to bridge
logger.debug("sending stream response")
const stream: ReadableStream = res.bodySource
const reader = stream.getReader()
try {
while (true) {
const { done, value } = await reader.read()
if (!done && value) {
writeToFlyStream(streamID, value)
} else if (done) {
//pushToFlyStream(streamID, null)
endFlyStream(streamID, value)
break
}
}
} finally {
try {
// Might throw if the reader is still locked because we finished
// successfully without breaking or throwing.
await stream.cancel()
} catch {}
}
}
}
)
const fn = emitter.listeners("fetch").slice(-1)[0]
Expand Down
16 changes: 16 additions & 0 deletions packages/v8env/src/fly/streams.ts
Expand Up @@ -24,6 +24,22 @@ export function isFlyStreamId(arg: any): arg is FlyStreamId {
return typeof arg === "number"
}

export function makeFlyStream(): FlyStreamId {
const id = bridge.dispatchSync("stream.create")
if (typeof id === "number") {
return id as FlyStreamId
}
throw new Error("Failed to get stream ID")
}

export function writeToFlyStream(id: FlyStreamId, chunk: any) {
return bridge.dispatchSync("stream.push", id, chunk)
}

export function endFlyStream(id: FlyStreamId, chunk: any) {
return bridge.dispatchSync("stream.end", id, chunk)
}

/**
* @hidden
*/
Expand Down
3 changes: 3 additions & 0 deletions packages/v8env/src/index.ts
Expand Up @@ -17,6 +17,7 @@ import { URL, URLSearchParams } from "./url"
import { Headers } from "./headers"

import { TextEncoder, TextDecoder } from "./text-encoding"
import { TextEncoderStream, TextDecoderStream } from "@stardazed/streams-text-encoding"
import { fetch, TimeoutError } from "./fetch"
import Body from "./body_mixin"
import Blob from "./blob"
Expand Down Expand Up @@ -60,6 +61,8 @@ global.bootstrap = function bootstrap() {
TransformStream,
TextEncoder,
TextDecoder,
TextEncoderStream,
TextDecoderStream,
Headers,
Request,
Response,
Expand Down
1 change: 1 addition & 0 deletions packages/v8env/src/text-encoding.ts
Expand Up @@ -16,6 +16,7 @@ export class TextDecoder {
this.encoding = encoding
}
public decode(input) {
if (!input) return ""
return bridge.dispatchSync("TextDecoder.decode", input, this.encoding)
}
}
31 changes: 31 additions & 0 deletions tests/edge-apps/http-js/body.js
Expand Up @@ -13,5 +13,36 @@ fly.http.respondWith(async request => {
return new Response(`res1: ${await res1.text()}\nres2: ${await res2.text()}`)
}

if (url.pathname.startsWith("/stream")) {
let count = 0
let timeout
const write = function(controller) {
controller.enqueue(`chunk ${count++}\n`)
if (count < 3) {
timeout = setTimeout(() => write(controller), 1000)
} else {
controller.close()
}
}
// make a stream that waits 1s before writing
const b = new ReadableStream({
start(controller) {
timeout = setTimeout(() => write(controller), 1000)
},
cancel() {
if (timeout) {
clearImmediate(timeout)
}
}
})
return new Response(b, {
headers: {
"content-type": "text/html",
streaming: "true",
"transfer-encoding": "chunked"
}
})
}

return new Response(await request.text(), {})
})
13 changes: 13 additions & 0 deletions tests/edge-apps/http-js/body.tests.ts
Expand Up @@ -22,4 +22,17 @@ describe.each(["edge.local", "origin.local"])("Request body to %s", host => {
expect(response.status).toEqual(200)
expect(await response.text()).toEqual(`res1: hellohello\nres2: hellohello`)
})

test("streaming", async () => {
const start = new Date().getTime()
const response = await fetch(`http://${host}/stream`)
const firstResponse = new Date().getTime()
expect(firstResponse - start).toBeLessThan(1000)
expect(response.status).toEqual(200)

const body = await response.text()
expect(body).toContain("chunk 0\n")
expect(body).toContain("chunk 1\n")
expect(body).toContain("chunk 2\n")
})
})
5 changes: 5 additions & 0 deletions yarn.lock
Expand Up @@ -1028,6 +1028,11 @@
universal-user-agent "^2.0.0"
url-template "^2.0.8"

"@stardazed/streams-text-encoding@^1.0.2":
version "1.0.2"
resolved "https://registry.yarnpkg.com/@stardazed/streams-text-encoding/-/streams-text-encoding-1.0.2.tgz#a5a303fc8dfb18144e0aeed0773e7fb132f6c4cd"
integrity sha512-f2Z15BId3t44a/u21yYSGXFAkCyKocmAyduoAy7swnZ4xIfbaZlOWsgly/jDNNOuj6hYQN72UaBRe3Z/tOHfqg==

"@types/better-sqlite3@^3.1.3":
version "3.1.3"
resolved "https://registry.yarnpkg.com/@types/better-sqlite3/-/better-sqlite3-3.1.3.tgz#cd74eb09393ccc06014b8d9bae4f8d5e4b99c5fb"
Expand Down

0 comments on commit 8ca8e43

Please sign in to comment.