Skip to content

Commit

Permalink
Merge branch 'master' of github.com:nodejitsu/txn
Browse files Browse the repository at this point in the history
  • Loading branch information
jcrugzz committed Dec 1, 2014
2 parents b49767d + acba9c8 commit d5d80ab
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 27 deletions.
13 changes: 6 additions & 7 deletions lib/lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// limitations under the License.

var assert = require('assert')
, log4js = require('log4js')
, debug = require('debug')
, request_mod = require('request')
;

Expand Down Expand Up @@ -85,8 +85,7 @@ function req_json(opts, callback) {
if(opts.method && opts.method !== 'GET')
opts.headers['content-type'] = 'application/json';

var LOG = opts.log || log4js.getLogger('txn');
LOG.setLevel(process.env.txn_log_level || 'info');
var LOG = debug('txn');

if(opts.json)
opts.body = JS(opts.json);
Expand All @@ -98,7 +97,7 @@ function req_json(opts, callback) {
function on_timeout() {
timed_out = true;
var msg = 'Timeout: ' + JS(opts);
LOG.warn(msg);
LOG(msg);

if(in_flight && in_flight.end)
in_flight.end();
Expand All @@ -110,11 +109,11 @@ function req_json(opts, callback) {
}

var method = opts.method || 'GET';
LOG.debug(method + ' ' + opts.uri);
LOG(method + ' ' + opts.uri);
in_flight = DEFS.request(opts, function(er, resp, body) {
clearTimeout(timer);
if(timed_out) {
LOG.debug('Ignoring timed-out response: ' + opts.uri);
LOG('Ignoring timed-out response: ' + opts.uri);
return;
}

Expand All @@ -131,4 +130,4 @@ function req_json(opts, callback) {
return in_flight;
}

}) // defaultable
}, require) // defaultable
36 changes: 17 additions & 19 deletions lib/txn.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
// limitations under the License.

var util = require('util')
, log4js = require('log4js')
, debug = require('debug')
, events = require('events')
, assert = require('assert')
, obj_diff = require('obj_diff')
, tick = typeof global.setImmediate !== 'function' ? process.nextTick : setImmediate
;

require('defaultable').def(module,
{ 'log' : log4js.getLogger('txn')
{ 'log' : debug('txn')
, 'log_level' : process.env.txn_log_level || 'info'
, 'timestamps' : false
, 'timestamp_generator': function() { return new Date(); }
Expand Down Expand Up @@ -121,8 +121,6 @@ function Transaction (opts) {
self.log = opts.log || DEFAULT.log;
self.timestamp_generator = opts.timestamp_generator || DEFAULT.timestamp_generator;

self.log.setLevel(DEFAULT.log_level);

// These can be falsy.
self.after = ('after' in opts) ? opts.after : DEFAULT.after;
self.create = ('create' in opts) ? opts.create : DEFAULT.create;
Expand Down Expand Up @@ -173,7 +171,7 @@ Transaction.prototype.attempt = function() {
var self = this;

if(self.tries >= self.max_tries) {
self.log.debug('Too many tries ('+self.name+'): ' + self.tries);
self.log('Too many tries ('+self.name+'): ' + self.tries);
return self.emit('exhausted', self.tries);
}

Expand All @@ -185,12 +183,12 @@ Transaction.prototype.attempt = function() {
return go(); // No delay.
else if(self.tries == 0 && self.after) {
delay = self.after;
self.log.debug('Initial delay before first run ('+self.name+'): ' + delay);
self.log('Initial delay before first run ('+self.name+'): ' + delay);
self.retry_timer = setTimeout(go, delay);
}
else {
delay = self.delay * Math.pow(2, self.tries);
self.log.debug('Delay until next attempt ('+self.name+'): ' + delay);
self.log('Delay until next attempt ('+self.name+'): ' + delay);
self.retry_timer = setTimeout(go, delay);
}

Expand All @@ -206,13 +204,13 @@ Transaction.prototype.attempt = function() {

Transaction.prototype.run = function() {
var self = this;
self.log.debug('Transaction '+self.name+' (' + self.tries + '/' + self.max_tries + '): ' + self.uri);
self.log('Transaction '+self.name+' (' + self.tries + '/' + self.max_tries + '): ' + self.uri);

if(self.doc) {
self.log.debug('Skipping fetch, assuming known doc: ' + self.doc._id)
self.log('Skipping fetch, assuming known doc: ' + self.doc._id)
run_op(self.doc, !('_rev' in self.doc));
} else {
self.log.debug('Fetching doc: ' + (self.id || self.uri_to_id(self.uri)));
self.log('Fetching doc: ' + (self.id || self.uri_to_id(self.uri)));

self.req.method = 'GET';
self.req.uri = self.uri;
Expand All @@ -226,7 +224,7 @@ Transaction.prototype.run = function() {

if(is_create) {
couch_doc = { "_id": decodeURIComponent(self.id || self.uri_to_id(self.uri)) };
self.log.debug('Creating new doc: ' + lib.JS(couch_doc));
self.log('Creating new doc: ' + lib.JS(couch_doc));
}

return run_op(couch_doc, is_create);
Expand Down Expand Up @@ -265,7 +263,7 @@ Transaction.prototype.run = function() {
delete self.op_timer;

if(already_done) {
self.log.debug('Ignoring operation after timeout');
self.log('Ignoring operation after timeout');
return self.emit('ignore');
} else
already_done = true;
Expand All @@ -274,18 +272,18 @@ Transaction.prototype.run = function() {
return self.emit('error', er);

if(new_doc) {
self.log.debug('Using new doc: ' + lib.JS(new_doc));
self.log('Using new doc: ' + lib.JS(new_doc));
self.emit('replace', doc, new_doc);
doc = new_doc;
}

if(obj_diff.atmost(original.doc, doc, {})) {
self.log.debug('Skipping txn update for unchanged doc: ' + original.id);
self.log('Skipping txn update for unchanged doc: ' + original.id);
return self.emit('done', doc);
}

var diff = obj_diff.diff(original.doc, doc);
self.log.debug('Operation diff (%s): %j', self.name, diff)
self.log('Operation diff (%s): %j', self.name, diff)
self.emit('change', diff);

doc._id = original.id;
Expand All @@ -303,13 +301,13 @@ Transaction.prototype.run = function() {
, json : doc
};

self.log.debug('Updating transaction ('+self.name+'): ' + update_req.uri);
self.log('Updating transaction ('+self.name+'): ' + update_req.uri);
self.stores += 1

lib.req_couch(update_req, function(er, resp, result) {
if(er && resp && resp.statusCode === 409 && result.error === "conflict") {
// Retryable error.
self.log.debug('Conflict: '+self.name);
self.log('Conflict: '+self.name);
self.emit('conflict', self.tries);
return self.attempt();
}
Expand All @@ -330,7 +328,7 @@ Transaction.prototype.cancel = function() {
clearTimeout(self.retry_timer);
clearTimeout(self.op_timer);

self.log.debug('Cancelling transaction try: ' + self.tries);
self.log('Cancelling transaction try: ' + self.tries);
self.retry_timer = null;
self.op_timer = null;

Expand All @@ -344,4 +342,4 @@ Transaction.prototype.uri_to_id = function(uri) {
return parts[ parts.length - 1 ];
}

}) // defaultable
}, require) // defaultable
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
, "dependencies" : { "defaultable" : "~0.7.2"
, "request" : "^2.48.0"
, "browser-request": "~0.2.1"
, "log4js" : "~0.5.4"
, "debug" : "~0.7.2"
, "obj_diff" : "~0.2.0"
}
, "browser": { "request" : "browser-request"
Expand Down

0 comments on commit d5d80ab

Please sign in to comment.