In [1]:
import queue
 
def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")
 
def main():
    """
    Это основная точка входа в программу
    """
    # Создание очереди работы
    work_queue = queue.Queue()
 
    # Помещение работы в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)
 
    # Создание нескольких синхронных задач
    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]
 
    # Запуск задач
    for t, n, q in tasks:
        t(n, q)
 
if __name__ == "__main__":
    main()

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do


Рассмотрим важные строки программы:

Строка 1 импортирует модуль queue. Здесь программа хранит работу, которая должна быть выполнена задачами;  
Строки с 3 по 13 определяют task(). Данная функция извлекает работу из очереди work_queue и обрабатывает ее до тех пор, пока больше не нужно ничего делать;  
Строка 15 определяет функцию main() для запуска задач программы;  
Строка 20 создает work_queue. Все задачи используют этот общий ресурс для извлечения работы;  
Строки с 23 по 24 помещают работу в work_queue. В данном случае это просто случайное количество значений для задач, которые нужно обработать;  
Строка 27 создает список кортежей задач со значениями параметров, передаваемых задачами;  
Строки с 30 по 31 перебирают список кортежей задач, вызывая каждый из них и передавая ранее определенные значения параметров;  
Строка 34 вызывает main() для запуска программы.  
Задача в данной программе является просто функцией, что принимает строку и очередь в качестве параметров. При выполнении она ищет что-либо в очереди для обработки. Если есть над чем поработать, из очереди извлекаются значения, запускается цикл for для подсчета до этого значения и выводится итоговое значение в конце. Получение работы из очереди продолжается до тех пор, пока на не закончится.


In [2]:
import queue
 
 
def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1
            yield
        print(f"Task {name} total: {total}")
 
 
def main():
    """
    Это основная точка входа в программу
    """
    # Создание очереди работы
    work_queue = queue.Queue()
 
    # Размещение работы в очереди
    for work in [15, 10, 5, 2]:
        work_queue.put(work)
 
    # Создание задач
    tasks = [task("One", work_queue), task("Two", work_queue)]
 
    # Запуск задач
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True
 
 
if __name__ == "__main__":
    main()

Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2


Строки с 3 по 11 определяют task(), как и раньше. Кроме того, в Строке 10 добавляется yield, превращая функцию в генератор. В этом случае происходит переключение контекста и управление возвращается обратно в цикл while в main();  
Строка 25 создает список задач, но немного иначе, чем вы видели в предыдущем примере кода. В этом случае каждая задача вызывается с параметрами, указанными в переменной списка задач. Это необходимо для запуска функции генератора task() в первый раз;  
Строки с 31 по 36 являются модификациями цикла while в main(), которые позволяют совместно выполнять task(). Управление возвращается к каждому экземпляру task(), позволяя циклу продолжаться и запустить другую задачу;  
Строка 32 возвращает контроль к task() и продолжает выполнение после точки, где был вызван yield;  
Строка 36 устанавливает переменную done. Цикл while заканчивается, когда все задачи завершены и удалены из tasks.

In [4]:
import time
import queue
from codetiming import Timer
 
 
def task(name, queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not queue.empty():
        delay = queue.get()
        print(f"Task {name} running")
        timer.start()
        time.sleep(delay)
        timer.stop()
        yield
 
 
def main():
    """
    Это основная точка входа в программу
    """
    # Создание очереди работы
    work_queue = queue.Queue()
 
    # Добавление работы в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)
 
    tasks = [task("One", work_queue), task("Two", work_queue)]
 
    # Запуск задач
    done = False
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        while not done:
            for t in tasks:
                try:
                    next(t)
                except StopIteration:
                    tasks.remove(t)
                if len(tasks) == 0:
                    done = True
 
if __name__ == "__main__":
    main()

Task One running
Task One elapsed time: 15.0
Task Two running
Task Two elapsed time: 10.0
Task One running
Task One elapsed time: 5.0
Task Two running
Task Two elapsed time: 2.0

Total elapsed time: 32.0


Строка 1 импортирует модуль time, чтобы у программы был доступ к time.sleep();  
Строка 3 импортирует код Timer из модуля codetiming;  
Строка 6 создает экземпляр класса Timer, используемый для измерения времени, нужного для итерации каждой задачи цикла;  
Строка 10 запускает экземпляр timer;  
Строка 11 изменяет task() для включения time.sleep(delay) для имитации задержки IO. Это заменяет цикл for, что отвечал за подсчет в example_1.py;  
Строка 12 останавливает экземпляр timer и выводит, истекшее с момента вызова timer.start(), время;  
Строка 30 создает менеджер контекста Timer, что выводит истекшее время с момента начала всего цикла.

Как и ранее, Task On и Task Two  запускаются, собирая работу из очереди обрабатывая ее. Однако даже при добавлении задержки видно, что кооперативный параллелизм ничего не привнес. Задержка останавливает обработку всей программы, а CPU просто ждет, чтобы задержка IO завершилась.

Именно под этим и подразумевается блокирующий код Python в документации по асинхронизации. Вы заметите, что время, необходимое для запуска всей программы, является просто совокупным временем всех задержек. Выполнение заданий таким способом нельзя считать успешным.

In [None]:
import asyncio
from codetiming import Timer
 
 
async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = await work_queue.get()
        print(f"Task {name} running")
        timer.start()
        await asyncio.sleep(delay)
        timer.stop()
 
 
async def main():
    """
    Это главная точка входа для главной программы
    """
    # Создание очереди работы
    work_queue = asyncio.Queue()
 
    # Помещение работы в очередь
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)
 
    # Запуск задач
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )
 
 
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Строка 1 импортирует asyncio для получения доступа к асинхронной функциональности Python. Это замена импорта time;  
Строка 2 импортирует класс Timer из модуля codetiming;  
Строка 4 добавляет ключевое слово async перед определением task(). Это сообщает программе, что task может выполняться асинхронно;  
Строка 5 создается экземпляр Timer, используемый для измерения времени, необходимого для каждой итерации цикла задач;  
Строка 9 запускает экземпляр timer;  
Строка 10 заменяет time.sleep(delay) неблокирующим asyncio.sleep(delay), что также возвращает контроль (или переключает контексты) обратно в цикл основного события;  
Строка 11 останавливается экземпляр timer и выводится истекшее время с момента вызова timer.start();
Строка 18 создает неблокирующую асинхронную work_queue;  
Строки 21-22 помещают работу в work_queue асинхронно с использованием ключевого слова await;  
Строка 25 создается менеджер контекста Timer, который выводит истекшее время, затраченное на выполнение цикла while;  
Строки 26-29 создают две задачи и собирают их вместе, поэтому программа будет ожидать завершения обеих задач;  
Строка 32 запускает программу асинхронно. Здесь также запускается внутренний цикл событий.


