Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce double write file system #323

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1fa6c25
add double write env
Connor1996 Jul 4, 2023
4d22a39
add test
Connor1996 Jul 6, 2023
6297cff
add failpoint
Connor1996 Jul 6, 2023
8e5edc5
rename
Connor1996 Jul 19, 2023
62a9040
add test
Connor1996 Aug 9, 2023
045ecbd
adjust
Connor1996 Aug 22, 2023
394beed
add recover ext
Connor1996 Aug 24, 2023
adbd517
refactor
Connor1996 Aug 31, 2023
928dd70
make send to two disks atomic
Connor1996 Aug 31, 2023
6754299
make format
Connor1996 Aug 31, 2023
b523bd4
fix warning
Connor1996 Aug 31, 2023
3409669
refine tests
Connor1996 Sep 4, 2023
e1fc66c
wait both for rewrite files
Connor1996 Sep 4, 2023
f250521
allocate sequential number for task
Connor1996 Sep 4, 2023
6b60bf7
rename and fix seqno update
Connor1996 Sep 5, 2023
0bbb0bb
introduce pause mechanism
Connor1996 Sep 6, 2023
026861a
reuse catch up diff
Connor1996 Sep 7, 2023
9efb9a4
non blocking drop
Connor1996 Sep 12, 2023
b8df7eb
change test
Connor1996 Sep 12, 2023
bcd73bf
extract task runner
Connor1996 Sep 12, 2023
d343e6b
reuse get files for snapshot
Connor1996 Sep 12, 2023
fb31a9b
reimpl recover
Connor1996 Sep 12, 2023
5a955f8
rename
Connor1996 Sep 12, 2023
cef9add
fix snapshot
Connor1996 Sep 12, 2023
135ae19
add recover case and fix cancealed rx
Connor1996 Sep 13, 2023
df907f0
handle background error and panic
Connor1996 Sep 14, 2023
ce601a9
remove recover ext
Connor1996 Sep 14, 2023
76b38bb
separate into multiple files
Connor1996 Sep 14, 2023
2518b10
add comment
Connor1996 Sep 18, 2023
56c40dc
add comment
Connor1996 Sep 18, 2023
58132d3
clean
Connor1996 Sep 18, 2023
73375f4
Merge remote-tracking branch 'upstream/master' into double-write
Connor1996 Sep 18, 2023
5c7e7b2
clean
Connor1996 Sep 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ required-features = ["failpoints"]
byteorder = "1.2"
crc32fast = "1.2"
crossbeam = "0.8"
crossbeam-channel = "0.5.8"
either = "1.8.1"
fail = "0.5"
fs2 = "0.4"
futures = "0.3.28"
hashbrown = "0.14"
hex = "0.4"
if_chain = "1.0"
Expand Down Expand Up @@ -70,6 +73,7 @@ rand = "0.8"
rand_distr = "0.4"
tempfile = "3.6"
toml = "0.8"
md-5 = "0.10.5"

