Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic wal with registry #1279

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
437 changes: 225 additions & 212 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions bindings/c/include/libsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@

#include <stdint.h>

#define LIBSQL_INT 1

#define LIBSQL_FLOAT 2

#define LIBSQL_TEXT 3

#define LIBSQL_BLOB 4

#define LIBSQL_NULL 5

typedef struct libsql_connection libsql_connection;

typedef struct libsql_database libsql_database;
Expand Down Expand Up @@ -76,6 +86,8 @@ int libsql_query_stmt(libsql_stmt_t stmt, libsql_rows_t *out_rows, const char **

int libsql_execute_stmt(libsql_stmt_t stmt, const char **out_err_msg);

int libsql_reset_stmt(libsql_stmt_t stmt, const char **out_err_msg);

void libsql_free_stmt(libsql_stmt_t stmt);

int libsql_query(libsql_connection_t conn, const char *sql, libsql_rows_t *out_rows, const char **out_err_msg);
Expand Down
24 changes: 19 additions & 5 deletions bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,20 @@ pub unsafe extern "C" fn libsql_execute_stmt(
}
}

#[no_mangle]
pub unsafe extern "C" fn libsql_reset_stmt(
stmt: libsql_stmt_t,
out_err_msg: *mut *const std::ffi::c_char,
) -> std::ffi::c_int {
if stmt.is_null() {
set_err_msg("Null statement".to_string(), out_err_msg);
return 1;
}
let stmt = stmt.get_ref_mut();
stmt.params.clear();
0
}

