Skip to content
Browse files

Merge branch 'master' into 0.9.0

  • Loading branch information...
2 parents 5b62dc5 + 47b6093 commit e2f041ebb1bdecef326dabda736a03c40a014f9f @nathanmarz committed Dec 7, 2012
View
3 CHANGELOG.md
@@ -34,6 +34,9 @@
* Trident now throws an error during construction of a topology when try to select fields that don't exist in a stream (thanks xumingming)
* Compute the capacity of a bolt based on execute latency and #executed over last 10 minutes and display in UI
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
+ * Added MultiScheme interface (thanks sritchie)
+ * Added MockTridentTuple for testing (thanks emblem)
+ * Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
View
2 README.markdown
@@ -63,6 +63,8 @@ You must not remove this notice, or any other, from this software.
* Ross Feinstein ([@rnfein](https://github.com/rnfein))
* Junichiro Takagi ([@tjun](https://github.com/tjun))
* Bryan Peterson ([@Lazyshot](https://github.com/Lazyshot))
+* Sam Ritchie ([@sritchie](https://github.com/sritchie))
+* Stuart Anderson ([@emblem](https://github.com/emblem))
## Acknowledgements
View
4 src/clj/backtype/storm/daemon/task.clj
@@ -112,8 +112,8 @@
stream->component->grouper (:stream->component->grouper executor-data)
user-context (:user-context task-data)
executor-stats (:stats executor-data)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))
- task-id (:task-id task-data)]
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
View
6 src/jvm/backtype/storm/spout/IMultiSchemableSpout.java
@@ -0,0 +1,6 @@
+package backtype.storm.spout;
+
+public interface IMultiSchemableSpout {
+ MultiScheme getScheme();
+ void setScheme(MultiScheme scheme);
+}
View
10 src/jvm/backtype/storm/spout/MultiScheme.java
@@ -0,0 +1,10 @@
+package backtype.storm.spout;
+
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+public interface MultiScheme {
+ public Iterable<List<Object>> deserialize(byte[] ser);
+ public Fields getOutputFields();
+}
View
21 src/jvm/backtype/storm/spout/RawMultiScheme.java
@@ -0,0 +1,21 @@
+package backtype.storm.spout;
+
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+
+import static backtype.storm.utils.Utils.tuple;
+import static java.util.Arrays.asList;
+
+public class RawMultiScheme implements MultiScheme {
+ @Override
+ public Iterable<List<Object>> deserialize(byte[] ser) {
+ return asList(tuple(ser));
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields("bytes");
+ }
+}
View
22 src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
@@ -0,0 +1,22 @@
+package backtype.storm.spout;
+
+import java.util.Arrays;
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+public class SchemeAsMultiScheme implements MultiScheme {
+ public final Scheme scheme;
+
+ public SchemeAsMultiScheme(Scheme scheme) {
+ this.scheme = scheme;
+ }
+
+ @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
+ return Arrays.asList(scheme.deserialize(ser));
+ }
+
+ @Override public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+}
View
6 src/jvm/backtype/storm/task/IMetricsContext.java
@@ -0,0 +1,6 @@
+package backtype.storm.task;
+
+
+public interface IMetricsContext {
+
+}
View
2 src/jvm/backtype/storm/task/TopologyContext.java
@@ -29,7 +29,7 @@
* <p>The TopologyContext is also used to declare ISubscribedState objects to
* synchronize state with StateSpouts this object is subscribed to.</p>
*/
-public class TopologyContext extends WorkerTopologyContext {
+public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
private Integer _taskId;
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
View
12 src/jvm/storm/trident/operation/builtin/Debug.java
@@ -4,11 +4,19 @@
import storm.trident.tuple.TridentTuple;
public class Debug extends BaseFilter {
+ private final String name;
+
+ public Debug() {
+ name = "DEBUG: ";
+ }
+
+ public Debug(String name) {
+ this.name = "DEBUG(" + name + "): ";
+ }
@Override
public boolean isKeep(TridentTuple tuple) {
- System.out.println("DEBUG: " + tuple.toString());
+ System.out.println(name + tuple.toString());
return true;
}
-
}
View
2 src/jvm/storm/trident/planner/SubtopologyBolt.java
@@ -49,7 +49,7 @@ public void prepare(Map conf, TopologyContext context, BatchOutputCollector batc
int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
for(Node n: _nodes) {
if(n.stateInfo!=null) {
- State s = n.stateInfo.spec.stateFactory.makeState(conf, context.getThisTaskIndex(), thisComponentNumTasks);
+ State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
context.setTaskData(n.stateInfo.id, s);
}
}
View
3 src/jvm/storm/trident/state/StateFactory.java
@@ -1,8 +1,9 @@
package storm.trident.state;
+import backtype.storm.task.IMetricsContext;
import java.io.Serializable;
import java.util.Map;
public interface StateFactory extends Serializable {
- State makeState(Map conf, int partitionIndex, int numPartitions);
+ State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);
}
View
3 src/jvm/storm/trident/testing/LRUMemoryMapState.java
@@ -1,5 +1,6 @@
package storm.trident.testing;
+import backtype.storm.task.IMetricsContext;
import storm.trident.state.ITupleCollection;
import backtype.storm.tuple.Values;
import java.util.*;
@@ -70,7 +71,7 @@ public Factory(int maxSize) {
}
@Override
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new LRUMemoryMapState(_maxSize, _id);
}
}
View
3 src/jvm/storm/trident/testing/MemoryMapState.java
@@ -1,5 +1,6 @@
package storm.trident.testing;
+import backtype.storm.task.IMetricsContext;
import storm.trident.state.ITupleCollection;
import backtype.storm.tuple.Values;
import java.util.*;
@@ -67,7 +68,7 @@ public Factory() {
}
@Override
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new MemoryMapState(_id);
}
}
View
149 src/jvm/storm/trident/testing/MockTridentTuple.java
@@ -0,0 +1,149 @@
+package storm.trident.testing;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import storm.trident.tuple.TridentTuple;
+
+/**
+ * A tuple intended for use in testing.
+ */
+public class MockTridentTuple extends ArrayList<Object> implements TridentTuple{
+ private final Map<String, Integer> fieldMap;
+
+ public MockTridentTuple(List<String> fieldNames, List<?> values) {
+ super(values);
+ fieldMap = setupFieldMap(fieldNames);
+ }
+
+ public MockTridentTuple(List<String> fieldName, Object... values) {
+ super(Arrays.asList(values));
+ fieldMap = setupFieldMap(fieldName);
+ }
+
+ private Map<String, Integer> setupFieldMap(List<String> fieldNames) {
+ Map<String, Integer> newFieldMap = new HashMap<String, Integer>(fieldNames.size());
+
+ int idx = 0;
+ for (String fieldName : fieldNames) {
+ newFieldMap.put(fieldName, idx++);
+ }
+ return newFieldMap;
+ }
+
+ private int getIndex(String fieldName) {
+ Integer index = fieldMap.get(fieldName);
+ if (index == null) {
+ throw new IllegalArgumentException("Unknown field name: " + fieldName);
+ }
+ return index;
+ }
+
+ @Override
+ public List<Object> getValues() {
+ return this;
+ }
+
+ @Override
+ public Object getValue(int i) {
+ return get(i);
+ }
+
+ @Override
+ public String getString(int i) {
+ return (String)get(i);
+ }
+
+ @Override
+ public Integer getInteger(int i) {
+ return (Integer)get(i);
+ }
+
+ @Override
+ public Long getLong(int i) {
+ return (Long)get(i);
+ }
+
+ @Override
+ public Boolean getBoolean(int i) {
+ return (Boolean)get(i);
+ }
+
+ @Override
+ public Short getShort(int i) {
+ return (Short)get(i);
+ }
+
+ @Override
+ public Byte getByte(int i) {
+ return (Byte)get(i);
+ }
+
+ @Override
+ public Double getDouble(int i) {
+ return (Double)get(i);
+ }
+
+ @Override
+ public Float getFloat(int i) {
+ return (Float)get(i);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return (byte[]) get(i);
+ }
+
+ @Override
+ public Object getValueByField(String field) {
+ return get(getIndex(field));
+ }
+
+ @Override
+ public String getStringByField(String field) {
+ return (String) getValueByField(field);
+ }
+
+ @Override
+ public Integer getIntegerByField(String field) {
+ return (Integer) getValueByField(field);
+ }
+
+ @Override
+ public Long getLongByField(String field) {
+ return (Long) getValueByField(field);
+ }
+
+ @Override
+ public Boolean getBooleanByField(String field) {
+ return (Boolean) getValueByField(field);
+ }
+
+ @Override
+ public Short getShortByField(String field) {
+ return (Short) getValueByField(field);
+ }
+
+ @Override
+ public Byte getByteByField(String field) {
+ return (Byte) getValueByField(field);
+ }
+
+ @Override
+ public Double getDoubleByField(String field) {
+ return (Double) getValueByField(field);
+ }
+
+ @Override
+ public Float getFloatByField(String field) {
+ return (Float) getValueByField(field);
+ }
+
+ @Override
+ public byte[] getBinaryByField(String field) {
+ return (byte[]) getValueByField(field);
+ }
+}

0 comments on commit e2f041e

Please sign in to comment.
Something went wrong with that request. Please try again.