Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ vortex-zigzag = { workspace = true }
vortex-zstd = { workspace = true, optional = true }

[dev-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex-array = { workspace = true, features = ["_test-harness"] }
vortex-io = { workspace = true, features = ["tokio"] }
Expand Down
146 changes: 0 additions & 146 deletions vortex-file/src/read/events.rs

This file was deleted.

3 changes: 0 additions & 3 deletions vortex-file/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

mod driver;
mod events;
mod request;

pub(crate) use driver::IoRequestStream;
pub(crate) use events::EventsChannel;
pub(crate) use events::EventsSender;
pub(crate) use request::ReadRequest;
pub(crate) use request::RequestId;
17 changes: 8 additions & 9 deletions vortex-file/src/segments/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::Poll;

use futures::FutureExt;
use futures::StreamExt;
use futures::channel::mpsc;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_error::VortexResult;
Expand All @@ -23,8 +24,6 @@ use vortex_layout::segments::SegmentSource;
use vortex_metrics::VortexMetrics;

use crate::SegmentSpec;
use crate::read::EventsChannel;
use crate::read::EventsSender;
use crate::read::IoRequestStream;
use crate::read::ReadRequest;
use crate::read::RequestId;
Expand Down Expand Up @@ -60,7 +59,7 @@ pub enum ReadEvent {
pub struct FileSegmentSource {
segments: Arc<[SegmentSpec]>,
/// A queue for sending read request events to the I/O stream.
events: EventsSender,
events: mpsc::UnboundedSender<ReadEvent>,
/// The next read request ID.
next_id: Arc<AtomicUsize>,
}
Expand All @@ -72,7 +71,7 @@ impl FileSegmentSource {
handle: Handle,
metrics: VortexMetrics,
) -> Self {
let (send, recv) = EventsChannel::unbounded();
let (send, recv) = mpsc::unbounded();

let max_alignment = segments
.iter()
Expand Down Expand Up @@ -145,10 +144,8 @@ impl SegmentSource for FileSegmentSource {

// If we fail to submit the event, we create a future that has failed.
if let Err(e) = self.events.unbounded_send(event) {
return std::future::ready({
Err(vortex_err!("Failed to submit read request: {e}"))
})
.boxed();
return async move { Err(vortex_err!("Failed to submit read request: {e}")) }
.boxed();
}

ReadFuture {
Expand Down Expand Up @@ -177,7 +174,7 @@ struct ReadFuture {
id: usize,
recv: oneshot::Receiver<VortexResult<BufferHandle>>,
polled: bool,
events: EventsSender,
events: mpsc::UnboundedSender<ReadEvent>,
}

impl Future for ReadFuture {
Expand All @@ -201,6 +198,8 @@ impl Future for ReadFuture {

impl Drop for ReadFuture {
fn drop(&mut self) {
// When the FileHandle is dropped, we can send a shutdown event to the I/O stream.
// If the I/O stream has already been dropped, this will fail silently.
drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
}
}
Loading