Permalink
Browse files

Working emit with map from clojure

* Should probably refactor a bit and test more
  • Loading branch information...
1 parent bbe4aee commit b37dfdbb13fb0c1c58f47203da6c5097242e92ce @schleyfox committed Jan 9, 2012
@@ -156,16 +156,25 @@
~definer
))))
-(defnk emit-bolt! [^OutputCollector collector ^List values
+(defn- mk-tuple-values
+ [^OutputCollector collector stream values]
+ (if (map? values)
+ (let [ fields (.. collector getContext (getThisOutputFields stream)) ]
+ (map values fields))
+ values))
+
+(defnk emit-bolt! [^OutputCollector collector values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
- (let [^List anchor (collectify anchor)]
- (.emit collector stream anchor values)
+ (let [^List anchor (collectify anchor)
+ tuple-values (mk-tuple-values collector stream values) ]
+ (.emit collector stream anchor tuple-values)
))
-(defnk emit-direct-bolt! [^OutputCollector collector task ^List values
+(defnk emit-direct-bolt! [^OutputCollector collector task values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
- (let [^List anchor (collectify anchor)]
- (.emitDirect collector task stream anchor values)
+ (let [^List anchor (collectify anchor)
+ tuple-values (mk-tuple-values collector stream values) ]
+ (.emitDirect collector task stream anchor tuple-values)
))
(defn ack! [^OutputCollector collector ^Tuple tuple]
@@ -94,6 +94,9 @@ public void reportError(Throwable error) {
_delegate.reportError(error);
}
+ public TopologyContext getContext() {
+ return _delegate.getContext();
+ }
private void updateTaskCounts(Object id, List<Integer> tasks) {
Map<Integer, Integer> taskEmittedTuples = _tracked.get(id).taskEmittedTuples;
@@ -2,6 +2,7 @@
import java.util.List;
import backtype.storm.tuple.Tuple;
+import backtype.storm.task.TopologyContext;
import java.util.Collection;
public interface IOutputCollector {
@@ -13,4 +14,9 @@
void ack(Tuple input);
void fail(Tuple input);
void reportError(Throwable error);
+
+ /**
+ * Returns the topology context if it is available
+ */
+ TopologyContext getContext();
}
@@ -2,6 +2,7 @@
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
+import backtype.storm.task.TopologyContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -30,6 +31,10 @@ public OutputCollectorImpl(TopologyContext context, IInternalOutputCollector col
_context = context;
_collector = collector;
}
+
+ public TopologyContext getContext() {
+ return _context;
+ }
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
return _collector.emit(anchorTuple(anchors, streamId, tuple));
@@ -0,0 +1,168 @@
+package backtype.storm.tuple;
+
+
+import clojure.lang.ILookup;
+import clojure.lang.Seqable;
+import clojure.lang.Indexed;
+import clojure.lang.Counted;
+import clojure.lang.ISeq;
+import clojure.lang.ASeq;
+import clojure.lang.AFn;
+import clojure.lang.IPersistentMap;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.IMapEntry;
+import clojure.lang.IPersistentCollection;
+import clojure.lang.Obj;
+import clojure.lang.IMeta;
+import clojure.lang.Keyword;
+import clojure.lang.Symbol;
+import clojure.lang.MapEntry;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+import java.util.Set;
+import clojure.lang.RT;
+
+public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap, Map {
+
+ protected IPersistentMap _map;
+
+ protected IndifferentAccessMap() {
+ }
+
+ public IndifferentAccessMap(IPersistentMap map) {
+ setMap(map);
+ }
+
+ public IPersistentMap getMap() {
+ return _map;
+ }
+
+ public IPersistentMap setMap(IPersistentMap map) {
+ _map = map;
+ return _map;
+ }
+
+ public int size() {
+ return ((Map) getMap()).size();
+ }
+
+ public int count() {
+ return size();
+ }
+
+ public ISeq seq() {
+ return getMap().seq();
+ }
+
+ @Override
+ public Object valAt(Object o) {
+ if(o instanceof Keyword) {
+ return valAt(((Keyword) o).getName());
+ }
+ return getMap().valAt(o);
+ }
+
+ @Override
+ public Object valAt(Object o, Object def) {
+ Object ret = valAt(o);
+ if(ret==null) ret = def;
+ return ret;
+ }
+
+ /* IFn */
+ @Override
+ public Object invoke(Object o) {
+ return valAt(o);
+ }
+
+ @Override
+ public Object invoke(Object o, Object notfound) {
+ return valAt(o, notfound);
+ }
+
+ /* IPersistentMap */
+ /* Naive implementation, but it might be good enough */
+ public IPersistentMap assoc(Object k, Object v) {
+ if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
+
+ return new IndifferentAccessMap(getMap().assoc(k, v));
+ }
+
+ public IPersistentMap assocEx(Object k, Object v) throws Exception {
+ if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+
+ return new IndifferentAccessMap(getMap().assocEx(k, v));
+ }
+
+ public IPersistentMap without(Object k) throws Exception {
+ if(k instanceof Keyword) return without(((Keyword) k).getName());
+
+ return new IndifferentAccessMap(getMap().without(k));
+ }
+
+ public boolean containsKey(Object k) {
+ if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+ return getMap().containsKey(k);
+ }
+
+ public IMapEntry entryAt(Object k) {
+ if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+
+ return getMap().entryAt(k);
+ }
+
+ public IPersistentCollection cons(Object o) {
+ return getMap().cons(o);
+ }
+
+ public IPersistentCollection empty() {
+ return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
+ }
+
+ public boolean equiv(Object o) {
+ return getMap().equiv(o);
+ }
+
+ public Iterator iterator() {
+ return getMap().iterator();
+ }
+
+ /* Map */
+ public boolean containsValue(Object v) {
+ return ((Map) getMap()).containsValue(v);
+ }
+
+ public Set entrySet() {
+ return ((Map) getMap()).entrySet();
+ }
+
+ public Object get(Object k) {
+ return valAt(k);
+ }
+
+ public boolean isEmpty() {
+ return ((Map) getMap()).isEmpty();
+ }
+
+ public Set keySet() {
+ return ((Map) getMap()).keySet();
+ }
+
+ public Collection values() {
+ return ((Map) getMap()).values();
+ }
+
+ /* Not implemented */
+ public void clear() {
+ }
+ public Object put(Object k, Object v) {
+ return null;
+ }
+ public void putAll(Map m) {
+ }
+ public Object remove(Object k) {
+ return null;
+ }
+}
@@ -39,7 +39,7 @@
* use another type, you'll need to implement and register a serializer for that type.
* See {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info.
*/
-public class Tuple extends AFn implements ILookup, Seqable, Indexed, IMeta, IPersistentMap, Map {
+public class Tuple extends IndifferentAccessMap implements Seqable, Indexed, IMeta {
private List<Object> values;
private int taskId;
private String streamId;
@@ -49,6 +49,7 @@
//needs to get taskId explicitly b/c could be in a different task than where it was created
public Tuple(TopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+ super();
this.values = values;
this.taskId = taskId;
this.streamId = streamId;
@@ -306,24 +307,6 @@ public Object valAt(Object o) {
return null;
}
- @Override
- public Object valAt(Object o, Object def) {
- Object ret = valAt(o);
- if(ret==null) ret = def;
- return ret;
- }
-
- /* IFn */
- @Override
- public Object invoke(Object o) {
- return valAt(o);
- }
-
- @Override
- public Object invoke(Object o, Object notfound) {
- return valAt(o, notfound);
- }
-
/* Seqable */
public ISeq seq() {
if(values.size() > 0) {
@@ -405,88 +388,10 @@ private PersistentArrayMap toMap() {
return new PersistentArrayMap(array);
}
- /* IPersistentMap */
- /* Naive implementation, but it might be good enough */
- public IPersistentMap assoc(Object k, Object v) {
- if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
-
- return toMap().assoc(k, v);
- }
-
- public IPersistentMap assocEx(Object k, Object v) throws Exception {
- if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
-
- return toMap().assocEx(k, v);
- }
-
- public IPersistentMap without(Object k) throws Exception {
- if(k instanceof Keyword) return without(((Keyword) k).getName());
-
- return toMap().without(k);
- }
-
- public boolean containsKey(Object k) {
- if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
- return toMap().containsKey(k);
- }
-
- public IMapEntry entryAt(Object k) {
- if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
-
- return toMap().entryAt(k);
- }
-
- public IPersistentCollection cons(Object o) {
- return toMap().cons(o);
- }
-
- public IPersistentCollection empty() {
- return PersistentArrayMap.EMPTY;
- }
-
- public boolean equiv(Object o) {
- return toMap().equiv(o);
- }
-
- public Iterator iterator() {
- return toMap().iterator();
- }
-
- /* Map */
- public boolean containsValue(Object v) {
- return values.contains(v);
- }
-
- public Set entrySet() {
- return toMap().entrySet();
- }
-
- public Object get(Object k) {
- return valAt(k);
- }
-
- public boolean isEmpty() {
- return values.size() == 0;
- }
-
- public Set keySet() {
- return toMap().keySet();
- }
-
- public Collection values() {
- return values;
- }
-
- /* Not implemented */
- public void clear() {
- }
- public Object put(Object k, Object v) {
- return null;
- }
- public void putAll(Map m) {
- }
- public Object remove(Object k) {
- return null;
+ public IPersistentMap getMap() {
+ if(_map==null) {
+ setMap(toMap());
+ }
+ return _map;
}
-
}
Oops, something went wrong.

0 comments on commit b37dfdb

Please sign in to comment.