Skip to content

Commit

Permalink
fix joins
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Aug 2, 2012
1 parent 34ae39f commit 36b9fea
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/jvm/storm/trident/operation/GroupedMultiReducer.java
Expand Up @@ -7,7 +7,7 @@


public interface GroupedMultiReducer<T> extends Serializable { public interface GroupedMultiReducer<T> extends Serializable {
void prepare(Map conf, TridentMultiReducerContext context); void prepare(Map conf, TridentMultiReducerContext context);
T init(TridentCollector collector); T init(TridentCollector collector, TridentTuple group);
void execute(T state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector); void execute(T state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector);
void complete(T state, TridentTuple group, TridentCollector collector); void complete(T state, TridentTuple group, TridentCollector collector);
void cleanup(); void cleanup();
Expand Down
Expand Up @@ -26,6 +26,7 @@ public GroupedMultiReducerExecutor(GroupedMultiReducer reducer, List<Fields> gro
} }
_groupFields = groupFields; _groupFields = groupFields;
_inputFields = inputFields; _inputFields = inputFields;
_reducer = reducer;
} }


@Override @Override
Expand All @@ -48,11 +49,11 @@ public void execute(Map<TridentTuple, Object> state, int streamIndex, TridentTup
ProjectionFactory inputFactory = _inputFactories.get(streamIndex); ProjectionFactory inputFactory = _inputFactories.get(streamIndex);


TridentTuple group = groupFactory.create(full); TridentTuple group = groupFactory.create(full);
TridentTuple input = groupFactory.create(full); TridentTuple input = inputFactory.create(full);


Object curr; Object curr;
if(!state.containsKey(group)) { if(!state.containsKey(group)) {
curr = _reducer.init(collector); curr = _reducer.init(collector, group);
state.put(group, curr); state.put(group, curr);
} else { } else {
curr = state.get(group); curr = state.get(group);
Expand Down
18 changes: 11 additions & 7 deletions src/jvm/storm/trident/operation/impl/JoinerMultiReducer.java
Expand Up @@ -30,15 +30,15 @@ public JoinerMultiReducer(List<JoinType> types, int numGroupFields, List<Fields>
public void prepare(Map conf, TridentMultiReducerContext context) { public void prepare(Map conf, TridentMultiReducerContext context) {
int[] sizes = new int[_sideFields.size() + 1]; int[] sizes = new int[_sideFields.size() + 1];
sizes[0] = _numGroupFields; sizes[0] = _numGroupFields;
for(int i=0; i<_sideFields.size()-1; i++) { for(int i=0; i<_sideFields.size(); i++) {
sizes[i+1] = _sideFields.get(i).size(); sizes[i+1] = _sideFields.get(i).size();
} }
_factory = new ComboList.Factory(sizes); _factory = new ComboList.Factory(sizes);
} }


@Override @Override
public JoinState init(TridentCollector collector) { public JoinState init(TridentCollector collector, TridentTuple group) {
return new JoinState(_types.size()); return new JoinState(_types.size(), group);
} }


@Override @Override
Expand All @@ -49,6 +49,7 @@ public void execute(JoinState state, int streamIndex, TridentTuple group, Triden
if(side.isEmpty()) { if(side.isEmpty()) {
state.numSidesReceived++; state.numSidesReceived++;
} }

side.add(input); side.add(input);
if(state.numSidesReceived == state.sides.length) { if(state.numSidesReceived == state.sides.length) {
emitCrossJoin(state, collector, streamIndex, input); emitCrossJoin(state, collector, streamIndex, input);
Expand Down Expand Up @@ -92,12 +93,13 @@ private void emitCrossJoin(JoinState state, TridentCollector collector, int over
boolean keepGoing = true; boolean keepGoing = true;
//emit cross-join of all emitted tuples //emit cross-join of all emitted tuples
while(keepGoing) { while(keepGoing) {
List[] combined = new List[sides.length]; List[] combined = new List[sides.length+1];
combined[0] = state.group;
for(int i=0; i<sides.length; i++) { for(int i=0; i<sides.length; i++) {
if(i==overrideIndex) { if(i==overrideIndex) {
combined[i] = overrideTuple; combined[i+1] = overrideTuple;
} else { } else {
combined[i] = sides[i].get(indices[i]); combined[i+1] = sides[i].get(indices[i]);
} }
} }
collector.emit(_factory.create(combined)); collector.emit(_factory.create(combined));
Expand Down Expand Up @@ -125,10 +127,12 @@ public static class JoinState {
List<List>[] sides; List<List>[] sides;
int numSidesReceived = 0; int numSidesReceived = 0;
int[] indices; int[] indices;
TridentTuple group;


public JoinState(int numSides) { public JoinState(int numSides, TridentTuple group) {
sides = new List[numSides]; sides = new List[numSides];
indices = new int[numSides]; indices = new int[numSides];
this.group = group;
for(int i=0; i<numSides; i++) { for(int i=0; i<numSides; i++) {
sides[i] = new ArrayList<List>(); sides[i] = new ArrayList<List>();
} }
Expand Down

0 comments on commit 36b9fea

Please sign in to comment.