Skip to content

Commit

Permalink
remove out of place stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
zh217 committed Aug 16, 2018
1 parent c3cbcc2 commit 8ed9d8c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 263 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ python:
install:
- pip install -r test_req.txt
script:
- py.test
- py.test --verbose --cov=./aiochan
- codecov
240 changes: 27 additions & 213 deletions aiochan/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
'p': buffers.PromiseBuffer}

__all__ = ('Chan', 'select', 'merge', 'from_iter', 'from_range', 'zip_chans', 'combine_latest', 'tick_tock', 'timeout',
'Mux', 'Dup', 'Pub', 'go', 'nop', 'run_in_thread', 'run')
'Dup', 'Pub', 'go', 'nop', 'run_in_thread', 'run')

MAX_OP_QUEUE_SIZE = 1024
"""
Expand All @@ -42,9 +42,6 @@ class Chan:
Channels can be used as async generators using the ``async for`` construct for async iteration of the values.
Channels can be used as context managers using the ``with`` construct: when exiting the context, the channel
will be closed.
:param buffer: if a :meth:`aiochan.buffers.AbstractBuffer` is given, then it will be used as the buffer. In this
case `buffer_size` has no effect.
Expand Down Expand Up @@ -226,12 +223,6 @@ def _get(self, handler):
self._gets.append(handler)
return None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def __aiter__(self):
return ChanIterator(self)

Expand Down Expand Up @@ -633,15 +624,13 @@ async def collect(self, n=None):
result.append(r)
return result

def to_queue(self, q=None):
def to_queue(self, q):
"""
Put elements from the channel onto the given queue. Useful for inter-thread communication.
:param q: the queue. If `None`, a `queue.Queue` will be constructed.
:param q: the queue.
:return: the queue `q`.
"""
if q is None:
q = queue.Queue()

async def worker():
async for v in self:
Expand Down Expand Up @@ -894,41 +883,6 @@ async def worker(inp, o):

return self.async_apply(worker, out)

def debounce(self, seconds, *, out=None, close=True):
"""
Release elements that has been put into the channel into the output channel if at least `seconds` have passed
since any previous put operations. If the channel is closed, the last put value will be released immediately
if it has not been released before.
:param seconds: time since last put operations that must have passed before release
:param out: if given, will be used as the output channel
:param close: close the output channel when the input is closed
:return: the output channel containing the released values
"""

async def worker(inp, o):
tout = Chan()
last = None
while True:
el, c = await select(tout, inp)
if c is tout:
tout = Chan()
if last is not None:
await o.put(last)
last = None
else:
if el is None:
if last is not None:
await o.put(last)
break
else:
tout = timeout(seconds)
last = el
if close:
o.close()

return self.async_apply(worker, out)

def dup(self):
"""
Create a :meth:`aiochan.channel.Dup` from the channel
Expand Down Expand Up @@ -1083,6 +1037,7 @@ def from_range(start=None, end=None, step=None, *, loop=None):
def select(*chan_ops,
priority=False,
default=None,
cb=None,
loop=None):
"""
Asynchronously completes at most one operation in chan_ops
Expand All @@ -1092,22 +1047,31 @@ def select(*chan_ops,
:param priority: if True, the operations will be tried serially, else the order is random
:param default: if not None, do not queue the operations if they cannot be completed immediately, instead return
a future containing SelectResult(val=default, chan=None).
:param cb:
:param loop: asyncio loop to run on
:return: a function containing SelectResult(val=result, chan=succeeded_chan)
"""
chan_ops = list(chan_ops)
loop = loop or asyncio.get_event_loop()
ft = loop.create_future()
if not cb:
loop = loop or asyncio.get_event_loop()
ft = loop.create_future()
flag = SelectFlag()
if not priority:
random.shuffle(chan_ops)
ret = None

def set_result_wrap(c):
def set_result(v):
ft.set_result((v, c))
if not cb:
def set_result_wrap(c):
def set_result(v):
ft.set_result((v, c))

return set_result
else:
def set_result_wrap(c):
def set_result(v):
cb(v, c)

return set_result
return set_result

for chan_op in chan_ops:
if isinstance(chan_op, Chan):
Expand All @@ -1125,6 +1089,14 @@ def set_result(v):
if r is not None:
ret = (r[0], chan)
break
if cb:
if ret:
cb(ret[0], ret[1])
elif default is not None and flag.active:
flag.commit(None)
cb(default, None)
return

