|
1 | | -from typing import ClassVar, Dict, Optional, List, Any |
| 1 | +from typing import ClassVar, Dict, Optional, List, Any, Tuple |
2 | 2 | from json import loads as json_loads |
3 | 3 |
|
4 | 4 | from attrs import define, field |
|
8 | 8 | from fix_plugin_aws.resource.kms import AwsKmsKey |
9 | 9 | from fix_plugin_aws.aws_client import AwsClient |
10 | 10 | from fix_plugin_aws.utils import ToDict |
11 | | -from fixlib.baseresources import HasResourcePolicy, MetricName, ModelReference |
| 11 | +from fixlib.baseresources import HasResourcePolicy, MetricName, ModelReference, PolicySource, PolicySourceKind |
12 | 12 | from fixlib.graph import Graph |
13 | 13 | from fixlib.json_bender import Bender, S, Bend, bend, ForallBend |
14 | 14 | from fixlib.types import Json |
@@ -133,6 +133,12 @@ class AwsKinesisStream(AwsResource, HasResourcePolicy): |
133 | 133 | kinesis_key_id: Optional[str] = field(default=None) |
134 | 134 | kinesis_policy: Optional[Json] = field(default=None) |
135 | 135 |
|
| 136 | + def resource_policy(self, builder: Any) -> List[Tuple[PolicySource, Dict[str, Any]]]: |
| 137 | + if not self.kinesis_policy or not self.arn: |
| 138 | + return [] |
| 139 | + |
| 140 | + return [(PolicySource(PolicySourceKind.resource, self.arn), self.kinesis_policy)] |
| 141 | + |
136 | 142 | @classmethod |
137 | 143 | def called_collect_apis(cls) -> List[AwsApiSpec]: |
138 | 144 | return [ |
|
0 commit comments