Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readable and writable stream to duplex stream #2460

Closed
FranckFreiburger opened this issue Feb 4, 2020 · 10 comments
Closed

readable and writable stream to duplex stream #2460

FranckFreiburger opened this issue Feb 4, 2020 · 10 comments
Labels

Comments

@FranckFreiburger
Copy link

  • Node.js Version: 13.6.0
  • OS: win7
  • Scope (install, code, runtime, meta, other?): code
  • Module (and version) (if relevant): stream

Hello,
I try to write a simple function that converts a readable stream and a writable stream into a duplex stream:

const duplexStream = duplexFromStreams(readableStream, writableStream);

I find it is very hard to obtain a satisfying result.
I know that some implementations exists (duplexify, duplexer2), but they don't work properly.
I'm wondering if you can give me some advice on how to do that and what pitfalls to avoid.
Thanks.

@Hakerh400
Copy link

Hakerh400 commented Feb 4, 2020

Try this:

const duplexFromStreams = (rs, ws) => {
  const ds = new class extends stream.Duplex{
    #bufs = [];
    #waiting = false;
    allowHalfOpen = true;
    constructor(){
      super();
      const push = buf => {
        this.#bufs.push(buf);
        if(this.#waiting){
          this.push(this.#bufs.shift());
          this.#waiting = false;
        }
      };
      rs.on('data', buf => push(buf));
      rs.on('end', () => push(null));
    }
    _read(){
      while(this.#bufs.length !== 0)
        if(!this.push(this.#bufs.shift()))
          return;
      this.#waiting = true;
    }
    write(buf, enc, cb){
      return ws.write(buf, enc, cb);
    }
  }
  ws.on('drain', () => ds.emit('drain'));
  return ds;
};

@FranckFreiburger
Copy link
Author

FranckFreiburger commented Feb 5, 2020

Thanks for you code @Hakerh400
I wondering how to forward ws.write() return value (need wait for 'drain' event) to the duplex stream.
ws.write() API is quite confusing, it has a return value and a callback argument.

@Hakerh400
Copy link

I wondering how to forward ws.write() return value

Simply add return statement in _write method (see updated code above).

@FranckFreiburger
Copy link
Author

Yeah, that's what I thought at first, but _write() caller do not use the return value, see:
https://github.com/nodejs/node/blob/018c3e8949e925efc8077801d44c2b2feb974750/lib/_stream_writable.js#L435

@Hakerh400
Copy link

Ok, I see what you mean. If you worry about the drain event, you can use write instead of _write and add drain event listener artificially (see updated code).

@FranckFreiburger
Copy link
Author

FranckFreiburger commented Feb 6, 2020

The following code seems to handle drain properly without the need to redefine write()
(see the end of the following paragraph: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback)

	_write(chunk, encoding, callback) {

		let cb = callback;
		if ( !ws.write(chunk, encoding, () => void cb() ) ) {

			cb = () => {};
			ws.once('drain', callback);
		}
	}

@Hakerh400
Copy link

Hakerh400 commented Feb 6, 2020

That works too. I tried to stream 2.6GB of data. but seems that your method is about ~2 times slower:

original: 25.871s
your:     52.610s

You can speed it up a lot by refactoring the method like this:

_write(chunk, encoding, callback){
  const ok = ws.write(chunk, encoding, () => {ok && callback()});
  if(!ok) ws.once('drain', callback);
}

@FranckFreiburger
Copy link
Author

Here is my final code: https://gist.github.com/FranckFreiburger/9af693b0432d7ee85d4e360e524551dc

...I have to admit that the result is quite similar to duplexer2

@github-actions
Copy link

There has been no activity on this issue for 3 years and it may no longer be relevant. It will be closed 1 month after the last non-automated comment.

@github-actions github-actions bot added the stale label Feb 11, 2023
@github-actions
Copy link

There has been no activity on this issue and it is being closed. If you feel closing this issue is not the right thing to do, please leave a comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants