Skip to content

Constants and comments #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
set -e
set -o xtrace

psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql

./target/debug/pgcat &

sleep 1

psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql

# Setup PgBench
pgbench -i -h 127.0.0.1 -p 6432

Expand All @@ -18,6 +18,13 @@ pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple
# Extended protocol
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended

# COPY TO STDOUT test
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null

# Query cancellation test
(psql -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(5)' || true) &
killall psql -s SIGINT

# Sharding insert
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql

Expand All @@ -29,3 +36,6 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replic

# Attempt clean shut down
killall pgcat -s SIGINT

# Allow for graceful shutdown
sleep 1
92 changes: 71 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use bytes::{Buf, BufMut, BytesMut};
use once_cell::sync::OnceCell;
use regex::Regex;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};

use std::collections::HashMap;

use crate::config::Role;
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::pool::{ClientServerMap, ConnectionPool};
Expand Down Expand Up @@ -97,15 +100,15 @@ impl Client {

match code {
// Client wants SSL. We don't support it at the moment.
80877103 => {
SSL_REQUEST_CODE => {
let mut no = BytesMut::with_capacity(1);
no.put_u8(b'N');

write_all(&mut stream, no).await?;
}

// Regular startup message.
196608 => {
PROTOCOL_VERSION_NUMBER => {
// TODO: perform actual auth.
let parameters = parse_startup(bytes.clone())?;

Expand Down Expand Up @@ -138,7 +141,7 @@ impl Client {
}

// Query cancel request.
80877102 => {
CANCEL_REQUEST_CODE => {
let (read, write) = stream.into_split();

let process_id = bytes.get_i32();
Expand Down Expand Up @@ -168,23 +171,31 @@ impl Client {

/// Client loop. We handle all messages between the client and the database here.
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
// Special: cancelling existing running query
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
let (process_id, secret_key, address, port) = {
let guard = self.client_server_map.lock().unwrap();

match guard.get(&(self.process_id, self.secret_key)) {
// Drop the mutex as soon as possible.
// We found the server the client is using for its query
// that it wants to cancel.
Some((process_id, secret_key, address, port)) => (
process_id.clone(),
secret_key.clone(),
address.clone(),
port.clone(),
),

// The client doesn't know / got the wrong server,
// we're closing the connection for security reasons.
None => return Ok(()),
}
};

// TODO: pass actual server host and port somewhere.
// Opens a new separate connection to the server, sends the backend_id
// and secret_key and then closes it for security reasons. No other interactions
// take place.
return Ok(Server::cancel(&address, &port, process_id, secret_key).await?);
}

Expand Down Expand Up @@ -217,7 +228,7 @@ impl Client {
};

// Parse for special server role selection command.
//
// SET SERVER ROLE TO '(primary|replica)';
match self.select_role(message.clone()) {
Some(r) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
Expand All @@ -236,15 +247,17 @@ impl Client {
}
};

let mut proxy = connection.0;
let mut reference = connection.0;
let _address = connection.1;
let server = &mut *proxy;
let server = &mut *reference;

// Claim this server as mine for query cancellation.
server.claim(self.process_id, self.secret_key);

// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode.
loop {
// No messages in the buffer, read one.
let mut message = if message.len() == 0 {
match read_message(&mut self.read).await {
Ok(message) => message,
Expand All @@ -268,19 +281,26 @@ impl Client {
msg
};

let original = message.clone(); // To be forwarded to the server
// The message will be forwarded to the server intact. We still would like to
// parse it below to figure out what to do with it.
let original = message.clone();

let code = message.get_u8() as char;
let _len = message.get_i32() as usize;

match code {
// ReadyForQuery
'Q' => {
// TODO: implement retries here for read-only transactions.
server.send(original).await?;

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
// TODO: implement retries here for read-only transactions.
let response = server.recv().await?;

// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
Expand All @@ -294,15 +314,18 @@ impl Client {
}
}

// Send statistic
// Report query executed statistics.
self.stats.query();

// Transaction over
// The transaction is over, we can release the connection back to the pool.
if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction();

// Release server
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
// Report this client as idle.
self.stats.client_idle();

shard = None;
Expand All @@ -313,6 +336,7 @@ impl Client {
}
}

// Terminate
'X' => {
// Client closing. Rollback and clean up
// connection before releasing into the pool.
Expand All @@ -326,35 +350,46 @@ impl Client {
return Ok(());
}

// Parse
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
'P' => {
// Extended protocol, let's buffer most of it
self.buffer.put(&original[..]);
}

// Bind
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
'B' => {
self.buffer.put(&original[..]);
}

// Describe
// Command a client can issue to describe a previously prepared named statement.
'D' => {
self.buffer.put(&original[..]);
}

// Execute
// Execute a prepared statement prepared in `P` and bound in `B`.
'E' => {
self.buffer.put(&original[..]);
}

// Sync
// Frontend (client) is asking for the query result now.
'S' => {
// Extended protocol, client requests sync
self.buffer.put(&original[..]);

// TODO: retries for read-only transactions
// TODO: retries for read-only transactions.
server.send(self.buffer.clone()).await?;

self.buffer.clear();

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
// TODO: retries for read-only transactions
let response = server.recv().await?;

match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
Expand All @@ -368,9 +403,11 @@ impl Client {
}
}

// Report query executed statistics.
self.stats.query();

// Release server
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() {
self.stats.transaction();

Expand All @@ -392,10 +429,13 @@ impl Client {
server.send(original).await?;
}

// CopyDone or CopyFail
// Copy is done, successfully or not.
'c' | 'f' => {
// Copy is done.
server.send(original).await?;

let response = server.recv().await?;

match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
Expand All @@ -404,24 +444,29 @@ impl Client {
}
};

// Release the server
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() {
self.stats.transaction();

if self.transaction_mode {
shard = None;
role = self.default_server_role;

break;
}
}
}

// Some unexpected message. We either did not implement the protocol correctly
// or this is not a Postgres client we're talking to.
_ => {
println!(">>> Unexpected code: {}", code);
}
}
}

// The server is no longer bound to us, we can't cancel it's queries anymore.
self.release();
}
}
Expand Down Expand Up @@ -450,18 +495,21 @@ impl Client {

let len = buf.get_i32();
let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase(); // Don't read the ternminating null

let rgx = match SHARDING_REGEX_RE.get() {
Some(r) => r,
None => return None,
};

if rgx.is_match(&query) {
let shard = query.split("'").collect::<Vec<&str>>()[1];

match shard.parse::<i64>() {
Ok(shard) => {
let sharder = Sharder::new(shards);
Some(sharder.pg_bigint_hash(shard))
}

Err(_) => None,
}
} else {
Expand All @@ -481,6 +529,7 @@ impl Client {

let len = buf.get_i32();
let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase();

let rgx = match ROLE_REGEX_RE.get() {
Some(r) => r,
None => return None,
Expand All @@ -490,6 +539,7 @@ impl Client {
// it'll be time to abstract :).
if rgx.is_match(&query) {
let role = query.split("'").collect::<Vec<&str>>()[1];

match role {
"PRIMARY" => Some(Role::Primary),
"REPLICA" => Some(Role::Replica),
Expand Down
22 changes: 22 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/// Various protocol constants, as defined in
/// https://www.postgresql.org/docs/12/protocol-message-formats.html
/// and elsewhere in the source code.
/// Also other constants we use elsewhere.

// Used in the StartupMessage to indicate regular handshake.
pub const PROTOCOL_VERSION_NUMBER: i32 = 196608;

// SSLRequest: used to indicate we want an SSL connection.
pub const SSL_REQUEST_CODE: i32 = 80877103;

// CancelRequest: the cancel request code.
pub const CANCEL_REQUEST_CODE: i32 = 80877102;

// AuthenticationMD5Password
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;

// AuthenticationOk
pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;

// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
pub const MESSAGE_TERMINATOR: u8 = 0;
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use tokio::sync::mpsc;

mod client;
mod config;
mod constants;
mod errors;
mod messages;
mod pool;
Expand Down
Loading