storage/mvlog: introduce batch collection to be used in new log impl #17358
Conversation
```cpp
}
}

TEST(BatchCollectorTest, TestDataTooHigh) {
```
🔥
Adds an error type to be used in the new multi-version log.

Adds an enum to be used along the read path.

Adds an abstraction that will be used on the read path to collect record batches. This abstraction encapsulates some of the behavior that exists in `storage::log_segment_batch_reader` and `storage::skipping_consumer`, in that its role is to determine whether it should collect a given record batch as part of a read, and then collect it. Some later changes will introduce a new entry abstraction that will wrap the entire record batch header and body to be fed into this collector. I considered implementing the existing `batch_consumer` interface and encapsulating some bits of the segment batch reader into a new reader class, but felt that `batch_consumer` overcomplicated the business logic of batch collection.
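As a rough sketch of the role described above (decide whether a batch should be collected, accumulate it, and signal when the buffer is full), assuming simplified stand-in types rather than the PR's actual `model::` and outcome definitions:

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Stand-ins for the PR's types; names and fields here are assumptions.
struct record_batch_header {
    long base_offset;
    size_t size_bytes;
};

enum class reader_outcome { success, buffer_full, stop };

// Collects batches in strictly increasing offset order and reports
// buffer_full once the accumulated bytes reach the target size.
class batch_collector {
public:
    explicit batch_collector(size_t target_max_buffer_size)
      : target_max_buffer_size_(target_max_buffer_size) {}

    reader_outcome add_batch(record_batch_header hdr) {
        if (hdr.base_offset <= last_offset_) {
            return reader_outcome::stop; // not past what we've seen: refuse
        }
        last_offset_ = hdr.base_offset;
        cur_buffer_size_ += hdr.size_bytes;
        headers_.push_back(hdr);
        return cur_buffer_size_ >= target_max_buffer_size_
                 ? reader_outcome::buffer_full
                 : reader_outcome::success;
    }

    // Hands the collected batches to the caller and resets the budget.
    std::vector<record_batch_header> release_batches() {
        cur_buffer_size_ = 0;
        return std::move(headers_);
    }

private:
    const size_t target_max_buffer_size_;
    size_t cur_buffer_size_{0};
    long last_offset_{-1};
    std::vector<record_batch_header> headers_;
};
```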
Force-pushed from 5bf4b43 to 8fb5985.
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/46929#018e81f1-ef35-4eca-a427-70e829dc9381
```
@@ -0,0 +1,13 @@
enable_clang_tidy()
```
⚡
```cmake
Seastar::seastar
v::base
v::bytes
v::storage
```
Something I did when I was working on io:: that I think was nice was to avoid all but the bare essential dependencies for as long as possible. Do we need v::storage?
We discussed this a bit offline. We'll only end up bringing a couple of classes out of storage; in the near term, that'll be some serialization utils and the `log_reader_config`. I'll consider either moving the generic stuff into its own module, or copying over just what I need to this module, though likely as a follow-up.
```cpp
}

private:
    static constexpr size_t default_max_buffer_size = 32_KiB;
```
surprised that this doesn't need to be defined up above where it's used. oh well, i guess if it compiles!
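For what it's worth, this compiles because a member function body (including a constructor's initializer list, when the constructor is defined in-class) is a "complete-class context": names declared anywhere in the class, even later, are visible there. A minimal standalone illustration with made-up names, not the PR's code:

```cpp
#include <cstddef>

class buffered_collector {
public:
    // The initializer list can name default_max_buffer_size even though
    // its declaration appears further down in the class body, because
    // in-class function definitions are parsed against the complete class.
    buffered_collector()
      : target_max_buffer_size_(default_max_buffer_size) {}

    size_t target() const { return target_max_buffer_size_; }

private:
    static constexpr size_t default_max_buffer_size = 32 * 1024;
    const size_t target_max_buffer_size_;
};
```

The same would not compile for two declarations at namespace scope, where use-before-declaration is an error.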
```cpp
const size_t target_max_buffer_size_;

// The last offset seen by this collector.
model::offset last_offset_;
```
what does the initial value of `offset::min` mean? that no offset has yet been seen, sort of like `std::optional<offset>`? i guess `min()` also works out for the comparison to the added batch offset without any extra checks.
Right exactly. This is a good call out -- it's probably worth switching over to `optional<>` rather than sentinel values.
with some planning, it is often really nice to do something like have the constructor take the first batch and then there is no special initial state.
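The `optional<>` suggestion might look roughly like this sketch, where `offset` stands in for `model::offset` and the class name is illustrative:

```cpp
#include <optional>

using offset = long; // stand-in for model::offset

class offset_tracker {
public:
    // Returns false unless the batch is strictly after the last one seen.
    bool accept(offset base) {
        if (last_offset_.has_value() && base <= *last_offset_) {
            return false;
        }
        last_offset_ = base;
        return true;
    }

private:
    // nullopt unambiguously means "no offset seen yet", so offsets at or
    // below a would-be min() sentinel need no special casing.
    std::optional<offset> last_offset_;
};
```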
```cpp
// TODO: add ghost batch building here.

cur_buffer_size_ += batch_hdr.size_bytes;
batch_hdr.ctx.term = cur_term_;
```
got it so the term isn't stored in the batches on disk, we're going to mix that in as we go along from some other source?
Right, at least in my local branch, the term is going to be stored as part of the entry body envelope for record batches. For now I'm just reusing the disk serialization for record batches that we have today in the segment appender, rather than using the envelope, since we already have an envelope to wrap the record batch guts.
```cpp
cur_buffer_size_ += batch_hdr.size_bytes;
batch_hdr.ctx.term = cur_term_;
batches_.emplace_back(model::record_batch(
```
doesn't emplace_back let you pass in the ctor parameters directly instead of using the move constructor of record_batch?
Oops! Yes. Will follow up
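The review point generalizes: `emplace_back` forwards its arguments to the element's constructor, so the temporary-plus-move can be skipped. A standalone illustration with a stand-in `batch` type (not the PR's `model::record_batch`):

```cpp
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for a batch type with a non-trivial constructor.
struct batch {
    batch(int term, std::string data)
      : term(term), data(std::move(data)) {}
    int term;
    std::string data;
};

std::vector<batch> make_batches() {
    std::vector<batch> batches;

    // Constructs a temporary batch, then move-constructs it into place:
    batches.push_back(batch(1, "a"));

    // Forwards the arguments and constructs the element in place:
    batches.emplace_back(2, "b");

    return batches;
}
```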
```cpp
result<reader_outcome, errc> set_term(model::term_id new_term) noexcept;

// Releases the batches to the caller.
ss::circular_buffer<model::record_batch> release_batches() noexcept {
```
did you consider simplifying the interface by making batch_collector non-reusable? then release_batches could effectively be r-value qualified, and the term would be passed into the constructor and avoid resetting cur_buffer_size?
I did, but I ended up preferring the conceptual simplicity of one batch collector per log reader. Because a log reader may span multiple segments, and because each segment will need to add to a collector, I found it easier to reason about lifecycle of the collector by tying its lifetime to the log reader.
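For context, the non-reusable variant suggested above might look something like this sketch, with the term fixed at construction and `release_batches()` r-value qualified so the collector is consumed when drained. All types here are illustrative stand-ins, not the PR's:

```cpp
#include <utility>
#include <vector>

class one_shot_collector {
public:
    // Term is supplied once, up front, so there is no set_term/reset cycle.
    explicit one_shot_collector(int term) : term_(term) {}

    int term() const { return term_; }

    void add(int batch_size_bytes) { sizes_.push_back(batch_size_bytes); }

    // R-value qualified: callable only as std::move(c).release_batches(),
    // which makes "this collector is done" explicit at the call site.
    std::vector<int> release_batches() && { return std::move(sizes_); }

private:
    int term_;
    std::vector<int> sizes_;
};
```

The trade-off the author describes is that a reusable collector keeps a one-to-one lifetime with the log reader even as the reader crosses segment boundaries.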
I don't have the full read path working end-to-end yet, but these bits haven't been changing much so I figured I'd push it to get some early feedback.
This PR introduces some foundational types, and a building block for the read path to be used in an upcoming implementation of a storage log that handles concurrency with MVCC (hence the added namespace `mvlog`). This log's basic unit of data will be an "entry", which may be a record batch or another kind (e.g. term marker, truncation marker, etc.). Ultimately, though, this log will need to be able to implement `model::record_batch_reader::impl`, and to that end, this PR introduces a `batch_collector` abstraction that will be used as a building block for the reader.

The batch collector takes inspiration from the existing parser and consumer implementations in the storage layer, but focuses exclusively on logical checks and invariants of the data. The idea is that this collector will be owned by the reader implementation to collect record batches across multiple segments, with the collector indicating lifecycle signals, like being done or being full, to the higher-level reader.
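The ownership described above (one collector per reader, fed from successive segments until it signals full) can be sketched with toy types; names and the outcome enum are assumptions, not the PR's actual interface:

```cpp
#include <cstddef>
#include <vector>

// Stand-in for the PR's outcome signal.
enum class reader_outcome { success, buffer_full };

// A toy collector: accumulates batch sizes until a byte budget is spent.
struct toy_collector {
    size_t budget;
    std::vector<int> collected;

    reader_outcome add(int batch_size) {
        collected.push_back(batch_size);
        budget = budget > static_cast<size_t>(batch_size)
                   ? budget - batch_size
                   : 0;
        return budget == 0 ? reader_outcome::buffer_full
                           : reader_outcome::success;
    }
};

// The reader owns one collector for its whole lifetime and feeds it
// batches from each segment in turn, stopping on the "full" signal.
std::vector<int>
read_across_segments(const std::vector<std::vector<int>>& segments,
                     size_t max_bytes) {
    toy_collector c{max_bytes, {}};
    for (const auto& segment : segments) {
        for (int batch : segment) {
            if (c.add(batch) == reader_outcome::buffer_full) {
                return c.collected;
            }
        }
    }
    return c.collected;
}
```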
Backports Required
Release Notes