diff --git a/CHANGES.rst b/CHANGES.rst index 14cc09da0..2c812d0b6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -26,6 +26,8 @@ Unreleased setting ``default=()``. :issue:`2246, 2292, 2295` - Make the decorators returned by ``@argument()`` and ``@option()`` reusable when the ``cls`` parameter is used. :issue:`2294` +- Don't fail when writing filenames to streams with strict errors. Replace invalid + bytes with the replacement character (``�``). :issue:`2395` Version 8.1.3 diff --git a/docs/arguments.rst b/docs/arguments.rst index e5765c940..60b53c558 100644 --- a/docs/arguments.rst +++ b/docs/arguments.rst @@ -124,15 +124,9 @@ File Path Arguments In the previous example, the files were opened immediately. But what if we just want the filename? The naïve way is to use the default string -argument type. However, remember that Click is Unicode-based, so the string -will always be a Unicode value. Unfortunately, filenames can be Unicode or -bytes depending on which operating system is being used. As such, the type -is insufficient. - -Instead, you should be using the :class:`Path` type, which automatically -handles this ambiguity. Not only will it return either bytes or Unicode -depending on what makes more sense, but it will also be able to do some -basic checks for you such as existence checks. +argument type. The :class:`Path` type has several checks available which raise nice +errors if they fail, such as existence. Filenames in these error messages are formatted +with :func:`format_filename`, so any undecodable bytes will be printed nicely. Example: diff --git a/src/click/_compat.py b/src/click/_compat.py index 1273ddfdf..fa5a47f00 100644 --- a/src/click/_compat.py +++ b/src/click/_compat.py @@ -17,10 +17,6 @@ _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") -def get_filesystem_encoding() -> str: - return sys.getfilesystemencoding() or sys.getdefaultencoding() - - def _make_text_stream( stream: t.BinaryIO, encoding: t.Optional[str], @@ -380,13 +376,14 @@ def _wrap_io_open( def open_stream( - filename: str, + filename: "t.Union[str, os.PathLike[str]]", mode: str = "r", encoding: t.Optional[str] = None, errors: t.Optional[str] = "strict", atomic: bool = False, ) -> t.Tuple[t.IO[t.Any], bool]: binary = "b" in mode + filename = os.fspath(filename) # Standard streams first. These are simple because they ignore the # atomic flag. Use fsdecode to handle Path("-"). @@ -564,7 +561,7 @@ def _safe_write(s): else: def _get_argv_encoding() -> str: - return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding() + return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding() def _get_windows_console_stream( f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str] diff --git a/src/click/exceptions.py b/src/click/exceptions.py index 76a06ff72..01dbdd0ed 100644 --- a/src/click/exceptions.py +++ b/src/click/exceptions.py @@ -1,10 +1,10 @@ -import os import typing as t from gettext import gettext as _ from gettext import ngettext from ._compat import get_text_stderr from .utils import echo +from .utils import format_filename if t.TYPE_CHECKING: from .core import Command @@ -262,11 +262,11 @@ def __init__(self, filename: str, hint: t.Optional[str] = None) -> None: hint = _("unknown error") super().__init__(hint) - self.ui_filename: str = os.fsdecode(filename) + self.ui_filename: str = format_filename(filename) self.filename = filename def format_message(self) -> str: - return _("Could not open file {filename!r}: {message}").format( + return _("Could not open file '{filename}': {message}").format( filename=self.ui_filename, message=self.message ) diff --git a/src/click/types.py b/src/click/types.py index c684861a2..2b1d1797f 100644 --- a/src/click/types.py +++ b/src/click/types.py @@ -1,14 +1,15 @@ import os import stat +import sys import typing as t from datetime import datetime from gettext import gettext as _ from gettext import ngettext from ._compat import _get_argv_encoding -from ._compat import get_filesystem_encoding from ._compat import open_stream from .exceptions import BadParameter +from .utils import format_filename from .utils import LazyFile from .utils import safecall @@ -207,7 +208,7 @@ def convert( try: value = value.decode(enc) except UnicodeError: - fs_enc = get_filesystem_encoding() + fs_enc = sys.getfilesystemencoding() if fs_enc != enc: try: value = value.decode(fs_enc) @@ -687,22 +688,27 @@ def to_info_dict(self) -> t.Dict[str, t.Any]: info_dict.update(mode=self.mode, encoding=self.encoding) return info_dict - def resolve_lazy_flag(self, value: t.Any) -> bool: + def resolve_lazy_flag(self, value: "t.Union[str, os.PathLike[str]]") -> bool: if self.lazy is not None: return self.lazy - if value == "-": + if os.fspath(value) == "-": return False elif "w" in self.mode: return True return False def convert( - self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"] - ) -> t.Any: - try: - if hasattr(value, "read") or hasattr(value, "write"): - return value + self, + value: t.Union[str, "os.PathLike[str]", t.IO[t.Any]], + param: t.Optional["Parameter"], + ctx: t.Optional["Context"], + ) -> t.IO[t.Any]: + if _is_file_like(value): + return value + + value = t.cast("t.Union[str, os.PathLike[str]]", value) + try: lazy = self.resolve_lazy_flag(value) if lazy: @@ -732,7 +738,7 @@ def convert( return f except OSError as e: # noqa: B014 - self.fail(f"'{os.fsdecode(value)}': {e.strerror}", param, ctx) + self.fail(f"'{format_filename(value)}': {e.strerror}", param, ctx) def shell_complete( self, ctx: "Context", param: "Parameter", incomplete: str @@ -751,6 +757,10 @@ def shell_complete( return [CompletionItem(incomplete, type="file")] +def _is_file_like(value: t.Any) -> "te.TypeGuard[t.IO[t.Any]]": + return hasattr(value, "read") or hasattr(value, "write") + + class Path(ParamType): """The ``Path`` type is similar to the :class:`File` type, but returns the filename instead of an open file. Various checks can be @@ -827,20 +837,25 @@ def to_info_dict(self) -> t.Dict[str, t.Any]: ) return info_dict - def coerce_path_result(self, rv: t.Any) -> t.Any: - if self.type is not None and not isinstance(rv, self.type): + def coerce_path_result( + self, value: "t.Union[str, os.PathLike[str]]" + ) -> "t.Union[str, bytes, os.PathLike[str]]": + if self.type is not None and not isinstance(value, self.type): if self.type is str: - rv = os.fsdecode(rv) + return os.fsdecode(value) elif self.type is bytes: - rv = os.fsencode(rv) + return os.fsencode(value) else: - rv = self.type(rv) + return t.cast("os.PathLike[str]", self.type(value)) - return rv + return value def convert( - self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"] - ) -> t.Any: + self, + value: "t.Union[str, os.PathLike[str]]", + param: t.Optional["Parameter"], + ctx: t.Optional["Context"], + ) -> "t.Union[str, bytes, os.PathLike[str]]": rv = value is_dash = self.file_okay and self.allow_dash and rv in (b"-", "-") @@ -860,7 +875,7 @@ def convert( return self.coerce_path_result(rv) self.fail( _("{name} {filename!r} does not exist.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -869,7 +884,7 @@ def convert( if not self.file_okay and stat.S_ISREG(st.st_mode): self.fail( _("{name} {filename!r} is a file.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -877,7 +892,7 @@ def convert( if not self.dir_okay and stat.S_ISDIR(st.st_mode): self.fail( _("{name} '{filename}' is a directory.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -886,7 +901,7 @@ def convert( if self.readable and not os.access(rv, os.R_OK): self.fail( _("{name} {filename!r} is not readable.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -895,7 +910,7 @@ def convert( if self.writable and not os.access(rv, os.W_OK): self.fail( _("{name} {filename!r} is not writable.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -904,7 +919,7 @@ def convert( if self.executable and not os.access(value, os.X_OK): self.fail( _("{name} {filename!r} is not executable.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, diff --git a/src/click/utils.py b/src/click/utils.py index 6e74ace10..3b5d286b6 100644 --- a/src/click/utils.py +++ b/src/click/utils.py @@ -11,7 +11,6 @@ from ._compat import _find_binary_writer from ._compat import auto_wrap_for_ansi from ._compat import binary_streams -from ._compat import get_filesystem_encoding from ._compat import open_stream from ._compat import should_strip_ansi from ._compat import strip_ansi @@ -48,7 +47,7 @@ def make_str(value: t.Any) -> str: """Converts a value into a valid string.""" if isinstance(value, bytes): try: - return value.decode(get_filesystem_encoding()) + return value.decode(sys.getfilesystemencoding()) except UnicodeError: return value.decode("utf-8", "replace") return str(value) @@ -113,13 +112,13 @@ class LazyFile: def __init__( self, - filename: str, + filename: t.Union[str, "os.PathLike[str]"], mode: str = "r", encoding: t.Optional[str] = None, errors: t.Optional[str] = "strict", atomic: bool = False, ): - self.name = filename + self.name: str = os.fspath(filename) self.mode = mode self.encoding = encoding self.errors = errors @@ -127,7 +126,7 @@ def __init__( self._f: t.Optional[t.IO[t.Any]] self.should_close: bool - if filename == "-": + if self.name == "-": self._f, self.should_close = open_stream(filename, mode, encoding, errors) else: if "r" in mode: @@ -144,7 +143,7 @@ def __getattr__(self, name: str) -> t.Any: def __repr__(self) -> str: if self._f is not None: return repr(self._f) - return f"" + return f"" def open(self) -> t.IO[t.Any]: """Opens the file if it's not yet open. This call might fail with @@ -398,13 +397,26 @@ def open_file( def format_filename( - filename: t.Union[str, bytes, "os.PathLike[t.AnyStr]"], shorten: bool = False + filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]", + shorten: bool = False, ) -> str: - """Formats a filename for user display. The main purpose of this - function is to ensure that the filename can be displayed at all. This - will decode the filename to unicode if necessary in a way that it will - not fail. Optionally, it can shorten the filename to not include the - full path to the filename. + """Format a filename as a string for display. Ensures the filename can be + displayed by replacing any invalid bytes or surrogate escapes in the name + with the replacement character ``�``. + + Invalid bytes or surrogate escapes will raise an error when written to a + stream with ``errors="strict". This will typically happen with ``stdout`` + when the locale is something like ``en_GB.UTF-8``. + + Many scenarios *are* safe to write surrogates though, due to PEP 538 and + PEP 540, including: + + - Writing to ``stderr``, which uses ``errors="backslashreplace"``. + - The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens + stdout and stderr with ``errors="surrogateescape"``. + - None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``. + - Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``. + Python opens stdout and stderr with ``errors="surrogateescape"``. :param filename: formats a filename for UI display. This will also convert the filename into unicode without failing. @@ -413,8 +425,17 @@ def format_filename( """ if shorten: filename = os.path.basename(filename) + else: + filename = os.fspath(filename) + + if isinstance(filename, bytes): + filename = filename.decode(sys.getfilesystemencoding(), "replace") + else: + filename = filename.encode("utf-8", "surrogateescape").decode( + "utf-8", "replace" + ) - return os.fsdecode(filename) + return filename def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str: diff --git a/tests/conftest.py b/tests/conftest.py index a71db7d58..9440804fe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,3 @@ -import os -import tempfile - import pytest from click.testing import CliRunner @@ -9,19 +6,3 @@ @pytest.fixture(scope="function") def runner(request): return CliRunner() - - -def _check_symlinks_supported(): - with tempfile.TemporaryDirectory(prefix="click-pytest-") as tempdir: - target = os.path.join(tempdir, "target") - open(target, "w").close() - link = os.path.join(tempdir, "link") - - try: - os.symlink(target, link) - return True - except OSError: - return False - - -symlinks_supported = _check_symlinks_supported() diff --git a/tests/test_types.py b/tests/test_types.py index df44d8ac9..0a2508882 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,10 +1,11 @@ import os.path import pathlib +import tempfile import pytest -from conftest import symlinks_supported import click +from click import FileError @pytest.mark.parametrize( @@ -104,12 +105,25 @@ def test_path_type(runner, cls, expect): assert result.return_value == expect +def _symlinks_supported(): + with tempfile.TemporaryDirectory(prefix="click-pytest-") as tempdir: + target = os.path.join(tempdir, "target") + open(target, "w").close() + link = os.path.join(tempdir, "link") + + try: + os.symlink(target, link) + return True + except OSError: + return False + + @pytest.mark.skipif( - not symlinks_supported, reason="The current OS or FS doesn't support symlinks." + not _symlinks_supported(), reason="The current OS or FS doesn't support symlinks." ) def test_path_resolve_symlink(tmp_path, runner): test_file = tmp_path / "file" - test_file_str = os.fsdecode(test_file) + test_file_str = os.fspath(test_file) test_file.write_text("") path_type = click.Path(resolve_path=True) @@ -121,10 +135,100 @@ def test_path_resolve_symlink(tmp_path, runner): abs_link = test_dir / "abs" abs_link.symlink_to(test_file) - abs_rv = path_type.convert(os.fsdecode(abs_link), param, ctx) + abs_rv = path_type.convert(os.fspath(abs_link), param, ctx) assert abs_rv == test_file_str rel_link = test_dir / "rel" rel_link.symlink_to(pathlib.Path("..") / "file") - rel_rv = path_type.convert(os.fsdecode(rel_link), param, ctx) + rel_rv = path_type.convert(os.fspath(rel_link), param, ctx) assert rel_rv == test_file_str + + +def _non_utf8_filenames_supported(): + with tempfile.TemporaryDirectory(prefix="click-pytest-") as tempdir: + try: + f = open(os.path.join(tempdir, "\udcff"), "w") + except OSError: + return False + + f.close() + return True + + +@pytest.mark.skipif( + not _non_utf8_filenames_supported(), + reason="The current OS or FS doesn't support non-UTF-8 filenames.", +) +def test_path_surrogates(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + type = click.Path(exists=True) + path = pathlib.Path("\udcff") + + with pytest.raises(click.BadParameter, match="'�' does not exist"): + type.convert(path, None, None) + + type = click.Path(file_okay=False) + path.touch() + + with pytest.raises(click.BadParameter, match="'�' is a file"): + type.convert(path, None, None) + + path.unlink() + type = click.Path(dir_okay=False) + path.mkdir() + + with pytest.raises(click.BadParameter, match="'�' is a directory"): + type.convert(path, None, None) + + path.rmdir() + + def no_access(*args, **kwargs): + """Test environments may be running as root, so we have to fake the result of + the access tests that use os.access + """ + p = args[0] + assert p == path, f"unexpected os.access call on file not under test: {p!r}" + return False + + path.touch() + type = click.Path(readable=True) + + with pytest.raises(click.BadParameter, match="'�' is not readable"): + with monkeypatch.context() as m: + m.setattr(os, "access", no_access) + type.convert(path, None, None) + + type = click.Path(readable=False, writable=True) + + with pytest.raises(click.BadParameter, match="'�' is not writable"): + with monkeypatch.context() as m: + m.setattr(os, "access", no_access) + type.convert(path, None, None) + + type = click.Path(readable=False, executable=True) + + with pytest.raises(click.BadParameter, match="'�' is not executable"): + with monkeypatch.context() as m: + m.setattr(os, "access", no_access) + type.convert(path, None, None) + + path.unlink() + + +@pytest.mark.parametrize( + "type", + [ + click.File(mode="r"), + click.File(mode="r", lazy=True), + ], +) +def test_file_surrogates(type, tmp_path): + path = tmp_path / "\udcff" + + with pytest.raises(click.BadParameter, match="�': No such file or directory"): + type.convert(path, None, None) + + +def test_file_error_surrogates(): + message = FileError(filename="\udcff").format_message() + assert message == "Could not open file '�': unknown error" diff --git a/tests/test_utils.py b/tests/test_utils.py index 02a6101ff..63668154b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -98,10 +98,7 @@ def test_filename_formatting(): assert click.format_filename(b"/x/foo.txt") == "/x/foo.txt" assert click.format_filename("/x/foo.txt") == "/x/foo.txt" assert click.format_filename("/x/foo.txt", shorten=True) == "foo.txt" - - # filesystem encoding on windows permits this. - if not WIN: - assert click.format_filename(b"/x/foo\xff.txt", shorten=True) == "foo\udcff.txt" + assert click.format_filename(b"/x/\xff.txt", shorten=True) == "�.txt" def test_prompts(runner):