Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clojure tuples #94

Merged
merged 15 commits into from
Feb 2, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 47 additions & 16 deletions src/clj/backtype/storm/clojure.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
(hint collector 'backtype.storm.task.OutputCollector)]
)

; Special case for clojure where we use a closure instead of the prepare
; method
(defmethod hinted-args 'prepare-fn [_ [conf context collector]]
[(hint conf 'java.util.Map)
(hint context 'backtype.storm.task.TopologyContext)
(hint collector 'java.util.Map)])

(defmethod hinted-args 'execute [_ [tuple]]
[(hint tuple 'backtype.storm.tuple.Tuple)]
)
Expand All @@ -34,6 +41,11 @@
[]
)

(defmethod hinted-args 'open-fn [_ [conf context collector]]
[(hint conf 'java.util.Map)
(hint context 'backtype.storm.task.TopologyContext)
(hint collector 'java.util.Map)]
)
(defmethod hinted-args 'open [_ [conf context collector]]
[(hint conf 'java.util.Map)
(hint context 'backtype.storm.task.TopologyContext)
Expand Down Expand Up @@ -116,7 +128,7 @@
(let [[args & impl-body] impl
coll-sym (nth args 1)
args (vec (take 1 args))
prepargs (hinted-args 'prepare [(gensym "conf") (gensym "context") coll-sym])]
prepargs (hinted-args 'prepare-fn [(gensym "conf") (gensym "context") coll-sym])]
`(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
Expand Down Expand Up @@ -148,7 +160,7 @@
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (first args)
prepargs (hinted-args 'open [(gensym "conf") (gensym "context") coll-sym])]
prepargs (hinted-args 'open-fn [(gensym "conf") (gensym "context") coll-sym])]
`(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
Expand All @@ -167,31 +179,50 @@
~definer
))))

(defnk emit-bolt! [^OutputCollector collector ^List values
(defprotocol TupleValues
(tuple-values [values collector stream]))

(extend-protocol TupleValues
java.util.Map
(tuple-values [this collector ^String stream]
(let [ fields (.. (:context collector) (getThisOutputFields stream) toList) ]
(vec (map (into
(empty this) (for [[k v] this]
[(if (keyword? k) (name k) k) v]))
fields))))
java.util.List
(tuple-values [this collector stream]
this))

(defnk emit-bolt! [collector ^TupleValues values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)]
(.emit collector stream anchor values)
(let [^List anchor (collectify anchor)
values (tuple-values values collector stream) ]
(.emit (:output-collector collector) stream anchor values)
))

(defnk emit-direct-bolt! [^OutputCollector collector task ^List values
(defnk emit-direct-bolt! [collector task ^TupleValues values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)]
(.emitDirect collector task stream anchor values)
(let [^List anchor (collectify anchor)
values (tuple-values values collector stream) ]
(.emitDirect (:output-collector collector) task stream anchor values)
))

(defn ack! [^OutputCollector collector ^Tuple tuple]
(.ack collector tuple))
(defn ack! [collector ^Tuple tuple]
(.ack (:output-collector collector) tuple))

(defn fail! [^OutputCollector collector ^Tuple tuple]
(.fail collector tuple))
(defn fail! [collector ^Tuple tuple]
(.fail (:output-collector collector) tuple))

(defnk emit-spout! [^SpoutOutputCollector collector ^List values
(defnk emit-spout! [collector ^TupleValues values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(.emit collector stream values id))
(let [values (tuple-values values collector stream)]
(.emit (:output-collector collector) stream values id)))

(defnk emit-direct-spout! [^SpoutOutputCollector collector task ^List values
(defnk emit-direct-spout! [collector task ^TupleValues values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(.emitDirect collector task stream values id))
(let [values (tuple-values values collector stream)]
(.emitDirect (:output-collector collector) task stream values id)))

(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
Expand Down
3 changes: 2 additions & 1 deletion src/clj/backtype/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
(:import [backtype.storm.tuple Fields])
(:import [backtype.storm.tuple Fields Tuple])
(:import [backtype.storm.task TopologyContext])
(:import [backtype.storm.generated GlobalStreamId Bolt])
(:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
Expand Down
8 changes: 7 additions & 1 deletion src/jvm/backtype/storm/clojure/ClojureBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import clojure.lang.IFn;
import clojure.lang.PersistentArrayMap;
import clojure.lang.Keyword;
import clojure.lang.Symbol;
import clojure.lang.RT;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -37,10 +40,13 @@ public void prepare(final Map stormConf, final TopologyContext context, final Ou
IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
try {
IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
Keyword.intern(Symbol.create("output-collector")), collector,
Keyword.intern(Symbol.create("context")), context});
List<Object> args = new ArrayList<Object>() {{
add(stormConf);
add(context);
add(collector);
add(collectorMap);
}};

_bolt = (IBolt) preparer.applyTo(RT.seq(args));
Expand Down
8 changes: 7 additions & 1 deletion src/jvm/backtype/storm/clojure/ClojureSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import clojure.lang.IFn;
import clojure.lang.PersistentArrayMap;
import clojure.lang.Keyword;
import clojure.lang.Symbol;
import clojure.lang.RT;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,10 +38,13 @@ public void open(final Map conf, final TopologyContext context, final SpoutOutpu
IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
try {
IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
Keyword.intern(Symbol.create("output-collector")), collector,
Keyword.intern(Symbol.create("context")), context});
List<Object> args = new ArrayList<Object>() {{
add(conf);
add(context);
add(collector);
add(collectorMap);
}};

_spout = (ISpout) preparer.applyTo(RT.seq(args));
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/task/OutputCollectorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public OutputCollectorImpl(TopologyContext context, IInternalOutputCollector col
_context = context;
_collector = collector;
}

public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
return _collector.emit(anchorTuple(anchors, streamId, tuple));
}
Expand Down
170 changes: 170 additions & 0 deletions src/jvm/backtype/storm/tuple/IndifferentAccessMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package backtype.storm.tuple;


import clojure.lang.ILookup;
import clojure.lang.Seqable;
import clojure.lang.Indexed;
import clojure.lang.Counted;
import clojure.lang.ISeq;
import clojure.lang.ASeq;
import clojure.lang.AFn;
import clojure.lang.IPersistentMap;
import clojure.lang.PersistentArrayMap;
import clojure.lang.IMapEntry;
import clojure.lang.IPersistentCollection;
import clojure.lang.Obj;
import clojure.lang.IMeta;
import clojure.lang.Keyword;
import clojure.lang.Symbol;
import clojure.lang.MapEntry;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.Set;
import clojure.lang.RT;

public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap, Map {

protected IPersistentMap _map;

protected IndifferentAccessMap() {
}

public IndifferentAccessMap(IPersistentMap map) {
setMap(map);
}

public IPersistentMap getMap() {
return _map;
}

public IPersistentMap setMap(IPersistentMap map) {
_map = map;
return _map;
}

public int size() {
return ((Map) getMap()).size();
}

public int count() {
return size();
}

public ISeq seq() {
return getMap().seq();
}

@Override
public Object valAt(Object o) {
if(o instanceof Keyword) {
return valAt(((Keyword) o).getName());
}
return getMap().valAt(o);
}

@Override
public Object valAt(Object o, Object def) {
Object ret = valAt(o);
if(ret==null) ret = def;
return ret;
}

/* IFn */
@Override
public Object invoke(Object o) {
return valAt(o);
}

@Override
public Object invoke(Object o, Object notfound) {
return valAt(o, notfound);
}

/* IPersistentMap */
/* Naive implementation, but it might be good enough */
public IPersistentMap assoc(Object k, Object v) {
if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);

return new IndifferentAccessMap(getMap().assoc(k, v));
}

public IPersistentMap assocEx(Object k, Object v) throws Exception {
if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);

return new IndifferentAccessMap(getMap().assocEx(k, v));
}

public IPersistentMap without(Object k) throws Exception {
if(k instanceof Keyword) return without(((Keyword) k).getName());

return new IndifferentAccessMap(getMap().without(k));
}

public boolean containsKey(Object k) {
if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
return getMap().containsKey(k);
}

public IMapEntry entryAt(Object k) {
if(k instanceof Keyword) return entryAt(((Keyword) k).getName());

return getMap().entryAt(k);
}

public IPersistentCollection cons(Object o) {
return getMap().cons(o);
}

public IPersistentCollection empty() {
return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
}

public boolean equiv(Object o) {
return getMap().equiv(o);
}

public Iterator iterator() {
return getMap().iterator();
}

/* Map */
public boolean containsValue(Object v) {
return ((Map) getMap()).containsValue(v);
}

public Set entrySet() {
return ((Map) getMap()).entrySet();
}

public Object get(Object k) {
return valAt(k);
}

public boolean isEmpty() {
return ((Map) getMap()).isEmpty();
}

public Set keySet() {
return ((Map) getMap()).keySet();
}

public Collection values() {
return ((Map) getMap()).values();
}

/* Not implemented */
public void clear() {
throw new UnsupportedOperationException();
}
public Object put(Object k, Object v) {
throw new UnsupportedOperationException();
}
public void putAll(Map m) {
throw new UnsupportedOperationException();
}
public Object remove(Object k) {
throw new UnsupportedOperationException();
}
}
Loading