Skip to content

Commit

Permalink
Merge branch 'release-0.8.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Booth committed Dec 3, 2016
2 parents 8d9a5c4 + 33b1ce8 commit 4825fbb
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 33 deletions.
19 changes: 19 additions & 0 deletions RELEASE-NOTES
@@ -1,3 +1,22 @@
version 0.8.0
------------

- stale connections are periodically closed. See docs/ENV-NOTES for
SESSION_TIMEOUT, default is 10 minutes. Issue #56.
- each session gets its own ID which is used in the logs instead of its
network address; the network address is still shown on initial connection.
Issue #55.
- the session ID is also shown in the sessions list. You can use this ID
with the following new RPC commands which take a list of session ids:

electrumx_rpc.py log
electrumx_rpc.py disconnect

The first will toggle logging of the sessions. A logged session
prints every incoming request to the logs.
The second will disconnect the sessions.
Example: "electrumx_rpc.py log 15 369"

version 0.7.20
--------------

Expand Down
3 changes: 3 additions & 0 deletions docs/ENV-NOTES
Expand Up @@ -83,6 +83,9 @@ BANDWIDTH_LIMIT - per-session periodic bandwidth usage limit in bytes.
end of each period. Currently the period is
hard-coded to be one hour. The default limit value
is 2 million bytes.
SESSION_TIMEOUT - an integer number of seconds defaulting to 600.
Sessions with no activity for longer than this are
disconnected.

If you want IRC connectivity to advertise your node:

Expand Down
29 changes: 18 additions & 11 deletions lib/jsonrpc.py
Expand Up @@ -64,6 +64,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
INTERNAL_ERROR = -32603

ID_TYPES = (type(None), str, numbers.Number)
NEXT_SESSION_ID = 0

class RPCError(Exception):
'''RPC handlers raise this error.'''
Expand All @@ -79,6 +80,7 @@ class LargeRequestError(Exception):
def __init__(self):
super().__init__()
self.start = time.time()
self.last_recv = self.start
self.bandwidth_start = self.start
self.bandwidth_interval = 3600
self.bandwidth_used = 0
Expand All @@ -103,6 +105,10 @@ def __init__(self):
# If buffered incoming data exceeds this the connection is closed
self.max_buffer_size = 1000000
self.anon_logs = False
self.id_ = JSONRPC.NEXT_SESSION_ID
JSONRPC.NEXT_SESSION_ID += 1
self.log_prefix = '[{:d}] '.format(self.id_)
self.log_me = False

def peername(self, *, for_log=True):
'''Return the peer name of this connection.'''
Expand Down Expand Up @@ -140,10 +146,10 @@ def data_received(self, data):
# Close abusive connections where buffered data exceeds limit
buffer_size = len(data) + sum(len(part) for part in self.parts)
if buffer_size > self.max_buffer_size:
self.logger.error('read buffer of {:,d} bytes exceeds {:,d} '
'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size,
self.peername()))
self.log_error('read buffer of {:,d} bytes exceeds {:,d} '
'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size,
self.peername()))
self.transport.close()

# Do nothing if this connection is closing
Expand All @@ -155,6 +161,7 @@ def data_received(self, data):
if npos == -1:
self.parts.append(data)
break
self.last_recv = time.time()
self.recv_count += 1
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
Expand Down Expand Up @@ -184,6 +191,8 @@ def decode_message(self, message):

'''Queue the request for asynchronous handling.'''
self.messages.put_nowait(message)
if self.log_me:
self.log_info('queued {}'.format(message))

