From 0b6abd14034931aea3d83fe50ac782f2bb5ab822 Mon Sep 17 00:00:00 2001 From: lhear <121179341+lhear@users.noreply.github.com> Date: Thu, 28 May 2026 15:59:02 +0800 Subject: [PATCH] fix(server): resolve deadlock by releasing reader lock during async poll - Optimize DownloadStream::poll_next to use take-and-put-back pattern, preventing std::sync::Mutex from holding across async boundaries. - Implement stream rotation timeout to clean up stale rotated streams. - Add timeout for upload completion and expand channel capacity to 16. --- src/server/constants.rs | 6 ++++++ src/server/handlers.rs | 14 +++++++++---- src/server/janitor.rs | 6 ++++-- src/server/state.rs | 46 +++++++++++++++++++++++++++++++++++------ 4 files changed, 60 insertions(+), 12 deletions(-) diff --git a/src/server/constants.rs b/src/server/constants.rs index 708ff89..89ad434 100644 --- a/src/server/constants.rs +++ b/src/server/constants.rs @@ -8,12 +8,18 @@ pub const MAX_REORDER_SECS: u64 = 10; pub const STREAM_IDLE_TIMEOUT_SECS: u64 = 120; +pub const ROTATION_TIMEOUT_SECS: u64 = 30; + +pub const UPLOAD_DONE_TIMEOUT: Duration = Duration::from_secs(30); + pub const JANITOR_INTERVAL: Duration = Duration::from_secs(30); pub const NONCE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); pub const WRITE_TIMEOUT: Duration = Duration::from_secs(10); +pub const UPLOAD_CHANNEL_CAPACITY: usize = 16; + pub const MASTER_EXPIRY: Duration = Duration::from_secs(1200); pub const PADDING_POOL: &[u8] = b"padding=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; diff --git a/src/server/handlers.rs b/src/server/handlers.rs index b1ef2e0..17234e8 100644 --- a/src/server/handlers.rs +++ b/src/server/handlers.rs @@ -13,7 +13,10 @@ use zeroize::Zeroizing; use crate::crypto::{self, AesFrameCipher, AesKey}; use crate::error::ServerError; -use crate::server::constants::{CONNECT_TIMEOUT, MASTER_EXPIRY, MAX_UPLOAD_BODY_SIZE}; +use crate::server::constants::{ + CONNECT_TIMEOUT, MASTER_EXPIRY, MAX_UPLOAD_BODY_SIZE, UPLOAD_CHANNEL_CAPACITY, + UPLOAD_DONE_TIMEOUT, +}; use crate::server::{ connection::{self, connect_upstream}, state::{DownloadStream, FrameOrEos, StreamBundle, UploadStream}, @@ -159,7 +162,7 @@ async fn handle_plaintext_download( let session_id = utils::extract_cookie_value(&headers, "session") .map(|s| s.to_owned()) .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - let (frame_tx, frame_rx) = mpsc::channel::(1); + let (frame_tx, frame_rx) = mpsc::channel::(UPLOAD_CHANNEL_CAPACITY); let upload = Arc::new(UploadStream::new(frame_tx, None)); @@ -474,7 +477,7 @@ async fn handle_pq_download( }; let (upstream_read, upstream_write) = upstream.into_split(); - let (frame_tx, frame_rx) = mpsc::channel::(1); + let (frame_tx, frame_rx) = mpsc::channel::(UPLOAD_CHANNEL_CAPACITY); let upload = Arc::new(UploadStream::new(frame_tx, Some(upload_cipher))); @@ -552,6 +555,8 @@ async fn handle_download_continuation( .map(|r| Arc::clone(r.value())) .ok_or_else(|| ServerError::not_found("stream not found for continuation"))?; + bundle.upload.clear_rotation(); + let session_id = cookie_val.split(':').next().unwrap_or(cookie_val); if let Some(entry) = state.master_store.get(session_id) { @@ -661,8 +666,9 @@ async fn handle_stream_upload( .await .map_err(|_| ServerError::bad_gateway("upload channel closed"))?; - done_rx + tokio::time::timeout(UPLOAD_DONE_TIMEOUT, done_rx) .await + .map_err(|_| ServerError::gateway_timeout("upload drain timeout"))? .map_err(|_| ServerError::bad_gateway("upload stream closed"))?; bundle.upload.touch(); diff --git a/src/server/janitor.rs b/src/server/janitor.rs index b27693c..6b7be72 100644 --- a/src/server/janitor.rs +++ b/src/server/janitor.rs @@ -13,7 +13,9 @@ pub async fn stream_janitor(streams: Arc>>) { let mut expired = vec![]; for entry in streams.iter() { let bundle = entry.value(); - if bundle.upload.is_idle() && bundle.upload.do_shutdown() { + if (bundle.upload.is_idle() || bundle.upload.is_rotation_stale()) + && bundle.upload.do_shutdown() + { expired.push(entry.key().clone()); } } @@ -24,7 +26,7 @@ pub async fn stream_janitor(streams: Arc>>) { *guard = None; } let display_id = key.split(':').next().unwrap_or(&key); - warn!(stream_id = %display_id, reason = "idle timeout", "shutting down idle stream"); + warn!(stream_id = %display_id, reason = "idle or rotation timeout", "shutting down stream"); } } } diff --git a/src/server/state.rs b/src/server/state.rs index c1a2372..b6146f2 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -13,7 +13,7 @@ use tokio::sync::{Notify, mpsc, oneshot}; use tracing::{debug, info, warn}; use crate::crypto::AesFrameCipher; -use crate::server::constants::{STREAM_IDLE_TIMEOUT_SECS, now_secs}; +use crate::server::constants::{ROTATION_TIMEOUT_SECS, STREAM_IDLE_TIMEOUT_SECS, now_secs}; use crate::shaper::{EncodingType, FrameCipher}; pub enum FrameOrEos { @@ -33,6 +33,7 @@ pub struct UploadStream { pub upload_cipher: Option>, pub shutdown: Arc, shutdown_flag: AtomicBool, + pub rotation_at: AtomicU64, } impl UploadStream { @@ -44,6 +45,7 @@ impl UploadStream { upload_cipher, shutdown: Arc::new(Notify::new()), shutdown_flag: AtomicBool::new(false), + rotation_at: AtomicU64::new(0), } } #[inline(always)] @@ -64,6 +66,19 @@ impl UploadStream { true } } + #[inline(always)] + pub fn mark_rotation(&self) { + self.rotation_at.store(now_secs(), Ordering::Relaxed); + } + #[inline(always)] + pub fn clear_rotation(&self) { + self.rotation_at.store(0, Ordering::Relaxed); + } + #[inline(always)] + pub fn is_rotation_stale(&self) -> bool { + let at = self.rotation_at.load(Ordering::Relaxed); + at != 0 && now_secs().saturating_sub(at) > ROTATION_TIMEOUT_SECS + } } pub type ShaperStream = Pin> + Send>>; @@ -133,12 +148,14 @@ impl Stream for DownloadStream { let threshold = this.bundle.max_download_bytes; - let mut guard = this + let mut shaper_opt = this .bundle .upstream_reader .lock() - .expect("upstream_reader mutex poisoned"); - let shaper = match guard.as_mut() { + .expect("upstream_reader mutex poisoned") + .take(); + + let shaper = match shaper_opt.as_mut() { Some(s) => s, None => { info!(stream_id = %this.log_key, reason = "upstream reader already taken", "download stream ended"); @@ -159,7 +176,12 @@ impl Stream for DownloadStream { { this.done = true; this.rotated = true; - drop(guard); + this.bundle.upload.mark_rotation(); + *this + .bundle + .upstream_reader + .lock() + .expect("upstream_reader mutex poisoned") = shaper_opt; this.release_upstream(); debug!( stream_id = %this.log_key, @@ -168,6 +190,11 @@ impl Stream for DownloadStream { ); return Poll::Ready(Some(Ok(data))); } + *this + .bundle + .upstream_reader + .lock() + .expect("upstream_reader mutex poisoned") = shaper_opt; Poll::Ready(Some(Ok(data))) } Poll::Ready(Some(Err(e))) => { @@ -182,7 +209,14 @@ impl Stream for DownloadStream { this.release_upstream(); Poll::Ready(None) } - Poll::Pending => Poll::Pending, + Poll::Pending => { + *this + .bundle + .upstream_reader + .lock() + .expect("upstream_reader mutex poisoned") = shaper_opt; + Poll::Pending + } } } }