Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to axum 0.7 #17

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
31716fd
Add configuration file support
sunsided Oct 2, 2023
92cb92c
Add explicit configuration file support
sunsided Oct 15, 2023
27df263
Allow specifying configuration on command-line
sunsided Oct 15, 2023
23d34cc
Add TODO for validating Memcache connection string
sunsided Oct 15, 2023
9e60aa6
Move memcache configuration to memcache backend
sunsided Nov 1, 2023
8f5113c
Refactor memcache backend into multiple files
sunsided Nov 11, 2023
2c68201
Add file_id as field to structured log output
sunsided Nov 11, 2023
cb75dbc
Refactor boxing of dynamic backends for registration
sunsided Nov 12, 2023
74609ae
Refator backend registration using TryCreateFromConfig trait
sunsided Nov 12, 2023
c18b498
Change log message after registration to be dynamic
sunsided Nov 12, 2023
07b1e7b
Refactor backend registration
sunsided Nov 12, 2023
aefaf8f
Add rendezvous at server shutdown
sunsided Nov 12, 2023
c8dc643
Refactor shutdown rendezvous into own type
sunsided Nov 12, 2023
159be1a
Add basic dispatch of distribution events
sunsided Nov 12, 2023
0c6e394
Use rendezvous crate for shutdown rendezvous
sunsided Dec 1, 2023
9538053
Update axum to 0.7 / http 1.0
sunsided Dec 1, 2023
7556853
Migrate connection acceptance loop to owned code
sunsided Dec 1, 2023
e1de2b7
Refactor new axum hosting into own module
sunsided Dec 1, 2023
f40df10
Add dropping of connection on shutdown
sunsided Dec 2, 2023
319db14
Add note about graceful shutdown
sunsided Dec 2, 2023
d902a56
Upgrade async-tempfile and shared-files
sunsided Dec 6, 2023
d9b3c6d
Add note about graceful shutdowns issue
sunsided Dec 6, 2023
2766a31
Update Cargo.lock after rebasing
sunsided Dec 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
607 changes: 559 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

72 changes: 48 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,62 @@ authors = ["Markus Mayer"]
edition = "2021"
rust-version = "1.68.0"

[features]
default = ["memcache"]
memcache = ["dep:memcache", "dep:r2d2-memcache", "dep:r2d2"]

