-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
354 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,353 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 1, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import sys\n", | ||
"sys.path.append('../')\n", | ||
"\n", | ||
"# import os\n", | ||
"# os.environ['PYTHONASYNCIODEBUG'] = '1'" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# A very quick introduction" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"You will need to import the module `aiochan` and `asyncio` first:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 2, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import aiochan as ac\n", | ||
"import asyncio" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"A channel is like a golang channel or a Clojure core.async chan. Creating a channel is simple:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 3, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"data": { | ||
"text/plain": [ | ||
"Chan<_unk_0 140697829983528>" | ||
] | ||
}, | ||
"execution_count": 3, | ||
"metadata": {}, | ||
"output_type": "execute_result" | ||
} | ||
], | ||
"source": [ | ||
"c = ac.Chan()\n", | ||
"c" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"In the following examples, we use `ac.run` to run the main coroutine. You can also run asyncio loops directly." | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"We can call `await c.put(v)` to put value into the channel, `await c.get()` to get value from the channel, and `c.close()` to close the channel:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 5, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"obtained: product 1\n", | ||
"obtained: product 2\n", | ||
"obtained: product 3\n", | ||
"obtained: product 4\n", | ||
"obtained: product 5\n", | ||
"It is late, let us call it a day.\n", | ||
"consumer goes home\n", | ||
"producer goes home\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"async def producer(c):\n", | ||
" i = 0\n", | ||
" while True:\n", | ||
" await asyncio.sleep(0.1) # producing stuff takes time\n", | ||
" i += 1\n", | ||
" still_open = await c.put('product ' + str(i))\n", | ||
" if not still_open:\n", | ||
" print('producer goes home')\n", | ||
" break\n", | ||
"\n", | ||
"\n", | ||
"async def consumer(c):\n", | ||
" while True:\n", | ||
" product = await c.get()\n", | ||
" if product is not None:\n", | ||
" print('obtained:', product)\n", | ||
" else:\n", | ||
" print('consumer goes home')\n", | ||
" break\n", | ||
"\n", | ||
"async def main():\n", | ||
" c = ac.Chan()\n", | ||
" ac.go(producer(c))\n", | ||
" ac.go(consumer(c))\n", | ||
" await asyncio.sleep(0.6)\n", | ||
" print('It is late, let us call it a day.')\n", | ||
" c.close()\n", | ||
" await asyncio.sleep(0.2) # necessary to wait for producer\n", | ||
"\n", | ||
"ac.run(main())" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Channel works as an async iterator:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 6, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"obtained: product 1\n", | ||
"obtained: product 2\n", | ||
"obtained: product 3\n", | ||
"obtained: product 4\n", | ||
"obtained: product 5\n", | ||
"It is late, let us call it a day.\n", | ||
"consumer goes home\n", | ||
"producer goes home\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"async def producer(c):\n", | ||
" i = 0\n", | ||
" while True:\n", | ||
" await asyncio.sleep(0.1) # producing stuff takes time\n", | ||
" i += 1\n", | ||
" still_open = await c.put('product ' + str(i))\n", | ||
" if not still_open:\n", | ||
" print('producer goes home')\n", | ||
" break\n", | ||
"\n", | ||
"\n", | ||
"async def consumer(c):\n", | ||
" async for product in c:\n", | ||
" print('obtained:', product)\n", | ||
" print('consumer goes home')\n", | ||
"\n", | ||
"async def main():\n", | ||
" c = ac.Chan()\n", | ||
" ac.go(producer(c))\n", | ||
" ac.go(consumer(c))\n", | ||
" await asyncio.sleep(0.6)\n", | ||
" print('It is late, let us call it a day.')\n", | ||
" c.close()\n", | ||
" await asyncio.sleep(0.2) # necessary to wait for producer\n", | ||
"\n", | ||
"ac.run(main())" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"`select`, which is the whole point of channels, works as in golang or `alt!` in Clojure's core.async to complete one and only one operation non-deterministically:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 8, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"received worker0-1\n", | ||
"received worker1-1\n", | ||
"received worker2-1\n", | ||
"received worker0-2\n", | ||
"received worker1-2\n", | ||
"received worker2-2\n", | ||
"received worker0-3\n", | ||
"received worker1-3\n", | ||
"received worker2-3\n", | ||
"received worker0-4\n", | ||
"received worker1-4\n", | ||
"received worker2-4\n", | ||
"received worker0-5\n", | ||
"received worker1-5\n", | ||
"received worker2-5\n", | ||
"consumer stopped\n", | ||
"worker0 stopped\n", | ||
"worker1 stopped\n", | ||
"worker2 stopped\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"async def worker(out, stop, tag):\n", | ||
" i = 0\n", | ||
" while True:\n", | ||
" i += 1\n", | ||
" await asyncio.sleep(0.1)\n", | ||
" result, c = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)\n", | ||
" if c is stop:\n", | ||
" print('%s stopped' % tag)\n", | ||
" break\n", | ||
"\n", | ||
"async def consumer(c, stop):\n", | ||
" while True:\n", | ||
" result, c = await ac.select(stop, c, priority=True)\n", | ||
" if c is stop:\n", | ||
" print('consumer stopped')\n", | ||
" break\n", | ||
" else:\n", | ||
" print('received', result)\n", | ||
"\n", | ||
"async def main():\n", | ||
" c = ac.Chan()\n", | ||
" stop = ac.Chan()\n", | ||
" for i in range(3):\n", | ||
" ac.go(worker(c, stop, 'worker%s' % i))\n", | ||
" ac.go(consumer(c, stop))\n", | ||
" await asyncio.sleep(0.6)\n", | ||
" stop.close()\n", | ||
" await asyncio.sleep(0.2)\n", | ||
"\n", | ||
"ac.run(main())" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Channels can use some buffering to implement back-pressure:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": 10, | ||
"metadata": {}, | ||
"outputs": [ | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"producing 1\n", | ||
"producing 2\n", | ||
"producing 3\n", | ||
"consuming 1\n", | ||
"producing 4\n", | ||
"producing 5\n", | ||
"consuming 2\n", | ||
"producing 6\n", | ||
"consuming 3\n", | ||
"producing 7\n", | ||
"consuming 4\n", | ||
"producing 8\n" | ||
] | ||
} | ||
], | ||
"source": [ | ||
"async def worker(c):\n", | ||
" i = 0\n", | ||
" while True:\n", | ||
" i += 1\n", | ||
" await asyncio.sleep(0.05)\n", | ||
" print('producing', i)\n", | ||
" await c.put(i)\n", | ||
"\n", | ||
"async def consumer(c):\n", | ||
" while True:\n", | ||
" await asyncio.sleep(0.2)\n", | ||
" result = await c.get()\n", | ||
" print('consuming', result)\n", | ||
"\n", | ||
"async def main():\n", | ||
" c = ac.Chan(3)\n", | ||
" ac.go(worker(c))\n", | ||
" ac.go(consumer(c))\n", | ||
" await asyncio.sleep(1)\n", | ||
"\n", | ||
"ac.run(main())" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Sliding and dropping buffers are also available." | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"That's all the basics, but there are much more: functional methods, combination patterns, pipelines, thread or process-based parallelism and so on." | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "Python 3", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.6.6" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |