Skip to content

Commit

Permalink
metrics: Add domain packet queueing metrics
Browse files Browse the repository at this point in the history
Adds a new `readyset_domain.packets_queued` gauge metric to keep track
of how many packets are queued for each domain at a given time.

Change-Id: I310fd5df3bc787b743aa1c3da4ae7c8f9aa8ae4b
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6213
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
ethan-readyset committed Oct 24, 2023
1 parent d1739fe commit 8b81da2
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 11 deletions.
8 changes: 8 additions & 0 deletions readyset-client/src/metrics/mod.rs
Expand Up @@ -355,6 +355,14 @@ pub mod recorded {
/// | packet_type | The type of packet |
pub const DOMAIN_PACKET_SENT: &str = "readyset_domain.packet_sent";

/// Gauge: The number of dataflow packets queued for each domain.
///
/// | Tag | Description |
/// | domain | The index of the domain. |
/// | shard | The shard of the base table the lookup is requested in. |
/// | packet_type | The type of packet |
pub const DOMAIN_PACKETS_QUEUED: &str = "readyset_domain.packets_queued";

/// Histogram: The time a snapshot takes to be performed.
pub const REPLICATOR_SNAPSHOT_DURATION: &str = "readyset_replicator.snapshot_duration_us";

Expand Down
46 changes: 40 additions & 6 deletions readyset-dataflow/src/domain/channel/mod.rs
Expand Up @@ -11,14 +11,17 @@ use std::task::{Context, Poll};

use async_bincode::{AsyncBincodeWriter, AsyncDestination};
use futures_util::sink::{Sink, SinkExt};
use metrics::{register_gauge, Gauge, SharedString};
use readyset_client::internal::ReplicaAddress;
use readyset_client::metrics::recorded;
use readyset_client::{CONNECTION_FROM_BASE, CONNECTION_FROM_DOMAIN};
use readyset_errors::{ReadySetError, ReadySetResult};
use strum::{EnumCount, IntoEnumIterator};
use tokio::io::BufWriter;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

use crate::prelude::Packet;
use crate::{Packet, PacketDiscriminants};

pub mod tcp;

Expand All @@ -33,34 +36,65 @@ const COORDINATOR_CHANGE_CHANNEL_BUFFER_SIZE: usize = 64;

/// Constructs a [`DomainSender`]/[`DomainReceiver`] channel that can be used to send [`Packet`]s to
/// a domain who lives in the same process as the sender.
pub fn channel() -> (DomainSender, DomainReceiver) {
pub(crate) fn domain_channel(replica_address: ReplicaAddress) -> (DomainSender, DomainReceiver) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

(DomainSender { tx }, DomainReceiver { rx })
let index: SharedString = replica_address.domain_index.index().to_string().into();
let shard: SharedString = replica_address.shard.to_string().into();
let packets_queued: [Gauge; PacketDiscriminants::COUNT] = PacketDiscriminants::iter()
.map(|d| {
let name: &'static str = d.into();
register_gauge!(recorded::DOMAIN_PACKETS_QUEUED,
"domain" => index.clone(),
"shard" => shard.clone(),
"packet_type" => name,
)
})
.collect::<Vec<Gauge>>()
.try_into()
.ok()
.unwrap();

(
DomainSender {
tx,
packets_queued: packets_queued.clone(),
},
DomainReceiver { rx, packets_queued },
)
}

/// A wrapper around a [`tokio::sync::mpsc::UnboundedSender`] to be used for sending messages to
/// domains who live in the same process as the sender.
#[derive(Clone)]
pub struct DomainSender {
tx: UnboundedSender<Packet>,
packets_queued: [Gauge; PacketDiscriminants::COUNT],
}

impl DomainSender {
pub fn send(&self, packet: Packet) -> Result<(), mpsc::error::SendError<Packet>> {
self.tx.send(packet)
let discriminant: PacketDiscriminants = (&packet).into();

self.tx.send(packet).map(|()| {
self.packets_queued[discriminant as usize].increment(1.0);
})
}
}

/// A wrapper around a [`tokio::sync::mpsc::UnboundedReceiver`] to be used for sending messages to
/// domains who live in the same process as the sender.
pub struct DomainReceiver {
rx: UnboundedReceiver<Packet>,
packets_queued: [Gauge; PacketDiscriminants::COUNT],
}

impl DomainReceiver {
pub async fn recv(&mut self) -> Option<Packet> {
self.rx.recv().await
self.rx.recv().await.map(|packet| {
let discriminant: PacketDiscriminants = (&packet).into();
self.packets_queued[discriminant as usize].decrement(1.0);
packet
})
}
}

Expand Down
6 changes: 5 additions & 1 deletion readyset-dataflow/src/domain/mod.rs
Expand Up @@ -44,7 +44,7 @@ use vec1::Vec1;

pub(crate) use self::replay_paths::ReplayPath;
use self::replay_paths::{Destination, ReplayPathSpec, ReplayPaths, Target};
use crate::domain::channel::ChannelCoordinator;
use crate::domain::channel::{ChannelCoordinator, DomainReceiver, DomainSender};
use crate::node::special::EgressTx;
use crate::node::{NodeProcessingResult, ProcessEnv};
use crate::payload::{
Expand Down Expand Up @@ -4505,4 +4505,8 @@ impl Domain {

Ok(())
}

pub fn channel(&self) -> (DomainSender, DomainReceiver) {
channel::domain_channel(self.address())
}
}
4 changes: 1 addition & 3 deletions readyset-dataflow/src/lib.rs
Expand Up @@ -63,9 +63,7 @@ pub use dataflow_state::{
BaseTableState, DurabilityMode, MaterializedNodeState, PersistenceParameters, PersistentState,
};

pub use crate::domain::channel::{
channel as domain_channel, ChannelCoordinator, DomainReceiver, DomainSender, DualTcpStream,
};
pub use crate::domain::channel::{ChannelCoordinator, DomainReceiver, DomainSender, DualTcpStream};
pub use crate::domain::{Domain, DomainBuilder, DomainIndex};
pub use crate::node_map::NodeMap;
pub use crate::payload::{DomainRequest, Packet, PacketDiscriminants};
Expand Down
2 changes: 1 addition & 1 deletion readyset-server/src/worker/mod.rs
Expand Up @@ -336,7 +336,7 @@ impl Worker {

// this channel is used for in-process domain traffic, to avoid going through the
// network stack unnecessarily
let (local_tx, local_rx) = dataflow::domain_channel();
let (local_tx, local_rx) = domain.channel();
// this channel is used for domain requests; it has a buffer size of 1 to prevent
// flooding a domain with requests
let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
Expand Down

0 comments on commit 8b81da2

Please sign in to comment.