Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

downloader-aware scheduler cleanups #2

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
180 changes: 81 additions & 99 deletions scrapy/pqueues.py
@@ -1,6 +1,6 @@
import hashlib
import logging
from six import text_type
from collections import namedtuple
from six.moves.urllib.parse import urlparse

from queuelib import PriorityQueue
Expand All @@ -17,12 +17,12 @@
SCHEDULER_SLOT_META_KEY = Downloader.DOWNLOAD_SLOT


def _get_from_request(request, key, default=None):
def _get_request_meta(request):
if isinstance(request, dict):
return request.get(key, default)
return request.setdefault('meta', {})

if isinstance(request, Request):
return getattr(request, key, default)
return request.meta

raise ValueError('Bad type of request "%s"' % (request.__class__, ))

Expand All @@ -35,15 +35,8 @@ def _scheduler_slot_write(request, slot):
request.meta[SCHEDULER_SLOT_META_KEY] = slot


def _scheduler_slot(request):

if isinstance(request, dict):
meta = request.get('meta', dict())
elif isinstance(request, Request):
meta = request.meta
else:
raise ValueError('Bad type of request "%s"' % (request.__class__, ))

def _set_scheduler_slot(request):
meta = _get_request_meta(request)
slot = meta.get(SCHEDULER_SLOT_META_KEY, None)

if slot is not None:
Expand All @@ -53,43 +46,29 @@ def _scheduler_slot(request):
url = request.get('url', None)
slot = urlparse(url).hostname or ''
elif isinstance(request, Request):
url = request.url
slot = urlparse_cached(request).hostname or ''

meta[SCHEDULER_SLOT_META_KEY] = slot

return slot


def _pathable(x):
pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_' for c in x])

def _path_safe(text):
""" Return a filesystem-safe version of a string ``text`` """
pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_'
for c in text])
# as we replace some letters we can get collision for different slots
# add we add unique part
unique_slot = hashlib.md5(x.encode('utf8')).hexdigest()

unique_slot = hashlib.md5(text.encode('utf8')).hexdigest()
return '-'.join([pathable_slot, unique_slot])


class PrioritySlot:
__slots__ = ('priority', 'slot')

def __init__(self, priority=0, slot=None):
self.priority = priority
self.slot = slot

def __hash__(self):
return hash((self.priority, self.slot))

def __eq__(self, other):
return (self.priority, self.slot) == (other.priority, other.slot)

def __lt__(self, other):
return (self.priority, self.slot) < (other.priority, other.slot)
class PrioritySlot(namedtuple("PrioritySlot", ["priority", "slot"])):
""" ``(priority, slot)`` tuple which uses a path-safe slot name
when converting to str """
kmike marked this conversation as resolved.
Show resolved Hide resolved
__slots__ = ()

def __str__(self):
return '_'.join([text_type(self.priority),
_pathable(text_type(self.slot))])
return '%s_%s' % (self.priority, _path_safe(str(self.slot)))


class PriorityAsTupleQueue(PriorityQueue):
Expand All @@ -99,78 +78,65 @@ class PriorityAsTupleQueue(PriorityQueue):
json serializable structures
"""
def __init__(self, qfactory, startprios=()):

startprios = [PrioritySlot(priority=p[0], slot=p[1])
for p in startprios]
super(PriorityAsTupleQueue, self).__init__(
qfactory,
[PrioritySlot(priority=p[0], slot=p[1]) for p in startprios]
)

def close(self):
startprios = super(PriorityAsTupleQueue, self).close()
return [(s.priority, s.slot) for s in startprios]

def is_empty(self):
return not self.queues or len(self) == 0
qfactory=qfactory,
startprios=startprios)


class SlotBasedPriorityQueue(object):
class SlotPriorityQueues(object):
""" Container for multiple priority queues. """
def __init__(self, pqfactory, slot_startprios=None):
"""
``pqfactory`` is a factory for creating new PriorityQueues.
It must be a function which accepts a single optional ``startprios``
argument, with a list of priorities to create queues for.
def __init__(self, qfactory, startprios={}):
self.pqueues = dict() # slot -> priority queue
self.qfactory = qfactory # factory for creating new internal queues

if not startprios:
return

if not isinstance(startprios, dict):
raise ValueError("Looks like your priorities file malforfemed. "
"Possible reason: You run scrapy with previous "
"version. Interrupted it. Updated scrapy. And "
"run again.")