[dependencies]
anyhow = "1.0.71"
anyhow = "1.0.75"
async-tempfile = "0.5.0"
axum = { version = "0.6.18", features = ["http2", "headers", "macros", "json"] }
base64 = "0.21.2"
chrono = "0.4.26"
clap = { version = "4.3.8", features = ["env"] }
axum = { version = "0.7.2", features = ["http2", "macros", "json"] }
axum-extra = { version = "0.9.0", features = ["typed-header"] }
base64 = "0.21.5"
chrono = "0.4.31"
clap = { version = "4.4.11", features = ["env"] }
config = "0.13.4"
crossbeam = "0.8.2"
ctrlc = { version = "3.4.0", features = ["termination"] }
ctrlc = { version = "3.4.1", features = ["termination"] }
directories = "5.0.1"
dotenvy = "0.15.7"
exitcode = "1.1.2"
futures = "0.3.28"
headers-content-md5 = "0.1.0"
futures = "0.3.29"
futures-util = "0.3.29"
headers = "0.4.0"
headers-content-md5 = "0.2.0"
hex = "0.4.3"
hyper = { version = "0.14.26", features = ["http1", "http2", "server", "h2"] }
http-body = "1.0.0"
http-body-util = "0.1.0"
hyper = { version = "1.0.1", features = ["http1", "http2", "server"] }
hyper-util = { version = "0.1.1", features = ["server-auto"] }
lazy_static = "1.4.0"
md5 = "0.7.0"
mime-db = "1.6.0"
percent-encoding = "2.3.0"
pin-project = "1.1.0"
problemdetails = { version = "0.2.1", features = ["axum"] }
prometheus-client = "0.21.1"
serde = { version = "1.0.164", features = ["derive"] }
sha2 = "0.10.7"
memcache = { version = "0.17.1", optional = true }
mime-db = "1.7.0"
percent-encoding = "2.3.1"
pin-project = "1.1.3"
problemdetails = { version = "0.3.0", features = ["axum"] }
prometheus-client = "0.22.0"
r2d2 = { version = "0.8.10", optional = true }
r2d2-memcache = { version = "0.6.0", optional = true }
rendezvous = { version = "0.2.2", features = ["tokio", "log"] }
serde = { version = "1.0.193", features = ["derive"] }
sha2 = "0.10.8"
shared-files = "0.1.0"
shortguid = { version = "0.5.0", features = ["serde"] }
thiserror = "1.0.40"
time = "0.3.22"
tokio = { version = "1.28.2", features = ["full"] }
shortguid = { version = "0.6.0", features = ["serde"] }
thiserror = "1.0.50"
time = "0.3.30"
tokio = { version = "1.34.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["net"] }
tokio-util = { version = "0.7.8", features = ["io"] }
tokio-util = { version = "0.7.10", features = ["io"] }
tower = { version = "0.4.13", features = ["tokio"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter", "parking_lot", "tracing-log", "json"] }
uuid = { version = "1.3.4", features = ["v1", "rng", "serde"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "parking_lot", "tracing-log", "json"] }
uuid = { version = "1.6.1", features = ["v1", "rng", "serde"] }
tower-service = "0.3.2"

[dev-dependencies]
serde_yaml = "0.9.27"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,22 @@ sequenceDiagram
### Shutdown

* `/stop` - Initiates a graceful shutdown.

## Example run

```shell
cargo run --bin yeet-yoink -- --http 127.0.0.1:8080 --http 127.0.1.1:8081 -c example-config.yaml
```

Testing binary uploads with curl, ensure to use the `--data-binary` argument instead of `--data`, as zero bytes
may otherwise prematurely end the transfer:

```shell
curl --verbose -XPOST http://localhost:8080/yeet --data-binary @big-file.jpg
```

With nghttp, it could look like

```shell
nghttp -v --data=big-file.jpg -H':method: POST' http://localhost:8080/yeet
```
23 changes: 23 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: "3.8"
services:

redis:
image: "redis:alpine"
ports:
- "6379:6379"

memcached:
image: "memcached:alpine"
entrypoint:
- memcached
- -m 64
- --verbose
- --enable-largepages
expose:
- "11211/tcp"
ports:
- "11211:11211"
healthcheck:
test: echo stats | nc 127.0.0.1 11211
interval: 10s
retries: 60
6 changes: 6 additions & 0 deletions example-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
version: 0
backends:
memcache:
- tag: "memcache-1"
connection_string: "memcache://127.0.0.1:11211?timeout=10&tcp_nodelay=true"
63 changes: 63 additions & 0 deletions src/app_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use clap::ArgMatches;
use config::builder::DefaultState;
use config::{ConfigBuilder, File, FileFormat};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tracing::{error, info};

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct AppConfig {
version: u8,
pub backends: BackendsConfig,
}

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct BackendsConfig {
#[cfg_attr(docsrs, doc(cfg(feature = "memcache")))]
#[cfg(feature = "memcache")]
pub memcache: Vec<crate::backends::memcache::MemcacheBackendConfig>,
}

pub fn load_config(config_dir: &Path, matches: &ArgMatches) -> Result<AppConfig, anyhow::Error> {
// TODO: Document configuration file locations
let mut config_builder = ConfigBuilder::<DefaultState>::default();

// Add default configuration.
config_builder = config_builder
.add_source(
File::from(config_dir.join("default.yml"))
.format(FileFormat::Yaml)
.required(false),
)
.add_source(
// The YAML FAQ requests `.yaml` to be used as the default.
File::from(config_dir.join("default.yaml"))
.format(FileFormat::Yaml)
.required(false),
);

if let Some(path) = matches.get_one::<PathBuf>("config_file").cloned() {
info!(
"Loading configuration file from {config_path:?}",
config_path = path
);
config_builder =
config_builder.add_source(File::from(path).format(FileFormat::Yaml).required(true))
}

let config = match config_builder.build() {
Ok(config) => config,
Err(e) => {
error!("Unable to load configuration: {error}", error = e);
return Err(e.into());
}
};

match config.try_deserialize() {
Ok(config) => Ok(config),
Err(e) => {
error!("Unable to deserialize configuration: {error}", error = e);
Err(e.into())
}
}
}
60 changes: 45 additions & 15 deletions src/backbone/backbone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ use crate::backbone::file_record::{FileRecord, GetReaderError};
use crate::backbone::file_writer::FileWriter;
use crate::backbone::file_writer_guard::FileWriterGuard;
use crate::backbone::WriteSummary;
use crate::backends::{BackendCommand, BackendRegistry};
use async_tempfile::TempFile;
use axum::headers::ContentType;
use axum::response::{IntoResponse, Response};
use headers::ContentType;
use hyper::StatusCode;
use rendezvous::RendezvousGuard;
use shared_files::{SharedFileWriter, SharedTemporaryFile};
use shortguid::ShortGuid;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::info;

Expand All @@ -25,21 +29,43 @@ pub const TEMPORAL_LEASE: Duration = Duration::from_secs(5 * 60);
/// This instance keeps track of currently processed files.
pub struct Backbone {
inner: Arc<RwLock<Inner>>,
sender: mpsc::Sender<BackboneCommand>,
sender: Sender<BackboneCommand>,
registry: BackendRegistry,
loop_handle: JoinHandle<()>,
}

struct Inner {
open: HashMap<ShortGuid, FileRecord>,
}

impl Backbone {
pub fn new() -> Self {
pub fn new(mut registry: BackendRegistry, cleanup_rendezvous: RendezvousGuard) -> Self {
let (sender, receiver) = mpsc::channel(1024);
let inner = Arc::new(RwLock::new(Inner {
open: HashMap::default(),
}));
let _ = tokio::spawn(Self::command_loop(inner.clone(), receiver));
Self { inner, sender }

let backend_sender = registry
.get_sender()
.expect("Failed to obtain the sender from the registry");

let loop_handle = tokio::spawn(Self::command_loop(
inner.clone(),
receiver,
backend_sender,
cleanup_rendezvous,
));
Self {
inner,
sender,
registry,
loop_handle,
}
}

pub async fn join(self) {
self.loop_handle.await.ok();
self.registry.join().await.ok();
}

/// Creates a new file buffer, registers it and returns a writer to it.
Expand Down Expand Up @@ -124,27 +150,31 @@ impl Backbone {
.map_err(|e| NewFileError::FailedCreatingWriter(id, e))
}

async fn command_loop(inner: Arc<RwLock<Inner>>, mut channel: mpsc::Receiver<BackboneCommand>) {
async fn command_loop(
inner: Arc<RwLock<Inner>>,
mut channel: mpsc::Receiver<BackboneCommand>,
backend_sender: Sender<BackendCommand>,
cleanup_rendezvous: RendezvousGuard,
) {
while let Some(command) = channel.recv().await {
match command {
BackboneCommand::RemoveWriter(id) => {
info!("Removing file {id} from bookkeeping");
info!(file_id = %id, "Removing file {id} from bookkeeping");
let mut inner = inner.write().await;
inner.open.remove(&id);
}
BackboneCommand::ReadyForDistribution(id, _) => {
info!("The file {id} was buffered completely and can now be distributed")
BackboneCommand::ReadyForDistribution(id, summary) => {
info!(file_id = %id, "The file {id} was buffered completely and can now be distributed");
backend_sender
.send(BackendCommand::DistributeFile(id, summary))
.await
.ok();
}
}
}

info!("The backbone command loop stopped");
}
}

impl Default for Backbone {
fn default() -> Self {
Self::new()
cleanup_rendezvous.completed();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/backbone/file_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::backbone::WriteSummary;
use crate::metrics::transfer::{TransferMethod, TransferMetrics};
use axum::headers::ContentType;
use headers::ContentType;
use shared_files::{FileSize, SharedTemporaryFileReader};
use std::borrow::Cow;
use std::pin::Pin;
Expand Down
20 changes: 12 additions & 8 deletions src/backbone/file_record.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::backbone::backbone::BackboneCommand;
use crate::backbone::file_writer_guard::WriteResult;
use crate::backbone::WriteSummary;
use axum::headers::ContentType;
use headers::ContentType;
use shared_files::{SharedTemporaryFile, SharedTemporaryFileReader};
use shortguid::ShortGuid;
use std::sync::Arc;
Expand Down Expand Up @@ -96,17 +96,17 @@ impl FileRecord {
// Before starting the timeout, wait for the write to the file to complete.
let summary = match writer_command.await {
Ok(WriteResult::Success(summary)) => {
info!("File writing completed: {}", summary.hashes);
info!(file_id = %id, "File writing completed: {}", summary.hashes);
summary
}
Ok(WriteResult::Failed) => {
warn!("Writing to the file failed");
warn!(file_id = %id, "Writing to the file failed");
Self::close_file(&mut inner).await;
Self::remove_writer(id, backbone_command).await;
return;
}
Err(e) => {
warn!("The file writer channel failed: {e}");
warn!(file_id = %id, "The file writer channel failed: {e}");
Self::close_file(&mut inner).await;
Self::remove_writer(id, backbone_command).await;
return;
Expand All @@ -124,20 +124,24 @@ impl FileRecord {
.send(BackboneCommand::ReadyForDistribution(id, summary))
.await
{
warn!("The backbone writer channel was closed while indicating a termination for file with ID {id}: {error}");
warn!(file_id = %id, "The backbone writer channel was closed while indicating a termination for file with ID {id}: {error}");
return;
}

// TODO: The lifetime handler also needs to listen to graceful shutdowns.
// If that's not the case, open file entries may keep the server
// alive even if the servers have already shut down.

// Keep the file open for readers.
Self::apply_temporal_lease(&id, duration).await;
info!("Read lease timed out for file {id}; removing it");
info!(file_id = %id, "Read lease timed out for file {id}; removing it");

// Gracefully close the file.
Self::remove_writer(id, backbone_command).await;
}

async fn apply_temporal_lease(id: &ShortGuid, duration: Duration) {
info!("File {id} will accept new readers for {duration:?}");
info!(file_id = %id, "File {id} will accept new readers for {duration:?}");
tokio::time::sleep(duration).await
}

Expand All @@ -151,7 +155,7 @@ impl FileRecord {
.send(BackboneCommand::RemoveWriter(id))
.await
{
warn!("The backbone writer channel was closed while indicating a termination for file with ID {id}: {error}");
warn!(file_id = %id, "The backbone writer channel was closed while indicating a termination for file with ID {id}: {error}");
}
}
}
Expand Down
Loading