Skip to content

Commit

Permalink
back pressure control
Browse files Browse the repository at this point in the history
  • Loading branch information
zh217 committed Sep 7, 2018
1 parent f839e83 commit a84ae7e
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions aiochan/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,12 @@ def process_worker(in_q, out_q):
elif mode == 'process':
threading.Thread(target=process_worker, args=(in_q.sync_q, out_q.sync_q)).start()

in_flight = asyncio.Semaphore(n, loop=self.loop)

async def pipe_in_worker(q):
while True:
data = await self.get()
await in_flight.acquire()
if data is None:
await q.put(None)
results_chan.close()
Expand All @@ -603,6 +606,7 @@ async def pipe_in_worker(q):
async def pipe_out_worker(q):
while True:
next_item = await q.get()
in_flight.release()
if next_item is None:
break
data, async_ft = next_item
Expand Down Expand Up @@ -698,16 +702,20 @@ def process_worker(in_q, out_q):
elif mode == 'process':
threading.Thread(target=process_worker, args=(in_q.sync_q, out_q.sync_q)).start()

in_flight = asyncio.Semaphore(n, loop=self.loop)

async def pipe_in_worker(q):
while True:
data = await self.get()
await in_flight.acquire()
await q.put(data)
if data is None:
break

async def pipe_out_worker(q):
while True:
data = await q.get()
in_flight.release()
if data is None:
if close:
out.close()
Expand Down

0 comments on commit a84ae7e

Please sign in to comment.