Skip to content

Commit

Permalink
feat(receiver-stream): add prototype for ReceiverStream
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Mar 1, 2016
1 parent 7b2e46e commit d52b8f4
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
13 changes: 13 additions & 0 deletions lib/amqp_client.js
Expand Up @@ -8,6 +8,7 @@ var EventEmitter = require('events').EventEmitter,
Connection = require('./connection'),
Sasl = require('./sasl'),
Session = require('./session'),
ReceiverStream = require('./streams/receiver_stream'),

errors = require('./errors'),

Expand Down Expand Up @@ -208,6 +209,17 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
address = u.parseLinkAddress(address || this._defaultQueue, this.policy);
policyOverrides = policyOverrides || {};

// check if user wants a stream
var isStream = false;
if (policyOverrides && !!policyOverrides.stream) {
isStream = true;
delete policyOverrides.stream;

// the stream will handle flow
policyOverrides.credit = function() {}; // noop
policyOverrides.creditQuantum = 0;
}

var linkName = u.linkName(address.name, policyOverrides),
linkPolicy = u.deepMerge({
attach: {
Expand All @@ -234,6 +246,7 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
var attach = function() {
var attachPromise = function(_err, _link) {
if (!!_err) return reject(_err);
if (isStream) return resolve(new ReceiverStream(_link));
return resolve(_link);
};

Expand Down
37 changes: 37 additions & 0 deletions lib/streams/receiver_stream.js
@@ -0,0 +1,37 @@
'use strict';
var Readable = require('stream').Readable,
util = require('util');

function ReceiverStream(link, options) {
Readable.call(this, { objectMode: true });
this._link = link;
this._increaing = false;

link.on('message', this._processMessage.bind(this));
link.on('detached', this._haltProcessing.bind(this));
link.on('errorReceived', this._haltProcessing.bind(this));
link.on('creditChange', this._creditChange.bind(this));
}
util.inherits(ReceiverStream, Readable);

ReceiverStream.prototype._read = function(size) {
if (this._link.linkCredit <= 0 && !this._increasing) {
return this._link.addCredits(size);
}
};

ReceiverStream.prototype._processMessage = function(message) {
if (!this.push(message)) {
return this._link.flow({ linkCredit: 0 });
}
};

ReceiverStream.prototype._haltProcessing = function() {
this.push(null);
};

ReceiverStream.prototype._creditChange = function(credits) {
this._increasing = false;
};

module.exports = ReceiverStream;
48 changes: 48 additions & 0 deletions test/integration/qpid/streams.test.js
@@ -0,0 +1,48 @@
'use strict';
var Promise = require('bluebird'),
AMQPClient = require('../../..').Client,
config = require('./config'),
expect = require('chai').expect;

var test = {};
describe('Streams', function() {

describe('ReadableStream', function() {
beforeEach(function() {
if (!!test.client) delete test.client;
test.client = new AMQPClient();
return test.client.connect(config.address);
});

afterEach(function() {
return test.client.disconnect()
.then(function() { delete test.client; });
});

it('should let you create a receiver link as a readable stream', function(done) {
var expected = Array.apply(null, new Array(100))
.map(function(a) { return Math.floor(Math.random() * 100); });

return Promise.all([
test.client.createReceiver(config.defaultLink, { stream: true }),
test.client.createSender(config.defaultLink)
])
.spread(function(stream, sender) {
var count = 0;
stream.on('data', function(data) {
expect(expected[count]).to.eql(data.body);
count++;
if (count === expected.length) done();
});

var promises = [];
for (var i = 0; i < expected.length; ++i)
promises.push(sender.send(expected[i]));
return Promise.all(promises);
});
});


}); // ReadableStream

});

0 comments on commit d52b8f4

Please sign in to comment.