diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b9ddc62b..ab7af9125 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `find_primitive_files()` now uses `os.walk` with early directory pruning so `compilation.exclude` patterns prevent traversal into excluded subtrees on large repos. (#870) - CI: smoke tests in `build-release.yml`'s `build-and-test` job (Linux x86_64, Linux arm64, Windows) are now gated to promotion boundaries (tag/schedule/dispatch) instead of running on every push to main. Push-time smoke duplicated the merge-time smoke gate in `ci-integration.yml` and burned ~15 redundant codex-binary downloads/day. Tag-cut releases still run smoke as a pre-ship gate; nightly catches upstream codex URL drift; merge-time still gates merges into main. (#878) - CI docs: clarify that branch-protection ruleset must store the check-run name (`gate`), not the workflow display string (`Merge Gate / gate`); document the merge-gate aggregator in `cicd.instructions.md` and mark the legacy stub workflow as deprecated. diff --git a/src/apm_cli/constants.py b/src/apm_cli/constants.py index 12d3372ac..aaf01f0aa 100644 --- a/src/apm_cli/constants.py +++ b/src/apm_cli/constants.py @@ -29,3 +29,25 @@ class InstallMode(Enum): CLAUDE_DIR = ".claude" GITIGNORE_FILENAME = ".gitignore" APM_MODULES_GITIGNORE_PATTERN = "apm_modules/" + + +# --------------------------------------------------------------------------- +# Directory names unconditionally skipped during primitive-file discovery. +# These never contain APM primitives or user source files and can be very +# large (e.g. node_modules, .git objects). Used by find_primitive_files() +# in primitives/discovery.py to prune traversal. +# NOTE: .apm is intentionally absent -- it is where primitives live. +# --------------------------------------------------------------------------- +DEFAULT_SKIP_DIRS: frozenset = frozenset({ + ".git", + "node_modules", + "__pycache__", + ".pytest_cache", + ".venv", + "venv", + ".tox", + "build", + "dist", + ".mypy_cache", + "apm_modules", +}) diff --git a/src/apm_cli/primitives/discovery.py b/src/apm_cli/primitives/discovery.py index 785e938ea..ef63f1667 100644 --- a/src/apm_cli/primitives/discovery.py +++ b/src/apm_cli/primitives/discovery.py @@ -1,13 +1,15 @@ """Discovery functionality for primitive files.""" +import fnmatch +import glob import logging import os -import glob from pathlib import Path -from typing import List, Dict, Optional +from typing import Dict, List, Optional, Tuple from .models import PrimitiveCollection from .parser import parse_primitive_file, parse_skill_file +from ..constants import DEFAULT_SKIP_DIRS from ..utils.exclude import should_exclude, validate_exclude_patterns logger = logging.getLogger(__name__) @@ -92,12 +94,9 @@ def discover_primitives( # Find and parse files for each primitive type for primitive_type, patterns in LOCAL_PRIMITIVE_PATTERNS.items(): - files = find_primitive_files(base_dir, patterns) + files = find_primitive_files(base_dir, patterns, exclude_patterns=safe_patterns) for file_path in files: - if should_exclude(file_path, base_path, safe_patterns): - logger.debug("Excluded by pattern: %s", file_path) - continue try: primitive = parse_primitive_file(file_path, source="local") collection.add_primitive(primitive) @@ -159,7 +158,7 @@ def scan_local_primitives( """ # Find and parse files for each primitive type for primitive_type, patterns in LOCAL_PRIMITIVE_PATTERNS.items(): - files = find_primitive_files(base_dir, patterns) + files = find_primitive_files(base_dir, patterns, exclude_patterns=exclude_patterns) # Filter out files from apm_modules to avoid conflicts with dependency scanning local_files = [] @@ -170,10 +169,6 @@ def scan_local_primitives( # Only include files that are NOT in apm_modules directory if _is_under_directory(file_path, apm_modules_path): continue - # Apply compilation.exclude patterns - if should_exclude(file_path, base_path, exclude_patterns): - logger.debug("Excluded by pattern: %s", file_path) - continue local_files.append(file_path) for file_path in local_files: @@ -397,42 +392,118 @@ def _discover_skill_in_directory(directory: Path, collection: PrimitiveCollectio print(f"Warning: Failed to parse SKILL.md in {directory}: {e}") -def find_primitive_files(base_dir: str, patterns: List[str]) -> List[Path]: +def _glob_match(rel_path: str, pattern: str) -> bool: + """Match a forward-slash relative path against a glob pattern. + + Segment-aware: ``*`` and ``?`` match within a single path segment only, + while ``**`` matches zero or more complete segments. This preserves + standard glob semantics so a pattern like + ``**/.apm/instructions/*.instructions.md`` does not accidentally match + ``.apm/instructions/sub/x.instructions.md`` (the trailing ``*`` must + not cross ``/``). + + Args: + rel_path: Relative path using forward slashes. + pattern: Glob pattern using forward slashes. + + Returns: + True if the path matches the pattern. + """ + path_parts: List[str] = [p for p in rel_path.split('/') if p] + pattern_parts: List[str] = [p for p in pattern.split('/') if p] + memo: Dict[Tuple[int, int], bool] = {} + + def _match(pi: int, qi: int) -> bool: + key = (pi, qi) + if key in memo: + return memo[key] + + if qi == len(pattern_parts): + result = pi == len(path_parts) + memo[key] = result + return result + + current = pattern_parts[qi] + + if current == '**': + # ** matches zero segments, OR consumes one segment and stays at ** + result = _match(pi, qi + 1) + if not result and pi < len(path_parts): + result = _match(pi + 1, qi) + memo[key] = result + return result + + if pi >= len(path_parts): + memo[key] = False + return False + + # Use platform-aware fnmatch semantics so Windows matching remains + # case-insensitive, consistent with prior glob.glob() behavior. + result = ( + fnmatch.fnmatch(path_parts[pi], current) + and _match(pi + 1, qi + 1) + ) + memo[key] = result + return result + + return _match(0, 0) + + +def find_primitive_files( + base_dir: str, + patterns: List[str], + exclude_patterns: Optional[List[str]] = None, +) -> List[Path]: """Find primitive files matching the given patterns. - + + Uses os.walk with early directory pruning instead of glob.glob(recursive=True) + so that exclude_patterns prevent traversal into expensive subtrees. + Symlinks are rejected outright to prevent symlink-based traversal attacks from malicious packages. - + Args: base_dir (str): Base directory to search in. patterns (List[str]): List of glob patterns to match. - + exclude_patterns (Optional[List[str]]): Pre-validated exclude patterns + to prune directories early during traversal. + Returns: - List[Path]: List of unique file paths found. + List[Path]: List of file paths found. """ if not os.path.isdir(base_dir): return [] - - all_files = [] - - for pattern in patterns: - # Use glob to find files matching the pattern - matching_files = glob.glob(os.path.join(base_dir, pattern), recursive=True) - all_files.extend(matching_files) - - # Remove duplicates while preserving order and convert to Path objects - seen = set() - unique_files = [] - - for file_path in all_files: - abs_path = os.path.abspath(file_path) - if abs_path not in seen: - seen.add(abs_path) - unique_files.append(Path(abs_path)) - + + base_path = Path(base_dir).resolve() + + all_files: List[Path] = [] + + for root, dirs, files in os.walk(str(base_path)): + current = Path(root) + # Prune excluded directories BEFORE descending + dirs[:] = sorted( + d for d in dirs + if d not in DEFAULT_SKIP_DIRS + and not _exclude_matches_dir(current / d, base_path, exclude_patterns) + ) + + # Sort files for deterministic discovery order across platforms + for file_name in sorted(files): + file_path = current / file_name + rel_str = str(file_path.relative_to(base_path)).replace(os.sep, '/') + # File-level exclude: a pattern like "**/*.draft.md" should drop + # individual files even when their parent directory is included. + if exclude_patterns and should_exclude(file_path, base_path, exclude_patterns): + logger.debug("Excluded by pattern: %s", file_path) + continue + for pattern in patterns: + if _glob_match(rel_str, pattern): + all_files.append(file_path) + break + # Filter out directories, symlinks, and unreadable files valid_files = [] - for file_path in unique_files: + for file_path in all_files: if not file_path.is_file(): continue if file_path.is_symlink(): @@ -440,10 +511,21 @@ def find_primitive_files(base_dir: str, patterns: List[str]) -> List[Path]: continue if _is_readable(file_path): valid_files.append(file_path) - + return valid_files +def _exclude_matches_dir( + dir_path: Path, + base_path: Path, + exclude_patterns: Optional[List[str]], +) -> bool: + """Check if a directory matches any exclude pattern (for early pruning).""" + if not exclude_patterns: + return False + return should_exclude(dir_path, base_path, exclude_patterns) + + def _is_readable(file_path: Path) -> bool: """Check if a file is readable. @@ -471,18 +553,5 @@ def _should_skip_directory(dir_path: str) -> bool: Returns: bool: True if directory should be skipped, False otherwise. """ - skip_patterns = { - '.git', - 'node_modules', - '__pycache__', - '.pytest_cache', - '.venv', - 'venv', - '.tox', - 'build', - 'dist', - '.mypy_cache' - } - dir_name = os.path.basename(dir_path) - return dir_name in skip_patterns \ No newline at end of file + return dir_name in DEFAULT_SKIP_DIRS \ No newline at end of file diff --git a/tests/unit/primitives/test_discovery_walk.py b/tests/unit/primitives/test_discovery_walk.py new file mode 100644 index 000000000..e93e1d763 --- /dev/null +++ b/tests/unit/primitives/test_discovery_walk.py @@ -0,0 +1,310 @@ +"""Tests for os.walk-based discovery (replacing glob.glob) and related helpers. + +Covers _glob_match, find_primitive_files with exclude_patterns, and +_exclude_matches_dir -- the new code introduced to fix compile hangs +on large repositories. +""" + +import tempfile +import unittest +from pathlib import Path + +from apm_cli.primitives.discovery import ( + _exclude_matches_dir, + _glob_match, + find_primitive_files, +) +from apm_cli.constants import DEFAULT_SKIP_DIRS + + +def _write(path: Path, content: str = "---\ndescription: stub\n---\n\n# Stub\n") -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + + +# ------------------------------------------------------------------- +# _glob_match +# ------------------------------------------------------------------- +class TestGlobMatch(unittest.TestCase): + """Tests for _glob_match -- fnmatch wrapper with ** zero-segment support.""" + + # -- simple patterns (no **) -- + def test_simple_star(self): + self.assertTrue(_glob_match("readme.md", "*.md")) + + def test_simple_star_no_match(self): + self.assertFalse(_glob_match("readme.txt", "*.md")) + + def test_simple_exact(self): + self.assertTrue(_glob_match("SKILL.md", "SKILL.md")) + + def test_simple_question_mark(self): + self.assertTrue(_glob_match("a.py", "?.py")) + self.assertFalse(_glob_match("ab.py", "?.py")) + + # -- ** matching one-or-more segments -- + def test_doublestar_one_segment(self): + self.assertTrue(_glob_match("src/app.py", "**/*.py")) + + def test_doublestar_multiple_segments(self): + self.assertTrue(_glob_match("a/b/c/d.py", "**/*.py")) + + # -- ** matching zero segments -- + def test_doublestar_zero_segments(self): + """**/*.md should match readme.md at the root (zero directory segments).""" + self.assertTrue(_glob_match("readme.md", "**/*.md")) + + def test_doublestar_zero_segments_instructions(self): + self.assertTrue(_glob_match("coding.instructions.md", "**/*.instructions.md")) + + # -- ** in the middle of a pattern -- + def test_doublestar_middle(self): + self.assertTrue(_glob_match(".apm/instructions/style.instructions.md", + "**/.apm/instructions/*.instructions.md")) + + def test_doublestar_middle_nested(self): + self.assertTrue(_glob_match("sub/dir/.apm/instructions/style.instructions.md", + "**/.apm/instructions/*.instructions.md")) + + def test_doublestar_middle_zero(self): + """Leading **/ should also match zero segments when pattern has a middle path.""" + self.assertTrue(_glob_match(".apm/instructions/style.instructions.md", + "**/.apm/instructions/*.instructions.md")) + + # -- no match -- + def test_no_match_extension(self): + self.assertFalse(_glob_match("src/app.js", "**/*.py")) + + def test_no_match_prefix(self): + self.assertFalse(_glob_match("src/app.py", "lib/**/*.py")) + + # -- pattern without ** stays simple -- + def test_no_doublestar_subdir(self): + """Without **, pattern should not cross directories.""" + result = _glob_match("a/b.py", "*.py") + self.assertIsInstance(result, bool) + + +# ------------------------------------------------------------------- +# _exclude_matches_dir +# ------------------------------------------------------------------- +class TestExcludeMatchesDir(unittest.TestCase): + """Tests for _exclude_matches_dir -- thin wrapper over should_exclude.""" + + def test_none_patterns_returns_false(self): + self.assertFalse(_exclude_matches_dir(Path("/p/node_modules"), Path("/p"), None)) + + def test_empty_patterns_returns_false(self): + self.assertFalse(_exclude_matches_dir(Path("/p/node_modules"), Path("/p"), [])) + + def test_matching_pattern(self): + self.assertTrue( + _exclude_matches_dir(Path("/p/Binaries"), Path("/p"), ["Binaries"]) + ) + + def test_non_matching_pattern(self): + self.assertFalse( + _exclude_matches_dir(Path("/p/src"), Path("/p"), ["Binaries"]) + ) + + def test_glob_pattern(self): + self.assertTrue( + _exclude_matches_dir(Path("/p/a/test-fixtures"), Path("/p"), ["**/test-fixtures"]) + ) + + +# ------------------------------------------------------------------- +# find_primitive_files -- early directory pruning +# ------------------------------------------------------------------- +class TestFindPrimitiveFilesExclude(unittest.TestCase): + """Tests that find_primitive_files prunes directories via exclude_patterns.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.base = Path(self.tmp) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_finds_instruction_in_apm_dir(self): + _write(self.base / ".apm" / "instructions" / "style.instructions.md") + result = find_primitive_files(str(self.base), ["**/.apm/instructions/*.instructions.md"]) + self.assertEqual(len(result), 1) + self.assertTrue(result[0].name == "style.instructions.md") + + def test_finds_file_at_root(self): + _write(self.base / "root.instructions.md") + result = find_primitive_files(str(self.base), ["**/*.instructions.md"]) + self.assertEqual(len(result), 1) + + def test_skips_default_dirs(self): + """Files inside DEFAULT_SKIP_DIRS should never be returned.""" + _write(self.base / "node_modules" / "pkg" / "bad.instructions.md") + _write(self.base / "__pycache__" / "bad.instructions.md") + _write(self.base / ".git" / "hooks" / "bad.instructions.md") + _write(self.base / "src" / "good.instructions.md") + + result = find_primitive_files(str(self.base), ["**/*.instructions.md"]) + names = [f.name for f in result] + self.assertIn("good.instructions.md", names) + self.assertNotIn("bad.instructions.md", names) + + def test_exclude_patterns_prune_custom_dirs(self): + """User-supplied exclude_patterns prevent traversal into named dirs.""" + _write(self.base / "Binaries" / "Win64" / "deep.instructions.md") + _write(self.base / "Content" / "Textures" / "deep.instructions.md") + _write(self.base / "Source" / "style.instructions.md") + + result = find_primitive_files( + str(self.base), + ["**/*.instructions.md"], + exclude_patterns=["Binaries", "Content"], + ) + names = [f.name for f in result] + self.assertIn("style.instructions.md", names) + self.assertNotIn("deep.instructions.md", names) + + def test_exclude_patterns_glob_style(self): + """Glob-style exclude patterns work for nested matches.""" + _write(self.base / "a" / "test-fixtures" / "f.instructions.md") + _write(self.base / "b" / "real.instructions.md") + + result = find_primitive_files( + str(self.base), + ["**/*.instructions.md"], + exclude_patterns=["**/test-fixtures"], + ) + names = [f.name for f in result] + self.assertIn("real.instructions.md", names) + self.assertNotIn("f.instructions.md", names) + + def test_exclude_patterns_none_finds_everything(self): + """When exclude_patterns is None, only default skips apply.""" + _write(self.base / "a" / "one.instructions.md") + _write(self.base / "b" / "two.instructions.md") + + result = find_primitive_files( + str(self.base), ["**/*.instructions.md"], exclude_patterns=None + ) + self.assertEqual(len(result), 2) + + def test_deduplicates_across_patterns(self): + """Overlapping patterns should not produce duplicate results.""" + _write(self.base / ".apm" / "instructions" / "style.instructions.md") + result = find_primitive_files( + str(self.base), + [ + "**/.apm/instructions/*.instructions.md", + "**/*.instructions.md", + ], + ) + self.assertEqual(len(result), 1) + + def test_symlink_rejected(self): + """Symlinked files should be filtered out.""" + real = self.base / "real.instructions.md" + _write(real) + link = self.base / "link.instructions.md" + try: + link.symlink_to(real) + except OSError: + self.skipTest("Cannot create symlinks on this platform") + result = find_primitive_files(str(self.base), ["**/*.instructions.md"]) + names = [f.name for f in result] + self.assertIn("real.instructions.md", names) + self.assertNotIn("link.instructions.md", names) + + def test_nonexistent_dir_returns_empty(self): + result = find_primitive_files("/nonexistent/path/1234", ["**/*.md"]) + self.assertEqual(result, []) + + def test_apm_dir_not_skipped(self): + """.apm must NOT be in the default skip set -- primitives live there.""" + self.assertNotIn(".apm", DEFAULT_SKIP_DIRS) + + +# ------------------------------------------------------------------- +# _glob_match segment-aware semantics (PR #870 review C2) +# ------------------------------------------------------------------- +class TestGlobMatchSegmentAware(unittest.TestCase): + """Verify * does not cross / boundaries; ** does.""" + + def test_star_does_not_cross_slash(self): + from apm_cli.primitives.discovery import _glob_match + # Pattern matches one segment under instructions/ only + self.assertTrue(_glob_match(".apm/instructions/x.md", ".apm/instructions/*.md")) + self.assertFalse(_glob_match(".apm/instructions/sub/x.md", ".apm/instructions/*.md")) + + def test_double_star_crosses_slash(self): + from apm_cli.primitives.discovery import _glob_match + self.assertTrue(_glob_match("a/b/c/x.md", "**/x.md")) + self.assertTrue(_glob_match("x.md", "**/x.md")) # zero segments + + def test_star_with_double_star_prefix(self): + from apm_cli.primitives.discovery import _glob_match + # ** then literal then * -- * should still respect / + self.assertTrue(_glob_match("a/b/.apm/instructions/foo.md", + "**/.apm/instructions/*.md")) + self.assertFalse(_glob_match("a/b/.apm/instructions/sub/foo.md", + "**/.apm/instructions/*.md")) + + def test_question_mark_single_char_no_slash(self): + from apm_cli.primitives.discovery import _glob_match + self.assertTrue(_glob_match("ab", "a?")) + self.assertFalse(_glob_match("a/b", "a?b")) + + +# ------------------------------------------------------------------- +# File-level exclude patterns (PR #870 review C1) +# ------------------------------------------------------------------- +class TestFindPrimitiveFilesFileExclude(unittest.TestCase): + """File-level exclude patterns must filter individual files, + not just whole directories.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.base = Path(self.tmp) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_file_pattern_excludes_individual_files(self): + from apm_cli.primitives.discovery import find_primitive_files + + skills_dir = self.base / ".apm" / "skills" + skills_dir.mkdir(parents=True) + (skills_dir / "good.md").touch() + (skills_dir / "bad.draft.md").touch() + + results = find_primitive_files( + str(self.base), + ["**/.apm/skills/*.md"], + exclude_patterns=["**/*.draft.md"], + ) + + names = sorted(p.name for p in results) + self.assertIn("good.md", names) + self.assertNotIn("bad.draft.md", names) + + def test_files_sorted_deterministically(self): + """os.walk does not sort files -- find_primitive_files must.""" + from apm_cli.primitives.discovery import find_primitive_files + + skills_dir = self.base / ".apm" / "skills" + skills_dir.mkdir(parents=True) + # Create files in non-alphabetical order + for name in ["zeta.md", "alpha.md", "mike.md"]: + (skills_dir / name).touch() + + results = find_primitive_files( + str(self.base), + ["**/.apm/skills/*.md"], + ) + names = [p.name for p in results] + self.assertEqual(names, sorted(names)) + + +if __name__ == "__main__": + unittest.main()