From 256add48480b9ea9f1462c83458af60df1a0716c Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 7 May 2024 16:58:53 +0100 Subject: [PATCH 1/3] archival: clamp uploads to committed offset The archival/tiered storage correctness assumption builds on the (wrong) assumption that LSO is monotonic. Tiered storage doesn't have a concept of suffix truncation so if that would happen it would lead violations of correctness properties and diverging logs/undefined behavior. However, we have discovered that property does not hold if there are no in-progress transaction and acks=0/1 or write caching is in use because LSO falls back to "last visible index"[^1] which can get truncated. Ref https://github.com/redpanda-data/redpanda/issues/18244 [^1]: https://github.com/redpanda-data/redpanda/blob/88ac775f9f7954330732024abfa6e9ed5c9c11fd/src/v/cluster/rm_stm.cc#L1322 (cherry picked from commit ff358ccfe4c4247881de9750c0475f60beeea1e9) --- src/v/archival/ntp_archiver_service.cc | 29 +++++++++++++++++++------- src/v/archival/ntp_archiver_service.h | 16 +++++++++++--- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index c95230f3348f..5068901a0165 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1625,7 +1625,7 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) { } ss::future> -ntp_archiver::schedule_uploads(model::offset last_stable_offset) { +ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) { // We have to increment last offset to guarantee progress. // The manifest's last offset contains dirty_offset of the // latest uploaded segment but '_policy' requires offset that @@ -1649,7 +1649,7 @@ ntp_archiver::schedule_uploads(model::offset last_stable_offset) { params.push_back({ .upload_kind = segment_upload_kind::non_compacted, .start_offset = start_upload_offset, - .last_offset = last_stable_offset, + .last_offset = max_offset_exclusive, .allow_reuploads = allow_reuploads_t::no, .archiver_term = _start_term, }); @@ -2033,15 +2033,30 @@ ss::future ntp_archiver::wait_all_scheduled_uploads( .compacted_upload_result = compacted_result}; } +model::offset ntp_archiver::max_uploadable_offset_exclusive() const { + // We impose an additional (LSO) constraint on the uploadable offset to + // as we need to have a complete index of aborted transactions if any + // before we can upload a segment. + return std::min( + _parent.last_stable_offset(), + model::next_offset(_parent.committed_offset())); +} + ss::future ntp_archiver::upload_next_candidates( - std::optional lso_override) { - vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp); - auto last_stable_offset = lso_override ? *lso_override - : _parent.last_stable_offset(); + std::optional max_offset_override_exclusive) { + auto max_offset_exclusive = max_offset_override_exclusive + ? *max_offset_override_exclusive + : max_uploadable_offset_exclusive(); + vlog( + _rtclog.debug, + "Uploading next candidates called for {} with max_offset_exclusive={}", + _ntp, + max_offset_exclusive); ss::gate::holder holder(_gate); try { auto units = co_await ss::get_units(_mutex, 1, _as); - auto scheduled_uploads = co_await schedule_uploads(last_stable_offset); + auto scheduled_uploads = co_await schedule_uploads( + max_offset_exclusive); co_return co_await wait_all_scheduled_uploads( std::move(scheduled_uploads)); } catch (const ss::gate_closed_exception&) { diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index f9e569fff45e..9296a6676fb4 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -178,15 +178,25 @@ class ntp_archiver { auto operator<=>(const batch_result&) const = default; }; + /// Compute the maximum offset that is safe to be uploaded to the cloud. + /// + /// It must be guaranteed that this offset is monotonically increasing/ + /// can never go backwards. Otherwise, the local and cloud logs will + /// diverge leading to undefined behavior. + model::offset max_uploadable_offset_exclusive() const; + /// \brief Upload next set of segments to S3 (if any) /// The semaphore is used to track number of parallel uploads. The method /// will pick not more than '_concurrency' candidates and start /// uploading them. /// - /// \param lso_override last stable offset override + /// \param max_offset_override_exclusive Overrides the maximum offset + /// that can be uploaded. If nullopt, the maximum offset is + /// calculated automatically. /// \return future that returns number of uploaded/failed segments virtual ss::future upload_next_candidates( - std::optional last_stable_offset_override = std::nullopt); + std::optional max_offset_override_exclusive + = std::nullopt); ss::future sync_manifest(); @@ -427,7 +437,7 @@ class ntp_archiver { /// Start all uploads ss::future> - schedule_uploads(model::offset last_stable_offset); + schedule_uploads(model::offset max_offset_exclusive); ss::future> schedule_uploads(std::vector loop_contexts); From 360d8f94e27fcc68f3db04f9c6b67b833087c180 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 8 May 2024 14:28:49 +0100 Subject: [PATCH 2/3] archival: rename upload_context::last_offset to end_offset_exclusive This communicates better the type of offset. No functional changes/cosmetic only. (cherry picked from commit 2647d9f8a6bf5b8248281076deeed095de58218e) --- src/v/archival/ntp_archiver_service.cc | 12 ++++++------ src/v/archival/ntp_archiver_service.h | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 5068901a0165..b209c0373524 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1550,7 +1550,7 @@ ntp_archiver::do_schedule_single_upload( ss::future ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) { auto start_upload_offset = upload_ctx.start_offset; - auto last_stable_offset = upload_ctx.last_offset; + auto last_stable_offset = upload_ctx.end_offset_exclusive; auto log = _parent.log(); @@ -1649,7 +1649,7 @@ ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) { params.push_back({ .upload_kind = segment_upload_kind::non_compacted, .start_offset = start_upload_offset, - .last_offset = max_offset_exclusive, + .end_offset_exclusive = max_offset_exclusive, .allow_reuploads = allow_reuploads_t::no, .archiver_term = _start_term, }); @@ -1660,7 +1660,7 @@ ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) { params.push_back({ .upload_kind = segment_upload_kind::compacted, .start_offset = compacted_segments_upload_start, - .last_offset = model::offset::max(), + .end_offset_exclusive = model::offset::max(), .allow_reuploads = allow_reuploads_t::yes, .archiver_term = _start_term, }); @@ -1681,7 +1681,7 @@ ntp_archiver::schedule_uploads(std::vector loop_contexts) { "offset: {}, last offset: {}, uploads remaining: {}", ctx.upload_kind, ctx.start_offset, - ctx.last_offset, + ctx.end_offset_exclusive, uploads_remaining); break; } @@ -1691,13 +1691,13 @@ ntp_archiver::schedule_uploads(std::vector loop_contexts) { "scheduling uploads, start offset: {}, last offset: {}, upload kind: " "{}, uploads remaining: {}", ctx.start_offset, - ctx.last_offset, + ctx.end_offset_exclusive, ctx.upload_kind, uploads_remaining); // this metric is only relevant for non compacted uploads. if (ctx.upload_kind == segment_upload_kind::non_compacted) { - _probe->upload_lag(ctx.last_offset - ctx.start_offset); + _probe->upload_lag(ctx.end_offset_exclusive - ctx.start_offset); } std::exception_ptr ep; diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index 9296a6676fb4..00a2863e3300 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -420,7 +420,7 @@ class ntp_archiver { /// The next scheduled upload will start from this offset model::offset start_offset; /// Uploads will stop at this offset - model::offset last_offset; + model::offset end_offset_exclusive; /// Controls checks for reuploads, compacted segments have this /// check disabled allow_reuploads_t allow_reuploads; From 713682d2341e6046cfb919fa395e810700712052 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 8 May 2024 22:24:15 +0100 Subject: [PATCH 3/3] archival: add a note about max_offset_override_exclusive Outrageously verbose but for a good reason. (cherry picked from commit 15ac2810ab70a2e8ca8e7e1f738ab7d836759f63) --- src/v/archival/ntp_archiver_service.cc | 6 +++--- src/v/archival/ntp_archiver_service.h | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index b209c0373524..fa435ac49ac0 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -2043,9 +2043,9 @@ model::offset ntp_archiver::max_uploadable_offset_exclusive() const { } ss::future ntp_archiver::upload_next_candidates( - std::optional max_offset_override_exclusive) { - auto max_offset_exclusive = max_offset_override_exclusive - ? *max_offset_override_exclusive + std::optional unsafe_max_offset_override_exclusive) { + auto max_offset_exclusive = unsafe_max_offset_override_exclusive + ? *unsafe_max_offset_override_exclusive : max_uploadable_offset_exclusive(); vlog( _rtclog.debug, diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index 00a2863e3300..f1a6e4bb5eec 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -190,12 +190,14 @@ class ntp_archiver { /// will pick not more than '_concurrency' candidates and start /// uploading them. /// - /// \param max_offset_override_exclusive Overrides the maximum offset - /// that can be uploaded. If nullopt, the maximum offset is - /// calculated automatically. + /// \param unsafe_max_offset_override_exclusive Overrides the maximum offset + /// that can be uploaded. ONLY FOR TESTING. It is not clamped to a + /// safe value/committed offset as some tests work directly with + /// segments bypassing the raft thus not advancing the committed + /// offset. /// \return future that returns number of uploaded/failed segments virtual ss::future upload_next_candidates( - std::optional max_offset_override_exclusive + std::optional unsafe_max_offset_override_exclusive = std::nullopt); ss::future sync_manifest();