Skip to content

Commit

Permalink
Merge pull request #67 from ckarlsen84/master
Browse files Browse the repository at this point in the history
Experimental support for transit (@ckarlsen)
  • Loading branch information
ptaoussanis committed Aug 28, 2014
2 parents 93a8660 + bc1c443 commit 50fc9ab
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 30 deletions.
6 changes: 4 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.taoensso/sente "0.15.1"
(defproject com.taoensso/sente "0.15.1-SNAPSHOT"
:author "Peter Taoussanis <https://www.taoensso.com>"
:description "Clojure channel sockets library"
:url "https://github.com/ptaoussanis/sente"
Expand All @@ -17,7 +17,9 @@
[org.clojure/tools.reader "0.8.5"]
[com.taoensso/encore "1.7.0"]
[com.taoensso/timbre "3.2.1"]
[http-kit "2.1.18"]]
[http-kit "2.1.18"]
[com.cognitect/transit-clj "0.8.247"]
[com.cognitect/transit-cljs "0.8.182"]]

:plugins
[[com.keminglabs/cljx "0.4.0"]
Expand Down
107 changes: 79 additions & 28 deletions src/taoensso/sente.cljx
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,61 @@
[clojure.tools.reader.edn :as edn]
[org.httpkit.server :as http-kit]
[taoensso.encore :as encore]
[taoensso.timbre :as timbre])
[taoensso.timbre :as timbre]
[cognitect.transit :as transit])
#+clj
(:import [java.io ByteArrayInputStream ByteArrayOutputStream])

#+cljs
(:require [clojure.string :as str]
[cljs.core.async :as async :refer (<! >! put! chan)]
[cljs.reader :as edn]
[taoensso.encore :as encore :refer (format)])
[taoensso.encore :as encore :refer (format)]
[cognitect.transit :as transit])

#+cljs
(:require-macros [cljs.core.async.macros :as asyncm :refer (go go-loop)]))

;;;;
;;;; Transit support
;;;;

#+clj
(defn write-transit [x]
(let [out (ByteArrayOutputStream. )
w (transit/writer out :json)
_ (transit/write w x)
ret (.toString out)]
(.reset out)
ret))

#+cljs
(defn write-transit [x]
(let [w (transit/writer :json)]
(transit/write w x)))

#+clj
(defn read-transit [x]
(let [in (ByteArrayInputStream. (.getBytes x))
reader (transit/reader in :json)]
(transit/read reader)))

#+cljs
(defn read-transit [x]
(let [reader (transit/reader :json)]
(transit/read reader x)))

(defn write-data [data-format s]
(if (= data-format :transit)
(write-transit s)
(pr-str s)))

(defn read-data [data-format s]
(if (= data-format :transit)
(read-transit s)
(edn/read-string s)))


;;;; Shared (client+server)

(defn- chan? [x]
Expand Down Expand Up @@ -116,14 +160,14 @@
(not (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply)))

#+clj ; For #+cljs we'd rather just throw client-side on bad edn from server
(defn- try-read-edn [edn]
(try (edn/read-string edn)
(defn- try-read-edn [data-format edn]
(try (read-data data-format edn)
(catch Throwable t [:chsk/bad-edn edn])))

(defn- unwrap-edn-msg-with-?cb->clj "edn -> [clj ?cb-uuid]"
[edn]
(let [msg #+clj (try-read-edn edn)
#+cljs (edn/read-string edn)
[data-format edn]
(let [msg #+clj (try-read-edn data-format edn)
#+cljs (read-data data-format edn)
?cb-uuid (and (map? msg) (:chsk/cb-uuid msg))
clj (if-not ?cb-uuid msg (:chsk/clj msg))]
[clj ?cb-uuid]))
Expand Down Expand Up @@ -230,21 +274,23 @@
* csrf-token-fn ; (fn [ring-req]) -> CSRF token for Ajax POSTs.
* send-buf-ms-ajax ; [1]
* send-buf-ms-ws ; [1]
* data-format : :edn (default) or :transit (experimental)
[1] Optimization to allow transparent batching of rapidly-triggered
server>user pushes. This is esp. important for Ajax clients which use a
(slow) reconnecting poller. Actual event dispatch may occur <= given ms
after send call (larger values => larger batch windows)."
[& [{:keys [recv-buf-or-n send-buf-ms-ajax send-buf-ms-ws
user-id-fn csrf-token-fn]
user-id-fn csrf-token-fn data-format]
:or {recv-buf-or-n (async/sliding-buffer 1000)
send-buf-ms-ajax 100
send-buf-ms-ws 30
user-id-fn (fn [ring-req] (get-in ring-req [:session :uid]))
csrf-token-fn (fn [ring-req]
(or (get-in ring-req [:session :csrf-token])
(get-in ring-req [:session :ring.middleware.anti-forgery/anti-forgery-token])
(get-in ring-req [:session "__anti-forgery-token"])))}}]]
(get-in ring-req [:session "__anti-forgery-token"])))
data-format :edn}}]]

