Faster snapshot creation #1353
Conversation
```rust
// extra care is needed to capture outstanding deleted points
let deleted_points_guard = self.deleted_points.read();
let wrapped_segment_arc = self.wrapped_segment.get();
let wrapped_segment_guard = wrapped_segment_arc.read();

// stable copy of the deleted points at the time of the snapshot
let deleted_points_copy = deleted_points_guard.clone();

// parse wrapped segment id from its data path
let wrapped_segment_id = wrapped_segment_guard
    .data_path()
    .file_stem()
    .unwrap()
    .to_str()
    .unwrap()
    .to_string();

// create temporary dir for operations on wrapped segment data
let tmp_dir_path = snapshot_dir_path.join(format!("segment_copy_{}", Uuid::new_v4()));
create_dir_all(&tmp_dir_path)?;

// snapshot wrapped segment data into the temporary dir
let archive_path = wrapped_segment_guard.take_snapshot(&tmp_dir_path)?;

// restore a copy of the wrapped segment data into the temporary dir
Segment::restore_snapshot(&archive_path, &wrapped_segment_id)?;

let archive_path = {
    let wrapped_segment_arc = self.wrapped_segment.get();
    let wrapped_segment_guard = wrapped_segment_arc.read();

    // build a path to a copy of the wrapped segment data
    let full_copy_path = tmp_dir_path.join(&wrapped_segment_id);

    // snapshot wrapped segment data into the temporary dir
    wrapped_segment_guard.take_snapshot(snapshot_dir_path)?
};

// snapshot write_segment
let write_segment_rw = self.write_segment.get();
let write_segment_guard = write_segment_rw.read();

// Write segment is not unique to the proxy segment, therefore it might overwrite an existing snapshot.
write_segment_guard.take_snapshot(snapshot_dir_path)?;
// guaranteed to be higher than anything in wrapped segment and does not exceed WAL at the same time
let write_segment_version = write_segment_guard.version();

// unlock deleted_points as we have a stable copy
drop(wrapped_segment_guard);
drop(deleted_points_guard);

// load copy of wrapped segment in memory
let mut in_memory_wrapped_segment = load_segment(&full_copy_path)?.ok_or_else(|| {
    OperationError::service_error(format!(
        "Failed to load segment from {:?}",
        full_copy_path
    ))
})?;

// remove potentially deleted points from wrapped_segment
for deleted_point in deleted_points_copy {
    in_memory_wrapped_segment.delete_point(write_segment_version, deleted_point)?;
}

let archive_path = in_memory_wrapped_segment.take_snapshot(snapshot_dir_path)?;

// release segment resources
drop(in_memory_wrapped_segment);

// delete temporary copy
remove_dir_all(tmp_dir_path)?;
```
We just take the snapshot by means of the existing function.
```rust
{
    // Do not change segments while snapshotting
    let segments_read = self.segments.read();
    segments_read.snapshot_all_segments(&snapshot_segments_shard_path)?;

    // snapshot all shard's WAL
    self.snapshot_wal(snapshot_shard_path)?;
}
```
Hold the segments read lock to ensure that the segments list is not modified during the snapshot (so a concurrent optimization does not interfere).
```rust
// snapshot all shard's WAL
self.snapshot_wal(snapshot_shard_path)?;
```
Maybe this whole block should run within tokio::task::spawn_blocking, as it performs a lot of disk operations on a regular tokio thread.
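For illustration, a minimal sketch of that suggestion, reusing the names from the excerpt above; it assumes self.segments is behind an Arc (so it can move into the closure) and that snapshot_all_segments returns a Result<(), OperationError> — the actual wiring in the PR may differ:

```rust
let segments = self.segments.clone();
let path = snapshot_segments_shard_path.clone();

// Run the disk-heavy snapshot work on tokio's blocking thread pool,
// keeping the regular worker threads free for async tasks.
tokio::task::spawn_blocking(move || -> Result<(), OperationError> {
    let segments_read = segments.read();
    segments_read.snapshot_all_segments(&path)
})
.await
.map_err(|err| OperationError::service_error(format!("snapshot task panicked: {err}")))??;
```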
```rust
// remove potentially deleted points from wrapped_segment
for deleted_point in deleted_points_copy {
    in_memory_wrapped_segment.delete_point(write_segment_version, deleted_point)?;
}
```
We were previously deleting the deleted points explicitly in the wrapped segment, based on the proxy's information.
How does the deduplication strategy handle those deleted points in the proxy?
Deleted points are handled by records in the WAL. The operations in the WAL will be re-applied to all segments on recovery, so if we removed something, it will also be removed again.
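In other words (a conceptual toy model, not the actual qdrant API or data structures), recovery replays the WAL over each segment, so deletes recorded there are re-applied even if the restored snapshot still contains the points:

```rust
use std::collections::HashSet;

// Hypothetical toy model of WAL replay; real operations and segments differ.
enum WalOp {
    Upsert(u64),
    Delete(u64),
}

fn replay(wal: &[WalOp], segment: &mut HashSet<u64>) {
    for op in wal {
        match op {
            WalOp::Upsert(id) => {
                segment.insert(*id);
            }
            // A delete recorded in the WAL removes the point again on recovery,
            // even if the restored snapshot still contained it.
            WalOp::Delete(id) => {
                segment.remove(*id);
            }
        }
    }
}
```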
```rust
let sid1 = holder.add(segment1);
let sid2 = holder.add(segment2);

let res = holder.deduplicate_points().unwrap();
```
Maybe we could have a second call to deduplicate_points at the end, asserting that it found zero points that time.
didn't get that
I am proposing to call deduplicate_points a second time to assert that the function has a side effect. It should return 0.
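For illustration, assuming deduplicate_points() returns the number of removed duplicate points (as the test excerpt above suggests), the addition could look like:

```rust
let removed = holder.deduplicate_points().unwrap();
assert!(removed > 0);

// A second pass should find nothing left to deduplicate.
let removed_again = holder.deduplicate_points().unwrap();
assert_eq!(removed_again, 0);
```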
for debug builds only?
Just in this test.
This was a nitpick, I approved the PR 👍
dad1d34 to d6d7a33 (compare)
* deduplication after proxy snapshot recovery
* revert changes in apply_points_to_appendable
* clippy fixes
* spawn blocking in snapshot creation
The current process of snapshot creation might be slow if the shard is currently under optimization.
But instead of trying to save a proper state of the segments, why don't we just fix all problems on load?
This PR significantly simplifies the process of snapshot creation, while also introducing additional checks for recovery and point updates.
There is also an async removal of tmp files; I don't expect a significant speedup there, just a nice-to-have optimization.
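For illustration, a minimal sketch of how such an async tmp-file removal could look, assuming a tokio runtime and reusing tmp_dir_path from the snapshot code above (the actual PR may wire this differently):

```rust
let tmp_dir = tmp_dir_path.clone();
tokio::spawn(async move {
    // Remove the temporary directory in the background;
    // snapshot creation does not block on the cleanup.
    if let Err(err) = tokio::fs::remove_dir_all(&tmp_dir).await {
        log::warn!("failed to remove temporary snapshot dir {tmp_dir:?}: {err}");
    }
});
```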