This repository has been archived by the owner on Sep 20, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
scrapy_prometheus.py
160 lines (125 loc) · 5.88 KB
/
scrapy_prometheus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import functools
import socket
from collections import defaultdict
import prometheus_client
from scrapy import statscollectors, signals
# Aliases for the prometheus_client metric factories used by the collector;
# these are passed as the metric_type argument of get_metric() below.
METRIC_COUNTER = prometheus_client.Counter
METRIC_GAUGE = prometheus_client.Gauge
def push_to_gateway(pushgateway, registry, method='POST', timeout=5, job='scrapy', grouping_key=None):
    """Push *registry* metrics to a Prometheus pushgateway.

    A ``method`` of ``'POST'`` (case-insensitive) uses ``pushadd_to_gateway``,
    which merges metrics into the existing job/grouping-key group; any other
    value uses ``push_to_gateway``, which replaces the whole group.

    :param str pushgateway: host:port address of the pushgateway.
    :param prometheus_client.CollectorRegistry registry: registry to push.
    :param str method: HTTP push semantics selector ('POST' or anything else).
    :param int timeout: request timeout in seconds.
    :param str job: pushgateway job name.
    :param dict grouping_key: optional pushgateway grouping key.
    """
    use_add = method.upper() == 'POST'
    pusher = prometheus_client.pushadd_to_gateway if use_add else prometheus_client.push_to_gateway
    return pusher(pushgateway, job=job, grouping_key=grouping_key, timeout=timeout, registry=registry)
class InvalidMetricType(TypeError):
    """Raised by get_metric() when an already-registered collector's type
    conflicts with the requested metric type (e.g. a stats key previously
    recorded as a Counter is now requested as a Gauge)."""
    pass