if ret:
ft.set_result(ret)
elif default is not None and flag.active:
Expand Down Expand Up @@ -1227,148 +1199,6 @@ async def worker():
return out


class Mux:
"""
A multiplexer: similar to :meth:`aiochan.channel.merge` but allowing finer control.
Operation modes can be specified individually for each input channel of this multiplexer, in the form of a
set of keywords `solo`, `mute` or `pause`.
Channels that are currently in `pause` mode will not be attempted for gets.
At any moment when a new value is available from any of the inputs, one of the following will happen, in order:
* if the input has `mute` attribute, its value will be silently dropped,
* if the input has `solo` attribute, its value will be put onto the output
* else its value will be put onto the output only if none of the input channels has the `solo` attribute. See the
documentation for the `solo_mode` parameter for its behaviour when its values are not used.
Multiplexers can be used as context managers that can be auto-closed on exiting the context.
:param out: the output chan. If `None`, a new unbuffered channel will be used.
:param solo_mode: `mute` or `pause`. If `mute`, when there are any solo-mode inputs active, other inputs will
be muted: their values are taken but silently dropped. If `pause`, other inputs will not be attempted for
gets at all.
"""
__slots__ = ('_out', '_chans', '_solo_mode', '_change_chan')

def __init__(self, out=None, solo_mode='mute'):
assert solo_mode in ('mute', 'pause')
out = out or Chan()
self._change_chan = Chan()
self._out = out
self._solo_mode = solo_mode
self._chans = {}
solos = set()
mutes = set()
reads = set()

def calc_state():
nonlocal solos, mutes, reads
solos = {c for c, v in self._chans.items() if 'solo' in v}
mutes = {c for c, v in self._chans.items() if 'mute' in v}
if self._solo_mode == 'pause' and solos:
reads = solos.copy()
else:
reads = {c for c, v in self._chans.items() if 'pause' not in v}
reads.add(self._change_chan)

calc_state()

async def worker():
while True:
v, c = await select(*reads)
if c is self._change_chan:
if v is None:
break
calc_state()
continue

if v is None:
self._chans.pop(c, None)
calc_state()
continue

if c in solos or (not solos and c not in mutes):
if not await self._out.put(v):
break

out.loop.create_task(worker())

def _changed(self):
self._change_chan.put_nowait(True, immediate_only=False)

@property
def out(self):
"""
:return: the output channel
"""
return self._out

def mix(self, *inputs, modes=()):
"""
Add channels into the multiplexer. After adding, their values will appear in the output.
:param inputs: the channels to add
:param modes: a set containing the attributes of the added channels.
:return: `self`
"""
modes = {v for v in modes if v in ('solo', 'mute', 'pause')}
for ch in inputs:
self._chans[ch] = modes
self._changed()
return self

def unmix(self, *inputs):
"""
Remove inputs from the multiplexer
:param inputs: the inputs to remove
:return: `self`
"""
for ch in inputs:
self._chans.pop(ch, None)
self._changed()
return self

def unmix_all(self):
"""
Remove all inputs from the multiplexer
:return: `self`
"""
self._chans.clear()
self._changed()
return self

def solo_mode(self, mode):
"""
Set the solo mode of the multiplexer.
:param mode: `mute` or `pause`.
:return: `self`
"""
assert mode in ('mute', 'pause')
self._solo_mode = mode
self._changed()
return self

def close(self):
"""
Close the multiplexer
:return: `self`
"""
self._change_chan.close()
return self

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()


class Dup:
"""
A duplicator: takes values from the input, and gives out the same value to all outputs.
Expand All @@ -1378,8 +1208,6 @@ class Dup:
When there are no output channels, values from the input channels are dropped.
Duplicators can be used as context managers.
:param inp: the input channel
"""

Expand Down Expand Up @@ -1455,12 +1283,6 @@ def close(self):
self._close_chan.close()
return self

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()


class Pub:
"""
Expand All @@ -1471,8 +1293,6 @@ class Pub:
appropriate situations, either globally by setting the `buffer` and `buffer_size` parameters, or individually
for each subscription channel.
Publishers can be used as context managers.
:param inp: the channel to be used as the source of the publication.
:param topic_fn: a function accepting one argument and returning one result. This will be applied to each value
as they come in from `inp`, and the results will be used as topics for subscription. `None` topic is
Expand Down Expand Up @@ -1574,12 +1394,6 @@ def close(self):
self.unsub_all(k)
return self

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()


def go(coro, loop=None):
"""
Expand Down

0 comments on commit 8ed9d8c

Please sign in to comment.