# Run Scan

In [None]:
# Widen the notebook container so wide tables and long command output fit the screen.
# `IPython.display` is the public import location; `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

## Requirements

In [None]:
!pip install pyyaml requests

You can run it all from the commandline (in the docker container) with the following:
base_dir is where the output scans directory will go.  -t 500 is so that it doesn't timeout.
run_jnb -a '{"subscription_id": "510f92e0-xxxx-yyyy-zzzz-095d37e6a299", "base_dir": "/engagements/cis_test"}' -v  azure_cis_scanner.ipynb -t 500

to run across multiple subscription_id's do
for subscription in `cat /engagements/cis_test/scans/accounts.json | jq '.[].id'`; \
   do run_jnb -a '{"subscription_id": '"$subscription"', "base_dir": "/engagements/cis_test"}' -v  azure_cis_scanner.ipynb -t 500; \
   done

For more on run_jnb see https://github.com/hz-inova/run_jnb

In [None]:
import yaml

In [None]:
# Verify that you have azure cli installed
# (running `az` with no arguments should print its help text if it is on the PATH)
!az

If not, follow the installation instructions:
https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest

Check that you are connected to the correct account.

In [None]:
# Show the subscriptions known to the current az CLI login.
!az account list

In [None]:
# login to azure.  This may require doing from the laptop/vm commandline or exec into the container

In [None]:
# Fill your subscription_id here or run from the commandline
# (the run_jnb examples earlier in this notebook pass it via the -a argument).
subscription_id = ''

In [None]:
# Select the subscription for subsequent az commands; IPython substitutes {subscription_id}.
!az account set --subscription {subscription_id}

In [None]:
account = !az account show
account = yaml.load(account.nlstr)
account

In [None]:
# Build a short directory name for this subscription: the first word of its
# display name, a dash, then the first dash-separated group of its id.
subscription_id, subscription_name = account['id'], account['name']
subscription_dirname = '-'.join([subscription_name.split(' ')[0], subscription_id.split('-')[0]])
subscription_dirname

In [None]:
# Root directory under which the scans/ output tree is created (passed to set_data_paths).
base_dir = '/engagements/cis_test/'
# Location of the azure_cis_scanner checkout, used by the %%writefile / %load hints in later cells.
scanner_dir = '/praetorian-tools/azure_cis_scanner'

You can write to disk or load as needed with %%writefile or %load

In [None]:
#%%writefile /praetorian-tools/azure_cis_scanner/scanner/utils.py
# %load /praetorian-tools/azure_cis_scanner/scanner/utils.py
import datetime
import os
import subprocess
import sys
import re
import functools
import json
import requests

# Cached access-token state, filled in by get_access_token().
# token_expiry holds the parsed expiry datetime; access_token holds the token string.
token_expiry = None
access_token = None
# Placeholders for the output directories; the notebook rebinds these names
# from the return value of set_data_paths().
filtered_data_dir = ''
scan_data_dir = ''
raw_data_dir = ''

def set_data_paths(subscription_dirname, base_dir='.'):
    """
    Create (if missing) today's output directories for one subscription:

        {base_dir}/scans/{subscription_dirname}/{YYYY-MM-DD}/raw
                                                            /filtered

    @subscription_dirname: string - per-subscription directory name
    @base_dir: string - root directory under which scans/ is created
    @returns: (scan_data_dir, raw_data_dir, filtered_data_dir)
    """
    # Get day in YYYY-MM-DD format
    day = datetime.datetime.now().strftime('%Y-%m-%d')

    scan_data_dir = os.path.join(base_dir, 'scans', subscription_dirname, day)
    print("scan_data_dir", scan_data_dir)
    raw_data_dir = os.path.join(scan_data_dir, 'raw')
    print("raw_data_dir", raw_data_dir)
    # exist_ok makes the call idempotent and avoids the check-then-create race
    # of testing os.path.exists() first.
    os.makedirs(raw_data_dir, exist_ok=True)
    filtered_data_dir = os.path.join(scan_data_dir, 'filtered')
    print("filtered_data_dir", filtered_data_dir)
    os.makedirs(filtered_data_dir, exist_ok=True)
    return scan_data_dir, raw_data_dir, filtered_data_dir

def call(command, retrieving_access_token=False):
    """
    Run an external command and return its combined stdout/stderr as text.

    @command: string or list - a string is split on whitespace into arguments
    @retrieving_access_token: bool - set by get_access_token() so that fetching
        a token does not itself trigger another token refresh
    @returns: string - utf-8 decoded output of the command
    Raises SystemExit (status 1) when the command cannot be started or fails.
    """
    # Test the flag first so the token-fetching call never re-enters the refresh path.
    if not retrieving_access_token and not valid_token():
        get_access_token()
    if isinstance(command, str):
        command = command.split()           # subprocess needs an array of arguments
    try:
        print('running: ', command)
        return subprocess.check_output(command, shell=False, stderr=subprocess.STDOUT).decode('utf-8')
    except subprocess.CalledProcessError as error:
        print("An exception occurred while processing command " + str(command) + " Halting execution!")
        # Show what the command itself reported so the failure can be diagnosed.
        print("exit status:", error.returncode)
        print((error.output or b'').decode('utf-8', errors='replace'))
    except Exception as error:
        # Not a bare except: KeyboardInterrupt and SystemExit must propagate untouched.
        print("An exception occurred while processing command " + str(command) + " Halting execution!")
        print(repr(error))
    # A non-zero status lets callers and shells tell failure from success.
    sys.exit(1)

def verify_subscription_id_format(subscriptionId) :
    """
    Check that a subscription id has the 8-4-4-4-12 lowercase-hex GUID layout.

    @subscriptionId: string - candidate subscription id
    @returns: bool - True only when the whole string is a well-formed id
    """
    r = re.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
    # fullmatch, not match: match() only anchors the start, so an id followed by
    # arbitrary trailing text would otherwise be accepted.
    return r.fullmatch(subscriptionId) is not None

def valid_token():
    """
    Report whether the cached access token can still be used.

    @returns: bool - False when no expiry has been recorded or it has passed
    """
    if not token_expiry:
        return False
    # token_expiry is parsed from the `expiresOn` field of `az account get-access-token`,
    # which the Azure CLI documents as a local datetime. It is therefore compared
    # with local time; comparing with UTC is wrong on any host not running in UTC.
    return not (datetime.datetime.now() > token_expiry)
    
def get_subscription_id() :
    """
    Return the "id" field of the account reported by `az account show`.
    """
    account_text = call("az account show")
    return jsonify(account_text)["id"]

def get_access_token():
    """
    Return an access token, fetching a new one from the az CLI when the cached
    one is missing or expired.

    Updates the module-level access_token and token_expiry as a side effect.
    @returns: string - the access token
    """
    global token_expiry, access_token
    if valid_token():
        # Cached token is still usable.
        return access_token

    token_info = jsonify(call("az account get-access-token", retrieving_access_token=True))
    access_token = token_info["accessToken"]
    token_expiry = token_info["expiresOn"]
    print(token_expiry)
    # Convert the textual expiry into a datetime so valid_token() can compare it.
    token_expiry = datetime.datetime.strptime(token_expiry, '%Y-%m-%d %H:%M:%S.%f')
    return access_token

def make_request(url, args=None, timeout=60):
    """
    GET `url` with the cached bearer token and return the response body.

    @url: string - full URL to request
    @args: unused; kept so existing callers keep working (the default is None
        rather than a shared mutable list)
    @timeout: seconds to wait for the server before giving up; without a
        timeout a stalled connection would block the notebook indefinitely
    @returns: string - response body text (the HTTP status is not checked)
    """
    print('requesting ', url)
    authorization_headers = {"Authorization" : "Bearer " + get_access_token()}
    r = requests.get(url, headers=authorization_headers, timeout=timeout)
    return r.text

def jsonify(jsonString) :
    """Parse JSON text and return the resulting Python object."""
    parsed = json.loads(jsonString)
    return parsed

def stringify(jsonObject) :
    """Serialize a Python object to JSON text."""
    text = json.dumps(jsonObject)
    return text


In [None]:
# Prime the token cache. The trailing ';' stops Jupyter from echoing the
# returned bearer token into the notebook output, where it would be saved.
get_access_token();

1. Run set_data_paths to create {base_dir}/scans/{subscription_dirname}/{date}/raw, which holds the raw JSON output of each command.
2. Run the scripts that apply the selection criteria for failure. They write the filtered subset of the raw data, as JSON per finding, to {base_dir}/scans/{subscription_dirname}/{date}/filtered.
3. Run a script to generate the grouped findings, at least partially.


In [None]:
# Debug aid: show the cached token expiry next to the current UTC time.
# NOTE(review): the comparison raises TypeError while token_expiry is still None,
# i.e. if this cell runs before get_access_token() has stored an expiry.
print(token_expiry)
print(datetime.datetime.utcnow())
print(datetime.datetime.utcnow() > token_expiry)
valid_token()

In [None]:
# We are running this notebook from a container with ~/engagements mounted into /engagements in the container.

In [None]:
# Create today's output directories for this subscription and keep their paths for later cells.
scan_data_dir, raw_data_dir, filtered_data_dir = set_data_paths(subscription_dirname, base_dir=base_dir)

In [None]:
# Over-ride dates if necessary
# scan_data_dir = '/engagements/cis_test/scans/2018-05-26'
# raw_data_dir = '/engagements/cis_test/scans/2018-05-26/raw'
# filtered_data_dir = '/engagements/cis_test/scans/2018-05-26/filtered'

### Security Center

An error like the following indicates you have not logged in correctly
ScannerError: while scanning a simple key
  in "<unicode string>", line 6, column 1:
    100    97  100    97    0     0  ... 
    ^
could not find expected ':'
  in "<unicode string>", line 7, column 1:
    {"error":{"code":"SubscriptionNo ...

In [None]:
# The account list lives directly under {base_dir}/scans, outside any per-subscription or per-day directory.
accounts_path = os.path.join(base_dir, 'scans', 'accounts.json')

def get_accounts(accounts_path):
    """
    @accounts_path: string - path to output json file
    """
    accounts = !az account list
    accounts = yaml.safe_load(accounts.nlstr)
    with open(accounts_path, 'w') as f:
        json.dump(accounts, f, indent=4, sort_keys=True)
    return accounts

def load_accounts(accounts_path):
    """
    Load the account list previously saved by get_accounts().

    @accounts_path: string - path to the json file to read
    @returns: the parsed account list
    """
    # The file is produced by json.dump, so it is read with the JSON parser,
    # its exact inverse, instead of a YAML parser that only approximates JSON.
    with open(accounts_path, 'r') as f:
        return json.load(f)

def get_resource_groups(resource_groups_path):
    """
    @network_path: string - path to output json file
    """
    resource_groups = !az group list
    resource_groups = yaml.safe_load(resource_groups.nlstr)
    with open(resource_groups_path, 'w') as f:
        json.dump(resource_groups, f, indent=4, sort_keys=True)
    return resource_groups

def load_resource_groups(resource_groups_path):
    """
    Load the resource group list previously saved by get_resource_groups().

    @resource_groups_path: string - path to the json file to read
    @returns: the parsed resource group list
    """
    # Written with json.dump, so read with json.load rather than a YAML parser.
    with open(resource_groups_path, 'r') as f:
        return json.load(f)

In [None]:
# Refresh the account list, then the resource-group list, on disk.
print(accounts_path)
get_accounts(accounts_path)
# Resource groups are stored with the other raw results of this scan.
resource_groups_path = os.path.join(raw_data_dir, "resource_groups.json")
get_resource_groups(resource_groups_path)


In [None]:
# %%writefile scanner_dir + '/scanner/security_center.py'
# %load scanner_dir + '/scanner/security_center.py'
import yaml
import os

# Raw API output and the filtered check results for the Security Center section.
security_center_path = os.path.join(raw_data_dir, "security_center.json")
security_center_filtered_path = os.path.join(filtered_data_dir, 'security_center_filtered.json')

def get_security_center(security_center_path):
    """
    Query Azure api for storage accounts info and save to disk
    """
    output = !az account get-access-token --query "{subscripton:subscription,accessToken:accessToken}" --out tsv
    print(output.nlstr.split())
    subscription_id, token = output.nlstr.split()
    security_center = !curl -X GET -H "Authorization: Bearer {token}" -H "Content-Type: application/json" https://management.azure.com/subscriptions/{subscription_id}/providers/microsoft.Security/policies?api-version=2015-06-01-preview 2>/dev/null
    security_center = yaml.load(security_center.nlstr)
    security_center = security_center['value']
    print(security_center)
        
    with open(security_center_path, 'w') as f:
        yaml.dump(security_center, f)
    return security_center

def load_security_center(security_center_filtered_path):
    """
    Load Security Center policies previously saved to disk.

    @security_center_filtered_path: string - path of the file to read. Despite
        the parameter name, callers in this notebook pass the raw
        security_center_path.
    @returns: the parsed policy list
    """
    # Read the path that was passed in rather than the module-level
    # security_center_path, so the argument is actually honoured.
    with open(security_center_filtered_path, 'r') as f:
        # safe_load reads both YAML and JSON files and does not construct arbitrary objects.
        security_center = yaml.safe_load(f)
    return security_center

def get_data():
    """
    Fetch the Security Center policies and write them to security_center_path
    (delegates to get_security_center).
    """
    get_security_center(security_center_path)

def test_controls():
    """
    Run every Security Center check against the saved policies and write the
    combined results to security_center_filtered_path as JSON.

    @returns: dict - result name -> {"items", "stats", "metadata"}
    """
    policies = load_security_center(security_center_path)

    # Checks run in section order (2.2 through 2.19); each gets the same policy list.
    security_center_results = {
        'automatic_provisioning_of_monitoring_agent_is_set_to_on': automatic_provisioning_of_monitoring_agent_is_set_to_on_2_2(policies),
        'system_updates_is_set_to_on': system_updates_is_set_to_on_2_3(policies),
        'security_configurations_is_set_to_on': security_configurations_is_set_to_on_2_4(policies),
        'endpoint_protection_is_set_to_on': endpoint_protection_is_set_to_on_2_5(policies),
        'disk_encryption_is_set_to_on': disk_encryption_is_set_to_on_2_6(policies),
        'network_security_groups_is_set_to_on': network_security_groups_is_set_to_on_2_7(policies),
        'web_application_firewall_is_set_to_on': web_application_firewall_is_set_to_on_2_8(policies),
        'next_generation_firewall_is_set_to_on': next_generation_firewall_is_set_to_on_2_9(policies),
        'vulnerability_assessment_is_set_to_on': vulnerability_assessment_is_set_to_on_2_10(policies),
        'storage_encryption_is_set_to_on': storage_encryption_is_set_to_on_2_11(policies),
        'just_in_time_access_is_set_to_on': just_in_time_access_is_set_to_on_2_12(policies),
        'adaptive_application_controls_is_set_to_on': adaptive_application_controls_is_set_to_on_2_13(policies),
        'sql_auditing_and_threat_detection_is_set_to_on': sql_auditing_and_threat_detection_is_set_to_on_2_14(policies),
        'sql_encryption_is_set_to_on': sql_encryption_is_set_to_on_2_15(policies),
        'security_contact_emails_is_set': security_contact_emails_is_set_2_16(policies),
        'security_contact_phone_number_is_set': security_contact_phone_number_is_set_2_17(policies),
        'send_me_emails_about_alerts_is_set_to_on': send_me_emails_about_alerts_is_set_to_on_2_18(policies),
        'send_email_also_to_subscription_owners_is_set_to_on': send_email_also_to_subscription_owners_is_set_to_on_2_19(policies),
    }

    with open(security_center_filtered_path, 'w') as out_file:
        json.dump(security_center_results, out_file, indent=4, sort_keys=True)
    return security_center_results

def automatic_provisioning_of_monitoring_agent_is_set_to_on_2_2(security_center):
    """
    Flag every policy whose properties.logCollection is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['logCollection'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "automatic_provisioning_of_monitoring_agent_is_set_to_on",
                     "negative_name": "automatic_provisioning_of_monitoring_agent_not_on",
                     "columns": ["Resource Group"]},
    }
    
def system_updates_is_set_to_on_2_3(security_center):
    """
    Flag every policy whose properties.recommendations.patch is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['patch'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "system_updates_is_set_to_on",
                     "negative_name": "system_updates_not_on",
                     "columns": ["Resource Group"]},
    }

def security_configurations_is_set_to_on_2_4(security_center):
    """
    Flag every policy whose properties.recommendations.baseline is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['baseline'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "security_configurations_is_set_to_on",
                     "negative_name": "security_configurations_not_on",
                     "columns": ["Resource Group"]},
    }

def endpoint_protection_is_set_to_on_2_5(security_center):
    """
    Flag every policy whose properties.recommendations.antimalware is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['antimalware'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "endpoint_protection_is_set_to_on",
                     "negative_name": "endpoint_protection_not_on",
                     "columns": ["Resource Group"]},
    }

