Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.marklogic.client.io.marker.StructureReadHandle;
import com.marklogic.client.row.RawPlanDefinition;
import com.marklogic.client.row.RawQueryDSLPlan;
import com.marklogic.client.row.RawSQLPlan;
import com.marklogic.client.row.RowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -57,23 +56,21 @@ class RowBatcherImpl<T> extends BatcherImpl implements RowBatcher<T> {
private final AtomicLong failedBatches = new AtomicLong(0);
private final AtomicInteger runningThreads = new AtomicInteger(0);
private RowBatchFailureListener[] failureListeners;
private RowBatchSuccessListener[] sucessListeners;

private String schemaName;
private String viewName;
private RowBatchSuccessListener[] successListeners;

private RawPlanDefinition pagedPlan;
private long rowCount = 0;

private RowManager defaultRowManager;
private ContentHandle<T> rowsHandle;
private Class<T> rowsClass;
private HostInfo[] hostInfos;

private boolean consistentSnapshot = false;
private final AtomicLong serverTimestamp = new AtomicLong(-1);

RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle<T> rowsHandle) {
private final ContentHandle<T> rowsHandle;
private final Class<T> rowsClass;
private final RowManager defaultRowManager;

RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle<T> rowsHandle) {
super(moveMgr);
if (rowsHandle == null)
throw new IllegalArgumentException("Cannot create RowBatcher with null rows manager");
Expand Down Expand Up @@ -124,22 +121,32 @@ public RowBatcher<T> withBatchView(RawQueryDSLPlan viewPlan) {
analyzePlan(viewPlan.getHandle());
return this;
}
private void analyzePlan(AbstractWriteHandle initialPlan) {

/**
* Calls the MarkLogic internal/viewinfo endpoint to obtain two critical items - the estimate of matching rows,
* and a modified version of the user's plan that includes "lower bounds" and "upper bounds" parameters. The
* estimate of matching rows allows for partitions to be defined based on the user-provided thread count.
* The user's modified plan is then run with a lower/upper bounds row ID value based on the calculated partitions.
*
* @param userPlan
*/
private void analyzePlan(AbstractWriteHandle userPlan) {
requireNotStarted("Must specify batch view before starting job");

DatabaseClientImpl client = (DatabaseClientImpl) getPrimaryClient();
JsonNode viewInfo = client.getServices().postResource(
null, "internal/viewinfo", null, null, initialPlan, new JacksonHandle()
null, "internal/viewinfo", null, null, userPlan, new JacksonHandle()
).get();
// System.out.println(viewInfo.toPrettyString());

JsonNode schemaNode = viewInfo.get("schemaName");
this.schemaName = (schemaNode != null) ? schemaNode.asText(null) : null;
this.viewName = viewInfo.get("viewName").asText(null);
this.rowCount = viewInfo.get("rowCount").asLong(0);
this.pagedPlan = getRowManager().newRawPlanDefinition(new JacksonHandle(viewInfo.get("modifiedPlan")));

JsonNode schemaNode = viewInfo.get("schemaName");
logger.info("plan analysis schema name: {}, view name: {}, row estimate: {}",
this.schemaName, this.viewName, this.rowCount);
(schemaNode != null) ? schemaNode.asText(null) : null,
viewInfo.get("viewName").asText(null),
this.rowCount
);
}

@Override
Expand All @@ -159,12 +166,12 @@ public RowBatcher<T> withThreadCount(int threadCount) {
public RowBatcher<T> onSuccess(RowBatchSuccessListener listener) {
requireNotStarted("Must set success listener before starting job");
if (listener == null) {
sucessListeners = null;
} else if (sucessListeners == null || sucessListeners.length == 0) {
sucessListeners = new RowBatchSuccessListener[]{listener};
successListeners = null;
} else if (successListeners == null || successListeners.length == 0) {
successListeners = new RowBatchSuccessListener[]{listener};
} else {
sucessListeners = Arrays.copyOf(sucessListeners, sucessListeners.length + 1);
sucessListeners[sucessListeners.length - 1] = listener;
successListeners = Arrays.copyOf(successListeners, successListeners.length + 1);
successListeners[successListeners.length - 1] = listener;
}
return this;
}
Expand Down Expand Up @@ -207,7 +214,7 @@ public RowBatcher<T> withConsistentSnapshot() {

@Override
public RowBatchSuccessListener[] getSuccessListeners() {
return sucessListeners;
return successListeners;
}
@Override
public RowBatchFailureListener[] getFailureListeners() {
Expand All @@ -216,7 +223,7 @@ public RowBatchFailureListener[] getFailureListeners() {
@Override
public void setSuccessListeners(RowBatchSuccessListener... listeners) {
requireNotStarted("Must set success listeners before starting job");
this.sucessListeners = listeners;
this.successListeners = listeners;
}
@Override
public void setFailureListeners(RowBatchFailureListener... listeners) {
Expand All @@ -228,10 +235,10 @@ private void initRequestEvent(RowBatchEventImpl event) {
event.withJobTicket(getJobTicket());
}
private void notifySuccess(RowBatchSuccessListener.RowBatchResponseEvent<T> event) {
if (sucessListeners == null || sucessListeners.length == 0) return;
for (RowBatchSuccessListener sucessListener: sucessListeners) {
if (successListeners == null || successListeners.length == 0) return;
for (RowBatchSuccessListener successListener: successListeners) {
try {
sucessListener.processEvent(event);
successListener.processEvent(event);
} catch(Throwable e) {
logger.info("error in success listener: {}", e.toString());
}
Expand Down Expand Up @@ -354,7 +361,7 @@ public synchronized void start(JobTicket ticket) {
if (this.pagedPlan == null)
throw new IllegalStateException("Plan must be supplied before starting the job");

if (sucessListeners == null || sucessListeners.length == 0)
if (successListeners == null || successListeners.length == 0)
throw new IllegalStateException("No listener for rows");

if (failureListeners == null || failureListeners.length == 0) {
Expand Down