Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream example #34

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/stream/index.js
@@ -0,0 +1,31 @@
'use strict';

const Piscina = require('../../dist/src');
const { resolve } = require('path');
const { MessageChannel } = require('worker_threads');
const PortDuplex = require('./port_duplex');
const { pipeline, Writable } = require('stream');

const piscina = new Piscina({
filename: resolve(__dirname, 'worker.js')
});

class W extends Writable {
length = 0;
_write (chunk, encoding, callback) {
this.length += chunk.length;
callback();
}
};

(async function () {
const channel = new MessageChannel();
const duplex = new PortDuplex(channel.port2, { writable: false });
const w = new W();

duplex.on('close', () => channel.port2.close());

pipeline(duplex, w, () => console.log(w.length));

await piscina.runTask({ port: channel.port1 }, [channel.port1]);
})();
58 changes: 58 additions & 0 deletions examples/stream/port_duplex.js
@@ -0,0 +1,58 @@
'use strict';

const { Duplex } = require('stream');

class PortDuplex extends Duplex {
#port = undefined;

constructor (port, options) {
const {
readable = true,
writable = true
} = { ...options };
if (typeof readable !== 'boolean') {
throw new TypeError('readable must be a boolean');
}
if (typeof writable !== 'boolean') {
throw new TypeError('writable must be a boolean');
}
super({ autoDestroy: true, readable, writable });
this.#port = port;
this.#port.onmessage = PortDuplex.#onmessage.bind(this);
}

_write (chunk, encoding, callback) {
// Chunk should always be a buffer here
const temp = new Uint8Array(chunk);
// TODO(@jasnell): This will need backpressure implemented
this.#port.postMessage(temp, [temp.buffer]);
callback();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling there should be some sort of backpressure built in.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely it should

}

_read () {
// Do nothing here. A more complete example would
// implement proper read/pause behavior.
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I’ve thought about porting the code from our internal Worker stdio implementation to npm. This is definitely giving more motivation for that :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likely would be good I think. I'm considering the possibility of Piscina providing a more complete version of PortDuplex out of the box.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell The thing is, in the end streaming use cases for Workers are even rarer than use cases for Workers in general. I don’t think we should encourage this pattern if users don’t really need it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's absolutely fair. The key reason I'm thinking through this is to explore the various good and bad ways of using the workers and what the various caveats are.


_destroy (err, callback) {
if (err) {
this.#port.close();
callback(err);
return;
}
if (this.writableEnded) {
this.#port.postMessage(null);
}
callback();
}

static #onmessage = function ({ data }) {
if (data != null) {
this.push(data);
} else {
this.push(null);
}
};
}

module.exports = PortDuplex;
20 changes: 20 additions & 0 deletions examples/stream/worker.js
@@ -0,0 +1,20 @@
'use strict';

const PortDuplex = require('./port_duplex');
const { createReadStream } = require('fs');
const { pipeline } = require('stream');
const { createGzip } = require('zlib');

module.exports = ({ port }) => {
return new Promise((resolve, reject) => {
const input = createReadStream(__filename);
const stream = new PortDuplex(port, { readable: false });
pipeline(input, createGzip(), stream, (err) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
};