diff --git a/CHANGELOG.md b/CHANGELOG.md index dc0a248fd..da33968ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `find_primitive_files()` now uses `os.walk` with early directory pruning instead of `glob.glob(recursive=True)`, so `compilation.exclude` patterns prevent traversal into expensive subtrees (e.g. large game-engine asset trees). Adds shared `DEFAULT_SKIP_DIRS` constant used by both discovery and compilation. +- `ContextOptimizer` now builds a `_directory_files_cache` during `_analyze_project_structure()` and uses it for `_cached_glob`, `_find_matching_directories`, and `_directory_matches_pattern`, eliminating repeated `os.walk` / `iterdir()` calls. Directory pruning is moved before the depth check so excluded subtrees are never descended into. Stats loop rewritten from O(N^2) per-directory inheritance walk to O(N) with pre-computed pattern sets. Low-distribution placement now uses `_find_minimal_coverage_placement` directly instead of a pollution-scored candidate search that incorrectly biased toward the project root. + ### Added - New `enterprise/governance-guide.md` documentation page: flagship governance reference for CISO / VPE / Platform Tech Lead audiences, covering enforcement points, bypass contract, failure semantics, air-gapped operation, rollout playbook, and known gaps. Trims duplicated content in `governance.md`, `apm-policy.md`, and `integrations/github-rulesets.md`. Adds `templates/apm-policy-starter.yml`. 
(#851) diff --git a/src/apm_cli/compilation/context_optimizer.py b/src/apm_cli/compilation/context_optimizer.py index 0992f9cf7..56f9bc06c 100644 --- a/src/apm_cli/compilation/context_optimizer.py +++ b/src/apm_cli/compilation/context_optimizer.py @@ -14,9 +14,9 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from functools import lru_cache -import glob - from ..primitives.models import Instruction +from ..primitives.discovery import _glob_match +from ..constants import DEFAULT_SKIP_DIRS from ..output.models import ( CompilationResults, ProjectAnalysis, OptimizationDecision, OptimizationStats, PlacementStrategy, PlacementSummary @@ -31,12 +31,6 @@ list = builtins.list dict = builtins.dict -# Default directory names excluded from compilation scanning. -# Shared across _analyze_project_structure, _should_exclude_subdir, and _get_all_files. -DEFAULT_EXCLUDED_DIRNAMES = frozenset({ - 'node_modules', '__pycache__', '.git', 'dist', 'build', 'apm_modules', -}) - @dataclass class DirectoryAnalysis: @@ -117,12 +111,12 @@ def __init__(self, base_dir: str = ".", exclude_patterns: Optional[List[str]] = self.base_dir = Path(base_dir).absolute() self._directory_cache: Dict[Path, DirectoryAnalysis] = {} + self._directory_files_cache: Dict[Path, List[Path]] = {} self._pattern_cache: Dict[str, Set[Path]] = {} # Performance optimization caches self._glob_cache: Dict[str, List[str]] = {} self._glob_set_cache: Dict[str, Set[Path]] = {} - self._file_list_cache: Optional[List[Path]] = None self._inheritance_cache: Dict[Path, List[Path]] = {} # (#171) self._timing_enabled = False self._phase_timings: Dict[str, float] = {} @@ -157,29 +151,29 @@ def _time_phase(self, phase_name: str, operation_func, *args, **kwargs): return result def _cached_glob(self, pattern: str) -> List[str]: - """Cache glob results to avoid repeated filesystem scans.""" + """Match a glob pattern against the file list captured during project analysis. 
+ + Replaces glob.glob(recursive=True) which walks the entire file tree + without respecting exclude patterns, causing hangs on large repos. + Uses `_directory_files_cache` built by `_analyze_project_structure()`. + """ if pattern not in self._glob_cache: - old_cwd = os.getcwd() - try: - os.chdir(str(self.base_dir)) # Convert Path to string for os.chdir - self._glob_cache[pattern] = glob.glob(pattern, recursive=True) - finally: - os.chdir(old_cwd) + # Ensure project analysis has run (populates _directory_files_cache) + if not self._directory_files_cache: + self._analyze_project_structure() + matches = [] + for file_paths in self._directory_files_cache.values(): + for file_path in file_paths: + try: + rel_path = file_path.relative_to(self.base_dir) + rel_str = str(rel_path).replace(os.sep, '/') + if _glob_match(rel_str, pattern): + matches.append(str(rel_path)) + except (ValueError, OSError): + continue + self._glob_cache[pattern] = matches return self._glob_cache[pattern] - def _get_all_files(self) -> List[Path]: - """Get cached list of all files in project.""" - if self._file_list_cache is None: - self._file_list_cache = [] - for root, dirs, files in os.walk(self.base_dir): - # Skip hidden and excluded directories for performance - # Sort to guarantee deterministic traversal order across filesystems - dirs[:] = sorted(d for d in dirs if not d.startswith('.') and d not in DEFAULT_EXCLUDED_DIRNAMES) - for file in sorted(files): - if not file.startswith('.'): - self._file_list_cache.append(Path(root) / file) - return self._file_list_cache - def optimize_instruction_placement( self, instructions: List[Instruction], @@ -296,16 +290,61 @@ def get_optimization_stats(self, placement_map: Dict[Path, List[Instruction]]) - directories_analyzed=len(self._directory_cache) ) - # Calculate average context efficiency across all directories with files - all_directories = set(self._directory_cache.keys()) - efficiency_scores = [] + # Pre-compute: for each instruction pattern, get 
the set of matching directories + # (already computed during _find_matching_directories in Instruction Processing) + all_instructions = [] + for instructions in placement_map.values(): + all_instructions.extend(instructions) + + pattern_dir_sets: Dict[str, set] = {} + for instruction in all_instructions: + pattern = instruction.apply_to + if pattern and pattern not in pattern_dir_sets: + pattern_dir_sets[pattern] = self._pattern_cache.get(pattern, set()) + + placement_dirs = set(placement_map.keys()) + base_resolved = Path(self.base_dir).resolve() - for directory in all_directories: - if self._directory_cache[directory].total_files > 0: - inheritance = self.analyze_context_inheritance(directory, placement_map) - efficiency_scores.append(inheritance.get_efficiency_ratio()) + total_weight = 0 + weighted_sum = 0.0 + + for directory, analysis in self._directory_cache.items(): + if analysis.total_files == 0: + continue + + # Walk up to find which placement dirs this directory inherits from + inherited_placements = [] + current = directory + seen = set() + while True: + if current in seen: + break + seen.add(current) + if current in placement_dirs: + inherited_placements.append(current) + if current == base_resolved or current.parent == current: + break + current = current.parent + + if not inherited_placements: + continue + + # Count total and relevant instructions using O(1) set lookups + total_context = 0 + relevant_context = 0 + for pdir in inherited_placements: + for instruction in placement_map[pdir]: + total_context += 1 + if not instruction.apply_to: + relevant_context += 1 # Global instructions always relevant + elif directory in pattern_dir_sets.get(instruction.apply_to, set()): + relevant_context += 1 + + efficiency = relevant_context / total_context if total_context > 0 else 0.0 + weighted_sum += efficiency + total_weight += 1 - average_efficiency = sum(efficiency_scores) / len(efficiency_scores) if efficiency_scores else 0.0 + average_efficiency = 
weighted_sum / total_weight if total_weight > 0 else 0.0 return OptimizationStats( average_context_efficiency=average_efficiency, @@ -409,7 +448,10 @@ def get_compilation_results( def _analyze_project_structure(self) -> None: """Analyze the project structure and cache results.""" self._directory_cache.clear() - self._pattern_cache.clear() # Also clear pattern cache for deterministic behavior + self._directory_files_cache.clear() + self._pattern_cache.clear() + self._glob_cache.clear() + self._glob_set_cache.clear() # Track visited directories to prevent infinite loops visited_dirs = set() @@ -422,6 +464,10 @@ def _analyze_project_structure(self) -> None: continue visited_dirs.add(current_path) + # Prune subdirectories FIRST - before any continue statements - + # so os.walk never descends into excluded subtrees. + dirs[:] = [d for d in dirs if not self._should_exclude_subdir(current_path / d)] + # Calculate depth for analysis try: relative_path = current_path.resolve().relative_to(self.base_dir.resolve()) @@ -434,36 +480,33 @@ def _analyze_project_structure(self) -> None: continue # Default hardcoded exclusions -- match on exact path components - if any(part in DEFAULT_EXCLUDED_DIRNAMES for part in relative_path.parts): + if any(part in DEFAULT_SKIP_DIRS for part in relative_path.parts): continue # Apply configurable exclusion patterns if self._should_exclude_path(current_path): continue - # Prune subdirectories from os.walk to avoid descending into excluded paths - # This significantly improves performance by avoiding expensive traversal - # Note: Modifying dirs[:] (slice assignment) is the standard Python idiom - # to control which subdirectories os.walk will descend into - dirs[:] = [d for d in dirs if not self._should_exclude_subdir(current_path / d)] + # Analyze files in this directory and cache file paths + dir_files = [] + for file in files: + if file.startswith('.'): + continue + + file_path = current_path / file + dir_files.append(file_path) - # Analyze 
files in this directory - total_files = len([f for f in files if not f.startswith('.')]) - if total_files == 0: - continue + if dir_files: + self._directory_files_cache[current_path] = dir_files analysis = DirectoryAnalysis( directory=current_path, depth=depth, - total_files=total_files + total_files=len(dir_files) ) # Analyze file types - for file in files: - if file.startswith('.'): - continue - - file_path = current_path / file + for file_path in dir_files: analysis.file_types.add(file_path.suffix) self._directory_cache[current_path] = analysis @@ -486,7 +529,7 @@ def _should_exclude_subdir(self, path: Path) -> bool: # Also check if subdirectory is a default exclusion dir_name = path.name - if dir_name in DEFAULT_EXCLUDED_DIRNAMES: + if dir_name in DEFAULT_SKIP_DIRS: return True # Skip hidden directories @@ -727,6 +770,10 @@ def _file_matches_pattern(self, file_path: Path, pattern: str) -> bool: def _find_matching_directories(self, pattern: str) -> Set[Path]: """Find directories that contain files matching the pattern. + Uses the file cache built during _analyze_project_structure() instead + of calling iterdir() for every directory (eliminates 34K+ OS syscalls). + For ** patterns, derives directory hits directly from the glob match set. + Args: pattern (str): File pattern to match. @@ -739,21 +786,46 @@ def _find_matching_directories(self, pattern: str) -> Set[Path]: matching_dirs: Set[Path] = set() - # Use the reliable approach for all patterns - for directory, analysis in sorted(self._directory_cache.items()): - try: - files = [f for f in directory.iterdir() if f.is_file() and not f.name.startswith('.')] + # Expand brace patterns (e.g. 
**/*.{cpp,h,inl} -> [**/*.cpp, **/*.h, **/*.inl]) + expanded_patterns = self._expand_glob_pattern(pattern) + + # Check if all expanded patterns use ** (the common case for applyTo) + all_recursive = all('**' in p for p in expanded_patterns) + + if all_recursive: + # Fast path: build the glob match set for each expanded pattern, + # then derive directory→count from matched file parents. + dir_match_counts: Dict[Path, int] = {} + + for expanded_pattern in expanded_patterns: + # Ensure glob set is built (uses _directory_files_cache, no extra os.walk) + if expanded_pattern not in self._glob_set_cache: + matches = self._cached_glob(expanded_pattern) + self._glob_set_cache[expanded_pattern] = {Path(m) for m in matches} + for rel_path in self._glob_set_cache[expanded_pattern]: + # rel_path is relative to base_dir, get its parent directory + abs_dir = self.base_dir / rel_path.parent + if abs_dir in self._directory_cache: + dir_match_counts[abs_dir] = dir_match_counts.get(abs_dir, 0) + 1 + + matching_dirs = set(dir_match_counts.keys()) + + # Populate pattern_matches on DirectoryAnalysis for downstream use + for directory, count in dir_match_counts.items(): + self._directory_cache[directory].pattern_matches[pattern] = count + else: + # Slow path for non-recursive patterns: check cached files per directory + for directory, analysis in self._directory_cache.items(): + cached_files = self._directory_files_cache.get(directory, []) match_count = 0 - for file_path in files: + for file_path in cached_files: if self._file_matches_pattern(file_path, pattern): match_count += 1 - matching_dirs.add(directory) if match_count > 0: + matching_dirs.add(directory) analysis.pattern_matches[pattern] = match_count - except (OSError, PermissionError): - continue self._pattern_cache[pattern] = matching_dirs return matching_dirs @@ -770,12 +842,13 @@ def _calculate_inheritance_pollution(self, directory: Path, pattern: str) -> flo """ pollution_score = 0.0 - # Optimization: Only check direct 
children instead of all directories - # This prevents O(n2) complexity with unlimited depth analysis + # Optimization: Only check direct child directories from the cache + # instead of calling iterdir() to avoid OS syscalls try: + dir_prefix = directory direct_children = [ - child for child in directory.iterdir() - if child.is_dir() and child in self._directory_cache + child_dir for child_dir in self._directory_cache + if child_dir.parent == dir_prefix and child_dir != dir_prefix ] # Check only direct child directories for pollution @@ -827,38 +900,24 @@ def _optimize_single_point_placement( ) -> List[Path]: """Optimize placement for low distribution patterns (< 0.3 ratio). - Strategy: Ensure mandatory coverage constraint first, then optimize for minimal pollution. - Coverage guarantee takes priority over efficiency optimization. + Strategy: Place at the lowest common ancestor of all matching directories. + This is the most specific directory that still provides full hierarchical + coverage, avoiding pollution of unrelated subtrees. 
""" - candidates = self._generate_all_candidates(matching_directories, instruction) + # Find the deepest directory that covers all matches + minimal_coverage = self._find_minimal_coverage_placement(matching_directories) + if minimal_coverage and minimal_coverage in self._directory_cache: + return [minimal_coverage] + + # Fallback: walk up from minimal_coverage until we find a cached directory + if minimal_coverage: + current = minimal_coverage + while current != self.base_dir: + if current in self._directory_cache: + return [current] + current = current.parent - if not candidates: - return [self.base_dir] - - # CRITICAL: Mandatory coverage constraint - filter candidates that provide complete coverage - coverage_candidates = [] - for candidate in candidates: - # Verify this placement can provide hierarchical coverage for ALL matching directories - covered_directories = self._calculate_hierarchical_coverage([candidate.directory], matching_directories) - if covered_directories == matching_directories: - # This candidate satisfies the mandatory coverage constraint - coverage_candidates.append(candidate) - - # If no single candidate provides complete coverage, find minimal coverage placement - if not coverage_candidates: - minimal_coverage = self._find_minimal_coverage_placement(matching_directories) - if minimal_coverage: - return [minimal_coverage] - else: - # Ultimate fallback to root to guarantee coverage - return [self.base_dir] - - # Among coverage-compliant candidates, select the one with best efficiency/pollution ratio - best_candidate = max(coverage_candidates, key=lambda c: ( - c.coverage_efficiency - c.pollution_score - )) - - return [best_candidate.directory] + return [self.base_dir] def _optimize_distributed_placement( self, @@ -1220,21 +1279,13 @@ def _is_instruction_relevant(self, instruction: Instruction, working_directory: return analysis.pattern_matches[pattern] > 0 # Otherwise, analyze this specific directory for the pattern - # Only check direct 
files in this directory (not subdirectories for simplicity) + # Use cached file list from project analysis instead of os.listdir() matching_files = 0 - try: - for file in os.listdir(resolved_working_dir): - if file.startswith('.'): - continue - - file_path = resolved_working_dir / file - if file_path.is_file(): - if self._file_matches_pattern(file_path, pattern): - matching_files += 1 - except (OSError, PermissionError): - # Handle case where directory doesn't exist or can't be read - pass + cached_files = self._directory_files_cache.get(resolved_working_dir, []) + for file_path in cached_files: + if self._file_matches_pattern(file_path, pattern): + matching_files += 1 # Cache the result analysis.pattern_matches[pattern] = matching_files diff --git a/src/apm_cli/constants.py b/src/apm_cli/constants.py index 12d3372ac..5f0bcb8b2 100644 --- a/src/apm_cli/constants.py +++ b/src/apm_cli/constants.py @@ -29,3 +29,25 @@ class InstallMode(Enum): CLAUDE_DIR = ".claude" GITIGNORE_FILENAME = ".gitignore" APM_MODULES_GITIGNORE_PATTERN = "apm_modules/" + + +# --------------------------------------------------------------------------- +# Directory names unconditionally skipped during file-tree traversal. +# These never contain APM primitives or user source files and can be +# very large (e.g. node_modules, .git objects). Used by discovery, +# compilation, and content hashing to avoid expensive walks. +# NOTE: .apm is intentionally absent -- it is where primitives live. 
+# --------------------------------------------------------------------------- +DEFAULT_SKIP_DIRS: frozenset = frozenset({ + ".git", + "node_modules", + "__pycache__", + ".pytest_cache", + ".venv", + "venv", + ".tox", + "build", + "dist", + ".mypy_cache", + "apm_modules", +}) diff --git a/src/apm_cli/primitives/discovery.py b/src/apm_cli/primitives/discovery.py index 785e938ea..5fad398ed 100644 --- a/src/apm_cli/primitives/discovery.py +++ b/src/apm_cli/primitives/discovery.py @@ -1,13 +1,15 @@ """Discovery functionality for primitive files.""" +import fnmatch +import glob import logging import os -import glob from pathlib import Path from typing import List, Dict, Optional from .models import PrimitiveCollection from .parser import parse_primitive_file, parse_skill_file +from ..constants import DEFAULT_SKIP_DIRS from ..utils.exclude import should_exclude, validate_exclude_patterns logger = logging.getLogger(__name__) @@ -92,12 +94,9 @@ def discover_primitives( # Find and parse files for each primitive type for primitive_type, patterns in LOCAL_PRIMITIVE_PATTERNS.items(): - files = find_primitive_files(base_dir, patterns) + files = find_primitive_files(base_dir, patterns, exclude_patterns=safe_patterns) for file_path in files: - if should_exclude(file_path, base_path, safe_patterns): - logger.debug("Excluded by pattern: %s", file_path) - continue try: primitive = parse_primitive_file(file_path, source="local") collection.add_primitive(primitive) @@ -159,7 +158,7 @@ def scan_local_primitives( """ # Find and parse files for each primitive type for primitive_type, patterns in LOCAL_PRIMITIVE_PATTERNS.items(): - files = find_primitive_files(base_dir, patterns) + files = find_primitive_files(base_dir, patterns, exclude_patterns=exclude_patterns) # Filter out files from apm_modules to avoid conflicts with dependency scanning local_files = [] @@ -170,10 +169,6 @@ def scan_local_primitives( # Only include files that are NOT in apm_modules directory if 
_is_under_directory(file_path, apm_modules_path): continue - # Apply compilation.exclude patterns - if should_exclude(file_path, base_path, exclude_patterns): - logger.debug("Excluded by pattern: %s", file_path) - continue local_files.append(file_path) for file_path in local_files: @@ -397,42 +392,76 @@ def _discover_skill_in_directory(directory: Path, collection: PrimitiveCollectio print(f"Warning: Failed to parse SKILL.md in {directory}: {e}") -def find_primitive_files(base_dir: str, patterns: List[str]) -> List[Path]: +def _glob_match(rel_path: str, pattern: str) -> bool: + """Match a forward-slash relative path against a glob pattern with ** support. + + fnmatch treats * as matching everything (including /), so ** works for + one-or-more path segments. However, ** is also supposed to match *zero* + segments (e.g. ``**/*.md`` should match ``readme.md`` at the root). + We handle that by also trying the pattern with the leading ``**/`` + stripped (the "zero-segments" case). + """ + if '**/' not in pattern: + return fnmatch.fnmatch(rel_path, pattern) + + prefix, suffix = pattern.split('**/', 1) + # Zero-segment case: ** matches nothing, so collapse it away + if fnmatch.fnmatch(rel_path, prefix + suffix): + return True + # One-or-more segment case: fnmatch * already crosses / + return fnmatch.fnmatch(rel_path, pattern) + + +def find_primitive_files( + base_dir: str, + patterns: List[str], + exclude_patterns: Optional[List[str]] = None, +) -> List[Path]: """Find primitive files matching the given patterns. - + + Uses os.walk with early directory pruning instead of glob.glob(recursive=True) + so that exclude_patterns prevent traversal into expensive subtrees. + Symlinks are rejected outright to prevent symlink-based traversal attacks from malicious packages. - + Args: base_dir (str): Base directory to search in. patterns (List[str]): List of glob patterns to match. 
- + exclude_patterns (Optional[List[str]]): Pre-validated exclude patterns + to prune directories early during traversal. + Returns: - List[Path]: List of unique file paths found. + List[Path]: List of file paths found. """ if not os.path.isdir(base_dir): return [] - - all_files = [] - - for pattern in patterns: - # Use glob to find files matching the pattern - matching_files = glob.glob(os.path.join(base_dir, pattern), recursive=True) - all_files.extend(matching_files) - - # Remove duplicates while preserving order and convert to Path objects - seen = set() - unique_files = [] - - for file_path in all_files: - abs_path = os.path.abspath(file_path) - if abs_path not in seen: - seen.add(abs_path) - unique_files.append(Path(abs_path)) - + + base_path = Path(base_dir).resolve() + + all_files: List[Path] = [] + + for root, dirs, files in os.walk(str(base_path)): + current = Path(root) + # Prune excluded directories BEFORE descending + dirs[:] = sorted( + d for d in dirs + if d not in DEFAULT_SKIP_DIRS + and not _exclude_matches_dir(current / d, base_path, exclude_patterns) + ) + + # Match files against the requested patterns + for file_name in files: + file_path = current / file_name + rel_str = str(file_path.relative_to(base_path)).replace(os.sep, '/') + for pattern in patterns: + if _glob_match(rel_str, pattern): + all_files.append(file_path) + break + # Filter out directories, symlinks, and unreadable files valid_files = [] - for file_path in unique_files: + for file_path in all_files: if not file_path.is_file(): continue if file_path.is_symlink(): @@ -440,10 +469,21 @@ def find_primitive_files(base_dir: str, patterns: List[str]) -> List[Path]: continue if _is_readable(file_path): valid_files.append(file_path) - + return valid_files +def _exclude_matches_dir( + dir_path: Path, + base_path: Path, + exclude_patterns: Optional[List[str]], +) -> bool: + """Check if a directory matches any exclude pattern (for early pruning).""" + if not exclude_patterns: + return False 
+ return should_exclude(dir_path, base_path, exclude_patterns) + + def _is_readable(file_path: Path) -> bool: """Check if a file is readable. @@ -471,18 +511,5 @@ def _should_skip_directory(dir_path: str) -> bool: Returns: bool: True if directory should be skipped, False otherwise. """ - skip_patterns = { - '.git', - 'node_modules', - '__pycache__', - '.pytest_cache', - '.venv', - 'venv', - '.tox', - 'build', - 'dist', - '.mypy_cache' - } - dir_name = os.path.basename(dir_path) - return dir_name in skip_patterns \ No newline at end of file + return dir_name in DEFAULT_SKIP_DIRS \ No newline at end of file diff --git a/tests/unit/primitives/test_discovery_walk.py b/tests/unit/primitives/test_discovery_walk.py new file mode 100644 index 000000000..0d018f6d9 --- /dev/null +++ b/tests/unit/primitives/test_discovery_walk.py @@ -0,0 +1,312 @@ +"""Tests for os.walk-based discovery (replacing glob.glob) and related helpers. + +Covers _glob_match, find_primitive_files with exclude_patterns, and +_exclude_matches_dir -- the new code introduced to fix compile hangs +on large repositories. 
+""" + +import tempfile +import unittest +from pathlib import Path + +from apm_cli.primitives.discovery import ( + _exclude_matches_dir, + _glob_match, + find_primitive_files, +) +from apm_cli.constants import DEFAULT_SKIP_DIRS + + +def _write(path: Path, content: str = "---\ndescription: stub\n---\n\n# Stub\n") -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + + +# ------------------------------------------------------------------- +# _glob_match +# ------------------------------------------------------------------- +class TestGlobMatch(unittest.TestCase): + """Tests for _glob_match -- fnmatch wrapper with ** zero-segment support.""" + + # -- simple patterns (no **) -- + def test_simple_star(self): + self.assertTrue(_glob_match("readme.md", "*.md")) + + def test_simple_star_no_match(self): + self.assertFalse(_glob_match("readme.txt", "*.md")) + + def test_simple_exact(self): + self.assertTrue(_glob_match("SKILL.md", "SKILL.md")) + + def test_simple_question_mark(self): + self.assertTrue(_glob_match("a.py", "?.py")) + self.assertFalse(_glob_match("ab.py", "?.py")) + + # -- ** matching one-or-more segments -- + def test_doublestar_one_segment(self): + self.assertTrue(_glob_match("src/app.py", "**/*.py")) + + def test_doublestar_multiple_segments(self): + self.assertTrue(_glob_match("a/b/c/d.py", "**/*.py")) + + # -- ** matching zero segments -- + def test_doublestar_zero_segments(self): + """**/*.md should match readme.md at the root (zero directory segments).""" + self.assertTrue(_glob_match("readme.md", "**/*.md")) + + def test_doublestar_zero_segments_instructions(self): + self.assertTrue(_glob_match("coding.instructions.md", "**/*.instructions.md")) + + # -- ** in the middle of a pattern -- + def test_doublestar_middle(self): + self.assertTrue(_glob_match(".apm/instructions/style.instructions.md", + "**/.apm/instructions/*.instructions.md")) + + def test_doublestar_middle_nested(self): + 
self.assertTrue(_glob_match("sub/dir/.apm/instructions/style.instructions.md", + "**/.apm/instructions/*.instructions.md")) + + def test_doublestar_middle_zero(self): + """Leading **/ should also match zero segments when pattern has a middle path.""" + self.assertTrue(_glob_match(".apm/instructions/style.instructions.md", + "**/.apm/instructions/*.instructions.md")) + + # -- no match -- + def test_no_match_extension(self): + self.assertFalse(_glob_match("src/app.js", "**/*.py")) + + def test_no_match_prefix(self): + self.assertFalse(_glob_match("src/app.py", "lib/**/*.py")) + + # -- pattern without ** stays simple -- + def test_no_doublestar_subdir(self): + """Without **, matching delegates to plain fnmatch, whose * can still cross '/'; only assert a bool is returned.""" + result = _glob_match("a/b.py", "*.py") + self.assertIsInstance(result, bool) + + +# ------------------------------------------------------------------- +# _exclude_matches_dir +# ------------------------------------------------------------------- +class TestExcludeMatchesDir(unittest.TestCase): + """Tests for _exclude_matches_dir -- thin wrapper over should_exclude.""" + + def test_none_patterns_returns_false(self): + self.assertFalse(_exclude_matches_dir(Path("/p/node_modules"), Path("/p"), None)) + + def test_empty_patterns_returns_false(self): + self.assertFalse(_exclude_matches_dir(Path("/p/node_modules"), Path("/p"), [])) + + def test_matching_pattern(self): + self.assertTrue( + _exclude_matches_dir(Path("/p/Binaries"), Path("/p"), ["Binaries"]) + ) + + def test_non_matching_pattern(self): + self.assertFalse( + _exclude_matches_dir(Path("/p/src"), Path("/p"), ["Binaries"]) + ) + + def test_glob_pattern(self): + self.assertTrue( + _exclude_matches_dir(Path("/p/a/test-fixtures"), Path("/p"), ["**/test-fixtures"]) + ) + + +# ------------------------------------------------------------------- +# find_primitive_files -- early directory pruning +# ------------------------------------------------------------------- +class 
TestFindPrimitiveFilesExclude(unittest.TestCase): + """Tests that find_primitive_files prunes directories via exclude_patterns.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.base = Path(self.tmp) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_finds_instruction_in_apm_dir(self): + _write(self.base / ".apm" / "instructions" / "style.instructions.md") + result = find_primitive_files(str(self.base), ["**/.apm/instructions/*.instructions.md"]) + self.assertEqual(len(result), 1) + self.assertTrue(result[0].name == "style.instructions.md") + + def test_finds_file_at_root(self): + _write(self.base / "root.instructions.md") + result = find_primitive_files(str(self.base), ["**/*.instructions.md"]) + self.assertEqual(len(result), 1) + + def test_skips_default_dirs(self): + """Files inside DEFAULT_SKIP_DIRS should never be returned.""" + _write(self.base / "node_modules" / "pkg" / "bad.instructions.md") + _write(self.base / "__pycache__" / "bad.instructions.md") + _write(self.base / ".git" / "hooks" / "bad.instructions.md") + _write(self.base / "src" / "good.instructions.md") + + result = find_primitive_files(str(self.base), ["**/*.instructions.md"]) + names = [f.name for f in result] + self.assertIn("good.instructions.md", names) + self.assertNotIn("bad.instructions.md", names) + + def test_exclude_patterns_prune_custom_dirs(self): + """User-supplied exclude_patterns prevent traversal into named dirs.""" + _write(self.base / "Binaries" / "Win64" / "deep.instructions.md") + _write(self.base / "Content" / "Textures" / "deep.instructions.md") + _write(self.base / "Source" / "style.instructions.md") + + result = find_primitive_files( + str(self.base), + ["**/*.instructions.md"], + exclude_patterns=["Binaries", "Content"], + ) + names = [f.name for f in result] + self.assertIn("style.instructions.md", names) + self.assertNotIn("deep.instructions.md", names) + + def test_exclude_patterns_glob_style(self): + 
"""Glob-style exclude patterns work for nested matches.""" + _write(self.base / "a" / "test-fixtures" / "f.instructions.md") + _write(self.base / "b" / "real.instructions.md") + + result = find_primitive_files( + str(self.base), + ["**/*.instructions.md"], + exclude_patterns=["**/test-fixtures"], + ) + names = [f.name for f in result] + self.assertIn("real.instructions.md", names) + self.assertNotIn("f.instructions.md", names) + + def test_exclude_patterns_none_finds_everything(self): + """When exclude_patterns is None, only default skips apply.""" + _write(self.base / "a" / "one.instructions.md") + _write(self.base / "b" / "two.instructions.md") + + result = find_primitive_files( + str(self.base), ["**/*.instructions.md"], exclude_patterns=None + ) + self.assertEqual(len(result), 2) + + def test_deduplicates_across_patterns(self): + """Overlapping patterns should not produce duplicate results.""" + _write(self.base / ".apm" / "instructions" / "style.instructions.md") + result = find_primitive_files( + str(self.base), + [ + "**/.apm/instructions/*.instructions.md", + "**/*.instructions.md", + ], + ) + self.assertEqual(len(result), 1) + + def test_symlink_rejected(self): + """Symlinked files should be filtered out.""" + real = self.base / "real.instructions.md" + _write(real) + link = self.base / "link.instructions.md" + try: + link.symlink_to(real) + except OSError: + self.skipTest("Cannot create symlinks on this platform") + result = find_primitive_files(str(self.base), ["**/*.instructions.md"]) + names = [f.name for f in result] + self.assertIn("real.instructions.md", names) + self.assertNotIn("link.instructions.md", names) + + def test_nonexistent_dir_returns_empty(self): + result = find_primitive_files("/nonexistent/path/1234", ["**/*.md"]) + self.assertEqual(result, []) + + def test_apm_dir_not_skipped(self): + """.apm must NOT be in the default skip set -- primitives live there.""" + self.assertNotIn(".apm", DEFAULT_SKIP_DIRS) + + +# 
------------------------------------------------------------------- +# ContextOptimizer._cached_glob uses the file list, not glob.glob +# ------------------------------------------------------------------- +class TestCachedGlobUsesFileList(unittest.TestCase): + """Verify _cached_glob filters the pre-built file list via _glob_match.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.base = Path(self.tmp) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_cached_glob_respects_exclude_patterns(self): + """_cached_glob should not return files under excluded directories.""" + from apm_cli.compilation.context_optimizer import ContextOptimizer + + # Create files in included and excluded dirs + (self.base / "src").mkdir() + (self.base / "src" / "app.py").touch() + (self.base / "vendor" / "lib").mkdir(parents=True) + (self.base / "vendor" / "lib" / "dep.py").touch() + + optimizer = ContextOptimizer( + base_dir=str(self.base), + exclude_patterns=["vendor"], + ) + + matches = optimizer._cached_glob("**/*.py") + match_strs = [m.replace("\\", "/") for m in matches] + + self.assertTrue(any("src/app.py" in m for m in match_strs)) + self.assertFalse(any("vendor" in m for m in match_strs)) + + def test_cached_glob_caches_results(self): + """Second call with same pattern returns cached result.""" + from apm_cli.compilation.context_optimizer import ContextOptimizer + + (self.base / "a.py").touch() + optimizer = ContextOptimizer(base_dir=str(self.base)) + first = optimizer._cached_glob("**/*.py") + second = optimizer._cached_glob("**/*.py") + self.assertIs(first, second) + + def test_directory_files_cache_skips_default_dirs(self): + """_directory_files_cache must not include files from DEFAULT_SKIP_DIRS.""" + from apm_cli.compilation.context_optimizer import ContextOptimizer + + (self.base / "src").mkdir() + (self.base / "src" / "ok.py").touch() + (self.base / "node_modules" / "pkg").mkdir(parents=True) + (self.base / 
"node_modules" / "pkg" / "bad.js").touch() + (self.base / "__pycache__").mkdir() + (self.base / "__pycache__" / "mod.pyc").touch() + + optimizer = ContextOptimizer(base_dir=str(self.base)) + optimizer._analyze_project_structure() + all_files = [str(f) for files in optimizer._directory_files_cache.values() for f in files] + + self.assertTrue(any("ok.py" in s for s in all_files)) + self.assertFalse(any("node_modules" in s for s in all_files)) + self.assertFalse(any("__pycache__" in s for s in all_files)) + + def test_directory_files_cache_skips_custom_excludes(self): + """_directory_files_cache must also respect user-supplied exclude_patterns.""" + from apm_cli.compilation.context_optimizer import ContextOptimizer + + (self.base / "src").mkdir() + (self.base / "src" / "ok.py").touch() + (self.base / "Binaries" / "Win64").mkdir(parents=True) + (self.base / "Binaries" / "Win64" / "huge.dll").touch() + + optimizer = ContextOptimizer( + base_dir=str(self.base), + exclude_patterns=["Binaries"], + ) + optimizer._analyze_project_structure() + all_files = [str(f) for files in optimizer._directory_files_cache.values() for f in files] + + self.assertTrue(any("ok.py" in s for s in all_files)) + self.assertFalse(any("Binaries" in s for s in all_files)) + + +if __name__ == "__main__": + unittest.main()