Skip to content

Commit

Permalink
Add some template args and direct casts to help the eclipse compiler …
Browse files Browse the repository at this point in the history
…determine the right casts
  • Loading branch information
dkulp authored and lukecwik committed Jan 29, 2018
1 parent 41cc515 commit 2970b53
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 12 deletions.
Expand Up @@ -547,7 +547,7 @@ public final void injectElements(TimestampedValue<InputT>... values) throws Exce
Instant timestamp = input.getTimestamp();
Collection<W> windows =
windowFn.assignWindows(
new TestAssignContext<>(
new TestAssignContext<W>(
windowFn, value, timestamp, GlobalWindow.INSTANCE));
return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
} catch (Exception e) {
Expand Down
Expand Up @@ -266,7 +266,7 @@ public final void injectElements(Collection<TimestampedValue<InputT>> values) th
Instant timestamp = input.getTimestamp();
Collection<W> assignedWindows =
windowFn.assignWindows(
new TestAssignContext<>(windowFn, value, timestamp, GlobalWindow.INSTANCE));
new TestAssignContext<W>(windowFn, value, timestamp, GlobalWindow.INSTANCE));

for (W window : assignedWindows) {
activeWindows.addActiveForTesting(window);
Expand Down
Expand Up @@ -99,7 +99,7 @@ public void processElement(WindowedValue<InputT> compressedElement) throws Excep
private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
WindowFn<InputT, W>.AssignContext assignContext =
new DirectAssignContext<>(windowFn, element);
new DirectAssignContext<InputT, W>(windowFn, element);
Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext);
return windows;
}
Expand Down
Expand Up @@ -95,8 +95,8 @@ public void getRootTransformsContainsRootTransforms() {
assertThat(graph.getRootTransforms(), hasSize(3));
assertThat(
graph.getRootTransforms(),
Matchers.containsInAnyOrder(
graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)));
Matchers.containsInAnyOrder(new Object[] {
graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)}));
for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
// Root transforms will have no inputs
assertThat(root.getInputs().entrySet(), emptyIterable());
Expand All @@ -114,9 +114,10 @@ public void getRootTransformsContainsEmptyFlatten() {
empty.setCoder(StringUtf8Coder.of());
p.traverseTopologically(visitor);
DirectGraph graph = visitor.getGraph();
assertThat(graph.getRootTransforms(), Matchers.containsInAnyOrder(graph.getProducer(empty)));
assertThat(graph.getRootTransforms(),
Matchers.containsInAnyOrder(new Object[] {graph.getProducer(empty)}));
AppliedPTransform<?, ?, ?> onlyRoot = Iterables.getOnlyElement(graph.getRootTransforms());
assertThat(onlyRoot.getTransform(), Matchers.equalTo(flatten));
assertThat((Object) onlyRoot.getTransform(), Matchers.equalTo(flatten));
assertThat(onlyRoot.getInputs().entrySet(), emptyIterable());
assertThat(onlyRoot.getOutputs(), equalTo(empty.expand()));
}
Expand Down Expand Up @@ -148,9 +149,10 @@ public void processElement(DoFn<String, String>.ProcessContext c)

assertThat(
graph.getPerElementConsumers(created),
Matchers.containsInAnyOrder(transformedProducer, flattenedProducer));
Matchers.containsInAnyOrder(new Object[] {transformedProducer, flattenedProducer}));
assertThat(
graph.getPerElementConsumers(transformed), Matchers.containsInAnyOrder(flattenedProducer));
graph.getPerElementConsumers(transformed),
Matchers.containsInAnyOrder(new Object[] {flattenedProducer}));
assertThat(graph.getPerElementConsumers(flattened), emptyIterable());
}

Expand All @@ -168,7 +170,7 @@ public void getValueToConsumersWithDuplicateInputSucceeds() {

assertThat(
graph.getPerElementConsumers(created),
Matchers.containsInAnyOrder(flattenedProducer, flattenedProducer));
Matchers.containsInAnyOrder(new Object[] {flattenedProducer, flattenedProducer}));
assertThat(graph.getPerElementConsumers(flattened), emptyIterable());
}

Expand Down
Expand Up @@ -272,9 +272,12 @@ private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
if (!encodedKeyedElements.isEmpty()) {
// new input for key.
try {
//cast to GenTraversable to avoid a ambiguous call to head() which can come from
//mulitple super interfacesof Seq<byte[]>
byte[] b = ((scala.collection.GenTraversable<byte[]>) encodedKeyedElements).head();
final KV<Long, Iterable<WindowedValue<InputT>>> keyedElements =
CoderHelpers.fromByteArray(
encodedKeyedElements.head(), KvCoder.of(VarLongCoder.of(), itrWvCoder));
b, KvCoder.of(VarLongCoder.of(), itrWvCoder));

final Long rddTimestamp = keyedElements.getKey();

Expand Down
Expand Up @@ -51,7 +51,8 @@ public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {

@Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
for (int i = 0; i < operands.size() - 1; i += 2) {
if (opValueEvaluated(i, inputRow, window)) {
Boolean b = opValueEvaluated(i, inputRow, window);
if (b != null && b) {
return BeamSqlPrimitive.of(
outputType,
opValueEvaluated(i + 1, inputRow, window)
Expand Down

0 comments on commit 2970b53

Please sign in to comment.