
queryStream does not handle backpressure #161

Closed
akesser opened this issue Jun 9, 2021 · 5 comments

Comments

@akesser

akesser commented Jun 9, 2021

I use queryStream to load many rows (>500000) from a table, in combination with stream.pipeline, to create packages of variable size. The packages are then handled slowly (simulated by a two-second timeout). The two streams (transform and write) used in stream.pipeline respect backpressure; queryStream does not seem to.

Since commit ddb2080 was merged, handleNewRows(row) in stream.js unconditionally pushes to the underlying Readable stream that is returned to the user:

handleNewRows(row) {
  this.inStream.push(row);
}

If I undo the changes from the commit mentioned above and add one line (marked below), backpressure is handled again:

class Stream extends Query {
  constructor(cmdOpts, connOpts, sql, values, socket) {
    super(
      () => {},
      () => {},
      cmdOpts,
      connOpts,
      sql,
      values
    );
    this.socket = socket;
    this.inStream = new Readable({
      objectMode: true,
      read: () => {
        this.socket.resume();
      }
    });

    this.on('fields', function (meta) {
      this.inStream.emit('fields', meta);
    });

    this.on('error', function (err) {
      this.inStream.emit('error', err);
    });

    this.on('end', function (err) {
      if (err) this.inStream.emit('error', err);
      this.socket.resume(); // <- This line was added by me otherwise the connection is not closed
      this.inStream.push(null);
    });
  }

  handleNewRows(row) {
    if (!this.inStream.push(row)) {
      this.socket.pause();
    }
  }
}

The following image shows the difference in process.memoryUsage().rss between the two implementations:
[Image: RSS memory usage over time for the two implementations]

Additionally, if I understand correctly: because the data from the net socket is handled via the "data" event, the amount and speed of data read can only be controlled with socket.pause and socket.resume. Is the system able to apply backpressure over the TCP connection in this case?

Thanks for your help.

@rusher
Collaborator

rusher commented Jun 10, 2021

I'll need to dig a little on that.

On one hand, the previous implementation (the one shown in green) handles backpressure well, but when pipelining results, any error while handling a result makes the connection stall indefinitely.

On the other hand, the current implementation might use more memory if client-side result handling takes longer than the database takes to return data, but the connection will stay healthy.

There might be a solution that avoids leaving the connection hanging.

@rusher
Collaborator

rusher commented Jun 10, 2021

For the moment, I tend to think that providing a method to close the stream would be the best solution.

Like this:

const queryStream = conn.queryStream('SELECT * FROM seq_1_to_10000');

stream.pipeline(queryStream, transformStream, someWriterStream, (err) => {
  if (err) queryStream.close(); // <= call this to ensure the connection does not stall
  ...
});

@rusher
Collaborator

rusher commented Jun 11, 2021

The initial choice was to keep the connection in a good state whatever the situation, but this defeats the point of streaming: avoiding loading everything into memory.

The correction will adopt an implementation like the one you describe, and provide a method that permits closing the query stream when an error occurs, so the connection can be handled properly.
The documentation will make this use case explicit.

@akesser
Author

akesser commented Jun 11, 2021

I'm not sure whether stream.pipeline propagates errors to the other streams, but if it does, could a listener in the Stream extends Query class help inform the socket or the queryStream about this event?

Something like:

this.inStream.on('error', (err) => {
  this.emit('error', err); // or some other way to inform about this
});

@rusher
Collaborator

rusher commented Sep 15, 2021

The correction will be released in 3.0.1-RC.

If any error occurs while using the resultset stream, the query stream will have to be closed.
Previous example:

const queryStream = connection.queryStream("SELECT * FROM mysql.user");
stream.pipeline(queryStream, transformStream, someWriterStream);

is now:

const queryStream = connection.queryStream("SELECT * FROM mysql.user");
stream.pipeline(queryStream, transformStream, someWriterStream, (err) => { queryStream.close(); });

There is no way to ensure that automatically, so the documentation explicitly explains it, permitting real streaming.

@rusher rusher closed this as completed Sep 15, 2021