Skip to content

Commit

Permalink
check submission default param for window, IBMStreams#2494
Browse files Browse the repository at this point in the history
  • Loading branch information
markheger committed Jul 7, 2020
1 parent ab7081e commit 0571b5a
Showing 1 changed file with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.ibm.streamsx.topology.function.BiFunction;
import com.ibm.streamsx.topology.function.Function;
import com.ibm.streamsx.topology.function.Supplier;
import com.ibm.streamsx.topology.generator.port.PortProperties;
import com.ibm.streamsx.topology.internal.functional.FunctionalOpProperties;
import com.ibm.streamsx.topology.internal.functional.SubmissionParameter;
import com.ibm.streamsx.topology.internal.logic.LogicUtils;
import com.ibm.streamsx.topology.internal.logic.ObjectUtils;
import com.ibm.streamsx.topology.internal.messages.Messages;
Expand Down Expand Up @@ -47,6 +49,18 @@ private WindowDefinition(TStream<T> stream, String policy, long config, TimeUnit

assert (timeUnit == null && !policy.equals(BInputPort.Window.TIME_POLICY)) ||
(timeUnit != null && policy.equals(BInputPort.Window.TIME_POLICY));

if (this.supplierConfig != null) {
Integer configVal;
if (this.supplierConfig.get() != null)
configVal = this.supplierConfig.get();
else if (this.supplierConfig instanceof SubmissionParameter<?>)
configVal = ((SubmissionParameter<Integer>)this.supplierConfig).getDefaultValue();
else
throw new IllegalArgumentException(Messages.getString("CORE_ILLEGAL_WINDOW_VALUE"));
if (configVal != null && configVal <= 0)
throw new IllegalArgumentException(Messages.getString("CORE_ILLEGAL_WINDOW_VALUE"));
}
}

public WindowDefinition(TStream<T> stream, int count) {
Expand Down

0 comments on commit 0571b5a

Please sign in to comment.