Skip to content

Commit

Permalink
Asyncify terminal agent launch.
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimcn committed May 29, 2020
1 parent 2684ed9 commit e2b0853
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 59 deletions.
52 changes: 31 additions & 21 deletions adapter2/src/dap_session.rs
Expand Up @@ -23,11 +23,11 @@ impl<T> DAPChannel for T where
{
}

#[derive(Clone)]
pub struct DAPSession {
requests_sender: Weak<broadcast::Sender<Request>>,
events_sender: Weak<broadcast::Sender<Event>>,
out_sender: mpsc::Sender<(ProtocolMessage, Option<oneshot::Sender<ResponseBody>>)>,
message_seq: u32,
}

impl DAPSession {
Expand All @@ -37,12 +37,12 @@ impl DAPSession {
let events_sender = Arc::new(broadcast::channel::<Event>(10).0);
let (out_sender, mut out_receiver) = mpsc::channel(100);
let mut pending_requests: HashMap<u32, oneshot::Sender<ResponseBody>> = HashMap::new();
let mut message_seq = 0;

let client = DAPSession {
requests_sender: Arc::downgrade(&requests_sender),
events_sender: Arc::downgrade(&events_sender),
out_sender: out_sender,
message_seq: 0,
};

let worker = async move {
Expand Down Expand Up @@ -72,14 +72,23 @@ impl DAPSession {
}
}
},
Some((out_message, response_sender)) = out_receiver.next() => {
match &out_message {
ProtocolMessage::Request(request)=> {
pending_requests.insert(request.seq, response_sender.unwrap());
}
_ => {}
Some((message, response_sender)) = out_receiver.next() => {
let mut message = message;
match &mut message {
ProtocolMessage::Request(request) => {
message_seq += 1;
request.seq = message_seq;
if let Some(response_sender) = response_sender {
pending_requests.insert(request.seq, response_sender);
}
},
ProtocolMessage::Event(event) => {
message_seq += 1;
event.seq = message_seq;
},
ProtocolMessage::Response(_) => {}
}
log_send_err(channel.send(out_message).await);
log_send_err(channel.send(message).await);
}
}
}
Expand All @@ -102,26 +111,28 @@ impl DAPSession {
}
}

