
Commit

Merge branch 'release/0.5.0'
stomita committed Jan 11, 2013
2 parents 35259ee + d708f58 commit 19bb645
Showing 18 changed files with 2,160 additions and 193 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -1,7 +1,6 @@
lib-cov
*.seed
*.log
*.csv
*.dat
*.out
*.pid
173 changes: 173 additions & 0 deletions README.md
@@ -395,8 +395,181 @@ conn.topic("InvoiceStatementUpdates").subscribe(function(message) {
```


## Bulk Operation

### Loading records

```javascript
// Records to insert in bulk.
var accounts = [
  { Name : 'Account #1', ... },
  { Name : 'Account #2', ... },
  { Name : 'Account #3', ... },
  ...
];

// Create a bulk job and add a batch to execute the load.
var job = conn.bulk.createJob("Account", "insert");
var batch = job.createBatch();
batch.on("queue", function(batchInfo) { // fired when the batch request is queued on the server.
  batchId = batchInfo.id;
  jobId = batchInfo.jobId;
  // ...
});
batch.execute(accounts);

// and check the status later.
var job = conn.bulk.job(jobId);
var batch = job.batch(batchId);
batch.poll(1000 /* interval(ms) */, 20000 /* timeout(ms) */); // start polling
batch.on("response", function(rets) { // fired when the batch is finished and the results are retrieved
  for (var i = 0; i < rets.length; i++) {
    if (rets[i].success) {
      console.log("#" + (i+1) + " loaded successfully, id = " + rets[i].id);
    } else {
      console.log("#" + (i+1) + " error occurred, message = " + rets[i].errors.join(', '));
    }
  }
  // ...
});

// or use the Bulk#load() method to upload records and fetch results in one call.
conn.bulk.load("Account", "insert", accounts, function(err, rets) {
  if (err) { return console.error(err); }
  for (var i = 0; i < rets.length; i++) {
    if (rets[i].success) {
      console.log("#" + (i+1) + " loaded successfully, id = " + rets[i].id);
    } else {
      console.log("#" + (i+1) + " error occurred, message = " + rets[i].errors.join(', '));
    }
  }
  // ...
});

// same as the following calls
conn.sobject("Account").insertBulk(accounts, function(err, rets) {
  // ...
});

// or
conn.sobject("Account").bulkload("insert").execute(accounts, function(err, rets) {
  // ...
});
```

### Loading CSV file

```javascript
// CSV file to upload
var csvFileIn = require('fs').createReadStream("path/to/Account.csv");

// Create a bulk job and add a batch to execute the load.
var job = conn.bulk.createJob("Account", "insert");
var batch = job.createBatch();
batch.on("queue", function(batchInfo) { // fired when the batch request is queued on the server.
  batchId = batchInfo.id;
  jobId = batchInfo.jobId;
  // ...
});

// connect any readable stream via the pipe method
csvFileIn.pipe(batch.stream());


// or use the Bulk#load() method to upload the file and fetch results in one call.
conn.bulk.load("Account", "insert", csvFileIn, function(err, rets) {
  if (err) { return console.error(err); }
  for (var i = 0; i < rets.length; i++) {
    if (rets[i].success) {
      console.log("#" + (i+1) + " loaded successfully, id = " + rets[i].id);
    } else {
      console.log("#" + (i+1) + " error occurred, message = " + rets[i].errors.join(', '));
    }
  }
  // ...
});


```

## Record Stream Pipeline

### Bulk delete / update / CSV export from query results
```javascript
// DELETE FROM Account WHERE CreatedDate < LAST_YEAR
var Account = conn.sobject('Account');
Account.find({ CreatedDate: { $lt: sf.Date.LAST_YEAR }})
  .pipe(Account.deleteBulk())
  .on('response', function(rets) {
    // ...
  })
  .on('error', function(err) {
    // ...
  });

// UPDATE Opportunity SET Name = Account.Name + ' - ' + Name WHERE Account.Id = :accId
var Opportunity = conn.sobject('Opportunity');
Opportunity.find({ "Account.Id" : accId },
                 { Id: 1, Name: 1, "Account.Name": 1 })
  .pipe(sf.RecordStream.map(function(r) {
    r.Name = r.Account.Name + ' - ' + r.Name;
    return r;
  }))
  .pipe(Opportunity.updateBulk())
  .on('response', function(rets) {
    // ...
  })
  .on('error', function(err) {
    // ...
  });

// Export all account records to a CSV file
var csvFileOut = require('fs').createWriteStream('path/to/Account.csv');
conn.query("SELECT Id, Name, Type, BillingState, BillingCity, BillingStreet FROM Account")
  .stream()
  .pipe(csvFileOut);

```

### Data migration via bulk stream

```javascript

// Connection for the org to migrate records from
var conn1 = new sf.Connection(
  // ...
);

// Connection for the org to migrate records to
var conn2 = new sf.Connection(
  // ...
);

var query = conn1.query("SELECT Id, Name, Type, BillingState, BillingCity, BillingStreet FROM Account");
var job = conn2.bulk.createJob("Account", "insert");
var batch = job.createBatch();
query.pipe(batch);
batch.on('queue', function() {
  // ...
  jobId = job.id;
  batchId = batch.id;
});

```
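
The example above stops at the `queue` event; results can be checked afterwards with the same `poll()`/`response` pattern shown in the Bulk Operation section. A minimal sketch (interval and timeout values are arbitrary):

```javascript
// Once the batch is queued, start polling and report the results when loading finishes.
batch.on('queue', function() {
  batch.poll(5000 /* interval(ms) */, 60000 /* timeout(ms) */);
});
batch.on('response', function(rets) {
  for (var i = 0; i < rets.length; i++) {
    if (rets[i].success) {
      console.log("#" + (i+1) + " migrated successfully, id = " + rets[i].id);
    } else {
      console.log("#" + (i+1) + " error occurred, message = " + rets[i].errors.join(', '));
    }
  }
});
batch.on('error', function(err) {
  console.error(err);
});
```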


## Change History

v0.5.0 (Jan 11, 2013):

* Support Bulk API for insert/update/upsert/delete/hardDelete operations (except for 'query'); see the sketch after this list.

* Refine Query#pipe to pipe records to another output record stream (such as a bulk upload batch).

* Add the Query#stream() method to convert a record stream into a general Node.js readable stream (which generates CSV data).
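
Operations other than insert follow the same `Bulk#load()` calling pattern shown above. A minimal sketch of a bulk delete, where record objects only need an `Id` value (upsert additionally requires an external ID field, which this sketch does not cover):

```javascript
// Records to delete in bulk -- only the Id field is required.
// (The Id values below are placeholders.)
var deletions = [
  { Id: '001000000000001AAA' },
  { Id: '001000000000002AAA' }
];

conn.bulk.load("Account", "delete", deletions, function(err, rets) {
  if (err) { return console.error(err); }
  for (var i = 0; i < rets.length; i++) {
    if (rets[i].success) {
      console.log("#" + (i+1) + " deleted successfully");
    } else {
      console.log("#" + (i+1) + " error occurred, message = " + rets[i].errors.join(', '));
    }
  }
});
```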


v0.4.0 (Nov 05, 2012):

* Support JSON-style query objects to query records without writing SOQL, inspired by the MongoDB query interface.
