diff --git a/scrapy/core/downloader/__init__.py b/scrapy/core/downloader/__init__.py
index 62f48ec5a69..832745e2847 100644
--- a/scrapy/core/downloader/__init__.py
+++ b/scrapy/core/downloader/__init__.py
@@ -9,7 +9,6 @@
 from twisted.internet import reactor, defer
 from twisted.python.failure import Failure
 
-from scrapy.exceptions import IgnoreRequest
 from scrapy.conf import settings
 from scrapy.utils.python import setattr_default
 from scrapy.utils.defer import mustbe_deferred
@@ -34,7 +33,6 @@ def __init__(self, spider):
         self.active = set()
         self.queue = deque()
         self.transferring = set()
-        self.closing = False
         self.lastseen = 0
         self.next_request_calls = set()
 
@@ -77,13 +75,10 @@ def fetch(self, request, spider):
         not be downloaded from site.
         """
         site = self.sites[spider]
-        if site.closing:
-            raise IgnoreRequest('Cannot fetch on a closing spider')
         site.active.add(request)
 
         def _deactivate(response):
             site.active.remove(request)
-            self._close_if_idle(spider)
             return response
 
         dfd = self.middleware.download(self.enqueue, request, spider)
@@ -92,8 +87,6 @@ def _deactivate(response):
     def enqueue(self, request, spider):
         """Enqueue a Request for a effective download from site"""
         site = self.sites[spider]
-        if site.closing:
-            raise IgnoreRequest
 
         def _downloaded(response):
             send_catch_log(signal=signals.response_downloaded, \
@@ -128,20 +121,9 @@ def _process_queue(self, spider):
         # Process enqueued requests if there are free slots to transfer for this site
         while site.queue and site.free_transfer_slots() > 0:
             request, deferred = site.queue.popleft()
-            if site.closing:
-                dfd = defer.fail(Failure(IgnoreRequest()))
-            else:
-                dfd = self._download(site, request, spider)
+            dfd = self._download(site, request, spider)
             dfd.chainDeferred(deferred)
-        self._close_if_idle(spider)
-
-    def _close_if_idle(self, spider):
-        site = self.sites.get(spider)
-        if site and site.closing and not site.active:
-            del self.sites[spider]
-            site.closing.callback(None)
 
     def _download(self, site, request, spider):
         # The order is very important for the following deferreds. Do not change!
@@ -156,12 +138,6 @@ def _download(self, site, request, spider):
         def finish_transferring(_):
             site.transferring.remove(request)
             self._process_queue(spider)
-            # avoid partially downloaded responses from propagating to the
-            # downloader middleware, to speed-up the closing process
-            if site.closing:
-                log.msg("Crawled while closing spider: %s" % request, \
-                    level=log.DEBUG, spider=spider)
-                raise IgnoreRequest
             return _
         return dfd.addBoth(finish_transferring)
 
@@ -173,11 +149,8 @@ def open_spider(self, spider):
     def close_spider(self, spider):
         """Free any resources associated with the given spider"""
         assert spider in self.sites, "Spider not opened: %s" % spider
-        site = self.sites.get(spider)
-        site.closing = defer.Deferred()
+        site = self.sites.pop(spider)
         site.cancel_request_calls()
-        self._process_queue(spider)
-        return site.closing
 
     def is_idle(self):
         return not self.sites
diff --git a/scrapy/core/engine.py b/scrapy/core/engine.py
index 499c3cf3b61..959f035b956 100644
--- a/scrapy/core/engine.py
+++ b/scrapy/core/engine.py
@@ -19,12 +19,34 @@
 from scrapy.utils.signal import send_catch_log, send_catch_log_deferred
 from scrapy.utils.defer import mustbe_deferred
 
+class Slot(object):
+
+    def __init__(self):
+        self.closing = False
+        self.inprogress = set() # requests in progress
+
+    def add_request(self, request):
+        self.inprogress.add(request)
+
+    def remove_request(self, request):
+        self.inprogress.remove(request)
+        self._maybe_fire_closing()
+
+    def close(self):
+        self.closing = defer.Deferred()
+        self._maybe_fire_closing()
+        return self.closing
+
+    def _maybe_fire_closing(self):
+        if self.closing and not self.inprogress:
+            self.closing.callback(None)
+
+
 class ExecutionEngine(object):
 
     def __init__(self, settings, spider_closed_callback):
         self.settings = settings
-        self.closing = {} # dict (spider -> reason) of spiders being closed
-        self.closing_dfds = {} # dict (spider -> deferred) of spiders being closed
+        self.slots = {}
         self.running = False
         self.paused = False
         self._next_request_calls = {}
@@ -88,7 +110,9 @@ def next_request(self, spider, now=False):
         self._spider_idle(spider)
 
     def _needs_backout(self, spider):
+        slot = self.slots[spider]
         return not self.running \
+            or slot.closing \
            or self.spider_is_closed(spider) \
            or self.downloader.sites[spider].needs_backout() \
            or self.scraper.sites[spider].needs_backout()
@@ -99,6 +123,10 @@ def _next_request(self, spider):
             return
         d = self._download(request, spider)
         d.addBoth(self._handle_downloader_output, request, spider)
+        d.addErrback(log.msg, spider=spider)
+        slot = self.slots[spider]
+        d.addBoth(lambda _: slot.remove_request(request))
+        d.addErrback(log.msg, spider=spider)
         d.addBoth(lambda _: self.next_request(spider))
         return d
 
@@ -131,11 +159,6 @@ def spider_is_closed(self, spider):
         closing stage)"""
         return spider not in self.downloader.sites
 
-    def spider_is_open(self, spider):
-        """Return True if the spider is fully opened (ie. not in closing
-        stage)"""
-        return spider in self.downloader.sites and spider not in self.closing
-
     @property
     def open_spiders(self):
         return self.downloader.sites.keys()
@@ -145,28 +168,31 @@ def has_capacity(self):
         return len(self.downloader.sites) < self.downloader.concurrent_spiders
 
     def crawl(self, request, spider):
-        if spider in self.closing: # ignore requests for spiders being closed
-            return
         assert spider in self.open_spiders, \
             "Spider %r not opened when crawling: %s" % (spider.name, request)
         self.schedule(request, spider)
         self.next_request(spider)
-        # FIXME: we can't log errors because we would be preventing them from
-        # propagating to the request errback. This should be fixed after the
-        # next core refactoring.
-        #schd.addErrback(log.err, "Error on engine.crawl()")
 
     def schedule(self, request, spider):
         return self.scheduler.enqueue_request(spider, request)
 
     def download(self, request, spider):
+        slot = self.slots[spider]
+        slot.add_request(request)
         if isinstance(request, Response):
             return request
         d = self._download(request, spider)
         d.addCallback(self.download, spider)
+        d.addBoth(self._remove_request, slot, request)
         return d
 
+    def _remove_request(self, _, slot, request):
+        slot.remove_request(request)
+        return _
+
     def _download(self, request, spider):
+        slot = self.slots[spider]
+        slot.add_request(request)
         def _on_success(response):
             """handle the result of a page download"""
             assert isinstance(response, (Response, Request))
@@ -207,6 +233,7 @@ def open_spider(self, spider):
         assert self.has_capacity(), "No free spider slots when opening %r" % \
             spider.name
         log.msg("Spider opened", spider=spider)
+        self.slots[spider] = Slot()
         yield self.scheduler.open_spider(spider)
         self.downloader.open_spider(spider)
         yield self.scraper.open_spider(spider)
@@ -234,44 +261,54 @@ def _spider_idle(self, spider):
 
     def close_spider(self, spider, reason='cancelled'):
         """Close (cancel) spider and clear all its outstanding requests"""
-        if spider in self.closing:
-            return defer.succeed(None)
+
+        slot = self.slots[spider]
+        if slot.closing:
+            return slot.closing
         log.msg("Closing spider (%s)" % reason, spider=spider)
-        self.closing[spider] = reason
+
         self.scheduler.clear_pending_requests(spider)
-        dfd = self.downloader.close_spider(spider)
-        self.closing_dfds[spider] = dfd
+
+        dfd = slot.close()
+
         dfd.addBoth(lambda _: self.scheduler.close_spider(spider))
-        dfd.addErrback(log.err, "Unhandled error in scheduler.close_spider()", \
-            spider=spider)
+        dfd.addErrback(log.err, spider=spider)
+
+        dfd.addBoth(lambda _: self.downloader.close_spider(spider))
+        dfd.addErrback(log.err, spider=spider)
+
         dfd.addBoth(lambda _: self.scraper.close_spider(spider))
-        dfd.addErrback(log.err, "Unhandled error in scraper.close_spider()", \
-            spider=spider)
-        dfd.addBoth(lambda _: self._finish_closing_spider(spider))
-        return dfd
+        dfd.addErrback(log.err, spider=spider)
 
-    def _close_all_spiders(self):
-        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
-        dfds += self.closing_dfds.values()
-        dlist = defer.DeferredList(dfds)
-        return dlist
+        dfd.addBoth(lambda _: self._cancel_next_call(spider))
+        dfd.addErrback(log.err, spider=spider)
+
+        dfd.addBoth(lambda _: send_catch_log_deferred(signal=signals.spider_closed, \
+            spider=spider, reason=reason))
+        dfd.addErrback(log.err, spider=spider)
 
-    def _finish_closing_spider(self, spider):
-        """This function is called after the spider has been closed"""
-        reason = self.closing.pop(spider, 'finished')
-        call = self._next_request_calls.pop(spider, None)
-        if call and call.active():
-            call.cancel()
-        dfd = send_catch_log_deferred(signal=signals.spider_closed, \
-            spider=spider, reason=reason)
         dfd.addBoth(lambda _: stats.close_spider(spider, reason=reason))
-        dfd.addErrback(log.err, "Unhandled error in stats.close_spider()",
-            spider=spider)
+        dfd.addErrback(log.err, spider=spider)
+
         dfd.addBoth(lambda _: log.msg("Spider closed (%s)" % reason, spider=spider))
-        dfd.addBoth(lambda _: self.closing_dfds.pop(spider).callback(spider))
+
+        dfd.addBoth(lambda _: self.slots.pop(spider))
+        dfd.addErrback(log.err, spider=spider)
+
+        dfd.addBoth(lambda _: self._spider_closed_callback(spider))
+
+        return dfd
 
+    def _cancel_next_call(self, spider):
+        call = self._next_request_calls.pop(spider, None)
+        if call and call.active():
+            call.cancel()
+
+    def _close_all_spiders(self):
+        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
+        dlist = defer.DeferredList(dfds)
+        return dlist
+
     @defer.inlineCallbacks
     def _finish_stopping_engine(self):
         yield send_catch_log_deferred(signal=signals.engine_stopped)
diff --git a/scrapy/core/scheduler.py b/scrapy/core/scheduler.py
index 59a8b0abb4b..72c026fdc7c 100644
--- a/scrapy/core/scheduler.py
+++ b/scrapy/core/scheduler.py
@@ -9,9 +9,6 @@ def __init__(self):
         self.dfo = settings['SCHEDULER_ORDER'].upper() == 'DFO'
         self.dupefilter = load_object(settings['DUPEFILTER_CLASS'])()
 
-    def spider_is_open(self, spider):
-        return spider in self.pending_requests
-
     def spider_has_pending_requests(self, spider):
         if spider in self.pending_requests:
             return bool(self.pending_requests[spider])
diff --git a/scrapy/utils/engine.py b/scrapy/utils/engine.py
index f99142f213b..e3775389f2a 100644
--- a/scrapy/utils/engine.py
+++ b/scrapy/utils/engine.py
@@ -22,13 +22,11 @@ def get_engine_status(engine=None):
     ]
     spider_tests = [
         "engine.spider_is_idle(spider)",
-        "engine.closing.get(spider)",
-        "engine.scheduler.spider_has_pending_requests(spider)",
+        "engine.slots[spider].closing",
         "len(engine.scheduler.pending_requests[spider])",
         "len(engine.downloader.sites[spider].queue)",
         "len(engine.downloader.sites[spider].active)",
         "len(engine.downloader.sites[spider].transferring)",
-        "engine.downloader.sites[spider].closing",
         "engine.downloader.sites[spider].lastseen",
         "len(engine.scraper.sites[spider].queue)",
         "len(engine.scraper.sites[spider].active)",
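
The heart of this patch is the new Slot class in scrapy/core/engine.py: instead of the downloader carrying a per-site closing flag, the engine now represents "closing" as a Deferred that fires only once the slot has no requests in progress. The standalone sketch below shows that behaviour in isolation; the Slot methods mirror the hunk above, while the driver code at the bottom (the fake request strings and the on_idle callback) is purely illustrative and not part of the patch.

    from twisted.internet import defer

    class Slot(object):
        # Mirrors the Slot added to scrapy/core/engine.py in the hunk above:
        # close() returns a Deferred that fires once no requests are in progress.

        def __init__(self):
            self.closing = False
            self.inprogress = set()  # requests in progress

        def add_request(self, request):
            self.inprogress.add(request)

        def remove_request(self, request):
            self.inprogress.remove(request)
            self._maybe_fire_closing()

        def close(self):
            self.closing = defer.Deferred()
            self._maybe_fire_closing()
            return self.closing

        def _maybe_fire_closing(self):
            if self.closing and not self.inprogress:
                self.closing.callback(None)

    def on_idle(_):
        print("slot is idle, spider teardown can continue")

    # Two requests are in flight when close() is requested; the closing
    # Deferred only fires after the last one is removed.
    slot = Slot()
    slot.add_request("request-1")
    slot.add_request("request-2")

    dfd = slot.close()
    dfd.addCallback(on_idle)

    slot.remove_request("request-1")  # still busy, nothing fires yet
    slot.remove_request("request-2")  # now on_idle() runs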
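
Also worth noting is the error handling style in the rewritten close_spider(): every teardown step is chained with addBoth, so it runs whether the preceding step succeeded or failed, and each one is immediately followed by addErrback(log.err, spider=spider), which logs that step's failure and turns the chain back into a success so the error is reported right away instead of riding unhandled to the end of the chain. Here is a hedged, self-contained sketch of that chaining pattern, with made-up step names standing in for the scheduler/downloader/scraper calls.

    import sys
    from twisted.internet import defer
    from twisted.python import log

    log.startLogging(sys.stdout)

    def step(name, fail=False):
        # Hypothetical teardown step used only for this illustration.
        def _step(_):
            print("running %s" % name)
            if fail:
                raise RuntimeError("%s failed" % name)
        return _step

    dfd = defer.Deferred()

    # addBoth runs on success and failure alike; the addErrback after each
    # step logs that step's failure and converts the chain back to a success
    # result before the next step runs.
    dfd.addBoth(step("close scheduler", fail=True))
    dfd.addErrback(log.err)

    dfd.addBoth(step("close downloader"))
    dfd.addErrback(log.err)

    dfd.addBoth(step("close scraper"))
    dfd.addErrback(log.err)

    dfd.callback(None)  # kick off the chain, as the slot's closing Deferred would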