diff --git a/python-dsl/codepathfinder/__init__.py b/python-dsl/codepathfinder/__init__.py
index 96a6e41c..214db5d6 100644
--- a/python-dsl/codepathfinder/__init__.py
+++ b/python-dsl/codepathfinder/__init__.py
@@ -28,5 +28,17 @@
 from .decorators import rule
 from .dataflow import flows
 from .propagation import propagates
+from .presets import PropagationPresets
+from .config import set_default_propagation, set_default_scope
 
-__all__ = ["calls", "variable", "rule", "flows", "propagates", "__version__"]
+__all__ = [
+    "calls",
+    "variable",
+    "rule",
+    "flows",
+    "propagates",
+    "PropagationPresets",
+    "set_default_propagation",
+    "set_default_scope",
+    "__version__",
+]
diff --git a/python-dsl/codepathfinder/config.py b/python-dsl/codepathfinder/config.py
new file mode 100644
index 00000000..7111dbb3
--- /dev/null
+++ b/python-dsl/codepathfinder/config.py
@@ -0,0 +1,92 @@
+"""
+Global configuration for codepathfinder DSL.
+
+Allows setting default propagation, scope, etc.
+"""
+
+from typing import List, Optional
+from .propagation import PropagationPrimitive
+
+
+class PathfinderConfig:
+    """Singleton configuration for codepathfinder."""
+
+    _instance: Optional["PathfinderConfig"] = None
+    _default_propagation: List[PropagationPrimitive] = []
+    _default_scope: str = "global"
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    @property
+    def default_propagation(self) -> List[PropagationPrimitive]:
+        """Get a copy of the default propagation primitives (callers cannot alias it)."""
+        return list(self._default_propagation)
+
+    @default_propagation.setter
+    def default_propagation(self, value: List[PropagationPrimitive]):
+        """Set default propagation primitives (stored as a copy of ``value``)."""
+        self._default_propagation = list(value)
+
+    @property
+    def default_scope(self) -> str:
+        """Get default scope."""
+        return self._default_scope
+
+    @default_scope.setter
+    def default_scope(self, value: str):
+        """Set default scope."""
+        if value not in ["local", "global"]:
+            raise ValueError(f"scope must be 'local' or 'global', got '{value}'")
+        self._default_scope = value
+
+
+# Global config instance
+_config = PathfinderConfig()
+
+
+def set_default_propagation(primitives: List[PropagationPrimitive]) -> None:
+    """
+    Set global default propagation primitives.
+
+    All flows() calls without explicit propagates_through will use this default.
+
+    Args:
+        primitives: List of PropagationPrimitive objects
+
+    Example:
+        set_default_propagation(PropagationPresets.standard())
+
+        # Now all flows() without propagates_through use standard()
+        flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            # propagates_through defaults to standard()
+        )
+    """
+    _config.default_propagation = primitives
+
+
+def set_default_scope(scope: str) -> None:
+    """
+    Set global default scope.
+
+    Args:
+        scope: "local" or "global"
+
+    Example:
+        set_default_scope("local")
+    """
+    _config.default_scope = scope
+
+
+def get_default_propagation() -> List[PropagationPrimitive]:
+    """Get global default propagation primitives."""
+    return _config.default_propagation
+
+
+def get_default_scope() -> str:
+    """Get global default scope."""
+    return _config.default_scope
diff --git a/python-dsl/codepathfinder/dataflow.py b/python-dsl/codepathfinder/dataflow.py
index 4a5d18da..36b76f41 100644
--- a/python-dsl/codepathfinder/dataflow.py
+++ b/python-dsl/codepathfinder/dataflow.py
@@ -9,6 +9,7 @@
 from .matchers import CallMatcher
 from .propagation import PropagationPrimitive, create_propagation_list
 from .ir import IRType
+from .config import get_default_propagation, get_default_scope
 
 
 class DataflowMatcher:
