This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
gc.clj
93 lines (86 loc) · 4.33 KB
/
gc.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
(ns onyx.gc
(:require [clojure.core.async :refer [chan <!!]]
[onyx.log.entry :refer [create-log-entry]]
[onyx.extensions :as extensions]
[onyx.checkpoint :as checkpoint]
[onyx.static.default-vals :refer [arg-or-default]]
[taoensso.timbre :refer [info]]))
(defn gc-log [onyx-client]
(let [id (java.util.UUID/randomUUID)
entry (create-log-entry :gc {:id id})
ch (chan 1000)]
(extensions/write-log-entry (:log onyx-client) entry)
(loop [replica (extensions/subscribe-to-log (:log onyx-client) ch)]
(let [entry (<!! ch)
new-replica (extensions/apply-log-entry entry (assoc replica :version (:message-id entry)))]
(if (and (= (:fn entry) :gc) (= (:id (:args entry)) id))
(let [diff (extensions/replica-diff entry replica new-replica)
args {:id id :type :client :log (:log onyx-client)}]
(extensions/fire-side-effects! entry replica new-replica diff args))
(recur new-replica))))
true))
;; It is possible that a crash will cause the coordinates to not be written,
;; therefore we attempt to delete past the final checkpoint in cases where
;; we know this is not the final replica-version.
;; It is safe to do so, as we know there is a next replica-version with a successful checkpoint.
(def attempt-deletion-past-successful-checkpoint-count 5)
(defn build-checkpoint-targets [log tenancy-id job-id delete-all?]
(let [watermarks (checkpoint/read-all-replica-epoch-watermarks log tenancy-id job-id)
sorted-watermarks (sort-by :replica-version watermarks)]
(if (empty? sorted-watermarks)
[]
(let [max-rv (apply max (map :replica-version watermarks))]
(reduce
(fn [all {:keys [replica-version epoch task-data]}]
(let [epoch-low-watermark (or (checkpoint/read-replica-epoch-low-watermark log tenancy-id job-id replica-version) 1)]
(cond (< replica-version max-rv)
(conj all {:replica-version replica-version
:delete? true
:epoch-range (range epoch-low-watermark (+ (inc epoch) attempt-deletion-past-successful-checkpoint-count))
:task-data task-data})
(= replica-version max-rv)
(conj all {:replica-version replica-version
:delete? delete-all?
:epoch-range (range epoch-low-watermark epoch)
:task-data task-data})
:else all)))
[]
sorted-watermarks)))))
(defn storage-connection [peer-config log]
(if (= :zookeeper (arg-or-default :onyx.peer/storage peer-config))
log
(checkpoint/storage peer-config nil))) ;; TODO: monitoring component?
(defn gc-checkpoints [{:keys [log peer-config] :as onyx-client} tenancy-id job-id delete-all?]
(let [targets (build-checkpoint-targets log tenancy-id job-id delete-all?)
storage (storage-connection peer-config log)]
(info "Garbage collecting tenancy-id:" tenancy-id "job-id:" job-id "targets:" targets)
(reduce
(fn [result {:keys [replica-version epoch-range task-data delete?]}]
(let [rets
(reduce
(fn [result epoch]
(reduce-kv
(fn [result cp-type task-id->slots]
(reduce-kv
(fn [result task-id slots]
(reduce
(fn [result slot]
(checkpoint/gc-checkpoint! storage tenancy-id job-id
replica-version epoch task-id
slot cp-type)
(update result :checkpoints-deleted inc))
result
(range (inc slots))))
result
task-id->slots))
result
task-data))
result
epoch-range)]
(if delete?
(checkpoint/gc-replica-epoch-watermark! log tenancy-id job-id replica-version)
(when-let [max-deleted (last epoch-range)]
(checkpoint/write-replica-epoch-low-watermark log tenancy-id job-id replica-version (inc max-deleted))))
rets))
{:checkpoints-deleted 0}
targets)))