diff --git a/docs/tutorials/compliance.md b/docs/tutorials/compliance.md
index a85037a0dcc..6d39e96e847 100644
--- a/docs/tutorials/compliance.md
+++ b/docs/tutorials/compliance.md
@@ -1,5 +1,18 @@
 # Compliance
-Prowler allows you to execute checks based on requirements defined in compliance frameworks.
+Prowler allows you to execute checks based on requirements defined in compliance frameworks. By default, it executes all checks and gives you an overview of the status of each compliance framework:
+
+
+
+> You can find CSVs containing detailed compliance results inside the compliance folder within Prowler's output folder.
+
+## Execute Prowler based on Compliance Frameworks
+Prowler can analyze your environment against a specific compliance framework to get more detail. To do so, use the `--compliance` option:
+```sh
+prowler --compliance
+```
+Standard results are shown along with the framework information, as in the sample below for CIS AWS 1.5. A CSV file with the detailed results is generated as well.
+
+

 ## List Available Compliance Frameworks
 In order to see which compliance frameworks are cover by Prowler, you can use option `--list-compliance`:
@@ -10,9 +23,12 @@ Currently, the available frameworks are:

 - `cis_1.4_aws`
 - `cis_1.5_aws`
+- `cis_2.0_aws`
+- `cisa_aws`
 - `ens_rd2022_aws`
 - `aws_audit_manager_control_tower_guardrails_aws`
 - `aws_foundational_security_best_practices_aws`
+- `aws_well_architected_framework_reliability_pillar_aws`
 - `aws_well_architected_framework_security_pillar_aws`
 - `cisa_aws`
 - `fedramp_low_revision_4_aws`
@@ -22,6 +38,9 @@ Currently, the available frameworks are:
 - `gxp_eu_annex_11_aws`
 - `gxp_21_cfr_part_11_aws`
 - `hipaa_aws`
+- `iso27001_2013_aws`
+- `iso27001_2013_aws`
+- `mitre_attack_aws`
 - `nist_800_53_revision_4_aws`
 - `nist_800_53_revision_5_aws`
 - `nist_800_171_revision_2_aws`
@@ -38,7 +57,6 @@ prowler --list-compliance-requirements
 ```

 Example for the first requirements of CIS 1.5 for AWS:
-
 ```

 Listing CIS 1.5 AWS Compliance Requirements:
@@ -71,15 +89,6 @@ Requirement Id: 1.5

 ```

-## Execute Prowler based on Compliance Frameworks
-As we mentioned, Prowler can be execute to analyse you environment based on a specific compliance framework, to do it, you can use option `--compliance`:
-```sh
-prowler --compliance
-```
-Standard results will be shown and additionally the framework information as the sample below for CIS AWS 1.5. For details a CSV file has been generated as well.
-
-
-

 ## Create and contribute adding other Security Frameworks
 This information is part of the Developer Guide and can be found here: https://docs.prowler.cloud/en/latest/tutorials/developer-guide/.
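The documentation change above is backed by `get_available_compliance_frameworks()` in `prowler/config/config.py` together with the `__main__.py` changes further down in this diff: when no `--compliance` framework is passed, every framework bundled for the audited provider is appended to the output modes and an overview table is printed per framework. As a rough illustration of that helper, here is a minimal sketch, assuming a Prowler build that includes this change is installed; the printed framework names are examples only:

```python
# Minimal sketch: list the compliance frameworks bundled for one provider,
# mirroring the provider filter added to get_available_compliance_frameworks().
from prowler.config.config import get_available_compliance_frameworks

# With a provider, only that provider's frameworks are returned;
# with no argument, the aws, gcp and azure frameworks are aggregated.
aws_frameworks = get_available_compliance_frameworks("aws")
all_frameworks = get_available_compliance_frameworks()

print(sorted(aws_frameworks))  # e.g. ['cis_1.4_aws', 'cis_1.5_aws', 'ens_rd2022_aws', ...]
print(len(all_frameworks) >= len(aws_frameworks))  # True: the default covers every provider
```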
diff --git a/docs/tutorials/img/compliance.png b/docs/tutorials/img/compliance.png new file mode 100644 index 00000000000..1492089c86d Binary files /dev/null and b/docs/tutorials/img/compliance.png differ diff --git a/prowler/__main__.py b/prowler/__main__.py index 88288d9c433..7e03541ec22 100644 --- a/prowler/__main__.py +++ b/prowler/__main__.py @@ -6,6 +6,7 @@ from colorama import Fore, Style +from prowler.config.config import get_available_compliance_frameworks from prowler.lib.banner import print_banner from prowler.lib.check.check import ( bulk_load_checks_metadata, @@ -32,7 +33,7 @@ ) from prowler.lib.cli.parser import ProwlerArgumentParser from prowler.lib.logger import logger, set_logging_config -from prowler.lib.outputs.compliance import display_compliance_table +from prowler.lib.outputs.compliance.compliance import display_compliance_table from prowler.lib.outputs.html import add_html_footer, fill_html_overview_statistics from prowler.lib.outputs.json import close_json from prowler.lib.outputs.outputs import extract_findings_statistics @@ -81,6 +82,9 @@ def prowler(): # We treat the compliance framework as another output format if compliance_framework: args.output_modes.extend(compliance_framework) + # If no input compliance framework, set all + else: + args.output_modes.extend(get_available_compliance_frameworks(provider)) # Set Logger configuration set_logging_config(args.log_level, args.log_file, args.only_logs) @@ -311,8 +315,12 @@ def prowler(): provider, ) - if compliance_framework and findings: - for compliance in compliance_framework: + if findings: + compliance_overview = False + if not compliance_framework: + compliance_overview = True + compliance_framework = get_available_compliance_frameworks(provider) + for compliance in sorted(compliance_framework): # Display compliance table display_compliance_table( findings, @@ -320,6 +328,11 @@ def prowler(): compliance, audit_output_options.output_filename, audit_output_options.output_directory, + compliance_overview, + ) + if compliance_overview: + print( + f"\nDetailed compliance results are in {Fore.YELLOW}{audit_output_options.output_directory}/compliance/{Style.RESET_ALL}\n" ) # If custom checks were passed, remove the modules diff --git a/prowler/config/config.py b/prowler/config/config.py index 859907b1861..62d41b6758f 100644 --- a/prowler/config/config.py +++ b/prowler/config/config.py @@ -26,9 +26,12 @@ actual_directory = pathlib.Path(os.path.dirname(os.path.realpath(__file__))) -def get_available_compliance_frameworks(): +def get_available_compliance_frameworks(provider=None): available_compliance_frameworks = [] - for provider in ["aws", "gcp", "azure"]: + providers = ["aws", "gcp", "azure"] + if provider: + providers = [provider] + for provider in providers: with os.scandir(f"{actual_directory}/../compliance/{provider}") as files: for file in files: if file.is_file() and file.name.endswith(".json"): diff --git a/prowler/lib/outputs/compliance.py b/prowler/lib/outputs/compliance.py deleted file mode 100644 index 4395af2d3cc..00000000000 --- a/prowler/lib/outputs/compliance.py +++ /dev/null @@ -1,641 +0,0 @@ -import sys -from csv import DictWriter - -from colorama import Fore, Style -from tabulate import tabulate - -from prowler.config.config import orange_color, timestamp -from prowler.lib.check.models import Check_Report -from prowler.lib.logger import logger -from prowler.lib.outputs.models import ( - Check_Output_CSV_AWS_CIS, - Check_Output_CSV_AWS_ISO27001_2013, - Check_Output_CSV_AWS_Well_Architected, - 
Check_Output_CSV_ENS_RD2022, - Check_Output_CSV_GCP_CIS, - Check_Output_CSV_Generic_Compliance, - Check_Output_MITRE_ATTACK, - generate_csv_fields, - unroll_list, -) -from prowler.lib.utils.utils import outputs_unix_timestamp - - -def add_manual_controls(output_options, audit_info, file_descriptors): - try: - # Check if MANUAL control was already added to output - if "manual_check" in output_options.bulk_checks_metadata: - manual_finding = Check_Report( - output_options.bulk_checks_metadata["manual_check"].json() - ) - manual_finding.status = "INFO" - manual_finding.status_extended = "Manual check" - manual_finding.resource_id = "manual_check" - manual_finding.resource_name = "Manual check" - manual_finding.region = "" - manual_finding.location = "" - manual_finding.project_id = "" - fill_compliance( - output_options, manual_finding, audit_info, file_descriptors - ) - del output_options.bulk_checks_metadata["manual_check"] - except Exception as error: - logger.error( - f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" - ) - - -def fill_compliance(output_options, finding, audit_info, file_descriptors): - try: - # We have to retrieve all the check's compliance requirements - check_compliance = output_options.bulk_checks_metadata[ - finding.check_metadata.CheckID - ].Compliance - for compliance in check_compliance: - csv_header = compliance_row = compliance_output = None - if ( - compliance.Framework == "ENS" - and compliance.Version == "RD2022" - and "ens_rd2022_aws" in output_options.output_modes - ): - compliance_output = "ens_rd2022_aws" - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - for attribute in requirement.Attributes: - compliance_row = Check_Output_CSV_ENS_RD2022( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), - Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl, - Requirements_Attributes_Marco=attribute.Marco, - Requirements_Attributes_Categoria=attribute.Categoria, - Requirements_Attributes_DescripcionControl=attribute.DescripcionControl, - Requirements_Attributes_Nivel=attribute.Nivel, - Requirements_Attributes_Tipo=attribute.Tipo, - Requirements_Attributes_Dimensiones=",".join( - attribute.Dimensiones - ), - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, - ) - - csv_header = generate_csv_fields(Check_Output_CSV_ENS_RD2022) - - elif compliance.Framework == "CIS" and "cis_" in str( - output_options.output_modes - ): - compliance_output = ( - "cis_" + compliance.Version + "_" + compliance.Provider.lower() - ) - # Only with the version of CIS that was selected - if compliance_output in str(output_options.output_modes): - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - for attribute in requirement.Attributes: - if compliance.Provider == "AWS": - compliance_row = Check_Output_CSV_AWS_CIS( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), 
- Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Attributes_Section=attribute.Section, - Requirements_Attributes_Profile=attribute.Profile, - Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, - Requirements_Attributes_Description=attribute.Description, - Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, - Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, - Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, - Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, - Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, - Requirements_Attributes_References=attribute.References, - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, - ) - csv_header = generate_csv_fields( - Check_Output_CSV_AWS_CIS - ) - elif compliance.Provider == "GCP": - compliance_row = Check_Output_CSV_GCP_CIS( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - ProjectId=finding.project_id, - Location=finding.location.lower(), - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), - Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Attributes_Section=attribute.Section, - Requirements_Attributes_Profile=attribute.Profile, - Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, - Requirements_Attributes_Description=attribute.Description, - Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, - Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, - Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, - Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, - Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, - Requirements_Attributes_References=attribute.References, - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - ResourceName=finding.resource_name, - CheckId=finding.check_metadata.CheckID, - ) - csv_header = generate_csv_fields( - Check_Output_CSV_GCP_CIS - ) - - elif ( - "AWS-Well-Architected-Framework" in compliance.Framework - and compliance.Provider == "AWS" - ): - compliance_output = compliance.Framework - if compliance.Version != "": - compliance_output += "_" + compliance.Version - if compliance.Provider != "": - compliance_output += "_" + compliance.Provider - - compliance_output = compliance_output.lower().replace("-", "_") - if compliance_output in output_options.output_modes: - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - for attribute in requirement.Attributes: - compliance_row = Check_Output_CSV_AWS_Well_Architected( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), - Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Attributes_Name=attribute.Name, - Requirements_Attributes_WellArchitectedQuestionId=attribute.WellArchitectedQuestionId, - Requirements_Attributes_WellArchitectedPracticeId=attribute.WellArchitectedPracticeId, - 
Requirements_Attributes_Section=attribute.Section, - Requirements_Attributes_SubSection=attribute.SubSection, - Requirements_Attributes_LevelOfRisk=attribute.LevelOfRisk, - Requirements_Attributes_AssessmentMethod=attribute.AssessmentMethod, - Requirements_Attributes_Description=attribute.Description, - Requirements_Attributes_ImplementationGuidanceUrl=attribute.ImplementationGuidanceUrl, - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, - ) - - csv_header = generate_csv_fields( - Check_Output_CSV_AWS_Well_Architected - ) - - elif ( - compliance.Framework == "ISO27001" - and compliance.Version == "2013" - and compliance.Provider == "AWS" - ): - compliance_output = compliance.Framework - if compliance.Version != "": - compliance_output += "_" + compliance.Version - if compliance.Provider != "": - compliance_output += "_" + compliance.Provider - - compliance_output = compliance_output.lower().replace("-", "_") - if compliance_output in output_options.output_modes: - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - requirement_name = requirement.Name - for attribute in requirement.Attributes: - compliance_row = Check_Output_CSV_AWS_ISO27001_2013( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), - Requirements_Id=requirement_id, - Requirements_Name=requirement_name, - Requirements_Description=requirement_description, - Requirements_Attributes_Category=attribute.Category, - Requirements_Attributes_Objetive_ID=attribute.Objetive_ID, - Requirements_Attributes_Objetive_Name=attribute.Objetive_Name, - Requirements_Attributes_Check_Summary=attribute.Check_Summary, - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, - ) - - csv_header = generate_csv_fields(Check_Output_CSV_AWS_ISO27001_2013) - - elif ( - compliance.Framework == "MITRE-ATTACK" - and compliance.Version == "" - and compliance.Provider == "AWS" - ): - compliance_output = compliance.Framework - if compliance.Version != "": - compliance_output += "_" + compliance.Version - if compliance.Provider != "": - compliance_output += "_" + compliance.Provider - - compliance_output = compliance_output.lower().replace("-", "_") - if compliance_output in output_options.output_modes: - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - requirement_name = requirement.Name - attributes_aws_services = "" - attributes_categories = "" - attributes_values = "" - attributes_comments = "" - for attribute in requirement.Attributes: - attributes_aws_services += attribute.AWSService + "\n" - attributes_categories += attribute.Category + "\n" - attributes_values += attribute.Value + "\n" - attributes_comments += attribute.Comment + "\n" - compliance_row = Check_Output_MITRE_ATTACK( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), - Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Name=requirement_name, - 
Requirements_Tactics=unroll_list(requirement.Tactics), - Requirements_SubTechniques=unroll_list( - requirement.SubTechniques - ), - Requirements_Platforms=unroll_list(requirement.Platforms), - Requirements_TechniqueURL=requirement.TechniqueURL, - Requirements_Attributes_AWSServices=attributes_aws_services, - Requirements_Attributes_Categories=attributes_categories, - Requirements_Attributes_Values=attributes_values, - Requirements_Attributes_Comments=attributes_comments, - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, - ) - - csv_header = generate_csv_fields(Check_Output_MITRE_ATTACK) - - else: - compliance_output = compliance.Framework - if compliance.Version != "": - compliance_output += "_" + compliance.Version - if compliance.Provider != "": - compliance_output += "_" + compliance.Provider - - compliance_output = compliance_output.lower().replace("-", "_") - if compliance_output in output_options.output_modes: - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - for attribute in requirement.Attributes: - compliance_row = Check_Output_CSV_Generic_Compliance( - Provider=finding.check_metadata.Provider, - Description=compliance.Description, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=outputs_unix_timestamp( - output_options.unix_timestamp, timestamp - ), - Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Attributes_Section=attribute.Section, - Requirements_Attributes_SubSection=attribute.SubSection, - Requirements_Attributes_SubGroup=attribute.SubGroup, - Requirements_Attributes_Service=attribute.Service, - Requirements_Attributes_Soc_Type=attribute.Soc_Type, - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, - ) - - csv_header = generate_csv_fields( - Check_Output_CSV_Generic_Compliance - ) - - if compliance_row: - csv_writer = DictWriter( - file_descriptors[compliance_output], - fieldnames=csv_header, - delimiter=";", - ) - csv_writer.writerow(compliance_row.__dict__) - except Exception as error: - logger.error( - f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" - ) - - -def display_compliance_table( - findings: list, - bulk_checks_metadata: dict, - compliance_framework: str, - output_filename: str, - output_directory: str, -): - try: - if "ens_rd2022_aws" == compliance_framework: - marcos = {} - ens_compliance_table = { - "Proveedor": [], - "Marco/Categoria": [], - "Estado": [], - "Alto": [], - "Medio": [], - "Bajo": [], - "Opcional": [], - } - pass_count = fail_count = 0 - for finding in findings: - check = bulk_checks_metadata[finding.check_metadata.CheckID] - check_compliances = check.Compliance - for compliance in check_compliances: - if ( - compliance.Framework == "ENS" - and compliance.Provider == "AWS" - and compliance.Version == "RD2022" - ): - compliance_version = compliance.Version - compliance_fm = compliance.Framework - compliance_provider = compliance.Provider - for requirement in compliance.Requirements: - for attribute in requirement.Attributes: - marco_categoria = ( - f"{attribute.Marco}/{attribute.Categoria}" - ) - # Check if Marco/Categoria exists - if marco_categoria not in marcos: - marcos[marco_categoria] = { - "Estado": f"{Fore.GREEN}CUMPLE{Style.RESET_ALL}", - "Opcional": 0, - "Alto": 0, - 
"Medio": 0, - "Bajo": 0, - } - if finding.status == "FAIL": - fail_count += 1 - marcos[marco_categoria][ - "Estado" - ] = f"{Fore.RED}NO CUMPLE{Style.RESET_ALL}" - elif finding.status == "PASS": - pass_count += 1 - if attribute.Nivel == "opcional": - marcos[marco_categoria]["Opcional"] += 1 - elif attribute.Nivel == "alto": - marcos[marco_categoria]["Alto"] += 1 - elif attribute.Nivel == "medio": - marcos[marco_categoria]["Medio"] += 1 - elif attribute.Nivel == "bajo": - marcos[marco_categoria]["Bajo"] += 1 - - # Add results to table - for marco in sorted(marcos): - ens_compliance_table["Proveedor"].append(compliance.Provider) - ens_compliance_table["Marco/Categoria"].append(marco) - ens_compliance_table["Estado"].append(marcos[marco]["Estado"]) - ens_compliance_table["Opcional"].append( - f"{Fore.BLUE}{marcos[marco]['Opcional']}{Style.RESET_ALL}" - ) - ens_compliance_table["Alto"].append( - f"{Fore.LIGHTRED_EX}{marcos[marco]['Alto']}{Style.RESET_ALL}" - ) - ens_compliance_table["Medio"].append( - f"{orange_color}{marcos[marco]['Medio']}{Style.RESET_ALL}" - ) - ens_compliance_table["Bajo"].append( - f"{Fore.YELLOW}{marcos[marco]['Bajo']}{Style.RESET_ALL}" - ) - if fail_count + pass_count < 0: - print( - f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance_fm} {compliance_version} - {compliance_provider}{Style.RESET_ALL}.\n" - ) - else: - print( - f"\nEstado de Cumplimiento de {Fore.YELLOW}{compliance_fm} {compliance_version} - {compliance_provider}{Style.RESET_ALL}:" - ) - overview_table = [ - [ - f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) NO CUMPLE{Style.RESET_ALL}", - f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) CUMPLE{Style.RESET_ALL}", - ] - ] - print(tabulate(overview_table, tablefmt="rounded_grid")) - print( - f"\nResultados de {Fore.YELLOW}{compliance_fm} {compliance_version} - {compliance_provider}{Style.RESET_ALL}:" - ) - print( - tabulate( - ens_compliance_table, headers="keys", tablefmt="rounded_grid" - ) - ) - print( - f"{Style.BRIGHT}* Solo aparece el Marco/Categoria que contiene resultados.{Style.RESET_ALL}" - ) - print(f"\nResultados detallados de {compliance_fm} en:") - print( - f" - CSV: {output_directory}/{output_filename}_{compliance_framework}.csv\n" - ) - elif "cis_" in compliance_framework: - sections = {} - cis_compliance_table = { - "Provider": [], - "Section": [], - "Level 1": [], - "Level 2": [], - } - pass_count = fail_count = 0 - for finding in findings: - check = bulk_checks_metadata[finding.check_metadata.CheckID] - check_compliances = check.Compliance - for compliance in check_compliances: - if ( - compliance.Framework == "CIS" - and compliance.Version in compliance_framework - ): - compliance_version = compliance.Version - compliance_fm = compliance.Framework - for requirement in compliance.Requirements: - for attribute in requirement.Attributes: - section = attribute.Section - # Check if Section exists - if section not in sections: - sections[section] = { - "Status": f"{Fore.GREEN}PASS{Style.RESET_ALL}", - "Level 1": {"FAIL": 0, "PASS": 0}, - "Level 2": {"FAIL": 0, "PASS": 0}, - } - if finding.status == "FAIL": - fail_count += 1 - elif finding.status == "PASS": - pass_count += 1 - if attribute.Profile == "Level 1": - if finding.status == "FAIL": - sections[section]["Level 1"]["FAIL"] += 1 - else: - sections[section]["Level 1"]["PASS"] += 1 - elif attribute.Profile == "Level 2": - if finding.status == "FAIL": - sections[section]["Level 2"]["FAIL"] += 1 - else: - 
sections[section]["Level 2"]["PASS"] += 1 - - # Add results to table - sections = dict(sorted(sections.items())) - for section in sections: - cis_compliance_table["Provider"].append(compliance.Provider) - cis_compliance_table["Section"].append(section) - if sections[section]["Level 1"]["FAIL"] > 0: - cis_compliance_table["Level 1"].append( - f"{Fore.RED}FAIL({sections[section]['Level 1']['FAIL']}){Style.RESET_ALL}" - ) - else: - cis_compliance_table["Level 1"].append( - f"{Fore.GREEN}PASS({sections[section]['Level 1']['PASS']}){Style.RESET_ALL}" - ) - if sections[section]["Level 2"]["FAIL"] > 0: - cis_compliance_table["Level 2"].append( - f"{Fore.RED}FAIL({sections[section]['Level 2']['FAIL']}){Style.RESET_ALL}" - ) - else: - cis_compliance_table["Level 2"].append( - f"{Fore.GREEN}PASS({sections[section]['Level 2']['PASS']}){Style.RESET_ALL}" - ) - if fail_count + pass_count < 1: - print( - f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance_fm}-{compliance_version}{Style.RESET_ALL}.\n" - ) - else: - print( - f"\nCompliance Status of {Fore.YELLOW}{compliance_fm}-{compliance_version}{Style.RESET_ALL} Framework:" - ) - overview_table = [ - [ - f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) FAIL{Style.RESET_ALL}", - f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) PASS{Style.RESET_ALL}", - ] - ] - print(tabulate(overview_table, tablefmt="rounded_grid")) - print( - f"\nFramework {Fore.YELLOW}{compliance_fm}-{compliance_version}{Style.RESET_ALL} Results:" - ) - print( - tabulate( - cis_compliance_table, headers="keys", tablefmt="rounded_grid" - ) - ) - print( - f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}" - ) - print(f"\nDetailed results of {compliance_fm} are in:") - print( - f" - CSV: {output_directory}/{output_filename}_{compliance_framework}.csv\n" - ) - elif "mitre_attack" in compliance_framework: - tactics = {} - mitre_compliance_table = { - "Provider": [], - "Tactic": [], - "Status": [], - } - pass_count = fail_count = 0 - for finding in findings: - check = bulk_checks_metadata[finding.check_metadata.CheckID] - check_compliances = check.Compliance - for compliance in check_compliances: - if ( - "MITRE-ATTACK" in compliance.Framework - and compliance.Version in compliance_framework - ): - compliance_fm = compliance.Framework - for requirement in compliance.Requirements: - for tactic in requirement.Tactics: - if tactic not in tactics: - tactics[tactic] = {"FAIL": 0, "PASS": 0} - if finding.status == "FAIL": - fail_count += 1 - tactics[tactic]["FAIL"] += 1 - elif finding.status == "PASS": - pass_count += 1 - tactics[tactic]["PASS"] += 1 - - # Add results to table - tactics = dict(sorted(tactics.items())) - for tactic in tactics: - mitre_compliance_table["Provider"].append(compliance.Provider) - mitre_compliance_table["Tactic"].append(tactic) - if tactics[tactic]["FAIL"] > 0: - mitre_compliance_table["Status"].append( - f"{Fore.RED}FAIL({tactics[tactic]['FAIL']}){Style.RESET_ALL}" - ) - else: - mitre_compliance_table["Status"].append( - f"{Fore.GREEN}PASS({tactics[tactic]['PASS']}){Style.RESET_ALL}" - ) - if fail_count + pass_count < 1: - print( - f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance_fm}{Style.RESET_ALL}.\n" - ) - else: - print( - f"\nCompliance Status of {Fore.YELLOW}{compliance_fm}{Style.RESET_ALL} Framework:" - ) - overview_table = [ - [ - f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) FAIL{Style.RESET_ALL}", - 
f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) PASS{Style.RESET_ALL}", - ] - ] - print(tabulate(overview_table, tablefmt="rounded_grid")) - print( - f"\nFramework {Fore.YELLOW}{compliance_fm}{Style.RESET_ALL} Results:" - ) - print( - tabulate( - mitre_compliance_table, headers="keys", tablefmt="rounded_grid" - ) - ) - print( - f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}" - ) - print(f"\nDetailed results of {compliance_fm} are in:") - print( - f" - CSV: {output_directory}/{output_filename}_{compliance_framework}.csv\n" - ) - else: - print(f"\nDetailed results of {compliance_framework.upper()} are in:") - print( - f" - CSV: {output_directory}/{output_filename}_{compliance_framework}.csv\n" - ) - except Exception as error: - logger.critical( - f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" - ) - sys.exit(1) diff --git a/prowler/lib/outputs/compliance/__init__.py b/prowler/lib/outputs/compliance/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/prowler/lib/outputs/compliance/aws_well_architected_framework.py b/prowler/lib/outputs/compliance/aws_well_architected_framework.py new file mode 100644 index 00000000000..f13d2f935ec --- /dev/null +++ b/prowler/lib/outputs/compliance/aws_well_architected_framework.py @@ -0,0 +1,55 @@ +from csv import DictWriter + +from prowler.config.config import timestamp +from prowler.lib.outputs.models import ( + Check_Output_CSV_AWS_Well_Architected, + generate_csv_fields, +) +from prowler.lib.utils.utils import outputs_unix_timestamp + + +def write_compliance_row_aws_well_architected_framework( + file_descriptors, finding, compliance, output_options, audit_info +): + compliance_output = compliance.Framework + if compliance.Version != "": + compliance_output += "_" + compliance.Version + if compliance.Provider != "": + compliance_output += "_" + compliance.Provider + compliance_output = compliance_output.lower().replace("-", "_") + csv_header = generate_csv_fields(Check_Output_CSV_AWS_Well_Architected) + csv_writer = DictWriter( + file_descriptors[compliance_output], + fieldnames=csv_header, + delimiter=";", + ) + for requirement in compliance.Requirements: + requirement_description = requirement.Description + requirement_id = requirement.Id + for attribute in requirement.Attributes: + compliance_row = Check_Output_CSV_AWS_Well_Architected( + Provider=finding.check_metadata.Provider, + Description=compliance.Description, + AccountId=audit_info.audited_account, + Region=finding.region, + AssessmentDate=outputs_unix_timestamp( + output_options.unix_timestamp, timestamp + ), + Requirements_Id=requirement_id, + Requirements_Description=requirement_description, + Requirements_Attributes_Name=attribute.Name, + Requirements_Attributes_WellArchitectedQuestionId=attribute.WellArchitectedQuestionId, + Requirements_Attributes_WellArchitectedPracticeId=attribute.WellArchitectedPracticeId, + Requirements_Attributes_Section=attribute.Section, + Requirements_Attributes_SubSection=attribute.SubSection, + Requirements_Attributes_LevelOfRisk=attribute.LevelOfRisk, + Requirements_Attributes_AssessmentMethod=attribute.AssessmentMethod, + Requirements_Attributes_Description=attribute.Description, + Requirements_Attributes_ImplementationGuidanceUrl=attribute.ImplementationGuidanceUrl, + Status=finding.status, + StatusExtended=finding.status_extended, + ResourceId=finding.resource_id, + CheckId=finding.check_metadata.CheckID, + ) + + 
csv_writer.writerow(compliance_row.__dict__) diff --git a/prowler/lib/outputs/compliance/cis.py b/prowler/lib/outputs/compliance/cis.py new file mode 100644 index 00000000000..5cdfa250f96 --- /dev/null +++ b/prowler/lib/outputs/compliance/cis.py @@ -0,0 +1,36 @@ +from prowler.lib.outputs.compliance.cis_aws import generate_compliance_row_cis_aws +from prowler.lib.outputs.compliance.cis_gcp import generate_compliance_row_cis_gcp +from prowler.lib.outputs.csv import write_csv + + +def write_compliance_row_cis( + file_descriptors, + finding, + compliance, + output_options, + audit_info, + input_compliance_frameworks, +): + compliance_output = "cis_" + compliance.Version + "_" + compliance.Provider.lower() + + # Only with the version of CIS that was selected + if compliance_output in str(input_compliance_frameworks): + for requirement in compliance.Requirements: + for attribute in requirement.Attributes: + if compliance.Provider == "AWS": + (compliance_row, csv_header) = generate_compliance_row_cis_aws( + finding, + compliance, + requirement, + attribute, + output_options, + audit_info, + ) + elif compliance.Provider == "GCP": + (compliance_row, csv_header) = generate_compliance_row_cis_gcp( + finding, compliance, output_options + ) + + write_csv( + file_descriptors[compliance_output], csv_header, compliance_row + ) diff --git a/prowler/lib/outputs/compliance/cis_aws.py b/prowler/lib/outputs/compliance/cis_aws.py new file mode 100644 index 00000000000..31604da5eed --- /dev/null +++ b/prowler/lib/outputs/compliance/cis_aws.py @@ -0,0 +1,34 @@ +from prowler.config.config import timestamp +from prowler.lib.outputs.models import Check_Output_CSV_AWS_CIS, generate_csv_fields +from prowler.lib.utils.utils import outputs_unix_timestamp + + +def generate_compliance_row_cis_aws( + finding, compliance, requirement, attribute, output_options, audit_info +): + compliance_row = Check_Output_CSV_AWS_CIS( + Provider=finding.check_metadata.Provider, + Description=compliance.Description, + AccountId=audit_info.audited_account, + Region=finding.region, + AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp), + Requirements_Id=requirement.Id, + Requirements_Description=requirement.Description, + Requirements_Attributes_Section=attribute.Section, + Requirements_Attributes_Profile=attribute.Profile, + Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, + Requirements_Attributes_Description=attribute.Description, + Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, + Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, + Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, + Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, + Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, + Requirements_Attributes_References=attribute.References, + Status=finding.status, + StatusExtended=finding.status_extended, + ResourceId=finding.resource_id, + CheckId=finding.check_metadata.CheckID, + ) + csv_header = generate_csv_fields(Check_Output_CSV_AWS_CIS) + + return compliance_row, csv_header diff --git a/prowler/lib/outputs/compliance/cis_gcp.py b/prowler/lib/outputs/compliance/cis_gcp.py new file mode 100644 index 00000000000..bbcbb2ff33e --- /dev/null +++ b/prowler/lib/outputs/compliance/cis_gcp.py @@ -0,0 +1,35 @@ +from prowler.config.config import timestamp +from prowler.lib.outputs.models import Check_Output_CSV_GCP_CIS, generate_csv_fields +from prowler.lib.utils.utils import 
outputs_unix_timestamp + + +def generate_compliance_row_cis_gcp( + finding, compliance, requirement, attribute, output_options +): + compliance_row = Check_Output_CSV_GCP_CIS( + Provider=finding.check_metadata.Provider, + Description=compliance.Description, + ProjectId=finding.project_id, + Location=finding.location.lower(), + AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp), + Requirements_Id=requirement.Id, + Requirements_Description=requirement.Description, + Requirements_Attributes_Section=attribute.Section, + Requirements_Attributes_Profile=attribute.Profile, + Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, + Requirements_Attributes_Description=attribute.Description, + Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, + Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, + Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, + Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, + Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, + Requirements_Attributes_References=attribute.References, + Status=finding.status, + StatusExtended=finding.status_extended, + ResourceId=finding.resource_id, + ResourceName=finding.resource_name, + CheckId=finding.check_metadata.CheckID, + ) + csv_header = generate_csv_fields(Check_Output_CSV_GCP_CIS) + + return compliance_row, csv_header diff --git a/prowler/lib/outputs/compliance/compliance.py b/prowler/lib/outputs/compliance/compliance.py new file mode 100644 index 00000000000..b5675465903 --- /dev/null +++ b/prowler/lib/outputs/compliance/compliance.py @@ -0,0 +1,472 @@ +import sys + +from colorama import Fore, Style +from tabulate import tabulate + +from prowler.config.config import orange_color +from prowler.lib.check.models import Check_Report +from prowler.lib.logger import logger +from prowler.lib.outputs.compliance.aws_well_architected_framework import ( + write_compliance_row_aws_well_architected_framework, +) +from prowler.lib.outputs.compliance.cis import write_compliance_row_cis +from prowler.lib.outputs.compliance.ens_rd2022_aws import ( + write_compliance_row_ens_rd2022_aws, +) +from prowler.lib.outputs.compliance.generic import write_compliance_row_generic +from prowler.lib.outputs.compliance.iso27001_2013_aws import ( + write_compliance_row_iso27001_2013_aws, +) +from prowler.lib.outputs.compliance.mitre_attack_aws import ( + write_compliance_row_mitre_attack_aws, +) + + +def add_manual_controls( + output_options, audit_info, file_descriptors, input_compliance_frameworks +): + try: + # Check if MANUAL control was already added to output + if "manual_check" in output_options.bulk_checks_metadata: + manual_finding = Check_Report( + output_options.bulk_checks_metadata["manual_check"].json() + ) + manual_finding.status = "INFO" + manual_finding.status_extended = "Manual check" + manual_finding.resource_id = "manual_check" + manual_finding.resource_name = "Manual check" + manual_finding.region = "" + manual_finding.location = "" + manual_finding.project_id = "" + fill_compliance( + output_options, + manual_finding, + audit_info, + file_descriptors, + input_compliance_frameworks, + ) + del output_options.bulk_checks_metadata["manual_check"] + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +def get_check_compliance_frameworks_in_input( + check_id, bulk_checks_metadata, input_compliance_frameworks +): + 
"""get_check_compliance_frameworks_in_input returns a list of Compliance for the given check if the compliance framework is present in the input compliance to execute""" + check_compliances = [] + if bulk_checks_metadata and bulk_checks_metadata[check_id]: + for compliance in bulk_checks_metadata[check_id].Compliance: + compliance_name = "" + if compliance.Version: + compliance_name = ( + compliance.Framework.lower() + + "_" + + compliance.Version.lower() + + "_" + + compliance.Provider.lower() + ) + else: + compliance_name = ( + compliance.Framework.lower() + "_" + compliance.Provider.lower() + ) + if compliance_name.replace("-", "_") in input_compliance_frameworks: + check_compliances.append(compliance) + + return check_compliances + + +def fill_compliance( + output_options, finding, audit_info, file_descriptors, input_compliance_frameworks +): + try: + # We have to retrieve all the check's compliance requirements and get the ones matching with the input ones + check_compliances = get_check_compliance_frameworks_in_input( + finding.check_metadata.CheckID, + output_options.bulk_checks_metadata, + input_compliance_frameworks, + ) + + for compliance in check_compliances: + if compliance.Framework == "ENS" and compliance.Version == "RD2022": + write_compliance_row_ens_rd2022_aws( + file_descriptors, finding, compliance, output_options, audit_info + ) + + elif compliance.Framework == "CIS": + write_compliance_row_cis( + file_descriptors, + finding, + compliance, + output_options, + audit_info, + input_compliance_frameworks, + ) + + elif ( + "AWS-Well-Architected-Framework" in compliance.Framework + and compliance.Provider == "AWS" + ): + write_compliance_row_aws_well_architected_framework( + file_descriptors, finding, compliance, output_options, audit_info + ) + + elif ( + compliance.Framework == "ISO27001" + and compliance.Version == "2013" + and compliance.Provider == "AWS" + ): + write_compliance_row_iso27001_2013_aws( + file_descriptors, finding, compliance, output_options, audit_info + ) + + elif ( + compliance.Framework == "MITRE-ATTACK" + and compliance.Version == "" + and compliance.Provider == "AWS" + ): + write_compliance_row_mitre_attack_aws( + file_descriptors, finding, compliance, output_options, audit_info + ) + + else: + write_compliance_row_generic( + file_descriptors, finding, compliance, output_options, audit_info + ) + + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +def display_compliance_table( + findings: list, + bulk_checks_metadata: dict, + compliance_framework: str, + output_filename: str, + output_directory: str, + compliance_overview: bool, +): + try: + if "ens_rd2022_aws" == compliance_framework: + marcos = {} + ens_compliance_table = { + "Proveedor": [], + "Marco/Categoria": [], + "Estado": [], + "Alto": [], + "Medio": [], + "Bajo": [], + "Opcional": [], + } + pass_count = fail_count = 0 + for finding in findings: + check = bulk_checks_metadata[finding.check_metadata.CheckID] + check_compliances = check.Compliance + for compliance in check_compliances: + if ( + compliance.Framework == "ENS" + and compliance.Provider == "AWS" + and compliance.Version == "RD2022" + ): + compliance_version = compliance.Version + compliance_fm = compliance.Framework + compliance_provider = compliance.Provider + for requirement in compliance.Requirements: + for attribute in requirement.Attributes: + marco_categoria = ( + f"{attribute.Marco}/{attribute.Categoria}" + ) + # Check if Marco/Categoria exists + if 
marco_categoria not in marcos: + marcos[marco_categoria] = { + "Estado": f"{Fore.GREEN}CUMPLE{Style.RESET_ALL}", + "Opcional": 0, + "Alto": 0, + "Medio": 0, + "Bajo": 0, + } + if finding.status == "FAIL": + fail_count += 1 + marcos[marco_categoria][ + "Estado" + ] = f"{Fore.RED}NO CUMPLE{Style.RESET_ALL}" + elif finding.status == "PASS": + pass_count += 1 + if attribute.Nivel == "opcional": + marcos[marco_categoria]["Opcional"] += 1 + elif attribute.Nivel == "alto": + marcos[marco_categoria]["Alto"] += 1 + elif attribute.Nivel == "medio": + marcos[marco_categoria]["Medio"] += 1 + elif attribute.Nivel == "bajo": + marcos[marco_categoria]["Bajo"] += 1 + + # Add results to table + for marco in sorted(marcos): + ens_compliance_table["Proveedor"].append(compliance.Provider) + ens_compliance_table["Marco/Categoria"].append(marco) + ens_compliance_table["Estado"].append(marcos[marco]["Estado"]) + ens_compliance_table["Opcional"].append( + f"{Fore.BLUE}{marcos[marco]['Opcional']}{Style.RESET_ALL}" + ) + ens_compliance_table["Alto"].append( + f"{Fore.LIGHTRED_EX}{marcos[marco]['Alto']}{Style.RESET_ALL}" + ) + ens_compliance_table["Medio"].append( + f"{orange_color}{marcos[marco]['Medio']}{Style.RESET_ALL}" + ) + ens_compliance_table["Bajo"].append( + f"{Fore.YELLOW}{marcos[marco]['Bajo']}{Style.RESET_ALL}" + ) + if fail_count + pass_count < 0: + print( + f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance_fm}_{compliance_version}_{compliance_provider}{Style.RESET_ALL}.\n" + ) + else: + print( + f"\nEstado de Cumplimiento de {Fore.YELLOW}{compliance_fm}_{compliance_version}_{compliance_provider}{Style.RESET_ALL}:" + ) + overview_table = [ + [ + f"{Fore.RED}{round(fail_count / (fail_count + pass_count) * 100, 2)}% ({fail_count}) NO CUMPLE{Style.RESET_ALL}", + f"{Fore.GREEN}{round(pass_count / (fail_count + pass_count) * 100, 2)}% ({pass_count}) CUMPLE{Style.RESET_ALL}", + ] + ] + print(tabulate(overview_table, tablefmt="rounded_grid")) + if not compliance_overview: + print( + f"\nResultados de {Fore.YELLOW}{compliance_fm}_{compliance_version}_{compliance_provider}{Style.RESET_ALL}:" + ) + print( + tabulate( + ens_compliance_table, + headers="keys", + tablefmt="rounded_grid", + ) + ) + print( + f"{Style.BRIGHT}* Solo aparece el Marco/Categoria que contiene resultados.{Style.RESET_ALL}" + ) + print(f"\nResultados detallados de {compliance_fm} en:") + print( + f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n" + ) + elif "cis_" in compliance_framework: + sections = {} + cis_compliance_table = { + "Provider": [], + "Section": [], + "Level 1": [], + "Level 2": [], + } + pass_count = fail_count = 0 + for finding in findings: + check = bulk_checks_metadata[finding.check_metadata.CheckID] + check_compliances = check.Compliance + for compliance in check_compliances: + if ( + compliance.Framework == "CIS" + and compliance.Version in compliance_framework + ): + compliance_version = compliance.Version + compliance_fm = compliance.Framework + for requirement in compliance.Requirements: + for attribute in requirement.Attributes: + section = attribute.Section + # Check if Section exists + if section not in sections: + sections[section] = { + "Status": f"{Fore.GREEN}PASS{Style.RESET_ALL}", + "Level 1": {"FAIL": 0, "PASS": 0}, + "Level 2": {"FAIL": 0, "PASS": 0}, + } + if finding.status == "FAIL": + fail_count += 1 + elif finding.status == "PASS": + pass_count += 1 + if attribute.Profile == "Level 1": + if finding.status == "FAIL": + sections[section]["Level 
1"]["FAIL"] += 1 + else: + sections[section]["Level 1"]["PASS"] += 1 + elif attribute.Profile == "Level 2": + if finding.status == "FAIL": + sections[section]["Level 2"]["FAIL"] += 1 + else: + sections[section]["Level 2"]["PASS"] += 1 + + # Add results to table + sections = dict(sorted(sections.items())) + for section in sections: + cis_compliance_table["Provider"].append(compliance.Provider) + cis_compliance_table["Section"].append(section) + if sections[section]["Level 1"]["FAIL"] > 0: + cis_compliance_table["Level 1"].append( + f"{Fore.RED}FAIL({sections[section]['Level 1']['FAIL']}){Style.RESET_ALL}" + ) + else: + cis_compliance_table["Level 1"].append( + f"{Fore.GREEN}PASS({sections[section]['Level 1']['PASS']}){Style.RESET_ALL}" + ) + if sections[section]["Level 2"]["FAIL"] > 0: + cis_compliance_table["Level 2"].append( + f"{Fore.RED}FAIL({sections[section]['Level 2']['FAIL']}){Style.RESET_ALL}" + ) + else: + cis_compliance_table["Level 2"].append( + f"{Fore.GREEN}PASS({sections[section]['Level 2']['PASS']}){Style.RESET_ALL}" + ) + if fail_count + pass_count < 1: + print( + f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance_fm}_{compliance_version}{Style.RESET_ALL}.\n" + ) + else: + print( + f"\nCompliance Status of {Fore.YELLOW}{compliance_fm}_{compliance_version}{Style.RESET_ALL} Framework:" + ) + overview_table = [ + [ + f"{Fore.RED}{round(fail_count / (fail_count + pass_count) * 100, 2)}% ({fail_count}) FAIL{Style.RESET_ALL}", + f"{Fore.GREEN}{round(pass_count / (fail_count + pass_count) * 100, 2)}% ({pass_count}) PASS{Style.RESET_ALL}", + ] + ] + print(tabulate(overview_table, tablefmt="rounded_grid")) + if not compliance_overview: + print( + f"\nFramework {Fore.YELLOW}{compliance_fm}_{compliance_version}{Style.RESET_ALL} Results:" + ) + print( + tabulate( + cis_compliance_table, + headers="keys", + tablefmt="rounded_grid", + ) + ) + print( + f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}" + ) + print(f"\nDetailed results of {compliance_fm} are in:") + print( + f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n" + ) + elif "mitre_attack" in compliance_framework: + tactics = {} + mitre_compliance_table = { + "Provider": [], + "Tactic": [], + "Status": [], + } + pass_count = fail_count = 0 + for finding in findings: + check = bulk_checks_metadata[finding.check_metadata.CheckID] + check_compliances = check.Compliance + for compliance in check_compliances: + if ( + "MITRE-ATTACK" in compliance.Framework + and compliance.Version in compliance_framework + ): + compliance_fm = compliance.Framework + for requirement in compliance.Requirements: + for tactic in requirement.Tactics: + if tactic not in tactics: + tactics[tactic] = {"FAIL": 0, "PASS": 0} + if finding.status == "FAIL": + fail_count += 1 + tactics[tactic]["FAIL"] += 1 + elif finding.status == "PASS": + pass_count += 1 + tactics[tactic]["PASS"] += 1 + + # Add results to table + tactics = dict(sorted(tactics.items())) + for tactic in tactics: + mitre_compliance_table["Provider"].append(compliance.Provider) + mitre_compliance_table["Tactic"].append(tactic) + if tactics[tactic]["FAIL"] > 0: + mitre_compliance_table["Status"].append( + f"{Fore.RED}FAIL({tactics[tactic]['FAIL']}){Style.RESET_ALL}" + ) + else: + mitre_compliance_table["Status"].append( + f"{Fore.GREEN}PASS({tactics[tactic]['PASS']}){Style.RESET_ALL}" + ) + if fail_count + pass_count < 1: + print( + f"\n {Style.BRIGHT}There are no resources for 
{Fore.YELLOW}{compliance_fm}{Style.RESET_ALL}.\n" + ) + else: + print( + f"\nCompliance Status of {Fore.YELLOW}{compliance_fm}{Style.RESET_ALL} Framework:" + ) + overview_table = [ + [ + f"{Fore.RED}{round(fail_count / (fail_count + pass_count) * 100, 2)}% ({fail_count}) FAIL{Style.RESET_ALL}", + f"{Fore.GREEN}{round(pass_count / (fail_count + pass_count) * 100, 2)}% ({pass_count}) PASS{Style.RESET_ALL}", + ] + ] + print(tabulate(overview_table, tablefmt="rounded_grid")) + if not compliance_overview: + print( + f"\nFramework {Fore.YELLOW}{compliance_fm}{Style.RESET_ALL} Results:" + ) + print( + tabulate( + mitre_compliance_table, + headers="keys", + tablefmt="rounded_grid", + ) + ) + print( + f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}" + ) + print(f"\nDetailed results of {compliance_fm} are in:") + print( + f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n" + ) + else: + pass_count = fail_count = 0 + for finding in findings: + check = bulk_checks_metadata[finding.check_metadata.CheckID] + check_compliances = check.Compliance + for compliance in check_compliances: + if ( + compliance.Framework.upper() + in compliance_framework.upper().replace("_", "-") + and compliance.Version in compliance_framework.upper() + and compliance.Provider in compliance_framework.upper() + ): + for requirement in compliance.Requirements: + for attribute in requirement.Attributes: + if finding.status == "FAIL": + fail_count += 1 + elif finding.status == "PASS": + pass_count += 1 + if fail_count + pass_count < 1: + print( + f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL}.\n" + ) + else: + print( + f"\nCompliance Status of {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Framework:" + ) + overview_table = [ + [ + f"{Fore.RED}{round(fail_count / (fail_count + pass_count) * 100, 2)}% ({fail_count}) FAIL{Style.RESET_ALL}", + f"{Fore.GREEN}{round(pass_count / (fail_count + pass_count) * 100, 2)}% ({pass_count}) PASS{Style.RESET_ALL}", + ] + ] + print(tabulate(overview_table, tablefmt="rounded_grid")) + if not compliance_overview: + print(f"\nDetailed results of {compliance_framework.upper()} are in:") + print( + f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n" + ) + except Exception as error: + logger.critical( + f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + sys.exit(1) diff --git a/prowler/lib/outputs/compliance/ens_rd2022_aws.py b/prowler/lib/outputs/compliance/ens_rd2022_aws.py new file mode 100644 index 00000000000..6b8759acf65 --- /dev/null +++ b/prowler/lib/outputs/compliance/ens_rd2022_aws.py @@ -0,0 +1,45 @@ +from csv import DictWriter + +from prowler.config.config import timestamp +from prowler.lib.outputs.models import Check_Output_CSV_ENS_RD2022, generate_csv_fields +from prowler.lib.utils.utils import outputs_unix_timestamp + + +def write_compliance_row_ens_rd2022_aws( + file_descriptors, finding, compliance, output_options, audit_info +): + compliance_output = "ens_rd2022_aws" + csv_header = generate_csv_fields(Check_Output_CSV_ENS_RD2022) + csv_writer = DictWriter( + file_descriptors[compliance_output], + fieldnames=csv_header, + delimiter=";", + ) + for requirement in compliance.Requirements: + requirement_description = requirement.Description + requirement_id = requirement.Id + for attribute in requirement.Attributes: + compliance_row = Check_Output_CSV_ENS_RD2022( + 
Provider=finding.check_metadata.Provider, + Description=compliance.Description, + AccountId=audit_info.audited_account, + Region=finding.region, + AssessmentDate=outputs_unix_timestamp( + output_options.unix_timestamp, timestamp + ), + Requirements_Id=requirement_id, + Requirements_Description=requirement_description, + Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl, + Requirements_Attributes_Marco=attribute.Marco, + Requirements_Attributes_Categoria=attribute.Categoria, + Requirements_Attributes_DescripcionControl=attribute.DescripcionControl, + Requirements_Attributes_Nivel=attribute.Nivel, + Requirements_Attributes_Tipo=attribute.Tipo, + Requirements_Attributes_Dimensiones=",".join(attribute.Dimensiones), + Status=finding.status, + StatusExtended=finding.status_extended, + ResourceId=finding.resource_id, + CheckId=finding.check_metadata.CheckID, + ) + + csv_writer.writerow(compliance_row.__dict__) diff --git a/prowler/lib/outputs/compliance/generic.py b/prowler/lib/outputs/compliance/generic.py new file mode 100644 index 00000000000..f71621aa824 --- /dev/null +++ b/prowler/lib/outputs/compliance/generic.py @@ -0,0 +1,51 @@ +from csv import DictWriter + +from prowler.config.config import timestamp +from prowler.lib.outputs.models import ( + Check_Output_CSV_Generic_Compliance, + generate_csv_fields, +) +from prowler.lib.utils.utils import outputs_unix_timestamp + + +def write_compliance_row_generic( + file_descriptors, finding, compliance, output_options, audit_info +): + compliance_output = compliance.Framework + if compliance.Version != "": + compliance_output += "_" + compliance.Version + if compliance.Provider != "": + compliance_output += "_" + compliance.Provider + + compliance_output = compliance_output.lower().replace("-", "_") + csv_header = generate_csv_fields(Check_Output_CSV_Generic_Compliance) + csv_writer = DictWriter( + file_descriptors[compliance_output], + fieldnames=csv_header, + delimiter=";", + ) + for requirement in compliance.Requirements: + requirement_description = requirement.Description + requirement_id = requirement.Id + for attribute in requirement.Attributes: + compliance_row = Check_Output_CSV_Generic_Compliance( + Provider=finding.check_metadata.Provider, + Description=compliance.Description, + AccountId=audit_info.audited_account, + Region=finding.region, + AssessmentDate=outputs_unix_timestamp( + output_options.unix_timestamp, timestamp + ), + Requirements_Id=requirement_id, + Requirements_Description=requirement_description, + Requirements_Attributes_Section=attribute.Section, + Requirements_Attributes_SubSection=attribute.SubSection, + Requirements_Attributes_SubGroup=attribute.SubGroup, + Requirements_Attributes_Service=attribute.Service, + Requirements_Attributes_Soc_Type=attribute.Soc_Type, + Status=finding.status, + StatusExtended=finding.status_extended, + ResourceId=finding.resource_id, + CheckId=finding.check_metadata.CheckID, + ) + csv_writer.writerow(compliance_row.__dict__) diff --git a/prowler/lib/outputs/compliance/iso27001_2013_aws.py b/prowler/lib/outputs/compliance/iso27001_2013_aws.py new file mode 100644 index 00000000000..8b6f7c70301 --- /dev/null +++ b/prowler/lib/outputs/compliance/iso27001_2013_aws.py @@ -0,0 +1,53 @@ +from csv import DictWriter + +from prowler.config.config import timestamp +from prowler.lib.outputs.models import ( + Check_Output_CSV_AWS_ISO27001_2013, + generate_csv_fields, +) +from prowler.lib.utils.utils import outputs_unix_timestamp + + +def write_compliance_row_iso27001_2013_aws( + 
+    file_descriptors, finding, compliance, output_options, audit_info
+):
+    compliance_output = compliance.Framework
+    if compliance.Version != "":
+        compliance_output += "_" + compliance.Version
+    if compliance.Provider != "":
+        compliance_output += "_" + compliance.Provider
+
+    compliance_output = compliance_output.lower().replace("-", "_")
+    csv_header = generate_csv_fields(Check_Output_CSV_AWS_ISO27001_2013)
+    csv_writer = DictWriter(
+        file_descriptors[compliance_output],
+        fieldnames=csv_header,
+        delimiter=";",
+    )
+    for requirement in compliance.Requirements:
+        requirement_description = requirement.Description
+        requirement_id = requirement.Id
+        requirement_name = requirement.Name
+        for attribute in requirement.Attributes:
+            compliance_row = Check_Output_CSV_AWS_ISO27001_2013(
+                Provider=finding.check_metadata.Provider,
+                Description=compliance.Description,
+                AccountId=audit_info.audited_account,
+                Region=finding.region,
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
+                Requirements_Id=requirement_id,
+                Requirements_Name=requirement_name,
+                Requirements_Description=requirement_description,
+                Requirements_Attributes_Category=attribute.Category,
+                Requirements_Attributes_Objetive_ID=attribute.Objetive_ID,
+                Requirements_Attributes_Objetive_Name=attribute.Objetive_Name,
+                Requirements_Attributes_Check_Summary=attribute.Check_Summary,
+                Status=finding.status,
+                StatusExtended=finding.status_extended,
+                ResourceId=finding.resource_id,
+                CheckId=finding.check_metadata.CheckID,
+            )
+
+            csv_writer.writerow(compliance_row.__dict__)
diff --git a/prowler/lib/outputs/compliance/mitre_attack_aws.py b/prowler/lib/outputs/compliance/mitre_attack_aws.py
new file mode 100644
index 00000000000..cfffa62f7b4
--- /dev/null
+++ b/prowler/lib/outputs/compliance/mitre_attack_aws.py
@@ -0,0 +1,66 @@
+from csv import DictWriter
+
+from prowler.config.config import timestamp
+from prowler.lib.outputs.models import (
+    Check_Output_MITRE_ATTACK,
+    generate_csv_fields,
+    unroll_list,
+)
+from prowler.lib.utils.utils import outputs_unix_timestamp
+
+
+def write_compliance_row_mitre_attack_aws(
+    file_descriptors, finding, compliance, output_options, audit_info
+):
+    compliance_output = compliance.Framework
+    if compliance.Version != "":
+        compliance_output += "_" + compliance.Version
+    if compliance.Provider != "":
+        compliance_output += "_" + compliance.Provider
+
+    compliance_output = compliance_output.lower().replace("-", "_")
+    csv_header = generate_csv_fields(Check_Output_MITRE_ATTACK)
+    csv_writer = DictWriter(
+        file_descriptors[compliance_output],
+        fieldnames=csv_header,
+        delimiter=";",
+    )
+    for requirement in compliance.Requirements:
+        requirement_description = requirement.Description
+        requirement_id = requirement.Id
+        requirement_name = requirement.Name
+        attributes_aws_services = ""
+        attributes_categories = ""
+        attributes_values = ""
+        attributes_comments = ""
+        for attribute in requirement.Attributes:
+            attributes_aws_services += attribute.AWSService + "\n"
+            attributes_categories += attribute.Category + "\n"
+            attributes_values += attribute.Value + "\n"
+            attributes_comments += attribute.Comment + "\n"
+        compliance_row = Check_Output_MITRE_ATTACK(
+            Provider=finding.check_metadata.Provider,
+            Description=compliance.Description,
+            AccountId=audit_info.audited_account,
+            Region=finding.region,
+            AssessmentDate=outputs_unix_timestamp(
+                output_options.unix_timestamp, timestamp
+            ),
+            Requirements_Id=requirement_id,
+            Requirements_Description=requirement_description,
+            Requirements_Name=requirement_name,
+            Requirements_Tactics=unroll_list(requirement.Tactics),
+            Requirements_SubTechniques=unroll_list(requirement.SubTechniques),
+            Requirements_Platforms=unroll_list(requirement.Platforms),
+            Requirements_TechniqueURL=requirement.TechniqueURL,
+            Requirements_Attributes_AWSServices=attributes_aws_services,
+            Requirements_Attributes_Categories=attributes_categories,
+            Requirements_Attributes_Values=attributes_values,
+            Requirements_Attributes_Comments=attributes_comments,
+            Status=finding.status,
+            StatusExtended=finding.status_extended,
+            ResourceId=finding.resource_id,
+            CheckId=finding.check_metadata.CheckID,
+        )
+
+        csv_writer.writerow(compliance_row.__dict__)
diff --git a/prowler/lib/outputs/csv.py b/prowler/lib/outputs/csv.py
new file mode 100644
index 00000000000..c3ebfd7e33c
--- /dev/null
+++ b/prowler/lib/outputs/csv.py
@@ -0,0 +1,10 @@
+from csv import DictWriter
+
+
+def write_csv(file_descriptor, headers, row):
+    csv_writer = DictWriter(
+        file_descriptor,
+        fieldnames=headers,
+        delimiter=";",
+    )
+    csv_writer.writerow(row.__dict__)
diff --git a/prowler/lib/outputs/file_descriptors.py b/prowler/lib/outputs/file_descriptors.py
index 9b5def4d224..1e3a0d0b9c9 100644
--- a/prowler/lib/outputs/file_descriptors.py
+++ b/prowler/lib/outputs/file_descriptors.py
@@ -23,6 +23,7 @@
 )
 from prowler.lib.utils.utils import file_exists, open_file
 from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
+from prowler.providers.azure.lib.audit_info.models import Azure_Audit_Info
 from prowler.providers.common.outputs import get_provider_output_model
 from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info

@@ -108,7 +109,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
        elif isinstance(audit_info, GCP_Audit_Info):
            if output_mode == "cis_2.0_gcp":
-                filename = f"{output_directory}/{output_filename}_cis_2.0_gcp{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_cis_2.0_gcp{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename, output_mode, audit_info, Check_Output_CSV_GCP_CIS
                )
@@ -123,7 +124,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
                file_descriptors.update({output_mode: file_descriptor})
            elif output_mode == "ens_rd2022_aws":
-                filename = f"{output_directory}/{output_filename}_ens_rd2022_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_ens_rd2022_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename,
                    output_mode,
@@ -133,14 +134,14 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
                file_descriptors.update({output_mode: file_descriptor})
            elif output_mode == "cis_1.5_aws":
-                filename = f"{output_directory}/{output_filename}_cis_1.5_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_cis_1.5_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename, output_mode, audit_info, Check_Output_CSV_AWS_CIS
                )
                file_descriptors.update({output_mode: file_descriptor})
            elif output_mode == "cis_1.4_aws":
-                filename = f"{output_directory}/{output_filename}_cis_1.4_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_cis_1.4_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename, output_mode, audit_info, Check_Output_CSV_AWS_CIS
                )
@@ -150,7 +151,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
                output_mode == "aws_well_architected_framework_security_pillar_aws"
            ):
-                filename = f"{output_directory}/{output_filename}_aws_well_architected_framework_security_pillar_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_aws_well_architected_framework_security_pillar_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename,
                    output_mode,
@@ -163,7 +164,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
                output_mode == "aws_well_architected_framework_reliability_pillar_aws"
            ):
-                filename = f"{output_directory}/{output_filename}_aws_well_architected_framework_reliability_pillar_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_aws_well_architected_framework_reliability_pillar_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename,
                    output_mode,
@@ -173,7 +174,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
                file_descriptors.update({output_mode: file_descriptor})
            elif output_mode == "iso27001_2013_aws":
-                filename = f"{output_directory}/{output_filename}_iso27001_2013_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_iso27001_2013_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename,
                    output_mode,
@@ -183,7 +184,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
                file_descriptors.update({output_mode: file_descriptor})
            elif output_mode == "mitre_attack_aws":
-                filename = f"{output_directory}/{output_filename}_mitre_attack_aws{csv_file_suffix}"
+                filename = f"{output_directory}/compliance/{output_filename}_mitre_attack_aws{csv_file_suffix}"
                file_descriptor = initialize_file_descriptor(
                    filename,
                    output_mode,
@@ -194,14 +195,26 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
            else:
                # Generic Compliance framework
-                filename = f"{output_directory}/{output_filename}_{output_mode}{csv_file_suffix}"
-                file_descriptor = initialize_file_descriptor(
-                    filename,
-                    output_mode,
-                    audit_info,
-                    Check_Output_CSV_Generic_Compliance,
-                )
-                file_descriptors.update({output_mode: file_descriptor})
+                if (
+                    isinstance(audit_info, AWS_Audit_Info)
+                    and "aws" in output_mode
+                    or (
+                        isinstance(audit_info, Azure_Audit_Info)
+                        and "azure" in output_mode
+                    )
+                    or (
+                        isinstance(audit_info, GCP_Audit_Info)
+                        and "gcp" in output_mode
+                    )
+                ):
+                    filename = f"{output_directory}/compliance/{output_filename}_{output_mode}{csv_file_suffix}"
+                    file_descriptor = initialize_file_descriptor(
+                        filename,
+                        output_mode,
+                        audit_info,
+                        Check_Output_CSV_Generic_Compliance,
+                    )
+                    file_descriptors.update({output_mode: file_descriptor})
    except Exception as error:
        logger.error(
diff --git a/prowler/lib/outputs/models.py b/prowler/lib/outputs/models.py
index 09f42b6e123..faddd07f548 100644
--- a/prowler/lib/outputs/models.py
+++ b/prowler/lib/outputs/models.py
@@ -13,7 +13,16 @@
 from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info


-def get_check_compliance(finding, provider, output_options):
+def get_check_compliance(finding, provider, output_options) -> dict:
+    """get_check_compliance returns a map with the compliance framework as key and, as value, the requirements where the finding's check is present.
+
+    Example:
+
+        {
+            "CIS-1.4": ["2.1.3"],
+            "CIS-1.5": ["2.1.3"],
+        }
+    """
    try:
        check_compliance = {}
        # We have to retrieve all the check's compliance requirements
diff --git a/prowler/lib/outputs/outputs.py b/prowler/lib/outputs/outputs.py
index 288a58152cb..f0666516aa9 100644
--- a/prowler/lib/outputs/outputs.py
+++ b/prowler/lib/outputs/outputs.py
@@ -4,7 +4,10 @@
 from prowler.config.config import available_compliance_frameworks, orange_color
 from prowler.lib.logger import logger
-from prowler.lib.outputs.compliance import add_manual_controls, fill_compliance
+from prowler.lib.outputs.compliance.compliance import (
+    add_manual_controls,
+    fill_compliance,
+)
 from prowler.lib.outputs.file_descriptors import fill_file_descriptors
 from prowler.lib.outputs.html import fill_html
 from prowler.lib.outputs.json import fill_json_asff, fill_json_ocsf
@@ -63,22 +66,26 @@ def report(check_findings, output_options, audit_info):
        if file_descriptors:
            # Check if --quiet to only add fails to outputs
            if not (finding.status != "FAIL" and output_options.is_quiet):
-                if any(
-                    compliance in output_options.output_modes
-                    for compliance in available_compliance_frameworks
-                ):
-                    fill_compliance(
-                        output_options,
-                        finding,
-                        audit_info,
-                        file_descriptors,
-                    )
-
-                    add_manual_controls(
-                        output_options,
-                        audit_info,
-                        file_descriptors,
+                input_compliance_frameworks = list(
+                    set(output_options.output_modes).intersection(
+                        available_compliance_frameworks
                     )
+                )
+
+                fill_compliance(
+                    output_options,
+                    finding,
+                    audit_info,
+                    file_descriptors,
+                    input_compliance_frameworks,
+                )
+
+                add_manual_controls(
+                    output_options,
+                    audit_info,
+                    file_descriptors,
+                    input_compliance_frameworks,
+                )

            # AWS specific outputs
            if finding.check_metadata.Provider == "aws":
diff --git a/prowler/providers/common/outputs.py b/prowler/providers/common/outputs.py
index 3716df30cce..adcddbcbeeb 100644
--- a/prowler/providers/common/outputs.py
+++ b/prowler/providers/common/outputs.py
@@ -70,6 +70,9 @@ def __init__(self, arguments, mutelist_file, bulk_checks_metadata):
        if not isdir(arguments.output_directory):
            if arguments.output_modes:
                makedirs(arguments.output_directory)
+        if not isdir(arguments.output_directory + "/compliance"):
+            if arguments.output_modes:
+                makedirs(arguments.output_directory + "/compliance")


 class Azure_Output_Options(Provider_Output_Options):
diff --git a/tests/lib/outputs/compliance/compliance_test.py b/tests/lib/outputs/compliance/compliance_test.py
new file mode 100644
index 00000000000..6be964a8fa9
--- /dev/null
+++ b/tests/lib/outputs/compliance/compliance_test.py
@@ -0,0 +1,111 @@
+from mock import MagicMock
+
+from prowler.lib.check.compliance_models import (
+    CIS_Requirement_Attribute,
+    Compliance_Base_Model,
+    Compliance_Requirement,
+)
+from prowler.lib.outputs.compliance.compliance import (
+    get_check_compliance_frameworks_in_input,
+)
+
+CIS_1_4_AWS_NAME = "cis_1.4_aws"
+CIS_1_4_AWS = Compliance_Base_Model(
+    Framework="CIS",
+    Provider="AWS",
+    Version="1.4",
+    Description="The CIS Benchmark for CIS Amazon Web Services Foundations Benchmark, v1.4.0, Level 1 and 2 provides prescriptive guidance for configuring security options for a subset of Amazon Web Services. It has an emphasis on foundational, testable, and architecture agnostic settings",
+    Requirements=[
+        Compliance_Requirement(
+            Checks=[],
+            Id="2.1.3",
+            Description="Ensure MFA Delete is enabled on S3 buckets",
+            Attributes=[
+                CIS_Requirement_Attribute(
+                    Section="2.1. Simple Storage Service (S3)",
+                    Profile="Level 1",
+                    AssessmentStatus="Automated",
+                    Description="Once MFA Delete is enabled on your sensitive and classified S3 bucket it requires the user to have two forms of authentication.",
+                    RationaleStatement="Adding MFA delete to an S3 bucket, requires additional authentication when you change the version state of your bucket or you delete and object version adding another layer of security in the event your security credentials are compromised or unauthorized access is granted.",
+                    ImpactStatement="",
+                    RemediationProcedure="Perform the steps below to enable MFA delete on an S3 bucket.\n\nNote:\n-You cannot enable MFA Delete using the AWS Management Console. You must use the AWS CLI or API.\n-You must use your 'root' account to enable MFA Delete on S3 buckets.\n\n**From Command line:**\n\n1. Run the s3api put-bucket-versioning command\n\n```\naws s3api put-bucket-versioning --profile my-root-profile --bucket Bucket_Name --versioning-configuration Status=Enabled,MFADelete=Enabled --mfa “arn:aws:iam::aws_account_id:mfa/root-account-mfa-device passcode”\n```",
+                    AuditProcedure='Perform the steps below to confirm MFA delete is configured on an S3 Bucket\n\n**From Console:**\n\n1. Login to the S3 console at `https://console.aws.amazon.com/s3/`\n\n2. Click the `Check` box next to the Bucket name you want to confirm\n\n3. In the window under `Properties`\n\n4. Confirm that Versioning is `Enabled`\n\n5. Confirm that MFA Delete is `Enabled`\n\n**From Command Line:**\n\n1. Run the `get-bucket-versioning`\n```\naws s3api get-bucket-versioning --bucket my-bucket\n```\n\nOutput example:\n```\n \n Enabled\n Enabled \n\n```\n\nIf the Console or the CLI output does not show Versioning and MFA Delete `enabled` refer to the remediation below.',
+                    AdditionalInformation="",
+                    References="https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html#MultiFactorAuthenticationDelete:https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMFADelete.html:https://aws.amazon.com/blogs/security/securing-access-to-aws-using-mfa-part-3/:https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa_lost-or-broken.html",
+                )
+            ],
+        )
+    ],
+)
+CIS_1_5_AWS_NAME = "cis_1.5_aws"
+CIS_1_5_AWS = Compliance_Base_Model(
+    Framework="CIS",
+    Provider="AWS",
+    Version="1.5",
+    Description="The CIS Amazon Web Services Foundations Benchmark provides prescriptive guidance for configuring security options for a subset of Amazon Web Services with an emphasis on foundational, testable, and architecture agnostic settings.",
+    Requirements=[
+        Compliance_Requirement(
+            Checks=[],
+            Id="2.1.3",
+            Description="Ensure MFA Delete is enabled on S3 buckets",
+            Attributes=[
+                CIS_Requirement_Attribute(
+                    Section="2.1. Simple Storage Service (S3)",
+                    Profile="Level 1",
+                    AssessmentStatus="Automated",
+                    Description="Once MFA Delete is enabled on your sensitive and classified S3 bucket it requires the user to have two forms of authentication.",
+                    RationaleStatement="Adding MFA delete to an S3 bucket, requires additional authentication when you change the version state of your bucket or you delete and object version adding another layer of security in the event your security credentials are compromised or unauthorized access is granted.",
+                    ImpactStatement="",
+                    RemediationProcedure="Perform the steps below to enable MFA delete on an S3 bucket.\n\nNote:\n-You cannot enable MFA Delete using the AWS Management Console. You must use the AWS CLI or API.\n-You must use your 'root' account to enable MFA Delete on S3 buckets.\n\n**From Command line:**\n\n1. Run the s3api put-bucket-versioning command\n\n```\naws s3api put-bucket-versioning --profile my-root-profile --bucket Bucket_Name --versioning-configuration Status=Enabled,MFADelete=Enabled --mfa “arn:aws:iam::aws_account_id:mfa/root-account-mfa-device passcode”\n```",
+                    AuditProcedure='Perform the steps below to confirm MFA delete is configured on an S3 Bucket\n\n**From Console:**\n\n1. Login to the S3 console at `https://console.aws.amazon.com/s3/`\n\n2. Click the `Check` box next to the Bucket name you want to confirm\n\n3. In the window under `Properties`\n\n4. Confirm that Versioning is `Enabled`\n\n5. Confirm that MFA Delete is `Enabled`\n\n**From Command Line:**\n\n1. Run the `get-bucket-versioning`\n```\naws s3api get-bucket-versioning --bucket my-bucket\n```\n\nOutput example:\n```\n \n Enabled\n Enabled \n\n```\n\nIf the Console or the CLI output does not show Versioning and MFA Delete `enabled` refer to the remediation below.',
+                    AdditionalInformation="",
+                    References="https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html#MultiFactorAuthenticationDelete:https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMFADelete.html:https://aws.amazon.com/blogs/security/securing-access-to-aws-using-mfa-part-3/:https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa_lost-or-broken.html",
+                )
+            ],
+        )
+    ],
+)
+
+NOT_PRESENT_COMPLIANCE_NAME = "not_present_compliance_name"
+NOT_PRESENT_COMPLIANCE = Compliance_Base_Model(
+    Framework="NOT_EXISTENT",
+    Provider="NOT_EXISTENT",
+    Version="NOT_EXISTENT",
+    Description="NOT_EXISTENT",
+    Requirements=[],
+)
+
+
+class TestCompliance:
+    def test_get_check_compliance_frameworks_all_none(self):
+        check_id = None
+        bulk_checks_metadata = None
+        input_compliance_frameworks = None
+        assert (
+            get_check_compliance_frameworks_in_input(
+                check_id, bulk_checks_metadata, input_compliance_frameworks
+            )
+            == []
+        )
+
+    def test_get_check_compliance_frameworks_all(self):
+        check_id = "test-check"
+        bulk_check_metadata = [CIS_1_4_AWS, CIS_1_5_AWS]
+        bulk_checks_metadata = {}
+        bulk_checks_metadata[check_id] = MagicMock()
+        bulk_checks_metadata[check_id].Compliance = bulk_check_metadata
+        input_compliance_frameworks = [CIS_1_4_AWS_NAME, CIS_1_5_AWS_NAME]
+        assert get_check_compliance_frameworks_in_input(
+            check_id, bulk_checks_metadata, input_compliance_frameworks
+        ) == [CIS_1_4_AWS, CIS_1_5_AWS]
+
+    def test_get_check_compliance_frameworks_two_of_three(self):
+        check_id = "test-check"
+        bulk_check_metadata = [CIS_1_4_AWS, CIS_1_5_AWS, NOT_PRESENT_COMPLIANCE]
+        bulk_checks_metadata = {}
+        bulk_checks_metadata[check_id] = MagicMock()
+        bulk_checks_metadata[check_id].Compliance = bulk_check_metadata
+        input_compliance_frameworks = [CIS_1_4_AWS_NAME, CIS_1_5_AWS_NAME]
+        assert get_check_compliance_frameworks_in_input(
+            check_id, bulk_checks_metadata, input_compliance_frameworks
+        ) == [CIS_1_4_AWS, CIS_1_5_AWS]
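The three new compliance writers in this patch (generic.py, iso27001_2013_aws.py and mitre_attack_aws.py) all derive the key used to look up their CSV file descriptor in the same way, from the framework's Framework, Version and Provider fields. The snippet below only condenses that repeated pattern for illustration; get_compliance_output_key is a hypothetical name and is not a helper that exists in Prowler.

```python
# Illustrative condensation of the key-derivation pattern shared by the
# write_compliance_row_* functions above (this helper does not exist in Prowler).
def get_compliance_output_key(framework: str, version: str, provider: str) -> str:
    key = framework
    if version != "":
        key += "_" + version
    if provider != "":
        key += "_" + provider
    # e.g. "CIS_1.5_AWS" -> "cis_1.5_aws", matching the --compliance names
    return key.lower().replace("-", "_")


assert get_compliance_output_key("CIS", "1.5", "AWS") == "cis_1.5_aws"
```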
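The new prowler/lib/outputs/csv.py module exposes a single write_csv helper that serialises any object whose __dict__ matches the given header, using a ";" delimiter. A minimal usage sketch follows, assuming a Prowler installation that includes this change; the Row dataclass and the values in it are made up for illustration and are not one of Prowler's Check_Output_* models.

```python
from dataclasses import dataclass
from io import StringIO

from prowler.lib.outputs.csv import write_csv  # helper added in this patch


@dataclass
class Row:  # hypothetical stand-in for a Check_Output_* row model
    Provider: str
    CheckId: str
    Status: str


buffer = StringIO()
write_csv(buffer, ["Provider", "CheckId", "Status"], Row("aws", "example_check", "FAIL"))
print(buffer.getvalue())  # -> aws;example_check;FAIL
```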
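The tests above pin down the observable behaviour of get_check_compliance_frameworks_in_input: with empty inputs it returns an empty list, and otherwise it returns only the check's compliance models whose framework name was requested. The sketch below is inferred from these three tests alone; the real implementation in prowler/lib/outputs/compliance/compliance.py may differ.

```python
# Rough sketch inferred from the tests above, not the actual Prowler implementation.
def get_check_compliance_frameworks_in_input(
    check_id, bulk_checks_metadata, input_compliance_frameworks
):
    """Return the check's compliance models whose name (e.g. "cis_1.5_aws")
    is among the requested input frameworks."""
    check_compliances = []
    if check_id and bulk_checks_metadata and input_compliance_frameworks:
        for compliance in bulk_checks_metadata[check_id].Compliance:
            # Build the "framework_version_provider" name used on the CLI
            # (simplification: empty Version/Provider fields are not handled here).
            compliance_name = (
                f"{compliance.Framework}_{compliance.Version}_{compliance.Provider}"
                .lower()
                .replace("-", "_")
            )
            if compliance_name in input_compliance_frameworks:
                check_compliances.append(compliance)
    return check_compliances
```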