Skip to content

Commit

Permalink
SessionBase: move to a generalized cost metric
Browse files Browse the repository at this point in the history
I've long wanted a more generic cost metric for requests than
simply bandwidth.  SomberNight's kyuupichan/electrumx#785
prompted me to implement something reasonably generic and configurable.

Features:

- generalized res_usage metric with time decay
- bandwidth consumption contributes to the resource consumption
- derived classes can add to the resource usage count freely, such as
  on a per-request basis
- derived classes can provide an extra "soft" cost.  This could come
  from IP address bucketing, session grouping, or anything.  This is
  soft because it is not counted as part of the session's recorded
  resource usage; it is only temporarily added for the purpose of
  calculating the throttling rate.
- soft and hard limits.  Once resource usage (including the soft amount)
  hits the soft limit, request concurrency for the session begins to reduce
  and each request sleeps a little.  As resource usage grows towards the hard
  limit, the sleeps increase and the concurrency reduces further.  If the
  hard limit is exceeded, the session is closed.
- most limits and parameters are configurable.
  • Loading branch information
Neil Booth committed Apr 10, 2019
1 parent 0d96937 commit 962a826
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 71 deletions.
102 changes: 66 additions & 36 deletions aiorpcx/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@

import asyncio
import logging
from math import ceil
import time

