From 1d9d00356e76c42731d01011308667bbd610a665 Mon Sep 17 00:00:00 2001 From: Charles Zablit Date: Fri, 31 Oct 2025 19:24:45 +0000 Subject: [PATCH] [update-checkout] refactor to use pathlib.Path instead of os.path --- .../tests/test_locked_repository.py | 93 ++++++++++--------- .../update_checkout/cli_arguments.py | 4 +- .../update_checkout/git_command.py | 7 +- .../update_checkout/runner_arguments.py | 3 +- .../update_checkout/update_checkout.py | 74 ++++++++------- 5 files changed, 99 insertions(+), 82 deletions(-) diff --git a/utils/update_checkout/tests/test_locked_repository.py b/utils/update_checkout/tests/test_locked_repository.py index 80773d2703ef2..c01c03be77313 100644 --- a/utils/update_checkout/tests/test_locked_repository.py +++ b/utils/update_checkout/tests/test_locked_repository.py @@ -1,10 +1,13 @@ +from pathlib import Path import unittest from unittest.mock import patch from update_checkout.update_checkout import UpdateArguments, _is_any_repository_locked +FAKE_PATH = Path("/fake_path") -def _update_arguments_with_fake_path(repo_name: str, path: str) -> UpdateArguments: + +def _update_arguments_with_fake_path(repo_name: str, path: Path) -> UpdateArguments: return UpdateArguments( repo_name=repo_name, source_root=path, @@ -23,87 +26,93 @@ def _update_arguments_with_fake_path(repo_name: str, path: str) -> UpdateArgumen class TestIsAnyRepositoryLocked(unittest.TestCase): - @patch("os.path.exists") - @patch("os.path.isdir") - @patch("os.listdir") - def test_repository_with_lock_file(self, mock_listdir, mock_isdir, mock_exists): + @patch("pathlib.Path.exists", autospec=True) + @patch("pathlib.Path.is_dir", autospec=True) + @patch("pathlib.Path.iterdir", autospec=True) + def test_repository_with_lock_file(self, mock_iterdir, mock_is_dir, mock_exists): pool_args = [ - _update_arguments_with_fake_path("repo1", "/fake_path"), - _update_arguments_with_fake_path("repo2", "/fake_path"), + _update_arguments_with_fake_path("repo1", FAKE_PATH), + _update_arguments_with_fake_path("repo2", FAKE_PATH), ] - def listdir_side_effect(path): - if "repo1" in path: - return ["index.lock", "config"] - elif "repo2" in path: - return ["HEAD", "config"] + def iterdir_side_effect(path: Path): + if "repo1" in path.as_posix(): + return [path.joinpath("index.lock"), path.joinpath("config")] + elif "repo2" in path.as_posix(): + return [path.joinpath("HEAD"), path.joinpath("config")] return [] mock_exists.return_value = True - mock_isdir.return_value = True - mock_listdir.side_effect = listdir_side_effect + mock_is_dir.return_value = True + mock_iterdir.side_effect = iterdir_side_effect result = _is_any_repository_locked(pool_args) self.assertEqual(result, {"repo1"}) - @patch("os.path.exists") - @patch("os.path.isdir") - @patch("os.listdir") - def test_repository_without_git_dir(self, mock_listdir, mock_isdir, mock_exists): + @patch("pathlib.Path.exists") + @patch("pathlib.Path.is_dir") + @patch("pathlib.Path.iterdir") + def test_repository_without_git_dir(self, mock_iterdir, mock_is_dir, mock_exists): pool_args = [ - _update_arguments_with_fake_path("repo1", "/fake_path"), + _update_arguments_with_fake_path("repo1", FAKE_PATH), ] mock_exists.return_value = False - mock_isdir.return_value = False - mock_listdir.return_value = [] + mock_is_dir.return_value = False + mock_iterdir.return_value = [] result = _is_any_repository_locked(pool_args) self.assertEqual(result, set()) - @patch("os.path.exists") - @patch("os.path.isdir") - @patch("os.listdir") - def test_repository_with_git_file(self, mock_listdir, mock_isdir, mock_exists): + @patch("pathlib.Path.exists") + @patch("pathlib.Path.is_dir") + @patch("pathlib.Path.iterdir") + def test_repository_with_git_file(self, mock_iterdir, mock_is_dir, mock_exists): pool_args = [ - _update_arguments_with_fake_path("repo1", "/fake_path"), + _update_arguments_with_fake_path("repo1", FAKE_PATH), ] mock_exists.return_value = True - mock_isdir.return_value = False - mock_listdir.return_value = [] + mock_is_dir.return_value = False + mock_iterdir.return_value = [] result = _is_any_repository_locked(pool_args) self.assertEqual(result, set()) - @patch("os.path.exists") - @patch("os.path.isdir") - @patch("os.listdir") + @patch("pathlib.Path.exists") + @patch("pathlib.Path.is_dir") + @patch("pathlib.Path.iterdir") def test_repository_with_multiple_lock_files( - self, mock_listdir, mock_isdir, mock_exists + self, mock_iterdir, mock_is_dir, mock_exists ): pool_args = [ - _update_arguments_with_fake_path("repo1", "/fake_path"), + _update_arguments_with_fake_path("repo1", FAKE_PATH), ] mock_exists.return_value = True - mock_isdir.return_value = True - mock_listdir.return_value = ["index.lock", "merge.lock", "HEAD"] + mock_is_dir.return_value = True + mock_iterdir.return_value = [ + FAKE_PATH.joinpath(x) for x in ("index.lock", "merge.lock", "HEAD") + ] result = _is_any_repository_locked(pool_args) self.assertEqual(result, {"repo1"}) - @patch("os.path.exists") - @patch("os.path.isdir") - @patch("os.listdir") - def test_repository_with_no_lock_files(self, mock_listdir, mock_isdir, mock_exists): + @patch("pathlib.Path.exists") + @patch("pathlib.Path.is_dir") + @patch("pathlib.Path.iterdir") + def test_repository_with_no_lock_files( + self, mock_iterdir, mock_is_dir, mock_exists + ): pool_args = [ - _update_arguments_with_fake_path("repo1", "/fake_path"), + _update_arguments_with_fake_path("repo1", FAKE_PATH), ] mock_exists.return_value = True - mock_isdir.return_value = True - mock_listdir.return_value = ["HEAD", "config", "logs"] + mock_is_dir.return_value = True + mock_iterdir.return_value = [ + FAKE_PATH.joinpath(x) for x in ("HEAD", "config", "logs") + ] result = _is_any_repository_locked(pool_args) self.assertEqual(result, set()) diff --git a/utils/update_checkout/update_checkout/cli_arguments.py b/utils/update_checkout/update_checkout/cli_arguments.py index cde61e8c56f2d..00fcff4592736 100644 --- a/utils/update_checkout/update_checkout/cli_arguments.py +++ b/utils/update_checkout/update_checkout/cli_arguments.py @@ -1,4 +1,5 @@ import argparse +from pathlib import Path from typing import List, Optional from build_swift.build_swift.constants import SWIFT_SOURCE_ROOT @@ -22,7 +23,7 @@ class CliArguments(argparse.Namespace): tag: Optional[str] match_timestamp: bool n_processes: int - source_root: str + source_root: Path use_submodules: bool verbose: bool @@ -139,6 +140,7 @@ def parse_args() -> "CliArguments": "--source-root", help="The root directory to checkout repositories", default=SWIFT_SOURCE_ROOT, + type=Path, ) parser.add_argument( "--use-submodules", diff --git a/utils/update_checkout/update_checkout/git_command.py b/utils/update_checkout/update_checkout/git_command.py index ede7ec5f08ef6..274b08ee530e2 100644 --- a/utils/update_checkout/update_checkout/git_command.py +++ b/utils/update_checkout/update_checkout/git_command.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import shlex import subprocess import sys @@ -44,7 +45,7 @@ def __str__(self): class Git: @staticmethod def run( - repo_path: str, + repo_path: Path, args: List[str], echo: bool = False, env: Optional[Dict[str, Any]] = None, @@ -77,9 +78,7 @@ def run( f"command `{command}` terminated with a non-zero exit " f"status {str(e.returncode)}, aborting" ) - raise GitException( - e.returncode, command, os.path.basename(repo_path), output - ) + raise GitException(e.returncode, command, repo_path.name, output) except OSError as e: if fatal: sys.exit( diff --git a/utils/update_checkout/update_checkout/runner_arguments.py b/utils/update_checkout/update_checkout/runner_arguments.py index f55c75c8c6f47..5aa65389bb465 100644 --- a/utils/update_checkout/update_checkout/runner_arguments.py +++ b/utils/update_checkout/update_checkout/runner_arguments.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from pathlib import Path from typing import Any, Dict, List, Optional from .cli_arguments import CliArguments @@ -14,7 +15,7 @@ class RunnerArguments: @dataclass class UpdateArguments(RunnerArguments): - source_root: str + source_root: Path config: Dict[str, Any] scheme_map: Any tag: Optional[str] diff --git a/utils/update_checkout/update_checkout/update_checkout.py b/utils/update_checkout/update_checkout/update_checkout.py index f2127a9a1ed63..c1ec0dc8efe5f 100755 --- a/utils/update_checkout/update_checkout/update_checkout.py +++ b/utils/update_checkout/update_checkout/update_checkout.py @@ -10,6 +10,7 @@ import json import os +from pathlib import Path import platform import re import sys @@ -23,8 +24,8 @@ from .parallel_runner import ParallelRunner -SCRIPT_FILE = os.path.abspath(__file__) -SCRIPT_DIR = os.path.dirname(SCRIPT_FILE) +SCRIPT_FILE = Path(__file__).absolute() +SCRIPT_DIR = SCRIPT_FILE.parent class SkippedReason: @@ -41,13 +42,13 @@ def print_skipped_repositories(skipped_reasons: List["SkippedReason"], step: str print(f" '{reason.repo_name}' - {reason.reason}") -def confirm_tag_in_repo(repo_path: str, tag: str, repo_name: str) -> Optional[str]: +def confirm_tag_in_repo(repo_path: Path, tag: str, repo_name: str) -> Optional[str]: """Confirm that a given tag exists in a git repository. This function assumes that the repository is already a current working directory before it's called. Args: - repo_path (str): path to the repository + repo_path (Path): path to the repository tag (str): tag to look up in the repository repo_name (str): name the repository for the look up, used for logging @@ -72,7 +73,7 @@ def confirm_tag_in_repo(repo_path: str, tag: str, repo_name: str) -> Optional[st def find_rev_by_timestamp( - repo_path: str, timestamp: str, repo_name: str, refspec: str + repo_path: Path, timestamp: str, repo_name: str, refspec: str ) -> str: refspec_exists = True try: @@ -90,7 +91,7 @@ def find_rev_by_timestamp( def get_branch_for_repo( - repo_path: str, + repo_path: Path, config: Dict[str, Any], repo_name: str, scheme_name: str, @@ -101,7 +102,7 @@ def get_branch_for_repo( return a branch found in the config for this repository name. Args: - repo_path (str): path to the repository + repo_path (Path): path to the repository config (Dict[str, Any]): deserialized `update-checkout-config.json` repo_name (str): name of the repository for checking out the branch scheme_name (str): name of the scheme to look up in the config @@ -148,14 +149,14 @@ def update_single_repository(pool_args: UpdateArguments): verbose = pool_args.verbose repo_name = pool_args.repo_name - repo_path = os.path.join(pool_args.source_root, repo_name) - if not os.path.isdir(repo_path) or os.path.islink(repo_path): + repo_path = pool_args.source_root.joinpath(repo_name) + if not repo_path.is_dir() or repo_path.is_symlink(): return try: - prefix = "[{0}] ".format(os.path.basename(repo_path)).ljust(40) + prefix = f"[{repo_path.name}] ".ljust(40) if verbose: - print(prefix + "Updating '" + repo_path + "'") + print(f"{prefix}Updating '{repo_path}'") cross_repo = False checkout_target = None @@ -305,13 +306,13 @@ def run_for_repo_and_each_submodule_rec(args: List[str]): raise -def get_timestamp_to_match(match_timestamp: bool, source_root: str): +def get_timestamp_to_match(match_timestamp: bool, source_root: Path): """Computes a timestamp of the last commit on the current branch in the `swift` repository. Args: match_timestamp (bool): value of `--match-timestamp` to check. - source_root (str): directory that contains sources of the Swift project. + source_root (Path): directory that contains sources of the Swift project. Returns: str | None: a timestamp of the last commit of `swift` repository if @@ -320,7 +321,7 @@ def get_timestamp_to_match(match_timestamp: bool, source_root: str): """ if not match_timestamp: return None - swift_repo_path = os.path.join(source_root, "swift") + swift_repo_path = source_root.joinpath("swift") output, _, _ = Git.run(swift_repo_path, ["log", "-1", "--format=%cI"], fatal=True) return output @@ -367,11 +368,11 @@ def _is_any_repository_locked(pool_args: List[UpdateArguments]) -> Set[str]: repos = [(x.source_root, x.repo_name) for x in pool_args] locked_repositories = set() for source_root, repo_name in repos: - dot_git_path = os.path.join(source_root, repo_name, ".git") - if not os.path.exists(dot_git_path) or not os.path.isdir(dot_git_path): + dot_git_path = source_root.joinpath(repo_name, ".git") + if not dot_git_path.exists() or not dot_git_path.is_dir(): continue - for file in os.listdir(dot_git_path): - if file.endswith(".lock"): + for file in dot_git_path.iterdir(): + if file.suffix == ".lock": locked_repositories.add(repo_name) return locked_repositories @@ -411,7 +412,7 @@ def update_all_repositories( # If the repository is not listed in the branch-scheme, skip it. if scheme_map and repo_name not in scheme_map: # If the repository exists locally, notify we are skipping it. - if os.path.isdir(os.path.join(args.source_root, repo_name)): + if args.source_root.joinpath(repo_name).is_dir(): skipped_repositories.append( SkippedReason( repo_name, @@ -498,12 +499,19 @@ def obtain_additional_swift_sources(pool_args: AdditionalSwiftSourcesArguments): echo=verbose, ) - repo_path = os.path.join(args.source_root, repo_name) + repo_path = args.source_root.joinpath(repo_name) if pool_args.scheme_name: - src_path = os.path.join(repo_path, ".git") + src_path = repo_path.joinpath(".git") Git.run( args.source_root, - ["--git-dir", src_path, "--work-tree", repo_path, "checkout", repo_branch], + [ + "--git-dir", + str(src_path), + "--work-tree", + str(repo_path), + "checkout", + repo_branch, + ], env=env, ) Git.run(repo_path, ["submodule", "update", "--recursive"], env=env) @@ -518,7 +526,7 @@ def obtain_all_additional_swift_sources( skipped_repositories = [] pool_args = [] for repo_name, repo_info in config["repos"].items(): - repo_path = os.path.join(args.source_root, repo_name) + repo_path = args.source_root.joinpath(repo_name) if repo_name in skip_repository_list: skipped_repositories.append(SkippedReason(repo_name, "requested by user")) continue @@ -535,7 +543,7 @@ def obtain_all_additional_swift_sources( break else: - repo_exists = os.path.isdir(os.path.join(repo_path, ".git")) + repo_exists = repo_path.joinpath(".git").is_dir() if repo_exists: skipped_repositories.append( @@ -631,8 +639,8 @@ def dump_repo_hashes( def repo_hashes(args: CliArguments, config: Dict[str, Any]) -> Dict[str, str]: repos = {} for repo_name, _ in sorted(config["repos"].items(), key=lambda x: x[0]): - repo_path = os.path.join(args.source_root, repo_name) - if os.path.exists(repo_path): + repo_path = args.source_root.joinpath(repo_name) + if repo_path.exists(): h, _, _ = Git.run(repo_path, ["rev-parse", "HEAD"], fatal=True) else: h = "skip" @@ -713,7 +721,7 @@ def validate_config(config: Dict[str, Any]): seen[alias] = scheme_name -def full_target_name(repo_path: str, repository: str, target: str) -> str: +def full_target_name(repo_path: Path, repository: str, target: str) -> str: tag, _, _ = Git.run(repo_path, ["tag", "-l", target], fatal=True) if tag == target: return tag @@ -781,10 +789,8 @@ def main() -> int: # Set the default config path if none are specified if not args.configs: - default_path = os.path.join( - SCRIPT_DIR, os.pardir, "update-checkout-config.json" - ) - args.configs.append(default_path) + default_path = SCRIPT_DIR.parent.joinpath("update-checkout-config.json") + args.configs.append(str(default_path)) config: Dict[str, Any] = {} for config_path in args.configs: with open(config_path) as f: @@ -822,8 +828,8 @@ def main() -> int: ) SkippedReason.print_skipped_repositories(skipped_repositories, "clone") - swift_repo_path = os.path.join(args.source_root, "swift") - if "swift" not in skip_repo_list and os.path.exists(swift_repo_path): + swift_repo_path = args.source_root.joinpath("swift") + if "swift" not in skip_repo_list and swift_repo_path.exists(): # Check if `swift` repo itself needs to switch to a cross-repo branch. branch_name, cross_repo = get_branch_for_repo( swift_repo_path, config, "swift", scheme_name, scheme_map, cross_repos_pr @@ -851,7 +857,7 @@ def main() -> int: return 0 # Quick check whether somebody is calling update in an empty directory - directory_contents = os.listdir(args.source_root) + directory_contents = args.source_root.iterdir() if not ( "cmark" in directory_contents or "llvm" in directory_contents