Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] archival: clamp uploads to committed offset #18393

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 27 additions & 12 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1550,7 +1550,7 @@ ntp_archiver::do_schedule_single_upload(
ss::future<ntp_archiver::scheduled_upload>
ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {
auto start_upload_offset = upload_ctx.start_offset;
auto last_stable_offset = upload_ctx.last_offset;
auto last_stable_offset = upload_ctx.end_offset_exclusive;

auto log = _parent.log();

Expand Down Expand Up @@ -1625,7 +1625,7 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {
}

ss::future<std::vector<ntp_archiver::scheduled_upload>>
ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) {
// We have to increment last offset to guarantee progress.
// The manifest's last offset contains dirty_offset of the
// latest uploaded segment but '_policy' requires offset that
Expand All @@ -1649,7 +1649,7 @@ ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
params.push_back({
.upload_kind = segment_upload_kind::non_compacted,
.start_offset = start_upload_offset,
.last_offset = last_stable_offset,
.end_offset_exclusive = max_offset_exclusive,
.allow_reuploads = allow_reuploads_t::no,
.archiver_term = _start_term,
});
Expand All @@ -1660,7 +1660,7 @@ ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
params.push_back({
.upload_kind = segment_upload_kind::compacted,
.start_offset = compacted_segments_upload_start,
.last_offset = model::offset::max(),
.end_offset_exclusive = model::offset::max(),
.allow_reuploads = allow_reuploads_t::yes,
.archiver_term = _start_term,
});
Expand All @@ -1681,7 +1681,7 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) {
"offset: {}, last offset: {}, uploads remaining: {}",
ctx.upload_kind,
ctx.start_offset,
ctx.last_offset,
ctx.end_offset_exclusive,
uploads_remaining);
break;
}
Expand All @@ -1691,13 +1691,13 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) {
"scheduling uploads, start offset: {}, last offset: {}, upload kind: "
"{}, uploads remaining: {}",
ctx.start_offset,
ctx.last_offset,
ctx.end_offset_exclusive,
ctx.upload_kind,
uploads_remaining);

// this metric is only relevant for non compacted uploads.
if (ctx.upload_kind == segment_upload_kind::non_compacted) {
_probe->upload_lag(ctx.last_offset - ctx.start_offset);
_probe->upload_lag(ctx.end_offset_exclusive - ctx.start_offset);
}

std::exception_ptr ep;
Expand Down Expand Up @@ -2033,15 +2033,30 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
.compacted_upload_result = compacted_result};
}

model::offset ntp_archiver::max_uploadable_offset_exclusive() const {
// We impose an additional (LSO) constraint on the uploadable offset to
// as we need to have a complete index of aborted transactions if any
// before we can upload a segment.
return std::min(
_parent.last_stable_offset(),
model::next_offset(_parent.committed_offset()));
}

ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
std::optional<model::offset> lso_override) {
vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp);
auto last_stable_offset = lso_override ? *lso_override
: _parent.last_stable_offset();
std::optional<model::offset> unsafe_max_offset_override_exclusive) {
auto max_offset_exclusive = unsafe_max_offset_override_exclusive
? *unsafe_max_offset_override_exclusive
: max_uploadable_offset_exclusive();
vlog(
_rtclog.debug,
"Uploading next candidates called for {} with max_offset_exclusive={}",
_ntp,
max_offset_exclusive);
ss::gate::holder holder(_gate);
try {
auto units = co_await ss::get_units(_mutex, 1, _as);
auto scheduled_uploads = co_await schedule_uploads(last_stable_offset);
auto scheduled_uploads = co_await schedule_uploads(
max_offset_exclusive);
co_return co_await wait_all_scheduled_uploads(
std::move(scheduled_uploads));
} catch (const ss::gate_closed_exception&) {
Expand Down
20 changes: 16 additions & 4 deletions src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,27 @@ class ntp_archiver {
auto operator<=>(const batch_result&) const = default;
};

/// Compute the maximum offset that is safe to be uploaded to the cloud.
///
/// It must be guaranteed that this offset is monotonically increasing/
/// can never go backwards. Otherwise, the local and cloud logs will
/// diverge leading to undefined behavior.
model::offset max_uploadable_offset_exclusive() const;

/// \brief Upload next set of segments to S3 (if any)
/// The semaphore is used to track number of parallel uploads. The method
/// will pick not more than '_concurrency' candidates and start
/// uploading them.
///
/// \param lso_override last stable offset override
/// \param unsafe_max_offset_override_exclusive Overrides the maximum offset
/// that can be uploaded. ONLY FOR TESTING. It is not clamped to a
/// safe value/committed offset as some tests work directly with
/// segments bypassing the raft thus not advancing the committed
/// offset.
/// \return future that returns number of uploaded/failed segments
virtual ss::future<batch_result> upload_next_candidates(
std::optional<model::offset> last_stable_offset_override = std::nullopt);
std::optional<model::offset> unsafe_max_offset_override_exclusive
= std::nullopt);

ss::future<cloud_storage::download_result> sync_manifest();

Expand Down Expand Up @@ -410,7 +422,7 @@ class ntp_archiver {
/// The next scheduled upload will start from this offset
model::offset start_offset;
/// Uploads will stop at this offset
model::offset last_offset;
model::offset end_offset_exclusive;
/// Controls checks for reuploads, compacted segments have this
/// check disabled
allow_reuploads_t allow_reuploads;
Expand All @@ -427,7 +439,7 @@ class ntp_archiver {

/// Start all uploads
ss::future<std::vector<scheduled_upload>>
schedule_uploads(model::offset last_stable_offset);
schedule_uploads(model::offset max_offset_exclusive);

ss::future<std::vector<scheduled_upload>>
schedule_uploads(std::vector<upload_context> loop_contexts);
Expand Down