diff --git a/pyproject.toml b/pyproject.toml index 1b7a1fcc6..221405a88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", "email-validator >=2.2.0,<3.0.0", + "rich >=13.5.3,<15.0.0", ] keywords = [] # https://pypi.org/classifiers/ diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index d1180d9bb..a312afd1d 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -20,6 +20,7 @@ ) from macaron.config.defaults import create_defaults, load_defaults from macaron.config.global_config import global_config +from macaron.console import RichConsoleHandler, access_handler from macaron.errors import ConfigurationError from macaron.output_reporter.reporter import HTMLReporter, JSONReporter, PolicyReporter from macaron.policy_engine.policy_engine import run_policy_engine, show_prelude @@ -63,7 +64,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None if analyzer_single_args.provenance_expectation is not None: if not os.path.exists(analyzer_single_args.provenance_expectation): logger.critical( - 'The provenance expectation file "%s" does not exist.', analyzer_single_args.provenance_expectation + 'The provenance expectation file "%s" does not exist.', + analyzer_single_args.provenance_expectation, ) sys.exit(os.EX_OSFILE) global_config.load_expectation_files(analyzer_single_args.provenance_expectation) @@ -72,7 +74,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None if analyzer_single_args.python_venv is not None: if not os.path.exists(analyzer_single_args.python_venv): logger.critical( - 'The Python virtual environment path "%s" does not exist.', analyzer_single_args.python_venv + 'The Python virtual environment path "%s" does not exist.', + analyzer_single_args.python_venv, ) sys.exit(os.EX_OSFILE) global_config.load_python_venv(analyzer_single_args.python_venv) @@ -95,7 +98,10 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None else: user_provided_local_maven_repo = analyzer_single_args.local_maven_repo if not os.path.isdir(user_provided_local_maven_repo): - logger.error("The user provided local Maven repo at %s is not valid.", user_provided_local_maven_repo) + logger.error( + "The user provided local Maven repo at %s is not valid.", + user_provided_local_maven_repo, + ) sys.exit(os.EX_USAGE) global_config.local_maven_repo = user_provided_local_maven_repo @@ -111,7 +117,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None lstrip_blocks=True, ) html_reporter = HTMLReporter( - env=custom_jinja_env, target_template=os.path.basename(analyzer_single_args.template_path) + env=custom_jinja_env, + target_template=os.path.basename(analyzer_single_args.template_path), ) if not html_reporter.template: logger.error("Exiting because the custom template cannot be found.") @@ -207,8 +214,11 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int: result = run_policy_engine(verify_policy_args.database, policy_content) vsa = generate_vsa(policy_content=policy_content, policy_result=result) + # Retrieve the console handler previously configured via the access_handler. + rich_handler = access_handler.get_handler() if vsa is not None: vsa_filepath = os.path.join(global_config.output_path, "vsa.intoto.jsonl") + rich_handler.update_vsa(os.path.relpath(vsa_filepath, os.getcwd())) logger.info( "Generating the Verification Summary Attestation (VSA) to %s.", os.path.relpath(vsa_filepath, os.getcwd()), @@ -222,8 +232,12 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int: file.write(json.dumps(vsa)) except OSError as err: logger.error( - "Could not generate the VSA to %s. Error: %s", os.path.relpath(vsa_filepath, os.getcwd()), err + "Could not generate the VSA to %s. Error: %s", + os.path.relpath(vsa_filepath, os.getcwd()), + err, ) + else: + rich_handler.update_vsa("No VSA generated.") policy_reporter = PolicyReporter() policy_reporter.generate(global_config.output_path, result) @@ -290,16 +304,23 @@ def find_source(find_args: argparse.Namespace) -> int: def perform_action(action_args: argparse.Namespace) -> None: """Perform the indicated action of Macaron.""" + rich_handler = access_handler.get_handler() match action_args.action: case "dump-defaults": + if not action_args.disable_rich_output: + rich_handler.start("dump-defaults") # Create the defaults.ini file in the output dir and exit. create_defaults(action_args.output_dir, os.getcwd()) sys.exit(os.EX_OK) case "verify-policy": + if not action_args.disable_rich_output: + rich_handler.start("verify-policy") sys.exit(verify_policy(action_args)) case "analyze": + if not action_args.disable_rich_output: + rich_handler.start("analyze") if not global_config.gh_token: logger.error("GitHub access token not set.") sys.exit(os.EX_USAGE) @@ -317,6 +338,8 @@ def perform_action(action_args: argparse.Namespace) -> None: analyze_slsa_levels_single(action_args) case "find-source": + if not action_args.disable_rich_output: + rich_handler.start("find-source") try: for git_service in GIT_SERVICES: git_service.load_defaults() @@ -329,6 +352,8 @@ def perform_action(action_args: argparse.Namespace) -> None: find_source(action_args) case "gen-build-spec": + if not action_args.disable_rich_output: + rich_handler.start("gen-build-spec") sys.exit(gen_build_spec(action_args)) case _: @@ -393,6 +418,13 @@ def main(argv: list[str] | None = None) -> None: action="store_true", ) + main_parser.add_argument( + "--disable-rich-output", + default=False, + help="Disable Rich UI output", + action="store_true", + ) + main_parser.add_argument( "-o", "--output-dir", @@ -531,7 +563,10 @@ def main(argv: list[str] | None = None) -> None: ) # Dump the default values. - sub_parser.add_parser(name="dump-defaults", description="Dumps the defaults.ini file to the output directory.") + sub_parser.add_parser( + name="dump-defaults", + description="Dumps the defaults.ini file to the output directory.", + ) # Verify the Datalog policy. vp_parser = sub_parser.add_parser(name="verify-policy") @@ -593,65 +628,98 @@ def main(argv: list[str] | None = None) -> None: main_parser.print_help() sys.exit(os.EX_USAGE) - if args.verbose: - log_level = logging.DEBUG - log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] [%(levelname)s] %(message)s" - else: - log_level = logging.INFO - log_format = "%(asctime)s [%(levelname)s] %(message)s" - # Set global logging config. We need the stream handler for the initial # output directory checking log messages. - st_handler = logging.StreamHandler(sys.stdout) - logging.basicConfig(format=log_format, handlers=[st_handler], force=True, level=log_level) + st_handler: logging.StreamHandler = logging.StreamHandler(sys.stdout) + rich_handler: RichConsoleHandler = access_handler.set_handler(args.verbose) + if args.disable_rich_output: + if args.verbose: + log_level = logging.DEBUG + log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] [%(levelname)s] %(message)s" + else: + log_level = logging.INFO + log_format = "%(asctime)s [%(levelname)s] %(message)s" + st_handler = logging.StreamHandler(sys.stdout) + logging.basicConfig(format=log_format, handlers=[st_handler], force=True, level=log_level) + else: + if args.verbose: + log_level = logging.DEBUG + log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] %(message)s" + else: + log_level = logging.INFO + log_format = "%(asctime)s %(message)s" + rich_handler = access_handler.set_handler(args.verbose) + logging.basicConfig(format=log_format, handlers=[rich_handler], force=True, level=log_level) - # Set the output directory. - if not args.output_dir: - logger.error("The output path cannot be empty. Exiting ...") - sys.exit(os.EX_USAGE) + try: + # Set the output directory. + if not args.output_dir: + logger.error("The output path cannot be empty. Exiting ...") + sys.exit(os.EX_USAGE) - if os.path.isfile(args.output_dir): - logger.error("The output directory already exists. Exiting ...") - sys.exit(os.EX_USAGE) + if os.path.isfile(args.output_dir): + logger.error("The output directory already exists. Exiting ...") + sys.exit(os.EX_USAGE) - if os.path.isdir(args.output_dir): - logger.info("Setting the output directory to %s", os.path.relpath(args.output_dir, os.getcwd())) - else: - logger.info("No directory at %s. Creating one ...", os.path.relpath(args.output_dir, os.getcwd())) - os.makedirs(args.output_dir) - - # Add file handler to the root logger. Remove stream handler from the - # root logger to prevent dependencies printing logs to stdout. - debug_log_path = os.path.join(args.output_dir, "debug.log") - log_file_handler = logging.FileHandler(debug_log_path, "w") - log_file_handler.setFormatter(logging.Formatter(log_format)) - logging.getLogger().removeHandler(st_handler) - logging.getLogger().addHandler(log_file_handler) - - # Add StreamHandler to the Macaron logger only. - mcn_logger = logging.getLogger("macaron") - mcn_logger.addHandler(st_handler) - - logger.info("The logs will be stored in debug.log") - - # Set Macaron's global configuration. - # The path to provenance expectation files will be updated if - # set through analyze sub-command. - global_config.load( - macaron_path=macaron.MACARON_PATH, - output_path=args.output_dir, - build_log_path=os.path.join(args.output_dir, "build_log"), - debug_level=log_level, - local_repos_path=args.local_repos_path, - resources_path=os.path.join(macaron.MACARON_PATH, "resources"), - ) + if os.path.isdir(args.output_dir): + logger.info( + "Setting the output directory to %s", + os.path.relpath(args.output_dir, os.getcwd()), + ) + else: + logger.info( + "No directory at %s. Creating one ...", + os.path.relpath(args.output_dir, os.getcwd()), + ) + os.makedirs(args.output_dir) + + # Add file handler to the root logger. Remove stream handler from the + # root logger to prevent dependencies printing logs to stdout. + debug_log_path = os.path.join(args.output_dir, "debug.log") + log_file_handler = logging.FileHandler(debug_log_path, "w") + log_file_handler.setFormatter(logging.Formatter(log_format)) + if args.disable_rich_output: + logging.getLogger().removeHandler(st_handler) + else: + logging.getLogger().removeHandler(rich_handler) + logging.getLogger().addHandler(log_file_handler) + + # Add StreamHandler to the Macaron logger only. + mcn_logger = logging.getLogger("macaron") + if args.disable_rich_output: + mcn_logger.addHandler(st_handler) + else: + mcn_logger.addHandler(rich_handler) + + logger.info("The logs will be stored in debug.log") + + # Set Macaron's global configuration. + # The path to provenance expectation files will be updated if + # set through analyze sub-command. + global_config.load( + macaron_path=macaron.MACARON_PATH, + output_path=args.output_dir, + build_log_path=os.path.join(args.output_dir, "build_log"), + debug_level=log_level, + local_repos_path=args.local_repos_path, + resources_path=os.path.join(macaron.MACARON_PATH, "resources"), + ) - # Load the default values from defaults.ini files. - if not load_defaults(args.defaults_path): - logger.error("Exiting because the defaults configuration could not be loaded.") - sys.exit(os.EX_NOINPUT) + # Load the default values from defaults.ini files. + if not load_defaults(args.defaults_path): + logger.error("Exiting because the defaults configuration could not be loaded.") + sys.exit(os.EX_NOINPUT) - perform_action(args) + perform_action(args) + except KeyboardInterrupt: + if not args.disable_rich_output: + rich_handler.error("Macaron failed: Interrupted by user") + sys.exit(os.EX_SOFTWARE) + finally: + if args.disable_rich_output: + st_handler.close() + else: + rich_handler.close() def _get_token_from_dict_or_env(token: str, token_dict: dict[str, str]) -> str: diff --git a/src/macaron/build_spec_generator/build_spec_generator.py b/src/macaron/build_spec_generator/build_spec_generator.py index 4262f7e6a..dd1217cd0 100644 --- a/src/macaron/build_spec_generator/build_spec_generator.py +++ b/src/macaron/build_spec_generator/build_spec_generator.py @@ -14,6 +14,7 @@ from macaron.build_spec_generator.build_command_patcher import PatchCommandBuildTool, PatchValueType from macaron.build_spec_generator.reproducible_central.reproducible_central import gen_reproducible_central_build_spec +from macaron.console import access_handler from macaron.path_utils.purl_based_path import get_purl_based_dir logger: logging.Logger = logging.getLogger(__name__) @@ -131,6 +132,8 @@ def gen_build_spec_for_purl( build_spec_format.value, os.path.relpath(build_spec_filepath, os.getcwd()), ) + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec("Build Spec Path:", os.path.relpath(build_spec_filepath, os.getcwd())) try: with open(build_spec_filepath, mode="w", encoding="utf-8") as file: file.write(build_spec_content) diff --git a/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py b/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py index 326eea794..305d42be4 100644 --- a/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py +++ b/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py @@ -23,6 +23,7 @@ lookup_build_tools_check, lookup_latest_component, ) +from macaron.console import access_handler from macaron.errors import QueryMacaronDatabaseError from macaron.slsa_analyzer.checks.build_tool_check import BuildToolFacts @@ -253,6 +254,11 @@ def get_rc_build_tool_name( BuildToolFacts.__tablename__, [(fact.build_tool_name, fact.language) for fact in build_tool_facts], ) + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec( + "Build Tools:", + "\n".join([f"{fact.build_tool_name} ({fact.language})" for fact in build_tool_facts]), + ) return _get_rc_build_tool_name_from_build_facts(build_tool_facts) @@ -349,8 +355,13 @@ def gen_reproducible_central_build_spec( group = purl.namespace artifact = purl.name version = purl.version + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec("Package URL:", purl.to_string()) if group is None or version is None: logger.error("Missing group and/or version for purl %s.", purl.to_string()) + rich_handler.update_gen_build_spec("Repository URL:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Commit Hash:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Build Tools:", "[red]FAILED[/]") return None try: @@ -371,6 +382,9 @@ def gen_reproducible_central_build_spec( + "Please check if an analysis for it exists in the database.", purl.to_string(), ) + rich_handler.update_gen_build_spec("Repository URL:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Commit Hash:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Build Tools:", "[red]FAILED[/]") return None latest_component_repository = latest_component.repository @@ -386,6 +400,8 @@ def gen_reproducible_central_build_spec( latest_component_repository.remote_path, latest_component_repository.commit_sha, ) + rich_handler.update_gen_build_spec("Repository URL:", latest_component_repository.remote_path) + rich_handler.update_gen_build_spec("Commit Hash:", latest_component_repository.commit_sha) # Getting the RC build tool name from the build tool check facts. rc_build_tool_name = get_rc_build_tool_name( diff --git a/src/macaron/config/defaults.py b/src/macaron/config/defaults.py index 0ac469604..a5b487c0b 100644 --- a/src/macaron/config/defaults.py +++ b/src/macaron/config/defaults.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module provides functions to manage default values.""" @@ -9,6 +9,8 @@ import pathlib import shutil +from macaron.console import access_handler + logger: logging.Logger = logging.getLogger(__name__) @@ -162,14 +164,17 @@ def create_defaults(output_path: str, cwd_path: str) -> bool: # Since we have only one defaults.ini file and ConfigParser.write does not # preserve the comments, copy the file directly. dest_path = os.path.join(output_path, "defaults.ini") + rich_handler = access_handler.get_handler() try: shutil.copy2(src_path, dest_path) logger.info( "Dumped the default values in %s.", os.path.relpath(os.path.join(output_path, "defaults.ini"), cwd_path), ) + rich_handler.update_dump_defaults(os.path.relpath(dest_path, cwd_path)) return True # We catch OSError to support errors on different platforms. except OSError as error: logger.error("Failed to create %s: %s.", os.path.relpath(dest_path, cwd_path), error) + rich_handler.update_dump_defaults("[bold red]Failed[/]") return False diff --git a/src/macaron/console.py b/src/macaron/console.py new file mode 100644 index 000000000..f1d9a1a18 --- /dev/null +++ b/src/macaron/console.py @@ -0,0 +1,546 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module implements a rich console handler for logging.""" + +import logging +import time +from typing import Any + +from rich.console import Group, RenderableType +from rich.live import Live +from rich.logging import RichHandler +from rich.panel import Panel +from rich.progress import BarColumn, MofNCompleteColumn, Progress, TaskID, TextColumn +from rich.rule import Rule +from rich.status import Status +from rich.table import Table + + +class RichConsoleHandler(RichHandler): + """A rich console handler for logging with rich formatting and live updates.""" + + def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: + """ + Initialize the RichConsoleHandler. + + Parameters + ---------- + verbose : bool, optional + if True, enables verbose logging, by default False + args + Variable length argument list. + kwargs + Arbitrary keyword arguments. + """ + super().__init__(*args, **kwargs) + self.setLevel(logging.DEBUG) + self.command = "" + self.logs: list[str] = [] + self.description_table = Table(show_header=False, box=None) + self.description_table_content: dict[str, str | Status] = { + "Package URL:": Status("[green]Processing[/]"), + "Local Cloned Path:": Status("[green]Processing[/]"), + "Remote Path:": Status("[green]Processing[/]"), + "Branch:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "Commit Date:": Status("[green]Processing[/]"), + "Excluded Checks:": Status("[green]Processing[/]"), + "Final Checks:": Status("[green]Processing[/]"), + "CI Services:": Status("[green]Processing[/]"), + "Build Tools:": Status("[green]Processing[/]"), + } + self.progress = Progress( + TextColumn(" RUNNING ANALYSIS"), + BarColumn(bar_width=None, complete_style="green"), + MofNCompleteColumn(), + ) + self.task_id: TaskID + self.progress_table = Table(show_header=False, box=None) + self.checks: dict[str, str] = {} + self.failed_checks_table = Table(show_header=False, box=None) + self.summary_table = Table(show_header=False, box=None) + self.report_table = Table(show_header=False, box=None) + self.reports = { + "HTML Report": "Not Generated", + "Dependencies Report": "Not Generated", + "JSON Report": "Not Generated", + } + self.components_violates_table = Table(show_header=False, box=None) + self.components_satisfy_table = Table(show_header=False, box=None) + self.policy_summary_table = Table(show_header=False, box=None) + self.policy_summary: dict[str, str | Status] = { + "Passed Policies": "None", + "Failed Policies": "None", + "Policy Report": Status("[green]Generating[/]"), + } + self.verification_summary_attestation: str | None = None + self.find_source_table = Table(show_header=False, box=None) + self.find_source_content: dict[str, str | Status] = { + "Repository URL:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "JSON Report:": "Not Generated", + } + for key, value in self.find_source_content.items(): + self.find_source_table.add_row(key, value) + self.dump_defaults: str | Status = Status("[green]Generating[/]") + self.gen_build_spec: dict[str, str | Status] = { + "Package URL:": Status("[green]Processing[/]"), + "Repository URL:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "Build Tools:": Status("[green]Processing[/]"), + "Build Spec Path:": "Not Generated", + } + self.gen_build_spec_table = Table(show_header=False, box=None) + for key, value in self.gen_build_spec.items(): + self.gen_build_spec_table.add_row(key, value) + self.verbose = verbose + self.verbose_panel = Panel( + "\n".join(self.logs), + title="Verbose Mode", + title_align="left", + border_style="blue", + ) + self.error_message: str = "" + self.live = Live(get_renderable=self.make_layout, refresh_per_second=10) + + def emit(self, record: logging.LogRecord) -> None: + """ + Emit a log record with rich formatting. + + Parameters + ---------- + record : logging.LogRecord + The log record to be emitted. + """ + log_time = time.strftime("%H:%M:%S") + msg = self.format(record) + + if record.levelno >= logging.ERROR: + self.logs.append(f"[red][ERROR][/red] {log_time} {msg}") + elif record.levelno >= logging.WARNING: + self.logs.append(f"[yellow][WARNING][/yellow] {log_time} {msg}") + else: + self.logs.append(f"[blue][INFO][/blue] {log_time} {msg}") + + self.verbose_panel.renderable = "\n".join(self.logs) + + def add_description_table_content(self, key: str, value: str | Status) -> None: + """ + Add or update a key-value pair in the description table. + + Parameters + ---------- + key : str + The key to be added or updated. + value : str or Status + The value associated with the key. + """ + self.description_table_content[key] = value + description_table = Table(show_header=False, box=None) + description_table.add_column("Details", justify="left") + description_table.add_column("Value", justify="left") + for field, content in self.description_table_content.items(): + description_table.add_row(field, content) + + self.description_table = description_table + + def no_of_checks(self, value: int) -> None: + """ + Initialize the progress bar with the total number of checks. + + Parameters + ---------- + value : int + The total number of checks to be performed. + """ + self.task_id = self.progress.add_task("analyzing", total=value) + + def update_checks(self, check_id: str, status: str = "RUNNING") -> None: + """ + Update the status of a specific check and refresh the progress table. + + Parameters + ---------- + check_id : str + The identifier of the check to be updated. + status : str, optional + The new status of the check, by default "RUNNING" + """ + self.checks[check_id] = status + + progress_table = Table(show_header=False, box=None) + progress_table.add_column("Status", justify="left") + progress_table.add_column("Check", justify="left") + + for check_name, check_status in self.checks.items(): + if check_status == "RUNNING": + progress_table.add_row(Status("[bold green]RUNNING[/]"), check_name) + self.progress_table = progress_table + + if self.task_id is not None and status != "RUNNING": + self.progress.update(self.task_id, advance=1) + + def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None: + """ + Update the summary tables with the results of the checks. + + Parameters + ---------- + checks_summary : dict + Dictionary containing lists of checks categorized by their results. + total_checks : int + The total number of checks. + """ + failed_checks_table = Table(show_header=False, box=None) + failed_checks_table.add_column("Status", justify="left") + failed_checks_table.add_column("Check ID", justify="left") + failed_checks_table.add_column("Description", justify="left") + + failed_checks = checks_summary["FAILED"] + for check in failed_checks: + failed_checks_table.add_row( + "[bold red]FAILED[/]", + check.check.check_id, + check.check.check_description, + ) + + self.failed_checks_table = failed_checks_table + + summary_table = Table(show_header=False, box=None) + summary_table.add_column("Check Result Type", justify="left") + summary_table.add_column("Count", justify="left") + summary_table.add_row("Total Checks", str(total_checks), style="white") + + for check_result_type, checks in checks_summary.items(): + if check_result_type == "PASSED": + summary_table.add_row("PASSED", str(len(checks)), style="green") + if check_result_type == "FAILED": + summary_table.add_row("FAILED", str(len(checks)), style="red") + if check_result_type == "SKIPPED": + summary_table.add_row("SKIPPED", str(len(checks)), style="yellow") + if check_result_type == "DISABLED": + summary_table.add_row("DISABLED", str(len(checks)), style="bright_blue") + if check_result_type == "UNKNOWN": + summary_table.add_row("UNKNOWN", str(len(checks)), style="white") + + self.summary_table = summary_table + + def update_report_table(self, report_type: str, report_path: str) -> None: + """ + Update the report table with the path of a generated report. + + Parameters + ---------- + report_type : str + The type of the report (e.g., "HTML Report", "JSON Report"). + report_path : str + The relative path to the generated report. + """ + self.reports[report_type] = report_path + report_table = Table(show_header=False, box=None) + report_table.add_column("Report Type", justify="left") + report_table.add_column("Report Path", justify="left") + + for report_detail, report_value in self.reports.items(): + report_table.add_row(report_detail, report_value, style="blue") + + self.report_table = report_table + + def generate_policy_summary_table(self) -> None: + """Generate the policy summary table based on the current policy summary data.""" + policy_summary_table = Table(show_header=False, box=None) + policy_summary_table.add_column("Detail", justify="left") + policy_summary_table.add_column("Value", justify="left") + + policy_summary_table.add_row( + "[bold green]Passed Policies[/]", + self.policy_summary["Passed Policies"], + ) + policy_summary_table.add_row( + "[bold red]Failed Policies[/]", + self.policy_summary["Failed Policies"], + ) + policy_summary_table.add_row("[bold blue]Policy Report[/]", self.policy_summary["Policy Report"]) + + self.policy_summary_table = policy_summary_table + + def update_policy_report(self, report_path: str) -> None: + """ + Update the policy report path in the policy summary. + + Parameters + ---------- + report_path : str + The relative path to the policy report. + """ + self.policy_summary["Policy Report"] = report_path + self.generate_policy_summary_table() + + def update_vsa(self, vsa_path: str) -> None: + """ + Update the verification summary attestation path. + + Parameters + ---------- + vsa_path : str + The relative path to the verification summary attestation. + """ + self.verification_summary_attestation = vsa_path + + def update_policy_engine(self, results: dict) -> None: + """ + Update the policy engine results including components that violate or satisfy policies. + + Parameters + ---------- + results : dict + Dictionary containing policy engine results including components that violate or satisfy policies, + and lists of passed and failed policies. + """ + components_violates_table = Table(show_header=False, box=None) + components_violates_table.add_column("Assign No.", justify="left") + components_violates_table.add_column("Component", justify="left") + components_violates_table.add_column("Policy", justify="left") + + for values in results["component_violates_policy"]: + components_violates_table.add_row(values[0], values[1], values[2]) + + self.components_violates_table = components_violates_table + + components_satisfy_table = Table(show_header=False, box=None) + components_satisfy_table.add_column("Assign No.", justify="left") + components_satisfy_table.add_column("Component", justify="left") + components_satisfy_table.add_column("Policy", justify="left") + + for values in results["component_satisfies_policy"]: + components_satisfy_table.add_row(values[0], values[1], values[2]) + + self.components_satisfy_table = components_satisfy_table + + self.policy_summary["Passed Policies"] = ( + "\n".join(policy[0] for policy in results["passed_policies"]) if results["passed_policies"] else "None" + ) + self.policy_summary["Failed Policies"] = ( + "\n".join(policy[0] for policy in results["failed_policies"]) if results["failed_policies"] else "None" + ) + + self.generate_policy_summary_table() + + def update_find_source_table(self, key: str, value: str | Status) -> None: + """ + Add or update a key-value pair in the find source table. + + Parameters + ---------- + key : str + The key to be added or updated. + value : str or Status + The value associated with the key. + """ + self.find_source_content[key] = value + find_source_table = Table(show_header=False, box=None) + find_source_table.add_column("Details", justify="left") + find_source_table.add_column("Value", justify="left") + for field, content in self.find_source_content.items(): + find_source_table.add_row(field, content) + self.find_source_table = find_source_table + + def update_dump_defaults(self, value: str | Status) -> None: + """ + Update the dump defaults value. + + Parameters + ---------- + value : str or Status + The value to be set for dump defaults. + """ + self.dump_defaults = value + + def update_gen_build_spec(self, key: str, value: str | Status) -> None: + """ + Add or update a key-value pair in the generate build spec table. + + Parameters + ---------- + key : str + The key to be added or updated. + value : str or Status + The value associated with the key. + """ + self.gen_build_spec[key] = value + gen_build_spec_table = Table(show_header=False, box=None) + gen_build_spec_table.add_column("Details", justify="left") + gen_build_spec_table.add_column("Value", justify="left") + for field, content in self.gen_build_spec.items(): + gen_build_spec_table.add_row(field, content) + self.gen_build_spec_table = gen_build_spec_table + + def make_layout(self) -> Group: + """ + Create the layout for the live console display. + + Returns + ------- + Group + A rich Group object containing the layout for the live console display. + """ + layout: list[RenderableType] = [] + if self.command == "analyze": + layout = layout + [Rule(" DESCRIPTION", align="left")] + if self.description_table.row_count > 0: + layout = layout + ["", self.description_table] + if self.progress_table.row_count > 0: + layout = layout + ["", self.progress, "", self.progress_table] + if self.failed_checks_table.row_count > 0: + layout = layout + [ + "", + Rule(" SUMMARY", align="left"), + "", + self.failed_checks_table, + ] + if self.summary_table.row_count > 0: + layout = layout + ["", self.summary_table] + if self.report_table.row_count > 0: + layout = layout + [ + self.report_table, + ] + elif self.summary_table.row_count > 0: + layout = layout + [ + "", + Rule(" SUMMARY", align="left"), + "", + self.summary_table, + ] + if self.report_table.row_count > 0: + layout = layout + [ + self.report_table, + ] + elif self.command == "verify-policy": + if self.policy_summary_table.row_count > 0: + if self.components_violates_table.row_count > 0: + layout = layout + [ + "[bold red] Components Violate Policy[/]", + self.components_violates_table, + ] + else: + layout = layout + [ + "[bold red] Components Violate Policy[/] [white not italic]None[/]", + ] + if self.components_satisfy_table.row_count > 0: + layout = layout + [ + "", + "[bold green] Components Satisfy Policy[/]", + self.components_satisfy_table, + ] + else: + layout = layout + [ + "", + "[bold green] Components Satisfy Policy[/] [white not italic]None[/]", + ] + layout = layout + ["", self.policy_summary_table] + if self.verification_summary_attestation: + vsa_table = Table(show_header=False, box=None) + vsa_table.add_column("Detail", justify="left") + vsa_table.add_column("Value", justify="left") + + vsa_table.add_row( + "[bold blue]Verification Summary Attestation[/]", + self.verification_summary_attestation, + ) + vsa_table.add_row( + "[bold blue]Decode and Inspect the Content[/]", + f"cat {self.verification_summary_attestation} | jq -r [white]'.payload'[/] | base64 -d | jq", + ) + + layout = layout + [vsa_table] + elif self.command == "find-source": + if self.find_source_table.row_count > 0: + layout = layout + [self.find_source_table] + elif self.command == "dump-defaults": + dump_defaults_table = Table(show_header=False, box=None) + dump_defaults_table.add_column("Detail", justify="left") + dump_defaults_table.add_column("Value", justify="left") + dump_defaults_table.add_row("Dump Defaults", self.dump_defaults) + layout = layout + [dump_defaults_table] + elif self.command == "gen-build-spec": + if self.gen_build_spec_table.row_count > 0: + layout = layout + [self.gen_build_spec_table] + if self.verbose: + layout = layout + ["", self.verbose_panel] + if self.error_message: + error_panel = Panel( + self.error_message, + title="Error", + title_align="left", + border_style="red", + ) + layout = layout + ["", error_panel] + return Group(*layout) + + def error(self, message: str) -> None: + """ + Handle error logging. + + Parameters + ---------- + message : str + The error message to be logged. + """ + self.error_message = message + + def start(self, command: str) -> None: + """ + Start the live console display. + + Parameters + ---------- + command : str + The command being executed (e.g., "analyze", "verify-policy"). + """ + self.command = command + if not self.live.is_started: + self.live.start() + + def close(self) -> None: + """Stop the live console display.""" + self.live.stop() + + +class AccessHandler: + """A class to manage access to the RichConsoleHandler instance.""" + + def __init__(self) -> None: + """Initialize the AccessHandler with a default RichConsoleHandler instance.""" + self.rich_handler = RichConsoleHandler() + + def set_handler(self, verbose: bool) -> RichConsoleHandler: + """ + Set a new RichConsoleHandler instance with the specified verbosity. + + Parameters + ---------- + verbose : bool + if True, enables verbose logging + + Returns + ------- + RichConsoleHandler + The new RichConsoleHandler instance. + """ + self.rich_handler = RichConsoleHandler(verbose=verbose) + return self.rich_handler + + def get_handler(self) -> RichConsoleHandler: + """ + Get the current RichConsoleHandler instance. + + Returns + ------- + RichConsoleHandler + The current RichConsoleHandler instance. + """ + return self.rich_handler + + +access_handler = AccessHandler() diff --git a/src/macaron/output_reporter/reporter.py b/src/macaron/output_reporter/reporter.py index 78464e13d..c676fcec9 100644 --- a/src/macaron/output_reporter/reporter.py +++ b/src/macaron/output_reporter/reporter.py @@ -19,6 +19,7 @@ ) import macaron.output_reporter.jinja2_extensions as jinja2_extensions # pylint: disable=consider-using-from-import +from macaron.console import access_handler from macaron.output_reporter.results import Report from macaron.output_reporter.scm import SCMStatus @@ -42,6 +43,7 @@ def __init__(self, mode: str = "w", encoding: str = "utf-8"): """ self.mode = mode self.encoding = encoding + self.rich_handler = access_handler.get_handler() def write_file(self, file_path: str, data: str) -> bool: """Write the data into a file. @@ -64,7 +66,11 @@ def write_file(self, file_path: str, data: str) -> bool: file.write(data) return True except OSError as error: - logger.error("Cannot write to %s. Error: %s", os.path.relpath(file_path, os.getcwd()), error) + logger.error( + "Cannot write to %s. Error: %s", + os.path.relpath(file_path, os.getcwd()), + error, + ) return False @abc.abstractmethod @@ -115,18 +121,21 @@ def generate(self, target_dir: str, report: Report | dict) -> None: report: Report | dict The report to be generated. """ + self.rich_handler = access_handler.get_handler() if not isinstance(report, Report): return try: dep_file_name = os.path.join(target_dir, "dependencies.json") serialized_configs = list(report.get_serialized_configs()) self.write_file(dep_file_name, json.dumps(serialized_configs, indent=self.indent)) + self.rich_handler.update_report_table("Dependencies Report", os.path.relpath(dep_file_name, os.getcwd())) for record in report.get_records(): if record.context and record.status == SCMStatus.AVAILABLE: file_name = os.path.join(target_dir, f"{record.context.component.report_file_name}.json") json_data = json.dumps(record.get_dict(), indent=self.indent) self.write_file(file_name, json_data) + self.rich_handler.update_report_table("JSON Report", os.path.relpath(file_name, os.getcwd())) except TypeError as error: logger.critical("Cannot serialize output report to JSON: %s", error) @@ -207,6 +216,7 @@ def generate(self, target_dir: str, report: Report | dict) -> None: report: Report | dict The report to be generated. """ + self.rich_handler = access_handler.get_handler() if not self.template or not isinstance(report, Report): return @@ -218,6 +228,7 @@ def generate(self, target_dir: str, report: Report | dict) -> None: # in the original data. html = self.template.render(deepcopy(record.get_dict())) self.write_file(file_name, html) + self.rich_handler.update_report_table("HTML Report", os.path.relpath(file_name, os.getcwd())) except TemplateSyntaxError as error: location = f"line {error.lineno}" name = error.filename or error.name @@ -261,9 +272,16 @@ def generate(self, target_dir: str, report: Report | dict) -> None: report: Report | dict The report to be generated. """ + self.rich_handler = access_handler.get_handler() if not isinstance(report, dict): return try: - self.write_file(os.path.join(target_dir, "policy_report.json"), json.dumps(report, indent=self.indent)) + self.write_file( + os.path.join(target_dir, "policy_report.json"), + json.dumps(report, indent=self.indent), + ) + self.rich_handler.update_policy_report( + os.path.relpath(os.path.join(target_dir, "policy_report.json"), os.getcwd()) + ) except (TypeError, ValueError, OSError) as error: logger.critical("Cannot serialize the policy report to JSON: %s", error) diff --git a/src/macaron/output_reporter/results.py b/src/macaron/output_reporter/results.py index 5bf8c8806..dddd636a3 100644 --- a/src/macaron/output_reporter/results.py +++ b/src/macaron/output_reporter/results.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains classes that represent the result of the Macaron analysis.""" @@ -9,6 +9,7 @@ from typing import Generic, TypedDict, TypeVar from macaron.config.target_config import Configuration +from macaron.console import access_handler from macaron.output_reporter.scm import SCMStatus from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.check_result import CheckResultType @@ -199,6 +200,7 @@ def __init__(self, root_record: Record) -> None: self.record_mapping: dict[str, Record] = {} if root_record.context: self.record_mapping[root_record.record_id] = root_record + self.rich_handler = access_handler.get_handler() def get_records(self) -> Iterable[Record]: """Get the generator for all records in the report. @@ -297,6 +299,7 @@ def __str__(self) -> str: """Return the string representation of the Report instance.""" ctx_list = list(self.get_ctxs()) main_ctx: AnalyzeContext = ctx_list.pop(0) + self.rich_handler = access_handler.get_handler() output = "".join( [ @@ -306,6 +309,7 @@ def __str__(self) -> str: "\nSLSA REQUIREMENT RESULTS:\n", ] ) + self.rich_handler.update_checks_summary(main_ctx.get_check_summary(), len(main_ctx.check_results)) slsa_req_mesg: dict[SLSALevels, list[str]] = {level: [] for level in SLSALevels if level != SLSALevels.LEVEL0} for req_name, req_status in main_ctx.ctx_data.items(): @@ -320,7 +324,12 @@ def __str__(self) -> str: dep_req = dep.ctx_data.get(ReqName(req.name)) if dep_req and not dep_req.is_pass: fail_count += 1 - message = "".join([message, f" (and {fail_count}/{len(ctx_list)} dependencies FAILED)"]) + message = "".join( + [ + message, + f" (and {fail_count}/{len(ctx_list)} dependencies FAILED)", + ] + ) slsa_req_mesg[req.min_level_required].append(message) diff --git a/src/macaron/policy_engine/policy_engine.py b/src/macaron/policy_engine/policy_engine.py index 1b9bec29c..e815d48f4 100644 --- a/src/macaron/policy_engine/policy_engine.py +++ b/src/macaron/policy_engine/policy_engine.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module handles invoking the souffle policy engine on a database.""" @@ -10,6 +10,7 @@ from sqlalchemy import MetaData, create_engine, select from macaron import __version__ as mcn_version +from macaron.console import access_handler from macaron.database.table_definitions import Analysis from macaron.policy_engine.souffle import SouffleError, SouffleWrapper from macaron.policy_engine.souffle_code_generator import ( @@ -58,7 +59,11 @@ def get_generated(database_path: os.PathLike | str) -> SouffleProgram: return prelude -def copy_prelude(database_path: os.PathLike | str, sfl: SouffleWrapper, prelude: SouffleProgram | None = None) -> None: +def copy_prelude( + database_path: os.PathLike | str, + sfl: SouffleWrapper, + prelude: SouffleProgram | None = None, +) -> None: """ Generate and copy the prelude into the souffle instance's include directory. @@ -132,7 +137,10 @@ def _check_version(database_path: str) -> None: ).scalar() if versions is not None: logger.error("Database generated with unsupported versions (%s).", versions) - logger.error("Only databases generated by Macaron version %s are supported.", mcn_version) + logger.error( + "Only databases generated by Macaron version %s are supported.", + mcn_version, + ) sys.exit(os.EX_DATAERR) @@ -176,4 +184,7 @@ def run_policy_engine(database_path: str, policy_content: str) -> dict: logger.info("Policy results:\n%s", "\n".join(output)) + rich_handler = access_handler.get_handler() + rich_handler.update_policy_engine(res) + return res diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index 9017a4ae0..3bc0d1f6c 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -42,6 +42,7 @@ from macaron.config.defaults import defaults from macaron.config.global_config import global_config +from macaron.console import access_handler from macaron.errors import CloneError, RepoCheckOutError from macaron.repo_finder import repo_finder_pypi, to_domain_from_known_purl_types from macaron.repo_finder.commit_finder import find_commit, match_tags @@ -122,13 +123,17 @@ def find_repo( # Try to find the latest version repo. logger.debug("Could not find repo for PURL: %s", purl) + rich_handler = access_handler.get_handler() latest_version_purl = get_latest_purl_if_different(purl) if not latest_version_purl: logger.debug("Could not find newer PURL than provided: %s", purl) + rich_handler.add_description_table_content("Local Cloned Path:", "[red]Not Found[/]") + rich_handler.add_description_table_content("Remote Path:", "[red]Not Found[/]") return "", RepoFinderInfo.NO_NEWER_VERSION - found_repo, outcome = DepsDevRepoFinder().find_repo(latest_version_purl) if found_repo: + rich_handler.add_description_table_content("Local Cloned Path:", found_repo) + rich_handler.add_description_table_content("Remote Path:", found_repo) return found_repo, outcome if not found_repo: @@ -136,13 +141,17 @@ def find_repo( if not found_repo: logger.debug("Could not find repo from latest version of PURL: %s", latest_version_purl) + rich_handler.add_description_table_content("Local Cloned Path:", "[red]Not Found[/]") + rich_handler.add_description_table_content("Remote Path:", "[red]Not Found[/]") return "", RepoFinderInfo.LATEST_VERSION_INVALID return found_repo, outcome def find_repo_alternative( - purl: PackageURL, outcome: RepoFinderInfo, package_registries_info: list[PackageRegistryInfo] | None = None + purl: PackageURL, + outcome: RepoFinderInfo, + package_registries_info: list[PackageRegistryInfo] | None = None, ) -> tuple[str, RepoFinderInfo]: """Use PURL type specific methods to find the repository when the standard methods have failed. @@ -279,7 +288,12 @@ def find_source(purl_string: str, input_repo: str | None, latest_version_fallbac repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR) logging.getLogger("macaron.slsa_analyzer.git_url").disabled = True # The prepare_repo function will also check the latest version of the artifact if required. - git_obj, _ = prepare_repo(repo_dir, found_repo, purl=purl, latest_version_fallback=not checked_latest_purl) + git_obj, _ = prepare_repo( + repo_dir, + found_repo, + purl=purl, + latest_version_fallback=not checked_latest_purl, + ) if git_obj: digest = git_obj.get_head().hash @@ -316,12 +330,22 @@ def find_source(purl_string: str, input_repo: str | None, latest_version_fallbac return find_source(str(purl), latest_repo, False) + rich_handler = access_handler.get_handler() if not input_repo: logger.info("Found repository for PURL: %s", found_repo) + rich_handler.update_find_source_table("Repository URL:", found_repo) + else: + rich_handler.update_find_source_table("Repository URL:", input_repo) logger.info("Found commit for PURL: %s", digest) + rich_handler.update_find_source_table("Commit Hash:", digest) - if not generate_report(purl_string, digest, found_repo, os.path.join(global_config.output_path, "reports")): + if not generate_report( + purl_string, + digest, + found_repo, + os.path.join(global_config.output_path, "reports"), + ): return False return True @@ -381,7 +405,9 @@ def get_latest_repo_if_different(latest_version_purl: PackageURL, original_repo: if check_repo_urls_are_equivalent(original_repo, latest_repo): logger.error( - "Repository from latest PURL is equivalent to original repository: %s ~= %s", latest_repo, original_repo + "Repository from latest PURL is equivalent to original repository: %s ~= %s", + latest_repo, + original_repo, ) return "" @@ -427,6 +453,7 @@ def prepare_repo( tuple[Git | None, CommitFinderInfo] The pydriller.Git object of the repository or None if error, and the outcome of the Commit Finder. """ + rich_handler = access_handler.get_handler() # TODO: separate the logic for handling remote and local repos instead of putting them into this method. logger.info( "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)", @@ -434,6 +461,7 @@ def prepare_repo( branch_name, digest, ) + rich_handler.add_description_table_content("Remote Path:", repo_path) is_remote = is_remote_repo(repo_path) commit_finder_outcome = CommitFinderInfo.NOT_USED @@ -451,12 +479,14 @@ def prepare_repo( logger.info("Cloning the repository.") try: git_service.clone_repo(resolved_local_path, resolved_remote_path) + rich_handler.add_description_table_content("Local Cloned Path:", repo_unique_path) except CloneError as error: logger.error("Cannot clone %s: %s", resolved_remote_path, str(error)) return None, commit_finder_outcome else: logger.info("Checking if the path to repo %s is a local path.", repo_path) resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path) + rich_handler.add_description_table_content("Local Cloned Path:", resolved_local_path) if resolved_local_path: try: diff --git a/src/macaron/repo_finder/repo_utils.py b/src/macaron/repo_finder/repo_utils.py index f246b98a0..507442498 100644 --- a/src/macaron/repo_finder/repo_utils.py +++ b/src/macaron/repo_finder/repo_utils.py @@ -13,6 +13,7 @@ from pydriller import Git from macaron.config.global_config import global_config +from macaron.console import access_handler from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, decode_git_tags, parse_git_tags @@ -88,6 +89,9 @@ def generate_report(purl: str, commit: str, repo: str, target_dir: str) -> bool: logger.info("Report written to: %s", os.path.relpath(fullpath, os.getcwd())) + rich_handler = access_handler.get_handler() + rich_handler.update_find_source_table("JSON Report:", os.path.relpath(fullpath, os.getcwd())) + return True diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index e013f8411..774ec4c66 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -25,7 +25,12 @@ ) from macaron.config.global_config import global_config from macaron.config.target_config import Configuration -from macaron.database.database_manager import DatabaseManager, get_db_manager, get_db_session +from macaron.console import access_handler +from macaron.database.database_manager import ( + DatabaseManager, + get_db_manager, + get_db_session, +) from macaron.database.table_definitions import ( Analysis, Component, @@ -107,11 +112,18 @@ def __init__(self, output_path: str, build_log_path: str) -> None: logger.error("Cannot start the analysis. Exiting ...") sys.exit(1) + excluded_checks = [check for check in registry.get_all_checks_mapping() if check not in registry.checks_to_run] logger.info( "The following checks are excluded based on the user configuration: %s", - [check for check in registry.get_all_checks_mapping() if check not in registry.checks_to_run], + excluded_checks, + ) + self.rich_handler = access_handler.get_handler() + self.rich_handler.add_description_table_content( + "Excluded Checks:", + ", ".join(excluded_checks) if excluded_checks else "None", ) logger.info("The following checks will be run: %s", registry.checks_to_run) + self.rich_handler.add_description_table_content("Final Checks:", "\n".join(registry.checks_to_run)) self.output_path = output_path @@ -217,7 +229,10 @@ def run( ) else: # Can't reach here. - logger.critical("Expecting deps depth to be '0', '1' or '-1', got %s", deps_depth) + logger.critical( + "Expecting deps depth to be '0', '1' or '-1', got %s", + deps_depth, + ) return os.EX_USAGE # Merge the automatically resolved dependencies with the manual configuration. @@ -297,7 +312,9 @@ def generate_reports(self, report: Report) -> None: return output_target_path = os.path.join( - global_config.output_path, "reports", report.root_record.context.component.report_dir_name + global_config.output_path, + "reports", + report.root_record.context.component.report_dir_name, ) os.makedirs(output_target_path, exist_ok=True) @@ -484,6 +501,7 @@ def run_single( logger.info("Analyzing %s", repo_id) logger.info("With PURL: %s", component.purl) logger.info("=====================================") + self.rich_handler.add_description_table_content("Package URL:", component.purl) analyze_ctx = self.create_analyze_ctx(component) analyze_ctx.dynamic_data["expectation"] = self.expectations.get_expectation_for_target( @@ -555,7 +573,10 @@ def run_single( slsa_version = extract_predicate_version(provenance_payload) slsa_level = determine_provenance_slsa_level( - analyze_ctx, provenance_payload, provenance_is_verified, provenance_l3_verified + analyze_ctx, + provenance_payload, + provenance_is_verified, + provenance_l3_verified, ) analyze_ctx.dynamic_data["provenance_info"] = Provenance( @@ -673,6 +694,14 @@ def add_repository(self, branch_name: str | None, git_obj: Git) -> Repository | commit_date_str, ) + self.rich_handler.add_description_table_content("Branch:", res_branch if res_branch else "None") + self.rich_handler.add_description_table_content( + "Commit Hash:", commit_sha if commit_sha else "[red]Not Found[/]" + ) + self.rich_handler.add_description_table_content( + "Commit Date:", commit_date_str if commit_date_str else "[red]Not Found[/]" + ) + return repository class AnalysisTarget(NamedTuple): @@ -750,7 +779,8 @@ def add_component( is not None ): raise DuplicateCmpError( - f"{analysis_target.repo_path} is already analyzed.", context=existing_record.context + f"{analysis_target.repo_path} is already analyzed.", + context=existing_record.context, ) repository = self.add_repository(analysis_target.branch, git_obj) @@ -759,6 +789,9 @@ def add_component( # software component. If this happens, we don't raise error and treat the software component as if it # does not have any ``Repository`` attached to it. repository = None + self.rich_handler.add_description_table_content("Branch:", "[red]Not Found[/]") + self.rich_handler.add_description_table_content("Commit Hash:", "[red]Not Found[/]") + self.rich_handler.add_description_table_content("Commit Date:", "[red]Not Found[/]") if not analysis_target.parsed_purl: # If the PURL is not available. This will only mean that the user don't provide PURL but only provide the @@ -973,10 +1006,15 @@ def _determine_git_service(self, analyze_ctx: AnalyzeContext) -> BaseGitService: git_service = get_git_service(remote_path) if isinstance(git_service, NoneGitService): - logger.info("Unable to find repository or unsupported git service for %s", analyze_ctx.component.purl) + logger.info( + "Unable to find repository or unsupported git service for %s", + analyze_ctx.component.purl, + ) else: logger.info( - "Detected git service %s for %s.", git_service.name, analyze_ctx.component.repository.complete_name + "Detected git service %s for %s.", + git_service.name, + analyze_ctx.component.repository.complete_name, ) analyze_ctx.dynamic_data["git_service"] = git_service @@ -988,7 +1026,9 @@ def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseG build_tool.load_defaults() if build_tool.purl_type == analyze_ctx.component.type: logger.debug( - "Found %s build tool based on the %s PackageURL.", build_tool.name, analyze_ctx.component.purl + "Found %s build tool based on the %s PackageURL.", + build_tool.name, + analyze_ctx.component.purl, ) analyze_ctx.dynamic_data["build_spec"]["purl_tools"].append(build_tool) @@ -1016,10 +1056,23 @@ def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseG ) else: logger.info("Unable to discover build tools because repository is None.") + self.rich_handler.add_description_table_content( + "Build Tools:", + "[red]Not Found[/]", + ) + else: + self.rich_handler.add_description_table_content( + "Build Tools:", + "\n".join([build_tool.name for build_tool in analyze_ctx.dynamic_data["build_spec"]["tools"]]), + ) def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseGitService) -> None: """Determine the CI services used by the software component.""" if isinstance(git_service, NoneGitService): + self.rich_handler.add_description_table_content( + "CI Services:", + "[red]Not Found[/]", + ) return # Determine the CI services. @@ -1052,6 +1105,17 @@ def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseG ) ) + if analyze_ctx.dynamic_data["ci_services"]: + self.rich_handler.add_description_table_content( + "CI Services:", + "\n".join([ci_service["service"].name for ci_service in analyze_ctx.dynamic_data["ci_services"]]), + ) + else: + self.rich_handler.add_description_table_content( + "CI Services:", + "[red]Not Found[/]", + ) + def _populate_package_registry_info(self) -> list[PackageRegistryInfo]: """Add all possible package registries to the analysis context.""" package_registries = [] @@ -1070,7 +1134,9 @@ def _populate_package_registry_info(self) -> list[PackageRegistryInfo]: return package_registries def _determine_package_registries( - self, analyze_ctx: AnalyzeContext, package_registries_info: list[PackageRegistryInfo] + self, + analyze_ctx: AnalyzeContext, + package_registries_info: list[PackageRegistryInfo], ) -> None: """Determine the package registries used by the software component based on its build tools.""" build_tools = ( diff --git a/src/macaron/slsa_analyzer/checks/base_check.py b/src/macaron/slsa_analyzer/checks/base_check.py index b1912018e..53f857828 100644 --- a/src/macaron/slsa_analyzer/checks/base_check.py +++ b/src/macaron/slsa_analyzer/checks/base_check.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the BaseCheck class to be inherited by other concrete Checks.""" @@ -6,6 +6,7 @@ import logging from abc import abstractmethod +from macaron.console import access_handler from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.check_result import ( CheckInfo, @@ -49,7 +50,9 @@ def __init__( The status for this check when it's skipped based on another check's result. """ self._check_info = CheckInfo( - check_id=check_id, check_description=description, eval_reqs=eval_reqs if eval_reqs else [] + check_id=check_id, + check_description=description, + eval_reqs=eval_reqs if eval_reqs else [], ) if not depends_on: @@ -58,6 +61,7 @@ def __init__( self._depends_on = depends_on self._result_on_skip = result_on_skip + self.rich_handler = access_handler.get_handler() @property def check_info(self) -> CheckInfo: @@ -92,10 +96,13 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) - CheckResult The result of the check. """ + self.rich_handler = access_handler.get_handler() logger.info("----------------------------------") logger.info("BEGIN CHECK: %s", self.check_info.check_id) logger.info("----------------------------------") + self.rich_handler.update_checks(self.check_info.check_id) + check_result_data: CheckResultData if skipped_info: @@ -129,6 +136,11 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) - justification_str, ) + self.rich_handler.update_checks( + self.check_info.check_id, + check_result_data.result_type.value, + ) + return CheckResult(check=self.check_info, result=check_result_data) @abstractmethod diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index fb363dfb0..65cbf2961 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -82,8 +82,7 @@ class DetectMaliciousMetadataCheck(BaseCheck): def __init__(self) -> None: """Initialize a check instance.""" check_id = "mcn_detect_malicious_metadata_1" - description = """Check if the package is malicious. - """ + description = """Check if the package is malicious.""" super().__init__(check_id=check_id, description=description, eval_reqs=[]) def _should_skip( diff --git a/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py b/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py index c02fa8380..a33ba4586 100644 --- a/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py +++ b/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py @@ -89,11 +89,8 @@ class ArtifactPipelineCheck(BaseCheck): def __init__(self) -> None: """Initialize the InferArtifactPipeline instance.""" check_id = "mcn_find_artifact_pipeline_1" - description = """ - Detects pipelines from which an artifact is published. - - When a verifiable provenance is found for an artifact, we use it to obtain the pipeline trigger. - """ + description = """Detects pipelines from which an artifact is published. +When a verifiable provenance is found for an artifact, we use it to obtain the pipeline trigger.""" depends_on: list[tuple[str, CheckResultType]] = [("mcn_build_as_code_1", CheckResultType.PASSED)] eval_reqs: list[ReqName] = [] super().__init__( diff --git a/src/macaron/slsa_analyzer/registry.py b/src/macaron/slsa_analyzer/registry.py index 3d8b6000f..868f88a4c 100644 --- a/src/macaron/slsa_analyzer/registry.py +++ b/src/macaron/slsa_analyzer/registry.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Registry class for loading checks.""" @@ -14,6 +14,7 @@ from typing import Any, TypeVar from macaron.config.defaults import defaults +from macaron.console import access_handler from macaron.errors import CheckRegistryError from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.base_check import BaseCheck @@ -51,6 +52,7 @@ def __init__(self) -> None: self.check_tree: CheckTree = {} self.execution_order: list[str] = [] + self.rich_handler = access_handler.get_handler() def register(self, check: BaseCheck) -> None: """Register the check. @@ -76,7 +78,10 @@ def register(self, check: BaseCheck) -> None: else: for parent_relationship in check.depends_on: if not self._add_relationship_entry(check.check_info.check_id, parent_relationship): - logger.error("Cannot load relationships of check %s.", check.check_info.check_id) + logger.error( + "Cannot load relationships of check %s.", + check.check_info.check_id, + ) sys.exit(1) self._all_checks_mapping[check.check_info.check_id] = check @@ -169,7 +174,10 @@ def _validate_check(check: Any) -> bool: if check_file_abs_path: if not (hasattr(check, "result_on_skip") and isinstance(check.result_on_skip, CheckResultType)): - logger.error("The status_on_skipped in the Check at %s is invalid.", str(check.check_info.check_id)) + logger.error( + "The status_on_skipped in the Check at %s is invalid.", + str(check.check_info.check_id), + ) return False if not Registry._validate_check_id_format(check.check_info.check_id): @@ -461,11 +469,18 @@ def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]: results: dict[str, CheckResult] = {} skipped_checks: list[SkippedInfo] = [] + self.rich_handler = access_handler.get_handler() + self.rich_handler.no_of_checks(len(registry.checks_to_run)) + for check_id in self.execution_order: check = all_checks.get(check_id) if not check: - logger.error("Check %s is not defined yet. Please add the implementation for %s.", check_id, check_id) + logger.error( + "Check %s is not defined yet. Please add the implementation for %s.", + check_id, + check_id, + ) results[check_id] = CheckResult( check=CheckInfo( check_id=check_id, @@ -495,7 +510,11 @@ def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]: try: results[check_id] = check.run(target, skipped_info) except Exception as exc: # pylint: disable=broad-exception-caught - logger.error("Exception in check %s: %s. Run in verbose mode to get more information.", check_id, exc) + logger.error( + "Exception in check %s: %s. Run in verbose mode to get more information.", + check_id, + exc, + ) logger.debug(traceback.format_exc()) logger.info("Check %s has failed.", check_id) return results @@ -598,7 +617,10 @@ def _should_skip_check(check: BaseCheck, results: dict[str, CheckResult]) -> Ski f"Check {check.check_info.check_id} is set to {check.result_on_skip.value} " f"because {parent_id} {got_status.value}." ) - skipped_info = SkippedInfo(check_id=check.check_info.check_id, suppress_comment=suppress_comment) + skipped_info = SkippedInfo( + check_id=check.check_info.check_id, + suppress_comment=suppress_comment, + ) return skipped_info return None