Skip to content

Commit

Permalink
SQLite unlock notification (#1658)
Browse files Browse the repository at this point in the history
* sqlite: add test for concurrent table access (failing)

* sqlite: implement unlock notification
  • Loading branch information
madadam committed Feb 16, 2022
1 parent 7fd324d commit 347374b
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 7 deletions.
1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ libsqlite3-sys = { version = "0.23.2", optional = true, default-features = false
"pkg-config",
"vcpkg",
"bundled",
"unlock_notify"
] }
log = { version = "0.4.14", default-features = false }
md-5 = { version = "0.9.1", default-features = false, optional = true }
Expand Down
77 changes: 70 additions & 7 deletions sqlx-core/src/sqlite/statement/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::ptr;
use std::ptr::NonNull;
use std::slice::from_raw_parts;
use std::str::{from_utf8, from_utf8_unchecked};
use std::sync::{Condvar, Mutex};

use libsqlite3_sys::{
sqlite3, sqlite3_bind_blob64, sqlite3_bind_double, sqlite3_bind_int, sqlite3_bind_int64,
Expand All @@ -16,8 +17,8 @@ use libsqlite3_sys::{
sqlite3_column_name, sqlite3_column_origin_name, sqlite3_column_table_name,
sqlite3_column_type, sqlite3_column_value, sqlite3_db_handle, sqlite3_finalize, sqlite3_reset,
sqlite3_sql, sqlite3_step, sqlite3_stmt, sqlite3_stmt_readonly, sqlite3_table_column_metadata,
sqlite3_value, SQLITE_DONE, SQLITE_MISUSE, SQLITE_OK, SQLITE_ROW, SQLITE_TRANSIENT,
SQLITE_UTF8,
sqlite3_unlock_notify, sqlite3_value, SQLITE_DONE, SQLITE_LOCKED_SHAREDCACHE, SQLITE_MISUSE,
SQLITE_OK, SQLITE_ROW, SQLITE_TRANSIENT, SQLITE_UTF8,
};

use crate::error::{BoxDynError, Error};
Expand Down Expand Up @@ -305,11 +306,21 @@ impl StatementHandle {
pub(crate) fn step(&mut self) -> Result<bool, SqliteError> {
// SAFETY: we have exclusive access to the handle
unsafe {
match sqlite3_step(self.0.as_ptr()) {
SQLITE_ROW => Ok(true),
SQLITE_DONE => Ok(false),
SQLITE_MISUSE => panic!("misuse!"),
_ => Err(SqliteError::new(self.db_handle())),
loop {
match sqlite3_step(self.0.as_ptr()) {
SQLITE_ROW => return Ok(true),
SQLITE_DONE => return Ok(false),
SQLITE_MISUSE => panic!("misuse!"),
SQLITE_LOCKED_SHAREDCACHE => {
// The shared cache is locked by another connection. Wait for unlock
// notification and try again.
wait_for_unlock_notify(self.db_handle())?;
// Need to reset the handle after the unlock
// (https://www.sqlite.org/unlock_notify.html)
sqlite3_reset(self.0.as_ptr());
}
_ => return Err(SqliteError::new(self.db_handle())),
}
}
}
}
Expand All @@ -333,3 +344,55 @@ impl Drop for StatementHandle {
}
}
}

unsafe fn wait_for_unlock_notify(conn: *mut sqlite3) -> Result<(), SqliteError> {
let notify = Notify::new();

if sqlite3_unlock_notify(
conn,
Some(unlock_notify_cb),
&notify as *const Notify as *mut Notify as *mut _,
) != SQLITE_OK
{
return Err(SqliteError::new(conn));
}

notify.wait();

Ok(())
}

unsafe extern "C" fn unlock_notify_cb(ptr: *mut *mut c_void, len: c_int) {
let ptr = ptr as *mut &Notify;
let slice = from_raw_parts(ptr, len as usize);

for notify in slice {
notify.fire();
}
}

struct Notify {
mutex: Mutex<bool>,
condvar: Condvar,
}

impl Notify {
fn new() -> Self {
Self {
mutex: Mutex::new(false),
condvar: Condvar::new(),
}
}

fn wait(&self) {
let _ = self
.condvar
.wait_while(self.mutex.lock().unwrap(), |fired| !*fired)
.unwrap();
}

fn fire(&self) {
*self.mutex.lock().unwrap() = true;
self.condvar.notify_one();
}
}
57 changes: 57 additions & 0 deletions tests/sqlite/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,60 @@ async fn issue_1467() -> anyhow::Result<()> {
}
Ok(())
}

#[sqlx_macros::test]
async fn concurrent_read_and_write() {
let pool: SqlitePool = SqlitePoolOptions::new()
.min_connections(2)
.connect(":memory:")
.await
.unwrap();

sqlx::query("CREATE TABLE kv (k PRIMARY KEY, v)")
.execute(&pool)
.await
.unwrap();

let n = 100;

let read = sqlx_rt::spawn({
let mut conn = pool.acquire().await.unwrap();

async move {
for i in 0u32..n {
sqlx::query("SELECT v FROM kv")
.bind(i)
.fetch_all(&mut conn)
.await
.unwrap();
}
}
});

let write = sqlx_rt::spawn({
let mut conn = pool.acquire().await.unwrap();

async move {
for i in 0u32..n {
sqlx::query("INSERT INTO kv (k, v) VALUES (?, ?)")
.bind(i)
.bind(i * i)
.execute(&mut conn)
.await
.unwrap();
}
}
});

#[cfg(any(feature = "_rt-tokio", feature = "_rt-actix"))]
{
read.await.unwrap();
write.await.unwrap();
}

#[cfg(feature = "_rt-async-std")]
{
read.await;
write.await;
}
}

0 comments on commit 347374b

Please sign in to comment.