Skip to content

Commit

Permalink
resource was not getting released due to sleep in the deactivate method
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Dec 24, 2022
1 parent b6f0194 commit 1fa34f0
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 34 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ workflows:
- integration_test:
requires:
- producer_only_test
- reactive_integration_test:
- redis_custer_test:
requires:
- integration_test
- redis_custer_test:
- reactive_integration_test:
requires:
- reactive_integration_test
- redis_custer_test
- report_code_coverage:
requires:
- redis_custer_test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import com.github.sonus21.rqueue.utils.TimeoutUtils;

import java.util.Collections;
import java.util.List;

import org.slf4j.event.Level;
import org.springframework.messaging.MessageHeaders;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import java.util.List;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.CollectionUtils;

import java.util.List;

abstract class RqueueMessagePoller extends MessageContainerBase {

final List<Middleware> middlewares;
Expand Down Expand Up @@ -100,17 +101,13 @@ boolean shouldExit() {
return true;
}

protected boolean hasAvailableThreads(QueueDetail queueDetail, QueueThreadPool queueThreadPool){
return getAvailablePoolSize(queueDetail, queueThreadPool) > 0;
protected boolean hasAvailableThreads(QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
return queueThreadPool.availableThreads() > 0;
}

protected int getAvailablePoolSize(QueueDetail queueDetail, QueueThreadPool queueThreadPool){
int poolSize = Math.min(queueDetail.getBatchSize(), queueThreadPool.availableThreads());
return poolSize;
}

protected int getBatchSize(QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
int batchSize = getAvailablePoolSize(queueDetail, queueThreadPool);
int batchSize = Math.min(queueDetail.getBatchSize(), queueThreadPool.availableThreads());
batchSize = Math.max(batchSize, Constants.MIN_BATCH_SIZE);
log(Level.DEBUG, "Batch size {}", null, batchSize);
return batchSize;
Expand All @@ -125,6 +122,8 @@ private void sendMessagesToExecutor(
}
}

// at this point of time, we've acquired batchSize semaphore that means its guarantee that we'll
// have batchSize available threads in the thread pool
private void pollAndExecute(
int index,
String queue,
Expand All @@ -136,23 +135,26 @@ private void pollAndExecute(
List<RqueueMessage> messages = getMessages(queueDetail, batchSize);
log(Level.TRACE, "Queue: {} Fetched Msgs {}", null, queue, messages);
int messageCount = CollectionUtils.isEmpty(messages) ? 0 : messages.size();
if (messageCount == 0) {
deactivate(index, queue, DeactivateType.NO_MESSAGE);
}
// free additional required threads e.g 10 asked but only 5 messages are there
// free additional requested threads e.g 10 requested but only 5 messages are there
queueThreadPool.release(batchSize - messageCount);
if (messageCount > 0) {
sendMessagesToExecutor(queueDetail, queueThreadPool, messages);
} else {
deactivate(index, queue, DeactivateType.NO_MESSAGE);
}
} catch (Exception e) {
queueThreadPool.release(batchSize);
log(Level.WARN, "Listener failed for the queue {}", e, queue);
deactivate(index, queue, DeactivateType.POLL_FAILED);
}
} else {
// release resource
queueThreadPool.release(batchSize);
}
}

protected void poll(int index, String queue, QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
void poll(int index, String queue, QueueDetail queueDetail,
QueueThreadPool queueThreadPool) {
log(Level.TRACE, "Polling queue {}", null, queue);
int batchSize = getBatchSize(queueDetail, queueThreadPool);
boolean acquired;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2022 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2022 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,11 +28,7 @@
import com.github.sonus21.rqueue.models.request.QueueExploreRequest;
import com.github.sonus21.rqueue.spring.boot.reactive.ReactiveWebApplication;
import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest;
import com.github.sonus21.rqueue.test.dto.Email;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.tests.BasicListenerTest;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -100,14 +96,6 @@ void testPath(String path) throws Exception {

@Test
void getChartLatency() throws Exception {
for (int i = 0; i < 100; i++) {
Job job = Job.newInstance();
enqueue(jobQueue, job);
}
TimeoutUtils.waitFor(
() -> getMessageCount(jobQueue) == 0,
Constants.SECONDS_IN_A_MINUTE * Constants.ONE_MILLI,
"Job to run");
ChartDataRequest chartDataRequest =
new ChartDataRequest(ChartType.LATENCY, AggregationType.DAILY);
this.webTestClient
Expand All @@ -125,7 +113,6 @@ void getChartLatency() throws Exception {

@Test
void exploreData() throws Exception {
enqueue(emailDeadLetterQueue, i -> Email.newInstance(), 30, true);
QueueExploreRequest request = new QueueExploreRequest();
request.setType(DataType.LIST);
request.setSrc(emailQueue);
Expand Down Expand Up @@ -197,7 +184,6 @@ void moveMessage() throws Exception {

@Test
void viewData() throws Exception {
enqueue(emailDeadLetterQueue, i -> Email.newInstance(), 30, true);
DateViewRequest dateViewRequest = new DateViewRequest();
dateViewRequest.setName(emailDeadLetterQueue);
dateViewRequest.setType(DataType.LIST);
Expand Down

0 comments on commit 1fa34f0

Please sign in to comment.