diff --git a/tests/test_clis/test_port_number_is_changed.py b/tests/test_clis/test_port_number_is_changed.py new file mode 100644 index 00000000..ffb6e5dc --- /dev/null +++ b/tests/test_clis/test_port_number_is_changed.py @@ -0,0 +1,92 @@ +from __future__ import annotations +from xoa_driver.enums import MediaConfigurationType +from xoa_driver.functions.mgmt import ( + get_module_supported_media, + set_module_media_config, +) + +import timeit +import pstats +import cProfile + +import asyncio +import os +import sys +import logging +from typing import Coroutine, List, Any + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from loguru import logger # noqa: E402 +from xoa_driver.testers import L23Tester # noqa: E402 +from xoa_driver.internals.utils.managers.exceptions import NoSuchPortError # noqa: E402 + + +async def test_using_a_outdated_port(): + # Module 7 of destination chassis is supposed to have two modes: BASE-T1(7 ports), BASE-T1S(2 ports), switch to BASE-T1 to start this test. + t = await L23Tester("192.168.1.198", "Ron", enable_logging=False) + + m1 = t.modules.obtain(7) + p1 = m1.ports.obtain(3) + print(await p1.interface.get()) + print('success1') + await set_module_media_config(m1, MediaConfigurationType.BASE_T1S) + try: + print(await p1.interface.get()) + print('success2') + except ConnectionRefusedError as e: + print(e) + try: + p2 = m1.ports.obtain(3) + print(await p2.interface.get()) + print('success3') + except NoSuchPortError as e: + print(e) + + +async def test_getting_a_removed_port(): + # Module 7 of destination chassis is supposed to have two modes: BASE-T1(7 ports), BASE-T1S(2 ports), switch to BASE-T1 to start this test. + t = await L23Tester("192.168.1.198", "Ron", enable_logging=False) + m1 = t.modules.obtain(7) + print(len(m1.ports)) + + p1 = m1.ports.obtain(1) + print('success1', await p1.interface.get()) + + await set_module_media_config(m1, MediaConfigurationType.BASE_T1S) + + print(len(m1.ports)) + try: + print('success2', await p1.interface.get()) + except ConnectionRefusedError as e: + print(e) + # This will raise Exceptions. + + p2 = m1.ports.obtain(1) + print('success3', await p2.interface.get()) + # This line should do fine. + + +async def test_getting_stream_under_a_removed_port(): + # Module 7 of destination chassis is supposed to have two modes: BASE-T1(7 ports), BASE-T1S(2 ports), switch to BASE-T1 to start this test. + t = await L23Tester("192.168.1.198", "Ron", enable_logging=False) + m1 = t.modules.obtain(7) + p1 = m1.ports.obtain(3) + await p1.reservation.set_reserve() + s = await p1.streams.create() + b = s.comment + await set_module_media_config(m1, MediaConfigurationType.BASE_T1S) + + print(await b.get()) + + +def run(method: Coroutine) -> None: + import platform + + if platform.system() == "Windows": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + asyncio.run(method) + + +if __name__ == "__main__": + run(test_getting_stream_under_a_removed_port()) diff --git a/xoa_driver/__init__.py b/xoa_driver/__init__.py index 14c5ae5d..cd809846 100644 --- a/xoa_driver/__init__.py +++ b/xoa_driver/__init__.py @@ -1,2 +1,2 @@ -__version__ = "2.0.2" +__version__ = "2.1.0" __short_version__ = "2.0" diff --git a/xoa_driver/functions/exceptions.py b/xoa_driver/functions/exceptions.py index 08710a00..b4bc51fa 100644 --- a/xoa_driver/functions/exceptions.py +++ b/xoa_driver/functions/exceptions.py @@ -50,7 +50,7 @@ def __init__(self, serdes: list[int]) -> None: class NotSupportMedia(ConfigError): def __init__(self, module: GenericAnyModule) -> None: module_id = module.module_id - self.msg = f"This module {module_id} does not support the media configuration!" + self.msg = f"This module {module_id} does not support this media configuration!" class NotSupportPortSpeed(ConfigError): diff --git a/xoa_driver/internals/commands/enums.py b/xoa_driver/internals/commands/enums.py index fd249a47..10a73e02 100644 --- a/xoa_driver/internals/commands/enums.py +++ b/xoa_driver/internals/commands/enums.py @@ -1,5 +1,5 @@ -from enum import IntEnum +from enum import IntEnum, IntFlag # region L23 enums @@ -522,10 +522,74 @@ class ProtocolOption(IntEnum): """EtherType""" # Generate RAW form 1...64 bytes - _ignore_ = "ProtocolOption i" - ProtocolOption = vars() - for i in range(1, 65): - ProtocolOption["RAW_%d" % i] = 256 - i # type: ignore + # _ignore_ = "ProtocolOption i" + # ProtocolOption = vars() + # for i in range(1, 65): + # ProtocolOption["RAW_%d" % i] = 256 - i # type: ignore + RAW_1 = 255 + RAW_2 = 254 + RAW_3 = 253 + RAW_4 = 252 + RAW_5 = 251 + RAW_6 = 250 + RAW_7 = 249 + RAW_8 = 248 + RAW_9 = 247 + RAW_10 = 246 + RAW_11 = 245 + RAW_12 = 244 + RAW_13 = 243 + RAW_14 = 242 + RAW_15 = 241 + RAW_16 = 240 + RAW_17 = 239 + RAW_18 = 238 + RAW_19 = 237 + RAW_20 = 236 + RAW_21 = 235 + RAW_22 = 234 + RAW_23 = 233 + RAW_24 = 232 + RAW_25 = 231 + RAW_26 = 230 + RAW_27 = 229 + RAW_28 = 228 + RAW_29 = 227 + RAW_30 = 226 + RAW_31 = 225 + RAW_32 = 224 + RAW_33 = 223 + RAW_34 = 222 + RAW_35 = 221 + RAW_36 = 220 + RAW_37 = 219 + RAW_38 = 218 + RAW_39 = 217 + RAW_40 = 216 + RAW_41 = 215 + RAW_42 = 214 + RAW_43 = 213 + RAW_44 = 212 + RAW_45 = 211 + RAW_46 = 210 + RAW_47 = 209 + RAW_48 = 208 + RAW_49 = 207 + RAW_50 = 206 + RAW_51 = 205 + RAW_52 = 204 + RAW_53 = 203 + RAW_54 = 202 + RAW_55 = 201 + RAW_56 = 200 + RAW_57 = 199 + RAW_58 = 198 + RAW_59 = 197 + RAW_60 = 196 + RAW_61 = 195 + RAW_62 = 194 + RAW_63 = 193 + RAW_64 = 192 class ModifierAction(IntEnum): @@ -910,6 +974,15 @@ class MediaConfigurationType(IntEnum): OSFP800 = 112 """OSFP800""" + QSFPDD800_ANLT = 113 + """QSFPDD800 ANLT""" + + QSFP112_ANLT = 114 + """QSFP112 ANLT""" + + OSFP800_ANLT = 115 + """OSFP800 ANLT""" + UNKNOWN = 255 @@ -1908,7 +1981,7 @@ class AutoNegTecAbility(IntEnum): """BAM 400G CR8 KR8""" -class AutoNegFECOption(IntEnum): +class AutoNegFECOption(IntFlag): """Auto Neg FEC Options""" DEFAULT_FEC = 0 @@ -1925,6 +1998,8 @@ class AutoNegFECOption(IntEnum): """RS 544""" RS272 = 1024 """RS 272""" + RSFEC_CL161 = 8 + """RS CL 161""" class AutoNegFECType(IntEnum): @@ -1963,7 +2038,7 @@ class AutoNegStatus(IntEnum): """AN Good""" -class AutoNegFECStatus(IntEnum): +class AutoNegFECStatus(IntFlag): """Auto Neg FEC Status""" DEFAULT_FEC = 0 @@ -1980,6 +2055,8 @@ class AutoNegFECStatus(IntEnum): """RS 544""" RS272 = 1024 """RS 272""" + RSFEC_CL161 = 8 + """RS CL 161""" class LinkTrainingMode(IntEnum): diff --git a/xoa_driver/internals/commands/pp_commands.py b/xoa_driver/internals/commands/pp_commands.py index 89f851de..ad518529 100644 --- a/xoa_driver/internals/commands/pp_commands.py +++ b/xoa_driver/internals/commands/pp_commands.py @@ -1873,9 +1873,9 @@ class GetDataAttr(ResponseBodyStruct): """coded integer, mode""" tec_ability: AutoNegTecAbility = field(XmpInt()) """coded integer, technical ability.""" - fec_capable: AutoNegFECOption = field(XmpInt()) + fec_capable: int = field(XmpInt()) """coded integer, FEC capable.""" - fec_requested: AutoNegFECOption = field(XmpInt()) + fec_requested: int = field(XmpInt()) """coded integer, FEC requested.""" pause_mode: PauseMode = field(XmpInt()) """coded integer, pause mode.""" @@ -1885,9 +1885,9 @@ class SetDataAttr(RequestBodyStruct): """coded integer, mode""" tec_ability: AutoNegTecAbility = field(XmpInt()) """coded integer, technical ability.""" - fec_capable: AutoNegFECOption = field(XmpInt()) + fec_capable: int = field(XmpInt()) """coded integer, FEC capable.""" - fec_requested: AutoNegFECOption = field(XmpInt()) + fec_requested: int = field(XmpInt()) """coded integer, FEC requested.""" pause_mode: PauseMode = field(XmpInt()) """coded integer, pause mode.""" @@ -1954,9 +1954,9 @@ class GetDataAttr(ResponseBodyStruct): """coded integer, auto-negotiation state.""" tec_ability: AutoNegTecAbility = field(XmpInt()) """coded integer, technical ability.""" - fec_capable: AutoNegFECStatus = field(XmpInt()) + fec_capable: int = field(XmpInt()) """coded integer, FEC capable partner.""" - fec_requested: AutoNegFECStatus = field(XmpInt()) + fec_requested: int = field(XmpInt()) """coded integer, FEC requested partner.""" pause_mode: PauseMode = field(XmpInt()) """coded integer, pause mode.""" diff --git a/xoa_driver/internals/core/interfaces.py b/xoa_driver/internals/core/interfaces.py index 64a497bc..c22f04e2 100644 --- a/xoa_driver/internals/core/interfaces.py +++ b/xoa_driver/internals/core/interfaces.py @@ -42,3 +42,6 @@ def subscribe(self, xmc_cls: "ICommand", callback: "t.Callable") -> None: def on_disconnected(self, callback: "t.Callable") -> None: ... + + def set_outdated(self) -> None: + ... \ No newline at end of file diff --git a/xoa_driver/internals/core/transporter/handler.py b/xoa_driver/internals/core/transporter/handler.py index ef99d827..4eadf3ea 100644 --- a/xoa_driver/internals/core/transporter/handler.py +++ b/xoa_driver/internals/core/transporter/handler.py @@ -122,3 +122,6 @@ def subscribe(self, xmc_cls: ICommand, callback: Callable) -> None: def on_disconnected(self, callback: Callable) -> None: """Regiser users callback which will be called after connection was terminated.""" self.__resp_publisher.subscribe_connection_lost(callback) + + def set_outdated(self) -> None: + pass \ No newline at end of file diff --git a/xoa_driver/internals/hli_v1/modules/module_chimera.py b/xoa_driver/internals/hli_v1/modules/module_chimera.py index 4628f6d6..6649f086 100644 --- a/xoa_driver/internals/hli_v1/modules/module_chimera.py +++ b/xoa_driver/internals/hli_v1/modules/module_chimera.py @@ -1,7 +1,6 @@ import typing import asyncio import functools -from typing import TYPE_CHECKING from typing_extensions import Self from xoa_driver.internals.commands import ( M_STATUS, @@ -29,9 +28,12 @@ from xoa_driver.internals.utils import attributes as utils from xoa_driver.internals.state_storage import modules_state from xoa_driver import ports +from xoa_driver.internals.hli_v1.modules.modules_l23.module_l23_base import MediaModule, CfpModule from . import base_module as bm -if TYPE_CHECKING: + +if typing.TYPE_CHECKING: from xoa_driver.internals.core import interfaces as itf + from xoa_driver.internals.hli_v1.modules.modules_l23.module_l23_base import ModuleL23 from . import __interfaces as m_itf @@ -60,18 +62,18 @@ class ChCFP: CFP test module (Chimera). """ - def __init__(self, conn: "itf.IConnection", module_id: int) -> None: - self.type = M_CFPTYPE(conn, module_id) + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleL23", "ModuleChimera"]) -> None: + self.type = M_CFPTYPE(conn, module.module_id) """ The transceiver's CFP type currently inserted. :type: M_CFPTYPE """ - self.config = M_CFPCONFIGEXT(conn, module_id) + self.config = CfpModule(conn, module) """ The CFP configuration of the test module. - :type: M_CFPCONFIGEXT + :type: CfpModule """ @@ -142,7 +144,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - :type: ChTiming """ - self.cfp = ChCFP(conn, self.module_id) + self.cfp = ChCFP(conn, self) """ CFP test module (Chimera). @@ -204,10 +206,10 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - :type: M_REVISION """ - self.media = M_MEDIA(conn, self.module_id) + self.media = MediaModule(conn, self) """Test module's media type. - :type: M_MEDIA + :type: MediaModule """ self.available_speeds = M_MEDIASUPPORT(conn, self.module_id) diff --git a/xoa_driver/internals/hli_v1/modules/modules_l23/module_l23_base.py b/xoa_driver/internals/hli_v1/modules/modules_l23/module_l23_base.py index 1e5ade07..c934e08d 100644 --- a/xoa_driver/internals/hli_v1/modules/modules_l23/module_l23_base.py +++ b/xoa_driver/internals/hli_v1/modules/modules_l23/module_l23_base.py @@ -1,11 +1,10 @@ +from __future__ import annotations import asyncio import functools -from typing import ( - TYPE_CHECKING, - Optional, -) +import typing from typing_extensions import Self from xoa_driver.internals.commands import ( + M_MEDIA, M_STATUS, M_UPGRADE, M_UPGRADEPROGRESS, @@ -24,22 +23,24 @@ M_SMASTATUS, M_NAME, M_REVISION, - M_MEDIA, M_CLOCKSYNCSTATUS, M_TXCLOCKSOURCE_NEW, M_TXCLOCKSTATUS_NEW, M_TXCLOCKFILTER_NEW, ) -if TYPE_CHECKING: - from xoa_driver.internals.core import interfaces as itf from xoa_driver.internals.utils import attributes as utils from xoa_driver.internals.utils.managers import ports_manager as pm from xoa_driver.internals.state_storage import modules_state - +from xoa_driver.enums import MediaConfigurationType +from xoa_driver.internals.core.token import Token from .. import base_module as bm from .. import __interfaces as m_itf +if typing.TYPE_CHECKING: + from xoa_driver.internals.core import interfaces as itf + from xoa_driver.internals.hli_v1.modules.module_chimera import ModuleChimera + class TXClock: """Advanced timing clock""" @@ -104,20 +105,64 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: """ +class ExtendedToken: + def __init__( + self, token: Token, module: typing.Union["ModuleL23", "ModuleChimera"] + ) -> None: + self.__token = token + self.__module = module + + def __await__(self): + return self.__ask_then().__await__() + + async def __ask_then(self): + r = await self.__token + p_counts = (await self.__module.port_count.get()).port_count + if self.__module.ports is not None: + changed = self.__module.ports.reinit(p_counts) + if changed: + await self.__module.ports.fill() + return r + + +class MediaModule: + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleL23", "ModuleChimera"]) -> None: + self.__media = M_MEDIA(conn, module.module_id) + self.__module = module + + def get(self) -> Token: + return self.__media.get() + + def set(self, media_config: MediaConfigurationType) -> ExtendedToken: + return ExtendedToken(self.__media.set(media_config), self.__module) + + +class CfpModule: + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleL23", "ModuleChimera"]) -> None: + self.__cfpconfigext = M_CFPCONFIGEXT(conn, module.module_id) + self.__module = module + + def get(self) -> Token: + return self.__cfpconfigext.get() + + def set(self, portspeed_list: typing.List[int]) -> ExtendedToken: + return ExtendedToken(self.__cfpconfigext.set(portspeed_list), self.__module) + + class CFP: """Test module CFP""" - def __init__(self, conn: "itf.IConnection", module_id: int) -> None: - self.type = M_CFPTYPE(conn, module_id) + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleL23", "ModuleChimera"]) -> None: + self.type = M_CFPTYPE(conn, module.module_id) """The transceiver's CFP type currently inserted. :type: M_CFPTYPE """ - self.config = M_CFPCONFIGEXT(conn, module_id) + self.config = CfpModule(conn, module) """The CFP configuration of the test module. - :type: M_CFPCONFIGEXT + :type: CfpModule """ @@ -196,10 +241,10 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - :type: M_STATUS """ - self.media = M_MEDIA(conn, self.module_id) + self.media = MediaModule(conn, self) """Test module's media type. - :type: M_MEDIA + :type: ModuleMedia """ self.available_speeds = M_MEDIASUPPORT(conn, self.module_id) @@ -238,7 +283,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - :type: AdvancedTiming """ - self.cfp = CFP(conn, self.module_id) + self.cfp = CFP(conn, self) """Test module's CFP configuration. :type: CFP @@ -250,7 +295,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - :type: MUpgrade """ - self.ports: Optional[pm.PortsManager] = None + self.ports: typing.Optional[pm.PortsManager] = None """L23 Port Index Manager of the test module. :type: PortsManager diff --git a/xoa_driver/internals/hli_v2/modules/module_chimera.py b/xoa_driver/internals/hli_v2/modules/module_chimera.py index d7bf6d27..dc9fd993 100644 --- a/xoa_driver/internals/hli_v2/modules/module_chimera.py +++ b/xoa_driver/internals/hli_v2/modules/module_chimera.py @@ -1,7 +1,6 @@ -import typing import asyncio import functools -from typing import TYPE_CHECKING +import typing from typing_extensions import Self from xoa_driver.internals.commands import ( M_STATUS, @@ -23,8 +22,9 @@ from xoa_driver.internals.utils import attributes as utils from xoa_driver.internals.state_storage import modules_state from xoa_driver.v2 import ports +from xoa_driver.internals.hli_v2.modules.modules_l23.module_l23_base import CfpModule from . import base_module as bm -if TYPE_CHECKING: +if typing.TYPE_CHECKING: from xoa_driver.internals.core import interfaces as itf from . import __interfaces as m_itf @@ -33,6 +33,7 @@ class ChTXClock: """ Advanced timing feature (Chimera). """ + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.source = M_TXCLOCKSOURCE_NEW(conn, module_id) """ @@ -50,16 +51,17 @@ class ChCFP: """ CFP test module (Chimera). """ - def __init__(self, conn: "itf.IConnection", module_id: int) -> None: - self.type = M_CFPTYPE(conn, module_id) + + def __init__(self, conn: "itf.IConnection", module: 'ModuleChimera') -> None: + self.type = M_CFPTYPE(conn, module.module_id) """ The transceiver's CFP type currently inserted. Representation of M_CFPTYPE """ - self.config = M_CFPCONFIGEXT(conn, module_id) + self.config = CfpModule(conn, module) """ The CFP configuration of the test module. - Representation of M_CFPCONFIGEXT + Representation of CfpModule """ @@ -67,6 +69,7 @@ class ChUpgrade: """ Upgrade test module (Chimera). """ + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.start = M_UPGRADE(conn, module_id) """ @@ -82,6 +85,7 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: class ChTiming: """Test module timing and clock configuration""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.clock_local_adjust = M_CLOCKPPB(conn, module_id) @@ -92,6 +96,7 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: class ChAdvancedTiming: """Advanced Timing config and control""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.clock = ChTXClock(conn, module_id) """Advanced timing clock config and status @@ -102,6 +107,7 @@ class ModuleChimera(bm.BaseModule["modules_state.ModuleLocalState"]): """ Representation of a Chimera module on physical tester. """ + def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") -> None: super().__init__(conn, init_data) @@ -112,7 +118,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - Advanced timing feature (Chimera). """ - self.cfp = ChCFP(conn, self.module_id) + self.cfp = ChCFP(conn, self) """ CFP test module (Chimera). """ @@ -216,6 +222,7 @@ async def _setup(self) -> Self: @revisions.register_chimera_module(rev="Chimera-100G-5S-2P") class MChi100G5S2P(ModuleChimera): """Chimera module Chi-100G-5S-2P""" + def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") -> None: super().__init__(conn, init_data) self.ports: pm.PortsManager[ports.PChi100G5S2P] = pm.PortsManager( @@ -231,6 +238,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - @revisions.register_chimera_module(rev="Chimera-100G-5S-2P[b]") class MChi100G5S2P_b(ModuleChimera): """Chimera module Chi-100G-5S-2P[b]""" + def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") -> None: super().__init__(conn, init_data) self.ports: pm.PortsManager[ports.PChi100G5S2P_b] = pm.PortsManager( @@ -246,6 +254,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - @revisions.register_chimera_module(rev="Chimera-40G-2S-2P") class MChi40G2S2P(ModuleChimera): """Chimera module Chi-40G-2S-2P""" + def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") -> None: super().__init__(conn, init_data) self.ports: pm.PortsManager[ports.PChi40G2S2P] = pm.PortsManager( diff --git a/xoa_driver/internals/hli_v2/modules/modules_l23/module_l23_base.py b/xoa_driver/internals/hli_v2/modules/modules_l23/module_l23_base.py index 9ad0c7bb..6f7628b5 100644 --- a/xoa_driver/internals/hli_v2/modules/modules_l23/module_l23_base.py +++ b/xoa_driver/internals/hli_v2/modules/modules_l23/module_l23_base.py @@ -1,11 +1,10 @@ +from __future__ import annotations import asyncio import functools -from typing import ( - TYPE_CHECKING, - Optional, -) +import typing from typing_extensions import Self from xoa_driver.internals.commands import ( + M_MEDIA, M_STATUS, M_UPGRADE, M_UPGRADEPROGRESS, @@ -24,25 +23,28 @@ M_SMASTATUS, M_NAME, M_REVISION, - M_MEDIA, M_CLOCKSYNCSTATUS, M_TXCLOCKSOURCE_NEW, M_TXCLOCKSTATUS_NEW, M_TXCLOCKFILTER_NEW, ) -if TYPE_CHECKING: - from xoa_driver.internals.core import interfaces as itf from xoa_driver.internals.utils import attributes as utils from xoa_driver.internals.utils.managers import ports_manager as pm from xoa_driver.internals.state_storage import modules_state - +from xoa_driver.enums import MediaConfigurationType +from xoa_driver.internals.core.token import Token from .. import base_module as bm from .. import __interfaces as m_itf +if typing.TYPE_CHECKING: + from xoa_driver.internals.core import interfaces as itf + from xoa_driver.internals.hli_v2.modules.module_chimera import ModuleChimera + class TXClock: """Advanced timing clock""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.source = M_TXCLOCKSOURCE_NEW(conn, module_id) """The source that drives the TX clock rate of the ports on the test module. @@ -62,6 +64,7 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: class SMA: """SMA connector""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.input = M_SMAINPUT(conn, module_id) """SMA input of the test module. @@ -81,6 +84,7 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: class AdvancedTiming: """Advanced Timing config and control""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.clock_tx = TXClock(conn, module_id) """Advanced timing clock config and status @@ -91,15 +95,60 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: """ +class ExtendedToken: + def __init__( + self, token: Token, module: typing.Union["ModuleL23", "ModuleChimera"] + ) -> None: + self.__token = token + self.__module = module + + def __await__(self): + return self.__ask_then().__await__() + + async def __ask_then(self): + r = await self.__token + p_counts = (await self.__module.port_count.get()).port_count + if self.__module.ports is not None: + changed = self.__module.ports.reinit(p_counts) + if changed: + await self.__module.ports.fill() + return r + + +class MediaModule: + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleL23", "ModuleChimera"]) -> None: + self.__media = M_MEDIA(conn, module.module_id) + self.__module = module + + def get(self) -> Token: + return self.__media.get() + + def set(self, media_config: MediaConfigurationType) -> ExtendedToken: + return ExtendedToken(self.__media.set(media_config), self.__module) + + +class CfpModule: + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleL23", "ModuleChimera"]) -> None: + self.__cfpconfigext = M_CFPCONFIGEXT(conn, module.module_id) + self.__module = module + + def get(self) -> Token: + return self.__cfpconfigext.get() + + def set(self, portspeed_list: typing.List[int]) -> ExtendedToken: + return ExtendedToken(self.__cfpconfigext.set(portspeed_list), self.__module) + + class CFP: """Test module CFP""" - def __init__(self, conn: "itf.IConnection", module_id: int) -> None: - self.type = M_CFPTYPE(conn, module_id) + + def __init__(self, conn: "itf.IConnection", module: typing.Union["ModuleChimera", "ModuleL23"]) -> None: + self.type = M_CFPTYPE(conn, module.module_id) """The transceiver's CFP type currently inserted. Representation of M_CFPTYPE """ - self.config = M_CFPCONFIGEXT(conn, module_id) + self.config = CfpModule(conn, module) """The CFP configuration of the test module. Representation of M_CFPCONFIGEXT """ @@ -107,6 +156,7 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: class MTiming: """Test module timing and clock configuration""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.source = M_TIMESYNC(conn, module_id) """Timing source of the test module. @@ -126,6 +176,7 @@ def __init__(self, conn: "itf.IConnection", module_id: int) -> None: class MUpgrade: """Test module upgrade""" + def __init__(self, conn: "itf.IConnection", module_id: int) -> None: self.start = M_UPGRADE(conn, module_id) """Start the upgrade progress of the test module. @@ -147,6 +198,7 @@ class ModuleL23(bm.BaseModule["modules_state.ModuleL23LocalState"]): """ Representation of a L23 test module on a physical tester. """ + def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") -> None: super().__init__(conn, init_data) @@ -167,9 +219,9 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - Representation of M_STATUS """ - self.media = M_MEDIA(conn, self.module_id) + self.media = MediaModule(conn, self) """Test module's media configuration. - Representation of M_MEDIA + Representation of MediaModule """ self.available_speeds = M_MEDIASUPPORT(conn, self.module_id) @@ -198,7 +250,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - self.advanced_timing = AdvancedTiming(conn, self.module_id) """Test module's advanced timing .""" - self.cfp = CFP(conn, self.module_id) + self.cfp = CFP(conn, self) """Test module's CFP """ self.upgrade = MUpgrade(conn, self.module_id) @@ -207,7 +259,7 @@ def __init__(self, conn: "itf.IConnection", init_data: "m_itf.ModuleInitData") - :type: MUpgrade """ - self.ports: Optional[pm.PortsManager] = None + self.ports: typing.Optional[pm.PortsManager] = None """L23 port index manager of the test module.""" @property diff --git a/xoa_driver/internals/utils/con_traffic_light.py b/xoa_driver/internals/utils/con_traffic_light.py new file mode 100644 index 00000000..d12531f8 --- /dev/null +++ b/xoa_driver/internals/utils/con_traffic_light.py @@ -0,0 +1,88 @@ +from __future__ import annotations +import asyncio +import typing as t +from xoa_driver.internals.core import interfaces as itf +from xoa_driver.internals.core.transporter.protocol.struct_request import Request +from xoa_driver.internals.core.transporter._typings import ICommand + + +class Green: + @staticmethod + def is_connected(inst: itf.IConnection) -> bool: + return inst.is_connected + + @staticmethod + def send(inst: itf.IConnection, data: bytes | bytearray | memoryview) -> None: + return inst.send(data) + + @staticmethod + def close(inst: itf.IConnection) -> None: + return inst.close() + + @staticmethod + async def prepare_data(inst: itf.IConnection, request: "Request") -> tuple[bytes, asyncio.Future]: + return await inst.prepare_data(request) + + @staticmethod + def subscribe(inst: itf.IConnection, xmc_cls: "ICommand", callback: t.Callable) -> None: + return inst.subscribe(xmc_cls, callback) + + @staticmethod + def on_disconnected(inst: itf.IConnection, callback: t.Callable) -> None: + return inst.on_disconnected(callback) + + +class Red: + @staticmethod + def is_connected(inst: itf.IConnection) -> bool: + return False + + @staticmethod + def send(inst: itf.IConnection, data: bytes | bytearray | memoryview) -> None: + raise ConnectionRefusedError("The instance is not valid anymore, please obtain() a new one.") + + @staticmethod + def close(inst) -> None: + raise ConnectionRefusedError("The instance is not valid anymore, please obtain() a new one.") + + @staticmethod + async def prepare_data(inst, request: "Request") -> tuple[bytes, asyncio.Future]: + raise ConnectionRefusedError("The instance is not valid anymore, please obtain() a new one.") + + @staticmethod + def subscribe(inst, xmc_cls: "ICommand", callback: t.Callable) -> None: + raise ConnectionRefusedError("The instance is not valid anymore, please obtain() a new one.") + + @staticmethod + def on_disconnected(inst, callback: t.Callable) -> None: + raise ConnectionRefusedError("The instance is not valid anymore, please obtain() a new one.") + + +class ConnectionTrafficLight: + __slots__ = ("_conn", "state") + + def __init__(self, connection: itf.IConnection) -> None: + self._conn = connection + self.state = Green + + def set_outdated(self) -> None: + self.state = Red + + @property + def is_connected(self) -> bool: + return self.state.is_connected(self._conn) + + def send(self, data: bytes | bytearray | memoryview) -> None: + return self.state.send(self._conn, data) + + def close(self) -> None: + return self.state.close(self._conn) + + async def prepare_data(self, request: "Request") -> tuple[bytes, asyncio.Future]: + return await self.state.prepare_data(self._conn, request) + + def subscribe(self, xmc_cls: "ICommand", callback: t.Callable) -> None: + return self.state.subscribe(self._conn, xmc_cls, callback) + + def on_disconnected(self, callback: t.Callable) -> None: + return self.state.on_disconnected(self._conn, callback) diff --git a/xoa_driver/internals/utils/managers/ports_manager.py b/xoa_driver/internals/utils/managers/ports_manager.py index 06954a67..ba889b10 100644 --- a/xoa_driver/internals/utils/managers/ports_manager.py +++ b/xoa_driver/internals/utils/managers/ports_manager.py @@ -15,9 +15,12 @@ from .abc import AbcResourcesManager from .exceptions import NoSuchPortError +from ..con_traffic_light import ConnectionTrafficLight class IPort(Protocol): + _conn: "itf.IConnection" + def __init__(self, conn: "itf.IConnection", module_id: int, port_id: int) -> None: ... @@ -47,19 +50,32 @@ def __init__(self, conn: "itf.IConnection", module_id: int, ports_type: Type[PT] super().__init__() self._conn = conn self._ports_type = ports_type - self._ports_count = ports_count self._module_id = module_id + self._items: OrderedDict[int, PT] = OrderedDict() + self._ports_count = 0 + self.reinit(ports_count) + + def reinit(self, ports_count: int) -> bool: + if ports_count == self._ports_count: + return False + + if self._items: + for v in self._items.values(): + v._conn.set_outdated() + del self._items self._items: OrderedDict[int, PT] = OrderedDict( ( port_id, self._ports_type( - conn=self._conn, + conn=ConnectionTrafficLight(self._conn), module_id=self._module_id, port_id=port_id, ) ) - for port_id in range(self._ports_count) + for port_id in range(ports_count) ) + self._ports_count = ports_count + return True async def fill(self) -> None: assert not self._lock, "Method can be called only once." @@ -88,7 +104,7 @@ async def fill(self) -> None: assert not self._lock, "Method can be called only once." coros = iter( self._resolver( - conn=self._conn, + conn=ConnectionTrafficLight(self._conn), module_id=self._module_id, port_id=port_id, )