Add prometheus metrics to pg and redis operations (#1005)
vangheem authored and masipcat committed Dec 1, 2020
1 parent 19299c3 commit 126e16c
Showing 11 changed files with 468 additions and 120 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
@@ -4,7 +4,8 @@ CHANGELOG
6.0.17 (unreleased)
-------------------

- Nothing changed yet.
- Add metrics to pg and redis operations
[vangheem]


6.0.16 (2020-11-27)
36 changes: 33 additions & 3 deletions guillotina/contrib/redis/dm.py
@@ -1,4 +1,5 @@
from guillotina import configure
from guillotina import metrics
from guillotina.contrib.redis import get_driver
from guillotina.files.adapter import DBDataManager
from guillotina.interfaces import IExternalFileStorageManager
@@ -10,6 +11,31 @@
import time


try:
import prometheus_client

REDIS_OPS = prometheus_client.Counter(
"guillotina_dm_redis_ops_total",
"Total count of ops by type of operation and the error if there was.",
labelnames=["type", "error"],
)
REDIS_OPS_PROCESSING_TIME = prometheus_client.Histogram(
"guillotina_dm_redis_ops_processing_time_seconds",
"Histogram of operations processing time by type (in seconds)",
labelnames=["type"],
)

class watch(metrics.watch):
def __init__(self, operation: str):
super().__init__(
counter=REDIS_OPS, histogram=REDIS_OPS_PROCESSING_TIME, labels={"type": operation},
)


except ImportError:
watch = metrics.watch # type: ignore


@configure.adapter(for_=IExternalFileStorageManager, provides=IUploadDataManager, name="redis")
class RedisFileDataManager(DBDataManager):

@@ -23,7 +49,8 @@ async def load(self):
if self._data is None:
redis = await self.get_redis()
key = self.get_key()
data = await redis.get(key)
with watch("get"):
data = await redis.get(key)
if not data:
self._data = {}
else:
@@ -42,7 +69,9 @@ async def _save(self):
redis = await self.get_redis()
key = self.get_key()
self._data["last_activity"] = time.time()
await redis.set(key, orjson.dumps(self._data, default=guillotina_json_default), expire=self._ttl)
value = orjson.dumps(self._data, default=guillotina_json_default)
with watch("set"):
await redis.set(key, value, expire=self._ttl)

async def get_redis(self):
if self._redis is None:
@@ -66,4 +95,5 @@ async def _delete_key(self):
# and clear the cache key
redis = await self.get_redis()
key = self.get_key()
await redis.delete(key)
with watch("delete"):
await redis.delete(key)
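
The pattern above wraps each redis call in a with watch(...) block so that every operation is counted in REDIS_OPS and timed in REDIS_OPS_PROCESSING_TIME. The guillotina.metrics.watch base class that these subclasses extend is not shown in this diff; the following is only a rough sketch of how such a context manager could behave, with the class name, the "none" error label, and the attribute names all assumed for illustration rather than taken from guillotina:

import time
from typing import Dict, Optional


class watch_sketch:
    # Illustrative stand-in for a metrics watch context manager, not the real
    # guillotina.metrics.watch implementation.

    def __init__(self, counter=None, histogram=None, labels: Optional[Dict[str, str]] = None):
        self.counter = counter
        self.histogram = histogram
        self.labels = labels or {}

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Count the operation, labelled by type and by error outcome; using
        # "none" as the no-error value is an assumption made for this sketch.
        error = "none" if exc_type is None else exc_type.__name__
        if self.counter is not None:
            self.counter.labels(error=error, **self.labels).inc()
        if self.histogram is not None:
            # Record the wall-clock duration of the wrapped operation.
            self.histogram.labels(**self.labels).observe(time.time() - self.start)

Because __exit__ runs whether or not the awaited redis call raises, a failed operation still produces a counter sample, just with a different error label.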
54 changes: 44 additions & 10 deletions guillotina/contrib/redis/driver.py
@@ -6,6 +6,7 @@
raise

from guillotina import app_settings
from guillotina import metrics
from guillotina.contrib.redis.exceptions import NoRedisConfigured
from typing import Any
from typing import List
@@ -16,6 +17,31 @@
import logging


try:
import prometheus_client

REDIS_OPS = prometheus_client.Counter(
"guillotina_cache_redis_ops_total",
"Total count of ops by type of operation and the error if there was.",
labelnames=["type", "error"],
)
REDIS_OPS_PROCESSING_TIME = prometheus_client.Histogram(
"guillotina_cache_redis_ops_processing_time_seconds",
"Histogram of operations processing time by type (in seconds)",
labelnames=["type"],
)

class watch(metrics.watch):
def __init__(self, operation: str):
super().__init__(
counter=REDIS_OPS, histogram=REDIS_OPS_PROCESSING_TIME, labels={"type": operation},
)


except ImportError:
watch = metrics.watch # type: ignore


logger = logging.getLogger("guillotina.contrib.redis")


@@ -45,10 +71,12 @@ async def initialize(self, loop):
@backoff.on_exception(backoff.expo, (OSError,), max_time=30, max_tries=4)
async def _connect(self):
settings = app_settings["redis"]
self._pool = await aioredis.create_pool(
(settings["host"], settings["port"]), **settings["pool"], loop=self._loop
)
self._conn = await self._pool.acquire()
with watch("create_pool"):
self._pool = await aioredis.create_pool(
(settings["host"], settings["port"]), **settings["pool"], loop=self._loop
)
with watch("acquire_conn"):
self._conn = await self._pool.acquire()
self._pubsub_subscriptor = aioredis.Redis(self._conn)

async def finalize(self):
@@ -72,18 +100,21 @@ async def set(self, key: str, data: str, *, expire: Optional[int] = None):
args: List[Any] = []
if expire is not None:
args[:] = [b"EX", expire]
ok = await self._pool.execute(b"SET", key, data, *args)
with watch("set"):
ok = await self._pool.execute(b"SET", key, data, *args)
assert ok == b"OK", ok

async def get(self, key: str) -> str:
if self._pool is None:
raise NoRedisConfigured()
return await self._pool.execute(b"GET", key)
with watch("get"):
return await self._pool.execute(b"GET", key)

async def delete(self, key: str):
if self._pool is None:
raise NoRedisConfigured()
await self._pool.execute(b"DEL", key)
with watch("delete"):
await self._pool.execute(b"DEL", key)

async def expire(self, key: str, expire: int):
if self._pool is None:
@@ -100,7 +131,8 @@ async def delete_all(self, keys: List[str]):
raise NoRedisConfigured()
for key in keys:
try:
await self._pool.execute(b"DEL", key)
with watch("delete_many"):
await self._pool.execute(b"DEL", key)
logger.debug("Deleted cache keys {}".format(keys))
except Exception:
logger.warning("Error deleting cache keys {}".format(keys), exc_info=True)
@@ -111,14 +143,16 @@ async def flushall(self, *, async_op: Optional[bool] = False):
ops = [b"FLUSHDB"]
if async_op:
ops.append(b"ASYNC")
await self._pool.execute(*ops)
with watch("flush"):
await self._pool.execute(*ops)

# PUBSUB API

async def publish(self, channel_name: str, data: str):
if self._pool is None:
raise NoRedisConfigured()
await self._pool.execute(b"publish", channel_name, data)
with watch("publish"):
await self._pool.execute(b"publish", channel_name, data)

async def unsubscribe(self, channel_name: str):
if self._pubsub_subscriptor is None:
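
prometheus_client Counter and Histogram objects register themselves on the library's default registry at import time, so the guillotina_dm_redis_ops_* and guillotina_cache_redis_ops_* families added here become visible through whatever metrics exposition the application already runs. As a hypothetical way to inspect them during development (not part of this commit), the default registry can be rendered directly:

from prometheus_client import generate_latest

# Render the default registry in the Prometheus text exposition format and
# print only the metric families introduced by this change.
for line in generate_latest().decode("utf-8").splitlines():
    if line.startswith(("guillotina_dm_redis_ops", "guillotina_cache_redis_ops")):
        print(line)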