|
| 1 | +from datetime import datetime |
| 2 | +from typing import ClassVar, Dict, Optional, List, Type |
| 3 | + |
| 4 | +from attr import define, field |
| 5 | + |
| 6 | +from fix_plugin_azure.azure_client import AzureResourceSpec |
| 7 | +from fix_plugin_azure.resource.base import ( |
| 8 | + MicrosoftResource, |
| 9 | + GraphBuilder, |
| 10 | + AzureSubscription, |
| 11 | +) |
| 12 | +from fix_plugin_azure.resource.microsoft_graph import ( |
| 13 | + MicrosoftGraphUser, |
| 14 | + MicrosoftGraphDevice, |
| 15 | + MicrosoftGraphServicePrincipal, |
| 16 | + MicrosoftGraphGroup, |
| 17 | + MicrosoftGraphPrincipalTypes, |
| 18 | +) |
| 19 | +from fixlib.baseresources import BaseRole, ModelReference |
| 20 | +from fixlib.graph import BySearchCriteria |
| 21 | +from fixlib.json_bender import Bender, S, ForallBend |
| 22 | +from fixlib.types import Json |
| 23 | + |
| 24 | + |
| 25 | +@define(eq=False, slots=False) |
| 26 | +class AzureDenyAssignmentPermission: |
| 27 | + kind: ClassVar[str] = "azure_deny_assignment_permission" |
| 28 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 29 | + "actions": S("actions"), |
| 30 | + "condition": S("condition"), |
| 31 | + "condition_version": S("conditionVersion"), |
| 32 | + "data_actions": S("dataActions"), |
| 33 | + "not_actions": S("notActions"), |
| 34 | + "not_data_actions": S("notDataActions"), |
| 35 | + } |
| 36 | + actions: Optional[List[str]] = field(default=None, metadata={'description': 'Actions to which the deny assignment does not grant access.'}) # fmt: skip |
| 37 | + condition: Optional[str] = field(default=None, metadata={'description': 'The conditions on the Deny assignment permission. This limits the resources it applies to.'}) # fmt: skip |
| 38 | + condition_version: Optional[str] = field(default=None, metadata={"description": "Version of the condition."}) |
| 39 | + data_actions: Optional[List[str]] = field(default=None, metadata={'description': 'Data actions to which the deny assignment does not grant access.'}) # fmt: skip |
| 40 | + not_actions: Optional[List[str]] = field(default=None, metadata={'description': 'Actions to exclude from that the deny assignment does not grant access.'}) # fmt: skip |
| 41 | + not_data_actions: Optional[List[str]] = field(default=None, metadata={'description': 'Data actions to exclude from that the deny assignment does not grant access.'}) # fmt: skip |
| 42 | + |
| 43 | + |
| 44 | +@define(eq=False, slots=False) |
| 45 | +class AzurePrincipal: |
| 46 | + kind: ClassVar[str] = "azure_principal" |
| 47 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 48 | + "display_name": S("displayName"), |
| 49 | + "email": S("email"), |
| 50 | + "id": S("id"), |
| 51 | + "type": S("type"), |
| 52 | + } |
| 53 | + display_name: Optional[str] = field(default=None, metadata={'description': 'The name of the principal made changes'}) # fmt: skip |
| 54 | + email: Optional[str] = field(default=None, metadata={"description": "Email of principal"}) |
| 55 | + id: Optional[str] = field(default=None, metadata={"description": "The id of the principal made changes"}) |
| 56 | + type: Optional[str] = field(default=None, metadata={"description": "Type of principal such as user , group etc"}) |
| 57 | + |
| 58 | + |
| 59 | +@define(eq=False, slots=False) |
| 60 | +class AzureDenyAssignment(MicrosoftResource): |
| 61 | + kind: ClassVar[str] = "azure_deny_assignment" |
| 62 | + api_spec: ClassVar[AzureResourceSpec] = AzureResourceSpec( |
| 63 | + service="authorization", |
| 64 | + version="2022-04-01", |
| 65 | + path="/subscriptions/{subscriptionId}/providers/Microsoft.Authorization/denyAssignments", |
| 66 | + path_parameters=["subscriptionId"], |
| 67 | + query_parameters=["api-version"], |
| 68 | + access_path="value", |
| 69 | + expect_array=True, |
| 70 | + ) |
| 71 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 72 | + "id": S("id"), |
| 73 | + "tags": S("tags", default={}), |
| 74 | + "name": S("name"), |
| 75 | + "ctime": S("properties", "createdOn"), |
| 76 | + "mtime": S("properties", "updatedOn"), |
| 77 | + "condition": S("properties", "condition"), |
| 78 | + "condition_version": S("properties", "conditionVersion"), |
| 79 | + "created_by": S("properties", "createdBy"), |
| 80 | + "created_on": S("properties", "createdOn"), |
| 81 | + "deny_assignment_name": S("properties", "denyAssignmentName"), |
| 82 | + "description": S("properties", "description"), |
| 83 | + "do_not_apply_to_child_scopes": S("properties", "doNotApplyToChildScopes"), |
| 84 | + "exclude_principals": S("properties", "excludePrincipals") >> ForallBend(AzurePrincipal.mapping), |
| 85 | + "is_system_protected": S("properties", "isSystemProtected"), |
| 86 | + "deny_assignment_permissions": S("properties", "permissions") |
| 87 | + >> ForallBend(AzureDenyAssignmentPermission.mapping), |
| 88 | + "principals": S("properties", "principals") >> ForallBend(AzurePrincipal.mapping), |
| 89 | + "scope": S("properties", "scope"), |
| 90 | + "updated_by": S("properties", "updatedBy"), |
| 91 | + "updated_on": S("properties", "updatedOn"), |
| 92 | + } |
| 93 | + condition: Optional[str] = field(default=None, metadata={'description': 'The conditions on the deny assignment. This limits the resources it can be assigned to. e.g.: @Resource[Microsoft.Storage/storageAccounts/blobServices/containers:ContainerName] StringEqualsIgnoreCase foo_storage_container '}) # fmt: skip |
| 94 | + condition_version: Optional[str] = field(default=None, metadata={"description": "Version of the condition."}) |
| 95 | + created_by: Optional[str] = field(default=None, metadata={'description': 'Id of the user who created the assignment'}) # fmt: skip |
| 96 | + created_on: Optional[datetime] = field(default=None, metadata={"description": "Time it was created"}) |
| 97 | + deny_assignment_name: Optional[str] = field(default=None, metadata={'description': 'The display name of the deny assignment.'}) # fmt: skip |
| 98 | + description: Optional[str] = field(default=None, metadata={'description': 'The description of the deny assignment.'}) # fmt: skip |
| 99 | + do_not_apply_to_child_scopes: Optional[bool] = field(default=None, metadata={'description': 'Determines if the deny assignment applies to child scopes. Default value is false.'}) # fmt: skip |
| 100 | + exclude_principals: Optional[List[AzurePrincipal]] = field(default=None, metadata={'description': 'Array of principals to which the deny assignment does not apply.'}) # fmt: skip |
| 101 | + is_system_protected: Optional[bool] = field(default=None, metadata={'description': 'Specifies whether this deny assignment was created by Azure and cannot be edited or deleted.'}) # fmt: skip |
| 102 | + permissions: Optional[List[AzureDenyAssignmentPermission]] = field(default=None, metadata={'description': 'An array of permissions that are denied by the deny assignment.'}) # fmt: skip |
| 103 | + principals: Optional[List[AzurePrincipal]] = field(default=None, metadata={'description': 'Array of principals to which the deny assignment applies.'}) # fmt: skip |
| 104 | + scope: Optional[str] = field(default=None, metadata={"description": "The deny assignment scope."}) |
| 105 | + updated_by: Optional[str] = field(default=None, metadata={'description': 'Id of the user who updated the assignment'}) # fmt: skip |
| 106 | + updated_on: Optional[datetime] = field(default=None, metadata={"description": "Time it was updated"}) |
| 107 | + |
| 108 | + |
| 109 | +@define(eq=False, slots=False) |
| 110 | +class AzureRoleAssignment(MicrosoftResource): |
| 111 | + kind: ClassVar[str] = "azure_role_assignment" |
| 112 | + api_spec: ClassVar[AzureResourceSpec] = AzureResourceSpec( |
| 113 | + service="authorization", |
| 114 | + version="2022-04-01", |
| 115 | + path="/subscriptions/{subscriptionId}/providers/Microsoft.Authorization/roleAssignments", |
| 116 | + path_parameters=["subscriptionId"], |
| 117 | + query_parameters=["api-version"], |
| 118 | + access_path="value", |
| 119 | + expect_array=True, |
| 120 | + ) |
| 121 | + kind_lookup: ClassVar[Dict[str, str]] = { |
| 122 | + "User": MicrosoftGraphUser.kind, |
| 123 | + "Device": MicrosoftGraphDevice.kind, |
| 124 | + "ServicePrincipal": MicrosoftGraphServicePrincipal.kind, |
| 125 | + "Group": MicrosoftGraphGroup.kind, |
| 126 | + "Subscription": "azure_subscription", |
| 127 | + "ResourceGroup": "azure_resource_group", |
| 128 | + "Resource": "azure_resource", |
| 129 | + } |
| 130 | + reference_kinds: ClassVar[ModelReference] = { |
| 131 | + "successors": {"default": ["azure_role_definition", *(p.kind for p in MicrosoftGraphPrincipalTypes)]}, |
| 132 | + "predecessors": { |
| 133 | + "default": ["azure_role_definition", "azure_subscription", "azure_resource_group", "azure_resource"] |
| 134 | + }, |
| 135 | + } |
| 136 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 137 | + "id": S("id"), |
| 138 | + "tags": S("tags", default={}), |
| 139 | + "name": S("name"), |
| 140 | + "ctime": S("properties", "createdOn"), |
| 141 | + "mtime": S("properties", "updatedOn"), |
| 142 | + "condition": S("properties", "condition"), |
| 143 | + "condition_version": S("properties", "conditionVersion"), |
| 144 | + "created_by": S("properties", "createdBy"), |
| 145 | + "created_on": S("properties", "createdOn"), |
| 146 | + "delegated_managed_identity_resource_id": S("properties", "delegatedManagedIdentityResourceId"), |
| 147 | + "description": S("properties", "description"), |
| 148 | + "principal_id": S("properties", "principalId"), |
| 149 | + "principal_type": S("properties", "principalType"), |
| 150 | + "role_definition_id": S("properties", "roleDefinitionId"), |
| 151 | + "scope": S("properties", "scope"), |
| 152 | + "updated_by": S("properties", "updatedBy"), |
| 153 | + "updated_on": S("properties", "updatedOn"), |
| 154 | + } |
| 155 | + |
| 156 | + condition: Optional[str] = field(default=None, metadata={'description': 'The conditions on the role assignment. This limits the resources it can be assigned to. e.g.: @Resource[Microsoft.Storage/storageAccounts/blobServices/containers:ContainerName] StringEqualsIgnoreCase foo_storage_container '}) # fmt: skip |
| 157 | + condition_version: Optional[str] = field(default=None, metadata={'description': 'Version of the condition. Currently the only accepted value is 2.0 '}) # fmt: skip |
| 158 | + created_by: Optional[str] = field(default=None, metadata={'description': 'Id of the user who created the assignment'}) # fmt: skip |
| 159 | + created_on: Optional[datetime] = field(default=None, metadata={"description": "Time it was created"}) |
| 160 | + delegated_managed_identity_resource_id: Optional[str] = field(default=None, metadata={'description': 'Id of the delegated managed identity resource'}) # fmt: skip |
| 161 | + description: Optional[str] = field(default=None, metadata={"description": "Description of role assignment"}) |
| 162 | + principal_id: Optional[str] = field(default=None, metadata={"description": "The principal ID."}) |
| 163 | + principal_type: Optional[str] = field(default=None, metadata={'description': 'The principal type of the assigned principal ID.'}) # fmt: skip |
| 164 | + role_definition_id: Optional[str] = field(default=None, metadata={"description": "The role definition ID."}) |
| 165 | + scope: Optional[str] = field(default=None, metadata={"description": "The role assignment scope."}) |
| 166 | + updated_by: Optional[str] = field(default=None, metadata={'description': 'Id of the user who updated the assignment'}) # fmt: skip |
| 167 | + updated_on: Optional[datetime] = field(default=None, metadata={"description": "Time it was updated"}) |
| 168 | + |
| 169 | + def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: |
| 170 | + # role definition |
| 171 | + if rid := self.role_definition_id: |
| 172 | + builder.add_edge(self, clazz=AzureRoleDefinition, id=rid) |
| 173 | + |
| 174 | + # scope |
| 175 | + if scope := self.scope: |
| 176 | + scope_parts = scope.split("/") |
| 177 | + if scope.startswith("/providers/Microsoft.Management/managementGroups/"): # management group |
| 178 | + pass |
| 179 | + elif len(scope_parts) == 2: # subscription |
| 180 | + builder.add_edge(self, reverse=True, clazz=AzureSubscription, id=scope_parts[-1]) |
| 181 | + else: # resource group or resource |
| 182 | + builder.add_edge(self, reverse=True, id=scope) |
| 183 | + |
| 184 | + # principal: collected via ms graph -> create a deferred edge |
| 185 | + if (pt := self.principal_type) and (pt_kind := self.kind_lookup.get(pt)) and (pid := self.principal_id): |
| 186 | + builder.add_deferred_edge( |
| 187 | + from_node=self, to_node=BySearchCriteria(f'is({pt_kind}) and reported.id=="{pid}"') |
| 188 | + ) |
| 189 | + |
| 190 | + |
| 191 | +@define(eq=False, slots=False) |
| 192 | +class AzurePermission: |
| 193 | + kind: ClassVar[str] = "azure_permission" |
| 194 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 195 | + "actions": S("actions"), |
| 196 | + "data_actions": S("dataActions"), |
| 197 | + "not_actions": S("notActions"), |
| 198 | + "not_data_actions": S("notDataActions"), |
| 199 | + } |
| 200 | + actions: Optional[List[str]] = field(default=None, metadata={"description": "Allowed actions."}) |
| 201 | + data_actions: Optional[List[str]] = field(default=None, metadata={"description": "Allowed Data actions."}) |
| 202 | + not_actions: Optional[List[str]] = field(default=None, metadata={"description": "Denied actions."}) |
| 203 | + not_data_actions: Optional[List[str]] = field(default=None, metadata={"description": "Denied Data actions."}) |
| 204 | + |
| 205 | + |
| 206 | +@define(eq=False, slots=False) |
| 207 | +class AzureRoleDefinition(MicrosoftResource, BaseRole): |
| 208 | + kind: ClassVar[str] = "azure_role_definition" |
| 209 | + api_spec: ClassVar[AzureResourceSpec] = AzureResourceSpec( |
| 210 | + service="authorization", |
| 211 | + version="2022-04-01", |
| 212 | + path="/subscriptions/{subscriptionId}/providers/Microsoft.Authorization/roleDefinitions", |
| 213 | + path_parameters=["subscriptionId"], |
| 214 | + query_parameters=["api-version"], |
| 215 | + access_path="value", |
| 216 | + expect_array=True, |
| 217 | + ) |
| 218 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 219 | + "id": S("id"), |
| 220 | + "name": S("properties", "roleName"), |
| 221 | + "ctime": S("properties", "createdOn"), |
| 222 | + "mtime": S("properties", "updatedOn"), |
| 223 | + "assignable_scopes": S("properties", "assignableScopes"), |
| 224 | + "created_by": S("properties", "createdBy"), |
| 225 | + "created_on": S("properties", "createdOn"), |
| 226 | + "description": S("properties", "description"), |
| 227 | + "azure_role_permissions": S("properties", "permissions") >> ForallBend(AzurePermission.mapping), |
| 228 | + "role_name": S("properties", "roleName"), |
| 229 | + "updated_by": S("properties", "updatedBy"), |
| 230 | + "updated_on": S("properties", "updatedOn"), |
| 231 | + } |
| 232 | + assignable_scopes: Optional[List[str]] = field(default=None, metadata={'description': 'Role definition assignable scopes.'}) # fmt: skip |
| 233 | + created_by: Optional[str] = field(default=None, metadata={'description': 'Id of the user who created the assignment'}) # fmt: skip |
| 234 | + created_on: Optional[datetime] = field(default=None, metadata={"description": "Time it was created"}) |
| 235 | + description: Optional[str] = field(default=None, metadata={"description": "The role definition description."}) |
| 236 | + azure_role_permissions: Optional[List[AzurePermission]] = field(default=None, metadata={'description': 'Role definition permissions.'}) # fmt: skip |
| 237 | + role_name: Optional[str] = field(default=None, metadata={"description": "The role name."}) |
| 238 | + updated_by: Optional[str] = field(default=None, metadata={'description': 'Id of the user who updated the assignment'}) # fmt: skip |
| 239 | + updated_on: Optional[datetime] = field(default=None, metadata={"description": "Time it was updated"}) |
| 240 | + |
| 241 | + |
| 242 | +resources: List[Type[MicrosoftResource]] = [ |
| 243 | + AzureDenyAssignment, |
| 244 | + AzureRoleAssignment, |
| 245 | + AzureRoleDefinition, |
| 246 | +] |
0 commit comments