From 9a11188975439817d69b2c52cf34721f1674b739 Mon Sep 17 00:00:00 2001
From: Rob Rudin
Date: Thu, 30 Mar 2023 11:26:29 -0400
Subject: [PATCH] DEVEXP-366 Can now adjust thread count on QueryBatcher
---
.../fastfunctest/AbstractFunctionalTest.java | 3 +
.../AdjustQueryBatcherThreadCountTest.java | 132 ++++++++++++++++++
.../client/datamovement/Batcher.java | 2 +-
.../client/datamovement/QueryBatcher.java | 22 +--
.../datamovement/impl/QueryBatcherImpl.java | 13 +-
5 files changed, 159 insertions(+), 13 deletions(-)
create mode 100644 marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java
diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java
index a92f004ca..89de98454 100644
--- a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java
+++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java
@@ -17,6 +17,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
@@ -34,6 +36,7 @@
public abstract class AbstractFunctionalTest extends BasicJavaClientREST {
protected final static String DB_NAME = "java-functest";
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
protected final static ObjectMapper objectMapper = new ObjectMapper();
diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java
new file mode 100644
index 000000000..3ce0c9c9b
--- /dev/null
+++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java
@@ -0,0 +1,132 @@
+package com.marklogic.client.fastfunctest.datamovement;
+
+import com.marklogic.client.datamovement.DataMovementManager;
+import com.marklogic.client.datamovement.QueryBatcher;
+import com.marklogic.client.fastfunctest.AbstractFunctionalTest;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AdjustQueryBatcherThreadCountTest extends AbstractFunctionalTest {
+
+    /** Raising the thread count from 1 to 3 on a running job should put 3 distinct threads to work. */
+    @Test
+    void increaseThreadCount() {
+        List<String> uris = writeJsonDocs(20);
+        Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
+        AtomicInteger uriCount = new AtomicInteger();
+        // Batch size 1 plus a 50ms pause per batch keeps the job running long enough to resize it.
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(1).withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                threadNames.add(Thread.currentThread().getName());
+                uriCount.addAndGet(batch.getItems().length);
+            });
+        // Let the single thread get going, then grow the pool while the job is still running.
+        dmm.startJob(qb);
+        waitFor(100);
+        qb.withThreadCount(3);
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+        // All 20 URIs must be counted, and exactly three distinct thread names recorded.
+        assertEquals(20, uriCount.get());
+        assertEquals(3, threadNames.size(), "3 threads should have processed all the batches, as the thread count " +
+            "was increased from 1 to 3 100ms into the job.");
+    }
+
+    /** Lowering the thread count from 4 to 2 mid-job should leave only 2 threads handling the final batches. */
+    @Test
+    void reduceThreadCount() {
+        List<String> uris = writeJsonDocs(20);
+        List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
+        // A list (not a set) is used so the order in which threads handled batches is retained.
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(4).withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                threadNames.add(Thread.currentThread().getName());
+            });
+        // Shrink the pool while batches are still being processed.
+        dmm.startJob(qb);
+        waitFor(100);
+        qb.withThreadCount(2);
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+        // One name is recorded per listener call, so the list length equals the number of batches.
+        assertEquals(20, threadNames.size(), "With 20 docs and a batch size of 1, the onUrisReady listener should " +
+            "have been called 20 times and thus captured 20 names.");
+        // subList's end index is exclusive, so (12, 20) selects the final eight entries (indices 12..19).
+        Set<String> lastEightThreadNames = new HashSet<>(threadNames.subList(12, 20));
+        assertEquals(2, lastEightThreadNames.size(), "Since the thread count was reduced from 4 to 2 100ms into the " +
+            "job, then we can assume that 8 batches of size 1 were processed by 4 threads during those first 100ms, " +
+            "as there's a 50ms pause in the onUrisReady listener. The thread count would have been reduced to 2. In " +
+            "theory, the last 12 batches should have only been processed by those 2 threads. But just to be safe, " +
+            "we verify that the last 8 batches were only processed by 2 threads in case it took the thread pool a " +
+            "little bit of time to switch from 4 to 2 threads.");
+    }
+
+    /** Dropping to 1 thread and then jumping to 8 on a running job must still process every URI. */
+    @Test
+    void setThreadCountToOneAndThenHigher() {
+        List<String> uris = writeJsonDocs(20);
+        AtomicInteger uriCount = new AtomicInteger();
+        // Start with 4 threads so that the later change to 1 is a genuine reduction.
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(4).withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                uriCount.addAndGet(batch.getItems().length);
+            });
+        // Resize twice in quick succession while the job is running.
+        dmm.startJob(qb);
+        waitFor(100);
+        qb.withThreadCount(1);
+        qb.withThreadCount(8);
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+        // The only requirement is that the job still finishes with every URI counted.
+        assertEquals(20, uriCount.get(), "The purpose of this test is to verify that if the thread count is set to 1, " +
+            "the thread pool doesn't stop or throw an error. It may pause execution (for reasons that aren't known), " +
+            "but testing has shown that increasing it to a value greater than 1 will cause execution to resume if it " +
+            "was in fact paused.");
+    }
+
+    /** A thread count of zero must be rejected without disturbing the running job or its thread count. */
+    @Test
+    void setThreadCountToZero() {
+        List<String> uris = writeJsonDocs(20);
+        AtomicInteger uriCount = new AtomicInteger();
+        // Batch size 1 plus a 50ms pause per batch keeps the job running while the bad value is applied.
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(4).withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                uriCount.addAndGet(batch.getItems().length);
+            });
+        // The invalid value is applied only after the job has been started.
+        dmm.startJob(qb);
+        waitFor(100);
+        // Zero is rejected with the documented message, and the configured count stays at 4.
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> qb.withThreadCount(0));
+        assertEquals("threadCount must be 1 or greater", ex.getMessage());
+        assertEquals(4, qb.getThreadCount(), "The thread count should not have been adjusted since the input was invalid");
+        // The rejected call must not prevent the job from running to completion.
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+        // Every URI is still delivered to the listener.
+        assertEquals(20, uriCount.get(), "All 20 URIs should still have been retrieved");
+    }
+}
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java
index d8d51c34f..4fb83819e 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java
@@ -74,7 +74,7 @@ public interface Batcher {
* that batches will be processed sequentially because the calling thread
* will sometimes also process batches.
*
- * This method cannot be called after the job has started.
+ * Unless otherwise noted by a subclass, this method cannot be called after the job has started.
*
* @param threadCount the number of threads to use in this Batcher
*
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java
index db9ade49e..d273b9db7 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java
@@ -142,7 +142,7 @@ public interface QueryBatcher extends Batcher {
QueryBatcher onQueryFailure(QueryFailureListener listener);
/**
- * Add a listener to run when the Query job is completed i.e. when all the
+ * Add a listener to run when the Query job is completed i.e. when all the
* document URIs are retrieved and the associated listeners are completed
*
* @param listener the code to run when the Query job is completed
@@ -333,6 +333,12 @@ public interface QueryBatcher extends Batcher {
* threads used for processing the queued batches (running processEvent on
* the listeners regiested with onUrisReady).
*
+ * As of the 6.2.0 release, this can now be adjusted after the batcher has been started. The underlying Java
+ * {@code ThreadPoolExecutor} will have both its core and max pool sizes set to the given thread count. Use caution
+ * when reducing this to a value of 1 while the batcher is running; in some cases, the underlying
+ * {@code ThreadPoolExecutor} may halt execution of any tasks. Execution can be resumed by increasing the thread count
+ * to a value of 2 or higher.
+ *
* @return this instance for method chaining
*/
@Override
@@ -391,19 +397,19 @@ public interface QueryBatcher extends Batcher {
* Retry in the same thread to query a batch that failed. If it fails again,
* all the failure listeners associated with the batcher using onQueryFailure
* method would be processed.
- *
+ *
* Note : Use this method with caution as there is a possibility of infinite
* loops. If a batch fails and one of the failure listeners calls this method
* to retry with failure listeners and if the batch again fails, this would go
* on as an infinite loop until the batch succeeds.
- *
+ *
* @param queryEvent the information about the batch that failed
*/
void retryWithFailureListeners(QueryEvent queryEvent);
-
+
/**
* Sets the limit for the maximum number of batches that can be collected.
- *
+ *
* @param maxBatches is the value of the limit.
*/
void setMaxBatches(long maxBatches);
@@ -412,10 +418,10 @@ public interface QueryBatcher extends Batcher {
* Caps the query at the current batch.
*/
void setMaxBatches();
-
+
/**
- * Returns the maximum number of Batches for the current job.
- *
+ * Returns the maximum number of Batches for the current job.
+ *
* @return the maximum number of Batches that can be collected.
*/
long getMaxBatches();
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java
index 13d086d17..e565e3e9c 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java
@@ -364,12 +364,24 @@ public int getMaxDocToUriBatchRatio() {
@Override
public QueryBatcher withThreadCount(int threadCount) {
- requireNotStarted();
- if ( getThreadCount() <= 0 ) {
+ // Validate the requested value (not the current count) before changing any state.
+ if (threadCount <= 0) {
throw new IllegalArgumentException("threadCount must be 1 or greater");
}
- threadCountSet = true;
- super.withThreadCount(threadCount);
+ if (threadPool != null) {
+ // A pool already exists (presumably the job has started), so resize it in place.
+ logger.info("Adjusting thread pool size from {} to {}", getThreadCount(), threadCount);
+ // Since Java 9, ThreadPoolExecutor rejects a core size above the maximum and a maximum
+ // below the core size. Raise the ceiling first if needed, then move the core size, then
+ // settle the maximum, so both growing and shrinking are valid at every step.
+ threadPool.setMaximumPoolSize(Math.max(threadCount, threadPool.getMaximumPoolSize()));
+ threadPool.setCorePoolSize(threadCount);
+ threadPool.setMaximumPoolSize(threadCount);
+ } else {
+ // No pool yet: note that a thread count was explicitly requested.
+ threadCountSet = true;
+ }
+ super.withThreadCount(threadCount);
return this;
}