Skip to content

Commit

Permalink
Fixes #237
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed May 27, 2024
1 parent aa8fa13 commit f96e826
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Change Log

## 20.1.0
- Ignore and remove immediateNack header based on the xDeath header. See https://github.com/onebeyond/rascal/issues/237

## 20.0.0
- Replaced superagent with native node http client as per https://github.com/onebeyond/rascal/issues/234

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1534,7 +1534,7 @@ As mentioned previously, dead lettering invalid messages is a good strategy with
ackOrNack(err, { strategy: 'republish', immediateNack: true });
```
If you ever want to resend the message to the same queue you will have to remove the `properties.headers.rascal.<queue>.immediateNack` header first.
Prior to Rascal v20.1.0, if you wanted to resend the message to the original queue you had to remove the `properties.headers.rascal.<queue>.immediateNack` header first. From v20.1.0, Rascal will ignore and remove the immediateNack header if the message's xDeath header indicates that the message was dead lettered after it was republished with immediateNack.
##### Forward
Expand Down
6 changes: 5 additions & 1 deletion lib/amqp/SubscriberError.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ module.exports = function SubscriptionRecovery(broker, vhost) {
_.set(publishOptions, 'headers.rascal.error.code', err.code);
_.set(publishOptions, 'headers.rascal.restoreRoutingHeaders', _.has(strategyConfig, 'restoreRoutingHeaders') ? strategyConfig.restoreRoutingHeaders : true);

if (strategyConfig.immediateNack) _.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue, 'immediateNack'], true);
if (strategyConfig.immediateNack) {
const xDeathRecords = message.properties.headers['x-death'] || [];
const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0 };
_.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue], { immediateNack: true, xDeath });
}

const ackMessage = () => {
session._ack(message, (err) => {
Expand Down
12 changes: 11 additions & 1 deletion lib/amqp/Subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,17 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
}

function immediateNack(message) {
if (_.get(message, ['properties', 'headers', 'rascal', 'recovery', message.properties.headers.rascal.originalQueue, 'immediateNack'])) return true;
const originalQueue = message.properties.headers.rascal.originalQueue;
const xDeathRecords = message.properties.headers['x-death'] || [];
const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0 };
const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], { count: 0 });
const hasImmediateNackHeader = _.has(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
if (!hasImmediateNackHeader) return false;
debug('Message %s has been marked for immediate nack', message.properties.messageId);
if (currentXDeath.count === previousXDeath.count) return true;
debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId);
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']);
return false;
}

Expand Down
176 changes: 176 additions & 0 deletions test/subscriptions.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,182 @@ describe('Subscriptions', () => {
);
});

it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue', (test, done) => {
createBroker(
{
vhosts: {
'/': {
namespace,
exchanges: {
e1: {
assert: true,
},
e2: {
assert: true,
},
},
queues: {
q1: {
assert: true,
options: {
arguments: {
'x-dead-letter-exchange': 'e2',
},
},
},
q2: {
assert: true,
},
},
bindings: {
b1: {
source: 'e1',
destination: 'q1',
},
b2: {
source: 'e2',
destination: 'q2',
},
},
},
},
publications: _.pick(publications, 'p1'),
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
},
s2: {
vhost: '/',
queue: 'q2',
},
},
},
(err, broker) => {
assert.ifError(err);
broker.publish('p1', 'test message', (err) => {
assert.ifError(err);

broker.subscribe('s1', (err, subscription) => {
assert.ifError(err);
let count = 0;
subscription.on('message', (message, content, ackOrNack) => {
count++;
if (count === 1) {
assert.ok(message);
ackOrNack(new Error('immediate nack'), {
strategy: 'republish',
immediateNack: true,
});
} else {
assert.strictEqual(count, 2);
assert.ok(message);
ackOrNack();
done();
}
});
});

broker.subscribe('s2', (err, subscription) => {
assert.ifError(err);
subscription.on('message', (message, content, ackOrNack) => {
ackOrNack();
broker.forward('p1', message, () => {});
});
});
});
},
);
});

it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue repeatedly', (test, done) => {
createBroker(
{
vhosts: {
'/': {
namespace,
exchanges: {
e1: {
assert: true,
},
e2: {
assert: true,
},
},
queues: {
q1: {
assert: true,
options: {
arguments: {
'x-dead-letter-exchange': 'e2',
},
},
},
q2: {
assert: true,
},
},
bindings: {
b1: {
source: 'e1',
destination: 'q1',
},
b2: {
source: 'e2',
destination: 'q2',
},
},
},
},
publications: _.pick(publications, 'p1'),
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
},
s2: {
vhost: '/',
queue: 'q2',
},
},
},
(err, broker) => {
assert.ifError(err);
broker.publish('p1', 'test message', (err) => {
assert.ifError(err);

broker.subscribe('s1', (err, subscription) => {
assert.ifError(err);
let count = 0;
subscription.on('message', (message, content, ackOrNack) => {
count++;
if (count <= 2) {
assert.ok(message);
ackOrNack(new Error('immediate nack'), {
strategy: 'republish',
immediateNack: true,
});
} else {
assert.strictEqual(count, 3);
assert.ok(message);
ackOrNack();
done();
}
});
});

broker.subscribe('s2', (err, subscription) => {
assert.ifError(err);
subscription.on('message', (message, content, ackOrNack) => {
ackOrNack();
broker.forward('p1', message, () => {});
});
});
});
},
);
});

it('should forward messages to publication when requested', (test, done) => {
createBroker(
{
Expand Down
160 changes: 160 additions & 0 deletions test/subscriptionsAsPromised.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,166 @@ describe(
});
});

it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue', (test, done) => {
createBroker({
vhosts: {
'/': {
namespace,
exchanges: {
e1: {
assert: true,
},
e2: {
assert: true,
},
},
queues: {
q1: {
assert: true,
options: {
arguments: {
'x-dead-letter-exchange': 'e2',
},
},
},
q2: {
assert: true,
},
},
bindings: {
b1: {
source: 'e1',
destination: 'q1',
},
b2: {
source: 'e2',
destination: 'q2',
},
},
},
},
publications: _.pick(publications, 'p1'),
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
},
s2: {
vhost: '/',
queue: 'q2',
},
},
}).then((broker) => {
broker.publish('p1', 'test message').then(() => {
broker.subscribe('s1').then((subscription) => {
let count = 0;
subscription.on('message', (message, content, ackOrNack) => {
count++;
if (count === 1) {
assert.ok(message);
ackOrNack(new Error('immediate nack'), {
strategy: 'republish',
immediateNack: true,
});
} else {
assert.strictEqual(count, 2);
assert.ok(message);
ackOrNack();
done();
}
});
});

broker.subscribe('s2').then((subscription) => {
subscription.on('message', (message, content, ackOrNack) => {
ackOrNack();
broker.forward('p1', message, () => {});
});
});
});
});
});

it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue repeatedly', (test, done) => {
createBroker({
vhosts: {
'/': {
namespace,
exchanges: {
e1: {
assert: true,
},
e2: {
assert: true,
},
},
queues: {
q1: {
assert: true,
options: {
arguments: {
'x-dead-letter-exchange': 'e2',
},
},
},
q2: {
assert: true,
},
},
bindings: {
b1: {
source: 'e1',
destination: 'q1',
},
b2: {
source: 'e2',
destination: 'q2',
},
},
},
},
publications: _.pick(publications, 'p1'),
subscriptions: {
s1: {
vhost: '/',
queue: 'q1',
},
s2: {
vhost: '/',
queue: 'q2',
},
},
}).then((broker) => {
broker.publish('p1', 'test message').then(() => {
broker.subscribe('s1').then((subscription) => {
let count = 0;
subscription.on('message', (message, content, ackOrNack) => {
count++;
if (count <= 2) {
assert.ok(message);
ackOrNack(new Error('immediate nack'), {
strategy: 'republish',
immediateNack: true,
});
} else {
assert.strictEqual(count, 3);
assert.ok(message);
ackOrNack();
done();
}
});
});

broker.subscribe('s2').then((subscription) => {
subscription.on('message', (message, content, ackOrNack) => {
ackOrNack();
broker.forward('p1', message, () => {});
});
});
});
});
});

it('should forward messages to publication when requested', (test, done) => {
createBroker({
vhosts,
Expand Down

0 comments on commit f96e826

Please sign in to comment.