def _forced_spider(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
"""
:param PrometheusStatsCollector self:
"""
spider = getattr(self, '_forced_spider', None)
if spider:
kwargs.setdefault('spider', spider)
return func(self, *args, **kwargs)
return wrapper
class PrometheusStatsCollector(statscollectors.StatsCollector):
    """Scrapy stats collector that mirrors numeric stats into per-spider
    Prometheus registries and pushes them to a pushgateway when the
    crawler engine stops.
    """

    def __init__(self, crawler):
        """
        :param scrapy.crawler.Crawler crawler:
        """
        self.crawler = crawler
        # One CollectorRegistry per spider name; the None key holds the
        # default (spider-less) registry.
        self.registries = defaultdict(prometheus_client.CollectorRegistry)
        self.crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)
        super(PrometheusStatsCollector, self).__init__(crawler)

    def forced_spider(self, spider):
        """
        Force this spider to be used when writing metrics.
        :param scrapy.spider.Spider spider:
        """
        self._forced_spider = spider

    def get_registry(self, spider):
        """
        Return CollectorRegistry associated with spider. To get default CollectorRegistry, pass None.
        :param scrapy.spider.Spider spider:
        :rtype: prometheus_client.CollectorRegistry
        """
        return self.registries[getattr(spider, 'name', None)]

    # noinspection PyProtectedMember
    def get_metric(self, key, metric_type, spider=None, labels=None):
        """Fetch or create the Prometheus metric backing a stats *key*.

        :param str key: scrapy stats key; '/' is replaced with '_' in the metric name.
        :param metric_type: METRIC_COUNTER or METRIC_GAUGE.
        :param spider: spider whose registry is used (None = default registry).
        :param labels: optional label names for a newly created metric.
        :returns: (metric, created) tuple; metric is None when the existing
            collector has a conflicting type and PROMETHEUS_SUPPRESS_TYPE_CHECK
            is enabled.
        :raises InvalidMetricType: on a type conflict when checking is enabled.
        """
        prefix = self.crawler.settings.get('PROMETHEUS_METRIC_PREFIX', 'scrapy_prometheus')
        name = '%s_%s' % (prefix, key.replace('/', '_'))
        registry = self.get_registry(spider)

        if name not in registry._names_to_collectors:
            # BUG FIX: prometheus_client rejects labelnames=None; pass an
            # empty tuple when no labels are requested.
            metric, created = metric_type(name, key, labels or (), registry=registry), True
        else:
            metric, created = registry._names_to_collectors[name], False

        # Older prometheus_client exposed the metric class via __wrapped__ on
        # the factory; newer versions make Counter/Gauge plain classes.
        # BUG FIX: the original read metric_type.__wrapped__ unconditionally,
        # so clients without __wrapped__ got an AttributeError (and the type
        # check wrongly fired for every metric). Fall back to the factory
        # itself as the expected class.
        expected_type = getattr(metric_type, '__wrapped__', metric_type)
        if not isinstance(metric, expected_type):
            if self.crawler.settings.getbool('PROMETHEUS_SUPPRESS_TYPE_CHECK', False):
                return None, created
            raise InvalidMetricType('Wrong type %s for metric %s, which is %s' % (
                expected_type.__name__, name, metric.__class__.__name__
            ))

        return metric, created

    @_forced_spider
    def set_value(self, key, value, spider=None):
        """Set a stats value; numeric values are mirrored to a Gauge."""
        super(PrometheusStatsCollector, self).set_value(key, value, spider)

        if isinstance(value, (int, float)):
            metric, _ = self.get_metric(key, METRIC_GAUGE, spider=spider)
            if metric:
                metric.set(value)

    @_forced_spider
    def inc_value(self, key, count=1, start=0, spider=None):
        """Increment a stats value; numeric counts are mirrored to a Counter."""
        super(PrometheusStatsCollector, self).inc_value(key, count, start, spider)

        if isinstance(count, (int, float)):
            metric, _ = self.get_metric(key, METRIC_COUNTER, spider=spider)
            if metric:
                metric.inc(count)

    @_forced_spider
    def max_value(self, key, value, spider=None):
        """Keep the maximum of a stats value, mirrored to a Gauge."""
        super(PrometheusStatsCollector, self).max_value(key, value, spider)

        if isinstance(value, (int, float)):
            metric, _ = self.get_metric(key, METRIC_GAUGE, spider=spider)
            if metric:
                # Gauge has no native "set to max" operation; read/compare via
                # the private value holder, matching the original implementation.
                metric._value.set(max(metric._value.get(), value))

    @_forced_spider
    def min_value(self, key, value, spider=None):
        """Keep the minimum of a stats value, mirrored to a Gauge."""
        super(PrometheusStatsCollector, self).min_value(key, value, spider)

        if isinstance(value, (int, float)):
            metric, _ = self.get_metric(key, METRIC_GAUGE, spider=spider)
            if metric:
                metric._value.set(min(metric._value.get(), value))

    def get_grouping_key(self, spider=None):
        """Build the default pushgateway grouping key: spider name + hostname."""
        grouping_key = {
            'spider': spider.name if spider else ''
        }

        try:
            grouping_key['instance'] = socket.gethostname()
        except Exception:  # BUG FIX: was a bare except (caught SystemExit/KeyboardInterrupt)
            grouping_key['instance'] = ""

        return grouping_key

    def _persist_stats(self, stats, spider=None):
        """Persist stats, then push the spider's registry to the pushgateway."""
        super(PrometheusStatsCollector, self)._persist_stats(stats, spider)

        if spider and spider.name not in self.registries:
            spider.logger.warning('%s spider not found in collector registries', spider.name)
            return

        try:
            push_to_gateway(
                pushgateway=self.crawler.settings.get('PROMETHEUS_PUSHGATEWAY', '127.0.0.1:9091'),
                registry=self.registries[spider.name if spider else None],
                method=self.crawler.settings.get('PROMETHEUS_PUSH_METHOD', 'POST'),
                # BUG FIX: settings may carry the timeout as a string; coerce to a number.
                timeout=self.crawler.settings.getfloat('PROMETHEUS_PUSH_TIMEOUT', 5),
                job=self.crawler.settings.get('PROMETHEUS_JOB', 'scrapy'),
                grouping_key=self.crawler.settings.get('PROMETHEUS_GROUPING_KEY', self.get_grouping_key(spider))
            )
        except Exception:  # BUG FIX: was a bare except (caught SystemExit/KeyboardInterrupt)
            if spider:
                spider.logger.exception('Failed to push "%s" spider metrics to pushgateway', spider.name)
        else:
            if spider:
                spider.logger.info('Pushed "%s" spider metrics to pushgateway', spider.name)

    def engine_stopped(self):
        """Signal handler: push accumulated default-registry stats at shutdown."""
        self._persist_stats(self._stats, None)