Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/server/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ pub const MAX_REORDER_SECS: u64 = 10;

pub const STREAM_IDLE_TIMEOUT_SECS: u64 = 120;

pub const ROTATION_TIMEOUT_SECS: u64 = 30;

pub const UPLOAD_DONE_TIMEOUT: Duration = Duration::from_secs(30);

pub const JANITOR_INTERVAL: Duration = Duration::from_secs(30);
pub const NONCE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);

pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
pub const WRITE_TIMEOUT: Duration = Duration::from_secs(10);

pub const UPLOAD_CHANNEL_CAPACITY: usize = 16;

pub const MASTER_EXPIRY: Duration = Duration::from_secs(1200);

pub const PADDING_POOL: &[u8] = b"padding=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
Expand Down
14 changes: 10 additions & 4 deletions src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use zeroize::Zeroizing;

use crate::crypto::{self, AesFrameCipher, AesKey};
use crate::error::ServerError;
use crate::server::constants::{CONNECT_TIMEOUT, MASTER_EXPIRY, MAX_UPLOAD_BODY_SIZE};
use crate::server::constants::{
CONNECT_TIMEOUT, MASTER_EXPIRY, MAX_UPLOAD_BODY_SIZE, UPLOAD_CHANNEL_CAPACITY,
UPLOAD_DONE_TIMEOUT,
};
use crate::server::{
connection::{self, connect_upstream},
state::{DownloadStream, FrameOrEos, StreamBundle, UploadStream},
Expand Down Expand Up @@ -159,7 +162,7 @@ async fn handle_plaintext_download(
let session_id = utils::extract_cookie_value(&headers, "session")
.map(|s| s.to_owned())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let (frame_tx, frame_rx) = mpsc::channel::<FrameOrEos>(1);
let (frame_tx, frame_rx) = mpsc::channel::<FrameOrEos>(UPLOAD_CHANNEL_CAPACITY);

let upload = Arc::new(UploadStream::new(frame_tx, None));

Expand Down Expand Up @@ -474,7 +477,7 @@ async fn handle_pq_download(
};

let (upstream_read, upstream_write) = upstream.into_split();
let (frame_tx, frame_rx) = mpsc::channel::<FrameOrEos>(1);
let (frame_tx, frame_rx) = mpsc::channel::<FrameOrEos>(UPLOAD_CHANNEL_CAPACITY);

let upload = Arc::new(UploadStream::new(frame_tx, Some(upload_cipher)));

Expand Down Expand Up @@ -552,6 +555,8 @@ async fn handle_download_continuation(
.map(|r| Arc::clone(r.value()))
.ok_or_else(|| ServerError::not_found("stream not found for continuation"))?;

bundle.upload.clear_rotation();

let session_id = cookie_val.split(':').next().unwrap_or(cookie_val);

if let Some(entry) = state.master_store.get(session_id) {
Expand Down Expand Up @@ -661,8 +666,9 @@ async fn handle_stream_upload(
.await
.map_err(|_| ServerError::bad_gateway("upload channel closed"))?;

done_rx
tokio::time::timeout(UPLOAD_DONE_TIMEOUT, done_rx)
.await
.map_err(|_| ServerError::gateway_timeout("upload drain timeout"))?
.map_err(|_| ServerError::bad_gateway("upload stream closed"))?;

bundle.upload.touch();
Expand Down
6 changes: 4 additions & 2 deletions src/server/janitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ pub async fn stream_janitor(streams: Arc<DashMap<String, Arc<StreamBundle>>>) {
let mut expired = vec![];
for entry in streams.iter() {
let bundle = entry.value();
if bundle.upload.is_idle() && bundle.upload.do_shutdown() {
if (bundle.upload.is_idle() || bundle.upload.is_rotation_stale())
&& bundle.upload.do_shutdown()
{
expired.push(entry.key().clone());
}
}
Expand All @@ -24,7 +26,7 @@ pub async fn stream_janitor(streams: Arc<DashMap<String, Arc<StreamBundle>>>) {
*guard = None;
}
let display_id = key.split(':').next().unwrap_or(&key);
warn!(stream_id = %display_id, reason = "idle timeout", "shutting down idle stream");
warn!(stream_id = %display_id, reason = "idle or rotation timeout", "shutting down stream");
}
}
}
Expand Down
46 changes: 40 additions & 6 deletions src/server/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::sync::{Notify, mpsc, oneshot};
use tracing::{debug, info, warn};

use crate::crypto::AesFrameCipher;
use crate::server::constants::{STREAM_IDLE_TIMEOUT_SECS, now_secs};
use crate::server::constants::{ROTATION_TIMEOUT_SECS, STREAM_IDLE_TIMEOUT_SECS, now_secs};
use crate::shaper::{EncodingType, FrameCipher};

pub enum FrameOrEos {
Expand All @@ -33,6 +33,7 @@ pub struct UploadStream {
pub upload_cipher: Option<Arc<AesFrameCipher>>,
pub shutdown: Arc<Notify>,
shutdown_flag: AtomicBool,
pub rotation_at: AtomicU64,
}

impl UploadStream {
Expand All @@ -44,6 +45,7 @@ impl UploadStream {
upload_cipher,
shutdown: Arc::new(Notify::new()),
shutdown_flag: AtomicBool::new(false),
rotation_at: AtomicU64::new(0),
}
}
#[inline(always)]
Expand All @@ -64,6 +66,19 @@ impl UploadStream {
true
}
}
#[inline(always)]
pub fn mark_rotation(&self) {
self.rotation_at.store(now_secs(), Ordering::Relaxed);
}
#[inline(always)]
pub fn clear_rotation(&self) {
self.rotation_at.store(0, Ordering::Relaxed);
}
#[inline(always)]
pub fn is_rotation_stale(&self) -> bool {
let at = self.rotation_at.load(Ordering::Relaxed);
at != 0 && now_secs().saturating_sub(at) > ROTATION_TIMEOUT_SECS
}
}

pub type ShaperStream = Pin<Box<dyn Stream<Item = std::io::Result<(u64, Bytes)>> + Send>>;
Expand Down Expand Up @@ -133,12 +148,14 @@ impl Stream for DownloadStream {

let threshold = this.bundle.max_download_bytes;

let mut guard = this
let mut shaper_opt = this
.bundle
.upstream_reader
.lock()
.expect("upstream_reader mutex poisoned");
let shaper = match guard.as_mut() {
.expect("upstream_reader mutex poisoned")
.take();

let shaper = match shaper_opt.as_mut() {
Some(s) => s,
None => {
info!(stream_id = %this.log_key, reason = "upstream reader already taken", "download stream ended");
Expand All @@ -159,7 +176,12 @@ impl Stream for DownloadStream {
{
this.done = true;
this.rotated = true;
drop(guard);
this.bundle.upload.mark_rotation();
*this
.bundle
.upstream_reader
.lock()
.expect("upstream_reader mutex poisoned") = shaper_opt;
this.release_upstream();
debug!(
stream_id = %this.log_key,
Expand All @@ -168,6 +190,11 @@ impl Stream for DownloadStream {
);
return Poll::Ready(Some(Ok(data)));
}
*this
.bundle
.upstream_reader
.lock()
.expect("upstream_reader mutex poisoned") = shaper_opt;
Poll::Ready(Some(Ok(data)))
}
Poll::Ready(Some(Err(e))) => {
Expand All @@ -182,7 +209,14 @@ impl Stream for DownloadStream {
this.release_upstream();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
Poll::Pending => {
*this
.bundle
.upstream_reader
.lock()
.expect("upstream_reader mutex poisoned") = shaper_opt;
Poll::Pending
}
}
}
}
Expand Down
Loading