-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathinterface.rs
More file actions
1799 lines (1548 loc) · 66.9 KB
/
interface.rs
File metadata and controls
1799 lines (1548 loc) · 66.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//
// interface.rs
//
// Copyright (C) 2023-2024 Posit Software, PBC. All rights reserved.
//
//
// All code in this file runs synchronously with R. We store the global
// state inside of a global `R_MAIN` singleton that implements `RMain`.
// The frontend methods called by R are forwarded to the corresponding
// `RMain` methods via `R_MAIN`.
use std::collections::HashMap;
use std::ffi::*;
use std::os::raw::c_uchar;
use std::path::PathBuf;
use std::result::Result::Ok;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Poll;
use std::time::Duration;
use amalthea::comm::base_comm::JsonRpcReply;
use amalthea::comm::event::CommManagerEvent;
use amalthea::comm::ui_comm::ui_frontend_reply_from_value;
use amalthea::comm::ui_comm::BusyParams;
use amalthea::comm::ui_comm::PromptStateParams;
use amalthea::comm::ui_comm::ShowMessageParams;
use amalthea::comm::ui_comm::UiFrontendEvent;
use amalthea::comm::ui_comm::UiFrontendRequest;
use amalthea::socket::iopub::IOPubMessage;
use amalthea::socket::iopub::Wait;
use amalthea::socket::stdin::StdInRequest;
use amalthea::wire::exception::Exception;
use amalthea::wire::execute_error::ExecuteError;
use amalthea::wire::execute_input::ExecuteInput;
use amalthea::wire::execute_reply::ExecuteReply;
use amalthea::wire::execute_reply_exception::ExecuteReplyException;
use amalthea::wire::execute_request::ExecuteRequest;
use amalthea::wire::execute_response::ExecuteResponse;
use amalthea::wire::execute_result::ExecuteResult;
use amalthea::wire::input_reply::InputReply;
use amalthea::wire::input_request::InputRequest;
use amalthea::wire::input_request::ShellInputRequest;
use amalthea::wire::input_request::StdInRpcReply;
use amalthea::wire::input_request::UiCommFrontendRequest;
use amalthea::wire::jupyter_message::Status;
use amalthea::wire::originator::Originator;
use amalthea::wire::stream::Stream;
use amalthea::wire::stream::StreamOutput;
use amalthea::Error;
use anyhow::*;
use bus::Bus;
use crossbeam::channel::bounded;
use crossbeam::channel::unbounded;
use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use crossbeam::select;
use harp::environment::r_ns_env;
use harp::environment::Environment;
use harp::environment::R_ENVS;
use harp::exec::r_check_stack;
use harp::exec::r_peek_error_buffer;
use harp::exec::r_sandbox;
use harp::exec::RFunction;
use harp::exec::RFunctionExt;
use harp::library::RLibraries;
use harp::line_ending::convert_line_endings;
use harp::line_ending::LineEnding;
use harp::object::r_null_or_try_into;
use harp::object::RObject;
use harp::r_symbol;
use harp::routines::r_register_routines;
use harp::session::r_traceback;
use harp::utils::r_is_data_frame;
use harp::utils::r_pairlist_any;
use harp::utils::r_typeof;
use harp::R_MAIN_THREAD_ID;
use libr::R_BaseNamespace;
use libr::R_GlobalEnv;
use libr::R_ProcessEvents;
use libr::R_RunPendingFinalizers;
use libr::Rf_error;
use libr::Rf_findVarInFrame;
use libr::Rf_onintr;
use libr::SEXP;
use once_cell::sync::Lazy;
use regex::Regex;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::*;
use uuid::Uuid;
use crate::dap::dap::DapBackendEvent;
use crate::dap::dap_r_main::RMainDap;
use crate::dap::Dap;
use crate::errors;
use crate::help::message::HelpEvent;
use crate::help::r_help::RHelp;
use crate::kernel::Kernel;
use crate::lsp::events::EVENTS;
use crate::lsp::main_loop::Event;
use crate::lsp::main_loop::KernelNotification;
use crate::lsp::main_loop::TokioUnboundedSender;
use crate::lsp::state_handlers::ConsoleInputs;
use crate::modules;
use crate::plots::graphics_device;
use crate::r_task;
use crate::r_task::BoxFuture;
use crate::r_task::RTask;
use crate::r_task::RTaskStartInfo;
use crate::r_task::RTaskStatus;
use crate::request::debug_request_command;
use crate::request::RRequest;
use crate::signals::initialize_signal_handlers;
use crate::signals::interrupts_pending;
use crate::signals::set_interrupts_pending;
use crate::srcref::ns_populate_srcref;
use crate::srcref::resource_loaded_namespaces;
use crate::startup;
use crate::sys::console::console_to_utf8;
/// An enum representing the different modes in which the R session can run.
///
/// Derives `Debug` so that the mode can be logged and used in assertions, and
/// `Eq` alongside `PartialEq` since equality on these unit variants is total.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionMode {
    /// A session with an interactive console (REPL), such as in Positron.
    Console,

    /// A session in a Jupyter or Jupyter-like notebook.
    Notebook,

    /// A background session, typically not connected to any UI.
    Background,
}
// --- Globals ---
// These values must be global in order for them to be accessible from R
// callbacks, which do not have a facility for passing or returning context.
// We use the `once_cell` crate for init synchronisation because the stdlib
// equivalent `std::sync::Once` does not have a `wait()` method.
/// Ensures that the kernel is only ever initialized once. Set by
/// `RMain::start()` right after the `R_MAIN` singleton has been populated and
/// queried by `RMain::is_initialized()`.
static R_MAIN_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new();
/// Used to wait for complete R startup in `RMain::wait_r_initialized()`.
/// Set at the end of `RMain::complete_initialization()`.
static R_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new();
// The global state used by R callbacks.
//
// Doesn't need a mutex because it's only accessed by the R thread. Should
// not be used elsewhere than from an R frontend callback or an R function
// invoked by the REPL. `RMain::get()` and `RMain::get_mut()` check the calling
// thread: they panic in debug builds and only log an error in release builds
// when called from another thread.
static mut R_MAIN: Option<RMain> = None;
/// Global state of the R session. A single instance is stored in the `R_MAIN`
/// singleton by `RMain::start()` and accessed on the R thread through
/// `RMain::get()` and `RMain::get_mut()`.
pub struct RMain {
/// Bus on which the `KernelInfo` is broadcast by `complete_initialization()`.
kernel_init_tx: Bus<KernelInfo>,
/// Whether we are running in Console, Notebook, or Background mode.
pub session_mode: SessionMode,
/// Channel used to send along messages relayed on the open comms.
comm_manager_tx: Sender<CommManagerEvent>,
/// Execution requests from the frontend. Processed from `ReadConsole()`.
/// Requests for code execution provide input to that method.
r_request_rx: Receiver<RRequest>,
/// Input requests to the frontend. Processed from `ReadConsole()`
/// calls triggered by e.g. `readline()`.
stdin_request_tx: Sender<StdInRequest>,
/// Input replies from the frontend. Waited on in `ReadConsole()` after a request.
stdin_reply_rx: Receiver<amalthea::Result<InputReply>>,
/// IOPub channel for broadcasting outputs
iopub_tx: Sender<IOPubMessage>,
/// Active request passed to `ReadConsole()`. Contains response channel
/// the reply should be sent to once computation has finished.
active_request: Option<ActiveReadConsoleRequest>,
/// Execution request counter used to populate `In[n]` and `Out[n]` prompts
execution_count: u32,
/// Accumulated top-level output for the current execution.
/// This is the output emitted by R's autoprint and propagated as
/// `execute_result` Jupyter messages instead of `stream` messages.
autoprint_output: String,
/// Accumulated output during startup
banner_output: String,
/// Channel to send and receive tasks from `RTask`s
tasks_interrupt_rx: Receiver<RTask>,
/// Second `RTask` channel, polled alongside `tasks_interrupt_rx` in
/// `read_console()`.
tasks_idle_rx: Receiver<RTask>,
/// Futures keyed by `Uuid`, each stored with its `RTaskStartInfo`.
/// NOTE(review): presumably async tasks that have not completed yet;
/// confirm against the task handlers.
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo)>,
/// Shared reference to kernel. Currently used by the ark-execution
/// thread, the R frontend callbacks, and LSP routines called from R
kernel: Arc<Mutex<Kernel>>,
/// Represents whether an error occurred during R code execution.
pub error_occurred: bool,
pub error_message: String, // `evalue` in the Jupyter protocol
pub error_traceback: Vec<String>,
/// Channel to communicate with the Help thread
help_event_tx: Option<Sender<HelpEvent>>,
/// R help port
help_port: Option<u16>,
/// Event channel for notifying the LSP. In principle, could be a Jupyter comm.
lsp_events_tx: Option<TokioUnboundedSender<Event>>,
/// Debugger state wrapper, built from the shared `Dap` in `RMain::new()`.
dap: RMainDap,
/// Whether or not R itself is actively busy.
/// This does not represent the busy state of the kernel.
pub is_busy: bool,
}
/// Represents the currently active execution request from the frontend. It
/// resolves at the next invocation of the `ReadConsole()` frontend method.
struct ActiveReadConsoleRequest {
/// Execution count returned by `init_execute_request()` for this request.
exec_count: u32,
/// The original execute request received from the frontend.
request: ExecuteRequest,
/// Originator of the request, if any. Forwarded when requesting input from
/// the frontend while this request is active.
orig: Option<Originator>,
/// Channel on which the response to this request is sent.
response_tx: Sender<ExecuteResponse>,
}
/// Represents kernel metadata (available after the kernel has fully started)
#[derive(Debug, Clone)]
pub struct KernelInfo {
/// The value of R's `R.version.string`.
pub version: String,
/// Output accumulated during startup.
pub banner: String,
/// Initial input prompt, taken from `getOption("prompt")`.
pub input_prompt: Option<String>,
/// Initial continuation prompt, taken from `getOption("continue")`.
pub continuation_prompt: Option<String>,
}
/// This struct represents the data that we wish R would pass to
/// `ReadConsole()` methods. We need this information to determine what kind
/// of prompt we are dealing with.
#[derive(Clone)]
pub struct PromptInfo {
/// The prompt string to be presented to the user. This does not
/// necessarily correspond to `getOption("prompt")`, for instance in
/// case of a browser prompt or a readline prompt.
input_prompt: String,
/// The continuation prompt string when user supplies incomplete
/// inputs. This always corresponds to `getOption("continue")`. We send
/// it to frontends along with `prompt` because some frontends such as
/// Positron do not send incomplete inputs to Ark and take charge of
/// continuation prompts themselves.
continuation_prompt: String,
/// Whether this is a `browser()` prompt. A browser prompt can be
/// incomplete but is never a user request.
browser: bool,
/// Whether the last input didn't fully parse and R is waiting for more input
incomplete: bool,
/// Whether this is a prompt from a fresh REPL iteration (browser or
/// top level) or a prompt from some user code, e.g. via `readline()`
input_request: bool,
}
/// Input derived from a frontend request, to be handed to the R console.
pub enum ConsoleInput {
/// No more input. Produced for shutdown requests.
EOF,
/// Code to pass to R through the console buffer.
Input(String),
}
/// Outcome of `RMain::read_console()`.
pub enum ConsoleResult {
/// New input has been written to the console buffer.
NewInput,
/// An interrupt was pending while waiting on a user input request.
Interrupt,
/// The request channel was disconnected, or a shutdown was requested.
Disconnected,
/// An error that should be propagated to R, e.g. an invalid input request.
Error(amalthea::Error),
}
impl RMain {
/// Starts the main R thread and initializes the `R_MAIN` singleton.
/// Doesn't return. Must be called only once.
pub fn start(
r_args: Vec<String>,
startup_file: Option<String>,
kernel_mutex: Arc<Mutex<Kernel>>,
comm_manager_tx: Sender<CommManagerEvent>,
r_request_rx: Receiver<RRequest>,
stdin_request_tx: Sender<StdInRequest>,
stdin_reply_rx: Receiver<amalthea::Result<InputReply>>,
iopub_tx: Sender<IOPubMessage>,
kernel_init_tx: Bus<KernelInfo>,
dap: Arc<Mutex<Dap>>,
session_mode: SessionMode,
) {
unsafe { R_MAIN_THREAD_ID = Some(std::thread::current().id()) };
// Channels to send/receive tasks from auxiliary threads via `RTask`s
let (tasks_interrupt_tx, tasks_interrupt_rx) = unbounded::<RTask>();
let (tasks_idle_tx, tasks_idle_rx) = unbounded::<RTask>();
r_task::initialize(tasks_interrupt_tx.clone(), tasks_idle_tx.clone());
unsafe {
R_MAIN = Some(RMain::new(
kernel_mutex,
tasks_interrupt_rx,
tasks_idle_rx,
comm_manager_tx,
r_request_rx,
stdin_request_tx,
stdin_reply_rx,
iopub_tx,
kernel_init_tx,
dap,
session_mode,
))
};
// Let other threads know that `R_MAIN` is initialized. Deliberately
// panic if already set as `start()` must be called only once.
R_MAIN_INIT.set(()).expect("R can only be initialized once");
let mut r_args = r_args.clone();
// Record if the user has requested that we don't load the site/user level R profiles
let ignore_site_r_profile = startup::should_ignore_site_r_profile(&r_args);
let ignore_user_r_profile = startup::should_ignore_user_r_profile(&r_args);
// We always manually load site/user level R profiles rather than letting R do it
// to ensure that ark is fully set up before running code that could potentially call
// back into ark internals.
if !ignore_site_r_profile {
startup::push_ignore_site_r_profile(&mut r_args);
}
if !ignore_user_r_profile {
startup::push_ignore_user_r_profile(&mut r_args);
}
// Build the argument list from the command line arguments. The default
// list is `--interactive` unless altered with the `--` passthrough
// argument.
let mut args = cargs!["ark"];
for arg in r_args {
args.push(CString::new(arg).unwrap().into_raw());
}
// Get `R_HOME`, typically set by Positron / CI / kernel specification
let r_home = match std::env::var("R_HOME") {
Ok(home) => PathBuf::from(home),
Err(_) => {
// Get `R_HOME` from `PATH`, via R
let Ok(result) = std::process::Command::new("R").arg("RHOME").output() else {
panic!("Can't find R or `R_HOME`");
};
let r_home = String::from_utf8(result.stdout).unwrap();
let r_home = r_home.trim();
std::env::set_var("R_HOME", r_home);
PathBuf::from(r_home)
},
};
let libraries = RLibraries::from_r_home_path(&r_home);
libraries.initialize_pre_setup_r();
crate::sys::interface::setup_r(args);
libraries.initialize_post_setup_r();
unsafe {
// Register embedded routines
r_register_routines();
// Initialize harp (after routine registration)
harp::initialize();
// Optionally run a frontend specified R startup script (after harp init)
if let Some(file) = &startup_file {
harp::source(file)
.or_log_error(&format!("Failed to source startup file '{file}' due to"));
}
// Initialize support functions (after routine registration)
if let Err(err) = modules::initialize() {
log::error!("Can't load R modules: {err:?}");
}
// Register all hooks once all modules have been imported
let hook_result = RFunction::from(".ps.register_all_hooks").call();
if let Err(err) = hook_result {
log::error!("Error registering some hooks: {err:?}");
}
// Populate srcrefs for namespaces already loaded in the session.
// Namespaces of future loaded packages will be populated on load.
if do_resource_namespaces() {
if let Err(err) = resource_loaded_namespaces() {
log::error!("Can't populate srcrefs for loaded packages: {err:?}");
}
}
// Set up the global error handler (after support function initialization)
errors::initialize();
// Now that R has started (emitting any startup messages), and now that we have set
// up all hooks and handlers, officially finish the R initialization process to
// unblock the kernel-info request and also allow the LSP to start.
RMain::with_mut(|main| {
log::info!("R has started and ark handlers have been registered, completing initialization.");
main.complete_initialization();
});
}
// Now that R has started and libr and ark have fully initialized, run site and user
// level R profiles, in that order
if !ignore_site_r_profile {
startup::source_site_r_profile(&r_home);
}
if !ignore_user_r_profile {
startup::source_user_r_profile();
}
// Does not return!
crate::sys::interface::run_r();
}
/// Finishes the kernel's initialization once R itself is up.
///
/// Unlike `RMain::start()`, this has access to `R_MAIN`'s state, such as the
/// kernel-info banner. It collects the R version string, the startup banner
/// and the initial prompts into a `KernelInfo`, broadcasts it on
/// `kernel_init_tx`, and then sets the `R_INIT` flag.
///
/// # Safety
///
/// Can only be called from the R thread, and only once (a second call panics
/// when setting `R_INIT`).
pub unsafe fn complete_initialization(&mut self) {
    let version = unsafe {
        let sexp = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string"));
        RObject::new(sexp).to::<String>().unwrap()
    };

    // Initial input and continuation prompts
    let input_prompt: String = harp::get_option("prompt").try_into().unwrap();
    let continuation_prompt: String = harp::get_option("continue").try_into().unwrap();

    let banner = self.banner_output.clone();

    // Log before `version` is moved into the kernel info
    log::info!("Sending kernel info: {version}");

    let kernel_info = KernelInfo {
        version,
        banner,
        input_prompt: Some(input_prompt),
        continuation_prompt: Some(continuation_prompt),
    };
    self.kernel_init_tx.broadcast(kernel_info);

    // Thread-safe initialisation flag for R
    R_INIT.set(()).expect("`R_INIT` can only be set once");
}
/// Builds the `RMain` state from the channels and shared handles passed in by
/// `RMain::start()`.
///
/// All remaining fields start out empty: no active request, an execution
/// count of zero, empty output buffers, no recorded error, no help or LSP
/// channels, not busy, and no pending futures.
pub fn new(
    kernel: Arc<Mutex<Kernel>>,
    tasks_interrupt_rx: Receiver<RTask>,
    tasks_idle_rx: Receiver<RTask>,
    comm_manager_tx: Sender<CommManagerEvent>,
    r_request_rx: Receiver<RRequest>,
    stdin_request_tx: Sender<StdInRequest>,
    stdin_reply_rx: Receiver<amalthea::Result<InputReply>>,
    iopub_tx: Sender<IOPubMessage>,
    kernel_init_tx: Bus<KernelInfo>,
    dap: Arc<Mutex<Dap>>,
    session_mode: SessionMode,
) -> Self {
    // Wrap the shared DAP state for use from the R thread
    let dap = RMainDap::new(dap);

    Self {
        // Handles received from the caller
        kernel_init_tx,
        session_mode,
        comm_manager_tx,
        r_request_rx,
        stdin_request_tx,
        stdin_reply_rx,
        iopub_tx,
        tasks_interrupt_rx,
        tasks_idle_rx,
        kernel,
        dap,

        // Fresh session state
        active_request: None,
        execution_count: 0,
        autoprint_output: String::new(),
        banner_output: String::new(),
        pending_futures: HashMap::new(),
        error_occurred: false,
        error_message: String::new(),
        error_traceback: Vec::new(),
        help_event_tx: None,
        help_port: None,
        lsp_events_tx: None,
        is_busy: false,
    }
}
/// Wait for complete R initialization
///
/// Blocks until R is ready to evaluate R code, i.e. until `R_INIT` has been
/// set by `complete_initialization()`. Resolves at the same time as the
/// `Bus<KernelInfo>` init channel does.
///
/// Thread-safe.
pub fn wait_r_initialized() {
R_INIT.wait();
}
/// Whether R has completed initialization, i.e. whether it is ready to
/// evaluate R code.
///
/// Non-blocking counterpart of `wait_r_initialized()`.
///
/// Thread-safe.
pub fn is_r_initialized() -> bool {
    matches!(R_INIT.get(), Some(_))
}
/// Whether the `RMain` singleton has been initialized.
///
/// This can return true when R might still not have finished starting up.
/// See `wait_r_initialized()`.
///
/// Thread-safe. But note you can only get access to the singleton on the R
/// thread.
pub fn is_initialized() -> bool {
    let flag: Option<&()> = R_MAIN_INIT.get();
    flag.is_some()
}
/// Access a reference to the singleton instance of this struct
///
/// SAFETY: Accesses must occur after `start_r()` initializes it, and must
/// occur on the main R thread.
pub fn get() -> &'static Self {
RMain::get_mut()
}
/// Access a mutable reference to the singleton instance of this struct
///
/// When called from a thread other than the main R thread, this panics in
/// debug builds and logs an error in release builds.
///
/// SAFETY: Accesses must occur after `RMain::start()` initializes the
/// singleton, and must occur on the main R thread.
///
/// # Panics
///
/// Panics if `R_MAIN` has not been initialized yet.
pub fn get_mut() -> &'static mut Self {
    if !RMain::on_main_thread() {
        let thread = std::thread::current();
        let name = thread.name().unwrap_or("<unnamed>");
        let message =
            format!("Must access `R_MAIN` on the main R thread, not thread '{name}'.");
        #[cfg(debug_assertions)]
        panic!("{message}");
        #[cfg(not(debug_assertions))]
        log::error!("{message}");
    }

    // SAFETY: `R_MAIN` is assigned in `RMain::start()` and is meant to be
    // accessed only from the R thread, which the check above guards. We go
    // through a raw pointer rather than referencing the `static mut` directly,
    // which the `static_mut_refs` lint flags. Soundness still relies on
    // callers not holding overlapping references to the singleton.
    unsafe {
        (&mut *std::ptr::addr_of_mut!(R_MAIN))
            .as_mut()
            .expect("`R_MAIN` can't be used before it is initialized!")
    }
}
/// Runs `f` with a shared reference to the `RMain` singleton and returns its
/// result. Subject to the same requirements as `RMain::get()`.
pub fn with<F, T>(f: F) -> T
where
    F: FnOnce(&RMain) -> T,
{
    f(Self::get())
}
/// Runs `f` with a mutable reference to the `RMain` singleton and returns its
/// result. Subject to the same requirements as `RMain::get_mut()`.
pub fn with_mut<F, T>(f: F) -> T
where
    F: FnOnce(&mut RMain) -> T,
{
    f(Self::get_mut())
}
/// Whether the current thread is the one recorded in `R_MAIN_THREAD_ID`.
///
/// Panics if `R_MAIN_THREAD_ID` has not been set yet.
pub fn on_main_thread() -> bool {
    let current_id = std::thread::current().id();
    let main_id = unsafe { R_MAIN_THREAD_ID.unwrap() };
    current_id == main_id
}
/// Provides read-only access to `iopub_tx`
pub fn get_iopub_tx(&self) -> &Sender<IOPubMessage> {
&self.iopub_tx
}
/// Prepares the state for a new execute request.
///
/// Resets the autoprint buffer, bumps the execution counter when the request
/// is stored in history, and re-broadcasts the code on IOPub unless the
/// request is silent. Returns the code as console input together with the
/// execution count that applies to it.
fn init_execute_request(&mut self, req: &ExecuteRequest) -> (ConsoleInput, u32) {
    // Start from a fresh autoprint buffer
    drop(std::mem::take(&mut self.autoprint_output));

    // Only executions stored in history advance the counter
    if req.store_history {
        self.execution_count += 1;
    }
    let exec_count = self.execution_count;

    // Unless executing silently, re-broadcast the execution to all frontends
    if !req.silent {
        let message = IOPubMessage::ExecuteInput(ExecuteInput {
            code: req.code.clone(),
            execution_count: exec_count,
        });
        if let Err(err) = self.iopub_tx.send(message) {
            log::warn!(
                "Could not broadcast execution input {} to all frontends: {}",
                exec_count,
                err
            );
        }
    }

    // Hand the code back for evaluation along with its exec count
    let input = ConsoleInput::Input(req.code.clone());
    (input, exec_count)
}
/// Invoked by R to read console input from the user.
///
/// * `prompt` - The prompt shown to the user
/// * `buf` - Pointer to buffer to receive the user's input (type `CONSOLE_BUFFER_CHAR`)
/// * `buflen` - Size of the buffer to receive user's input
/// * `hist` - Whether to add the input to the history (1) or not (0). Currently unused.
///
/// Returns a `ConsoleResult`:
///
/// * `NewInput` once input has been written to `buf`.
/// * `Interrupt` when an interrupt is pending while waiting on a user input
///   request such as `readline()`.
/// * `Disconnected` when the request channel is disconnected or a shutdown
///   request was received.
/// * `Error` when the prompt is an input request made outside of an
///   `execute_request`.
fn read_console(
&mut self,
prompt: *const c_char,
buf: *mut c_uchar,
buflen: c_int,
_hist: c_int,
) -> ConsoleResult {
let info = Self::prompt_info(prompt);
log::trace!("R prompt: {}", info.input_prompt);
// Upon entering read-console, finalize any debug call text that we were capturing.
// At this point, the user can either advance the debugger, causing us to capture
// a new expression, or execute arbitrary code, where we will reuse a finalized
// debug call text to maintain the debug state.
self.dap.finalize_call_text();
// TODO: Can we remove this below code?
// If the prompt begins with "Save workspace", respond with (n)
//
// NOTE: Should be able to overwrite the `Cleanup` frontend method.
// This would also help with detecting normal exits versus crashes.
if info.input_prompt.starts_with("Save workspace") {
Self::on_console_input(buf, buflen, String::from("n"));
return ConsoleResult::NewInput;
}
if info.input_request {
if let Some(req) = &self.active_request {
// Send request to frontend. We'll wait for an `input_reply`
// from the frontend in the event loop below. The active request
// remains active.
self.request_input(req.orig.clone(), info.input_prompt.to_string());
} else {
// Invalid input request, propagate error to R
return self.handle_invalid_input_request(buf, buflen);
}
} else if let Some(req) = std::mem::take(&mut self.active_request) {
// We got a prompt request marking the end of the previous
// execution. We took and cleared the active request as we're about
// to complete it and send a reply to unblock the active Shell
// request.
// FIXME: Race condition between the comm and shell socket threads.
//
// Send info for the next prompt to frontend. This handles
// custom prompts set by users, e.g. `options(prompt = ,
// continue = )`, as well as debugging prompts, e.g. after a
// call to `browser()`.
let event = UiFrontendEvent::PromptState(PromptStateParams {
input_prompt: info.input_prompt.clone(),
continuation_prompt: info.continuation_prompt.clone(),
});
// Scope the kernel lock so that it is released before replying
{
let kernel = self.kernel.lock().unwrap();
kernel.send_ui_event(event);
}
// Let frontend know the last request is complete. This turns us
// back to Idle.
self.reply_execute_request(req, &info);
}
// In the future we'll also send browser information, see
// https://github.com/posit-dev/positron/issues/3001. Currently this is
// a push model where we send the console inputs at each round. In the
// future, a pull model would be better, this way the LSP can manage a
// cache of inputs and we don't need to retraverse the environments as
// often. We'd still push a `DidChangeConsoleInputs` notification from
// here, but only containing high-level information such as `search()`
// contents and `ls(rho)`.
if !info.browser && !info.incomplete && !info.input_request {
self.refresh_lsp();
}
// Signal prompt
EVENTS.console_prompt.emit(());
if info.browser {
match self.dap.stack_info() {
Ok(stack) => {
self.dap.start_debug(stack);
},
Err(err) => log::error!("ReadConsole: Can't get stack info: {err}"),
};
} else {
if self.dap.is_debugging() {
// Terminate debugging session
self.dap.stop_debug();
}
}
// Event loop: runs until one of the branches below returns a result
loop {
// If an interrupt was signaled and we are in a user
// request prompt, e.g. `readline()`, we need to propagate
// the interrupt to the R stack. This needs to happen before
// `process_events()`, particularly on Windows, because it
// calls `R_ProcessEvents()`, which checks and resets
// `UserBreak`, but won't actually fire the interrupt b/c
// we have them disabled, so it would end up swallowing the
// user interrupt request.
if info.input_request && interrupts_pending() {
return ConsoleResult::Interrupt;
}
// Otherwise we are at top level and we can assume the
// interrupt was 'handled' on the frontend side and so
// reset the flag
set_interrupts_pending(false);
// FIXME: Race between interrupt and new code request. To fix
// this, we could manage the Shell and Control sockets on the
// common message event thread. The Control messages would need
// to be handled in a blocking way to ensure subscribers are
// notified before the next incoming message is processed.
// First handle execute requests outside of `select!` to ensure they
// have priority. `select!` chooses at random.
if let Ok(req) = self.r_request_rx.try_recv() {
if let Some(input) = self.handle_execute_request(req, &info, buf, buflen) {
return input;
}
}
select! {
// Wait for an execution request from the frontend.
recv(self.r_request_rx) -> req => {
let Ok(req) = req else {
// The channel is disconnected and empty
return ConsoleResult::Disconnected;
};
if let Some(input) = self.handle_execute_request(req, &info, buf, buflen) {
return input;
}
}
// We've got a response for readline
recv(self.stdin_reply_rx) -> reply => {
return self.handle_input_reply(reply.unwrap(), buf, buflen);
}
// A task woke us up, start next loop tick to yield to it
recv(self.tasks_interrupt_rx) -> task => {
self.handle_task_interrupt(task.unwrap());
}
recv(self.tasks_idle_rx) -> task => {
self.handle_task(task.unwrap());
}
// Wait with a timeout. Necessary because we need to
// pump the event loop while waiting for console input.
//
// Alternatively, we could try to figure out the file
// descriptors that R has open and select() on those for
// available data?
default(Duration::from_millis(200)) => {
unsafe { Self::process_events() };
}
}
}
}
// Classifies the prompt R is currently showing.
//
// We prefer to panic if there is an error while trying to determine the
// prompt type because any confusion here is prone to put the frontend in a
// bad state (e.g. causing freezes)
fn prompt_info(prompt_c: *const c_char) -> PromptInfo {
    let n_frame = harp::session::r_n_frame().unwrap();
    log::trace!("prompt_info(): n_frame = '{n_frame}'");

    let prompt = unsafe { CStr::from_ptr(prompt_c) }
        .to_string_lossy()
        .into_owned();

    // Detect browser prompts by inspecting the `RDEBUG` flag of each
    // frame on the stack. If ANY of the frames are marked with `RDEBUG`,
    // then we assume we are in a debug state. We can't just check the
    // last frame, as sometimes frames are pushed onto the stack by lazy
    // evaluation of arguments or `tryCatch()` that aren't debug frames,
    // but we don't want to exit the debugger when we hit these, as R is
    // still inside a browser state. Should also handle cases like `debug(readline)`
    // followed by `n`.
    // https://github.com/posit-dev/positron/issues/2310
    let sys_frames = harp::session::r_sys_frames().unwrap();
    let is_browsed = |frame| harp::session::r_env_is_browsed(frame).unwrap();
    let browser = r_pairlist_any(sys_frames.sexp, is_browsed);

    // Frames on the stack outside of a browser prompt mean that some user
    // code is requesting input, e.g. via `readline()`
    let input_request = n_frame > 0 && !browser;

    // The request is incomplete if we see the continue prompt, except if
    // we're in a user request, e.g. `readline("+ ")`
    let continuation_prompt: String = harp::get_option("continue").try_into().unwrap();
    let incomplete = !input_request && prompt == continuation_prompt;

    if incomplete {
        log::trace!("Got R prompt '{prompt}', marking request incomplete");
    } else if input_request {
        log::trace!("Got R prompt '{prompt}', asking user for input");
    }

    PromptInfo {
        input_prompt: prompt,
        continuation_prompt,
        browser,
        incomplete,
        input_request,
    }
}
/// Turns a request from the frontend into console input for R.
///
/// * `ExecuteCode` requests are recorded as the active request and their code
///   becomes the input.
/// * `Shutdown` requests produce `ConsoleResult::Disconnected`.
/// * `DebugCommand` requests are translated to debug interpreter commands,
///   or ignored (returning `None`) when no debug session is active.
///
/// Returns `None` when the request produced no input, in which case the caller
/// keeps waiting. Panics if called while waiting on an `input_reply`.
fn handle_execute_request(
    &mut self,
    req: RRequest,
    info: &PromptInfo,
    buf: *mut c_uchar,
    buflen: c_int,
) -> Option<ConsoleResult> {
    if info.input_request {
        panic!("Unexpected `execute_request` while waiting for `input_reply`.");
    }

    let input = match req {
        RRequest::ExecuteCode(exec_req, orig, response_tx) => {
            // Extract input from request
            let (input, exec_count) = self.init_execute_request(&exec_req);

            // Save `ExecuteCode` request so we can respond to it at next prompt
            let active = ActiveReadConsoleRequest {
                exec_count,
                request: exec_req,
                orig,
                response_tx,
            };
            self.active_request = Some(active);

            input
        },

        RRequest::Shutdown(_) => ConsoleInput::EOF,

        RRequest::DebugCommand(cmd) => {
            // Just ignore command in case we left the debugging state already
            if !self.dap.is_debugging() {
                return None;
            }

            // Translate requests from the debugger frontend to actual inputs for
            // the debug interpreter
            ConsoleInput::Input(debug_request_command(cmd))
        },
    };

    // Clear error flag
    self.error_occurred = false;

    let ConsoleInput::Input(code) = input else {
        return Some(ConsoleResult::Disconnected);
    };

    // Let the debugger frontend know when a command resumes execution
    if self.dap.is_debugging() && matches!(code.as_str(), "n" | "f" | "c" | "cont") {
        self.dap.send_dap(DapBackendEvent::Continued);
    }

    // In notebooks, wrap in braces so that only the last complete
    // expression is auto-printed
    let code = if matches!(self.session_mode, SessionMode::Notebook) {
        format!("{{ {code} }}")
    } else {
        code
    };

    Self::on_console_input(buf, buflen, code);
    Some(ConsoleResult::NewInput)
}
/// Respond to an `input_request` that arrives while no `execute_request` is
/// active.
///
/// As far as we know, an `input_request` is only ever valid when nested
/// inside an `execute_request`. The one way to get here is R startup, when
/// an `.Rprofile` being sourced calls `readline()` or `menu()`. `?Startup`
/// explicitly forbids both as "interaction with the user during startup",
/// so we report an R error and assume such a call is the culprit.
///
/// The only exception is made for preexisting renv `activate.R` scripts,
/// which used to call `readline()` from `.Rprofile`. For those we answer
/// `"n"`, which lets older versions of renv at least start up.
/// https://github.com/posit-dev/positron/issues/2070
/// https://github.com/rstudio/renv/blob/5d0d52c395e569f7f24df4288d949cef95efca4e/inst/resources/activate.R#L85-L87
///
/// Returns `ConsoleResult::NewInput` in the renv case and
/// `ConsoleResult::Error` carrying an `InvalidInputRequest` otherwise.
fn handle_invalid_input_request(&self, buf: *mut c_uchar, buflen: c_int) -> ConsoleResult {
    if !Self::in_renv_autoloader() {
        log::info!("Detected invalid `input_request` outside an `execute_request`. Preparing to throw an R error.");

        // Two-line message, one sentence per line
        let message = [
            "Can't request input from the user at this time.",
            "Are you calling `readline()` or `menu()` from an `.Rprofile` or `.Rprofile.site` file? If so, that is the issue and you should remove that code.",
        ]
        .join("\n");

        return ConsoleResult::Error(Error::InvalidInputRequest(message));
    }

    // renv autoloader: decline the prompt on the user's behalf
    log::info!("Detected `readline()` call in renv autoloader. Returning `'n'`.");
    Self::on_console_input(buf, buflen, "n".to_string());
    ConsoleResult::NewInput
}
/// Whether the R option `renv.autoloader.running` is currently set to a
/// true value.
///
/// Falls back to `false` when the option can't be converted to a `bool`.
fn in_renv_autoloader() -> bool {
    let running = harp::get_option("renv.autoloader.running");
    running.try_into().unwrap_or(false)
}
fn handle_input_reply(
&self,
reply: amalthea::Result<InputReply>,
buf: *mut c_uchar,
buflen: c_int,
) -> ConsoleResult {
match reply {
Ok(input) => {
let input = convert_line_endings(&input.value, LineEnding::Posix);
Self::on_console_input(buf, buflen, input);
ConsoleResult::NewInput
},
Err(err) => ConsoleResult::Error(err),
}
}
/// Run a task picked up at interrupt time.
///
/// Thin wrapper around `handle_task()` that adds logging about how long
/// the task sat in the queue before the R or ReadConsole event loop got
/// to it, and how long it then took to run.
///
/// Tasks run during interrupt checks block the R thread for their whole
/// duration, so they are expected to finish very quickly. The log
/// messages make it possible to spot tasks that wait or run for too long.
fn handle_task_interrupt(&mut self, mut task: RTask) {
    // Anything slower than this is worth a log entry
    const SLOW_THRESHOLD: std::time::Duration = std::time::Duration::from_millis(50);

    if let Some(start_info) = task.start_info_mut() {
        // Report tasks that waited too long before being started
        let waited_too_long = start_info.start_time.elapsed() > SLOW_THRESHOLD;
        if waited_too_long {
            start_info.span.in_scope(|| {
                tracing::info!(
                    "{} milliseconds wait before running task.",
                    start_info.start_time.elapsed().as_millis()
                )
            });
        }

        // Restart the clock so that it now measures the task's run time
        start_info.start_time = std::time::Instant::now();
    }

    // Long durations are only logged for interrupt-time tasks. Idle tasks
    // are expected to take longer; use the tracing profiler to monitor
    // those instead.
    let info = match self.handle_task(task) {
        Some(info) => info,
        None => return,
    };

    if info.elapsed() > SLOW_THRESHOLD {
        info.span.in_scope(|| {
            log::info!("task took {} milliseconds.", info.elapsed().as_millis());
        })
    }
}
/// Returns start information when the task has been completed
fn handle_task(&mut self, task: RTask) -> Option<RTaskStartInfo> {
// Background tasks can't take any user input, so we set R_Interactive
// to 0 to prevent `readline()` from blocking the task.
let _interactive = harp::raii::RLocalInteractive::new(false);
match task {