Skip to content

Commit

Permalink
Merge branch 'master' of github.com:nathanmarz/storm into logback3
Browse files Browse the repository at this point in the history
Conflicts:
	project.clj
  • Loading branch information
Jason Jackson committed Aug 16, 2012
2 parents d01ed2c + 09f86bd commit a04991c
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

* Changed log level of "Failed message" logging to DEBUG
* Deprecated LinearDRPCTopologyBuilder, TimeCacheMap, and transactional topologies
* During "storm jar", whether topology is already running or not is checked before submitting jar to save time (thanks jasonjckn)
* Added BaseMultiReducer class to Trident that provides empty implementations of prepare and cleanup
* Bug fix: When an item is consumed off an internal buffer, the entry on the buffer is nulled to allow GC to happen on that data
* Bug fix: Helper class for Trident MapStates now clear their read cache when a new commit happens, preventing updates from spilling over from a failed batch attempt to the next attempt
* Bug fix: Fix NonTransactionalMap to take in an IBackingMap for regular values rather than TransactionalValue (thanks sjoerdmulder)
* Bug fix: Fix NPE when no input fields given for regular Aggregator
* Bug fix: Fix IndexOutOfBoundsExceptions when a bolt for global aggregation had a parallelism greater than 1 (possible with splitting, stateQuerying, and multiReduce)
* Bug fix: Fix "fields size" error that would sometimes occur when splitting a stream with multiple eaches

## 0.8.0

Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject storm/storm "0.8.1-wip2-SNAPSHOT"
(defproject storm/storm "0.8.1-wip4"
:url "http://storm-project.clj"
:description "Distributed and fault-tolerant realtime computation"
:license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"}
Expand Down
13 changes: 9 additions & 4 deletions src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@ public Stream chainEnd() {
Set<String> allInFields = new HashSet<String>();
for(int i=0; i<_aggs.size(); i++) {
AggSpec spec = _aggs.get(i);
inputFields[i] = spec.inFields;
Fields infields = spec.inFields;
if(infields==null) infields = new Fields();
Fields outfields = spec.outFields;
if(outfields==null) outfields = new Fields();

inputFields[i] = infields;
aggs[i] = spec.agg;
outSizes[i] = spec.outFields.size();
allOutFields.addAll(spec.outFields.toList());
allInFields.addAll(spec.inFields.toList());
outSizes[i] = outfields.size();
allOutFields.addAll(outfields.toList());
allInFields.addAll(infields.toList());
}
if(new HashSet(allOutFields).size() != allOutFields.size()) {
throw new IllegalArgumentException("Output fields for chained aggregators must be distinct: " + allOutFields.toString());
Expand Down
16 changes: 16 additions & 0 deletions src/jvm/storm/trident/operation/BaseMultiReducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package storm.trident.operation;

import java.util.Map;

/**
 * Convenience base class for {@code MultiReducer} implementations that need
 * no setup or teardown: both lifecycle hooks are provided as no-ops so
 * subclasses only override what they use.
 *
 * @param <T> state type produced and consumed by the reducer
 */
public abstract class BaseMultiReducer<T> implements MultiReducer<T> {

    /** No-op; override if the reducer needs per-task initialization. */
    @Override
    public void prepare(Map conf, TridentMultiReducerContext context) {
    }

    /** No-op; override if the reducer holds resources to release. */
    @Override
    public void cleanup() {
    }
}
4 changes: 3 additions & 1 deletion src/jvm/storm/trident/partition/IndexHashGrouping.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
public class IndexHashGrouping implements CustomStreamGrouping {
public static int objectToIndex(Object val, int numPartitions) {
if(val==null) return 0;
else return val.hashCode() % numPartitions;
else {
return Math.abs(val.hashCode()) % numPartitions;
}
}

int _index;
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/storm/trident/state/Serializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

/**
 * Pluggable binary serialization strategy for Trident state values.
 * Implementations must be Serializable themselves so they can be shipped
 * inside a topology.
 *
 * <p>The scraped span contained both the pre-change {@code Object deserialize}
 * declaration and its generic replacement; only the generic form is kept, so
 * callers get a typed result without casting.
 *
 * @param <T> type of value this serializer handles
 */
public interface Serializer<T> extends Serializable {

    /** Encodes {@code obj} into a byte array. */
    byte[] serialize(T obj);

    /** Decodes a byte array previously produced by {@link #serialize}. */
    T deserialize(byte[] b);
}
30 changes: 30 additions & 0 deletions src/jvm/storm/trident/testing/CountAsAggregator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package storm.trident.testing;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;


/**
 * Test helper aggregator that counts the tuples in a batch and, when the
 * batch completes, emits the total as a single-field tuple.
 */
public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {

    /** Mutable per-batch accumulator holding the running tuple count. */
    static class State {
        long count = 0;
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        // Start every batch attempt from a fresh accumulator.
        return new State();
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        // Each observed tuple contributes exactly one to the batch total.
        state.count += 1;
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        // Emit the final count as the single output value.
        collector.emit(new Values(state.count));
    }
}
4 changes: 3 additions & 1 deletion src/jvm/storm/trident/testing/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public class Split extends BaseFunction {
/**
 * Splits the tuple's first field on single spaces and emits one tuple per
 * non-empty token. The scraped span retained the pre-change unconditional
 * emit alongside the guarded one, which would emit every word twice; only
 * the guarded emit is kept.
 */
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
    for(String word: tuple.getString(0).split(" ")) {
        // String.split produces empty tokens for leading or repeated
        // spaces; skip them so only real words are emitted.
        if(word.length() > 0) {
            collector.emit(new Values(word));
        }
    }
}

Expand Down
15 changes: 15 additions & 0 deletions src/jvm/storm/trident/testing/StringLength.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package storm.trident.testing;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Test helper function that emits the character count of each input tuple's
 * first field as a single-field tuple.
 */
public class StringLength extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String input = tuple.getString(0);
        collector.emit(new Values(input.length()));
    }
}
2 changes: 1 addition & 1 deletion src/jvm/storm/trident/tuple/TridentTupleView.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static class OperationOutputFactory implements Factory {

public OperationOutputFactory(Factory parent, Fields selfFields) {
_parent = parent;
_fieldIndex = parent.getFieldIndex();
_fieldIndex = new HashMap(parent.getFieldIndex());
int myIndex = parent.numDelegates();
for(int i=0; i<selfFields.size(); i++) {
String field = selfFields.get(i);
Expand Down
42 changes: 41 additions & 1 deletion test/clj/storm/trident/integration_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns storm.trident.integration-test
(:use [clojure test])
(:require [backtype.storm [testing :as t]])
(:import [storm.trident.testing Split])
(:import [storm.trident.testing Split CountAsAggregator StringLength])
(:use [storm.trident testing])
(:use [backtype.storm util]))

Expand Down Expand Up @@ -37,3 +37,43 @@
(is (= [[2]] (exec-drpc drpc "words" "man")))
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
)))))


