# 异步爬虫

In [1]:
# 高性能异步爬虫
# 目的：在爬虫中使用异步实现高性能的数据爬取操作

# requestes.get 是一个阻塞的方法 

In [8]:
import requests

In [9]:
headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36'
}

In [10]:
urls = [
    'http://wenjian.jkb.com.cn/video/20140916xuetangguanli.mp4',
    'http://wenjian.jkb.com.cn/video/20140916yidaosuzhiliaodechangyongfangan.mp4',
    'http://wenjian.jkb.com.cn/video/20140910manxingbingheliyongyao.mp4'
]

In [11]:
def get_content(url):
    print("正在爬取： ", url)
    response = requests.get(url=url, headers=headers)
    if response.status_code == 200:
        return response.content

In [12]:
def parse_content(content):
    print('相应数据的长度为： ', len(content))

In [None]:
for url in urls:
    content = get_content(url)
    parse_content(content)

# 异步爬虫的方式

In [13]:
# 多线程 多进程
# 好处：可以为相关阻塞操作单独开启线程或进程 阻塞操作可以异步执行
# 弊端：无法无限制的开启多线程或者多进程


In [14]:
# 线程池 进程池（适当使用）
# 好处：我们可以降低系统对进程或线程创建和销毁的频率 从而很好的降低系统的开销
# 弊端：池中线程或进程的数量是有上限的

In [15]:
import time


In [19]:
def get_page(str):# 模拟
    print("正在下载：", str)
    time.sleep(2)
    print('下载成功：', str)

In [20]:
name_list = ['xiaozi', 'aa', 'bb', 'cc']

In [21]:
# 使用单线程串行方式执行
start_time = time.time()

for i in range(len(name_list)):
    get_page(name_list[i])
    
end_time = time.time()
print('%d second'% (end_time - start_time))

正在下载： xiaozi
下载成功： xiaozi
正在下载： aa
下载成功： aa
正在下载： bb
下载成功： bb
正在下载： cc
下载成功： cc
8 second


In [22]:
# 基于线程池的方式执行

In [23]:
# 导入线程池模块对应的类
from multiprocessing.dummy import Pool

In [24]:
start_time = time.time()
# 实例化一个线程池对象
pool = Pool(4)
# 将列表中每一个元素传递给get_page进行处理
pool.map(get_page, name_list)
end_time = time.time()
print('%d second'% (end_time - start_time))

正在下载： xiaozi
正在下载： aa
正在下载： bb
正在下载： cc
下载成功： xiaozi
下载成功： aa
下载成功： bb
下载成功： cc
2 second


In [25]:
# 线程池使用原则
# 线程池处理的是阻塞且较为耗时的操作


In [46]:
from lxml import etree
import random
import re

In [26]:
# 爬取梨视频的视频数据
# https://www.pearvideo.com/category_5

