Memory Leaks? #60

Closed · lynxaegon opened this issue May 24, 2018 · 6 comments

@lynxaegon commented May 24, 2018

Hi,

I'm using elasticsearchJS to export a whole index from ES in batches of 4096.
The whole tool uses about 500 MB of RAM while dumping the ES index to Parquet format.
(Node.js has a 2 GB memory limit set.)

If I lower or increase the batch size (and sometimes seemingly at random), it uses a lot more memory, around 2-3 GB, and gets killed.
The quickest way to reproduce it is to increase the batch size it has to process.
The generated Parquet file is usually ~5.4 GB.

Is there anything I can do to debug this further?

Thanks!

P.S.: I'm using git+ssh://git@github.com/ironSource/parquetjs.git#1fa58b589d9b6451379f1558214e9ae751909596 as the parquetJS package.

@asmuth (Contributor) commented May 24, 2018

I assume you have already seen the section of the README about setting the row group/buffer size? https://github.com/ironSource/parquetjs#buffering--row-group-size -- If you keep increasing the batch size, an OOM error will eventually be the expected failure case.

The default batch size is 4096; is it possible that your records are on the order of ~500 KB each? Have you tried lowering the parquetjs row group size (via setRowGroupSize) yet?
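
For reference, a minimal sketch of lowering the row group size on a writer; the schema fields and output file name here are placeholders, not taken from the original report:

const parquet = require('parquetjs');

// Placeholder schema; the real one comes from the ES documents being exported.
const schema = new parquet.ParquetSchema({
  id: { type: 'UTF8' },
  payload: { type: 'UTF8' }
});

async function writeBatch(rows) {
  const writer = await parquet.ParquetWriter.openFile(schema, 'export.parquet');

  // Flush a row group every 512 rows instead of the default 4096, which caps
  // how many rows sit in the in-memory buffer at any one time.
  writer.setRowGroupSize(512);

  for (const row of rows) {
    await writer.appendRow(row);
  }
  await writer.close();
}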

@lynxaegon (Author)

I saw the section, but even with 1024 as the rowGroupSize it sometimes (very rarely) throws an OOM error.
I think the records are ~10-25 KB each. I will keep trying different rowGroupSize values.
Thanks

@ku-s-h commented Nov 9, 2018

I am facing a similar issue with my app. It writes incoming data to Parquet, rotates the files every minute, and pushes them to S3. The application starts with ~400 MB of memory and keeps increasing, eventually crossing 10 GB within a few hours. Each record is a small JSON object, and the Parquet file generated each minute is ~40 MB.

Parquet

var parquet = require('parquetjs');
var S3 = require('./s3-out');
var dir = 'parquet/';
var writer;
var out_file;
var interval;
var timeoutHandle = null;

function startTimeout() {
  stopTimeout();
  timeoutHandle = setTimeout(rotateFile, 60000);
  console.log("start timeout");
}

function stopTimeout() {
  clearTimeout(timeoutHandle);
  console.log("clear timeout");
}

var schema = new parquet.ParquetSchema({
    // schema
});

function getFileName() {
    var d = new Date();
    return `${process.env.POD_NAME}-${d.getHours()}-${d.getMinutes()}-${d.getSeconds()}.parquet`
}

async function setupWriter() {
    startTimeout();
    // interval = setInterval(rotateFile, 60000);
    out_file = getFileName();
    writer = await parquet.ParquetWriter.openFile(schema, dir + out_file);
    writer.setRowGroupSize(100);
    console.log("opened " + out_file);
}

async function writeToParquet(data) {
    if(data && data[0]) {
        for (var i = 0; i < data.length; i++) {
            var time = data[i].at;
            if(time) {
                time = time.toString();
            }
            await writer.appendRow({
                // schema
            });
        }
    } else {
        console.log("Bad data to write to parquet");
    }
}

async function rotateFile() {
    const used = process.memoryUsage();
    for (let key in used) {
        console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`);
    }
    var old_writer = writer;
    var old_out_file = out_file;
    out_file = getFileName();
    var new_writer = await parquet.ParquetWriter.openFile(schema, dir + out_file);
    new_writer.setRowGroupSize(100);
    console.log("opened " + out_file);
    writer = new_writer;
    try {
        await old_writer.close().then(function() {
        console.log("closed " + old_out_file);
        old_writer = null;
        var date = new Date();
        var s3_dir = 'logs/' + date.getFullYear() + '/' + (date.getMonth() + 1) + '/' + date.getDate() + '/'; 
        S3.pushToS3(dir, old_out_file, s3_dir + old_out_file, function() {
            startTimeout();
        });
        });
    } catch(err) {
        console.log("Writing error " + err);
        startTimeout();
    }
}

exports.setupWriter = setupWriter;
exports.writeToParquet = writeToParquet;

S3

var fs = require('fs');
var AWS = require('aws-sdk');
var s3;

function setupS3Client() {
    s3 = new AWS.S3({
        region: 'ap-south-1',
        maxRetries: 3
    })
}


function pushToS3(dir, filename, key, callback) {
    fs.readFile(dir + filename, function(err, data) {
        if(err) {
            console.log("file read error: " + err);
            callback();
        } else {
            var params = {
                Bucket: 'parquet',
                Key: key,
                Body: data
            }
            s3.upload(params, function(err, data) {
                if (err) {
                    console.log("Error uploading to s3 " + err);
                    callback();
                } else {
                    console.log("file uplaoded");
                    callback();
                }
            })
        }
    })
}

exports.setupS3Client = setupS3Client;
exports.pushToS3 = pushToS3;

Changing the group size also does not alter this behaviour.
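
For context, a minimal sketch of how the two modules above might be wired together; the './parquet-out' path and the timer-driven fake data source are assumptions, not part of the original post:

var S3 = require('./s3-out');
var parquetOut = require('./parquet-out');

S3.setupS3Client();

parquetOut.setupWriter().then(function() {
    // Simulate incoming data with a timer; in the real app this would be the
    // actual ingest path (the record fields must match the schema, which is elided above).
    setInterval(function() {
        var batch = [{ at: Date.now() }];
        parquetOut.writeToParquet(batch).catch(function(err) {
            console.log("write error: " + err);
        });
    }, 1000);
});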

@arnabguptadev (Contributor) commented Jun 1, 2020

I think I hit this same issue. Will try to post a working sample to demonstrate, but here's what I did:

  • Tried to write 20 rows.
  • Set the row group size to different values: 3/5/7/50.
  • Each time, the output file size differed by wide margins.
  • It wrote more rows than expected (validated in Athena using count(*)).

Debugging through the library, it seems that if the flush happens only inside the close method (here: https://github.com/ironSource/parquetjs/blob/master/lib/writer.js#L108), everything is fine and you get the smallest output.

But if, due to your row group size, it is also triggered in appendRow (here: https://github.com/ironSource/parquetjs/blob/master/lib/writer.js#L96), then you end up with duplicate rows. For large numbers of rows that keeps building up until it blows through memory.

I tried the following as a quick-and-dirty workaround and it seems to work: I changed the above lines in writer.js to:

      // Detach the current buffer *before* awaiting the write, so rows appended
      // while writeRowGroup is in flight go into a fresh buffer instead of being
      // flushed (and counted) twice.
      let to_write = this.rowBuffer;
      let rowCount = this.rowBuffer.rowCount;
      this.rowBuffer.rowCount = 0;
      this.rowBuffer = {};
      to_write.rowCount = rowCount;
      await this.envelopeWriter.writeRowGroup(to_write);

With this, the row-count integrity does seem to stay intact.

I think having the await before resetting the buffer (this.rowBuffer = {};) is the issue.

Does this sound right?

Regards,
Arnab.

@asmuth (Contributor) commented Jun 10, 2020

OK, thanks for figuring this out! I think I see what is going on here now. There is an undocumented assumption that appendRow is not called concurrently, i.e. that, at any given time, there is only one outstanding call to the method.

For now, the following workaround should solve the problem: Ensure that calls to appendRow are sequenced, i.e. only start a new call to appendRow once the previous one has returned. This should ideally be done by using the "await" keyword on any call to appendRow.
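
For illustration, a minimal sketch of the difference, using a trivial placeholder schema and output file name (both assumptions, not from this thread):

const parquet = require('parquetjs');

// Placeholder schema for illustration only.
const schema = new parquet.ParquetSchema({ value: { type: 'UTF8' } });

async function writeRows(rows) {
  const writer = await parquet.ParquetWriter.openFile(schema, 'out.parquet');

  // Problematic: this starts all appendRow calls at once, so several of them
  // can race on the shared row buffer and trigger overlapping flushes.
  //   await Promise.all(rows.map(row => writer.appendRow(row)));

  // Safe: each appendRow call completes before the next one starts.
  for (const row of rows) {
    await writer.appendRow(row);
  }

  await writer.close();
}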

As for a long-term solution, I am not enough of a JavaScript expert to tell whether something needs to be fixed in parquetjs or not. In my eyes, our current behaviour is similar to that of the "fs" module built into Node.js: if you call fs.appendFile concurrently, the result would be equally undefined. However, we might at least consider adding a better error message for users who hit this issue.

Also see the discussion in #105

@asmuth (Contributor) commented Jun 16, 2020

Closing this as resolved.

@asmuth closed this as completed Jun 16, 2020