Permalink
Browse files

initial statespout commit

  • Loading branch information...
1 parent 9d91adb commit b0a1b12c99dac3e92a74e3032f9f62c89b577517 @nathanmarz committed Sep 17, 2011
View
1 TODO
@@ -5,6 +5,7 @@ Use cases:
#################
+
* Repackage jzmq and zmq as a leiningen "native dep"
- this might be good, since the native dep can package builds for all different systems/os's?
View
@@ -59,6 +59,6 @@ topology.message.timeout.secs: 30
topology.skip.missing.serializations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
-topology.state.synchronization.timeout.secs: 60
+topology.state.sync.timeout.secs: 60
+topology.state.sync.max.tries: 3
topology.stats.sample.rate: 0.05
-
@@ -8,12 +8,15 @@
TimeCacheMap$ExpiredCallback BufferFileInputStream]))
(import (quote [backtype.storm.serialization TupleSerializer TupleDeserializer]))
(import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
- (import (quote [backtype.storm.tuple Tuple Fields MessageId]))
+ (import (quote [backtype.storm.tuple Tuple Fields MessageId TupleImpl]))
(import (quote [backtype.storm.task IBolt IOutputCollector
OutputCollector OutputCollectorImpl IInternalOutputCollector
TopologyContext ShellBolt
- CoordinatedBolt CoordinatedBolt$SourceArgs KeyedFairBolt]))
+ CoordinatedBolt CoordinatedBolt$SourceArgs KeyedFairBolt
+ ComponentType]))
(import (quote [backtype.storm.daemon Shutdownable]))
+ (import (quote [backtype.storm.state IStateSpout ISubscribedState ISynchronizeOutputCollector IStateSpoutOutputCollector
+ StateSpoutOutputCollector SynchronizeOutputCollector]))
(use (quote [backtype.storm config util log clojure]))
(use (quote [clojure.contrib.seq :only [find-first]]))
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
@@ -1,13 +1,24 @@
(ns backtype.storm.daemon.common
(:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
+ (:import [backtype.storm Constants])
)
(def ACKER-COMPONENT-ID -1)
+
(def ACKER-INIT-STREAM-ID -1)
(def ACKER-ACK-STREAM-ID -2)
(def ACKER-FAIL-STREAM-ID -3)
+(def SYNC-REQUEST-STREAM-ID -4)
+
+(def SYNC-SYNC-SUBSTREAM -1)
+(def SYNC-RESYNC-SUBSTREAM -2)
+(def SYNC-SYNC-FINISH-SUBSTREAM -3)
+(def SYNC-ADD-SUBSTREAM -4)
+(def SYNC-UPDATE-SUBSTREAM -5)
+(def SYNC-REMOVE-SUBSTREAM -6)
+(def FAILURE-SUBSTREAM Constants/FAILURE_SUBSTREAM)
(defn system-component? [id]
(< id 0))
@@ -370,6 +370,9 @@
(InvalidTopologyException.
"All component ids must be positive")))
;; TODO: validate that every declared stream is positive
+ ;; TODO: validate that everything subscribes to a valid, declared stream
+ ;; TODO: check that failure stream subscriptions are to a spout stream
+ ;; TODO: check that field groupings are valid
))
(defn file-cache-map [conf]

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -403,7 +403,7 @@
`(with-var-roots [task/outbound-components (let [old# task/outbound-components]
(fn [& args#]
(merge (apply old# args#)
- {TrackerAggregator/TRACK_STREAM
+ {[TrackerAggregator/TRACK_STREAM]
{TRACKER-BOLT-ID (fn [& args#] 0)}}
)))
task/mk-acker-bolt (let [old# task/mk-acker-bolt]
@@ -262,13 +262,12 @@
* typically used in testing to limit the number of threads spawned in local mode.
*/
public static String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
-
-
/**
* The maximum amount of time a component gives a source of state to synchronize before it requests
* synchronization again.
*/
- public static String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
+ public static String TOPOLOGY_STATE_SYNC_TIMEOUT_SECS="topology.state.sync.timeout.secs";
+ public static String TOPOLOGY_STATE_SYNC_MAX_TRIES="topology.state.sync.max.tries";
/**
* The percentage of tuples to sample to produce stats for a task.
@@ -3,4 +3,5 @@
public class Constants {
public static final int COORDINATED_STREAM_ID = 100;
+ public static final int FAILURE_SUBSTREAM = -1;
}
@@ -31,14 +31,21 @@
private static final TField COMPONENT_ID_FIELD_DESC = new TField("componentId", TType.I32, (short)1);
private static final TField STREAM_ID_FIELD_DESC = new TField("streamId", TType.I32, (short)2);
+ private static final TField STREAM_TYPE_FIELD_DESC = new TField("streamType", TType.I32, (short)3);
private int componentId;
private int streamId;
+ private StreamType streamType;
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements TFieldIdEnum {
COMPONENT_ID((short)1, "componentId"),
- STREAM_ID((short)2, "streamId");
+ STREAM_ID((short)2, "streamId"),
+ /**
+ *
+ * @see StreamType
+ */
+ STREAM_TYPE((short)3, "streamType");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -57,6 +64,8 @@ public static _Fields findByThriftId(int fieldId) {
return COMPONENT_ID;
case 2: // STREAM_ID
return STREAM_ID;
+ case 3: // STREAM_TYPE
+ return STREAM_TYPE;
default:
return null;
}
@@ -108,6 +117,8 @@ public String getFieldName() {
new FieldValueMetaData(TType.I32)));
tmpMap.put(_Fields.STREAM_ID, new FieldMetaData("streamId", TFieldRequirementType.REQUIRED,
new FieldValueMetaData(TType.I32)));
+ tmpMap.put(_Fields.STREAM_TYPE, new FieldMetaData("streamType", TFieldRequirementType.OPTIONAL,
+ new EnumMetaData(TType.ENUM, StreamType.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
FieldMetaData.addStructMetaDataMap(GlobalStreamId.class, metaDataMap);
}
@@ -134,6 +145,9 @@ public GlobalStreamId(GlobalStreamId other) {
__isset_bit_vector.or(other.__isset_bit_vector);
this.componentId = other.componentId;
this.streamId = other.streamId;
+ if (other.is_set_streamType()) {
+ this.streamType = other.streamType;
+ }
}
public GlobalStreamId deepCopy() {
@@ -189,6 +203,37 @@ public void set_streamId_isSet(boolean value) {
__isset_bit_vector.set(__STREAMID_ISSET_ID, value);
}
+ /**
+ *
+ * @see StreamType
+ */
+ public StreamType get_streamType() {
+ return this.streamType;
+ }
+
+ /**
+ *
+ * @see StreamType
+ */
+ public void set_streamType(StreamType streamType) {
+ this.streamType = streamType;
+ }
+
+ public void unset_streamType() {
+ this.streamType = null;
+ }
+
+ /** Returns true if field streamType is set (has been asigned a value) and false otherwise */
+ public boolean is_set_streamType() {
+ return this.streamType != null;
+ }
+
+ public void set_streamType_isSet(boolean value) {
+ if (!value) {
+ this.streamType = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case COMPONENT_ID:
@@ -207,6 +252,14 @@ public void setFieldValue(_Fields field, Object value) {
}
break;
+ case STREAM_TYPE:
+ if (value == null) {
+ unset_streamType();
+ } else {
+ set_streamType((StreamType)value);
+ }
+ break;
+
}
}
@@ -222,6 +275,9 @@ public Object getFieldValue(_Fields field) {
case STREAM_ID:
return new Integer(get_streamId());
+ case STREAM_TYPE:
+ return get_streamType();
+
}
throw new IllegalStateException();
}
@@ -237,6 +293,8 @@ public boolean isSet(_Fields field) {
return is_set_componentId();
case STREAM_ID:
return is_set_streamId();
+ case STREAM_TYPE:
+ return is_set_streamType();
}
throw new IllegalStateException();
}
@@ -276,6 +334,15 @@ public boolean equals(GlobalStreamId that) {
return false;
}
+ boolean this_present_streamType = true && this.is_set_streamType();
+ boolean that_present_streamType = true && that.is_set_streamType();
+ if (this_present_streamType || that_present_streamType) {
+ if (!(this_present_streamType && that_present_streamType))
+ return false;
+ if (!this.streamType.equals(that.streamType))
+ return false;
+ }
+
return true;
}
@@ -293,6 +360,11 @@ public int hashCode() {
if (present_streamId)
builder.append(streamId);
+ boolean present_streamType = true && (is_set_streamType());
+ builder.append(present_streamType);
+ if (present_streamType)
+ builder.append(streamType.getValue());
+
return builder.toHashCode();
}
@@ -322,6 +394,15 @@ public int compareTo(GlobalStreamId other) {
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_streamType()).compareTo(typedOther.is_set_streamType());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_streamType()) { lastComparison = TBaseHelper.compareTo(this.streamType, typedOther.streamType);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -351,6 +432,13 @@ public void read(TProtocol iprot) throws TException {
TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 3: // STREAM_TYPE
+ if (field.type == TType.I32) {
+ this.streamType = StreamType.findByValue(iprot.readI32());
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
TProtocolUtil.skip(iprot, field.type);
}
@@ -370,6 +458,13 @@ public void write(TProtocol oprot) throws TException {
oprot.writeFieldBegin(STREAM_ID_FIELD_DESC);
oprot.writeI32(this.streamId);
oprot.writeFieldEnd();
+ if (this.streamType != null) {
+ if (is_set_streamType()) {
+ oprot.writeFieldBegin(STREAM_TYPE_FIELD_DESC);
+ oprot.writeI32(this.streamType.getValue());
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -386,6 +481,16 @@ public String toString() {
sb.append("streamId:");
sb.append(this.streamId);
first = false;
+ if (is_set_streamType()) {
+ if (!first) sb.append(", ");
+ sb.append("streamType:");
+ if (this.streamType == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.streamType);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -0,0 +1,44 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum StreamType implements TEnum {
+ NORMAL(1),
+ FAILURE(2);
+
+ private final int value;
+
+ private StreamType(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find a the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static StreamType findByValue(int value) {
+ switch (value) {
+ case 1:
+ return NORMAL;
+ case 2:
+ return FAILURE;
+ default:
+ return null;
+ }
+ }
+}
Oops, something went wrong.

0 comments on commit b0a1b12

Please sign in to comment.