Skip to content

Commit

Permalink
format_filename replaces invalid bytes (#2553)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidism committed Jul 5, 2023
2 parents 0c8f301 + 43c2e7f commit af1e8d4
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 82 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Expand Up @@ -26,6 +26,8 @@ Unreleased
setting ``default=()``. :issue:`2246, 2292, 2295`
- Make the decorators returned by ``@argument()`` and ``@option()`` reusable when the
``cls`` parameter is used. :issue:`2294`
- Don't fail when writing filenames to streams with strict errors. Replace invalid
bytes with the replacement character (````). :issue:`2395`


Version 8.1.3
Expand Down
12 changes: 3 additions & 9 deletions docs/arguments.rst
Expand Up @@ -124,15 +124,9 @@ File Path Arguments

In the previous example, the files were opened immediately. But what if
we just want the filename? The naïve way is to use the default string
argument type. However, remember that Click is Unicode-based, so the string
will always be a Unicode value. Unfortunately, filenames can be Unicode or
bytes depending on which operating system is being used. As such, the type
is insufficient.

Instead, you should be using the :class:`Path` type, which automatically
handles this ambiguity. Not only will it return either bytes or Unicode
depending on what makes more sense, but it will also be able to do some
basic checks for you such as existence checks.
argument type. The :class:`Path` type has several checks available which raise nice
errors if they fail, such as existence. Filenames in these error messages are formatted
with :func:`format_filename`, so any undecodable bytes will be printed nicely.

Example:

Expand Down
9 changes: 3 additions & 6 deletions src/click/_compat.py
Expand Up @@ -17,10 +17,6 @@
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")


def get_filesystem_encoding() -> str:
return sys.getfilesystemencoding() or sys.getdefaultencoding()


def _make_text_stream(
stream: t.BinaryIO,
encoding: t.Optional[str],
Expand Down Expand Up @@ -380,13 +376,14 @@ def _wrap_io_open(


def open_stream(
filename: str,
filename: "t.Union[str, os.PathLike[str]]",
mode: str = "r",
encoding: t.Optional[str] = None,
errors: t.Optional[str] = "strict",
atomic: bool = False,
) -> t.Tuple[t.IO[t.Any], bool]:
binary = "b" in mode
filename = os.fspath(filename)

# Standard streams first. These are simple because they ignore the
# atomic flag. Use fsdecode to handle Path("-").
Expand Down Expand Up @@ -564,7 +561,7 @@ def _safe_write(s):
else:

def _get_argv_encoding() -> str:
return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()
return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()

def _get_windows_console_stream(
f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
Expand Down
4 changes: 2 additions & 2 deletions src/click/exceptions.py
@@ -1,10 +1,10 @@
import os
import typing as t
from gettext import gettext as _
from gettext import ngettext

from ._compat import get_text_stderr
from .utils import echo
from .utils import format_filename

if t.TYPE_CHECKING:
from .core import Command
Expand Down Expand Up @@ -262,7 +262,7 @@ def __init__(self, filename: str, hint: t.Optional[str] = None) -> None:
hint = _("unknown error")

super().__init__(hint)
self.ui_filename: str = os.fsdecode(filename)
self.ui_filename: str = format_filename(filename)
self.filename = filename

def format_message(self) -> str:
Expand Down
63 changes: 39 additions & 24 deletions src/click/types.py
@@ -1,14 +1,15 @@
import os
import stat
import sys
import typing as t
from datetime import datetime
from gettext import gettext as _
from gettext import ngettext

from ._compat import _get_argv_encoding
from ._compat import get_filesystem_encoding
from ._compat import open_stream
from .exceptions import BadParameter
from .utils import format_filename
from .utils import LazyFile
from .utils import safecall

Expand Down Expand Up @@ -207,7 +208,7 @@ def convert(
try:
value = value.decode(enc)
except UnicodeError:
fs_enc = get_filesystem_encoding()
fs_enc = sys.getfilesystemencoding()
if fs_enc != enc:
try:
value = value.decode(fs_enc)
Expand Down Expand Up @@ -687,22 +688,27 @@ def to_info_dict(self) -> t.Dict[str, t.Any]:
info_dict.update(mode=self.mode, encoding=self.encoding)
return info_dict

def resolve_lazy_flag(self, value: t.Any) -> bool:
def resolve_lazy_flag(self, value: "t.Union[str, os.PathLike[str]]") -> bool:
if self.lazy is not None:
return self.lazy
if value == "-":
if os.fspath(value) == "-":
return False
elif "w" in self.mode:
return True
return False

def convert(
self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"]
) -> t.Any:
try:
if hasattr(value, "read") or hasattr(value, "write"):
return value
self,
value: t.Union[str, "os.PathLike[str]", t.IO[t.Any]],
param: t.Optional["Parameter"],
ctx: t.Optional["Context"],
) -> t.IO[t.Any]:
if _is_file_like(value):
return value

value = t.cast("t.Union[str, os.PathLike[str]]", value)

try:
lazy = self.resolve_lazy_flag(value)

if lazy:
Expand Down Expand Up @@ -732,7 +738,7 @@ def convert(

return f
except OSError as e: # noqa: B014
self.fail(f"'{os.fsdecode(value)}': {e.strerror}", param, ctx)
self.fail(f"'{format_filename(value)}': {e.strerror}", param, ctx)

def shell_complete(
self, ctx: "Context", param: "Parameter", incomplete: str
Expand All @@ -751,6 +757,10 @@ def shell_complete(
return [CompletionItem(incomplete, type="file")]


def _is_file_like(value: t.Any) -> "te.TypeGuard[t.IO[t.Any]]":
return hasattr(value, "read") or hasattr(value, "write")


class Path(ParamType):
"""The ``Path`` type is similar to the :class:`File` type, but
returns the filename instead of an open file. Various checks can be
Expand Down Expand Up @@ -827,20 +837,25 @@ def to_info_dict(self) -> t.Dict[str, t.Any]:
)
return info_dict

def coerce_path_result(self, rv: t.Any) -> t.Any:
if self.type is not None and not isinstance(rv, self.type):
def coerce_path_result(
self, value: "t.Union[str, os.PathLike[str]]"
) -> "t.Union[str, bytes, os.PathLike[str]]":
if self.type is not None and not isinstance(value, self.type):
if self.type is str:
rv = os.fsdecode(rv)
return os.fsdecode(value)
elif self.type is bytes:
rv = os.fsencode(rv)
return os.fsencode(value)
else:
rv = self.type(rv)
return t.cast("os.PathLike[str]", self.type(value))

return rv
return value

def convert(
self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"]
) -> t.Any:
self,
value: "t.Union[str, os.PathLike[str]]",
param: t.Optional["Parameter"],
ctx: t.Optional["Context"],
) -> "t.Union[str, bytes, os.PathLike[str]]":
rv = value

is_dash = self.file_okay and self.allow_dash and rv in (b"-", "-")
Expand All @@ -860,7 +875,7 @@ def convert(
return self.coerce_path_result(rv)
self.fail(
_("{name} {filename!r} does not exist.").format(
name=self.name.title(), filename=os.fsdecode(value)
name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
Expand All @@ -869,15 +884,15 @@ def convert(
if not self.file_okay and stat.S_ISREG(st.st_mode):
self.fail(
_("{name} {filename!r} is a file.").format(
name=self.name.title(), filename=os.fsdecode(value)
name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
)
if not self.dir_okay and stat.S_ISDIR(st.st_mode):
self.fail(
_("{name} '{filename}' is a directory.").format(
name=self.name.title(), filename=os.fsdecode(value)
name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
Expand All @@ -886,7 +901,7 @@ def convert(
if self.readable and not os.access(rv, os.R_OK):
self.fail(
_("{name} {filename!r} is not readable.").format(
name=self.name.title(), filename=os.fsdecode(value)
name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
Expand All @@ -895,7 +910,7 @@ def convert(
if self.writable and not os.access(rv, os.W_OK):
self.fail(
_("{name} {filename!r} is not writable.").format(
name=self.name.title(), filename=os.fsdecode(value)
name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
Expand All @@ -904,7 +919,7 @@ def convert(
if self.executable and not os.access(value, os.X_OK):
self.fail(
_("{name} {filename!r} is not executable.").format(
name=self.name.title(), filename=os.fsdecode(value)
name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
Expand Down
47 changes: 34 additions & 13 deletions src/click/utils.py
Expand Up @@ -11,7 +11,6 @@
from ._compat import _find_binary_writer
from ._compat import auto_wrap_for_ansi
from ._compat import binary_streams
from ._compat import get_filesystem_encoding
from ._compat import open_stream
from ._compat import should_strip_ansi
from ._compat import strip_ansi
Expand Down Expand Up @@ -48,7 +47,7 @@ def make_str(value: t.Any) -> str:
"""Converts a value into a valid string."""
if isinstance(value, bytes):
try:
return value.decode(get_filesystem_encoding())
return value.decode(sys.getfilesystemencoding())
except UnicodeError:
return value.decode("utf-8", "replace")
return str(value)
Expand Down Expand Up @@ -113,21 +112,21 @@ class LazyFile:

def __init__(
self,
filename: str,
filename: t.Union[str, "os.PathLike[str]"],
mode: str = "r",
encoding: t.Optional[str] = None,
errors: t.Optional[str] = "strict",
atomic: bool = False,
):
self.name = filename
self.name: str = os.fspath(filename)
self.mode = mode
self.encoding = encoding
self.errors = errors
self.atomic = atomic
self._f: t.Optional[t.IO[t.Any]]
self.should_close: bool

if filename == "-":
if self.name == "-":
self._f, self.should_close = open_stream(filename, mode, encoding, errors)
else:
if "r" in mode:
Expand All @@ -144,7 +143,7 @@ def __getattr__(self, name: str) -> t.Any:
def __repr__(self) -> str:
if self._f is not None:
return repr(self._f)
return f"<unopened file '{self.name}' {self.mode}>"
return f"<unopened file '{format_filename(self.name)}' {self.mode}>"

def open(self) -> t.IO[t.Any]:
"""Opens the file if it's not yet open. This call might fail with
Expand Down Expand Up @@ -398,13 +397,26 @@ def open_file(


def format_filename(
filename: t.Union[str, bytes, "os.PathLike[t.AnyStr]"], shorten: bool = False
filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]",
shorten: bool = False,
) -> str:
"""Formats a filename for user display. The main purpose of this
function is to ensure that the filename can be displayed at all. This
will decode the filename to unicode if necessary in a way that it will
not fail. Optionally, it can shorten the filename to not include the
full path to the filename.
"""Format a filename as a string for display. Ensures the filename can be
displayed by replacing any invalid bytes or surrogate escapes in the name
with the replacement character ``�``.
Invalid bytes or surrogate escapes will raise an error when written to a
stream with ``errors="strict". This will typically happen with ``stdout``
when the locale is something like ``en_GB.UTF-8``.
Many scenarios *are* safe to write surrogates though, due to PEP 538 and
PEP 540, including:
- Writing to ``stderr``, which uses ``errors="backslashreplace"``.
- The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens
stdout and stderr with ``errors="surrogateescape"``.
- None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``.
- Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``.
Python opens stdout and stderr with ``errors="surrogateescape"``.
:param filename: formats a filename for UI display. This will also convert
the filename into unicode without failing.
Expand All @@ -413,8 +425,17 @@ def format_filename(
"""
if shorten:
filename = os.path.basename(filename)
else:
filename = os.fspath(filename)

if isinstance(filename, bytes):
filename = filename.decode(sys.getfilesystemencoding(), "replace")
else:
filename = filename.encode("utf-8", "surrogateescape").decode(
"utf-8", "replace"
)

return os.fsdecode(filename)
return filename


def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
Expand Down
19 changes: 0 additions & 19 deletions tests/conftest.py
@@ -1,6 +1,3 @@
import os
import tempfile

import pytest

from click.testing import CliRunner
Expand All @@ -9,19 +6,3 @@
@pytest.fixture(scope="function")
def runner(request):
return CliRunner()


def _check_symlinks_supported():
with tempfile.TemporaryDirectory(prefix="click-pytest-") as tempdir:
target = os.path.join(tempdir, "target")
open(target, "w").close()
link = os.path.join(tempdir, "link")

try:
os.symlink(target, link)
return True
except OSError:
return False


symlinks_supported = _check_symlinks_supported()

0 comments on commit af1e8d4

Please sign in to comment.