### Set Variables and Import Modules for use throughout Notebook

In [5]:
import requests
import json


# REST endpoint that the Resource Graph query is POSTed to; the API version is pinned in the URL.
resourceGraphURL = 'https://management.azure.com/providers/Microsoft.ResourceGraph/resources?api-version=2020-04-01-preview'

# Query text sent in the request body: keeps security assessments whose metadata
# severity is 'High' and implementation effort is 'Low', then counts them per
# `threats` value (two output columns: Threats, ThreatCount).
resourceGraphQuery = """
securityresources
| where type == 'microsoft.security/assessments'
| where properties.metadata.severity == 'High'
| where properties.metadata.implementationEffort == 'Low'
| summarize ThreatCount=count() by Threats=tostring(properties.metadata.threats) 	
"""

# ABFSS folder that the parsed results are written to as Parquet by later cells.
saveDestination = "abfss://synapse@zmdeploytestsa20230808.dfs.core.windows.net/rawjson/resourcesapifromspark/"

StatementMeta(sparkXXS, 3, 6, Finished, Available)

### Create the POST call to the REST API 

In [6]:
def callAPI(skipToken = "", timeout = 120):
    """POST the Resource Graph query and return the raw response body as text.

    Parameters
    ----------
    skipToken : str, optional
        Paging token taken from the ``$skipToken`` field of a previous
        response. When empty (or None) no paging option is sent.
    timeout : float or tuple, optional
        Timeout in seconds handed to ``requests.post`` so a stalled
        connection cannot hang the notebook indefinitely.

    Returns
    -------
    str
        The response body exactly as returned by the service (JSON text).

    Raises
    ------
    requests.exceptions.HTTPError
        If the service answers with a 4xx/5xx status code.
    requests.exceptions.Timeout
        If the service does not answer within ``timeout``.
    """
    # Fetch a bearer token for the 'AzureManagement' audience from the Spark utilities
    token = mssparkutils.credentials.getToken('AzureManagement')
    auth_headers = {'Authorization': 'Bearer ' + token}

    # The query is always sent; the paging option is only added when a token was supplied
    data = {"query": resourceGraphQuery}
    if skipToken:
        data["options"] = {"$skipToken": skipToken}

    resourceGraphResponse = requests.post(
        resourceGraphURL,
        headers=auth_headers,
        json=data,
        timeout=timeout,
    )

    # Fail here with the HTTP status instead of passing an error payload on to
    # the parsing step, where it would surface as an unrelated Spark error.
    resourceGraphResponse.raise_for_status()

    return resourceGraphResponse.text

StatementMeta(sparkXXS, 3, 7, Finished, Available)

### Create a Python Function to parse API Response into a usable Data Frame

In [7]:
def parseResponse(responseJSONText):
    """Turn one raw API response (JSON text) into a two-column Spark Data Frame.

    The text is read by Spark as JSON and registered as the temp view
    ``v_tempJSON`` (replacing any existing view of that name). Each element
    of ``data.rows`` then becomes one output row, with its first two entries
    exposed as ``ThreatName`` and ``ThreatCount``.
    """
    # Wrap the JSON text in a one-element RDD so spark.read.json can load it
    jsonRDD = sc.parallelize([responseJSONText])

    # Expose the loaded response to Spark SQL under a fixed view name
    spark.read.json(jsonRDD).createOrReplaceTempView("v_tempJSON")

    # One output row per entry in data.rows, picking positions 0 and 1
    explodeRowsSQL = """
    WITH baseExplode AS
    (
    SELECT explode(data.rows) rowData
    FROM v_tempJSON
    )
    SELECT  rowData[0] ThreatName
            ,rowData[1] ThreatCount
    FROM baseExplode
    """

    return spark.sql(explodeRowsSQL)

StatementMeta(sparkXXS, 3, 8, Finished, Available)

### Run the initial request to extract data from the REST API

In [8]:
# Request the first page of results (no skip token) and keep the raw text
responseText = callAPI()

# Turn the raw text into a Data Frame with the parsed columns
dfParsedResponse = parseResponse(responseText)

# Replace whatever is already stored at the destination with this first page
firstPageWriter = dfParsedResponse.write.mode("overwrite")
firstPageWriter.parquet(saveDestination)

StatementMeta(sparkXXS, 3, 9, Finished, Available)

### Check whether the API returned a Skip Token, and keep requesting pages until it no longer does

In [9]:
# Decode the most recent response so its top-level keys can be inspected
responseJSON = json.loads(responseText)

# Keep requesting pages for as long as the latest response carries a Skip Token
while True:
    if "$skipToken" not in responseJSON:
        break

    # Token that identifies the next page
    skiptoken = responseJSON["$skipToken"]

    # Request the next page using the Skip Token option
    responseText = callAPI(skiptoken)

    # Decode it so the next loop iteration can check for a further token
    responseJSON = json.loads(responseText)

    # Parse this page into a Data Frame
    dfParsedResponse = parseResponse(responseText)

    # Add this page to the Parquet data written by the previous steps
    nextPageWriter = dfParsedResponse.write.mode("append")
    nextPageWriter.parquet(saveDestination)

StatementMeta(sparkXXS, 3, 10, Finished, Available)

### __OPTIONAL__: Create a Table in the Lake Database for easy Power BI integration

In [10]:
# Make sure the lake database exists before a table is registered in it.
spark.sql("CREATE DATABASE IF NOT EXISTS resourceGraphLakeDb")

# Table definition over the Parquet folder held in saveDestination
# (triple-quoted so the statement can be laid out across several lines).
createTableStatement = f"""
CREATE TABLE IF NOT EXISTS resourceGraphLakeDb.threatSummary
USING PARQUET
LOCATION '{saveDestination}'
"""

# Register the table; nothing happens if it already exists.
spark.sql(createTableStatement)

StatementMeta(sparkXXS, 3, 11, Finished, Available)

DataFrame[]