Install the semantic-link package, which provides `sempy`. This step can be skipped if semantic-link is already installed in the attached environment.

In [None]:
pip install semantic-link

Import the required Python packages

In [None]:
import pandas as pd
import sempy.fabric as fabric
import json
from pyspark.sql.types import *
import datetime
import time

Configuration of parameters:
- **workspaces_per_request**: Determines how many workspaces are requested in a single scan call. The scanner API currently accepts at most 100 workspaces per call.
- **max_parallel_requests**: Determines how many scan requests may run concurrently against the scanner API. The API currently allows at most 16 parallel requests.
- **write_to_files**: If `True`, the raw JSON scan results are also written to the Files section of the default lakehouse (under `Files/Metadata_Requests/<date>/<time>/`).

In [None]:
# Number of workspace IDs bundled into one getInfo scan request (API maximum: 100).
workspaces_per_request: int = 100
# Number of scan requests kept in flight at the same time (API maximum: 16).
max_parallel_requests: int = 16
# When True, each raw JSON scan result is also saved under Files/ of the default lakehouse.
write_to_files: bool = True

In [None]:
#Instantiate the client
client = fabric.FabricRestClient()
current_time = datetime.datetime.now()

Get workspace data (capacity data is retrieved later, in the parsing step)

Request the list of workspaces (excluding personal and inactive workspaces) and group their IDs into batches of `workspaces_per_request` (default 100) workspaces each

In [None]:
# List all workspaces known to the admin API, excluding personal and inactive ones.
response = client.get("/v1.0/myorg/admin/workspaces/modified?excludePersonalWorkspaces=True&excludeInActiveWorkspaces=True")
response.raise_for_status()
modified_workspaces = pd.json_normalize(response.json())
if modified_workspaces.empty:
    # Without this check the groupby below fails with an opaque KeyError on 'id'.
    raise ValueError("The workspaces/modified API returned no workspaces; nothing to scan.")

# Batch number by position: rows 0..workspaces_per_request-1 form run 0, the next ones run 1, ...
modified_workspaces["run"] = [position // workspaces_per_request for position in range(len(modified_workspaces))]

# One row per run (index 'run') with column 'id' holding the list of workspace IDs to scan together.
df_runs = pd.DataFrame(data = modified_workspaces.groupby('run')['id'].apply(list))
df_runs["status"] = "Not Started"

Use the getInfo API to request metadata scans for all workspace batches, making sure that at most `max_parallel_requests` (default 16) scan requests run in parallel. The status of every submitted scan is polled via the scanStatus API.

In [None]:
# Statuses of runs that still need work: the two local markers set by this cell
# ("Not Started" = not yet submitted, "Request sent" = submitted) plus the in-progress
# statuses reported by the scanStatus API.
# NOTE(review): the scanner API reports queued scans as "NotStarted" (no space) - confirm
# against the API docs. Without it, such a run would drop out of polling and its
# results would never be fetched.
PENDING_STATUSES = ["Not Started", "Request sent", "NotStarted", "Running"]


def _next_pending_runs():
    """Return at most `max_parallel_requests` unfinished runs, in run order."""
    return df_runs[df_runs["status"].isin(PENDING_STATUSES)].head(max_parallel_requests)


df_runs_current = _next_pending_runs()

while not df_runs_current.empty:
    for i, row in df_runs_current.iterrows():
        if row["status"] == "Not Started":
            # Submit one scan for this batch of workspace IDs.
            response = client.post("/v1.0/myorg/admin/workspaces/getInfo?getArtifactUsers=True", json = {"workspaces": row["id"]})
            response.raise_for_status()
            df_runs.loc[i, "status"] = "Request sent"
            df_runs.loc[i, "run_id"] = response.json()["id"]
        else:
            # Already submitted: refresh its status from the scanStatus API.
            response = client.get("/v1.0/myorg/admin/workspaces/scanStatus/" + row["run_id"])
            response.raise_for_status()
            df_runs.loc[i, "status"] = response.json()["status"]
    df_runs_current = _next_pending_runs()
    if not df_runs_current.empty:
        # Give the submitted scans time to progress before polling again.
        time.sleep(5)

Get the scanner API results of all successful scans (optionally also writing the raw JSON to the lakehouse files)

In [None]:
results = []
succeeded_runs = df_runs[df_runs["status"] == "Succeeded"]

# Report scans that did not succeed instead of silently dropping their workspaces.
unsuccessful_runs = df_runs[df_runs["status"] != "Succeeded"]
if not unsuccessful_runs.empty:
    print(
        f"Warning: {len(unsuccessful_runs)} scan run(s) did not succeed and are skipped: "
        + ", ".join(f"run {run}: {status}" for run, status in unsuccessful_runs["status"].items())
    )

if write_to_files and not succeeded_runs.empty:
    # One folder per notebook run: <default lakehouse mount>/Files/Metadata_Requests/<date>/<time>/
    # Computed once, because it does not depend on the individual run.
    folder_path = mssparkutils.fs.getMountPath('/default') + "/Files/Metadata_Requests/" + current_time.strftime("%Y-%m-%d") + "/" + current_time.strftime("%H-%M-%S") + "/"
    mssparkutils.fs.mkdirs("file://" + folder_path)

for run_id in succeeded_runs.get("run_id", []):
    url = "/v1.0/myorg/admin/workspaces/scanResult/" + run_id
    print(url)
    response = client.get(url)
    response.raise_for_status()
    scan_result = response.json()  # parse once; reused for results and the file
    results.append(scan_result)
    if write_to_files:
        with open(folder_path + run_id + ".json", "w") as f:
            json.dump(scan_result, f)
                    
        

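Helper function that flattens a column holding lists of nested JSON objects (e.g. the reports of each workspace) into a separate table, keeping the parent ID on every row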

In [None]:
def get_details( df, parent_id, col, **kwargs ):
    """Flatten a column of nested JSON object lists into its own table.

    Each row of ``df`` holds, in column ``col``, a list of dicts (e.g. the reports
    of a workspace). The lists are exploded to one row per element and every dict
    is normalized into columns. The parent key ``parent_id`` is kept on each row.

    Parameters
    ----------
    df : pandas.DataFrame
        Parent table containing the columns ``parent_id`` and (normally) ``col``.
    parent_id : str
        Name of the parent key column copied onto every child row.
    col : str
        Name of the column holding the lists of nested objects.
    rename_id : str, optional (keyword argument)
        If given, child rows without an ``id`` are dropped and the child's ``id``
        column is renamed to this name.

    Returns
    -------
    pandas.DataFrame
        One row per nested object. Without ``rename_id``, parents with an empty
        list still produce one row whose child columns are NaN. If ``col`` is not
        present in ``df``, or (with ``rename_id``) no nested object has an ``id``,
        an empty frame with just the key column(s) is returned instead of raising.
    """
    rename_id = kwargs.get('rename_id', None)
    key_columns = [parent_id] if rename_id is None else [parent_id, rename_id]

    # Item-type keys only appear in the scan result when such items exist in the tenant.
    if col not in df.columns:
        return pd.DataFrame(columns = key_columns)

    # ignore_index gives a fresh RangeIndex so the normalized frame joins back row by row.
    exploded = df[[parent_id, col]].explode(col, ignore_index = True)
    df_res = exploded[[parent_id]].join(pd.json_normalize(exploded[col]))

    if rename_id is not None:
        if 'id' not in df_res.columns:
            # Every list was empty, so json_normalize produced no 'id' column.
            return pd.DataFrame(columns = key_columns)
        df_res = df_res.dropna(subset=['id']).rename(columns = {'id' : rename_id})
    return df_res

Parse the scan results into data frames that are written to the lakehouse in a later step. If some item types do not exist in the Fabric tenant, this step might fail. In that case, comment out the failing lines, together with the matching entries of `write_list` further below.

In [None]:
# Capacities come from a separate endpoint that wraps its items in a "value" list.
capacities_response = client.get("/v1.0/myorg/capacities")
capacities_response.raise_for_status()
df_capacities = pd.json_normalize(capacities_response.json()["value"])

# Every scan result holds a "workspaces" list; flatten them all into one row per workspace.
if not results:
    raise ValueError("No successful scan results to parse.")
df_json = pd.json_normalize([workspace for result in results for workspace in result.get("workspaces", [])])
df_json = df_json.rename(columns = {'id' : 'workspace_id'})

# .copy(): an independent frame rather than a column selection of df_json, so adding a
# column to it later (e.g. a load timestamp) does not raise SettingWithCopyWarning.
df_workspaces = df_json[["workspace_id", "name", "type", "state", "isOnDedicatedCapacity", "capacityId", "defaultDatasetStorageFormat"]].copy()
df_workspace_users = get_details(df_json, "workspace_id", "users")

# For each item type: one table of items (keyed by <item>_id) and one of their user assignments.
df_reports = get_details(df_json, "workspace_id", "reports", rename_id = "report_id")
df_report_users = get_details(df_reports, "report_id", "users")

df_datasets = get_details(df_json, "workspace_id", "datasets", rename_id = "dataset_id")
df_dataset_users = get_details(df_datasets, "dataset_id", "users")

df_dashboards = get_details(df_json, "workspace_id", "dashboards", rename_id = "dashboard_id")
df_dashboard_users = get_details(df_dashboards, "dashboard_id", "users")

df_lakehouses = get_details(df_json, "workspace_id", "Lakehouse", rename_id = "lakehouse_id")
df_lakehouse_users = get_details(df_lakehouses, "lakehouse_id", "users")

df_warehouses = get_details(df_json, "workspace_id", "warehouses", rename_id = "warehouse_id")
df_warehouse_users = get_details(df_warehouses, "warehouse_id", "users")

df_eventstreams = get_details(df_json, "workspace_id", "Eventstream", rename_id = "eventstream_id")
df_eventstream_users = get_details(df_eventstreams, "eventstream_id", "users")

df_datapipelines = get_details(df_json, "workspace_id", "DataPipeline", rename_id = "datapipeline_id")
df_datapipeline_users = get_details(df_datapipelines, "datapipeline_id", "users")

df_notebooks = get_details(df_json, "workspace_id", "Notebook", rename_id = "notebook_id")
df_notebook_users = get_details(df_notebooks, "notebook_id", "users")

Function to write a data frame to a Delta table in the lakehouse. Existing table data (and schema) is overwritten.

In [None]:
def write_data( df, table_name):
    """Overwrite the Delta table at ``Tables/<table_name>`` with the rows of ``df``.

    A ``ts_load`` column holding ``current_time`` (the timestamp of this notebook
    run) is added to the written data. The caller's pandas frame is left unchanged.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to write.
    table_name : str
        Table name below ``Tables/`` of the default lakehouse.

    An empty ``df`` is skipped with a printed notice, because schema inference
    presumably cannot work without rows (TODO confirm for the Arrow path). Any
    existing table of that name is then left as it is.
    """
    if df.empty:
        print(f"Skipping table '{table_name}': no rows to write (existing table, if any, is left unchanged).")
        return

    # assign() returns a new frame, so the caller's DataFrame is not mutated.
    spark_df = spark.createDataFrame(df.assign(ts_load = current_time))

    # Drop columns Spark typed as array<void>
    # (presumably list columns that only ever held empty lists).
    void_columns = [name for name, dtype in spark_df.dtypes if dtype == "array<void>"]
    spark_df = spark_df.drop(*void_columns)

    spark_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save("Tables/" + table_name)

Write all data frames to Delta tables in the lakehouse

In [None]:
# (data frame, target Delta table name) pairs, written in this order.
# NOTE(review): "lakehouses_users" does not follow the "<item>_users" naming of the other
# user tables; kept unchanged because downstream consumers may rely on the existing name.
tables_to_write = [
    (df_datasets, "datasets"),
    (df_workspaces, "workspaces"),
    (df_reports, "reports"),
    (df_report_users, "report_users"),
    (df_workspace_users, "workspace_users"),
    (df_dataset_users, "dataset_users"),
    (df_capacities, "capacities"),
    (df_lakehouses, "lakehouses"),
    (df_lakehouse_users, "lakehouses_users"),
    (df_dashboards, "dashboards"),
    (df_dashboard_users, "dashboard_users"),
    (df_warehouses, "warehouses"),
    (df_warehouse_users, "warehouse_users"),
    (df_eventstreams, "eventstreams"),
    (df_eventstream_users, "eventstream_users"),
    (df_datapipelines, "datapipelines"),
    (df_datapipeline_users, "datapipeline_users"),
    (df_notebooks, "notebooks"),
    (df_notebook_users, "notebook_users"),
]
write_list = [{"df": frame, "name": table_name} for frame, table_name in tables_to_write]

for entry in write_list:
    write_data(entry["df"], entry["name"])