Skip to content

Commit

Permalink
Plugin StreamReader: fuse the iterator after an error (#12027)
Browse files Browse the repository at this point in the history
# Description

This patches `StreamReader`'s iterator implementation to not return any
values after an I/O error has been encountered.

Without this, it's possible for a protocol error to cause the channel to
disconnect, in which case every call to `recv()` returns an error, which
causes the iterator to produce error values infinitely. There are some
commands that don't immediately stop after receiving an error so it's
possible that they just get stuck in an infinite error. This fixes that
so the error is only produced once, and then the stream ends
artificially.
  • Loading branch information
devyn committed Feb 29, 2024
1 parent 65e5aba commit 4c4609d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
10 changes: 8 additions & 2 deletions crates/nu-plugin/src/plugin/interface/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,14 @@ where

fn next(&mut self) -> Option<T> {
// Converting the error to the value here makes the implementation a lot easier
self.recv()
.unwrap_or_else(|err| Some(T::from_shell_error(err)))
match self.recv() {
Ok(option) => option,
Err(err) => {
// Drop the receiver so we don't keep returning errors
self.receiver = None;
Some(T::from_shell_error(err))
}
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions crates/nu-plugin/src/plugin/interface/stream/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ fn reader_recv_end_of_stream() -> Result<(), ShellError> {
Ok(())
}

#[test]
fn reader_iter_fuse_on_error() -> Result<(), ShellError> {
let (tx, rx) = mpsc::channel();
let mut reader = StreamReader::<Value, _>::new(0, rx, TestSink::default());

drop(tx); // should cause error, because we didn't explicitly signal the end

assert!(
reader.next().is_some_and(|e| e.is_error()),
"should be error the first time"
);
assert!(reader.next().is_none(), "should be closed the second time");
Ok(())
}

#[test]
fn reader_drop() {
let (_tx, rx) = mpsc::channel();
Expand Down

0 comments on commit 4c4609d

Please sign in to comment.