diff --git a/py_abac/_policy/__init__.py b/py_abac/_policy/__init__.py new file mode 100644 index 0000000..d48023e --- /dev/null +++ b/py_abac/_policy/__init__.py @@ -0,0 +1,5 @@ +""" + Exposed classes and methods +""" + +# from .policy import Policy diff --git a/py_abac/_policy/conditions/__init__.py b/py_abac/_policy/conditions/__init__.py new file mode 100644 index 0000000..6a26758 --- /dev/null +++ b/py_abac/_policy/conditions/__init__.py @@ -0,0 +1,3 @@ +""" + Conditions +""" diff --git a/py_abac/_policy/conditions/attribute/__init__.py b/py_abac/_policy/conditions/attribute/__init__.py new file mode 100644 index 0000000..a66d3ce --- /dev/null +++ b/py_abac/_policy/conditions/attribute/__init__.py @@ -0,0 +1,12 @@ +""" + Attribute conditions +""" + +from .all_in import AllInAttribute +from .all_not_in import AllNotInAttribute +from .any_in import AnyInAttribute +from .any_not_in import AnyNotInAttribute +from .equals import EqualsAttribute +from .is_in import IsInAttribute +from .is_not_in import IsNotInAttribute +from .not_equals import NotEqualsAttribute diff --git a/py_abac/_policy/conditions/attribute/all_in.py b/py_abac/_policy/conditions/attribute/all_in.py new file mode 100644 index 0000000..1b283a9 --- /dev/null +++ b/py_abac/_policy/conditions/attribute/all_in.py @@ -0,0 +1,46 @@ +""" + All in attribute condition +""" + +import logging + +from .base import AttributeCondition +from ..collection.base import is_collection + +LOG = logging.getLogger(__name__) + + +class AllInAttribute(AttributeCondition): + """ + Condition for all attribute values in that of another + """ + # Condition type specifier + condition: str = "AllInAttribute" + + def is_satisfied(self, ctx) -> bool: + # Extract attribute value from request to match + self._value = ctx.get_attribute_value(self.ace, self.path) + # Check if attribute value to match is a collection + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + # Check if value is a collection + if not is_collection(self._value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(self._value), + self.path, + self.ace + ) + return False + return set(what).issubset(self._value) diff --git a/py_abac/_policy/conditions/attribute/all_not_in.py b/py_abac/_policy/conditions/attribute/all_not_in.py new file mode 100644 index 0000000..bb53d3f --- /dev/null +++ b/py_abac/_policy/conditions/attribute/all_not_in.py @@ -0,0 +1,46 @@ +""" + All not in attribute condition +""" + +import logging + +from .base import AttributeCondition +from ..collection.base import is_collection + +LOG = logging.getLogger(__name__) + + +class AllNotInAttribute(AttributeCondition): + """ + Condition for all attribute values not in that of another + """ + # Condition type specifier + condition: str = "AllNotInAttribute" + + def is_satisfied(self, ctx) -> bool: + # Extract attribute value from request to match + self._value = ctx.get_attribute_value(self.ace, self.path) + # Check if attribute value to match is a collection + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + # Check if value is a collection + if not is_collection(self._value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(self._value), + self.path, + self.ace + ) + return False + return not set(what).issubset(self._value) diff --git a/py_abac/_policy/conditions/attribute/any_in.py b/py_abac/_policy/conditions/attribute/any_in.py new file mode 100644 index 0000000..3e50b2f --- /dev/null +++ b/py_abac/_policy/conditions/attribute/any_in.py @@ -0,0 +1,46 @@ +""" + Any in attribute condition +""" + +import logging + +from .base import AttributeCondition +from ..collection.base import is_collection + +LOG = logging.getLogger(__name__) + + +class AnyInAttribute(AttributeCondition): + """ + Condition for any attribute values in that of another + """ + # Condition type specifier + condition: str = "AnyInAttribute" + + def is_satisfied(self, ctx) -> bool: + # Extract attribute value from request to match + self._value = ctx.get_attribute_value(self.ace, self.path) + # Check if attribute value to match is a collection + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + # Check if value is a collection + if not is_collection(self._value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(self._value), + self.path, + self.ace + ) + return False + return bool(set(what).intersection(self._value)) diff --git a/py_abac/_policy/conditions/attribute/any_not_in.py b/py_abac/_policy/conditions/attribute/any_not_in.py new file mode 100644 index 0000000..27d7b46 --- /dev/null +++ b/py_abac/_policy/conditions/attribute/any_not_in.py @@ -0,0 +1,46 @@ +""" + Any not in attribute condition +""" + +import logging + +from .base import AttributeCondition +from ..collection.base import is_collection + +LOG = logging.getLogger(__name__) + + +class AnyNotInAttribute(AttributeCondition): + """ + Condition for any attribute values not in that of another + """ + # Condition type specifier + condition: str = "AnyNotInAttribute" + + def is_satisfied(self, ctx) -> bool: + # Extract attribute value from request to match + self._value = ctx.get_attribute_value(self.ace, self.path) + # Check if attribute value to match is a collection + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + # Check if value is a collection + if not is_collection(self._value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(self._value), + self.path, + self.ace + ) + return False + return not bool(set(what).intersection(self._value)) diff --git a/py_abac/_policy/conditions/attribute/base.py b/py_abac/_policy/conditions/attribute/base.py new file mode 100644 index 0000000..35a2095 --- /dev/null +++ b/py_abac/_policy/conditions/attribute/base.py @@ -0,0 +1,86 @@ +""" + Attribute Condition Base +""" + +from typing import Any + +from marshmallow import ValidationError +from objectpath import Tree +from pydantic import PrivateAttr + +from ..base import ConditionBase, ABCMeta, abstractmethod + + +def validate_path(path): + """ + Validate given attribute path satisfies ObjectPath notation. + Throws ValidationError for invalid path. + """ + try: + Tree({}).execute(path) + except Exception as err: + raise ValidationError(*err.args) + + +class AccessControlElementField(str): + """ + Access control element field + """ + # Access control elements + aces = ["subject", "resource", "action", "context"] + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + if v not in cls.aces: + raise ValueError("Must be one of: {}".format(cls.aces)) + return v + + +class ObjectPathField(str): + """ + ObjectPath field + """ + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + try: + Tree({}).execute(v) + except Exception: + raise ValueError("Invalid ObjectPath notation") + return v + + +class AttributeCondition(ConditionBase, metaclass=ABCMeta): + """ + Base class for attribute conditions + """ + ace: AccessControlElementField + path: ObjectPathField + _value: Any = PrivateAttr() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._value = None + + def is_satisfied(self, ctx) -> bool: + # Extract attribute value from request to match + self._value = ctx.get_attribute_value(self.ace, self.path) + return self._is_satisfied(ctx.attribute_value) + + @abstractmethod + def _is_satisfied(self, what) -> bool: + """ + Is attribute conditions satisfied + + :param what: attribute value to check + :return: True if satisfied else False + """ + raise NotImplementedError() diff --git a/py_abac/_policy/conditions/attribute/equals.py b/py_abac/_policy/conditions/attribute/equals.py new file mode 100644 index 0000000..c8999ea --- /dev/null +++ b/py_abac/_policy/conditions/attribute/equals.py @@ -0,0 +1,16 @@ +""" + Equals attribute condition +""" + +from .base import AttributeCondition + + +class EqualsAttribute(AttributeCondition): + """ + Condition for attribute value equals that of another + """ + # Condition type specifier + condition: str = "EqualsAttribute" + + def _is_satisfied(self, what) -> bool: + return what == self._value diff --git a/py_abac/_policy/conditions/attribute/is_in.py b/py_abac/_policy/conditions/attribute/is_in.py new file mode 100644 index 0000000..165f135 --- /dev/null +++ b/py_abac/_policy/conditions/attribute/is_in.py @@ -0,0 +1,31 @@ +""" + Is in attribute condition +""" + +import logging + +from .base import AttributeCondition +from ..collection.base import is_collection + +LOG = logging.getLogger(__name__) + + +class IsInAttribute(AttributeCondition): + """ + Condition for attribute value in that of another + """ + # Condition type specifier + condition: str = "IsInAttribute" + + def _is_satisfied(self, what) -> bool: + # Check if value is a collection + if not is_collection(self._value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(self._value), + self.path, + self.ace + ) + return False + return what in self._value diff --git a/py_abac/_policy/conditions/attribute/is_not_in.py b/py_abac/_policy/conditions/attribute/is_not_in.py new file mode 100644 index 0000000..b537fc0 --- /dev/null +++ b/py_abac/_policy/conditions/attribute/is_not_in.py @@ -0,0 +1,31 @@ +""" + Is not in attribute condition +""" + +import logging + +from .base import AttributeCondition +from ..collection.base import is_collection + +LOG = logging.getLogger(__name__) + + +class IsNotInAttribute(AttributeCondition): + """ + Condition for attribute value not in that of another + """ + # Condition type specifier + condition: str = "IsNotInAttribute" + + def _is_satisfied(self, what) -> bool: + # Check if value is a collection + if not is_collection(self._value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(self._value), + self.path, + self.ace + ) + return False + return what not in self._value diff --git a/py_abac/_policy/conditions/attribute/not_equals.py b/py_abac/_policy/conditions/attribute/not_equals.py new file mode 100644 index 0000000..ae60411 --- /dev/null +++ b/py_abac/_policy/conditions/attribute/not_equals.py @@ -0,0 +1,16 @@ +""" + Not equals attribute condition +""" + +from .base import AttributeCondition + + +class NotEqualsAttribute(AttributeCondition): + """ + Condition for attribute value not equals that of another + """ + # Condition type specifier + condition: str = "NotEqualsAttribute" + + def _is_satisfied(self, what) -> bool: + return what != self._value diff --git a/py_abac/_policy/conditions/base.py b/py_abac/_policy/conditions/base.py new file mode 100644 index 0000000..da94ff1 --- /dev/null +++ b/py_abac/_policy/conditions/base.py @@ -0,0 +1,33 @@ +""" + Operation base class +""" + +from abc import ABCMeta, abstractmethod + +from pydantic import BaseModel + +from py_abac.context import EvaluationContext + + +class ConditionBase(BaseModel, metaclass=ABCMeta): + """ + Base class for conditions + """ + # Condition type specifier + condition: str + + class Config: + """ + Pydantic configuration + """ + extra = 'forbid' + + @abstractmethod + def is_satisfied(self, ctx: EvaluationContext) -> bool: + """ + Is conditions satisfied? + + :param ctx: evaluation context + :return: True if satisfied else False + """ + raise NotImplementedError() diff --git a/py_abac/_policy/conditions/collection/__init__.py b/py_abac/_policy/conditions/collection/__init__.py new file mode 100644 index 0000000..054ae68 --- /dev/null +++ b/py_abac/_policy/conditions/collection/__init__.py @@ -0,0 +1,12 @@ +""" + Collection conditions +""" + +from .all_in import AllIn +from .all_not_in import AllNotIn +from .any_in import AnyIn +from .any_not_in import AnyNotIn +from .is_empty import IsEmpty +from .is_in import IsIn +from .is_not_empty import IsNotEmpty +from .is_not_in import IsNotIn diff --git a/py_abac/_policy/conditions/collection/all_in.py b/py_abac/_policy/conditions/collection/all_in.py new file mode 100644 index 0000000..250ccab --- /dev/null +++ b/py_abac/_policy/conditions/collection/all_in.py @@ -0,0 +1,16 @@ +""" + All of the values in collection conditions +""" + +from .base import CollectionCondition + + +class AllIn(CollectionCondition): + """ + Condition for all values of `what` in `values` + """ + # Condition type specifier + condition: str = "AllIn" + + def _is_satisfied(self, what) -> bool: + return set(what).issubset(self.values) diff --git a/py_abac/_policy/conditions/collection/all_not_in.py b/py_abac/_policy/conditions/collection/all_not_in.py new file mode 100644 index 0000000..9b75ee1 --- /dev/null +++ b/py_abac/_policy/conditions/collection/all_not_in.py @@ -0,0 +1,16 @@ +""" + All of the values not in collection conditions +""" + +from .base import CollectionCondition + + +class AllNotIn(CollectionCondition): + """ + Condition for all values of `what` not in `values` + """ + # Condition type specifier + condition: str = "AllNotIn" + + def _is_satisfied(self, what) -> bool: + return not set(what).issubset(self.values) diff --git a/py_abac/_policy/conditions/collection/any_in.py b/py_abac/_policy/conditions/collection/any_in.py new file mode 100644 index 0000000..b674af9 --- /dev/null +++ b/py_abac/_policy/conditions/collection/any_in.py @@ -0,0 +1,16 @@ +""" + Any of the values in collection conditions +""" + +from .base import CollectionCondition + + +class AnyIn(CollectionCondition): + """ + Condition for any value of `what` in `values` + """ + # Condition type specifier + condition: str = "AnyIn" + + def _is_satisfied(self, what) -> bool: + return bool(set(what).intersection(self.values)) diff --git a/py_abac/_policy/conditions/collection/any_not_in.py b/py_abac/_policy/conditions/collection/any_not_in.py new file mode 100644 index 0000000..ed36a43 --- /dev/null +++ b/py_abac/_policy/conditions/collection/any_not_in.py @@ -0,0 +1,16 @@ +""" + Any of the values not in collection conditions +""" + +from .base import CollectionCondition + + +class AnyNotIn(CollectionCondition): + """ + Condition for any values of `what` not in `values` + """ + # Condition type specifier + condition: str = "AnyNotIn" + + def _is_satisfied(self, what) -> bool: + return not bool(set(what).intersection(self.values)) diff --git a/py_abac/_policy/conditions/collection/base.py b/py_abac/_policy/conditions/collection/base.py new file mode 100644 index 0000000..f592cb3 --- /dev/null +++ b/py_abac/_policy/conditions/collection/base.py @@ -0,0 +1,46 @@ +""" + Collection conditions base class +""" + +import logging +from typing import Union, List, Set, Tuple + +from ..base import ConditionBase, ABCMeta, abstractmethod + +LOG = logging.getLogger(__name__) + + +def is_collection(value) -> bool: + """ + Check if value is a collection + """ + return any([isinstance(value, list), isinstance(value, set), isinstance(value, tuple)]) + + +class CollectionCondition(ConditionBase, metaclass=ABCMeta): + """ + Base class for collection conditions + """ + values: Union[List, Set, Tuple] + + def is_satisfied(self, ctx) -> bool: + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + @abstractmethod + def _is_satisfied(self, what) -> bool: + """ + Is collection conditions satisfied + + :param what: collection to check + :return: True if satisfied else False + """ + raise NotImplementedError() diff --git a/py_abac/_policy/conditions/collection/is_empty.py b/py_abac/_policy/conditions/collection/is_empty.py new file mode 100644 index 0000000..740ea37 --- /dev/null +++ b/py_abac/_policy/conditions/collection/is_empty.py @@ -0,0 +1,35 @@ +""" + Collection is empty conditions +""" + +from .base import ConditionBase, is_collection, LOG + + +class IsEmpty(ConditionBase): + """ + Condition for `what` being an empty collection + """ + # Condition type specifier + condition: str = "IsEmpty" + + def is_satisfied(self, ctx) -> bool: + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + @staticmethod + def _is_satisfied(what) -> bool: + """ + Is collection conditions satisfied + + :param what: collection to check + :return: True if satisfied else False + """ + return len(what) == 0 diff --git a/py_abac/_policy/conditions/collection/is_in.py b/py_abac/_policy/conditions/collection/is_in.py new file mode 100644 index 0000000..a037fee --- /dev/null +++ b/py_abac/_policy/conditions/collection/is_in.py @@ -0,0 +1,19 @@ +""" + Value is in collection conditions +""" + +from .base import CollectionCondition + + +class IsIn(CollectionCondition): + """ + Condition for `what` is a member of `values` + """ + # Condition type specifier + condition: str = "IsIn" + + def is_satisfied(self, ctx): + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + return what in self.values diff --git a/py_abac/_policy/conditions/collection/is_not_empty.py b/py_abac/_policy/conditions/collection/is_not_empty.py new file mode 100644 index 0000000..a3781a9 --- /dev/null +++ b/py_abac/_policy/conditions/collection/is_not_empty.py @@ -0,0 +1,35 @@ +""" + Collection is not empty conditions +""" + +from .base import ConditionBase, is_collection, LOG + + +class IsNotEmpty(ConditionBase): + """ + Condition for `what` not being an empty collection + """ + # Condition type specifier + condition: str = "IsNotEmpty" + + def is_satisfied(self, ctx) -> bool: + if not is_collection(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + @staticmethod + def _is_satisfied(what) -> bool: + """ + Is collection conditions satisfied + + :param what: collection to check + :return: True if satisfied else False + """ + return len(what) != 0 diff --git a/py_abac/_policy/conditions/collection/is_not_in.py b/py_abac/_policy/conditions/collection/is_not_in.py new file mode 100644 index 0000000..239ac67 --- /dev/null +++ b/py_abac/_policy/conditions/collection/is_not_in.py @@ -0,0 +1,19 @@ +""" + Value is not in collection conditions +""" + +from .base import CollectionCondition + + +class IsNotIn(CollectionCondition): + """ + Condition for `what` is not a member of `values` + """ + # Condition type specifier + condition: str = "IsNotIn" + + def is_satisfied(self, ctx) -> bool: + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + return what not in self.values diff --git a/py_abac/_policy/conditions/field.py b/py_abac/_policy/conditions/field.py new file mode 100644 index 0000000..2f4612a --- /dev/null +++ b/py_abac/_policy/conditions/field.py @@ -0,0 +1,111 @@ +""" + Condition one-of schema +""" + +from .attribute.all_in import AllInAttribute +from .attribute.all_not_in import AllNotInAttribute +from .attribute.any_in import AnyInAttribute +from .attribute.any_not_in import AnyNotInAttribute +# -- Attribute Conditions --- +from .attribute.equals import EqualsAttribute +from .attribute.is_in import IsInAttribute +from .attribute.is_not_in import IsNotInAttribute +from .attribute.not_equals import NotEqualsAttribute +# --- Collection Conditions --- +from .collection.all_in import AllIn +from .collection.all_not_in import AllNotIn +from .collection.any_in import AnyIn +from .collection.any_not_in import AnyNotIn +from .collection.is_empty import IsEmpty +from .collection.is_in import IsIn +from .collection.is_not_empty import IsNotEmpty +from .collection.is_not_in import IsNotIn +# --- Logic Conditions --- +from .logic.all_of import AllOf +from .logic.any_of import AnyOf +from .logic.not_ import Not +# --- Numeric Conditions --- +from .numeric.eq import Eq +from .numeric.gt import Gt +from .numeric.gte import Gte +from .numeric.lt import Lt +from .numeric.lte import Lte +from .numeric.neq import Neq +# --- Object Conditions --- +from .object.equals_object import EqualsObject +# --- Other Conditions --- +from .others.any import Any +from .others.cidr import CIDR +from .others.exists import Exists +from .others.not_exists import NotExists +# --- String Conditions --- +from .string.contains import Contains +from .string.ends_with import EndsWith +from .string.equals import Equals +from .string.not_contains import NotContains +from .string.not_equals import NotEquals +from .string.regex_match import RegexMatch +from .string.starts_with import StartsWith + + +class ConditionField: + """ + Polymorphic JSON field for conditions + """ + type_field = "condition" + type_schemas = { + # --- Numeric Conditions --- + Eq.__name__: Eq, + Gt.__name__: Gt, + Lt.__name__: Lt, + Gte.__name__: Gte, + Lte.__name__: Lte, + Neq.__name__: Neq, + # --- String Conditions --- + Contains.__name__: Contains, + NotContains.__name__: NotContains, + Equals.__name__: Equals, + NotEquals.__name__: NotEquals, + StartsWith.__name__: StartsWith, + EndsWith.__name__: EndsWith, + RegexMatch.__name__: RegexMatch, + # --- Collection Conditions --- + IsIn.__name__: IsIn, + IsNotIn.__name__: IsNotIn, + AllIn.__name__: AllIn, + AllNotIn.__name__: AllNotIn, + AnyIn.__name__: AnyIn, + AnyNotIn.__name__: AnyNotIn, + IsEmpty.__name__: IsEmpty, + IsNotEmpty.__name__: IsNotEmpty, + # --- Logic Conditions --- + AllOf.__name__: AllOf, + AnyOf.__name__: AnyOf, + Not.__name__: Not, + # --- Other Conditions --- + CIDR.__name__: CIDR, + Exists.__name__: Exists, + NotExists.__name__: NotExists, + Any.__name__: Any, + # --- Attribute Conditions --- + EqualsAttribute.__name__: EqualsAttribute, + NotEqualsAttribute.__name__: NotEqualsAttribute, + AnyInAttribute.__name__: AnyInAttribute, + AnyNotInAttribute.__name__: AnyNotInAttribute, + AllInAttribute.__name__: AllInAttribute, + AllNotInAttribute.__name__: AllNotInAttribute, + IsInAttribute.__name__: IsInAttribute, + IsNotInAttribute.__name__: IsNotInAttribute, + # --- Object Condition --- + EqualsObject.__name__: EqualsObject, + } + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v, values): + item_type = values[cls.type_field] + condition = cls.type_schemas[item_type] + return condition(**v) diff --git a/py_abac/_policy/conditions/logic/__init__.py b/py_abac/_policy/conditions/logic/__init__.py new file mode 100644 index 0000000..3ff440e --- /dev/null +++ b/py_abac/_policy/conditions/logic/__init__.py @@ -0,0 +1,7 @@ +""" + Logic conditions +""" + +from .all_of import AllOf +from .any_of import AnyOf +from .not_ import Not diff --git a/py_abac/_policy/conditions/logic/all_of.py b/py_abac/_policy/conditions/logic/all_of.py new file mode 100644 index 0000000..c18e0e8 --- /dev/null +++ b/py_abac/_policy/conditions/logic/all_of.py @@ -0,0 +1,16 @@ +""" + Logical AND conditions +""" + +from .base import LogicCondition + + +class AllOf(LogicCondition): + """ + Condition for all of the sub-rules are satisfied + """ + # Condition type specifier + condition: str = "AllOf" + + def is_satisfied(self, ctx) -> bool: + return all(value.is_satisfied(ctx) for value in self.values) diff --git a/py_abac/_policy/conditions/logic/any_of.py b/py_abac/_policy/conditions/logic/any_of.py new file mode 100644 index 0000000..445905a --- /dev/null +++ b/py_abac/_policy/conditions/logic/any_of.py @@ -0,0 +1,16 @@ +""" + Logical OR conditions +""" + +from .base import LogicCondition + + +class AnyOf(LogicCondition): + """ + Condition for any of sub-rules are satisfied + """ + # Condition type specifier + condition: str = "AnyOf" + + def is_satisfied(self, ctx) -> bool: + return any(value.is_satisfied(ctx) for value in self.values) diff --git a/py_abac/_policy/conditions/logic/base.py b/py_abac/_policy/conditions/logic/base.py new file mode 100644 index 0000000..03c89f0 --- /dev/null +++ b/py_abac/_policy/conditions/logic/base.py @@ -0,0 +1,17 @@ +""" + Logic conditions base class +""" + +from typing import List + +from ..base import ConditionBase, ABCMeta + + +class LogicCondition(ConditionBase, metaclass=ABCMeta): + """ + Base class for logical conditions + """ + values: List + + def is_satisfied(self, ctx) -> bool: + raise NotImplementedError() diff --git a/py_abac/_policy/conditions/logic/not_.py b/py_abac/_policy/conditions/logic/not_.py new file mode 100644 index 0000000..27531a0 --- /dev/null +++ b/py_abac/_policy/conditions/logic/not_.py @@ -0,0 +1,19 @@ +""" + Logical NOT conditions +""" + +from typing import Any + +from .base import ConditionBase + + +class Not(ConditionBase): + """ + Condition for logical NOT condition + """ + # Condition type specifier + condition: str = "Not" + value: Any + + def is_satisfied(self, ctx) -> bool: + return not self.value.is_satisfied(ctx) diff --git a/py_abac/_policy/conditions/numeric/__init__.py b/py_abac/_policy/conditions/numeric/__init__.py new file mode 100644 index 0000000..50bac70 --- /dev/null +++ b/py_abac/_policy/conditions/numeric/__init__.py @@ -0,0 +1,10 @@ +""" + Numeric conditions +""" + +from .eq import Eq +from .gt import Gt +from .gte import Gte +from .lt import Lt +from .lte import Lte +from .neq import Neq diff --git a/py_abac/_policy/conditions/numeric/base.py b/py_abac/_policy/conditions/numeric/base.py new file mode 100644 index 0000000..0fac59e --- /dev/null +++ b/py_abac/_policy/conditions/numeric/base.py @@ -0,0 +1,46 @@ +""" + Numeric conditions base class +""" + +import logging +from typing import Union + +from ..base import ConditionBase, ABCMeta, abstractmethod + +LOG = logging.getLogger(__name__) + + +def is_number(value) -> bool: + """ + Check if value is a number + """ + return isinstance(value, (float, int)) + + +class NumericCondition(ConditionBase, metaclass=ABCMeta): + """ + Base class for numeric conditions + """ + value: Union[int, float] + + def is_satisfied(self, ctx) -> bool: + if not is_number(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + @abstractmethod + def _is_satisfied(self, what) -> bool: + """ + Is numeric conditions satisfied + + :param what: numeric value to check + :return: True if satisfied else False + """ + raise NotImplementedError() diff --git a/py_abac/_policy/conditions/numeric/eq.py b/py_abac/_policy/conditions/numeric/eq.py new file mode 100644 index 0000000..1f3bb3a --- /dev/null +++ b/py_abac/_policy/conditions/numeric/eq.py @@ -0,0 +1,16 @@ +""" + Numeric equal conditions +""" + +from .base import NumericCondition + + +class Eq(NumericCondition): + """ + Condition for number `what` equals `value` + """ + # Condition type specifier + condition: str = "Eq" + + def _is_satisfied(self, what) -> bool: + return what == self.value diff --git a/py_abac/_policy/conditions/numeric/gt.py b/py_abac/_policy/conditions/numeric/gt.py new file mode 100644 index 0000000..dace9bf --- /dev/null +++ b/py_abac/_policy/conditions/numeric/gt.py @@ -0,0 +1,16 @@ +""" + Numeric greater than conditions +""" + +from .base import NumericCondition + + +class Gt(NumericCondition): + """ + Condition for number `what` greater than `value` + """ + # Condition type specifier + condition: str = "Gt" + + def _is_satisfied(self, what) -> bool: + return what > self.value diff --git a/py_abac/_policy/conditions/numeric/gte.py b/py_abac/_policy/conditions/numeric/gte.py new file mode 100644 index 0000000..86c7889 --- /dev/null +++ b/py_abac/_policy/conditions/numeric/gte.py @@ -0,0 +1,16 @@ +""" + Numeric greater than equal conditions +""" + +from .base import NumericCondition + + +class Gte(NumericCondition): + """ + Condition for number `what` greater than equals `value` + """ + # Condition type specifier + condition: str = "Gte" + + def _is_satisfied(self, what) -> bool: + return what >= self.value diff --git a/py_abac/_policy/conditions/numeric/lt.py b/py_abac/_policy/conditions/numeric/lt.py new file mode 100644 index 0000000..deeb47b --- /dev/null +++ b/py_abac/_policy/conditions/numeric/lt.py @@ -0,0 +1,16 @@ +""" + Numeric less than conditions +""" + +from .base import NumericCondition + + +class Lt(NumericCondition): + """ + Condition for number `what` less than `value` + """ + # Condition type specifier + condition: str = "Lt" + + def _is_satisfied(self, what) -> bool: + return what < self.value diff --git a/py_abac/_policy/conditions/numeric/lte.py b/py_abac/_policy/conditions/numeric/lte.py new file mode 100644 index 0000000..7e50420 --- /dev/null +++ b/py_abac/_policy/conditions/numeric/lte.py @@ -0,0 +1,16 @@ +""" + Numeric less than equal conditions +""" + +from .base import NumericCondition + + +class Lte(NumericCondition): + """ + Condition for number `what` less than equals `value` + """ + # Condition type specifier + condition: str = "Lte" + + def _is_satisfied(self, what) -> bool: + return what <= self.value diff --git a/py_abac/_policy/conditions/numeric/neq.py b/py_abac/_policy/conditions/numeric/neq.py new file mode 100644 index 0000000..8cfba78 --- /dev/null +++ b/py_abac/_policy/conditions/numeric/neq.py @@ -0,0 +1,16 @@ +""" + Numeric not equal conditions +""" + +from .base import NumericCondition + + +class Neq(NumericCondition): + """ + Condition for number `what` not equals `value` + """ + # Condition type specifier + condition: str = "Neq" + + def _is_satisfied(self, what) -> bool: + return what != self.value diff --git a/py_abac/_policy/conditions/object/__init__.py b/py_abac/_policy/conditions/object/__init__.py new file mode 100644 index 0000000..06e8cd5 --- /dev/null +++ b/py_abac/_policy/conditions/object/__init__.py @@ -0,0 +1,5 @@ +""" + Object conditions +""" + +from .equals_object import EqualsObject diff --git a/py_abac/_policy/conditions/object/equals_object.py b/py_abac/_policy/conditions/object/equals_object.py new file mode 100644 index 0000000..532f486 --- /dev/null +++ b/py_abac/_policy/conditions/object/equals_object.py @@ -0,0 +1,19 @@ +""" + Equals object conditions +""" + +from typing import Dict + +from ..base import ConditionBase + + +class EqualsObject(ConditionBase): + """ + Equals object conditions + """ + # Condition type specifier + condition: str = "EqualsObject" + value: Dict + + def is_satisfied(self, ctx) -> bool: + return self.value == ctx.attribute_value diff --git a/py_abac/_policy/conditions/others/__init__.py b/py_abac/_policy/conditions/others/__init__.py new file mode 100644 index 0000000..5a06282 --- /dev/null +++ b/py_abac/_policy/conditions/others/__init__.py @@ -0,0 +1,8 @@ +""" + Other conditions +""" + +from .any import Any +from .cidr import CIDR +from .exists import Exists +from .not_exists import NotExists diff --git a/py_abac/_policy/conditions/others/any.py b/py_abac/_policy/conditions/others/any.py new file mode 100644 index 0000000..36eb494 --- /dev/null +++ b/py_abac/_policy/conditions/others/any.py @@ -0,0 +1,17 @@ +""" + Attribute any value conditions +""" +from marshmallow import Schema, post_load + +from ..base import ConditionBase + + +class Any(ConditionBase): + """ + Condition for attribute having any value + """ + # Condition type specifier + condition: str = "Any" + + def is_satisfied(self, ctx) -> bool: + return True diff --git a/py_abac/_policy/conditions/others/cidr.py b/py_abac/_policy/conditions/others/cidr.py new file mode 100644 index 0000000..571f2d1 --- /dev/null +++ b/py_abac/_policy/conditions/others/cidr.py @@ -0,0 +1,47 @@ +""" + Conditions relevant to networking context +""" + +import ipaddress +import logging + +from pydantic import StrictStr + +from ..base import ConditionBase + +LOG = logging.getLogger(__name__) + + +class CIDR(ConditionBase): + """ + Condition for IP address `what` in CIDR `value`. + """ + # Condition type specifier + condition: str = "CIDR" + value: StrictStr + + def is_satisfied(self, ctx) -> bool: + if not isinstance(ctx.attribute_value, str): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + """ + Is CIDR conditions satisfied + + :param what: IP address to check + :return: True if satisfied else False + """ + try: + ip_addr = ipaddress.ip_address(what) + net = ipaddress.ip_network(self.value) + except ValueError: + return False + return ip_addr in net diff --git a/py_abac/_policy/conditions/others/exists.py b/py_abac/_policy/conditions/others/exists.py new file mode 100644 index 0000000..35a057f --- /dev/null +++ b/py_abac/_policy/conditions/others/exists.py @@ -0,0 +1,16 @@ +""" + Attribute exists conditions +""" + +from ..base import ConditionBase + + +class Exists(ConditionBase): + """ + Condition for attribute value exists + """ + # Condition type specifier + condition: str = "Exists" + + def is_satisfied(self, ctx) -> bool: + return ctx.attribute_value is not None diff --git a/py_abac/_policy/conditions/others/not_exists.py b/py_abac/_policy/conditions/others/not_exists.py new file mode 100644 index 0000000..08fb903 --- /dev/null +++ b/py_abac/_policy/conditions/others/not_exists.py @@ -0,0 +1,16 @@ +""" + Attribute does not exists conditions +""" + +from ..base import ConditionBase + + +class NotExists(ConditionBase): + """ + Condition for attribute value not exists + """ + # Condition type specifier + condition: str = "NotExists" + + def is_satisfied(self, ctx) -> bool: + return ctx.attribute_value is None diff --git a/py_abac/_policy/conditions/string/__init__.py b/py_abac/_policy/conditions/string/__init__.py new file mode 100644 index 0000000..8f63c39 --- /dev/null +++ b/py_abac/_policy/conditions/string/__init__.py @@ -0,0 +1,11 @@ +""" + String conditions +""" + +from .contains import Contains +from .ends_with import EndsWith +from .equals import Equals +from .not_contains import NotContains +from .not_equals import NotEquals +from .regex_match import RegexMatch +from .starts_with import StartsWith diff --git a/py_abac/_policy/conditions/string/base.py b/py_abac/_policy/conditions/string/base.py new file mode 100644 index 0000000..0614903 --- /dev/null +++ b/py_abac/_policy/conditions/string/base.py @@ -0,0 +1,48 @@ +""" + String conditions base class +""" + +import logging + +from pydantic import StrictStr + +from ..base import ConditionBase, ABCMeta, abstractmethod + +LOG = logging.getLogger(__name__) + + +def is_string(value) -> bool: + """ + Check if value is string + """ + return isinstance(value, str) + + +class StringCondition(ConditionBase, metaclass=ABCMeta): + """ + Base class for string conditions + """ + value: StrictStr + case_insensitive: bool = False + + def is_satisfied(self, ctx) -> bool: + if not is_string(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + @abstractmethod + def _is_satisfied(self, what) -> bool: + """ + Is string conditions satisfied + + :param what: string value to check + :return: True if satisfied else False + """ + raise NotImplementedError() diff --git a/py_abac/_policy/conditions/string/contains.py b/py_abac/_policy/conditions/string/contains.py new file mode 100644 index 0000000..f48ff00 --- /dev/null +++ b/py_abac/_policy/conditions/string/contains.py @@ -0,0 +1,18 @@ +""" + String contains conditions +""" + +from .base import StringCondition + + +class Contains(StringCondition): + """ + Condition for string `what` contains `value` + """ + # Condition type specifier + condition: str = "Contains" + + def _is_satisfied(self, what) -> bool: + if self.case_insensitive: + return self.value.lower() in what.lower() + return self.value in what diff --git a/py_abac/_policy/conditions/string/ends_with.py b/py_abac/_policy/conditions/string/ends_with.py new file mode 100644 index 0000000..2abed2c --- /dev/null +++ b/py_abac/_policy/conditions/string/ends_with.py @@ -0,0 +1,18 @@ +""" + String ends with conditions +""" + +from .base import StringCondition + + +class EndsWith(StringCondition): + """ + Condition for string `what` ends with `value` + """ + # Condition type specifier + condition: str = "EndsWith" + + def _is_satisfied(self, what) -> bool: + if self.case_insensitive: + return what.lower().endswith(self.value.lower()) + return what.endswith(self.value) diff --git a/py_abac/_policy/conditions/string/equals.py b/py_abac/_policy/conditions/string/equals.py new file mode 100644 index 0000000..bb0a1a9 --- /dev/null +++ b/py_abac/_policy/conditions/string/equals.py @@ -0,0 +1,20 @@ +""" + String equals conditions +""" + +from marshmallow import post_load + +from .base import StringCondition + + +class Equals(StringCondition): + """ + Condition for string `what` equals `value` + """ + # Condition type specifier + condition: str = "Equals" + + def _is_satisfied(self, what) -> bool: + if self.case_insensitive: + return what.lower() == self.value.lower() + return what == self.value diff --git a/py_abac/_policy/conditions/string/not_contains.py b/py_abac/_policy/conditions/string/not_contains.py new file mode 100644 index 0000000..e0625ef --- /dev/null +++ b/py_abac/_policy/conditions/string/not_contains.py @@ -0,0 +1,20 @@ +""" + String not contains conditions +""" + +from marshmallow import post_load + +from .base import StringCondition + + +class NotContains(StringCondition): + """ + Condition for string `what` not contains `value` + """ + # Condition type specifier + condition: str = "NotContains" + + def _is_satisfied(self, what) -> bool: + if self.case_insensitive: + return self.value.lower() not in what.lower() + return self.value not in what diff --git a/py_abac/_policy/conditions/string/not_equals.py b/py_abac/_policy/conditions/string/not_equals.py new file mode 100644 index 0000000..c7b9716 --- /dev/null +++ b/py_abac/_policy/conditions/string/not_equals.py @@ -0,0 +1,20 @@ +""" + String not equals conditions +""" + +from marshmallow import post_load + +from .base import StringCondition + + +class NotEquals(StringCondition): + """ + Condition for string `what` not equals `value` + """ + # Condition type specifier + condition: str = "NotEquals" + + def _is_satisfied(self, what) -> bool: + if self.case_insensitive: + return what.lower() != self.value.lower() + return what != self.value diff --git a/py_abac/_policy/conditions/string/regex_match.py b/py_abac/_policy/conditions/string/regex_match.py new file mode 100644 index 0000000..a214a2e --- /dev/null +++ b/py_abac/_policy/conditions/string/regex_match.py @@ -0,0 +1,49 @@ +""" + String regex match conditions +""" + +import re + +from .base import ConditionBase, LOG, is_string +from pydantic import validator, ValidationError + + +class RegexMatch(ConditionBase): + """ + Condition for string `what` matches regex `value` + """ + # Condition type specifier + condition: str = "RegexMatch" + value: str + + @validator('value') + def is_regex(cls, v): + validate_regex(v) + return v + + def is_satisfied(self, ctx) -> bool: + if not is_string(ctx.attribute_value): + LOG.debug( + "Invalid type '%s' for attribute value at path '%s' for element '%s'." + " Condition not satisfied.", + type(ctx.attribute_value), + ctx.attribute_path, + ctx.ace + ) + return False + return self._is_satisfied(ctx.attribute_value) + + def _is_satisfied(self, what) -> bool: + return re.search(self.value, what) is not None + + +def validate_regex(value): + """ + Validate given regex. Throws ValidationError exception + for invalid regex expressions. + """ + # noinspection PyBroadException + try: + re.compile(value) + except Exception: + raise ValidationError("Invalid regex expression '{}'.".format(value)) diff --git a/py_abac/_policy/conditions/string/starts_with.py b/py_abac/_policy/conditions/string/starts_with.py new file mode 100644 index 0000000..b73138d --- /dev/null +++ b/py_abac/_policy/conditions/string/starts_with.py @@ -0,0 +1,20 @@ +""" + String starts with conditions +""" + +from marshmallow import post_load + +from .base import StringCondition + + +class StartsWith(StringCondition): + """ + Condition for string `what` starts with `value` + """ + # Condition type specifier + condition: str = "StartsWith" + + def _is_satisfied(self, what) -> bool: + if self.case_insensitive: + return what.lower().startswith(self.value.lower()) + return what.startswith(self.value) diff --git a/py_abac/_policy/policy.py b/py_abac/_policy/policy.py new file mode 100644 index 0000000..3414a5f --- /dev/null +++ b/py_abac/_policy/policy.py @@ -0,0 +1,64 @@ +""" + Policy class +""" + +import enum + +from .rules import Rules +from .targets import Targets +from ..context import EvaluationContext +from ..exceptions import PolicyCreateError + +from pydantic import BaseModel, conint, ValidationError + + +class Access(enum.Enum): + """ + Access decisions + """ + DENY_ACCESS = "deny" + ALLOW_ACCESS = "allow" + + +class Policy(BaseModel): + """ + Policy class containing rules and targets + """ + uid: str + description: str = "" + rules: Rules + targets: Targets + effect: Access + priority: conint(ge=0) = 0 + + @classmethod + def from_json(cls, data: dict) -> "Policy": + """ + Create Policy object from JSON + """ + try: + return cls.parse_obj(data) + except ValidationError as err: + raise PolicyCreateError(err.json()) + + def to_json(self): + """ + Convert policy object to JSON + """ + return self.dict() + + def fits(self, ctx: EvaluationContext) -> bool: + """ + Check if the request fits policy + + :param ctx: evaluation context + :return: True if fits else False + """ + return self.rules.is_satisfied(ctx) and self.targets.match(ctx) + + @property + def is_allowed(self) -> bool: + """ + Check if access is allowed + """ + return self.effect == Access.ALLOW_ACCESS diff --git a/py_abac/_policy/rules.py b/py_abac/_policy/rules.py new file mode 100644 index 0000000..3e6875f --- /dev/null +++ b/py_abac/_policy/rules.py @@ -0,0 +1,97 @@ +""" + Policy rules class +""" + +from typing import Union, List, Dict + +from pydantic import BaseModel + +from .conditions.field import Condition +from ..context import EvaluationContext + + +class ObjectPathField: + """ + ObjectPath field + """ + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + if not isinstance(v, str): + raise TypeError("str type in ObjectPath notion expected") + return v + + +class Rules(BaseModel): + """ + Policy rules + """ + subject: Union[ + Dict[ObjectPathField, Condition], + List[Dict[ObjectPathField, Condition]] + ] + resource: Union[ + Dict[ObjectPathField, Condition], + List[Dict[ObjectPathField, Condition]] + ] + action: Union[ + Dict[ObjectPathField, Condition], + List[Dict[ObjectPathField, Condition]] + ] + context: Union[ + Dict[ObjectPathField, Condition], + List[Dict[ObjectPathField, Condition]] + ] + + def is_satisfied(self, ctx: EvaluationContext): + """ + Check if request satisfies all conditions + + :param ctx: policy evaluation context + :return: True if satisfied else False + """ + return self._is_satisfied("subject", self.subject, ctx) and \ + self._is_satisfied("resource", self.resource, ctx) and \ + self._is_satisfied("action", self.action, ctx) and \ + self._is_satisfied("context", self.context, ctx) + + def _is_satisfied(self, ace_name: str, ace_conditions, ctx: EvaluationContext): + """ + Check if the access control element satisfies request + + :param ace_name: access control element name + :param ace_conditions: access control element conditions + :param ctx: policy evaluation context + :return: True if satisfied else False + """ + if isinstance(ace_conditions, list): + return self._implicit_or(ace_name, ace_conditions, ctx) + if isinstance(ace_conditions, dict): + return self._implicit_and(ace_name, ace_conditions, ctx) + + # If ace is not in correct format, return False. This condition is just for best + # practice and will never happen + return False # pragma: no cover + + def _implicit_or(self, ace_name: str, ace_conditions: list, ctx: EvaluationContext): + for _ace_conditions in ace_conditions: + # If even one of the conditions is satisfied, return True + if self._implicit_and(ace_name, _ace_conditions, ctx): + return True + # If no conditions are satisfied, return False + return False + + @staticmethod + def _implicit_and(ace_name: str, ace_conditions: dict, ctx: EvaluationContext): + for attribute_path, condition in ace_conditions.items(): + ctx.ace = ace_name + ctx.attribute_path = attribute_path + # If even one of the conditions is not satisfied, return False + if not condition.is_satisfied(ctx): + return False + # If all conditions are satisfied, return True + return True diff --git a/py_abac/_policy/targets.py b/py_abac/_policy/targets.py new file mode 100644 index 0000000..ad0d145 --- /dev/null +++ b/py_abac/_policy/targets.py @@ -0,0 +1,42 @@ +""" + Policy targets class +""" + +import fnmatch +from typing import Union, List + +from pydantic import BaseModel, constr + +from ..context import EvaluationContext + + +class Targets(BaseModel): + """ + Policy targets + """ + subject_id: Union[constr(min_length=1), List[constr(min_length=1)]] = "*" + resource_id: Union[constr(min_length=1), List[constr(min_length=1)]] = "*" + action_id: Union[constr(min_length=1), List[constr(min_length=1)]] = "*" + + def match(self, ctx: EvaluationContext): + """ + Check if request matches policy targets + + :param ctx: policy evaluation context + :return: True if matches else False + """ + return self._is_in(self.subject_id, ctx.subject_id) and \ + self._is_in(self.resource_id, ctx.resource_id) and \ + self._is_in(self.action_id, ctx.action_id) + + @staticmethod + def _is_in(ace_ids, ace_id: str): + """ + Returns True if `ace_id` is in `ace_ids`. + """ + _ace_ids = ace_ids if isinstance(ace_ids, list) else [ace_ids] + for _id in _ace_ids: + # Unix file name type string matching + if fnmatch.fnmatch(ace_id, _id): + return True + return False diff --git a/setup.py b/setup.py index 0b76291..5083749 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,7 @@ install_requires=[ 'marshmallow~=3.2', 'marshmallow-oneofschema~=2.0', + 'pydantic~=1.8.1', 'objectpath~=0.6', 'lru-dict~=1.1' ], diff --git a/tests/test__policy/__init__.py b/tests/test__policy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test__policy/test_conditions/__init__.py b/tests/test__policy/test_conditions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test__policy/test_conditions/test_attribute.py b/tests/test__policy/test_conditions/test_attribute.py new file mode 100644 index 0000000..c193d57 --- /dev/null +++ b/tests/test__policy/test_conditions/test_attribute.py @@ -0,0 +1,269 @@ +""" + Attribute conditions tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.attribute import AllInAttribute +from py_abac._policy.conditions.attribute import AllNotInAttribute +from py_abac._policy.conditions.attribute import AnyInAttribute +from py_abac._policy.conditions.attribute import AnyNotInAttribute +from py_abac._policy.conditions.attribute import EqualsAttribute +from py_abac._policy.conditions.attribute import IsInAttribute +from py_abac._policy.conditions.attribute import IsNotInAttribute +from py_abac._policy.conditions.attribute import NotEqualsAttribute +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestAttributeCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + ( + EqualsAttribute(ace="subject", path="$.name"), + {"condition": "EqualsAttribute", "ace": "subject", "path": "$.name"} + ), + ( + NotEqualsAttribute(ace="subject", path="$.name"), + {"condition": "NotEqualsAttribute", "ace": "subject", "path": "$.name"} + ), + ( + IsInAttribute(ace="subject", path="$.teams"), + {"condition": "IsInAttribute", "ace": "subject", "path": "$.teams"} + ), + ( + IsNotInAttribute(ace="subject", path="$.teams"), + {"condition": "IsNotInAttribute", "ace": "subject", "path": "$.teams"} + ), + ( + AllInAttribute(ace="subject", path="$.name"), + {"condition": "AllInAttribute", "ace": "subject", "path": "$.name"} + ), + ( + AllNotInAttribute(ace="subject", path="$.name"), + {"condition": "AllNotInAttribute", "ace": "subject", "path": "$.name"} + ), + ( + AnyInAttribute(ace="subject", path="$.name"), + {"condition": "AnyInAttribute", "ace": "subject", "path": "$.name"} + ), + ( + AnyNotInAttribute(ace="subject", path="$.name"), + {"condition": "AnyNotInAttribute", "ace": "subject", "path": "$.name"} + ), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + @pytest.mark.parametrize("condition, condition_json", [ + ( + EqualsAttribute(ace="subject", path="$.name"), + {"condition": "EqualsAttribute", "ace": "subject", "path": "$.name"} + ), + ( + NotEqualsAttribute(ace="subject", path="$.name"), + {"condition": "NotEqualsAttribute", "ace": "subject", "path": "$.name"} + ), + ( + IsInAttribute(ace="subject", path="$.teams"), + {"condition": "IsInAttribute", "ace": "subject", "path": "$.teams"} + ), + ( + IsNotInAttribute(ace="subject", path="$.teams"), + {"condition": "IsNotInAttribute", "ace": "subject", "path": "$.teams"} + ), + ( + AllInAttribute(ace="subject", path="$.name"), + {"condition": "AllInAttribute", "ace": "subject", "path": "$.name"} + ), + ( + AllNotInAttribute(ace="subject", path="$.name"), + {"condition": "AllNotInAttribute", "ace": "subject", "path": "$.name"} + ), + ( + AnyInAttribute(ace="subject", path="$.name"), + {"condition": "AnyInAttribute", "ace": "subject", "path": "$.name"} + ), + ( + AnyNotInAttribute(ace="subject", path="$.name"), + {"condition": "AnyNotInAttribute", "ace": "subject", "path": "$.name"} + ), + ]) + def test_from_json(self, condition, condition_json): + new_condition = condition.__class__.parse_obj(condition_json) + for attr in condition.__dict__: + assert getattr(new_condition, attr) == getattr(condition, attr) + + @pytest.mark.parametrize("condition_type, data", [ + (EqualsAttribute, {"condition": "EqualsAttribute", "ace": "test", "path": "$.name"}), + (EqualsAttribute, {"condition": "EqualsAttribute", "ace": "subject", "path": ")"}), + (EqualsAttribute, {"condition": "EqualsAttribute", "ace": "subject", "path": None}), + + (NotEqualsAttribute, {"condition": "NotEqualsAttribute", "ace": "test", "path": "$.name"}), + (NotEqualsAttribute, {"condition": "NotEqualsAttribute", "ace": "subject", "path": ")"}), + (NotEqualsAttribute, {"condition": "NotEqualsAttribute", "ace": "subject", "path": None}), + + (IsInAttribute, {"condition": "IsInAttribute", "ace": "test", "path": "$.name"}), + (IsInAttribute, {"condition": "IsInAttribute", "ace": "subject", "path": ")"}), + (IsInAttribute, {"condition": "IsInAttribute", "ace": "subject", "path": None}), + + (IsNotInAttribute, {"condition": "IsNotInAttribute", "ace": "test", "path": "$.name"}), + (IsNotInAttribute, {"condition": "IsNotInAttribute", "ace": "subject", "path": ")"}), + (IsNotInAttribute, {"condition": "IsNotInAttribute", "ace": "subject", "path": None}), + + (AllInAttribute, {"condition": "AllInAttribute", "ace": "test", "path": "$.name"}), + (AllInAttribute, {"condition": "AllInAttribute", "ace": "subject", "path": ")"}), + (AllInAttribute, {"condition": "AllInAttribute", "ace": "subject", "path": None}), + + (AllNotInAttribute, {"condition": "AllNotInAttribute", "ace": "test", "path": "$.name"}), + (AllNotInAttribute, {"condition": "AllNotInAttribute", "ace": "subject", "path": ")"}), + (AllNotInAttribute, {"condition": "AllNotInAttribute", "ace": "subject", "path": None}), + + (AnyInAttribute, {"condition": "AnyInAttribute", "ace": "test", "path": "$.name"}), + (AnyInAttribute, {"condition": "AnyInAttribute", "ace": "subject", "path": ")"}), + (AnyInAttribute, {"condition": "AnyInAttribute", "ace": "subject", "path": None}), + + (AnyNotInAttribute, {"condition": "AnyNotInAttribute", "ace": "test", "path": "$.name"}), + (AnyNotInAttribute, {"condition": "AnyNotInAttribute", "ace": "subject", "path": ")"}), + (AnyNotInAttribute, {"condition": "AnyNotInAttribute", "ace": "subject", "path": None}), + ]) + def test_create_error(self, condition_type, data): + with pytest.raises(ValidationError): + condition_type.parse_obj(data) + + @pytest.mark.parametrize("condition, what, result", [ + (EqualsAttribute(ace="subject", path="$.what"), "test", True), + (EqualsAttribute(ace="resource", path="$.what"), "test", False), + (EqualsAttribute(ace="resource", path="$.*"), "test", False), + (EqualsAttribute(ace="resource", path="$.name"), "test", False), + (EqualsAttribute(ace="resource", path="$.name.what"), {"test": True}, True), + ]) + def test_is_satisfied_equals_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": what}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (NotEqualsAttribute(ace="subject", path="$.what"), "test", False), + (NotEqualsAttribute(ace="resource", path="$.what"), "test", True), + (NotEqualsAttribute(ace="resource", path="$.*"), "test", True), + (NotEqualsAttribute(ace="resource", path="$.name"), "test", True), + (NotEqualsAttribute(ace="resource", path="$.name.what"), {"test": True}, False), + ]) + def test_is_satisfied_not_equals_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": what}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (IsInAttribute(ace="subject", path="$.what"), "test", False), + (IsInAttribute(ace="resource", path="$.what"), "test", False), + (IsInAttribute(ace="resource", path="$.*"), "test", False), + (IsInAttribute(ace="resource", path="$.name"), "test", False), + (IsInAttribute(ace="resource", path="$.name.what"), "test", True), + ]) + def test_is_satisfied_is_in_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": ["test"]}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (IsNotInAttribute(ace="subject", path="$.what"), "test", False), + (IsNotInAttribute(ace="resource", path="$.what"), "test", False), + (IsNotInAttribute(ace="resource", path="$.*"), "test", False), + (IsNotInAttribute(ace="resource", path="$.name"), "test", False), + (IsNotInAttribute(ace="resource", path="$.name.what"), "test", True), + ]) + def test_is_satisfied_is_not_in_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": ["test-2"]}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (AllInAttribute(ace="subject", path="$.what"), "test", False), + (AllInAttribute(ace="subject", path="$.what"), ["test"], True), + (AllInAttribute(ace="resource", path="$.what"), ["test"], False), + (AllInAttribute(ace="resource", path="$.*"), ["test"], False), + (AllInAttribute(ace="resource", path="$.name"), ["test"], False), + (AllInAttribute(ace="resource", path="$.name.what"), ["test_1"], True), + (AllInAttribute(ace="resource", path="$.name.what"), ["test_1", "test_2"], False), + ]) + def test_is_satisfied_all_in_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": ["test_1"]}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (AllNotInAttribute(ace="subject", path="$.what"), "test", False), + (AllNotInAttribute(ace="subject", path="$.what"), ["test"], False), + (AllNotInAttribute(ace="resource", path="$.what"), ["test"], False), + (AllNotInAttribute(ace="resource", path="$.*"), ["test"], False), + (AllNotInAttribute(ace="resource", path="$.name"), ["test"], False), + (AllNotInAttribute(ace="resource", path="$.name.what"), ["test_1"], False), + (AllNotInAttribute(ace="resource", path="$.name.what"), ["test_1", "test_2"], True), + ]) + def test_is_satisfied_all_not_in_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": ["test_1"]}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (AnyInAttribute(ace="subject", path="$.what"), "test", False), + (AnyInAttribute(ace="subject", path="$.what"), ["test"], True), + (AnyInAttribute(ace="resource", path="$.what"), ["test"], False), + (AnyInAttribute(ace="resource", path="$.*"), ["test"], False), + (AnyInAttribute(ace="resource", path="$.name"), ["test"], False), + (AnyInAttribute(ace="resource", path="$.name.what"), ["test_1"], True), + (AnyInAttribute(ace="resource", path="$.name.what"), ["test_1", "test_2"], True), + ]) + def test_is_satisfied_any_in_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": ["test_1"]}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result + + @pytest.mark.parametrize("condition, what, result", [ + (AnyNotInAttribute(ace="subject", path="$.what"), "test", False), + (AnyNotInAttribute(ace="subject", path="$.what"), ["test"], False), + (AnyNotInAttribute(ace="resource", path="$.what"), ["test"], False), + (AnyNotInAttribute(ace="resource", path="$.*"), ["test"], False), + (AnyNotInAttribute(ace="resource", path="$.name"), ["test"], False), + (AnyNotInAttribute(ace="resource", path="$.name.what"), ["test_3"], True), + (AnyNotInAttribute(ace="resource", path="$.name.what"), ["test_1", "test_2"], False), + ]) + def test_is_satisfied_any_not_in_attribute(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": ["test_1"]}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_conditions/test_collection.py b/tests/test__policy/test_conditions/test_collection.py new file mode 100644 index 0000000..04b0435 --- /dev/null +++ b/tests/test__policy/test_conditions/test_collection.py @@ -0,0 +1,124 @@ +""" + Collection condition tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.collection import AllIn +from py_abac._policy.conditions.collection import AllNotIn +from py_abac._policy.conditions.collection import AnyIn +from py_abac._policy.conditions.collection import AnyNotIn +from py_abac._policy.conditions.collection import IsEmpty +from py_abac._policy.conditions.collection import IsIn +from py_abac._policy.conditions.collection import IsNotEmpty +from py_abac._policy.conditions.collection import IsNotIn +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestCollectionCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + (AllIn(values=[2]), {"condition": "AllIn", "values": [2]}), + (AllNotIn(values=[{"test": 2}]), {"condition": "AllNotIn", "values": [{"test": 2}]}), + (AnyIn(values=[2, {"test": 2}]), {"condition": "AnyIn", "values": [2, {"test": 2}]}), + (AnyNotIn(values=[2, {"test": 2}, []]), {"condition": "AnyNotIn", "values": [2, {"test": 2}, []]}), + (IsIn(values=[2]), {"condition": "IsIn", "values": [2]}), + (IsNotIn(values=[2]), {"condition": "IsNotIn", "values": [2]}), + (IsEmpty(), {"condition": "IsEmpty"}), + (IsNotEmpty(), {"condition": "IsNotEmpty"}), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + @pytest.mark.parametrize("condition, condition_json", [ + (AllIn(values=[2]), {"condition": "AllIn", "values": [2]}), + (AllNotIn(values=[{"test": 2}]), {"condition": "AllNotIn", "values": [{"test": 2}]}), + (AnyIn(values=[2, {"test": 2}]), {"condition": "AnyIn", "values": [2, {"test": 2}]}), + (AnyNotIn(values=[2, {"test": 2}, []]), {"condition": "AnyNotIn", "values": [2, {"test": 2}, []]}), + (IsIn(values=[2]), {"condition": "IsIn", "values": [2]}), + (IsNotIn(values=[2]), {"condition": "IsNotIn", "values": [2]}), + (IsEmpty(), {"condition": "IsEmpty"}), + (IsNotEmpty(), {"condition": "IsNotEmpty"}), + ]) + def test_from_json(self, condition, condition_json): + new_condition = condition.__class__.parse_obj(condition_json) + for attr in condition.__dict__: + assert getattr(new_condition, attr) == getattr(condition, attr) + + @pytest.mark.parametrize("condition_type, data", [ + (AllIn, {"condition": "AllIn", "values": "test"}), + (AllNotIn, {"condition": "AllNotIn", "values": 1}), + (AnyIn, {"condition": "AnyIn", "values": {}}), + (AnyNotIn, {"condition": "AnyNotIn", "values": None}), + (IsIn, {"condition": "IsIn", "values": 1.0}), + (IsNotIn, {"condition": "IsNotIn", "values": object}), + (IsEmpty, {"condition": "IsEmpty", "values": []}), + (IsNotEmpty, {"condition": "IsNotEmpty", "values": 1}), + ]) + def test_create_error(self, condition_type, data): + with pytest.raises(ValidationError): + condition_type.parse_obj(data) + + @pytest.mark.parametrize("condition, what, result", [ + (AllIn(values=[]), 1, False), + (AllIn(values=[]), [], True), + (AllIn(values=[2]), [], True), + (AllIn(values=[2]), [2], True), + (AllIn(values=[2]), [1, 2], False), + (AllIn(values=[3, 2]), [1, 2], False), + (AllIn(values=[1, 2, 3]), [1, 2], True), + (AllIn(values=[1, 2, 3]), None, False), + + (AllNotIn(values=[]), 1, False), + (AllNotIn(values=[]), [], False), + (AllNotIn(values=[2]), [], False), + (AllNotIn(values=[2]), [2], False), + (AllNotIn(values=[2]), [1, 2], True), + (AllNotIn(values=[3, 2]), [1, 2], True), + (AllNotIn(values=[1, 2, 3]), [1, 2], False), + (AllNotIn(values=[1, 2, 3]), None, False), + + (AnyIn(values=[]), 1, False), + (AnyIn(values=[]), [], False), + (AnyIn(values=[2]), [], False), + (AnyIn(values=[2]), [2], True), + (AnyIn(values=[2]), [1, 2], True), + (AnyIn(values=[3, 2]), [1, 4], False), + (AnyIn(values=[1, 2, 3]), [1, 2], True), + (AnyIn(values=[1, 2, 3]), None, False), + + (AnyNotIn(values=[]), 1, False), + (AnyNotIn(values=[]), [], True), + (AnyNotIn(values=[2]), [], True), + (AnyNotIn(values=[2]), [2], False), + (AnyNotIn(values=[2]), [1, 2], False), + (AnyNotIn(values=[3, 2]), [1, 4], True), + (AnyNotIn(values=[1, 2, 3]), [1, 2], False), + (AnyNotIn(values=[1, 2, 3]), None, False), + + (IsIn(values=[]), [], False), + (IsIn(values=[1, 2, 3]), 1, True), + (IsIn(values=[1, 2, 3]), 4, False), + (IsIn(values=[1, 2, 3]), None, False), + + (IsNotIn(values=[]), [], True), + (IsNotIn(values=[1, 2, 3]), 1, False), + (IsNotIn(values=[1, 2, 3]), 4, True), + (IsNotIn(values=[1, 2, 3]), None, True), + + (IsEmpty(), [], True), + (IsEmpty(), [1], False), + (IsEmpty(), None, False), + + (IsNotEmpty(), [], False), + (IsNotEmpty(), [1], True), + (IsNotEmpty(), None, False), + ]) + def test_is_satisfied(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, resource={}, action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_conditions/test_logic.py b/tests/test__policy/test_conditions/test_logic.py new file mode 100644 index 0000000..25cbe12 --- /dev/null +++ b/tests/test__policy/test_conditions/test_logic.py @@ -0,0 +1,110 @@ +""" + Logic condition tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.logic import AllOf +from py_abac._policy.conditions.logic import AnyOf +from py_abac._policy.conditions.logic import Not +from py_abac._policy.conditions.numeric import Gt, Lt +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestLogicCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + (AllOf(values=[Gt(value=0.0), Lt(value=1.0)]), + {"condition": "AllOf", "values": [ + {"condition": "Gt", "value": 0.0}, + {"condition": "Lt", "value": 1.0} + ]}), + + (AnyOf(values=[Gt(value=0.0), Lt(value=1.0)]), + {"condition": "AnyOf", "values": [ + {"condition": "Gt", "value": 0.0}, + {"condition": "Lt", "value": 1.0} + ]}), + + (Not(value=Gt(value=1.0)), + {"condition": "Not", "value": { + "condition": "Gt", "value": 1.0 + }}), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + def test_from_json_and(self): + condition = AllOf(values=[Gt(value=0.0), Lt(value=1.0)]) + condition_json = { + "condition": "AllOf", "values": [ + {"condition": "Gt", "value": 0.0}, + {"condition": "Lt", "value": 1.0} + ]} + new_condition = AllOf.parse_obj(condition_json) + assert isinstance(new_condition, AllOf) + assert len(condition.values) == len(new_condition.values) + assert isinstance(new_condition.values[0], condition.values[0].__class__) + assert new_condition.values[0].value == condition.values[0].value + assert isinstance(new_condition.values[1], condition.values[1].__class__) + assert new_condition.values[1].value == condition.values[1].value + + def test_from_json_or(self): + condition = AnyOf(values=[Gt(value=0.0), Lt(value=1.0)]) + condition_json = { + "condition": "AnyOf", "values": [ + {"condition": "Gt", "value": 0.0}, + {"condition": "Lt", "value": 1.0} + ]} + new_condition = AnyOf.parse_obj(condition_json) + assert isinstance(new_condition, AnyOf) + assert len(condition.values) == len(new_condition.values) + assert isinstance(new_condition.values[0], condition.values[0].__class__) + assert new_condition.values[0].value == condition.values[0].value + assert isinstance(new_condition.values[1], condition.values[1].__class__) + assert new_condition.values[1].value == condition.values[1].value + + def test_from_json_not(self): + condition = Not(value=Gt(value=1.0)) + condition_json = { + "condition": "Not", "value": { + "condition": "Gt", "value": 1.0 + }} + new_condition = Not.parse_obj(condition_json) + assert isinstance(new_condition, Not) + assert isinstance(new_condition.value, condition.value.__class__) + assert new_condition.value.value == condition.value.value + + @pytest.mark.parametrize("condition_type, condition_json", [ + (AllOf, {"condition": "AllOf", "values": []}), + (AllOf, {"condition": "AllOf", "values": None}), + (AllOf, {"condition": "AllOf", "values": [None]}), + (AnyOf, {"condition": "AnyOf", "values": []}), + (AnyOf, {"condition": "AnyOf", "values": None}), + (AnyOf, {"condition": "AnyOf", "values": [None]}), + (Not, {"condition": "Not", "values": 1.0}), + ]) + def test_create_error(self, condition_type, condition_json): + with pytest.raises(ValidationError): + condition_type.parse_obj(condition_json) + + @pytest.mark.parametrize("condition, what, result", [ + (AllOf(values=[Gt(value=0.0), Lt(value=1.0)]), -1.5, False), + (AllOf(values=[Gt(value=0.0), Lt(value=1.0)]), 0.5, True), + (AllOf(values=[Gt(value=0.0), Lt(value=1.0)]), 1.5, False), + + (AnyOf(values=[Gt(value=1.0), Lt(value=0.0)]), -1.5, True), + (AnyOf(values=[Gt(value=1.0), Lt(value=0.0)]), 0.5, False), + (AnyOf(values=[Gt(value=1.0), Lt(value=0.0)]), 1.5, True), + + (Not(value=Gt(value=1.0)), 0.5, True), + (Not(value=Gt(value=1.0)), 1.5, False), + ]) + def test_is_satisfied(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, resource={}, action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_conditions/test_numeric.py b/tests/test__policy/test_conditions/test_numeric.py new file mode 100644 index 0000000..3f2edfc --- /dev/null +++ b/tests/test__policy/test_conditions/test_numeric.py @@ -0,0 +1,106 @@ +""" + Numeric condition tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.numeric import Eq +from py_abac._policy.conditions.numeric import Gt +from py_abac._policy.conditions.numeric import Gte +from py_abac._policy.conditions.numeric import Lt +from py_abac._policy.conditions.numeric import Lte +from py_abac._policy.conditions.numeric import Neq +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestNumericCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + (Eq(value=2), {"condition": "Eq", "value": 2}), + (Eq(value=3.0), {"condition": "Eq", "value": 3.0}), + (Gt(value=2), {"condition": "Gt", "value": 2}), + (Gt(value=3.0), {"condition": "Gt", "value": 3.0}), + (Lt(value=2), {"condition": "Lt", "value": 2}), + (Lt(value=3.0), {"condition": "Lt", "value": 3.0}), + (Gte(value=2), {"condition": "Gte", "value": 2}), + (Gte(value=3.0), {"condition": "Gte", "value": 3.0}), + (Lte(value=2), {"condition": "Lte", "value": 2}), + (Lte(value=3.0), {"condition": "Lte", "value": 3.0}), + (Neq(value=2), {"condition": "Neq", "value": 2}), + (Neq(value=3.0), {"condition": "Neq", "value": 3.0}), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + @pytest.mark.parametrize("condition_type, value, condition_json", [ + (Eq, 2, {"condition": "Eq", "value": 2}), + (Eq, 3.0, {"condition": "Eq", "value": 3.0}), + (Gt, 2, {"condition": "Gt", "value": 2}), + (Gt, 3.0, {"condition": "Gt", "value": 3.0}), + (Lt, 2, {"condition": "Lt", "value": 2}), + (Lt, 3.0, {"condition": "Lt", "value": 3.0}), + (Gte, 2, {"condition": "Gte", "value": 2}), + (Gte, 3.0, {"condition": "Gte", "value": 3.0}), + (Lte, 2, {"condition": "Lte", "value": 2}), + (Lte, 3.0, {"condition": "Lte", "value": 3.0}), + (Neq, 2, {"condition": "Neq", "value": 2}), + (Neq, 3.0, {"condition": "Neq", "value": 3.0}), + ]) + def test_from_json(self, condition_type, value, condition_json): + condition = condition_type.parse_obj(condition_json) + assert condition.value == value + + @pytest.mark.parametrize("condition_type, data", [ + (Eq, {"condition": "Eq", "value": "test"}), + (Gt, {"condition": "Gt", "value": []}), + (Lt, {"condition": "Lt", "value": {}}), + (Gte, {"condition": "Gte", "value": None}), + (Lte, {"condition": "Lte", "value": {1, }}), + (Neq, {"condition": "Neq", "value": ()}), + ]) + def test_create_error(self, condition_type, data): + with pytest.raises(ValidationError): + condition_type.parse_obj(data) + + @pytest.mark.parametrize("condition, what, result", [ + (Eq(value=2), 2, True), + (Eq(value=2), 2.0, True), + (Eq(value=2.0), 2, True), + (Eq(value=2.0), 2.0, True), + (Eq(value=2), 3.0, False), + (Eq(value=2), None, False), + + (Gt(value=2), 2, False), + (Gt(value=2), 2.1, True), + (Gt(value=2), 1.9, False), + (Gt(value=2), None, False), + + (Gte(value=2), 2, True), + (Gte(value=2), 2.1, True), + (Gte(value=2), 1.9, False), + (Gte(value=2), None, False), + + (Lt(value=2), 2, False), + (Lt(value=2), 2.1, False), + (Lt(value=2), 1.9, True), + (Lt(value=2), None, False), + + (Lte(value=2), 2, True), + (Lte(value=2), 2.1, False), + (Lte(value=2), 1.9, True), + (Lte(value=2), None, False), + + (Neq(value=2), 2, False), + (Neq(value=2.0), 2, False), + (Neq(value=2), 2.0, False), + (Neq(value=2), 1.9, True), + (Neq(value=2), None, False), + ]) + def test_is_satisfied(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, resource={}, action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_conditions/test_object.py b/tests/test__policy/test_conditions/test_object.py new file mode 100644 index 0000000..bcd1e64 --- /dev/null +++ b/tests/test__policy/test_conditions/test_object.py @@ -0,0 +1,58 @@ +""" + Object condition tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.object import EqualsObject +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestCollectionCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + (EqualsObject(value={}), {"condition": "EqualsObject", "value": {}}), + (EqualsObject(value={"test": 2}), {"condition": "EqualsObject", "value": {"test": 2}}), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + @pytest.mark.parametrize("condition, condition_json", [ + (EqualsObject(value={}), {"condition": "EqualsObject", "value": {}}), + (EqualsObject(value={"test": 2}), {"condition": "EqualsObject", "value": {"test": 2}}), + ]) + def test_from_json(self, condition, condition_json): + new_condition = condition.__class__.parse_obj(condition_json) + for attr in condition.__dict__: + assert getattr(new_condition, attr) == getattr(condition, attr) + + @pytest.mark.parametrize("data", [ + {"condition": "EqualsObject", "value": None}, + {"condition": "EqualsObject", "value": 2}, + ]) + def test_create_error(self, data): + with pytest.raises(ValidationError): + EqualsObject.parse_obj(data) + + @pytest.mark.parametrize("condition, what, result", [ + (EqualsObject(value={}), + {}, + True), + (EqualsObject(value={'a': {'b': [1, {'c': 1}], 'd': 'test'}, 'e': []}), + {'a': {'b': [1, {'c': 1}], 'd': 'test'}, 'e': []}, + True), + (EqualsObject(value={'a': {'b': [1, {'c': 1}], 'd': 'test'}, 'e': []}), + {'a': {'b': [1], 'd': 'test'}, 'e': []}, + False), + (EqualsObject(value={'a': {'b': [1, {'c': 1}], 'd': 'test'}, 'e': []}), + {'a': {'b': [1, {'c': 1}], 'd': 'test'}}, + False), + ]) + def test_is_satisfied(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, resource={}, action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_conditions/test_others.py b/tests/test__policy/test_conditions/test_others.py new file mode 100644 index 0000000..49bb137 --- /dev/null +++ b/tests/test__policy/test_conditions/test_others.py @@ -0,0 +1,69 @@ +""" + Other condition tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.others import Any +from py_abac._policy.conditions.others import CIDR +from py_abac._policy.conditions.others import Exists +from py_abac._policy.conditions.others import NotExists +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestOtherCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + (CIDR(value="127.0.0.0/16"), {"condition": "CIDR", "value": "127.0.0.0/16"}), + (Exists(), {"condition": "Exists"}), + (NotExists(), {"condition": "NotExists"}), + (Any(), {"condition": "Any"}), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + @pytest.mark.parametrize("condition, condition_json", [ + (CIDR(value="127.0.0.0/16"), {"condition": "CIDR", "value": "127.0.0.0/16"}), + (Exists(), {"condition": "Exists"}), + (NotExists(), {"condition": "NotExists"}), + (Any(), {"condition": "Any"}), + ]) + def test_from_json(self, condition, condition_json): + new_condition = condition.__class__.parse_obj(condition_json) + for attr in condition.__dict__: + assert getattr(new_condition, attr) == getattr(condition, attr) + + @pytest.mark.parametrize("data", [ + {"condition": "CIDR", "value": 1.0}, + ]) + def test_create_error(self, data): + with pytest.raises(ValidationError): + CIDR.parse_obj(data) + + @pytest.mark.parametrize("condition, what, result", [ + (CIDR(value="127.0.0.0/24"), "10.0.0.0", False), + (CIDR(value="127.0.0.0/24"), "127.0.0.1", True), + (CIDR(value="127.0.0.0/24"), ")", False), + (CIDR(value="127.0.0.0/24"), None, False), + + (Exists(), None, False), + (Exists(), 1.0, True), + + (NotExists(), None, True), + (NotExists(), 1.0, False), + + (Any(), None, True), + (Any(), 1.0, True), + (Any(), {"value": 1.0}, True), + (Any(), [1.0, 2.0, "a"], True), + ]) + def test_is_satisfied(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, + resource={"attributes": {"name": {"what": what}}}, + action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_conditions/test_string.py b/tests/test__policy/test_conditions/test_string.py new file mode 100644 index 0000000..8251c4e --- /dev/null +++ b/tests/test__policy/test_conditions/test_string.py @@ -0,0 +1,143 @@ +""" + String condition tests +""" + +import pytest +from pydantic import ValidationError + +from py_abac._policy.conditions.string import Contains +from py_abac._policy.conditions.string import EndsWith +from py_abac._policy.conditions.string import Equals +from py_abac._policy.conditions.string import NotContains +from py_abac._policy.conditions.string import NotEquals +from py_abac._policy.conditions.string import RegexMatch +from py_abac._policy.conditions.string import StartsWith +from py_abac.context import EvaluationContext +from py_abac.request import AccessRequest + + +class TestStringCondition(object): + + @pytest.mark.parametrize("condition, condition_json", [ + (Contains(value="2"), {"condition": "Contains", "value": "2", "case_insensitive": False}), + (Contains(value="2", case_insensitive=True), + {"condition": "Contains", "value": "2", "case_insensitive": True}), + (NotContains(value="2"), {"condition": "NotContains", "value": "2", "case_insensitive": False}), + (NotContains(value="2", case_insensitive=True), + {"condition": "NotContains", "value": "2", "case_insensitive": True}), + (Equals(value="2"), {"condition": "Equals", "value": "2", "case_insensitive": False}), + (Equals(value="2", case_insensitive=True), + {"condition": "Equals", "value": "2", "case_insensitive": True}), + (NotEquals(value="2"), {"condition": "NotEquals", "value": "2", "case_insensitive": False}), + (NotEquals(value="2", case_insensitive=True), + {"condition": "NotEquals", "value": "2", "case_insensitive": True}), + (StartsWith(value="2"), {"condition": "StartsWith", "value": "2", "case_insensitive": False}), + (StartsWith(value="2", case_insensitive=True), + {"condition": "StartsWith", "value": "2", "case_insensitive": True}), + (EndsWith(value="2"), {"condition": "EndsWith", "value": "2", "case_insensitive": False}), + (EndsWith(value="2", case_insensitive=True), + {"condition": "EndsWith", "value": "2", "case_insensitive": True}), + (RegexMatch(value="2"), {"condition": "RegexMatch", "value": "2"}), + ]) + def test_to_json(self, condition, condition_json): + assert condition.dict() == condition_json + + @pytest.mark.parametrize("condition, condition_json", [ + (Contains(value="2"), {"condition": "Contains", "value": "2", "case_insensitive": False}), + (Contains(value="2", case_insensitive=True), + {"condition": "Contains", "value": "2", "case_insensitive": True}), + (NotContains(value="2"), {"condition": "NotContains", "value": "2", "case_insensitive": False}), + (NotContains(value="2", case_insensitive=True), + {"condition": "NotContains", "value": "2", "case_insensitive": True}), + (Equals(value="2"), {"condition": "Equals", "value": "2", "case_insensitive": False}), + (Equals(value="2", case_insensitive=True), + {"condition": "Equals", "value": "2", "case_insensitive": True}), + (NotEquals(value="2"), {"condition": "NotEquals", "value": "2", "case_insensitive": False}), + (NotEquals(value="2", case_insensitive=True), + {"condition": "NotEquals", "value": "2", "case_insensitive": True}), + (StartsWith(value="2"), {"condition": "StartsWith", "value": "2", "case_insensitive": False}), + (StartsWith(value="2", case_insensitive=True), + {"condition": "StartsWith", "value": "2", "case_insensitive": True}), + (EndsWith(value="2"), {"condition": "EndsWith", "value": "2", "case_insensitive": False}), + (EndsWith(value="2", case_insensitive=True), + {"condition": "EndsWith", "value": "2", "case_insensitive": True}), + (RegexMatch(value="2"), {"condition": "RegexMatch", "value": "2"}), + ]) + def test_from_json(self, condition, condition_json): + new_condition = condition.__class__.parse_obj(condition_json) + for attr in condition.__dict__: + assert getattr(new_condition, attr) == getattr(condition, attr) + + @pytest.mark.parametrize("condition_type, data", [ + (Contains, {"condition": "Contains", "value": 2, "case_insensitive": False}), + (Contains, {"condition": "Contains", "value": "2", "case_insensitive": 2}), + (NotContains, {"condition": "NotContains", "value": [], "case_insensitive": False}), + (NotContains, {"condition": "NotContains", "value": "2", "case_insensitive": []}), + (Equals, {"condition": "Equals", "value": {}, "case_insensitive": False}), + (Equals, {"condition": "Equals", "value": "2", "case_insensitive": {}}), + (NotEquals, {"condition": "NotEquals", "value": None, "case_insensitive": False}), + (NotEquals, {"condition": "NotEquals", "value": "2", "case_insensitive": None}), + (StartsWith, {"condition": "StartsWith", "value": {1, }, "case_insensitive": False}), + (StartsWith, {"condition": "StartsWith", "value": "2", "case_insensitive": {1, }}), + (EndsWith, {"condition": "EndsWith", "value": (), "case_insensitive": False}), + (EndsWith, {"condition": "EndsWith", "value": "2", "case_insensitive": ()}), + (RegexMatch, {"condition": "RegexMatch", "value": "("}), + ]) + def test_create_error(self, condition_type, data): + with pytest.raises(ValidationError): + condition_type.parse_obj(data) + + @pytest.mark.parametrize("condition, what, result", [ + (Contains(value="b"), "abc", True), + (Contains(value="B"), "abc", False), + (Contains(value="B", case_insensitive=True), "abc", True), + (Contains(value="b"), "cde", False), + (Contains(value="b"), None, False), + + (NotContains(value="b"), "abc", False), + (NotContains(value="b"), "cde", True), + (NotContains(value="D"), "cde", True), + (NotContains(value="D", case_insensitive=True), "cde", False), + (NotContains(value="D", case_insensitive=True), None, False), + + (Equals(value="abc"), "abc", True), + (Equals(value="ABC"), "abc", False), + (Equals(value="ABC", case_insensitive=True), "abc", True), + (Equals(value="b"), "cde", False), + (Equals(value="b"), None, False), + + (NotEquals(value="abc"), "abc", False), + (NotEquals(value="ABC"), "abc", True), + (NotEquals(value="ABC", case_insensitive=True), "abc", False), + (NotEquals(value="b"), "cde", True), + (NotEquals(value="b"), None, False), + + (StartsWith(value="ab"), "abc", True), + (StartsWith(value="AB"), "abc", False), + (StartsWith(value="AB", case_insensitive=True), "abc", True), + (StartsWith(value="ab"), "ab", True), + (StartsWith(value="ab"), "cab", False), + (StartsWith(value="ab"), None, False), + + (EndsWith(value="ab"), "abc", False), + (EndsWith(value="ab"), "ab", True), + (EndsWith(value="ab"), "cab", True), + (EndsWith(value="AB"), "cab", False), + (EndsWith(value="AB", case_insensitive=True), "cab", True), + (EndsWith(value="AB", case_insensitive=True), None, False), + + (RegexMatch(value=".*"), "foo", True), + (RegexMatch(value="abc"), "abc", True), + (RegexMatch(value="abc"), "abd", False), + (RegexMatch(value=r"[\d\w]+"), "567asd", True), + (RegexMatch(value=""), "", True), + (RegexMatch(value=r"^python\?exe"), "python?exe", True), + (RegexMatch(value=r"^python?exe"), "python?exe", False), + (RegexMatch(value=r"^python?exe"), None, False), + ]) + def test_is_satisfied(self, condition, what, result): + request = AccessRequest(subject={"attributes": {"what": what}}, resource={}, action={}, context={}) + ctx = EvaluationContext(request) + ctx.ace = "subject" + ctx.attribute_path = "$.what" + assert condition.is_satisfied(ctx) == result diff --git a/tests/test__policy/test_targets.py b/tests/test__policy/test_targets.py new file mode 100644 index 0000000..1f318bc --- /dev/null +++ b/tests/test__policy/test_targets.py @@ -0,0 +1,70 @@ +""" + Unit test target schema +""" + +import pytest +from pydantic import ValidationError + +from py_abac.context import EvaluationContext +from py_abac._policy.targets import Targets +from py_abac.request import AccessRequest + + +def test_create(): + targets_json = { + "subject_id": ["abc", "a*"], + "resource_id": ["123"], + } + targets = Targets.parse_obj(targets_json) + assert isinstance(targets, Targets) + assert targets.subject_id == targets_json["subject_id"] + assert targets.resource_id == targets_json["resource_id"] + assert targets.action_id == "*" + + +@pytest.mark.parametrize("targets_json", [ + {"subject_id": [None, ]}, + {"resource_id": [{}, ]}, + {"action_id": {}}, + {"subject_id": [""]}, + {"resource_id": ""}, +]) +def test_create_error(targets_json): + with pytest.raises(ValidationError): + Targets.parse_obj(targets_json) + + +@pytest.mark.parametrize("targets_json, result", [ + ({}, True), + ({"subject_id": "abc"}, True), + ({"subject_id": "ab*"}, True), + ({"subject_id": "abd"}, False), + ({"subject_id": ["abd", "ab*"]}, True), + ({"subject_id": "ab*", "resource_id": "1*"}, True), + ({"subject_id": "ab*", "resource_id": "1*", "action_id": "1"}, False), +]) +def test_match(targets_json, result): + request_json = { + "subject": { + "id": "abc", + "attributes": { + "firstName": "Carl", + "lastName": "Right" + } + }, + "resource": { + "id": "12", + "attributes": { + "name": "Calendar" + } + }, + "action": { + "id": ">", + "attributes": {} + }, + "context": {} + } + request = AccessRequest.from_json(request_json) + ctx = EvaluationContext(request) + targets = Targets.parse_obj(targets_json) + assert targets.match(ctx) == result