In [235]:
import collections
import random

class Loop:
    def __init__(self):
        # Очередь для хранения коллбэков
        self.ready = collections.deque()

    def call_soon(self, callback, *args):
        # складывает кортэж из коллбэка и его аргументов в очередь
        self.ready.append((callback, args))

    def run_until_complete(self, callback, *args):
        # Этот метод выполняет всё работу по запуску коллбэков
        self.call_soon(callback, *args)
        # Перекресток вех дорог - основной цикл
        # он крутится пока очередь не опустеет
        while self.ready:
            ntodo = len(self.ready)
            # внутренний цикл итерируется столько раз 
            # сколько было коллбэков в очереди на момент его запуска
            for _ in range(ntodo):
                # на каждой интерации достаёт из очереди
                # один коллбэк и его параметры и запускает
                callback, args = self.ready.popleft()
                callback(*args)

def callback(loop):
    print('Рассказчик')
    loop.call_soon(print, 'Читатель')

loop = Loop()
loop.run_until_complete(callback, loop)

Рассказчик
Читатель


In [246]:
def maybe_print(msg):
    if random.randint(0, 1):
        raise Exception(msg)
    else:
        print(msg)

def starting_point(loop):  # Место посадки
    print('Рассказчик')
    loop.call_soon(maybe_print, 'Читатель')

def main(loop):
    loop.call_soon(starting_point, loop)
    loop.call_soon(starting_point, loop)

loop = Loop()
loop.run_until_complete(main, loop)

Рассказчик
Рассказчик
Читатель
Читатель


In [227]:
def grep(pattern):
    print("start grep for", pattern)
    while True:
        s = yield
        if pattern in s:
            print("found!", s)
        else:
            print("no %s in %s" % (pattern, s))
            
# запуск корутины -- запись значения в s через send -- нахождение паттерна -- след итерация цикла, снова остановка на yield

In [228]:
g = grep("hello")
next(g) # вызов корутины

start grep for hello


In [229]:
g.send("hello world")

found! hello world


In [230]:
next(g) # в s None - остановка цикла -- далее будет stopiteration

TypeError: argument of type 'NoneType' is not iterable

In [None]:
g.send("dwkjdkwjjd world")

In [None]:
g.send("dwdw")

In [None]:
g.throw(ValueError("err"))

In [None]:
g.send("dwdw")

In [None]:
from inspect import getgeneratorstate

In [None]:
def subgen():
    message = yield
    print("Subgen received: ", message)

In [None]:
g = subgen()

In [None]:
g.send("wewe")

In [None]:
getgeneratorstate(g)

In [None]:
# инициализация корутины
g.send(None) # or next(g)

In [None]:
getgeneratorstate(g)

In [None]:
g.send("OK")

In [None]:
def subgen1():
    x = "start"
    message = yield x
    print(f"got {message=}")

# def subgen1():
#     x = "start" сначала выполнение этой строки, выдача x 
#          = yield x
#     message 
#     print(f"got {message=}") запись send() в message и print при вызове next

g1 = subgen1()
g1.__next__()

In [None]:
g1.send("qwert")

In [None]:
def average():
    sum = 0
    counter = 0
    while True:
        num = yield sum
        counter += 1
        sum += num
        print(f"average={sum/counter}")

average = average()
average.send(None)
for i in range(1, 10):
    average.send(i)
    
55 / 10

In [None]:
def start_coro(func):
    def inner(*args, **kwargs):
        g = func(*args, **kwargs)
        #g.send(None)
        next(g)
        return g
    return inner

In [None]:
class CustomExc(Exception):
    pass

In [None]:
@start_coro
def average1():
    count = 0
    sum = 0
    average = None
    while True:
        try:
            x = yield average
        except StopIteration:
            print("Done")
            break
        except CustomExc:
            print("Got CustomExc")
            break
        else:
            count += 1
            sum += x
            average = round(sum/count, 1) # кол-во знаков после запятой
    return average

