diff --git a/crates/mun_language_server/Cargo.toml b/crates/mun_language_server/Cargo.toml index 0fb4fe8f3..aadec71e6 100644 --- a/crates/mun_language_server/Cargo.toml +++ b/crates/mun_language_server/Cargo.toml @@ -16,26 +16,26 @@ categories = ["game-development", "mun"] [dependencies] rustc-hash="1.1.0" -lsp-types = "0.74" +lsp-types = "0.86.0" +lsp-server = "0.5.0" log = "0.4" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -async-std = "1.6" -futures = "0.3" anyhow = "1.0" thiserror = "1.0" salsa = "0.15.0" hir = { version = "=0.2.0", path="../mun_hir", package="mun_hir" } -rayon = "1.3" -num_cpus = "1.13.0" +threadpool="1.8.1" vfs = { path = "../mun_vfs", package="mun_vfs" } project = { path = "../mun_project", package="mun_project" } mun_target = { version = "=0.2.0", path = "../mun_target" } mun_syntax = { version = "=0.2.0", path = "../mun_syntax" } mun_diagnostics = { version = "=0.1.0", path = "../mun_diagnostics" } crossbeam-channel = "0.5.0" +parking_lot="0.11.1" paths = {path="../mun_paths", package="mun_paths"} [dev-dependencies] tempdir = "0.3.7" +mun_test = { path = "../mun_test"} diff --git a/crates/mun_language_server/src/analysis.rs b/crates/mun_language_server/src/analysis.rs index 9a8aa282d..dcc0f0659 100644 --- a/crates/mun_language_server/src/analysis.rs +++ b/crates/mun_language_server/src/analysis.rs @@ -37,6 +37,11 @@ impl Analysis { db: self.db.snapshot(), } } + + /// Requests any outstanding snapshot to cancel computations. + pub fn request_cancelation(&mut self) { + self.db.request_cancelation(); + } } /// The `AnalysisSnapshot` is a snapshot of the state of the source, it enables querying for diff --git a/crates/mun_language_server/src/cancelation.rs b/crates/mun_language_server/src/cancelation.rs index c343120b9..0eff65b0e 100644 --- a/crates/mun_language_server/src/cancelation.rs +++ b/crates/mun_language_server/src/cancelation.rs @@ -1,3 +1,5 @@ +use std::error::Error; + /// An error signifying a cancelled operation. pub struct Canceled { // This is here so that you cannot construct a Canceled @@ -29,3 +31,8 @@ impl std::fmt::Debug for Canceled { } impl std::error::Error for Canceled {} + +/// Returns true if the specified error is of type [`Canceled`] +pub(crate) fn is_canceled(e: &(dyn Error + 'static)) -> bool { + e.downcast_ref::().is_some() +} diff --git a/crates/mun_language_server/src/config.rs b/crates/mun_language_server/src/config.rs index ab3b2af48..7bfd17dbe 100644 --- a/crates/mun_language_server/src/config.rs +++ b/crates/mun_language_server/src/config.rs @@ -1,5 +1,5 @@ -use crate::project_manifest::ProjectManifest; use paths::AbsPathBuf; +use project::ProjectManifest; /// The configuration used by the language server. #[derive(Debug, Clone)] diff --git a/crates/mun_language_server/src/conversion.rs b/crates/mun_language_server/src/conversion.rs index b51f57289..439fea79f 100644 --- a/crates/mun_language_server/src/conversion.rs +++ b/crates/mun_language_server/src/conversion.rs @@ -63,8 +63,8 @@ pub fn convert_unit( ) -> lsp_types::Position { let line_col = line_index.line_col(range); lsp_types::Position { - line: line_col.line.into(), - character: line_col.col.into(), + line: line_col.line, + character: line_col.col, } } diff --git a/crates/mun_language_server/src/db.rs b/crates/mun_language_server/src/db.rs index a4383991b..f111c463b 100644 --- a/crates/mun_language_server/src/db.rs +++ b/crates/mun_language_server/src/db.rs @@ -3,7 +3,7 @@ use crate::cancelation::Canceled; use hir::{HirDatabase, Upcast}; use mun_target::spec::Target; -use salsa::{Database, Snapshot}; +use salsa::{Database, Durability, Snapshot}; use std::panic; /// The `AnalysisDatabase` provides the database for all analyses. A database is given input and @@ -38,6 +38,11 @@ impl AnalysisDatabase { db } + + /// Triggers a simple write on the database which will cancell all outstanding snapshots. + pub fn request_cancelation(&mut self) { + self.salsa_runtime_mut().synthetic_write(Durability::LOW); + } } impl salsa::Database for AnalysisDatabase { diff --git a/crates/mun_language_server/src/lib.rs b/crates/mun_language_server/src/lib.rs index f3819d951..c1544d94d 100644 --- a/crates/mun_language_server/src/lib.rs +++ b/crates/mun_language_server/src/lib.rs @@ -1,3 +1,13 @@ +use std::convert::TryFrom; + +use serde::{de::DeserializeOwned, Serialize}; + +pub use config::{Config, FilesWatcher}; +pub use main_loop::main_loop; +use paths::AbsPathBuf; +use project::ProjectManifest; +pub(crate) use state::LanguageServerState; + mod analysis; mod cancelation; mod capabilities; @@ -7,40 +17,31 @@ mod conversion; mod db; mod diagnostics; mod main_loop; -mod project_manifest; -pub mod protocol; -mod workspace; - -pub use config::Config; -pub use main_loop::main_loop; - -use crate::{config::FilesWatcher, project_manifest::ProjectManifest}; -use paths::AbsPathBuf; -use serde::{de::DeserializeOwned, Serialize}; -use std::convert::TryFrom; - -pub type Result = anyhow::Result; +mod state; /// Deserializes a `T` from a json value. -pub fn from_json(what: &'static str, json: serde_json::Value) -> Result { +pub fn from_json( + what: &'static str, + json: serde_json::Value, +) -> anyhow::Result { T::deserialize(&json) .map_err(|e| anyhow::anyhow!("could not deserialize {}: {}: {}", what, e, json)) } /// Converts the `T` to a json value -pub fn to_json(value: T) -> Result { +pub fn to_json(value: T) -> anyhow::Result { serde_json::to_value(value).map_err(|e| anyhow::anyhow!("could not serialize to json: {}", e)) } /// Main entry point for the language server -pub async fn run_server_async() -> Result<()> { +pub fn run_server() -> anyhow::Result<()> { log::info!("language server started"); // Setup IO connections - let mut connection = protocol::Connection::stdio(); + let (connection, io_threads) = lsp_server::Connection::stdio(); // Wait for a client to connect - let (initialize_id, initialize_params) = connection.initialize_start().await?; + let (initialize_id, initialize_params) = connection.initialize_start()?; let initialize_params = from_json::("InitializeParams", initialize_params)?; @@ -57,9 +58,7 @@ pub async fn run_server_async() -> Result<()> { let initialize_result = serde_json::to_value(initialize_result).unwrap(); - connection - .initialize_finish(initialize_id, initialize_result) - .await?; + connection.initialize_finish(initialize_id, initialize_result)?; if let Some(client_info) = initialize_params.client_info { log::info!( @@ -122,12 +121,8 @@ pub async fn run_server_async() -> Result<()> { config }; - main_loop(connection, config).await?; + main_loop(connection, config)?; + io_threads.join()?; Ok(()) } - -/// Main entry point for the language server -pub fn run_server() -> Result<()> { - async_std::task::block_on(run_server_async()) -} diff --git a/crates/mun_language_server/src/main_loop.rs b/crates/mun_language_server/src/main_loop.rs index 96022fe6f..b7c12e9c6 100644 --- a/crates/mun_language_server/src/main_loop.rs +++ b/crates/mun_language_server/src/main_loop.rs @@ -1,549 +1,8 @@ -use crate::{ - analysis::{Analysis, AnalysisSnapshot, Cancelable}, - change::AnalysisChange, - config::Config, - conversion::{convert_range, convert_uri, url_from_path_with_drive_lowercasing}, - protocol::{Connection, Message, Notification, Request, RequestId}, - Result, -}; -use async_std::sync::RwLock; -use futures::{ - channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, - SinkExt, StreamExt, -}; -use lsp_types::{notification::PublishDiagnostics, PublishDiagnosticsParams, Url}; -use paths::AbsPathBuf; -use rustc_hash::FxHashSet; -use serde::{de::DeserializeOwned, Serialize}; -use std::{cell::RefCell, collections::HashSet, ops::Deref, sync::Arc}; -use vfs::VirtualFileSystem; - -/// A `Task` is something that is send from async tasks to the entry point for processing. This -/// enables synchronizing resources like the connection with the client. -#[derive(Debug)] -enum Task { - Notify(Notification), -} - -#[derive(Debug)] -enum Event { - Msg(Message), - Vfs(vfs::MonitorMessage), - Task(Task), -} - -/// State for the language server -pub(crate) struct LanguageServerState { - /// The connection with the client - pub connection: ConnectionState, - - /// The configuration passed by the client - pub config: Config, - - /// Thread pool for async execution - pub thread_pool: rayon::ThreadPool, - - /// The virtual filesystem that holds all the file contents - pub vfs: Arc>, - - /// The vfs monitor - pub vfs_monitor: Box, - - /// The receiver of vfs monitor messages - pub vfs_monitor_receiver: UnboundedReceiver, - - /// Documents that are currently kept in memory from the client - pub open_docs: FxHashSet, - - /// Holds the state of the analysis process - pub analysis: Analysis, - - /// All the packages known to the server - pub packages: Arc>, -} - -/// A snapshot of the state of the language server -pub(crate) struct LanguageServerSnapshot { - /// The virtual filesystem that holds all the file contents - pub vfs: Arc>, - - /// Holds the state of the analysis process - pub analysis: AnalysisSnapshot, - - /// All the packages known to the server - pub packages: Arc>, -} - -/// State maintained for the connection. This includes everything that is required to be able to -/// properly communicate with the client but has nothing to do with any Mun related state. -pub(crate) struct ConnectionState { - pub(crate) connection: Connection, - - next_request_id: u64, - pending_responses: HashSet, -} - -impl ConnectionState { - /// Constructs a new `ConnectionState` - fn new(connection: Connection) -> Self { - Self { - connection, - next_request_id: 0, - pending_responses: Default::default(), - } - } - - /// Constructs a new request ID and stores that we are still awaiting a response. - fn next_request_id(&mut self) -> RequestId { - self.next_request_id += 1; - let res: RequestId = self.next_request_id.into(); - let inserted = self.pending_responses.insert(res.clone()); - debug_assert!(inserted); - res - } -} - -impl LanguageServerState { - pub fn new(connection: Connection, config: Config) -> Self { - // Construct the virtual filesystem monitor - let (vfs_monitor_sender, vfs_monitor_receiver) = unbounded::(); - let vfs_monitor_sender = RefCell::new(vfs_monitor_sender); - let vfs_monitor: vfs::NotifyMonitor = vfs::Monitor::new(Box::new(move |msg| { - async_std::task::block_on(vfs_monitor_sender.borrow_mut().send(msg)) - .expect("error sending vfs monitor message to foreground") - })); - let vfs_monitor = Box::new(vfs_monitor) as Box; - - // Create a thread pool to dispatch the async commands - // Use the num_cpus to get a nice thread count estimation - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(num_cpus::get()) - .build() - .expect("unable to spin up thread pool"); - - // Apply the initial changes - let mut change = AnalysisChange::new(); - change.set_packages(Default::default()); - change.set_roots(Default::default()); - - // Construct the state that will hold all the analysis - let mut analysis = Analysis::new(); - analysis.apply_change(change); - - LanguageServerState { - connection: ConnectionState::new(connection), - config, - vfs: Arc::new(RwLock::new(Default::default())), - vfs_monitor, - vfs_monitor_receiver, - open_docs: FxHashSet::default(), - thread_pool, - analysis, - packages: Arc::new(Vec::new()), - } - } - - /// Runs the language server to completion - pub async fn run(mut self) -> Result<()> { - // Start by updating the current workspace - self.fetch_workspaces(); - - // Process events as the pass - let (task_sender, mut task_receiver) = futures::channel::mpsc::unbounded::(); - loop { - // Determine what to do next. This selects from different channels, the first message to - // arrive is returned. If an error occurs on one of the channel the main loop is shutdown - // with an error. - let event = futures::select! { - msg = self.connection.connection.receiver.next() => match msg { - Some(msg) => Event::Msg(msg), - None => return Err(anyhow::anyhow!("client exited without shutdown")), - }, - msg = self.vfs_monitor_receiver.next() => match msg { - Some(msg) => Event::Vfs(msg), - None => return Err(anyhow::anyhow!("client exited without shutdown")), - }, - task = task_receiver.next() => Event::Task(task.unwrap()), - }; - - // Handle the event - match handle_event(event, &task_sender, &mut self).await? { - LoopState::Continue => {} - LoopState::Shutdown => { - break; - } - } - } - - Ok(()) - } -} +use crate::{Config, LanguageServerState}; +use lsp_server::Connection; /// Runs the main loop of the language server. This will receive requests and handle them. -pub async fn main_loop(connection: Connection, config: Config) -> Result<()> { +pub fn main_loop(connection: Connection, config: Config) -> anyhow::Result<()> { log::info!("initial config: {:#?}", config); - LanguageServerState::new(connection, config).run().await -} - -/// A `LoopState` enumerator determines the state of the main loop -enum LoopState { - Continue, - Shutdown, -} - -/// Handles a received request -async fn handle_request(request: Request, state: &mut LanguageServerState) -> Result { - if state - .connection - .connection - .handle_shutdown(&request) - .await? - { - return Ok(LoopState::Shutdown); - }; - Ok(LoopState::Continue) -} - -/// Handles a received notification -async fn on_notification( - notification: Notification, - state: &mut LanguageServerState, -) -> Result { - let notification = - // When a a text document is opened - match cast_notification::(notification) { - Ok(params) => { - if let Ok(path) = convert_uri(¶ms.text_document.uri) { - state.open_docs.insert(path.clone()); - state.vfs.write().await.set_file_contents(&path, Some(params.text_document.text.into_bytes())); - } - return Ok(LoopState::Continue); - } - Err(not) => not, - }; - - // When a text document is closed - let notification = - match cast_notification::(notification) { - Ok(params) => { - if let Ok(path) = convert_uri(¶ms.text_document.uri) { - state.open_docs.remove(&path); - state.vfs_monitor.reload(&path); - } - let params = lsp_types::PublishDiagnosticsParams { - uri: params.text_document.uri, - diagnostics: Vec::new(), - version: None, - }; - let not = build_notification::(params); - state - .connection - .connection - .sender - .try_send(not.into()) - .unwrap(); - return Ok(LoopState::Continue); - } - Err(not) => not, - }; - - let notification = - match cast_notification::(notification) { - Ok(params) => { - let lsp_types::DidChangeTextDocumentParams { - text_document, - content_changes, - } = params; - if let Ok(path) = convert_uri(&text_document.uri) { - let new_content = content_changes.get(0).unwrap().text.clone(); - state - .vfs - .write() - .await - .set_file_contents(&path, Some(new_content.into_bytes())); - } - return Ok(LoopState::Continue); - } - Err(not) => not, - }; - - let _notification = - match cast_notification::(notification) { - Ok(params) => { - for change in params.changes { - if let Ok(path) = convert_uri(&change.uri) { - state.vfs_monitor.reload(&path); - } - } - return Ok(LoopState::Continue); - } - Err(not) => not, - }; - - Ok(LoopState::Continue) -} - -/// Handles an incoming event. Returns a `LoopState` state which determines whether processing -/// should continue. -async fn handle_event( - event: Event, - task_sender: &UnboundedSender, - state: &mut LanguageServerState, -) -> Result { - log::info!("handling event: {:?}", event); - - // Process the incoming event - let loop_state = match event { - Event::Task(task) => handle_task(task, state).await?, - Event::Msg(msg) => handle_lsp_message(msg, state).await?, - Event::Vfs(task) => handle_vfs_task(task, state).await?, - }; - - // Process any changes to the vfs - let state_changed = state.process_vfs_changes().await; - dbg!(state_changed); - if state_changed { - let snapshot = state.snapshot(); - let task_sender = task_sender.clone(); - // Spawn the diagnostics in the threadpool - state.thread_pool.spawn(move || { - let _result = async_std::task::block_on(handle_diagnostics(snapshot, task_sender)); - }); - } - - Ok(loop_state) -} - -/// Send all diagnostics of all files -async fn handle_diagnostics( - state: LanguageServerSnapshot, - mut sender: UnboundedSender, -) -> Cancelable<()> { - dbg!(&state.packages); - - // Iterate over all files - for (idx, _package) in state.packages.iter().enumerate() { - let package_id = hir::PackageId(idx as u32); - - // Get all the files - let files = state.analysis.package_source_files(package_id)?; - - // Publish all diagnostics - for file in files { - let line_index = state.analysis.file_line_index(file)?; - let uri = state.file_id_to_uri(file).await.unwrap(); - let diagnostics = state.analysis.diagnostics(file)?; - - let diagnostics = { - let mut lsp_diagnostics = Vec::with_capacity(diagnostics.len()); - for d in diagnostics { - lsp_diagnostics.push(lsp_types::Diagnostic { - range: convert_range(d.range, &line_index), - severity: Some(lsp_types::DiagnosticSeverity::Error), - code: None, - source: Some("mun".to_string()), - message: d.message, - related_information: { - let mut annotations = - Vec::with_capacity(d.additional_annotations.len()); - for annotation in d.additional_annotations { - annotations.push(lsp_types::DiagnosticRelatedInformation { - location: lsp_types::Location { - uri: state - .file_id_to_uri(annotation.range.file_id) - .await - .unwrap(), - range: convert_range( - annotation.range.value, - state - .analysis - .file_line_index(annotation.range.file_id)? - .deref(), - ), - }, - message: annotation.message, - }); - } - if annotations.is_empty() { - None - } else { - Some(annotations) - } - }, - tags: None, - }); - } - lsp_diagnostics - }; - - sender - .send(Task::Notify(build_notification::( - PublishDiagnosticsParams { - uri, - diagnostics, - version: None, - }, - ))) - .await - .unwrap(); - } - } - Ok(()) -} - -/// Handles a task send by another async task -async fn handle_task(task: Task, state: &mut LanguageServerState) -> Result { - match task { - Task::Notify(notification) => { - state - .connection - .connection - .sender - .send(notification.into()) - .await? - } - } - - Ok(LoopState::Continue) -} - -/// Handles a change to the underlying virtual file system. -async fn handle_vfs_task( - mut task: vfs::MonitorMessage, - state: &mut LanguageServerState, -) -> Result { - loop { - match task { - vfs::MonitorMessage::Progress { .. } => {} - vfs::MonitorMessage::Loaded { files } => { - let vfs = &mut *state.vfs.write().await; - for (path, contents) in files { - vfs.set_file_contents(&path, contents); - } - } - } - - // Coalesce many VFS events into a single loop turn - task = match state.vfs_monitor_receiver.try_next() { - Ok(Some(task)) => task, - _ => break, - } - } - Ok(LoopState::Continue) -} - -/// Handles an incoming message via the language server protocol. -async fn handle_lsp_message(msg: Message, state: &mut LanguageServerState) -> Result { - match msg { - Message::Request(req) => handle_request(req, state).await, - Message::Response(response) => { - let removed = state.connection.pending_responses.remove(&response.id); - if !removed { - log::error!("unexpected response: {:?}", response) - } - - Ok(LoopState::Continue) - } - Message::Notification(notification) => on_notification(notification, state).await, - } -} - -/// Constructs a new notification with the specified parameters. -fn build_notification(params: N::Params) -> Notification -where - N: lsp_types::notification::Notification, - N::Params: Serialize, -{ - Notification::new(N::METHOD.to_string(), params) -} - -/// Casts a notification to the specified type. -fn cast_notification(notification: Notification) -> std::result::Result -where - N: lsp_types::notification::Notification, - N::Params: DeserializeOwned, -{ - notification.try_extract(N::METHOD) -} - -impl LanguageServerState { - /// Sends a new request to the client - pub fn send_request(&mut self, params: R::Params) { - let request = Request::new( - self.connection.next_request_id(), - R::METHOD.to_string(), - params, - ); - async_std::task::block_on(self.connection.connection.sender.send(request.into())).unwrap(); - } -} - -impl LanguageServerState { - /// Creates a snapshot of the state - pub fn snapshot(&self) -> LanguageServerSnapshot { - LanguageServerSnapshot { - vfs: self.vfs.clone(), - analysis: self.analysis.snapshot(), - packages: self.packages.clone(), - } - } - - /// Processes any and all changes that have been applied to the virtual filesystem. Generates - /// an `AnalysisChange` and applies it if there are changes. True is returned if things changed, - /// otherwise false. - pub async fn process_vfs_changes(&mut self) -> bool { - // Get all the changes since the last time we processed - let changed_files = { - let mut vfs = self.vfs.write().await; - vfs.take_changes() - }; - if changed_files.is_empty() { - return false; - } - - // Construct an AnalysisChange to apply to the analysis - let vfs = self.vfs.read().await; - let mut analysis_change = AnalysisChange::new(); - let mut has_created_or_deleted_entries = false; - for file in changed_files { - // If the file was deleted or created we have to remember that so that we update the - // source roots as well. - if file.is_created_or_deleted() { - has_created_or_deleted_entries = true; - } - - // Convert the contents of the file to a string - let bytes = vfs - .file_contents(file.file_id) - .map(Vec::from) - .unwrap_or_default(); - let text = match String::from_utf8(bytes).ok() { - Some(text) => Some(Arc::from(text)), - None => None, - }; - - // Notify the database about this change - analysis_change.change_file(hir::FileId(file.file_id.0), text); - } - - // If an entry was created or deleted we have to recreate all source roots - if has_created_or_deleted_entries { - analysis_change.set_roots(self.recompute_source_roots()); - } - - // Apply the change - self.analysis.apply_change(analysis_change); - true - } -} - -impl LanguageServerSnapshot { - /// Converts the specified `hir::FileId` to a `Url` - pub async fn file_id_to_uri(&self, id: hir::FileId) -> Result { - let vfs = self.vfs.read().await; - let path = vfs.file_path(vfs::FileId(id.0)); - let url = url_from_path_with_drive_lowercasing(path)?; - - Ok(url) - } + LanguageServerState::new(connection.sender, config).run(connection.receiver) } diff --git a/crates/mun_language_server/src/protocol.rs b/crates/mun_language_server/src/protocol.rs deleted file mode 100644 index f9a723314..000000000 --- a/crates/mun_language_server/src/protocol.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod connection; -mod error; -mod message; -mod stdio; - -pub use connection::Connection; -pub use error::ProtocolError; -pub use message::{Message, Notification, Request, RequestId, Response, ResponseError}; diff --git a/crates/mun_language_server/src/protocol/connection.rs b/crates/mun_language_server/src/protocol/connection.rs deleted file mode 100644 index 06490984d..000000000 --- a/crates/mun_language_server/src/protocol/connection.rs +++ /dev/null @@ -1,106 +0,0 @@ -use super::{Message, ProtocolError, Request, RequestId, Response}; -use async_std::future::{timeout, TimeoutError}; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; -use std::time::Duration; - -/// Represents a connection between a language server server and a language server client. -pub struct Connection { - pub sender: mpsc::Sender, - pub receiver: mpsc::Receiver, -} - -impl Connection { - /// Creates a connection that communicates over stdout and stdin. This enables inter-process - /// communication. - pub fn stdio() -> Connection { - let (sender, receiver) = super::stdio::stdio_transport(); - Connection { sender, receiver } - } - - /// Creates a pair of connected connections. This enables in-process communication, especially - /// useful for testing. - pub fn memory() -> (Connection, Connection) { - let (s1, r1) = mpsc::channel(0); - let (s2, r2) = mpsc::channel(0); - ( - Connection { - sender: s1, - receiver: r2, - }, - Connection { - sender: s2, - receiver: r1, - }, - ) - } - - /// Starts the initialization process by waiting for an initialize request from the client. - pub async fn initialize_start( - &mut self, - ) -> Result<(RequestId, serde_json::Value), ProtocolError> { - let req = match self.receiver.next().await { - Some(Message::Request(req)) => { - if req.is_initialize() { - req - } else { - return Err(ProtocolError::UnexpectedMessage { - expected: "initialize".to_owned(), - received: Some(Message::Request(req)), - }); - } - } - msg => { - return Err(ProtocolError::UnexpectedMessage { - expected: "initialize".to_owned(), - received: msg, - }) - } - }; - Ok((req.id, req.params)) - } - - /// Finishes the initialization process by sending an `InitializeResult` to the client - pub async fn initialize_finish( - &mut self, - initialize_id: RequestId, - initialize_result: serde_json::Value, - ) -> Result<(), ProtocolError> { - let resp = Response::new_ok(initialize_id, initialize_result); - self.sender.send(resp.into()).await.unwrap(); - match self.receiver.next().await { - Some(Message::Notification(n)) if n.is_initialized() => (), - m => { - return Err(ProtocolError::UnexpectedMessage { - expected: "initialized".to_owned(), - received: m, - }) - } - }; - Ok(()) - } - - /// If `req` is a `Shutdown`, responds to it and returns `true`, otherwise returns `false`. - pub async fn handle_shutdown(&mut self, req: &Request) -> Result { - if !req.is_shutdown() { - return Ok(false); - } - let resp = Response::new_ok(req.id.clone(), ()); - let _ = self.sender.send(resp.into()).await; - match timeout(Duration::from_secs(30), self.receiver.next()).await { - Ok(Some(Message::Notification(n))) if n.is_exit() => {} - Err(TimeoutError { .. }) => { - return Err(ProtocolError::Timeout { - waiting_for: "exit".to_owned(), - }) - } - Ok(m) => { - return Err(ProtocolError::UnexpectedMessage { - expected: "exit".to_owned(), - received: m, - }) - } - } - Ok(true) - } -} diff --git a/crates/mun_language_server/src/protocol/error.rs b/crates/mun_language_server/src/protocol/error.rs deleted file mode 100644 index 5d5104f78..000000000 --- a/crates/mun_language_server/src/protocol/error.rs +++ /dev/null @@ -1,14 +0,0 @@ -use super::Message; -use thiserror::Error; - -#[derive(Debug, Clone, Error)] -pub enum ProtocolError { - #[error("expected '{expected}' request, got '{received:?}'")] - UnexpectedMessage { - expected: String, - received: Option, - }, - - #[error("timeout while waiting for {waiting_for}")] - Timeout { waiting_for: String }, -} diff --git a/crates/mun_language_server/src/protocol/message.rs b/crates/mun_language_server/src/protocol/message.rs deleted file mode 100644 index 5400e8aeb..000000000 --- a/crates/mun_language_server/src/protocol/message.rs +++ /dev/null @@ -1,290 +0,0 @@ -use async_std::io; -use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::fmt; - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(untagged)] -pub enum Message { - Request(Request), - Response(Response), - Notification(Notification), -} - -impl From for Message { - fn from(request: Request) -> Message { - Message::Request(request) - } -} - -impl From for Message { - fn from(response: Response) -> Message { - Message::Response(response) - } -} - -impl From for Message { - fn from(notification: Notification) -> Message { - Message::Notification(notification) - } -} - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[serde(transparent)] -pub struct RequestId(Id); - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[serde(untagged)] -enum Id { - U64(u64), - String(String), -} - -impl From for RequestId { - fn from(id: u64) -> RequestId { - RequestId(Id::U64(id)) - } -} - -impl From for RequestId { - fn from(id: String) -> RequestId { - RequestId(Id::String(id)) - } -} - -impl fmt::Display for RequestId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match &self.0 { - Id::U64(id) => write!(f, "{}", id), - Id::String(id) => write!(f, "\"{}\"", id), - } - } -} - -/// A request message to describe a request between the client and the server. Every processed -/// request must send a response back to the sender of the request. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Request { - pub id: RequestId, - pub method: String, - pub params: serde_json::Value, -} - -/// A Response Message sent as a result of a `Request`. If a request doesn’t provide a result value -/// the receiver of a request still needs to return a response message to conform to the JSON RPC -/// specification. The result property of the ResponseMessage should be set to null in this case to -/// signal a successful request. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Response { - pub id: RequestId, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, -} - -/// An error object in case a request failed. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ResponseError { - pub code: i32, - pub message: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub data: Option, -} - -/// An error code indicating the error type that occurred. -#[derive(Clone, Copy, Debug)] -#[allow(unused)] -pub enum ErrorCode { - // Defined by JSON RPC - ParseError = -32700, - InvalidRequest = -32600, - MethodNotFound = -32601, - InvalidParams = -32602, - InternalError = -32603, - ServerErrorStart = -32099, - ServerErrorEnd = -32000, - ServerNotInitialized = -32002, - UnknownErrorCode = -32001, - - // Defined by the protocol. - RequestCanceled = -32800, - ContentModified = -32801, -} - -/// A notification message. A processed notification message must not send a response back. They -/// work like events. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Notification { - pub method: String, - pub params: serde_json::Value, -} - -impl Message { - /// Reads an RPC message from the given stream - pub async fn read(stream: &mut R) -> io::Result> { - let text = match read_message_string(stream).await? { - None => return Ok(None), - Some(text) => text, - }; - Ok(Some(serde_json::from_str(&text)?)) - } - - /// Writes the RPC message to the given stream - pub async fn write(self, stream: &mut R) -> io::Result<()> { - #[derive(Serialize)] - struct RpcMessage { - jsonrpc: &'static str, - #[serde(flatten)] - msg: Message, - } - let text = serde_json::to_string(&RpcMessage { - jsonrpc: "2.0", - msg: self, - })?; - write_message_string(stream, &text).await - } -} - -impl Response { - /// Constructs a `Response` object signaling the succesfull handling of a request with the - /// specified id. - pub fn new_ok(id: RequestId, result: R) -> Self { - Self { - id, - result: Some(serde_json::to_value(result).unwrap()), - error: None, - } - } - - /// Constructs a `Response` object signalling failure to handle the request with the specified - /// id - pub fn new_err(id: RequestId, code: i32, message: String) -> Self { - Self { - id, - result: None, - error: Some(ResponseError { - code, - message, - data: None, - }), - } - } -} - -impl Request { - /// Constructs a new Request object - pub fn new(id: RequestId, method: String, params: P) -> Self { - Self { - id, - method, - params: serde_json::to_value(params).unwrap(), - } - } - - /// Tries to extract the specific request parameters from this request. - pub fn try_extract(self, method: &str) -> Result<(RequestId, P), Request> { - if self.method == method { - let params = serde_json::from_value(self.params).unwrap_or_else(|err| { - panic!("Invalid request\nMethod: {}\nerror: {}", method, err) - }); - Ok((self.id, params)) - } else { - Err(self) - } - } - - pub(crate) fn is_shutdown(&self) -> bool { - self.method == "shutdown" - } - - pub(crate) fn is_initialize(&self) -> bool { - self.method == "initialize" - } -} - -impl Notification { - /// Constructs a new `Notification` from the specified method name and parameters - pub fn new(method: String, params: P) -> Self { - Self { - method, - params: serde_json::to_value(params).unwrap(), - } - } - - /// Tries to extract the specific notification parameters from this notification. - pub fn try_extract(self, method: &str) -> Result { - if self.method == method { - let params = serde_json::from_value(self.params).unwrap_or_else(|err| { - panic!("Invalid request\nMethod: {}\nerror: {}", method, err) - }); - Ok(params) - } else { - Err(self) - } - } - - pub(crate) fn is_exit(&self) -> bool { - self.method == "exit" - } - pub(crate) fn is_initialized(&self) -> bool { - self.method == "initialized" - } -} - -/// Reads an RPC message from the specified stream. -async fn read_message_string( - stream: &mut R, -) -> io::Result> { - /// Constructs an `InvalidData` error with a cause - fn invalid_data(error: impl Into>) -> io::Error { - io::Error::new(io::ErrorKind::InvalidData, error) - } - - // Loop over all headers of the incoming message. - let mut size = None; - let mut buf = String::new(); - loop { - buf.clear(); - if stream.read_line(&mut buf).await? == 0 { - return Ok(None); - } - if !buf.ends_with("\r\n") { - return Err(invalid_data(format!("malformed header: {:?}", buf))); - } - - // If there are no more headers, break to parse the rest of the message - let buf = &buf[..buf.len() - 2]; - if buf.is_empty() { - break; - } - - // If this is the `Content-Length` header, parse the size of the message - let mut parts = buf.splitn(2, ": "); - let header_name = parts.next().unwrap(); - let header_value = parts - .next() - .ok_or_else(|| invalid_data(format!("malformed header: {:?}", buf)))?; - if header_name == "Content-Length" { - size = Some(header_value.parse::().map_err(invalid_data)?); - } - } - - let size: usize = size.ok_or_else(|| invalid_data("no Content-Length".to_owned()))?; - let mut buf = buf.into_bytes(); - buf.resize(size, 0); - stream.read_exact(&mut buf).await?; - let buf = String::from_utf8(buf).map_err(invalid_data)?; - log::debug!("< {}", buf); - Ok(Some(buf)) -} - -/// Writes an RPC message to the specified stream. -async fn write_message_string(stream: &mut R, msg: &str) -> io::Result<()> { - log::debug!("> {}", msg); - let header = format!("Content-Length: {}\r\n\r\n", msg.len()); - stream.write_all(header.as_bytes()).await?; - stream.write_all(msg.as_bytes()).await?; - stream.flush().await?; - Ok(()) -} diff --git a/crates/mun_language_server/src/protocol/stdio.rs b/crates/mun_language_server/src/protocol/stdio.rs deleted file mode 100644 index 1ac030bef..000000000 --- a/crates/mun_language_server/src/protocol/stdio.rs +++ /dev/null @@ -1,36 +0,0 @@ -use super::Message; -use async_std::io::BufReader; -use futures::{channel::mpsc, SinkExt, StreamExt}; - -/// Constructs a communication channel over stdin (input) and stdout (output) -pub(crate) fn stdio_transport() -> (mpsc::Sender, mpsc::Receiver) { - let (writer_sender, mut writer_receiver) = mpsc::channel::(0); - let (mut reader_sender, reader_receiver) = mpsc::channel::(0); - - // Receive messages over the channel and forward them to stdout - async_std::task::spawn(async move { - let mut stdout = async_std::io::stdout(); - while let Some(msg) = writer_receiver.next().await { - msg.write(&mut stdout).await.unwrap(); - } - }); - - // Receive data over stdin and forward to the application - async_std::task::spawn(async move { - let mut stdin = BufReader::new(async_std::io::stdin()); - while let Some(msg) = Message::read(&mut stdin).await.unwrap() { - let is_exit = match &msg { - Message::Notification(n) => n.is_exit(), - _ => false, - }; - - reader_sender.send(msg).await.unwrap(); - - if is_exit { - break; - } - } - }); - - (writer_sender, reader_receiver) -} diff --git a/crates/mun_language_server/src/state.rs b/crates/mun_language_server/src/state.rs new file mode 100644 index 000000000..1f062f6c1 --- /dev/null +++ b/crates/mun_language_server/src/state.rs @@ -0,0 +1,385 @@ +use crate::{ + analysis::{Analysis, AnalysisSnapshot}, + change::AnalysisChange, + config::Config, + conversion::{convert_range, url_from_path_with_drive_lowercasing}, + state::utils::Progress, + to_json, +}; +use crossbeam_channel::{select, unbounded, Receiver, Sender}; +use lsp_server::ReqQueue; +use lsp_types::{ + notification::Notification, notification::PublishDiagnostics, PublishDiagnosticsParams, Url, +}; +use parking_lot::RwLock; +use paths::AbsPathBuf; +use rustc_hash::FxHashSet; +use std::{ops::Deref, sync::Arc, time::Instant}; +use vfs::VirtualFileSystem; + +mod protocol; +mod utils; +mod workspace; + +/// A `Task` is something that is send from async tasks to the entry point for processing. This +/// enables synchronizing resources like the connection with the client. +#[derive(Debug)] +pub(crate) enum Task { + Notify(lsp_server::Notification), +} + +#[derive(Debug)] +pub(crate) enum Event { + Vfs(vfs::MonitorMessage), + Task(Task), + Lsp(lsp_server::Message), +} + +pub(crate) type RequestHandler = fn(&mut LanguageServerState, lsp_server::Response); + +/// State for the language server +pub(crate) struct LanguageServerState { + /// Channel to send language server messages to the client + pub(crate) sender: Sender, + + /// The request queue keeps track of all incoming and outgoing requests. + pub(crate) request_queue: lsp_server::ReqQueue<(String, Instant), RequestHandler>, + + /// The configuration passed by the client + pub config: Config, + + /// Thread pool for async execution + pub thread_pool: threadpool::ThreadPool, + + /// Channel to send tasks to from background operations + pub task_sender: Sender, + + /// Channel to receive tasks on from background operations + pub task_receiver: Receiver, + + /// The virtual filesystem that holds all the file contents + pub vfs: Arc>, + + /// The vfs monitor + pub vfs_monitor: Box, + + /// The receiver of vfs monitor messages + pub vfs_monitor_receiver: Receiver, + + /// Documents that are currently kept in memory from the client + pub open_docs: FxHashSet, + + /// Holds the state of the analysis process + pub analysis: Analysis, + + /// All the packages known to the server + pub packages: Arc>, + + /// True if the client requested that we shut down + pub shutdown_requested: bool, +} + +/// A snapshot of the state of the language server +pub(crate) struct LanguageServerSnapshot { + /// The virtual filesystem that holds all the file contents + pub vfs: Arc>, + + /// Holds the state of the analysis process + pub analysis: AnalysisSnapshot, + + /// All the packages known to the server + pub packages: Arc>, +} + +impl LanguageServerState { + pub fn new(sender: Sender, config: Config) -> Self { + // Construct the virtual filesystem monitor + let (vfs_monitor_sender, vfs_monitor_receiver) = unbounded::(); + let vfs_monitor: vfs::NotifyMonitor = vfs::Monitor::new(Box::new(move |msg| { + vfs_monitor_sender + .send(msg) + .expect("error sending vfs monitor message to foreground") + })); + let vfs_monitor = Box::new(vfs_monitor) as Box; + + // Construct a task channel + let (task_sender, task_receiver) = unbounded(); + + // Construct the state that will hold all the analysis and apply the initial state + let mut analysis = Analysis::new(); + let mut change = AnalysisChange::new(); + change.set_packages(Default::default()); + change.set_roots(Default::default()); + analysis.apply_change(change); + + LanguageServerState { + sender, + request_queue: ReqQueue::default(), + config, + vfs: Arc::new(RwLock::new(Default::default())), + vfs_monitor, + vfs_monitor_receiver, + open_docs: FxHashSet::default(), + thread_pool: threadpool::ThreadPool::default(), + task_sender, + task_receiver, + analysis, + packages: Arc::new(Vec::new()), + shutdown_requested: false, + } + } + + /// Blocks until a new event is received from one of the many channels the language server + /// listens to. Returns the first event that is received. + fn next_event(&self, receiver: &Receiver) -> Option { + select! { + recv(receiver) -> msg => msg.ok().map(Event::Lsp), + recv(self.vfs_monitor_receiver) -> task => Some(Event::Vfs(task.unwrap())), + recv(self.task_receiver) -> task => Some(Event::Task(task.unwrap())) + } + } + + /// Runs the language server to completion + pub fn run(mut self, receiver: Receiver) -> anyhow::Result<()> { + // Start by updating the current workspace + self.fetch_workspaces(); + + while let Some(event) = self.next_event(&receiver) { + if let Event::Lsp(lsp_server::Message::Notification(notification)) = &event { + if notification.method == lsp_types::notification::Exit::METHOD { + return Ok(()); + } + } + self.handle_event(event)?; + } + + Ok(()) + } + + /// Handles an event from one of the many sources that the language server subscribes to. + fn handle_event(&mut self, event: Event) -> anyhow::Result<()> { + let start_time = Instant::now(); + log::info!("handling event: {:?}", event); + + // Process the incoming event + match event { + Event::Task(task) => self.handle_task(task)?, + Event::Lsp(msg) => match msg { + lsp_server::Message::Request(req) => self.on_request(req, start_time)?, + lsp_server::Message::Response(resp) => self.complete_request(resp), + lsp_server::Message::Notification(not) => self.on_notification(not)?, + }, + Event::Vfs(task) => self.handle_vfs_task(task)?, + }; + + // Process any changes to the vfs + let state_changed = self.process_vfs_changes(); + if state_changed { + let snapshot = self.snapshot(); + let task_sender = self.task_sender.clone(); + // Spawn the diagnostics in the threadpool + self.thread_pool.execute(move || { + let _result = handle_diagnostics(snapshot, task_sender); + }); + } + + Ok(()) + } + + /// Handles a task sent by another async task + fn handle_task(&mut self, task: Task) -> anyhow::Result<()> { + match task { + Task::Notify(notification) => { + self.send(notification.into()); + } + } + Ok(()) + } + + /// Handles a change to the underlying virtual file system. + fn handle_vfs_task(&mut self, mut task: vfs::MonitorMessage) -> anyhow::Result<()> { + loop { + match task { + vfs::MonitorMessage::Progress { total, done } => { + let progress_state = if done == 0 { + Progress::Begin + } else if done < total { + Progress::Report + } else { + Progress::End + }; + self.report_progress( + "projects scanned", + progress_state, + Some(format!("{}/{}", done, total)), + Some(Progress::fraction(done, total)), + ) + } + vfs::MonitorMessage::Loaded { files } => { + let vfs = &mut *self.vfs.write(); + for (path, contents) in files { + vfs.set_file_contents(&path, contents); + } + } + } + + // Coalesce many VFS events into a single loop turn + task = match self.vfs_monitor_receiver.try_recv() { + Ok(task) => task, + _ => break, + } + } + Ok(()) + } +} + +/// Sends all diagnostics of all files +fn handle_diagnostics(state: LanguageServerSnapshot, sender: Sender) -> anyhow::Result<()> { + // Iterate over all files + for (idx, _package) in state.packages.iter().enumerate() { + let package_id = hir::PackageId(idx as u32); + + // Get all the files + let files = state.analysis.package_source_files(package_id)?; + + // Publish all diagnostics + for file in files { + let line_index = state.analysis.file_line_index(file)?; + let uri = state.file_id_to_uri(file).unwrap(); + let diagnostics = state.analysis.diagnostics(file)?; + + let diagnostics = { + let mut lsp_diagnostics = Vec::with_capacity(diagnostics.len()); + for d in diagnostics { + lsp_diagnostics.push(lsp_types::Diagnostic { + range: convert_range(d.range, &line_index), + severity: Some(lsp_types::DiagnosticSeverity::Error), + code: None, + code_description: None, + source: Some("mun".to_string()), + message: d.message, + related_information: { + let mut annotations = + Vec::with_capacity(d.additional_annotations.len()); + for annotation in d.additional_annotations { + annotations.push(lsp_types::DiagnosticRelatedInformation { + location: lsp_types::Location { + uri: state + .file_id_to_uri(annotation.range.file_id) + .unwrap(), + range: convert_range( + annotation.range.value, + state + .analysis + .file_line_index(annotation.range.file_id)? + .deref(), + ), + }, + message: annotation.message, + }); + } + if annotations.is_empty() { + None + } else { + Some(annotations) + } + }, + tags: None, + data: None, + }); + } + lsp_diagnostics + }; + + sender + .send(Task::Notify(lsp_server::Notification { + method: PublishDiagnostics::METHOD.to_owned(), + params: to_json(PublishDiagnosticsParams { + uri, + diagnostics, + version: None, + }) + .unwrap(), + })) + .unwrap(); + } + } + Ok(()) +} + +impl LanguageServerState { + /// Creates a snapshot of the state + pub fn snapshot(&self) -> LanguageServerSnapshot { + LanguageServerSnapshot { + vfs: self.vfs.clone(), + analysis: self.analysis.snapshot(), + packages: self.packages.clone(), + } + } + + /// Processes any and all changes that have been applied to the virtual filesystem. Generates + /// an `AnalysisChange` and applies it if there are changes. True is returned if things changed, + /// otherwise false. + pub fn process_vfs_changes(&mut self) -> bool { + // Get all the changes since the last time we processed + let changed_files = { + let mut vfs = self.vfs.write(); + vfs.take_changes() + }; + if changed_files.is_empty() { + return false; + } + + // Construct an AnalysisChange to apply to the analysis + let vfs = self.vfs.read(); + let mut analysis_change = AnalysisChange::new(); + let mut has_created_or_deleted_entries = false; + for file in changed_files { + // If the file was deleted or created we have to remember that so that we update the + // source roots as well. + if file.is_created_or_deleted() { + has_created_or_deleted_entries = true; + } + + // Convert the contents of the file to a string + let bytes = vfs + .file_contents(file.file_id) + .map(Vec::from) + .unwrap_or_default(); + let text = match String::from_utf8(bytes).ok() { + Some(text) => Some(Arc::from(text)), + None => None, + }; + + // Notify the database about this change + analysis_change.change_file(hir::FileId(file.file_id.0), text); + } + + // If an entry was created or deleted we have to recreate all source roots + if has_created_or_deleted_entries { + analysis_change.set_roots(self.recompute_source_roots()); + } + + // Apply the change + self.analysis.apply_change(analysis_change); + true + } +} + +impl LanguageServerSnapshot { + /// Converts the specified `hir::FileId` to a `Url` + pub fn file_id_to_uri(&self, id: hir::FileId) -> anyhow::Result { + let vfs = self.vfs.read(); + let path = vfs.file_path(vfs::FileId(id.0)); + let url = url_from_path_with_drive_lowercasing(path)?; + + Ok(url) + } +} + +impl Drop for LanguageServerState { + fn drop(&mut self) { + self.analysis.request_cancelation(); + self.thread_pool.join(); + } +} diff --git a/crates/mun_language_server/src/state/protocol.rs b/crates/mun_language_server/src/state/protocol.rs new file mode 100644 index 000000000..7192439eb --- /dev/null +++ b/crates/mun_language_server/src/state/protocol.rs @@ -0,0 +1,165 @@ +use super::LanguageServerState; +use crate::{conversion::convert_uri, state::RequestHandler}; +use anyhow::Result; +use dispatcher::{NotificationDispatcher, RequestDispatcher}; +use lsp_types::notification::{ + DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument, +}; +use std::time::Instant; + +pub mod dispatcher; + +impl LanguageServerState { + /// Called when a `DidOpenTextDocument` notification was received. + fn on_did_open_text_document( + &mut self, + params: lsp_types::DidOpenTextDocumentParams, + ) -> anyhow::Result<()> { + if let Ok(path) = convert_uri(¶ms.text_document.uri) { + self.open_docs.insert(path.clone()); + self.vfs + .write() + .set_file_contents(&path, Some(params.text_document.text.into_bytes())); + } + Ok(()) + } + + /// Called when a `DidChangeTextDocument` notification was received. + fn on_did_change_text_document( + &mut self, + params: lsp_types::DidChangeTextDocumentParams, + ) -> anyhow::Result<()> { + let lsp_types::DidChangeTextDocumentParams { + text_document, + content_changes, + } = params; + if let Ok(path) = convert_uri(&text_document.uri) { + let new_content = content_changes.get(0).unwrap().text.clone(); + self.vfs + .write() + .set_file_contents(&path, Some(new_content.into_bytes())); + } + Ok(()) + } + + /// Called when a `DidCloseTextDocument` notification was received. + fn on_did_close_text_document( + &mut self, + params: lsp_types::DidCloseTextDocumentParams, + ) -> anyhow::Result<()> { + if let Ok(path) = convert_uri(¶ms.text_document.uri) { + self.open_docs.remove(&path); + self.vfs_monitor.reload(&path); + } + Ok(()) + } + + /// Called when a `DidChangeWatchedFiles` was received + fn on_did_change_watched_files( + &mut self, + params: lsp_types::DidChangeWatchedFilesParams, + ) -> anyhow::Result<()> { + for change in params.changes { + if let Ok(path) = convert_uri(&change.uri) { + self.vfs_monitor.reload(&path); + } + } + Ok(()) + } + + /// Handles a language server protocol request + pub(super) fn on_request( + &mut self, + request: lsp_server::Request, + request_received: Instant, + ) -> Result<()> { + self.register_request(&request, request_received); + + // If a shutdown was requested earlier, immediately respond with an error + if self.shutdown_requested { + self.respond(lsp_server::Response::new_err( + request.id, + lsp_server::ErrorCode::InvalidRequest as i32, + "shutdown was requested".to_owned(), + )); + return Ok(()); + } + + // Dispatch the event based on the type of event + RequestDispatcher::new(self, request) + .on::(|state, _request| { + state.shutdown_requested = true; + Ok(()) + })? + .finish(); + + Ok(()) + } + + /// Handles a notification from the language server client + pub(super) fn on_notification(&mut self, notification: lsp_server::Notification) -> Result<()> { + NotificationDispatcher::new(self, notification) + .on::(LanguageServerState::on_did_open_text_document)? + .on::(LanguageServerState::on_did_change_text_document)? + .on::(LanguageServerState::on_did_close_text_document)? + .on::(LanguageServerState::on_did_change_watched_files)? + .finish(); + Ok(()) + } + + /// Registers a request with the server. We register all these request to make sure they all get + /// handled and so we can measure the time it takes for them to complete from the point of view + /// of the client. + fn register_request(&mut self, request: &lsp_server::Request, request_received: Instant) { + self.request_queue.incoming.register( + request.id.clone(), + (request.method.clone(), request_received), + ) + } + + /// Sends a request to the client and registers the request so that we can handle the response. + pub(crate) fn send_request( + &mut self, + params: R::Params, + handler: RequestHandler, + ) { + let request = self + .request_queue + .outgoing + .register(R::METHOD.to_string(), params, handler); + self.send(request.into()); + } + + /// Sends a notification to the client + pub(crate) fn send_notification( + &mut self, + params: N::Params, + ) { + let not = lsp_server::Notification::new(N::METHOD.to_string(), params); + self.send(not.into()); + } + + /// Handles a response to a request we made. The response gets forwarded to where we made the + /// request from. + pub(super) fn complete_request(&mut self, response: lsp_server::Response) { + let handler = self.request_queue.outgoing.complete(response.id.clone()); + handler(self, response) + } + + /// Sends a response to the client. This method logs the time it took us to reply + /// to a request from the client. + fn respond(&mut self, response: lsp_server::Response) { + if let Some((_method, start)) = self.request_queue.incoming.complete(response.id.clone()) { + let duration = start.elapsed(); + log::info!("handled req#{} in {:?}", response.id, duration); + self.send(response.into()); + } + } + + /// Sends a message to the client + pub(crate) fn send(&mut self, message: lsp_server::Message) { + self.sender + .send(message) + .expect("error sending lsp message to the outgoing channel") + } +} diff --git a/crates/mun_language_server/src/state/protocol/dispatcher.rs b/crates/mun_language_server/src/state/protocol/dispatcher.rs new file mode 100644 index 000000000..f2b53d2ff --- /dev/null +++ b/crates/mun_language_server/src/state/protocol/dispatcher.rs @@ -0,0 +1,164 @@ +use super::LanguageServerState; +use crate::cancelation::is_canceled; +use crate::from_json; +use anyhow::Result; +use serde::de::DeserializeOwned; +use serde::Serialize; + +/// A helper struct to ergonomically dispatch LSP requests to functions. +pub(crate) struct RequestDispatcher<'a> { + state: &'a mut LanguageServerState, + request: Option, +} + +impl<'a> RequestDispatcher<'a> { + /// Constructs a new dispatcher for the specified request + pub fn new(state: &'a mut LanguageServerState, request: lsp_server::Request) -> Self { + RequestDispatcher { + state, + request: Some(request), + } + } + + /// Try to dispatch the event as the given Request type. + pub fn on( + &mut self, + f: fn(&mut LanguageServerState, R::Params) -> Result, + ) -> Result<&mut Self> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + R::Result: Serialize + 'static, + { + let (id, params) = match self.parse::() { + Some(it) => it, + None => return Ok(self), + }; + + let result = f(self.state, params); + let response = result_to_response::(id, result); + self.state.respond(response); + Ok(self) + } + + /// Tries to parse the request as the specified type. If the request is of the specified type, + /// the request is transferred and any subsequent call to this method will return None. If an + /// error is encountered during parsing of the request parameters an error is send to the + /// client. + fn parse(&mut self) -> Option<(lsp_server::RequestId, R::Params)> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + { + let req = match &self.request { + Some(req) if req.method == R::METHOD => self.request.take().unwrap(), + _ => return None, + }; + + match from_json(R::METHOD, req.params) { + Ok(params) => Some((req.id, params)), + Err(err) => { + let response = lsp_server::Response::new_err( + req.id, + lsp_server::ErrorCode::InvalidParams as i32, + err.to_string(), + ); + self.state.respond(response); + None + } + } + } + + /// Wraps-up the dispatcher. If the request was not handled, report back that this is an + /// unknown request. + pub fn finish(&mut self) { + if let Some(req) = self.request.take() { + log::error!("unknown request: {:?}", req); + let response = lsp_server::Response::new_err( + req.id, + lsp_server::ErrorCode::MethodNotFound as i32, + "unknown request".to_string(), + ); + self.state.respond(response); + } + } +} + +pub(crate) struct NotificationDispatcher<'a> { + state: &'a mut LanguageServerState, + notification: Option, +} + +impl<'a> NotificationDispatcher<'a> { + /// Constructs a new dispatcher for the specified request + pub fn new(state: &'a mut LanguageServerState, notification: lsp_server::Notification) -> Self { + NotificationDispatcher { + state, + notification: Some(notification), + } + } + + /// Try to dispatch the event as the given Notification type. + pub fn on( + &mut self, + f: fn(&mut LanguageServerState, N::Params) -> Result<()>, + ) -> Result<&mut Self> + where + N: lsp_types::notification::Notification + 'static, + N::Params: DeserializeOwned + Send + 'static, + { + let notification = match self.notification.take() { + Some(it) => it, + None => return Ok(self), + }; + let params = match notification.extract::(N::METHOD) { + Ok(it) => it, + Err(notification) => { + self.notification = Some(notification); + return Ok(self); + } + }; + f(self.state, params)?; + Ok(self) + } + + /// Wraps-up the dispatcher. If the notification was not handled, log an error. + pub fn finish(&mut self) { + if let Some(notification) = &self.notification { + if !notification.method.starts_with("$/") { + log::error!("unhandled notification: {:?}", notification); + } + } + } +} + +/// Converts the specified results of an LSP request into an LSP response handling any errors that +/// may have occurred. +fn result_to_response( + id: lsp_server::RequestId, + result: Result, +) -> lsp_server::Response +where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + R::Result: Serialize + 'static, +{ + match result { + Ok(resp) => lsp_server::Response::new_ok(id, &resp), + Err(e) => { + if is_canceled(&*e) { + lsp_server::Response::new_err( + id, + lsp_server::ErrorCode::ContentModified as i32, + "content modified".to_string(), + ) + } else { + lsp_server::Response::new_err( + id, + lsp_server::ErrorCode::InternalError as i32, + e.to_string(), + ) + } + } + } +} diff --git a/crates/mun_language_server/src/state/utils.rs b/crates/mun_language_server/src/state/utils.rs new file mode 100644 index 000000000..449721b39 --- /dev/null +++ b/crates/mun_language_server/src/state/utils.rs @@ -0,0 +1,74 @@ +use super::LanguageServerState; + +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum Progress { + Begin, + Report, + End, +} + +impl Progress { + /// Builds a fractional progress value + pub(crate) fn fraction(done: usize, total: usize) -> f64 { + assert!(done <= total); + done as f64 / total.max(1) as f64 + } +} + +impl LanguageServerState { + /// Sends a notification to the client to display the specified message to the user. + pub(crate) fn show_message(&mut self, typ: lsp_types::MessageType, message: impl AsRef) { + let message = message.as_ref().to_owned(); + self.send_notification::( + lsp_types::ShowMessageParams { typ, message }, + ) + } + + /// Reports progress to the user via the `WorkDoneProgress` protocol. + pub(crate) fn report_progress( + &mut self, + title: &str, + state: Progress, + message: Option, + fraction: Option, + ) { + // TODO: Ensure that the client supports WorkDoneProgress + + let percentage = fraction.map(|f| { + (0.0..=1.0).contains(&f); + (f * 100.0) as u32 + }); + let token = lsp_types::ProgressToken::String(format!("mun/{}", title)); + let work_done_progress = match state { + Progress::Begin => { + self.send_request::( + lsp_types::WorkDoneProgressCreateParams { + token: token.clone(), + }, + |_, _| (), + ); + + lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin { + title: title.into(), + cancellable: None, + message, + percentage, + }) + } + Progress::Report => { + lsp_types::WorkDoneProgress::Report(lsp_types::WorkDoneProgressReport { + cancellable: None, + message, + percentage, + }) + } + Progress::End => { + lsp_types::WorkDoneProgress::End(lsp_types::WorkDoneProgressEnd { message }) + } + }; + self.send_notification::(lsp_types::ProgressParams { + token, + value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress), + }); + } +} diff --git a/crates/mun_language_server/src/workspace.rs b/crates/mun_language_server/src/state/workspace.rs similarity index 92% rename from crates/mun_language_server/src/workspace.rs rename to crates/mun_language_server/src/state/workspace.rs index 454b4af42..9aa264552 100644 --- a/crates/mun_language_server/src/workspace.rs +++ b/crates/mun_language_server/src/state/workspace.rs @@ -1,4 +1,5 @@ -use crate::{change::AnalysisChange, config::FilesWatcher, main_loop::LanguageServerState}; +use super::LanguageServerState; +use crate::{change::AnalysisChange, config::FilesWatcher}; use paths::{AbsPathBuf, RelativePath}; use std::{ convert::{TryFrom, TryInto}, @@ -12,13 +13,16 @@ impl LanguageServerState { let packages = self .config .discovered_projects - .as_ref() + .clone() .into_iter() .flatten() .filter_map(|project| match project::Package::from_file(&project.path) { Ok(package) => Some(package), - Err(_) => { - // TODO: Show error + Err(err) => { + self.show_message( + lsp_types::MessageType::Error, + format!("mun failed to load package: {:#}", err), + ); None } }) @@ -51,6 +55,7 @@ impl LanguageServerState { lsp_types::RegistrationParams { registrations: vec![registration], }, + |_, _| {}, ); } @@ -114,7 +119,7 @@ impl LanguageServerState { // Iterate over all files and find to which source directory they belong, including their // relative path - let vfs = &*async_std::task::block_on(self.vfs.read()); + let vfs = &*self.vfs.read(); for (file_id, path) in vfs.iter() { if let Some((idx, relative_path)) = source_dirs diff --git a/crates/mun_language_server/tests/initialization.rs b/crates/mun_language_server/tests/initialization.rs index eccb5e64c..722a775d2 100644 --- a/crates/mun_language_server/tests/initialization.rs +++ b/crates/mun_language_server/tests/initialization.rs @@ -1,6 +1,22 @@ mod support; +use support::Project; + #[test] fn test_server() { - let _server = support::Server::new(); + let _server = Project::with_fixture( + r#" +//- /mun.toml +[package] +name = "foo" +version = "0.0.0" + +//- /src/mod.mun +fn add(a: i32, b: i32) -> i32 { + a + b +} +"#, + ) + .server() + .wait_until_workspace_is_loaded(); } diff --git a/crates/mun_language_server/tests/support.rs b/crates/mun_language_server/tests/support.rs index ca716aa3c..94b8faa58 100644 --- a/crates/mun_language_server/tests/support.rs +++ b/crates/mun_language_server/tests/support.rs @@ -1,96 +1,177 @@ -use async_std::future::timeout; -use futures::{SinkExt, StreamExt}; -use lsp_types::{notification::Exit, request::Shutdown}; -use mun_language_server::protocol::{Connection, Message, Notification, Request}; -use mun_language_server::{main_loop, Config}; +use crossbeam_channel::{after, select}; +use lsp_server::{Connection, Message, Notification, Request}; +use lsp_types::{ + notification::Exit, request::Shutdown, ProgressParams, ProgressParamsValue, WorkDoneProgress, +}; +use mun_language_server::{main_loop, Config, FilesWatcher}; +use mun_test::Fixture; use paths::AbsPathBuf; +use project::ProjectManifest; use serde::Serialize; use serde_json::Value; -use std::convert::TryFrom; -use std::time::Duration; +use std::{ + cell::{Cell, RefCell}, + convert::TryInto, + fs, + time::Duration, +}; + +/// A `Project` represents a project that a language server can work with. Call the [`server`] +/// method to instantiate a language server that will serve information about the project. +pub struct Project<'a> { + fixture: &'a str, + tmp_dir: Option, +} + +impl<'a> Project<'a> { + /// Constructs a project from a fixture. + pub fn with_fixture(fixture: &str) -> Project { + Project { + fixture, + tmp_dir: None, + } + } + + /// Instantiates a language server for this project. + pub fn server(self) -> Server { + // Get or create a temporary directory + let tmp_dir = self + .tmp_dir + .unwrap_or_else(|| tempdir::TempDir::new("testdir").unwrap()); + + // Write all fixtures to a folder + for entry in Fixture::parse(self.fixture) { + let path = entry.relative_path.to_path(tmp_dir.path()); + fs::create_dir_all(path.parent().unwrap()).unwrap(); + fs::write(path.as_path(), entry.text.as_bytes()).unwrap(); + } + + let tmp_dir_path: AbsPathBuf = tmp_dir + .path() + .to_path_buf() + .try_into() + .expect("could not convert temp dir to absolute path"); + let roots = vec![tmp_dir_path.clone()]; + + let discovered_projects = ProjectManifest::discover_all(roots.into_iter()); + + // Construct a default configuration for the server + let config = Config { + discovered_projects: Some(discovered_projects), + watcher: FilesWatcher::Client, + ..Config::new(tmp_dir_path) + }; + + // TODO: Provide the ability to modify the configuration externally + + Server::new(tmp_dir, config) + } +} /// An object that runs the language server main loop and enables sending and receiving messages /// to and from it. pub struct Server { - next_request_id: u64, + next_request_id: Cell, + messages: RefCell>, worker: Option>, client: Connection, - _temp_path: tempdir::TempDir, + _tmp_dir: tempdir::TempDir, } impl Server { /// Constructs and initializes a new `Server` - pub fn new() -> Self { + pub fn new(tmp_dir: tempdir::TempDir, config: Config) -> Self { let (connection, client) = Connection::memory(); - let temp_path = tempdir::TempDir::new("mun_language_server") - .expect("unable to create temporary directory"); - - let config = Config::new( - AbsPathBuf::try_from(temp_path.path().to_path_buf()) - .expect("temp_path is not an absolute path"), - ); let worker = std::thread::spawn(move || { - async_std::task::block_on(async move { - main_loop(connection, config).await.unwrap(); - }) + main_loop(connection, config).unwrap(); }); Self { - next_request_id: Default::default(), + next_request_id: Cell::new(1), + messages: RefCell::new(Vec::new()), worker: Some(worker), client, - _temp_path: temp_path, + _tmp_dir: tmp_dir, + } + } + + /// Waits until all projects in the workspace have been loaded + pub fn wait_until_workspace_is_loaded(self) -> Server { + self.wait_for_message_cond(1, &|msg: &Message| match msg { + Message::Notification(n) if n.method == "$/progress" => { + match n.clone().extract::("$/progress").unwrap() { + ProgressParams { + token: lsp_types::ProgressToken::String(ref token), + value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(_)), + } if token == "mun/projects scanned" => true, + _ => false, + } + } + _ => false, + }); + self + } + + /// A function to wait for a specific message to arrive + fn wait_for_message_cond(&self, n: usize, cond: &dyn Fn(&Message) -> bool) { + let mut total = 0; + for msg in self.messages.borrow().iter() { + if cond(msg) { + total += 1 + } + } + while total < n { + let msg = self.recv().expect("no response"); + if cond(&msg) { + total += 1; + } } } /// Sends a request to the main loop and expects the specified value to be returned - async fn assert_request( + fn assert_request( &mut self, params: R::Params, expected_response: Value, ) where R::Params: Serialize, { - let result = self.send_request::(params).await; + let result = self.send_request::(params); assert_eq!(result, expected_response); } /// Sends a request to main loop, returning the response - async fn send_request(&mut self, params: R::Params) -> Value + fn send_request(&self, params: R::Params) -> Value where R::Params: Serialize, { - let id = self.next_request_id; - self.next_request_id += 1; + let id = self.next_request_id.get(); + self.next_request_id.set(id.wrapping_add(1)); let r = Request::new(id.into(), R::METHOD.to_string(), params); - self.send_and_receive(r).await + self.send_and_receive(r) } /// Sends an LSP notification to the main loop. - async fn notification(&mut self, params: N::Params) + fn notification(&self, params: N::Params) where N::Params: Serialize, { let r = Notification::new(N::METHOD.to_string(), params); - self.send_notification(r).await + self.send_notification(r) } /// Sends a server notification to the main loop - async fn send_notification(&mut self, not: Notification) { - self.client - .sender - .send(Message::Notification(not)) - .await - .unwrap(); + fn send_notification(&self, not: Notification) { + self.client.sender.send(Message::Notification(not)).unwrap(); } /// Sends a request to the main loop and receives its response - async fn send_and_receive(&mut self, r: Request) -> Value { + fn send_and_receive(&self, r: Request) -> Value { let id = r.id.clone(); - self.client.sender.send(r.into()).await.unwrap(); - while let Some(msg) = self.recv().await { + self.client.sender.send(r.into()).unwrap(); + while let Some(msg) = self.recv() { match msg { Message::Request(req) => panic!( "did not expect a request as a response to a request: {:?}", @@ -113,26 +194,28 @@ impl Server { } /// Receives a message from the message or timeout. - async fn recv(&mut self) -> Option { - let duration = Duration::from_secs(60); - timeout(duration, self.client.receiver.next()) - .await - .unwrap() + fn recv(&self) -> Option { + let timeout = Duration::from_secs(120); + let msg = select! { + recv(self.client.receiver) -> msg => msg.ok(), + recv(after(timeout)) -> _ => panic!("timed out"), + }; + if let Some(ref msg) = msg { + self.messages.borrow_mut().push(msg.clone()); + } + msg } } impl Drop for Server { fn drop(&mut self) { - // Send a shutdown request - async_std::task::block_on(async { - // Send the proper shutdown sequence to ensure the main loop terminates properly - self.assert_request::((), Value::Null).await; - self.notification::(()).await; - - // Cancel the main_loop - if let Some(worker) = self.worker.take() { - worker.join().unwrap(); - } - }); + // Send the proper shutdown sequence to ensure the main loop terminates properly + self.assert_request::((), Value::Null); + self.notification::(()); + + // Cancel the main_loop + if let Some(worker) = self.worker.take() { + worker.join().unwrap(); + } } } diff --git a/crates/mun_project/Cargo.toml b/crates/mun_project/Cargo.toml index ea030f650..ba3aa35eb 100644 --- a/crates/mun_project/Cargo.toml +++ b/crates/mun_project/Cargo.toml @@ -5,8 +5,10 @@ authors = ["The Mun Team "] edition = "2018" [dependencies] +rustc-hash = "1.1.0" serde = "1.0" serde_derive = "1.0" toml = "0.5" semver = { version = "0.10", features = ["serde"] } anyhow = "1.0" +paths = { path="../mun_paths", package="mun_paths" } diff --git a/crates/mun_project/src/lib.rs b/crates/mun_project/src/lib.rs index a32333474..8b92dc667 100644 --- a/crates/mun_project/src/lib.rs +++ b/crates/mun_project/src/lib.rs @@ -1,7 +1,9 @@ -mod manifest; -mod package; - pub use manifest::{Manifest, ManifestMetadata, PackageId}; pub use package::Package; +pub use project_manifest::ProjectManifest; + +mod manifest; +mod package; +mod project_manifest; pub const MANIFEST_FILENAME: &str = "mun.toml"; diff --git a/crates/mun_language_server/src/project_manifest.rs b/crates/mun_project/src/project_manifest.rs similarity index 88% rename from crates/mun_language_server/src/project_manifest.rs rename to crates/mun_project/src/project_manifest.rs index 316574e22..097e3a9ea 100644 --- a/crates/mun_language_server/src/project_manifest.rs +++ b/crates/mun_project/src/project_manifest.rs @@ -1,9 +1,8 @@ +use crate::MANIFEST_FILENAME; use anyhow::bail; use paths::{AbsPath, AbsPathBuf}; use rustc_hash::FxHashSet; -use std::convert::TryFrom; -use std::fs::read_dir; -use std::io; +use std::{convert::TryFrom, fs::read_dir, io}; /// A wrapper around a path to a mun project #[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] @@ -15,14 +14,14 @@ impl ProjectManifest { /// Constructs a new [`ProjectManifest`] from a path pub fn from_manifest_path(path: impl AsRef) -> anyhow::Result { let path = path.as_ref(); - if path.ends_with(project::MANIFEST_FILENAME) { + if path.ends_with(MANIFEST_FILENAME) { Ok(Self { path: path.to_path_buf(), }) } else { bail!( "project root must point to {}: {}", - project::MANIFEST_FILENAME, + MANIFEST_FILENAME, path.display() ); } @@ -37,7 +36,7 @@ impl ProjectManifest { path.is_file() && path .file_name() - .map(|file_name| file_name == project::MANIFEST_FILENAME) + .map(|file_name| file_name == MANIFEST_FILENAME) .unwrap_or(false) }) .map(|path| ProjectManifest { diff --git a/crates/mun_vfs/src/monitor.rs b/crates/mun_vfs/src/monitor.rs index 040c9f190..73eb09fc9 100644 --- a/crates/mun_vfs/src/monitor.rs +++ b/crates/mun_vfs/src/monitor.rs @@ -5,6 +5,7 @@ mod notify_monitor; pub use notify_monitor::NotifyMonitor; use crate::{AbsPath, AbsPathBuf}; +use std::fmt; /// Describes something to be monitored by a `Monitor`. #[derive(Debug, Clone)] @@ -46,7 +47,6 @@ pub struct MonitorConfig { } /// A message that might be communicated from a [`Monitor`] -#[derive(Debug)] pub enum MonitorMessage { /// A message that indicates the progress status of the monitor Progress { total: usize, done: usize }, @@ -153,6 +153,22 @@ impl MonitorEntry { } } +impl fmt::Debug for MonitorMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MonitorMessage::Loaded { files } => f + .debug_struct("Loaded") + .field("files", &files.len()) + .finish(), + MonitorMessage::Progress { total, done } => f + .debug_struct("Progress") + .field("total", total) + .field("done", done) + .finish(), + } + } +} + #[cfg(test)] mod tests { use super::{AbsPathBuf, Monitor, MonitorDirectories};