Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ History
``NotImplementedError``; the mask is accepted (logged only).
* bugfix: echo doubling in ``--pty-exec`` without ``--pty-raw`` (linemode).
* bugfix: missing LICENSE.txt in sdist file.
* bugfix: GMCP, MSDP, and MSSP decoding now uses ``--encoding`` when set,
falling back to latin-1 for non-UTF-8 bytes instead of lossy replacement.
* bugfix: ``NEW_ENVIRON SEND`` with empty payload now correctly
interpreted as "send all" per :rfc:`1572`.
* new: :mod:`telnetlib3.mud` module with encode/decode functions for
GMCP (option 201), MSDP (option 69), and MSSP (option 70) MUD telnet
protocols.
Expand All @@ -29,10 +33,10 @@ History
``DONT``/``WONT`` instead of raising ``ValueError``.
* enhancement: ``NEW_ENVIRON SEND`` and response logging improved --
``SEND (all)`` / ``env send: (empty)`` instead of raw byte dumps.
* bugfix: GMCP, MSDP, and MSSP decoding now uses ``--encoding`` when set,
falling back to latin-1 for non-UTF-8 bytes instead of lossy replacement.
* enhancement: ``telnetlib3-fingerprint`` now probes MSDP and MSSP options
and captures MSSP server status data in session output.
* new: ``--always-will``, ``--always-do``, ``--scan-type``, ``--mssp-wait``,
``--banner-quiet-time``, ``--banner-max-wait`` options for ``telnetlib3-fingerprint``.

2.2.0
* bugfix: workaround for Microsoft Telnet client crash on
Expand Down
144 changes: 129 additions & 15 deletions telnetlib3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,34 @@ async def run_client() -> None:
)
log.debug(config_msg)

always_will: set[bytes] = args["always_will"]
always_do: set[bytes] = args["always_do"]

# Wrap client factory to inject always_will/always_do before negotiation
client_factory: Optional[Callable[..., client_base.BaseClient]] = None
if always_will or always_do:

def _client_factory(**kwargs: Any) -> client_base.BaseClient:
client: TelnetClient
if sys.platform != "win32" and sys.stdin.isatty():
client = TelnetTerminalClient(**kwargs)
else:
client = TelnetClient(**kwargs)
orig_connection_made = client.connection_made

def _patched_connection_made(transport: asyncio.BaseTransport) -> None:
orig_connection_made(transport)
assert client.writer is not None
client.writer.always_will = always_will
client.writer.always_do = always_do

client.connection_made = _patched_connection_made # type: ignore[method-assign]
return client

client_factory = _client_factory

# Build connection kwargs explicitly to avoid pylint false positive
connection_kwargs = {
connection_kwargs: Dict[str, Any] = {
"encoding": args["encoding"],
"tspeed": args["tspeed"],
"shell": args["shell"],
Expand All @@ -514,6 +540,8 @@ async def run_client() -> None:
"connect_timeout": args["connect_timeout"],
"send_environ": args["send_environ"],
}
if client_factory is not None:
connection_kwargs["client_factory"] = client_factory

# connect
_, writer = await open_connection(args["host"], args["port"], **connection_kwargs)
Expand Down Expand Up @@ -565,9 +593,40 @@ def _get_argument_parser() -> argparse.ArgumentParser:
default="TERM,LANG,COLUMNS,LINES,COLORTERM",
help="comma-separated environment variables to send (NEW_ENVIRON)",
)
parser.add_argument(
"--always-will",
action="append",
default=[],
metavar="OPT",
help="always send WILL for this option (name like MXP or number, repeatable)",
)
parser.add_argument(
"--always-do",
action="append",
default=[],
metavar="OPT",
help="always send DO for this option (name like GMCP or number, repeatable)",
)
return parser


def _parse_option_arg(value: str) -> bytes:
"""
Resolve a telnet option name or integer to option bytes.

:param value: Option name (e.g. ``"MXP"``) or decimal byte value (e.g. ``"91"``).
:returns: Single-byte option value.
:raises ValueError: When *value* is not a known name or valid integer.
"""
# local
from .telopt import option_from_name # pylint: disable=import-outside-toplevel

try:
return option_from_name(value)
except KeyError:
return bytes([int(value)])


def _transform_args(args: argparse.Namespace) -> Dict[str, Any]:
return {
"host": args.host,
Expand All @@ -584,12 +643,18 @@ def _transform_args(args: argparse.Namespace) -> Dict[str, Any]:
"connect_minwait": args.connect_minwait,
"connect_timeout": args.connect_timeout,
"send_environ": tuple(v.strip() for v in args.send_environ.split(",") if v.strip()),
"always_will": {_parse_option_arg(v) for v in args.always_will},
"always_do": {_parse_option_arg(v) for v in args.always_do},
}


def main() -> None:
"""Entry point for telnetlib3-client command."""
asyncio.run(run_client())
try:
asyncio.run(run_client())
except OSError as err:
print(f"Error: {err}", file=sys.stderr)
sys.exit(1)


