diff --git a/libsql/src/connection.rs b/libsql/src/connection.rs index 47f35f1f91..467effe010 100644 --- a/libsql/src/connection.rs +++ b/libsql/src/connection.rs @@ -16,7 +16,7 @@ pub(crate) trait Conn { async fn transaction(&self, tx_behavior: TransactionBehavior) -> Result; - async fn is_autocommit(&self) -> Result; + fn is_autocommit(&self) -> bool; fn changes(&self) -> u64; @@ -67,8 +67,8 @@ impl Connection { self.conn.transaction(tx_behavior).await } - pub async fn is_autocommit(&self) -> Result { - self.conn.is_autocommit().await + pub fn is_autocommit(&self) -> bool { + self.conn.is_autocommit() } pub fn changes(&self) -> u64 { diff --git a/libsql/src/hrana/connection.rs b/libsql/src/hrana/connection.rs index 5489271b41..187ba0882e 100644 --- a/libsql/src/hrana/connection.rs +++ b/libsql/src/hrana/connection.rs @@ -3,7 +3,7 @@ use crate::hrana::proto::{Batch, BatchCond, BatchResult, Stmt, StmtResult}; use crate::hrana::stream::HttpStream; use crate::hrana::{HranaError, HttpSend, Result, Statement}; use crate::util::coerce_url_scheme; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; #[derive(Debug)] @@ -21,6 +21,7 @@ where auth: String, affected_row_count: AtomicU64, last_insert_rowid: AtomicI64, + is_autocommit: AtomicBool, } impl HttpConnection @@ -30,13 +31,14 @@ where pub fn new(url: String, token: String, inner: T) -> Self { // The `libsql://` protocol is an alias for `https://`. let base_url = coerce_url_scheme(&url); - let url_for_queries = format!("{base_url}/v2/pipeline"); + let url_for_queries = format!("{base_url}/v3/pipeline"); HttpConnection(Arc::new(InnerClient { inner, url_for_queries, auth: format!("Bearer {token}"), affected_row_count: AtomicU64::new(0), last_insert_rowid: AtomicI64::new(0), + is_autocommit: AtomicBool::new(true), })) } @@ -60,6 +62,14 @@ where .store(value, Ordering::SeqCst) } + pub fn is_autocommit(&self) -> bool { + self.client().is_autocommit.load(Ordering::SeqCst) + } + + fn set_autocommit(&self, value: bool) { + self.client().is_autocommit.store(value, Ordering::SeqCst) + } + fn client(&self) -> &InnerClient { &self.0 } @@ -78,10 +88,11 @@ where stmts: impl IntoIterator, ) -> Result { let batch = stmts_to_batch(false, stmts); - let resp = self + let (resp, is_autocommit) = self .open_stream() .finalize(StreamRequest::Batch(BatchStreamReq { batch })) .await?; + self.set_autocommit(is_autocommit); match resp { StreamResponse::Batch(resp) => Ok(resp.result), other => Err(HranaError::UnexpectedResponse(format!( @@ -92,10 +103,11 @@ where } pub(crate) async fn execute_inner(&self, stmt: Stmt) -> Result { - let resp = self + let (resp, is_autocommit) = self .open_stream() .finalize(StreamRequest::Execute(ExecuteStreamReq { stmt })) .await?; + self.set_autocommit(is_autocommit); match resp { StreamResponse::Execute(resp) => Ok(resp.result), other => Err(HranaError::UnexpectedResponse(format!( diff --git a/libsql/src/hrana/hyper.rs b/libsql/src/hrana/hyper.rs index 3d2f643858..c30565fdb5 100644 --- a/libsql/src/hrana/hyper.rs +++ b/libsql/src/hrana/hyper.rs @@ -142,8 +142,8 @@ impl Conn for HttpConnection { }) } - async fn is_autocommit(&self) -> crate::Result { - Ok(true) // connection without transaction always commits at the end of execution step + fn is_autocommit(&self) -> bool { + self.is_autocommit() } fn changes(&self) -> u64 { @@ -238,12 +238,8 @@ impl Conn for HttpStream { todo!("sounds like nested transactions innit?") } - async fn is_autocommit(&self) -> crate::Result { - let is_autocommit = self - .get_autocommit() - .await - .map_err(|e| crate::Error::Hrana(e.into()))?; - Ok(is_autocommit) + fn is_autocommit(&self) -> bool { + false // for streams this method is callable only when we're within explicit transaction } fn changes(&self) -> u64 { diff --git a/libsql/src/hrana/stream.rs b/libsql/src/hrana/stream.rs index 0ec5bffb8c..e5e7e5372a 100644 --- a/libsql/src/hrana/stream.rs +++ b/libsql/src/hrana/stream.rs @@ -63,7 +63,7 @@ where } /// Executes a final request and immediately closes current stream - all in one request. - pub async fn finalize(&mut self, req: StreamRequest) -> Result { + pub async fn finalize(&mut self, req: StreamRequest) -> Result<(StreamResponse, bool)> { let mut client = self.inner.stream.lock().await; let resp = client.finalize(req).await?; Ok(resp) @@ -265,8 +265,8 @@ where )))?; } let mut responses = std::array::from_fn(|_| StreamResponse::Close); - for i in 0..N { - match response.results.swap_remove(0) { + for (i, result) in response.results.into_iter().enumerate() { + match result { Response::Ok(StreamResponseOk { response }) => responses[i] = response, Response::Error(e) => return Err(HranaError::StreamError(e)), } @@ -274,10 +274,21 @@ where Ok(responses) } - async fn finalize(&mut self, req: StreamRequest) -> Result { - let [resp, _] = self.send_requests([req, StreamRequest::Close]).await?; + async fn finalize(&mut self, req: StreamRequest) -> Result<(StreamResponse, bool)> { + let [resp, get_autocommit, _] = self + .send_requests([req, StreamRequest::GetAutocommit, StreamRequest::Close]) + .await?; + let is_autocommit = match get_autocommit { + StreamResponse::GetAutocommit(resp) => resp.is_autocommit, + other => { + return Err(HranaError::UnexpectedResponse(format!( + "expected GetAutocommitResp but got {:?}", + other + ))) + } + }; self.done(); - Ok(resp) + Ok((resp, is_autocommit)) } fn done(&mut self) { diff --git a/libsql/src/local/impls.rs b/libsql/src/local/impls.rs index 72b643eae8..3c0bfcce4b 100644 --- a/libsql/src/local/impls.rs +++ b/libsql/src/local/impls.rs @@ -48,8 +48,8 @@ impl Conn for LibsqlConnection { }) } - async fn is_autocommit(&self) -> Result { - Ok(self.conn.is_autocommit()) + fn is_autocommit(&self) -> bool { + self.conn.is_autocommit() } fn changes(&self) -> u64 { diff --git a/libsql/src/replication/connection.rs b/libsql/src/replication/connection.rs index cb086758b1..d2d5975acb 100644 --- a/libsql/src/replication/connection.rs +++ b/libsql/src/replication/connection.rs @@ -228,7 +228,7 @@ impl RemoteConnection { // and will return false if no rollback happened and the // execute was valid. pub(self) async fn maybe_execute_rollback(&self) -> Result { - if self.inner.lock().state != State::TxnReadOnly && !self.local.is_autocommit().await? { + if self.inner.lock().state != State::TxnReadOnly && !self.local.is_autocommit() { self.local.execute("ROLLBACK", Params::None).await?; Ok(true) } else { @@ -328,8 +328,8 @@ impl Conn for RemoteConnection { }) } - async fn is_autocommit(&self) -> Result { - Ok(self.is_state_init()) + fn is_autocommit(&self) -> bool { + self.is_state_init() } fn changes(&self) -> u64 {