From 68379197986ae3deb81a545b5fd6920ea3347094 Mon Sep 17 00:00:00 2001 From: Eugenio Lacuesta <1731933+elacuesta@users.noreply.github.com> Date: Mon, 26 Apr 2021 14:55:02 -0300 Subject: [PATCH] Add peek method to queues (#5112) --- pylintrc | 1 + scrapy/pqueues.py | 59 +++++++--- scrapy/squeues.py | 59 +++++++--- tests/test_pqueues.py | 144 +++++++++++++++++++++++ tests/test_squeues_request.py | 214 ++++++++++++++++++++++++++++++++++ 5 files changed, 447 insertions(+), 30 deletions(-) create mode 100644 tests/test_pqueues.py create mode 100644 tests/test_squeues_request.py diff --git a/pylintrc b/pylintrc index 5b6b9fab0c7..972bf99ded8 100644 --- a/pylintrc +++ b/pylintrc @@ -24,6 +24,7 @@ disable=abstract-method, consider-using-in, consider-using-set-comprehension, consider-using-sys-exit, + consider-using-with, cyclic-import, dangerous-default-value, deprecated-method, diff --git a/scrapy/pqueues.py b/scrapy/pqueues.py index a9aa6c649da..b4b63e7c728 100644 --- a/scrapy/pqueues.py +++ b/scrapy/pqueues.py @@ -3,6 +3,7 @@ from scrapy.utils.misc import create_instance + logger = logging.getLogger(__name__) @@ -17,8 +18,7 @@ def _path_safe(text): >>> _path_safe('some@symbol?').startswith('some_symbol_') True """ - pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_' - for c in text]) + pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_' for c in text]) # as we replace some letters we can get collision for different slots # add we add unique part unique_slot = hashlib.md5(text.encode('utf8')).hexdigest() @@ -35,6 +35,9 @@ class ScrapyPriorityQueue: * close() * __len__() + Optionally, the queue could provide a ``peek`` method, that should return the + next object to be returned by ``pop``, but without removing it from the queue. + ``__init__`` method of ScrapyPriorityQueue receives a downstream_queue_cls argument, which is a class used to instantiate a new (internal) queue when a new priority is allocated. @@ -70,10 +73,12 @@ def init_prios(self, startprios): self.curprio = min(startprios) def qfactory(self, key): - return create_instance(self.downstream_queue_cls, - None, - self.crawler, - self.key + '/' + str(key)) + return create_instance( + self.downstream_queue_cls, + None, + self.crawler, + self.key + '/' + str(key), + ) def priority(self, request): return -request.priority @@ -99,6 +104,18 @@ def pop(self): self.curprio = min(prios) if prios else None return m + def peek(self): + """Returns the next object to be returned by :meth:`pop`, + but without removing it from the queue. + + Raises :exc:`NotImplementedError` if the underlying queue class does + not implement a ``peek`` method, which is optional for queues. + """ + if self.curprio is None: + return None + queue = self.queues[self.curprio] + return queue.peek() + def close(self): active = [] for p, q in self.queues.items(): @@ -116,8 +133,7 @@ def __init__(self, crawler): self.downloader = crawler.engine.downloader def stats(self, possible_slots): - return [(self._active_downloads(slot), slot) - for slot in possible_slots] + return [(self._active_downloads(slot), slot) for slot in possible_slots] def get_slot_key(self, request): return self.downloader._get_slot_key(request, None) @@ -162,10 +178,12 @@ def __init__(self, crawler, downstream_queue_cls, key, slot_startprios=()): self.pqueues[slot] = self.pqfactory(slot, startprios) def pqfactory(self, slot, startprios=()): - return ScrapyPriorityQueue(self.crawler, - self.downstream_queue_cls, - self.key + '/' + _path_safe(slot), - startprios) + return ScrapyPriorityQueue( + self.crawler, + self.downstream_queue_cls, + self.key + '/' + _path_safe(slot), + startprios, + ) def pop(self): stats = self._downloader_interface.stats(self.pqueues) @@ -187,9 +205,22 @@ def push(self, request): queue = self.pqueues[slot] queue.push(request) + def peek(self): + """Returns the next object to be returned by :meth:`pop`, + but without removing it from the queue. + + Raises :exc:`NotImplementedError` if the underlying queue class does + not implement a ``peek`` method, which is optional for queues. + """ + stats = self._downloader_interface.stats(self.pqueues) + if not stats: + return None + slot = min(stats)[1] + queue = self.pqueues[slot] + return queue.peek() + def close(self): - active = {slot: queue.close() - for slot, queue in self.pqueues.items()} + active = {slot: queue.close() for slot, queue in self.pqueues.items()} self.pqueues.clear() return active diff --git a/scrapy/squeues.py b/scrapy/squeues.py index 77ffda6f713..44898ba085d 100644 --- a/scrapy/squeues.py +++ b/scrapy/squeues.py @@ -19,7 +19,6 @@ def __init__(self, path, *args, **kwargs): dirname = os.path.dirname(path) if not os.path.exists(dirname): os.makedirs(dirname, exist_ok=True) - super().__init__(path, *args, **kwargs) return DirectoriesCreated @@ -38,6 +37,20 @@ def pop(self): if s: return deserialize(s) + def peek(self): + """Returns the next object to be returned by :meth:`pop`, + but without removing it from the queue. + + Raises :exc:`NotImplementedError` if the underlying queue class does + not implement a ``peek`` method, which is optional for queues. + """ + try: + s = super().peek() + except AttributeError as ex: + raise NotImplementedError("The underlying queue class does not implement 'peek'") from ex + if s: + return deserialize(s) + return SerializableQueue @@ -59,12 +72,21 @@ def push(self, request): def pop(self): request = super().pop() - if not request: return None + return request_from_dict(request, self.spider) + + def peek(self): + """Returns the next object to be returned by :meth:`pop`, + but without removing it from the queue. - request = request_from_dict(request, self.spider) - return request + Raises :exc:`NotImplementedError` if the underlying queue class does + not implement a ``peek`` method, which is optional for queues. + """ + request = super().peek() + if not request: + return None + return request_from_dict(request, self.spider) return ScrapyRequestQueue @@ -76,6 +98,19 @@ class ScrapyRequestQueue(queue_class): def from_crawler(cls, crawler, *args, **kwargs): return cls() + def peek(self): + """Returns the next object to be returned by :meth:`pop`, + but without removing it from the queue. + + Raises :exc:`NotImplementedError` if the underlying queue class does + not implement a ``peek`` method, which is optional for queues. + """ + try: + s = super().peek() + except AttributeError as ex: + raise NotImplementedError("The underlying queue class does not implement 'peek'") from ex + return s + return ScrapyRequestQueue @@ -109,17 +144,9 @@ def _pickle_serialize(obj): marshal.loads ) -PickleFifoDiskQueue = _scrapy_serialization_queue( - PickleFifoDiskQueueNonRequest -) -PickleLifoDiskQueue = _scrapy_serialization_queue( - PickleLifoDiskQueueNonRequest -) -MarshalFifoDiskQueue = _scrapy_serialization_queue( - MarshalFifoDiskQueueNonRequest -) -MarshalLifoDiskQueue = _scrapy_serialization_queue( - MarshalLifoDiskQueueNonRequest -) +PickleFifoDiskQueue = _scrapy_serialization_queue(PickleFifoDiskQueueNonRequest) +PickleLifoDiskQueue = _scrapy_serialization_queue(PickleLifoDiskQueueNonRequest) +MarshalFifoDiskQueue = _scrapy_serialization_queue(MarshalFifoDiskQueueNonRequest) +MarshalLifoDiskQueue = _scrapy_serialization_queue(MarshalLifoDiskQueueNonRequest) FifoMemoryQueue = _scrapy_non_serialization_queue(queue.FifoMemoryQueue) LifoMemoryQueue = _scrapy_non_serialization_queue(queue.LifoMemoryQueue) diff --git a/tests/test_pqueues.py b/tests/test_pqueues.py new file mode 100644 index 00000000000..ec55033d15b --- /dev/null +++ b/tests/test_pqueues.py @@ -0,0 +1,144 @@ +import tempfile +import unittest + +import queuelib + +from scrapy.http.request import Request +from scrapy.pqueues import ScrapyPriorityQueue, DownloaderAwarePriorityQueue +from scrapy.spiders import Spider +from scrapy.squeues import FifoMemoryQueue +from scrapy.utils.test import get_crawler + +from tests.test_scheduler import MockDownloader, MockEngine + + +class PriorityQueueTest(unittest.TestCase): + def setUp(self): + self.crawler = get_crawler(Spider) + self.spider = self.crawler._create_spider("foo") + + def test_queue_push_pop_one(self): + temp_dir = tempfile.mkdtemp() + queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir) + self.assertIsNone(queue.pop()) + self.assertEqual(len(queue), 0) + req1 = Request("https://example.org/1", priority=1) + queue.push(req1) + self.assertEqual(len(queue), 1) + dequeued = queue.pop() + self.assertEqual(len(queue), 0) + self.assertEqual(dequeued.url, req1.url) + self.assertEqual(dequeued.priority, req1.priority) + self.assertEqual(queue.close(), []) + + def test_no_peek_raises(self): + if hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is defined") + temp_dir = tempfile.mkdtemp() + queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir) + queue.push(Request("https://example.org")) + with self.assertRaises(NotImplementedError, msg="The underlying queue class does not implement 'peek'"): + queue.peek() + queue.close() + + def test_peek(self): + if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is undefined") + temp_dir = tempfile.mkdtemp() + queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir) + self.assertEqual(len(queue), 0) + self.assertIsNone(queue.peek()) + req1 = Request("https://example.org/1") + req2 = Request("https://example.org/2") + req3 = Request("https://example.org/3") + queue.push(req1) + queue.push(req2) + queue.push(req3) + self.assertEqual(len(queue), 3) + self.assertEqual(queue.peek().url, req1.url) + self.assertEqual(queue.pop().url, req1.url) + self.assertEqual(len(queue), 2) + self.assertEqual(queue.peek().url, req2.url) + self.assertEqual(queue.pop().url, req2.url) + self.assertEqual(len(queue), 1) + self.assertEqual(queue.peek().url, req3.url) + self.assertEqual(queue.pop().url, req3.url) + self.assertEqual(queue.close(), []) + + def test_queue_push_pop_priorities(self): + temp_dir = tempfile.mkdtemp() + queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir, [-1, -2, -3]) + self.assertIsNone(queue.pop()) + self.assertEqual(len(queue), 0) + req1 = Request("https://example.org/1", priority=1) + req2 = Request("https://example.org/2", priority=2) + req3 = Request("https://example.org/3", priority=3) + queue.push(req1) + queue.push(req2) + queue.push(req3) + self.assertEqual(len(queue), 3) + dequeued = queue.pop() + self.assertEqual(len(queue), 2) + self.assertEqual(dequeued.url, req3.url) + self.assertEqual(dequeued.priority, req3.priority) + self.assertEqual(queue.close(), [-1, -2]) + + +class DownloaderAwarePriorityQueueTest(unittest.TestCase): + def setUp(self): + crawler = get_crawler(Spider) + crawler.engine = MockEngine(downloader=MockDownloader()) + self.queue = DownloaderAwarePriorityQueue.from_crawler( + crawler=crawler, + downstream_queue_cls=FifoMemoryQueue, + key="foo/bar", + ) + + def tearDown(self): + self.queue.close() + + def test_push_pop(self): + self.assertEqual(len(self.queue), 0) + self.assertIsNone(self.queue.pop()) + req1 = Request("http://www.example.com/1") + req2 = Request("http://www.example.com/2") + req3 = Request("http://www.example.com/3") + self.queue.push(req1) + self.queue.push(req2) + self.queue.push(req3) + self.assertEqual(len(self.queue), 3) + self.assertEqual(self.queue.pop().url, req1.url) + self.assertEqual(len(self.queue), 2) + self.assertEqual(self.queue.pop().url, req2.url) + self.assertEqual(len(self.queue), 1) + self.assertEqual(self.queue.pop().url, req3.url) + self.assertEqual(len(self.queue), 0) + self.assertIsNone(self.queue.pop()) + + def test_no_peek_raises(self): + if hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is defined") + self.queue.push(Request("https://example.org")) + with self.assertRaises(NotImplementedError, msg="The underlying queue class does not implement 'peek'"): + self.queue.peek() + + def test_peek(self): + if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is undefined") + self.assertEqual(len(self.queue), 0) + req1 = Request("https://example.org/1") + req2 = Request("https://example.org/2") + req3 = Request("https://example.org/3") + self.queue.push(req1) + self.queue.push(req2) + self.queue.push(req3) + self.assertEqual(len(self.queue), 3) + self.assertEqual(self.queue.peek().url, req1.url) + self.assertEqual(self.queue.pop().url, req1.url) + self.assertEqual(len(self.queue), 2) + self.assertEqual(self.queue.peek().url, req2.url) + self.assertEqual(self.queue.pop().url, req2.url) + self.assertEqual(len(self.queue), 1) + self.assertEqual(self.queue.peek().url, req3.url) + self.assertEqual(self.queue.pop().url, req3.url) + self.assertIsNone(self.queue.peek()) diff --git a/tests/test_squeues_request.py b/tests/test_squeues_request.py new file mode 100644 index 00000000000..c5fcc1853db --- /dev/null +++ b/tests/test_squeues_request.py @@ -0,0 +1,214 @@ +import shutil +import tempfile +import unittest + +import queuelib + +from scrapy.squeues import ( + PickleFifoDiskQueue, + PickleLifoDiskQueue, + MarshalFifoDiskQueue, + MarshalLifoDiskQueue, + FifoMemoryQueue, + LifoMemoryQueue, +) +from scrapy.http import Request +from scrapy.spiders import Spider +from scrapy.utils.test import get_crawler + + +""" +Queues that handle requests +""" + + +class BaseQueueTestCase(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix="scrapy-queue-tests-") + self.qpath = self.tempfilename() + self.qdir = self.mkdtemp() + self.crawler = get_crawler(Spider) + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def tempfilename(self): + with tempfile.NamedTemporaryFile(dir=self.tmpdir) as nf: + return nf.name + + def mkdtemp(self): + return tempfile.mkdtemp(dir=self.tmpdir) + + +class RequestQueueTestMixin: + def queue(self): + raise NotImplementedError() + + def test_one_element_with_peek(self): + if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("The queuelib queues do not define peek") + q = self.queue() + self.assertEqual(len(q), 0) + self.assertIsNone(q.peek()) + self.assertIsNone(q.pop()) + req = Request("http://www.example.com") + q.push(req) + self.assertEqual(len(q), 1) + self.assertEqual(q.peek().url, req.url) + self.assertEqual(q.pop().url, req.url) + self.assertEqual(len(q), 0) + self.assertIsNone(q.peek()) + self.assertIsNone(q.pop()) + q.close() + + def test_one_element_without_peek(self): + if hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("The queuelib queues define peek") + q = self.queue() + self.assertEqual(len(q), 0) + self.assertIsNone(q.pop()) + req = Request("http://www.example.com") + q.push(req) + self.assertEqual(len(q), 1) + with self.assertRaises(NotImplementedError, msg="The underlying queue class does not implement 'peek'"): + q.peek() + self.assertEqual(q.pop().url, req.url) + self.assertEqual(len(q), 0) + self.assertIsNone(q.pop()) + q.close() + + +class FifoQueueMixin(RequestQueueTestMixin): + def test_fifo_with_peek(self): + if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("The queuelib queues do not define peek") + q = self.queue() + self.assertEqual(len(q), 0) + self.assertIsNone(q.peek()) + self.assertIsNone(q.pop()) + req1 = Request("http://www.example.com/1") + req2 = Request("http://www.example.com/2") + req3 = Request("http://www.example.com/3") + q.push(req1) + q.push(req2) + q.push(req3) + self.assertEqual(len(q), 3) + self.assertEqual(q.peek().url, req1.url) + self.assertEqual(q.pop().url, req1.url) + self.assertEqual(len(q), 2) + self.assertEqual(q.peek().url, req2.url) + self.assertEqual(q.pop().url, req2.url) + self.assertEqual(len(q), 1) + self.assertEqual(q.peek().url, req3.url) + self.assertEqual(q.pop().url, req3.url) + self.assertEqual(len(q), 0) + self.assertIsNone(q.peek()) + self.assertIsNone(q.pop()) + q.close() + + def test_fifo_without_peek(self): + if hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("The queuelib queues do not define peek") + q = self.queue() + self.assertEqual(len(q), 0) + self.assertIsNone(q.pop()) + req1 = Request("http://www.example.com/1") + req2 = Request("http://www.example.com/2") + req3 = Request("http://www.example.com/3") + q.push(req1) + q.push(req2) + q.push(req3) + with self.assertRaises(NotImplementedError, msg="The underlying queue class does not implement 'peek'"): + q.peek() + self.assertEqual(len(q), 3) + self.assertEqual(q.pop().url, req1.url) + self.assertEqual(len(q), 2) + self.assertEqual(q.pop().url, req2.url) + self.assertEqual(len(q), 1) + self.assertEqual(q.pop().url, req3.url) + self.assertEqual(len(q), 0) + self.assertIsNone(q.pop()) + q.close() + + +class LifoQueueMixin(RequestQueueTestMixin): + def test_lifo_with_peek(self): + if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("The queuelib queues do not define peek") + q = self.queue() + self.assertEqual(len(q), 0) + self.assertIsNone(q.peek()) + self.assertIsNone(q.pop()) + req1 = Request("http://www.example.com/1") + req2 = Request("http://www.example.com/2") + req3 = Request("http://www.example.com/3") + q.push(req1) + q.push(req2) + q.push(req3) + self.assertEqual(len(q), 3) + self.assertEqual(q.peek().url, req3.url) + self.assertEqual(q.pop().url, req3.url) + self.assertEqual(len(q), 2) + self.assertEqual(q.peek().url, req2.url) + self.assertEqual(q.pop().url, req2.url) + self.assertEqual(len(q), 1) + self.assertEqual(q.peek().url, req1.url) + self.assertEqual(q.pop().url, req1.url) + self.assertEqual(len(q), 0) + self.assertIsNone(q.peek()) + self.assertIsNone(q.pop()) + q.close() + + def test_lifo_without_peek(self): + if hasattr(queuelib.queue.FifoMemoryQueue, "peek"): + raise unittest.SkipTest("The queuelib queues do not define peek") + q = self.queue() + self.assertEqual(len(q), 0) + self.assertIsNone(q.pop()) + req1 = Request("http://www.example.com/1") + req2 = Request("http://www.example.com/2") + req3 = Request("http://www.example.com/3") + q.push(req1) + q.push(req2) + q.push(req3) + with self.assertRaises(NotImplementedError, msg="The underlying queue class does not implement 'peek'"): + q.peek() + self.assertEqual(len(q), 3) + self.assertEqual(q.pop().url, req3.url) + self.assertEqual(len(q), 2) + self.assertEqual(q.pop().url, req2.url) + self.assertEqual(len(q), 1) + self.assertEqual(q.pop().url, req1.url) + self.assertEqual(len(q), 0) + self.assertIsNone(q.pop()) + q.close() + + +class PickleFifoDiskQueueRequestTest(FifoQueueMixin, BaseQueueTestCase): + def queue(self): + return PickleFifoDiskQueue.from_crawler(crawler=self.crawler, key="pickle/fifo") + + +class PickleLifoDiskQueueRequestTest(LifoQueueMixin, BaseQueueTestCase): + def queue(self): + return PickleLifoDiskQueue.from_crawler(crawler=self.crawler, key="pickle/lifo") + + +class MarshalFifoDiskQueueRequestTest(FifoQueueMixin, BaseQueueTestCase): + def queue(self): + return MarshalFifoDiskQueue.from_crawler(crawler=self.crawler, key="marshal/fifo") + + +class MarshalLifoDiskQueueRequestTest(LifoQueueMixin, BaseQueueTestCase): + def queue(self): + return MarshalLifoDiskQueue.from_crawler(crawler=self.crawler, key="marshal/lifo") + + +class FifoMemoryQueueRequestTest(FifoQueueMixin, BaseQueueTestCase): + def queue(self): + return FifoMemoryQueue.from_crawler(crawler=self.crawler) + + +class LifoMemoryQueueRequestTest(LifoQueueMixin, BaseQueueTestCase): + def queue(self): + return LifoMemoryQueue.from_crawler(crawler=self.crawler)