forked from riemann/riemann
-
Notifications
You must be signed in to change notification settings - Fork 0
/
udp.clj
181 lines (161 loc) · 7.46 KB
/
udp.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
(ns riemann.transport.udp
"Accepts messages from external sources. Associated with a core. Sends
incoming events to the core's streams, queries the core's index for states."
(:import [java.net InetSocketAddress]
[java.util.concurrent Executors]
[io.netty.bootstrap Bootstrap]
[io.netty.channel Channel
ChannelHandler
ChannelInitializer
ChannelOption
ChannelHandlerContext
DefaultMessageSizeEstimator
ChannelOutboundHandler
ChannelInboundHandler
ChannelInboundHandlerAdapter
FixedRecvByteBufAllocator]
[io.netty.channel.group ChannelGroup]
[io.netty.channel.socket.nio NioDatagramChannel]
[io.netty.channel.nio NioEventLoopGroup])
(:require [interval-metrics.core :as metrics]
[riemann.test :as test])
(:use [clojure.tools.logging :only [warn info]]
[clojure.string :only [split]]
[riemann.instrumentation :only [Instrumented]]
[riemann.service :only [Service ServiceEquiv]]
[riemann.time :only [unix-time]]
[riemann.transport :only [handle
ioutil-lock
channel-group
datagram->byte-buf-decoder
protobuf-decoder
msg-decoder
shutdown-event-executor-group
shared-event-executor
channel-initializer]]))
(defn gen-udp-handler
  "Builds a sharable ChannelInboundHandlerAdapter proxy for UDP channels.

  - core:          a derefable holding the current core; it is dereferenced on
                   every read, so the handler always sees the latest value.
  - stats:         passed through untouched to handler.
  - channel-group: ChannelGroup to which each channel is added once it becomes
                   active.
  - handler:       a function of [core stats ctx message] invoked for every
                   message read from the channel.

  Exceptions reaching this handler are logged at warn level."
  [core stats ^ChannelGroup channel-group handler]
  (proxy [ChannelInboundHandlerAdapter] []
    ; One instance may be installed in several pipelines.
    (isSharable [] true)

    (exceptionCaught [^ChannelHandlerContext ctx ^Throwable cause]
      (warn cause "UDP handler caught"))

    (channelRead [^ChannelHandlerContext ctx ^Object message]
      (handler (deref core) stats ctx message))

    ; Track the channel in the group as soon as it becomes active.
    (channelActive [^ChannelHandlerContext ctx]
      (let [ch (.channel ctx)]
        (.add channel-group ch)))))
(defn udp-handler
  "Applies message to core via handle, then records in stats the number of
  nanoseconds elapsed between the message's :decode-time and now.

  - core:    the (already dereferenced) core to hand the message to.
  - stats:   the metric updated with the measured latency.
  - ctx:     the channel handler context; accepted but not used here.
  - message: the decoded message; must carry a numeric :decode-time
             (presumably a System/nanoTime reading set by the decoder --
             confirm against the decoder)."
  [core stats ctx message]
  (handle core message)
  ; The clock is read only after handle returns, so handling time is included.
  (let [now     (System/nanoTime)
        elapsed (- now (:decode-time message))]
    (metrics/update! stats elapsed)))
