Skip to content

Commit

Permalink
Merge pull request #315 from rabix/fix/scatter
Browse files Browse the repository at this point in the history
fix minor scatter issues
  • Loading branch information
sivkovic committed Jul 23, 2017
2 parents dfda06e + 6af5299 commit e577ee6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
Expand Up @@ -15,10 +15,10 @@
import org.rabix.bindings.model.dag.DAGLinkPort;
import org.rabix.bindings.model.dag.DAGNode;
import org.rabix.common.helper.InternalSchemaHelper;
import org.rabix.engine.store.model.scatter.PortMapping;
import org.rabix.engine.store.model.scatter.RowMapping;
import org.rabix.engine.store.model.scatter.ScatterStrategy;
import org.rabix.engine.store.model.scatter.ScatterStrategyException;
import org.rabix.engine.store.model.scatter.PortMapping;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -101,7 +101,12 @@ public synchronized void enable(String port, Object value, Integer position, Int
}

@Override
@SuppressWarnings("unchecked")
public List<Object> valueStructure(String jobId, String portId, UUID rootId) {
if (emptyListDetected) {
return (List<Object>) generateOutputsForEmptyList();
}

Collections.sort(combinations, new Comparator<Combination>() {
@Override
public int compare(Combination o1, Combination o2) {
Expand Down Expand Up @@ -294,7 +299,13 @@ public Object generateOutputsForEmptyList() {
if (scatterMethod.equals(ScatterMethod.flat_crossproduct)) {
return new ArrayList<>();
}
return new ArrayList<>(); // TODO implement outputs for nested_crossproduct

Integer numberOfEmptyLists = values.values().stream().map(l -> l.size() != 0? l.size() : 1).reduce((x,y) -> x*y).get();
List<List<?>> result = new ArrayList<>();
for (int i = 0; i < numberOfEmptyLists; i++) {
result.add(new ArrayList<>());
}
return result;
}

@Override
Expand Down
Expand Up @@ -196,6 +196,9 @@ public boolean isBlocking() {

@Override
public List<Object> valueStructure(String jobId, String portId, UUID rootId) {
if (combinations.isEmpty()) {
return new LinkedList<>();
}
Collections.sort(combinations, new Comparator<Combination>() {
@Override
public int compare(Combination o1, Combination o2) {
Expand Down
Expand Up @@ -123,7 +123,7 @@ public void handle(final OutputUpdateEvent event) throws EventHandlerException {
ScatterStrategy scatterStrategy = sourceJob.getScatterStrategy();

boolean isValueFromScatterStrategy = false;
if (scatterStrategy.isBlocking()) {
if (scatterStrategy.isBlocking() || scatterStrategy.isEmptyListDetected()) {
if (sourceJob.isOutputPortReady(event.getPortId())) {
isValueFromScatterStrategy = true;

Expand Down

0 comments on commit e577ee6

Please sign in to comment.