pub async fn send_request(&mut self, request_args: RequestArguments) -> Result<ResponseBody, Error> {
self.message_seq += 1;
pub fn send_request(
&mut self,
request_args: RequestArguments,
) -> impl Future<Output = Result<ResponseBody, Error>> {
let (sender, receiver) = oneshot::channel();
let message = ProtocolMessage::Request(Request {
seq: self.message_seq,
command: Command::Known(request_args),
seq: 0,
});
self.out_sender.send((message, Some(sender))).await?;
let resp = receiver.await?;
Ok(resp)
let send_result = self.out_sender.try_send((message, Some(sender)));
async move {
send_result?;
Ok(receiver.await?)
}
}

pub fn send_request_only(&mut self, request_args: RequestArguments) -> Result<(), Error> {
self.message_seq += 1;
let (sender, receiver) = oneshot::channel();
let message = ProtocolMessage::Request(Request {
seq: self.message_seq,
command: Command::Known(request_args),
seq: 0,
});
self.out_sender.try_send((message, Some(sender)))?;
self.out_sender.try_send((message, None))?;
Ok(())
}

Expand All @@ -132,10 +143,9 @@ impl DAPSession {
}

pub fn send_event(&mut self, event_body: EventBody) -> Result<(), Error> {
self.message_seq += 1;
let message = ProtocolMessage::Event(Event {
seq: self.message_seq,
body: event_body,
seq: 0,
});
self.out_sender.try_send((message, None))?;
Ok(())
Expand Down
6 changes: 4 additions & 2 deletions adapter2/src/debug_protocol.rs
Expand Up @@ -9,8 +9,8 @@ pub use raw_debug_protocol::{
DataBreakpointAccessType, DataBreakpointInfoArguments, DataBreakpointInfoResponseBody, DisconnectArguments,
EvaluateArguments, EvaluateResponseBody, ExceptionBreakpointsFilter, ExitedEventBody, GotoArguments, GotoTarget,
GotoTargetsArguments, GotoTargetsResponseBody, InitializeRequestArguments, Module, ModuleEventBody, NextArguments,
OutputEventBody, PauseArguments, RestartFrameArguments, ReverseContinueArguments,
RunInTerminalRequestArguments, Scope, ScopesArguments, ScopesResponseBody, SetBreakpointsArguments,
OutputEventBody, PauseArguments, RestartFrameArguments, ReverseContinueArguments, RunInTerminalRequestArguments,
RunInTerminalResponseBody, Scope, ScopesArguments, ScopesResponseBody, SetBreakpointsArguments,
SetBreakpointsResponseBody, SetDataBreakpointsArguments, SetDataBreakpointsResponseBody,
SetExceptionBreakpointsArguments, SetFunctionBreakpointsArguments, SetVariableArguments, SetVariableResponseBody,
Source, SourceArguments, SourceBreakpoint, SourceResponseBody, StackFrame, StackTraceArguments,
Expand Down Expand Up @@ -138,6 +138,8 @@ pub enum ResponseBody {
disconnect,
// Custom
adapterSettings,
// Reverse
runInTerminal(RunInTerminalResponseBody),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
51 changes: 22 additions & 29 deletions adapter2/src/debug_session.rs
Expand Up @@ -401,8 +401,8 @@ impl DebugSession {
self.dap_session.borrow_mut().send_response(response)
}

fn send_request(&self, args: RequestArguments) -> Result<(), Error> {
self.dap_session.borrow_mut().send_request_only(args)
fn send_request(&self, args: RequestArguments) -> impl Future<Output = Result<ResponseBody, Error>> {
self.dap_session.borrow_mut().send_request(args)
}

fn send_event(&self, event_body: EventBody) -> Result<(), Error> {
Expand Down Expand Up @@ -979,9 +979,11 @@ impl DebugSession {
self.disassembly = Initialized(disassembly::AddressSpace::new(&self.target));
self.send_event(EventBody::initialized);

let term_fut = self.create_terminal(&args);
let config_done_fut = self.wait_for_configuration_done();
let self_ref = self.self_ref.clone();
let fut = async move {
scoped_borrow_mut(&self_ref, |s| s.wait_for_configuration_done()).await?;
tokio::join!(term_fut, config_done_fut);
self_ref.borrow_mut().complete_launch(args)
};
Err(AsyncResponse(Box::new(fut)).into())
Expand Down Expand Up @@ -1077,13 +1079,12 @@ impl DebugSession {
}

if args.no_debug.unwrap_or(false) {
// No-debug launch: start debuggee directly and terminate debug session.
// No-debug launch: start debuggee directly and terminate the debug session.
launch_info.set_executable_file(&self.target.executable(), true);
let status = match &self.debuggee_terminal {
Some(t) => t.attach(|| self.target.platform().launch(&launch_info)),
None => self.target.platform().launch(&launch_info),
};
// Terminate debug session
self.send_event(EventBody::terminated(TerminatedEventBody {
restart: None,
}));
Expand Down Expand Up @@ -1255,36 +1256,28 @@ impl DebugSession {
program.into()
}

fn configure_stdio(&mut self, args: &LaunchRequestArguments, launch_info: &mut SBLaunchInfo) -> Result<(), Error> {
fn create_terminal(&mut self, args: &LaunchRequestArguments) -> impl Future {
let terminal_kind = match args.terminal.unwrap_or(TerminalKind::Integrated) {
TerminalKind::Console => None,
TerminalKind::External => Some("external"),
TerminalKind::Integrated => Some("integrated"),
TerminalKind::Console => return future::ready(()).left_future(),
TerminalKind::External => "external",
TerminalKind::Integrated => "integrated",
};

if let Some(terminal_kind) = terminal_kind {
let title = args.common.name.clone().unwrap_or_else(|| "Debug".into());
let result = Terminal::create(|agent_args| {
let req_args = RunInTerminalRequestArguments {
args: agent_args,
cwd: String::new(),
env: None,
kind: Some(terminal_kind.to_owned()),
title: Some(title),
};
self.send_request(RequestArguments::runInTerminal(req_args));
Ok(())
});

match result {
Ok(terminal) => self.debuggee_terminal = Some(terminal),
Err(err) => self.console_error(format!(
let title = args.common.name.as_deref().unwrap_or("Debug").to_string();
let fut = Terminal::create(terminal_kind, title, self.dap_session.borrow().clone());
let self_ref = self.self_ref.clone();
async move {
let result = fut.await;
scoped_borrow_mut(&self_ref, |s| match result {
Ok(terminal) => s.debuggee_terminal = Some(terminal),
Err(err) => s.console_error(format!(
"Failed to redirect stdio to a terminal. ({})\nDebuggee output will appear here.",
err
)),
}
}
})
}.right_future()
}

fn configure_stdio(&mut self, args: &LaunchRequestArguments, launch_info: &mut SBLaunchInfo) -> Result<(), Error> {
let mut stdio = match args.stdio {
None => vec![],
Some(Either::First(ref stdio)) => vec![Some(stdio.clone())], // A single string
Expand Down
37 changes: 30 additions & 7 deletions adapter2/src/terminal.rs
@@ -1,28 +1,51 @@
use crate::error::Error;
use log::debug;
use std::io::{self, BufRead};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::{Duration, Instant};

use crate::dap_session::DAPSession;
use crate::debug_protocol::*;
use crate::error::Error;

pub struct Terminal {
connection: TcpStream,
data: String,
}

impl Terminal {
pub fn create<F>(run_in_terminal: F) -> Result<Self, Error>
where
F: FnOnce(Vec<String>) -> Result<(), Error>,
{
pub async fn create(
terminal_kind: impl Into<String>,
title: impl Into<String>,
mut dap_session: DAPSession,
) -> Result<Terminal, Error> {
let terminal_kind = terminal_kind.into();
let title = title.into();

let req_args = RunInTerminalRequestArguments {
args: vec!["<<>>".into()],
cwd: String::new(),
env: None,
kind: Some(terminal_kind.clone()),
title: Some(title.clone()),
};
dap_session.send_request(RequestArguments::runInTerminal(req_args)).await?;

let mut listener = TcpListener::bind("127.0.0.1:0")?;
let addr = listener.local_addr()?;

// Run codelldb in a terminal agent mode, which sends back the tty device name (Unix)
// or its own process id (Windows), then waits till the socket gets closed from our end.
let executable = std::env::current_exe()?.to_str().unwrap().into();
let cmd = vec![executable, "terminal-agent".into(), format!("--port={}", addr.port())];
run_in_terminal(cmd)?;
let args = vec![executable, "terminal-agent".into(), format!("--port={}", addr.port())];
let req_args = RunInTerminalRequestArguments {
args: args,
cwd: String::new(),
env: None,
kind: Some(terminal_kind),
title: Some(title),
};
dap_session.send_request(RequestArguments::runInTerminal(req_args)).await?;

let stream = accept_with_timeout(&mut listener, Duration::from_millis(5000))?;
let stream2 = stream.try_clone()?;
Expand Down

0 comments on commit e2b0853

Please sign in to comment.