From 0c8838a812efe49cf49b9bd6d1d7c7b2d1bc0efc Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sun, 20 Feb 2022 22:37:29 -0800 Subject: [PATCH] use logger lib; minor refactor; sv_* stats --- Cargo.lock | 49 ++++++++++++++++++++ Cargo.toml | 1 + src/client.rs | 12 +++-- src/config.rs | 33 +++++++------- src/main.rs | 45 +++++++++---------- src/pool.rs | 33 +++++++------- src/query_router.rs | 7 +-- src/server.rs | 39 +++++++++------- src/stats.rs | 106 +++++++++++++++++++++++++++++++++++++++----- 9 files changed, 236 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index befc8f13..1259b4e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -116,6 +127,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "env_logger" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "futures-channel" version = "0.3.19" @@ -181,6 +205,12 @@ dependencies = [ "libc", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "instant" version = "0.1.12" @@ -329,6 +359,7 @@ dependencies = [ "bb8", "bytes", "chrono", + "env_logger", "log", "md-5", "num_cpus", @@ -530,6 +561,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + [[package]] name = "time" version = "0.1.44" @@ -620,6 +660,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 7afcc6a8..860955e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,4 @@ statsd = "0.15" sqlparser = "0.14" log = "0.4" arc-swap = "1" +env_logger = "0.9" diff --git a/src/client.rs b/src/client.rs index 7a11d60c..06f5a9f6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,6 +2,7 @@ /// We are pretending to the server in this scenario, /// and this module implements that. use bytes::{Buf, BufMut, BytesMut}; +use log::error; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -253,7 +254,7 @@ impl Client { let connection = match pool.get(query_router.shard(), query_router.role()).await { Ok(conn) => conn, Err(err) => { - println!(">> Could not get connection from pool: {:?}", err); + error!("Could not get connection from pool: {:?}", err); error_response(&mut self.write, "could not get connection from the pool") .await?; continue; @@ -267,8 +268,9 @@ impl Client { // Claim this server as mine for query cancellation. server.claim(self.process_id, self.secret_key); - // Client active + // Client active & server active self.stats.client_active(self.process_id); + self.stats.server_active(server.process_id()); // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, @@ -338,7 +340,7 @@ impl Client { // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { - // Report this client as idle. + self.stats.server_idle(server.process_id()); break; } } @@ -420,6 +422,7 @@ impl Client { self.stats.transaction(); if self.transaction_mode { + self.stats.server_idle(server.process_id()); break; } } @@ -453,6 +456,7 @@ impl Client { self.stats.transaction(); if self.transaction_mode { + self.stats.server_idle(server.process_id()); break; } } @@ -461,7 +465,7 @@ impl Client { // Some unexpected message. We either did not implement the protocol correctly // or this is not a Postgres client we're talking to. _ => { - println!(">>> Unexpected code: {}", code); + error!("Unexpected code: {}", code); } } } diff --git a/src/config.rs b/src/config.rs index 261d596f..221fb667 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,5 @@ use arc_swap::{ArcSwap, Guard}; +use log::{error, info}; use once_cell::sync::Lazy; use serde_derive::Deserialize; use tokio::fs::File; @@ -150,14 +151,14 @@ impl Default for Config { impl Config { pub fn show(&self) { - println!("> Pool size: {}", self.general.pool_size); - println!("> Pool mode: {}", self.general.pool_mode); - println!("> Ban time: {}s", self.general.ban_time); - println!( - "> Healthcheck timeout: {}ms", + info!("Pool size: {}", self.general.pool_size); + info!("Pool mode: {}", self.general.pool_mode); + info!("Ban time: {}s", self.general.ban_time); + info!( + "Healthcheck timeout: {}ms", self.general.healthcheck_timeout ); - println!("> Connection timeout: {}ms", self.general.connect_timeout); + info!("Connection timeout: {}ms", self.general.connect_timeout); } } @@ -171,7 +172,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { let mut file = match File::open(path).await { Ok(file) => file, Err(err) => { - println!("> Config error: {:?}", err); + error!("{:?}", err); return Err(Error::BadConfig); } }; @@ -179,7 +180,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { match file.read_to_string(&mut contents).await { Ok(_) => (), Err(err) => { - println!("> Config error: {:?}", err); + error!("{:?}", err); return Err(Error::BadConfig); } }; @@ -187,7 +188,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { let config: Config = match toml::from_str(&contents) { Ok(config) => config, Err(err) => { - println!("> Config error: {:?}", err); + error!("{:?}", err); return Err(Error::BadConfig); } }; @@ -200,7 +201,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { let mut primary_count = 0; if shard.1.servers.len() == 0 { - println!("> Shard {} has no servers configured", shard.0); + error!("Shard {} has no servers configured", shard.0); return Err(Error::BadConfig); } @@ -218,8 +219,8 @@ pub async fn parse(path: &str) -> Result<(), Error> { "primary" => (), "replica" => (), _ => { - println!( - "> Shard {} server role must be either 'primary' or 'replica', got: '{}'", + error!( + "Shard {} server role must be either 'primary' or 'replica', got: '{}'", shard.0, server.2 ); return Err(Error::BadConfig); @@ -228,12 +229,12 @@ pub async fn parse(path: &str) -> Result<(), Error> { } if primary_count > 1 { - println!("> Shard {} has more than on primary configured.", &shard.0); + error!("Shard {} has more than on primary configured", &shard.0); return Err(Error::BadConfig); } if dup_check.len() != shard.1.servers.len() { - println!("> Shard {} contains duplicate server configs.", &shard.0); + error!("Shard {} contains duplicate server configs", &shard.0); return Err(Error::BadConfig); } } @@ -243,8 +244,8 @@ pub async fn parse(path: &str) -> Result<(), Error> { "primary" => (), "replica" => (), other => { - println!( - "> Query router default_role must be 'primary', 'replica', or 'any', got: '{}'", + error!( + "Query router default_role must be 'primary', 'replica', or 'any', got: '{}'", other ); return Err(Error::BadConfig); diff --git a/src/main.rs b/src/main.rs index e03cd12a..65a12c9a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,7 @@ extern crate arc_swap; extern crate async_trait; extern crate bb8; extern crate bytes; +extern crate env_logger; extern crate log; extern crate md5; extern crate num_cpus; @@ -34,15 +35,16 @@ extern crate statsd; extern crate tokio; extern crate toml; +use log::{error, info}; use tokio::net::TcpListener; use tokio::{ signal, signal::unix::{signal as unix_signal, SignalKind}, + sync::mpsc, }; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; mod client; mod config; @@ -64,11 +66,12 @@ use stats::{Collector, Reporter}; /// Main! #[tokio::main(worker_threads = 4)] async fn main() { - println!("> Welcome to PgCat! Meow."); + env_logger::init(); + info!("Welcome to PgCat! Meow."); // Prepare regexes if !query_router::QueryRouter::setup() { - println!("> Could not setup query router."); + error!("Could not setup query router"); return; } @@ -76,7 +79,7 @@ async fn main() { match config::parse("pgcat.toml").await { Ok(_) => (), Err(err) => { - println!("> Config parse error: {:?}", err); + error!("Config parse error: {:?}", err); return; } }; @@ -87,12 +90,12 @@ async fn main() { let listener = match TcpListener::bind(&addr).await { Ok(sock) => sock, Err(err) => { - println!("> Error: {:?}", err); + error!("Listener socket error: {:?}", err); return; } }; - println!("> Running on {}", addr); + info!("Running on {}", addr); config.show(); // Tracks which client is connected to which server for query cancellation. @@ -102,8 +105,6 @@ async fn main() { let (tx, rx) = mpsc::channel(100); tokio::task::spawn(async move { - println!("> Statistics reporter started"); - let mut stats_collector = Collector::new(rx); stats_collector.collect().await; }); @@ -114,12 +115,12 @@ async fn main() { let server_info = match pool.validate().await { Ok(info) => info, Err(err) => { - println!("> Could not validate connection pool: {:?}", err); + error!("Could not validate connection pool: {:?}", err); return; } }; - println!("> Waiting for clients..."); + info!("Waiting for clients"); // Main app runs here. tokio::task::spawn(async move { @@ -132,7 +133,7 @@ async fn main() { let (socket, addr) = match listener.accept().await { Ok((socket, addr)) => (socket, addr), Err(err) => { - println!("> Listener: {:?}", err); + error!("{:?}", err); continue; } }; @@ -140,35 +141,31 @@ async fn main() { // Client goes to another thread, bye. tokio::task::spawn(async move { let start = chrono::offset::Utc::now().naive_utc(); - - println!(">> Client {:?} connected", addr); - match client::Client::startup(socket, client_server_map, server_info, reporter) .await { Ok(mut client) => { - println!(">> Client {:?} authenticated successfully!", addr); - + info!("Client {:?} connected", addr); match client.handle(pool).await { Ok(()) => { let duration = chrono::offset::Utc::now().naive_utc() - start; - println!( - ">> Client {:?} disconnected, session duration: {}", + info!( + "Client {:?} disconnected, session duration: {}", addr, format_duration(&duration) ); } Err(err) => { - println!(">> Client disconnected with error: {:?}", err); + error!("Client disconnected with error: {:?}", err); client.release(); } } } Err(err) => { - println!(">> Error: {:?}", err); + error!("Client failed to login: {:?}", err); } }; }); @@ -182,13 +179,13 @@ async fn main() { loop { stream.recv().await; - println!("> Reloading config"); + info!("Reloading config"); match config::parse("pgcat.toml").await { Ok(_) => { get_config().show(); } Err(err) => { - println!("> Config parse error: {:?}", err); + error!("{:?}", err); return; } }; @@ -198,11 +195,11 @@ async fn main() { // Setup shut down sequence match signal::ctrl_c().await { Ok(()) => { - println!("> Shutting down..."); + info!("Shutting down..."); } Err(err) => { - eprintln!("Unable to listen for shutdown signal: {}", err); + error!("Unable to listen for shutdown signal: {}", err); } }; } diff --git a/src/pool.rs b/src/pool.rs index bea260bd..87f169a3 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use bb8::{ManageConnection, Pool, PooledConnection}; use bytes::BytesMut; use chrono::naive::NaiveDateTime; +use log::{error, info, warn}; use crate::config::{get_config, Address, Role, User}; use crate::errors::Error; @@ -54,7 +55,7 @@ impl ConnectionPool { "primary" => Role::Primary, "replica" => Role::Replica, _ => { - println!("> Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2); + error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2); Role::Replica } }; @@ -118,7 +119,7 @@ impl ConnectionPool { let connection = match self.get(shard, None).await { Ok(conn) => conn, Err(err) => { - println!("> Shard {} down or misconfigured: {:?}", shard, err); + error!("Shard {} down or misconfigured: {:?}", shard, err); continue; } }; @@ -166,8 +167,8 @@ impl ConnectionPool { }; if !exists { - log::error!( - "ConnectionPool::get Requested role {:?}, but none is configured.", + error!( + "Requested role {:?}, but none are configured", role ); return Err(Error::BadConfig); @@ -198,7 +199,7 @@ impl ConnectionPool { let mut conn = match self.databases[shard][index].get().await { Ok(conn) => conn, Err(err) => { - println!(">> Banning replica {}, error: {:?}", index, err); + error!("Banning replica {}, error: {:?}", index, err); self.ban(address, shard); continue; } @@ -208,6 +209,8 @@ impl ConnectionPool { let server = &mut *conn; let healthcheck_timeout = get_config().general.healthcheck_timeout; + self.stats.server_tested(server.process_id()); + match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), server.query("SELECT 1"), @@ -218,13 +221,11 @@ impl ConnectionPool { Ok(res) => match res { Ok(_) => { self.stats.checkout_time(now.elapsed().as_micros()); + self.stats.server_idle(conn.process_id()); return Ok((conn, address.clone())); } Err(_) => { - println!( - ">> Banning replica {} because of failed health check", - index - ); + error!("Banning replica {} because of failed health check", index); // Don't leave a bad connection in the pool. server.mark_bad(); @@ -234,10 +235,7 @@ impl ConnectionPool { }, // Health check never came back, database is really really down Err(_) => { - println!( - ">> Banning replica {} because of health check timeout", - index - ); + error!("Banning replica {} because of health check timeout", index); // Don't leave a bad connection in the pool. server.mark_bad(); @@ -254,7 +252,7 @@ impl ConnectionPool { /// traffic for any new transactions. Existing transactions on that replica /// will finish successfully or error out to the clients. pub fn ban(&self, address: &Address, shard: usize) { - println!(">> Banning {:?}", address); + error!("Banning {:?}", address); let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.lock().unwrap(); guard[shard].insert(address.clone(), now); @@ -287,7 +285,7 @@ impl ConnectionPool { if guard[shard].len() == replicas_available { guard[shard].clear(); drop(guard); - println!(">> Unbanning all replicas."); + warn!("Unbanning all replicas."); return false; } @@ -351,7 +349,10 @@ impl ManageConnection for ServerPool { /// Attempts to create a new connection. async fn connect(&self) -> Result { - println!(">> Creating a new connection for the pool"); + info!( + "Creating a new connection to {:?} using user {:?}", + self.address, self.user.name + ); Server::startup( &self.address, diff --git a/src/query_router.rs b/src/query_router.rs index 097cdefe..64d6e704 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -8,6 +8,7 @@ use regex::RegexSet; use sqlparser::ast::Statement::{Query, StartTransaction}; use sqlparser::dialect::PostgreSqlDialect; use sqlparser::parser::Parser; +use log::{error, debug}; const CUSTOM_SQL_REGEXES: [&str; 5] = [ r"(?i)SET SHARDING KEY TO '[0-9]+'", @@ -54,7 +55,7 @@ impl QueryRouter { let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) { Ok(rgx) => rgx, Err(err) => { - log::error!("QueryRouter::setup Could not compile regex set: {:?}", err); + error!("QueryRouter::setup Could not compile regex set: {:?}", err); return false; } }; @@ -219,8 +220,8 @@ impl QueryRouter { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { Ok(ast) => ast, Err(err) => { - log::debug!( - "QueryParser::infer_role could not parse query, error: {:?}, query: {}", + debug!( + "{:?}, query: {}", err, query ); diff --git a/src/server.rs b/src/server.rs index 842b81fc..18b1f267 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,7 @@ +use bytes::{Buf, BufMut, BytesMut}; ///! Implementation of the PostgreSQL server (database) protocol. ///! Here we are pretending to the a Postgres client. -use bytes::{Buf, BufMut, BytesMut}; +use log::{error, info}; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -33,7 +34,7 @@ pub struct Server { server_info: BytesMut, // Backend id and secret key used for query cancellation. - backend_id: i32, + process_id: i32, secret_key: i32, // Is the server inside a transaction or idle. @@ -69,7 +70,7 @@ impl Server { match TcpStream::connect(&format!("{}:{}", &address.host, &address.port)).await { Ok(stream) => stream, Err(err) => { - println!(">> Could not connect to server: {}", err); + error!("Could not connect to server: {}", err); return Err(Error::SocketError); } }; @@ -78,7 +79,7 @@ impl Server { startup(&mut stream, &user.name, database).await?; let mut server_info = BytesMut::new(); - let mut backend_id: i32 = 0; + let mut process_id: i32 = 0; let mut secret_key: i32 = 0; // We'll be handling multiple packets, but they will all be structured the same. @@ -121,7 +122,7 @@ impl Server { AUTHENTICATION_SUCCESSFUL => (), _ => { - println!(">> Unsupported authentication mechanism: {}", auth_code); + error!("Unsupported authentication mechanism: {}", auth_code); return Err(Error::ServerError); } } @@ -151,7 +152,7 @@ impl Server { // TODO: the error message contains multiple fields; we can decode them and // present a prettier message to the user. // See: https://www.postgresql.org/docs/12/protocol-error-fields.html - println!(">> Server error: {}", String::from_utf8_lossy(&error)); + error!("Server error: {}", String::from_utf8_lossy(&error)); } }; @@ -179,7 +180,7 @@ impl Server { 'K' => { // The frontend must save these values if it wishes to be able to issue CancelRequest messages later. // See: https://www.postgresql.org/docs/12/protocol-message-formats.html - backend_id = match stream.read_i32().await { + process_id = match stream.read_i32().await { Ok(id) => id, Err(_) => return Err(Error::SocketError), }; @@ -209,7 +210,7 @@ impl Server { write: write, buffer: BytesMut::with_capacity(8196), server_info: server_info, - backend_id: backend_id, + process_id: process_id, secret_key: secret_key, in_transaction: false, data_available: false, @@ -223,7 +224,7 @@ impl Server { // We have an unexpected message from the server during this exchange. // Means we implemented the protocol wrong or we're not talking to a Postgres server. _ => { - println!(">> Unknown code: {}", code); + error!("Unknown code: {}", code); return Err(Error::ProtocolSyncError); } }; @@ -241,7 +242,7 @@ impl Server { let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await { Ok(stream) => stream, Err(err) => { - println!(">> Could not connect to server: {}", err); + error!("Could not connect to server: {}", err); return Err(Error::SocketError); } }; @@ -262,7 +263,7 @@ impl Server { match write_all_half(&mut self.write, messages).await { Ok(_) => Ok(()), Err(err) => { - println!(">> Terminating server because of: {:?}", err); + error!("Terminating server because of: {:?}", err); self.bad = true; Err(err) } @@ -277,7 +278,7 @@ impl Server { let mut message = match read_message(&mut self.read).await { Ok(message) => message, Err(err) => { - println!(">> Terminating server because of: {:?}", err); + error!("Terminating server because of: {:?}", err); self.bad = true; return Err(err); } @@ -396,7 +397,7 @@ impl Server { /// Indicate that this server connection cannot be re-used and must be discarded. pub fn mark_bad(&mut self) { - println!(">> Server marked bad"); + error!("Server marked bad"); self.bad = true; } @@ -406,7 +407,7 @@ impl Server { guard.insert( (process_id, secret_key), ( - self.backend_id, + self.process_id, self.secret_key, self.address.host.clone(), self.address.port.clone(), @@ -455,6 +456,10 @@ impl Server { pub fn address(&self) -> Address { self.address.clone() } + + pub fn process_id(&self) -> i32 { + self.process_id + } } impl Drop for Server { @@ -462,6 +467,8 @@ impl Drop for Server { /// the socket is in non-blocking mode, so it may not be ready /// for a write. fn drop(&mut self) { + self.stats.server_disconnecting(self.process_id()); + let mut bytes = BytesMut::with_capacity(4); bytes.put_u8(b'X'); bytes.put_i32(4); @@ -476,8 +483,8 @@ impl Drop for Server { let now = chrono::offset::Utc::now().naive_utc(); let duration = now - self.connected_at; - println!( - ">> Server connection closed, session duration: {}", + info!( + "Server connection closed, session duration: {}", crate::format_duration(&duration) ); } diff --git a/src/stats.rs b/src/stats.rs index e74c0685..beb430f1 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,3 +1,4 @@ +use log::info; use statsd::Client; /// Statistics collector and publisher. use tokio::sync::mpsc::{Receiver, Sender}; @@ -20,6 +21,11 @@ pub enum StatisticName { ClientActive, ClientIdle, ClientDisconnecting, + ServerActive, + ServerIdle, + ServerTested, + ServerLogin, + ServerDisconnecting, } #[derive(Debug)] @@ -100,7 +106,6 @@ impl Reporter { } pub fn client_active(&mut self, process_id: i32) { - let statistic = Statistic { name: StatisticName::ClientActive, value: 1, @@ -129,6 +134,56 @@ impl Reporter { let _ = self.tx.try_send(statistic); } + + pub fn server_active(&mut self, process_id: i32) { + let statistic = Statistic { + name: StatisticName::ServerActive, + value: 1, + process_id: Some(process_id), + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn server_idle(&mut self, process_id: i32) { + let statistic = Statistic { + name: StatisticName::ServerIdle, + value: 1, + process_id: Some(process_id), + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn server_login(&mut self, process_id: i32) { + let statistic = Statistic { + name: StatisticName::ServerLogin, + value: 1, + process_id: Some(process_id), + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn server_tested(&mut self, process_id: i32) { + let statistic = Statistic { + name: StatisticName::ServerTested, + value: 1, + process_id: Some(process_id), + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn server_disconnecting(&mut self, process_id: i32) { + let statistic = Statistic { + name: StatisticName::ServerDisconnecting, + value: 1, + process_id: Some(process_id), + }; + + let _ = self.tx.try_send(statistic); + } } pub struct Collector { @@ -145,6 +200,8 @@ impl Collector { } pub async fn collect(&mut self) { + info!("Statistics reporter started"); + let mut stats = HashMap::from([ ("total_query_count", 0), ("total_xact_count", 0), @@ -156,9 +213,13 @@ impl Collector { ("cl_waiting", 0), ("cl_active", 0), ("cl_idle", 0), + ("sv_idle", 0), + ("sv_active", 0), + ("sv_login", 0), + ("sv_tested", 0), ]); - let mut client_states: HashMap = HashMap::new(); + let mut client_server_states: HashMap = HashMap::new(); let mut now = Instant::now(); @@ -166,7 +227,7 @@ impl Collector { let stat = match self.rx.recv().await { Some(stat) => stat, None => { - println!(">> Statistics collector is shutting down."); + info!("Statistics collector is shutting down"); return; } }; @@ -212,20 +273,25 @@ impl Collector { } } - StatisticName::ClientActive | StatisticName::ClientWaiting | StatisticName::ClientIdle => { - client_states.insert(stat.process_id.unwrap(), stat.name); + StatisticName::ClientActive + | StatisticName::ClientWaiting + | StatisticName::ClientIdle + | StatisticName::ServerActive + | StatisticName::ServerIdle + | StatisticName::ServerTested + | StatisticName::ServerLogin => { + client_server_states.insert(stat.process_id.unwrap(), stat.name); } - StatisticName::ClientDisconnecting => { - client_states.remove(&stat.process_id.unwrap()); + StatisticName::ClientDisconnecting | StatisticName::ServerDisconnecting => { + client_server_states.remove(&stat.process_id.unwrap()); } }; - // It's been 15 seconds. If there is no traffic, it won't publish anything, // but it also doesn't matter then. if now.elapsed().as_secs() > 15 { - for (_, state) in &client_states { + for (_, state) in &client_server_states { match state { StatisticName::ClientActive => { let counter = stats.entry("cl_active").or_insert(0); @@ -242,11 +308,31 @@ impl Collector { *counter += 1; } + StatisticName::ServerIdle => { + let counter = stats.entry("sv_idle").or_insert(0); + *counter += 1; + } + + StatisticName::ServerActive => { + let counter = stats.entry("sv_active").or_insert(0); + *counter += 1; + } + + StatisticName::ServerTested => { + let counter = stats.entry("sv_tested").or_insert(0); + *counter += 1; + } + + StatisticName::ServerLogin => { + let counter = stats.entry("sv_login").or_insert(0); + *counter += 1; + } + _ => unreachable!(), }; } - println!(">> Reporting to StatsD: {:?}", stats); + info!("{:?}", stats); let mut pipeline = self.client.pipeline();