Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store file size in normal files, not extended attributes #54

Merged
merged 10 commits into from
May 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 19 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ Server = http://<FLEXO_SERVER_IP_ADDRESS>:7878/$repo/os/$arch
* Concurrent downloads: You can have multiple clients downloading files from Flexo without one client having to wait.
* Efficient bandwidth sharing for concurrent downloads: Flexo does not require a new connection to the remote mirror
when the same file is downloaded by multiple clients. For instance, suppose a client starts downloading a given file.
After 5 seconds have elapsed, 100MB have been downloaded. Now, a second client requests the same file. The second
client will receive the first 100MB immediately from the local file system. Then, both clients continue to download
After 5 seconds have elapsed, 100 MB have been downloaded. Now, a second client requests the same file. The second
client will receive the first 100 MB immediately from the local file system. Then, both clients continue to download
the file, while only a single connection to the remote mirror exists. This means that your bandwidth is not split for
the two clients, both clients will be able to download the file with the full download speed provided by your ISP.
* Persistent connections: This is especially useful when many small files are downloaded, since no new TLS negotiation
is required for each file.
* The package cache is cleaned automatically: No need to set up cron jobs or systemd timers to clean the cache
regularly, Flexo will automatically ensure that only the 3 most recent versions of a package are kept in your cache
(this parameter can be changed).

## Configuration

The AUR package will install the configuration file in `/etc/flexo/flexo.toml`.
It includes many comments and should be self explanatory (open an issue in case you disagree).
It includes many comments and should be self-explanatory (open an issue in case you disagree).
In most cases, you will want to leave all settings unchanged, with two exceptions:

