What is the expected behaviour of .pipeThrough()? #765

Closed
guest271314 opened this issue Aug 19, 2017 · 15 comments

Comments

@guest271314

According to the specification

3.2.4.4. pipeThrough({ writable, readable }, options)

The pipeThrough method provides a convenient, chainable way of piping this readable stream through a transform stream (or any other { writable, readable } pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.

Seeking clarification as to what the above paragraph means for practical usage.

Specifically, the literal text appears to suggest that the writable side of the object would be processed first.

Perhaps I have a general misunderstanding of the intended usage of the .pipeThrough() method here.

Given

let _chunk;
let [writable, readable] = [
  new WritableStream({
    write(chunk) {
        console.log(chunk);
        _chunk = chunk.map(c => c * 10);
        return _chunk
      },
      close() {
        console.log("done")
      }
  })
, new ReadableStream({
    pull(c, data) {
      console.log(c, _chunk); // `_chunk` is `undefined`
    }
  })
];

let rs = new ReadableStream({
    pull(c) {
      c.enqueue([1, 2, 3]);
      c.close()
    }
  });
  
  rs.pipeThrough({
    writable, readable
  });

The initial expectation here was that writable would be processed first; instead, readable is processed first.

If the specification is taken literally, how do we access the data transformed at writable side at the readable side of the pair?

Can we return data from a WritableStream to a ReadableStream?

Can we create a basic example of the intended, correct usage of the pipeThrough method, both as currently implemented in browsers and, if applicable, noting how the current browser implementations differ from what the specification authors and contributors are ultimately working towards?

@domenic
Member

domenic commented Aug 19, 2017

If you want to know about the precise behavior, you should read the spec, not the note boxes that are intended as a gentle introduction.

The spec is very simple: it is the four steps at https://streams.spec.whatwg.org/#rs-pipe-through

Please let us know if you have any questions after reading and understanding the spec.

@guest271314
Author

guest271314 commented Aug 19, 2017

@domenic At this point, no, I still do not understand the pattern. Can you point out the usage errors in the original post so I can correct them?

@domenic
Member

domenic commented Aug 19, 2017

Can you explain what you mean by "processed first" then?

@guest271314
Author

guest271314 commented Aug 19, 2017

@domenic

It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.

Under an evidently naive interpretation, the data from the ReadableStream is available first at the writable and then at the readable property passed to .pipeThrough().

What does returns mean within the specification? Do we specifically return one or more values from write(){} at writable?

What are examples of decompressorTransform and ignoreNonImageFilesTransform at https://streams.spec.whatwg.org/#example-pipe-chain?

@domenic
Member

domenic commented Aug 19, 2017

I don't know what "available first" means. Code would help, with expected results and actual results.

@domenic
Member

domenic commented Aug 19, 2017

Oh, I think I missed your // _chunk is undefined comment.

The issue is that pull() is called whenever a readable stream is created. If you have no data to give back, then don't call controller.enqueue(). But you always need to be prepared for pull() being called.
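
To make this concrete, here is a minimal sketch, assuming a runtime with built-in WHATWG streams (a current browser or Node.js 18+). The names `pullCalls`, `idle`, and `pullCheck` are illustrative only. It suggests that pull() runs shortly after construction even though nothing ever reads from the stream, and that a pull() with nothing to offer can simply return without enqueuing.

```javascript
// Record each pull() call; no reader is ever attached to this stream.
const pullCalls = [];

const idle = new ReadableStream({
  pull(controller) {
    // Invoked by the stream itself because its internal queue is below
    // the default high water mark, not because a consumer asked for data.
    pullCalls.push(controller.desiredSize);
    // Nothing to give back yet, so do not call controller.enqueue().
  }
});

// pull() runs asynchronously once the stream has started, so wait a tick
// before inspecting the log.
const pullCheck = new Promise(resolve => setTimeout(resolve, 0)).then(() => {
  console.log("pull() calls without any reader:", pullCalls.length);
  return pullCalls.length;
});
```

Under these assumptions the log shows at least one call, which matches the readable side being logged before the writable side in the original example.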

@guest271314
Author

guest271314 commented Aug 19, 2017

@domenic http://plnkr.co/edit/uqbvxUQtjXEpcPYQjD2i?p=preview Why is readable logged at console before writable?

Expected result is to return data from writable, possibly transformed, and receive the transformed data at readable, where it can be reached by a possibly chained .pipeTo().

How do we defer .enqueue() call?

This is what I mean by the request for a basic working example, for those who may interpret the specification literally when trying to comprehend how to properly use ReadableStream and WritableStream.

@domenic
Member

domenic commented Aug 19, 2017

Probably you didn't see my above comment, but it's the same reason: because pull() is called as soon as the readable stream is created. Just remove the pipeThrough line entirely and that becomes clear.

@guest271314
Author

guest271314 commented Aug 19, 2017

@domenic So do we use .getReader() and .read() on the original ReadableStream, on the readable one, or on both, to defer the call to pull? What is the correct usage for the expected result? ReadableStream -> within .pipeThrough -> writable -> readable gets data from writable?

@domenic
Member

domenic commented Aug 19, 2017

No. pull() will always be called as long as the current size of the queue is less than the high water mark. You can set the high water mark to zero if you really want to avoid pull() calls, but that's not recommended. It's better to just write a well-behaved pull() that can be called any time. Trying to control when it's called is not a good idea here.
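
For illustration only, a minimal sketch of the high water mark behaviour described here, assuming a runtime with built-in WHATWG streams (a current browser or Node.js 18+). The names `lazyPulls`, `lazy`, and `lazyCheck` are hypothetical. With a high water mark of zero, pull() should not run until a read() is pending.

```javascript
const lazyPulls = [];

const lazy = new ReadableStream(
  {
    pull(controller) {
      lazyPulls.push("pull");
      controller.enqueue("chunk");
      controller.close();
    }
  },
  // A zero high water mark means the stream never wants to buffer ahead,
  // so pull() is only triggered by a pending read request.
  new CountQueuingStrategy({ highWaterMark: 0 })
);

const lazyCheck = (async () => {
  // Give the stream time to start; no reader exists yet.
  await new Promise(resolve => setTimeout(resolve, 0));
  const before = lazyPulls.length;

  // The first read() is what triggers pull().
  const { value } = await lazy.getReader().read();
  return { before, after: lazyPulls.length, value };
})();

lazyCheck.then(result => console.log(result));
```

This is shown to explain the mechanism, not to recommend it; a pull() that tolerates being called at any time remains the more robust design.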

@guest271314
Author

guest271314 commented Aug 19, 2017

@domenic Then what am I missing here? Should I just continue to try different configurations on my own? Or avoid using .pipeThrough() until a basic example is available somewhere in the wild that I can find by searching?

I have no experience using the high water mark and also have questions about CountQueuingStrategy and ByteLengthQueuingStrategy, though I do not want to bombard the board with a bunch of questions that you and your colleagues might view as trivial and basic.

@domenic
Member

domenic commented Aug 19, 2017

So the basic problem is that manually creating a transform stream is very hard. You may enjoy looking through https://github.com/whatwg/streams/blob/master/reference-implementation/lib/transform-stream.js, which we're attempting to clean up and eventually standardize and ship. When it's ready, you'll be able to create something simple like

const { readable, writable } = new TransformStream({
  transform(chunk) {
    return chunk.map(c => c * 10);
  }
});

Until then, properly understanding how to get data from the writable side to the readable side of a transform is tricky. Your version has a variety of bugs, but even after I cleaned them up, it turns out it doesn't work, because pull() is called when there is no chunk written to the writable side, and is not called after that, so it never makes it to the readable side. Here is a demo of this not working well: http://jsbin.com/yowoqozavo/1/edit?html,console

Here is a version that works, using a very different technique: http://jsbin.com/hicawawizo/edit?html,console
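
One possible way to link the two sides by hand is sketched below. This is not necessarily the technique used in the jsbin above. It assumes a runtime with built-in WHATWG streams, all names are illustrative, and it deliberately ignores backpressure. The idea is to capture the readable side's controller in start() and push into it from the writable side's write(), rather than relying on pull().

```javascript
// Capture the readable side's controller so the writable side can feed it.
let readableController;

const linkedReadable = new ReadableStream({
  start(controller) {
    readableController = controller; // start() runs synchronously in the constructor
  }
});

const linkedWritable = new WritableStream({
  write(chunk) {
    // Transform each written chunk and hand it to the readable side.
    readableController.enqueue(chunk.map(n => n * 10));
  },
  close() {
    readableController.close();
  },
  abort(reason) {
    readableController.error(reason);
  }
});

const manualResult = (async () => {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue([1, 2, 3]);
      controller.close();
    }
  });

  const reader = source
    .pipeThrough({ writable: linkedWritable, readable: linkedReadable })
    .getReader();

  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
})();

manualResult.then(chunks => console.log(chunks)); // logs [[10, 20, 30]]
```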

I don't really recommend manually creating transform streams yourself. Manually creating { readable, writable } pairs makes sense for some cases, known as "duplex" streams, where there are two separate I/O channels: see e.g. https://streams.spec.whatwg.org/#example-both. But for transformations of data, you'll really want to use transform-stream.js, and eventually, the browser's built-in TransformStream.
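
As a sketch of the transform-based approach, assuming an environment where TransformStream is available as a built-in (current browsers and Node.js 18+): there, transform() receives a controller as its second argument and emits output with controller.enqueue() rather than through a return value. The names `source`, `timesTen`, and `transformed` are illustrative.

```javascript
const source = new ReadableStream({
  start(controller) {
    controller.enqueue([1, 2, 3]);
    controller.close();
  }
});

// The transform stream owns both sides and links them internally.
const timesTen = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.map(n => n * 10));
  }
});

const transformed = (async () => {
  const reader = source.pipeThrough(timesTen).getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
})();

transformed.then(chunks => console.log(chunks)); // logs [[10, 20, 30]]
```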

@domenic
Member

domenic commented Aug 19, 2017

No. close() is only present if you want to perform special actions in response to closing.

@ricea
Collaborator

ricea commented Aug 21, 2017

Returning to the original question, pipeThrough is approximately equivalent to

function pipeThrough({writable, readable}) {
  this.pipeTo(writable);
  return readable;
}

Key to this discussion is that pipeThrough() does not create any link between readable and writable.
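
A small sketch of that point, assuming a runtime with built-in WHATWG streams; every name here is illustrative. The pair below is deliberately unlinked: pipeThrough() hands back the very same readable object it was given, and the chunks only ever reach the writable side.

```javascript
const seenByWritable = [];
let markClosed;
const writableClosed = new Promise(resolve => { markClosed = resolve; });

// An unlinked pair: nothing connects the writable to the readable.
const writable = new WritableStream({
  write(chunk) { seenByWritable.push(chunk); },
  close() { markClosed(); }
});
const readable = new ReadableStream(); // never enqueues anything

const src = new ReadableStream({
  start(controller) {
    controller.enqueue("a");
    controller.enqueue("b");
    controller.close();
  }
});

const returned = src.pipeThrough({ writable, readable });
const sameObject = returned === readable;

writableClosed.then(() => {
  console.log("returned the supplied readable:", sameObject);
  console.log("chunks seen by writable:", seenByWritable);
});
```

Reading from `returned` in this sketch would never yield data, because nothing forwards chunks from `writable` to `readable`.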

@ricea ricea added the question label Aug 21, 2017
@guest271314
Author

guest271314 commented Aug 21, 2017

@ricea I was able to achieve the requirement using .pipeTo() with a variation of @domenic's code at http://jsbin.com/hicawawizo/edit?html,console

function pipeThrough(rs) {
  "use strict";
  console.clear();
  let readable = new ReadableStream();
  let writable = new WritableStream({
    write(chunk) {
        readable = new ReadableStream({
          pull(c) {
            c.enqueue(chunk.map(n => n * 10));
            c.close();
          }
        })
      },
      close(c) {
        console.log("done")
      }
  });

  return ("body" in rs ? rs.body : rs).pipeTo(writable).then(() => {
    console.log("really done");
    return readable
  });
}


  pipeThrough(new Response(new ReadableStream({
    pull(c) {
      c.enqueue([1, 2, 3]);
      c.close()
    }
  })))
  .then(ts => {
    let stream = ts.getReader();
    let fn = ({value,done}) => {
      if (done) return stream.closed.then(() => ({done}));
      console.log(value, done);
      return stream.read().then(fn)
    }
    return stream.read().then(fn)
  })
  .then(done => console.log(done));

and by compiling the files at https://github.com/whatwg/streams/blob/master/reference-implementation/lib/ into a single file and using .pipeThrough(), with the adjustment of calling .enqueue() instead of using return within the transform function


  const {
    readable, writable
  } = new TransformStream({
    transform(chunk, c) {
      console.log(chunk, c);
      c.enqueue(chunk.map(n => n * 10));
    }
  });

  let arr = new Uint8Array([...Array(100).keys()]);
  console.log(arr)
  const response = new Response(arr);
  const reader = response.body.getReader();
  const rs = new ReadableStream({
    pull(c) {
      reader.read().then(function fn({
       done, value
      }) {
        console.log(value);
        if (done) return;
        c.enqueue([...value]);
        return reader.read().then(fn);
      })
    }
  });

  rs
    .pipeThrough({
      readable, writable
    })
    .getReader()
    .read()
    .then(({
      value, done
    }) => console.log(value));

plnkr http://plnkr.co/edit/knPGThLufvNms4KrRDJB?p=preview

@domenic domenic closed this as completed Sep 8, 2017