Skip to content

Commit

Permalink
add Chan.stats for collecting runtime statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
zh217 committed Sep 8, 2018
1 parent a84ae7e commit e6c5b48
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 8 deletions.
48 changes: 42 additions & 6 deletions aiochan/channel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import asyncio
import collections
import functools
import itertools
import janus
import multiprocessing
import numbers
import operator
import queue
import random
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

import janus

from . import buffers
from ._util import FnHandler, SelectFlag, SelectHandler
Expand Down Expand Up @@ -61,7 +61,8 @@ class Chan:
:meth:`aiochan.channel.Chan.put_nowait`.
:param name: used to provide more friendly debugging outputs.
"""
__slots__ = ('loop', '_buf', '_gets', '_puts', '_closed', '_dirty_puts', '_dirty_gets', '_name', '_close_event')
__slots__ = ('loop', '_buf', '_gets', '_puts', '_closed', '_dirty_puts', '_dirty_gets', '_name', '_close_event',
'_delivered_immediate', '_delivered_buffered', '_delivered_queued')

_count = 0

Expand All @@ -86,6 +87,9 @@ def __init__(self,
else:
self._buf = buffer

self._delivered_immediate = 0
self._delivered_buffered = 0
self._delivered_queued = 0
self._gets = collections.deque()
self._puts = collections.deque()
self._closed = False
Expand Down Expand Up @@ -134,7 +138,7 @@ def _put(self, val, handler):
if self.closed or not handler.active:
return (not self.closed,)

# case 1: buffer available, and current buffer and then drain buffer
# case 1: buffer available, add to buffer and then drain buffer
if self._buf and self._buf.can_add:
# print('put op: buffer')
handler.commit()
Expand All @@ -143,6 +147,7 @@ def _put(self, val, handler):
getter = self._gets.popleft()
if getter.active:
self._dispatch(getter.commit(), self._buf.take())
self._delivered_queued += 1
return (True,)

getter = None
Expand All @@ -161,6 +166,7 @@ def _put(self, val, handler):
# print('put op: dispatch immediate to getter')
handler.commit()
self._dispatch(getter.commit(), val)
self._delivered_queued += 1
return (True,)

# case 3: no buffer, no pending getter, queue put op if put is blockable
Expand Down Expand Up @@ -195,6 +201,7 @@ def _get(self, handler):
self._dirty_puts = 0
break
self._check_exhausted()
self._delivered_buffered += 1
return (val,)

putter = None
Expand All @@ -213,6 +220,7 @@ def _get(self, handler):
# print('get op: get immediate from putter')
handler.commit()
self._dispatch(putter[0].commit(), True)
self._delivered_immediate += 1
return (putter[1],)

# case c: we are closed and no buffer
Expand Down Expand Up @@ -361,6 +369,7 @@ def close(self):
if getter.active:
val = self._buf.take() if self._buf and self._buf.can_take else None
self._dispatch(getter.commit(), val)
self._delivered_queued += 1
except IndexError:
self._dirty_gets = 0
break
Expand All @@ -384,6 +393,30 @@ def join(self):
"""
return self._close_event.wait()

def stats(self):
"""
Getting the current stats of the channel, useful for determining bottlenecks and debugging back pressure
in a processing pipeline.
:return: a `ChanStat` object `cs`, where
`cs.state` is `'PENDING_PUTS'`, `'PENDING_GETS'` or `'FLUENT'` according to whether the channel is
currently blocked on puts, blocked on gets, or not blocked (either because there is no operation going
on or there is buffer available), `cs.buffered`, `cs.queued`, `cs.immediate` count how many values
have been delivered according to whether the getter was given a buffered value, the getter was queued,
or the getter obtained value immediately from a pending putter.
"""
if self._puts:
state = 'PENDING_PUTS'
elif self._gets:
state = 'PENDING_GETS'
else:
state = 'FLUENT'

return ChanStat(state=state,
buffered=self._delivered_buffered,
queued=self._delivered_queued,
immediate=self._delivered_immediate)

async def _pipe_worker(self, out):
async for v in self:
if not await out.put(v):
Expand Down Expand Up @@ -1117,6 +1150,9 @@ def tick():
return c


ChanStat = collections.namedtuple('ChanStat', 'state buffered queued immediate')


class ChanIterator:
__slots__ = ('_chan',)

Expand Down
61 changes: 61 additions & 0 deletions aiochan/test/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,67 @@ def topic(v):
await nop()


@pytest.mark.asyncio
async def test_stats():
c = Chan(1)
s = c.stats()
assert s.state == 'FLUENT'
assert s.buffered == 0
assert s.queued == 0
assert s.immediate == 0
c.get_nowait(immediate_only=False)
s = c.stats()
assert s.state == 'PENDING_GETS'
assert s.buffered == 0
assert s.queued == 0
assert s.immediate == 0

await c.put(1)
s = c.stats()
assert s.state == 'FLUENT'
assert s.buffered == 0
assert s.queued == 1
assert s.immediate == 0

await c.put(1)
s = c.stats()
assert s.state == 'FLUENT'
assert s.buffered == 0
assert s.queued == 1
assert s.immediate == 0

c.put_nowait(1, immediate_only=False)
s = c.stats()
assert s.state == 'PENDING_PUTS'
assert s.buffered == 0
assert s.queued == 1
assert s.immediate == 0

await c.get()
s = c.stats()
assert s.state == 'FLUENT'
assert s.buffered == 1
assert s.queued == 1
assert s.immediate == 0

await c.get()
s = c.stats()
assert s.state == 'FLUENT'
assert s.buffered == 2
assert s.queued == 1
assert s.immediate == 0

c = Chan()

c.put_nowait(1, immediate_only=False)
await c.get()
s = c.stats()
assert s.state == 'FLUENT'
assert s.buffered == 0
assert s.queued == 0
assert s.immediate == 1


@pytest.mark.asyncio
async def test_go():
async def af(a):
Expand Down
2 changes: 1 addition & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# The short X.Y version
version = ''
# The full version, including alpha/beta/rc tags
release = '0.2.3'
release = '0.2.4'

# -- General configuration ---------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='aiochan',
version='0.2.3',
version='0.2.4',
packages=find_packages(),
platforms='any',
classifiers=[
Expand Down

0 comments on commit e6c5b48

Please sign in to comment.