Skip to content

Commit

Permalink
Rename res_usage to cost
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Booth committed Apr 10, 2019
1 parent 2884b94 commit 8dd49bb
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 58 deletions.
70 changes: 35 additions & 35 deletions aiorpcx/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ class SessionBase(asyncio.Protocol):

max_errors = 10
# Multiply this by bandwidth bytes used to get resource usage cost
bw_res_usage_per_byte = 1 / 100000
# If res_usage is over this requests begin to get delayed and concurrency is reduced
ru_soft_limit = 10000
ru_check_delta = ru_soft_limit // 4
# If res_usage is over this the session is closed
ru_hard_limit = 50000
bw_cost_per_byte = 1 / 100000
# If cost is over this requests begin to get delayed and concurrency is reduced
cost_soft_limit = 10000
cost_check_delta = cost_soft_limit // 4
# If cost is over this the session is closed
cost_hard_limit = 50000
# Resource usage is reduced by this every second
ru_decay_per_sec = ru_hard_limit / 3600
# Request delay ranges from 0 to this between ru_soft_limit and ru_hard_limit
ru_sleep = 5.0
cost_decay_per_sec = cost_hard_limit / 3600
# Request delay ranges from 0 to this between cost_soft_limit and cost_hard_limit
cost_sleep = 5.0
# Initial number of requests that can be concurrently processed
initial_concurrent = 6

Expand Down Expand Up @@ -134,32 +134,32 @@ def __init__(self, *, framer=None, loop=None):
self.recv_size = 0
self.last_recv = self.start_time
# Resource usage
self.res_usage = 0.0
self._ru_last = 0.0
self._ru_time = self.start_time
self._ru_fraction = 0.0
self.cost = 0.0
self._cost_last = 0.0
self._cost_time = self.start_time
self._cost_fraction = 0.0
# Concurrency control
self._concurrency = Concurrency(self.initial_concurrent)

def _recalc_concurrency(self):
# Refund resource usage proportionally to elapsed time; the bump passed is negative
now = time.time()
self.bump_res_usage((self._ru_time - now) * self.ru_decay_per_sec)
self._ru_last = self.res_usage
self.bump_cost((self._cost_time - now) * self.cost_decay_per_sec)
self._cost_last = self.cost

# Setting ru_hard_limit <= 0 means to not limit concurrency
# Setting cost_hard_limit <= 0 means to not limit concurrency
value = self._concurrency.max_concurrent
ru_soft_range = self.ru_hard_limit - self.ru_soft_limit
if ru_soft_range <= 0:
cost_soft_range = self.cost_hard_limit - self.cost_soft_limit
if cost_soft_range <= 0:
return value

res_usage = self.res_usage + self.extra_res_usage()
self._ru_time = now
self._ru_fraction = max(0.0, (res_usage - self.ru_soft_limit) / ru_soft_range)
if self._ru_fraction > 1.0:
cost = self.cost + self.extra_cost()
self._cost_time = now
self._cost_fraction = max(0.0, (cost - self.cost_soft_limit) / cost_soft_range)
if self._cost_fraction > 1.0:
raise FinalRPCError(EXCESSIVE_RESOURCE_USAGE, 'excessive resource usage')

target = ceil((1.0 - self._ru_fraction) * self.initial_concurrent)
target = ceil((1.0 - self._cost_fraction) * self.initial_concurrent)
if target != value:
self.logger.info(f'changing task concurrency from {value} to {target}')
return target
Expand All @@ -182,7 +182,7 @@ async def _send_message(self, message):
if not self.is_closing():
framed_message = self.framer.frame(message)
self.send_size += len(framed_message)
self.bump_res_usage(len(framed_message) * self.bw_res_usage_per_byte)
self.bump_cost(len(framed_message) * self.bw_cost_per_byte)
self.send_count += 1
self.last_send = time.time()
if self.verbosity >= 4:
Expand All @@ -205,7 +205,7 @@ def data_received(self, framed_message):
if self.verbosity >= 4:
self.logger.debug(f'Received framed message {framed_message}')
self.recv_size += len(framed_message)
self.bump_res_usage(len(framed_message) * self.bw_res_usage_per_byte)
self.bump_cost(len(framed_message) * self.bw_cost_per_byte)
self.framer.received_bytes(framed_message)

def pause_writing(self):
Expand Down Expand Up @@ -249,15 +249,15 @@ def connection_lost(self, exc):
self._can_send.set()

# External API
def bump_res_usage(self, delta):
def bump_cost(self, delta):
# Delta can be positive or negative
self.res_usage = max(0, self.res_usage + delta)
if abs(self.res_usage - self._ru_last) > self.ru_check_delta:
self.cost = max(0, self.cost + delta)
if abs(self.cost - self._cost_last) > self.cost_check_delta:
self._concurrency.force_recalc(self._recalc_concurrency)

