Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mfd_connect/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
login_timeout: int = 15,
is_veloce: bool = False,
with_redirection: bool = True,
serial_logs_path: str | Path = None,
serial_logs_path: str | Path | None = None,
model: "BaseModel | None" = None,
cache_system_data: bool = True,
):
Expand Down
130 changes: 103 additions & 27 deletions mfd_connect/telnet/telnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,17 @@
from netaddr import IPAddress
from typing import Union, Optional, Type, Iterable, List

from mfd_connect.util import EFI_SHELL_PROMPT_REGEX, UNIX_PROMPT_REGEX, SerialKeyCode
from mfd_connect.util import (
EFI_SHELL_PROMPT_REGEX,
UNIX_PROMPT_REGEX,
ANSI_TERMINAL_ESCAPE_REGEX_STR,
ANSI_SHELL_PROMPT_FALLBACK_REGEX,
CURSOR_POSITION_REQUEST_REGEX,
FALLBACK_CURSOR_POSITION_RESPONSE,
LOGIN_PROMPT_RECOVERY_RETRIES,
LOGIN_PROMPT_RECOVERY_TIMEOUT,
SerialKeyCode,
)
from mfd_connect.exceptions import ConnectionCalledProcessError, OsNotSupported
from mfd_typing.os_values import OSName, OSType, OSBitness
from ..pathlib.path import CustomPath
Expand All @@ -33,6 +43,13 @@
add_logging_level(level_name="CMD", level_value=log_levels.CMD)
add_logging_level(level_name="OUT", level_value=log_levels.OUT)

# Login flow prompt patterns:
# 1) Login prompt asking for username (e.g. "login: ").
# 2) Configured shell prompt regex passed by user (strict/expected shell prompt).
# 3) ANSI fallback shell prompt regex for consoles printing VT/ANSI control sequences inside the prompt line.
DEFAULT_LOGIN_PROMPT = "login: "
DEFAULT_PASSWORD_PROMPT = "Password: "


class TelnetConnection(Connection):
"""Class handling communication through Telnet protocol."""
Expand All @@ -45,9 +62,9 @@ def __init__(
*,
port: int,
username: str,
password: str,
login_prompt: str = "login: ",
password_prompt: str = "Password: ",
password: str | None,
login_prompt: str = DEFAULT_LOGIN_PROMPT,
password_prompt: str = DEFAULT_PASSWORD_PROMPT,
prompt_regex: str = UNIX_PROMPT_REGEX,
execution_retries: int = 2,
retry_cooldown: int = 2,
Expand Down Expand Up @@ -95,14 +112,61 @@ def __str__(self):
def _in_pre_os(self) -> bool:
return self._prompt == EFI_SHELL_PROMPT_REGEX

def _get_shell_prompt_patterns(self) -> List[bytes]:
"""Return prompt patterns covering both plain and ANSI-colored shell prompts."""
return [self._prompt.encode(), ANSI_SHELL_PROMPT_FALLBACK_REGEX]

@staticmethod
def _strip_ansi_sequences(text: str) -> str:
"""Strip ANSI/VT terminal escape sequences from output text."""
return ANSI_TERMINAL_ESCAPE_REGEX_STR.sub("", text)

def _wait_for_shell_prompt_after_login(self, shell_prompt_patterns: list[bytes]) -> None:
"""Wait for shell prompt with lightweight recovery for CPR/control-sequence-only output."""
for attempt in range(LOGIN_PROMPT_RECOVERY_RETRIES):
timeout = self._login_timeout if attempt == 0 else LOGIN_PROMPT_RECOVERY_TIMEOUT
pattern_index, _, output = self.console.expect(shell_prompt_patterns, timeout)
is_prompt_found = pattern_index != -1

if is_prompt_found:
logger.log(
level=log_levels.MODULE_DEBUG,
msg=f"Prompt wait matched pattern {shell_prompt_patterns[pattern_index]}, raw output: {output}",
)
logger.log(level=log_levels.MODULE_DEBUG, msg="Prompt found")
return

if output and CURSOR_POSITION_REQUEST_REGEX.search(output):
logger.log(
level=log_levels.MODULE_DEBUG,
msg=f"Detected CPR request in login output, sending response: {FALLBACK_CURSOR_POSITION_RESPONSE}",
)
self.console.write(FALLBACK_CURSOR_POSITION_RESPONSE, end=b"")
continue

logger.log(
level=log_levels.MODULE_DEBUG,
msg=f"Shell prompt not found, raw console output after login (attempt {attempt + 1}): {output}",
)

if attempt < LOGIN_PROMPT_RECOVERY_RETRIES - 1:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Forcing two newlines to refresh shell prompt after control-sequence-only output",
)
self.console.write()
self.console.write()