def send_json_notification(self, method, params):
'''Create a json notification.'''
Expand Down Expand Up @@ -216,7 +225,7 @@ def send_json(self, payload):
data = (json.dumps(payload) + '\n').encode()
except TypeError:
msg = 'JSON encoding failure: {}'.format(payload)
self.logger.error(msg)
self.log_error(msg)
self.send_json_error(msg, self.INTERNAL_ERROR, id_)
else:
if len(data) > max(1000, self.max_send):
Expand All @@ -239,10 +248,9 @@ async def handle_message(self, message):
excess = self.bandwidth_used - self.bandwidth_limit
if excess > 0:
secs = 1 + excess // self.bandwidth_limit
self.logger.warning('{} has high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.peername(), self.bandwidth_used,
secs))
self.log_warning('high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.bandwidth_used, secs))
await asyncio.sleep(secs)

if isinstance(message, list):
Expand All @@ -254,8 +262,7 @@ async def handle_message(self, message):
try:
self.send_json(payload)
except self.LargeRequestError:
self.logger.warning('blocked large request from {}: {}'
.format(self.peername(), message))
self.log_warning('blocked large request {}'.format(message))

async def batch_payload(self, batch):
'''Return the JSON payload corresponding to a batch JSON request.'''
Expand Down
10 changes: 10 additions & 0 deletions lib/util.py
Expand Up @@ -20,6 +20,16 @@ class LoggedClass(object):
def __init__(self):
    '''Create a per-class logger at INFO level with an empty log prefix.'''
    self.logger = logging.getLogger(self.__class__.__name__)
    self.logger.setLevel(logging.INFO)
    # Prepended to every message by log_info/log_warning/log_error;
    # subclasses may override it (e.g. a session sets '[<id>] ').
    self.log_prefix = ''

def log_info(self, msg):
    '''Log msg at INFO level, prefixed with this object's log prefix.'''
    text = self.log_prefix + msg
    self.logger.info(text)

def log_warning(self, msg):
    '''Log msg at WARNING level, prefixed with this object's log prefix.'''
    text = self.log_prefix + msg
    self.logger.warning(text)

def log_error(self, msg):
    '''Log msg at ERROR level, prefixed with this object's log prefix.'''
    text = self.log_prefix + msg
    self.logger.error(text)


# Method decorator. To be used for calculations that will always
Expand Down
1 change: 1 addition & 0 deletions server/env.py
Expand Up @@ -50,6 +50,7 @@ def __init__(self):
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
# IRC
self.irc = self.default('IRC', False)
self.irc_nick = self.default('IRC_NICK', None)
Expand Down
99 changes: 78 additions & 21 deletions server/protocol.py
Expand Up @@ -228,8 +228,11 @@ def __init__(self, env):
self.next_log_sessions = 0
self.max_subs = env.max_subs
self.subscription_count = 0
self.next_stale_check = 0
self.futures = []
env.max_send = max(350000, env.max_send)
self.logger.info('session timeout: {:,d} seconds'
.format(env.session_timeout))
self.logger.info('session bandwidth limit {:,d} bytes'
.format(env.bandwidth_limit))
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
Expand Down Expand Up @@ -354,24 +357,48 @@ async def close_sessions(self, secs=30):
.format(len(self.sessions)))

def add_session(self, session):
    '''Register a new session and start serving its requests.

    Stale sessions are reaped first.  If the listening servers have
    already been shut down the session is closed immediately.
    '''
    self.clear_stale_sessions()
    task = asyncio.ensure_future(session.serve_requests())
    self.sessions[session] = task
    msg = ('connection from {}, {:,d} total'
           .format(session.peername(), len(self.sessions)))
    session.log_info(msg)
    # Some connections are acknowledged after the servers are closed
    if not self.servers:
        self.close_session(session)
def remove_session(self, session):
    '''Forget a closed session: subtract its subscriptions from the
    global count and cancel its serving task.'''
    self.subscription_count -= session.sub_count()
    future = self.sessions.pop(session)
    future.cancel()

def close_session(self, session):
    '''Close the session's transport, cancel its serving task, and
    return a short status string naming its ID.'''
    task = self.sessions[session]
    session.transport.close()
    task.cancel()
    return '{:d} disconnected'.format(session.id_)

def remove_session(self, session):
    '''Forget a session: subtract its subscriptions from the global
    count, drop it from the session map and cancel its task.'''
    self.subscription_count -= session.sub_count()
    self.sessions.pop(session).cancel()
def toggle_logging(self, session):
    '''Toggle per-request logging for the session and return a status
    string reporting the new state and the session ID.

    (The prior docstring was copy-pasted from close_session and wrongly
    claimed this closed the transport; it only flips session.log_me.)
    '''
    session.log_me = not session.log_me
    if session.log_me:
        return 'logging {:d}'.format(session.id_)
    else:
        return 'not logging {:d}'.format(session.id_)

def clear_stale_sessions(self):
    '''Disconnect sessions idle for longer than the configured session
    timeout (env.session_timeout, from SESSION_TIMEOUT; default 600s).

    Rate-limited: a sweep runs at most once every 60 seconds.
    '''
    now = time.time()
    if now > self.next_stale_check:
        self.next_stale_check = now + 60
        # A session is stale if it has received nothing since the cutoff.
        cutoff = now - self.env.session_timeout
        stale = [session for session in self.sessions
                 if session.last_recv < cutoff]
        for session in stale:
            self.close_session(session)
        if stale:
            self.logger.info('dropped {:,d} stale connections'
                             .format(len(stale)))

def new_subscription(self):
if self.subscription_count >= self.max_subs:
Expand Down Expand Up @@ -408,14 +435,15 @@ def time_fmt(t):
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))

