Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

move Query#stream() to RecordStream#stream()

  • Loading branch information...
commit 96c685f4865e51a613049e646f3aab8182cdb37c 1 parent 26ae5dd
@stomita authored
Showing with 78 additions and 61 deletions.
  1. +10 −21 lib/bulk.js
  2. +7 −22 lib/query.js
  3. +61 −18 lib/record-stream.js
View
31 lib/bulk.js
@@ -219,13 +219,16 @@ Job.prototype._changeState = function(state, callback) {
/*--------------------------------------------*/
/**
- *
+ * Batch (extends RecordStream implements Sendable)
*/
var Batch = function(conn, job, batchId) {
+ Batch.super_.apply(this);
+ this.sendable = true;
this._conn = conn;
this.job = job;
this.id = batchId;
- this.sendable = true;
+ this._csvStream = new RecordStream.CSVStream();
+ this._csvStream.stream().pipe(this.stream());
};
util.inherits(Batch, RecordStream);
@@ -251,22 +254,13 @@ Batch.prototype.execute = function(input, callback) {
} else {
var data;
if (_.isArray(input)) {
- var records = _.map(input, function(rec) {
- rec = _.clone(rec);
- if (self.job.operation === "insert") {
- delete rec.Id;
- }
- delete rec.type;
- delete rec.attributes;
- return rec;
- });
- data = CSV.toCSV(records);
+ _.forEach(input, function(record) { self.send(record); });
} else if (_.isString(input)){
data = input;
+ var stream = this.stream();
+ stream.write(data);
+ stream.end();
}
- var stream = this.stream();
- stream.write(data);
- stream.end();
}
};
@@ -274,11 +268,6 @@ Batch.prototype.execute = function(input, callback) {
*
*/
Batch.prototype.send = function(record) {
- if (!this._csvStream) {
- var csvStream = new RecordStream.CSVStream();
- csvStream.stream().pipe(this.stream());
- this._csvStream = csvStream;
- }
record = _.clone(record);
if (this.job.operation === "insert") {
delete record.Id;
@@ -393,7 +382,7 @@ Batch.prototype.retrieve = function(callback) {
};
/**
- *
+ * @override
*/
Batch.prototype.stream = function() {
if (!this._stream) {
View
29 lib/query.js
@@ -6,9 +6,12 @@ var util = require('util'),
RecordStream = require("./record-stream");
/**
- * Query
+ * Query (extends RecordStream implements Receivable)
*/
var Query = module.exports = function(conn, config, locator) {
+ Query.super_.apply(this);
+ this.receivable = true;
+
this._conn = conn;
if (_.isString(config)) { // if query config is string, it is given in SOQL.
this._soql = config;
@@ -22,8 +25,6 @@ var Query = module.exports = function(conn, config, locator) {
this._buffer = [];
this._paused = true;
this._closed = false;
-
- Query.super_.apply(this);
};
util.inherits(Query, RecordStream);
@@ -236,26 +237,10 @@ Query.prototype.execute = function(options, callback) {
/**
- * Auto start query when pipe called.
+ * Auto start query when pipe() is called.
*/
Query.prototype.pipe = function() {
- var dist = RecordStream.prototype.pipe.apply(this, arguments);
+ var dest = RecordStream.prototype.pipe.apply(this, arguments);
this.resume();
- return dist;
+ return dest;
};
-
-/**
- * Create ReadableStream instance for serializing records
- */
-Query.prototype.stream = function(type) {
- type = type || 'csv';
- var recStream;
- if (type === "csv") {
- recStream = new RecordStream.CSVStream();
- }
- if (!recStream) {
- throw new Error("No stream type defined for '"+type+"'.");
- }
- this.pipe(recStream);
- return recStream.stream(); // get readable stream instance
-};
View
79 lib/record-stream.js
@@ -6,7 +6,8 @@ var events = require('events'),
CSV = require('./csv');
/**
- *
+ * RecordStream
+ * @abstract
*/
var RecordStream = function() {
this.sendable = false;
@@ -23,17 +24,17 @@ var RecordStream = function() {
util.inherits(RecordStream, events.EventEmitter);
-/*--- Output Record Stream methods ---*/
+/*--- Output Record Stream methods (Sendable) ---*/
/**
- * Receive record into stream.
+ * Output record into stream.
*/
RecordStream.prototype.send = function(record) {
// abstract
};
/**
- * Resume record fetch and query execution
+ * End sending records into stream.
*/
RecordStream.prototype.end = function() {
this.sendable = false;
@@ -56,7 +57,7 @@ RecordStream.prototype.destroySoon = function() {
};
-/*--- Input Record Stream methods ---*/
+/*--- Input Record Stream methods (Receivable) ---*/
/*
* Pause record fetch
@@ -152,7 +153,25 @@ RecordStream.prototype.pipe = function (dest, options) {
return dest;
};
-
+/**
+ * Create Stream instance for serializing/deserialize records
+ */
+RecordStream.prototype.stream = function(type) {
+ type = type || 'csv';
+ var recStream;
+ if (type === "csv") {
+ recStream = new RecordStream.CSVStream();
+ }
+ if (!recStream) {
+ throw new Error("No stream type defined for '"+type+"'.");
+ }
+ if (this.receivable) {
+ this.pipe(recStream);
+ } else if (this.sendable) {
+ recStream.pipe(this);
+ }
+ return recStream.stream(); // get Node.js stream instance
+};
/* --------------------------------------------------- */
/**
@@ -160,6 +179,7 @@ RecordStream.prototype.pipe = function (dest, options) {
*/
RecordStream.map = function(fn) {
var rstream = new RecordStream();
+ rstream.receivable = true;
rstream.send = function(record) {
var rec = fn(record) || record; // if not returned record, use same record
this.emit('record', rec);
@@ -173,6 +193,7 @@ RecordStream.map = function(fn) {
*/
RecordStream.filter = function(fn) {
var rstream = new RecordStream();
+ rstream.receivable = true;
rstream.send = function(record) {
if (fn(record)) {
this.emit('record', record);
@@ -185,14 +206,18 @@ RecordStream.filter = function(fn) {
/* --------------------------------------------------- */
/**
- * CSVStream (extends OutputRecordStream)
+ * CSVStream (extends RecordStream implements Receivable, Sendable)
*/
var CSVStream = function(headers) {
+ var self = this;
this.sendable = true;
- this.readable = true;
+ this.receivable = true;
this.headers = headers;
this.wroteHeaders = false;
this._stream = new Stream();
+ this._buffer = [];
+ this._stream.on('data', function(data) { self._handleData(data); });
+ this._stream.on('end', function(data) { self._handleEnd(data); });
};
util.inherits(CSVStream, RecordStream);
@@ -208,15 +233,7 @@ CSVStream.prototype.send = function(record) {
this._stream.emit("data", CSV.arrayToCSV(this.headers) + "\n");
this.wroteHeaders = true;
}
- var row = [];
- _.forEach(this.headers, function(header) {
- var value = record[header];
- if (_.isNull(value) || _.isUndefined(value) || typeof value !== 'string') {
- value = '';
- }
- row.push(String(value));
- });
- this._stream.emit("data", CSV.arrayToCSV(row) + "\n");
+ this._stream.emit("data", CSV.recordToCSV(record, this.headers) + "\n");
};
/**
@@ -230,7 +247,33 @@ CSVStream.prototype.end = function(record) {
};
/**
- * Get delegating ReadableStream
+ *
+ */
+CSVStream.prototype._handleData = function(data, enc) {
+ this._buffer.push([ data, enc ]);
+};
+
+/**
+ *
+ */
+CSVStream.prototype._handleEnd = function(data, enc) {
+ var self = this;
+ if (data) {
+ this._buffer.push([ data, enc ]);
+ }
+ data = this._buffer.map(function(d) {
+ return d[0].toString(d[1] || 'utf-8');
+ }).join('');
+ var records = CSV.parseCSV(data);
+ records.forEach(function(record) {
+ self.emit('record', record);
+ });
+ this.emit('end');
+};
+
+/**
+ * Get delegating Node.js stream
+ * @override
*/
CSVStream.prototype.stream = function(record) {
return this._stream;
Please sign in to comment.
Something went wrong with that request. Please try again.