This repository has been archived by the owner on Jun 28, 2021. It is now read-only.

Pausing the parser? #67

Closed
blazerguns opened this issue Nov 3, 2015 · 20 comments

Comments

@blazerguns

I love your parser and I usually parse CSV files of around 500K lines. While it does well, I have often noticed that I run out of memory during parsing. The original code stores all records in an array. I was wondering if I could just pause after, say, 500 entries in the array and sleep for a while before proceeding? This would give the garbage collector a chance to clean up some older objects before continuing. Is this possible?

function parse_csv_internal_csv_stream(parseFile, onComplete){

  log("[parse_csv_internal_csv_stream]:Parsing CSV internal csv starts", 7, conductor_log_modules.csv_functions);
  var finalData = [];
  var readStream = fs.createReadStream(parseFile);

  var parser = csv.parse({columns: true, relax: true});

  readStream.on('open', function () {
    // Pipe the file contents into the parser once the file is open
    readStream.pipe(parser);
  });

  readStream.on('error', function(err) {
    log(err.message, 3, conductor_log_modules.csv_functions);
    onComplete(err, null);
  });

  parser.on('readable', function(){
    var record;
    while((record = parser.read())){
      finalData.push(record);
    }
  });

  // Catch any error
  parser.on('error', function(err) {
    log(err.message, 3, conductor_log_modules.csv_functions);
    onComplete(err, null);
  });

  parser.on('finish', function () {
    log("[parse_csv_internal_csv_stream]:Parsing CSV internal csv ends", 7, conductor_log_modules.csv_functions);
    onComplete(null, finalData);
    parser.end();
  });

}

@wdavidw
Member

wdavidw commented Nov 3, 2015

Hi, the parser has two APIs: a callback-based one, which holds the records in memory to supply them to the final callback, and a stream-based one, which doesn't hold anything in memory (unless I made a stupid mistake somewhere). You seem to be using the stream approach, so this isn't normal. This is the first time (after many years, this module has maybe 5 years of history) that I've heard of a memory issue. Could you make sure this comes from the parser and, if so, could you provide me with a sample file that I can download and test to reproduce it?
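
For reference, a minimal sketch (not taken from the documentation) contrasting the two styles on a made-up two-line input; only the callback form keeps the whole result in memory:

var csv = require('csv');

// Callback API: the parser buffers every record so it can hand the whole
// array to the final callback at once.
csv.parse('a,b\n1,2\n', function (err, records) {
  console.log(records); // [ [ 'a', 'b' ], [ '1', '2' ] ]
});

// Stream API: records are pulled one at a time and nothing is retained
// unless the consumer decides to keep them.
var parser = csv.parse();
parser.on('readable', function () {
  var record;
  while ((record = parser.read())) {
    console.log(record);
  }
});
parser.on('error', function (err) { console.error(err.message); });
parser.write('a,b\n1,2\n');
parser.end();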

@blazerguns
Author

Hi David,

I am pretty sure it's happening in the parser. At first I did not doubt the parser. In my complex app, here is the CPU/memory usage when parsing starts; I know parsing has started based on the log message that shows up. I am running top | grep node. From the output below you can see the climb. The code being executed is the code I posted above. Memory usage shoots up from roughly 800 MB (which is already big) to 1.9 GB. At that stage node is still running at 100% CPU even though the parsing is done; I am assuming it is the garbage collector, and after a few minutes memory usage drops to 1.2 GB as shown below.

8781 chandva 20 0 1817m 889m 9168 S 87.1 5.6 2:26.06 node
28781 chandva 20 0 1817m 889m 9168 S 15.6 5.6 2:26.53 node
28781 chandva 20 0 1817m 892m 9168 S 14.0 5.6 2:26.95 node
28781 chandva 20 0 1801m 876m 9168 S 21.3 5.5 2:27.59 node
28781 chandva 20 0 1801m 876m 9168 S 13.6 5.5 2:28.00 node
28781 chandva 20 0 1818m 895m 9168 S 20.3 5.6 2:28.61 node
28781 chandva 20 0 1818m 898m 9168 S 0.7 5.6 2:28.63 node
28781 chandva 20 0 1821m 903m 9168 R 75.8 5.7 2:30.91 node
28781 chandva 20 0 1920m 1.0g 9168 R 102.4 6.3 2:33.99 node
28781 chandva 20 0 2096m 1.2g 9168 R 101.7 7.4 2:37.05 node
28781 chandva 20 0 2034m 1.1g 9168 R 103.7 7.0 2:40.17 node
28781 chandva 20 0 2196m 1.3g 9168 R 101.7 8.0 2:43.23 node
28781 chandva 20 0 2327m 1.4g 9168 R 101.4 8.9 2:46.28 node
28781 chandva 20 0 2362m 1.4g 9168 R 104.7 9.1 2:49.43 node
28781 chandva 20 0 2518m 1.6g 9168 R 101.0 10.1 2:52.47 node
28781 chandva 20 0 2557m 1.6g 9168 R 100.4 10.3 2:55.49 node
28781 chandva 20 0 2576m 1.6g 9168 R 105.7 10.4 2:58.67 node
28781 chandva 20 0 2583m 1.6g 9168 R 105.0 10.4 3:01.83 node
28781 chandva 20 0 2586m 1.6g 9168 R 105.7 10.4 3:05.01 node
28781 chandva 20 0 2593m 1.6g 9168 R 111.0 10.5 3:08.35 node
28781 chandva 20 0 2596m 1.6g 9168 R 105.4 10.5 3:11.52 node
28781 chandva 20 0 2600m 1.6g 9168 R 105.4 10.5 3:14.69 node
28781 chandva 20 0 2603m 1.6g 9168 R 105.7 10.6 3:17.87 node
28781 chandva 20 0 2607m 1.6g 9168 R 105.4 10.6 3:21.04 node
28781 chandva 20 0 2610m 1.7g 9168 R 105.9 10.6 3:24.23 node
28781 chandva 20 0 2614m 1.7g 9168 R 105.4 10.6 3:27.40 node
28781 chandva 20 0 2618m 1.7g 9168 R 105.7 10.7 3:30.58 node
28781 chandva 20 0 2646m 1.7g 9168 R 105.7 10.8 3:33.76 node
28781 chandva 20 0 2649m 1.7g 9168 R 106.0 10.9 3:36.95 node
28781 chandva 20 0 2650m 1.7g 9168 R 105.4 10.9 3:40.12 node
28781 chandva 20 0 2650m 1.7g 9168 R 100.4 10.9 3:43.14 node
28781 chandva 20 0 2651m 1.7g 9168 R 105.0 10.9 3:46.31 node
28781 chandva 20 0 2651m 1.7g 9168 R 105.7 10.9 3:49.50 node
......
28781 chandva 20 0 2819m 1.9g 9168 R 107.3 11.9 22:31.51 node
28781 chandva 20 0 2819m 1.9g 9168 R 100.0 11.9 22:34.52 node
28781 chandva 20 0 2869m 1.9g 9168 R 108.7 12.0 22:37.79 node <--- Parsing ended and long pause()
28781 chandva 20 0 2168m 1.2g 9168 R 73.8 7.8 22:40.01 node
28781 chandva 20 0 2168m 1.2g 9168 R 34.9 7.8 22:41.06 node
28781 chandva 20 0 2186m 1.2g 9168 S 13.3 8.0 22:41.46 node

This is OK in this scenario, but in some cases it touches 2.0 GB before crashing out of memory. The most interesting bit is that this always happens while the parser is running. I do some memory-intensive operations after the parser is done (when memory is at 1.2 GB), and my operation makes memory climb to 2.3 GB and then clears up once the operation is done. So my question is why my memory-intensive operation is able to climb to 2.3 GB while the parser craps out by 1.9 GB. The only difference is CPU usage: during my memory-intensive operations the CPU is around 20%, while during parsing it is a constant 100%. So I suspect it has something to do with that.

Now, you could reasonably doubt my app here, so I wrote a simple piece of code to test the above.

// fs, https and csv are required up front; log() and conductor_log_modules
// come from the surrounding application.
var fs = require('fs');
var https = require('https');
var csv = require('csv');

var destination = 'tmp/myfile.csv';

// The listen port for the API
var listen_port = 4449;

var api_certs = {
  key: fs.readFileSync('certs/server.key'),
  cert: fs.readFileSync('certs/server.crt')
};

function parse_csv_internal_csv_stream(parseFile, onComplete){

  log("[parse_csv_internal_csv_stream]:Parsing CSV internal csv starts", 7, conductor_log_modules.csv_functions);
  var finalData = [];
  var readStream = fs.createReadStream(parseFile);

  var parser = csv.parse({columns: true, relax: true});

  readStream.on('open', function () {
    // Pipe the file contents into the parser once the file is open
    readStream.pipe(parser);
  });

  readStream.on('error', function(err) {
    log(err.message, 3, conductor_log_modules.csv_functions);
    onComplete(err, null);
  });

  parser.on('readable', function(){
    var record;
    while((record = parser.read())){
      finalData.push(record);
    }
  });

  // Catch any error
  parser.on('error', function(err) {
    log(err.message, 3, conductor_log_modules.csv_functions);
    onComplete(err, null);
  });

  parser.on('finish', function () {
    log("[parse_csv_internal_csv_stream]:Parsing CSV internal csv ends", 7, conductor_log_modules.csv_functions);
    onComplete(null, finalData);
    //log(finalData);
    parser.end();
  });

}

function parse_csv(csv_data) {
  log("[parse_csv]:Parsing CSV starts", 7, conductor_log_modules.csv_functions);
  return new Promise(function (fulfill, reject) {
    parse_csv_internal_csv_stream(csv_data, function(err, result) {
      if (err == null) {
        log("[parse_csv]:Parsing CSV ends", 7, conductor_log_modules.csv_functions);
        fulfill(result);
      } else {
        log('Parse CSV function has an error!', 3, conductor_log_modules.csv_functions);
        return reject(err);
      }
    });
  });
}

https.createServer(api_certs, function (req, res) {

  var parseData = undefined;

  parse_csv(destination)
    .then(function (cmdb_data) {
      log("First stage parse done with " + cmdb_data.length);
      //log(cmdb_data);
    });
}).listen(listen_port);

This is a very simple simulation, and here is what I found on my laptop (OS X):
4812 node 0.0 00:00.19 6 0 34 17M 0B 0B 4812 344 sleeping *0[1] <-start

4812 node 0.0 01:01.71 11 0 43 1500M 0B 6680K 4812 344 sleeping *0[1] <- end

Could it be that all of that size is merely parseData?

@wdavidw
Member

wdavidw commented Nov 3, 2015

OK, then I must be doing something stupid that wasn't there before.

@blazerguns
Author

Do you want my CSV file? I have to sanitize it before giving it to you, or the powers that rule me will fire me from my job :-)

@wdavidw
Member

wdavidw commented Nov 3, 2015

I'll try to save you this effort; give me a chance to replicate this before I confirm.

@wdavidw
Member

wdavidw commented Nov 3, 2015

Can you try this, or something similar, from the coffee REPL in the parent directory where the csv modules are git cloned:

fs = require 'fs'
stringify = require './csv-stringify'
parse = require './csv-parse'
generate = require './csv-generate'
generate(duration: 3600*1000).pipe(parse()).pipe(stringify()).pipe(fs.createWriteStream('./test.csv'))

I've produced a 2 GB file with constant memory usage: a real memory size of 100 MB and a virtual memory size of 3 GB.

@blazerguns
Author

How are you measuring the memory? Does it have to be the coffee REPL? And can I use my own CSV instead of generating a new one?

@wdavidw
Member

wdavidw commented Nov 3, 2015

To measure memory, you can use ps; on OS X, I simply run Activity Monitor. For this example, yes, the coffee REPL, though you could also easily convert the code to JS if you wish. I'd rather have you test this code first; then you can replace generate(duration:3600*1000) with fs.createReadStream('your/file.csv').
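
For convenience, a rough JavaScript equivalent of the CoffeeScript snippet above (assuming the modules are installed from npm rather than required from the cloned directories):

var fs = require('fs');
var generate = require('csv-generate');
var parse = require('csv-parse');
var stringify = require('csv-stringify');

// Generate random CSV for an hour, parse it, re-stringify it and write it to disk,
// watching the process memory stay flat the whole time. To test with a real file
// instead, swap the generate(...) source for fs.createReadStream('your/file.csv').
generate({ duration: 3600 * 1000 })
  .pipe(parse())
  .pipe(stringify())
  .pipe(fs.createWriteStream('./test.csv'));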

@blazerguns
Author

I had some time to play around, and I noticed that if I comment out finalData.push(record); in my code, the parser seems to behave as you described: it stays well under 60 MB while parsing. I think we can conclude that the output we are storing in the array is the memory eater. Now, back to my question: shouldn't we have some way to pause the parsing, flush the array, and continue? Wouldn't this help for massive files?

@wdavidw
Member

wdavidw commented Apr 26, 2016

About the pause/resume mechanism: this parser is a good Node.js citizen and implements the official Stream API, which itself includes those methods.
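
As a minimal sketch of what that can look like (not an official recipe; handleBatch is a hypothetical async consumer): collect a batch of records, pause() the stream while the batch is handled, then resume():

var fs = require('fs');
var csv = require('csv');

var parser = csv.parse({ columns: true, relax: true });
var batch = [];

parser.on('data', function (record) {
  batch.push(record);
  if (batch.length >= 500) {
    parser.pause();                   // stop emitting records; backpressure pauses the file read
    handleBatch(batch, function () {  // hypothetical async consumer
      batch = [];                     // drop the references so the GC can reclaim them
      parser.resume();                // let the remaining records flow
    });
  }
});

parser.on('end', function () {
  handleBatch(batch, function () {}); // flush whatever is left
});

parser.on('error', function (err) {
  console.error(err.message);
});

fs.createReadStream('tmp/myfile.csv').pipe(parser);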

@wdavidw wdavidw closed this as completed Apr 26, 2016
@ghost

ghost commented Sep 25, 2017

@wdavidw the csv parser does a great job of parsing the data record by record. But whenever the parser triggers an error, records already passed to the transformer continue to be processed. I'm looking for something related to pausing and resuming the parser stream, but I couldn't find anything, maybe my bad. What I expect is something like the following:

parser.on('error', (err) => {
  console.log('parser error');
});
const transformer = transform(async (record, callback) => {
  try {
    console.log('record');
    await this.processRecord(record);
    console.log('record processed');
    callback();
  } catch (err) {
    console.log('Some error');
  }
}, {parallel: 1});
transformer.on('error', (err) => {
  console.log('transformer error');
});

The processRecord function consists of I/O operations.

And suppose a 3-line CSV file where the 3rd line contains an error; I would expect the log to be:

record
record processed
record
record processed
parse error

But I got the logs as follows:

record
parse error
record processed
record
record processed

So I want to know whether there is any way to pause and resume the parser, as I need to process the lines one by one so that when an error occurs I can simply handle it. I'm expecting something like this: asynchronous processing in line-by-line.

@wdavidw
Member

wdavidw commented Sep 25, 2017

The parser implements the Node.js Stream API, which has pause and resume functions, but I doubt they will provide what you wish. The problem with CSV is that we can't easily process it line by line, because a linefeed (which we call the rowDelimiter) can appear inside a quoted cell, and that makes it unclear how we should interpret a rowDelimiter in case of an error.
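
A small made-up example of the problem: three physical lines of input, but only two records, because the linefeed inside the quotes belongs to the cell rather than acting as a rowDelimiter.

var csv = require('csv');

var input = 'name,notes\n"bob","first line\nsecond line"\n';

csv.parse(input, function (err, records) {
  console.log(records.length); // 2
});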

@wdavidw
Member

wdavidw commented Sep 25, 2017

If you are using the stream-transform package, then you should be able to set the "parallel" option to "1" to avoid parallel execution. Seems like this could solve your issue.

@ghost

ghost commented Sep 25, 2017

But would it be possible to emit an event just after a complete record is received, and to continue further only if the parser is not in a paused state? If so, it might be a great addition. Just curious, that's all :)

@ghost

ghost commented Sep 25, 2017

If you are using the stream-transform package, then you should be able to set the "parallel" option to "1" to avoid parallel execution

Yes, I am using it, but whenever an error is emitted from the parser, the transformer continues to process the pending records.

@wdavidw
Member

wdavidw commented Sep 25, 2017

CSV parsing is sequential, so no problem on that side. Following the Stream API, throttling is handled by the consumer which receives the data after the parser (e.g. stream-transform).
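
A rough sketch of what "throttling handled by the consumer" can look like here (processRecord is a hypothetical per-record I/O function, not part of either package):

var fs = require('fs');
var parse = require('csv-parse');
var transform = require('stream-transform');

var parser = parse({ columns: true });

var transformer = transform(function (record, callback) {
  processRecord(record, function (err) {  // hypothetical async I/O for one record
    callback(err);                        // the next record only flows once this is called
  });
}, { parallel: 1 });

parser.on('error', function (err) { console.error('parse error:', err.message); });
transformer.on('error', function (err) { console.error('transform error:', err.message); });

fs.createReadStream('data.csv').pipe(parser).pipe(transformer);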

@wdavidw
Member

wdavidw commented Sep 25, 2017

When an error occurs during parsing, no further records are emitted by the parser.

@ghost

ghost commented Sep 25, 2017

When an error occurs during parsing, no further records are emitted by the parser.

Suppose a 10-line file where the 6th line contains an error; the parser will emit 5 records along with an error event, right?
Even though the transform is processing records one by one (assuming an I/O operation happens inside the transform), the error event handler will execute in between the transformer's calls, won't it? So after the error handler runs, the transform continues to execute its logic for the pending records, which is unnecessary. If parser pause and resume were available, we could pause on the first record, do our work, then continue, and so on. Do you get what I mean?

@wdavidw
Member

wdavidw commented Sep 25, 2017

The error event emitted by the stream writer (the parser in our case) will prevent new transform functions from being called in the stream reader, so disabling parallel execution (in case you have it enabled) should achieve what you wish. I don't see the need to pause/resume the parser (even if you can do this with the Stream API).

@ghost

ghost commented Sep 26, 2017

OK @wdavidw, I made changes to my code. Thanks for your prompt support and great module :)
