Skip to content

Commit

Permalink
fix(eh-integration): refactor tests to fix after types refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Feb 16, 2016
1 parent 6376e9d commit 1dc2bd1
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 187 deletions.
10 changes: 4 additions & 6 deletions lib/adapters/translate_encoder.js
Expand Up @@ -73,13 +73,11 @@ var encoder = processor({
var k = encoder(vals[idx]);
var v = encoder(vals[idx + 1]);
result[k] = v;
// if (k instanceof AMQPSymbol) {
// isFields = true;
// result[k.contents] = v;
// } else {
// result[k] = v;
// }
if (k instanceof ForcedType && k.typeName === 'symbol') {
isFields = true;
}
}

return isFields ? new AMQPFields(result) : result;
}
},
Expand Down
8 changes: 4 additions & 4 deletions lib/amqp_client.js
Expand Up @@ -59,7 +59,7 @@ function AMQPClient(policy) {
this.policy = u.deepMerge(this._originalPolicy);
this._connection = null;
this._session = null;

this._reconnect = null;
if (!!this.policy.reconnect) {
this._timeouts = u.generateTimeouts(this.policy.reconnect);
Expand Down Expand Up @@ -224,9 +224,9 @@ AMQPClient.prototype.createReceiver = function(address, policyOverrides) {
'apache.org:legacy-amqp-topic-binding:string' :
'apache.org:legacy-amqp-direct-binding:string';

linkPolicy.attach.source.filter = translator([
'described', ['symbol', filterSymbol], ['string', address.subject]
]);
linkPolicy.attach.source.filter = {};
linkPolicy.attach.source.filter[filterSymbol] =
translator(['described', ['symbol', filterSymbol], ['string', address.subject]]);
}

var self = this;
Expand Down
6 changes: 3 additions & 3 deletions lib/codec.js
Expand Up @@ -209,10 +209,10 @@ Codec.prototype._encodeObject = function(value, buffer) {
// NOTE: Described type constructors are either ulongs or a symbol. Here
// we are checking if the passed in value is a number and forcing a
// ulong encoding. For everything else we attempt to encode as a symbol.
var descriptor = (value.descriptor instanceof ForcedType) ? value.descriptor.value : value.descriptor;
var descriptorEncoder =
(typeof value.descriptor === 'number') ? types.ulong : types.symbol;
descriptorEncoder.encode(value.descriptor, buffer);

(typeof descriptor === 'number') ? types.ulong : types.symbol;
descriptorEncoder.encode(descriptor, buffer);
this.encode(value.getValue() || [], buffer);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/frames.js
Expand Up @@ -200,7 +200,7 @@ frames.TransferFrame = defineFrame(FrameType.AMQP, {
{ name: 'handle', type: 'uint', mandatory: true },
{ name: 'deliveryId', type: 'uint' },
{ name: 'deliveryTag', type: 'binary' },
{ name: 'messageFormat', type: 'uint' },
{ name: 'messageFormat', type: 'uint', default: 0 },
{ name: 'settled', type: 'boolean' },
{ name: 'more', type: 'boolean', default: false },
{ name: 'rcvSettleMode', type: 'ubyte', default: 0 },
Expand Down
2 changes: 1 addition & 1 deletion test/integration/qpid/client.test.js
Expand Up @@ -125,7 +125,7 @@ describe('Client', function() {
{
section: 'messageAnnotations',
data: {
annotations: {
messageAnnotations: {
"x-foo" : 5,
"x-bar" : "wibble"
}
Expand Down
275 changes: 137 additions & 138 deletions test/integration/servicebus/eventhubs/client.test.js
Expand Up @@ -8,154 +8,153 @@ var AMQPClient = require('../../../../lib/index.js').Client,
uuid = require('uuid'),
_ = require('lodash');

function createPartitionReceivers(client, count, prefix, options) {
return Promise.map(_.range(count), function(partition) {
return client.createReceiver(prefix + partition, options);
});
}

var test = {};
describe('ServiceBus', function() {
describe('EventHubs', function () {
describe('EventHubs', function () {
beforeEach(function () {
if (!!test.client) test.client = undefined;
test.client = new AMQPClient(Policy.ServiceBusQueue);
});

beforeEach(function () {
if (!!test.client) test.client = undefined;
test.client = new AMQPClient(Policy.ServiceBusQueue);
});
afterEach(function () {
return test.client.disconnect()
.then(function() { test.client = undefined; });
});

afterEach(function () {
return test.client.disconnect().then(function () {
test.client = undefined;
});
});

it('should connect, send, and receive a message', function (done) {
expect(config.senderLink, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;
var msgVal = uuid.v4();
test.client.connect(config.address)
.then(function() {
return Promise.all(
_.range(config.partitionCount).map(function(partition) { return test.client.createReceiver(config.receiverLinkPrefix + partition); }).
concat(test.client.createSender(config.senderLink))
);
})
.then(function (links) {
var sender = links.pop();
_.each(links, function (receiver) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal) {
done();
}
});
});
it('should connect, send, and receive a message', function (done) {
expect(config.senderLink, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;

return sender.send({"DataString": "From Node v2", "DataValue": msgVal});
var msgVal = uuid.v4();
return test.client.connect(config.address)
.then(function() {
return createPartitionReceivers(test.client, config.partitionCount, config.receiverLinkPrefix);
})
.map(function(receiver) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal) {
done();
}
});
});

it('should create receiver with date-based x-header', function (done) {
expect(config.senderLink, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;
var msgVal = uuid.v4();
var now = Date.now() - (1000 * 5); // 5 seconds ago
var filterOptions = {
attach: { source: { filter: {
'apache.org:selector-filter:string': translator(
['described', ['symbol', 'apache.org:selector-filter:string'], ['string', "amqp.annotation.x-opt-enqueuedtimeutc > " + now]])
} } }
};
test.client.connect(config.address)
.then(function() {
return Promise.all(
_.range(config.partitionCount).map(function(partition) { return test.client.createReceiver(config.receiverLinkPrefix + partition, filterOptions); }).
concat(test.client.createSender(config.senderLink))
);
})
.then(function (links) {
var sender = links.pop();
_.each(links, function (receiver) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal) {
done();
}
});
});
})
.then(function() { return test.client.createSender(config.senderLink); })
.then(function(sender) { return sender.send({ DataString: 'From Node v2', DataValue: msgVal }); });
});

it('should create receiver with date-based x-header', function (done) {
expect(config.senderLink, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;

return sender.send({"DataString": "From Node v2", "DataValue": msgVal});
var msgVal = uuid.v4();
var now = Date.now() - (1000 * 5); // 5 seconds ago

var filterOptions = {
attach: { source: { filter: {
'apache.org:selector-filter:string': translator(
['described', ['symbol', 'apache.org:selector-filter:string'], ['string', 'amqp.annotation.x-opt-enqueuedtimeutc > ' + now]])
} } }
};

return test.client.connect(config.address)
.then(function() {
return createPartitionReceivers(test.client, config.partitionCount, config.receiverLinkPrefix, filterOptions);
})
.map(function(receiver) {
receiver.on('message', function(message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal) {
done();
}
});
});

it('should send to a specific partition', function (done) {
expect(config.partitionSenderLinkPrefix, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;
var msgVal = uuid.v4();
var partition = '1';
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.receiverLinkPrefix + partition),
test.client.createSender(config.partitionSenderLinkPrefix + partition)
]);
})
.spread(function (receiver, sender) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal) {
done();
}
});

return sender.send({"DataString": "From Node v2", "DataValue": msgVal});
})
.then(function() { return test.client.createSender(config.senderLink); })
.then(function(sender) { return sender.send({ DataString: 'From Node v2', DataValue: msgVal }); });
});

it('should send to a specific partition', function (done) {
expect(config.partitionSenderLinkPrefix, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;

var msgVal = uuid.v4();
var partition = '1';
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.receiverLinkPrefix + partition),
test.client.createSender(config.partitionSenderLinkPrefix + partition)
]);
})
.spread(function (receiver, sender) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal) {
done();
}
});
});

it('should only receive messages after last offset when using offset-based x-header', function (done) {
expect(config.partitionSenderLinkPrefix, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;
var msgVal1 = uuid.v4();
var msgVal2 = uuid.v4();
var partition = '1';
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.receiverLinkPrefix + partition),
test.client.createSender(config.partitionSenderLinkPrefix + partition)
]);
})
.spread(function (receiver, sender) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal1) {
var offset = message.annotations.value['x-opt-offset'];
var timestamp = message.annotations.value['x-opt-enqueued-time'].getTime();
receiver.detach().then(function() {
var filterOptions = {
attach: { source: { filter: {
'apache.org:selector-filter:string': translator(
['described', ['symbol', 'apache.org:selector-filter:string'], ['string', "amqp.annotation.x-opt-offset > '" + offset + "'"]])
} } }
};
test.client.createReceiver(config.receiverLinkPrefix + partition, filterOptions).then(function (receiver2) {
receiver2.on('message', function(msg) {
expect(msg).to.exist;
expect(msg.body).to.exist;
// Ignore messages that aren't from this test run.
if (!!msg.body.DataValue && (msg.body.DataValue === msgVal1 || msg.body.DataValue === msgVal2)) {
expect(msg.annotations.value['x-opt-enqueued-time'].getTime()).to.be.above(timestamp);
expect(msg.body.DataValue).to.not.eql(msgVal1);
expect(msg.body.DataValue).to.eql(msgVal2);
done();
}
});
return sender.send({"DataString": "From Node v2", "DataValue": msgVal2});

return sender.send({ DataString: 'From Node v2', DataValue: msgVal });
});
});

it('should only receive messages after last offset when using offset-based x-header', function (done) {
expect(config.partitionSenderLinkPrefix, 'Required env vars not found in ' + Object.keys(process.env)).to.exist;

var msgVal1 = uuid.v4();
var msgVal2 = uuid.v4();
var partition = '1';
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.receiverLinkPrefix + partition),
test.client.createSender(config.partitionSenderLinkPrefix + partition)
]);
})
.spread(function (receiver, sender) {
receiver.on('message', function (message) {
expect(message).to.exist;
expect(message.body).to.exist;
// Ignore messages that aren't from us.
if (!!message.body.DataValue && message.body.DataValue === msgVal1) {
var offset = message.messageAnnotations['x-opt-offset'];
var timestamp = message.messageAnnotations['x-opt-enqueued-time'].getTime();
receiver.detach().then(function() {
var filterOptions = {
attach: { source: { filter: {
'apache.org:selector-filter:string': translator(
['described', ['symbol', 'apache.org:selector-filter:string'], ['string', "amqp.annotation.x-opt-offset > '" + offset + "'"]])
} } }
};
test.client.createReceiver(config.receiverLinkPrefix + partition, filterOptions).then(function (receiver2) {
receiver2.on('message', function(msg) {
expect(msg).to.exist;
expect(msg.body).to.exist;
// Ignore messages that aren't from this test run.
if (!!msg.body.DataValue && (msg.body.DataValue === msgVal1 || msg.body.DataValue === msgVal2)) {
expect(msg.messageAnnotations['x-opt-enqueued-time'].getTime()).to.be.above(timestamp);
expect(msg.body.DataValue).to.not.eql(msgVal1);
expect(msg.body.DataValue).to.eql(msgVal2);
done();
}
});
return sender.send({ DataString: 'From Node v2', DataValue: msgVal2 });
});
}
});

return sender.send({"DataString": "From Node v2", "DataValue": msgVal1});
});
}
});
});

return sender.send({ DataString: 'From Node v2', DataValue: msgVal1 });
});
});
});

}); // EventHubs
}); // ServiceBus

0 comments on commit 1dc2bd1

Please sign in to comment.