Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace panics with errors in thread spawning #12040

Merged
merged 1 commit into from Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 21 additions & 19 deletions crates/nu-cmd-lang/src/core_commands/do_.rs
Expand Up @@ -4,8 +4,8 @@ use nu_engine::{eval_block_with_early_return, redirect_env, CallExt};
use nu_protocol::ast::Call;
use nu_protocol::engine::{Closure, Command, EngineState, Stack};
use nu_protocol::{
Category, Example, ListStream, PipelineData, RawStream, ShellError, Signature, SyntaxShape,
Type, Value,
Category, Example, IntoSpanned, ListStream, PipelineData, RawStream, ShellError, Signature,
SyntaxShape, Type, Value,
};

#[derive(Clone)]
Expand Down Expand Up @@ -147,23 +147,25 @@ impl Command for Do {
// consumes the first 65535 bytes
// So we need a thread to receive stdout message, then the current thread can continue to consume
// stderr messages.
let stdout_handler = stdout.map(|stdout_stream| {
thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || {
let ctrlc = stdout_stream.ctrlc.clone();
let span = stdout_stream.span;
RawStream::new(
Box::new(std::iter::once(
stdout_stream.into_bytes().map(|s| s.item),
)),
ctrlc,
span,
None,
)
})
.expect("Failed to create thread")
});
let stdout_handler = stdout
.map(|stdout_stream| {
thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || {
let ctrlc = stdout_stream.ctrlc.clone();
let span = stdout_stream.span;
RawStream::new(
Box::new(std::iter::once(
stdout_stream.into_bytes().map(|s| s.item),
)),
ctrlc,
span,
None,
)
})
.map_err(|e| e.into_spanned(call.head))
})
.transpose()?;

// Intercept stderr so we can return it in the error if the exit code is non-zero.
// The threading issues mentioned above dictate why we also need to intercept stdout.
Expand Down
30 changes: 17 additions & 13 deletions crates/nu-command/src/filesystem/save.rs
Expand Up @@ -3,6 +3,7 @@ use nu_engine::CallExt;
use nu_path::expand_path_with;
use nu_protocol::ast::{Call, Expr, Expression};
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::IntoSpanned;
use nu_protocol::{
Category, DataSource, Example, PipelineData, PipelineMetadata, RawStream, ShellError,
Signature, Span, Spanned, SyntaxShape, Type, Value,
Expand Down Expand Up @@ -123,19 +124,22 @@ impl Command for Save {
)?;

// delegate a thread to redirect stderr to result.
let handler = stderr.map(|stderr_stream| match stderr_file {
Some(stderr_file) => thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || stream_to_file(stderr_stream, stderr_file, span, progress))
.expect("Failed to create thread"),
None => thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || {
let _ = stderr_stream.into_bytes();
Ok(PipelineData::empty())
})
.expect("Failed to create thread"),
});
let handler = stderr
.map(|stderr_stream| match stderr_file {
Some(stderr_file) => thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || {
stream_to_file(stderr_stream, stderr_file, span, progress)
}),
None => thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || {
let _ = stderr_stream.into_bytes();
Ok(PipelineData::empty())
}),
})
.transpose()
.map_err(|e| e.into_spanned(span))?;

