Skip to content

Commit

Permalink
Merge pull request #7 from aspectit/master
Browse files Browse the repository at this point in the history
Allow feeding manual feeding of data to ya-csv
  • Loading branch information
koles committed Sep 11, 2011
2 parents 2207f69 + 98be9c8 commit d8de316
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 19 deletions.
22 changes: 21 additions & 1 deletion README.md
Expand Up @@ -22,7 +22,8 @@ Current version requires at least Node.js v0.1.99 and it's tested with Node.js v

- event based, suitable for processing big CSV streams
- configurable separator, quote and escape characters (comma, double-quote and double-quote by default)
- ignores lines starting with configurable comment character (off by default)
- ignores lines starting with configurable comment character (off by default)
- supports memory-only streaming

## More examples

Expand Down Expand Up @@ -70,3 +71,22 @@ Convert the `/etc/passwd` file to comma separated format, drop commented lines a
reader.addListener('data', function(data) {
writer.writeRecord(data);
});

Parsing an upload as the data comes in, using node-formidable:

upload_form.onPart = function(part) {
if (!part.filename) { upload_form.handlePart(part); return }

var reader = csv.createCsvFileReader({'comment': '#'});
reader.addListener('data', function(data) {
saveRecord(data);
});

part.on('data', function(buffer) {
// Pipe incoming data into the reader.
reader.parse(buffer);
});
part.on('end', function() {
reader.end()
}
}
45 changes: 27 additions & 18 deletions lib/ya-csv.js
Expand Up @@ -21,23 +21,11 @@ var CsvReader = csv.CsvReader = function(readStream, options) {
commentedLine: false
};

readStream.addListener('data', function(data) { self.parse(data) });
readStream.addListener('error', function() { self.emit('error') });
readStream.addListener('end', function() {
var ps = self.parsingStatus;
if (ps.quotedField) {
self.emit('error', new Error('Input stream ended but closing quotes expected'));
} else {
// dump open record
if (ps.openField) {
self._addField();
}
if (ps.openRecord.length > 0) {
self._addRecord();
}
self.emit('end');
}
});
if (readStream) {
readStream.addListener('data', function(data) { self.parse(data) });
readStream.addListener('error', function() { self.emit('error') });
readStream.addListener('end', function() { self.end() });
}
};
sys.inherits(CsvReader, events.EventEmitter);

Expand Down Expand Up @@ -114,6 +102,22 @@ CsvReader.prototype.parse = function(data) {
}
};

CsvReader.prototype.end = function() {
var ps = this.parsingStatus;
if (ps.quotedField) {
this.emit('error', new Error('Input stream ended but closing quotes expected'));
} else {
// dump open record
if (ps.openField) {
this._addField();
}
if (ps.openRecord.length > 0) {
this._addRecord();
}
this.emit('end');
}
}

CsvReader.prototype._isEscapable = function(c) {
if ((c === this.escapechar) || (c === this.quotechar)) {
return true;
Expand Down Expand Up @@ -169,8 +173,13 @@ csv.createCsvFileReader = function(path, options) {
};

csv.createCsvStreamReader = function(readStream, options) {
if (options === undefined && typeof readStream === 'object') {
options = readStream;
readStream = undefined;
}
options = options || {};
readStream.setEncoding(options.encoding ? options.encoding : 'utf8');
if (readStream)
readStream.setEncoding(options.encoding ? options.encoding : 'utf8');
return new CsvReader(readStream, options);
};

Expand Down
45 changes: 45 additions & 0 deletions test/stream.js
@@ -0,0 +1,45 @@
var csv = require('../lib/ya-csv'),
sys = require('sys'),
fs = require('fs');

sys.debug('start');

if (process.argv.length < 3) {
sys.error("Usage: node " + process.argv[1] + " <csv file>");
process.exit(1);
}

var csvIn = csv.createCsvStreamReader({
'separator': ',',
'quote': '"',
'comment': '#'
});

var lines = 0;
var columns = 0;

csvIn.addListener('end', function() {
sys.debug('end');
sys.debug(columns + ' columns, ' + lines + ' lines');
});

csvIn.addListener('error', function(e) {
sys.debug('error');
sys.debug(e);
});

csvIn.addListener('data', function(data) {
lines++;
columns += data.length;
});

var file = process.argv[2];
var fileIn = fs.createReadStream(file, {flags: 'r', bufferSize: 10});
fileIn.setEncoding('utf8');
fileIn.on('data', function(data) {
sys.debug(data);
csvIn.parse(data);
});
fileIn.on('end', function(data) {
csvIn.end();
});

0 comments on commit d8de316

Please sign in to comment.