From 007eea84b525b2b537894f676b33d4f9e6a38620 Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Tue, 7 May 2024 16:58:53 +0100
Subject: [PATCH] archival: clamp uploads to committed offset

Archival/tiered storage correctness builds on the (wrong) assumption
that the LSO is monotonic. Tiered storage has no concept of suffix
truncation, so if it were to happen it would lead to violations of
correctness properties and diverging logs/undefined behavior.

However, we have discovered that this property does not hold when there
are no in-progress transactions and acks=0/1 or write caching is in
use, because the LSO falls back to the "last visible index"[^1], which
can get truncated.

Ref https://github.com/redpanda-data/redpanda/issues/18244

[^1]: https://github.com/redpanda-data/redpanda/blob/88ac775f9f7954330732024abfa6e9ed5c9c11fd/src/v/cluster/rm_stm.cc#L1322
---
 src/v/archival/ntp_archiver_service.cc | 18 ++++++++++++++----
 src/v/archival/ntp_archiver_service.h  |  7 +++++++
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc
index 8c9ff012b53c9..018b9f338f8a5 100644
--- a/src/v/archival/ntp_archiver_service.cc
+++ b/src/v/archival/ntp_archiver_service.cc
@@ -2026,15 +2026,25 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
       .compacted_upload_result = compacted_result};
 }
 
+model::offset ntp_archiver::max_uploadable_offset() const {
+    // We impose an additional (LSO) constraint on the uploadable offset
+    // as we need to have a complete index of aborted transactions, if any,
+    // before we can upload a segment.
+    return std::min(_parent.last_stable_offset(), _parent.committed_offset());
+}
+
 ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
   std::optional<model::offset> lso_override) {
-    vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp);
-    auto last_stable_offset = lso_override ? *lso_override
-                                           : _parent.last_stable_offset();
+    auto max_offset = lso_override ? *lso_override : max_uploadable_offset();
+    vlog(
+      _rtclog.debug,
+      "Uploading next candidates called for {} with max_offset={}",
+      _ntp,
+      max_offset);
     ss::gate::holder holder(_gate);
     try {
         auto units = co_await ss::get_units(_mutex, 1, _as);
-        auto scheduled_uploads = co_await schedule_uploads(last_stable_offset);
+        auto scheduled_uploads = co_await schedule_uploads(max_offset);
         co_return co_await wait_all_scheduled_uploads(
           std::move(scheduled_uploads));
     } catch (const ss::gate_closed_exception&) {
diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h
index 4d0e15c4fb9d7..b456ae4990df2 100644
--- a/src/v/archival/ntp_archiver_service.h
+++ b/src/v/archival/ntp_archiver_service.h
@@ -175,6 +175,13 @@ class ntp_archiver {
         auto operator<=>(const batch_result&) const = default;
     };
 
+    /// Compute the maximum offset that is safe to upload to the cloud.
+    ///
+    /// It must be guaranteed that this offset is monotonically increasing,
+    /// i.e. it can never go backwards. Otherwise, the local and cloud logs
+    /// will diverge, leading to undefined behavior.
+    model::offset max_uploadable_offset() const;
+
     /// \brief Upload next set of segments to S3 (if any)
     /// The semaphore is used to track number of parallel uploads. The method
     /// will pick not more than '_concurrency' candidates and start
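
For reference, a minimal standalone sketch of the clamping idea, separate from the patch itself and using hypothetical offset values with plain int64_t instead of model::offset: when the reported LSO falls back to the "last visible index" and runs ahead of the committed offset (acks=0/1 or write caching, no in-progress transactions), the upload bound stays at the committed offset, which cannot be suffix-truncated.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
    // Hypothetical offsets for illustration only.
    int64_t last_stable_offset = 120; // LSO fell back to "last visible index";
                                      // this part of the log may be truncated
    int64_t committed_offset = 100;   // replicated to a majority, durable

    // Mirrors the idea behind max_uploadable_offset(): never upload past the
    // committed offset, so the upload bound covers only durable data.
    int64_t max_uploadable = std::min(last_stable_offset, committed_offset);
    std::cout << "max uploadable offset: " << max_uploadable << "\n"; // 100
    return 0;
}
```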