Transferable streams: the double transfer problem #1063

Open
ricea opened this issue Aug 14, 2020 · 6 comments

@ricea
Collaborator

ricea commented Aug 14, 2020

The original transferable streams issue has been closed now that support has landed; however, the discussion of the double transfer problem that started at #276 (comment) and consumed the rest of the thread never reached a conclusion.

Summarising the issue, the following code works fine:

const worker = new Worker(`data:text/javascript,onmessage = evt => postMessage(evt.data, [evt.data]);`);
const rs = new ReadableStream();
worker.postMessage(rs, [rs]);
let transferred_rs;
worker.onmessage = evt => { transferred_rs = evt.data; };

However, if you subsequently call worker.terminate(), then transferred_rs will start returning errors. For other transferable types, no connection remains with any of the realms the object was passed through, but in the case of streams, data is still being proxied via the worker.
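For illustration, a minimal continuation of the snippet above (assuming the echo message has already arrived, so transferred_rs is set):

// Killing the worker breaks the stream, because its data is still
// proxied through the worker's realm. The exact error is
// implementation-dependent.
worker.terminate();
const reader = transferred_rs.getReader();
reader.read().catch(err => console.log('read failed:', err));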

See the linked thread for why this is hard to fix.

@jimmywarting

jimmywarting commented Aug 14, 2020

This left a bit of a roller coaster behind and created more issues that need to be fixed. Writing a spec that works across multiple threads and pipes doesn't sound like an easy task 😅

So here is a rough workaround for those who stumble upon this issue and are looking for a solution that works right now:

MessageChannel solution
// this method passes the ReadableStream through a MessageChannel in order to create a more direct connection

var worker = new Worker(`data:text/javascript,onmessage = evt => postMessage(evt.data, [evt.data]);`);
var rs = new ReadableStream({
  start(c) {
    c.enqueue('a')
    c.enqueue('b')
    c.enqueue('c')
    c.close()
  }
})
// send the transferable readableStream
// to a messageChannel port
var mc = new MessageChannel()
mc.port1.postMessage(rs, [rs])
mc.port1.close()
// post the port instead of the transferred stream
worker.postMessage(mc.port2, [mc.port2])
worker.onmessage = evt => { 
  var port = evt.data
  // now you can terminate the worker, since you now have a MessageChannel port that you can listen to instead.
  // the web worker no longer has anything to do with the transferred ReadableStream
  worker.terminate()
  port.onmessage = evt => {
    port.close()
    port.onmessage = null
    // success
    evt.data.pipeTo(new WritableStream({
      write(x) {
        console.log(x)
      },
      close() {
        console.log('closed')
      }
    }))
  }
}

@alvestrand

I think the proper model here is that transferring a ReadableStream transfers the ability to read from the stream - the end that writes to the stream is not transferred, and the connection between the writing end and the reading end remains in place.

Isn't this the same model as for message channels?
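For reference, a minimal sketch of that MessageChannel behaviour (worker as above): transferring a port moves that endpoint, and the entanglement with the other port survives, even across further transfers:

const { port1, port2 } = new MessageChannel();
// port2 moves to the worker (and could be re-transferred onwards);
// port1 stays here, still entangled with it wherever it ends up.
worker.postMessage(port2, [port2]);
port1.postMessage('delivered to wherever port2 lives now');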

@MattiasBuelens
Collaborator

> I think the proper model here is that transferring a ReadableStream transfers the ability to read from the stream - the end that writes to the stream is not transferred, and the connection between the writing end and the reading end remains in place.

That's the idea, yes.

The problem is mostly technical: we have to figure out how to make that work. There's a whole discussion on the original issue about all the peculiarities we have to deal with, such as:

  • We not only have to transfer the message port, but we also have to "fast forward" the stream's state to match the original stream's state (things like [[state]], [[storedError]], [[disturbed]],...).
  • The WritableStream may still have chunks in its queue which have not yet been processed by the "cross-realm sink". If the stream is transferred, then we must also transfer this queue, so we don't lose those chunks. (I suppose the same applies to a queued close or abort request.)
  • Similarly, the ReadableStream may have already received chunks from the "cross-realm source" and put them in its own stream.[[controller]].[[queue]] to be read later. Again, if the stream is transferred, then we must also transfer this queue. (I guess [[closeRequested]] also needs to be transferred?)
    • We might be able to avoid this, if we can assert that every chunk we pull from the cross-realm source immediately fulfills a pending read request. In other words, if the [[queue]] of a cross-realm readable stream is always empty.
  • writer.write(), writer.close() and writer.abort() return a promise that resolves when the underlying sink has processed the requested operation. However, if we transfer the WritableStream while those promises are still pending, then we lose access to the message port, so we can no longer communicate with the "real" underlying sink on the other end. We still need to figure out what should happen with these pending promises. Do we leave them pending forever? Resolve them, assuming that "everything will be fine"? Or maybe reject with an error to indicate that the stream was transferred? (A concrete sketch of this scenario follows below.)
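A hypothetical version of that last scenario (the stream must be unlocked before it can be transferred, hence the releaseLock()):

const writer = writable.getWriter();
const pending = writer.write('chunk'); // settles when the cross-realm sink processes it
writer.releaseLock();                  // a locked stream cannot be transferred
worker.postMessage(writable, { transfer: [writable] });
// The original realm has now lost its message port, so the sink's
// acknowledgement can never arrive here. What happens to `pending`?
pending.then(
  () => console.log('processed'),
  err => console.log('rejected:', err)
);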

@ricea
Collaborator Author

ricea commented Nov 16, 2021

Here's a sketch of an approach which reconciles the atomic nature of transfer with the asynchronous nature of streams.

I'm going to talk about the WritableStream case because I think it is the harder of the two.

  1. We give up the nice abstraction of the "cross-realm identity transform" because it makes the following stuff harder. Instead we have separate transfer logic for ReadableStream and WritableStream.
  2. We have a stream with the underlying sink in the original realm O and the transferred writable stream in realm A.
  3. We want to transfer from realm A to realm B.
  4. The atomic part of the transfer works much the same as before (it's a bit ugly because we can no longer use the piping logic to synchronise the states).
  5. The new WritableStream sends a message back to A asking for a new message pipe. It starts queuing writes rather than sending them to A (this may result in backpressure being applied).
  6. The WritableStream in A waits until its write queue is empty, and then forwards the "new message pipe request" back to O.
  7. O constructs a new message pipe and sends it to A, which passes it on to B.
  8. B starts sending writes down the new message pipe directly to O.
  9. B sends a message to A to close itself.

After step 9, realm A has been "unhooked" and can safely be destroyed.

There can be an arbitrary delay for queued writes to complete before A is "unhooked". Maybe we can force the queued chunks from A into O's queue by ignoring backpressure to make this delay as short as possible?
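For what it's worth, here is a rough, runnable sketch of the steps 5-9 handshake, with the three realms simulated as MessageChannel endpoints on a single page (all message names are invented for illustration):

// O <-> A is the existing pipe; A <-> B is set up by the transfer itself.
const oa = new MessageChannel();
const ab = new MessageChannel();

// Realm O: on request, mint a fresh pipe and send one end back (step 7).
oa.port1.onmessage = ({ data }) => {
  if (data.type === 'newPipeRequest') {
    const ob = new MessageChannel();
    ob.port1.onmessage = ({ data }) => console.log('O got write:', data.chunk);
    oa.port1.postMessage({ type: 'newPipe', port: ob.port2 }, [ob.port2]);
  }
};

// Realm A: forward the request to O (step 6; a real implementation would
// first wait for its own write queue to drain), then relay the new port on to B.
ab.port1.onmessage = ({ data }) => {
  if (data.type === 'newPipeRequest') oa.port2.postMessage(data);
  if (data.type === 'close') console.log('A unhooked, safe to destroy'); // step 9
};
oa.port2.onmessage = ({ data }) => {
  if (data.type === 'newPipe') ab.port1.postMessage(data, [data.port]);
};

// Realm B: ask for a direct pipe (step 5), then write straight to O (step 8).
ab.port2.onmessage = ({ data }) => {
  if (data.type === 'newPipe') {
    data.port.postMessage({ chunk: 'hello O' }); // step 8
    ab.port2.postMessage({ type: 'close' });     // step 9
  }
};
ab.port2.postMessage({ type: 'newPipeRequest' }); // step 5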

@youennf
Contributor

youennf commented Nov 16, 2021

> There can be an arbitrary delay for queued writes to complete before A is "unhooked".

That is an ergonomic issue: one calls postMessage, everything looks fine, so one navigates away thinking everything is good, but too quickly for the unhooking to happen.
The current spec approach stays on the safe side so that there are no surprises.

@MattiasBuelens
Collaborator

Hmm, delaying the transfer of the queued chunks is indeed quite risky.

While the proposed solution could work for WritableStream, I'm not so sure it'd work for a ReadableStream. The current spec tries to avoid sending chunks from the original stream to the transferred stream that are not yet being requested, so ideally the transferred stream's queue is always empty. However, other spec changes might make it possible for that queue to become non-empty. For example, with #1103, you might do this:

const controller = new AbortController();
const reader = readable.getReader({ signal: controller.signal });
reader.read(); // causes the cross-realm readable to send a "pull" message
controller.abort(); // discards the pending read request
// At this point, we have no pending read requests, but we are already pulling...

// After some time, we receive a "chunk" message and put the chunk in the queue.
// Which means that if you now transfer the stream...
worker.postMessage(readable, { transfer: [readable] });
// ...we have to do something with the chunks in the queue first.

I think it's better if we transfer the entire queue synchronously as part of the transfer steps. In the transfer-receiving steps, we would re-enqueue those chunks with controller.enqueue() and writer.write().
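As a sketch, the transfer-receiving steps for a readable could then look roughly like this (all names invented; reduced to chunk and close messages):

function setUpTransferredReadable(port, queuedChunks) {
  return new ReadableStream({
    start(controller) {
      // Re-enqueue the chunks that were still sitting in the original
      // stream's [[queue]] and travelled along with the transfer.
      for (const chunk of queuedChunks) controller.enqueue(chunk);
      port.onmessage = ({ data }) => {
        if (data.type === 'chunk') controller.enqueue(data.chunk);
        if (data.type === 'close') controller.close();
      };
    },
    pull() {
      port.postMessage({ type: 'pull' });
    }
  });
}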

> 1. We give up the nice abstraction of the "cross-realm identity transform" because it makes the following stuff harder. Instead we have separate transfer logic for ReadableStream and WritableStream.

So the transfer steps for ReadableStream would acquire a reader, and for WritableStream they would acquire a writer? I think I would like that better than the current solution with pipeTo(), actually. 😛

> After step 9, realm A has been "unhooked" and can safely be destroyed.

Is step 9 needed? Can't A close itself immediately after step 7?
