diff --git a/Cargo.lock b/Cargo.lock index 48865bbd21..1bc20685c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -169,15 +169,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bit-set" version = "0.5.2" @@ -972,7 +963,7 @@ dependencies = [ "cfg-if 1.0.0", "js-sys", "libc", - "wasi 0.10.0+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] @@ -1543,20 +1534,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "mio" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8" -dependencies = [ - "libc", - "log", - "miow", - "ntapi", - "wasi 0.11.0+wasi-snapshot-preview1", - "winapi 0.3.9", -] - [[package]] name = "miow" version = "0.3.7" @@ -2266,7 +2243,7 @@ dependencies = [ "itertools", "quickwit-common", "quickwit-proto", - "quickwit-swim", + "scuttlebutt", "serde", "serde_json", "tempfile", @@ -2520,6 +2497,7 @@ dependencies = [ "assert-json-diff", "async-trait", "bytes", + "chrono", "futures", "futures-util", "hyper", @@ -2539,6 +2517,7 @@ dependencies = [ "quickwit-search", "quickwit-storage", "quickwit-telemetry", + "scuttlebutt", "serde", "serde_json", "serde_qs", @@ -2582,23 +2561,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "quickwit-swim" -version = "0.2.1" -dependencies = [ - "bincode", - "flume", - "futures", - "mio 0.8.1", - "rand", - "serde", - "serde_json", - "thiserror", - "tokio", - "tracing", - "uuid 0.8.2", -] - [[package]] name = "quickwit-telemetry" version = "0.2.1" @@ -3104,6 +3066,20 @@ dependencies = [ "untrusted", ] +[[package]] +name = "scuttlebutt" +version = "0.1.0" +source = "git+https://github.com/quickwit-oss/scuttlebutt?rev=56e2e8f#56e2e8ff6dec6e26f4ab4fc31e20adda72cd2594" +dependencies = [ + "anyhow", + "bytes", + "rand", + "serde", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "security-framework" version = "2.4.2" @@ -3637,7 +3613,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", - "wasi 0.10.0+wasi-snapshot-preview1", + "wasi", "winapi 0.3.9", ] @@ -3713,7 +3689,7 @@ dependencies = [ "bytes", "libc", "memchr", - "mio 0.7.14", + "mio", "num_cpus", "once_cell", "parking_lot", @@ -4256,12 +4232,6 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" -[[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" - [[package]] name = "wasm-bindgen" version = "0.2.78" diff --git a/Cargo.toml b/Cargo.toml index e41d6cc181..546c04f4f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ members = [ "quickwit-search", "quickwit-serve", "quickwit-storage", - "quickwit-swim", "quickwit-telemetry", "quickwit-indexing", ] diff --git a/quickwit-cluster/Cargo.toml b/quickwit-cluster/Cargo.toml index e397faff2a..e7b876a4d2 100644 --- a/quickwit-cluster/Cargo.toml +++ b/quickwit-cluster/Cargo.toml @@ -14,7 +14,6 @@ anyhow = "1.0" async-trait = "0.1" flume = "0.10" quickwit-common = 
{ version = "0.2.1", path = "../quickwit-common"}
-quickwit-swim = { version = "0.2.1", path = "../quickwit-swim" }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 thiserror = "1.0"
@@ -22,8 +21,8 @@
 tokio = { version = "1.16", features = [ "full" ] }
 tokio-stream = { version = "0.1", features = [ "sync" ] }
 tracing = "0.1.29"
 uuid = "0.8"
-
 quickwit-proto = { version = "0.2.1", path = "../quickwit-proto"}
+scuttlebutt = { git = "https://github.com/quickwit-oss/scuttlebutt", version = "0.1.0", rev="56e2e8f" }
 
 [dev-dependencies]
 itertools = '0.10'
diff --git a/quickwit-cluster/src/cluster.rs b/quickwit-cluster/src/cluster.rs
index 5021fe2bc3..404ad86ba6 100644
--- a/quickwit-cluster/src/cluster.rs
+++ b/quickwit-cluster/src/cluster.rs
@@ -22,44 +22,83 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
-use quickwit_swim::prelude::{
-    ArtilleryError, ArtilleryMember, ArtilleryMemberEvent, ArtilleryMemberState,
-    Cluster as ArtilleryCluster, ClusterConfig as ArtilleryClusterConfig,
-};
+use scuttlebutt::server::ScuttleServer;
+use scuttlebutt::{FailureDetectorConfig, NodeId};
 use tokio::sync::watch;
 use tokio::time::timeout;
 use tokio_stream::wrappers::WatchStream;
 use tokio_stream::StreamExt;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info};
 use uuid::Uuid;
 
-use crate::error::{ClusterError, ClusterResult};
-
-/// The ID that makes the cluster unique.
-const CLUSTER_ID: &str = "quickwit-cluster";
-
-const CLUSTER_EVENT_TIMEOUT: Duration = Duration::from_millis(200);
+use crate::error::ClusterResult;
 
 /// A member information.
 #[derive(Clone, Debug, PartialEq)]
 pub struct Member {
     /// An ID that makes a member unique.
-    pub node_id: String,
-
-    /// Listen address.
-    pub listen_addr: SocketAddr,
-
+    pub node_unique_id: String,
+    /// A timestamp (epoch seconds) taken when the node starts, used to tell restarts apart.
+    pub generation: i64,
+    /// Advertised UDP gossip address.
+    pub gossip_public_address: SocketAddr,
     /// If true, it means self.
     pub is_self: bool,
 }
 
+impl Member {
+    /// Builds a member describing the current node (`is_self` is set to true).
+    pub fn new(node_unique_id: String, generation: i64, gossip_public_address: SocketAddr) -> Self {
+        Self {
+            node_unique_id,
+            gossip_public_address,
+            generation,
+            is_self: true,
+        }
+    }
+
+    /// Returns the id exchanged over gossip: `"<node_unique_id>/<generation>"`.
+    pub fn internal_id(&self) -> String {
+        format!("{}/{}", self.node_unique_id, self.generation)
+    }
+}
+
+impl TryFrom<NodeId> for Member {
+    type Error = anyhow::Error;
+
+    fn try_from(node_id: NodeId) -> Result<Self, Self::Error> {
+        let (node_unique_id_str, generation_str) = node_id.id.split_once('/').ok_or_else(|| {
+            anyhow::anyhow!(
+                "Could not create a Member instance from NodeId `{}`",
+                node_id.id
+            )
+        })?;
+
+        let gossip_public_address: SocketAddr = node_id.gossip_public_address.parse()?;
+        Ok(Self {
+            node_unique_id: node_unique_id_str.to_string(),
+            generation: generation_str.parse()?,
+            gossip_public_address,
+            is_self: false,
+        })
+    }
+}
+
+impl From<Member> for NodeId {
+    fn from(member: Member) -> Self {
+        Self::new(
+            member.internal_id(),
+            member.gossip_public_address.to_string(),
+        )
+    }
+}
+
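The conversions above define how a node's identity travels over gossip: `internal_id()` packs the unique id and the generation into the scuttlebutt `NodeId`, and `TryFrom<NodeId>` recovers both fields. A minimal round-trip sketch (illustration only, not part of this diff; the id and address are made up):

    use std::net::SocketAddr;

    fn member_node_id_round_trip() -> anyhow::Result<()> {
        let gossip_addr: SocketAddr = "127.0.0.1:14000".parse()?;
        // Hypothetical member "searcher-1", started at epoch second 1_645_000_000.
        let member = Member::new("searcher-1".to_string(), 1_645_000_000, gossip_addr);
        assert_eq!(member.internal_id(), "searcher-1/1645000000");

        // Member -> NodeId -> Member preserves the id, generation and gossip address.
        let node_id = NodeId::from(member.clone());
        let restored = Member::try_from(node_id)?;
        assert_eq!(restored.node_unique_id, "searcher-1");
        assert_eq!(restored.generation, 1_645_000_000);
        assert_eq!(restored.gossip_public_address, gossip_addr);
        // `is_self` is not carried over the wire; deserialized members default to false.
        assert!(!restored.is_self);
        Ok(())
    }
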
-/// This is an implementation of a cluster using the SWIM protocol.
+/// This is an implementation of a cluster based on the scuttlebutt gossip protocol.
 pub struct Cluster {
+    pub node_id: String,
     /// A socket address that represents itself.
     pub listen_addr: SocketAddr,
 
-    /// The actual cluster that implement the SWIM protocol.
-    artillery_cluster: ArtilleryCluster,
+    /// The underlying scuttlebutt server that handles gossip.
+    scuttlebutt_server: ScuttleServer,
 
     /// A receiver(channel) for exchanging members in a cluster.
     members: watch::Receiver<Vec<Member>>,
 
@@ -75,84 +114,62 @@ impl Cluster {
     /// Create a cluster given a host key and a listen address.
     /// When a cluster is created, the thread that monitors cluster events
     /// will be started at the same time.
-    pub fn new(node_id: String, listen_addr: SocketAddr) -> ClusterResult<Self> {
-        info!( node_id=?node_id, listen_addr=?listen_addr, "Create new cluster.");
-        let config = ArtilleryClusterConfig {
-            cluster_key: CLUSTER_ID.as_bytes().to_vec(),
-            listen_addr,
-            ..Default::default()
-        };
-        let (artillery_cluster, swim_event_rx) =
-            ArtilleryCluster::create_and_start(node_id.clone(), config).map_err(
-                |err| match err {
-                    ArtilleryError::Io(io_err) => ClusterError::UDPPortBindingError {
-                        port: listen_addr.port(),
-                        message: io_err.to_string(),
-                    },
-                    _ => ClusterError::CreateClusterError {
-                        message: err.to_string(),
-                    },
-                },
-            )?;
+    pub fn new(
+        me: Member,
+        listen_addr: SocketAddr,
+        seed_nodes: &[String],
+        failure_detector_config: FailureDetectorConfig,
+    ) -> ClusterResult<Self> {
+        info!(member=?me, listen_addr=?listen_addr, "Create new cluster.");
+        let scuttlebutt_server = ScuttleServer::spawn(
+            NodeId::from(me.clone()),
+            seed_nodes,
+            listen_addr.to_string(),
+            failure_detector_config,
+        );
+        let scuttlebutt = scuttlebutt_server.scuttlebutt();
 
         let (members_sender, members_receiver) = watch::channel(Vec::new());
 
         // Create cluster.
         let cluster = Cluster {
+            node_id: me.internal_id(),
             listen_addr,
-            artillery_cluster,
+            scuttlebutt_server,
            members: members_receiver,
            stop: Arc::new(AtomicBool::new(false)),
        };

        // Add itself as the initial member of the cluster.
-        let member = Member {
-            node_id,
-            listen_addr,
-            is_self: true,
-        };
-        let initial_members: Vec<Member> = vec![member];
+        let initial_members: Vec<Member> = vec![me.clone()];
         if members_sender.send(initial_members).is_err() {
             error!("Failed to add itself as the initial member of the cluster.");
         }
 
         // Prepare to start a task that will monitor cluster events.
-        let task_listen_addr = cluster.listen_addr;
         let task_stop = cluster.stop.clone();
+        tokio::task::spawn(async move {
+            let mut node_change_receiver = scuttlebutt.lock().await.live_nodes_watcher();
+
+            while let Some(members_set) = node_change_receiver.next().await {
+                let mut members = members_set
+                    .into_iter()
+                    .map(Member::try_from)
+                    .collect::<Result<Vec<_>, _>>()?;
+                members.push(me.clone());
+
+                if task_stop.load(Ordering::Relaxed) {
+                    debug!("received a stop signal");
+                    break;
+                }
 
-        // Start to monitor the cluster events.
-        tokio::task::spawn_blocking(move || {
-            loop {
-                match swim_event_rx.recv_timeout(CLUSTER_EVENT_TIMEOUT) {
-                    Ok((artillery_members, artillery_member_event)) => {
-                        log_artillery_event(artillery_member_event);
-                        let updated_memberlist: Vec<Member> = artillery_members
-                            .into_iter()
-                            .filter(|member| match member.state() {
-                                ArtilleryMemberState::Alive | ArtilleryMemberState::Suspect => true,
-                                ArtilleryMemberState::Down | ArtilleryMemberState::Left => false,
-                            })
-                            .map(|member| convert_member(member, task_listen_addr))
-                            .collect();
-                        debug!(updated_memberlist=?updated_memberlist);
-                        if members_sender.send(updated_memberlist).is_err() {
-                            // Somehow the cluster has been dropped.
-                            error!("Failed to send a member list.");
-                            break;
-                        }
-                    }
-                    Err(flume::RecvTimeoutError::Disconnected) => {
-                        debug!("channel disconnected");
-                        break;
-                    }
-                    Err(flume::RecvTimeoutError::Timeout) => {
-                        if task_stop.load(Ordering::Relaxed) {
-                            debug!("receive a stop signal");
-                            break;
-                        }
-                    }
+                if members_sender.send(members).is_err() {
+                    // Somehow the cluster has been dropped.
+                    error!("Failed to send a member list.");
+                    break;
                 }
             }
+            Result::<(), anyhow::Error>::Ok(())
         });
 
         Ok(cluster)
     }
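For context, a rough usage sketch of the new constructor (not part of this diff; addresses and ids are placeholders, and it mirrors what `run_searcher` does further down in this change):

    use std::net::SocketAddr;
    use std::time::Duration;

    use chrono::Utc;
    use scuttlebutt::FailureDetectorConfig;

    async fn join_cluster_sketch() -> anyhow::Result<()> {
        let gossip_addr: SocketAddr = "127.0.0.1:7280".parse()?;
        // Using the start timestamp as the generation gives a restarted node a fresh identity.
        let me = Member::new("node-1".to_string(), Utc::now().timestamp(), gossip_addr);
        let seeds = vec!["127.0.0.1:7282".to_string()];
        let cluster = Cluster::new(me, gossip_addr, &seeds, FailureDetectorConfig::default())?;
        // Wait until gossip has propagated and both nodes see each other.
        cluster
            .wait_for_members(|members| members.len() == 2, Duration::from_secs(10))
            .await?;
        Ok(())
    }
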
@@ -168,18 +185,21 @@ impl Cluster {
         self.members.borrow().clone()
     }
 
-    /// Specify the address of a running node and join the cluster to which the node belongs.
-    pub async fn add_peer_node(&self, peer_addr: SocketAddr) {
-        if peer_addr != self.listen_addr {
-            info!(self_addr = ?self.listen_addr, peer_addr = ?peer_addr, "Adding peer node.");
-            self.artillery_cluster.add_seed_node(peer_addr);
-        }
-    }
-
     /// Leave the cluster.
     pub async fn leave(&self) {
         info!(self_addr = ?self.listen_addr, "Leaving the cluster.");
-        self.artillery_cluster.leave_cluster();
+        // TODO: implement leave/join on Scuttlebutt.
+        self.stop.store(true, Ordering::Relaxed);
+    }
+
+    /// Shut down the cluster: stop the scuttlebutt server and the member-monitoring task.
+    pub async fn shutdown(self) {
+        info!(self_addr = ?self.listen_addr, "Shutting down the cluster.");
+        let result = self.scuttlebutt_server.shutdown().await;
+        if let Err(error) = result {
+            error!(self_addr = ?self.listen_addr, error = ?error, "Error while shutting down.");
+        }
+        self.stop.store(true, Ordering::Relaxed);
     }
 
@@ -204,56 +224,35 @@ impl Cluster {
     }
 }
 
-/// Convert the Artillery's member into Quickwit's one.
-fn convert_member(member: ArtilleryMember, self_listen_addr: SocketAddr) -> Member {
-    let listen_addr = if let Some(addr) = member.remote_host() {
-        addr
-    } else {
-        self_listen_addr
-    };
-
-    Member {
-        node_id: member.node_id(),
-        listen_addr,
-        is_self: member.is_current(),
-    }
-}
-
-/// Output member event as log.
-fn log_artillery_event(artillery_member_event: ArtilleryMemberEvent) {
-    match artillery_member_event {
-        ArtilleryMemberEvent::Joined(artillery_member) => {
-            info!(node_id=?artillery_member.node_id(), remote_host=?artillery_member.remote_host(), "Joined.");
-        }
-        ArtilleryMemberEvent::WentUp(artillery_member) => {
-            info!(node_id=?artillery_member.node_id(), remote_host=?artillery_member.remote_host(), "Went up.");
-        }
-        ArtilleryMemberEvent::SuspectedDown(artillery_member) => {
-            warn!(node_id=?artillery_member.node_id(), remote_host=?artillery_member.remote_host(), "Suspected down.");
-        }
-        ArtilleryMemberEvent::WentDown(artillery_member) => {
-            error!(node_id=?artillery_member.node_id(), remote_host=?artillery_member.remote_host(), "Went down.");
-        }
-        ArtilleryMemberEvent::Left(artillery_member) => {
-            info!(node_id=?artillery_member.node_id(), remote_host=?artillery_member.remote_host(), "Left.");
-        }
-        ArtilleryMemberEvent::Payload(artillery_member, message) => {
-            info!(node_id=?artillery_member.node_id(), remote_host=?artillery_member.remote_host(), message=?message, "Payload.");
-        }
-    };
-}
-
-pub fn create_cluster_for_test_with_id(peer_uuid: String) -> anyhow::Result<Cluster> {
+pub fn create_cluster_for_test_with_id(
+    peer_uuid: String,
+    seeds: &[String],
+) -> anyhow::Result<Cluster> {
     let port = quickwit_common::net::find_available_port()?;
     let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
-    let cluster = Cluster::new(peer_uuid, peer_addr)?;
+    let failure_detector_config = create_failure_detector_config_for_test();
+    let cluster = Cluster::new(
+        Member::new(peer_uuid, 1, peer_addr),
+        peer_addr,
+        seeds,
+        failure_detector_config,
+    )?;
     Ok(cluster)
 }
 
+/// Creates a failure detector config for tests.
+fn create_failure_detector_config_for_test() -> FailureDetectorConfig { + FailureDetectorConfig { + phi_threshold: 6.0, + initial_interval: Duration::from_millis(400), + ..Default::default() + } +} + /// Creates a local cluster listening on a random port. -pub fn create_cluster_for_test() -> anyhow::Result { +pub fn create_cluster_for_test(seeds: &[String]) -> anyhow::Result { let peer_uuid = Uuid::new_v4().to_string(); - let cluster = create_cluster_for_test_with_id(peer_uuid)?; + let cluster = create_cluster_for_test_with_id(peer_uuid, seeds)?; Ok(cluster) } @@ -263,78 +262,46 @@ mod tests { use std::time::Duration; use itertools::Itertools; - use quickwit_swim::prelude::{ArtilleryMember, ArtilleryMemberState}; use tokio::time::sleep; use super::*; - use crate::cluster::{convert_member, Member}; - - #[tokio::test] - async fn test_cluster_convert_member() { - let node_id = Uuid::new_v4().to_string(); - let remote_host = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - { - let artillery_member = - ArtilleryMember::new(node_id.clone(), remote_host, 0, ArtilleryMemberState::Alive); - - let member = convert_member(artillery_member, remote_host); - let expected_member = Member { - node_id: node_id.clone(), - listen_addr: remote_host, - is_self: false, - }; - assert_eq!(member, expected_member); - } - { - let artillery_member = ArtilleryMember::current(node_id.clone()); - let member = convert_member(artillery_member, remote_host); - let expected_member = Member { - node_id, - listen_addr: remote_host, - is_self: true, - }; - assert_eq!(member, expected_member); - } - } #[tokio::test] async fn test_cluster_single_node() -> anyhow::Result<()> { - let cluster = create_cluster_for_test()?; + let cluster = create_cluster_for_test(&[])?; let members: Vec = cluster .members() .iter() - .map(|member| member.listen_addr) + .map(|member| member.gossip_public_address) .collect(); let expected_members = vec![cluster.listen_addr]; assert_eq!(members, expected_members); - cluster.leave().await; + cluster.shutdown().await; Ok(()) } #[tokio::test] async fn test_cluster_multiple_nodes() -> anyhow::Result<()> { quickwit_common::setup_logging_for_tests(); - let cluster1 = create_cluster_for_test()?; - let cluster2 = create_cluster_for_test()?; - let cluster3 = create_cluster_for_test()?; + let cluster1 = create_cluster_for_test(&[])?; + let node_1 = cluster1.listen_addr.to_string(); + let cluster2 = create_cluster_for_test(&[node_1.clone()])?; + let cluster3 = create_cluster_for_test(&[node_1])?; - cluster2.add_peer_node(cluster1.listen_addr).await; - cluster3.add_peer_node(cluster1.listen_addr).await; - - let ten_secs = Duration::from_secs(10); + let wait_secs = Duration::from_secs(10); for cluster in [&cluster1, &cluster2, &cluster3] { cluster - .wait_for_members(|members| members.len() == 3, ten_secs) + .wait_for_members(|members| members.len() == 3, wait_secs) .await .unwrap(); } let members: Vec = cluster1 .members() .iter() - .map(|member| member.listen_addr) + .map(|member| member.gossip_public_address) .sorted() .collect(); let mut expected_members = vec![ @@ -345,15 +312,15 @@ mod tests { expected_members.sort(); assert_eq!(members, expected_members); - drop(cluster2); + cluster2.shutdown().await; cluster1 - .wait_for_members(|members| members.len() == 2, ten_secs) + .wait_for_members(|members| members.len() == 2, wait_secs) .await .unwrap(); - cluster3.leave().await; + cluster3.shutdown().await; cluster1 - .wait_for_members(|members| members.len() == 1, ten_secs) + 
.wait_for_members(|members| members.len() == 1, wait_secs) .await .unwrap(); Ok(()) @@ -362,23 +329,22 @@ mod tests { #[tokio::test] async fn test_cluster_rejoin_with_different_id_issue_1018() -> anyhow::Result<()> { quickwit_common::setup_logging_for_tests(); - let cluster1 = create_cluster_for_test_with_id("cluster1".to_string())?; - let cluster2 = create_cluster_for_test_with_id("cluster2".to_string())?; - - cluster2.add_peer_node(cluster1.listen_addr).await; + let cluster1 = create_cluster_for_test_with_id("cluster1".to_string(), &[])?; + let node_1 = cluster1.listen_addr.to_string(); + let cluster2 = create_cluster_for_test_with_id("cluster2".to_string(), &[node_1.clone()])?; - let ten_secs = Duration::from_secs(10); + let wait_secs = Duration::from_secs(10); for cluster in [&cluster1, &cluster2] { cluster - .wait_for_members(|members| members.len() == 2, ten_secs) + .wait_for_members(|members| members.len() == 2, wait_secs) .await .unwrap(); } let members: Vec = cluster1 .members() .iter() - .map(|member| member.listen_addr) + .map(|member| member.gossip_public_address) .sorted() .collect(); let mut expected_members = vec![cluster1.listen_addr, cluster2.listen_addr]; @@ -386,17 +352,20 @@ mod tests { assert_eq!(members, expected_members); let cluster2_listen_addr = cluster2.listen_addr; - cluster2.leave().await; - drop(cluster2); + cluster2.shutdown().await; cluster1 - .wait_for_members(|members| members.len() == 1, ten_secs) + .wait_for_members(|members| members.len() == 1, wait_secs) .await .unwrap(); sleep(Duration::from_secs(3)).await; - let cluster2 = Cluster::new("newid".to_string(), cluster2_listen_addr)?; - cluster2.add_peer_node(cluster1.listen_addr).await; + let cluster2 = Cluster::new( + Member::new("newid".to_string(), 1, cluster2_listen_addr), + cluster2_listen_addr, + &[node_1], + create_failure_detector_config_for_test(), + )?; for _ in 0..4_000 { if cluster1.members().len() > 2 { @@ -408,11 +377,11 @@ mod tests { assert!(!cluster1 .members() .iter() - .any(|member| (*member).node_id == "cluster2")); + .any(|member| (*member).node_unique_id == "cluster2")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_id == "cluster2")); + .any(|member| (*member).node_unique_id == "cluster2")); Ok(()) } @@ -420,25 +389,24 @@ mod tests { #[tokio::test] async fn test_cluster_rejoin_with_different_id_3_nodes_issue_1018() -> anyhow::Result<()> { quickwit_common::setup_logging_for_tests(); - let cluster1 = create_cluster_for_test_with_id("cluster1".to_string())?; - let cluster2 = create_cluster_for_test_with_id("cluster2".to_string())?; - let cluster3 = create_cluster_for_test_with_id("cluster3".to_string())?; + let cluster1 = create_cluster_for_test_with_id("cluster1".to_string(), &[])?; + let node_1 = cluster1.listen_addr.to_string(); + let cluster2 = create_cluster_for_test_with_id("cluster2".to_string(), &[node_1.clone()])?; + let node_2 = cluster2.listen_addr.to_string(); + let cluster3 = create_cluster_for_test_with_id("cluster3".to_string(), &[node_2])?; - cluster2.add_peer_node(cluster1.listen_addr).await; - cluster3.add_peer_node(cluster2.listen_addr).await; - - let wait_period = Duration::from_secs(15); + let wait_secs = Duration::from_secs(15); for cluster in [&cluster1, &cluster2] { cluster - .wait_for_members(|members| members.len() == 3, wait_period) + .wait_for_members(|members| members.len() == 3, wait_secs) .await .unwrap(); } let members: Vec = cluster1 .members() .iter() - .map(|member| member.listen_addr) + .map(|member| 
member.gossip_public_address) .sorted() .collect(); let mut expected_members = vec![ @@ -451,48 +419,56 @@ mod tests { let cluster2_listen_addr = cluster2.listen_addr; let cluster3_listen_addr = cluster3.listen_addr; - drop(cluster2); - drop(cluster3); + cluster2.shutdown().await; + cluster3.shutdown().await; cluster1 - .wait_for_members(|members| members.len() == 1, wait_period) + .wait_for_members(|members| members.len() == 1, wait_secs) .await .unwrap(); sleep(Duration::from_secs(3)).await; - let cluster2 = Cluster::new("newid".to_string(), cluster2_listen_addr)?; - cluster2.add_peer_node(cluster1.listen_addr).await; - - let cluster3 = Cluster::new("newid2".to_string(), cluster3_listen_addr)?; - cluster3.add_peer_node(cluster2.listen_addr).await; + let cluster2 = Cluster::new( + Member::new("newid".to_string(), 1, cluster2_listen_addr), + cluster2_listen_addr, + &[node_1], + create_failure_detector_config_for_test(), + )?; + let node_2 = cluster2.listen_addr.to_string(); + + let cluster3 = Cluster::new( + Member::new("newid2".to_string(), 1, cluster3_listen_addr), + cluster3_listen_addr, + &[node_2], + create_failure_detector_config_for_test(), + )?; sleep(Duration::from_secs(10)).await; - assert!(!cluster1 .members() .iter() - .any(|member| (*member).node_id == "cluster2")); + .any(|member| (*member).node_unique_id == "cluster2")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_id == "cluster2")); + .any(|member| (*member).node_unique_id == "cluster2")); assert!(!cluster3 .members() .iter() - .any(|member| (*member).node_id == "cluster2")); + .any(|member| (*member).node_unique_id == "cluster2")); assert!(!cluster1 .members() .iter() - .any(|member| (*member).node_id == "cluster3")); + .any(|member| (*member).node_unique_id == "cluster3")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_id == "cluster3")); + .any(|member| (*member).node_unique_id == "cluster3")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_id == "cluster3")); + .any(|member| (*member).node_unique_id == "cluster3")); Ok(()) } diff --git a/quickwit-cluster/src/service.rs b/quickwit-cluster/src/service.rs index 0b4ba5b6e2..5947e0219f 100644 --- a/quickwit-cluster/src/service.rs +++ b/quickwit-cluster/src/service.rs @@ -32,9 +32,10 @@ use crate::error::ClusterError; impl From for PMember { fn from(member: Member) -> Self { PMember { - id: member.node_id.to_string(), - listen_address: member.listen_addr.to_string(), + id: member.internal_id(), + listen_address: member.gossip_public_address.to_string(), is_self: member.is_self, + generation: member.generation, } } } @@ -106,8 +107,9 @@ mod tests { let is_self = true; let member = Member { - node_id: host_id.clone(), - listen_addr, + node_unique_id: host_id.clone(), + gossip_public_address: listen_addr, + generation: 1, is_self, }; println!("member={:?}", member); @@ -116,8 +118,9 @@ mod tests { println!("proto_member={:?}", proto_member); let expected = PMember { - id: host_id, + id: format!("{}/{}", host_id, 1), listen_address: listen_addr.to_string(), + generation: 1, is_self, }; println!("expected={:?}", expected); diff --git a/quickwit-config/src/config.rs b/quickwit-config/src/config.rs index ec2be24422..35c3ff8e94 100644 --- a/quickwit-config/src/config.rs +++ b/quickwit-config/src/config.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
+use std::env;
 use std::ffi::OsStr;
 use std::net::SocketAddr;
 use std::path::{Path, PathBuf};
@@ -257,6 +258,17 @@ impl QuickwitConfig {
         get_socket_addr(&(self.listen_address.as_str(), self.rest_listen_port))
     }
 
+    /// Returns the gossip address advertised to other nodes.
+    /// Ideally it should be specified in the config file (with environment variable
+    /// interpolation if necessary). For now we read the `QW_GOSSIP_PUBLIC_ADDRESS`
+    /// environment variable and otherwise fall back to the gossip listen address.
+    pub fn gossip_public_addr(&self) -> anyhow::Result<SocketAddr> {
+        match env::var("QW_GOSSIP_PUBLIC_ADDRESS") {
+            Ok(addr) => Ok(addr.parse::<SocketAddr>()?),
+            Err(_) => self.gossip_socket_addr(),
+        }
+    }
+
     pub fn seed_socket_addrs(&self) -> anyhow::Result<Vec<SocketAddr>> {
         // If no port is given, we assume a seed is using the same port as ourself.
         let default_gossip_port = self.gossip_socket_addr()?.port();
diff --git a/quickwit-proto/proto/cluster.proto b/quickwit-proto/proto/cluster.proto
index d88d91b3ef..88eacf89b1 100644
--- a/quickwit-proto/proto/cluster.proto
+++ b/quickwit-proto/proto/cluster.proto
@@ -42,6 +42,9 @@ message Member {
 
   /// If true, it means self.
   bool is_self = 3;
+
+  /// The member generation (reincarnation), set at node startup.
+  int64 generation = 4;
 }
 
 message ListMembersRequest {
diff --git a/quickwit-proto/src/cluster.rs b/quickwit-proto/src/cluster.rs
index 92d2959f23..3864cabc5b 100644
--- a/quickwit-proto/src/cluster.rs
+++ b/quickwit-proto/src/cluster.rs
@@ -13,6 +13,9 @@ pub struct Member {
     //// If true, it means self.
     #[prost(bool, tag = "3")]
     pub is_self: bool,
+    //// The member generation (reincarnation), set at node startup.
+    #[prost(int64, tag = "4")]
+    pub generation: i64,
 }
 #[derive(Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
diff --git a/quickwit-search/src/lib.rs b/quickwit-search/src/lib.rs
index a6ade2260b..816c90b57c 100644
--- a/quickwit-search/src/lib.rs
+++ b/quickwit-search/src/lib.rs
@@ -67,10 +67,13 @@ pub use crate::search_stream::root_search_stream;
 pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};
 use crate::thread_pool::run_cpu_intensive;
 
-/// Compute the gRPC port from the SWIM port.
-/// Add 1 to the SWIM port to get the gRPC port.
-pub fn swim_addr_to_grpc_addr(swim_addr: SocketAddr) -> SocketAddr {
-    SocketAddr::new(swim_addr.ip(), swim_addr.port() + 1)
+/// Compute the gRPC port from the Scuttlebutt gossip port.
+/// Add 1 to the Scuttlebutt gossip port to get the gRPC port.
+pub fn scuttlebutt_gossip_addr_to_grpc_addr(scuttlebutt_gossip_addr: SocketAddr) -> SocketAddr {
+    SocketAddr::new(
+        scuttlebutt_gossip_addr.ip(),
+        scuttlebutt_gossip_addr.port() + 1,
+    )
 }
 
 /// GlobalDocAddress serves as a hit address.
diff --git a/quickwit-search/src/search_client_pool.rs b/quickwit-search/src/search_client_pool.rs
index b8d1ea0832..1562e8dd1f 100644
--- a/quickwit-search/src/search_client_pool.rs
+++ b/quickwit-search/src/search_client_pool.rs
@@ -32,7 +32,7 @@ use tonic::transport::Endpoint;
 use tracing::*;
 
 use crate::rendezvous_hasher::sort_by_rendez_vous_hash;
-use crate::{swim_addr_to_grpc_addr, SearchServiceClient};
+use crate::{scuttlebutt_gossip_addr_to_grpc_addr, SearchServiceClient};
 
 /// Create a SearchServiceClient with SocketAddr as an argument.
 /// It will try to reconnect to the node automatically.
@@ -85,7 +85,7 @@ async fn update_client_map(
     // Create a list of addresses to be removed.
let members_addresses: HashSet = members .iter() - .map(|member| swim_addr_to_grpc_addr(member.listen_addr)) + .map(|member| scuttlebutt_gossip_addr_to_grpc_addr(member.gossip_public_address)) .collect(); let addrs_to_remove: Vec = new_clients .keys() @@ -103,7 +103,7 @@ async fn update_client_map( // Add clients to the client pool. for member in members { - let grpc_addr = swim_addr_to_grpc_addr(member.listen_addr); + let grpc_addr = scuttlebutt_gossip_addr_to_grpc_addr(member.gossip_public_address); if let Entry::Vacant(_entry) = new_clients.entry(grpc_addr) { match create_search_service_client(grpc_addr).await { Ok(client) => { @@ -316,25 +316,25 @@ mod tests { use super::create_search_service_client; use crate::root::SearchJob; - use crate::{swim_addr_to_grpc_addr, SearchClientPool}; + use crate::{scuttlebutt_gossip_addr_to_grpc_addr, SearchClientPool}; #[tokio::test] async fn test_search_client_pool_single_node() -> anyhow::Result<()> { - let cluster = Arc::new(create_cluster_for_test()?); + let cluster = Arc::new(create_cluster_for_test(&[])?); let client_pool = SearchClientPool::create_and_keep_updated(cluster.clone()).await; let clients = client_pool.clients(); let addrs: Vec = clients.into_keys().collect(); - let expected_addrs = vec![swim_addr_to_grpc_addr(cluster.listen_addr)]; + let expected_addrs = vec![scuttlebutt_gossip_addr_to_grpc_addr(cluster.listen_addr)]; assert_eq!(addrs, expected_addrs); Ok(()) } #[tokio::test] async fn test_search_client_pool_multiple_nodes() -> anyhow::Result<()> { - let cluster1 = Arc::new(create_cluster_for_test()?); - let cluster2 = Arc::new(create_cluster_for_test()?); + let cluster1 = Arc::new(create_cluster_for_test(&[])?); + let node_1 = cluster1.listen_addr.to_string(); + let cluster2 = Arc::new(create_cluster_for_test(&[node_1])?); - cluster2.add_peer_node(cluster1.listen_addr).await; cluster1 .wait_for_members(|members| members.len() == 2, Duration::from_secs(5)) .await?; @@ -344,8 +344,8 @@ mod tests { let addrs: Vec = clients.into_keys().sorted().collect(); let mut expected_addrs = vec![ - swim_addr_to_grpc_addr(cluster1.listen_addr), - swim_addr_to_grpc_addr(cluster2.listen_addr), + scuttlebutt_gossip_addr_to_grpc_addr(cluster1.listen_addr), + scuttlebutt_gossip_addr_to_grpc_addr(cluster2.listen_addr), ]; expected_addrs.sort(); assert_eq!(addrs, expected_addrs); @@ -354,7 +354,7 @@ mod tests { #[tokio::test] async fn test_search_client_pool_single_node_assign_jobs() -> anyhow::Result<()> { - let cluster = Arc::new(create_cluster_for_test()?); + let cluster = Arc::new(create_cluster_for_test(&[])?); let client_pool = SearchClientPool::create_and_keep_updated(cluster.clone()).await; let jobs = vec![ SearchJob::for_test("split1", 1), @@ -365,7 +365,8 @@ mod tests { let assigned_jobs = client_pool.assign_jobs(jobs, &HashSet::default())?; let expected_assigned_jobs = vec![( - create_search_service_client(swim_addr_to_grpc_addr(cluster.listen_addr)).await?, + create_search_service_client(scuttlebutt_gossip_addr_to_grpc_addr(cluster.listen_addr)) + .await?, vec![ SearchJob::for_test("split4", 4), SearchJob::for_test("split3", 3), diff --git a/quickwit-serve/Cargo.toml b/quickwit-serve/Cargo.toml index 97ecb16d28..ad32471c16 100644 --- a/quickwit-serve/Cargo.toml +++ b/quickwit-serve/Cargo.toml @@ -35,6 +35,8 @@ opentelemetry = "0.17" tracing-opentelemetry = "0.17" prometheus = "0.13" once_cell = '1' +chrono = "0.4" +scuttlebutt = { git = "https://github.com/quickwit-oss/scuttlebutt", version = "0.1.0", rev="56e2e8f" } [dev-dependencies] 
mockall = "0.11" diff --git a/quickwit-serve/src/lib.rs b/quickwit-serve/src/lib.rs index c67fb69e6c..48e43149dd 100644 --- a/quickwit-serve/src/lib.rs +++ b/quickwit-serve/src/lib.rs @@ -31,14 +31,16 @@ mod rest; use std::sync::Arc; +use chrono::Utc; use format::Format; -use quickwit_cluster::cluster::Cluster; +use quickwit_cluster::cluster::{Cluster, Member}; use quickwit_cluster::service::ClusterServiceImpl; use quickwit_config::{QuickwitConfig, SEARCHER_CONFIG_INSTANCE}; use quickwit_metastore::Metastore; use quickwit_search::{ClusterClient, SearchClientPool, SearchService, SearchServiceImpl}; use quickwit_storage::quickwit_storage_uri_resolver; -use tracing::{debug, info}; +use scuttlebutt::FailureDetectorConfig; +use tracing::info; pub use crate::args::ServeArgs; use crate::cluster_api::GrpcClusterAdapter; @@ -57,16 +59,24 @@ pub async fn run_searcher( SEARCHER_CONFIG_INSTANCE .set(quickwit_config.searcher_config.clone()) .expect("could not set searcher config in global once cell"); - let cluster = Arc::new(Cluster::new( + + let seed_nodes = quickwit_config + .seed_socket_addrs()? + .iter() + .map(|addr| addr.to_string()) + .collect::>(); + + let member = Member::new( quickwit_config.node_id.clone(), + Utc::now().timestamp(), + quickwit_config.gossip_public_addr()?, + ); + let cluster = Arc::new(Cluster::new( + member, quickwit_config.gossip_socket_addr()?, + &seed_nodes, + FailureDetectorConfig::default(), )?); - for seed_socket_addr in quickwit_config.seed_socket_addrs()? { - // If the peer seed address is specified, - // it joins the cluster in which that node participates. - debug!(peer_seed_addr = %seed_socket_addr, "Add peer seed node."); - cluster.add_peer_node(seed_socket_addr).await; - } let storage_uri_resolver = quickwit_storage_uri_resolver().clone(); let client_pool = SearchClientPool::create_and_keep_updated(cluster.clone()).await; let cluster_client = ClusterClient::new(client_pool.clone()); diff --git a/quickwit-swim/.gitignore b/quickwit-swim/.gitignore deleted file mode 100644 index 96ef6c0b94..0000000000 --- a/quickwit-swim/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target -Cargo.lock diff --git a/quickwit-swim/Cargo.toml b/quickwit-swim/Cargo.toml deleted file mode 100644 index 3cd27dc9fc..0000000000 --- a/quickwit-swim/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "quickwit-swim" -version = "0.2.1" -authors = ['Quickwit, Inc. '] -edition = '2021' -description = "Fork of artillery-core SWIM implementation" -repository = "https://github.com/quickwit-oss/quickwit" -homepage = "https://quickwit.io/" -documentation = "https://quickwit.io/docs/" -license = "Apache-2.0/MIT" - -[dependencies] -thiserror = "1" -serde = { version = "1.0.114", features = ["derive"] } -serde_json = "1.0.56" -uuid = { version = "0.8.1", features = ["serde", "v4"] } -rand = "0.8" -mio = { version = "0.8", features = ["os-poll", "net"] } -futures = "0.3.5" -tracing = "0.1.29" -tokio = { version = "1.16", features = [ "full" ]} -flume = "0.10" - -[dev-dependencies] -bincode = "1.3.1" diff --git a/quickwit-swim/LICENSE-MIT b/quickwit-swim/LICENSE-MIT deleted file mode 100644 index 35680a394e..0000000000 --- a/quickwit-swim/LICENSE-MIT +++ /dev/null @@ -1,26 +0,0 @@ - -MIT License - -Copyright for portions of project quickwit-swim are held by Mahmut Bulut (2021) as part of project -`artillery-core`. -All other copyright for project quickwit-swim are held by Quickwit Inc, 2021. 
- -Copyright (c) Mahmut Bulut - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/quickwit-swim/README.md b/quickwit-swim/README.md deleted file mode 100644 index 1f0b944e41..0000000000 --- a/quickwit-swim/README.md +++ /dev/null @@ -1,7 +0,0 @@ -This crate is a currently a fork of artillery-core, itself part of bastion/artillery. - -On 2021 August 21th, a lagging dependency broke our build. -We only use a small portion of artillery core and cannot really afford pulling all of the dependencies, -so we decided to extract the project, before an eventual rewrite. - -Please prefer using bastion/artillery. diff --git a/quickwit-swim/src/cluster.rs b/quickwit-swim/src/cluster.rs deleted file mode 100644 index d2a48459b1..0000000000 --- a/quickwit-swim/src/cluster.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::convert::AsRef; -use std::net::SocketAddr; - -use flume; -use tracing::debug; - -use super::state::ArtilleryEpidemic; -use crate::cluster_config::ClusterConfig; -use crate::errors::*; -use crate::state::{ArtilleryClusterEvent, ArtilleryClusterRequest}; - -#[derive(Debug)] -pub struct Cluster { - comm: flume::Sender, -} - -impl Cluster { - pub fn create_and_start( - host_id: String, - config: ClusterConfig, - ) -> Result<(Cluster, flume::Receiver)> { - let (event_tx, event_rx) = flume::unbounded::(); - let (internal_tx, mut internal_rx) = flume::unbounded::(); - - let (poll, state) = ArtilleryEpidemic::new(host_id, config, event_tx, internal_tx.clone())?; - - debug!("Starting Artillery Cluster"); - tokio::task::spawn_blocking(move || { - ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state) - .expect("Failed to create event loop"); - }); - - let cluster = Cluster { comm: internal_tx }; - Ok((cluster, event_rx)) - } - - pub fn add_seed_node(&self, addr: SocketAddr) { - let _ = self.comm.send(ArtilleryClusterRequest::AddSeed(addr)); - } - - pub fn send_payload>(&self, id: String, msg: T) { - self.comm - .send(ArtilleryClusterRequest::Payload( - id, - msg.as_ref().to_string(), - )) - .unwrap(); - } - - pub fn leave_cluster(&self) { - let _ = self.comm.send(ArtilleryClusterRequest::LeaveCluster); - } -} - -impl Drop for Cluster { - fn drop(&mut self) { - let _ = self.comm.send(ArtilleryClusterRequest::Exit); - } -} diff --git a/quickwit-swim/src/cluster_config.rs b/quickwit-swim/src/cluster_config.rs deleted file mode 100644 index 8e24e19555..0000000000 --- a/quickwit-swim/src/cluster_config.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::net::{SocketAddr, ToSocketAddrs}; -use std::time::Duration; - -// ARTIL = 
27845 -/// Default Epidemic Port -pub const CONST_INFECTION_PORT: u16 = 27845; - -// Not sure MIO handles this correctly. -// Behave like this is the size. Normally 512 is enough. -/// Default UDP cast packet size -pub const CONST_PACKET_SIZE: usize = 1 << 16; - -#[derive(Debug, Clone)] -pub struct ClusterConfig { - pub cluster_key: Vec, - pub ping_interval: Duration, - pub network_mtu: usize, - pub ping_request_host_count: usize, - pub ping_timeout: Duration, - pub listen_addr: SocketAddr, -} - -impl Default for ClusterConfig { - fn default() -> Self { - let directed = SocketAddr::from(([127, 0, 0, 1], CONST_INFECTION_PORT)); - - ClusterConfig { - cluster_key: b"default".to_vec(), - ping_interval: Duration::from_secs(1), - network_mtu: CONST_PACKET_SIZE, - ping_request_host_count: 3, - ping_timeout: Duration::from_secs(3), - listen_addr: directed.to_socket_addrs().unwrap().next().unwrap(), - } - } -} diff --git a/quickwit-swim/src/errors.rs b/quickwit-swim/src/errors.rs deleted file mode 100644 index f7af58b362..0000000000 --- a/quickwit-swim/src/errors.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::{io, result}; - -use thiserror::Error; - -/// Result type for operations that could result in an `ArtilleryError` -pub type Result = result::Result; - -#[derive(Error, Debug)] -pub enum ArtilleryError { - // General Error Types - #[error("Artillery :: I/O error occurred: {}", _0)] - Io(#[from] io::Error), - #[error("Artillery :: Cluster Message Decode Error: {}", _0)] - ClusterMessageDecode(String), - #[error("Artillery :: Message Send Error: {}", _0)] - Send(String), - #[error("Artillery :: Message Receive Error: {}", _0)] - Receive(String), - #[error("Artillery :: Unexpected Error: {}", _0)] - Unexpected(String), - #[error("Artillery :: Decoding Error: {}", _0)] - Decoding(String), - #[error("Artillery :: Numeric Cast Error: {}", _0)] - NumericCast(String), -} - -impl From for ArtilleryError { - fn from(e: serde_json::error::Error) -> Self { - ArtilleryError::ClusterMessageDecode(e.to_string()) - } -} - -impl From> for ArtilleryError { - fn from(e: flume::SendError) -> Self { - ArtilleryError::Send(e.to_string()) - } -} - -impl From for ArtilleryError { - fn from(e: std::num::TryFromIntError) -> Self { - ArtilleryError::NumericCast(e.to_string()) - } -} diff --git a/quickwit-swim/src/lib.rs b/quickwit-swim/src/lib.rs deleted file mode 100644 index c45726157d..0000000000 --- a/quickwit-swim/src/lib.rs +++ /dev/null @@ -1,15 +0,0 @@ -pub mod cluster; -pub mod cluster_config; -pub mod errors; -pub mod member; -pub mod membership; -pub mod state; - -pub mod prelude { - pub use super::cluster::*; - pub use super::cluster_config::*; - pub use super::errors::ArtilleryError; - pub use super::member::*; - pub use super::membership::*; - pub use super::state::*; -} diff --git a/quickwit-swim/src/member.rs b/quickwit-swim/src/member.rs deleted file mode 100644 index b6ceeca46b..0000000000 --- a/quickwit-swim/src/member.rs +++ /dev/null @@ -1,226 +0,0 @@ -use std::cmp::Ordering; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::net::SocketAddr; -use std::time::{Duration, Instant}; - -use serde::*; - -#[derive(Serialize, Deserialize, Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)] -pub enum ArtilleryMemberState { - /// Looks alive as in the original paper - #[serde(rename = "a")] - Alive, - /// Suspect from the given node - #[serde(rename = "s")] - Suspect, - /// AKA `Confirm` in the original paper - #[serde(rename = "d")] - Down, - /// Left the cluster - #[serde(rename = "l")] - Left, -} - 
-#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] -pub struct ArtilleryMember { - #[serde(rename = "h")] - pub node_id: String, - #[serde(rename = "r")] - remote_host: Option, - #[serde(rename = "i")] - pub incarnation_number: u64, - #[serde(rename = "m")] - member_state: ArtilleryMemberState, - #[serde(rename = "t", skip, default = "Instant::now")] - last_state_change: Instant, -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] -pub struct ArtilleryStateChange { - member: ArtilleryMember, -} - -impl ArtilleryMember { - pub fn new( - node_id: String, - remote_host: SocketAddr, - incarnation_number: u64, - known_state: ArtilleryMemberState, - ) -> Self { - ArtilleryMember { - node_id, - remote_host: Some(remote_host), - incarnation_number, - member_state: known_state, - last_state_change: Instant::now(), - } - } - - pub fn current(node_id: String) -> Self { - ArtilleryMember { - node_id, - remote_host: None, - incarnation_number: 0, - member_state: ArtilleryMemberState::Alive, - last_state_change: Instant::now(), - } - } - - pub fn node_id(&self) -> String { - self.node_id.clone() - } - - pub fn remote_host(&self) -> Option { - self.remote_host - } - - pub fn set_remote_host(&mut self, addr: SocketAddr) { - self.remote_host = Some(addr); - } - - pub fn is_remote(&self) -> bool { - self.remote_host.is_some() - } - - pub fn is_current(&self) -> bool { - self.remote_host.is_none() - } - - pub fn state_change_older_than(&self, duration: Duration) -> bool { - self.last_state_change + duration < Instant::now() - } - - pub fn state(&self) -> ArtilleryMemberState { - self.member_state - } - - pub fn set_state(&mut self, state: ArtilleryMemberState) { - if self.member_state != state { - self.member_state = state; - self.last_state_change = Instant::now(); - } - } - - pub fn member_by_changing_host(&self, remote_host: SocketAddr) -> ArtilleryMember { - ArtilleryMember { - remote_host: Some(remote_host), - ..self.clone() - } - } - - pub fn reincarnate(&mut self) { - self.incarnation_number += 1 - } -} - -impl ArtilleryStateChange { - pub fn new(member: ArtilleryMember) -> ArtilleryStateChange { - ArtilleryStateChange { member } - } - - pub fn member(&self) -> &ArtilleryMember { - &self.member - } - - pub fn update(&mut self, member: ArtilleryMember) { - self.member = member - } -} - -impl PartialOrd for ArtilleryMember { - fn partial_cmp(&self, rhs: &ArtilleryMember) -> Option { - let t1 = ( - self.node_id.as_bytes(), - format!("{:?}", self.remote_host), - self.incarnation_number, - self.member_state, - ); - - let t2 = ( - rhs.node_id.as_bytes(), - format!("{:?}", rhs.remote_host), - rhs.incarnation_number, - rhs.member_state, - ); - - t1.partial_cmp(&t2) - } -} - -impl Ord for ArtilleryMember { - fn cmp(&self, rhs: &ArtilleryMember) -> Ordering { - self.partial_cmp(rhs).unwrap() - } -} - -impl Debug for ArtilleryMember { - fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { - fmt.debug_struct("ArtilleryMember") - .field("incarnation_number", &self.incarnation_number) - .field("host", &self.node_id) - .field("state", &self.member_state) - .field( - "drift_time_ms", - &self.last_state_change.elapsed().as_millis(), - ) - .field("remote_host", &self.remote_host) - .finish() - } -} - -pub fn most_uptodate_member_data<'a>( - lhs: &'a ArtilleryMember, - rhs: &'a ArtilleryMember, -) -> &'a ArtilleryMember { - // Don't apply clippy here. - // It's important bit otherwise we won't understand. 
- #![allow(clippy::match_same_arms)] - - let lhs_overrides = match ( - lhs.member_state, - lhs.incarnation_number, - rhs.member_state, - rhs.incarnation_number, - ) { - (ArtilleryMemberState::Alive, i, ArtilleryMemberState::Suspect, j) => i > j, - (ArtilleryMemberState::Alive, i, ArtilleryMemberState::Alive, j) => i > j, - (ArtilleryMemberState::Suspect, i, ArtilleryMemberState::Suspect, j) => i > j, - (ArtilleryMemberState::Suspect, i, ArtilleryMemberState::Alive, j) => i >= j, - (ArtilleryMemberState::Down, _, ArtilleryMemberState::Alive, _) => true, - (ArtilleryMemberState::Down, _, ArtilleryMemberState::Suspect, _) => true, - (ArtilleryMemberState::Left, _, _, _) => true, - _ => false, - }; - - if lhs_overrides { - lhs - } else { - rhs - } -} - -#[cfg(test)] -mod test { - use std::str::FromStr; - use std::time::{Duration, Instant}; - - use uuid; - - use super::{ArtilleryMember, ArtilleryMemberState}; - - #[test] - fn test_member_encode_decode() { - let member = ArtilleryMember { - node_id: uuid::Uuid::new_v4().to_string(), - remote_host: Some(FromStr::from_str("127.0.0.1:1337").unwrap()), - incarnation_number: 123, - member_state: ArtilleryMemberState::Alive, - last_state_change: Instant::now() - Duration::from_secs(3600), - }; - let encoded = bincode::serialize(&member).unwrap(); - let decoded: ArtilleryMember = bincode::deserialize(&encoded).unwrap(); - let encoded_again = bincode::serialize(&decoded).unwrap(); - assert_eq!(encoded, encoded_again); - } -} diff --git a/quickwit-swim/src/membership.rs b/quickwit-swim/src/membership.rs deleted file mode 100644 index 0903ba6fe3..0000000000 --- a/quickwit-swim/src/membership.rs +++ /dev/null @@ -1,314 +0,0 @@ -use std::collections::HashSet; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::net::SocketAddr; -use std::time::Duration; - -use rand::prelude::SliceRandom; - -use crate::member::{self, ArtilleryMember, ArtilleryMemberState, ArtilleryStateChange}; - -pub struct ArtilleryMemberList { - members: Vec, - periodic_index: usize, -} - -impl Debug for ArtilleryMemberList { - fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { - fmt.debug_struct("ArtilleryEpidemic") - .field("members", &self.members) - .finish() - } -} - -impl ArtilleryMemberList { - pub fn new(current: ArtilleryMember) -> Self { - ArtilleryMemberList { - members: vec![current], - periodic_index: 0, - } - } - - pub fn available_nodes(&self) -> Vec { - self.members - .iter() - .filter(|m| m.state() != ArtilleryMemberState::Left) - .cloned() - .collect() - } - - pub fn current_node_id(&self) -> String { - for member in self.members.iter() { - if member.is_current() { - return member.node_id(); - } - } - panic!("Could not find current node as registered member"); - } - - fn mut_myself(&mut self) -> &mut ArtilleryMember { - for member in &mut self.members { - if member.is_current() { - return member; - } - } - panic!("Could not find this instance as registered member"); - } - - pub fn reincarnate_self(&mut self) -> ArtilleryMember { - let myself = self.mut_myself(); - myself.reincarnate(); - - myself.clone() - } - - pub fn leave(&mut self) -> ArtilleryMember { - let myself = self.mut_myself(); - myself.set_state(ArtilleryMemberState::Left); - myself.reincarnate(); - - myself.clone() - } - - pub fn next_random_member(&mut self) -> Option { - if self.periodic_index == 0 { - let mut rng = rand::thread_rng(); - self.members.shuffle(&mut rng); - } - - let other_members: Vec<_> = self.members.iter().filter(|&m| m.is_remote()).collect(); - - if other_members.is_empty() { - 
None - } else { - self.periodic_index = (self.periodic_index + 1) % other_members.len(); - Some(other_members[self.periodic_index].clone()) - } - } - - pub fn time_out_nodes( - &mut self, - expired_hosts: &HashSet, - ) -> (Vec, Vec) { - let mut suspect_members = Vec::new(); - let mut down_members = Vec::new(); - - for member in &mut self.members { - if let Some(remote_host) = member.remote_host() { - if !expired_hosts.contains(&remote_host) { - continue; - } - - match member.state() { - ArtilleryMemberState::Alive => { - member.set_state(ArtilleryMemberState::Suspect); - suspect_members.push(member.clone()); - } - // TODO: Config suspect timeout - ArtilleryMemberState::Suspect - if member.state_change_older_than(Duration::from_secs(3)) => - { - member.set_state(ArtilleryMemberState::Down); - down_members.push(member.clone()); - } - ArtilleryMemberState::Suspect - | ArtilleryMemberState::Down - | ArtilleryMemberState::Left => {} - } - } - } - - (suspect_members, down_members) - } - - // Set node id for the host (in case it has changed). - pub fn set_node_id(&mut self, src_addr: SocketAddr, node_id: &str) { - if let Some(member) = self.get_mut_member_for_host(src_addr) { - member.node_id = node_id.to_string(); - } - } - - // Returns member if found via host and not alive - pub fn mark_node_alive( - &mut self, - src_addr: &SocketAddr, - node_id: String, - ) -> Option { - if self.current_node_id() == node_id { - return None; - } - self.set_node_id(*src_addr, &node_id); - for member in &mut self.members { - if member.remote_host() == Some(*src_addr) - && member.state() != ArtilleryMemberState::Alive - { - member.set_state(ArtilleryMemberState::Alive); - - return Some(member.clone()); - } - } - - None - } - - fn find_member<'a>( - members: &'a mut [ArtilleryMember], - node_id: &str, - addr: SocketAddr, - ) -> Option<&'a mut ArtilleryMember> { - let pos_matching_node_id: Option = members - .iter() - .position(|member| member.node_id() == node_id); - let pos_matching_address: Option = members - .iter() - .position(|member| member.remote_host() == Some(addr)); - let chosen_position: usize = pos_matching_node_id.or(pos_matching_address)?; - Some(&mut members[chosen_position]) - } - - pub fn apply_state_changes( - &mut self, - state_changes: Vec, - from: &SocketAddr, - ) -> (Vec, Vec) { - let mut current_members = self.members.clone(); - - let mut changed_nodes = Vec::new(); - let mut new_nodes = Vec::new(); - - let my_node_id = self.current_node_id(); - - for state_change in state_changes { - let member_change = state_change.member(); - - if member_change.node_id() == my_node_id { - if member_change.state() != ArtilleryMemberState::Alive { - let myself = self.reincarnate_self(); - changed_nodes.push(myself.clone()); - } - } else if let Some(existing_member) = Self::find_member( - &mut current_members, - &member_change.node_id(), - member_change.remote_host().unwrap_or(*from), - ) { - let update_member = - member::most_uptodate_member_data(member_change, existing_member).clone(); - let new_host = update_member - .remote_host() - .or_else(|| existing_member.remote_host()) - .unwrap(); - let update_member = update_member.member_by_changing_host(new_host); - - if update_member.state() != existing_member.state() { - existing_member.set_state(update_member.state()); - existing_member.incarnation_number = update_member.incarnation_number; - if let Some(host) = update_member.remote_host() { - existing_member.set_remote_host(host); - } - changed_nodes.push(update_member); - } - } else if 
member_change.state() != ArtilleryMemberState::Down - && member_change.state() != ArtilleryMemberState::Left - { - let new_host = member_change.remote_host().unwrap_or(*from); - let new_member = member_change.member_by_changing_host(new_host); - - current_members.push(new_member.clone()); - new_nodes.push(new_member); - } - } - - self.members = current_members - .into_iter() - .filter(|member| { - member.state() != ArtilleryMemberState::Down - && member.state() != ArtilleryMemberState::Left - }) - .collect::>(); - - (new_nodes, changed_nodes) - } - - /// Random ping enqueuing - pub fn hosts_for_indirect_ping( - &self, - host_count: usize, - target: &SocketAddr, - ) -> Vec { - let mut possible_members: Vec<_> = self - .members - .iter() - .filter_map(|m| { - if m.state() == ArtilleryMemberState::Alive - && m.is_remote() - && m.remote_host() != Some(*target) - { - m.remote_host() - } else { - None - } - }) - .collect(); - - let mut rng = rand::thread_rng(); - possible_members.shuffle(&mut rng); - possible_members.iter().take(host_count).cloned().collect() - } - - pub fn has_member(&self, remote_host: &SocketAddr) -> bool { - self.members - .iter() - .any(|m| m.remote_host() == Some(*remote_host)) - } - - pub fn add_member(&mut self, member: ArtilleryMember) { - self.members.push(member) - } - - pub fn remove_member(&mut self, id: &str) { - self.members.retain(|member| member.node_id() != id) - } - - /// `get_mut_member_for_host` will return artillery member if the given host is matched with any - /// of the member in the cluster. - pub fn get_mut_member_for_host(&mut self, host: SocketAddr) -> Option<&mut ArtilleryMember> { - self.members - .iter_mut() - .find(|m| m.remote_host() == Some(host)) - } - - /// `get_member` will return artillery member if the given uuid is matched with any of the - /// member in the cluster. 
- pub fn get_member(&self, id: &str) -> Option { - self.members.iter().find(|&m| m.node_id() == *id).cloned() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn membership_test() { - let current = ArtilleryMember::current("myid".to_string()); - let mut members = ArtilleryMemberList::new(current); - let address = "127.0.0.1:8080".parse().unwrap(); - members.add_member(ArtilleryMember::new( - "other_node".to_string(), - address, - 0, - ArtilleryMemberState::Suspect, - )); - - members.mark_node_alive(&address, "myid".to_string()); - - assert_eq!( - members - .available_nodes() - .iter() - .filter(|node| node.node_id() == "myid") - .count(), - 1 - ); - } -} diff --git a/quickwit-swim/src/state.rs b/quickwit-swim/src/state.rs deleted file mode 100644 index 62baaaa930..0000000000 --- a/quickwit-swim/src/state.rs +++ /dev/null @@ -1,585 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::fmt::{self, Debug, Formatter}; -use std::io; -use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Instant; - -use mio::net::UdpSocket; -use mio::{Events, Interest, Poll, Token}; -use serde::*; -use tracing::{debug, error, info, warn}; - -use super::cluster_config::ClusterConfig; -use super::membership::ArtilleryMemberList; -use crate::errors::*; -use crate::member::{ArtilleryMember, ArtilleryMemberState, ArtilleryStateChange}; -use crate::prelude::CONST_PACKET_SIZE; - -pub type ArtilleryClusterEvent = (Vec, ArtilleryMemberEvent); -pub type WaitList = HashMap>; - -#[derive(Debug)] -pub enum ArtilleryMemberEvent { - Joined(ArtilleryMember), - WentUp(ArtilleryMember), - SuspectedDown(ArtilleryMember), - WentDown(ArtilleryMember), - Left(ArtilleryMember), - Payload(ArtilleryMember, String), -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ArtilleryMessage { - node_id: String, - cluster_key: Vec, - request: Request, - state_changes: Vec, -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -struct EncSocketAddr(SocketAddr); - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -enum Request { - Heartbeat(Option), - Ack(String), - Ping(EncSocketAddr, String), - AckHost(ArtilleryMember), - Payload(String, String), -} - -impl Request { - fn is_heartbeat(&self) -> bool { - matches!(self, Request::Heartbeat(_)) - } -} - -#[derive(Debug, Clone)] -pub struct TargetedRequest { - request: Request, - target: SocketAddr, -} - -#[derive(Clone)] -pub enum ArtilleryClusterRequest { - AddSeed(SocketAddr), - Respond(SocketAddr, ArtilleryMessage), - React(TargetedRequest), - LeaveCluster, - Exit, - Payload(String, String), -} - -const UDP_SERVER: Token = Token(0); - -pub struct ArtilleryEpidemic { - host_id: String, - config: ClusterConfig, - members: ArtilleryMemberList, - seed_queue: Vec, - pending_responses: Vec<(Instant, SocketAddr, Vec)>, - state_changes: Vec, - wait_list: WaitList, - server_socket: UdpSocket, - request_tx: flume::Sender, - event_tx: flume::Sender, - running: AtomicBool, -} - -impl Debug for ArtilleryEpidemic { - fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { - fmt.debug_struct("ArtilleryEpidemic") - .field("host_id", &self.host_id) - .field("members", &self.members) - .field("seed_queue", &self.seed_queue) - .field("pending_responses", &self.pending_responses) - .field("state_changes", &self.state_changes) - .finish() - } -} - -pub type ClusterReactor = (Poll, ArtilleryEpidemic); - -impl ArtilleryEpidemic { - pub fn new( - host_id: String, - config: 
-        event_tx: flume::Sender<ArtilleryClusterEvent>,
-        internal_tx: flume::Sender<ArtilleryClusterRequest>,
-    ) -> Result<ClusterReactor> {
-        let poll: Poll = Poll::new()?;
-
-        let interests = Interest::READABLE.add(Interest::WRITABLE);
-        let mut server_socket = UdpSocket::bind(config.listen_addr)?;
-        poll.registry()
-            .register(&mut server_socket, UDP_SERVER, interests)?;
-
-        let me = ArtilleryMember::current(host_id.clone());
-
-        let state = ArtilleryEpidemic {
-            host_id,
-            config,
-            members: ArtilleryMemberList::new(me.clone()),
-            seed_queue: Vec::new(),
-            pending_responses: Vec::new(),
-            state_changes: vec![ArtilleryStateChange::new(me)],
-            wait_list: HashMap::new(),
-            server_socket,
-            request_tx: internal_tx,
-            event_tx,
-            running: AtomicBool::new(true),
-        };
-
-        Ok((poll, state))
-    }
-
-    pub(crate) fn event_loop(
-        receiver: &mut flume::Receiver<ArtilleryClusterRequest>,
-        mut poll: Poll,
-        mut state: ArtilleryEpidemic,
-    ) -> Result<()> {
-        let mut events = Events::with_capacity(1);
-        let mut buf = [0_u8; CONST_PACKET_SIZE];
-
-        let mut start = Instant::now();
-        let timeout = state.config.ping_interval;
-
-        debug!("Starting Event Loop");
-        // Our event loop.
-        loop {
-            let elapsed = start.elapsed();
-
-            if elapsed >= timeout {
-                state.enqueue_seed_nodes();
-                state.enqueue_random_ping();
-                start = Instant::now();
-            }
-
-            if !state.running.load(Ordering::SeqCst) {
-                debug!("Stopping artillery epidemic evloop");
-                break;
-            }
-
-            // Poll to check if we have events waiting for us.
-            if let Some(remaining) = timeout.checked_sub(elapsed) {
-                poll.poll(&mut events, Some(remaining))?;
-            }
-
-            // Process our own events that are submitted to event loop
-            // Aka outbound events
-            while let Ok(msg) = receiver.try_recv() {
-                state.process_internal_request(msg);
-            }
-
-            // Process inbound events
-            for event in events.iter() {
-                if let UDP_SERVER = event.token() {
-                    loop {
-                        match state.server_socket.recv_from(&mut buf) {
-                            Ok((packet_size, source_address)) => {
-                                let message: ArtilleryMessage =
-                                    serde_json::from_slice(&buf[..packet_size])?;
-                                state.request_tx.send(ArtilleryClusterRequest::Respond(
-                                    source_address,
-                                    message,
-                                ))?;
-                            }
-                            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
-                                // If we get a `WouldBlock` error we know our socket
-                                // has no more packets queued, so we can return to
-                                // polling and wait for some more.
-                                break;
-                            }
-                            Err(e) => {
-                                // If it was any other kind of error, something went
-                                // wrong and we terminate with an error.
-                                return Err(ArtilleryError::Unexpected(format!(
-                                    "Unexpected error occured in event loop: {}",
-                                    e
-                                )));
-                            }
-                        }
-                    }
-                } else {
-                    warn!("Got event for unexpected token: {:?}", event);
-                }
-            }
-        }
-
-        info!("Exiting...");
-        Ok(())
-    }
-
-    fn process_request(&mut self, request: &TargetedRequest) {
-        let timeout = Instant::now() + self.config.ping_timeout;
-        // It was Ping before
-        let should_add_pending = request.request.is_heartbeat();
-        let message = build_message(
-            &self.host_id,
-            &self.config.cluster_key,
-            &request.request,
-            &self.state_changes,
-            self.config.network_mtu,
-        );
-
-        if should_add_pending {
-            self.pending_responses
-                .push((timeout, request.target, message.state_changes.clone()));
-        }
-
-        let encoded = serde_json::to_string(&message).unwrap();
-
-        assert!(encoded.len() < self.config.network_mtu);
-
-        let buf = encoded.as_bytes();
-        self.server_socket.send_to(buf, request.target).unwrap();
-    }
-
-    fn enqueue_seed_nodes(&self) {
-        for seed_node in self
-            .seed_queue
-            .iter()
-            .filter(|addr| !self.members.has_member(addr))
-        {
-            self.request_tx
-                .send(ArtilleryClusterRequest::React(TargetedRequest {
-                    request: Request::Heartbeat(None),
-                    target: *seed_node,
-                }))
-                .unwrap();
-        }
-    }
-
-    fn enqueue_random_ping(&mut self) {
-        if let Some(member) = self.members.next_random_member() {
-            self.request_tx
-                .send(ArtilleryClusterRequest::React(TargetedRequest {
-                    request: Request::Heartbeat(Some(member.node_id())),
-                    target: member.remote_host().unwrap(),
-                }))
-                .unwrap();
-        }
-    }
-
-    fn prune_timed_out_responses(&mut self) {
-        let now = Instant::now();
-
-        let (expired, remaining): (Vec<_>, Vec<_>) = self
-            .pending_responses
-            .iter()
-            .cloned()
-            .partition(|&(t, _, _)| t < now);
-
-        let expired_hosts: HashSet<SocketAddr> = expired.iter().map(|&(_, a, _)| a).collect();
-
-        self.pending_responses = remaining;
-
-        let (suspect, down) = self.members.time_out_nodes(&expired_hosts);
-
-        enqueue_state_change(&mut self.state_changes, &down);
-        enqueue_state_change(&mut self.state_changes, &suspect);
-
-        for member in suspect {
-            self.send_ping_requests(&member);
-            self.send_member_event(ArtilleryMemberEvent::SuspectedDown(member.clone()));
-        }
-
-        for member in down {
-            self.members.remove_member(&member.node_id());
-            self.send_member_event(ArtilleryMemberEvent::WentDown(member.clone()));
-        }
-    }
-
-    fn send_ping_requests(&self, target: &ArtilleryMember) {
-        if let Some(target_host) = target.remote_host() {
-            for relay in self
-                .members
-                .hosts_for_indirect_ping(self.config.ping_request_host_count, &target_host)
-            {
-                self.request_tx
-                    .send(ArtilleryClusterRequest::React(TargetedRequest {
-                        request: Request::Ping(
-                            EncSocketAddr::from_addr(&target_host),
-                            target.node_id(),
-                        ),
-                        target: relay,
-                    }))
-                    .unwrap();
-            }
-        }
-    }
-
-    fn process_internal_request(&mut self, message: ArtilleryClusterRequest) {
-        use ArtilleryClusterRequest::*;
-
-        match message {
-            AddSeed(addr) => self.seed_queue.push(addr),
-            Respond(src_addr, message) => self.respond_to_message(src_addr, message),
-            React(request) => {
-                self.prune_timed_out_responses();
-                self.process_request(&request);
-            }
-            LeaveCluster => {
-                let myself = self.members.leave();
-                enqueue_state_change(&mut self.state_changes, &[myself]);
-            }
-            Payload(id, msg) => {
-                if let Some(target_peer) = self.members.get_member(&id) {
-                    if !target_peer.is_remote() {
-                        error!("Current node can't send payload to self over LAN");
-                        return;
-                    }
-
-                    self.process_request(&TargetedRequest {
-                        request: Request::Payload(id, msg),
-                        target: target_peer
-                            .remote_host()
-                            .expect("Expected target peer addr"),
-                    });
-                    return;
-                }
-                warn!(
-                    "Unable to find the peer with an id - {} to send the payload",
-                    id
-                );
-            }
-            Exit => {
-                self.running.store(false, Ordering::SeqCst);
-            }
-        };
-    }
-
-    fn respond_to_message(&mut self, src_addr: SocketAddr, message: ArtilleryMessage) {
-        use Request::*;
-
-        if message.cluster_key != self.config.cluster_key {
-            error!("Mismatching cluster keys, ignoring message");
-            return;
-        }
-        // We want to abort if a new member has the same node_id of an existing member.
-        // A new member is detected according to its socket address.
-        if !self.members.has_member(&src_addr)
-            && self.members.get_member(&message.node_id).is_some()
-        {
-            error!(
-                "Cannot add a member with a node-id `{}` already present in the cluster.",
-                message.node_id
-            );
-            return;
-        }
-
-        self.apply_state_changes(message.state_changes, src_addr);
-
-        self.ensure_node_is_member(src_addr, message.node_id);
-
-        let response = match message.request {
-            Heartbeat(ref opt_node_id) => {
-                let should_ignore_wrong_id = opt_node_id
-                    .as_ref()
-                    .map(|node_id| node_id != &self.host_id)
-                    .unwrap_or(false);
-                if should_ignore_wrong_id {
-                    None
-                } else {
-                    Some(TargetedRequest {
-                        request: Ack(self.host_id.to_string()),
-                        target: src_addr,
-                    })
-                }
-            }
-            Ack(node_id) => {
-                self.ack_response(src_addr);
-                self.mark_node_alive(src_addr, node_id);
-                None
-            }
-            Ping(dest_addr, node_id) => {
-                let EncSocketAddr(dest_addr) = dest_addr;
-                add_to_wait_list(&mut self.wait_list, &dest_addr, &src_addr);
-                Some(TargetedRequest {
-                    request: Heartbeat(Some(node_id)),
-                    target: dest_addr,
-                })
-            }
-            AckHost(member) => {
-                self.ack_response(member.remote_host().unwrap());
-                self.mark_node_alive(member.remote_host().unwrap(), member.node_id());
-                None
-            }
-            Payload(peer_id, msg) => {
-                if let Some(member) = self.members.get_member(&peer_id) {
-                    self.send_member_event(ArtilleryMemberEvent::Payload(member, msg));
-                } else {
-                    warn!("Got payload request from an unknown peer {}", peer_id);
-                }
-                None
-            }
-        };
-
-        if let Some(response) = response {
-            self.request_tx
-                .send(ArtilleryClusterRequest::React(response))
-                .unwrap()
-        }
-    }
-
-    fn ack_response(&mut self, src_addr: SocketAddr) {
-        let mut to_remove = Vec::new();
-
-        for &(ref t, ref addr, ref state_changes) in &self.pending_responses {
-            if src_addr != *addr {
-                continue;
-            }
-
-            to_remove.push((*t, *addr, state_changes.clone()));
-
-            self.state_changes.retain(|os| {
-                !state_changes
-                    .iter()
-                    .any(|is| is.member().node_id() == os.member().node_id())
-            });
-        }
-
-        self.pending_responses
-            .retain(|op| !to_remove.iter().any(|ip| ip == op));
-    }
-
-    fn ensure_node_is_member(&mut self, src_addr: SocketAddr, node_id: String) {
-        if node_id == self.host_id {
-            return;
-        }
-        if self.members.has_member(&src_addr) {
-            return;
-        }
-
-        let new_member = ArtilleryMember::new(node_id, src_addr, 0, ArtilleryMemberState::Alive);
-
-        self.members.add_member(new_member.clone());
-        enqueue_state_change(&mut self.state_changes, &[new_member.clone()]);
-        self.send_member_event(ArtilleryMemberEvent::Joined(new_member));
-    }
-
-    fn send_member_event(&self, event: ArtilleryMemberEvent) {
-        use ArtilleryMemberEvent::*;
-
-        match event {
-            Joined(_) | Payload(..) => {}
-            WentUp(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Alive),
-            WentDown(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Down),
-            SuspectedDown(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Suspect),
-            Left(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Left),
-        };
-
-        // If an error is returned, no one is listening to events anymore. This is normal.
-        let _ = self.event_tx.send((self.members.available_nodes(), event));
-    }
-
-    fn apply_state_changes(&mut self, state_changes: Vec<ArtilleryStateChange>, from: SocketAddr) {
-        let (new, changed) = self.members.apply_state_changes(state_changes, &from);
-
-        enqueue_state_change(&mut self.state_changes, &new);
-        enqueue_state_change(&mut self.state_changes, &changed);
-
-        for member in new {
-            self.send_member_event(ArtilleryMemberEvent::Joined(member));
-        }
-
-        for member in changed {
-            self.send_member_event(determine_member_event(member));
-        }
-    }
-
-    fn mark_node_alive(&mut self, src_addr: SocketAddr, node_id: String) {
-        if let Some(member) = self.members.mark_node_alive(&src_addr, node_id) {
-            if let Some(wait_list) = self.wait_list.get_mut(&src_addr) {
-                for remote in wait_list.iter() {
-                    self.request_tx
-                        .send(ArtilleryClusterRequest::React(TargetedRequest {
-                            request: Request::AckHost(member.clone()),
-                            target: *remote,
-                        }))
-                        .unwrap();
-                }
-
-                wait_list.clear();
-            }
-
-            enqueue_state_change(&mut self.state_changes, &[member.clone()]);
-            self.send_member_event(ArtilleryMemberEvent::WentUp(member));
-        }
-    }
-}
-
-fn build_message(
-    node_id: &str,
-    cluster_key: &[u8],
-    request: &Request,
-    state_changes: &[ArtilleryStateChange],
-    network_mtu: usize,
-) -> ArtilleryMessage {
-    let mut message = ArtilleryMessage {
-        node_id: node_id.to_string(),
-        cluster_key: cluster_key.into(),
-        request: request.clone(),
-        state_changes: Vec::new(),
-    };
-
-    for i in 0..=state_changes.len() {
-        message = ArtilleryMessage {
-            node_id: node_id.to_string(),
-            cluster_key: cluster_key.into(),
-            request: request.clone(),
-            state_changes: (&state_changes[..i]).to_vec(),
-        };
-
-        let encoded = serde_json::to_string(&message).unwrap();
-        if encoded.len() >= network_mtu {
-            return message;
-        }
-    }
-
-    message
-}
-
-fn add_to_wait_list(wait_list: &mut WaitList, wait_addr: &SocketAddr, notify_addr: &SocketAddr) {
-    match wait_list.entry(*wait_addr) {
-        Entry::Occupied(mut entry) => {
-            entry.get_mut().push(*notify_addr);
-        }
-        Entry::Vacant(entry) => {
-            entry.insert(vec![*notify_addr]);
-        }
-    };
-}
-
-fn determine_member_event(member: ArtilleryMember) -> ArtilleryMemberEvent {
-    match member.state() {
-        ArtilleryMemberState::Alive => ArtilleryMemberEvent::WentUp(member),
-        ArtilleryMemberState::Suspect => ArtilleryMemberEvent::SuspectedDown(member),
-        ArtilleryMemberState::Down => ArtilleryMemberEvent::WentDown(member),
-        ArtilleryMemberState::Left => ArtilleryMemberEvent::Left(member),
-    }
-}
-
-fn enqueue_state_change(
-    state_changes: &mut Vec<ArtilleryStateChange>,
-    members: &[ArtilleryMember],
-) {
-    for member in members {
-        for state_change in state_changes.iter_mut() {
-            if state_change.member().node_id() == member.node_id() {
-                state_change.update(member.clone());
-                return;
-            }
-        }
-
-        state_changes.push(ArtilleryStateChange::new(member.clone()));
-    }
-}
-
-impl EncSocketAddr {
-    fn from_addr(addr: &SocketAddr) -> Self {
-        EncSocketAddr(*addr)
-    }
-}
diff --git a/rustfmt.toml b/rustfmt.toml
index 9bdc2f748f..35d15bc549 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -1,7 +1,6 @@
 ignore = [
     "quickwit-proto/src/cluster.rs",
"quickwit-proto/src/quickwit.rs", - "quickwit-swim", ] comment_width = 120 diff --git a/scripts/dep-tree.py b/scripts/dep-tree.py index 0d02742faf..075fb2994f 100644 --- a/scripts/dep-tree.py +++ b/scripts/dep-tree.py @@ -11,7 +11,6 @@ "quickwit-common", "quickwit-indexing", "quickwit-metastore", - "quickwit-swim", "quickwit-proto", "quickwit-directories", "quickwit-common",