In [12]:
# Install/Upgrade purviewcli package
# %pip install --upgrade purviewcli

In [13]:
# Environment Variables
# Replace each YOUR_* placeholder below with your own value before running.
# NOTE: %env echoes the assignment it makes into the cell output, so clear this
# cell's output before sharing the notebook -- AZURE_CLIENT_SECRET is a secret.
# NOTE(review): presumably these variables are what the `pv` commands in the
# later cells read for account selection and authentication -- confirm against
# the purviewcli documentation.
%env PURVIEW_NAME=YOUR_PURVIEW_ACCOUNT_NAME
%env AZURE_CLIENT_ID=YOUR_CLIENT_ID
%env AZURE_TENANT_ID=YOUR_TENANT_ID
%env AZURE_CLIENT_SECRET=YOUR_CLIENT_SECRET

env: PURVIEW_NAME=pvdemoxv2hu-pv


In [14]:
# Helper Methods
import json
def getJSON(raw_output):
    """Parse the captured output of a shell command as JSON.

    Args:
        raw_output: Iterable of strings (for example the list of lines
            captured by ``data = !command``). The pieces are concatenated
            without a separator before parsing.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the concatenated text is not valid JSON.
            The message contains the beginning of the text, so an error
            printed by the command is visible instead of only a parser
            position such as "Expecting value: line 1 column 1".
    """
    output = ''.join(raw_output)
    try:
        return json.loads(output)
    except json.JSONDecodeError as err:
        # Keep the exception type, but show what the command actually printed.
        snippet = output[:500] if output.strip() else '<no output>'
        raise json.JSONDecodeError(
            '{0}; command output was: {1}'.format(err.msg, snippet),
            err.doc,
            err.pos,
        ) from err

import csv
def export(table, path='output.csv'):
    """Write a table of rows to a UTF-8 encoded CSV file.

    Any existing file at ``path`` is overwritten.

    Args:
        table: Iterable of rows, each row being an iterable of cell values.
        path: Destination file. Defaults to ``'output.csv'`` in the current
            working directory.
    """
    # newline='' leaves line-ending handling to the csv module.
    with open(path, 'w', newline='', encoding="utf-8") as output:
        csv.writer(output).writerows(table)

In [15]:
# 1. Get Sources
print('[INFO] Retrieving Sources...')
data = !pv scan readDataSources
sources = getJSON(data)

filtered_sources = []
counter = 0
for source in sources['value']:
    if source['kind'] != 'Collection':
        counter += 1
        datasource = source['name']
        filtered_sources.append(datasource)
        print('{0}. [{1}] {2}'.format(counter, source['kind'], datasource))
print('[INFO] Complete!')

[INFO] Retrieving Sources...
1. [AzureSqlDatabase] AzureSqlDatabase
2. [AdlsGen2] AzureDataLakeStorage
[INFO] Complete!


In [16]:
# 2. Get Scans
scans = {}
number_of_sources = len(filtered_sources)
counter = 0

print('[INFO] Retrieving Scans...')
for source in filtered_sources:
    counter += 1
    data2 = !pv scan readScans --dataSourceName {source}
    scan = getJSON(data2)
    print('{0} of {1} (Source: {2}; Scans: {3})'.format(counter, number_of_sources, source, len(scan['value'])))
    scans[source] = scan
print('[INFO] Complete!')

[INFO] Retrieving Scans...
1 of 2 (Source: AzureSqlDatabase; Scans: 1)
2 of 2 (Source: AzureDataLakeStorage; Scans: 2)
[INFO] Complete!


In [17]:
# 3. Get Scan History
results = []
headers = ["assetsClassified","assetsDiscovered","dataSourceType","endTime","error","errorMessage","id","ingestionJobId","parentId","pipelineStartTime","queuedTime","resourceId","runType","scanLevelType","scanRulesetType","scanRulesetVersion","startTime","status","webScanResults","source","scanName"]
results.append(headers)
counter = 0

print('[INFO] Retrieving Scan History...')
for datasource in scans:
    counter += 1
    print('[{0} of {1}] Datasource {2} has {3} scans.'.format(
        counter,
        number_of_sources,
        datasource,
        len(scans[datasource]['value'])
        )
    )
    for scan in scans[datasource]['value']:
        print(' - Getting Scan History for Scan: {0}'.format(scan['name']))
        history = !pv scan readScanHistory --dataSourceName {datasource} --scanName {scan['name']}
        history = getJSON(history)
        for record in history['value']:
            record.pop('diagnostics')
            values = list(record.values())
            values.append(datasource)
            values.append(scan['name'])
            results.append(values) 
print('[INFO] Complete!')       

[INFO] Retrieving Scan History...
[1 of 2] Datasource AzureSqlDatabase has 1 scans.
 - Getting Scan History for Scan: Scan-qZl
[2 of 2] Datasource AzureDataLakeStorage has 2 scans.
 - Getting Scan History for Scan: Scan-2iv
 - Getting Scan History for Scan: Scan-FLr
[INFO] Complete!


In [19]:
# 4. Export to CSV
# Hand the collected rows (header row first) to the export() helper.
export(table=results)
print('[INFO] Scan history has been exported to output.csv')

[INFO] Scan history has been exported to output.csv