for slot, prios in startprios.items():
self.pqueues[slot] = PriorityAsTupleQueue(self.qfactory, prios)
``slot_startprios`` is a ``{slot: startprios}`` dict.
"""
self.pqfactory = pqfactory
self.pqueues = {} # slot -> priority queue
for slot, startprios in (slot_startprios or {}).items():
self.pqueues[slot] = self.pqfactory(startprios)

def pop_slot(self, slot):
""" Pop an object from a priority queue for this slot """
queue = self.pqueues[slot]
request = queue.pop()
is_empty = queue.is_empty()
if is_empty:
if len(queue) == 0:
del self.pqueues[slot]
return request

return request, is_empty

def push_slot(self, request, priority):
slot = _scheduler_slot(request)
is_new = False
def push_slot(self, slot, obj, priority):
""" Push an object to a priority queue for this slot """
if slot not in self.pqueues:
self.pqueues[slot] = PriorityAsTupleQueue(self.qfactory)
self.pqueues[slot] = self.pqfactory()
queue = self.pqueues[slot]
is_new = queue.is_empty()
queue.push(request, PrioritySlot(priority=priority, slot=slot))
return slot, is_new
queue.push(obj, priority)

def close(self):
startprios = dict()
for slot, queue in self.pqueues.items():
prios = queue.close()
startprios[slot] = prios
active = {slot: queue.close()
for slot, queue in self.pqueues.items()}
self.pqueues.clear()
return startprios
return active

def __len__(self):
return sum(len(x) for x in self.pqueues.values()) if self.pqueues else 0

def __contains__(self, slot):
return slot in self.pqueues


class DownloaderAwarePriorityQueue(SlotBasedPriorityQueue):
class DownloaderAwarePriorityQueue(object):

_DOWNLOADER_AWARE_PQ_ID = 'DOWNLOADER_AWARE_PQ_ID'

@classmethod
def from_crawler(cls, crawler, qfactory, startprios={}):
def from_crawler(cls, crawler, qfactory, startprios=None):
whalebot-helmsman marked this conversation as resolved.
Show resolved Hide resolved
return cls(crawler, qfactory, startprios)

def __init__(self, crawler, qfactory, startprios={}):
def __init__(self, crawler, qfactory, startprios=None):
ip_concurrency_key = 'CONCURRENT_REQUESTS_PER_IP'
ip_concurrency = crawler.settings.getint(ip_concurrency_key, 0)

Expand All @@ -179,16 +145,25 @@ def __init__(self, crawler, qfactory, startprios={}):
ip_concurrency_key,
ip_concurrency))

super(DownloaderAwarePriorityQueue, self).__init__(qfactory,
startprios)
self._slots = {slot: 0 for slot in self.pqueues}
def pqfactory(startprios=()):
return PriorityAsTupleQueue(qfactory, startprios)

if startprios and not isinstance(startprios, dict):
raise ValueError("DownloaderAwarePriorityQueue accepts "
"``startprios`` as a dict; %r instance is passed."
" Only a crawl started with the same priority "
"queue class can be resumed." % startprios.__class__)
self._slot_pqueues = SlotPriorityQueues(pqfactory,
slot_startprios=startprios)

self._active_downloads = {slot: 0 for slot in self._slot_pqueues.pqueues}
crawler.signals.connect(self.on_response_download,
signal=response_downloaded)
crawler.signals.connect(self.on_request_reached_downloader,
signal=request_reached_downloader)

def mark(self, request):
meta = _get_from_request(request, 'meta', None)
meta = _get_request_meta(request)
if not isinstance(meta, dict):
raise ValueError('No meta attribute in %s' % (request, ))
meta[self._DOWNLOADER_AWARE_PQ_ID] = id(self)
Expand All @@ -197,39 +172,46 @@ def check_mark(self, request):
return request.meta.get(self._DOWNLOADER_AWARE_PQ_ID, None) == id(self)

def pop(self):
slots = [(d, s) for s, d in self._slots.items() if s in self.pqueues]
slots = [(active_downloads, slot)
for slot, active_downloads in self._active_downloads.items()
if slot in self._slot_pqueues]

if not slots:
return

slot = min(slots)[1]
request, _ = self.pop_slot(slot)
request = self._slot_pqueues.pop_slot(slot)
self.mark(request)
return request

def push(self, request, priority):
slot, _ = self.push_slot(request, priority)
if slot not in self._slots:
self._slots[slot] = 0
slot = _set_scheduler_slot(request)
priority_slot = PrioritySlot(priority=priority, slot=slot)
self._slot_pqueues.push_slot(slot, request, priority_slot)
if slot not in self._active_downloads:
self._active_downloads[slot] = 0

def on_response_download(self, response, request, spider):
if not self.check_mark(request):
return

slot = _scheduler_slot_read(request)
if slot not in self._slots or self._slots[slot] <= 0:
if slot not in self._active_downloads or self._active_downloads[slot] <= 0:
raise ValueError('Get response for wrong slot "%s"' % (slot, ))
self._slots[slot] = self._slots[slot] - 1
if self._slots[slot] == 0 and slot not in self.pqueues:
del self._slots[slot]
self._active_downloads[slot] = self._active_downloads[slot] - 1
if self._active_downloads[slot] == 0 and slot not in self._slot_pqueues:
del self._active_downloads[slot]

def on_request_reached_downloader(self, request, spider):
if not self.check_mark(request):
return

slot = _scheduler_slot_read(request)
self._slots[slot] = self._slots.get(slot, 0) + 1
self._active_downloads[slot] = self._active_downloads.get(slot, 0) + 1

def close(self):
self._slots.clear()
return super(DownloaderAwarePriorityQueue, self).close()
self._active_downloads.clear()
return self._slot_pqueues.close()

def __len__(self):
return len(self._slot_pqueues)
2 changes: 1 addition & 1 deletion tests/test_scheduler.py
Expand Up @@ -260,7 +260,7 @@ def test_logic(self):
)
requests.append(request)

self.assertEqual(self.scheduler.mqs._slots, {})
self.assertEqual(self.scheduler.mqs._active_downloads, {})
self.assertEqual(len(slots), len(_SLOTS))

for request in requests:
Expand Down