Skip to content

Commit

Permalink
Readme files and revert thread limit change
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Apr 30, 2021
1 parent 8e7f6b9 commit bc86e40
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 142 deletions.
61 changes: 35 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<div>
<img align="left" src="https://raw.githubusercontent.com/sonus21/rqueue/master/rqueue-core/src/main/resources/public/rqueue/img/android-chrome-192x192.png" alt="Rqueue Logo" width="90">
<h1 style="float:left">Rqueue: Redis Queue,Task Queue, Delayed Queue for Spring and Spring Boot</h1>
<h1 style="float:left">Rqueue: Redis Queue, Task Queue, Delayed Queue for Spring and Spring Boot</h1>
</div>

[![Build Status](https://circleci.com/gh/sonus21/rqueue/tree/master.svg?style=shield)](https://circleci.com/gh/sonus21/rqueue/tree/master)
Expand All @@ -9,9 +9,9 @@
[![Javadoc](https://javadoc.io/badge2/com.github.sonus21/rqueue-core/javadoc.svg)](https://javadoc.io/doc/com.github.sonus21/rqueue-core)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)

**Rqueue** is an asynchronous task executor(worker) built for spring(boot) framework based on the spring
framework's messaging library backed by Redis. It can be used as message broker as well, where all
services code is in Spring.
**Rqueue** is an asynchronous task executor(worker) built for spring and spring-boot framework based
on the spring framework's messaging library backed by Redis. It can be used as message broker as
well, where all services code is in Spring.

<br/>

Expand All @@ -20,28 +20,32 @@ services code is in Spring.
## Features

* **Message Scheduling** : A message can be scheduled for any arbitrary period
* **Competing Consumers** multiple messages can be consumed in parallel by different workers.
* **Unique message** : Unique message processing for a queue based on the message id
* **Periodic message** : Process same message at certain interval
* **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message
would be consumed by a worker more than once due to the failure in the underlying
worker/restart-process etc, otherwise exactly one delivery)
* **Redis cluster** : Redis cluster can be used with Lettuce client.
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Web Dashboard**: Web dashboard to manage a queue and queue insights including latency
* **Message retry**: Message would be retried automatically on application crash/failure/restart
etc.
* **Automatic message serialization and deserialization**
* **Concurrency**: Concurrency of any queue can be configured
* **Queue Priority** :
* **Message Multicasting**: Call multiple message listeners on every message
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Competing Consumers** multiple messages can be consumed in parallel by different
workers/listeners.
* **Concurrency**: Concurrency of any listener can be configured
* **Queue Priority**:
* Group level queue priority(weighted and strict)
* Sub queue priority(weighted and strict)
* **Long execution job**: Long running jobs can check in periodically.
* **Execution Backoff** : Exponential and fixed back off (default fixed back off)
* **Middleware**: Add one or more middleware, middlewares are called before listener method.
* **Callbacks** : Callbacks for dead letter queue, discard etc
* **Events** 1. Bootstrap event 2. Task execution event.
* **Unique message** : Unique message processing for a queue based on the message id
* **Periodic message** : Process same message at certain interval
* **Redis connection**: A different redis setup can be used for Rqueue
* **Long execution job**: Long running jobs can check in periodically.
* **Middleware**: Add one or more middleware, middlewares are called before listener method.
* **Message Multicasting**: Call multiple message listeners on very message
* **Redis cluster** : Redis cluster can be used with Lettuce client.
* **Redis Sentinel** : Redis sentinel can be used with Rqueue.
* **Reactive Programming**: Supports reactive Redis and spring webflux
* **Web Dashboard**: Web dashboard to manage a queue and queue insights including latency

### Requirements

Expand Down Expand Up @@ -216,25 +220,30 @@ public class MessageListener {

## Queue Statistics

Micrometer based dashboard for queue

[![Grafana Dashboard](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/grafana-dashboard.png)](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/grafana-dashboard.png)

## Dashboard
## Web

Link: [http://localhost:8080/rqueue](http://localhost:8080/rqueue)

[![Execution Page](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/stats-graph.png)](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/stats-graph.png)
<br/>
<br/>
#### Dashboard

[![Dashboard](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/stats-graph.png)](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/stats-graph.png)

#### Message Waiting For Execution

[![Explore Queue](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/queue-explore.png)](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/queue-explore.png)

#### Recent jobs details

<br/>
[![Jobs](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/jobs.png)](https://raw.githubusercontent.com/sonus21/rqueue/master/docs/static/jobs.png)

## Status

Rqueue is stable and production ready, it's processing thousands of messages daily in production
deployments.
environment.

**We would love to add your organization name here, if you're one of the Rqueue users, please raise
a
Expand All @@ -260,11 +269,11 @@ environment variables for Spring versions.

**Please format your code with Google Java formatter.**

```bash
// springBootVersion = '2.2.0.RELEASE'
// springVersion = '5.2.0.RELEASE'
// springDataVersion = '2.2.0.RELEASE'
// microMeterVersion = '1.3.2'
```groovy
// springBootVersion = '2.0.6.RELEASE'
// springVersion = '5.0.10.RELEASE'
// springDataVersion = '2.0.6.RELEASE'
// microMeterVersion = '1.1.0'
```

## Links
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,8 @@ public class SimpleRqueueListenerContainerFactory {
private long pollingInterval = 200L;
// In case of failure how much time, we should wait for next job
private long backOffTime = 5 * Constants.ONE_MILLI;

// Thread pool size for listener method invocation
// maxNumWorkers = min(50, number of cpu * 10, 2 * number of queues)
// Number of workers requires for execution
private Integer maxNumWorkers;
// Thread pool size of message poller, Rqueue submits a task to a thread pool executor that polls
// the Redis at polling interval. default value is calculated as
// maxNumPollers = min(10, number of cpu * 5, number of queues)
private Integer maxNumPollers;

// This message processor would be called before a task can start execution.
// It needs to be noted that this message processor would be called multiple time
Expand Down Expand Up @@ -317,9 +311,8 @@ public RqueueMessageListenerContainer createMessageListenerContainer() {
notNull(getRqueueMessageHandler(), "rqueueMessageHandler must not be null");
notNull(redisConnectionFactory, "redisConnectionFactory must not be null");
if (rqueueMessageTemplate == null) {
rqueueMessageTemplate =
new RqueueMessageTemplateImpl(
getRedisConnectionFactory(), getReactiveRedisConnectionFactory());
rqueueMessageTemplate = new RqueueMessageTemplateImpl(getRedisConnectionFactory(),
getReactiveRedisConnectionFactory());
}
RqueueMessageListenerContainer messageListenerContainer =
new RqueueMessageListenerContainer(getRqueueMessageHandler(), rqueueMessageTemplate);
Expand All @@ -330,9 +323,6 @@ public RqueueMessageListenerContainer createMessageListenerContainer() {
if (maxNumWorkers != null) {
messageListenerContainer.setMaxNumWorkers(maxNumWorkers);
}
if (getMaxNumPollers() != null) {
messageListenerContainer.setMaxNumPollers(maxNumPollers);
}
messageListenerContainer.setBackOffTime(getBackOffTime());
messageListenerContainer.setPollingInterval(getPollingInterval());
if (postExecutionMessageProcessor != null) {
Expand Down Expand Up @@ -560,25 +550,4 @@ public void setReactiveRedisConnectionFactory(
notNull(reactiveRedisConnectionFactory, "reactiveRedisConnectionFactory can not be null");
this.reactiveRedisConnectionFactory = reactiveRedisConnectionFactory;
}

/**
* The configured message poller thread pool size
*
* @return thread pool size
*/
public Integer getMaxNumPollers() {
return maxNumPollers;
}

/**
* Set thread pool size for message poller.
*
* @param maxNumPollers message poller thread pool size
*/
public void setMaxNumPollers(int maxNumPollers) {
if (maxNumPollers < 1) {
throw new IllegalArgumentException("maxNumPollers must be greater than equal to 1");
}
this.maxNumPollers = maxNumPollers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@

package com.github.sonus21.rqueue.listener;

import static com.github.sonus21.rqueue.utils.Constants.CPU_MULTIPLICATION_FACTOR_FOR_POLLER_POOL_SIZE;
import static com.github.sonus21.rqueue.utils.Constants.CPU_MULTIPLICATION_FACTOR_FOR_WORKER_POOL_SIZE;
import static com.github.sonus21.rqueue.utils.Constants.DEFAULT_WORKER_COUNT_PER_QUEUE;
import static com.github.sonus21.rqueue.utils.Constants.MAX_POLLER_POOL_SIZE;
import static com.github.sonus21.rqueue.utils.Constants.MAX_WORKER_POOL_SIZE;
import static com.github.sonus21.rqueue.utils.ThreadUtils.waitForTermination;
import static com.github.sonus21.rqueue.utils.ThreadUtils.waitForWorkerTermination;
import static java.lang.Math.min;
import static org.springframework.util.Assert.notEmpty;
import static org.springframework.util.Assert.notNull;

Expand All @@ -43,7 +38,6 @@
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.github.sonus21.rqueue.utils.SystemUtil;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.github.sonus21.rqueue.utils.ThreadUtils.QueueThread;
import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff;
Expand Down Expand Up @@ -103,17 +97,23 @@ public class RqueueMessageListenerContainer
@Autowired(required = false)
private RqueueMetricsCounter rqueueMetricsCounter;

@Autowired private ApplicationEventPublisher applicationEventPublisher;
@Autowired private RqueueWebConfig rqueueWebConfig;
@Autowired private RqueueConfig rqueueConfig;
@Autowired private RqueueMessageMetadataService rqueueMessageMetadataService;
@Autowired private RqueueSystemConfigDao rqueueSystemConfigDao;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private RqueueWebConfig rqueueWebConfig;
@Autowired
private RqueueConfig rqueueConfig;
@Autowired
private RqueueMessageMetadataService rqueueMessageMetadataService;
@Autowired
private RqueueSystemConfigDao rqueueSystemConfigDao;
private AsyncTaskExecutor taskExecutor;
@Autowired private RqueueJobDao rqueueJobDao;
@Autowired private RqueueStringDao rqueueStringDao;
@Autowired
private RqueueJobDao rqueueJobDao;
@Autowired
private RqueueStringDao rqueueStringDao;
private List<Middleware> middlewares;
private Integer maxNumWorkers;
private Integer maxNumPollers;
private String beanName;
private boolean defaultTaskExecutor = false;
private boolean autoStartup = true;
Expand Down Expand Up @@ -173,17 +173,6 @@ public void setMaxNumWorkers(int maxNumWorkers) {
this.maxNumWorkers = maxNumWorkers;
}

public Integer getMaxNumPollers() {
return maxNumPollers;
}

public void setMaxNumPollers(int maxNumPollers) {
if (maxNumPollers < 1) {
throw new IllegalArgumentException("maxNumPollers must be greater than zero");
}
this.maxNumPollers = maxNumPollers;
}

public long getBackOffTime() {
return backOffTime;
}
Expand Down Expand Up @@ -339,15 +328,7 @@ private void initializeRunningQueueState() {
}

private int getWorkersCount(int queueCount) {
if (queueCount == 0) {
return 0;
}
if (maxNumWorkers != null) {
return maxNumWorkers;
}
int cpuBasedMaxWorkerCount = SystemUtil.cpuCount() * CPU_MULTIPLICATION_FACTOR_FOR_WORKER_POOL_SIZE;
int count = min(cpuBasedMaxWorkerCount, queueCount * DEFAULT_WORKER_COUNT_PER_QUEUE);
return min(count, MAX_WORKER_POOL_SIZE);
return (maxNumWorkers == null ? queueCount * DEFAULT_WORKER_COUNT_PER_QUEUE : maxNumWorkers);
}

private AsyncTaskExecutor createTaskExecutor(int corePoolSize, int maxPoolSize) {
Expand All @@ -357,38 +338,17 @@ private AsyncTaskExecutor createTaskExecutor(int corePoolSize, int maxPoolSize)
DEFAULT_THREAD_NAME_PREFIX, prefix, corePoolSize, maxPoolSize);
}

private int getCorePoolSize(int queueCount) {
return getPollerCount(queueCount);
}

private int getMaxPoolSize(int workerJobCount, int queueCount) {
// one job is enqueued for each poller so we need queueCount poller threads
// otherwise job would be rejected
return queueCount + workerJobCount;
}

private AsyncTaskExecutor createNonConcurrencyBasedExecutor(
List<QueueDetail> queueDetails, int queueCount) {
List<QueueDetail> queueDetails, int pollerCount) {
int workersCount = getWorkersCount(queueDetails.size());
int corePoolSize = getCorePoolSize(queueCount);
int maxPoolSize = getMaxPoolSize(workersCount, queueCount);
int maxPoolSize = workersCount + pollerCount;
// one thread for message poller and one for executor
int corePoolSize = queueDetails.size() + pollerCount;
AsyncTaskExecutor executor = createTaskExecutor(corePoolSize, maxPoolSize);
initializeThreadMap(queueDetails, executor, true, workersCount);
return executor;
}

private int getPollerCount(int queueCount) {
if (queueCount == 0) {
return 0;
}
if (maxNumPollers != null) {
return maxNumPollers;
}
int count =
min(queueCount, SystemUtil.cpuCount() * CPU_MULTIPLICATION_FACTOR_FOR_POLLER_POOL_SIZE);
return min(count, MAX_POLLER_POOL_SIZE);
}

private void createExecutor(QueueDetail queueDetail) {
Concurrency concurrency = queueDetail.getConcurrency();
AsyncTaskExecutor executor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,4 @@ public final class Constants {
public static final String REDIS_KEY_SEPARATOR = "::";
public static final int MAX_STACKTRACE_LENGTH = 3000;
public static final String Comma = ",";
public static final int CPU_MULTIPLICATION_FACTOR_FOR_WORKER_POOL_SIZE = 10;
public static final int MAX_WORKER_POOL_SIZE = 50;
public static final int CPU_MULTIPLICATION_FACTOR_FOR_POLLER_POOL_SIZE = 5;
public static final int MAX_POLLER_POOL_SIZE = 10;
}
8 changes: 8 additions & 0 deletions rqueue-spring-boot-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Rqueue Spring Boot Sample App

This sample project should be used only for spring-boot application.

* For spring-mvc refer
to [rqueue-spring-example](https://github.com/sonus21/rqueue/tree/master/rqueue-spring-example)
* For spring-boot reactive/webflux refer
to [rqueue-spring-boot-reactive-example](https://github.com/sonus21/rqueue/tree/master/rqueue-spring-boot-reactive-example)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.github.sonus21.rqueue.example;

import com.github.sonus21.rqueue.core.RqueueMessageSender;
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,7 +30,7 @@
@Slf4j
public class Controller {

private final RqueueMessageSender rqueueMessageSender;
private final RqueueMessageEnqueuer rqueueMessageEnqueuer;

@GetMapping(value = "/push")
public String push(
Expand All @@ -39,32 +39,39 @@ public String push(
@RequestParam(required = false) Integer numRetries,
@RequestParam(required = false) Long delay) {
if (numRetries == null && delay == null) {
rqueueMessageSender.enqueue(q, msg);
rqueueMessageEnqueuer.enqueue(q, msg);
} else if (numRetries == null) {
rqueueMessageSender.enqueueIn(q, msg, delay);
rqueueMessageEnqueuer.enqueueIn(q, msg, delay);
} else {
rqueueMessageSender.enqueueInWithRetry(q, msg, numRetries, delay);
rqueueMessageEnqueuer.enqueueInWithRetry(q, msg, numRetries, delay);
}
log.info("Message {}", msg);
return "Message sent successfully";
}

private String getQueue(String queue) {
if (queue == null) {
return "job-queue";
}
return queue;
}

@GetMapping("job")
public String sendJobNotification() {
public String sendJobNotification(@RequestParam(required = false) String queue) {
Job job = new Job();
job.setId(UUID.randomUUID().toString());
job.setMessage("Hi this is " + job.getId());
rqueueMessageSender.enqueue("job-queue", job);
rqueueMessageEnqueuer.enqueue(getQueue(queue), job);
log.info("{}", job);
return job.toString();
}

@GetMapping("job-delay")
public String sendJobNotificationWithDelay() {
public String sendJobNotificationWithDelay(@RequestParam(required = false) String queue) {
Job job = new Job();
job.setId(UUID.randomUUID().toString());
job.setMessage("Hi this is " + job.getId());
rqueueMessageSender.enqueueIn("job-queue", job, 2000L);
rqueueMessageEnqueuer.enqueueIn(getQueue(queue), job, 2000L);
return job.toString();
}
}

0 comments on commit bc86e40

Please sign in to comment.