- title: Downloading images with multiprocessing and multithreading in Python
- author: Burgan
- date: 2021-02-19
- category: Projects
- tags: python, crawler
- status: draft
- summary: A simple implementation of image crawling with multiple processes plus multiple threads


There is plenty of material on multiprocessing and multithreading. Here are a few recommended references:

[Multiprocessing : use tqdm to display a progress bar](https://stackoverflow.com/questions/41920124/multiprocessing-use-tqdm-to-display-a-progress-bar)

[Python ThreadPoolExecutor Tutorial](https://tutorialedge.net/python/concurrency/python-threadpoolexecutor-tutorial/)

[A thorough explanation of multithreading and multiprocessing in Python (in Chinese)](https://blog.csdn.net/lzy98/article/details/88819425)

In [114]:
# all imports
import os
import time
from random import random, randint
from pathlib import Path
import json
import requests
from threading import Thread
from multiprocessing import Process, Pool, Manager
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
import datetime
from concurrent.futures import ThreadPoolExecutor

## Step_1: Compute the page URLs by rule and store them as records {url, path, exist, vol}, with an existence checker

**Initialize the data table and save it to './urls_data.json'**

In [2]:
def get_all_vol_url():
    vol_dict = dict()
    res = requests.get('https://www.manhuadb.com/manhua/222/211_2669.html')
    soup = BeautifulSoup(res.content, 'lxml')
    links = soup.find_all('li', 'sort_div')
    def append_by_li(li):
        vol_ = 'vol_' + li['data-sort']
        vol_dict[vol_] = 'https://www.manhuadb.com'+li.a['href']
    [append_by_li(li) for li in links]
    return vol_dict

In [3]:
def get_image_count_of_vol(vol_url):
    res = requests.get(vol_url)
    soup = BeautifulSoup(res.content, 'lxml')
    div = soup.find('div', {'class': {'d-none vg-r-data'}, })
    return int(div['data-total'])

In [31]:
def get_vol_df(vol, url):
    """Expand one volume into its page URLs and store them in a DataFrame."""
    counts = get_image_count_of_vol(url)
    img_urls = [url.replace('.html', '_p%d.html' % i) for i in range(1, counts+1)]
    def set_path_by_no(no):
        folder = f'/home/qicai21/Pictures/blames/{vol}/'
        name = '{0:03}.jpg'.format(no)
        return folder + name
    img_paths = [set_path_by_no(i) for i in range(1, counts+1)]
    exists = np.zeros(counts, dtype=int)
    vols = np.repeat(vol, counts)
    df = pd.DataFrame({
        'url': img_urls,
        'path': img_paths,
        'exist': exists,
        'vol': vols

    })
    return df

In [32]:
def set_all_img_data():
    vols = get_all_vol_url()
    dfs = [get_vol_df(vol, url) for vol, url in vols.items()]
    return pd.concat(dfs)

In [52]:
df = set_all_img_data()
df = df.reset_index(drop=True)
df.to_json('./urls_data.json')
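
The URL and path rule used by `get_vol_df` can be tried without any network access. The following is a minimal sketch, not the code used above: the helper name `build_page_rows`, the page count of 3, and the root folder `/tmp/blames` are all assumptions made for illustration. It returns plain dicts, which `pd.DataFrame(rows)` would turn into a table with the same columns.

```python
def build_page_rows(vol, vol_url, page_count, root="/tmp/blames"):
    """Return one record per page: page URL, target file path, exist flag, volume."""
    rows = []
    for i in range(1, page_count + 1):
        rows.append({
            # '..._2669.html' becomes '..._2669_p1.html', '..._2669_p2.html', ...
            "url": vol_url.replace(".html", "_p%d.html" % i),
            # File names are zero-padded to three digits: 001.jpg, 002.jpg, ...
            "path": "%s/%s/%03d.jpg" % (root, vol, i),
            "exist": 0,
            "vol": vol,
        })
    return rows


# The page count (3) is made up; the real one comes from get_image_count_of_vol.
rows = build_page_rows("vol_1", "https://www.manhuadb.com/manhua/222/211_2669.html", 3)
for row in rows:
    print(row["url"], "->", row["path"])
```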

**Update the `exist` column of the data table**

In [40]:
def set_exist(data_row):
    p = Path(data_row['path'])
    if not p.parents[0].exists():
        p.parents[0].mkdir(parents=True, exist_ok=True)
    if p.exists():
        data_row['exist'] = 1
    return data_row
        
def update_img_df_with_file_exist(df):
    return df.apply(set_exist, axis=1)

In [49]:
read_df = pd.read_json('./urls_data.json')
df = update_img_df_with_file_exist(read_df)
df.to_json('./urls_data.json')
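
The existence check can be exercised in a throwaway directory before pointing it at the real download folder. This is a sketch under assumptions: `mark_existing` is a hypothetical stand-in for `set_exist` that works on plain dicts, and the file written here is fake data rather than a real image.

```python
import tempfile
from pathlib import Path


def mark_existing(rows):
    """Create missing parent folders and set exist=1 for files already on disk."""
    for row in rows:
        p = Path(row["path"])
        p.parent.mkdir(parents=True, exist_ok=True)
        row["exist"] = 1 if p.exists() else 0
    return rows


with tempfile.TemporaryDirectory() as tmp:
    rows = [{"path": f"{tmp}/vol_1/{i:03}.jpg", "exist": 0} for i in (1, 2, 3)]
    Path(tmp, "vol_1").mkdir()
    # Pretend that page 1 has already been downloaded.
    Path(rows[0]["path"]).write_bytes(b"fake image bytes")
    flags = [row["exist"] for row in mark_existing(rows)]

print(flags)  # [1, 0, 0]
```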

## Step_2: A simple thread example

In [118]:
def delay_check_file(path):
    print('start check file %s' % path)
    time.sleep(random())
    if Path(path).exists():
        print('exist')
    else:
        print('not exist')

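def make_thread(row):
    # Note: join() right after start() blocks until this thread finishes,
    # so the checks below run one at a time rather than concurrently.
    t = Thread(target=delay_check_file, args=(row['path'],))
    t.start()
    t.join()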
        
df = read_df
df[0:1000:50].apply(make_thread, axis=1)
print('tasks completed!')

start check file /home/qicai21/Pictures/blames/vol_1/001.jpg
exist
start check file /home/qicai21/Pictures/blames/vol_1/051.jpg
exist
start check file /home/qicai21/Pictures/blames/vol_1/101.jpg
exist
start check file /home/qicai21/Pictures/blames/vol_1/151.jpg
exist
start check file /home/qicai21/Pictures/blames/vol_1/201.jpg
exist
start check file /home/qicai21/Pictures/blames/vol_1/251.jpg
exist
start check file /home/qicai21/Pictures/blames/vol_2/050.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_2/100.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_2/150.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_2/200.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_3/027.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_3/077.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_3/127.jpg
not exist
start check file /home/qicai21/Pictures/blames/vol_3/177.jpg
not exist
start check file /home/qicai21
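
Because `make_thread` calls `join()` immediately after `start()`, the checks above finish strictly one after another. To let the threads overlap, start all of them first and join afterwards. A minimal sketch, assuming a hypothetical `slow_check` function that sleeps for 0.2 s in place of the real file check:

```python
import time
from threading import Thread


def slow_check(name, results):
    """Stand-in for a slow file or network check."""
    time.sleep(0.2)
    results.append(name)  # list.append is safe to call from several threads in CPython


results = []
threads = [Thread(target=slow_check, args=(i, results)) for i in range(5)]

started = time.perf_counter()
for t in threads:
    t.start()  # launch every thread before waiting on any of them
for t in threads:
    t.join()   # now wait for all of them to finish
elapsed = time.perf_counter() - started

print("checked %d items in %.2fs" % (len(results), elapsed))
```

Run one at a time, five 0.2 s checks would need about one second; started together they should finish in roughly the time of a single check.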

## Step_3: Wrap the download function as a thread task

In [18]:
def download_image(url, path_to_save):
    res = requests.get(url)
    soup = BeautifulSoup(res.content, 'lxml')
    img = soup.find('img', {'class': {'img-fluid show-pic'}, })
    src = img['src']
    res = requests.get(src)
    with open(path_to_save, 'wb') as file:
        file.write(res.content)

def generate_download_thread(url, path):
    t = Thread(target=download_image, args=(url, path))
    t.start()
    return t

url_list = read_df[read_df['vol']=='vol_3']['url'][:10].tolist()
path_list = read_df[read_df['vol']=='vol_3']['path'][:10].tolist()
tasks = [generate_download_thread(url, path) for url, path in zip(url_list, path_list)]
[t.join() for t in tasks]
print('download complete!')

download complete!


## Step_4: A simple process example

In [12]:
def delay_check_file(path):
    print('start check file %s' % path)
    time.sleep(random())
    if Path(path).exists():
        print(f'{os.getpid()}: exist')
    else:
        print(f'{os.getpid()}: not exist')
        
        
read_df = pd.read_json('./urls_data.json')
p_list_2 = read_df[read_df['vol'] == 'vol_2'][::50]['path'].tolist()
p_list_3 = read_df[read_df['vol'] == 'vol_3'][::50]['path'].tolist()

def loop_check(path_list):
    [delay_check_file(p) for p in path_list]
    
p1 = Process(target=loop_check, args=(p_list_2, ))
p2 = Process(target=loop_check, args=(p_list_3, ))

p1.start()
p2.start()
p1.join()
p2.join()
print('task completed!')

start check file /home/qicai21/Pictures/blames/vol_2/001.jpg
start check file /home/qicai21/Pictures/blames/vol_3/001.jpg
7168: exist
start check file /home/qicai21/Pictures/blames/vol_2/051.jpg
7168: exist
start check file /home/qicai21/Pictures/blames/vol_2/101.jpg
7171: not exist
start check file /home/qicai21/Pictures/blames/vol_3/051.jpg
7168: exist
start check file /home/qicai21/Pictures/blames/vol_2/151.jpg
7171: not exist
start check file /home/qicai21/Pictures/blames/vol_3/101.jpg
7171: not exist
start check file /home/qicai21/Pictures/blames/vol_3/151.jpg
7168: exist
start check file /home/qicai21/Pictures/blames/vol_2/201.jpg
7171: not exist
start check file /home/qicai21/Pictures/blames/vol_3/201.jpg
7168: exist
7171: not exist
task completed!


## Step_5: Wrap the download function for multiple processes

In [39]:
vol3_df = read_df[read_df['vol']=='vol_3'].loc[:, ['url', 'path']]
imgs = vol3_df.values.tolist()

def download_threading(imgs):
    threads = [Thread(target=download_image, args=(url, path)) for url, path in imgs]
    [t.start() for t in threads]
    [t.join() for t in threads]
    print('pid[%s]: all tread completed' % os.getpid())
     
p1 = Process(target=download_threading, args=(imgs[:171],))
p2 = Process(target=download_threading, args=(imgs[171:],))
p1.start()
p2.start()
    
p1.join()
p2.join()
print('all download completed!')

Exception in thread Thread-141:
Exception in thread Traceback (most recent call last):
Thread-157  File "/home/qicai21/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 699, in urlopen
:
Traceback (most recent call last):
  File "/home/qicai21/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
      File "/home/qicai21/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 445, in _make_request
httplib_response = self._make_request(
      File "/home/qicai21/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
      File "<string>", line 3, in raise_from
six.raise_from(e, None)  File "/home/qicai21/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 440, in _make_request

      File "<string>", line 3, in raise_from
httplib_response = conn.getresponse()  File "/home/qicai21/anaconda3/lib/py

pid[9339]: all tread completed
pid[9340]: all tread completed
all download completed!


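> During this run 2 threads raised exceptions, but the program did not abort, because an uncaught exception ends only the thread it occurs in. On inspection, the error came from the image server not responding, so requests stopped waiting.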

>"
raise ProxyError(e, request=request)
requests.exceptions.ProxyError: HTTPSConnectionPool(host='i2.manhuadb.com', port=443): Max retries exceeded with url: /ccbaike/211/2670/143_dwfgfkrz.jpg (Caused by ProxyError('Cannot connect to proxy.', RemoteDisconnected('Remote end closed connection without response')))"
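
A dropped connection like the one quoted above is often transient, so one option is to retry a failed download a few times before giving up. The following is only a sketch: `with_retries` and `flaky_download` are hypothetical names, and the simulated failure stands in for a real `requests` call.

```python
import time


def with_retries(func, attempts=3, base_delay=0.01):
    """Call func(); on an exception, wait and try again, up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            # Double the wait after each failure: 0.01 s, 0.02 s, ...
            time.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


def flaky_download():
    """Stand-in for download_image: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated dropped connection")
    return b"image bytes"


data = with_retries(flaky_download)
print(calls["count"], data)
```

For real downloads, `base_delay` would more likely be on the order of seconds than hundredths of a second.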

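>Of course, it is unrealistic to think that two or three hundred concurrent requests put any real pressure on the server, but intuitively this feels a bit heavy-handed.
>It is important to know that on 2019-05-28 the Cyberspace Administration of China released the "Measures for Data Security Management (Draft for Comment)", which proposes to restrict the use of crawlers through administrative regulation:

>"
Article 16: Network operators that use automated means to access and collect website data must not impede the normal operation of the website. Where such behavior seriously affects the operation of the website, for example where the traffic from automated access and collection exceeds one third of the website's average daily traffic, it shall stop when the website requests that automated access and collection stop.
"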

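One simple way to stay gentle on the server is to cap how many requests are in flight at once. The sketch below uses a small thread pool as that cap and records the peak number of overlapping calls. `polite_fetch`, the `page_N` identifiers, and the 0.02 s delay are assumptions made for illustration, and no real request is sent.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stats = {"in_flight": 0, "peak": 0}
lock = threading.Lock()


def polite_fetch(url, delay=0.02):
    """Stand-in for a download that records how many calls overlap."""
    with lock:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
    time.sleep(delay)  # simulated request time plus a short pause
    with lock:
        stats["in_flight"] -= 1
    return url


urls = ["page_%d" % i for i in range(20)]  # hypothetical page identifiers

# max_workers is the upper bound on simultaneous requests.
with ThreadPoolExecutor(max_workers=4) as executor:
    fetched = list(executor.map(polite_fetch, urls))

print("peak concurrent requests:", stats["peak"])
```

With one thread per image, as in Step_5, the peak would instead equal the number of images.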
## Step_6: Wrap the thread pool into 2 processes

In [144]:
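# Python 3.2 added native thread pool support (concurrent.futures).
# The default max_workers is documented and is not 5: it is os.cpu_count() * 5 on
# Python 3.5 to 3.7 and min(32, os.cpu_count() + 4) from Python 3.8.
# Setting it explicitly is still the safer choice.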


def task(n):
    if n%3==0:
        print("loop start")
    print("Processing {}".format(n))
       
def pool_run():
    print("Starting ThreadPoolExecutor")
    with ThreadPoolExecutor(max_workers=3) as executor:
        [executor.submit(task, (i)) for i in range(6)]
    print("All tasks complete")
    
pool_run()

Starting ThreadPoolExecutor
loop start
Processing 0
Processing 1
Processing 2
loop start
Processing 3
Processing 4
Processing 5
All tasks complete
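
`submit` is convenient for fire-and-forget tasks like the one above. When the return values matter, `executor.map` collects them in input order, even if later tasks finish first. A small sketch with a hypothetical `square_slowly` function:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def square_slowly(n):
    """Sleep longer for smaller inputs, so later inputs finish sooner."""
    time.sleep(0.05 * (5 - n))
    return n * n


with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in the order of the inputs, not in completion order.
    squares = list(executor.map(square_slowly, range(5)))

print(squares)  # [0, 1, 4, 9, 16]
```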


In [147]:
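def download_img_in_30s(url, path_to_save):
    """Download one image, giving up silently on any error.

    Each request uses timeout=30, so a server that stops responding
    cannot block the thread forever.
    """
    try:
        # Fetch the page and locate the image URL.
        res = requests.get(url, timeout=30)
        soup = BeautifulSoup(res.content, 'lxml')
        img = soup.find('img', {'class': {'img-fluid show-pic'}, })
        src = img['src']
        # Fetch the image itself and write it to disk.
        res = requests.get(src, timeout=30)
        with open(path_to_save, 'wb') as file:
            file.write(res.content)
    except Exception:
        return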
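
Since `download_img_in_30s` returns silently on failure, a failed page only shows up when the disk is checked again. An alternative is to let the task raise and read the outcome from its `Future`. The sketch below assumes a hypothetical `fake_download` in place of the real function and made-up job names:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_download(url, path):
    """Stand-in for a download that raises instead of returning silently."""
    if "bad" in url:
        raise ConnectionError("simulated failure for %s" % url)
    return path


jobs = [("good_1", "001.jpg"), ("bad_2", "002.jpg"), ("good_3", "003.jpg")]
failed = []

with ThreadPoolExecutor(max_workers=2) as executor:
    # Remember which URL each future belongs to.
    futures = {executor.submit(fake_download, url, path): url for url, path in jobs}
    for future in as_completed(futures):
        # exception() returns None when the task succeeded.
        if future.exception() is not None:
            failed.append(futures[future])

print("failed:", failed)
```

The `failed` list could then be fed into another download round.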

In [162]:
def run_download_threads(imgs):
    print("pro_%s: start" % os.getpid())
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Each item is a [url, path] pair; * unpacks it into two positional arguments.
        [executor.submit(download_img_in_30s, *i) for i in imgs]
    print("pro_%s: completed" % os.getpid())

In [66]:
read_df = pd.read_json('./urls_data.json')

In [167]:
df = update_img_df_with_file_exist(read_df)
imgs = df[df['exist']==0][:400].loc[:, ['url', 'path']].values.tolist()
print("#"*20)
print("loop start: %s" % datetime.datetime.now().strftime('%H:%M:%S'))
p1 = Process(target=run_download_threads, args=(imgs[:200], ))
p2 = Process(target=run_download_threads, args=(imgs[200:], ))
p1.start()
p2.start()
p1.join()
p2.join()
print("loop finish: %s" % datetime.datetime.now().strftime('%H:%M:%S'))

####################
loop start: 04:19:00
pro_23290: start
pro_23293: start
pro_23293: completed
pro_23290: completed
loop finish: 04:21:30
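
The run above splits the work by hand with `imgs[:200]` and `imgs[200:]`. For a different number of processes, a small helper can do the slicing. `split_evenly` is a hypothetical name, and this is only one of several reasonable ways to divide a list:

```python
def split_evenly(items, parts):
    """Split items into `parts` consecutive chunks whose sizes differ by at most one."""
    size, extra = divmod(len(items), parts)
    chunks, start = [], 0
    for i in range(parts):
        # The first `extra` chunks take one additional item each.
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


chunks = split_evenly(list(range(10)), 3)
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Each chunk can then be passed to its own `Process` as `args=(chunk,)`.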


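> An odd problem came up here: when `executor.submit` receives more than one argument packed into a single list, the task does not run, but unpacking the list with `*` works. The reason is that `submit(fn, *args, **kwargs)` passes its extra arguments straight to `fn`, so `submit(download_img_in_30s, i)` calls the function with a single argument (the list). The resulting `TypeError` is stored in the returned `Future` and stays invisible unless `result()` or `exception()` is called.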
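
The behaviour described in the note can be reproduced without any network access. In this sketch `save` is a hypothetical two-argument function standing in for `download_img_in_30s`, and the URL and path values are placeholders:

```python
from concurrent.futures import ThreadPoolExecutor


def save(url, path):
    """Stand-in for a two-argument download function."""
    return "%s -> %s" % (url, path)


pair = ["page.html", "001.jpg"]  # hypothetical url/path pair

with ThreadPoolExecutor(max_workers=1) as executor:
    packed = executor.submit(save, pair)     # calls save(["page.html", "001.jpg"])
    unpacked = executor.submit(save, *pair)  # calls save("page.html", "001.jpg")

# The failure is kept inside the Future instead of being printed.
print(type(packed.exception()).__name__)  # TypeError
print(unpacked.result())                  # page.html -> 001.jpg
```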