-
Notifications
You must be signed in to change notification settings - Fork 15
/
jetty.clj
236 lines (216 loc) · 9.44 KB
/
jetty.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved
;; Copyright © 2019-2022 Manetu, Inc. All rights reserved
;;
;; SPDX-License-Identifier: Apache-2.0
(ns protojure.internal.grpc.client.providers.http2.jetty
(:require [promesa.core :as p]
[promesa.exec :as p.exec]
[clojure.core.async :refer [>!! <!! <! >! go go-loop] :as async]
[clojure.tools.logging :as log])
(:import (java.net InetSocketAddress)
(java.nio ByteBuffer)
(org.eclipse.jetty.http2.client HTTP2Client)
(org.eclipse.jetty.http2.api Stream$Listener
Stream
Session)
(org.eclipse.jetty.http2.api.server ServerSessionListener$Adapter)
(org.eclipse.jetty.http2.frames HeadersFrame
DataFrame)
(org.eclipse.jetty.util Promise Callback)
(org.eclipse.jetty.util.component LifeCycle
LifeCycle$Listener)
(org.eclipse.jetty.http HttpFields
HttpField
HttpURI
HttpVersion
MetaData$Request
MetaData$Response
MetaData)
(org.eclipse.jetty.util.ssl SslContextFactory SslContextFactory$Client))
(:refer-clojure :exclude [resolve]))
;; Ask the compiler to warn whenever a Java interop call in this namespace
;; cannot be resolved statically and would fall back to reflection.
(set! *warn-on-reflection* true)
;;------------------------------------------------------------------------------------
;; Utility functions
;;------------------------------------------------------------------------------------
(defn- jetty-promise
  "Adapts a Jetty `Promise`-based API to a promesa promise.

  `f` is invoked once with a freshly reified Jetty `Promise`.  The promesa
  promise returned from this function is resolved with the value handed to
  `succeeded`, or rejected with the error handed to `failed`."
  [f]
  (p/create
   (fn [on-success on-failure]
     (f (reify Promise
          (succeeded [_ value]
            (on-success value))
          (failed [_ cause]
            (on-failure cause)))))))
(defn- jetty-callback-promise
  "Adapts a Jetty `Callback`-based API to a core.async promise channel.

  `f` is invoked once with a freshly reified Jetty `Callback`.  The returned
  promise channel receives `true` when the callback succeeds, or the error
  object when the callback fails."
  [f]
  (let [result-ch (async/promise-chan)
        deliver!  (fn [v] (async/put! result-ch v))]
    (f (reify Callback
         (succeeded [_]
           (deliver! true))
         (failed [_ cause]
           (deliver! cause))))
    result-ch))
(defn- ->fields
  "Copies a collection of [name value] string pairs (typically a map) into a
  newly built jetty HttpFields container and returns that container."
  [headers]
  (let [container (HttpFields/build)]
    (doseq [[header-name header-value] headers]
      (.put container ^String header-name ^String header-value))
    container))
(defn- fields->
  "Flattens a jetty HttpFields container into a {name value} map of strings.
  When a name occurs more than once, the value seen last wins."
  [^HttpFields fields]
  (into {}
        (map (fn [^HttpField field]
               [(.getName field) (.getValue field)]))
        (iterator-seq (.iterator fields))))
