Skip to content

Commit

Permalink
added localOrShuffleGrouping to java apis
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Mar 28, 2012
1 parent 438a858 commit 4f63ad5
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<
public LinearDRPCInputDeclarer shuffleGrouping();
public LinearDRPCInputDeclarer shuffleGrouping(String streamId);

public LinearDRPCInputDeclarer localOrShuffleGrouping();
public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);

public LinearDRPCInputDeclarer noneGrouping();
public LinearDRPCInputDeclarer noneGrouping(String streamId);

Expand Down
22 changes: 22 additions & 0 deletions src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,28 @@ public void declare(String prevComponent, InputDeclarer declarer) {
return this;
}

@Override
public LinearDRPCInputDeclarer localOrShuffleGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.localOrShuffleGrouping(prevComponent);
}
});
return this;
}

@Override
public LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.localOrShuffleGrouping(prevComponent, streamId);
}
});
return this;
}

@Override
public LinearDRPCInputDeclarer noneGrouping() {
addDeclaration(new InputDeclaration() {
Expand Down
3 changes: 3 additions & 0 deletions src/jvm/backtype/storm/topology/InputDeclarer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T shuffleGrouping(String componentId);
public T shuffleGrouping(String componentId, String streamId);

public T localOrShuffleGrouping(String componentId);
public T localOrShuffleGrouping(String componentId, String streamId);

public T noneGrouping(String componentId);
public T noneGrouping(String componentId, String streamId);

Expand Down
8 changes: 8 additions & 0 deletions src/jvm/backtype/storm/topology/TopologyBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));
}

public BoltDeclarer localOrShuffleGrouping(String componentId) {
return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}

public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct()));
}

public BoltDeclarer noneGrouping(String componentId) {
return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,38 @@ public String getComponent() {
return this;
}

@Override
public BoltDeclarer localOrShuffleGrouping(final String component) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(InputDeclarer declarer) {
declarer.localOrShuffleGrouping(component);
}

@Override
public String getComponent() {
return component;
}
});
return this;
}

@Override
public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(InputDeclarer declarer) {
declarer.localOrShuffleGrouping(component, streamId);
}

@Override
public String getComponent() {
return component;
}
});
return this;
}

@Override
public BoltDeclarer noneGrouping(final String component) {
addDeclaration(new InputDeclaration() {
Expand Down

0 comments on commit 4f63ad5

Please sign in to comment.