Skip to content

Commit

Permalink
[Alerts] Populate event cache upon api restart (#5683)
Browse files Browse the repository at this point in the history
  • Loading branch information
yanburman committed Jun 5, 2024
1 parent 2e5f204 commit f791264
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 6 deletions.
5 changes: 4 additions & 1 deletion mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,10 @@
"grafana_url": "",
"alerts": {
# supported modes: "enabled", "disabled".
"mode": "enabled"
"mode": "enabled",
# maximum number of alerts we allow to be configured.
# user will get an error when exceeding this
"max_allowed": 1000,
},
"auth_with_client_id": {
"enabled": False,
Expand Down
2 changes: 2 additions & 0 deletions server/api/api/endpoints/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ async def delete_alert(
project=project, name=name, request=request
)

logger.debug("Deleting alert", project=project, name=name)

await run_in_threadpool(
server.api.crud.Alerts().delete_alert, db_session, project, name
)
Expand Down
51 changes: 49 additions & 2 deletions server/api/crud/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import sqlalchemy.orm

import mlrun.common.schemas
import mlrun.utils.singleton
import server.api.api.utils
import server.api.utils.helpers
import server.api.utils.singletons.db
from mlrun.config import config as mlconfig
from mlrun.utils import logger
from server.api.utils.notification_pusher import AlertNotificationPusher

Expand All @@ -45,12 +47,22 @@ def store_alert(

if alert is not None:
self._delete_notifications(alert)
else:
num_alerts = (
server.api.utils.singletons.db.get_db().get_num_configured_alerts(
session
)
)
if num_alerts >= mlconfig.alerts.max_allowed:
raise mlrun.errors.MLRunPreconditionFailedError(
f"Allowed number of alerts exceeded: {num_alerts}"
)

self._validate_and_mask_notifications(alert_data)

if alert is not None:
for kind in alert.trigger.events:
server.api.crud.Events().remove_event_configuration(project, kind)
server.api.crud.Events().remove_event_configuration(project, kind, name)
alert_data.created = alert.created
alert_data.id = alert.id

Expand Down Expand Up @@ -116,7 +128,7 @@ def delete_alert(
return

for kind in alert.trigger.events:
server.api.crud.Events().remove_event_configuration(project, kind)
server.api.crud.Events().remove_event_configuration(project, kind, name)

server.api.utils.singletons.db.get_db().delete_alert(session, project, name)

Expand Down Expand Up @@ -191,6 +203,41 @@ def process_event(
event=event_data.entity.ids[0],
)

def populate_event_cache(self, session: sqlalchemy.orm.Session):
    """Warm the events cache from the alerts stored in the DB.

    The loading itself is delegated to ``_try_populate_event_cache``. If it
    raises, the error is logged, ``mlconfig.httpdb.state`` is switched to
    offline and the cache stays marked as uninitialized. Otherwise the
    ``cache_initialized`` flag is set on ``server.api.crud.Events()``.

    :param session: DB session handed to the loader.
    """
    try:
        self._try_populate_event_cache(session)
    except Exception as exc:
        error_text = mlrun.errors.err_to_str(exc)
        logger.error(
            "Error populating event cache for alerts. Transitioning state to offline!",
            exc=error_text,
        )
        mlconfig.httpdb.state = mlrun.common.schemas.APIStates.offline
    else:
        # NOTE(review): the flag is written on the object returned by Events();
        # presumably that is a shared (singleton) instance - confirm.
        events = server.api.crud.Events()
        events.cache_initialized = True
        logger.debug("Finished populating event cache for alerts")

@staticmethod
def _try_populate_event_cache(session: sqlalchemy.orm.Session):
    """Register every stored alert's trigger events with the events cache.

    Reads all alerts through the DB singleton and, for each event listed in
    an alert's trigger, calls ``Events().add_event_configuration`` with the
    alert's project, the event and the alert's name. Exceptions are not
    handled here; they propagate to the caller.

    :param session: DB session used to read the alerts.
    """
    stored_alerts = server.api.utils.singletons.db.get_db().get_all_alerts(session)
    for stored in stored_alerts:
        for kind in stored.trigger.events:
            server.api.crud.Events().add_event_configuration(
                stored.project, kind, stored.name
            )

def process_event_no_cache(
    self,
    session: sqlalchemy.orm.Session,
    event_name: str,
    event_data: mlrun.common.schemas.Event,
):
    """Dispatch an event by scanning the DB instead of the events cache.

    Every alert returned by ``get_all_alerts`` is inspected and
    ``process_event`` is invoked once per trigger entry equal to
    ``event_name``.

    NOTE(review): alerts are matched on the event name only; no project
    filter is applied here, so alerts of every project are considered.
    Confirm this is intended.
    NOTE(review): ``session`` is used for the DB read but is not forwarded
    to ``process_event`` - verify against that method's signature.

    :param session: DB session used to list the alerts.
    :param event_name: event kind to look for in each alert's trigger.
    :param event_data: event payload passed through to ``process_event``.
    """
    db = server.api.utils.singletons.db.get_db()
    for candidate in db.get_all_alerts(session):
        for configured_event in candidate.trigger.events:
            if configured_event != event_name:
                continue
            self.process_event(
                candidate.project, event_name, candidate.name, event_data
            )

@staticmethod
def _event_entity_matches(alert_entity, event_entity):
if "*" in alert_entity.ids:
Expand Down
16 changes: 13 additions & 3 deletions server/api/crud/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Events(
):
# We cache alert names keyed by (project, event name).
# The dict is declared on the class, so it is shared by all instances.
# The key is annotated as tuple[str, str]; `(str, str)` is not a valid type expression.
_cache: dict[tuple[str, str], list[str]] = {}
# False until Alerts.populate_event_cache finishes successfully; while False,
# process_event falls back to Alerts.process_event_no_cache.
cache_initialized = False

@staticmethod
def is_valid_event(project: str, event_data: mlrun.common.schemas.Event):
Expand All @@ -41,13 +42,16 @@ def is_valid_event(project: str, event_data: mlrun.common.schemas.Event):
def add_event_configuration(self, project, name, alert_name):
    """Record that ``alert_name`` is triggered by event ``name`` in ``project``.

    The alert name is appended to the list cached under the
    ``(project, name)`` key; the list is created on first use.
    """
    key = (project, name)
    configured = self._cache.setdefault(key, [])
    configured.append(alert_name)

# NOTE(review): this looks like the pre-change (removed) side of the diff; the
# three-argument definition that follows replaces it - confirm against the repository.
# Deletes the whole cache entry for (project, name), whichever alert asked for it;
# `del` raises KeyError when the key is not cached.
def remove_event_configuration(self, project, name):
del self._cache[(project, name)]
def remove_event_configuration(self, project, name, alert_name):
    """Detach ``alert_name`` from the cached ``(project, name)`` entry.

    The cache entry is dropped once its last alert has been removed.
    A key or alert name that is not cached is ignored rather than raising
    ``KeyError`` / ``ValueError``, so deleting or updating an alert does not
    fail just because this in-memory cache has no record of it.

    :param project: project the event configuration belongs to.
    :param name: event name (second half of the cache key).
    :param alert_name: alert to detach from that event.
    """
    key = (project, name)
    alerts = self._cache.get(key)
    if alerts is None:
        # nothing cached for this project/event - nothing to remove
        return

    if alert_name in alerts:
        alerts.remove(alert_name)

    if not alerts:
        self._cache.pop(key, None)

# Drop every cached (project, event name) entry that belongs to `project`.
def delete_project_alert_events(self, project):
# collect the matching event names first so the dict is not mutated while it is iterated
to_delete = [name for proj, name in self._cache if proj == project]
for name in to_delete:
# NOTE(review): this page is a diff rendered without +/- markers; the next two lines
# look like the removed and added sides of one change (old: delegate to
# remove_event_configuration, new: pop the key directly) - confirm before editing.
self.remove_event_configuration(project, name)
self._cache.pop((project, name))

def process_event(
self,
Expand All @@ -66,6 +70,12 @@ def process_event(

event_data.timestamp = datetime.datetime.now(datetime.timezone.utc)

if not self.cache_initialized:
server.api.crud.Alerts().process_event_no_cache(
session, event_name, event_data
)
return

try:
for name in self._cache[(project, event_name)]:
server.api.crud.Alerts().process_event(
Expand Down
8 changes: 8 additions & 0 deletions server/api/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,14 @@ def list_alert_templates(self, session) -> list[mlrun.common.schemas.AlertTempla
def store_alert(self, session, alert: mlrun.common.schemas.AlertConfig):
pass

@abstractmethod
def get_all_alerts(self, session) -> list[mlrun.common.schemas.AlertConfig]:
    """Return the stored alert configurations as schema objects.

    Abstract: concrete DB classes provide the implementation.

    :param session: DB session to read with.
    """

@abstractmethod
def get_num_configured_alerts(self, session) -> int:
    """Return how many alerts are currently configured.

    Abstract: concrete DB classes provide the implementation.

    :param session: DB session to read with.
    """

@abstractmethod
def store_alert_notifications(
self,
Expand Down
11 changes: 11 additions & 0 deletions server/api/db/sqldb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pytz
from sqlalchemy import MetaData, and_, distinct, func, or_, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import Session

import mlrun
Expand Down Expand Up @@ -3781,6 +3782,9 @@ def _query(self, session, cls, **kw):
kw = {k: v for k, v in kw.items() if v is not None}
return session.query(cls).filter_by(**kw)

def _get_count(self, session, cls):
    """Return the number of rows of ``cls``.

    The count is taken over the first primary-key column reported by
    ``inspect(cls)``.

    :param session: DB session to run the query on.
    :param cls: mapped class whose rows are counted.
    """
    first_pk_column = inspect(cls).primary_key[0]
    count_query = session.query(func.count(first_pk_column))
    return count_query.scalar()

def _find_or_create_users(self, session, user_names):
users = list(self._query(session, User).filter(User.name.in_(user_names)))
new = set(user_names) - {user.name for user in users}
Expand Down Expand Up @@ -4390,6 +4394,13 @@ def get_alert_template(
self._get_alert_template_record(session, name)
)

def get_all_alerts(self, session) -> list[mlrun.common.schemas.AlertConfig]:
    """Load every ``AlertConfig`` record and convert each one to its schema form.

    No filters are applied to the query.

    :param session: DB session to query with.
    """
    to_schema = self._transform_alert_config_record_to_schema
    records = self._query(session, AlertConfig).all()
    return [to_schema(record) for record in records]

def get_num_configured_alerts(self, session) -> int:
    """Return the number of ``AlertConfig`` rows, as computed by ``_get_count``.

    :param session: DB session to query with.
    """
    configured_total = self._get_count(session, AlertConfig)
    return configured_total

def store_alert(
self, session, alert: mlrun.common.schemas.AlertConfig
) -> mlrun.common.schemas.AlertConfig:
Expand Down
4 changes: 4 additions & 0 deletions server/api/initial_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def init_data(


def update_default_configuration_data():
server.api.db.session.run_function_with_new_db_session(
server.api.crud.Alerts().populate_event_cache
)

logger.debug("Updating default configuration data")
db_session = create_session()
try:
Expand Down

0 comments on commit f791264

Please sign in to comment.