archival: clamp uploads to committed offset
The archival/tiered storage correctness model builds on the (wrong)
assumption that the LSO is monotonic. Tiered storage has no concept of
suffix truncation, so if it were to happen it would lead to violations of
correctness properties and diverging logs/undefined behavior.

However, we have discovered that this property does not hold when there are
no in-progress transactions and acks=0/1 or write caching is in use, because
the LSO then falls back to the "last visible index"[^1], which can get
truncated.

Ref #18244

[^1]: https://github.com/redpanda-data/redpanda/blob/88ac775f9f7954330732024abfa6e9ed5c9c11fd/src/v/cluster/rm_stm.cc#L1322
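
As a rough illustration of the idea behind the fix (a sketch only; the `partition_state` type, the `offset_t` alias, and the field names below are invented, not the Redpanda API): only data at or below the Raft committed offset is guaranteed to survive leadership changes, so the upload bound is clamped to the minimum of the LSO and the committed offset.

```cpp
// Minimal sketch of the clamping idea under a simplified partition model.
#include <algorithm>
#include <cstdint>

using offset_t = int64_t;

struct partition_state {
    // With acks=0/1 or write caching and no in-progress transactions, the
    // LSO falls back to the "last visible index", which can be truncated
    // (i.e. move backwards) on leadership changes.
    offset_t last_stable_offset;
    // Raft committed offset; committed entries are never suffix-truncated.
    offset_t committed_offset;
};

// Never hand the uploader an offset above the committed offset, so that
// data already uploaded to the cloud cannot be truncated away locally.
offset_t max_uploadable_offset(const partition_state& p) {
    return std::min(p.last_stable_offset, p.committed_offset);
}
```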
nvartolomei committed May 7, 2024
1 parent 11c0cb8 commit 007eea8
Showing 2 changed files with 21 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/v/archival/ntp_archiver_service.cc
@@ -2026,15 +2026,25 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
       .compacted_upload_result = compacted_result};
 }
 
+model::offset ntp_archiver::max_uploadable_offset() const {
+    // We impose an additional (LSO) constraint on the uploadable offset
+    // as we need to have a complete index of aborted transactions, if any,
+    // before we can upload a segment.
+    return std::min(_parent.last_stable_offset(), _parent.committed_offset());
+}
+
 ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
   std::optional<model::offset> lso_override) {
-    vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp);
-    auto last_stable_offset = lso_override ? *lso_override
-                                           : _parent.last_stable_offset();
+    auto max_offset = lso_override ? *lso_override : max_uploadable_offset();
+    vlog(
+      _rtclog.debug,
+      "Uploading next candidates called for {} with max_offset={}",
+      _ntp,
+      max_offset);
     ss::gate::holder holder(_gate);
     try {
         auto units = co_await ss::get_units(_mutex, 1, _as);
-        auto scheduled_uploads = co_await schedule_uploads(last_stable_offset);
+        auto scheduled_uploads = co_await schedule_uploads(max_offset);
         co_return co_await wait_all_scheduled_uploads(
           std::move(scheduled_uploads));
     } catch (const ss::gate_closed_exception&) {
7 changes: 7 additions & 0 deletions src/v/archival/ntp_archiver_service.h
@@ -175,6 +175,13 @@ class ntp_archiver {
         auto operator<=>(const batch_result&) const = default;
     };
 
+    /// Compute the maximum offset that is safe to be uploaded to the cloud.
+    ///
+    /// It must be guaranteed that this offset is monotonically increasing,
+    /// i.e. it can never go backwards. Otherwise, the local and cloud logs
+    /// will diverge, leading to undefined behavior.
+    model::offset max_uploadable_offset() const;
+
     /// \brief Upload next set of segments to S3 (if any)
     /// The semaphore is used to track number of parallel uploads. The method
     /// will pick not more than '_concurrency' candidates and start
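
As an illustration of the monotonicity requirement stated in the doc comment above, a hypothetical caller-side guard could look like the sketch below (not part of this commit; `upload_bound_tracker` and its members are invented names).

```cpp
// Hypothetical guard for the documented invariant: the uploadable offset
// must never go backwards, otherwise the local and cloud logs could diverge.
#include <cassert>
#include <cstdint>

using offset_t = int64_t;

class upload_bound_tracker {
    offset_t _last_bound = -1; // highest bound observed so far

public:
    // Record a newly computed bound, asserting it did not move backwards.
    offset_t observe(offset_t next_bound) {
        assert(
          next_bound >= _last_bound
          && "max uploadable offset must be monotonic");
        _last_bound = next_bound;
        return _last_bound;
    }
};
```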
