Skip to content

Commit

Permalink
Gather more cluster metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 16, 2024
1 parent 9975e0e commit 3d240d1
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 15 deletions.
6 changes: 5 additions & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::member::{
GRPC_ADVERTISE_ADDR_KEY, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
use crate::metrics::spawn_metrics_task;
use crate::ClusterNode;

const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) {
Expand Down Expand Up @@ -161,8 +162,11 @@ impl Cluster {
let chitchat = chitchat_handle.chitchat();
let live_nodes_stream = chitchat.lock().await.live_nodes_watcher();
let (ready_members_tx, ready_members_rx) = watch::channel(Vec::new());

spawn_ready_members_task(cluster_id.clone(), live_nodes_stream, ready_members_tx);

let weak_chitchat = Arc::downgrade(&chitchat);
spawn_metrics_task(weak_chitchat, self_node.chitchat_id());

let inner = InnerCluster {
cluster_id: cluster_id.clone(),
self_chitchat_id: self_node.chitchat_id(),
Expand Down
10 changes: 7 additions & 3 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,17 @@ impl Transport for CountingUdpTransport {
let socket = UdpSocket::open(listen_addr).await?;
Ok(Box::new(CountingUdpSocket {
socket,
gossip_recv: crate::metrics::CLUSTER_METRICS.gossip_recv_total.clone(),
gossip_recv: crate::metrics::CLUSTER_METRICS
.gossip_recv_messages_total
.clone(),
gossip_recv_bytes: crate::metrics::CLUSTER_METRICS
.gossip_recv_bytes_total
.clone(),
gossip_send: crate::metrics::CLUSTER_METRICS.gossip_send_total.clone(),
gossip_send: crate::metrics::CLUSTER_METRICS
.gossip_sent_messages_total
.clone(),
gossip_send_bytes: crate::metrics::CLUSTER_METRICS
.gossip_send_bytes_total
.gossip_sent_bytes_total
.clone(),
}))
}
Expand Down
15 changes: 14 additions & 1 deletion quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::mem::size_of;
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::Context;
use chitchat::{ChitchatId, NodeState};
use chitchat::{ChitchatId, NodeState, Version};
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::{error, warn};
Expand All @@ -46,6 +47,8 @@ pub(crate) trait NodeStateExt {
fn grpc_advertise_addr(&self) -> anyhow::Result<SocketAddr>;

fn is_ready(&self) -> bool;

fn size_bytes(&self) -> usize;
}

impl NodeStateExt for NodeState {
Expand All @@ -66,6 +69,16 @@ impl NodeStateExt for NodeState {
.map(|health_value| health_value == READINESS_VALUE_READY)
.unwrap_or(false)
}

// TODO: Expose more accurate size of the state in Chitchat.
/// Returns a rough estimate, in bytes, of the size of this node state.
///
/// Each key-value pair contributes the length of its key, the length of its
/// value, and a fixed overhead of `size_of::<Version>()` plus
/// `size_of::<u64>()` (the latter is counted as the tombstone slot).
fn size_bytes(&self) -> usize {
    const SIZE_OF_VERSION: usize = size_of::<Version>();
    const SIZE_OF_TOMBSTONE: usize = size_of::<u64>();

    let mut total_size_bytes = 0;

    for (key, versioned_value) in self.key_values() {
        let payload_size_bytes = key.len() + versioned_value.value.len();
        total_size_bytes += payload_size_bytes + SIZE_OF_VERSION + SIZE_OF_TOMBSTONE;
    }
    total_size_bytes
}
}

/// Cluster member.
Expand Down
128 changes: 118 additions & 10 deletions quickwit/quickwit-cluster/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,71 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::net::SocketAddr;
use std::sync::Weak;
use std::time::Duration;

use chitchat::{Chitchat, ChitchatId};
use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, IntCounter};
use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
use tokio::sync::Mutex;

use crate::member::NodeStateExt;

/// Gauges and counters describing the cluster as observed by the local node.
///
/// NOTE(review): this listing comes from a rendered diff, so fields from both
/// sides of the change appear together (e.g. `gossip_recv_total` and
/// `gossip_recv_messages_total`) — confirm against the actual file.
pub struct ClusterMetrics {
/// Total number of gossip messages received.
pub gossip_recv_total: IntCounter,
/// The number of live nodes observed locally.
pub live_nodes: IntGauge,
/// The number of ready nodes observed locally.
pub ready_nodes: IntGauge,
/// The number of zombie nodes observed locally.
pub zombie_nodes: IntGauge,
/// The number of dead nodes observed locally.
pub dead_nodes: IntGauge,
/// The size of the cluster state in bytes.
pub cluster_state_size_bytes: IntGauge,
/// The size of the node state in bytes.
pub node_state_size_bytes: IntGauge,
/// The number of keys in the node state.
pub node_state_keys: IntGauge,
/// Total number of gossip messages received.
pub gossip_recv_messages_total: IntCounter,
/// Total amount of gossip data received in bytes.
pub gossip_recv_bytes_total: IntCounter,
/// Total number of gossip messages sent.
pub gossip_send_total: IntCounter,
/// Total amount of gossip data sent in bytes.
pub gossip_send_bytes_total: IntCounter,
/// Total number of gossip messages sent.
pub gossip_sent_messages_total: IntCounter,
/// Total amount of gossip data sent in bytes.
pub gossip_sent_bytes_total: IntCounter,
}

impl Default for ClusterMetrics {
fn default() -> Self {
ClusterMetrics {
gossip_recv_total: new_counter(
"gossip_recv_total",
live_nodes: new_gauge(
"live_nodes",
"The number of live nodes observed locally.",
"cluster",
),
ready_nodes: new_gauge(
"ready_nodes",
"The number of ready nodes observed locally.",
"cluster",
),
zombie_nodes: new_gauge(
"zombie_nodes",
"The number of zombie nodes observed locally.",
"cluster",
),
dead_nodes: new_gauge(
"dead_nodes",
"The number of dead nodes observed locally.",
"cluster",
),
cluster_state_size_bytes: new_gauge(
"cluster_state_size_bytes",
"The size of the cluster state in bytes.",
"cluster",
),
node_state_keys: new_gauge(
"node_state_keys",
"The number of keys in the node state.",
"cluster",
),
node_state_size_bytes: new_gauge(
"node_state_size_bytes",
"The size of the node state in bytes.",
"cluster",
),
gossip_recv_messages_total: new_counter(
"gossip_recv_messages_total",
"Total number of gossip messages received.",
"cluster",
),
Expand All @@ -40,13 +90,13 @@ impl Default for ClusterMetrics {
"Total amount of gossip data received in bytes.",
"cluster",
),
gossip_send_total: new_counter(
"gossip_send_total",
gossip_sent_messages_total: new_counter(
"gossip_sent_messages_total",
"Total number of gossip messages sent.",
"cluster",
),
gossip_send_bytes_total: new_counter(
"gossip_send_bytes_total",
gossip_sent_bytes_total: new_counter(
"gossip_sent_bytes_total",
"Total amount of gossip data sent in bytes.",
"cluster",
),
Expand All @@ -55,3 +105,61 @@ impl Default for ClusterMetrics {
}

/// Shared cluster metrics instance, lazily built from `ClusterMetrics::default`
/// the first time it is accessed.
pub static CLUSTER_METRICS: Lazy<ClusterMetrics> = Lazy::new(ClusterMetrics::default);

/// Spawns a background task that periodically refreshes the cluster gauges of
/// [`CLUSTER_METRICS`] from the local Chitchat state.
///
/// On every tick of a `METRICS_INTERVAL` timer, the task locks Chitchat and
/// records:
/// - the number of live, zombie (scheduled for deletion), and dead nodes;
/// - the number of nodes whose state reports ready;
/// - an estimate of the cluster state size, i.e. the sum over all node states
///   of the estimated Chitchat ID size and `NodeStateExt::size_bytes`;
/// - the number of keys and the estimated size of the state of the node
///   identified by `self_chitchat_id`.
///
/// The task only keeps a `Weak` handle between refreshes: the strong
/// reference is acquired after each tick and released at the end of the
/// refresh, so the task never keeps Chitchat alive while it sleeps. It exits
/// as soon as the `Weak` handle can no longer be upgraded.
pub(crate) fn spawn_metrics_task(
    weak_chitchat: Weak<Mutex<Chitchat>>,
    self_chitchat_id: ChitchatId,
) {
    const METRICS_INTERVAL: Duration = Duration::from_secs(15);

    const SIZE_OF_GENERATION_ID: usize = std::mem::size_of::<u64>();
    const SIZE_OF_SOCKET_ADDR: usize = std::mem::size_of::<SocketAddr>();

    let future = async move {
        let mut interval = tokio::time::interval(METRICS_INTERVAL);

        loop {
            // The first tick completes immediately, so metrics are published
            // right away and then once per `METRICS_INTERVAL`.
            interval.tick().await;

            // Upgrade only after the tick: holding the strong reference across
            // the sleep would delay the drop of Chitchat by up to one interval.
            let chitchat = match weak_chitchat.upgrade() {
                Some(chitchat) => chitchat,
                None => break,
            };
            let mut num_ready_nodes = 0;
            let mut cluster_state_size_bytes = 0;

            let chitchat_guard = chitchat.lock().await;

            let num_live_nodes = chitchat_guard.live_nodes().count();
            let num_zombie_nodes = chitchat_guard.scheduled_for_deletion_nodes().count();
            let num_dead_nodes = chitchat_guard.dead_nodes().count();

            for (chitchat_id, node_state) in chitchat_guard.node_states() {
                if node_state.is_ready() {
                    num_ready_nodes += 1;
                }
                let chitchat_id_size_bytes =
                    chitchat_id.node_id.len() + SIZE_OF_GENERATION_ID + SIZE_OF_SOCKET_ADDR;
                let node_state_size_bytes = node_state.size_bytes();

                cluster_state_size_bytes += chitchat_id_size_bytes + node_state_size_bytes;

                if *chitchat_id == self_chitchat_id {
                    CLUSTER_METRICS
                        .node_state_keys
                        .set(node_state.num_key_values() as i64);
                    CLUSTER_METRICS
                        .node_state_size_bytes
                        .set(node_state_size_bytes as i64);
                }
            }
            // Release the lock before publishing the remaining gauges.
            drop(chitchat_guard);

            CLUSTER_METRICS.live_nodes.set(num_live_nodes as i64);
            CLUSTER_METRICS.ready_nodes.set(num_ready_nodes as i64);
            CLUSTER_METRICS.zombie_nodes.set(num_zombie_nodes as i64);
            CLUSTER_METRICS.dead_nodes.set(num_dead_nodes as i64);

            CLUSTER_METRICS
                .cluster_state_size_bytes
                .set(cluster_state_size_bytes as i64);
        }
    };
    tokio::spawn(future);
}

0 comments on commit 3d240d1

Please sign in to comment.