Skip to content

Commit

Permalink
Made Tuples operate better in Clojure
Browse files Browse the repository at this point in the history
* Now implements ILookup, AFn, Seqable, Indexed, IMeta
* Fake tuple generator for testing
  • Loading branch information
schleyfox committed Jan 9, 2012
1 parent a3094ef commit d9df90f
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 15 deletions.
13 changes: 12 additions & 1 deletion src/clj/backtype/storm/testing.clj
Expand Up @@ -11,7 +11,8 @@
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
(:import [backtype.storm.tuple Fields])
(:import [backtype.storm.tuple Fields Tuple])
(:import [backtype.storm.task TopologyContext])
(:import [backtype.storm.generated GlobalStreamId Bolt])
(:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple TupleCaptureBolt
SpoutTracker BoltTracker])
Expand Down Expand Up @@ -458,3 +459,13 @@
(Thread/sleep 5))
(reset! (:last-spout-emit tracked-topology) target)
)))

(defn ^{:dirty-hack true} fake-tuple [fields values]
(let [ task->component {1 "1"}
topo (mk-topology
{"1" (mk-spout-spec
(feeder-spout fields))}
{})
context (TopologyContext. topo task->component "fake" "" "" 1)]
(Tuple. context values 1 "default")
))
126 changes: 112 additions & 14 deletions src/jvm/backtype/storm/tuple/Tuple.java
Expand Up @@ -5,11 +5,23 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.Utils;
import clojure.lang.ILookup;
import clojure.lang.Seqable;
import clojure.lang.Indexed;
import clojure.lang.Counted;
import clojure.lang.ISeq;
import clojure.lang.ASeq;
import clojure.lang.AFn;
import clojure.lang.IPersistentMap;
import clojure.lang.PersistentArrayMap;
import clojure.lang.Obj;
import clojure.lang.IMeta;
import clojure.lang.Keyword;
import clojure.lang.Symbol;
import clojure.lang.MapEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import clojure.lang.RT;

/**
* The tuple is the main data structure in Storm. A tuple is a named list of values,
Expand All @@ -22,12 +34,13 @@
* use another type, you'll need to implement and register a serializer for that type.
* See {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info.
*/
public class Tuple implements ILookup {
public class Tuple extends AFn implements ILookup, Seqable, Indexed, IMeta {
private List<Object> values;
private int taskId;
private String streamId;
private TopologyContext context;
private MessageId id;
private IPersistentMap _meta;

//needs to get taskId explicitly b/c could be in a different task than where it was created
public Tuple(TopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
Expand All @@ -36,6 +49,10 @@ public Tuple(TopologyContext context, List<Object> values, int taskId, String st
this.streamId = streamId;
this.id = id;
this.context = context;
this._meta = new PersistentArrayMap( new Object[] {
makeKeyword("stream"), getSourceStreamId(),
makeKeyword("component"), getSourceComponent(),
makeKeyword("task"), getSourceTask()});

String componentId = context.getComponentId(taskId);
Fields schema = context.getComponentOutputFields(componentId, streamId);
Expand Down Expand Up @@ -266,22 +283,20 @@ public int hashCode() {
return System.identityHashCode(this);
}

private static final Keyword makeKeyword(String name) {
private final Keyword makeKeyword(String name) {
return Keyword.intern(Symbol.create(name));
}

private static final Keyword STREAM_KEYWORD = makeKeyword("stream");
private static final Keyword COMPONENT_KEYWORD = makeKeyword("component");
private static final Keyword TASK_KEYWORD = makeKeyword("task");

}

/* ILookup */
@Override
public Object valAt(Object o) {
if(o.equals(STREAM_KEYWORD)) {
return getSourceStreamId();
} else if(o.equals(COMPONENT_KEYWORD)) {
return getSourceComponent();
} else if(o.equals(TASK_KEYWORD)) {
return getSourceTask();
try {
if(o instanceof Keyword) {
return getValueByField(((Keyword) o).getName());
} else if(o instanceof String) {
return getValueByField((String) o);
}
} catch(IllegalArgumentException e) {
}
return null;
}
Expand All @@ -292,4 +307,87 @@ public Object valAt(Object o, Object def) {
if(ret==null) ret = def;
return ret;
}

/* IFn */
@Override
public Object invoke(Object o) {
return valAt(o);
}

@Override
public Object invoke(Object o, Object notfound) {
return valAt(o, notfound);
}

/* Seqable */
public ISeq seq() {
if(values.size() > 0) {
return new Seq(getFields().toList(), values, 0);
}
return null;
}

static class Seq extends ASeq implements Counted {
final List<String> fields;
final List<Object> values;
final int i;

Seq(List<String> fields, List<Object> values, int i) {
this.fields = fields;
this.values = values;
this.i = i;
}

public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
super(meta);
this.fields= fields;
this.values = values;
this.i = i;
}

public Object first() {
return new MapEntry(fields.get(i), values.get(i));
}

public ISeq next() {
if(i+1 < fields.size()) {
return new Seq(fields, values, i+1);
}
return null;
}

public int count() {
return fields.size();
}

public Obj withMeta(IPersistentMap meta) {
return new Seq(meta, fields, values, i);
}
}

/* Indexed */
public Object nth(int i) {
if(i < values.size()) {
return values.get(i);
} else {
return null;
}
}

public Object nth(int i, Object notfound) {
Object ret = nth(i);
if(ret==null) ret = notfound;
return ret;
}

/* Counted */
public int count() {
return values.size();
}

/* IMeta */
public IPersistentMap meta() {
return _meta;
}

}

0 comments on commit d9df90f

Please sign in to comment.