;; Verifies CountAsAggregator over a DRPC stream: the argument string is split
;; into words and the aggregator must report the word count — including 0 for
;; an empty argument — even with parallelism 2 on the aggregation step.
(deftest test-count-agg
(t/with-local-cluster [cluster]
(with-drpc [drpc]
(letlocals
(bind topo (TridentTopology.))
(-> topo
(.newDRPCStream "numwords" drpc)
(.each (fields "args") (Split.) (fields "word"))
(.aggregate (CountAsAggregator.) (fields "count"))
(.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
(.project (fields "count")))
(with-topology [cluster topo]
;; repeated 100 times — presumably to exercise many batches so routing
;; bugs across the 2 aggregator tasks would surface; TODO confirm intent
(doseq [i (range 100)]
(is (= [[1]] (exec-drpc drpc "numwords" "the"))))
;; empty argument splits into zero words, so the count must be 0
(is (= [[0]] (exec-drpc drpc "numwords" "")))
(is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
)))))

;; Verifies that two streams derived from the same DRPC stream — one emitting
;; the split words, one emitting the argument's string length — can be merged
;; back into a single result stream.
(deftest test-split-merge
(t/with-local-cluster [cluster]
(with-drpc [drpc]
(letlocals
(bind topo (TridentTopology.))
(bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
;; s1: one single-field tuple per word in the argument
(bind s1
(-> drpc-stream
(.each (fields "args") (Split.) (fields "word"))
(.project (fields "word"))))
;; s2: a single tuple carrying the argument's character count
(bind s2
(-> drpc-stream
(.each (fields "args") (StringLength.) (fields "len"))
(.project (fields "len"))))

(.merge topo [s1 s2])
(with-topology [cluster topo]
;; t/ms= — presumably multiset equality, used because the relative
;; order of merged tuples is unspecified; verify against t/ms= docs
(is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
(is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
)))))

0 comments on commit a04991c

Please sign in to comment.