Skip to content

Commit

Permalink
Merge pull request apache#176 from tgroh/backport_106
Browse files Browse the repository at this point in the history
Explicitly track the Source a ReadEvaluator is using
  • Loading branch information
davorbonaci committed Apr 4, 2016
2 parents 1a457b3 + 5efb3ef commit 8e3fd70
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
Expand Up @@ -18,7 +18,6 @@
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
Expand Down Expand Up @@ -60,8 +59,7 @@ public <InputT> TransformEvaluator<InputT> forApplication(

private <OutputT> TransformEvaluator<?> getTransformEvaluator(
final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
final InProcessEvaluationContext evaluationContext)
throws IOException {
final InProcessEvaluationContext evaluationContext) {
BoundedReadEvaluator<?> evaluator =
getTransformEvaluatorQueue(transform, evaluationContext).poll();
if (evaluator == null) {
Expand Down Expand Up @@ -91,8 +89,9 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueu
if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
BoundedSource<OutputT> source = transform.getTransform().getSource();
BoundedReadEvaluator<OutputT> evaluator =
new BoundedReadEvaluator<OutputT>(transform, evaluationContext);
new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source);
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
Expand All @@ -114,26 +113,29 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueu
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private boolean contentsRemaining;
/**
* The source being read from by this {@link BoundedReadEvaluator}. This may not be the same
* as the source derived from {@link #transform} due to splitting.
*/
private BoundedSource<OutputT> source;

/**
 * Creates an evaluator that reads from an explicitly supplied source.
 *
 * @param transform the applied bounded read transform whose output this evaluator produces
 * @param evaluationContext the context used to obtain pipeline options and to create the
 *     output bundle
 * @param source the source to read from; this may not be the same as the source derived from
 *     {@code transform} due to splitting
 */
public BoundedReadEvaluator(
    AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
    InProcessEvaluationContext evaluationContext,
    BoundedSource<OutputT> source) {
  this.transform = transform;
  this.evaluationContext = evaluationContext;
  // Keep the source explicitly rather than re-deriving it from the transform on each read.
  this.source = source;
}

/**
 * Accepts an input element and does nothing with it.
 *
 * @param element ignored
 */
@Override
public void processElement(WindowedValue<Object> element) {
  // Intentionally empty: this evaluator produces its output in finishBundle() by reading
  // from its source, not from incoming elements.
}

@Override
public InProcessTransformResult finishBundle() throws IOException {
try (final Reader<OutputT> reader =
transform
.getTransform()
.getSource()
.createReader(evaluationContext.getPipelineOptions());) {
contentsRemaining = reader.start();
try (final BoundedReader<OutputT> reader =
source.createReader(evaluationContext.getPipelineOptions());) {
boolean contentsRemaining = reader.start();
UncommittedBundle<OutputT> output =
evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
Expand Down
Expand Up @@ -88,8 +88,10 @@ private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQu
if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
UnboundedReadEvaluator<OutputT> evaluator =
new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, evaluatorQueue);
new UnboundedReadEvaluator<OutputT>(
transform, evaluationContext, source, evaluatorQueue);
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
Expand All @@ -114,15 +116,22 @@ private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluat
private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
/**
* The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
* as the source derived from {@link #transform} due to splitting.
*/
private final UnboundedSource<OutputT, ?> source;
private CheckpointMark checkpointMark;

/**
 * Creates an evaluator that reads from an explicitly supplied unbounded source.
 *
 * @param transform the applied unbounded read transform whose output this evaluator produces
 * @param evaluationContext the context used to obtain pipeline options and to create the
 *     output bundle
 * @param source the source to read from; this may not be the same as the source derived from
 *     {@code transform} due to splitting
 * @param evaluatorQueue the queue of evaluators for this transform, as supplied by the factory
 */
public UnboundedReadEvaluator(
    AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
    InProcessEvaluationContext evaluationContext,
    UnboundedSource<OutputT, ?> source,
    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
  // Fields are assigned in parameter order.
  this.transform = transform;
  this.evaluationContext = evaluationContext;
  this.source = source;
  this.evaluatorQueue = evaluatorQueue;
  // The checkpoint mark starts out unset.
  this.checkpointMark = null;
}

Expand All @@ -133,8 +142,7 @@ public void processElement(WindowedValue<Object> element) {}
public InProcessTransformResult finishBundle() throws IOException {
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
try (UnboundedReader<OutputT> reader =
createReader(
transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) {
createReader(source, evaluationContext.getPipelineOptions());) {
int numElements = 0;
if (reader.start()) {
do {
Expand Down

0 comments on commit 8e3fd70

Please sign in to comment.