Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 50 additions & 25 deletions libs/openant-core/parsers/go/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@
from pathlib import Path
from typing import Set


def _stdout_supports_unicode() -> bool:
"""Return True if sys.stdout can emit the symbols we use for status.

Returns False when stdout is piped or redirected (common in CI) and
the encoding cannot be determined — this degrades output to plain ASCII
rather than raising UnicodeEncodeError at runtime.
"""
encoding = getattr(sys.stdout, "encoding", None)
if not encoding:
return False
try:
# Probe with the actual symbols we emit. This catches cp1252 and
# other limited code pages without us having to enumerate them.
"✓✗→".encode(encoding)
return True
except (UnicodeEncodeError, LookupError):
return False


_UNICODE_OK = _stdout_supports_unicode()
SYM_OK = "✓" if _UNICODE_OK else "OK"
SYM_FAIL = "✗" if _UNICODE_OK else "FAIL"
SYM_ARROW = "→" if _UNICODE_OK else "->"

# Add parent directory to path for utilities import
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from utilities.context_enhancer import ContextEnhancer
Expand Down Expand Up @@ -158,7 +183,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:
}

if result.returncode == 0:
print(f" Success ({elapsed:.2f}s)")
print(f"{SYM_OK} Success ({elapsed:.2f}s)")
print()
# Print stderr (often contains summary info)
if result.stderr:
Expand All @@ -172,7 +197,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:
data = json.load(f)
stage_result['summary'] = self._summarize_output(name, data)
else:
print(f" Failed (exit code {result.returncode})")
print(f"{SYM_FAIL} Failed (exit code {result.returncode})")
print()
if result.stderr:
print("STDERR:")
Expand All @@ -185,7 +210,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:

except Exception as e:
elapsed = (datetime.now() - start_time).total_seconds()
print(f" Error: {e}")
print(f"{SYM_FAIL} Error: {e}")
return {
'success': False,
'elapsed_seconds': elapsed,
Expand Down Expand Up @@ -378,17 +403,17 @@ def apply_reachability_filter(self) -> bool:
'summary': summary
}

print(f" Success ({elapsed:.2f}s)")
print(f"{SYM_OK} Success ({elapsed:.2f}s)")
print(f" Entry points detected: {len(self.entry_points)}")
print(f" Units: {original_count} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)")
print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)")
print()

self.results['stages']['reachability_filter'] = result
return True

except Exception as e:
elapsed = (datetime.now() - start_time).total_seconds()
print(f" Error: {e}")
print(f"{SYM_FAIL} Error: {e}")
import traceback
traceback.print_exc()
result = {
Expand Down Expand Up @@ -442,7 +467,7 @@ def run_codeql_analysis(self) -> bool:
)

if result.returncode != 0:
print(f" CodeQL database creation failed")
print(f"{SYM_FAIL} CodeQL database creation failed")
print(f" stderr: {result.stderr[:500] if result.stderr else 'none'}")
elapsed = (datetime.now() - start_time).total_seconds()
self.results['stages']['codeql_analysis'] = {
Expand Down Expand Up @@ -473,7 +498,7 @@ def run_codeql_analysis(self) -> bool:
)