#[no_mangle]
pub unsafe extern "C" fn libsql_free_stmt(stmt: libsql_stmt_t) {
if stmt.is_null() {
Expand Down Expand Up @@ -596,19 +610,19 @@ pub unsafe extern "C" fn libsql_column_type(
let row = row.get_ref();
match row.get_value(col) {
Ok(libsql::Value::Null) => {
*out_type = 5 as i32;
*out_type = types::LIBSQL_NULL as i32;
}
Ok(libsql::Value::Text(_)) => {
*out_type = 3 as i32;
*out_type = types::LIBSQL_TEXT as i32;
}
Ok(libsql::Value::Integer(_)) => {
*out_type = 1 as i32;
*out_type = types::LIBSQL_INT as i32;
}
Ok(libsql::Value::Real(_)) => {
*out_type = 2 as i32;
*out_type = types::LIBSQL_FLOAT as i32;
}
Ok(libsql::Value::Blob(_)) => {
*out_type = 4 as i32;
*out_type = types::LIBSQL_BLOB as i32;
}
Err(e) => {
set_err_msg(format!("Error fetching value: {e}"), out_err_msg);
Expand Down
6 changes: 6 additions & 0 deletions bindings/c/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
pub const LIBSQL_INT: i8 = 1;
pub const LIBSQL_FLOAT: i8 = 2;
pub const LIBSQL_TEXT: i8 = 3;
pub const LIBSQL_BLOB: i8 = 4;
pub const LIBSQL_NULL: i8 = 5;

#[derive(Clone, Debug)]
#[repr(C)]
pub struct blob {
Expand Down
6 changes: 3 additions & 3 deletions libsql-ffi/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ fn build_multiple_ciphers(out_path: &Path) {
let cxx = env("CXX");

let toolchain_path = sqlite3mc_build_dir.join("toolchain.cmake");
let cmake_toolchain_opt = format!("-DCMAKE_TOOLCHAIN_FILE=toolchain.cmake");
let cmake_toolchain_opt = "-DCMAKE_TOOLCHAIN_FILE=toolchain.cmake".to_string();

let mut toolchain_file = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -363,8 +363,8 @@ fn build_multiple_ciphers(out_path: &Path) {

let mut make = Command::new("cmake");
make.current_dir(sqlite3mc_build_dir.clone());
make.args(&["--build", "."]);
make.args(&["--config", "Release"]);
make.args(["--build", "."]);
make.args(["--config", "Release"]);
if !make.status().unwrap().success() {
panic!("Failed to run make");
}
Expand Down
1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ itertools = "0.10.5"
jsonwebtoken = "9"
libsql = { path = "../libsql/", optional = true }
libsql_replication = { path = "../libsql-replication" }
libsql-wal = { path = "../libsql-wal/" }
metrics = "0.21.1"
metrics-util = "0.15"
metrics-exporter-prometheus = "0.12.2"
Expand Down
25 changes: 15 additions & 10 deletions libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

use crossbeam::deque::Steal;
use crossbeam::sync::{Parker, Unparker};
use hashbrown::HashMap;

Check failure on line 8 in libsql-server/src/connection/connection_manager.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/connection_manager.rs
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Wal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Wal, Sqlite3WalManager};
use libsql_wal::fs::StdFs;
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
use metrics::atomics::AtomicU64;
use parking_lot::{Mutex, MutexGuard};
use rusqlite::ErrorCode;
Expand All @@ -17,7 +20,9 @@

pub type ConnId = u64;

pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, Sqlite3Wal>;
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdFs>>;
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdFs>>;
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;

#[derive(Copy, Clone, Debug)]
struct Slot {
Expand Down Expand Up @@ -359,9 +364,9 @@
}
}

impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
impl WrapWal<InnerWal> for ManagedConnectionWalWrapper {
#[tracing::instrument(skip_all, fields(id = self.id))]
fn begin_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
fn begin_write_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<()> {
tracing::debug!("begin write");
self.acquire()?;
match wrapped.begin_write_txn() {
Expand Down Expand Up @@ -390,7 +395,7 @@
#[tracing::instrument(skip_all, fields(id = self.id))]
fn checkpoint(
&mut self,
wrapped: &mut Sqlite3Wal,
wrapped: &mut InnerWal,
db: &mut libsql_sys::wal::Sqlite3Db,
mode: libsql_sys::wal::CheckpointMode,
busy_handler: Option<&mut dyn libsql_sys::wal::BusyHandler>,
Expand Down Expand Up @@ -441,13 +446,13 @@
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn begin_read_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<bool> {
fn begin_read_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<bool> {
tracing::debug!("begin read txn");
wrapped.begin_read_txn()
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn end_read_txn(&mut self, wrapped: &mut Sqlite3Wal) {
fn end_read_txn(&mut self, wrapped: &mut InnerWal) {
wrapped.end_read_txn();
{
let current = self.manager.current.lock();
Expand All @@ -470,7 +475,7 @@
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn end_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
fn end_write_txn(&mut self, wrapped: &mut InnerWal) -> libsql_sys::wal::Result<()> {
wrapped.end_write_txn()?;
tracing::debug!("end write txn");
self.release();
Expand All @@ -479,10 +484,10 @@
}

#[tracing::instrument(skip_all, fields(id = self.id))]
fn close<M: libsql_sys::wal::WalManager<Wal = Sqlite3Wal>>(
fn close<M: libsql_sys::wal::WalManager<Wal = InnerWal>>(
&mut self,
manager: &M,
wrapped: &mut Sqlite3Wal,
wrapped: &mut InnerWal,
db: &mut libsql_sys::wal::Sqlite3Db,
sync_flags: std::ffi::c_int,
_scratch: Option<&mut [u8]>,
Expand Down
23 changes: 19 additions & 4 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;

use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Sqlite3WalManager, Wal, WalManager};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager};
use libsql_sys::EncryptionConfig;
use metrics::histogram;
use parking_lot::Mutex;
Expand All @@ -22,10 +22,10 @@
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
use crate::replication::FrameNo;
use crate::stats::{Stats, StatsUpdateMessage};
use crate::{Result, BLOCKING_RT};

Check failure on line 25 in libsql-server/src/connection/libsql.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/libsql.rs

use super::connection_manager::{
ConnectionManager, ManagedConnectionWal, ManagedConnectionWalWrapper,
ConnectionManager, ManagedConnectionWal, ManagedConnectionWalWrapper, InnerWalManager,
};
use super::program::{
check_describe_auth, check_program_auth, DescribeCol, DescribeParam, DescribeResponse, Vm,
Expand All @@ -48,6 +48,7 @@
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
}

impl<W> MakeLibSqlConn<W>
Expand All @@ -68,6 +69,7 @@
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> Result<Self> {
let txn_timeout = config_store.get().txn_timeout.unwrap_or(TXN_TIMEOUT);

Expand All @@ -86,6 +88,7 @@
block_writes,
resolve_attach_path,
connection_manager: ConnectionManager::new(txn_timeout),
make_wal_manager,
};

let db = this.try_create_db().await?;
Expand Down Expand Up @@ -140,6 +143,7 @@
self.block_writes.clone(),
self.resolve_attach_path.clone(),
self.connection_manager.clone(),
self.make_wal_manager.clone(),
)
.await
}
Expand All @@ -164,6 +168,9 @@
#[cfg(test)]
impl LibSqlConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
pub async fn new_test(path: &Path) -> Self {
use libsql_sys::wal::either::Either;
use libsql_sys::wal::Sqlite3WalManager;

Self::new(
path.to_owned(),
Arc::new([]),
Expand All @@ -175,6 +182,7 @@
Default::default(),
Arc::new(|_| unreachable!()),
ConnectionManager::new(TXN_TIMEOUT),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap()
Expand Down Expand Up @@ -309,13 +317,14 @@
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
connection_manager: ConnectionManager,
make_wal: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> crate::Result<Self> {
let (conn, id) = tokio::task::spawn_blocking({
let connection_manager = connection_manager.clone();
move || -> crate::Result<_> {
let manager = ManagedConnectionWalWrapper::new(connection_manager);
let id = manager.id();
let wal = Sqlite3WalManager::default().wrap(manager).wrap(wal_wrapper);
let wal = make_wal().wrap(manager).wrap(wal_wrapper);

let conn = Connection::new(
path.as_ref(),
Expand Down Expand Up @@ -687,8 +696,9 @@
#[cfg(test)]
mod test {
use itertools::Itertools;
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::PassthroughWalWrapper;
use libsql_sys::wal::Sqlite3Wal;
use libsql_sys::wal::{Sqlite3Wal, Sqlite3WalManager};
use rand::Rng;
use tempfile::tempdir;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -750,6 +760,7 @@
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -794,6 +805,7 @@
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -843,6 +855,7 @@
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -924,6 +937,7 @@
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down Expand Up @@ -1009,6 +1023,7 @@
None,
Default::default(),
Arc::new(|_| unreachable!()),
Arc::new(|| Either::Left(Sqlite3WalManager::default())),
)
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use crate::stats::Stats;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::connection_manager::InnerWalManager;
use super::libsql::{LibSqlConnection, MakeLibSqlConn};
use super::program::DescribeResponse;
use super::{Connection, RequestContext};
Expand Down Expand Up @@ -57,9 +58,10 @@
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
max_response_size: u64,
max_total_response_size: u64,
primary_replication_index: Option<FrameNo>,

Check failure on line 61 in libsql-server/src/connection/write_proxy.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/connection/write_proxy.rs
encryption_config: Option<EncryptionConfig>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Send + Sync + 'static>,
) -> crate::Result<Self> {
let client = ProxyClient::with_origin(channel, uri);
let make_read_only_conn = MakeLibSqlConn::new(
Expand All @@ -75,6 +77,7 @@
encryption_config.clone(),
Arc::new(AtomicBool::new(false)), // this is always false for write proxy
resolve_attach_path,
make_wal_manager,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
use config::{
AdminApiConfig, DbConfig, HeartbeatConfig, RpcClientConfig, RpcServerConfig, UserApiConfig,
};
use http::user::UserApi;

Check failure on line 28 in libsql-server/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/lib.rs
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_sys::wal::either::Either;
use namespace::{NamespaceConfig, NamespaceName};
use net::Connector;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -413,6 +415,7 @@
let (scheduler_sender, scheduler_receiver) = mpsc::channel(128);

let (stats_sender, stats_receiver) = mpsc::channel(8);
let make_wal_manager = Arc::new(|| Either::Left(Sqlite3WalManager::default()));
let ns_config = NamespaceConfig {
db_kind,
base_path: self.path.clone(),
Expand All @@ -431,6 +434,7 @@
channel: channel.clone(),
uri: uri.clone(),
migration_scheduler: scheduler_sender.into(),
make_wal_manager,
};

let (metastore_conn_maker, meta_store_wal_manager) =
Expand Down
Loading
Loading