def disk_encryption_is_set_to_on_2_6(security_center):
    """
    Flag every policy whose properties.recommendations.diskEncryption is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['diskEncryption'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "disk_encryption_is_set_to_on",
                     "negative_name": "disk_encryption_not_on",
                     "columns": ["Resource Group"]},
    }

def network_security_groups_is_set_to_on_2_7(security_center):
    """
    Flag every policy whose properties.recommendations.nsgs is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['nsgs'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "network_security_groups_is_set_to_on",
                     "negative_name": "network_security_groups_not_on",
                     "columns": ["Resource Group"]},
    }

def web_application_firewall_is_set_to_on_2_8(security_center):
    """
    Flag every policy whose properties.recommendations.waf is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['waf'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "web_application_firewall_is_set_to_on",
                     "negative_name": "web_application_firewall_not_on",
                     "columns": ["Resource Group"]},
    }

def next_generation_firewall_is_set_to_on_2_9(security_center):
    """
    Flag every policy whose properties.recommendations.ngfw is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    items_flagged_list = []
    for item in security_center:
        resource_group = item['name']
        ngfw = item['properties']['recommendations']['ngfw']
        if ngfw != "On":
            items_flagged_list.append((resource_group))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(security_center)}
    # The metadata names this check. It previously carried the names of the
    # monitoring-agent check, which mislabelled the finding.
    metadata = {"finding_name": "next_generation_firewall_is_set_to_on",
                "negative_name": "next_generation_firewall_not_on",
                "columns": ["Resource Group"]}

    return {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def vulnerability_assessment_is_set_to_on_2_10(security_center):
    """
    Flag every policy whose properties.recommendations.vulnerabilityAssessment
    is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    items_flagged_list = []
    for item in security_center:
        resource_group = item['name']
        vulnerability_assessment = item['properties']['recommendations']['vulnerabilityAssessment']
        if vulnerability_assessment != "On":
            items_flagged_list.append((resource_group))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(security_center)}
    # The metadata names this check. It previously carried the names of the
    # monitoring-agent check, which mislabelled the finding.
    metadata = {"finding_name": "vulnerability_assessment_is_set_to_on",
                "negative_name": "vulnerability_assessment_not_on",
                "columns": ["Resource Group"]}

    return {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def storage_encryption_is_set_to_on_2_11(security_center):
    """
    Flag every policy whose properties.recommendations.storageEncryption is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['storageEncryption'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "storage_encryption_is_set_to_on",
                     "negative_name": "storage_encryption_not_on",
                     "columns": ["Resource Group"]},
    }

def just_in_time_access_is_set_to_on_2_12(security_center):
    """
    Flag every policy whose properties.recommendations.jitNetworkAccess is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['jitNetworkAccess'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "just_in_time_access_is_set_to_on",
                     "negative_name": "just_in_time_access_not_on",
                     "columns": ["Resource Group"]},
    }

def adaptive_application_controls_is_set_to_on_2_13(security_center):
    """
    Flag every policy whose properties.recommendations.appWhitelisting is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    items_flagged_list = []
    for item in security_center:
        resource_group = item['name']
        app_whitelisting = item['properties']['recommendations']['appWhitelisting']
        if app_whitelisting != "On":
            items_flagged_list.append((resource_group))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(security_center)}
    # negative_name follows the "<check>_not_on" pattern of the sibling checks
    # (it was misspelled "noto_on").
    metadata = {"finding_name": "adaptive_application_controls_is_set_to_on",
                "negative_name": "adaptive_application_controls_not_on",
                "columns": ["Resource Group"]}

    return {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def sql_auditing_and_threat_detection_is_set_to_on_2_14(security_center):
    """
    Flag every policy whose properties.recommendations.sqlAuditing is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['sqlAuditing'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "sql_auditing_and_threat_detection_is_set_to_on",
                     "negative_name": "sql_auditing_and_threat_detection_not_on",
                     "columns": ["Resource Group"]},
    }

def sql_encryption_is_set_to_on_2_15(security_center):
    """
    Flag every policy whose properties.recommendations.sqlTde is not "On".

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if policy['properties']['recommendations']['sqlTde'] != "On"
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "sql_encryption_is_set_to_on",
                     "negative_name": "sql_encryption_not_on",
                     "columns": ["Resource Group"]},
    }

def security_contact_emails_is_set_2_16(security_center):
    """
    Flag every policy whose securityContactConfiguration.securityContactEmails
    is empty or otherwise falsy.

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if not policy['properties']['securityContactConfiguration']['securityContactEmails']
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "security_contact_emails_is_set",
                     "negative_name": "security_contact_emails_not_set",
                     "columns": ["Resource Group"]},
    }

def security_contact_phone_number_is_set_2_17(security_center):
    """
    Flag every policy whose securityContactConfiguration.securityContactPhone
    is empty or otherwise falsy.

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    flagged = [
        policy['name']
        for policy in security_center
        if not policy['properties']['securityContactConfiguration']['securityContactPhone']
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(security_center)},
        "metadata": {"finding_name": "security_contact_phone_number_is_set",
                     "negative_name": "security_contact_phone_number_not_set",
                     "columns": ["Resource Group"]},
    }

def send_me_emails_about_alerts_is_set_to_on_2_18(security_center):
    """
    Flag every policy whose securityContactConfiguration.areNotificationsOn
    is not enabled.

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    items_flagged_list = []
    for item in security_center:
        resource_group = item['name']
        notifications = item['properties']['securityContactConfiguration']['areNotificationsOn']
        # NOTE(review): the API appears to report this flag as a JSON boolean rather
        # than the string "On" - confirm against a live response. Both spellings are
        # accepted so an enabled setting is never flagged by mistake.
        if notifications not in (True, "On"):
            items_flagged_list.append((resource_group))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(security_center)}
    # The names match this function's name (they previously read
    # "send_email_alerts_about_alerts...").
    metadata = {"finding_name": "send_me_emails_about_alerts_is_set_to_on",
                "negative_name": "send_me_emails_about_alerts_not_on",
                "columns": ["Resource Group"]}

    return {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def send_email_also_to_subscription_owners_is_set_to_on_2_19(security_center):
    """
    Flag every policy whose securityContactConfiguration.sendToAdminOn is not enabled.

    @security_center: list of policy dicts
    @returns: dict with "items" (names of flagged policies), "stats", "metadata"
    """
    items_flagged_list = []
    for item in security_center:
        resource_group = item['name']
        send_admin = item['properties']['securityContactConfiguration']['sendToAdminOn']
        # NOTE(review): the API appears to report this flag as a JSON boolean rather
        # than the string "On" - confirm against a live response. Both spellings are
        # accepted so an enabled setting is never flagged by mistake.
        if send_admin not in (True, "On"):
            items_flagged_list.append((resource_group))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(security_center)}
    # negative_name follows the "<check>_not_on" pattern of the sibling checks
    # (it previously duplicated finding_name).
    metadata = {"finding_name": "send_email_also_to_subscription_owners_is_set_to_on",
                "negative_name": "send_email_also_to_subscription_owners_not_on",
                "columns": ["Resource Group"]}

    return {"items": items_flagged_list, "stats": stats, "metadata": metadata}

In [None]:
# Fetch the raw policies, reload them from disk and preview the first two entries.
print(security_center_filtered_path)
get_data()
security_center = load_security_center(security_center_path)
security_center[0:2]

In [None]:
# Run the Security Center checks; the results are also written to security_center_filtered_path.
test_controls()

# Storage Account

In [None]:
# Raw-output files for the storage section, stored under raw_data_dir.
activity_logs_path = os.path.join(raw_data_dir, 'activity_logs.json')
storage_accounts_path = os.path.join(raw_data_dir, 'storage_accounts.json')

def get_storage_accounts(storage_accounts_path):
    """
    Query Azure api for storage accounts info and save to disk
    """
    storage_accounts = !az storage account list
    storage_accounts = yaml.safe_load(storage_accounts.nlstr)
        
    with open(storage_accounts_path, 'w') as f:
        json.dump(storage_accounts, f, indent=4, sort_keys=True)
    return storage_accounts

def load_storage_accounts(storage_accounts_path):
    """
    Load the storage account list previously saved by get_storage_accounts().

    @storage_accounts_path: string - path to the json file to read
    @returns: the parsed storage account list
    """
    # Written with json.dump, so read with json.load rather than a YAML parser.
    with open(storage_accounts_path, 'r') as f:
        return json.load(f)

# How far back get_activity_logs() asks the activity log to go.
activity_logs_starttime_timedelta = datetime.timedelta(days=90)
def get_start_time(timedelta=datetime.timedelta(days=90)):
    """
    Given datetime.timedelta(days=days, hours=hours), return the instant that
    far in the past as a UTC string, e.g. 2018-02-25T13:45:00Z

    @timedelta: datetime.timedelta - how far before now the start time lies
    @returns: string - formatted as %Y-%m-%dT%H:%M:%SZ
    """
    # The trailing 'Z' denotes UTC, so the arithmetic starts from the current
    # UTC time. Local time would be off by the host's UTC offset.
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    return (utc_now - timedelta).strftime("%Y-%m-%dT%H:%M:%SZ")

def get_activity_logs(activity_logs_path, resource_groups):
    activity_logs = {}
    start_time = get_start_time(activity_logs_starttime_timedelta)
    for resource_group in resource_groups:
        resource_group = resource_group['name']
        activity_log = !az monitor activity-log list --resource-group {resource_group} --start-time {start_time}
        activity_log = yaml.safe_load(activity_log.nlstr)
        activity_logs[resource_group] = activity_log
    with open(activity_logs_path, 'w') as f:
        json.dump(activity_logs, f, indent=4, sort_keys=True)
    return activity_logs    

def load_activity_logs(activity_logs_path):
    """
    Load the activity logs previously saved by get_activity_logs().

    @activity_logs_path: string - path to the json file to read
    @returns: dict - resource group name -> activity log
    """
    # Written with json.dump, so read with json.load rather than a YAML parser.
    with open(activity_logs_path, 'r') as f:
        return json.load(f)



#################
# Tests
#################

def secure_transfer_required_is_set_to_enabled_3_1(storage_accounts):
    """
    Flag every storage account whose enableHttpsTrafficOnly is not True.

    @storage_accounts: list of storage account dicts
    @returns: dict with "items" [(resource group, account name)], "stats", "metadata"
    """
    flagged = [
        (acct['resourceGroup'], acct['name'])
        for acct in storage_accounts
        if acct['enableHttpsTrafficOnly'] != True
    ]
    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged), "items_checked": len(storage_accounts)},
        "metadata": {"finding_name": "secure_transfer_required_is_set_to_enabled",
                     "negative_name": "secure_transfer_required_not_enabled",
                     "columns": ["Resource Group", "Storage Account Name"]},
    }
            

def storage_service_encryption_is_set_to_enabled_for_blob_service_3_2(storage_accounts):
    """
    Flag every storage account that has a blob encryption entry whose
    'enabled' value is not True. Accounts with no (falsy) blob entry are skipped.

    @storage_accounts: list of storage account dicts
    @returns: dict with "items" [(resource group, account name)], "stats", "metadata"
    """
    flagged = []
    for acct in storage_accounts:
        blob_service = acct['encryption']['services']['blob']
        if not blob_service:
            continue
        if blob_service['enabled'] != True:
            flagged.append((acct['resourceGroup'], acct['name']))

    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(storage_accounts)},
        "metadata": {"finding_name": "storage_service_encryption_is_set_to_enabled_for_blob_service",
                     "negative_name": "storage_service_encryption_not_enabled_for_blob_service",
                     "columns": ["Resource Group","Storage Account Name"]},
    }
           

# may need to run section 6 Networking first to get activity_log
def storage_account_access_keys_are_periodically_regenerated_3_3(activity_logs, storage_accounts, resource_groups, max_rotation_days=90):
    """
    Flag storage accounts whose access keys have not been successfully
    regenerated within the last max_rotation_days days.

    @activity_logs: dict - resource group name -> list of activity-log entries
    @storage_accounts: list of storage account dicts (needs 'name', 'resourceGroup')
    @resource_groups: list of resource group dicts (needs 'name')
    @max_rotation_days: int - maximum accepted age of the latest key rotation
    @returns: dict with "items" [(resource group, storage account, latest
        rotation date or "No rotation")], "stats", "metadata"
    """
    regenerate_action = "Microsoft.Storage/storageAccounts/regenerateKey/action"
    # NOTE(review): accepts both "Success" and "Succeeded" because the exact
    # localizedValue reported for completed operations was not confirmed.
    success_values = ("Success", "Succeeded")

    # Latest successful rotation per storage account, keyed by lower-cased name.
    most_recent_rotations = {}
    for resource_group in resource_groups:
        # A resource group without a saved log simply contributes no rotations.
        for log in activity_logs.get(resource_group['name']) or []:
            authorization = log.get("authorization") or {}
            if authorization.get("action") != regenerate_action:
                continue
            if (log.get("status") or {}).get("localizedValue") not in success_values:
                continue
            # The storage account name is the last path segment of the scope:
            # /subscriptions/<id>/resourceGroups/<rg>/providers/Microsoft.Storage/storageAccounts/<name>
            account_key = (authorization.get("scope") or "").rstrip('/').split('/')[-1].lower()
            event_day = datetime.datetime.strptime(log["eventTimestamp"].split('T')[0], '%Y-%m-%d')
            previous = most_recent_rotations.get(account_key)
            if previous is None or event_day > previous:
                most_recent_rotations[account_key] = event_day

    oldest_allowed = datetime.datetime.now() - datetime.timedelta(days=max_rotation_days)
    items_flagged_list = []
    for storage_account in storage_accounts:
        resource_group_name = storage_account["resourceGroup"]
        storage_account_name = storage_account['name']
        last_rotation = most_recent_rotations.get(storage_account_name.lower())
        if last_rotation is None:
            items_flagged_list.append((resource_group_name, storage_account_name, "No rotation"))
        elif last_rotation < oldest_allowed:
            items_flagged_list.append((resource_group_name, storage_account_name, str(last_rotation)))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(storage_accounts)}
    metadata = {"finding_name": "storage_account_access_keys_are_periodically_regenerated",
                "negative_name": "storage_account_access_keys_not_periodically_regenerated",
                "columns": ["Resource Group", "Storage Account", "Rotation Date"]}

    return {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def shared_access_signature_tokens_expire_within_an_hour_3_4(storage_accounts):
    """
    Placeholder for a manual check: no automation is currently possible,
    so nothing is evaluated and None is returned.
    """
    return None

def shared_access_signature_tokens_are_allowed_only_over_https_3_5(storage_accounts):
    """
    Placeholder for a manual check: no automation is currently possible,
    so nothing is evaluated and None is returned.
    """
    return None
                                      
def storage_service_encryption_is_set_to_enabled_for_file_service_3_6(storage_accounts):
    """
    Flag every storage account that has a file encryption entry whose
    'enabled' value is not True. Accounts with no (falsy) file entry are skipped.

    @storage_accounts: list of storage account dicts
    @returns: dict with "items" (names of flagged accounts), "stats", "metadata"
    """
    flagged = []
    for acct in storage_accounts:
        file_service = acct['encryption']['services']['file']
        if not file_service:
            continue
        if file_service['enabled'] != True:
            flagged.append(acct['name'])

    return {
        "items": flagged,
        "stats": {'items_flagged': len(flagged),
                  'items_checked': len(storage_accounts)},
        "metadata": {"finding_name": "storage_service_encryption_is_set_to_enabled_for_file_service",
                     "negative_name": "storage_service_encryption_not_enabled_for_file_service",
                     "columns": ["Storage Account Name"]},
    }

def public_access_level_is_set_to_private_for_blob_containers_3_7(storage_accounts):
    items_flagged_list = []
    items_checked = 0
    for account in storage_accounts:
        account_name = account["name"]
        resource_group = account["resourceGroup"]
        # get a key that works.  likely this will be a specific key not key[0]
        keys = !az storage account keys list --account-name {account_name} --resource-group {resource_group}
        keys = yaml.safe_load(keys.nlstr)
        key = keys[0]
        container_list = !az storage container list --account-name {account_name} --account-key {account_key}
        container_list = yaml.load(container_list.nlstr)
        for container in container_list:
            print(container)
            items_checked += 1
            public_access = container["properties"]["public_access"]
            if public_access == True:
                items_flagged_list.append((account_name, container))
    stats = {'items_flagged': len(items_flagged_list), "items_checked": items_checked}
    metadata = {"finding_name": "public_access_level_is_set_to_private_for_blob_containers",
                "negative_name": "public_access_level_not_private_for_blob_containers",
                "columns": ["Storage Account Name", "Container"]}
    
    return {"items": items_flagged_list, "stats": stats, "metadata": metadata }

    
def get_data():
    """
    Fetch the raw data used by the storage account checks.

    Calls get_resource_groups, get_activity_logs and get_storage_accounts in
    that order, using the module-level *_path variables; nothing is returned.
    """
    groups = get_resource_groups(resource_groups_path)
    get_activity_logs(activity_logs_path, groups)
    get_storage_accounts(storage_accounts_path)

def test_controls():
    """
    Run the storage account checks on the previously stored raw data.

    The per-finding results are written as json to
    <scan_data_dir>/filtered/storage_accounts_filtered.json and also returned.
    """
    resource_groups = load_resource_groups(resource_groups_path)
    storage_accounts = load_storage_accounts(storage_accounts_path)
    activity_logs = load_activity_logs(activity_logs_path)

    storage_results = {
        'secure_transfer_required_is_set_to_enabled':
            secure_transfer_required_is_set_to_enabled_3_1(storage_accounts),
        'storage_service_encryption_is_set_to_enabled_for_blob_service':
            storage_service_encryption_is_set_to_enabled_for_blob_service_3_2(storage_accounts),
        'storage_account_access_keys_are_periodically_regenerated':
            storage_account_access_keys_are_periodically_regenerated_3_3(activity_logs, storage_accounts, resource_groups),
        'storage_service_encryption_is_set_to_enabled_for_file_service':
            storage_service_encryption_is_set_to_enabled_for_file_service_3_6(storage_accounts),
    }
    # Check 3.7 is currently disabled:
    #storage_results['public_access_level_is_set_to_private_for_blob_containers'] = public_access_level_is_set_to_private_for_blob_containers_3_7(storage_accounts)

    filtered_path = os.path.join(scan_data_dir, 'filtered', 'storage_accounts_filtered.json')
    with open(filtered_path, 'w') as out_file:
        json.dump(storage_results, out_file, indent=4, sort_keys=True)
    return storage_results


In [None]:
# Show where the raw storage account data is stored, then fetch the raw data.
print(storage_accounts_path)
get_data()

In [None]:
# Run the checks defined above; the returned results are displayed as the cell output.
test_controls()

In [None]:
#generate_finding??

In [None]:
# Show the current working directory of the notebook's shell.
!pwd

# SQL Services

### SQL Servers

In [None]:
# If editing utils.py you may need to reload here.
# from importlib import reload
# reload(utils)

In [None]:
# %load /praetorian-tools/azure_cis_scanner/scanner/sql_auditing.py
# Generate files in azcli_out
import utils
import yaml

# Locations of the raw data for the SQL server checks (both under raw_data_dir).
# Note: get_sql_server_policies writes the policies file with yaml.dump despite the .json extension.
sql_servers_path = os.path.join(raw_data_dir, 'sql_servers.json')
sql_server_policies_path = os.path.join(raw_data_dir, 'sql_server_policies.json')

def get_data():
    """
    Fetch the SQL servers, then fetch the policies of each of those servers.

    Uses the module-level sql_servers_path and sql_server_policies_path; returns nothing.
    """
    servers = get_sql_servers(sql_servers_path)
    get_sql_server_policies(sql_server_policies_path, servers)

def get_sql_servers(sql_servers_path) :
    """
    Run "az sql server list" through utils.call and store the jsonified result.

    @sql_servers_path: string - path of the json file to write
    @returns: the value produced by utils.jsonify for the command output
    """
    servers = utils.jsonify(utils.call("az sql server list"))
    with open(sql_servers_path, 'w') as out_file:
        json.dump(servers, out_file, indent=4, sort_keys=True)
    return servers

def get_sql_server_policies(sql_server_policies_path, sql_servers):
    """
    Collect audit, threat detection and AD admin settings for every SQL server.

    @sql_server_policies_path: string - path of the file written with yaml.dump
    @sql_servers: iterable of server dicts with "name" and "resourceGroup"
    @returns: dict keyed by (resource_group, server_name) holding the three policy entries
    """
    subscription = utils.get_subscription_id()
    policies_by_server = {}
    for server in sql_servers:
        name = server['name']
        group = server['resourceGroup']
        policies_by_server[(group, name)] = {
            'audit_policy': get_sql_server_audit_policies(subscription, group, name),
            'threat_detection_policy': get_sql_server_threat_detection_policies(subscription, group, name),
            'active_directory_admin_configurations': get_sql_server_active_directory_admin_configuration(subscription, group, name),
        }
    with open(sql_server_policies_path, 'w') as out_file:
        yaml.dump(policies_by_server, out_file)
    return policies_by_server

def load_sql_servers(sql_servers_path):
    """
    Read the stored SQL server list.

    @sql_servers_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(sql_servers_path, 'r') as in_file:
        return yaml.load(in_file)

