My common queue library implements queue operations.
The purpose of the library is to provide higher-level abstractions of queue operations.
- DelayQueue implemented by Redis sorted set
- MemoryLimitedLinkedBlockingQueue: LinkedBlockingQueue with limited memory footprint
- MemorySafeLinkedBlockingQueue: Memory safe LinkedBlockingQueue
<dependency>
<groupId>cn.yusiwen.commons</groupId>
<artifactId>commons-queue</artifactId>
<version>1.0.2.1</version>
</dependency>
Initialize queue:
public void init() {
redisClient = RedisClient.create("redis://<redis-server>:6379");
RedisDelayQueue.Builder builder = redisDelayQueue();
builder.client(redisClient); // Set redis client
builder.mapper(objectMapper); // Set ObjectMapper
builder.handlerScheduler(Schedulers.fromExecutorService(executor)); // Set scheduler for handlers
builder.enableScheduling(true); // Enable scheduling
builder.schedulingInterval(Duration.ofSeconds(1)); // Set scheduling interval
builder.schedulingBatchSize(SCHEDULING_BATCH_SIZE); // Set prefetch size for backpressure
builder.pollingTimeout(POLLING_TIMEOUT); // Set polling timeout
builder.taskContextHandler(new DefaultTaskContextHandler());
builder.dataSetPrefix("");
builder.retryAttempts(10); // Set max attempts
builder.metrics(new NoopMetrics());
builder.refreshSubscriptionsInterval(Duration.ofMinutes(5)); // Set interval for refreshing subcription
queue = builder.build();
}
Add handler and task:
queue.addTaskHandler(DemoTask.class, e -> Mono.fromCallable(() -> {
LOG.info("DemoTask received, id = {}", e.getId());
return TRUE;
}), 1);
queue.enqueue(new DemoTask("1"), Duration.ofSeconds(10)).subscribe();
The DemoTask
will be triggered in 10 seconds.
More details in Demo.java
MemorySafeLinkedBlockingQueue<MyData> queue = new MemorySafeLinkedBlockingQueue<>(maxFreeMemory);
MemoryLimitedLinkedBlockingQueue<MyData> queue = new MemoryLimitedLinkedBlockingQueue<>(memoryLimit, instrumentation)
See Demo.java for more details.