diff --git a/openwisp_monitoring/check/classes/base.py b/openwisp_monitoring/check/classes/base.py index 71ef7582d..1c16f5201 100644 --- a/openwisp_monitoring/check/classes/base.py +++ b/openwisp_monitoring/check/classes/base.py @@ -35,15 +35,15 @@ def _get_or_create_metric(self, configuration=None): Gets or creates metric """ check = self.check_instance - if check.object_id and check.content_type: + if check.object_id and check.content_type_id: obj_id = check.object_id - ct = check.content_type + ct = ContentType.objects.get_for_id(check.content_type_id) else: obj_id = str(check.id) ct = ContentType.objects.get_for_model(Check) options = dict( object_id=obj_id, - content_type=ct, + content_type_id=ct.id, configuration=configuration or self.__class__.__name__.lower(), ) metric, created = Metric._get_or_create(**options) diff --git a/openwisp_monitoring/check/tasks.py b/openwisp_monitoring/check/tasks.py index 75e8e22d3..3f56878ae 100644 --- a/openwisp_monitoring/check/tasks.py +++ b/openwisp_monitoring/check/tasks.py @@ -84,7 +84,7 @@ def auto_create_ping( if has_check: return content_type_model = content_type_model or ContentType - ct = content_type_model.objects.get(app_label=app_label, model=model) + ct = content_type_model.objects.get_by_natural_key(app_label=app_label, model=model) check = Check( name='Ping', check_type=ping_path, content_type=ct, object_id=object_id ) @@ -108,7 +108,7 @@ def auto_create_config_check( if has_check: return content_type_model = content_type_model or ContentType - ct = content_type_model.objects.get(app_label=app_label, model=model) + ct = content_type_model.objects.get_by_natural_key(app_label=app_label, model=model) check = Check( name='Configuration Applied', check_type=config_check_path, @@ -135,7 +135,7 @@ def auto_create_iperf3_check( if has_check: return content_type_model = content_type_model or ContentType - ct = content_type_model.objects.get(app_label=app_label, model=model) + ct = content_type_model.objects.get_by_natural_key(app_label=app_label, model=model) check = Check( name='Iperf3', check_type=iperf3_check_path, diff --git a/openwisp_monitoring/check/tests/test_models.py b/openwisp_monitoring/check/tests/test_models.py index 26605e6d7..0f71fa94d 100644 --- a/openwisp_monitoring/check/tests/test_models.py +++ b/openwisp_monitoring/check/tests/test_models.py @@ -169,7 +169,9 @@ def test_config_modified_device_problem(self): self.assertEqual(m.key, 'config_applied') dm = d.monitoring dm.refresh_from_db() + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) self.assertEqual(m.is_healthy, False) + self.assertEqual(m.is_healthy_tolerant, False) self.assertEqual(dm.status, 'problem') self.assertEqual(Notification.objects.count(), 1) diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py index d9e4ffa53..338258c9d 100644 --- a/openwisp_monitoring/db/backends/influxdb/client.py +++ b/openwisp_monitoring/db/backends/influxdb/client.py @@ -117,6 +117,10 @@ def dbs(self): dbs['__all__'] = dbs['default'] return dbs + @cached_property + def use_udp(self): + return TIMESERIES_DB.get('OPTIONS', {}).get('udp_writes', False) + @retry def create_or_alter_retention_policy(self, name, duration): """creates or alters existing retention policy if necessary""" diff --git a/openwisp_monitoring/db/backends/influxdb/tests.py b/openwisp_monitoring/db/backends/influxdb/tests.py index e9bdec2a0..c87d09c59 100644 --- a/openwisp_monitoring/db/backends/influxdb/tests.py +++ b/openwisp_monitoring/db/backends/influxdb/tests.py @@ -331,7 +331,7 @@ def test_timeseries_write_params(self, mock_write): retention_policy=None, tags={}, # this should be the original time at the moment of first failure - timestamp='2020-01-14T00:00:00Z', + timestamp=datetime(2020, 1, 14, tzinfo=tz('UTC')).isoformat(), current=False, ) diff --git a/openwisp_monitoring/device/api/views.py b/openwisp_monitoring/device/api/views.py index 6a8c33b68..364b20bed 100644 --- a/openwisp_monitoring/device/api/views.py +++ b/openwisp_monitoring/device/api/views.py @@ -2,6 +2,7 @@ import uuid from datetime import datetime +from cache_memoize import cache_memoize from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from django.db.models import Count, Q @@ -45,6 +46,17 @@ Location = load_model('geo', 'Location') +def get_device_args_rewrite(view, pk): + """ + Use only the PK parameter for calculating the cache key + """ + try: + pk = uuid.UUID(pk) + except ValueError: + return pk + return pk.hex + + class DeviceMetricView(MonitoringApiViewMixin, GenericAPIView): """ Retrieve device information, monitoring status (health status), @@ -57,15 +69,23 @@ class DeviceMetricView(MonitoringApiViewMixin, GenericAPIView): """ model = DeviceData - queryset = ( - DeviceData.objects.select_related('devicelocation') - .select_related('monitoring') - .all() - ) + queryset = DeviceData.objects.only( + 'id', + 'key', + ).all() serializer_class = serializers.Serializer permission_classes = [DevicePermission] schema = schema + @classmethod + def invalidate_get_device_cache(cls, instance, **kwargs): + """ + Called from signal receiver which performs cache invalidation + """ + view = cls() + view.get_object.invalidate(view, str(instance.pk)) + logger.debug(f'invalidated view cache for device ID {instance.pk}') + def get_permissions(self): if self.request.method in SAFE_METHODS and not self.request.query_params.get( 'key' @@ -84,7 +104,7 @@ def get(self, request, pk): pk = str(uuid.UUID(pk)) except ValueError: return Response({'detail': 'not found'}, status=404) - self.instance = self.get_object() + self.instance = self.get_object(pk) response = super().get(request, pk) if not request.query_params.get('csv'): charts_data = dict(response.data) @@ -97,7 +117,7 @@ def get(self, request, pk): def _get_charts(self, request, *args, **kwargs): ct = ContentType.objects.get_for_model(Device) return Chart.objects.filter( - metric__object_id=args[0], metric__content_type=ct + metric__object_id=args[0], metric__content_type_id=ct.id ).select_related('metric') def _get_additional_data(self, request, *args, **kwargs): @@ -105,8 +125,12 @@ def _get_additional_data(self, request, *args, **kwargs): return {'data': self.instance.data} return {} + @cache_memoize(24 * 60 * 60, args_rewrite=get_device_args_rewrite) + def get_object(self, pk): + return super().get_object() + def post(self, request, pk): - self.instance = self.get_object() + self.instance = self.get_object(pk) self.instance.data = request.data # validate incoming data try: diff --git a/openwisp_monitoring/device/apps.py b/openwisp_monitoring/device/apps.py index 220a636f5..7fd19a70c 100644 --- a/openwisp_monitoring/device/apps.py +++ b/openwisp_monitoring/device/apps.py @@ -43,6 +43,7 @@ def ready(self): self.connect_is_working_changed() self.connect_device_signals() self.connect_config_status_changed() + self.connect_wifi_client_signals() self.connect_offline_device_close_wifisession() self.device_recovery_detection() self.set_update_config_model() @@ -51,19 +52,48 @@ def ready(self): self.add_connection_ignore_notification_reasons() def connect_device_signals(self): + from .api.views import DeviceMetricView + Device = load_model('config', 'Device') + DeviceData = load_model('device_monitoring', 'DeviceData') + DeviceLocation = load_model('geo', 'DeviceLocation') post_save.connect( self.device_post_save_receiver, sender=Device, dispatch_uid='device_post_save_receiver', ) + post_save.connect( + DeviceData.invalidate_cache, + sender=Device, + dispatch_uid='post_save_device_invalidate_devicedata_cache', + ) + post_save.connect( + DeviceData.invalidate_cache, + sender=DeviceLocation, + dispatch_uid='post_save_devicelocation_invalidate_devicedata_cache', + ) post_delete.connect( self.device_post_delete_receiver, sender=Device, dispatch_uid='device_post_delete_receiver', ) + post_delete.connect( + DeviceMetricView.invalidate_get_device_cache, + sender=Device, + dispatch_uid=('device_post_delete_invalidate_view_cache'), + ) + post_delete.connect( + DeviceData.invalidate_cache, + sender=Device, + dispatch_uid='post_delete_device_invalidate_devicedata_cache', + ) + post_delete.connect( + DeviceData.invalidate_cache, + sender=DeviceLocation, + dispatch_uid='post_delete_devicelocation_invalidate_devicedata_cache', + ) @classmethod def device_post_save_receiver(cls, instance, created, **kwargs): @@ -176,6 +206,22 @@ def connect_config_status_changed(cls): dispatch_uid='monitoring.config_status_changed_receiver', ) + @classmethod + def connect_wifi_client_signals(cls): + if not app_settings.WIFI_SESSIONS_ENABLED: + return + WifiClient = load_model('device_monitoring', 'WifiClient') + post_save.connect( + WifiClient.invalidate_cache, + sender=WifiClient, + dispatch_uid='post_save_invalidate_wificlient_cache', + ) + post_delete.connect( + WifiClient.invalidate_cache, + sender=WifiClient, + dispatch_uid='post_delete_invalidate_wificlient_cache', + ) + @classmethod def connect_offline_device_close_wifisession(cls): if not app_settings.WIFI_SESSIONS_ENABLED: diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index 2fc564c03..99295192c 100644 --- a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -54,6 +54,31 @@ def __init__(self, *args, **kwargs): self.writer = DeviceDataWriter(self) super().__init__(*args, **kwargs) + @classmethod + @cache_memoize(24 * 60 * 60) + def get_devicedata(cls, pk): + obj = ( + cls.objects.select_related('devicelocation') + .only( + 'id', + 'organization_id', + 'devicelocation__location_id', + 'devicelocation__floorplan_id', + ) + .get(id=pk) + ) + return obj + + @classmethod + def invalidate_cache(cls, instance, *args, **kwargs): + if isinstance(instance, load_model('geo', 'DeviceLocation')): + pk = instance.content_object_id + else: + if kwargs.get('created'): + return + pk = instance.pk + cls.get_devicedata.invalidate(cls, str(pk)) + def can_be_updated(self): """ Do not attempt at pushing the conf if the device is not reachable @@ -244,7 +269,7 @@ def save_data(self, time=None): self._transform_data() time = time or now() options = dict(tags={'pk': self.pk}, timestamp=time, retention_policy=SHORT_RP) - timeseries_write.delay(name=self.__key, values={'data': self.json()}, **options) + timeseries_write(name=self.__key, values={'data': self.json()}, **options) cache_key = get_device_cache_key(device=self, context='current-data') # cache current data to allow getting it without querying the timeseries DB cache.set( @@ -258,13 +283,54 @@ def save_data(self, time=None): timeout=86400, # 24 hours ) if app_settings.WIFI_SESSIONS_ENABLED: - tasks.save_wifi_clients_and_sessions.delay( - device_data=self.data, device_pk=self.pk - ) + self.save_wifi_clients_and_sessions() def json(self, *args, **kwargs): return json.dumps(self.data, *args, **kwargs) + def save_wifi_clients_and_sessions(self): + _WIFICLIENT_FIELDS = ['vendor', 'ht', 'vht', 'he', 'wmm', 'wds', 'wps'] + WifiClient = load_model('device_monitoring', 'WifiClient') + WifiSession = load_model('device_monitoring', 'WifiSession') + + active_sessions = [] + interfaces = self.data.get('interfaces', []) + for interface in interfaces: + if interface.get('type') != 'wireless': + continue + interface_name = interface.get('name') + wireless = interface.get('wireless', {}) + if not wireless or wireless['mode'] != 'access_point': + continue + ssid = wireless.get('ssid') + clients = wireless.get('clients', []) + for client in clients: + # Save WifiClient + client_obj = WifiClient.get_wifi_client(client.get('mac')) + update_fields = [] + for field in _WIFICLIENT_FIELDS: + if getattr(client_obj, field) != client.get(field): + setattr(client_obj, field, client.get(field)) + update_fields.append(field) + if update_fields: + client_obj.full_clean() + client_obj.save(update_fields=update_fields) + + # Save WifiSession + session_obj, _ = WifiSession.objects.get_or_create( + device_id=self.id, + interface_name=interface_name, + ssid=ssid, + wifi_client=client_obj, + stop_time=None, + ) + active_sessions.append(session_obj.pk) + + # Close open WifiSession + WifiSession.objects.filter(device_id=self.id, stop_time=None,).exclude( + pk__in=active_sessions + ).update(stop_time=now()) + class AbstractDeviceMonitoring(TimeStampedEditableModel): device = models.OneToOneField( @@ -385,6 +451,18 @@ class Meta: abstract = True verbose_name = _('WiFi Client') + @classmethod + @cache_memoize(24 * 60 * 60) + def get_wifi_client(cls, mac_address): + wifi_client, _ = cls.objects.get_or_create(mac_address=mac_address) + return wifi_client + + @classmethod + def invalidate_cache(cls, instance, *args, **kwargs): + if kwargs.get('created'): + return + cls.get_wifi_client.invalidate(cls, instance.mac_address) + class AbstractWifiSession(TimeStampedEditableModel): created = None diff --git a/openwisp_monitoring/device/tasks.py b/openwisp_monitoring/device/tasks.py index aabaf38ab..6799acd3c 100644 --- a/openwisp_monitoring/device/tasks.py +++ b/openwisp_monitoring/device/tasks.py @@ -36,53 +36,6 @@ def trigger_device_checks(pk, recovery=True): device.monitoring.update_status(status) -@shared_task(base=OpenwispCeleryTask) -def save_wifi_clients_and_sessions(device_data, device_pk): - _WIFICLIENT_FIELDS = ['vendor', 'ht', 'vht', 'he', 'wmm', 'wds', 'wps'] - WifiClient = load_model('device_monitoring', 'WifiClient') - WifiSession = load_model('device_monitoring', 'WifiSession') - - active_sessions = [] - interfaces = device_data.get('interfaces', []) - for interface in interfaces: - if interface.get('type') != 'wireless': - continue - interface_name = interface.get('name') - wireless = interface.get('wireless', {}) - if not wireless or wireless['mode'] != 'access_point': - continue - ssid = wireless.get('ssid') - clients = wireless.get('clients', []) - for client in clients: - # Save WifiClient - client_obj, created = WifiClient.objects.get_or_create( - mac_address=client.get('mac') - ) - update_fields = [] - for field in _WIFICLIENT_FIELDS: - if getattr(client_obj, field) != client.get(field): - setattr(client_obj, field, client.get(field)) - update_fields.append(field) - if update_fields: - client_obj.full_clean() - client_obj.save(update_fields=update_fields) - - # Save WifiSession - session_obj, _ = WifiSession.objects.get_or_create( - device_id=device_pk, - interface_name=interface_name, - ssid=ssid, - wifi_client=client_obj, - stop_time=None, - ) - active_sessions.append(session_obj.pk) - - # Close open WifiSession - WifiSession.objects.filter(device_id=device_pk, stop_time=None,).exclude( - pk__in=active_sessions - ).update(stop_time=now()) - - @shared_task(base=OpenwispCeleryTask) def delete_wifi_clients_and_sessions(days=6 * 30): WifiClient = load_model('device_monitoring', 'WifiClient') @@ -106,7 +59,7 @@ def offline_device_close_session(device_id): def write_device_metrics(pk, data, time=None, current=False): DeviceData = load_model('device_monitoring', 'DeviceData') try: - device_data = DeviceData.objects.get(id=pk) + device_data = DeviceData.get_devicedata(str(pk)) except DeviceData.DoesNotExist: return device_data.writer.write(data, time, current) diff --git a/openwisp_monitoring/device/tests/test_admin.py b/openwisp_monitoring/device/tests/test_admin.py index e187d0db5..094517472 100644 --- a/openwisp_monitoring/device/tests/test_admin.py +++ b/openwisp_monitoring/device/tests/test_admin.py @@ -3,6 +3,7 @@ from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission from django.contrib.contenttypes.forms import generic_inlineformset_factory +from django.core.cache import cache from django.test import TestCase from django.urls import reverse from django.utils.timezone import now, timedelta @@ -81,6 +82,9 @@ class TestAdmin( def setUp(self): self._login_admin() + def tearDown(self): + cache.clear() + def _login_admin(self): u = User.objects.create_superuser('admin', 'admin', 'test@test.com') self.client.force_login(u) diff --git a/openwisp_monitoring/device/tests/test_api.py b/openwisp_monitoring/device/tests/test_api.py index 9f2311b8a..1af76aa77 100644 --- a/openwisp_monitoring/device/tests/test_api.py +++ b/openwisp_monitoring/device/tests/test_api.py @@ -3,6 +3,7 @@ from unittest.mock import patch from django.contrib.auth import get_user_model +from django.core.cache import cache from django.urls import reverse from django.utils import timezone from rest_framework.authtoken.models import Token @@ -66,6 +67,10 @@ class TestDeviceApi(AuthenticationMixin, TestGeoMixin, DeviceMonitoringTestCase) def setUp(self): self._login_admin() + def tearDown(self): + super().tearDown() + cache.clear() + def _login_admin(self): u = User.objects.create_superuser('admin', 'admin', 'test@test.com') self.client.force_login(u) @@ -140,17 +145,22 @@ def test_200_none(self): o = self._create_org() d = self._create_device(organization=o) data = {'type': 'DeviceMonitoring', 'interfaces': []} - r = self._post_data(d.id, d.key, data) + with self.assertNumQueries(4): + r = self._post_data(d.id, d.key, data) self.assertEqual(r.status_code, 200) # Add 1 for general metric and chart self.assertEqual(self.metric_queryset.count(), 0) self.assertEqual(self.chart_queryset.count(), 0) data = {'type': 'DeviceMonitoring'} - r = self._post_data(d.id, d.key, data) + with self.assertNumQueries(2): + r = self._post_data(d.id, d.key, data) self.assertEqual(r.status_code, 200) # Add 1 for general metric and chart self.assertEqual(self.metric_queryset.count(), 0) self.assertEqual(self.chart_queryset.count(), 0) + d.delete() + r = self._post_data(d.id, d.key, data) + self.assertEqual(r.status_code, 404) def test_200_create(self): self.create_test_data(no_resources=True) @@ -241,7 +251,12 @@ def test_device_with_location(self): # creation of resources metrics can be avoided here as it is not involved # this speeds up the test by reducing requests made del data2['resources'] - response = self._post_data(device.id, device.key, data2) + additional_queries = 0 if self._is_timeseries_udp_writes else 1 + with self.assertNumQueries(17 + additional_queries): + response = self._post_data(device.id, device.key, data2) + # Ensure cache is working + with self.assertNumQueries(13 + additional_queries): + response = self._post_data(device.id, device.key, data2) self.assertEqual(response.status_code, 200) # Add 1 for general metric and chart self.assertEqual(self.metric_queryset.count(), 4) @@ -1171,7 +1186,7 @@ def test_post_metric_write_signal(self): signal=post_metric_write, sender=Metric, values=values, - time=start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + time=start_time.isoformat(), current=False, ) self.assertEqual(signal_calls[0][1], expected_arguments) diff --git a/openwisp_monitoring/device/tests/test_models.py b/openwisp_monitoring/device/tests/test_models.py index b2c93e655..a6331b7fb 100644 --- a/openwisp_monitoring/device/tests/test_models.py +++ b/openwisp_monitoring/device/tests/test_models.py @@ -726,6 +726,10 @@ class TestWifiClientSession(TestWifiClientSessionMixin, TestCase): wifi_session_model = WifiSession device_data_model = DeviceData + def tearDown(self): + super().tearDown() + cache.clear() + def test_wifi_client_session_created(self): data = self._sample_data data['interfaces'].append(self.mesh_interface) @@ -842,7 +846,7 @@ def test_database_queries(self): with self.subTest('Test new sessions for existing clients'): data = deepcopy(self._sample_data) - with self.assertNumQueries(16): + with self.assertNumQueries(13): self._save_device_data(device_data, data) @patch.object(app_settings, 'WIFI_SESSIONS_ENABLED', False) diff --git a/openwisp_monitoring/device/writer.py b/openwisp_monitoring/device/writer.py index 40a8a467d..6cae5e909 100644 --- a/openwisp_monitoring/device/writer.py +++ b/openwisp_monitoring/device/writer.py @@ -95,7 +95,7 @@ def write(self, data, time=None, current=False): name = f'{ifname} traffic' metric, created = Metric._get_or_create( object_id=self.device_data.pk, - content_type=ct, + content_type_id=ct.id, configuration='traffic', name=name, key='traffic', @@ -116,7 +116,7 @@ def write(self, data, time=None, current=False): name = '{0} wifi clients'.format(ifname) metric, created = Metric._get_or_create( object_id=self.device_data.pk, - content_type=ct, + content_type_id=ct.id, configuration='clients', name=name, key='wifi_clients', @@ -211,7 +211,7 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No if signal_strength is not None or signal_power is not None: metric, created = Metric._get_or_create( object_id=self.device_data.pk, - content_type=ct, + content_type_id=ct.id, configuration='signal_strength', name='signal strength', key=ifname, @@ -239,7 +239,7 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No if snr is not None or signal_quality is not None: metric, created = Metric._get_or_create( object_id=self.device_data.pk, - content_type=ct, + content_type_id=ct.id, configuration='signal_quality', name='signal quality', key=ifname, @@ -252,7 +252,7 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No # create access technology chart metric, created = Metric._get_or_create( object_id=self.device_data.pk, - content_type=ct, + content_type_id=ct.id, configuration='access_tech', name='access technology', key=ifname, @@ -275,7 +275,7 @@ def _write_cpu( 'load_15': float(load[2]), } metric, created = Metric._get_or_create( - object_id=primary_key, content_type=content_type, configuration='cpu' + object_id=primary_key, content_type_id=content_type.id, configuration='cpu' ) if created: self._create_resources_chart(metric, resource='cpu') @@ -297,7 +297,7 @@ def _write_disk( size_bytes += disk['size_bytes'] available_bytes += disk['available_bytes'] metric, created = Metric._get_or_create( - object_id=primary_key, content_type=content_type, configuration='disk' + object_id=primary_key, content_type_id=content_type.id, configuration='disk' ) if created: self._create_resources_chart(metric, resource='disk') @@ -328,7 +328,9 @@ def _write_memory( 1 - (memory['available'] + memory['buffered']) / memory['total'] ) metric, created = Metric._get_or_create( - object_id=primary_key, content_type=content_type, configuration='memory' + object_id=primary_key, + content_type_id=content_type.id, + configuration='memory', ) if created: self._create_resources_chart(metric, resource='memory') diff --git a/openwisp_monitoring/monitoring/apps.py b/openwisp_monitoring/monitoring/apps.py index b66d6596f..86d337ee3 100644 --- a/openwisp_monitoring/monitoring/apps.py +++ b/openwisp_monitoring/monitoring/apps.py @@ -1,6 +1,6 @@ from django.apps import AppConfig from django.conf import settings -from django.db.models.signals import post_delete +from django.db.models.signals import post_delete, post_save from django.utils.translation import gettext_lazy as _ from swapper import get_model_name, load_model @@ -49,8 +49,29 @@ def register_menu_groups(self): def connect_metric_signals(self): Metric = load_model('monitoring', 'Metric') + AlertSettings = load_model('monitoring', 'AlertSettings') post_delete.connect( Metric.post_delete_receiver, sender=Metric, dispatch_uid='metric_post_delete_receiver', ) + post_save.connect( + Metric.invalidate_cache, + sender=Metric, + dispatch_uid='post_save_invalidate_metric_cache', + ) + post_delete.connect( + Metric.invalidate_cache, + sender=Metric, + dispatch_uid='post_delete_invalidate_metric_cache', + ) + post_save.connect( + AlertSettings.invalidate_cache, + sender=AlertSettings, + dispatch_uid='post_save_invalidate_metric_cache', + ) + post_delete.connect( + AlertSettings.invalidate_cache, + sender=AlertSettings, + dispatch_uid='post_delete_invalidate_metric_cache', + ) diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index ea3ae6d26..2e95dbf03 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -9,6 +9,7 @@ from django.contrib.auth import get_user_model from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType +from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.validators import MaxValueValidator from django.db import models @@ -39,6 +40,22 @@ logger = logging.getLogger(__name__) +def get_or_create_metric_cache_key(**kwargs): + cache_key = [ + 'get_or_create_metric', + ] + cache_key.append( + 'key={}'.format( + kwargs.get('key', kwargs.get('name', kwargs.get('configuration'))) + ) + ) + for key in ['content_type_id', 'object_id', 'configuration']: + cache_key.append('{}={}'.format(key, kwargs.get(key))) + cache_key.append('main_tags={}'.format(kwargs.get('main_tags', OrderedDict()))) + cache_key = ','.join(cache_key) + return cache_key + + class AbstractMetric(TimeStampedEditableModel): name = models.CharField(max_length=64) key = models.SlugField( @@ -139,7 +156,10 @@ def post_delete_receiver(cls, instance, *args, **kwargs): delete_timeseries.delay(instance.key, instance.tags) @classmethod - def _get_or_create(cls, **kwargs): + def _get_or_create( + cls, + **kwargs, + ): """ like ``get_or_create`` method of django model managers but with validation before creation @@ -151,9 +171,12 @@ def _get_or_create(cls, **kwargs): if lookup_kwargs.get('name'): del lookup_kwargs['name'] extra_tags = lookup_kwargs.pop('extra_tags', {}) - metric = cls.objects.get(**lookup_kwargs) + metric_cache_key = get_or_create_metric_cache_key(**kwargs) + metric = cache.get(metric_cache_key) + if not metric: + metric = cls.objects.get(**lookup_kwargs) + cache.set(metric_cache_key, metric, 24 * 60 * 60) created = False - if extra_tags != metric.extra_tags: metric.extra_tags.update(kwargs['extra_tags']) metric.extra_tags = cls._sort_dict(metric.extra_tags) @@ -162,9 +185,26 @@ def _get_or_create(cls, **kwargs): metric = cls(**kwargs) metric.full_clean() metric.save() + cache.set(metric_cache_key, metric, 24 * 60 * 60) created = True return metric, created + @classmethod + def invalidate_cache(cls, instance, *args, **kwargs): + if kwargs.get('created', False): + return + cache_key = get_or_create_metric_cache_key( + **{ + 'name': instance.name, + 'key': instance.key, + 'content_type_id': instance.content_type_id, + 'object_id': instance.object_id, + 'configuration': instance.configuration, + 'main_tags': instance.main_tags, + } + ) + cache.delete(cache_key) + @property def codename(self): """identifier stored in timeseries db""" @@ -192,7 +232,7 @@ def _makekey(value): @property def tags(self): tags = {} - if self.content_type and self.object_id: + if self.content_type_id and self.object_id: tags.update( { 'content_type': self.content_type_key, @@ -217,7 +257,8 @@ def _sort_dict(dict_): @property def content_type_key(self): try: - return '.'.join(self.content_type.natural_key()) + content_type = ContentType.objects.get_for_id(self.content_type_id) + return '.'.join(content_type.natural_key()) except AttributeError: return None @@ -372,9 +413,12 @@ def write( current=current, ) pre_metric_write.send(**signal_kwargs) + timestamp = time or timezone.now() + if isinstance(timestamp, str): + timestamp = parse_date(timestamp) options = dict( tags=self.tags, - timestamp=time or timezone.now(), + timestamp=timestamp.isoformat(), database=database, retention_policy=retention_policy, current=current, @@ -388,7 +432,7 @@ def write( 'retention_policy': retention_policy, 'send_alert': send_alert, } - options['metric_pk'] = self.pk + options['metric'] = self # if alert_on_related_field then check threshold # on the related_field instead of field_name @@ -405,7 +449,7 @@ def write( {'value': extra_values[self.alert_field]} ) if write: - timeseries_write.delay(name=self.key, values=values, **options) + timeseries_write(name=self.key, values=values, **options) return {'name': self.key, 'values': values, **options} @classmethod @@ -417,7 +461,7 @@ def batch_write(cls, raw_data): write_data.append(metric.write(**kwargs, write=False)) except ValueError as error: error_dict[metric.key] = str(error) - timeseries_batch_write.delay(write_data) + timeseries_batch_write(write_data) if error_dict: raise ValueError(error_dict) @@ -816,6 +860,11 @@ class Meta: ('view_alertsettings_inline', 'Can view Alert settings inline'), ) + @classmethod + def invalidate_cache(cls, instance, *args, **kwargs): + Metric = instance.metric._meta.model + Metric.invalidate_cache(instance.metric) + def full_clean(self, *args, **kwargs): if self.custom_threshold == self.config_dict['threshold']: self.custom_threshold = None diff --git a/openwisp_monitoring/monitoring/migrations/__init__.py b/openwisp_monitoring/monitoring/migrations/__init__.py index f519cb6c5..58c517a90 100644 --- a/openwisp_monitoring/monitoring/migrations/__init__.py +++ b/openwisp_monitoring/monitoring/migrations/__init__.py @@ -77,7 +77,7 @@ def create_general_metrics(apps, schema_editor): name='General Clients', key='wifi_clients', object_id=None, - content_type=None, + content_type_id=None, ) if created: chart = Chart(metric=metric, configuration='gen_wifi_clients') @@ -89,7 +89,7 @@ def create_general_metrics(apps, schema_editor): name='General Traffic', key='traffic', object_id=None, - content_type=None, + content_type_id=None, ) if created: chart = Chart(metric=metric, configuration='general_traffic') diff --git a/openwisp_monitoring/monitoring/tasks.py b/openwisp_monitoring/monitoring/tasks.py index 1601e0863..9952e73ff 100644 --- a/openwisp_monitoring/monitoring/tasks.py +++ b/openwisp_monitoring/monitoring/tasks.py @@ -10,11 +10,13 @@ from .signals import post_metric_write -def _metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs): - if not metric_pk or not check_threshold_kwargs: +def _metric_post_write(name, values, metric, check_threshold_kwargs, **kwargs): + if not metric or not check_threshold_kwargs: return try: - metric = load_model('monitoring', 'Metric').objects.get(pk=metric_pk) + Metric = load_model('monitoring', 'Metric') + if not isinstance(metric, Metric): + metric = Metric.objects.select_related('alertsettings').get(pk=metric) except ObjectDoesNotExist: # The metric can be deleted by the time threshold is being checked. # This can happen as the task is being run async. @@ -37,14 +39,33 @@ def _metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs autoretry_for=(TimeseriesWriteException,), **RETRY_OPTIONS ) -def timeseries_write( - self, name, values, metric_pk=None, check_threshold_kwargs=None, **kwargs +def _timeseries_write( + self, name, values, metric=None, check_threshold_kwargs=None, **kwargs ): """ write with exponential backoff on a failure """ timeseries_db.write(name, values, **kwargs) - _metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs) + _metric_post_write(name, values, metric, check_threshold_kwargs, **kwargs) + + +def timeseries_write(name, values, metric=None, check_threshold_kwargs=None, **kwargs): + """ + If the timeseries database is using UDP to write data, + then write data synchronously. + """ + if timeseries_db.use_udp: + func = _timeseries_write + else: + func = _timeseries_write.delay + metric = metric.pk if metric else None + func( + name=name, + values=values, + metric=metric, + check_threshold_kwargs=check_threshold_kwargs, + **kwargs + ) @shared_task( @@ -53,7 +74,7 @@ def timeseries_write( autoretry_for=(TimeseriesWriteException,), **RETRY_OPTIONS ) -def timeseries_batch_write(self, data): +def _timeseries_batch_write(self, data): """ Similar to timeseries_write function above, but operates on list of metric data (batch operation) @@ -63,6 +84,19 @@ def timeseries_batch_write(self, data): _metric_post_write(**metric_data) +def timeseries_batch_write(data): + """ + If the timeseries database is using UDP to write data, + then write data synchronously. + """ + if timeseries_db.use_udp: + _timeseries_batch_write(data=data) + else: + for item in data: + item['metric'] = item['metric'].pk + _timeseries_batch_write.delay(data=data) + + @shared_task(base=OpenwispCeleryTask) def delete_timeseries(key, tags): timeseries_db.delete_series(key=key, tags=tags) diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py index de274f358..87b8836c5 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -1,6 +1,7 @@ import time from datetime import timedelta +from django.core.cache import cache from django.utils.timezone import now from swapper import load_model @@ -194,6 +195,7 @@ def setUpClass(cls): register_metric(key, value) for key, value in charts.items(): register_chart(key, value) + cache.clear() super().setUpClass() @classmethod @@ -203,6 +205,7 @@ def tearDownClass(cls): unregister_metric(metric_name) for key in charts.keys(): unregister_chart(key) + cache.clear() super().tearDownClass() def tearDown(self): @@ -263,6 +266,6 @@ def _read_chart(self, chart, *args, **kwargs): return self._read_chart_or_metric(chart, *args, **kwargs) def _write_metric(self, metric, *args, **kwargs): + metric.write(*args, **kwargs) if self._is_timeseries_udp_writes: time.sleep(0.12) - metric.write(*args, **kwargs) diff --git a/openwisp_monitoring/monitoring/tests/test_models.py b/openwisp_monitoring/monitoring/tests/test_models.py index c518fb24e..c63f0dc81 100644 --- a/openwisp_monitoring/monitoring/tests/test_models.py +++ b/openwisp_monitoring/monitoring/tests/test_models.py @@ -2,6 +2,7 @@ from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType +from django.core.cache import cache from django.core.exceptions import ValidationError from django.test import TestCase from django.utils import timezone @@ -21,6 +22,10 @@ class TestModels(TestMonitoringMixin, TestCase): + def tearDown(self): + cache.clear() + super().tearDown() + def test_general_metric_str(self): m = Metric(name='Test metric') self.assertEqual(str(m), m.name) @@ -110,7 +115,7 @@ def test_get_or_create_renamed_object(self): m, created = Metric._get_or_create( name='logins', configuration='test_metric', - content_type=ct, + content_type_id=ct.id, object_id=obj.pk, ) self.assertTrue(created) @@ -119,7 +124,7 @@ def test_get_or_create_renamed_object(self): m2, created = Metric._get_or_create( name='logins', configuration='test_metric', - content_type=ct, + content_type_id=ct.id, object_id=obj.pk, ) self.assertEqual(m.id, m2.id) @@ -232,7 +237,8 @@ def test_threshold_is_crossed_deferred_2(self): ) m.write(60) m.write(99) - self.assertEqual(m.is_healthy, True) + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) + self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, True) def test_general_check_threshold_no_exception(self): @@ -295,7 +301,7 @@ def test_metric_post_write_signals_emitted(self): metric=om, values={om.field_name: 3}, signal=post_metric_write, - time=start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + time=start_time.isoformat(), current=True, ) @@ -332,22 +338,24 @@ def test_tolerance(self): ) with self.subTest('within tolerance, no alerts expected'): m.write(99, time=timezone.now() - timedelta(minutes=2)) - self.assertEqual(m.is_healthy, True) + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) + self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, True) self.assertEqual(Notification.objects.count(), 0) m.write(99, time=timezone.now() - timedelta(minutes=4)) - self.assertEqual(m.is_healthy, True) + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) + self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, True) self.assertEqual(Notification.objects.count(), 0) with self.subTest('tolerance trepassed, alerts expected'): m.write(99, time=timezone.now() - timedelta(minutes=6)) - m.refresh_from_db() + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, False) self.assertEqual(Notification.objects.count(), 1) with self.subTest('value back to normal, tolerance not considered'): m.write(71, time=timezone.now() - timedelta(minutes=7)) - m.refresh_from_db() + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) self.assertEqual(m.is_healthy, True) self.assertEqual(m.is_healthy_tolerant, True) self.assertEqual(Notification.objects.count(), 2) diff --git a/openwisp_monitoring/monitoring/tests/test_monitoring_notifications.py b/openwisp_monitoring/monitoring/tests/test_monitoring_notifications.py index ded1baa15..65580d033 100644 --- a/openwisp_monitoring/monitoring/tests/test_monitoring_notifications.py +++ b/openwisp_monitoring/monitoring/tests/test_monitoring_notifications.py @@ -96,7 +96,8 @@ def test_general_check_threshold_deferred_not_crossed(self): metric=m, custom_operator='>', custom_threshold=90, custom_tolerance=1 ) m.write(99) - self.assertEqual(m.is_healthy, True) + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) + self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, True) self.assertEqual(Notification.objects.count(), 0) @@ -528,11 +529,12 @@ def test_general_check_threshold_with_alert_field_deferred_not_crossed(self): metric=m, custom_operator='>', custom_threshold=30, custom_tolerance=1 ) m.write(10, extra_values={'test_related_2': 32}) - self.assertEqual(m.is_healthy, True) + m.refresh_from_db(fields=['is_healthy', 'is_healthy_tolerant']) + self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, True) self.assertEqual(Notification.objects.count(), 0) m.write(20, extra_values={'test_related_2': 35}) - self.assertEqual(m.is_healthy, True) + self.assertEqual(m.is_healthy, False) self.assertEqual(m.is_healthy_tolerant, True) self.assertEqual(Notification.objects.count(), 0)