Skip to content

Commit

Permalink
[FLINK-4609] [java-api] Remove redundant check for null in CrossOperator
Browse files Browse the repository at this point in the history
This closes apache#2490
  • Loading branch information
apivovarov authored and yuzhongliu committed Dec 5, 2016
1 parent 60b0736 commit 6551e4a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import java.util.Arrays;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
Expand Down Expand Up @@ -124,21 +124,12 @@ protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, O
@Public
public static final class DefaultCross<I1, I2> extends CrossOperator<I1, I2, Tuple2<I1, I2>> {

private final DataSet<I1> input1;
private final DataSet<I2> input2;

public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {

super(input1, input2, new DefaultCrossFunction<I1, I2>(),
new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()),
hint, defaultName);

if (input1 == null || input2 == null) {
throw new NullPointerException();
}

this.input1 = input1;
this.input2 = input2;
new TupleTypeInfo<Tuple2<I1, I2>>(
Preconditions.checkNotNull(input1, "input1 is null").getType(),
Preconditions.checkNotNull(input2, "input2 is null").getType()),
hint, defaultName);
}

/**
Expand All @@ -155,9 +146,9 @@ public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function) {
if (function == null) {
throw new NullPointerException("Cross function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType(),
TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, getInput1().getType(), getInput2().getType(),
super.getDefaultName(), true);
return new CrossOperator<I1, I2, R>(input1, input2, clean(function), returnType,
return new CrossOperator<I1, I2, R>(getInput1(), getInput2(), clean(function), returnType,
getCrossHint(), Utils.getCallLocationName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Preconditions;

/**
* Base class for operations that operates on two input data sets.
Expand All @@ -37,8 +38,8 @@ public abstract class TwoInputOperator<IN1, IN2, OUT, O extends TwoInputOperator


protected TwoInputOperator(DataSet<IN1> input1, DataSet<IN2> input2, TypeInformation<OUT> resultType) {
super(input1.getExecutionEnvironment(), resultType);

super(Preconditions.checkNotNull(input1, "input1 is null").getExecutionEnvironment(), resultType);
Preconditions.checkNotNull(input2, "input2 is null");
DataSet.checkSameExecutionContext(input1, input2);
this.input1 = input1;
this.input2 = input2;
Expand Down

0 comments on commit 6551e4a

Please sign in to comment.