Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions libsql/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) trait Conn {

async fn transaction(&self, tx_behavior: TransactionBehavior) -> Result<Transaction>;

async fn is_autocommit(&self) -> Result<bool>;
fn is_autocommit(&self) -> bool;

fn changes(&self) -> u64;

Expand Down Expand Up @@ -67,8 +67,8 @@ impl Connection {
self.conn.transaction(tx_behavior).await
}

pub async fn is_autocommit(&self) -> Result<bool> {
self.conn.is_autocommit().await
pub fn is_autocommit(&self) -> bool {
self.conn.is_autocommit()
}

pub fn changes(&self) -> u64 {
Expand Down
20 changes: 16 additions & 4 deletions libsql/src/hrana/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::hrana::proto::{Batch, BatchCond, BatchResult, Stmt, StmtResult};
use crate::hrana::stream::HttpStream;
use crate::hrana::{HranaError, HttpSend, Result, Statement};
use crate::util::coerce_url_scheme;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -21,6 +21,7 @@ where
auth: String,
affected_row_count: AtomicU64,
last_insert_rowid: AtomicI64,
is_autocommit: AtomicBool,
}

impl<T> HttpConnection<T>
Expand All @@ -30,13 +31,14 @@ where
pub fn new(url: String, token: String, inner: T) -> Self {
// The `libsql://` protocol is an alias for `https://`.
let base_url = coerce_url_scheme(&url);
let url_for_queries = format!("{base_url}/v2/pipeline");
let url_for_queries = format!("{base_url}/v3/pipeline");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetAutocommitReq requires Hrana v3 to be used.

HttpConnection(Arc::new(InnerClient {
inner,
url_for_queries,
auth: format!("Bearer {token}"),
affected_row_count: AtomicU64::new(0),
last_insert_rowid: AtomicI64::new(0),
is_autocommit: AtomicBool::new(true),
}))
}

Expand All @@ -60,6 +62,14 @@ where
.store(value, Ordering::SeqCst)
}

pub fn is_autocommit(&self) -> bool {
self.client().is_autocommit.load(Ordering::SeqCst)
}

fn set_autocommit(&self, value: bool) {
self.client().is_autocommit.store(value, Ordering::SeqCst)
}

fn client(&self) -> &InnerClient<T> {
&self.0
}
Expand All @@ -78,10 +88,11 @@ where
stmts: impl IntoIterator<Item = Stmt>,
) -> Result<BatchResult> {
let batch = stmts_to_batch(false, stmts);
let resp = self
let (resp, is_autocommit) = self
.open_stream()
.finalize(StreamRequest::Batch(BatchStreamReq { batch }))
.await?;
self.set_autocommit(is_autocommit);
match resp {
StreamResponse::Batch(resp) => Ok(resp.result),
other => Err(HranaError::UnexpectedResponse(format!(
Expand All @@ -92,10 +103,11 @@ where
}

pub(crate) async fn execute_inner(&self, stmt: Stmt) -> Result<StmtResult> {
let resp = self
let (resp, is_autocommit) = self
.open_stream()
.finalize(StreamRequest::Execute(ExecuteStreamReq { stmt }))
.await?;
self.set_autocommit(is_autocommit);
match resp {
StreamResponse::Execute(resp) => Ok(resp.result),
other => Err(HranaError::UnexpectedResponse(format!(
Expand Down
12 changes: 4 additions & 8 deletions libsql/src/hrana/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ impl Conn for HttpConnection<HttpSender> {
})
}

async fn is_autocommit(&self) -> crate::Result<bool> {
Ok(true) // connection without transaction always commits at the end of execution step
fn is_autocommit(&self) -> bool {
self.is_autocommit()
}

fn changes(&self) -> u64 {
Expand Down Expand Up @@ -238,12 +238,8 @@ impl Conn for HttpStream<HttpSender> {
todo!("sounds like nested transactions innit?")
}

async fn is_autocommit(&self) -> crate::Result<bool> {
let is_autocommit = self
.get_autocommit()
.await
.map_err(|e| crate::Error::Hrana(e.into()))?;
Ok(is_autocommit)
fn is_autocommit(&self) -> bool {
false // for streams this method is callable only when we're within explicit transaction
}

fn changes(&self) -> u64 {
Expand Down
23 changes: 17 additions & 6 deletions libsql/src/hrana/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where
}

/// Executes a final request and immediately closes current stream - all in one request.
pub async fn finalize(&mut self, req: StreamRequest) -> Result<StreamResponse> {
pub async fn finalize(&mut self, req: StreamRequest) -> Result<(StreamResponse, bool)> {
let mut client = self.inner.stream.lock().await;
let resp = client.finalize(req).await?;
Ok(resp)
Expand Down Expand Up @@ -265,19 +265,30 @@ where
)))?;
}
let mut responses = std::array::from_fn(|_| StreamResponse::Close);
for i in 0..N {
match response.results.swap_remove(0) {
for (i, result) in response.results.into_iter().enumerate() {
match result {
Response::Ok(StreamResponseOk { response }) => responses[i] = response,
Response::Error(e) => return Err(HranaError::StreamError(e)),
}
}
Ok(responses)
}

async fn finalize(&mut self, req: StreamRequest) -> Result<StreamResponse> {
let [resp, _] = self.send_requests([req, StreamRequest::Close]).await?;
async fn finalize(&mut self, req: StreamRequest) -> Result<(StreamResponse, bool)> {
let [resp, get_autocommit, _] = self
.send_requests([req, StreamRequest::GetAutocommit, StreamRequest::Close])
.await?;
let is_autocommit = match get_autocommit {
StreamResponse::GetAutocommit(resp) => resp.is_autocommit,
other => {
return Err(HranaError::UnexpectedResponse(format!(
"expected GetAutocommitResp but got {:?}",
other
)))
}
};
self.done();
Ok(resp)
Ok((resp, is_autocommit))
}

fn done(&mut self) {
Expand Down
4 changes: 2 additions & 2 deletions libsql/src/local/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ impl Conn for LibsqlConnection {
})
}

async fn is_autocommit(&self) -> Result<bool> {
Ok(self.conn.is_autocommit())
fn is_autocommit(&self) -> bool {
self.conn.is_autocommit()
}

fn changes(&self) -> u64 {
Expand Down
6 changes: 3 additions & 3 deletions libsql/src/replication/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl RemoteConnection {
// and will return false if no rollback happened and the
// execute was valid.
pub(self) async fn maybe_execute_rollback(&self) -> Result<bool> {
if self.inner.lock().state != State::TxnReadOnly && !self.local.is_autocommit().await? {
if self.inner.lock().state != State::TxnReadOnly && !self.local.is_autocommit() {
self.local.execute("ROLLBACK", Params::None).await?;
Ok(true)
} else {
Expand Down Expand Up @@ -328,8 +328,8 @@ impl Conn for RemoteConnection {
})
}

async fn is_autocommit(&self) -> Result<bool> {
Ok(self.is_state_init())
fn is_autocommit(&self) -> bool {
self.is_state_init()
}

fn changes(&self) -> u64 {
Expand Down