Commit

Changed frontier scrapy integration from middlewares to scheduler
gatufo committed Dec 16, 2014
1 parent 03cd307 commit cbe5f4f
Showing 8 changed files with 194 additions and 24 deletions.
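With this commit the Scrapy integration is driven by a custom scheduler instead of the spider/downloader middlewares. Based on the example project's updated settings further down, a minimal sketch of what a project now needs in its Scrapy settings to enable the frontier (the FRONTIER_SETTINGS module path is the example project's; substitute your own):

# Sketch of the scheduler-based setup introduced in this commit; paths are taken
# from the example project changed below, not from separate documentation.
SCHEDULER = 'crawlfrontier.contrib.scrapy.schedulers.frontier.CrawlFrontierScheduler'

# Module holding the frontier configuration (backend, logging, limits), assumed
# to mirror examples/scrapy_frontier/scrapy_frontier/frontier/settings.py.
FRONTIER_SETTINGS = 'scrapy_frontier.frontier.settings'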
14 changes: 12 additions & 2 deletions crawlfrontier/contrib/scrapy/manager.py
@@ -13,19 +13,29 @@ def scrapy_to_frontier(cls, scrapy_request):
             cookies = scrapy_request.cookies
         else:
             cookies = dict(sum([d.items() for d in scrapy_request.cookies], []))
+        meta = {
+            'scrapy_callback': scrapy_request.callback,
+            'origin_is_frontier': True,
+        }
+        meta.update(scrapy_request.meta or {})
         return FrontierRequest(url=scrapy_request.url,
                                method=scrapy_request.method,
                                headers=scrapy_request.headers,
                                cookies=cookies,
-                               meta=scrapy_request.meta)
+                               meta=meta)
 
     @classmethod
     def frontier_to_scrapy(cls, frontier_request):
+        meta = {
+            'frontier_request': frontier_request
+        }
+        meta.update(frontier_request.meta or {})
         return ScrapyRequest(url=frontier_request.url,
+                             callback=meta.get('scrapy_callback', None),
                              method=frontier_request.method,
                              headers=frontier_request.headers,
                              cookies=frontier_request.cookies,
-                             meta={'frontier_request': frontier_request},
+                             meta=meta,
                              dont_filter=True)

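The converter now carries the Scrapy callback and an origin marker inside the frontier request's meta, then merges the request's own meta on top. A standalone sketch (not part of the commit) of that merge order; parse_page and the 'depth' key are made up for illustration:

# Assumes Scrapy is installed; this only mirrors the meta handling added above.
from scrapy.http import Request as ScrapyRequest

def parse_page(response):
    pass

scrapy_request = ScrapyRequest('http://scrapinghub.com',
                               callback=parse_page,
                               meta={'depth': 1})

meta = {
    'scrapy_callback': scrapy_request.callback,   # survives the frontier round trip
    'origin_is_frontier': True,
}
meta.update(scrapy_request.meta or {})            # request meta merged last, so it wins on collisions
# meta is what scrapy_to_frontier() now stores on the FrontierRequest, and
# frontier_to_scrapy() later restores the callback via meta.get('scrapy_callback').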
Empty file.
160 changes: 160 additions & 0 deletions crawlfrontier/contrib/scrapy/schedulers/frontier.py
@@ -0,0 +1,160 @@
from scrapy.core.scheduler import Scheduler
from scrapy.http import Request
from scrapy import log

from collections import deque

from crawlfrontier.contrib.scrapy.manager import ScrapyFrontierManager

STATS_PREFIX = 'crawlfrontier'


class StatsManager(object):
    """
    'crawlfrontier/crawled_pages_count': 489,
    'crawlfrontier/crawled_pages_count/200': 382,
    'crawlfrontier/crawled_pages_count/301': 37,
    'crawlfrontier/crawled_pages_count/302': 58,
    'crawlfrontier/crawled_pages_count/400': 5,
    'crawlfrontier/crawled_pages_count/403': 1,
    'crawlfrontier/crawled_pages_count/404': 1,
    'crawlfrontier/crawled_pages_count/999': 5,
    'crawlfrontier/iterations': 5,
    'crawlfrontier/links_extracted_count': 39805,
    'crawlfrontier/pending_requests_count': 0,
    'crawlfrontier/redirected_requests_count': 273,
    'crawlfrontier/request_errors_count': 11,
    'crawlfrontier/request_errors_count/DNSLookupError': 1,
    'crawlfrontier/request_errors_count/ResponseNeverReceived': 9,
    'crawlfrontier/request_errors_count/TimeoutError': 1,
    'crawlfrontier/returned_requests_count': 500,
    """
    def __init__(self, stats, prefix=STATS_PREFIX):
        self.stats = stats
        self.prefix = prefix

    def add_seeds(self, count=1):
        self._inc_value('seeds_count', count)

    def add_crawled_page(self, status_code, n_links):
        self._inc_value('crawled_pages_count')
        self._inc_value('crawled_pages_count/%s' % str(status_code))
        self._inc_value('links_extracted_count', n_links)

    def add_redirected_requests(self, count=1):
        self._inc_value('redirected_requests_count', count)

    def add_returned_requests(self, count=1):
        self._inc_value('returned_requests_count', count)

    def add_request_error(self, error_code):
        self._inc_value('request_errors_count')
        self._inc_value('request_errors_count/%s' % str(error_code))

    def set_iterations(self, iterations):
        self._set_value('iterations', iterations)

    def set_pending_requests(self, pending_requests):
        self._set_value('pending_requests_count', pending_requests)

    def _get_stats_name(self, variable):
        return '%s/%s' % (self.prefix, variable)

    def _inc_value(self, variable, count=1):
        self.stats.inc_value(self._get_stats_name(variable), count)

    def _set_value(self, variable, value):
        self.stats.set_value(self._get_stats_name(variable), value)


class CrawlFrontierScheduler(Scheduler):

    def __init__(self, crawler):

        self.crawler = crawler
        self.stats_manager = StatsManager(crawler.stats)
        self._pending_requests = deque()

        self.redirect_enabled = crawler.settings.get('REDIRECT_ENABLED')

        frontier_settings = crawler.settings.get('FRONTIER_SETTINGS', None)
        if not frontier_settings:
            log.msg('FRONTIER_SETTINGS not found! Using default frontier settings...', log.WARNING)

        self.frontier = ScrapyFrontierManager(frontier_settings)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def enqueue_request(self, request):
        if not self._request_is_redirected(request):
            self.frontier.add_seeds([request])
            self.stats_manager.add_seeds()
            return True
        elif self.redirect_enabled:
            self._add_pending_request(request)
            self.stats_manager.add_redirected_requests()
            return True
        return False

    def next_request(self):
        request = self._get_next_request()
        if request:
            self.stats_manager.add_returned_requests()
        return request

    def process_spider_output(self, result, request, response, spider):
        links = []
        for element in result:
            if isinstance(element, Request):
                links.append(element)
            else:
                yield element
        self.frontier.page_crawled(scrapy_response=response,
                                   scrapy_links=links)
        self.stats_manager.add_crawled_page(response.status, len(links))

    def process_download_error(self, spider_failure, download_failure, request, spider):
        error_code = self._get_failure_code(download_failure or spider_failure)
        self.frontier.request_error(scrapy_request=request, error=error_code)
        self.stats_manager.add_request_error(error_code)

    def open(self, spider):
        log.msg('Starting frontier', log.INFO)
        if not self.frontier.manager.auto_start:
            self.frontier.start()

    def close(self, reason):
        log.msg('Finishing frontier (%s)' % reason, log.INFO)
        self.frontier.stop()
        self.stats_manager.set_iterations(self.frontier.manager.iteration)
        self.stats_manager.set_pending_requests(len(self))

    def __len__(self):
        return len(self._pending_requests)

    def has_pending_requests(self):
        return len(self) > 0

    def _get_next_request(self):
        if not self.frontier.manager.finished and \
                not self.has_pending_requests():
            for request in self.frontier.get_next_requests():
                self._add_pending_request(request)
        return self._get_pending_request()

    def _add_pending_request(self, request):
        return self._pending_requests.append(request)

    def _get_pending_request(self):
        return self._pending_requests.popleft() if self._pending_requests else None

    def _get_failure_code(self, failure):
        try:
            return failure.type.__name__
        except:
            return '?'

    def _request_is_redirected(self, request):
        return request.meta.get('redirect_times', 0) > 0
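StatsManager only namespaces counters in Scrapy's stats collector under the 'crawlfrontier' prefix. A minimal sketch (not part of the commit) using a stand-in stats object that exposes the same inc_value/set_value interface as the crawler.stats object the scheduler actually passes in:

# DummyStats is a made-up stand-in for Scrapy's stats collector.
class DummyStats(object):
    def __init__(self):
        self.values = {}

    def inc_value(self, key, count=1):
        self.values[key] = self.values.get(key, 0) + count

    def set_value(self, key, value):
        self.values[key] = value

stats_manager = StatsManager(DummyStats())
stats_manager.add_crawled_page(status_code=200, n_links=12)
stats_manager.add_request_error('TimeoutError')
# stats_manager.stats.values now holds keys such as
#   'crawlfrontier/crawled_pages_count', 'crawlfrontier/crawled_pages_count/200',
#   'crawlfrontier/links_extracted_count', 'crawlfrontier/request_errors_count/TimeoutError'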
2 changes: 1 addition & 1 deletion crawlfrontier/core/models.py
@@ -3,7 +3,7 @@
 
 class FrontierObject(object):
     def copy(self):
-        return copy.deepcopy(self)
+        return copy.copy(self)
 
 
 class Request(FrontierObject):
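The copy() change above switches from a deep to a shallow copy, so copies now share mutable attributes (such as the meta dict) with the original instead of duplicating them. An illustration (not from the commit) with a simplified Request stand-in; the real crawlfrontier Request also carries method, headers and cookies:

import copy

class FrontierObject(object):
    def copy(self):
        return copy.copy(self)

class Request(FrontierObject):
    def __init__(self, url, meta=None):
        self.url = url
        self.meta = meta or {}

original = Request('http://diffeo.com', meta={'depth': 0})
clone = original.copy()
clone.meta['depth'] = 1
assert original.meta['depth'] == 1  # shallow copy: the meta dict is shared, not duplicated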
6 changes: 3 additions & 3 deletions examples/scrapy_frontier/scrapy_frontier/frontier/settings.py
@@ -2,13 +2,13 @@
 # Frontier
 #--------------------------------------------------------
 BACKEND = 'crawlfrontier.contrib.backends.memory.FIFO'
-MAX_REQUESTS = 5
-MAX_NEXT_REQUESTS = 1
+MAX_REQUESTS = 200
+MAX_NEXT_REQUESTS = 10
 
 #--------------------------------------------------------
 # Logging
 #--------------------------------------------------------
 LOGGING_EVENTS_ENABLED = False
-LOGGING_MANAGER_ENABLED = True
+LOGGING_MANAGER_ENABLED = False
 LOGGING_BACKEND_ENABLED = False
 LOGGING_DEBUGGING_ENABLED = False
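For reference, a hedged reading of what the tuned limits control (based on crawl-frontier's settings, not stated in this diff):

# Assumed semantics:
#   MAX_REQUESTS      - total requests the frontier hands out before reporting
#                       itself finished.
#   MAX_NEXT_REQUESTS - maximum requests returned by a single get_next_requests()
#                       call, i.e. one scheduler refill batch.
MAX_REQUESTS = 200
MAX_NEXT_REQUESTS = 10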
28 changes: 12 additions & 16 deletions examples/scrapy_frontier/scrapy_frontier/settings.py
@@ -17,29 +17,25 @@
 
 LOGSTATS_INTERVAL = 10
 
 #DUPEFILTER_CLASS = 'scrapy.dupefilter.BaseDupeFilter'
 
-#CLOSESPIDER_PAGECOUNT = 1000
-
 SPIDER_MIDDLEWARES = {}
-DOWNLOADER_MIDDLEWARES = {
-    'scrapy.contrib.downloadermiddleware.httpcache.HttpCacheMiddleware': 599,
-}
+DOWNLOADER_MIDDLEWARES = {}
 
+#--------------------------------------------------------------------------
+# Frontier Settings
+#--------------------------------------------------------------------------
+SCHEDULER = 'crawlfrontier.contrib.scrapy.schedulers.frontier.CrawlFrontierScheduler'
+FRONTIER_SETTINGS = 'scrapy_frontier.frontier.settings'
+
+
 #--------------------------------------------------------------------------
 # Seed loaders
 #--------------------------------------------------------------------------
-SPIDER_MIDDLEWARES.update({
-    'crawlfrontier.contrib.scrapy.middlewares.frontier.CrawlFrontierSpiderMiddleware': 0,
-    'crawlfrontier.contrib.scrapy.middlewares.seeds.file.FileSeedLoader': 1,
-})
-DOWNLOADER_MIDDLEWARES.update({
-    'crawlfrontier.contrib.scrapy.middlewares.frontier.CrawlFrontierDownloaderMiddleware': 100,  # After retry mw.
-})
-FRONTIER_ENABLED = True
-FRONTIER_SETTINGS = 'scrapy_frontier.frontier.settings'
-FRONTIER_SCHEDULER_INTERVAL = 0.01
-FRONTIER_SCHEDULER_CONCURRENT_REQUESTS = 256
 
 SEEDS_SOURCE = 'seeds.txt'
 
+#--------------------------------------------------------------------------
+# Testing
+#--------------------------------------------------------------------------
+#CLOSESPIDER_PAGECOUNT = 1
2 changes: 2 additions & 0 deletions examples/scrapy_frontier/scrapy_frontier/spiders/example.py
@@ -25,3 +25,5 @@ class MySpider(CrawlSpider):
 
     def parse_page(self, response):
         pass
+
+    parse_start_url = parse_page
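The hunk only shows the tail of the spider. A guess at how the full class plausibly looks after this change; the name and parse_page come from the diff, while the rules attribute and imports are assumptions (using the scrapy.contrib paths current for Scrapy in 2014). Aliasing parse_start_url points CrawlSpider's handling of seed responses at the same no-op callback used for crawled pages:

from scrapy.contrib.spiders import CrawlSpider, Rule
from scrapy.contrib.linkextractors import LinkExtractor

class MySpider(CrawlSpider):
    name = 'example'
    rules = [Rule(LinkExtractor(), callback='parse_page', follow=True)]

    def parse_page(self, response):
        pass

    # Seed responses now run through the same no-op callback as crawled pages.
    parse_start_url = parse_page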
6 changes: 4 additions & 2 deletions examples/scrapy_frontier/seeds.txt
@@ -1,2 +1,4 @@
-http://scrapinghub.com
-http://diffeo.com
+#http://scrapinghub.com
+http://diffeo.com
+#http://www.google.com
+#http://trec-kba.org/trec-kba-stream-corpus.shtml
