Skip to content

Commit

Permalink
resources: add disk space manager
Browse files Browse the repository at this point in the history
Introduces a new disk space manager control loop that has access to both
the cloud storage cache as well as log storage for the purposes of
managing free space.

This commit only introduces the control loop and prints storage
statistics collected from the subsystem using our new APIs at debug
level.

Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
  • Loading branch information
dotnwat committed Jun 4, 2023
1 parent 61316cb commit ed387c6
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 15 deletions.
7 changes: 7 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,13 @@ configuration::configuration()
"the data directory. Redpanda will refuse to start if it is not found.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
false)
, enable_storage_space_manager(
*this,
"enable_storage_space_manager",
"Enable the storage space manager that coordinates and control space "
"usage between log data and the cloud storage cache.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
true)
, memory_abort_on_alloc_failure(
*this,
"memory_abort_on_alloc_failure",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ struct configuration final : public config_store {
bounded_property<size_t> storage_space_alert_free_threshold_bytes;
bounded_property<size_t> storage_min_free_bytes;
property<bool> storage_strict_data_init;
property<bool> enable_storage_space_manager;

// memory related settings
property<bool> memory_abort_on_alloc_failure;
Expand Down
1 change: 1 addition & 0 deletions src/v/redpanda/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ v_cc_library(
v::pandaproxy_rest
v::pandaproxy_schema_registry
v::migrations
v::storage_resource_mgmt
)

add_subdirectory(tests)
Expand Down
9 changes: 9 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,13 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
.get();
}

construct_single_service(
space_manager,
config::shard_local_cfg().enable_storage_space_manager.bind(),
&storage,
&shadow_index_cache,
&partition_manager);

// group membership
syschecks::systemd_message("Creating kafka group manager").get();
construct_service(
Expand Down Expand Up @@ -2240,6 +2247,8 @@ void application::start_runtime_services(
for (const auto& m : _migrators) {
m->start(controller->get_abort_source().local());
}

space_manager->start().get();
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/v/redpanda/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class application {
ss::sharded<storage::api> storage;
ss::sharded<storage::node_api> storage_node;
ss::sharded<cluster::node::local_monitor> local_monitor;
std::unique_ptr<storage::disk_space_manager> space_manager;

std::unique_ptr<cluster::controller> controller;
std::unique_ptr<coproc::api> coprocessing;
Expand Down
7 changes: 7 additions & 0 deletions src/v/resource_mgmt/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ v_cc_library(
NAME resource_mgmt
SRCS
available_memory.cc
DEPS
Seastar::seastar
)

v_cc_library(
NAME storage_resource_mgmt
SRCS
storage.cc
DEPS
Seastar::seastar
Expand Down
130 changes: 117 additions & 13 deletions src/v/resource_mgmt/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,125 @@

#include "storage.h"

#include "cluster/partition_manager.h"
#include "vlog.h"

#include <seastar/util/log.hh>

static ss::logger rlog("resource_mgmt");

namespace storage {

std::ostream& operator<<(std::ostream& o, const disk_space_alert d) {
switch (d) {
case disk_space_alert::ok:
o << "ok";
break;
case disk_space_alert::low_space:
o << "low_space";
break;
case disk_space_alert::degraded:
o << "degraded";
break;
disk_space_manager::disk_space_manager(
config::binding<bool> enabled,
ss::sharded<storage::api>* storage,
ss::sharded<cloud_storage::cache>* cache,
ss::sharded<cluster::partition_manager>* pm)
: _enabled(std::move(enabled))
, _storage(storage)
, _cache(cache->local_is_initialized() ? cache : nullptr)
, _pm(pm) {
_enabled.watch([this] {
vlog(
rlog.info,
"{} disk space manager control loop",
_enabled() ? "Enabling" : "Disabling");
_control_sem.signal();
});
}

ss::future<> disk_space_manager::start() {
vlog(
rlog.info,
"Starting disk space manager service ({})",
_enabled() ? "enabled" : "disabled");
ssx::spawn_with_gate(_gate, [this] { return run_loop(); });
co_return;
}

ss::future<> disk_space_manager::stop() {
vlog(rlog.info, "Stopping disk space manager service");
_control_sem.broken();
co_await _gate.close();
}

ss::future<> disk_space_manager::run_loop() {
/*
* we want the code here to actually run a little, but the final shape of
* configuration options is not yet known.
*/
constexpr auto frequency = std::chrono::seconds(30);

while (true) {
try {
if (_enabled()) {
co_await _control_sem.wait(
frequency, std::max(_control_sem.current(), size_t(1)));
} else {
co_await _control_sem.wait();
}
} catch (const ss::semaphore_timed_out&) {
// time for some controlling
}

if (!_enabled()) {
continue;
}

/*
* Collect cache and logs storage usage information. These accumulate
* across all shards (despite the local() accessor). If a failure occurs
* we wait rather than operate with a reduced set of information.
*/
cloud_storage::cache_usage_target cache_usage_target;
try {
cache_usage_target
= co_await _pm->local().get_cloud_cache_disk_usage_target();
} catch (...) {
vlog(
rlog.info,
"Unable to collect cloud cache usage: {}",
std::current_exception());
continue;
}

storage::usage_report logs_usage;
try {
logs_usage = co_await _storage->local().disk_usage();
} catch (...) {
vlog(
rlog.info,
"Unable to collect log storage usage: {}",
std::current_exception());
continue;
}

vlog(
rlog.debug,
"Cloud storage cache target minimum size {} nice to have {}",
cache_usage_target.target_min_bytes,
cache_usage_target.target_bytes);

vlog(
rlog.debug,
"Log storage usage total {} - data {} index {} compaction {}",
logs_usage.usage.total(),
logs_usage.usage.data,
logs_usage.usage.index,
logs_usage.usage.compaction);

vlog(
rlog.debug,
"Log storage usage available for reclaim local {} total {}",
logs_usage.reclaim.retention,
logs_usage.reclaim.available);

vlog(
rlog.debug,
"Log storage usage target minimum size {} nice to have {}",
logs_usage.target.min_capacity,
logs_usage.target.min_capacity_wanted);
}
return o;
}

} // namespace storage
} // namespace storage
64 changes: 62 additions & 2 deletions src/v/resource_mgmt/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,76 @@

#pragma once

#include "config/property.h"
#include "seastarx.h"
#include "ssx/semaphore.h"

#include <seastar/core/sharded.hh>

#include <iostream>

namespace cloud_storage {
class cache;
}

namespace cluster {
class partition_manager;
}

namespace storage {

class api;

enum class disk_space_alert { ok = 0, low_space = 1, degraded = 2 };

inline disk_space_alert max_severity(disk_space_alert a, disk_space_alert b) {
return std::max(a, b);
}

std::ostream& operator<<(std::ostream& o, const storage::disk_space_alert d);
inline std::ostream& operator<<(std::ostream& o, const disk_space_alert d) {
switch (d) {
case disk_space_alert::ok:
o << "ok";
break;
case disk_space_alert::low_space:
o << "low_space";
break;
case disk_space_alert::degraded:
o << "degraded";
break;
}
return o;
}

/*
*
*/
class disk_space_manager {
public:
disk_space_manager(
config::binding<bool> enabled,
ss::sharded<storage::api>* storage,
ss::sharded<cloud_storage::cache>* cache,
ss::sharded<cluster::partition_manager>* pm);

disk_space_manager(disk_space_manager&&) noexcept = delete;
disk_space_manager& operator=(disk_space_manager&&) noexcept = delete;
disk_space_manager(const disk_space_manager&) = delete;
disk_space_manager& operator=(const disk_space_manager&) = delete;
~disk_space_manager() = default;

ss::future<> start();
ss::future<> stop();

private:
config::binding<bool> _enabled;
ss::sharded<storage::api>* _storage;
ss::sharded<cloud_storage::cache>* _cache;
ss::sharded<cluster::partition_manager>* _pm;

ss::gate _gate;
ss::future<> run_loop();
ssx::semaphore _control_sem{0, "resource_mgmt::space_manager"};
};

} // namespace storage
} // namespace storage

0 comments on commit ed387c6

Please sign in to comment.