Merge pull request #178 from scrapinghub/link-sw-assignment-fix
extracted links to SW instances assignment fix
sibiryakov committed Aug 17, 2016
2 parents e7d824b + 3f22470 commit 0be3855
Showing 30 changed files with 282 additions and 106 deletions.
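The change is the same across backends, middlewares, codecs, and canonical solvers: the single page_crawled(response, links) callback is split into page_crawled(response), which now only records the fetch, and a new links_extracted(request, links) callback that carries the extracted links together with the request that produced them, so links can be keyed and assigned to strategy worker (SW) instances on their own. A minimal sketch of the interface change (DummyBackend and its bodies are illustrative, not part of the commit):

    # Before: one callback received both the response and its links.
    # class Backend:
    #     def page_crawled(self, response, links): ...

    # After: fetch results and extracted links are reported separately.
    class DummyBackend(object):
        def page_crawled(self, response):
            # called once per fetched page; no links any more
            print('crawled', response.url)

        def links_extracted(self, request, links):
            # called with the originating request and the links found in its body
            print('extracted %d links from %s' % (len(links), request.url))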
10 changes: 6 additions & 4 deletions frontera/contrib/backends/__init__.py
@@ -60,18 +60,20 @@ def get_next_requests(self, max_next_requests, **kwargs):
         self.queue_size -= len(batch)
         return batch
 
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
         response.meta[b'state'] = States.CRAWLED
         self.states.update_cache(response)
-        depth = response.meta.get(b'depth', 0)+1
+        self.metadata.page_crawled(response)
+
+    def links_extracted(self, request, links):
         to_fetch = OrderedDict()
         for link in links:
             to_fetch[link.meta[b'fingerprint']] = link
-            link.meta[b'depth'] = depth
+            link.meta[b'depth'] = request.meta.get(b'depth', 0)+1
         self.states.fetch(to_fetch.keys())
         self.states.set_states(links)
         unique_links = to_fetch.values()
-        self.metadata.page_crawled(response, unique_links)
+        self.metadata.links_extracted(request, unique_links)
         self._schedule(unique_links)
         self.states.update_cache(unique_links)
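In the new links_extracted above, the OrderedDict keyed by fingerprint dedups the extracted links before states and metadata are touched. A standalone sketch of that idiom (unique_by_fingerprint is an illustrative name, not frontera API):

    from collections import OrderedDict

    def unique_by_fingerprint(links):
        # A later duplicate overwrites the stored value, but the key keeps
        # its first insertion position, so order of first appearance wins.
        to_fetch = OrderedDict()
        for link in links:
            to_fetch[link.meta[b'fingerprint']] = link
        return list(to_fetch.values())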
13 changes: 9 additions & 4 deletions frontera/contrib/backends/hbase.py
@@ -358,13 +358,15 @@ def add_seeds(self, seeds):
                                        domain_fingerprint=seed.meta[b'domain'][b'fingerprint'])
             self.batch.put(unhexlify(seed.meta[b'fingerprint']), obj)
 
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
         obj = prepare_hbase_object(status_code=response.status_code, content=response.body) if self.store_content else \
             prepare_hbase_object(status_code=response.status_code)
+        self.batch.put(unhexlify(response.meta[b'fingerprint']), obj)
+
+    def links_extracted(self, request, links):
         links_dict = dict()
         for link in links:
             links_dict[unhexlify(link.meta[b'fingerprint'])] = (link, link.url, link.meta[b'domain'])
-        self.batch.put(unhexlify(response.meta[b'fingerprint']), obj)
         for link_fingerprint, (link, link_url, link_domain) in six.iteritems(links_dict):
             obj = prepare_hbase_object(url=link_url,
                                        created_at=utcnow_timestamp(),
@@ -466,8 +468,11 @@ def frontier_stop(self):
     def add_seeds(self, seeds):
         self.metadata.add_seeds(seeds)
 
-    def page_crawled(self, response, links):
-        self.metadata.page_crawled(response, links)
+    def page_crawled(self, response):
+        self.metadata.page_crawled(response)
+
+    def links_extracted(self, request, links):
+        self.metadata.links_extracted(request, links)
 
     def request_error(self, page, error):
         self.metadata.request_error(page, error)
8 changes: 5 additions & 3 deletions frontera/contrib/backends/memory/__init__.py
@@ -26,8 +26,10 @@ def request_error(self, request, error):
         request.meta[b'error'] = error
         self._get_or_create_request(request)
 
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
         self._get_or_create_request(response.request)
+
+    def links_extracted(self, request, links):
         for link in links:
             self._get_or_create_request(link)
 
@@ -191,11 +193,11 @@ def add_seeds(self, seeds):
             self._id += 1
         super(MemoryBaseBackend, self).add_seeds(seeds)
 
-    def page_crawled(self, response, links):
+    def links_extracted(self, request, links):
         for link in links:
             link.meta[b'id'] = self._id
             self._id += 1
-        super(MemoryBaseBackend, self).page_crawled(response, links)
+        super(MemoryBaseBackend, self).links_extracted(request, links)
 
     def finished(self):
         return self.queue.count() == 0
17 changes: 13 additions & 4 deletions frontera/contrib/backends/remote/codecs/json.py
@@ -51,10 +51,16 @@ def encode_add_seeds(self, seeds):
             'seeds': [_prepare_request_message(seed) for seed in seeds]
         })
 
-    def encode_page_crawled(self, response, links):
+    def encode_page_crawled(self, response):
         return self.encode({
             'type': 'page_crawled',
-            'r': _prepare_response_message(response, self.send_body),
+            'r': _prepare_response_message(response, self.send_body)
+        })
+
+    def encode_links_extracted(self, request, links):
+        return self.encode({
+            'type': 'links_extracted',
+            'r': _prepare_request_message(request),
             'links': _prepare_links_message(links)
         })
 
@@ -112,10 +118,13 @@ def _request_from_object(self, obj):
 
     def decode(self, message):
         message = dict_to_bytes(super(Decoder, self).decode(message))
+        if message[b'type'] == b'links_extracted':
+            request = self._request_from_object(message[b'r'])
+            links = [self._request_from_object(link) for link in message[b'links']]
+            return ('links_extracted', request, links)
         if message[b'type'] == b'page_crawled':
             response = self._response_from_object(message[b'r'])
-            links = [self._request_from_object(link) for link in message[b'links']]
-            return ('page_crawled', response, links)
+            return ('page_crawled', response)
         if message[b'type'] == b'request_error':
             request = self._request_from_object(message[b'r'])
             return ('request_error', request, to_native_str(message[b'error']))
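Decoder.decode() now returns a ('links_extracted', request, links) tuple alongside the slimmed-down ('page_crawled', response). A hedged sketch of how a consumer might dispatch on the new tuple shapes (the print handlers stand in for real worker logic):

    def dispatch(decoder, raw_message):
        decoded = decoder.decode(raw_message)
        event = decoded[0]
        if event == 'page_crawled':
            _, response = decoded
            print('crawled', response.url)
        elif event == 'links_extracted':
            _, request, links = decoded
            print('extracted %d links from %s' % (len(links), request.url))
        elif event == 'request_error':
            _, request, error = decoded
            print('error', request.url, error)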
12 changes: 9 additions & 3 deletions frontera/contrib/backends/remote/codecs/msgpack.py
@@ -44,8 +44,11 @@ def __init__(self, request_model, *a, **kw):
     def encode_add_seeds(self, seeds):
         return packb([b'as', [_prepare_request_message(seed) for seed in seeds]])
 
-    def encode_page_crawled(self, response, links):
-        return packb([b'pc', _prepare_response_message(response, self.send_body), [_prepare_request_message(link) for link in links]])
+    def encode_page_crawled(self, response):
+        return packb([b'pc', _prepare_response_message(response, self.send_body)])
+
+    def encode_links_extracted(self, request, links):
+        return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]])
 
     def encode_request_error(self, request, error):
         return packb([b're', _prepare_request_message(request), str(error)])
@@ -87,7 +90,10 @@ def decode(self, buffer):
         obj = unpackb(buffer)
         if obj[0] == b'pc':
             return ('page_crawled',
-                    self._response_from_object(obj[1]),
+                    self._response_from_object(obj[1]))
+        if obj[0] == b'le':
+            return ('links_extracted',
+                    self._request_from_object(obj[1]),
                     [self._request_from_object(x) for x in obj[2]])
         if obj[0] == b'us':
             return ('update_score', self._request_from_object(obj[1]), obj[2], obj[3])
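A rough round-trip sketch for the new b'le' message type, assuming the codec classes are constructed with frontera's model classes and that send_body is an Encoder keyword argument (both assumptions inferred from the signatures visible above, not verified against the full file):

    from frontera.contrib.backends.remote.codecs.msgpack import Encoder, Decoder
    from frontera.core.models import Request, Response

    encoder = Encoder(Request, send_body=False)   # assumption: send_body kwarg
    decoder = Decoder(Request, Response)          # assumption: constructor args

    request = Request('http://example.com/')
    links = [Request('http://example.com/a'), Request('http://example.com/b')]

    raw = encoder.encode_links_extracted(request, links)
    event, req, decoded_links = decoder.decode(raw)
    assert event == 'links_extracted' and len(decoded_links) == 2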
38 changes: 34 additions & 4 deletions frontera/contrib/backends/remote/messagebus.py
@@ -4,6 +4,7 @@
 from frontera.core import OverusedBuffer
 from frontera.utils.misc import load_object
 import logging
+import six
 
 
 class MessageBusBackend(Backend):
@@ -40,13 +41,24 @@ def frontier_stop(self):
         self.spider_log_producer.flush()
 
     def add_seeds(self, seeds):
-        self.spider_log_producer.send(seeds[0].meta[b'fingerprint'], self._encoder.encode_add_seeds(seeds))
+        per_host = aggregate_per_host(seeds)
+        for host_fprint, host_links in six.iteritems(per_host):
+            self.spider_log_producer.send(host_fprint,
+                                          self._encoder.encode_add_seeds(host_links))
 
-    def page_crawled(self, response, links):
-        self.spider_log_producer.send(response.meta[b'fingerprint'], self._encoder.encode_page_crawled(response, links))
+    def page_crawled(self, response):
+        host_fprint = get_host_fprint(response)
+        self.spider_log_producer.send(host_fprint, self._encoder.encode_page_crawled(response))
+
+    def links_extracted(self, request, links):
+        per_host = aggregate_per_host(links)
+        for host_fprint, host_links in six.iteritems(per_host):
+            self.spider_log_producer.send(host_fprint,
+                                          self._encoder.encode_links_extracted(request, host_links))
 
     def request_error(self, page, error):
-        self.spider_log_producer.send(page.meta[b'fingerprint'], self._encoder.encode_request_error(page, error))
+        host_fprint = get_host_fprint(page)
+        self.spider_log_producer.send(host_fprint, self._encoder.encode_request_error(page, error))
 
     def _get_next_requests(self, max_n_requests, **kwargs):
         requests = []
@@ -78,3 +90,21 @@ def queue(self):
     @property
     def states(self):
         return None
+
+
+def aggregate_per_host(requests):
+    per_host = dict()
+    for link in requests:
+        if b'fingerprint' not in link.meta[b'domain']:
+            continue
+        host_fprint = link.meta[b'domain'][b'fingerprint']
+        if host_fprint not in per_host:
+            per_host[host_fprint] = []
+        per_host[host_fprint].append(link)
+    return per_host
+
+
+def get_host_fprint(request):
+    if b'fingerprint' not in request.meta[b'domain']:
+        return None
+    return request.meta[b'domain'][b'fingerprint']
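aggregate_per_host is the heart of the assignment fix: it buckets requests by the fingerprint of their host, so each spider log message is keyed by host and lands on the strategy worker instance owning that host; requests whose domain lacks a fingerprint are silently skipped. A small illustration with stub objects (StubRequest is a stand-in for a frontera Request, for this example only):

    class StubRequest(object):
        def __init__(self, url, host_fprint):
            self.url = url
            self.meta = {b'domain': {b'fingerprint': host_fprint}}

    links = [StubRequest('http://a.com/1', b'fp-a'),
             StubRequest('http://b.com/1', b'fp-b'),
             StubRequest('http://a.com/2', b'fp-a')]

    per_host = aggregate_per_host(links)
    # {b'fp-a': [links[0], links[2]], b'fp-b': [links[1]]}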
7 changes: 5 additions & 2 deletions frontera/contrib/backends/sqlalchemy/__init__.py
@@ -204,8 +204,11 @@ def get_next_requests(self, max_next_requests, **kwargs):
             batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
         return batch
 
-    def page_crawled(self, response, links):
-        self.metadata.page_crawled(response, links)
+    def page_crawled(self, response):
+        self.metadata.page_crawled(response)
+
+    def links_extracted(self, request, links):
+        self.metadata.links_extracted(request, links)
 
     def request_error(self, request, error):
         self.metadata.request_error(request, error)
7 changes: 5 additions & 2 deletions frontera/contrib/backends/sqlalchemy/components.py
@@ -62,9 +62,12 @@ def request_error(self, page, error):
         self.session.commit()
 
     @retry_and_rollback
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
         r = self._modify_page(response) if response.meta[b'fingerprint'] in self.cache else self._create_page(response)
-        self.cache[to_bytes(r.fingerprint)] = self.session.merge(r)
+        self.cache[r.fingerprint] = self.session.merge(r)
+        self.session.commit()
+
+    def links_extracted(self, request, links):
         for link in links:
             if link.meta[b'fingerprint'] not in self.cache:
                 self.cache[link.meta[b'fingerprint']] = self.session.merge(self._create_page(link))
4 changes: 2 additions & 2 deletions frontera/contrib/backends/sqlalchemy/revisiting.py
@@ -125,8 +125,8 @@ def _schedule(self, requests):
             self.metadata.update_score(batch)
         self.queue_size += len(batch)
 
-    def page_crawled(self, response, links):
-        super(Backend, self).page_crawled(response, links)
+    def page_crawled(self, response):
+        super(Backend, self).page_crawled(response)
         self.states.set_states(response.request)
         self._schedule([response.request])
         self.states.update_cache(response.request)
4 changes: 3 additions & 1 deletion frontera/contrib/canonicalsolvers/basic.py
@@ -21,8 +21,10 @@ def add_seeds(self, seeds):
         for seed in seeds:
             self._set_canonical(seed)
 
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
         self._set_canonical(response)
+
+    def links_extracted(self, request, links):
         for link in links:
             self._set_canonical(link)
 
7 changes: 5 additions & 2 deletions frontera/contrib/middlewares/domain.py
@@ -79,10 +79,13 @@ def add_seeds(self, seeds):
             self._add_domain(seed)
         return seeds
 
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
+        return self._add_domain(response)
+
+    def links_extracted(self, request, links):
         for link in links:
             self._add_domain(link)
-        return self._add_domain(response)
+        return self._add_domain(request)
 
     def request_error(self, request, error):
         return self._add_domain(request)
7 changes: 5 additions & 2 deletions frontera/contrib/middlewares/fingerprint.py
@@ -30,10 +30,13 @@ def add_seeds(self, seeds):
             self._add_fingerprint(seed)
         return seeds
 
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
+        return self._add_fingerprint(response)
+
+    def links_extracted(self, request, links):
         for link in links:
             self._add_fingerprint(link)
-        return self._add_fingerprint(response)
+        return self._add_fingerprint(request)
 
     def request_error(self, request, error):
         return self._add_fingerprint(request)
7 changes: 5 additions & 2 deletions frontera/contrib/scrapy/schedulers/frontier.py
@@ -112,8 +112,11 @@ def process_spider_output(self, response, result, spider):
                 links.append(element)
             else:
                 yield element
-        self.frontier.page_crawled(response=response,
-                                   links=links)
+        frontier_request = response.meta[b'frontier_request']
+        self.frontier.page_crawled(response)  # removed frontier part from .meta
+        # putting it back, to persist .meta from original request
+        response.meta[b'frontier_request'] = frontier_request
+        self.frontier.links_extracted(response.request, links)
         self.stats_manager.add_crawled_page(response.status, len(links))
 
     def process_exception(self, request, exception, spider):
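The scheduler now saves b'frontier_request' before calling page_crawled (which strips frontier internals from response.meta) and restores it afterwards, so the subsequent links_extracted call and any retry logic still see the original frontier request. The same save-and-restore idiom in isolation (call_preserving_key is an illustrative helper, not part of the commit):

    def call_preserving_key(meta, key, fn, *args):
        # Keep one meta entry alive across a call that may remove it.
        saved = meta[key]
        try:
            return fn(*args)
        finally:
            meta[key] = saved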
13 changes: 12 additions & 1 deletion frontera/core/codec.py
@@ -42,11 +42,22 @@ def encode_add_seeds(self, seeds):
         pass
 
     @abstractmethod
-    def encode_page_crawled(self, response, links):
+    def encode_page_crawled(self, response):
         """
         Encodes a page_crawled message
 
         :param object response: A frontier Response object
+
+        :return: bytes encoded message
+        """
+        pass
+
+    @abstractmethod
+    def encode_links_extracted(self, request, links):
+        """
+        Encodes a links_extracted message
+
+        :param object request: A frontier Request object
         :param list links: A list of Request objects
 
         :return: bytes encoded message
18 changes: 13 additions & 5 deletions frontera/core/components.py
@@ -32,20 +32,28 @@ def add_seeds(self, seeds):
         pass
 
     @abstractmethod
-    def page_crawled(self, response, links):
+    def page_crawled(self, response):
         """
-        This method is called each time a page has been crawled.
+        This method is called every time a page has been crawled.
 
         :param object response: The :class:`Response <frontera.core.models.Response>` object for the crawled page.
-        :param list links: A list of :class:`Request <frontera.core.models.Request>` objects generated from \
-            the links extracted for the crawled page.
         """
         pass
 
     @abstractmethod
+    def links_extracted(self, request, links):
+        """
+        This method is called every time links are extracted from a document.
+
+        :param object request: The original :class:`Request <frontera.core.models.Request>` object for the crawled page.
+        :param list links: A list of :class:`Request <frontera.core.models.Request>` objects containing extracted links.
+        """
+        pass
+
+    @abstractmethod
     def request_error(self, page, error):
         """
-        This method is called each time an error occurs when crawling a page
+        This method is called each time an error occurs when crawling a page.
 
         :param object request: The :class:`Request <frontera.core.models.Request>` object that failed with an error.
         :param string error: A string identifier for the error.
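Middlewares implement the same pair of callbacks but return the objects they touched so the frontier manager can chain them, as the domain and fingerprint middlewares above do. A minimal sketch modeled on those (TagMiddleware and _tag are made-up names, not part of the commit):

    class TagMiddleware(object):
        def page_crawled(self, response):
            return self._tag(response)

        def links_extracted(self, request, links):
            for link in links:
                self._tag(link)
            return self._tag(request)

        def request_error(self, request, error):
            return self._tag(request)

        def _tag(self, obj):
            obj.meta[b'seen_by'] = b'TagMiddleware'
            return obj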
