Skip to content

Commit

Permalink
New test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Dec 23, 2020
1 parent 06abf09 commit 8284d41
Show file tree
Hide file tree
Showing 22 changed files with 608 additions and 120 deletions.
Expand Up @@ -58,4 +58,11 @@ protected String getThreadNamePrefix() {
protected int getThreadPoolSize() {
return rqueueSchedulerConfig.getDelayedMessageThreadPoolSize();
}

@Override
protected boolean isProcessingQueue(String queueName) {
return false;
}


}
Expand Up @@ -80,6 +80,8 @@ public abstract class MessageScheduler

protected abstract int getThreadPoolSize();

protected abstract boolean isProcessingQueue(String queueName);

private void doStart() {
for (String queueName : queueRunningState.keySet()) {
startQueue(queueName);
Expand Down Expand Up @@ -203,7 +205,11 @@ protected synchronized void schedule(String queueName, Long startTime, boolean f
long requiredDelay = max(1, startTime - currentTime);
long taskStartTime = startTime;
MessageMoverTask timerTask =
new MessageMoverTask(queueDetail.getName(), queueDetail.getQueueName(), zsetName);
new MessageMoverTask(
queueDetail.getName(),
queueDetail.getQueueName(),
zsetName,
isProcessingQueue(queueDetail.getName()));
Future<?> future;
if (requiredDelay < MIN_DELAY) {
future = scheduler.submit(timerTask);
Expand Down Expand Up @@ -233,7 +239,11 @@ protected synchronized void schedule(String queueName, Long startTime, boolean f
}
// Run was succeeded or cancelled submit new one
MessageMoverTask timerTask =
new MessageMoverTask(queueDetail.getName(), queueDetail.getQueueName(), zsetName);
new MessageMoverTask(
queueDetail.getName(),
queueDetail.getQueueName(),
zsetName,
isProcessingQueue(zsetName));
Future<?> future =
scheduler.schedule(
timerTask, Instant.ofEpochMilli(getNextScheduleTime(queueName, startTime)));
Expand Down Expand Up @@ -276,11 +286,13 @@ private class MessageMoverTask implements Runnable {
private final String name;
private final String queueName;
private final String zsetName;
private final boolean processingQueue;

MessageMoverTask(String name, String queueName, String zsetName) {
MessageMoverTask(String name, String queueName, String zsetName, boolean processingQueue) {
this.name = name;
this.queueName = queueName;
this.zsetName = zsetName;
this.processingQueue = processingQueue;
}

@Override
Expand All @@ -291,7 +303,11 @@ public void run() {
long currentTime = System.currentTimeMillis();
Long value =
defaultScriptExecutor.execute(
redisScript, Arrays.asList(queueName, zsetName), currentTime, MAX_MESSAGES);
redisScript,
Arrays.asList(queueName, zsetName),
currentTime,
MAX_MESSAGES,
processingQueue ? 1 : 0);
long nextExecutionTime = getNextScheduleTime(name, value);
schedule(name, nextExecutionTime, true);
}
Expand Down
Expand Up @@ -59,6 +59,11 @@ protected int getThreadPoolSize() {
return rqueueSchedulerConfig.getProcessingMessageThreadPoolSize();
}

@Override
protected boolean isProcessingQueue(String queueName) {
return true;
}

@Override
protected String getThreadNamePrefix() {
return "processingMessageScheduler-";
Expand Down
2 changes: 1 addition & 1 deletion rqueue-core/src/main/resources/scripts/delete_if_same.lua
@@ -1,6 +1,6 @@
-- get current value
local value = redis.call('GET', KEYS[1])
if value then
if not value then
return true
end
if value == ARGV[1] then
Expand Down
28 changes: 27 additions & 1 deletion rqueue-core/src/main/resources/scripts/move_expired_message.lua
@@ -1,7 +1,33 @@
local expiredValues = redis.call('ZRANGEBYSCORE', KEYS[2], 0, ARGV[1], 'LIMIT', 0, ARGV[2])
local updateFailureCount = tonumber(ARGV[3])
local function getValue(v)
if updateFailureCount == 0 then
return v
end
if string.sub(v, 59, 70) == 'failureCount' then
-- ":
local index = 73
local c = string.sub(v, index, index)
local retryCount = 0;
while (c == '0' or c == '1' or c == '2' or c == '3' or c == '4' or
c == '5' or c == '6' or c == '7' or c == '8' or c == '9')
do
retryCount = retryCount * 10 + c
index = index + 1
c = string.sub(v, index, index)
end
if index == 73 then
return v
end
retryCount = retryCount + 1
return string.sub(v, 1, 72) .. tostring(retryCount) .. string.sub(v, index)
end
return v
end

if #expiredValues > 0 then
for _, v in ipairs(expiredValues) do
redis.call('RPUSH', KEYS[1], v)
redis.call('RPUSH', KEYS[1], getValue(v))
end ;
redis.call('ZREM', KEYS[2], unpack(expiredValues))
end
Expand Down
@@ -0,0 +1,142 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.core;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.github.sonus21.TestBase;
import com.github.sonus21.junit.BootstrapRedis;
import com.github.sonus21.junit.TestQueue;
import com.github.sonus21.rqueue.CoreUnitTest;
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType;
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.script.ScriptExecutor;

@CoreUnitTest
@BootstrapRedis(systemRedis = false, port = 6301)
class RedisScriptFactoryTest extends TestBase {
private final RedisConnectionFactory redisConnectionFactory;
private final RqueueMessageTemplate rqueueMessageTemplate;

RedisScriptFactoryTest(RedisConnectionFactory redisConnectionFactory) {
this.redisConnectionFactory = redisConnectionFactory;
this.rqueueMessageTemplate = new RqueueMessageTemplateImpl(redisConnectionFactory);
}

@Test
@TestQueue(
value = {
"testExpiredMessageMoverWithFailureQueue",
"__rq::p-queue::testExpiredMessageMoverWithFailureQueue"
})
void testExpiredMessageMoverWithFailureQueue() {
String queueName = "testExpiredMessageMoverWithFailureQueue";
String zsetName = "__rq::p-queue::testExpiredMessageMoverWithFailureQueue";
RqueueMessage rqueueMessage1 = RqueueMessage.builder().message("Test message 1").build();
RqueueMessage rqueueMessage2 =
RqueueMessage.builder().message("Test message 2").failureCount(1).build();
RqueueMessage rqueueMessage3 =
RqueueMessage.builder().message("Test message 3").failureCount(110).build();
RqueueMessage rqueueMessage4 = RqueueMessage.builder().message("Test message 4").build();
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage1, 1000);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage2, 1500);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage3, 2000);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage4, 2500);
RedisScript<Long> script = RedisScriptFactory.getScript(ScriptType.MOVE_EXPIRED_MESSAGE);
RqueueRedisTemplate<Long> rqueueRedisTemplate =
new RqueueRedisTemplate<>(redisConnectionFactory);
ScriptExecutor<String> scriptExecutor =
new DefaultScriptExecutor<>(rqueueRedisTemplate.getRedisTemplate());
scriptExecutor.execute(script, Arrays.asList(queueName, zsetName), 2000, 100, 1);
List<RqueueMessage> messagesFromList = rqueueMessageTemplate.readFromList(queueName, 0, -1);
List<RqueueMessage> messagesFromZset = rqueueMessageTemplate.readFromZset(zsetName, 0, -1);
assertEquals(3, messagesFromList.size());
assertEquals(1, messagesFromZset.size());
assertEquals(1, messagesFromList.get(0).getFailureCount());
assertEquals(2, messagesFromList.get(1).getFailureCount());
assertEquals(111, messagesFromList.get(2).getFailureCount());
}

@Test
@TestQueue(value = {"testExpiredMessageMover", "__rq::d-queue::testExpiredMessageMover"})
void testExpiredMessageMover() {
String queueName = "testExpiredMessageMover";
String zsetName = "__rq::d-queue::testExpiredMessageMover";
RqueueMessage rqueueMessage1 = RqueueMessage.builder().message("Test message 1").build();
RqueueMessage rqueueMessage2 =
RqueueMessage.builder().message("Test message 2").failureCount(1).build();
RqueueMessage rqueueMessage3 =
RqueueMessage.builder().message("Test message 3").failureCount(110).build();
RqueueMessage rqueueMessage4 = RqueueMessage.builder().message("Test message 4").build();
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage1, 1000);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage2, 1500);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage3, 2000);
rqueueMessageTemplate.addToZset(zsetName, rqueueMessage4, 2500);
RedisScript<Long> script = RedisScriptFactory.getScript(ScriptType.MOVE_EXPIRED_MESSAGE);
RqueueRedisTemplate<Long> rqueueRedisTemplate =
new RqueueRedisTemplate<>(redisConnectionFactory);
ScriptExecutor<String> scriptExecutor =
new DefaultScriptExecutor<>(rqueueRedisTemplate.getRedisTemplate());
scriptExecutor.execute(script, Arrays.asList(queueName, zsetName), 2000, 100, 0);
List<RqueueMessage> messagesFromList = rqueueMessageTemplate.readFromList(queueName, 0, -1);
List<RqueueMessage> messagesFromZset = rqueueMessageTemplate.readFromZset(zsetName, 0, -1);
assertEquals(3, messagesFromList.size());
assertEquals(1, messagesFromZset.size());
assertEquals(0, messagesFromList.get(0).getFailureCount());
assertEquals(1, messagesFromList.get(1).getFailureCount());
assertEquals(110, messagesFromList.get(2).getFailureCount());
}

