Skip to content

Commit

Permalink
Merge pull request #486 from zed-industries/background-highlights
Browse files Browse the repository at this point in the history
Move `GetDocumentHighlights` to the background and fix collaboration race conditions
  • Loading branch information
as-cii committed Feb 24, 2022
2 parents 73fcebb + d929819 commit 39ebaeb
Show file tree
Hide file tree
Showing 12 changed files with 591 additions and 467 deletions.
25 changes: 17 additions & 8 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,13 @@ impl Client {
let cx = cx.clone();
let this = self.clone();
async move {
let mut message_id = 0_usize;
while let Some(message) = incoming.next().await {
let mut state = this.state.write();
let payload_type_id = message.payload_type_id();
message_id += 1;
let type_name = message.payload_type_name();
let payload_type_id = message.payload_type_id();
let sender_id = message.original_sender_id().map(|id| id.0);

let model = state
.models_by_message_type
Expand Down Expand Up @@ -575,24 +578,30 @@ impl Client {

let client_id = this.id;
log::debug!(
"rpc message received. client_id:{}, name:{}",
"rpc message received. client_id:{}, message_id:{}, sender_id:{:?}, type:{}",
client_id,
message_id,
sender_id,
type_name
);
cx.foreground()
.spawn(async move {
match future.await {
Ok(()) => {
log::debug!(
"rpc message handled. client_id:{}, name:{}",
"rpc message handled. client_id:{}, message_id:{}, sender_id:{:?}, type:{}",
client_id,
message_id,
sender_id,
type_name
);
}
Err(error) => {
log::error!(
"error handling message. client_id:{}, name:{}, {}",
"error handling message. client_id:{}, message_id:{}, sender_id:{:?}, type:{}, error:{:?}",
client_id,
message_id,
sender_id,
type_name,
error
);
Expand Down Expand Up @@ -827,7 +836,7 @@ impl Client {
) -> impl Future<Output = Result<T::Response>> {
let client_id = self.id;
log::debug!(
"rpc request start. client_id: {}. name:{}",
"rpc request start. client_id:{}. name:{}",
client_id,
T::NAME
);
Expand All @@ -837,7 +846,7 @@ impl Client {
async move {
let response = response?.await;
log::debug!(
"rpc request finish. client_id: {}. name:{}",
"rpc request finish. client_id:{}. name:{}",
client_id,
T::NAME
);
Expand All @@ -846,7 +855,7 @@ impl Client {
}

fn respond<T: RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) -> Result<()> {
log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME);
self.peer.respond(receipt, response)
}

Expand All @@ -855,7 +864,7 @@ impl Client {
receipt: Receipt<T>,
error: proto::Error,
) -> Result<()> {
log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME);
self.peer.respond_with_error(receipt, error)
}
}
Expand Down
24 changes: 13 additions & 11 deletions crates/editor/src/editor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8143,16 +8143,18 @@ mod tests {
#[gpui::test]
async fn test_completion(mut cx: gpui::TestAppContext) {
let settings = cx.read(EditorSettings::test);
let (language_server, mut fake) = lsp::LanguageServer::fake_with_capabilities(
lsp::ServerCapabilities {
completion_provider: Some(lsp::CompletionOptions {
trigger_characters: Some(vec![".".to_string(), ":".to_string()]),
let (language_server, mut fake) = cx.update(|cx| {
lsp::LanguageServer::fake_with_capabilities(
lsp::ServerCapabilities {
completion_provider: Some(lsp::CompletionOptions {
trigger_characters: Some(vec![".".to_string(), ":".to_string()]),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
cx.background(),
);
},
cx,
)
});

let text = "
one
Expand Down Expand Up @@ -8318,7 +8320,7 @@ mod tests {
position: Point,
completions: Vec<(Range<Point>, &'static str)>,
) {
fake.handle_request::<lsp::request::Completion, _>(move |params| {
fake.handle_request::<lsp::request::Completion, _>(move |params, _| {
assert_eq!(
params.text_document_position.text_document.uri,
lsp::Url::from_file_path(path).unwrap()
Expand Down Expand Up @@ -8352,7 +8354,7 @@ mod tests {
fake: &mut FakeLanguageServer,
edit: Option<(Range<Point>, &'static str)>,
) {
fake.handle_request::<lsp::request::ResolveCompletionItem, _>(move |_| {
fake.handle_request::<lsp::request::ResolveCompletionItem, _>(move |_, _| {
lsp::CompletionItem {
additional_text_edits: edit.clone().map(|(range, new_text)| {
vec![lsp::TextEdit::new(
Expand Down
6 changes: 3 additions & 3 deletions crates/language/src/language.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use gpui::{AppContext, Task};
use gpui::{MutableAppContext, Task};
use highlight_map::HighlightMap;
use lazy_static::lazy_static;
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -225,7 +225,7 @@ impl LanguageRegistry {
language: &Arc<Language>,
root_path: Arc<Path>,
http_client: Arc<dyn HttpClient>,
cx: &AppContext,
cx: &mut MutableAppContext,
) -> Option<Task<Result<Arc<lsp::LanguageServer>>>> {
#[cfg(any(test, feature = "test-support"))]
if let Some(config) = &language.config.language_server {
Expand All @@ -234,7 +234,7 @@ impl LanguageRegistry {

let (server, mut fake_server) = lsp::LanguageServer::fake_with_capabilities(
fake_config.capabilities.clone(),
cx.background().clone(),
cx,
);

if let Some(initalizer) = &fake_config.initializer {
Expand Down
4 changes: 2 additions & 2 deletions crates/language/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ fn test_autoindent_adjusts_lines_when_only_text_changes(cx: &mut MutableAppConte

#[gpui::test]
async fn test_diagnostics(mut cx: gpui::TestAppContext) {
let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background());
let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake);
let mut rust_lang = rust_lang();
rust_lang.config.language_server = Some(LanguageServerConfig {
disk_based_diagnostic_sources: HashSet::from_iter(["disk".to_string()]),
Expand Down Expand Up @@ -837,7 +837,7 @@ async fn test_diagnostics(mut cx: gpui::TestAppContext) {

#[gpui::test]
async fn test_edits_from_lsp_with_past_version(mut cx: gpui::TestAppContext) {
let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background());
let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake);

let text = "
fn a() {
Expand Down
106 changes: 58 additions & 48 deletions crates/lsp/src/lsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,36 +483,47 @@ impl Drop for Subscription {

#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
handlers:
Arc<Mutex<HashMap<&'static str, Box<dyn Send + Sync + FnMut(usize, &[u8]) -> Vec<u8>>>>>,
handlers: Arc<
Mutex<
HashMap<
&'static str,
Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
>,
>,
>,
outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
}

#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
pub fn fake(executor: Arc<gpui::executor::Background>) -> (Arc<Self>, FakeLanguageServer) {
Self::fake_with_capabilities(Default::default(), executor)
pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
Self::fake_with_capabilities(Default::default(), cx)
}

pub fn fake_with_capabilities(
capabilities: ServerCapabilities,
executor: Arc<gpui::executor::Background>,
cx: &mut gpui::MutableAppContext,
) -> (Arc<Self>, FakeLanguageServer) {
let (stdin_writer, stdin_reader) = async_pipe::pipe();
let (stdout_writer, stdout_reader) = async_pipe::pipe();

let mut fake = FakeLanguageServer::new(executor.clone(), stdin_reader, stdout_writer);
let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
fake.handle_request::<request::Initialize, _>({
let capabilities = capabilities.clone();
move |_| InitializeResult {
move |_, _| InitializeResult {
capabilities: capabilities.clone(),
..Default::default()
}
});

let server =
Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap();
let server = Self::new_internal(
stdin_writer,
stdout_reader,
Path::new("/"),
cx.background().clone(),
)
.unwrap();

(server, fake)
}
Expand All @@ -521,9 +532,9 @@ impl LanguageServer {
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
fn new(
background: Arc<gpui::executor::Background>,
stdin: async_pipe::PipeReader,
stdout: async_pipe::PipeWriter,
cx: &mut gpui::MutableAppContext,
) -> Self {
use futures::StreamExt as _;

Expand All @@ -537,43 +548,42 @@ impl FakeLanguageServer {

// Receive incoming messages
let handlers = this.handlers.clone();
let executor = background.clone();
background
.spawn(async move {
let mut buffer = Vec::new();
let mut stdin = smol::io::BufReader::new(stdin);
while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
executor.simulate_random_delay().await;
if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
assert_eq!(request.jsonrpc, JSON_RPC_VERSION);

if let Some(handler) = handlers.lock().get_mut(request.method) {
let response = handler(request.id, request.params.get().as_bytes());
log::debug!("handled lsp request. method:{}", request.method);
outgoing_tx.unbounded_send(response)?;
} else {
log::debug!("unhandled lsp request. method:{}", request.method);
outgoing_tx.unbounded_send(
serde_json::to_vec(&AnyResponse {
id: request.id,
error: Some(Error {
message: "no handler".to_string(),
}),
result: None,
})
.unwrap(),
)?;
}
cx.spawn(|cx| async move {
let mut buffer = Vec::new();
let mut stdin = smol::io::BufReader::new(stdin);
while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
cx.background().simulate_random_delay().await;
if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
assert_eq!(request.jsonrpc, JSON_RPC_VERSION);

if let Some(handler) = handlers.lock().get_mut(request.method) {
let response =
handler(request.id, request.params.get().as_bytes(), cx.clone());
log::debug!("handled lsp request. method:{}", request.method);
outgoing_tx.unbounded_send(response)?;
} else {
incoming_tx.unbounded_send(buffer.clone())?;
log::debug!("unhandled lsp request. method:{}", request.method);
outgoing_tx.unbounded_send(
serde_json::to_vec(&AnyResponse {
id: request.id,
error: Some(Error {
message: "no handler".to_string(),
}),
result: None,
})
.unwrap(),
)?;
}
} else {
incoming_tx.unbounded_send(buffer.clone())?;
}
Ok::<_, anyhow::Error>(())
})
.detach();
}
Ok::<_, anyhow::Error>(())
})
.detach();

// Send outgoing messages
background
cx.background()
.spawn(async move {
let mut stdout = smol::io::BufWriter::new(stdout);
while let Some(notification) = outgoing_rx.next().await {
Expand Down Expand Up @@ -618,13 +628,13 @@ impl FakeLanguageServer {
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + request::Request,
F: 'static + Send + Sync + FnMut(T::Params) -> T::Result,
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
{
let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
self.handlers.lock().insert(
T::METHOD,
Box::new(move |id, params| {
let result = handler(serde_json::from_slice::<T::Params>(params).unwrap());
Box::new(move |id, params, cx| {
let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
let result = serde_json::to_string(&result).unwrap();
let result = serde_json::from_str::<&RawValue>(&result).unwrap();
let response = AnyResponse {
Expand Down Expand Up @@ -709,8 +719,8 @@ mod tests {
}

#[gpui::test]
async fn test_fake(cx: TestAppContext) {
let (server, mut fake) = LanguageServer::fake(cx.background());
async fn test_fake(mut cx: TestAppContext) {
let (server, mut fake) = cx.update(LanguageServer::fake);

let (message_tx, message_rx) = channel::unbounded();
let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
Expand Down Expand Up @@ -762,7 +772,7 @@ mod tests {
"file://b/c"
);

fake.handle_request::<request::Shutdown, _>(|_| ());
fake.handle_request::<request::Shutdown, _>(|_, _| ());

drop(server);
fake.receive_notification::<notification::Exit>().await;
Expand Down
Loading

0 comments on commit 39ebaeb

Please sign in to comment.