Skip to content

Commit e81ca25

Browse files
committed
perf: throttle rtc-state-flow for postmessage
1 parent a9642ed commit e81ca25

File tree

3 files changed

+25
-5
lines changed

3 files changed

+25
-5
lines changed

src/main/frontend/common/missionary_util.cljs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@
6868
(catch Cancelled _
6969
(m/amb)))))))
7070

71+
(defn throttle [dur-ms >in]
72+
(m/ap
73+
(let [x (m/?> (m/relieve {} >in))]
74+
(m/amb x (do (m/? (m/sleep dur-ms)) (m/amb))))))
75+
7176
(defn run-task
7277
"Return the canceler"
7378
[task key & {:keys [succ fail]}]

src/main/frontend/worker/db_listener.cljs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ generate asset-change events.")
9898
(d/unlisten! conn ::listen-db-changes!)
9999
(prn :listen-db-changes! (keys handlers) :repo repo)
100100
(d/listen! conn ::listen-db-changes!
101-
(fn [{:keys [tx-data _db-before _db-after tx-meta] :as tx-report}]
101+
(fn listen-db-changes!-inner
102+
[{:keys [tx-data _db-before _db-after tx-meta] :as tx-report}]
102103
(let [tx-meta (merge (batch-tx/get-batch-opts) tx-meta)
103104
pipeline-replace? (:pipeline-replace? tx-meta)
104105
in-batch-tx-mode? (:batch-tx/batch-tx-mode? tx-meta)]

src/main/frontend/worker/rtc/core.cljs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
(ns frontend.worker.rtc.core
22
"Main(use missionary) ns for rtc related fns"
33
(:require [frontend.common.missionary-util :as c.m]
4+
[frontend.worker.device :as worker-device]
45
[frontend.worker.rtc.asset :as r.asset]
56
[frontend.worker.rtc.client :as r.client]
67
[frontend.worker.rtc.client-op :as client-op]
@@ -16,8 +17,7 @@
1617
[logseq.common.config :as common-config]
1718
[logseq.db :as ldb]
1819
[malli.core :as ma]
19-
[missionary.core :as m]
20-
[frontend.worker.device :as worker-device])
20+
[missionary.core :as m])
2121
(:import [missionary Cancelled]))
2222

2323
(def ^:private rtc-state-schema
@@ -306,7 +306,7 @@
306306
:block-uuids [block-uuid]
307307
:graph-uuid graph-uuid}))))
308308

309-
(def ^:private create-get-state-flow
309+
(def ^:private create-get-state-flow*
310310
(let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
311311
(m/ap
312312
(let [{rtc-lock :*rtc-lock :keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *online-users]}
@@ -331,6 +331,8 @@
331331
(rtc-log-and-state/create-remote-t-flow graph-uuid))))
332332
(catch Cancelled _))))))
333333

334+
(def ^:private create-get-state-flow (c.m/throttle 300 create-get-state-flow*))
335+
334336
(defn new-task--get-debug-state
335337
[]
336338
(m/reduce {} nil (m/eduction (take 1) create-get-state-flow)))
@@ -415,4 +417,16 @@
415417
(def a (atom 1))
416418
(def f1 (m/watch a))
417419
(def f2 (create-pull-remote-updates-flow 5000 f1))
418-
(def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx))))
420+
(def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx)))
421+
422+
(defn sleep-emit [delays]
423+
(m/ap (let [n (m/?> (m/seed delays))
424+
r (m/? (m/sleep n n))]
425+
(prn :xxx r (t/now))
426+
r)))
427+
428+
(def cancel
429+
((->> (m/sample vector
430+
(m/latest identity (m/reductions {} 0 (sleep-emit [1000 1 2])))
431+
(sleep-emit [2000 3000 1000]))
432+
(m/reduce (fn [_ v] (prn :v v)))) prn prn)))

0 commit comments

Comments
 (0)