Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 93 additions & 88 deletions platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import sys
import shutil
import logging
from typing import Optional, Dict, List, Any
from pathlib import Path
from typing import Optional, Dict, List, Any, Union

from platformio.compat import IS_WINDOWS
from platformio.public import PlatformBase, to_unix_path
Expand Down Expand Up @@ -95,8 +96,8 @@
raise SystemExit(1)

# Set IDF_TOOLS_PATH to Pio core_dir
PROJECT_CORE_DIR=ProjectConfig.get_instance().get("platformio", "core_dir")
IDF_TOOLS_PATH=os.path.join(PROJECT_CORE_DIR)
PROJECT_CORE_DIR = ProjectConfig.get_instance().get("platformio", "core_dir")
IDF_TOOLS_PATH = PROJECT_CORE_DIR
os.environ["IDF_TOOLS_PATH"] = IDF_TOOLS_PATH
os.environ['IDF_PATH'] = ""

Expand Down Expand Up @@ -130,51 +131,62 @@ def wrapper(*args, **kwargs):


@safe_file_operation
def safe_remove_file(path: str) -> bool:
"""Safely remove a file with error handling."""
if os.path.exists(path) and os.path.isfile(path):
os.remove(path)
def safe_remove_file(path: Union[str, Path]) -> bool:
"""Safely remove a file with error handling using pathlib."""
path = Path(path)
if path.is_file() or path.is_symlink():
path.unlink()
logger.debug(f"File removed: {path}")
return True


@safe_file_operation
def safe_remove_directory(path: str) -> bool:
"""Safely remove directories with error handling."""
if os.path.exists(path) and os.path.isdir(path):
def safe_remove_directory(path: Union[str, Path]) -> bool:
"""Safely remove directories with error handling using pathlib."""
path = Path(path)
if not path.exists():
return True
if path.is_symlink():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
logger.debug(f"Directory removed: {path}")
return True


@safe_file_operation
def safe_remove_directory_pattern(base_path: str, pattern: str) -> bool:
"""Safely remove directories matching a pattern with error handling."""
if not os.path.exists(base_path):
def safe_remove_directory_pattern(base_path: Union[str, Path], pattern: str) -> bool:
"""Safely remove directories matching a pattern with error handling using pathlib."""
base_path = Path(base_path)
if not base_path.exists():
return True
# Find all directories matching the pattern in the base directory
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path) and fnmatch.fnmatch(item, pattern):
shutil.rmtree(item_path)
logger.debug(f"Directory removed: {item_path}")
for item in base_path.rglob("*"):
if fnmatch.fnmatch(item.name, pattern):
if item.is_symlink():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
logger.debug(f"Directory removed: {item}")
return True


@safe_file_operation
def safe_copy_file(src: str, dst: str) -> bool:
"""Safely copy files with error handling."""
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copyfile(src, dst)
def safe_copy_file(src: Union[str, Path], dst: Union[str, Path]) -> bool:
"""Safely copy files with error handling using pathlib."""
src, dst = Path(src), Path(dst)
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
logger.debug(f"File copied: {src} -> {dst}")
return True


@safe_file_operation
def safe_copy_directory(src: str, dst: str) -> bool:
"""Safely copy directories with error handling."""
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copytree(src, dst, dirs_exist_ok=True)
def safe_copy_directory(src: Union[str, Path], dst: Union[str, Path]) -> bool:
"""Safely copy directories with error handling using pathlib."""
src, dst = Path(src), Path(dst)
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(src, dst, dirs_exist_ok=True, copy_function=shutil.copy2, symlinks=True)
logger.debug(f"Directory copied: {src} -> {dst}")
return True

Expand All @@ -190,11 +202,11 @@ def __init__(self, *args, **kwargs):
self._mcu_config_cache = {}

@property
def packages_dir(self) -> str:
def packages_dir(self) -> Path:
"""Get cached packages directory path."""
if self._packages_dir is None:
config = ProjectConfig.get_instance()
self._packages_dir = config.get("platformio", "packages_dir")
self._packages_dir = Path(config.get("platformio", "packages_dir"))
return self._packages_dir

def _check_tl_install_version(self) -> bool:
Expand All @@ -213,10 +225,10 @@ def _check_tl_install_version(self) -> bool:
return True

# Check if tool is already installed
tl_install_path = os.path.join(self.packages_dir, tl_install_name)
package_json_path = os.path.join(tl_install_path, "package.json")
tl_install_path = self.packages_dir / tl_install_name
package_json_path = tl_install_path / "package.json"

if not os.path.exists(package_json_path):
if not package_json_path.exists():
logger.info(f"{tl_install_name} not installed, installing version {required_version}")
return self._install_tl_install(required_version)

