Skip to content

Commit

Permalink
support supplying file system to run ctl command (#231)
Browse files Browse the repository at this point in the history
Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie committed Jun 28, 2022
1 parent 07dcadb commit 7a436ea
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
//! # Raft Engine Control

use std::path::Path;
use std::sync::Arc;

use clap::{crate_authors, crate_version, Parser};
use raft_engine::env::{DefaultFileSystem, FileSystem};
use raft_engine::internals::LogQueue;
use raft_engine::{Engine, Error, Result as EngineResult};

Expand Down Expand Up @@ -87,14 +89,21 @@ fn convert_queue(queue: &str) -> Option<LogQueue> {
}

impl ControlOpt {
pub fn validate_and_execute(mut self) -> EngineResult<()> {
pub fn validate_and_execute(self) -> EngineResult<()> {
self.validate_and_execute_with_file_system(Arc::new(DefaultFileSystem))
}

pub fn validate_and_execute_with_file_system<F: FileSystem>(
mut self,
fs: Arc<F>,
) -> EngineResult<()> {
if self.cmd.is_none() {
return Err(Error::InvalidArgument("subcommand is needed".to_owned()));
}

match self.cmd.take().unwrap() {
Cmd::Dump { path, raft_groups } => {
let it = Engine::dump(Path::new(&path))?;
let it = Engine::dump_with_file_system(Path::new(&path), fs)?;
for item in it {
if let Ok(v) = item {
if raft_groups.is_empty() || raft_groups.contains(&v.raft_group_id) {
Expand All @@ -112,10 +121,15 @@ impl ControlOpt {
script,
} => {
let script = std::fs::read_to_string(script)?;
Engine::unsafe_repair(Path::new(&path), convert_queue(&queue), script)?;
Engine::unsafe_repair_with_file_system(
Path::new(&path),
convert_queue(&queue),
script,
fs,
)?;
}
Cmd::Check { path } => {
let r = Engine::consistency_check(Path::new(&path))?;
let r = Engine::consistency_check_with_file_system(Path::new(&path), fs)?;
if r.is_empty() {
println!("All data is Ok")
} else {
Expand All @@ -124,10 +138,13 @@ impl ControlOpt {
}
}
Cmd::TryPurge { path } => {
let e = Engine::open(raft_engine::Config {
dir: path,
..Default::default()
})?;
let e = Engine::open_with_file_system(
raft_engine::Config {
dir: path,
..Default::default()
},
fs,
)?;
println!(
"purge_expired_files() returns {:?}",
e.purge_expired_files()?
Expand All @@ -138,10 +155,10 @@ impl ControlOpt {
}
}

pub fn run_command(mut args: Vec<String>) {
pub fn run_command<F: FileSystem>(mut args: Vec<String>, fs: Arc<F>) {
args.insert(0, "ctl".to_owned());
let opts = ControlOpt::parse_from(args);
if let Err(e) = opts.validate_and_execute() {
if let Err(e) = opts.validate_and_execute_with_file_system(fs) {
println!("{:?}", e);
}
}

0 comments on commit 7a436ea

Please sign in to comment.