# frontera/contrib/backends/cassandra/__init__.py
from __future__ import absolute_import
import six
from cassandra.cqlengine import connection
from cassandra.cqlengine.management import drop_table
from frontera.contrib.backends import (CommonDistributedStorageBackend,
CommonStorageBackend)
from frontera.contrib.backends.cassandra.components import (Metadata,
BroadCrawlingQueue,
Queue, States)
from frontera.utils.misc import load_object
class CassandraBackend(CommonStorageBackend):
    """Single-process crawl frontier backend storing state in Cassandra.

    Reads its cluster/model configuration from the frontier manager's
    settings, establishes (or reuses) the global cqlengine connection and
    wires up metadata, state and queue components.
    """

    def __init__(self, manager):
        self.manager = manager
        settings = manager.settings
        cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS')
        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        models = settings.get('CASSANDRABACKEND_MODELS')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        # Resolve dotted model paths to model classes once, up front.
        self.models = {name: load_object(cls) for name, cls in six.iteritems(models)}
        cluster_kwargs = {
            'port': cluster_port,
            'compression': True,
        }
        # cqlengine keeps one module-global cluster; only set it up once.
        if not connection.cluster:
            connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
        connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')
        if drop_all_tables:
            # Only the model classes are needed here; the names are irrelevant.
            for table in six.itervalues(self.models):
                drop_table(table)
        self._metadata = Metadata(self.models['MetadataModel'], settings.get('CASSANDRABACKEND_CACHE_SIZE'))
        self._states = States(self.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT'))
        self._queue = self._create_queue(settings)

    def frontier_stop(self):
        """Flush pending state writes and tear down the default connection."""
        self.states.flush()
        connection.unregister_connection('default')

    def _create_queue(self, settings):
        """Build the request queue; subclasses override to change ordering."""
        return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
class FIFOBackend(CassandraBackend):
    """Queue variant that pops requests oldest-first (FIFO order)."""

    component_name = 'Cassandra FIFO Backend'

    def _create_queue(self, settings):
        partitions = settings.get('SPIDER_FEED_PARTITIONS')
        queue_model = self.models['FifoOrLIfoQueueModel']
        return Queue(queue_model, partitions, ordering='created')
class LIFOBackend(CassandraBackend):
    """Queue variant that pops requests newest-first (LIFO order)."""

    component_name = 'Cassandra LIFO Backend'

    def _create_queue(self, settings):
        partitions = settings.get('SPIDER_FEED_PARTITIONS')
        queue_model = self.models['FifoOrLIfoQueueModel']
        return Queue(queue_model, partitions, ordering='created_desc')
class DFSBackend(CassandraBackend):
    """Depth-first variant: deeper requests score lower via negated depth."""

    component_name = 'Cassandra DFS Backend'

    def _create_queue(self, settings):
        queue_model = self.models['QueueModel']
        return Queue(queue_model, settings.get('SPIDER_FEED_PARTITIONS'))

    def _get_score(self, obj):
        # Negate the crawl depth so ordering favors depth-first traversal.
        depth = obj.meta[b'depth']
        return -depth
class BFSBackend(CassandraBackend):
    """Breadth-first variant: the crawl depth itself is the score."""

    component_name = 'Cassandra BFS Backend'

    def _create_queue(self, settings):
        queue_model = self.models['QueueModel']
        return Queue(queue_model, settings.get('SPIDER_FEED_PARTITIONS'))

    def _get_score(self, obj):
        # Score equals depth, so shallower requests are preferred.
        return obj.meta[b'depth']
# Short aliases so settings can reference the backends by strategy name.
BASE = CassandraBackend
LIFO = LIFOBackend
FIFO = FIFOBackend
DFS = DFSBackend
BFS = BFSBackend
class Distributed(CommonDistributedStorageBackend):
    """Distributed Cassandra backend.

    The constructor only establishes the connection; the per-role
    components are attached by the :meth:`strategy_worker` and
    :meth:`db_worker` alternate constructors.
    """

    def __init__(self, manager):
        self.manager = manager
        settings = manager.settings
        cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS')
        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        models = settings.get('CASSANDRABACKEND_MODELS')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        # Resolve dotted model paths to model classes once, up front.
        self.models = {name: load_object(cls) for name, cls in six.iteritems(models)}
        cluster_kwargs = {
            'port': cluster_port,
            'compression': True,
        }
        # cqlengine keeps one module-global cluster; only set it up once.
        if not connection.cluster:
            connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
        connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')
        # Components are role-specific; the classmethods below fill them in.
        self._metadata = None
        self._queue = None
        self._states = None

    @classmethod
    def strategy_worker(cls, manager):
        """Build an instance configured for the strategy-worker role (states only)."""
        b = cls(manager)
        settings = manager.settings
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        state_model = b.models['StateModel']
        if drop_all_tables:
            drop_table(state_model)
        b._states = States(state_model, settings.get('STATE_CACHE_SIZE_LIMIT'))
        return b

    @classmethod
    def db_worker(cls, manager):
        """Build an instance configured for the db-worker role (metadata + queue)."""
        b = cls(manager)
        settings = manager.settings
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        metadata_model = b.models['MetadataModel']
        queue_model = b.models['QueueModel']
        if drop_all_tables:
            drop_table(metadata_model)
            drop_table(queue_model)
        b._metadata = Metadata(metadata_model, settings.get('CASSANDRABACKEND_CACHE_SIZE'))
        b._queue = BroadCrawlingQueue(queue_model, settings.get('SPIDER_FEED_PARTITIONS'))
        return b

    def frontier_stop(self):
        """Run common shutdown, then drop the default Cassandra connection."""
        super(Distributed, self).frontier_stop()
        connection.unregister_connection('default')