@Test
@TestQueue("testDeleteIfSame")
void testDeleteIfSame() {
String key = "testDeleteIfSame";
RqueueMessage rqueueMessage = RqueueMessage.builder().message("Test message 1").build();
RqueueMessage rqueueMessage2 = RqueueMessage.builder().message("Test message 2").build();
RedisScript<Boolean> script = RedisScriptFactory.getScript(ScriptType.DELETE_IF_SAME);
RqueueRedisTemplate<RqueueMessage> template =
new RqueueMessageTemplateImpl(redisConnectionFactory);
template.set(key, rqueueMessage);
ScriptExecutor<String> scriptExecutor =
new DefaultScriptExecutor<>(template.getRedisTemplate());
assertTrue(template.exist(key));

// value mismatch
assertFalse(scriptExecutor.execute(script, Collections.singletonList(key), rqueueMessage2));

assertTrue(template.exist(key));

// actual delete
assertTrue(scriptExecutor.execute(script, Collections.singletonList(key), rqueueMessage));
assertFalse(template.exist(key));

// key does not exist test
assertTrue(scriptExecutor.execute(script, Collections.singletonList(key), rqueueMessage2));

assertTrue(scriptExecutor.execute(script, Collections.singletonList(key), rqueueMessage));
}
}
Expand Up @@ -36,7 +36,7 @@
"mysql.db.name=BootDelayedChannelTest",
"max.workers.count=120",
"use.system.redis=false",
"monitor.thread.count=1",
"monitor.enabled=true"
})
@SpringBootTest
@Slf4j
Expand Down
Expand Up @@ -37,7 +37,7 @@
"mysql.db.name=BootProcessingChannelTest",
"max.workers.count=120",
"use.system.redis=false",
"monitor.thread.count=1",
"monitor.enabled=true",
"start.queue.enabled=true",
})
@SpringBootTest
Expand Down
Expand Up @@ -35,7 +35,7 @@
"spring.redis.port=8005",
"spring.redis2.port=8006",
"spring.redis2.host=localhost",
"monitor.thread.count=1",
"monitor.enabled=true",
})
@SpringBootIntegrationTest
class MultiRedisSetup extends RetryTests {
Expand Down
22 changes: 0 additions & 22 deletions rqueue-spring-common-test/build.gradle
Expand Up @@ -5,26 +5,4 @@ dependencies {
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
compile group: 'io.micrometer', name: 'micrometer-registry-prometheus', version: "${microMeterVersion}"

// https://mvnrepository.com/artifact/io.lettuce/lettuce-core
compile group: 'io.lettuce', name: 'lettuce-core', version: "${lettuceVersion}"

// https://mvnrepository.com/artifact/javax.persistence/javax.persistence-api
compile group: 'javax.persistence', name: 'javax.persistence-api', version: "${jpaVersion}"

compile "com.h2database:h2:${h2Version}"
compile "it.ozimov:embedded-redis:${embeddedRedisVersion}"

// https://mvnrepository.com/artifact/org.springframework.data/spring-data-jpa
compile group: 'org.springframework.data', name: 'spring-data-jpa', version: "${springBootVersion}"

// https://mvnrepository.com/artifact/org.hibernate/hibernate-core
compile group: 'org.hibernate', name: 'hibernate-core', version: "${hibernateVersion}"

// https://mvnrepository.com/artifact/javax.annotation/javax.annotation-api
compile group: 'javax.annotation', name: 'javax.annotation-api', version: "${javaxAnnotationVersion}"

compile group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: "${jupiterVersion}"

compile group: 'org.apache.commons', name: 'commons-lang3', version: "${lang3Version}"

}

0 comments on commit 8284d41

Please sign in to comment.