Skip to content

Commit

Permalink
libsql-wal
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed May 27, 2024
1 parent 9da7b2c commit c79517c
Show file tree
Hide file tree
Showing 2,294 changed files with 131,905 additions and 264 deletions.
437 changes: 225 additions & 212 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions bindings/c/include/libsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@

#include <stdint.h>

#define LIBSQL_INT 1

#define LIBSQL_FLOAT 2

#define LIBSQL_TEXT 3

#define LIBSQL_BLOB 4

#define LIBSQL_NULL 5

typedef struct libsql_connection libsql_connection;

typedef struct libsql_database libsql_database;
Expand Down Expand Up @@ -76,6 +86,8 @@ int libsql_query_stmt(libsql_stmt_t stmt, libsql_rows_t *out_rows, const char **

int libsql_execute_stmt(libsql_stmt_t stmt, const char **out_err_msg);

int libsql_reset_stmt(libsql_stmt_t stmt, const char **out_err_msg);

void libsql_free_stmt(libsql_stmt_t stmt);

int libsql_query(libsql_connection_t conn, const char *sql, libsql_rows_t *out_rows, const char **out_err_msg);
Expand Down
24 changes: 19 additions & 5 deletions bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,20 @@ pub unsafe extern "C" fn libsql_execute_stmt(
}
}

#[no_mangle]
pub unsafe extern "C" fn libsql_reset_stmt(
stmt: libsql_stmt_t,
out_err_msg: *mut *const std::ffi::c_char,
) -> std::ffi::c_int {
if stmt.is_null() {
set_err_msg("Null statement".to_string(), out_err_msg);
return 1;
}
let stmt = stmt.get_ref_mut();
stmt.params.clear();
0
}