def load_sql_server_policies(sql_server_policies_path):
    """
    Read the stored SQL server policies.

    @sql_server_policies_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(sql_server_policies_path, 'r') as in_file:
        return yaml.load(in_file)
    
# This function will be recentered around Azure Command Line, after such an option becomes available.
def get_sql_server_audit_policies(subscriptionId, resource_group, server_name):
    """
    Request the default auditing settings of one SQL server from the management endpoint.

    @returns: the response of utils.make_request passed through utils.jsonify
    """
    endpoint = "".join([
        "https://management.azure.com/subscriptions/", subscriptionId,
        "/resourceGroups/", resource_group,
        "/providers/Microsoft.Sql/servers/", server_name,
        "/auditingSettings/Default?api-version=2015-05-01-preview",
    ])
    return utils.jsonify(utils.make_request(endpoint))

# This function will be recentered around Azure Command Line, after such an option becomes available.
def get_sql_server_threat_detection_policies(subscriptionId, resource_group, server_name):
    """
    Request the default security alert policy of one SQL server from the management endpoint.

    @returns: the response of utils.make_request passed through utils.jsonify
    """
    endpoint = "".join([
        "https://management.azure.com/subscriptions/", subscriptionId,
        "/resourceGroups/", resource_group,
        "/providers/Microsoft.Sql/servers/", server_name,
        "/securityAlertPolicies/Default?api-version=2015-05-01-preview",
    ])
    return utils.jsonify(utils.make_request(endpoint))

def get_sql_server_active_directory_admin_configuration(subscriptionId, resource_group, server_name):
    """
    List the Active Directory admins of one SQL server via the Azure CLI.

    ``subscriptionId`` is accepted for symmetry with the other fetchers but is not used.

    @returns: the output of utils.call passed through utils.jsonify
    """
    command = " ".join([
        "az sql server ad-admin list --resource-group", resource_group,
        "--server", server_name,
    ])
    return utils.jsonify(utils.call(command))

##################
# Tests
##################
def wrap(pre, post):
    """
    Build a decorator that runs ``pre`` before and ``post`` after the decorated function.

    ``pre`` is called as pre(func, *args, **kwargs); ``post`` is called as
    post(func, outcome, results, *args, **kwargs) where ``results`` is the
    module-level results dict. The decorated function's return value is passed through.
    """
    def decorator(func):
        def call(*args, **kwargs):
            pre(func, *args, **kwargs)
            outcome = func(*args, **kwargs)
            post(func, outcome, results, *args, **kwargs)
            return outcome
        return call
    return decorator

def remove_section_digits(name):
    """
    Strip the numeric section suffix from a check name.

    The name is split on '_'. Non-numeric words are always kept. A numeric word
    is kept only when it is not one of the last two words and the word after it
    is not numeric - this keeps the 90 in ...than_90_days_4_1_7.
    """
    words = name.split('_')
    lookahead_limit = len(words) - 2

    def keep(position, word):
        if not word.isdigit():
            return True
        return position < lookahead_limit and not words[position + 1].isdigit()

    return '_'.join(word for position, word in enumerate(words) if keep(position, word))

def trace_in(func, *args, **kwargs):
    """Hook run before each wrapped check; intentionally does nothing."""
    return None

def trace_out(func, result, *args, **kwargs):
    """
    Hook run after each wrapped check; accumulates the outcome in the module-level ``results``.

    The finding name is the check's function name without its section digits.
    Every call increases the finding's items_checked by one; a falsy ``result``
    appends (kwargs['resource_group'], kwargs['server_name']) to the finding's items.
    """
    name = remove_section_digits(func.__name__)
    previous = results.get(name, {})
    flagged = previous["items"] if previous else []
    checked = (previous["stats"]["items_checked"] if previous else 0) + 1
    if not result:
        flagged.append((kwargs['resource_group'], kwargs['server_name']))

    results[name] = {"items": flagged, "stats": {"items_checked": checked}}
        
# Accumulator filled by trace_out while the wrapped checks run; reset at the end of test_controls.
results = {}

def test_controls() :
    """
    Run the SQL server checks 4.1.1 - 4.1.8 for every stored server policy.

    The wrapped checks record their outcome in the module-level ``results``;
    this function turns that into the per-finding output, writes it with
    yaml.dump to <scan_data_dir>/filtered/sql_servers_filtered.json, clears
    ``results`` and returns the output.
    """
    global results
    sql_servers = load_sql_servers(sql_servers_path)
    sql_server_policies = load_sql_server_policies(sql_server_policies_path)

    for (resource_group, server_name), sql_server_policy in sql_server_policies.items():
        audit_policy = sql_server_policy['audit_policy']
        threat_policy = sql_server_policy['threat_detection_policy']
        ad_admin_configurations = sql_server_policy['active_directory_admin_configurations']
        # The checks receive the server identity as keyword arguments; trace_out reads them back.
        server = {"resource_group": resource_group, "server_name": server_name}

        auditing_is_set_to_on_4_1_1(audit_policy, **server)
        threat_detection_is_set_to_on_4_1_2(threat_policy, **server)
        threat_detection_types_is_set_to_all_4_1_3(threat_policy, **server)
        send_alerts_to_is_set_4_1_4(threat_policy, **server)
        email_service_and_co_administrators_is_enabled_4_1_5(threat_policy, **server)
        auditing_retention_is_greater_than_90_days_4_1_6(audit_policy, **server)
        threat_detection_retention_is_greater_than_90_days_4_1_7(threat_policy, **server)
        azure_active_directory_admin_is_configured_4_1_8(ad_admin_configurations, **server)

    stats_results = {}
    for finding, finding_result in results.items():
        flagged = finding_result["items"]
        stats_results[finding] = {
            "items": flagged,
            "stats": {'items_flagged': len(flagged),
                      'items_checked': finding_result["stats"]["items_checked"]},
            "metadata": {"finding_name": finding,
                         "negative_name": "",
                         "columns": ["Region", "Server"]},
        }

    filtered_path = os.path.join(scan_data_dir, 'filtered', 'sql_servers_filtered.json')
    with open(filtered_path, 'w') as out_file:
        yaml.dump(stats_results, out_file)
    # clear results for next run
    results = {}
    return stats_results

@wrap(trace_in, trace_out)
def auditing_is_set_to_on_4_1_1(sql_server_audit_policies, resource_group=None, server_name=None):
    """Pass (True) unless the audit policy's properties.state is "Disabled"."""
    return sql_server_audit_policies["properties"]["state"] != "Disabled"

@wrap(trace_in, trace_out)
def threat_detection_is_set_to_on_4_1_2(sql_server_threat_detection_policies, resource_group=None, server_name=None):
    """Pass (True) unless the threat detection policy's properties.state is "Disabled"."""
    return sql_server_threat_detection_policies["properties"]["state"] != "Disabled"
    
@wrap(trace_in, trace_out)
def threat_detection_types_is_set_to_all_4_1_3(sql_server_threat_detection_policies, resource_group=None, server_name=None):
    """Pass (True) when threat detection is not disabled and disabledAlerts is the empty string."""
    properties = sql_server_threat_detection_policies["properties"]
    if properties["state"] == "Disabled":
        return False
    return properties["disabledAlerts"] == ""

@wrap(trace_in, trace_out)
def send_alerts_to_is_set_4_1_4(sql_server_threat_detection_policies, resource_group=None, server_name=None):
    """
    Pass (True) when threat detection is not disabled and alert recipients are configured.

    The original condition was inverted: a policy with a non-empty
    emailAddresses value was reported as failing, and an empty one as passing.
    A missing or empty emailAddresses entry now fails the check.
    """
    properties = sql_server_threat_detection_policies["properties"]
    if properties["state"] == "Disabled":
        return False
    # Truthiness covers an empty string, an empty list and a missing/None value alike.
    return bool(properties.get("emailAddresses"))

@wrap(trace_in, trace_out)
def email_service_and_co_administrators_is_enabled_4_1_5(sql_server_threat_detection_policies, resource_group=None, server_name=None):
    """
    Pass (True) when threat detection is not disabled and emailAccountAdmins is enabled.

    The original compared emailAccountAdmins against "" and failed on every
    non-empty value, including "Enabled". The comparison now matches the
    database-level check, which treats "Enabled" as passing.
    """
    properties = sql_server_threat_detection_policies["properties"]
    if properties["state"] == "Disabled":
        return False
    email_account_admins = properties.get("emailAccountAdmins")
    # NOTE(review): a boolean True is accepted as well in case the API reports a bool - confirm.
    return email_account_admins is True or email_account_admins == "Enabled"

@wrap(trace_in, trace_out)
def auditing_retention_is_greater_than_90_days_4_1_6(sql_server_audit_policies, resource_group=None, server_name=None):
    """
    Pass (True) when auditing is not disabled and retentionDays is 0 or above 90.
    """
    properties = sql_server_audit_policies["properties"]
    if properties["state"] == "Disabled":
        return False
    retention_days = int(properties["retentionDays"])
    return retention_days == 0 or retention_days > 90
    
