Skip to content

Commit

Permalink
Use asyncio for jsonrpc requests
Browse files Browse the repository at this point in the history
 - use jsonrpcclient/jsonrpcserver in place of jsonrpclib
 - all commands are async
 - start and stop the asyncio event loop from the main script.
 - the daemon runs in the main thread
  • Loading branch information
ecdsa committed Aug 18, 2019
1 parent 05a271c commit 865895d
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 359 deletions.
189 changes: 96 additions & 93 deletions electrum/commands.py

Large diffs are not rendered by default.

192 changes: 114 additions & 78 deletions electrum/daemon.py
Expand Up @@ -30,18 +30,16 @@
import sys
import threading
from typing import Dict, Optional, Tuple

import aiohttp
from aiohttp import web
from jsonrpcserver import async_dispatch as dispatch
from jsonrpcserver.methods import Methods
from base64 import b64decode

import jsonrpclib
import jsonrpcclient, jsonrpcserver
from jsonrpcclient.clients.aiohttp_client import AiohttpClient

from .jsonrpc import PasswordProtectedJSONRPCServer
from .version import ELECTRUM_VERSION
from .network import Network
from .util import (json_decode, DaemonThread, to_string,
create_and_start_event_loop, profiler, standardize_path)
from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
from .wallet import Wallet, Abstract_Wallet
from .storage import WalletStorage
from .commands import known_commands, Commands
Expand Down Expand Up @@ -84,29 +82,31 @@ def get_file_descriptor(config: SimpleConfig):
remove_lockfile(lockfile)


def request(config: SimpleConfig, endpoint, *args, **kwargs):

def request(config: SimpleConfig, endpoint, args=(), timeout=60):
lockfile = get_lockfile(config)
while True:
create_time = None
try:
with open(lockfile) as f:
(host, port), create_time = ast.literal_eval(f.read())
rpc_user, rpc_password = get_rpc_credentials(config)
if rpc_password == '':
# authentication disabled
server_url = 'http://%s:%d' % (host, port)
else:
server_url = 'http://%s:%s@%s:%d' % (
rpc_user, rpc_password, host, port)
except Exception:
raise DaemonNotRunning()
server = jsonrpclib.Server(server_url)
rpc_user, rpc_password = get_rpc_credentials(config)
server_url = 'http://%s:%d' % (host, port)
auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
loop = asyncio.get_event_loop()
async def request_coroutine():
async with aiohttp.ClientSession(auth=auth, loop=loop) as session:
server = AiohttpClient(session, server_url)
f = getattr(server, endpoint)
response = await f(*args)
return response.data.result
try:
# run request
f = getattr(server, endpoint)
return f(*args, **kwargs)
except ConnectionRefusedError:
_logger.info(f"failed to connect to JSON-RPC server")
fut = asyncio.run_coroutine_threadsafe(request_coroutine(), loop)
return fut.result(timeout=timeout)
except aiohttp.client_exceptions.ClientConnectorError as e:
_logger.info(f"failed to connect to JSON-RPC server {e}")
if not create_time or create_time < time.time() - 1.0:
raise DaemonNotRunning()
# Sleep a bit and try again; it might have just been started
Expand Down Expand Up @@ -141,14 +141,14 @@ def __init__(self, network):
self.lnwatcher = network.local_watchtower
self.app = web.Application()
self.app.router.add_post("/", self.handle)
self.methods = Methods()
self.methods = jsonrpcserver.methods.Methods()
self.methods.add(self.get_ctn)
self.methods.add(self.add_sweep_tx)

async def handle(self, request):
request = await request.text()
self.logger.info(f'{request}')
response = await dispatch(request, methods=self.methods)
response = await jsonrpcserver.async_dispatch(request, methods=self.methods)
if response.wanted:
return web.json_response(response.deserialized(), status=response.http_status)
else:
Expand All @@ -168,70 +168,98 @@ async def get_ctn(self, *args):
async def add_sweep_tx(self, *args):
return await self.lnwatcher.sweepstore.add_sweep_tx(*args)

class AuthenticationError(Exception):
pass

class Daemon(DaemonThread):
class Daemon(Logger):

@profiler
def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
DaemonThread.__init__(self)
Logger.__init__(self)
self.running = False
self.running_lock = threading.Lock()
self.config = config
if fd is None and listen_jsonrpc:
fd = get_file_descriptor(config)
if fd is None:
raise Exception('failed to lock daemon; already running?')
self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
self.asyncio_loop = asyncio.get_event_loop()
if config.get('offline'):
self.network = None
else:
self.network = Network(config)
self.network._loop_thread = self._loop_thread
self.fx = FxThread(config, self.network)
self.gui = None
self.gui_object = None
# path -> wallet; make sure path is standardized.
self.wallets = {} # type: Dict[str, Abstract_Wallet]
jobs = [self.fx.run]
# Setup JSONRPC server
self.server = None
if listen_jsonrpc:
self.init_server(config, fd)
jobs.append(self.start_jsonrpc(config, fd))
# server-side watchtower
self.watchtower = WatchTowerServer(self.network) if self.config.get('watchtower_host') else None
jobs = [self.fx.run]
if self.watchtower:
jobs.append(self.watchtower.run)
if self.network:
self.network.start(jobs)
self.start()