(defn- build-request
  "Builds the HeadersFrame that opens a request.

  `request` supplies :method (default \"GET\"), :headers (default {}) and :url.
  `last?` becomes the frame's end-of-stream flag."
  [{:keys [method headers url] :or {method "GET" headers {}} :as request} last?]
  (log/trace "Sending request:" request "ENDFRAME=" last?)
  (let [uri      (HttpURI/from ^String url)
        fields   (->fields headers)
        metadata (MetaData$Request. method uri HttpVersion/HTTP_2 fields)]
    (HeadersFrame. metadata nil last?)))
(defn- close-all!
  "Closes every non-nil core.async channel passed in; nil arguments are skipped.
  Returns nil."
  [& channels]
  (doseq [ch channels
          :when (some? ch)]
    (async/close! ch)))
(defn- stream-log
  "Logs at severity `sev` the concatenation of the `msg` fragments, prefixed
  with the id of `stream`."
  [sev ^Stream stream & msg]
  (log/log sev (apply str "STREAM " (.getId stream) ": " msg)))
(defn- receive-listener
  "Builds an org.eclipse.jetty.http2.api.Stream.Listener that forwards stream
  events onto core.async channels.

  meta-ch receives one map per event:
    - HEADERS frame carrying a response: {:headers ..} plus :status / :reason
      when they are present
    - any other HEADERS frame:           {:trailers ..}
    - failure, reset or idle timeout:    {:error {:type .. ..}}
  data-ch (may be nil) receives a private ByteBuffer copy of every non-empty
  DATA payload.

  Both channels are closed once an end-of-stream frame, a failure, a reset or
  an idle timeout has been seen.  All puts are blocking (>!!) and therefore run
  on the thread that invokes the listener."
  [meta-ch data-ch]
  (let [end-stream! (fn [stream] (stream-log :trace stream "Closing") (close-all! meta-ch data-ch))]
    (reify Stream$Listener
      (onHeaders [_ stream frame]
        (let [^MetaData metadata (.getMetaData ^HeadersFrame frame)
              fields (fields-> (.getFields metadata))
              ;; response metadata carries status/reason; anything else is
              ;; reported as trailers
              data (if (.isResponse metadata)
                     (let [status (.getStatus ^MetaData$Response metadata)
                           reason (.getReason ^MetaData$Response metadata)]
                       (cond-> {:headers fields}
                         (some? status) (assoc :status status)
                         (some? reason) (assoc :reason reason)))
                     {:trailers fields})
              last? (.isEndStream ^HeadersFrame frame)]
          (stream-log :trace stream "Received HEADER-FRAME: " data " ENDFRAME=" last?)
          (>!! meta-ch data)
          (when last?
            (end-stream! stream))))
      (onData [_ stream frame callback]
        (let [data (.getData ^DataFrame frame)
              len (.remaining data)
              last? (.isEndStream ^DataFrame frame)]
          (stream-log :trace stream "Received DATA-FRAME (" len " bytes) ENDFRAME=" last?)
          (when (and (some? data-ch) (pos? len))
            ;; copy the payload into our own buffer before completing the
            ;; callback below, so the consumer never shares the frame's buffer
            (let [clone (ByteBuffer/allocate len)]
              (.put clone data)
              (async/>!! data-ch (.flip clone))))
          (when last?
            (end-stream! stream))
          (.succeeded callback)))
      (onFailure [_ stream error reason ex callback]
        (stream-log :error stream "FAILURE: code-> " error " message-> " (ex-message ex))
        (>!! meta-ch {:error {:type :failure :code error :reason reason :ex ex}})
        (end-stream! stream)
        (.succeeded callback))
      (onReset [_ stream frame]
        (stream-log :error stream "Received RST-FRAME")
        (let [error (.getError frame)]
          (>!! meta-ch {:error {:type :reset :code error}})
          (end-stream! stream)))
      (onIdleTimeout [_ stream ex]
        (stream-log :error stream "Timeout")
        (>!! meta-ch {:error {:type :timeout :error ex}})
        (end-stream! stream)
        ;; The interface method returns a primitive boolean, so the body must
        ;; yield a Boolean: falling through with nil fails when the compiler
        ;; unboxes the result.  true asks Jetty to reset the timed-out stream.
        true)
      (onClosed [_ stream]
        (stream-log :trace stream "Closed"))
      (onPush [_ stream frame]
        (stream-log :trace stream "Received PUSH-FRAME")))))
(defn- transmit-data-frame
  "Transmits a single DATA frame on `stream`.

  The two-argument form sends `data` without the end-of-stream flag and with
  zero padding.  Returns the promise channel produced by
  `jetty-callback-promise`: it yields `true` on success, or the error on
  failure."
  ([stream data]
   (transmit-data-frame stream data false 0))
  ([^Stream stream ^ByteBuffer data last? padding]
   (stream-log :trace stream "Sending DATA-FRAME with " (.remaining data) " bytes, ENDFRAME=" last?)
   (let [send! (fn [cb]
                 (.data stream (DataFrame. (.getId stream) data last? padding) cb))]
     (jetty-callback-promise send!))))
(def empty-data
  "Zero-length ByteBuffer used as the payload of the end-of-stream DATA frame."
  (ByteBuffer/allocate 0))
(defn- transmit-eof
  "Signals end of stream by sending `empty-data` in a DATA frame whose
  ENDSTREAM flag is set.  Returns whatever `transmit-data-frame` returns."
  [stream]
  (let [end-of-stream? true
        no-padding     0]
    (transmit-data-frame stream empty-data end-of-stream? no-padding)))
(defn transmit-data-frames
  "Drains the buffers arriving on the `input` channel into DATA frames on
  `stream`, then sends an end-of-stream frame once `input` is exhausted.

  Returns a promesa promise that resolves to `true` after the end-of-stream
  frame has been acknowledged, or is rejected with the first transmission
  error.  When `input` is nil there is nothing to send and an already resolved
  promise is returned."
  [input stream]
  (if (nil? input)
    (p/resolved true)
    (p/create
     (fn [fulfill fail]
       (go-loop []
         (let [buf (<! input)
               ;; a buffer means another DATA frame; anything falsey means the
               ;; input is done and the end-of-stream frame goes out
               ack (<! (if buf
                         (transmit-data-frame stream buf)
                         (transmit-eof stream)))]
           (cond
             (not= ack true) (fail ack)
             buf             (recur)
             :else           (fulfill true))))))))
;;------------------------------------------------------------------------------------
;; Exposed API
;;------------------------------------------------------------------------------------
(def ^:const default-input-buffer
  "Default for the :input-buffer-size connect parameter: 1 MiB, in bytes."
  1048576)
(defn connect
  "Starts a Jetty HTTP2Client and opens a session to host:port.

  Recognised keys in the parameter map:
    :host               target host (default \"localhost\")
    :port               target port (default 80)
    :input-buffer-size  used for both the client input buffer size and the
                        initial stream receive window (default
                        `default-input-buffer`)
    :idle-timeout       passed to the client's setIdleTimeout when provided
    :ssl                when truthy, an SslContextFactory$Client is attached
                        to the client and used for the connection (default false)

  Returns a promesa promise resolving to {:client .. :session ..}.  When the
  connection attempt fails, the client is stopped and the promise is rejected
  with the original error."
  [{:keys [host port input-buffer-size idle-timeout ssl]
    :or {host "localhost" input-buffer-size default-input-buffer port 80 ssl false}
    :as params}]
  (let [client (HTTP2Client.)
        address (InetSocketAddress. ^String host (int port))
        listener (ServerSessionListener$Adapter.)
        ssl-context-factory (when ssl (SslContextFactory$Client.))]
    (when ssl
      (.addBean client ssl-context-factory))
    (log/debug "Connecting with parameters: " params)
    (doto client
      (.setInputBufferSize input-buffer-size)
      (.setInitialStreamRecvWindow input-buffer-size))
    (when idle-timeout
      (.setIdleTimeout client idle-timeout))
    (.start client)
    (let [connecting  (jetty-promise
                       (fn [jp]
                         (.connect client (when ssl ssl-context-factory) address listener jp)))
          established (p/then connecting
                              (fn [session]
                                (let [context {:client client :session session}]
                                  (log/debug "Session established:" context)
                                  context)))]
      (p/catch established
               (fn [e]
                 ;; Stop the client on the default executor rather than inline:
                 ;; this handler is reached from inside .connect's own
                 ;; rejection path.
                 (p/create
                  (fn [_ fail]
                    (.stop client)
                    (fail e))
                  p.exec/default-executor))))))
(defn send-request
  "Opens a new stream on the context's session and sends the request headers.

  `request` is handed to `build-request` and additionally supplies:
    :input-ch   when nil, the HEADERS frame is flagged as end-of-stream
    :meta-ch    channel given to the receive listener for headers/trailers/errors
    :output-ch  channel given to the receive listener for response data

  Returns a promesa promise that resolves with the value Jetty delivers for the
  new stream.  If opening the stream fails, :meta-ch and :output-ch are closed
  and the promise is rejected with the same error."
  [{:keys [^Session session] :as context}
   {:keys [input-ch meta-ch output-ch] :as request}]
  (let [end-stream?    (nil? input-ch)
        headers-frame  (build-request request end-stream?)
        listener       (receive-listener meta-ch output-ch)
        stream-promise (jetty-promise
                        (fn [jp]
                          (.newStream session headers-frame jp listener)))]
    (p/catch stream-promise
             (fn [ex]
               (close-all! meta-ch output-ch)
               (throw ex)))))
(defn disconnect
  "Stops the HTTP2Client held in `context`.

  Returns a promesa promise that resolves to `context` without its :client and
  :session keys once the client reports that it has stopped, or is rejected
  with the cause if the client reports a lifecycle failure.

  If the client is already stopped the promise resolves immediately: stopping
  a stopped client emits no lifecycle event, so waiting for one would leave
  the promise pending forever."
  [{:keys [^HTTP2Client client] :as context}]
  (log/debug "Disconnecting:" context)
  (p/create
   (fn [resolve reject]
     (if (.isStopped client)
       (resolve (dissoc context :client :session))
       (let [listener (reify LifeCycle$Listener
                        (^void lifeCycleFailure [this ^LifeCycle event ^Throwable cause]
                          ;; one-shot listener: detach before settling
                          (.removeEventListener client this)
                          (reject cause))
                        (^void lifeCycleStopped [this ^LifeCycle event]
                          (.removeEventListener client this)
                          (resolve (dissoc context :client :session))))]
         ;; register before calling stop so the stopped event cannot be missed
         (.addEventListener client listener)
         (.stop client))))))