@@ -36,7 +37,7 @@ def __init__(
         to_sinks: Union[CallMatcher, List[CallMatcher]],
         sanitized_by: Optional[Union[CallMatcher, List[CallMatcher]]] = None,
         propagates_through: Optional[List[PropagationPrimitive]] = None,
-        scope: str = "global",
+        scope: Optional[str] = None,
     ):
         """
         Args:
@@ -84,12 +85,14 @@ def __init__(
             sanitized_by = [sanitized_by]
         self.sanitizers = sanitized_by
 
-        # Validate propagation (EXPLICIT!)
+        # Validate propagation (use global default if not specified)
         if propagates_through is None:
-            propagates_through = []  # NO DEFAULT! Developer must specify!
+            propagates_through = get_default_propagation()
         self.propagates_through = propagates_through
 
-        # Validate scope
+        # Validate scope (use global default if not specified)
+        if scope is None:
+            scope = get_default_scope()
         if scope not in ["local", "global"]:
             raise ValueError(f"scope must be 'local' or 'global', got '{scope}'")
         self.scope = scope
@@ -142,7 +145,7 @@ def flows(
     to_sinks: Union[CallMatcher, List[CallMatcher]],
     sanitized_by: Optional[Union[CallMatcher, List[CallMatcher]]] = None,
     propagates_through: Optional[List[PropagationPrimitive]] = None,
-    scope: str = "global",
+    scope: Optional[str] = None,
 ) -> DataflowMatcher:
     """
     Create a dataflow matcher for taint analysis.