In [56]:
# 可以参考 https://blog.csdn.net/General_zy/article/details/114791619
# # 动态加载 通过XHR得到视频地址
def getVideoUrl(detail_url):
    # https://www.pearvideo.com/videoStatus.jsp
    url = 'https://www.pearvideo.com/videoStatus.jsp'
    # 需要UA Cookie Referer
    # 否则出现该文章已删除的反爬问题
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36',
        'Cookie':'__secdyid=16e132a5380e9acf9e8ef97a306c51163c1b18ec78ef1e55021617337941; JSESSIONID=0287BC9F031F6318EB35D723F1FD05AD; PEAR_UUID=577037eb-26a6-4a93-969b-f5b689bd2446; _uab_collina=161733794176512684106548; UM_distinctid=17890da802c853-0f1a66b5a48631-5771031-1fa400-17890da802de18; Hm_lvt_9707bc8d5f6bba210e7218b8496f076a=1617337942; p_h5_u=5AAFC631-D04D-4263-A8F8-1C4EA78E68A6; acw_tc=781bad2916173399152028460e5d9ec352fd8e7cce2d0e03ba85417275dcbe; CNZZDATA1260553744=706518655-1617334338-https%253A%252F%252Fwww.baidu.com%252F%7C1617339739; Hm_lpvt_9707bc8d5f6bba210e7218b8496f076a=1617339923; SERVERID=a6169b2e0636a71b774d6641c064eb8c|1617339990|1617337941',
        'Referer':detail_url
    }
    videoId = (detail_url.split('/')[-1]).split('_')[-1]
    param = {
        'contId': videoId,
        'mrd': str(random.random())
    }
    # json获取
    # https://video.pearvideo.com/mp4/short/20210402/1617342588941-15646275-hd.mp4
    obj_json = requests.get(url=url, headers=headers, params=param).json()
    #print(obj_json)
    # 这里获得的srcUrl还不是实际播放地址 还需要处理
    srcUrl = obj_json['videoInfo']['videos']['srcUrl']
    # 真实
    # https://video.pearvideo.com/mp4/short/20210402/cont-1724857-15646275-hd.mp4
    realUrl = srcUrl.replace((srcUrl.split('/')[-1]).split('-')[0], 'cont-'+videoId)
    return realUrl

In [55]:
print(getVideoUrl('https://www.pearvideo.com/video_1724857'))

https://video.pearvideo.com/mp4/short/20210402/cont-1724857-15646275-hd.mp4


In [58]:
# 对下述url发请求 解析出视频详情页的url和视频名称
url = 'https://www.pearvideo.com/category_5'
page_text = requests.get(url=url, headers=headers).text

tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')
urls = []# 存储所有视频的链接
for li in li_list:
    detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
    name = li.xpath('./div/a/div[2]/text()')[0] + '.mp4'
    #print(detail_url)
    #print(name)
    # 对详情页的url发起请求
    detail_page_text = requests.get(url=url, headers=headers).text
    # 从详情页中解析出视频的地址 （url）
    # https://video.pearvideo.com/mp4/short/20210402/cont-1724857-15646275-hd.mp4
    # 动态加载 通过XHR得到视频地址
    video_url = getVideoUrl(detail_url)
    dic = {
        'name':name,
        'url':video_url
    }
    urls.append(dic)
    

In [60]:
# 使用线程池对视频数据进行请求 （较为耗时的阻塞操作）
from multiprocessing.dummy import Pool

In [62]:
def get_video_data(dic):
    url = dic['url']
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36',
    }
    print(dic['name'], '正在下载')
    data = requests.get(url=url, headers=headers).content
    with open(dic['name'], 'wb') as fp:
        fp.write(data)
        print(dic['name'], '下载成功')

In [63]:
pool = Pool(4)
pool.map(get_video_data, urls)
pool.close()
pool.join()

马晔宁：画虎路上不停歇.mp4 正在下载
全季CEO沈怡均带头接种疫苗：领导就要带头打.mp4 正在下载
DIY迷你娃娃屋，田园中的小风车彩虹别墅.mp4 正在下载
阿富汗青金石的美丽传说.mp4 正在下载
马晔宁：画虎路上不停歇.mp4 下载成功
全季CEO沈怡均带头接种疫苗：领导就要带头打.mp4 下载成功
阿富汗青金石的美丽传说.mp4 下载成功
DIY迷你娃娃屋，田园中的小风车彩虹别墅.mp4 下载成功


In [1]:
# 单线程+异步协程（推荐）
# 协程
# event_loop: 事件循环 相当于一个无线循环 我们可以把一些函数注册到这个事件循环上 当满足某些条件的时候 函数就会被循环执行
# coroutine：协程对象 我们可以将协程对象注册到事件循环中 它会被事件循环调用
# 我们可以使用 async 关键字来定义一个方法 这个方法在调用时不会被立即执行 而是返回一个协程对象 
# task：任务 它是对协程对象的进一步封装 实际上和 task 没有本质区别
# future：代表将来执行或还没有执行的任务 实际上和task没有本质区别
# async：定义一个协程
# await：用来挂起阻塞方法的执行

