diff --git a/README.md b/README.md
index adaea18..db38ec7 100644
--- a/README.md
+++ b/README.md
@@ -41,17 +41,20 @@ Server = http://:7878/$repo/os/$arch
 * Concurrent downloads: You can have multiple clients downloading files from Flexo without one client having to wait.
 * Efficient bandwidth sharing for concurrent downloads: Flexo does not require a new connection to the remote mirror
   when the same file is downloaded by multiple clients. For instance, suppose a client starts downloading a given file.
-  After 5 seconds have elapsed, 100MB have been downloaded. Now, a second client requests the same file. The second
-  client will receive the first 100MB immediately from the local file system. Then, both clients continue to download
+  After 5 seconds have elapsed, 100 MB have been downloaded. Now, a second client requests the same file. The second
+  client will receive the first 100 MB immediately from the local file system. Then, both clients continue to download
   the file, while only a single connection to the remote mirror exists. This means that your bandwidth is not split
   for the two clients, both clients will be able to download the file with the full download speed provided by your
   ISP.
 * Persistent connections: This is especially useful when many small files are downloaded, since no new TLS negotiation
   is required for each file.
+* The package cache is cleaned automatically: No need to set up cron jobs or systemd timers to clean the cache
+  regularly, Flexo will automatically ensure that only the 3 most recent versions of a package are kept in your cache
+  (this parameter can be changed).
 
 ## Configuration
 
 The AUR package will install the configuration file in `/etc/flexo/flexo.toml`.
-It includes many comments and should be self explanatory (open an issue in case you disagree).
+It includes many comments and should be self-explanatory (open an issue in case you disagree).
 In most cases, you will want to leave all settings unchanged, with two exceptions:
 1. The setting `low_speed_limit` is commented by default, which means that flexo will *not* attempt
