From d14fb98309f1748969fdaa6c5e8f1e947e4ad315 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Tue, 29 Mar 2022 18:02:12 -0700 Subject: [PATCH 1/7] Another piece of the reconciliation puzzle This covers the next piece of reconciliation where we actually copy extent files from one downstairs to another. Upstairs: Update the repair logic to now do more than just close and then re-open extents. The upstairs will now build the complete repair steps needed to repair an extent. This includes the logic about which downstairs should receive which repair commands. Protocol: New ExtentFlush to flush a specific extent. New ExtentRepair command for repairing a specific extent. Replaced a few specific ACKs with a generic repair ACK. Downstairs: Made repair mode always on, and starting at port + REPAIR_PORT_OFFSET. New repair-api command that can dump a json file for the repair-client API Added the logic to support the new repair messages. Downstairs region: The downstairs coordination of a repair live in this file New functions to get the names for the new repair directories and a bunch of tests around them. We can now get names for specific parts of the extent, the housing directory, the copy, replace, and completed directories, as well as provide an optional file extension to the file name. During repair, we check for and remove any copy or completed directories. If there is a replay directory, then the extent open will do the replay and replace existing extent files, only if the open is read/write. New command to just flush a specific extent. Downstairs repair: This new file is the dropshot server for extent repair requests. It listens on the specified port and serves (anyone currently) the list of files for an extent, or one of the actual extent files. Repair-client: Generate a library using progenitor that is used by a downstairs to request repair extents from another downstairs. 
This library generation uses the json file that is output from the downstairs repair-api command. Tests: More tests for repair More tests for building repair work list. A New test that will in a loop: Create a workload that will produce downstairs out of sync Restart and repair the out of sync downstairs. Improved usage messages for create-generic-ds.sh --- Cargo.lock | 148 +++++- Cargo.toml | 1 + downstairs/Cargo.toml | 17 +- downstairs/src/main.rs | 89 +++- downstairs/src/region.rs | 819 ++++++++++++++++++++++++++++++++- downstairs/src/repair.rs | 385 ++++++++++++++++ openapi/README.md | 5 + openapi/downstairs-repair.json | 212 +++++++++ protocol/src/lib.rs | 19 +- repair-client/Cargo.toml | 18 + repair-client/src/lib.rs | 8 + tools/create-generic-ds.sh | 7 +- tools/test_repair.sh | 89 ++++ tools/test_up.sh | 64 ++- upstairs/src/lib.rs | 164 +++++-- upstairs/src/mend.rs | 8 +- upstairs/src/test.rs | 200 +++++++- 17 files changed, 2111 insertions(+), 142 deletions(-) create mode 100644 downstairs/src/repair.rs create mode 100644 openapi/downstairs-repair.json create mode 100644 repair-client/Cargo.toml create mode 100644 repair-client/src/lib.rs create mode 100755 tools/test_repair.sh diff --git a/Cargo.lock b/Cargo.lock index e158b0e2a..bba6a3610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,7 +517,7 @@ dependencies = [ "anyhow", "chrono", "percent-encoding", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "reqwest", "schemars", "serde", @@ -574,7 +574,7 @@ version = "0.0.1" dependencies = [ "anyhow", "percent-encoding", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "reqwest", "schemars", "serde", @@ -596,6 +596,10 @@ dependencies = [ "futures", "futures-core", "hex", + "http", + "hyper", + "hyper-staticfile", + "mime_guess", "omicron-common", "opentelemetry 0.17.0", "opentelemetry-jaeger", @@ -603,10 +607,13 @@ dependencies = [ "oximeter-producer", "rand 0.8.5", "rand_chacha 
0.3.1", + "repair-client", + "reqwest", "ringbuffer", "rusqlite", "schemars", "serde", + "serde_json", "sha2", "slog", "slog-async", @@ -1305,6 +1312,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21dec9db110f5f872ed9699c3ecf50cf16f423502706ba5c72462e28d3157573" + [[package]] name = "httparse" version = "1.6.0" @@ -1354,6 +1367,25 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-staticfile" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d5ff45ea721e295c400e4c65b1c855d43d329b8ae8ec12520ab1860a240debf" +dependencies = [ + "futures-util", + "http", + "http-range", + "httpdate", + "hyper", + "mime_guess", + "percent-encoding", + "rand 0.8.5", + "tokio", + "url", + "winapi", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1576,6 +1608,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -1654,11 +1696,11 @@ dependencies = [ [[package]] name = "nexus-client" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/omicron?branch=main#6a98020b9de0f0ae42fc2ac593e1576b7b7c21e8" +source = "git+https://github.com/oxidecomputer/omicron?branch=main#8a26d6992dafea6f5052083b2362d13b8e94dfa6" dependencies = [ "chrono", "omicron-common", - "progenitor", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "reqwest", "serde", "serde_json", @@ -1826,7 +1868,7 @@ dependencies = [ "ipnetwork", "macaddr", "parse-display", - "progenitor", + 
"progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "rand 0.8.5", "reqwest", "ring", @@ -2326,6 +2368,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "progenitor" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "anyhow", + "getopts", + "openapiv3", + "progenitor-client 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "progenitor-macro 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "serde", + "serde_json", +] + [[package]] name = "progenitor" version = "0.0.0" @@ -2334,9 +2391,22 @@ dependencies = [ "anyhow", "getopts", "openapiv3", - "progenitor-client", - "progenitor-impl", - "progenitor-macro", + "progenitor-client 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", + "progenitor-macro 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", + "serde", + "serde_json", +] + +[[package]] +name = "progenitor-client" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "bytes", + "futures-core", + "percent-encoding", + "reqwest", "serde", "serde_json", ] @@ -2354,6 +2424,28 @@ dependencies = [ "serde_json", ] +[[package]] +name = "progenitor-impl" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "convert_case", + "getopts", + "indexmap", + "openapiv3", + "proc-macro2", + "quote", + "regex", + "rustfmt-wrapper", + "schemars", + "serde", + "serde_json", + "syn", + "thiserror", + "typify", + "unicode-xid", +] + [[package]] name = "progenitor-impl" version = "0.0.0" @@ -2376,6 +2468,21 @@ dependencies = [ "unicode-xid", ] 
+[[package]] +name = "progenitor-macro" +version = "0.0.0" +source = "git+https://github.com/oxidecomputer/progenitor?branch=main#75348207d7405db3371a98b5348bef3fbd0ef79e" +dependencies = [ + "openapiv3", + "proc-macro2", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "quote", + "serde", + "serde_json", + "serde_tokenstream", + "syn", +] + [[package]] name = "progenitor-macro" version = "0.0.0" @@ -2383,7 +2490,7 @@ source = "git+https://github.com/oxidecomputer/progenitor#b7bbb5cdffdfc4ddf5bc7c dependencies = [ "openapiv3", "proc-macro2", - "progenitor-impl", + "progenitor-impl 0.0.0 (git+https://github.com/oxidecomputer/progenitor)", "quote", "serde", "serde_json", @@ -2625,6 +2732,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "repair-client" +version = "0.0.1" +dependencies = [ + "anyhow", + "chrono", + "percent-encoding", + "progenitor 0.0.0 (git+https://github.com/oxidecomputer/progenitor?branch=main)", + "reqwest", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "reqwest" version = "0.11.10" @@ -3774,6 +3895,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.7" diff --git a/Cargo.toml b/Cargo.toml index a2eadc8aa..97cd1ef63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "nbd_server", "package", "protocol", + "repair-client", "scope", "smf", "upstairs", diff --git a/downstairs/Cargo.toml b/downstairs/Cargo.toml index 7c9f4e3cf..f259a0462 100644 --- a/downstairs/Cargo.toml +++ b/downstairs/Cargo.toml @@ -14,18 +14,25 @@ crucible = { path = "../upstairs" } crucible-common = 
{ path = "../common" } crucible-protocol = { path = "../protocol" } dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main" } -slog = { version = "2.7" } -slog-term = { version = "2.9" } -slog-async = { version = "2.7" } -schemars = { version = "0.8", features = [ "chrono", "uuid" ] } futures = "0.3" futures-core = "0.3" +http = "0.2.6" +hyper = { version = "0.14", features = [ "full" ] } +hyper-staticfile = "0.8" +mime_guess = "2.0.3" omicron-common = { git = "https://github.com/oxidecomputer/omicron", branch = "main" } oximeter-producer = { git = "https://github.com/oxidecomputer/omicron", branch = "main" } oximeter = { git = "https://github.com/oxidecomputer/omicron", branch = "main" } rand = "0.8.5" +repair-client = { path = "../repair-client" } +reqwest = { version = "0.11", features = ["json"] } ringbuffer = "0.8" +schemars = { version = "0.8.8", features = [ "uuid" ] } serde = { version = "1", features = ["derive"] } +serde_json = "1.0.79" +slog = { version = "2.7" } +slog-term = { version = "2.9" } +slog-async = { version = "2.7" } structopt = "0.3" tokio = { version = "1.17.0", features = ["full"] } tokio-util = { version = "0.7", features = ["codec"]} @@ -42,8 +49,8 @@ hex = "0.4" sha2 = "0.10" [dev-dependencies] -tempfile = "3" rand_chacha = "0.3.1" +tempfile = "3" [features] default = [] diff --git a/downstairs/src/main.rs b/downstairs/src/main.rs index 8fabf277d..4b6f72e61 100644 --- a/downstairs/src/main.rs +++ b/downstairs/src/main.rs @@ -19,6 +19,7 @@ use anyhow::{bail, Result}; use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use rand::prelude::*; +use slog::Drain; use structopt::StructOpt; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -29,11 +30,10 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use uuid::Uuid; -use slog::Drain; - mod admin; mod dump; mod region; +mod repair; mod stats; use admin::run_dropshot; @@ -134,17 +134,16 @@ enum Args { skip: u64, }, 
Run { - #[structopt(short, long, default_value = "0.0.0.0")] + /// Address the downstairs will listen for the upstairs on. + #[structopt(short, long, default_value = "0.0.0.0", name = "ADDRESS")] address: IpAddr, + /// Directory where the region is located. #[structopt(short, long, parse(from_os_str), name = "DIRECTORY")] data: PathBuf, - /* - * Test option, makes the search for new work sleep and sometimes - * skip doing work. XXX Note that the flow control between upstairs - * and downstairs is not yet implemented. By turning on this option - * it's possible to deadlock. - */ + + /// Test option, makes the search for new work sleep and sometimes + /// skip doing work. #[structopt(long)] lossy: bool, @@ -152,9 +151,11 @@ enum Args { * If this option is provided along with the address:port of the * oximeter server, the downstairs will publish stats. */ - #[structopt(long)] + /// Use this address:port to send stats to an Oximeter server. + #[structopt(long, name = "OXIMETER_ADDRESS:PORT")] oximeter: Option, + /// Listen on this port for the upstairs to connect to us. 
#[structopt(short, long, default_value = "9000")] port: u16, @@ -175,6 +176,7 @@ enum Args { #[structopt(long, default_value = "rw")] mode: Mode, }, + RepairAPI, Serve { #[structopt(short, long)] trace_endpoint: Option, @@ -525,14 +527,34 @@ where d.add_work(*uuid, *ds_id, new_read).await?; Some(*ds_id) } + Message::ExtentFlush(rep_id, eid, _cid, flush_number, gen_number) => { + println!( + "{} Flush extent {} with f:{} g:{}", + rep_id, eid, flush_number, gen_number + ); + let msg = { + let d = ad.lock().await; + match d.region.region_flush_extent( + *eid, + *flush_number, + *gen_number, + ) { + Ok(()) => Message::RepairAckId(*rep_id), + Err(e) => Message::ExtentError(*rep_id, *eid, e), + } + }; + let mut fw = fw.lock().await; + fw.send(msg).await?; + return Ok(()); + } Message::ExtentClose(rep_id, eid) => { - println!("{} Close extent {}", rep_id, eid); + println!("{} Close extent {}", rep_id, eid); let msg = { let mut d = ad.lock().await; - match d.region.extents.get_mut(*eid as usize) { + match d.region.extents.get_mut(*eid) { Some(ext) => { ext.close()?; - Message::ExtentCloseAck(*rep_id) + Message::RepairAckId(*rep_id) } None => Message::ExtentError( *rep_id, @@ -545,12 +567,28 @@ where fw.send(msg).await?; return Ok(()); } + Message::ExtentRepair(rep_id, eid, sc, repair_addr, dest) => { + println!( + "{} Repair extent {} source:[{}] {:?} dest:{:?}", + rep_id, eid, sc, repair_addr, dest + ); + let msg = { + let mut d = ad.lock().await; + match d.region.repair_extent(*eid, *repair_addr).await { + Ok(()) => Message::RepairAckId(*rep_id), + Err(e) => Message::ExtentError(*rep_id, *eid, e), + } + }; + let mut fw = fw.lock().await; + fw.send(msg).await?; + return Ok(()); + } Message::ExtentReopen(rep_id, eid) => { println!("{} Reopen extent {}", rep_id, eid); let msg = { let mut d = ad.lock().await; - match d.region.reopen_extent(*eid as usize) { - Ok(()) => Message::ExtentReopenAck(*rep_id), + match d.region.reopen_extent(*eid) { + Ok(()) => 
Message::RepairAckId(*rep_id), Err(e) => Message::ExtentError(*rep_id, *eid, e), } }; @@ -1071,8 +1109,8 @@ where * downstairs work queue. */ #[derive(Debug)] -struct Downstairs { - region: Region, +pub struct Downstairs { + pub region: Region, work: Mutex, lossy: bool, // Test flag, enables pauses and skipped jobs return_errors: bool, // Test flag @@ -1800,6 +1838,10 @@ async fn main() -> Result<()> { ) .await } + Args::RepairAPI => { + let _ = repair::build_api(true); + Ok(()) + } Args::Serve { trace_endpoint, bind_addr, @@ -1920,6 +1962,18 @@ async fn start_downstairs( }); } + // Start the repair server on the same address at port + REPAIR_PORT_OFFSET + let rport = port + REPAIR_PORT_OFFSET; + let repair_address = match address { + IpAddr::V4(ipv4) => SocketAddr::new(std::net::IpAddr::V4(ipv4), rport), + IpAddr::V6(ipv6) => SocketAddr::new(std::net::IpAddr::V6(ipv6), rport), + }; + let dss = d.clone(); + tokio::spawn(async move { + let s = repair::repair_main(&dss, repair_address).await; + println!("Got {:?} from repair main", s); + }); + let listen_on = match address { IpAddr::V4(ipv4) => SocketAddr::new(std::net::IpAddr::V4(ipv4), port), IpAddr::V6(ipv6) => SocketAddr::new(std::net::IpAddr::V6(ipv6), port), @@ -1928,7 +1982,6 @@ async fn start_downstairs( /* * Establish a listen server on the port. 
*/ - // let listen_on = SocketAddrV4::new(address, port); println!("Using address: {:?}", listen_on); let listener = TcpListener::bind(&listen_on).await?; diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index 2656a1a15..f50a56fa5 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -1,8 +1,9 @@ // Copyright 2021 Oxide Computer Company use std::collections::HashMap; use std::convert::TryInto; -use std::fs::{File, OpenOptions}; +use std::fs::{rename, File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; +use std::net::SocketAddr; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::{Mutex, MutexGuard}; @@ -10,8 +11,11 @@ use std::sync::{Mutex, MutexGuard}; use anyhow::{bail, Result}; use crucible_common::*; use crucible_protocol::{EncryptionContext, SnapshotDetails}; +use futures::TryStreamExt; +use repair_client::Client; use rusqlite::{params, Connection}; use serde::{Deserialize, Serialize}; +use tokio::macros::support::Pin; use tracing::instrument; #[derive(Debug)] @@ -380,7 +384,6 @@ pub struct ExtentMeta { * Increasing value provided from upstairs every time it connects to * a downstairs. Used to help break ties if flash numbers are the same * on extents. - * Not currently connected to anything XXX */ pub gen_number: u64, /** @@ -407,14 +410,76 @@ impl Default for ExtentMeta { } /** - * Produce a PathBuf that refers to the backing file for extent "number", - * anchored under "dir". + * Produce the string name of the data file for a given extent number + */ +pub fn extent_file_name(number: u32, extension: Option<&str>) -> String { + if let Some(extension) = extension { + format!("{:03X}.{}", number & 0xFFF, extension) + } else { + format!("{:03X}", number & 0xFFF) + } +} + +/** + * Produce a PathBuf that refers to the containing directory for extent + * "number", anchored under "dir". 
*/ -fn extent_path>(dir: P, number: u32) -> PathBuf { +pub fn extent_dir>(dir: P, number: u32) -> PathBuf { let mut out = dir.as_ref().to_path_buf(); out.push(format!("{:02X}", (number >> 24) & 0xFF)); out.push(format!("{:03X}", (number >> 12) & 0xFFF)); - out.push(format!("{:03X}", number & 0xFFF)); + out +} + +/** + * Produce a PathBuf that refers to the backing file for extent "number", + * anchored under "dir". + */ +pub fn extent_path>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, None)); + out +} + +/** + * Produce a PathBuf that refers to the copy directory we create for + * a given extent "number", This directory will hold the files we + * transfer from the source downstairs. + * anchored under "dir". + */ +pub fn copy_dir>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, None)); + out.set_extension("copy".to_string()); + out +} + +/** + * Produce a PathBuf that refers to the replace directory we use for + * a given extent "number". This directory is generated (see below) when + * all the files needed for repairing an extent have been added to the + * copy directory and we are ready to start replacing the extents files. + * anchored under "dir". The actual generation of this directory will + * be done by renaming the copy directory. + */ +pub fn replace_dir>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, None)); + out.set_extension("replace".to_string()); + out +} +/** + * Produce a PathBuf that refers to the completed directory we use for + * a given extent "number". This directory is generated (see below) when + * all the files needed for repairing an extent have been copied to their + * final destination and everything has been flushed. + * The actual generation of this directory will be done by renaming the + * replace directory. 
+ */ +pub fn completed_dir>(dir: P, number: u32) -> PathBuf { + let mut out = extent_dir(dir, number); + out.push(extent_file_name(number, None)); + out.set_extension("completed".to_string()); out } @@ -424,6 +489,23 @@ fn config_path>(dir: P) -> PathBuf { out } +/** + * Remove directories associated with repair expect for the replace + * directory. Replace is handled specifically during extent open. + */ +pub fn remove_copy_cleanup_dir>(dir: P, eid: u32) -> Result<()> { + let mut remove_dirs = vec![copy_dir(&dir, eid)]; + remove_dirs.push(completed_dir(&dir, eid)); + + for d in remove_dirs { + if Path::new(&d).exists() { + println!("Deleting dir: {:?}", d); + std::fs::remove_dir_all(&d)?; + } + } + Ok(()) +} + impl Extent { /** * Open an existing extent file at the location requested. @@ -439,11 +521,25 @@ impl Extent { * Store extent data in files within a directory hierarchy so that * there are not too many files in any level of that hierarchy. */ - let mut path = extent_path(dir, number); + let mut path = extent_path(&dir, number); let bcount = def.extent_size().value; let size = def.block_size().checked_mul(bcount).unwrap(); + remove_copy_cleanup_dir(&dir, number)?; + + // If the replace directory exists for this extent, then it means + // a repair was interrupted before it could finish. We will continue + // the repair before we open the extent. + let replace_dir = replace_dir(&dir, number); + if !read_only && Path::new(&replace_dir).exists() { + println!( + "Extent {} found replacement dir, finishing replacement", + number + ); + move_replacement_extent(&dir, number as usize)?; + } + /* * Open the extent file and verify the size is as we expect. 
*/ @@ -489,7 +585,6 @@ impl Extent { */ pub fn close(&mut self) -> Result<()> { let inner = self.inner.as_ref().unwrap().lock().unwrap(); - println!("Close {} {:?}", self.number, inner); drop(inner); self.inner = None; Ok(()) @@ -509,7 +604,7 @@ impl Extent { * Store extent data in files within a directory hierarchy so that * there are not too many files in any level of that hierarchy. */ - let mut path = extent_path(dir, number); + let mut path = extent_path(&dir, number); /* * Verify there are not existing extent files. @@ -517,6 +612,7 @@ impl Extent { if Path::new(&path).exists() { bail!("Extent file already exists {:?}", path); } + remove_copy_cleanup_dir(&dir, number)?; let bcount = def.extent_size().value; let size = def.block_size().checked_mul(bcount).unwrap(); @@ -604,6 +700,52 @@ impl Extent { }) } + /** + * Create the copy directory for this extent. + */ + fn create_copy_dir>(&self, dir: P) -> Result { + let cp = copy_dir(dir, self.number); + + /* + * Verify the copy directory does not exist + */ + if Path::new(&cp).exists() { + panic!("Extent copy directory already exists {:?}", cp); + } + + println!("Create copy dir {:?}", cp); + std::fs::create_dir_all(&cp)?; + Ok(cp) + } + + /** + * Create the file that will hold a copy of an extent from a + * remote downstairs. 
+ */ + fn create_copy_file( + &self, + mut copy_dir: PathBuf, + extension: Option<&str>, + ) -> Result { + let name = extent_file_name(self.number, None); + copy_dir.push(name); + if let Some(extension) = extension { + copy_dir.set_extension(extension); + } + let copy_path = copy_dir; + + if Path::new(©_path).exists() { + panic!("copy file already exists {:?}", copy_path); + } + + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(©_path)?; + Ok(file) + } + pub fn inner(&self) -> MutexGuard { self.inner.as_ref().unwrap().lock().unwrap() } @@ -808,7 +950,7 @@ extern "C" { */ #[derive(Debug)] pub struct Region { - dir: PathBuf, + pub dir: PathBuf, def: RegionDefinition, pub extents: Vec, read_only: bool, @@ -954,7 +1096,6 @@ impl Region { * - matches our eid * - is not read-only */ - println!("reopen extent {} {:?}", eid, self.extents[eid].inner); assert!(!self.extents[eid].inner.is_some()); assert_eq!(self.extents[eid].number, eid as u32); assert!(!self.read_only); @@ -965,6 +1106,162 @@ impl Region { Ok(()) } + /** + * Repair an extent from another downstairs + * + * We need to repair an extent in such a way that an interruption + * at any time can be recovered from. + * + * Let us assume we are repairing extent 012 + * 1. Make new 012.copy dir (extent name plus: .copy) + * 2. Get all extent files from source side, put in 021.copy directory + * 3. fsync files we just downloaded + * 4. Rename 012.copy dir to 012.replace dir + * 5. fsync extent directory ( 00/000/ where the extent files live) + * 6. Replace current extent 012 files with copied files of same name + * from 012.replace dir + * 7. Remove any files in extent dir that don't exist in replacing dir + * 8. fsync files after copying them (new location). + * 9. fsync containing extent dir + * 10. Rename 012.replace dir to 012.completed dir. + * 11. fsync extent dir again (so dir rename is persisted) + * 12. Delete completed dir. 
+ * + * This also requires the following behavior on Extent open: + * A. If xxx.Copy directory found, delete it. + * B. If xxx.Completed directory found, delete it. + * C. If xxx.Replace dir found start at step 4 above and continue + * on through 6. + * D. Only then, open extent. + */ + pub async fn repair_extent( + &mut self, + eid: usize, + repair_addr: SocketAddr, + ) -> Result<(), CrucibleError> { + // Make sure the extent: + // is currently closed, matches our eid, is not read-only + assert!(self.extents[eid].inner.is_none()); + assert_eq!(self.extents[eid].number, eid as u32); + assert!(!self.read_only); + + self.get_extent_copy(eid, repair_addr).await?; + + // Returning from get_extent_copy means we have copied all our + // files and moved the copy directory to replace directory. + // Now, replace the current extent files with the replacement ones. + move_replacement_extent(&self.dir, eid)?; + + Ok(()) + } + + /** + * Connect to the source and pull over all the extent files for the + * given extent ID. + * The files are loaded into the copy_dir for the given extent. + * After all the files have been copied locally, we rename the + * copy_dir to replace_dir. + */ + pub async fn get_extent_copy( + &mut self, + eid: usize, + repair_addr: SocketAddr, + ) -> Result<(), CrucibleError> { + assert!(self.extents[eid].inner.is_none()); + // Make sure copy, replace, cleanup dirs don't exist yet. + // We don't need them yet, but if they exist, then something + // is wrong. + let rd = replace_dir(&self.dir, eid as u32); + if rd.exists() { + crucible_bail!( + IoError, + "Replace directory: {:?} already exists", + rd, + ); + } + + let extent = &self.extents[eid]; + let copy_dir = extent.create_copy_dir(&self.dir)?; + + // XXX TLS someday? Authentication? 
+ let url = format!("http://{:?}", repair_addr); + let repair_server = Client::new(&url); + + let repair_files = repair_server + .get_files_for_extent(eid as u32) + .await + .unwrap() + .into_inner() + .files; + + println!("Found repair files: {:?}", repair_files); + + // The repair file list should always contain the extent data + // file itself, and the .db file (metadata) for that extent. + // Missing these means the repair will not succeed. + let filename = extent_file_name(eid as u32, None); + if !repair_files.contains(&filename) { + // XXX Panic now, but this will eventually bail and abort + // the repair and let the upper layers handle it. + panic!("Repair file list missing data file {}", filename); + } + let extent_copy = + extent.create_copy_file(copy_dir.clone(), None).unwrap(); + let repair_stream = repair_server.get_extent(eid as u32).await.unwrap(); + save_stream_to_file(extent_copy, repair_stream.into_inner()).await?; + + let filename = extent_file_name(eid as u32, Some("db")); + if !repair_files.contains(&filename) { + // XXX Panic now, but this will eventually bail and abort + // the repair and let the upper layers handle it. + panic!("Repair file list missing db file {}", filename); + } + let extent_db = extent + .create_copy_file(copy_dir.clone(), Some("db")) + .unwrap(); + let repair_stream = repair_server.get_db(eid as u32).await.unwrap(); + save_stream_to_file(extent_db, repair_stream.into_inner()).await?; + + // These next two are optional. 
+ let filename = extent_file_name(eid as u32, Some("db-shm")); + if repair_files.contains(&filename) { + let extent_shm = extent + .create_copy_file(copy_dir.clone(), Some("db-shm")) + .unwrap(); + let repair_stream = + repair_server.get_shm(eid as u32).await.unwrap(); + save_stream_to_file(extent_shm, repair_stream.into_inner()).await?; + } else if repair_files.len() != 2 { + panic!("Unknown files on repair list: {:?}", repair_files); + } + + let filename = extent_file_name(eid as u32, Some("db-wal")); + if repair_files.contains(&filename) { + let extent_wal = extent + .create_copy_file(copy_dir.clone(), Some("db-wal")) + .unwrap(); + let repair_stream = + repair_server.get_wal(eid as u32).await.unwrap(); + save_stream_to_file(extent_wal, repair_stream.into_inner()).await?; + } + if repair_files.len() > 4 { + panic!("Unknown extra files on repair list: {:?}", repair_files); + } + + // After we have all files: move the repair dir. + println!( + "Repair files downloaded, move directory {:?} to {:?}", + copy_dir, rd + ); + + // XXX fsync the parent directory (the extent dir) + rename(copy_dir.clone(), rd.clone())?; + + // XXX fsync the parent directory (the extent dir) + + Ok(()) + } + /** * if there is a difference between what our actual extent_count is * and what is requested, go out and create the new extent files. @@ -1141,6 +1438,28 @@ impl Region { Ok(responses) } + /* + * Send a flush to just the given extent. The provided flush number is + * what an extent should use if a flush is required. + */ + #[instrument] + pub fn region_flush_extent( + &self, + eid: usize, + flush_number: u64, + gen_number: u64, + ) -> Result<(), CrucibleError> { + println!( + "Flush just extent {} with f:{} and g:{}", + eid, flush_number, gen_number + ); + + let extent = &self.extents[eid]; + extent.flush_block(flush_number, gen_number)?; + + Ok(()) + } + /* * Send a flush to all extents. The provided flush number is * what an extent should use if a flush is required. 
@@ -1229,12 +1548,137 @@ impl Region { } } +/** + * Copy the contents of the replacement directory on to the extent + * files in the extent directory. + */ +pub fn move_replacement_extent>( + region_dir: P, + eid: usize, +) -> Result<(), CrucibleError> { + let destination_dir = extent_dir(®ion_dir, eid as u32); + let extent_file_name = extent_file_name(eid as u32, None); + let replace_dir = replace_dir(®ion_dir, eid as u32); + let completed_dir = completed_dir(®ion_dir, eid as u32); + + // XXX replace panic with CrucibleError + assert!(Path::new(&replace_dir).exists()); + assert!(!Path::new(&completed_dir).exists()); + + println!("Move stuff from {:?} in {:?}", replace_dir, destination_dir,); + + // Setup the original and replacement file names. + let mut new_file = replace_dir.clone(); + new_file.push(extent_file_name.clone()); + + let mut original_file = destination_dir; + original_file.push(extent_file_name); + + // Copy the new file (the one we copied from the source side) on top + // of the original file. + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy of {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + + new_file.set_extension("db"); + original_file.set_extension("db"); + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + + // The .db-shm and .db-wal files may or may not exist. If they don't + // exist on the source side, then be sure to remove them locally to + // avoid database corruption from a mismatch between old and new. 
+ new_file.set_extension("db-shm"); + original_file.set_extension("db-shm"); + if new_file.exists() { + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + } else { + println!( + "Remove old file {:?} as there is no replacement", + original_file.clone() + ); + if original_file.exists() { + std::fs::remove_file(&original_file).unwrap(); + } + } + + new_file.set_extension("db-wal"); + original_file.set_extension("db-wal"); + if new_file.exists() { + if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { + panic!("copy {:?} to {:?} got: {:?}", new_file, original_file, e); + } + // XXX fsync original_file + } else { + println!( + "Remove old file {:?} as there is no replacement", + original_file.clone() + ); + if original_file.exists() { + std::fs::remove_file(&original_file).unwrap(); + } + } + + // XXX fsync destination_dir; + + // After we have all files: move the copy dir. + println!("Move directory {:?} to {:?}", replace_dir, completed_dir); + rename(replace_dir, &completed_dir)?; + + // XXX fsync the parent directory (the extent dir) + + std::fs::remove_dir_all(&completed_dir)?; + + // XXX fsync parent dir one more time + Ok(()) +} + +/** + * Given: + * The stream returned to us from the progenitor endpoint + * A local File, already created and opened, + * Stream the data from the endpoint into the file. + * When the stream is completed, fsync the file. 
+ */ +pub async fn save_stream_to_file( + mut file: File, + mut stream: Pin< + Box< + dyn futures::Stream< + Item = std::result::Result, + > + std::marker::Send, + >, + >, +) -> Result<(), CrucibleError> { + loop { + match stream.try_next().await { + Ok(Some(bytes)) => { + file.write_all(&bytes).unwrap(); + } + Ok(None) => break, + Err(e) => panic!("extent stream {}", e), + } + } + if unsafe { fsync(file.as_raw_fd()) } == -1 { + let e = std::io::Error::last_os_error(); + crucible_bail!(IoError, "repair {:?}: fsync failure: {:?}", file, e); + } + Ok(()) +} + #[cfg(test)] mod test { use super::*; use crate::dump::dump_region; use bytes::{BufMut, BytesMut}; use rand::{Rng, RngCore}; + use std::fs::rename; use std::path::PathBuf; use tempfile::tempdir; use uuid::Uuid; @@ -1280,6 +1724,43 @@ mod test { region_options } + #[test] + fn copy_extent_dir() -> Result<()> { + // Create the region, make three extents + // Create the copy directory, make sure it exists. + // Remove the copy directory, make sure it goes away. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + let ext_one = &mut region.extents[1]; + let cp = copy_dir(&dir, 1); + + assert!(ext_one.create_copy_dir(&dir).is_ok()); + assert!(Path::new(&cp).exists()); + assert!(remove_copy_cleanup_dir(&dir, 1).is_ok()); + assert!(!Path::new(&cp).exists()); + Ok(()) + } + + #[test] + fn copy_extent_dir_twice() -> () { + // Create the region, make three extents + // Create the copy directory, make sure it exists. + // Verify a second create will fail. + // Catch the panic just from the final step, all others should pass. 
+ let dir = tempdir().unwrap(); + let mut region = Region::create(&dir, new_region_options()).unwrap(); + region.extend(3).unwrap(); + + let ext_one = &mut region.extents[1]; + let _ = ext_one.create_copy_dir(&dir); + let result = + std::panic::catch_unwind(|| ext_one.create_copy_dir(&dir).unwrap()); + assert!(result.is_err()); + () + } + #[test] fn close_extent() -> Result<()> { // Create the region, make three extents @@ -1292,6 +1773,8 @@ mod test { ext_one.close()?; assert!(!ext_one.inner.is_some()); + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; // Reopen extent 1 region.reopen_extent(1)?; @@ -1302,6 +1785,268 @@ mod test { // Make sure the eid matches assert_eq!(ext_one.number, 1); + // Make sure the copy directory is gone + assert!(!Path::new(&cp).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_one() -> Result<()> { + // Verify the copy directory is removed if an extent is + // opened with that directory present. + // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // Reopen extent 1 + region.reopen_extent(1)?; + + // Verify extent one is valid + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure copy directory was removed + assert!(!Path::new(&cp).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_two() -> Result<()> { + // Verify that the completed directory is removed if present + // when an extent is re-opened. 
+ // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // Step through the replacement dir, but don't do any work. + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // Finish up the fake repair, but leave behind the completed dir. + let cd = completed_dir(&dir, 1); + rename(rd.clone(), cd.clone())?; + + // Reopen extent 1 + region.reopen_extent(1)?; + + // Verify extent one is valid + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + #[test] + fn reopen_extent_cleanup_replay() -> Result<()> { + // Verify on extent open that a replacement directory will + // have its contents replace an extent's existing data and + // metadata files. + // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory.
+ let dest_name = extent_file_name(1, None); + let mut source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db"); + dest_path.set_extension("db"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db-shm"); + dest_path.set_extension("db-shm"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db-wal"); + dest_path.set_extension("db-wal"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // Now we have a replace directory, we verify that special + // action is taken when we (re)open the extent. + + // Reopen extent 1 + region.reopen_extent(1)?; + + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + + // The reopen should have replayed the repair, renamed, then + // deleted this directory. + let cd = completed_dir(&dir, 1); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_cleanup_replay_short() -> Result<()> { + // test move_replacement_extent(), create a copy dir, populate it + // and let the reopen do the work. This time we make sure our + // copy dir only has extent data and .db files, and not .db-shm + // nor .db-wal. Verify these files are deleted from the original + // extent after the reopen has cleaned them up.
+ // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Close extent 1 + let ext_one = &mut region.extents[1]; + ext_one.close()?; + assert!(!ext_one.inner.is_some()); + + // Make copy directory for this extent + let cp = ext_one.create_copy_dir(&dir)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, None); + let mut source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + println!("cp {:?} to {:?}", source_path, dest_path); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db"); + dest_path.set_extension("db"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + // The close may remove the db-shm and db-wal files, manually + // create them here, just to verify they are removed after the + // reopen as they are not included in the files to be recovered + // and this test exists to verify they will be deleted. + let mut invalid_db = extent_path(&dir, 1); + invalid_db.set_extension("db-shm"); + println!("Recreate {:?}", invalid_db); + std::fs::copy(source_path.clone(), invalid_db.clone())?; + assert!(Path::new(&invalid_db).exists()); + + invalid_db.set_extension("db-wal"); + println!("Recreate {:?}", invalid_db); + std::fs::copy(source_path.clone(), invalid_db.clone())?; + assert!(Path::new(&invalid_db).exists()); + + // Now we have a replace directory populated and our files to + // delete are ready. We verify that special action is taken + // when we (re)open the extent. 
+ + // Reopen extent 1 + region.reopen_extent(1)?; + + // Make sure there is no longer a db-shm and db-wal + dest_path.set_extension("db-shm"); + assert!(!Path::new(&dest_path).exists()); + dest_path.set_extension("db-wal"); + assert!(!Path::new(&dest_path).exists()); + + let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure all repair directories are gone + assert!(!Path::new(&cp).exists()); + assert!(!Path::new(&rd).exists()); + + // The reopen should have replayed the repair, renamed, then + // deleted this directory. + let cd = completed_dir(&dir, 1); + assert!(!Path::new(&cd).exists()); + + Ok(()) + } + + #[test] + fn reopen_extent_no_replay_readonly() -> Result<()> { + // Verify on a read-only region a replacement directory will + // be ignored. This is required by the dump command, as it would + // be tragic if the command to inspect a region changed that + // region's contents in the act of inspecting. + + // Create the region, make three extents + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Make copy directory for this extent + let ext_one = &mut region.extents[1]; + let cp = ext_one.create_copy_dir(&dir)?; + + // We are simulating the copy of files from the "source" repair + // extent by copying the files from extent zero into the copy + // directory. + let dest_name = extent_file_name(1, None); + let mut source_path = extent_path(&dir, 0); + let mut dest_path = cp.clone(); + dest_path.push(dest_name); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + source_path.set_extension("db"); + dest_path.set_extension("db"); + std::fs::copy(source_path.clone(), dest_path.clone())?; + + let rd = replace_dir(&dir, 1); + rename(cp.clone(), rd.clone())?; + + drop(region); + + // Open up the region read_only now. + let mut region = Region::open(&dir, new_region_options(), false, true)?; + + // Verify extent 1 has opened again. 
+ let ext_one = &mut region.extents[1]; + assert!(ext_one.inner.is_some()); + + // Make sure repair directory is still present + assert!(Path::new(&rd).exists()); + Ok(()) } @@ -1429,6 +2174,51 @@ mod test { assert_eq!((), ext.check_input(Block::new_512(1), &data).unwrap()); } + #[test] + fn extent_name_basic() { + assert_eq!(extent_file_name(4, None), "004"); + } + #[test] + fn extent_name_basic_ext() { + assert_eq!(extent_file_name(4, Some("db")), "004.db"); + } + #[test] + fn extent_name_basic_two() { + assert_eq!(extent_file_name(10, None), "00A"); + } + #[test] + fn extent_name_basic_three() { + assert_eq!(extent_file_name(59, None), "03B"); + } + #[test] + fn extent_name_max() { + assert_eq!(extent_file_name(u32::MAX, None), "FFF"); + } + #[test] + fn extent_name_min() { + assert_eq!(extent_file_name(u32::MIN, None), "000"); + } + + #[test] + fn extent_dir_basic() { + assert_eq!(extent_dir("/var/region", 4), p("/var/region/00/000/")); + } + + #[test] + fn extent_dir_max() { + assert_eq!( + extent_dir("/var/region", u32::MAX), + p("/var/region/FF/FFF") + ); + } + #[test] + fn extent_dir_min() { + assert_eq!( + extent_dir("/var/region", u32::MIN), + p("/var/region/00/000/") + ); + } + #[test] fn extent_path_min() { assert_eq!( @@ -1436,6 +2226,13 @@ mod test { p("/var/region/00/000/000") ); } + #[test] + fn copy_path_basic() { + assert_eq!( + copy_dir("/var/region", 4), + p("/var/region/00/000/004.copy") + ); + } #[test] fn extent_path_three() { diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs new file mode 100644 index 000000000..cc064c10e --- /dev/null +++ b/downstairs/src/repair.rs @@ -0,0 +1,385 @@ +// Copyright 2022 Oxide Computer Company +use std::path::PathBuf; +use std::sync::Arc; + +use dropshot::ApiDescription; +use dropshot::ConfigDropshot; +use dropshot::ConfigLogging; +use dropshot::ConfigLoggingLevel; +use dropshot::HttpError; +use dropshot::HttpResponseOk; +use dropshot::HttpServerStarter; +use dropshot::RequestContext; +use 
dropshot::{endpoint, Path}; +use http::{Response, StatusCode}; +use hyper::Body; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; + +use super::*; +use crate::region::{extent_dir, extent_file_name, extent_path}; + +/** + * Our context is the root of the region we want to serve. + */ +pub struct FileServerContext { + region_dir: PathBuf, +} + +/** + * Build the API. If requested, dump it to stdout. + * This allows us to use the resulting output to build the client side. + */ +pub fn build_api( + show: bool, +) -> Result, String> { + let mut api = ApiDescription::new(); + api.register(get_extent).unwrap(); + api.register(get_db).unwrap(); + api.register(get_shm).unwrap(); + api.register(get_wal).unwrap(); + api.register(get_files_for_extent).unwrap(); + + if show { + api.openapi("downstairs-repair", "1") + .write(&mut std::io::stdout()) + .map_err(|e| e.to_string())?; + } + Ok(api) +} + +pub async fn repair_main( + ds: &Arc>, + addr: SocketAddr, +) -> Result<(), String> { + /* + * We must specify a configuration with a bind address. + */ + let config_dropshot = ConfigDropshot { + bind_address: addr, + request_body_max_bytes: 1024, + tls: None, + }; + + /* + * For simplicity, configure an "info"-level logger that writes to + * stderr assuming that it's a terminal. + */ + let config_logging = ConfigLogging::StderrTerminal { + level: ConfigLoggingLevel::Info, + }; + let log = config_logging + .to_logger("example-basic") + .map_err(|error| format!("failed to create logger: {}", error))?; + + /* + * Build a description of the API + */ + let api = build_api(false)?; + + /* + * Record the region directory where all the extents and metadata + * files live. + */ + let ds = ds.lock().await; + let region_dir = ds.region.dir.clone(); + drop(ds); + + let context = FileServerContext { region_dir }; + + println!("Repair listens on {}", addr); + /* + * Set up the server. 
+ */ + let server = HttpServerStarter::new(&config_dropshot, api, context, &log) + .map_err(|error| format!("failed to create server: {}", error))? + .start(); + + /* + * Wait for the server to stop. Note that there's not any code to shut + * down this server, so we should never get past this point. + */ + server.await +} + +#[derive(Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Eid { + eid: u32, +} + +/** + * Stream the contents of the data file for the given extent ID + */ +#[endpoint { + method = GET, + path = "/extent/{eid}/data", + unpublished = false, +}] +async fn get_extent( + rqctx: Arc>, + path: Path, +) -> Result, HttpError> { + let eid = path.into_inner().eid; + let extent_path = extent_path(rqctx.context().region_dir.clone(), eid); + + get_file(extent_path).await +} + +/** + * Stream the contents of the metadata .db file for the given extent ID + */ +#[endpoint { + method = GET, + path = "/extent/{eid}/db", + unpublished = false, +}] +async fn get_db( + rqctx: Arc>, + path: Path, +) -> Result, HttpError> { + let eid = path.into_inner().eid; + let mut extent_path = extent_path(rqctx.context().region_dir.clone(), eid); + extent_path.set_extension("db"); + + get_file(extent_path).await +} + +/** + * Stream the contents of the metadata .db-shm file for the given extent ID + */ +#[endpoint { + method = GET, + path = "/extent/{eid}/db-shm", + unpublished = false, +}] +async fn get_shm( + rqctx: Arc>, + path: Path, +) -> Result, HttpError> { + let eid = path.into_inner().eid; + let mut extent_path = extent_path(rqctx.context().region_dir.clone(), eid); + extent_path.set_extension("db-shm"); + + get_file(extent_path).await +} + +/** + * Stream the contents of the metadata .db-wal file for the given extent ID + */ +#[endpoint { + method = GET, + path = "/extent/{eid}/db-wal", + unpublished = false, +}] +async fn get_wal( + rqctx: Arc>, + path: Path, +) -> Result, HttpError> { + let eid = path.into_inner().eid; + let mut extent_path = 
extent_path(rqctx.context().region_dir.clone(), eid); + extent_path.set_extension("db-wal"); + + get_file(extent_path).await +} + +async fn get_file(path: PathBuf) -> Result, HttpError> { + println!("Request for file {:?}", path); + /* + * Make sure our file is neither a link nor a directory. + */ + let m = path + .symlink_metadata() + .map_err(|_| HttpError::for_bad_request(None, "ENOENT".to_string()))?; + + if m.file_type().is_symlink() { + Err(HttpError::for_bad_request(None, "EMLINK".to_string())) + } else if path.is_dir() { + Err(HttpError::for_bad_request(None, "EBADF".to_string())) + } else { + let file = tokio::fs::File::open(&path).await.map_err(|_| { + HttpError::for_bad_request(None, "EBADF".to_string()) + })?; + + let file_stream = hyper_staticfile::FileBytesStream::new(file); + let content_type = "application/octet-stream".to_string(); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, content_type) + .body(file_stream.into_body())?) + } +} + +#[derive(Deserialize, Serialize, JsonSchema)] +struct ExtentFiles { + files: Vec, +} + +/** + * For a given extent, return a vec of strings representing the names of + * the files that exist for that extent. 
+ */ +#[endpoint { + method = GET, + path = "/extent/{eid}/files", + unpublished = false, +}] +async fn get_files_for_extent( + rqctx: Arc>, + path: Path, +) -> Result, HttpError> { + let eid = path.into_inner().eid; + let extent_dir = extent_dir(rqctx.context().region_dir.clone(), eid); + + // Some sanity checking on the extent path + let m = extent_dir + .symlink_metadata() + .map_err(|_| HttpError::for_bad_request(None, "ENOENT".to_string()))?; + + if m.file_type().is_symlink() { + Err(HttpError::for_bad_request(None, "EMLINK".to_string())) + } else if !extent_dir.is_dir() { + Err(HttpError::for_bad_request(None, "EBADF".to_string())) + } else { + let files = extent_file_list(extent_dir, eid).await?; + Ok(HttpResponseOk(files)) + } +} + +async fn extent_file_list( + extent_dir: PathBuf, + eid: u32, +) -> Result { + let mut full_name = extent_dir; + + let extent_name = extent_file_name(eid, None); + full_name.push(extent_name.clone()); + let mut files = Vec::new(); + // The data file should always exist + if !full_name.exists() { + return Err(HttpError::for_bad_request(None, "EBADF".to_string())); + } + files.push(extent_name); + // The db file should always exist. + full_name.set_extension("db"); + if !full_name.exists() { + return Err(HttpError::for_bad_request(None, "EBADF".to_string())); + } + files.push(extent_file_name(eid, Some("db"))); + + // The db-shm file may exist. + full_name.set_extension("db-shm"); + if full_name.exists() { + println!("Exists: {:?}", full_name.file_name().unwrap()); + files.push(extent_file_name(eid, Some("db-shm"))); + } + // The db-wal file may exist. 
+ full_name.set_extension("db-wal"); + if full_name.exists() { + files.push(extent_file_name(eid, Some("db-wal"))); + } + Ok(ExtentFiles { files }) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::region::{extent_dir, extent_file_name}; + use tempfile::tempdir; + + fn new_region_options() -> crucible_common::RegionOptions { + let mut region_options: crucible_common::RegionOptions = + Default::default(); + let block_size = 512; + region_options.set_block_size(block_size); + region_options + .set_extent_size(Block::new(10, block_size.trailing_zeros())); + region_options + } + + #[tokio::test] + async fn extent_expected_files() -> Result<()> { + // Verify that the list of files returned for an extent matches + // what we expect. This is a hack of sorts as we are hard coding + // the expected names of files here in that test, rather than + // determine them through some programmatic means. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. + let extent_dir = extent_dir(&dir, 1); + let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); + ex_files.files.sort(); + let expected = vec!["001", "001.db", "001.db-shm", "001.db-wal"]; + println!("files: {:?}", ex_files.files); + assert_eq!(ex_files.files, expected); + + Ok(()) + } + + #[tokio::test] + async fn extent_expected_files_short() -> Result<()> { + // Verify that the list of files returned for an extent matches + // what we expect. In this case we expect the extent data file and + // the .db file, but not the .db-shm or .db-wal database files. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. 
+ let extent_dir = extent_dir(&dir, 1); + + // Delete db-wal and db-shm + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(1, None)); + rm_file.set_extension("db-wal"); + std::fs::remove_file(&rm_file).unwrap(); + rm_file.set_extension("db-shm"); + std::fs::remove_file(rm_file).unwrap(); + + let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); + ex_files.files.sort(); + let expected = vec!["001", "001.db"]; + println!("files: {:?}", ex_files.files); + assert_eq!(ex_files.files, expected); + + Ok(()) + } + #[tokio::test] + async fn extent_expected_files_short_with_close() -> Result<()> { + // Verify that the list of files returned for an extent matches + // what we expect. In this case we expect the extent data file and + // the .db file, but not the .db-shm or .db-wal database files. + // We close the extent here first, and on illumos that behaves + // a little different than elsewhere. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + let ext_one = &mut region.extents[1]; + ext_one.close()?; + + // Determine the directory and name for expected extent files. + let extent_dir = extent_dir(&dir, 1); + + // Delete db-wal and db-shm. On illumos the close of the extent + // may remove these for us, so we ignore errors on the removal. 
+ let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(1, None)); + rm_file.set_extension("db-wal"); + let _ = std::fs::remove_file(&rm_file); + rm_file.set_extension("db-shm"); + let _ = std::fs::remove_file(rm_file); + + let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); + ex_files.files.sort(); + let expected = vec!["001", "001.db"]; + println!("files: {:?}", ex_files.files); + assert_eq!(ex_files.files, expected); + + Ok(()) + } +} diff --git a/openapi/README.md b/openapi/README.md index 33a02a391..acc83b2fa 100644 --- a/openapi/README.md +++ b/openapi/README.md @@ -3,3 +3,8 @@ # crucible-control.json Described in this file is the control API for Crucible Upstairs. This file can be generated with the `control-api` program. + +# downstairs-repair.json +This file describes the API for repairing between Crucible Downstairs. +This file can be generated by running the downstairs binary with the +`repair-api` option. diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json new file mode 100644 index 000000000..013f4b9e4 --- /dev/null +++ b/openapi/downstairs-repair.json @@ -0,0 +1,212 @@ +{ + "openapi": "3.0.3", + "info": { + "title": "downstairs-repair", + "version": "1" + }, + "paths": { + "/extent/{eid}/data": { + "get": { + "summary": "Stream the contents of the data file for the given extent ID", + "operationId": "get_extent", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "style": "simple" + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + } + } + }, + "/extent/{eid}/db": { + "get": { + "summary": "Stream the contents of the metadata .db file for the given extent ID", + "operationId": "get_db", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 
+ }, + "style": "simple" + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + } + } + }, + "/extent/{eid}/db-shm": { + "get": { + "summary": "Stream the contents of the metadata .db-shm file for the given extent ID", + "operationId": "get_shm", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "style": "simple" + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + } + } + }, + "/extent/{eid}/db-wal": { + "get": { + "summary": "Stream the contents of the metadata .db-wal file for the given extent ID", + "operationId": "get_wal", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "style": "simple" + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + } + } + }, + "/extent/{eid}/files": { + "get": { + "summary": "For a given extent, return a vec of strings representing the names of", + "description": "the files that exist for that extent.", + "operationId": "get_files_for_extent", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "style": "simple" + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ExtentFiles" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + } + }, + "components": { + "responses": { + "Error": { + "description": "Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + } + } + } + }, + "schemas": { + "Error": { + 
"description": "Error information from a response.", + "type": "object", + "properties": { + "error_code": { + "type": "string" + }, + "message": { + "type": "string" + }, + "request_id": { + "type": "string" + } + }, + "required": [ + "message", + "request_id" + ] + }, + "ExtentFiles": { + "type": "object", + "properties": { + "files": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "files" + ] + } + } + } +} \ No newline at end of file diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index a5db40e35..21744989e 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2021 Oxide Computer Company use std::cmp::Ordering; +use std::net::SocketAddr; use anyhow::bail; use bytes::{Buf, BufMut, BytesMut}; @@ -148,16 +149,22 @@ pub enum Message { */ /// Send a close the given extent ID on the downstairs. /// We send the downstairs the repair ID (rep_id) and the extent number. - ExtentClose(u64, u64), - /// Ack the close of an extent from the downstairs using the rep_id. - ExtentCloseAck(u64), + ExtentClose(u64, usize), /// Send a request (with rep_id) to reopen the given extent. - ExtentReopen(u64, u64), + ExtentReopen(u64, usize), /// Ack the Re-Open of an extent from the downstairs using the rep_id. - ExtentReopenAck(u64), + /// Flush just this extent on just this downstairs client. + /// rep_id, extent ID, downstairs client ID, flush number, gen number. + ExtentFlush(u64, usize, u8, u64, u64), + /// Replace an extent with data from the given downstairs. 
+ /// rep_id, extent ID, source extent, Vec of extents to repair + ExtentRepair(u64, usize, u8, SocketAddr, Vec), + + /// The given repair job ID has finished without error + RepairAckId(u64), /// A problem with the given extent - ExtentError(u64, u64, CrucibleError), + ExtentError(u64, usize, CrucibleError), /* * Metadata exchange */ diff --git a/repair-client/Cargo.toml b/repair-client/Cargo.toml new file mode 100644 index 000000000..27b8e8b86 --- /dev/null +++ b/repair-client/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "repair-client" +version = "0.0.1" +license = "MPL-2.0" +edition = "2018" + +[dependencies] +anyhow = "1.0" +chrono = { version = "0.4", features = [ "serde" ] } +percent-encoding = "2.1" +progenitor = { git = "https://github.com/oxidecomputer/progenitor", branch = "main" } +reqwest = { version = "0.11", features = ["json", "stream"] } +schemars = "0.8" +serde_json = "1.0" + +[dependencies.serde] +version = "1.0" +features = [ "derive" ] diff --git a/repair-client/src/lib.rs b/repair-client/src/lib.rs new file mode 100644 index 000000000..9fb3220ff --- /dev/null +++ b/repair-client/src/lib.rs @@ -0,0 +1,8 @@ +// Copyright 2022 Oxide Computer Company + +use progenitor::generate_api; + +generate_api!( + spec = "../openapi/downstairs-repair.json", + derives = [schemars::JsonSchema], +); diff --git a/tools/create-generic-ds.sh b/tools/create-generic-ds.sh index a33e382fb..c232fa0fc 100755 --- a/tools/create-generic-ds.sh +++ b/tools/create-generic-ds.sh @@ -38,7 +38,12 @@ while getopts 'b:c:des:' opt; do s) extent_size=$OPTARG echo "Using extent size $extent_size" ;; - *) echo "Usage: $0 [de] [-c extent_count] [-s extent_size]" >&2 + *) echo "Usage: $0 [de] [-b #] [-c #] [-s #]" >&2 + echo " -b block_size Block size for the region" >&2 + echo " -c extent_count Total number of extent files" >&2 + echo " -d Delete existing var/88* directories" >&2 + echo " -e Require encryption on the volume" >&2 + echo " -s extent_size Number of extents per extent
file" >&2 exit 1 ;; esac diff --git a/tools/test_repair.sh b/tools/test_repair.sh new file mode 100755 index 000000000..07e9f9c61 --- /dev/null +++ b/tools/test_repair.sh @@ -0,0 +1,89 @@ +#!/usr/bin/env bash + +cargo build || echo "Failed to build" + +cds="./target/debug/crucible-downstairs" +cc="./target/debug/crucible-client" +if [[ ! -f ${cds} ]] || [[ ! -f ${cc} ]]; then + echo "Can't find crucible binaries at $cds or $cc" + exit 1 +fi + +# start all three downstairs +${cds} run -d var/8810 -p 8810 &> /tmp/ds1 & +ds1_pid=$! +${cds} run -d var/8820 -p 8820 &> /tmp/ds2 & +ds2_pid=$! +${cds} run -d var/8830 -p 8830 &> /tmp/ds3 & +ds3_pid=$! + +trap ctrl_c INT +function ctrl_c() { + echo "Stopping at your request" + cleanup +} + +function cleanup() { + kill "$ds1_pid" 2> /dev/null + kill "$ds2_pid" 2> /dev/null + kill "$ds3_pid" 2> /dev/null +} + +verify_file=/tmp/repair_test_verify.data + +target_args="-t 127.0.0.1:8810 -t 127.0.0.1:8820 -t 127.0.0.1:8830" +# Do initial volume population. +if ! ${cc} fill ${target_args} --verify-out "$verify_file" -q +then + echo "Exit on initial fill" + cleanup + exit 1 +fi + +# Stop a downstairs, we will restart with lossy in the loop +kill "$ds3_pid" + +# Start loop +for (( i = 0; i < 30; i += 1 )); do + + # restart downstairs with lossy + ${cds} run -d var/8820 -p 8830 --lossy &> /tmp/ds2 & + ds3_pid=$! + + if ! ${cc} repair ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" -c 30 + then + echo "Exit on repair fail" + cleanup + exit 1 + fi + + echo "" + # Stop --lossy downstairs + kill "$ds3_pid" + sleep 2 + + # Did we get any mismatches? + ${cds} dump -d var/8810 -d var/8820 -d var/8830 -o + echo "On loop $i" + + sleep 2 + echo "" + # Start downstairs without lossy + ${cds} run -d var/8830 -p 8830 &> /tmp/ds2 & + ds3_pid=$! + + echo "Verifying data now" + if ! 
${cc} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" -q > /tmp/verify_out + then + echo "Exit on verify fail" + echo "Check /tmp/verify_out for details" + cleanup + exit 1 + fi + + # stop a downstairs + kill "$ds3_pid" +done + +echo "Tests all done at $(date)" +cleanup diff --git a/tools/test_up.sh b/tools/test_up.sh index 6eac748d7..4fa37407d 100755 --- a/tools/test_up.sh +++ b/tools/test_up.sh @@ -84,7 +84,7 @@ for (( i = 0; i < 3; i++ )); do done res=0 -test_list="one span big dep deactivate balloon" +test_list="span big dep deactivate balloon" for tt in ${test_list}; do echo "" echo "Running test: $tt" @@ -100,13 +100,37 @@ for tt in ${test_list}; do fi done +echo "Running hammer" +if ! time cargo run -p crucible-hammer -- \ + "${args[@]}"; then + + echo "Failed hammer test" + echo "Failed hammer test" >> /tmp/test_fail.txt + (( res += 1 )) +fi + # Repair test +# This one goes last because it modified the args variable. +# We also test the --verify-* args here as well. +args+=( --verify-out "${testdir}/verify_file" ) +echo "$cc" fill -q "${args[@]}" +if ! "$cc" fill -q "${args[@]}"; then + (( res += 1 )) + echo "" + echo "Failed setup repair test" + echo "Failed setup repair test" >> /tmp/test_fail.txt + echo +else + echo "Repair setup passed" +fi + echo "Copy the $port file" echo cp -r "${testdir}/${port}" "${testdir}/previous" cp -r "${testdir}/${port}" "${testdir}/previous" -echo "$cc" "$tt" -q "${args[@]}" -if ! "$cc" one -q "${args[@]}"; then +args+=( --verify-in "${testdir}/verify_file" ) +echo "$cc" repair -q "${args[@]}" +if ! "$cc" repair -q "${args[@]}"; then (( res += 1 )) echo "" echo "Failed repair test part 1" @@ -136,7 +160,7 @@ downstairs[4]=$! echo "" echo "" echo "$cc" "$tt" -q "${args[@]}" -if ! "$cc" one -q "${args[@]}"; then +if ! "$cc" verify -q "${args[@]}"; then (( res += 1 )) echo "" echo "Failed repair test part 2" @@ -146,38 +170,6 @@ else echo "Repair part 2 passed" fi -echo "Running hammer" -if ! 
time cargo run -p crucible-hammer -- \ - "${args[@]}"; then - - echo "Failed hammer test" - echo "Failed hammer test" >> /tmp/test_fail.txt - (( res += 1 )) -fi - -echo "" -echo "Running verify test: $tt" -vfile="${testdir}/verify" -echo "$cc" rand -q --verify-out "$vfile" "${args[@]}" -if ! "$cc" rand -q --verify-out "$vfile" "${args[@]}"; then - (( res += 1 )) - echo "" - echo "Failed crucible-client rand verify test" - echo "Failed crucible-client rand verify test" >> /tmp/test_fail.txt - echo "" -else - echo "$cc" rand -q --verify-in "$vfile" "${args[@]}" - if ! "$cc" rand -q --verify-in "$vfile" "${args[@]}"; then - (( res += 1 )) - echo "" - echo "Failed crucible-client rand verify part 2 test" - echo "Failed crucible-client rand verify part 2 test" >> /tmp/test_fail.txt - echo "" - else - echo "Verify test passed" - fi -fi - # The dump args look different than other downstairs commands args=() for (( i = 0; i < 30; i += 10 )); do diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index c6fa2a9dd..db78d94a2 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -1255,20 +1255,16 @@ where up_coms.client_id, expected_uuid ); } - - Some(Message::ExtentCloseAck(rep_id)) => { - up.ds_repair_done_notify( - up_coms.client_id, - rep_id, - &up_coms.ds_reconcile_done_tx, - ).await?; - } - Some(Message::ExtentReopenAck(rep_id)) => { - up.ds_repair_done_notify( - up_coms.client_id, - rep_id, - &up_coms.ds_reconcile_done_tx, - ).await?; + Some(Message::RepairAckId(rep_id)) => { + if up.downstairs.lock().unwrap().rep_done( + up_coms.client_id, rep_id + ) { + up.ds_repair_done_notify( + up_coms.client_id, + rep_id, + &up_coms.ds_reconcile_done_tx, + ).await?; + } } Some(m) => { println!( @@ -1286,13 +1282,51 @@ where */ println!("[{}] received reconcile message", up_coms.client_id); - let job = up - .downstairs - .lock().unwrap().rep_in_progress(up_coms.client_id); + /* + * We use rep_done to indicate this was job where our client + * did not have any actual work 
to send to the downstairs. + * It indicates that, we don't need a response from the + * downstairs and can go ahead and mark this rep_id as + * completed for this client and move forward. + */ + let mut rep_done = None; + let job = up. + downstairs + .lock() + .unwrap() + .rep_in_progress(up_coms.client_id); match job { Some(op) => { - println!("[{}] submit {:?}", up_coms.client_id, op); - fw.send(op).await?; + println!("[{}] client {:?}", up_coms.client_id, op); + /* + * If there is work to do, check to see if it is + * a repair job. If so, only send that to the actual + * clients that need to get it. The source downstairs + * does not get a message for this operation. + * + * If the work is an extent flush, then only send the + * message to the source extent, the other downstairs + * do not get a message. + */ + match op { + Message::ExtentRepair(rep_id, _, src, _, _) => { + if up_coms.client_id == src { + rep_done = Some(rep_id); + } else { + fw.send(op).await?; + } + }, + Message::ExtentFlush(rep_id, _, src, _, _) => { + if up_coms.client_id != src { + rep_done = Some(rep_id); + } else { + fw.send(op).await?; + } + }, + op => { + fw.send(op).await?; + } + } }, None => { /* @@ -1332,6 +1366,25 @@ where } } } + /* + * If rep_done is Some, it means this client had nothing + * to send to the downstairs, and we can go ahead and mark + * this rep_id as completed, which will trigger sending a + * notify if all other downstairs are also complete. + */ + if let Some(rep_id) = rep_done { + if up.downstairs.lock().unwrap().rep_done(up_coms.client_id, rep_id) { + println!("[{}] self notify as src for {}", + up_coms.client_id, + rep_id + ); + up.ds_repair_done_notify( + up_coms.client_id, + rep_id, + &up_coms.ds_reconcile_done_tx, + ).await?; + } + } } _ = sleep_until(timeout_deadline) => { bail!("[{}] Downstairs not responding, take offline", @@ -1672,6 +1725,7 @@ impl Downstairs { reconcile_repair_needed: 0, } } + /** * Assign a new downstairs ID. 
*/ @@ -1792,7 +1846,7 @@ impl Downstairs { ); return None; } - + println!("[{}] rep_in_progress: return {:?}", client_id, job); Some(job.op.clone()) } else { None @@ -1840,22 +1894,40 @@ impl Downstairs { */ fn convert_rc_to_messages( &mut self, - mut rec_list: HashMap<u64, ExtentFix>, + mut rec_list: HashMap<usize, ExtentFix>, + max_flush: u64, + max_gen: u64, ) { let mut rep_id = 0; - println!("Full list: {:?}", rec_list); - for (ext, _) in rec_list.drain() { + println!("Full repair list: {:?}", rec_list); + for (ext, ef) in rec_list.drain() { /* - * XXX TODO: We are not repairing anything here yet. - * For every extent mismatch, we just close then re-open them. - * Coming soon will be the actual commands to send extents - * from one downstairs to another. + * For each extent needing repair, we put the following + * tasks on the reconcile task list. + * Flush (the source) extent with latest gen/flush#. + * Close extent (on all ds) + * Send repair command to bad extents + * Reopen extent. */ + self.reconcile_task_list.push_back(ReconcileIO::new( + rep_id, + Message::ExtentFlush( + rep_id, ext, ef.source, max_flush, max_gen, + ), + )); + rep_id += 1; self.reconcile_task_list.push_back(ReconcileIO::new( rep_id, Message::ExtentClose(rep_id, ext), )); rep_id += 1; + let repair = self.repair_addr(ef.source); + self.reconcile_task_list.push_back(ReconcileIO::new( + rep_id, + Message::ExtentRepair(rep_id, ext, ef.source, repair, ef.dest), + )); + rep_id += 1; + self.reconcile_task_list.push_back(ReconcileIO::new( rep_id, Message::ExtentReopen(rep_id, ext), @@ -2425,6 +2497,8 @@ impl Downstairs { self.downstairs_errors.insert(client_id, errors + 1); // XXX We don't count read errors here. + } else { + println!("[{}] {} read error", client_id, ds_id); } } } @@ -3792,8 +3866,8 @@ impl Upstairs { } /* - * Check and see if the downstairs are ready for the next repair - * command. Send a message if so. + * Send a message that indicates the downstairs are ready for the + * next repair command.
*/ async fn ds_repair_done_notify( &self, @@ -3801,18 +3875,16 @@ rep_id: u64, ds_reconcile_done_tx: &mpsc::Sender<Repair>, ) -> Result<()> { - if self.downstairs.lock().unwrap().rep_done(client_id, rep_id) { - println!("[{}] It's time to notify for {}", client_id, rep_id); - if let Err(e) = ds_reconcile_done_tx - .send(Repair { - repair: true, - client_id, - rep_id, - }) - .await - { - bail!("[{}] Failed to notify {} {:?}", client_id, rep_id, e); - } + println!("[{}] It's time to notify for {}", client_id, rep_id); + if let Err(e) = ds_reconcile_done_tx + .send(Repair { + repair: true, + client_id, + rep_id, + }) + .await + { + bail!("[{}] Failed to notify {} {:?}", client_id, rep_id, e); } Ok(()) } @@ -3974,7 +4046,7 @@ impl Upstairs { "Found {:?} extents that need repair", reconcile_list.mend.len() ); - ds.convert_rc_to_messages(reconcile_list.mend); + ds.convert_rc_to_messages(reconcile_list.mend, max_flush, max_gen); ds.reconcile_repair_needed = ds.reconcile_task_list.len(); true } else { @@ -4341,9 +4413,11 @@ impl Upstairs { } /* - * TODO: This should just store the region info for a downstairs at - * some place we can later look at and compare. For now, we do some - * of that work now. + * Store the downstairs UUID, or compare to what we stored before + * for a given client ID. Do a sanity check that this downstairs + * Region Definition matches the other downstairs. If we don't have + * any Region info yet, then use the provided RegionDefinition as + * the source to compare the other downstairs with.
*/ fn add_ds_region( &self, diff --git a/upstairs/src/mend.rs b/upstairs/src/mend.rs index 6a305b764..bbd8288ba 100644 --- a/upstairs/src/mend.rs +++ b/upstairs/src/mend.rs @@ -34,7 +34,7 @@ pub struct ExtentFix { #[derive(Debug)] pub struct DownstairsMend { // Index by extent ID - pub mend: HashMap<u64, ExtentFix>, + pub mend: HashMap<usize, ExtentFix>, } impl DownstairsMend { @@ -108,7 +108,7 @@ impl DownstairsMend { if *dirty0 || c1.dirty[i] || c2.dirty[i] { println!("Extents {} dirty", i); let ef = make_repair_list(i, c0, c1, c2); - dsm.mend.insert(i as u64, ef); + dsm.mend.insert(i, ef); } else { to_check.push(i as usize); } @@ -126,7 +126,7 @@ impl DownstairsMend { { println!("Extent {} has flush number mismatch", i); let ef = make_repair_list(*i, c0, c1, c2); - dsm.mend.insert(*i as u64, ef); + dsm.mend.insert(*i, ef); } else { second_check.push(*i); } @@ -142,7 +142,7 @@ impl DownstairsMend { { println!("generation number mismatch {}", i); let ef = make_repair_list(*i, c0, c1, c2); - dsm.mend.insert(*i as u64, ef); + dsm.mend.insert(*i, ef); } } diff --git a/upstairs/src/test.rs b/upstairs/src/test.rs index aea854024..497468d87 100644 --- a/upstairs/src/test.rs +++ b/upstairs/src/test.rs @@ -3668,20 +3668,206 @@ mod test { #[test] fn reconcile_rc_to_message() { - // Convert an extent fix to the crucible messages. - // TODO: This will grow as the protocol is finalized. + // Convert an extent fix to the crucible repair messages that + // are sent to the downstairs.
Verify that the resulting + // messages are what we expect let up = Upstairs::default(); let mut ds = up.downstairs.lock().unwrap(); + let r0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 801); + let r1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 802); + let r2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 803); + ds.ds_repair.insert(0, r0.clone()); + ds.ds_repair.insert(1, r1); + ds.ds_repair.insert(2, r2); + + let repair_extent = 9; let mut rec_list = HashMap::new(); let ef = ExtentFix { source: 0, dest: vec![1, 2], }; - rec_list.insert(0, ef); - ds.convert_rc_to_messages(rec_list); - // TODO: When we finalize the message, update this test - // to expect more. - assert!(!ds.reconcile_task_list.is_empty()); + rec_list.insert(repair_extent, ef); + let max_flush = 22; + let max_gen = 33; + ds.convert_rc_to_messages(rec_list, max_flush, max_gen); + + // Walk the list and check for messages we expect to find + assert_eq!(ds.reconcile_task_list.len(), 4); + + // First task, flush + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 0); + match rio.op { + Message::ExtentFlush(rep_id, ext, source, mf, mg) => { + assert_eq!(rep_id, 0); + assert_eq!(ext, repair_extent); + assert_eq!(source, 0); + assert_eq!(mf, max_flush); + assert_eq!(mg, max_gen); + } + m => { + panic!("{:?} not ExtentFlush()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Second task, close extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 1); + match rio.op { + Message::ExtentClose(rep_id, ext) => { + assert_eq!(rep_id, 1); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), 
rio.state.get(&2)); + + // Third task, repair extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 2); + match rio.op { + Message::ExtentRepair(rep_id, ext, source, repair, dest) => { + assert_eq!(rep_id, rio.id); + assert_eq!(ext, repair_extent); + assert_eq!(source, 0); + assert_eq!(repair, r0); + assert_eq!(dest, vec![1, 2]); + } + m => { + panic!("{:?} not ExtentRepair", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Third task, close extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 3); + match rio.op { + Message::ExtentReopen(rep_id, ext) => { + assert_eq!(rep_id, 3); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + } + /* + * + let result = std::panic::catch_unwind(|| + ext_one.create_copy_dir(&dir).unwrap() + ); + assert!(result.is_err()); + */ + #[test] + fn reconcile_rc_to_message_two() { + // Convert another extent fix to the crucible repair messages that + // are sent to the downstairs. 
Verify that the resulting + // messages are what we expect + let up = Upstairs::default(); + let mut ds = up.downstairs.lock().unwrap(); + let r0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 801); + let r1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 802); + let r2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 803); + ds.ds_repair.insert(0, r0.clone()); + ds.ds_repair.insert(1, r1); + ds.ds_repair.insert(2, r2); + + let repair_extent = 5; + let mut rec_list = HashMap::new(); + let ef = ExtentFix { + source: 2, + dest: vec![0, 1], + }; + rec_list.insert(repair_extent, ef); + let max_flush = 66; + let max_gen = 77; + ds.convert_rc_to_messages(rec_list, max_flush, max_gen); + + // Walk the list and check for messages we expect to find + assert_eq!(ds.reconcile_task_list.len(), 4); + + // First task, flush + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 0); + match rio.op { + Message::ExtentFlush(rep_id, ext, source, mf, mg) => { + assert_eq!(rep_id, 0); + assert_eq!(ext, repair_extent); + assert_eq!(source, 2); + assert_eq!(mf, max_flush); + assert_eq!(mg, max_gen); + } + m => { + panic!("{:?} not ExtentFlush()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Second task, close extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 1); + match rio.op { + Message::ExtentClose(rep_id, ext) => { + assert_eq!(rep_id, 1); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Third task, repair extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 2); + match rio.op { + Message::ExtentRepair(rep_id, ext, 
source, repair, dest) => { + assert_eq!(rep_id, rio.id); + assert_eq!(ext, repair_extent); + assert_eq!(source, 2); + assert_eq!(repair, r2); + assert_eq!(dest, vec![0, 1]); + } + m => { + panic!("{:?} not ExtentRepair", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); + + // Third task, close extent + let rio = ds.reconcile_task_list.pop_front().unwrap(); + assert_eq!(rio.id, 3); + match rio.op { + Message::ExtentReopen(rep_id, ext) => { + assert_eq!(rep_id, 3); + assert_eq!(ext, repair_extent); + } + m => { + panic!("{:?} not ExtentClose()", m); + } + } + assert_eq!(Some(&IOState::New), rio.state.get(&0)); + assert_eq!(Some(&IOState::New), rio.state.get(&1)); + assert_eq!(Some(&IOState::New), rio.state.get(&2)); } #[test] From 1d8032d5f345a11f6e2c4e8df9749d8536e112f8 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Tue, 29 Mar 2022 23:37:37 -0700 Subject: [PATCH 2/7] fix repair comment typos --- downstairs/src/region.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index f50a56fa5..eae08046e 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -1114,7 +1114,7 @@ impl Region { * * Let us assume we are repairing extent 012 * 1. Make new 012.copy dir (extent name plus: .copy) - * 2. Get all extent files from source side, put in 021.copy directory + * 2. Get all extent files from source side, put in 012.copy directory * 3. fsync files we just downloaded * 4. Rename 012.copy dir to 012.replace dir * 5. fsync extent directory ( 00/000/ where the extent files live) @@ -1127,7 +1127,7 @@ impl Region { * 11. fsync extent dir again (so dir rename is persisted) * 12. Delete completed dir. * - * This also requires the following behavior on Extent open: + * This also requires the following behavior on every extent open: * A. 
If xxx.Copy directory found, delete it. * B. If xxx.Completed directory found, delete it. * C. If xxx.Replace dir found start at step 4 above and continue From 8d625692caaf4d637015932e3a79c77c3ab01687 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Thu, 31 Mar 2022 13:50:25 -0700 Subject: [PATCH 3/7] validate_repair_files test and PR comments Added a new function for validating repair files. Other small updates based on PR comments. --- downstairs/src/region.rs | 120 +++++++++++++++++++++++++++++++++------ downstairs/src/repair.rs | 1 + tools/test_repair.sh | 20 ++++--- 3 files changed, 117 insertions(+), 24 deletions(-) diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index eae08046e..fe702a1d4 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -490,7 +490,7 @@ fn config_path>(dir: P) -> PathBuf { } /** - * Remove directories associated with repair expect for the replace + * Remove directories associated with repair except for the replace * directory. Replace is handled specifically during extent open. */ pub fn remove_copy_cleanup_dir>(dir: P, eid: u32) -> Result<()> { @@ -506,6 +506,34 @@ pub fn remove_copy_cleanup_dir>(dir: P, eid: u32) -> Result<()> { Ok(()) } +/** + * Validate a list of repair files. + * There are either two or four files we expect to find, any more or less + * and we have a bad list. + */ +pub fn validate_repair_files(eid: usize, files: &[String]) -> bool { + println!("validate {} with {:?}", eid, files); + let count = files.len(); + if count != 4 && count != 2 { + false + } else { + let eid = eid as u32; + for ss in files.iter() { + match &*ss { + d if *d == extent_file_name(eid, None) + || *d == extent_file_name(eid, Some("db")) => {} + d if count == 4 + && (*d == extent_file_name(eid, Some("db-shm")) + || *d == extent_file_name(eid, Some("db-wal"))) => {} + _ => { + return false; + } + } + } + true + } +} + impl Extent { /** * Open an existing extent file at the location requested. 
@@ -1121,11 +1149,16 @@ impl Region { * 6. Replace current extent 012 files with copied files of same name * from 012.replace dir * 7. Remove any files in extent dir that don't exist in replacing dir + * For example, if the replacement extent has 012 and 012.db, but + * the current (bad) extent has 012 012.db 012.db-shm + * and 012.db-wal, we want to remove the 012.db-shm and 012.db-wal + * files when we replace 012 and 012.db with the new versions. * 8. fsync files after copying them (new location). * 9. fsync containing extent dir * 10. Rename 012.replace dir to 012.completed dir. * 11. fsync extent dir again (so dir rename is persisted) * 12. Delete completed dir. + * 13. fsync extent dir again (so dir rename is persisted) * * This also requires the following behavior on every extent open: * A. If xxx.Copy directory found, delete it. @@ -1199,23 +1232,16 @@ impl Region { // The repair file list should always contain the extent data // file itself, and the .db file (metadata) for that extent. // Missing these means the repair will not succeed. - let filename = extent_file_name(eid as u32, None); - if !repair_files.contains(&filename) { - // XXX Panic now, but this will eventually bail and abort - // the repair and let the upper layers handle it. - panic!("Repair file list missing data file {}", filename); + // Optionally, there could be both .db-shm and .db-wal. + if !validate_repair_files(eid, &repair_files) { + panic!("Invalid repair file list: {:?}", repair_files); } + let extent_copy = extent.create_copy_file(copy_dir.clone(), None).unwrap(); let repair_stream = repair_server.get_extent(eid as u32).await.unwrap(); save_stream_to_file(extent_copy, repair_stream.into_inner()).await?; - let filename = extent_file_name(eid as u32, Some("db")); - if !repair_files.contains(&filename) { - // XXX Panic now, but this will eventually bail and abort - // the repair and let the upper layers handle it. 
- panic!("Repair file list missing db file {}", filename); - } let extent_db = extent .create_copy_file(copy_dir.clone(), Some("db")) .unwrap(); @@ -1231,8 +1257,6 @@ impl Region { let repair_stream = repair_server.get_shm(eid as u32).await.unwrap(); save_stream_to_file(extent_shm, repair_stream.into_inner()).await?; - } else if repair_files.len() != 2 { - panic!("Unknown files on repair list: {:?}", repair_files); } let filename = extent_file_name(eid as u32, Some("db-wal")); @@ -1244,9 +1268,6 @@ impl Region { repair_server.get_wal(eid as u32).await.unwrap(); save_stream_to_file(extent_wal, repair_stream.into_inner()).await?; } - if repair_files.len() > 4 { - panic!("Unknown extra files on repair list: {:?}", repair_files); - } // After we have all files: move the repair dir. println!( @@ -1860,6 +1881,7 @@ mod test { Ok(()) } + #[test] fn reopen_extent_cleanup_replay() -> Result<()> { // Verify on extent open that a replacement directory will @@ -2050,6 +2072,70 @@ mod test { Ok(()) } + #[test] + fn validate_repair_files_empty() { + // No repair files is a failure + assert_eq!(validate_repair_files(1, &Vec::new()), false); + } + + #[test] + fn validate_repair_files_good() { + // This is an expected list of files for an extent + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + + assert!(validate_repair_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_also_good() { + // This is also an expected list of files for an extent + let good_files: Vec = + vec!["001".to_string(), "001.db".to_string()]; + assert!(validate_repair_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_offbyon() { + // Incorrect file names for extent 2 + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + + assert_eq!(validate_repair_files(2, &good_files), false); + } + + #[test] + fn 
validate_repair_files_too_good() { + // Duplicate file in list + let good_files: Vec = vec![ + "001".to_string(), + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + assert_eq!(validate_repair_files(1, &good_files), false); + } + + #[test] + fn validate_repair_files_not_good_enough() { + // We require 2 or 4 files, not 3 + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-wal".to_string(), + ]; + assert_eq!(validate_repair_files(1, &good_files), false); + } + #[test] fn reopen_all_extents() -> Result<()> { // Create the region, make three extents diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index cc064c10e..c0d39e7f8 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -348,6 +348,7 @@ mod test { Ok(()) } + #[tokio::test] async fn extent_expected_files_short_with_close() -> Result<()> { // Verify that the list of files returned for an extent matches diff --git a/tools/test_repair.sh b/tools/test_repair.sh index 07e9f9c61..1a392c744 100755 --- a/tools/test_repair.sh +++ b/tools/test_repair.sh @@ -27,10 +27,16 @@ function cleanup() { kill "$ds1_pid" 2> /dev/null kill "$ds2_pid" 2> /dev/null kill "$ds3_pid" 2> /dev/null + kill "$slow_pid" 2> /dev/null } verify_file=/tmp/repair_test_verify.data +# Change these to pick which downstairs will be the one out of sync. +slow_port=8820 +slow_log=/tmp/ds2 +slow_pid=$ds2_pid + target_args="-t 127.0.0.1:8810 -t 127.0.0.1:8820 -t 127.0.0.1:8830" # Do initial volume population. if ! ${cc} fill ${target_args} --verify-out "$verify_file" -q @@ -41,14 +47,14 @@ then fi # Stop a downstairs, we will restart with lossy in the loop -kill "$ds3_pid" +kill "$slow_pid" # Start loop for (( i = 0; i < 30; i += 1 )); do # restart downstairs with lossy - ${cds} run -d var/8820 -p 8830 --lossy &> /tmp/ds2 & - ds3_pid=$! + ${cds} run -d var/"${slow_port}" -p "${slow_port}" --lossy &> "${slow_log}" & + slow_pid=$! if ! 
${cc} repair ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" -c 30 then @@ -59,7 +65,7 @@ for (( i = 0; i < 30; i += 1 )); do echo "" # Stop --lossy downstairs - kill "$ds3_pid" + kill "$slow_pid" sleep 2 # Did we get any mismatches? @@ -69,8 +75,8 @@ for (( i = 0; i < 30; i += 1 )); do sleep 2 echo "" # Start downstairs without lossy - ${cds} run -d var/8830 -p 8830 &> /tmp/ds2 & - ds3_pid=$! + ${cds} run -d var/"$slow_port" -p "$slow_port" &> "$slow_log" & + slow_pid=$! echo "Verifying data now" if ! ${cc} verify ${target_args} --verify-out "$verify_file" --verify-in "$verify_file" -q > /tmp/verify_out @@ -82,7 +88,7 @@ for (( i = 0; i < 30; i += 1 )); do fi # stop a downstairs - kill "$ds3_pid" + kill "$slow_pid" done echo "Tests all done at $(date)" From 01a79e026a52e31c4dce425726b60a63d1ad3a90 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Thu, 31 Mar 2022 14:10:12 -0700 Subject: [PATCH 4/7] more PR comments addressed --- upstairs/src/lib.rs | 2 +- upstairs/src/test.rs | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index db78d94a2..5edaedc24 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -2496,8 +2496,8 @@ impl Downstairs { self.downstairs_errors.insert(client_id, errors + 1); - // XXX We don't count read errors here. } else { + // XXX We don't count read errors here. 
println!("[{}] {} read error", client_id, ds_id); } } diff --git a/upstairs/src/test.rs b/upstairs/src/test.rs index 497468d87..6b0f0bd7f 100644 --- a/upstairs/src/test.rs +++ b/upstairs/src/test.rs @@ -3764,13 +3764,7 @@ mod test { assert_eq!(Some(&IOState::New), rio.state.get(&1)); assert_eq!(Some(&IOState::New), rio.state.get(&2)); } - /* - * - let result = std::panic::catch_unwind(|| - ext_one.create_copy_dir(&dir).unwrap() - ); - assert!(result.is_err()); - */ + #[test] fn reconcile_rc_to_message_two() { // Convert another extent fix to the crucible repair messages that From 705d4b334b48473d8957dc8f4a695bdb7a0f2714 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Thu, 31 Mar 2022 14:13:14 -0700 Subject: [PATCH 5/7] fix cargo fmt --- upstairs/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 5edaedc24..9a6791953 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -2495,7 +2495,6 @@ impl Downstairs { }; self.downstairs_errors.insert(client_id, errors + 1); - } else { // XXX We don't count read errors here. println!("[{}] {} read error", client_id, ds_id); From f459a672579a48847a96cffc053b8c39b0235d34 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Mon, 11 Apr 2022 10:43:38 -0700 Subject: [PATCH 6/7] Address PR comments Refactor the openapi a bit, less code duplication. 
--- downstairs/src/region.rs | 191 ++++++++++++++++++++++--------- downstairs/src/repair.rs | 200 +++++++++++++++++---------------- openapi/downstairs-repair.json | 130 +++++---------------- 3 files changed, 273 insertions(+), 248 deletions(-) diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index fe702a1d4..b486b4208 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -1,6 +1,7 @@ // Copyright 2021 Oxide Computer Company use std::collections::HashMap; use std::convert::TryInto; +use std::fmt; use std::fs::{rename, File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::net::SocketAddr; @@ -12,6 +13,7 @@ use anyhow::{bail, Result}; use crucible_common::*; use crucible_protocol::{EncryptionContext, SnapshotDetails}; use futures::TryStreamExt; +use repair_client::types::FileType; use repair_client::Client; use rusqlite::{params, Connection}; use serde::{Deserialize, Serialize}; @@ -409,10 +411,44 @@ impl Default for ExtentMeta { } } +#[derive(Debug, Clone)] +pub enum ExtentExtension { + Db, + DbShm, + DbWal, +} + +impl fmt::Display for ExtentExtension { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ExtentExtension::Db => write!(f, "db"), + ExtentExtension::DbShm => write!(f, "db-shm"), + ExtentExtension::DbWal => write!(f, "db-wal"), + } + } +} + +/** + * Take an ExtentExtension and translate it into the corresponding + * FileType from the repair client. 
+ */ +impl ExtentExtension { + fn to_file_type(&self) -> FileType { + match self { + ExtentExtension::Db => FileType::Db, + ExtentExtension::DbShm => FileType::DbShm, + ExtentExtension::DbWal => FileType::DbWal, + } + } +} + /** * Produce the string name of the data file for a given extent number */ -pub fn extent_file_name(number: u32, extension: Option<&str>) -> String { +pub fn extent_file_name( + number: u32, + extension: Option, +) -> String { if let Some(extension) = extension { format!("{:03X}.{}", number & 0xFFF, extension) } else { @@ -507,31 +543,27 @@ pub fn remove_copy_cleanup_dir>(dir: P, eid: u32) -> Result<()> { } /** - * Validate a list of repair files. + * Validate a list of sorted repair files. * There are either two or four files we expect to find, any more or less - * and we have a bad list. + * and we have a bad list. No duplicates. */ pub fn validate_repair_files(eid: usize, files: &[String]) -> bool { println!("validate {} with {:?}", eid, files); - let count = files.len(); - if count != 4 && count != 2 { - false - } else { - let eid = eid as u32; - for ss in files.iter() { - match &*ss { - d if *d == extent_file_name(eid, None) - || *d == extent_file_name(eid, Some("db")) => {} - d if count == 4 - && (*d == extent_file_name(eid, Some("db-shm")) - || *d == extent_file_name(eid, Some("db-wal"))) => {} - _ => { - return false; - } - } - } - true - } + let eid = eid as u32; + + let some = vec![ + extent_file_name(eid, None), + extent_file_name(eid, Some(ExtentExtension::Db)), + ]; + + let mut all = some.clone(); + all.extend(vec![ + extent_file_name(eid, Some(ExtentExtension::DbShm)), + extent_file_name(eid, Some(ExtentExtension::DbWal)), + ]); + + // Either we have some or all. 
+ files == some || files == all } impl Extent { @@ -753,12 +785,13 @@ impl Extent { fn create_copy_file( &self, mut copy_dir: PathBuf, - extension: Option<&str>, + extension: Option, ) -> Result { let name = extent_file_name(self.number, None); copy_dir.push(name); if let Some(extension) = extension { - copy_dir.set_extension(extension); + let ext = format!("{}", extension); + copy_dir.set_extension(ext); } let copy_path = copy_dir; @@ -1161,9 +1194,9 @@ impl Region { * 13. fsync extent dir again (so dir rename is persisted) * * This also requires the following behavior on every extent open: - * A. If xxx.Copy directory found, delete it. - * B. If xxx.Completed directory found, delete it. - * C. If xxx.Replace dir found start at step 4 above and continue + * A. If xxx.copy directory found, delete it. + * B. If xxx.completed directory found, delete it. + * C. If xxx.replace dir found start at step 4 above and continue * on through 6. * D. Only then, open extent. */ @@ -1200,9 +1233,11 @@ impl Region { eid: usize, repair_addr: SocketAddr, ) -> Result<(), CrucibleError> { + // An extent must be closed before we replace its files. assert!(self.extents[eid].inner.is_none()); - // Make sure copy, replace, cleanup dirs don't exist yet. - // We don't need them yet, but if they exist, then something + + // Make sure copy, replace, and cleanup directories don't exist yet. + // We don't need them yet, but if they do exist, then something // is wrong. 
let rd = replace_dir(&self.dir, eid as u32); if rd.exists() { @@ -1220,13 +1255,14 @@ impl Region { let url = format!("http://{:?}", repair_addr); let repair_server = Client::new(&url); - let repair_files = repair_server + let mut repair_files = repair_server .get_files_for_extent(eid as u32) .await .unwrap() .into_inner() .files; + repair_files.sort(); println!("Found repair files: {:?}", repair_files); // The repair file list should always contain the extent data @@ -1237,36 +1273,39 @@ impl Region { panic!("Invalid repair file list: {:?}", repair_files); } + // First, copy the main extent data file. let extent_copy = extent.create_copy_file(copy_dir.clone(), None).unwrap(); - let repair_stream = repair_server.get_extent(eid as u32).await.unwrap(); + let repair_stream = repair_server + .get_extent_file(eid as u32, FileType::Data) + .await + .unwrap(); save_stream_to_file(extent_copy, repair_stream.into_inner()).await?; + // The .db file is also required to exist for any valid extent. let extent_db = extent - .create_copy_file(copy_dir.clone(), Some("db")) + .create_copy_file(copy_dir.clone(), Some(ExtentExtension::Db)) + .unwrap(); + let repair_stream = repair_server + .get_extent_file(eid as u32, FileType::Db) + .await .unwrap(); - let repair_stream = repair_server.get_db(eid as u32).await.unwrap(); save_stream_to_file(extent_db, repair_stream.into_inner()).await?; // These next two are optional. 
- let filename = extent_file_name(eid as u32, Some("db-shm")); - if repair_files.contains(&filename) { - let extent_shm = extent - .create_copy_file(copy_dir.clone(), Some("db-shm")) - .unwrap(); - let repair_stream = - repair_server.get_shm(eid as u32).await.unwrap(); - save_stream_to_file(extent_shm, repair_stream.into_inner()).await?; - } - - let filename = extent_file_name(eid as u32, Some("db-wal")); - if repair_files.contains(&filename) { - let extent_wal = extent - .create_copy_file(copy_dir.clone(), Some("db-wal")) - .unwrap(); - let repair_stream = - repair_server.get_wal(eid as u32).await.unwrap(); - save_stream_to_file(extent_wal, repair_stream.into_inner()).await?; + for opt_file in &[ExtentExtension::DbShm, ExtentExtension::DbWal] { + let filename = extent_file_name(eid as u32, Some(opt_file.clone())); + if repair_files.contains(&filename) { + let extent_shm = extent + .create_copy_file(copy_dir.clone(), Some(opt_file.clone())) + .unwrap(); + let repair_stream = repair_server + .get_extent_file(eid as u32, opt_file.to_file_type()) + .await + .unwrap(); + save_stream_to_file(extent_shm, repair_stream.into_inner()) + .await?; + } } // After we have all files: move the repair dir. @@ -1586,7 +1625,7 @@ pub fn move_replacement_extent>( assert!(Path::new(&replace_dir).exists()); assert!(!Path::new(&completed_dir).exists()); - println!("Move stuff from {:?} in {:?}", replace_dir, destination_dir,); + println!("Copy files from {:?} in {:?}", replace_dir, destination_dir,); // Setup the original and replacement file names. 
let mut new_file = replace_dir.clone(); @@ -2099,6 +2138,38 @@ mod test { assert!(validate_repair_files(1, &good_files)); } + #[test] + fn validate_repair_files_duplicate() { + // duplicate file names for extent 2 + let good_files: Vec = + vec!["002".to_string(), "002".to_string()]; + assert_eq!(validate_repair_files(2, &good_files), false); + } + + #[test] + fn validate_repair_files_duplicate_pair() { + // duplicate file names for extent 2 + let good_files: Vec = vec![ + "002".to_string(), + "002".to_string(), + "002.db".to_string(), + "002.db".to_string(), + ]; + assert_eq!(validate_repair_files(2, &good_files), false); + } + + #[test] + fn validate_repair_files_quad_duplicate() { + // An otherwise valid list, except the db-shm file is duplicated + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-shm".to_string(), + ]; + assert_eq!(validate_repair_files(1, &good_files), false); + } + #[test] fn validate_repair_files_offbyon() { // Incorrect file names for extent 2 @@ -2266,7 +2337,21 @@ mod test { } #[test] fn extent_name_basic_ext() { - assert_eq!(extent_file_name(4, Some("db")), "004.db"); + assert_eq!(extent_file_name(4, Some(ExtentExtension::Db)), "004.db"); + } + #[test] + fn extent_name_basic_ext_shm() { + assert_eq!( + extent_file_name(4, Some(ExtentExtension::DbShm)), + "004.db-shm" + ); + } + #[test] + fn extent_name_basic_ext_wal() { + assert_eq!( + extent_file_name(4, Some(ExtentExtension::DbWal)), + "004.db-wal" + ); } #[test] fn extent_name_basic_two() { diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index c0d39e7f8..3b041326d 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -18,7 +18,9 @@ use serde::Deserialize; use serde::Serialize; use super::*; -use crate::region::{extent_dir, extent_file_name, extent_path}; +use crate::region::{ + extent_dir, extent_file_name, extent_path, ExtentExtension, +}; /** * Our context is the root of the region we want to serve. 
@@ -35,10 +37,7 @@ pub fn build_api( show: bool, ) -> Result, String> { let mut api = ApiDescription::new(); - api.register(get_extent).unwrap(); - api.register(get_db).unwrap(); - api.register(get_shm).unwrap(); - api.register(get_wal).unwrap(); + api.register(get_extent_file).unwrap(); api.register(get_files_for_extent).unwrap(); if show { @@ -109,82 +108,55 @@ pub struct Eid { eid: u32, } -/** - * Stream the contents of the data file for the given extent ID - */ -#[endpoint { - method = GET, - path = "/extent/{eid}/data", - unpublished = false, -}] -async fn get_extent( - rqctx: Arc>, - path: Path, -) -> Result, HttpError> { - let eid = path.into_inner().eid; - let extent_path = extent_path(rqctx.context().region_dir.clone(), eid); - - get_file(extent_path).await +#[derive(Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum FileType { + #[serde(rename = "data")] + Data, + #[serde(rename = "db")] + Database, + #[serde(rename = "db-shm")] + DatabaseSharedMemory, + #[serde(rename = "db-wal")] + DatabaseLog, } -/** - * Stream the contents of the metadata .db file for the given extent ID - */ -#[endpoint { - method = GET, - path = "/extent/{eid}/db", - unpublished = false, -}] -async fn get_db( - rqctx: Arc>, - path: Path, -) -> Result, HttpError> { - let eid = path.into_inner().eid; - let mut extent_path = extent_path(rqctx.context().region_dir.clone(), eid); - extent_path.set_extension("db"); - - get_file(extent_path).await +#[derive(Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct FileSpec { + eid: u32, + file_type: FileType, } -/** - * Stream the contents of the metadata .db-shm file for the given extent ID - */ #[endpoint { method = GET, - path = "/extent/{eid}/db-shm", - unpublished = false, + path = "/newextent/{eid}/{fileType}", }] -async fn get_shm( +async fn get_extent_file( rqctx: Arc>, - path: Path, + path: Path, ) -> Result, HttpError> { - let eid = path.into_inner().eid; - let mut extent_path = 
extent_path(rqctx.context().region_dir.clone(), eid); - extent_path.set_extension("db-shm"); + let fs = path.into_inner(); + let eid = fs.eid; - get_file(extent_path).await -} - -/** - * Stream the contents of the metadata .db-wal file for the given extent ID - */ -#[endpoint { - method = GET, - path = "/extent/{eid}/db-wal", - unpublished = false, -}] -async fn get_wal( - rqctx: Arc>, - path: Path, -) -> Result, HttpError> { - let eid = path.into_inner().eid; let mut extent_path = extent_path(rqctx.context().region_dir.clone(), eid); - extent_path.set_extension("db-wal"); + match fs.file_type { + FileType::Database => { + extent_path.set_extension("db"); + } + FileType::DatabaseSharedMemory => { + extent_path.set_extension("db-shm"); + } + FileType::DatabaseLog => { + extent_path.set_extension("db-wal"); + } + FileType::Data => (), + }; - get_file(extent_path).await + get_a_file(extent_path).await } -async fn get_file(path: PathBuf) -> Result, HttpError> { +async fn get_a_file(path: PathBuf) -> Result, HttpError> { println!("Request for file {:?}", path); /* * Make sure our file is neither a link nor a directory. @@ -224,7 +196,6 @@ struct ExtentFiles { #[endpoint { method = GET, path = "/extent/{eid}/files", - unpublished = false, }] async fn get_files_for_extent( rqctx: Arc>, @@ -248,38 +219,34 @@ async fn get_files_for_extent( } } +/** + * Return the list of extent files we have in our region directory + * that correspond to the given extent. Return an error if any + * of the required files are missing. 
+ */ async fn extent_file_list( extent_dir: PathBuf, eid: u32, ) -> Result { - let mut full_name = extent_dir; - - let extent_name = extent_file_name(eid, None); - full_name.push(extent_name.clone()); let mut files = Vec::new(); - // The data file should always exist - if !full_name.exists() { - return Err(HttpError::for_bad_request(None, "EBADF".to_string())); + let possible_files = vec![ + (extent_file_name(eid, None), true), + (extent_file_name(eid, Some(ExtentExtension::Db)), true), + (extent_file_name(eid, Some(ExtentExtension::DbShm)), false), + (extent_file_name(eid, Some(ExtentExtension::DbWal)), false), + ]; + + for (file, required) in possible_files.into_iter() { + let mut fullname = extent_dir.clone(); + fullname.push(file.clone()); + if fullname.exists() { + files.push(file); + } else if required { + println!("Needed file {} is missing", file); + return Err(HttpError::for_bad_request(None, "EBADF".to_string())); + } } - files.push(extent_name); - // The db file should always exist. - full_name.set_extension("db"); - if !full_name.exists() { - return Err(HttpError::for_bad_request(None, "EBADF".to_string())); - } - files.push(extent_file_name(eid, Some("db"))); - // The db-shm file may exist. - full_name.set_extension("db-shm"); - if full_name.exists() { - println!("Exists: {:?}", full_name.file_name().unwrap()); - files.push(extent_file_name(eid, Some("db-shm"))); - } - // The db-wal file may exist. - full_name.set_extension("db-wal"); - if full_name.exists() { - files.push(extent_file_name(eid, Some("db-wal"))); - } Ok(ExtentFiles { files }) } @@ -310,8 +277,8 @@ mod test { region.extend(3)?; // Determine the directory and name for expected extent files. 
- let extent_dir = extent_dir(&dir, 1); - let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); + let ed = extent_dir(&dir, 1); + let mut ex_files = extent_file_list(ed, 1).await.unwrap(); ex_files.files.sort(); let expected = vec!["001", "001.db", "001.db-shm", "001.db-wal"]; println!("files: {:?}", ex_files.files); @@ -383,4 +350,47 @@ mod test { Ok(()) } + + #[tokio::test] + async fn extent_expected_files_fail() -> Result<()> { + // Verify that we get an error if the expected extent.db file + // is missing. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. + let extent_dir = extent_dir(&dir, 2); + + // Delete db + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(2, None)); + rm_file.set_extension("db"); + std::fs::remove_file(&rm_file).unwrap(); + + assert!(extent_file_list(extent_dir, 2).await.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn extent_expected_files_fail_two() -> Result<()> { + // Verify that we get an error if the expected extent file + // is missing. + let dir = tempdir()?; + let mut region = Region::create(&dir, new_region_options())?; + region.extend(3)?; + + // Determine the directory and name for expected extent files. 
+ let extent_dir = extent_dir(&dir, 1); + + // Delete db + let mut rm_file = extent_dir.clone(); + rm_file.push(extent_file_name(1, None)); + std::fs::remove_file(&rm_file).unwrap(); + + assert!(extent_file_list(extent_dir, 1).await.is_err()); + + Ok(()) + } } diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json index 013f4b9e4..37eb79356 100644 --- a/openapi/downstairs-repair.json +++ b/openapi/downstairs-repair.json @@ -5,39 +5,11 @@ "version": "1" }, "paths": { - "/extent/{eid}/data": { - "get": { - "summary": "Stream the contents of the data file for the given extent ID", - "operationId": "get_extent", - "parameters": [ - { - "in": "path", - "name": "eid", - "required": true, - "schema": { - "type": "integer", - "format": "uint32", - "minimum": 0 - }, - "style": "simple" - } - ], - "responses": { - "default": { - "description": "", - "content": { - "*/*": { - "schema": {} - } - } - } - } - } - }, - "/extent/{eid}/db": { + "/extent/{eid}/files": { "get": { - "summary": "Stream the contents of the metadata .db file for the given extent ID", - "operationId": "get_db", + "summary": "For a given extent, return a vec of strings representing the names of", + "description": "the files that exist for that extent.", + "operationId": "get_files_for_extent", "parameters": [ { "in": "path", @@ -52,21 +24,28 @@ } ], "responses": { - "default": { - "description": "", + "200": { + "description": "successful operation", "content": { - "*/*": { - "schema": {} + "application/json": { + "schema": { + "$ref": "#/components/schemas/ExtentFiles" + } } } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" } } } }, - "/extent/{eid}/db-shm": { + "/newextent/{eid}/{fileType}": { "get": { - "summary": "Stream the contents of the metadata .db-shm file for the given extent ID", - "operationId": "get_shm", + "operationId": "get_extent_file", "parameters": [ { "in": "path", @@ -78,33 +57,13 @@ "minimum": 0 
}, "style": "simple" - } - ], - "responses": { - "default": { - "description": "", - "content": { - "*/*": { - "schema": {} - } - } - } - } - } - }, - "/extent/{eid}/db-wal": { - "get": { - "summary": "Stream the contents of the metadata .db-wal file for the given extent ID", - "operationId": "get_wal", - "parameters": [ + }, { "in": "path", - "name": "eid", + "name": "fileType", "required": true, "schema": { - "type": "integer", - "format": "uint32", - "minimum": 0 + "$ref": "#/components/schemas/FileType" }, "style": "simple" } @@ -120,44 +79,6 @@ } } } - }, - "/extent/{eid}/files": { - "get": { - "summary": "For a given extent, return a vec of strings representing the names of", - "description": "the files that exist for that extent.", - "operationId": "get_files_for_extent", - "parameters": [ - { - "in": "path", - "name": "eid", - "required": true, - "schema": { - "type": "integer", - "format": "uint32", - "minimum": 0 - }, - "style": "simple" - } - ], - "responses": { - "200": { - "description": "successful operation", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ExtentFiles" - } - } - } - }, - "4XX": { - "$ref": "#/components/responses/Error" - }, - "5XX": { - "$ref": "#/components/responses/Error" - } - } - } } }, "components": { @@ -206,6 +127,15 @@ "required": [ "files" ] + }, + "FileType": { + "type": "string", + "enum": [ + "data", + "db", + "db-shm", + "db-wal" + ] } } } From a65e6648ac79655446c8ffdca8597bcde1280541 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Wed, 20 Apr 2022 15:36:03 -0700 Subject: [PATCH 7/7] More PR comments addressed Added big picture comment on repair process. 
--- downstairs/src/region.rs | 99 +++++++++++++++++----------------- downstairs/src/repair.rs | 52 ++++++++---------- openapi/downstairs-repair.json | 24 +++------ tools/test_repair.sh | 6 ++- upstairs/src/lib.rs | 46 +++++++++++++++- 5 files changed, 128 insertions(+), 99 deletions(-) diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index b486b4208..99308d8a2 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -412,32 +412,35 @@ impl Default for ExtentMeta { } #[derive(Debug, Clone)] -pub enum ExtentExtension { +pub enum ExtentType { + Data, Db, DbShm, DbWal, } -impl fmt::Display for ExtentExtension { +impl fmt::Display for ExtentType { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - ExtentExtension::Db => write!(f, "db"), - ExtentExtension::DbShm => write!(f, "db-shm"), - ExtentExtension::DbWal => write!(f, "db-wal"), + ExtentType::Data => Ok(()), + ExtentType::Db => write!(f, "db"), + ExtentType::DbShm => write!(f, "db-shm"), + ExtentType::DbWal => write!(f, "db-wal"), } } } /** - * Take an ExtentExtension and translate it into the corresponding + * Take an ExtentType and translate it into the corresponding * FileType from the repair client. 
*/ -impl ExtentExtension { +impl ExtentType { fn to_file_type(&self) -> FileType { match self { - ExtentExtension::Db => FileType::Db, - ExtentExtension::DbShm => FileType::DbShm, - ExtentExtension::DbWal => FileType::DbWal, + ExtentType::Data => FileType::Data, + ExtentType::Db => FileType::Db, + ExtentType::DbShm => FileType::DbShm, + ExtentType::DbWal => FileType::DbWal, } } } @@ -445,14 +448,14 @@ impl ExtentExtension { /** * Produce the string name of the data file for a given extent number */ -pub fn extent_file_name( - number: u32, - extension: Option, -) -> String { - if let Some(extension) = extension { - format!("{:03X}.{}", number & 0xFFF, extension) - } else { - format!("{:03X}", number & 0xFFF) +pub fn extent_file_name(number: u32, extent_type: ExtentType) -> String { + match extent_type { + ExtentType::Data => { + format!("{:03X}", number & 0xFFF) + } + ExtentType::Db | ExtentType::DbShm | ExtentType::DbWal => { + format!("{:03X}.{}", number & 0xFFF, extent_type) + } } } @@ -473,7 +476,7 @@ pub fn extent_dir>(dir: P, number: u32) -> PathBuf { */ pub fn extent_path>(dir: P, number: u32) -> PathBuf { let mut out = extent_dir(dir, number); - out.push(extent_file_name(number, None)); + out.push(extent_file_name(number, ExtentType::Data)); out } @@ -485,7 +488,7 @@ pub fn extent_path>(dir: P, number: u32) -> PathBuf { */ pub fn copy_dir>(dir: P, number: u32) -> PathBuf { let mut out = extent_dir(dir, number); - out.push(extent_file_name(number, None)); + out.push(extent_file_name(number, ExtentType::Data)); out.set_extension("copy".to_string()); out } @@ -500,7 +503,7 @@ pub fn copy_dir>(dir: P, number: u32) -> PathBuf { */ pub fn replace_dir>(dir: P, number: u32) -> PathBuf { let mut out = extent_dir(dir, number); - out.push(extent_file_name(number, None)); + out.push(extent_file_name(number, ExtentType::Data)); out.set_extension("replace".to_string()); out } @@ -514,7 +517,7 @@ pub fn replace_dir>(dir: P, number: u32) -> PathBuf { */ pub fn 
completed_dir>(dir: P, number: u32) -> PathBuf { let mut out = extent_dir(dir, number); - out.push(extent_file_name(number, None)); + out.push(extent_file_name(number, ExtentType::Data)); out.set_extension("completed".to_string()); out } @@ -552,14 +555,14 @@ pub fn validate_repair_files(eid: usize, files: &[String]) -> bool { let eid = eid as u32; let some = vec![ - extent_file_name(eid, None), - extent_file_name(eid, Some(ExtentExtension::Db)), + extent_file_name(eid, ExtentType::Data), + extent_file_name(eid, ExtentType::Db), ]; let mut all = some.clone(); all.extend(vec![ - extent_file_name(eid, Some(ExtentExtension::DbShm)), - extent_file_name(eid, Some(ExtentExtension::DbWal)), + extent_file_name(eid, ExtentType::DbShm), + extent_file_name(eid, ExtentType::DbWal), ]); // Either we have some or all. @@ -785,9 +788,10 @@ impl Extent { fn create_copy_file( &self, mut copy_dir: PathBuf, - extension: Option, + extension: Option, ) -> Result { - let name = extent_file_name(self.number, None); + // Get the base extent name before we consider the actual Type + let name = extent_file_name(self.number, ExtentType::Data); copy_dir.push(name); if let Some(extension) = extension { let ext = format!("{}", extension); @@ -1259,8 +1263,7 @@ impl Region { .get_files_for_extent(eid as u32) .await .unwrap() - .into_inner() - .files; + .into_inner(); repair_files.sort(); println!("Found repair files: {:?}", repair_files); @@ -1284,7 +1287,7 @@ impl Region { // The .db file is also required to exist for any valid extent. let extent_db = extent - .create_copy_file(copy_dir.clone(), Some(ExtentExtension::Db)) + .create_copy_file(copy_dir.clone(), Some(ExtentType::Db)) .unwrap(); let repair_stream = repair_server .get_extent_file(eid as u32, FileType::Db) @@ -1293,8 +1296,8 @@ impl Region { save_stream_to_file(extent_db, repair_stream.into_inner()).await?; // These next two are optional. 
- for opt_file in &[ExtentExtension::DbShm, ExtentExtension::DbWal] { - let filename = extent_file_name(eid as u32, Some(opt_file.clone())); + for opt_file in &[ExtentType::DbShm, ExtentType::DbWal] { + let filename = extent_file_name(eid as u32, opt_file.clone()); if repair_files.contains(&filename) { let extent_shm = extent .create_copy_file(copy_dir.clone(), Some(opt_file.clone())) @@ -1617,7 +1620,7 @@ pub fn move_replacement_extent>( eid: usize, ) -> Result<(), CrucibleError> { let destination_dir = extent_dir(®ion_dir, eid as u32); - let extent_file_name = extent_file_name(eid as u32, None); + let extent_file_name = extent_file_name(eid as u32, ExtentType::Data); let replace_dir = replace_dir(®ion_dir, eid as u32); let completed_dir = completed_dir(®ion_dir, eid as u32); @@ -1942,7 +1945,7 @@ mod test { // We are simulating the copy of files from the "source" repair // extent by copying the files from extent zero into the copy // directory. - let dest_name = extent_file_name(1, None); + let dest_name = extent_file_name(1, ExtentType::Data); let mut source_path = extent_path(&dir, 0); let mut dest_path = cp.clone(); dest_path.push(dest_name); @@ -2007,7 +2010,7 @@ mod test { // We are simulating the copy of files from the "source" repair // extent by copying the files from extent zero into the copy // directory. - let dest_name = extent_file_name(1, None); + let dest_name = extent_file_name(1, ExtentType::Data); let mut source_path = extent_path(&dir, 0); let mut dest_path = cp.clone(); dest_path.push(dest_name); @@ -2083,7 +2086,7 @@ mod test { // We are simulating the copy of files from the "source" repair // extent by copying the files from extent zero into the copy // directory. 
- let dest_name = extent_file_name(1, None); + let dest_name = extent_file_name(1, ExtentType::Data); let mut source_path = extent_path(&dir, 0); let mut dest_path = cp.clone(); dest_path.push(dest_name); @@ -2333,41 +2336,35 @@ mod test { #[test] fn extent_name_basic() { - assert_eq!(extent_file_name(4, None), "004"); + assert_eq!(extent_file_name(4, ExtentType::Data), "004"); } #[test] fn extent_name_basic_ext() { - assert_eq!(extent_file_name(4, Some(ExtentExtension::Db)), "004.db"); + assert_eq!(extent_file_name(4, ExtentType::Db), "004.db"); } #[test] fn extent_name_basic_ext_shm() { - assert_eq!( - extent_file_name(4, Some(ExtentExtension::DbShm)), - "004.db-shm" - ); + assert_eq!(extent_file_name(4, ExtentType::DbShm), "004.db-shm"); } #[test] fn extent_name_basic_ext_wal() { - assert_eq!( - extent_file_name(4, Some(ExtentExtension::DbWal)), - "004.db-wal" - ); + assert_eq!(extent_file_name(4, ExtentType::DbWal), "004.db-wal"); } #[test] fn extent_name_basic_two() { - assert_eq!(extent_file_name(10, None), "00A"); + assert_eq!(extent_file_name(10, ExtentType::Data), "00A"); } #[test] fn extent_name_basic_three() { - assert_eq!(extent_file_name(59, None), "03B"); + assert_eq!(extent_file_name(59, ExtentType::Data), "03B"); } #[test] fn extent_name_max() { - assert_eq!(extent_file_name(u32::MAX, None), "FFF"); + assert_eq!(extent_file_name(u32::MAX, ExtentType::Data), "FFF"); } #[test] fn extent_name_min() { - assert_eq!(extent_file_name(u32::MIN, None), "000"); + assert_eq!(extent_file_name(u32::MIN, ExtentType::Data), "000"); } #[test] diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index 3b041326d..af5a698fd 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -15,12 +15,9 @@ use http::{Response, StatusCode}; use hyper::Body; use schemars::JsonSchema; use serde::Deserialize; -use serde::Serialize; use super::*; -use crate::region::{ - extent_dir, extent_file_name, extent_path, ExtentExtension, -}; +use 
crate::region::{extent_dir, extent_file_name, extent_path, ExtentType}; /** * Our context is the root of the region we want to serve. @@ -184,12 +181,9 @@ async fn get_a_file(path: PathBuf) -> Result, HttpError> { } } -#[derive(Deserialize, Serialize, JsonSchema)] -struct ExtentFiles { - files: Vec, -} - /** + * Get the list of files related to an extent. + * * For a given extent, return a vec of strings representing the names of * the files that exist for that extent. */ @@ -200,7 +194,7 @@ struct ExtentFiles { async fn get_files_for_extent( rqctx: Arc>, path: Path, -) -> Result, HttpError> { +) -> Result>, HttpError> { let eid = path.into_inner().eid; let extent_dir = extent_dir(rqctx.context().region_dir.clone(), eid); @@ -227,13 +221,13 @@ async fn get_files_for_extent( async fn extent_file_list( extent_dir: PathBuf, eid: u32, -) -> Result { +) -> Result, HttpError> { let mut files = Vec::new(); let possible_files = vec![ - (extent_file_name(eid, None), true), - (extent_file_name(eid, Some(ExtentExtension::Db)), true), - (extent_file_name(eid, Some(ExtentExtension::DbShm)), false), - (extent_file_name(eid, Some(ExtentExtension::DbWal)), false), + (extent_file_name(eid, ExtentType::Data), true), + (extent_file_name(eid, ExtentType::Db), true), + (extent_file_name(eid, ExtentType::DbShm), false), + (extent_file_name(eid, ExtentType::DbWal), false), ]; for (file, required) in possible_files.into_iter() { @@ -247,7 +241,7 @@ async fn extent_file_list( } } - Ok(ExtentFiles { files }) + Ok(files) } #[cfg(test)] @@ -279,10 +273,10 @@ mod test { // Determine the directory and name for expected extent files. 
let ed = extent_dir(&dir, 1); let mut ex_files = extent_file_list(ed, 1).await.unwrap(); - ex_files.files.sort(); + ex_files.sort(); let expected = vec!["001", "001.db", "001.db-shm", "001.db-wal"]; - println!("files: {:?}", ex_files.files); - assert_eq!(ex_files.files, expected); + println!("files: {:?}", ex_files); + assert_eq!(ex_files, expected); Ok(()) } @@ -301,17 +295,17 @@ mod test { // Delete db-wal and db-shm let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(1, None)); + rm_file.push(extent_file_name(1, ExtentType::Data)); rm_file.set_extension("db-wal"); std::fs::remove_file(&rm_file).unwrap(); rm_file.set_extension("db-shm"); std::fs::remove_file(rm_file).unwrap(); let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); - ex_files.files.sort(); + ex_files.sort(); let expected = vec!["001", "001.db"]; - println!("files: {:?}", ex_files.files); - assert_eq!(ex_files.files, expected); + println!("files: {:?}", ex_files); + assert_eq!(ex_files, expected); Ok(()) } @@ -336,17 +330,17 @@ mod test { // Delete db-wal and db-shm. On illumos the close of the extent // may remove these for us, so we ignore errors on the removal. 
let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(1, None)); + rm_file.push(extent_file_name(1, ExtentType::Data)); rm_file.set_extension("db-wal"); let _ = std::fs::remove_file(&rm_file); rm_file.set_extension("db-shm"); let _ = std::fs::remove_file(rm_file); let mut ex_files = extent_file_list(extent_dir, 1).await.unwrap(); - ex_files.files.sort(); + ex_files.sort(); let expected = vec!["001", "001.db"]; - println!("files: {:?}", ex_files.files); - assert_eq!(ex_files.files, expected); + println!("files: {:?}", ex_files); + assert_eq!(ex_files, expected); Ok(()) } @@ -364,7 +358,7 @@ mod test { // Delete db let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(2, None)); + rm_file.push(extent_file_name(2, ExtentType::Data)); rm_file.set_extension("db"); std::fs::remove_file(&rm_file).unwrap(); @@ -386,7 +380,7 @@ mod test { // Delete db let mut rm_file = extent_dir.clone(); - rm_file.push(extent_file_name(1, None)); + rm_file.push(extent_file_name(1, ExtentType::Data)); std::fs::remove_file(&rm_file).unwrap(); assert!(extent_file_list(extent_dir, 1).await.is_err()); diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json index 37eb79356..1fb043a6b 100644 --- a/openapi/downstairs-repair.json +++ b/openapi/downstairs-repair.json @@ -7,8 +7,8 @@ "paths": { "/extent/{eid}/files": { "get": { - "summary": "For a given extent, return a vec of strings representing the names of", - "description": "the files that exist for that extent.", + "summary": "Get the list of files related to an extent.", + "description": "For a given extent, return a vec of strings representing the names of the files that exist for that extent.", "operationId": "get_files_for_extent", "parameters": [ { @@ -29,7 +29,11 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/ExtentFiles" + "title": "Array_of_String", + "type": "array", + "items": { + "type": "string" + } } } } @@ -114,20 +118,6 @@ "request_id" ] }, - 
"ExtentFiles": { - "type": "object", - "properties": { - "files": { - "type": "array", - "items": { - "type": "string" - } - } - }, - "required": [ - "files" - ] - }, "FileType": { "type": "string", "enum": [ diff --git a/tools/test_repair.sh b/tools/test_repair.sh index 1a392c744..b80a02ed2 100755 --- a/tools/test_repair.sh +++ b/tools/test_repair.sh @@ -1,5 +1,9 @@ #!/usr/bin/env bash +# A test to break, then Repair a downstairs region that is out of sync with +# the other regions. +# This assumes you already have regions created. + cargo build || echo "Failed to build" cds="./target/debug/crucible-downstairs" @@ -50,7 +54,7 @@ fi kill "$slow_pid" # Start loop -for (( i = 0; i < 30; i += 1 )); do +for (( i = 0; i < 300; i += 1 )); do # restart downstairs with lossy ${cds} run -d var/"${slow_port}" -p "${slow_port}" --lossy &> "${slow_log}" & diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 9a6791953..8a7aa530b 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -6400,7 +6400,51 @@ pub struct Arg { * to this task (in the ds_reconcile() function) that a downstairs has * completed a reconcile request. * - * This task drives any reconciliation if necessary. + * This task drives any reconciliation if necessary. If Repair is required, + * it happens in three phases. Typically an interruption of repair will + * result in things starting over, but if actual repair work to an extent + * is completed, that extent won't need to be repaired again. + * + * The three phases are: + * + * Collect: + * When a Downstairs connects, the Upstairs collects the gen/flush/dirty + * (GFD) info from all extents. This GFD information is stored and the + * Upstairs waits for all three Downstairs to attach. + * + * Compare: + * In the compare phase, the upstairs will walk the list of all extents + * and compare the G/F/D from each of the downstairs. 
When there is a + * mismatch between downstairs, a repair is needed (the dirty bit counts as + * a mismatch and will force a repair even if generation and flush numbers + * agree). For each + * mismatch, the upstairs determines which downstairs has the extent that + * should be the source, and which of the other downstairs extents needs + * repair. This list of mismatches (source, destination(s)) is collected. + * Once an upstairs has compiled its repair list, it will then generate a + * sequence of Upstairs -> Downstairs repair commands to repair each + * extent that needs to be fixed. For a given piece of repair work, the + * commands are: + * - Send a flush to source extent. + * - Close extent on all downstairs. + * - Send repair command to destination extents (with source extent + * IP/Port). + * (See DS-DS Repair) + * - Reopen all extents. + * + * Repair: + * During repair, each command issued from the upstairs must be completed + * before the next will be sent. The Upstairs is responsible for walking + * the repair commands and sending them to the required downstairs, and + * waiting for them to finish. The actual repair work for an extent + * takes place on the downstairs being repaired. + * + * Repair (ds to ds) + * Each downstairs runs a repair server (Dropshot) that listens for + * repair requests from other downstairs. A downstairs with an extent + * that needs repair will contact the source downstairs and request the + * list of files for an extent, then request each file. Once all files + * are local to the downstairs needing repair, it will replace the existing + * extent files with the new ones. */ async fn up_listen( up: &Arc,