Skip to content

Commit 8e1be9a

Browse files
deephbztiensonqin
authored andcommitted
fix(db-sync): stream snapshot downloads directly from sync do
1 parent 3a04a0e commit 8e1be9a

2 files changed

Lines changed: 20 additions & 60 deletions

File tree

deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@
129129
(let [url (js/URL. (.-url request))]
130130
(str (.-origin url) "/assets/" graph-id "/" snapshot-id ".snapshot")))
131131

132+
(defn- snapshot-stream-url [request graph-id]
133+
(let [url (js/URL. (.-url request))]
134+
(str (.-origin url) "/sync/" graph-id "/snapshot/stream")))
135+
132136
(defn- maybe-decompress-stream [stream encoding]
133137
(if (and (= encoding snapshot-content-encoding) (exists? js/DecompressionStream))
134138
(.pipeThrough stream (js/DecompressionStream. "gzip"))
@@ -419,35 +423,17 @@
419423

420424
(defn- handle-sync-snapshot-download
421425
[^js self request]
422-
(let [graph-id (graph-id-from-request request)
423-
^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))]
426+
(let [graph-id (graph-id-from-request request)]
424427
(cond
425428
(not (seq graph-id))
426429
(http/bad-request "missing graph id")
427430

428-
(nil? bucket)
429-
(http/error-response "missing assets bucket" 500)
430-
431431
:else
432432
(p/let [ready-for-sync? (<ready-for-sync? self graph-id)]
433433
(if-not ready-for-sync?
434434
(http/error-response "graph not ready" 409)
435-
(p/let [snapshot-id (str (random-uuid))
436-
key (snapshot-key graph-id snapshot-id)
437-
stream (-> (snapshot-export-stream self)
438-
(maybe-compress-stream))
439-
multipart? (and (some? (.-createMultipartUpload bucket))
440-
(fn? (.-createMultipartUpload bucket)))
441-
opts #js {:httpMetadata #js {:contentType snapshot-content-type
442-
:contentEncoding snapshot-content-encoding
443-
:cacheControl snapshot-cache-control}
444-
:customMetadata #js {:purpose "snapshot"
445-
:created-at (str (common/now-ms))}}
446-
_ (if multipart?
447-
(upload-multipart! bucket key stream opts)
448-
(p/let [body (<buffer-stream stream)]
449-
(.put bucket key body opts)))
450-
url (snapshot-url request graph-id snapshot-id)]
435+
(let [key (str "stream/" graph-id ".snapshot")
436+
url (snapshot-stream-url request graph-id)]
451437
(http/json-response :sync/snapshot-download {:ok true
452438
:key key
453439
:url url

deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,55 +29,29 @@
2929

3030
(deftest snapshot-download-uses-gzip-encoding-when-compression-supported-test
3131
(async done
32-
(let [put-call (atom nil)
33-
rows [[1 "row-1" nil]
34-
[2 "row-2" "{\"a\":1}"]]
35-
bucket #js {:put (fn [key body opts]
36-
(reset! put-call {:key key :body body :opts opts})
37-
(js/Promise.resolve #js {:ok true}))}
38-
sql (empty-sql)
32+
(let [sql (empty-sql)
3933
conn (d/create-conn db-schema/schema)
40-
self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket}
34+
self #js {:env #js {}
4135
:conn conn
4236
:schema-ready true
4337
:sql sql}
4438
{:keys [request url]} (request-url)
45-
original-compression-stream (.-CompressionStream js/globalThis)
46-
restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
47-
(aset js/globalThis
48-
"CompressionStream"
49-
(passthrough-compression-stream-constructor))
50-
(-> (p/with-redefs [sync-handler/fetch-snapshot-kvs-rows (fn [_sql last-addr _limit]
51-
(if (neg? last-addr) rows []))
52-
sync-handler/snapshot-row-count (fn [_sql] (count rows))]
53-
(p/let [resp (sync-handler/handle {:self self
54-
:request request
55-
:url url
56-
:route {:handler :sync/snapshot-download}})
39+
expected-url "http://localhost/sync/graph-1/snapshot/stream"]
40+
(-> (p/let [resp (sync-handler/handle {:self self
41+
:request request
42+
:url url
43+
:route {:handler :sync/snapshot-download}})
5744
text (.text resp)
58-
body (js->clj (js/JSON.parse text) :keywordize-keys true)
59-
http-metadata (aget (:opts @put-call) "httpMetadata")
60-
payload (js/Uint8Array. (:body @put-call))
61-
rows (snapshot/finalize-framed-buffer payload)
62-
addrs (mapv first rows)]
45+
body (js->clj (js/JSON.parse text) :keywordize-keys true)]
6346
(is (= 200 (.-status resp)))
47+
(is (= true (:ok body)))
48+
(is (= "stream/graph-1.snapshot" (:key body)))
49+
(is (= expected-url (:url body)))
6450
(is (= "gzip" (:content-encoding body)))
65-
(is (= "gzip" (aget http-metadata "contentEncoding")))
66-
(is (= "application/transit+json" (aget http-metadata "contentType")))
67-
(is (= 2 (count rows)))
68-
(is (= (sort addrs) addrs))
69-
(is (every? (fn [[addr content _addresses]]
70-
(and (int? addr)
71-
(string? content)))
72-
rows))
73-
(is (= [[1 "row-1" nil]
74-
[2 "row-2" "{\"a\":1}"]]
75-
rows))))
51+
(done))
7652
(p/then (fn []
77-
(restore!)
78-
(done)))
53+
nil))
7954
(p/catch (fn [error]
80-
(restore!)
8155
(is false (str error))
8256
(done)))))))
8357

0 commit comments

Comments
 (0)