diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 1aa0de5ba589..c3efddbf536a 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1840,6 +1840,13 @@ configuration::configuration() "the data directory. Redpanda will refuse to start if it is not found.", {.needs_restart = needs_restart::no, .visibility = visibility::user}, false) + , enable_storage_space_manager( + *this, + "enable_storage_space_manager", + "Enable the storage space manager that coordinates and control space " + "usage between log data and the cloud storage cache.", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + true) , memory_abort_on_alloc_failure( *this, "memory_abort_on_alloc_failure", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index a52109c4ca5a..e64f113662c7 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -363,6 +363,7 @@ struct configuration final : public config_store { bounded_property storage_space_alert_free_threshold_bytes; bounded_property storage_min_free_bytes; property storage_strict_data_init; + property enable_storage_space_manager; // memory related settings property memory_abort_on_alloc_failure; diff --git a/src/v/redpanda/CMakeLists.txt b/src/v/redpanda/CMakeLists.txt index 62b6ec9a75f1..8661948b9b89 100644 --- a/src/v/redpanda/CMakeLists.txt +++ b/src/v/redpanda/CMakeLists.txt @@ -53,6 +53,7 @@ v_cc_library( v::pandaproxy_rest v::pandaproxy_schema_registry v::migrations + v::storage_resource_mgmt ) add_subdirectory(tests) diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 3e687b9f007d..609fb6e82794 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1351,6 +1351,13 @@ void application::wire_up_redpanda_services(model::node_id node_id) { .get(); } + construct_single_service( + space_manager, + config::shard_local_cfg().enable_storage_space_manager.bind(), + &storage, + &shadow_index_cache, + &partition_manager); + // group membership syschecks::systemd_message("Creating kafka group manager").get(); construct_service( @@ -2240,6 +2247,8 @@ void application::start_runtime_services( for (const auto& m : _migrators) { m->start(controller->get_abort_source().local()); } + + space_manager->start().get(); } /** diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 43b3060253bf..c3dfeea06b4b 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -138,6 +138,7 @@ class application { ss::sharded storage; ss::sharded storage_node; ss::sharded local_monitor; + std::unique_ptr space_manager; std::unique_ptr controller; std::unique_ptr coprocessing; diff --git a/src/v/resource_mgmt/CMakeLists.txt b/src/v/resource_mgmt/CMakeLists.txt index bc1ee64d0450..422f6d137e9e 100644 --- a/src/v/resource_mgmt/CMakeLists.txt +++ b/src/v/resource_mgmt/CMakeLists.txt @@ -2,6 +2,13 @@ v_cc_library( NAME resource_mgmt SRCS available_memory.cc + DEPS + Seastar::seastar + ) + +v_cc_library( + NAME storage_resource_mgmt + SRCS storage.cc DEPS Seastar::seastar diff --git a/src/v/resource_mgmt/storage.cc b/src/v/resource_mgmt/storage.cc index cb4366c28fb6..3f823e72d636 100644 --- a/src/v/resource_mgmt/storage.cc +++ b/src/v/resource_mgmt/storage.cc @@ -11,21 +11,125 @@ #include "storage.h" +#include "cluster/partition_manager.h" +#include "vlog.h" + +#include + +static ss::logger rlog("resource_mgmt"); + namespace storage { -std::ostream& operator<<(std::ostream& o, const disk_space_alert d) { - switch (d) { - case disk_space_alert::ok: - o << "ok"; - break; - case disk_space_alert::low_space: - o << "low_space"; - break; - case disk_space_alert::degraded: - o << "degraded"; - break; +disk_space_manager::disk_space_manager( + config::binding enabled, + ss::sharded* storage, + ss::sharded* cache, + ss::sharded* pm) + : _enabled(std::move(enabled)) + , _storage(storage) + , _cache(cache->local_is_initialized() ? cache : nullptr) + , _pm(pm) { + _enabled.watch([this] { + vlog( + rlog.info, + "{} disk space manager control loop", + _enabled() ? "Enabling" : "Disabling"); + _control_sem.signal(); + }); +} + +ss::future<> disk_space_manager::start() { + vlog( + rlog.info, + "Starting disk space manager service ({})", + _enabled() ? "enabled" : "disabled"); + ssx::spawn_with_gate(_gate, [this] { return run_loop(); }); + co_return; +} + +ss::future<> disk_space_manager::stop() { + vlog(rlog.info, "Stopping disk space manager service"); + _control_sem.broken(); + co_await _gate.close(); +} + +ss::future<> disk_space_manager::run_loop() { + /* + * we want the code here to actually run a little, but the final shape of + * configuration options is not yet known. + */ + constexpr auto frequency = std::chrono::seconds(30); + + while (true) { + try { + if (_enabled()) { + co_await _control_sem.wait( + frequency, std::max(_control_sem.current(), size_t(1))); + } else { + co_await _control_sem.wait(); + } + } catch (const ss::semaphore_timed_out&) { + // time for some controlling + } + + if (!_enabled()) { + continue; + } + + /* + * Collect cache and logs storage usage information. These accumulate + * across all shards (despite the local() accessor). If a failure occurs + * we wait rather than operate with a reduced set of information. + */ + cloud_storage::cache_usage_target cache_usage_target; + try { + cache_usage_target + = co_await _pm->local().get_cloud_cache_disk_usage_target(); + } catch (...) { + vlog( + rlog.info, + "Unable to collect cloud cache usage: {}", + std::current_exception()); + continue; + } + + storage::usage_report logs_usage; + try { + logs_usage = co_await _storage->local().disk_usage(); + } catch (...) { + vlog( + rlog.info, + "Unable to collect log storage usage: {}", + std::current_exception()); + continue; + } + + vlog( + rlog.debug, + "Cloud storage cache target minimum size {} nice to have {}", + cache_usage_target.target_min_bytes, + cache_usage_target.target_bytes); + + vlog( + rlog.debug, + "Log storage usage total {} - data {} index {} compaction {}", + logs_usage.usage.total(), + logs_usage.usage.data, + logs_usage.usage.index, + logs_usage.usage.compaction); + + vlog( + rlog.debug, + "Log storage usage available for reclaim local {} total {}", + logs_usage.reclaim.retention, + logs_usage.reclaim.available); + + vlog( + rlog.debug, + "Log storage usage target minimum size {} nice to have {}", + logs_usage.target.min_capacity, + logs_usage.target.min_capacity_wanted); } - return o; } -} // namespace storage \ No newline at end of file +} // namespace storage diff --git a/src/v/resource_mgmt/storage.h b/src/v/resource_mgmt/storage.h index 566673252ef3..0ebf3a8efe3e 100644 --- a/src/v/resource_mgmt/storage.h +++ b/src/v/resource_mgmt/storage.h @@ -11,16 +11,76 @@ #pragma once +#include "config/property.h" +#include "seastarx.h" +#include "ssx/semaphore.h" + +#include + #include +namespace cloud_storage { +class cache; +} + +namespace cluster { +class partition_manager; +} + namespace storage { +class api; + enum class disk_space_alert { ok = 0, low_space = 1, degraded = 2 }; inline disk_space_alert max_severity(disk_space_alert a, disk_space_alert b) { return std::max(a, b); } -std::ostream& operator<<(std::ostream& o, const storage::disk_space_alert d); +inline std::ostream& operator<<(std::ostream& o, const disk_space_alert d) { + switch (d) { + case disk_space_alert::ok: + o << "ok"; + break; + case disk_space_alert::low_space: + o << "low_space"; + break; + case disk_space_alert::degraded: + o << "degraded"; + break; + } + return o; +} + +/* + * + */ +class disk_space_manager { +public: + disk_space_manager( + config::binding enabled, + ss::sharded* storage, + ss::sharded* cache, + ss::sharded* pm); + + disk_space_manager(disk_space_manager&&) noexcept = delete; + disk_space_manager& operator=(disk_space_manager&&) noexcept = delete; + disk_space_manager(const disk_space_manager&) = delete; + disk_space_manager& operator=(const disk_space_manager&) = delete; + ~disk_space_manager() = default; + + ss::future<> start(); + ss::future<> stop(); + +private: + config::binding _enabled; + ss::sharded* _storage; + ss::sharded* _cache; + ss::sharded* _pm; + + ss::gate _gate; + ss::future<> run_loop(); + ssx::semaphore _control_sem{0, "resource_mgmt::space_manager"}; +}; -} // namespace storage \ No newline at end of file +} // namespace storage