Skip to content

Commit

Permalink
get pending/schedule/processing message count
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Feb 22, 2021
1 parent 6469f9f commit f8ab976
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.web.view.DateTimeFunction;
import com.github.sonus21.rqueue.web.view.DeadLetterQueuesFunction;
Expand Down Expand Up @@ -186,4 +187,10 @@ public JtwigViewResolver rqueueViewResolver() {
viewResolver.setSuffix(".html");
return viewResolver;
}

@Bean
public RqueueQueueMetrics rqueueQueueMetrics(
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
return new RqueueQueueMetrics(stringRqueueRedisTemplate);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.metrics;

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.exception.QueueDoesNotExist;
import com.github.sonus21.rqueue.listener.QueueDetail;

/**
* This class reports queue message counter.
*
* <p>Count can be sent to some monitoring tool like Prometheus, influx db etc
*/
public class RqueueQueueMetrics {

private final RqueueRedisTemplate<String> redisTemplate;

public RqueueQueueMetrics(RqueueRedisTemplate<String> redisTemplate) {
this.redisTemplate = redisTemplate;
}

/**
* Get number of messages waiting for consumption
*
* @param queue queue name
* @return -1 if queue is not registered otherwise message count
*/
public long getPendingMessageCount(String queue) {
try {
QueueDetail queueDetail = EndpointRegistry.get(queue);
return redisTemplate.getListSize(queueDetail.getQueueName());
} catch (QueueDoesNotExist e) {
return -1;
}
}

/**
* Get number of messages waiting in delayed queue, these messages would move to pending queue as
* soon as the scheduled time is reach.
*
* @param queue queue name
* @return -1 if queue is not registered otherwise message count
*/
public long getScheduledMessageCount(String queue) {
try {
QueueDetail queueDetail = EndpointRegistry.get(queue);
return redisTemplate.getZsetSize(queueDetail.getDelayedQueueName());
} catch (QueueDoesNotExist e) {
return -1;
}
}

/**
* Get number of messages those are currently being processed
*
* @param queue queue name
* @return -1 if queue is not registered otherwise message count
*/
public long getProcessingMessageCount(String queue) {
try {
QueueDetail queueDetail = EndpointRegistry.get(queue);
return redisTemplate.getZsetSize(queueDetail.getProcessingQueueName());
} catch (QueueDoesNotExist e) {
return -1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ void metricStatus() throws TimedOutException {
void countStatusTest() throws TimedOutException {
this.verifyCountStatus();
}

@Test
void messageCount() throws TimedOutException {
verifyQueueMessageCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor;
import static com.google.common.collect.Lists.newArrayList;

import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
import com.github.sonus21.rqueue.test.common.SpringTestBase;
import com.github.sonus21.rqueue.test.dto.Email;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class MetricTest extends SpringTestBase {
@Autowired protected MeterRegistry meterRegistry;

@Autowired
protected MeterRegistry meterRegistry;
@Autowired
protected RqueueQueueMetrics rqueueQueueMetrics;

protected void verifyDelayedQueueStatus() throws TimedOutException {
long maxDelay = 0;
Expand Down Expand Up @@ -141,12 +149,25 @@ protected void verifyCountStatus() throws TimedOutException {
waitFor(
() ->
meterRegistry
.get("failure.count")
.tags("rqueue", "test")
.tags("queue", emailQueue)
.counter()
.count()
.get("failure.count")
.tags("rqueue", "test")
.tags("queue", emailQueue)
.counter()
.count()
== 0,
"stats collection");
}

protected void verifyQueueMessageCount() throws TimedOutException {
QueueDetail queueDetail = EndpointRegistry.get(notificationQueue);
enqueue(queueDetail.getQueueName(), i -> Notification.newInstance(), 1000);
enqueueIn(queueDetail.getDelayedQueueName(), i -> Notification.newInstance(), i -> 30_000L,
100);
TimeoutUtils.waitFor(() -> rqueueQueueMetrics.getProcessingMessageCount(notificationQueue) > 0,
"at least one message in processing");
TimeoutUtils.waitFor(() -> rqueueQueueMetrics.getScheduledMessageCount(notificationQueue) > 0,
"at least one message in scheduled queue");
TimeoutUtils.waitFor(() -> rqueueQueueMetrics.getPendingMessageCount(notificationQueue) > 0,
"at least one message in pending queue");
}
}

0 comments on commit f8ab976

Please sign in to comment.