Skip to content

Commit

Permalink
ouch all tests passed.
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed May 6, 2020
1 parent ef09868 commit c748340
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 91 deletions.
Expand Up @@ -79,7 +79,7 @@ public void verifyDelayedQueueStatus() throws TimedOutException {
}

public void verifyMetricStatus() throws TimedOutException {
enqueue(emailDlq, i -> Email.newInstance(), 10);
enqueue(emailDeadLetterQueue, i -> Email.newInstance(), 10);

Job job = Job.newInstance();
failureManager.createFailureDetail(job.getId(), -1, 0);
Expand All @@ -90,7 +90,7 @@ public void verifyMetricStatus() throws TimedOutException {
meterRegistry
.get("dead.letter.queue.size")
.tags("rqueue", "test")
.tags("queue", emailQueueName)
.tags("queue", emailQueue)
.gauge()
.value(),
0);
Expand All @@ -107,7 +107,7 @@ public void verifyMetricStatus() throws TimedOutException {
}

public void verifyCountStatus() throws TimedOutException {
messageSender.put(emailQueueName, Email.newInstance());
messageSender.put(emailQueue, Email.newInstance());
Job job = Job.newInstance();
failureManager.createFailureDetail(job.getId(), 1, 1);
messageSender.put(jobQueue, job);
Expand All @@ -122,25 +122,25 @@ public void verifyCountStatus() throws TimedOutException {
>= 1,
30000,
"job process",
() -> printQueueStats(newArrayList(jobQueue, emailQueueName, notificationQueue)));
() -> printQueueStats(newArrayList(jobQueue, emailQueue, notificationQueue)));
waitFor(
() ->
meterRegistry
.get("execution.count")
.tags("rqueue", "test")
.tags("queue", emailQueueName)
.tags("queue", emailQueue)
.counter()
.count()
== 1,
"message process",
() -> printQueueStats(newArrayList(jobQueue, emailQueueName, notificationQueue)));
() -> printQueueStats(newArrayList(jobQueue, emailQueue, notificationQueue)));