Expand Down Expand Up @@ -300,16 +312,16 @@ def _install_tl_install(self, version: str) -> bool:
Returns:
bool: True if installation successful, False otherwise
"""
tl_install_path = os.path.join(self.packages_dir, tl_install_name)
old_tl_install_path = os.path.join(self.packages_dir, "tl-install")
tl_install_path = Path(self.packages_dir) / tl_install_name
old_tl_install_path = Path(self.packages_dir) / "tl-install"

try:
old_tl_install_exists = os.path.exists(old_tl_install_path)
old_tl_install_exists = old_tl_install_path.exists()
if old_tl_install_exists:
# remove outdated tl-install
safe_remove_directory(old_tl_install_path)

if os.path.exists(tl_install_path):
if tl_install_path.exists():
logger.info(f"Removing old {tl_install_name} installation")
safe_remove_directory(tl_install_path)

Expand All @@ -318,10 +330,10 @@ def _install_tl_install(self, version: str) -> bool:
self.packages[tl_install_name]["version"] = version
pm.install(version)
# Ensure backward compatibility by removing pio install status indicator
tl_piopm_path = os.path.join(tl_install_path, ".piopm")
tl_piopm_path = tl_install_path / ".piopm"
safe_remove_file(tl_piopm_path)

if os.path.exists(os.path.join(tl_install_path, "package.json")):
if (tl_install_path / "package.json").exists():
logger.info(f"{tl_install_name} successfully installed and verified")
self.packages[tl_install_name]["optional"] = True

Expand Down Expand Up @@ -349,51 +361,48 @@ def _cleanup_versioned_tool_directories(self, tool_name: str) -> None:
Args:
tool_name: Name of the tool to clean up
"""
if not os.path.exists(self.packages_dir) or not os.path.isdir(self.packages_dir):
packages_path = Path(self.packages_dir)
if not packages_path.exists() or not packages_path.is_dir():
return

try:
# Remove directories with '@' in their name (e.g., tool-name@version, tool-name@src)
safe_remove_directory_pattern(self.packages_dir, f"{tool_name}@*")
safe_remove_directory_pattern(packages_path, f"{tool_name}@*")

# Remove directories with version suffixes (e.g., tool-name.12345)
safe_remove_directory_pattern(self.packages_dir, f"{tool_name}.*")
safe_remove_directory_pattern(packages_path, f"{tool_name}.*")

# Also check for any directory that starts with tool_name and contains '@'
for item in os.listdir(self.packages_dir):
if item.startswith(tool_name) and '@' in item:
item_path = os.path.join(self.packages_dir, item)
if os.path.isdir(item_path):
safe_remove_directory(item_path)
logger.debug(f"Removed versioned directory: {item_path}")
for item in packages_path.iterdir():
if item.name.startswith(tool_name) and '@' in item.name and item.is_dir():
safe_remove_directory(item)
logger.debug(f"Removed versioned directory: {item}")

except OSError as e:
logger.error(f"Error cleaning up versioned directories for {tool_name}: {e}")
except OSError:
logger.exception(f"Error cleaning up versioned directories for {tool_name}")

def _get_tool_paths(self, tool_name: str) -> Dict[str, str]:
"""Get centralized path calculation for tools with caching."""
if tool_name not in self._tools_cache:
tool_path = os.path.join(self.packages_dir, tool_name)
tool_path = Path(self.packages_dir) / tool_name

