diff --git a/CHANGELOG.md b/CHANGELOG.md index cd6d1ad..dd12321 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Configurable simplification steps (#54) - Parallel simplification (#53) - Terminate fuzzer workers in atexit() handler +- Subcommand to prune invalid crashes (#52) ### Changed diff --git a/cobrafuzz/main.py b/cobrafuzz/main.py index 4be9f31..7965865 100644 --- a/cobrafuzz/main.py +++ b/cobrafuzz/main.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Callable -from cobrafuzz import fuzzer, simplifier +from cobrafuzz import fuzzer, prune, simplifier class CobraFuzz: @@ -23,6 +23,12 @@ def __call__(self) -> None: subparsers = parser.add_subparsers(dest="subcommands") + parser_prune = subparsers.add_parser( + "prune", + help="Prune invalid files from crash directory.", + ) + parser_prune.set_defaults(func=self.prune) + parser_show = subparsers.add_parser( "show", help="Run target on examples in crash directory, print errors and exit.", @@ -145,6 +151,9 @@ def __call__(self) -> None: args.func(args) + def prune(self, args: argparse.Namespace) -> None: + prune.prune(crash_dir=args.crash_dir, target=self.function) + def show(self, args: argparse.Namespace) -> None: fuzzer.Fuzzer(crash_dir=args.crash_dir, target=self.function, regression=True) diff --git a/cobrafuzz/prune.py b/cobrafuzz/prune.py new file mode 100644 index 0000000..3adce20 --- /dev/null +++ b/cobrafuzz/prune.py @@ -0,0 +1,21 @@ +import logging +from pathlib import Path +from typing import Callable + +from cobrafuzz import util + + +def prune(crash_dir: Path, target: Callable[[bytes], None]) -> None: + for f in sorted(crash_dir.glob(pattern="*")): + assert isinstance(f, Path) + if not f.is_file(): + continue + + try: + with util.disable_logging(): + target(f.read_bytes()) + except Exception: # noqa: BLE001, S110 + pass + else: + logging.info("No crash, deleting %s", f.name) + f.unlink() diff --git a/cobrafuzz/simplifier.py b/cobrafuzz/simplifier.py index 3a64b53..499a08a 100644 --- a/cobrafuzz/simplifier.py +++ b/cobrafuzz/simplifier.py @@ -5,9 +5,8 @@ import multiprocessing as mp import re import time -from contextlib import contextmanager from pathlib import Path -from typing import Callable, Iterator, Optional, Union, cast +from typing import Callable, Optional, Union, cast import dill as pickle # type: ignore[import-untyped] @@ -105,20 +104,9 @@ def equivalent_to(self, other: Metrics) -> bool: return self.coverage == other.coverage -@contextmanager -def disable_logging() -> Iterator[None]: - previous_level = logging.root.manager.disable - logging.disable(logging.CRITICAL) - - try: - yield - finally: - logging.disable(previous_level) - - def run_target(target: Callable[[bytes], None], data: bytes) -> Optional[Metrics]: try: - with disable_logging(): + with util.disable_logging(): target(data) except Exception as e: # noqa: BLE001 return Metrics(data, util.covered(e.__traceback__, 1)) diff --git a/cobrafuzz/util.py b/cobrafuzz/util.py index 1ef9f5d..1a648ad 100644 --- a/cobrafuzz/util.py +++ b/cobrafuzz/util.py @@ -1,9 +1,11 @@ from __future__ import annotations +import logging import random from abc import abstractmethod +from contextlib import contextmanager from types import TracebackType -from typing import Generator, Generic, Optional, TypeVar +from typing import Generator, Generic, Iterator, Optional, TypeVar from . import common @@ -223,3 +225,14 @@ def covered( prev_file = tb.tb_frame.f_code.co_filename tb = tb.tb_next return set(result[skip_first_n:]) + + +@contextmanager +def disable_logging() -> Iterator[None]: + previous_level = logging.root.manager.disable + logging.disable(logging.CRITICAL) + + try: + yield + finally: + logging.disable(previous_level) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 2f37c69..42c7acc 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -6,7 +6,7 @@ import pytest -from cobrafuzz import fuzzer, simplifier +from cobrafuzz import fuzzer, prune, simplifier from cobrafuzz.main import CobraFuzz @@ -107,6 +107,31 @@ def simplify(self) -> None: assert args["target"] is not None +def test_main_prune(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + args: Optional[dict[str, Union[Path, Callable[[bytes], None]]]] = None + + def dummy_prune(**a: Union[Path, Callable[[bytes], None]]) -> None: + nonlocal args + args = a + + with monkeypatch.context() as mp: + mp.setattr( + sys, + "argv", + [ + "main", + "--crash-dir", + str(tmp_path), + "prune", + ], + ) + mp.setattr(prune, "prune", dummy_prune) + c = CobraFuzz(lambda _: None) # pragma: no cover + c() + assert args is not None + assert args["crash_dir"] == tmp_path + + def test_main_no_subcommand(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: with monkeypatch.context() as mp: mp.setattr( diff --git a/tests/unit/test_prune.py b/tests/unit/test_prune.py new file mode 100644 index 0000000..b259ba4 --- /dev/null +++ b/tests/unit/test_prune.py @@ -0,0 +1,31 @@ +import logging +from pathlib import Path + +import pytest + +from cobrafuzz.prune import prune + + +def test_prune(caplog: pytest.LogCaptureFixture, tmp_path: Path) -> None: + def target(data: bytes) -> None: + assert b"crash" not in data + + (tmp_path / "f1").write_bytes(b"crash 1") + (tmp_path / "f2").write_bytes(b"crash 2") + (tmp_path / "f3").write_bytes(b"invalid 1") + (tmp_path / "f4").write_bytes(b"invalid 2") + (tmp_path / "d").mkdir() + + with caplog.at_level(logging.INFO): + prune(crash_dir=tmp_path, target=target) + + assert (tmp_path / "f1").read_bytes() == b"crash 1" + assert (tmp_path / "f2").read_bytes() == b"crash 2" + assert (tmp_path / "d").is_dir() + assert not (tmp_path / "f3").exists() + assert not (tmp_path / "f4").exists() + + assert caplog.record_tuples == [ + ("root", logging.INFO, "No crash, deleting f3"), + ("root", logging.INFO, "No crash, deleting f4"), + ]