Skip to content

Commit

Permalink
feat(transaction): transform replies of transactions
Browse files Browse the repository at this point in the history
BREAKING CHANGE:

  1. Reply transformers is supported inside transactions.
  2. `Pipeline#execBuffer()` is deprecated. Use `Pipeline#exec()` instead.

Closes #158.
  • Loading branch information
luin committed Mar 13, 2016
1 parent 4a119bf commit e0b1883
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 25 deletions.
30 changes: 19 additions & 11 deletions lib/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,33 @@ Command.prototype.stringifyArguments = function () {
Command.prototype._convertValue = function (resolve) {
var _this = this;
return function (value) {
// Convert buffer/buffer[] to string/string[]
var result = value;
var transformer;
try {
if (_this.replyEncoding) {
result = utils.convertBufferToString(value, _this.replyEncoding);
}
transformer = Command._transformer.reply[_this.name];
if (transformer) {
result = transformer(result);
}
resolve(result);
resolve(_this.transformReply(value));
} catch (err) {
_this.reject(err);
}
return _this.promise;
};
};

/**
* Convert buffer/buffer[] to string/string[],
* and apply reply transformer.
*
* @public
*/
Command.prototype.transformReply = function (result) {
if (this.replyEncoding) {
result = utils.convertBufferToString(result, this.replyEncoding);
}
var transformer = Command._transformer.reply[this.name];
if (transformer) {
result = transformer(result);
}

return result;
};

Command.FLAGS = {
// Commands that can be processed when Redis is loading data from disk
VALID_WHEN_LOADING: ['info', 'auth', 'select', 'subscribe', 'unsubscribe', 'psubscribe',
Expand Down
24 changes: 19 additions & 5 deletions lib/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var Command = require('./command');
var fbuffer = require('flexbuffer');
var Promise = require('bluebird');
var utils = require('./utils');
var util = require('util');
var commands = require('redis-commands');

function Pipeline(redis) {
Expand Down Expand Up @@ -35,13 +36,27 @@ function Pipeline(redis) {
_.assign(Pipeline.prototype, Commander.prototype);

Pipeline.prototype.fillResult = function (value, position) {
var i;
if (this._queue[position].name === 'exec' && Array.isArray(value[1])) {
var execLength = value[1].length;
for (i = 0; i < execLength; i++) {
if (value[1][i] instanceof Error) {
continue;
}
var cmd = this._queue[position - (execLength - i)];
try {
value[1][i] = cmd.transformReply(value[1][i]);
} catch (err) {
value[1][i] = err;
}
}
}
this._result[position] = value;

if (--this.replyPending) {
return;
}

var i;
if (this.isCluster) {
var retriable = true;
var commonError;
Expand Down Expand Up @@ -175,18 +190,17 @@ Pipeline.prototype.multi = function () {
};

var execBuffer = Pipeline.prototype.execBuffer;
Pipeline.prototype.execBuffer = function () {
Pipeline.prototype.execBuffer = util.deprecate(function () {
if (this._transactions > 0) {
this._transactions -= 1;
}
return execBuffer.apply(this, arguments);
};
}, 'Pipeline#execBuffer: Use Pipeline#exec instead');

var exec = Pipeline.prototype.exec;
Pipeline.prototype.exec = function (callback) {
if (this._transactions > 0) {
this._transactions -= 1;
return exec.apply(this, arguments);
return execBuffer.apply(this, arguments);
}
if (!this.nodeifiedPromise) {
this.nodeifiedPromise = true;
Expand Down
46 changes: 37 additions & 9 deletions test/functional/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('transaction', function () {

it('should handle compile-time errors correctly', function (done) {
var redis = new Redis();
redis.multi().set('foo').get('foo').exec(function (err, result) {
redis.multi().set('foo').get('foo').exec(function (err) {
expect(err).to.be.instanceof(Error);
expect(err.toString()).to.match(/Transaction discarded because of previous errors/);
done();
Expand Down Expand Up @@ -69,14 +69,42 @@ describe('transaction', function () {
});
});

it('should support execBuffer', function (done) {
var redis = new Redis();
redis.multi().set('foo', 'bar').get('foo').execBuffer(function (err, res) {
expect(res[0][1]).to.be.instanceof(Buffer);
expect(res[0][1].toString()).to.eql('OK');
expect(res[1][1]).to.be.instanceof(Buffer);
expect(res[1][1].toString()).to.eql('bar');
done();
describe('transformer', function () {
it('should trigger transformer', function (done) {
var redis = new Redis();
var pending = 2;
var data = { name: 'Bob', age: '17' };
redis.multi().hmset('foo', data).hgetall('foo', function (err, res) {
expect(res).to.eql('QUEUED');
if (!--pending) {
done();
}
}).hgetallBuffer('foo').exec(function (err, res) {
expect(res[0][1]).to.eql('OK');
expect(res[1][1]).to.eql(data);
expect(res[2][1]).to.eql({
name: new Buffer('Bob'),
age: new Buffer('17')
});
if (!--pending) {
done();
}
});
});

it('should trigger transformer inside pipeline', function (done) {
var redis = new Redis();
var data = { name: 'Bob', age: '17' };
redis.pipeline().hmset('foo', data).multi().typeBuffer('foo')
.hgetall('foo').exec().hgetall('foo').exec(function (err, res) {
expect(res[0][1]).to.eql('OK');
expect(res[1][1]).to.eql('OK');
expect(res[2][1]).to.eql(new Buffer('QUEUED'));
expect(res[3][1]).to.eql('QUEUED');
expect(res[4][1]).to.eql([new Buffer('hash'), data]);
expect(res[5][1]).to.eql(data);
done();
});
});
});

Expand Down

0 comments on commit e0b1883

Please sign in to comment.