def extra_res_usage(self):
'''A dynamic value added to this session's resource usage when deciding how much to
throttle requests. Can be negative.
def extra_cost(self):
'''A dynamic value added to this session's cost when deciding how much to throttle
requests. Can be negative.
'''
return 0.0

Expand Down Expand Up @@ -346,8 +346,8 @@ async def _receive_messages(self):
async def _throttled_message(self, message):
'''Process a single request, respecting the concurrency limit.'''
async with self._concurrency:
if self._ru_fraction:
await sleep(self._ru_fraction * self.ru_sleep)
if self._cost_fraction:
await sleep(self._cost_fraction * self.cost_sleep)
try:
await self.handle_message(message)
except ProtocolError as e:
Expand Down Expand Up @@ -482,8 +482,8 @@ async def _receive_messages(self):
async def _throttled_request(self, request):
'''Process a single request, respecting the concurrency limit.'''
async with self._concurrency:
if self._ru_fraction:
await sleep(self._ru_fraction * self.ru_sleep)
if self._cost_fraction:
await sleep(self._cost_fraction * self.cost_sleep)
try:
result = await self.handle_request(request)
except (ProtocolError, RPCError) as e:
Expand Down
46 changes: 23 additions & 23 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,64 +326,64 @@ async def test_slow_connection_aborted(self, server):
async def test_concurrency(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
# Prevent this interfering
client.ru_decay_per_sec = 0
client.cost_decay_per_sec = 0
# Test usage below soft limit
client.res_usage = client.ru_soft_limit - 10
client.cost = client.cost_soft_limit - 10
assert client._recalc_concurrency() == client.initial_concurrent
assert client._ru_fraction == 0.0
assert client._cost_fraction == 0.0
# Test usage at soft limit doesn't affect concurrency
client.res_usage = client.ru_soft_limit
client.cost = client.cost_soft_limit
assert client._recalc_concurrency() == client.initial_concurrent
assert client._ru_fraction == 0.0
assert client._cost_fraction == 0.0
# Test usage half-way
client.res_usage = (client.ru_soft_limit + client.ru_hard_limit) // 2
client.cost = (client.cost_soft_limit + client.cost_hard_limit) // 2
assert 1 < client._recalc_concurrency() < client.initial_concurrent
assert 0.49 < client._ru_fraction < 0.51
assert 0.49 < client._cost_fraction < 0.51
# Test at hard limit
client.res_usage = client.ru_hard_limit
client.cost = client.cost_hard_limit
assert client._recalc_concurrency() == 0
assert client._ru_fraction == 1.0
assert client._cost_fraction == 1.0
# Test above hard limit disconnects
client.res_usage = client.ru_hard_limit + 1
client.cost = client.cost_hard_limit + 1
with pytest.raises(FinalRPCError):
client._recalc_concurrency()

@pytest.mark.asyncio
async def test_concurrency_decay(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.ru_decay_per_sec = 100
client.res_usage = 1000
client.cost_decay_per_sec = 100
client.cost = 1000
await sleep(0.01)
client._recalc_concurrency()
assert 995 < client.res_usage < 999
assert 995 < client.cost < 999

@pytest.mark.asyncio
async def test_concurrency_hard_limit_0(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.res_usage = 1_000_000_000
client.ru_hard_limit = 0
client.cost = 1_000_000_000
client.cost_hard_limit = 0
assert client._recalc_concurrency() == client.initial_concurrent

@pytest.mark.asyncio
async def test_extra_res_usage(self, server):
async def test_extra_cost(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
client.extra_res_usage = lambda: client.ru_soft_limit + 1
client.extra_cost = lambda: client.cost_soft_limit + 1
client._recalc_concurrency()
assert client._ru_fraction > 0
client.extra_res_usage = lambda: client.ru_hard_limit + 1
assert client._cost_fraction > 0
client.extra_cost = lambda: client.cost_hard_limit + 1
with pytest.raises(FinalRPCError):
client._recalc_concurrency()

@pytest.mark.asyncio
async def test_request_sleep(self, server):
async with Connector(RPCSession, 'localhost', server.port) as client:
server = await MyServerSession.current_server()
server.bump_res_usage((server.ru_soft_limit + server.ru_hard_limit) / 2)
server.ru_sleep = 0.1
server.bump_cost((server.cost_soft_limit + server.cost_hard_limit) / 2)
server.cost_sleep = 0.1
t1 = time.time()
await client.send_request('echo', [23])
t2 = time.time()
assert t2 - t1 > server.ru_sleep / 2
assert t2 - t1 > server.cost_sleep / 2

@pytest.mark.asyncio
async def test_close_on_many_errors(self, server):
Expand Down Expand Up @@ -789,6 +789,6 @@ async def test_coverage(self):
async def test_request_sleeps(self, msg_server, caplog):
async with Connector(MessageSession, 'localhost', msg_server.port) as client:
server = await MessageServer.current_server()
server.bump_res_usage((server.ru_soft_limit + server.ru_hard_limit) / 2)
server.bump_cost((server.cost_soft_limit + server.cost_hard_limit) / 2)
# Messaging doesn't wait, so this is just for code coverage
await client.send_message((b'version', b'abc'))

0 comments on commit 8dd49bb

Please sign in to comment.