Stream API with cancellation #32
You can implement your own stream wrapper around the parser:

```ts
const stream = require('stream')
const saxes = require('saxes')

class XMLStream extends stream.Writable {
  initialize () {
    this.sax = new saxes.SaxesParser({ fragment: true })
    // todo: handle all your sax events like `onopentag`, etc.
  }

  _write (chunk : string | Buffer, _enc : any, cb : Function | undefined) {
    this.sax.write(chunk.toString())
    if (typeof cb == "function") cb()
  }
}
```
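For example, usage could look like this (a hypothetical sketch; note that `initialize()` is not a standard stream hook, so the caller must invoke it before writing):

```ts
// Hypothetical usage sketch for the class above.
const fs = require('fs')

const xmlStream = new XMLStream()
// `initialize()` is a custom method, not part of the stream.Writable
// contract, so Node will not call it for us.
xmlStream.initialize()

fs.createReadStream('./some-file.xml').pipe(xmlStream)
```
|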
There has been one other request for a streaming API. At that time the person who asked was satisfied with the answer I gave, so it did not go further. Oh, and I've just noticed it was someone working on exceljs! I've thought about working on a streaming API from time to time, but there's never been an urgent need for me to have such an API, so I've never moved forward on it. Anyway, the few times I thought about how I'd do it, I came to the conclusion that the streaming API should be an add-on that can be installed independently of the XML parser itself. The fact of the matter is that there are many situations in which someone may need the parser but not the streaming API. I think all approaches to getting a streaming API would either tie the code to Node.js or tie it to something that replicates Node.js' streams. Providing the API as an add-on could happen by:

1. maintaining it as part of the saxes project, or
2. maintaining it as an entirely separate package.

Either method works for me, though method 1 would require more coordination and integration with the tools that are already being used for saxes. (E.g. I don't want to add a dependency to …)

Saxes has had some significant changes since version 4. These changes have not been released yet, but anybody working on an add-on to saxes should be warned that this is coming up in version 5. The major changes are:
|
Thanks for all of the input! I've put together a small PoC based on the current master of saxes and I've managed to port exceljs over. When are you planning to release v5 / could you publish a pre-release so that I can open a PR against exceljs? Here is the implementation I've used:

```js
const {Writable} = require('stream');
const saxes = require('saxes');

module.exports = class SAXStream extends Writable {
  initialize() {
    this.sax = new saxes.SaxesParser({
      fragment: false,
    });
  }

  _write(chunk, _enc, cb) {
    this.sax.write(chunk.toString());
    if (typeof cb === 'function') cb();
  }
};
```

My immediate question would be: can't we use …? But I've also run into two issues:
```js
saxStream.sax.on('closetag', node => {
  switch (node.name) {
    case 'worksheet':
      saxStream.sax.close();
      break;
    // ...
  }
});
```

When there is some version of v5 (even a beta release) on npm, I can open the PR to help reproduce issue 2 - any idea what might cause it? |
I've just pushed out 5.0.0-rc.1 on the …
|
My comment was meant to be a minimal code sample to help get exceljs unstuck and illustrate the concepts of creating your own stream wrapper. The project I took the code sample from has some special considerations, as it's a persistent TCP-socket API that only emits fragments over the session. Since my project uses TCP there is built-in back pressure; however, this is a great article on backpressure: … And, as @lddubeau stated:
|
Thanks @ondreian, that helps explain it! For exceljs, we'll always look at complete XML files, so we'll go with the default value. Regarding backpressure, I'm well aware of it and it is one of the main motivating factors for me modernizing exceljs - I'd love to use async iterators :) @lddubeau Unfortunately, I can't use the published package; it contains the examples and TypeScript source files, but not the built JS files.
|
@alubbe Aaaaaargh... I forgot to update the publishing scripts and hilarity... hmm... garbage ensued. rc.2 is now out and I did pull it down to check that it is not complete garbage like rc.1 was. Sorry about that. |
Awesome - and here is the PoC: exceljs/exceljs#1127 |
A quick update - I have not been able to investigate the hanging issue, but instead have refactored everything to use async iterators using this new helper class:

```js
const Stream = require('stream');
const {SaxesParser, EVENTS} = require('saxes');

class SAXStream extends Stream.Transform {
  constructor() {
    super({readableObjectMode: true});
    this._error = null;
    this.saxesParser = new SaxesParser();
    this.saxesParser.on('error', error => {
      this._error = error;
    });
    for (const event of EVENTS) {
      if (event !== 'ready' && event !== 'error' && event !== 'end') {
        this.saxesParser.on(event, value => {
          this.push({event, value});
        });
      }
    }
  }

  _transform(chunk, _encoding, callback) {
    // TODO: Ensure we handle back-pressure!
    this.saxesParser.write(chunk.toString());
    // saxesParser.write and saxesParser.on() are synchronous,
    // so we can only reach the below line once all event handlers
    // have been called
    callback(this._error);
  }

  _final(callback) {
    this.saxesParser.close();
    callback();
  }
}
```

With this, all issues have disappeared and I'm really happy with how code that uses this class looks:

```js
try {
  for await (const {event, value} of saxStream) {
    if (event === 'opentag') {
      this.parseOpen(value);
    } else if (event === 'text') {
      this.parseText(value);
    } else if (event === 'closetag') {
      if (!this.parseClose(value.name)) {
        return this.model;
      }
    }
  }
  // Do something after everything has been parsed
} catch (err) {
  // handle the error
}
```

I don't think a streaming interface for … In any case, looking forward to seeing the v5 release :) |
I've slightly simplified my implementation so that we don't need the internal `_error` field anymore:

```js
let Stream = require('stream');
const {SaxesParser, EVENTS} = require('saxes');

// Backwards compatibility for earlier node versions and browsers
if (!Stream.Readable || typeof Symbol === 'undefined' || !Stream.Readable.prototype[Symbol.asyncIterator]) {
  Stream = require('readable-stream');
}

module.exports = class SAXStream extends Stream.Transform {
  constructor() {
    super({readableObjectMode: true});
    this.saxesParser = new SaxesParser();
    this.saxesParser.on('error', error => {
      this.destroy(error);
    });
    for (const event of EVENTS) {
      if (event !== 'ready' && event !== 'error' && event !== 'end') {
        this.saxesParser.on(event, value => {
          this.push({event, value});
        });
      }
    }
  }

  _transform(chunk, _encoding, callback) {
    this.saxesParser.write(chunk.toString());
    // saxesParser.write and saxesParser.on() are synchronous,
    // so we can only reach the below line once all event handlers
    // have been called
    callback();
  }

  _final(callback) {
    this.saxesParser.close();
    callback();
  }
};
```
|
More great news: exceljs got almost 4 times faster using saxes instead of sax 🎉 (exceljs/exceljs#1139). Any ETA on releasing v5? |
@alubbe Sorry for the silence. Been spread very thin lately. Happy to learn about the speed improvement.
Right, but the code you quoted ignores the return value of `this.push()`. Regarding this:
Right. I would accept a PR that provides such an interface, along with documentation and comprehensive tests for it. I had earlier mentioned moving saxes to a monorepo structure, with a main package that contains only the parser and a package that contains the stream interface... but upon further reflection I don't think that's necessary. The streaming interface could be added as an additional module. Those folks who only need the parser would continue doing what they've been doing all along. Those folks who need the streaming API would refer to the new module. v5 could go out by the end of the week. If there's a PR submitted for the streaming API, then it could be released as 5.1. It is not a breaking change so it does not require bumping the major number.
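For example, consumption might look like this (a sketch; the `saxes/stream` entry point and `SaxesStream` name are hypothetical, not a published API):

```js
// Hypothetical sketch; 'saxes/stream' and SaxesStream are assumed names.
// Parser-only consumers keep doing what they have been doing all along:
const {SaxesParser} = require('saxes');

// Stream consumers additionally opt in to the new module:
const {SaxesStream} = require('saxes/stream');
```
|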
That has been bothering me as well, but the control flow gets really complicated when accounting for it, because all of the … However, as I was benchmarking the usage of …

Keep in mind that what happens underneath is that a single 16kb XML string (we get these asynchronously from an async stream) turns into multiple SaxesParser events (these are all sync underneath). The current implementation spits out the SaxesParser events one-by-one. Investigating this led me to the idea that maybe we should be honest about the fact that SaxesParser is sync and leverage the associated performance by instead emitting, for every call to `_transform`, all of that chunk's events as one batch:

```js
for await (const events of saxStream) {
  for (const {eventType, value} of events) { // <- this line is new
    if (eventType === 'opentag') {
      this.parseOpen(value);
    } else if (eventType === 'text') {
      this.parseText(value);
    } else if (eventType === 'closetag') {
      if (!this.parseClose(value.name)) {
        return this.model;
      }
    }
  }
}
```

```js
module.exports = class SAXStream extends Stream.Transform {
  constructor(eventTypes) {
    super({readableObjectMode: true});
    this.events = []; // <- this line is new
    this.saxesParser = new SaxesParser();
    this.saxesParser.on('error', error => {
      this.destroy(error);
    });
    for (const eventType of eventTypes) {
      if (eventType !== 'ready' && eventType !== 'error' && eventType !== 'end') {
        this.saxesParser.on(eventType, value => {
          this.events.push({eventType, value}); // <- pushes to this.events instead
        });
      }
    }
  }

  _transform(chunk, _encoding, callback) {
    this.saxesParser.write(chunk.toString());
    // saxesParser.write and saxesParser.on() are synchronous,
    // so we can only reach the below line once all events have been emitted
    const events = this.events; // <- this line is new
    this.events = []; // <- this line is new
    callback(null, events); // <- we emit just a single async 'data' event now
  }

  _final(callback) {
    this.saxesParser.close();
    callback();
  }
};
```

It's less ergonomic but much more honest and simpler to maintain. And I believe (again, might be wrong) that using … |
Indeed saxes is not pausable, so when there's backpressure downstream you need to buffer the events produced by saxes. They have to be buffered somewhere. I can see the appeal of buffering them downstream instead of in your own Transform. I don't know what the practical consequences are... except that if everybody in a pipeline passes the buck to the next guy downstream, then you can end up with backpressure being ignored by the whole pipeline. The latest implementation you show with … I've given some thought to adding a … |
I agree - so should we go with … |
This statement of mine was misleading:
In the earlier implementation, backpressure is partially ignored. It is ignored in the sense that if the reading end of the transform signals that it does not want more data while a chunk is being processed, then that signal is ignored during the processing of the chunk. However, the next call to `_transform` won't happen until the reading side consumes the data. So I do not think the earlier implementation that was passing events one by one to the reading side is as problematic as I made it out to be. We both agree that the data needs to be buffered somewhere, so there's no great benefit to paying attention to the return value of `push()`. Users of …
As a user I would prefer getting one event at a time rather than a bunch of events. I've seen the issue you've opened about async iteration and streams. Is the difference in performance only visible when using async iteration, or is it present even when using the stream in more... "traditional" ways? |
5.0.0 is now out, by the way. |
It seems to only affect async iteration, and I agree with you that returning an array of events is surprising (in a bad way) and should not be the default. So either:

1. we keep a single stream class with a flag that switches it between emitting single events and emitting arrays of events, or
2. we drop the stream class and expose async generators (wrapping them with `Readable.from` where an actual stream is needed).

I feel that 2 is easier to write and maintain (because the most difficult things, such as backpressure, are fully handled by generators and `Readable.from`), but I would love your thoughts on which way you would lean.
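For illustration, a minimal sketch of what option 2 enables (`parseEvents` here is an invented stand-in for the real saxes-backed generator; `Readable.from` requires Node 10.17+ / 12.3+):

```js
// Minimal sketch. `parseEvents` is an invented stand-in for the real
// saxes-backed generator; Readable.from needs Node 10.17+ / 12.3+.
const {Readable} = require('stream');

async function* parseEvents(chunks) {
  for await (const chunk of chunks) {
    // A real implementation would feed the chunk to SaxesParser here.
    yield {eventType: 'chunk', value: chunk};
  }
}

// Readable.from pulls from the generator lazily: the generator is only
// resumed when the stream's consumer wants more data, so backpressure
// is handled without any manual buffering.
const eventStream = Readable.from(parseEvents(['<a>', 'hi', '</a>']));
eventStream.on('data', e => console.log(e.eventType, e.value));
```
|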
@alubbe Sorry for the delay. I'm not a fan of solution 1. In my experience, in TypeScript, having functions that return different types depending on flags, while quite doable (I've done it many times), can get hairy quickly (been there, done that). Sometimes it is unavoidable (e.g. compatibility with a legacy API), but I try to avoid it when I can.
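For illustration, a sketch of the difference (all names invented for this example):

```ts
// Hypothetical sketch; names are invented for illustration only.
interface SaxesEvent { eventType: string; value: unknown; }

// Flag-dependent return type: needs overloads (or conditional types),
// and every additional flag multiplies the signatures to maintain.
function parse(input: string, batched: true): SaxesEvent[][];
function parse(input: string, batched: false): SaxesEvent[];
function parse(input: string, batched: boolean): SaxesEvent[] | SaxesEvent[][] {
  const events: SaxesEvent[] = []; // ...filled from the parser
  return batched ? [events] : events;
}

// Two separate functions: each signature stays simple and self-describing.
function parseEvents(input: string): SaxesEvent[] { return []; }
function parseEventBatches(input: string): SaxesEvent[][] { return []; }
```
|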
So we both gravitate towards 2, right? It could look something like this:

```js
export async function* parse(iterable, eventTypes) {
  const saxesParser = new SaxesParser();
  let error;
  saxesParser.on('error', err => {
    error = err;
  });
  let events = [];
  for (const eventType of eventTypes) {
    if (eventType !== 'ready' && eventType !== 'error' && eventType !== 'end') {
      saxesParser.on(eventType, value => {
        events.push({eventType, value});
      });
    }
  }
  for await (const chunk of iterable) {
    saxesParser.write(chunk.toString());
    // saxesParser.write and saxesParser.on() are synchronous,
    // so we can only reach the below line once all events have been emitted
    if (error) throw error;
    // As a performance optimization, we gather all events instead of passing
    // them one by one, which would cause each event to go through the event queue
    yield events;
    events = [];
  }
}
```

Then you could either export an additional …
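For example, a single-event variant could be a thin wrapper over the batched generator (a minimal sketch; `parseEach` is just a placeholder name):

```js
// Minimal sketch; `parseEach` is an invented name. It flattens the
// batches yielded by `parse` back into individual events.
export async function* parseEach(iterable, eventTypes) {
  for await (const events of parse(iterable, eventTypes)) {
    yield* events; // hand out each {eventType, value} one at a time
  }
}
```
|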
I'm thinking my previous comment may have given the wrong impression. It is fine to have an API that would allow getting individual events or arrays of events. I'm just not in favor of making the choice between the two a flag. I'd rather see:

…

rather than:

…

Here, however, you're dealing with an already established API which has a specific shape. You cannot just create methods with random names on a Stream object; they'll never get called. It should be possible, though, to have two stream classes: one that returns single events and one that returns arrays of events. They should be able to share a common base implementation.
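A sketch of what that could look like, reusing the batching approach from earlier in this thread (class names invented):

```js
// Hypothetical sketch; class names are invented for illustration.
const Stream = require('stream');
const {SaxesParser, EVENTS} = require('saxes');

class BaseSaxesStream extends Stream.Transform {
  constructor() {
    super({readableObjectMode: true});
    this.events = [];
    this.saxesParser = new SaxesParser();
    this.saxesParser.on('error', error => this.destroy(error));
    for (const event of EVENTS) {
      if (event !== 'ready' && event !== 'error' && event !== 'end') {
        this.saxesParser.on(event, value => this.events.push({event, value}));
      }
    }
  }

  _transform(chunk, _encoding, callback) {
    this.saxesParser.write(chunk.toString());
    const events = this.events;
    this.events = [];
    this._emitEvents(events); // subclasses decide the granularity
    callback();
  }

  _final(callback) {
    this.saxesParser.close();
    callback();
  }
}

// One event object per 'data' event:
class SaxesEventStream extends BaseSaxesStream {
  _emitEvents(events) {
    for (const event of events) this.push(event);
  }
}

// One array of events per 'data' event:
class SaxesBatchStream extends BaseSaxesStream {
  _emitEvents(events) {
    this.push(events);
  }
}
```
|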
I would even go so far as to say that if you have access to async iteration (node v10+), you should try to avoid writing/creating streams and instead opt for sync and async generators at every step of the way. At least that's what my experience of the past few months has taught me. Considering that saxes is a modern TS codebase, I would suggest not exposing any stream interfaces, and instead two async generators:

1. `getEvents(iterator)`, which yields one event at a time, and
2. `getEventBatches(iterator)`, which yields all events of a chunk as an array,

to be used like this:

```js
for await (const {eventType, value} of getEvents(someIterator)) {
  // ...
}

for await (const events of getEventBatches(someIterator)) {
  for (const {eventType, value} of events) {
    // ...
  }
}
```

where `someIterator` yields the incoming XML chunks. What do you think? |
Just wanted to check in on this - we've been doing this 'migration' of rewriting streams to generators at work in several places and it's been a very productive experience. Let me know whether you think this could be useful for saxes
What you suggest would be useful. I reviewed the discussion here. Reflecting a bit on where we started and where we are now, I have to say that if I had just added streams to satisfy a request for a streaming API, I would have just added a very by-the-book stream API, without any of the testing and thinking that you've done. The reason being that, as we speak, stream support for saxes is not a live problem for me.
I gotcha - maybe it's best to not add it then. Just for completeness, this is all that we needed to build for exceljs; maybe it can help someone else (for node v10 and above):

```js
const {SaxesParser} = require('saxes');

module.exports = async function*(iterable) {
  const saxesParser = new SaxesParser();
  let error;
  saxesParser.on('error', err => {
    error = err;
  });
  let events = [];
  saxesParser.on('opentag', value => events.push({eventType: 'opentag', value}));
  saxesParser.on('text', value => events.push({eventType: 'text', value}));
  saxesParser.on('closetag', value => events.push({eventType: 'closetag', value}));
  for await (const chunk of iterable) {
    saxesParser.write(chunk.toString());
    // saxesParser.write and saxesParser.on() are synchronous,
    // so we can only reach the below line once all events have been emitted
    if (error) throw error;
    // As a performance optimization, we gather all events instead of passing
    // them one by one, which would cause each event to go through the event queue
    yield events;
    events = [];
  }
};
```
|
@alubbe thank you for this awesome example. I just want to add one important note: if someone wants to parse a Readable stream as an iterable, then the Readable stream must read data as strings, not as Buffers. See the docs: https://nodejs.org/api/stream.html#stream_event_data
Here is the example:
```ts
import {createReadStream} from 'fs';
import {Readable} from 'stream';
import {SaxesParser, SaxesTagPlain} from 'saxes';
import Emittery from 'emittery';

export interface SaxesEvent {
  type: 'opentag' | 'text' | 'closetag' | 'end';
  tag?: SaxesTagPlain;
  text?: string;
}

/**
 * Generator method.
 * Parses one chunk of the iterable input (Readable stream in the string data reading mode).
 * @see https://nodejs.org/api/stream.html#stream_event_data
 * @param iterable Iterable or Readable stream in the string data reading mode.
 * @returns Array of SaxesParser events
 * @throws Error if a SaxesParser error event was emitted.
 */
async function *parseChunk(iterable: Iterable<string> | Readable): AsyncGenerator<SaxesEvent[], void, undefined> {
  const saxesParser = new SaxesParser<{}>();
  let error: Error | undefined;
  saxesParser.on('error', _error => {
    error = _error;
  });

  // As a performance optimization, we gather all events instead of passing
  // them one by one, which would cause each event to go through the event queue
  let events: SaxesEvent[] = [];
  saxesParser.on('opentag', tag => {
    events.push({
      type: 'opentag',
      tag
    });
  });
  saxesParser.on('text', text => {
    events.push({
      type: 'text',
      text
    });
  });
  saxesParser.on('closetag', tag => {
    events.push({
      type: 'closetag',
      tag
    });
  });

  for await (const chunk of iterable) {
    saxesParser.write(chunk as string);
    if (error) {
      throw error;
    }
    yield events;
    events = [];
  }

  yield [{
    type: 'end'
  }];
}

const eventEmitter = new Emittery();
eventEmitter.on('text', async (text) => {
  console.log('Start');
  await new Promise<void>(async (resolve) => {
    await new Promise<void>((resolve1) => {
      console.log('First Level Promise End');
      resolve1();
    });
    console.log('Second Level Promise End');
    resolve();
  });
});

const readable = createReadStream('./some-file.xml');
// Enable string reading mode
readable.setEncoding('utf8');

// Read stream chunks
for await (const saxesEvents of parseChunk(readable)) {
  // Process batch of events
  for (const saxesEvent of saxesEvents) {
    // Emit ordered events and process them in the event handlers strictly one-by-one
    // See https://github.com/sindresorhus/emittery#emitserialeventname-data
    await eventEmitter.emitSerial(saxesEvent.type, saxesEvent.tag || saxesEvent.text);
  }
}
```
|
How would one use this solution with events?
|
Hey, at exceljs we're thinking about moving from sax to saxes exceljs/exceljs#748 (comment)
However, one of our core features is our streaming Excel reader API, which allows large Excel files to be processed. The README says that the old streams API was removed and that a new one might be coming - what's the status on that, and would you be open to accepting help if you laid out a rough plan of how it would best be implemented? Thanks!