diff --git a/crates/nu-plugin/src/plugin/interface/stream.rs b/crates/nu-plugin/src/plugin/interface/stream.rs index 6cda6df8d..95268cb9e 100644 --- a/crates/nu-plugin/src/plugin/interface/stream.rs +++ b/crates/nu-plugin/src/plugin/interface/stream.rs @@ -113,8 +113,14 @@ where fn next(&mut self) -> Option { // Converting the error to the value here makes the implementation a lot easier - self.recv() - .unwrap_or_else(|err| Some(T::from_shell_error(err))) + match self.recv() { + Ok(option) => option, + Err(err) => { + // Drop the receiver so we don't keep returning errors + self.receiver = None; + Some(T::from_shell_error(err)) + } + } } } diff --git a/crates/nu-plugin/src/plugin/interface/stream/tests.rs b/crates/nu-plugin/src/plugin/interface/stream/tests.rs index 6acf95bea..a775fd3ba 100644 --- a/crates/nu-plugin/src/plugin/interface/stream/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/stream/tests.rs @@ -147,6 +147,21 @@ fn reader_recv_end_of_stream() -> Result<(), ShellError> { Ok(()) } +#[test] +fn reader_iter_fuse_on_error() -> Result<(), ShellError> { + let (tx, rx) = mpsc::channel(); + let mut reader = StreamReader::::new(0, rx, TestSink::default()); + + drop(tx); // should cause error, because we didn't explicitly signal the end + + assert!( + reader.next().is_some_and(|e| e.is_error()), + "should be error the first time" + ); + assert!(reader.next().is_none(), "should be closed the second time"); + Ok(()) +} + #[test] fn reader_drop() { let (_tx, rx) = mpsc::channel();