[WIP] Added Cassandra backend #225

Closed
wants to merge 14 commits
2 changes: 2 additions & 0 deletions .travis.yml
@@ -47,6 +47,8 @@ before_script:
- docker-compose --version
- docker-compose --verbose -f tests/kafka/docker-compose.yml up -d
- docker ps -a
- docker run --name cassandra -p 127.0.0.1:9042:9042 -d cassandra
- python tests/contrib/backends/cassandra/wait_for_cluster_up.py

script: tox
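
The ``wait_for_cluster_up.py`` script added above blocks the build until the Cassandra container accepts connections,
so tests don't start against a node that is still bootstrapping. The actual script is part of this PR; the following
is only a minimal sketch of the idea, assuming the stock ``cassandra-driver`` package:

    import sys
    import time

    from cassandra.cluster import Cluster, NoHostAvailable


    def wait_for_cluster_up(timeout=120):
        # Retry connecting until the node answers or the timeout expires.
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                cluster = Cluster(['127.0.0.1'], port=9042)
                cluster.connect()  # raises NoHostAvailable while the node boots
                cluster.shutdown()
                return 0
            except NoHostAvailable:
                time.sleep(5)
        return 1  # non-zero exit fails the CI step


    if __name__ == '__main__':
        sys.exit(wait_for_cluster_up())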

91 changes: 91 additions & 0 deletions docs/source/topics/frontera-settings.rst
@@ -487,6 +487,97 @@ Default: ``timedelta(days=1)``
Time between document visits, expressed as a ``datetime.timedelta`` object. Changing this setting will only affect
documents scheduled after the change; all previously queued documents will be crawled with the old periodicity.

.. _cassandra-settings:

Cassandra
---------

.. setting:: CASSANDRABACKEND_CACHE_SIZE

CASSANDRABACKEND_CACHE_SIZE
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``10000``

Cassandra Metadata LRU cache size. The cache holds objects that would otherwise be requested from the DB every time
already known documents are crawled. It mainly saves DB throughput: increase it if you're experiencing too high a
volume of SELECTs to the Metadata table, or decrease it if you need to save memory.


.. setting:: CASSANDRABACKEND_CLUSTER_HOSTS

CASSANDRABACKEND_CLUSTER_HOSTS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``['127.0.0.1']``

The list of contact points to try when connecting for cluster discovery. Not all contact points are required; the
driver discovers the rest.

.. setting:: CASSANDRABACKEND_CLUSTER_PORT

CASSANDRABACKEND_CLUSTER_PORT
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``9042``

The server-side port used to open connections to Cassandra.

.. setting:: CASSANDRABACKEND_DROP_ALL_TABLES

CASSANDRABACKEND_DROP_ALL_TABLES
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``False``

Set to ``True`` to drop and create all DB tables on backend instantiation.

.. setting:: CASSANDRABACKEND_KEYSPACE

CASSANDRABACKEND_KEYSPACE
^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``crawler``

The Cassandra keyspace to use.

.. setting:: CASSANDRABACKEND_MODELS

CASSANDRABACKEND_MODELS
^^^^^^^^^^^^^^^^^^^^^^^

Default::

    {
        'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
        'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
        'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
        'FifoOrLIfoQueueModel': 'frontera.contrib.backends.cassandra.models.FifoOrLIfoQueueModel',
    }

This is a mapping of the Cassandra models used by the backends. It is mainly used for customization.
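
For example, to swap in a custom metadata model (``myproject.models.MyMetadataModel`` is a hypothetical import path),
override only the corresponding key::

    CASSANDRABACKEND_MODELS = {
        'MetadataModel': 'myproject.models.MyMetadataModel',
        'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
        'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
        'FifoOrLIfoQueueModel': 'frontera.contrib.backends.cassandra.models.FifoOrLIfoQueueModel',
    }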

.. setting:: CASSANDRABACKEND_REQUEST_TIMEOUT

CASSANDRABACKEND_REQUEST_TIMEOUT
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``60``

Timeout in seconds for every request the Cassandra driver makes to Cassandra.
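
For reference, a minimal sketch of a settings module pointing Frontera at a Cassandra cluster could look as follows
(the host names are placeholders)::

    BACKEND = 'frontera.contrib.backends.cassandra.FIFO'
    CASSANDRABACKEND_CLUSTER_HOSTS = ['cassandra-node1', 'cassandra-node2']
    CASSANDRABACKEND_CLUSTER_PORT = 9042
    CASSANDRABACKEND_KEYSPACE = 'crawler'
    CASSANDRABACKEND_REQUEST_TIMEOUT = 60
    CASSANDRABACKEND_CACHE_SIZE = 10000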

Revisiting backend
------------------

.. setting:: CASSANDRABACKEND_REVISIT_INTERVAL

CASSANDRABACKEND_REVISIT_INTERVAL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Default: ``timedelta(days=1)``

Time between document visits, expressed as a ``datetime.timedelta`` object. Changing this setting will only affect
documents scheduled after the change; all previously queued documents will be crawled with the old periodicity.

.. _hbase-settings:

Expand Down
49 changes: 46 additions & 3 deletions docs/source/topics/frontier-backends.rst
@@ -254,13 +254,52 @@ For a complete list of all settings used for SQLAlchemy backends check the :doc:`settings <frontera-settings>` section.
SQLAlchemy :class:`Backend <frontera.core.components.Backend>` implementation of a random selection
algorithm.

.. _frontier-backends-cassandra:

Cassandra backends
^^^^^^^^^^^^^^^^^^

This set of :class:`Backend <frontera.core.components.Backend>` objects will use Cassandra as storage for
:ref:`basic algorithms <frontier-backends-basic-algorithms>`.

If you need to use your own `cassandra models`_, you can do so via the
:setting:`CASSANDRABACKEND_MODELS` setting.

This setting is a dictionary where each key is the name of the model to override and each value is the import path of
the model to use.
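
Custom models are regular `cassandra models`_ built with cqlengine. A minimal sketch of a replacement metadata model
(the column set here is illustrative only, not the backend's actual schema)::

    from cassandra.cqlengine import columns
    from cassandra.cqlengine.models import Model

    class MyMetadataModel(Model):
        __table_name__ = 'my_metadata'
        fingerprint = columns.Text(primary_key=True)
        url = columns.Text()
        created_at = columns.DateTime()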

For a complete list of all settings used for Cassandra backends check the :doc:`settings <frontera-settings>` section.

.. class:: frontera.contrib.backends.cassandra.BASE

Base class for Cassandra :class:`Backend <frontera.core.components.Backend>` objects.

.. class:: frontera.contrib.backends.cassandra.FIFO

Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of `FIFO`_ algorithm.

.. class:: frontera.contrib.backends.cassandra.LIFO

Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of `LIFO`_ algorithm.

.. class:: frontera.contrib.backends.cassandra.BFS

Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of `BFS`_ algorithm.

.. class:: frontera.contrib.backends.cassandra.DFS

Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of `DFS`_ algorithm.

.. class:: frontera.contrib.backends.cassandra.Distributed

Cassandra :class:`DistributedBackend <frontera.core.components.DistributedBackend>` implementation of a distributed
backend.

Revisiting backend
^^^^^^^^^^^^^^^^^^

There are two Revisiting backends, based on the Cassandra and SQLAlchemy backends and queues. Crawling starts with
seeds. After seeds are crawled, every new document will be scheduled for immediate crawling. On fetching, every new
document will be scheduled for recrawling after a fixed interval set by :setting:`SQLALCHEMYBACKEND_REVISIT_INTERVAL`
or :setting:`CASSANDRABACKEND_REVISIT_INTERVAL`.

The current implementation of the revisiting backend has no prioritization. During long-term runs the spider could go
idle because no documents are available for crawling, while documents are still waiting for their scheduled revisit
time.
@@ -270,6 +309,9 @@

Base class for SQLAlchemy :class:`Backend <frontera.core.components.Backend>` implementation of revisiting back-end.

.. class:: frontera.contrib.backends.cassandra.revisiting.Backend

Base class for Cassandra :class:`Backend <frontera.core.components.Backend>` implementation of revisiting back-end.
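
To enable it, point the :setting:`BACKEND` setting at this class and configure the revisit interval, e.g.::

    from datetime import timedelta

    BACKEND = 'frontera.contrib.backends.cassandra.revisiting.Backend'
    CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(hours=12)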

HBase backend
^^^^^^^^^^^^^
@@ -298,3 +340,4 @@ setting.
.. _SQLAlchemy: http://www.sqlalchemy.org/
.. _any databases supported by SQLAlchemy: http://docs.sqlalchemy.org/en/latest/dialects/index.html
.. _declarative sqlalchemy models: http://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/index.html
.. _cassandra models: https://datastax.github.io/python-driver/cqlengine/models.html
124 changes: 123 additions & 1 deletion frontera/contrib/backends/__init__.py
@@ -1,9 +1,14 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from collections import OrderedDict
from datetime import datetime

from frontera import Backend
from frontera.core.components import States, Queue as BaseQueue, DistributedBackend
from frontera.core.models import Request, Response
from frontera.utils.misc import utcnow_timestamp

from w3lib.util import to_native_str


class CommonBackend(Backend):
@@ -84,3 +89,120 @@ def request_error(self, request, error):

    def finished(self):
        return self.queue_size == 0


class CommonStorageBackend(CommonBackend):
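    """Single-process storage backend base: concrete subclasses are expected
    to create the ``_queue``, ``_metadata`` and ``_states`` components exposed
    by the properties below."""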

    @property
    def queue(self):
        return self._queue

    @property
    def metadata(self):
        return self._metadata

    @property
    def states(self):
        return self._states


class CommonDistributedStorageBackend(DistributedBackend):
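    """Distributed flavour of the storage backend: starts and stops the
    metadata, queue and states components, delegates crawl events to them,
    and collects next requests per partition."""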

    @property
    def queue(self):
        return self._queue

    @property
    def metadata(self):
        return self._metadata

    @property
    def states(self):
        return self._states

    def frontier_start(self):
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_start()

    def frontier_stop(self):
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_stop()

    def add_seeds(self, seeds):
        self.metadata.add_seeds(seeds)

    def get_next_requests(self, max_next_requests, **kwargs):
        partitions = kwargs.pop('partitions', [0])  # TODO: Collect from all known partitions
        batch = []
        for partition_id in partitions:
            batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
        return batch

    def page_crawled(self, response):
        self.metadata.page_crawled(response)

    def links_extracted(self, request, links):
        self.metadata.links_extracted(request, links)

    def request_error(self, request, error):
        self.metadata.request_error(request, error)

    def finished(self):
        raise NotImplementedError


class CreateOrModifyPageMixin(object):
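    """Helpers for building a new page (metadata) record from a Request or
    Response, or updating a cached one after a fetch."""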

    def _create_page(self, obj):
        db_page = self.model()
        db_page.fingerprint = to_native_str(obj.meta[b'fingerprint'])
        db_page.url = obj.url
        db_page.created_at = datetime.utcnow()
        db_page.meta = obj.meta
        db_page.depth = 0

        if isinstance(obj, Request):
            db_page.headers = obj.headers
            db_page.method = to_native_str(obj.method)
            db_page.cookies = obj.cookies
        elif isinstance(obj, Response):
            db_page.headers = obj.request.headers
            db_page.method = to_native_str(obj.request.method)
            db_page.cookies = obj.request.cookies
            db_page.status_code = obj.status_code
        return db_page

    def _modify_page(self, obj):
        db_page = self.cache[obj.meta[b'fingerprint']]
        db_page.fetched_at = datetime.utcnow()
        if isinstance(obj, Response):
            db_page.headers = obj.request.headers
            db_page.method = to_native_str(obj.request.method)
            db_page.cookies = obj.request.cookies
            db_page.status_code = obj.status_code
        return db_page


class CommonRevisitingStorageBackendMixin(object):
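    """Scheduling logic shared by the revisiting backends: documents never
    crawled are scheduled immediately, crawled or errored ones are rescheduled
    after ``self.interval``, and documents already queued are skipped."""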

    def _schedule(self, requests):
        batch = []
        for request in requests:
            if request.meta[b'state'] in [States.NOT_CRAWLED]:
                request.meta[b'crawl_at'] = utcnow_timestamp()
            elif request.meta[b'state'] in [States.CRAWLED, States.ERROR]:
                request.meta[b'crawl_at'] = utcnow_timestamp() + self.interval
            else:
                continue  # QUEUED
            batch.append((request.meta[b'fingerprint'], self._get_score(request), request, True))
        self.queue.schedule(batch)
        self.metadata.update_score(batch)
        self.queue_size += len(batch)

    def page_crawled(self, response):
        super(CommonRevisitingStorageBackendMixin, self).page_crawled(response)
        self.states.set_states(response.request)
        self._schedule([response.request])
        self.states.update_cache(response.request)