Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
175 additions
and
144 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,121 +1,73 @@ | ||
import sublime | ||
|
||
from .logging import debug | ||
from .url import uri_to_filename | ||
from .protocol import Diagnostic | ||
from .events import global_events | ||
from .views import range_to_region | ||
from .windows import WindowLike, ViewLike | ||
|
||
assert Diagnostic | ||
|
||
try: | ||
import sublime | ||
from typing import Any, List, Dict, Tuple, Callable, Optional | ||
assert sublime | ||
assert Any and List and Dict and Tuple and Callable and Optional | ||
assert ViewLike and WindowLike | ||
except ImportError: | ||
pass | ||
|
||
|
||
global_diagnostics = dict( | ||
) # type: Dict[int, Dict[str, Dict[str, List[Diagnostic]]]] | ||
|
||
|
||
def update_file_diagnostics(window: sublime.Window, file_path: str, source: str, | ||
diagnostics: 'List[Diagnostic]') -> bool: | ||
updated = False | ||
if diagnostics: | ||
file_diagnostics = global_diagnostics.setdefault(window.id(), dict()).setdefault( | ||
file_path, dict()) | ||
file_diagnostics[source] = diagnostics | ||
updated = True | ||
else: | ||
if window.id() in global_diagnostics: | ||
window_diagnostics = global_diagnostics[window.id()] | ||
if file_path in window_diagnostics: | ||
if source in window_diagnostics[file_path]: | ||
updated = True | ||
del window_diagnostics[file_path][source] | ||
if not window_diagnostics[file_path]: | ||
del window_diagnostics[file_path] | ||
return updated | ||
|
||
|
||
class DiagnosticsUpdate(object): | ||
def __init__(self, window: sublime.Window, client_name: str, | ||
def __init__(self, window, client_name: str, | ||
file_path: str, diagnostics: 'List[Diagnostic]') -> 'None': | ||
self.window = window | ||
self.client_name = client_name | ||
self.file_path = file_path | ||
self.diagnostics = diagnostics | ||
|
||
|
||
def handle_client_diagnostics(window: sublime.Window, client_name: str, update: dict): | ||
maybe_file_uri = update.get('uri') | ||
if maybe_file_uri is not None: | ||
file_path = uri_to_filename(maybe_file_uri) | ||
|
||
diagnostics = list( | ||
Diagnostic.from_lsp(item) for item in update.get('diagnostics', [])) | ||
|
||
if update_file_diagnostics(window, file_path, client_name, diagnostics): | ||
global_events.publish("document.diagnostics", | ||
DiagnosticsUpdate(window, client_name, file_path, diagnostics)) | ||
else: | ||
debug('missing uri in diagnostics update') | ||
# TODO: expose updates to features | ||
|
||
class WindowDiagnostics(object): | ||
|
||
def remove_diagnostics(view: sublime.View, client_name: str): | ||
"""Removes diagnostics for a file | ||
""" | ||
window = view.window() or sublime.active_window() | ||
def __init__(self): | ||
self._diagnostics = {} # type: Dict[str, Dict[str, List[Diagnostic]]] | ||
self._on_updated = None # type: Optional[Callable] | ||
|
||
file_path = view.file_name() | ||
if file_path: | ||
if update_file_diagnostics(window, file_path, client_name, []): | ||
global_events.publish("document.diagnostics", DiagnosticsUpdate(window, client_name, file_path, [])) | ||
def get(self) -> 'Dict[str, Dict[str, List[Diagnostic]]]': | ||
return self._diagnostics | ||
|
||
def set_on_updated(self, update_handler: 'Callable'): | ||
self._on_updated = update_handler | ||
|
||
class GlobalDiagnostics(object): | ||
def update(self, window: 'Any', client_name: str, update: dict): | ||
handle_client_diagnostics(window, client_name, update) | ||
def get_by_path(self, file_path: str) -> 'List[Diagnostic]': | ||
view_diagnostics = [] | ||
if file_path in self._diagnostics: | ||
for origin in self._diagnostics[file_path]: | ||
view_diagnostics.extend(self._diagnostics[file_path][origin]) | ||
return view_diagnostics | ||
|
||
def remove(self, view: 'Any', client_name: str): | ||
"""Removes diagnostics for a file if no views exist for it | ||
""" | ||
remove_diagnostics(view, client_name) | ||
|
||
|
||
def get_line_diagnostics(view, point): | ||
row, _ = view.rowcol(point) | ||
diagnostics = get_diagnostics_for_view(view) | ||
return tuple( | ||
diagnostic for diagnostic in diagnostics | ||
if diagnostic.range.start.row <= row <= diagnostic.range.end.row | ||
) | ||
|
||
|
||
def get_point_diagnostics(view, point): | ||
diagnostics = get_diagnostics_for_view(view) | ||
return tuple( | ||
diagnostic for diagnostic in diagnostics | ||
if range_to_region(diagnostic.range, view).contains(point) | ||
) | ||
|
||
|
||
def get_window_diagnostics(window: sublime.Window) -> 'Optional[Dict[str, Dict[str, List[Diagnostic]]]]': | ||
return global_diagnostics.get(window.id()) | ||
|
||
|
||
def get_diagnostics_for_view(view: sublime.View) -> 'List[Diagnostic]': | ||
view_diagnostics = [] | ||
window = view.window() | ||
file_path = view.file_name() | ||
if file_path and window: | ||
if window.id() in global_diagnostics: | ||
file_diagnostics = global_diagnostics[window.id()] | ||
if file_path in file_diagnostics: | ||
for origin in file_diagnostics[file_path]: | ||
view_diagnostics.extend(file_diagnostics[file_path][origin]) | ||
return view_diagnostics | ||
def update(self, file_path: str, client_name: str, diagnostics: 'List[Diagnostic]') -> bool: | ||
updated = False | ||
if diagnostics: | ||
file_diagnostics = self._diagnostics.setdefault(file_path, dict()) | ||
file_diagnostics[client_name] = diagnostics | ||
updated = True | ||
else: | ||
if file_path in self._diagnostics: | ||
if client_name in self._diagnostics[file_path]: | ||
updated = True | ||
del self._diagnostics[file_path][client_name] | ||
if not self._diagnostics[file_path]: | ||
del self._diagnostics[file_path] | ||
return updated | ||
|
||
def handle_client_diagnostics(self, client_name: str, update: dict): | ||
maybe_file_uri = update.get('uri') | ||
if maybe_file_uri is not None: | ||
file_path = uri_to_filename(maybe_file_uri) | ||
|
||
diagnostics = list( | ||
Diagnostic.from_lsp(item) for item in update.get('diagnostics', [])) | ||
|
||
if self.update(file_path, client_name, diagnostics): | ||
if self._on_updated: | ||
self._on_updated(file_path, client_name, diagnostics) | ||
else: | ||
debug('missing uri in diagnostics update') | ||
|
||
def remove(self, file_path: str, client_name: str): | ||
self.update(file_path, client_name, []) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import unittest | ||
from .diagnostics import WindowDiagnostics | ||
from .protocol import Diagnostic, Range, Point | ||
# from .configurations import WindowConfigManager, _merge_dicts, ConfigManager, is_supported_syntax | ||
# from .test_session import test_config, test_language | ||
from .test_protocol import LSP_MINIMAL_DIAGNOSTIC | ||
|
||
|
||
class WindowDiagnosticsTest(unittest.TestCase): | ||
|
||
def test_empty_diagnostics(self): | ||
wd = WindowDiagnostics() | ||
self.assertEqual(wd.get_by_path(__file__), []) | ||
|
||
# todo: remove | ||
|
||
def test_updated_diagnostics(self): | ||
wd = WindowDiagnostics() | ||
|
||
test_file_path = "test.py" | ||
diag = Diagnostic('message', Range(Point(0, 0), Point(1, 1)), 1, None, dict()) | ||
|
||
wd.update(test_file_path, "test_server", [diag]) | ||
view_diags = wd.get_by_path(test_file_path) | ||
self.assertEqual(len(view_diags), 1) | ||
self.assertEqual(view_diags[0], diag) | ||
|
||
wd.update(test_file_path, "test_server", []) | ||
view_diags = wd.get_by_path(test_file_path) | ||
self.assertEqual(len(view_diags), 0) | ||
|
||
def test_handle_diagnostics_update(self): | ||
wd = WindowDiagnostics() | ||
|
||
test_file_path = "/test.py" | ||
update = { | ||
'uri': 'file:///test.py', | ||
'diagnostics': [LSP_MINIMAL_DIAGNOSTIC] | ||
} | ||
|
||
wd.handle_client_diagnostics("test_server", update) | ||
|
||
view_diags = wd.get_by_path(test_file_path) | ||
self.assertEqual(len(view_diags), 1) | ||
|
||
def test_remove_diagnostics(self): | ||
wd = WindowDiagnostics() | ||
|
||
test_file_path = "test.py" | ||
diag = Diagnostic('message', Range(Point(0, 0), Point(1, 1)), 1, None, dict()) | ||
|
||
wd.update(test_file_path, "test_server", [diag]) | ||
view_diags = wd.get_by_path(test_file_path) | ||
self.assertEqual(len(view_diags), 1) | ||
|
||
wd.remove(test_file_path, "test_server") | ||
|
||
view_diags = wd.get_by_path(test_file_path) | ||
self.assertEqual(len(view_diags), 0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.