Skip to content

Commit

Permalink
storage: fall back to cluster-default cleanup policy
Browse files Browse the repository at this point in the history
  • Loading branch information
pgellert committed May 14, 2024
1 parent ebe7a24 commit 6c7d2ca
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 29 deletions.
7 changes: 6 additions & 1 deletion src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -734,16 +734,21 @@ ss::future<> partition::update_configuration(topic_properties properties) {
&& model::is_archival_enabled(
new_ntp_config.shadow_indexing_mode.value());

// TODO: we are just comparing the topic-level configs here, whereas the
// corresponding cluster configs used as fallback values might also change.
// We should reconcile if that changes as well.
bool new_compaction_status
= new_ntp_config.cleanup_policy_bitflags.has_value()
&& (new_ntp_config.cleanup_policy_bitflags.value()
& model::cleanup_policy_bitflags::compaction)
== model::cleanup_policy_bitflags::compaction;
bool old_compaction_status = old_ntp_config.has_compacted_override();

if (
old_ntp_config.is_archival_enabled() != new_archival
|| old_ntp_config.is_read_replica_mode_enabled()
!= new_ntp_config.read_replica
|| old_ntp_config.is_compacted() != new_compaction_status) {
|| old_compaction_status != new_compaction_status) {
cloud_storage_changed = true;
}

Expand Down
29 changes: 14 additions & 15 deletions src/v/cluster/partition_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,11 @@ std::ostream& operator<<(std::ostream& o, const retention& r) {
return o;
}

static retention
get_retention_policy(const storage::ntp_config::default_overrides& prop) {
auto flags = prop.cleanup_policy_bitflags;
static retention get_retention_policy(const storage::ntp_config& prop) {
auto flags = prop.cleanup_policy();
if (
flags
&& (flags.value() & model::cleanup_policy_bitflags::deletion)
== model::cleanup_policy_bitflags::deletion) {
(flags & model::cleanup_policy_bitflags::deletion)
== model::cleanup_policy_bitflags::deletion) {
// If a space constraint is set on the topic, use that: otherwise
// use time based constraint if present. If total retention setting
// is less than local retention setting, take the smallest.
Expand All @@ -261,17 +259,18 @@ get_retention_policy(const storage::ntp_config::default_overrides& prop) {
//
// This will also drop the compact settings and replace it with
// delete.
if (prop.retention_local_target_bytes.has_optional_value()) {
auto v = prop.retention_local_target_bytes.value();
auto overrides = prop.get_overrides();
if (overrides.retention_local_target_bytes.has_optional_value()) {
auto v = overrides.retention_local_target_bytes.value();

if (prop.retention_bytes.has_optional_value()) {
v = std::min(prop.retention_bytes.value(), v);
if (overrides.retention_bytes.has_optional_value()) {
v = std::min(overrides.retention_bytes.value(), v);
}
return size_bound_deletion_parameters{v};
} else if (prop.retention_local_target_ms.has_optional_value()) {
auto v = prop.retention_local_target_ms.value();
if (prop.retention_time.has_optional_value()) {
v = std::min(prop.retention_time.value(), v);
} else if (overrides.retention_local_target_ms.has_optional_value()) {
auto v = overrides.retention_local_target_ms.value();
if (overrides.retention_time.has_optional_value()) {
v = std::min(overrides.retention_time.value(), v);
}
return time_bound_deletion_parameters{v};
}
Expand Down Expand Up @@ -299,7 +298,7 @@ static model::offset get_prev_offset(model::offset o) {
// entry point for the whole thing
ss::future<log_recovery_result> partition_downloader::download_log() {
auto prefix = std::filesystem::path(_ntpc.work_directory());
auto retention = get_retention_policy(_ntpc.get_overrides());
auto retention = get_retention_policy(_ntpc);
vlog(
_ctxlog.info,
"The target path: {}, ntp-config revision: {}, retention: {}",
Expand Down
37 changes: 24 additions & 13 deletions src/v/storage/ntp_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,23 @@ class ntp_config {

bool has_overrides() const { return _overrides != nullptr; }

bool is_compacted() const {
if (_overrides && _overrides->cleanup_policy_bitflags) {
return (_overrides->cleanup_policy_bitflags.value()
& model::cleanup_policy_bitflags::compaction)
== model::cleanup_policy_bitflags::compaction;
bool has_compacted_override() const {
auto cp_override = cleanup_policy_override();
if (!cp_override) {
return false;
}
return false;
return (cp_override.value()
& model::cleanup_policy_bitflags::compaction)
== model::cleanup_policy_bitflags::compaction;
}

bool is_compacted() const {
return (cleanup_policy() & model::cleanup_policy_bitflags::compaction)
== model::cleanup_policy_bitflags::compaction;
}

bool is_collectable() const {
// has no overrides
if (!_overrides || !_overrides->cleanup_policy_bitflags) {
return true;
}
// check if deletion bitflag is set
return (_overrides->cleanup_policy_bitflags.value()
& model::cleanup_policy_bitflags::deletion)
return (cleanup_policy() & model::cleanup_policy_bitflags::deletion)
== model::cleanup_policy_bitflags::deletion;
}

Expand Down Expand Up @@ -292,6 +292,17 @@ class ntp_config {
: cluster_default;
}

std::optional<model::cleanup_policy_bitflags>
cleanup_policy_override() const {
return _overrides ? _overrides->cleanup_policy_bitflags : std::nullopt;
}

model::cleanup_policy_bitflags cleanup_policy() const {
const auto& cluster_default
= config::shard_local_cfg().log_cleanup_policy();
return cleanup_policy_override().value_or(cluster_default);
}

private:
model::ntp _ntp;
/// \brief currently this is the basedir. In the future
Expand Down

0 comments on commit 6c7d2ca

Please sign in to comment.