Skip to content

Commit

Permalink
Use the namespace resolver in durable WAL
Browse files Browse the repository at this point in the history
  • Loading branch information
avinassh committed Jun 14, 2024
1 parent 299eb56 commit b8d2daa
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
6 changes: 5 additions & 1 deletion libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,11 @@ where
Some(CustomWAL::DurableWal) => {
tracing::info!("using durable wal");
let lock_manager = Arc::new(std::sync::Mutex::new(LockManager::new()));
let wal = DurableWalManager::new(lock_manager, self.storage_server_address.clone());
let wal = DurableWalManager::new(
lock_manager,
namespace_resolver,
self.storage_server_address.clone(),
);
Ok((
Arc::new(move || EitherWAL::C(wal.clone())),
Box::pin(ready(Ok(()))),
Expand Down
44 changes: 28 additions & 16 deletions libsql-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::BTreeMap;
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::sync::{Arc, Mutex};

use libsql_sys::ffi::{SQLITE_ABORT, SQLITE_BUSY};
use libsql_sys::name::{NamespaceName, NamespaceResolver};
use libsql_sys::rusqlite;
use libsql_sys::wal::{Result, Vfs, Wal, WalManager};
use rpc::storage_client::StorageClient;
Expand Down Expand Up @@ -32,13 +35,19 @@ pub struct DurableWalConfig {
#[derive(Clone)]
pub struct DurableWalManager {
lock_manager: Arc<Mutex<LockManager>>,
resolver: Arc<Box<dyn NamespaceResolver + Send + Sync + 'static>>,
config: DurableWalConfig,
}

impl DurableWalManager {
pub fn new(lock_manager: Arc<Mutex<LockManager>>, storage_server_address: String) -> Self {
pub fn new(
lock_manager: Arc<Mutex<LockManager>>,
resolver: impl NamespaceResolver,
storage_server_address: String,
) -> Self {
Self {
lock_manager,
resolver: Arc::new(Box::new(resolver)),
config: DurableWalConfig {
storage_server_address,
},
Expand All @@ -54,7 +63,7 @@ impl WalManager for DurableWalManager {
false
}

#[tracing::instrument(skip_all, fields(_db_path))]
#[tracing::instrument(skip_all, fields(db_path))]
fn open(
&self,
_vfs: &mut Vfs,
Expand All @@ -63,10 +72,13 @@ impl WalManager for DurableWalManager {
_max_log_size: i64,
db_path: &std::ffi::CStr,
) -> Result<Self::Wal> {
let _db_path = db_path.to_str().unwrap();
trace!("DurableWalManager::open()");
// TODO: use the actual namespace uuid from the connection
let namespace = "default".to_string();
let db_path = OsStr::from_bytes(&db_path.to_bytes());
let namespace = self.resolver.resolve(db_path.as_ref());
// TODO:
// the namespace can be `default` and multiple databases belonging to different
// groups might use same `default` as namespace. Either force only UUIDs to be supported
// as namespaces for durable-wal or use a different specifier like (<libsql_id>, <ns>)
trace!("DurableWalManager::open() ns = {}", namespace);
let rt = tokio::runtime::Handle::current();
let resp = DurableWal::new(namespace, self.config.clone(), self.lock_manager.clone());
let resp = tokio::task::block_in_place(|| rt.block_on(resp));
Expand Down Expand Up @@ -104,7 +116,7 @@ impl WalManager for DurableWalManager {
}

pub struct DurableWal {
namespace: String,
namespace: NamespaceName,
conn_id: String,
client: StorageClient<Channel>,
frames_cache: SieveCache<std::num::NonZeroU64, Vec<u8>>,
Expand All @@ -114,7 +126,7 @@ pub struct DurableWal {

impl DurableWal {
async fn new(
namespace: String,
namespace: NamespaceName,
config: DurableWalConfig,
lock_manager: Arc<Mutex<LockManager>>,
) -> Self {
Expand All @@ -139,7 +151,7 @@ impl DurableWal {
) -> Result<Option<std::num::NonZeroU64>> {
trace!("DurableWal::find_frame_by_page_no()");
let req = rpc::FindFrameRequest {
namespace: self.namespace.clone(),
namespace: self.namespace.to_string(),
page_no: page_no.get(),
max_frame_no: 0,
};
Expand All @@ -155,7 +167,7 @@ impl DurableWal {

async fn frames_count(&self) -> u64 {
let req = rpc::FramesInWalRequest {
namespace: self.namespace.clone(),
namespace: self.namespace.to_string(),
};
let mut binding = self.client.clone();
let resp = binding.frames_in_wal(req).await.unwrap();
Expand Down Expand Up @@ -184,7 +196,7 @@ impl Wal for DurableWal {
trace!(
"DurableWal::end_read_txn() id = {}, unlocked = {}",
self.conn_id,
lock_manager.unlock(self.namespace.clone(), self.conn_id.clone())
lock_manager.unlock(self.namespace.to_string(), self.conn_id.clone())
);
}

Expand Down Expand Up @@ -236,7 +248,7 @@ impl Wal for DurableWal {
return Ok(());
}
let req = rpc::ReadFrameRequest {
namespace: self.namespace.clone(),
namespace: self.namespace.to_string(),
frame_no: frame_no.get(),
};
let mut binding = self.client.clone();
Expand All @@ -261,7 +273,7 @@ impl Wal for DurableWal {
fn begin_write_txn(&mut self) -> Result<()> {
// todo: check if the connection holds a read lock then try to acquire a write lock
let mut lock_manager = self.lock_manager.lock().unwrap();
if !lock_manager.lock(self.namespace.clone(), self.conn_id.clone()) {
if !lock_manager.lock(self.namespace.to_string(), self.conn_id.clone()) {
trace!(
"DurableWal::begin_write_txn() lock acquired = false, id = {}",
self.conn_id
Expand All @@ -280,7 +292,7 @@ impl Wal for DurableWal {
trace!(
"DurableWal::end_write_txn() id = {}, unlocked = {}",
self.conn_id,
lock_manager.unlock(self.namespace.clone(), self.conn_id.clone())
lock_manager.unlock(self.namespace.to_string(), self.conn_id.clone())
);
Ok(())
}
Expand Down Expand Up @@ -311,7 +323,7 @@ impl Wal for DurableWal {
trace!("DurableWal::insert_frames()");
let rt = tokio::runtime::Handle::current();
let mut lock_manager = self.lock_manager.lock().unwrap();
if !lock_manager.is_lock_owner(self.namespace.clone(), self.conn_id.clone()) {
if !lock_manager.is_lock_owner(self.namespace.to_string(), self.conn_id.clone()) {
error!("DurableWal::insert_frames() was called without acquiring lock!",);
self.write_cache.clear();
return Err(rusqlite::ffi::Error::new(SQLITE_ABORT));
Expand All @@ -335,7 +347,7 @@ impl Wal for DurableWal {
}

let req = rpc::InsertFramesRequest {
namespace: self.namespace.clone(),
namespace: self.namespace.to_string(),
frames: self.write_cache.values().cloned().collect(),
max_frame_no: 0,
};
Expand Down

0 comments on commit b8d2daa

Please sign in to comment.