In [None]:
# default_exp IpPool

%reload_ext autoreload
%autoreload 2

# 代理池
实现效果
1. 自动抓取新ip
2. 自动删除无效ip（根据健康度）

任务：
* 显示优质ip

In [None]:
# export
import re,random,time
from concurrent.futures import ThreadPoolExecutor

import requests,redis 
from bs4 import BeautifulSoup


## 获取一个可用ip

In [None]:
# export
def connect_db() -> object:
    """Build a Redis client for the local server.

    Every call creates a new ``redis.ConnectionPool`` pointing at
    ``localhost:6379`` with ``decode_responses=True`` and wraps it in a
    ``redis.Redis`` client.

    Returns:
        The newly created ``redis.Redis`` client.
    """
    return redis.Redis(
        connection_pool=redis.ConnectionPool(
            host='localhost',
            port=6379,
            decode_responses=True,
        )
    )



In [None]:
# export
# Shared Redis client, created once when this cell/module is executed; the
# functions below read and write the proxy pools through it.
rdb = connect_db()
def _get_ip(protocal='http') -> str:
    '把health作为权重，随机抽取ip'
    prim_ips = rdb.zrange(protocal,-20,-1)    
    return random.choice(prim_ips)

In [None]:
def make_test_data():
    """Seed the ``http`` and ``https`` pools with one sample proxy at health 10.

    The module-level ``rdb`` client is reused instead of building a new
    connection pool on each call, so repeated calls do not accumulate Redis
    connections.
    """
    for pool in ('http', 'https'):
        rdb.zadd(pool, {'39.137.107.9:8080': 10})

In [None]:
# make_test_data()
_get_ip()   

'39.137.69.6:80'

In [None]:
prim_ips = rdb.zrange('http',-20,-1,withscores=True)
prim_ips

[('94.179.135.230:54393', 100.0),
 ('94.205.254.82:3128', 100.0),
 ('95.79.55.196:53281', 100.0),
 ('35.247.242.31:3128', 205.0),
 ('34.89.173.90:3128', 213.0),
 ('52.179.231.206:80', 221.0),
 ('39.137.69.7:8080', 305.0),
 ('39.137.69.6:8080', 320.0),
 ('39.137.69.6:80', 346.0),
 ('52.80.58.248:3128', 359.0),
 ('47.112.46.46:80', 450.0),
 ('58.176.150.177:80', 593.0),
 ('61.147.210.159:8118', 1436.0),
 ('39.137.69.10:8080', 1503.0),
 ('39.137.69.7:80', 1570.0),
 ('61.147.210.159:8080', 1579.0),
 ('39.137.69.9:80', 1685.0),
 ('39.137.69.10:80', 1707.0),
 ('39.137.69.8:80', 1818.0),
 ('221.180.170.104:8080', 2188.0)]

## 更新健康值

In [None]:
# export
def update_health(ip,is_health=False,protocal='http') -> float:
    """Adjust the health score of ``ip`` in the sorted set ``protocal``.

    A healthy report adds 1 to the score. An unhealthy report subtracts
    ``int(score / 2)``, which roughly halves the current score.

    Args:
        ip: Member of the sorted set, in ``host:port`` form.
        is_health: True when the proxy just worked, False when it failed.
        protocal: Name of the sorted set (``'http'`` or ``'https'``).

    Returns:
        The score after the update. ``0.0`` when an unhealthy report names a
        proxy that is not in the pool; nothing is written in that case.
    """
    if is_health:
        return rdb.zincrby(protocal, 1, ip)

    score = rdb.zscore(protocal, ip)
    if score is None:
        # zscore yields None for an unknown member; halving it would raise
        # TypeError, and a failing unknown proxy should not be added anyway.
        return 0.0
    return rdb.zincrby(protocal, -int(score / 2), ip)

In [None]:
update_health('39.137.107.9:8080',is_health=False,protocal='https')

3.0

## 爬取ip

