Skip to content

Commit

Permalink
commitlog: Make segment allocation wait iff disk usage > max
Browse files Browse the repository at this point in the history
Instead of allowing new segments to be added, explicitly wait
for either a disk delete or a recycle to happen iff the current
disk usage is larger than the limit.
  • Loading branch information
Calle Wilund committed Jan 11, 2021
1 parent ab55a1b commit be8c359
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions db/commitlog/commitlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
using request_controller_type = basic_semaphore<timeout_exception_factory, db::timeout_clock>;
using request_controller_units = semaphore_units<timeout_exception_factory, db::timeout_clock>;
request_controller_type _request_controller;
shared_promise<> _disk_deletions;

std::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
std::unordered_map<sstring, descriptor> _files_to_delete;
Expand Down Expand Up @@ -372,7 +373,7 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
queue<sseg_ptr> _reserve_segments;
std::deque<sstring> _recycled_segments;
queue<sstring> _recycled_segments;
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
flush_handler_id _flush_ids = 0;
replay_position _flush_position;
Expand Down Expand Up @@ -1060,6 +1061,7 @@ db::commitlog::segment_manager::segment_manager(config c)
// always be admitted for processing.
, _request_controller(max_request_controller_units(), request_controller_timeout_exception_factory{})
, _reserve_segments(1)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
{
assert(max_size > 0);
Expand Down Expand Up @@ -1396,8 +1398,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
}

if (!_recycled_segments.empty()) {
auto src = std::move(_recycled_segments.front());
_recycled_segments.pop_front();
auto src = _recycled_segments.pop();
// Note: we have to do the rename here to ensure
// proper descriptor id order. If we renamed in the delete call
// that recycled the file we could potentially have
Expand All @@ -1408,6 +1409,14 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
});
}

if (max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
return f.then([this] {
return allocate_segment();
});
}

return allocate_segment_ex(std::move(d), std::move(dst), flags|open_flags::create);
}

Expand Down Expand Up @@ -1533,14 +1542,21 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() {
_reserve_segments.pop();
}

auto re = std::exchange(_recycled_segments, {});
auto i = re.begin();
auto e = re.end();
std::vector<sstring> tmp;
tmp.reserve(_recycled_segments.size());

_recycled_segments.consume([&](sstring s) {
tmp.emplace_back(std::move(s));
return true;
});

auto i = tmp.begin();
auto e = tmp.end();

return parallel_for_each(i, e, [this](const sstring& filename) {
clogger.debug("Deleting recycled segment file {}", filename);
return delete_file(filename);
}).finally([this, re = std::move(re)] {
}).finally([this, tmp = std::move(tmp)] {
return do_pending_deletes();
});
}
Expand Down Expand Up @@ -1609,7 +1625,10 @@ future<> db::commitlog::segment_manager::delete_file(const sstring& filename) {
return seastar::file_size(filename).then([this, filename](uint64_t size) {
clogger.debug("Deleting segment file {}", filename);
return commit_io_check(&seastar::remove_file, filename).then([this, size] {
totals.total_size_on_disk -= size;
clogger.trace("Reclaimed {} MB", size/(1024*1024));
totals.total_size_on_disk -= size;
auto p = std::exchange(_disk_deletions, {});
p.set_value();
});
});
}
Expand All @@ -1629,16 +1648,17 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
return f.finally([&] {
// We allow reuse of the segment if the current disk size is less than shard max.
auto usage = totals.total_size_on_disk;
if (!_shutdown && cfg.reuse_segments && usage <= max_disk_size) {
if (!_shutdown && cfg.reuse_segments) {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);

clogger.debug("Recycling segment file {}", filename);
// must rename the file since we must ensure the
// data is not replayed. Changing the name will
// cause header ID to be invalid in the file -> ignored
return rename_file(filename, dst).then([this, dst] {
_recycled_segments.emplace_back(dst);
return rename_file(filename, dst).then([this, dst]() mutable {
auto b = _recycled_segments.push(std::move(dst));
assert(b); // we set this to max_size_t so...
return make_ready_future<>();
}).handle_exception([this, filename](auto&&) {
return delete_file(filename);
Expand Down

0 comments on commit be8c359

Please sign in to comment.