Skip to content

Commit

Permalink
Apply doc-count batching policy to transactions before pipelining
Browse files Browse the repository at this point in the history
Partition transactions according to the number of docs they reference, so that many small transactions can benefit from batching without overwhelming existing fetch implementations or exhausting memory/CPU resources.

Relates to #1800 - may fix
  • Loading branch information
wotbrew committed Aug 26, 2022
1 parent ed53f5b commit bf2912f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
35 changes: 25 additions & 10 deletions core/src/xtdb/tx.clj
Expand Up @@ -464,13 +464,16 @@
:else
true)))

(defn ->tx-ingester {::sys/deps {:tx-indexer :xtdb/tx-indexer
(defn ->tx-ingester {::sys/args {:batch-preferred-doc-count
{:default 1024
:spec ::sys/pos-int}}
::sys/deps {:tx-indexer :xtdb/tx-indexer
:index-store :xtdb/index-store
:document-store :xtdb/document-store
:tx-log :xtdb/tx-log
:bus :xtdb/bus
:secondary-indices :xtdb/secondary-indices}}
[{:keys [tx-log tx-indexer document-store bus index-store secondary-indices]}]
[{:keys [tx-log tx-indexer document-store bus index-store secondary-indices batch-preferred-doc-count]}]
(log/info "Started tx-ingester")

(let [!error (atom nil)
Expand All @@ -489,7 +492,22 @@
(when (compare-and-set! !error nil t)
(when-not (instance? InterruptedException t)
(log/fatal t "Ingester error occurred"))
(bus/send bus {::xt/event-type ::ingester-error, :ingester-error t})))]
(bus/send bus {::xt/event-type ::ingester-error, :ingester-error t})))

;; See https://github.com/xtdb/xtdb/pull/1808
(batch-transactions [txs]
  ;; Lazily partitions `txs` into a seq of {:batch [tx ...], :doc-hashes #{...}} maps.
  ;; Transactions are accumulated in order until the set of distinct doc hashes they
  ;; reference reaches `batch-preferred-doc-count`. The transaction that reaches the
  ;; threshold is included in the batch it closes, so every tx lands in exactly one
  ;; batch and no emitted batch is empty. The count is a preference, not a hard cap:
  ;; a batch may exceed it by the docs of its final transaction.
  (lazy-seq
    (loop [batch []
           doc-hashes #{}
           [tx :as txs] txs]
      (if-not (seq txs)
        ;; input exhausted: flush any pending txs, or end the seq when nothing is pending
        (when (seq batch) [{:batch batch, :doc-hashes doc-hashes}])
        (let [new-batch (conj batch tx)
              new-doc-hashes (txc/tx-events->doc-hashes (::txe/tx-events tx))
              unioned-doc-hashes (into doc-hashes new-doc-hashes)]
          (if (<= (long batch-preferred-doc-count) (count unioned-doc-hashes))
            ;; threshold reached: emit the batch *including* `tx`. Emitting the old
            ;; `batch` here while continuing from `(rest txs)` would silently drop `tx`
            ;; (and yield an empty batch when `tx` alone meets the threshold).
            (cons {:batch new-batch, :doc-hashes unioned-doc-hashes}
                  (batch-transactions (rest txs)))
            (recur new-batch unioned-doc-hashes (rest txs))))))))]

;; catching all the secondary indices up to where XTDB is
(when (and latest-xtdb-tx-id (seq secondary-indices))
Expand Down Expand Up @@ -553,12 +571,8 @@
(db/index-tx-docs in-flight-tx docs)
(submit-job! txs-index-executor txs-index-fn (assoc m :tx tx :in-flight-tx in-flight-tx)))))

(txs-doc-fetch-fn [txs]
(let [docs (strict-fetch-docs document-store
(->> txs
(into #{}
(comp (map ::txe/tx-events)
(mapcat txc/tx-events->doc-hashes)))))
(txs-doc-fetch-fn [txs doc-hashes]
(let [docs (strict-fetch-docs document-store doc-hashes)
txs (doall
(for [tx txs]
{:tx tx
Expand All @@ -569,7 +583,8 @@
(when (Thread/interrupted)
(throw (InterruptedException.)))

(submit-job! txs-docs-fetch-executor txs-doc-fetch-fn txs)
(doseq [{:keys [batch doc-hashes]} (batch-transactions txs)]
(submit-job! txs-docs-fetch-executor txs-doc-fetch-fn batch doc-hashes))

(catch Throwable t
(set-ingester-error! t)
Expand Down
20 changes: 20 additions & 0 deletions test/test/xtdb/tx_test.clj
Expand Up @@ -1519,3 +1519,23 @@

(t/is (= #{[:foo]}
(xt/q db '{:find [e], :where [[e :xt/id]]})))))

;; https://github.com/xtdb/xtdb/pull/1808 (try to catch batching regression)
(t/deftest mix-sized-transaction-test
  ;; Submits a long run of transactions whose sizes vary from 1 to 2000 puts, closes
  ;; the node, then opens a second node over the same tx-log and document-store
  ;; directories and checks that every submitted document is visible to a query.
  (fix/with-tmp-dirs #{db-dir}
    (let [rocks-store (fn [sub-dir]
                        {:kv-store {:xtdb/module 'xtdb.rocksdb/->kv-store
                                    :db-dir (io/file db-dir sub-dir)}})
          node-opts {:xtdb/tx-log (rocks-store "txs")
                     :xtdb/document-store (rocks-store "docs")}
          ;; many tiny txs, then a ramp, then a mix with a few very large ones
          tx-sizes (concat (range 1 10)
                           (range 1 100)
                           (take 10 (cycle [42 100 42 1000 2000])))
          ;; running total of docs submitted; doubles as the next unused :xt/id
          !total (atom 0)]
      (with-open [node (xt/start-node node-opts)]
        (doseq [size tx-sizes]
          (let [first-id @!total
                puts (map (fn [n] [::xt/put {:xt/id n ::n n}])
                          (range first-id (+ first-id size)))]
            (xt/submit-tx node puts)
            (swap! !total + size))))
      ;; NOTE(review): no index store is configured, so the second node presumably
      ;; has to index the whole log again on startup — confirm.
      (with-open [node (xt/start-node node-opts)]
        (xt/sync node)
        (t/is (= #{[@!total]}
                 (xt/q (xt/db node) '{:find [(count e)] :where [[e ::n]]})))))))

0 comments on commit bf2912f

Please sign in to comment.