Skip to content

Commit

Permalink
Merge pull request #577 from tursodatabase/restore-autocommit
Browse files Browse the repository at this point in the history
use is_autocommit to retrieve transaction state
  • Loading branch information
LucioFranco committed Nov 7, 2023
2 parents 5292eff + a098789 commit c6abeef
Show file tree
Hide file tree
Showing 14 changed files with 53 additions and 49 deletions.
2 changes: 1 addition & 1 deletion libsql-replication/proto/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ message FinishRow { }
message FinishRows { }
message Finish {
optional uint64 last_frame_no = 1;
State state = 2;
bool is_autocommit = 2;
}

/// Stream execx dexcribe response messages
Expand Down
4 changes: 2 additions & 2 deletions libsql-replication/src/generated/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ pub struct FinishRows {}
pub struct Finish {
#[prost(uint64, optional, tag = "1")]
pub last_frame_no: ::core::option::Option<u64>,
#[prost(enumeration = "State", tag = "2")]
pub state: i32,
#[prost(bool, tag = "2")]
pub is_autocommit: bool,
}
/// / Stream execx dexcribe response messages
#[derive(serde::Serialize, serde::Deserialize)]
Expand Down
18 changes: 8 additions & 10 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,16 +570,14 @@ impl<W: WalHook> Connection<W> {
results.push(res);
}

let status = this
.lock()
.conn
.transaction_state(Some(DatabaseName::Main))?
.into();

builder.finish(
*this.lock().current_frame_no_receiver.borrow_and_update(),
status,
)?;
{
let mut lock = this.lock();
let is_autocommit = lock.conn.is_autocommit();
builder.finish(
*(lock.current_frame_no_receiver.borrow_and_update()),
is_autocommit,
)?;
}

