From 4c4609d646df80d3c5277beb23fe96e63db0f12b Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Thu, 29 Feb 2024 14:39:17 -0800 Subject: [PATCH] Plugin StreamReader: fuse the iterator after an error (#12027) # Description This patches `StreamReader`'s iterator implementation to not return any values after an I/O error has been encountered. Without this, it's possible for a protocol error to cause the channel to disconnect, in which case every call to `recv()` returns an error, which causes the iterator to produce error values infinitely. There are some commands that don't immediately stop after receiving an error so it's possible that they just get stuck in an infinite error. This fixes that so the error is only produced once, and then the stream ends artificially. --- crates/nu-plugin/src/plugin/interface/stream.rs | 10 ++++++++-- .../src/plugin/interface/stream/tests.rs | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/crates/nu-plugin/src/plugin/interface/stream.rs b/crates/nu-plugin/src/plugin/interface/stream.rs index 6cda6df8d..95268cb9e 100644 --- a/crates/nu-plugin/src/plugin/interface/stream.rs +++ b/crates/nu-plugin/src/plugin/interface/stream.rs @@ -113,8 +113,14 @@ where fn next(&mut self) -> Option { // Converting the error to the value here makes the implementation a lot easier - self.recv() - .unwrap_or_else(|err| Some(T::from_shell_error(err))) + match self.recv() { + Ok(option) => option, + Err(err) => { + // Drop the receiver so we don't keep returning errors + self.receiver = None; + Some(T::from_shell_error(err)) + } + } } } diff --git a/crates/nu-plugin/src/plugin/interface/stream/tests.rs b/crates/nu-plugin/src/plugin/interface/stream/tests.rs index 6acf95bea..a775fd3ba 100644 --- a/crates/nu-plugin/src/plugin/interface/stream/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/stream/tests.rs @@ -147,6 +147,21 @@ fn reader_recv_end_of_stream() -> Result<(), ShellError> { Ok(()) } +#[test] +fn reader_iter_fuse_on_error() -> Result<(), ShellError> { + let (tx, rx) = mpsc::channel(); + let mut reader = StreamReader::::new(0, rx, TestSink::default()); + + drop(tx); // should cause error, because we didn't explicitly signal the end + + assert!( + reader.next().is_some_and(|e| e.is_error()), + "should be error the first time" + ); + assert!(reader.next().is_none(), "should be closed the second time"); + Ok(()) +} + #[test] fn reader_drop() { let (_tx, rx) = mpsc::channel();