Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import com.marklogic.client.MarkLogicVersion;
import com.marklogic.client.document.DocumentManager;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.JSONDocumentManager;
import com.marklogic.client.functionaltest.BasicJavaClientREST;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.FileHandle;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.mgmt.resource.databases.DatabaseManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -20,6 +22,8 @@
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/**
Expand All @@ -31,6 +35,8 @@ public abstract class AbstractFunctionalTest extends BasicJavaClientREST {

protected final static String DB_NAME = "java-functest";

protected final static ObjectMapper objectMapper = new ObjectMapper();

protected static DatabaseClient client;
protected static DatabaseClient schemasClient;
protected static DatabaseClient adminModulesClient;
Expand Down Expand Up @@ -79,6 +85,31 @@ public static void classTearDown() {
schemasClient.release();
}

/**
* Convenience method for easily writing some JSON docs where the content of the docs doesn't matter.
*
* @param count
* @param collections
* @return
*/
protected List<String> writeJsonDocs(int count, String... collections) {
DocumentMetadataHandle metadata = new DocumentMetadataHandle()
.withCollections(collections)
.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);
JSONDocumentManager mgr = client.newJSONDocumentManager();
DocumentWriteSet set = mgr.newWriteSet();
List<String> uris = new ArrayList<>();
for (int i = 1; i <= count; i++) {
String uri = "/test/" + i + ".json";
uris.add(uri);
set.add(uri, metadata, new JacksonHandle(
objectMapper.createObjectNode().put("test", i)
));
}
mgr.write(set);
return uris;
}