In [1]:
import asyncio

In [2]:
# async 修饰的函数 调用之后返回的一个协程对象
async def request(url):
    print('正在请求的url是', url)
    print('请求成功，', url)

In [3]:
c = request('www.baidu.com')# 没有马上执行 而是生成了一个协程对象

In [5]:
# 创建一个事件循环对象
EventLoop = asyncio.get_event_loop()
# 将协程对象注册到loop中 然后启动loop
EventLoop.run_until_complete(c)
# jupyter notebook其连接着 IPython 内核，而 IPython 内核本身在事件循环上运行
# 而 asyncio 不允许嵌套其事件​​循环，因此会出现如上图的错误信息。

RuntimeError: This event loop is already running

In [None]:
# task的使用
loop = asyncio.get_event_loop()
# 基于loop创建了一个task对象
task = loop.create_task(c)
print(task)# task未执行

loop.run_until_complete(task)

print(task)# task已执行

In [None]:
# future的使用
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
print(task)

loop.run_until_complete(task)

print(task)

In [None]:
# 绑定回调
async def request(url):
    print('正在请求的url是', url)
    print('请求成功，', url)
    return url

# 回调函数 任务对象执行成功后执行回调函数
def callback_func(task):
    # result 返回的就是任务对象中封装的协程对象对应的函数的返回值
    print(task.result())

c = request('www.baidu.com')

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
# 将回调函数绑定到任务对象中
task.add_done_callback(callback_func)
loop.run_until_complete(task)


In [6]:
# 多任务异步协程实现

In [7]:
import asyncio
import time

In [None]:
async def request(url):
    print('正在下载',url)
    # 在异步协程中如果出现了同步模块相关的代码 那么就无法实现异步
    # time.sleep(2)
    # 当在asyncio中遇到阻塞操作 必须进行手动挂起
    await asyncio.sleep(2)
    print('下载完毕，', url)
    return url

In [None]:
start = time.time()
urls = [
    'www.baidu.com',
    'www.sougou.com',
    'www.doubanjia.com'
]

In [None]:
# 任务列表：存放多个任务对象
stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    stasks.append(task)
    
loop = asyncio.get_event_loop()
# 需要将任务列表封装到wait中
loop.run_until_complete(asyncio.wait(stasks))

print(time.time() - start)

In [None]:
# 多任务异步协程的应用

In [10]:
# 使用flask搭建一个web服务器 其中包括三个路由 每个的响应时间都是2秒
from flask import Flask
import time

app = Flask(__name__)

@app.route('/bobo')
def index_bobo():
    time.sleep(2)
    return 'Hello bobo'

@app.route('/jay')
def index_jay():
    time.sleep(2)
    return 'Hello jay'

@app.route('/tom')
def index_tom():
    time.sleep(2)
    return 'Hello tom'

In [12]:
import requests
import asyncio
import time
import aiohttp# 使用该模块中的ClientSession

In [None]:
start = time.time()

urls = [
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom'
]

def getPage(url):
    #print('正在下载', url)
    # requests.get 是基于同步的
    # 必须使用基于异步的网络请求模块进行指定url的请求发送
    # aiohttp 基于异步网络请求的模块
    #response = requests.get(url=url)
    #print('下载完毕', response.text)
    async with aiohttp.ClientSession() as session:
        # get、post
        # headers、params、data、proxy='http://ip:port'
        async with await session.get(url) as response:
            # text() 返回字符串形式的响应数据
            # read() 返回二进制形式的响应数据
            # json() 返回的就是json对象
            # 注意：获取响应数据操作之前一定要使用await进行手动挂起
            page_text = await response.text()
            print(page_text)

tasks = []

for url in urls:
    c = getPage(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
    
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('总耗时', end - start)