Skip to content

Commit

Permalink
updated readme
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed May 14, 2020
1 parent 659ee4d commit 0276268
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 47 deletions.
9 changes: 3 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# Changelog
All notable changes to Rqueue project will be documented in this file.
# [Rqueue] New and Notable Changes

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
**NOTE**: The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [2.0.1] - 17-May-2020

### Added
- Allow registering a queue, that can be in push only mode
- Apis to schedule task at the given time
- Allow the delay to be provided in different formats.
- Refine enqueueIn apis to support Duration and TimeUnit

### Fixes
- Arguments mismatch due to multiple class loaders.
Expand Down Expand Up @@ -49,7 +47,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* **Fixed** Bootstrap issue due to optional dependencies of micrometer

## [1.3] - 11-Dec-2019
### Added
* Expose multiple queue metrics using micrometer. (queue-size, delay queue size, processing queue size, dead letter queue size, execution counter, failure counter)
* An api to move messages from dead letter queue to other queue. (Any source queue to target queue).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.utils.Constants;
import io.lettuce.core.ReadFrom;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
Expand Down Expand Up @@ -56,6 +58,9 @@ public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
@Bean
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(
RqueueMessageHandler rqueueMessageHandler) {
LettuceClientConfiguration lettuceClientConfiguration =
LettuceClientConfiguration.builder().readFrom(ReadFrom.MASTER).build();

RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
List<RedisNode> redisNodes = new ArrayList<>();
redisNodes.add(new RedisNode("127.0.0.1", 9000));
Expand All @@ -66,7 +71,7 @@ public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory
redisNodes.add(new RedisNode("127.0.0.1", 9005));
redisClusterConfiguration.setClusterNodes(redisNodes);
LettuceConnectionFactory lettuceConnectionFactory =
new LettuceConnectionFactory(redisClusterConfiguration);
new LettuceConnectionFactory(redisClusterConfiguration, lettuceClientConfiguration);
lettuceConnectionFactory.afterPropertiesSet();
SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory =
new SimpleRqueueListenerContainerFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
*/
public interface RqueueMessageSender {
/**
* Enqueue a message on given queue without any delay, listener would try to consume this message
* immediately but due to heavy load message consumption can be delayed if message producer rate
* is higher than the rate at consumer consume the messages.
* Enqueue a message on given queue without any delay, consume as soon as possible.
*
* @deprecated migrate to {@link #enqueue(String, Object)}
* @param queueName on which queue message has to be send
Expand All @@ -59,7 +57,7 @@ default boolean put(String queueName, Object message) {
}

/**
* Enqueue a message on given queue without any delay, execute as soon as possible.
* Enqueue a message on given queue without any delay, consume as soon as possible.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -68,8 +66,8 @@ default boolean put(String queueName, Object message) {
boolean enqueue(String queueName, Object message);

/**
* Enqueue a message on the given with the provided delay. It will be executed as soon as the
* delay elapse.
* Enqueue a message on the given queue with the provided delay. It will be available to consume
* as soon as the delay elapse.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -82,8 +80,8 @@ default boolean put(String queueName, Object message, long delayInMilliSecs) {
}

/**
* Schedule a message on the given with the provided delay. It will be executed as soon as the
* delay elapse.
* Schedule a message on the given queue with the provided delay. It will be available to consume
* as soon as the delay elapse.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -93,8 +91,8 @@ default boolean put(String queueName, Object message, long delayInMilliSecs) {
boolean enqueueIn(String queueName, Object message, long delayInMilliSecs);

/**
* Schedule a message on the given with the provided delay. It will be executed as soon as the
* delay elapse.
* Schedule a message on the given queue with the provided delay. It will be available to consume
* as soon as the delay elapse.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -106,8 +104,8 @@ default boolean enqueueIn(String queueName, Object message, Duration delay) {
}

/**
* Schedule a message on the given with the provided delay. It will be executed as soon as the
* delay elapse.
* Schedule a message on the given queue with the provided delay. It will be available to consume
* as soon as the specified delay elapse.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -120,8 +118,8 @@ default boolean enqueueIn(String queueName, Object message, long delay, TimeUnit
}

/**
* Schedule a message on the given at the provided time. It will be executed as soon as the given
* time is reached.
* Schedule a message on the given queue at the provided time. It will be available to consume as
* soon as the given time is reached.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -133,8 +131,8 @@ default boolean enqueueAt(String queueName, Object message, long startTimeInMill
}

/**
* Schedule a message on the given at the provided time. It will be executed as soon as the given
* time is reached.
* Schedule a message on the given queue at the provided time. It will be available to consume as
* soon as the given time is reached.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -146,8 +144,8 @@ default boolean enqueueAt(String queueName, Object message, Instant starTime) {
}

/**
* Schedule a message on the given at the provided time. It will be executed as soon as the given
* time is reached.
* Schedule a message on the given queue at the provided time. It will be available to consume as
* soon as the given time is reached.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -159,8 +157,8 @@ default boolean enqueueAt(String queueName, Object message, Date starTime) {
}

/**
* Enqueue a message on the given with the given retry count. This message would not be consumed
* more than the specified time due to failure in underlying systems.
* Enqueue a message on the given queue with the given retry count. This message would not be
* consumed more than the specified time due to failure in underlying systems.
*
* @deprecated migrate to {@link #enqueueWithPriority(String, String, Object)}
* @param queueName on which queue message has to be send
Expand All @@ -175,8 +173,8 @@ default boolean put(String queueName, Object message, int retryCount) {
}

/**
* Enqueue a message on the given with the given retry count. This message would not be consumed
* more than the specified time due to failure in underlying systems.
* Enqueue a message on the given queue with the given retry count. This message would not be
* consumed more than the specified time due to failure in underlying systems.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand All @@ -187,7 +185,8 @@ default boolean put(String queueName, Object message, int retryCount) {
boolean enqueueWithRetry(String queueName, Object message, int retryCount);

/**
* Enqueue a task that would be scheduled to run in the specified milli seconds.
* Enqueue a message on the given queue that would be scheduled to run in the specified milli
* seconds.
*
* @deprecated migrate to {@link #enqueueInWithRetry(String, Object, int, long)}
* @param queueName on which queue message has to be send
Expand All @@ -204,7 +203,7 @@ default boolean put(String queueName, Object message, int retryCount, long delay
}

/**
* Enqueue a task that would be scheduled to run in the specified milli seconds. *
* Enqueue a task that would be scheduled to run in the specified milli seconds.
*
* @param queueName on which queue message has to be send
* @param message message object it could be any arbitrary object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean pushMessage(
rqueueMessage);
}
} catch (Exception e) {
log.error("Message could not be pushed ", e);
log.error("Queue: {} Message {} could not be pushed {}", queueName, rqueueMessage, e);
return false;
}
return true;
Expand All @@ -222,29 +222,32 @@ private List<MessageConverter> getMessageConverters(
}

@Override
public void registerQueue(String name, String... priorities) {
public void registerQueue(String queueName, String... priorities) {
validateQueue(queueName);
notNull(priorities, "priorities cannot be null");
QueueDetail queueDetail =
QueueDetail.builder()
.name(name)
.name(queueName)
.active(false)
.queueName(rqueueConfig.getQueueName(name))
.delayedQueueName(rqueueConfig.getDelayedQueueName(name))
.delayedQueueChannelName(rqueueConfig.getDelayedQueueChannelName(name))
.processingQueueName(rqueueConfig.getProcessingQueueName(name))
.processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(name))
.queueName(rqueueConfig.getQueueName(queueName))
.delayedQueueName(rqueueConfig.getDelayedQueueName(queueName))
.delayedQueueChannelName(rqueueConfig.getDelayedQueueChannelName(queueName))
.processingQueueName(rqueueConfig.getProcessingQueueName(queueName))
.processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(queueName))
.build();
QueueRegistry.register(queueDetail);
for (String priority : priorities) {
String suffix = PriorityUtils.getSuffix(priority);
queueDetail =
QueueDetail.builder()
.name(name + suffix)
.name(queueName + suffix)
.active(false)
.queueName(rqueueConfig.getQueueName(name) + suffix)
.delayedQueueName(rqueueConfig.getDelayedQueueName(name) + suffix)
.delayedQueueChannelName(rqueueConfig.getDelayedQueueChannelName(name) + suffix)
.processingQueueName(rqueueConfig.getProcessingQueueName(name) + suffix)
.processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(name) + suffix)
.queueName(rqueueConfig.getQueueName(queueName) + suffix)
.delayedQueueName(rqueueConfig.getDelayedQueueName(queueName) + suffix)
.delayedQueueChannelName(rqueueConfig.getDelayedQueueChannelName(queueName) + suffix)
.processingQueueName(rqueueConfig.getProcessingQueueName(queueName) + suffix)
.processingQueueChannelName(
rqueueConfig.getProcessingQueueChannelName(queueName) + suffix)
.build();
QueueRegistry.register(queueDetail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ private void callMessageProcessor(TaskStatus status, RqueueMessage rqueueMessage
}
}

private void updateCounter(boolean failOrExecution) {
private void updateCounter(boolean fail) {
RqueueCounter rqueueCounter = Objects.requireNonNull(container.get()).getRqueueCounter();
if (rqueueCounter == null) {
return;
}
if (failOrExecution) {
if (fail) {
rqueueCounter.updateFailureCount(queueDetail.getName());
} else {
rqueueCounter.updateExecutionCount(queueDetail.getName());
Expand Down Expand Up @@ -362,6 +362,8 @@ void start() {
updateCounter(true);
failureCount += 1;
} catch (Exception e) {
updateCounter(true);
failureCount += 1;
log(Level.ERROR, "Message execution failed", e);
}
retryCount--;
Expand Down

0 comments on commit 0276268

Please sign in to comment.