|
19 | 19 | [missionary.core :as m]
|
20 | 20 | [tick.core :as tick]))
|
21 | 21 |
|
22 |
| -(defn- new-task--register-graph-updates |
23 |
| - [get-ws-create-task graph-uuid major-schema-version repo] |
| 22 | +(defn- apply-remote-updates-from-apply-ops |
| 23 | + [apply-ops-resp graph-uuid repo conn date-formatter add-log-fn] |
| 24 | + (if-let [remote-ex (:ex-data apply-ops-resp)] |
| 25 | + (do (add-log-fn :rtc.log/pull-remote-data (assoc remote-ex :sub-type :pull-remote-data-exception)) |
| 26 | + (case (:type remote-ex) |
| 27 | + :graph-lock-failed nil |
| 28 | + :graph-lock-missing |
| 29 | + (throw r.ex/ex-remote-graph-lock-missing) |
| 30 | + :rtc.exception/get-s3-object-failed |
| 31 | + (throw (ex-info (:ex-message apply-ops-resp) (:ex-data apply-ops-resp))) |
| 32 | + ;;else |
| 33 | + (throw (ex-info "Unavailable3" {:remote-ex remote-ex})))) |
| 34 | + (do (assert (pos? (:t apply-ops-resp)) apply-ops-resp) |
| 35 | + (r.remote-update/apply-remote-update |
| 36 | + graph-uuid repo conn date-formatter {:type :remote-update :value apply-ops-resp} add-log-fn)))) |
| 37 | + |
| 38 | +(defn- new-task--init-request |
| 39 | + [get-ws-create-task graph-uuid major-schema-version repo conn *last-calibrate-t *server-schema-version add-log-fn] |
24 | 40 | (m/sp
|
25 |
| - (try |
26 |
| - (let [{remote-t :t :as resp} |
27 |
| - (m/? (ws-util/send&recv get-ws-create-task {:action "register-graph-updates" |
28 |
| - :graph-uuid graph-uuid |
29 |
| - :schema-version (str major-schema-version)}))] |
30 |
| - (rtc-log-and-state/update-remote-t graph-uuid remote-t) |
31 |
| - (when-not (client-op/get-local-tx repo) |
32 |
| - (client-op/update-local-tx repo remote-t)) |
33 |
| - resp) |
34 |
| - (catch :default e |
35 |
| - (if (= :rtc.exception/remote-graph-not-ready (:type (ex-data e))) |
36 |
| - (throw (ex-info "remote graph is still creating" {:missionary/retry true} e)) |
37 |
| - (throw e)))))) |
| 41 | + (let [t-before (client-op/get-local-tx repo) |
| 42 | + get-graph-skeleton? (or (nil? @*last-calibrate-t) |
| 43 | + (< 500 (- t-before @*last-calibrate-t)))] |
| 44 | + (try |
| 45 | + (let [t-before (client-op/get-local-tx repo) |
| 46 | + {remote-t :t |
| 47 | + server-schema-version :server-schema-version |
| 48 | + server-builtin-db-idents :server-builtin-db-idents |
| 49 | + :as resp} |
| 50 | + (m/? (ws-util/send&recv get-ws-create-task {:action "init-request" |
| 51 | + :graph-uuid graph-uuid |
| 52 | + :schema-version (str major-schema-version) |
| 53 | + :t-before (or t-before 1) |
| 54 | + :get-graph-skeleton get-graph-skeleton?}))] |
| 55 | + (if-let [remote-ex (:ex-data resp)] |
| 56 | + (do |
| 57 | + (add-log-fn :rtc.log/init-request remote-ex) |
| 58 | + (case (:type remote-ex) |
| 59 | + :graph-lock-failed nil |
| 60 | + :graph-lock-missing (throw r.ex/ex-remote-graph-lock-missing) |
| 61 | + ;; else |
| 62 | + (throw (ex-info "Unavailable4" {:remote-ex remote-ex})))) |
| 63 | + (do |
| 64 | + (when server-schema-version |
| 65 | + (reset! *server-schema-version server-schema-version) |
| 66 | + (reset! *last-calibrate-t remote-t)) |
| 67 | + (when remote-t |
| 68 | + (rtc-log-and-state/update-remote-t graph-uuid remote-t) |
| 69 | + (when (not t-before) |
| 70 | + (client-op/update-local-tx repo remote-t))) |
| 71 | + (when (and server-schema-version server-builtin-db-idents) |
| 72 | + (r.skeleton/calibrate-graph-skeleton server-schema-version server-builtin-db-idents @conn)) |
| 73 | + resp))) |
| 74 | + (catch :default e |
| 75 | + (if (= :rtc.exception/remote-graph-not-ready (:type (ex-data e))) |
| 76 | + (throw (ex-info "remote graph is still creating" {:missionary/retry true} e)) |
| 77 | + (throw e))))))) |
38 | 78 |
|
39 | 79 | (def ^:private *register-graph-updates-sent
|
40 | 80 | "ws -> [bool, added-inst, [graph-uuid,major-schema-version,repo]]"
|
|
54 | 94 | (defn ensure-register-graph-updates--memoized
|
55 | 95 | "Return a task: get or create a mws(missionary wrapped websocket).
|
56 | 96 | see also `ws/get-mws-create`.
|
57 |
| - But ensure `register-graph-updates` and `calibrate-graph-skeleton` has been sent" |
58 |
| - [get-ws-create-task graph-uuid major-schema-version repo conn |
| 97 | + But ensure `init-request` and `calibrate-graph-skeleton` has been sent" |
| 98 | + [get-ws-create-task graph-uuid major-schema-version repo conn date-formatter |
59 | 99 | *last-calibrate-t *online-users *server-schema-version add-log-fn]
|
60 | 100 | (m/sp
|
61 | 101 | (let [ws (m/? get-ws-create-task)
|
|
81 | 121 | 10000)))]
|
82 | 122 | (reset! *online-users online-users)))
|
83 | 123 | :succ (constantly nil)))
|
84 |
| - (let [{:keys [max-remote-schema-version]} |
| 124 | + (let [{:keys [max-remote-schema-version] :as init-request-resp} |
85 | 125 | (try
|
86 | 126 | (m/?
|
87 | 127 | (c.m/backoff
|
88 | 128 | {:delay-seq ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
|
89 | 129 | (take 5 (drop 2 c.m/delays))
|
90 | 130 | :reset-flow worker-flows/online-event-flow}
|
91 |
| - (new-task--register-graph-updates get-ws-create-task graph-uuid major-schema-version repo))) |
| 131 | + (new-task--init-request |
| 132 | + get-ws-create-task graph-uuid major-schema-version repo conn |
| 133 | + *last-calibrate-t *server-schema-version |
| 134 | + add-log-fn))) |
92 | 135 | (catch :default e
|
93 | 136 | (swap! *register-graph-updates-sent assoc-in [ws 0] false)
|
94 | 137 | (throw e)))]
|
|
98 | 141 | max-remote-schema-version db-schema/version major-schema-version)
|
99 | 142 | :repo repo
|
100 | 143 | :graph-uuid graph-uuid
|
101 |
| - :remote-schema-version max-remote-schema-version}))) |
102 |
| - (let [t (client-op/get-local-tx repo)] |
103 |
| - (when (or (nil? @*last-calibrate-t) |
104 |
| - (< 500 (- t @*last-calibrate-t))) |
105 |
| - (let [{:keys [server-schema-version _server-builtin-db-idents]} |
106 |
| - (m/? (r.skeleton/new-task--calibrate-graph-skeleton |
107 |
| - get-ws-create-task graph-uuid major-schema-version @conn))] |
108 |
| - (reset! *server-schema-version server-schema-version)) |
109 |
| - (reset! *last-calibrate-t t)))) |
| 144 | + :remote-schema-version max-remote-schema-version})) |
| 145 | + (apply-remote-updates-from-apply-ops init-request-resp graph-uuid repo conn date-formatter add-log-fn))) |
110 | 146 | ws)))
|
111 | 147 |
|
112 | 148 | (defn- ->pos
|
|
464 | 500 | :ops [] :t-before (or local-tx 1)}
|
465 | 501 | r (m/? (ws-util/send&recv get-ws-create-task message))]
|
466 | 502 | (r.throttle/add-rtc-api-call-record! message)
|
467 |
| - (if-let [remote-ex (:ex-data r)] |
468 |
| - (do (add-log-fn :rtc.log/pull-remote-data (assoc remote-ex :sub-type :pull-remote-data-exception)) |
469 |
| - (case (:type remote-ex) |
470 |
| - :graph-lock-failed nil |
471 |
| - :graph-lock-missing (throw r.ex/ex-remote-graph-lock-missing) |
472 |
| - :rtc.exception/get-s3-object-failed nil |
473 |
| - ;;else |
474 |
| - (throw (ex-info "Unavailable3" {:remote-ex remote-ex})))) |
475 |
| - (do (assert (pos? (:t r)) r) |
476 |
| - (r.remote-update/apply-remote-update |
477 |
| - graph-uuid repo conn date-formatter {:type :remote-update :value r} add-log-fn)))))) |
| 503 | + (apply-remote-updates-from-apply-ops r graph-uuid repo conn date-formatter add-log-fn)))) |
0 commit comments