From 37fa57ba0b8d5cd9a9d0f4ef8b2127b0cf440fd3 Mon Sep 17 00:00:00 2001 From: ischeinkman Date: Wed, 7 May 2025 12:41:19 -0400 Subject: [PATCH 1/2] Added `Buffer::check_can_flush` --- questdb-rs/src/ingress/mod.rs | 41 +++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/questdb-rs/src/ingress/mod.rs b/questdb-rs/src/ingress/mod.rs index fab0fe35..743deeb7 100644 --- a/questdb-rs/src/ingress/mod.rs +++ b/questdb-rs/src/ingress/mod.rs @@ -1172,6 +1172,31 @@ impl Buffer { self.state.row_count += 1; Ok(()) } + + /// Checks if this buffer is ready to be flushed to a sender via one of the + /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer + /// is ready to be written to using the given [`Protocol`] and transaction + /// level, while an [`Err`] will contain a message indicating why this + /// [`Buffer`] cannot be flushed at the moment. + pub fn check_can_flush(&self, transactional: bool, proto: Protocol) -> Result<()> { + self.check_op(Op::Flush)?; + if transactional { + if proto.is_tcpx() { + return Err(error::fmt!( + InvalidApiCall, + "Transactional flushes are not supported for ILP over TCP." + )); + } + if !self.transactional() { + return Err(error::fmt!( + InvalidApiCall, + "Buffer contains lines for multiple tables. \ + Transactional flushes are only supported for buffers containing lines for a single table." + )); + }; + } + Ok(()) + } } impl Default for Buffer { @@ -2585,7 +2610,6 @@ impl Sender { "Could not flush buffer: not connected to database." )); } - buf.check_op(Op::Flush)?; if buf.len() > self.max_buf_size { return Err(error::fmt!( @@ -2602,12 +2626,7 @@ impl Sender { } match self.handler { ProtocolHandler::Socket(ref mut conn) => { - if transactional { - return Err(error::fmt!( - InvalidApiCall, - "Transactional flushes are not supported for ILP over TCP." - )); - } + buf.check_can_flush(transactional, Protocol::Tcp)?; conn.write_all(bytes).map_err(|io_err| { self.connected = false; map_io_to_socket_err("Could not flush buffer: ", io_err) @@ -2615,13 +2634,7 @@ impl Sender { } #[cfg(feature = "ilp-over-http")] ProtocolHandler::Http(ref state) => { - if transactional && !buf.transactional() { - return Err(error::fmt!( - InvalidApiCall, - "Buffer contains lines for multiple tables. \ - Transactional flushes are only supported for buffers containing lines for a single table." - )); - } + buf.check_can_flush(transactional, Protocol::Http)?; let request_min_throughput = *state.config.request_min_throughput; let extra_time = if request_min_throughput > 0 { (bytes.len() as f64) / (request_min_throughput as f64) From ca12ca8403e84cd553f522dba773062801d74954 Mon Sep 17 00:00:00 2001 From: ischeinkman Date: Wed, 7 May 2025 12:47:57 -0400 Subject: [PATCH 2/2] Remove transaction logic from the flush fn --- questdb-rs/src/ingress/mod.rs | 50 ++++++++++++++++------------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/questdb-rs/src/ingress/mod.rs b/questdb-rs/src/ingress/mod.rs index 743deeb7..f4e6a8aa 100644 --- a/questdb-rs/src/ingress/mod.rs +++ b/questdb-rs/src/ingress/mod.rs @@ -689,6 +689,15 @@ impl Buffer { } } + /// Checks if this buffer is ready to be flushed to a sender via one of the + /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer + /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a + /// message indicating why this [`Buffer`] cannot be flushed at the moment. + #[inline(always)] + pub fn check_can_flush(&self) -> Result<()> { + self.check_op(Op::Flush) + } + #[inline(always)] fn validate_max_name_len(&self, name: &str) -> Result<()> { if name.len() > self.max_name_len { @@ -1172,31 +1181,6 @@ impl Buffer { self.state.row_count += 1; Ok(()) } - - /// Checks if this buffer is ready to be flushed to a sender via one of the - /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer - /// is ready to be written to using the given [`Protocol`] and transaction - /// level, while an [`Err`] will contain a message indicating why this - /// [`Buffer`] cannot be flushed at the moment. - pub fn check_can_flush(&self, transactional: bool, proto: Protocol) -> Result<()> { - self.check_op(Op::Flush)?; - if transactional { - if proto.is_tcpx() { - return Err(error::fmt!( - InvalidApiCall, - "Transactional flushes are not supported for ILP over TCP." - )); - } - if !self.transactional() { - return Err(error::fmt!( - InvalidApiCall, - "Buffer contains lines for multiple tables. \ - Transactional flushes are only supported for buffers containing lines for a single table." - )); - }; - } - Ok(()) - } } impl Default for Buffer { @@ -2610,6 +2594,7 @@ impl Sender { "Could not flush buffer: not connected to database." )); } + buf.check_can_flush()?; if buf.len() > self.max_buf_size { return Err(error::fmt!( @@ -2626,7 +2611,12 @@ impl Sender { } match self.handler { ProtocolHandler::Socket(ref mut conn) => { - buf.check_can_flush(transactional, Protocol::Tcp)?; + if transactional { + return Err(error::fmt!( + InvalidApiCall, + "Transactional flushes are not supported for ILP over TCP." + )); + } conn.write_all(bytes).map_err(|io_err| { self.connected = false; map_io_to_socket_err("Could not flush buffer: ", io_err) @@ -2634,7 +2624,13 @@ impl Sender { } #[cfg(feature = "ilp-over-http")] ProtocolHandler::Http(ref state) => { - buf.check_can_flush(transactional, Protocol::Http)?; + if transactional && !buf.transactional() { + return Err(error::fmt!( + InvalidApiCall, + "Buffer contains lines for multiple tables. \ + Transactional flushes are only supported for buffers containing lines for a single table." + )); + } let request_min_throughput = *state.config.request_min_throughput; let extra_time = if request_min_throughput > 0 { (bytes.len() as f64) / (request_min_throughput as f64)