|
6 | 6 | from abc import ABC |
7 | 7 | from copy import deepcopy |
8 | 8 | from datetime import datetime, timezone, timedelta |
9 | | -from enum import Enum, unique |
| 9 | +from enum import Enum, StrEnum, unique |
10 | 10 | from functools import wraps, cached_property |
11 | 11 | from typing import Dict, Iterator, List, ClassVar, Optional, TypedDict, Any, TypeVar, Type, Callable, Set, Tuple |
12 | 12 | from collections import defaultdict |
13 | 13 |
|
14 | 14 | from attr import resolve_types |
15 | | -from attrs import define, field, Factory |
| 15 | +from attrs import define, field, Factory, frozen, evolve |
16 | 16 | from prometheus_client import Counter, Summary |
17 | 17 |
|
18 | | -from fixlib.json import from_json as _from_json, to_json as _to_json |
| 18 | +from fixlib.json import from_json as _from_json, to_json as _to_json, to_json_str |
19 | 19 | from fixlib.logger import log |
20 | 20 | from fixlib.types import Json |
21 | 21 | from fixlib.utils import make_valid_timestamp, utc_str, utc |
@@ -68,6 +68,7 @@ class ModelReference(TypedDict, total=False): |
68 | 68 | class EdgeType(Enum): |
69 | 69 | default = "default" |
70 | 70 | delete = "delete" |
| 71 | + iam = "iam" |
71 | 72 |
|
72 | 73 | @staticmethod |
73 | 74 | def from_value(value: Optional[str] = None) -> EdgeType: |
@@ -1613,4 +1614,77 @@ def delete(self, graph: Any) -> bool: |
1613 | 1614 | return False |
1614 | 1615 |
|
1615 | 1616 |
|
| 1617 | +class PolicySourceKind(StrEnum): |
| 1618 | + principal = "principal" # e.g. IAM user, attached policy |
| 1619 | + group = "group" # policy comes from an IAM group |
| 1620 | + resource = "resource" # e.g. s3 bucket policy |
| 1621 | + |
| 1622 | + |
| 1623 | +ResourceConstraint = str |
| 1624 | + |
| 1625 | +ConditionString = str |
| 1626 | + |
| 1627 | + |
| 1628 | +@frozen |
| 1629 | +class PolicySource: |
| 1630 | + kind: PolicySourceKind |
| 1631 | + uri: str |
| 1632 | + |
| 1633 | + |
| 1634 | +class HasResourcePolicy(ABC): |
| 1635 | + # returns a list of all policies that affects the resource (inline, attached, etc.) |
| 1636 | + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Json]]: |
| 1637 | + raise NotImplementedError |
| 1638 | + |
| 1639 | + |
| 1640 | +@frozen |
| 1641 | +class PermissionCondition: |
| 1642 | + # if nonempty and any evals to true, access is granted, otherwise implicitly denied |
| 1643 | + allow: Optional[Tuple[ConditionString, ...]] = None |
| 1644 | + # if nonempty and any is evals to false, access is implicitly denied |
| 1645 | + boundary: Optional[Tuple[ConditionString, ...]] = None |
| 1646 | + # if nonempty and any evals to true, access is explicitly denied |
| 1647 | + deny: Optional[Tuple[ConditionString, ...]] = None |
| 1648 | + |
| 1649 | + |
| 1650 | +@frozen |
| 1651 | +class PermissionScope: |
| 1652 | + source: PolicySource |
| 1653 | + constraints: Tuple[ResourceConstraint, ...] # aka resource constraints |
| 1654 | + conditions: Optional[PermissionCondition] = None |
| 1655 | + |
| 1656 | + def with_deny_conditions(self, deny_conditions: List[Json]) -> "PermissionScope": |
| 1657 | + c = self.conditions or PermissionCondition() |
| 1658 | + return evolve(self, conditions=evolve(c, deny=tuple([to_json_str(c) for c in deny_conditions]))) |
| 1659 | + |
| 1660 | + def with_boundary_conditions(self, boundary_conditions: List[Json]) -> "PermissionScope": |
| 1661 | + c = self.conditions or PermissionCondition() |
| 1662 | + return evolve(self, conditions=evolve(c, boundary=tuple([to_json_str(c) for c in boundary_conditions]))) |
| 1663 | + |
| 1664 | + def has_no_condititons(self) -> bool: |
| 1665 | + if self.conditions is None: |
| 1666 | + return True |
| 1667 | + |
| 1668 | + if self.conditions.allow is None and self.conditions.boundary is None and self.conditions.deny is None: |
| 1669 | + return True |
| 1670 | + |
| 1671 | + return False |
| 1672 | + |
| 1673 | + |
| 1674 | +class PermissionLevel(StrEnum): |
| 1675 | + list = "list" |
| 1676 | + read = "read" |
| 1677 | + tagging = "tagging" |
| 1678 | + write = "write" |
| 1679 | + permission_management = "permission" |
| 1680 | + unknown = "unknown" # in case a resource is not in the levels database |
| 1681 | + |
| 1682 | + |
| 1683 | +@frozen |
| 1684 | +class AccessPermission: |
| 1685 | + action: str |
| 1686 | + level: PermissionLevel |
| 1687 | + scopes: Tuple[PermissionScope, ...] |
| 1688 | + |
| 1689 | + |
1616 | 1690 | resolve_types(BaseResource) # noqa |
0 commit comments