In [None]:
g2 = average1()
getgeneratorstate(g2)

In [None]:
for i in range(1, 10):
    g2.send(i)
    
try:
    g2.throw(StopIteration)
except StopIteration as e:
    print("Average: ", e.value) # значение return можно получить только так

In [297]:
@start_coro
def subgen():
    while True:
        try:
            message = yield "w"
        except StopIteration:
            print("Ku-ku")
        else:
            print("........", message)
            
@start_coro
def delegator(subgen):
    while True:
        try:
            data = yield "smth"
            # next(subgen)
            print(subgen.send(data))
        except StopIteration as e:
            subgen.throw(e)

In [298]:
g = subgen()
dele = delegator(g)
next(g) # если не выводить через print, то вывод в строке out
#print(next(g))
# dele.send("aeawe")

........ None


'w'

In [299]:
print(g.send("g.send"))

........ g.send
w


In [300]:
next(dele)

........ None
w


'smth'

In [None]:
dele.__next__() # print(subgen.send(data)) внутри dele -- w

In [291]:
# эквивалентая запись
@start_coro
def new_delegator(subgen):
    result = yield from subgen # в result записывается то, что вернет subgen после StopIteration
    print(result)
    # если просто yield то выдаем сам subgen obj
# start_coro уже не нужен
def new_subgen():
    while True:
        try:
            message = yield "w" # теперь 'w' будет выводиться автоматически
        except StopIteration:
            print("Ku-ku")
            break
        except CustomExc:
            print("Custom ku-ku")
        else:
            print("........", message)
    return "returned val"

# yield from == await (делегатор блокируется на время выполнения subgen, т.е. awaits subgen)

In [292]:
g1 = new_subgen()
dele1 = new_delegator(g1)

In [302]:
next(dele1) # то есть берем содержимое subgen через next делеготора

........ None


'w'

In [293]:
dele1.send("wda")

........ wda


'w'

In [294]:
try:
    dele1.throw(StopIteration)
except StopIteration as e:
    print(e.value)

Ku-ku
returned val
None


In [295]:
# try:
#     g1.throw(StopIteration)
# except StopIteration as e:
#     print(e.value)

In [296]:
g1 = new_subgen()
dele1 = new_delegator(g1)

In [None]:
dele1.throw(StopIteration)

In [None]:
g1 = new_subgen()
dele1 = new_delegator(g1)

In [None]:
dele1.throw(CustomExc) # returned val не вернулось, только при StopIter (в res будет w)

In [None]:
g1 = new_subgen()
dele1 = new_delegator(g1)

In [None]:
print(dele1.throw(CustomExc))

In [None]:
def foo():
    yield [1, 2, 3]
    #yield from "iterable"

In [None]:
foo_ = foo()
for item in foo_:
    print(f"this is {item}")

In [None]:
# если писать yield перед yield from, то будет ошибка gen obj isn't callable
def foo1():
    try:
        yield from "iterable"
        yield [1, 2, 3]
    except StopIteration:
        pass
    return "this is StopIteration value"

In [None]:
foo1_ = foo1()
for item in foo1_:
    print(f"this is {item}")
try:
    next(foo1_)
except StopIteration as e:
    print(e.value)

# gen = foo1()
# try:
#    gen.throw(StopIteration)
# except StopIteration as e:
#     print(e.value)   --- не корутина

In [None]:
# Asyncio
# экземпляры класса Task(Future) - дейсвтия, котоыре должны выполняться асинхронно
# class Future - класс - заглушка для передачи контроля управления обратно в событтийный цикл
# асинхронная функция сразу отдает эту заглушку, чтобы вернуть контроль выполнения
# с экземпляром класса Task связана конкретная корнутина, информация о кторой хранится в атрибутах экземпляра