@wrap(trace_in, trace_out)
def threat_detection_retention_is_greater_than_90_days_4_1_7(sql_server_threat_detection_policies, resource_group=None, server_name=None):
    """
    Pass (True) when threat detection is not disabled and retentionDays is 0 or above 90.
    """
    properties = sql_server_threat_detection_policies["properties"]
    if properties["state"] == "Disabled":
        return False
    retention_days = int(properties["retentionDays"])
    return retention_days == 0 or retention_days > 90
    
@wrap(trace_in, trace_out)
def azure_active_directory_admin_is_configured_4_1_8(sql_server_active_directory_admin_configurations, resource_group=None, server_name=None):
    """Pass (True) when the AD admin configuration is non-empty (truthy)."""
    return bool(sql_server_active_directory_admin_configurations)

In [None]:
# Fetch the raw SQL server data and policies.
get_data()

In [None]:
# Run the SQL server checks; the returned results are displayed as the cell output.
test_controls()

### SQL Databases

In [None]:
# %%writefile /praetorian-tools/azure_cis_scanner/scanner/sql_databases.py

# Locations of the raw and filtered data for the SQL database checks.
# Note: the functions below write these files with yaml.dump despite the .json extension.
sql_databases_path = os.path.join(raw_data_dir, 'sql_databases.json')
sql_database_policies_path = os.path.join(raw_data_dir, 'sql_database_policies.json')
sql_databases_filtered_path = os.path.join(scan_data_dir, 'filtered', 'sql_databases_filtered.json')

def get_sql_servers(sql_servers_path):
    sql_servers = !az sql server list
    sql_servers = yaml.load(sql_servers.nlstr)
    with open(sql_servers_path, 'w') as f:
        yaml.dump(sql_servers, f)
    return sql_servers

def load_sql_servers(sql_servers_path):
    """
    Read the stored SQL server list.

    @sql_servers_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(sql_servers_path, 'r') as in_file:
        return yaml.load(in_file)

def get_dbs(sql_servers, sql_databases_path):
    server_dbs = {}
    for server in sql_servers:
        server_name = server['name']
        resource_group = server['resourceGroup']
        dbs = !az sql db list --resource-group $resource_group --server $server_name
        dbs = yaml.load(dbs.nlstr)
        server_dbs[(resource_group, server_name)] = dbs
        
    with open(sql_databases_path, 'w') as f:
        yaml.dump(server_dbs, f)
    return server_dbs

def load_dbs(sql_databases_path):
    """
    Read the stored per-server database lists.

    @sql_databases_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(sql_databases_path, 'r') as in_file:
        return yaml.load(in_file)

def get_sql_database_policies(sql_dbs, sql_database_policies_path):
    """
    For each db in sql_dbs fetch the policies and write to disk
    """
    sql_database_policies = {}
    for (resource_group, server_name), dbs in sql_dbs.items():
        for db in dbs:
            print(db)
            db_name = db['name']
            if db_name == 'master':
                continue
            threat_policy = !az sql db threat-policy show --resource-group {resource_group} --server {server_name} --name {db_name}
            threat_policy = yaml.load(threat_policy.nlstr)

            audit_policy = !az sql db audit-policy show --resource-group {resource_group} --server {server_name} --name {db_name}
            audit_policy = yaml.load(audit_policy.nlstr)

            tde_policy = !az sql db tde show --resource-group {resource_group} --server {server_name} --database {db_name}
            tde_policy = yaml.load(tde_policy.nlstr)

            sql_policy = {}
            sql_policy['threat'] = threat_policy
            sql_policy['audit'] = audit_policy
            sql_policy['tde'] = tde_policy
            sql_database_policies[(resource_group, server_name, db_name)] = sql_policy
        
    with open(sql_database_policies_path, 'w') as f:
        yaml.dump(sql_database_policies, f)
    return sql_database_policies

def load_sql_database_policies(sql_policies_path):
    """
    Load sql database policies

    @sql_policies_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    # The original opened the module-level sql_database_policies_path and ignored this argument.
    with open(sql_policies_path, 'r') as f:
        sql_database_policies = yaml.load(f)
    return sql_database_policies

################
# Tests
################

def remove_section_digits(name):
    """
    Return ``name`` without its numeric section suffix.

    Words are separated by '_'. A numeric word survives only if it is followed
    by a non-numeric word and is not one of the last two words, so the 90 in
    ...than_90_days_4_1_7 is kept while the trailing 4_1_7 is removed.
    """
    parts = name.split('_')
    kept = []
    for index in range(len(parts)):
        part = parts[index]
        if part.isdigit():
            followed_by_text = index < len(parts) - 2 and not parts[index + 1].isdigit()
            if not followed_by_text:
                continue
        kept.append(part)
    return '_'.join(kept)
    
# Accumulator filled by trace_out while the wrapped checks run; reset at the end of test_controls.
results = {}
def test_controls():
    """
    Run the SQL database checks 4.2.1 - 4.2.8 for every stored database policy.

    Each check is wrapped so that its outcome is recorded in the module-level
    ``results``: every call counts as one checked item and a falsy outcome adds
    (resource_group, server_name, db) to the finding's items. The per-finding
    output is written with yaml.dump to sql_databases_filtered_path, ``results``
    is cleared, and the output is returned.

    Fixes compared to the previous version:
    - the retention checks read 'retentionDays' consistently (the old code also
      read a 'retention_days' key and 4.2.8 compared a boolean with 90);
    - finding names are built with the module-level remove_section_digits, so the
      90 in "..._than_90_days" is no longer stripped by a nested variant.
    """
    global results

    def wrap(pre, post):
        def decorate(func):
            def call(*args, **kwargs):
                pre(func, *args, **kwargs)
                result = func(*args, **kwargs)
                post(func, result, results, *args, **kwargs)
                return result
            return call
        return decorate

    def trace_in(func, *args, **kwargs):
        pass

    def trace_out(func, result, *args, **kwargs):
        name = remove_section_digits(func.__name__)
        finding_results = results.get(name, {})
        if finding_results:
            items_flagged_list = finding_results["items"]
            items_checked = finding_results["stats"]["items_checked"]
        else:
            items_flagged_list = []
            items_checked = 0
        items_checked += 1
        if not result:
            # resource_group, server_name and db are the loop variables of the
            # enclosing test_controls call at the time the check runs.
            items_flagged_list.append((resource_group, server_name, db))

        results[name] = {"items": items_flagged_list, "stats": {"items_checked": items_checked}}

    def retention_is_sufficient(policy):
        # Only a retention between 1 and 90 days fails; 0 and anything above 90 pass.
        retention_days = int(policy['retentionDays'])
        return not (0 < retention_days <= 90)

    @wrap(trace_in, trace_out)
    def auditing_is_set_to_on_4_2_1(audit_policy):
        if audit_policy['state'] != 'Enabled':
            return False
        else:
            return True

    @wrap(trace_in, trace_out)
    def threat_detection_is_set_to_on_4_2_2(threat_policy):
        if threat_policy['state'] != 'Enabled':
            return False
        else:
            return True

    @wrap(trace_in, trace_out)
    def threat_detection_types_is_set_to_all_4_2_3(threat_policy):
        # NOTE(review): 'All' is accepted as passing here - confirm what the CLI reports.
        if threat_policy['disabledAlerts'] in ['All', '']:
            return True
        else:
            print('threat_detection_types_is_set_to_all_4_2_3 disabledAlerts', threat_policy['disabledAlerts'])
            return False

    @wrap(trace_in, trace_out)
    def send_alerts_to_is_set_4_2_4(threat_policy):
        if threat_policy['emailAddresses']:
            return set(threat_policy['emailAddresses'])
        else:
            return False

    @wrap(trace_in, trace_out)
    def email_service_and_co_administrators_is_enabled_4_2_5(threat_policy):
        if threat_policy['emailAccountAdmins'] != "Enabled":
            return False
        else:
            return True

    @wrap(trace_in, trace_out)
    def data_encryption_is_set_to_on_4_2_6(tde_policy):
        if tde_policy['status'] != "Enabled":
            return False
        else:
            return True

    @wrap(trace_in, trace_out)
    def auditing_retention_is_greater_than_90_days_4_2_7(audit_policy):
        return retention_is_sufficient(audit_policy)

    @wrap(trace_in, trace_out)
    def threat_retention_is_greater_than_90_days_4_2_8(threat_policy):
        return retention_is_sufficient(threat_policy)

    sql_database_policies = load_sql_database_policies(sql_database_policies_path)
    for (resource_group, server_name, db), sql_database_policy in sql_database_policies.items():

        audit_policy = sql_database_policy['audit']
        threat_policy = sql_database_policy['threat']
        tde_policy = sql_database_policy['tde']

        auditing_is_set_to_on_4_2_1(audit_policy)
        threat_detection_is_set_to_on_4_2_2(threat_policy)
        threat_detection_types_is_set_to_all_4_2_3(threat_policy)
        send_alerts_to_is_set_4_2_4(threat_policy)
        email_service_and_co_administrators_is_enabled_4_2_5(threat_policy)
        data_encryption_is_set_to_on_4_2_6(tde_policy)
        auditing_retention_is_greater_than_90_days_4_2_7(audit_policy)
        threat_retention_is_greater_than_90_days_4_2_8(threat_policy)

    stats_results = {}
    for finding in results:
        items_flagged_list = results[finding]["items"]
        items_checked = results[finding]["stats"]["items_checked"]
        stats = {'items_flagged': len(items_flagged_list),
                 'items_checked': items_checked}
        metadata = {"finding_name": finding,
                    "negative_name": "",
                    "columns": ["Region", "Server", "Database"]}
        stats_results[finding] = {"items": items_flagged_list, "stats": stats, "metadata": metadata}

    with open(sql_databases_filtered_path, 'w') as f:
        yaml.dump(stats_results, f)
    # clear results for next run
    results = {}
    return stats_results

def get_data():
    """
    Fetch SQL servers, their databases and the policies of each database.

    @returns: dict with "sql_servers", "sql_databases" and "sql_database_policies"
    """
    servers = get_sql_servers(sql_servers_path)
    databases = get_dbs(servers, sql_databases_path)
    policies = get_sql_database_policies(databases, sql_database_policies_path)

    return {
        "sql_servers": servers,
        "sql_databases": databases,
        "sql_database_policies": policies,
    }




In [None]:
# Show where the raw SQL database data is stored, then fetch the raw data.
print(sql_databases_path)
get_data()

In [None]:
# Run the SQL database checks; the returned results are displayed as the cell output.
test_controls()

# 5 Logging and Monitoring

In [None]:
# Generate files in raw_data_dir

# Raw data files for the logging and monitoring checks (all under raw_data_dir).
monitor_diagnostic_settings_path = os.path.join(raw_data_dir, 'monitor_diagnostic_settings.json')
activity_logs_path = os.path.join(raw_data_dir, 'activity_logs.json')

# Resource ids whose diagnostic settings are inspected, and the settings fetched for them.
resource_ids_for_diagnostic_settings_path = os.path.join(raw_data_dir, 'resource_ids_for_diagnostic_settings.json')
resource_diagnostic_settings_path = os.path.join(raw_data_dir, 'resource_diagnostic_settings.json')

# Output of test_controls for this section (under filtered_data_dir).
logging_and_monitoring_filtered_path = os.path.join(filtered_data_dir, 'logging_and_monitoring_filtered.json')

def get_resource_ids_for_diagnostic_settings():
    resource_ids = []
    # Other resource_ids could be gathered.  So far, only keyvault
    keyvaults = !az keyvault list    
    keyvaults = yaml.load(keyvaults.nlstr)
    for keyvault in keyvaults:
        resource_ids.append(keyvault['id'])
    with open(resource_ids_for_diagnostic_settings_path, 'w') as f:
        json.dump(resource_ids, f, indent=4, sort_keys=True)
    return resource_ids

def load_resource_ids_for_diagnostic_settings(resource_ids_for_diagnostic_settings_path):
    """
    Read the stored resource ids.

    @resource_ids_for_diagnostic_settings_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(resource_ids_for_diagnostic_settings_path, 'r') as in_file:
        return yaml.load(in_file)

def get_resource_diagnostic_settings(resource_ids_for_diagnostic_settings):
    keyvault_settings_list = []
    for resource_id in resource_ids_for_diagnostic_settings:
        keyvault_settings = !az monitor diagnostic-settings list --resource {resource_id}
        keyvault_settings = yaml.load(keyvault_settings.nlstr)
        *prefix, resource_group, _, _, _, keyvault_name = resource_id.split('/')
        for setting in keyvault_settings['value']:
            print('settings: ', keyvault_settings)
            setting['keyvault_name'] = keyvault_name
        keyvault_settings_list.append(keyvault_settings)
        
    with open(resource_diagnostic_settings_path, 'w') as f:
        yaml.dump(keyvault_settings_list, f)
    return resource_ids_for_diagnostic_settings 

def load_resource_diagnostic_settings(resource_diagnostic_settings_path):
    """
    Read the stored resource diagnostic settings.

    @resource_diagnostic_settings_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(resource_diagnostic_settings_path, 'r') as in_file:
        return yaml.load(in_file)
        
def get_monitor_diagnostic_settings(monitor_diagnostic_settings_path, resource_ids):
    """
    @monitor_diagnostic_settings_path: string - path to output json file
    @returns: list of activity_log_alerts dicts
    """
    monitor_diagnostic_settings_results = {}
    for resource_id in resource_ids:
        monitor_diagnostic_settings = !az monitor diagnostic-settings list --resource {resource_id}
        monitor_diagnostic_settings = yaml.load(monitor_diagnostic_settings.nlstr)
        monitor_diagnostic_settings_results[resource_id] = monitor_diagnostic_settings
    with open(monitor_diagnostic_settings_path, 'w') as f:
        json.dump(monitor_diagnostic_settings_results, f, indent=4, sort_keys=True)
    return monitor_diagnostic_settings_results

def load_monitor_diagnostic_settings(monitor_diagnostic_settings_path):
    """
    Read the stored monitor diagnostic settings.

    @monitor_diagnostic_settings_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(monitor_diagnostic_settings_path, 'r') as in_file:
        return yaml.load(in_file)

monitor_log_profiles_path = os.path.join(raw_data_dir, 'monitor_log_profiles.json')

def get_monitor_log_profiles(monitor_log_profiles_path):
    monitor_log_profiles = !az monitor log-profiles list
    monitor_log_profiles = yaml.load(monitor_log_profiles.nlstr)
    with open(monitor_log_profiles_path, 'w') as f:
        json.dump(monitor_log_profiles, f, indent=4, sort_keys=True)
    return monitor_log_profiles

def load_monitor_log_profiles(monitor_log_profiles_path):
    """
    Read the stored monitor log profiles.

    @monitor_log_profiles_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(monitor_log_profiles_path, 'r') as in_file:
        return yaml.load(in_file)


# How far back activity logs are requested by get_activity_logs.
activity_logs_starttime_timedelta = datetime.timedelta(days=90)
def get_start_time(timedelta=datetime.timedelta(days=90)):
    """
    Given datetime.timedelta(days=days, hours=hours), return string in iso tz format

    The result is the current UTC time minus ``timedelta``, formatted as
    "%Y-%m-%dT%H:%M:%SZ". The original subtracted from the local time while
    still appending the "Z" (UTC) suffix, which shifted the start time by the
    machine's UTC offset.
    """
    start = datetime.datetime.now(datetime.timezone.utc) - timedelta
    return start.strftime("%Y-%m-%dT%H:%M:%SZ")

def get_activity_logs(activity_logs_path, resource_groups):
    activity_logs = {}
    start_time = get_start_time(activity_logs_starttime_timedelta)
    for resource_group in resource_groups:
        resource_group = resource_group['name']
        activity_log = !az monitor activity-log list --resource-group {resource_group} --start-time {start_time}
        activity_log = yaml.load(activity_log.nlstr)
        activity_logs[resource_group] = activity_log
    with open(activity_logs_path, 'w') as f:
        json.dump(activity_logs, f, indent=4, sort_keys=True)
    return activity_logs    

def load_activity_logs(activity_logs_path):
    """
    Read the stored activity logs.

    @activity_logs_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(activity_logs_path, 'r') as in_file:
        return yaml.load(in_file)