(defrecord UDPServer [^String host
                      ^int port
                      max-size
                      ^int so-rcvbuf
                      ^ChannelGroup channel-group
                      ^ChannelHandler handler
                      stats
                      core
                      killer]
  ; A UDP listener service built on a Netty Bootstrap.
  ;
  ; host, port     address the datagram channel is bound to
  ; max-size       used for both the message size estimator and the fixed
  ;                receive buffer allocator
  ; so-rcvbuf      SO_RCVBUF value; only applied when positive
  ; channel-group  group that the bound channel is added to, and which is
  ;                closed on shutdown
  ; handler        ChannelHandler installed on the bootstrap
  ; stats          metric snapshotted by (events)
  ; core           an atom to a core
  ; killer         an atom to a function that shuts down the server; nil
  ;                while the server is not running

  ServiceEquiv
  ; TODO compare pipeline-factory!
  ; Equivalent servers share every visible tuning parameter and bind address.
  (equiv? [this other]
    (and (instance? UDPServer other)
         (= max-size (:max-size other))
         (= so-rcvbuf (:so-rcvbuf other))
         (= host (:host other))
         (= port (:port other))))

  Service
  ; Two UDP servers compete for the same socket whenever host and port match,
  ; regardless of how their buffers are sized, so only the bind address is
  ; compared here.
  (conflict? [this other]
    (and (instance? UDPServer other)
         (= host (:host other))
         (= port (:port other))))

  ; Points this server at a new core; the change is visible to anything that
  ; dereferences the shared core atom.
  (reload! [this new-core]
    (reset! core new-core))

  ; Binds the datagram channel and installs a killer function. Does nothing
  ; when riemann.test/*testing* is truthy or when the server is already
  ; running (killer is set).
  (start! [this]
    (when-not test/*testing*
      (locking ioutil-lock
        (locking this
          (when-not @killer
            (let [worker-group (NioEventLoopGroup.)
                  bootstrap    (Bootstrap.)]
              (try
                ; Configure bootstrap
                (doto bootstrap
                  (.group worker-group)
                  (.channel NioDatagramChannel)
                  (.option ChannelOption/SO_BROADCAST false)
                  (.option ChannelOption/MESSAGE_SIZE_ESTIMATOR
                           (DefaultMessageSizeEstimator. max-size))
                  (.option ChannelOption/RCVBUF_ALLOCATOR
                           (FixedRecvByteBufAllocator. max-size))
                  (.handler handler))

                ; Setup Channel options: a non-positive so-rcvbuf means
                ; "leave SO_RCVBUF alone".
                (when (pos? so-rcvbuf)
                  (.option bootstrap ChannelOption/SO_RCVBUF so-rcvbuf))

                ; Start bootstrap
                (->> (InetSocketAddress. host port)
                     (.bind bootstrap)
                     (.sync)
                     (.channel)
                     (.add channel-group))

                (catch Throwable t
                  ; Startup failed (e.g. the address is already in use). No
                  ; killer has been installed yet, so nothing else would ever
                  ; shut the event loop group down; release it here and
                  ; propagate the original failure.
                  @(shutdown-event-executor-group worker-group)
                  (throw t)))

              (info "UDP server" host port max-size so-rcvbuf "online")

              ; fn to close server
              (reset! killer
                      (fn killer []
                        (-> channel-group .close .awaitUninterruptibly)
                        @(shutdown-event-executor-group worker-group)
                        (info "UDP server" host port max-size so-rcvbuf "shut down")))))))))

  ; Runs the killer function, if any, and clears it so that start! may be
  ; called again.
  (stop! [this]
    (locking this
      (when @killer
        (@killer)
        (reset! killer nil))))

  Instrumented
  ; Snapshots stats and returns one "in rate" event plus one "in latency <q>"
  ; event per entry in the snapshot's :latencies. Every event carries state
  ; "ok", the "riemann" tag, and the snapshot's :time.
  (events [this]
    (let [svc  (str "riemann server udp " host ":" port)
          in   (metrics/snapshot! stats)
          base {:state "ok"
                :tags  ["riemann"]
                :time  (:time in)}]
      (map (partial merge base)
           (concat [{:service (str svc " in rate")
                     :metric  (:rate in)}]
                   (map (fn [[q latency]]
                          {:service (str svc " in latency " q)
                           :metric  latency})
                        (:latencies in)))))))
(defn udp-server
  "Constructs a UDP server service. Nothing is bound until the service is
  started with (service/start!).

  IMPORTANT: The UDP server has a maximum datagram size--by default, 16384
  bytes. Clients must agree on that limit (and send bigger messages over TCP
  instead); otherwise oversized messages are dropped and show up as protobuf
  parse errors in the log.

  Options:

  - :core           An atom holding the core (default: a fresh (atom nil)).
  - :host           The address to listen on (default 127.0.0.1).
  - :port           The port to listen on (default 5555).
  - :max-size       The maximum datagram size (default 16384 bytes).
  - :so-rcvbuf      The socket receive buffer size in bytes (SO_RCVBUF).
                    Defaults to -1; the server only applies positive values.
  - :channel-group  A ChannelGroup used to track all connections.
  - :initializer    A ChannelInitializer. The default one installs the
                    datagram, protobuf and message decoders followed by a
                    handler built with gen-udp-handler around udp-handler."
  ([] (udp-server {}))
  ([opts]
   (let [core          (:core opts (atom nil))
         stats         (metrics/rate+latency)
         host          (:host opts "127.0.0.1")
         port          (:port opts 5555)
         max-size      (:max-size opts 16384)
         so-rcvbuf     (:so-rcvbuf opts -1)
         group-name    (str "udp-server" host ":" port "(" max-size ")")
         ; On the right-hand side channel-group still names the function
         ; referred in by the ns form; the local only shadows it afterwards.
         channel-group (:channel-group opts (channel-group group-name))
         ci            (:initializer
                         opts
                         (channel-initializer
                           ^:shared datagram-decoder (datagram->byte-buf-decoder)
                           ^:shared protobuf-decoder (protobuf-decoder)
                           ^:shared msg-decoder      (msg-decoder)
                           ^{:shared true :executor shared-event-executor} handler
                           (gen-udp-handler core stats channel-group udp-handler)))]
     ; The killer atom starts out nil: the server has not been started yet.
     (->UDPServer host port max-size so-rcvbuf channel-group ci stats core
                  (atom nil)))))