assertEquals(
0,
meterRegistry
.get("failure.count")
.tags("rqueue", "test")
.tags("queue", emailQueueName)
.tags("queue", emailQueue)
.counter()
.count(),
0);
Expand Down
Expand Up @@ -50,7 +50,7 @@ public class SpringTestBase {
protected String emailQueue;

@Value("${job.queue.name}")
protected String jobQueueName;
protected String jobQueue;

@Value("${email.dead.letter.queue.name}")
protected String emailDeadLetterQueue;
Expand All @@ -64,15 +64,6 @@ public class SpringTestBase {
@Value("${notification.queue.retry.count}")
protected int notificationRetryCount;

@Value("${email.dead.letter.queue.name}")
protected String emailDlq;

@Value("${email.queue.name}")
protected String emailQueueName;

@Value("${job.queue.name}")
protected String jobQueue;

protected void enqueue(Object message, String queueName) {
RqueueMessage rqueueMessage = RqueueMessageFactory.buildMessage(message, queueName, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
Expand Down Expand Up @@ -101,7 +92,7 @@ protected void enqueueIn(String zsetName, Factory factory, Delay delay, int n) {
long score = delay.getDelay(i);
RqueueMessage rqueueMessage =
RqueueMessageFactory.buildMessage(object, zsetName, null, score);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, score);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}
}

Expand Down
Expand Up @@ -43,7 +43,7 @@ public class ApplicationTest extends SpringTestBase {
public void afterNRetryTaskIsDeletedFromProcessingQueue() throws TimedOutException {
Job job = Job.newInstance();
failureManager.createFailureDetail(job.getId(), 3, 10);
messageSender.put(jobQueueName, job);
messageSender.put(jobQueue, job);
waitFor(
() -> {
Job jobInDb = consumedMessageService.getMessage(job.getId(), Job.class);
Expand All @@ -52,7 +52,7 @@ public void afterNRetryTaskIsDeletedFromProcessingQueue() throws TimedOutExcepti
"job to be executed");
waitFor(
() -> {
List<Object> messages = messageSender.getAllMessages(jobQueueName);
List<Object> messages = messageSender.getAllMessages(jobQueue);
return !messages.contains(job);
},
"message should be deleted from internal storage");
Expand Down
Expand Up @@ -43,19 +43,27 @@
@SpringBootTest
@Slf4j
public class MessageChannelTest extends SpringTestBase {
/**
* This test verified whether any pending message in the delayed queue are moved or not Whenever a
* delayed message is pushed then it's checked whether there're any pending messages on delay
* queue. if expired delayed messages are found on the head then a message is published on delayed
* channel.
*/
@Test
public void publishMessageIsTriggeredOnMessageAddition() throws TimedOutException {
int messageCount = 200;
enqueueIn(rqueueConfig.getQueueName(emailQueue), i -> Email.newInstance(), i -> -1000L, 10);
String delayedQueueName = rqueueConfig.getDelayedQueueName(emailQueue);
enqueueIn(delayedQueueName, i -> Email.newInstance(), i -> -1000L, messageCount);
Email email = Email.newInstance();
log.info("adding new message {}", email);
messageSender.put(emailQueue, email, 1000L);
waitFor(
() -> stringRqueueRedisTemplate.getZsetSize(emailQueue) <= 1,
() -> stringRqueueRedisTemplate.getZsetSize(delayedQueueName) <= 1,
"one or zero messages in zset");
assertTrue(
"Messages are correctly moved",
stringRqueueRedisTemplate.getListSize(emailQueue) >= messageCount);
stringRqueueRedisTemplate.getListSize(rqueueConfig.getQueueName(emailQueue))
>= messageCount);
assertEquals(messageCount + 1, messageSender.getAllMessages(emailQueue).size());
}
}
Expand Up @@ -71,15 +71,15 @@ public void emailIsRetried() throws TimedOutException {
public void jobIsRetriedAndMessageIsInProcessingQueue() throws TimedOutException {
Job job = Job.newInstance();
failureManager.createFailureDetail(job.getId(), -1, 0);
messageSender.put(jobQueueName, job);
messageSender.put(jobQueue, job);
waitFor(() -> failureManager.getFailureCount(job.getId()) >= 3, "Job to be retried");
waitFor(
() -> {
List<Object> messages = messageSender.getAllMessages(jobQueueName);
List<Object> messages = messageSender.getAllMessages(jobQueue);
return messages.contains(job);
},
"message should be present in internal storage");
// more then one copy should not be present
assertEquals(1, messageSender.getAllMessages(jobQueueName).size());
assertEquals(1, messageSender.getAllMessages(jobQueue).size());
}
}
Expand Up @@ -57,8 +57,7 @@ public class ProcessingMessageSchedulerTest extends SpringTestBase {
new RunTestUntilFail(
log,
() -> {
for (Entry<String, List<RqueueMessage>> entry :
getMessageMap(jobQueueName).entrySet()) {
for (Entry<String, List<RqueueMessage>> entry : getMessageMap(jobQueue).entrySet()) {
log.error("FAILING Queue {}", entry.getKey());
for (RqueueMessage message : entry.getValue()) {
log.error("FAILING Queue {} Msg {}", entry.getKey(), message);
Expand All @@ -70,7 +69,7 @@ public class ProcessingMessageSchedulerTest extends SpringTestBase {

@Test
public void publishMessageIsTriggeredOnMessageRemoval() throws TimedOutException {
String processingQueueName = jobQueueName;
String processingQueueName = jobQueue;
long currentTime = System.currentTimeMillis();
List<Job> jobs = new ArrayList<>();
List<String> ids = new ArrayList<>();
Expand All @@ -88,7 +87,7 @@ public void publishMessageIsTriggeredOnMessageRemoval() throws TimedOutException
}
TimeoutUtils.sleep(maxDelay);
waitFor(
() -> 0 == messageSender.getAllMessages(jobQueueName).size(),
() -> 0 == messageSender.getAllMessages(jobQueue).size(),
30 * Constants.ONE_MILLI,
"messages to be consumed");
waitFor(
Expand Down
Expand Up @@ -90,16 +90,15 @@ public void moveMessageZsetToList() {

@Test
public void moveMessageZsetToZset() {
String srcZset = jobQueueName + "src";
String tgtZset = jobQueueName + "tgt";
String srcZset = jobQueue + "src";
String tgtZset = jobQueue + "tgt";
enqueueIn(srcZset, i -> Job.newInstance(), i -> 5000L, 10);
rqueueMessageTemplate.moveMessageZsetToZset(srcZset, tgtZset, 10, 0, false);
Assert.assertEquals(10, stringRqueueRedisTemplate.getZsetSize(tgtZset).intValue());
Assert.assertEquals(0, stringRqueueRedisTemplate.getZsetSize(srcZset).intValue());

rqueueMessageTemplate.moveMessageZsetToZset(tgtZset, "_rq::xx" + jobQueueName, 10, 0, false);
rqueueMessageTemplate.moveMessageZsetToZset(tgtZset, "_rq::xx" + jobQueue, 10, 0, false);
Assert.assertEquals(0, stringRqueueRedisTemplate.getZsetSize(tgtZset).intValue());
Assert.assertEquals(
10, stringRqueueRedisTemplate.getZsetSize("_rq::xx" + jobQueueName).intValue());
Assert.assertEquals(10, stringRqueueRedisTemplate.getZsetSize("_rq::xx" + jobQueue).intValue());
}
}

0 comments on commit c748340

Please sign in to comment.