Skip to content

Commit

Permalink
Reacquire lock when runtime exits
Browse files Browse the repository at this point in the history
This commit adds changes so that the main process almost immediately
acquires a lock on the data file when runtime is dropped. This is just
an added precaution to try and ensure that no other process does
something silly with the data file.

The descriptor is cloned for this using `FileLock::try_clone`
  • Loading branch information
ohsayan committed May 2, 2021
1 parent 74ce75d commit ce466eb
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/target
/.vscode
data.bin
*.bin
/server/snapshots
snapstore.bin
snapstore.partmap
Expand Down
11 changes: 6 additions & 5 deletions server/src/coredb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ macro_rules! flush_db {
crate::coredb::CoreDB::flush_db(&$db, None)
};
($db:expr, $file:expr) => {
crate::coredb::CoreDB::flush_db(&$db, Some($file))
crate::coredb::CoreDB::flush_db(&$db, Some(&mut $file))
};
}

Expand Down Expand Up @@ -286,7 +286,7 @@ impl CoreDB {
bgsave: BGSave,
snapshot_cfg: SnapshotConfig,
restore_file: Option<PathBuf>,
) -> TResult<(Self, Option<flock::FileLock>)> {
) -> TResult<(Self, Option<flock::FileLock>, flock::FileLock)> {
let coretable = diskstore::get_saved(restore_file)?;
let mut background_tasks: usize = 0;
if !bgsave.is_disabled() {
Expand Down Expand Up @@ -324,12 +324,13 @@ impl CoreDB {
));
let lock = flock::FileLock::lock("data.bin")
.map_err(|e| format!("Failed to acquire lock on data file with error '{}'", e))?;
let cloned_descriptor = lock.try_clone()?;
if bgsave.is_disabled() {
Ok((db, Some(lock)))
Ok((db, Some(lock), cloned_descriptor))
} else {
// Spawn the BGSAVE service in a separate task
tokio::spawn(diskstore::bgsave_scheduler(db.clone(), bgsave, lock));
Ok((db, None))
Ok((db, None, cloned_descriptor))
}
}
/// Create an empty in-memory table
Expand Down Expand Up @@ -366,7 +367,7 @@ impl CoreDB {
self.shared.table.read()
}
/// Flush the contents of the in-memory table onto disk
pub fn flush_db(&self, file: Option<flock::FileLock>) -> TResult<()> {
pub fn flush_db(&self, file: Option<&mut flock::FileLock>) -> TResult<()> {
let data = match self.acquire_write() {
Some(wlock) => wlock,
None => return Err("Can no longer flush data; coretable is poisoned".into()),
Expand Down
22 changes: 12 additions & 10 deletions server/src/dbnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ use crate::config::PortConfig;
use crate::config::SnapshotConfig;
use crate::config::SslOpts;
use crate::dbnet::tcp::Listener;
use crate::diskstore::snapshot::DIR_REMOTE_SNAPSHOT;
use crate::diskstore::{self, snapshot::DIR_REMOTE_SNAPSHOT};
use diskstore::flock;
mod tcp;
use crate::CoreDB;
use libsky::TResult;
Expand Down Expand Up @@ -315,16 +316,17 @@ pub async fn run(
snapshot_cfg: SnapshotConfig,
sig: impl Future,
restore_filepath: Option<PathBuf>,
) -> CoreDB {
) -> (CoreDB, flock::FileLock) {
let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1);
let (db, lock) = match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) {
Ok((db, lock)) => (db, lock),
Err(e) => {
log::error!("ERROR: {}", e);
process::exit(0x100);
}
};
let (db, lock, cloned_descriptor) =
match CoreDB::new(bgsave_cfg, snapshot_cfg, restore_filepath) {
Ok((db, lock, cloned_descriptor)) => (db, lock, cloned_descriptor),
Err(e) => {
log::error!("ERROR: {}", e);
process::exit(0x100);
}
};
match fs::create_dir_all(&*DIR_REMOTE_SNAPSHOT) {
Ok(_) => (),
Err(e) => match e.kind() {
Expand Down Expand Up @@ -391,7 +393,7 @@ pub async fn run(
log::error!("Failed to release lock on data file with '{}'", e);
process::exit(0x100);
}
db
(db, cloned_descriptor)
}

/// This is a **test only** function
Expand Down
23 changes: 23 additions & 0 deletions server/src/diskstore/flock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ impl FileLock {
// Now write to the file
self.file.write_all(bytes)
}
pub fn try_clone(&self) -> Result<Self> {
Ok(Self {
file: self.file.try_clone()?,
unlocked: false,
})
}
/// Reacquire a lock
pub fn reacquire(&mut self) -> Result<()> {
Self::_lock(&self.file)?;
self.unlocked = false;
Ok(())
}
}

impl Drop for FileLock {
Expand Down Expand Up @@ -146,6 +158,17 @@ mod tests {
file2.unlock().unwrap();
drop(file2);
}
#[cfg(windows)]
#[test]
fn test_release_one_acquire_second() {
let mut file = FileLock::lock("data5.bin").unwrap();
let mut cloned = file.try_clone().unwrap();
file.write(&[1, 2, 3]).unwrap();
drop(file);
cloned.reacquire().unwrap();
cloned.write(&[4, 5, 6]).unwrap();
cloned.unlock().unwrap();
}
}

#[cfg(windows)]
Expand Down
15 changes: 10 additions & 5 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ fn main() {
.enable_all()
.build()
.unwrap();
let db = runtime.block_on(async {
let (db, mut descriptor) = runtime.block_on(async {
let (tcplistener, bgsave_config, snapshot_config, restore_filepath) =
check_args_and_get_cfg().await;
let db = run(
let (db, descriptor) = run(
tcplistener,
bgsave_config,
snapshot_config,
signal::ctrl_c(),
restore_filepath,
)
.await;
db
(db, descriptor)
});
// Make sure all background workers terminate
drop(runtime);
Expand All @@ -97,14 +97,19 @@ fn main() {
1,
"Maybe the compiler reordered the drop causing more than one instance of CoreDB to live at this point"
);
if let Err(e) = flush_db!(db) {
// Try to acquire lock almost immediately
if let Err(e) = descriptor.reacquire() {
log::error!("Failed to reacquire lock on data file with error: '{}'", e);
panic!("FATAL: data file relocking failure");
}
if let Err(e) = flush_db!(db, descriptor) {
log::error!("Failed to flush data to disk with '{}'", e);
loop {
// Keep looping until we successfully write the in-memory table to disk
log::warn!("Press enter to try again...");
io::stdout().flush().unwrap();
io::stdin().read(&mut [0]).unwrap();
if let Ok(_) = flush_db!(db) {
if let Ok(_) = flush_db!(db, descriptor) {
log::info!("Successfully saved data to disk");
break;
} else {
Expand Down

0 comments on commit ce466eb

Please sign in to comment.