@@ -89,11 +92,13 @@ For issues related to the mirror selection, also see [this page](./mirror_select
 
 ## Cleaning the package cache
 
-Starting with version 1.2.0, Flexo includes the setting `num_versions_retain` to purge the package cache. See the
-[configuration example](./flexo/conf/flexo.toml) for more details.
+The default configuration of Flexo will keep 3 versions of a package in cache: After a 4th version of a package has been
+downloaded, the oldest version will be automatically removed. This setting can be changed with the `num_versions_retain`
+parameter. See the [configuration example](./flexo/conf/flexo.toml) for more details.
 
-If you use Docker, make sure to use an image that is tagged with a version of 1.2.2 or higher. By default, 3 versions are kept in the cache.
-Adapt the `FLEXO_NUM_VERSIONS_RETAIN` environment variable to change the number of versions kept in cache.
+If you use Docker, the default behavior can be changed with the `FLEXO_NUM_VERSIONS_RETAIN` environment variable.
+
+If you want to disable this setting and never purge the cache, set the parameter to `0`.
 
 ## Using Unofficial User Repositories
 
@@ -139,7 +144,7 @@ if that feature is desired and if it fits into the design goals of flexo.
 
 ## Development
 
-Details about design decisions and the terminology used in the code
+Details about design decisions, and the terminology used in the code,
 are described [here](flexo/terminology.md).
 
 The following packages are required to build and test flexo:
@@ -150,7 +155,7 @@ pacman -S rustup docker docker-compose curl
 ```
 
 The [./docker-compose](test/docker-test-local/docker-compose) script may require you to be able to use Docker
 without root privileges. Add your user to the Docker group to do so. You may want to read
-the [wiki](https://wiki.archlinux.org/index.php/Docker) for more details on on the security
+the [wiki](https://wiki.archlinux.org/index.php/Docker) for more details on the security
 implications of doing so.
 ```
@@ -168,7 +173,7 @@ One way to enable it is to modify your `~/.docker/config.json` to include the fo
 Make sure to restart the Docker daemon after modifying this file.
 
 We have two types of test cases:
-1. Tests written in Rust: [integration_tests.rs](flexo/tests/integration_test.rs). These tests run quickly
+1. Tests written in Rust: [integration_tests.rs](flexo/tests/integration_test.rs). These tests run quickly,
 and they are fully deterministic (afaik). You can run them with `cargo`:
 ```
 cd flexo
@@ -176,10 +181,10 @@ and they are fully deterministic (afaik). You can run them with `cargo`:
 ```
 2. end-to-end tests using Docker.
 We try to avoid flaky test cases, but we cannot guarantee that all our Docker test cases are deterministic,
-since their outcome depends on various factors outside of our control (e.g. how the scheduler runs OS processes,
-how TCP packets are assembled by the kernel's TCP stack, etc.).
-As a result, a failing end-to-end
-test may indicate that a new bug was introduced, but it might also have been bad luck or a badly written test case.
+since their outcome depends on various factors outside our control (e.g. how the scheduler runs OS processes,
+how TCP packets are assembled by the kernel's TCP stack, etc.).
+As a result, a failing end-to-end test may indicate that a new bug was introduced, but it might also have been bad luck
+or a badly written test case.
 
 In order to run the Docker test cases, run the shell script to set up everything:
 
diff --git a/flexo/Cargo.lock b/flexo/Cargo.lock
index f223573..f997651 100644
--- a/flexo/Cargo.lock
+++ b/flexo/Cargo.lock
@@ -202,12 +202,13 @@ dependencies = [
 
 [[package]]
 name = "flexo"
-version = "1.3.0"
+version = "1.3.1"
 dependencies = [
  "chrono",
  "crossbeam",
  "curl",
  "env_logger",
+ "glob",
  "http",
  "httparse",
  "humantime",
@@ -220,7 +221,6 @@ dependencies = [
  "time",
  "toml",
  "walkdir",
- "xattr",
 ]
 
 [[package]]
@@ -264,6 +264,12 @@ dependencies = [
  "wasi 0.10.0+wasi-snapshot-preview1",
 ]
 
+[[package]]
+name = "glob"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
+
 [[package]]
 name = "hermit-abi"
 version = "0.1.11"
@@ -790,12 +796,3 @@ name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
-
-[[package]]
-name = "xattr"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c"
-dependencies = [
- "libc",
-]
diff --git a/flexo/Cargo.toml b/flexo/Cargo.toml
index b100a70..c0b2b52 100644
--- a/flexo/Cargo.toml
+++ b/flexo/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "flexo"
-version = "1.3.0"
+version = "1.3.1"
 authors = ["Fabian Muscariello "]
 edition = "2018"
 
@@ -18,11 +18,11 @@
 crossbeam = "0.8.0"
 httparse = "1.3.4"
 time = "0.1.3"
 walkdir = "2.3.1"
-xattr = "0.2.2"
 log = "0.4.14"
 chrono = { version = "0.4", features = ["serde", "rustc-serialize"] }
 humantime = "2.1.0"
 env_logger = "0.8.3"
+glob = "0.3.0"
 [dev-dependencies]
 tempfile = "3.2.0"
diff --git a/flexo/src/lib.rs b/flexo/src/lib.rs
index bbe0566..3a378e3 100644
--- a/flexo/src/lib.rs
+++ b/flexo/src/lib.rs
@@ -455,6 +455,7 @@ impl<J> JobContext<J> where J: Job {
                 return ScheduleOutcome::Uncacheable(self.best_provider(custom_provider));
             },
             Some(CachedItem { complete_size: Some(c), cached_size }) if c == cached_size => {
+                debug!("Order {:?} is already cached.", &order);
                 return ScheduleOutcome::Cached;
             },
             Some(CachedItem { cached_size, .. } ) => cached_size,
diff --git a/flexo/src/main.rs b/flexo/src/main.rs
index 8afb709..9917b61 100644
--- a/flexo/src/main.rs
+++ b/flexo/src/main.rs
@@ -3,9 +3,9 @@ extern crate http;
 #[macro_use]
 extern crate log;
 extern crate rand;
-use std::ffi::OsString;
 use std::fs::File;
 use std::io;
+use std::fs;
 use std::io::ErrorKind;
 use std::io::prelude::*;
 use std::net::{TcpListener, TcpStream};
@@ -17,6 +17,7 @@ use std::sync::{Arc, Mutex};
 
 use crossbeam::channel::Receiver;
 use crossbeam::channel::RecvTimeoutError;
+use glob::glob;
 use libc::off64_t;
 #[cfg(test)]
 use tempfile::tempfile;
@@ -61,7 +62,7 @@ fn main() {
 
     let properties = mirror_config::load_config();
     debug!("The following settings were fetched from the TOML file or environment variables: {:#?}", &properties);
-    initialize_cache(&properties);
+    inspect_and_initialize_cache(&properties);
     match properties.low_speed_limit {
         None => {},
         Some(limit) => {
@@ -106,6 +107,7 @@ fn main() {
             (Ok(true), Some(0)) => {},
             (Ok(true), Some(v)) => {
                 purge_cache(&cache_directory, v);
+                purge_cfs_files(&cache_directory);
             },
             _ => {},
         }
@@ -141,6 +143,40 @@ fn purge_cache(directory: &str, num_versions_retain: u32) {
     }
 }
 
+fn purge_cfs_files(directory: &str) {
+    let glob_pattern = format!("{}/**/.*.cfs", directory);
+    for path in glob(&glob_pattern).unwrap() {
+        match &path {
+            Ok(p) => {
+                match p.file_name().unwrap().to_str() {
+                    None => {
+                        warn!("Invalid unicode: {:?}", p.file_name());
+                    }
+                    Some(filename) => {
+                        let corresponding_package_filename = filename
+                            .strip_prefix(".").unwrap()
+                            .strip_suffix(".cfs").unwrap();
+                        let corresponding_package_filepath = p.with_file_name(corresponding_package_filename);
+                        if !corresponding_package_filepath.exists() {
+                            match fs::remove_file(&p) {
+                                Ok(()) => {
+                                    debug!("File {:?} is no longer required and therefore removed.", &p);
+                                }
+                                Err(e) => {
+                                    warn!("Unable to remove file {:?}: {:?}", &p, e);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                warn!("Unreadable path: {:?}", &e);
+            }
+        }
+    }
+}
+
 fn str_from_vec(v: Vec<u8>) -> Option<String> {
     match String::from_utf8(v) {
         Ok(s) if !s.is_empty() => Some(s),
@@ -177,7 +213,7 @@ fn serve_request(job_context: Arc<Mutex<JobContext<DownloadJob>>>,
     let order = DownloadOrder {
         filepath: get_request.path,
     };
-    debug!("Attempt to schedule new job");
+    debug!("Schedule new job");
     let result = job_context.lock().unwrap().try_schedule(order.clone(), custom_provider, get_request.resume_from);
     match result {
         ScheduleOutcome::AlreadyInProgress => {
@@ -553,12 +589,12 @@ fn receive_content_length(rx: Receiver<FlexoProgress>) -> Result<u64, ContentLengthError> {
 fn try_complete_filesize_from_path(path: &Path) -> Result<u64, FileAttrError> {
     let mut num_attempts = 0;
     // Timeout after 2 seconds.
-    while num_attempts < 2_000 * 2 {
-        match content_length_from_path(path)? {
+    while num_attempts < 2_000 {
+        match get_complete_size_from_cfs_file(path) {
             None => {
-                // for the unlikely event that this file has just been created, but the extended attribute
-                // has not been set yet.
-                std::thread::sleep(std::time::Duration::from_micros(500));
+                // for the unlikely event that this file has just been created, but the cfs file
+                // has not been created yet.
+                std::thread::sleep(std::time::Duration::from_millis(1));
             },
             Some(v) => return Ok(v),
         }
@@ -569,23 +605,6 @@ fn try_complete_filesize_from_path(path: &Path) -> Result<u64, FileAttrError> {
     Err(FileAttrError::TimeoutError)
 }
 
-fn content_length_from_path(path: &Path) -> Result<Option<u64>, FileAttrError> {
-    let key = OsString::from("user.content_length");
-    let value = xattr::get(&path, &key)?;
-    match value {
-        Some(value) => {
-            let content_length = String::from_utf8(value).map_err(FileAttrError::from)
-                .and_then(|v| v.parse::<u64>().map_err(FileAttrError::from))?;
-            debug!("Found file! content length is {}", content_length);
-            Ok(Some(content_length))
-        },
-        None => {
-            info!("file exists, but no content length is set.");
-            Ok(None)
-        }
-    }
-}
-
 fn serve_from_growing_file(
     mut file: File,
     content_length: u64,
diff --git a/flexo/src/mirror_config.rs b/flexo/src/mirror_config.rs
index 7a3aa7b..735a8dc 100644
--- a/flexo/src/mirror_config.rs
+++ b/flexo/src/mirror_config.rs
@@ -173,7 +173,7 @@ fn mirrors_auto_config_from_env() -> MirrorsAutoConfig {
             .split(",")
             .into_iter()
             .filter(|s| !s.is_empty())
-            .map(|s| s.to_owned())
+            .map(|s| s.trim().to_owned())
             .collect::<Vec<String>>()
     );
     let mirrors_blacklist =
diff --git a/flexo/src/mirror_flexo.rs b/flexo/src/mirror_flexo.rs
index 7fee046..1177e8b 100644
--- a/flexo/src/mirror_flexo.rs
+++ b/flexo/src/mirror_flexo.rs
@@ -2,15 +2,15 @@ extern crate flexo;
 
 use std::{fs, str};
 use std::cmp::Ordering;
-use std::ffi::OsString;
 use std::fs::File;
 use std::fs::OpenOptions;
 use std::io::BufWriter;
 use std::io::{ErrorKind, Read, Write};
 use std::num::ParseIntError;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::string::FromUtf8Error;
 use std::time::Duration;
+use std::os::unix::ffi::OsStrExt;
 
 use crossbeam::channel::Sender;
 use curl::easy::{Easy2, Handler, HttpVersion, WriteError};
@@ -46,9 +46,6 @@
 const MAX_REDIRECTIONS: u32 = 3;
 const LATENCY_TEST_NUM_ATTEMPTS: u32 = 5;
 
-const ERR_MSG_XATTR_SUPPORT: &str = "Unable to get extended file attributes. Please make sure that the path \
-    set as cache_directory resides on a file system with support for extended attributes.";
-
 #[derive(Debug, PartialEq, Eq)]
 pub enum ClientError {
     BufferSizeExceeded,
@@ -322,7 +319,7 @@ impl Job for DownloadJob {
     // it also has side effects.
     fn cache_state(order: &Self::O, properties: &Self::PR) -> Option<CachedItem> {
         let path = Path::new(&properties.cache_directory).join(&order.filepath);
-        cache_state_from_path(&path)
+        persist_and_get_cache_state(&path)
     }
 
     fn serve_from_provider(self, mut channel: DownloadChannel,
@@ -459,17 +456,14 @@ impl Job for DownloadJob {
     }
 }
 
-pub fn initialize_cache(mirror_config: &MirrorConfig) {
+pub fn inspect_and_initialize_cache(mirror_config: &MirrorConfig) {
     let mut sum_size = 0;
     let mut count_cache_items = 0;
     for entry in WalkDir::new(&mirror_config.cache_directory) {
         let entry = entry.expect("Error while reading directory entry");
-        if entry.file_type().is_file() {
-            match cache_state_from_path(entry.path()) {
-                None => {
-                    // This should happen only in extremely unlikely circumstances, e.g. when the file is
-                    // deleted shortly after this function started executing.
-                }
+        if entry.file_type().is_file() && !entry.file_name().as_bytes().starts_with(b".") {
+            match persist_and_get_cache_state(entry.path()) {
+                None => {},
                 Some(CachedItem { cached_size, .. }) => {
                     sum_size += cached_size;
                     count_cache_items += 1;
@@ -481,10 +475,12 @@ pub fn inspect_and_initialize_cache(mirror_config: &MirrorConfig) {
     info!("Retrieved {} files with a total size of {} from local file system.", count_cache_items, size_formatted);
 }
 
-fn cache_state_from_path(path: &Path) -> Option<CachedItem> {
+fn persist_and_get_cache_state(path: &Path) -> Option<CachedItem> {
+    debug!("Determine cache state for path {:?}", &path);
     let file = match File::open(path) {
         Ok(f) => f,
         Err(e) if e.kind() == ErrorKind::NotFound => {
+            debug!("The file {:?} does not exist (yet)", &path);
            // The requested file is not cached, yet.
             return None;
         }
@@ -496,38 +492,31 @@ fn persist_and_get_cache_state(path: &Path) -> Option<CachedItem> {
             panic!("Unexpected I/O error occurred: {:?}", e);
         }
     };
-    let key = OsString::from("user.content_length");
     let file_size = file.metadata().expect("Unable to fetch file metadata").len();
-    let complete_size = match xattr::get(path, &key).expect(ERR_MSG_XATTR_SUPPORT) {
-        Some(value) => {
-            let result = String::from_utf8(value).map_err(FileAttrError::from)
-                .and_then(|v| v.parse::<u64>().map_err(FileAttrError::from));
-            match result {
-                Ok(v) => Some(v),
-                Err(e) => {
-                    error!("Unable to read extended attribute user.content_length from file {:?}: {:?}", path, e);
-                    None
-                },
-            }
-        },
+    let complete_size = match get_complete_size_from_cfs_file(path) {
         None => {
-            // Flexo sets the extended attributes for all files, but this file lacks this attribute:
-            // We assume that this mostly happens when the user copies files into the directory used
-            // by flexo, and we further assume that users will do this only if this file is complete.
-            // Therefore, we can set the content length attribute of this file to the file size.
-            let value = file_size.to_string();
-            match xattr::set(path, &key, &value.as_bytes()) {
-                Ok(()) => {
-                    info!("The file {:?} used to lack the content-length attribute, \
-                    this attribute has now been set to {}.", path, value);
-                },
-                Err(e) => {
-                    error!("Unable to set extended file attributes for {:?}: {:?}. Please make sure that \
-                    flexo has write permissions for this file.", path, e)
+            // Flexo maintains the complete file size (i.e., the expected file size when the
+            // download has completed) in files with the "cfs" extension (cfs = complete file size).
+            // If, for some reason, the complete file size could not be determined from the cfs
+            // file, then just assuming that the current file size is already the complete file
+            // size is usually a safe fallback. For example, this case can occur if files have been
+            // copied to Flexo's package directory, or if the user removed the files with the cfs
+            // extension.
+            debug!("Unable to fetch file size from CFS file for {:?}", path);
+            match path.metadata().expect("Unable to fetch file metadata").len() {
+                0 => {
+                    info!("File {:?} is empty. Apparently, a previous download was aborted. This file will be removed",
+                        path);
+                    let _ = fs::remove_file(path);
+                    return None;
                 },
+                s => {
+                    create_cfs_file(path, s);
+                    Some(s)
+                }
             }
-            Some(file_size)
-        },
+        }
+        Some(s) => Some(s)
     };
     Some(CachedItem {
         cached_size: file_size,
@@ -535,6 +524,49 @@ fn persist_and_get_cache_state(path: &Path) -> Option<CachedItem> {
     })
 }
 
+pub fn get_complete_size_from_cfs_file(path: &Path) -> Option<u64> {
+    let cfs_path = cfs_path_from_pkg_path(&path)?;
+    match fs::read_to_string(&cfs_path) {
+        Ok(s) => {
+            if s.ends_with("\n") && s.chars().rev().skip(1).all(|c| c.is_ascii_digit()) {
+                let without_newline = &s[0..s.len()-1];
+                match without_newline.parse::<u64>() {
+                    Ok(size) => Some(size),
+                    Err(e) => {
+                        error!("Unable to parse {:?} into integer: {:?}", s, e);
+                        None
+                    }
+                }
+            } else {
+                error!("File {:?} has unexpected format: Expected a single line containing digits only", cfs_path);
+                None
+            }
+        }
+        Err(e) if e.kind() == ErrorKind::NotFound => {
+            debug!("CFS file {:?} does not exist", cfs_path);
+            None
+        }
+        Err(e) => {
+            error!("Unable to read file {:?} into string: {:?}", cfs_path, e);
+            None
+        }
+    }
+}
+
+fn cfs_path_from_pkg_path(path: &Path) -> Option<PathBuf> {
+    match path.file_name() {
+        None => {
+            warn!("Unable to determine file name from path {:?}", path);
+            None
+        }
+        Some(f) => {
+            let filename = f.to_str().expect("Expected all file names to be valid UTF-8");
+            let cfs_filename = format!(".{}.cfs", filename);
+            Some(path.with_file_name(cfs_filename))
+        }
+    }
+}
+
 #[derive(PartialEq, Eq, Hash, Clone, Debug)]
 pub struct DownloadOrder {
     /// This path is relative to the given root directory.
@@ -681,21 +713,17 @@ impl Handler for DownloadState {
                 ).unwrap();
                 debug!("Content length is {}", content_length);
                 job_resources.header_state.header_success = Some(HeaderOutcome::Ok(content_length));
-                let path = Path::new(&self.properties.cache_directory).join(&self.job_state.order.filepath);
-                let key = OsString::from("user.content_length");
                 // TODO it may be safer to obtain the size_written from the job_state, i.e., add a new item to
                 // the job state that stores the size the job should be started with. With the current
                 // implementation, we assume that the header method is always called before anything is written to
                 // the file.
                 let size_written = self.job_state.job_resources.as_ref().unwrap().file_state.size_written;
+                let path = Path::new(&self.properties.cache_directory).join(&self.job_state.order.filepath);
                 // TODO stick to a consistent terminology, everywhere: client_content_length = the content length
                 // as communicated to the client, i.e., what the client receives in his headers.
                 // provider_content_length = the content length we send to the provider.
                 let client_content_length = size_written + content_length;
-                let value = format!("{}", client_content_length);
-                debug!("Setting the extended file attribute");
-                xattr::set(path, &key, &value.as_bytes())
-                    .expect("Unable to set extended file attributes");
+                create_cfs_file(&path, client_content_length);
                 debug!("Sending content length: {}", client_content_length);
                 let message: FlexoProgress = FlexoProgress::JobSize(client_content_length);
                 let _ = self.job_state.tx.send(message);
@@ -729,6 +757,24 @@ impl Handler for DownloadState {
         }
     }
 }
+fn create_cfs_file(path: &Path, complete_filesize: u64) {
+    debug!("Creating CFS file for {:?}", &path);
+    let cfs_path = cfs_path_from_pkg_path(path).unwrap();
+    let mut cfs_file = match File::create(&cfs_path) {
+        Ok(f) => f,
+        Err(e) => {
+            error!("Unable to create CFS file {:?}: {:?}", &cfs_path, e);
+            return;
+        }
+    };
+    match cfs_file.write_all(format!("{}\n", complete_filesize).as_bytes()) {
+        Ok(_) => {}
+        Err(e) => {
+            error!("Unable to write to CFS file {:?}: {:?}", &cfs_path, e);
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct DownloadChannel {
     handle: Easy2<DownloadState>,