raise ConnectionResetError("Prompt not found after entering credentials")

def _establish_telnet_connection(self) -> None:
"""
Establish connection with telnet, check if connected and then login to console.

:raises TelnetException: if encountered unexpected exception when connecting,
if connection was not established after retrying
"""
for attempt in range(self._execution_retries):
for _ in range(self._execution_retries):
try:
logger.log(level=log_levels.MODULE_DEBUG, msg="Trying to connect to target...")
self._connect()
Expand Down Expand Up @@ -144,7 +208,7 @@ def _login(self) -> None:

:raises TelnetException: if could not login to console after retrying
"""
for attempt in range(self._execution_retries):
for _ in range(self._execution_retries):
try:
self._enter_credentials()
return
Expand All @@ -164,45 +228,56 @@ def _enter_credentials(self) -> None:
:raises ConnectionResetError: if failed to enter credentials successfully
"""
logger.log(level=log_levels.MODULE_DEBUG, msg="Writing newline to prompt")
prompt_pattern_list = [self._login_prompt.encode(), self._prompt.encode()]
shell_prompt_patterns = self._get_shell_prompt_patterns()
# First expect either login prompt or already visible shell prompt.
login_or_shell_prompt_patterns = [self._login_prompt.encode(), *shell_prompt_patterns]
self.console.write()
logger.log(
level=log_levels.MODULE_DEBUG,
msg=f"Waiting for login prompt, {prompt_pattern_list}",
msg=f"Waiting for login/shell prompt, {login_or_shell_prompt_patterns}",
)

time.sleep(1)
pattern_index, match, output = self.console.expect(prompt_pattern_list, self._login_timeout)
is_login_prompt_found = pattern_index != -1
is_already_logged_in = pattern_index == 1
if not is_login_prompt_found:
pattern_index, _, output = self.console.expect(login_or_shell_prompt_patterns, self._login_timeout)
is_prompt_found = pattern_index != -1
is_already_logged_in = pattern_index in (1, 2)
if not is_prompt_found:
raise ConnectionResetError("Login prompt not found")
logger.log(
level=log_levels.MODULE_DEBUG,
msg=f"Found {prompt_pattern_list[pattern_index]} pattern, read from console: {output}",
msg=f"Found {login_or_shell_prompt_patterns[pattern_index]} pattern, read from console: {output}",
)
if is_already_logged_in:
return
logger.log(level=log_levels.MODULE_DEBUG, msg="Writing username to prompt")
self.console.write(self._username)

if self._password is not None:
logger.log(level=log_levels.MODULE_DEBUG, msg="Waiting for password prompt")
pattern_index, match, output = self.console.expect([self._password_prompt.encode()], self._login_timeout)
is_password_prompt_found = pattern_index != -1
if not is_password_prompt_found:
logger.log(level=log_levels.MODULE_DEBUG, msg="Password prompt not found, expecting command prompt")
self.console.write(b"\n")
else:
has_password = self._password not in (None, "")
if has_password:
logger.log(level=log_levels.MODULE_DEBUG, msg="Waiting for password or shell prompt")
pattern_index, _, output = self.console.expect(
[self._password_prompt.encode(), *shell_prompt_patterns], self._login_timeout
)

if pattern_index == 0:
logger.log(level=log_levels.MODULE_DEBUG, msg="Writing password to prompt")
self.console.write(self._password)
elif pattern_index in (1, 2):
logger.log(level=log_levels.MODULE_DEBUG, msg="Shell prompt found without password prompt")
else:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Password prompt not found, continuing with forced prompt refresh",
)

logger.log(
level=log_levels.MODULE_DEBUG,
msg="Sending extra newline to recover from CPR and force shell prompt refresh",
)
self.console.write()