In [None]:
# Event loop:
    # take Task from the queue -- Task.coro.step() -- starting coro algo -- 
    # (maybe starting subgens, delegators and blocking initial coro) -- 
    # taking another Tasks from the queue -- going to the initial coro -- 
    # staring initial coro from the last place -- e.t.c.

In [231]:
import asyncio

In [None]:
# @asyncio.coroutine -- побуждает функцию возвращать gen obj(корутину)
# yield from - вызов асинхронной функции(корутины)
# task = asyncio.ensure_future(gen_foo()) - создание экземпляра класса Task

# if __name__ == "__main__":
#    loop = asynio.get_event_loop()
#    loop.run.until_complete(main())
#    loop.close()

In [None]:
# новый подход
# async def foo(): преобразование в корутину
#    await ... (вызов корутины)
# async def main(): формирование алгоритма событийного цикла
#    task1 = asyncio.create_task(gen_foo1()) - Task inst creation
#    task2 = asyncio.create_task(gen_foo2())
#    await asyncio.gather(task1, task2)

# if __name__ == "__main__":
#    asyncio.run(main())

In [None]:
# async/await - натинвые корутины
# await - точки входа/выхода в корутинах
# async def определяет нативную корутину
# async def outer():
#   await asyncio.sleep(2) - await передает управление влоденной корутине
# await ждет полного выполения вложенной корутины(await схож с yield from)
# вложенная корутина может вызывать свои корутины через await и тд
# await оптарвляет sleep в event_loop и loop передает упралвение толкьо ей, outer корутина блокируется
# то есть в loop будет прокручиватсья толкьо asyncio.sleep

# нативные корутины вызываюстя только через await
# await нельяз запустить в глобальнмо простанстве
# поэтмоу нужно запустить asyncio.run(main_func()) - отправить туда коуртину. котоаря запустит в свою очередь вложенную через await
# то есть нельзя сделать await main(), где main() - алгоритм цикла событий
# через await отправляем корутину в цикл событий
# await приостанавливает выполнение текущей корутины, пока не выполинтся вызываемая с помощью await вложенная корутина

In [303]:
async def print_nums():
    num = 1
    while True:
        print(num)
        num += 1
        await asyncio.sleep(0.1)
        
async def print_time(): 
    count = 0
    while True:
        if count % 3 == 0:
            print(f"{count} sec have passed")
        count += 1
        await asyncio.sleep(1)
        
async def main():
    task1 = asyncio.create_task(print_nums())
    task2 = asyncio.create_task(print_time())

    await asyncio.gather(task1, task2)
    
    
if __name__ == "__main__":
    # asyncio.run(main())
    await main() # в jupyter свой event loop

1
0 sec have passed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3 sec have passed
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
6 sec have passed
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
9 sec have passed
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
12 sec have passed
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
15 sec have passed
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
18 sec have passed
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
21 sec have passed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
24 s

1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
168 sec have passed
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
171 sec have passed
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
174 sec have passed
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
177 sec have passed
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
180 sec have passed
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
183 sec have passed
1819
1820
1821


3093
3094
3095
3096
3097
3098
312 sec have passed
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
315 sec have passed
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
318 sec have passed
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
321 sec have passed
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
324 sec have passed
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
327 sec have passed
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268


CancelledError: 

In [232]:
async def countdown(n):
    print(f"start count_{n}")
    for i in range(n):
        print(f"count_{i}")
        await asyncio.sleep(1) # in sec

In [233]:
import time

t1 = time.time()
c = countdown(5) # warning after work of the garbage collector, вызвали не через await

print(c)
await c

print(time.time() - t1)

# asynio.sleep вызывает callback(обработчик события) и помещает его в очередь цикла событий, далее 
# контроль упралвения переключается на др событие(в это время корутина спит)
# 4 ms - время выполнения других событий(tasks) в очереди -- возвращения к событию

<coroutine object countdown at 0x7f653d7a02e0>
start count_5
count_0
count_1
count_2
count_3
count_4
5.00423526763916