protected static void loadFileToDB(DatabaseClient client, String filename, String uri, String type, String[] collections) throws IOException, ParserConfigurationException,
SAXException {
// create doc manager
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.marklogic.client.fastfunctest.datamovement;

import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.ExportListener;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.fastfunctest.AbstractFunctionalTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ExportBatchesTest extends AbstractFunctionalTest {

List<String> testDocUris;

@BeforeEach
void beforeEach() {
deleteDocuments(client);
testDocUris = writeJsonDocs(20, "ExportBatchesTest");
}

@Test
void simplePageConsumer() {
AtomicInteger pageCount = new AtomicInteger();
AtomicInteger docCount = new AtomicInteger();

ExportListener listener = new ExportListener().onDocumentPageReady(documentPage -> {
pageCount.incrementAndGet();
while (documentPage.hasContent()) {
documentPage.next();
docCount.incrementAndGet();
}
});

runJob(listener, 5);
assertEquals(4, pageCount.get(), "Should get 4 pages of 5 docs each");
assertEquals(20, docCount.get());
}

@Test
void consumerThrowsException() {
AtomicInteger failureCount = new AtomicInteger();

ExportListener listener = new ExportListener()
.onDocumentPageReady(documentPage -> {
throw new RuntimeException("Intentional error");
})
.onFailure(((batch, throwable) -> failureCount.incrementAndGet()));

runJob(listener, 5);
assertEquals(4, failureCount.get(), "The failure listener should have been invoked once for each batch, " +
"and batch should have failed.");
}

@Test
void documentListenerAlreadySet() {
ExportListener listener = new ExportListener().onDocumentReady(doc -> doc.getUri());
IllegalStateException ex = assertThrows(IllegalStateException.class,
() -> listener.onDocumentPageReady(page -> page.next()));
assertEquals("Cannot call onDocumentPageReady if a listener has already been added via onDocumentReady",
ex.getMessage(), "Both listeners cannot be set because the DocumentPage can only be iterated through once.");
}

@Test
void documentPageListenerAlreadySet() {
ExportListener listener = new ExportListener().onDocumentPageReady(page -> page.next());
IllegalStateException ex = assertThrows(IllegalStateException.class,
() -> listener.onDocumentReady(doc -> doc.getUri()));
assertEquals("Cannot call onDocumentReady if a listener has already been set via onDocumentPageReady",
ex.getMessage(), "Both listeners cannot be set because the DocumentPage can only be iterated through once.");
}

private void runJob(ExportListener listener, int batchSize) {
DataMovementManager dmm = client.newDataMovementManager();
QueryBatcher qb = dmm.newQueryBatcher(testDocUris.iterator())
.withBatchSize(batchSize)
.onUrisReady(listener);
dmm.startJob(qb);
qb.awaitCompletion();
dmm.stopJob(qb);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public class ExportListener implements QueryBatchListener {
private QueryManager.QueryView view;
private Set<DocumentManager.Metadata> categories = new HashSet<>();
private Format nonDocumentFormat;
private List<Consumer<DocumentRecord>> exportListeners = new ArrayList<>();
private List<Consumer<DocumentRecord>> documentListeners = new ArrayList<>();
private Consumer<DocumentPage> documentPageListener;
private boolean consistentSnapshot = false;
private List<BatchFailureListener<Batch<String>>> failureListeners = new ArrayList<>();
private List<BatchFailureListener<QueryBatch>> queryBatchFailureListeners = new ArrayList<>();
Expand Down Expand Up @@ -127,15 +128,19 @@ public void initializeListener(QueryBatcher queryBatcher) {
@Override
public void processEvent(QueryBatch batch) {
try ( DocumentPage docs = getDocs(batch) ) {
while ( docs.hasNext() ) {
for ( Consumer<DocumentRecord> listener : exportListeners ) {
try {
listener.accept(docs.next());
} catch (Throwable t) {
logger.error("Exception thrown by an onDocumentReady listener", t);
}
}
}
if (documentPageListener != null) {
documentPageListener.accept(docs);
} else {
while ( docs.hasNext() ) {
for ( Consumer<DocumentRecord> listener : documentListeners) {
try {
listener.accept(docs.next());
} catch (Throwable t) {
logger.error("Exception thrown by an onDocumentReady listener", t);
}
}
}
}
} catch (Throwable t) {
for ( BatchFailureListener<Batch<String>> listener : failureListeners ) {
try {
Expand Down Expand Up @@ -232,9 +237,7 @@ public ExportListener withTransform(ServerTransform transform) {
* file system, a REST service, or any target supported by Java. If further
* information is required about the document beyond what DocumentRecord can
* provide, register a listener with {@link QueryBatcher#onUrisReady
* QueryBatcher.onUrisReady} instead. You do not need to call close() on
* each DocumentRecord because the ExportListener will call close for you on
* the entire DocumentPage.
* QueryBatcher.onUrisReady} instead.
*
* @param listener the code which will process each document
* @return this instance for method chaining
Expand All @@ -243,10 +246,32 @@ public ExportListener withTransform(ServerTransform transform) {
* @see DocumentRecord
*/
public ExportListener onDocumentReady(Consumer<DocumentRecord> listener) {
exportListeners.add(listener);
return this;
if (this.documentPageListener != null) {
throw new IllegalStateException("Cannot call onDocumentReady if a listener has already been set via onDocumentPageReady");
}
documentListeners.add(listener);
return this;
}

/**
* Sets a listener to process a page of retrieved documents. Useful for when documents should be written to an
* external system where it's more efficient to make batched writes to that system. Note that {@code close()} does
* need to be invoked on the {@code DocumentPage}; this class will handle that.
*
* @param listener the code which will process each page of documents
* @return this instance for method chaining
* @see Consumer
* @see DocumentPage
* @since 6.2.0
*/
public ExportListener onDocumentPageReady(Consumer<DocumentPage> listener) {
if (this.documentListeners != null && !this.documentListeners.isEmpty()) {
throw new IllegalStateException("Cannot call onDocumentPageReady if a listener has already been added via onDocumentReady");
}
this.documentPageListener = listener;
return this;
}

/**
* When a batch fails or a callback throws an Exception, run this listener
* code. Multiple listeners can be registered with this method.
Expand Down