activity_log_alerts_path = os.path.join(raw_data_dir, 'activity_log_alerts.json')

def get_activity_log_alerts(activity_log_alerts_path):
    activity_log_alerts = !az monitor activity-log alert list
    activity_log_alerts = yaml.load(activity_log_alerts.nlstr)
    with open(activity_log_alerts_path, 'w') as f:
        json.dump(activity_log_alerts, f, indent=4, sort_keys=True)
    return activity_log_alerts   

def load_activity_log_alerts(activity_log_alerts_path):
    """
    Read the stored activity log alerts.

    @activity_log_alerts_path: string - path of the file to parse with yaml.load
    @returns: the parsed content
    """
    with open(activity_log_alerts_path, 'r') as in_file:
        return yaml.load(in_file)

def get_data():
    """
    Fetch all raw data used by the logging and monitoring checks.

    The fetchers are called in a fixed order using the module-level *_path
    variables; nothing is returned.
    """
    resource_ids = get_resource_ids_for_diagnostic_settings()
    groups = get_resource_groups(resource_groups_path)
    get_monitor_log_profiles(monitor_log_profiles_path)
    get_monitor_diagnostic_settings(monitor_diagnostic_settings_path, resource_ids)
    get_activity_log_alerts(activity_log_alerts_path)
    get_activity_logs(activity_logs_path, groups)
    get_resource_diagnostic_settings(resource_ids)

    
    
##################
# Tests
##################

def test_controls():
    """
    Run the logging and monitoring checks on the data stored in raw_data_dir.

    All raw data files of this section are loaded first; the per-finding results
    are written as json to logging_and_monitoring_filtered_path and returned.
    """
    resource_ids_for_diagnostic_settings = load_resource_ids_for_diagnostic_settings(resource_ids_for_diagnostic_settings_path)
    resource_diagnostic_settings = load_resource_diagnostic_settings(resource_diagnostic_settings_path)
    resource_groups = load_resource_groups(resource_groups_path)
    monitor_log_profiles = load_monitor_log_profiles(monitor_log_profiles_path)
    monitor_diagnostic_settings = load_monitor_diagnostic_settings(monitor_diagnostic_settings_path)
    activity_log_alerts = load_activity_log_alerts(activity_log_alerts_path)
    activity_logs = load_activity_logs(activity_logs_path)

    results = {
        'a_log_profile_exists':
            a_log_profile_exists_5_1(monitor_log_profiles),
        'activity_log_retention_is_set_365_days_or_greater':
            activity_log_retention_is_set_365_days_or_greater_5_2(monitor_log_profiles),
        'activity_log_alert_is_configured':
            activity_log_alert_is_configured(activity_log_alerts, log_alert_policies),
        'logging_for_azure_keyvault_is_enabled':
            logging_for_azure_keyvault_is_enabled_5_13(resource_diagnostic_settings),
    }

    with open(logging_and_monitoring_filtered_path, 'w') as out_file:
        json.dump(results, out_file, indent=4, sort_keys=True)
    return results


