|
| 1 | +import logging |
| 2 | +from datetime import datetime |
| 3 | +from functools import partial |
| 4 | +from typing import ClassVar, Dict, Optional, List, Tuple, Type, Any |
| 5 | + |
| 6 | +from attrs import define, field |
| 7 | +from boto3.exceptions import Boto3Error |
| 8 | + |
| 9 | +from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder |
| 10 | +from fix_plugin_aws.resource.ec2 import AwsEc2Instance |
| 11 | +from fix_plugin_aws.resource.ecr import AwsEcrRepository |
| 12 | +from fix_plugin_aws.resource.lambda_ import AwsLambdaFunction |
| 13 | +from fixlib.baseresources import PhantomBaseResource, Severity, Finding |
| 14 | +from fixlib.json_bender import Bender, S, ForallBend, Bend, F |
| 15 | +from fixlib.types import Json |
| 16 | + |
| 17 | +log = logging.getLogger("fix.plugins.aws") |
| 18 | +service_name = "inspector2" |
| 19 | + |
| 20 | +amazon_inspector = "amazon_inspector" |
| 21 | + |
| 22 | + |
| 23 | +@define(eq=False, slots=False) |
| 24 | +class AwsInspectorRecommendation: |
| 25 | + kind: ClassVar[str] = "aws_inspector_recommendation" |
| 26 | + mapping: ClassVar[Dict[str, Bender]] = {"url": S("Url"), "text": S("text")} |
| 27 | + url: Optional[str] = field(default=None, metadata={"description": "The URL address to the CVE remediation recommendations."}) # fmt: skip |
| 28 | + text: Optional[str] = field(default=None, metadata={"description": "The recommended course of action to remediate the finding."}) # fmt: skip |
| 29 | + |
| 30 | + |
| 31 | +@define(eq=False, slots=False) |
| 32 | +class AwsInspectorRemediation: |
| 33 | + kind: ClassVar[str] = "aws_inspector_remediation" |
| 34 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 35 | + "recommendation": S("recommendation") >> Bend(AwsInspectorRecommendation.mapping) |
| 36 | + } |
| 37 | + recommendation: Optional[AwsInspectorRecommendation] = field(default=None, metadata={"description": "An object that contains information about the recommended course of action to remediate the finding."}) # fmt: skip |
| 38 | + |
| 39 | + |
| 40 | +@define(eq=False, slots=False) |
| 41 | +class AwsInspectorResource: |
| 42 | + kind: ClassVar[str] = "aws_inspector_resource" |
| 43 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 44 | + # "details": S("details") # not used |
| 45 | + "id": S("id"), |
| 46 | + "partition": S("partition"), |
| 47 | + "region": S("region"), |
| 48 | + "type": S("type"), |
| 49 | + } |
| 50 | + id: Optional[str] = field(default=None, metadata={"description": "The ID of the resource."}) # fmt: skip |
| 51 | + partition: Optional[str] = field(default=None, metadata={"description": "The partition of the resource."}) # fmt: skip |
| 52 | + region: Optional[str] = field(default=None, metadata={"description": "The Amazon Web Services Region the impacted resource is located in."}) # fmt: skip |
| 53 | + type: Optional[str] = field(default=None, metadata={"description": "The type of resource."}) # fmt: skip |
| 54 | + |
| 55 | + |
| 56 | +@define(eq=False, slots=False) |
| 57 | +class AwsInspectorFinding(AwsResource, PhantomBaseResource): |
| 58 | + kind: ClassVar[str] = "aws_inspector_finding" |
| 59 | + api_spec: ClassVar[AwsApiSpec] = AwsApiSpec(service_name, "list-findings") |
| 60 | + _model_export: ClassVar[bool] = False # do not export this class, since there will be no instances of it |
| 61 | + mapping: ClassVar[Dict[str, Bender]] = { |
| 62 | + "id": S("findingArn") >> F(AwsResource.id_from_arn), |
| 63 | + "name": S("title"), |
| 64 | + "mtime": S("updatedAt"), |
| 65 | + "arn": S("findingArn"), |
| 66 | + "aws_account_id": S("awsAccountId"), |
| 67 | + "description": S("description"), |
| 68 | + "epss": S("epss", "score"), |
| 69 | + "exploit_available": S("exploitAvailable"), |
| 70 | + "exploitability_details": S("exploitabilityDetails", "lastKnownExploitAt"), |
| 71 | + "finding_arn": S("findingArn"), |
| 72 | + "first_observed_at": S("firstObservedAt"), |
| 73 | + "fix_available": S("fixAvailable"), |
| 74 | + "inspector_score": S("inspectorScore"), |
| 75 | + "last_observed_at": S("lastObservedAt"), |
| 76 | + "remediation": S("remediation") >> Bend(AwsInspectorRemediation.mapping), |
| 77 | + "finding_resources": S("resources", default=[]) >> ForallBend(AwsInspectorResource.mapping), |
| 78 | + "finding_severity": S("severity"), |
| 79 | + "status": S("status"), |
| 80 | + "title": S("title"), |
| 81 | + "type": S("type"), |
| 82 | + "updated_at": S("updatedAt"), |
| 83 | + # available but not used properties: |
| 84 | + # "inspector_score_details": S("inspectorScoreDetails") |
| 85 | + # "code_vulnerability_details": S("codeVulnerabilityDetails") |
| 86 | + # "network_reachability_details": S("networkReachabilityDetails") |
| 87 | + # "package_vulnerability_details": S("packageVulnerabilityDetails") |
| 88 | + } |
| 89 | + aws_account_id: Optional[str] = field(default=None, metadata={"description": "The Amazon Web Services account ID associated with the finding."}) # fmt: skip |
| 90 | + description: Optional[str] = field(default=None, metadata={"description": "The description of the finding."}) # fmt: skip |
| 91 | + epss: Optional[float] = field(default=None, metadata={"description": "The finding's EPSS score."}) # fmt: skip |
| 92 | + exploit_available: Optional[str] = field(default=None, metadata={"description": "If a finding discovered in your environment has an exploit available."}) # fmt: skip |
| 93 | + exploitability_details: Optional[datetime] = field(default=None, metadata={"description": "The details of an exploit available for a finding discovered in your environment."}) # fmt: skip |
| 94 | + finding_arn: Optional[str] = field(default=None, metadata={"description": "The Amazon Resource Number (ARN) of the finding."}) # fmt: skip |
| 95 | + first_observed_at: Optional[datetime] = field(default=None, metadata={"description": "The date and time that the finding was first observed."}) # fmt: skip |
| 96 | + fix_available: Optional[str] = field(default=None, metadata={"description": "Details on whether a fix is available through a version update. This value can be YES, NO, or PARTIAL. A PARTIAL fix means that some, but not all, of the packages identified in the finding have fixes available through updated versions."}) # fmt: skip |
| 97 | + inspector_score: Optional[float] = field(default=None, metadata={"description": "The Amazon Inspector score given to the finding."}) # fmt: skip |
| 98 | + last_observed_at: Optional[datetime] = field(default=None, metadata={"description": "The date and time the finding was last observed. This timestamp for this field remains unchanged until a finding is updated."}) # fmt: skip |
| 99 | + remediation: Optional[AwsInspectorRemediation] = field(default=None, metadata={"description": "An object that contains the details about how to remediate a finding."}) # fmt: skip |
| 100 | + finding_resources: Optional[List[AwsInspectorResource]] = field(factory=list, metadata={"description": "Contains information on the resources involved in a finding. The resource value determines the valid values for type in your request. For more information, see Finding types in the Amazon Inspector user guide."}) # fmt: skip |
| 101 | + finding_severity: Optional[str] = field(default=None, metadata={"description": "The severity of the finding. UNTRIAGED applies to PACKAGE_VULNERABILITY type findings that the vendor has not assigned a severity yet. For more information, see Severity levels for findings in the Amazon Inspector user guide."}) # fmt: skip |
| 102 | + status: Optional[str] = field(default=None, metadata={"description": "The status of the finding."}) # fmt: skip |
| 103 | + title: Optional[str] = field(default=None, metadata={"description": "The title of the finding."}) # fmt: skip |
| 104 | + type: Optional[str] = field(default=None, metadata={"description": "The type of the finding. The type value determines the valid values for resource in your request. For more information, see Finding types in the Amazon Inspector user guide."}) # fmt: skip |
| 105 | + updated_at: Optional[datetime] = field(default=None, metadata={"description": "The date and time the finding was last updated at."}) # fmt: skip |
| 106 | + |
| 107 | + def parse_finding(self, source: Json) -> Finding: |
| 108 | + severity_mapping = { |
| 109 | + "INFORMATIONAL": Severity.info, |
| 110 | + "LOW": Severity.low, |
| 111 | + "MEDIUM": Severity.medium, |
| 112 | + "HIGH": Severity.high, |
| 113 | + "CRITICAL": Severity.critical, |
| 114 | + } |
| 115 | + finding_title = self.safe_name |
| 116 | + if not self.finding_severity: |
| 117 | + finding_severity = Severity.medium |
| 118 | + else: |
| 119 | + finding_severity = severity_mapping.get(self.finding_severity, Severity.medium) |
| 120 | + description = self.description |
| 121 | + remediation = "" |
| 122 | + if self.remediation and self.remediation.recommendation: |
| 123 | + remediation = self.remediation.recommendation.text or "" |
| 124 | + updated_at = self.updated_at |
| 125 | + details = source.get("packageVulnerabilityDetails", {}) | source.get("codeVulnerabilityDetails", {}) |
| 126 | + return Finding(finding_title, finding_severity, description, remediation, updated_at, details) |
| 127 | + |
| 128 | + @classmethod |
| 129 | + def collect_resources(cls, builder: GraphBuilder) -> None: |
| 130 | + def check_type_and_adjust_id( |
| 131 | + class_type: Optional[str], resource_id: Optional[str] |
| 132 | + ) -> Tuple[Optional[Type[Any]], Optional[Dict[str, Any]]]: |
| 133 | + if not resource_id or not class_type: |
| 134 | + return None, None |
| 135 | + match class_type: |
| 136 | + case "AWS_LAMBDA_FUNCTION": |
| 137 | + # remove lambda's version from arn |
| 138 | + lambda_arn = resource_id.rsplit(":", 1)[0] |
| 139 | + return AwsLambdaFunction, {"arn": lambda_arn} |
| 140 | + case "AWS_EC2_INSTANCE": |
| 141 | + return AwsEc2Instance, {"id": resource_id} |
| 142 | + case "AWS_ECR_REPOSITORY": |
| 143 | + return AwsEcrRepository, {"id": resource_id, "_region": builder.region} |
| 144 | + case _: |
| 145 | + return None, None |
| 146 | + |
| 147 | + def add_finding( |
| 148 | + provider: str, finding: Finding, clazz: Optional[Type[AwsResource]] = None, **node: Any |
| 149 | + ) -> None: |
| 150 | + if resource := builder.node(clazz=clazz, **node): |
| 151 | + resource.add_finding(provider, finding) |
| 152 | + |
| 153 | + # Default behavior: in case the class has an ApiSpec, call the api and call collect. |
| 154 | + log.debug(f"Collecting {cls.__name__} in region {builder.region.name}") |
| 155 | + try: |
| 156 | + for item in builder.client.list( |
| 157 | + aws_service=service_name, |
| 158 | + action="list-findings", |
| 159 | + result_name="findings", |
| 160 | + expected_errors=["AccessDeniedException"], |
| 161 | + filterCriteria={"awsAccountId": [{"comparison": "EQUALS", "value": f"{builder.account.id}"}]}, |
| 162 | + ): |
| 163 | + if finding := AwsInspectorFinding.from_api(item, builder): |
| 164 | + for fr in finding.finding_resources or []: |
| 165 | + clazz, res_filter = check_type_and_adjust_id(fr.type, fr.id) |
| 166 | + if clazz and res_filter: |
| 167 | + # append the finding when all resources have been collected |
| 168 | + builder.after_collect_actions.append( |
| 169 | + partial( |
| 170 | + add_finding, |
| 171 | + amazon_inspector, |
| 172 | + finding.parse_finding(item), |
| 173 | + clazz, |
| 174 | + **res_filter, |
| 175 | + ) |
| 176 | + ) |
| 177 | + except Boto3Error as e: |
| 178 | + msg = f"Error while collecting {cls.__name__} in region {builder.region.name}: {e}" |
| 179 | + builder.core_feedback.error(msg, log) |
| 180 | + raise |
| 181 | + except Exception as e: |
| 182 | + msg = f"Error while collecting {cls.__name__} in region {builder.region.name}: {e}" |
| 183 | + builder.core_feedback.info(msg, log) |
| 184 | + raise |
| 185 | + |
| 186 | + |
| 187 | +resources: List[Type[AwsResource]] = [AwsInspectorFinding] |
0 commit comments