Skip to content

Commit

Permalink
Merge pull request #16191 from redpanda-data/vbotbuildovich/backport-…
Browse files Browse the repository at this point in the history
…16160-v23.2.x-132

[v23.2.x] cloud_storage: use fragvec to hold replaced segments
  • Loading branch information
piyushredpanda committed Jan 20, 2024
2 parents 3fcc905 + 4059216 commit 4788dc9
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 39 deletions.
11 changes: 6 additions & 5 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -819,14 +819,13 @@ bool partition_manifest::advance_start_offset(model::offset new_start_offset) {
return false;
}

std::vector<partition_manifest::lw_segment_meta>
fragmented_vector<partition_manifest::lw_segment_meta>
partition_manifest::lw_replaced_segments() const {
return _replaced;
return _replaced.copy();
}

std::vector<segment_meta> partition_manifest::replaced_segments() const {
std::vector<segment_meta> res;
res.reserve(_replaced.size());
fragmented_vector<segment_meta> partition_manifest::replaced_segments() const {
fragmented_vector<segment_meta> res;
for (const auto& s : _replaced) {
res.push_back(lw_segment_meta::convert(s));
}
Expand Down Expand Up @@ -2724,6 +2723,8 @@ partition_manifest_serde_from_partition_manifest(partition_manifest const& m)
(([&]<typename Src>(auto& dest, Src const& src) {
if constexpr (std::is_same_v<Src, segment_meta_cstore>) {
dest = src.to_iobuf();
} else if constexpr (reflection::is_fragmented_vector<Src>) {
dest = src.copy();
} else {
dest = src;
}
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_storage/partition_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class partition_manifest : public base_manifest {
using value = segment_meta;
using segment_map = segment_meta_cstore;
using spillover_manifest_map = segment_meta_cstore;
using replaced_segments_list = std::vector<lw_segment_meta>;
using replaced_segments_list = fragmented_vector<lw_segment_meta>;
using const_iterator = segment_map::const_iterator;

/// Generate segment name to use in the cloud
Expand Down Expand Up @@ -439,11 +439,11 @@ class partition_manifest : public base_manifest {
const_iterator segment_containing(kafka::offset o) const;

// Return collection of segments that were replaced in lightweight format.
std::vector<partition_manifest::lw_segment_meta>
fragmented_vector<partition_manifest::lw_segment_meta>
lw_replaced_segments() const;

/// Return collection of segments that were replaced by newer segments.
std::vector<segment_meta> replaced_segments() const;
fragmented_vector<segment_meta> replaced_segments() const;

/// Return the number of replaced segments currently awaiting deletion.
size_t replaced_segments_count() const;
Expand Down
59 changes: 29 additions & 30 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1296,43 +1296,42 @@ void archival_metadata_stm::apply_replace_manifest(iobuf val) {
get_last_offset());
}

std::vector<cloud_storage::partition_manifest::lw_segment_meta>
fragmented_vector<cloud_storage::partition_manifest::lw_segment_meta>
archival_metadata_stm::get_segments_to_cleanup() const {
// Include replaced segments to the backlog
using lw_segment_meta = cloud_storage::partition_manifest::lw_segment_meta;
std::vector<lw_segment_meta> backlog = _manifest->lw_replaced_segments();
const fragmented_vector<lw_segment_meta> source_backlog
= _manifest->lw_replaced_segments();

// Make sure that 'replaced' list doesn't have any references to active
// segments. This is a protection from the data loss. This should not
// happen, but protects us from data loss in cases where bugs elsewhere.
auto backlog_size = backlog.size();
backlog.erase(
std::remove_if(
backlog.begin(),
backlog.end(),
[this](const lw_segment_meta& m) {
auto it = _manifest->find(m.base_offset);
if (it == _manifest->end()) {
return false;
}
auto m_name = _manifest->generate_remote_segment_name(
cloud_storage::partition_manifest::lw_segment_meta::convert(m));
auto s_name = _manifest->generate_remote_segment_name(*it);
// The segment will have the same path as the one we have in
// manifest in S3 so if we will delete it the data will be lost.
if (m_name == s_name) {
vlog(
_logger.warn,
"The replaced segment name {} collides with the segment {} "
"in the manifest. It will be removed to prevent the data "
"loss.",
m_name,
s_name);
return true;
}
return false;
}),
backlog.end());
const auto backlog_size = source_backlog.size();
fragmented_vector<lw_segment_meta> backlog;
for (const auto& m : source_backlog) {
auto it = _manifest->find(m.base_offset);
if (it == _manifest->end()) {
backlog.push_back(m);
continue;
}
auto m_name = _manifest->generate_remote_segment_name(
cloud_storage::partition_manifest::lw_segment_meta::convert(m));
auto s_name = _manifest->generate_remote_segment_name(*it);
// The segment will have the same path as the one we have in
// manifest in S3 so if we will delete it the data will be lost.
if (m_name == s_name) {
vlog(
_logger.error,
"The replaced segment name {} collides with the segment "
"{} "
"in the manifest. It will be removed to prevent the data "
"loss.",
m_name,
s_name);
continue;
}
backlog.push_back(m);
}

if (backlog.size() < backlog_size) {
vlog(
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class archival_metadata_stm final : public persisted_stm<> {

// Return list of all segments that has to be
// removed from S3.
std::vector<cloud_storage::partition_manifest::lw_segment_meta>
fragmented_vector<cloud_storage::partition_manifest::lw_segment_meta>
get_segments_to_cleanup() const;

/// Create batch builder that can be used to combine and replicate multipe
Expand Down

0 comments on commit 4788dc9

Please sign in to comment.