def a_log_profile_exists_5_1(monitor_log_profiles):
    """
    Flag the subscription when no monitor log profile exists.

    @monitor_log_profiles: list of log profiles (empty or None when there is none)
    @returns: dict with "items" (["No log profile"] when flagged), "stats" and "metadata"
    """
    items_flagged_list = []
    if not monitor_log_profiles:
        items_flagged_list.append(("No log profile"))

    # Exactly one thing is checked - whether any profile exists. The original
    # reported items_checked as 0 whenever the control passed.
    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': 1}
    metadata = {"finding_name": "a_log_profile_exists",
                "negative_name": "",
                "columns": ["Monitor Log Profile"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}
    

# Todo, untested as we have [] for log-profiles
#@gen_results(results)
def activity_log_retention_is_set_365_days_or_greater_5_2(monitor_log_profiles, min_retention_days=None):
    """
    Flag log profiles whose retention is shorter than the required minimum.

    The original indexed the profile list with 'retentionPolicy' (instead of the
    profile), compared the policy itself with a number, and used "<=" which would
    flag a retention of exactly the minimum.

    @monitor_log_profiles: list of log profile dicts (may be empty or None)
    @min_retention_days: minimum number of days; defaults to MIN_ACTIVITY_LOG_RETENDION_DAYS
    @returns: dict with "items" ((profile name, retention days) tuples), "stats" and "metadata"
    """
    items_flagged_list = []
    for profile in monitor_log_profiles or []:
        if min_retention_days is None:
            min_retention_days = MIN_ACTIVITY_LOG_RETENDION_DAYS
        retention_policy = profile.get('retentionPolicy')
        # NOTE(review): assumes retentionPolicy is a mapping with a "days" entry; a bare
        # number is accepted as well - confirm against real log profile output.
        if isinstance(retention_policy, dict):
            days = retention_policy.get('days')
        else:
            days = retention_policy
        if days is not None:
            days = int(days)
        # A retention of 0 is treated as unlimited, as in the other retention checks
        # of this file; a profile without retention information is flagged.
        if days is None or (days != 0 and days < min_retention_days):
            items_flagged_list.append((profile.get('name'), days))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(monitor_log_profiles) if monitor_log_profiles else 1}
    metadata = {"finding_name": "activity_log_retention_is_set_365_days_or_greater",
                "negative_name": "",
                "columns": ["Monitor Log Profile", "Retention Days"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

# 5.3, 5.4, 5.5, 5.6, 5.7, 5.8, 5.9, 5.10, 5.11, 5.12    
# Activity-log alerts that are expected to be configured. Each entry pairs an
# alert_name with the operation_name an alert condition has to match; every
# entry starts with present: False.
log_alert_policies_str = '''
- alert_name: 'create_policy_assignment'
  operation_name: 'Microsoft.Authorization/policyAssignments/write'
  present: False
- alert_name: 'create_or_update_network_security_group'
  operation_name: 'Microsoft.Network/networkSecurityGroups/write'
  present: False
- alert_name: 'delete_network_security_group'
  operation_name: 'Microsoft.Network/networkSecurityGroups/delete'
  present: False
- alert_name: 'create_or_update_network_security_group_rule'
  operation_name: 'Microsoft.Network/networkSecurityGroups/securityRules/write'
  present: False
- alert_name: 'delete_network_security_group_rule'
  operation_name: 'Microsoft.Network/networkSecurityGroups/securityRules/delete'
  present: False
- alert_name: 'create_or_update_security_solution'
  operation_name: 'Microsoft.Security/securitySolutions/write'
  present: False
- alert_name: 'delete_security_solution'
  operation_name: 'Microsoft.Security/securitySolutions/delete'
  present: False
- alert_name: 'update_or_create_SQL_server_firewall_rule'
  operation_name: 'Microsoft.Sql/servers/firewallRules/write'
  present: False
- alert_name: 'delete_SQL_server_firewall_rule'
  operation_name: 'Microsoft.Sql/servers/firewallRules/delete'
  present: False
- alert_name: 'update_security_policy'
  operation_name: 'Microsoft.Security/policies/write'
  present: False
'''
# Parsed into a list of dicts. NOTE(review): yaml.load is called without an explicit
# Loader; the input here is the literal above, not external data.
log_alert_policies = yaml.load(log_alert_policies_str)

def activity_log_alert_is_configured(activity_log_alerts, log_alert_policies):
    """
    #TODO WIP
    Report which required activity-log alert policies have no matching alert.

    An alert matches a policy when one of the clauses in its
    condition['allOf'] list has an 'equals' value identical to the policy's
    'operation_name'.

    @activity_log_alerts: list of alert dicts (may be empty or None)
    @log_alert_policies: list of dicts with 'alert_name' and 'operation_name'
    @returns: dict with 'items' (one (alert_name, operation_name) tuple per
              policy that has no matching alert), 'stats' and 'metadata'

    Side effect: the 'present' key of every policy is overwritten with the
    result of THIS call. The original only ever flipped it to True, so a
    second call against the shared module-level list reported stale matches.
    """
    # Collect every operation name that some alert condition refers to.
    configured_operations = set()
    for log_alert in activity_log_alerts or []:
        alert_condition = log_alert.get('condition') or {}
        for clause in alert_condition.get('allOf') or []:
            operation = clause.get('equals')
            if isinstance(operation, str) and operation:
                configured_operations.add(operation)

    items_flagged_list = []
    for log_alert_policy in log_alert_policies:
        is_present = log_alert_policy['operation_name'] in configured_operations
        # Keep the flag in sync for any code that still reads it.
        log_alert_policy['present'] = is_present
        if not is_present:
            items_flagged_list.append((log_alert_policy['alert_name'], log_alert_policy['operation_name']))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(log_alert_policies)}
    # NOTE(review): each item has two fields but only one column is declared.
    metadata = {"finding_name": "activity_log_alert_is_configured",
                "negative_name": "",
                "columns": ["Missing Policy"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}
                                                                    
#@gen_results(results)
# (sic) misspelled name kept: it is referenced by the activity-log retention check earlier in this cell.
MIN_ACTIVITY_LOG_RETENDION_DAYS = 365
MIN_KEY_VAULT_RETENTION_DAYS = 180
def logging_for_azure_keyvault_is_enabled_5_13(resource_diagnostic_settings):
    """
    Flag key vaults whose diagnostic logging is missing, disabled, or retained
    for fewer than MIN_KEY_VAULT_RETENTION_DAYS days.

    @resource_diagnostic_settings: list of dicts, each with a 'value' list whose
        entries carry 'keyvault_name' and a 'logs' list of
        {'enabled', 'retentionPolicy': {'enabled', 'days'}} dicts
    @returns: dict with 'items' (tuples matching metadata['columns']),
              'stats' and 'metadata'
    """
    no_logging_row = ("False", "False", "None")
    items_flagged_list = []
    for setting in resource_diagnostic_settings:
        keyvault_settings_values = setting['value']
        if not keyvault_settings_values:
            # No diagnostic settings at all. The original referenced undefined
            # names here and appended a 5-tuple against 4 columns.
            # NOTE(review): assumes the vault name may be stored on the setting
            # itself as 'keyvault_name'; falls back to "Unknown" - confirm.
            keyvault_name = setting.get('keyvault_name', "Unknown")
            items_flagged_list.append((keyvault_name,) + no_logging_row)
            continue
        for value in keyvault_settings_values:
            keyvault_name = value['keyvault_name']
            logs = value.get('logs') or []
            if not logs:
                items_flagged_list.append((keyvault_name,) + no_logging_row)
                continue
            # Inspect every log entry; report the first one that fails so each
            # diagnostic setting yields at most one row.
            for log in logs:
                enabled = log['enabled']
                retention_enabled = log['retentionPolicy']['enabled']
                retention_days = log['retentionPolicy']['days']
                long_enough = retention_days is not None and retention_days >= MIN_KEY_VAULT_RETENTION_DAYS
                if not (enabled and retention_enabled and long_enough):
                    items_flagged_list.append((keyvault_name, enabled, retention_enabled, retention_days))
                    break

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(resource_diagnostic_settings)}
    metadata = {"finding_name": "logging_for_azure_keyvault_is_enabled",
                "negative_name": "",
                "columns": ["Keyvault", "Enabled", "Retention Enabled", "Retention Days"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}
              
    

In [None]:
# get_data() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
get_data()

In [None]:
# test_controls() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
test_controls()

# 6 Networking

In [None]:
##########################
# Get Raw Data
##########################

# Raw `az` output for this section is written under raw_data_dir; the results of
# test_controls() are written to networking_filtered_path.
network_flows_path = os.path.join(raw_data_dir, "network_flows.json")
resource_groups_path = os.path.join(raw_data_dir, "resource_groups.json")
networking_filtered_path = os.path.join(scan_data_dir, 'filtered', 'networking_filtered.json')

network_security_groups_path = os.path.join(raw_data_dir, "network_security_groups.json")

def get_data():
    """
    Download all raw data needed by the networking controls.

    Resource groups, NSGs, network watchers and per-NSG flow-log settings are
    fetched in that order; each getter writes its own json file.
    """
    get_resource_groups(resource_groups_path)
    nsgs = get_network_security_groups(network_security_groups_path)
    get_network_watcher(network_watcher_path)
    # Flow logs are queried once per NSG, so the NSG list is passed along.
    get_network_flows(network_flows_path, nsgs)
    
def get_network_security_groups(network_security_groups_path):
    """
    @network_path: string - path to output json file
    """
    network_security_groups = !az network nsg list
    network_security_groups = yaml.load(network_security_groups.nlstr)
    with open(network_security_groups_path, 'w') as f:
        json.dump(network_security_groups, f, indent=4, sort_keys=True)
    return network_security_groups

def load_network_security_groups(network_security_groups_path):
    """
    Read back the file written by get_network_security_groups.

    @network_security_groups_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(network_security_groups_path, 'r') as in_file:
        return yaml.load(in_file)

# Output file for get_network_watcher / input file for load_network_watcher.
network_watcher_path = os.path.join(raw_data_dir, "network_watcher.json")
# NOTE(review): not referenced anywhere in the visible part of this notebook.
approved_regions = []
def get_network_watcher(network_watcher_path):
    """
    @network_watcher_path: string - path to output json file
    """
    network_watcher = !az network watcher list
    network_watcher = yaml.load(network_watcher.nlstr)
    with open(network_watcher_path, 'w') as f:
        json.dump(network_watcher, f, indent=4, sort_keys=True)
    return network_watcher

def load_network_watcher(network_watcher_path):
    """
    Read back the file written by get_network_watcher.

    @network_watcher_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(network_watcher_path, 'r') as in_file:
        return yaml.load(in_file)

def get_network_flows(network_flows_path, network_security_groups):
    """
    @network_flows_path: string - path to output json file
    @network_security_groups: list of nsgs
    @returns: list of network flow dicts
    """
    network_flows = []
    for nsg in network_security_groups:
        resource_group = nsg['resourceGroup']
        nsg_id = nsg['id']
        network_flow = !az network watcher flow-log show --resource-group {resource_group} --nsg {nsg_id}
        network_flow = yaml.load(network_flow.nlstr)
        nsg_name = nsg["name"]
        network_flows.append({"resource_group": resource_group, "nsg_name": nsg_name, "network_flow": network_flow})
        
    with open(network_flows_path, 'w') as f:
        json.dump(network_flows, f, indent=4, sort_keys=True)
    return network_flows

def load_network_flows(network_flows_path):
    """
    Read back the file written by get_network_flows.

    @network_flows_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(network_flows_path, 'r') as in_file:
        return yaml.load(in_file)

##########################
# Tests
##########################

def test_controls():
    """
    Run the networking controls against the saved raw data.

    The findings of every control are collected in one dict, written to
    networking_filtered_path as json, and returned.
    """
    watchers = load_network_watcher(network_watcher_path)
    nsgs = load_network_security_groups(network_security_groups_path)
    # Loaded as in the original even though no control below consumes it.
    resource_groups = load_resource_groups(resource_groups_path)
    flows = load_network_flows(network_flows_path)

    networking_results = {
        'access_is_restricted_from_the_internet':
            access_is_restricted_from_the_internet_6_1(nsgs),
        'network_security_group_flow_log_retention_period_is_greater_than_90_days':
            network_security_group_flow_log_retention_period_is_greater_than_90_days_6_4(flows),
        'network_watcher_is_enabled':
            network_watcher_is_enabled_6_5(watchers),
    }

    with open(networking_filtered_path, 'w') as out_file:
        json.dump(networking_results, out_file, indent=4, sort_keys=True)
    return networking_results

# 6.1, 6.2, 
def access_is_restricted_from_the_internet_6_1(network_security_groups):
    """
    Flag NSG rules that allow inbound RDP (3389) or SSH (22) from the internet.

    A rule is flagged when it is an inbound Allow rule, its source is one of
    the "anywhere" spellings ('*', '0.0.0.0', 'internet', 'any', or any prefix
    ending in '/0'), and one of its destination port specs ('*', '22',
    '20-30', ...) includes the port. RDP additionally requires the protocol to
    be TCP or '*'; SSH is matched for any protocol, as in the original.

    Fixes over the original: comparisons are case-insensitive (it compared
    against 'TCP' and 'internet' exactly), port ranges and '*' are honoured,
    the list-valued fields are inspected too, and Deny rules are skipped.

    Rule priorities are not evaluated, so an Allow rule that is shadowed by a
    higher-priority Deny rule is still reported.

    @network_security_groups: list of nsg dicts with 'resourceGroup', 'name', 'securityRules'
    @returns: dict with 'items' of (resource group, nsg name, port, rule), 'stats', 'metadata'
    """
    internet_sources = {'*', '0.0.0.0', 'internet', 'any'}

    def _listed(rule, single_key, plural_key):
        """Merge the single-valued and list-valued forms of a rule field into one list of strings."""
        values = [rule.get(single_key)] + list(rule.get(plural_key) or [])
        return [str(value).strip() for value in values if value not in (None, '')]

    def _includes_port(port_spec, port):
        """True when a port spec such as '*', '22' or '1000-4000' includes port."""
        if port_spec == '*':
            return True
        low, _, high = port_spec.partition('-')
        try:
            return int(low) <= port <= int(high or low)
        except ValueError:
            return False

    def _from_internet(rule):
        """True when any source prefix of the rule means 'anywhere'."""
        return any(prefix.lower() in internet_sources or prefix.endswith('/0')
                   for prefix in _listed(rule, 'sourceAddressPrefix', 'sourceAddressPrefixes'))

    items_flagged_list = []
    for nsg in network_security_groups:
        for security_rule in nsg.get('securityRules') or []:
            if str(security_rule.get('direction') or '').lower() != 'inbound':
                continue
            if str(security_rule.get('access') or 'Allow').lower() != 'allow':
                continue
            if not _from_internet(security_rule):
                continue
            port_specs = _listed(security_rule, 'destinationPortRange', 'destinationPortRanges')
            protocol = str(security_rule.get('protocol') or '').lower()
            if protocol in ('tcp', '*') and any(_includes_port(spec, 3389) for spec in port_specs):
                items_flagged_list.append((nsg['resourceGroup'], nsg['name'], '3389', security_rule))
            if any(_includes_port(spec, 22) for spec in port_specs):
                items_flagged_list.append((nsg['resourceGroup'], nsg['name'], '22', security_rule))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(network_security_groups)}
    metadata = {"finding_name": "access_is_restricted_from_the_internet",
                "negative_name": "",
                "columns": ["Resource Group", "NSG", "Port", "Rule"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}


def sql_server_access_is_restricted_from_the_internet_6_3():
    """
    Placeholder for control 6.3: not implemented, always returns None.

    Powershell
    """
    pass

def network_security_group_flow_log_retention_period_is_greater_than_90_days_6_4(network_flows):
    """
    Flag NSGs whose flow logs are disabled or retained for fewer than 90 days.

    @network_flows: list of dicts with 'resource_group', 'nsg_name' and
        'network_flow' ({'enabled', 'retentionPolicy': {'days', 'enabled'}})
    @returns: dict with 'items' of (resource group, nsg name, status), 'stats', 'metadata'
    """
    items_flagged_list = []
    for network_flow in network_flows:
        flow = network_flow['network_flow']
        if flow['enabled'] == False:
            status = "Not enabled"
        else:
            retention = flow['retentionPolicy']
            if retention['days'] == 0:
                # Zero retention days is accepted without further checks, as in the original.
                continue
            if retention['days'] >= 90 and retention['enabled'] != False:
                continue
            # The original called status(...) instead of assigning it, which
            # raised on the first flow that reached this branch.
            status = "Days {}, Enabled {}".format(retention['days'], retention['enabled'])
        items_flagged_list.append((network_flow['resource_group'], network_flow['nsg_name'], status))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(network_flows)}
    metadata = {"finding_name": "network_security_group_flow_log_retention_period_is_greater_than_90_days",
                "negative_name": "",
                "columns": ["Resource Group", "Network Flow", "Status"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def network_watcher_is_enabled_6_5(network_watcher):
    """
    Flag network watchers whose provisioningState is not 'Succeeded'.

    The original reported this finding under the name and columns of control
    6.4 (copy/paste) and appended whole watcher dicts; each item is now a row
    that matches metadata['columns'].

    @network_watcher: list of watcher dicts with at least 'provisioningState'
    @returns: dict with 'items' of (resource group, watcher name, state), 'stats', 'metadata'
    """
    items_flagged_list = []
    for watcher in network_watcher:
        state = watcher['provisioningState']
        if state != 'Succeeded':
            # NOTE(review): 'resourceGroup' and 'name' are assumed to be present
            # in the watcher dicts; .get() yields None when they are not - confirm.
            items_flagged_list.append((watcher.get('resourceGroup'), watcher.get('name'), state))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(network_watcher)}
    metadata = {"finding_name": "network_watcher_is_enabled",
                "negative_name": "",
                "columns": ["Resource Group", "Network Watcher", "Provisioning State"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}


Beware of permission errors. They are visible when the command is run from the CLI, for example:

$ az network watcher flow-log show -g <my-group> --nsg <my-nsg>
The client '<user>@<domain>' with object id 'abdcxxx-asdlj-fdskl-yyy-asfd17' does not have authorization to perform action 'Microsoft.Network/networkWatchers/queryFlowLogStatus/action' over scope '/subscriptions/exxxxxx-cbbbb-4444-bbbb-aaaaaaa2e3/resourceGroups/NetworkWatcherRG/providers/Microsoft.Network/networkWatchers/NetworkWatcher_southcentralus'.

In [None]:
# get_data() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
get_data()

In [None]:
# test_controls() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
test_controls()

In [None]:
#generate_finding(findings_template_path, r_parsed, 'Virtual Machines', 2, output='', findings_output_path=findings_out_path )

In [None]:
# Shell escape that lists /praetorian-tools; its output is not used by any later cell shown here.
!ls /praetorian-tools


# Virtual Machines

In [None]:
# Virtual Machines

# test_controls() writes its findings to filtered_virtual_machines_path;
# get_virtual_machines() writes the raw `az vm list` output to virtual_machines_path.
filtered_virtual_machines_path = os.path.join(filtered_data_dir, 'virtual_machines_filtered.json')
virtual_machines_path = os.path.join(raw_data_dir, 'virtual_machines.json')

def get_virtual_machines(virtual_machines_path):
    """
    @virtual_machines_path: string - path to output json file
    @returns: list of virtual_machines dict
    """
    virtual_machines = !az vm list
    virtual_machines = yaml.load(virtual_machines.nlstr)
    with open(virtual_machines_path, 'w') as f:
        json.dump(virtual_machines, f, indent=4, sort_keys=True)
    return virtual_machines

def load_virtual_machines(virtual_machines_path):
    """
    Read back the file written by get_virtual_machines.

    @virtual_machines_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(virtual_machines_path, 'r') as in_file:
        return yaml.load(in_file)

def get_data():
    """Download the raw data for the Virtual Machines section (replaces any earlier get_data)."""
    get_virtual_machines(virtual_machines_path)

def vm_agent_is_installed_7_1(virtual_machines):
    """
    Flag virtual machines that have no successfully provisioned
    MicrosoftMonitoringAgent extension.

    Fixes over the original: it flagged the VMs that HAD the agent (inverted
    test), and it inspected resources[0] on every loop iteration instead of
    the current resource, so an agent listed second was never seen.

    @virtual_machines: list of vm dicts with 'resourceGroup', 'name', 'resources'
    @returns: dict with 'items' of (resource group, vm name), 'stats', 'metadata'
    """
    items_flagged_list = []
    for vm in virtual_machines:
        # 'resources' may be missing or None when a VM has no extensions.
        has_agent = any(
            resource.get('virtualMachineExtensionType') == 'MicrosoftMonitoringAgent'
            and resource.get('provisioningState') == 'Succeeded'
            for resource in (vm.get('resources') or [])
        )
        if not has_agent:
            items_flagged_list.append((vm['resourceGroup'], vm['name']))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(virtual_machines)}
    metadata = {"finding_name": "vm_agent_is_installed",
                "negative_name": "",
                "columns": ["Resource Group", "Name"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def os_disk_is_encrypted_7_2(virtual_machines):
    """
    Flag virtual machines whose OS disk has no encryption settings or has
    encryption settings that are not enabled.

    The unused items_checked counter of the original is removed (the stats
    always used len(virtual_machines)), and a missing 'encryptionSettings' or
    'enabled' key is treated as "not encrypted" instead of raising KeyError.

    @virtual_machines: list of vm dicts with 'resourceGroup', 'name', 'storageProfile'
    @returns: dict with 'items' of (resource group, vm name, disk name), 'stats', 'metadata'
    """
    items_flagged_list = []
    for vm in virtual_machines:
        os_disk = vm['storageProfile']['osDisk']
        encryption_settings = os_disk.get('encryptionSettings')
        is_encrypted = bool(encryption_settings) and encryption_settings.get('enabled') is True
        if not is_encrypted:
            items_flagged_list.append((vm['resourceGroup'], vm['name'], os_disk['name']))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(virtual_machines)}
    metadata = {"finding_name": "os_disk_is_encrypted",
                "negative_name": "",
                "columns": ["Resource Group", "Name", "Disk Name"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def data_disks_are_encrypted_7_3(virtual_machines):
    """
    Flag data disks that have no encryption settings, or whose encryption
    settings explicitly say enabled == False.

    The original only flagged disks whose 'encryptionSettings' was None, so a
    disk with encryption explicitly disabled passed; it also crashed on a
    missing or null 'dataDisks' list and carried unused locals.

    An alternative source of truth, left disabled in the original, is
    `az vm encryption show --name <vm> --resource-group <rg> --query dataDisk`.

    @virtual_machines: list of vm dicts with 'resourceGroup', 'name', 'storageProfile'
    @returns: dict with 'items' of (resource group, vm name, disk name), 'stats', 'metadata'
    """
    items_flagged_list = []
    for vm in virtual_machines:
        for disk in vm['storageProfile'].get('dataDisks') or []:
            encryption_settings = disk.get('encryptionSettings')
            explicitly_disabled = (isinstance(encryption_settings, dict)
                                   and encryption_settings.get('enabled') is False)
            if not encryption_settings or explicitly_disabled:
                items_flagged_list.append((vm['resourceGroup'], vm['name'], disk['name']))

    # items_checked counts virtual machines, not disks, as in the original.
    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(virtual_machines)}
    metadata = {"finding_name": "data_disks_are_encrypted",
                "negative_name": "",
                "columns": ["Resource Group", "Name", "Disk Name"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}


def only_approved_extensions_are_installed_7_4(virtual_machines):
    # items in the following list do not imply failure, but require review
    items_flagged_list = []
    approved_extensions = [
        'AzureDiskEncryption',
        'IaaSAntimalware',
        'IaaSDiagnostics',
        'MicrosoftMonitoringAgent',
        'SqlIaaSAgent',
        'OmsAgentForLinux', 
        'VMAccessForLinux',
    ]
    for vm in virtual_machines:
        name = vm['name']
        resource_group = vm['resourceGroup']
        extensions = !az vm extension list --vm-name {name} --resource-group {resource_group}
        extensions = yaml.load(extensions.nlstr)
        for extension in extensions:
            if extension['virtualMachineExtensionType'] not in approved_extensions:
                items_flagged_list.append((resource_group, name, extension['virtualMachineExtensionType']))
    
    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(virtual_machines)}
    metadata = {"finding_name": "only_approved_extensions_are_installed",
                "negative_name": "",
                "columns": ["Resource Group", "VM Name", "Extension Name"]}            
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def latest_patches_for_all_virtual_machines_are_applied_7_5(virtual_machines):
    """Placeholder for control 7.5: not implemented, always returns None."""
    pass

def endpoint_protection_for_all_virtual_machines_is_installed_7_6(virtual_machines):
    items_flagged_list = []
    accepted_protections = set(['EndpointSecurity', 'TrendMicroDSA', 'Antimalware', 'EndpointProtection','SCWPAgent', 'PortalProtectExtension', 'FileSecurity', 'IaaSAntimalware'])
    for vm in virtual_machines:
        name = vm['name']
        resource_group = vm['resourceGroup']
#         endpoint_protection = !az vm show --resource-group {resource_group} --name {name} -d
#         endpoint_protection = yaml.load(endpoint_protection.nlstr)
        extensions = !az vm extension list --vm-name {name} --resource-group {resource_group}
        extensions = yaml.load(extensions.nlstr)
        has_protection = False
        for extension in extensions:
            if set([extension['virtualMachineExtensionType']]).intersection(accepted_protections):
                has_protection = True
        if not has_protection:
            items_flagged_list.append((resource_group, name, extension.get('virtualMachineExtensionType', "No extension")))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': len(virtual_machines)}
    metadata = {"finding_name": "endpoint_protection_for_all_virtual_machines_is_installed",
                "negative_name": "",
                "columns": ["Resource Group", "Name", "Unapproved Extension"]}            
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}


def test_controls():
    """
    Run the Virtual Machines controls against the saved raw data.

    The findings are written to filtered_virtual_machines_path as json and
    returned. Controls 7.4 and 7.6 call the az CLI while they run.
    """
    vms = load_virtual_machines(virtual_machines_path)
    results = {
        'vm_agent_is_installed': vm_agent_is_installed_7_1(vms),
        'os_disk_is_encrypted': os_disk_is_encrypted_7_2(vms),
        'data_disks_are_encrypted': data_disks_are_encrypted_7_3(vms),
        'only_approved_extensions_are_installed': only_approved_extensions_are_installed_7_4(vms),
        'endpoint_protection_for_all_virtual_machines_is_installed':
            endpoint_protection_for_all_virtual_machines_is_installed_7_6(vms),
    }

    with open(filtered_virtual_machines_path, 'w') as out_file:
        json.dump(results, out_file, indent=4, sort_keys=True)
    return results

In [None]:
# get_data() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
get_data()

In [None]:
# test_controls() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
test_controls()

# Other Security Considerations


In [None]:
# Raw `az` output for this section is written under raw_data_dir; the results of
# test_controls() are written to other_security_considerations_filtered_path.
keyvaults_path = os.path.join(raw_data_dir, 'keyvaults.json')
keyvault_keys_and_secrets_metadata_path = os.path.join(raw_data_dir, 'keyvault_keys_and_secrets_metadata.json')
locked_resources_path = os.path.join(raw_data_dir, 'locked_resources.json')
other_security_considerations_filtered_path = os.path.join(filtered_data_dir, 'other_security_considerations_filtered.json')

def get_keyvaults(keyvaults_path):
    """
    @keyvaults_path: string - path to output json file
    @returns: list of virtual_machines dict
    """
    keyvaults = !az keyvault list
    keyvaults = yaml.load(keyvaults.nlstr)
    with open(keyvaults_path, 'w') as f:
        json.dump(keyvaults, f, indent=4, sort_keys=True)
    return keyvaults

def load_keyvaults(keyvaults_path):
    """
    Read back the file written by get_keyvaults.

    @keyvaults_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(keyvaults_path, 'r') as in_file:
        return yaml.load(in_file)

def get_locked_resources():
    lock_list = !az lock list
    lock_list = yaml.load(lock_list.nlstr)

    with open(locked_resources_path, 'w') as f:
        json.dump(lock_list, f, indent=4, sort_keys=True)
    return lock_list

def load_locked_resources(locked_resources_path):
    """
    Read back the file written by get_locked_resources.

    @locked_resources_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(locked_resources_path, 'r') as in_file:
        return yaml.load(in_file)

def get_keyvault_keys_and_secrets_metadata(keyvault_keys_and_secrets_metadata_path, keyvaults):
    metadata = {}
    for keyvault in keyvaults:        
        vault_name = keyvault['name']
        metadata[vault_name] = {}
        keys = !az keyvault key list --vault-name {vault_name}
        keys = yaml.load(keys.nlstr)
        metadata[vault_name]['keys'] = keys
        secrets = !az keyvault secret list --vault-name {vault_name}
        secrets = yaml.load(secrets.nlstr)
        metadata[vault_name]['secrets'] = secrets
    
    with open(keyvault_keys_and_secrets_metadata_path, 'w') as f:
        json.dump(metadata, f, indent=4, sort_keys=True)
    return metadata

def load_keyvault_keys_and_secrets_metadata(keyvault_keys_and_secrets_metadata_path):
    """
    Read back the file written by get_keyvault_keys_and_secrets_metadata.

    @keyvault_keys_and_secrets_metadata_path: string - path to the saved json file
    @returns: the parsed file content
    """
    # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
    with open(keyvault_keys_and_secrets_metadata_path, 'r') as in_file:
        return yaml.load(in_file)

def get_data():
    """
    Download the raw data for the Other Security Considerations section.

    Key vaults are listed first because the key/secret listing needs their
    names; resource locks are fetched last.
    """
    vaults = get_keyvaults(keyvaults_path)
    get_keyvault_keys_and_secrets_metadata(keyvault_keys_and_secrets_metadata_path, vaults)
    get_locked_resources()

MAX_EXPIRY_ROTATION_DAYS = 730
# 8.1 and 8.2
def expiry_date_is_set_on_all_keys_and_secrets(keyvault_keys_and_secrets_metadata):
    """
    Flag key-vault keys and secrets that have no expiry date, are already
    expired, or whose lifetime (expires - created) exceeds
    MAX_EXPIRY_ROTATION_DAYS.

    The original assigned the "expired" status to a misspelled variable
    (satus), so expired keys and secrets were reported as ok.

    @keyvault_keys_and_secrets_metadata: dict mapping vault name ->
        {'keys': [...], 'secrets': [...]}; keys are identified by 'kid',
        secrets by 'id', and both carry attributes.created / attributes.expires
    @returns: dict with 'items' of (vault, name, "key"|"secret", status,
              created, expires), 'stats' and 'metadata'
    """
    items_flagged_list = []
    items_checked = 0
    today = datetime.datetime.today()

    def get_key_or_secret_status(info):
        """Return (status, created, expires) with both dates as 'YYYY-MM-DD' strings."""
        # Only the date part of the timestamps is compared.
        created = datetime.datetime.strptime(info['attributes']['created'].split('T')[0], '%Y-%m-%d')
        expires = info['attributes']['expires']
        status = "ok"
        if expires:
            expires = datetime.datetime.strptime(expires.split('T')[0], '%Y-%m-%d')
            if today > expires:
                status = "expired"
            elif expires - created > datetime.timedelta(days=MAX_EXPIRY_ROTATION_DAYS):
                status = "exceeds max expiry days"
            # convert times back to a string for display
            expires = expires.strftime('%Y-%m-%d')
        else:
            status = "no expiry"
            expires = "None"
        return status, created.strftime('%Y-%m-%d'), expires

    # (metadata key, field holding the identifier URL, label used in the report)
    entry_kinds = (('keys', 'kid', "key"), ('secrets', 'id', "secret"))
    for keyvault_name, vault_metadata in keyvault_keys_and_secrets_metadata.items():
        for collection, id_field, label in entry_kinds:
            for info in vault_metadata[collection]:
                items_checked += 1
                entry_name = info[id_field].split('/')[-1]
                status, created, expires = get_key_or_secret_status(info)
                if status != "ok":
                    items_flagged_list.append((keyvault_name, entry_name, label, status, created, expires))

    stats = {'items_flagged': len(items_flagged_list),
             'items_checked': items_checked}
    metadata = {"finding_name": "expiry_date_is_set_on_all_keys_and_secrets",
                "negative_name": "",
                "columns": ["KeyVault Name", "Name", "Type", "Status", "Created", "Expires"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

# Names or resource IDs of the resources that must carry a lock; empty by default.
critical_resources_list = []
def resource_locks_are_set_for_mission_critical_azure_resources_8_3(critical_resources_list, locked_resources):
    """
    Flag the mission-critical resources that are not covered by a lock.

    Fixes over the original: 'stats' was never assigned when either list was
    non-empty, list.intersection() does not exist, and an intersection would
    have reported the LOCKED resources instead of the unlocked ones.

    When nothing is declared critical and no lock exists, a single generic
    failure is reported with no items, as in the original.

    NOTE(review): the matching below assumes each lock is either a plain
    string or a dict whose 'id' looks like
    '<scope>/providers/Microsoft.Authorization/locks/<lock name>'. A critical
    resource given as a full resource ID is covered when a lock scope equals
    it or is one of its parents; a bare name is covered when it is one of the
    path segments of a lock scope. Confirm against real `az lock list` output.

    @critical_resources_list: iterable of resource names or resource IDs
    @locked_resources: list of locks as loaded by load_locked_resources
    @returns: dict with 'items' of (resource,) tuples, 'stats' and 'metadata'
    """
    lock_marker = '/providers/microsoft.authorization/locks/'

    def _lock_scope(lock):
        """Lower-cased scope a lock applies to (its id without the lock suffix)."""
        lock_id = lock.get('id') if isinstance(lock, dict) else lock
        return str(lock_id or '').lower().split(lock_marker)[0].rstrip('/')

    def _is_locked(resource, scopes):
        """True when one of the lock scopes covers the given resource."""
        target = str(resource).lower().rstrip('/')
        for scope in scopes:
            if not scope:
                continue
            if target.startswith('/'):
                if target == scope or target.startswith(scope + '/'):
                    return True
            elif target in scope.split('/'):
                return True
        return False

    critical_resources = list(critical_resources_list or [])
    locks = list(locked_resources or [])
    if not critical_resources and not locks:
        items_flagged_list = []
        stats = {'items_flagged': 1,
                 'items_checked': 1}
    else:
        scopes = [_lock_scope(lock) for lock in locks]
        items_flagged_list = [(resource,) for resource in critical_resources
                              if not _is_locked(resource, scopes)]
        stats = {'items_flagged': len(items_flagged_list),
                 'items_checked': len(critical_resources)}
    metadata = {"finding_name": "resource_locks_are_set_for_mission_critical_Azure_resources",
                "negative_name": "",
                "columns": ["Resource Name"]}
    return  {"items": items_flagged_list, "stats": stats, "metadata": metadata}

def test_controls():
    """
    Run the Other Security Considerations controls against the saved raw data.

    The findings are written to other_security_considerations_filtered_path
    as json and returned.
    """
    secrets_metadata = load_keyvault_keys_and_secrets_metadata(keyvault_keys_and_secrets_metadata_path)
    locks = load_locked_resources(locked_resources_path)
    results = {
        'expiry_date_is_set_on_all_keys_and_secrets':
            expiry_date_is_set_on_all_keys_and_secrets(secrets_metadata),
        'resource_locks_are_set_for_mission_critical_azure_resources':
            resource_locks_are_set_for_mission_critical_azure_resources_8_3(critical_resources_list, locks),
    }

    with open(other_security_considerations_filtered_path, 'w') as out_file:
        json.dump(results, out_file, indent=4, sort_keys=True)
    return results

In [None]:
# get_data() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
get_data()

In [None]:
# test_controls() is redefined by each section of this notebook; this call runs
# whichever definition the kernel executed most recently.
test_controls()

In [None]:
# Inspection cell: re-queries the key/secret metadata for the saved key vaults
# (this calls the az CLI again), reloads the file just written, and displays it.
keyvaults = load_keyvaults(keyvaults_path)
keyvault_keys_and_secrets_metadata = get_keyvault_keys_and_secrets_metadata(keyvault_keys_and_secrets_metadata_path, keyvaults)
keyvault_keys_and_secrets_metadata = load_keyvault_keys_and_secrets_metadata(keyvault_keys_and_secrets_metadata_path)
keyvault_keys_and_secrets_metadata

In [None]:
#%%writefile render_utils.py
import functools
import yaml

# Root of the scanner checkout; APP_ROOT below is derived from it.
cis_scanner_root = '/praetorian-tools/azure_cis_scanner/'
# Directory holding accounts.json and one sub-directory per subscription.
# NOTE(review): hard-coded, engagement-specific path; it differs from the base_dir
# set near the top of the notebook - confirm which one is intended.
scans_base = os.path.expanduser('/engagements/tcb-edp/scans')

@functools.lru_cache(1, typed=False)
def get_dirs(directory):
    """
    Return the names of the sub-directories of directory.

    The original tested os.path.isdir(directory) - the parent - for every
    entry, so plain files were returned as well.

    The result for the most recently requested directory is cached, so
    directories created afterwards are not seen until the cache is cleared
    with get_dirs.cache_clear().

    @directory: string - path of the directory to list
    @returns: list of sub-directory names in os.listdir order
    """
    return [entry for entry in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, entry))]

# figure out better way to get base dir or let user select in UI
# The first assignment is overwritten immediately: the directory actually used
# is subscription_dirname, which is set by an earlier cell of the notebook.
active_subscription_dir = get_dirs(scans_base)[0]
active_subscription_dir = subscription_dirname
#active_subscription_dir = 'Development-6ff7f744'

# accounts.json is read from scans_base.
# NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
accounts = {}
with open(os.path.join(scans_base, 'accounts.json'), 'r') as f:
    accounts = yaml.load(f)

# Scan directory of the active subscription.
scans_root = os.path.join(scans_base, active_subscription_dir)

#APP_ROOT = os.path.dirname(os.path.abspath(__file__))
APP_ROOT = os.path.join(cis_scanner_root, 'report')
STATIC = os.path.join(APP_ROOT, 'static')

# cis_structure supplies 'section_ordering' and 'TOC', which the functions below read.
with open(os.path.expanduser(APP_ROOT + '/cis_structure.yaml'), 'r') as f:
    cis_structure = yaml.load(f)

def set_scans_root(subscription_dirname=active_subscription_dir):
    """
    Build the scan directory path of a subscription.

    Despite the name nothing is stored: the module-level scans_root is left
    untouched and the joined path is only returned.

    @subscription_dirname: string - directory name of the subscription under scans_base
    @returns: os.path.join(scans_base, subscription_dirname)
    """
    return os.path.join(scans_base, subscription_dirname)

@functools.lru_cache(maxsize=32, typed=False)
def get_filtered_data_path(date=None, subscription_dirname=active_subscription_dir):
    """
    Get the filtered data root for the scan run on date=date or latest if date=None
    Returns path, date where date is the most recent date with data <= requested date

    Directory structure is
    <scans_root>/<date>/filtered/<section_lowercase_underscores>_filtered.json

    Fixes over the original: `if os.path.exists:` tested the function object
    and was always true, the "latest" scan was taken from the START of the
    sorted list (the oldest), and the returned path contained an extra
    'scans' segment that the sibling functions get_filtered_data_by_section
    and get_stats do not use.

    @date: string 'YYYY-MM-DD' or None for the latest scan
    @subscription_dirname: string - directory name of the subscription under scans_base
    @returns: (path, date), or None (after printing a hint) when date is None
              and no scan with filtered data exists
    @raises ValueError: when a date is requested and no scan on or before it has filtered data
    """
    scans_root = set_scans_root(subscription_dirname)
    # Date directories sort chronologically because of the zero-padded YYYY-MM-DD format.
    dates_with_data = sorted(
        dir_date for dir_date in get_dirs(scans_root)
        if os.path.isdir(os.path.join(scans_root, dir_date, 'filtered')))
    if date:
        dates_with_data = [dir_date for dir_date in dates_with_data if dir_date <= date]
        if not dates_with_data:
            raise ValueError("Filtered data requested for {} but file does not exist at {}".format(
                date, scans_root))
    elif len(dates_with_data) == 0:
        print("No data found in {}.  Please run scanner first".format(scans_root))
        return None
    date = dates_with_data[-1]
    return os.path.join(scans_root, date, 'filtered'), date

@functools.lru_cache(maxsize=32, typed=False)
def get_filtered_data(date=None, subscription_dirname=active_subscription_dir):
    """
    Returns a dict of filtered data for a specific date or latest (default)

    Every section listed in cis_structure['section_ordering'] gets a key; the
    value is None when no data was found for it.
    The structure is {"Identity and Access Management": {"finding1": results_dict1, "finding2": results_dict2, "date": ...}}
    where each results_dict has keys stats, metadata, items, and "date" is the
    actual date where data was found.

    Fixes over the original: the date argument was silently ignored, and the
    subscription directory name was replaced by a full path before being
    passed on, which only worked because os.path.join drops scans_base when
    the second argument is absolute.

    The returned dict is cached by lru_cache and shared between callers; copy
    it before modifying it.
    """
    filtered_data = {}
    for section_name in cis_structure['section_ordering']:
        filtered_data[section_name] = get_filtered_data_by_section(
            section_name, date=date, subscription_dirname=subscription_dirname)
    return filtered_data

@functools.lru_cache(maxsize=128, typed=False)
def get_filtered_data_by_section(section_name, date=None, subscription_dirname=active_subscription_dir):
    """
    Load the newest filtered results of one section, optionally not newer than date.

    @params section_name: Name of CIS section as a string e.g. ("Identity and Access Management")
    @params date: date in format 'YYYY-MM-DD', i.e. strftime("%Y-%m-%d"), or None for no limit
    @params subscription_dirname: directory name of the subscription under scans_base
    @returns the parsed section file with an extra 'date' key naming the scan
             directory it came from, or None when no scan has the section
    """
    scans_root = set_scans_root(subscription_dirname)
    file_name = '_'.join(word.lower() for word in section_name.split(' ')) + '_filtered.json'
    # Walk the date folders from most to least recent.
    for dir_date in sorted(get_dirs(scans_root), reverse=True):
        if date and (dir_date > date):
            continue
        candidate_path = os.path.join(scans_root, dir_date, 'filtered', file_name)
        if not os.path.exists(candidate_path):
            continue
        # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
        with open(candidate_path, 'r') as in_file:
            section_data = yaml.load(in_file)
        section_data['date'] = dir_date
        return section_data
    return None


@functools.lru_cache(maxsize=1, typed=False)
def get_latest_filtered_data(date=None, subscription_dirname=active_subscription_dir):
    """
    Returns a dict as in get_filtered_data, but if a section is missing, it will search
    back in time for a date where the section does exist.

    Fixes over the original: it called the undefined get_filtered_data_by_name
    with the misspelled subsription_dirname (a NameError as soon as the branch
    ran), it never passed subscription_dirname to get_filtered_data, and its
    `section_name not in data` test could not succeed because
    get_filtered_data creates a key for every section. Sections whose value is
    empty or None are now the ones that get looked up again.

    @returns: dict of section name -> section data, or None when there are no sections
    """
    data = get_filtered_data(date, subscription_dirname=subscription_dirname)
    if not data:
        return None
    # Work on a copy: get_filtered_data returns a cached, shared dict.
    latest_data = dict(data)
    for section_name in cis_structure['section_ordering']:
        if not latest_data.get(section_name):
            section_data = get_filtered_data_by_section(
                section_name, date, subscription_dirname=subscription_dirname)
            if section_data:
                latest_data[section_name] = section_data
    return latest_data

@functools.lru_cache(maxsize=1, typed=False)
def get_stats(subscription_dirname=active_subscription_dir):
    """
    Collect the 'stats' dict of every finding for every scan date.

    @subscription_dirname: directory name of the subscription under scans_base
    @returns: {section name: {finding name: {scan date: stats dict}}}; a
              section without any data maps to an empty dict
    """
    scans_root = set_scans_root(subscription_dirname)
    scan_dates = sorted(get_dirs(scans_root))
    stats = {}
    for section_name in cis_structure['section_ordering']:
        section_stats = stats[section_name] = {}
        file_name = '_'.join(word.lower() for word in section_name.split(' ')) + '_filtered.json'
        for dir_date in scan_dates:
            section_path = os.path.join(scans_root, dir_date, 'filtered', file_name)
            if not os.path.exists(section_path):
                continue
            # NOTE(review): yaml.load with no Loader is deprecated (PyYAML >= 5.1); consider yaml.safe_load.
            with open(section_path, 'r') as in_file:
                section_findings = yaml.load(in_file)
            for finding_name, finding_data in section_findings.items():
                section_stats.setdefault(finding_name, {})[dir_date] = finding_data['stats']
    return stats

@functools.lru_cache(maxsize=1, typed=False)
def get_latest_stats(subscription_dirname=active_subscription_dir):
    """
    Reduce get_stats to the most recent scan date of every finding.

    The original called get_stats() without arguments, so the
    subscription_dirname passed in was ignored and the default subscription
    was always reported.

    @subscription_dirname: directory name of the subscription under scans_base
    @returns: {section name: {finding name: {"date": latest date, **stats of that date}}}
    """
    latest_stats = {}
    stats = get_stats(subscription_dirname=subscription_dirname)
    for section_name, section_stats in stats.items():
        latest_stats[section_name] = {}
        for finding_name, stats_by_date in section_stats.items():
            # Dates are 'YYYY-MM-DD' strings, so max() picks the most recent one.
            date = max(stats_by_date)
            latest_stats[section_name][finding_name] = {"date": date, **stats_by_date[date]}

    return latest_stats

def get_finding_name(finding_name, subsection_name):
    """
    Pick the finding name for a CIS_TOC entry.

    An explicit, non-empty finding_name wins; otherwise the name is derived
    from subsection_name with underscore_name.
    """
    return finding_name or underscore_name(subsection_name)

def underscore_name(subsection_name):
    """Lower-case a title and replace every single space with an underscore."""
    lowered_words = [word.lower() for word in subsection_name.split(' ')]
    return '_'.join(lowered_words)

def title_except(string):
    """
    Title-case a space-separated string, leaving a few small words untouched.

    The first word is always capitalized. Every later word is capitalized
    (str.capitalize, which also lower-cases the rest of the word) unless it
    appears verbatim in the exception list below.
    """
    articles = ['a', 'an', 'of', 'the', 'is', 'not', 'for', 'if']
    # re.split always yields at least one element, so the unpacking cannot fail.
    first_word, *remaining_words = re.split(' ', string)
    titled = [first_word.capitalize()]
    titled.extend(
        word if word in articles else word.capitalize()
        for word in remaining_words
    )
    return " ".join(titled)

def get_finding_index(findings_list, finding):
    """
    Return the first entry of findings_list whose 'subsection_name' equals finding.

    Despite the name, the matching entry itself is returned, not its position.
    Raises ValueError when no entry matches.
    """
    not_found = object()
    match = next(
        (entry for entry in findings_list if entry['subsection_name'] == finding),
        not_found,
    )
    if match is not_found:
        raise ValueError("finding {} not found in {}".format(finding, findings_list))
    return match
    
# def get_summary_stats(section=None):
# 	"""
# 	Return the sum over impacted_items in all findings in a section

# 	If section == None, sum over sections
# 	"""
# 	stats = get_stats()
# 	if section:

# 	else:
# 		for section in stats:





In [None]:
from pprint import pprint

def find_finding_entry(section_toc, finding_underscore_name):
    """
    Return the first entry of section_toc whose finding name (as resolved by
    get_finding_name from its 'finding_name' and 'subsection_name' keys) equals
    finding_underscore_name, or None when no entry matches.
    """
    matching_entries = (
        entry for entry in section_toc
        if finding_underscore_name == get_finding_name(entry['finding_name'], entry['subsection_name'])
    )
    return next(matching_entries, None)

def findings_summary(latex=False, subscription_dirname=subscription_dirname):
    """
    Print every finding of a subscription that has flagged items.

    @latex: when True, flagged items are printed as a LaTeX table via render_latex
            (titled with the TOC subsection name, or the finding name with
            underscores replaced by spaces when the TOC has no entry);
            otherwise they are pretty-printed with pprint.
    @subscription_dirname: forwarded to get_filtered_data

    A finding that cannot be rendered is reported and skipped; the remaining
    findings are still processed.
    """
    data = get_filtered_data(subscription_dirname=subscription_dirname)
    for section_name, section_findings in data.items():
        if not section_findings:
            continue
        for finding_name, finding in section_findings.items():
            # 'date' sits next to the findings but is not a finding itself.
            if finding_name == 'date':
                continue
            try:
                section_toc = cis_structure['TOC'][section_name]
                finding_entry = find_finding_entry(section_toc, finding_name)
                if finding_entry:
                    title = finding_entry['subsection_name']
                else:
                    title = ' '.join(finding_name.split('_'))
                finding_stats = finding.get('stats')
                if finding_stats and (finding_stats['items_flagged'] > 0):
                    print("{}: {} - {} of {} failed".format(section_name, finding_name, finding_stats['items_flagged'], finding_stats['items_checked']))
                    if latex:
                        render_latex(finding['items'], finding['metadata']['columns'], title_except(title))
                    else:
                        pprint(finding['items'])
                    print('\n')
                # These two findings are always printed, regardless of their stats.
                if finding_name in ['security_contact_phone_number_is_set', 'activity_log_alert_is_configured']:
                    print(finding['items'])
            except Exception as e:
                # Deliberately broad: one malformed finding must not hide the others.
                # The finding may not be a dict at all (a likely cause of the failure),
                # so do not assume it has .keys() while reporting.
                if hasattr(finding, 'keys'):
                    finding_shape = list(finding.keys())
                else:
                    finding_shape = type(finding).__name__
                print("           finding", section_name, finding_name, finding_shape)
                print(e)
                continue

In [None]:
# Show the subscription directory name that the next cell passes to findings_summary.
subscription_dirname

In [None]:
# Print the flagged findings of the active subscription as LaTeX tables.
# NOTE(review): render_latex is defined in a later cell of this notebook; on a fresh
# kernel that cell has to be run before this one.
findings_summary(latex=True, subscription_dirname=subscription_dirname)


In [None]:
# Show the current subscription_dirname; note that the next cell passes a hard-coded
# directory name instead of this variable.
subscription_dirname

In [None]:
# Same LaTeX summary for one specific, hard-coded subscription directory.
# NOTE(review): 'Development-6ff7f744' is environment-specific; replace it (or use
# subscription_dirname) before running against another engagement.
findings_summary(latex=True, subscription_dirname='Development-6ff7f744')


In [None]:
import json
import os
import yaml
    
def clean_latex(tuple_entry):
    """
    Escape characters that the minerva pdf generator chokes on.

    The entry is converted with str(). Every '_' and '*' is prefixed with a
    backslash, and if at least one character was escaped the whole string is
    wrapped in \\texttt{...}. Strings without those characters are returned
    unchanged.
    """
    original_text = str(tuple_entry)
    escaped_text = original_text
    for symbol in ('_', '*'):
        escaped_text = escaped_text.replace(symbol, '\\' + symbol)

    # Nothing was replaced: hand back the plain string without the \texttt wrapper.
    if escaped_text == original_text:
        return original_text
    return '\\texttt{' + escaped_text + '}'
    
    
def render_latex(resource_tuples, header, title):
    """
    Print a complete LaTeX tabular for a list of rows.

    @resource_tuples: iterable of rows; each row is an iterable of cell values,
                      or a single string, which is treated as a one-cell row
    @header: column names, unescaped (render_table_start escapes them)
    @title: table title, unescaped (escaped here with clean_latex)

    Every cell is escaped with clean_latex. A row needs at least as many cells
    as there are columns; extra cells are ignored by the row format.
    """
    header = list(header)
    # render_table_start applies clean_latex to every header cell itself. The original
    # escaped the header here as well, so names containing '_' or '*' were escaped
    # twice and wrapped in two nested \texttt{}.
    render_table_start(header, clean_latex(title))

    num_columns = len(header)
    if num_columns > 1:
        line = ' & '.join(['{}'] * num_columns)
    else:
        line = '{}'
    line = line + ' \\\\'
    for resource_tuple in resource_tuples:
        # A bare string would otherwise be iterated character by character.
        if isinstance(resource_tuple, str):
            resource_tuple = (resource_tuple,)
        print('    ' + line.format(*map(clean_latex, resource_tuple)))
    render_table_end(header)
    
def render_table_start(header, title):
    r"""
    Render latex table suitable for minerva rendering

    Prints the \begin{tabular} line, an \hline, a \multicolumn title row and the
    coloured header row.

    @header: column names; each one is escaped with clean_latex here, so pass them unescaped
    @title: table title; inserted verbatim, so it must already be escaped

    If the elements in the table are very long you can correct spacing with invisible text like this:
    {\color[HTML]{FFFFFF} {}} & {\color[HTML]{333333}{spacing}}{\color[HTML]{FFFFFF} {}}{\color[HTML]{333333}{spacing_2}} & {\color[HTML]{333333}{spacing_3}}{\color[HTML]{FFFFFF}{}} \\

    TODO: Paginate pages to break nicely over many pages.

    Some exceptions used to manually fix for specific cases:

    Text wrap some long arrays based on answer by zyy on
    https://tex.stackexchange.com/questions/54069/table-with-text-wrapping

    GROUPS column had variable length and this worked.
    \begin{tabular}{|l|>{\centering\arraybackslash}m{10cm}|}
    \hline
    \multicolumn{2}{|c|}{IAM Users without MFA} \\
    \rowcolor[HTML]{333333}
    {\color[HTML]{FFFFFF}USER NAME} & {\color[HTML]{FFFFFF}GROUPS} \\
    """
    # The docstring is a raw string and the backslash below is doubled: in a normal
    # string literal '\c' is an invalid escape sequence (SyntaxWarning on Python 3.12)
    # and '\b' / '\r' would silently turn into control characters.
    num_columns = len(header)
    entries = ['\\color[HTML]{FFFFFF}' + '{}'.format(clean_latex(x)) for x in header]
    if num_columns > 1:
        # Together with the outer braces added below this yields "{cell} & {cell} & ...".
        line = '} & {'.join(entries)
    else:
        line = entries[0]
    columns_format = '{|' + '|'.join(['l'] * num_columns) + '|}'
    table_lines = [
        '\\begin{tabular}' + '{}'.format(columns_format),
        '    \\hline',
        '    \\multicolumn{' + str(num_columns) + '}{|c|}{' + title + '} \\\\',
        '    \\rowcolor[HTML]{333333}',
        '    {' + line + '} \\\\',
    ]
    print('\n'.join(table_lines))
    

def render_table_end(header):
    """
    Print the closing horizontal rule and the end of the tabular environment,
    followed by a blank line. The header argument is accepted but not used.
    """
    closing_lines = ['    \\hline', '\\end{tabular}', '']
    print('\n'.join(closing_lines))

In [None]:
# NOTE(review): --resource is given no value here, so the command is incomplete as
# written; append the name or ID of the resource to inspect.
!az monitor diagnostic-settings list --resource 

In [None]:
# Render the key/secret expiry data as a six-column LaTeX table.
# NOTE(review): expiry_dates is not defined in the visible part of the notebook -
# confirm that an earlier cell creates it before this cell is run.
header = ["KeyVault", "Name", "Type", "Status", "Created", "Expires"]
render_latex(expiry_dates, header, "Expiry Date is Set on All Keys and Secrets")

In [None]:
# For every entry of sql_results print a heading (last '_'-separated token of the key,
# the key itself, number of rows) followed by one LaTeX table row per
# (resource_group, server_name, db_name) triple. The values are printed unescaped.
for key in sql_results:
    print(key.split('_')[-1], key, len(sql_results[key]))
    for resource_group, server_name, db_name in sql_results[key]:
        print('{} & {} & {} \\\\'.format(resource_group, server_name, db_name))

In [None]:
def build_results(results, impacted_system_dict):
    """
    Decorator factory for check functions that return a tuple
    (passed(bool), impacted_closure(OrderedDict, optional), result(json object - dict, list, ...)).

    @results: dict shared by all checks, {func.__name__: [(impacted_system, result), ...]};
              failing checks are appended to it by apply_gen_results
    @impacted_system_dict: OrderedDict([(key1, val1), (key2, val2),]) identifying the (possibly)
                 failing resource, known prior to the query
                 eg. {resource_group: rg-val, server: server-val} for sql tests
                 or {resource_group: rg-val, storage_account: storage-val, disk: disk-val} for storage
              impacted_closure is the final identifying component(s), such as {db: db1} when
              (resource_group, sql_server) are given:
                  $ az sql db list --resource-group $resource_group --server $server_name
    @returns: a decorator; the decorated function returns what apply_gen_results returns,
              i.e. (passed, impacted_system, results), where impacted_system is
              impacted_system_dict extended by the impacted_closure mapping(s).
              NOTE(review): the previous docstring advertised the per-call `result` as the
              third element, but apply_gen_results returns the shared `results` dict - confirm
              which one callers expect.
    """
    def decorate(func):
        def call(*args, **kwargs):
            # Star-unpacking yields a *list* of whatever sits between the first and the
            # last element of func's return tuple (empty when there is no closure), so it
            # has to be merged element by element; the original called .items() on the list.
            passed, *impacted_closures, result = func(*args, **kwargs)
            impacted_system = OrderedDict(impacted_system_dict)
            for impacted_closure in impacted_closures:
                impacted_system.update(impacted_closure)
            # The original called apply_gen_findings, which is not defined in this cell;
            # apply_gen_results below has exactly the signature used here.
            return apply_gen_results(func, passed, impacted_system, result, results, *args, **kwargs)
        return call
    return decorate

def apply_gen_results(func, passed, impacted_system, result, results, *args, **kwargs):
    """
    Record a failed check in results.

    @func: the wrapped check function; its __name__ is the key used in results
    @passed: bool - True if passed, False if failed
    @impacted_system: OrderedDict identifying the checked resource
    @result: object with important output data from the test
    @results: dict with keys func.__name__ and values list of (impacted_system, result) pairs;
              mutated in place when the check failed
    @args, @kwargs: accepted for call compatibility, not used
    @returns: (passed, impacted_system, results)
    """
    if not passed:
        results.setdefault(func.__name__, []).append((impacted_system, result))
    return passed, impacted_system, results

In [None]:
# Scratch check: turn an underscore-separated finding name into space-separated words,
# the same transformation findings_summary uses for its fallback title.
" ".join("expiry_date_is_set_on_all_keys_and_secrets".split('_'))

In [None]:
pd.D