diff --git a/platform.py b/platform.py index 2ffe735a2..123e147c9 100644 --- a/platform.py +++ b/platform.py @@ -34,7 +34,8 @@ import sys import shutil import logging -from typing import Optional, Dict, List, Any +from pathlib import Path +from typing import Optional, Dict, List, Any, Union from platformio.compat import IS_WINDOWS from platformio.public import PlatformBase, to_unix_path @@ -95,8 +96,8 @@ raise SystemExit(1) # Set IDF_TOOLS_PATH to Pio core_dir -PROJECT_CORE_DIR=ProjectConfig.get_instance().get("platformio", "core_dir") -IDF_TOOLS_PATH=os.path.join(PROJECT_CORE_DIR) +PROJECT_CORE_DIR = ProjectConfig.get_instance().get("platformio", "core_dir") +IDF_TOOLS_PATH = PROJECT_CORE_DIR os.environ["IDF_TOOLS_PATH"] = IDF_TOOLS_PATH os.environ['IDF_PATH'] = "" @@ -130,51 +131,62 @@ def wrapper(*args, **kwargs): @safe_file_operation -def safe_remove_file(path: str) -> bool: - """Safely remove a file with error handling.""" - if os.path.exists(path) and os.path.isfile(path): - os.remove(path) +def safe_remove_file(path: Union[str, Path]) -> bool: + """Safely remove a file with error handling using pathlib.""" + path = Path(path) + if path.is_file() or path.is_symlink(): + path.unlink() logger.debug(f"File removed: {path}") return True @safe_file_operation -def safe_remove_directory(path: str) -> bool: - """Safely remove directories with error handling.""" - if os.path.exists(path) and os.path.isdir(path): +def safe_remove_directory(path: Union[str, Path]) -> bool: + """Safely remove directories with error handling using pathlib.""" + path = Path(path) + if not path.exists(): + return True + if path.is_symlink(): + path.unlink() + elif path.is_dir(): shutil.rmtree(path) logger.debug(f"Directory removed: {path}") return True @safe_file_operation -def safe_remove_directory_pattern(base_path: str, pattern: str) -> bool: - """Safely remove directories matching a pattern with error handling.""" - if not os.path.exists(base_path): +def safe_remove_directory_pattern(base_path: Union[str, Path], pattern: str) -> bool: + """Safely remove directories matching a pattern with error handling using pathlib.""" + base_path = Path(base_path) + if not base_path.exists(): return True # Find all directories matching the pattern in the base directory - for item in os.listdir(base_path): - item_path = os.path.join(base_path, item) - if os.path.isdir(item_path) and fnmatch.fnmatch(item, pattern): - shutil.rmtree(item_path) - logger.debug(f"Directory removed: {item_path}") + for item in base_path.rglob("*"): + if fnmatch.fnmatch(item.name, pattern): + if item.is_symlink(): + item.unlink() + elif item.is_dir(): + shutil.rmtree(item) + logger.debug(f"Directory removed: {item}") return True @safe_file_operation -def safe_copy_file(src: str, dst: str) -> bool: - """Safely copy files with error handling.""" - os.makedirs(os.path.dirname(dst), exist_ok=True) - shutil.copyfile(src, dst) +def safe_copy_file(src: Union[str, Path], dst: Union[str, Path]) -> bool: + """Safely copy files with error handling using pathlib.""" + src, dst = Path(src), Path(dst) + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dst) logger.debug(f"File copied: {src} -> {dst}") return True @safe_file_operation -def safe_copy_directory(src: str, dst: str) -> bool: - """Safely copy directories with error handling.""" - os.makedirs(os.path.dirname(dst), exist_ok=True) - shutil.copytree(src, dst, dirs_exist_ok=True) +def safe_copy_directory(src: Union[str, Path], dst: Union[str, Path]) -> bool: + """Safely copy directories with error handling using pathlib.""" + src, dst = Path(src), Path(dst) + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copytree(src, dst, dirs_exist_ok=True, copy_function=shutil.copy2, symlinks=True) logger.debug(f"Directory copied: {src} -> {dst}") return True @@ -190,11 +202,11 @@ def __init__(self, *args, **kwargs): self._mcu_config_cache = {} @property - def packages_dir(self) -> str: + def packages_dir(self) -> Path: """Get cached packages directory path.""" if self._packages_dir is None: config = ProjectConfig.get_instance() - self._packages_dir = config.get("platformio", "packages_dir") + self._packages_dir = Path(config.get("platformio", "packages_dir")) return self._packages_dir def _check_tl_install_version(self) -> bool: @@ -213,10 +225,10 @@ def _check_tl_install_version(self) -> bool: return True # Check if tool is already installed - tl_install_path = os.path.join(self.packages_dir, tl_install_name) - package_json_path = os.path.join(tl_install_path, "package.json") + tl_install_path = self.packages_dir / tl_install_name + package_json_path = tl_install_path / "package.json" - if not os.path.exists(package_json_path): + if not package_json_path.exists(): logger.info(f"{tl_install_name} not installed, installing version {required_version}") return self._install_tl_install(required_version) @@ -300,16 +312,16 @@ def _install_tl_install(self, version: str) -> bool: Returns: bool: True if installation successful, False otherwise """ - tl_install_path = os.path.join(self.packages_dir, tl_install_name) - old_tl_install_path = os.path.join(self.packages_dir, "tl-install") + tl_install_path = Path(self.packages_dir) / tl_install_name + old_tl_install_path = Path(self.packages_dir) / "tl-install" try: - old_tl_install_exists = os.path.exists(old_tl_install_path) + old_tl_install_exists = old_tl_install_path.exists() if old_tl_install_exists: # remove outdated tl-install safe_remove_directory(old_tl_install_path) - if os.path.exists(tl_install_path): + if tl_install_path.exists(): logger.info(f"Removing old {tl_install_name} installation") safe_remove_directory(tl_install_path) @@ -318,10 +330,10 @@ def _install_tl_install(self, version: str) -> bool: self.packages[tl_install_name]["version"] = version pm.install(version) # Ensure backward compatibility by removing pio install status indicator - tl_piopm_path = os.path.join(tl_install_path, ".piopm") + tl_piopm_path = tl_install_path / ".piopm" safe_remove_file(tl_piopm_path) - if os.path.exists(os.path.join(tl_install_path, "package.json")): + if (tl_install_path / "package.json").exists(): logger.info(f"{tl_install_name} successfully installed and verified") self.packages[tl_install_name]["optional"] = True @@ -349,40 +361,37 @@ def _cleanup_versioned_tool_directories(self, tool_name: str) -> None: Args: tool_name: Name of the tool to clean up """ - if not os.path.exists(self.packages_dir) or not os.path.isdir(self.packages_dir): + packages_path = Path(self.packages_dir) + if not packages_path.exists() or not packages_path.is_dir(): return try: # Remove directories with '@' in their name (e.g., tool-name@version, tool-name@src) - safe_remove_directory_pattern(self.packages_dir, f"{tool_name}@*") + safe_remove_directory_pattern(packages_path, f"{tool_name}@*") # Remove directories with version suffixes (e.g., tool-name.12345) - safe_remove_directory_pattern(self.packages_dir, f"{tool_name}.*") + safe_remove_directory_pattern(packages_path, f"{tool_name}.*") # Also check for any directory that starts with tool_name and contains '@' - for item in os.listdir(self.packages_dir): - if item.startswith(tool_name) and '@' in item: - item_path = os.path.join(self.packages_dir, item) - if os.path.isdir(item_path): - safe_remove_directory(item_path) - logger.debug(f"Removed versioned directory: {item_path}") + for item in packages_path.iterdir(): + if item.name.startswith(tool_name) and '@' in item.name and item.is_dir(): + safe_remove_directory(item) + logger.debug(f"Removed versioned directory: {item}") - except OSError as e: - logger.error(f"Error cleaning up versioned directories for {tool_name}: {e}") + except OSError: + logger.exception(f"Error cleaning up versioned directories for {tool_name}") def _get_tool_paths(self, tool_name: str) -> Dict[str, str]: """Get centralized path calculation for tools with caching.""" if tool_name not in self._tools_cache: - tool_path = os.path.join(self.packages_dir, tool_name) + tool_path = Path(self.packages_dir) / tool_name self._tools_cache[tool_name] = { - 'tool_path': tool_path, - 'package_path': os.path.join(tool_path, "package.json"), - 'tools_json_path': os.path.join(tool_path, "tools.json"), - 'piopm_path': os.path.join(tool_path, ".piopm"), - 'idf_tools_path': os.path.join( - self.packages_dir, tl_install_name, "tools", "idf_tools.py" - ) + 'tool_path': str(tool_path), + 'package_path': str(tool_path / "package.json"), + 'tools_json_path': str(tool_path / "tools.json"), + 'piopm_path': str(tool_path / ".piopm"), + 'idf_tools_path': str(Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py") } return self._tools_cache[tool_name] @@ -390,10 +399,10 @@ def _check_tool_status(self, tool_name: str) -> Dict[str, bool]: """Check the installation status of a tool.""" paths = self._get_tool_paths(tool_name) return { - 'has_idf_tools': os.path.exists(paths['idf_tools_path']), - 'has_tools_json': os.path.exists(paths['tools_json_path']), - 'has_piopm': os.path.exists(paths['piopm_path']), - 'tool_exists': os.path.exists(paths['tool_path']) + 'has_idf_tools': Path(paths['idf_tools_path']).exists(), + 'has_tools_json': Path(paths['tools_json_path']).exists(), + 'has_piopm': Path(paths['piopm_path']).exists(), + 'tool_exists': Path(paths['tool_path']).exists() } def _run_idf_tools_install(self, tools_json_path: str, idf_tools_path: str) -> bool: @@ -493,16 +502,14 @@ def _install_with_idf_tools(self, tool_name: str, paths: Dict[str, str]) -> bool return False # Copy tool files - target_package_path = os.path.join( - IDF_TOOLS_PATH, "tools", tool_name, "package.json" - ) + target_package_path = Path(IDF_TOOLS_PATH) / "tools" / tool_name / "package.json" if not safe_copy_file(paths['package_path'], target_package_path): return False safe_remove_directory(paths['tool_path']) - tl_path = f"file://{os.path.join(IDF_TOOLS_PATH, 'tools', tool_name)}" + tl_path = f"file://{Path(IDF_TOOLS_PATH) / 'tools' / tool_name}" pm.install(tl_path) logger.info(f"Tool {tool_name} successfully installed") @@ -597,7 +604,7 @@ def _configure_mcu_toolchains( self.install_tool(toolchain) # ULP toolchain if ULP directory exists - if mcu_config.get("ulp_toolchain") and os.path.isdir("ulp"): + if mcu_config.get("ulp_toolchain") and Path("ulp").is_dir(): for toolchain in mcu_config["ulp_toolchain"]: self.install_tool(toolchain) @@ -616,16 +623,14 @@ def _configure_installer(self) -> None: return # Remove pio install marker to avoid issues when switching versions - old_tl_piopm_path = os.path.join(self.packages_dir, "tl-install", ".piopm") - if os.path.exists(old_tl_piopm_path): + old_tl_piopm_path = Path(self.packages_dir) / "tl-install" / ".piopm" + if old_tl_piopm_path.exists(): safe_remove_file(old_tl_piopm_path) # Check if idf_tools.py is available - installer_path = os.path.join( - self.packages_dir, tl_install_name, "tools", "idf_tools.py" - ) + installer_path = Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py" - if os.path.exists(installer_path): + if installer_path.exists(): logger.debug(f"{tl_install_name} is available and ready") self.packages[tl_install_name]["optional"] = True else: @@ -653,42 +658,42 @@ def _configure_check_tools(self, variables: Dict) -> None: def _ensure_mklittlefs_version(self) -> None: """Ensure correct mklittlefs version is installed.""" - piopm_path = os.path.join(self.packages_dir, "tool-mklittlefs", ".piopm") + piopm_path = Path(self.packages_dir) / "tool-mklittlefs" / ".piopm" - if os.path.exists(piopm_path): + if piopm_path.exists(): try: with open(piopm_path, 'r', encoding='utf-8') as f: package_data = json.load(f) version = package_data.get('version', '') if not version.startswith("3."): - os.remove(piopm_path) + safe_remove_file(piopm_path) logger.info(f"Incompatible mklittlefs version {version} removed (required: 3.x)") - except (json.JSONDecodeError, KeyError) as e: - logger.error(f"Error reading mklittlefs package {e}") + except (json.JSONDecodeError, KeyError): + logger.exception("Error reading mklittlefs package metadata") def _setup_mklittlefs_for_download(self) -> None: """Setup mklittlefs for download functionality with version 4.x.""" - mklittlefs_dir = os.path.join(self.packages_dir, "tool-mklittlefs") - mklittlefs4_dir = os.path.join( - self.packages_dir, "tool-mklittlefs4" - ) + mklittlefs_dir = Path(self.packages_dir) / "tool-mklittlefs" + mklittlefs4_dir = Path(self.packages_dir) / "tool-mklittlefs4" # Ensure mklittlefs 3.x is installed - if not os.path.exists(mklittlefs_dir): + if not mklittlefs_dir.exists(): self.install_tool("tool-mklittlefs") - if os.path.exists(os.path.join(mklittlefs_dir, "tools.json")): + if (mklittlefs_dir / "tools.json").exists(): self.install_tool("tool-mklittlefs") # Install mklittlefs 4.x - if not os.path.exists(mklittlefs4_dir): + if not mklittlefs4_dir.exists(): self.install_tool("tool-mklittlefs4") - if os.path.exists(os.path.join(mklittlefs4_dir, "tools.json")): + if (mklittlefs4_dir / "tools.json").exists(): self.install_tool("tool-mklittlefs4") # Copy mklittlefs 4.x over 3.x - if os.path.exists(mklittlefs4_dir): - package_src = os.path.join(mklittlefs_dir, "package.json") - package_dst = os.path.join(mklittlefs4_dir, "package.json") + if mklittlefs4_dir.exists(): + # Copy 3.x package.json into 4.x before mirroring 4.x -> 3.x, + # so 3.x dir ends up with 4.x binaries and 3.x metadata. + package_src = mklittlefs_dir / "package.json" + package_dst = mklittlefs4_dir / "package.json" safe_copy_file(package_src, package_dst) shutil.copytree(mklittlefs4_dir, mklittlefs_dir, dirs_exist_ok=True) self.packages.pop("tool-mkfatfs", None) @@ -899,7 +904,7 @@ def configure_debug_session(self, debug_config): ignore_conds = [ debug_config.load_cmds != ["load"], not flash_images, - not all([os.path.isfile(item["path"]) for item in flash_images]), + not all([Path(item["path"]).is_file() for item in flash_images]), ] if any(ignore_conds):