From 6dbc3dfce60871ec7444045f4f37f9a9a03378d1 Mon Sep 17 00:00:00 2001 From: akshat4703 Date: Fri, 13 Mar 2026 12:52:06 +0530 Subject: [PATCH 1/2] reject unsupported logic edge cases during rule loading --- capa/rules/__init__.py | 33 +++++++++++++++++------ tests/test_rules.py | 59 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 8 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 23fd0dd3c..a6830ee20 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1052,6 +1052,24 @@ def evaluate(self, features: FeatureSet, short_circuit=True): capa.perf.counters["evaluate.feature.rule"] += 1 return self.statement.evaluate(features, short_circuit=short_circuit) + @staticmethod + def _lint_logic_edge_cases(statement: Union[Statement, Feature], is_root: bool = True) -> None: + """ + reject rule constructs known to be unsupported by the optimized matcher. + """ + if isinstance(statement, ceng.Not): + if is_root: + raise InvalidRule("top level statement may not be a `not` statement") + if isinstance(statement.child, ceng.Not): + raise InvalidRule("nested `not` statements are not supported") + + if is_root and isinstance(statement, ceng.Range) and statement.min == 0 and statement.max == 0: + raise InvalidRule("top level statement may not be `count(...): 0`") + + if isinstance(statement, Statement): + for child in statement.get_children(): + Rule._lint_logic_edge_cases(child, is_root=False) + @classmethod def from_dict(cls, d: dict[str, Any], definition: str) -> "Rule": meta = d["rule"]["meta"] @@ -1087,7 +1105,9 @@ def from_dict(cls, d: dict[str, Any], definition: str) -> "Rule": if not isinstance(meta.get("mbc", []), list): raise InvalidRule("MBC mapping must be a list") - return cls(name, scopes, build_statements(statements[0], scopes), meta, definition) + statement = build_statements(statements[0], scopes) + cls._lint_logic_edge_cases(statement) + return cls(name, scopes, statement, meta, definition) @staticmethod @lru_cache() @@ -2109,13 +2129,10 @@ def match( This wrapper around _match exists so that we can assert it matches precisely the same as `capa.engine.match`, just faster. - This matcher does not handle some edge cases: - - top level NOT statements - - also top level counted features with zero occurances, like: `count(menmonic(mov)): 0` - - nested NOT statements (NOT: NOT: foo) - - We should discourage/forbid these constructs from our rules and add lints for them. - TODO(williballenthin): add lints for logic edge cases + This matcher does not handle some edge cases, such as top level NOT statements, + top level counted features with zero occurrences (`count(mnemonic(mov)): 0`), + and nested NOT statements (`not: not: ...`). + These constructs are rejected during rule loading. Args: paranoid: when true, demonstrate that the naive matcher agrees with this diff --git a/tests/test_rules.py b/tests/test_rules.py index 9a8b6874f..72f61fb6f 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -1685,3 +1685,62 @@ def test_circular_dependency(): ] with pytest.raises(capa.rules.InvalidRule): list(capa.rules.get_rules_and_dependencies(rules, rules[0].name)) + + +def test_invalid_top_level_not_statement(): + with pytest.raises(capa.rules.InvalidRule, match="top level statement may not be a `not` statement"): + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - not: + - number: 1 + """ + ) + ) + + +def test_invalid_nested_not_statement(): + with pytest.raises(capa.rules.InvalidRule, match="nested `not` statements are not supported"): + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - and: + - mnemonic: mov + - not: + - not: + - number: 1 + """ + ) + ) + + +def test_invalid_top_level_count_zero_statement(): + with pytest.raises(capa.rules.InvalidRule, match="top level statement may not be `count\\(...\\): 0`"): + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - count(number(100)): 0 + """ + ) + ) From 1e85550b804fe1e17272f079e76373f76514bac4 Mon Sep 17 00:00:00 2001 From: akshat4703 Date: Fri, 13 Mar 2026 13:23:57 +0530 Subject: [PATCH 2/2] requested-changes --- capa/rules/__init__.py | 44 +++++++++++++++--------------------------- tests/test_match.py | 27 ++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index a6830ee20..9e2c0e989 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1052,24 +1052,6 @@ def evaluate(self, features: FeatureSet, short_circuit=True): capa.perf.counters["evaluate.feature.rule"] += 1 return self.statement.evaluate(features, short_circuit=short_circuit) - @staticmethod - def _lint_logic_edge_cases(statement: Union[Statement, Feature], is_root: bool = True) -> None: - """ - reject rule constructs known to be unsupported by the optimized matcher. - """ - if isinstance(statement, ceng.Not): - if is_root: - raise InvalidRule("top level statement may not be a `not` statement") - if isinstance(statement.child, ceng.Not): - raise InvalidRule("nested `not` statements are not supported") - - if is_root and isinstance(statement, ceng.Range) and statement.min == 0 and statement.max == 0: - raise InvalidRule("top level statement may not be `count(...): 0`") - - if isinstance(statement, Statement): - for child in statement.get_children(): - Rule._lint_logic_edge_cases(child, is_root=False) - @classmethod def from_dict(cls, d: dict[str, Any], definition: str) -> "Rule": meta = d["rule"]["meta"] @@ -1105,9 +1087,7 @@ def from_dict(cls, d: dict[str, Any], definition: str) -> "Rule": if not isinstance(meta.get("mbc", []), list): raise InvalidRule("MBC mapping must be a list") - statement = build_statements(statements[0], scopes) - cls._lint_logic_edge_cases(statement) - return cls(name, scopes, statement, meta, definition) + return cls(name, scopes, build_statements(statements[0], scopes), meta, definition) @staticmethod @lru_cache() @@ -1670,6 +1650,9 @@ class _RuleFeatureIndex: # Mapping from rule name to list of Bytes features that have to match. # All these features will be evaluated whenever a Bytes feature is encountered. bytes_rules: dict[str, list[Feature]] + # Rules that cannot be safely handled by optimized candidate filtering. + # These are always evaluated via the unoptimized path. + fallback_rules: set[str] # this routine is unstable and may change before the next major release. @staticmethod @@ -1822,13 +1805,18 @@ def and_score_key(item): # Ideally we find a way to get rid of all of these, eventually. string_rules: dict[str, list[Feature]] = {} bytes_rules: dict[str, list[Feature]] = {} + fallback_rules: set[str] = set() for rule in rules: rule_name = rule.meta["name"] root = rule.statement item = rec(rule_name, root) - assert item is not None + if item is None: + logger.debug("indexing: no stable index feature for rule, will use fallback matcher: %s", rule_name) + scores_by_rule[rule_name] = 0 + fallback_rules.add(rule_name) + continue score, features = item string_features = [ @@ -1867,8 +1855,9 @@ def and_score_key(item): logger.debug( "indexing: %d scanning string features, %d scanning bytes features", len(string_rules), len(bytes_rules) ) + logger.debug("indexing: %d rules evaluated via fallback matcher", len(fallback_rules)) - return RuleSet._RuleFeatureIndex(rules_by_feature, string_rules, bytes_rules) + return RuleSet._RuleFeatureIndex(rules_by_feature, string_rules, bytes_rules, fallback_rules) @staticmethod def _get_rules_for_scope(rules, scope) -> list[Rule]: @@ -1996,7 +1985,7 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea # Find all the rules that could match the given feature set. # Ideally we want this set to be as small and focused as possible, # and we can tune it by tweaking `_index_rules_by_feature`. - candidate_rule_names: set[str] = set() + candidate_rule_names: set[str] = set(feature_index.fallback_rules) for feature in features: candidate_rule_names.update(feature_index.rules_by_feature.get(feature, ())) @@ -2129,10 +2118,9 @@ def match( This wrapper around _match exists so that we can assert it matches precisely the same as `capa.engine.match`, just faster. - This matcher does not handle some edge cases, such as top level NOT statements, - top level counted features with zero occurrences (`count(mnemonic(mov)): 0`), - and nested NOT statements (`not: not: ...`). - These constructs are rejected during rule loading. + Some edge-case rule constructs cannot be safely optimized for candidate filtering. + In those cases, this matcher transparently falls back to unoptimized evaluation + for the affected rules. Args: paranoid: when true, demonstrate that the naive matcher agrees with this diff --git a/tests/test_match.py b/tests/test_match.py index 8fdd146dc..0c1b451a7 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -198,6 +198,31 @@ def test_match_range_with_zero(): assert "test rule" not in matches +def test_match_top_level_range_exact_zero(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - count(number(100)): 0 + """ + ) + r = capa.rules.Rule.from_yaml(rule) + + _, matches = match([r], {}, 0x0) + assert "test rule" in matches + + _, matches = match([r], {capa.features.insn.Number(100): {}}, 0x0) + assert "test rule" in matches + + _, matches = match([r], {capa.features.insn.Number(100): {1}}, 0x0) + assert "test rule" not in matches + + def test_match_adds_matched_rule_feature(): """show that using `match` adds a feature for matched rules.""" rule = textwrap.dedent( @@ -585,7 +610,6 @@ def test_regex_get_value_str(pattern): assert capa.features.common.Regex(pattern).get_value_str() == pattern -@pytest.mark.xfail(reason="can't have top level NOT") def test_match_only_not(): rule = textwrap.dedent( """ @@ -630,7 +654,6 @@ def test_match_not(): assert "test rule" in matches -@pytest.mark.xfail(reason="can't have nested NOT") def test_match_not_not(): rule = textwrap.dedent( """