diff --git a/python-dsl/codepathfinder/presets.py b/python-dsl/codepathfinder/presets.py
new file mode 100644
index 00000000..1b5e6b9e
--- /dev/null
+++ b/python-dsl/codepathfinder/presets.py
@@ -0,0 +1,135 @@
+"""
+Propagation presets for common use cases.
+
+Presets bundle propagation primitives for convenience.
+"""
+
+from typing import List
+from .propagation import propagates, PropagationPrimitive
+
+
+class PropagationPresets:
+    """
+    Common propagation bundles.
+
+    Developers can use presets instead of manually listing primitives.
+    """
+
+    @staticmethod
+    def minimal() -> List[PropagationPrimitive]:
+        """
+        Bare minimum propagation (fastest, most false negatives).
+
+        Covers:
+        - Variable assignments
+        - Function arguments
+
+        Coverage: ~40% of real-world flows
+        Performance: Fastest (minimal overhead)
+        False negatives: Higher (misses return values, strings)
+
+        Use when:
+        - Performance is critical
+        - You only care about direct variable flows
+
+        Example:
+            flows(
+                from_sources=calls("request.GET"),
+                to_sinks=calls("eval"),
+                propagates_through=PropagationPresets.minimal(),
+                scope="local"
+            )
+        """
+        return [
+            propagates.assignment(),
+            propagates.function_args(),
+        ]
+
+    @staticmethod
+    def standard() -> List[PropagationPrimitive]:
+        """
+        Recommended default (good balance).
+
+        Covers:
+        - Phase 1: assignment, function_args, function_returns
+        - Phase 2: string_concat, string_format
+
+        Coverage: ~75-80% of real-world flows
+        Performance: Good (moderate overhead)
+        False negatives: Lower
+
+        Use when:
+        - General-purpose taint analysis
+        - OWASP Top 10 detection
+        - Good balance of coverage and performance
+
+        Example:
+            flows(
+                from_sources=calls("request.*"),
+                to_sinks=calls("execute"),
+                propagates_through=PropagationPresets.standard(),
+                scope="global"
+            )
+        """
+        return [
+            propagates.assignment(),
+            propagates.function_args(),
+            propagates.function_returns(),
+            propagates.string_concat(),
+            propagates.string_format(),
+        ]
+
+    @staticmethod
+    def comprehensive() -> List[PropagationPrimitive]:
+        """
+        All MVP primitives (Phase 1 + Phase 2).
+
+        Covers:
+        - All standard() primitives
+
+        Coverage: ~80% of real-world flows
+        Performance: Moderate
+        False negatives: Low
+
+        Use when:
+        - Maximum coverage within MVP scope
+        - Willing to accept moderate performance overhead
+
+        Example:
+            flows(
+                from_sources=calls("request.*"),
+                to_sinks=calls("eval"),
+                propagates_through=PropagationPresets.comprehensive(),
+                scope="global"
+            )
+        """
+        return PropagationPresets.standard()  # For MVP, comprehensive = standard
+
+    @staticmethod
+    def exhaustive() -> List[PropagationPrimitive]:
+        """
+        All primitives (Phase 1-6, POST-MVP).
+
+        NOTE: For MVP, this is same as comprehensive().
+        Post-MVP will include collections, control flow, OOP, advanced.
+
+        Coverage: ~95% of real-world flows (POST-MVP)
+        Performance: Slower (comprehensive analysis)
+        False negatives: Minimal
+
+        Use when:
+        - Maximum security coverage required
+        - Performance is not a concern
+        - Production-critical code
+
+        Example:
+            flows(
+                from_sources=calls("request.*"),
+                to_sinks=calls("execute"),
+                propagates_through=PropagationPresets.exhaustive(),
+                scope="global"
+            )
+        """
+        # MVP: same as comprehensive
+        # POST-MVP: will include Phase 3-6 primitives
+        return PropagationPresets.comprehensive()
diff --git a/python-dsl/codepathfinder/propagation.py b/python-dsl/codepathfinder/propagation.py
index 5370682c..80b42ca2 100644
--- a/python-dsl/codepathfinder/propagation.py
+++ b/python-dsl/codepathfinder/propagation.py
@@ -176,9 +176,52 @@ def get_user_id():
         """
         return PropagationPrimitive(PropagationType.FUNCTION_RETURNS)
 
-    # ===== PHASE 2: STRING OPERATIONS (MVP - Future PR) =====
-    # Will be implemented in PR #4
-    # string_concat(), string_format()
+    # ===== PHASE 2: STRING OPERATIONS (MVP - THIS PR) =====
+
+    @staticmethod
+    def string_concat() -> PropagationPrimitive:
+        """
+        Taint propagates through string concatenation.
+
+        Patterns matched:
+            result = tainted + "suffix"  # Right concat
+            result = "prefix" + tainted  # Left concat
+            result = tainted + safe + more  # Mixed concat
+
+        Critical for SQL/Command injection where queries are built via concat (~10% of flows).
+
+        Examples:
+            user_id = request.GET.get("id")  # source
+            query = "SELECT * FROM users WHERE id = " + user_id  # PROPAGATES via string_concat
+            cursor.execute(query)  # sink
+
+        Returns:
+            PropagationPrimitive for string concatenation
+        """
+        return PropagationPrimitive(PropagationType.STRING_CONCAT)
+
+    @staticmethod
+    def string_format() -> PropagationPrimitive:
+        """
+        Taint propagates through string formatting.
+
+        Patterns matched:
+            f"{tainted}"  # f-string
+            "{}".format(tainted)  # str.format()
+            "%s" % tainted  # % formatting
+            "{name}".format(name=tainted)  # Named placeholders
+
+        Critical for SQL injection where ORM methods use format() (~8% of flows).
+
+        Examples:
+            user_id = request.GET.get("id")  # source
+            query = f"SELECT * FROM users WHERE id = {user_id}"  # PROPAGATES via string_format
+            cursor.execute(query)  # sink
+
+        Returns:
+            PropagationPrimitive for string formatting
+        """
+        return PropagationPrimitive(PropagationType.STRING_FORMAT)
 
     # ===== PHASE 3-6: POST-MVP =====
     # Will be implemented in post-MVP PRs
diff --git a/python-dsl/tests/test_config.py b/python-dsl/tests/test_config.py
new file mode 100644
index 00000000..37334f67
--- /dev/null
+++ b/python-dsl/tests/test_config.py
@@ -0,0 +1,223 @@
+"""Tests for global configuration."""
+
+import pytest
+from codepathfinder import (
+    set_default_propagation,
+    set_default_scope,
+    PropagationPresets,
+    flows,
+    calls,
+    propagates,
+)
+from codepathfinder.config import (
+    get_default_propagation,
+    get_default_scope,
+    PathfinderConfig,
+)
+
+
+@pytest.fixture(autouse=True)
+def restore_global_config():
+    """Restore the global defaults after each test so state never leaks."""
+    saved_propagation = get_default_propagation()
+    saved_scope = get_default_scope()
+    yield
+    set_default_propagation(saved_propagation)
+    set_default_scope(saved_scope)
+
+
+class TestPathfinderConfig:
+    """Test PathfinderConfig singleton."""
+
+    def test_singleton_instance(self):
+        """Test PathfinderConfig is a singleton."""
+        config1 = PathfinderConfig()
+        config2 = PathfinderConfig()
+        assert config1 is config2
+
+    def test_default_propagation_property(self):
+        """Test default_propagation property getter/setter."""
+        config = PathfinderConfig()
+        config.default_propagation = PropagationPresets.minimal()
+        assert len(config.default_propagation) == 2
+
+    def test_default_scope_property(self):
+        """Test default_scope property getter/setter."""
+        config = PathfinderConfig()
+        config.default_scope = "local"
+        assert config.default_scope == "local"
+
+    def test_invalid_scope_raises(self):
+        """Test setting invalid scope raises ValueError."""
+        config = PathfinderConfig()
+        with pytest.raises(ValueError, match="scope must be"):
+            config.default_scope = "invalid"
+
+
+class TestGlobalConfig:
+    """Test global configuration functions."""
+
+    def test_set_default_propagation(self):
+        """Test setting default propagation."""
+        set_default_propagation(PropagationPresets.minimal())
+        result = get_default_propagation()
+        assert len(result) == 2
+
+    def test_set_default_propagation_standard(self):
+        """Test setting standard propagation."""
+        set_default_propagation(PropagationPresets.standard())
+        result = get_default_propagation()
+        assert len(result) == 5
+
+    def test_set_default_propagation_custom(self):
+        """Test setting custom propagation list."""
+        custom = [propagates.assignment(), propagates.string_concat()]
+        set_default_propagation(custom)
+        result = get_default_propagation()
+        assert len(result) == 2
+
+    def test_get_default_propagation_returns_copy(self):
+        """Test mutating the returned list does not change the global default."""
+        set_default_propagation(PropagationPresets.minimal())
+        get_default_propagation().append(propagates.string_concat())
+        assert len(get_default_propagation()) == 2
+
+    def test_set_default_scope_local(self):
+        """Test setting default scope to local."""
+        set_default_scope("local")
+        assert get_default_scope() == "local"
+
+    def test_set_default_scope_global(self):
+        """Test setting default scope to global."""
+        set_default_scope("global")
+        assert get_default_scope() == "global"
+
+    def test_invalid_scope_function_raises(self):
+        """Test set_default_scope with invalid scope raises ValueError."""
+        with pytest.raises(ValueError, match="scope must be"):
+            set_default_scope("invalid")
+
+
+class TestFlowsWithDefaults:
+    """Test flows() uses global defaults when not specified."""
+
+    def test_flows_uses_default_propagation(self):
+        """Test flows() uses global default propagation when not specified."""
+        # Set default
+        set_default_propagation(PropagationPresets.minimal())
+
+        # Create matcher without specifying propagates_through
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            # propagates_through NOT specified
+        )
+
+        assert len(matcher.propagates_through) == 2
+
+    def test_flows_uses_default_scope(self):
+        """Test flows() uses global default scope when not specified."""
+        # Set default
+        set_default_scope("local")
+
+        # Create matcher without specifying scope
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            propagates_through=[],
+            # scope NOT specified
+        )
+
+        assert matcher.scope == "local"
+
+    def test_flows_override_default_propagation(self):
+        """Test flows() can override default propagation."""
+        # Set default to minimal
+        set_default_propagation(PropagationPresets.minimal())
+
+        # Override with standard
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            propagates_through=PropagationPresets.standard(),  # OVERRIDE
+        )
+
+        assert len(matcher.propagates_through) == 5  # Not 2
+
+    def test_flows_override_default_scope(self):
+        """Test flows() can override default scope."""
+        # Set default to local
+        set_default_scope("local")
+
+        # Override with global
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            propagates_through=[],
+            scope="global",  # OVERRIDE
+        )
+
+        assert matcher.scope == "global"  # Not local
+
+    def test_flows_with_empty_default_propagation(self):
+        """Test flows() with empty default propagation."""
+        # Set default to empty list
+        set_default_propagation([])
+
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            # propagates_through NOT specified (uses empty default)
+        )
+
+        assert matcher.propagates_through == []
+
+    def test_flows_explicit_empty_overrides_default(self):
+        """Test flows() with explicit empty list overrides default."""
+        # Set default to standard
+        set_default_propagation(PropagationPresets.standard())
+
+        # Explicitly pass empty list
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            propagates_through=[],  # EXPLICIT empty
+        )
+
+        assert matcher.propagates_through == []  # Not standard
+
+
+class TestDefaultsIntegration:
+    """Integration tests for global defaults."""
+
+    def test_complete_default_workflow(self):
+        """Test complete workflow with defaults."""
+        # Setup defaults
+        set_default_propagation(PropagationPresets.standard())
+        set_default_scope("global")
+
+        # Create matcher using defaults
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("execute"),
+        )
+
+        assert len(matcher.propagates_through) == 5
+        assert matcher.scope == "global"
+
+    def test_partial_override_workflow(self):
+        """Test workflow with partial overrides."""
+        # Setup defaults
+        set_default_propagation(PropagationPresets.standard())
+        set_default_scope("global")
+
+        # Override only scope
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("execute"),
+            # propagates_through uses default (standard)
+            scope="local",  # override
+        )
+
+        assert len(matcher.propagates_through) == 5  # from default
+        assert matcher.scope == "local"  # overridden
diff --git a/python-dsl/tests/test_dataflow.py b/python-dsl/tests/test_dataflow.py
index 0143c7aa..33207811 100644
--- a/python-dsl/tests/test_dataflow.py
+++ b/python-dsl/tests/test_dataflow.py
@@ -17,6 +17,8 @@ def test_create_with_single_source_and_sink(self):
         matcher = DataflowMatcher(
             from_sources=calls("request.GET"),
             to_sinks=calls("execute"),
+            propagates_through=[],  # Explicit empty
+            scope="global",  # Explicit scope
         )
         assert len(matcher.sources) == 1
         assert len(matcher.sinks) == 1
@@ -117,6 +119,8 @@ def test_minimal_ir(self):
         matcher = DataflowMatcher(
             from_sources=calls("source"),
             to_sinks=calls("sink"),
+            propagates_through=[],  # Explicit empty
+            scope="global",  # Explicit scope
         )
         ir = matcher.to_ir()
         assert ir["type"] == IRType.DATAFLOW.value
@@ -257,8 +261,14 @@ def test_flows_default_scope_is_global(self):
         )
         assert matcher.scope == "global"
 
-    def test_flows_default_propagation_is_empty(self):
-        """flows() defaults to empty propagation list (EXPLICIT!)."""
+    def test_flows_default_propagation_uses_global_config(self):
+        """flows() uses global default propagation when not specified."""
+        # This test now reflects PR #4 behavior
+        from codepathfinder import set_default_propagation
+
+        # Set a known default
+        set_default_propagation([])
+
         matcher = flows(
             from_sources=calls("request.GET"),
             to_sinks=calls("execute"),
diff --git a/python-dsl/tests/test_presets.py b/python-dsl/tests/test_presets.py
new file mode 100644
index 00000000..0d799ae7
--- /dev/null
+++ b/python-dsl/tests/test_presets.py
@@ -0,0 +1,116 @@
+"""Tests for PropagationPresets."""
+
+from codepathfinder.presets import PropagationPresets
+from codepathfinder.propagation import PropagationType, PropagationPrimitive
+
+
+class TestPropagationPresets:
+    """Test propagation preset bundles."""
+
+    def test_minimal_returns_list(self):
+        """Test minimal preset returns list of primitives."""
+        prims = PropagationPresets.minimal()
+        assert isinstance(prims, list)
+        assert all(isinstance(p, PropagationPrimitive) for p in prims)
+
+    def test_minimal_count_and_types(self):
+        """Test minimal preset has correct primitives."""
+        prims = PropagationPresets.minimal()
+        assert len(prims) == 2
+        assert prims[0].type == PropagationType.ASSIGNMENT
+        assert prims[1].type == PropagationType.FUNCTION_ARGS
+
+    def test_standard_returns_list(self):
+        """Test standard preset returns list of primitives."""
+        prims = PropagationPresets.standard()
+        assert isinstance(prims, list)
+        assert all(isinstance(p, PropagationPrimitive) for p in prims)
+
+    def test_standard_count_and_types(self):
+        """Test standard preset (recommended) has all Phase 1+2 primitives."""
+        prims = PropagationPresets.standard()
+        assert len(prims) == 5
+        types = [p.type for p in prims]
+        assert PropagationType.ASSIGNMENT in types
+        assert PropagationType.FUNCTION_ARGS in types
+        assert PropagationType.FUNCTION_RETURNS in types
+        assert PropagationType.STRING_CONCAT in types
+        assert PropagationType.STRING_FORMAT in types
+
+    def test_comprehensive_equals_standard_for_mvp(self):
+        """Test comprehensive preset (MVP all) is same as standard for MVP."""
+        comp_prims = PropagationPresets.comprehensive()
+        std_prims = PropagationPresets.standard()
+        assert len(comp_prims) == len(std_prims)
+        assert len(comp_prims) == 5  # For MVP, same as standard
+
+    def test_exhaustive_equals_comprehensive_for_mvp(self):
+        """Test exhaustive preset (future: all phases) is same as comprehensive for MVP."""
+        exh_prims = PropagationPresets.exhaustive()
+        comp_prims = PropagationPresets.comprehensive()
+        assert len(exh_prims) == len(comp_prims)
+        assert len(exh_prims) >= 5  # For MVP, same as comprehensive
+
+    def test_minimal_serializes_to_ir(self):
+        """Test minimal preset primitives can serialize to IR."""
+        prims = PropagationPresets.minimal()
+        ir_list = [p.to_ir() for p in prims]
+        assert len(ir_list) == 2
+        assert ir_list[0]["type"] == "assignment"
+        assert ir_list[1]["type"] == "function_args"
+
+    def test_standard_serializes_to_ir(self):
+        """Test standard preset primitives can serialize to IR."""
+        prims = PropagationPresets.standard()
+        ir_list = [p.to_ir() for p in prims]
+        assert len(ir_list) == 5
+        assert all("type" in ir for ir in ir_list)
+        assert all("metadata" in ir for ir in ir_list)
+
+
+class TestPresetOrdering:
+    """Test that presets maintain consistent ordering."""
+
+    def test_minimal_order(self):
+        """Test minimal preset has consistent order."""
+        prims = PropagationPresets.minimal()
+        types = [p.type.value for p in prims]
+        assert types == ["assignment", "function_args"]
+
+    def test_standard_order(self):
+        """Test standard preset has consistent order."""
+        prims = PropagationPresets.standard()
+        types = [p.type.value for p in prims]
+        assert types == [
+            "assignment",
+            "function_args",
+            "function_returns",
+            "string_concat",
+            "string_format",
+        ]
+
+
+class TestPresetUsage:
+    """Test realistic usage patterns with presets."""
+
+    def test_preset_can_be_used_with_flows(self):
+        """Test presets can be passed to flows() propagates_through parameter."""
+        from codepathfinder import flows, calls
+
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("eval"),
+            propagates_through=PropagationPresets.minimal(),
+        )
+        assert len(matcher.propagates_through) == 2
+
+    def test_preset_standard_with_flows(self):
+        """Test standard preset with flows()."""
+        from codepathfinder import flows, calls
+
+        matcher = flows(
+            from_sources=calls("request.GET"),
+            to_sinks=calls("execute"),
+            propagates_through=PropagationPresets.standard(),
+        )
+        assert len(matcher.propagates_through) == 5
diff --git a/python-dsl/tests/test_propagation_phase2.py b/python-dsl/tests/test_propagation_phase2.py
new file mode 100644
index 00000000..d40c678c
--- /dev/null
+++ b/python-dsl/tests/test_propagation_phase2.py
@@ -0,0 +1,72 @@
+"""Tests for Phase 2 propagation primitives."""
+
+from codepathfinder.propagation import propagates, PropagationType, PropagationPrimitive
+
+
+class TestPhase2Primitives:
+    """Test Phase 2 string operation primitives."""
+
+    def test_string_concat_returns_primitive(self):
+        """Test propagates.string_concat() returns PropagationPrimitive."""
+        prim = propagates.string_concat()
+        assert isinstance(prim, PropagationPrimitive)
+        assert prim.type == PropagationType.STRING_CONCAT
+
+    def test_string_concat_ir(self):
+        """Test propagates.string_concat() serializes correctly."""
+        prim = propagates.string_concat()
+        ir = prim.to_ir()
+        assert ir["type"] == "string_concat"
+        assert ir["metadata"] == {}
+
+    def test_string_format_returns_primitive(self):
+        """Test propagates.string_format() returns PropagationPrimitive."""
+        prim = propagates.string_format()
+        assert isinstance(prim, PropagationPrimitive)
+        assert prim.type == PropagationType.STRING_FORMAT
+
+    def test_string_format_ir(self):
+        """Test propagates.string_format() serializes correctly."""
+        prim = propagates.string_format()
+        ir = prim.to_ir()
+        assert ir["type"] == "string_format"
+        assert ir["metadata"] == {}
+
+    def test_string_concat_repr(self):
+        """Test string_concat __repr__."""
+        prim = propagates.string_concat()
+        assert repr(prim) == "propagates.string_concat()"
+
+    def test_string_format_repr(self):
+        """Test string_format __repr__."""
+        prim = propagates.string_format()
+        assert repr(prim) == "propagates.string_format()"
+
+
+class TestPhase2Integration:
+    """Integration tests for Phase 2 primitives."""
+
+    def test_all_phase2_primitives(self):
+        """Test all Phase 2 primitives can be used together."""
+        prims = [
+            propagates.string_concat(),
+            propagates.string_format(),
+        ]
+        assert len(prims) == 2
+        types = [p.type for p in prims]
+        assert PropagationType.STRING_CONCAT in types
+        assert PropagationType.STRING_FORMAT in types
+
+    def test_phase1_and_phase2_together(self):
+        """Test Phase 1 and Phase 2 primitives work together."""
+        prims = [
+            propagates.assignment(),
+            propagates.function_args(),
+            propagates.function_returns(),
+            propagates.string_concat(),
+            propagates.string_format(),
+        ]
+        assert len(prims) == 5
+        # All primitives should serialize to IR
+        ir_list = [p.to_ir() for p in prims]
+        assert len(ir_list) == 5