Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected void releaseResources() throws IOException {
channel.queueBind(DLQ, DLX, "test");

//measure round-trip latency
QueueMessageConsumer c = new QueueMessageConsumer(channel);
AccumulatingMessageConsumer c = new AccumulatingMessageConsumer(channel);
String cTag = channel.basicConsume(TEST_QUEUE_NAME, true, c);
long start = System.currentTimeMillis();
publish(null, "test");
Expand Down Expand Up @@ -536,6 +536,9 @@ public void process(GetResponse getResponse) {
assertNotNull(headers);
ArrayList<Object> death = (ArrayList<Object>) headers.get("x-death");
assertNotNull(death);
assertNotNull(headers.get("x-first-death-queue"));
assertNotNull(headers.get("x-first-death-reason"));
assertNotNull(headers.get("x-first-death-exchange"));
assertEquals(1, death.size());
assertDeathReason(death, 0, TEST_QUEUE_NAME, reason,
"amq.direct",
Expand All @@ -562,7 +565,7 @@ private void sleep(long millis) {

/* check that each message arrives within epsilon of the
publication time + TTL + latency */
private void checkPromptArrival(QueueMessageConsumer c,
private void checkPromptArrival(AccumulatingMessageConsumer c,
int count, long latency) throws Exception {
long epsilon = TTL / 10;
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -697,11 +700,11 @@ private static String randomQueueName() {
return DeadLetterExchange.class.getSimpleName() + "-" + UUID.randomUUID().toString();
}

class QueueMessageConsumer extends DefaultConsumer {
class AccumulatingMessageConsumer extends DefaultConsumer {

BlockingQueue<byte[]> messages = new LinkedBlockingQueue<byte[]>();

public QueueMessageConsumer(Channel channel) {
public AccumulatingMessageConsumer(Channel channel) {
super(channel);
}

Expand Down