-
Notifications
You must be signed in to change notification settings - Fork 15
/
jetty.clj
190 lines (173 loc) · 7.19 KB
/
jetty.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved
;;
;; SPDX-License-Identifier: Apache-2.0
(ns protojure.internal.grpc.client.providers.http2.jetty
(:require [promesa.core :as p]
[clojure.core.async :refer [>!! <!! <! >! go go-loop] :as async]
[clojure.tools.logging :as log])
(:import (java.net InetSocketAddress)
(java.nio ByteBuffer)
(org.eclipse.jetty.http2.client HTTP2Client)
(org.eclipse.jetty.http2.api Stream$Listener)
(org.eclipse.jetty.http2.api.server ServerSessionListener$Adapter)
(org.eclipse.jetty.http2.frames HeadersFrame
DataFrame)
(org.eclipse.jetty.util Promise Callback)
(org.eclipse.jetty.http HttpFields
HttpURI
HttpVersion
MetaData$Request))
(:refer-clojure :exclude [resolve]))
;;------------------------------------------------------------------------------------
;; Utility functions
;;------------------------------------------------------------------------------------
(defn- jetty-promise
"converts a jetty promise to promesa"
[f]
(p/promise
(fn [resolve reject]
(let [p (reify Promise
(succeeded [_ result]
(resolve result))
(failed [_ error]
(reject error)))]
(f p)))))
(defn- jetty-callback-promise
"converts a jetty 'callback' to promesa"
[f]
(p/promise
(fn [resolve reject]
(let [cb (reify Callback
(succeeded [_]
(resolve true))
(failed [_ error]
(reject error)))]
(f cb)))))
(defn- ->fields
"converts a map of [string string] name/value attributes to a jetty HttpFields container"
[headers]
(let [fields (new HttpFields)]
(run! (fn [[k v]] (.put fields ^String k ^String v)) headers)
fields))
(defn- fields->
"converts jetty HttpFields container to a [string string] map"
[fields]
(->> (.iterator fields)
(iterator-seq)
(reduce (fn [acc x]
(assoc acc (.getName x) (.getValue x))) {})))
(defn- build-request
"Builds a HEADERFRAME representing our request"
[{:keys [method headers url] :or {method "GET" headers {}} :as request} last?]
(log/trace "Sending request:" request "ENDFRAME=" last?)
(let [_uri (HttpURI. ^String url)]
(as-> (->fields headers) $
(MetaData$Request. method _uri HttpVersion/HTTP_2 $)
(HeadersFrame. $ nil last?))))
(defn- close-all! [& channels]
(run! (fn [ch] (when (some? ch) (async/close! ch))) channels))
(defn- stream-log [sev stream & msg]
(log/log sev (apply str (cons (str "STREAM " (.getId stream) ": ") msg))))
(defn- receive-listener
"Implements a org.eclipse.jetty.http2.api.Stream.Listener set of callbacks"
[meta-ch data-ch]
(let [end-stream! (fn [stream] (stream-log :trace stream "Closing") (close-all! meta-ch data-ch))]
(reify Stream$Listener
(onHeaders [_ stream frame]
(let [metadata (.getMetaData frame)
fields (fields-> (.getFields metadata))
data (if (.isResponse metadata)
(let [status (.getStatus metadata)
reason (.getReason metadata)]
(-> {:headers fields}
(cond-> (some? status) (assoc :status status))
(cond-> (some? reason) (assoc :reason reason))))
{:trailers fields})
last? (.isEndStream frame)]
(stream-log :trace stream "Received HEADER-FRAME: " data " ENDFRAME=" last?)
(>!! meta-ch data)
(when last?
(end-stream! stream))))
(onData [_ stream frame callback]
(let [data (.getData frame)
len (.remaining data)
last? (.isEndStream frame)]
(stream-log :trace stream "Received DATA-FRAME (" len " bytes) ENDFRAME=" last?)
(when (some? data-ch)
(doseq [b (repeatedly len #(.get data))]
(async/put! data-ch (bit-and 0xff b)))) ;; FIXME: cast to byte?
(when last?
(end-stream! stream)))
(.succeeded callback))
(onFailure [_ stream error reason callback]
(stream-log :error stream "FAILURE: " error)
(>!! meta-ch {:error {:type :failure :code error :reason reason}})
(end-stream! stream)
(.succeeded callback))
(onReset [_ stream frame]
(stream-log :error stream "Received RST-FRAME")
(let [error (.getError frame)]
(>!! meta-ch {:error {:type :reset :code error}})
(end-stream! stream)))
(onIdleTimeout [_ stream ex]
(stream-log :error stream "Timeout")
(>!! meta-ch {:error {:type :timeout :error ex}})
(end-stream! stream))
(onClosed [_ stream]
(stream-log :trace stream "Closed"))
(onPush [_ stream frame]
(stream-log :trace stream "Received PUSH-FRAME")))))
(defn- transmit-data-frame
"Transmits a single DATA frame"
([stream data]
(transmit-data-frame stream data false 0))
([stream data last? padding]
(stream-log :trace stream "Sending DATA-FRAME with " (count data) " bytes, ENDFRAME=" last?)
@(jetty-callback-promise
(fn [cb]
(let [frame (DataFrame. (.getId stream) (ByteBuffer/wrap data) last? padding)]
(.data stream frame cb))))))
(defn- transmit-eof
"Transmits an empty DATA frame with the ENDSTREAM flag set to true, signifying the end of stream"
[stream]
(transmit-data-frame stream (byte-array 0) true 0))
(defn- transmit-data-frames
"Creates DATA frames from the buffers on the channel"
[input stream]
(when (some? input)
(go-loop []
(if-let [frame (<! input)]
(do
(transmit-data-frame stream frame)
(recur))
(transmit-eof stream)))))
;;------------------------------------------------------------------------------------
;; Exposed API
;;------------------------------------------------------------------------------------
(defn connect [{:keys [host port] :or {host "localhost" port 80} :as params}]
(let [client (HTTP2Client.)
address (InetSocketAddress. ^String host (int port))
listener (ServerSessionListener$Adapter.)]
(log/debug "Connecting with parameters: " params)
(.start client)
(-> (jetty-promise
(fn [p]
(.connect client nil address listener p)))
(p/then (fn [session]
(let [context {:client client :session session}]
(log/debug "Session established:" context)
context))))))
(defn send-request
[{:keys [session] :as context}
{:keys [input-ch meta-ch output-ch] :as request}]
(let [request-frame (build-request request (nil? input-ch))
listener (receive-listener meta-ch output-ch)]
(-> (jetty-promise
(fn [p]
(.newStream session request-frame p listener)))
(p/then (partial transmit-data-frames input-ch))
(p/catch (fn [ex] (close-all! meta-ch output-ch) (throw ex))))))
(defn disconnect [{:keys [client] :as context}]
(log/debug "Disconnecting:" context)
(.stop client)
(dissoc context :client :session))