logger.log(level=log_levels.MODULE_DEBUG, msg="Waiting for prompt")
pattern_index, match, output = self.console.expect([self._prompt.encode()], self._login_timeout)
is_prompt_found = pattern_index != -1
if not is_prompt_found:
raise ConnectionResetError("Prompt not found after entering credentials")
logger.log(level=log_levels.MODULE_DEBUG, msg="Prompt found")
self._wait_for_shell_prompt_after_login(shell_prompt_patterns)

def _clear_cmdline(self) -> None:
"""Clear commandline and wait for prompt."""
Expand Down Expand Up @@ -296,7 +371,7 @@ def _handle_telnet_broke_error(exception: Exception) -> str:
return

try:
pattern_index, match, output = self.console.expect([self._prompt.encode()], timeout)
_, _, output = self.console.expect(self._get_shell_prompt_patterns(), timeout)
# output = self.console.telnet.read_very_eager()
except self._TELNET_BROKE_ERRORS as e:
# command is already triggered, it might be risky to retry execution of some
Expand Down Expand Up @@ -536,6 +611,7 @@ def execute_command(
if not discard_stdout:
output = output[output.find("\n") + 1 :] # remove command from output
output = output[: output.rfind("\n")] # remove prompt from end of command output
output = self._strip_ansi_sequences(output).replace("\r", "")
if not skip_logging:
logger.log(level=log_levels.OUT, msg=f"output>>\n{output}")
else:
Expand Down
14 changes: 13 additions & 1 deletion mfd_connect/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,17 @@
"""Package for miscellaneous additional utilities."""

from .batch_queue import BatchQueue
from .serial_utils import SerialKeyCode, EFI_SHELL_PROMPT_REGEX, MEV_IMC_SERIAL_BAUDRATE, UNIX_PROMPT_REGEX
from .serial_utils import (
SerialKeyCode,
EFI_SHELL_PROMPT_REGEX,
UNIX_PROMPT_REGEX,
MEV_IMC_SERIAL_BAUDRATE,
ANSI_TERMINAL_ESCAPE_REGEX,
ANSI_TERMINAL_ESCAPE_REGEX_STR,
ANSI_SHELL_PROMPT_FALLBACK_REGEX,
CURSOR_POSITION_REQUEST_REGEX,
FALLBACK_CURSOR_POSITION_RESPONSE,
LOGIN_PROMPT_RECOVERY_RETRIES,
LOGIN_PROMPT_RECOVERY_TIMEOUT,
)
from .ansiterm import Ansiterm
22 changes: 22 additions & 0 deletions mfd_connect/util/serial_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: MIT
"""Serial utils."""

import re
from enum import Enum


Expand All @@ -27,6 +28,27 @@ class SerialKeyCode(Enum):
F12 = "\x1b\x40"


# Shell prompt patterns
EFI_SHELL_PROMPT_REGEX = r"(\>|Shell>) \x1b\[0m\x1b\[37m\x1b\[40m"
UNIX_PROMPT_REGEX = r"[#\$](?:\033\[0m \S*)?\s*$"

# ANSI/VT terminal escape sequence handling
ANSI_TERMINAL_ESCAPE_REGEX = rb"(?:\x1b\[[0-9;?]*[ -/]*[@-~]|\x1b[@-_])"
ANSI_TERMINAL_ESCAPE_REGEX_STR = re.compile(r"(?:\x1b\[[0-9;?]*[ -/]*[@-~]|\x1b[@-_])")
ANSI_SHELL_PROMPT_FALLBACK_REGEX = (
rb"(?:[^\r\n\x1b]|"
+ ANSI_TERMINAL_ESCAPE_REGEX
+ rb")*[#\$](?:[^\r\n\x1b]|"
+ ANSI_TERMINAL_ESCAPE_REGEX
+ rb")*\s*$"
)

# Cursor Position Request (CPR) handling for interactive shells
CURSOR_POSITION_REQUEST_REGEX = re.compile(rb"\x1b\[6n")
FALLBACK_CURSOR_POSITION_RESPONSE = b"\x1b[24;80R"

# Login recovery parameters
LOGIN_PROMPT_RECOVERY_RETRIES = 6
LOGIN_PROMPT_RECOVERY_TIMEOUT = 5

MEV_IMC_SERIAL_BAUDRATE = 115200
Loading
Loading