diff --git a/sonic-ycabled/tests/test_ycable.py b/sonic-ycabled/tests/test_ycable.py index 29e7263ae004..864b408654e7 100644 --- a/sonic-ycabled/tests/test_ycable.py +++ b/sonic-ycabled/tests/test_ycable.py @@ -8,6 +8,7 @@ import os import sys import time +import traceback if sys.version_info >= (3, 3): from unittest.mock import MagicMock, patch @@ -40,20 +41,20 @@ def test_ycable_info_helper_class_run(self, mocked_sleep): with patch('ycable.ycable.platform_sfputil') as patched_util: patched_util.logical.return_value = ['Ethernet0', 'Ethernet4'] patched_util.get_asic_id_for_logical_port.return_value = 0 - Y_cable_state_task = YcableStateUpdateTask() - Y_cable_state_task.task_process = MagicMock() - Y_cable_state_task.task_stopping_event = MagicMock() + y_cable_presence = [True] stopping_event = MagicMock() sfp_error_event = MagicMock() - y_cable_presence = [True] - Y_cable_state_task.task_run(sfp_error_event, y_cable_presence) - Y_cable_state_task.task_stop() - Y_cable_task = YcableInfoUpdateTask() + Y_cable_state_task = YcableStateUpdateTask(sfp_error_event, y_cable_presence) + Y_cable_state_task.task_process = MagicMock() + Y_cable_state_task.task_stopping_event = MagicMock() + Y_cable_state_task.start() + Y_cable_state_task.join() + Y_cable_task = YcableInfoUpdateTask(y_cable_presence) Y_cable_task.task_thread = MagicMock() Y_cable_task.task_stopping_event = MagicMock() Y_cable_task.task_stopping_event.is_set = MagicMock() - Y_cable_task.task_run(y_cable_presence) - Y_cable_task.task_stop() + Y_cable_task.start() + Y_cable_task.join() Y_cable_state_task.task_stopping_event.return_value.is_set.return_value = True #Y_cable_state_task.task_worker(stopping_event, sfp_error_event, y_cable_presence) # For now just check if exception is thrown for UT purposes @@ -67,19 +68,20 @@ def test_ycable_info_helper_class_run(self, mocked_sleep): @patch("swsscommon.swsscommon.Select.select", MagicMock()) def test_ycable_helper_class_run_loop(self): Y_cable_task = YCableTableUpdateTask() + Y_cable_cli_task = YCableCliUpdateTask() Y_cable_task.task_stopping_event = MagicMock() + Y_cable_cli_task.task_stopping_event = MagicMock() Y_cable_task.task_thread = MagicMock() Y_cable_task.task_thread.start = MagicMock() Y_cable_task.task_thread.join = MagicMock() - Y_cable_task.task_cli_thead = MagicMock() - Y_cable_task.task_cli_thead.start = MagicMock() - Y_cable_task.task_cli_thead.join = MagicMock() #Y_cable_task.task_stopping_event.return_value.is_set.return_value = False swsscommon.SubscriberStateTable.return_value.pop.return_value = (True, True, {"read_side": "2"}) Y_cable_task.task_worker() - Y_cable_task.task_cli_worker() - Y_cable_task.task_run() - Y_cable_task.task_stop() + Y_cable_task.start() + Y_cable_task.join() + Y_cable_cli_task.task_cli_worker() + Y_cable_cli_task.start() + Y_cable_cli_task.join() @patch("swsscommon.swsscommon.Select", MagicMock()) @patch("swsscommon.swsscommon.Select.addSelectable", MagicMock()) @@ -89,14 +91,10 @@ def test_ycable_helper_class_run(self): Y_cable_task.task_thread = MagicMock() Y_cable_task.task_thread.start = MagicMock() Y_cable_task.task_thread.join = MagicMock() - Y_cable_task.task_cli_thead = MagicMock() - Y_cable_task.task_cli_thead.start = MagicMock() - Y_cable_task.task_cli_thead.join = MagicMock() Y_cable_task.task_stopping_event.return_value.is_set.return_value = True Y_cable_task.task_worker() - Y_cable_task.task_cli_worker() - Y_cable_task.task_run() - Y_cable_task.task_stop() + Y_cable_task.start() + Y_cable_task.join() def test_detect_port_in_error_status(self): @@ -291,9 +289,7 @@ def test_DaemonYcable_init_deinit(self): @patch('ycable.ycable.platform_sfputil', MagicMock()) @patch('ycable.ycable.DaemonYcable.load_platform_util', MagicMock()) @patch('ycable.ycable.YcableInfoUpdateTask', MagicMock()) - @patch('ycable.ycable.YcableInfoUpdateTask.task_run', MagicMock()) @patch('ycable.ycable.YcableStateUpdateTask', MagicMock()) - @patch('ycable.ycable.YcableStateUpdateTask.task_run', MagicMock()) @patch('ycable.ycable_utilities.y_cable_helper.init_ports_status_for_y_cable', MagicMock()) def test_DaemonYcable_init_deinit_full(self): ycable = DaemonYcable(SYSLOG_IDENTIFIER) @@ -327,3 +323,45 @@ def wait_until(total_wait_time, interval, call_back, *args, **kwargs): time.sleep(interval) wait_time += interval return False + + +class TestYcableScriptException(object): + + @patch("swsscommon.swsscommon.Select", MagicMock(side_effect=NotImplementedError)) + @patch("swsscommon.swsscommon.Select.addSelectable", MagicMock(side_effect=NotImplementedError)) + @patch("swsscommon.swsscommon.Select.select", MagicMock(side_effect=NotImplementedError)) + def test_ycable_helper_class_run_loop_with_exception(self): + + + + Y_cable_cli_task = YCableCliUpdateTask() + expected_exception_start = None + expected_exception_join = None + trace = None + try: + Y_cable_cli_task.start() + Y_cable_cli_task.task_cli_worker() + except Exception as e1: + expected_exception_start = e1 + trace = traceback.format_exc() + + + try: + Y_cable_cli_task.join() + except Exception as e2: + expected_exception_join = e2 + + """ + #Handy debug Helpers or else use import logging + #f = open("newfile", "w") + #f.write(format(e2)) + #f.write(format(m1)) + #f.write(trace) + """ + + assert(type(expected_exception_start) == type(expected_exception_join)) + assert(expected_exception_start.args == expected_exception_join.args) + assert("NotImplementedError" in str(trace) and "effect" in str(trace)) + assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace)) + assert("swsscommon.Select" in str(trace)) + diff --git a/sonic-ycabled/ycable/ycable.py b/sonic-ycabled/ycable/ycable.py index 90d71ee581b7..fd3e0ecf5f52 100644 --- a/sonic-ycabled/ycable/ycable.py +++ b/sonic-ycabled/ycable/ycable.py @@ -6,10 +6,12 @@ """ try: + import os import signal import sys import time import threading + import traceback from enum import Enum from sonic_py_common import daemon_base, device_info, logger @@ -94,10 +96,14 @@ def handle_state_update_task(port, fvp_dict, y_cable_presence, stopping_event): # Thread wrapper class to update ycable info periodically -class YcableInfoUpdateTask(object): - def __init__(self): - self.task_thread = None + +class YcableInfoUpdateTask(threading.Thread): + + def __init__(self, y_cable_presence): + threading.Thread.__init__(self) + self.exc = None self.task_stopping_event = threading.Event() + self.y_cable_presence = y_cable_presence self.table_helper = y_cable_table_helper.YcableInfoUpdateTableHelper() @@ -122,25 +128,36 @@ def task_worker(self, y_cable_presence): helper_logger.log_info("Stop DOM monitoring loop") - def task_run(self, y_cable_presence): + def run(self): if self.task_stopping_event.is_set(): return - self.task_thread = threading.Thread(target=self.task_worker, args=(y_cable_presence,)) - self.task_thread.start() + try: + self.task_worker(self.y_cable_presence) + except Exception as e: + helper_logger.log_error("Exception occured at child thread YcableInfoUpdateTask due to {} {}".format(repr(e), traceback.format_exc())) + + self.exc = e - def task_stop(self): - self.task_stopping_event.set() - self.task_thread.join() + def join(self): + threading.Thread.join(self) + + if self.exc: + raise self.exc # Process wrapper class to update sfp state info periodically -class YcableStateUpdateTask(object): - def __init__(self): - self.task_process = None + + +class YcableStateUpdateTask(threading.Thread): + def __init__(self, sfp_error_event, y_cable_presence): + threading.Thread.__init__(self) + self.exc = None self.task_stopping_event = threading.Event() self.sfp_insert_events = {} + self.sfp_error_event = sfp_error_event + self.y_cable_presence = y_cable_presence self.table_helper = y_cable_table_helper.YcableStateUpdateTableHelper() @@ -192,18 +209,21 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): handle_state_update_task(port, fvp_dict, y_cable_presence, stopping_event) - - def task_run(self, sfp_error_event, y_cable_presence): + def run(self): if self.task_stopping_event.is_set(): return - self.task_process = threading.Thread(target=self.task_worker, args=( - self.task_stopping_event, sfp_error_event, y_cable_presence)) - self.task_process.start() + try: + self.task_worker(self.task_stopping_event, self.sfp_error_event, self.y_cable_presence) + except Exception as e: + helper_logger.log_error("Exception occured at child thread YcableStateUpdateTask due to {} {}".format(repr(e), traceback.format_exc())) + self.exc = e + + def join(self): + threading.Thread.join(self) - def task_stop(self): - self.task_stopping_event.set() - self.task_process.join() + if self.exc: + raise self.exc # # Daemon ======================================================================= @@ -220,6 +240,7 @@ def __init__(self, log_identifier): self.sfp_error_event = threading.Event() self.y_cable_presence = [False] self.table_helper = y_cable_table_helper.DaemonYcableTableHelper() + self.threads = [] # Signal handler def signal_handler(self, sig, frame): @@ -349,36 +370,58 @@ def run(self): self.init() # Start the ycable task update thread - ycable_info_update = YcableInfoUpdateTask() - ycable_info_update.task_run(self.y_cable_presence) + ycable_info_update = YcableInfoUpdateTask(self.y_cable_presence) + ycable_info_update.start() + self.threads.append(ycable_info_update) # Start the sfp state info update process - ycable_state_update = YcableStateUpdateTask() - ycable_state_update.task_run(self.sfp_error_event, self.y_cable_presence) + ycable_state_update = YcableStateUpdateTask(self.sfp_error_event, self.y_cable_presence) + ycable_state_update.start() + self.threads.append(ycable_state_update) # Start the Y-cable state info update process if Y cable presence established y_cable_state_worker_update = None if self.y_cable_presence[0] is True: y_cable_state_worker_update = y_cable_helper.YCableTableUpdateTask() - y_cable_state_worker_update.task_run() + y_cable_state_worker_update.start() + self.threads.append(y_cable_state_worker_update) + y_cable_cli_worker_update = y_cable_helper.YCableCliUpdateTask() + y_cable_cli_worker_update.start() + self.threads.append(y_cable_cli_worker_update) # Start main loop self.log_info("Start daemon main loop") while not self.stop_event.wait(self.timeout): self.log_info("Ycable main loop") + # check all threads are alive + for thread in self.threads: + if thread.is_alive() is False: + try: + thread.join() + except Exception as e: + self.log_error("Exception occured at child thread {} to {}".format(thread.getName(), repr(e))) + self.log_error("thread id {} is not running, exiting main loop".format(thread.getName())) + os.kill(os.getpid(), signal.SIGKILL) - self.log_info("Stop daemon main loop") + + self.log_error("Stop daemon main loop") # Stop the ycable periodic info info update thread - ycable_info_update.task_stop() + if ycable_info_update.is_alive(): + ycable_info_update.join() # Stop the ycable update process - ycable_state_update.task_stop() + if ycable_state_update.is_alive(): + ycable_state_update.join() # Stop the Y-cable state info update process if self.y_cable_presence[0] is True: - y_cable_state_worker_update.task_stop() + if y_cable_state_worker_update.is_alive(): + y_cable_state_worker_update.join() + if y_cable_cli_worker_update.is_alive(): + y_cable_cli_worker_update.join() + # Start daemon deinitialization sequence self.deinit() diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index c9d53823f5bd..cd6a1a6670b6 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -11,6 +11,7 @@ import sys import threading import time +import traceback from importlib import import_module @@ -3441,16 +3442,15 @@ def handle_ycable_enable_disable_tel_notification(fvp_m, key): disable_telemetry = False # Thread wrapper class to update y_cable status periodically -class YCableTableUpdateTask(object): +class YCableTableUpdateTask(threading.Thread): def __init__(self): - self.task_thread = None - self.task_cli_thread = None - self.task_download_firmware_thread = {} + threading.Thread.__init__(self) + + self.exc = None self.task_stopping_event = threading.Event() self.hw_mux_cable_tbl_keys = {} self.table_helper = y_cable_table_helper.YcableTableUpdateTableHelper() - self.cli_table_helper = y_cable_table_helper.YcableCliUpdateTableHelper() def task_worker(self): @@ -3622,6 +3622,32 @@ def task_worker(self): handle_hw_mux_cable_table_grpc_notification( fvp_n, self.table_helper.get_hw_mux_cable_tbl_peer(), asic_index, self.table_helper.get_mux_metrics_tbl(), True, port_n) + def run(self): + if self.task_stopping_event.is_set(): + return + + try: + self.task_worker() + except Exception as e: + helper_logger.log_error("Exception occured at child thread YCableTableUpdateTask due to {} {}".format(repr(e), traceback.format_exc())) + self.exc = e + + + def join(self): + threading.Thread.join(self) + + if self.exc: + raise self.exc + +class YCableCliUpdateTask(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + + self.exc = None + self.task_download_firmware_thread = {} + self.task_stopping_event = threading.Event() + self.cli_table_helper = y_cable_table_helper.YcableCliUpdateTableHelper() + def task_cli_worker(self): @@ -3825,19 +3851,25 @@ def task_cli_worker(self): break - def task_run(self): - self.task_thread = threading.Thread(target=self.task_worker) - self.task_cli_thread = threading.Thread(target=self.task_cli_worker) - self.task_thread.start() - self.task_cli_thread.start() - - def task_stop(self): - - self.task_stopping_event.set() - helper_logger.log_info("stopping the cli and probing task threads xcvrd") - self.task_thread.join() - self.task_cli_thread.join() + def run(self): + if self.task_stopping_event.is_set(): + return + try: + self.task_cli_worker() + except Exception as e: + helper_logger.log_error("Exception occured at child thread YcableCliUpdateTask due to {} {}".format(repr(e), traceback.format_exc())) + self.exc = e + + def join(self): + + threading.Thread.join(self) + for key, value in self.task_download_firmware_thread.items(): self.task_download_firmware_thread[key].join() helper_logger.log_info("stopped all thread") + if self.exc is not None: + + raise self.exc + +