Skip to content

Commit

Permalink
Add TransactionManager test
Browse files Browse the repository at this point in the history
  • Loading branch information
wltsmrz committed Feb 25, 2015
1 parent ff86d53 commit 76bad86
Show file tree
Hide file tree
Showing 5 changed files with 1,030 additions and 81 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -25,7 +25,7 @@
"superagent": "^0.18.0"
},
"devDependencies": {
"assert-diff": "0.0.4",
"assert-diff": "^1.0.1",
"coveralls": "~2.10.0",
"eslint": "^0.13.0",
"gulp": "~3.8.10",
Expand Down
145 changes: 81 additions & 64 deletions src/js/ripple/transactionmanager.js
@@ -1,11 +1,13 @@
var util = require('util');
var assert = require('assert');
var async = require('async');
'use strict';

var util = require('util');
var assert = require('assert');
var async = require('async');
var EventEmitter = require('events').EventEmitter;
var Transaction = require('./transaction').Transaction;
var RippleError = require('./rippleerror').RippleError;
var Transaction = require('./transaction').Transaction;
var RippleError = require('./rippleerror').RippleError;
var PendingQueue = require('./transactionqueue').TransactionQueue;
var log = require('./log').internal.sub('transactionmanager');
var log = require('./log').internal.sub('transactionmanager');

/**
* @constructor TransactionManager
Expand All @@ -20,7 +22,7 @@ function TransactionManager(account) {
this._account = account;
this._accountID = account._account_id;
this._remote = account._remote;
this._nextSequence = void(0);
this._nextSequence = undefined;
this._maxFee = this._remote.max_fee;
this._maxAttempts = this._remote.max_attempts;
this._submissionTimeout = this._remote.submission_timeout;
Expand All @@ -37,7 +39,7 @@ function TransactionManager(account) {

function updatePendingStatus(ledger) {
self._updatePendingStatus(ledger);
};
}

this._remote.on('ledger_closed', updatePendingStatus);

Expand All @@ -47,7 +49,7 @@ function TransactionManager(account) {
// hooking back into ledger_closed
self._remote.on('ledger_closed', updatePendingStatus);
});
};
}

this._remote.on('disconnect', function() {
self._remote.removeListener('ledger_closed', updatePendingStatus);
Expand All @@ -56,7 +58,7 @@ function TransactionManager(account) {

// Query server for next account transaction sequence
this._loadSequence();
};
}

util.inherits(TransactionManager, EventEmitter);

Expand Down Expand Up @@ -96,7 +98,7 @@ TransactionManager.normalizeTransaction = function(tx) {
var transaction = { };
var keys = Object.keys(tx);

for (var i=0; i<keys.length; i++) {
for (var i = 0; i < keys.length; i++) {
var k = keys[i];
switch (k) {
case 'transaction':
Expand Down Expand Up @@ -162,7 +164,8 @@ TransactionManager.prototype._transactionReceived = function(tx) {

if (!(submission instanceof Transaction)) {
// The received transaction does not correlate to one submitted
return this._pending.addReceivedId(hash, transaction);
this._pending.addReceivedId(hash, transaction);
return;
}

// ND: A `success` handler will `finalize` this later
Expand Down Expand Up @@ -197,7 +200,7 @@ TransactionManager.prototype._adjustFees = function() {
transaction.once('presubmit', function() {
transaction.emit('error', 'tejMaxFeeExceeded');
});
};
}

this._pending.forEach(function(transaction) {
if (transaction._setFixedFee) {
Expand All @@ -209,7 +212,8 @@ TransactionManager.prototype._adjustFees = function() {

if (Number(newFee) > self._maxFee) {
// Max transaction fee exceeded, abort submission
return maxFeeExceeded(transaction);
maxFeeExceeded(transaction);
return;
}

transaction.tx_json.Fee = newFee;
Expand Down Expand Up @@ -244,6 +248,10 @@ TransactionManager.prototype._updatePendingStatus = function(ledger) {
assert.strictEqual(typeof ledger.ledger_index, 'number');

this._pending.forEach(function(transaction) {
if (transaction.finalized) {
return;
}

switch (ledger.ledger_index - transaction.submitIndex) {
case 4:
transaction.emit('missing', ledger);
Expand All @@ -253,10 +261,6 @@ TransactionManager.prototype._updatePendingStatus = function(ledger) {
break;
}

if (transaction.finalized) {
return;
}

if (ledger.ledger_index > transaction.tx_json.LastLedgerSequence) {
// Transaction must fail
transaction.emit('error', new RippleError(
Expand All @@ -265,46 +269,54 @@ TransactionManager.prototype._updatePendingStatus = function(ledger) {
});
};

//Fill an account transaction sequence
// Fill an account transaction sequence
TransactionManager.prototype._fillSequence = function(tx, callback) {
var self = this;

function submitFill(sequence, callback) {
var fill = self._remote.transaction();
fill.account_set(self._accountID);
fill.tx_json.Sequence = sequence;
fill.once('submitted', callback);
function submitFill(sequence, fCallback) {
var fillTransaction = self._remote.createTransaction('AccountSet', {
account: self._accountID
});
fillTransaction.tx_json.Sequence = sequence;

// Secrets may be set on a per-transaction basis
if (tx._secret) {
fill.secret(tx._secret);
fillTransaction.secret(tx._secret);
}

fill.submit();
};
fillTransaction.once('submitted', fCallback);
fillTransaction.submit();
}

function sequenceLoaded(err, sequence) {
if (typeof sequence !== 'number') {
return callback(new Error('Failed to fetch account transaction sequence'));
log.info('fill sequence: failed to fetch account transaction sequence');
return callback();
}

var sequenceDif = tx.tx_json.Sequence - sequence;
var sequenceDiff = tx.tx_json.Sequence - sequence;
var submitted = 0;

;(function nextFill(sequence) {
if (sequence >= tx.tx_json.Sequence) {
return;
}

submitFill(sequence, function() {
if (++submitted === sequenceDif) {
async.whilst(
function() {
return submitted < sequenceDiff;
},
function(asyncCallback) {
submitFill(sequence, function(res) {
++submitted;
if (res.engine_result === 'tesSUCCESS') {
self.emit('sequence_filled', err);
}
asyncCallback();
});
},
function() {
if (callback) {
callback();
} else {
nextFill(sequence + 1);
}
});
})(sequence);
};
}
);
}

this._loadSequence(sequenceLoaded);
};
Expand All @@ -318,7 +330,7 @@ TransactionManager.prototype._fillSequence = function(tx, callback) {

TransactionManager.prototype._loadSequence = function(callback) {
var self = this;
var callback = (typeof callback === 'function') ? callback : function(){};
callback = (typeof callback === 'function') ? callback : function() {};

function sequenceLoaded(err, sequence) {
if (err || typeof sequence !== 'number') {
Expand All @@ -331,7 +343,7 @@ TransactionManager.prototype._loadSequence = function(callback) {
self._nextSequence = sequence;
self.emit('sequence_loaded', sequence);
callback(err, sequence);
};
}

this._account.getNextSequence(sequenceLoaded);
};
Expand All @@ -346,10 +358,11 @@ TransactionManager.prototype._loadSequence = function(callback) {

TransactionManager.prototype._handleReconnect = function(callback) {
var self = this;
var callback = (typeof callback === 'function') ? callback : function(){};
callback = (typeof callback === 'function') ? callback : function() {};

if (!this._pending.length()) {
return callback();
callback();
return;
}

function handleTransactions(err, transactions) {
Expand All @@ -372,7 +385,7 @@ TransactionManager.prototype._handleReconnect = function(callback) {
// Resubmit pending transactions after sequence is loaded
self._resubmit();
});
};
}

var options = {
account: this._accountID,
Expand Down Expand Up @@ -410,7 +423,7 @@ TransactionManager.prototype._waitLedgers = function(ledgers, callback) {
self._remote.removeListener('ledger_closed', ledgerClosed);
callback();
}
};
}

this._remote.on('ledger_closed', ledgerClosed);
};
Expand All @@ -426,8 +439,16 @@ TransactionManager.prototype._waitLedgers = function(ledgers, callback) {

TransactionManager.prototype._resubmit = function(ledgers, pending) {
var self = this;
var ledgers = ledgers || 0;
var pending = pending ? [ pending ] : this._pending;

if (ledgers && typeof ledgers !== 'number') {
pending = ledgers;
ledgers = 0;
}

ledgers = ledgers || 0;
pending = pending instanceof Transaction
? [pending]
: this.getPending().getQueue();

function resubmitTransaction(transaction, next) {
if (!transaction || transaction.finalized) {
Expand All @@ -449,7 +470,7 @@ TransactionManager.prototype._resubmit = function(ledgers, pending) {
}

while (self._pending.hasSequence(transaction.tx_json.Sequence)) {
//Sequence number has been consumed by another transaction
// Sequence number has been consumed by another transaction
transaction.tx_json.Sequence += 1;

if (self._remote.trace) {
Expand All @@ -467,7 +488,7 @@ TransactionManager.prototype._resubmit = function(ledgers, pending) {
});

self._request(transaction);
};
}

this._waitLedgers(ledgers, function() {
async.eachSeries(pending, resubmitTransaction);
Expand Down Expand Up @@ -528,8 +549,8 @@ TransactionManager.prototype._request = function(tx) {
}

if (tx.attempts > 0 && !remote.local_signing) {
var message = 'Automatic resubmission requires local signing';
tx.emit('error', new RippleError('tejLocalSigningRequired', message));
var errMessage = 'Automatic resubmission requires local signing';
tx.emit('error', new RippleError('tejLocalSigningRequired', errMessage));
return;
}

Expand All @@ -542,22 +563,22 @@ TransactionManager.prototype._request = function(tx) {
// Transaction may succeed after Sequence is updated
self._resubmit(1, tx);
}
};
}

function transactionRetry(message) {
function transactionRetry() {
// XXX This may no longer be necessary. Instead, update sequence numbers
// after a transaction fails definitively
self._fillSequence(tx, function() {
self._resubmit(1, tx);
});
};
}

function transactionFailedLocal(message) {
if (message.engine_result === 'telINSUF_FEE_P') {
// Transaction may succeed after Fee is updated
self._resubmit(1, tx);
}
};
}

function submissionError(error) {
// Either a tem-class error or generic server error such as tooBusy. This
Expand All @@ -568,7 +589,7 @@ TransactionManager.prototype._request = function(tx) {
self._nextSequence--;
tx.emit('error', error);
}
};
}

function submitted(message) {
if (tx.finalized) {
Expand Down Expand Up @@ -611,7 +632,7 @@ TransactionManager.prototype._request = function(tx) {
// tem
submissionError(message);
}
};
}

function requestTimeout() {
// ND: What if the response is just slow and we get a response that
Expand All @@ -630,7 +651,7 @@ TransactionManager.prototype._request = function(tx) {
}
self._resubmit(1, tx);
}
};
}

tx.submitIndex = this._remote._ledger_current_index;

Expand Down Expand Up @@ -661,10 +682,7 @@ TransactionManager.prototype._request = function(tx) {

tx.emit('postsubmit');

//XXX
submitRequest.timeout(self._submissionTimeout, requestTimeout);

return submitRequest;
};

/**
Expand All @@ -675,7 +693,6 @@ TransactionManager.prototype._request = function(tx) {

TransactionManager.prototype.submit = function(tx) {
var self = this;
var remote = this._remote;

if (typeof this._nextSequence !== 'number') {
// If sequence number is not yet known, defer until it is.
Expand Down

0 comments on commit 76bad86

Please sign in to comment.