diff --git a/Cargo.lock b/Cargo.lock index f7e21da..1183037 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -439,6 +439,15 @@ dependencies = [ "libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fsio" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "users 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -1536,6 +1545,14 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "run_script" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fsio 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc-demangle" version = "0.1.16" @@ -2000,6 +2017,14 @@ dependencies = [ "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "users" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "uuid" version = "0.7.4" @@ -2042,6 +2067,7 @@ dependencies = [ "reqwest 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)", "rocket 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "rocket_contrib 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "run_script 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2269,6 +2295,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" "checksum fsevent 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" "checksum fsevent-sys 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +"checksum fsio 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2131cb03096f67334dfba2f0bc46afc5564b08a919d042c6e217e2665741fc54" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" @@ -2387,6 +2414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum rocket_codegen 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "cb852e6da168fb948a8f2b798ba2e2f0e4fc860eae0efa9cf2bf0f5466bb0425" "checksum rocket_contrib 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3946ca815127041d8f64455561031d058c22ae1b135251502c5ea523cf9e14b" "checksum rocket_http 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "1aff5a5480175f2f553a876b251e9350c74196128806d176da3a51c82aab5428" +"checksum run_script 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a8e8fc35067815a04a35fe2144361e1257b0f1041f0d413664f38e44d1a73cb4" "checksum rustc-demangle 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" "checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" @@ -2445,6 +2473,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" "checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" "checksum url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "74e7d099f1ee52f823d4bdd60c93c3602043c728f5db3b97bdb548467f7bddea" +"checksum users 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c72f4267aea0c3ec6d07eaabea6ead7c5ddacfafc5e22bcf8d186706851fb4cf" "checksum uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90dbc611eb48397705a6b0f6e917da23ae517e4d127123d2cf7674206627d32a" "checksum vcpkg 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" diff --git a/Cargo.toml b/Cargo.toml index 19711be..3134631 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ native-tls = { version = "0.2", features = ["vendored"] } openssl-probe = "0.1" reqwest = { version = "0.10", features = ["native-tls-vendored", "gzip", "blocking", "json"], default-features = false } ping = "0.2" +run_script = "0.6" lettre = { version = "0.9", features = ["smtp-transport"], optional = true } lettre_email = { version = "0.9", optional = true } libstrophe = { version = "0.13", default-features = false, optional = true } diff --git a/config.cfg b/config.cfg index 0cd30ce..02e8d8b 100644 --- a/config.cfg +++ b/config.cfg @@ -44,6 +44,8 @@ push_delay_dead = 20 push_system_cpu_sick_above = 0.90 push_system_ram_sick_above = 0.90 +script_interval = 300 + [plugins] [plugins.rabbitmq] @@ -177,3 +179,28 @@ id = "socket-client" label = "Visitor realtime sockets" mode = "push" rabbitmq_queue = "client" + +[[probe.service]] + +id = "feature" +label = "Feature checks" + +[[probe.service.node]] + +id = "mysql-replication" +label = "MySQL replication status" +mode = "script" + +scripts = [ + ''' + exit 0 + ''', + + ''' + exit 1 + ''', + + ''' + exit 2 + ''' +] diff --git a/src/config/config.rs b/src/config/config.rs index 12ba6f2..2a16b07 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -88,6 +88,9 @@ pub struct ConfigMetrics { #[serde(default = "defaults::metrics_push_system_ram_sick_above")] pub push_system_ram_sick_above: f32, + + #[serde(default = "defaults::metrics_script_interval")] + pub script_interval: u64, } #[derive(Deserialize)] @@ -217,6 +220,7 @@ pub struct ConfigProbeServiceNode { pub label: String, pub mode: Mode, pub replicas: Option>, + pub scripts: Option>, pub http_body_healthy_match: Option, pub rabbitmq_queue: Option, } diff --git a/src/config/defaults.rs b/src/config/defaults.rs index e7ada99..cb87d8c 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -63,6 +63,10 @@ pub fn metrics_push_system_ram_sick_above() -> f32 { 0.99 } +pub fn metrics_script_interval() -> u64 { + 300 +} + pub fn notify_startup_notification() -> bool { true } diff --git a/src/main.rs b/src/main.rs index b2879e3..944a852 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,6 +24,7 @@ extern crate ping; extern crate regex; extern crate reqwest; extern crate rocket_contrib; +extern crate run_script; extern crate serde; extern crate time; extern crate toml; @@ -56,14 +57,18 @@ use crate::aggregator::manager::run as run_aggregator; use crate::config::config::Config; use crate::config::logger::ConfigLogger; use crate::config::reader::ConfigReader; -use crate::prober::manager::{initialize_store as initialize_store_prober, run as run_prober}; +use crate::prober::manager::{ + initialize_store as initialize_store_prober, run_poll as run_poll_prober, + run_script as run_script_prober, +}; use crate::responder::manager::run as run_responder; struct AppArgs { config: String, } -pub static THREAD_NAME_PROBER: &'static str = "vigil-prober"; +pub static THREAD_NAME_PROBER_POLL: &'static str = "vigil-prober-poll"; +pub static THREAD_NAME_PROBER_SCRIPT: &'static str = "vigil-prober-script"; pub static THREAD_NAME_AGGREGATOR: &'static str = "vigil-aggregator"; pub static THREAD_NAME_RESPONDER: &'static str = "vigil-responder"; @@ -101,7 +106,18 @@ lazy_static! { static ref APP_CONF: Config = ConfigReader::make(); } -gen_spawn_managed!("prober", spawn_prober, THREAD_NAME_PROBER, run_prober); +gen_spawn_managed!( + "prober-poll", + spawn_poll_prober, + THREAD_NAME_PROBER_POLL, + run_poll_prober +); +gen_spawn_managed!( + "prober-script", + spawn_script_prober, + THREAD_NAME_PROBER_SCRIPT, + run_script_prober +); gen_spawn_managed!( "aggregator", spawn_aggregator, @@ -167,7 +183,8 @@ fn main() { initialize_store_prober(); // Spawn probes (background thread) - thread::spawn(spawn_prober); + thread::spawn(spawn_poll_prober); + thread::spawn(spawn_script_prober); // Spawn aggregator (background thread) thread::spawn(spawn_aggregator); diff --git a/src/prober/manager.rs b/src/prober/manager.rs index 4f7a845..7edb5c8 100644 --- a/src/prober/manager.rs +++ b/src/prober/manager.rs @@ -18,6 +18,7 @@ use reqwest::blocking::Client; use reqwest::header::{HeaderMap, USER_AGENT}; use reqwest::redirect::Policy as RedirectPolicy; use reqwest::StatusCode; +use run_script::{self, ScriptOptions}; use super::replica::ReplicaURL; use super::states::{ @@ -63,6 +64,11 @@ pub struct Store { pub notified: Option, } +enum DispatchMode<'a> { + Poll(&'a ReplicaURL, &'a Option), + Script(&'a String), +} + fn make_default_headers() -> HeaderMap { let mut headers = HeaderMap::new(); @@ -82,7 +88,7 @@ fn map_poll_replicas() -> Vec<(String, String, String, ReplicaURL, Option // Acquire states let states = &PROBER_STORE.read().unwrap().states; - // Map hosts to be probed + // Map replica URLs to be probed for (probe_id, probe) in states.probes.iter() { for (node_id, node) in probe.nodes.iter() { if node.mode == Mode::Poll { @@ -108,7 +114,36 @@ fn map_poll_replicas() -> Vec<(String, String, String, ReplicaURL, Option replica_list } -fn proceed_replica_probe_with_retry( +fn map_script_replicas() -> Vec<(String, String, String, String)> { + let mut replica_list = Vec::new(); + + // Acquire states + let states = &PROBER_STORE.read().unwrap().states; + + // Map scripts to be probed + for (probe_id, probe) in states.probes.iter() { + for (node_id, node) in probe.nodes.iter() { + if node.mode == Mode::Script { + for (replica_id, replica) in node.replicas.iter() { + if let Some(ref replica_script) = replica.script { + // Clone values to scan; this ensure the write lock is not held while \ + // the script execution is performed. Same as in `map_poll_replicas()`. + replica_list.push(( + probe_id.to_owned(), + node_id.to_owned(), + replica_id.to_owned(), + replica_script.to_owned(), + )); + } + } + } + } + } + + replica_list +} + +fn proceed_replica_probe_poll_with_retry( replica_url: &ReplicaURL, body_match: &Option, ) -> (Status, Option) { @@ -124,7 +159,7 @@ fn proceed_replica_probe_with_retry( thread::sleep(Duration::from_millis(PROBE_HOLD_MILLISECONDS)); - let probe_results = proceed_replica_probe(replica_url, body_match); + let probe_results = proceed_replica_probe_poll(replica_url, body_match); status = probe_results.0; latency = Some(probe_results.1); @@ -133,17 +168,17 @@ fn proceed_replica_probe_with_retry( (status, latency) } -fn proceed_replica_probe( +fn proceed_replica_probe_poll( replica_url: &ReplicaURL, body_match: &Option, ) -> (Status, Duration) { let start_time = SystemTime::now(); let (is_up, poll_duration) = match replica_url { - &ReplicaURL::ICMP(ref host) => proceed_replica_probe_icmp(host), - &ReplicaURL::TCP(ref host, port) => proceed_replica_probe_tcp(host, port), - &ReplicaURL::HTTP(ref url) => proceed_replica_probe_http(url, body_match), - &ReplicaURL::HTTPS(ref url) => proceed_replica_probe_http(url, body_match), + &ReplicaURL::ICMP(ref host) => proceed_replica_probe_poll_icmp(host), + &ReplicaURL::TCP(ref host, port) => proceed_replica_probe_poll_tcp(host, port), + &ReplicaURL::HTTP(ref url) => proceed_replica_probe_poll_http(url, body_match), + &ReplicaURL::HTTPS(ref url) => proceed_replica_probe_poll_http(url, body_match), }; let duration_latency = match poll_duration { @@ -165,7 +200,7 @@ fn proceed_replica_probe( } } -fn proceed_replica_probe_icmp(host: &str) -> (bool, Option) { +fn proceed_replica_probe_poll_icmp(host: &str) -> (bool, Option) { // Notice: a dummy port of value '0' is set here, so that we can resolve the host to an actual \ // IP address using the standard library, which avoids depending on an additional library. let address_results = (host, 0).to_socket_addrs(); @@ -273,7 +308,7 @@ fn proceed_replica_probe_icmp(host: &str) -> (bool, Option) { (true, maximum_rtt) } -fn proceed_replica_probe_tcp(host: &str, port: u16) -> (bool, Option) { +fn proceed_replica_probe_poll_tcp(host: &str, port: u16) -> (bool, Option) { let address_results = (host, port).to_socket_addrs(); match address_results { @@ -317,7 +352,10 @@ fn proceed_replica_probe_tcp(host: &str, port: u16) -> (bool, Option) (false, None) } -fn proceed_replica_probe_http(url: &str, body_match: &Option) -> (bool, Option) { +fn proceed_replica_probe_poll_http( + url: &str, + body_match: &Option, +) -> (bool, Option) { // Acquire query string separator (if the URL already contains a query string, use append mode) let query_separator = if url.contains("?") { "&" } else { "?" }; @@ -385,6 +423,29 @@ fn proceed_replica_probe_http(url: &str, body_match: &Option) -> (bool, O (false, None) } +fn proceed_replica_probe_script(script: &String) -> Status { + match run_script::run(script, &Vec::new(), &ScriptOptions::new()) { + Ok((code, _, _)) => { + debug!( + "prober script execution succeeded with return code: {}", + code + ); + + // Return code '0' goes for 'healthy', '1' goes for 'sick'; any other code is 'dead' + match code { + 0 => Status::Healthy, + 1 => Status::Sick, + _ => Status::Dead, + } + } + Err(err) => { + error!("prober script execution failed with error: {}", err); + + Status::Dead + } + } +} + fn proceed_rabbitmq_queue_probe( rabbitmq: &ConfigPluginsRabbitMQ, rabbitmq_queue: &str, @@ -474,35 +535,61 @@ fn proceed_rabbitmq_queue_probe( (false, false, None) } -fn dispatch_polls() { - // Probe hosts - for probe_replica in map_poll_replicas() { - let (replica_status, replica_latency) = - proceed_replica_probe_with_retry(&probe_replica.3, &probe_replica.4); +fn dispatch_replica<'a>(mode: DispatchMode<'a>, probe_id: &str, node_id: &str, replica_id: &str) { + // Acquire replica status (with optional latency) + let (replica_status, replica_latency) = match mode { + DispatchMode::Poll(replica_url, body_match) => { + proceed_replica_probe_poll_with_retry(replica_url, body_match) + } + DispatchMode::Script(script) => (proceed_replica_probe_script(script), None), + }; - debug!( - "replica probe result: {}:{}:{} => {:?}", - &probe_replica.0, &probe_replica.1, &probe_replica.2, replica_status - ); + debug!( + "replica probe result: {}:{}:{} => {:?}", + probe_id, node_id, replica_id, replica_status + ); - // Update replica status (write-lock the store) - { - let mut store = STORE.write().unwrap(); + // Update replica status (write-lock the store) + { + let mut store = STORE.write().unwrap(); - if let Some(ref mut probe) = store.states.probes.get_mut(&probe_replica.0) { - if let Some(ref mut node) = probe.nodes.get_mut(&probe_replica.1) { - if let Some(ref mut replica) = node.replicas.get_mut(&probe_replica.2) { - replica.status = replica_status; + if let Some(ref mut probe) = store.states.probes.get_mut(probe_id) { + if let Some(ref mut node) = probe.nodes.get_mut(node_id) { + if let Some(ref mut replica) = node.replicas.get_mut(replica_id) { + replica.status = replica_status; - replica.metrics.latency = - replica_latency.map(|duration| duration.as_millis() as u64); - } + replica.metrics.latency = + replica_latency.map(|duration| duration.as_millis() as u64); } } } } } +fn dispatch_polls() { + // Probe hosts + for probe_replica in map_poll_replicas() { + dispatch_replica( + DispatchMode::Poll(&probe_replica.3, &probe_replica.4), + &probe_replica.0, + &probe_replica.1, + &probe_replica.2, + ); + } +} + +fn dispatch_scripts() { + // Run scripts + for probe_replica in map_script_replicas() { + dispatch_replica( + DispatchMode::Script(&probe_replica.3), + &probe_replica.0, + &probe_replica.1, + &probe_replica.2, + ); + } +} + fn dispatch_plugins_rabbitmq(probe_id: String, node_id: String, queue: Option) { // RabbitMQ plugin enabled? if let Some(ref plugins) = APP_CONF.plugins { @@ -602,6 +689,7 @@ pub fn initialize_store() { rabbitmq_queue: node.rabbitmq_queue.to_owned(), }; + // Node with replicas? (might be a poll node) if let Some(ref replicas) = node.replicas { if node.mode != Mode::Poll { panic!("non-poll node cannot have replicas"); @@ -620,6 +708,33 @@ pub fn initialize_store() { ServiceStatesProbeNodeReplica { status: Status::Healthy, url: Some(replica_url), + script: None, + metrics: ServiceStatesProbeNodeReplicaMetrics::default(), + load: None, + report: None, + }, + ); + } + } + + // Node with scripts? (might be a script node) + if let Some(ref scripts) = node.scripts { + if node.mode != Mode::Script { + panic!("non-script node cannot have scripts"); + } + + for (index, script) in scripts.iter().enumerate() { + debug!( + "prober store: got script {}:{}:#{}", + service.id, node.id, index + ); + + probe_node.replicas.insert( + index.to_string(), + ServiceStatesProbeNodeReplica { + status: Status::Healthy, + url: None, + script: Some(script.to_owned()), metrics: ServiceStatesProbeNodeReplicaMetrics::default(), load: None, report: None, @@ -637,15 +752,28 @@ pub fn initialize_store() { info!("initialized prober store"); } -pub fn run() { +pub fn run_poll() { loop { - debug!("running a probe operation..."); + debug!("running a poll probe operation..."); dispatch_polls(); - info!("ran probe operation"); + info!("ran poll probe operation"); // Hold for next aggregate run thread::sleep(Duration::from_secs(APP_CONF.metrics.poll_interval)); } } + +pub fn run_script() { + loop { + debug!("running a script probe operation..."); + + dispatch_scripts(); + + info!("ran script probe operation"); + + // Hold for next aggregate run + thread::sleep(Duration::from_secs(APP_CONF.metrics.script_interval)); + } +} diff --git a/src/prober/mode.rs b/src/prober/mode.rs index 1512c8d..9d0162f 100644 --- a/src/prober/mode.rs +++ b/src/prober/mode.rs @@ -11,4 +11,7 @@ pub enum Mode { #[serde(rename = "push")] Push, + + #[serde(rename = "script")] + Script, } diff --git a/src/prober/report.rs b/src/prober/report.rs index 9dc179f..45063e7 100644 --- a/src/prober/report.rs +++ b/src/prober/report.rs @@ -74,6 +74,7 @@ pub fn handle( ServiceStatesProbeNodeReplica { status: status, url: None, + script: None, metrics: metrics, load: Some(ServiceStatesProbeNodeReplicaLoad { cpu: load_cpu, diff --git a/src/prober/states.rs b/src/prober/states.rs index 5ccb744..020ed8f 100644 --- a/src/prober/states.rs +++ b/src/prober/states.rs @@ -41,6 +41,7 @@ pub struct ServiceStatesProbeNode { pub struct ServiceStatesProbeNodeReplica { pub status: Status, pub url: Option, + pub script: Option, pub metrics: ServiceStatesProbeNodeReplicaMetrics, pub load: Option, pub report: Option,