#[no_mangle]
pub unsafe extern "C" fn libsql_free_stmt(stmt: libsql_stmt_t) {
if stmt.is_null() {
Expand Down Expand Up @@ -596,19 +610,19 @@ pub unsafe extern "C" fn libsql_column_type(
let row = row.get_ref();
match row.get_value(col) {
Ok(libsql::Value::Null) => {
*out_type = 5 as i32;
*out_type = types::LIBSQL_NULL as i32;
}
Ok(libsql::Value::Text(_)) => {
*out_type = 3 as i32;
*out_type = types::LIBSQL_TEXT as i32;
}
Ok(libsql::Value::Integer(_)) => {
*out_type = 1 as i32;
*out_type = types::LIBSQL_INT as i32;
}
Ok(libsql::Value::Real(_)) => {
*out_type = 2 as i32;
*out_type = types::LIBSQL_FLOAT as i32;
}
Ok(libsql::Value::Blob(_)) => {
*out_type = 4 as i32;
*out_type = types::LIBSQL_BLOB as i32;
}
Err(e) => {
set_err_msg(format!("Error fetching value: {e}"), out_err_msg);
Expand Down
6 changes: 6 additions & 0 deletions bindings/c/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
pub const LIBSQL_INT: i8 = 1;
pub const LIBSQL_FLOAT: i8 = 2;
pub const LIBSQL_TEXT: i8 = 3;
pub const LIBSQL_BLOB: i8 = 4;
pub const LIBSQL_NULL: i8 = 5;

#[derive(Clone, Debug)]
#[repr(C)]
pub struct blob {
Expand Down
6 changes: 3 additions & 3 deletions libsql-ffi/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ fn build_multiple_ciphers(out_path: &Path) {
let cxx = env("CXX");

let toolchain_path = sqlite3mc_build_dir.join("toolchain.cmake");
let cmake_toolchain_opt = format!("-DCMAKE_TOOLCHAIN_FILE=toolchain.cmake");
let cmake_toolchain_opt = "-DCMAKE_TOOLCHAIN_FILE=toolchain.cmake".to_string();

let mut toolchain_file = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -363,8 +363,8 @@ fn build_multiple_ciphers(out_path: &Path) {

let mut make = Command::new("cmake");
make.current_dir(sqlite3mc_build_dir.clone());
make.args(&["--build", "."]);
make.args(&["--config", "Release"]);
make.args(["--build", "."]);
make.args(["--config", "Release"]);
if !make.status().unwrap().success() {
panic!("Failed to run make");
}
Expand Down
1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ itertools = "0.10.5"
jsonwebtoken = "9"
libsql = { path = "../libsql/", optional = true }
libsql_replication = { path = "../libsql-replication" }
libsql-wal = { path = "../libsql-wal/" }
metrics = "0.21.1"
metrics-util = "0.15"
metrics-exporter-prometheus = "0.12.2"
Expand Down
25 changes: 15 additions & 10 deletions libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::time::{Duration, Instant};
use crossbeam::deque::Steal;
use crossbeam::sync::{Parker, Unparker};
use hashbrown::HashMap;

Check failure on line 8 in libsql-server/src/connection/connection_manager.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/connection_manager.rs
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Wal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Wal, Sqlite3WalManager};
use libsql_wal::fs::StdFs;
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
use metrics::atomics::AtomicU64;
use parking_lot::{Mutex, MutexGuard};
use rusqlite::ErrorCode;
Expand All @@ -17,7 +20,9 @@ use super::TXN_TIMEOUT;

pub type ConnId = u64;

pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, Sqlite3Wal>;
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdFs>>;
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdFs>>;
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;

#[derive(Copy, Clone, Debug)]
struct Slot {
Expand Down Expand Up @@ -359,9 +364,9 @@ impl SlotState {
}
}

impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
impl WrapWal<InnerWal> for ManagedConnectionWalWrapper {
#[tracing::instrument(skip_all, fields(id = self.id))]
fn begin_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
fn begin_write_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<()> {
tracing::debug!("begin write");
self.acquire()?;
match wrapped.begin_write_txn() {
Expand Down Expand Up @@ -390,7 +395,7 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
#[tracing::instrument(skip_all, fields(id = self.id))]
fn checkpoint(
&mut self,
wrapped: &mut Sqlite3Wal,
wrapped: &mut InnerWal,
db: &mut libsql_sys::wal::Sqlite3Db,
mode: libsql_sys::wal::CheckpointMode,
busy_handler: Option<&mut dyn libsql_sys::wal::BusyHandler>,
Expand Down Expand Up @@ -441,13 +446,13 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn begin_read_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<bool> {
fn begin_read_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<bool> {
tracing::debug!("begin read txn");
wrapped.begin_read_txn()
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn end_read_txn(&mut self, wrapped: &mut Sqlite3Wal) {
fn end_read_txn(&mut self, wrapped: &mut InnerWal) {
wrapped.end_read_txn();
{
let current = self.manager.current.lock();
Expand All @@ -470,7 +475,7 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn end_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
fn end_write_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<()> {
wrapped.end_write_txn()?;
tracing::debug!("end write txn");
self.release();
Expand All @@ -479,10 +484,10 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn close<M: libsql_sys::wal::WalManager<Wal = Sqlite3Wal>>(
fn close<M: libsql_sys::wal::WalManager<Wal = InnerWal>>(
&mut self,
manager: &M,
wrapped: &mut Sqlite3Wal,
wrapped: &mut InnerWal,
db: &mut libsql_sys::wal::Sqlite3Db,
sync_flags: std::ffi::c_int,
_scratch: Option<&mut [u8]>,
Expand Down
23 changes: 19 additions & 4 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Sqlite3WalManager, Wal, WalManager};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager};
use libsql_sys::EncryptionConfig;
use metrics::histogram;
use parking_lot::Mutex;
Expand All @@ -25,7 +25,7 @@ use crate::stats::{Stats, StatsUpdateMessage};
use crate::{Result, BLOCKING_RT};

Check failure on line 25 in libsql-server/src/connection/libsql.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/libsql.rs

use super::connection_manager::{
ConnectionManager, ManagedConnectionWal, ManagedConnectionWalWrapper,
ConnectionManager, ManagedConnectionWal, ManagedConnectionWalWrapper, InnerWalManager,
};
use super::program::{
check_describe_auth, check_program_auth, DescribeCol, DescribeParam, DescribeResponse, Vm,
Expand All @@ -48,6 +48,7 @@ pub struct MakeLibSqlConn<W> {
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
}

impl<W> MakeLibSqlConn<W>
Expand All @@ -68,6 +69,7 @@ where
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> Result<Self> {
let txn_timeout = config_store.get().txn_timeout.unwrap_or(TXN_TIMEOUT);

Expand All @@ -86,6 +88,7 @@ where
block_writes,
resolve_attach_path,
connection_manager: ConnectionManager::new(txn_timeout),
make_wal_manager,
};

let db = this.try_create_db().await?;
Expand Down Expand Up @@ -140,6 +143,7 @@ where
self.block_writes.clone(),
self.resolve_attach_path.clone(),
self.connection_manager.clone(),
self.make_wal_manager.clone(),
)
.await
}
Expand All @@ -164,6 +168,9 @@ pub struct LibSqlConnection<T> {
#[cfg(test)]
impl LibSqlConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
pub async fn new_test(path: &Path) -> Self {
use libsql_sys::wal::either::Either;
use libsql_sys::wal::Sqlite3WalManager;

Self::new(
path.to_owned(),
Arc::new([]),
Expand All @@ -175,6 +182,7 @@ impl LibSqlConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
Default::default(),
Arc::new(|_| unreachable!()),
ConnectionManager::new(TXN_TIMEOUT),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap()
Expand Down Expand Up @@ -309,13 +317,14 @@ where
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
connection_manager: ConnectionManager,
make_wal: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> crate::Result<Self> {
let (conn, id) = tokio::task::spawn_blocking({
let connection_manager = connection_manager.clone();
move || -> crate::Result<_> {
let manager = ManagedConnectionWalWrapper::new(connection_manager);
let id = manager.id();
let wal = Sqlite3WalManager::default().wrap(manager).wrap(wal_wrapper);
let wal = make_wal().wrap(manager).wrap(wal_wrapper);

let conn = Connection::new(
path.as_ref(),
Expand Down Expand Up @@ -687,8 +696,9 @@ where
#[cfg(test)]
mod test {
use itertools::Itertools;
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::PassthroughWalWrapper;
use libsql_sys::wal::Sqlite3Wal;
use libsql_sys::wal::{Sqlite3Wal, Sqlite3WalManager};
use rand::Rng;
use tempfile::tempdir;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -750,6 +760,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -794,6 +805,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -843,6 +855,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -924,6 +937,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -1009,6 +1023,7 @@ mod test {
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::replication::FrameNo;
use crate::stats::Stats;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::connection_manager::InnerWalManager;
use super::libsql::{LibSqlConnection, MakeLibSqlConn};
use super::program::DescribeResponse;
use super::{Connection, RequestContext};
Expand Down Expand Up @@ -60,6 +61,7 @@ impl MakeWriteProxyConn {
primary_replication_index: Option<FrameNo>,

Check failure on line 61 in libsql-server/src/connection/write_proxy.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/write_proxy.rs
encryption_config: Option<EncryptionConfig>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Send + Sync + 'static>,
) -> crate::Result<Self> {
let client = ProxyClient::with_origin(channel, uri);
let make_read_only_conn = MakeLibSqlConn::new(
Expand All @@ -75,6 +77,7 @@ impl MakeWriteProxyConn {
encryption_config.clone(),
Arc::new(AtomicBool::new(false)), // this is always false for write proxy
resolve_attach_path,
make_wal_manager,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use config::{
use http::user::UserApi;

Check failure on line 28 in libsql-server/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/lib.rs
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_sys::wal::either::Either;
use namespace::{NamespaceConfig, NamespaceName};
use net::Connector;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -413,6 +415,7 @@ where
let (scheduler_sender, scheduler_receiver) = mpsc::channel(128);

let (stats_sender, stats_receiver) = mpsc::channel(8);
let make_wal_manager = Arc::new(|| Either::Left(Sqlite3WalManager::default()));
let ns_config = NamespaceConfig {
db_kind,
base_path: self.path.clone(),
Expand All @@ -431,6 +434,7 @@ where
channel: channel.clone(),
uri: uri.clone(),
migration_scheduler: scheduler_sender.into(),
make_wal_manager,
};

let (metastore_conn_maker, meta_store_wal_manager) =
Expand Down
Loading

0 comments on commit c79517c

Please sign in to comment.