In [None]:
await countdown(5)
await countdown(5)
# time: 10+ sec, текущая корутина (не задается в jupyter) блокируется на время 
# выполнения сначала 1-ой, затем 2-ой корутин -- 10+ sec

In [None]:
t1 = time.time()

task1 = asyncio.create_task(countdown(5)) # сразу отправляет в event_loop
print("created task")

await countdown(6)
print("end count 6")

await task1 # дожидаемся выполнения корутины,запущенной не в блок режиме(task1)
print("end task") # здесь уже глав корутина блокируется 

t2 = time.time()
print("time", t2 - t1)

In [234]:
t1 = time.time()

tasks = [
    asyncio.create_task(countdown(i))
    for i in (5, 6, 3, 4)
]
print("created tasks")
# gather = asyncio.gather(*tasks) # собирает(объединяет tasks) -- дожидаемся выполнение всех тасков
# print(type(gather))
# await asyncio.gather(*tasks) # gather
# await gather
print("end gather")

t2 = time.time()
print("time", t2 - t1)

created tasks
end gather
time 0.0008287429809570312
start count_5
count_0
start count_6
count_0
start count_3
count_0
start count_4
count_0
count_1
count_1
count_1
count_1
count_2
count_2
count_2
count_2
count_3
count_3
count_3
count_4
count_4
count_5


In [None]:
a = 5 # без gather можно было бы выполнить этот код, счет шел бы далее 
# т.к. главная вызывающая корутина main прокрутится, ее код выполнится, 
# а task будут дальше крутиться в очереди цикла событий

In [247]:
t1 = time.time()

tasks = [
    asyncio.create_task(countdown(i))
    for i in (5, 6, 3, 4)
]
print("created tasks")
gather = asyncio.gather(*tasks) # собирает(объединяет tasks) -- дожидаемся выполнение всех тасков из event loop'а
print(type(gather))
# await asyncio.gather(*tasks) # gather
await gather # "блокирует" поток главной корутины - обертки (async def main()), которая await'ится с помощью asyncio.run
print("end gather")

t2 = time.time()
print("time", t2 - t1)

created tasks
<class 'asyncio.tasks._GatheringFuture'>
start count_5
count_0
start count_6
count_0
start count_3
count_0
start count_4
count_0
count_1
count_1
count_1
count_1
count_2
count_2
count_2
count_2
count_3
count_3
count_3
count_4
count_4
count_5
end gather
time 6.006422519683838


In [248]:
async def countdown(n):
    print(f"start count_{n}")
    for i in range(n):
        print(f"count_{n}: {i}")
        await asyncio.sleep(1)
    
    return n

t1 = time.time()

tasks = [
    asyncio.create_task(countdown(i))
    for i in (5, 6, 3, 4)
]
print("created tasks")

for coro in asyncio.as_completed(tasks): # gen, возвращающий корутины, закончившие выполнение
    res = await coro # в res запишется то, что вернется после await корутины, то есть то, что корутина возвращает в результате StopIteration
    print(res, coro)

print("end gather")

t2 = time.time()
print("time", t2 - t1)

created tasks
start count_5
count_5: 0
start count_6
count_6: 0
start count_3
count_3: 0
start count_4
count_4: 0
count_5: 1
count_6: 1
count_3: 1
count_4: 1
count_5: 2
count_6: 2
count_3: 2
count_4: 2
count_5: 3
count_6: 3
count_4: 3
3 <coroutine object as_completed.<locals>._wait_for_one at 0x7f653d7a1700>
count_5: 4
count_6: 4
4 <coroutine object as_completed.<locals>._wait_for_one at 0x7f653d5f8c80>
count_6: 5
5 <coroutine object as_completed.<locals>._wait_for_one at 0x7f653d7a1700>
6 <coroutine object as_completed.<locals>._wait_for_one at 0x7f653d5f8c80>
end gather
time 6.006797552108765


