Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch '0.9.0' of github.com:nathanmarz/storm into 0.9.0

  • Loading branch information...
commit 33b25c0e42296e295ee253b8179cb35be80f810e 2 parents 99153f9 + cb5b413
Jason Jackson authored
View
5 CHANGELOG.md
@@ -34,9 +34,14 @@
* Trident now throws an error during construction of a topology when try to select fields that don't exist in a stream (thanks xumingming)
* Compute the capacity of a bolt based on execute latency and #executed over last 10 minutes and display in UI
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
+ * Added MultiScheme interface (thanks sritchie)
+ * Added MockTridentTuple for testing (thanks emblem)
+ * Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
+ * Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
+ * Bug fix: fixed NPE when emitting during emit method of Aggregator
## 0.8.1
View
2  README.markdown
@@ -63,6 +63,8 @@ You must not remove this notice, or any other, from this software.
* Ross Feinstein ([@rnfein](https://github.com/rnfein))
* Junichiro Takagi ([@tjun](https://github.com/tjun))
* Bryan Peterson ([@Lazyshot](https://github.com/Lazyshot))
+* Sam Ritchie ([@sritchie](https://github.com/sritchie))
+* Stuart Anderson ([@emblem](https://github.com/emblem))
## Acknowledgements
View
2  project.clj
@@ -1,4 +1,4 @@
-(defproject storm/storm "0.9.0-wip5-SNAPSHOT"
+(defproject storm/storm "0.9.0-wip9"
:url "http://storm-project.clj"
:description "Distributed and fault-tolerant realtime computation"
:license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"}
View
4 src/clj/backtype/storm/daemon/task.clj
@@ -112,8 +112,8 @@
stream->component->grouper (:stream->component->grouper executor-data)
user-context (:user-context task-data)
executor-stats (:stats executor-data)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))
- task-id (:task-id task-data)]
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
View
6 src/clj/backtype/storm/messaging/zmq.clj
@@ -26,6 +26,8 @@
(defprotocol ZMQContextQuery
(zmq-context [this]))
+(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK))
+
(deftype ZMQConnection [socket ^ByteBuffer bb]
Connection
(recv-with-flags [this flags]
@@ -37,8 +39,8 @@
(send [this task message]
(.clear bb)
(.putShort bb (short task))
- (mq/send socket (.array bb) ZMQ/SNDMORE)
- (mq/send socket message)) ;; TODO: temporarily remove the noblock flag
+ (mq/send socket (.array bb) NOBLOCK-SNDMORE)
+ (mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
(close [this]
(.close socket)
))
View
6 src/jvm/backtype/storm/spout/IMultiSchemableSpout.java
@@ -0,0 +1,6 @@
+package backtype.storm.spout;
+
+public interface IMultiSchemableSpout {
+ MultiScheme getScheme();
+ void setScheme(MultiScheme scheme);
+}
View
10 src/jvm/backtype/storm/spout/MultiScheme.java
@@ -0,0 +1,10 @@
+package backtype.storm.spout;
+
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+public interface MultiScheme {
+ public Iterable<List<Object>> deserialize(byte[] ser);
+ public Fields getOutputFields();
+}
View
21 src/jvm/backtype/storm/spout/RawMultiScheme.java
@@ -0,0 +1,21 @@
+package backtype.storm.spout;
+
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+
+import static backtype.storm.utils.Utils.tuple;
+import static java.util.Arrays.asList;
+
+public class RawMultiScheme implements MultiScheme {
+ @Override
+ public Iterable<List<Object>> deserialize(byte[] ser) {
+ return asList(tuple(ser));
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields("bytes");
+ }
+}
View
22 src/jvm/backtype/storm/spout/SchemeAsMultiScheme.java
@@ -0,0 +1,22 @@
+package backtype.storm.spout;
+
+import java.util.Arrays;
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+public class SchemeAsMultiScheme implements MultiScheme {
+ public final Scheme scheme;
+
+ public SchemeAsMultiScheme(Scheme scheme) {
+ this.scheme = scheme;
+ }
+
+ @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
+ return Arrays.asList(scheme.deserialize(ser));
+ }
+
+ @Override public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+}
View
14 src/jvm/backtype/storm/task/IMetricsContext.java
@@ -0,0 +1,14 @@
+package backtype.storm.task;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+
+public interface IMetricsContext {
+ <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs);
+ ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs);
+ CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs);
+}
View
2  src/jvm/backtype/storm/task/TopologyContext.java
@@ -29,7 +29,7 @@
* <p>The TopologyContext is also used to declare ISubscribedState objects to
* synchronize state with StateSpouts this object is subscribed to.</p>
*/
-public class TopologyContext extends WorkerTopologyContext {
+public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
private Integer _taskId;
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
View
9 src/jvm/storm/trident/operation/TridentOperationContext.java
@@ -1,12 +1,17 @@
package storm.trident.operation;
-import backtype.storm.metric.api.*;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.task.IMetricsContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-public class TridentOperationContext {
+public class TridentOperationContext implements IMetricsContext{
TridentTuple.Factory _factory;
TopologyContext _topoContext;
View
12 src/jvm/storm/trident/operation/builtin/Debug.java
@@ -4,11 +4,19 @@
import storm.trident.tuple.TridentTuple;
public class Debug extends BaseFilter {
+ private final String name;
+
+ public Debug() {
+ name = "DEBUG: ";
+ }
+
+ public Debug(String name) {
+ this.name = "DEBUG(" + name + "): ";
+ }
@Override
public boolean isKeep(TridentTuple tuple) {
- System.out.println("DEBUG: " + tuple.toString());
+ System.out.println(name + tuple.toString());
return true;
}
-
}
View
1  src/jvm/storm/trident/operation/impl/GroupedAggregator.java
@@ -56,6 +56,7 @@ public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collect
} else {
curr = val.get(group);
}
+ groupColl.currGroup = group;
_agg.aggregate(curr, input, groupColl);
}
View
2  src/jvm/storm/trident/planner/SubtopologyBolt.java
@@ -49,7 +49,7 @@ public void prepare(Map conf, TopologyContext context, BatchOutputCollector batc
int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
for(Node n: _nodes) {
if(n.stateInfo!=null) {
- State s = n.stateInfo.spec.stateFactory.makeState(conf, context.getThisTaskIndex(), thisComponentNumTasks);
+ State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
context.setTaskData(n.stateInfo.id, s);
}
}
View
3  src/jvm/storm/trident/state/StateFactory.java
@@ -1,8 +1,9 @@
package storm.trident.state;
+import backtype.storm.task.IMetricsContext;
import java.io.Serializable;
import java.util.Map;
public interface StateFactory extends Serializable {
- State makeState(Map conf, int partitionIndex, int numPartitions);
+ State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);
}
View
3  src/jvm/storm/trident/testing/LRUMemoryMapState.java
@@ -1,5 +1,6 @@
package storm.trident.testing;
+import backtype.storm.task.IMetricsContext;
import storm.trident.state.ITupleCollection;
import backtype.storm.tuple.Values;
import java.util.*;
@@ -70,7 +71,7 @@ public Factory(int maxSize) {
}
@Override
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new LRUMemoryMapState(_maxSize, _id);
}
}
View
3  src/jvm/storm/trident/testing/MemoryMapState.java
@@ -1,5 +1,6 @@
package storm.trident.testing;
+import backtype.storm.task.IMetricsContext;
import storm.trident.state.ITupleCollection;
import backtype.storm.tuple.Values;
import java.util.*;
@@ -67,7 +68,7 @@ public Factory() {
}
@Override
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new MemoryMapState(_id);
}
}
View
149 src/jvm/storm/trident/testing/MockTridentTuple.java
@@ -0,0 +1,149 @@
+package storm.trident.testing;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import storm.trident.tuple.TridentTuple;
+
+/**
+ * A tuple intended for use in testing.
+ */
+public class MockTridentTuple extends ArrayList<Object> implements TridentTuple{
+ private final Map<String, Integer> fieldMap;
+
+ public MockTridentTuple(List<String> fieldNames, List<?> values) {
+ super(values);
+ fieldMap = setupFieldMap(fieldNames);
+ }
+
+ public MockTridentTuple(List<String> fieldName, Object... values) {
+ super(Arrays.asList(values));
+ fieldMap = setupFieldMap(fieldName);
+ }
+
+ private Map<String, Integer> setupFieldMap(List<String> fieldNames) {
+ Map<String, Integer> newFieldMap = new HashMap<String, Integer>(fieldNames.size());
+
+ int idx = 0;
+ for (String fieldName : fieldNames) {
+ newFieldMap.put(fieldName, idx++);
+ }
+ return newFieldMap;
+ }
+
+ private int getIndex(String fieldName) {
+ Integer index = fieldMap.get(fieldName);
+ if (index == null) {
+ throw new IllegalArgumentException("Unknown field name: " + fieldName);
+ }
+ return index;
+ }
+
+ @Override
+ public List<Object> getValues() {
+ return this;
+ }
+
+ @Override
+ public Object getValue(int i) {
+ return get(i);
+ }
+
+ @Override
+ public String getString(int i) {
+ return (String)get(i);
+ }
+
+ @Override
+ public Integer getInteger(int i) {
+ return (Integer)get(i);
+ }
+
+ @Override
+ public Long getLong(int i) {
+ return (Long)get(i);
+ }
+
+ @Override
+ public Boolean getBoolean(int i) {
+ return (Boolean)get(i);
+ }
+
+ @Override
+ public Short getShort(int i) {
+ return (Short)get(i);
+ }
+
+ @Override
+ public Byte getByte(int i) {
+ return (Byte)get(i);
+ }
+
+ @Override
+ public Double getDouble(int i) {
+ return (Double)get(i);
+ }
+
+ @Override
+ public Float getFloat(int i) {
+ return (Float)get(i);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return (byte[]) get(i);
+ }
+
+ @Override
+ public Object getValueByField(String field) {
+ return get(getIndex(field));
+ }
+
+ @Override
+ public String getStringByField(String field) {
+ return (String) getValueByField(field);
+ }
+
+ @Override
+ public Integer getIntegerByField(String field) {
+ return (Integer) getValueByField(field);
+ }
+
+ @Override
+ public Long getLongByField(String field) {
+ return (Long) getValueByField(field);
+ }
+
+ @Override
+ public Boolean getBooleanByField(String field) {
+ return (Boolean) getValueByField(field);
+ }
+
+ @Override
+ public Short getShortByField(String field) {
+ return (Short) getValueByField(field);
+ }
+
+ @Override
+ public Byte getByteByField(String field) {
+ return (Byte) getValueByField(field);
+ }
+
+ @Override
+ public Double getDoubleByField(String field) {
+ return (Double) getValueByField(field);
+ }
+
+ @Override
+ public Float getFloatByField(String field) {
+ return (Float) getValueByField(field);
+ }
+
+ @Override
+ public byte[] getBinaryByField(String field) {
+ return (byte[]) getValueByField(field);
+ }
+}
View
12 src/jvm/storm/trident/tuple/ComboList.java
@@ -3,13 +3,16 @@
import java.io.Serializable;
import java.util.AbstractList;
import java.util.List;
+import org.apache.commons.lang.builder.ToStringBuilder;
public class ComboList extends AbstractList<Object> {
public static class Factory implements Serializable {
Pointer[] index;
+ int[] sizes;
public Factory(int... sizes) {
+ this.sizes = sizes;
int total = 0;
for(int size: sizes) {
total+=size;
@@ -27,6 +30,15 @@ public Factory(int... sizes) {
}
public ComboList create(List[] delegates) {
+ if(delegates.length!=sizes.length) {
+ throw new RuntimeException("Expected " + sizes.length + " lists, but instead got " + delegates.length + " lists");
+ }
+ for(int i=0; i<delegates.length; i++) {
+ List l = delegates[i];
+ if(l==null || l.size() != sizes[i]) {
+ throw new RuntimeException("Got unexpected delegates to ComboList: " + ToStringBuilder.reflectionToString(delegates));
+ }
+ }
return new ComboList(delegates, index);
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.