archival: Adjacent segment merging #8238
Conversation
Force-pushed from fbc2116 to 579bc91
Just leaving some high level thoughts for now.
```cpp
    return o;
}

upload_housekeeping_service::upload_housekeeping_service(
```
I'm interested in the overarching design of this: having a single loop that goes through available tasks and executes them. Long term I think it can be helpful to sort/prioritize work fairly based on the current needs of the system.
That said, was this out of necessity? Could adjacent segment merging have just been added to the existing archiver housekeeping path? Curious why the architectural shift?
src/v/archival/segment_reupload.cc (outdated)
"offset: {}", | ||
_manifest.get_ntp(), | ||
start_offset); | ||
return {}; | ||
} | ||
|
||
const auto& segment = *it; | ||
if (segment->finished_self_compaction()) { | ||
auto is_compacted = segment->finished_self_compaction(); |
Variable names are confusing here: `is_compacted` refers to whether the segment has been compacted yet, but `non_compacted` refers to the configuration of the topic. Maybe change these to `is_segment_compacted` and `is_topic_compacted` or similar.
fixed
src/v/config/configuration.cc (outdated)
"Smallest acceptable segment size in the cloud storage. Default: " | ||
"cloud_storage_segment_size_target/2", | ||
{.needs_restart = needs_restart::no, .visibility = visibility::tunable}, | ||
std::nullopt) | ||
, cloud_storage_upload_ctrl_update_interval_ms( | ||
*this, |
Please can you add an additional bool configuration for entirely switching off the feature (i.e. not registering any housekeeping jobs at all), in case we need it in the field.
We can mostly disable it by setting the size target to something small, but I would like to be able to switch off even the scheduling + inspection of the manifest.
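A minimal sketch of such a kill switch, following the style of the surrounding property definitions (the property name and default below are assumptions, not the final API):

```cpp
// Hypothetical on/off switch for the whole feature; when false, no
// merging housekeeping jobs would be registered at all.
, cloud_storage_enable_segment_merging(
    *this,
    "cloud_storage_enable_segment_merging",
    "Enables adjacent segment merging in the cloud storage",
    {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
    true)
```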
Force-pushed from 63899b6 to 5ba4f52
```cpp
namespace archival {

enum class housekeeping_state {
    // Housekeeping is on pause
```
nit: maybe say "Housekeeping is idle" to avoid confusion with the pause state.
```cpp
ss::future<> run(retry_chain_node& rtc) override {
    vlog(test_log.info, "mock job executed");
    executed++;
```
nit: `executed` actually means started here, right? We increment `executed` before the job finishes.
```python
        super().setUp()  # topic is created here

    def tearDown(self):
        self.s3_client.empty_bucket(self.s3_bucket_name)
```
The name of `s3_client` changed to `cloud_storage_client` to support Azure parametrisation.
```python
    def _find_partition_manifests(self):
        res = []
        for obj in self.s3_client.list_objects(self.s3_bucket_name):
            if obj.Key.endswith("manifest.json") and not obj.Key.endswith(
                    "topic_manifest.json"):
                res.append(obj.Key)
        return res

    def _download_partition_manifest(self, manifest_path):
        """Find and download individual partition manifest"""
        manifest = self.s3_client.get_object_data(self.s3_bucket_name,
                                                  manifest_path)
        self.logger.info(f"manifest found: {manifest}")
        return json.loads(manifest)
```
Force-pushed from 5ba4f52 to aab6caa
```cpp
  const cloud_storage::partition_manifest& manifest)
  -> std::optional<adjacent_segment_run> {
    adjacent_segment_run run(_archiver.get_ntp());
    auto [low_watermark, high_watermark] = get_low_high_segment_size(
```
nit: can we rename these to `size_lower_bound` and `size_upper_bound` or somesuch? "watermark" is a bit ambiguous IMO
```cpp
if (meta.size_bytes + s.size_bytes <= max_size) {
    // Move the end of the small segment run forward
    meta.committed_offset = s.committed_offset;
    meta.max_timestamp = s.max_timestamp;
```
Is it possible for segments to have out of order timestamps? Should we std::max here?
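Concretely, the suggested guard would be a one-liner (a sketch of the reviewer's suggestion, not the committed fix):

```cpp
// Keep the run's max_timestamp monotonic even if a later segment
// carries an earlier timestamp.
meta.max_timestamp = std::max(meta.max_timestamp, s.max_timestamp);
```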
```cpp
bool
maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size);
```
nit: could you add a comment, at least mentioning what this returns? Without reading the rest of the code, I would have assumed "returns true if the segment was actually added", but it looks like it is "returns true if the resulting run has more than one segment"?
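Something along these lines would capture the semantics being guessed at here (wording assumed, not the final comment):

```cpp
/// Try to extend the run with the next segment, as long as the combined
/// size stays within 'max_size'. Returns true if the accumulated run
/// contains more than one segment and is therefore worth re-uploading.
bool
maybe_add_segment(const cloud_storage::segment_meta& s, size_t max_size);
```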
```cpp
if (!run.maybe_add_segment(meta, high_watermark)) {
    continue;
}
```
Should this be a `break`?
```cpp
cloud_storage::remote& _remote;
/// Idle timer, used to detect idle state
ss::timer<ss::lowres_clock> _idle_timer;
ss::timer<ss::lowres_clock> _epoch_timer;
```
nit: maybe we can call this `_force_housekeep_timer` or somesuch? "epoch" is a bit too general and doesn't quite capture the context without already understanding how it works IMO
Force-pushed from aab6caa to 0ae4021
PR looks like it has all the necessary pieces, minus a small bug (I think). Looks like CI is unhappy with it though (segfaults in most DT tests).
```cpp
    }
}

void adjacent_segment_merger::interrupt() { _as.request_abort(); }
```
Should this `abort_source` be used for anything else? At the moment it just looks like a bool, and we don't really use it in our async work.
```cpp
co_await dynamic_cast<adjacent_segment_merger*>(
  _local_segment_merger.get())
  ->stop();
```
nit: would it make sense to give `housekeeping_job` an abstract `stop()` method?
```python
si_settings = SISettings(
    test_context,
    cloud_storage_max_connections=10,
    log_segment_size=0x10000,
```
nit: probably easier for future readers as `65536`
```python
def setUp(self):
    super().setUp()  # topic is created here

def tearDown(self):
    self.cloud_storage_client.empty_bucket(self.bucket_name)
    super().tearDown()
```
nit: can omit all of these? They should happen by default already
```python
# Every 'produce' call should create at least one segment
# in the cloud which is 1MiB
self.kafka_tools.produce(self.topic, 1024, 1024, acks)
time.sleep(1)
```
Would it be non-flaky to add a wait that we have more than one segment in the cloud?
Force-pushed from 0ae4021 to 9196d9e
CI failure looks like #8293
Seems like this has all the major components in; if we're in a rush to get this merged, we can address feedback in follow-on PRs. Would be good to get a look from others, since the latest commits landed recently
The service is not doing anything yet; this is only scaffolding for the next commits. It contains the intrusive list of all ntp_archiver instances. The instances are registered by the cluster::partition after being created. The upload_housekeeping_service is created in application.cc as a sharded service and passed to the partition_manager and further to the partition.
The method can be used to compare two retry_chain_node objects. If they both have the same root, the method returns true.
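Based on that description, usage would look roughly like this (the method name `same_root` is an assumption inferred from the description):

```cpp
// Hypothetical usage: decide whether a cloud storage event was caused
// by this job's own retry chain, and if so, ignore it.
retry_chain_node job_rtc(&root_rtc);
if (event_rtc.same_root(job_rtc)) {
    // the event originated from our own upload; don't pause housekeeping
}
```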
The method can be used to subscribe to usage events. It returns a future that becomes available when the remote is used to upload/download/delete a segment or to upload/download a manifest. The caller can then subscribe to the next event.
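A consumer of this API would presumably re-subscribe in a loop, one event at a time (a sketch only; the exact `subscribe` signature is an assumption):

```cpp
// Hypothetical subscriber loop: wakes up on every cloud storage API event.
ss::future<> watch_activity(cloud_storage::remote& remote, ss::abort_source& as) {
    while (!as.abort_requested()) {
        co_await remote.subscribe(); // resolves on the next upload/download/delete
        // cloud storage is not idle: pause housekeeping, re-arm the idle timer
    }
}
```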
The idle timeout is used by the cloud storage to decide when the system becomes idle. If no uploads or downloads happen during the idle timeout, the system is considered idle, and housekeeping jobs can be triggered without waiting for the full cloud storage housekeeping interval.
Implement scheduling logic for the upload housekeeping service. The service is no longer tied to the ntp_archiver; it schedules housekeeping_job objects. The ntp_archiver is supposed to register its periodic job, which will be executed by the service. The service runs all jobs once per epoch, where an epoch is one cloud_storage_housekeeping_interval_ms period. If the cloud storage is idle for cloud_storage_idle_timeout_ms, housekeeping may be triggered earlier; in that case it may also be put on pause if the cloud storage becomes active again. The goal is to prevent housekeeping from running in bursts and to spread the work more evenly when possible.
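In timer terms, the interplay described above might look like this (member and method names are illustrative, not the actual implementation):

```cpp
// Sketch of the two-timer scheme: the epoch timer guarantees one full
// pass per cloud_storage_housekeeping_interval_ms, while the idle timer
// opportunistically starts work earlier.
void upload_housekeeping_service::idle_timer_callback() {
    // No uploads or downloads for cloud_storage_idle_timeout_ms: safe to
    // run jobs now, accepting that they may be paused again if the cloud
    // storage becomes active.
    _workflow.resume(/*drain=*/false);
}

void upload_housekeeping_service::epoch_timer_callback() {
    // The full housekeeping interval elapsed: run every job that has not
    // run yet in this epoch, regardless of cloud storage activity.
    _workflow.resume(/*drain=*/true);
}
```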
This is a wrapper for seastar::sleep_abortable which accepts more than one abort source. Can be useful when you can't decide which abort_source to pick!
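One way to build such a wrapper (a sketch only; it assumes Seastar's `abort_source::subscribe`, whose callback signature differs between Seastar versions):

```cpp
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sleep.hh>

#include <optional>
#include <vector>

namespace ss = seastar;

// Sleep until 'd' elapses or until ANY of the given abort sources fires.
ss::future<> sleep_abortable(
  ss::lowres_clock::duration d, std::vector<ss::abort_source*> sources) {
    ss::abort_source joined;
    std::vector<std::optional<ss::abort_source::subscription>> subs;
    subs.reserve(sources.size());
    for (auto* as : sources) {
        if (as->abort_requested()) {
            // already aborted: no point in sleeping at all
            joined.request_abort();
            break;
        }
        // forward any future abort to the single source we sleep on
        subs.push_back(as->subscribe([&joined]() noexcept {
            if (!joined.abort_requested()) {
                joined.request_abort();
            }
        }));
    }
    co_await ss::sleep_abortable<ss::lowres_clock>(d, joined);
}
```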
The parameter 'cloud_storage_segment_size_min' represents the minimal segment size that Redpanda should re-upload to the cloud storage.
The object represents a series of adjacent segments. It contains the inclusive begin and end offsets, the number of segments, and their total size.
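From that description, the shape of the object is roughly the following (field names assumed for illustration):

```cpp
// Illustrative shape of a run of adjacent segments.
struct adjacent_segment_run {
    model::offset base_offset;      // inclusive begin of the run
    model::offset committed_offset; // inclusive end of the run
    size_t num_segments{0};
    size_t size_bytes{0};           // total size of all segments in the run
};
```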
segments. This is needed to reupload small segments.
The object can be used to re-upload segments from local data.
The housekeeping job is a local_adjacent_segment_merger which uploads segments from local disk to S3 when there are a lot of small segments. The 'ntp_archiver' is extended: a method that returns the next upload candidate based on a predicate is added, and the new 'upload' method can be used to upload this candidate to the cloud storage. Both methods are supposed to be used externally, by the upload_housekeeping_service. The 'upload' method takes hold of the semaphore that protects the upload path. The 'upload', 'upload_tx', and 'upload_segment' methods now accept the 'retry_chain_node' optionally; by default they use the internal retry chain of the 'ntp_archiver'. But when the 'upload_housekeeping_service' calls the 'upload' method, it has to pass its own 'retry_chain_node' to suppress the notification that the upload may trigger. Otherwise, the first upload done by the 'upload_housekeeping_service' would send a notification back to the housekeeping service and put subsequent uploads on pause. The 'find_upload_candidates' method first uses the predicate to find candidates in the manifest, then uses the 'segment_collector' to create a multi-segment upload candidate with locks. The 'local_adjacent_segment_merger' uses a predicate that scans the manifest for the next series of small segments that can be merged and re-uploaded. The merging does not happen on disk, only in the cloud storage.
The test verifies that adjacent segment merging works for data present on disk.
Previously, upload_candidate was used only to represent data stored locally. With adjacent segment merging we no longer upload only local data. In order to support reupload of remote data, the struct was extended with a 'remote_sources' field. The field contains a list of object ids of segments in the cloud storage. A valid struct can only contain 'sources' or 'remote_sources' but not both of them. The 'adjacent_segment_run' was also updated: instead of storing individual fields, it now stores a 'cloud_storage::segment_meta'. This is needed to compute the segment_meta for the reuploaded data. The 'find_reupload_candidate' method of the 'ntp_archiver' can now find both local and remote upload candidates. The 'upload' method of the 'ntp_archiver' has two separate implementations for local and remote data; the latter is not implemented yet. The 'local_adjacent_segment_merger' is now 'adjacent_segment_merger' and can be used to upload both local and remote data (once the 'upload' method is fully implemented).
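The invariant described above (exactly one of the two source lists may be populated) could be expressed like this (simplified sketch; only the field names come from the commit message):

```cpp
// Simplified view of the extended candidate: local OR remote, never both.
struct upload_candidate {
    std::vector<ss::lw_shared_ptr<storage::segment>> sources; // local data
    std::vector<cloud_storage::remote_segment_path> remote_sources; // cloud data

    bool valid() const {
        // exactly one of the two lists must be non-empty
        return sources.empty() != remote_sources.empty();
    }
};
```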
Replace three methods that had identical implementations (upload_loop_can_continue, sync_manifest_loop_can_continue, and housekeeping_can_continue) with a single one. Since the implementation was the same, it could be confusing to have three different methods for all the cases where we update the metadata.
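The consolidated guard might reduce to a single predicate along these lines (the unified name and the exact checks are assumptions):

```cpp
// Hypothetical unified guard replacing the three identical methods.
bool ntp_archiver::can_update_archival_metadata() const {
    return !_as.abort_requested() && !_gate.is_closed()
           && _parent.is_elected_leader();
}
```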
into test_archival_service_rpunit binary.
Explicitly deregister housekeeping jobs before partition stops the ntp_archiver instance.
Force-pushed from 9196d9e to ac1204d
Again nothing truly blocking, as long as we address the follow-ups before the release
```cpp
/// When the underlying partition is stopped or the
/// entire shard is stopped the job is 'completed'
/// by calling 'complete' method.
```
nit: needs an update? Or the `completed()` method is in a follow-up PR
```diff
@@ -1014,6 +1016,13 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
       cloud_configs.local().bucket_name,
       std::ref(cloud_storage_api))
       .get();
+
+    construct_service(
+      _archival_upload_housekeeping, std::ref(cloud_storage_api))
```
Should pass in the archival scheduling group?
yes, will add this in the followup
This PR introduces a shard local `upload_housekeeping_service` which can run jobs in the background. The `ntp_archiver` instances can create and register `housekeeping_job` objects. The `upload_housekeeping_service` then runs these jobs periodically.

The service uses the following scheduling strategy:

- when the `cloud_storage::remote` is idle, the service activates and executes jobs one by one;
- when one of the `ntp_archiver` or `remote_partition` objects starts using the cloud storage API, the service pauses execution without forcibly interrupting the current job;
- when `cloud_storage_housekeeping_interval_ms` passes, the service goes into `draining` mode and executes all jobs which have not been executed yet.

This approach guarantees that every registered job will be executed once per `cloud_storage_housekeeping_interval_ms`.

The `remote` object was extended with a notification mechanism. It can be used to subscribe to all cloud storage API events. There is a mechanism that allows the subscriber to filter out some events; it is used to filter out events triggered by housekeeping jobs.

There are two housekeeping jobs per archiver: local adjacent segment reupload and remote adjacent segment reupload. The first one reuploads small segments when the data is available on disk. The remote job downloads the segments from the cloud in order to reupload them.