fmt = ('{:<4} {:>23} {:>15} {:>7} '
fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
yield fmt.format('Type', 'Peer', 'Client', 'Subs',
yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB',
'Txs', 'Time')
for (kind, peer, subs, client, recv_count, recv_size,
for (kind, id_, log_me, peer, subs, client, recv_count, recv_size,
send_count, send_size, txs_sent, time) in data:
yield fmt.format(kind, peer, client,
yield fmt.format(kind, str(id_) + ('L' if log_me else ' '),
peer, client,
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
Expand All @@ -429,6 +457,8 @@ def session_data(self, for_log):
now = time.time()
sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
return [(session.kind,
session.id_,
session.log_me,
session.peername(for_log=for_log),
session.sub_count(),
session.client,
Expand All @@ -438,6 +468,33 @@ def session_data(self, for_log):
now - session.start)
for session in sessions]

def lookup_session(self, param):
    '''Return the session whose ID equals param, or None.

    param typically arrives as a string from an RPC request; values
    that are not integers, and IDs matching no session, yield None.
    '''
    try:
        id_ = int(param)
    except (ValueError, TypeError):
        # Narrowed from a bare except:, which also swallowed
        # KeyboardInterrupt/SystemExit.  Non-integer param -> no match.
        pass
    else:
        for session in self.sessions:
            if session.id_ == id_:
                return session
    return None

def for_each_session(self, params, operation):
    '''Apply operation to each session named by an ID in params.

    Returns one entry per param, in order: the operation's result for
    a matching session, or an error string for unknown IDs.
    '''
    def outcome(param):
        session = self.lookup_session(param)
        if session:
            return operation(session)
        return 'unknown session: {}'.format(param)

    return [outcome(param) for param in params]

async def rpc_disconnect(self, params):
    '''RPC handler: disconnect the sessions whose IDs are in params.'''
    results = self.for_each_session(params, self.close_session)
    return results

async def rpc_log(self, params):
    '''RPC handler: toggle per-request logging for the given session IDs.'''
    results = self.for_each_session(params, self.toggle_logging)
    return results

async def rpc_getinfo(self, params):
    '''RPC handler: return a summary of server state.  params is ignored.'''
    summary = self.server_summary()
    return summary

Expand Down Expand Up @@ -485,10 +542,10 @@ def connection_lost(self, exc):
'''Handle client disconnection.'''
super().connection_lost(exc)
if self.error_count or self.send_size >= 1024*1024:
self.logger.info('{} disconnected. '
'Sent {:,d} bytes in {:,d} messages {:,d} errors'
.format(self.peername(), self.send_size,
self.send_count, self.error_count))
self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages '
'{:,d} errors'
.format(self.send_size, self.send_count,
self.error_count))
self.manager.remove_session(self)

async def handle_request(self, method, params):
Expand All @@ -514,7 +571,7 @@ async def serve_requests(self):
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(message))
self.log_error('error handling request {}'.format(message))
traceback.print_exc()

def sub_count(self):
Expand Down Expand Up @@ -638,8 +695,7 @@ async def notify(self, height, touched, cache):
self.send_json(payload)

if matches:
self.logger.info('notified {} of {} addresses'
.format(self.peername(), len(matches)))
self.log_info('notified of {:,d} addresses'.format(len(matches)))

def height(self):
'''Return the current flushed database height.'''
Expand Down Expand Up @@ -825,12 +881,12 @@ async def transaction_broadcast(self, params):
tx_hash = await self.daemon.sendrawtransaction(params)
self.txs_sent += 1
self.manager.txs_sent += 1
self.logger.info('sent tx: {}'.format(tx_hash))
self.log_info('sent tx: {}'.format(tx_hash))
return tx_hash
except DaemonError as e:
error = e.args[0]
message = error['message']
self.logger.info('sendrawtransaction: {}'.format(message))
self.log_info('sendrawtransaction: {}'.format(message))
if 'non-mandatory-script-verify-flag' in message:
return (
'Your client produced a transaction that is not accepted '
Expand Down Expand Up @@ -886,8 +942,8 @@ async def banner(self, params):
with codecs.open(self.env.banner_file, 'r', 'utf-8') as f:
banner = f.read()
except Exception as e:
self.logger.error('reading banner file {}: {}'
.format(self.env.banner_file, e))
self.log_error('reading banner file {}: {}'
.format(self.env.banner_file, e))
else:
network_info = await self.daemon.getnetworkinfo()
version = network_info['version']
Expand Down Expand Up @@ -932,7 +988,8 @@ class LocalRPC(Session):

def __init__(self, *args):
super().__init__(*args)
cmds = 'getinfo sessions numsessions peers numpeers'.split()
cmds = ('disconnect getinfo log numpeers numsessions peers sessions'
.split())
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds}
self.client = 'RPC'
Expand Down
2 changes: 1 addition & 1 deletion server/version.py
@@ -1 +1 @@
VERSION = "ElectrumX 0.7.20"
VERSION = "ElectrumX 0.8.0"

0 comments on commit 4825fbb

Please sign in to comment.