Skip to content

Commit

Permalink
* Provide a way to enqueue unique messages #27
Browse files Browse the repository at this point in the history
* Provide a way to add HTTP proxy for external api calls #34
* Provide a way to fetch messages based on the id and queue name
* Provide api to delete any enqueued messages #33
  • Loading branch information
sonus21 committed Aug 23, 2020
1 parent 4c9c5c8 commit 29a778c
Show file tree
Hide file tree
Showing 63 changed files with 1,844 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.QueueRegistry;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageSender;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
Expand Down Expand Up @@ -119,7 +119,7 @@ protected void enqueueIn(Object message, String zsetName, long delay) {
}

protected Map<String, List<RqueueMessage>> getMessageMap(String queueName) {
QueueDetail queueDetail = QueueRegistry.get(queueName);
QueueDetail queueDetail = EndpointRegistry.get(queueName);
Map<String, List<RqueueMessage>> queueNameToMessage = new HashMap<>();
List<RqueueMessage> messages =
rqueueMessageTemplate.readFromList(queueDetail.getQueueName(), 0, -1);
Expand Down Expand Up @@ -160,7 +160,7 @@ protected void printQueueStats(List<String> queueNames) {
}

protected void cleanQueue(String queue) {
QueueDetail queueDetail = QueueRegistry.get(queue);
QueueDetail queueDetail = EndpointRegistry.get(queue);
stringRqueueRedisTemplate.delete(queueDetail.getQueueName());
stringRqueueRedisTemplate.delete(queueDetail.getDelayedQueueName());
stringRqueueRedisTemplate.delete(queueDetail.getProcessingQueueName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.github.sonus21.rqueue.config;

import com.github.sonus21.rqueue.utils.StringUtils;
import java.net.Proxy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand All @@ -32,28 +34,31 @@ public class RqueueConfig {
private final boolean sharedConnection;
private final int dbVersion;

@Value("${rqueue.version:2.0.4}")
@Value("${rqueue.version:2.1.0}")
private String version;

@Value("${rqueue.version.enabled:true}")
private boolean versionEnabled;

@Value("${rqueue.key.prefix:__rq::}")
private String prefix;

@Value("${rqueue.cluster.mode:true}")
private boolean clusterMode;

@Value("${rqueue.simple.queue.prefix:queue::}")
@Value("${rqueue.simple.queue.prefix:}")
private String simpleQueuePrefix;

@Value("${rqueue.delayed.queue.prefix:d-queue::}")
@Value("${rqueue.delayed.queue.prefix:}")
private String delayedQueuePrefix;

@Value("${rqueue.delayed.queue.channel.prefix:d-channel::}")
@Value("${rqueue.delayed.queue.channel.prefix:}")
private String delayedQueueChannelPrefix;

@Value("${rqueue.processing.queue.name.prefix:p-queue::}")
@Value("${rqueue.processing.queue.name.prefix:}")
private String processingQueuePrefix;

@Value("${rqueue.processing.queue.channel.prefix:p-channel::}")
@Value("${rqueue.processing.queue.channel.prefix:}")
private String processingQueueChannelPrefix;

@Value("${rqueue.queues.key.suffix:queues}")
Expand All @@ -77,43 +82,105 @@ public class RqueueConfig {
@Value("${rqueue.default.queue.with.queue.level.priority:-1}")
private int defaultQueueWithQueueLevelPriority;

@Value("${rqueue.net.proxy.host:}")
private String proxyHost;

@Value("${rqueue.net.proxy.port:}")
private Integer proxyPort;

@Value("${rqueue.net.proxy.type:HTTP}")
private Proxy.Type proxyType;

@Value("${rqueue.message.durability:10080}")
private long messageDurabilityInMinute;

public String getQueuesKey() {
return prefix + queuesKeySuffix;
}

private String getSimpleQueueSuffix() {
if (!StringUtils.isEmpty(simpleQueuePrefix)) {
return simpleQueuePrefix;
}
if (dbVersion == 2) {
return "queue::";
}
return "queue-v2::";
}

private String getDelayedQueueSuffix() {
if (!StringUtils.isEmpty(delayedQueuePrefix)) {
return delayedQueuePrefix;
}
if (dbVersion == 2) {
return "d-queue::";
}
return "d-queue-v2::";
}

private String getDelayedQueueChannelSuffix() {
if (!StringUtils.isEmpty(delayedQueueChannelPrefix)) {
return delayedQueueChannelPrefix;
}
if (dbVersion == 2) {
return "d-channel::";
}
return "d-channel-v2::";
}

private String getProcessingQueueSuffix() {
if (!StringUtils.isEmpty(processingQueuePrefix)) {
return processingQueuePrefix;
}
if (dbVersion == 2) {
return "p-queue::";
}
return "p-queue-v2::";
}

private String getProcessingQueueChannelSuffix() {
if (!StringUtils.isEmpty(processingQueueChannelPrefix)) {
return processingQueueChannelPrefix;
}
if (dbVersion == 2) {
return "p-channel::";
}
return "p-channel-v2::";
}

public String getQueueName(String queueName) {
if (dbVersion >= 2) {
return prefix + simpleQueuePrefix + getTaggedName(queueName);
if (dbVersion == 1) {
return queueName;
}
return queueName;
return prefix + getSimpleQueueSuffix() + getTaggedName(queueName);
}

public String getDelayedQueueName(String queueName) {
if (dbVersion >= 2) {
return prefix + delayedQueuePrefix + getTaggedName(queueName);
if (dbVersion == 1) {
return "rqueue-delay::" + queueName;
}
return "rqueue-delay::" + queueName;
return prefix + getDelayedQueueSuffix() + getTaggedName(queueName);
}

public String getDelayedQueueChannelName(String queueName) {
if (dbVersion >= 2) {
return prefix + delayedQueueChannelPrefix + getTaggedName(queueName);
if (dbVersion == 1) {
return "rqueue-channel::" + queueName;
}
return "rqueue-channel::" + queueName;
return prefix + getDelayedQueueChannelSuffix() + getTaggedName(queueName);
}

public String getProcessingQueueName(String queueName) {
if (dbVersion >= 2) {
return prefix + processingQueuePrefix + getTaggedName(queueName);
if (dbVersion == 1) {
return "rqueue-processing::" + queueName;
}
return "rqueue-processing::" + queueName;
return prefix + getProcessingQueueSuffix() + getTaggedName(queueName);
}

public String getProcessingQueueChannelName(String queueName) {
if (dbVersion >= 2) {
return prefix + processingQueueChannelPrefix + getTaggedName(queueName);
if (dbVersion == 1) {
return "rqueue-processing-channel::" + queueName;
}
return "rqueue-processing-channel::" + queueName;
return prefix + getProcessingQueueChannelSuffix() + getTaggedName(queueName);
}

public String getLockKey(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.github.sonus21.rqueue.core.DelayedMessageScheduler;
import com.github.sonus21.rqueue.core.ProcessingMessageScheduler;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.web.view.DateTimeFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ protected long getNextScheduleTime(String queueName, Long value) {

@Override
protected String getChannelName(String queueName) {
return QueueRegistry.get(queueName).getDelayedQueueChannelName();
return EndpointRegistry.get(queueName).getDelayedQueueChannelName();
}

@Override
protected String getZsetName(String queueName) {
return QueueRegistry.get(queueName).getDelayedQueueName();
return EndpointRegistry.get(queueName).getDelayedQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public class QueueRegistry {
public final class EndpointRegistry {
private static final Object lock = new Object();
private static Map<String, QueueDetail> queueNameToDetail = new HashMap<>();
private static final Map<String, QueueDetail> queueNameToDetail = new HashMap<>();

QueueRegistry() {}
private EndpointRegistry() {}

public static QueueDetail get(String queueName) {
QueueDetail queueDetail = queueNameToDetail.get(queueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected synchronized void schedule(String queueName, Long startTime, boolean f
queueNameToLastMessageSeenTime.put(queueName, currentTime);

ScheduledTaskDetail scheduledTaskDetail = queueNameToScheduledTask.get(queueName);
QueueDetail queueDetail = QueueRegistry.get(queueName);
QueueDetail queueDetail = EndpointRegistry.get(queueName);
String zsetName = getZsetName(queueName);

if (scheduledTaskDetail == null || forceSchedule) {
Expand Down Expand Up @@ -241,7 +241,7 @@ protected synchronized void schedule(String queueName, Long startTime, boolean f

@SuppressWarnings("unchecked")
protected void initialize() {
List<String> queueNames = QueueRegistry.getActiveQueues();
List<String> queueNames = EndpointRegistry.getActiveQueues();
defaultScriptExecutor = new DefaultScriptExecutor<>(redisTemplate);
redisScript = (RedisScript<Long>) RedisScriptFactory.getScript(ScriptType.PUSH_MESSAGE);
queueRunningState = new ConcurrentHashMap<>(queueNames.size());
Expand All @@ -262,7 +262,7 @@ protected void initialize() {
public void onApplicationEvent(RqueueBootstrapEvent event) {
doStop();
if (event.isStart()) {
if (QueueRegistry.getActiveQueueCount() == 0) {
if (EndpointRegistry.getActiveQueueCount() == 0) {
getLogger().warn("No queues are configured");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ProcessingMessageScheduler extends MessageScheduler {
@Override
protected void initialize() {
super.initialize();
List<QueueDetail> queueDetails = QueueRegistry.getActiveQueueDetails();
List<QueueDetail> queueDetails = EndpointRegistry.getActiveQueueDetails();
this.queueNameToDelay = new ConcurrentHashMap<>(queueDetails.size());
for (QueueDetail queueDetail : queueDetails) {
this.queueNameToDelay.put(queueDetail.getName(), queueDetail.getVisibilityTimeout());
Expand All @@ -46,12 +46,12 @@ protected Logger getLogger() {

@Override
protected String getChannelName(String queueName) {
return QueueRegistry.get(queueName).getProcessingQueueChannelName();
return EndpointRegistry.get(queueName).getProcessingQueueChannelName();
}

@Override
protected String getZsetName(String queueName) {
return QueueRegistry.get(queueName).getProcessingQueueName();
return EndpointRegistry.get(queueName).getProcessingQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

@SuppressWarnings("unchecked")
@ToString
class RedisScriptFactory {
static RedisScript getScript(ScriptType type) {
public class RedisScriptFactory {
public static RedisScript getScript(ScriptType type) {
Resource resource = new ClassPathResource(type.getPath());
DefaultRedisScript script = new DefaultRedisScript();
script.setLocation(resource);
Expand All @@ -48,7 +48,7 @@ static RedisScript getScript(ScriptType type) {
}
}

enum ScriptType {
public enum ScriptType {
ADD_MESSAGE("scripts/add_message.lua"),
POP_MESSAGE("scripts/pop_message.lua"),
MOVE_MESSAGE("scripts/move_message.lua"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import java.util.List;

public interface RqueueEndpointManager {

/**
* Use this method to register any queue, that's only used for sending message.
*
* @param name name of the queue
* @param priorities list of priorities to be used while sending message on this queue.
*/
void registerQueue(String name, String... priorities);

default boolean isQueueRegistered(String queueName, String priority) {
return isQueueRegistered(PriorityUtils.getQueueNameForPriority(queueName, priority));
}

boolean isQueueRegistered(String queueName);

List<QueueDetail> getQueueConfig(String queueName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
public class RqueueMessage extends SerializableBase implements Cloneable {

private static final long serialVersionUID = -3488860960637488519L;
/**
* The message id, each message has a unique id, generated using
*/
private String id;
private String queueName;
private String message;
Expand All @@ -39,16 +42,22 @@ public class RqueueMessage extends SerializableBase implements Cloneable {
private Long reEnqueuedAt;
private int failureCount;

private void initTime(Long delay) {
// Monotonic increasing queued time
// This is used to check duplicate message in executor
this.queuedTime = System.nanoTime();
this.processAt = System.currentTimeMillis();
if (delay != null) {
this.processAt += delay;
}
}

public RqueueMessage(String queueName, String message, Integer retryCount, Long delay) {
this.queueName = queueName;
this.message = message;
this.retryCount = retryCount;
this.queuedTime = System.currentTimeMillis();
this.id = UUID.randomUUID().toString();
this.processAt = this.queuedTime;
if (delay != null) {
this.processAt += delay;
}
initTime(delay);
}

public void updateReEnqueuedAt() {
Expand Down

0 comments on commit 29a778c

Please sign in to comment.