Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/ark/src/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use harp::exec::RFunction;
use harp::object::RObject;
use libR_sys::*;

use crate::interface::KERNEL;
use crate::interface::R_MAIN;

pub static mut PORT: u16 = 0;

Expand Down Expand Up @@ -48,8 +48,10 @@ unsafe fn handle_help_url(url: &str) -> Result<bool> {
focus: true,
});

let kernel = KERNEL.as_ref().unwrap().lock().unwrap();
let main = unsafe { R_MAIN.as_ref().unwrap() };
let kernel = main.kernel.lock().unwrap();
kernel.send_event(event);

Ok(true)
}

Expand Down
143 changes: 87 additions & 56 deletions crates/ark/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,53 @@ extern "C" fn handle_interrupt(_signal: libc::c_int) {
// These values must be global in order for them to be accessible from R
// callbacks, which do not have a facility for passing or returning context.

/// The global R kernel state.
pub static mut KERNEL: Option<Arc<Mutex<Kernel>>> = None;

/// A lock guard, used to manage access to the R runtime. The main thread holds
/// the lock by default, but releases it at opportune times to allow the LSP to
/// access the R runtime where appropriate.
pub static mut R_RUNTIME_LOCK_GUARD: Option<ReentrantMutexGuard<()>> = None;

/// A channel that sends prompts from R to the kernel
static mut RPROMPT_SEND: Option<Mutex<Sender<PromptInfo>>> = None;

/// A channel that receives console input from the kernel and sends it to R;
/// sending empty input (None) tells R to shut down
static mut CONSOLE_RECV: Option<Mutex<Receiver<Option<String>>>> = None;

/// Ensures that the kernel is only ever initialized once
static INIT: Once = Once::new();

// The global state used by R callbacks.
//
// Doesn't need a mutex because it's only accessed by the R thread. Should
// not be used elsewhere than from an R frontend callback or an R function
// invoked by the REPL.
pub static mut R_MAIN: Option<RMain> = None;

pub struct RMain {
/// A channel that sends prompts from R to the kernel
prompt_tx: Sender<PromptInfo>,

/// A channel that receives console input from the kernel and sends it
/// to R; sending empty input (None) tells R to shut down
console_rx: Receiver<Option<String>>,

/// A lock guard, used to manage access to the R runtime. The main
/// thread holds the lock by default, but releases it at opportune
/// times to allow the LSP to access the R runtime where appropriate.
runtime_lock_guard: Option<ReentrantMutexGuard<'static, ()>>,

/// Shared reference to kernel. Currently used by the ark-execution
/// thread, the R frontend callbacks, and LSP routines called from R
pub kernel: Arc<Mutex<Kernel>>,
}

impl RMain {
pub fn new(
prompt_tx: Sender<PromptInfo>,
console_rx: Receiver<Option<String>>,
kernel: Arc<Mutex<Kernel>>,
) -> Self {
// The main thread owns the R runtime lock by default, but releases
// it when appropriate to give other threads a chance to execute.
let lock_guard = unsafe { R_RUNTIME_LOCK.lock() };

Self {
prompt_tx,
console_rx,
runtime_lock_guard: Some(lock_guard),
kernel,
}
}
}

pub unsafe fn process_events() {
// Don't process interrupts in this scope.
let _interrupts_suspended = RInterruptsSuspendedScope::new();
Expand Down Expand Up @@ -212,6 +241,8 @@ pub extern "C" fn r_read_console(
buflen: c_int,
_hist: c_int,
) -> i32 {
let main = unsafe { R_MAIN.as_mut().unwrap() };

let info = prompt_info(prompt);
debug!("R prompt: {}", info.prompt);

Expand All @@ -229,12 +260,7 @@ pub extern "C" fn r_read_console(
}

// TODO: if R prompt is +, we need to tell the user their input is incomplete
let mutex = unsafe { RPROMPT_SEND.as_ref().unwrap() };
let r_prompt_tx = mutex.lock().unwrap();
r_prompt_tx.send(info).unwrap();

let mutex = unsafe { CONSOLE_RECV.as_ref().unwrap() };
let receiver = mutex.lock().unwrap();
main.prompt_tx.send(info).unwrap();

// Match with a timeout. Necessary because we need to
// pump the event loop while waiting for console input.
Expand All @@ -244,12 +270,12 @@ pub extern "C" fn r_read_console(
// available data?
loop {
// Release the R runtime lock while we're waiting for input.
unsafe { R_RUNTIME_LOCK_GUARD = None };
main.runtime_lock_guard = None;

match receiver.recv_timeout(Duration::from_millis(200)) {
match main.console_rx.recv_timeout(Duration::from_millis(200)) {
Ok(response) => {
// Take back the lock after we've received some console input.
unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };
unsafe { main.runtime_lock_guard = Some(R_RUNTIME_LOCK.lock()) };

// If we received an interrupt while the user was typing input,
// we can assume the interrupt was 'handled' and so reset the flag.
Expand All @@ -268,7 +294,7 @@ pub extern "C" fn r_read_console(
},

Err(error) => {
unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };
unsafe { main.runtime_lock_guard = Some(R_RUNTIME_LOCK.lock()) };

use RecvTimeoutError::*;
match error {
Expand Down Expand Up @@ -346,13 +372,15 @@ fn prompt_info(prompt_c: *const c_char) -> PromptInfo {
#[no_mangle]
pub extern "C" fn r_write_console(buf: *const c_char, _buflen: i32, otype: i32) {
let content = unsafe { CStr::from_ptr(buf) };
let mutex = unsafe { KERNEL.as_ref().unwrap() };
let stream = if otype == 0 {
Stream::Stdout
} else {
Stream::Stderr
};
let mut kernel = mutex.lock().unwrap();

let main = unsafe { R_MAIN.as_ref().unwrap() };
let mut kernel = main.kernel.lock().unwrap();

kernel.write_console(content.to_str().unwrap(), stream);
}

Expand All @@ -365,8 +393,8 @@ pub extern "C" fn r_show_message(buf: *const c_char) {
let message = unsafe { CStr::from_ptr(buf) };

// Wait for a lock on the kernel
let mutex = unsafe { KERNEL.as_ref().unwrap() };
let kernel = mutex.lock().unwrap();
let main = unsafe { R_MAIN.as_ref().unwrap() };
let kernel = main.kernel.lock().unwrap();

// Create an event representing the message
let event = PositronEvent::ShowMessage(ShowMessageEvent {
Expand Down Expand Up @@ -395,8 +423,8 @@ pub extern "C" fn r_busy(which: i32) {
initialize_signal_handlers();

// Wait for a lock on the kernel
let mutex = unsafe { KERNEL.as_ref().unwrap() };
let kernel = mutex.lock().unwrap();
let main = unsafe { R_MAIN.as_ref().unwrap() };
let kernel = main.kernel.lock().unwrap();

// Create an event representing the new busy state
let event = PositronEvent::Busy(BusyEvent { busy: which != 0 });
Expand All @@ -406,6 +434,8 @@ pub extern "C" fn r_busy(which: i32) {
}

pub unsafe extern "C" fn r_polled_events() {
let main = unsafe { R_MAIN.as_mut().unwrap() };

// Check for pending tasks.
let count = R_RUNTIME_LOCK_COUNT.load(std::sync::atomic::Ordering::Acquire);
if count == 0 {
Expand All @@ -421,7 +451,7 @@ pub unsafe extern "C" fn r_polled_events() {
// `bump()` does a fair unlock, giving other threads
// waiting for the lock a chance to acquire it, and then
// relocks it.
ReentrantMutexGuard::bump(R_RUNTIME_LOCK_GUARD.as_mut().unwrap());
ReentrantMutexGuard::bump(main.runtime_lock_guard.as_mut().unwrap());

info!(
"The main thread re-acquired the R runtime lock after {} milliseconds.",
Expand All @@ -434,27 +464,21 @@ pub fn start_r(
kernel_init_tx: Bus<KernelInfo>,
shell_request_rx: Receiver<Request>,
) {
use std::borrow::BorrowMut;

// The main thread owns the R runtime lock by default, but releases
// it when appropriate to give other threads a chance to execute.
unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };

// Start building the channels + kernel objects
let (console_tx, console_rx) = crossbeam::channel::unbounded();
let (rprompt_tx, rprompt_rx) = crossbeam::channel::unbounded();

let kernel = Kernel::new(iopub_tx, console_tx.clone(), kernel_init_tx);
let kernel_mutex = Arc::new(Mutex::new(kernel));

// Initialize kernel (ensure we only do this once!)
// Initialize global state (ensure we only do this once!)
INIT.call_once(|| unsafe {
*CONSOLE_RECV.borrow_mut() = Some(Mutex::new(console_rx));
*RPROMPT_SEND.borrow_mut() = Some(Mutex::new(rprompt_tx));
*KERNEL.borrow_mut() = Some(Arc::new(Mutex::new(kernel)));
R_MAIN = Some(RMain::new(rprompt_tx, console_rx, kernel_mutex.clone()));
});

// Start thread to listen to execution requests
spawn!("ark-execution", move || {
listen(shell_request_rx, rprompt_rx)
listen(shell_request_rx, rprompt_rx, kernel_mutex.clone())
});

unsafe {
Expand Down Expand Up @@ -521,30 +545,35 @@ pub fn start_r(
}
}

fn handle_r_request(req: &Request, prompt_recv: &Receiver<PromptInfo>) {
fn handle_r_request(
req: &Request,
prompt_recv: &Receiver<PromptInfo>,
kernel_mutex: Arc<Mutex<Kernel>>,
) {
// Service the request.
let mutex = unsafe { KERNEL.as_ref().unwrap() };
{
let mut kernel = mutex.lock().unwrap();
let mut kernel = kernel_mutex.lock().unwrap();
kernel.fulfill_request(&req)
}

// If this is an execution request, complete it by waiting for R to prompt
// us before we process another request
if let Request::ExecuteCode(_, _, _) = req {
complete_execute_request(req, prompt_recv);
complete_execute_request(req, prompt_recv, kernel_mutex);
}
}

fn complete_execute_request(req: &Request, prompt_recv: &Receiver<PromptInfo>) {
let mutex = unsafe { KERNEL.as_ref().unwrap() };

fn complete_execute_request(
req: &Request,
prompt_recv: &Receiver<PromptInfo>,
kernel_mutex: Arc<Mutex<Kernel>>,
) {
// Wait for R to prompt us again. This signals that the
// execution is finished and R is ready for input again.
trace!("Waiting for R prompt signaling completion of execution...");
let prompt_info = prompt_recv.recv().unwrap();
let prompt = prompt_info.prompt;
let kernel = mutex.lock().unwrap();
let kernel = kernel_mutex.lock().unwrap();

// Signal prompt
EVENTS.console_prompt.emit(());
Expand All @@ -570,7 +599,11 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver<PromptInfo>) {
return kernel.finish_request();
}

pub fn listen(exec_recv: Receiver<Request>, prompt_recv: Receiver<PromptInfo>) {
pub fn listen(
exec_recv: Receiver<Request>,
prompt_recv: Receiver<PromptInfo>,
kernel_mutex: Arc<Mutex<Kernel>>,
) {
// Before accepting execution requests from the front end, wait for R to
// prompt us for input.
trace!("Waiting for R's initial input prompt...");
Expand All @@ -580,17 +613,15 @@ pub fn listen(exec_recv: Receiver<Request>, prompt_recv: Receiver<PromptInfo>) {
info.prompt
);

// Mark kernel as initialized as soon as we get the first input prompt from R
let mutex = unsafe { KERNEL.as_ref().unwrap() };
{
let mut kernel = mutex.lock().unwrap();
let mut kernel = kernel_mutex.lock().unwrap();
kernel.complete_initialization();
}

loop {
// Wait for an execution request from the front end.
match exec_recv.recv() {
Ok(req) => handle_r_request(&req, &prompt_recv),
Ok(req) => handle_r_request(&req, &prompt_recv, kernel_mutex.clone()),
Err(err) => warn!("Could not receive execution request from kernel: {}", err),
}
}
Expand Down