In [None]:
"str" # так же ждем выполнения всех корутин

In [249]:
async def countdown(n):
    print(f"start count_{n}")
    for i in range(n):
        print(f"count_{n}: {i}")
        await asyncio.sleep(1)
    
    return n


async def block_coro(n):
    time.sleep(n)


t1 = time.time()

tasks = [
    asyncio.create_task(countdown(5)), # засыпаем на 1sec -- сразу отдаем управление блокирующей корутине
    asyncio.create_task(block_coro(5)),# т.е. спим 5s вместо 1s -- потом event_loop выходит из блока -- досчитывается остальное
]
print("created tasks")

await asyncio.gather(*tasks)

print("end gather")

t2 = time.time()
print("time", t2 - t1)

# Task
# Это специальный подкласс класса Future. 
#Он нужен для запуска корутины на коллбэчном цикле событий.

# create_tast (мб) эквивалентно loop.call_soon(callback, *args), то есть
# если не оборачивать в task, то корутпны прокрутятся в цикле событий сразу же
# без доабвления их в очередь цикла событий и постепенной выдачи результата по очереди
# то есть: первоначально добавление main в очередь -- coro полностью прокрутятся в глав корутине main -- завершение event loop, т..к очередь пуста

# с gather/as_completed потко выполнения main ждет полной прокрутки корутин в 
# в цикле событий

created tasks
start count_5
count_5: 0
count_5: 1
count_5: 2
count_5: 3
count_5: 4
end gather
time 9.006069898605347


In [286]:
def foo():
    data = yield from "smth"
    print(data)

@start_coro # забирает первйы yield - "s"
def sub_foo(gen):
    try:
        for _ in range(0, 4):
            a = yield from gen
            print("output:", a)
    except StopIteration:
        print("Done")
        
g1 = foo()
g2 = sub_foo(g1)
print(next(g2))
# g2.send("qwe") has no attr send
next(g2)
# next(g2) # выводит буквы при next, в a ничего не пишется
# next(g2)
next(g1) # equal to next(g2)
        
# g1 = foo()
# g2 = sub_foo(g1)
# for i in g2:
#     print(i)

m


'h'

In [251]:

!pip install aiohttp