In [None]:
# export
# Listing pages of free-proxy sites that the crawler fetches, one URL per entry.
proxy_website_urls = [
    'https://www.kuaidaili.com/free/inha/',
    'http://www.nimadaili.com/gaoni/',
    'http://www.xiladaili.com/gaoni/',
    'https://ip.jiangxianli.com/?anonymity=2',
    'https://www.7yip.cn/free/',
    'http://www.ip3366.net/free/',
    'https://list.proxylistplus.com/Fresh-HTTP-Proxy-List-1',
    'http://proxyslist.com/',
]

# Patterns are anchored so that only tags whose whole text is an address match.
_IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
_PORT_PATTERN = re.compile(r'^\d{2,5}$')
_IP_WITH_PORT_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}:\d{2,5}$')


def match_ip(tag):
    """Return a match when the tag's stripped text is four dot-separated groups of 1-3 digits."""
    return _IP_PATTERN.match(tag.text.strip())


def match_port(tag):
    """Return a match when the tag's stripped text is a run of 2-5 digits."""
    return _PORT_PATTERN.match(tag.text.strip())


def match_ip_with_port(tag):
    """Return a match when the tag's stripped text has the form ``ip:port``."""
    return _IP_WITH_PORT_PATTERN.match(tag.text.strip())

def find_port(ip_item_soup) -> str:
    """Find the port that belongs to the IP held by ``ip_item_soup``.

    Walks up through the ancestors of the tag. The first ancestor containing
    a port-like tag supplies the port. The walk is abandoned as soon as an
    ancestor contains more than one IP tag, because a port found from there
    could not be attributed to a single IP.

    Args:
        ip_item_soup: The tag whose text is the IP address.

    Returns:
        The stripped port text, or None when no port can be attributed to the
        IP (several IPs share the ancestor, or the top of the document is
        reached without finding a port).
    """
    node = ip_item_soup.parent
    # The document root has no parent; stop there instead of calling
    # find_all on None.
    while node is not None:
        if len(node.find_all(match_ip)) > 1:
            return None
        port_tag = node.find(match_port)
        if port_tag is not None:
            return port_tag.text.strip()
        node = node.parent
    return None

def find_ips(soup) -> iter:
    """Yield every ``ip:port`` string that can be parsed out of ``soup``.

    Two page layouts are handled. Tags whose text is already ``ip:port`` are
    preferred. Otherwise tags holding a bare IP are paired with a port
    located by ``find_port``, and IPs without a port are skipped. When neither
    kind of tag exists, a failure message is printed and nothing is yielded.
    """
    combined_tags = soup.find_all(match_ip_with_port)
    if combined_tags:
        # Layout "39.137.107.98:80": address and port share one tag.
        yield from (tag.text.strip() for tag in combined_tags)
        return

    bare_ip_tags = soup.find_all(match_ip)
    if not bare_ip_tags:
        print('解析失败：',soup)
        return

    # Layout "39.137.107.98 | 80": the port sits in a separate tag.
    for tag in bare_ip_tags:
        address = tag.text.strip()
        port = find_port(tag)
        if port:
            yield address+':'+port
        
        
# Open question from the author: crawl both http and https and switch
# automatically, or let the caller choose the protocol by hand?
def crawl_ip(url,protocal='http',timeout=10):
    """Scrape one proxy-listing page and add the proxies found to a pool.

    Each ``ip:port`` parsed from the page is inserted into the sorted set
    ``protocal`` with an initial health of 100. Proxies already in the set
    keep their current health. A summary line, or a failure line, is printed.

    Args:
        url: Address of the listing page.
        protocal: Name of the sorted set that receives the proxies.
        timeout: Seconds to wait for the listing page before giving up.
    """
    try:
        res = requests.get(url,headers={'user-agent':'Mozilla/5.0'},timeout=timeout)
    except requests.RequestException as err:
        # An unreachable listing site is reported like any other failed fetch
        # rather than raised out of the crawler.
        print(url,err,'requests请求失败')
        return
    if res.status_code != 200:
        print(url,res,'requests请求失败')
        return

    soup = BeautifulSoup(res.text,features='lxml')
    increase = 0
    for ip in find_ips(soup):
        # Write to the requested pool (not a fixed 'http'), so the count below
        # describes the set that was actually updated. nx=True only adds new
        # members and leaves existing scores alone.
        if rdb.zadd(protocal,{ip:100},nx=True):
            increase += 1
    stock = rdb.zcount(protocal,0,100000)
    print(f'{url} 新增：{increase}，库存更新为：{stock}个')

