From d965b13e7814112fcbab7b8bf0cdcd29fd9489d9 Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Thu, 30 Mar 2023 06:45:17 -0400 Subject: [PATCH] DEVEXP-363 ExportListener now provides access to DocumentPage --- .../fastfunctest/AbstractFunctionalTest.java | 31 +++++++ .../datamovement/ExportBatchesTest.java | 86 +++++++++++++++++++ .../client/datamovement/ExportListener.java | 55 ++++++++---- 3 files changed, 157 insertions(+), 15 deletions(-) create mode 100644 marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/ExportBatchesTest.java diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java index 28c62a833..a92f004ca 100644 --- a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java +++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/AbstractFunctionalTest.java @@ -8,9 +8,11 @@ import com.marklogic.client.MarkLogicVersion; import com.marklogic.client.document.DocumentManager; import com.marklogic.client.document.DocumentWriteSet; +import com.marklogic.client.document.JSONDocumentManager; import com.marklogic.client.functionaltest.BasicJavaClientREST; import com.marklogic.client.io.DocumentMetadataHandle; import com.marklogic.client.io.FileHandle; +import com.marklogic.client.io.JacksonHandle; import com.marklogic.mgmt.resource.databases.DatabaseManager; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -20,6 +22,8 @@ import javax.xml.parsers.ParserConfigurationException; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.stream.Stream; /** @@ -31,6 +35,8 @@ public abstract class AbstractFunctionalTest extends BasicJavaClientREST { protected final 
static String DB_NAME = "java-functest"; + protected final static ObjectMapper objectMapper = new ObjectMapper(); + protected static DatabaseClient client; protected static DatabaseClient schemasClient; protected static DatabaseClient adminModulesClient; @@ -79,6 +85,31 @@ public static void classTearDown() { schemasClient.release(); } + /** + * Convenience method for easily writing some JSON docs where the content of the docs doesn't matter. + * + * @param count + * @param collections + * @return + */ + protected List writeJsonDocs(int count, String... collections) { + DocumentMetadataHandle metadata = new DocumentMetadataHandle() + .withCollections(collections) + .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE); + JSONDocumentManager mgr = client.newJSONDocumentManager(); + DocumentWriteSet set = mgr.newWriteSet(); + List uris = new ArrayList<>(); + for (int i = 1; i <= count; i++) { + String uri = "/test/" + i + ".json"; + uris.add(uri); + set.add(uri, metadata, new JacksonHandle( + objectMapper.createObjectNode().put("test", i) + )); + } + mgr.write(set); + return uris; + } + protected static void loadFileToDB(DatabaseClient client, String filename, String uri, String type, String[] collections) throws IOException, ParserConfigurationException, SAXException { // create doc manager diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/ExportBatchesTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/ExportBatchesTest.java new file mode 100644 index 000000000..5fa80ac37 --- /dev/null +++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/fastfunctest/datamovement/ExportBatchesTest.java @@ -0,0 +1,86 @@ +package com.marklogic.client.fastfunctest.datamovement; + +import com.marklogic.client.datamovement.DataMovementManager; +import 
com.marklogic.client.datamovement.ExportListener; +import com.marklogic.client.datamovement.QueryBatcher; +import com.marklogic.client.fastfunctest.AbstractFunctionalTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ExportBatchesTest extends AbstractFunctionalTest { + + List testDocUris; + + @BeforeEach + void beforeEach() { + deleteDocuments(client); + testDocUris = writeJsonDocs(20, "ExportBatchesTest"); + } + + @Test + void simplePageConsumer() { + AtomicInteger pageCount = new AtomicInteger(); + AtomicInteger docCount = new AtomicInteger(); + + ExportListener listener = new ExportListener().onDocumentPageReady(documentPage -> { + pageCount.incrementAndGet(); + while (documentPage.hasContent()) { + documentPage.next(); + docCount.incrementAndGet(); + } + }); + + runJob(listener, 5); + assertEquals(4, pageCount.get(), "Should get 4 pages of 5 docs each"); + assertEquals(20, docCount.get()); + } + + @Test + void consumerThrowsException() { + AtomicInteger failureCount = new AtomicInteger(); + + ExportListener listener = new ExportListener() + .onDocumentPageReady(documentPage -> { + throw new RuntimeException("Intentional error"); + }) + .onFailure(((batch, throwable) -> failureCount.incrementAndGet())); + + runJob(listener, 5); + assertEquals(4, failureCount.get(), "The failure listener should have been invoked once for each batch, " + + "and batch should have failed."); + } + + @Test + void documentListenerAlreadySet() { + ExportListener listener = new ExportListener().onDocumentReady(doc -> doc.getUri()); + IllegalStateException ex = assertThrows(IllegalStateException.class, + () -> listener.onDocumentPageReady(page -> page.next())); + assertEquals("Cannot call onDocumentPageReady if a listener has already been 
added via onDocumentReady", + ex.getMessage(), "Both listeners cannot be set because the DocumentPage can only be iterated through once."); + } + + @Test + void documentPageListenerAlreadySet() { + ExportListener listener = new ExportListener().onDocumentPageReady(page -> page.next()); + IllegalStateException ex = assertThrows(IllegalStateException.class, + () -> listener.onDocumentReady(doc -> doc.getUri())); + assertEquals("Cannot call onDocumentReady if a listener has already been set via onDocumentPageReady", + ex.getMessage(), "Both listeners cannot be set because the DocumentPage can only be iterated through once."); + } + + private void runJob(ExportListener listener, int batchSize) { + DataMovementManager dmm = client.newDataMovementManager(); + QueryBatcher qb = dmm.newQueryBatcher(testDocUris.iterator()) + .withBatchSize(batchSize) + .onUrisReady(listener); + dmm.startJob(qb); + qb.awaitCompletion(); + dmm.stopJob(qb); + } +} diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/ExportListener.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/ExportListener.java index d38ed5a9c..d336d3bd9 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/ExportListener.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/ExportListener.java @@ -74,7 +74,8 @@ public class ExportListener implements QueryBatchListener { private QueryManager.QueryView view; private Set categories = new HashSet<>(); private Format nonDocumentFormat; - private List> exportListeners = new ArrayList<>(); + private List> documentListeners = new ArrayList<>(); + private Consumer documentPageListener; private boolean consistentSnapshot = false; private List>> failureListeners = new ArrayList<>(); private List> queryBatchFailureListeners = new ArrayList<>(); @@ -127,15 +128,19 @@ public void initializeListener(QueryBatcher queryBatcher) { @Override public void processEvent(QueryBatch 
batch) { try ( DocumentPage docs = getDocs(batch) ) { - while ( docs.hasNext() ) { - for ( Consumer listener : exportListeners ) { - try { - listener.accept(docs.next()); - } catch (Throwable t) { - logger.error("Exception thrown by an onDocumentReady listener", t); - } - } - } + if (documentPageListener != null) { + documentPageListener.accept(docs); + } else { + while ( docs.hasNext() ) { + for ( Consumer listener : documentListeners) { + try { + listener.accept(docs.next()); + } catch (Throwable t) { + logger.error("Exception thrown by an onDocumentReady listener", t); + } + } + } + } } catch (Throwable t) { for ( BatchFailureListener> listener : failureListeners ) { try { @@ -232,9 +237,7 @@ public ExportListener withTransform(ServerTransform transform) { * file system, a REST service, or any target supported by Java. If further * information is required about the document beyond what DocumentRecord can * provide, register a listener with {@link QueryBatcher#onUrisReady - * QueryBatcher.onUrisReady} instead. You do not need to call close() on - * each DocumentRecord because the ExportListener will call close for you on - * the entire DocumentPage. + * QueryBatcher.onUrisReady} instead. * * @param listener the code which will process each document * @return this instance for method chaining @@ -243,10 +246,32 @@ public ExportListener withTransform(ServerTransform transform) { * @see DocumentRecord */ public ExportListener onDocumentReady(Consumer listener) { - exportListeners.add(listener); - return this; + if (this.documentPageListener != null) { + throw new IllegalStateException("Cannot call onDocumentReady if a listener has already been set via onDocumentPageReady"); + } + documentListeners.add(listener); + return this; } + /** + * Sets a listener to process a page of retrieved documents. Useful for when documents should be written to an + * external system where it's more efficient to make batched writes to that system. 
Note that {@code close()} does not + * need to be invoked on the {@code DocumentPage}; this class will handle that. + * + * @param listener the code which will process each page of documents + * @return this instance for method chaining + * @see Consumer + * @see DocumentPage + * @since 6.2.0 + */ + public ExportListener onDocumentPageReady(Consumer listener) { + if (this.documentListeners != null && !this.documentListeners.isEmpty()) { + throw new IllegalStateException("Cannot call onDocumentPageReady if a listener has already been added via onDocumentReady"); + } + this.documentPageListener = listener; + return this; + } + /** * When a batch fails or a callback throws an Exception, run this listener * code. Multiple listeners can be registered with this method.