let res = stream_to_file(stream, file, span, progress);
if let Some(h) = handler {
Expand Down
45 changes: 27 additions & 18 deletions crates/nu-command/src/filters/tee.rs
Expand Up @@ -4,8 +4,8 @@ use nu_engine::{eval_block_with_early_return, CallExt};
use nu_protocol::{
ast::Call,
engine::{Closure, Command, EngineState, Stack},
Category, Example, IntoInterruptiblePipelineData, PipelineData, RawStream, ShellError,
Signature, Spanned, SyntaxShape, Type, Value,
Category, Example, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, RawStream,
ShellError, Signature, Spanned, SyntaxShape, Type, Value,
};

#[derive(Clone)]
Expand Down Expand Up @@ -128,8 +128,10 @@ use it in your pipeline."#

if use_stderr {
if let Some(stderr) = stderr {
let iter = tee(stderr.stream, with_stream)
.map_err(|e| e.into_spanned(call.head))?;
let raw_stream = RawStream::new(
Box::new(tee(stderr.stream, with_stream).map(flatten_result)),
Box::new(iter.map(flatten_result)),
stderr.ctrlc,
stderr.span,
stderr.known_size,
Expand Down Expand Up @@ -158,14 +160,18 @@ use it in your pipeline."#
})
}
} else {
let stdout = stdout.map(|stdout| {
RawStream::new(
Box::new(tee(stdout.stream, with_stream).map(flatten_result)),
stdout.ctrlc,
stdout.span,
stdout.known_size,
)
});
let stdout = stdout
.map(|stdout| {
let iter = tee(stdout.stream, with_stream)
.map_err(|e| e.into_spanned(call.head))?;
Ok::<_, ShellError>(RawStream::new(
Box::new(iter.map(flatten_result)),
stdout.ctrlc,
stdout.span,
stdout.known_size,
))
})
.transpose()?;
Ok(PipelineData::ExternalStream {
stdout,
stderr,
Expand Down Expand Up @@ -201,6 +207,7 @@ use it in your pipeline."#
// Make sure to drain any iterator produced to avoid unexpected behavior
result.and_then(|data| data.drain())
})
.map_err(|e| e.into_spanned(call.head))?
.map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone());

Expand All @@ -227,7 +234,7 @@ fn flatten_result<T, E>(result: Result<Result<T, E>, E>) -> Result<T, E> {
fn tee<T>(
input: impl Iterator<Item = T>,
with_cloned_stream: impl FnOnce(mpsc::Receiver<T>) -> Result<(), ShellError> + Send + 'static,
) -> impl Iterator<Item = Result<T, ShellError>>
) -> Result<impl Iterator<Item = Result<T, ShellError>>, std::io::Error>
where
T: Clone + Send + 'static,
{
Expand All @@ -237,14 +244,13 @@ where
let mut thread = Some(
thread::Builder::new()
.name("stderr consumer".into())
.spawn(move || with_cloned_stream(rx))
.expect("could not create thread"),
.spawn(move || with_cloned_stream(rx))?,
);

let mut iter = input.into_iter();
let mut tx = Some(tx);

std::iter::from_fn(move || {
Ok(std::iter::from_fn(move || {
if thread.as_ref().is_some_and(|t| t.is_finished()) {
// Check for an error from the other thread
let result = thread
Expand Down Expand Up @@ -274,7 +280,7 @@ where
.map(Err)
})
}
})
}))
}

#[test]
Expand All @@ -289,6 +295,7 @@ fn tee_copies_values_to_other_thread_and_passes_them_through() {
}
Ok(())
})
.expect("io error")
.collect::<Result<Vec<i32>, ShellError>>()
.expect("should not produce error");

Expand All @@ -305,7 +312,8 @@ fn tee_forwards_errors_back_immediately() {
let slow_input = (0..100).inspect(|_| std::thread::sleep(Duration::from_millis(1)));
let iter = tee(slow_input, |_| {
Err(ShellError::IOError { msg: "test".into() })
});
})
.expect("io error");
for result in iter {
if let Ok(val) = result {
// should not make it to the end
Expand All @@ -331,7 +339,8 @@ fn tee_waits_for_the_other_thread() {
std::thread::sleep(Duration::from_millis(10));
waited_clone.store(true, Ordering::Relaxed);
Err(ShellError::IOError { msg: "test".into() })
});
})
.expect("io error");
let last = iter.last();
assert!(waited.load(Ordering::Relaxed), "failed to wait");
assert!(
Expand Down
2 changes: 1 addition & 1 deletion crates/nu-command/src/network/http/client.rs
Expand Up @@ -283,7 +283,7 @@ fn send_cancellable_request(
let ret = request_fn();
let _ = tx.send(ret); // may fail if the user has cancelled the operation
})
.expect("Failed to create thread");
.map_err(ShellError::from)?;