{:pre [(encore/pos-int? send-buf-ms-ajax)
(encore/pos-int? send-buf-ms-ws)]}
Expand Down Expand Up @@ -325,7 +371,7 @@
(assert (vector? buffered-evs))
(assert (set? ev-uuids))

(let [buffered-evs-edn (pr-str buffered-evs)]
(let [buffered-evs-edn (write-data data-format buffered-evs)]
(case type
:ws (send-buffered-evs>ws-clients! conns_
uid buffered-evs-edn)
Expand Down Expand Up @@ -371,7 +417,7 @@
:ajax-post-fn ; Does not participate in `conns_` (has specific req->resp)
(fn [ring-req]
(http-kit/with-channel ring-req hk-ch
(let [msg (-> ring-req :params :edn try-read-edn)
(let [msg (->> ring-req :params :edn (try-read-edn data-format))
dummy-cb? (and (map? msg) (:chsk/dummy-cb? msg))
clj (if-not dummy-cb? msg (:chsk/clj msg))]

Expand All @@ -384,13 +430,13 @@
(when-not dummy-cb?
(fn reply-fn [resp-clj] ; Any clj form
(timbre/tracef "Chsk send (ajax reply): %s" resp-clj)
(let [resp-edn (pr-str resp-clj)]
(let [resp-edn (write-data data-format resp-clj)]
;; true iff apparent success:
(http-kit/send! hk-ch resp-edn))))})

(when dummy-cb?
(timbre/tracef "Chsk send (ajax reply): cb-dummy-200")
(http-kit/send! hk-ch (pr-str :chsk/cb-dummy-200))))))
(http-kit/send! hk-ch (write-data data-format :chsk/cb-dummy-200))))))

:ajax-get-or-ws-handshake-fn ; Ajax handshake/poll, or WebSocket handshake
(fn [ring-req]
Expand All @@ -413,7 +459,7 @@

handshake!
(fn [hk-ch] (http-kit/send! hk-ch
(pr-str [:chsk/handshake [uid csrf-token]])))]
(write-data data-format [:chsk/handshake [uid csrf-token]])))]

