From 52f1b3061529aa02a7c371d34b4e236f59cb52f5 Mon Sep 17 00:00:00 2001 From: Everett Hildenbrandt Date: Fri, 8 Mar 2024 00:23:51 +0000 Subject: [PATCH] Revert "Improve default argument handling (#916)" This reverts commit 0eb8d49e4aea90cf01ed1937a6ff61dc5f875301. --- src/pyk/__main__.py | 351 ++++++++++++++++++++++++++++++++--- src/pyk/cli/args.py | 332 ++++++++++----------------------- src/pyk/cli/cli.py | 110 ----------- src/pyk/cli/pyk.py | 381 -------------------------------------- src/pyk/cli/utils.py | 3 +- src/pyk/kdist/__main__.py | 207 ++++++++------------- 6 files changed, 505 insertions(+), 879 deletions(-) delete mode 100644 src/pyk/cli/cli.py delete mode 100644 src/pyk/cli/pyk.py diff --git a/src/pyk/__main__.py b/src/pyk/__main__.py index 3ee7c734a..84d340ca6 100644 --- a/src/pyk/__main__.py +++ b/src/pyk/__main__.py @@ -1,52 +1,345 @@ from __future__ import annotations +import json import logging import sys +from argparse import ArgumentParser, FileType +from enum import Enum +from pathlib import Path from typing import TYPE_CHECKING -from .cli.args import LoggingOptions -from .cli.cli import CLI -from .cli.pyk import ( - CoverageCommand, - GraphImportsCommand, - JsonToKoreCommand, - KoreToJsonCommand, - PrintCommand, - ProveCommand, - RPCKastCommand, - RPCPrintCommand, +from graphviz import Digraph + +from pyk.kast.inner import KInner +from pyk.kore.rpc import ExecuteResult + +from .cli.args import KCLIArgs +from .cli.utils import LOG_FORMAT, dir_path, loglevel +from .coverage import get_rule_by_id, strip_coverage_logger +from .cterm import CTerm +from .kast.manip import ( + flatten_label, + minimize_rule, + minimize_term, + propagate_up_constraints, + remove_source_map, + split_config_and_constraints, ) -from .cli.utils import LOG_FORMAT, loglevel +from .kast.outer import read_kast_definition +from .kast.pretty import PrettyPrinter +from .kore.parser import KoreParser +from .kore.rpc import StopReason +from .kore.syntax import Pattern, kore_term +from .ktool.kprint import KPrint +from .ktool.kprove import KProve +from .prelude.k import GENERATED_TOP_CELL +from .prelude.ml import is_top, mlAnd, mlOr if TYPE_CHECKING: - from typing import Final + from argparse import Namespace + from typing import Any, Final _LOGGER: Final = logging.getLogger(__name__) +class PrintInput(Enum): + KORE_JSON = 'kore-json' + KAST_JSON = 'kast-json' + + def main() -> None: # KAST terms can end up nested quite deeply, because of the various assoc operators (eg. _Map_, _Set_, ...). # Most pyk operations are defined recursively, meaning you get a callstack the same depth as the term. # This change makes it so that in most cases, by default, pyk doesn't run out of stack space. sys.setrecursionlimit(10**7) - cli = CLI( - [ - CoverageCommand, - GraphImportsCommand, - JsonToKoreCommand, - KoreToJsonCommand, - PrintCommand, - ProveCommand, - RPCKastCommand, - RPCPrintCommand, - ] - ) - command = cli.get_command() - assert isinstance(command, LoggingOptions) - logging.basicConfig(level=loglevel(command), format=LOG_FORMAT) - command.exec() + cli_parser = create_argument_parser() + args = cli_parser.parse_args() + + logging.basicConfig(level=loglevel(args), format=LOG_FORMAT) + + executor_name = 'exec_' + args.command.lower().replace('-', '_') + if executor_name not in globals(): + raise AssertionError(f'Unimplemented command: {args.command}') + + execute = globals()[executor_name] + execute(args) + + +def exec_print(args: Namespace) -> None: + kompiled_dir: Path = args.definition_dir + printer = KPrint(kompiled_dir) + if args.input == PrintInput.KORE_JSON: + _LOGGER.info(f'Reading Kore JSON from file: {args.term.name}') + kore = Pattern.from_json(args.term.read()) + term = printer.kore_to_kast(kore) + else: + _LOGGER.info(f'Reading Kast JSON from file: {args.term.name}') + term = KInner.from_json(args.term.read()) + if is_top(term): + args.output_file.write(printer.pretty_print(term)) + _LOGGER.info(f'Wrote file: {args.output_file.name}') + else: + if args.minimize: + if args.omit_labels != '' and args.keep_cells != '': + raise ValueError('You cannot use both --omit-labels and --keep-cells.') + + abstract_labels = args.omit_labels.split(',') if args.omit_labels != '' else [] + keep_cells = args.keep_cells.split(',') if args.keep_cells != '' else [] + minimized_disjuncts = [] + + for disjunct in flatten_label('#Or', term): + try: + minimized = minimize_term(disjunct, abstract_labels=abstract_labels, keep_cells=keep_cells) + config, constraint = split_config_and_constraints(minimized) + except ValueError as err: + raise ValueError('The minimized term does not contain a config cell.') from err + + if not is_top(constraint): + minimized_disjuncts.append(mlAnd([config, constraint], sort=GENERATED_TOP_CELL)) + else: + minimized_disjuncts.append(config) + term = propagate_up_constraints(mlOr(minimized_disjuncts, sort=GENERATED_TOP_CELL)) + + args.output_file.write(printer.pretty_print(term)) + _LOGGER.info(f'Wrote file: {args.output_file.name}') + + +def exec_rpc_print(args: Namespace) -> None: + kompiled_dir: Path = args.definition_dir + printer = KPrint(kompiled_dir) + input_dict = json.loads(args.input_file.read()) + output_buffer = [] + + def pretty_print_request(request_params: dict[str, Any]) -> list[str]: + output_buffer = [] + non_state_keys = set(request_params.keys()).difference(['state']) + for key in non_state_keys: + output_buffer.append(f'{key}: {request_params[key]}') + state = CTerm.from_kast(printer.kore_to_kast(kore_term(request_params['state']))) + output_buffer.append('State:') + output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) + return output_buffer + + def pretty_print_execute_response(execute_result: ExecuteResult) -> list[str]: + output_buffer = [] + output_buffer.append(f'Depth: {execute_result.depth}') + output_buffer.append(f'Stop reason: {execute_result.reason.value}') + if execute_result.reason == StopReason.TERMINAL_RULE or execute_result.reason == StopReason.CUT_POINT_RULE: + output_buffer.append(f'Stop rule: {execute_result.rule}') + output_buffer.append( + f'Number of next states: {len(execute_result.next_states) if execute_result.next_states is not None else 0}' + ) + state = CTerm.from_kast(printer.kore_to_kast(execute_result.state.kore)) + output_buffer.append('State:') + output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) + if execute_result.next_states is not None: + next_states = [CTerm.from_kast(printer.kore_to_kast(s.kore)) for s in execute_result.next_states] + for i, s in enumerate(next_states): + output_buffer.append(f'Next state #{i}:') + output_buffer.append(printer.pretty_print(s.kast, sort_collections=True)) + return output_buffer + + try: + if 'method' in input_dict: + output_buffer.append('JSON RPC request') + output_buffer.append(f'id: {input_dict["id"]}') + output_buffer.append(f'Method: {input_dict["method"]}') + try: + if 'state' in input_dict['params']: + output_buffer += pretty_print_request(input_dict['params']) + else: # this is an "add-module" request, skip trying to print state + for key in input_dict['params'].keys(): + output_buffer.append(f'{key}: {input_dict["params"][key]}') + except KeyError as e: + _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') + exit(1) + else: + if not 'result' in input_dict: + _LOGGER.critical('The input is neither a request not a resonse') + exit(1) + output_buffer.append('JSON RPC Response') + output_buffer.append(f'id: {input_dict["id"]}') + if list(input_dict['result'].keys()) == ['state']: # this is a "simplify" response + output_buffer.append('Method: simplify') + state = CTerm.from_kast(printer.kore_to_kast(kore_term(input_dict['result']['state']))) + output_buffer.append('State:') + output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) + elif list(input_dict['result'].keys()) == ['module']: # this is an "add-module" response + output_buffer.append('Method: add-module') + output_buffer.append('Module:') + output_buffer.append(input_dict['result']['module']) + else: + try: # assume it is an "execute" response + output_buffer.append('Method: execute') + execute_result = ExecuteResult.from_dict(input_dict['result']) + output_buffer += pretty_print_execute_response(execute_result) + except KeyError as e: + _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') + exit(1) + if args.output_file is not None: + args.output_file.write('\n'.join(output_buffer)) + else: + print('\n'.join(output_buffer)) + except ValueError as e: + # shorten and print the error message in case kore_to_kast throws ValueError + _LOGGER.critical(str(e)[:200]) + exit(1) + + +def exec_rpc_kast(args: Namespace) -> None: + """ + Convert an 'execute' JSON RPC response to a new 'execute' or 'simplify' request, + copying parameters from a reference request. + """ + reference_request = json.loads(args.reference_request_file.read()) + input_dict = json.loads(args.response_file.read()) + execute_result = ExecuteResult.from_dict(input_dict['result']) + non_state_keys = set(reference_request['params'].keys()).difference(['state']) + request_params = {} + for key in non_state_keys: + request_params[key] = reference_request['params'][key] + request_params['state'] = {'format': 'KORE', 'version': 1, 'term': execute_result.state.kore.dict} + request = { + 'jsonrpc': reference_request['jsonrpc'], + 'id': reference_request['id'], + 'method': reference_request['method'], + 'params': request_params, + } + args.output_file.write(json.dumps(request)) + + +def exec_prove(args: Namespace) -> None: + kompiled_dir: Path = args.definition_dir + kprover = KProve(kompiled_dir, args.main_file) + final_state = kprover.prove(Path(args.spec_file), spec_module_name=args.spec_module, args=args.kArgs) + args.output_file.write(json.dumps(mlOr([state.kast for state in final_state]).to_dict())) + _LOGGER.info(f'Wrote file: {args.output_file.name}') + + +def exec_graph_imports(args: Namespace) -> None: + kompiled_dir: Path = args.definition_dir + kprinter = KPrint(kompiled_dir) + definition = kprinter.definition + import_graph = Digraph() + graph_file = kompiled_dir / 'import-graph' + for module in definition.modules: + module_name = module.name + import_graph.node(module_name) + for module_import in module.imports: + import_graph.edge(module_name, module_import.name) + import_graph.render(graph_file) + _LOGGER.info(f'Wrote file: {graph_file}') + + +def exec_coverage(args: Namespace) -> None: + kompiled_dir: Path = args.definition_dir + definition = remove_source_map(read_kast_definition(kompiled_dir / 'compiled.json')) + pretty_printer = PrettyPrinter(definition) + for rid in args.coverage_file: + rule = minimize_rule(strip_coverage_logger(get_rule_by_id(definition, rid.strip()))) + args.output.write('\n\n') + args.output.write('Rule: ' + rid.strip()) + args.output.write('\nUnparsed:\n') + args.output.write(pretty_printer.print(rule)) + _LOGGER.info(f'Wrote file: {args.output.name}') + + +def exec_kore_to_json(args: Namespace) -> None: + text = sys.stdin.read() + kore = KoreParser(text).pattern() + print(kore.json) + + +def exec_json_to_kore(args: dict[str, Any]) -> None: + text = sys.stdin.read() + kore = Pattern.from_json(text) + kore.write(sys.stdout) + sys.stdout.write('\n') + + +def create_argument_parser() -> ArgumentParser: + k_cli_args = KCLIArgs() + + definition_args = ArgumentParser(add_help=False) + definition_args.add_argument('definition_dir', type=dir_path, help='Path to definition directory.') + + pyk_args = ArgumentParser() + pyk_args_command = pyk_args.add_subparsers(dest='command', required=True) + + print_args = pyk_args_command.add_parser( + 'print', + help='Pretty print a term.', + parents=[k_cli_args.logging_args, definition_args, k_cli_args.display_args], + ) + print_args.add_argument('term', type=FileType('r'), help='Input term (in format specified with --input).') + print_args.add_argument('--input', default=PrintInput.KAST_JSON, type=PrintInput, choices=list(PrintInput)) + print_args.add_argument('--omit-labels', default='', nargs='?', help='List of labels to omit from output.') + print_args.add_argument( + '--keep-cells', default='', nargs='?', help='List of cells with primitive values to keep in output.' + ) + print_args.add_argument('--output-file', type=FileType('w'), default='-') + + rpc_print_args = pyk_args_command.add_parser( + 'rpc-print', + help='Pretty-print an RPC request/response', + parents=[k_cli_args.logging_args, definition_args], + ) + rpc_print_args.add_argument( + 'input_file', + type=FileType('r'), + help='An input file containing the JSON RPC request or response with KoreJSON payload.', + ) + rpc_print_args.add_argument('--output-file', type=FileType('w'), default='-') + + rpc_kast_args = pyk_args_command.add_parser( + 'rpc-kast', + help='Convert an "execute" JSON RPC response to a new "execute" or "simplify" request, copying parameters from a reference request.', + parents=[k_cli_args.logging_args], + ) + rpc_kast_args.add_argument( + 'reference_request_file', + type=FileType('r'), + help='An input file containing a JSON RPC request to server as a reference for the new request.', + ) + rpc_kast_args.add_argument( + 'response_file', + type=FileType('r'), + help='An input file containing a JSON RPC response with KoreJSON payload.', + ) + rpc_kast_args.add_argument('--output-file', type=FileType('w'), default='-') + + prove_args = pyk_args_command.add_parser( + 'prove', + help='Prove an input specification (using kprovex).', + parents=[k_cli_args.logging_args, definition_args], + ) + prove_args.add_argument('main_file', type=str, help='Main file used for kompilation.') + prove_args.add_argument('spec_file', type=str, help='File with the specification module.') + prove_args.add_argument('spec_module', type=str, help='Module with claims to be proven.') + prove_args.add_argument('--output-file', type=FileType('w'), default='-') + prove_args.add_argument('kArgs', nargs='*', help='Arguments to pass through to K invocation.') + + pyk_args_command.add_parser( + 'graph-imports', + help='Graph the imports of a given definition.', + parents=[k_cli_args.logging_args, definition_args], + ) + + coverage_args = pyk_args_command.add_parser( + 'coverage', + help='Convert coverage file to human readable log.', + parents=[k_cli_args.logging_args, definition_args], + ) + coverage_args.add_argument('coverage_file', type=FileType('r'), help='Coverage file to build log for.') + coverage_args.add_argument('-o', '--output', type=FileType('w'), default='-') + + pyk_args_command.add_parser('kore-to-json', help='Convert textual KORE to JSON', parents=[k_cli_args.logging_args]) + + pyk_args_command.add_parser('json-to-kore', help='Convert JSON to textual KORE', parents=[k_cli_args.logging_args]) + + return pyk_args if __name__ == '__main__': diff --git a/src/pyk/cli/args.py b/src/pyk/cli/args.py index 7af8b4d99..47b633cf6 100644 --- a/src/pyk/cli/args.py +++ b/src/pyk/cli/args.py @@ -1,287 +1,155 @@ from __future__ import annotations -import sys -from argparse import FileType -from typing import IO, TYPE_CHECKING, Any +from argparse import ArgumentParser +from functools import cached_property +from typing import TYPE_CHECKING -from pyk.utils import ensure_dir_path - -from .cli import Options +from ..utils import ensure_dir_path from .utils import bug_report_arg, dir_path, file_path if TYPE_CHECKING: - from argparse import ArgumentParser - from pathlib import Path from typing import TypeVar - from ..utils import BugReport - T = TypeVar('T') -class LoggingOptions(Options): - debug: bool - verbose: bool - - @staticmethod - def default() -> dict[str, Any]: - return { - 'verbose': False, - 'debug': False, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output.') - parser.add_argument('--debug', action='store_true', help='Debug output.') - - -class OutputFileOptions(Options): - output_file: IO[Any] - - @staticmethod - def default() -> dict[str, Any]: - return { - 'output_file': sys.stdout, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('--output-file', type=FileType('w')) - - -class DefinitionOptions(Options): - definition_dir: Path - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('definition_dir', type=dir_path, help='Path to definition directory.') - - -class DisplayOptions(Options): - minimize: bool - - @staticmethod - def default() -> dict[str, Any]: - return { - 'minimize': True, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('--minimize', dest='minimize', action='store_true', help='Minimize output.') - parser.add_argument('--no-minimize', dest='minimize', action='store_false', help='Do not minimize output.') - - -class KDefinitionOptions(Options): - includes: list[str] - main_module: str | None - syntax_module: str | None - spec_module: str | None - definition_dir: Path | None - md_selector: str - - @staticmethod - def default() -> dict[str, Any]: - return { - 'spec_module': None, - 'main_module': None, - 'syntax_module': None, - 'definition_dir': None, - 'md_selector': 'k', - 'includes': [], - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( - '-I', type=str, dest='includes', action='append', help='Directories to lookup K definitions in.' - ) - parser.add_argument('--main-module', type=str, help='Name of the main module.') - parser.add_argument('--syntax-module', type=str, help='Name of the syntax module.') - parser.add_argument('--spec-module', type=str, help='Name of the spec module.') - parser.add_argument('--definition', type=dir_path, dest='definition_dir', help='Path to definition to use.') - parser.add_argument( - '--md-selector', - type=str, - help='Code selector expression to use when reading markdown.', - ) - - -class SaveDirOptions(Options): - save_directory: Path | None - - @staticmethod - def default() -> dict[str, Any]: - return { - 'save_directory': None, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('--save-directory', type=ensure_dir_path, help='Path to where CFGs are stored.') - - -class SpecOptions(SaveDirOptions): - spec_file: Path - claim_labels: list[str] | None - exclude_claim_labels: list[str] - - @staticmethod - def default() -> dict[str, Any]: - return { - 'claim_labels': None, - 'exclude_claim_labels': [], - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('spec_file', type=file_path, help='Path to spec file.') - parser.add_argument( - '--claim', - type=str, - dest='claim_labels', - action='append', - help='Only prove listed claims, MODULE_NAME.claim-id', - ) - parser.add_argument( - '--exclude-claim', - type=str, - dest='exclude_claim_labels', - action='append', - help='Skip listed claims, MODULE_NAME.claim-id', +class KCLIArgs: + @cached_property + def logging_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument('--verbose', '-v', default=False, action='store_true', help='Verbose output.') + args.add_argument('--debug', default=False, action='store_true', help='Debug output.') + return args + + @cached_property + def parallel_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument('--workers', '-j', default=1, type=int, help='Number of processes to run in parallel.') + return args + + @cached_property + def bug_report_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument( + '--bug-report', + type=bug_report_arg, + help='Generate bug report with given name', ) + return args - -class KompileOptions(Options): - emit_json: bool - ccopts: list[str] - llvm_kompile: bool - llvm_library: bool - enable_llvm_debug: bool - read_only: bool - o0: bool - o1: bool - o2: bool - o3: bool - - @staticmethod - def default() -> dict[str, Any]: - return { - 'emit_json': True, - 'llvm_kompile': False, - 'llvm_library': False, - 'enable_llvm_debug': False, - 'read_only': False, - 'o0': False, - 'o1': False, - 'o2': False, - 'o3': False, - 'ccopts': [], - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( + @cached_property + def kompile_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument( '--emit-json', dest='emit_json', + default=True, action='store_true', help='Emit JSON definition after compilation.', ) - parser.add_argument( + args.add_argument( '--no-emit-json', dest='emit_json', action='store_false', help='Do not JSON definition after compilation.' ) - parser.add_argument( + args.add_argument( '-ccopt', dest='ccopts', + default=[], action='append', help='Additional arguments to pass to llvm-kompile.', ) - parser.add_argument( + args.add_argument( '--no-llvm-kompile', dest='llvm_kompile', + default=True, action='store_false', help='Do not run llvm-kompile process.', ) - parser.add_argument( + args.add_argument( '--with-llvm-library', dest='llvm_library', + default=False, action='store_true', help='Make kompile generate a dynamic llvm library.', ) - parser.add_argument( + args.add_argument( '--enable-llvm-debug', dest='enable_llvm_debug', + default=False, action='store_true', help='Make kompile generate debug symbols for llvm.', ) - parser.add_argument( + args.add_argument( '--read-only-kompiled-directory', dest='read_only', + default=False, action='store_true', help='Generated a kompiled directory that K will not attempt to write to afterwards.', ) - parser.add_argument('-O0', dest='o0', action='store_true', help='Optimization level 0.') - parser.add_argument('-O1', dest='o1', action='store_true', help='Optimization level 1.') - parser.add_argument('-O2', dest='o2', action='store_true', help='Optimization level 2.') - parser.add_argument('-O3', dest='o3', action='store_true', help='Optimization level 3.') - - -class ParallelOptions(Options): - workers: int - - @staticmethod - def default() -> dict[str, Any]: - return { - 'workers': 1, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('--workers', '-j', type=int, help='Number of processes to run in parallel.') - - -class BugReportOptions(Options): - bug_report: BugReport | None - - @staticmethod - def default() -> dict[str, Any]: - return {'bug_report': None} - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( - '--bug-report', - type=bug_report_arg, - help='Generate bug report with given name', - ) - - -class SMTOptions(Options): - smt_timeout: int - smt_retry_limit: int - smt_tactic: str | None - - @staticmethod - def default() -> dict[str, Any]: - return { - 'smt_timeout': 300, - 'smt_retry_limit': 10, - 'smt_tactic': None, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('--smt-timeout', dest='smt_timeout', type=int, help='Timeout in ms to use for SMT queries.') - parser.add_argument( + args.add_argument('-O0', dest='o0', default=False, action='store_true', help='Optimization level 0.') + args.add_argument('-O1', dest='o1', default=False, action='store_true', help='Optimization level 1.') + args.add_argument('-O2', dest='o2', default=False, action='store_true', help='Optimization level 2.') + args.add_argument('-O3', dest='o3', default=False, action='store_true', help='Optimization level 3.') + return args + + @cached_property + def smt_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument('--smt-timeout', dest='smt_timeout', type=int, help='Timeout in ms to use for SMT queries.') + args.add_argument( '--smt-retry-limit', dest='smt_retry_limit', type=int, help='Number of times to retry SMT queries with scaling timeouts.', ) - parser.add_argument( + args.add_argument( '--smt-tactic', dest='smt_tactic', type=str, help='Z3 tactic to use when checking satisfiability. Example: (check-sat-using smt)', ) + return args + + @cached_property + def display_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument('--minimize', dest='minimize', default=True, action='store_true', help='Minimize output.') + args.add_argument('--no-minimize', dest='minimize', action='store_false', help='Do not minimize output.') + return args + + @cached_property + def definition_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument( + '-I', type=str, dest='includes', default=[], action='append', help='Directories to lookup K definitions in.' + ) + args.add_argument('--main-module', default=None, type=str, help='Name of the main module.') + args.add_argument('--syntax-module', default=None, type=str, help='Name of the syntax module.') + args.add_argument('--spec-module', default=None, type=str, help='Name of the spec module.') + args.add_argument('--definition', type=dir_path, dest='definition_dir', help='Path to definition to use.') + args.add_argument( + '--md-selector', + type=str, + help='Code selector expression to use when reading markdown.', + ) + return args + + @cached_property + def spec_args(self) -> ArgumentParser: + args = ArgumentParser(add_help=False) + args.add_argument('spec_file', type=file_path, help='Path to spec file.') + args.add_argument('--save-directory', type=ensure_dir_path, help='Path to where CFGs are stored.') + args.add_argument( + '--claim', + type=str, + dest='claim_labels', + action='append', + help='Only prove listed claims, MODULE_NAME.claim-id', + ) + args.add_argument( + '--exclude-claim', + type=str, + dest='exclude_claim_labels', + action='append', + help='Skip listed claims, MODULE_NAME.claim-id', + ) + return args diff --git a/src/pyk/cli/cli.py b/src/pyk/cli/cli.py deleted file mode 100644 index 1bbc5e2b4..000000000 --- a/src/pyk/cli/cli.py +++ /dev/null @@ -1,110 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from argparse import ArgumentParser -from collections.abc import Iterable -from typing import TYPE_CHECKING, Any - -if TYPE_CHECKING: - from argparse import _SubParsersAction - - -class CLI: - commands: list[type[Command]] - top_level_args: Iterable[type[Options]] - - # Input a list of all Command types to be used - def __init__(self, commands: Iterable[type[Command]], top_level_args: Iterable[type[Options]] = ()): - self.commands = list(commands) - self.top_level_args = top_level_args - - # Return an instance of the correct Options subclass by matching its name with the requested command - def generate_command(self, args: dict[str, Any]) -> Command: - command = args['command'].lower() - for cmd_type in self.commands: - if cmd_type.name() == command: - if issubclass(cmd_type, Options): - return cmd_type(args) - else: - return cmd_type() - raise ValueError(f'Unrecognized command: {command}') - - # Generate the parsers for all commands - def add_parsers(self, base: _SubParsersAction) -> _SubParsersAction: - for cmd_type in self.commands: - base = cmd_type.parser(base) - return base - - def create_argument_parser(self) -> ArgumentParser: - pyk_args = ArgumentParser(parents=[tla.all_args() for tla in self.top_level_args]) - pyk_args_command = pyk_args.add_subparsers(dest='command', required=True) - - pyk_args_command = self.add_parsers(pyk_args_command) - - return pyk_args - - def get_command(self) -> Command: - parser = self.create_argument_parser() - args = parser.parse_args() - stripped_args = { - key: val - for (key, val) in vars(args).items() - if val is not None and not (isinstance(val, Iterable) and not val) - } - return self.generate_command(stripped_args) - - def get_and_exec_command(self) -> None: - cmd = self.get_command() - cmd.exec() - - -class Command(ABC): - @staticmethod - @abstractmethod - def name() -> str: - ... - - @staticmethod - @abstractmethod - def help_str() -> str: - ... - - @abstractmethod - def exec(self) -> None: - ... - - @classmethod - def parser(cls, base: _SubParsersAction) -> _SubParsersAction: - all_args = [cls.all_args()] if issubclass(cls, Options) else [] - base.add_parser( - name=cls.name(), - help=cls.help_str(), - parents=all_args, - ) - return base - - -class Options: - def __init__(self, args: dict[str, Any]) -> None: - # Get defaults from this and all superclasses that define them, preferring the most specific class - defaults: dict[str, Any] = {} - for cl in reversed(type(self).mro()): - if hasattr(cl, 'default'): - defaults = defaults | cl.default() - - # Overwrite defaults with args from command line - _args = defaults | args - - for attr, val in _args.items(): - self.__setattr__(attr, val) - - @classmethod - def all_args(cls: type[Options]) -> ArgumentParser: - # Collect args from this and all superclasses - parser = ArgumentParser(add_help=False) - mro = cls.mro() - mro.reverse() - for cl in mro: - if hasattr(cl, 'update_args') and 'update_args' in cl.__dict__: - cl.update_args(parser) - return parser diff --git a/src/pyk/cli/pyk.py b/src/pyk/cli/pyk.py deleted file mode 100644 index 90c1c257d..000000000 --- a/src/pyk/cli/pyk.py +++ /dev/null @@ -1,381 +0,0 @@ -from __future__ import annotations - -import json -import sys -from argparse import FileType -from enum import Enum -from pathlib import Path -from typing import IO, TYPE_CHECKING, Any - -from graphviz import Digraph - -from pyk.coverage import get_rule_by_id, strip_coverage_logger -from pyk.cterm import CTerm -from pyk.kast.inner import KInner -from pyk.kast.manip import ( - flatten_label, - minimize_rule, - minimize_term, - propagate_up_constraints, - remove_source_map, - split_config_and_constraints, -) -from pyk.kast.outer import read_kast_definition -from pyk.kast.pretty import PrettyPrinter -from pyk.kore.parser import KoreParser -from pyk.kore.rpc import ExecuteResult, StopReason -from pyk.kore.syntax import Pattern, kore_term -from pyk.ktool.kprint import KPrint -from pyk.ktool.kprove import KProve -from pyk.prelude.k import GENERATED_TOP_CELL -from pyk.prelude.ml import is_top, mlAnd, mlOr -from pyk.utils import _LOGGER - -from .args import DefinitionOptions, DisplayOptions, LoggingOptions, OutputFileOptions -from .cli import Command - -if TYPE_CHECKING: - from argparse import ArgumentParser - from collections.abc import Iterable - - -class PrintInput(Enum): - KORE_JSON = 'kore-json' - KAST_JSON = 'kast-json' - - -class JsonToKoreCommand(Command, LoggingOptions): - @staticmethod - def name() -> str: - return 'json-to-kore' - - @staticmethod - def help_str() -> str: - return 'Convert JSON to textual KORE' - - def exec(self) -> None: - text = sys.stdin.read() - kore = Pattern.from_json(text) - kore.write(sys.stdout) - sys.stdout.write('\n') - - -class KoreToJsonCommand(Command, LoggingOptions): - @staticmethod - def name() -> str: - return 'kore-to-json' - - @staticmethod - def help_str() -> str: - return 'Convert textual KORE to JSON' - - def exec(self) -> None: - text = sys.stdin.read() - kore = KoreParser(text).pattern() - print(kore.json) - - -class CoverageCommand(Command, DefinitionOptions, OutputFileOptions, LoggingOptions): - coverage_file: IO[Any] - - @staticmethod - def name() -> str: - return 'coverage' - - @staticmethod - def help_str() -> str: - return 'Convert coverage file to human readable log.' - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('coverage_file', type=FileType('r'), help='Coverage file to build log for.') - - def exec(self) -> None: - kompiled_dir: Path = self.definition_dir - definition = remove_source_map(read_kast_definition(kompiled_dir / 'compiled.json')) - pretty_printer = PrettyPrinter(definition) - for rid in self.coverage_file: - rule = minimize_rule(strip_coverage_logger(get_rule_by_id(definition, rid.strip()))) - self.output_file.write('\n\n') - self.output_file.write('Rule: ' + rid.strip()) - self.output_file.write('\nUnparsed:\n') - self.output_file.write(pretty_printer.print(rule)) - _LOGGER.info(f'Wrote file: {self.output_file.name}') - - -class GraphImportsCommand(Command, DefinitionOptions, LoggingOptions): - @staticmethod - def name() -> str: - return 'graph-imports' - - @staticmethod - def help_str() -> str: - return 'Graph the imports of a given definition.' - - def exec(self) -> None: - kompiled_dir: Path = self.definition_dir - kprinter = KPrint(kompiled_dir) - definition = kprinter.definition - import_graph = Digraph() - graph_file = kompiled_dir / 'import-graph' - for module in definition.modules: - module_name = module.name - import_graph.node(module_name) - for module_import in module.imports: - import_graph.edge(module_name, module_import.name) - import_graph.render(graph_file) - _LOGGER.info(f'Wrote file: {graph_file}') - - -class RPCKastCommand(Command, OutputFileOptions, LoggingOptions): - reference_request_file: IO[Any] - response_file: IO[Any] - - @staticmethod - def name() -> str: - return 'rpc-kast' - - @staticmethod - def help_str() -> str: - return 'Convert an "execute" JSON RPC response to a new "execute" or "simplify" request, copying parameters from a reference request.' - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( - 'reference_request_file', - type=FileType('r'), - help='An input file containing a JSON RPC request to server as a reference for the new request.', - ) - parser.add_argument( - 'response_file', - type=FileType('r'), - help='An input file containing a JSON RPC response with KoreJSON payload.', - ) - - def exec(self) -> None: - """ - Convert an 'execute' JSON RPC response to a new 'execute' or 'simplify' request, - copying parameters from a reference request. - """ - reference_request = json.loads(self.reference_request_file.read()) - input_dict = json.loads(self.response_file.read()) - execute_result = ExecuteResult.from_dict(input_dict['result']) - non_state_keys = set(reference_request['params'].keys()).difference(['state']) - request_params = {} - for key in non_state_keys: - request_params[key] = reference_request['params'][key] - request_params['state'] = {'format': 'KORE', 'version': 1, 'term': execute_result.state.kore.dict} - request = { - 'jsonrpc': reference_request['jsonrpc'], - 'id': reference_request['id'], - 'method': reference_request['method'], - 'params': request_params, - } - self.output_file.write(json.dumps(request)) - - -class RPCPrintCommand(Command, DefinitionOptions, OutputFileOptions, LoggingOptions): - input_file: IO[Any] - - @staticmethod - def name() -> str: - return 'rpc-print' - - @staticmethod - def help_str() -> str: - return 'Pretty-print an RPC request/response' - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( - 'input_file', - type=FileType('r'), - help='An input file containing the JSON RPC request or response with KoreJSON payload.', - ) - - def exec(self) -> None: - kompiled_dir: Path = self.definition_dir - printer = KPrint(kompiled_dir) - input_dict = json.loads(self.input_file.read()) - output_buffer = [] - - def pretty_print_request(request_params: dict[str, Any]) -> list[str]: - output_buffer = [] - non_state_keys = set(request_params.keys()).difference(['state']) - for key in non_state_keys: - output_buffer.append(f'{key}: {request_params[key]}') - state = CTerm.from_kast(printer.kore_to_kast(kore_term(request_params['state']))) - output_buffer.append('State:') - output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) - return output_buffer - - def pretty_print_execute_response(execute_result: ExecuteResult) -> list[str]: - output_buffer = [] - output_buffer.append(f'Depth: {execute_result.depth}') - output_buffer.append(f'Stop reason: {execute_result.reason.value}') - if execute_result.reason == StopReason.TERMINAL_RULE or execute_result.reason == StopReason.CUT_POINT_RULE: - output_buffer.append(f'Stop rule: {execute_result.rule}') - output_buffer.append( - f'Number of next states: {len(execute_result.next_states) if execute_result.next_states is not None else 0}' - ) - state = CTerm.from_kast(printer.kore_to_kast(execute_result.state.kore)) - output_buffer.append('State:') - output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) - if execute_result.next_states is not None: - next_states = [CTerm.from_kast(printer.kore_to_kast(s.kore)) for s in execute_result.next_states] - for i, s in enumerate(next_states): - output_buffer.append(f'Next state #{i}:') - output_buffer.append(printer.pretty_print(s.kast, sort_collections=True)) - return output_buffer - - try: - if 'method' in input_dict: - output_buffer.append('JSON RPC request') - output_buffer.append(f'id: {input_dict["id"]}') - output_buffer.append(f'Method: {input_dict["method"]}') - try: - if 'state' in input_dict['params']: - output_buffer += pretty_print_request(input_dict['params']) - else: # this is an "add-module" request, skip trying to print state - for key in input_dict['params'].keys(): - output_buffer.append(f'{key}: {input_dict["params"][key]}') - except KeyError as e: - _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') - exit(1) - else: - if not 'result' in input_dict: - _LOGGER.critical('The input is neither a request not a resonse') - exit(1) - output_buffer.append('JSON RPC Response') - output_buffer.append(f'id: {input_dict["id"]}') - if list(input_dict['result'].keys()) == ['state']: # this is a "simplify" response - output_buffer.append('Method: simplify') - state = CTerm.from_kast(printer.kore_to_kast(kore_term(input_dict['result']['state']))) - output_buffer.append('State:') - output_buffer.append(printer.pretty_print(state.kast, sort_collections=True)) - elif list(input_dict['result'].keys()) == ['module']: # this is an "add-module" response - output_buffer.append('Method: add-module') - output_buffer.append('Module:') - output_buffer.append(input_dict['result']['module']) - else: - try: # assume it is an "execute" response - output_buffer.append('Method: execute') - execute_result = ExecuteResult.from_dict(input_dict['result']) - output_buffer += pretty_print_execute_response(execute_result) - except KeyError as e: - _LOGGER.critical(f'Could not find key {str(e)} in input JSON file') - exit(1) - if self.output_file is not None: - self.output_file.write('\n'.join(output_buffer)) - else: - print('\n'.join(output_buffer)) - except ValueError as e: - # shorten and print the error message in case kore_to_kast throws ValueError - _LOGGER.critical(str(e)[:200]) - exit(1) - - -class PrintCommand(Command, DefinitionOptions, OutputFileOptions, DisplayOptions, LoggingOptions): - term: IO[Any] - input: PrintInput - minimize: bool - omit_labels: str | None - keep_cells: str | None - - @staticmethod - def name() -> str: - return 'print' - - @staticmethod - def help_str() -> str: - return 'Pretty print a term.' - - @staticmethod - def default() -> dict[str, Any]: - return { - 'input': PrintInput.KAST_JSON, - 'omit_labels': None, - 'keep_cells': None, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( - 'term', type=FileType('r'), help='File containing input term (in format specified with --input).' - ) - parser.add_argument('--input', type=PrintInput, choices=list(PrintInput)) - parser.add_argument('--omit-labels', nargs='?', help='List of labels to omit from output.') - parser.add_argument('--keep-cells', nargs='?', help='List of cells with primitive values to keep in output.') - - def exec(self) -> None: - kompiled_dir: Path = self.definition_dir - printer = KPrint(kompiled_dir) - if self.input == PrintInput.KORE_JSON: - _LOGGER.info(f'Reading Kore JSON from file: {self.term.name}') - kore = Pattern.from_json(self.term.read()) - term = printer.kore_to_kast(kore) - else: - _LOGGER.info(f'Reading Kast JSON from file: {self.term.name}') - term = KInner.from_json(self.term.read()) - if is_top(term): - self.output_file.write(printer.pretty_print(term)) - _LOGGER.info(f'Wrote file: {self.output_file.name}') - else: - if self.minimize: - if self.omit_labels != None and self.keep_cells != None: - raise ValueError('You cannot use both --omit-labels and --keep-cells.') - - abstract_labels = self.omit_labels.split(',') if self.omit_labels is not None else [] - keep_cells = self.keep_cells.split(',') if self.keep_cells is not None else [] - minimized_disjuncts = [] - - for disjunct in flatten_label('#Or', term): - try: - minimized = minimize_term(disjunct, abstract_labels=abstract_labels, keep_cells=keep_cells) - config, constraint = split_config_and_constraints(minimized) - except ValueError as err: - raise ValueError('The minimized term does not contain a config cell.') from err - - if not is_top(constraint): - minimized_disjuncts.append(mlAnd([config, constraint], sort=GENERATED_TOP_CELL)) - else: - minimized_disjuncts.append(config) - term = propagate_up_constraints(mlOr(minimized_disjuncts, sort=GENERATED_TOP_CELL)) - - self.output_file.write(printer.pretty_print(term)) - _LOGGER.info(f'Wrote file: {self.output_file.name}') - - -class ProveCommand(Command, DefinitionOptions, OutputFileOptions, LoggingOptions): - main_file: Path - spec_file: Path - spec_module: str - k_args: Iterable[str] - - @staticmethod - def name() -> str: - return 'prove' - - @staticmethod - def help_str() -> str: - return 'Prove an input specification (using kprovex).' - - @staticmethod - def default() -> dict[str, Any]: - return { - 'k_args': [], - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('main_file', type=str, help='Main file used for kompilation.') - parser.add_argument('spec_file', type=str, help='File with the specification module.') - parser.add_argument('spec_module', type=str, help='Module with claims to be proven.') - parser.add_argument('k_args', nargs='*', help='Arguments to pass through to K invocation.') - - def exec(self) -> None: - kompiled_dir: Path = self.definition_dir - kprover = KProve(kompiled_dir, self.main_file) - final_state = kprover.prove(Path(self.spec_file), spec_module_name=self.spec_module, args=self.k_args) - self.output_file.write(json.dumps(mlOr([state.kast for state in final_state]).to_dict())) - _LOGGER.info(f'Wrote file: {self.output_file.name}') diff --git a/src/pyk/cli/utils.py b/src/pyk/cli/utils.py index 4f236d0d5..c98320fdd 100644 --- a/src/pyk/cli/utils.py +++ b/src/pyk/cli/utils.py @@ -12,7 +12,6 @@ from typing import Final, TypeVar from ..kcfg.kcfg import NodeIdLike - from .args import LoggingOptions T1 = TypeVar('T1') T2 = TypeVar('T2') @@ -20,7 +19,7 @@ LOG_FORMAT: Final = '%(levelname)s %(asctime)s %(name)s - %(message)s' -def loglevel(args: LoggingOptions | Namespace) -> int: +def loglevel(args: Namespace) -> int: if args.debug: return logging.DEBUG diff --git a/src/pyk/kdist/__main__.py b/src/pyk/kdist/__main__.py index 47643d2fe..1a0624310 100644 --- a/src/pyk/kdist/__main__.py +++ b/src/pyk/kdist/__main__.py @@ -2,16 +2,16 @@ import fnmatch import logging -from typing import TYPE_CHECKING, Any +from argparse import ArgumentParser +from typing import TYPE_CHECKING -from pyk.cli.args import LoggingOptions -from pyk.cli.cli import CLI, Command +from pyk.cli.args import KCLIArgs from pyk.cli.utils import loglevel from ..kdist import kdist, target_ids if TYPE_CHECKING: - from argparse import ArgumentParser + from argparse import Namespace from typing import Final @@ -20,14 +20,42 @@ def main() -> None: - kdist_cli = CLI( - {KDistBuildCommand, KDistCleanCommand, KDistWhichCommand, KDistListCommand}, top_level_args=[LoggingOptions] + args = _parse_arguments() + + logging.basicConfig(level=loglevel(args), format=_LOG_FORMAT) + + if args.command == 'build': + _exec_build(**vars(args)) + + elif args.command == 'clean': + _exec_clean(args.target) + + elif args.command == 'which': + _exec_which(args.target) + + elif args.command == 'list': + _exec_list() + + else: + raise AssertionError() + + +def _exec_build( + command: str, + targets: list[str], + args: list[str], + jobs: int, + force: bool, + verbose: bool, + debug: bool, +) -> None: + kdist.build( + target_ids=_process_targets(targets), + args=_process_args(args), + jobs=jobs, + force=force, + verbose=verbose or debug, ) - cmd = kdist_cli.get_command() - assert isinstance(cmd, LoggingOptions) - print(vars(cmd)) - logging.basicConfig(level=loglevel(cmd), format=_LOG_FORMAT) - cmd.exec() def _process_targets(targets: list[str]) -> list[str]: @@ -52,137 +80,66 @@ def _process_args(args: list[str]) -> dict[str, str]: return res -class KDistBuildCommand(Command, LoggingOptions): - targets: list[str] - force: bool - jobs: int - args: list[str] - - @staticmethod - def name() -> str: - return 'build' - - @staticmethod - def help_str() -> str: - return 'build targets' - - @staticmethod - def default() -> dict[str, Any]: - return { - 'force': False, - 'jobs': 1, - 'targets': ['*'], - 'args': [], - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument('targets', metavar='TARGET', nargs='*', help='target to build') - parser.add_argument( - '-a', - '--arg', - dest='args', - metavar='ARG', - action='append', - help='build with argument', - ) - parser.add_argument('-f', '--force', action='store_true', help='force build') - parser.add_argument('-j', '--jobs', metavar='N', type=int, help='maximal number of build jobs') - - def exec(self) -> None: - print(self.verbose) - print(self.debug) - kdist.build( - target_ids=_process_targets(self.targets), - args=_process_args(self.args), - jobs=self.jobs, - force=self.force, - verbose=self.verbose or self.debug, - ) - - -class KDistCleanCommand(Command, LoggingOptions): - target: str - - @staticmethod - def name() -> str: - return 'clean' - - @staticmethod - def help_str() -> str: - return 'clean targets' - - @staticmethod - def default() -> dict[str, Any]: - return { - 'target': None, - } - - @staticmethod - def update_args(parser: ArgumentParser) -> None: - parser.add_argument( - 'target', - metavar='TARGET', - nargs='?', - help='target to clean', - ) +def _exec_clean(target: str | None) -> None: + res = kdist.clean(target) + print(res) - def exec(self) -> None: - res = kdist.clean(self.target) - print(res) +def _exec_which(target: str | None) -> None: + res = kdist.which(target) + print(res) -class KDistWhichCommand(Command, LoggingOptions): - target: str - @staticmethod - def name() -> str: - return 'which' +def _exec_list() -> None: + targets_by_plugin: dict[str, list[str]] = {} + for plugin_name, target_name in target_ids(): + targets = targets_by_plugin.get(plugin_name, []) + targets.append(target_name) + targets_by_plugin[plugin_name] = targets - @staticmethod - def help_str() -> str: - return 'print target location' + for plugin_name in targets_by_plugin: + print(plugin_name) + for target_name in targets_by_plugin[plugin_name]: + print(f'* {target_name}') - @staticmethod - def default() -> dict[str, Any]: - return { - 'target': None, - } - @staticmethod - def update_args(parser: ArgumentParser) -> None: +def _parse_arguments() -> Namespace: + def add_target_arg(parser: ArgumentParser, help_text: str) -> None: parser.add_argument( 'target', metavar='TARGET', nargs='?', - help='target to print directory for', + help=help_text, ) - def exec(self) -> None: - res = kdist.which(self.target) - print(res) - + k_cli_args = KCLIArgs() + + parser = ArgumentParser(prog='kdist', parents=[k_cli_args.logging_args]) + command_parser = parser.add_subparsers(dest='command', required=True) + + build_parser = command_parser.add_parser('build', help='build targets') + build_parser.add_argument('targets', metavar='TARGET', nargs='*', default='*', help='target to build') + build_parser.add_argument( + '-a', + '--arg', + dest='args', + metavar='ARG', + action='append', + default=[], + help='build with argument', + ) + build_parser.add_argument('-f', '--force', action='store_true', default=False, help='force build') + build_parser.add_argument('-j', '--jobs', metavar='N', type=int, default=1, help='maximal number of build jobs') -class KDistListCommand(Command, LoggingOptions): - @staticmethod - def name() -> str: - return 'list' + clean_parser = command_parser.add_parser('clean', help='clean targets') + add_target_arg(clean_parser, 'target to clean') - @staticmethod - def help_str() -> str: - return 'print list of available targets' + which_parser = command_parser.add_parser('which', help='print target location') + add_target_arg(which_parser, 'target to print directory for') - def exec(self) -> None: - targets_by_plugin: dict[str, list[str]] = {} - for plugin_name, target_name in target_ids(): - targets = targets_by_plugin.get(plugin_name, []) - targets.append(target_name) - targets_by_plugin[plugin_name] = targets + command_parser.add_parser('list', help='print list of available targets') - for plugin_name in targets_by_plugin: - print(plugin_name) - for target_name in targets_by_plugin[plugin_name]: - print(f'* {target_name}') + return parser.parse_args() if __name__ == '__main__':