From 5efb3ef6cf7d4f5efea348cd151a23d58f4a0a23 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 31 Mar 2016 10:40:37 -0700 Subject: [PATCH] Explicitly track the Source a ReadEvaluator is using This permits use of sources that are not the initial source used in the transform. BoundedSource#splitIntoBundles and UnboundedSource#generateInitialSplits generate multiple source objects for the same transform in order to permit parallelism. Also some cleanups: - Use proper scoping, interfaces in BoundedReadEvaluator - Use BoundedReader instead of Reader - contentsRemaining should be method-scoped not instance-scoped --- .../BoundedReadEvaluatorFactory.java | 26 ++++++++++--------- .../UnboundedReadEvaluatorFactory.java | 14 +++++++--- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index eaea3ed293d2c..a69db5d559ec9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -18,7 +18,6 @@ import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader; import com.google.cloud.dataflow.sdk.io.Read.Bounded; -import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -60,8 +59,7 @@ public TransformEvaluator forApplication( private TransformEvaluator getTransformEvaluator( final AppliedPTransform, Bounded> transform, - final InProcessEvaluationContext evaluationContext) - throws IOException { + final InProcessEvaluationContext evaluationContext) { BoundedReadEvaluator evaluator = getTransformEvaluatorQueue(transform, evaluationContext).poll(); if (evaluator == null) { @@ -91,8 +89,9 @@ private Queue> getTransformEvaluatorQueu if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + BoundedSource source = transform.getTransform().getSource(); BoundedReadEvaluator evaluator = - new BoundedReadEvaluator(transform, evaluationContext); + new BoundedReadEvaluator(transform, evaluationContext, source); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -114,13 +113,19 @@ private Queue> getTransformEvaluatorQueu private static class BoundedReadEvaluator implements TransformEvaluator { private final AppliedPTransform, Bounded> transform; private final InProcessEvaluationContext evaluationContext; - private boolean contentsRemaining; + /** + * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same + * as the source derived from {@link #transform} due to splitting. + */ + private BoundedSource source; public BoundedReadEvaluator( AppliedPTransform, Bounded> transform, - InProcessEvaluationContext evaluationContext) { + InProcessEvaluationContext evaluationContext, + BoundedSource source) { this.transform = transform; this.evaluationContext = evaluationContext; + this.source = source; } @Override @@ -128,12 +133,9 @@ public void processElement(WindowedValue element) {} @Override public InProcessTransformResult finishBundle() throws IOException { - try (final Reader reader = - transform - .getTransform() - .getSource() - .createReader(evaluationContext.getPipelineOptions());) { - contentsRemaining = reader.start(); + try (final BoundedReader reader = + source.createReader(evaluationContext.getPipelineOptions());) { + boolean contentsRemaining = reader.start(); UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); while (contentsRemaining) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index 549afabcc0269..cba9007cc60db 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -88,8 +88,10 @@ private Queue> getTransformEvaluatorQu if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + UnboundedSource source = transform.getTransform().getSource(); UnboundedReadEvaluator evaluator = - new UnboundedReadEvaluator(transform, evaluationContext, evaluatorQueue); + new UnboundedReadEvaluator( + transform, evaluationContext, source, evaluatorQueue); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -114,15 +116,22 @@ private static class UnboundedReadEvaluator implements TransformEvaluat private final AppliedPTransform, Unbounded> transform; private final InProcessEvaluationContext evaluationContext; private final Queue> evaluatorQueue; + /** + * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same + * source as derived from {@link #transform} due to splitting. + */ + private final UnboundedSource source; private CheckpointMark checkpointMark; public UnboundedReadEvaluator( AppliedPTransform, Unbounded> transform, InProcessEvaluationContext evaluationContext, + UnboundedSource source, Queue> evaluatorQueue) { this.transform = transform; this.evaluationContext = evaluationContext; this.evaluatorQueue = evaluatorQueue; + this.source = source; this.checkpointMark = null; } @@ -133,8 +142,7 @@ public void processElement(WindowedValue element) {} public InProcessTransformResult finishBundle() throws IOException { UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); try (UnboundedReader reader = - createReader( - transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) { + createReader(source, evaluationContext.getPipelineOptions());) { int numElements = 0; if (reader.start()) { do {