Skip to content

Commit

Permalink
Merge pull request #12 from olxbr/feature/better-integration
Browse files Browse the repository at this point in the history
Integration with asyncworker and Prometheus refactored
  • Loading branch information
gligneul committed Feb 17, 2020
2 parents fcc4943 + 0256f95 commit fab1688
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 226 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ labels = dict(
team_name="my_team"
)
healthcheck = Healthcheck(barterdude) # automatic and customizable healthcheck
prometheus = Prometheus(barterdude, labels), # automatic and customizable Prometheus integration
prometheus = Prometheus(barterdude, labels) # automatic and customizable Prometheus integration

monitor = Monitor(
healthcheck,
Expand All @@ -58,20 +58,28 @@ monitor = Monitor(
Logging() # automatic and customizable logging
)

my_metric = prometheus.metrics.counter(name="fail", description="fail again") # It's the same as https://github.com/prometheus/client_python

@barterdude.consume_amqp(["queue1", "queue2"], monitor)

@barterdude.consume_amqp(
["queue1", "queue2"],
monitor,
coroutines = 10, # number of coroutines spawned to consume messages (1 per message)
bulk_flush_interval = 60.0, # max waiting time for messages to start process n_coroutines
requeue_on_fail = True # should retry or not the message
)
async def your_consumer(msg: RabbitMQMessage): # you receive only one message and we parallelize processing for you
await barterdude.publish_amqp(
exchange="my_exchange",
data=msg.body
)
if msg.body == "fail":
prometheus.count(name="fail", description="fail again") # you can use prometheus metrics
my_metric.inc() # you can use prometheus metrics
healthcheck.force_fail() # you can use your hooks inside consumer too
msg.reject(requeue=True) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage
msg.reject(requeue=False) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage

if msg.body == "exception":
raise Exception() # this will reject the message WITHOUT requeue to avoid processing loop
raise Exception() # this will reject the message and requeue

# if everything is fine, than message automatically is accepted

Expand Down
14 changes: 7 additions & 7 deletions barterdude/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from asyncio import gather
from asyncworker import App, Options, RouteTypes
from asyncworker import App, RouteTypes
from asyncworker.options import Options
from asyncworker.connections import AMQPConnection
from asyncworker.rabbitmq.message import RabbitMQMessage
from typing import Iterable
Expand Down Expand Up @@ -35,9 +36,9 @@ def consume_amqp(
self,
queues: Iterable[str],
monitor: Monitor = Monitor(),
bulk_size: int = 1,
coroutines: int = 10,
bulk_flush_interval: float = 60.0,
**kwargs,
requeue_on_fail: bool = True
):
def decorator(f):
async def process_message(message: RabbitMQMessage):
Expand All @@ -46,18 +47,17 @@ async def process_message(message: RabbitMQMessage):
await f(message)
except Exception as error:
await monitor.dispatch_on_fail(message, error)
message.reject()
message.reject(requeue_on_fail)
else:
await monitor.dispatch_on_success(message)

@self.__app.route(
queues,
type=RouteTypes.AMQP_RABBITMQ,
options={
Options.BULK_SIZE: bulk_size,
Options.BULK_SIZE: coroutines,
Options.BULK_FLUSH_INTERVAL: bulk_flush_interval
},
**kwargs
}
)
async def wrapper(messages: RabbitMQMessage):
await gather(*map(process_message, messages))
Expand Down
97 changes: 19 additions & 78 deletions barterdude/hooks/metrics/prometheus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@
from asyncworker.rabbitmq.message import RabbitMQMessage
from barterdude import BarterDude
from barterdude.hooks import HttpHook
from barterdude.hooks.metrics.prometheus.definition import Definition
from barterdude.hooks.metrics.prometheus.partial_metric import partial_metric
from barterdude.hooks.metrics.prometheus.definitions import Definitions
from barterdude.hooks.metrics.prometheus.metrics import Metrics
try:
from prometheus_client import (
CollectorRegistry,
REGISTRY,
generate_latest,
CONTENT_TYPE_LATEST,
Counter,
Gauge,
Summary,
Histogram,
Info,
Enum
)
except ImportError: # pragma: no cover
raise ImportError("""
Expand All @@ -27,86 +22,31 @@

class Prometheus(HttpHook):

MESSAGE_UNITS = "messages"
TIME_UNITS = "seconds"
NAMESPACE = "barterdude"
D_BEFORE_CONSUME = "before_consume"
D_SUCCESS = "success"
D_FAIL = "fail"
D_TIME_MEASURE = "time_measure"

def __init__(
self,
barterdude: BarterDude,
labels: dict,
path: str = "/metrics",
registry: CollectorRegistry = CollectorRegistry(),
definition: Definition = Definition()
registry: CollectorRegistry = REGISTRY

):
self.__registry = registry
self.__labels = labels
self._d = definition
self.__metrics = Metrics(self.__registry)
self.__definitions = Definitions(
registry, self.__metrics, list(labels.keys())
)
self._msg_start = {}
self.__prepare_metrics()
self.__definitions.save_metrics()
super(Prometheus, self).__init__(barterdude, path)

def __prepare_metrics(self):
self._d.prepare_before_consume(
self.D_BEFORE_CONSUME,
labelnames=list(self.__labels.keys()),
namespace=self.NAMESPACE,
unit=self.MESSAGE_UNITS,
registry=self.__registry
)
self._d.prepare_on_complete(
self.D_SUCCESS,
labelnames=list(self.__labels.keys()),
namespace=self.NAMESPACE,
unit=self.MESSAGE_UNITS,
registry=self.__registry
)
self._d.prepare_on_complete(
self.D_FAIL,
labelnames=list(self.__labels.keys()),
namespace=self.NAMESPACE,
unit=self.MESSAGE_UNITS,
registry=self.__registry
)
self._d.prepare_time_measure(
self.D_TIME_MEASURE,
labelnames=list(self.__labels.keys()),
namespace=self.NAMESPACE,
unit=self.TIME_UNITS,
registry=self.__registry
)

@property
def counter(self):
return partial_metric(Counter, self.__registry)

@property
def gauge(self):
return partial_metric(Gauge, self.__registry)

@property
def summary(self):
return partial_metric(Summary, self.__registry)

@property
def histogram(self):
return partial_metric(Histogram, self.__registry)

@property
def info(self):
return partial_metric(Info, self.__registry)

@property
def enum(self):
return partial_metric(Enum, self.__registry)
def metrics(self):
return self.__metrics

async def before_consume(self, message: RabbitMQMessage):
hash_message = id(message)
self._d.metrics[self.D_BEFORE_CONSUME].labels(
self.metrics[self.__definitions.BEFORE_CONSUME].labels(
**self.__labels
).inc()
self._msg_start[hash_message] = time.time()
Expand All @@ -121,16 +61,17 @@ async def _on_complete(self,
labels = self.__labels.copy()
labels["state"] = state
labels["error"] = str(type(error)) if (error) else None
self._d.metrics[state].labels(**labels).inc()
self._d.metrics[self.D_TIME_MEASURE].labels(**labels).observe(
final_time - self._msg_start.pop(hash_message)
self.metrics[state].labels(**labels).inc()
self.metrics[self.__definitions.TIME_MEASURE].labels(
**labels).observe(
final_time - self._msg_start.pop(hash_message)
)

async def on_success(self, message: RabbitMQMessage):
await self._on_complete(message, self.D_SUCCESS)
await self._on_complete(message, self.__definitions.SUCCESS)

async def on_fail(self, message: RabbitMQMessage, error: Exception):
await self._on_complete(message, self.D_FAIL, error)
await self._on_complete(message, self.__definitions.FAIL, error)

async def __call__(self, req: web.Request):
return web.Response(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
from prometheus_client import (
CollectorRegistry,
Counter,
Expand All @@ -7,26 +8,64 @@
from barterdude.hooks.metrics.prometheus.metrics import Metrics


class Definition:
class Definitions:

MESSAGE_UNITS = "messages"
TIME_UNITS = "seconds"
NAMESPACE = "barterdude"
BEFORE_CONSUME = "before_consume"
SUCCESS = "success"
FAIL = "fail"
TIME_MEASURE = "time_measure"

def __init__(
self,
histogram_buckets: tuple = Histogram.DEFAULT_BUCKETS,
metrics: Metrics = Metrics()
registry: CollectorRegistry,
metrics: Metrics,
labelkeys: Iterable[str],
time_buckets: tuple = Histogram.DEFAULT_BUCKETS
):
self.__histogram_buckets = histogram_buckets
self.__registry = registry
self.__labelkeys = labelkeys
self.__time_buckets = time_buckets
self.__metrics = metrics

@property
def metrics(self):
return self.__metrics
def save_metrics(self):
self._prepare_before_consume(
self.BEFORE_CONSUME,
labelnames=copy.copy(self.__labelkeys),
namespace=self.NAMESPACE,
unit=self.MESSAGE_UNITS,
registry=self.__registry
)
self._prepare_on_complete(
self.SUCCESS,
labelnames=copy.copy(self.__labelkeys),
namespace=self.NAMESPACE,
unit=self.MESSAGE_UNITS,
registry=self.__registry
)
self._prepare_on_complete(
self.FAIL,
labelnames=copy.copy(self.__labelkeys),
namespace=self.NAMESPACE,
unit=self.MESSAGE_UNITS,
registry=self.__registry
)
self._prepare_time_measure(
self.TIME_MEASURE,
labelnames=copy.copy(self.__labelkeys),
namespace=self.NAMESPACE,
unit=self.TIME_UNITS,
registry=self.__registry
)

def prepare_before_consume(
def _prepare_before_consume(
self, name: str, labelnames: Iterable[str] = (),
namespace: str = "", unit: str = "",
registry: CollectorRegistry = CollectorRegistry()):

self.metrics[name] = Counter(
self.__metrics[name] = Counter(
name="received_number_before_consume",
documentation="Messages that worker received from queue(s)",
labelnames=labelnames,
Expand All @@ -35,14 +74,14 @@ def prepare_before_consume(
registry=registry
)

def prepare_on_complete(
def _prepare_on_complete(
self, state: str, labelnames: Iterable[str] = (),
namespace: str = "", unit: str = "",
registry: CollectorRegistry = CollectorRegistry()):

labelnames += ["state", "error"]

self.metrics[state] = Counter(
self.__metrics[state] = Counter(
name=f"consumed_number_on_{state}",
documentation=(f"Messages that worker consumed with {state}"
" from queue(s)"),
Expand All @@ -52,18 +91,18 @@ def prepare_on_complete(
registry=registry
)

def prepare_time_measure(
def _prepare_time_measure(
self, name: str, labelnames: Iterable[str] = (),
namespace: str = "", unit: str = "",
registry: CollectorRegistry = CollectorRegistry()):

labelnames += ["state", "error"]

self.metrics[name] = Histogram(
self.__metrics[name] = Histogram(
name="time_spent_processing_message",
documentation=("Time spent when function was "
"processing a message"),
buckets=self.__histogram_buckets,
buckets=self.__time_buckets,
labelnames=labelnames,
namespace=namespace,
unit=unit,
Expand Down
36 changes: 36 additions & 0 deletions barterdude/hooks/metrics/prometheus/metrics.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,44 @@
from prometheus_client.metrics import MetricWrapperBase
from prometheus_client import (
Counter,
Gauge,
Summary,
Histogram,
Info,
Enum
)
from functools import partial


class Metrics(dict):

def __init__(self, registry):
self.__registry = registry

@property
def counter(self) -> Counter.__class__:
return partial(Counter, registry=self.__registry)

@property
def gauge(self) -> Gauge.__class__:
return partial(Gauge, registry=self.__registry)

@property
def summary(self) -> Summary.__class__:
return partial(Summary, registry=self.__registry)

@property
def histogram(self) -> Histogram.__class__:
return partial(Histogram, registry=self.__registry)

@property
def info(self) -> Info.__class__:
return partial(Info, registry=self.__registry)

@property
def enum(self) -> Enum.__class__:
return partial(Enum, registry=self.__registry)

def __setitem__(self, name: str, metric: MetricWrapperBase):
if name in self:
value = self.__getitem__(name)
Expand Down
Loading

0 comments on commit fab1688

Please sign in to comment.