Skip to content

Commit

Permalink
Accept config for pool init
Browse files Browse the repository at this point in the history
Also clean up query writes
  • Loading branch information
ohsayan committed Nov 27, 2023
1 parent 18aad43 commit 13466cd
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ rand = "0.8.5"
r2d2 = "0.8.10"
async-trait = "0.1.74"
bb8 = "0.8.1"
itoa = "1.0.9"
11 changes: 10 additions & 1 deletion src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
macro_rules! query {
($query_str:expr) => { $crate::Query::new($query_str) };
($query_str:expr$(, $($query_param:expr),* $(,)?)?) => {{
let mut q = $crate::Query::new($query_str); $($(q.push_param($query_param);)*)*q
let mut q = $crate::Query::from($query_str); $($(q.push_param($query_param);)*)*q
}};
}

macro_rules! pushlen {
($buf:expr, $len:expr) => {{
let mut buf = ::itoa::Buffer::new();
let r = ::itoa::Buffer::format(&mut buf, $len);
$buf.extend(str::as_bytes(r));
$buf.push(b'\n');
}};
}
23 changes: 8 additions & 15 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,34 @@ use {
const QUERY_SYSCTL_STATUS: &str = "sysctl report status";

/// Returns a TCP (skyhash/TCP) connection pool using [`r2d2`]'s default settings and the given maximum pool size
pub fn get(
pool_size: u32,
username: &str,
password: &str,
) -> Result<r2d2::Pool<ConnectionMgrTcp>, r2d2::Error> {
let mgr = ConnectionMgrTcp::new(Config::new_default(username, password));
pub fn get(pool_size: u32, config: Config) -> Result<r2d2::Pool<ConnectionMgrTcp>, r2d2::Error> {
let mgr = ConnectionMgrTcp::new(config);
r2d2::Pool::builder().max_size(pool_size).build(mgr)
}
/// Returns an async TCP (skyhash/TCP) connection pool using [`bb8`]'s default settings and the given maximum pool size
pub async fn get_async(
pool_size: u32,
username: &str,
password: &str,
config: Config,
) -> Result<bb8::Pool<ConnectionMgrTcp>, Error> {
let mgr = ConnectionMgrTcp::new(Config::new_default(username, password));
let mgr = ConnectionMgrTcp::new(config);
bb8::Pool::builder().max_size(pool_size).build(mgr).await
}
/// Returns a TLS (skyhash/TLS) connection pool using [`r2d2`]'s default settings and the given maximum pool size
pub fn get_tls(
pool_size: u32,
username: &str,
password: &str,
config: Config,
pem_cert: &str,
) -> Result<r2d2::Pool<ConnectionMgrTls>, r2d2::Error> {
let mgr = ConnectionMgrTls::new(Config::new_default(username, password), pem_cert.into());
let mgr = ConnectionMgrTls::new(config, pem_cert.into());
r2d2::Pool::builder().max_size(pool_size).build(mgr)
}
/// Returns an async TLS (skyhash/TCP) connection pool using [`bb8`]'s default settings and the given maximum pool size
pub async fn get_tls_async(
pool_size: u32,
username: &str,
password: &str,
config: Config,
pem_cert: &str,
) -> Result<bb8::Pool<ConnectionMgrTls>, Error> {
let mgr = ConnectionMgrTls::new(Config::new_default(username, password), pem_cert.into());
let mgr = ConnectionMgrTls::new(config, pem_cert.into());
bb8::Pool::builder().max_size(pool_size).build(mgr).await
}

Expand Down
6 changes: 2 additions & 4 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,8 @@ impl ClientHandshake {
pub(crate) fn new(cfg: &Config) -> Self {
let mut v = Vec::with_capacity(6 + cfg.username().len() + cfg.password().len() + 5);
v.extend(b"H\x00\x00\x00\x00\x00");
v.extend(cfg.username().len().to_string().as_bytes());
v.push(b'\n');
v.extend(cfg.password().len().to_string().as_bytes());
v.push(b'\n');
pushlen!(v, cfg.username().len());
pushlen!(v, cfg.password().len());
v.extend(cfg.username().as_bytes());
v.extend(cfg.password().as_bytes());
Self(v.into_boxed_slice())
Expand Down
93 changes: 67 additions & 26 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,28 @@ pub struct Query {
param_cnt: usize,
}

impl From<String> for Query {
fn from(q: String) -> Self {
Self::new_string(q)
}
}

impl<'a> From<&'a str> for Query {
fn from(q: &'a str) -> Self {
Self::new(q)
}
}

impl Query {
pub fn new(query: &str) -> Self {
Self::_new(query.to_owned())
}
pub fn new_string(query: String) -> Self {
Self::_new(query)
}
fn _new(query: String) -> Self {
Self {
dataframe_q: query.as_bytes().to_owned().into_boxed_slice(),
dataframe_q: query.into_bytes().into_boxed_slice(),
dataframe_p: vec![],
param_cnt: 0,
}
Expand All @@ -48,7 +66,7 @@ impl Query {
unsafe { core::str::from_utf8_unchecked(&self.dataframe_q) }
}
pub fn push_param(&mut self, param: impl SQParam) -> &mut Self {
param.push(&mut self.dataframe_p);
param.append_param(&mut self.dataframe_p);
self.param_cnt += 1;
self
}
Expand All @@ -62,12 +80,17 @@ impl Query {
^meta1 ^meta2 ^payload
*/
// compute the total packet size
let query_window_str = self.dataframe_q.len().to_string();
// q window
let mut query_window_buffer = itoa::Buffer::new();
let query_window_str = query_window_buffer.format(self.dataframe_q.len());
// full packet
let total_packet_size =
query_window_str.len() + 1 + self.dataframe_q.len() + self.dataframe_p.len();
let mut total_packet_size_buffer = itoa::Buffer::new();
let total_packet_size_str = total_packet_size_buffer.format(total_packet_size);
// segment 1: meta
buf.write_all(b"S")?;
buf.write_all(&total_packet_size.to_string().as_bytes())?;
buf.write_all(total_packet_size_str.as_bytes())?;
buf.write_all(b"\n")?;
// segment 2: variable meta
buf.write_all(query_window_str.as_bytes())?;
Expand All @@ -92,74 +115,92 @@ impl Query {
/// An [`SQParam`] should be implemented by any type that is expected to be used as a parameter
pub trait SQParam {
/// Append this element to the raw parameter buffer
fn push(self, buf: &mut Vec<u8>);
fn append_param(self, buf: &mut Vec<u8>);
}
// null
impl<T> SQParam for Option<T>
where
T: SQParam,
{
fn push(self, buf: &mut Vec<u8>) {
fn append_param(self, buf: &mut Vec<u8>) {
match self {
None => buf.push(0),
Some(e) => e.push(buf),
Some(e) => e.append_param(buf),
}
}
}
// bool
impl SQParam for bool {
fn push(self, buf: &mut Vec<u8>) {
fn append_param(self, buf: &mut Vec<u8>) {
let a = [1, self as u8];
buf.extend(a)
}
}
macro_rules! imp_number {
($($code:literal => $($ty:ty as $base:ty),*),* $(,)?) => {
$($(impl SQParam for $ty { fn append_param(self, b: &mut Vec<u8>) {
let mut buf = ::itoa::Buffer::new();
let str = buf.format(<$base>::from(self));
b.push($code); b.extend(str.as_bytes()); b.push(b'\n');
} })*)*
}
}

macro_rules! imp_terminated_str_type {
($($code:literal => $($ty:ty),*),* $(,)?) => {
$($(impl SQParam for $ty { fn push(self, buf: &mut Vec<u8>) { buf.push($code); buf.extend(self.to_string().as_bytes()); buf.push(b'\n'); } })*)*
$($(impl SQParam for $ty { fn append_param(self, buf: &mut Vec<u8>) { buf.push($code); buf.extend(self.to_string().as_bytes()); buf.push(b'\n'); } })*)*
}
}

// uint, sint, float
imp_number!(
2 => u8, NonZeroU8, u16, NonZeroU16, u32, NonZeroU32, u64, NonZeroU64, usize, NonZeroUsize,
3 => i8, NonZeroI8, i16, NonZeroI16, i32, NonZeroI32, i64, NonZeroI64, isize, NonZeroIsize,
4 => f32, f64,
2 => u8 as u8, NonZeroU8 as u8, u16 as u16, NonZeroU16 as u16, u32 as u32, NonZeroU32 as u32, u64 as u64, NonZeroU64 as u64, usize as usize, NonZeroUsize as usize,
3 => i8 as i8, NonZeroI8 as i8, i16 as i16, NonZeroI16 as i16, i32 as i32, NonZeroI32 as i32, i64 as i64, NonZeroI64 as i64, isize as isize, NonZeroIsize as isize,
);

imp_terminated_str_type!(
4 => f32, f64
);

// bin
impl<'a> SQParam for &'a [u8] {
fn push(self, buf: &mut Vec<u8>) {
fn append_param(self, buf: &mut Vec<u8>) {
buf.push(5);
buf.extend(self.len().to_string().into_bytes());
buf.push(b'\n');
pushlen!(buf, self.len());
buf.extend(self);
}
}
impl<const N: usize> SQParam for [u8; N] {
fn push(self, buf: &mut Vec<u8>) {
fn append_param(self, buf: &mut Vec<u8>) {
buf.push(5);
buf.extend(self.len().to_string().into_bytes());
buf.push(b'\n');
pushlen!(buf, self.len());
buf.extend(self);
}
}
impl<'a, const N: usize> SQParam for &'a [u8; N] {
fn push(self, buf: &mut Vec<u8>) {
fn append_param(self, buf: &mut Vec<u8>) {
buf.push(5);
pushlen!(buf, self.len());
buf.extend(self);
}
}
impl SQParam for Vec<u8> {
fn append_param(self, buf: &mut Vec<u8>) {
buf.push(5);
buf.extend(self.len().to_string().into_bytes());
buf.push(b'\n');
pushlen!(buf, self.len());
buf.extend(self);
}
}
// str
impl<'a> SQParam for &'a str {
fn push(self, buf: &mut Vec<u8>) {
fn append_param(self, buf: &mut Vec<u8>) {
buf.push(6);
buf.extend(self.len().to_string().into_bytes());
buf.push(b'\n');
pushlen!(buf, self.len());
buf.extend(self.as_bytes());
}
}
impl SQParam for String {
fn push(self, buf: &mut Vec<u8>) {
self.as_str().push(buf)
fn append_param(self, buf: &mut Vec<u8>) {
self.as_str().append_param(buf)
}
}

0 comments on commit 13466cd

Please sign in to comment.