-
Notifications
You must be signed in to change notification settings - Fork 281
/
language_server.py
71 lines (55 loc) · 2.38 KB
/
language_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Copyright 2017 Palantir Technologies, Inc.
import logging
import socketserver
from .jsonrpc import JSONRPCServer
from .workspace import Workspace
log = logging.getLogger(__name__)
def start_tcp_lang_server(bind_addr, port, handler_class):
if not issubclass(handler_class, LanguageServer):
raise ValueError("Handler class must be a subclass of LanguageServer")
server = socketserver.ThreadingTCPServer((bind_addr, port), handler_class)
try:
log.info("Serving %s on (%s, %s)", handler_class.__name__, bind_addr, port)
server.serve_forever()
except KeyboardInterrupt:
server.shutdown()
finally:
log.info("Shutting down")
server.server_close()
def start_io_lang_server(rfile, wfile, handler_class):
if not issubclass(handler_class, LanguageServer):
raise ValueError("Handler class must be a subclass of LanguageServer")
log.info("Starting %s IO language server", handler_class.__name__)
server = handler_class(rfile, wfile)
server.handle()
class LanguageServer(JSONRPCServer):
""" Implementation of the Microsoft VSCode Language Server Protocol
https://github.com/Microsoft/language-server-protocol/blob/master/versions/protocol-1-x.md
"""
process_id = None
workspace = None
init_opts = None
M_PUBLISH_DIAGNOSTICS = 'textDocument/publishDiagnostics'
def capabilities(self):
return {}
def publish_diagnostics(self, uri, diagnostics):
log.debug("Publishing diagnostics: %s", diagnostics)
params = {'uri': uri, 'diagnostics': diagnostics}
self.call(self.M_PUBLISH_DIAGNOSTICS, params)
def m_initialize(self, **kwargs):
log.debug("Language server intialized with %s", kwargs)
self.process_id = kwargs.get('processId')
self.workspace = Workspace(kwargs.get('rootPath'))
self.init_opts = kwargs.get('initializationOptions')
# Get our capabilities
return {'capabilities': self.capabilities()}
def m___cancel_request(self, **kwargs):
# TODO: We could I suppose launch tasks in their own threads and kill
# them on cancel, but is it really worth the effort given most methods
# are reasonably quick?
# This tends to happen when cancelling a hover request
pass
def m_shutdown(self, **kwargs):
self.shutdown()
def m_exit(self, **kwargs):
self.shutdown()