diff --git a/Cargo.lock b/Cargo.lock index e158b0e2a..bba6a3610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,7 +517,7 @@ dependencies = [ "anyhow", "chrono", "percent-encoding", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "reqwest", "schemars", "serde", @@ -574,7 +574,7 @@ version = "0.0.1" dependencies = [ "anyhow", "percent-encoding", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "reqwest", "schemars", "serde", @@ -596,6 +596,10 @@ dependencies = [ "futures", "futures-core", "hex", + "http", + "hyper", + "hyper-staticfile", + "mime_guess", "omicron-common", "opentelemetry 0.17.0", "opentelemetry-jaeger", @@ -603,10 +607,13 @@ dependencies = [ "oximeter-producer", "rand 0.8.5", "rand_chacha 0.3.1", + "repair-client", + "reqwest", "ringbuffer", "rusqlite", "schemars", "serde", + "serde_json", "sha2", "slog", "slog-async", @@ -1305,6 +1312,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21dec9db110f5f872ed9699c3ecf50cf16f423502706ba5c72462e28d3157573" + [[package]] name = "httparse" version = "1.6.0" @@ -1354,6 +1367,25 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-staticfile" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d5ff45ea721e295c400e4c65b1c855d43d329b8ae8ec12520ab1860a240debf" +dependencies = [ + "futures-util", + "http", + "http-range", + "httpdate", + "hyper", + "mime_guess", + "percent-encoding", + "rand 0.8.5", + "tokio", + "url", + "winapi", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1576,6 +1608,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -1654,11 +1696,11 @@ dependencies = [ [[package]] name = "nexus-client" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/omicron?branch=main#6a98020b9de0f0ae42fc2ac593e1576b7b7c21e8" +source = "git+https://github.com/oxidecomputer/omicron?branch=main#8a26d6992dafea6f5052083b2362d13b8e94dfa6" dependencies = [ "chrono", "omicron-common", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "reqwest", "serde", "serde_json", @@ -1826,7 +1868,7 @@ dependencies = [ "ipnetwork", "macaddr", "parse-display", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "rand 0.8.5", "reqwest", "ring", @@ -2326,6 +2368,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "progenitor" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "anyhow", + "getopts", + "openapiv3", + "progenitor-client 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "progenitor-macro 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "serde", + "serde_json", +] + [[package]] name = "progenitor" version = "0.0.0" @@ -2334,9 +2391,22 @@ dependencies = [ "anyhow", "getopts", "openapiv3", - "progenitor-client", - "progenitor-impl", - "progenitor-macro", + "progenitor-client 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", + "progenitor-macro 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", + "serde", + "serde_json", +] + +[[package]] +name = "progenitor-client" 
+version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "bytes", + "futures-core", + "percent-encoding", + "reqwest", "serde", "serde_json", ] @@ -2354,6 +2424,28 @@ dependencies = [ "serde_json", ] +[[package]] +name = "progenitor-impl" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "convert_case", + "getopts", + "indexmap", + "openapiv3", + "proc-macro2", + "quote", + "regex", + "rustfmt-wrapper", + "schemars", + "serde", + "serde_json", + "syn", + "thiserror", + "typify", + "unicode-xid", +] + [[package]] name = "progenitor-impl" version = "0.0.0" @@ -2376,6 +2468,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "progenitor-macro" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "openapiv3", + "proc-macro2", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "quote", + "serde", + "serde_json", + "serde_tokenstream", + "syn", +] + [[package]] name = "progenitor-macro" version = "0.0.0" @@ -2383,7 +2490,7 @@ source = "git+https://github.com/oxidecomputer/progenitor#b7bbb5cdffdfc4ddf5bc7c dependencies = [ "openapiv3", "proc-macro2", - "progenitor-impl", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "quote", "serde", "serde_json", @@ -2625,6 +2732,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "repair-client" +version = "0.0.1" +dependencies = [ + "anyhow", + "chrono", + "percent-encoding", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "reqwest", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "reqwest" version = "0.11.10" @@ -3774,6 +3895,15 @@ version = "0.1.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.7" diff --git a/Cargo.toml b/Cargo.toml index a2eadc8aa..97cd1ef63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "nbd_server", "package", "protocol", + "repair-client", "scope", "smf", "upstairs", diff --git a/downstairs/Cargo.toml b/downstairs/Cargo.toml index 7c9f4e3cf..f259a0462 100644 --- a/downstairs/Cargo.toml +++ b/downstairs/Cargo.toml @@ -14,18 +14,25 @@ crucible = { path = "../upstairs" } crucible-common = { path = "../common" } crucible-protocol = { path = "../protocol" } dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main" } -slog = { version = "2.7" } -slog-term = { version = "2.9" } -slog-async = { version = "2.7" } -schemars = { version = "0.8", features = [ "chrono", "uuid" ] } futures = "0.3" futures-core = "0.3" +http = "0.2.6" +hyper = { version = "0.14", features = [ "full" ] } +hyper-staticfile = "0.8" +mime_guess = "2.0.3" omicron-common = { git = "https://github.com/oxidecomputer/omicron", branch = "main" } oximeter-producer = { git = "https://github.com/oxidecomputer/omicron", branch = "main" } oximeter = { git = "https://github.com/oxidecomputer/omicron", branch = "main" } rand = "0.8.5" +repair-client = { path = "../repair-client" } +reqwest = { version = "0.11", features = ["json"] } ringbuffer = "0.8" +schemars = { version = "0.8.8", features = [ "uuid" ] } serde = { version = "1", features = ["derive"] } +serde_json = "1.0.79" +slog = { version = "2.7" } +slog-term = { version = "2.9" } +slog-async = { version = "2.7" } structopt = "0.3" tokio = { version = "1.17.0", 
features = ["full"] } tokio-util = { version = "0.7", features = ["codec"]} @@ -42,8 +49,8 @@ hex = "0.4" sha2 = "0.10" [dev-dependencies] -tempfile = "3" rand_chacha = "0.3.1" +tempfile = "3" [features] default = [] diff --git a/downstairs/src/main.rs b/downstairs/src/main.rs index 8fabf277d..4b6f72e61 100644 --- a/downstairs/src/main.rs +++ b/downstairs/src/main.rs @@ -19,6 +19,7 @@ use anyhow::{bail, Result}; use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use rand::prelude::*; +use slog::Drain; use structopt::StructOpt; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -29,11 +30,10 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use uuid::Uuid; -use slog::Drain; - mod admin; mod dump; mod region; +mod repair; mod stats; use admin::run_dropshot; @@ -134,17 +134,16 @@ enum Args { skip: u64, }, Run { - #[structopt(short, long, default_value = "0.0.0.0")] + /// Address the downstairs will listen for the upstairs on. + #[structopt(short, long, default_value = "0.0.0.0", name = "ADDRESS")] address: IpAddr, + /// Directory where the region is located. #[structopt(short, long, parse(from_os_str), name = "DIRECTORY")] data: PathBuf, - /* - * Test option, makes the search for new work sleep and sometimes - * skip doing work. XXX Note that the flow control between upstairs - * and downstairs is not yet implemented. By turning on this option - * it's possible to deadlock. - */ + + /// Test option, makes the search for new work sleep and sometimes + /// skip doing work. #[structopt(long)] lossy: bool, @@ -152,9 +151,11 @@ enum Args { * If this option is provided along with the address:port of the * oximeter server, the downstairs will publish stats. */ - #[structopt(long)] + /// Use this address:port to send stats to an Oximeter server. + #[structopt(long, name = "OXIMETER_ADDRESS:PORT")] oximeter: Option, + /// Listen on this port for the upstairs to connect to us. 
#[structopt(short, long, default_value = "9000")] port: u16, @@ -175,6 +176,7 @@ enum Args { #[structopt(long, default_value = "rw")] mode: Mode, }, + RepairAPI, Serve { #[structopt(short, long)] trace_endpoint: Option, @@ -525,14 +527,34 @@ where d.add_work(*uuid, *ds_id, new_read).await?; Some(*ds_id) } + Message::ExtentFlush(rep_id, eid, _cid, flush_number, gen_number) => { + println!( + "{} Flush extent {} with f:{} g:{}", + rep_id, eid, flush_number, gen_number + ); + let msg = { + let d = ad.lock().await; + match d.region.region_flush_extent( + *eid, + *flush_number, + *gen_number, + ) { + Ok(()) => Message::RepairAckId(*rep_id), + Err(e) => Message::ExtentError(*rep_id, *eid, e), + } + }; + let mut fw = fw.lock().await; + fw.send(msg).await?; + return Ok(()); + } Message::ExtentClose(rep_id, eid) => { - println!("{} Close extent {}", rep_id, eid); + println!("{} Close extent {}", rep_id, eid); let msg = { let mut d = ad.lock().await; - match d.region.extents.get_mut(*eid as usize) { + match d.region.extents.get_mut(*eid) { Some(ext) => { ext.close()?; - Message::ExtentCloseAck(*rep_id) + Message::RepairAckId(*rep_id) } None => Message::ExtentError( *rep_id, @@ -545,12 +567,28 @@ where fw.send(msg).await?; return Ok(()); } + Message::ExtentRepair(rep_id, eid, sc, repair_addr, dest) => { + println!( + "{} Repair extent {} source:[{}] {:?} dest:{:?}", + rep_id, eid, sc, repair_addr, dest + ); + let msg = { + let mut d = ad.lock().await; + match d.region.repair_extent(*eid, *repair_addr).await { + Ok(()) => Message::RepairAckId(*rep_id), + Err(e) => Message::ExtentError(*rep_id, *eid, e), + } + }; + let mut fw = fw.lock().await; + fw.send(msg).await?; + return Ok(()); + } Message::ExtentReopen(rep_id, eid) => { println!("{} Reopen extent {}", rep_id, eid); let msg = { let mut d = ad.lock().await; - match d.region.reopen_extent(*eid as usize) { - Ok(()) => Message::ExtentReopenAck(*rep_id), + match d.region.reopen_extent(*eid) { + Ok(()) => 
Message::RepairAckId(*rep_id), Err(e) => Message::ExtentError(*rep_id, *eid, e), } }; @@ -1071,8 +1109,8 @@ where * downstairs work queue. */ #[derive(Debug)] -struct Downstairs { - region: Region, +pub struct Downstairs { + pub region: Region, work: Mutex, lossy: bool, // Test flag, enables pauses and skipped jobs return_errors: bool, // Test flag @@ -1800,6 +1838,10 @@ async fn main() -> Result<()> { ) .await } + Args::RepairAPI => { + let _ = repair::build_api(true); + Ok(()) + } Args::Serve { trace_endpoint, bind_addr, @@ -1920,6 +1962,18 @@ async fn start_downstairs( }); } + // Start the repair server on the same address at port + REPAIR_PORT_OFFSET + let rport = port + REPAIR_PORT_OFFSET; + let repair_address = match address { + IpAddr::V4(ipv4) => SocketAddr::new(std::net::IpAddr::V4(ipv4), rport), + IpAddr::V6(ipv6) => SocketAddr::new(std::net::IpAddr::V6(ipv6), rport), + }; + let dss = d.clone(); + tokio::spawn(async move { + let s = repair::repair_main(&dss, repair_address).await; + println!("Got {:?} from repair main", s); + }); + let listen_on = match address { IpAddr::V4(ipv4) => SocketAddr::new(std::net::IpAddr::V4(ipv4), port), IpAddr::V6(ipv6) => SocketAddr::new(std::net::IpAddr::V6(ipv6), port), @@ -1928,7 +1982,6 @@ async fn start_downstairs( /* * Establish a listen server on the port. 
*/ - // let listen_on = SocketAddrV4::new(address, port); println!("Using address: {:?}", listen_on); let listener = TcpListener::bind(&listen_on).await?; diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index 2656a1a15..99308d8a2 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -1,8 +1,10 @@ // Copyright 2021 Oxide Computer Company use std::collections::HashMap; use std::convert::TryInto; -use std::fs::{File, OpenOptions}; +use std::fmt; +use std::fs::{rename, File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; +use std::net::SocketAddr; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::{Mutex, MutexGuard}; @@ -10,8 +12,12 @@ use std::sync::{Mutex, MutexGuard}; use anyhow::{bail, Result}; use crucible_common::*; use crucible_protocol::{EncryptionContext, SnapshotDetails}; +use futures::TryStreamExt; +use repair_client::types::FileType; +use repair_client::Client; use rusqlite::{params, Connection}; use serde::{Deserialize, Serialize}; +use tokio::macros::support::Pin; use tracing::instrument; #[derive(Debug)] @@ -380,7 +386,6 @@ pub struct ExtentMeta { * Increasing value provided from upstairs every time it connects to * a downstairs. Used to help break ties if flash numbers are the same * on extents. - * Not currently connected to anything XXX */ pub gen_number: u64, /** @@ -406,15 +411,114 @@ impl Default for ExtentMeta { } } +#[derive(Debug, Clone)] +pub enum ExtentType { + Data, + Db, + DbShm, + DbWal, +} + +impl fmt::Display for ExtentType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ExtentType::Data => Ok(()), + ExtentType::Db => write!(f, "db"), + ExtentType::DbShm => write!(f, "db-shm"), + ExtentType::DbWal => write!(f, "db-wal"), + } + } +} + /** - * Produce a PathBuf that refers to the backing file for extent "number", - * anchored under "dir". + * Take an ExtentType and translate it into the corresponding + * FileType from the repair client. 
*/ -fn extent_path>(dir: P, number: u32) -> PathBuf { +impl ExtentType { + fn to_file_type(&self) -> FileType { + match self { + ExtentType::Data => FileType::Data, + ExtentType::Db => FileType::Db, + ExtentType::DbShm => FileType::DbShm, + ExtentType::DbWal => FileType::DbWal, + } + } +} + +/** + * Produce the string name of the data file for a given extent number + */ +pub fn extent_file_name(number: u32, extent_type: ExtentType) -> String { + match extent_type { + ExtentType::Data => { + format!("{:03X}", number & 0xFFF) + } + ExtentType::Db | ExtentType::DbShm | ExtentType::DbWal => { + format!("{:03X}.{}", number & 0xFFF, extent_type) + } + } +} + +/** + * Produce a PathBuf that refers to the containing directory for extent + * "number", anchored under "dir". + */ +pub fn extent_dir>(dir: P, number: u32) -> PathBuf { let mut out = dir.as_ref().to_path_buf(); out.push(format!("{:02X}", (number >> 24) & 0xFF)); out.push(format!("{:03X}", (number >> 12) & 0xFFF)); - out.push(format!("{:03X}", number & 0xFFF)); + out +} + +/** + * Produce a PathBuf that refers to the backing file for extent "number", + * anchored under "dir". + */ +pub fn extent_path>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, ExtentType::Data)); + out +} + +/** + * Produce a PathBuf that refers to the copy directory we create for + * a given extent "number", This directory will hold the files we + * transfer from the source downstairs. + * anchored under "dir". + */ +pub fn copy_dir>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, ExtentType::Data)); + out.set_extension("copy".to_string()); + out +} + +/** + * Produce a PathBuf that refers to the replace directory we use for + * a given extent "number". 
This directory is generated (see below) when + * all the files needed for repairing an extent have been added to the + * copy directory and we are ready to start replacing the extents files. + * anchored under "dir". The actual generation of this directory will + * be done by renaming the copy directory. + */ +pub fn replace_dir>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, ExtentType::Data)); + out.set_extension("replace".to_string()); + out +} +/** + * Produce a PathBuf that refers to the completed directory we use for + * a given extent "number". This directory is generated (see below) when + * all the files needed for repairing an extent have been copied to their + * final destination and everything has been flushed. + * The actual generation of this directory will be done by renaming the + * replace directory. + */ +pub fn completed_dir>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, ExtentType::Data)); + out.set_extension("completed".to_string()); out } @@ -424,6 +528,47 @@ fn config_path>(dir: P) -> PathBuf { out } +/** + * Remove directories associated with repair except for the replace + * directory. Replace is handled specifically during extent open. + */ +pub fn remove_copy_cleanup_dir>(dir: P, eid: u32) -> Result<()> { + let mut remove_dirs = vec![copy_dir(&dir, eid)]; + remove_dirs.push(completed_dir(&dir, eid)); + + for d in remove_dirs { + if Path::new(&d).exists() { + println!("Deleting dir: {:?}", d); + std::fs::remove_dir_all(&d)?; + } + } + Ok(()) +} + +/** + * Validate a list of sorted repair files. + * There are either two or four files we expect to find, any more or less + * and we have a bad list. No duplicates. 
+ */ +pub fn validate_repair_files(eid: usize, files: &[String]) -> bool { + println!("validate {} with {:?}", eid, files); + let eid = eid as u32; + + let some = vec![ + extent_file_name(eid, ExtentType::Data), + extent_file_name(eid, ExtentType::Db), + ]; + + let mut all = some.clone(); + all.extend(vec![ + extent_file_name(eid, ExtentType::DbShm), + extent_file_name(eid, ExtentType::DbWal), + ]); + + // Either we have some or all. + files == some || files == all +} + impl Extent { /** * Open an existing extent file at the location requested. @@ -439,11 +584,25 @@ impl Extent { * Store extent data in files within a directory hierarchy so that * there are not too many files in any level of that hierarchy. */ - let mut path = extent_path(dir, number); + let mut path = extent_path(&dir, number); let bcount = def.extent_size().value; let size = def.block_size().checked_mul(bcount).unwrap(); + remove_copy_cleanup_dir(&dir, number)?; + + // If the replace directory exists for this extent, then it means + // a repair was interrupted before it could finish. We will continue + // the repair before we open the extent. + let replace_dir = replace_dir(&dir, number); + if !read_only && Path::new(&replace_dir).exists() { + println!( + "Extent {} found replacement dir, finishing replacement", + number + ); + move_replacement_extent(&dir, number as usize)?; + } + /* * Open the extent file and verify the size is as we expect. */ @@ -489,7 +648,6 @@ impl Extent { */ pub fn close(&mut self) -> Result<()> { let inner = self.inner.as_ref().unwrap().lock().unwrap(); - println!("Close {} {:?}", self.number, inner); drop(inner); self.inner = None; Ok(()) @@ -509,7 +667,7 @@ impl Extent { * Store extent data in files within a directory hierarchy so that * there are not too many files in any level of that hierarchy. */ - let mut path = extent_path(dir, number); + let mut path = extent_path(&dir, number); /* * Verify there are not existing extent files. 
@@ -517,6 +675,7 @@ impl Extent { if Path::new(&path).exists() { bail!("Extent file already exists {:?}", path); } + remove_copy_cleanup_dir(&dir, number)?; let bcount = def.extent_size().value; let size = def.block_size().checked_mul(bcount).unwrap(); @@ -604,6 +763,54 @@ impl Extent { }) } + /** + * Create the copy directory for this extent. + */ + fn create_copy_dir>(&self, dir: P) -> Result { + let cp = copy_dir(dir, self.number); + + /* + * Verify the copy directory does not exist + */ + if Path::new(&cp).exists() { + panic!("Extent copy directory already exists {:?}", cp); + } + + println!("Create copy dir {:?}", cp); + std::fs::create_dir_all(&cp)?; + Ok(cp) + } + + /** + * Create the file that will hold a copy of an extent from a + * remote downstairs. + */ + fn create_copy_file( + &self, + mut copy_dir: PathBuf, + extension: Option, + ) -> Result { + // Get the base extent name before we consider the actual Type + let name = extent_file_name(self.number, ExtentType::Data); + copy_dir.push(name); + if let Some(extension) = extension { + let ext = format!("{}", extension); + copy_dir.set_extension(ext); + } + let copy_path = copy_dir; + + if Path::new(©_path).exists() { + panic!("copy file already exists {:?}", copy_path); + } + + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(©_path)?; + Ok(file) + } + pub fn inner(&self) -> MutexGuard { self.inner.as_ref().unwrap().lock().unwrap() } @@ -808,7 +1015,7 @@ extern "C" { */ #[derive(Debug)] pub struct Region { - dir: PathBuf, + pub dir: PathBuf, def: RegionDefinition, pub extents: Vec, read_only: bool, @@ -954,7 +1161,6 @@ impl Region { * - matches our eid * - is not read-only */ - println!("reopen extent {} {:?}", eid, self.extents[eid].inner); assert!(!self.extents[eid].inner.is_some()); assert_eq!(self.extents[eid].number, eid as u32); assert!(!self.read_only); @@ -965,6 +1171,160 @@ impl Region { Ok(()) } + /** + * Repair an extent from another downstairs + * + * We need 
to repair an extent in such a way that an interruption + * at any time can be recovered from. + * + * Let us assume we are repairing extent 012 + * 1. Make new 012.copy dir (extent name plus: .copy) + * 2. Get all extent files from source side, put in 012.copy directory + * 3. fsync files we just downloaded + * 4. Rename 012.copy dir to 012.replace dir + * 5. fsync extent directory ( 00/000/ where the extent files live) + * 6. Replace current extent 012 files with copied files of same name + * from 012.replace dir + * 7. Remove any files in extent dir that don't exist in replacing dir + * For example, if the replacement extent has 012 and 012.db, but + * the current (bad) extent has 012 012.db 012.db-shm + * and 012.db-wal, we want to remove the 012.db-shm and 012.db-wal + * files when we replace 012 and 012.db with the new versions. + * 8. fsync files after copying them (new location). + * 9. fsync containing extent dir + * 10. Rename 012.replace dir to 012.completed dir. + * 11. fsync extent dir again (so dir rename is persisted) + * 12. Delete completed dir. + * 13. fsync extent dir again (so dir rename is persisted) + * + * This also requires the following behavior on every extent open: + * A. If xxx.copy directory found, delete it. + * B. If xxx.completed directory found, delete it. + * C. If xxx.replace dir found start at step 4 above and continue + * on through 6. + * D. Only then, open extent. + */ + pub async fn repair_extent( + &mut self, + eid: usize, + repair_addr: SocketAddr, + ) -> Result<(), CrucibleError> { + // Make sure the extent: + // is currently closed, matches our eid, is not read-only + assert!(self.extents[eid].inner.is_none()); + assert_eq!(self.extents[eid].number, eid as u32); + assert!(!self.read_only); + + self.get_extent_copy(eid, repair_addr).await?; + + // Returning from get_extent_copy means we have copied all our + // files and moved the copy directory to replace directory. 
+ // Now, replace the current extent files with the replacement ones. + move_replacement_extent(&self.dir, eid)?; + + Ok(()) + } + + /** + * Connect to the source and pull over all the extent files for the + * given extent ID. + * The files are loaded into the copy_dir for the given extent. + * After all the files have been copied locally, we rename the + * copy_dir to replace_dir. + */ + pub async fn get_extent_copy( + &mut self, + eid: usize, + repair_addr: SocketAddr, + ) -> Result<(), CrucibleError> { + // An extent must be closed before we replace its files. + assert!(self.extents[eid].inner.is_none()); + + // Make sure copy, replace, and cleanup directories don't exist yet. + // We don't need them yet, but if they do exist, then something + // is wrong. + let rd = replace_dir(&self.dir, eid as u32); + if rd.exists() { + crucible_bail!( + IoError, + "Replace directory: {:?} already exists", + rd, + ); + } + + let extent = &self.extents[eid]; + let copy_dir = extent.create_copy_dir(&self.dir)?; + + // XXX TLS someday? Authentication? + let url = format!("http://{:?}", repair_addr); + let repair_server = Client::new(&url); + + let mut repair_files = repair_server + .get_files_for_extent(eid as u32) + .await + .unwrap() + .into_inner(); + + repair_files.sort(); + println!("Found repair files: {:?}", repair_files); + + // The repair file list should always contain the extent data + // file itself, and the .db file (metadata) for that extent. + // Missing these means the repair will not succeed. + // Optionally, there could be both .db-shm and .db-wal. + if !validate_repair_files(eid, &repair_files) { + panic!("Invalid repair file list: {:?}", repair_files); + } + + // First, copy the main extent data file. 
+ let extent_copy = + extent.create_copy_file(copy_dir.clone(), None).unwrap(); + let repair_stream = repair_server + .get_extent_file(eid as u32, FileType::Data) + .await + .unwrap(); + save_stream_to_file(extent_copy, repair_stream.into_inner()).await?; + + // The .db file is also required to exist for any valid extent. + let extent_db = extent + .create_copy_file(copy_dir.clone(), Some(ExtentType::Db)) + .unwrap(); + let repair_stream = repair_server + .get_extent_file(eid as u32, FileType::Db) + .await + .unwrap(); + save_stream_to_file(extent_db, repair_stream.into_inner()).await?; + + // These next two are optional. + for opt_file in &[ExtentType::DbShm, ExtentType::DbWal] { + let filename = extent_file_name(eid as u32, opt_file.clone()); + if repair_files.contains(&filename) { + let extent_shm = extent + .create_copy_file(copy_dir.clone(), Some(opt_file.clone())) + .unwrap(); + let repair_stream = repair_server + .get_extent_file(eid as u32, opt_file.to_file_type()) + .await + .unwrap(); + save_stream_to_file(extent_shm, repair_stream.into_inner()) + .await?; + } + } + + // After we have all files: move the repair dir. + println!( + "Repair files downloaded, move directory {:?} to {:?}", + copy_dir, rd + ); + + // XXX fsync the parent directory (the extent dir) + rename(copy_dir.clone(), rd.clone())?; + + // XXX fsync the parent directory (the extent dir) + + Ok(()) + } + /** * if there is a difference between what our actual extent_count is * and what is requested, go out and create the new extent files. @@ -1141,6 +1501,28 @@ impl Region { Ok(responses) } + /* + * Send a flush to just the given extent. The provided flush number is + * what an extent should use if a flush is required. 
+ */ + #[instrument] + pub fn region_flush_extent( + &self, + eid: usize, + flush_number: u64, + gen_number: u64, + ) -> Result<(), CrucibleError> { + println!( + "Flush just extent {} with f:{} and g:{}", + eid, flush_number, gen_number + ); + + let extent = &self.extents[eid]; + extent.flush_block(flush_number, gen_number)?; + + Ok(()) + } + /* * Send a flush to all extents. The provided flush number is * what an extent should use if a flush is required. @@ -1229,12 +1611,137 @@ impl Region { } } +/** + * Copy the contents of the replacement directory on to the extent + * files in the extent directory. + */ +pub fn move_replacement_extent>( + region_dir: P, + eid: usize, +) -> Result<(), CrucibleError> { + let destination_dir = extent_dir(®ion_dir, eid as u32); + let extent_file_name = extent_file_name(eid as u32, ExtentType::Data); + let replace_dir = replace_dir(®ion_dir, eid as u32); + let completed_dir = completed_dir(®ion_dir, eid as u32); + + // XXX replace panic with CrucibleError + assert!(Path::new(&replace_dir).exists()); + assert!(!Path::new(&completed_dir).exists()); + + println!("Copy files from {:?} in {:?}", replace_dir, destination_dir,); + + // Setup the original and replacement file names. + let mut new_file = replace_dir.clone(); + new_file.push(extent_file_name.clone()); + + let mut original_file = destination_dir; + original_file.push(extent_file_name); + + // Copy the new file (the one we copied from the source side) on top + // of the original file. + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy of {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + + new_file.set_extension("db"); + original_file.set_extension("db"); + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + + // The .db-shm and .db-wal files may or may not exist. 
If they don't + // exist on the source side, then be sure to remove them locally to + // avoid database corruption from a mismatch between old and new. + new_file.set_extension("db-shm"); + original_file.set_extension("db-shm"); + if new_file.exists() { + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + } else { + println!( + "Remove old file {:?} as there is no replacement", + original_file.clone() + ); + if original_file.exists() { + std::fs::remove_file(&original_file).unwrap(); + } + } + + new_file.set_extension("db-wal"); + original_file.set_extension("db-wal"); + if new_file.exists() { + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + } else { + println!( + "Remove old file {:?} as there is no replacement", + original_file.clone() + ); + if original_file.exists() { + std::fs::remove_file(&original_file).unwrap(); + } + } + + // XXX fsync destination_dir; + + // After we have all files: move the copy dir. + println!("Move directory {:?} to {:?}", replace_dir, completed_dir); + rename(replace_dir, &completed_dir)?; + + // XXX fsync the parent directory (the extent dir) + + std::fs::remove_dir_all(&completed_dir)?; + + // XXX fsync parent dir one more time + Ok(()) +} + +/** + * Given: + * The stream returned to us from the progenitor endpoint + * A local File, already created and opened, + * Stream the data from the endpoint into the file. + * When the stream is completed, fsync the file. 
+ */ +pub async fn save_stream_to_file( + mut file: File, + mut stream: Pin< + Box< + dyn futures::Stream< + Item = std::result::Result, + > + std::marker::Send, + >, + >, +) -> Result<(), CrucibleError> { + loop { + match stream.try_next().await { + Ok(Some(bytes)) => { + file.write_all(&bytes).unwrap(); + } + Ok(None) => break, + Err(e) => panic!("extent stream {}", e), + } + } + if unsafe { fsync(file.as_raw_fd()) } == -1 { + let e = std::io::Error::last_os_error(); + crucible_bail!(IoError, "repair {:?}: fsync failure: {:?}", file, e); + } + Ok(()) +} + #[cfg(test)] mod test { use super::*; use crate::dump::dump_region; use bytes::{BufMut, BytesMut}; use rand::{Rng, RngCore}; + use std::fs::rename; use std::path::PathBuf; use tempfile::tempdir; use uuid::Uuid; @@ -1280,6 +1787,43 @@ mod test { region_options } + #[test] + fn copy_extent_dir() -> Result<()> { + // Create the region, make three extents + // Create the copy directory, make sure it exists. + // Remove the copy directory, make sure it goes away. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + let ext_one = &mut region.extents[1]; + let cp = copy_dir(&dir, 1); + + assert!(ext_one.create_copy_dir(&dir).is_ok()); + assert!(Path::new(&cp).exists()); + assert!(remove_copy_cleanup_dir(&dir, 1).is_ok()); + assert!(!Path::new(&cp).exists()); + Ok(()) + } + + #[test] + fn copy_extent_dir_twice() -> () { + // Create the region, make three extents + // Create the copy directory, make sure it exists. + // Verify a second create will fail. + // Catch the panic just from the final step, all others should pass. 
+ let dir = tempdir().unwrap(); + let mut region = Region::create(&dir, new_region_options()).unwrap(); + region.extend(3).unwrap(); + + let ext_one = &mut region.extents[1]; + let _ = ext_one.create_copy_dir(&dir); + let result = + std::panic::catch_unwind(|| ext_one.create_copy_dir(&dir).unwrap()); + assert!(result.is_err()); + () + } + #[test] fn close_extent() -> Result<()> { // Create the region, make three extents @@ -1292,6 +1836,8 @@ mod test { ext_one.close()?; assert!(!ext_one.inner.is_some()); + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; // Reopen extent 1 region.reopen_extent(1)?; @@ -1302,9 +1848,368 @@ mod test { // Make sure the eid matches assert_eq!(ext_one.number, 1); + // Make sure the copy directory is gone + assert!(!Path::new(&cp).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_one() -> Result<()> { + // Verify the copy directory is removed if an extent is + // opened with that directory present. + // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // Reopen extent 1 + region.reopen_extent(1)?; + + // Verify extent one is valid + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure copy directory was removed + assert!(!Path::new(&cp).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_two() -> Result<()> { + // Verify that the completed directory is removed if present + // when an extent is re-opened. 
+ // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // Step through the replacement dir, but don't do any work. + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // Finish up the fake repair, but leave behind the completed dir. + let cd = completed_dir(&dir, 1); + rename(rd.clone(), cd.clone())?; + + // Reopen extent 1 + region.reopen_extent(1)?; + + // Verify extent one is valid + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_replay() -> Result<()> { + // Verify on extent open that a replacement directory will + // have it's contents replace an extents existing data and + // metadata files. + // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. 
+ let dest_name = extent_file_name(1, ExtentType::Data); + let mut source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db"); + dest_path.set_extension("db"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db-shm"); + dest_path.set_extension("db-shm"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db-wal"); + dest_path.set_extension("db-wal"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // Now we have a replace directory, we verify that special + // action is taken when we (re)open the extent. + + // Reopen extent 1 + region.reopen_extent(1)?; + + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + + // The reopen should have replayed the repair, renamed, then + // deleted this directory. + let cd = completed_dir(&dir, 1); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_replay_short() -> Result<()> { + // test move_replacement_extent(), create a copy dir, populate it + // and let the reopen do the work. This time we make sure our + // copy dir only has extent data and .db files, and not .db-shm + // nor .db-wal. Verify these files are delete from the original + // extent after the reopen has cleaned them up. 
+ // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, ExtentType::Data); + let mut source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + println!("cp {:?} to {:?}", source_path, dest_path); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db"); + dest_path.set_extension("db"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // The close may remove the db-shm and db-wal files, manually + // create them here, just to verify they are removed after the + // reopen as they are not included in the files to be recovered + // and this test exists to verify they will be deleted. + let mut invalid_db = extent_path(&dir, 1); + invalid_db.set_extension("db-shm"); + println!("Recreate {:?}", invalid_db); + std::fs::copy(source_path.clone(), invalid_db.clone())?; + assert!(Path::new(&invalid_db).exists()); + + invalid_db.set_extension("db-wal"); + println!("Recreate {:?}", invalid_db); + std::fs::copy(source_path.clone(), invalid_db.clone())?; + assert!(Path::new(&invalid_db).exists()); + + // Now we have a replace directory populated and our files to + // delete are ready. We verify that special action is taken + // when we (re)open the extent. 
+ + // Reopen extent 1 + region.reopen_extent(1)?; + + // Make sure there is no longer a db-shm and db-wal + dest_path.set_extension("db-shm"); + assert!(!Path::new(&dest_path).exists()); + dest_path.set_extension("db-wal"); + assert!(!Path::new(&dest_path).exists()); + + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + + // The reopen should have replayed the repair, renamed, then + // deleted this directory. + let cd = completed_dir(&dir, 1); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_no_replay_readonly() -> Result<()> { + // Verify on a read-only region a replacement directory will + // be ignored. This is required by the dump command, as it would + // be tragic if the command to inspect a region changed that + // region's contents in the act of inspecting. + + // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Make copy directory for this extent + let ext_one = &mut region.extents[1]; + let cp = ext_one.create_copy_dir(&dir)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, ExtentType::Data); + let mut source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db"); + dest_path.set_extension("db"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + drop(region); + + // Open up the region read_only now. + let mut region = Region::open(&dir, new_region_options(), false, true)?; + + // Verify extent 1 has opened again. 
+ let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure repair directory is still present + assert!(Path::new(&rd).exists()); + Ok(()) } + #[test] + fn validate_repair_files_empty() { + // No repair files is a failure + assert_eq!(validate_repair_files(1, &Vec::new()), false); + } + + #[test] + fn validate_repair_files_good() { + // This is an expected list of files for an extent + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + + assert!(validate_repair_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_also_good() { + // This is also an expected list of files for an extent + let good_files: Vec = + vec!["001".to_string(), "001.db".to_string()]; + assert!(validate_repair_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_duplicate() { + // duplicate file names for extent 2 + let good_files: Vec = + vec!["002".to_string(), "002".to_string()]; + assert_eq!(validate_repair_files(2, &good_files), false); + } + + #[test] + fn validate_repair_files_duplicate_pair() { + // duplicate file names for extent 2 + let good_files: Vec = vec![ + "002".to_string(), + "002".to_string(), + "002.db".to_string(), + "002.db".to_string(), + ]; + assert_eq!(validate_repair_files(2, &good_files), false); + } + + #[test] + fn validate_repair_files_quad_duplicate() { + // This is an expected list of files for an extent + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-shm".to_string(), + ]; + assert_eq!(validate_repair_files(1, &good_files), false); + } + + #[test] + fn validate_repair_files_offbyon() { + // Incorrect file names for extent 2 + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + + assert_eq!(validate_repair_files(2, &good_files), false); + } + + #[test] + fn 
validate_repair_files_too_good() { + // Duplicate file in list + let good_files: Vec = vec![ + "001".to_string(), + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + assert_eq!(validate_repair_files(1, &good_files), false); + } + + #[test] + fn validate_repair_files_not_good_enough() { + // We require 2 or 4 files, not 3 + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-wal".to_string(), + ]; + assert_eq!(validate_repair_files(1, &good_files), false); + } + #[test] fn reopen_all_extents() -> Result<()> { // Create the region, make three extents @@ -1429,6 +2334,59 @@ mod test { assert_eq!((), ext.check_input(Block::new_512(1), &data).unwrap()); } + #[test] + fn extent_name_basic() { + assert_eq!(extent_file_name(4, ExtentType::Data), "004"); + } + #[test] + fn extent_name_basic_ext() { + assert_eq!(extent_file_name(4, ExtentType::Db), "004.db"); + } + #[test] + fn extent_name_basic_ext_shm() { + assert_eq!(extent_file_name(4, ExtentType::DbShm), "004.db-shm"); + } + #[test] + fn extent_name_basic_ext_wal() { + assert_eq!(extent_file_name(4, ExtentType::DbWal), "004.db-wal"); + } + #[test] + fn extent_name_basic_two() { + assert_eq!(extent_file_name(10, ExtentType::Data), "00A"); + } + #[test] + fn extent_name_basic_three() { + assert_eq!(extent_file_name(59, ExtentType::Data), "03B"); + } + #[test] + fn extent_name_max() { + assert_eq!(extent_file_name(u32::MAX, ExtentType::Data), "FFF"); + } + #[test] + fn extent_name_min() { + assert_eq!(extent_file_name(u32::MIN, ExtentType::Data), "000"); + } + + #[test] + fn extent_dir_basic() { + assert_eq!(extent_dir("/var/region", 4), p("/var/region/00/000/")); + } + + #[test] + fn extent_dir_max() { + assert_eq!( + extent_dir("/var/region", u32::MAX), + p("/var/region/FF/FFF") + ); + } + #[test] + fn extent_dir_min() { + assert_eq!( + extent_dir("/var/region", u32::MIN), + p("/var/region/00/000/") + ); + } + #[test] fn 
extent_path_min() { assert_eq!( @@ -1436,6 +2394,13 @@ mod test { p("/var/region/00/000/000") ); } + #[test] + fn copy_path_basic() { + assert_eq!( + copy_dir("/var/region", 4), + p("/var/region/00/000/004.copy") + ); + } #[test] fn extent_path_three() { diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs new file mode 100644 index 000000000..af5a698fd --- /dev/null +++ b/downstairs/src/repair.rs @@ -0,0 +1,390 @@ +// Copyright 2022 Oxide Computer Company +use std::path::PathBuf; +use std::sync::Arc; + +use dropshot::ApiDescription; +use dropshot::ConfigDropshot; +use dropshot::ConfigLogging; +use dropshot::ConfigLoggingLevel; +use dropshot::HttpError; +use dropshot::HttpResponseOk; +use dropshot::HttpServerStarter; +use dropshot::RequestContext; +use dropshot::{endpoint, Path}; +use http::{Response, StatusCode}; +use hyper::Body; +use schemars::JsonSchema; +use serde::Deserialize; + +use super::*; +use crate::region::{extent_dir, extent_file_name, extent_path, ExtentType}; + +/** + * Our context is the root of the region we want to serve. + */ +pub struct FileServerContext { + region_dir: PathBuf, +} + +/** + * Build the API. If requested, dump it to stdout. + * This allows us to use the resulting output to build the client side. + */ +pub fn build_api( + show: bool, +) -> Result, String> { + let mut api = ApiDescription::new(); + api.register(get_extent_file).unwrap(); + api.register(get_files_for_extent).unwrap(); + + if show { + api.openapi("downstairs-repair", "1") + .write(&mut std::io::stdout()) + .map_err(|e| e.to_string())?; + } + Ok(api) +} + +pub async fn repair_main( + ds: &Arc>, + addr: SocketAddr, +) -> Result<(), String> { + /* + * We must specify a configuration with a bind address. + */ + let config_dropshot = ConfigDropshot { + bind_address: addr, + request_body_max_bytes: 1024, + tls: None, + }; + + /* + * For simplicity, configure an "info"-level logger that writes to + * stderr assuming that it's a terminal. 
+ */ +let config_logging = ConfigLogging::StderrTerminal { + level: ConfigLoggingLevel::Info, + }; + let log = config_logging + .to_logger("example-basic") + .map_err(|error| format!("failed to create logger: {}", error))?; + + /* + * Build a description of the API + */ + let api = build_api(false)?; + + /* + * Record the region directory where all the extents and metadata + * files live. + */ + let ds = ds.lock().await; + let region_dir = ds.region.dir.clone(); + drop(ds); + + let context = FileServerContext { region_dir }; + + println!("Repair listens on {}", addr); + /* + * Set up the server. + */ + let server = HttpServerStarter::new(&config_dropshot, api, context, &log) + .map_err(|error| format!("failed to create server: {}", error))? + .start(); + + /* + * Wait for the server to stop. Note that there's not any code to shut + * down this server, so we should never get past this point. + */ + server.await +} + +#[derive(Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Eid { + eid: u32, +} + +#[derive(Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum FileType { + #[serde(rename = "data")] + Data, + #[serde(rename = "db")] + Database, + #[serde(rename = "db-shm")] + DatabaseSharedMemory, + #[serde(rename = "db-wal")] + DatabaseLog, +} + +#[derive(Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct FileSpec { + eid: u32, + file_type: FileType, +} + +#[endpoint { + method = GET, + path = "/newextent/{eid}/{fileType}", +}] +async fn get_extent_file( + rqctx: Arc>, + path: Path, +) -> Result, HttpError> { + let fs = path.into_inner(); + let eid = fs.eid; + + let mut extent_path = extent_path(rqctx.context().region_dir.clone(), eid); + match fs.file_type { + FileType::Database => { + extent_path.set_extension("db"); + } + FileType::DatabaseSharedMemory => { + extent_path.set_extension("db-shm"); + } + FileType::DatabaseLog => { + extent_path.set_extension("db-wal"); + } + FileType::Data => (), + }; 
+ + get_a_file(extent_path).await +} + +async fn get_a_file(path: PathBuf) -> Result, HttpError> { + println!("Request for file {:?}", path); + /* + * Make sure our file is neither a link nor a directory. + */ + let m = path + .symlink_metadata() + .map_err(|_| HttpError::for_bad_request(None, "ENOENT".to_string()))?; + + if m.file_type().is_symlink() { + Err(HttpError::for_bad_request(None, "EMLINK".to_string())) + } else if path.is_dir() { + Err(HttpError::for_bad_request(None, "EBADF".to_string())) + } else { + let file = tokio::fs::File::open(&path).await.map_err(|_| { + HttpError::for_bad_request(None, "EBADF".to_string()) + })?; + + let file_stream = hyper_staticfile::FileBytesStream::new(file); + let content_type = "application/octet-stream".to_string(); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, content_type) + .body(file_stream.into_body())?) + } +} + +/** + * Get the list of files related to an extent. + * + * For a given extent, return a vec of strings representing the names of + * the files that exist for that extent. + */ +#[endpoint { + method = GET, + path = "/extent/{eid}/files", +}] +async fn get_files_for_extent( + rqctx: Arc>, + path: Path, +) -> Result>, HttpError> { + let eid = path.into_inner().eid; + let extent_dir = extent_dir(rqctx.context().region_dir.clone(), eid); + + // Some sanity checking on the extent path + let m = extent_dir + .symlink_metadata() + .map_err(|_| HttpError::for_bad_request(None, "ENOENT".to_string()))?; + + if m.file_type().is_symlink() { + Err(HttpError::for_bad_request(None, "EMLINK".to_string())) + } else if !extent_dir.is_dir() { + Err(HttpError::for_bad_request(None, "EBADF".to_string())) + } else { + let files = extent_file_list(extent_dir, eid).await?; + Ok(HttpResponseOk(files)) + } +} + +/** + * Return the list of extent files we have in our region directory + * that correspond to the given extent. Return an error if any + * of the required files are missing. 
+ */ +async fn extent_file_list( + extent_dir: PathBuf, + eid: u32, +) -> Result, HttpError> { + let mut files = Vec::new(); + let possible_files = vec![ + (extent_file_name(eid, ExtentType::Data), true), + (extent_file_name(eid, ExtentType::Db), true), + (extent_file_name(eid, ExtentType::DbShm), false), + (extent_file_name(eid, ExtentType::DbWal), false), + ]; + + for (file, required) in possible_files.into_iter() { + let mut fullname = extent_dir.clone(); + fullname.push(file.clone()); + if fullname.exists() { + files.push(file); + } else if required { + println!("Needed file {} is missing", file); + return Err(HttpError::for_bad_request(None, "EBADF".to_string())); + } + } + + Ok(files) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::region::{extent_dir, extent_file_name}; + use tempfile::tempdir; + + fn new_region_options() -> crucible_common::RegionOptions { + let mut region_options: crucible_common::RegionOptions = + Default::default(); + let block_size = 512; + region_options.set_block_size(block_size); + region_options + .set_extent_size(Block::new(10, block_size.trailing_zeros())); + region_options + } + + #[tokio::test] + async fn extent_expected_files() -> Result<()> { + // Verify that the list of files returned for an extent matches + // what we expect. This is a hack of sorts as we are hard coding + // the expected names of files here in that test, rather than + // determine them through some programmatic means. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. 
+ let ed = extent_dir(&dir, 1); + let mut ex_files = extent_file_list(ed, 1).await.unwrap(); + ex_files.sort(); + let expected = vec!["001", "001.db", "001.db-shm", "001.db-wal"]; + println!("files: {:?}", ex_files); + assert_eq!(ex_files, expected); + + Ok(()) + } + + #[tokio::test] + async fn extent_expected_files_short() -> Result<()> { + // Verify that the list of files returned for an extent matches + // what we expect. In this case we expect the extent data file and + // the .db file, but not the .db-shm or .db-wal database files. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. + let extent_dir = extent_dir(&dir, 1); + + // Delete db-wal and db-shm + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(1, ExtentType::Data)); + rm_file.set_extension("db-wal"); + std::fs::remove_file(&rm_file).unwrap(); + rm_file.set_extension("db-shm"); + std::fs::remove_file(rm_file).unwrap(); + + let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); + ex_files.sort(); + let expected = vec!["001", "001.db"]; + println!("files: {:?}", ex_files); + assert_eq!(ex_files, expected); + + Ok(()) + } + + #[tokio::test] + async fn extent_expected_files_short_with_close() -> Result<()> { + // Verify that the list of files returned for an extent matches + // what we expect. In this case we expect the extent data file and + // the .db file, but not the .db-shm or .db-wal database files. + // We close the extent here first, and on illumos that behaves + // a little different than elsewhere. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + let ext_one = &mut region.extents[1]; + ext_one.close()?; + + // Determine the directory and name for expected extent files. + let extent_dir = extent_dir(&dir, 1); + + // Delete db-wal and db-shm. 
On illumos the close of the extent + // may remove these for us, so we ignore errors on the removal. + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(1, ExtentType::Data)); + rm_file.set_extension("db-wal"); + let _ = std::fs::remove_file(&rm_file); + rm_file.set_extension("db-shm"); + let _ = std::fs::remove_file(rm_file); + + let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); + ex_files.sort(); + let expected = vec!["001", "001.db"]; + println!("files: {:?}", ex_files); + assert_eq!(ex_files, expected); + + Ok(()) + } + + #[tokio::test] + async fn extent_expected_files_fail() -> Result<()> { + // Verify that we get an error if the expected extent.db file + // is missing. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. + let extent_dir = extent_dir(&dir, 2); + + // Delete db + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(2, ExtentType::Data)); + rm_file.set_extension("db"); + std::fs::remove_file(&rm_file).unwrap(); + + assert!(extent_file_list(extent_dir, 2).await.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn extent_expected_files_fail_two() -> Result<()> { + // Verify that we get an error if the expected extent file + // is missing. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. 
+ let extent_dir = extent_dir(&dir, 1); + + // Delete db + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(1, ExtentType::Data)); + std::fs::remove_file(&rm_file).unwrap(); + + assert!(extent_file_list(extent_dir, 1).await.is_err()); + + Ok(()) + } +} diff --git a/openapi/README.md b/openapi/README.md index 33a02a391..acc83b2fa 100644 --- a/openapi/README.md +++ b/openapi/README.md @@ -3,3 +3,8 @@ # crucible-control.json Described in this file is the control API for Crucible Upstairs. This file can be generated with the `control-api` program. + +# downstairs-repair.json +This file describes the API for repairing between Crucible Downstairs. +This file can be generated by running the downstairs binary with the +`repair-api` option. diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json new file mode 100644 index 000000000..1fb043a6b --- /dev/null +++ b/openapi/downstairs-repair.json @@ -0,0 +1,132 @@ +{ + "openapi": "3.0.3", + "info": { + "title": "downstairs-repair", + "version": "1" + }, + "paths": { + "/extent/{eid}/files": { + "get": { + "summary": "Get the list of files related to an extent.", + "description": "For a given extent, return a vec of strings representing the names of the files that exist for that extent.", + "operationId": "get_files_for_extent", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "style": "simple" + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Array_of_String", + "type": "array", + "items": { + "type": "string" + } + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, + "/newextent/{eid}/{fileType}": { + "get": { + "operationId": "get_extent_file", + "parameters": [ + { + "in": "path", + "name": "eid", + 
"required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "style": "simple" + }, + { + "in": "path", + "name": "fileType", + "required": true, + "schema": { + "$ref": "#/components/schemas/FileType" + }, + "style": "simple" + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + } + } + } + }, + "components": { + "responses": { + "Error": { + "description": "Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + } + } + } + }, + "schemas": { + "Error": { + "description": "Error information from a response.", + "type": "object", + "properties": { + "error_code": { + "type": "string" + }, + "message": { + "type": "string" + }, + "request_id": { + "type": "string" + } + }, + "required": [ + "message", + "request_id" + ] + }, + "FileType": { + "type": "string", + "enum": [ + "data", + "db", + "db-shm", + "db-wal" + ] + } + } + } +} \ No newline at end of file diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index a5db40e35..21744989e 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2021 Oxide Computer Company use std::cmp::Ordering; +use std::net::SocketAddr; use anyhow::bail; use bytes::{Buf, BufMut, BytesMut}; @@ -148,16 +149,22 @@ pub enum Message { */ /// Send a close the given extent ID on the downstairs. /// We send the downstairs the repair ID (rep_id) and the extent number. - ExtentClose(u64, u64), - /// Ack the close of an extent from the downstairs using the rep_id. - ExtentCloseAck(u64), + ExtentClose(u64, usize), /// Send a request (with rep_id) to reopen the given extent. - ExtentReopen(u64, u64), + ExtentReopen(u64, usize), /// Ack the Re-Open of an extent from the downstairs using the rep_id. - ExtentReopenAck(u64), + /// Flush just this extent on just this downstairs client. + /// rep_id, extent ID, downstairs client ID, flush number, gen number. 
+ ExtentFlush(u64, usize, u8, u64, u64), + /// Replace an extent with data from the given downstairs. + /// rep_id, extent ID, source extent, Vec of extents to repair + ExtentRepair(u64, usize, u8, SocketAddr, Vec), + + /// The given repair job ID has finished without error + RepairAckId(u64), /// A problem with the given extent - ExtentError(u64, u64, CrucibleError), + ExtentError(u64, usize, CrucibleError), /* * Metadata exchange */ diff --git a/repair-client/Cargo.toml b/repair-client/Cargo.toml new file mode 100644 index 000000000..27b8e8b86 --- /dev/null +++ b/repair-client/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "repair-client" +version = "0.0.1" +license = "MPL-2.0" +edition = "2018" + +[dependencies] +anyhow = "1.0" +chrono = { version = "0.4", features = [ "serde" ] } +percent-encoding = "2.1" +progenitor = { git = "https://github.com/oxidecomputer/progenitor", branch = "main" } +reqwest = { version = "0.11", features = ["json", "stream"] } +schemars = "0.8" +serde_json = "1.0" + +[dependencies.serde] +version = "1.0" +features = [ "derive" ] diff --git a/repair-client/src/lib.rs b/repair-client/src/lib.rs new file mode 100644 index 000000000..9fb3220ff --- /dev/null +++ b/repair-client/src/lib.rs @@ -0,0 +1,8 @@ +// Copyright 2022 Oxide Computer Company + +use progenitor::generate_api; + +generate_api!( + spec = "../openapi/downstairs-repair.json", + derives = [schemars::JsonSchema], +); diff --git a/tools/create-generic-ds.sh b/tools/create-generic-ds.sh index a33e382fb..c232fa0fc 100755 --- a/tools/create-generic-ds.sh +++ b/tools/create-generic-ds.sh @@ -38,7 +38,12 @@ while getopts 'b:c:des:' opt; do s) extent_size=$OPTARG echo "Using extent size $extent_size" ;; - *) echo "Usage: $0 [de] [-c extent_count] [-s extent_size]" >&2 + *) echo "Usage: $0 [de] [-b #] [-c #] [-s #]" >&2 + echo " -b block_size Block size for the region" >&2 + echo " -c extent_count Total number of extent files" >&2 + echo " -d Delete existing var/88* directories" >&2 + 
echo " -e Require encryption on the volume" >&2 + echo " -s extent_size Number of extents per extent file" >&2 exit 1 ;; esac diff --git a/tools/test_repair.sh b/tools/test_repair.sh new file mode 100755 index 000000000..b80a02ed2 --- /dev/null +++ b/tools/test_repair.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash + +# A test to break, then Repair a downstairs region that is out of sync with +# the other regions. +# This assumes you already have regions created. + +cargo build || echo "Failed to build" + +cds="./target/debug/crucible-downstairs" +cc="./target/debug/crucible-client" +if [[ ! -f ${cds} ]] || [[ ! -f ${cc} ]]; then + echo "Can't find crucible binaries at $cds or $cc" + exit 1 +fi + +# start all three downstairs +${cds} run -d var/8810 -p 8810 &> /tmp/ds1 & +ds1_pid=$! +${cds} run -d var/8820 -p 8820 &> /tmp/ds2 & +ds2_pid=$! +${cds} run -d var/8830 -p 8830 &> /tmp/ds3 & +ds3_pid=$! + +trap ctrl_c INT +function ctrl_c() { + echo "Stopping at your request" + cleanup +} + +function cleanup() { + kill "$ds1_pid" 2> /dev/null + kill "$ds2_pid" 2> /dev/null + kill "$ds3_pid" 2> /dev/null + kill "$slow_pid" 2> /dev/null +} + +verify_file=/tmp/repair_test_verify.data + +# Change these to pick which downstairs will be the one out of sync. +slow_port=8820 +slow_log=/tmp/ds2 +slow_pid=$ds2_pid + +target_args="-t 127.0.0.1:8810 -t 127.0.0.1:8820 -t 127.0.0.1:8830" +# Do initial volume population. +if ! ${cc} fill ${target_args} --verify-out "$verify_file" -q +then + echo "Exit on initial fill" + cleanup + exit 1 +fi + +# Stop a downstairs, we will restart with lossy in the loop +kill "$slow_pid" + +# Start loop +for (( i = 0; i < 300; i += 1 )); do + + # restart downstairs with lossy + ${cds} run -d var/"${slow_port}" -p "${slow_port}" --lossy &> "${slow_log}" & + slow_pid=$! + + if ! 
${cc} repair ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" -c 30 + then + echo "Exit on repair fail" + cleanup + exit 1 + fi + + echo "" + # Stop --lossy downstairs + kill "$slow_pid" + sleep 2 + + # Did we get any mismatches? + ${cds} dump -d var/8810 -d var/8820 -d var/8830 -o + echo "On loop $i" + + sleep 2 + echo "" + # Start downstairs without lossy + ${cds} run -d var/"$slow_port" -p "$slow_port" &> "$slow_log" & + slow_pid=$! + + echo "Verifying data now" + if ! ${cc} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" -q > /tmp/verify_out + then + echo "Exit on verify fail" + echo "Check /tmp/verify_out for details" + cleanup + exit 1 + fi + + # stop a downstairs + kill "$slow_pid" +done + +echo "Tests all done at $(date)" +cleanup diff --git a/tools/test_up.sh b/tools/test_up.sh index 6eac748d7..4fa37407d 100755 --- a/tools/test_up.sh +++ b/tools/test_up.sh @@ -84,7 +84,7 @@ for (( i = 0; i < 3; i++ )); do done res=0 -test_list="one span big dep deactivate balloon" +test_list="span big dep deactivate balloon" for tt in ${test_list}; do echo "" echo "Running test: $tt" @@ -100,13 +100,37 @@ for tt in ${test_list}; do fi done +echo "Running hammer" +if ! time cargo run -p crucible-hammer -- \ + "${args[@]}"; then + + echo "Failed hammer test" + echo "Failed hammer test" >> /tmp/test_fail.txt + (( res += 1 )) +fi + # Repair test +# This one goes last because it modified the args variable. +# We also test the --verify-* args here as well. +args+=( --verify-out "${testdir}/verify_file" ) +echo "$cc" fill -q "${args[@]}" +if ! "$cc" fill -q "${args[@]}"; then + (( res += 1 )) + echo "" + echo "Failed setup repair test" + echo "Failed setup repair test" >> /tmp/test_fail.txt + echo +else + echo "Repair setup passed" +fi + echo "Copy the $port file" echo cp -r "${testdir}/${port}" "${testdir}/previous" cp -r "${testdir}/${port}" "${testdir}/previous" -echo "$cc" "$tt" -q "${args[@]}" -if ! 
"$cc" one -q "${args[@]}"; then +args+=( --verify-in "${testdir}/verify_file" ) +echo "$cc" repair -q "${args[@]}" +if ! "$cc" repair -q "${args[@]}"; then (( res += 1 )) echo "" echo "Failed repair test part 1" @@ -136,7 +160,7 @@ downstairs[4]=$! echo "" echo "" echo "$cc" "$tt" -q "${args[@]}" -if ! "$cc" one -q "${args[@]}"; then +if ! "$cc" verify -q "${args[@]}"; then (( res += 1 )) echo "" echo "Failed repair test part 2" @@ -146,38 +170,6 @@ else echo "Repair part 2 passed" fi -echo "Running hammer" -if ! time cargo run -p crucible-hammer -- \ - "${args[@]}"; then - - echo "Failed hammer test" - echo "Failed hammer test" >> /tmp/test_fail.txt - (( res += 1 )) -fi - -echo "" -echo "Running verify test: $tt" -vfile="${testdir}/verify" -echo "$cc" rand -q --verify-out "$vfile" "${args[@]}" -if ! "$cc" rand -q --verify-out "$vfile" "${args[@]}"; then - (( res += 1 )) - echo "" - echo "Failed crucible-client rand verify test" - echo "Failed crucible-client rand verify test" >> /tmp/test_fail.txt - echo "" -else - echo "$cc" rand -q --verify-in "$vfile" "${args[@]}" - if ! 
"$cc" rand -q --verify-in "$vfile" "${args[@]}"; then - (( res += 1 )) - echo "" - echo "Failed crucible-client rand verify part 2 test" - echo "Failed crucible-client rand verify part 2 test" >> /tmp/test_fail.txt - echo "" - else - echo "Verify test passed" - fi -fi - # The dump args look different than other downstairs commands args=() for (( i = 0; i < 30; i += 10 )); do diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index c6fa2a9dd..8a7aa530b 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -1255,20 +1255,16 @@ where up_coms.client_id, expected_uuid ); } - - Some(Message::ExtentCloseAck(rep_id)) => { - up.ds_repair_done_notify( - up_coms.client_id, - rep_id, - &up_coms.ds_reconcile_done_tx, - ).await?; - } - Some(Message::ExtentReopenAck(rep_id)) => { - up.ds_repair_done_notify( - up_coms.client_id, - rep_id, - &up_coms.ds_reconcile_done_tx, - ).await?; + Some(Message::RepairAckId(rep_id)) => { + if up.downstairs.lock().unwrap().rep_done( + up_coms.client_id, rep_id + ) { + up.ds_repair_done_notify( + up_coms.client_id, + rep_id, + &up_coms.ds_reconcile_done_tx, + ).await?; + } } Some(m) => { println!( @@ -1286,13 +1282,51 @@ where */ println!("[{}] received reconcile message", up_coms.client_id); - let job = up - .downstairs - .lock().unwrap().rep_in_progress(up_coms.client_id); + /* + * We use rep_done to indicate this was job where our client + * did not have any actual work to send to the downstairs. + * It indicates that, we don't need a response from the + * downstairs and can go ahead and mark this rep_id as + * completed for this client and move forward. + */ + let mut rep_done = None; + let job = up. + downstairs + .lock() + .unwrap() + .rep_in_progress(up_coms.client_id); match job { Some(op) => { - println!("[{}] submit {:?}", up_coms.client_id, op); - fw.send(op).await?; + println!("[{}] client {:?}", up_coms.client_id, op); + /* + * If there is work to do, check to see if it is + * a repair job. 
If so, only send that to the actual + * clients that need to get it. The source downstairs + * does not get a message for this operation. + * + * If the work is an extent flush, then only send the + * message to the source extent, the other downstairs + * do not get a message. + */ + match op { + Message::ExtentRepair(rep_id, _, src, _, _) => { + if up_coms.client_id == src { + rep_done = Some(rep_id); + } else { + fw.send(op).await?; + } + }, + Message::ExtentFlush(rep_id, _, src, _, _) => { + if up_coms.client_id != src { + rep_done = Some(rep_id); + } else { + fw.send(op).await?; + } + }, + op => { + fw.send(op).await?; + } + } }, None => { /* @@ -1332,6 +1366,25 @@ where } } } + /* + * If rep_done is Some, it means this client had nothing + * to send to the downstairs, and we can go ahead and mark + * this rep_id as completed, which will trigger sending a + * notify if all other downstairs are also complete. + */ + if let Some(rep_id) = rep_done { + if up.downstairs.lock().unwrap().rep_done(up_coms.client_id, rep_id) { + println!("[{}] self notify as src for {}", + up_coms.client_id, + rep_id + ); + up.ds_repair_done_notify( + up_coms.client_id, + rep_id, + &up_coms.ds_reconcile_done_tx, + ).await?; + } + } } _ = sleep_until(timeout_deadline) => { bail!("[{}] Downstairs not responding, take offline", @@ -1672,6 +1725,7 @@ impl Downstairs { reconcile_repair_needed: 0, } } + /** * Assign a new downstairs ID. 
*/ @@ -1792,7 +1846,7 @@ impl Downstairs { ); return None; } - + println!("[{}] rep_in_progress: return {:?}", client_id, job); Some(job.op.clone()) } else { None @@ -1840,22 +1894,40 @@ impl Downstairs { */ fn convert_rc_to_messages( &mut self, - mut rec_list: HashMap, + mut rec_list: HashMap, + max_flush: u64, + max_gen: u64, ) { let mut rep_id = 0; - println!("Full list: {:?}", rec_list); - for (ext, _) in rec_list.drain() { + println!("Full repair list: {:?}", rec_list); + for (ext, ef) in rec_list.drain() { /* - * XXX TODO: We are not repairing anything here yet. - * For every extent mismatch, we just close then re-open them. - * Coming soon will be the actual commands to send extents - * from one downstairs to another. + * For each extent needing repair, we put the following + * tasks on the reconcile task list. + * Flush (the source) extent with latest gen/flush#. + * Close extent (on all ds) + * Send repair command to bad extents + * Reopen extent. */ + self.reconcile_task_list.push_back(ReconcileIO::new( + rep_id, + Message::ExtentFlush( + rep_id, ext, ef.source, max_flush, max_gen, + ), + )); + rep_id += 1; self.reconcile_task_list.push_back(ReconcileIO::new( rep_id, Message::ExtentClose(rep_id, ext), )); rep_id += 1; + let repair = self.repair_addr(ef.source); + self.reconcile_task_list.push_back(ReconcileIO::new( + rep_id, + Message::ExtentRepair(rep_id, ext, ef.source, repair, ef.dest), + )); + rep_id += 1; + self.reconcile_task_list.push_back(ReconcileIO::new( rep_id, Message::ExtentReopen(rep_id, ext), @@ -2423,8 +2495,9 @@ impl Downstairs { }; self.downstairs_errors.insert(client_id, errors + 1); - + } else { // XXX We don't count read errors here. + println!("[{}] {} read error", client_id, ds_id); } } } @@ -3792,8 +3865,8 @@ impl Upstairs { } /* - * Check and see if the downstairs are ready for the next repair - * command. Send a message if so. + * Send a message that indicates the downstairs are ready for the + * next repair command. 
*/ async fn ds_repair_done_notify( &self, @@ -3801,18 +3874,16 @@ impl Upstairs { rep_id: u64, ds_reconcile_done_tx: &mpsc::Sender, ) -> Result<()> { - if self.downstairs.lock().unwrap().rep_done(client_id, rep_id) { - println!("[{}] It's time to notify for {}", client_id, rep_id); - if let Err(e) = ds_reconcile_done_tx - .send(Repair { - repair: true, - client_id, - rep_id, - }) - .await - { - bail!("[{}] Failed to notify {} {:?}", client_id, rep_id, e); - } + println!("[{}] It's time to notify for {}", client_id, rep_id); + if let Err(e) = ds_reconcile_done_tx + .send(Repair { + repair: true, + client_id, + rep_id, + }) + .await + { + bail!("[{}] Failed to notify {} {:?}", client_id, rep_id, e); } Ok(()) } @@ -3974,7 +4045,7 @@ impl Upstairs { "Found {:?} extents that need repair", reconcile_list.mend.len() ); - ds.convert_rc_to_messages(reconcile_list.mend); + ds.convert_rc_to_messages(reconcile_list.mend, max_flush, max_gen); ds.reconcile_repair_needed = ds.reconcile_task_list.len(); true } else { @@ -4341,9 +4412,11 @@ impl Upstairs { } /* - * TODO: This should just store the region info for a downstairs at - * some place we can later look at and compare. For now, we do some - * of that work now. + * Store the downstairs UUID, or compare to what we stored before + * for a given client ID. Do a sanity check that this downstairs + * Region Definition matches the other downstairs. If we don't have + * any Region info yet, then use the provided RegionDefinition as + * the source to compare the other downstairs with. */ fn add_ds_region( &self, @@ -6327,7 +6400,51 @@ pub struct Arg { * to this task (in the ds_reconcile() function) that a downstairs has * completed a reconcile request. * - * This task drives any reconciliation if necessary. + * This task drives any reconciliation if necessary. If Repair is required, + * it happens in three phases. 
Typically an interruption of repair will + * result in things starting over, but if actual repair work to an extent + * is completed, that extent won't need to be repaired again. + * + * The three phases are: + * + * Collect: + * When a Downstairs connects, the Upstairs collects the gen/flush/dirty + * (GFD) info from all extents. This GFD information is stored and the + * Upstairs waits for all three Downstairs to attach. + * + * Compare: + * In the compare phase, the upstairs will walk the list of all extents + * and compare the G/F/D from each of the downstairs, looking for any + * mismatch between downstairs (the dirty bit counts as a mismatch and will + * force a repair even if generation and flush numbers agree). For each + * mismatch, the upstairs determines which downstairs has the extent that + * should be the source, and which of the other downstairs extents need + * repair. This list of mismatches (source, destination(s)) is collected. + * Once an upstairs has compiled its repair list, it will then generate a + * sequence of Upstairs -> Downstairs repair commands to repair each + * extent that needs to be fixed. For a given piece of repair work, the + * commands are: + * - Send a flush to source extent. + * - Close extent on all downstairs. + * - Send repair command to destination extents (with source extent + * IP/Port). + * (See DS-DS Repair) + * - Reopen all extents. + * + * Repair: + * During repair, each command issued from the upstairs must be completed + * before the next will be sent. The Upstairs is responsible for walking + * the repair commands and sending them to the required downstairs, and + * waiting for them to finish. The actual repair work for an extent + * takes place on the downstairs being repaired. + * + * Repair (ds to ds) + * Each downstairs runs a repair server (Dropshot) that listens for + * repair requests from other downstairs. 
A downstairs with an extent + * that needs repair will contact the source downstairs and request the + * list of files for an extent, then request each file. Once all files + * are local to the downstairs needing repair, it will replace the existing + * extent files with the new ones. */ async fn up_listen( up: &Arc, diff --git a/upstairs/src/mend.rs b/upstairs/src/mend.rs index 6a305b764..bbd8288ba 100644 --- a/upstairs/src/mend.rs +++ b/upstairs/src/mend.rs @@ -34,7 +34,7 @@ pub struct ExtentFix { #[derive(Debug)] pub struct DownstairsMend { // Index by extent ID - pub mend: HashMap, + pub mend: HashMap, } impl DownstairsMend { @@ -108,7 +108,7 @@ impl DownstairsMend { if *dirty0 || c1.dirty[i] || c2.dirty[i] { println!("Extents {} dirty", i); let ef = make_repair_list(i, c0, c1, c2); - dsm.mend.insert(i as u64, ef); + dsm.mend.insert(i, ef); } else { to_check.push(i as usize); } @@ -126,7 +126,7 @@ impl DownstairsMend { { println!("Extent {} has flush number mismatch", i); let ef = make_repair_list(*i, c0, c1, c2); - dsm.mend.insert(*i as u64, ef); + dsm.mend.insert(*i, ef); } else { second_check.push(*i); } @@ -142,7 +142,7 @@ impl DownstairsMend { { println!("generation number mismatch {}", i); let ef = make_repair_list(*i, c0, c1, c2); - dsm.mend.insert(*i as u64, ef); + dsm.mend.insert(*i, ef); } } diff --git a/upstairs/src/test.rs b/upstairs/src/test.rs index aea854024..6b0f0bd7f 100644 --- a/upstairs/src/test.rs +++ b/upstairs/src/test.rs @@ -3668,20 +3668,200 @@ mod test { #[test] fn reconcile_rc_to_message() { - // Convert an extent fix to the crucible messages. - // TODO: This will grow as the protocol is finalized. + // Convert an extent fix to the crucible repair messages that + // are sent to the downstairs. 
Verify that the resulting + // messages are what we expect let up = Upstairs::default(); let mut ds = up.downstairs.lock().unwrap(); + let r0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 801); + let r1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 802); + let r2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 803); + ds.ds_repair.insert(0, r0.clone()); + ds.ds_repair.insert(1, r1); + ds.ds_repair.insert(2, r2); + + let repair_extent = 9; let mut rec_list = HashMap::new(); let ef = ExtentFix { source: 0, dest: vec![1, 2], }; - rec_list.insert(0, ef); - ds.convert_rc_to_messages(rec_list); - // TODO: When we finalize the message, update this test - // to expect more. - assert!(!ds.reconcile_task_list.is_empty()); + rec_list.insert(repair_extent, ef); + let max_flush = 22; + let max_gen = 33; + ds.convert_rc_to_messages(rec_list, max_flush, max_gen); + + // Walk the list and check for messages we expect to find + assert_eq!(ds.reconcile_task_list.len(), 4); + + // First task, flush + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 0); + match rio.op { + Message::ExtentFlush(rep_id, ext, source, mf, mg) => { + assert_eq!(rep_id, 0); + assert_eq!(ext, repair_extent); + assert_eq!(source, 0); + assert_eq!(mf, max_flush); + assert_eq!(mg, max_gen); + } + m => { + panic!("{:?} not ExtentFlush()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Second task, close extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 1); + match rio.op { + Message::ExtentClose(rep_id, ext) => { + assert_eq!(rep_id, 1); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), 
rio.state.get(&2)); + + // Third task, repair extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 2); + match rio.op { + Message::ExtentRepair(rep_id, ext, source, repair, dest) => { + assert_eq!(rep_id, rio.id); + assert_eq!(ext, repair_extent); + assert_eq!(source, 0); + assert_eq!(repair, r0); + assert_eq!(dest, vec![1, 2]); + } + m => { + panic!("{:?} not ExtentRepair", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Fourth task, reopen extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 3); + match rio.op { + Message::ExtentReopen(rep_id, ext) => { + assert_eq!(rep_id, 3); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + } + + #[test] + fn reconcile_rc_to_message_two() { + // Convert another extent fix to the crucible repair messages that + // are sent to the downstairs. 
Verify that the resulting + // messages are what we expect + let up = Upstairs::default(); + let mut ds = up.downstairs.lock().unwrap(); + let r0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 801); + let r1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 802); + let r2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 803); + ds.ds_repair.insert(0, r0.clone()); + ds.ds_repair.insert(1, r1); + ds.ds_repair.insert(2, r2); + + let repair_extent = 5; + let mut rec_list = HashMap::new(); + let ef = ExtentFix { + source: 2, + dest: vec![0, 1], + }; + rec_list.insert(repair_extent, ef); + let max_flush = 66; + let max_gen = 77; + ds.convert_rc_to_messages(rec_list, max_flush, max_gen); + + // Walk the list and check for messages we expect to find + assert_eq!(ds.reconcile_task_list.len(), 4); + + // First task, flush + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 0); + match rio.op { + Message::ExtentFlush(rep_id, ext, source, mf, mg) => { + assert_eq!(rep_id, 0); + assert_eq!(ext, repair_extent); + assert_eq!(source, 2); + assert_eq!(mf, max_flush); + assert_eq!(mg, max_gen); + } + m => { + panic!("{:?} not ExtentFlush()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Second task, close extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 1); + match rio.op { + Message::ExtentClose(rep_id, ext) => { + assert_eq!(rep_id, 1); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Third task, repair extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 2); + match rio.op { + Message::ExtentRepair(rep_id, ext, 
source, repair, dest) => { + assert_eq!(rep_id, rio.id); + assert_eq!(ext, repair_extent); + assert_eq!(source, 2); + assert_eq!(repair, r2); + assert_eq!(dest, vec![0, 1]); + } + m => { + panic!("{:?} not ExtentRepair", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Fourth task, reopen extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 3); + match rio.op { + Message::ExtentReopen(rep_id, ext) => { + assert_eq!(rep_id, 3); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); } #[test]