Ok(builder)
}
Expand Down
8 changes: 6 additions & 2 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,12 @@ where
&builder_config,
&mut builder,
resp,
|last_frame_no, status| {
txn_status = status;
|last_frame_no, is_autocommit| {
txn_status = if is_autocommit {
TxnStatus::Init
} else {
TxnStatus::Txn
};
new_frame_no = last_frame_no;
},
)
Expand Down
3 changes: 1 addition & 2 deletions libsql-server/src/hrana/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tokio::sync::{mpsc, oneshot};
use crate::auth::Authenticated;
use crate::connection::program::Program;
use crate::connection::Connection;
use crate::query_analysis::TxnStatus;
use crate::query_result_builder::{
Column, QueryBuilderConfig, QueryResultBuilder, QueryResultBuilderError,
};
Expand Down Expand Up @@ -259,7 +258,7 @@ impl QueryResultBuilder for CursorResultBuilder {
fn finish(
&mut self,
last_frame_no: Option<FrameNo>,
_status: TxnStatus,
_is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
self.emit_entry(Ok(SizedEntry {
entry: proto::CursorEntry::ReplicationIndex {
Expand Down
5 changes: 2 additions & 3 deletions libsql-server/src/hrana/result_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use bytes::Bytes;
use rusqlite::types::ValueRef;

use crate::hrana::stmt::{proto_error_from_stmt_error, stmt_error_from_sqld_error};
use crate::query_analysis::TxnStatus;
use crate::query_result_builder::{
Column, QueryBuilderConfig, QueryResultBuilder, QueryResultBuilderError, TOTAL_RESPONSE_SIZE,
};
Expand Down Expand Up @@ -229,7 +228,7 @@ impl QueryResultBuilder for SingleStatementBuilder {
fn finish(
&mut self,
last_frame_no: Option<FrameNo>,
_state: TxnStatus,
_is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
self.last_frame_no = last_frame_no;
Ok(())
Expand Down Expand Up @@ -352,7 +351,7 @@ impl QueryResultBuilder for HranaBatchProtoBuilder {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_state: TxnStatus,
_is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions libsql-server/src/http/user/result_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use serde::{Serialize, Serializer};
use serde_json::ser::{CompactFormatter, Formatter};
use std::sync::atomic::Ordering;

use crate::query_analysis::TxnStatus;
use crate::query_result_builder::{
Column, JsonFormatter, QueryBuilderConfig, QueryResultBuilder, QueryResultBuilderError,
TOTAL_RESPONSE_SIZE,
Expand Down Expand Up @@ -297,7 +296,7 @@ impl QueryResultBuilder for JsonHttpPayloadBuilder {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_state: TxnStatus,
_is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
self.formatter.end_array(&mut self.buffer)?;

Expand Down
21 changes: 10 additions & 11 deletions libsql-server/src/query_result_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use serde::Serialize;
use serde_json::ser::Formatter;
use std::sync::atomic::AtomicUsize;

use crate::query_analysis::TxnStatus;
use crate::replication::FrameNo;

pub static TOTAL_RESPONSE_SIZE: AtomicUsize = AtomicUsize::new(0);
Expand Down Expand Up @@ -125,7 +124,7 @@ pub trait QueryResultBuilder: Send + 'static {
fn finish(
&mut self,
last_frame_no: Option<FrameNo>,
state: TxnStatus,
_is_auto_commit: bool,
) -> Result<(), QueryResultBuilderError>;
/// returns the inner ret
fn into_ret(self) -> Self::Ret;
Expand Down Expand Up @@ -320,7 +319,7 @@ impl QueryResultBuilder for StepResultsBuilder {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_state: TxnStatus,
_is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
Ok(())
}
Expand Down Expand Up @@ -385,7 +384,7 @@ impl QueryResultBuilder for IgnoreResult {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_state: TxnStatus,
_is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
Ok(())
}
Expand Down Expand Up @@ -498,9 +497,9 @@ impl<B: QueryResultBuilder> QueryResultBuilder for Take<B> {
fn finish(
&mut self,
last_frame_no: Option<FrameNo>,
state: TxnStatus,
is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
self.inner.finish(last_frame_no, state)
self.inner.finish(last_frame_no, is_autocommit)
}

fn into_ret(self) -> Self::Ret {
Expand Down Expand Up @@ -617,7 +616,7 @@ pub mod test {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_txn_status: TxnStatus,
_is_autocommitk: bool,
) -> Result<(), QueryResultBuilderError> {
Ok(())
}
Expand Down Expand Up @@ -771,7 +770,7 @@ pub mod test {
FinishRow => b.finish_row().unwrap(),
FinishRows => b.finish_rows().unwrap(),
Finish => {
b.finish(Some(0), TxnStatus::Init).unwrap();
b.finish(Some(0), true).unwrap();
break;
}
BuilderError => return b,
Expand Down Expand Up @@ -869,7 +868,7 @@ pub mod test {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_state: TxnStatus,
_is_autocommitk: bool,
) -> Result<(), QueryResultBuilderError> {
assert_eq!(self.trace[self.current], FsmState::Finish);
self.current += 1;
Expand Down Expand Up @@ -1006,7 +1005,7 @@ pub mod test {
fn finish(
&mut self,
_last_frame_no: Option<FrameNo>,
_txn_status: TxnStatus,
_is_autocommitk: bool,
) -> Result<(), QueryResultBuilderError> {
self.maybe_inject_error()?;
self.transition(Finish)
Expand Down Expand Up @@ -1055,7 +1054,7 @@ pub mod test {
builder.finish_rows().unwrap();
builder.finish_step(0, None).unwrap();

builder.finish(Some(0), TxnStatus::Init).unwrap();
builder.finish(Some(0), true).unwrap();
}

#[test]
Expand Down
11 changes: 8 additions & 3 deletions libsql-server/src/rpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::auth::{Auth, Authenticated};
use crate::connection::Connection;
use crate::database::{Database, PrimaryConnection};
use crate::namespace::{NamespaceStore, PrimaryNamespaceMaker};
use crate::query_analysis::TxnStatus;
use crate::query_result_builder::{
Column, QueryBuilderConfig, QueryResultBuilder, QueryResultBuilderError,
};
Expand Down Expand Up @@ -453,11 +452,17 @@ impl QueryResultBuilder for ExecuteResultsBuilder {
fn finish(
&mut self,
last_frame_no: Option<FrameNo>,
txn_status: TxnStatus,
is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
use libsql_replication::rpc::proxy::State;

self.output = Some(ExecuteResults {
results: std::mem::take(&mut self.results),
state: rpc::State::from(txn_status).into(),
state: if is_autocommit {
State::Init.into()
} else {
State::Txn.into()
},
current_frame_no: last_frame_no,
});
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ expression: stream.next().await.unwrap().unwrap()
"step": {
"Finish": {
"last_frame_no": null,
"state": 0
"is_autocommit": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ expression: stream.next().await.unwrap().unwrap()
"step": {
"Finish": {
"last_frame_no": null,
"state": 0
"is_autocommit": true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ ExecResp {
Finish(
Finish {
last_frame_no: None,
state: Init,
is_autocommit: true,
},
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ expression: stream.next().await.unwrap().unwrap()
"step": {
"Finish": {
"last_frame_no": null,
"state": 0
"is_autocommit": true
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions libsql-server/src/rpc/streaming_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libsql_replication::rpc::proxy::row_value::Value;
use libsql_replication::rpc::proxy::{
AddRowValue, BeginRow, BeginRows, BeginStep, ColsDescription, DescribeCol, DescribeParam,
DescribeResp, ExecReq, ExecResp, Finish, FinishRow, FinishRows, FinishStep, Init, ProgramResp,
RespStep, RowValue, State as RpcState, StepError, StreamDescribeReq,
RespStep, RowValue, StepError, StreamDescribeReq,
};
use prost::Message;
use rusqlite::types::ValueRef;
Expand All @@ -23,7 +23,6 @@ use tonic::{Code, Status};
use crate::auth::Authenticated;
use crate::connection::Connection;
use crate::error::Error;
use crate::query_analysis::TxnStatus;
use crate::query_result_builder::{
Column, QueryBuilderConfig, QueryResultBuilder, QueryResultBuilderError,
};
Expand Down Expand Up @@ -212,7 +211,7 @@ pub fn apply_program_resp_to_builder<B: QueryResultBuilder>(
config: &QueryBuilderConfig,
builder: &mut B,
resp: ProgramResp,
mut on_finish: impl FnMut(Option<FrameNo>, TxnStatus),
mut on_finish: impl FnMut(Option<FrameNo>, bool),
) -> crate::Result<bool> {
for step in resp.steps {
let Some(step) = step.step else {
Expand Down Expand Up @@ -251,10 +250,12 @@ pub fn apply_program_resp_to_builder<B: QueryResultBuilder>(
}
Step::FinishRow(_) => builder.finish_row()?,
Step::FinishRows(_) => builder.finish_rows()?,
Step::Finish(f @ Finish { last_frame_no, .. }) => {
let txn_status = TxnStatus::from(f.state());
on_finish(last_frame_no, txn_status);
builder.finish(last_frame_no, txn_status)?;
Step::Finish(Finish {
last_frame_no,
is_autocommit,
}) => {
on_finish(last_frame_no, is_autocommit);
builder.finish(last_frame_no, is_autocommit)?;
return Ok(false);
}
_ => return Err(Error::PrimaryStreamMisuse),
Expand Down Expand Up @@ -343,11 +344,11 @@ impl QueryResultBuilder for StreamResponseBuilder {
fn finish(
&mut self,
last_frame_no: Option<FrameNo>,
state: TxnStatus,
is_autocommit: bool,
) -> Result<(), QueryResultBuilderError> {
self.push(Step::Finish(Finish {
last_frame_no,
state: RpcState::from(state).into(),
is_autocommit,
}))?;
self.flush()?;
Ok(())
Expand Down

0 comments on commit c6abeef

Please sign in to comment.