From bf2912f1ded7f9276545a09dcdcee00ef33b625c Mon Sep 17 00:00:00 2001 From: Dan Stone Date: Thu, 25 Aug 2022 17:22:00 +0100 Subject: [PATCH] Apply doc-count batching policy to transactions before pipelining Partition transactions according to number of referenced docs, so that many small transactions can benefit from batching without overwhelming existing fetch impls or exhausting memory/cpu resources. Relates to #1800 - may fix --- core/src/xtdb/tx.clj | 35 +++++++++++++++++++++++++---------- test/test/xtdb/tx_test.clj | 20 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/core/src/xtdb/tx.clj b/core/src/xtdb/tx.clj index 9c5d312de7..9482b49532 100644 --- a/core/src/xtdb/tx.clj +++ b/core/src/xtdb/tx.clj @@ -464,13 +464,16 @@ :else true))) -(defn ->tx-ingester {::sys/deps {:tx-indexer :xtdb/tx-indexer +(defn ->tx-ingester {::sys/args {:batch-preferred-doc-count + {:default 1024 + :spec ::sys/pos-int}} + ::sys/deps {:tx-indexer :xtdb/tx-indexer :index-store :xtdb/index-store :document-store :xtdb/document-store :tx-log :xtdb/tx-log :bus :xtdb/bus :secondary-indices :xtdb/secondary-indices}} - [{:keys [tx-log tx-indexer document-store bus index-store secondary-indices]}] + [{:keys [tx-log tx-indexer document-store bus index-store secondary-indices batch-preferred-doc-count]}] (log/info "Started tx-ingester") (let [!error (atom nil) @@ -489,7 +492,22 @@ (when (compare-and-set! !error nil t) (when-not (instance? 
InterruptedException t) (log/fatal t "Ingester error occurred")) - (bus/send bus {::xt/event-type ::ingester-error, :ingester-error t})))] + (bus/send bus {::xt/event-type ::ingester-error, :ingester-error t}))) + + ;; See https://github.com/xtdb/xtdb/pull/1808. Lazily partitions txs into {:batch [tx ...], :doc-hashes #{...}} maps: txs accumulate until the union of their doc-hashes reaches batch-preferred-doc-count, and the tx that reaches the limit is emitted as part of that batch (never dropped, never an empty batch); any remainder becomes the final batch. + (batch-transactions [txs] + (lazy-seq + (loop [batch [] + doc-hashes #{} + [tx :as txs] txs] + (if-not (seq txs) + (when (seq batch) [{:batch batch, :doc-hashes doc-hashes}]) + (let [new-batch (conj batch tx) + new-doc-hashes (txc/tx-events->doc-hashes (::txe/tx-events tx)) + unioned-doc-hashes (into doc-hashes new-doc-hashes)] + (if (<= (long batch-preferred-doc-count) (count unioned-doc-hashes)) + (cons {:batch new-batch, :doc-hashes unioned-doc-hashes} (batch-transactions (rest txs))) + (recur new-batch unioned-doc-hashes (rest txs))))))))] ;; catching all the secondary indices up to where XTDB is (when (and latest-xtdb-tx-id (seq secondary-indices)) @@ -553,12 +571,8 @@ (db/index-tx-docs in-flight-tx docs) (submit-job! txs-index-executor txs-index-fn (assoc m :tx tx :in-flight-tx in-flight-tx))))) - (txs-doc-fetch-fn [txs] - (let [docs (strict-fetch-docs document-store - (->> txs - (into #{} - (comp (map ::txe/tx-events) - (mapcat txc/tx-events->doc-hashes))))) + (txs-doc-fetch-fn [txs doc-hashes] + (let [docs (strict-fetch-docs document-store doc-hashes) txs (doall (for [tx txs] {:tx tx @@ -569,7 +583,8 @@ (when (Thread/interrupted) (throw (InterruptedException.))) - (submit-job! txs-docs-fetch-executor txs-doc-fetch-fn txs) + (doseq [{:keys [batch doc-hashes]} (batch-transactions txs)] + (submit-job! txs-docs-fetch-executor txs-doc-fetch-fn batch doc-hashes)) (catch Throwable t (set-ingester-error! 
t) diff --git a/test/test/xtdb/tx_test.clj b/test/test/xtdb/tx_test.clj index e50453697b..429a615fae 100644 --- a/test/test/xtdb/tx_test.clj +++ b/test/test/xtdb/tx_test.clj @@ -1519,3 +1519,23 @@ (t/is (= #{[:foo]} (xt/q db '{:find [e], :where [[e :xt/id]]}))))) + +;; https://github.com/xtdb/xtdb/pull/1808 (try to catch batching regression): submits transactions of mixed sizes (1-9 puts, then 1-99 puts, then ten sizes cycling through 42/100/42/1000/2000 puts), reopens a node on the same tx-log and document-store directories, syncs it, and asserts that the number of entities with ::n equals the total number of docs submitted. +(t/deftest mix-sized-transaction-test + (fix/with-tmp-dirs #{db-dir} + (let [ctr (atom 0) + cfg {:xtdb/tx-log {:kv-store {:xtdb/module 'xtdb.rocksdb/->kv-store + :db-dir (io/file db-dir "txs")}} + :xtdb/document-store {:kv-store {:xtdb/module 'xtdb.rocksdb/->kv-store + :db-dir (io/file db-dir "docs")}}}] + (with-open [node (xt/start-node cfg)] + (doseq [tx-doc-count (concat + (range 1 10) + (range 1 100) + (take 10 (cycle [42 100 42 1000 2000]))) + :let [puts (map (fn [n] [::xt/put {:xt/id n ::n n}]) (range @ctr (+ @ctr tx-doc-count)))]] + (xt/submit-tx node puts) + (swap! ctr + tx-doc-count))) + (with-open [node (xt/start-node cfg)] + (xt/sync node) + (t/is (= #{[@ctr]} (xt/q (xt/db node) '{:find [(count e)] :where [[e ::n]]})))))))