Skip to content

Commit

Permalink
Log: Adds a read queue
Browse files Browse the repository at this point in the history
Makes it possible to avoid a global state on Log (Reading vs Idle) and to flush the log even if the next file is not fully enacted
  • Loading branch information
Tpt committed Oct 20, 2022
1 parent f418284 commit 595f377
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 130 deletions.
27 changes: 12 additions & 15 deletions fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub enum Action<O: Debug> {
Transaction(Vec<O>),
ProcessReindex,
ProcessCommits,
FlushAndEnactLogs,
FlushLog,
EnactLog,
CleanLogs,
Restart,
}
Expand Down Expand Up @@ -143,26 +144,22 @@ pub trait DbSimulator {
&options,
)
},
Action::FlushAndEnactLogs => {
// We repeat flush and then call enact_log to avoid deadlocks due to
// Log::flush_one side effects
for _ in 0..2 {
db = Self::try_or_restart(
|db| db.flush_logs(),
db,
&mut layers,
&old_layers,
&options,
)
}
Action::FlushLog =>
db = Self::try_or_restart(
|db| db.flush_logs(),
db,
&mut layers,
&old_layers,
&options,
),
Action::EnactLog =>
db = Self::try_or_restart(
|db| db.enact_logs(),
db,
&mut layers,
&old_layers,
&options,
)
},
),
Action::CleanLogs =>
db = Self::try_or_restart(
|db| db.clean_logs(),
Expand Down
16 changes: 7 additions & 9 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,11 @@ impl DbInner {
}

fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
let (flush_next, read_next, cleanup_next) = self.log.flush_one(min_log_size)?;
if read_next {
let has_flushed = self.log.flush_one(min_log_size)?;
if has_flushed {
self.commit_worker_wait.signal();
}
if cleanup_next {
self.cleanup_worker_wait.signal();
}
Ok(flush_next)
Ok(has_flushed)
}

fn clean_logs(&self) -> Result<bool> {
Expand Down Expand Up @@ -956,7 +953,10 @@ impl Db {
let mut more_work = false;
while !db.shutdown.load(Ordering::SeqCst) || more_work {
if !more_work {
db.commit_worker_wait.wait();
db.cleanup_worker_wait.signal();
if !db.log.has_still_log_files_to_read() {
db.commit_worker_wait.wait();
}
}

more_work = db.enact_logs(false)?;
Expand Down Expand Up @@ -1461,10 +1461,8 @@ mod tests {
while db.process_reindex().unwrap() {}
}
if *self == EnableCommitPipelineStages::DbFile {
let _ = db.log.flush_one(0).unwrap();
let _ = db.log.flush_one(0).unwrap();
while db.enact_logs(false).unwrap() {}
let _ = db.log.flush_one(0).unwrap();
let _ = db.clean_logs().unwrap();
}
}
Expand Down
150 changes: 47 additions & 103 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
options::Options,
table::TableId as ValueTableId,
};
use parking_lot::{Condvar, MappedRwLockWriteGuard, Mutex, RwLock, RwLockWriteGuard};
use parking_lot::{RwLock, RwLockWriteGuard};
use std::{
collections::{HashMap, VecDeque},
convert::TryInto,
Expand Down Expand Up @@ -115,7 +115,7 @@ pub struct Cleared {

#[derive(Debug)]
pub struct LogReader<'a> {
file: MappedRwLockWriteGuard<'a, std::io::BufReader<std::fs::File>>,
reading: RwLockWriteGuard<'a, Option<Reading>>,
record_id: u64,
read_bytes: u64,
crc32: crc32fast::Hasher,
Expand All @@ -128,13 +128,10 @@ impl<'a> LogReader<'a> {
self.record_id
}

fn new(
file: MappedRwLockWriteGuard<'a, std::io::BufReader<std::fs::File>>,
validate: bool,
) -> LogReader<'a> {
fn new(reading: RwLockWriteGuard<'a, Option<Reading>>, validate: bool) -> LogReader<'a> {
LogReader {
cleared: Default::default(),
file,
reading,
record_id: 0,
read_bytes: 0,
crc32: crc32fast::Hasher::new(),
Expand All @@ -144,7 +141,12 @@ impl<'a> LogReader<'a> {

pub fn reset(&mut self) -> Result<()> {
self.cleared = Default::default();
try_io!(self.file.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64))));
try_io!(self
.reading
.as_mut()
.unwrap()
.file
.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64))));
self.read_bytes = 0;
self.record_id = 0;
self.crc32 = crc32fast::Hasher::new();
Expand All @@ -153,7 +155,7 @@ impl<'a> LogReader<'a> {

pub fn next(&mut self) -> Result<LogAction> {
let mut read_buf = |size, buf: &mut [u8; 8]| -> Result<()> {
try_io!(self.file.read_exact(&mut buf[0..size]));
try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..size]));
self.read_bytes += size as u64;
if self.validate {
self.crc32.update(&buf[0..size]);
Expand Down Expand Up @@ -189,7 +191,7 @@ impl<'a> LogReader<'a> {
Ok(LogAction::InsertValue(InsertValueAction { table, index }))
},
END_RECORD => {
try_io!(self.file.read_exact(&mut buf[0..4]));
try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..4]));
self.read_bytes += 4;
if self.validate {
let checksum = u32::from_le_bytes(buf[0..4].try_into().unwrap());
Expand Down Expand Up @@ -218,7 +220,7 @@ impl<'a> LogReader<'a> {
}

pub fn read(&mut self, buf: &mut [u8]) -> Result<()> {
try_io!(self.file.read_exact(buf));
try_io!(self.reading.as_mut().unwrap().file.read_exact(buf));
self.read_bytes += buf.len() as u64;
if self.validate {
self.crc32.update(buf);
Expand Down Expand Up @@ -454,32 +456,18 @@ struct Appending {
size: u64,
}

// NOTE(review): this struct appears to be deleted by this commit — the
// intermediate "flushing" stage it modeled (a log file held between being
// flushed and being read) is replaced by `read_queue` — TODO confirm against
// the full diff.
#[derive(Debug)]
struct Flushing {
	// Numeric id of the log file.
	id: u32,
	// The raw file handle being flushed.
	file: std::fs::File,
}

/// A log file currently being read back for enacting.
#[derive(Debug)]
struct Reading {
	// Numeric id of the log file; pushed to the cleanup queue together with
	// the file once reading hits end-of-log.
	id: u32,
	// Buffered handle to the flushed log file being read.
	file: std::io::BufReader<std::fs::File>,
}

// NOTE(review): this enum appears to be deleted by this commit — per the
// commit description it modeled the global Reading/Idle state on `Log`, which
// the new read queue makes unnecessary — TODO confirm against the full diff.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum ReadingState {
	Reading,
	Idle,
}

#[derive(Debug)]
pub struct Log {
overlays: RwLock<LogOverlays>,
appending: RwLock<Option<Appending>>,
reading: RwLock<Option<Reading>>,
reading_state: Mutex<ReadingState>,
done_reading_cv: Condvar,
flushing: Mutex<Option<Flushing>>,
read_queue: RwLock<VecDeque<(u32, std::fs::File)>>,
next_record_id: AtomicU64,
dirty: AtomicBool,
log_pool: RwLock<VecDeque<(u32, std::fs::File)>>,
Expand Down Expand Up @@ -524,16 +512,14 @@ impl Log {
overlays: Default::default(),
appending: RwLock::new(None),
reading: RwLock::new(None),
reading_state: Mutex::new(ReadingState::Idle),
done_reading_cv: Condvar::new(),
flushing: Mutex::new(None),
read_queue: RwLock::default(),
next_record_id: AtomicU64::new(1),
next_log_id: AtomicU32::new(next_log_id),
dirty: AtomicBool::new(true),
sync: options.sync_wal,
replay_queue: RwLock::new(logs),
cleanup_queue: RwLock::new(Default::default()),
log_pool: RwLock::new(Default::default()),
cleanup_queue: RwLock::default(),
log_pool: RwLock::default(),
path,
})
}
Expand Down Expand Up @@ -591,7 +577,6 @@ impl Log {
overlays.index.clear();
overlays.value.clear();
overlays.last_record_id.clear();
*self.reading_state.lock() = ReadingState::Idle;
self.dirty.store(false, Ordering::Relaxed);
}

Expand Down Expand Up @@ -677,65 +662,22 @@ impl Log {
overlays.index.retain(|_, overlay| !overlay.map.is_empty());
}

pub fn flush_one(&self, min_size: u64) -> Result<(bool, bool, bool)> {
// Wait for the reader to finish reading
let mut flushing = self.flushing.lock();
let mut read_next = false;
let mut cleanup = false;
if flushing.is_some() {
let mut reading_state = self.reading_state.lock();

while *reading_state == ReadingState::Reading {
log::debug!(target: "parity-db", "Flush: Awaiting log reader");
self.done_reading_cv.wait(&mut reading_state)
}

{
let mut reading = self.reading.write();
if let Some(reading) = reading.take() {
log::debug!(target: "parity-db", "Flush: Activated log cleanup {}", reading.id);
let file = reading.file.into_inner();
self.cleanup_queue.write().push_back((reading.id, file));
*reading_state = ReadingState::Idle;
cleanup = true;
}

if let Some(mut flushing) = flushing.take() {
log::debug!(target: "parity-db", "Flush: Activated log reader {}", flushing.id);
try_io!(flushing.file.seek(std::io::SeekFrom::Start(0)));
*reading = Some(Reading {
id: flushing.id,
file: std::io::BufReader::new(flushing.file),
});
*reading_state = ReadingState::Reading;
read_next = true;
pub fn flush_one(&self, min_size: u64) -> Result<bool> {
// If it exists take the writer and flush it
let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size);
if cur_size > min_size {
if let Some(to_flush) = self.appending.write().take() {
let file = try_io!(to_flush.file.into_inner().map_err(|e| e.into_error()));
if self.sync {
log::debug!(target: "parity-db", "Flush: Flushing log to disk");
try_io!(file.sync_data());
log::debug!(target: "parity-db", "Flush: Flushing log completed");
}
self.read_queue.write().push_back((to_flush.id, file));
}
return Ok(true)
}

{
// Lock writer and reset it
let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size);
if cur_size > 0 && cur_size > min_size {
let mut appending = self.appending.write();
let to_flush = appending.take();
*flushing = to_flush.map(|to_flush| Flushing {
file: to_flush.file.into_inner().unwrap(),
id: to_flush.id,
});
}
}

// Flush to disk
if self.sync {
if let Some(flushing) = flushing.as_ref() {
log::debug!(target: "parity-db", "Flush: Flushing log to disk");
try_io!(flushing.file.sync_data());
log::debug!(target: "parity-db", "Flush: Flushing log completed");
}
}

Ok((flushing.is_some(), read_next, cleanup))
Ok(false)
}

pub fn replay_next(&mut self) -> Result<Option<u32>> {
Expand All @@ -750,10 +692,8 @@ impl Log {
if let Some((id, _record_id, file)) = self.replay_queue.write().pop_front() {
log::debug!(target: "parity-db", "Replay: Activated log reader {}", id);
*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
*self.reading_state.lock() = ReadingState::Reading;
Ok(Some(id))
} else {
*self.reading_state.lock() = ReadingState::Idle;
Ok(None)
}
}
Expand Down Expand Up @@ -785,26 +725,26 @@ impl Log {
}

pub fn read_next(&self, validate: bool) -> Result<Option<LogReader<'_>>> {
let mut reading_state = self.reading_state.lock();
if *reading_state != ReadingState::Reading {
log::trace!(target: "parity-db", "No logs to enact");
return Ok(None)
}

let reading = self.reading.write();
let mut reading = self.reading.write();
if reading.is_none() {
log::trace!(target: "parity-db", "No active reader");
return Ok(None)
if let Some((id, mut file)) = self.read_queue.write().pop_front() {
try_io!(file.seek(std::io::SeekFrom::Start(0)));
*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
} else {
log::trace!(target: "parity-db", "No active reader");
return Ok(None)
}
}
let reading = RwLockWriteGuard::map(reading, |r| &mut r.as_mut().unwrap().file);
let mut reader = LogReader::new(reading, validate);
match reader.next() {
Ok(LogAction::BeginRecord) => Ok(Some(reader)),
Ok(_) => Err(Error::Corruption("Bad log record structure".into())),
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
*reading_state = ReadingState::Idle;
self.done_reading_cv.notify_one();
log::debug!(target: "parity-db", "Read: End of log");
if let Some(reading) = reader.reading.take() {
log::debug!(target: "parity-db", "Read: End of log {}", reading.id);
let file = reading.file.into_inner();
self.cleanup_queue.write().push_back((reading.id, file));
}
Ok(None)
},
Err(e) => Err(e),
Expand All @@ -815,6 +755,10 @@ impl Log {
&self.overlays
}

/// Returns `true` while flushed log files are still queued for reading.
///
/// The commit worker checks this before sleeping on its wait condition so it
/// keeps draining the read queue.
pub fn has_still_log_files_to_read(&self) -> bool {
	// Idiomatic emptiness check (clippy::len_zero) instead of `len() > 0`.
	!self.read_queue.read().is_empty()
}

pub fn kill_logs(&self) -> Result<()> {
let mut log_pool = self.log_pool.write();
for (id, file) in log_pool.drain(..) {
Expand Down
3 changes: 0 additions & 3 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,9 +1116,6 @@ mod test {
let mut writer = log.begin_record();
f(&mut writer);
let bytes_written = log.end_record(writer.drain()).unwrap();
// Cycle through 2 log files
let _ = log.read_next(false);
log.flush_one(0).unwrap();
let _ = log.read_next(false);
log.flush_one(0).unwrap();
let mut reader = log.read_next(false).unwrap().unwrap();
Expand Down

0 comments on commit 595f377

Please sign in to comment.