Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 84 additions & 32 deletions engine/packages/depot-client/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ pub struct NativeWriteConnectionLease {
newly_opened: bool,
}

enum ReadAcquireAction {
Wait,
Open(NativeVfsHandle),
UseIdle(NativeConnection),
}

impl NativeConnectionManager {
pub fn new(
vfs: NativeVfsHandle,
Expand Down Expand Up @@ -156,50 +162,65 @@ impl NativeConnectionManager {
let wait_started_at = Instant::now();
loop {
let notified = self.inner.changed.notified();
let open_result = {
let (action, expired_read_connections) = {
let mut state = self.inner.state.lock().await;
let closed_readers = state.prune_expired_readers(self.inner.config.idle_ttl);
self.record_reader_closes(closed_readers);
self.record_reader_gauges(&state);
if state.vfs.is_none() {
return Err(anyhow!("sqlite connection manager is closed"));
}
if matches!(state.mode, NativeConnectionManagerMode::Closing) {
return Err(anyhow!("sqlite connection manager is closing"));
}
let expired_read_connections: Vec<NativeConnection> = state
.prune_expired_readers(self.inner.config.idle_ttl)
.into_iter()
.map(|reader| reader.connection)
.collect();
self.record_reader_closes(expired_read_connections.len());
self.record_reader_gauges(&state);
if state.pending_writers > 0
|| matches!(state.mode, NativeConnectionManagerMode::WriteMode)
|| state.active_writer
{
None
(ReadAcquireAction::Wait, expired_read_connections)
} else if let Some(connection) = state.idle_readers.pop() {
state.active_readers += 1;
self.record_mode_transition(state.refresh_mode());
self.record_reader_gauges(&state);
self.observe_read_wait(wait_started_at.elapsed());
return Ok(NativeReadConnectionLease {
manager: self.clone(),
connection: Some(connection.connection),
newly_opened: false,
});
(
ReadAcquireAction::UseIdle(connection.connection),
expired_read_connections,
)
} else if state.open_readers < self.inner.config.max_readers {
state.active_readers += 1;
state.open_readers += 1;
self.record_mode_transition(state.set_mode(NativeConnectionManagerMode::ReadMode));
self.record_reader_gauges(&state);
Some(
state
.vfs
.as_ref()
.expect("vfs checked above")
.clone(),
(
ReadAcquireAction::Open(
state
.vfs
.as_ref()
.expect("vfs checked above")
.clone(),
),
expired_read_connections,
)
} else {
None
(ReadAcquireAction::Wait, expired_read_connections)
}
};
drop_native_connections_blocking(expired_read_connections).await;

if let Some(vfs) = open_result {
if let ReadAcquireAction::UseIdle(connection) = action {
return Ok(NativeReadConnectionLease {
manager: self.clone(),
connection: Some(connection),
newly_opened: false,
});
}

if let ReadAcquireAction::Open(vfs) = action {
let file_name = self.inner.file_name.clone();
match tokio::task::spawn_blocking(move || {
open_connection(vfs, &file_name, SQLITE_OPEN_READONLY)
Expand Down Expand Up @@ -283,9 +304,13 @@ impl NativeConnectionManager {
};

if let Some((vfs, idle_readers)) = open_result {
drop(idle_readers);
let idle_read_connections = idle_readers
.into_iter()
.map(|reader| reader.connection)
.collect::<Vec<_>>();
let file_name = self.inner.file_name.clone();
match tokio::task::spawn_blocking(move || {
drop(idle_read_connections);
open_connection(
vfs,
&file_name,
Expand Down Expand Up @@ -391,20 +416,25 @@ impl NativeConnectionManager {
}

pub async fn close(&self) -> Result<()> {
let idle_readers = {
let idle_connections = {
let mut state = self.inner.state.lock().await;
if state.vfs.is_none() {
return Ok(());
}
state.mode = NativeConnectionManagerMode::Closing;
state.open_readers = state.open_readers.saturating_sub(state.idle_readers.len());
self.inner.changed.notify_waiters();
state.idle_writer.take();
let idle_writer = state.idle_writer.take();
self.record_reader_closes(state.idle_readers.len());
self.record_reader_gauges(&state);
std::mem::take(&mut state.idle_readers)
let mut connections = std::mem::take(&mut state.idle_readers)
.into_iter()
.map(|reader| reader.connection)
.collect::<Vec<_>>();
connections.extend(idle_writer);
connections
};
drop(idle_readers);
drop_native_connections_blocking(idle_connections).await;

loop {
let notified = self.inner.changed.notified();
Expand All @@ -419,7 +449,7 @@ impl NativeConnectionManager {
};

if let Some(vfs) = vfs {
drop(vfs);
drop_vfs_blocking(vfs).await;
self.inner.changed.notify_waiters();
return Ok(());
}
Expand Down Expand Up @@ -537,7 +567,7 @@ impl NativeReadConnectionLease {
if idle_connection.is_some() {
self.manager.record_reader_closes(1);
}
drop(idle_connection);
drop_native_connections_blocking(idle_connection.into_iter().collect()).await;
self.manager.inner.changed.notify_waiters();
}
}
Expand Down Expand Up @@ -599,7 +629,7 @@ impl NativeWriteConnectionLease {
connection
}
};
drop(close_connection);
drop_native_connections_blocking(close_connection.into_iter().collect()).await;
self.manager.inner.changed.notify_waiters();
}
}
Expand Down Expand Up @@ -649,17 +679,39 @@ impl NativeConnectionManagerState {
}
}

fn prune_expired_readers(&mut self, idle_ttl: Duration) -> usize {
fn prune_expired_readers(&mut self, idle_ttl: Duration) -> Vec<IdleReadConnection> {
let now = Instant::now();
let before = self.idle_readers.len();
self.idle_readers
.retain(|reader| now.duration_since(reader.idle_since) < idle_ttl);
let closed = before - self.idle_readers.len();
let mut expired = Vec::new();
let mut retained = Vec::with_capacity(self.idle_readers.len());
for reader in self.idle_readers.drain(..) {
if now.duration_since(reader.idle_since) < idle_ttl {
retained.push(reader);
} else {
expired.push(reader);
}
}
self.idle_readers = retained;
let closed = expired.len();
self.open_readers = self.open_readers.saturating_sub(closed);
if closed > 0 {
self.refresh_mode();
}
closed
expired
}
}

async fn drop_native_connections_blocking(connections: Vec<NativeConnection>) {
if connections.is_empty() {
return;
}
if let Err(err) = tokio::task::spawn_blocking(move || drop(connections)).await {
tracing::warn!(?err, "failed to join sqlite connection close task");
}
}

async fn drop_vfs_blocking(vfs: NativeVfsHandle) {
if let Err(err) = tokio::task::spawn_blocking(move || drop(vfs)).await {
tracing::warn!(?err, "failed to join sqlite vfs close task");
}
}

Expand Down
25 changes: 18 additions & 7 deletions engine/packages/depot-client/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use moka::sync::Cache;
use parking_lot::{Mutex, RwLock};
use rivet_envoy_client::handle::EnvoyHandle;
use rivet_envoy_protocol as protocol;
use tokio::runtime::Handle;
use tokio::runtime::{Handle, RuntimeFlavor};

use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags};

Expand Down Expand Up @@ -1549,12 +1549,23 @@ fn fetch_initial_main_page(
runtime: &Handle,
actor_id: &str,
) -> std::result::Result<Option<Vec<u8>>, String> {
let response = runtime.block_on(transport.get_pages(protocol::SqliteGetPagesRequest {
actor_id: actor_id.to_string(),
pgnos: vec![1],
expected_generation: None,
expected_head_txid: None,
}));
if matches!(runtime.runtime_flavor(), RuntimeFlavor::CurrentThread) {
return Err(
"sqlite VFS registration cannot synchronously fetch the initial page on a current-thread Tokio runtime"
.to_string(),
);
}
// `register` is invoked from inside an async context (e.g. `open_database_from_envoy`),
// so plain `Handle::block_on` panics. Drop into `block_in_place` to bridge sync VFS
// registration into the async transport call.
let response = tokio::task::block_in_place(|| {
runtime.block_on(transport.get_pages(protocol::SqliteGetPagesRequest {
actor_id: actor_id.to_string(),
pgnos: vec![1],
expected_generation: None,
expected_head_txid: None,
}))
});

match response {
Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok)) => Ok(ok
Expand Down
Loading