Defaulting to user installation because normal site-packages is not writeable
Collecting aiohttp
  Downloading aiohttp-3.8.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m897.9 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.8.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (263 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m264.0/264.0 KB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting multidict<7.0,>=4.5
  Downloading multidict-6.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (114 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.5/114.5 KB[0m [31m943.5 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting aiosignal>=1.1.

In [252]:
import aiohttp

In [283]:
URL = "https://docs.python.org/3/whatsnew/3.11.html"
URLS = [URL] * 50


async def fetch(url, session):
    async with session.get(url) as resp:
        data = await resp.read()
        assert resp.status == 200
        #print(resp.status, len(data))
    

async def batch_fetch(urls):
    async with aiohttp.ClientSession() as session:
        for url in urls:
            # next(fetch(url, session)) not an iterator
            print(await fetch(url, session)) # None
            
# по-прежнему последовательная обрабокта каждого урла, т.к. нет тасков на каждый(лучше - определенное количество) урлов
# вне jupyter весь последующий код оборачивается в кортину main(нет, в качестве корутины main выступает batch_url)
# jupyter просто позволяет сразу делать await корутины
t1 = time.time()

await batch_fetch(URLS) # run(batch ...)

t2 = time.time()
print("time", t2 - t1)

None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
time 5.815146207809448


In [304]:
URL = "https://docs.python.org/3/whatsnew/3.11.html"
URLS = [URL] * 50

# создаем таски и используем семафор для огранчиения нагрузки на сайт
# причем лучше заранее создать определенное количество тасков, чтобы не было большого расхода по памяти 
async def fetch(url, session, sem):
    async with sem:
        async with session.get(url) as resp:
            data = await resp.read()
            assert resp.status == 200
            #print(resp.status, len(data))
    

async def batch_fetch(urls, sem):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch(url, session, sem))
            for url in urls
        ]
        await asyncio.gather(*tasks)

            
t1 = time.time()

sem = asyncio.Semaphore(10)
await batch_fetch(URLS, sem)

t2 = time.time()
print("time", t2 - t1)
# врем явыполнения - время обработки самого "медленного" урла(+время ротации тасков и др операции цикла)

time 1.7093641757965088


In [318]:
from bs4 import BeautifulSoup

In [323]:
# coroutine join()
# Block until all items in the queue have been received and processed.
URL = "https://docs.python.org/3/whatsnew/3.11.html"
URLS = [URL] * 15
counter = 0


async def fetch(session, q):
    while True:
        url = await q.get() # also blocking: waiting for smth in the queue(can set timeout)
        global counter
        counter += 1

        try:
            async with session.get(url) as resp:
                data = await resp.text()
                print(data[:1000])
                soup = BeautifulSoup(data.content, "html.parser")
                print(soup)
                assert resp.status == 200
        finally:
            q.task_done() # без этой команды join будет бесконечно ждать 
                           # выполнения всех тасков из очереди
    

async def batch_fetch(urls, workers=5):
    q = asyncio.Queue()
    for url in urls:
        await q.put(url)

    async with aiohttp.ClientSession() as session:
        workers = [
            asyncio.create_task(fetch(session, q))
            for _ in range(workers)
        ]
        await q.join() # нужно явным образом отменить каждый воркер, иначе они завершаются вне зоны видисоти garbage коллектором (во время работы)
        
        for w in workers:
            w.cancel()


t1 = time.time()

await batch_fetch(URLS, workers=5)

t2 = time.time()
print("time", t2 - t1)
print("count", counter)


# queue.join():
# Метод queue.join() блокирует выполнение программы до тех пор, пока все элементы в очереди не будут получены и обработаны.
# Представляет собой сопрограмму, используется с оператором await и может быть обернута в задачу.
# Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь.
# Счетчик уменьшается всякий раз, когда сопрограмма-потребитель вызывает queue.task_done() для указания, что элемент был получен и вся работа над ним завершена.
# Когда количество незавершенных задач падает до нуля, то метод queue.join() разблокирует ход выполнения программы.

# queue.task_done():
# Метод queue.task_done() указывает, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди.
# Для каждого метода queue.get(), используемого для выборки задачи, последующий вызов queue.task_done() сообщает очереди, что обработка задачи завершена.
# Если queue.join() в настоящее время блокируется, то он возобновится, когда все элементы будут обработаны (это означает, что вызов queue.task_done() был получен для каждого элемента, который был queue.put() в очереди).
# Вызывает ошибку ValueError, если вызывается больше раз, чем было помещено в очередь.


<!DOCTYPE html>

<html>
  <head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" /><meta name="generator" content="Docutils 0.17.1: http://docutils.sourceforge.net/" />

    <title>What’s New In Python 3.11 &#8212; Python 3.11.0 documentation</title><meta name="viewport" content="width=device-width, initial-scale=1.0">
    
    <link rel="stylesheet" type="text/css" href="../_static/pygments.css" />
    <link rel="stylesheet" type="text/css" href="../_static/pydoctheme.css?2022.1" />
    
    <script data-url_root="../" id="documentation_options" src="../_static/documentation_options.js"></script>
    <script src="../_static/jquery.js"></script>
    <script src="../_static/underscore.js"></script>
    <script src="../_static/doctools.js"></script>
    
    <script src="../_static/sidebar.js"></script>
    
    <link rel="search" type="application/opensearchdescription+xml"
          title="Search within Python 3.11.0 documentation

CancelledError: 

In [324]:
import aiohttp