if result.returncode != 0:
print(f" CodeQL analysis failed")
print(f"{SYM_FAIL} CodeQL analysis failed")
print(f" stderr: {result.stderr[:500] if result.stderr else 'none'}")
elapsed = (datetime.now() - start_time).total_seconds()
self.results['stages']['codeql_analysis'] = {
Expand All @@ -489,7 +514,7 @@ def run_codeql_analysis(self) -> bool:
# Step 3: Parse SARIF output
print("Parsing results...")
if not os.path.exists(sarif_output):
print("✗ SARIF output not found")
print(f"{SYM_FAIL} SARIF output not found")
elapsed = (datetime.now() - start_time).total_seconds()
self.results['stages']['codeql_analysis'] = {
'success': False,
Expand Down Expand Up @@ -550,7 +575,7 @@ def run_codeql_analysis(self) -> bool:
'summary': summary
}

print(f" Success ({elapsed:.2f}s)")
print(f"{SYM_OK} Success ({elapsed:.2f}s)")
print(f" Total findings: {len(self.codeql_findings)}")
print(f" Unique files: {summary['unique_files']}")
if summary['by_level']:
Expand All @@ -562,7 +587,7 @@ def run_codeql_analysis(self) -> bool:

except FileNotFoundError:
elapsed = (datetime.now() - start_time).total_seconds()
print("✗ CodeQL not found. Please install CodeQL CLI.")
print(f"{SYM_FAIL} CodeQL not found. Please install CodeQL CLI.")
print(" See: https://docs.github.com/en/code-security/codeql-cli")
self.results['stages']['codeql_analysis'] = {
'success': False,
Expand All @@ -573,7 +598,7 @@ def run_codeql_analysis(self) -> bool:

except subprocess.TimeoutExpired:
elapsed = (datetime.now() - start_time).total_seconds()
print("✗ CodeQL analysis timed out")
print(f"{SYM_FAIL} CodeQL analysis timed out")
self.results['stages']['codeql_analysis'] = {
'success': False,
'elapsed_seconds': elapsed,
Expand All @@ -583,7 +608,7 @@ def run_codeql_analysis(self) -> bool:

except Exception as e:
elapsed = (datetime.now() - start_time).total_seconds()
print(f" Error: {e}")
print(f"{SYM_FAIL} Error: {e}")
import traceback
traceback.print_exc()
self.results['stages']['codeql_analysis'] = {
Expand Down Expand Up @@ -695,18 +720,18 @@ def apply_codeql_filter(self) -> bool:
'summary': summary
}

print(f" Success ({elapsed:.2f}s)")
print(f"{SYM_OK} Success ({elapsed:.2f}s)")
print(f" CodeQL findings: {len(self.codeql_findings)}")
print(f" Flagged function units: {len(self.codeql_flagged_units)}")
print(f" Units: {original_count} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)")
print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)")
print()

self.results['stages']['codeql_filter'] = result
return True

except Exception as e:
elapsed = (datetime.now() - start_time).total_seconds()
print(f" Error: {e}")
print(f"{SYM_FAIL} Error: {e}")
import traceback
traceback.print_exc()
result = {
Expand Down Expand Up @@ -784,14 +809,14 @@ def run_context_enhancer(self) -> bool:
}

print()
print(f" Success ({elapsed:.2f}s)")
print(f"{SYM_OK} Success ({elapsed:.2f}s)")

self.results['stages']['context_enhancer'] = result
return True

except Exception as e:
elapsed = (datetime.now() - start_time).total_seconds()
print(f" Error: {e}")
print(f"{SYM_FAIL} Error: {e}")
import traceback
traceback.print_exc()
result = {
Expand Down Expand Up @@ -873,20 +898,20 @@ def apply_exploitable_filter(self) -> bool:
'summary': summary
}

print(f" Success ({elapsed:.2f}s)")
print(f"{SYM_OK} Success ({elapsed:.2f}s)")
print(f" Classification breakdown:")
for cls, count in sorted(classification_counts.items()):
marker = "→" if cls == "exploitable" else " "
marker = SYM_ARROW if cls == "exploitable" else " "
print(f" {marker} {cls}: {count}")
print(f" Units: {original_count} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)")
print(f" Units: {original_count} {SYM_ARROW} {len(filtered_units)} ({summary['reduction_percentage']}% reduction)")
print()

self.results['stages']['exploitable_filter'] = result
return True

except Exception as e:
elapsed = (datetime.now() - start_time).total_seconds()
print(f" Error: {e}")
print(f"{SYM_FAIL} Error: {e}")
import traceback
traceback.print_exc()
result = {
Expand Down Expand Up @@ -966,13 +991,13 @@ def run_full_pipeline(self):
self.results['success'] = all_success

if all_success:
print("✓ All stages completed successfully")
print(f"{SYM_OK} All stages completed successfully")
else:
print("✗ Some stages failed")
print(f"{SYM_FAIL} Some stages failed")

print()
for stage_name, stage_result in self.results['stages'].items():
status = "✓" if stage_result.get('success') else "✗"
status = SYM_OK if stage_result.get('success') else SYM_FAIL
elapsed = stage_result.get('elapsed_seconds', 0)
print(f" {status} {stage_name}: {elapsed:.2f}s")

Expand Down
Loading
Loading