@@ -17,6 +17,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

import javax.xml.parsers.ParserConfigurationException;
@@ -34,6 +36,7 @@
public abstract class AbstractFunctionalTest extends BasicJavaClientREST {

    protected final static String DB_NAME = "java-functest";
+   protected final Logger logger = LoggerFactory.getLogger(getClass());

    protected final static ObjectMapper objectMapper = new ObjectMapper();

@@ -0,0 +1,132 @@
package com.marklogic.client.fastfunctest.datamovement;

import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.fastfunctest.AbstractFunctionalTest;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class AdjustQueryBatcherThreadCountTest extends AbstractFunctionalTest {

    @Test
    void increaseThreadCount() {
        List<String> uris = writeJsonDocs(20);
        Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
        AtomicInteger uriCount = new AtomicInteger();

        DataMovementManager dmm = client.newDataMovementManager();
        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
            .withThreadCount(1)
            .withBatchSize(1)
            .onUrisReady(batch -> {
                waitFor(50);
                threadNames.add(Thread.currentThread().getName());
                uriCount.addAndGet(batch.getItems().length);
            });

        dmm.startJob(qb);
        waitFor(100);
        qb.withThreadCount(3);
        qb.awaitCompletion();
        dmm.stopJob(qb);

        assertEquals(20, uriCount.get());
        assertEquals(3, threadNames.size(), "3 threads should have processed all the batches, as the thread count " +
            "was increased from 1 to 3 roughly 100ms into the job.");
    }

    @Test
    void reduceThreadCount() {
        List<String> uris = writeJsonDocs(20);
        List<String> threadNames = Collections.synchronizedList(new ArrayList<>());

        DataMovementManager dmm = client.newDataMovementManager();
        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
            .withThreadCount(4)
            .withBatchSize(1)
            .onUrisReady(batch -> {
                waitFor(50);
                threadNames.add(Thread.currentThread().getName());
            });

        dmm.startJob(qb);
        waitFor(100);
        qb.withThreadCount(2);
        qb.awaitCompletion();
        dmm.stopJob(qb);

        assertEquals(20, threadNames.size(), "With 20 docs and a batch size of 1, the onUrisReady listener should " +
            "have been called 20 times and thus captured 20 names.");

        Set<String> lastEightThreadNames = new HashSet<>(threadNames.subList(12, 20));
        assertEquals(2, lastEightThreadNames.size(), "Since the thread count was reduced from 4 to 2 roughly 100ms " +
            "into the job, we can assume that 8 batches of size 1 were processed by 4 threads during those first " +
            "100ms, given the 50ms pause in the onUrisReady listener. In theory, the last 12 batches should have " +
            "been processed by only those 2 threads. But to be safe, we only verify that the last 8 batches were " +
            "processed by 2 threads, in case it took the thread pool a little bit of time to shrink from 4 to 2 " +
            "threads.");
    }

    @Test
    void setThreadCountToOneAndThenHigher() {
        List<String> uris = writeJsonDocs(20);
        AtomicInteger uriCount = new AtomicInteger();

        DataMovementManager dmm = client.newDataMovementManager();
        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
            .withThreadCount(4)
            .withBatchSize(1)
            .onUrisReady(batch -> {
                waitFor(50);
                uriCount.addAndGet(batch.getItems().length);
            });

        dmm.startJob(qb);
        waitFor(100);
        qb.withThreadCount(1);
        qb.withThreadCount(8);
        qb.awaitCompletion();
        dmm.stopJob(qb);

        assertEquals(20, uriCount.get(), "The purpose of this test is to verify that if the thread count is set to 1, " +
            "the thread pool doesn't stop or throw an error. It may pause execution (for reasons that aren't known), " +
            "but testing has shown that increasing it to a value greater than 1 will cause execution to resume if it " +
            "was in fact paused.");
    }

    @Test
    void setThreadCountToZero() {
        List<String> uris = writeJsonDocs(20);
        AtomicInteger uriCount = new AtomicInteger();

        DataMovementManager dmm = client.newDataMovementManager();
        QueryBatcher qb = dmm.newQueryBatcher(uris.iterator())
            .withThreadCount(4)
            .withBatchSize(1)
            .onUrisReady(batch -> {
                waitFor(50);
                uriCount.addAndGet(batch.getItems().length);
            });

        dmm.startJob(qb);
        waitFor(100);

        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> qb.withThreadCount(0));
        assertEquals("threadCount must be 1 or greater", ex.getMessage());
        assertEquals(4, qb.getThreadCount(), "The thread count should not have been adjusted since the input was invalid");

        qb.awaitCompletion();
        dmm.stopJob(qb);

        assertEquals(20, uriCount.get(), "All 20 URIs should still have been retrieved");
    }
}
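Outside the test harness, the pattern these tests exercise might look roughly like the sketch below. The host, port, credentials, class name, and URIs are placeholder assumptions; the client and DataMovementManager setup uses the existing Java Client API, and only the mid-job withThreadCount call depends on the behavior added in this PR.

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.QueryBatcher;

import java.util.Arrays;
import java.util.List;

public class AdjustThreadCountExample {

    public static void main(String[] args) {
        // Placeholder connection details; replace with a real host, port, and credentials.
        DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
            new DatabaseClientFactory.DigestAuthContext("admin", "admin"));

        // Placeholder URIs; in a real job these would typically come from a query.
        List<String> uris = Arrays.asList("/example/doc1.json", "/example/doc2.json");

        DataMovementManager dmm = client.newDataMovementManager();
        QueryBatcher batcher = dmm.newQueryBatcher(uris.iterator())
            .withThreadCount(2)
            .withBatchSize(1)
            .onUrisReady(batch -> {
                // Process each batch of URIs here, e.g. via batch.getItems().
            });

        dmm.startJob(batcher);

        // As of 6.2.0, the thread count can be adjusted while the job is running.
        batcher.withThreadCount(4);

        batcher.awaitCompletion();
        dmm.stopJob(batcher);
        client.release();
    }
}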
@@ -74,7 +74,7 @@ public interface Batcher {
* that batches will be processed sequentially because the calling thread
* will sometimes also process batches.</p>
*
-* <p>This method cannot be called after the job has started.</p>
+* <p>Unless otherwise noted by a subclass, this method cannot be called after the job has started.</p>
*
* @param threadCount the number of threads to use in this Batcher
*
@@ -142,7 +142,7 @@ public interface QueryBatcher extends Batcher {
QueryBatcher onQueryFailure(QueryFailureListener listener);

/**
* <p>Add a listener to run when the Query job is completed i.e. when all the
* document URIs are retrieved and the associated listeners are completed</p>
*
* @param listener the code to run when the Query job is completed
@@ -333,6 +333,12 @@ public interface QueryBatcher extends Batcher {
* threads used for processing the queued batches (running processEvent on
* the listeners registered with onUrisReady).
*
+* As of the 6.2.0 release, this can now be adjusted after the batcher has been started. The underlying Java
+* {@code ThreadPoolExecutor} will have both its core and max pool sizes set to the given thread count. Use caution
+* when reducing this to a value of 1 while the batcher is running; in some cases, the underlying
+* {@code ThreadPoolExecutor} may halt execution of any tasks. Execution can be resumed by increasing the thread count
+* to a value of 2 or higher.
*
* @return this instance for method chaining
*/
@Override
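The core/max pool size behavior described in the new Javadoc comes from the standard java.util.concurrent.ThreadPoolExecutor API rather than anything MarkLogic-specific. As a rough, standalone illustration (not code from this PR; the class name, task count, and timings are arbitrary), a running executor with an unbounded queue picks up its queued work on additional threads once both sizes are raised:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizePoolSketch {

    public static void main(String[] args) throws InterruptedException {
        // Start with a single thread and an unbounded work queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        for (int i = 0; i < 20; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(50); // Simulate a batch taking some time to process.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Raise both sizes while tasks are still queued. The maximum is raised first
        // because some JDK versions require core <= max when calling setCorePoolSize.
        pool.setMaximumPoolSize(3);
        pool.setCorePoolSize(3);

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // With work still queued at resize time, this typically reports 3.
        System.out.println("Largest pool size observed: " + pool.getLargestPoolSize());
    }
}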
@@ -391,19 +397,19 @@ public interface QueryBatcher extends Batcher {
* Retry in the same thread to query a batch that failed. If it fails again,
* all the failure listeners associated with the batcher using onQueryFailure
* method would be processed.
*
* Note: Use this method with caution as there is a possibility of infinite
* loops. If a batch fails, one of the failure listeners calls this method to
* retry with failure listeners, and the batch fails again, this would go on
* as an infinite loop until the batch succeeds.
*
* @param queryEvent the information about the batch that failed
*/
void retryWithFailureListeners(QueryEvent queryEvent);

/**
* Sets the limit for the maximum number of batches that can be collected.
*
* @param maxBatches is the value of the limit.
*/
void setMaxBatches(long maxBatches);
@@ -412,10 +418,10 @@ public interface QueryBatcher extends Batcher {
* Caps the query at the current batch.
*/
void setMaxBatches();

/**
* Returns the maximum number of Batches for the current job.
*
* @return the maximum number of Batches that can be collected.
*/
long getMaxBatches();
@@ -364,12 +364,17 @@ public int getMaxDocToUriBatchRatio() {

@Override
public QueryBatcher withThreadCount(int threadCount) {
-    requireNotStarted();
-    if ( getThreadCount() <= 0 ) {
+    if (threadCount <= 0 ) {
        throw new IllegalArgumentException("threadCount must be 1 or greater");
    }
-    threadCountSet = true;
-    super.withThreadCount(threadCount);
+    if (threadPool != null) {
+        logger.info("Adjusting thread pool size from {} to {}", getThreadCount(), threadCount);
+        threadPool.setCorePoolSize(threadCount);
+        threadPool.setMaximumPoolSize(threadCount);
+    } else {
+        threadCountSet = true;
+    }
+    super.withThreadCount(threadCount);
    return this;
}
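Given the caveat noted in the QueryBatcher Javadoc about reducing a running batcher to a single thread, an application that adjusts the count dynamically may want to clamp the requested value. The helper below is a hypothetical caller-side sketch, not part of this change:

import com.marklogic.client.datamovement.QueryBatcher;

public class ThreadCountGuard {

    // Clamp mid-job adjustments to at least 2 threads, given the documented caveat
    // that reducing a running batcher to a single thread may pause task execution.
    public static void adjustSafely(QueryBatcher batcher, int requestedThreadCount) {
        batcher.withThreadCount(Math.max(2, requestedThreadCount));
    }
}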
