Skip to content

Commit

Permalink
Clean up concurrency logic; expose recalc_concurrency()
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Booth committed Apr 10, 2019
1 parent 27cd682 commit def2530
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 91 deletions.
53 changes: 26 additions & 27 deletions aiorpcx/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@
from aiorpcx.util import Concurrency


EXCESSIVE_RESOURCE_USAGE = -99


class Connector(object):

def __init__(self, session_factory, host=None, port=None, proxy=None,
Expand Down Expand Up @@ -141,29 +138,6 @@ def __init__(self, *, framer=None, loop=None):
self._concurrency = Concurrency(self.initial_concurrent)
self._cost_check_delta = max(self.cost_soft_limit // 4, 100)

def _recalc_concurrency(self):
# Refund resource usage proportionally to elapsed time; the bump passed is negative
now = time.time()
self.bump_cost((self._cost_time - now) * self.cost_decay_per_sec)
self._cost_last = self.cost

# Setting cost_hard_limit <= 0 means to not limit concurrency
value = self._concurrency.max_concurrent
cost_soft_range = self.cost_hard_limit - self.cost_soft_limit
if cost_soft_range <= 0:
return value

cost = self.cost + self.extra_cost()
self._cost_time = now
self._cost_fraction = max(0.0, (cost - self.cost_soft_limit) / cost_soft_range)
if self._cost_fraction > 1.0:
raise FinalRPCError(EXCESSIVE_RESOURCE_USAGE, 'excessive resource usage')

target = ceil((1.0 - self._cost_fraction) * self.initial_concurrent)
if target != value:
self.logger.info(f'changing task concurrency from {value} to {target}')
return target

def _receive_messages(self):
raise NotImplementedError

Expand Down Expand Up @@ -253,7 +227,32 @@ def bump_cost(self, delta):
# Delta can be positive or negative
self.cost = max(0, self.cost + delta)
if abs(self.cost - self._cost_last) > self._cost_check_delta:
self._concurrency.force_recalc(self._recalc_concurrency)
self.recalc_concurrency()

def recalc_concurrency(self):
'''Call to recalculate sleeps and concurrency for the session. Called automatically if
cost has drifted significantly. Otherwise can be called at regular intervals if
desired.
'''
# Refund resource usage proportionally to elapsed time; the bump passed is negative
now = time.time()
self.cost = max(0, self.cost - (now - self._cost_time) * self.cost_decay_per_sec)
self._cost_time = now
self._cost_last = self.cost

# Setting cost_hard_limit <= 0 means to not limit concurrency
value = self._concurrency.max_concurrent
cost_soft_range = self.cost_hard_limit - self.cost_soft_limit
if cost_soft_range <= 0:
return

cost = self.cost + self.extra_cost()
self._cost_fraction = max(0.0, (cost - self.cost_soft_limit) / cost_soft_range)

target = ceil((1.0 - self._cost_fraction) * self.initial_concurrent)
if target != value:
self.logger.info(f'changing task concurrency from {value} to {target}')
self._concurrency.set_target(target)

def extra_cost(self):
'''A dynamic value added to this session's cost when deciding how much to throttle
Expand Down
45 changes: 20 additions & 25 deletions aiorpcx/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,42 +85,37 @@ def signature_info(func):
return SignatureInfo(min_args, max_args, required_names, other_names)


def _require_non_negative(value):
if not isinstance(value, int) or value < 0:
raise RuntimeError('concurrency must be a natural number')
EXCESSIVE_RESOURCE_USAGE = -99


class Concurrency(object):

def __init__(self, max_concurrent):
_require_non_negative(max_concurrent)
self._max_concurrent = max_concurrent
self._semaphore = asyncio.Semaphore(max_concurrent)
self._recalc_func = None

async def _set_max_concurrent(self, value):
_require_non_negative(value)
diff = value - self._max_concurrent
self._max_concurrent = value
if diff >= 0:
for _ in range(diff):
self._semaphore.release()
else:
for _ in range(-diff):
def __init__(self, target):
self._target = int(target)
self._semaphore = asyncio.Semaphore(self._target)
self._sem_value = self._target

async def _retarget_semaphore(self):
while self._sem_value != self._target:
if self._target <= 0:
from .jsonrpc import FinalRPCError
raise FinalRPCError(EXCESSIVE_RESOURCE_USAGE, 'excessive resource usage')
if self._sem_value > self._target:
await self._semaphore.acquire()
self._sem_value -= 1
else:
self._semaphore.release()
self._sem_value += 1

@property
def max_concurrent(self):
return self._max_concurrent
return self._target

def force_recalc(self, recalc_func):
self._recalc_func = recalc_func
def set_target(self, target):
self._target = int(target)

async def __aenter__(self):
if self._recalc_func:
target = self._recalc_func()
self._recalc_func = None
await self._set_max_concurrent(target)
await self._retarget_semaphore()
await self._semaphore.acquire()

async def __aexit__(self, exc_type, exc_value, traceback):
Expand Down
28 changes: 17 additions & 11 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,50 +329,56 @@ async def test_concurrency(self, server):
client.cost_decay_per_sec = 0
# Test usage below soft limit
client.cost = client.cost_soft_limit - 10
assert client._recalc_concurrency() == client.initial_concurrent
client.recalc_concurrency()
assert client._concurrency.max_concurrent == client.initial_concurrent
assert client._cost_fraction == 0.0
# Test usage at soft limit doesn't affect concurrency
client.cost = client.cost_soft_limit
assert client._recalc_concurrency() == client.initial_concurrent
client.recalc_concurrency()
assert client._concurrency.max_concurrent == client.initial_concurrent
assert client._cost_fraction == 0.0
# Test usage half-way
client.cost = (client.cost_soft_limit + client.cost_hard_limit) // 2
assert 1 < client._recalc_concurrency() < client.initial_concurrent
client.recalc_concurrency()
assert 1 < client._concurrency.max_concurrent < client.initial_concurrent
assert 0.49 < client._cost_fraction < 0.51
# Test at hard limit
client.cost = client.cost_hard_limit
assert client._recalc_concurrency() == 0
client.recalc_concurrency()
assert client._cost_fraction == 1.0
# Test above hard limit disconnects
client.cost = client.cost_hard_limit + 1
client.recalc_concurrency()
with pytest.raises(FinalRPCError):
client._recalc_concurrency()
async with client._concurrency:
pass

@pytest.mark.asyncio
async def test_concurrency_decay(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.cost_decay_per_sec = 100
client.cost = 1000
await sleep(0.01)
client._recalc_concurrency()
client.recalc_concurrency()
assert 995 < client.cost < 999

@pytest.mark.asyncio
async def test_concurrency_hard_limit_0(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.cost = 1_000_000_000
client.cost_hard_limit = 0
assert client._recalc_concurrency() == client.initial_concurrent
client.recalc_concurrency()
assert client._concurrency.max_concurrent == client.initial_concurrent

@pytest.mark.asyncio
async def test_extra_cost(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.extra_cost = lambda: client.cost_soft_limit + 1
client._recalc_concurrency()
assert client._cost_fraction > 0
client.recalc_concurrency()
assert 1 > client._cost_fraction > 0
client.extra_cost = lambda: client.cost_hard_limit + 1
with pytest.raises(FinalRPCError):
client._recalc_concurrency()
client.recalc_concurrency()
assert client._cost_fraction > 1

@pytest.mark.asyncio
async def test_request_sleep(self, server):
Expand Down
42 changes: 14 additions & 28 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from aiorpcx import FinalRPCError
from aiorpcx.util import (SignatureInfo, signature_info, Concurrency,
is_async_call)

Expand All @@ -25,12 +26,10 @@ def test_is_async_call():

def test_concurrency_constructor():
Concurrency(3)
Concurrency(max_concurrent=0)
Concurrency(max_concurrent=32)
with pytest.raises(RuntimeError):
Concurrency(max_concurrent=-1)
with pytest.raises(RuntimeError):
Concurrency(max_concurrent=2.5)
Concurrency(target=6)
Concurrency(target=0)
with pytest.raises(ValueError):
Concurrency(target=-1)


async def concurrency_max(c):
Expand Down Expand Up @@ -63,34 +62,21 @@ async def work():

@pytest.mark.asyncio
async def test_max_concurrent():
c = Concurrency(max_concurrent=3)
c = Concurrency(target=3)
assert c.max_concurrent == 3
assert await concurrency_max(c) == 3
await c._set_max_concurrent(3)
c.set_target(3)
assert c.max_concurrent == 3
assert await concurrency_max(c) == 3
await c._set_max_concurrent(1)
c.set_target(1)
assert c.max_concurrent == 1
assert await concurrency_max(c) == 1
await c._set_max_concurrent(0)
assert c._semaphore._value == 0

c.set_target(0)
assert c.max_concurrent == 0
assert await concurrency_max(c) == 0
await c._set_max_concurrent(5)
with pytest.raises(FinalRPCError):
async with c:
pass
c.set_target(5)
assert c.max_concurrent == 5
assert await concurrency_max(c) == 5
with pytest.raises(RuntimeError):
await c._set_max_concurrent(-1)
with pytest.raises(RuntimeError):
await c._set_max_concurrent(2.6)


@pytest.mark.asyncio
async def test_force_recalc():
c = Concurrency(max_concurrent=6)
c.force_recalc(lambda: 4)
assert c.max_concurrent == 6
async with c:
assert c.max_concurrent == 4
assert c._recalc_func is None
assert c.max_concurrent == 4

0 comments on commit def2530

Please sign in to comment.