self._tools_cache[tool_name] = {
'tool_path': tool_path,
'package_path': os.path.join(tool_path, "package.json"),
'tools_json_path': os.path.join(tool_path, "tools.json"),
'piopm_path': os.path.join(tool_path, ".piopm"),
'idf_tools_path': os.path.join(
self.packages_dir, tl_install_name, "tools", "idf_tools.py"
)
'tool_path': str(tool_path),
'package_path': str(tool_path / "package.json"),
'tools_json_path': str(tool_path / "tools.json"),
'piopm_path': str(tool_path / ".piopm"),
'idf_tools_path': str(Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py")
}
return self._tools_cache[tool_name]

def _check_tool_status(self, tool_name: str) -> Dict[str, bool]:
"""Check the installation status of a tool."""
paths = self._get_tool_paths(tool_name)
return {
'has_idf_tools': os.path.exists(paths['idf_tools_path']),
'has_tools_json': os.path.exists(paths['tools_json_path']),
'has_piopm': os.path.exists(paths['piopm_path']),
'tool_exists': os.path.exists(paths['tool_path'])
'has_idf_tools': Path(paths['idf_tools_path']).exists(),
'has_tools_json': Path(paths['tools_json_path']).exists(),
'has_piopm': Path(paths['piopm_path']).exists(),
'tool_exists': Path(paths['tool_path']).exists()
}

def _run_idf_tools_install(self, tools_json_path: str, idf_tools_path: str) -> bool:
Expand Down Expand Up @@ -493,16 +502,14 @@ def _install_with_idf_tools(self, tool_name: str, paths: Dict[str, str]) -> bool
return False

# Copy tool files
target_package_path = os.path.join(
IDF_TOOLS_PATH, "tools", tool_name, "package.json"
)
target_package_path = Path(IDF_TOOLS_PATH) / "tools" / tool_name / "package.json"

if not safe_copy_file(paths['package_path'], target_package_path):
return False

safe_remove_directory(paths['tool_path'])

tl_path = f"file://{os.path.join(IDF_TOOLS_PATH, 'tools', tool_name)}"
tl_path = f"file://{Path(IDF_TOOLS_PATH) / 'tools' / tool_name}"
pm.install(tl_path)

logger.info(f"Tool {tool_name} successfully installed")
Expand Down Expand Up @@ -597,7 +604,7 @@ def _configure_mcu_toolchains(
self.install_tool(toolchain)

# ULP toolchain if ULP directory exists
if mcu_config.get("ulp_toolchain") and os.path.isdir("ulp"):
if mcu_config.get("ulp_toolchain") and Path("ulp").is_dir():
for toolchain in mcu_config["ulp_toolchain"]:
self.install_tool(toolchain)

Expand All @@ -616,16 +623,14 @@ def _configure_installer(self) -> None:
return

# Remove pio install marker to avoid issues when switching versions
old_tl_piopm_path = os.path.join(self.packages_dir, "tl-install", ".piopm")
if os.path.exists(old_tl_piopm_path):
old_tl_piopm_path = Path(self.packages_dir) / "tl-install" / ".piopm"
if old_tl_piopm_path.exists():
safe_remove_file(old_tl_piopm_path)

# Check if idf_tools.py is available
installer_path = os.path.join(
self.packages_dir, tl_install_name, "tools", "idf_tools.py"
)
installer_path = Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py"

if os.path.exists(installer_path):
if installer_path.exists():
logger.debug(f"{tl_install_name} is available and ready")
self.packages[tl_install_name]["optional"] = True
else:
Expand Down Expand Up @@ -653,42 +658,42 @@ def _configure_check_tools(self, variables: Dict) -> None:

def _ensure_mklittlefs_version(self) -> None:
"""Ensure correct mklittlefs version is installed."""
piopm_path = os.path.join(self.packages_dir, "tool-mklittlefs", ".piopm")
piopm_path = Path(self.packages_dir) / "tool-mklittlefs" / ".piopm"

if os.path.exists(piopm_path):
if piopm_path.exists():
try:
with open(piopm_path, 'r', encoding='utf-8') as f:
package_data = json.load(f)
version = package_data.get('version', '')
if not version.startswith("3."):
os.remove(piopm_path)
safe_remove_file(piopm_path)
logger.info(f"Incompatible mklittlefs version {version} removed (required: 3.x)")
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Error reading mklittlefs package {e}")
except (json.JSONDecodeError, KeyError):
logger.exception("Error reading mklittlefs package metadata")

def _setup_mklittlefs_for_download(self) -> None:
"""Setup mklittlefs for download functionality with version 4.x."""
mklittlefs_dir = os.path.join(self.packages_dir, "tool-mklittlefs")
mklittlefs4_dir = os.path.join(
self.packages_dir, "tool-mklittlefs4"
)
mklittlefs_dir = Path(self.packages_dir) / "tool-mklittlefs"
mklittlefs4_dir = Path(self.packages_dir) / "tool-mklittlefs4"

# Ensure mklittlefs 3.x is installed
if not os.path.exists(mklittlefs_dir):
if not mklittlefs_dir.exists():
self.install_tool("tool-mklittlefs")
if os.path.exists(os.path.join(mklittlefs_dir, "tools.json")):
if (mklittlefs_dir / "tools.json").exists():
self.install_tool("tool-mklittlefs")

# Install mklittlefs 4.x
if not os.path.exists(mklittlefs4_dir):
if not mklittlefs4_dir.exists():
self.install_tool("tool-mklittlefs4")
if os.path.exists(os.path.join(mklittlefs4_dir, "tools.json")):
if (mklittlefs4_dir / "tools.json").exists():
self.install_tool("tool-mklittlefs4")

# Copy mklittlefs 4.x over 3.x
if os.path.exists(mklittlefs4_dir):
package_src = os.path.join(mklittlefs_dir, "package.json")
package_dst = os.path.join(mklittlefs4_dir, "package.json")
if mklittlefs4_dir.exists():
# Copy 3.x package.json into 4.x before mirroring 4.x -> 3.x,
# so 3.x dir ends up with 4.x binaries and 3.x metadata.
package_src = mklittlefs_dir / "package.json"
package_dst = mklittlefs4_dir / "package.json"
safe_copy_file(package_src, package_dst)
shutil.copytree(mklittlefs4_dir, mklittlefs_dir, dirs_exist_ok=True)
self.packages.pop("tool-mkfatfs", None)
Expand Down Expand Up @@ -899,7 +904,7 @@ def configure_debug_session(self, debug_config):
ignore_conds = [
debug_config.load_cmds != ["load"],
not flash_images,
not all([os.path.isfile(item["path"]) for item in flash_images]),
not all([Path(item["path"]).is_file() for item in flash_images]),
]

if any(ignore_conds):
Expand Down