Skip to content

Commit

Permalink
clipboardPatchFile
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Dec 31, 2020
1 parent 1120c23 commit b1cc9ed
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 40 deletions.
Expand Up @@ -19,6 +19,7 @@
import static com.github.sonus21.rqueue.listener.RqueueMessageHeaders.buildMessageHeaders;
import static com.github.sonus21.rqueue.utils.Constants.DELTA_BETWEEN_RE_ENQUEUE_TIME;
import static com.github.sonus21.rqueue.utils.Constants.ONE_MILLI;
import static com.github.sonus21.rqueue.utils.Constants.REDIS_KEY_SEPARATOR;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
Expand Down Expand Up @@ -267,33 +268,48 @@ private void handleMessage() {
}
}

private void processPeriodicMessage() {
RqueueMessage newMessage =
job.getRqueueMessage().toBuilder()
.processAt(job.getRqueueMessage().nextProcessAt())
.build();
// avoid duplicate message enqueue due to retry by checking the message key
// avoid cross slot error by using tagged queue name in the key
String messageId =
job.getQueueDetail().getQueueName()
+ Constants.REDIS_KEY_SEPARATOR
+ job.getRqueueMessage().getId()
+ "::sch::"
+ newMessage.getProcessAt();
// let's assume a message can be executing for at most 2x of their visibility timeout
private long getTtlForScheduledMessageKey(RqueueMessage message) {
// Assume a message can be executing for at most 2x of their visibility timeout
// due to failure in some other job same message should not be enqueued
long expiryInSeconds = 2 * job.getQueueDetail().getVisibilityTimeout() / ONE_MILLI;
// A message wil be processed after period, so it must stay in the system till that time
// how many more seconds are left to process this message
long remainingTime = (newMessage.getProcessAt() - System.currentTimeMillis()) / ONE_MILLI;
long remainingTime = (message.getProcessAt() - System.currentTimeMillis()) / ONE_MILLI;
if (remainingTime > 0) {
expiryInSeconds += remainingTime;
}
return expiryInSeconds;
}

private String getScheduledMessageKey(RqueueMessage message) {
// avoid duplicate message enqueue due to retry by checking the message key
// avoid cross slot error by using tagged queue name in the key
// enqueuing duplicate message can lead to duplicate consumption when one job is executing task
// at the same time this message was enqueued.
return String.format(
"%s%s%s%ssch%s%d",
job.getQueueDetail().getQueueName(),
REDIS_KEY_SEPARATOR,
job.getRqueueMessage().getId(),
REDIS_KEY_SEPARATOR,
REDIS_KEY_SEPARATOR,
message.getProcessAt());
}

private void processPeriodicMessage() {
RqueueMessage newMessage =
job.getRqueueMessage().toBuilder()
.processAt(job.getRqueueMessage().nextProcessAt())
.build();
String messageKey = getScheduledMessageKey(newMessage);
long expiryInSeconds = getTtlForScheduledMessageKey(newMessage);
log.debug(
"Schedule periodic message: {} Status: {}",
job.getRqueueMessage(),
getRqueueMessageTemplate()
.scheduleMessage(
job.getQueueDetail().getDelayedQueueName(),
messageId,
messageKey,
newMessage,
expiryInSeconds));
handleMessage();
Expand Down
Expand Up @@ -48,39 +48,39 @@
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.lang.ref.WeakReference;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConverter;

@CoreUnitTest
@SuppressWarnings("unchecked")
class RqueueExecutorTest extends TestBase {
private RqueueMessageListenerContainer container = mock(RqueueMessageListenerContainer.class);
private WeakReference<RqueueMessageListenerContainer> containerWeakReference =
private final RqueueMessageListenerContainer container =
mock(RqueueMessageListenerContainer.class);
private final WeakReference<RqueueMessageListenerContainer> containerWeakReference =
new WeakReference<>(container);
private RqueueWebConfig rqueueWebConfig = new RqueueWebConfig();
private RqueueConfig rqueueConfig = mock(RqueueConfig.class);
private RqueueMessageMetadataService rqueueMessageMetadataService =
private final RqueueWebConfig rqueueWebConfig = new RqueueWebConfig();
private final RqueueConfig rqueueConfig = mock(RqueueConfig.class);
private final RqueueMessageMetadataService rqueueMessageMetadataService =
mock(RqueueMessageMetadataService.class);
private RqueueRedisTemplate<String> stringRqueueRedisTemplate = mock(RqueueRedisTemplate.class);
private RqueueJobDao rqueueJobDao = mock(RqueueJobDao.class);
private RqueueStringDao rqueueStringDao = mock(RqueueStringDao.class);
private TestMessageProcessor deadLetterProcessor = new TestMessageProcessor();
private TestMessageProcessor discardProcessor = new TestMessageProcessor();
private TestMessageProcessor preProcessMessageProcessor = new TestMessageProcessor();
private RqueueMessageTemplate messageTemplate = mock(RqueueMessageTemplate.class);
private RqueueMessageHandler messageHandler = mock(RqueueMessageHandler.class);
private final RqueueRedisTemplate<String> stringRqueueRedisTemplate =
mock(RqueueRedisTemplate.class);
private final RqueueJobDao rqueueJobDao = mock(RqueueJobDao.class);
private final RqueueStringDao rqueueStringDao = mock(RqueueStringDao.class);
private final TestMessageProcessor deadLetterProcessor = new TestMessageProcessor();
private final TestMessageProcessor discardProcessor = new TestMessageProcessor();
private final TestMessageProcessor preProcessMessageProcessor = new TestMessageProcessor();
private final RqueueMessageTemplate messageTemplate = mock(RqueueMessageTemplate.class);
private final RqueueMessageHandler messageHandler = mock(RqueueMessageHandler.class);
private RqueueMessage rqueueMessage = new RqueueMessage();
private final Semaphore semaphore = new Semaphore(100);
private final TaskExecutionBackOff taskBackOff = new FixedTaskExecutionBackOff();
private PostProcessingHandler postProcessingHandler;
private ApplicationEventPublisher applicationEventPublisher =
private final ApplicationEventPublisher applicationEventPublisher =
mock(ApplicationEventPublisher.class);
private final RqueueSystemConfigDao rqueueSystemConfigDao = mock(RqueueSystemConfigDao.class);
private final String queueName = "test-queue";
Expand All @@ -89,7 +89,16 @@ class RqueueExecutorTest extends TestBase {

@BeforeEach
public void init() throws IllegalAccessException {
rqueueMessage.setMessage("test message");
MessageConverter messageConverter = new GenericMessageConverter();
rqueueMessage =
RqueueMessageUtils.buildMessage(
messageConverter,
queueName,
payload,
null,
null,
RqueueMessageHeaders.emptyMessageHeaders());
defaultMessageMetadata = new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
MessageProcessorHandler messageProcessorHandler =
new MessageProcessorHandler(null, deadLetterProcessor, discardProcessor, null);
postProcessingHandler =
Expand All @@ -113,13 +122,6 @@ public void init() throws IllegalAccessException {
.when(messageHandler)
.handleMessage(any());
doReturn(1).when(rqueueConfig).getRetryPerPoll();
rqueueMessage = new RqueueMessage();
Message<String> message =
(Message<String>)
messageConverter.toMessage("test message", RqueueMessageHeaders.emptyMessageHeaders());
rqueueMessage.setMessage(message.getPayload());
rqueueMessage.setQueueName(queueName);
defaultMessageMetadata = new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
}

@Test
Expand Down Expand Up @@ -262,6 +264,39 @@ public boolean process(Object message) {
.removeElementFromZset(queueDetail.getProcessingQueueName(), rqueueMessage);
}

@Test
void testHandlePeriodicMessage() {
QueueDetail queueDetail = TestUtils.createQueueDetail(queueName);
long period = 10000L;
RqueueMessage message =
rqueueMessage.toBuilder().period(period).processAt(System.currentTimeMillis()).build();
RqueueMessage newMessage = message.toBuilder().processAt(message.nextProcessAt()).build();
String messageKey =
"__rq::queue::"
+ queueName
+ Constants.REDIS_KEY_SEPARATOR
+ message.getId()
+ Constants.REDIS_KEY_SEPARATOR
+ "sch"
+ Constants.REDIS_KEY_SEPARATOR
+ newMessage.getProcessAt();
doReturn(defaultMessageMetadata)
.when(rqueueMessageMetadataService)
.getOrCreateMessageMetadata(any());
new RqueueExecutor(
containerWeakReference,
rqueueConfig,
postProcessingHandler,
message,
queueDetail,
semaphore)
.run();
verify(messageTemplate, times(1))
.scheduleMessage(
eq(queueDetail.getDelayedQueueName()), eq(messageKey), eq(newMessage), any());
verify(messageHandler, times(1)).handleMessage(any());
}

private class TestMessageProcessor implements MessageProcessor {
private int count;

Expand Down

0 comments on commit b1cc9ed

Please sign in to comment.