From 52e152f7d5f22fbf3088e0ff177e4a20f3d7b0fc Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 25 Feb 2022 18:10:32 -0800 Subject: [PATCH 1/5] admin: show stats --- src/admin.rs | 120 +++++++++++++++++++++++++++++++++++++++++++++++ src/client.rs | 22 +++++++++ src/constants.rs | 6 +++ src/main.rs | 1 + src/messages.rs | 2 +- src/stats.rs | 35 +++++++++++++- 6 files changed, 184 insertions(+), 2 deletions(-) create mode 100644 src/admin.rs diff --git a/src/admin.rs b/src/admin.rs new file mode 100644 index 00000000..e7e25219 --- /dev/null +++ b/src/admin.rs @@ -0,0 +1,120 @@ +use tokio::net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, +}; + +use bytes::{Buf, BufMut, BytesMut}; +use log::trace; + +use std::collections::HashMap; + +use crate::constants::{OID_NUMERIC, OID_TEXT}; +use crate::errors::Error; +use crate::messages::write_all_half; +use crate::stats::get_stats; + +/// Handle admin client +pub async fn handle_admin(stream: &mut OwnedWriteHalf, mut query: BytesMut) -> Result<(), Error> { + let code = query.get_u8() as char; + + if code != 'Q' { + return Err(Error::ProtocolSyncError); + } + + let len = query.get_i32() as usize; + let query = String::from_utf8_lossy(&query[..len - 5]) + .to_string() + .to_ascii_uppercase(); + + if query.starts_with("SHOW STATS") { + trace!("SHOW STATS"); + show_stats(stream).await + } else { + Err(Error::ProtocolSyncError) + } +} + +/// SHOW STATS +pub async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> { + let columns = [ + "database", + "total_xact_count", + "total_query_count", + "total_received", + "total_sent", + "total_xact_time", + "total_query_time", + "total_wait_time", + "avg_xact_count", + "avg_query_count", + "avg_recv", + "avg_sent", + "avg_xact_time", + "avg_query_time", + "avg_wait_time", + ]; + + let stats = get_stats().unwrap_or(HashMap::new()); + let mut res = BytesMut::new(); + let mut row_desc = BytesMut::new(); + let mut data_row = BytesMut::new(); + + // Number of columns: 1 + row_desc.put_i16(columns.len() as i16); + data_row.put_i16(columns.len() as i16); + + for (i, column) in columns.iter().enumerate() { + // RowDescription + + // Column name + row_desc.put_slice(&format!("{}\0", column).as_bytes()); + + // Doesn't belong to any table + row_desc.put_i32(0); + + // Doesn't belong to any table + row_desc.put_i16(0); + + // Data type + row_desc.put_i32(if i == 0 { OID_TEXT } else { OID_NUMERIC }); + + // Numeric/text size = variable (-1) + row_desc.put_i16(-1); + + // Type modifier: none that I know + row_desc.put_i32(0); //TODO maybe -1? + + // Format being used: text (0), binary (1) + row_desc.put_i16(0); + + // DataRow + let value = if i == 0 { + String::from("all shards") + } else { + stats.get(&column.to_string()).unwrap_or(&0).to_string() + }; + + data_row.put_i32(value.len() as i32); + data_row.put_slice(value.as_bytes()); + } + + let command_complete = BytesMut::from(&"SHOW\0"[..]); + + res.put_u8(b'T'); + res.put_i32(row_desc.len() as i32 + 4); + res.put(row_desc); + + res.put_u8(b'D'); + res.put_i32(data_row.len() as i32 + 4); + res.put(data_row); + + res.put_u8(b'C'); + res.put_i32(command_complete.len() as i32 + 4); + res.put(command_complete); + + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} diff --git a/src/client.rs b/src/client.rs index b0ca7d1d..fb1a4b0e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,6 +11,7 @@ use tokio::net::{ use std::collections::HashMap; +use crate::admin::handle_admin; use crate::config::get_config; use crate::constants::*; use crate::errors::Error; @@ -54,6 +55,9 @@ pub struct Client { // Statistics stats: Reporter, + + // Clients want to talk to admin + admin: bool, } impl Client { @@ -118,6 +122,15 @@ impl Client { ready_for_query(&mut stream).await?; trace!("Startup OK"); + let database = parameters + .get("database") + .unwrap_or(parameters.get("user").unwrap()); + let admin = ["pgcat", "pgbouncer"] + .iter() + .filter(|db| *db == &database) + .count() + == 1; + // Split the read and write streams // so we can control buffering. let (read, write) = stream.into_split(); @@ -133,6 +146,7 @@ impl Client { client_server_map: client_server_map, parameters: parameters, stats: stats, + admin: admin, }); } @@ -154,6 +168,7 @@ impl Client { client_server_map: client_server_map, parameters: HashMap::new(), stats: stats, + admin: false, }); } @@ -220,6 +235,13 @@ impl Client { return Ok(()); } + // Handle admin database real quick + if self.admin { + trace!("Handling admin command"); + handle_admin(&mut self.write, message).await?; + continue; + } + // Handle all custom protocol commands here. match query_router.try_execute_command(message.clone()) { // Normal query diff --git a/src/constants.rs b/src/constants.rs index 3bdb47d8..25609759 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -20,3 +20,9 @@ pub const AUTHENTICATION_SUCCESSFUL: i32 = 0; // ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows. pub const MESSAGE_TERMINATOR: u8 = 0; + +// +// Data types +// +pub const OID_NUMERIC: i32 = 1700; +pub const OID_TEXT: i32 = 25; diff --git a/src/main.rs b/src/main.rs index 2cf67737..a41d5ecc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,6 +47,7 @@ use tokio::{ use std::collections::HashMap; use std::sync::Arc; +mod admin; mod client; mod config; mod constants; diff --git a/src/messages.rs b/src/messages.rs index e48af531..d8bd6992 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -274,7 +274,7 @@ pub async fn show_response( row_desc.put_i16(-1); // Type modifier: none that I know - row_desc.put_i32(0); + row_desc.put_i32(0); //TODO maybe -1? // Format being used: text (0), binary (1) row_desc.put_i16(0); diff --git a/src/stats.rs b/src/stats.rs index 384aa1a2..40a8ab75 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,12 +1,16 @@ -use log::info; +use log::{error, info}; +use once_cell::sync::OnceCell; use statsd::Client; /// Events collector and publisher. use tokio::sync::mpsc::{Receiver, Sender}; use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use crate::config::get_config; +static LATEST_STATS: OnceCell>>> = OnceCell::new(); + #[derive(Debug, Clone, Copy)] enum EventName { CheckoutTime, @@ -212,6 +216,13 @@ impl Collector { pub async fn collect(&mut self) { info!("Events reporter started"); + match LATEST_STATS.set(Arc::new(Mutex::new(HashMap::new()))) { + Ok(_) => (), + Err(_) => { + error!("Latest stats will not be available"); + } + }; + let mut stats = HashMap::from([ ("total_query_count", 0), ("total_xact_count", 0), @@ -352,6 +363,17 @@ impl Collector { info!("{:?}", stats); + match LATEST_STATS.get() { + Some(arc) => { + let mut guard = arc.lock().unwrap(); + for (key, value) in &stats { + guard.insert(key.to_string(), value.clone()); + } + } + + None => (), + }; + let mut pipeline = self.client.pipeline(); for (key, value) in stats.iter_mut() { @@ -365,3 +387,14 @@ impl Collector { } } } + +pub fn get_stats() -> Option> { + match LATEST_STATS.get() { + Some(arc) => { + let guard = arc.lock().unwrap(); + Some(guard.clone()) + } + + None => None, + } +} From fc62eb5d9c6a6368c75b77c96b11c0b0b9246a5d Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 25 Feb 2022 18:13:14 -0800 Subject: [PATCH 2/5] warning --- src/admin.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index e7e25219..3d299495 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,8 +1,4 @@ -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; - +use tokio::net::tcp::OwnedWriteHalf; use bytes::{Buf, BufMut, BytesMut}; use log::trace; From f57e6bc86019784d6c70c2f281f08d2df7359ebc Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 25 Feb 2022 18:15:16 -0800 Subject: [PATCH 3/5] tests --- .circleci/run_tests.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 73a237be..93827f9a 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -57,6 +57,10 @@ cd tests/ruby && \ ruby tests.rb && \ cd ../.. +# Admin tests +psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null +(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null) + # Start PgCat in debug to demonstrate failover better start_pgcat "debug" From d9f109b59af58e8fa14709362f6acf20ad845458 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 25 Feb 2022 18:16:22 -0800 Subject: [PATCH 4/5] lint --- src/admin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/admin.rs b/src/admin.rs index 3d299495..c2881be3 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,6 +1,6 @@ -use tokio::net::tcp::OwnedWriteHalf; use bytes::{Buf, BufMut, BytesMut}; use log::trace; +use tokio::net::tcp::OwnedWriteHalf; use std::collections::HashMap; From 651be057636c7e26efc9731320fde7bd1ae54dd9 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 25 Feb 2022 18:18:53 -0800 Subject: [PATCH 5/5] type mod --- src/admin.rs | 2 +- src/messages.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index c2881be3..5108346c 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -78,7 +78,7 @@ pub async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> { row_desc.put_i16(-1); // Type modifier: none that I know - row_desc.put_i32(0); //TODO maybe -1? + row_desc.put_i32(-1); // Format being used: text (0), binary (1) row_desc.put_i16(0); diff --git a/src/messages.rs b/src/messages.rs index d8bd6992..e24ad9d7 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -274,7 +274,7 @@ pub async fn show_response( row_desc.put_i16(-1); // Type modifier: none that I know - row_desc.put_i32(0); //TODO maybe -1? + row_desc.put_i32(-1); // Format being used: text (0), binary (1) row_desc.put_i16(0);