(if (:websocket? ring-req)
(do ; WebSocket handshake
Expand All @@ -425,13 +471,13 @@

(http-kit/on-receive hk-ch
(fn [req-edn]
(let [[clj ?cb-uuid] (unwrap-edn-msg-with-?cb->clj req-edn)]
(let [[clj ?cb-uuid] (unwrap-edn-msg-with-?cb->clj data-format req-edn)]
(receive-event-msg!* clj
(when ?cb-uuid
(fn reply-fn [resp-clj] ; Any clj form
(timbre/tracef "Chsk send (ws reply): %s" resp-clj)
(let [resp-edn (pr-str {:chsk/clj resp-clj
:chsk/cb-uuid ?cb-uuid})]
(let [resp-edn (write-data data-format {:chsk/clj resp-clj
:chsk/cb-uuid ?cb-uuid})]
;; true iff apparent success:
(http-kit/send! hk-ch resp-edn))))))))

Expand Down Expand Up @@ -531,11 +577,11 @@

#+cljs
(defn- wrap-clj->edn-msg-with-?cb "clj -> [edn ?cb-uuid]"
[cbs-waiting_ clj ?timeout-ms ?cb-fn]
[cbs-waiting_ clj ?timeout-ms ?cb-fn data-format]
(let [?cb-uuid (when ?cb-fn (encore/uuid-str))
msg (if-not ?cb-uuid clj {:chsk/clj clj :chsk/cb-uuid ?cb-uuid})
;; Note that if pr-str throws, it'll throw before swap!ing cbs-waiting:
edn (pr-str msg)]
edn (write-data data-format msg)]
(when ?cb-uuid
(swap! cbs-waiting_
(fn [[_ m]] [nil (assoc m ?cb-uuid ?cb-fn)]))
Expand Down Expand Up @@ -605,6 +651,7 @@
nattempt_
cbs-waiting_ ; [dissoc'd-fn {<uuid> <fn> ...}]
state_ ; {:type _ :open? _ :uid _ :csrf-token _}
data-format
]
IChSocket
(chsk-send! [chsk ev] (chsk-send! chsk ev nil nil))
Expand All @@ -616,7 +663,7 @@
(do (encore/warnf "Chsk send against closed chsk.")
(when ?cb-fn (?cb-fn :chsk/closed)))
(let [[edn ?cb-uuid] (wrap-clj->edn-msg-with-?cb
cbs-waiting_ ev ?timeout-ms ?cb-fn)]
cbs-waiting_ ev ?timeout-ms ?cb-fn data-format)]
(try
(.send @socket_ edn)
(reset! kalive-due?_ false)
Expand Down Expand Up @@ -657,7 +704,7 @@
(let [edn (aget ws-ev "data")
;; Nb may or may NOT satisfy `event?` since we also
;; receive cb replies here!:
[clj ?cb-uuid] (unwrap-edn-msg-with-?cb->clj edn)]
[clj ?cb-uuid] (unwrap-edn-msg-with-?cb->clj data-format edn)]
;; (assert-event clj) ;; NO!
(or
(and (handle-when-handshake! chsk clj)
Expand Down Expand Up @@ -691,7 +738,7 @@
chsk)))

#+cljs
(defrecord ChAjaxSocket [url chs timeout ajax-client-uuid curr-xhr_ state_]
(defrecord ChAjaxSocket [url chs timeout ajax-client-uuid curr-xhr_ state_ data-format]
IChSocket
(chsk-send! [chsk ev] (chsk-send! chsk ev nil nil))
(chsk-send! [chsk ev ?timeout-ms ?cb]
Expand All @@ -709,7 +756,7 @@
(let [dummy-cb? (not ?cb-fn)
msg (if-not dummy-cb? ev {:chsk/clj ev
:chsk/dummy-cb? true})
edn (pr-str msg)]
edn (write-data data-format msg)]
{:_ (encore/now-udt) ; Force uncached resp
:edn edn :csrf-token (:csrf-token @state_)})}

Expand All @@ -721,7 +768,7 @@
(when ?cb-fn (?cb-fn :chsk/error))))

(let [resp-edn content
resp-clj (edn/read-string resp-edn)]
resp-clj (read-data data-format resp-edn)]
(if ?cb-fn (?cb-fn resp-clj)
(when (not= resp-clj :chsk/cb-dummy-200)
(encore/warnf "Cb reply w/o local cb-fn: %s" resp-clj)))
Expand Down Expand Up @@ -768,7 +815,7 @@

;; The Ajax long-poller is used only for events, never cbs:
(let [edn content
clj (edn/read-string edn)]
clj (read-data data-format edn)]
(or
(handle-when-handshake! chsk clj)
(let [buffered-evs clj]
Expand Down Expand Up @@ -820,14 +867,16 @@
* ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity w/in
; given number of milliseconds.
* lp-kalive-ms ; Ping to keep a long-polling (Ajax) conn alive ''.
* chsk-url-fn ; Please see `default-chsk-url-fn` for details."
* chsk-url-fn ; Please see `default-chsk-url-fn` for details.
* data-format : :edn (default) or :transit (experimental)"
[path &
& [{:keys [type recv-buf-or-n ws-kalive-ms lp-timeout chsk-url-fn]
& [{:keys [type recv-buf-or-n ws-kalive-ms lp-timeout chsk-url-fn data-format]
:or {type :auto
recv-buf-or-n (async/sliding-buffer 2048) ; Mostly for buffered-evs
ws-kalive-ms 25000 ; < Heroku 30s conn timeout
lp-timeout 25000 ; ''
chsk-url-fn default-chsk-url-fn}}
chsk-url-fn default-chsk-url-fn
data-format :edn}}
_deprecated-more-opts]]

{:pre [(#{:ajax :ws :auto} type)]}
Expand All @@ -849,6 +898,7 @@
(map->ChWebSocket
{:url (chsk-url-fn path window-location :ws)
:chs chs
:data-format data-format
:socket_ (atom nil)
:kalive-ms ws-kalive-ms
:kalive-timer_ (atom nil)
Expand All @@ -864,6 +914,7 @@
(map->ChAjaxSocket
{:url (chsk-url-fn path window-location (not :ws))
:chs chs
:data-format data-format
:timeout lp-timeout
:ajax-client-uuid ajax-client-uuid
:curr-xhr_ (atom nil)
Expand Down

0 comments on commit 50fc9ab

Please sign in to comment.