Skip to content

admin: SHOW STATS #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ cd tests/ruby && \
ruby tests.rb && \
cd ../..

# Admin tests
psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)

# Start PgCat in debug to demonstrate failover better
start_pgcat "debug"

Expand Down
116 changes: 116 additions & 0 deletions src/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use bytes::{Buf, BufMut, BytesMut};
use log::trace;
use tokio::net::tcp::OwnedWriteHalf;

use std::collections::HashMap;

use crate::constants::{OID_NUMERIC, OID_TEXT};
use crate::errors::Error;
use crate::messages::write_all_half;
use crate::stats::get_stats;

/// Handle admin client
pub async fn handle_admin(stream: &mut OwnedWriteHalf, mut query: BytesMut) -> Result<(), Error> {
let code = query.get_u8() as char;

if code != 'Q' {
return Err(Error::ProtocolSyncError);
}

let len = query.get_i32() as usize;
let query = String::from_utf8_lossy(&query[..len - 5])
.to_string()
.to_ascii_uppercase();

if query.starts_with("SHOW STATS") {
trace!("SHOW STATS");
show_stats(stream).await
} else {
Err(Error::ProtocolSyncError)
}
}

/// SHOW STATS
pub async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let columns = [
"database",
"total_xact_count",
"total_query_count",
"total_received",
"total_sent",
"total_xact_time",
"total_query_time",
"total_wait_time",
"avg_xact_count",
"avg_query_count",
"avg_recv",
"avg_sent",
"avg_xact_time",
"avg_query_time",
"avg_wait_time",
];

let stats = get_stats().unwrap_or(HashMap::new());
let mut res = BytesMut::new();
let mut row_desc = BytesMut::new();
let mut data_row = BytesMut::new();

// Number of columns: 1
row_desc.put_i16(columns.len() as i16);
data_row.put_i16(columns.len() as i16);

for (i, column) in columns.iter().enumerate() {
// RowDescription

// Column name
row_desc.put_slice(&format!("{}\0", column).as_bytes());

// Doesn't belong to any table
row_desc.put_i32(0);

// Doesn't belong to any table
row_desc.put_i16(0);

// Data type
row_desc.put_i32(if i == 0 { OID_TEXT } else { OID_NUMERIC });

// Numeric/text size = variable (-1)
row_desc.put_i16(-1);

// Type modifier: none that I know
row_desc.put_i32(-1);

// Format being used: text (0), binary (1)
row_desc.put_i16(0);

// DataRow
let value = if i == 0 {
String::from("all shards")
} else {
stats.get(&column.to_string()).unwrap_or(&0).to_string()
};

data_row.put_i32(value.len() as i32);
data_row.put_slice(value.as_bytes());
}

let command_complete = BytesMut::from(&"SHOW\0"[..]);

res.put_u8(b'T');
res.put_i32(row_desc.len() as i32 + 4);
res.put(row_desc);

res.put_u8(b'D');
res.put_i32(data_row.len() as i32 + 4);
res.put(data_row);

res.put_u8(b'C');
res.put_i32(command_complete.len() as i32 + 4);
res.put(command_complete);

res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

write_all_half(stream, res).await
}
22 changes: 22 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::net::{

use std::collections::HashMap;

use crate::admin::handle_admin;
use crate::config::get_config;
use crate::constants::*;
use crate::errors::Error;
Expand Down Expand Up @@ -54,6 +55,9 @@ pub struct Client {

// Statistics
stats: Reporter,

// Clients want to talk to admin
admin: bool,
}

impl Client {
Expand Down Expand Up @@ -118,6 +122,15 @@ impl Client {
ready_for_query(&mut stream).await?;
trace!("Startup OK");

let database = parameters
.get("database")
.unwrap_or(parameters.get("user").unwrap());
let admin = ["pgcat", "pgbouncer"]
.iter()
.filter(|db| *db == &database)
.count()
== 1;

// Split the read and write streams
// so we can control buffering.
let (read, write) = stream.into_split();
Expand All @@ -133,6 +146,7 @@ impl Client {
client_server_map: client_server_map,
parameters: parameters,
stats: stats,
admin: admin,
});
}

Expand All @@ -154,6 +168,7 @@ impl Client {
client_server_map: client_server_map,
parameters: HashMap::new(),
stats: stats,
admin: false,
});
}

Expand Down Expand Up @@ -220,6 +235,13 @@ impl Client {
return Ok(());
}

// Handle admin database real quick
if self.admin {
trace!("Handling admin command");
handle_admin(&mut self.write, message).await?;
continue;
}

// Handle all custom protocol commands here.
match query_router.try_execute_command(message.clone()) {
// Normal query
Expand Down
6 changes: 6 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;

// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
pub const MESSAGE_TERMINATOR: u8 = 0;

//
// Data types
//
pub const OID_NUMERIC: i32 = 1700;
pub const OID_TEXT: i32 = 25;
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use tokio::{
use std::collections::HashMap;
use std::sync::Arc;

mod admin;
mod client;
mod config;
mod constants;
Expand Down
2 changes: 1 addition & 1 deletion src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ pub async fn show_response(
row_desc.put_i16(-1);

// Type modifier: none that I know
row_desc.put_i32(0);
row_desc.put_i32(-1);

// Format being used: text (0), binary (1)
row_desc.put_i16(0);
Expand Down
35 changes: 34 additions & 1 deletion src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use log::info;
use log::{error, info};
use once_cell::sync::OnceCell;
use statsd::Client;
/// Events collector and publisher.
use tokio::sync::mpsc::{Receiver, Sender};

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use crate::config::get_config;

static LATEST_STATS: OnceCell<Arc<Mutex<HashMap<String, i64>>>> = OnceCell::new();

#[derive(Debug, Clone, Copy)]
enum EventName {
CheckoutTime,
Expand Down Expand Up @@ -212,6 +216,13 @@ impl Collector {
pub async fn collect(&mut self) {
info!("Events reporter started");

match LATEST_STATS.set(Arc::new(Mutex::new(HashMap::new()))) {
Ok(_) => (),
Err(_) => {
error!("Latest stats will not be available");
}
};

let mut stats = HashMap::from([
("total_query_count", 0),
("total_xact_count", 0),
Expand Down Expand Up @@ -352,6 +363,17 @@ impl Collector {

info!("{:?}", stats);

match LATEST_STATS.get() {
Some(arc) => {
let mut guard = arc.lock().unwrap();
for (key, value) in &stats {
guard.insert(key.to_string(), value.clone());
}
}

None => (),
};

let mut pipeline = self.client.pipeline();

for (key, value) in stats.iter_mut() {
Expand All @@ -365,3 +387,14 @@ impl Collector {
}
}
}

pub fn get_stats() -> Option<HashMap<String, i64>> {
match LATEST_STATS.get() {
Some(arc) => {
let guard = arc.lock().unwrap();
Some(guard.clone())
}

None => None,
}
}