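Coroutines are one way to implement concurrent programming. Multithreading and multiprocessing are the classic models for solving concurrency problems, but as the internet grew rapidly they ran into the C10K bottleneck: with ten thousand clients connected to a server at the same time, a lot of code fell over. Process context switches consumed large amounts of resources, and threads could not withstand that much pressure either.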

Python 3.7 and later provide a coroutine approach built on asyncio and async/await.

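**It is best to test this section in an IDE such as PyCharm. In Jupyter, calling asyncio.run() raises an error because the notebook already runs its own event loop; the cells below use a top-level `await main()` instead.**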

### 1. A simple crawler example

In [1]:
# Simulated crawler: the number after the underscore is the sleep time in seconds
import time
def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)
    print('OK {}'.format(url))
    
def main(urls):
    for url in urls:
        crawl_page(url)
        
        
%time main(['url_1', 'url_2', 'url_3', 'url_4'])

crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_4
OK url_4
Wall time: 10 s


**Making it concurrent**

In [4]:
import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))
    
async def main(urls):
    for url in urls:
        await crawl_page(url)
        
        
#asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
await main(['url_1', 'url_2', 'url_3', 'url_4'])

# Python 3.6 style below; running it in Colab is recommended
#loop = asyncio.get_event_loop()
#%time loop.run_until_complete(main(['url_1', 'url_2', 'url_3', 'url_4']))

crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_4
OK url_4


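The async keyword declares an asynchronous function, so crawl_page and main are both asynchronous functions here. Calling an asynchronous function does not run its body; it returns a coroutine object. Note that the version above still runs the pages one after another, because each `await crawl_page(url)` finishes before the loop moves on.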
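
To make the point about coroutine objects concrete, here is a minimal sketch. It assumes a plain script (not a notebook), so it can use asyncio.run; the helper name `demo` is made up for illustration.

```python
import asyncio
import inspect


async def crawl_page(url):
    # The body only runs once the coroutine is awaited or scheduled.
    await asyncio.sleep(0.01)
    return 'OK {}'.format(url)


def demo():
    # Hypothetical helper: calling crawl_page() only builds a coroutine object.
    coro = crawl_page('url_1')
    is_coro = inspect.iscoroutine(coro)
    # Handing the object to the event loop is what actually executes it.
    result = asyncio.run(coro)
    return is_coro, result


if __name__ == '__main__':
    print(demo())
```

In a notebook cell, `await crawl_page('url_1')` would play the role of the asyncio.run call.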

In [10]:
print(crawl_page(''))

<coroutine object async-def-wrapper.<locals>.crawl_page at 0x000001176F90DB48>



### Running coroutines

#### 1. Calling with await

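Awaiting a coroutine has the same effect as an ordinary Python call: the calling coroutine blocks at that point, control enters the awaited coroutine function, and execution continues only after it returns. That is also the literal meaning of await.<br/>
In the code: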
```python
await asyncio.sleep(sleep_time)  # pauses here for sleep_time seconds
await crawl_page(url)  # runs crawl_page() to completion before continuing
```
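
As a rough check of that claim, the sketch below times a loop of direct awaits. The `unit` parameter is an assumption added here to shrink the sleeps so the example runs quickly; it is not part of the original crawler.

```python
import asyncio
import time


async def crawl_page(url, unit=0.05):
    # `unit` is a hypothetical scaling factor: url_2 sleeps 2 * unit seconds.
    sleep_time = int(url.split('_')[-1]) * unit
    await asyncio.sleep(sleep_time)
    return url


async def main(urls):
    start = time.perf_counter()
    for url in urls:
        # Each await finishes before the next iteration starts.
        await crawl_page(url)
    return time.perf_counter() - start


if __name__ == '__main__':
    elapsed = asyncio.run(main(['url_1', 'url_2']))
    print('elapsed: {:.2f}s'.format(elapsed))
```

The elapsed time should come out close to the sum of the individual sleeps, which is why awaiting in a loop gives no speed-up by itself.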

#### 2. Creating tasks with asyncio.create_task()
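
A minimal sketch of the idea, assuming Python 3.7+ and a plain script: asyncio.create_task() schedules each coroutine on the event loop right away, so the sleeps overlap. The `unit` parameter is again a hypothetical scaling factor used only to keep the run short.

```python
import asyncio
import time


async def crawl_page(url, unit=0.05):
    # `unit` is an assumption for illustration, not part of the original code.
    await asyncio.sleep(int(url.split('_')[-1]) * unit)
    return 'OK {}'.format(url)


async def main(urls):
    start = time.perf_counter()
    # All tasks are scheduled before any of them is awaited.
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    results = [await task for task in tasks]
    return results, time.perf_counter() - start


if __name__ == '__main__':
    results, elapsed = asyncio.run(main(['url_1', 'url_2', 'url_3']))
    print(results, 'elapsed: {:.2f}s'.format(elapsed))
```

Here the total time should be close to the longest single sleep rather than the sum of all of them.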

#### 3. Triggering execution with asyncio.run

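asyncio.run is available from Python 3.7 onward and makes Python's coroutine interface very simple.
asyncio.run(main()) serves as the entry point of the main program.
Call asyncio.run() only once over the lifetime of the program.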
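
The sketch below illustrates both points under the assumption of a plain script: asyncio.run() starts a loop, runs the coroutine, and returns its result, while calling it again from code that is already running inside a loop raises RuntimeError. The function name `nested` is made up for this example.

```python
import asyncio


async def main():
    await asyncio.sleep(0.01)
    return 'done'


async def nested():
    # Hypothetical helper: tries to start a second loop from inside a running one.
    coro = main()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never ran; close it to avoid a warning
        return type(exc).__name__


if __name__ == '__main__':
    print(asyncio.run(main()))
    print(asyncio.run(nested()))
```

This is the same situation a notebook is in: its kernel already runs an event loop, so a cell uses `await main()` rather than asyncio.run(main()).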

**await is a synchronous call: the caller waits until the awaited coroutine finishes**

### Task

In [5]:
import asyncio

In [6]:
async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))
    
async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task
        
# Python 3.6 style: asyncio.create_task() does not exist before 3.7, so use asyncio.ensure_future()
async def main3_6(urls):
    tasks = [asyncio.ensure_future(crawl_page(url)) for url in urls]
    for task in tasks:
        await task

In [7]:
#%time asyncio.run(main(['url_1','url_2','url_3','url_4']))
await main(['url_1','url_2','url_3','url_4'])

crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4


In [14]:
async def count_n(number):
    print("computer begin")
    n = 0
    for i in range(10000):
        n += i
    print('computer end, loop {} result is {}'.format(number,n))
    
async def computer_main():
    tasks = [asyncio.create_task(count_n(i)) for i in range(3)]
    for task in tasks:
        await task

In [15]:
await computer_main()

computer begin
computer end, loop 0 result is 49995000
computer begin
computer end, loop 1 result is 49995000
computer begin
computer end, loop 2 result is 49995000


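This result shows that a task with no await point runs to completion before the next one starts, so coroutines bring no benefit here. Coroutines pay off when tasks sleep or block while waiting, that is, for I/O-bound work, where they are also simpler to use than multithreading.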
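
To see why the three loops above ran strictly one after another, the following sketch records the order of events with and without an await point inside the loop. The `events` list and the `cooperative` flag are assumptions added for illustration; `await asyncio.sleep(0)` is used only as a way to hand control back to the event loop.

```python
import asyncio


async def count_n(number, events, cooperative):
    events.append('begin {}'.format(number))
    n = 0
    for i in range(3):
        n += i
        if cooperative:
            # Yield to the event loop so other tasks get a turn.
            await asyncio.sleep(0)
    events.append('end {}'.format(number))


async def run_all(cooperative):
    events = []
    tasks = [asyncio.create_task(count_n(i, events, cooperative))
             for i in range(2)]
    await asyncio.gather(*tasks)
    return events


if __name__ == '__main__':
    print(asyncio.run(run_all(cooperative=False)))
    print(asyncio.run(run_all(cooperative=True)))
```

Yielding only changes the interleaving. The computation still runs on a single thread, so it does not make CPU-bound work finish any sooner.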

Another way to wait for the tasks:

In [16]:
import asyncio
 
async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))
 
async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    await asyncio.gather(*tasks)
    
await main(['url_1', 'url_2', 'url_3', 'url_4'])

crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4


In [11]:
tasks = [1,2,3]
def main(arg1, arg2, arg3):
    print(arg1)
    print(arg2)
    print(arg3)

print(* tasks)    
main(* tasks)

1 2 3
1
2
3


In [13]:
dic = {"name":"txx","age":"18"}
def main(name, age):
    print(name)
    print(age)
print(* dic)
main(** dic)

name age
txx
18


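*tasks unpacks the list and passes its items as positional arguments; correspondingly, \*\*dict unpacks a dictionary and passes its entries as keyword arguments.<br/>
Functions such as asyncio.create_task and asyncio.run are available in Python 3.7 and later.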

In [23]:
import asyncio
 
async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')
 
async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')
 
async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')
    

# These are all synchronous calls
await main()

before await
worker_1 start
worker_1 done
awaited worker_1
worker_2 start
worker_2 done
awaited worker_2


In [24]:
import asyncio
 
async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')
 
async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')
 
async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')
 
await main()

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2


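* asyncio.run(main()) (or `await main()` in a notebook): the program enters main() and the event loop starts.
* task1 and task2 are created and queued in the event loop to wait for their turn; execution reaches print and outputs 'before await'.
* await task1 runs: the main task chooses to switch out, and the event scheduler starts running worker_1.
* worker_1 starts, prints 'worker_1 start', then reaches await asyncio.sleep(1) and switches out of the current task; the event scheduler starts running worker_2.
* worker_2 starts, prints 'worker_2 start', then reaches await asyncio.sleep(2) and switches out of the current task.
* Everything up to this point should take between 1 ms and 10 ms, possibly even less; from here the event scheduler has nothing to run and pauses.
* After one second, worker_1's sleep completes; the event scheduler hands control back to task1, which prints 'worker_1 done', finishes, and leaves the event loop.
* await task1 completes; the event scheduler hands control to the main task, which prints 'awaited worker_1' and then waits at await task2.
* After two seconds, worker_2's sleep completes; the event scheduler hands control back to task2, which prints 'worker_2 done', finishes, and leaves the event loop.
* The main task prints 'awaited worker_2'; all coroutine tasks are done and the event loop ends.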
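
The sequence of steps above can be checked with a small sketch that records each event in a list instead of printing it. The shared `log` list, the single `worker` function, and the shortened delays are assumptions made to keep the example quick and testable.

```python
import asyncio


async def worker(name, delay, log):
    log.append('{} start'.format(name))
    await asyncio.sleep(delay)  # switches out here, letting other tasks run
    log.append('{} done'.format(name))


async def main():
    log = []
    task1 = asyncio.create_task(worker('worker_1', 0.05, log))
    task2 = asyncio.create_task(worker('worker_2', 0.10, log))
    log.append('before await')  # neither worker has started yet
    await task1
    log.append('awaited worker_1')
    await task2
    log.append('awaited worker_2')
    return log


if __name__ == '__main__':
    for line in asyncio.run(main()):
        print(line)
```

The recorded order matches the notebook output: both workers start only after the main task reaches its first await.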

#### Some coroutine tasks have a time limit and are cancelled once they exceed it
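
The cell further down enforces the limit by hand with a sleep followed by cancel(). As an alternative sketch, not the approach used in this notebook, the standard library's asyncio.wait_for() cancels the awaited coroutine and raises asyncio.TimeoutError when the timeout expires. The names `slow_worker` and `run_with_timeout` are made up for illustration.

```python
import asyncio


async def slow_worker():
    await asyncio.sleep(0.2)
    return 'finished'


async def run_with_timeout(timeout):
    try:
        # wait_for cancels slow_worker() if it exceeds `timeout` seconds.
        return await asyncio.wait_for(slow_worker(), timeout=timeout)
    except asyncio.TimeoutError:
        return 'timed out'


if __name__ == '__main__':
    print(asyncio.run(run_with_timeout(0.05)))
    print(asyncio.run(run_with_timeout(1)))
```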
#### What to do when a coroutine raises an error while running

In [26]:
import asyncio
 
async def worker_1():
    await asyncio.sleep(1)
    return 1
 
async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0
 
async def worker_3():
    await asyncio.sleep(3)
    return 3
 
async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())
 
    await asyncio.sleep(2)
    task_3.cancel()
 
    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)
 
await main()

[1, ZeroDivisionError('division by zero'), CancelledError()]


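As shown, worker_1 completes normally, worker_2 raises an error while running, and worker_3 is cancelled because it takes too long.<br/>
**Note:** the return_exceptions=True argument matters. Without it, the first exception is raised into the code that awaits gather() and has to be caught with try/except; if it is not caught, main() aborts and the results of the tasks that have not finished yet are lost. Setting return_exceptions=True returns the exceptions as ordinary entries in the result list instead.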
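
Once the exceptions come back as ordinary list entries, the caller still has to tell them apart from real results. The sketch below shows one possible way to do that. The worker names are made up, and the check uses BaseException because CancelledError no longer derives from Exception in Python 3.8+.

```python
import asyncio


async def worker_ok():
    await asyncio.sleep(0.01)
    return 1


async def worker_fail():
    await asyncio.sleep(0.01)
    return 2 / 0


async def worker_slow():
    await asyncio.sleep(1)
    return 3


async def main():
    slow = asyncio.create_task(worker_slow())
    await asyncio.sleep(0)  # give the task a chance to start
    slow.cancel()
    results = await asyncio.gather(
        worker_ok(), worker_fail(), slow, return_exceptions=True)
    # Split real return values from exception objects.
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [type(r).__name__ for r in results
                if isinstance(r, BaseException)]
    return successes, failures


if __name__ == '__main__':
    print(asyncio.run(main()))
```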

### Implementing the producer-consumer model with coroutines

In [27]:
import asyncio
import random
 
async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)
 
async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)
 
async def main():
    queue = asyncio.Queue()
 
    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))
 
    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))
 
    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()
    
    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)
 
await main()

producer_1 put a val: 7
producer_2 put a val: 1
consumer_1 get a val: 7
consumer_2 get a val: 1
producer_1 put a val: 7
producer_2 put a val: 6
consumer_2 get a val: 7
consumer_1 get a val: 6
producer_1 put a val: 6
producer_2 put a val: 9
consumer_1 get a val: 6
consumer_2 get a val: 9
producer_1 put a val: 4
producer_2 put a val: 5
consumer_2 get a val: 4
consumer_1 get a val: 5
producer_1 put a val: 3
producer_2 put a val: 4
consumer_1 get a val: 3
consumer_2 get a val: 4
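
The example above stops the consumers after a fixed ten-second sleep. As an alternative sketch, not the notebook's approach, asyncio.Queue also provides task_done() and join(), which let main() wait until every queued item has actually been processed. The `received` list and the fixed input values are assumptions added so the result can be checked.

```python
import asyncio


async def consumer(queue, received):
    while True:
        val = await queue.get()
        received.append(val)
        queue.task_done()  # tell the queue this item is fully handled


async def producer(queue, values):
    for val in values:
        await queue.put(val)
        await asyncio.sleep(0.01)


async def main():
    queue = asyncio.Queue()
    received = []
    consumers = [asyncio.create_task(consumer(queue, received))
                 for _ in range(2)]
    # Wait for both producers to finish putting items.
    await asyncio.gather(producer(queue, [1, 2, 3]),
                         producer(queue, [4, 5, 6]))
    await queue.join()  # returns once every item has been marked done
    for task in consumers:
        task.cancel()  # consumers loop forever, so stop them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)
    return received


if __name__ == '__main__':
    print(asyncio.run(main()))
```

The order in which the two consumers pick up items is not fixed, so only the set of received values is predictable.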


In [8]:
!pip install aiohttp

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


In [10]:
!pip install bs4

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


In [12]:
import asyncio
import aiohttp
 
from bs4 import BeautifulSoup

header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36'}
 
async def fetch_content(url):
    async with aiohttp.ClientSession(
        headers=header, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()
 
async def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = await fetch_content(url)
    init_soup = BeautifulSoup(init_page, 'lxml')
 
    movie_names, urls_to_fetch, movie_dates = [], [], []
 
    all_movies = init_soup.find('div', id="showing-soon")
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')
 
        movie_names.append(all_a_tag[1].text)
        urls_to_fetch.append(all_a_tag[1]['href'])
        movie_dates.append(all_li_tag[0].text)
 
    tasks = [fetch_content(url) for url in urls_to_fetch]
    pages = await asyncio.gather(*tasks)
 
    for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
        soup_item = BeautifulSoup(page, 'lxml')
        img_tag = soup_item.find('img')
 
        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))
 
await main()

金禅降魔 05月08日 https://img9.doubanio.com/view/photo/s_ratio_poster/public/p2564190636.jpg
82号古宅 05月15日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2586838530.jpg
亲亲哒 05月28日 https://img9.doubanio.com/view/photo/s_ratio_poster/public/p2579189776.jpg
奇妙王国之魔法奇缘 06月01日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2577837112.jpg
六月的秘密 06月21日 https://img1.doubanio.com/view/photo/s_ratio_poster/public/p2522497098.jpg
秘密访客 06月25日 https://img1.doubanio.com/view/photo/s_ratio_poster/public/p2579398648.jpg
无名狂 06月25日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2574800433.jpg
我想静静 08月07日 https://img1.doubanio.com/view/photo/s_ratio_poster/public/p2595969179.jpg
