diff --git a/sonic-ledd/scripts/ledd b/sonic-ledd/scripts/ledd index f27296ce652e..52812dbaac29 100644 --- a/sonic-ledd/scripts/ledd +++ b/sonic-ledd/scripts/ledd @@ -10,6 +10,8 @@ try: import sys from sonic_py_common import daemon_base + from sonic_py_common import multi_asic + from sonic_py_common.interface import backplane_prefix from swsscommon import swsscommon except ImportError as e: raise ImportError (str(e) + " - required module not found") @@ -35,6 +37,9 @@ SELECT_TIMEOUT = 1000 LEDUTIL_LOAD_ERROR = 1 +# The empty namespace refers to linux host namespace. +EMPTY_NAMESPACE = '' + class DaemonLedd(daemon_base.DaemonBase): # Run daemon @@ -65,15 +70,24 @@ class DaemonLedd(daemon_base.DaemonBase): self.log_error("Failed to load ledutil: %s" % (str(e)), True) sys.exit(LEDUTIL_LOAD_ERROR) - # Open a handle to the Application database - appl_db = daemon_base.db_connect("APPL_DB") + # Load the namespace details first from the database_global.json file. + swsscommon.SonicDBConfig.initializeGlobalConfig() + + # Get the namespaces in the platform. For multi-asic devices we get the namespaces + # of front-end ascis which have front-panel interfaces. + namespaces = multi_asic.get_front_end_namespaces() # Subscribe to PORT table notifications in the Application DB + appl_db, sst = {}, {} sel = swsscommon.Select() - sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - sel.addSelectable(sst) - # Listen indefinitely for changes to the PORT table in the Application DB + for namespace in namespaces: + # Open a handle to the Application database, in all namespaces + appl_db[namespace] = daemon_base.db_connect("APPL_DB", namespace=namespace) + sst[namespace] = swsscommon.SubscriberStateTable(appl_db[namespace], swsscommon.APP_PORT_TABLE_NAME) + sel.addSelectable(sst[namespace]) + + # Listen indefinitely for changes to the PORT table in the Application DB's while True: # Use timeout to prevent ignoring the signals we want to handle # in signal_handler() (e.g. SIGTERM for graceful shutdown) @@ -86,17 +100,20 @@ class DaemonLedd(daemon_base.DaemonBase): self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") continue - (key, op, fvp) = sst.pop() - - # TODO: Once these flag entries have been removed from the DB, - # we can remove this check - if key in ["PortConfigDone", "PortInitDone"]: - continue + # Get the namespace from the selectable object and use it to index the SubscriberStateTable handle. + ns=c.getDbNamespace() + (key, op, fvp) = sst[ns].pop() + if fvp: + # TODO: Once these flag entries have been removed from the DB, + # we can remove this check + if key in ["PortConfigDone", "PortInitDone"]: + continue - fvp_dict = dict(fvp) + fvp_dict = dict(fvp) - if op == "SET" and "oper_status" in fvp_dict: - led_control.port_link_state_change(key, fvp_dict["oper_status"]) + if op == "SET" and "oper_status" in fvp_dict: + if not key.startswith(backplane_prefix()): + led_control.port_link_state_change(key, fvp_dict["oper_status"]) return 1 diff --git a/sonic-xcvrd/scripts/xcvrd b/sonic-xcvrd/scripts/xcvrd index 3ed032272197..b59acdbc33a8 100644 --- a/sonic-xcvrd/scripts/xcvrd +++ b/sonic-xcvrd/scripts/xcvrd @@ -19,7 +19,8 @@ try: from enum import Enum from sonic_py_common import daemon_base, device_info, logger from swsscommon import swsscommon -except ImportError as e: + from sonic_py_common import multi_asic +except ImportError, e: raise ImportError (str(e) + " - required module not found") # @@ -95,6 +96,9 @@ platform_chassis = None # by DaemonXcvrd helper_logger = logger.Logger(SYSLOG_IDENTIFIER) +# The empty namespace refers to linux host namespace. +EMPTY_NAMESPACE = '' + # # Helper functions ============================================================= # @@ -421,14 +425,17 @@ def post_port_dom_info_to_db(logical_port_name, table, stop_event=threading.Even # Update port dom/sfp info in db def post_port_sfp_dom_info_to_db(is_warm_start, stop_event=threading.Event()): # Connect to STATE_DB and create transceiver dom/sfp info tables - transceiver_dict = {} - state_db = daemon_base.db_connect("STATE_DB") - int_tbl = swsscommon.Table(state_db, TRANSCEIVER_INFO_TABLE) - dom_tbl = swsscommon.Table(state_db, TRANSCEIVER_DOM_SENSOR_TABLE) - - appl_db = daemon_base.db_connect("APPL_DB") - app_port_tbl = swsscommon.ProducerStateTable(appl_db, - swsscommon.APP_PORT_TABLE_NAME) + transceiver_dict, state_db, appl_db, int_tbl, dom_tbl, app_port_tbl = {}, {}, {}, {}, {}, {} + + # Get the namespaces in the platform + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) + int_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_INFO_TABLE) + dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db[asic_id], swsscommon.APP_PORT_TABLE_NAME) # Post all the current interface dom/sfp info to STATE_DB logical_port_list = platform_sfputil.logical @@ -436,13 +443,18 @@ def post_port_sfp_dom_info_to_db(is_warm_start, stop_event=threading.Event()): if stop_event.is_set(): break - post_port_sfp_info_to_db(logical_port_name, int_tbl, transceiver_dict, stop_event) - post_port_dom_info_to_db(logical_port_name, dom_tbl, stop_event) - post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl, stop_event) + # Get the asic to which this port belongs + asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + post_port_sfp_info_to_db(logical_port_name, int_tbl[asic_index], transceiver_dict, stop_event) + post_port_dom_info_to_db(logical_port_name, dom_tbl[asic_index], stop_event) + post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl[asic_index], stop_event) ## Do not notify media settings during warm reboot to avoid dataplane traffic impact if is_warm_start == False: - notify_media_setting(logical_port_name, transceiver_dict, app_port_tbl) + notify_media_setting(logical_port_name, transceiver_dict, app_port_tbl[asic_index]) transceiver_dict.clear() # Delete port dom/sfp info from db @@ -481,8 +493,15 @@ def recover_missing_sfp_table_entries(sfp_util, int_tbl, status_tbl, stop_event) for logical_port_name in logical_port_list: if stop_event.is_set(): break - if logical_port_name not in keys and not detect_port_in_error_status(logical_port_name, status_tbl): - post_port_sfp_info_to_db(logical_port_name, int_tbl, transceiver_dict, stop_event) + + # Get the asic to which this port belongs + asic_index = sfp_util.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + + if logical_port_name not in keys and not detect_port_in_error_status(logical_port_name, status_tbl[asic_index]): + post_port_sfp_info_to_db(logical_port_name, int_tbl[asic_index], transceiver_dict, stop_event) def check_port_in_range(range_str, physical_port): @@ -719,28 +738,40 @@ def detect_port_in_error_status(logical_port_name, status_tbl): # Init TRANSCEIVER_STATUS table def init_port_sfp_status_tbl(stop_event=threading.Event()): # Connect to STATE_DB and create transceiver status table - state_db = daemon_base.db_connect("STATE_DB") - status_tbl = swsscommon.Table(state_db, TRANSCEIVER_STATUS_TABLE) + state_db, status_tbl = {}, {} + + # Get the namespaces in the platform + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) # Init TRANSCEIVER_STATUS table logical_port_list = platform_sfputil.logical for logical_port_name in logical_port_list: if stop_event.is_set(): break + + # Get the asic to which this port belongs + asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) if physical_port_list is None: helper_logger.log_error("No physical ports found for logical port '%s'" % logical_port_name) - update_port_transceiver_status_table(logical_port_name, status_tbl, SFP_STATUS_REMOVED) + update_port_transceiver_status_table(logical_port_name, status_tbl[asic_index], SFP_STATUS_REMOVED) for physical_port in physical_port_list: if stop_event.is_set(): break if not _wrapper_get_presence(physical_port): - update_port_transceiver_status_table(logical_port_name, status_tbl, SFP_STATUS_REMOVED) + update_port_transceiver_status_table(logical_port_name, status_tbl[asic_index], SFP_STATUS_REMOVED) else: - update_port_transceiver_status_table(logical_port_name, status_tbl, SFP_STATUS_INSERTED) - + update_port_transceiver_status_table(logical_port_name, status_tbl[asic_index], SFP_STATUS_INSERTED) # # Helper classes =============================================================== @@ -752,21 +783,36 @@ class DomInfoUpdateTask(object): self.task_thread = None self.task_stopping_event = threading.Event() + # Load the namespace details first from the database_global.json file. + swsscommon.SonicDBConfig.initializeGlobalConfig() + def task_worker(self): helper_logger.log_info("Start DOM monitoring loop") # Connect to STATE_DB and create transceiver dom info table - state_db = daemon_base.db_connect("STATE_DB") - dom_tbl = swsscommon.Table(state_db, TRANSCEIVER_DOM_SENSOR_TABLE) - status_tbl = swsscommon.Table(state_db, TRANSCEIVER_STATUS_TABLE) + state_db, dom_tbl, status_tbl = {}, {}, {} + + # Get the namespaces in the platform + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): logical_port_list = platform_sfputil.logical for logical_port_name in logical_port_list: - if not detect_port_in_error_status(logical_port_name, status_tbl): - post_port_dom_info_to_db(logical_port_name, dom_tbl, self.task_stopping_event) - post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl, self.task_stopping_event) + # Get the asic to which this port belongs + asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + + if not detect_port_in_error_status(logical_port_name, status_tbl[asic_index]): + post_port_dom_info_to_db(logical_port_name, dom_tbl[asic_index], self.task_stopping_event) + post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl[asic_index], self.task_stopping_event) helper_logger.log_info("Stop DOM monitoring loop") @@ -787,6 +833,9 @@ class SfpStateUpdateTask(object): self.task_process = None self.task_stopping_event = multiprocessing.Event() + # Load the namespace details first from the database_global.json file. + swsscommon.SonicDBConfig.initializeGlobalConfig() + def _mapping_event_from_change_event(self, status, port_dict): """ mapping from what get_transceiver_change_event returns to event defined in the state machine @@ -816,15 +865,20 @@ class SfpStateUpdateTask(object): transceiver_dict = {} # Connect to STATE_DB and create transceiver dom/sfp info tables - state_db = daemon_base.db_connect("STATE_DB") - int_tbl = swsscommon.Table(state_db, TRANSCEIVER_INFO_TABLE) - dom_tbl = swsscommon.Table(state_db, TRANSCEIVER_DOM_SENSOR_TABLE) - status_tbl = swsscommon.Table(state_db, TRANSCEIVER_STATUS_TABLE) + state_db, appl_db, int_tbl, dom_tbl, status_tbl, app_port_tbl = {}, {}, {}, {}, {}, {} + + # Get the namespaces in the platform + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + int_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_INFO_TABLE) + dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) - # Connect to APPL_DB to notify Media notifications - appl_db = daemon_base.db_connect("APPL_DB") - app_port_tbl = swsscommon.ProducerStateTable(appl_db, - swsscommon.APP_PORT_TABLE_NAME) + # Connect to APPL_DB to notify Media notifications + appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) + app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db[asic_id], swsscommon.APP_PORT_TABLE_NAME) # Start main loop to listen to the SFP change event. # The state migrating sequence: @@ -949,37 +1003,44 @@ class SfpStateUpdateTask(object): helper_logger.log_warning("Got unknown FP port index {}, ignored".format(key)) continue for logical_port in logical_port_list: + + # Get the asic to which this port belongs + asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port) + if asic_index is None: + logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port)) + continue + if value == SFP_STATUS_INSERTED: helper_logger.log_info("Got SFP inserted event") # A plugin event will clear the error state. - update_port_transceiver_status_table(logical_port, status_tbl, SFP_STATUS_INSERTED) + update_port_transceiver_status_table(logical_port, status_tbl[asic_index], SFP_STATUS_INSERTED) helper_logger.log_info("receive plug in and update port sfp status table.") - rc = post_port_sfp_info_to_db(logical_port, int_tbl, transceiver_dict) + rc = post_port_sfp_info_to_db(logical_port, int_tbl[asic_index], transceiver_dict) # If we didn't get the sfp info, assuming the eeprom is not ready, give a try again. if rc == SFP_EEPROM_NOT_READY: helper_logger.log_warning("SFP EEPROM is not ready. One more try...") time.sleep(TIME_FOR_SFP_READY_SECS) - post_port_sfp_info_to_db(logical_port, int_tbl, transceiver_dict) - post_port_dom_info_to_db(logical_port, dom_tbl) - post_port_dom_threshold_info_to_db(logical_port, dom_tbl) - notify_media_setting(logical_port, transceiver_dict, app_port_tbl) + post_port_sfp_info_to_db(logical_port, int_tbl[asic_index], transceiver_dict) + post_port_dom_info_to_db(logical_port, dom_tbl[asic_index]) + post_port_dom_threshold_info_to_db(logical_port, dom_tbl[asic_index]) + notify_media_setting(logical_port, transceiver_dict, app_port_tbl[asic_index]) transceiver_dict.clear() elif value == SFP_STATUS_REMOVED: helper_logger.log_info("Got SFP removed event") - update_port_transceiver_status_table(logical_port, status_tbl, SFP_STATUS_REMOVED) + update_port_transceiver_status_table(logical_port, status_tbl[asic_index], SFP_STATUS_REMOVED) helper_logger.log_info("receive plug out and pdate port sfp status table.") - del_port_sfp_dom_info_from_db(logical_port, int_tbl, dom_tbl) + del_port_sfp_dom_info_from_db(logical_port, int_tbl[asic_index], dom_tbl[asic_index]) elif value in errors_block_eeprom_reading: helper_logger.log_info("Got SFP Error event") # Add port to error table to stop accessing eeprom of it # If the port already in the error table, the stored error code will # be updated to the new one. - update_port_transceiver_status_table(logical_port, status_tbl, value) + update_port_transceiver_status_table(logical_port, status_tbl[asic_index], value) helper_logger.log_info("receive error update port sfp status table.") # In this case EEPROM is not accessible, so remove the DOM info # since it will be outdated if long time no update. # but will keep the interface info in the DB since it static. - del_port_sfp_dom_info_from_db(logical_port, None, dom_tbl) + del_port_sfp_dom_info_from_db(logical_port, None, dom_tbl[asic_index]) else: # SFP return unkown event, just ignore for now. @@ -1042,6 +1103,7 @@ class DaemonXcvrd(daemon_base.DaemonBase): super(DaemonXcvrd, self).__init__(log_identifier) self.timeout = XCVRD_MAIN_THREAD_SLEEP_SECS + self.num_asics = multi_asic.get_num_asics() self.stop_event = threading.Event() self.sfp_error_event = multiprocessing.Event() @@ -1059,9 +1121,9 @@ class DaemonXcvrd(daemon_base.DaemonBase): self.log_warning("Caught unhandled signal '" + sig + "'") # Wait for port config is done - def wait_for_port_config_done(self): + def wait_for_port_config_done(self, namespace): # Connect to APPL_DB and subscribe to PORT table notifications - appl_db = daemon_base.db_connect("APPL_DB") + appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) sel = swsscommon.Select() sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) @@ -1124,19 +1186,34 @@ class DaemonXcvrd(daemon_base.DaemonBase): self.log_error("Failed to load sfputil: %s" % (str(e)), True) sys.exit(SFPUTIL_LOAD_ERROR) + # Load the namespace details first from the database_global.json file. + swsscommon.SonicDBConfig.initializeGlobalConfig() + # Load port info try: - port_config_file_path = device_info.get_path_to_port_config_file() - platform_sfputil.read_porttab_mappings(port_config_file_path) - except Exception as e: + if multi_asic.is_multi_asic(): + # For multi ASIC platforms we pass DIR of port_config_file_path and the number of asics + (platform_path, hwsku_path) = device_info.get_paths_to_platform_and_hwsku_dirs() + platform_sfputil.read_all_porttab_mappings(hwsku_path, self.num_asics) + else: + # For single ASIC platforms we pass port_config_file_path and the asic_inst as 0 + port_config_file_path = device_info.get_path_to_port_config_file() + platform_sfputil.read_porttab_mappings(port_config_file_path, 0) + except Exception, e: self.log_error("Failed to read port info: %s" % (str(e)), True) sys.exit(PORT_CONFIG_LOAD_ERROR) # Connect to STATE_DB and create transceiver dom/sfp info tables - state_db = daemon_base.db_connect("STATE_DB") - self.int_tbl = swsscommon.Table(state_db, TRANSCEIVER_INFO_TABLE) - self.dom_tbl = swsscommon.Table(state_db, TRANSCEIVER_DOM_SENSOR_TABLE) - self.status_tbl = swsscommon.Table(state_db, TRANSCEIVER_STATUS_TABLE) + state_db, self.int_tbl, self.dom_tbl, self.status_tbl = {}, {}, {}, {} + + # Get the namespaces in the platform + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + self.int_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_INFO_TABLE) + self.dom_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + self.status_tbl[asic_id] = swsscommon.Table(state_db[asic_id], TRANSCEIVER_STATUS_TABLE) self.load_media_settings() warmstart = swsscommon.WarmStart() @@ -1146,7 +1223,8 @@ class DaemonXcvrd(daemon_base.DaemonBase): # Make sure this daemon started after all port configured self.log_info("Wait for port config is done") - self.wait_for_port_config_done() + for namespace in namespaces: + self.wait_for_port_config_done(namespace) # Post all the current interface dom/sfp info to STATE_DB self.log_info("Post all port DOM/SFP info to DB") @@ -1163,8 +1241,14 @@ class DaemonXcvrd(daemon_base.DaemonBase): # Delete all the information from DB and then exit logical_port_list = platform_sfputil.logical for logical_port_name in logical_port_list: - del_port_sfp_dom_info_from_db(logical_port_name, self.int_tbl, self.dom_tbl) - delete_port_from_status_table(logical_port_name, self.status_tbl) + # Get the asic to which this port belongs + asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + + del_port_sfp_dom_info_from_db(logical_port_name, self.int_tbl[asic_index], self.dom_tbl[asic_index]) + delete_port_from_status_table(logical_port_name, self.status_tbl[asic_index]) # Run daemon def run(self):