1. The setting `low_speed_limit` is commented by default, which means that flexo will *not* attempt
Expand Down Expand Up @@ -89,11 +92,13 @@ For issues related to the mirror selection, also see [this page](./mirror_select

## Cleaning the package cache

Starting with version 1.2.0, Flexo includes the setting `num_versions_retain` to purge the package cache. See the
[configuration example](./flexo/conf/flexo.toml) for more details.
The default configuration of Flexo will keep 3 versions of a package in cache: After a 4th version of a package has been
downloaded, the oldest version will be automatically removed. This setting can be changed with the `num_versions_retain`
parameter. See the [configuration example](./flexo/conf/flexo.toml) for more details.

If you use Docker, make sure to use an image that is tagged with a version of 1.2.2 or higher. By default, 3 versions are kept in the cache.
Adapt the `FLEXO_NUM_VERSIONS_RETAIN` environment variable to change the number of versions kept in cache.
If you use Docker, the default behavior can be changed with the `FLEXO_NUM_VERSIONS_RETAIN` environment variable.

If you want to disable this setting and never purge the cache, set the parameter to `0`.

## Using Unofficial User Repositories

Expand Down Expand Up @@ -139,7 +144,7 @@ if that feature is desired and if it fits into the design goals of flexo.

## Development

Details about design decisions and the terminology used in the code
Details about design decisions, and the terminology used in the code,
are described [here](flexo/terminology.md).

The following packages are required to build and test flexo:
Expand All @@ -150,7 +155,7 @@ pacman -S rustup docker docker-compose curl

The [./docker-compose](test/docker-test-local/docker-compose) script may require you to be able to use Docker
without root privileges. Add your user to the Docker group to do so. You may want to read
the [wiki](https://wiki.archlinux.org/index.php/Docker) for more details on on the security
the [wiki](https://wiki.archlinux.org/index.php/Docker) for more details on the security
implications of doing so.

```
Expand All @@ -168,18 +173,18 @@ One way to enable it is to modify your `~/.docker/config.json` to include the fo
Make sure to restart the Docker daemon after modifying this file.

We have two types of test cases:
1. Tests written in Rust: [integration_tests.rs](flexo/tests/integration_test.rs). These tests run quickly
1. Tests written in Rust: [integration_tests.rs](flexo/tests/integration_test.rs). These tests run quickly,
and they are fully deterministic (afaik). You can run them with `cargo`:
```
cd flexo
cargo test
```
2. end-to-end tests using Docker.
We try to avoid flaky test cases, but we cannot guarantee that all our Docker test cases are deterministic,
since their outcome depends on various factors outside of our control (e.g. how the scheduler runs OS processes,
how TCP packets are assembled by the kernel's TCP stack, etc.).
As a result, a failing end-to-end
test may indicate that a new bug was introduced, but it might also have been bad luck or a badly written test case.
since their outcome depends on various factors outside our control (e.g. how the scheduler runs OS processes,
how TCP packets are assembled by the kernel's TCP stack, etc.).
As a result, a failing end-to-end test may indicate that a new bug was introduced, but it might also have been bad luck
or a badly written test case.

In order to run the Docker test cases, run the shell script to set up everything:

Expand Down
19 changes: 8 additions & 11 deletions flexo/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flexo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "flexo"
version = "1.3.0"
version = "1.3.1"
authors = ["Fabian Muscariello <nroi@mailbox.org>"]
edition = "2018"

Expand All @@ -18,11 +18,11 @@ crossbeam = "0.8.0"
httparse = "1.3.4"
time = "0.1.3"
walkdir = "2.3.1"
xattr = "0.2.2"
log = "0.4.14"
chrono = { version = "0.4", features = ["serde", "rustc-serialize"] }
humantime = "2.1.0"
env_logger = "0.8.3"
glob = "0.3.0"

[dev-dependencies]
tempfile = "3.2.0"
Expand Down
1 change: 1 addition & 0 deletions flexo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ impl <J> JobContext<J> where J: Job {
return ScheduleOutcome::Uncacheable(self.best_provider(custom_provider));
},
Some(CachedItem { complete_size: Some(c), cached_size }) if c == cached_size => {
debug!("Order {:?} is already cached.", &order);
return ScheduleOutcome::Cached;
},
Some(CachedItem { cached_size, .. } ) => cached_size,
Expand Down
69 changes: 44 additions & 25 deletions flexo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ extern crate http;
#[macro_use] extern crate log;
extern crate rand;

use std::ffi::OsString;
use std::fs::File;
use std::io;
use std::fs;
use std::io::ErrorKind;
use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};
Expand All @@ -17,6 +17,7 @@ use std::sync::{Arc, Mutex};

use crossbeam::channel::Receiver;
use crossbeam::channel::RecvTimeoutError;
use glob::glob;
use libc::off64_t;
#[cfg(test)]
use tempfile::tempfile;
Expand Down Expand Up @@ -61,7 +62,7 @@ fn main() {

let properties = mirror_config::load_config();
debug!("The following settings were fetched from the TOML file or environment variables: {:#?}", &properties);
initialize_cache(&properties);
inspect_and_initialize_cache(&properties);
match properties.low_speed_limit {
None => {},
Some(limit) => {
Expand Down Expand Up @@ -106,6 +107,7 @@ fn main() {
(Ok(true), Some(0)) => {},
(Ok(true), Some(v)) => {
purge_cache(&cache_directory, v);
purge_cfs_files(&cache_directory);
},
_ => {},
}
Expand Down Expand Up @@ -141,6 +143,40 @@ fn purge_cache(directory: &str, num_versions_retain: u32) {
}
}

fn purge_cfs_files(directory: &str) {
let glob_pattern = format!("{}/**/.*.cfs", directory);
for path in glob(&glob_pattern).unwrap() {
match &path {
Ok(p) => {
match p.file_name().unwrap().to_str() {
None => {
warn!("Invalid unicode: {:?}", p.file_name());
}
Some(filename) => {
let corresponding_package_filename = filename
.strip_prefix(".").unwrap()
.strip_suffix(".cfs").unwrap();
let corresponding_package_filepath = p.with_file_name(corresponding_package_filename);
if !corresponding_package_filepath.exists() {
match fs::remove_file(&p) {
Ok(()) => {
debug!("File {:?} is no longer required and therefore removed.", &p);
}
Err(e) => {
warn!("Unable to remove file {:?}: {:?}", &p, e);
}
}
}
}
}
}
Err(e) => {
warn!("Unreadable path: {:?}", &e);
}
}
}
}

fn str_from_vec(v: Vec<u8>) -> Option<String> {
match String::from_utf8(v) {
Ok(s) if !s.is_empty() => Some(s),
Expand Down Expand Up @@ -177,7 +213,7 @@ fn serve_request(job_context: Arc<Mutex<JobContext<DownloadJob>>>,
let order = DownloadOrder {
filepath: get_request.path,
};
debug!("Attempt to schedule new job");
debug!("Schedule new job");
let result = job_context.lock().unwrap().try_schedule(order.clone(), custom_provider, get_request.resume_from);
match result {
ScheduleOutcome::AlreadyInProgress => {
Expand Down Expand Up @@ -553,12 +589,12 @@ fn receive_content_length(rx: Receiver<FlexoProgress>) -> Result<ContentLengthRe
fn try_complete_filesize_from_path(path: &Path) -> Result<u64, FileAttrError> {
let mut num_attempts = 0;
// Timeout after 2 seconds.
while num_attempts < 2_000 * 2 {
match content_length_from_path(path)? {
while num_attempts < 2_000 {
match get_complete_size_from_cfs_file(path) {
None => {
// for the unlikely event that this file has just been created, but the extended attribute
// has not been set yet.
std::thread::sleep(std::time::Duration::from_micros(500));
// for the unlikely event that this file has just been created, but the cfs file
// has not been created yet.
std::thread::sleep(std::time::Duration::from_millis(1));
},
Some(v) => return Ok(v),
}
Expand All @@ -569,23 +605,6 @@ fn try_complete_filesize_from_path(path: &Path) -> Result<u64, FileAttrError> {
Err(FileAttrError::TimeoutError)
}

fn content_length_from_path(path: &Path) -> Result<Option<u64>, FileAttrError> {
let key = OsString::from("user.content_length");
let value = xattr::get(&path, &key)?;
match value {
Some(value) => {
let content_length = String::from_utf8(value).map_err(FileAttrError::from)
.and_then(|v| v.parse::<u64>().map_err(FileAttrError::from))?;
debug!("Found file! content length is {}", content_length);
Ok(Some(content_length))
},
None => {
info!("file exists, but no content length is set.");
Ok(None)
}
}
}

fn serve_from_growing_file(
mut file: File,
content_length: u64,
Expand Down
2 changes: 1 addition & 1 deletion flexo/src/mirror_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ fn mirrors_auto_config_from_env() -> MirrorsAutoConfig {
.split(",")
.into_iter()
.filter(|s| !s.is_empty())
.map(|s| s.to_owned())
.map(|s| s.trim().to_owned())
.collect::<Vec<String>>()
);
let mirrors_blacklist =
Expand Down