Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ If changes are needed in these files, that must happen in the separate Positron

- Use fully qualified result types (`anyhow::Result`) instead of importing them.

- You can log `Result::Err` by using the `.log_err()` method from the extension trait `stdext::ResultExt`. Add some `.context()` if that would be helpful, but never do it for errors that are quite unexpected, such as from `.send()` to a channel (that would be too verbose).

- When writing tests, prefer simple assertion macros without custom error messages:
- Use `assert_eq!(actual, expected);` instead of `assert_eq!(actual, expected, "custom message");`
- Use `assert!(condition);` instead of `assert!(condition, "custom message");`
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions crates/amalthea/src/comm/comm_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crossbeam::channel::Select;
use crossbeam::channel::Sender;
use log::info;
use log::warn;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use stdext::spawn;

use crate::comm::comm_channel::CommMsg;
Expand Down Expand Up @@ -161,9 +161,7 @@ impl CommManager {
if let Some(index) = index {
// Notify the comm that it's been closed
let comm = self.open_comms.get(index).unwrap();
comm.incoming_tx
.send(CommMsg::Close)
.or_log_error("Failed to send comm_close to comm.");
comm.incoming_tx.send(CommMsg::Close).log_err();

// Remove it from our list of open comms
self.open_comms.remove(index);
Expand Down
7 changes: 2 additions & 5 deletions crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::sync::Mutex;
use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use futures::executor::block_on;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;

use crate::comm::comm_channel::comm_rpc_message;
use crate::comm::comm_channel::Comm;
Expand Down Expand Up @@ -429,10 +429,7 @@ impl Shell {
// comm has been opened
self.comm_manager_tx
.send(CommManagerEvent::Opened(comm_socket.clone(), comm_data))
.or_log_warning(&format!(
"Failed to send '{}' comm open notification to listener thread",
comm_socket.comm_name
));
.log_err();

// If the comm wraps a server, send notification once the server is ready to
// accept connections. This also sends back the port number to connect on. Failing
Expand Down
9 changes: 5 additions & 4 deletions crates/ark/src/dap/dap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use amalthea::comm::server_comm::ServerStartedMessage;
use amalthea::language::server_handler::ServerHandler;
use crossbeam::channel::Sender;
use harp::object::RObject;
use stdext::log_error;
use stdext::result::ResultExt;
use stdext::spawn;

use crate::dap::dap_r_main::FrameInfo;
Expand Down Expand Up @@ -134,14 +134,15 @@ impl Dap {

if self.is_debugging {
if let Some(tx) = &self.backend_events_tx {
log_error!(tx.send(DapBackendEvent::Stopped(DapStoppedEvent { preserve_focus })));
tx.send(DapBackendEvent::Stopped(DapStoppedEvent { preserve_focus }))
.log_err();
}
} else {
if let Some(tx) = &self.comm_tx {
// Ask frontend to connect to the DAP
log::trace!("DAP: Sending `start_debug` event");
let msg = amalthea::comm_rpc_message!("start_debug");
log_error!(tx.send(msg));
tx.send(msg).log_err();
}

self.is_debugging = true;
Expand All @@ -162,7 +163,7 @@ impl Dap {
// terminate the debugging session and disconnect.
if let Some(tx) = &self.backend_events_tx {
log::trace!("DAP: Sending `stop_debug` event");
log_error!(tx.send(DapBackendEvent::Terminated));
tx.send(DapBackendEvent::Terminated).log_err();
}
}
// else: If not connected to a frontend, the DAP client should
Expand Down
4 changes: 2 additions & 2 deletions crates/ark/src/dap/dap_r_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use libr::INTSXP;
use libr::SET_INTEGER_ELT;
use libr::SEXP;
use libr::VECTOR_ELT;
use stdext::log_error;
use stdext::result::ResultExt;

use crate::dap::dap::DapBackendEvent;
use crate::dap::Dap;
Expand Down Expand Up @@ -180,7 +180,7 @@ impl RMainDap {
pub fn send_dap(&self, event: DapBackendEvent) {
let dap = self.dap.lock().unwrap();
if let Some(tx) = &dap.backend_events_tx {
log_error!(tx.send(event));
tx.send(event).log_err();
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/ark/src/dap/dap_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use dap::requests::*;
use dap::responses::*;
use dap::server::ServerOutput;
use dap::types::*;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use stdext::spawn;

use super::dap::Dap;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub fn start_dap(
// Send the port back to `Shell` and eventually out to the frontend so it can connect
server_started_tx
.send(ServerStartedMessage::new(port))
.or_log_error("DAP: Can't send init notification");
.log_err();

loop {
log::trace!("DAP: Waiting for client");
Expand Down
8 changes: 5 additions & 3 deletions crates/ark/src/data_explorer/r_data_explorer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use amalthea::socket::comm::CommInitiator;
use amalthea::socket::comm::CommSocket;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use crossbeam::channel::unbounded;
use crossbeam::channel::Sender;
use crossbeam::select;
Expand All @@ -84,7 +85,7 @@ use libr::*;
use serde::Deserialize;
use serde::Serialize;
use stdext::local;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use stdext::spawn;
use stdext::unwrap;
use tracing::Instrument;
Expand Down Expand Up @@ -236,7 +237,7 @@ impl RDataExplorer {
// the schema
comm_manager_tx
.send(CommManagerEvent::Closed(comm.comm_id))
.or_log_error("Error sending comm closed event")
.log_err();
},
}
});
Expand Down Expand Up @@ -680,7 +681,8 @@ impl RDataExplorer {
handle_columns_profiles_requests(params, comm)
.instrument(tracing::info_span!("get_columns_profile", ns = id))
.await
.or_log_error("Unable to handle get_columns_profile");
.context("Unable to handle get_columns_profile")
.log_err();
});
}

Expand Down
5 changes: 3 additions & 2 deletions crates/ark/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use libr::SEXP;
use once_cell::sync::Lazy;
use regex::Regex;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use stdext::*;
use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver;
use uuid::Uuid;
Expand Down Expand Up @@ -440,7 +440,8 @@ impl RMain {
// Optionally run a frontend specified R startup script (after harp init)
if let Some(file) = &startup_file {
harp::source(file)
.or_log_error(&format!("Failed to source startup file '{file}' due to"));
.context(format!("Failed to source startup file '{file}' due to"))
.log_err();
}

// Initialize support functions (after routine registration, after r_task initialization)
Expand Down
6 changes: 4 additions & 2 deletions crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use amalthea::comm::server_comm::ServerStartMessage;
use amalthea::comm::server_comm::ServerStartedMessage;
use amalthea::comm::ui_comm::ShowMessageParams as UiShowMessageParams;
use amalthea::comm::ui_comm::UiFrontendEvent;
use anyhow::Context;
use crossbeam::channel::Sender;
use serde_json::Value;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel;
Expand Down Expand Up @@ -501,7 +502,8 @@ pub fn start_lsp(
// Send the port back to `Shell` and eventually out to the frontend so it can connect
server_started_tx
.send(ServerStartedMessage::new(port))
.or_log_error("LSP: Can't send server started notification");
.context("LSP: Can't send server started notification")
.log_err();

log::trace!("LSP: Waiting for client");
let (stream, address) = listener.accept().await.unwrap();
Expand Down
10 changes: 6 additions & 4 deletions crates/ark/src/plots/graphics_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use amalthea::wire::display_data::DisplayData;
use amalthea::wire::update_display_data::TransientValue;
use amalthea::wire::update_display_data::UpdateDisplayData;
use anyhow::anyhow;
use anyhow::Context;
use base64::engine::general_purpose;
use base64::Engine;
use crossbeam::channel::Select;
Expand All @@ -43,7 +44,7 @@ use libr::pGEcontext;
use libr::R_NilValue;
use libr::SEXP;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use stdext::unwrap;
use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver;
use uuid::Uuid;
Expand Down Expand Up @@ -567,7 +568,7 @@ impl DeviceContext {
metadata,
transient,
}))
.or_log_warning(&format!("Could not publish display data on IOPub."));
.log_err();
}

fn process_update_plot(&self, id: &PlotId) {
Expand Down Expand Up @@ -620,7 +621,8 @@ impl DeviceContext {
socket
.outgoing_tx
.send(CommMsg::Data(value))
.or_log_error("Failed to send update message for id {id}.");
.context("Failed to send update message for id {id}.")
.log_err();
}

#[tracing::instrument(level = "trace", skip_all, fields(id = %id))]
Expand All @@ -647,7 +649,7 @@ impl DeviceContext {
metadata,
transient,
}))
.or_log_warning(&format!("Could not publish update display data on IOPub."));
.log_err();
}

fn create_display_data_plot(&self, id: &PlotId) -> Result<serde_json::Value, anyhow::Error> {
Expand Down
16 changes: 6 additions & 10 deletions crates/ark/src/reticulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use amalthea::comm::event::CommManagerEvent;
use amalthea::comm_rpc_message;
use amalthea::socket::comm::CommInitiator;
use amalthea::socket::comm::CommSocket;
use anyhow::Context;
use crossbeam::channel::Sender;
use harp::utils::r_is_null;
use harp::RObject;
use libr::R_NilValue;
use libr::SEXP;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::result::ResultExt;
use stdext::spawn;
use stdext::unwrap;
use uuid::Uuid;
Expand Down Expand Up @@ -63,15 +64,13 @@ impl ReticulateService {
"reticulate_id": (*RETICULATE_ID).clone(),
}),
);
service
.comm_manager_tx
.send(event)
.or_log_error("Reticulate: Could not open comm.");
service.comm_manager_tx.send(event).log_err();

spawn!(format!("ark-reticulate-{}", comm_id), move || {
service
.handle_messages()
.or_log_error("Reticulate: Error handling messages");
.context("Reticulate: Error handling messages")
.log_err();
});

Ok(())
Expand All @@ -92,10 +91,7 @@ impl ReticulateService {
}

// before finalizing the thread we make sure to send a close message to the front end
self.comm
.outgoing_tx
.send(CommMsg::Close)
.or_log_error("Reticulate: Could not send close message to the front-end");
self.comm.outgoing_tx.send(CommMsg::Close).log_err();

// Reset the global soccket before closing
log::info!("Reticulate Thread closing {:?}", self.comm.comm_id);
Expand Down
1 change: 1 addition & 0 deletions crates/stdext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license.workspace = true
rust-version.workspace = true

[dependencies]
anyhow = "1.0.100"
log = "0.4.18"

[features]
Expand Down
Loading