From 0ade230a070e57e9674d565c2dad15f4b13ced9f Mon Sep 17 00:00:00 2001
From: joshbouncesecurity
Date: Mon, 4 May 2026 21:24:34 +0300
Subject: [PATCH 1/2] fix: centralize UTF-8 file I/O for Windows compatibility

Bare open() calls use the system encoding (cp1252 on Windows), causing
'charmap codec can't decode byte ...' errors when parsing repositories
that contain non-ASCII characters such as curly quotes.

Adds utilities/file_io.py with open_utf8, read_json, write_json, and
run_utf8 helpers, and migrates ~190 bare open() call sites across
libs/openant-core/ (core, parsers, utilities, the openant CLI, and the
top-level scripts) to pass encoding="utf-8" explicitly. Also sets
encoding/errors on the docker_executor subprocess.run that captures
container stdout/stderr as text.

Includes a regression test that scans non-test code for any bare open()
call without an encoding= argument and fails if one reappears.

Addresses item 9 from #16.
---
 .../context/application_context.py | 4 +-
 libs/openant-core/core/analyzer.py | 4 +-
 libs/openant-core/core/checkpoint.py | 10 +-
 libs/openant-core/core/dynamic_tester.py | 4 +-
 libs/openant-core/core/enhancer.py | 4 +-
 libs/openant-core/core/parser_adapter.py | 24 +-
 libs/openant-core/core/reporter.py | 18 +-
 libs/openant-core/core/scanner.py | 6 +-
 libs/openant-core/core/schemas.py | 2 +-
 libs/openant-core/core/verifier.py | 4 +-
 libs/openant-core/experiment.py | 6 +-
 libs/openant-core/export_csv.py | 4 +-
 libs/openant-core/generate_report.py | 4 +-
 libs/openant-core/openant/cli.py | 16 +-
 .../parsers/c/call_graph_builder.py | 4 +-
 .../parsers/c/function_extractor.py | 4 +-
 .../parsers/c/repository_scanner.py | 2 +-
 libs/openant-core/parsers/c/test_pipeline.py | 28 +-
 libs/openant-core/parsers/c/unit_generator.py | 6 +-
 libs/openant-core/parsers/go/test_pipeline.py | 28 +-
 .../parsers/javascript/test_pipeline.py | 36 +--
 .../parsers/php/call_graph_builder.py | 4 +-
 .../parsers/php/function_extractor.py | 4 +-
 .../parsers/php/repository_scanner.py | 2 +-
 .../openant-core/parsers/php/test_pipeline.py | 28 +-
 .../parsers/php/unit_generator.py | 6 +-
 .../openant-core/parsers/python/ast_parser.py | 2 +-
 .../parsers/python/call_graph_builder.py | 4 +-
 .../parsers/python/dataset_enhancer.py | 4 +-
 .../parsers/python/function_extractor.py | 4 +-
 .../parsers/python/parse_repository.py | 12 +-
 .../parsers/python/repository_scanner.py | 2 +-
 .../parsers/python/unit_generator.py | 4 +-
 .../parsers/ruby/call_graph_builder.py | 4 +-
 .../parsers/ruby/function_extractor.py | 4 +-
 .../parsers/ruby/repository_scanner.py | 2 +-
 .../parsers/ruby/test_pipeline.py | 28 +-
 .../parsers/ruby/unit_generator.py | 6 +-
 .../parsers/zig/call_graph_builder.py | 2 +-
 .../parsers/zig/function_extractor.py | 2 +-
 .../parsers/zig/repository_scanner.py | 2 +-
 .../openant-core/parsers/zig/test_pipeline.py | 4 +-
 .../parsers/zig/unit_generator.py | 4 +-
 libs/openant-core/tests/test_file_io.py | 288 ++++++++++++++++++
 .../agentic_enhancer/repository_index.py | 2 +-
 .../utilities/context_enhancer.py | 14 +-
 .../utilities/dynamic_tester/__init__.py | 6 +-
 .../dynamic_tester/docker_executor.py | 12 +-
 libs/openant-core/utilities/file_io.py | 60 ++++
 libs/openant-core/validate_dataset_schema.py | 2 +-
 50 files changed, 543 insertions(+), 193 deletions(-)
 create mode 100644 libs/openant-core/tests/test_file_io.py
 create mode 100644 libs/openant-core/utilities/file_io.py
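Note for reviewers: the new helpers are exercised by tests/test_file_io.py
below. Behaviorally they amount to the minimal sketch that follows; the
bodies shown here are an approximation (in particular, errors="replace"
on run_utf8 and ensure_ascii=False in write_json are assumptions), and
the authoritative implementation is the utilities/file_io.py hunk.

    import json
    import subprocess

    def open_utf8(path, mode="r", **kwargs):
        # Text modes default to UTF-8; binary modes pass through
        # untouched, and an explicit encoding= from the caller wins.
        if "b" not in mode:
            kwargs.setdefault("encoding", "utf-8")
        return open(path, mode, **kwargs)

    def read_json(path):
        # Load JSON from a UTF-8 file.
        with open_utf8(path) as f:
            return json.load(f)

    def write_json(path, obj, indent=2):
        # Dump JSON as UTF-8 without escaping non-ASCII characters
        # (ensure_ascii=False is assumed, not confirmed).
        with open_utf8(path, "w") as f:
            json.dump(obj, f, indent=indent, ensure_ascii=False)

    def run_utf8(cmd, **kwargs):
        # subprocess.run with stdout/stderr decoded as UTF-8 text;
        # errors="replace" (assumed) keeps undecodable container
        # output from raising instead of being captured.
        kwargs.setdefault("encoding", "utf-8")
        kwargs.setdefault("errors", "replace")
        return subprocess.run(cmd, **kwargs)

diff --git a/libs/openant-core/context/application_context.py b/libs/openant-core/context/application_context.py
index f7fa55d..173a814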
100644 --- a/libs/openant-core/context/application_context.py +++ b/libs/openant-core/context/application_context.py @@ -545,7 +545,7 @@ def save_context(context: ApplicationContext, output_path: Path) -> None: output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, 'w') as f: + with open(output_path, 'w', encoding="utf-8") as f: json.dump(asdict(context), f, indent=2) print(f"Context saved to {output_path}", file=sys.stderr) @@ -560,7 +560,7 @@ def load_context(input_path: Path) -> ApplicationContext: Returns: ApplicationContext loaded from file. """ - with open(input_path) as f: + with open(input_path, encoding="utf-8") as f: data = json.load(f) # Mark as manual to skip validation (already validated when saved) diff --git a/libs/openant-core/core/analyzer.py b/libs/openant-core/core/analyzer.py index 7fb5966..9ef0ec1 100644 --- a/libs/openant-core/core/analyzer.py +++ b/libs/openant-core/core/analyzer.py @@ -330,7 +330,7 @@ def run_analysis( # Load dataset print(f"[Analyze] Loading dataset: {dataset_path}", file=sys.stderr) - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -513,7 +513,7 @@ def _summary_callback(finding, usage=None): "code_by_route": code_by_route, } - with open(results_path, "w") as f: + with open(results_path, "w", encoding="utf-8") as f: json.dump(experiment_result, f, indent=2) print(f"\n[Analyze] Results written to {results_path}", file=sys.stderr) diff --git a/libs/openant-core/core/checkpoint.py b/libs/openant-core/core/checkpoint.py index 7c42f52..f3578a7 100644 --- a/libs/openant-core/core/checkpoint.py +++ b/libs/openant-core/core/checkpoint.py @@ -79,7 +79,7 @@ def load(self) -> dict[str, dict]: continue filepath = os.path.join(self.dir, filename) try: - with open(filepath, "r") as f: + with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) unit_id = data.get("id") if unit_id: @@ -130,7 +130,7 @@ def save(self, unit_id: str, data: dict): filename = self._safe_filename(unit_id) + ".json" filepath = os.path.join(self.dir, filename) data["id"] = unit_id # ensure id is always present - with open(filepath, "w") as f: + with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) def write_summary( @@ -168,7 +168,7 @@ def write_summary( } if usage is not None: data["usage"] = usage - with open(filepath, "w") as f: + with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) @staticmethod @@ -182,7 +182,7 @@ def read_summary(checkpoint_dir: str) -> dict | None: if not os.path.isfile(filepath): return None try: - with open(filepath, "r") as f: + with open(filepath, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): return None @@ -241,7 +241,7 @@ def status(checkpoint_dir: str) -> dict: continue filepath = os.path.join(checkpoint_dir, filename) try: - with open(filepath, "r") as f: + with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) except (json.JSONDecodeError, OSError): errors += 1 diff --git a/libs/openant-core/core/dynamic_tester.py b/libs/openant-core/core/dynamic_tester.py index 9f9c10d..af83778 100644 --- a/libs/openant-core/core/dynamic_tester.py +++ b/libs/openant-core/core/dynamic_tester.py @@ -51,7 +51,7 @@ def run_tests( os.makedirs(output_dir, exist_ok=True) # Check how many findings to test - with open(pipeline_output_path) as f: + with open(pipeline_output_path, encoding="utf-8") as f: pipeline_data = 
json.load(f) findings = pipeline_data.get("findings", []) @@ -65,7 +65,7 @@ def run_tests( if not testable: results_path = os.path.join(output_dir, "dynamic_test_results.json") - with open(results_path, "w") as f: + with open(results_path, "w", encoding="utf-8") as f: json.dump({"findings_tested": 0, "results": []}, f, indent=2) return DynamicTestStepResult( diff --git a/libs/openant-core/core/enhancer.py b/libs/openant-core/core/enhancer.py index fef1453..d1697ad 100644 --- a/libs/openant-core/core/enhancer.py +++ b/libs/openant-core/core/enhancer.py @@ -69,7 +69,7 @@ def enhance_dataset( # Load dataset print(f"[Enhance] Loading dataset: {dataset_path}", file=sys.stderr) - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -138,7 +138,7 @@ def _on_restored(count: int): # Write enhanced dataset os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(enhanced, f, indent=2) print(f"[Enhance] Enhanced dataset: {output_path}", file=sys.stderr) diff --git a/libs/openant-core/core/parser_adapter.py b/libs/openant-core/core/parser_adapter.py index 314d470..767198c 100644 --- a/libs/openant-core/core/parser_adapter.py +++ b/libs/openant-core/core/parser_adapter.py @@ -161,7 +161,7 @@ def _maybe_apply_diff_filter( ) return - with open(result.dataset_path, "r") as f: + with open(result.dataset_path, "r", encoding="utf-8") as f: dataset = json.load(f) # Dataset may be a dict with "units" or a raw list. @@ -172,13 +172,13 @@ def _maybe_apply_diff_filter( stats = apply_diff_filter(units, manifest) - with open(result.dataset_path, "w") as f: + with open(result.dataset_path, "w", encoding="utf-8") as f: json.dump(dataset, f, indent=2) # Expose stats on the ParseResult via a side-channel file; the parse # step_context reads this when assembling parse.report.json. 
diff_report_path = os.path.join(output_dir, "diff_filter.report.json") - with open(diff_report_path, "w") as f: + with open(diff_report_path, "w", encoding="utf-8") as f: json.dump(stats.to_dict(), f, indent=2) print( @@ -245,7 +245,7 @@ def _load_module(name, filename): print(f"\n[Reachability Filter] Filtering to {processing_level} units...", file=sys.stderr) - with open(call_graph_path, "r") as f: + with open(call_graph_path, "r", encoding="utf-8") as f: call_graph_data = json.load(f) functions = call_graph_data.get("functions", {}) @@ -352,10 +352,10 @@ def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_t dataset = _apply_reachability_filter(dataset, output_dir, processing_level) # Write outputs - with open(dataset_path, "w") as f: + with open(dataset_path, "w", encoding="utf-8") as f: json.dump(dataset, f, indent=2) - with open(analyzer_output_path, "w") as f: + with open(analyzer_output_path, "w", encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) units_count = len(dataset.get("units", [])) @@ -413,7 +413,7 @@ def _parse_javascript(repo_path: str, output_dir: str, processing_level: str, sk # Count units units_count = 0 if os.path.exists(dataset_path): - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: data = json.load(f) units_count = len(data.get("units", [])) @@ -470,7 +470,7 @@ def _parse_go(repo_path: str, output_dir: str, processing_level: str, skip_tests # Count units units_count = 0 if os.path.exists(dataset_path): - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: data = json.load(f) units_count = len(data.get("units", [])) @@ -530,7 +530,7 @@ def _parse_c(repo_path: str, output_dir: str, processing_level: str, skip_tests: # Count units units_count = 0 if os.path.exists(dataset_path): - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: data = json.load(f) units_count = len(data.get("units", [])) @@ -590,7 +590,7 @@ def _parse_ruby(repo_path: str, output_dir: str, processing_level: str, skip_tes # Count units units_count = 0 if os.path.exists(dataset_path): - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: data = json.load(f) units_count = len(data.get("units", [])) @@ -650,7 +650,7 @@ def _parse_php(repo_path: str, output_dir: str, processing_level: str, skip_test # Count units units_count = 0 if os.path.exists(dataset_path): - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: data = json.load(f) units_count = len(data.get("units", [])) @@ -710,7 +710,7 @@ def _parse_zig(repo_path: str, output_dir: str, processing_level: str, skip_test # Count units units_count = 0 if os.path.exists(dataset_path): - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: data = json.load(f) units_count = len(data.get("units", [])) diff --git a/libs/openant-core/core/reporter.py b/libs/openant-core/core/reporter.py index 7153dab..c471cc3 100644 --- a/libs/openant-core/core/reporter.py +++ b/libs/openant-core/core/reporter.py @@ -34,7 +34,7 @@ def _load_diff_metadata(scan_dir: str) -> dict | None: if not os.path.exists(manifest_path): return None try: - with open(manifest_path) as f: + with open(manifest_path, encoding="utf-8") as f: manifest = json.load(f) except (json.JSONDecodeError, OSError): return None @@ -50,7 +50,7 @@ def _load_diff_metadata(scan_dir: str) -> dict | None: filter_report = os.path.join(scan_dir, "diff_filter.report.json") if os.path.exists(filter_report): 
try: - with open(filter_report) as f: + with open(filter_report, encoding="utf-8") as f: stats = json.load(f) out["units_in_diff"] = stats.get("selected") out["units_total_parsed"] = stats.get("total") @@ -129,7 +129,7 @@ def _dedup_caller_callee( return confirmed try: - with open(call_graph_path) as f: + with open(call_graph_path, encoding="utf-8") as f: cg_data = json.load(f) except (json.JSONDecodeError, OSError): return confirmed @@ -212,7 +212,7 @@ def build_pipeline_output( """ print(f"[Report] Building pipeline_output.json...", file=sys.stderr) - with open(results_path) as f: + with open(results_path, encoding="utf-8") as f: experiment = json.load(f) all_results = experiment.get("results", []) @@ -371,7 +371,7 @@ def build_pipeline_output( print(_banner, file=sys.stderr) os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(pipeline_output, f, indent=2, ensure_ascii=False) print(f" pipeline_output.json: {len(findings_data)} findings", file=sys.stderr) @@ -469,7 +469,7 @@ def generate_summary_report( print("[Report] Generating summary report (LLM)...", file=sys.stderr) - with open(results_path) as f: + with open(results_path, encoding="utf-8") as f: pipeline_data = json.load(f) # Merge dynamic test results if available @@ -483,7 +483,7 @@ def generate_summary_report( report_text, usage = _generate_summary(pipeline_data) os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: f.write(report_text) print(f" Summary report: {output_path}", file=sys.stderr) @@ -517,7 +517,7 @@ def generate_disclosure_docs( print("[Report] Generating disclosure documents (LLM)...", file=sys.stderr) - with open(results_path) as f: + with open(results_path, encoding="utf-8") as f: pipeline_data = json.load(f) # Merge dynamic test results if available @@ -552,7 +552,7 @@ def _one(args): safe_name = finding["short_name"].replace(" ", "_").upper() filename = f"DISCLOSURE_{i:02d}_{safe_name}.md" filepath = os.path.join(output_dir, filename) - with open(filepath, "w") as f: + with open(filepath, "w", encoding="utf-8") as f: f.write(disclosure_text) return finding["short_name"], filepath, usage diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py index f081352..1c52191 100644 --- a/libs/openant-core/core/scanner.py +++ b/libs/openant-core/core/scanner.py @@ -149,7 +149,7 @@ def _step_label(name: str) -> str: _diff_report = os.path.join(output_dir, "diff_filter.report.json") if os.path.exists(_diff_report): try: - with open(_diff_report) as _f: + with open(_diff_report, encoding="utf-8") as _f: ctx.summary["diff_stats"] = json.load(_f) except (json.JSONDecodeError, OSError): pass @@ -542,7 +542,7 @@ def _load_step_report(output_dir: str, step: str) -> dict: """Load a step report JSON from disk. 
Returns empty dict on failure.""" path = os.path.join(output_dir, f"{step}.report.json") try: - with open(path) as f: + with open(path, encoding="utf-8") as f: return json.load(f) except Exception: return {"step": step, "status": "unknown"} @@ -551,7 +551,7 @@ def _load_step_report(output_dir: str, step: str) -> dict: def _read_app_type(app_context_path: str) -> str | None: """Read application_type from an app context JSON file.""" try: - with open(app_context_path) as f: + with open(app_context_path, encoding="utf-8") as f: data = json.load(f) return data.get("application_type") except Exception: diff --git a/libs/openant-core/core/schemas.py b/libs/openant-core/core/schemas.py index 88d30d4..0ffb01c 100644 --- a/libs/openant-core/core/schemas.py +++ b/libs/openant-core/core/schemas.py @@ -268,6 +268,6 @@ def write(self, output_dir: str) -> str: """Write ``{step}.report.json`` to *output_dir*. Returns the path.""" os.makedirs(output_dir, exist_ok=True) path = os.path.join(output_dir, f"{self.step}.report.json") - with open(path, "w") as f: + with open(path, "w", encoding="utf-8") as f: json.dump(self.to_dict(), f, indent=2) return path diff --git a/libs/openant-core/core/verifier.py b/libs/openant-core/core/verifier.py index fa7a43f..34da654 100644 --- a/libs/openant-core/core/verifier.py +++ b/libs/openant-core/core/verifier.py @@ -80,7 +80,7 @@ def run_verification( # Load Stage 1 results print(f"[Verify] Loading results: {results_path}", file=sys.stderr) - with open(results_path) as f: + with open(results_path, encoding="utf-8") as f: experiment = json.load(f) all_results = experiment.get("results", []) @@ -268,7 +268,7 @@ def _write_verified_results( output["metrics"] = {"total": len(merged_results), **counts} - with open(path, "w") as f: + with open(path, "w", encoding="utf-8") as f: json.dump(output, f, indent=2, ensure_ascii=False) diff --git a/libs/openant-core/experiment.py b/libs/openant-core/experiment.py index 359d41f..e1657b0 100644 --- a/libs/openant-core/experiment.py +++ b/libs/openant-core/experiment.py @@ -211,7 +211,7 @@ def load_dataset(name: str, enhanced: bool = False) -> dict: if not path or not os.path.exists(path): raise ValueError(f"Dataset not found: {name} (enhanced={enhanced})") - with open(path, "r") as f: + with open(path, "r", encoding="utf-8") as f: return json.load(f) @@ -221,7 +221,7 @@ def load_ground_truth(name: str) -> dict: if not path or not os.path.exists(path): return {} - with open(path, "r") as f: + with open(path, "r", encoding="utf-8") as f: return json.load(f) @@ -1034,7 +1034,7 @@ def main(): suffix = "" if args.no_enhanced else "_enhanced" output_path = f"experiment_{args.dataset}_{args.model}{suffix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(experiment, f, indent=2) print() diff --git a/libs/openant-core/export_csv.py b/libs/openant-core/export_csv.py index 8b69300..bcde479 100644 --- a/libs/openant-core/export_csv.py +++ b/libs/openant-core/export_csv.py @@ -41,7 +41,7 @@ def _load_diff_block(experiment_path: str) -> dict | None: if not os.path.exists(candidate): return None try: - with open(candidate) as f: + with open(candidate, encoding="utf-8") as f: data = json.load(f) except (json.JSONDecodeError, OSError): return None @@ -67,7 +67,7 @@ def _format_diff_banner(diff: dict) -> str: def load_json(path: str) -> dict: """Load JSON file.""" - with open(path, 'r') as f: + with open(path, 'r', encoding="utf-8") as f: return 
json.load(f) diff --git a/libs/openant-core/generate_report.py b/libs/openant-core/generate_report.py index 633cd9b..662bc5b 100644 --- a/libs/openant-core/generate_report.py +++ b/libs/openant-core/generate_report.py @@ -42,7 +42,7 @@ def load_json(path: str) -> dict: """Load JSON file.""" - with open(path, 'r') as f: + with open(path, 'r', encoding="utf-8") as f: return json.load(f) @@ -83,7 +83,7 @@ def _load_pipeline_metadata(experiment_path: str) -> tuple[dict | None, dict | N if not os.path.exists(candidate): return None, None try: - with open(candidate, 'r') as f: + with open(candidate, 'r', encoding="utf-8") as f: data = json.load(f) except (json.JSONDecodeError, OSError): return None, None diff --git a/libs/openant-core/openant/cli.py b/libs/openant-core/openant/cli.py index b0ce345..673f5d6 100644 --- a/libs/openant-core/openant/cli.py +++ b/libs/openant-core/openant/cli.py @@ -39,7 +39,7 @@ def _load_step_reports(directory: str) -> list[dict]: reports = [] for path in glob.glob(os.path.join(directory, "*.report.json")): try: - with open(path) as f: + with open(path, encoding="utf-8") as f: reports.append(json.load(f)) except (json.JSONDecodeError, OSError): continue @@ -82,7 +82,7 @@ def cmd_scan(args): # is the same one written into pipeline_output.json by reporter.py. if result.pipeline_output_path and os.path.exists(result.pipeline_output_path): try: - with open(result.pipeline_output_path) as f: + with open(result.pipeline_output_path, encoding="utf-8") as f: po = json.load(f) diff_block = po.get("diff") if isinstance(diff_block, dict) and diff_block.get("mode") == "incremental": @@ -135,7 +135,7 @@ def cmd_parse(args): diff_report = os.path.join(output_dir, "diff_filter.report.json") if os.path.exists(diff_report): try: - with open(diff_report) as f: + with open(diff_report, encoding="utf-8") as f: ctx.summary["diff_stats"] = json.load(f) except (json.JSONDecodeError, OSError): pass @@ -607,9 +607,9 @@ def cmd_report_data(args): "dataset_path": os.path.abspath(dataset_path), }) as ctx: # Load data - with open(results_path) as f: + with open(results_path, encoding="utf-8") as f: experiment = json.load(f) - with open(dataset_path) as f: + with open(dataset_path, encoding="utf-8") as f: dataset = json.load(f) # --- Load dynamic test results if available --- @@ -620,9 +620,9 @@ def cmd_report_data(args): dt_path = os.path.join(results_dir, "dynamic_test_results.json") po_path = os.path.join(results_dir, "pipeline_output.json") if os.path.exists(dt_path) and os.path.exists(po_path): - with open(dt_path) as f: + with open(dt_path, encoding="utf-8") as f: dt_data = json.load(f) - with open(po_path) as f: + with open(po_path, encoding="utf-8") as f: po_data = json.load(f) # Map VULN-ID → route_key from pipeline_output @@ -876,7 +876,7 @@ def _linkify_finding(m): diff_block = None if os.path.exists(po_path): try: - with open(po_path) as f: + with open(po_path, encoding="utf-8") as f: po = json.load(f) repo_info = po.get("repository", {}) repo_name = repo_info.get("name", "") diff --git a/libs/openant-core/parsers/c/call_graph_builder.py b/libs/openant-core/parsers/c/call_graph_builder.py index 84e5988..4fda303 100644 --- a/libs/openant-core/parsers/c/call_graph_builder.py +++ b/libs/openant-core/parsers/c/call_graph_builder.py @@ -423,7 +423,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: extractor_output = json.load(f) print(f"Processing {len(extractor_output.get('functions', {}))} 
functions...", file=sys.stderr) @@ -444,7 +444,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Output written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/c/function_extractor.py b/libs/openant-core/parsers/c/function_extractor.py index 10b5f70..0cde912 100644 --- a/libs/openant-core/parsers/c/function_extractor.py +++ b/libs/openant-core/parsers/c/function_extractor.py @@ -575,7 +575,7 @@ def main(): extractor = FunctionExtractor(args.repo_path) if args.scan_file: - with open(args.scan_file) as f: + with open(args.scan_file, encoding="utf-8") as f: scan_result = json.load(f) result = extractor.extract_from_scan(scan_result) else: @@ -584,7 +584,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Extraction complete. Results written to: {args.output}", file=sys.stderr) print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/c/repository_scanner.py b/libs/openant-core/parsers/c/repository_scanner.py index 6706f92..c57ec96 100644 --- a/libs/openant-core/parsers/c/repository_scanner.py +++ b/libs/openant-core/parsers/c/repository_scanner.py @@ -225,7 +225,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Scan complete. Results written to: {args.output}", file=sys.stderr) print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/c/test_pipeline.py b/libs/openant-core/parsers/c/test_pipeline.py index 3f18635..0475cd9 100644 --- a/libs/openant-core/parsers/c/test_pipeline.py +++ b/libs/openant-core/parsers/c/test_pipeline.py @@ -139,7 +139,7 @@ def run_parser_pipeline(self) -> bool: # Save scan results self.scan_results_file = os.path.join(self.output_dir, 'scan_results.json') - with open(self.scan_results_file, 'w') as f: + with open(self.scan_results_file, 'w', encoding="utf-8") as f: json.dump(scan_result, f, indent=2) # Stage 2: Extract functions @@ -178,12 +178,12 @@ def run_parser_pipeline(self) -> bool: print(f" Avg upstream deps: {dataset['statistics']['avg_upstream']}") # Write dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) # Write analyzer output analyzer_output = generator.generate_analyzer_output() - with open(self.analyzer_output_file, 'w') as f: + with open(self.analyzer_output_file, 'w', encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -242,7 +242,7 @@ def apply_reachability_filter(self) -> bool: start_time = datetime.now() try: - with open(self.analyzer_output_file, 'r') as f: + with open(self.analyzer_output_file, 'r', encoding="utf-8") as f: analyzer = json.load(f) functions = analyzer.get("functions", {}) @@ -262,7 +262,7 @@ def apply_reachability_filter(self) -> bool: } # Build call graph from dataset unit metadata - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) call_graph = {} @@ -313,7 +313,7 @@ def apply_reachability_filter(self) -> bool: "reduction_percentage": 
round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -443,7 +443,7 @@ def run_codeql_analysis(self) -> bool: } return False - with open(sarif_output, 'r') as f: + with open(sarif_output, 'r', encoding="utf-8") as f: sarif_data = json.load(f) self.codeql_findings = [] @@ -555,7 +555,7 @@ def apply_codeql_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Build mapping of file -> [(start_line, end_line, func_id)] @@ -605,7 +605,7 @@ def apply_codeql_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -662,7 +662,7 @@ def run_context_enhancer(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) enhancer = ContextEnhancer() @@ -695,7 +695,7 @@ def run_context_enhancer(self) -> bool: 'data_flows_extracted': enhancer.stats['data_flows_extracted'] } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(enhanced, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -740,7 +740,7 @@ def apply_exploitable_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -767,7 +767,7 @@ def apply_exploitable_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -908,7 +908,7 @@ def run_full_pipeline(self): # Save results summary results_file = os.path.join(self.output_dir, 'pipeline_results.json') - with open(results_file, 'w') as f: + with open(results_file, 'w', encoding="utf-8") as f: clean_results = { 'repository': self.results['repository'], 'test_time': self.results['test_time'], diff --git a/libs/openant-core/parsers/c/unit_generator.py b/libs/openant-core/parsers/c/unit_generator.py index a0391d7..220d7b9 100644 --- a/libs/openant-core/parsers/c/unit_generator.py +++ b/libs/openant-core/parsers/c/unit_generator.py @@ -343,7 +343,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: call_graph_data = json.load(f) options = { @@ -373,7 +373,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"\nOutput written to: {args.output}", file=sys.stderr) else: @@ -382,7 +382,7 @@ def main(): # Write analyzer output if requested if args.analyzer_output: analyzer = generator.generate_analyzer_output() - with open(args.analyzer_output, 'w') as f: + with 
open(args.analyzer_output, 'w', encoding="utf-8") as f: json.dump(analyzer, f, indent=2) print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr) diff --git a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py index 8fe05b8..7aa9880 100644 --- a/libs/openant-core/parsers/go/test_pipeline.py +++ b/libs/openant-core/parsers/go/test_pipeline.py @@ -168,7 +168,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: # Load and summarize output if os.path.exists(output_file): - with open(output_file, 'r') as f: + with open(output_file, 'r', encoding="utf-8") as f: data = json.load(f) stage_result['summary'] = self._summarize_output(name, data) else: @@ -244,10 +244,10 @@ def run_go_parser_all(self) -> bool: # Post-process: apply dataset name if specified (Go binary doesn't support --name) if result.get('success', False) and self.dataset_name and os.path.exists(self.dataset_file): try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) dataset['name'] = self.dataset_name - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) except Exception as e: print(f"Warning: Could not apply dataset name: {e}") @@ -282,7 +282,7 @@ def apply_reachability_filter(self) -> bool: try: # Load analyzer output for call graph - with open(self.analyzer_output_file, 'r') as f: + with open(self.analyzer_output_file, 'r', encoding="utf-8") as f: analyzer = json.load(f) functions = analyzer.get("functions", {}) @@ -304,7 +304,7 @@ def apply_reachability_filter(self) -> bool: } # Load call graph from dataset (go_parser puts it in statistics) - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Build call graph from unit metadata @@ -359,7 +359,7 @@ def apply_reachability_filter(self) -> bool: } # Write filtered dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -498,7 +498,7 @@ def run_codeql_analysis(self) -> bool: } return False - with open(sarif_output, 'r') as f: + with open(sarif_output, 'r', encoding="utf-8") as f: sarif_data = json.load(f) # Extract findings and map to file:line @@ -620,7 +620,7 @@ def apply_codeql_filter(self) -> bool: try: # Load dataset to get function line ranges - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Build mapping of file -> [(start_line, end_line, func_id)] @@ -675,7 +675,7 @@ def apply_codeql_filter(self) -> bool: } # Write filtered dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -733,7 +733,7 @@ def run_context_enhancer(self) -> bool: try: # Load dataset - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Enhance with LLM @@ -771,7 +771,7 @@ def run_context_enhancer(self) -> bool: } # Write back - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(enhanced, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -824,7 +824,7 @@ def 
apply_exploitable_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -854,7 +854,7 @@ def apply_exploitable_filter(self) -> bool: } # Write filtered dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -1002,7 +1002,7 @@ def run_full_pipeline(self): # Save results summary results_file = os.path.join(self.output_dir, 'pipeline_results.json') - with open(results_file, 'w') as f: + with open(results_file, 'w', encoding="utf-8") as f: # Remove stdout/stderr from saved results (too verbose) clean_results = { 'repository': self.results['repository'], diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py index 77ab9c4..abd7815 100644 --- a/libs/openant-core/parsers/javascript/test_pipeline.py +++ b/libs/openant-core/parsers/javascript/test_pipeline.py @@ -154,7 +154,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: # Load and summarize output if os.path.exists(output_file): - with open(output_file, 'r') as f: + with open(output_file, 'r', encoding="utf-8") as f: data = json.load(f) stage_result['summary'] = self._summarize_output(name, data) else: @@ -242,7 +242,7 @@ def run_typescript_analyzer(self, files: list = None) -> bool: # If no specific files, use ALL files from scan results if not files and self.scan_results_file and os.path.exists(self.scan_results_file): - with open(self.scan_results_file, 'r') as f: + with open(self.scan_results_file, 'r', encoding="utf-8") as f: scan_data = json.load(f) files = [f['path'] for f in scan_data.get('files', [])] @@ -252,7 +252,7 @@ def run_typescript_analyzer(self, files: list = None) -> bool: # Write file list to a temporary file to avoid command-line length limits file_list_path = os.path.join(self.output_dir, 'file_list.txt') - with open(file_list_path, 'w') as f: + with open(file_list_path, 'w', encoding="utf-8") as f: for file_path in files: # Convert relative path to absolute if not os.path.isabs(file_path): @@ -300,7 +300,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s if result.returncode == 0: # Write stdout to output file - with open(output_file, 'w') as f: + with open(output_file, 'w', encoding="utf-8") as f: f.write(result.stdout) print(f"✓ Success ({elapsed:.2f}s)") @@ -313,7 +313,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s # Load and summarize output if os.path.exists(output_file): - with open(output_file, 'r') as f: + with open(output_file, 'r', encoding="utf-8") as f: data = json.load(f) summary = self._summarize_output(name, data) else: @@ -391,7 +391,7 @@ def run_context_enhancer(self) -> bool: try: # Load dataset - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Enhance with LLM @@ -432,7 +432,7 @@ def run_context_enhancer(self) -> bool: } # Write back - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(enhanced, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -490,7 +490,7 @@ def apply_reachability_filter(self) -> bool: try: # Load analyzer output for call graph - with 
open(self.analyzer_output_file, 'r') as f: + with open(self.analyzer_output_file, 'r', encoding="utf-8") as f: analyzer = json.load(f) functions = analyzer.get("functions", {}) @@ -510,7 +510,7 @@ def apply_reachability_filter(self) -> bool: self.reachable_units = reachability.get_all_reachable() # Load and filter dataset - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -539,7 +539,7 @@ def apply_reachability_filter(self) -> bool: } # Write filtered dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -590,7 +590,7 @@ def _detect_codeql_language(self) -> str: return "javascript" # Default try: - with open(self.scan_results_file, 'r') as f: + with open(self.scan_results_file, 'r', encoding="utf-8") as f: scan_data = json.load(f) stats = scan_data.get('statistics', {}) @@ -706,7 +706,7 @@ def run_codeql_analysis(self) -> bool: } return False - with open(sarif_output, 'r') as f: + with open(sarif_output, 'r', encoding="utf-8") as f: sarif_data = json.load(f) # Extract findings and map to file:line @@ -830,7 +830,7 @@ def apply_codeql_filter(self) -> bool: try: # Load analyzer output to get function line ranges - with open(self.analyzer_output_file, 'r') as f: + with open(self.analyzer_output_file, 'r', encoding="utf-8") as f: analyzer = json.load(f) functions = analyzer.get("functions", {}) @@ -869,7 +869,7 @@ def apply_codeql_filter(self) -> bool: self.codeql_flagged_units.add(func_id) # Load and filter dataset - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -891,7 +891,7 @@ def apply_codeql_filter(self) -> bool: } # Write filtered dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -955,7 +955,7 @@ def apply_exploitable_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -985,7 +985,7 @@ def apply_exploitable_filter(self) -> bool: } # Write filtered dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -1143,7 +1143,7 @@ def run_full_pipeline(self): # Save results summary results_file = os.path.join(self.output_dir, 'pipeline_results.json') - with open(results_file, 'w') as f: + with open(results_file, 'w', encoding="utf-8") as f: # Remove stdout/stderr from saved results (too verbose) clean_results = { 'repository': self.results['repository'], diff --git a/libs/openant-core/parsers/php/call_graph_builder.py b/libs/openant-core/parsers/php/call_graph_builder.py index dfa441e..165c483 100644 --- a/libs/openant-core/parsers/php/call_graph_builder.py +++ b/libs/openant-core/parsers/php/call_graph_builder.py @@ -482,7 +482,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: extractor_output = json.load(f) print(f"Processing {len(extractor_output.get('functions', {}))} functions...", 
file=sys.stderr) @@ -503,7 +503,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Output written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/php/function_extractor.py b/libs/openant-core/parsers/php/function_extractor.py index bdedecf..1fb6b31 100644 --- a/libs/openant-core/parsers/php/function_extractor.py +++ b/libs/openant-core/parsers/php/function_extractor.py @@ -547,7 +547,7 @@ def main(): extractor = FunctionExtractor(args.repo_path) if args.scan_file: - with open(args.scan_file) as f: + with open(args.scan_file, encoding="utf-8") as f: scan_result = json.load(f) result = extractor.extract_from_scan(scan_result) else: @@ -556,7 +556,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Extraction complete. Results written to: {args.output}", file=sys.stderr) print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/php/repository_scanner.py b/libs/openant-core/parsers/php/repository_scanner.py index bd8a2d9..96b64fd 100644 --- a/libs/openant-core/parsers/php/repository_scanner.py +++ b/libs/openant-core/parsers/php/repository_scanner.py @@ -236,7 +236,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Scan complete. Results written to: {args.output}", file=sys.stderr) print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/php/test_pipeline.py b/libs/openant-core/parsers/php/test_pipeline.py index fd10477..e2f9eb2 100644 --- a/libs/openant-core/parsers/php/test_pipeline.py +++ b/libs/openant-core/parsers/php/test_pipeline.py @@ -139,7 +139,7 @@ def run_parser_pipeline(self) -> bool: # Save scan results self.scan_results_file = os.path.join(self.output_dir, 'scan_results.json') - with open(self.scan_results_file, 'w') as f: + with open(self.scan_results_file, 'w', encoding="utf-8") as f: json.dump(scan_result, f, indent=2) # Stage 2: Extract functions @@ -178,12 +178,12 @@ def run_parser_pipeline(self) -> bool: print(f" Avg upstream deps: {dataset['statistics']['avg_upstream']}") # Write dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) # Write analyzer output analyzer_output = generator.generate_analyzer_output() - with open(self.analyzer_output_file, 'w') as f: + with open(self.analyzer_output_file, 'w', encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -242,7 +242,7 @@ def apply_reachability_filter(self) -> bool: start_time = datetime.now() try: - with open(self.analyzer_output_file, 'r') as f: + with open(self.analyzer_output_file, 'r', encoding="utf-8") as f: analyzer = json.load(f) functions = analyzer.get("functions", {}) @@ -262,7 +262,7 @@ def apply_reachability_filter(self) -> bool: } # Build call graph from dataset unit metadata - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) call_graph = {} @@ -313,7 +313,7 @@ def apply_reachability_filter(self) -> bool: 
"reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -443,7 +443,7 @@ def run_codeql_analysis(self) -> bool: } return False - with open(sarif_output, 'r') as f: + with open(sarif_output, 'r', encoding="utf-8") as f: sarif_data = json.load(f) self.codeql_findings = [] @@ -555,7 +555,7 @@ def apply_codeql_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Build mapping of file -> [(start_line, end_line, func_id)] @@ -605,7 +605,7 @@ def apply_codeql_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -662,7 +662,7 @@ def run_context_enhancer(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) enhancer = ContextEnhancer() @@ -695,7 +695,7 @@ def run_context_enhancer(self) -> bool: 'data_flows_extracted': enhancer.stats['data_flows_extracted'] } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(enhanced, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -740,7 +740,7 @@ def apply_exploitable_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -767,7 +767,7 @@ def apply_exploitable_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -908,7 +908,7 @@ def run_full_pipeline(self): # Save results summary results_file = os.path.join(self.output_dir, 'pipeline_results.json') - with open(results_file, 'w') as f: + with open(results_file, 'w', encoding="utf-8") as f: clean_results = { 'repository': self.results['repository'], 'test_time': self.results['test_time'], diff --git a/libs/openant-core/parsers/php/unit_generator.py b/libs/openant-core/parsers/php/unit_generator.py index 9b36684..d7ea416 100644 --- a/libs/openant-core/parsers/php/unit_generator.py +++ b/libs/openant-core/parsers/php/unit_generator.py @@ -344,7 +344,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: call_graph_data = json.load(f) options = { @@ -374,7 +374,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"\nOutput written to: {args.output}", file=sys.stderr) else: @@ -383,7 +383,7 @@ def main(): # Write analyzer output if requested if args.analyzer_output: analyzer = generator.generate_analyzer_output() - with 
open(args.analyzer_output, 'w') as f: + with open(args.analyzer_output, 'w', encoding="utf-8") as f: json.dump(analyzer, f, indent=2) print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr) diff --git a/libs/openant-core/parsers/python/ast_parser.py b/libs/openant-core/parsers/python/ast_parser.py index e4cdc21..64105a5 100644 --- a/libs/openant-core/parsers/python/ast_parser.py +++ b/libs/openant-core/parsers/python/ast_parser.py @@ -461,7 +461,7 @@ def main(): result = parser.parse() if output_file: - with open(output_file, 'w') as f: + with open(output_file, 'w', encoding="utf-8") as f: json.dump(result, f, indent=2) print(f"Output written to {output_file}") else: diff --git a/libs/openant-core/parsers/python/call_graph_builder.py b/libs/openant-core/parsers/python/call_graph_builder.py index 3d92b25..d175be3 100644 --- a/libs/openant-core/parsers/python/call_graph_builder.py +++ b/libs/openant-core/parsers/python/call_graph_builder.py @@ -492,7 +492,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: extractor_output = json.load(f) print(f"Processing {len(extractor_output.get('functions', {}))} functions...", file=sys.stderr) @@ -513,7 +513,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Output written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/python/dataset_enhancer.py b/libs/openant-core/parsers/python/dataset_enhancer.py index d41f8a8..340a2f8 100644 --- a/libs/openant-core/parsers/python/dataset_enhancer.py +++ b/libs/openant-core/parsers/python/dataset_enhancer.py @@ -226,7 +226,7 @@ def resolve_recursive(current_file: Path, current_code: str, depth: int): def enhance_dataset(dataset_path: str, repo_path: str, output_path: str = None): """Enhance a dataset with resolved dependencies.""" - with open(dataset_path, 'r') as f: + with open(dataset_path, 'r', encoding="utf-8") as f: dataset = json.load(f) resolver = PythonDependencyResolver(repo_path) @@ -263,7 +263,7 @@ def enhance_dataset(dataset_path: str, repo_path: str, output_path: str = None): dataset['enhanced'] = True if output_path: - with open(output_path, 'w') as f: + with open(output_path, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) print(f"Enhanced dataset written to {output_path}") else: diff --git a/libs/openant-core/parsers/python/function_extractor.py b/libs/openant-core/parsers/python/function_extractor.py index 574ba08..23a0e32 100644 --- a/libs/openant-core/parsers/python/function_extractor.py +++ b/libs/openant-core/parsers/python/function_extractor.py @@ -596,7 +596,7 @@ def main(): extractor = FunctionExtractor(args.repo_path) if args.scan_file: - with open(args.scan_file) as f: + with open(args.scan_file, encoding="utf-8") as f: scan_result = json.load(f) result = extractor.extract_from_scan(scan_result) else: @@ -605,7 +605,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Extraction complete. 
Results written to: {args.output}", file=sys.stderr) print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/python/parse_repository.py b/libs/openant-core/parsers/python/parse_repository.py index 45af852..6101069 100644 --- a/libs/openant-core/parsers/python/parse_repository.py +++ b/libs/openant-core/parsers/python/parse_repository.py @@ -138,7 +138,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple: if output_dir: scan_file = Path(output_dir) / 'scan_result.json' - with open(scan_file, 'w') as f: + with open(scan_file, 'w', encoding="utf-8") as f: json.dump(scan_result, f, indent=2) print(f" Saved: {scan_file}", file=sys.stderr) @@ -154,7 +154,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple: if output_dir: extract_file = Path(output_dir) / 'functions.json' - with open(extract_file, 'w') as f: + with open(extract_file, 'w', encoding="utf-8") as f: json.dump(extractor_result, f, indent=2) print(f" Saved: {extract_file}", file=sys.stderr) @@ -171,7 +171,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple: if output_dir: graph_file = Path(output_dir) / 'call_graph.json' - with open(graph_file, 'w') as f: + with open(graph_file, 'w', encoding="utf-8") as f: json.dump(call_graph_result, f, indent=2) print(f" Saved: {graph_file}", file=sys.stderr) @@ -199,7 +199,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple: if output_dir: analyzer_file = Path(output_dir) / 'analyzer_output.json' - with open(analyzer_file, 'w') as f: + with open(analyzer_file, 'w', encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) print(f" Saved: {analyzer_file}", file=sys.stderr) @@ -253,7 +253,7 @@ def main(): # Save dataset dataset_json = json.dumps(dataset, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(dataset_json) print(f"\nDataset written to: {args.output}", file=sys.stderr) else: @@ -261,7 +261,7 @@ def main(): # Save analyzer output if requested if args.analyzer_output: - with open(args.analyzer_output, 'w') as f: + with open(args.analyzer_output, 'w', encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr) diff --git a/libs/openant-core/parsers/python/repository_scanner.py b/libs/openant-core/parsers/python/repository_scanner.py index e2ab1f0..405a25f 100644 --- a/libs/openant-core/parsers/python/repository_scanner.py +++ b/libs/openant-core/parsers/python/repository_scanner.py @@ -289,7 +289,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Scan complete. 
Results written to: {args.output}", file=sys.stderr) print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/python/unit_generator.py b/libs/openant-core/parsers/python/unit_generator.py index a7d2680..8e36a18 100644 --- a/libs/openant-core/parsers/python/unit_generator.py +++ b/libs/openant-core/parsers/python/unit_generator.py @@ -400,7 +400,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: call_graph_data = json.load(f) options = { @@ -430,7 +430,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"\nOutput written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/ruby/call_graph_builder.py b/libs/openant-core/parsers/ruby/call_graph_builder.py index 3c4b3ea..c627d77 100644 --- a/libs/openant-core/parsers/ruby/call_graph_builder.py +++ b/libs/openant-core/parsers/ruby/call_graph_builder.py @@ -441,7 +441,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: extractor_output = json.load(f) print(f"Processing {len(extractor_output.get('functions', {}))} functions...", file=sys.stderr) @@ -462,7 +462,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Output written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/ruby/function_extractor.py b/libs/openant-core/parsers/ruby/function_extractor.py index f2f1dc3..2335ae2 100644 --- a/libs/openant-core/parsers/ruby/function_extractor.py +++ b/libs/openant-core/parsers/ruby/function_extractor.py @@ -444,7 +444,7 @@ def main(): extractor = FunctionExtractor(args.repo_path) if args.scan_file: - with open(args.scan_file) as f: + with open(args.scan_file, encoding="utf-8") as f: scan_result = json.load(f) result = extractor.extract_from_scan(scan_result) else: @@ -453,7 +453,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Extraction complete. Results written to: {args.output}", file=sys.stderr) print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/ruby/repository_scanner.py b/libs/openant-core/parsers/ruby/repository_scanner.py index 65b9a14..b02d456 100644 --- a/libs/openant-core/parsers/ruby/repository_scanner.py +++ b/libs/openant-core/parsers/ruby/repository_scanner.py @@ -240,7 +240,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"Scan complete. 
Results written to: {args.output}", file=sys.stderr) print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/ruby/test_pipeline.py b/libs/openant-core/parsers/ruby/test_pipeline.py index cffe880..a38b66d 100644 --- a/libs/openant-core/parsers/ruby/test_pipeline.py +++ b/libs/openant-core/parsers/ruby/test_pipeline.py @@ -139,7 +139,7 @@ def run_parser_pipeline(self) -> bool: # Save scan results self.scan_results_file = os.path.join(self.output_dir, 'scan_results.json') - with open(self.scan_results_file, 'w') as f: + with open(self.scan_results_file, 'w', encoding="utf-8") as f: json.dump(scan_result, f, indent=2) # Stage 2: Extract functions @@ -178,12 +178,12 @@ def run_parser_pipeline(self) -> bool: print(f" Avg upstream deps: {dataset['statistics']['avg_upstream']}") # Write dataset - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) # Write analyzer output analyzer_output = generator.generate_analyzer_output() - with open(self.analyzer_output_file, 'w') as f: + with open(self.analyzer_output_file, 'w', encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -242,7 +242,7 @@ def apply_reachability_filter(self) -> bool: start_time = datetime.now() try: - with open(self.analyzer_output_file, 'r') as f: + with open(self.analyzer_output_file, 'r', encoding="utf-8") as f: analyzer = json.load(f) functions = analyzer.get("functions", {}) @@ -262,7 +262,7 @@ def apply_reachability_filter(self) -> bool: } # Build call graph from dataset unit metadata - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) call_graph = {} @@ -313,7 +313,7 @@ def apply_reachability_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -443,7 +443,7 @@ def run_codeql_analysis(self) -> bool: } return False - with open(sarif_output, 'r') as f: + with open(sarif_output, 'r', encoding="utf-8") as f: sarif_data = json.load(f) self.codeql_findings = [] @@ -555,7 +555,7 @@ def apply_codeql_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) # Build mapping of file -> [(start_line, end_line, func_id)] @@ -605,7 +605,7 @@ def apply_codeql_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -662,7 +662,7 @@ def run_context_enhancer(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) enhancer = ContextEnhancer() @@ -695,7 +695,7 @@ def run_context_enhancer(self) -> bool: 'data_flows_extracted': enhancer.stats['data_flows_extracted'] } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(enhanced, f, 
indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -740,7 +740,7 @@ def apply_exploitable_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: + with open(self.dataset_file, 'r', encoding="utf-8") as f: dataset = json.load(f) units = dataset.get("units", []) @@ -767,7 +767,7 @@ def apply_exploitable_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: + with open(self.dataset_file, 'w', encoding="utf-8") as f: json.dump(dataset, f, indent=2) elapsed = (datetime.now() - start_time).total_seconds() @@ -908,7 +908,7 @@ def run_full_pipeline(self): # Save results summary results_file = os.path.join(self.output_dir, 'pipeline_results.json') - with open(results_file, 'w') as f: + with open(results_file, 'w', encoding="utf-8") as f: clean_results = { 'repository': self.results['repository'], 'test_time': self.results['test_time'], diff --git a/libs/openant-core/parsers/ruby/unit_generator.py b/libs/openant-core/parsers/ruby/unit_generator.py index 184a221..393e136 100644 --- a/libs/openant-core/parsers/ruby/unit_generator.py +++ b/libs/openant-core/parsers/ruby/unit_generator.py @@ -344,7 +344,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: + with open(args.input_file, encoding="utf-8") as f: call_graph_data = json.load(f) options = { @@ -374,7 +374,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open(args.output, 'w', encoding="utf-8") as f: f.write(output) print(f"\nOutput written to: {args.output}", file=sys.stderr) else: @@ -383,7 +383,7 @@ def main(): # Write analyzer output if requested if args.analyzer_output: analyzer = generator.generate_analyzer_output() - with open(args.analyzer_output, 'w') as f: + with open(args.analyzer_output, 'w', encoding="utf-8") as f: json.dump(analyzer, f, indent=2) print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr) diff --git a/libs/openant-core/parsers/zig/call_graph_builder.py b/libs/openant-core/parsers/zig/call_graph_builder.py index 52f661d..98ce3f8 100644 --- a/libs/openant-core/parsers/zig/call_graph_builder.py +++ b/libs/openant-core/parsers/zig/call_graph_builder.py @@ -321,5 +321,5 @@ def _resolve_call( def save_results(self, output_path: str, results: Dict[str, Any]) -> None: """Save call graph to a JSON file.""" - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(results, f, indent=2) diff --git a/libs/openant-core/parsers/zig/function_extractor.py b/libs/openant-core/parsers/zig/function_extractor.py index f3348a0..ce7772c 100644 --- a/libs/openant-core/parsers/zig/function_extractor.py +++ b/libs/openant-core/parsers/zig/function_extractor.py @@ -276,5 +276,5 @@ def _classify_function(self, name: str, file_path: str) -> str: def save_results(self, output_path: str, results: Dict[str, Any]) -> None: """Save extraction results to a JSON file.""" - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(results, f, indent=2) diff --git a/libs/openant-core/parsers/zig/repository_scanner.py b/libs/openant-core/parsers/zig/repository_scanner.py index ae09564..93542b0 100644 --- a/libs/openant-core/parsers/zig/repository_scanner.py +++ b/libs/openant-core/parsers/zig/repository_scanner.py @@ -131,5 +131,5 @@ def _is_test_file(self, filepath: str) -> 
bool: def save_results(self, output_path: str, results: Dict[str, Any]) -> None: """Save scan results to a JSON file.""" - with open(output_path, "w") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(results, f, indent=2) diff --git a/libs/openant-core/parsers/zig/test_pipeline.py b/libs/openant-core/parsers/zig/test_pipeline.py index b4a9832..f1db48c 100644 --- a/libs/openant-core/parsers/zig/test_pipeline.py +++ b/libs/openant-core/parsers/zig/test_pipeline.py @@ -96,9 +96,9 @@ def main(): "statistics": {"total_units": 0, "by_type": {}}, "metadata": {"generator": "zig_unit_generator.py"}, } - with open(output_dir / "dataset.json", "w") as f: + with open(output_dir / "dataset.json", "w", encoding="utf-8") as f: json.dump(empty_dataset, f, indent=2) - with open(output_dir / "analyzer_output.json", "w") as f: + with open(output_dir / "analyzer_output.json", "w", encoding="utf-8") as f: json.dump({"repository": str(repo_path), "functions": {}}, f, indent=2) return 0 diff --git a/libs/openant-core/parsers/zig/unit_generator.py b/libs/openant-core/parsers/zig/unit_generator.py index de1ce1c..d83ab5a 100644 --- a/libs/openant-core/parsers/zig/unit_generator.py +++ b/libs/openant-core/parsers/zig/unit_generator.py @@ -246,8 +246,8 @@ def save_results( output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) - with open(output_path / "dataset.json", "w") as f: + with open(output_path / "dataset.json", "w", encoding="utf-8") as f: json.dump(dataset, f, indent=2) - with open(output_path / "analyzer_output.json", "w") as f: + with open(output_path / "analyzer_output.json", "w", encoding="utf-8") as f: json.dump(analyzer_output, f, indent=2) diff --git a/libs/openant-core/tests/test_file_io.py b/libs/openant-core/tests/test_file_io.py new file mode 100644 index 0000000..89b763f --- /dev/null +++ b/libs/openant-core/tests/test_file_io.py @@ -0,0 +1,288 @@ +"""Tests for utilities.file_io UTF-8 helpers and a regression scan.""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +import sys +from pathlib import Path + +import pytest + +CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(CORE_ROOT)) + +from utilities.file_io import open_utf8, read_json, run_utf8, write_json # noqa: E402 + + +NON_ASCII = "héllo 日本語 — café" + + +# --------------------------------------------------------------------------- +# Helper unit tests +# --------------------------------------------------------------------------- + +def test_open_utf8_round_trip(tmp_path: Path): + p = tmp_path / "x.txt" + with open_utf8(p, "w") as f: + f.write(NON_ASCII) + with open_utf8(p) as f: + assert f.read() == NON_ASCII + + +def test_open_utf8_passes_through_binary_mode(tmp_path: Path): + """Binary mode should not get encoding= injected.""" + p = tmp_path / "raw.bin" + payload = NON_ASCII.encode("utf-8") + with open_utf8(p, "wb") as f: + f.write(payload) + with open_utf8(p, "rb") as f: + assert f.read() == payload + + +def test_open_utf8_caller_encoding_wins(tmp_path: Path): + """If caller explicitly passes encoding=, helper must not override it.""" + p = tmp_path / "y.txt" + p.write_bytes("café".encode("latin-1")) + with open_utf8(p, encoding="latin-1") as f: + assert f.read() == "café" + + +def test_read_json_round_trip(tmp_path: Path): + p = tmp_path / "data.json" + obj = {"greeting": NON_ASCII, "list": ["a", NON_ASCII, "b"]} + write_json(p, obj) + assert read_json(p) == obj + + +def test_write_json_uses_utf8(tmp_path: Path): + 
"""write_json must encode non-ASCII as UTF-8 bytes (not cp1252).""" + p = tmp_path / "data.json" + write_json(p, {"k": NON_ASCII}) + raw = p.read_bytes() + # The non-ASCII characters should appear as their UTF-8 encoding (or as + # JSON-escaped \uXXXX sequences — both are valid; the key is that the + # file does not contain a cp1252-encoded ?-replacement). + decoded = raw.decode("utf-8") + parsed = json.loads(decoded) + assert parsed["k"] == NON_ASCII + + +def test_write_json_default_indent(tmp_path: Path): + """write_json should pretty-print by default for human readability.""" + p = tmp_path / "data.json" + write_json(p, {"a": 1, "b": 2}) + text = p.read_text(encoding="utf-8") + # Indented output spans multiple lines. + assert "\n" in text + + +# --------------------------------------------------------------------------- +# run_utf8 subprocess test +# --------------------------------------------------------------------------- + +def test_run_utf8_captures_non_ascii_text(): + """run_utf8 with text=True must decode UTF-8 stdout without raising on cp1252.""" + code = ( + "import sys; " + "sys.stdout.buffer.write('" + + NON_ASCII + + "'.encode('utf-8'))" + ) + result = run_utf8( + [sys.executable, "-c", code], + capture_output=True, + text=True, + timeout=30, + ) + assert result.returncode == 0 + assert result.stdout == NON_ASCII + + +def test_run_utf8_universal_newlines_alias(tmp_path: Path): + """universal_newlines=True is an alias for text=True; must also get UTF-8.""" + code = ( + "import sys; " + "sys.stdout.buffer.write('" + + NON_ASCII + + "'.encode('utf-8'))" + ) + result = run_utf8( + [sys.executable, "-c", code], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + timeout=30, + ) + assert result.returncode == 0 + assert result.stdout == NON_ASCII + + +def test_run_utf8_invalid_bytes_replaced_not_raised(): + """errors='replace' default means invalid bytes don't raise.""" + code = ( + "import sys; " + "sys.stdout.buffer.write(b'good\\x9d_bad')" + ) + result = run_utf8( + [sys.executable, "-c", code], + capture_output=True, + text=True, + timeout=30, + ) + assert result.returncode == 0 + # Invalid byte 0x9d is replaced by U+FFFD rather than raising. + assert "good" in result.stdout + assert "bad" in result.stdout + + +def test_run_utf8_caller_can_override_errors_default_strict(): + """Without text=True, run_utf8 should not inject errors='replace'. + + Confirms that the encoding/errors injection only fires for text-mode + captures, leaving binary subprocess invocations untouched. 
+ """ + result = run_utf8( + [sys.executable, "-c", "import sys; sys.stdout.buffer.write(b'\\x9d')"], + capture_output=True, + timeout=30, + ) + assert result.returncode == 0 + assert result.stdout == b"\x9d" + + +def test_run_utf8_does_not_override_explicit_encoding(): + """If caller passes encoding= explicitly, run_utf8 must not overwrite it.""" + result = run_utf8( + [sys.executable, "-c", "print('caf\\xe9')"], + capture_output=True, + text=True, + encoding="latin-1", + timeout=30, + ) + assert result.returncode == 0 + assert "café" in result.stdout + + +# --------------------------------------------------------------------------- +# Regression scan: no bare open() calls reappear in non-test code +# --------------------------------------------------------------------------- + +def _iter_python_sources(root: Path): + for p in root.rglob("*.py"): + rel = p.relative_to(root).as_posix() + if rel.startswith("tests/"): + continue + if rel == "utilities/file_io.py": + continue + # Skip vendored/build artifacts + if any(part in {".venv", "venv", "build", "dist", "__pycache__"} for part in p.parts): + continue + yield p + + +_OPEN_CALL_RE = re.compile(r"(? str: + """Replace string literals and comments with spaces so identifier matches inside + docstrings/comments don't trigger the regression check.""" + out = [] + i = 0 + n = len(text) + in_str = None + triple = False + while i < n: + c = text[i] + if in_str: + if c == "\\" and not triple: + out.append(" ") + i += 2 + continue + if triple and text[i:i + 3] == in_str: + out.append(" ") + in_str = None + triple = False + i += 3 + continue + if not triple and c == in_str: + in_str = None + out.append(" ") + i += 1 + continue + if not triple and c == "\n": + in_str = None + out.append("\n") + i += 1 + continue + out.append("\n" if c == "\n" else " ") + i += 1 + continue + if c == "#": + nl = text.find("\n", i) + if nl == -1: + out.append(" " * (n - i)) + break + out.append(" " * (nl - i)) + i = nl + continue + if text[i:i + 3] in ('"""', "'''"): + in_str = text[i:i + 3] + triple = True + out.append(" ") + i += 3 + continue + if c in ("'", '"'): + in_str = c + out.append(" ") + i += 1 + continue + out.append(c) + i += 1 + return "".join(out) + + +def _has_encoding(call_args: str) -> bool: + return re.search(r"\bencoding\s*=", call_args) is not None + + +def _has_binary_mode(call_args: str) -> bool: + return re.search(r"""(['"])([rwax+]*b[rwax+]*)\1""", call_args) is not None + + +def test_no_bare_open_in_non_test_code(): + """Regression: every text-mode `open(` call in non-test code must specify + encoding=, otherwise Windows defaults to cp1252 and crashes on non-ASCII + source code. + """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for m in _OPEN_CALL_RE.finditer(scrubbed): + # Find matching close paren in the SCRUBBED text (parens preserved). + i = m.end() + depth = 1 + while i < len(scrubbed) and depth: + ch = scrubbed[i] + if ch == "(": + depth += 1 + elif ch == ")": + depth -= 1 + i += 1 + if depth != 0: + continue + args = text[m.end():i - 1] + if _has_binary_mode(args) or _has_encoding(args): + continue + line = text[:m.start()].count("\n") + 1 + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {text.splitlines()[line - 1].strip()}") + + assert not offenders, ( + "Found bare open() calls without encoding= in non-test code. 
" + "Use utilities.file_io.open_utf8 / read_json / write_json or pass " + "encoding='utf-8' explicitly:\n " + "\n ".join(offenders) + ) diff --git a/libs/openant-core/utilities/agentic_enhancer/repository_index.py b/libs/openant-core/utilities/agentic_enhancer/repository_index.py index 06ef199..8487d80 100644 --- a/libs/openant-core/utilities/agentic_enhancer/repository_index.py +++ b/libs/openant-core/utilities/agentic_enhancer/repository_index.py @@ -283,7 +283,7 @@ def load_index_from_file(analyzer_output_path: str, repo_path: str = None) -> Re Returns: RepositoryIndex instance """ - with open(analyzer_output_path, 'r') as f: + with open(analyzer_output_path, 'r', encoding="utf-8") as f: analyzer_output = json.load(f) return RepositoryIndex(analyzer_output, repo_path) diff --git a/libs/openant-core/utilities/context_enhancer.py b/libs/openant-core/utilities/context_enhancer.py index 2ffbfe6..cdd9192 100644 --- a/libs/openant-core/utilities/context_enhancer.py +++ b/libs/openant-core/utilities/context_enhancer.py @@ -504,7 +504,7 @@ def enhance_dataset_agentic( if unit_id in processed_ids: cp_file = os.path.join(checkpoint_dir, f"{self._safe_filename(unit_id)}.json") if os.path.exists(cp_file): - with open(cp_file, 'r') as f: + with open(cp_file, 'r', encoding="utf-8") as f: cp_data = json.load(f) unit["agent_context"] = cp_data.get("agent_context", {}) if "code" in cp_data: @@ -538,7 +538,7 @@ def enhance_dataset_agentic( if not os.path.exists(cp_file): continue try: - with open(cp_file, 'r') as f: + with open(cp_file, 'r', encoding="utf-8") as f: cp_data = json.load(f) # Sum usage from all existing checkpoints (completed + errored) cp_usage = cp_data.get("usage", {}) @@ -792,7 +792,7 @@ def _save_unit_checkpoint(self, unit: dict, checkpoint_dir: str): "output_tokens": meta.get("output_tokens", 0), "cost_usd": meta.get("cost_usd", 0.0), } - with open(filepath, 'w') as f: + with open(filepath, 'w', encoding="utf-8") as f: json.dump(cp_data, f, indent=2) def _load_completed_units(self, checkpoint_dir: str) -> set: @@ -805,7 +805,7 @@ def _load_completed_units(self, checkpoint_dir: str) -> set: continue filepath = os.path.join(checkpoint_dir, filename) try: - with open(filepath, 'r') as f: + with open(filepath, 'r', encoding="utf-8") as f: cp_data = json.load(f) unit_id = cp_data.get("id") agent_ctx = cp_data.get("agent_context", {}) @@ -818,7 +818,7 @@ def _load_completed_units(self, checkpoint_dir: str) -> set: def _migrate_legacy_checkpoint(self, checkpoint_path: str, checkpoint_dir: str, units: list): """Migrate a legacy single-file checkpoint to per-unit checkpoint files.""" try: - with open(checkpoint_path, 'r') as f: + with open(checkpoint_path, 'r', encoding="utf-8") as f: checkpoint_data = json.load(f) for cp_unit in checkpoint_data.get("units", []): if cp_unit.get("agent_context") and not cp_unit["agent_context"].get("error"): @@ -998,7 +998,7 @@ def main(): logging.error(f"Error: Input file not found: {input_path}") return 1 - with open(input_path, 'r') as f: + with open(input_path, 'r', encoding="utf-8") as f: dataset = json.load(f) # Enhance @@ -1029,7 +1029,7 @@ def main(): # Write output output_path = Path(args.output) if args.output else input_path - with open(output_path, 'w') as f: + with open(output_path, 'w', encoding="utf-8") as f: json.dump(enhanced, f, indent=2) logging.info(f"Enhanced dataset written to: {output_path}") diff --git a/libs/openant-core/utilities/dynamic_tester/__init__.py b/libs/openant-core/utilities/dynamic_tester/__init__.py index e533f6c..9ec2625 
100644 --- a/libs/openant-core/utilities/dynamic_tester/__init__.py +++ b/libs/openant-core/utilities/dynamic_tester/__init__.py @@ -45,7 +45,7 @@ def run_dynamic_tests( List of DynamicTestResult objects """ # Load pipeline output - with open(pipeline_output_path, "r") as f: + with open(pipeline_output_path, "r", encoding="utf-8") as f: pipeline = json.load(f) findings = pipeline.get("findings", []) @@ -253,13 +253,13 @@ def run_dynamic_tests( report_md = generate_report(results, repo_info["name"], total_cost) report_path = os.path.join(output_dir, "DYNAMIC_TEST_RESULTS.md") - with open(report_path, "w") as f: + with open(report_path, "w", encoding="utf-8") as f: f.write(report_md) print(f"\nReport written to {report_path}", file=sys.stderr) # Save structured results JSON results_path = os.path.join(output_dir, "dynamic_test_results.json") - with open(results_path, "w") as f: + with open(results_path, "w", encoding="utf-8") as f: json.dump({ "repository": repo_info["name"], "total_findings": len(findings), diff --git a/libs/openant-core/utilities/dynamic_tester/docker_executor.py b/libs/openant-core/utilities/dynamic_tester/docker_executor.py index 04a45d3..d8459a5 100644 --- a/libs/openant-core/utilities/dynamic_tester/docker_executor.py +++ b/libs/openant-core/utilities/dynamic_tester/docker_executor.py @@ -74,14 +74,14 @@ def _write_test_files(work_dir: str, generation: dict, source_file: str | None = shutil.copy2(source_file, os.path.join(work_dir, os.path.basename(source_file))) # Write Dockerfile - with open(os.path.join(work_dir, "Dockerfile"), "w") as f: + with open(os.path.join(work_dir, "Dockerfile"), "w", encoding="utf-8") as f: f.write(generation["dockerfile"]) # Write test script test_filename = generation.get("test_filename", "test_exploit.py") test_path = os.path.join(work_dir, test_filename) os.makedirs(os.path.dirname(test_path), exist_ok=True) - with open(test_path, "w") as f: + with open(test_path, "w", encoding="utf-8") as f: f.write(generation["test_script"]) # Write requirements/dependencies file @@ -89,7 +89,7 @@ def _write_test_files(work_dir: str, generation: dict, source_file: str | None = req_filename = generation.get("requirements_filename", "requirements.txt") req_path = os.path.join(work_dir, req_filename) os.makedirs(os.path.dirname(req_path), exist_ok=True) - with open(req_path, "w") as f: + with open(req_path, "w", encoding="utf-8") as f: f.write(generation["requirements"]) # Copy attacker server if needed (before docker-compose so it's available) @@ -98,14 +98,14 @@ def _write_test_files(work_dir: str, generation: dict, source_file: str | None = os.makedirs(attacker_dir, exist_ok=True) shutil.copy2(ATTACKER_SERVER_PATH, os.path.join(attacker_dir, "server.py")) # Write attacker Dockerfile - with open(os.path.join(attacker_dir, "Dockerfile"), "w") as f: + with open(os.path.join(attacker_dir, "Dockerfile"), "w", encoding="utf-8") as f: f.write("FROM python:3.11-slim\nWORKDIR /app\nCOPY server.py .\n" "EXPOSE 9999\nCMD [\"python\", \"server.py\"]\n") # Write docker-compose if multi-service, with sanitization if generation.get("docker_compose"): compose_content = _sanitize_compose(generation["docker_compose"]) - with open(os.path.join(work_dir, "docker-compose.yml"), "w") as f: + with open(os.path.join(work_dir, "docker-compose.yml"), "w", encoding="utf-8") as f: f.write(compose_content) @@ -116,6 +116,8 @@ def _run_command(cmd: list[str], timeout: int, cwd: str = None) -> tuple[str, st cmd, capture_output=True, text=True, + encoding="utf-8", + 
errors="replace", timeout=timeout, cwd=cwd, ) diff --git a/libs/openant-core/utilities/file_io.py b/libs/openant-core/utilities/file_io.py new file mode 100644 index 0000000..bc8d22f --- /dev/null +++ b/libs/openant-core/utilities/file_io.py @@ -0,0 +1,60 @@ +"""Centralized file I/O and subprocess helpers for Windows UTF-8 compatibility. + +On Windows, Python's default encoding is often ``cp1252`` (charmap), which +cannot decode common UTF-8 sequences found in source code. These thin +wrappers ensure that every file open and subprocess call uses UTF-8 +explicitly, preventing ``'charmap' codec can't decode byte ...`` errors. +""" + +import json +import os +import subprocess +from typing import Any, Union + +# Accept str, Path, or any os.PathLike +PathLike = Union[str, os.PathLike] + + +def open_utf8(path: PathLike, mode: str = "r", **kwargs): + """Open a file with UTF-8 encoding by default. + + Drop-in replacement for ``open()`` that sets ``encoding='utf-8'`` unless + the caller explicitly provides a different encoding or opens in binary + mode. + """ + if "b" not in mode and "encoding" not in kwargs: + kwargs["encoding"] = "utf-8" + return open(path, mode, **kwargs) + + +def read_json(path: PathLike) -> Any: + """Read and parse a JSON file using UTF-8 encoding.""" + with open_utf8(path, "r") as f: + return json.load(f) + + +def write_json(path: PathLike, data: Any, **kwargs) -> None: + """Write data as JSON to a file using UTF-8 encoding.""" + kwargs.setdefault("indent", 2) + with open_utf8(path, "w") as f: + json.dump(data, f, **kwargs) + + +def run_utf8(*args, **kwargs) -> subprocess.CompletedProcess: + """Run a subprocess with UTF-8 encoding for text mode. + + Wrapper around ``subprocess.run`` that sets ``encoding='utf-8'`` and + ``errors='replace'`` when ``text=True`` (or its alias + ``universal_newlines=True``) is passed, preventing charmap decode errors + on Windows. + + Note: ``errors='replace'`` substitutes U+FFFD for invalid bytes in + stdout/stderr rather than raising. This is intentional - subprocess + output is used for status display and diagnostics, not for security + analysis (parser results are read from JSON files separately). + Callers can override with ``errors='strict'`` if needed. + """ + if kwargs.get("text") or kwargs.get("universal_newlines"): + kwargs.setdefault("encoding", "utf-8") + kwargs.setdefault("errors", "replace") + return subprocess.run(*args, **kwargs) diff --git a/libs/openant-core/validate_dataset_schema.py b/libs/openant-core/validate_dataset_schema.py index 1312bce..8d884dd 100755 --- a/libs/openant-core/validate_dataset_schema.py +++ b/libs/openant-core/validate_dataset_schema.py @@ -61,7 +61,7 @@ def validate_unit(unit, index): def validate_dataset(path): - with open(path) as f: + with open(path, encoding="utf-8") as f: data = json.load(f) all_errors = [] From 19b0af65ddccb09eaf94067bc446a81b743c5ac6 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 23:10:24 +0300 Subject: [PATCH 2/2] fix: cover Path.read_text/write_text and subprocess.run text-mode for UTF-8 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 1 review fixes for PR #45: - application_context.py, ast_parser.py, dataset_enhancer.py, report/__main__.py, report/generator.py: pass encoding='utf-8' on every Path.read_text() / write_text() call. The previous migration only covered open() calls; pathlib's text helpers also default to the system locale on Windows (cp1252) and crash on non-ASCII source code. 
- parsers/{c,go,javascript,php,ruby}/test_pipeline.py: pass encoding='utf-8', errors='replace' on subprocess.run(text=True) invocations of parser binaries and CodeQL. Only docker_executor.py was migrated before; these other call sites had the same Windows cp1252 hazard. - tests/test_file_io.py: extend regression scan with two new asserts — Path.read_text/write_text without encoding=, and subprocess.run(text=True) without encoding=. Refactored the call-walking logic into a shared helper. All 14 file_io tests pass; full tests/ suite: 98 passed, 22 skipped. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../context/application_context.py | 8 +- libs/openant-core/parsers/c/test_pipeline.py | 4 + libs/openant-core/parsers/go/test_pipeline.py | 10 +- .../parsers/javascript/test_pipeline.py | 8 ++ .../openant-core/parsers/php/test_pipeline.py | 4 + .../openant-core/parsers/python/ast_parser.py | 4 +- .../parsers/python/dataset_enhancer.py | 2 +- .../parsers/ruby/test_pipeline.py | 4 + libs/openant-core/report/__main__.py | 8 +- libs/openant-core/report/generator.py | 10 +- libs/openant-core/tests/test_file_io.py | 104 +++++++++++++++--- 11 files changed, 133 insertions(+), 33 deletions(-) diff --git a/libs/openant-core/context/application_context.py b/libs/openant-core/context/application_context.py index 173a814..bb2109f 100644 --- a/libs/openant-core/context/application_context.py +++ b/libs/openant-core/context/application_context.py @@ -208,7 +208,7 @@ def gather_context_sources(repo_path: Path) -> dict[str, str]: filepath = repo_path / filename if filepath.exists(): try: - content = filepath.read_text(errors="ignore") + content = filepath.read_text(encoding="utf-8", errors="ignore") # Limit size to avoid token overflow if len(content) > 10000: content = content[:10000] + "\n\n[... 
truncated ...]" @@ -289,7 +289,7 @@ def detect_entry_points(repo_path: Path) -> str: continue try: - content = py_file.read_text(errors="ignore") + content = py_file.read_text(encoding="utf-8", errors="ignore") rel_path = py_file.relative_to(repo_path) for category, patterns in ENTRY_POINT_PATTERNS.items(): @@ -308,7 +308,7 @@ def detect_entry_points(repo_path: Path) -> str: continue try: - content = js_file.read_text(errors="ignore") + content = js_file.read_text(encoding="utf-8", errors="ignore") rel_path = js_file.relative_to(repo_path) if re.search(r"express\(\)|require\(['\"]express['\"]\)", content): @@ -340,7 +340,7 @@ def check_manual_override(repo_path: Path) -> ApplicationContext | None: continue try: - content = filepath.read_text() + content = filepath.read_text(encoding="utf-8") if filename.endswith('.json'): # Direct JSON format diff --git a/libs/openant-core/parsers/c/test_pipeline.py b/libs/openant-core/parsers/c/test_pipeline.py index 0475cd9..ec8bf53 100644 --- a/libs/openant-core/parsers/c/test_pipeline.py +++ b/libs/openant-core/parsers/c/test_pipeline.py @@ -383,6 +383,8 @@ def run_codeql_analysis(self) -> bool: create_db_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=600 ) @@ -414,6 +416,8 @@ def run_codeql_analysis(self) -> bool: analyze_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=1800 ) diff --git a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py index 7aa9880..7f59a9c 100644 --- a/libs/openant-core/parsers/go/test_pipeline.py +++ b/libs/openant-core/parsers/go/test_pipeline.py @@ -119,7 +119,9 @@ def setup(self): ['go', 'build', '-o', 'go_parser', '.'], cwd=go_parser_dir, capture_output=True, - text=True + text=True, + encoding="utf-8", + errors="replace", ) if result.returncode != 0: print(f"Error building Go parser: {result.stderr}") @@ -144,6 +146,8 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: command, capture_output=True, text=True, + encoding="utf-8", + errors="replace", cwd=self.parser_dir ) @@ -438,6 +442,8 @@ def run_codeql_analysis(self) -> bool: create_db_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=600 # 10 minute timeout ) @@ -469,6 +475,8 @@ def run_codeql_analysis(self) -> bool: analyze_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=1800 # 30 minute timeout ) diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py index abd7815..d614ed6 100644 --- a/libs/openant-core/parsers/javascript/test_pipeline.py +++ b/libs/openant-core/parsers/javascript/test_pipeline.py @@ -130,6 +130,8 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict: command, capture_output=True, text=True, + encoding="utf-8", + errors="replace", cwd=self.parser_dir ) @@ -293,6 +295,8 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s command, capture_output=True, text=True, + encoding="utf-8", + errors="replace", cwd=self.parser_dir ) @@ -646,6 +650,8 @@ def run_codeql_analysis(self) -> bool: create_db_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=600 # 10 minute timeout ) @@ -677,6 +683,8 @@ def run_codeql_analysis(self) -> bool: analyze_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=1800 # 30 minute timeout ) diff --git 
a/libs/openant-core/parsers/php/test_pipeline.py b/libs/openant-core/parsers/php/test_pipeline.py index e2f9eb2..566b13a 100644 --- a/libs/openant-core/parsers/php/test_pipeline.py +++ b/libs/openant-core/parsers/php/test_pipeline.py @@ -383,6 +383,8 @@ def run_codeql_analysis(self) -> bool: create_db_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=600 ) @@ -414,6 +416,8 @@ def run_codeql_analysis(self) -> bool: analyze_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=1800 ) diff --git a/libs/openant-core/parsers/python/ast_parser.py b/libs/openant-core/parsers/python/ast_parser.py index 64105a5..03d05aa 100644 --- a/libs/openant-core/parsers/python/ast_parser.py +++ b/libs/openant-core/parsers/python/ast_parser.py @@ -35,7 +35,7 @@ def detect_framework(self) -> str: for f in files: try: - content = f.read_text() + content = f.read_text(encoding="utf-8", errors="replace") if "from django" in content or "django.urls" in content: return "django" if "from flask" in content or "Flask(" in content: @@ -76,7 +76,7 @@ def _read_file(self, file_path: Path) -> str: path_str = str(file_path) if path_str not in self.file_cache: try: - self.file_cache[path_str] = file_path.read_text() + self.file_cache[path_str] = file_path.read_text(encoding="utf-8", errors="replace") except Exception as e: print(f"Error reading {file_path}: {e}") self.file_cache[path_str] = "" diff --git a/libs/openant-core/parsers/python/dataset_enhancer.py b/libs/openant-core/parsers/python/dataset_enhancer.py index 340a2f8..1b1f579 100644 --- a/libs/openant-core/parsers/python/dataset_enhancer.py +++ b/libs/openant-core/parsers/python/dataset_enhancer.py @@ -29,7 +29,7 @@ def _read_file(self, file_path: Path) -> str: path_str = str(file_path) if path_str not in self.file_cache: try: - self.file_cache[path_str] = file_path.read_text() + self.file_cache[path_str] = file_path.read_text(encoding="utf-8", errors="replace") except Exception as e: self.file_cache[path_str] = "" return self.file_cache[path_str] diff --git a/libs/openant-core/parsers/ruby/test_pipeline.py b/libs/openant-core/parsers/ruby/test_pipeline.py index a38b66d..2eb0901 100644 --- a/libs/openant-core/parsers/ruby/test_pipeline.py +++ b/libs/openant-core/parsers/ruby/test_pipeline.py @@ -383,6 +383,8 @@ def run_codeql_analysis(self) -> bool: create_db_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=600 ) @@ -414,6 +416,8 @@ def run_codeql_analysis(self) -> bool: analyze_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=1800 ) diff --git a/libs/openant-core/report/__main__.py b/libs/openant-core/report/__main__.py index fbe6515..a54be63 100644 --- a/libs/openant-core/report/__main__.py +++ b/libs/openant-core/report/__main__.py @@ -19,7 +19,7 @@ def cmd_summary(args): """Generate summary report.""" - pipeline_data = json.loads(Path(args.input).read_text()) + pipeline_data = json.loads(Path(args.input).read_text(encoding="utf-8")) try: validate_pipeline_output(pipeline_data) @@ -32,14 +32,14 @@ def cmd_summary(args): output_path = Path(args.output) if args.output else Path("SUMMARY_REPORT.md") output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_text(report) + output_path.write_text(report, encoding="utf-8") print(f" -> {output_path}") print(f" Cost: ${usage['cost_usd']:.4f} ({usage['total_tokens']:,} tokens)") def cmd_disclosures(args): """Generate disclosure documents.""" - pipeline_data = 
json.loads(Path(args.input).read_text()) + pipeline_data = json.loads(Path(args.input).read_text(encoding="utf-8")) try: validate_pipeline_output(pipeline_data) @@ -62,7 +62,7 @@ def cmd_disclosures(args): safe_name = finding["short_name"].replace(" ", "_").upper() filename = f"DISCLOSURE_{i:02d}_{safe_name}.md" - (output_dir / filename).write_text(disclosure) + (output_dir / filename).write_text(disclosure, encoding="utf-8") print(f" -> {output_dir / filename}") count += 1 diff --git a/libs/openant-core/report/generator.py b/libs/openant-core/report/generator.py index c996250..931f9c3 100644 --- a/libs/openant-core/report/generator.py +++ b/libs/openant-core/report/generator.py @@ -63,7 +63,7 @@ def _check_api_key(): def load_prompt(name: str) -> str: """Load a prompt template from the prompts directory.""" - return (PROMPTS_DIR / f"{name}.txt").read_text() + return (PROMPTS_DIR / f"{name}.txt").read_text(encoding="utf-8") def merge_dynamic_results(pipeline_data: dict, pipeline_path: str) -> dict: @@ -76,7 +76,7 @@ def merge_dynamic_results(pipeline_data: dict, pipeline_path: str) -> dict: if not dynamic_path.exists(): return pipeline_data - dynamic_data = json.loads(dynamic_path.read_text()) + dynamic_data = json.loads(dynamic_path.read_text(encoding="utf-8")) results_by_id = {} for result in dynamic_data.get("results", []): fid = result.get("finding_id") @@ -233,7 +233,7 @@ def generate_disclosure(vulnerability_data: dict, product_name: str) -> tuple[st def generate_all(pipeline_path: str, output_dir: str) -> None: """Generate all reports from a pipeline output file.""" - pipeline_data = json.loads(Path(pipeline_path).read_text()) + pipeline_data = json.loads(Path(pipeline_path).read_text(encoding="utf-8")) try: validate_pipeline_output(pipeline_data) @@ -247,7 +247,7 @@ def generate_all(pipeline_path: str, output_dir: str) -> None: # Generate summary report print("Generating summary report...") summary, _usage = generate_summary_report(pipeline_data) - (output_path / "SUMMARY_REPORT.md").write_text(summary) + (output_path / "SUMMARY_REPORT.md").write_text(summary, encoding="utf-8") print(f" -> {output_path / 'SUMMARY_REPORT.md'}") # Generate disclosure for each confirmed vulnerability @@ -265,7 +265,7 @@ def generate_all(pipeline_path: str, output_dir: str) -> None: safe_name = finding["short_name"].replace(" ", "_").upper() filename = f"DISCLOSURE_{i:02d}_{safe_name}.md" - (disclosures_dir / filename).write_text(disclosure) + (disclosures_dir / filename).write_text(disclosure, encoding="utf-8") print(f" -> {disclosures_dir / filename}") diff --git a/libs/openant-core/tests/test_file_io.py b/libs/openant-core/tests/test_file_io.py index 89b763f..a82f3c8 100644 --- a/libs/openant-core/tests/test_file_io.py +++ b/libs/openant-core/tests/test_file_io.py @@ -252,6 +252,25 @@ def _has_binary_mode(call_args: str) -> bool: return re.search(r"""(['"])([rwax+]*b[rwax+]*)\1""", call_args) is not None +def _scan_calls(scrubbed: str, original: str, call_re: re.Pattern): + """Yield (line_number, args_text, original_line) for each call match.""" + for m in call_re.finditer(scrubbed): + i = m.end() + depth = 1 + while i < len(scrubbed) and depth: + ch = scrubbed[i] + if ch == "(": + depth += 1 + elif ch == ")": + depth -= 1 + i += 1 + if depth != 0: + continue + args = original[m.end():i - 1] + line = original[:m.start()].count("\n") + 1 + yield line, args, original.splitlines()[line - 1].strip() + + def test_no_bare_open_in_non_test_code(): """Regression: every text-mode `open(` call in non-test 
code must specify encoding=, otherwise Windows defaults to cp1252 and crashes on non-ASCII @@ -261,28 +280,81 @@ def test_no_bare_open_in_non_test_code(): for path in _iter_python_sources(CORE_ROOT): text = path.read_text(encoding="utf-8") scrubbed = _strip_strings_and_comments(text) - for m in _OPEN_CALL_RE.finditer(scrubbed): - # Find matching close paren in the SCRUBBED text (offsets and parens preserved). - i = m.end() - depth = 1 - while i < len(scrubbed) and depth: - ch = scrubbed[i] - if ch == "(": - depth += 1 - elif ch == ")": - depth -= 1 - i += 1 - if depth != 0: - continue - args = text[m.end():i - 1] + for line, args, src in _scan_calls(scrubbed, text, _OPEN_CALL_RE): if _has_binary_mode(args) or _has_encoding(args): continue - line = text[:m.start()].count("\n") + 1 rel = path.relative_to(CORE_ROOT).as_posix() - offenders.append(f"{rel}:{line}: {text.splitlines()[line - 1].strip()}") + offenders.append(f"{rel}:{line}: {src}") assert not offenders, ( "Found bare open() calls without encoding= in non-test code. " "Use utilities.file_io.open_utf8 / read_json / write_json or pass " "encoding='utf-8' explicitly:\n " + "\n ".join(offenders) ) + + +# Match `.read_text(` / `.write_text(` method calls (any object, including +# Path objects). The leading dot keeps `text=` kwargs and bare identifiers +# that merely end in read_text/write_text from matching. +_PATH_TEXT_RE = re.compile(r"\.(?:read_text|write_text)\s*\(") + + +def test_no_bare_pathlib_text_io_in_non_test_code(): + """Regression: ``Path.read_text()`` / ``write_text()`` default to the + locale's preferred encoding (``locale.getpreferredencoding(False)``) + unless UTF-8 mode (``-X utf8``) is enabled. On Windows that is cp1252, + which crashes on non-ASCII content. Every call in non-test code must + pass ``encoding=`` explicitly. + """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for line, args, src in _scan_calls(scrubbed, text, _PATH_TEXT_RE): + if _has_encoding(args): + continue + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {src}") + + assert not offenders, ( + "Found Path.read_text()/write_text() calls without encoding= in " + "non-test code. Pass encoding='utf-8' explicitly:\n " + + "\n ".join(offenders) + ) + + +# Match explicit `subprocess.run(` calls only. Aliased forms such as +# `sp.run(` are deliberately not matched; flagging every `.run(` would be +# far too noisy. +_SUBPROCESS_RUN_RE = re.compile(r"(?<![\w.])subprocess\.run\s*\(") + + +def _has_text_mode(call_args: str) -> bool: + return ( + re.search(r"\btext\s*=\s*True", call_args) is not None + or re.search(r"\buniversal_newlines\s*=\s*True", call_args) is not None + ) + + +def test_no_bare_text_mode_subprocess_in_non_test_code(): + """Regression: ``subprocess.run(..., text=True)`` decodes stdout/stderr + with the system locale on Windows (cp1252), which crashes on non-ASCII + output from parsers, codeql, etc. Every text-mode subprocess call must + pass ``encoding=`` explicitly (or use ``utilities.file_io.run_utf8``). 
+ """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for line, args, src in _scan_calls(scrubbed, text, _SUBPROCESS_RUN_RE): + if not _has_text_mode(args): + continue + if _has_encoding(args): + continue + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {src}") + + assert not offenders, ( + "Found subprocess.run(..., text=True) calls without encoding= in " + "non-test code. Pass encoding='utf-8', errors='replace' explicitly " + "(or use utilities.file_io.run_utf8):\n " + "\n ".join(offenders) + )