Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
key: cargo-lock-2-{{ checksum "Cargo.lock" }}
- run:
name: "Install dependencies"
command: "sudo apt-get update && sudo apt-get install -y postgresql-contrib-12 postgresql-client-12"
command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12"
- run:
name: "Build"
command: "cargo build"
Expand All @@ -47,4 +47,4 @@ jobs:
workflows:
build:
jobs:
- build
- build
3 changes: 3 additions & 0 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /

# Replica/primary selection & more sharding tests
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null

# Attempt clean shut down
killall pgcat -s SIGINT
114 changes: 63 additions & 51 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern crate toml;

use regex::Regex;
use tokio::net::TcpListener;
use tokio::signal;

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -108,61 +109,72 @@ async fn main() {

println!("> Waiting for clients...");

loop {
let pool = pool.clone();
let client_server_map = client_server_map.clone();
let server_info = server_info.clone();

let (socket, addr) = match listener.accept().await {
Ok((socket, addr)) => (socket, addr),
Err(err) => {
println!("> Listener: {:?}", err);
continue;
}
};

// Client goes to another thread, bye.
tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();

println!(">> Client {:?} connected", addr);

match client::Client::startup(
socket,
client_server_map,
transaction_mode,
default_server_role,
server_info,
)
.await
{
Ok(mut client) => {
println!(">> Client {:?} authenticated successfully!", addr);

match client.handle(pool).await {
Ok(()) => {
let duration = chrono::offset::Utc::now().naive_utc() - start;

println!(
">> Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
}

Err(err) => {
println!(">> Client disconnected with error: {:?}", err);
client.release();
}
}
}
// Main app runs here.
tokio::task::spawn(async move {
loop {
let pool = pool.clone();
let client_server_map = client_server_map.clone();
let server_info = server_info.clone();

let (socket, addr) = match listener.accept().await {
Ok((socket, addr)) => (socket, addr),
Err(err) => {
println!(">> Error: {:?}", err);
println!("> Listener: {:?}", err);
continue;
}
};
});
}

// Client goes to another thread, bye.
tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();

println!(">> Client {:?} connected", addr);

match client::Client::startup(
socket,
client_server_map,
transaction_mode,
default_server_role,
server_info,
)
.await
{
Ok(mut client) => {
println!(">> Client {:?} authenticated successfully!", addr);

match client.handle(pool).await {
Ok(()) => {
let duration = chrono::offset::Utc::now().naive_utc() - start;

println!(
">> Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
}

Err(err) => {
println!(">> Client disconnected with error: {:?}", err);
client.release();
}
}
}

Err(err) => {
println!(">> Error: {:?}", err);
}
};
});
}
});

// Setup shut down sequence
match signal::ctrl_c().await {
Ok(()) => {}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
}
};
}

/// Format chrono::Duration to be more human-friendly.
Expand Down
29 changes: 29 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ pub struct Server {
// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap,

// Server role, e.g. primary or replica.
role: Role,

// Server connected at
connected_at: chrono::naive::NaiveDateTime,
}

impl Server {
Expand Down Expand Up @@ -193,6 +197,7 @@ impl Server {
bad: false,
client_server_map: client_server_map,
role: role,
connected_at: chrono::offset::Utc::now().naive_utc(),
});
}

Expand Down Expand Up @@ -417,3 +422,27 @@ impl Server {
}
}
}

impl Drop for Server {
// Try to do a clean shut down.
fn drop(&mut self) {
let mut bytes = BytesMut::with_capacity(4);
bytes.put_u8(b'X');
bytes.put_i32(4);

match self.write.try_write(&bytes) {
Ok(n) => (),
Err(_) => (),
};

self.bad = true;

let now = chrono::offset::Utc::now().naive_utc();
let duration = now - self.connected_at;

println!(
">> Server connection closed, session duration: {}",
crate::format_duration(&duration)
);
}
}