feat(storage): sync shared buffer based on threshold #1671
Conversation
I will append the e2e benchmark results later.
src/compute/src/server.rs
Outdated
    storage_config.as_ref(),
    registry.clone(),
));
log::info!("State store shared buffer threshold {} MB", storage_config.shared_buffer_threshold_mb);
We already printed all configs at L74, so I think there is no need to print `shared_buffer_threshold_mb` here.
pub shared_buffer_cur_size: atomic::AtomicU64,
pub shared_buffer_threshold_size: u64,
These two fields are used solely by `SharedBufferFlusher` and not exposed. Should we track `shared_buffer_cur_size` in `SharedBufferManager` instead?
+1.
Will fix
src/storage/src/hummock/mod.rs
Outdated
while self.stats.shared_buffer_threshold_size <= shared_buff_cur_size {
    yield_now().await;
    shared_buff_cur_size = self.stats.shared_buffer_cur_size.load(atomic::Ordering::SeqCst);
}
Passively waiting in a while loop for the shared buffer to free up (which can take up to the SharedBufferFlusher check interval of 10ms) is less optimal than proactively triggering a sync immediately. Also, I think we should have two thresholds for shared buffer size:
- `size_low_watermark`: start syncing to S3 when `shared_buffer_cur_size` > `size_low_watermark`
- `size_high_watermark`: halt `ingest_batch` when `shared_buffer_cur_size` > `size_high_watermark`

Moreover, `ingest_batch` should trigger a sync when there is no ongoing sync happening at the moment.
Very helpful! You are right. Using low and high watermarks can achieve a smoother strategy.
It's weird. The Grafana diagram shows that executors can still ingest batches even after reaching the shared buffer threshold. If I use an older version as the baseline (ae9222d, the version where I first implemented the code), the sync throughput is higher than the ingest throughput. But the compute node still gets killed due to OOM in both cases (MinIO as storage).
src/storage/src/hummock/mod.rs
Outdated
    .load(atomic::Ordering::SeqCst);
// yield current task if threshold has been reached
while self.stats.shared_buffer_threshold_size <= shared_buff_cur_size {
    yield_now().await;
This doesn't seem to have any guarantee about scheduling. In the worst case, this will become a busy spin-loop. Suggest using a `watch` notifier here.
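For reference, a minimal sketch of how a `tokio::sync::watch` channel could replace the spin loop, assuming the flusher publishes the current buffer size after each flush (the function and parameter names are illustrative):

```rust
use tokio::sync::watch;

/// Writer side: suspend until the shared buffer size published by the
/// flusher drops below `threshold`, instead of spinning with `yield_now()`.
async fn wait_below_threshold(
    rx: &mut watch::Receiver<u64>,
    threshold: u64,
) -> Result<(), watch::error::RecvError> {
    while *rx.borrow() >= threshold {
        // Parks this task until the flusher sends a new value, so wake-ups
        // piggyback on actual flushes rather than a polling interval.
        rx.changed().await?;
    }
    Ok(())
}
```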
pub mod shared_buffer_batch;
pub mod shared_buffer_flusher;
pub mod shared_buffer_manager;
pub mod shared_buffer_uploader;
Just to mention, the `shared_buffer_` prefix here seems to be redundant. 🤣
    shared_buff_prev_size,
    sync_size
);
assert!(shared_buff_prev_size >= sync_size);
No need to assert this, since subtraction overflow will already cause a panic at L122.
pub shared_buffer_cur_size: atomic::AtomicU64,
pub shared_buffer_threshold_size: u64,
+1.
src/storage/src/store.rs
Outdated
@@ -25,14 +25,15 @@ use crate::write_batch::WriteBatch;
pub trait GetFutureTrait<'a> = Future<Output = Result<Option<Bytes>>> + Send;
pub trait ScanFutureTrait<'a, R, B> = Future<Output = Result<Vec<(Bytes, Bytes)>>> + Send;
pub trait EmptyFutureTrait<'a> = Future<Output = Result<()>> + Send;
pub trait IngestBatchFutureTrait<'a> = Future<Output = Result<u64>> + Send;
Consider updating the doc for `ingest_batch` as well.
Once the buffer drops back below the threshold, all waiting writers seem to get a chance to write their chunks at once, so this may also lead to OOM. A simple design might be to use a semaphore whose initial permits reflect the size threshold. When a writer needs to ingest a batch, it can try acquiring permits according to the size of the batch. 🤣
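A rough sketch of that semaphore idea with `tokio::sync::Semaphore` (illustrative only: permit counts are `u32`, so a real version would need to map byte sizes to coarser permit units, and the type names here are made up):

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Hypothetical quota: permits represent shared buffer capacity, e.g. in KB.
struct SharedBufferQuota {
    capacity: Arc<Semaphore>,
}

impl SharedBufferQuota {
    fn new(threshold_kb: u32) -> Self {
        Self {
            capacity: Arc::new(Semaphore::new(threshold_kb as usize)),
        }
    }

    /// Wait (asynchronously) until this batch's share of the buffer is free.
    /// A writer only proceeds once its whole batch fits, which avoids the
    /// "all waiters rush in at once" overshoot described above.
    async fn acquire_for_batch(&self, batch_kb: u32) -> OwnedSemaphorePermit {
        self.capacity
            .clone()
            .acquire_many_owned(batch_kb)
            .await
            .expect("semaphore closed")
    }
    // Dropping the permit after the batch is synced to S3 frees the space.
}
```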
Force-pushed from c5ce0ba to 606899e.
Codecov Report
@@ Coverage Diff @@
## main #1671 +/- ##
==========================================
+ Coverage 71.33% 71.43% +0.10%
==========================================
Files 605 606 +1
Lines 79168 79481 +313
==========================================
+ Hits 56474 56778 +304
- Misses 22694 22703 +9
LGTM. Great work! Thanks for the PR.
Can we have more complicated test setups, like TPC-H with joins? I'd like to see whether barriers can successfully pass through the system compared with the status before this PR.
let mut res = Ok(());
let sync_res = rx.await.unwrap();
Why not `let sync_size = rx.await.unwrap()?;` here?
    .stats
    .shared_buffer_cur_size
    .fetch_sub(sync_size, Ordering::SeqCst);
assert!(shared_buff_prev_size >= sync_size);
No need for this assertion.
));
}
} else {
    self.ongoing_flush.init();
What if there are two coroutines running here at the same time? It seems this will panic on the assertion. The `subscribe` and `init` should be executed atomically.
That's a problem.
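One way to make the check atomic, as a hedged sketch (the `FlushState` type and `init_or_subscribe` method are hypothetical, not this PR's `ongoing_flush`), is to guard the flush state with a single lock so that "check + init" and "check + subscribe" cannot interleave:

```rust
use std::sync::Mutex;
use tokio::sync::watch;

/// Hypothetical flush state: `None` means no flush is running; `Some(rx)`
/// holds a receiver that resolves when the ongoing flush completes.
struct FlushState {
    inner: Mutex<Option<watch::Receiver<bool>>>,
}

impl FlushState {
    /// Atomically either become the flusher (Ok) or subscribe to the
    /// ongoing flush (Err). Both paths run under the same lock, so two
    /// coroutines can never both initialize a flush.
    fn init_or_subscribe(&self) -> Result<watch::Sender<bool>, watch::Receiver<bool>> {
        let mut guard = self.inner.lock().unwrap();
        if let Some(rx) = guard.as_ref() {
            return Err(rx.clone());
        }
        let (tx, rx) = watch::channel(false);
        *guard = Some(rx);
        // The flusher is expected to send `true` and reset the state back
        // to `None` when the flush finishes.
        Ok(tx)
    }
}
```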
// Wait for notification
rx.changed().await.unwrap();
if !(*rx.borrow()) {
    res = Err(HummockError::shared_buffer_error(
Directly return here.
HummockValue::Put(_, val) => VALUE_META_SIZE + val.len(),
HummockValue::Delete(_) => VALUE_META_SIZE,
What if the inner implementation of `HummockValue` changes in the future? IIRC, there is an `encoded_len` function.
`encoded_len` seems to be the length in the SST file, but we need the size of the kv pair in memory. Maybe there is a `size_of::<HummockValue>()` in Rust?
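For what it's worth, `std::mem::size_of` only measures the fixed (stack) part of a value, so a heap-aware estimate would need to add the payload length, roughly like this (a sketch with a simplified stand-in enum, not the real `HummockValue`):

```rust
use bytes::Bytes;

// Simplified stand-in; the real HummockValue carries extra metadata.
enum HummockValue {
    Put(Bytes),
    Delete,
}

impl HummockValue {
    /// Approximate in-memory footprint: the enum's own size plus the heap
    /// bytes behind a Put payload, which size_of alone would miss.
    fn mem_size(&self) -> usize {
        std::mem::size_of::<HummockValue>()
            + match self {
                HummockValue::Put(val) => val.len(),
                HummockValue::Delete => 0,
            }
    }
}
```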
pub async fn write_batch(
    &self,
    batch: Vec<SharedBufferItem>,
    epoch: HummockEpoch,
) -> HummockResult<u64> {
    let batch = SharedBufferBatch::new(batch, epoch);
    let size = batch.size;

    self.allocate_space(size).await?;

    self.shared_buffer
        .write()
        .entry(epoch)
        .or_insert(BTreeMap::new())
        .insert(batch.end_user_key().to_vec(), batch.clone());
    self.uploader_tx
        .send(SharedBufferUploaderItem::Batch(batch))
        .map_err(HummockError::shared_buffer_error)?;
    Ok(size)
}
Consider that there are 3 writers entering L181. Writer 1 exceeded the threshold and triggered a flush, writer 2 advanced the current size by some amount, then writer 3 exceeded the threshold and triggered a flush again. But none of the 3 writers had stepped to L183 at that point. It seems that in this case the shared buffer will be flushed twice (the second time flushing an empty buffer), and the shared buffer can still OOM.
For the multiple-flush concern: in the `flush()` method, only the first writer will send the flush request; the other writers will see there is an ongoing flush and subscribe to the flush notification.
As for the scenario in your case, I assume all three writers are on the same compute node. Then writer 2 cannot advance the size until writer 1 finishes the flush.
But after writer 1 finishes flushing and before it writes to the shared buffer, there is still a chance that writers 2 and 3 can call flush again.
Sure, but tpch-bench cannot run against RisingWave now. The Java frontend complains "could not find source catalog for x". It seems that Kafka source support is broken at the moment (maybe under refactoring).
The Rust frontend should have supported it. You may join #proj-connector for more information.
What's changed and what's your intention?
Add a threshold size to the Hummock shared buffer and sync batches to S3 if there is not enough space for incoming batches:
- Add `shared_buffer_cur_size` and `shared_buffer_threshold_size` to `SharedBufferManager`
- `write_batch()` will first allocate space for the incoming batch
- Call `sync(None)` to flush the shared buffer

Checklist
Refer to a related PR or issue link (optional)
close #990