Skip to content

Commit

Permalink
Sync: synchronizer transact!, handle the gap between snapshot and txes
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Oct 24, 2023
1 parent f52eaae commit db32698
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 25 deletions.
9 changes: 5 additions & 4 deletions src/datascript/conn.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@
(transact! conn tx-data nil))
([conn tx-data tx-meta]
{:pre [(conn? conn)]}
(let [report (-transact! conn tx-data tx-meta)]
(doseq [[_ callback] (some-> (:listeners (meta conn)) (deref))]
(callback report))
report)))
(locking conn
(let [report (-transact! conn tx-data tx-meta)]
(doseq [[_ callback] (some-> (:listeners (meta conn)) (deref))]
(callback report))
report))))

(defn reset-conn!
([conn db]
Expand Down
2 changes: 1 addition & 1 deletion src/datascript/sync/client.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[datascript.serialize :as serialize]))

(defn client-id []
(long (* (rand) 9007199254740991)))
(long (* (rand) 0x1FFFFFFFFFFFFF)))

(def *last-tx-id
(atom 0))
Expand Down
39 changes: 26 additions & 13 deletions src/datascript/sync/server.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@
(get @(:clients (meta conn)) channel))

(defn on-tx [conn report]
;; TODO filter what to send where
(let [msg {:message :transacted
:tx-data (db/tx-from-datoms (:tx-data report))
:tx-id (:tx-id (:tx-meta report))
:server-idx (:db/current-tx (:tempids report))}]
(doseq [[channel {:keys [status send-fn]}] @(:clients (meta conn))
; :let [_ (prn "broadcasting to" channel status)]
:when (= :active status)]
(send-fn channel msg))))
(let [*clients (:clients (meta conn))
msg {:message :transacted
:tx-data (db/tx-from-datoms (:tx-data report))
:tx-id (:tx-id (:tx-meta report))
:server-idx (:db/current-tx (:tempids report))}]
(doseq [[channel {:keys [status send-fn pending]}] @*clients]
(if (= :active status)
(do
(when pending
(doseq [msg pending]
(send-fn channel msg))
(swap! *clients update client dissoc :pending))
(send-fn channel msg))
(swap! *clients update client update :pending (fnil conj []) msg)))))

(defn client-connected [conn channel send-fn]
(let [*clients (:clients (meta conn))
Expand All @@ -27,18 +32,26 @@
(conn/listen! conn :sync #(on-tx conn %)))
nil))

(defn drop-before [txs server-idx]
(vec
(drop-while #(<= (:server-idx %) server-idx) txs)))

(defn client-message [conn channel body]
(case (:message body)
:catching-up
(let [{:keys [patterns server-idx]} body ;; TODO delta from server-idx
{:keys [send-fn]} (client conn channel)
db @conn]
db @conn
server-idx (:max-tx db)]
(send-fn channel
{:message :catched-up
:snapshot (serialize/serializable db) ;; TODO patterns
:server-idx (:max-tx db)})
;; TODO race - external txs between (:max-tx db) and after :status :active
(swap! (:clients (meta conn)) update channel assoc :status :active))
:server-idx server-idx})
(swap! (:clients (meta conn)) update channel
(fn [client]
(-> client
(assoc :status :active)
(update :pending drop-before server-idx)))))

:transacting
(doseq [{:keys [tx-data tx-id]} (:txs body)]
Expand Down
10 changes: 3 additions & 7 deletions test/datascript/test/sync.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@
(throw (ex-info "Timeout" {}))))))

(def freeze
; identity
tdc/transit-write-str
)
tdc/transit-write-str)

(def thaw
; identity
tdc/transit-read-str
)
tdc/transit-read-str)

(defn setup []
(let [server (d/create-conn)
Expand Down Expand Up @@ -80,7 +76,7 @@
(tdc/all-datoms @c2)))
(is (= #{[1 :name "Ivan"]}
(tdc/all-datoms @server)))))


; (t/test-ns *ns*)
; (t/run-test-var #'test-conn)
Expand Down

0 comments on commit db32698

Please sign in to comment.