Permalink
Browse files

Merge branch '0.9.0' of github.com:nathanmarz/storm into 0.9.0

  • Loading branch information...
2 parents 99153f9 + cb5b413 commit 33b25c0e42296e295ee253b8179cb35be80f810e Jason Jackson committed Dec 18, 2012
View
@@ -34,9 +34,14 @@
* Trident now throws an error during construction of a topology when try to select fields that don't exist in a stream (thanks xumingming)
* Compute the capacity of a bolt based on execute latency and #executed over last 10 minutes and display in UI
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
+ * Added MultiScheme interface (thanks sritchie)
+ * Added MockTridentTuple for testing (thanks emblem)
+ * Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
+ * Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
+ * Bug fix: fixed NPE when emitting during emit method of Aggregator
## 0.8.1
View
@@ -63,6 +63,8 @@ You must not remove this notice, or any other, from this software.
* Ross Feinstein ([@rnfein](https://github.com/rnfein))
* Junichiro Takagi ([@tjun](https://github.com/tjun))
* Bryan Peterson ([@Lazyshot](https://github.com/Lazyshot))
+* Sam Ritchie ([@sritchie](https://github.com/sritchie))
+* Stuart Anderson ([@emblem](https://github.com/emblem))
## Acknowledgements
View
@@ -1,4 +1,4 @@
-(defproject storm/storm "0.9.0-wip5-SNAPSHOT"
+(defproject storm/storm "0.9.0-wip9"
:url "http://storm-project.clj"
:description "Distributed and fault-tolerant realtime computation"
:license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"}
@@ -112,8 +112,8 @@
stream->component->grouper (:stream->component->grouper executor-data)
user-context (:user-context task-data)
executor-stats (:stats executor-data)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))
- task-id (:task-id task-data)]
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
@@ -26,6 +26,8 @@
(defprotocol ZMQContextQuery
(zmq-context [this]))
+(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK))
+
(deftype ZMQConnection [socket ^ByteBuffer bb]
Connection
(recv-with-flags [this flags]
@@ -37,8 +39,8 @@
(send [this task message]
(.clear bb)
(.putShort bb (short task))
- (mq/send socket (.array bb) ZMQ/SNDMORE)
- (mq/send socket message)) ;; TODO: temporarily remove the noblock flag
+ (mq/send socket (.array bb) NOBLOCK-SNDMORE)
+ (mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
(close [this]
(.close socket)
))
@@ -0,0 +1,6 @@
+package backtype.storm.spout;
+
+public interface IMultiSchemableSpout {
+ MultiScheme getScheme();
+ void setScheme(MultiScheme scheme);
+}
@@ -0,0 +1,10 @@
+package backtype.storm.spout;
+
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+public interface MultiScheme {
+ public Iterable<List<Object>> deserialize(byte[] ser);
+ public Fields getOutputFields();
+}
@@ -0,0 +1,21 @@
+package backtype.storm.spout;
+
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+
+import static backtype.storm.utils.Utils.tuple;
+import static java.util.Arrays.asList;
+
+public class RawMultiScheme implements MultiScheme {
+ @Override
+ public Iterable<List<Object>> deserialize(byte[] ser) {
+ return asList(tuple(ser));
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields("bytes");
+ }
+}
@@ -0,0 +1,22 @@
+package backtype.storm.spout;
+
+import java.util.Arrays;
+import java.util.List;
+
+import backtype.storm.tuple.Fields;
+
+public class SchemeAsMultiScheme implements MultiScheme {
+ public final Scheme scheme;
+
+ public SchemeAsMultiScheme(Scheme scheme) {
+ this.scheme = scheme;
+ }
+
+ @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
+ return Arrays.asList(scheme.deserialize(ser));
+ }
+
+ @Override public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+}
@@ -0,0 +1,14 @@
+package backtype.storm.task;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+
+
+public interface IMetricsContext {
+ <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs);
+ ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs);
+ CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs);
+}
@@ -29,7 +29,7 @@
* <p>The TopologyContext is also used to declare ISubscribedState objects to
* synchronize state with StateSpouts this object is subscribed to.</p>
*/
-public class TopologyContext extends WorkerTopologyContext {
+public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
private Integer _taskId;
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
@@ -1,12 +1,17 @@
package storm.trident.operation;
-import backtype.storm.metric.api.*;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.task.IMetricsContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-public class TridentOperationContext {
+public class TridentOperationContext implements IMetricsContext{
TridentTuple.Factory _factory;
TopologyContext _topoContext;
@@ -4,11 +4,19 @@
import storm.trident.tuple.TridentTuple;
public class Debug extends BaseFilter {
+ private final String name;
+
+ public Debug() {
+ name = "DEBUG: ";
+ }
+
+ public Debug(String name) {
+ this.name = "DEBUG(" + name + "): ";
+ }
@Override
public boolean isKeep(TridentTuple tuple) {
- System.out.println("DEBUG: " + tuple.toString());
+ System.out.println(name + tuple.toString());
return true;
}
-
}
@@ -56,6 +56,7 @@ public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collect
} else {
curr = val.get(group);
}
+ groupColl.currGroup = group;
_agg.aggregate(curr, input, groupColl);
}
@@ -49,7 +49,7 @@ public void prepare(Map conf, TopologyContext context, BatchOutputCollector batc
int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
for(Node n: _nodes) {
if(n.stateInfo!=null) {
- State s = n.stateInfo.spec.stateFactory.makeState(conf, context.getThisTaskIndex(), thisComponentNumTasks);
+ State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
context.setTaskData(n.stateInfo.id, s);
}
}
@@ -1,8 +1,9 @@
package storm.trident.state;
+import backtype.storm.task.IMetricsContext;
import java.io.Serializable;
import java.util.Map;
public interface StateFactory extends Serializable {
- State makeState(Map conf, int partitionIndex, int numPartitions);
+ State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);
}
@@ -1,5 +1,6 @@
package storm.trident.testing;
+import backtype.storm.task.IMetricsContext;
import storm.trident.state.ITupleCollection;
import backtype.storm.tuple.Values;
import java.util.*;
@@ -70,7 +71,7 @@ public Factory(int maxSize) {
}
@Override
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new LRUMemoryMapState(_maxSize, _id);
}
}
@@ -1,5 +1,6 @@
package storm.trident.testing;
+import backtype.storm.task.IMetricsContext;
import storm.trident.state.ITupleCollection;
import backtype.storm.tuple.Values;
import java.util.*;
@@ -67,7 +68,7 @@ public Factory() {
}
@Override
- public State makeState(Map conf, int partitionIndex, int numPartitions) {
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new MemoryMapState(_id);
}
}
Oops, something went wrong.

0 comments on commit 33b25c0

Please sign in to comment.