diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc
index c95230f3348f..fa435ac49ac0 100644
--- a/src/v/archival/ntp_archiver_service.cc
+++ b/src/v/archival/ntp_archiver_service.cc
@@ -1550,7 +1550,7 @@ ntp_archiver::do_schedule_single_upload(
 ss::future<ntp_archiver::scheduled_upload>
 ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {
     auto start_upload_offset = upload_ctx.start_offset;
-    auto last_stable_offset = upload_ctx.last_offset;
+    auto last_stable_offset = upload_ctx.end_offset_exclusive;
 
     auto log = _parent.log();
 
@@ -1625,7 +1625,7 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {
 }
 
 ss::future<std::vector<ntp_archiver::scheduled_upload>>
-ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
+ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) {
     // We have to increment last offset to guarantee progress.
     // The manifest's last offset contains dirty_offset of the
     // latest uploaded segment but '_policy' requires offset that
@@ -1649,7 +1649,7 @@ ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
         params.push_back({
           .upload_kind = segment_upload_kind::non_compacted,
          .start_offset = start_upload_offset,
-          .last_offset = last_stable_offset,
+          .end_offset_exclusive = max_offset_exclusive,
           .allow_reuploads = allow_reuploads_t::no,
           .archiver_term = _start_term,
         });
@@ -1660,7 +1660,7 @@ ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
         params.push_back({
          .upload_kind = segment_upload_kind::compacted,
           .start_offset = compacted_segments_upload_start,
-          .last_offset = model::offset::max(),
+          .end_offset_exclusive = model::offset::max(),
           .allow_reuploads = allow_reuploads_t::yes,
           .archiver_term = _start_term,
         });
@@ -1681,7 +1681,7 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) {
               "offset: {}, last offset: {}, uploads remaining: {}",
               ctx.upload_kind,
               ctx.start_offset,
-              ctx.last_offset,
+              ctx.end_offset_exclusive,
               uploads_remaining);
             break;
         }
@@ -1691,13 +1691,13 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) {
           "scheduling uploads, start offset: {}, last offset: {}, upload kind: "
           "{}, uploads remaining: {}",
           ctx.start_offset,
-          ctx.last_offset,
+          ctx.end_offset_exclusive,
           ctx.upload_kind,
           uploads_remaining);
 
         // this metric is only relevant for non compacted uploads.
         if (ctx.upload_kind == segment_upload_kind::non_compacted) {
-            _probe->upload_lag(ctx.last_offset - ctx.start_offset);
+            _probe->upload_lag(ctx.end_offset_exclusive - ctx.start_offset);
         }
 
         std::exception_ptr ep;
@@ -2033,15 +2033,30 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
       .compacted_upload_result = compacted_result};
 }
 
+model::offset ntp_archiver::max_uploadable_offset_exclusive() const {
+    // We impose an additional (LSO) constraint on the uploadable offset,
+    // as we need a complete index of aborted transactions, if any,
+    // before we can upload a segment.
+    return std::min(
+      _parent.last_stable_offset(),
+      model::next_offset(_parent.committed_offset()));
+}
+
 ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
-  std::optional<model::offset> lso_override) {
-    vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp);
-    auto last_stable_offset = lso_override ? *lso_override
-                                           : _parent.last_stable_offset();
+  std::optional<model::offset> unsafe_max_offset_override_exclusive) {
+    auto max_offset_exclusive = unsafe_max_offset_override_exclusive
+                                  ? *unsafe_max_offset_override_exclusive
+                                  : max_uploadable_offset_exclusive();
+    vlog(
+      _rtclog.debug,
+      "Uploading next candidates called for {} with max_offset_exclusive={}",
+      _ntp,
+      max_offset_exclusive);
     ss::gate::holder holder(_gate);
     try {
         auto units = co_await ss::get_units(_mutex, 1, _as);
-        auto scheduled_uploads = co_await schedule_uploads(last_stable_offset);
+        auto scheduled_uploads = co_await schedule_uploads(
+          max_offset_exclusive);
         co_return co_await wait_all_scheduled_uploads(
           std::move(scheduled_uploads));
     } catch (const ss::gate_closed_exception&) {
diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h
index f9e569fff45e..f1a6e4bb5eec 100644
--- a/src/v/archival/ntp_archiver_service.h
+++ b/src/v/archival/ntp_archiver_service.h
@@ -178,15 +178,27 @@ class ntp_archiver {
         auto operator<=>(const batch_result&) const = default;
     };
 
+    /// Compute the maximum offset that is safe to upload to the cloud.
+    ///
+    /// It must be guaranteed that this offset is monotonically increasing
+    /// and can never go backwards. Otherwise, the local and cloud logs
+    /// will diverge, leading to undefined behavior.
+    model::offset max_uploadable_offset_exclusive() const;
+
     /// \brief Upload next set of segments to S3 (if any)
     /// The semaphore is used to track number of parallel uploads. The method
     /// will pick not more than '_concurrency' candidates and start
     /// uploading them.
     ///
-    /// \param lso_override last stable offset override
+    /// \param unsafe_max_offset_override_exclusive Overrides the maximum
+    ///     offset that can be uploaded. ONLY FOR TESTING. It is not clamped
+    ///     to a safe value/committed offset, as some tests work directly
+    ///     with segments, bypassing raft and thus not advancing the
+    ///     committed offset.
     /// \return future that returns number of uploaded/failed segments
     virtual ss::future<batch_result> upload_next_candidates(
-      std::optional<model::offset> last_stable_offset_override = std::nullopt);
+      std::optional<model::offset> unsafe_max_offset_override_exclusive
+      = std::nullopt);
 
     ss::future<std::error_code> sync_manifest();
 
@@ -410,7 +422,7 @@ class ntp_archiver {
         /// The next scheduled upload will start from this offset
         model::offset start_offset;
         /// Uploads will stop at this offset
-        model::offset last_offset;
+        model::offset end_offset_exclusive;
         /// Controls checks for reuploads, compacted segments have this
         /// check disabled
         allow_reuploads_t allow_reuploads;
@@ -427,7 +439,7 @@
 
     /// Start all uploads
     ss::future<std::vector<scheduled_upload>>
-    schedule_uploads(model::offset last_stable_offset);
+    schedule_uploads(model::offset max_offset_exclusive);
 
     ss::future<std::vector<scheduled_upload>>
     schedule_uploads(std::vector<upload_context> loop_contexts);
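
For reviewers, here is a minimal standalone sketch of the clamping rule that `max_uploadable_offset_exclusive()` introduces. It is not part of the patch: plain integers stand in for `model::offset`, and the hypothetical `lso`/`committed` parameters replace the `_parent.last_stable_offset()` and `_parent.committed_offset()` accessors.

```cpp
// Illustrative sketch only; names and types here are simplified stand-ins.
#include <algorithm>
#include <cassert>
#include <cstdint>

using offset_t = int64_t;

// model::next_offset(o) is o + 1 in the common case.
constexpr offset_t next_offset(offset_t o) { return o + 1; }

constexpr offset_t
max_uploadable_offset_exclusive(offset_t lso, offset_t committed) {
    // Exclusive upper bound for uploads: never go past the last stable
    // offset (so the aborted-transaction index is complete) and never
    // past what raft has committed.
    return std::min(lso, next_offset(committed));
}

int main() {
    // An open transaction holds the LSO below the committed offset:
    // the LSO wins, so uploads stop before the open transaction.
    assert(max_uploadable_offset_exclusive(90, 100) == 90);
    // No open transactions: the committed offset (+1, to make the
    // bound exclusive) is the limit.
    assert(max_uploadable_offset_exclusive(120, 100) == 101);
    return 0;
}
```

Taking the `std::min` of the two constraints keeps the exclusive bound behind both of them; since each input should only move forward during normal operation, this is how the patch aims to meet the "can never go backwards" requirement documented in the header.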