Skip to content

Commit

Permalink
Instantiate SessionBase._task_group immediately
Browse files Browse the repository at this point in the history
- cannot be None
- inline _collect_tasks
  • Loading branch information
Neil Booth committed Nov 8, 2018
1 parent 21f0380 commit 1d0a015
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions aiorpcx/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __init__(self, *, framer=None, loop=None):
self._can_send = Event()
self._can_send.set()
self._pm_task = None
self._task_group = None
self._task_group = TaskGroup()
# Force-close a connection if a send doesn't succeed in this time
self.max_send_delay = 60
# Statistics. The RPC object also keeps its own statistics.
Expand Down Expand Up @@ -140,15 +140,15 @@ async def _process_messages(self):
'''Process incoming messages asynchronously and consume the
results.
'''
async with TaskGroup() as group:
self._task_group = group
await self.spawn(self._receive_messages)
await self.spawn(self._collect_tasks)
async def collect_tasks():
next_done = task_group.next_done
while True:
await next_done()

async def _collect_tasks(self):
next_done = self._task_group.next_done
while True:
await next_done()
task_group = self._task_group
async with task_group:
await self.spawn(self._receive_messages)
await self.spawn(collect_tasks)

async def _limited_wait(self, secs):
# Wait at most secs seconds to send, otherwise abort the connection
Expand Down Expand Up @@ -258,7 +258,7 @@ async def spawn(self, coro, *args):
'''If the session is connected, spawn a task that is cancelled
on disconnect, and return it. Otherwise return None.'''
group = self._task_group
if group and not group.closed():
if not group.closed():
return await group.spawn(coro, *args)
else:
return None
Expand Down

0 comments on commit 1d0a015

Please sign in to comment.