// ...and poll the channel for responses
loop {
Expand Down
17 changes: 9 additions & 8 deletions crates/nu-command/src/system/complete.rs
@@ -1,7 +1,8 @@
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, IntoPipelineData, PipelineData, Record, ShellError, Signature, Type, Value,
Category, Example, IntoPipelineData, IntoSpanned, PipelineData, Record, ShellError, Signature,
Type, Value,
};

use std::thread;
Expand Down Expand Up @@ -52,9 +53,9 @@ impl Command for Complete {
// consumes the first 65535 bytes
// So we need a thread to receive stderr message, then the current thread can continue to consume
// stdout messages.
let stderr_handler = stderr.map(|stderr| {
let stderr_span = stderr.span;
(
let stderr_handler = stderr
.map(|stderr| {
let stderr_span = stderr.span;
thread::Builder::new()
.name("stderr consumer".to_string())
.spawn(move || {
Expand All @@ -65,10 +66,10 @@ impl Command for Complete {
Ok::<_, ShellError>(Value::binary(stderr.item, stderr.span))
}
})
.expect("failed to create thread"),
stderr_span,
)
});
.map(|handle| (handle, stderr_span))
.map_err(|err| err.into_spanned(call.head))
})
.transpose()?;

if let Some(stdout) = stdout {
let stdout = stdout.into_bytes()?;
Expand Down
7 changes: 4 additions & 3 deletions crates/nu-command/src/system/run_external.rs
Expand Up @@ -2,6 +2,7 @@ use nu_cmd_base::hook::eval_hook;
use nu_engine::env_to_strings;
use nu_engine::eval_expression;
use nu_engine::CallExt;
use nu_protocol::IntoSpanned;
use nu_protocol::NuGlob;
use nu_protocol::{
ast::{Call, Expr},
Expand Down Expand Up @@ -438,7 +439,7 @@ impl ExternalCommand {

Ok(())
})
.expect("Failed to create thread");
.map_err(|e| e.into_spanned(head))?;
}
}

Expand Down Expand Up @@ -526,7 +527,7 @@ impl ExternalCommand {
Ok(())
}
}
}).expect("Failed to create thread");
}).map_err(|e| e.into_spanned(head))?;

let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
if redirect_stderr {
Expand All @@ -543,7 +544,7 @@ impl ExternalCommand {
read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc);
Ok::<(), ShellError>(())
})
.expect("Failed to create thread");
.map_err(|e| e.into_spanned(head))?;
}

let stdout_receiver = ChannelReceiver::new(stdout_rx);
Expand Down
15 changes: 8 additions & 7 deletions crates/nu-engine/src/eval.rs
Expand Up @@ -7,8 +7,8 @@ use nu_protocol::{
},
engine::{Closure, EngineState, Stack},
eval_base::Eval,
Config, DeclId, IntoPipelineData, PipelineData, RawStream, ShellError, Span, Spanned, Type,
Value, VarId, ENV_VARIABLE_ID,
Config, DeclId, IntoPipelineData, IntoSpanned, PipelineData, RawStream, ShellError, Span,
Spanned, Type, Value, VarId, ENV_VARIABLE_ID,
};
use std::thread::{self, JoinHandle};
use std::{borrow::Cow, collections::HashMap};
Expand Down Expand Up @@ -542,7 +542,7 @@ fn eval_element_with_input(
stderr_stack,
save_call,
input,
));
)?);
let (result_out_stream, result_err_stream) = if result_is_out {
(result_out_stream, None)
} else {
Expand Down Expand Up @@ -1090,8 +1090,9 @@ impl DataSaveJob {
mut stack: Stack,
save_call: Call,
input: PipelineData,
) -> Self {
Self {
) -> Result<Self, ShellError> {
let span = save_call.head;
Ok(Self {
inner: thread::Builder::new()
.name("stderr saver".to_string())
.spawn(move || {
Expand All @@ -1100,8 +1101,8 @@ impl DataSaveJob {
eprintln!("WARNING: error occurred when redirect to stderr: {:?}", err);
}
})
.expect("Failed to create thread"),
}
.map_err(|e| e.into_spanned(span))?,
})
}

pub fn join(self) -> thread::Result<()> {
Expand Down