Skip to content

use logger lib; minor refactor; sv_* stats #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ statsd = "0.15"
sqlparser = "0.14"
log = "0.4"
arc-swap = "1"
env_logger = "0.9"
12 changes: 8 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/// We are pretending to the server in this scenario,
/// and this module implements that.
use bytes::{Buf, BufMut, BytesMut};
use log::error;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
Expand Down Expand Up @@ -253,7 +254,7 @@ impl Client {
let connection = match pool.get(query_router.shard(), query_router.role()).await {
Ok(conn) => conn,
Err(err) => {
println!(">> Could not get connection from pool: {:?}", err);
error!("Could not get connection from pool: {:?}", err);
error_response(&mut self.write, "could not get connection from the pool")
.await?;
continue;
Expand All @@ -267,8 +268,9 @@ impl Client {
// Claim this server as mine for query cancellation.
server.claim(self.process_id, self.secret_key);

// Client active
// Client active & server active
self.stats.client_active(self.process_id);
self.stats.server_active(server.process_id());

// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
Expand Down Expand Up @@ -338,7 +340,7 @@ impl Client {
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
// Report this client as idle.
self.stats.server_idle(server.process_id());
break;
}
}
Expand Down Expand Up @@ -420,6 +422,7 @@ impl Client {
self.stats.transaction();

if self.transaction_mode {
self.stats.server_idle(server.process_id());
break;
}
}
Expand Down Expand Up @@ -453,6 +456,7 @@ impl Client {
self.stats.transaction();

if self.transaction_mode {
self.stats.server_idle(server.process_id());
break;
}
}
Expand All @@ -461,7 +465,7 @@ impl Client {
// Some unexpected message. We either did not implement the protocol correctly
// or this is not a Postgres client we're talking to.
_ => {
println!(">>> Unexpected code: {}", code);
error!("Unexpected code: {}", code);
}
}
}
Expand Down
33 changes: 17 additions & 16 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arc_swap::{ArcSwap, Guard};
use log::{error, info};
use once_cell::sync::Lazy;
use serde_derive::Deserialize;
use tokio::fs::File;
Expand Down Expand Up @@ -150,14 +151,14 @@ impl Default for Config {

impl Config {
pub fn show(&self) {
println!("> Pool size: {}", self.general.pool_size);
println!("> Pool mode: {}", self.general.pool_mode);
println!("> Ban time: {}s", self.general.ban_time);
println!(
"> Healthcheck timeout: {}ms",
info!("Pool size: {}", self.general.pool_size);
info!("Pool mode: {}", self.general.pool_mode);
info!("Ban time: {}s", self.general.ban_time);
info!(
"Healthcheck timeout: {}ms",
self.general.healthcheck_timeout
);
println!("> Connection timeout: {}ms", self.general.connect_timeout);
info!("Connection timeout: {}ms", self.general.connect_timeout);
}
}

Expand All @@ -171,23 +172,23 @@ pub async fn parse(path: &str) -> Result<(), Error> {
let mut file = match File::open(path).await {
Ok(file) => file,
Err(err) => {
println!("> Config error: {:?}", err);
error!("{:?}", err);
return Err(Error::BadConfig);
}
};

match file.read_to_string(&mut contents).await {
Ok(_) => (),
Err(err) => {
println!("> Config error: {:?}", err);
error!("{:?}", err);
return Err(Error::BadConfig);
}
};

let config: Config = match toml::from_str(&contents) {
Ok(config) => config,
Err(err) => {
println!("> Config error: {:?}", err);
error!("{:?}", err);
return Err(Error::BadConfig);
}
};
Expand All @@ -200,7 +201,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
let mut primary_count = 0;

if shard.1.servers.len() == 0 {
println!("> Shard {} has no servers configured", shard.0);
error!("Shard {} has no servers configured", shard.0);
return Err(Error::BadConfig);
}

Expand All @@ -218,8 +219,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
"primary" => (),
"replica" => (),
_ => {
println!(
"> Shard {} server role must be either 'primary' or 'replica', got: '{}'",
error!(
"Shard {} server role must be either 'primary' or 'replica', got: '{}'",
shard.0, server.2
);
return Err(Error::BadConfig);
Expand All @@ -228,12 +229,12 @@ pub async fn parse(path: &str) -> Result<(), Error> {
}

if primary_count > 1 {
println!("> Shard {} has more than on primary configured.", &shard.0);
error!("Shard {} has more than on primary configured", &shard.0);
return Err(Error::BadConfig);
}

if dup_check.len() != shard.1.servers.len() {
println!("> Shard {} contains duplicate server configs.", &shard.0);
error!("Shard {} contains duplicate server configs", &shard.0);
return Err(Error::BadConfig);
}
}
Expand All @@ -243,8 +244,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
"primary" => (),
"replica" => (),
other => {
println!(
"> Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
error!(
"Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
other
);
return Err(Error::BadConfig);
Expand Down
Loading