Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log: Adds a read queue #151

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub enum Action<O: Debug> {
Transaction(Vec<O>),
ProcessReindex,
ProcessCommits,
FlushAndEnactLogs,
FlushLog,
EnactLog,
CleanLogs,
Restart,
}
Expand Down Expand Up @@ -143,26 +144,22 @@ pub trait DbSimulator {
&options,
)
},
Action::FlushAndEnactLogs => {
// We repeat flush and then call enact_log to avoid deadlocks due to
// Log::flush_one side effects
for _ in 0..2 {
db = Self::try_or_restart(
|db| db.flush_logs(),
db,
&mut layers,
&old_layers,
&options,
)
}
Action::FlushLog =>
db = Self::try_or_restart(
|db| db.flush_logs(),
db,
&mut layers,
&old_layers,
&options,
),
Action::EnactLog =>
db = Self::try_or_restart(
|db| db.enact_logs(),
db,
&mut layers,
&old_layers,
&options,
)
},
),
Action::CleanLogs =>
db = Self::try_or_restart(
|db| db.clean_logs(),
Expand Down
16 changes: 7 additions & 9 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,11 @@ impl DbInner {
}

fn flush_logs(&self, min_log_size: u64) -> Result<bool> {
let (flush_next, read_next, cleanup_next) = self.log.flush_one(min_log_size)?;
if read_next {
let has_flushed = self.log.flush_one(min_log_size)?;
if has_flushed {
self.commit_worker_wait.signal();
}
if cleanup_next {
self.cleanup_worker_wait.signal();
}
Ok(flush_next)
Ok(has_flushed)
}

fn clean_logs(&self) -> Result<bool> {
Expand Down Expand Up @@ -956,7 +953,10 @@ impl Db {
let mut more_work = false;
while !db.shutdown.load(Ordering::SeqCst) || more_work {
if !more_work {
db.commit_worker_wait.wait();
db.cleanup_worker_wait.signal();
if !db.log.has_log_files_to_read() {
db.commit_worker_wait.wait();
}
}

more_work = db.enact_logs(false)?;
Expand Down Expand Up @@ -1461,10 +1461,8 @@ mod tests {
while db.process_reindex().unwrap() {}
}
if *self == EnableCommitPipelineStages::DbFile {
let _ = db.log.flush_one(0).unwrap();
let _ = db.log.flush_one(0).unwrap();
while db.enact_logs(false).unwrap() {}
let _ = db.log.flush_one(0).unwrap();
let _ = db.clean_logs().unwrap();
}
}
Expand Down
150 changes: 47 additions & 103 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
options::Options,
table::TableId as ValueTableId,
};
use parking_lot::{Condvar, MappedRwLockWriteGuard, Mutex, RwLock, RwLockWriteGuard};
use parking_lot::{RwLock, RwLockWriteGuard};
use std::{
collections::{HashMap, VecDeque},
convert::TryInto,
Expand Down Expand Up @@ -115,7 +115,7 @@ pub struct Cleared {

#[derive(Debug)]
pub struct LogReader<'a> {
file: MappedRwLockWriteGuard<'a, std::io::BufReader<std::fs::File>>,
reading: RwLockWriteGuard<'a, Option<Reading>>,
record_id: u64,
read_bytes: u64,
crc32: crc32fast::Hasher,
Expand All @@ -128,13 +128,10 @@ impl<'a> LogReader<'a> {
self.record_id
}

fn new(
file: MappedRwLockWriteGuard<'a, std::io::BufReader<std::fs::File>>,
validate: bool,
) -> LogReader<'a> {
fn new(reading: RwLockWriteGuard<'a, Option<Reading>>, validate: bool) -> LogReader<'a> {
LogReader {
cleared: Default::default(),
file,
reading,
record_id: 0,
read_bytes: 0,
crc32: crc32fast::Hasher::new(),
Expand All @@ -144,7 +141,12 @@ impl<'a> LogReader<'a> {

pub fn reset(&mut self) -> Result<()> {
self.cleared = Default::default();
try_io!(self.file.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64))));
try_io!(self
.reading
.as_mut()
.unwrap()
.file
.seek(std::io::SeekFrom::Current(-(self.read_bytes as i64))));
self.read_bytes = 0;
self.record_id = 0;
self.crc32 = crc32fast::Hasher::new();
Expand All @@ -153,7 +155,7 @@ impl<'a> LogReader<'a> {

pub fn next(&mut self) -> Result<LogAction> {
let mut read_buf = |size, buf: &mut [u8; 8]| -> Result<()> {
try_io!(self.file.read_exact(&mut buf[0..size]));
try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..size]));
self.read_bytes += size as u64;
if self.validate {
self.crc32.update(&buf[0..size]);
Expand Down Expand Up @@ -189,7 +191,7 @@ impl<'a> LogReader<'a> {
Ok(LogAction::InsertValue(InsertValueAction { table, index }))
},
END_RECORD => {
try_io!(self.file.read_exact(&mut buf[0..4]));
try_io!(self.reading.as_mut().unwrap().file.read_exact(&mut buf[0..4]));
self.read_bytes += 4;
if self.validate {
let checksum = u32::from_le_bytes(buf[0..4].try_into().unwrap());
Expand Down Expand Up @@ -218,7 +220,7 @@ impl<'a> LogReader<'a> {
}

pub fn read(&mut self, buf: &mut [u8]) -> Result<()> {
try_io!(self.file.read_exact(buf));
try_io!(self.reading.as_mut().unwrap().file.read_exact(buf));
self.read_bytes += buf.len() as u64;
if self.validate {
self.crc32.update(buf);
Expand Down Expand Up @@ -454,32 +456,18 @@ struct Appending {
size: u64,
}

#[derive(Debug)]
struct Flushing {
id: u32,
file: std::fs::File,
}

#[derive(Debug)]
// State of the log file currently being read back (enacted).
// Created from a flushed log file (see `read_next`, which pops from the
// read queue) and pushed onto the cleanup queue once fully consumed.
struct Reading {
	// Identifier of the log file being read.
	id: u32,
	// Buffered reader over the on-disk log file.
	file: std::io::BufReader<std::fs::File>,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum ReadingState {
Reading,
Idle,
}

#[derive(Debug)]
pub struct Log {
overlays: RwLock<LogOverlays>,
appending: RwLock<Option<Appending>>,
reading: RwLock<Option<Reading>>,
reading_state: Mutex<ReadingState>,
done_reading_cv: Condvar,
flushing: Mutex<Option<Flushing>>,
read_queue: RwLock<VecDeque<(u32, std::fs::File)>>,
next_record_id: AtomicU64,
dirty: AtomicBool,
log_pool: RwLock<VecDeque<(u32, std::fs::File)>>,
Expand Down Expand Up @@ -524,16 +512,14 @@ impl Log {
overlays: Default::default(),
appending: RwLock::new(None),
reading: RwLock::new(None),
reading_state: Mutex::new(ReadingState::Idle),
done_reading_cv: Condvar::new(),
flushing: Mutex::new(None),
read_queue: RwLock::default(),
next_record_id: AtomicU64::new(1),
next_log_id: AtomicU32::new(next_log_id),
dirty: AtomicBool::new(true),
sync: options.sync_wal,
replay_queue: RwLock::new(logs),
cleanup_queue: RwLock::new(Default::default()),
log_pool: RwLock::new(Default::default()),
cleanup_queue: RwLock::default(),
log_pool: RwLock::default(),
path,
})
}
Expand Down Expand Up @@ -591,7 +577,6 @@ impl Log {
overlays.index.clear();
overlays.value.clear();
overlays.last_record_id.clear();
*self.reading_state.lock() = ReadingState::Idle;
self.dirty.store(false, Ordering::Relaxed);
}

Expand Down Expand Up @@ -677,65 +662,22 @@ impl Log {
overlays.index.retain(|_, overlay| !overlay.map.is_empty());
}

pub fn flush_one(&self, min_size: u64) -> Result<(bool, bool, bool)> {
// Wait for the reader to finish reading
let mut flushing = self.flushing.lock();
let mut read_next = false;
let mut cleanup = false;
if flushing.is_some() {
let mut reading_state = self.reading_state.lock();

while *reading_state == ReadingState::Reading {
log::debug!(target: "parity-db", "Flush: Awaiting log reader");
self.done_reading_cv.wait(&mut reading_state)
}

{
let mut reading = self.reading.write();
if let Some(reading) = reading.take() {
log::debug!(target: "parity-db", "Flush: Activated log cleanup {}", reading.id);
let file = reading.file.into_inner();
self.cleanup_queue.write().push_back((reading.id, file));
*reading_state = ReadingState::Idle;
cleanup = true;
}

if let Some(mut flushing) = flushing.take() {
log::debug!(target: "parity-db", "Flush: Activated log reader {}", flushing.id);
try_io!(flushing.file.seek(std::io::SeekFrom::Start(0)));
*reading = Some(Reading {
id: flushing.id,
file: std::io::BufReader::new(flushing.file),
});
*reading_state = ReadingState::Reading;
read_next = true;
pub fn flush_one(&self, min_size: u64) -> Result<bool> {
// If it exists take the writer and flush it
let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size);
if cur_size > min_size {
if let Some(to_flush) = self.appending.write().take() {
let file = try_io!(to_flush.file.into_inner().map_err(|e| e.into_error()));
if self.sync {
log::debug!(target: "parity-db", "Flush: Flushing log to disk");
try_io!(file.sync_data());
log::debug!(target: "parity-db", "Flush: Flushing log completed");
}
self.read_queue.write().push_back((to_flush.id, file));
}
return Ok(true)
}

{
// Lock writer and reset it
let cur_size = self.appending.read().as_ref().map_or(0, |r| r.size);
if cur_size > 0 && cur_size > min_size {
let mut appending = self.appending.write();
let to_flush = appending.take();
*flushing = to_flush.map(|to_flush| Flushing {
file: to_flush.file.into_inner().unwrap(),
id: to_flush.id,
});
}
}

// Flush to disk
if self.sync {
if let Some(flushing) = flushing.as_ref() {
log::debug!(target: "parity-db", "Flush: Flushing log to disk");
try_io!(flushing.file.sync_data());
log::debug!(target: "parity-db", "Flush: Flushing log completed");
}
}

Ok((flushing.is_some(), read_next, cleanup))
Ok(false)
}

pub fn replay_next(&mut self) -> Result<Option<u32>> {
Expand All @@ -750,10 +692,8 @@ impl Log {
if let Some((id, _record_id, file)) = self.replay_queue.write().pop_front() {
log::debug!(target: "parity-db", "Replay: Activated log reader {}", id);
*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
*self.reading_state.lock() = ReadingState::Reading;
Ok(Some(id))
} else {
*self.reading_state.lock() = ReadingState::Idle;
Ok(None)
}
}
Expand Down Expand Up @@ -785,26 +725,26 @@ impl Log {
}

pub fn read_next(&self, validate: bool) -> Result<Option<LogReader<'_>>> {
let mut reading_state = self.reading_state.lock();
if *reading_state != ReadingState::Reading {
log::trace!(target: "parity-db", "No logs to enact");
return Ok(None)
}

let reading = self.reading.write();
let mut reading = self.reading.write();
if reading.is_none() {
log::trace!(target: "parity-db", "No active reader");
return Ok(None)
if let Some((id, mut file)) = self.read_queue.write().pop_front() {
try_io!(file.seek(std::io::SeekFrom::Start(0)));
*reading = Some(Reading { id, file: std::io::BufReader::new(file) });
} else {
log::trace!(target: "parity-db", "No active reader");
return Ok(None)
}
}
let reading = RwLockWriteGuard::map(reading, |r| &mut r.as_mut().unwrap().file);
let mut reader = LogReader::new(reading, validate);
match reader.next() {
Ok(LogAction::BeginRecord) => Ok(Some(reader)),
Ok(_) => Err(Error::Corruption("Bad log record structure".into())),
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
*reading_state = ReadingState::Idle;
self.done_reading_cv.notify_one();
log::debug!(target: "parity-db", "Read: End of log");
if let Some(reading) = reader.reading.take() {
log::debug!(target: "parity-db", "Read: End of log {}", reading.id);
let file = reading.file.into_inner();
self.cleanup_queue.write().push_back((reading.id, file));
}
Ok(None)
},
Err(e) => Err(e),
Expand All @@ -815,6 +755,10 @@ impl Log {
&self.overlays
}

/// Returns `true` when flushed log files are queued and still waiting to be
/// read back (enacted). Used by the commit worker to decide whether it may
/// block on its condition variable or must keep draining the read queue.
pub fn has_log_files_to_read(&self) -> bool {
	// `!is_empty()` is the idiomatic (clippy `len_zero`) form of `len() > 0`.
	!self.read_queue.read().is_empty()
}

pub fn kill_logs(&self) -> Result<()> {
let mut log_pool = self.log_pool.write();
for (id, file) in log_pool.drain(..) {
Expand Down
3 changes: 0 additions & 3 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,9 +1116,6 @@ mod test {
let mut writer = log.begin_record();
f(&mut writer);
let bytes_written = log.end_record(writer.drain()).unwrap();
// Cycle through 2 log files
let _ = log.read_next(false);
log.flush_one(0).unwrap();
let _ = log.read_next(false);
log.flush_one(0).unwrap();
let mut reader = log.read_next(false).unwrap().unwrap();
Expand Down