From 8e6d8d82a7f3de046cde07b443e7eff9d3b03379 Mon Sep 17 00:00:00 2001
From: themylogin
Date: Fri, 12 Apr 2019 00:42:05 +0200
Subject: [PATCH 1/2] Block VolumeStatus alert source while creating pool

(cherry picked from commit 726ed446a0691682c44ebe3f3cab274f4d0f3737)
---
 gui/system/forms.py                          | 71 +++++++++++---------
 src/middlewared/middlewared/plugins/alert.py | 63 +++++++++++------
 2 files changed, 81 insertions(+), 53 deletions(-)

diff --git a/gui/system/forms.py b/gui/system/forms.py
index dd2dd33f3862..03bd60840c5b 100644
--- a/gui/system/forms.py
+++ b/gui/system/forms.py
@@ -385,39 +385,44 @@ def done(self, form_list, **kwargs):
         with open(WIZARD_PROGRESSFILE, 'wb') as f:
             f.write(pickle.dumps(progress))
 
-        if volume_import:
-            volume_name, guid = cleaned_data.get(
-                'volume_id'
-            ).split('|')
-            if not _n.zfs_import(volume_name, guid):
-                raise MiddlewareError(_(
-                    'Volume "%s" import failed! Check the pool '
-                    'status for more details.'
-                ) % volume_name)
-
-            volume = Volume(vol_name=volume_name)
-            volume.save()
-            model_objs.append(volume)
-
-            scrub = Scrub.objects.create(scrub_volume=volume)
-            model_objs.append(scrub)
-
-        if volume_form:
-            bysize = volume_form._get_unused_disks_by_size()
-
-            if volume_type == 'auto':
-                groups = volume_form._grp_autoselect(bysize)
-            else:
-                groups = volume_form._grp_predefined(bysize, volume_type)
-
-            with client as c:
-                c.call('pool.create', {
-                    'name': volume_name,
-                    'topology': groups,
-                })
-
-            volume = Volume.objects.get(vol_name=volume_name)
-            model_objs.append(volume)
+        with client as c:
+            lock = c.call("alert.block_source", "VolumeStatus")
+            try:
+                if volume_import:
+                    volume_name, guid = cleaned_data.get(
+                        'volume_id'
+                    ).split('|')
+                    if not _n.zfs_import(volume_name, guid):
+                        raise MiddlewareError(_(
+                            'Volume "%s" import failed! Check the pool '
+                            'status for more details.'
+                        ) % volume_name)
+
+                    volume = Volume(vol_name=volume_name)
+                    volume.save()
+                    model_objs.append(volume)
+
+                    scrub = Scrub.objects.create(scrub_volume=volume)
+                    model_objs.append(scrub)
+
+                if volume_form:
+                    bysize = volume_form._get_unused_disks_by_size()
+
+                    if volume_type == 'auto':
+                        groups = volume_form._grp_autoselect(bysize)
+                    else:
+                        groups = volume_form._grp_predefined(bysize, volume_type)
+
+                    with client as c:
+                        c.call('pool.create', {
+                            'name': volume_name,
+                            'topology': groups,
+                        })
+
+                    volume = Volume.objects.get(vol_name=volume_name)
+                    model_objs.append(volume)
+            finally:
+                c.call("alert.unblock_source", lock)
 
         # Create SMART tests for every disk available
         disks = []
diff --git a/src/middlewared/middlewared/plugins/alert.py b/src/middlewared/middlewared/plugins/alert.py
index 7e08df576426..368a10cc3f4e 100644
--- a/src/middlewared/middlewared/plugins/alert.py
+++ b/src/middlewared/middlewared/plugins/alert.py
@@ -83,6 +83,9 @@ class AlertService(Service):
     def __init__(self, middleware):
         super().__init__(middleware)
 
+        self.blocked_sources = defaultdict(set)
+        self.sources_locks = {}
+
     @private
     async def initialize(self):
         self.node = "A"
@@ -404,38 +407,43 @@ async def __run_alerts(self):
 
             self.alert_source_last_run[alert_source.name] = datetime.utcnow()
 
-            self.logger.trace("Running alert source: %r", alert_source.name)
+            alerts_a = [alert
+                        for alert in self.alerts
+                        if alert.node == master_node and alert.source == alert_source.name]
+            locked = False
+            if self.blocked_sources[alert_source.name]:
+                self.logger.debug("Not running alert source %r because it is blocked", alert_source.name)
+                locked = True
+            else:
+                self.logger.trace("Running alert source: %r", alert_source.name)
 
-            try:
-                alerts_a = await self.__run_source(alert_source.name)
-            except UnavailableException:
-                alerts_a = [alert
-                            for alert in self.alerts
-                            if alert.node == master_node and alert.source == alert_source.name]
+                try:
+                    alerts_a = await self.__run_source(alert_source.name)
+                except UnavailableException:
+                    pass
             for alert in alerts_a:
                 alert.node = master_node
 
             alerts_b = []
             if run_on_backup_node and alert_source.run_on_backup_node:
                 try:
+                    alerts_b = [alert
+                                for alert in self.alerts
+                                if alert.node == backup_node and alert.source == alert_source.name]
                     try:
-                        alerts_b = await self.middleware.call("failover.call_remote", "alert.run_source",
-                                                              [alert_source.name])
+                        if not locked:
+                            alerts_b = await self.middleware.call("failover.call_remote", "alert.run_source",
+                                                                  [alert_source.name])
+
+                            alerts_b = [Alert(**dict(alert,
+                                                     level=(AlertLevel(alert["level"]) if alert["level"] is not None
+                                                            else alert["level"])))
+                                        for alert in alerts_b]
                     except CallError as e:
                         if e.errno == CallError.EALERTCHECKERUNAVAILABLE:
-                            alerts_b = [alert
-                                        for alert in self.alerts
-                                        if alert.node == backup_node and alert.source == alert_source.name]
+                            pass
                         else:
                             raise
-                    else:
-                        alerts_b = [Alert(**dict(alert,
-                                                 klass=AlertClass.class_by_name[alert["klass"]],
-                                                 _uuid=alert.pop("id"),
-                                                 _source=alert.pop("source"),
-                                                 _key=alert.pop("key"),
-                                                 _text=alert.pop("text")))
-                                    for alert in alerts_b]
                 except Exception:
                     alerts_b = [
                         Alert(AlertSourceRunFailedOnBackupNodeAlertClass,
@@ -487,6 +495,21 @@ async def run_source(self, source_name):
         except UnavailableException:
             raise CallError("This alert checker is unavailable", CallError.EALERTCHECKERUNAVAILABLE)
 
+    @private
+    async def block_source(self, source_name):
+        if source_name not in ALERT_SOURCES:
+            raise CallError("Invalid alert source")
+
+        lock = str(uuid.uuid4())
+        self.blocked_sources[source_name].add(lock)
+        self.sources_locks[lock] = source_name
+        return lock
+
+    @private
+    async def unblock_source(self, lock):
+        source_name = self.sources_locks.pop(lock)
+        self.blocked_sources[source_name].remove(lock)
+
     async def __run_source(self, source_name):
         alert_source = ALERT_SOURCES[source_name]
 

From 1f9f9f8729bfe35b9716b1b66b800a50afaa044a Mon Sep 17 00:00:00 2001
From: themylogin
Date: Fri, 12 Apr 2019 01:12:13 +0200
Subject: [PATCH 2/2] Make alert.block_source expire on timeout

(cherry picked from commit 076bf3f5206a3384dfb558a34eae58928a6a060c)
---
 src/middlewared/middlewared/plugins/alert.py | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/middlewared/middlewared/plugins/alert.py b/src/middlewared/middlewared/plugins/alert.py
index 368a10cc3f4e..b4104a37203b 100644
--- a/src/middlewared/middlewared/plugins/alert.py
+++ b/src/middlewared/middlewared/plugins/alert.py
@@ -1,7 +1,8 @@
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from datetime import datetime
 import errno
 import os
+import time
 import traceback
 import uuid
 
@@ -34,6 +35,8 @@
 ALERT_SOURCES = {}
 ALERT_SERVICES_FACTORIES = {}
 
+AlertSourceLock = namedtuple("AlertSourceLock", ["source_name", "expires_at"])
+
 
 class AlertSourceRunFailedAlertClass(AlertClass):
     category = AlertCategory.SYSTEM
@@ -401,6 +404,10 @@ async def __run_alerts(self):
                 if remote_failover_status == "BACKUP":
                     run_on_backup_node = True
 
+        for k, source_lock in list(self.sources_locks.items()):
+            if source_lock.expires_at <= time.monotonic():
+                await self.unblock_source(k)
+
         for alert_source in ALERT_SOURCES.values():
             if not alert_source.schedule.should_run(datetime.utcnow(), self.alert_source_last_run[alert_source.name]):
                 continue
@@ -496,19 +503,19 @@ async def run_source(self, source_name):
             raise CallError("This alert checker is unavailable", CallError.EALERTCHECKERUNAVAILABLE)
 
     @private
-    async def block_source(self, source_name):
+    async def block_source(self, source_name, timeout=3600):
         if source_name not in ALERT_SOURCES:
             raise CallError("Invalid alert source")
 
         lock = str(uuid.uuid4())
         self.blocked_sources[source_name].add(lock)
-        self.sources_locks[lock] = source_name
+        self.sources_locks[lock] = AlertSourceLock(source_name, time.monotonic() + timeout)
         return lock
 
     @private
     async def unblock_source(self, lock):
-        source_name = self.sources_locks.pop(lock)
-        self.blocked_sources[source_name].remove(lock)
+        source_lock = self.sources_locks.pop(lock)
+        self.blocked_sources[source_lock.source_name].remove(lock)
 
     async def __run_source(self, source_name):
         alert_source = ALERT_SOURCES[source_name]
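
The caller-side pattern these two patches enable is the one already visible in the forms.py hunk: block the VolumeStatus alert source, do the disruptive pool work, and always release the lock, with the timeout from PATCH 2/2 acting as a safety net if the caller dies before it can unblock. Below is a minimal sketch of that pattern, assuming a local, root-connected middleware client like the one the GUI uses. The run_pool_operation helper, the pool name and the topology argument are made up for illustration; only alert.block_source, alert.unblock_source and the opaque lock token they exchange come from the patches above.

    # Illustrative sketch only: run_pool_operation and its arguments are hypothetical;
    # alert.block_source / alert.unblock_source are the calls added by these patches.
    from middlewared.client import Client


    def run_pool_operation(topology):
        with Client() as c:
            # Returns an opaque lock token; VolumeStatus stays blocked until the token
            # is released or, after PATCH 2/2, the default 3600-second timeout expires.
            lock = c.call("alert.block_source", "VolumeStatus")
            try:
                # The pool operation that would otherwise raise transient volume alerts.
                c.call("pool.create", {"name": "tank", "topology": topology})
            finally:
                # Release explicitly; the expiry is only a fallback for crashed callers.
                c.call("alert.unblock_source", lock)

Because every call to alert.block_source returns a fresh uuid token tracked in sources_locks, independent callers can block the same source concurrently without releasing each other's locks, and the expires_at check added in PATCH 2/2 keeps a crashed caller from leaving the source blocked forever.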