Skip to content

Commit

Permalink
some marginally-better backwards compatibility for removing the evict…
Browse files Browse the repository at this point in the history
… time ranges #428

- if we come across a legacy evict event on a Kafka log, we throw, stopping the indexer, and get the user to manually intervene
- if the user sets CRUX_EVICT_TIME_RANGES=EVICT_ALL, we ignore the valid-time range and evict the doc for all vt.
  • Loading branch information
jarohen committed Nov 25, 2019
1 parent 90cc625 commit 4fadb5b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
29 changes: 26 additions & 3 deletions crux-core/src/crux/tx.clj
Expand Up @@ -96,10 +96,27 @@
tx-id)
(c/->id-buffer new-v)]]}))

(defn tx-command-evict [indexer kv object-store snapshot tx-log [op k] transact-time tx-id]
(def evict-time-ranges-env-var "CRUX_EVICT_TIME_RANGES")
(def ^:dynamic evict-all-on-legacy-time-ranges? (= (System/getenv evict-time-ranges-env-var) "EVICT_ALL"))

(defn tx-command-evict [indexer kv object-store snapshot tx-log [op k & legacy-args] transact-time tx-id]
(let [eid (c/new-id k)
history-descending (idx/entity-history snapshot eid)]
{:post-commit-fn #(when tx-log
{:pre-commit-fn #(cond
(empty? legacy-args) true

;; we throw here rather than return falsy because we want the indexer to stop,
;; and the user to explicitly do something about it
(not evict-all-on-legacy-time-ranges?)
(throw (IllegalArgumentException. (str "Evict no longer supports time-range parameters. "
"See https://github.com/juxt/crux/pull/438 for more details, and what to do about this message.")))

:else (do
(log/warnf "Evicting '%s' for all valid-times, '%s' set"
k evict-time-ranges-env-var)
true))

:post-commit-fn #(when tx-log
(doseq [^EntityTx entity-tx history-descending]
;; TODO: Direct interface call to help
;; Graal, not sure why this is needed,
Expand Down Expand Up @@ -175,7 +192,7 @@
:when post-commit-fn]
(post-commit-fn))})))))

(defn tx-command-unknown [indexer kv object-store snapshot tx-log [op k start-valid-time end-valid-time keep-latest? keep-earliest?] transact-time tx-id]
(defn tx-command-unknown [indexer kv object-store snapshot tx-log [op & _] transact-time tx-id]
(throw (IllegalArgumentException. (str "Unknown tx-op: " op))))

(def ^:private tx-op->command
Expand All @@ -198,24 +215,30 @@
db/Indexer
(index-doc [_ content-hash doc]
(log/debug "Indexing doc:" content-hash)

(when (not (contains? doc :crux.db/id))
(throw (IllegalArgumentException.
(str "Missing required attribute :crux.db/id: " (pr-str doc)))))

(let [content-hash (c/new-id content-hash)
evicted? (idx/evicted-doc? doc)]
(when-let [normalized-doc (if evicted?
(when-let [existing-doc (with-open [snapshot (kv/new-snapshot kv)]
(db/get-single-object object-store snapshot content-hash))]
(idx/delete-doc-from-index kv content-hash existing-doc))

(idx/index-doc kv content-hash doc))]

(let [stats-fn #(idx/update-predicate-stats kv evicted? normalized-doc)]
(if stats-executor
(.submit stats-executor ^Runnable stats-fn)
(stats-fn))))

(db/put-objects object-store [[content-hash doc]])))

(index-tx [this tx-events tx-time tx-id]
(s/assert :crux.tx.event/tx-events tx-events)

(with-open [snapshot (kv/new-snapshot kv)]
(binding [*current-tx* {:crux.tx/tx-id tx-id
:crux.tx/tx-time tx-time
Expand Down
10 changes: 8 additions & 2 deletions crux-core/src/crux/tx/event.clj
Expand Up @@ -25,9 +25,15 @@
:new-doc id?
:at-valid-time (s/? date?)))

;;; TODO (JH) can we remove the previous valid-time range params from here, if there are still events with these params in Kafka?
;;; We can't remove the previous valid-time range params from here,
;;; if there are still events with these params in Kafka.
;;; This is checked in the KvIndexer.
(defmethod tx-event :crux.tx/evict [_] (s/cat :op #{:crux.tx/evict}
:id id?))
:id id?
:start-valid-time (s/? date?)
:end-valid-time (s/? date?)
:keep-latest? (s/? boolean?)
:keep-earliest? (s/? boolean?)))

(defmethod tx-event :crux.tx/fn [_] (s/cat :op #{:crux.tx/fn}
:fn-id id?
Expand Down
38 changes: 38 additions & 0 deletions crux-test/test/crux/tx_test.clj
Expand Up @@ -235,6 +235,44 @@
(t/testing "eviction removes docs"
(t/is (empty? (db/get-objects (:object-store *api*) snapshot (keep :content-hash picasso-history)))))))))

(t/deftest test-handles-legacy-evict-events
(let [{put-tx-time ::tx/tx-time, put-tx-id ::tx/tx-id} (api/submit-tx *api* [[:crux.tx/put picasso #inst "2018-05-21"]])

_ (api/sync *api* put-tx-time nil)

evict-tx-time #inst "2018-05-22"
evict-tx-id (inc put-tx-id)

index-evict! #(db/index-tx (:indexer *api*)
[[:crux.tx/evict picasso-id #inst "2018-05-23"]]
evict-tx-time
evict-tx-id)]

;; we have to index these manually because the new evict API won't allow docs
;; with the legacy valid-time range
(t/testing "eviction throws if legacy params and no explicit override"
(t/is (thrown-with-msg? IllegalArgumentException
#"^Evict no longer supports time-range parameters."
(index-evict!))))

(t/testing "no docs evicted yet"
(with-open [snapshot (kv/new-snapshot (:kv-store *api*))]
(t/is (seq (db/get-objects (:object-store *api*) snapshot
(->> (idx/entity-history snapshot picasso-id)
(keep :content-hash)))))))

(binding [tx/evict-all-on-legacy-time-ranges? true]
(index-evict!))

;; give the evict loopback time to evict the doc
(Thread/sleep 500)

(t/testing "eviction removes docs"
(with-open [snapshot (kv/new-snapshot (:kv-store *api*))]
(t/is (empty? (db/get-objects (:object-store *api*) snapshot
(->> (idx/entity-history snapshot picasso-id)
(keep :content-hash)))))))))

(t/deftest test-can-store-doc
(let [content-hash (c/new-id picasso)]
(t/is (= 48 (count picasso)))
Expand Down

0 comments on commit 4fadb5b

Please sign in to comment.