In [None]:
crawl_ip(proxy_website_urls[1])

## 校验IP

In [None]:
# export
def validate(ip,url='http://m.sm.cn/',timeout=5) -> float:
    """Probe one proxy by fetching ``url`` through it and record the outcome.

    The proxy counts as healthy only when the request completes with status
    200. Any request error or other status counts as unhealthy. The pool that
    is updated is named after the scheme of ``url``.

    Args:
        ip: Proxy address in ``host:port`` form.
        url: Page fetched through the proxy.
        timeout: Seconds to wait for the request.

    Returns:
        Whatever ``update_health`` returns for this proxy.
    """
    protocal = url.split(':')[0]
    proxies = {protocal: protocal+'://'+ip}
    try:
        res = requests.get(url,
                           headers={'user-agent':'Mozilla/5.0'},
                           proxies=proxies,
                           timeout=timeout)
        healthy = res.status_code == 200
    except Exception:
        # `Exception` rather than a bare except, so Ctrl-C and interpreter
        # shutdown are not mistaken for a dead proxy.
        healthy = False
    return update_health(ip,is_health=healthy,protocal=protocal)
        

In [None]:
validate('128.199.246.10:44344')

## 定期更新IP
> 每5分钟更新一次

In [None]:
# export
# Timestamp (time.time()) of the most recent crawl; 0 means "never crawled".
last_crawl = 0
def parallel_crawl_ips():
    """Crawl every URL in ``proxy_website_urls`` on up to 10 worker threads.

    A failure on one site is printed and does not stop the others. Without
    collecting the futures, an exception raised in a worker would be dropped
    silently.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(crawl_ip, url): url for url in proxy_website_urls}
    # Leaving the with-block waits for all workers, so every future is done here.
    for future, url in futures.items():
        error = future.exception()
        if error is not None:
            print(url, repr(error), 'crawl failed')

def repeat_crawl_ips(frequency=300):
    """Run a crawl when at least ``frequency`` seconds have passed since the last one.

    Elapsed time is measured from the previous crawl, so two crawls are never
    closer together than ``frequency`` seconds. The first call always crawls.
    """
    global last_crawl
    now = time.time()
    if now - last_crawl >= frequency:
        # Record the time before crawling so calls made during the crawl skip it.
        last_crawl = now
        parallel_crawl_ips()

In [None]:
repeat_crawl_ips()

## 定期删除IP
> 每日删除health在0到20之间的IP

In [None]:
# export
# Open question from the author: how should the separate http and https
# pools be maintained? As written, only the 'http' pool is purged periodically.

# Timestamp (time.time()) of the most recent purge; 0 means "never purged".
last_delete = 0
def delete_ips(protocal='http', max_health=20):
    """Remove every proxy whose health score lies between 0 and ``max_health``.

    Args:
        protocal: Name of the sorted set to purge.
        max_health: Highest score (inclusive) that is still removed.

    Returns:
        The value returned by ``rdb.zremrangebyscore`` (printed by
        ``repeat_delete_ips`` as the number of removed proxies).
    """
    return rdb.zremrangebyscore(protocal,0,max_health)

def repeat_delete_ips(frequency=24*3600):
    """Purge the pool when at least ``frequency`` seconds have passed since the last purge.

    Elapsed time is measured from the previous purge, so two purges are never
    closer together than ``frequency`` seconds. The first call always purges.
    """
    global last_delete
    now = time.time()

    if now - last_delete >= frequency:
        last_delete = now
        result = delete_ips()
        print('移除：',result,'个IP')
    

In [None]:
repeat_delete_ips()

## 自动维护IP池

In [None]:
# export
def get_ip(protocal='http') -> str:
    """Return a proxy from the ``protocal`` pool after running pool upkeep.

    The periodic crawl and the periodic purge are invoked first (each decides
    for itself whether it is due), then a proxy is drawn with ``_get_ip``.
    """
    for upkeep in (repeat_crawl_ips, repeat_delete_ips):
        upkeep()
    chosen = _get_ip(protocal)
    return chosen
    

In [None]:
get_ip()

## 代理请求
1. 失败换ip重新请求
2. 尝试10次仍失败则打印 `overtry:` 并放弃

In [None]:
# export
def proxy_get(url, max_retries=10, timeout=5) -> object:
    """GET ``url`` through proxies from the pool, switching proxy on failure.

    Each attempt draws a proxy with ``get_ip`` and reports the outcome to
    ``update_health``:

    * status 200 with a body longer than 100 characters: proxy rewarded,
      response returned;
    * status 404: the page itself is missing, so the URL is appended to
      ``rrpm.log``, the proxy is rewarded and the function gives up at once;
    * request error or any other response: proxy penalised, next attempt.

    Args:
        url: Address to fetch; its scheme selects the proxy pool.
        max_retries: Maximum number of attempts.
        timeout: Seconds to wait for each request.

    Returns:
        The successful response, or None on a 404 or when every attempt failed.
    """
    protocal = url.split(':')[0]
    for attempt in range(1, max_retries + 1):
        ip = get_ip(protocal=protocal)
        proxies = {protocal: protocal+'://'+ip}

        try:
            res = requests.get(url,
                               headers={'user-agent':'Mozilla/5.0'},
                               proxies=proxies,
                               timeout=timeout)
        except Exception:
            # `Exception` rather than a bare except, so Ctrl-C can still stop
            # the retry loop.
            update_health(ip,False,protocal)
            continue

        # Some 200 responses carry only a short bad-request body, hence the
        # length check.
        if res.status_code == 200 and len(res.text)>100:
            update_health(ip,True,protocal)
            print('success:',url,'try times:',attempt)
            return res

        # Compare the status directly: a requests Response is falsy for
        # 4xx/5xx, so a `res and ...` guard would never match a 404.
        if res.status_code == 404:
            with open('rrpm.log','a') as f:
                f.write(f'{url} 404\n')
            update_health(ip,True,protocal)
            return None

        # Anything else (e.g. rate limiting): penalise the proxy and retry.
        update_health(ip,False,protocal)

    print('overtry:',url)
    return None

In [None]:
res = proxy_get('http://www.woshipm.com/category/operate')

## 并发任务

In [None]:
# export
def parallel_task(fn,task_arg_list,max_workers=100) -> list:
    """Apply ``fn`` to every item of ``task_arg_list`` on a thread pool.

    Results are collected in input order. A progress percentage is printed as
    each result is collected, followed by a timing summary once all are done.

    Args:
        fn: Callable taking one item of ``task_arg_list``.
        task_arg_list: Sized collection of arguments (``len`` is taken of it).
        max_workers: Size of the thread pool.

    Returns:
        The list of results, in the same order as ``task_arg_list``.
    """
    total = len(task_arg_list)
    results = []
    began = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for done, outcome in enumerate(pool.map(fn,task_arg_list), start=1):
            results.append(outcome)
            print('progress:{:.2%}'.format(done/total))
    elapsed = int(time.time() - began)
    print('time cost:',elapsed,'s',total,'urls')
    return results


## 发布

In [None]:
# hide
!nbdev_build_lib --fname 23_IP_Pool.ipynb

Converted 23_IP_Pool.ipynb.


In [None]:
!git add 23_IP_Pool.ipynb
!git commit -m "use one redis connect"

[master 9a17c01] use one redis connect
 1 file changed, 78 insertions(+), 187 deletions(-)


## 错误

In [None]:
# redis.exceptions.ConnectionError: Error 8 connecting to localhost:6379. nodename nor servname provided, or not known.
# Cause according to the author: Redis limits the number of connections for users without a username.
# NOTE(review): `!` runs the command in a subshell, so this limit presumably does not carry over to the running kernel - confirm.
!ulimit -n 1024
