perf: optimize reading transactions in commit loop#3117
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3117 +/- ##
==========================================
- Coverage 77.92% 77.87% -0.06%
==========================================
Files 242 242
Lines 81738 82015 +277
Branches 81738 82015 +277
==========================================
+ Hits 63695 63869 +174
- Misses 14861 14940 +79
- Partials 3182 3206 +24
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
9b88d10 to
063cb88
Compare
063cb88 to
5937da9
Compare
westonpace
left a comment
There was a problem hiding this comment.
Just minor thoughts (though it looks like CI is failing)
|
|
||
| pub fn size(&self) -> usize { | ||
| if let Some(cache) = self.cache.as_ref() { | ||
| cache.sync(); |
There was a problem hiding this comment.
Is this mainly for testing purposes? Or are there non-testing reasons we need this to be accurate? I don't think it's a problem as we shouldn't be calling size in a loop so seems fine, just curious.
| // If read_version is zero, then it might not have originally been | ||
| // passed. We can assume the latest version. | ||
| if transaction.read_version > 0 { | ||
| builder = builder.with_version(transaction.read_version) |
There was a problem hiding this comment.
Do we not need to pass in the read_version anymore?
There was a problem hiding this comment.
I think, we only need to load the read_version if it's a detached commit. I've fixed this in the latest commits.
| .and_then(|(version, other_transaction)| { | ||
| let res = check_transaction( | ||
| transaction, | ||
| version, | ||
| Some(other_transaction.as_ref()), | ||
| ); | ||
| futures::future::ready(res) | ||
| }) | ||
| .try_all(|_| futures::future::ready(true)) |
There was a problem hiding this comment.
Can you make the and_then a try_for_each? Then you don't need the try_all?
| #[derive(Debug)] | ||
| struct IoTrackingMultipartUpload { | ||
| target: Box<dyn MultipartUpload>, | ||
| stats: Arc<Mutex<IoStats>>, | ||
| } | ||
|
|
||
| #[async_trait::async_trait] | ||
| impl MultipartUpload for IoTrackingMultipartUpload { | ||
| async fn abort(&mut self) -> OSResult<()> { | ||
| self.target.abort().await | ||
| } | ||
|
|
||
| async fn complete(&mut self) -> OSResult<PutResult> { | ||
| self.target.complete().await | ||
| } | ||
|
|
||
| fn put_part(&mut self, payload: PutPayload) -> UploadPart { | ||
| { | ||
| let mut stats = self.stats.lock().unwrap(); | ||
| stats.write_iops += 1; | ||
| stats.write_bytes += payload.content_length() as u64; | ||
| } | ||
| self.target.put_part(payload) | ||
| } | ||
| } |
Closes #3057