CORE-526 k/configs: dont override topic-level cleanup policy #18284

Merged · 11 commits · Jul 12, 2024
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
@@ -895,7 +895,7 @@ FIXTURE_TEST(test_mixed_timequery, e2e_fixture) {
// Disable remote fetch, forcing local data usage only.
auto disable_fetch_override = storage::ntp_config::default_overrides{
.shadow_indexing_mode = model::shadow_indexing_mode::archival};
log->update_configuration(disable_fetch_override).get();
log->set_overrides(disable_fetch_override);

auto make_and_verify_timequery =
[partition](
@@ -940,7 +940,7 @@ FIXTURE_TEST(test_mixed_timequery, e2e_fixture) {
// Enable remote fetch.
auto allow_fetch_override = storage::ntp_config::default_overrides{
.shadow_indexing_mode = model::shadow_indexing_mode::fetch};
log->update_configuration(allow_fetch_override).get();
log->set_overrides(allow_fetch_override);

// Now, timequeries should be able to read over the whole domain [0,
// max_timestamp]
126 changes: 91 additions & 35 deletions src/v/cluster/partition.cc
@@ -29,11 +29,14 @@
#include "raft/fundamental.h"
#include "raft/fwd.h"
#include "raft/state_machine_manager.h"
#include "storage/ntp_config.h"

#include <seastar/core/shared_ptr_incomplete.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/defer.hh>

#include <chrono>

namespace cluster {

partition::partition(
@@ -52,7 +55,8 @@ partition::partition(
, _cloud_storage_cache(cloud_storage_cache)
, _cloud_storage_probe(
ss::make_shared<cloud_storage::partition_probe>(_raft->ntp()))
, _upload_housekeeping(upload_hks) {
, _upload_housekeeping(upload_hks)
, _log_cleanup_policy(config::shard_local_cfg().log_cleanup_policy.bind()) {
// Construct cloud_storage read path (remote_partition)
if (
config::shard_local_cfg().cloud_storage_enabled()
@@ -81,6 +85,23 @@
}
}
}

_log_cleanup_policy.watch([this]() {
if (_as.abort_requested()) {
return ss::now();
}
auto changed = _raft->log()->notify_compaction_update();
if (changed) {
vlog(
clusterlog.debug,
"[{}] updating archiver for cluster config change in "
"log_cleanup_policy",
_raft->ntp());

return restart_archiver(false);
}
return ss::now();
});

[Comment from the PR author on lines +89 to +104] @Lazin We're doing the archiver restart synchronously on topic config updates, so I also went with making the restart on config updates synchronous here. But I wonder if that's safe or if I should offload it to a fibre? What do you think?

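For context, a fibre-based alternative to the synchronous restart would look roughly like the following. This is a minimal sketch only, assuming Redpanda's ssx::spawn_with_gate helper; the _gate member used here is hypothetical wiring for illustration, not part of this PR:

    _log_cleanup_policy.watch([this] {
        if (_as.abort_requested()) {
            return;
        }
        if (_raft->log()->notify_compaction_update()) {
            // Fire-and-forget: run the archiver restart on a background
            // fibre tracked by a gate, so the config watch callback itself
            // stays cheap and never blocks on archiver shutdown/startup.
            ssx::spawn_with_gate(
              _gate, [this] { return restart_archiver(false); });
        }
    });
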
}

ss::future<std::error_code> partition::prefix_truncate(
@@ -730,59 +751,73 @@ ss::future<> partition::update_configuration(topic_properties properties) {
// Before applying change, consider whether it changes cloud storage
// mode
bool cloud_storage_changed = false;

bool old_archival = old_ntp_config.is_archival_enabled();
bool new_archival = new_ntp_config.shadow_indexing_mode
&& model::is_archival_enabled(
new_ntp_config.shadow_indexing_mode.value());

bool new_compaction_status
= new_ntp_config.cleanup_policy_bitflags.has_value()
&& (new_ntp_config.cleanup_policy_bitflags.value()
& model::cleanup_policy_bitflags::compaction)
== model::cleanup_policy_bitflags::compaction;
if (
old_ntp_config.is_archival_enabled() != new_archival
|| old_ntp_config.is_read_replica_mode_enabled()
!= new_ntp_config.read_replica
|| old_ntp_config.is_compacted() != new_compaction_status) {
auto old_retention_ms = old_ntp_config.has_overrides()
? old_ntp_config.get_overrides().retention_time
: tristate<std::chrono::milliseconds>(
std::nullopt);
auto new_retention_ms = new_ntp_config.retention_time;

auto old_retention_bytes
= old_ntp_config.has_overrides()
? old_ntp_config.get_overrides().retention_bytes
: tristate<size_t>(std::nullopt);
auto new_retention_bytes = new_ntp_config.retention_bytes;

if (old_archival != new_archival) {
vlog(
clusterlog.debug,
"[{}] updating archiver for topic config change in "
"archival_enabled",
_raft->ntp());
cloud_storage_changed = true;
}
if (old_retention_ms != new_retention_ms) {
vlog(
clusterlog.debug,
"[{}] updating archiver for topic config change in "
"retention_ms",
_raft->ntp());
cloud_storage_changed = true;
}
if (old_retention_bytes != new_retention_bytes) {
vlog(
clusterlog.debug,
"[{}] updating archiver for topic config change in "
"retention_bytes",
_raft->ntp());
cloud_storage_changed = true;
}

// Pass the configuration update into the storage layer
co_await _raft->log()->update_configuration(new_ntp_config);
_raft->log()->set_overrides(new_ntp_config);
bool compaction_changed = _raft->log()->notify_compaction_update();
if (compaction_changed) {
vlog(
clusterlog.debug,
"[{}] updating archiver for topic config change in compaction",
_raft->ntp());
cloud_storage_changed = true;
}

// Update cached instance of topic properties
if (_topic_cfg) {
_topic_cfg->properties = std::move(properties);
}

// Pass the configuration update to the raft layer
_raft->notify_config_update();

// If this partition's cloud storage mode changed, rebuild the archiver.
// This must happen after raft update, because it reads raft's
// This must happen after the raft+storage update, because it reads raft's
// ntp_config to decide whether to construct an archiver.
if (cloud_storage_changed) {
vlog(
clusterlog.debug,
"update_configuration[{}]: updating archiver for config {}",
new_ntp_config,
_raft->ntp());

auto archiver_reset_guard = co_await ssx::with_timeout_abortable(
ss::get_units(_archiver_reset_mutex, 1),
ss::lowres_clock::now() + archiver_reset_mutex_timeout,
_as);

if (_archiver) {
_upload_housekeeping.local().deregister_jobs(
_archiver->get_housekeeping_jobs());
co_await _archiver->stop();
_archiver = nullptr;
}
maybe_construct_archiver();
if (_archiver) {
_archiver->notify_topic_config();
co_await _archiver->start();
}
co_await restart_archiver(true);
} else {
vlog(
clusterlog.trace,
@@ -801,6 +836,27 @@ ss::future<> partition::update_configuration(topic_properties properties) {
}
}

ss::future<> partition::restart_archiver(bool should_notify_topic_config) {
auto archiver_reset_guard = co_await ssx::with_timeout_abortable(
ss::get_units(_archiver_reset_mutex, 1),
ss::lowres_clock::now() + archiver_reset_mutex_timeout,
_as);

if (_archiver) {
_upload_housekeeping.local().deregister_jobs(
_archiver->get_housekeeping_jobs());
co_await _archiver->stop();
_archiver = nullptr;
}
maybe_construct_archiver();
if (_archiver) {
if (should_notify_topic_config) {
_archiver->notify_topic_config();
}
co_await _archiver->start();
}
}
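
Both call sites share this helper after the change: the topic-level path in update_configuration calls restart_archiver(true) so the archiver marks the topic manifest dirty for reupload, while the cluster-config watch installed in the constructor calls restart_archiver(false), since no topic-level properties changed there.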

std::optional<model::offset>
partition::get_term_last_offset(model::term_id term) const {
auto o = _raft->log()->get_term_last_offset(term);
7 changes: 7 additions & 0 deletions src/v/cluster/partition.h
@@ -21,6 +21,7 @@
#include "model/record_batch_reader.h"
#include "model/timeout_clock.h"
#include "raft/replicate.h"
#include "storage/ntp_config.h"
#include "storage/translating_reader.h"
#include "storage/types.h"

@@ -347,6 +348,11 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
ss::future<std::optional<storage::timequery_result>>
local_timequery(storage::timequery_config, bool allow_cloud_fallback);

// Restarts the archiver
// If should_notify_topic_config is set, it marks the topic_manifest as
// dirty so that it gets reuploaded
ss::future<> restart_archiver(bool should_notify_topic_config);

consensus_ptr _raft;
ss::shared_ptr<cluster::log_eviction_stm> _log_eviction_stm;
ss::shared_ptr<cluster::rm_stm> _rm_stm;
@@ -376,6 +382,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
std::unique_ptr<cluster::topic_configuration> _topic_cfg;

ss::sharded<archival::upload_housekeeping_service>& _upload_housekeeping;
config::binding<model::cleanup_policy_bitflags> _log_cleanup_policy;

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
28 changes: 12 additions & 16 deletions src/v/cluster/partition_recovery_manager.cc
@@ -245,13 +245,8 @@ std::ostream& operator<<(std::ostream& o, const retention& r) {
return o;
}

static retention
get_retention_policy(const storage::ntp_config::default_overrides& prop) {
auto flags = prop.cleanup_policy_bitflags;
if (
flags
&& (flags.value() & model::cleanup_policy_bitflags::deletion)
== model::cleanup_policy_bitflags::deletion) {
static retention get_retention_policy(const storage::ntp_config& prop) {

[Comment from a reviewer] why this change is needed?

if (prop.is_collectable()) {
// If a space constraint is set on the topic, use that: otherwise
// use time based constraint if present. If total retention setting
// is less than local retention setting, take the smallest.
@@ -261,17 +256,18 @@ get_retention_policy(const storage::ntp_config::default_overrides& prop) {
//
// This will also drop the compact settings and replace it with
// delete.
if (prop.retention_local_target_bytes.has_optional_value()) {
auto v = prop.retention_local_target_bytes.value();
auto overrides = prop.get_overrides();
if (overrides.retention_local_target_bytes.has_optional_value()) {
auto v = overrides.retention_local_target_bytes.value();

if (prop.retention_bytes.has_optional_value()) {
v = std::min(prop.retention_bytes.value(), v);
if (overrides.retention_bytes.has_optional_value()) {
v = std::min(overrides.retention_bytes.value(), v);
}
return size_bound_deletion_parameters{v};
} else if (prop.retention_local_target_ms.has_optional_value()) {
auto v = prop.retention_local_target_ms.value();
if (prop.retention_time.has_optional_value()) {
v = std::min(prop.retention_time.value(), v);
} else if (overrides.retention_local_target_ms.has_optional_value()) {
auto v = overrides.retention_local_target_ms.value();
if (overrides.retention_time.has_optional_value()) {
v = std::min(overrides.retention_time.value(), v);
}
return time_bound_deletion_parameters{v};
}
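
As a concrete reading of the bound selection above (illustrative values only): with retention.local.target.bytes = 2 GiB and retention.bytes = 512 MiB both set as overrides, recovery uses the size bound min(512 MiB, 2 GiB) = 512 MiB; the time-based pair is consulted only when no local size target is present.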
@@ -299,7 +295,7 @@ static model::offset get_prev_offset(model::offset o) {
// entry point for the whole thing
ss::future<log_recovery_result> partition_downloader::download_log() {
auto prefix = std::filesystem::path(_ntpc.work_directory());
auto retention = get_retention_policy(_ntpc.get_overrides());
auto retention = get_retention_policy(_ntpc);
vlog(
_ctxlog.info,
"The target path: {}, ntp-config revision: {}, retention: {}",
12 changes: 0 additions & 12 deletions src/v/cluster/topics_frontend.cc
@@ -132,18 +132,6 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
tp.cfg.properties.shadow_indexing
= _metadata_cache.get_default_shadow_indexing_mode();
}

/**
* We always override cleanup policy. i.e. topic cleanup policy will
* stay the same even if it was changed in defaults (broker
* configuration) and there was no override passed by client while
* creating a topic. The the same policy is applied in Kafka.
*/

if (!tp.cfg.properties.cleanup_policy_bitflags.has_value()) {
tp.cfg.properties.cleanup_policy_bitflags
= _metadata_cache.get_default_cleanup_policy_bitflags();
}
}

vlog(clusterlog.info, "Create topics {}", topics);
6 changes: 2 additions & 4 deletions src/v/config/convert.h
@@ -169,10 +169,8 @@
static Node encode(const type& rhs) {
Node node;

auto compaction = (rhs & model::cleanup_policy_bitflags::compaction)
== model::cleanup_policy_bitflags::compaction;
auto deletion = (rhs & model::cleanup_policy_bitflags::deletion)
== model::cleanup_policy_bitflags::deletion;
auto compaction = model::is_compaction_enabled(rhs);
auto deletion = model::is_deletion_enabled(rhs);

if (compaction && deletion) {
node = "compact,delete";
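The model::is_compaction_enabled and model::is_deletion_enabled helpers used above replace open-coded bitmask tests throughout the PR. A minimal sketch of what such helpers reduce to, reconstructed from the deleted code rather than copied from the model headers:

    namespace model {
    inline bool is_compaction_enabled(cleanup_policy_bitflags flags) {
        // Set-membership test on the flag enum, as in the code this replaces.
        return (flags & cleanup_policy_bitflags::compaction)
               == cleanup_policy_bitflags::compaction;
    }
    inline bool is_deletion_enabled(cleanup_policy_bitflags flags) {
        return (flags & cleanup_policy_bitflags::deletion)
               == cleanup_policy_bitflags::deletion;
    }
    } // namespace model
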
20 changes: 3 additions & 17 deletions src/v/kafka/server/handlers/alter_configs.cc
@@ -9,7 +9,6 @@

#include "kafka/server/handlers/alter_configs.h"

#include "cluster/metadata_cache.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "features/feature_table.h"
@@ -56,8 +55,7 @@ static void parse_and_set_shadow_indexing_mode(
}

checked<cluster::topic_properties_update, alter_configs_resource_response>
create_topic_properties_update(
const request_context& ctx, alter_configs_resource& resource) {
create_topic_properties_update(alter_configs_resource& resource) {
model::topic_namespace tp_ns(
model::kafka_namespace, model::topic(resource.resource_name));
cluster::topic_properties_update update(tp_ns);
@@ -88,15 +86,6 @@ create_topic_properties_update(
update.custom_properties.data_policy.op
= cluster::incremental_update_operation::none;

/**
* Since 'cleanup.policy' is always defaulted to 'delete' at topic creation,
* we must special case the handling to preserve this default.
*/
update.properties.cleanup_policy_bitflags.op
= cluster::incremental_update_operation::set;
update.properties.cleanup_policy_bitflags.value
= ctx.metadata_cache().get_default_cleanup_policy_bitflags();

update.properties.record_key_schema_id_validation.op
= cluster::incremental_update_operation::set;
update.properties.record_key_schema_id_validation_compat.op
@@ -324,11 +313,8 @@
return do_alter_topics_configuration<
alter_configs_resource,
alter_configs_resource_response>(
ctx,
std::move(resources),
validate_only,
[&ctx](alter_configs_resource& r) {
return create_topic_properties_update(ctx, r);
ctx, std::move(resources), validate_only, [](alter_configs_resource& r) {
return create_topic_properties_update(r);
});
}

3 changes: 1 addition & 2 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc
@@ -519,8 +519,7 @@ config_response_container_t make_topic_configs(
maybe_make_documentation(
include_documentation,
config::shard_local_cfg().log_cleanup_policy.desc()),
&describe_as_string<model::cleanup_policy_bitflags>,
true);
&describe_as_string<model::cleanup_policy_bitflags>);

const std::string_view docstring{
topic_properties.is_compacted()
3 changes: 1 addition & 2 deletions src/v/kafka/server/handlers/delete_records.cc
@@ -64,8 +64,7 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
return true;
}
const auto& bitflags = cfg.properties.cleanup_policy_bitflags;
return (*bitflags & model::cleanup_policy_bitflags::deletion)
== model::cleanup_policy_bitflags::deletion;
return model::is_deletion_enabled(*bitflags);
};
const auto is_nodelete_topic = [](const delete_records_topic& t) {
const auto& nodelete_topics
3 changes: 2 additions & 1 deletion src/v/kafka/server/tests/consumer_groups_test.cc
@@ -147,7 +147,8 @@ FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
storage::ntp_config::default_overrides ov;
ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion
| model::cleanup_policy_bitflags::compaction;
log->update_configuration(ov).get();
log->set_overrides(ov);
log->notify_compaction_update();
log->flush().get();
log->force_roll(ss::default_priority_class()).get();
for (auto retention_enabled : {false, true}) {