In [None]:
import queue
import requests
from codetiming import Timer
 
 
def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    with requests.Session() as session:
        while not work_queue.empty():
            url = work_queue.get()
            print(f"Task {name} getting URL: {url}")
            timer.start()
            session.get(url)
            timer.stop()
            yield
 
 
def main():
    """
    Это основная точка входа в программу
    """
    # Создание очереди работы
    work_queue = queue.Queue()
 
    # Помещение работы в очередь
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com",
    ]:
        work_queue.put(url)
 
    tasks = [task("One", work_queue), task("Two", work_queue)]
 
    # Запуск задачи
    done = False
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        while not done:
            for t in tasks:
                try:
                    next(t)
                except StopIteration:
                    tasks.remove(t)
                if len(tasks) == 0:
                    done = True
 
 
if __name__ == "__main__":
    main()

Строка 2 импортирует requests, что предоставляет удобный способ совершать HTTP вызовы.  
Строка 3 импортирует класс Timer из модуля codetiming.  
Строка 6 создается экземпляр Timer, используемый для измерения времени, необходимого для каждой итерации цикла задач.  
Строка 11 запускает экземпляр timer  
Строка 12 создает задержку, похожую на то, что в example_3.py. Однако на этот раз вызывается session.get(url), который возвращает содержимое URL, полученного из work_queue.
Строка 13 останавливает экземпляр timer и выводит истекшее время с момента вызова timer.start().  
Строки с 23 по 32 помещают список URL в work_queue.  
Строка 39 создается менеджер контекста Timer, который выводит истекшее время, затраченное на выполнение всего цикла while.


In [None]:
import asyncio
import aiohttp
from codetiming import Timer
 
 
async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    async with aiohttp.ClientSession() as session:
        while not work_queue.empty():
            url = await work_queue.get()
            print(f"Task {name} getting URL: {url}")
            timer.start()
            async with session.get(url) as response:
                await response.text()
            timer.stop()
 
 
async def main():
    """
    Это основная точка входа в программу
    """
    # Создание очереди работы
    work_queue = asyncio.Queue()
 
    # Помещение работы в очередь
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com",
    ]:
        await work_queue.put(url)
 
    # Запуск задач
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )
 
 
if __name__ == "__main__":
    asyncio.run(main())

Строка 2 импортирует библиотеку aiohttp, которая обеспечивает асинхронный способ выполнения HTTP вызовов.  
Строка 3 импортирует класс Timer из модуля codetiming.  
Строка 5 помечает task() как асинхронную функцию.  
Строка 6 создает экземпляр Timer, используемый для измерения времени, необходимого для каждой итерации цикла задач.  
Строка 7 создается менеджер контекста сессии aiohttp.  
Строка 8 создает менеджер контекста ответа aiohttp. Он также выполняет HTTP вызов GET для URL, взятого из work_queue.  
Строка 11 запускает экземпляр timer  
Строка 12 использует сеанс для асинхронного получения текста из URL.  
Строка 13 останавливает экземпляр timer и выводит истекшее время с момента вызова timer.start().  
Строка 39 создает менеджер контекста Timer, который выводит истекшее время, затраченное на выполнение всего цикла while.  
При запуске программы вы увидите следующий вывод: