diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java index 45de18e06..46bc88739 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java @@ -31,7 +31,6 @@ import com.marklogic.client.io.marker.StructureReadHandle; import com.marklogic.client.row.RawPlanDefinition; import com.marklogic.client.row.RawQueryDSLPlan; -import com.marklogic.client.row.RawSQLPlan; import com.marklogic.client.row.RowManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,23 +56,21 @@ class RowBatcherImpl extends BatcherImpl implements RowBatcher { private final AtomicLong failedBatches = new AtomicLong(0); private final AtomicInteger runningThreads = new AtomicInteger(0); private RowBatchFailureListener[] failureListeners; - private RowBatchSuccessListener[] sucessListeners; - - private String schemaName; - private String viewName; + private RowBatchSuccessListener[] successListeners; private RawPlanDefinition pagedPlan; private long rowCount = 0; - private RowManager defaultRowManager; - private ContentHandle rowsHandle; - private Class rowsClass; private HostInfo[] hostInfos; private boolean consistentSnapshot = false; private final AtomicLong serverTimestamp = new AtomicLong(-1); - RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle rowsHandle) { + private final ContentHandle rowsHandle; + private final Class rowsClass; + private final RowManager defaultRowManager; + + RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle rowsHandle) { super(moveMgr); if (rowsHandle == null) throw new IllegalArgumentException("Cannot create RowBatcher with null rows manager"); @@ -124,22 +121,32 @@ public RowBatcher withBatchView(RawQueryDSLPlan viewPlan) { analyzePlan(viewPlan.getHandle()); return this; } - private void analyzePlan(AbstractWriteHandle initialPlan) { + + /** + * Calls the MarkLogic internal/viewinfo endpoint to obtain two critical items - the estimate of matching rows, + * and a modified version of the user's plan that includes "lower bounds" and "upper bounds" parameters. The + * estimate of matching rows allows for partitions to be defined based on the user-provided thread count. + * The user's modified plan is then run with a lower/upper bounds row ID value based on the calculated partitions. + * + * @param userPlan + */ + private void analyzePlan(AbstractWriteHandle userPlan) { requireNotStarted("Must specify batch view before starting job"); DatabaseClientImpl client = (DatabaseClientImpl) getPrimaryClient(); JsonNode viewInfo = client.getServices().postResource( - null, "internal/viewinfo", null, null, initialPlan, new JacksonHandle() + null, "internal/viewinfo", null, null, userPlan, new JacksonHandle() ).get(); - // System.out.println(viewInfo.toPrettyString()); - JsonNode schemaNode = viewInfo.get("schemaName"); - this.schemaName = (schemaNode != null) ? schemaNode.asText(null) : null; - this.viewName = viewInfo.get("viewName").asText(null); this.rowCount = viewInfo.get("rowCount").asLong(0); this.pagedPlan = getRowManager().newRawPlanDefinition(new JacksonHandle(viewInfo.get("modifiedPlan"))); + + JsonNode schemaNode = viewInfo.get("schemaName"); logger.info("plan analysis schema name: {}, view name: {}, row estimate: {}", - this.schemaName, this.viewName, this.rowCount); + (schemaNode != null) ? schemaNode.asText(null) : null, + viewInfo.get("viewName").asText(null), + this.rowCount + ); } @Override @@ -159,12 +166,12 @@ public RowBatcher withThreadCount(int threadCount) { public RowBatcher onSuccess(RowBatchSuccessListener listener) { requireNotStarted("Must set success listener before starting job"); if (listener == null) { - sucessListeners = null; - } else if (sucessListeners == null || sucessListeners.length == 0) { - sucessListeners = new RowBatchSuccessListener[]{listener}; + successListeners = null; + } else if (successListeners == null || successListeners.length == 0) { + successListeners = new RowBatchSuccessListener[]{listener}; } else { - sucessListeners = Arrays.copyOf(sucessListeners, sucessListeners.length + 1); - sucessListeners[sucessListeners.length - 1] = listener; + successListeners = Arrays.copyOf(successListeners, successListeners.length + 1); + successListeners[successListeners.length - 1] = listener; } return this; } @@ -207,7 +214,7 @@ public RowBatcher withConsistentSnapshot() { @Override public RowBatchSuccessListener[] getSuccessListeners() { - return sucessListeners; + return successListeners; } @Override public RowBatchFailureListener[] getFailureListeners() { @@ -216,7 +223,7 @@ public RowBatchFailureListener[] getFailureListeners() { @Override public void setSuccessListeners(RowBatchSuccessListener... listeners) { requireNotStarted("Must set success listeners before starting job"); - this.sucessListeners = listeners; + this.successListeners = listeners; } @Override public void setFailureListeners(RowBatchFailureListener... listeners) { @@ -228,10 +235,10 @@ private void initRequestEvent(RowBatchEventImpl event) { event.withJobTicket(getJobTicket()); } private void notifySuccess(RowBatchSuccessListener.RowBatchResponseEvent event) { - if (sucessListeners == null || sucessListeners.length == 0) return; - for (RowBatchSuccessListener sucessListener: sucessListeners) { + if (successListeners == null || successListeners.length == 0) return; + for (RowBatchSuccessListener successListener: successListeners) { try { - sucessListener.processEvent(event); + successListener.processEvent(event); } catch(Throwable e) { logger.info("error in success listener: {}", e.toString()); } @@ -354,7 +361,7 @@ public synchronized void start(JobTicket ticket) { if (this.pagedPlan == null) throw new IllegalStateException("Plan must be supplied before starting the job"); - if (sucessListeners == null || sucessListeners.length == 0) + if (successListeners == null || successListeners.length == 0) throw new IllegalStateException("No listener for rows"); if (failureListeners == null || failureListeners.length == 0) {