From 595f3779f8d46e0ad95a07104e0551ac525fcf8b Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 11 Oct 2022 17:28:19 +0200 Subject: [PATCH] Log: Adds a read queue Allows to avoid having a global state on Log (Reading vs Idle) and to be able to flush log even if the next file is not fully enacted --- fuzz/src/lib.rs | 27 ++++----- src/db.rs | 16 +++--- src/log.rs | 150 +++++++++++++++--------------------------------- src/table.rs | 3 - 4 files changed, 66 insertions(+), 130 deletions(-) diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs index 40fdc602..aa093a28 100644 --- a/fuzz/src/lib.rs +++ b/fuzz/src/lib.rs @@ -36,7 +36,8 @@ pub enum Action { Transaction(Vec), ProcessReindex, ProcessCommits, - FlushAndEnactLogs, + FlushLog, + EnactLog, CleanLogs, Restart, } @@ -143,26 +144,22 @@ pub trait DbSimulator { &options, ) }, - Action::FlushAndEnactLogs => { - // We repeat flush and then call enact_log to avoid deadlocks due to - // Log::flush_one side effects - for _ in 0..2 { - db = Self::try_or_restart( - |db| db.flush_logs(), - db, - &mut layers, - &old_layers, - &options, - ) - } + Action::FlushLog => + db = Self::try_or_restart( + |db| db.flush_logs(), + db, + &mut layers, + &old_layers, + &options, + ), + Action::EnactLog => db = Self::try_or_restart( |db| db.enact_logs(), db, &mut layers, &old_layers, &options, - ) - }, + ), Action::CleanLogs => db = Self::try_or_restart( |db| db.clean_logs(), diff --git a/src/db.rs b/src/db.rs index f7da2c3c..f962256f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -679,14 +679,11 @@ impl DbInner { } fn flush_logs(&self, min_log_size: u64) -> Result { - let (flush_next, read_next, cleanup_next) = self.log.flush_one(min_log_size)?; - if read_next { + let has_flushed = self.log.flush_one(min_log_size)?; + if has_flushed { self.commit_worker_wait.signal(); } - if cleanup_next { - self.cleanup_worker_wait.signal(); - } - Ok(flush_next) + Ok(has_flushed) } fn clean_logs(&self) -> Result { @@ -956,7 +953,10 @@ impl Db { let mut more_work = false; while !db.shutdown.load(Ordering::SeqCst) || more_work { if !more_work { - db.commit_worker_wait.wait(); + db.cleanup_worker_wait.signal(); + if !db.log.has_still_log_files_to_read() { + db.commit_worker_wait.wait(); + } } more_work = db.enact_logs(false)?; @@ -1461,10 +1461,8 @@ mod tests { while db.process_reindex().unwrap() {} } if *self == EnableCommitPipelineStages::DbFile { - let _ = db.log.flush_one(0).unwrap(); let _ = db.log.flush_one(0).unwrap(); while db.enact_logs(false).unwrap() {} - let _ = db.log.flush_one(0).unwrap(); let _ = db.clean_logs().unwrap(); } } diff --git a/src/log.rs b/src/log.rs index 7433d214..94b26a84 100644 --- a/src/log.rs +++ b/src/log.rs @@ -8,7 +8,7 @@ use crate::{ options::Options, table::TableId as ValueTableId, }; -use parking_lot::{Condvar, MappedRwLockWriteGuard, Mutex, RwLock, RwLockWriteGuard}; +use parking_lot::{RwLock, RwLockWriteGuard}; use std::{ collections::{HashMap, VecDeque}, convert::TryInto, @@ -115,7 +115,7 @@ pub struct Cleared { #[derive(Debug)] pub struct LogReader<'a> { - file: MappedRwLockWriteGuard<'a, std::io::BufReader>, + reading: RwLockWriteGuard<'a, Option>, record_id: u64, read_bytes: u64, crc32: crc32fast::Hasher, @@ -128,13 +128,10 @@ impl<'a> LogReader<'a> { self.record_id } - fn new( - file: MappedRwLockWriteGuard<'a, std::io::BufReader>, - validate: bool, - ) -> LogReader<'a> { + fn new(reading: RwLockWriteGuard<'a, Option>, validate: bool) -> LogReader<'a> { LogReader { cleared: Default::default(), - file, + reading, record_id: 0, read_bytes: 0, crc32: crc32fast::Hasher::new(), @@ -144,7 +141,12 @@ impl<'a> LogReader<'a> { pub fn reset(&mut self) -> Result<()> { self.cleared = Default::default(); - try_io!(self.file.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64)))); + try_io!(self + .reading + .as_mut() + .unwrap() + .file + .seek(std::io::SeekFrom::Current(-(self.read_bytes as i64)))); self.read_bytes = 0; self.record_id = 0; self.crc32 = crc32fast::Hasher::new(); @@ -153,7 +155,7 @@ impl<'a> LogReader<'a> { pub fn next(&mut self) -> Result { let mut read_buf = |size, buf: &mut [u8; 8]| -> Result<()> { - try_io!(self.file.read_exact(&mut buf[0..size])); + try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..size])); self.read_bytes += size as u64; if self.validate { self.crc32.update(&buf[0..size]); @@ -189,7 +191,7 @@ impl<'a> LogReader<'a> { Ok(LogAction::InsertValue(InsertValueAction { table, index })) }, END_RECORD => { - try_io!(self.file.read_exact(&mut buf[0..4])); + try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..4])); self.read_bytes += 4; if self.validate { let checksum = u32::from_le_bytes(buf[0..4].try_into().unwrap()); @@ -218,7 +220,7 @@ impl<'a> LogReader<'a> { } pub fn read(&mut self, buf: &mut [u8]) -> Result<()> { - try_io!(self.file.read_exact(buf)); + try_io!(self.reading.as_mut().unwrap().file.read_exact(buf)); self.read_bytes += buf.len() as u64; if self.validate { self.crc32.update(buf); @@ -454,32 +456,18 @@ struct Appending { size: u64, } -#[derive(Debug)] -struct Flushing { - id: u32, - file: std::fs::File, -} - #[derive(Debug)] struct Reading { id: u32, file: std::io::BufReader, } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -enum ReadingState { - Reading, - Idle, -} - #[derive(Debug)] pub struct Log { overlays: RwLock, appending: RwLock>, reading: RwLock>, - reading_state: Mutex, - done_reading_cv: Condvar, - flushing: Mutex>, + read_queue: RwLock>, next_record_id: AtomicU64, dirty: AtomicBool, log_pool: RwLock>, @@ -524,16 +512,14 @@ impl Log { overlays: Default::default(), appending: RwLock::new(None), reading: RwLock::new(None), - reading_state: Mutex::new(ReadingState::Idle), - done_reading_cv: Condvar::new(), - flushing: Mutex::new(None), + read_queue: RwLock::default(), next_record_id: AtomicU64::new(1), next_log_id: AtomicU32::new(next_log_id), dirty: AtomicBool::new(true), sync: options.sync_wal, replay_queue: RwLock::new(logs), - cleanup_queue: RwLock::new(Default::default()), - log_pool: RwLock::new(Default::default()), + cleanup_queue: RwLock::default(), + log_pool: RwLock::default(), path, }) } @@ -591,7 +577,6 @@ impl Log { overlays.index.clear(); overlays.value.clear(); overlays.last_record_id.clear(); - *self.reading_state.lock() = ReadingState::Idle; self.dirty.store(false, Ordering::Relaxed); } @@ -677,65 +662,22 @@ impl Log { overlays.index.retain(|_, overlay| !overlay.map.is_empty()); } - pub fn flush_one(&self, min_size: u64) -> Result<(bool, bool, bool)> { - // Wait for the reader to finish reading - let mut flushing = self.flushing.lock(); - let mut read_next = false; - let mut cleanup = false; - if flushing.is_some() { - let mut reading_state = self.reading_state.lock(); - - while *reading_state == ReadingState::Reading { - log::debug!(target: "parity-db", "Flush: Awaiting log reader"); - self.done_reading_cv.wait(&mut reading_state) - } - - { - let mut reading = self.reading.write(); - if let Some(reading) = reading.take() { - log::debug!(target: "parity-db", "Flush: Activated log cleanup {}", reading.id); - let file = reading.file.into_inner(); - self.cleanup_queue.write().push_back((reading.id, file)); - *reading_state = ReadingState::Idle; - cleanup = true; - } - - if let Some(mut flushing) = flushing.take() { - log::debug!(target: "parity-db", "Flush: Activated log reader {}", flushing.id); - try_io!(flushing.file.seek(std::io::SeekFrom::Start(0))); - *reading = Some(Reading { - id: flushing.id, - file: std::io::BufReader::new(flushing.file), - }); - *reading_state = ReadingState::Reading; - read_next = true; + pub fn flush_one(&self, min_size: u64) -> Result { + // If it exists take the writer and flush it + let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size); + if cur_size > min_size { + if let Some(to_flush) = self.appending.write().take() { + let file = try_io!(to_flush.file.into_inner().map_err(|e| e.into_error())); + if self.sync { + log::debug!(target: "parity-db", "Flush: Flushing log to disk"); + try_io!(file.sync_data()); + log::debug!(target: "parity-db", "Flush: Flushing log completed"); } + self.read_queue.write().push_back((to_flush.id, file)); } + return Ok(true) } - - { - // Lock writer and reset it - let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size); - if cur_size > 0 && cur_size > min_size { - let mut appending = self.appending.write(); - let to_flush = appending.take(); - *flushing = to_flush.map(|to_flush| Flushing { - file: to_flush.file.into_inner().unwrap(), - id: to_flush.id, - }); - } - } - - // Flush to disk - if self.sync { - if let Some(flushing) = flushing.as_ref() { - log::debug!(target: "parity-db", "Flush: Flushing log to disk"); - try_io!(flushing.file.sync_data()); - log::debug!(target: "parity-db", "Flush: Flushing log completed"); - } - } - - Ok((flushing.is_some(), read_next, cleanup)) + Ok(false) } pub fn replay_next(&mut self) -> Result> { @@ -750,10 +692,8 @@ impl Log { if let Some((id, _record_id, file)) = self.replay_queue.write().pop_front() { log::debug!(target: "parity-db", "Replay: Activated log reader {}", id); *reading = Some(Reading { id, file: std::io::BufReader::new(file) }); - *self.reading_state.lock() = ReadingState::Reading; Ok(Some(id)) } else { - *self.reading_state.lock() = ReadingState::Idle; Ok(None) } } @@ -785,26 +725,26 @@ impl Log { } pub fn read_next(&self, validate: bool) -> Result>> { - let mut reading_state = self.reading_state.lock(); - if *reading_state != ReadingState::Reading { - log::trace!(target: "parity-db", "No logs to enact"); - return Ok(None) - } - - let reading = self.reading.write(); + let mut reading = self.reading.write(); if reading.is_none() { - log::trace!(target: "parity-db", "No active reader"); - return Ok(None) + if let Some((id, mut file)) = self.read_queue.write().pop_front() { + try_io!(file.seek(std::io::SeekFrom::Start(0))); + *reading = Some(Reading { id, file: std::io::BufReader::new(file) }); + } else { + log::trace!(target: "parity-db", "No active reader"); + return Ok(None) + } } - let reading = RwLockWriteGuard::map(reading, |r| &mut r.as_mut().unwrap().file); let mut reader = LogReader::new(reading, validate); match reader.next() { Ok(LogAction::BeginRecord) => Ok(Some(reader)), Ok(_) => Err(Error::Corruption("Bad log record structure".into())), Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => { - *reading_state = ReadingState::Idle; - self.done_reading_cv.notify_one(); - log::debug!(target: "parity-db", "Read: End of log"); + if let Some(reading) = reader.reading.take() { + log::debug!(target: "parity-db", "Read: End of log {}", reading.id); + let file = reading.file.into_inner(); + self.cleanup_queue.write().push_back((reading.id, file)); + } Ok(None) }, Err(e) => Err(e), @@ -815,6 +755,10 @@ impl Log { &self.overlays } + pub fn has_still_log_files_to_read(&self) -> bool { + self.read_queue.read().len() > 0 + } + pub fn kill_logs(&self) -> Result<()> { let mut log_pool = self.log_pool.write(); for (id, file) in log_pool.drain(..) { diff --git a/src/table.rs b/src/table.rs index 9ca46b8d..f14521f2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1116,9 +1116,6 @@ mod test { let mut writer = log.begin_record(); f(&mut writer); let bytes_written = log.end_record(writer.drain()).unwrap(); - // Cycle through 2 log files - let _ = log.read_next(false); - log.flush_one(0).unwrap(); let _ = log.read_next(false); log.flush_one(0).unwrap(); let mut reader = log.read_next(false).unwrap().unwrap();