Skip to content

Commit

Permalink
Queue registry (#16)
Browse files Browse the repository at this point in the history
* Updated readme file

* WIP

* WIP

* WIP

* ouch all tests passed.
  • Loading branch information
sonus21 committed May 6, 2020
1 parent bd7ece0 commit f2f7286
Show file tree
Hide file tree
Showing 90 changed files with 1,560 additions and 1,605 deletions.
23 changes: 9 additions & 14 deletions README.md
Expand Up @@ -96,28 +96,25 @@ public class MessageListener {
log.info("simple-queue: {}", message);
}

@RqueueListener(value = "delayed-queue", delayedQueue = "true")
public void delayedMessage(String message) {
log.info("delayedMessage: {}", message);
}

@RqueueListener(value = "delayed-queue-2", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-delayed-queue")
public void delayedMessageWithDlq(String message) {
log.info("delayedMessageWithDlq: {}", message);
}

// Scheduled Job notification
@RqueueListener(value = "job-queue", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-job-queue")
public void onMessage(Job job) {
log.info("Job created: {}", job);
}

@RqueueListener(value = "notification-queue", delayedQueue = "true",
// Scheduled push notification
@RqueueListener(value = "push-notification-queue", delayedQueue = "true",
numRetries="3", deadLetterQueue="failed-notification-queue")
public void onMessage(Notification notification) {
log.info("Notification message: {}", notification);
}

// asynchronously send otp to the user
@RqueueListener(value = "otp", delayedQueue = "true")
public void onMessage(Otp otp) {
log.info("Otp message: {}", otp);
}
}
```

Expand Down Expand Up @@ -339,6 +336,4 @@ Please report problem, bug or feature(s) to [issue](https://github.com/sonus21/r
## License
The Rqueue is released under version 2.0 of the Apache License.




2 changes: 2 additions & 0 deletions build.gradle
Expand Up @@ -24,11 +24,13 @@ ext {
springVersion = System.getenv("SPRING_VERSION")
springDataVersion = System.getenv("SPRING_DATA_VERSION")
microMeterVersion = System.getenv("MICROMETER_VERSION")

springBootVersion = '2.1.0.RELEASE'
springVersion = '5.1.2.RELEASE'
springDataVersion = '2.1.2.RELEASE'
microMeterVersion = '1.1.0'


// logging dependencies
lombokVersion = '1.18.10'
logbackVersion = '1.2.3'
Expand Down

This file was deleted.

Expand Up @@ -16,52 +16,21 @@

package com.github.sonus21.rqueue.test.tests;

import static com.github.sonus21.rqueue.core.support.RqueueMessageFactory.buildMessage;
import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor;
import static com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.test.TestUtils;
import com.github.sonus21.rqueue.test.dto.Email;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import com.github.sonus21.rqueue.utils.QueueUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnectionFactory;

public class MetricTestBase {
@Autowired protected ConsumedMessageService consumedMessageService;
@Autowired protected RqueueMessageSender messageSender;
@Autowired protected FailureManager failureManager;
@Autowired protected RedisConnectionFactory redisConnectionFactory;
public class MetricTestBase extends SpringTestBase {
@Autowired protected MeterRegistry meterRegistry;
@Autowired protected RqueueMessageTemplate rqueueMessageTemplate;

@Qualifier("stringRqueueRedisTemplate")
private RqueueRedisTemplate<String> rqueueRedisTemplate;

@Value("${email.dead.letter.queue.name}")
private String emailDlq;

@Value("${email.queue.name}")
private String emailQueueName;

@Value("${job.queue.name}")
private String jobQueue;

@Value("${notification.queue.name}")
private String notificationQueue;

public void verifyDelayedQueueStatus() throws TimedOutException {
Random random = new Random();
Expand All @@ -74,10 +43,7 @@ public void verifyDelayedQueueStatus() throws TimedOutException {
}
Notification notification = Notification.newInstance();
if (i < maxMessages / 2) {
rqueueMessageTemplate.addToZset(
QueueUtils.getDelayedQueueName(notificationQueue),
buildMessage(notification, notificationQueue, null, null),
System.currentTimeMillis() - delay);
enqueueIn(notification, rqueueConfig.getDelayedQueueName(notificationQueue), -delay);
} else {
messageSender.put(notificationQueue, notification, delay);
}
Expand Down Expand Up @@ -113,10 +79,7 @@ public void verifyDelayedQueueStatus() throws TimedOutException {
}

public void verifyMetricStatus() throws TimedOutException {
for (int i = 0; i < 10; i++) {
Email email = Email.newInstance();
rqueueMessageTemplate.addMessage(emailDlq, buildMessage(email, emailQueueName, null, null));
}
enqueue(emailDeadLetterQueue, i -> Email.newInstance(), 10);

Job job = Job.newInstance();
failureManager.createFailureDetail(job.getId(), -1, 0);
Expand All @@ -127,7 +90,7 @@ public void verifyMetricStatus() throws TimedOutException {
meterRegistry
.get("dead.letter.queue.size")
.tags("rqueue", "test")
.tags("queue", emailQueueName)
.tags("queue", emailQueue)
.gauge()
.value(),
0);
Expand All @@ -144,7 +107,7 @@ public void verifyMetricStatus() throws TimedOutException {
}

public void verifyCountStatus() throws TimedOutException {
messageSender.put(emailQueueName, Email.newInstance());
messageSender.put(emailQueue, Email.newInstance());
Job job = Job.newInstance();
failureManager.createFailureDetail(job.getId(), 1, 1);
messageSender.put(jobQueue, job);
Expand All @@ -159,29 +122,25 @@ public void verifyCountStatus() throws TimedOutException {
>= 1,
30000,
"job process",
() ->
TestUtils.printQueueStats(
newArrayList(jobQueue, emailQueueName, notificationQueue), rqueueMessageTemplate));
() -> printQueueStats(newArrayList(jobQueue, emailQueue, notificationQueue)));
waitFor(
() ->
meterRegistry
.get("execution.count")
.tags("rqueue", "test")
.tags("queue", emailQueueName)
.tags("queue", emailQueue)
.counter()
.count()
== 1,
"message process",
() ->
TestUtils.printQueueStats(
newArrayList(jobQueue, emailQueueName, notificationQueue), rqueueMessageTemplate));
() -> printQueueStats(newArrayList(jobQueue, emailQueue, notificationQueue)));

assertEquals(
0,
meterRegistry
.get("failure.count")
.tags("rqueue", "test")
.tags("queue", emailQueueName)
.tags("queue", emailQueue)
.counter()
.count(),
0);
Expand Down
@@ -0,0 +1,130 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.tests;

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageSender;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.support.RqueueMessageFactory;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

@Slf4j
public class SpringTestBase {
@Autowired protected RqueueMessageSender rqueueMessageSender;
@Autowired protected RqueueMessageTemplate rqueueMessageTemplate;
@Autowired protected RqueueConfig rqueueConfig;
@Autowired protected RqueueWebConfig rqueueWebConfig;
@Autowired protected RqueueMessageSender messageSender;
@Autowired protected RqueueRedisTemplate<String> stringRqueueRedisTemplate;
@Autowired protected ConsumedMessageService consumedMessageService;
@Autowired protected RqueueMessageListenerContainer rqueueMessageListenerContainer;
@Autowired protected FailureManager failureManager;

@Value("${email.queue.name}")
protected String emailQueue;

@Value("${job.queue.name}")
protected String jobQueue;

@Value("${email.dead.letter.queue.name}")
protected String emailDeadLetterQueue;

@Value("${email.queue.retry.count}")
protected int emailRetryCount;

@Value("${notification.queue.name}")
protected String notificationQueue;

@Value("${notification.queue.retry.count}")
protected int notificationRetryCount;

protected void enqueue(Object message, String queueName) {
RqueueMessage rqueueMessage = RqueueMessageFactory.buildMessage(message, queueName, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
}

public interface Factory {
Object next(int i);
}

public interface Delay {
long getDelay(int i);
}

protected void enqueue(String queueName, Factory factory, int n) {
for (int i = 0; i < n; i++) {
Object object = factory.next(i);
RqueueMessage rqueueMessage =
RqueueMessageFactory.buildMessage(object, queueName, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
}
}

protected void enqueueIn(String zsetName, Factory factory, Delay delay, int n) {
for (int i = 0; i < n; i++) {
Object object = factory.next(i);
long score = delay.getDelay(i);
RqueueMessage rqueueMessage =
RqueueMessageFactory.buildMessage(object, zsetName, null, score);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}
}

protected void enqueueIn(Object message, String zsetName, long delay) {
RqueueMessage rqueueMessage = RqueueMessageFactory.buildMessage(message, zsetName, null, delay);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage, rqueueMessage.getProcessAt());
}

protected Map<String, List<RqueueMessage>> getMessageMap(String queueName) {
Map<String, List<RqueueMessage>> queueNameToMessage = new HashMap<>();
List<RqueueMessage> messages =
rqueueMessageTemplate.readFromList(rqueueConfig.getQueueName(queueName), 0, -1);
queueNameToMessage.put(rqueueConfig.getQueueName(queueName), messages);

List<RqueueMessage> messagesFromZset =
rqueueMessageTemplate.readFromZset(rqueueConfig.getDelayedQueueName(queueName), 0, -1);
queueNameToMessage.put(rqueueConfig.getDelayedQueueName(queueName), messagesFromZset);

List<RqueueMessage> messagesInProcessingQueue =
rqueueMessageTemplate.readFromZset(rqueueConfig.getProcessingQueueName(queueName), 0, -1);
queueNameToMessage.put(
rqueueConfig.getProcessingQueueName(queueName), messagesInProcessingQueue);
return queueNameToMessage;
}

protected void printQueueStats(List<String> queueNames) {
for (String queueName : queueNames) {
for (Entry<String, List<RqueueMessage>> entry : getMessageMap(queueName).entrySet()) {
for (RqueueMessage message : entry.getValue()) {
log.info("Queue: {} Msg: {}", entry.getKey(), message);
}
}
}
}
}

0 comments on commit f2f7286

Please sign in to comment.