Initial local loglet implementation #1302
I'll investigate the e2e tests.
@tillrohrmann the e2e tests were failing on timeout. I've reduced the default batching latency and it's now ready for your review.
Great work @AhmedSoliman. Can't wait to see the local loglet in action :-) My only relevant comment is about the place where we decide about the offset index. It looks as if it could lead in very rare cases to missing a record when reading.
// Most of the changes are highly temporal, we try to delay flushing
// As much as we can to increase the chances to observe a deletion.
Does this comment really apply to the log workload?
No, copy pasta :) will remove.
pub struct Options {
    pub rocksdb_threads: usize,
    pub rocksdb_disable_statistics: bool,
    pub rocksdb_disable_wal: bool,
Do we want to allow disabling the WAL for the local loglet? This might be a lurking footgun.
I wanted to leave the option available, but what matters more is that LogStore supports it (I'm planning to reuse parts of it later).
👍
pub fn new(storage_path: &Path, raw_options: &serde_json::Value) -> Arc<Self> {
    let opts =
        serde_json::from_value(raw_options.clone()).expect("to be able to deserialize options");
    // todo: implement loglet loading error handling
Once we have error handling for loglet providers, could we move the log_writer creation to the `new` function? Then we wouldn't need the `OnceLock`.
I think the answer would be no, since we need to start the log writer to obtain the handle which we set in the `OnceLock`.
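The ordering constraint discussed here can be sketched as follows. This is a minimal illustration using only the standard library, not the PR's code: `Store`, `WriterHandle`, `start`, and `enqueue` are hypothetical names, and a plain `std::sync::mpsc` channel stands in for whatever the real log writer uses. The point it shows is that the owning struct exists before the writer is started, so the handle can only be published afterwards, which is what `OnceLock` allows.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::OnceLock;
use std::thread;

/// Hypothetical handle to a background writer (not the PR's actual type).
struct WriterHandle {
    tx: Sender<String>,
}

/// Hypothetical store that is constructed before its writer is started.
struct Store {
    writer: OnceLock<WriterHandle>,
}

impl Store {
    fn new() -> Self {
        // No writer yet: the handle only exists once the writer has been started.
        Self { writer: OnceLock::new() }
    }

    /// Starts the background writer and publishes its handle exactly once.
    /// Returns `None` if a writer was already started.
    fn start(&self) -> Option<thread::JoinHandle<usize>> {
        let (tx, rx) = channel::<String>();
        // `set` fails if a handle is already stored; nothing is spawned in that case.
        if self.writer.set(WriterHandle { tx }).is_err() {
            return None;
        }
        // The writer drains the channel and reports how many records it saw.
        Some(thread::spawn(move || rx.into_iter().count()))
    }

    /// Enqueues a record; returns false if the writer has not been started yet.
    fn enqueue(&self, record: &str) -> bool {
        match self.writer.get() {
            Some(handle) => handle.tx.send(record.to_owned()).is_ok(),
            None => false,
        }
    }
}
```

In this sketch, calling `enqueue` before `start` simply returns `false`, and a second `start` is a no-op. Whether the real provider could create the writer inside `new` depends on details of its startup that are not shown in this conversation.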
pub fn shutdown(&self) {
    let start = Instant::now();
    if let Err(e) = self.db.flush_wal(true) {
        warn!("Failed to flush local loglet rocksdb WAL: {}", e);
    }
    self.db.cancel_all_background_work(true);
    debug!(
        "Local loglet clean rocksdb shutdown took {:?}",
        start.elapsed(),
    );
}
This is nice :-) Should we do something similar for the graceful shutdown of the PP state storage?
Good call. I think potentially yes. I'm being extra defensive for the logstore part, not sure if this will bring immediate value to PP state storage at the moment though.
    receive: watch::Receiver<LogletOffset>,
}

impl OffsetWatch {
Nice utility 🤩
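For readers unfamiliar with the pattern, here is a rough std-only analogue of an offset watch. It is an assumption-laden sketch, not the PR's `OffsetWatch`: the real type wraps a `watch::Receiver<LogletOffset>`, whereas this version swaps in a `Mutex` plus `Condvar`, uses `u64` for offsets, and the names `OffsetWatchSketch`, `notify`, `wait_for`, and `demo_wait` are hypothetical. It only illustrates the general idea of a reader blocking until a published offset reaches a target.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical std-only analogue of an offset watch.
struct OffsetWatchSketch {
    latest: Mutex<u64>,
    changed: Condvar,
}

impl OffsetWatchSketch {
    fn new(initial: u64) -> Self {
        Self { latest: Mutex::new(initial), changed: Condvar::new() }
    }

    /// Publishes a new offset. Older values are ignored so the watch never goes backwards.
    fn notify(&self, offset: u64) {
        let mut latest = self.latest.lock().unwrap();
        if offset > *latest {
            *latest = offset;
            self.changed.notify_all();
        }
    }

    /// Blocks until the published offset is at least `target`, then returns it.
    fn wait_for(&self, target: u64) -> u64 {
        let guard = self.latest.lock().unwrap();
        let guard = self
            .changed
            .wait_while(guard, |latest| *latest < target)
            .unwrap();
        *guard
    }
}

/// Spawns a reader waiting for `target` while this thread publishes offsets 1..=target.
fn demo_wait(target: u64) -> u64 {
    let watch = Arc::new(OffsetWatchSketch::new(0));
    let reader = {
        let watch = Arc::clone(&watch);
        thread::spawn(move || watch.wait_for(target))
    };
    for offset in 1..=target {
        watch.notify(offset);
    }
    reader.join().unwrap()
}
```

The check under the lock in `wait_for` means a reader that arrives after the offset was already published returns immediately instead of missing the notification.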
/// SmallVec is used to avoid heap allocation for the common case of small
/// number of updates.
updates: SmallVec<[LogStateUpdate; 1]>,
Nice :-)
ad09fb9 to fa11705
Thanks for updating this PR @AhmedSoliman. Making the offset generation and the enqueuing into the request channel atomic is a good temporary solution for the problem described before. +1 for merging this PR :-)
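The idea of making offset generation and enqueuing atomic can be sketched like this. It is a minimal illustration under stated assumptions rather than the PR's implementation: `Appender`, `append`, and `enqueue_concurrently` are hypothetical names, offsets are plain `u64` values starting at 1, and a `std::sync::mpsc` channel stands in for the request channel. Holding one lock across both steps guarantees the writer receives records in offset order, so no offset can be published ahead of a lower one that is still waiting to be enqueued.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the loglet's append path (not the PR's actual type).
struct Appender {
    // The next offset and the sender sit behind the same lock, so no other
    // thread can enqueue between "pick offset" and "send".
    inner: Mutex<(u64, Sender<(u64, String)>)>,
}

impl Appender {
    fn new(tx: Sender<(u64, String)>) -> Self {
        Self { inner: Mutex::new((1, tx)) }
    }

    /// Assigns the next offset and enqueues the record as one atomic step.
    fn append(&self, payload: String) -> u64 {
        let mut guard = self.inner.lock().unwrap();
        let offset = guard.0;
        guard.0 += 1;
        guard.1.send((offset, payload)).expect("writer is running");
        offset
    }
}

/// Runs several concurrent appenders and returns the offsets in the order
/// the writer side of the channel observes them.
fn enqueue_concurrently(threads: usize, per_thread: usize) -> Vec<u64> {
    let (tx, rx) = channel();
    let appender = Arc::new(Appender::new(tx));
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let appender = Arc::clone(&appender);
            thread::spawn(move || {
                for i in 0..per_thread {
                    appender.append(format!("t{t}-r{i}"));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Dropping the last reference drops the sender, which ends the iteration below.
    drop(appender);
    rx.into_iter().map(|(offset, _)| offset).collect()
}
```

If the offset were taken from an atomic counter and the send happened outside the lock, two threads could enqueue in the opposite order of their offsets, which is the kind of gap the review comment was concerned about.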
// receiver.await.unwrap_or_else(|_| {
//     warn!("Unsure if the local loglet record was written, the ack channel was dropped");
//     Err(Error::Shutdown(ShutdownError))
// })
This can probably be removed.
pub struct Options {
    pub rocksdb_threads: usize,
    pub rocksdb_disable_statistics: bool,
    pub rocksdb_disable_wal: bool,
👍
First take on implementing a local loglet on rocksdb. This also sets it as the default. Along with the previous PR in the stack, single node durability is restored.
🥳