Refactored close spider behaviour so that the engine now waits for all
downloading (and enqueued for download) requests to finish and their responses
to be processed in the scraper/spiders, before closing the spider.

This will be required in the future to avoid losing requests when we add
scheduler persistence, and it is also a more correct behaviour overall.

The closing process has also been refactored to remove unneeded closing state
from the downloader and leave it only in the engine.

Finally, some unused methods have been removed too, such as spider_is_open()
from the engine and scheduler.
pablohoffman committed Jul 8, 2011
1 parent 574b070 commit 409aaad
Showing 4 changed files with 80 additions and 75 deletions.
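Before reading the diff, it may help to see the closing pattern this commit introduces in isolation. The sketch below is an illustrative, self-contained rendition (the Slot class mirrors the one added to scrapy/core/engine.py further down; the trailing usage lines are hypothetical and not part of the commit): close() hands back a Deferred that fires only once the last in-progress request has been removed, so the rest of the spider teardown can simply be chained after it.

from twisted.internet import defer

class Slot(object):
    """Tracks a spider's in-progress requests and defers closing until
    the last one has finished."""

    def __init__(self):
        self.closing = False
        self.inprogress = set()  # requests in progress

    def add_request(self, request):
        self.inprogress.add(request)

    def remove_request(self, request):
        self.inprogress.remove(request)
        self._maybe_fire_closing()

    def close(self):
        self.closing = defer.Deferred()
        self._maybe_fire_closing()  # fire right away if nothing is pending
        return self.closing

    def _maybe_fire_closing(self):
        if self.closing and not self.inprogress:
            self.closing.callback(None)

# Hypothetical usage: teardown chained on close() runs only after both
# outstanding requests have finished.
slot = Slot()
slot.add_request('req1')
slot.add_request('req2')
d = slot.close()              # not fired yet: two requests still in flight
slot.remove_request('req1')
slot.remove_request('req2')   # last request done -> d fires here
assert d.called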
31 changes: 2 additions & 29 deletions scrapy/core/downloader/__init__.py
@@ -9,7 +9,6 @@
from twisted.internet import reactor, defer
from twisted.python.failure import Failure

from scrapy.exceptions import IgnoreRequest
from scrapy.conf import settings
from scrapy.utils.python import setattr_default
from scrapy.utils.defer import mustbe_deferred
@@ -34,7 +33,6 @@ def __init__(self, spider):
self.active = set()
self.queue = deque()
self.transferring = set()
self.closing = False
self.lastseen = 0
self.next_request_calls = set()

@@ -77,13 +75,10 @@ def fetch(self, request, spider):
not be downloaded from site.
"""
site = self.sites[spider]
if site.closing:
raise IgnoreRequest('Cannot fetch on a closing spider')

site.active.add(request)
def _deactivate(response):
site.active.remove(request)
self._close_if_idle(spider)
return response

dfd = self.middleware.download(self.enqueue, request, spider)
Expand All @@ -92,8 +87,6 @@ def _deactivate(response):
def enqueue(self, request, spider):
"""Enqueue a Request for a effective download from site"""
site = self.sites[spider]
if site.closing:
raise IgnoreRequest

def _downloaded(response):
send_catch_log(signal=signals.response_downloaded, \
@@ -128,20 +121,9 @@ def _process_queue(self, spider):
# Process enqueued requests if there are free slots to transfer for this site
while site.queue and site.free_transfer_slots() > 0:
request, deferred = site.queue.popleft()
if site.closing:
dfd = defer.fail(Failure(IgnoreRequest()))
else:
dfd = self._download(site, request, spider)
dfd = self._download(site, request, spider)
dfd.chainDeferred(deferred)

self._close_if_idle(spider)

def _close_if_idle(self, spider):
site = self.sites.get(spider)
if site and site.closing and not site.active:
del self.sites[spider]
site.closing.callback(None)

def _download(self, site, request, spider):
# The order is very important for the following deferreds. Do not change!

@@ -156,12 +138,6 @@ def _download(self, site, request, spider):
def finish_transferring(_):
site.transferring.remove(request)
self._process_queue(spider)
# avoid partially downloaded responses from propagating to the
# downloader middleware, to speed-up the closing process
if site.closing:
log.msg("Crawled while closing spider: %s" % request, \
level=log.DEBUG, spider=spider)
raise IgnoreRequest
return _
return dfd.addBoth(finish_transferring)

@@ -173,11 +149,8 @@ def open_spider(self, spider):
def close_spider(self, spider):
"""Free any resources associated with the given spider"""
assert spider in self.sites, "Spider not opened: %s" % spider
site = self.sites.get(spider)
site.closing = defer.Deferred()
site = self.sites.pop(spider)
site.cancel_request_calls()
self._process_queue(spider)
return site.closing

def is_idle(self):
return not self.sites
117 changes: 77 additions & 40 deletions scrapy/core/engine.py
@@ -19,12 +19,34 @@
from scrapy.utils.signal import send_catch_log, send_catch_log_deferred
from scrapy.utils.defer import mustbe_deferred

class Slot(object):

def __init__(self):
self.closing = False
self.inprogress = set() # requests in progress

def add_request(self, request):
self.inprogress.add(request)

def remove_request(self, request):
self.inprogress.remove(request)
self._maybe_fire_closing()

def close(self):
self.closing = defer.Deferred()
self._maybe_fire_closing()
return self.closing

def _maybe_fire_closing(self):
if self.closing and not self.inprogress:
self.closing.callback(None)


class ExecutionEngine(object):

def __init__(self, settings, spider_closed_callback):
self.settings = settings
self.closing = {} # dict (spider -> reason) of spiders being closed
self.closing_dfds = {} # dict (spider -> deferred) of spiders being closed
self.slots = {}
self.running = False
self.paused = False
self._next_request_calls = {}
@@ -88,7 +110,9 @@ def next_request(self, spider, now=False):
self._spider_idle(spider)

def _needs_backout(self, spider):
slot = self.slots[spider]
return not self.running \
or slot.closing \
or self.spider_is_closed(spider) \
or self.downloader.sites[spider].needs_backout() \
or self.scraper.sites[spider].needs_backout()
@@ -99,6 +123,10 @@ def _next_request(self, spider):
return
d = self._download(request, spider)
d.addBoth(self._handle_downloader_output, request, spider)
d.addErrback(log.msg, spider=spider)
slot = self.slots[spider]
d.addBoth(lambda _: slot.remove_request(request))
d.addErrback(log.msg, spider=spider)
d.addBoth(lambda _: self.next_request(spider))
return d

@@ -131,11 +159,6 @@ def spider_is_closed(self, spider):
closing stage)"""
return spider not in self.downloader.sites

def spider_is_open(self, spider):
"""Return True if the spider is fully opened (ie. not in closing
stage)"""
return spider in self.downloader.sites and spider not in self.closing

@property
def open_spiders(self):
return self.downloader.sites.keys()
@@ -145,28 +168,31 @@ def has_capacity(self):
return len(self.downloader.sites) < self.downloader.concurrent_spiders

def crawl(self, request, spider):
if spider in self.closing: # ignore requests for spiders being closed
return
assert spider in self.open_spiders, \
"Spider %r not opened when crawling: %s" % (spider.name, request)
self.schedule(request, spider)
self.next_request(spider)
# FIXME: we can't log errors because we would be preventing them from
# propagating to the request errback. This should be fixed after the
# next core refactoring.
#schd.addErrback(log.err, "Error on engine.crawl()")

def schedule(self, request, spider):
return self.scheduler.enqueue_request(spider, request)

def download(self, request, spider):
slot = self.slots[spider]
slot.add_request(request)
if isinstance(request, Response):
return request
d = self._download(request, spider)
d.addCallback(self.download, spider)
d.addBoth(self._remove_request, slot, request)
return d

def _remove_request(self, _, slot, request):
slot.remove_request(request)
return _

def _download(self, request, spider):
slot = self.slots[spider]
slot.add_request(request)
def _on_success(response):
"""handle the result of a page download"""
assert isinstance(response, (Response, Request))
@@ -207,6 +233,7 @@ def open_spider(self, spider):
assert self.has_capacity(), "No free spider slots when opening %r" % \
spider.name
log.msg("Spider opened", spider=spider)
self.slots[spider] = Slot()
yield self.scheduler.open_spider(spider)
self.downloader.open_spider(spider)
yield self.scraper.open_spider(spider)
@@ -234,44 +261,54 @@ def _spider_idle(self, spider):

def close_spider(self, spider, reason='cancelled'):
"""Close (cancel) spider and clear all its outstanding requests"""
if spider in self.closing:
return defer.succeed(None)

slot = self.slots[spider]
if slot.closing:
return slot.closing
log.msg("Closing spider (%s)" % reason, spider=spider)
self.closing[spider] = reason

self.scheduler.clear_pending_requests(spider)
dfd = self.downloader.close_spider(spider)
self.closing_dfds[spider] = dfd

dfd = slot.close()

dfd.addBoth(lambda _: self.scheduler.close_spider(spider))
dfd.addErrback(log.err, "Unhandled error in scheduler.close_spider()", \
spider=spider)
dfd.addErrback(log.err, spider=spider)

dfd.addBoth(lambda _: self.downloader.close_spider(spider))
dfd.addErrback(log.err, spider=spider)

dfd.addBoth(lambda _: self.scraper.close_spider(spider))
dfd.addErrback(log.err, "Unhandled error in scraper.close_spider()", \
spider=spider)
dfd.addBoth(lambda _: self._finish_closing_spider(spider))
return dfd
dfd.addErrback(log.err, spider=spider)

def _close_all_spiders(self):
dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
dfds += self.closing_dfds.values()
dlist = defer.DeferredList(dfds)
return dlist
dfd.addBoth(lambda _: self._cancel_next_call(spider))
dfd.addErrback(log.err, spider=spider)

dfd.addBoth(lambda _: send_catch_log_deferred(signal=signals.spider_closed, \
spider=spider, reason=reason))
dfd.addErrback(log.err, spider=spider)

def _finish_closing_spider(self, spider):
"""This function is called after the spider has been closed"""
reason = self.closing.pop(spider, 'finished')
call = self._next_request_calls.pop(spider, None)
if call and call.active():
call.cancel()
dfd = send_catch_log_deferred(signal=signals.spider_closed, \
spider=spider, reason=reason)
dfd.addBoth(lambda _: stats.close_spider(spider, reason=reason))
dfd.addErrback(log.err, "Unhandled error in stats.close_spider()",
spider=spider)
dfd.addErrback(log.err, spider=spider)

dfd.addBoth(lambda _: log.msg("Spider closed (%s)" % reason, spider=spider))
dfd.addBoth(lambda _: self.closing_dfds.pop(spider).callback(spider))

dfd.addBoth(lambda _: self.slots.pop(spider))
dfd.addErrback(log.err, spider=spider)

dfd.addBoth(lambda _: self._spider_closed_callback(spider))

return dfd

def _cancel_next_call(self, spider):
call = self._next_request_calls.pop(spider, None)
if call and call.active():
call.cancel()

def _close_all_spiders(self):
dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
dlist = defer.DeferredList(dfds)
return dlist

@defer.inlineCallbacks
def _finish_stopping_engine(self):
yield send_catch_log_deferred(signal=signals.engine_stopped)
3 changes: 0 additions & 3 deletions scrapy/core/scheduler.py
@@ -9,9 +9,6 @@ def __init__(self):
self.dfo = settings['SCHEDULER_ORDER'].upper() == 'DFO'
self.dupefilter = load_object(settings['DUPEFILTER_CLASS'])()

def spider_is_open(self, spider):
return spider in self.pending_requests

def spider_has_pending_requests(self, spider):
if spider in self.pending_requests:
return bool(self.pending_requests[spider])
4 changes: 1 addition & 3 deletions scrapy/utils/engine.py
@@ -22,13 +22,11 @@ def get_engine_status(engine=None):
]
spider_tests = [
"engine.spider_is_idle(spider)",
"engine.closing.get(spider)",
"engine.scheduler.spider_has_pending_requests(spider)",
"engine.slots[spider].closing",
"len(engine.scheduler.pending_requests[spider])",
"len(engine.downloader.sites[spider].queue)",
"len(engine.downloader.sites[spider].active)",
"len(engine.downloader.sites[spider].transferring)",
"engine.downloader.sites[spider].closing",
"engine.downloader.sites[spider].lastseen",
"len(engine.scraper.sites[spider].queue)",
"len(engine.scraper.sites[spider].active)",
