Skip to content

Commit

Permalink
Merge ce4e189 into 5f70248
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Apr 8, 2020
2 parents 5f70248 + ce4e189 commit 817d2de
Show file tree
Hide file tree
Showing 51 changed files with 1,204 additions and 501 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
#Sat Oct 12 02:36:57 IST 2019
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,10 @@
package com.github.sonus21.rqueue.spring.boot.application;

import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.listener.ConsumerQueueDetail;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.processor.NoOpMessageProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
Expand All @@ -44,9 +45,12 @@ public static void main(String[] args) {
public RqueueMessageListenerContainer rqueueMessageListenerContainer(
RqueueMessageHandler rqueueMessageHandler, RedisConnectionFactory redisConnectionFactory) {
return new RqueueMessageListenerContainer(
rqueueMessageHandler, new RqueueMessageTemplate(redisConnectionFactory)) {
rqueueMessageHandler,
new RqueueMessageTemplate(redisConnectionFactory),
new NoOpMessageProcessor(),
new NoOpMessageProcessor()) {
@Override
protected void startQueue(String queueName, ConsumerQueueDetail queueDetail) {}
protected void startQueue(String queueName, QueueDetail queueDetail) {}
};
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.processor.NoOpMessageProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand Down Expand Up @@ -48,7 +49,10 @@ public RqueueMessageListenerContainer rqueueMessageListenerContainer(
RqueueMessageHandler rqueueMessageHandler, RedisConnectionFactory redisConnectionFactory) {
RqueueMessageListenerContainer rqueueMessageListenerContainer =
new RqueueMessageListenerContainer(
rqueueMessageHandler, new RqueueMessageTemplate(redisConnectionFactory));
rqueueMessageHandler,
new RqueueMessageTemplate(redisConnectionFactory),
new NoOpMessageProcessor(),
new NoOpMessageProcessor());
rqueueMessageListenerContainer.setMaxNumWorkers(maxWorkers);
return rqueueMessageListenerContainer;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package com.github.sonus21.rqueue.spring.boot.tests.integration;

import static com.github.sonus21.rqueue.utils.WaitForUtil.waitFor;
import static com.github.sonus21.rqueue.utils.TimeUtils.waitFor;
import static org.junit.Assert.assertEquals;

import com.github.sonus21.rqueue.exception.TimedOutException;
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,17 @@

package com.github.sonus21.rqueue.spring.boot.tests.integration;

import static com.github.sonus21.rqueue.utils.RedisUtil.getRedisTemplate;
import static com.github.sonus21.rqueue.utils.WaitForUtil.waitFor;
import static com.github.sonus21.rqueue.utils.RedisUtils.getRedisTemplate;
import static com.github.sonus21.rqueue.utils.TimeUtils.waitFor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static rqueue.test.Utility.buildMessage;
import static rqueue.test.TestUtils.buildMessage;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.spring.boot.application.ApplicationListenerDisabled;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtils;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
Expand Down Expand Up @@ -57,6 +57,7 @@ public class MessageChannelTest {
@Autowired private RqueueMessageSender messageSender;
@Autowired private RedisConnectionFactory redisConnectionFactory;
private RedisTemplate<String, RqueueMessage> redisTemplate;

@Value("${email.queue.name}")
private String emailQueue;

Expand All @@ -75,15 +76,15 @@ public void publishMessageIsTriggeredOnMessageAddition() throws TimedOutExceptio
redisTemplate
.opsForZSet()
.add(
QueueInfo.getTimeQueueName(emailQueue),
QueueUtils.getTimeQueueName(emailQueue),
buildMessage(email, emailQueue, null, null),
currentTime - 1000L);
}
email = Email.newInstance();
log.info("adding new message {}", email);
messageSender.put(emailQueue, email, 1000L);
waitFor(
() -> redisTemplate.opsForZSet().size(QueueInfo.getTimeQueueName(emailQueue)) <= 1,
() -> redisTemplate.opsForZSet().size(QueueUtils.getTimeQueueName(emailQueue)) <= 1,
"one or less messages in zset");
assertTrue(
"Messages are correctly moved",
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package com.github.sonus21.rqueue.spring.boot.tests.integration;

import static com.github.sonus21.rqueue.utils.WaitForUtil.waitFor;
import static com.github.sonus21.rqueue.utils.TimeUtils.waitFor;
import static org.junit.Assert.assertEquals;

import com.github.sonus21.rqueue.exception.TimedOutException;
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,15 @@

package com.github.sonus21.rqueue.spring.boot.tests.integration;

import static com.github.sonus21.rqueue.utils.RedisUtil.getRedisTemplate;
import static com.github.sonus21.rqueue.utils.WaitForUtil.waitFor;
import static rqueue.test.Utility.buildMessage;
import static com.github.sonus21.rqueue.utils.RedisUtils.getRedisTemplate;
import static com.github.sonus21.rqueue.utils.TimeUtils.waitFor;
import static rqueue.test.TestUtils.buildMessage;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.spring.boot.application.ApplicationWithCustomConfiguration;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
Expand All @@ -43,7 +43,7 @@
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import rqueue.test.RunTestUntilFail;
import rqueue.test.Utility;
import rqueue.test.TestUtils;
import rqueue.test.dto.Job;
import rqueue.test.service.ConsumedMessageService;

Expand All @@ -70,20 +70,22 @@ public class ProcessingMessageSchedulerTest {

@Value("${job.queue.name}")
private String jobQueueName;

@Rule
public RunTestUntilFail retry =
new RunTestUntilFail(
log,
1,
() -> {
for (Entry<String, List<RqueueMessage>> entry :
Utility.getMessageMap(jobQueueName, redisTemplate).entrySet()) {
TestUtils.getMessageMap(jobQueueName, redisTemplate).entrySet()) {
log.error("FAILING Queue {}", entry.getKey());
for (RqueueMessage message : entry.getValue()) {
log.error("FAILING Queue {} Msg {}", entry.getKey(), message);
}
}
});

private int messageCount = 110;

@PostConstruct
Expand All @@ -94,7 +96,7 @@ public void init() {
@Test
public void publishMessageIsTriggeredOnMessageRemoval()
throws InterruptedException, TimedOutException {
String processingQueueName = QueueInfo.getProcessingQueueName(jobQueueName);
String processingQueueName = QueueUtils.getProcessingQueueName(jobQueueName);
long currentTime = System.currentTimeMillis();
List<Job> jobs = new ArrayList<>();
List<String> ids = new ArrayList<>();
Expand Down
Expand Up @@ -19,7 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.github.sonus21.rqueue.listener.ConsumerQueueDetail;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.spring.app.AppWithMetricEnabled;
import java.util.Map;
Expand Down Expand Up @@ -54,7 +54,7 @@ public class SpringAppTest {

@Test
public void numListeners() {
Map<String, ConsumerQueueDetail> registeredQueue = container.getRegisteredQueues();
Map<String, QueueDetail> registeredQueue = container.getRegisteredQueues();
assertEquals(3, registeredQueue.size());
assertTrue(registeredQueue.containsKey(notificationQueueName));
assertTrue(registeredQueue.containsKey(emailQueue));
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@

import com.github.sonus21.rqueue.converter.GenericMessageConverter;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -31,7 +31,9 @@
import org.springframework.util.CollectionUtils;

@Slf4j
public abstract class Utility {
public class TestUtils {
private TestUtils() {}

private static final GenericMessageConverter converter = new GenericMessageConverter();

public static RqueueMessage buildMessage(
Expand All @@ -51,30 +53,30 @@ public static Map<String, List<RqueueMessage>> getMessageMap(
queueNameToMessage.put(queueName, messages);

Set<RqueueMessage> messagesFromZset =
redisTemplate.opsForZSet().range(QueueInfo.getTimeQueueName(queueName), 0, -1);
redisTemplate.opsForZSet().range(QueueUtils.getTimeQueueName(queueName), 0, -1);
if (!CollectionUtils.isEmpty(messagesFromZset)) {
messages = new ArrayList<>(messagesFromZset);
} else {
messages = new ArrayList<>();
}
queueNameToMessage.put(QueueInfo.getTimeQueueName(queueName), messages);
queueNameToMessage.put(QueueUtils.getTimeQueueName(queueName), messages);

Set<RqueueMessage> messagesInProcessingQueue =
redisTemplate.opsForZSet().range(QueueInfo.getProcessingQueueName(queueName), 0, -1);
redisTemplate.opsForZSet().range(QueueUtils.getProcessingQueueName(queueName), 0, -1);
if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) {
messages = new ArrayList<>(messagesInProcessingQueue);
} else {
messages = new ArrayList<>();
}
queueNameToMessage.put(QueueInfo.getProcessingQueueName(queueName), messages);
queueNameToMessage.put(QueueUtils.getProcessingQueueName(queueName), messages);
return queueNameToMessage;
}

public static void printQueueStats(
List<String> queueNames, RedisTemplate<String, RqueueMessage> redisTemplate) {
for (String queueName : queueNames) {
for (Entry<String, List<RqueueMessage>> entry :
Utility.getMessageMap(queueName, redisTemplate).entrySet()) {
TestUtils.getMessageMap(queueName, redisTemplate).entrySet()) {
for (RqueueMessage message : entry.getValue()) {
log.info("Queue: {} Msg: {}", entry.getKey(), message);
}
Expand Down
Expand Up @@ -16,25 +16,25 @@

package rqueue.test.tests;

import static com.github.sonus21.rqueue.utils.RedisUtil.getRedisTemplate;
import static com.github.sonus21.rqueue.utils.WaitForUtil.waitFor;
import static com.github.sonus21.rqueue.utils.RedisUtils.getRedisTemplate;
import static com.github.sonus21.rqueue.utils.TimeUtils.waitFor;
import static com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static rqueue.test.Utility.buildMessage;
import static rqueue.test.TestUtils.buildMessage;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Random;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import rqueue.test.Utility;
import rqueue.test.TestUtils;
import rqueue.test.dto.Email;
import rqueue.test.dto.Job;
import rqueue.test.dto.Notification;
Expand All @@ -49,11 +49,6 @@ public class MetricTestBase {
@Autowired protected MeterRegistry meterRegistry;
protected RedisTemplate<String, RqueueMessage> redisTemplate;

@PostConstruct
public void init() {
redisTemplate = getRedisTemplate(redisConnectionFactory);
}

@Value("${email.dead.letter.queue.name}")
private String emailDlq;

Expand All @@ -66,6 +61,11 @@ public void init() {
@Value("${notification.queue.name}")
private String notificationQueue;

@PostConstruct
public void init() {
redisTemplate = getRedisTemplate(redisConnectionFactory);
}

public void delayedQueueStatus(RedisTemplate<String, RqueueMessage> redisTemplate)
throws TimedOutException {
Random random = new Random();
Expand All @@ -81,7 +81,7 @@ public void delayedQueueStatus(RedisTemplate<String, RqueueMessage> redisTemplat
redisTemplate
.opsForZSet()
.add(
QueueInfo.getTimeQueueName(notificationQueue),
QueueUtils.getTimeQueueName(notificationQueue),
buildMessage(notification, notificationQueue, null, null),
System.currentTimeMillis() - delay);
} else {
Expand Down Expand Up @@ -169,7 +169,7 @@ public void countStatus() throws TimedOutException {
30000,
"job process",
() ->
Utility.printQueueStats(
TestUtils.printQueueStats(
newArrayList(jobQueue, emailQueueName, notificationQueue), redisTemplate));
waitFor(
() ->
Expand All @@ -182,7 +182,7 @@ public void countStatus() throws TimedOutException {
== 1,
"message process",
() ->
Utility.printQueueStats(
TestUtils.printQueueStats(
newArrayList(jobQueue, emailQueueName, notificationQueue), redisTemplate));

assertEquals(
Expand Down

0 comments on commit 817d2de

Please sign in to comment.