Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions crates/ark/src/reticulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use amalthea::comm::event::CommManagerEvent;
use amalthea::socket::comm::CommInitiator;
use amalthea::socket::comm::CommSocket;
use crossbeam::channel::Sender;
use harp::utils::r_is_null;
use harp::RObject;
use libr::R_NilValue;
use libr::SEXP;
Expand All @@ -18,21 +19,28 @@ use uuid::Uuid;

use crate::interface::RMain;

static RETICULATE_COMM_ID: LazyLock<Mutex<Option<String>>> = LazyLock::new(|| Mutex::new(None));
static RETICULATE_OUTGOING_TX: LazyLock<Mutex<Option<Sender<CommMsg>>>> =
LazyLock::new(|| Mutex::new(None));

#[derive(Clone)]
pub struct ReticulateService {
comm: CommSocket,
comm_manager_tx: Sender<CommManagerEvent>,
}

impl ReticulateService {
fn start(comm_id: String, comm_manager_tx: Sender<CommManagerEvent>) -> anyhow::Result<String> {
fn start(comm_id: String, comm_manager_tx: Sender<CommManagerEvent>) -> anyhow::Result<()> {
let comm = CommSocket::new(
CommInitiator::BackEnd,
comm_id.clone(),
String::from("positron.reticulate"),
);

{
let mut outgoing_tx_guard = RETICULATE_OUTGOING_TX.lock().unwrap();
*outgoing_tx_guard = Some(comm.outgoing_tx.clone());
}

let service = Self {
comm,
comm_manager_tx,
Expand All @@ -50,16 +58,18 @@ impl ReticulateService {
.or_log_error("Reticulate: Error handling messages");
});

Ok(comm_id)
Ok(())
}

fn handle_messages(&self) -> Result<(), anyhow::Error> {
loop {
let msg = unwrap!(self.comm.incoming_rx.recv(), Err(err) => {
let msg: CommMsg = unwrap!(self.comm.incoming_rx.recv(), Err(err) => {
log::error!("Reticulate: Error while receiving message from frontend: {err:?}");
break;
});

log::trace!("Reticulate: Received message from front end: {msg:?}");

if let CommMsg::Close = msg {
break;
}
Expand All @@ -71,10 +81,12 @@ impl ReticulateService {
.send(CommMsg::Close)
.or_log_error("Reticulate: Could not send close message to the front-end");

// Reset the global comm_id before closing
let mut comm_id_guard = RETICULATE_COMM_ID.lock().unwrap();
log::info!("Reticulate Thread closing {:?}", (*comm_id_guard).clone());
*comm_id_guard = None;
// Reset the global soccket before closing
log::info!("Reticulate Thread closing {:?}", self.comm.comm_id);
{
let mut outgoing_tx_guard = RETICULATE_OUTGOING_TX.lock().unwrap();
*outgoing_tx_guard = None;
}

Ok(())
}
Expand All @@ -89,28 +101,27 @@ pub unsafe extern "C" fn ps_reticulate_open(input: SEXP) -> Result<SEXP, anyhow:
let main = RMain::get();

let input: RObject = input.try_into()?;
let input_code: Option<String> = input.try_into()?;

let mut comm_id_guard = RETICULATE_COMM_ID.lock().unwrap();
// Reticulate sends `NULL` or a string with the code to be executed in the Python console.
let input_code: Option<String> = if r_is_null(input.sexp) {
None
} else {
Some(input.try_into()?)
};

// If there's an id already registered, we just need to send the focus event
if let Some(id) = comm_id_guard.deref() {
let outgoing_tx_guard = RETICULATE_OUTGOING_TX.lock().unwrap();
if let Some(outgoing_tx) = outgoing_tx_guard.deref() {
// There's a comm_id registered, we just send the focus event
main.get_comm_manager_tx().send(CommManagerEvent::Message(
id.clone(),
CommMsg::Data(json!({
"method": "focus",
"params": {
"input": input_code
}
})),
))?;
outgoing_tx.send(CommMsg::Data(json!({
"method": "focus",
"params": {
"input": input_code
}
})))?;
return Ok(R_NilValue);
}

let id = Uuid::new_v4().to_string();
*comm_id_guard = Some(id.clone());

let id = format!("reticulate-{}", Uuid::new_v4().to_string());
ReticulateService::start(id, main.get_comm_manager_tx().clone())?;

Ok(R_NilValue)
Expand Down
Loading