Rotate stream into Google Cloud Storage #75

Closed
adtecho opened this Issue Nov 17, 2016 · 9 comments

adtecho commented Nov 17, 2016

Awesome library! I played with it and wrote up our use case here: https://lugassy.net/how-moving-from-pub-sub-to-avro-saved-us-38-976-year-ec6c33ea7d08

We're pushing Avro files to Google Cloud Storage every 5 minutes and currently use a FileEncoder:

var avro = {
  type: avsc.parse('/schemas/event.avsc'),
  filename: randomFilename()
}

avro.encoder = avsc.createFileEncoder(avro.filename, avro.type)

// multiple
avro.encoder.write(event)

setInterval(function() {
  var newFilename = randomFilename()

  // flush, so we can upload to gcs
  avro.encoder.end()

  // start a new encoder
  avro.encoder = avsc.createFileEncoder(newFilename, avro.type)
  gcs.bucket('my-bucket').upload(avro.filename)
  avro.filename = newFilename

}, 5 * 60 * 1000)

The actual code is more complex and consists of managing multiple encoders, cleaning up files, etc.

Following #17, I tried doing it file-less, but something doesn't look right. I'm thinking the new encoder restarts before the file gets uploaded.

var avro = {
  type: avsc.parse('/schemas/event.avsc'),
}

avro.encoder = new avsc.streams.BlockEncoder(avro.type)

// multiple
avro.encoder.write(event)

setInterval(function() {
  // avoids uploading empty files
  if (!avro.encoder._blockCount) return

  var file = gcs.bucket('my-bucket').file(randomFilename())
  avro.encoder.pipe(file.createWriteStream())
  avro.encoder.end()
  avro.encoder = new avsc.streams.BlockEncoder(avro.type)
}, 5 * 60 * 1000)

Any idea how to properly rotate these?

mtth (Owner) commented Nov 18, 2016

Thanks! What errors are you getting with the file-less implementation?

adtecho commented Nov 18, 2016

My bad. The error repeats whether I try the file- or stream-based upload.

Basically, only a handful of files end up in Google Cloud Storage across all running log servers, and they are ~1MB or less. Before I dig into debugging:

  1. Was this tested under a load of 500-1000 RPS (per server)?
  2. What is a proper value for blockSize?
  3. Is _blockCount the right variable to determine whether a stream is not empty?
  4. If I pipe an encoder, then end it and immediately open a new one, can I lose some of the events?

One thing I did see is that under high load, _blockCount would sometimes return 1 although clearly a lot more than 64kB had been written.

P.S. I use codec: 'deflate'.

mtth (Owner) commented Nov 18, 2016

Was this tested under a load of 500-1000 RPS (per server)?

Load testing of which part? ~1k records per second for encoders is unlikely to be an issue (these support upwards of 100k records per second depending on your schema). I'm not familiar with hot spots outside of 'avsc' though (e.g. concurrent uploads).

What is a proper value for blockSize?

The default (64kB) is safe. Anywhere between 1kB and a few hundred kB is probably fine as well; lower values lead to faster but less compact compression. 16kB is another common value.
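
For example (assuming the blockSize option name hasn't changed in your avsc version), a 16kB block size with deflate would look like:

var encoder = new avsc.streams.BlockEncoder(avro.type, {
  blockSize: 16 * 1024, // Flush a block roughly every 16kB of encoded data.
  codec: 'deflate'
});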

Is _blockCount the right variable to determine whether a stream is not empty?

No, this field actually represents the number of records in the pending block, so it could be equal to 0 even if the stream isn't empty (it's also "private"). There is currently no field that exposes this directly; we could add one, but I'm wondering whether there is a simpler way to achieve what you're trying to do (perhaps using an auxiliary variable?).

Would you be able to rotate the files when they reach a certain number of blocks rather than on a fixed time interval? It would solve the empty stream issue and also guarantee a more even file size distribution.
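
For example, a rough sketch (rotate() and MAX_BLOCKS are hypothetical names; the encoder pushes roughly one 'data' chunk per encoded block, plus the header):

var blockCount = 0;
encoder.on('data', function () {
  if (++blockCount >= MAX_BLOCKS) {
    rotate(); // End this encoder, upload, and set up a new one (with its own counter).
  }
});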

If I pipe an encoder, then end it and immediately open a new one, can I lose some of the events?

All the events will be piped through (just make sure that you pipe the new encoder to a different destination). Streams buffer all their data in memory until they are piped to their destination.

On a related note, given your batch use-case, I'd recommend sticking with the file-backed implementation to avoid consuming too much memory. All this data would also be lost if the machine happens to crash.

There's a race condition in the above implementation though: the file upload could be triggered before all the records are flushed to the file-system. It would be safer to wait for the local file write stream's 'finish' event before starting the upload.
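
For example, something along these lines (reusing the gcs, avro, and randomFilename names from your first snippet):

var fs = require('fs');

var filename = randomFilename();
var fileStream = fs.createWriteStream(filename);
var encoder = new avsc.streams.BlockEncoder(avro.type);
encoder.pipe(fileStream);

// ... encoder.write(event) for incoming events ...

// At rotation time:
fileStream.on('finish', function () {
  // All records are flushed to disk, so the upload can safely start.
  gcs.bucket('my-bucket').upload(filename, function (err) {
    if (err) { console.error(err); }
  });
});
encoder.end(); // Flush the pending block; 'finish' fires once the file is fully written.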

If you have error stack traces to share, I can try to provide pointers on what is going on.

adtecho commented Nov 19, 2016

If I pipe the avsc BlockEncoder to a Google Cloud Storage file write stream, memory consumption should stay low, no? I really want to move away from the file-based approach, since file mounting and disk I/O seem like a poor fit for Google App Engine.

I added an auxiliary variable ("has_data") to determine whether an encoder has at least one event to write.

What I still don't get is how to rotate or "reset" the encoder after it is closed (ended) while continuing to receive 1000 new RPS. Re-defining it like this removes the pipe, which might still be uploading:

avro.encoder.end()
avro.encoder = new avsc.streams.BlockEncoder(avro.type, {codec: 'deflate'})
avro.encoder.pipe(newWritableStream)

mtth (Owner) commented Nov 20, 2016

If I pipe the avsc BlockEncoder to a Google Cloud Storage file write stream, memory consumption should stay low, no?

Right: if you pipe the BlockEncoder to a GCS file stream, the records will be uploaded directly rather than buffered (this isn't what the code sample above does, though; it pipes the stream just before ending it). The trade-off is that you now keep long-running outgoing connections, which might experience slowness (in which case memory usage would also go up), get dropped (make sure to check what happens in that case so you don't lose partial uploads), etc.

What I still don't get is how to rotate or "reset" the encoder after it is closed (ended) while continuing to receive 1000 new RPS. Re-defining it like this removes the pipe which might still be uploading:

You wouldn't be reusing the same pipe destination (GCS writable stream). The pipe on the existing stream should still be effective until the block encoder is out of events (at which point both will be GC'ed). I'll try to illustrate using examples based on your code; here are the two main ways you can reset the encoder:

  • Buffering locally, then uploading (each encoder's upload will continue in the background even though the encoder isn't accessible anymore; to check that this is the case, you can add a listener for the 'finish' event on the writable stream, or even for the 'data' event on the encoder to count the number of emitted blocks):
var encoder;
// ...
encoder = new BlockEncoder(/* ... */);
encoder.write(/* ... */); // This record will be buffered in memory.
// ...
encoder.write(/* ... */);
encoder.pipe(createGcsUploadStream()); // Start the upload.
encoder.end(); // Flush and terminate the upload when all records are written.
encoder = new BlockEncoder(/* ... */); // Create a new encoder (unpiped).
// At this point the previous encoder isn't available anymore, but the upload will
// still continue in the background.
encoder.write(/* ... */);
// etc.
  • Directly uploading the records:
var encoder;
// ...
encoder = new BlockEncoder(/* ... */);
encoder.pipe(createGcsUploadStream()); // Pipe encoded records directly.
encoder.write(/* ... */); // This record gets piped through.
// ...
encoder.write(/* ... */);
encoder.end(); // Flush and terminate the upload when all records are written.
encoder = new BlockEncoder(/* ... */); // Create a new encoder.
encoder.pipe(createGcsUploadStream()); // And pipe it to a new destination.
encoder.write(/* ... */);
// etc.

Does this make sense?

adtecho commented Nov 20, 2016

Thanks! I've opted for the second option, putting the following into a setInterval that fires every 5-10 minutes (roughly as sketched below):

  1. Check the encoder's has_data variable.
  2. If true, create a new encoder and GCS upload stream and pipe them together.
  3. If not, keep waiting for events.
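
Roughly (a simplified sketch; in the real code the 'data'/'finish' listeners mentioned below are re-registered on each pass, and has_data is set to true on every write):

setInterval(function() {
  if (!avro.has_data) return;   // nothing written yet, keep waiting
  avro.has_data = false;

  avro.encoder.end();           // the previous upload finishes in the background
  avro.encoder = new avsc.streams.BlockEncoder(avro.type, {codec: 'deflate'});
  avro.encoder.pipe(gcs.bucket('my-bucket').file(randomFilename()).createWriteStream());
}, 5 * 60 * 1000);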

Sorry, I did not know the upload would continue in the background once you recreate an encoder. I also had a bug where I forgot to re-register the 'data'/'finish' listeners on each pass.

Lastly, I've forced the encoder to end when /_ah/stop fires so I don't lose events when the instance terminates.

Updated the blog post as well. Thanks so much!

adtecho closed this Nov 20, 2016

adtecho commented Nov 20, 2016

Ouch, re-opening: now files get written corrupted, both uploaded ones and locally piped ones.
I'm getting the following with avro-tools 1.8.1:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 822334438
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:220)
    at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:76)
    at org.apache.avro.tool.Main.run(Main.java:87)
    at org.apache.avro.tool.Main.main(Main.java:76)

adtecho reopened this Nov 20, 2016

adtecho commented Nov 20, 2016

The above only replicates on live, high-traffic instances. I was able to solve it with two encoders and a rotating "currentActive" flag.

So each encoder and its pipe now get at least 5 minutes to completely finalize while the other encoder is active.
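
Simplified, the idea looks something like this (helper names are illustrative):

var encoders = [makeEncoder(), null];
var currentActive = 0;

function writeEvent(event) {
  encoders[currentActive].write(event);
}

setInterval(function() {
  var previous = encoders[currentActive];
  currentActive = 1 - currentActive;
  encoders[currentActive] = makeEncoder(); // a fresh encoder + upload stream takes over
  if (previous) {
    previous.end(); // the old one now has a full interval to flush and finish its upload
  }
}, 5 * 60 * 1000);

function makeEncoder() {
  var encoder = new avsc.streams.BlockEncoder(avro.type, {codec: 'deflate'});
  encoder.pipe(gcs.bucket('my-bucket').file(randomFilename()).createWriteStream());
  return encoder;
}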

adtecho closed this Nov 20, 2016

mtth (Owner) commented Nov 21, 2016

Glad to hear that you got it working. In case it's helpful, I've implemented a basic but functional rotating block encoder below.

const avro = require('avsc');
const fs = require('fs');
const stream = require('stream');

/** Encoder which periodically changes output location. */
class RotatingEncoder extends stream.Writable {

  constructor(type, opts, cb) {
    super({objectMode: true});
    this.type = type;
    this.opts = opts; // Block encoder options.
    this.cb = cb; // Destination callback, called at each rotation.

    this.count = undefined;
    this.encoder = undefined;
    this._reset();

    this.timeout = setInterval(() => {
      if (!this.count) {
        return;
      }
      this.encoder.end();
      this._reset();
    }, opts.interval || 5 * 60 * 1000); // 5 min default refresh interval.

    this.once('finish', () => {
      clearInterval(this.timeout);
      this.encoder.end();
    });
  }

  _write(val, encoding, cb) {
    this.count++;
    this.encoder.write(val, encoding, cb);
  }

  _reset() {
    this.count = 0;
    this.encoder = new avro.streams.BlockEncoder(this.type, this.opts);
    this.encoder.pipe(this.cb());
  }
}

Sample usage, outputting to the local file-system:

const encoder = new RotatingEncoder(type, {codec: 'deflate'}, () => {
  return fs.createWriteStream('data-' + Date.now() + '.avro');
});
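
For the GCS case above, the destination callback can return an upload stream instead (assuming the same gcs client and bucket as in your earlier snippets):

const encoder = new RotatingEncoder(type, {codec: 'deflate'}, () => {
  return gcs.bucket('my-bucket').file('data-' + Date.now() + '.avro').createWriteStream();
});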