diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java index a92f004ca..89de98454 100644 --- a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java +++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java @@ -17,6 +17,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; import javax.xml.parsers.ParserConfigurationException; @@ -34,6 +36,7 @@ public abstract class AbstractFunctionalTest extends BasicJavaClientREST { protected final static String DB_NAME = "java-functest"; + protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final static ObjectMapper objectMapper = new ObjectMapper(); diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java new file mode 100644 index 000000000..3ce0c9c9b --- /dev/null +++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/AdjustQueryBatcherThreadCountTest.java @@ -0,0 +1,132 @@ +package com.marklogic.client.fastfunctest.datamovement; + +import com.marklogic.client.datamovement.DataMovementManager; +import com.marklogic.client.datamovement.QueryBatcher; +import com.marklogic.client.fastfunctest.AbstractFunctionalTest; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; 
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AdjustQueryBatcherThreadCountTest extends AbstractFunctionalTest {
+    // Every test pauses 50ms per batch in onUrisReady and touches the thread count ~100ms after startJob.
+    @Test
+    void increaseThreadCount() {
+        List<String> uris = writeJsonDocs(20);
+        Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
+        AtomicInteger uriCount = new AtomicInteger();
+
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(1)
+            .withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                threadNames.add(Thread.currentThread().getName());
+                uriCount.addAndGet(batch.getItems().length);
+            });
+
+        dmm.startJob(qb);
+        waitFor(100);
+        qb.withThreadCount(3); // grow the pool while the job is running
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+
+        assertEquals(20, uriCount.get());
+        assertEquals(3, threadNames.size(), "3 threads should have processed all the batches, as the thread count " +
+            "was increased from 1 to 3 100ms into the job.");
+    }
+
+    @Test
+    void reduceThreadCount() {
+        List<String> uris = writeJsonDocs(20);
+        List<String> threadNames = Collections.synchronizedList(new ArrayList<>()); // one entry per processed batch
+
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(4)
+            .withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                threadNames.add(Thread.currentThread().getName());
+            });
+
+        dmm.startJob(qb);
+        waitFor(100);
+        qb.withThreadCount(2); // shrink the pool while the job is running
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+
+        assertEquals(20, threadNames.size(), "With 20 docs and a batch size of 1, the onUrisReady listener should " +
+            "have been called 20 times and thus captured 20 names.");
+
+        Set<String> lastEightThreadNames = new HashSet<>(threadNames.subList(12, 20)); // toIndex is exclusive: indexes 12..19
+        assertEquals(2, lastEightThreadNames.size(), "Since the thread count was reduced from 4 to 2 100ms into the " +
+            "job, then we can assume that 8 batches of size 1 were processed by 4 threads during those first 100ms, " +
+            "as there's a 50ms pause in the onUrisReady listener. The thread count would have been reduced to 2. In " +
+            "theory, the last 12 batches should have only been processed by those 2 threads. But just to be safe, " +
+            "we verify that the last 8 batches were only processed by 2 threads in case it took the thread pool a " +
+            "little bit of time to switch from 4 to 2 threads.");
+    }
+
+    @Test
+    void setThreadCountToOneAndThenHigher() {
+        List<String> uris = writeJsonDocs(20);
+        AtomicInteger uriCount = new AtomicInteger();
+
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(4)
+            .withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                uriCount.addAndGet(batch.getItems().length);
+            });
+
+        dmm.startJob(qb);
+        waitFor(100);
+        qb.withThreadCount(1); // may pause execution, see the assertion message below
+        qb.withThreadCount(8); // raising the count again is expected to resume it
+        qb.awaitCompletion();
+        dmm.stopJob(qb);
+
+        assertEquals(20, uriCount.get(), "The purpose of this test is to verify that if the thread count is set to 1, " +
+            "the thread pool doesn't stop or throw an error. It may pause execution (for reasons that aren't known), " +
+            "but testing has shown that increasing it to a value greater than 1 will cause execution to resume if it " +
+            "was in fact paused.");
+    }
+
+    @Test
+    void setThreadCountToZero() {
+        List<String> uris = writeJsonDocs(20);
+        AtomicInteger uriCount = new AtomicInteger();
+
+        DataMovementManager dmm = client.newDataMovementManager();
+        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
+            .withThreadCount(4)
+            .withBatchSize(1)
+            .onUrisReady(batch -> {
+                waitFor(50);
+                uriCount.addAndGet(batch.getItems().length);
+            });
+
+        dmm.startJob(qb);
+        waitFor(100);
+
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> qb.withThreadCount(0));
+        assertEquals("threadCount must be 1 or greater", ex.getMessage());
+        assertEquals(4, qb.getThreadCount(), "The thread count should not have been adjusted since the input was invalid");
+
+        qb.awaitCompletion(); // the rejected call must not stop the running job
+        dmm.stopJob(qb);
+
+        assertEquals(20, uriCount.get(), "All 20 URIs should still have been retrieved");
+    }
+}
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java index d8d51c34f..4fb83819e 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/Batcher.java @@ -74,7 +74,7 @@ public interface Batcher { * that batches will be processed sequentially because the calling thread * will sometimes also process batches.

* - *

This method cannot be called after the job has started.

+ *

Unless otherwise noted by a subclass, this method cannot be called after the job has started.

* * @param threadCount the number of threads to use in this Batcher * diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java index db9ade49e..d273b9db7 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java @@ -142,7 +142,7 @@ public interface QueryBatcher extends Batcher { QueryBatcher onQueryFailure(QueryFailureListener listener); /** - *

Add a listener to run when the Query job is completed i.e. when all the + *

Add a listener to run when the Query job is completed i.e. when all the * document URIs are retrieved and the associated listeners are completed

* * @param listener the code to run when the Query job is completed @@ -333,6 +333,12 @@ public interface QueryBatcher extends Batcher { * threads used for processing the queued batches (running processEvent on * the listeners regiested with onUrisReady). * + * As of the 6.2.0 release, this can now be adjusted after the batcher has been started. The underlying Java + * {@code ThreadPoolExecutor} will have both its core and max pool sizes set to the given thread count. Use caution + * when reducing this to a value of 1 while the batcher is running; in some cases, the underlying + * {@code ThreadPoolExecutor} may halt execution of any tasks. Execution can be resumed by increasing the thread count + * to a value of 2 or higher. + * * @return this instance for method chaining */ @Override @@ -391,19 +397,19 @@ public interface QueryBatcher extends Batcher { * Retry in the same thread to query a batch that failed. If it fails again, * all the failure listeners associated with the batcher using onQueryFailure * method would be processed. - * + * * Note : Use this method with caution as there is a possibility of infinite * loops. If a batch fails and one of the failure listeners calls this method * to retry with failure listeners and if the batch again fails, this would go * on as an infinite loop until the batch succeeds. - * + * * @param queryEvent the information about the batch that failed */ void retryWithFailureListeners(QueryEvent queryEvent); - + /** * Sets the limit for the maximum number of batches that can be collected. - * + * * @param maxBatches is the value of the limit. */ void setMaxBatches(long maxBatches); @@ -412,10 +418,10 @@ public interface QueryBatcher extends Batcher { * Caps the query at the current batch. */ void setMaxBatches(); - + /** - * Returns the maximum number of Batches for the current job. - * + * Returns the maximum number of Batches for the current job. + * * @return the maximum number of Batches that can be collected. 
*/ long getMaxBatches();
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java
index 13d086d17..e565e3e9c 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java
@@ -364,12 +364,34 @@ public int getMaxDocToUriBatchRatio() {
 
+  /**
+   * Sets the number of threads used by this batcher. If the thread pool has not been created
+   * yet, the count is only recorded; if the pool already exists, it is resized in place.
+   *
+   * @param threadCount the number of threads to use; must be 1 or greater
+   * @return this instance for method chaining
+   * @throws IllegalArgumentException if {@code threadCount} is less than 1
+   */
   @Override
   public QueryBatcher withThreadCount(int threadCount) {
-    requireNotStarted();
-    if ( getThreadCount() <= 0 ) {
+    if (threadCount <= 0) {
       throw new IllegalArgumentException("threadCount must be 1 or greater");
     }
-    threadCountSet = true;
-    super.withThreadCount(threadCount);
+    if (threadPool != null) {
+      logger.info("Adjusting thread pool size from {} to {}", getThreadCount(), threadCount);
+      // ThreadPoolExecutor requires core <= max after every setter call (enforced with an
+      // IllegalArgumentException on Java 9+), so raise the max first when growing and
+      // lower the core first when shrinking.
+      if (threadCount > threadPool.getMaximumPoolSize()) {
+        threadPool.setMaximumPoolSize(threadCount);
+        threadPool.setCorePoolSize(threadCount);
+      } else {
+        threadPool.setCorePoolSize(threadCount);
+        threadPool.setMaximumPoolSize(threadCount);
+      }
+    } else {
+      // No pool to resize yet: flag that a thread count was set explicitly.
+      threadCountSet = true;
+    }
+    super.withThreadCount(threadCount);
     return this;
   }
 