def _get_fingerprint_argument_parser() -> argparse.ArgumentParser:
Expand Down Expand Up @@ -634,13 +699,48 @@ def _get_fingerprint_argument_parser() -> argparse.ArgumentParser:
parser.add_argument(
"--ttype", default="VT100", help="terminal type sent in response to TTYPE requests"
)
parser.add_argument(
"--scan-type",
choices=["quick", "full"],
default="quick",
help="probe depth: 'quick' probes core options only, " "'full' includes legacy options",
)
parser.add_argument(
"--send-env",
action="append",
metavar="KEY=VALUE",
default=[],
help="environment variable to send (repeatable)",
)
parser.add_argument(
"--always-will",
action="append",
default=[],
metavar="OPT",
help="always send WILL for this option (name like MXP or number, repeatable)",
)
parser.add_argument(
"--always-do",
action="append",
default=[],
metavar="OPT",
help="always send DO for this option (name like GMCP or number, repeatable)",
)
parser.add_argument(
"--mssp-wait",
default=5.0,
type=float,
help="max seconds since connect to wait for MSSP data",
)
parser.add_argument(
"--banner-quiet-time",
default=2.0,
type=float,
help="seconds of silence before considering banner complete",
)
parser.add_argument(
"--banner-max-wait", default=8.0, type=float, help="max seconds to wait for banner data"
)
return parser


Expand Down Expand Up @@ -674,8 +774,16 @@ async def run_fingerprint_client() -> None:
silent=args.silent,
set_name=args.set_name,
environ_encoding=args.stream_encoding,
scan_type=args.scan_type,
mssp_wait=args.mssp_wait,
banner_quiet_time=args.banner_quiet_time,
banner_max_wait=args.banner_max_wait,
)

# Parse --always-will/--always-do option names/numbers
fp_always_will = {_parse_option_arg(v) for v in args.always_will}
fp_always_do = {_parse_option_arg(v) for v in args.always_do}

# Parse --send-env KEY=VALUE pairs
extra_env: Dict[str, str] = {}
for item in args.send_env:
Expand Down Expand Up @@ -705,6 +813,8 @@ def patched_connection_made(transport: asyncio.BaseTransport) -> None:
orig_connection_made(transport)
assert client.writer is not None
client.writer.environ_encoding = environ_encoding
client.writer.always_will = fp_always_will
client.writer.always_do = fp_always_do

def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]:
result = orig_send_env(keys)
Expand All @@ -718,18 +828,22 @@ def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]:

waiter_closed: asyncio.Future[None] = asyncio.get_event_loop().create_future()

_, writer = await open_connection(
host=args.host,
port=args.port,
client_factory=fingerprint_client_factory,
shell=shell,
encoding=False,
term=ttype,
connect_minwait=2.0,
connect_maxwait=4.0,
connect_timeout=args.connect_timeout,
waiter_closed=waiter_closed,
)
try:
_, writer = await open_connection(
host=args.host,
port=args.port,
client_factory=fingerprint_client_factory,
shell=shell,
encoding=False,
term=ttype,
connect_minwait=2.0,
connect_maxwait=4.0,
connect_timeout=args.connect_timeout,
waiter_closed=waiter_closed,
)
except OSError as err:
log.error("%s:%d: %s", args.host, args.port, err)
raise

assert writer.protocol is not None
assert isinstance(writer.protocol, client_base.BaseClient)
Expand All @@ -740,7 +854,7 @@ def fingerprint_main() -> None:
"""Entry point for ``telnetlib3-fingerprint`` command."""
try:
asyncio.run(run_fingerprint_client())
except ConnectionError as err:
except OSError as err:
print(f"Error: {err}", file=sys.stderr)
sys.exit(1)

Expand Down
9 changes: 8 additions & 1 deletion telnetlib3/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

# local
from ._types import ShellCallback
from .telopt import theNULL, name_commands
from .telopt import DO, WILL, theNULL, name_commands
from .stream_reader import TelnetReader, TelnetReaderUnicode
from .stream_writer import TelnetWriter, TelnetWriterUnicode

Expand Down Expand Up @@ -272,6 +272,13 @@ def begin_negotiation(self) -> None:
self._check_later = asyncio.get_event_loop().call_soon(self._check_negotiation_timer)
self._tasks.append(self._check_later)

# Send proactive WILL/DO for any "always" options
if self.writer is not None:
for opt in self.writer.always_will:
self.writer.iac(WILL, opt)
for opt in self.writer.always_do:
self.writer.iac(DO, opt)

def encoding(self, outgoing: bool = False, incoming: bool = False) -> Union[str, bool]:
"""
Encoding that should be used for the direction indicated.
Expand Down
3 changes: 2 additions & 1 deletion telnetlib3/fingerprinting.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ProbeResult(TypedDict, total=False):
)

# Maximum files per protocol-fingerprint folder
FINGERPRINT_MAX_FILES = int(os.environ.get("TELNETLIB3_FINGERPRINT_MAX_FILES", "200"))
FINGERPRINT_MAX_FILES = int(os.environ.get("TELNETLIB3_FINGERPRINT_MAX_FILES", "1000"))

# Maximum number of unique fingerprint folders
FINGERPRINT_MAX_FINGERPRINTS = int(
Expand Down Expand Up @@ -292,6 +292,7 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer):
]

ALL_PROBE_OPTIONS = CORE_OPTIONS + MUD_OPTIONS + LEGACY_OPTIONS
QUICK_PROBE_OPTIONS = CORE_OPTIONS + MUD_OPTIONS

# All known options including extended, for display/name lookup only
_ALL_KNOWN_OPTIONS = ALL_PROBE_OPTIONS + EXTENDED_OPTIONS
Expand Down
Loading
Loading