From 47c8f5c38128fd871588a92546e58980e18c6033 Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Thu, 28 Aug 2014 17:56:08 +0700 Subject: [PATCH] NB Merged refactor [#12], [#59], [#60], [#66], [#67] (@ckarlsen), [#68], [#69] (@hugoduncan), more Completely refactored Sente's client<->server data handling for: * Greater consistency, simplicity, robustness. * Increased efficiency (smaller cb ids, leaner cb wrapping format). * Pluggable serialization. The pluggable serialization brings optional support for Transit, incl. JSON and MessagePack over Transit (thanks to @ckarlsen for getting the ball rolling on this!). An early, experimental FlexiPacker is provided that allows per-payload format selection and simple heuristic-based auto format selection for highly efficient client<->server data comms in a wide range of use cases. This will likely become Sente's standard (if not default) package format in future. Other changes in this squashed commit include: * General code review + clean-up. * Improved logging. * A new client-side `chsk-destroy!` API fn. * Improved client+server router support for component-style configs (thanks to @hugoduncan for this!): server+client side routers now both receive `event-msg`s, and those `event-msg`s contain more useful goodies. * Client-side router now traps+logs errors like the server-side router. * Improve reference example's compatibility with LightTable. --- example-project/project.clj | 6 +- example-project/src/example/my_app.cljx | 149 +-- project.clj | 10 +- src/taoensso/sente.cljx | 1158 ++++++++++++---------- src/taoensso/sente/interfaces.cljx | 26 + src/taoensso/sente/packers/transit.cljx | 161 +++ src/taoensso/sente/servers/http-kit.cljx | 7 + 7 files changed, 921 insertions(+), 596 deletions(-) create mode 100644 src/taoensso/sente/interfaces.cljx create mode 100644 src/taoensso/sente/packers/transit.cljx create mode 100644 src/taoensso/sente/servers/http-kit.cljx diff --git a/example-project/project.clj b/example-project/project.clj index 500669b..832d97e 100644 --- a/example-project/project.clj +++ b/example-project/project.clj @@ -27,7 +27,11 @@ [ring/ring-defaults "0.1.1"] ; Incl. `ring-anti-forgery`, etc. [hiccup "1.0.5"] ; Optional, just for HTML [org.clojure/core.match "0.2.2"] ; Optional but quite handly - ] + ;; + ;;; Transit deps optional; may be used to aid perf. of larger data payloads + ;;; (see reference example for details): + [com.cognitect/transit-clj "0.8.247"] + [com.cognitect/transit-cljs "0.8.184"]] :plugins [[lein-pprint "1.1.1"] diff --git a/example-project/src/example/my_app.cljx b/example-project/src/example/my_app.cljx index 4ea0967..0ec1d2f 100644 --- a/example-project/src/example/my_app.cljx +++ b/example-project/src/example/my_app.cljx @@ -12,7 +12,7 @@ INSTRUCTIONS: 1. Call `lein start-dev` at your terminal. 2. Connect to development nREPL (port will be printed). - 3. Evaluate this namespace and `(start-http-server!)` in this namespace. + 3. Evaluate this namespace and `(start!)` in this namespace. 4. Open browser & point to local http server (port will be printed). 5. Observe browser's console + nREPL's std-out. @@ -32,7 +32,10 @@ [clojure.core.async :as async :refer (! >!! put! chan go go-loop)] [taoensso.timbre :as timbre] [taoensso.sente :as sente] - [ring.middleware.anti-forgery :as ring-anti-forgery]) + [ring.middleware.anti-forgery :as ring-anti-forgery] + + ;; Optional, for Transit encoding: + [taoensso.sente.packers.transit :as sente-transit]) #+cljs (:require-macros @@ -44,17 +47,26 @@ [cljs.core.match] [cljs.core.async :as async :refer (! put! chan)] [taoensso.encore :as encore :refer (logf)] - [taoensso.sente :as sente :refer (cb-success?)])) + [taoensso.sente :as sente :refer (cb-success?)] + + ;; Optional, for Transit encoding: + [taoensso.sente.packers.transit :as sente-transit])) -;; #+clj (timbre/set-level! :trace) +;; (sente/set-logging-level! :trace) #+clj (defn- logf [fmt & xs] (println (apply format fmt xs))) -;;;; Setup server-side chsk handlers ------------------------------------------- +(def packer + "Defines our packing (serialization) format for client<->server comms." + ;; :edn ; Default + (sente-transit/get-flexi-packer :edn) ; Experimental, needs Transit deps + ) + +;;;; Server-side setup #+clj (let [{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]} - (sente/make-channel-socket! {})] + (sente/make-channel-socket! {:packer packer})] (def ring-ajax-post ajax-post-fn) (def ring-ajax-get-or-ws-handshake ajax-get-or-ws-handshake-fn) (def ch-chsk ch-recv) ; ChannelSocket's receive channel @@ -112,80 +124,65 @@ {:read-token (fn [req] (-> req :params :csrf-token))})] (ring.middleware.defaults/wrap-defaults my-routes ring-defaults-config))) -#+clj (defonce http-server_ (atom nil)) - -#+clj -(defn stop-http-server! [] - (when-let [current-server @http-server_] - (current-server :timeout 100))) - -#+clj -(defn start-http-server! [] - (let [s (http-kit-server/run-server (var my-ring-handler) {:port 0}) - uri (format "http://localhost:%s/" (:local-port (meta s)))] - (stop-http-server!) - (logf "Http-kit server is running at `%s`" uri) - (.browse (java.awt.Desktop/getDesktop) - (java.net.URI. uri)) - (reset! http-server_ s))) - -(comment (start-http-server!)) +;;;; Client-side setup -;;;; Setup client-side chsk handlers ------------------------------------------- +(def ^:private random-chsk-type-for-fun (if (>= (rand) 0.5) :ajax :auto)) #+cljs (let [{:keys [chsk ch-recv send-fn state]} (sente/make-channel-socket! "/chsk" ; Note the same URL as before - {:type (if (>= (rand) 0.5) :ajax :auto)})] + {:type random-chsk-type-for-fun + :packer packer})] (def chsk chsk) (def ch-chsk ch-recv) ; ChannelSocket's receive channel (def chsk-send! send-fn) ; ChannelSocket's send API fn - (def chsk-state state) ; Watchable, read-only atom + (def chsk-state state) ; Watchable, read-only atom ) -;;;; Setup routers ------------------------------------------------------------- +;;;; Routing handlers #+cljs (logf "ClojureScript appears to have loaded correctly.") #+clj -(defn- event-msg-handler - [{:as ev-msg :keys [ring-req event ?reply-fn]} _] +(defn- event-msg-handler "Server-side event-msg handler." + [{:as ev-msg + :keys [event ring-req ?reply-fn push-fn ; ... Useful stuff in here + ]}] (let [session (:session ring-req) uid (:uid session) [id data :as ev] event] (logf "Event: %s" ev) (match [id data] - ;; TODO: Match your events here, reply when appropriate <...> - :else - (do (logf "Unmatched event: %s" ev) - (when-not (:dummy-reply-fn? (meta ?reply-fn)) - (?reply-fn {:umatched-event-as-echoed-from-from-server ev})))))) - -#+clj -(defonce chsk-router - (sente/start-chsk-router-loop! event-msg-handler ch-chsk)) + ;; TODO: Match your events here, reply when appropriate <...> + :else + (do (logf "Unmatched event: %s" ev) + (when-not (:dummy-reply-fn (meta ?reply-fn)) + (?reply-fn {:umatched-event-as-echoed-from-from-server ev})))))) #+cljs -(defn- event-handler [[id data :as ev] _] - (logf "Event: %s" ev) - (match [id data] - ;; TODO Match your events here <...> - [:chsk/state {:first-open? true}] - (logf "Channel socket successfully established!") - [:chsk/state new-state] (logf "Chsk state change: %s" new-state) - [:chsk/recv payload] (logf "Push event from server: %s" payload) - :else (logf "Unmatched event: %s" ev))) +(defn- event-msg-handler "Client-side event-msg handler." + [{:as ev-msg + :keys [event ch-recv send-fn ; ... Useful stuff in here + ]}] + (let [[id data :as ev] event] -#+cljs -(defonce chsk-router - (sente/start-chsk-router-loop! event-handler ch-chsk)) + (logf "Event: %s" ev) + (match [id data] + ;; TODO Match your events here <...> + [:chsk/state {:first-open? true}] + (logf "Channel socket successfully established!") + + [:chsk/state new-state] (logf "Chsk state change: %s" new-state) + [:chsk/recv payload] (logf "Push event from server: %s" payload) + + :else (logf "Unmatched event: %s" ev)))) ;;;; Example: broadcast server>user ;; As an example of push notifications, we'll setup a server loop to broadcast ;; an event to _all_ possible user-ids every 10 seconds: #+clj -(defonce broadcaster +(defn start-broadcaster! [] (go-loop [i 0] (user: %s" @connected-uids)) @@ -206,7 +203,7 @@ (comment (test-fast-server>user-pushes)) -;;;; Setup client buttons +;;;; Client-side UI #+cljs (when-let [target-el (.getElementById js/document "btn1")] @@ -238,11 +235,45 @@ ;;; our channel socket to reconnect, thereby picking up the new ;;; session. - (encore/ajax-lite "/login" {:method :post - :params - {:user-id (str user-id) - :csrf-token (:csrf-token @chsk-state)}} - (fn [ajax-resp] - (logf "Ajax login response: %s" ajax-resp))) + (encore/ajax-lite "/login" + {:method :post + :params {:user-id (str user-id) + :csrf-token (:csrf-token @chsk-state)}} + (fn [ajax-resp] (logf "Ajax login response: %s" ajax-resp))) (sente/chsk-reconnect! chsk))))))) + +;;;; Init + +#+clj (defonce http-server_ (atom nil)) +#+clj +(defn stop-http-server! [] + (when-let [stop-f @http-server_] + (stop-f :timeout 100))) + +#+clj +(defn start-http-server! [] + (stop-http-server!) + (let [s (http-kit-server/run-server (var my-ring-handler) {:port 0}) + uri (format "http://localhost:%s/" (:local-port (meta s)))] + (reset! http-server_ s) + (logf "Http-kit server is running at `%s`" uri) + (.browse (java.awt.Desktop/getDesktop) + (java.net.URI. uri)))) + +#+clj (defonce router_ (atom nil)) +#+cljs (def router_ (atom nil)) +(defn stop-router! [] (when-let [stop-f @router_] (stop-f))) +(defn start-router! [] + (stop-router!) + (reset! router_ (sente/start-chsk-router! ch-chsk event-msg-handler))) + +(defn start! [] + (start-router!) + #+clj (start-http-server!) + #+clj (start-broadcaster!)) + +#+cljs (start!) +;; #+clj (start!) ; Auto-start disabled for LightTable, etc. +(comment (start!) + (test-fast-server>user-pushes)) diff --git a/project.clj b/project.clj index 7931e84..80222f5 100644 --- a/project.clj +++ b/project.clj @@ -15,10 +15,8 @@ [org.clojure/clojurescript "0.0-2322"] [org.clojure/core.async "0.1.338.0-5c5012-alpha"] [org.clojure/tools.reader "0.8.7"] - [com.taoensso/encore "1.7.1"] + [com.taoensso/encore "1.7.3"] [com.taoensso/timbre "3.2.1"] - [com.cognitect/transit-clj "0.8.247"] - [com.cognitect/transit-cljs "0.8.184"] [http-kit "2.1.19"]] :plugins @@ -30,8 +28,10 @@ :server-jvm {:jvm-opts ^:replace ["-server"]} :1.6 {:dependencies [[org.clojure/clojure "1.6.0"]]} :1.7 {:dependencies [[org.clojure/clojure "1.7.0-alpha1"]]} - :test {:dependencies [[expectations "2.0.9"] - [org.clojure/test.check "0.5.9"] + :test {:dependencies [[com.cognitect/transit-clj "0.8.247"] + [com.cognitect/transit-cljs "0.8.184"] + [expectations "2.0.9"] + [org.clojure/test.check "0.5.9"] ;; [com.cemerick/double-check "0.5.7"] ] :plugins [[lein-expectations "0.0.8"] diff --git a/src/taoensso/sente.cljx b/src/taoensso/sente.cljx index efdce60..262492a 100644 --- a/src/taoensso/sente.cljx +++ b/src/taoensso/sente.cljx @@ -1,9 +1,9 @@ (ns taoensso.sente "Channel sockets. Otherwise known as The Shiz. - Protocol | client>server | client>server + ack/reply | server>user[1] push - * WebSockets: ✓ [2] ✓ - * Ajax: [3] ✓ [4] + Protocol | client>server | client>server ?+ ack/reply | server>user[1] push + * WebSockets: ✓ [2] ✓ + * Ajax: [3] ✓ [4] [1] By user-id => ALL of a user's connected clients (browser tabs, devices, etc.). Note that user > session > client > connection for consistency @@ -22,6 +22,8 @@ * cb - Callback. * tout - Timeout. * ws - WebSocket/s. + * pstr - Packed string. Arbitrary Clojure data serialized as a string (e.g. + edn) for client<->server comms. Special messages (implementation detail): * Callback replies: :chsk/closed, :chsk/timeout, :chsk/error. @@ -29,22 +31,17 @@ [:chsk/handshake [ ]], [:chsk/ws-ping], [:chsk/state ], - [:chsk/recv <[buffered-evs]>] ; server>user push + [:chsk/recv <[buffered-evs]>] ; server>user push * Server-side events: - [:chsk/bad-edn ], - [:chsk/bad-event ], - [:chsk/uidport-open], - [:chsk/uidport-close]. + [:chsk/bad-package ], ; was :chsk/bad-edn + [:chsk/bad-event ], + [:chsk/uidport-open], + [:chsk/uidport-close]. - * Event wrappers: {:chsk/clj :chsk/dummy-cb? true} (for [2]), - {:chsk/clj :chsk/cb-uuid } (for [4]). + * Callback wrapping: [ ] for [2],[3]. Notable implementation details: - * Edn is used as a flexible+convenient transfer format, but can be seen as - an implementation detail. Users may apply additional string encoding (e.g. - JSON) at will. (This would incur a cost, but it'd be negligable compared - to even the fastest network transfer times). * core.async is used liberally where brute-force core.async allows for significant implementation simplifications. We lean on core.async's strong efficiency here. @@ -68,71 +65,41 @@ (:require [clojure.string :as str] [clojure.core.async :as async :refer (! >!! put! chan go go-loop)] - [clojure.tools.reader.edn :as edn] - [org.httpkit.server :as http-kit] - [taoensso.encore :as encore] - [taoensso.timbre :as timbre] - [cognitect.transit :as transit]) - #+clj - (:import [java.io ByteArrayInputStream ByteArrayOutputStream]) + ;; [clojure.tools.reader.edn :as edn] + [org.httpkit.server :as http-kit] + [taoensso.encore :as encore] + [taoensso.timbre :as timbre] + [taoensso.sente.interfaces :as interfaces]) #+cljs (:require [clojure.string :as str] [cljs.core.async :as async :refer (! put! chan)] - [cljs.reader :as edn] + ;; [cljs.reader :as edn] [taoensso.encore :as encore :refer (format)] - [cognitect.transit :as transit]) + [taoensso.sente.interfaces :as interfaces]) #+cljs (:require-macros [cljs.core.async.macros :as asyncm :refer (go go-loop)])) -;;;; -;;;; Transit support -;;;; - -#+clj -(defn write-transit [x] - (let [out (ByteArrayOutputStream. ) - w (transit/writer out :json) - _ (transit/write w x) - ret (.toString out)] - (.reset out) - ret)) - -#+cljs -(defn write-transit [x] - (let [w (transit/writer :json)] - (transit/write w x))) - -#+clj -(defn read-transit [x] - (let [in (ByteArrayInputStream. (.getBytes x)) - reader (transit/reader in :json)] - (transit/read reader))) - -#+cljs -(defn read-transit [x] - (let [reader (transit/reader :json)] - (transit/read reader x))) - -(defn write-data [data-format s] - (if (= data-format :transit) - (write-transit s) - (pr-str s))) +;;;; Logging -(defn read-data [data-format s] - (if (= data-format :transit) - (read-transit s) - (edn/read-string s))) +#+clj (refer 'taoensso.timbre :only '(tracef debugf infof warnf errorf)) +#+cljs (do (def tracef encore/tracef) + (def debugf encore/debugf) + (def infof encore/infof) + (def warnf encore/warnf) + (def errorf encore/errorf)) +(defn set-logging-level! [level] + #+clj (timbre/set-level! level) + #+cljs (reset! encore/logging-level level)) -;;;; Shared (client+server) +;; (set-logging-level! :trace) ; For debugging -(defn- chan? [x] - #+clj (instance? clojure.core.async.impl.channels.ManyToManyChannel x) - #+cljs (instance? cljs.core.async.impl.channels.ManyToManyChannel x)) +;;;; Events +;; * Clients & server both send `event`s and receive (i.e. route) `event-msg`s. -(defn- validate-event-form [x] +(defn- validate-event [x] (cond (not (vector? x)) :wrong-type (not (#{1 2} (count x))) :wrong-length @@ -141,10 +108,10 @@ (not (namespace ev-id)) :unnamespaced-id :else nil)))) -(defn event? "Valid [ev-id ?ev-data] form?" [x] (nil? (validate-event-form x))) +(defn event? "Valid [ev-id ?ev-data] form?" [x] (nil? (validate-event x))) (defn assert-event [x] - (when-let [?err (validate-event-form x)] + (when-let [?err (validate-event x)] (let [err-fmt (str (case ?err @@ -156,132 +123,151 @@ " Event should be of `[ev-id ?ev-data]` form: %s")] (throw (ex-info (format err-fmt (str x)) {:malformed-event x}))))) -(defn cb-success? [cb-reply] ;; Cb reply need _not_ be `event` form! - (not (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply))) - -#+clj ; For #+cljs we'd rather just throw client-side on bad edn from server -(defn- try-read-edn [data-format edn] - (try (read-data data-format edn) - (catch Throwable t [:chsk/bad-edn edn]))) - -(defn- unwrap-edn-msg-with-?cb->clj "edn -> [clj ?cb-uuid]" - [data-format edn] - (let [msg #+clj (try-read-edn data-format edn) - #+cljs (read-data data-format edn) - ?cb-uuid (and (map? msg) (:chsk/cb-uuid msg)) - clj (if-not ?cb-uuid msg (:chsk/clj msg))] - [clj ?cb-uuid])) +(defn- chan? [x] + #+clj (instance? clojure.core.async.impl.channels.ManyToManyChannel x) + #+cljs (instance? cljs.core.async.impl.channels.ManyToManyChannel x)) -;;;; Server +(defn event-msg? [x] + #+cljs + (and + (map? x) + (encore/keys= x #{:ch-recv :send-fn :state :event}) + (let [{:keys [ch-recv send-fn state event]} x] + (and + (chan? ch-recv) + (ifn? send-fn) + (encore/atom? state) + (event? event)))) -#+clj -(defn event-msg? - "Valid {:client-uuid _ :ring-req _ :event _ :?reply-fn _} form?" - [x] - (and (map? x) (= (count x) 4) - (every? #{:client-uuid :ring-req :event :?reply-fn} (keys x)) - (let [{:keys [client-uuid hk-ch ring-req event ?reply-fn]} x] - (and (string? client-uuid) ; Set by client (Ajax) or server (WebSockets) - (map? ring-req) - (event? event) - (or (nil? ?reply-fn) (ifn? ?reply-fn)))))) + #+clj + (and + (map? x) + (encore/keys= x + #{:ch-recv :push-fn :connected-uids + :client-uuid :ring-req :event :?reply-fn}) + (let [{:keys [ch-recv push-fn connected-uids + client-uuid ring-req event ?reply-fn]} x] + (and + (chan? ch-recv) + (ifn? push-fn) + (encore/atom? connected-uids) + ;; (string? client-uuid) + (encore/nblank-str? client-uuid) ; Set by client (ajax) or server (ws) + (map? ring-req) + (event? event) + (or (nil? ?reply-fn) (ifn? ?reply-fn)))))) #+clj -(defn- receive-event-msg! - [ch-recv {:as ev-msg :keys [client-uuid ring-req event ?reply-fn]}] - (let [ev-msg* - {:client-uuid client-uuid ; Browser-tab / device identifier - :ring-req ring-req - :event (if (event? event) event [:chsk/bad-event event]) +(defn- put-event-msg>ch-recv! + [ch-recv {:as ev-msg + :keys [ch-recv push-fn connected-uids + client-uuid hk-ch ring-req event ?reply-fn]}] + (let [valid-event (if (event? event) event [:chsk/bad-event event]) + ev-msg* + {:ch-recv ch-recv + :push-fn push-fn + :connected-uids connected-uids + ;; + :client-uuid client-uuid ; Browser-tab / device identifier + :ring-req ring-req + :event valid-event :?reply-fn (if (ifn? ?reply-fn) ?reply-fn - (-> (fn [resp-clj] ; Dummy warn fn - (timbre/warnf "Trying to reply to non-cb req: %s" event)) - ;; Useful to distinguish between a real cb reply fn and dummy: - (with-meta {:dummy-reply-fn? true})))}] + ^:dummy-reply-fn ; Useful for routers, etc. + (fn [resp-clj] + (warnf "Trying to reply to non-cb event: %s (with reply %s)" + valid-event resp-clj)))}] - (if (event-msg? ev-msg*) ; Be conservative about what we put to chan! - (put! ch-recv ev-msg*) - (timbre/warnf "Bad ev-msg!: %s (%s)" ev-msg* ev-msg)))) + (if-not (event-msg? ev-msg*) ; NB conservative!! + (warnf "Bad ev-msg!: %s" ev-msg) ; Log 'n drop + (put! ch-recv ev-msg*)))) -#+clj -(defn- send-buffered-evs>ws-clients! - "Actually pushes buffered events (edn) to all uid's WebSocket conns." - [conns_ uid buffered-evs-edn] - (doseq [hk-ch (get-in @conns_ [:ws uid])] - (http-kit/send! hk-ch buffered-evs-edn))) +#+cljs +(defn cb-success? "Note that cb reply need _not_ be `event` form!" + [cb-reply-clj] (not (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply-clj))) + +;;;; Packing +;; * Client<->server payloads are arbitrary Clojure vals (cb replies or events). +;; * Payloads are packed for client<->server transit. +;; * Packing includes ->str encoding, and may incl. wrapping to carry cb info. + +(defn- unpack* "pstr->clj" [packer pstr] + (try + (assert (string? pstr)) + (interfaces/unpack packer pstr) + (catch #+clj Throwable #+cljs :default t + (debugf "Bad package: %s (%s)" pstr t) + #+clj [:chsk/bad-package pstr] + #+cljs (throw t) ; Let client rethrow on bad pstr from server + ))) + +(defn- with-?meta [x ?m] (if (seq ?m) (with-meta x ?m) x)) +(defn- pack* "clj->prefixed-pstr" + ([packer ?packer-meta clj] + (str "-" ; => Unwrapped (no cb metadata) + (interfaces/pack packer (with-?meta clj ?packer-meta)))) + + ([packer ?packer-meta clj ?cb-uuid] + (let [;;; Keep wrapping as light as possible: + ?cb-uuid (if (= ?cb-uuid :ajax-cb) 0 ?cb-uuid) + wrapped-clj (if ?cb-uuid [clj ?cb-uuid] [clj])] + (str "+" ; => Wrapped (cb metadata) + (interfaces/pack packer (with-?meta wrapped-clj ?packer-meta)))))) + +(defn- pack [& args] + (let [pstr (apply pack* args)] + (tracef "Packing: %s -> %s" args pstr) + pstr)) + +(defn- unpack "prefixed-pstr->[clj ?cb-uuid]" + [packer prefixed-pstr] + (assert (string? prefixed-pstr)) + (let [prefix (encore/substr prefixed-pstr 0 1) + pstr (encore/substr prefixed-pstr 1) + clj (unpack* packer pstr) ; May be un/wrapped + wrapped? (case prefix "-" false "+" true) + [clj ?cb-uuid] (if wrapped? clj [clj nil]) + ?cb-uuid (if (= 0 ?cb-uuid) :ajax-cb ?cb-uuid)] + (tracef "Unpacking: %s -> %s" prefixed-pstr [clj ?cb-uuid]) + [clj ?cb-uuid])) -#+clj -(defn- send-buffered-evs>ajax-clients! - "Actually pushes buffered events (edn) to all uid's Ajax conns. Allows some - time for possible Ajax poller reconnects." - [conns_ uid buffered-evs-edn & [{:keys [nmax-attempts ms-base ms-rand] - ;; <= 7 attempts at ~135ms ea = 945ms - :or {nmax-attempts 7 - ms-base 90 - ms-rand 90}}]] - (comment (* 7 (+ 90 (/ 90 2.0)))) - (let [;; All connected/possibly-reconnecting client uuids: - client-uuids-unsatisfied (keys (get-in @conns_ [:ajax uid]))] - (when-not (empty? client-uuids-unsatisfied) - ;; (println "client-uuids-unsatisfied: " client-uuids-unsatisfied) - (go-loop [n 0 client-uuids-satisfied #{}] - (let [?pulled ; nil or { [ ]} - (encore/swap-in! conns_ [:ajax uid] - (fn [m] ; { [ ]} - (let [ks-to-pull (remove client-uuids-satisfied (keys m))] - ;; (println "ks-to-pull: " ks-to-pull) - (if (empty? ks-to-pull) - (encore/swapped m nil) - (encore/swapped - (reduce - (fn [m k] - (let [[?hk-ch udt-last-connected] (get m k)] - (assoc m k [nil udt-last-connected]))) - m ks-to-pull) - (select-keys m ks-to-pull))))))] - (assert (or (nil? ?pulled) (map? ?pulled))) - (let [?newly-satisfied - (when ?pulled - (reduce-kv - (fn [s client-uuid [?hk-ch _]] - (if (or (nil? ?hk-ch) - ;; hk-ch may have closed already (`send!` will noop): - (not (http-kit/send! ?hk-ch buffered-evs-edn))) - s - (conj s client-uuid))) #{} ?pulled)) - now-satisfied (into client-uuids-satisfied ?newly-satisfied)] - ;; (println "now-satisfied:" now-satisfied) - (when (and (< n nmax-attempts) - (some (complement now-satisfied) client-uuids-unsatisfied)) - ;; Allow some time for possible poller reconnects: - (ws-clients! + ^:private send-buffered-evs>ajax-clients!) #+clj (defn make-channel-socket! - "Returns `{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn - connected-uids]}`: - * ch-recv - core.async channel ; For server-side chsk request router, will - ; receive `event-msg`s from clients. - * send-fn - (fn [user-id ev]) ; For server>user push - * ajax-post-fn - (fn [ring-req]) ; For Ring CSRF-POST, chsk URL - * ajax-get-or-ws-handshake-fn - (fn [ring-req]) ; For Ring GET, chsk URL - * connected-uids ; Watchable, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}) + "Returns a map with keys: + :ch-recv ; core.async channel to receive `event-msg`s (internal or from clients). + :send-fn ; (fn [user-id ev] for server>user push. + :ajax-post-fn ; (fn [ring-req] for Ring CSRF-POST + chsk URL. + :ajax-get-or-ws-handshake-fn ; (fn [ring-req]) for Ring GET + chsk URL. + :connected-uids ; Watchable, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}). Common options: - * user-id-fn ; (fn [ring-req]) -> unique user-id for server>user push. - * csrf-token-fn ; (fn [ring-req]) -> CSRF token for Ajax POSTs. - * send-buf-ms-ajax ; [1] - * send-buf-ms-ws ; [1] - * data-format : :edn (default) or :transit (experimental) + :user-id-fn ; (fn [ring-req]) -> unique user-id for server>user push. + :csrf-token-fn ; (fn [ring-req]) -> CSRF token for Ajax POSTs. + :send-buf-ms-ajax ; [1] + :send-buf-ms-ws ; [1] + :packer ; :edn (default), or an IPacker implementation (experimental). [1] Optimization to allow transparent batching of rapidly-triggered server>user pushes. This is esp. important for Ajax clients which use a (slow) reconnecting poller. Actual event dispatch may occur <= given ms after send call (larger values => larger batch windows)." [& [{:keys [recv-buf-or-n send-buf-ms-ajax send-buf-ms-ws - user-id-fn csrf-token-fn data-format] + user-id-fn csrf-token-fn packer] :or {recv-buf-or-n (async/sliding-buffer 1000) send-buf-ms-ajax 100 send-buf-ms-ws 30 @@ -290,12 +276,13 @@ (or (get-in ring-req [:session :csrf-token]) (get-in ring-req [:session :ring.middleware.anti-forgery/anti-forgery-token]) (get-in ring-req [:session "__anti-forgery-token"]))) - data-format :edn}}]] + packer :edn}}]] {:pre [(encore/pos-int? send-buf-ms-ajax) (encore/pos-int? send-buf-ms-ws)]} - (let [ch-recv (chan recv-buf-or-n) + (let [packer (interfaces/coerce-packer packer) + ch-recv (chan recv-buf-or-n) conns_ (atom {:ws {} ; { <#{hk-chs}>} :ajax {} ; { { [ ]}} }) @@ -339,147 +326,170 @@ (when (and (contains? old-any uid) (not (contains? new-any uid))) :newly-disconnected))))))] - newly-disconnected?))] - - {:ch-recv ch-recv + newly-disconnected?)) + + send-fn ; server>user (by uid) push + (fn [user-id ev + ;; Extra arity currently undocumented: + & [{:as _opts :keys [flush-send-buffer?]}]] + (let [uid user-id + uid-name (str (or uid "nil")) + _ (tracef "Chsk send: (->uid %s) %s" uid-name ev) + _ (assert-event ev) + ev-uuid (encore/uuid-str) + + flush-buffer! + (fn [type] + (when-let + [pulled + (encore/swap-in! send-buffers_ [type] + (fn [m] + ;; Don't actually flush unless the event buffered + ;; with _this_ send call is still buffered (awaiting + ;; flush). This means that we'll have many (go + ;; block) buffer flush calls that'll noop. They're + ;; cheap, and this approach is preferable to + ;; alternatives like flush workers. + (let [[_ ev-uuids] (get m uid)] + (if (contains? ev-uuids ev-uuid) + (encore/swapped (dissoc m uid) + (get m uid)) + (encore/swapped m nil)))))] + (let [[buffered-evs ev-uuids] pulled] + (assert (vector? buffered-evs)) + (assert (set? ev-uuids)) + + (let [packer-metas (map meta buffered-evs) + combined-packer-meta (reduce merge {} packer-metas) + buffered-evs-ppstr (pack packer + combined-packer-meta + buffered-evs)] + (tracef "buffered-evs-ppstr: %s (with meta %s)" + buffered-evs-ppstr combined-packer-meta) + (case type + :ws (send-buffered-evs>ws-clients! conns_ + uid buffered-evs-ppstr) + :ajax (send-buffered-evs>ajax-clients! conns_ + uid buffered-evs-ppstr))))))] + + (if (= ev [:chsk/close]) ; Currently undocumented + (do + (debugf "Chsk closing (client may reconnect): %s" uid-name) + (when flush-send-buffer? + (doseq [type [:ws :ajax]] + (flush-buffer! type))) + + (doseq [hk-ch (get-in @conns_ [:ws uid])] (http-kit/close hk-ch)) + (doseq [hk-ch (->> (get-in @conns_ [:ajax uid]) + (vals) + (map first) + (remove nil?))] (http-kit/close hk-ch))) + + (do + ;; Buffer event + (doseq [type [:ws :ajax]] + (encore/swap-in! send-buffers_ [type uid] + (fn [old-v] + (if-not old-v [[ev] #{ev-uuid}] + (let [[buffered-evs ev-uuids] old-v] + [(conj buffered-evs ev) + (conj ev-uuids ev-uuid)]))))) + + ;;; Flush event buffers after relevant timeouts: + ;; * May actually flush earlier due to another timeout. + ;; * We send to _all_ of a uid's connections. + ;; * Broadcasting is possible but I'd suggest doing it rarely, and + ;; only to users we know/expect are actually online. + (go (when-not flush-send-buffer? (user (by uid) push - (fn [user-id ev & [{:as _opts :keys [flush-send-buffer?]}]] - (let [uid user-id - uid-name (str (or uid "nil")) - _ (timbre/tracef "Chsk send: (->uid %s) %s" uid-name ev) - _ (assert-event ev) - ev-uuid (encore/uuid-str) - - flush-buffer! - (fn [type] - (when-let [pulled - (encore/swap-in! send-buffers_ [type] - (fn [m] - ;; Don't actually flush unless the event buffered - ;; with _this_ send call is still buffered (awaiting - ;; flush). This means that we'll have many (go - ;; block) buffer flush calls that'll noop. They're - ;; cheap, and this approach is preferable to - ;; alternatives like flush workers. - (let [[_ ev-uuids] (get m uid)] - (if (contains? ev-uuids ev-uuid) - (encore/swapped (dissoc m uid) - (get m uid)) - (encore/swapped m nil)))))] - (let [[buffered-evs ev-uuids] pulled] - (assert (vector? buffered-evs)) - (assert (set? ev-uuids)) - - (let [buffered-evs-edn (write-data data-format buffered-evs)] - (case type - :ws (send-buffered-evs>ws-clients! conns_ - uid buffered-evs-edn) - :ajax (send-buffered-evs>ajax-clients! conns_ - uid buffered-evs-edn))))))] - - (if (= ev [:chsk/close]) - (do - (timbre/debugf "Chsk CLOSING: %s" uid-name) - - (when flush-send-buffer? - (doseq [type [:ws :ajax]] - (flush-buffer! type))) - - (doseq [hk-ch (get-in @conns_ [:ws uid])] (http-kit/close hk-ch)) - (doseq [hk-ch (->> (get-in @conns_ [:ajax uid]) - (vals) - (map first) - (remove nil?))] (http-kit/close hk-ch))) - - (do - ;; Buffer event - (doseq [type [:ws :ajax]] - (encore/swap-in! send-buffers_ [type uid] - (fn [old-v] - (if-not old-v [[ev] #{ev-uuid}] - (let [[buffered-evs ev-uuids] old-v] - [(conj buffered-evs ev) - (conj ev-uuids ev-uuid)]))))) - - ;;; Flush event buffers after relevant timeouts: - ;; * May actually flush earlier due to another timeout. - ;; * We send to _all_ of a uid's connections. - ;; * Broadcasting is possible but I'd suggest doing it rarely, and - ;; only to users we know/expect are actually online. - (go (when-not flush-send-buffer? (resp) (fn [ring-req] (http-kit/with-channel ring-req hk-ch - (let [msg (->> ring-req :params :edn (try-read-edn data-format)) - dummy-cb? (and (map? msg) (:chsk/dummy-cb? msg)) - clj (if-not dummy-cb? msg (:chsk/clj msg))] - - (receive-event-msg! ch-recv - {;; Currently unused for non-lp POSTs, but necessary for `event-msg?`: - :client-uuid "degenerate-ajax-post-fn-uuid" ; (encore/uuid-str) - :ring-req ring-req - :event clj - :?reply-fn - (when-not dummy-cb? - (fn reply-fn [resp-clj] ; Any clj form - (timbre/tracef "Chsk send (ajax reply): %s" resp-clj) - (let [resp-edn (write-data data-format resp-clj)] - ;; true iff apparent success: - (http-kit/send! hk-ch resp-edn))))}) - - (when dummy-cb? - (timbre/tracef "Chsk send (ajax reply): cb-dummy-200") - (http-kit/send! hk-ch (write-data data-format :chsk/cb-dummy-200)))))) + (let [ppstr (get-in ring-req [:params :ppstr]) + [clj has-cb?] (unpack packer ppstr)] + + (put-event-msg>ch-recv! ch-recv + (merge ev-msg-const + {;; Currently unused for non-lp POSTs, but necessary for `event-msg?`: + :client-uuid "dummy-ajax-post-fn-uuid" ; (encore/uuid-str) + :ring-req ring-req + :event clj + :?reply-fn + (when has-cb? + (fn reply-fn [resp-clj] ; Any clj form + (tracef "Chsk send (ajax reply): %s" resp-clj) + (let [resp-ppstr (pack packer (meta resp-clj) resp-clj)] + ;; true iff apparent success: + (http-kit/send! hk-ch resp-ppstr))))})) + + (when-not has-cb? + (tracef "Chsk send (ajax reply): dummy-cb-200") + (http-kit/send! hk-ch + (let [ppstr (pack packer nil :chsk/dummy-cb-200)] + ppstr)))))) :ajax-get-or-ws-handshake-fn ; Ajax handshake/poll, or WebSocket handshake (fn [ring-req] (http-kit/with-channel ring-req hk-ch (let [uid (user-id-fn ring-req) - uid-name (str (or uid "nil")) csrf-token (csrf-token-fn ring-req) - client-uuid ; Browser-tab / device identifier + uid-name (str (or uid "nil")) + client-uuid ; Browser-tab / device identifier (str uid "-" ; Security measure (can't be controlled by client) (or (get-in ring-req [:params :ajax-client-uuid]) - (encore/uuid-str))) + (encore/uuid-str 8) ; Reduced len (combined with uid) + )) - receive-event-msg!* ; Partial + receive-event-msg! ; Partial (fn [event & [?reply-fn]] - (receive-event-msg! ch-recv - {:client-uuid client-uuid ; Fixed (constant) with handshake - :ring-req ring-req ; '' - :event event - :?reply-fn ?reply-fn})) + (put-event-msg>ch-recv! ch-recv + (merge ev-msg-const + {:client-uuid client-uuid ; Fixed (constant) with handshake + :ring-req ring-req ; '' + :event event + :?reply-fn ?reply-fn}))) handshake! - (fn [hk-ch] (http-kit/send! hk-ch - (write-data data-format [:chsk/handshake [uid csrf-token]])))] + (fn [hk-ch] + (tracef "Handshake!") + (http-kit/send! hk-ch + (let [ppstr (pack packer nil [:chsk/handshake [uid csrf-token]])] + ppstr)))] (if (:websocket? ring-req) (do ; WebSocket handshake - (timbre/tracef "New WebSocket channel: %s %s" + (tracef "New WebSocket channel: %s (%s)" uid-name (str hk-ch)) ; _Must_ call `str` on ch (encore/swap-in! conns_ [:ws uid] (fn [s] (conj (or s #{}) hk-ch))) (when (connect-uid! :ws uid) - (receive-event-msg!* [:chsk/uidport-open])) + (receive-event-msg! [:chsk/uidport-open])) (http-kit/on-receive hk-ch - (fn [req-edn] - (let [[clj ?cb-uuid] (unwrap-edn-msg-with-?cb->clj data-format req-edn)] - (receive-event-msg!* clj + (fn [req-ppstr] + (let [[clj ?cb-uuid] (unpack packer req-ppstr)] + (receive-event-msg! clj ; Should be ev (when ?cb-uuid (fn reply-fn [resp-clj] ; Any clj form - (timbre/tracef "Chsk send (ws reply): %s" resp-clj) - (let [resp-edn (write-data data-format {:chsk/clj resp-clj - :chsk/cb-uuid ?cb-uuid})] + (tracef "Chsk send (ws reply): %s" resp-clj) + (let [resp-ppstr (pack packer (meta resp-clj) + resp-clj ?cb-uuid)] ;; true iff apparent success: - (http-kit/send! hk-ch resp-edn)))))))) + (http-kit/send! hk-ch resp-ppstr)))))))) ;; We rely on `on-close` to trigger for _every_ conn: (http-kit/on-close hk-ch @@ -492,7 +502,7 @@ (assoc m uid new))))) ;; (when (upd-connected-uid! uid) - ;; (receive-event-msg!* [:chsk/uidport-close])) + ;; (receive-event-msg! [:chsk/uidport-close])) (go ;; Allow some time for possible reconnects (sole window @@ -505,7 +515,7 @@ ;; WS semantics: 'still disconnected after disconnect+5s'. ;; (when (upd-connected-uid! uid) - (receive-event-msg!* [:chsk/uidport-close]))))) + (receive-event-msg! [:chsk/uidport-close]))))) (handshake! hk-ch)) @@ -518,7 +528,7 @@ (nil? v))))] (when (connect-uid! :ajax uid) - (receive-event-msg!* [:chsk/uidport-open])) + (receive-event-msg! [:chsk/uidport-open])) ;; We rely on `on-close` to trigger for _every_ conn: (http-kit/on-close hk-ch @@ -549,13 +559,76 @@ :disconnected))))))] (when disconnected? (when (upd-connected-uid! uid) - (receive-event-msg!* [:chsk/uidport-close])))))))) + (receive-event-msg! [:chsk/uidport-close])))))))) (when handshake? (handshake! hk-ch) ; Client will immediately repoll ))))))})) -;;;; Client +#+clj +(defn- send-buffered-evs>ws-clients! + "Actually pushes buffered events (as packed-str) to all uid's WebSocket conns." + [conns_ uid buffered-evs-pstr] + (tracef "send-buffered-evs>ws-clients!: %s" buffered-evs-pstr) + (doseq [hk-ch (get-in @conns_ [:ws uid])] + (http-kit/send! hk-ch buffered-evs-pstr))) + +#+clj +(defn- send-buffered-evs>ajax-clients! + "Actually pushes buffered events (as packed-str) to all uid's Ajax conns. + Allows some time for possible Ajax poller reconnects." + [conns_ uid buffered-evs-pstr & [{:keys [nmax-attempts ms-base ms-rand] + ;; <= 7 attempts at ~135ms ea = 945ms + :or {nmax-attempts 7 + ms-base 90 + ms-rand 90}}]] + (comment (* 7 (+ 90 (/ 90 2.0)))) + (let [;; All connected/possibly-reconnecting client uuids: + client-uuids-unsatisfied (keys (get-in @conns_ [:ajax uid]))] + (when-not (empty? client-uuids-unsatisfied) + ;; (tracef "client-uuids-unsatisfied: %s" client-uuids-unsatisfied) + (go-loop [n 0 client-uuids-satisfied #{}] + (let [?pulled ; nil or { [ ]} + (encore/swap-in! conns_ [:ajax uid] + (fn [m] ; { [ ]} + (let [ks-to-pull (remove client-uuids-satisfied (keys m))] + ;; (tracef "ks-to-pull: %s" ks-to-pull) + (if (empty? ks-to-pull) + (encore/swapped m nil) + (encore/swapped + (reduce + (fn [m k] + (let [[?hk-ch udt-last-connected] (get m k)] + (assoc m k [nil udt-last-connected]))) + m ks-to-pull) + (select-keys m ks-to-pull))))))] + (assert (or (nil? ?pulled) (map? ?pulled))) + (let [?newly-satisfied + (when ?pulled + (reduce-kv + (fn [s client-uuid [?hk-ch _]] + (if (or (nil? ?hk-ch) + ;; hk-ch may have closed already (`send!` will noop): + (not (http-kit/send! ?hk-ch buffered-evs-pstr))) + s + (conj s client-uuid))) #{} ?pulled)) + now-satisfied (into client-uuids-satisfied ?newly-satisfied)] + ;; (tracef "now-satisfied: %s" now-satisfied) + (when (and (< n nmax-attempts) + (some (complement now-satisfied) client-uuids-unsatisfied)) + ;; Allow some time for possible poller reconnects: + (edn-msg-with-?cb "clj -> [edn ?cb-uuid]" - [cbs-waiting_ clj ?timeout-ms ?cb-fn data-format] - (let [?cb-uuid (when ?cb-fn (encore/uuid-str)) - msg (if-not ?cb-uuid clj {:chsk/clj clj :chsk/cb-uuid ?cb-uuid}) - ;; Note that if pr-str throws, it'll throw before swap!ing cbs-waiting: - edn (write-data data-format msg)] - (when ?cb-uuid - (swap! cbs-waiting_ - (fn [[_ m]] [nil (assoc m ?cb-uuid ?cb-fn)])) - (when ?timeout-ms - (go (chsk-state! [{:keys [chs state_] :as chsk} merge-state] (let [[old-state new-state] @@ -610,12 +656,16 @@ (let [new-state (merge old-state merge-state)] (encore/swapped new-state [old-state new-state]))))] (when (not= old-state new-state) - ;; (encore/debugf "Chsk state change: %s" new-state) + ;; (debugf "Chsk state change: %s" new-state) (put! (:state chs) new-state) new-state))) -#+cljs ;; Experimental, undocumented: -(defn- wrap-cb-chan-as-fn [?cb ev] +#+cljs +(defn- cb-chan-as-fn + "Experimental, undocumented. Allows a core.async channel to be provided + instead of a cb-fn. The channel will receive values of form + [.cb ]." + [?cb ev] (if (or (nil? ?cb) (ifn? ?cb)) ?cb (do (assert (chan? ?cb)) (assert-event ev) @@ -626,8 +676,9 @@ reply])))))) #+cljs -(defn- receive-buffered-evs! - [ch-recv clj] {:pre [(vector? clj)]} +(defn- receive-buffered-evs! [ch-recv clj] + (tracef "receive-buffered-evs!: %s" clj) + (assert (vector? clj)) (let [buffered-evs clj] (doseq [ev buffered-evs] (assert-event ev) @@ -635,11 +686,12 @@ #+cljs (defn- handle-when-handshake! [chsk clj] + (tracef "handle-when-handshake!: %s" clj) (when (and (vector? clj) ; Nb clj may be callback reply (= (first clj) :chsk/handshake)) (let [[_ [uid csrf-token]] clj] (when (str/blank? csrf-token) - (encore/warnf "Sente warning: NO CSRF TOKEN AVAILABLE")) + (warnf "Sente warning: NO CSRF TOKEN AVAILABLE")) (merge>chsk-state! chsk {:open? true :uid uid @@ -647,118 +699,132 @@ :handled))) #+cljs ;; Handles reconnects, keep-alives, callbacks: -(defrecord ChWebSocket [url chs socket_ kalive-ms kalive-timer_ kalive-due?_ - nattempt_ - cbs-waiting_ ; [dissoc'd-fn { ...}] - state_ ; {:type _ :open? _ :uid _ :csrf-token _} - data-format - ] +(defrecord ChWebSocket + [url chs socket_ kalive-ms kalive-timer_ kalive-due?_ nattempt_ + cbs-waiting_ ; [dissoc'd-fn { ...}] + state_ ; {:type _ :open? _ :uid _ :csrf-token _ :destroyed? _} + packer ; IPacker + ] + IChSocket (chsk-send! [chsk ev] (chsk-send! chsk ev nil nil)) (chsk-send! [chsk ev ?timeout-ms ?cb] - ;; (encore/debugf "Chsk send: (%s) %s" (if ?cb "cb" "no cb") ev) + ;; (debugf "Chsk send: (%s) %s" (if ?cb "cb" "no cb") ev) (assert-send-args ev ?timeout-ms ?cb) - (let [?cb-fn (wrap-cb-chan-as-fn ?cb ev)] + (let [?cb-fn (cb-chan-as-fn ?cb ev)] (if-not (:open? @state_) ; Definitely closed - (do (encore/warnf "Chsk send against closed chsk.") + (do (warnf "Chsk send against closed chsk.") (when ?cb-fn (?cb-fn :chsk/closed))) - (let [[edn ?cb-uuid] (wrap-clj->edn-msg-with-?cb - cbs-waiting_ ev ?timeout-ms ?cb-fn data-format)] + (let [?cb-uuid (when ?cb-fn + (encore/uuid-str 6)) ; Mini uuid (short-lived, per client) + ppstr (pack packer (meta ev) ev ?cb-uuid)] + + (when ?cb-uuid + (swap! cbs-waiting_ + (fn [[_ m]] [nil (assoc m ?cb-uuid ?cb-fn)])) + (when ?timeout-ms + (go (chsk-state! chsk {:destroyed? true :open? false}) + (chsk-reconnect! chsk)) - (chsk-make! [chsk] + (chsk-init! [chsk] (when-let [WebSocket (or (aget js/window "WebSocket") (aget js/window "MozWebSocket"))] ((fn connect! [] - (let [retry! - (fn [] - (let [nattempt* (swap! nattempt_ inc)] - (.clearInterval js/window @kalive-timer_) - (encore/warnf "Chsk is closed: will try reconnect (%s)." - nattempt*) - (encore/set-exp-backoff-timeout! connect! nattempt*)))] - - (if-let [socket (try (WebSocket. url) - (catch js/Error e - (encore/errorf "WebSocket js/Error: %s" e) - false))] - (->> - (doto socket - (aset "onerror" - (fn [ws-ev] (encore/errorf "WebSocket error: %s" ws-ev))) - (aset "onmessage" ; Nb receives both push & cb evs! - (fn [ws-ev] - (let [edn (aget ws-ev "data") - ;; Nb may or may NOT satisfy `event?` since we also - ;; receive cb replies here!: - [clj ?cb-uuid] (unwrap-edn-msg-with-?cb->clj data-format edn)] - ;; (assert-event clj) ;; NO! - (or - (and (handle-when-handshake! chsk clj) - (reset! nattempt_ 0)) - (if ?cb-uuid - (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ ?cb-uuid)] - (cb-fn clj) - (encore/warnf "Cb reply w/o local cb-fn: %s" clj)) - (let [buffered-evs clj] - (receive-buffered-evs! (:recv chs) buffered-evs))))))) - (aset "onopen" - (fn [_ws-ev] - (reset! kalive-timer_ - (.setInterval js/window - (fn [] - (when @kalive-due?_ ; Don't ping unnecessarily - (chsk-send! chsk [:chsk/ws-ping])) - (reset! kalive-due?_ true)) - kalive-ms)) - ;; (merge>chsk-state! chsk - ;; {:open? true}) ; NO, handshake better! - )) - (aset "onclose" ; Fires repeatedly when server is down - (fn [_ws-ev] (merge>chsk-state! chsk {:open? false}) - (retry!)))) - - (reset! socket_)) - - ;; Couldn't even get a socket: - (retry!))))) + (when-not (:destroyed? @state_) + (let [retry! + (fn [] + (let [nattempt* (swap! nattempt_ inc)] + (.clearInterval js/window @kalive-timer_) + (warnf "Chsk is closed: will try reconnect (%s)." nattempt*) + (encore/set-exp-backoff-timeout! connect! nattempt*)))] + + (if-let [socket (try (WebSocket. url) + (catch js/Error e + (errorf "WebSocket js/Error: %s" e) + nil))] + (reset! socket_ + (doto socket + (aset "onerror" (fn [ws-ev] (errorf "WebSocket error: %s" ws-ev))) + (aset "onmessage" ; Nb receives both push & cb evs! + (fn [ws-ev] + (let [;; Nb may or may NOT satisfy `event?` since we also + ;; receive cb replies here! This is actually why + ;; we prefix our pstrs to indicate whether they're + ;; wrapped or not. + ppstr (aget ws-ev "data") + [clj ?cb-uuid] (unpack packer ppstr)] + ;; (assert-event clj) ;; NO! + (or + (and (handle-when-handshake! chsk clj) + (reset! nattempt_ 0)) + (if ?cb-uuid + (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ + ?cb-uuid)] + (cb-fn clj) + (warnf "Cb reply w/o local cb-fn: %s" clj)) + (let [buffered-evs clj] + (receive-buffered-evs! (:chsk-state! chsk {:open? true}) + )) + + (aset "onclose" ; Fires repeatedly when server is down + (fn [_ws-ev] (merge>chsk-state! chsk {:open? false}) + (retry!))))) + + ;; Couldn't even get a socket: + (retry!)))))) chsk))) #+cljs -(defrecord ChAjaxSocket [url chs timeout ajax-client-uuid curr-xhr_ state_ data-format] +(defrecord ChAjaxSocket [url chs timeout ajax-client-uuid curr-xhr_ state_ packer] IChSocket (chsk-send! [chsk ev] (chsk-send! chsk ev nil nil)) (chsk-send! [chsk ev ?timeout-ms ?cb] - ;; (encore/debugf "Chsk send: (%s) %s" (if ?cb "cb" "no cb") ev) + ;; (debugf "Chsk send: (%s) %s" (if ?cb "cb" "no cb") ev) (assert-send-args ev ?timeout-ms ?cb) - (let [?cb-fn (wrap-cb-chan-as-fn ?cb ev)] + (let [?cb-fn (cb-chan-as-fn ?cb ev)] (if-not (:open? @state_) ; Definitely closed - (do (encore/warnf "Chsk send against closed chsk.") + (do (warnf "Chsk send against closed chsk.") (when ?cb-fn (?cb-fn :chsk/closed))) (do (encore/ajax-lite url {:method :post :timeout ?timeout-ms - :resp-type :text ; Prefer to do our own edn reading + :resp-type :text ; We'll do our own pstr decoding :params - (let [dummy-cb? (not ?cb-fn) - msg (if-not dummy-cb? ev {:chsk/clj ev - :chsk/dummy-cb? true}) - edn (write-data data-format msg)] - {:_ (encore/now-udt) ; Force uncached resp - :edn edn :csrf-token (:csrf-token @state_)})} + (let [ppstr (pack packer (meta ev) ev (when ?cb-fn :ajax-cb))] + {:_ (encore/now-udt) ; Force uncached resp + :ppstr ppstr + :csrf-token (:csrf-token @state_)})} (fn ajax-cb [{:keys [content error]}] (if error @@ -767,66 +833,69 @@ (do (merge>chsk-state! chsk {:open? false}) (when ?cb-fn (?cb-fn :chsk/error)))) - (let [resp-edn content - resp-clj (read-data data-format resp-edn)] + (let [resp-ppstr content + [resp-clj _] (unpack packer resp-ppstr)] (if ?cb-fn (?cb-fn resp-clj) - (when (not= resp-clj :chsk/cb-dummy-200) - (encore/warnf "Cb reply w/o local cb-fn: %s" resp-clj))) + (when (not= resp-clj :chsk/dummy-cb-200) + (warnf "Cb reply w/o local cb-fn: %s" resp-clj))) (merge>chsk-state! chsk {:open? true}))))) :apparent-success)))) - ;; Will auto-recover and handshake _iff_ uid has changed since last handshake: - (chsk-reconnect! [chsk] (when-let [x @curr-xhr_] (.abort x))) + (chsk-reconnect! [chsk] (when-let [x @curr-xhr_] (.abort x))) + (chsk-destroy! [chsk] + (merge>chsk-state! chsk {:destroyed? true :open? false}) + (chsk-reconnect! chsk)) - (chsk-make! [chsk] + (chsk-init! [chsk] ((fn async-poll-for-update! [nattempt] - (let [retry! - (fn [] - (let [nattempt* (inc nattempt)] - (encore/warnf - "Chsk is closed: will try reconnect (%s)." - nattempt*) - (encore/set-exp-backoff-timeout! - (partial async-poll-for-update! nattempt*) - nattempt*))) - - ajax-req! ; Just for Pace wrapping below - (fn [] - (reset! curr-xhr_ - (encore/ajax-lite url - {:method :get :timeout timeout - :resp-type :text ; Prefer to do our own edn reading - :params {:_ (encore/now-udt) ; Force uncached resp - :ajax-client-uuid ajax-client-uuid}} - (fn ajax-cb [{:keys [content error]}] - (if error - (if (or (= error :timeout) - (= error :abort) ; Abort => intentional, not an error - ;; It's particularly important that reconnect - ;; aborts don't mark a chsk as closed since - ;; we've no guarantee that a new handshake will - ;; take place to remark as open (e.g. if uid - ;; hasn't changed since last handshake). - ) - (async-poll-for-update! 0) - (do (merge>chsk-state! chsk {:open? false}) - (retry!))) - - ;; The Ajax long-poller is used only for events, never cbs: - (let [edn content - clj (read-data data-format edn)] - (or - (handle-when-handshake! chsk clj) - (let [buffered-evs clj] - (receive-buffered-evs! (:recv chs) buffered-evs) - (merge>chsk-state! chsk {:open? true}))) - (async-poll-for-update! 0)))))))] - - (if-let [pace (aget js/window "Pace")] - ;; Assumes relevant extern is defined for :advanced mode compilation: - (.ignore pace ajax-req!) ; Pace.js shouldn't trigger for long-polling - (ajax-req!)))) + (tracef "async-poll-for-update!") + (when-not (:destroyed? @state_) + (let [retry! + (fn [] + (let [nattempt* (inc nattempt)] + (warnf "Chsk is closed: will try reconnect (%s)." nattempt*) + (encore/set-exp-backoff-timeout! + (partial async-poll-for-update! nattempt*) + nattempt*))) + + ajax-req! ; Just for Pace wrapping below + (fn [] + (reset! curr-xhr_ + (encore/ajax-lite url + {:method :get :timeout timeout + :resp-type :text ; Prefer to do our own pstr reading + :params {:_ (encore/now-udt) ; Force uncached resp + :ajax-client-uuid ajax-client-uuid}} + (fn ajax-cb [{:keys [content error]}] + (if error + (if (or (= error :timeout) + (= error :abort) ; Abort => intentional, not err + ;; It's particularly important that reconnect + ;; aborts don't mark a chsk as closed since + ;; we've no guarantee that a new handshake will + ;; take place to remark as open (e.g. if uid + ;; hasn't changed since last handshake). + ) + (async-poll-for-update! 0) + (do (merge>chsk-state! chsk {:open? false}) + (retry!))) + + ;; The Ajax long-poller is used only for events, never cbs: + (let [ppstr content + [clj _] (unpack packer ppstr)] + (or + (handle-when-handshake! chsk clj) + (let [buffered-evs clj] + (receive-buffered-evs! (:chsk-state! chsk {:open? true}))) + (async-poll-for-update! 0)))))))] + + ;; TODO Make this pluggable + (if-let [pace (aget js/window "Pace")] + ;; Assumes relevant extern is defined for :advanced mode compilation: + (.ignore pace ajax-req!) ; Pace.js shouldn't trigger for long-polling + (ajax-req!))))) 0) chsk)) @@ -855,117 +924,144 @@ #+cljs (defn make-channel-socket! - "Returns `{:keys [chsk ch-recv send-fn state]}` for new ChWebSocket/ChAjaxSocket: - * chsk - The IChSocket implementer. You can usually ignore this. - * ch-recv - core.async channel that'll receive async (notably server>user) - events. - * send-fn - API fn to send client>server[1]. - * state - Watchable, read-only (atom {:type _ :open? _ :uid _ :csrf-token _}). + "Returns a map with keys: + :ch-recv ; core.async channel to receive `event-msg`s (internal or from clients). + :send-fn ; (fn [event & [?timeout-ms ?cb-fn]]) for client>server send. + :state ; Watchable, read-only (atom {:type _ :open? _ :uid _ :csrf-token _}). + :chsk ; IChSocket implementer. You can usu. ignore this. Common options: - * type ; e/o #{:auto :ws :ajax}. You'll usually want the default (:auto). - * ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity w/in - ; given number of milliseconds. - * lp-kalive-ms ; Ping to keep a long-polling (Ajax) conn alive ''. - * chsk-url-fn ; Please see `default-chsk-url-fn` for details. - * data-format : :edn (default) or :transit (experimental)" + :type ; e/o #{:auto :ws :ajax}. You'll usually want the default (:auto). + :ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity w/in given + ; number of milliseconds. + :lp-kalive-ms ; Ping to keep a long-polling (Ajax) conn alive ''. + :chsk-url-fn ; Please see `default-chsk-url-fn` for details. + :packer ; :edn (default), or an IPacker implementation (experimental)." [path & - & [{:keys [type recv-buf-or-n ws-kalive-ms lp-timeout chsk-url-fn data-format] + & [{:keys [type recv-buf-or-n ws-kalive-ms lp-timeout chsk-url-fn packer] :or {type :auto recv-buf-or-n (async/sliding-buffer 2048) ; Mostly for buffered-evs ws-kalive-ms 25000 ; < Heroku 30s conn timeout lp-timeout 25000 ; '' chsk-url-fn default-chsk-url-fn - data-format :edn}} + packer :edn}} _deprecated-more-opts]] {:pre [(#{:ajax :ws :auto} type)]} (when (not (nil? _deprecated-more-opts)) - (encore/warnf - "`make-channel-socket!` fn signature CHANGED with Sente v0.10.0.")) - - (let [;; Want _separate_ buffers for state+recv even if we're later merging - chs {:state (chan (async/sliding-buffer 1)) - :recv (chan recv-buf-or-n) - :internal (chan recv-buf-or-n)} + (warnf "`make-channel-socket!` fn signature CHANGED with Sente v0.10.0.")) + (let [packer (interfaces/coerce-packer packer) window-location (encore/get-window-location) + private-chs {:state (chan (async/sliding-buffer 1)) + :internal (chan (async/sliding-buffer 10)) + :ChWebSocket {:url (chsk-url-fn path window-location :ws) - :chs chs - :data-format data-format + :chs private-chs + :packer packer :socket_ (atom nil) :kalive-ms ws-kalive-ms :kalive-timer_ (atom nil) :kalive-due?_ (atom true) :nattempt_ (atom 0) :cbs-waiting_ (atom [nil {}]) - :state_ (atom {:type :ws :open? false})}))) + :state_ (atom {:type :ws :open? false + :destroyed? false})}))) (and (not= type :ws) (let [;; Unchanging over multiple long-poll (re)connects: ajax-client-uuid (encore/uuid-str)] - (chsk-make! + (chsk-init! (map->ChAjaxSocket {:url (chsk-url-fn path window-location (not :ws)) - :chs chs - :data-format data-format + :chs private-chs + :packer packer :timeout lp-timeout :ajax-client-uuid ajax-client-uuid :curr-xhr_ (atom nil) - :state_ (atom {:type :ajax :open? false})}))))) + :state_ (atom {:type :ajax :open? false + :destroyed? false})}))))) - ever-opened?_ (atom false) - state* (fn [state] - (if (or (not (:open? state)) @ever-opened?_) - state - (do (reset! ever-opened?_ true) - (assoc state :first-open? true))))] + send-fn (partial chsk-send! chsk) + + public-ch-recv + (async/map< + (fn ev->ev-msg [ev] + {:ch-recv public-ch-recv + :send-fn send-fn + :state (:state_ chsk) + :event ev}) + public-ch-recv)] (when chsk {:chsk chsk - :send-fn (partial chsk-send! chsk) - :state (:state_ chsk) - :ch-recv - (async/merge - [(->> (:internal chs) (async/map< (fn [ev] {:pre [(event? ev)]} ev))) - (->> (:state chs) (async/map< (fn [state] [:chsk/state (state* state)]))) - (->> (:recv chs) (async/map< (fn [ev] [:chsk/recv ev])))])}))) - -;;;; Routers - -#+clj -(defn start-chsk-router-loop! [event-msg-handler ch] - (let [ctrl-ch (chan)] + :ch-recv public-ch-recv + :send-fn send-fn + :state (:state_ chsk)}))) + +;;;; Router wrapper + +(defn start-chsk-router! + "Creates a go-loop to call `(event-msg-handler )` and returns a + `(fn stop! [])`. Advanced users may choose to instead write their own loop + against `ch-recv`." + [ch-recv event-msg-handler] + (let [ch-ctrl (chan)] (go-loop [] - (when-not ; nil or ::stop - (try - (let [[v p] (async/alts! [ch ctrl-ch])] - (if (identical? p ctrl-ch) ::stop + (when-not + (try ; Returns nil or ::stop + (let [[v p] (async/alts! [ch-recv ch-ctrl])] + (if (identical? p ch-ctrl) ::stop (let [event-msg v] (try - (timbre/tracef "Event-msg: %s" event-msg) - (do (event-msg-handler event-msg ch) nil) - (catch Throwable t - (timbre/errorf t "Chsk-router-loop handling error: %s" event-msg)))))) - (catch Throwable t - (timbre/errorf t "Chsk-router-loop channel error!"))) + (tracef "Pre-handler event-msg: %s" event-msg) + (assert (event-msg? v)) + (event-msg-handler event-msg) + nil + (catch #+clj Throwable #+cljs :default t + (errorf #+clj t + "Chsk router handling error: %s" event-msg)))))) + (catch #+clj Throwable #+cljs :default t + (errorf #+clj t + "Chsk router channel error!"))) (recur))) - (fn stop! [] (async/close! ctrl-ch)))) + (fn stop! [] (async/close! ch-ctrl)))) + +;;;; Deprecated + +#+clj +(defn start-chsk-router-loop! + "DEPRECATED: Please use `start-chsk-router!` instead." + [event-msg-handler ch-recv] + (start-chsk-router! ch-recv + ;; Old handler form: (fn [ev-msg ch-recv]) + (fn [ev-msg] (event-msg-handler ev-msg (:ch-recv ev-msg))))) #+cljs -(defn start-chsk-router-loop! [event-handler ch] - (let [ctrl-ch (chan)] - (go-loop [] - (let [[v p] (async/alts! [ch ctrl-ch])] - (if (identical? p ctrl-ch) ::stop - (let [[id data :as event] v] - ;; Provide ch to handler to allow event injection back into loop: - (event-handler event ch) ; Allow errors to throw - (recur))))) - (fn stop! [] (async/close! ctrl-ch)))) +(defn start-chsk-router-loop! + "DEPRECATED: Please use `start-chsk-router!` instead." + [event-handler ch-recv] + (start-chsk-router! ch-recv + ;; Old handler form: (fn [ev ch-recv]) + (fn [ev-msg] (event-handler (:event ev-msg) (:ch-recv ev-msg))))) diff --git a/src/taoensso/sente/interfaces.cljx b/src/taoensso/sente/interfaces.cljx new file mode 100644 index 0000000..e4538d7 --- /dev/null +++ b/src/taoensso/sente/interfaces.cljx @@ -0,0 +1,26 @@ +(ns taoensso.sente.interfaces + "Experimental (pre-alpha): subject to change. + Public interfaces / extension points." + #+clj (:require [clojure.tools.reader.edn :as edn]) + #+cljs (:require [cljs.reader :as edn])) + +;;;; Servers + +#+clj (defprotocol IAsyncHTTPServer "TODO: Extension pt. for HTTP servers.") + +;;;; Packers + +(defprotocol IPacker + "Extension pt. for client<->server comms data un/packers: + arbitrary Clojure data <-> serialized strings." + (pack [_ x]) + (unpack [_ x])) + +(deftype EdnPacker [] + IPacker + (pack [_ x] (pr-str x)) + (unpack [_ s] (edn/read-string s))) + +(def edn-packer "Default Edn packer." (->EdnPacker)) +(defn coerce-packer [x] (if (= x :edn) edn-packer + (do (assert (satisfies? IPacker x)) x))) diff --git a/src/taoensso/sente/packers/transit.cljx b/src/taoensso/sente/packers/transit.cljx new file mode 100644 index 0000000..b012485 --- /dev/null +++ b/src/taoensso/sente/packers/transit.cljx @@ -0,0 +1,161 @@ +(ns taoensso.sente.packers.transit + "Experimental (pre-alpha): subject to change. + Optional Transit-format[1] IPacker implementation for use with Sente. + [1] https://github.com/cognitect/transit-format." + {:author "Peter Taoussanis, @ckarlsen84"} + + #+clj + (:require [clojure.string :as str] + [clojure.tools.reader.edn :as edn] + [taoensso.encore :as encore] + [taoensso.timbre :as timbre] + [cognitect.transit :as transit] + [taoensso.sente.interfaces :as interfaces :refer (pack unpack)]) + + #+clj + (:import [java.io ByteArrayInputStream ByteArrayOutputStream]) + + #+cljs + (:require [clojure.string :as str] + [cljs.reader :as edn] + [taoensso.encore :as encore :refer (format)] + [cognitect.transit :as transit] + [taoensso.sente.interfaces :as interfaces :refer (pack unpack)])) + +(defn- get-charset [transit-fmt] + ;; :msgpack appears to need ISO-8859-1 to retain binary data correctly when + ;; string-encoded, all other (non-binary) formats can get UTF-8: + (if (= transit-fmt :msgpack) "ISO-8859-1" "UTF-8")) + +(deftype TransitPacker [transit-fmt] + taoensso.sente.interfaces/IPacker + (pack [_ x] + #+cljs (transit/write (transit/writer transit-fmt) x) + #+clj (let [charset (get-charset transit-fmt) + ^ByteArrayOutputStream baos (ByteArrayOutputStream. 512)] + (transit/write (transit/writer baos transit-fmt) x) + (.toString baos ^String charset))) + + (unpack [_ s] + #+cljs (transit/read (transit/reader transit-fmt) s) + #+clj (let [charset (get-charset transit-fmt) + ba (.getBytes ^String s ^String charset) + ^ByteArrayInputStream bais (ByteArrayInputStream. ba)] + (transit/read (transit/reader bais transit-fmt))))) + +(def ^:private edn-packer interfaces/edn-packer) ; Alias +(def ^:private json-packer (->TransitPacker :json)) +(def ^:private msgpack-packer (->TransitPacker :msgpack)) + +;;;; FlexiPacker + +(defn- max-flexi-format? [fmt] (= fmt :msgpack)) +(def ^:private max-flexi-format + (let [ordered-formats [nil :edn :json :msgpack] + scored-formats (zipmap ordered-formats (next (range)))] + (fn [xs] (apply max-key scored-formats xs)))) + +(comment (max-flexi-format [:msgpack :json])) + +(defn- auto-flexi-format [x] + (cond + (string? x) ; Large strings are common for HTML, etc. + (let [c (count x)] + (cond (> c 500) :msgpack + (> c 300) :json)) + + (and (sequential? x) (counted? x)) + (let [c (count x)] + (cond (> c 50) :msgpack + (> c 20) :json + ;; TODO Try heuristically? (check random sample, etc.) + )))) + +(comment (auto-flexi-format (take 100 (range)))) + +(deftype FlexiPacker [default-fmt] + taoensso.sente.interfaces/IPacker + (pack [_ x] + (let [?meta-format (when-let [m (meta x)] + (max-flexi-format (filter m (keys m)))) + ?auto-format (when-not ?meta-format (auto-flexi-format x)) + ;; ?auto-format (when-not (max-flexi-format? ?meta-format) + ;; (auto-flexi-format x)) + fmt (max-flexi-format [?auto-format ?meta-format default-fmt])] + (case fmt + :msgpack (str "m" (pack msgpack-packer x)) + :json (str "j" (pack json-packer x)) + :edn (str "e" (pack edn-packer x))))) + + (unpack [_ s] + (let [prefix (encore/substr s 0 1) + s* (encore/substr s 1)] + (case prefix + "m" (unpack msgpack-packer s*) + "j" (unpack json-packer s*) + "e" (unpack edn-packer s*) + (throw (ex-info (str "Malformed FlexiPacker data: " s) + {:s s})))))) + +(defn get-flexi-packer + "Experimental (pre-alpha): subject to change. + Returns an IPacker implementation that un/packs data with a variable format + determined by the data's size, metadata, or the provided `default-fmt` when no + metadata is present. + + (def fpack (partial pack (get-flexi-packer :edn))) + (fpack ^:edn {:a :A :b :B}) => \"e{:a :A, :b :B}\" + (fpack ^:json {:a :A :b :B}) => \"j[\"^ \",\"~:a\",\"~:A\",\"~:b\",\"~:B\"]\" + (fpack ^:msgpack {:a :A :b :B} => \"m\202£~:a£~:A£~:b£~:B\"" + + [& [?default-fmt]] + (let [default-fmt (or ?default-fmt :edn)] + (assert (#{:edn ; Not a transit format + ;; Transit formats: + :json :json-verbose :msgpack} default-fmt)) + (->FlexiPacker default-fmt))) + +(def default-flexi-packer (get-flexi-packer :edn)) + +(comment + (let [fpacker (get-flexi-packer)] + (def fpack (partial pack fpacker)) + (def funpack (partial unpack fpacker))) + (count (fpack ^:edn {:a :A :b :B})) + (count (fpack ^:json {:a :A :b :B})) + (count (fpack ^:msgpack {:a :A :b :B})) + (funpack (fpack ^:msgpack {:a :A :b :B :utf8 "ಬಾ ಇಲ್ಲಿ ಸಂಭವಿಸ"}))) + +(comment ; Packer benchmarks + (let [data + {:sm "Hello this is just a small string" + :md {:a :A :b :B :c :C :d "This is a slightly larger datum, yo"} + :lg ^:msgpack + {:a "ahjkhfkjdhfkjdhfjkhdfjkhdjkfhdfkjhdfkjsdsfsifsuifuiosudfd" + :b "fdjhfkjdhfjkdhfjkdhfjkhdjfkhdjkfhdfjkhsfsfiueiuiuiufiuiid" + :c "fdjhfs[pdopoeiroejlkjfdklfjdkjfkdjfkldsfdfueiuiyqqqhahdhf" + :d "fdkjoiwueuoiuwdm,sn,mndfdifdiofudfuoidfdfdfdfe3iuqiiuausj" + :e "ejhfiurhiuui2ureoiuoieuroiueoirueioureisfdfddjghiuyiuyeyu" + :f [1 383 398498 2 9 3389 893 9 309 290349 3782 1273 4447 933] + :g #{:a :b :c :d :e :f :g :h :hello/foo :hello/bar :hello/baz}}} + + data (:lg data) ; <-- Tweak input data size here + size (fn [packer] (count (pack packer data))) + bench (fn [packer] (encore/round (encore/qbench 10000 + (unpack packer (pack packer data)))))] + + {:size {:edn (size edn-packer) + :json (size json-packer) + :msgpack (size msgpack-packer) + :flexi (size (get-flexi-packer))} + :time {:edn (bench edn-packer) + :json (bench json-packer) + :msgpack (bench msgpack-packer) + :flexi (bench (get-flexi-packer))}}) + + {:size {:edn 35, :json 43, :msgpack 41}, + :time {:edn 81, :json 316, :msgpack 515}} + {:size {:edn 63, :json 86, :msgpack 67}, + :time {:edn 228, :json 284, :msgpack 613}} + {:size {:edn 448, :json 510, :msgpack 444, :flexi 445}, + :time {:edn 3027, :json 1054, :msgpack 2054, :flexi 2213}}) diff --git a/src/taoensso/sente/servers/http-kit.cljx b/src/taoensso/sente/servers/http-kit.cljx new file mode 100644 index 0000000..d0d0382 --- /dev/null +++ b/src/taoensso/sente/servers/http-kit.cljx @@ -0,0 +1,7 @@ +(ns taoensso.sente.servers.http-kit + "Experimental (pre-alpha): subject to change. + Optional http-kit[1] IAsyncHTTPServer implementation for use with Sente. + [1] http://http-kit.org/." + {:author "Peter Taoussanis"}) + +;;;; TODO