[features]
default = ["internals", "scripting"]
Expand Down
5 changes: 4 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct Config {
/// Default: None
pub spill_dir: Option<String>,

pub second_dir: Option<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe backup_dir or mirror_dir


/// How to deal with file corruption during recovery.
///
/// Default: "tolerate-tail-corruption".
Expand Down Expand Up @@ -110,7 +112,7 @@ pub struct Config {
/// Whether to prepare log files for recycling when start.
/// If `true`, batch empty log files will be prepared for recycling when
/// starting engine.
/// Only available for `enable-log-reycle` is true.
/// Only available for `enable-log-recycle` is true.
///
/// Default: false
pub prefill_for_recycle: bool,
Expand All @@ -129,6 +131,7 @@ impl Default for Config {
let mut cfg = Config {
dir: "".to_owned(),
spill_dir: None,
second_dir: None,
recovery_mode: RecoveryMode::TolerateTailCorruption,
recovery_read_block_size: ReadableSize::kb(16),
recovery_threads: 4,
Expand Down
300 changes: 282 additions & 18 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ where
Self::open_with(cfg, file_system, vec![])
}

pub fn open_with(
fn open_with(
mut cfg: Config,
file_system: Arc<F>,
mut listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<F, FilePipeLog<F>>> {
cfg.sanitize()?;
file_system.bootstrap()?;
Copy link

@tonyxuqqi tonyxuqqi Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it can fallback to single disk solution dynamically. The file system may need extra APIs to wait for all pending writes done and then engine can switch to different file_system.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in another PR

listeners.push(Arc::new(PurgeHook::default()) as Arc<dyn EventListener>);

let start = Instant::now();
Expand Down Expand Up @@ -626,7 +627,7 @@ where
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::env::{ObfuscatedFileSystem, Permission};
use crate::env::{HedgedFileSystem, ObfuscatedFileSystem, Permission};
use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
use crate::log_batch::AtomicGroupBuilder;
use crate::pipe_log::Version;
Expand Down Expand Up @@ -2629,6 +2630,284 @@ pub(crate) mod tests {
assert!(engine.raft_groups().is_empty());
}

/// Counts the directory entries under `p` whose file name begins with "000",
/// i.e. the numbered log/reserved files, ignoring everything else (locks,
/// temporaries, ...).
fn number_of_files(p: &Path) -> usize {
    std::fs::read_dir(p)
        .unwrap()
        .filter(|entry| {
            entry
                .as_ref()
                .unwrap()
                .path()
                .file_name()
                .unwrap()
                .to_str()
                .unwrap()
                .starts_with("000")
        })
        .count()
}

use md5::{Digest, Md5};
use std::{fs, io};

/// Computes one MD5 digest over the concatenated contents of every log file
/// and reserved file under `path`, skipping any other entry (locks, stray
/// temporaries). Used by tests to check that two directories hold identical
/// file contents.
fn calculate_hash(path: &Path) -> [u8; 16] {
    // `read_dir` yields entries in a platform-dependent, unspecified order,
    // so collect and sort the paths first; otherwise two directories with
    // identical files could still hash differently.
    let mut paths: Vec<_> = std::fs::read_dir(path)
        .unwrap()
        .filter_map(|e| {
            let p = e.unwrap().path();
            let file_name = p.file_name().unwrap().to_str().unwrap();
            // Keep only proper log files and reserved files.
            if FileId::parse_file_name(file_name).is_some()
                || parse_reserved_file_name(file_name).is_some()
            {
                Some(p)
            } else {
                None
            }
        })
        .collect();
    paths.sort();

    let mut hasher = Md5::new();
    for p in paths {
        let mut file = fs::File::open(&p).unwrap();
        // Md5 implements `io::Write`, so stream the file straight in.
        io::copy(&mut file, &mut hasher).unwrap();
    }
    hasher.finalize().into()
}

use std::io::Write;
#[test]
fn test_start_engine_with_second_disk() {
    // Happy path for the hedged (double-write) file system: every file
    // created in the main dir must also appear in the second dir.
    let dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_second_disk_default")
        .tempdir()
        .unwrap();
    let sec_dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_second_disk_second")
        .tempdir()
        .unwrap();

    // Wrap the default file system so every write is mirrored to `sec_dir`.
    let file_system = Arc::new(HedgedFileSystem::new(
        Arc::new(DefaultFileSystem {}),
        dir.path().to_path_buf(),
        sec_dir.path().to_path_buf(),
    ));
    let entry_data = vec![b'x'; 512];

    // Preparations for multi-dirs.
    // target_file_size of 1 byte forces a file rotation on every append, so
    // many small files are produced for the mirroring checks below.
    let cfg = Config {
        dir: dir.path().to_str().unwrap().to_owned(),
        enable_log_recycle: false,
        target_file_size: ReadableSize(1),
        ..Default::default()
    };

    // Step 1: write data into the main directory.
    let engine =
        RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
    for rid in 1..=10 {
        engine.append(rid, 1, 10, Some(&entry_data));
    }
    drop(engine);

    // Restart the engine with recycle and prefill. Test reusing files from both
    // dirs.
    let cfg_2 = Config {
        enable_log_recycle: true,
        prefill_for_recycle: true,
        purge_threshold: ReadableSize(40),
        ..cfg
    };
    let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system).unwrap();
    // Both dirs must stay file-for-file in sync after reopen.
    assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
    for rid in 1..=10 {
        assert_eq!(engine.first_index(rid).unwrap(), 1);
        engine.clean(rid);
    }
    engine.purge_manager.must_rewrite_append_queue(None, None);
    let file_count = number_of_files(dir.path());
    assert_eq!(number_of_files(sec_dir.path()), file_count);
    // More physical files than the engine counts as active: the surplus are
    // recycled/prefilled files kept on disk for reuse.
    assert!(file_count > engine.file_count(None));
    // Append data, recycled files are reused.
    for rid in 1..=30 {
        engine.append(rid, 20, 30, Some(&entry_data));
    }
    // No new file is created.
    let file_count1 = number_of_files(dir.path());
    assert_eq!(file_count, file_count1);
    assert_eq!(number_of_files(sec_dir.path()), file_count1);
}

#[test]
fn test_start_engine_with_abnormal_second_disk() {
    // Exercises recovery of the hedged file system when the second dir is
    // damaged in various ways: after reopening the engine, the second dir
    // must be brought back in sync with the main dir.
    let dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_abnormal_second_disk_default")
        .tempdir()
        .unwrap();
    let sec_dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_abnormal_second_disk_second")
        .tempdir()
        .unwrap();

    let file_system = Arc::new(HedgedFileSystem::new(
        Arc::new(DefaultFileSystem {}),
        dir.path().to_path_buf(),
        sec_dir.path().to_path_buf(),
    ));
    let entry_data = vec![b'x'; 512];

    // Preparations for multi-dirs.
    let cfg = Config {
        dir: dir.path().to_str().unwrap().to_owned(),
        enable_log_recycle: true,
        prefill_for_recycle: true,
        target_file_size: ReadableSize(1),
        purge_threshold: ReadableSize(40),
        ..Default::default()
    };

    // Step 1: write data into the main directory.
    let engine =
        RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
    for rid in 1..=10 {
        engine.append(rid, 1, 10, Some(&entry_data));
    }
    assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
    for rid in 1..=10 {
        assert_eq!(engine.first_index(rid).unwrap(), 1);
        engine.clean(rid);
    }
    engine.purge_manager.must_rewrite_append_queue(None, None);
    let file_count = number_of_files(dir.path());
    assert_eq!(number_of_files(sec_dir.path()), file_count);
    assert!(file_count > engine.file_count(None));
    // Append data, recycled files are reused.
    for rid in 1..=30 {
        engine.append(rid, 20, 30, Some(&entry_data));
    }
    // No new file is created.
    let file_count1 = number_of_files(dir.path());
    assert_eq!(file_count, file_count1);
    assert_eq!(number_of_files(sec_dir.path()), file_count1);
    drop(engine);

    // abnormal case - Empty second dir
    {
        std::fs::remove_dir_all(sec_dir.path()).unwrap();
        let _engine =
            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
        // All files in first dir are copied to second dir
        assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
        assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
    }
    // abnormal case - Missing some append files in second dir
    {
        let mut file_count = 0;
        for e in std::fs::read_dir(sec_dir.path()).unwrap() {
            let p = e.unwrap().path();
            let file_name = p.file_name().unwrap().to_str().unwrap();
            if let Some(FileId {
                queue: LogQueue::Append,
                seq: _,
            }) = FileId::parse_file_name(file_name)
            {
                // Delete every other append file.
                if file_count % 2 == 0 {
                    std::fs::remove_file(sec_dir.path().join(file_name)).unwrap();
                }
                file_count += 1;
            }
        }
        let _engine =
            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
        // Missing append files are copied
        assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
        assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
    }
    // abnormal case - Missing some rewrite files in second dir
    {
        let mut file_count = 0;
        for e in std::fs::read_dir(sec_dir.path()).unwrap() {
            let p = e.unwrap().path();
            let file_name = p.file_name().unwrap().to_str().unwrap();
            if let Some(FileId {
                queue: LogQueue::Rewrite,
                seq: _,
            }) = FileId::parse_file_name(file_name)
            {
                // Delete every other rewrite file.
                if file_count % 2 == 0 {
                    std::fs::remove_file(sec_dir.path().join(file_name)).unwrap();
                }
                file_count += 1;
            }
        }
        let _engine =
            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
        // Missing rewrite files are copied
        assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
        assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
    }
    // abnormal case - Missing some reserve files in second dir
    {
        let mut file_count = 0;
        for e in std::fs::read_dir(sec_dir.path()).unwrap() {
            let p = e.unwrap().path();
            let file_name = p.file_name().unwrap().to_str().unwrap();
            // Entries that do not parse as a FileId are the reserved files.
            if FileId::parse_file_name(file_name).is_none() {
                // Delete every other reserved file.
                if file_count % 2 == 0 {
                    std::fs::remove_file(sec_dir.path().join(file_name)).unwrap();
                }
                file_count += 1;
            }
        }
        let _engine =
            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
        // Missing reserve files are copied
        assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
        assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
    }
    // abnormal case - Have some extra files in second dir
    {
        // Snapshot the names before copying: creating files inside the
        // directory while iterating `read_dir` over it may surface the new
        // entries mid-iteration (platform-dependent).
        let names: Vec<String> = std::fs::read_dir(sec_dir.path())
            .unwrap()
            .map(|e| {
                e.unwrap()
                    .path()
                    .file_name()
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .to_owned()
            })
            .collect();
        let mut file_count = 0;
        for file_name in names {
            // Duplicate every other file under a "...tmp" name.
            if file_count % 2 == 0 {
                std::fs::copy(
                    sec_dir.path().join(&file_name),
                    sec_dir.path().join(file_name.clone() + "tmp"),
                )
                .unwrap();
            }
            file_count += 1;
        }
        let _engine =
            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
        // Extra files are untouched.
        assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
        assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
    }
    // TODO: handle the error
    // abnormal case - One file is corrupted
    {
        let mut file_count = 0;
        for e in std::fs::read_dir(sec_dir.path()).unwrap() {
            let p = e.unwrap().path();
            let file_name = p.file_name().unwrap().to_str().unwrap();
            // Clobber the head of every other file in place (no truncate).
            if file_count % 2 == 0 {
                let mut f = std::fs::OpenOptions::new()
                    .write(true)
                    .open(sec_dir.path().join(file_name))
                    .unwrap();
                f.write_all(b"corrupted").unwrap();
            }
            file_count += 1;
        }
        let _engine =
            RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
        // Corrupted files are untouched.
        assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
        assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
    }
    // abnormal case - One file in main dir is corrupted and one file in second dir
    // is corrupted
    {}
    // abnormal case - Missing latest rewrite file in main dir and missing one log
    // file in second dir
    {}
}

#[test]
fn test_start_engine_with_multi_dirs() {
let dir = tempfile::Builder::new()
Expand All @@ -2639,22 +2918,7 @@ pub(crate) mod tests {
.prefix("test_start_engine_with_multi_dirs_spill")
.tempdir()
.unwrap();
fn number_of_files(p: &Path) -> usize {
let mut r = 0;
std::fs::read_dir(p).unwrap().for_each(|e| {
if e.unwrap()
.path()
.file_name()
.unwrap()
.to_str()
.unwrap()
.starts_with("000")
{
r += 1;
}
});
r
}

let file_system = Arc::new(DeleteMonitoredFileSystem::new());
let entry_data = vec![b'x'; 512];

Expand Down
1 change: 1 addition & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl WriteExt for LogFile {
}
}

/// Default `FileSystem` implementation backed directly by the local OS
/// file system (see the `impl FileSystem for DefaultFileSystem` below).
#[derive(Clone)]
pub struct DefaultFileSystem;

impl FileSystem for DefaultFileSystem {
Expand Down
Loading
Loading