Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
inspects response trailers periodically to detect end of response
Browse files Browse the repository at this point in the history
  • Loading branch information
shamsimam committed Jul 9, 2019
1 parent 27a8e25 commit e7c9dbb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
40 changes: 32 additions & 8 deletions src/clj/qbits/jet/client/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@
(reduction-function result (decode-body chunk as))))))

(defn- make-response
[request ^Response response body-ch error-chan trailers-ch]
[request ^Response response abort-ch body-ch error-chan trailers-ch]
(let [headers (util/http-fields->map (.getHeaders response))
status (.getStatus response)]
{:body body-ch
{:abort-ch abort-ch
:body body-ch
:error-chan error-chan
:headers headers
:request request
Expand Down Expand Up @@ -351,6 +352,23 @@
(forEach [_ action] (.forEach content-provider action))
(spliterator [_] (.spliterator content-provider)))))

(defn- track-response-trailers
"Asynchronously loops and looks for presence of trailers in the response.
When available, they are propagated to the trailers channel and other request channels closed.
Please see https://github.com/eclipse/jetty.project/issues/3842 for details on why we need this."
[response-trailers-ch response-body-ch close-request-channels! response]
(async/go
(loop []
;; iterate every second and check if trailers are available
(async/<! (async/timeout 1000))
(let [trailers (.getTrailers ^HttpResponse response)]
(if trailers
(do
(async/>! response-trailers-ch (util/http-fields->map trailers))
(close-request-channels!))
(when-not (protocols/closed? response-body-ch)
(recur)))))))

(defn request
[^HttpClient client
{:keys [url method query-string form-params headers body trailers-fn
Expand Down Expand Up @@ -382,7 +400,14 @@
(decode-chunk-xform as)))
trailers-ch (async/promise-chan)
^Request request (.newRequest client ^String url)
trailers-supported? (and trailers-fn (instance? HttpRequest request))]
trailers-supported? (and trailers-fn (instance? HttpRequest request))
close-request-channels! (fn close-request-channels! []
(when abort-ch
(async/close! abort-ch))
(async/close! body-ch)
(async/close! trailers-ch)
(async/close! ch)
(async/close! error-ch))]

(some->> version
HttpVersion/fromString
Expand Down Expand Up @@ -485,7 +510,9 @@
(.onResponseHeaders request
(reify Response$HeadersListener
(onHeaders [_ response]
(async/put! ch (make-response request response body-ch error-ch trailers-ch)))))
(async/put! ch (make-response request response abort-ch body-ch error-ch trailers-ch))
(when (util/http2-request? version)
(track-response-trailers trailers-ch body-ch close-request-channels! response)))))

(.onRequestFailure request
(reify Request$FailureListener
Expand All @@ -502,10 +529,7 @@
(when (instance? HttpResponse response)
(when-let [trailers (.getTrailers ^HttpResponse response)]
(async/>!! trailers-ch (util/http-fields->map trailers)))))
(async/close! body-ch)
(async/close! trailers-ch)
(async/close! ch)
(async/close! error-ch))))
(close-request-channels!))))
ch))

(defn get
Expand Down
5 changes: 4 additions & 1 deletion src/clj/qbits/jet/servlet.clj
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@
(catch Exception ex
(-> request-map :ctrl (async/put! [::error ex]))))
(when-not (chan? (:body %))
(.complete ctx))))))
(try
(.complete ctx)
(catch Exception ex
(-> request-map :ctrl (async/put! [::error ex])))))))))

clojure.lang.IPersistentMap
(-update-response [response-map request-map]
Expand Down

0 comments on commit e7c9dbb

Please sign in to comment.