Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Commit

Permalink
Auto merge of #1713 - rust-lang:tokio-02, r=Xanewok
Browse files Browse the repository at this point in the history
Use Tokio 0.2

Helps with #1695 and #1693 (still needs a bump to Tokio 1.0)
  • Loading branch information
bors committed Jan 20, 2021
2 parents 36f4a9e + 36a75ae commit 581415c
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 538 deletions.
637 changes: 220 additions & 417 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ cargo = { git = "https://github.com/rust-lang/cargo", rev = "329895f5b52a358e5d9
cargo_metadata = "0.8"
clippy_lints = { git = "https://github.com/rust-lang/rust-clippy", rev = "7ea7cd165ad6705603852771bf82cc2fd6560db5", optional = true }
env_logger = "0.7"
futures = { version = "0.1", optional = true }
home = "0.5.1"
itertools = "0.8"
jsonrpc-core = "14"
jsonrpc-core = "17"
lsp-types = { version = "0.60", features = ["proposed"] }
lazy_static = "1"
log = "0.4"
Expand All @@ -51,7 +50,6 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
serde_ignored = "0.1"
tokio = { version = "0.1", optional = true }
url = "2"
walkdir = "2"
regex = "1"
Expand All @@ -68,16 +66,15 @@ rustc-workspace-hack = "1.0.0"
[dev-dependencies]
difference = "2"
tempfile = "3"
lsp-codec = "0.1.2"
tokio = "0.1"
futures = "0.1"
tokio-process = "0.2"
tokio-timer = "0.2"
lsp-codec = "0.2"
tokio = { version = "0.2", default-features = false, features = ["rt-core", "time", "io-util", "process", "rt-util"] }
tokio-util = { version = "0.3", default-features = false, features = ["codec"] }
futures = "0.3"

[build-dependencies]
rustc_tools_util = "0.2"

[features]
clippy = ["clippy_lints", "rls-rustc/clippy"]
ipc = ["tokio", "futures", "rls-rustc/ipc", "rls-ipc/server"]
ipc = ["rls-rustc/ipc", "rls-ipc/server"]
default = ["ipc"]
9 changes: 4 additions & 5 deletions rls-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ repository = "https://github.com/rust-lang/rls"
categories = ["development-tools"]

[dependencies]
jsonrpc-core = "14"
jsonrpc-core-client = "14"
jsonrpc-derive = "14"
# Pin 14.0.3 to use single parity-tokio-ipc version (0.2)
jsonrpc-ipc-server = { version = "=14.0.3", optional = true }
jsonrpc-core = "17"
jsonrpc-core-client = "17"
jsonrpc-derive = "17"
jsonrpc-ipc-server = { version = "17", optional = true }
rls-data = "0.19"
serde = { version = "1.0", features = ["derive"] }

Expand Down
4 changes: 2 additions & 2 deletions rls-rustc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ env_logger = "0.7"
log = "0.4"
rand = "0.7"
clippy_lints = { git = "https://github.com/rust-lang/rust-clippy", rev = "d236b30a1d638340aad8345fa2946cfe9543dcf0", optional = true }
tokio = { version = "0.1", optional = true }
futures = { version = "0.1", optional = true }
tokio = { version = "0.2", optional = true }
futures = { version = "0.3", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
rls-data = { version = "0.19", optional = true }
rls-ipc = { path = "../rls-ipc", optional = true }
Expand Down
13 changes: 5 additions & 8 deletions rls-rustc/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::io;
use std::path::{Path, PathBuf};

use futures::Future;

use rls_ipc::client::{Client as JointClient, RpcChannel, RpcError};
use rls_ipc::rpc::callbacks::Client as CallbacksClient;
use rls_ipc::rpc::file_loader::Client as FileLoaderClient;
Expand All @@ -30,13 +29,11 @@ impl IpcFileLoader {

impl rustc_span::source_map::FileLoader for IpcFileLoader {
fn file_exists(&self, path: &Path) -> bool {
self.0.file_exists(path.to_owned()).wait().unwrap()
futures::executor::block_on(self.0.file_exists(path.to_owned())).unwrap()
}

fn read_file(&self, path: &Path) -> io::Result<String> {
self.0
.read_file(path.to_owned())
.wait()
futures::executor::block_on(self.0.read_file(path.to_owned()))
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))
}
}
Expand All @@ -48,14 +45,14 @@ impl IpcCallbacks {
pub fn complete_analysis(
&self,
analysis: rls_data::Analysis,
) -> impl Future<Item = (), Error = RpcError> {
) -> impl Future<Output = Result<(), RpcError>> {
self.0.complete_analysis(analysis)
}

pub fn input_files(
&self,
input_files: HashMap<PathBuf, HashSet<rls_ipc::rpc::Crate>>,
) -> impl Future<Item = (), Error = RpcError> {
) -> impl Future<Output = Result<(), RpcError>> {
self.0.input_files(input_files)
}
}
Expand Down
18 changes: 7 additions & 11 deletions rls-rustc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ pub fn run() -> Result<(), ()> {
#[cfg(feature = "ipc")]
let (mut shim_calls, file_loader) = match std::env::var("RLS_IPC_ENDPOINT").ok() {
Some(endpoint) => {
#[allow(deprecated)] // Windows doesn't work with lazily-bound reactors
let reactor = rt.reactor().clone();
let connection =
ipc::connect(endpoint, &reactor).expect("Couldn't connect to IPC endpoint");
let client: ipc::Client =
rt.block_on(connection).expect("Couldn't connect to IPC endpoint");
let client: ipc::Client = rt
.block_on(async { ipc::connect(endpoint).await })
.expect("Couldn't connect to IPC endpoint");
let (file_loader, callbacks) = client.split();

(
Expand Down Expand Up @@ -113,7 +110,6 @@ impl Callbacks for ShimCalls {
) -> Compilation {
use rustc_session::config::Input;

use futures::future::Future;
use rls_ipc::rpc::{Crate, Edition};
use std::collections::{HashMap, HashSet};

Expand Down Expand Up @@ -149,7 +145,7 @@ impl Callbacks for ShimCalls {
input_files.entry(file).or_default().insert(krate.clone());
}

if let Err(e) = callbacks.input_files(input_files).wait() {
if let Err(e) = futures::executor::block_on(callbacks.input_files(input_files)) {
log::error!("Can't send input files as part of a compilation callback: {:?}", e);
}

Expand All @@ -162,8 +158,6 @@ impl Callbacks for ShimCalls {
compiler: &interface::Compiler,
queries: &'tcx Queries<'tcx>,
) -> Compilation {
use futures::future::Future;

let callbacks = match self.callbacks.as_ref() {
Some(callbacks) => callbacks,
None => return Compilation::Continue,
Expand Down Expand Up @@ -196,7 +190,9 @@ impl Callbacks for ShimCalls {
CallbackHandler {
callback: &mut |a| {
let analysis = unsafe { ::std::mem::transmute(a.clone()) };
if let Err(e) = callbacks.complete_analysis(analysis).wait() {
if let Err(e) =
futures::executor::block_on(callbacks.complete_analysis(analysis))
{
log::error!(
"Can't send analysis as part of a compilation callback: {:?}",
e
Expand Down
6 changes: 0 additions & 6 deletions rls/src/build/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ pub fn start_with_handler(io: IoHandler) -> Result<Server, ()> {
let endpoint_path = endpoint_path.clone();
move || {
log::trace!("Attempting to spin up IPC server at {}", endpoint_path);
let runtime = tokio::runtime::Builder::new().core_threads(1).build().unwrap();
#[allow(deprecated)] // Windows won't work with lazily bound reactor
let (reactor, executor) = (runtime.reactor(), runtime.executor());

let server = ServerBuilder::new(io)
.event_loop_executor(executor)
.event_loop_reactor(reactor.clone())
.start(&endpoint_path)
.map_err(|_| log::warn!("Couldn't open socket"))
.unwrap();
Expand Down
21 changes: 13 additions & 8 deletions tests/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::fs;
use std::path::Path;
use std::time::{Duration, Instant};
use std::time::Duration;

use futures::future::Future;
use futures::future;
use lsp_types::{notification::*, request::*, *};
use serde::de::Deserialize;
use serde_json::json;
Expand Down Expand Up @@ -245,7 +245,8 @@ fn client_changing_workspace_lib_retains_diagnostics() {

let lib = rls.future_diagnostics("library/src/lib.rs");
let bin = rls.future_diagnostics("binary/src/main.rs");
let (lib, bin) = rls.block_on(lib.join(bin)).unwrap();
let (lib, bin) = rls.block_on(future::join(lib, bin)).unwrap();
let (lib, bin) = (lib.unwrap(), bin.unwrap());

assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `test_val`")));
assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`")));
Expand All @@ -268,7 +269,8 @@ fn client_changing_workspace_lib_retains_diagnostics() {

let lib = rls.future_diagnostics("library/src/lib.rs");
let bin = rls.future_diagnostics("binary/src/main.rs");
let (lib, bin) = rls.block_on(lib.join(bin)).unwrap();
let (lib, bin) = rls.block_on(future::join(lib, bin)).unwrap();
let (lib, bin) = (lib.unwrap(), bin.unwrap());

// lib unit tests have compile errors
assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`")));
Expand All @@ -293,7 +295,8 @@ fn client_changing_workspace_lib_retains_diagnostics() {

let lib = rls.future_diagnostics("library/src/lib.rs");
let bin = rls.future_diagnostics("binary/src/main.rs");
let (lib, bin) = rls.block_on(lib.join(bin)).unwrap();
let (lib, bin) = rls.block_on(future::join(lib, bin)).unwrap();
let (lib, bin) = (lib.unwrap(), bin.unwrap());

assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `test_val`")));
assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`")));
Expand Down Expand Up @@ -349,6 +352,7 @@ fn client_implicit_workspace_pick_up_lib_changes() {

let bin = rls.future_diagnostics("src/main.rs");
let bin = rls.block_on(bin).unwrap();
let bin = bin.unwrap();
assert!(bin.diagnostics[0].message.contains("unused variable: `val`"));

rls.notify::<DidChangeTextDocument>(DidChangeTextDocumentParams {
Expand All @@ -369,6 +373,7 @@ fn client_implicit_workspace_pick_up_lib_changes() {
// bin depending on lib picks up type mismatch
let bin = rls.future_diagnostics("src/main.rs");
let bin = rls.block_on(bin).unwrap();
let bin = bin.unwrap();
assert!(bin.diagnostics[0].message.contains("cannot find function `foo`"));

rls.notify::<DidChangeTextDocument>(DidChangeTextDocumentParams {
Expand All @@ -388,6 +393,7 @@ fn client_implicit_workspace_pick_up_lib_changes() {

let bin = rls.future_diagnostics("src/main.rs");
let bin = rls.block_on(bin).unwrap();
let bin = bin.unwrap();
assert!(bin.diagnostics[0].message.contains("unused variable: `val`"));
}

Expand Down Expand Up @@ -1971,7 +1977,7 @@ fn client_omit_init_build() {
// We need to assert that no other messages are received after a short
// period of time (e.g. no build progress messages).
std::thread::sleep(std::time::Duration::from_secs(1));
rls.block_on(response).unwrap();
rls.block_on(response).unwrap().unwrap();

assert_eq!(rls.messages().iter().count(), 1);
}
Expand Down Expand Up @@ -2141,8 +2147,7 @@ fn client_fail_uninitialized_request() {
},
);

let delay = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(1));
rls.block_on(delay).unwrap();
rls.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }).unwrap();

let err = jsonrpc_core::Failure::deserialize(rls.messages().last().unwrap()).unwrap();
assert_eq!(err.id, jsonrpc_core::Id::Num(ID));
Expand Down
52 changes: 29 additions & 23 deletions tests/support/client/child_process.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,61 @@
use std::io::{Read, Write};
use std::io;
use std::pin::Pin;
use std::process::{Command, Stdio};
use std::rc::Rc;
use std::task::{Context, Poll};

use futures::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_process::{Child, CommandExt};

pub struct ChildProcess {
stdin: tokio_process::ChildStdin,
stdout: tokio_process::ChildStdout,
child: Rc<tokio_process::Child>,
stdin: tokio::process::ChildStdin,
stdout: tokio::process::ChildStdout,
child: Rc<tokio::process::Child>,
}

impl Read for ChildProcess {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Read::read(&mut self.stdout, buf)
impl AsyncRead for ChildProcess {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stdout).poll_read(cx, buf)
}
}

impl Write for ChildProcess {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Write::write(&mut self.stdin, buf)
impl AsyncWrite for ChildProcess {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stdin).poll_write(cx, buf)
}
fn flush(&mut self) -> std::io::Result<()> {
Write::flush(&mut self.stdin)

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.stdin).poll_flush(cx)
}
}

impl AsyncRead for ChildProcess {}
impl AsyncWrite for ChildProcess {
fn shutdown(&mut self) -> Poll<(), std::io::Error> {
AsyncWrite::shutdown(&mut self.stdin)
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.stdin).poll_shutdown(cx)
}
}

impl ChildProcess {
pub fn spawn_from_command(mut cmd: Command) -> Result<ChildProcess, std::io::Error> {
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn_async()?;
let mut child = tokio::process::Command::from(cmd).spawn().expect("to async spawn process");

Ok(ChildProcess {
stdout: child.stdout().take().unwrap(),
stdin: child.stdin().take().unwrap(),
stdout: child.stdout.take().unwrap(),
stdin: child.stdin.take().unwrap(),
child: Rc::new(child),
})
}

/// Returns a handle to the underlying `Child` process.
/// Useful when waiting until child process exits.
pub fn child(&self) -> Rc<Child> {
pub fn child(&self) -> Rc<tokio::process::Child> {
Rc::clone(&self.child)
}
}
Loading

0 comments on commit 581415c

Please sign in to comment.