Permalink
Browse files

Merge branch 'release/0.5.1'

  • Loading branch information...
2 parents 19bb645 + f665ff9 commit 3de22a4e53ba470afcb7a6ec14b2be32acdaebfb @stomita committed Jan 11, 2013
Showing with 286 additions and 92 deletions.
  1. +61 −30 README.md
  2. +77 −0 examples/migration.js
  3. +69 −0 examples/pipeline.js
  4. +10 −21 lib/bulk.js
  5. +7 −22 lib/query.js
  6. +61 −18 lib/record-stream.js
  7. +1 −1 package.json
View
@@ -89,9 +89,9 @@ conn.login(username, password, function(err, userInfo) {
```javascript
var sf = require('node-salesforce');
var conn = new sf.Connection({
- // you can change loginUrl to connect to sandbox or prerelease env.
- // loginUrl : 'https://test.salesforce.com',
oauth2 : {
+ // you can change loginUrl to connect to sandbox or prerelease env.
+ // loginUrl : 'https://test.salesforce.com',
clientId : '<your Salesforce OAuth2 client ID is here>',
clientSecret : '<your Salesforce OAuth2 client secret is here>',
redirectUri : '<callback URI is here>'
@@ -124,22 +124,23 @@ conn.logout(function(err) {
### Authorization Request
+The following example uses the express.js framework:
+
```javascript
var sf = require('node-salesforce');
-//
-// Following example is using express.js framework
-//
+// OAuth2 client information can be shared with multiple connections.
+var oauth2 = new sf.OAuth2({
+ // you can change loginUrl to connect to sandbox or prerelease env.
+ // loginUrl : 'https://test.salesforce.com',
+ clientId : '<your Salesforce OAuth2 client ID is here>',
+ clientSecret : '<your Salesforce OAuth2 client secret is here>',
+ redirectUri : '<callback URI is here>'
+});
+
// get authz url and redirect to it.
app.get('/oauth2/auth', function(req, res) {
- var conn = new sf.Connection({
- oauth2 : {
- clientId : '<your Salesforce OAuth2 client ID is here>',
- clientSecret : '<your Salesforce OAuth2 client secret is here>',
- redirectUri : '<callback URI is here>'
- }
- });
- res.redirect(conn.oauth2.getAuthorizationUrl({ scope : 'api id web' }));
+ res.redirect(oauth2.getAuthorizationUrl({ scope : 'api id web' }));
});
```
@@ -148,13 +149,7 @@ app.get('/oauth2/auth', function(req, res) {
```javascript
// pass received authz code and get access token
app.get('/oauth2/callback', function(req, res) {
- var conn = new sf.Connection({
- oauth2 : {
- clientId : '<your Salesforce OAuth2 client ID is here>',
- clientSecret : '<your Salesforce OAuth2 client secret is here>',
- redirectUri : '<callback URI is here>'
- }
- });
+ var conn = new sf.Connection({ oauth2 : oauth2 });
var code = req.param('code');
conn.authorize(code, function(err, userInfo) {
if (err) { return console.error(err); }
@@ -416,7 +411,7 @@ batch.on("queue", function(batchInfo) { // fired when batch request is queued in
jobId = batchInfo.jobId);
// ...
});
-batch.execute(acconts);
+batch.execute(accounts);
// and check the status later.
var job = conn.bulk.job(jobId);
@@ -435,7 +430,7 @@ batch.on("response", function(rets) { // fired when batch finished and result re
});
// or use Bulk#load() method in one call to upload records and fetch results.
-conn.bulk.load("Account", "insert", acconts, function(err, rets) {
+conn.bulk.load("Account", "insert", accounts, function(err, rets) {
if (err) { return console.error(err); }
for (var i=0; i < rets.length; i++) {
if (rets[i].success) {
@@ -448,12 +443,12 @@ conn.bulk.load("Account", "insert", acconts, function(err, rets) {
});
// same as following calls
-conn.sobject("Account").insertBulk(acconts, function(err, rets) {
+conn.sobject("Account").insertBulk(accounts, function(err, rets) {
// ...
});
//
-conn.sobject("Account").bulkload("insert").execute(acconts, function(err, rets) {
+conn.sobject("Account").bulkload("insert").execute(accounts, function(err, rets) {
// ...
});
```
@@ -495,7 +490,19 @@ conn.bulk.load("Account", "insert", csvFileIn, function(err, rets) {
## Record Stream Pipeline
-###
+Record stream is a stream abstraction that deals in records flowing through it, similar to Node.js's standard readable/writable streams.
+
+The Query object returned by the Connection#query() / SObject#find() method is considered an InputRecordStream, which emits a "record" event when a record is received from the server.
+
+The Batch object returned by the Job#createBatch() / Bulk#load() / SObject#bulkload() method is considered an OutputRecordStream and has send() and end() methods to accept incoming records.
+
+You can use InputRecordStream#pipe(outputRecordStream) to pipe a record stream.
+
+A RecordStream can be converted to a usual Node.js stream object by calling the RecordStream#stream() method.
+By default (and currently the only option) records are serialized to a CSV string.
+
+### Piping Query Record Stream
+
```javascript
// DELETE FROM Account WHERE CreatedDate < LAST_YEAR
var Account = conn.sobject('Account');
@@ -527,21 +534,41 @@ Opportunity.find({ "Account.Id" : accId },
// Export all account records to CSV file
var csvFileOut = require('fs').createWriteStream('path/to/Account.csv');
conn.query("SELECT Id, Name, Type, BillingState, BillingCity, BillingStreet FROM Account")
- .stream()
+ .stream() // Convert to Node.js's usual readable stream.
.pipe(csvFileOut);
```
-### Data migration via bulk stream
+### Record Stream Filters
```javascript
+// Map record and pass to downstream
+conn.sobject('Contact')
+ .find({}, { Id: 1, Name: 1 })
+ .pipe(sf.RecordStream.map(function(r) {
+ return { ID: r.Id, FULL_NAME: r.Name }
+ }))
+ .stream().pipe(fs.createWriteStream("Contact.csv"));
+
+// Filter only matching record to pass downstream
+conn.sobject('Lead')
+ .find({}, { Id: 1, Name: 1 })
+ .pipe(sf.RecordStream.filter(function(r) {
+ return { ID: r.Id, FULL_NAME: r.Name }
+ }))
+ .stream().pipe(fs.createWriteStream("Contact.csv"));
+```
+
+### Data Migration using Bulkload Batch Record Stream
-// Connection for org which migrate to
+```javascript
+
+// Connection for the org which data is migrated from
var conn1 = new sf.Connection(
// ...
);
-// Connection for org which migrate to
+// Connection for the org which data is migrated to
var conn2 = new sf.Connection(
// ...
);
@@ -551,16 +578,20 @@ var job = conn2.bulk.createJob("Account", "insert");
var batch = job.createBatch();
query.pipe(batch);
batch.on('queue', function() {
- //...
jobId = job.id;
batchId = batch.id;
+ //...
})
```
## Change History
+v0.5.1 (Jan 11, 2013):
+
+* Move Query#stream() method to RecordStream#stream() to support stream serialization even in filtered stream.
+
v0.5.0 (Jan 11, 2013):
* Support Bulk API for insert/update/upsert/delete/hardDelete operation (except for 'query').
View
@@ -0,0 +1,77 @@
+var async = require('async');
+var sf = require('../lib/salesforce');
+
+var config = {};// { logLevel: "DEBUG" };
+var conn1 = new sf.Connection(config);
+var conn2 = new sf.Connection(config);
+
+async.waterfall([
+ function(next) {
+ async.parallel([
+ function(cb) {
+ conn1.login(process.env.SF_USERNAME_1, process.env.SF_PASSWORD_1, cb);
+ },
+ function(cb) {
+ conn2.login(process.env.SF_USERNAME_2, process.env.SF_PASSWORD_2, cb);
+ }
+ ], next);
+ },
+ function(rets, next) {
+ conn2.sobject('Account').count(next);
+ },
+ function(cnt, next) {
+ console.log("Account count in conn2 : " + cnt);
+ async.parallel([
+ function(cb) {
+ conn1.sobject('Account').describe(cb);
+ },
+ function(cb) {
+ conn2.sobject('Account').describe(cb);
+ }
+ ], next);
+ },
+ function(sobjects, next) {
+ var so1 = sobjects[0], so2 = sobjects[1];
+ var fields1 = {};
+ so1.fields.forEach(function(f) { fields1[f.name] = 1; });
+ var fields2 = {};
+ so2.fields.forEach(function(f) {
+ if (fields1[f.name] && f.updateable && !f.custom && f.type !== 'reference') {
+ fields2[f.name] = 1;
+ }
+ });
+
+ conn1.sobject('Account').find({}, fields2)
+ .pipe(conn2.bulk.load('Account', 'insert'))
+ .on('response', function(res) { next(null, res); })
+ .on('error', function(err){ next(err); });
+ },
+ function(rets, next) {
+ var success = rets.filter(function(r) { return r.success; }).length;
+ var failure = rets.length - success;
+ console.log("bulkload success = " + success + ", failure = " + failure);
+ conn2.sobject('Account').count(next);
+ },
+ function(cnt, next) {
+ console.log("Account count in conn2 : " + cnt);
+ conn2.sobject('Account').find({ CreatedDate : sf.Date.TODAY }).exec(next);
+ },
+ function(records, next) {
+ console.log("deleting created records ("+records.length+")");
+ conn2.bulk.load('Account', 'delete', records, next);
+ },
+ function(rets, next) {
+ var success = rets.filter(function(r) { return r.success; }).length;
+ var failure = rets.length - success;
+ console.log("delete success = " + success + ", failure = " + failure);
+ conn2.sobject('Account').count(next);
+ },
+ function(cnt, next) {
+ console.log("Account count in conn2 : " + cnt);
+ next();
+ }
+], function(err, res) {
+ if (err) {
+ console.error(err);
+ }
+});
View
@@ -0,0 +1,69 @@
+var fs = require('fs');
+var async = require('async');
+var sf = require('../lib/salesforce');
+
+var config = {};// { logLevel: "DEBUG" };
+var conn = new sf.Connection(config);
+
+var Opportunity = conn.sobject('Opportunity');
+
+async.waterfall([
+ function(next) {
+ conn.login(process.env.SF_USERNAME, process.env.SF_PASSWORD, next);
+ },
+ function(sobjects, next) {
+ Opportunity.find({ AccountId: { $ne: null }}, { Id: 1, Name: 1, "Account.Name": 1 })
+ .pipe(sf.RecordStream.map(function(r) {
+ r.Name = r.Account.Name + ' *** ' + r.Name;
+ return r;
+ }))
+ .pipe(Opportunity.updateBulk())
+ .on('response', function(rets) {
+ next(null, rets);
+ })
+ .on('error', function(err) {
+ next(err);
+ });
+
+ },
+ function(rets, next) {
+ var success = rets.filter(function(r) { return r.success; }).length;
+ var failure = rets.length - success;
+ console.log("bulk update success = " + success + ", failure = " + failure);
+ next();
+ },
+ function(next) {
+ Opportunity.find({ Name : { $like: '% *** %' }}, { Id: 1, Name: 1 })
+ .pipe(sf.RecordStream.map(function(r) {
+ r.Name = r.Name.replace(/^.+ \*\*\* /g, '');
+ return r;
+ }))
+ .pipe(Opportunity.updateBulk())
+ .on('response', function(rets) {
+ next(null, rets);
+ })
+ .on('error', function(err) {
+ next(err);
+ });
+ },
+ function(rets, next) {
+ var success = rets.filter(function(r) { return r.success; }).length;
+ var failure = rets.length - success;
+ console.log("bulk update success = " + success + ", failure = " + failure);
+ next();
+ },
+ function(next) {
+ Opportunity
+ .find({}, { Id: 1, Name: 1, Amount: 1, StageName: 1, CreatedDate: 1 })
+ .pipe(sf.RecordStream.filter(function(r) {
+ return r.Amount > 500000;
+ }))
+ .stream().pipe(fs.createWriteStream("opps.csv"))
+ .on('end', function() { next(); })
+ .on('error', function(err) { next(err); });
+ }
+], function(err, res) {
+ if (err) {
+ console.error(err);
+ }
+});
View
@@ -219,13 +219,16 @@ Job.prototype._changeState = function(state, callback) {
/*--------------------------------------------*/
/**
- *
+ * Batch (extends RecordStream implements Sendable)
*/
var Batch = function(conn, job, batchId) {
+ Batch.super_.apply(this);
+ this.sendable = true;
this._conn = conn;
this.job = job;
this.id = batchId;
- this.sendable = true;
+ this._csvStream = new RecordStream.CSVStream();
+ this._csvStream.stream().pipe(this.stream());
};
util.inherits(Batch, RecordStream);
@@ -251,34 +254,20 @@ Batch.prototype.execute = function(input, callback) {
} else {
var data;
if (_.isArray(input)) {
- var records = _.map(input, function(rec) {
- rec = _.clone(rec);
- if (self.job.operation === "insert") {
- delete rec.Id;
- }
- delete rec.type;
- delete rec.attributes;
- return rec;
- });
- data = CSV.toCSV(records);
+ _.forEach(input, function(record) { self.send(record); });
} else if (_.isString(input)){
data = input;
+ var stream = this.stream();
+ stream.write(data);
+ stream.end();
}
- var stream = this.stream();
- stream.write(data);
- stream.end();
}
};
/**
*
*/
Batch.prototype.send = function(record) {
- if (!this._csvStream) {
- var csvStream = new RecordStream.CSVStream();
- csvStream.stream().pipe(this.stream());
- this._csvStream = csvStream;
- }
record = _.clone(record);
if (this.job.operation === "insert") {
delete record.Id;
@@ -393,7 +382,7 @@ Batch.prototype.retrieve = function(callback) {
};
/**
- *
+ * @override
*/
Batch.prototype.stream = function() {
if (!this._stream) {
Oops, something went wrong.

0 comments on commit 3de22a4

Please sign in to comment.