from aiorpcx.curio import (
Event, TaskGroup, TaskTimeout, CancelledError,
spawn, timeout_after, spawn_sync, ignore_after
spawn, timeout_after, spawn_sync, ignore_after, sleep
)
from aiorpcx.framing import (
NewlineFramer, BitcoinFramer,
Expand All @@ -47,6 +48,9 @@
from aiorpcx.util import Concurrency


EXCESSIVE_RESOURCE_USAGE = -99


class Connector(object):

def __init__(self, session_factory, host=None, port=None, proxy=None,
Expand All @@ -67,7 +71,7 @@ async def create_connection(self):
async def __aenter__(self):
_transport, self.protocol = await self.create_connection()
# By default, do not limit outgoing connections
self.protocol.bw_limit = 0
self.ru_hard_limit = 0
return self.protocol

async def __aexit__(self, exc_type, exc_value, traceback):
Expand All @@ -89,6 +93,19 @@ class SessionBase(asyncio.Protocol):
'''

max_errors = 10
# Multiply this by bandwidth bytes used to get resource usage cost
bw_res_usage_per_byte = 1 / 100000
# Resource usage is reduced by this every second
ru_decay_per_sec = 10000 / 3600
# If res_usage is over this requests begin to get delayed and concurrency is reduced
ru_soft_limit = 10000
ru_check_delta = ru_soft_limit // 4
# If res_usage is over this the session is closed
ru_hard_limit = 50000
# Request delay ranges from 0 to this between ru_soft_limit and ru_hard_limit
ru_sleep = 5.0
# Initial number of requests that can be concurrently processed
initial_concurrent = 6

def __init__(self, *, framer=None, loop=None):
self.framer = framer or self.default_framer()
Expand Down Expand Up @@ -116,35 +133,36 @@ def __init__(self, *, framer=None, loop=None):
self.recv_count = 0
self.recv_size = 0
self.last_recv = self.start_time
# Bandwidth usage per hour before throttling starts
self.bw_limit = 2000000
self.bw_time = self.start_time
self.bw_charge = 0
# Resource usage
self.res_usage = 0.0
self._ru_last = 0.0
self._ru_time = self.start_time
self._ru_fraction = 0.0
# Concurrency control
self.max_concurrent = 6
self._concurrency = Concurrency(self.max_concurrent)
self._concurrency = Concurrency(self.initial_concurrent)

async def _update_concurrency(self):
# A non-positive value means not to limit concurrency
if self.bw_limit <= 0:
return
def _recalc_concurrency(self):
# Refund resource usage proportionally to elapsed time; the bump passed is negative
now = time.time()
# Reduce the recorded usage in proportion to the elapsed time
refund = (now - self.bw_time) * (self.bw_limit / 3600)
self.bw_charge = max(0, self.bw_charge - int(refund))
self.bw_time = now
# Reduce concurrency allocation by 1 for each whole bw_limit used
throttle = int(self.bw_charge / self.bw_limit)
target = max(1, self.max_concurrent - throttle)
current = self._concurrency.max_concurrent
if target != current:
self.logger.info(f'changing task concurrency from {current} '
f'to {target}')
await self._concurrency.set_max_concurrent(target)

def _using_bandwidth(self, size):
'''Called when sending or receiving size bytes.'''
self.bw_charge += size
self.bump_res_usage((self._ru_time - now) * self.ru_decay_per_sec)
self._ru_last = self.res_usage

# Setting ru_hard_limit <= 0 means to not limit concurrency
value = self._concurrency.max_concurrent
ru_soft_range = self.ru_hard_limit - self.ru_soft_limit
if ru_soft_range <= 0:
return value

res_usage = self.res_usage + self.extra_res_usage()
self._ru_time = now
self._ru_fraction = max(0.0, (res_usage - self.ru_soft_limit) / ru_soft_range)
if self._ru_fraction > 1.0:
raise FinalRPCError(EXCESSIVE_RESOURCE_USAGE, 'excessive resource usage')

target = ceil((1.0 - self._ru_fraction) * self.initial_concurrent)
if target != value:
self.logger.info(f'changing task concurrency from {value} to {target}')
return target

def _receive_messages(self):
raise NotImplementedError
Expand All @@ -164,7 +182,7 @@ async def _send_message(self, message):
if not self.is_closing():
framed_message = self.framer.frame(message)
self.send_size += len(framed_message)
self._using_bandwidth(len(framed_message))
self.bump_res_usage(len(framed_message) * self.bw_res_usage_per_byte)
self.send_count += 1
self.last_send = time.time()
if self.verbosity >= 4:
Expand All @@ -187,7 +205,7 @@ def data_received(self, framed_message):
if self.verbosity >= 4:
self.logger.debug(f'Received framed message {framed_message}')
self.recv_size += len(framed_message)
self._using_bandwidth(len(framed_message))
self.bump_res_usage(len(framed_message) * self.bw_res_usage_per_byte)
self.framer.received_bytes(framed_message)

def pause_writing(self):
Expand Down Expand Up @@ -231,6 +249,18 @@ def connection_lost(self, exc):
self._can_send.set()

# External API
def bump_res_usage(self, delta):
# Delta can be positive or negative
self.res_usage = max(0, self.res_usage + delta)
if abs(self.res_usage - self._ru_last) > self.ru_check_delta:
self._concurrency.force_recalc(self._recalc_concurrency)

def extra_res_usage(self):
'''A dynamic value added to this session's resource usage when deciding how much to
throttle requests. Can be negative.
'''
return 0.0

def default_framer(self):
'''Return a default framer.'''
raise NotImplementedError
Expand Down Expand Up @@ -311,13 +341,13 @@ async def _receive_messages(self):
else:
self.last_recv = time.time()
self.recv_count += 1
if self.recv_count % 10 == 0:
await self._update_concurrency()
await self._throttled_message(message)

async def _throttled_message(self, message):
'''Process a single request, respecting the concurrency limit.'''
async with self._concurrency.semaphore:
async with self._concurrency:
if self._ru_fraction:
await sleep(self._ru_fraction * self.ru_sleep)
try:
await self.handle_message(message)
except ProtocolError as e:
Expand Down Expand Up @@ -435,8 +465,6 @@ async def _receive_messages(self):

self.last_recv = time.time()
self.recv_count += 1
if self.recv_count % 10 == 0:
await self._update_concurrency()

try:
requests = self.connection.receive_message(message)
Expand All @@ -453,7 +481,9 @@ async def _receive_messages(self):

async def _throttled_request(self, request):
'''Process a single request, respecting the concurrency limit.'''
async with self._concurrency.semaphore:
async with self._concurrency:
if self._ru_fraction:
await sleep(self._ru_fraction * self.ru_sleep)
try:
result = await self.handle_request(request)
except (ProtocolError, RPCError) as e:
Expand Down
30 changes: 22 additions & 8 deletions aiorpcx/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,36 @@ class Concurrency(object):
def __init__(self, max_concurrent):
_require_non_negative(max_concurrent)
self._max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self._semaphore = asyncio.Semaphore(max_concurrent)
self._recalc_func = None

@property
def max_concurrent(self):
return self._max_concurrent

async def set_max_concurrent(self, value):
async def _set_max_concurrent(self, value):
_require_non_negative(value)
diff = value - self._max_concurrent
self._max_concurrent = value
if diff >= 0:
for _ in range(diff):
self.semaphore.release()
self._semaphore.release()
else:
for _ in range(-diff):
await self.semaphore.acquire()
await self._semaphore.acquire()

@property
def max_concurrent(self):
return self._max_concurrent

def force_recalc(self, recalc_func):
self._recalc_func = recalc_func

async def __aenter__(self):
if self._recalc_func:
target = self._recalc_func()
self._recalc_func = None
await self.set_max_concurrent(target)
await self._semaphore.acquire()

async def __aexit__(self, exc_type, exc_value, traceback):
self._semaphore.release()


def check_task(logger, task):
Expand Down
56 changes: 37 additions & 19 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,26 +325,44 @@ async def test_slow_connection_aborted(self, server):
@pytest.mark.asyncio
async def test_concurrency(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.bw_limit = 1_000_000
# Test high bw usage crushes concurrency to 1
client.bw_charge = 1_000_000_000
prior_mc = client._concurrency.max_concurrent
await client._update_concurrency()
assert 1 == client._concurrency.max_concurrent < prior_mc
# Test passage of time restores it
client.bw_time -= 1000 * 1000 * 1000
await client._update_concurrency()
assert client._concurrency.max_concurrent == prior_mc

@pytest.mark.asyncio
async def test_concurrency_bw_limit_0(self, server):
# Prevent this interfering
client.ru_decay_per_sec = 0
# Test usage below soft limit
client.res_usage = client.ru_soft_limit - 10
assert client._recalc_concurrency() == client.initial_concurrent
assert client._ru_fraction == 0.0
# Test usage at soft limit doesn't affect concurrency
client.res_usage = client.ru_soft_limit
assert client._recalc_concurrency() == client.initial_concurrent
assert client._ru_fraction == 0.0
# Test usage half-way
client.res_usage = (client.ru_soft_limit + client.ru_hard_limit) // 2
assert 1 < client._recalc_concurrency() < client.initial_concurrent
assert 0.49 < client._ru_fraction < 0.51
# Test at hard limit
client.res_usage = client.ru_hard_limit
assert client._recalc_concurrency() == 0
assert client._ru_fraction == 1.0
# Test above hard limit disconnects
client.res_usage = client.ru_hard_limit + 1
with pytest.raises(FinalRPCError):
client._recalc_concurrency()

@pytest.mark.asyncio
async def test_concurrency_decay(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
# Test high bw usage crushes concurrency to 1
client.bw_charge = 1000 * 1000 * 1000
client.bw_limit = 0
prior_mc = client._concurrency.max_concurrent
await client._update_concurrency()
assert client._concurrency.max_concurrent == prior_mc
client.ru_decay_per_sec = 100
client.res_usage = 1000
await sleep(0.01)
client._recalc_concurrency()
assert 995 < client.res_usage < 999

@pytest.mark.asyncio
async def test_concurrency_hard_limit_0(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.res_usage = 1_000_000_000
client.ru_hard_limit = 0
assert client._recalc_concurrency() == client.initial_concurrent

@pytest.mark.asyncio
async def test_close_on_many_errors(self, server):
Expand Down
16 changes: 8 additions & 8 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def concurrency_max(c):
fut = loop.create_future()

async def work():
async with c.semaphore:
async with c:
q.append(None)
await fut

Expand All @@ -66,20 +66,20 @@ async def test_max_concurrent():
c = Concurrency(max_concurrent=3)
assert c.max_concurrent == 3
assert await concurrency_max(c) == 3
await c.set_max_concurrent(3)
await c._set_max_concurrent(3)
assert c.max_concurrent == 3
assert await concurrency_max(c) == 3
await c.set_max_concurrent(1)
await c._set_max_concurrent(1)
assert c.max_concurrent == 1
assert await concurrency_max(c) == 1
await c.set_max_concurrent(0)
assert c.semaphore._value == 0
await c._set_max_concurrent(0)
assert c._semaphore._value == 0
assert c.max_concurrent == 0
assert await concurrency_max(c) == 0
await c.set_max_concurrent(5)
await c._set_max_concurrent(5)
assert c.max_concurrent == 5
assert await concurrency_max(c) == 5
with pytest.raises(RuntimeError):
await c.set_max_concurrent(-1)
await c._set_max_concurrent(-1)
with pytest.raises(RuntimeError):
await c.set_max_concurrent(2.6)
await c._set_max_concurrent(2.6)

0 comments on commit 962a826

Please sign in to comment.