def init_server(self, config: SimpleConfig, fd):
host = config.get('rpchost', '127.0.0.1')
port = config.get('rpcport', 0)
rpc_user, rpc_password = get_rpc_credentials(config)
try:
server = PasswordProtectedJSONRPCServer(
(host, port), logRequests=False,
rpc_user=rpc_user, rpc_password=rpc_password)
except Exception as e:
self.logger.error(f'cannot initialize RPC server on host {host}: {repr(e)}')
self.server = None
os.close(fd)
def authenticate(self, headers):
if self.rpc_password == '':
# RPC authentication is disabled
return
os.write(fd, bytes(repr((server.socket.getsockname(), time.time())), 'utf8'))
os.close(fd)
self.server = server
server.timeout = 0.1
server.register_function(self.ping, 'ping')
server.register_function(self.run_gui, 'gui')
server.register_function(self.run_daemon, 'daemon')
auth_string = headers.get('Authorization', None)
if auth_string is None:
raise AuthenticationError('CredentialsMissing')
basic, _, encoded = auth_string.partition(' ')
if basic != 'Basic':
raise AuthenticationError('UnsupportedType')
encoded = to_bytes(encoded, 'utf8')
credentials = to_string(b64decode(encoded), 'utf8')
username, _, password = credentials.partition(':')
if not (constant_time_compare(username, self.rpc_user)
and constant_time_compare(password, self.rpc_password)):
time.sleep(0.050)
raise AuthenticationError('Invalid Credentials')

async def handle(self, request):
try:
self.authenticate(request.headers)
except AuthenticationError:
return web.Response(text='Forbidden', status='403')
request = await request.text()
self.logger.info(f'request: {request}')
response = await jsonrpcserver.async_dispatch(request, methods=self.methods)
if response.wanted:
return web.json_response(response.deserialized(), status=response.http_status)
else:
return web.Response()

async def start_jsonrpc(self, config: SimpleConfig, fd):
self.app = web.Application()
self.app.router.add_post("/", self.handle)
self.rpc_user, self.rpc_password = get_rpc_credentials(config)
self.methods = jsonrpcserver.methods.Methods()
self.methods.add(self.ping)
self.methods.add(self.gui)
self.methods.add(self.daemon)
self.cmd_runner = Commands(self.config, None, self.network)
for cmdname in known_commands:
server.register_function(getattr(self.cmd_runner, cmdname), cmdname)
server.register_function(self.run_cmdline, 'run_cmdline')
self.methods.add(getattr(self.cmd_runner, cmdname))
self.methods.add(self.run_cmdline)
self.host = config.get('rpchost', '127.0.0.1')
self.port = config.get('rpcport', 0)
self.runner = web.AppRunner(self.app)
await self.runner.setup()
site = web.TCPSite(self.runner, self.host, self.port)
await site.start()
socket = site._server.sockets[0]
os.write(fd, bytes(repr((socket.getsockname(), time.time())), 'utf8'))
os.close(fd)

def ping(self):
async def ping(self):
return True

def run_daemon(self, config_options):
asyncio.set_event_loop(self.asyncio_loop)
async def daemon(self, config_options):
config = SimpleConfig(config_options)
sub = config.get('subcommand')
assert sub in [None, 'start', 'stop', 'status', 'load_wallet', 'close_wallet']
Expand Down Expand Up @@ -279,13 +307,13 @@ def run_daemon(self, config_options):
response = "Daemon stopped"
return response

def run_gui(self, config_options):
async def gui(self, config_options):
config = SimpleConfig(config_options)
if self.gui:
if hasattr(self.gui, 'new_window'):
if self.gui_object:
if hasattr(self.gui_object, 'new_window'):
config.open_last_wallet()
path = config.get_wallet_path()
self.gui.new_window(path, config.get('url'))
self.gui_object.new_window(path, config.get('url'))
response = "ok"
else:
response = "error: current GUI does not support multiple windows"
Expand Down Expand Up @@ -339,8 +367,7 @@ def stop_wallet(self, path):
if not wallet: return
wallet.stop_threads()

def run_cmdline(self, config_options):
asyncio.set_event_loop(self.asyncio_loop)
async def run_cmdline(self, config_options):
password = config_options.get('password')
new_password = config_options.get('new_password')
config = SimpleConfig(config_options)
Expand Down Expand Up @@ -368,41 +395,50 @@ def run_cmdline(self, config_options):
cmd_runner = Commands(config, wallet, self.network)
func = getattr(cmd_runner, cmd.name)
try:
result = func(*args, **kwargs)
result = await func(*args, **kwargs)
except TypeError as e:
raise Exception("Wrapping TypeError to prevent JSONRPC-Pelix from hiding traceback") from e
return result

def run(self):
while self.is_running():
self.server.handle_request() if self.server else time.sleep(0.1)
def run_daemon(self):
self.running = True
try:
while self.is_running():
time.sleep(0.1)
except KeyboardInterrupt:
self.running = False
self.on_stop()

def is_running(self):
with self.running_lock:
return self.running

def stop(self):
with self.running_lock:
self.running = False

def on_stop(self):
if self.gui_object:
self.gui_object.stop()
# stop network/wallets
for k, wallet in self.wallets.items():
wallet.stop_threads()
if self.network:
self.logger.info("shutting down network")
self.network.stop()
# stop event loop
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
self.on_stop()

def stop(self):
if self.gui:
self.gui.stop()
self.logger.info("stopping, removing lockfile")
remove_lockfile(get_lockfile(self.config))
DaemonThread.stop(self)

def init_gui(self, config, plugins):
def run_gui(self, config, plugins):
threading.current_thread().setName('GUI')
gui_name = config.get('gui', 'qt')
if gui_name in ['lite', 'classic']:
gui_name = 'qt'
gui = __import__('electrum.gui.' + gui_name, fromlist=['electrum'])
self.gui = gui.ElectrumGui(config, self, plugins)
self.gui_object = gui.ElectrumGui(config, self, plugins)
try:
self.gui.main()
self.gui_object.main()
except BaseException as e:
self.logger.exception('')
# app will exit now
self.on_stop()

0 comments on commit 865895d

Please sign in to comment.