Skip to content

Commit

Permalink
Renamed otp to sms
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed May 10, 2020
1 parent b4ee2db commit b61cdf6
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 66 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public class MessageListener {
log.info("Notification message: {}", notification);
}

// asynchronously send otp to the user
@RqueueListener(value = "otp", priority="critical=10,high=8,medium=4,low=1")
public void onMessage(Otp otp) {
log.info("Otp message: {}", otp);
// asynchronously send sms to the user
@RqueueListener(value = "sms", priority="critical=10,high=8,medium=4,low=1")
public void onMessage(Sms sms) {
log.info("Sms message: {}", sms);
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.Otp;
import com.github.sonus21.rqueue.test.dto.Sms;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
Expand Down Expand Up @@ -77,17 +77,17 @@ public void onMessage(Email email) throws Exception {
}

@RqueueListener(
value = "${otp.queue}",
active = "${otp.queue.active}",
priority = "${otp.queue.priority}",
priorityGroup = "${otp.queue.group}",
concurrency = "${otp.queue.concurrency}")
public void onMessage(Otp otp) throws Exception {
log.info("OtpListener: {}", otp);
if (failureManager.shouldFail(otp.getId())) {
throw new Exception("Failing otp task to be retried" + otp);
value = "${sms.queue}",
active = "${sms.queue.active}",
priority = "${sms.queue.priority}",
priorityGroup = "${sms.queue.group}",
concurrency = "${sms.queue.concurrency}")
public void onMessage(Sms sms) throws Exception {
log.info("SmsListener: {}", sms);
if (failureManager.shouldFail(sms.getId())) {
throw new Exception("Failing sms task to be retried" + sms);
}
consumedMessageService.save(otp);
consumedMessageService.save(sms);
}

@RqueueListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.github.sonus21.rqueue.test.dto;

import java.util.Random;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
Expand All @@ -32,16 +31,14 @@
@Setter
@ToString
@EqualsAndHashCode(callSuper = true)
public class Otp extends BaseQueueMessage {
public class Sms extends BaseQueueMessage {
private String phoneNumber;
private String otp;
private String sms;

public static Otp newInstance() {
Otp otp =
new Otp(
"+91" + RandomStringUtils.randomNumeric(10),
String.valueOf(new Random().nextInt(100000)));
otp.setId(UUID.randomUUID().toString());
return otp;
public static Sms newInstance() {
String txt = "Dear , Version 2.0 of Rqueue is released.";
Sms sms = new Sms("+91" + RandomStringUtils.randomNumeric(10), txt);
sms.setId(UUID.randomUUID().toString());
return sms;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.github.sonus21.rqueue.test.dto.Email;
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Otp;
import com.github.sonus21.rqueue.test.dto.Sms;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
Expand All @@ -41,20 +41,20 @@ protected void checkGroupConsumer() throws TimedOutException {
}

protected void checkQueueLevelConsumer() throws TimedOutException {
rqueueMessageSender.enqueue(otpQueue, Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "critical", Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "high", Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "medium", Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "low", Otp.newInstance());
rqueueMessageSender.enqueue(smsQueue, Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "critical", Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "high", Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "medium", Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "low", Sms.newInstance());
TimeoutUtils.waitFor(
() ->
getMessageCount(
Arrays.asList(
otpQueue,
otpQueue + "_critical",
otpQueue + "_high",
otpQueue + "_medium",
otpQueue + "_low"))
smsQueue,
smsQueue + "_critical",
smsQueue + "_high",
smsQueue + "_medium",
smsQueue + "_low"))
== 0,
20 * Constants.ONE_MILLI,
"multi level queues to drain");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@
package com.github.sonus21.rqueue.test.tests;

import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.test.dto.Otp;
import com.github.sonus21.rqueue.test.dto.Sms;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import java.util.Arrays;

public abstract class MultiLevelQueueListenerTestBase extends SpringTestBase {
protected void checkQueueLevelConsumer() throws TimedOutException {
rqueueMessageSender.enqueue(otpQueue, Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "critical", Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "high", Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "medium", Otp.newInstance());
rqueueMessageSender.enqueueWithPriority(otpQueue, "low", Otp.newInstance());
rqueueMessageSender.enqueue(smsQueue, Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "critical", Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "high", Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "medium", Sms.newInstance());
rqueueMessageSender.enqueueWithPriority(smsQueue, "low", Sms.newInstance());
TimeoutUtils.waitFor(
() ->
getMessageCount(
Arrays.asList(
otpQueue,
otpQueue + "_critical",
otpQueue + "_high",
otpQueue + "_medium",
otpQueue + "_low"))
smsQueue,
smsQueue + "_critical",
smsQueue + "_high",
smsQueue + "_medium",
smsQueue + "_low"))
== 0,
"Waiting for multi level queues to drain");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public abstract class SpringTestBase {
@Value("${notification.queue.retry.count}")
protected int notificationRetryCount;

@Value("${otp.queue}")
protected String otpQueue;
@Value("${sms.queue}")
protected String smsQueue;

@Value("${feed.generation.queue}")
protected String feedGenerationQueue;
Expand Down
10 changes: 5 additions & 5 deletions rqueue-common-test/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ email.queue.active=true
mysql.db.name=test
rqueue.metrics.tags.rqueue=test
email.execution.time=15*60*1000
otp.queue=otp
otp.queue.active=false
otp.queue.group=
otp.queue.priority=critical:10, high:6, medium:4, low:2
otp.queue.concurrency=-1
sms.queue=sms
sms.queue.active=false
sms.queue.group=
sms.queue.priority=critical:10, high:6, medium:4, low:2
sms.queue.concurrency=-1
chat.indexing.queue=chat-indexing
chat.indexing.queue.active=false
chat.indexing.queue.priority=30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
properties = {
"spring.redis.port=7004",
"mysql.db.name=SpringAppTest",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=true",
"job.queue.active=true",
Expand All @@ -71,7 +71,7 @@ public void numActiveQueues() {
assertTrue(registeredQueue.containsKey(jobQueue));
assertTrue(registeredQueue.containsKey(feedGenerationQueue));
assertTrue(registeredQueue.containsKey(reservationQueue));
assertTrue(registeredQueue.containsKey(otpQueue));
assertTrue(registeredQueue.containsKey(smsQueue));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
properties = {
"spring.redis.port=7013",
"mysql.db.name=StrictHeterogeneousConcurrencyBasedQueueListener",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=true",
"job.queue.active=true",
"priority.mode=STRICT",
"reservation.queue.active=true",
"feed.generation.queue.active=true",
"chat.indexing.queue.active=true",
"otp.queue.concurrency=5",
"sms.queue.concurrency=5",
"reservation.queue.concurrency=2",
"feed.generation.queue.concurrency=1-5",
"chat.indexing.queue.concurrency=3-5",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
properties = {
"spring.redis.port=7011",
"mysql.db.name=StrictHeterogeneousQueueListener",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=true",
"job.queue.active=true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"spring.redis.port=7007",
"mysql.db.name=StrictMultiLevelQueueListener",
"max.workers.count=10",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=false",
"job.queue.active=false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
properties = {
"spring.redis.port=7009",
"mysql.db.name=StrictPriorityQueueListenerTest",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=false",
"job.queue.active=false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
properties = {
"spring.redis.port=7014",
"mysql.db.name=WeightedHeterogeneousConcurrencyBasedQueueListener",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=true",
"job.queue.active=true",
"priority.mode=WEIGHTED",
"reservation.queue.active=true",
"feed.generation.queue.active=true",
"chat.indexing.queue.active=true",
"otp.queue.concurrency=5",
"sms.queue.concurrency=5",
"reservation.queue.concurrency=2",
"feed.generation.queue.concurrency=1-5",
"chat.indexing.queue.concurrency=3-5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
properties = {
"spring.redis.port=7012",
"mysql.db.name=WeightedHeterogeneousQueueListener",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=true",
"job.queue.active=true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
properties = {
"spring.redis.port=7006",
"mysql.db.name=WeightedMultiLevelQueueListener",
"otp.queue.active=true",
"sms.queue.active=true",
"notification.queue.active=false",
"email.queue.active=false",
"job.queue.active=false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
properties = {
"spring.redis.port=7010",
"mysql.db.name=WeightedPriorityQueueListener",
"otp.queue.active=false",
"sms.queue.active=false",
"notification.queue.active=false",
"email.queue.active=false",
"job.queue.active=false",
Expand Down

0 comments on commit b61cdf6

Please sign in to comment.