Skip to content

Commit

Permalink
streaming: Add finished percentage metrics for node ops using streaming
Browse files Browse the repository at this point in the history
We have added the finished percentage metric for repair-based node operations.

This patch adds the finished percentage for node ops using the old
streaming.

Example output:

scylla_streaming_finished_percentage{ops="bootstrap",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="decommission",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="rebuild",shard="0"} 0.561945
scylla_streaming_finished_percentage{ops="removenode",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="repair",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="replace",shard="0"} 1.000000

In addition to the metrics, a log message showing the finished percentage is added.

[shard 0] range_streamer - Finished 2698 out of 2817 ranges for rebuild, finished percentage=0.95775646

Fixes #11600

Closes #11601
  • Loading branch information
asias authored and denesb committed Sep 22, 2022
1 parent 517c152 commit 9ed401c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 0 deletions.
7 changes: 7 additions & 0 deletions dht/range_streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::effec

future<> range_streamer::stream_async() {
auto nr_ranges_remaining = nr_ranges_to_stream();
_nr_total_ranges = nr_ranges_remaining;
logger.info("{} starts, nr_ranges_remaining={}", _description, nr_ranges_remaining);
auto start = lowres_clock::now();
return do_for_each(_to_stream, [this, start, description = _description] (auto& stream) {
Expand Down Expand Up @@ -279,6 +280,12 @@ future<> range_streamer::stream_async() {
}
sp.execute().discard_result().get();
ranges_to_stream.clear();
// Update finished percentage
auto remaining = nr_ranges_to_stream();
float percentage = _nr_total_ranges == 0 ? 1 : (_nr_total_ranges - remaining) / (float)_nr_total_ranges;
_stream_manager.local().update_finished_percentage(_reason, percentage);
logger.info("Finished {} out of {} ranges for {}, finished percentage={}",
_nr_total_ranges - remaining, _nr_total_ranges, _reason, percentage);
};
try {
for (auto it = range_vec.begin(); it < range_vec.end();) {
Expand Down
1 change: 1 addition & 0 deletions dht/range_streamer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private:
unsigned _nr_rx_added = 0;
// Limit the number of nodes to stream in parallel to reduce memory pressure with large cluster.
seastar::semaphore _limiter{16};
size_t _nr_total_ranges = 0;
};

} // dht
30 changes: 30 additions & 0 deletions streaming/stream_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,38 @@ stream_manager::stream_manager(db::config& cfg,
(void)_io_throughput_updater.trigger_later();
}

_finished_percentage[streaming::stream_reason::bootstrap] = 1;
_finished_percentage[streaming::stream_reason::decommission] = 1;
_finished_percentage[streaming::stream_reason::removenode] = 1;
_finished_percentage[streaming::stream_reason::rebuild] = 1;
_finished_percentage[streaming::stream_reason::repair] = 1;
_finished_percentage[streaming::stream_reason::replace] = 1;

auto ops_label_type = sm::label("ops");
_metrics.add_group("streaming", {
sm::make_counter("total_incoming_bytes", [this] { return _total_incoming_bytes; },
sm::description("Total number of bytes received on this shard.")),

sm::make_counter("total_outgoing_bytes", [this] { return _total_outgoing_bytes; },
sm::description("Total number of bytes sent on this shard.")),

sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::bootstrap]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}),

sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::decommission]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}),

sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::removenode]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}),

sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::rebuild]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}),

sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::repair]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}),

sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::replace]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}),
});
}

Expand Down Expand Up @@ -365,4 +391,8 @@ shared_ptr<stream_session> stream_manager::get_session(streaming::plan_id plan_i
return coordinator->get_or_create_session(*this, from);
}

// Store `percentage` as the current finished percentage for `reason`,
// overwriting any value recorded earlier for the same reason.
// The stored value is what the streaming "finished_percentage" gauge
// registered for that reason returns.
void stream_manager::update_finished_percentage(streaming::stream_reason reason, float percentage) {
    _finished_percentage.insert_or_assign(reason, percentage);
}

} // namespace streaming
5 changes: 5 additions & 0 deletions streaming/stream_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once
#include "streaming/stream_fwd.hh"
#include "streaming/progress_info.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/distributed.hh>
#include "utils/updateable_value.hh"
Expand Down Expand Up @@ -96,6 +97,7 @@ private:
uint64_t _total_outgoing_bytes{0};
semaphore _mutation_send_limiter{256};
seastar::metrics::metric_groups _metrics;
std::unordered_map<streaming::stream_reason, float> _finished_percentage;

utils::updateable_value<uint32_t> _io_throughput_mbs;
serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); });
Expand Down Expand Up @@ -184,6 +186,9 @@ private:
void init_messaging_service_handler();
future<> uninit_messaging_service_handler();
future<> update_io_throughput(uint32_t value_mbs);

public:
void update_finished_percentage(streaming::stream_reason reason, float percentage);
};

} // namespace streaming

0 comments on commit 9ed401c

Please sign in to comment.