Skip to content

Commit

Permalink
added tests for cassandra distributed backend
Browse files Browse the repository at this point in the history
  • Loading branch information
voith committed Nov 11, 2016
1 parent 67ca6fa commit 9c316ea
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 25 deletions.
21 changes: 13 additions & 8 deletions frontera/contrib/backends/cassandra/__init__.py
Expand Up @@ -99,7 +99,6 @@ def __init__(self, manager):
settings = manager.settings
cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS')
cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
models = settings.get('CASSANDRABACKEND_MODELS')
keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')

Expand All @@ -112,10 +111,6 @@ def __init__(self, manager):
connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')

if drop_all_tables:
for name, table in six.iteritems(self.models):
drop_table(table)

self._metadata = None
self._queue = None
self._states = None
Expand All @@ -124,15 +119,25 @@ def __init__(self, manager):
def strategy_worker(cls, manager):
b = cls(manager)
settings = manager.settings
b._states = States(b.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT'))
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
state_model = b.models['StateModel']
if drop_all_tables:
drop_table(state_model)
b._states = States(state_model, settings.get('STATE_CACHE_SIZE_LIMIT'))
return b

@classmethod
def db_worker(cls, manager):
b = cls(manager)
settings = manager.settings
b._metadata = Metadata(b.models['MetadataModel'], settings.get('CASSANDRABACKEND_CACHE_SIZE'))
b._queue = BroadCrawlingQueue(b.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
metadata_model = b.models['MetadataModel']
queue_model = b.models['QueueModel']
if drop_all_tables:
drop_table(metadata_model)
drop_table(queue_model)
b._metadata = Metadata(metadata_model, settings.get('CASSANDRABACKEND_CACHE_SIZE'))
b._queue = BroadCrawlingQueue(queue_model, settings.get('SPIDER_FEED_PARTITIONS'))
return b

def frontier_stop(self):
Expand Down
2 changes: 1 addition & 1 deletion frontera/contrib/backends/cassandra/components.py
Expand Up @@ -172,7 +172,7 @@ def schedule(self, batch):
self.batch.execute()

def count(self):
return self.queue_model.all().count()
return self.queue_model.objects.count()


class BroadCrawlingQueue(Queue):
Expand Down
2 changes: 1 addition & 1 deletion frontera/contrib/backends/cassandra/revisiting.py
Expand Up @@ -89,7 +89,7 @@ def _create_queue_obj(self, fprint, score, request, partition_id, host_crc32, sc
return q

def count(self):
return self.queue_model.all().count()
return self.queue_model.objects.count()


class Backend(CommonRevisitingStorageBackendMixin, CassandraBackend):
Expand Down
71 changes: 56 additions & 15 deletions tests/contrib/backends/cassandra/test_backend_cassandra.py
Expand Up @@ -9,7 +9,7 @@
drop_keyspace, drop_table,
sync_table)

from frontera.contrib.backends.cassandra import CassandraBackend
from frontera.contrib.backends.cassandra import CassandraBackend, Distributed
from frontera.contrib.backends.cassandra.models import (FifoOrLIfoQueueModel,
MetadataModel,
QueueModel, StateModel)
Expand All @@ -29,7 +29,7 @@
r4 = r3.copy()


class BaseCassandraTest(object):
class CassandraConfig(object):

def setUp(self):
settings = Settings()
Expand All @@ -53,7 +53,7 @@ def _set_global_connection(self, hosts, port, timeout):
connection.session.default_timeout = timeout


class TestCassandraBackendModels(BaseCassandraTest, unittest.TestCase):
class TestCassandraBackendModels(CassandraConfig, unittest.TestCase):

def test_pickled_fields(self):
sync_table(MetadataModel)
Expand Down Expand Up @@ -131,7 +131,25 @@ def assert_db_values(self, model, _filter, fields):
self.assertEqual(stored_value, original_value)


class TestCassandraBackend(BaseCassandraTest, unittest.TestCase):
class TestCassandraBackend(CassandraConfig, unittest.TestCase):

def init_backend(self):
self.backend = CassandraBackend(self.manager)

@property
def metadata(self):
self.init_backend()
return self.backend.metadata

@property
def states(self):
self.init_backend()
return self.backend.states

@property
def queue(self):
self.init_backend()
return self.backend.queue

def _get_tables(self):
query = 'SELECT table_name FROM system_schema.tables WHERE keyspace_name = \'{}\''.format(self.keyspace)
Expand All @@ -141,7 +159,7 @@ def _get_tables(self):
def test_tables_created(self):
tables_before = self._get_tables()
self.assertEqual(tables_before, [])
CassandraBackend(self.manager)
self.init_backend()
tables_after = self._get_tables()
self.assertEqual(set(tables_after), set(['metadata', 'states', 'queue']))

Expand All @@ -158,14 +176,14 @@ def _get_state_data():
rows_before = _get_state_data()
self.assertEqual(rows_before.count(), 1)
self.manager.settings.CASSANDRABACKEND_DROP_ALL_TABLES = True
CassandraBackend(self.manager)
self.assertEqual(set(tables_before), set(['metadata', 'states', 'queue']))
self.init_backend()
tables_after = self._get_tables()
self.assertEqual(set(tables_after), set(['metadata', 'states', 'queue']))
rows_after = _get_state_data()
self.assertEqual(rows_after.count(), 0)

def test_metadata(self):
b = CassandraBackend(self.manager)
metadata = b.metadata
metadata = self.metadata
metadata.add_seeds([r1, r2, r3])
meta_qs = MetadataModel.objects.all()
self.assertEqual(set([r1.url, r2.url, r3.url]), set([m.url for m in meta_qs]))
Expand All @@ -183,10 +201,9 @@ def test_metadata(self):
self.assertEqual(meta_qs.count(), 3)

def test_state(self):
b = CassandraBackend(self.manager)
state = b.states
state = self.states
state.set_states([r1, r2, r3])
self.assertEqual([r.meta[b'state'] for r in [r1, r2, r3]], [States.NOT_CRAWLED]*3)
self.assertEqual([r.meta[b'state'] for r in [r1, r2, r3]], [States.NOT_CRAWLED] * 3)
state.update_cache([r1, r2, r3])
self.assertDictEqual(state._cache, {b'10': States.NOT_CRAWLED,
b'11': States.NOT_CRAWLED,
Expand All @@ -209,11 +226,11 @@ def test_state(self):

def test_queue(self):
self.manager.settings.SPIDER_FEED_PARTITIONS = 2
b = CassandraBackend(self.manager)
queue = b.queue
queue = self.queue
batch = [('10', 0.5, r1, True), ('11', 0.6, r2, True),
('12', 0.7, r3, True)]
queue.schedule(batch)
self.assertEqual(queue.count(), 3)
self.assertEqual(set([r.url for r in queue.get_next_requests(10, 0,
min_requests=3,
min_hosts=1,
Expand All @@ -224,10 +241,34 @@ def test_queue(self):
min_hosts=1,
max_requests_per_host=10)]),
set([r1.url, r2.url]))
self.assertEqual(queue.count(), 0)


class TestCassandraDistributedBackend(TestCassandraBackend):

def init_backend(self):
self.backend = Distributed(self.manager)
self.strategy_worker = self.backend.strategy_worker(self.manager)
self.db_worker = self.backend.db_worker(self.manager)

@property
def metadata(self):
self.init_backend()
return self.db_worker.metadata

@property
def states(self):
self.init_backend()
return self.strategy_worker.states

@property
def queue(self):
self.init_backend()
return self.db_worker.queue


class BaseCassandraIntegrationTests(object):
obj = BaseCassandraTest()
obj = CassandraConfig()

def setup_backend(self, method):
self.obj.setUp()
Expand Down

0 comments on commit 9c316ea

Please sign in to comment.