/
thrift.clj
198 lines (166 loc) · 6.82 KB
/
thrift.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
(ns backtype.storm.thrift
(:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology Bolt Nimbus$Client
Nimbus$Iface ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
GlobalStreamId ComponentObject ComponentObject$_Fields ShellComponent])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm Constants])
(:import [backtype.storm.grouping CustomStreamGrouping])
(:import [backtype.storm.drpc CoordinatedBolt CoordinatedBolt$SourceArgs
KeyedFairBolt])
(:import [backtype.storm.topology OutputFieldsGetter IBasicBolt BasicBoltExecutor])
(:import [org.apache.thrift7.protocol TBinaryProtocol TProtocol])
(:import [org.apache.thrift7.transport TTransport TFramedTransport TSocket])
(:use [backtype.storm util config])
(:use [clojure.contrib.def :only [defnk]])
)
(defn instantiate-java-object
  "Constructs an instance of the class named by `obj`, passing the field value
  of every entry in the object's argument list to the constructor.

  NOTE(review): the constructor call is assembled as a form and run through
  `eval`, so the class name and arguments carried by `obj` decide what gets
  executed. Confirm that only trusted JavaObject values reach this function."
  [^JavaObject obj]
  (let [class-sym (symbol (.get_full_class_name obj))
        ctor-args (map #(.getFieldValue %) (.get_args_list obj))]
    ;; Same form as (new <class-sym> <ctor-args> ...).
    (eval (list* 'new class-sym ctor-args))))
;; Lookup table from the Thrift Grouping union field to the keyword used for
;; that grouping type on the Clojure side (see grouping-type).
(def grouping-constants
  {Grouping$_Fields/FIELDS            :fields
   Grouping$_Fields/SHUFFLE           :shuffle
   Grouping$_Fields/ALL               :all
   Grouping$_Fields/NONE              :none
   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
   Grouping$_Fields/CUSTOM_OBJECT     :custom-object
   Grouping$_Fields/DIRECT            :direct})
(defn grouping-type
  "Returns the keyword that grouping-constants associates with the field
  currently set on `grouping`, or nil when that field has no entry there."
  [^Grouping grouping]
  (get grouping-constants (.getSetField grouping)))
(defn field-grouping
  "Returns the fields stored in a fields grouping.
  Throws IllegalArgumentException when `grouping` is not of type :fields."
  [^Grouping grouping]
  (if (= :fields (grouping-type grouping))
    (.get_fields grouping)
    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping"))))
(defn global-grouping?
  "True when `grouping` is a fields grouping with an empty field list, which is
  the shape mk-global-grouping produces; false otherwise."
  [^Grouping grouping]
  (if (= :fields (grouping-type grouping))
    (empty? (field-grouping grouping))
    false))
(defn parallelism-hint
  "Returns the parallelism hint stored in `component-common`, substituting 1
  when the stored value is 0."
  [^ComponentCommon component-common]
  (let [declared (.get_parallelism_hint component-common)]
    (if (not= declared 0)
      declared
      1)))
(defn nimbus-client-and-conn
  "Opens a framed socket transport to `host`:`port` and returns the vector
  [client transport], where client is a Nimbus$Client using the binary
  protocol over that transport. The transport is returned open; this function
  never closes it."
  [host port]
  (let [socket    (TSocket. host port)
        transport (TFramedTransport. socket)
        client    (Nimbus$Client. (TBinaryProtocol. transport))]
    (.open transport)
    [client transport]))
(defmacro with-nimbus-connection
  "Binds `client-sym` to a Nimbus client connected to `host`:`port`, evaluates
  `body` with that binding, and closes the underlying transport afterwards,
  whether or not `body` throws.

  Usage: (with-nimbus-connection [nimbus host port] (do-something nimbus))"
  [[client-sym host port] & body]
  ;; A hint written as `^Nimbus$Client ~client-sym` inside the syntax-quote is
  ;; attached to the (unquote ...) form and thrown away by the reader, leaving
  ;; the client binding unhinted. Put the :tag on the symbol itself before
  ;; splicing it in; a :tag already supplied by the caller takes precedence.
  (let [hinted-client (vary-meta client-sym
                                 #(merge {:tag 'backtype.storm.generated.Nimbus$Client} %))]
    `(let [[~hinted-client ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
       (try
         ~@body
         (finally (.close conn#))))))
(defmacro with-configured-nimbus-connection
  "Like with-nimbus-connection, but takes the host and port from the config
  returned by read-storm-config, looked up under NIMBUS-HOST and
  NIMBUS-THRIFT-PORT."
  [client-sym & body]
  `(let [storm-conf#  (read-storm-config)
         nimbus-host# (storm-conf# NIMBUS-HOST)
         nimbus-port# (storm-conf# NIMBUS-THRIFT-PORT)]
     (with-nimbus-connection [~client-sym nimbus-host# nimbus-port#]
       ~@body)))
(defn mk-component-common
  "Builds a ComponentCommon from the output fields that `component` declares
  through declareOutputFields. When `parallelism-hint` is truthy it is set on
  the result; otherwise the hint is left untouched."
  [component parallelism-hint]
  (let [getter (OutputFieldsGetter.)]
    ;; Let the component record its declared streams on the getter first.
    (.declareOutputFields component getter)
    (let [common (ComponentCommon. (.getFieldsDeclaration getter))]
      (when parallelism-hint
        (.set_parallelism_hint common parallelism-hint))
      common)))
(defn direct-output-fields
  "Returns a StreamInfo for `fields` built with true as its second constructor
  argument (the direct flag, going by this function's name -- confirm against
  StreamInfo)."
  [fields]
  (new StreamInfo fields true))
(defn output-fields
  "Returns a StreamInfo for `fields` built with false as its second constructor
  argument (the counterpart of direct-output-fields)."
  [fields]
  (new StreamInfo fields false))
(defn mk-output-spec
  "Normalizes an output declaration. A non-map `output-spec` is treated as the
  declaration for Utils/DEFAULT_STREAM_ID alone. Every value is then passed
  through map-val: StreamInfo instances are kept as they are, anything else is
  wrapped in a StreamInfo whose second constructor argument is false."
  [output-spec]
  (letfn [(to-stream-info [out]
            (if (instance? StreamInfo out)
              out
              (StreamInfo. out false)))]
    (map-val to-stream-info
             (if (map? output-spec)
               output-spec
               {Utils/DEFAULT_STREAM_ID output-spec}))))
(defn mk-plain-component-common
  "Builds a ComponentCommon from `output-spec` (normalized by mk-output-spec)
  and, when `parallelism-hint` is truthy, sets that hint on the result."
  [output-spec parallelism-hint]
  (let [common (ComponentCommon. (mk-output-spec output-spec))]
    (if parallelism-hint
      (doto common (.set_parallelism_hint parallelism-hint))
      common)))
;; Builds a SpoutSpec from `spout`: the java-serialized spout, a ComponentCommon
;; derived from its declared output fields, and the value of its isDistributed
;; method.
;; Options: :parallelism-hint, or :p, which is kept for backwards compatibility
;; and wins when both are given.
(defnk mk-spout-spec [spout :parallelism-hint nil :p nil]
  (let [hint       (or p parallelism-hint)
        serialized (ComponentObject/serialized_java (Utils/serialize spout))
        common     (mk-component-common spout hint)]
    (SpoutSpec. serialized common (.isDistributed spout))))
(defn mk-shuffle-grouping
  "Returns a Grouping with its shuffle field set (to an empty NullStruct)."
  []
  (Grouping/shuffle (new NullStruct)))
(defn mk-fields-grouping
  "Returns a Grouping with its fields field set to `fields`."
  [fields]
  (Grouping/fields fields))
(defn mk-global-grouping
  "Returns the Grouping used for a global grouping: a fields grouping with an
  empty field list (the shape global-grouping? tests for)."
  []
  (mk-fields-grouping []))
(defn mk-direct-grouping
  "Returns a Grouping with its direct field set (to an empty NullStruct)."
  []
  (Grouping/direct (new NullStruct)))
(defn mk-all-grouping
  "Returns a Grouping with its all field set (to an empty NullStruct)."
  []
  (Grouping/all (new NullStruct)))
(defn mk-none-grouping
  "Returns a Grouping with its none field set (to an empty NullStruct)."
  []
  (Grouping/none (new NullStruct)))
(defn deserialized-component-object
  "Deserializes and returns the java-serialized payload of `obj`.
  Throws RuntimeException when the field set on `obj` is not SERIALIZED_JAVA."
  [^ComponentObject obj]
  (if (= ComponentObject$_Fields/SERIALIZED_JAVA (.getSetField obj))
    (Utils/deserialize (.get_serialized_java obj))
    (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))))
(defn serialize-component-object
  "Serializes `obj` with Utils/serialize and wraps the result in a
  ComponentObject whose serialized_java field is set."
  [obj]
  (-> obj
      Utils/serialize
      ComponentObject/serialized_java))
(defn mk-grouping
  "Converts `grouping-spec` into a Grouping:
    nil                    -> none grouping
    Grouping               -> returned unchanged
    CustomStreamGrouping   -> custom_serialized grouping of the serialized object
    JavaObject             -> custom_object grouping
    sequential collection  -> fields grouping on those fields
    :shuffle :none :all :global :direct -> the matching mk-*-grouping
  Throws IllegalArgumentException for anything else."
  [grouping-spec]
  (cond
    (nil? grouping-spec)
    (mk-none-grouping)

    (instance? Grouping grouping-spec)
    grouping-spec

    (instance? CustomStreamGrouping grouping-spec)
    (Grouping/custom_serialized (Utils/serialize grouping-spec))

    (instance? JavaObject grouping-spec)
    (Grouping/custom_object grouping-spec)

    (sequential? grouping-spec)
    (mk-fields-grouping grouping-spec)

    :else
    (condp = grouping-spec
      :shuffle (mk-shuffle-grouping)
      :none    (mk-none-grouping)
      :all     (mk-all-grouping)
      :global  (mk-global-grouping)
      :direct  (mk-direct-grouping)
      (throw (IllegalArgumentException. (str grouping-spec " is not a valid grouping"))))))
(defn- mk-inputs
  "Turns `inputs`, a collection of [stream-id grouping-spec] pairs (for example
  a map), into a map of GlobalStreamId -> Grouping. A sequential stream-id
  supplies the two GlobalStreamId constructor arguments from its first and
  second elements; any other stream-id is paired with Utils/DEFAULT_STREAM_ID.
  Each grouping-spec is converted with mk-grouping."
  [inputs]
  (letfn [(to-global-stream-id [stream-id]
            (if (sequential? stream-id)
              (GlobalStreamId. (first stream-id) (second stream-id))
              (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID)))]
    (into {}
          (map (fn [[stream-id grouping-spec]]
                 [(to-global-stream-id stream-id) (mk-grouping grouping-spec)])
               inputs))))
;; Builds a Bolt spec for `bolt` subscribed to `inputs` (converted by
;; mk-inputs). An IBasicBolt is wrapped in a BasicBoltExecutor before it is
;; serialized and asked for its output fields.
;; Options: :parallelism-hint, or :p, which is kept for backwards compatibility
;; and wins when both are given.
(defnk mk-bolt-spec [inputs bolt :parallelism-hint nil :p nil]
  (let [hint          (or p parallelism-hint)
        executable    (if (instance? IBasicBolt bolt)
                        (BasicBoltExecutor. bolt)
                        bolt)
        subscriptions (mk-inputs inputs)
        serialized    (ComponentObject/serialized_java (Utils/serialize executable))
        common        (mk-component-common executable hint)]
    (Bolt. subscriptions serialized common)))
;; Builds a Bolt spec whose component is a ShellComponent made from `command`
;; and `script`, subscribed to `inputs` (converted by mk-inputs). The output
;; streams come from `output-spec` via mk-plain-component-common.
;; Options: :parallelism-hint, or :p, which is kept for backwards compatibility
;; and wins when both are given.
(defnk mk-shell-bolt-spec [inputs command script output-spec :parallelism-hint nil :p nil]
  (let [hint          (or p parallelism-hint)
        subscriptions (mk-inputs inputs)
        shell-object  (ComponentObject/shell (ShellComponent. command script))
        common        (mk-plain-component-common output-spec hint)]
    (Bolt. subscriptions shell-object common)))
(defn mk-topology
  "Creates a StormTopology from a spout map, a bolt map and, optionally, a
  state-spout map. The two-argument form uses an empty state-spout map."
  ([spout-map bolt-map]
     (mk-topology spout-map bolt-map {}))
  ([spout-map bolt-map state-spout-map]
     (new StormTopology spout-map bolt-map state-spout-map)))
;; Wraps `bolt` in a CoordinatedBolt.
;; Options:
;;   :type    nil (default) passes nil as the source argument; :all and :single
;;            pass CoordinatedBolt$SourceArgs/all and /single respectively.
;;            Any other value makes condp throw IllegalArgumentException.
;;   :all-out passed through as the third constructor argument (default false).
(defnk coordinated-bolt [bolt :type nil :all-out false]
  (CoordinatedBolt. bolt
                    (condp = type
                      nil     nil
                      :all    (CoordinatedBolt$SourceArgs/all)
                      :single (CoordinatedBolt$SourceArgs/single))
                    all-out))
;; Alias for the static field Constants/COORDINATED_STREAM_ID.
(def COORD-STREAM (. Constants COORDINATED_STREAM_ID))