In [1]:
# Downloader class
    
import urllib
import urllib.error
import urllib.parse
import urllib.request
class Downloader:
    """Callable page downloader with throttling, retries and an optional cache.

    Calling an instance with a URL returns the page HTML as ``str``, or
    ``None`` when the download failed with an HTTP error.

    Args:
        delay: value handed to ``Throttle`` (defined elsewhere in this file).
        user_agent: value sent in the ``User-agent`` request header.
        num_retries: how many times a 5xx response is retried.
        cache: optional mapping-like object (``cache[url]`` get/set, raising
            ``KeyError`` on a miss) storing ``{'html': ..., 'code': ...}``.
    """

    def __init__(self, delay=5, user_agent='yuhao', num_retries=1, cache=None):
        self.throttle = Throttle(delay)
        self.user_agent = user_agent
        self.num_retries = num_retries
        self.cache = cache

    def __call__(self, url):
        """Return the HTML for ``url``, using the cache when possible."""
        result = None
        # Compare against None: an empty dict-like cache is falsy but is
        # still a cache that must be consulted and filled.
        if self.cache is not None:
            try:
                result = self.cache[url]
            except KeyError:
                # url is not available in cache
                pass
            else:
                code = result['code']
                # A cached 5xx error is discarded so the page is fetched again
                # (only while retries are allowed). ``code`` is None for a
                # successful download, so guard the range comparison.
                if code is not None and self.num_retries > 0 and 500 <= code < 600:
                    result = None
        if result is None:
            self.throttle.wait(url)
            headers = {'User-agent': self.user_agent}
            result = self.download(url, headers, self.num_retries)
            if self.cache is not None:
                self.cache[url] = result
        return result['html']

    def download(self, url, headers, num_retries, data=None):
        """Fetch ``url`` once, retrying on 5xx errors.

        Args:
            url: address to fetch.
            headers: dict of request headers.
            num_retries: remaining retries for 5xx responses.
            data: optional request body passed to ``urllib.request.Request``.

        Returns:
            ``{'html': str or None, 'code': int or None}``; ``code`` is the
            HTTP error status, or ``None`` when the download succeeded.
        """
        print ('Downloading:',url)
        # Build a Request instead of opening the bare URL so the caller's
        # headers (notably the user agent) are actually sent.
        req = urllib.request.Request(url, data=data, headers=headers)
        code = None
        try:
            # Decode to str so the result can be matched with re.findall later.
            with urllib.request.urlopen(req) as response:
                html = response.read().decode('utf-8')
        except urllib.error.HTTPError as e:
            print ('Download error:',e.reason)
            html = None
            code = e.code
            # Retry server-side (5xx) errors while retries remain.
            if num_retries > 0 and 500 <= code < 600:
                return self.download(url, headers, num_retries - 1, data)

        return {'html':html,'code':code}
        
    




In [2]:
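# Cache class implemented on top of MongoDB.
# Note: the TTL index (expireAfterSeconds) only removes records automatically
# when the indexed timestamp is stored as a UTC time; otherwise they are not
# removed as expected.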
from datetime import datetime, timedelta
from pymongo import MongoClient

class MongoCache:
    """Dict-like download cache stored in the ``webpage`` collection of the
    ``cache`` MongoDB database.

    Args:
        client: an existing ``MongoClient``; when ``None`` a client for
            ``localhost:27017`` is created.
        expires: ``timedelta`` used as the TTL (``expireAfterSeconds``) of the
            index on the ``timestamp`` field.
    """

    def __init__(self, client=None, expires=timedelta(days=30)):
        # Honour the caller's client; only connect to the default server when
        # none was supplied.
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client.cache
        self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        """Return the cached result for ``url``.

        Raises:
            KeyError: if no record with ``_id == url`` exists.
        """
        record = self.db.webpage.find_one({'_id': url})
        if record:
            return record['result']
        raise KeyError(url + ' does not exist')

    def __setitem__(self, url, result):
        """Store ``result`` under ``url``, inserting or overwriting the record."""
        from datetime import timezone

        # MongoDB TTL expiry compares against UTC, so store an explicit UTC
        # timestamp rather than naive local time.
        record = {'result': result, 'timestamp': datetime.now(timezone.utc)}
        # update_one replaces the legacy Collection.update (removed in PyMongo 4).
        self.db.webpage.update_one({'_id': url}, {'$set': record}, upsert=True)

        
        


In [3]:
# Link crawler 4.0


import re
from datetime import datetime
import csv
import urllib.robotparser as r_p
import time
import lxml.html

# The Throttle class records when each domain was last accessed
class Throttle:
    """Enforce a minimum delay between successive requests to the same domain.

    Args:
        delay: minimum number of seconds between two requests to one domain;
            a value <= 0 disables waiting.
    """

    def __init__(self, delay):
        self.delay = delay
        # Maps domain (netloc) -> datetime of the most recent access.
        self.domains = {}

    def wait(self, url):
        """Sleep if the domain of ``url`` was accessed less than ``delay``
        seconds ago, then record the current time for that domain."""
        domain = urllib.parse.urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            # total_seconds() gives the full elapsed time; ``.seconds`` is only
            # the truncated seconds component and wraps around after one day.
            elapsed = (datetime.now() - last_accessed).total_seconds()
            sleep_secs = self.delay - elapsed
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()

        


# Fetch and parse the target site's robots.txt once; the crawler consults
# ``rp.can_fetch`` before downloading a URL.
rp = r_p.RobotFileParser('http://example.webscraping.com/robots.txt')
rp.read()

# User-agent name used for the robots.txt check and handed to Downloader.
user_agent = 'GoodCrawler'



# Link crawling
def link_crawler4(seed_url, link_regex, num_retries=1, delay=5, max_depth=2, cache=None):
    """Crawl from ``seed_url``, following links that match ``link_regex``.

    Args:
        seed_url: URL the crawl starts from; relative links are resolved
            against it.
        link_regex: pattern applied with ``re.match`` to each raw href.
        num_retries: passed to ``Downloader``.
        delay: passed to ``Downloader``.
        max_depth: links found on a page at exactly this depth are not
            followed (the seed is depth 0). A value that is never reached,
            e.g. -1, removes the limit.
        cache: passed to ``Downloader``.
    """
    crawl_queue = [seed_url]
    # Maps every URL already queued to its depth, to avoid duplicate downloads.
    seen = {seed_url: 0}

    D = Downloader(delay=delay, user_agent=user_agent, num_retries=num_retries, cache=cache)

    while crawl_queue:
        url = crawl_queue.pop()
        depth = seen[url]
        # Check whether robots.txt allows this user agent to fetch the URL;
        # skip only this URL instead of aborting the whole crawl.
        if not rp.can_fetch(user_agent, url):
            print("Blocked by robots.txt", user_agent, url)
            continue
        html = D(url)

        # A failed download yields None; there are no links to extract then.
        if html is None or depth == max_depth:
            continue
        for link in get_links(html):
            if re.match(link_regex, link):
                link = urllib.parse.urljoin(seed_url, link)
                if link not in seen:
                    seen[link] = depth + 1
                    crawl_queue.append(link)

def get_links(html):
    """Return the raw ``href`` values of all ``<a>`` tags in ``html``.

    Args:
        html: page source as ``str``; ``None`` or an empty string (e.g. a
            failed download) yields an empty list.

    Returns:
        list of href strings in document order, not yet resolved to
        absolute URLs.
    """
    if not html:
        return []
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    return webpage_regex.findall(html)


''' 
client = MongoClient('localhost',27017)
cache = MongoCache(client=client, expires=timedelta())
link_crawler4('http://example.webscraping.com', '/(places/default/view|places/default/index)', 
              delay = 3, cache = cache, num_retries = 2,  max_depth = -1)
'''
    

" \nclient = MongoClient('localhost',27017)\ncache = MongoCache(client=client, expires=timedelta())\nlink_crawler4('http://example.webscraping.com', '/(places/default/view|places/default/index)', \n              delay = 3, cache = cache, num_retries = 2,  max_depth = -1)\n"

In [4]:
'''
# 下载用于并发例子的url数据
import csv
from zipfile import ZipFile

D = Downloader()
zipped_data = D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')
urls = []  # top 1 million URL will be stored in this list
    # ZipFile需要一个文件接口，不能是字符串，用StringIO包装
with ZipFile(io.StringIO(zipped_data)) as zf:       
    csv_filename = zf.namelist()[0]
    for _, website in csv.reader(zf.open(csv_filename)):
        urls.append('http://' + website)
'''

"\n# 下载用于并发例子的url数据\nimport csv\nfrom zipfile import ZipFile\n\nD = Downloader()\nzipped_data = D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')\nurls = []  # top 1 million URL will be stored in this list\n    # ZipFile需要一个文件接口，不能是字符串，用StringIO包装\nwith ZipFile(io.StringIO(zipped_data)) as zf:       \n    csv_filename = zf.namelist()[0]\n    for _, website in csv.reader(zf.open(csv_filename)):\n        urls.append('http://' + website)\n"

In [5]:
# Modified scrape_callback interface

class AlexaCallback:
    """Scrape callback that extracts site URLs from the Alexa top-sites zip.

    Args:
        max_urls: maximum number of URLs to extract from the CSV in the zip.
    """

    def __init__(self, max_urls=1000):
        self.max_urls = max_urls    # number of URLs to extract from the Alexa file
        self.seed_url = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'

    def __call__(self, url, html):
        """Return up to ``max_urls`` site URLs when ``url`` is the seed URL.

        Args:
            url: URL that was downloaded.
            html: the downloaded zip archive as ``bytes``.

        Returns:
            list of ``'http://<site>'`` strings when ``url == self.seed_url``;
            ``None`` for any other URL.
        """
        if url != self.seed_url:
            return None

        # Local imports: this file does not import these names at module level.
        import csv
        from io import BytesIO, TextIOWrapper
        from zipfile import ZipFile

        urls = []
        # ZipFile needs a binary file-like object, so wrap the payload in BytesIO.
        with ZipFile(BytesIO(html)) as zf:
            csv_filename = zf.namelist()[0]
            with zf.open(csv_filename) as raw:
                # csv.reader needs text, so decode the binary member stream.
                rows = csv.reader(TextIOWrapper(raw, encoding='utf-8', newline=''))
                for row in rows:
                    if len(urls) >= self.max_urls:
                        break
                    if len(row) < 2:
                        # Skip blank or malformed lines.
                        continue
                    urls.append('http://' + row[1])
        return urls

In [6]:
'''

# 串行下载，使用Alexa回调函数

# 串行下载估算，每个URL下载花费1.6秒
scrape_callback = AlexaCallback()
link_crawler4(seed_url=scrape_callback.seed_url, cache_callback=MongoCache(), scrape_callback=scrape_callback)
'''

'\n\n# 串行下载，使用Alexa回调函数\n\n# 串行下载估算，每个URL下载花费1.6秒\nscrape_callback = AlexaCallback()\nlink_crawler4(seed_url=scrape_callback.seed_url, cache_callback=MongoCache(), scrape_callback=scrape_callback)\n'

In [7]:

# Multi-threaded crawler


import time
import threading
SLEEP_TIME = 1

def threaded_crawler(seed_url, link_regex, num_retries=1, delay=5, max_depth=3, cache=None , max_threads=10):
    """Crawl from ``seed_url`` using up to ``max_threads`` worker threads.

    Args:
        seed_url: URL the crawl starts from; relative links are resolved
            against it.
        link_regex: pattern applied with ``re.match`` to each raw href.
        num_retries: passed to ``Downloader``.
        delay: passed to ``Downloader``.
        max_depth: links found on a page at exactly this depth are not
            followed (the seed is depth 0).
        cache: passed to ``Downloader``.
        max_threads: maximum number of worker threads alive at once.
    """
    crawl_queue = [seed_url]
    # Maps every URL already queued to its depth, to avoid duplicate downloads.
    seen = {seed_url: 0}

    D = Downloader(delay=delay, user_agent=user_agent, num_retries=num_retries, cache=cache)

    def process_queue():
        """Worker loop: download queued URLs until the queue is empty."""
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            depth = seen[url]
            html = D(url)
            # A failed download yields None; there are no links to extract.
            if html is None or depth == max_depth:
                continue
            for link in get_links(html):
                if re.match(link_regex, link):
                    link = urllib.parse.urljoin(seed_url, link)
                    # NOTE(review): this check-then-set on ``seen`` is not
                    # guarded by a lock; confirm occasional duplicates are
                    # acceptable.
                    if link not in seen:
                        seen[link] = depth + 1
                        crawl_queue.append(link)

    threads = []
    while threads or crawl_queue:
        # The crawl is still active. Drop finished threads by rebuilding the
        # list; removing items while iterating over it skips entries.
        threads = [thread for thread in threads if thread.is_alive()]
        while len(threads) < max_threads and crawl_queue:
            # Can start some more threads. Daemon threads let the main thread
            # exit when it receives ctrl-c.
            thread = threading.Thread(target=process_queue, daemon=True)
            thread.start()
            threads.append(thread)
        # Sleep temporarily so the CPU can focus execution elsewhere.
        time.sleep(SLEEP_TIME)
    
def get_links(html):
    """Return the raw ``href`` values of all ``<a>`` tags in ``html``.

    Args:
        html: page source as ``str``; ``None`` or an empty string (e.g. a
            failed download) yields an empty list.

    Returns:
        list of href strings in document order, not yet resolved to
        absolute URLs.
    """
    if not html:
        return []
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    return webpage_regex.findall(html)
# Run the threaded crawl against the example site, caching pages in MongoDB.
# ``timedelta()`` is a zero duration, so the cache's TTL index is created
# with expireAfterSeconds equal to 0.
client = MongoClient('localhost', 27017)
cache = MongoCache(client=client, expires=timedelta())
threaded_crawler(
    seed_url='http://example.webscraping.com',
    link_regex='/(places/default/view|places/default/index)',
    delay=5,
    num_retries=2,
    max_depth=3,
    cache=cache,
)



Downloading: http://example.webscraping.com




Downloading: http://example.webscraping.com/places/default/view/Antigua-and-Barbuda-10
Downloading:Downloading: http://example.webscraping.com/places/default/view/Angola-7
Downloading: http://example.webscraping.com/places/default/view/Antarctica-9
Downloading: http://example.webscraping.com/places/default/view/Albania-3
 Downloading: http://example.webscraping.com/places/default/view/Aland-Islands-2Downloading: http://example.webscraping.com/places/default/view/Andorra-6
Downloading: http://example.webscraping.com/places/default/view/Algeria-4
Downloading: http://example.webscraping.com/places/default/view/American-Samoa-5
http://example.webscraping.com/places/default/view/Anguilla-8

Downloading: http://example.webscraping.com/places/default/index/1
Downloading: http://example.webscraping.com/places/default/index
Downloading:Downloading:  http://example.webscraping.com/places/default/index/0
http://example.webscraping.com/places/default/index/2
Downloading: http://example.webscraping

In [8]:
'''
# 多进程爬虫
    # 由于爬虫队列保存在本地内存，其他进程无法处理这些爬虫
    # 因此将爬虫队列转移到MongoDB中，单独存储队列
from datetime import datetime, timedelta
from pymongo import MongoClient, errors

# 添加了三种状态 （OUTSTANDING / PROCESSING / COMPLETE ）
    # OUTSTANDING：添加新URL
    # PROCESSING： 从队列取出URL准备下载
    # COMPLETE：   下载结束
class MongoQueue:
    
    OUTSTANDING, PROCESSING, COMPLETE = range(3)
    
    def __init__(self, client=None, timeout=300):
        self.client = MongoClient()
        self.db = self.client.cache
        self.timeout = timeout
    
    def __nonzero__(self):
        # return true if there are more jobs to process
        record = self.db.crawl_queue.find_one( {'status': {'$ne': self.COMPLETE} } )
        return True if record else False
    
    def push(self, url):
        try:
            self.db.crawl_queue.insert( {'_id': url, 'status': self.OUTSTANDING} )
        except errors.DuplicateKeyError as e:
            # already in the queue
            pass
        
    def pop(self):
        record = self.db.crawl_queue.find_and_modify(
            query = { 'status': self.OUTSTANDING},
            update = {'$set': { 'status': self.PROCESSING, 'timestamp': datetime.now() } }
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()
            
    def complete(self, url):
        self.db.crawl_queue.update( {'_id': url}, {'$set': {'status': self.COMPLETE} } )
    
    def repair(self):
        record = self.db.crawl_queue.find_and_modify(
            query = { 
                'timestamp': { '$lt': datetime.now() - timedelta(seconds=self.timeout) },
                'status': { '$ne': self.COMPLETE}
            },
            update = {'$set': {'status': self.OUTSTANDING} }
        )
        if record:
            print('Released:', record['_id'])
'''

"\n# 多进程爬虫\n    # 由于爬虫队列保存在本地内存，其他进程无法处理这些爬虫\n    # 因此将爬虫队列转移到MongoDB中，单独存储队列\nfrom datetime import datetime, timedelta\nfrom pymongo import MongoClient, errors\n\n# 添加了三种状态 （OUTSTANDING / PROCESSING / COMPLETE ）\n    # OUTSTANDING：添加新URL\n    # PROCESSING： 从队列取出URL准备下载\n    # COMPLETE：   下载结束\nclass MongoQueue:\n    \n    OUTSTANDING, PROCESSING, COMPLETE = range(3)\n    \n    def __init__(self, client=None, timeout=300):\n        self.client = MongoClient()\n        self.db = self.client.cache\n        self.timeout = timeout\n    \n    def __nonzero__(self):\n        # return true if there are more jobs to process\n        record = self.db.crawl_queue.find_one( {'status': {'$ne': self.COMPLETE} } )\n        return True if record else False\n    \n    def push(self, url):\n        try:\n            self.db.crawl_queue.insert( {'_id': url, 'status': self.OUTSTANDING} )\n        except errors.DuplicateKeyError as e:\n            # already in the queue\n            pass\n        \n    de

In [9]:
'''
# 多进程爬虫
    # 修改爬虫队列，以存储在MongoDB中
        # 队列改为MongoQueue
        # seen标志取消，db已经实现id为url，因此不会重复下载
        
    
    # todo 虽然页面都下载完了，但是后来因为没有下载的了，就一直报KeyError，需要个终止条件

import time
import threading
SLEEP_TIME = 1

def threaded_crawler2(seed_url, link_regex, num_retries=1, delay=5, cache=None , max_threads=10):
    
    crawl_queue = MongoQueue()
    crawl_queue.push(seed_url)
    
    
    num_urls = 0
    D= Downloader(delay=delay, user_agent=user_agent, num_retries=num_retries, cache=cache)#, timeout=timeout)
    
    
    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                crawl_queue.complete(url)
                for link in get_links(html):
                    if re.match(link_regex, link):
                        link = urllib.parse.urljoin(seed_url, link)
                        crawl_queue.push(link)
    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                #remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue, daemon=True)
            # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so CPU can focus execution elsewhere
        time.sleep(SLEEP_TIME)
    
def get_links(html):
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    return webpage_regex.findall(html)

client = MongoClient('localhost',27017)
cache = MongoCache(client=client, expires=timedelta())
threaded_crawler2(seed_url='http://example.webscraping.com', link_regex='/(places/default/view|places/default/index)', 
              delay = 5, num_retries = 2, cache=cache)
'''

'\n# 多进程爬虫\n    # 修改爬虫队列，以存储在MongoDB中\n        # 队列改为MongoQueue\n        # seen标志取消，db已经实现id为url，因此不会重复下载\n        \n    \n    # todo 虽然页面都下载完了，但是后来因为没有下载的了，就一直报KeyError，需要个终止条件\n\nimport time\nimport threading\nSLEEP_TIME = 1\n\ndef threaded_crawler2(seed_url, link_regex, num_retries=1, delay=5, cache=None , max_threads=10):\n    \n    crawl_queue = MongoQueue()\n    crawl_queue.push(seed_url)\n    \n    \n    num_urls = 0\n    D= Downloader(delay=delay, user_agent=user_agent, num_retries=num_retries, cache=cache)#, timeout=timeout)\n    \n    \n    def process_queue():\n        while True:\n            try:\n                url = crawl_queue.pop()\n            except IndexError:\n                # crawl queue is empty\n                break\n            else:\n                html = D(url)\n                crawl_queue.complete(url)\n                for link in get_links(html):\n                    if re.match(link_regex, link):\n                        link = urllib.parse.urljoin(see

In [10]:
'''
# 启动多个进程
import multiprocessing

def process_link_crawler(args, **kwargs):
    
    num_cpus = multiprocessing.cpu_count()
    print('Starting {} processes'.format(num_cpus))
    processes = []
    
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()
'''    

"\n# 启动多个进程\nimport multiprocessing\n\ndef process_link_crawler(args, **kwargs):\n    \n    num_cpus = multiprocessing.cpu_count()\n    print('Starting {} processes'.format(num_cpus))\n    processes = []\n    \n    for i in range(num_cpus):\n        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)\n        p.start()\n        processes.append(p)\n    # wait for processes to complete\n    for p in processes:\n        p.join()\n"