From ec08bddc64d20b9d49487ba4f78830175bf732b5 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Thu, 29 Jan 2026 13:40:42 -0500 Subject: [PATCH] Revert "Stop IO event stream once there are no more senders (#6203)" This reverts commit a4a2f0d7b6cfe852b91d70885dd45f15455ad11c. --- Cargo.lock | 1 - vortex-file/Cargo.toml | 1 - vortex-file/src/read/events.rs | 146 ----------------------------- vortex-file/src/read/mod.rs | 3 - vortex-file/src/segments/source.rs | 17 ++-- 5 files changed, 8 insertions(+), 160 deletions(-) delete mode 100644 vortex-file/src/read/events.rs diff --git a/Cargo.lock b/Cargo.lock index 952184756aa..1681461cca0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10637,7 +10637,6 @@ dependencies = [ name = "vortex-file" version = "0.1.0" dependencies = [ - "anyhow", "async-trait", "bytes", "flatbuffers", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index c5dce9e7405..d2285a6b1c0 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -58,7 +58,6 @@ vortex-zigzag = { workspace = true } vortex-zstd = { workspace = true, optional = true } [dev-dependencies] -anyhow = { workspace = true } tokio = { workspace = true, features = ["full"] } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-io = { workspace = true, features = ["tokio"] } diff --git a/vortex-file/src/read/events.rs b/vortex-file/src/read/events.rs deleted file mode 100644 index e9f6b9dd79c..00000000000 --- a/vortex-file/src/read/events.rs +++ /dev/null @@ -1,146 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::pin::Pin; -use std::sync::Arc; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; - -use futures::Stream; -use futures::StreamExt; -use futures::channel::mpsc::TrySendError; -use futures::channel::mpsc::UnboundedReceiver; -use futures::channel::mpsc::UnboundedSender; -use futures::channel::mpsc::unbounded; -use futures::stream::FusedStream; -use vortex_error::VortexExpect; - -use crate::segments::ReadEvent; - -pub struct EventsChannel; - -pub struct EventsReceiver { - inner: Option>, - num_senders: Arc, - is_done: bool, -} - -impl EventsReceiver { - fn new(inner: UnboundedReceiver, num_senders: Arc) -> Self { - Self { - inner: Some(inner), - num_senders, - is_done: false, - } - } - - fn terminate(&mut self) { - self.is_done = true; - self.inner.take(); - } -} - -pub struct EventsSender { - inner: UnboundedSender, - num_senders: Arc, -} - -impl Clone for EventsSender { - fn clone(&self) -> Self { - self.num_senders.fetch_add(1, Ordering::SeqCst); - Self { - inner: self.inner.clone(), - num_senders: self.num_senders.clone(), - } - } -} - -impl Drop for EventsSender { - fn drop(&mut self) { - self.num_senders.fetch_sub(1, Ordering::SeqCst); - } -} - -impl EventsSender { - fn new(inner: UnboundedSender, num_senders: Arc) -> Self { - Self { inner, num_senders } - } - - pub fn unbounded_send(&self, read_event: ReadEvent) -> Result<(), TrySendError> { - self.inner.unbounded_send(read_event) - } -} - -impl EventsChannel { - pub fn unbounded() -> (EventsSender, EventsReceiver) { - let (tx, rx) = unbounded(); - let num_senders = Arc::new(AtomicUsize::new(1)); - - ( - EventsSender::new(tx, num_senders.clone()), - EventsReceiver::new(rx, num_senders), - ) - } -} - -impl Stream for EventsReceiver { - type Item = ReadEvent; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.is_done || self.inner.as_ref().is_some_and(|rx| rx.is_terminated()) { - self.terminate(); - return Poll::Ready(None); - } - - if self.num_senders.load(Ordering::SeqCst) == 0 { - self.terminate(); - - return Poll::Ready(None); - } - - self.inner - .as_mut() - .vortex_expect("Must exist here") - .poll_next_unpin(cx) - } -} - -impl FusedStream for EventsReceiver { - fn is_terminated(&self) -> bool { - self.is_done || self.inner.as_ref().is_some_and(|rx| rx.is_terminated()) - } -} - -#[cfg(test)] -mod tests { - - use futures::future::FusedFuture; - - use super::*; - - #[tokio::test] - async fn test_cancellation_no_senders() -> anyhow::Result<()> { - let (tx, mut rx) = EventsChannel::unbounded(); - tx.unbounded_send(ReadEvent::Polled(1))?; - tx.unbounded_send(ReadEvent::Polled(2))?; - tx.unbounded_send(ReadEvent::Polled(3))?; - let tx2 = tx.clone(); - tx2.unbounded_send(ReadEvent::Polled(4))?; - - assert!(rx.next().await.is_some()); - assert!(rx.next().await.is_some()); - - drop(tx); - assert!(rx.next().await.is_some()); - drop(tx2); - - // We technically still have one event, but we stop anyway. - assert!(rx.next().await.is_none()); - assert!(rx.next().is_terminated()); - assert!(rx.next().await.is_none()); - - Ok(()) - } -} diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index b269087f199..a812b81f63b 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -2,11 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors mod driver; -mod events; mod request; pub(crate) use driver::IoRequestStream; -pub(crate) use events::EventsChannel; -pub(crate) use events::EventsSender; pub(crate) use request::ReadRequest; pub(crate) use request::RequestId; diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index b2a94f36514..344f805f516 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -11,6 +11,7 @@ use std::task::Poll; use futures::FutureExt; use futures::StreamExt; +use futures::channel::mpsc; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_error::VortexResult; @@ -23,8 +24,6 @@ use vortex_layout::segments::SegmentSource; use vortex_metrics::VortexMetrics; use crate::SegmentSpec; -use crate::read::EventsChannel; -use crate::read::EventsSender; use crate::read::IoRequestStream; use crate::read::ReadRequest; use crate::read::RequestId; @@ -60,7 +59,7 @@ pub enum ReadEvent { pub struct FileSegmentSource { segments: Arc<[SegmentSpec]>, /// A queue for sending read request events to the I/O stream. - events: EventsSender, + events: mpsc::UnboundedSender, /// The next read request ID. next_id: Arc, } @@ -72,7 +71,7 @@ impl FileSegmentSource { handle: Handle, metrics: VortexMetrics, ) -> Self { - let (send, recv) = EventsChannel::unbounded(); + let (send, recv) = mpsc::unbounded(); let max_alignment = segments .iter() @@ -145,10 +144,8 @@ impl SegmentSource for FileSegmentSource { // If we fail to submit the event, we create a future that has failed. if let Err(e) = self.events.unbounded_send(event) { - return std::future::ready({ - Err(vortex_err!("Failed to submit read request: {e}")) - }) - .boxed(); + return async move { Err(vortex_err!("Failed to submit read request: {e}")) } + .boxed(); } ReadFuture { @@ -177,7 +174,7 @@ struct ReadFuture { id: usize, recv: oneshot::Receiver>, polled: bool, - events: EventsSender, + events: mpsc::UnboundedSender, } impl Future for ReadFuture { @@ -201,6 +198,8 @@ impl Future for ReadFuture { impl Drop for ReadFuture { fn drop(&mut self) { + // When the FileHandle is dropped, we can send a shutdown event to the I/O stream. + // If the I/O stream has already been dropped, this will fail silently. drop(self.events.unbounded_send(ReadEvent::Dropped(self.id))); } }