From 4995a2bb9c2ccd8838e68e3c81134fb3ec82501d Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 21:25:20 +0300 Subject: [PATCH 1/5] fix: Windows-compatible path handling in JS/Go parsers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Windows-only failures preventing OpenAnt from running correctly on Windows. Ports the path/encoding fixes from joshbouncesecurity#3 with a more nuanced Unicode strategy. 1. ts-morph backslash interpretation. path.relative() and path.resolve() produce backslash-separated paths on Windows; ts-morph treats those backslashes as escape characters when matching paths it has already added, so the JS analyzer silently emitted 0 functions. Normalise to forward slashes via a toPosixPath() helper before handing paths to ts-morph and before storing them in functionId values. 2. CRLF-tainted file lists. The --files-from consumer split on \n only, leaving a trailing \r on every path when the list was written on Windows. Switch to a /\r?\n/ split with per-line trim(). 3. cp1252 console crashes. The "Pipeline Test" status output used the Unicode glyphs U+2713 / U+2717 / U+2192, which crash on cp1252-encoded stdouts (the Windows default). Detect at import time whether stdout can encode those symbols and fall back to ASCII (OK / FAIL / ->) only when it cannot, so UTF-8 terminals keep the prettier output. Also remove the now-stale Windows xfail/skip markers from tests/ test_js_parser.py and tests/test_go_cli.py — the underlying bugs are fixed, so those guards no longer apply. Adds tests/test_windows_path_handling.py covering all three fixes: - ts-morph accepts a backslash-only file list and produces non-empty POSIX-form functionId values - the analyzer accepts a CRLF-terminated file list with a trailing blank line - both pipeline modules pick the ASCII fallback under a cp1252 stdout and the Unicode form under a UTF-8 stdout Refs knostic#16 (item 8). Co-Authored-By: Claude Opus 4.7 (1M context) --- libs/openant-core/parsers/go/test_pipeline.py | 68 +++-- .../parsers/javascript/test_pipeline.py | 74 ++++-- .../parsers/javascript/typescript_analyzer.js | 44 +++- libs/openant-core/tests/test_go_cli.py | 16 +- libs/openant-core/tests/test_js_parser.py | 18 -- .../tests/test_windows_path_handling.py | 239 ++++++++++++++++++ 6 files changed, 371 insertions(+), 88 deletions(-) create mode 100644 libs/openant-core/tests/test_windows_path_handling.py diff --git a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py index 8fe05b8..078d799 100644 --- a/libs/openant-core/parsers/go/test_pipeline.py +++ b/libs/openant-core/parsers/go/test_pipeline.py @@ -43,6 +43,24 @@ from pathlib import Path from typing import Set + +def _stdout_supports_unicode() -> bool: + """Return True if sys.stdout can emit the symbols we use for status.""" + encoding = getattr(sys.stdout, "encoding", None) or "" + try: + # Probe with the actual symbols we emit. This catches cp1252 and + # other limited code pages without us having to enumerate them. + "✓✗→".encode(encoding) + return True + except (UnicodeEncodeError, LookupError): + return False + + +_UNICODE_OK = _stdout_supports_unicode() +SYM_OK = "✓" if _UNICODE_OK else "OK" +SYM_FAIL = "✗" if _UNICODE_OK else "FAIL" +SYM_ARROW = "→" if _UNICODE_OK else "->" + # Add parent directory to path for utilities import sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from utilities.context_enhancer import ContextEnhancer @@ -158,7 +176,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: } if result.returncode == 0: - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print() # Print stderr (often contains summary info) if result.stderr: @@ -172,7 +190,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: data = json.load(f) stage_result['summary'] = self._summarize_output(name, data) else: - print(f"✗ Failed (exit code {result.returncode})") + print(f"{SYM_FAIL} Failed (exit code {result.returncode})") print() if result.stderr: print("STDERR:") @@ -185,7 +203,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") return { 'success': False, 'elapsed_seconds': elapsed, @@ -378,9 +396,9 @@ def apply_reachability_filter(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Entry points detected: {len(self.entry_points)}") - print(f" Units: {original_count} → {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") + print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() self.results['stages']['reachability_filter'] = result @@ -388,7 +406,7 @@ def apply_reachability_filter(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -442,7 +460,7 @@ def run_codeql_analysis(self) -> bool: ) if result.returncode != 0: - print(f"✗ CodeQL database creation failed") + print(f"{SYM_FAIL} CodeQL database creation failed") print(f" stderr: {result.stderr[:500] if result.stderr else 'none'}") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { @@ -473,7 +491,7 @@ def run_codeql_analysis(self) -> bool: ) if result.returncode != 0: - print(f"✗ CodeQL analysis failed") + print(f"{SYM_FAIL} CodeQL analysis failed") print(f" stderr: {result.stderr[:500] if result.stderr else 'none'}") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { @@ -489,7 +507,7 @@ def run_codeql_analysis(self) -> bool: # Step 3: Parse SARIF output print("Parsing results...") if not os.path.exists(sarif_output): - print("✗ SARIF output not found") + print("{SYM_FAIL} SARIF output not found") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { 'success': False, @@ -550,7 +568,7 @@ def run_codeql_analysis(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Total findings: {len(self.codeql_findings)}") print(f" Unique files: {summary['unique_files']}") if summary['by_level']: @@ -562,7 +580,7 @@ def run_codeql_analysis(self) -> bool: except FileNotFoundError: elapsed = (datetime.now() - start_time).total_seconds() - print("✗ CodeQL not found. Please install CodeQL CLI.") + print("{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.") print(" See: https://docs.github.com/en/code-security/codeql-cli") self.results['stages']['codeql_analysis'] = { 'success': False, @@ -573,7 +591,7 @@ def run_codeql_analysis(self) -> bool: except subprocess.TimeoutExpired: elapsed = (datetime.now() - start_time).total_seconds() - print("✗ CodeQL analysis timed out") + print("{SYM_FAIL} CodeQL analysis timed out") self.results['stages']['codeql_analysis'] = { 'success': False, 'elapsed_seconds': elapsed, @@ -583,7 +601,7 @@ def run_codeql_analysis(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() self.results['stages']['codeql_analysis'] = { @@ -695,10 +713,10 @@ def apply_codeql_filter(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" CodeQL findings: {len(self.codeql_findings)}") print(f" Flagged function units: {len(self.codeql_flagged_units)}") - print(f" Units: {original_count} → {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") + print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() self.results['stages']['codeql_filter'] = result @@ -706,7 +724,7 @@ def apply_codeql_filter(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -784,14 +802,14 @@ def run_context_enhancer(self) -> bool: } print() - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") self.results['stages']['context_enhancer'] = result return True except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -873,12 +891,12 @@ def apply_exploitable_filter(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Classification breakdown:") for cls, count in sorted(classification_counts.items()): - marker = "→" if cls == "exploitable" else " " + marker = "{SYM_ARROW}" if cls == "exploitable" else " " print(f" {marker} {cls}: {count}") - print(f" Units: {original_count} → {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") + print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() self.results['stages']['exploitable_filter'] = result @@ -886,7 +904,7 @@ def apply_exploitable_filter(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -966,13 +984,13 @@ def run_full_pipeline(self): self.results['success'] = all_success if all_success: - print("✓ All stages completed successfully") + print("{SYM_OK} All stages completed successfully") else: - print("✗ Some stages failed") + print("{SYM_FAIL} Some stages failed") print() for stage_name, stage_result in self.results['stages'].items(): - status = "✓" if stage_result.get('success') else "✗" + status = "{SYM_OK}" if stage_result.get('success') else "{SYM_FAIL}" elapsed = stage_result.get('elapsed_seconds', 0) print(f" {status} {stage_name}: {elapsed:.2f}s") diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py index 77ab9c4..b80ed85 100644 --- a/libs/openant-core/parsers/javascript/test_pipeline.py +++ b/libs/openant-core/parsers/javascript/test_pipeline.py @@ -42,6 +42,24 @@ from pathlib import Path from typing import Set, Tuple + +def _stdout_supports_unicode() -> bool: + """Return True if sys.stdout can emit the symbols we use for status.""" + encoding = getattr(sys.stdout, "encoding", None) or "" + try: + # Probe with the actual symbols we emit. This catches cp1252 and + # other limited code pages without us having to enumerate them. + "✓✗→".encode(encoding) + return True + except (UnicodeEncodeError, LookupError): + return False + + +_UNICODE_OK = _stdout_supports_unicode() +SYM_OK = "✓" if _UNICODE_OK else "OK" +SYM_FAIL = "✗" if _UNICODE_OK else "FAIL" +SYM_ARROW = "→" if _UNICODE_OK else "->" + # Add parent directory to path for utilities import sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from utilities.context_enhancer import ContextEnhancer @@ -144,7 +162,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: } if result.returncode == 0: - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print() # Print stderr (often contains summary info) if result.stderr: @@ -158,7 +176,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: data = json.load(f) stage_result['summary'] = self._summarize_output(name, data) else: - print(f"✗ Failed (exit code {result.returncode})") + print(f"{SYM_FAIL} Failed (exit code {result.returncode})") print() if result.stderr: print("STDERR:") @@ -171,7 +189,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") return { 'success': False, 'elapsed_seconds': elapsed, @@ -303,7 +321,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s with open(output_file, 'w') as f: f.write(result.stdout) - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print() # Print stderr (often contains summary info) if result.stderr: @@ -327,7 +345,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s 'stderr': result.stderr } else: - print(f"✗ Failed (exit code {result.returncode})") + print(f"{SYM_FAIL} Failed (exit code {result.returncode})") print() if result.stderr: print("STDERR:") @@ -345,7 +363,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") return { 'success': False, 'elapsed_seconds': elapsed, @@ -445,14 +463,14 @@ def run_context_enhancer(self) -> bool: } print() - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") self.results['stages']['context_enhancer'] = result return True except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -558,9 +576,9 @@ def apply_reachability_filter(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Entry points detected: {len(self.entry_points)}") - print(f" Units: {original_count} → {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") + print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() self.results['stages']['reachability_filter'] = result @@ -568,7 +586,7 @@ def apply_reachability_filter(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -650,7 +668,7 @@ def run_codeql_analysis(self) -> bool: ) if result.returncode != 0: - print(f"✗ CodeQL database creation failed") + print(f"{SYM_FAIL} CodeQL database creation failed") print(f" stderr: {result.stderr[:500] if result.stderr else 'none'}") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { @@ -681,7 +699,7 @@ def run_codeql_analysis(self) -> bool: ) if result.returncode != 0: - print(f"✗ CodeQL analysis failed") + print(f"{SYM_FAIL} CodeQL analysis failed") print(f" stderr: {result.stderr[:500] if result.stderr else 'none'}") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { @@ -697,7 +715,7 @@ def run_codeql_analysis(self) -> bool: # Step 3: Parse SARIF output print("Parsing results...") if not os.path.exists(sarif_output): - print("✗ SARIF output not found") + print("{SYM_FAIL} SARIF output not found") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { 'success': False, @@ -760,7 +778,7 @@ def run_codeql_analysis(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Total findings: {len(self.codeql_findings)}") print(f" Unique files: {summary['unique_files']}") if summary['by_level']: @@ -772,7 +790,7 @@ def run_codeql_analysis(self) -> bool: except FileNotFoundError: elapsed = (datetime.now() - start_time).total_seconds() - print("✗ CodeQL not found. Please install CodeQL CLI.") + print("{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.") print(" See: https://docs.github.com/en/code-security/codeql-cli") self.results['stages']['codeql_analysis'] = { 'success': False, @@ -783,7 +801,7 @@ def run_codeql_analysis(self) -> bool: except subprocess.TimeoutExpired: elapsed = (datetime.now() - start_time).total_seconds() - print("✗ CodeQL analysis timed out") + print("{SYM_FAIL} CodeQL analysis timed out") self.results['stages']['codeql_analysis'] = { 'success': False, 'elapsed_seconds': elapsed, @@ -793,7 +811,7 @@ def run_codeql_analysis(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() self.results['stages']['codeql_analysis'] = { @@ -911,10 +929,10 @@ def apply_codeql_filter(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" CodeQL findings: {len(self.codeql_findings)}") print(f" Flagged function units: {len(self.codeql_flagged_units)}") - print(f" Units: {original_count} → {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") + print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() self.results['stages']['codeql_filter'] = result @@ -922,7 +940,7 @@ def apply_codeql_filter(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -1004,12 +1022,12 @@ def apply_exploitable_filter(self) -> bool: 'summary': summary } - print(f"✓ Success ({elapsed:.2f}s)") + print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Classification breakdown:") for cls, count in sorted(classification_counts.items()): - marker = "→" if cls == "exploitable" else " " + marker = "{SYM_ARROW}" if cls == "exploitable" else " " print(f" {marker} {cls}: {count}") - print(f" Units: {original_count} → {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") + print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() self.results['stages']['exploitable_filter'] = result @@ -1017,7 +1035,7 @@ def apply_exploitable_filter(self) -> bool: except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() - print(f"✗ Error: {e}") + print(f"{SYM_FAIL} Error: {e}") import traceback traceback.print_exc() result = { @@ -1105,13 +1123,13 @@ def run_full_pipeline(self): self.results['success'] = all_success if all_success: - print("✓ All stages completed successfully") + print("{SYM_OK} All stages completed successfully") else: - print("✗ Some stages failed") + print("{SYM_FAIL} Some stages failed") print() for stage_name, stage_result in self.results['stages'].items(): - status = "✓" if stage_result.get('success') else "✗" + status = "{SYM_OK}" if stage_result.get('success') else "{SYM_FAIL}" elapsed = stage_result.get('elapsed_seconds', 0) print(f" {status} {stage_name}: {elapsed:.2f}s") diff --git a/libs/openant-core/parsers/javascript/typescript_analyzer.js b/libs/openant-core/parsers/javascript/typescript_analyzer.js index a41a80d..3cb7992 100644 --- a/libs/openant-core/parsers/javascript/typescript_analyzer.js +++ b/libs/openant-core/parsers/javascript/typescript_analyzer.js @@ -29,6 +29,21 @@ const { Project } = require("ts-morph"); const { ts } = require("@ts-morph/common"); const path = require("path"); +/** + * Convert a filesystem path to forward slashes. + * + * ts-morph stores source-file paths internally with forward slashes and + * also treats backslashes as escape characters when matching paths it + * has already added. On Windows, Node's `path.relative()` and + * `path.resolve()` return backslash-separated paths, which causes + * ts-morph to silently fail to find any files (resulting in 0 functions + * extracted). Always normalise to forward slashes before handing paths + * to ts-morph or storing them as functionId components. + */ +function toPosixPath(p) { + return p.replace(/\\/g, "/"); +} + /** * Maximally permissive compiler options for AST extraction. * We use ESNext target/module to accept ALL valid JS/TS syntax @@ -136,10 +151,15 @@ class TypeScriptAnalyzer { ? filePath : path.join(this.repoPath, filePath); + // ts-morph treats backslashes as escape characters when matching + // paths it has already added. Normalise to forward slashes so + // Windows-native paths (with `\`) resolve consistently. + const normalised = toPosixPath(fullPath); + try { - this.project.addSourceFileAtPath(fullPath); + this.project.addSourceFileAtPath(normalised); } catch (error) { - console.error(`Failed to add file ${fullPath}: ${error.message}`); + console.error(`Failed to add file ${normalised}: ${error.message}`); } } @@ -163,7 +183,12 @@ class TypeScriptAnalyzer { * Extract all functions/methods from a source file */ extractFunctionsFromFile(sourceFile) { - const relativePath = path.relative(this.repoPath, sourceFile.getFilePath()); + // Always emit POSIX-style relative paths so functionId values are + // stable across platforms (Python downstream consumers and dataset + // diffs key off these strings). + const relativePath = toPosixPath( + path.relative(this.repoPath, sourceFile.getFilePath()), + ); // Extract function declarations for (const func of sourceFile.getFunctions()) { @@ -407,7 +432,9 @@ class TypeScriptAnalyzer { * For each function, find what other functions it calls */ buildCallGraphForFile(sourceFile) { - const relativePath = path.relative(this.repoPath, sourceFile.getFilePath()); + const relativePath = toPosixPath( + path.relative(this.repoPath, sourceFile.getFilePath()), + ); // Analyze function declarations for (const func of sourceFile.getFunctions()) { @@ -889,9 +916,14 @@ if (require.main === module) { process.exit(1); } const content = fs.readFileSync(listFile, "utf-8"); + // Split on either CRLF or LF and trim residual whitespace so + // file lists written on Windows (with \r\n line endings) don't + // leave a trailing \r on each path, which would make + // addSourceFileAtPath fail. filePaths = content - .split("\n") - .filter((line) => line.trim().length > 0); + .split(/\r?\n/) + .map((line) => line.trim()) + .filter((line) => line.length > 0); console.error(`Loaded ${filePaths.length} files from ${listFile}`); i += 2; } else if (args[i] === "--output" && i + 1 < args.length) { diff --git a/libs/openant-core/tests/test_go_cli.py b/libs/openant-core/tests/test_go_cli.py index fc92113..bce25c9 100644 --- a/libs/openant-core/tests/test_go_cli.py +++ b/libs/openant-core/tests/test_go_cli.py @@ -129,17 +129,11 @@ def test_parse_js_repo(self, sample_js_repo, tmp_path): "--language", "javascript", "--json", ) - if result.returncode != 0: - if "No module named" in result.stderr: - if sys.platform == "win32": - pytest.skip("Go CLI using system Python without required packages (Windows)") - else: - pytest.fail("Go CLI resolved wrong Python (missing required packages)") - if "UnicodeEncodeError" in result.stderr: - if sys.platform == "win32": - pytest.skip("Pre-existing Unicode bug in JS test_pipeline.py on Windows") - else: - pytest.fail("UnicodeEncodeError from JS parser on non-Windows (unexpected regression)") + if result.returncode != 0 and "No module named" in result.stderr: + if sys.platform == "win32": + pytest.skip("Go CLI using system Python without required packages (Windows)") + else: + pytest.fail("Go CLI resolved wrong Python (missing required packages)") assert result.returncode == 0 envelope = json.loads(result.stdout) assert envelope["status"] == "success" diff --git a/libs/openant-core/tests/test_js_parser.py b/libs/openant-core/tests/test_js_parser.py index 25bf951..44e5d67 100644 --- a/libs/openant-core/tests/test_js_parser.py +++ b/libs/openant-core/tests/test_js_parser.py @@ -6,7 +6,6 @@ import json import subprocess import shutil -import sys from pathlib import Path import pytest @@ -66,13 +65,6 @@ def test_skip_tests_flag(self, tmp_path): class TestTypeScriptAnalyzer: - # Known issue: ts-morph fails to resolve files with backslash paths on Windows - _windows_path_xfail = pytest.mark.xfail( - sys.platform == "win32", - reason="ts-morph path resolution issue with Windows backslash paths", - strict=False, - ) - def test_analyzes_files(self, sample_js_repo, tmp_path): # First scan to get file list scan_output = tmp_path / "scan.json" @@ -94,7 +86,6 @@ def test_analyzes_files(self, sample_js_repo, tmp_path): assert result.returncode == 0 assert analyzer_output.exists() - @_windows_path_xfail def test_extracts_functions(self, sample_js_repo, tmp_path): scan_output = tmp_path / "scan.json" run_node("repository_scanner.js", sample_js_repo, "--output", str(scan_output)) @@ -190,14 +181,6 @@ def test_units_have_required_fields(self, analyzer_output, tmp_path): class TestFullPipeline: """End-to-end test through parser_adapter.""" - # Known issue: test_pipeline.py uses Unicode checkmarks that fail on Windows cp1252 - _windows_unicode_xfail = pytest.mark.xfail( - sys.platform == "win32", - reason="JS test_pipeline.py Unicode chars fail on Windows cp1252 encoding", - strict=False, - ) - - @_windows_unicode_xfail def test_parse_js_repo(self, sample_js_repo, tmp_output_dir): from core.parser_adapter import parse_repository @@ -213,7 +196,6 @@ def test_parse_js_repo(self, sample_js_repo, tmp_output_dir): assert result.analyzer_output_path is not None assert Path(result.analyzer_output_path).exists() - @_windows_unicode_xfail def test_auto_detects_javascript(self, sample_js_repo, tmp_output_dir): from core.parser_adapter import parse_repository diff --git a/libs/openant-core/tests/test_windows_path_handling.py b/libs/openant-core/tests/test_windows_path_handling.py new file mode 100644 index 0000000..b2f29f3 --- /dev/null +++ b/libs/openant-core/tests/test_windows_path_handling.py @@ -0,0 +1,239 @@ +"""Tests for Windows-specific path/encoding handling in JS and Go parser pipelines. + +These tests cover three fixes that prevent OpenAnt from running correctly on +Windows. They are meant to run on every platform — the bugs they protect +against are platform-independent in their failure modes (you can simulate a +backslash file list, a CRLF file list, and a cp1252-only stdout from any +host) and a regression on POSIX would still be a regression. +""" +import importlib.util +import io +import json +import os +import shutil +import subprocess +import sys +from pathlib import Path + +import pytest + +PARSERS_DIR = Path(__file__).parent.parent / "parsers" +JS_PARSERS_DIR = PARSERS_DIR / "javascript" +GO_PARSERS_DIR = PARSERS_DIR / "go" +TS_ANALYZER = JS_PARSERS_DIR / "typescript_analyzer.js" +JS_NODE_MODULES = JS_PARSERS_DIR / "node_modules" + + +# --------------------------------------------------------------------------- +# JS analyzer: backslash paths must be normalised before reaching ts-morph +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif( + not shutil.which("node") or not JS_NODE_MODULES.exists(), + reason="Node.js or JS parser npm dependencies not available", +) +def test_typescript_analyzer_accepts_backslash_paths(tmp_path): + """Regression: ts-morph silently drops files when given backslash paths. + + On Windows, ``path.relative()`` and ``path.resolve()`` produce paths + separated by ``\\``. ts-morph treats backslash as an escape character + when matching paths it has already added, so without explicit + normalisation the analyzer reports zero functions even for valid input. + We simulate this on any platform by writing a file list with backslash + separators and asserting the analyzer still finds the function. + """ + # Create a simple repo + repo = tmp_path / "repo" + src = repo / "src" + src.mkdir(parents=True) + (src / "module.js").write_text( + "function greet(name) { return `hello ${name}`; }\n" + "module.exports = { greet };\n", + encoding="utf-8", + ) + + # Write a file list using backslash separators (the Windows-native form). + # On POSIX this is otherwise meaningless input, but the analyzer's + # normalisation step should still accept it. + file_list = tmp_path / "files.txt" + abs_path = str(src / "module.js") + backslash_path = abs_path.replace("/", "\\") + file_list.write_text(backslash_path + "\n", encoding="utf-8") + + out_file = tmp_path / "analyzer_output.json" + result = subprocess.run( + [ + "node", + str(TS_ANALYZER), + str(repo), + "--files-from", + str(file_list), + "--output", + str(out_file), + ], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, ( + f"analyzer failed:\nSTDERR:\n{result.stderr}\nSTDOUT:\n{result.stdout}" + ) + data = json.loads(out_file.read_text(encoding="utf-8")) + + # Functions must be found, regardless of slash flavour in the input. + assert data.get("functions"), ( + f"expected at least one function; got {data.get('functions')!r}" + ) + func_names = [f.get("name") for f in data["functions"].values()] + assert "greet" in func_names + + # Function ids must be POSIX-form (forward slashes only). Backslash + # leakage into ids would break downstream Python consumers. + for func_id in data["functions"]: + assert "\\" not in func_id, f"functionId contains backslash: {func_id!r}" + + +@pytest.mark.skipif( + not shutil.which("node") or not JS_NODE_MODULES.exists(), + reason="Node.js or JS parser npm dependencies not available", +) +def test_typescript_analyzer_strips_crlf_from_file_list(tmp_path): + """Regression: file lists written on Windows have CRLF line endings. + + Splitting on ``\\n`` alone leaves a trailing ``\\r`` on each path, + which ts-morph then fails to resolve. Confirm the analyzer accepts + a CRLF-terminated file list and produces a non-empty result. + """ + repo = tmp_path / "repo" + repo.mkdir() + (repo / "a.js").write_text("function alpha() {}\n", encoding="utf-8") + (repo / "b.js").write_text("function beta() {}\n", encoding="utf-8") + + file_list = tmp_path / "files.txt" + # Explicit CRLF, plus a trailing blank line that should be tolerated. + content = "\r\n".join([str(repo / "a.js"), str(repo / "b.js"), ""]) + file_list.write_bytes(content.encode("utf-8")) + + out_file = tmp_path / "out.json" + result = subprocess.run( + [ + "node", + str(TS_ANALYZER), + str(repo), + "--files-from", + str(file_list), + "--output", + str(out_file), + ], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, ( + f"analyzer failed:\nSTDERR:\n{result.stderr}\nSTDOUT:\n{result.stdout}" + ) + data = json.loads(out_file.read_text(encoding="utf-8")) + func_names = [f.get("name") for f in data.get("functions", {}).values()] + assert "alpha" in func_names + assert "beta" in func_names + + +# --------------------------------------------------------------------------- +# test_pipeline.py: status output must stay safe on a cp1252 stdout +# --------------------------------------------------------------------------- + + +def _load_pipeline_module(name, source_path): + """Import a parser test_pipeline.py module under a custom name. + + The two pipelines (JS, Go) live in sibling directories and both + expose a module named ``test_pipeline``. We import them under + distinct names so they coexist in this test process. + """ + spec = importlib.util.spec_from_file_location(name, source_path) + mod = importlib.util.module_from_spec(spec) + # The pipeline modules import siblings via sys.path manipulation; + # let them do that as part of their normal import. + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(params=["javascript", "go"]) +def pipeline_module(request, monkeypatch): + """Load the JS or Go test_pipeline module fresh under a synthetic stdout. + + We point ``sys.stdout`` at a buffer with a cp1252 encoding before the + module is imported, so the module-level ``_stdout_supports_unicode()`` + check sees the constrained encoding. We then re-import each time to + capture the fresh module-level state. + """ + # Replace stdout with a cp1252-only buffer so the module-level helper + # picks the ASCII fallback. + fake_stdout = io.TextIOWrapper(io.BytesIO(), encoding="cp1252", newline="") + monkeypatch.setattr(sys, "stdout", fake_stdout) + + parsers_root = PARSERS_DIR + sys.path.insert(0, str(parsers_root.parent)) # so utilities.* imports work + try: + if request.param == "javascript": + path = parsers_root / "javascript" / "test_pipeline.py" + mod_name = "openant_test_js_pipeline_cp1252" + else: + path = parsers_root / "go" / "test_pipeline.py" + mod_name = "openant_test_go_pipeline_cp1252" + + # Drop any cached version so module-level symbol detection re-runs. + sys.modules.pop(mod_name, None) + mod = _load_pipeline_module(mod_name, path) + yield mod + finally: + try: + sys.path.remove(str(parsers_root.parent)) + except ValueError: + pass + + +def test_pipeline_uses_ascii_fallback_on_cp1252_stdout(pipeline_module): + """Status symbols must be ASCII-only on a cp1252-encoded stdout. + + The original pipelines printed ``✓``, ``✗`` and ``→`` directly, which + crashed Python's print on cp1252 consoles (the Windows default). + """ + assert pipeline_module._UNICODE_OK is False, ( + "_stdout_supports_unicode() should report False for cp1252 stdout" + ) + assert pipeline_module.SYM_OK == "OK" + assert pipeline_module.SYM_FAIL == "FAIL" + assert pipeline_module.SYM_ARROW == "->" + + # And the ASCII fallbacks must round-trip through cp1252 without error. + for s in (pipeline_module.SYM_OK, pipeline_module.SYM_FAIL, pipeline_module.SYM_ARROW): + s.encode("cp1252") # must not raise + + +def test_pipeline_uses_unicode_when_stdout_supports_it(monkeypatch): + """When stdout can encode the symbols, prefer the prettier Unicode form.""" + # Reload under a UTF-8 stdout to confirm the other branch. + fake_stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8", newline="") + monkeypatch.setattr(sys, "stdout", fake_stdout) + + parsers_root = PARSERS_DIR + sys.path.insert(0, str(parsers_root.parent)) + try: + sys.modules.pop("openant_test_js_pipeline_utf8", None) + mod = _load_pipeline_module( + "openant_test_js_pipeline_utf8", + parsers_root / "javascript" / "test_pipeline.py", + ) + assert mod._UNICODE_OK is True + assert mod.SYM_OK == "✓" + assert mod.SYM_FAIL == "✗" + assert mod.SYM_ARROW == "→" + finally: + try: + sys.path.remove(str(parsers_root.parent)) + except ValueError: + pass From 923b16b5d247c2bde73c7877cd6687a70c0ba3ce Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 22:40:29 +0300 Subject: [PATCH 2/5] =?UTF-8?q?fix:=20address=20review=20F1,=20F2,=20F3,?= =?UTF-8?q?=20F6=20=E2=80=94=20f-string=20regressions,=20toPosixPath=20gap?= =?UTF-8?q?s,=20unicode=20clarity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F1: fix 7 broken non-f-string print calls in go/test_pipeline.py (SYM_FAIL, SYM_OK, SYM_ARROW literals emitted verbatim) and 7 matching regressions in javascript/test_pipeline.py. status/marker variables now use the SYM_* variables directly instead of wrapping them in string literals. F2: apply toPosixPath to extractSingleFunction's filePath and to the nested requiredProject.addSourceFileAtPath resolvedPath so single-function CLI mode normalises Windows paths the same way batch mode does. F3: normalise this.repoPath in the TypeScriptAnalyzer constructor via toPosixPath(path.resolve(repoPath)) so all later path.relative / path.join calls operate on a consistent forward-slash base. F6: replace the `or ""` encoding fallback in _stdout_supports_unicode() with an explicit `if not encoding: return False` guard in both pipeline files, and add a docstring noting the piped/redirected-stdout behaviour. F9: add UNC path behaviour note to toPosixPath JSDoc. Co-Authored-By: Claude Sonnet 4.6 --- libs/openant-core/parsers/go/test_pipeline.py | 25 +++++++++++------- .../parsers/javascript/test_pipeline.py | 25 +++++++++++------- .../parsers/javascript/typescript_analyzer.js | 26 ++++++++++++++----- 3 files changed, 52 insertions(+), 24 deletions(-) diff --git a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py index 078d799..7a47ce5 100644 --- a/libs/openant-core/parsers/go/test_pipeline.py +++ b/libs/openant-core/parsers/go/test_pipeline.py @@ -45,8 +45,15 @@ def _stdout_supports_unicode() -> bool: - """Return True if sys.stdout can emit the symbols we use for status.""" - encoding = getattr(sys.stdout, "encoding", None) or "" + """Return True if sys.stdout can emit the symbols we use for status. + + Returns False when stdout is piped or redirected (common in CI) and + the encoding cannot be determined — this degrades output to plain ASCII + rather than raising UnicodeEncodeError at runtime. + """ + encoding = getattr(sys.stdout, "encoding", None) + if not encoding: + return False try: # Probe with the actual symbols we emit. This catches cp1252 and # other limited code pages without us having to enumerate them. @@ -507,7 +514,7 @@ def run_codeql_analysis(self) -> bool: # Step 3: Parse SARIF output print("Parsing results...") if not os.path.exists(sarif_output): - print("{SYM_FAIL} SARIF output not found") + print(f"{SYM_FAIL} SARIF output not found") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { 'success': False, @@ -580,7 +587,7 @@ def run_codeql_analysis(self) -> bool: except FileNotFoundError: elapsed = (datetime.now() - start_time).total_seconds() - print("{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.") + print(f"{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.") print(" See: https://docs.github.com/en/code-security/codeql-cli") self.results['stages']['codeql_analysis'] = { 'success': False, @@ -591,7 +598,7 @@ def run_codeql_analysis(self) -> bool: except subprocess.TimeoutExpired: elapsed = (datetime.now() - start_time).total_seconds() - print("{SYM_FAIL} CodeQL analysis timed out") + print(f"{SYM_FAIL} CodeQL analysis timed out") self.results['stages']['codeql_analysis'] = { 'success': False, 'elapsed_seconds': elapsed, @@ -894,7 +901,7 @@ def apply_exploitable_filter(self) -> bool: print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Classification breakdown:") for cls, count in sorted(classification_counts.items()): - marker = "{SYM_ARROW}" if cls == "exploitable" else " " + marker = SYM_ARROW if cls == "exploitable" else " " print(f" {marker} {cls}: {count}") print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() @@ -984,13 +991,13 @@ def run_full_pipeline(self): self.results['success'] = all_success if all_success: - print("{SYM_OK} All stages completed successfully") + print(f"{SYM_OK} All stages completed successfully") else: - print("{SYM_FAIL} Some stages failed") + print(f"{SYM_FAIL} Some stages failed") print() for stage_name, stage_result in self.results['stages'].items(): - status = "{SYM_OK}" if stage_result.get('success') else "{SYM_FAIL}" + status = SYM_OK if stage_result.get('success') else SYM_FAIL elapsed = stage_result.get('elapsed_seconds', 0) print(f" {status} {stage_name}: {elapsed:.2f}s") diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py index b80ed85..6ceee6c 100644 --- a/libs/openant-core/parsers/javascript/test_pipeline.py +++ b/libs/openant-core/parsers/javascript/test_pipeline.py @@ -44,8 +44,15 @@ def _stdout_supports_unicode() -> bool: - """Return True if sys.stdout can emit the symbols we use for status.""" - encoding = getattr(sys.stdout, "encoding", None) or "" + """Return True if sys.stdout can emit the symbols we use for status. + + Returns False when stdout is piped or redirected (common in CI) and + the encoding cannot be determined — this degrades output to plain ASCII + rather than raising UnicodeEncodeError at runtime. + """ + encoding = getattr(sys.stdout, "encoding", None) + if not encoding: + return False try: # Probe with the actual symbols we emit. This catches cp1252 and # other limited code pages without us having to enumerate them. @@ -715,7 +722,7 @@ def run_codeql_analysis(self) -> bool: # Step 3: Parse SARIF output print("Parsing results...") if not os.path.exists(sarif_output): - print("{SYM_FAIL} SARIF output not found") + print(f"{SYM_FAIL} SARIF output not found") elapsed = (datetime.now() - start_time).total_seconds() self.results['stages']['codeql_analysis'] = { 'success': False, @@ -790,7 +797,7 @@ def run_codeql_analysis(self) -> bool: except FileNotFoundError: elapsed = (datetime.now() - start_time).total_seconds() - print("{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.") + print(f"{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.") print(" See: https://docs.github.com/en/code-security/codeql-cli") self.results['stages']['codeql_analysis'] = { 'success': False, @@ -801,7 +808,7 @@ def run_codeql_analysis(self) -> bool: except subprocess.TimeoutExpired: elapsed = (datetime.now() - start_time).total_seconds() - print("{SYM_FAIL} CodeQL analysis timed out") + print(f"{SYM_FAIL} CodeQL analysis timed out") self.results['stages']['codeql_analysis'] = { 'success': False, 'elapsed_seconds': elapsed, @@ -1025,7 +1032,7 @@ def apply_exploitable_filter(self) -> bool: print(f"{SYM_OK} Success ({elapsed:.2f}s)") print(f" Classification breakdown:") for cls, count in sorted(classification_counts.items()): - marker = "{SYM_ARROW}" if cls == "exploitable" else " " + marker = SYM_ARROW if cls == "exploitable" else " " print(f" {marker} {cls}: {count}") print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)") print() @@ -1123,13 +1130,13 @@ def run_full_pipeline(self): self.results['success'] = all_success if all_success: - print("{SYM_OK} All stages completed successfully") + print(f"{SYM_OK} All stages completed successfully") else: - print("{SYM_FAIL} Some stages failed") + print(f"{SYM_FAIL} Some stages failed") print() for stage_name, stage_result in self.results['stages'].items(): - status = "{SYM_OK}" if stage_result.get('success') else "{SYM_FAIL}" + status = SYM_OK if stage_result.get('success') else SYM_FAIL elapsed = stage_result.get('elapsed_seconds', 0) print(f" {status} {stage_name}: {elapsed:.2f}s") diff --git a/libs/openant-core/parsers/javascript/typescript_analyzer.js b/libs/openant-core/parsers/javascript/typescript_analyzer.js index 3cb7992..2889613 100644 --- a/libs/openant-core/parsers/javascript/typescript_analyzer.js +++ b/libs/openant-core/parsers/javascript/typescript_analyzer.js @@ -39,6 +39,9 @@ const path = require("path"); * ts-morph to silently fail to find any files (resulting in 0 functions * extracted). Always normalise to forward slashes before handing paths * to ts-morph or storing them as functionId components. + * + * UNC paths (`\\server\share\...`) are correctly converted to + * `//server/share/...`, which TypeScript understands on Windows. */ function toPosixPath(p) { return p.replace(/\\/g, "/"); @@ -65,7 +68,9 @@ const PERMISSIVE_COMPILER_OPTIONS = { class TypeScriptAnalyzer { constructor(repoPath) { - this.repoPath = repoPath; + // Normalise immediately so all later path operations (path.relative, + // path.join) work with a consistent forward-slash base on Windows. + this.repoPath = toPosixPath(path.resolve(repoPath)); this.project = new Project({ compilerOptions: PERMISSIVE_COMPILER_OPTIONS, }); @@ -525,8 +530,12 @@ function extractSingleFunction(filePath, functionRef) { compilerOptions: PERMISSIVE_COMPILER_OPTIONS, }); + // Normalise to forward slashes so ts-morph can match the path it stores + // internally. On Windows, filePath may arrive with backslashes. + const normalisedFilePath = toPosixPath(path.resolve(filePath)); + try { - const sourceFile = project.addSourceFileAtPath(filePath); + const sourceFile = project.addSourceFileAtPath(normalisedFilePath); // Parse function reference (e.g., "sessionHandler.handleLogin" or just "handleLogin") let className = null; @@ -716,16 +725,21 @@ function extractSingleFunction(filePath, functionRef) { ); if (requireMatch) { const requiredPath = requireMatch[1]; - // Resolve the path relative to current file - const currentDir = path.dirname(filePath); - let resolvedPath = path.resolve(currentDir, requiredPath); + // Resolve the path relative to current file; use the + // already-normalised path to avoid mixed separators. + const currentDir = path.dirname(normalisedFilePath); + let resolvedPath = toPosixPath( + path.resolve(currentDir, requiredPath), + ); // Try with .js extension if not present if (!fs.existsSync(resolvedPath)) { resolvedPath = resolvedPath + ".js"; } if (!fs.existsSync(resolvedPath)) { - resolvedPath = path.resolve(currentDir, requiredPath + ".ts"); + resolvedPath = toPosixPath( + path.resolve(currentDir, requiredPath + ".ts"), + ); } if (fs.existsSync(resolvedPath)) { From e3d9abd418c75fe794d6ef3db8591aa7c0e2aeb5 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 22:40:41 +0300 Subject: [PATCH 3/5] =?UTF-8?q?test:=20address=20review=20F4,=20F5,=20F7,?= =?UTF-8?q?=20F8=20=E2=80=94=20test=20correctness=20and=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F4: gate test_typescript_analyzer_accepts_backslash_paths to Windows only. Replacing all forward slashes in a POSIX absolute path produces a non-absolute path on Linux/macOS, making the test fail on those platforms. F5: pop freshly-loaded cp1252 and utf-8 pipeline modules from sys.modules in finally blocks (pipeline_module fixture and unicode test) to prevent stale module-level state leaking into later tests. F7: parametrize test_pipeline_uses_unicode_when_stdout_supports_it to cover both the JS and Go pipeline modules, closing the coverage gap for the Go Unicode code path. F8: restore UnicodeEncodeError regression guard in test_go_cli.py test_parse_js_repo. The Windows-specific pytest.skip was correct to remove (bug is fixed), but the non-Windows pytest.fail provides a diagnostic on future regressions. Co-Authored-By: Claude Sonnet 4.6 --- libs/openant-core/tests/test_go_cli.py | 2 + .../tests/test_windows_path_handling.py | 37 ++++++++++++++----- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/libs/openant-core/tests/test_go_cli.py b/libs/openant-core/tests/test_go_cli.py index bce25c9..42ad294 100644 --- a/libs/openant-core/tests/test_go_cli.py +++ b/libs/openant-core/tests/test_go_cli.py @@ -134,6 +134,8 @@ def test_parse_js_repo(self, sample_js_repo, tmp_path): pytest.skip("Go CLI using system Python without required packages (Windows)") else: pytest.fail("Go CLI resolved wrong Python (missing required packages)") + if result.returncode != 0 and "UnicodeEncodeError" in result.stderr: + pytest.fail("UnicodeEncodeError from JS parser (unexpected regression)") assert result.returncode == 0 envelope = json.loads(result.stdout) assert envelope["status"] == "success" diff --git a/libs/openant-core/tests/test_windows_path_handling.py b/libs/openant-core/tests/test_windows_path_handling.py index b2f29f3..9df02da 100644 --- a/libs/openant-core/tests/test_windows_path_handling.py +++ b/libs/openant-core/tests/test_windows_path_handling.py @@ -29,6 +29,11 @@ # --------------------------------------------------------------------------- +@pytest.mark.skipif( + sys.platform != "win32", + reason="backslash path simulation only meaningful on Windows; " + "replacing all forward slashes produces a non-absolute path on POSIX", +) @pytest.mark.skipif( not shutil.which("node") or not JS_NODE_MODULES.exists(), reason="Node.js or JS parser npm dependencies not available", @@ -40,8 +45,9 @@ def test_typescript_analyzer_accepts_backslash_paths(tmp_path): separated by ``\\``. ts-morph treats backslash as an escape character when matching paths it has already added, so without explicit normalisation the analyzer reports zero functions even for valid input. - We simulate this on any platform by writing a file list with backslash - separators and asserting the analyzer still finds the function. + This test only runs on Windows because a Linux/macOS absolute path + (``/tmp/...``) with all slashes replaced becomes ``\\tmp\\...`` which + Node does not treat as absolute, making the test unsound on POSIX. """ # Create a simple repo repo = tmp_path / "repo" @@ -190,6 +196,10 @@ def pipeline_module(request, monkeypatch): mod = _load_pipeline_module(mod_name, path) yield mod finally: + # Remove the freshly loaded module so its stale cp1252-patched + # module-level state (_UNICODE_OK, SYM_*) doesn't leak into later + # tests that may load the same pipeline module. + sys.modules.pop(mod_name, None) try: sys.path.remove(str(parsers_root.parent)) except ValueError: @@ -214,8 +224,19 @@ def test_pipeline_uses_ascii_fallback_on_cp1252_stdout(pipeline_module): s.encode("cp1252") # must not raise -def test_pipeline_uses_unicode_when_stdout_supports_it(monkeypatch): - """When stdout can encode the symbols, prefer the prettier Unicode form.""" +@pytest.mark.parametrize( + "mod_name,rel_path", + [ + ("openant_test_js_pipeline_utf8", "javascript/test_pipeline.py"), + ("openant_test_go_pipeline_utf8", "go/test_pipeline.py"), + ], +) +def test_pipeline_uses_unicode_when_stdout_supports_it(monkeypatch, mod_name, rel_path): + """When stdout can encode the symbols, prefer the prettier Unicode form. + + Covers both the JS and Go pipeline modules to ensure neither regresses + to ASCII when the terminal supports Unicode. + """ # Reload under a UTF-8 stdout to confirm the other branch. fake_stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8", newline="") monkeypatch.setattr(sys, "stdout", fake_stdout) @@ -223,16 +244,14 @@ def test_pipeline_uses_unicode_when_stdout_supports_it(monkeypatch): parsers_root = PARSERS_DIR sys.path.insert(0, str(parsers_root.parent)) try: - sys.modules.pop("openant_test_js_pipeline_utf8", None) - mod = _load_pipeline_module( - "openant_test_js_pipeline_utf8", - parsers_root / "javascript" / "test_pipeline.py", - ) + sys.modules.pop(mod_name, None) + mod = _load_pipeline_module(mod_name, parsers_root / rel_path) assert mod._UNICODE_OK is True assert mod.SYM_OK == "✓" assert mod.SYM_FAIL == "✗" assert mod.SYM_ARROW == "→" finally: + sys.modules.pop(mod_name, None) try: sys.path.remove(str(parsers_root.parent)) except ValueError: From 22e6b567e526cb6464f0fccc84efec15c8ade657 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 22:49:05 +0300 Subject: [PATCH 4/5] =?UTF-8?q?test:=20address=20review=20F10-F14=20?= =?UTF-8?q?=E2=80=94=20module=20registry,=20skip=20guard,=20path=20consist?= =?UTF-8?q?ency,=20sys.path=20dedup,=20assertion=20safety?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F10: register module in sys.modules inside _load_pipeline_module so that pre/post sys.modules.pop() calls in the fixture and unicode test are meaningful and prevent stale module-level state from leaking. F11: combine double @pytest.mark.skipif decorators on test_typescript_analyzer_accepts_backslash_paths into a single condition so the skip reason is always accurate and neither guard can mask the other. F12: move toPosixPath/path.resolve above the fs.existsSync check in extractSingleFunction so the existence check and error message both use the normalised path, consistent with the rest of the function. F13: guard sys.path.insert with already_on_path check in both the pipeline_module fixture and test_pipeline_uses_unicode_when_stdout_supports_it to prevent duplicate path entries across parametrized test runs; only remove from sys.path if we added it. F14: replace literal Unicode glyph strings in assertions (✓ ✗ →) with \u-escape sequences so pytest assertion error messages remain safe on cp1252 consoles. Co-Authored-By: Claude Sonnet 4.6 --- .../parsers/javascript/typescript_analyzer.js | 14 ++--- .../tests/test_windows_path_handling.py | 58 ++++++++++++------- 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/libs/openant-core/parsers/javascript/typescript_analyzer.js b/libs/openant-core/parsers/javascript/typescript_analyzer.js index 2889613..05a5db1 100644 --- a/libs/openant-core/parsers/javascript/typescript_analyzer.js +++ b/libs/openant-core/parsers/javascript/typescript_analyzer.js @@ -520,9 +520,13 @@ class TypeScriptAnalyzer { function extractSingleFunction(filePath, functionRef) { const fs = require("fs"); - // Check if file exists - if (!fs.existsSync(filePath)) { - console.error(`File not found: ${filePath}`); + // Normalise to forward slashes so ts-morph can match the path it stores + // internally. On Windows, filePath may arrive with backslashes. + const normalisedFilePath = toPosixPath(path.resolve(filePath)); + + // Check if file exists using the normalised path for consistent error messages. + if (!fs.existsSync(normalisedFilePath)) { + console.error(`File not found: ${normalisedFilePath}`); process.exit(1); } @@ -530,10 +534,6 @@ function extractSingleFunction(filePath, functionRef) { compilerOptions: PERMISSIVE_COMPILER_OPTIONS, }); - // Normalise to forward slashes so ts-morph can match the path it stores - // internally. On Windows, filePath may arrive with backslashes. - const normalisedFilePath = toPosixPath(path.resolve(filePath)); - try { const sourceFile = project.addSourceFileAtPath(normalisedFilePath); diff --git a/libs/openant-core/tests/test_windows_path_handling.py b/libs/openant-core/tests/test_windows_path_handling.py index 9df02da..e26d1f8 100644 --- a/libs/openant-core/tests/test_windows_path_handling.py +++ b/libs/openant-core/tests/test_windows_path_handling.py @@ -30,13 +30,11 @@ @pytest.mark.skipif( - sys.platform != "win32", - reason="backslash path simulation only meaningful on Windows; " - "replacing all forward slashes produces a non-absolute path on POSIX", -) -@pytest.mark.skipif( - not shutil.which("node") or not JS_NODE_MODULES.exists(), - reason="Node.js or JS parser npm dependencies not available", + sys.platform != "win32" + or not shutil.which("node") + or not JS_NODE_MODULES.exists(), + reason="Windows-only (backslash paths are non-absolute on POSIX) and " + "requires Node.js and JS parser npm dependencies", ) def test_typescript_analyzer_accepts_backslash_paths(tmp_path): """Regression: ts-morph silently drops files when given backslash paths. @@ -158,9 +156,17 @@ def _load_pipeline_module(name, source_path): The two pipelines (JS, Go) live in sibling directories and both expose a module named ``test_pipeline``. We import them under distinct names so they coexist in this test process. + + The module is registered in ``sys.modules`` so that callers can + reliably remove it afterwards (via ``sys.modules.pop(name, None)``) + to prevent stale module-level state — such as ``_UNICODE_OK`` and + ``SYM_*`` globals — from leaking into subsequent tests. """ spec = importlib.util.spec_from_file_location(name, source_path) mod = importlib.util.module_from_spec(spec) + # Register before exec so that relative imports inside the module + # resolve correctly, and so callers can pop the module after use. + sys.modules[name] = mod # The pipeline modules import siblings via sys.path manipulation; # let them do that as part of their normal import. spec.loader.exec_module(mod) @@ -182,7 +188,10 @@ def pipeline_module(request, monkeypatch): monkeypatch.setattr(sys, "stdout", fake_stdout) parsers_root = PARSERS_DIR - sys.path.insert(0, str(parsers_root.parent)) # so utilities.* imports work + parsers_parent = str(parsers_root.parent) + already_on_path = parsers_parent in sys.path + if not already_on_path: + sys.path.insert(0, parsers_parent) # so utilities.* imports work try: if request.param == "javascript": path = parsers_root / "javascript" / "test_pipeline.py" @@ -200,10 +209,11 @@ def pipeline_module(request, monkeypatch): # module-level state (_UNICODE_OK, SYM_*) doesn't leak into later # tests that may load the same pipeline module. sys.modules.pop(mod_name, None) - try: - sys.path.remove(str(parsers_root.parent)) - except ValueError: - pass + if not already_on_path: + try: + sys.path.remove(parsers_parent) + except ValueError: + pass def test_pipeline_uses_ascii_fallback_on_cp1252_stdout(pipeline_module): @@ -242,17 +252,25 @@ def test_pipeline_uses_unicode_when_stdout_supports_it(monkeypatch, mod_name, re monkeypatch.setattr(sys, "stdout", fake_stdout) parsers_root = PARSERS_DIR - sys.path.insert(0, str(parsers_root.parent)) + parsers_parent = str(parsers_root.parent) + already_on_path = parsers_parent in sys.path + if not already_on_path: + sys.path.insert(0, parsers_parent) try: sys.modules.pop(mod_name, None) mod = _load_pipeline_module(mod_name, parsers_root / rel_path) assert mod._UNICODE_OK is True - assert mod.SYM_OK == "✓" - assert mod.SYM_FAIL == "✗" - assert mod.SYM_ARROW == "→" + # Use Unicode escape sequences rather than literal glyphs so that + # assertion failure messages are safe on cp1252 consoles — printing + # the literal characters (U+2713, U+2717, U+2192) would itself raise + # UnicodeEncodeError on the very platform these tests guard against. + assert mod.SYM_OK == "\u2713" # CHECK MARK (U+2713) + assert mod.SYM_FAIL == "\u2717" # BALLOT X (U+2717) + assert mod.SYM_ARROW == "\u2192" # RIGHTWARDS ARROW (U+2192) finally: sys.modules.pop(mod_name, None) - try: - sys.path.remove(str(parsers_root.parent)) - except ValueError: - pass + if not already_on_path: + try: + sys.path.remove(parsers_parent) + except ValueError: + pass From fb31f2a846576e2773764401ca50bcb6d7e2506b Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 22:56:56 +0300 Subject: [PATCH 5/5] =?UTF-8?q?test:=20address=20review=20F15,=20F16,=20F1?= =?UTF-8?q?7=20=E2=80=94=20sys.path=20isolation,=20exec=5Fmodule=20rollbac?= =?UTF-8?q?k,=20comment=20alignment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F15: snapshot and restore sys.path around exec_module in _load_pipeline_module so that module-level sys.path.insert calls in the pipeline files do not accumulate across parametrized test runs. F16: roll back sys.modules[name] on exec_module failure (CPython importlib convention) so a failed load does not leave a partially-initialized module poisoning the cache. F17: fix inconsistent inline comment spacing on the three \u-escape assertion lines — normalise to 2 spaces before # for all three. Co-Authored-By: Claude Sonnet 4.6 --- .../tests/test_windows_path_handling.py | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/libs/openant-core/tests/test_windows_path_handling.py b/libs/openant-core/tests/test_windows_path_handling.py index e26d1f8..95a6c56 100644 --- a/libs/openant-core/tests/test_windows_path_handling.py +++ b/libs/openant-core/tests/test_windows_path_handling.py @@ -161,15 +161,31 @@ def _load_pipeline_module(name, source_path): reliably remove it afterwards (via ``sys.modules.pop(name, None)``) to prevent stale module-level state — such as ``_UNICODE_OK`` and ``SYM_*`` globals — from leaking into subsequent tests. + + ``sys.path`` is snapshot/restored around ``exec_module`` so that the + module-level ``sys.path.insert`` calls in the pipeline files do not + accumulate extra entries on repeated calls (e.g. across parametrized + test runs). """ spec = importlib.util.spec_from_file_location(name, source_path) mod = importlib.util.module_from_spec(spec) # Register before exec so that relative imports inside the module # resolve correctly, and so callers can pop the module after use. sys.modules[name] = mod - # The pipeline modules import siblings via sys.path manipulation; - # let them do that as part of their normal import. - spec.loader.exec_module(mod) + # Snapshot sys.path so that module-level sys.path.insert calls inside + # the pipeline files do not leave behind permanent extra entries. + path_snapshot = sys.path[:] + try: + spec.loader.exec_module(mod) + except BaseException: + # Roll back the registration so the failed module does not pollute + # sys.modules (CPython convention for importlib loaders). + sys.modules.pop(name, None) + raise + finally: + # Restore sys.path to prevent the module's own insertions from + # accumulating across repeated calls. + sys.path[:] = path_snapshot return mod @@ -264,7 +280,7 @@ def test_pipeline_uses_unicode_when_stdout_supports_it(monkeypatch, mod_name, re # assertion failure messages are safe on cp1252 consoles — printing # the literal characters (U+2713, U+2717, U+2192) would itself raise # UnicodeEncodeError on the very platform these tests guard against. - assert mod.SYM_OK == "\u2713" # CHECK MARK (U+2713) + assert mod.SYM_OK == "\u2713" # CHECK MARK (U+2713) assert mod.SYM_FAIL == "\u2717" # BALLOT X (U+2717) assert mod.SYM_ARROW == "\u2192" # RIGHTWARDS ARROW (U+2192) finally: