Skip to content

Commit

Permalink
Adding a way to manage the read stream
Browse files Browse the repository at this point in the history
For very large files (1GB+) the read stream will pull in data
4-5x faster than node can write it out, which causes excessive
memory usage. By providing a way to manage the read stream via
pause / resume methods, one is able to parse & manage very large files
  • Loading branch information
esatterwhite committed Feb 25, 2012
1 parent c5f93d2 commit e9260db
Showing 1 changed file with 65 additions and 2 deletions.
67 changes: 65 additions & 2 deletions lib/ya-csv.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@ var sys = require('sys'),


var csv = exports; var csv = exports;


/**
* Provides Base CSV Reading capabilities
* @class CsvReader
* @extends EventEmitter
*/

/**
* The constructor
* @constructor
* @param readStream {ReadStream} An instance of the ReadStream class
* @param options {Object} optional parameters for the reader <br/>
* - rows {Number}
* - openRecord {Array}
* - openField {String}
* - lastChar {String}
* - quotedField {Boolean}
* - commentedLine {Boolean}
*/
var CsvReader = csv.CsvReader = function(readStream, options) { var CsvReader = csv.CsvReader = function(readStream, options) {
var self = this; var self = this;
_setOptions(self, options); _setOptions(self, options);
Expand All @@ -21,10 +39,55 @@ var CsvReader = csv.CsvReader = function(readStream, options) {
readStream.addListener('data', this.parse.bind(this)); readStream.addListener('data', this.parse.bind(this));
readStream.addListener('error', this.emit.bind(this, 'error')); readStream.addListener('error', this.emit.bind(this, 'error'));
readStream.addListener('end', this.end.bind(this)); readStream.addListener('end', this.end.bind(this));


/**
 * Pauses the underlying readStream, suspending 'data' events.
 * @method pause
 * @return {CsvReader} this reader, to allow chaining
 */
self.pause = function () {
    readStream.pause();
    return self;
};

/**
 * Resumes the underlying readStream after a pause().
 * @method resume
 * @return {CsvReader} this reader, to allow chaining
 */
self.resume = function () {
    readStream.resume();
    return self;
};
/**
 * Closes the underlying readStream immediately.
 * @method destroy
 * @return {CsvReader} this reader, to allow chaining
 */
// Fix: the method was originally published under the misspelled name
// `destoy`; expose the documented `destroy` and keep `destoy` as a
// backward-compatible alias so existing callers do not break.
self.destroy = self.destoy = function () {
    readStream.destroy();
    return self;
};
/**
 * Closes the readStream once its pending data has been consumed.
 * @method destroySoon
 * @return {CsvReader} this reader, to allow chaining
 */
self.destroySoon = function () {
    // Fix: original body referenced undefined `readstream` (lowercase),
    // which threw a ReferenceError. Prefer the stream's own destroySoon()
    // when it exists (drains first); otherwise fall back to destroy(),
    // matching the original intent.
    if (typeof readStream.destroySoon === 'function') {
        readStream.destroySoon();
    } else {
        readStream.destroy();
    }
    return self;
};
} }

}; };
sys.inherits(CsvReader, events.EventEmitter); sys.inherits(CsvReader, events.EventEmitter);


/**
* Parses incoming data as a readable CSV file
* @method parse
* @param data {Array} Array of values to parse from the incoming file
*/
CsvReader.prototype.parse = function(data) { CsvReader.prototype.parse = function(data) {
var ps = this.parsingStatus; var ps = this.parsingStatus;
for (var i = 0; i < data.length; i++) { for (var i = 0; i < data.length; i++) {
Expand Down Expand Up @@ -99,6 +162,7 @@ CsvReader.prototype.parse = function(data) {
} }
}; };



CsvReader.prototype.end = function() { CsvReader.prototype.end = function() {
var ps = this.parsingStatus; var ps = this.parsingStatus;
if (ps.quotedField) { if (ps.quotedField) {
Expand All @@ -114,7 +178,6 @@ CsvReader.prototype.end = function() {
this.emit('end'); this.emit('end');
} }
} }

CsvReader.prototype._isEscapable = function(c) { CsvReader.prototype._isEscapable = function(c) {
if ((c === this.escapechar) || (c === this.quotechar)) { if ((c === this.escapechar) || (c === this.quotechar)) {
return true; return true;
Expand Down Expand Up @@ -219,7 +282,7 @@ function _appendField(outArr, writer, field) {
if(typeof(field) !== 'undefined' && field !== null) { if(typeof(field) !== 'undefined' && field !== null) {
field = String(field); field = String(field);
} else { } else {
outArr.push(''); outArr.push('');
return; return;
} }
} }
Expand Down

0 comments on commit e9260db

Please sign in to comment.