In [1]:
def list_fabric_workspaces(token, max_retries=5, timeout=60):
    """
    List the Fabric workspaces visible to ``token``.

    Calls Fabric REST ``GET https://api.fabric.microsoft.com/v1/workspaces`` and follows
    server-side pagination until a page carries no continuation.

    Args:
      token: bearer access token used in the Authorization header.
      max_retries: number of consecutive HTTP 429 (throttled) responses tolerated
        before giving up.
      timeout: per-request timeout in seconds passed to ``requests.get``.

    Returns:
      list of workspace dicts (the concatenated ``value`` arrays of every page).

    Raises:
      requests.HTTPError (via ``raise_for_status``): on a non-2xx response, including
      a 429 once ``max_retries`` is exhausted.
    """
    import time

    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}

    base_url = "https://api.fabric.microsoft.com/v1/workspaces"
    workspaces = []

    # Initial page: no query parameters.
    url, params = base_url, None
    throttled = 0
    while True:
        resp = requests.get(url, headers=headers, params=params, timeout=timeout)

        if resp.status_code == 429:
            throttled += 1
            if throttled > max_retries:
                resp.raise_for_status()
            # Retry-After may be delta-seconds or an HTTP date; fall back to 2s when
            # it is missing or not an integer instead of crashing on int().
            try:
                retry_after = max(0, int(resp.headers.get("Retry-After", "2")))
            except (TypeError, ValueError):
                retry_after = 2
            time.sleep(retry_after)
            continue
        throttled = 0

        resp.raise_for_status()
        is_json = resp.headers.get("Content-Type", "").startswith("application/json")
        payload = resp.json() if is_json else {}
        workspaces.extend(payload.get("value", []))

        # Pagination: Fabric returns 'continuationToken' (and 'continuationUri');
        # other APIs use '@odata.nextLink'.
        cont_token = payload.get("continuationToken")
        next_url = payload.get("continuationUri") or payload.get("@odata.nextLink")
        if isinstance(cont_token, str) and cont_token.startswith("http"):
            url, params = cont_token, None
        elif cont_token:
            # Pass the token via params so requests URL-encodes it; tokens can
            # contain '+', '/' and '=' which would be corrupted if concatenated raw.
            url, params = base_url, {"continuationToken": cont_token}
        elif next_url:
            url, params = next_url, None
        else:
            break

    return workspaces



StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 3, Finished, Available, Finished)

In [2]:
def start_scan(workspace_ids, lineage=True, datasourceDetails=True,
               datasetSchema=False, datasetExpressions=False, getArtifactUsers=False,
               timeout=60):
    """
    Submit a Power BI Scanner API scan request (POST {BASE}/admin/workspaces/getInfo).

    Uses the notebook-level ``BASE`` and ``headers`` globals.

    Args:
      workspace_ids: iterable of workspace GUID strings (1..100 per call).
      lineage, datasourceDetails, datasetSchema, datasetExpressions, getArtifactUsers:
        scan options, sent as lowercase "true"/"false" query parameters.
      timeout: request timeout in seconds, so a stalled call cannot hang the notebook.

    Returns:
      The scan request object: {id, createdDateTime, status}.

    Raises:
      ValueError: if the number of workspace IDs is outside 1..100.
      RuntimeError: if the service does not answer 202 Accepted.
    """
    # Materialize so generators/tuples are accepted and len() works.
    workspace_ids = list(workspace_ids)
    if not (1 <= len(workspace_ids) <= 100):
        raise ValueError("workspace_ids must contain 1..100 IDs per scan request")

    params = {
        "lineage": str(lineage).lower(),
        "datasourceDetails": str(datasourceDetails).lower(),
        "datasetSchema": str(datasetSchema).lower(),
        "datasetExpressions": str(datasetExpressions).lower(),
        "getArtifactUsers": str(getArtifactUsers).lower(),
    }

    url = f"{BASE}/admin/workspaces/getInfo"
    resp = requests.post(url, headers=headers, params=params,
                         json={"workspaces": workspace_ids}, timeout=timeout)
    if resp.status_code != 202:
        raise RuntimeError(f"getInfo failed {resp.status_code}: {resp.text}")
    return resp.json()

# Example: single workspace
#print(workspace_ids)
#workspace_id = "59453e5f-1b41-4ca9-a493-c72eda9e5a50"
#scan_req = start_scan([workspace_id], lineage=True, datasourceDetails=True)
#scan_id = scan_req["id"]
#scan_req


StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 4, Finished, Available, Finished)

In [3]:
def get_scan_status(scan_id="", timeout=60):
    """
    Fetch the status of a Scanner API scan request.

    Calls GET {BASE}/admin/workspaces/scanStatus/{scan_id} using the notebook-level
    ``BASE`` and ``headers`` globals.

    Args:
      scan_id: scan request id returned by ``start_scan`` (``scan_req["id"]``).
      timeout: request timeout in seconds; without one, a stalled connection would
        block the polling loop indefinitely.

    Returns:
      The parsed JSON status object (the caller polls its "status" field).

    Raises:
      RuntimeError: if the response status is not 200.
    """
    url = f"{BASE}/admin/workspaces/scanStatus/{scan_id}"
    resp = requests.get(url, headers=headers, timeout=timeout)
    if resp.status_code != 200:
        raise RuntimeError(f"scanStatus failed {resp.status_code}: {resp.text}")
    return resp.json()

# poll
#while True:
#    st = get_scan_status(scan_id)
#    status = st.get("status")
#    print("scanStatus =", status)
#    if status in ("Succeeded", "Failed"):
#        break
#    time.sleep(3)

#if status != "Succeeded":
#    raise RuntimeError(f"Scan failed: {st}")

StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 5, Finished, Available, Finished)

In [4]:
def get_scan_result(scan_id, timeout=60):
    """
    Download the result payload of a completed Scanner API scan.

    Calls GET {BASE}/admin/workspaces/scanResult/{scan_id} using the notebook-level
    ``BASE`` and ``headers`` globals. Call this only after the scan status is
    "Succeeded".

    Args:
      scan_id: scan request id returned by ``start_scan``.
      timeout: request timeout in seconds so a stalled download cannot hang forever.

    Returns:
      The parsed JSON scan result (top-level "workspaces" array, among other keys).

    Raises:
      RuntimeError: if the response status is not 200.
    """
    url = f"{BASE}/admin/workspaces/scanResult/{scan_id}"
    resp = requests.get(url, headers=headers, timeout=timeout)
    if resp.status_code != 200:
        raise RuntimeError(f"scanResult failed {resp.status_code}: {resp.text}")
    return resp.json()

#scan_result = get_scan_result(scan_id)
#scan_result.keys()

StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 6, Finished, Available, Finished)

In [5]:
def save_scan_result_as_delta_table_dt(
    scan_id: str,
    scan_result: dict,
    run_dt,  # <-- datetime you pass in (same for workspaces + scans)
    table_name: str = "pbi_scan_results"
):
    """
    Persist one scan result as a raw JSON file plus one appended row of a Delta table.

    Writes ``Files/pbi_scans/raw/scanResult_<scan_id>_<run timestamp>.json`` (existing
    file overwritten) and appends ``(scan_id, run_dt, payload_json)`` to ``table_name``.

    Args:
      scan_id: scan request id.
      scan_result: JSON-serializable scan payload.
      run_dt: datetime stamped on the row (``isoformat()`` string) and in the file name.
      table_name: managed Delta table to append to.

    Returns:
      (raw_path, table_name)

    NOTE: ``save_scan_result_as_delta_table`` writes a different schema (no run_dt)
    with the same default table name; appending both to one table would fail Delta's
    schema check.
    """
    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, StringType
    import json

    # normalize to a stable string once (UTC recommended)
    run_dt_str = run_dt.isoformat()
    # Serialize once; the same text goes to the file and the table.
    payload_str = json.dumps(scan_result, ensure_ascii=False)

    # 1) Raw JSON file (debugging/audit). Create the directory we actually write to.
    # Colons from isoformat() are replaced in the file name: they are known to break
    # Hadoop Path parsing (e.g. listing/globbing) and are invalid on Windows.
    raw_dir = "Files/pbi_scans/raw"
    file_stamp = run_dt_str.replace(":", "-")
    raw_path = f"{raw_dir}/scanResult_{scan_id}_{file_stamp}.json"

    mssparkutils.fs.mkdirs(raw_dir)
    mssparkutils.fs.put(raw_path, payload_str, overwrite=True)

    # 2) Minimal, schema-stable "raw JSON" Delta table.
    df = spark.createDataFrame(
        [Row(scan_id=scan_id, run_dt=run_dt_str, payload_json=payload_str)],
        schema=StructType([
            StructField("scan_id", StringType(), False),
            StructField("run_dt", StringType(), False),
            StructField("payload_json", StringType(), False),
        ])
    )

    (df.write
       .format("delta")
       .mode("append")
       .saveAsTable(table_name))

    return raw_path, table_name

StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 7, Finished, Available, Finished)

In [6]:
# [2026-02-19 08:17 CST]
import json
from datetime import datetime, timezone

from pyspark.sql import Row
from pyspark.sql.functions import col, explode, lit, from_json
from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType, ArrayType, TimestampType
)

def persist_scan_to_graph_tables(
    scan_id: str,
    scan_result: dict,
    run_dt: datetime | None = None,
    mode: str = "overwrite",
    raw_table: str = "pbi_scan_results_raw",
    node_workspaces: str = "pbi_graph_workspaces",
    node_artifacts: str = "pbi_graph_artifacts",
    node_datasets: str = "pbi_graph_datasets",
    node_capacities: str = "pbi_graph_capacities",
    edge_ws_contains: str = "pbi_graph_edge_workspace_contains_artifact",
    edge_report_uses_dataset: str = "pbi_graph_edge_report_uses_dataset",
    edge_depends_on: str = "pbi_graph_edge_artifact_depends_on_artifact",
    edge_ws_on_capacity: str = "pbi_graph_edge_workspace_on_capacity",
):
    """
    Persist a Power BI/Fabric Scanner API GetScanResult payload into Delta tables
    suitable for Fabric Graph modeling.

    Tables written:
      - ``raw_table``: one row (scan_id, run_dt, payload_json). Always APPENDED,
        regardless of ``mode``.
      - Nodes (written with ``mode``): workspaces, artifacts (Lakehouse, DataAgent,
        DataPipeline, Reflex, Notebook, reports), datasets (distinct report datasetId),
        capacities (distinct workspace capacityId).
      - Edges (written with ``mode``): workspace->artifact, report->dataset,
        artifact->artifact (from ``relations`` on DataAgent and Reflex items),
        workspace->capacity.
    Every node/edge row carries ``scan_id`` and ``run_dt``.

    Args:
      scan_id: scan request id, stamped on every row.
      scan_result: the GetScanResult payload (dict with a "workspaces" array).
      run_dt: run timestamp; defaults to now in UTC.
      mode: Spark save mode for node/edge tables ("overwrite" or "append").
      raw_table, node_*, edge_*: target table names.

    Returns:
      dict with "raw_table", "nodes" (list of names) and "edges" (list of names).

    IMPORTANT:
    - Pin your non-schema 'lineageGraph' lakehouse as the notebook default so
      saveAsTable lands in the right place.
    - Graph doesn't support schema-enabled lakehouses today; use a non-schema
      lakehouse for these tables.
      See https://learn.microsoft.com/en-us/fabric/graph/tutorial-load-data and
      https://learn.microsoft.com/en-us/fabric/graph/limitations
    """
    from functools import reduce

    if run_dt is None:
        run_dt = datetime.now(timezone.utc)

    payload_json = json.dumps(scan_result, ensure_ascii=False)

    # 1) Raw payload table (audit/debug/replay)
    raw_schema = StructType([
        StructField("scan_id", StringType(), False),
        StructField("run_dt", TimestampType(), False),
        StructField("payload_json", StringType(), False),
    ])

    raw_df = spark.createDataFrame(
        [Row(scan_id=scan_id, run_dt=run_dt, payload_json=payload_json)],
        schema=raw_schema
    )

    raw_df.write.format("delta").mode("append").saveAsTable(raw_table)

    # 2) Parse only what we need from the payload (resilient, optional fields).
    # Nested objects declared as StringType are kept as raw JSON text.
    relation_schema = StructType([
        StructField("dependentOnArtifactId", StringType(), True),
        StructField("workspaceId", StringType(), True),
        StructField("relationType", StringType(), True),
        StructField("settingsList", StringType(), True),
        StructField("usage", StringType(), True),
    ])

    artifact_schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("description", StringType(), True),
        StructField("state", StringType(), True),
        StructField("createdDate", StringType(), True),
        StructField("lastUpdatedDate", StringType(), True),
        StructField("modifiedDate", StringType(), True),
        StructField("modifiedDateTime", StringType(), True),
        StructField("createdDateTime", StringType(), True),
        StructField("datasetId", StringType(), True),
        StructField("reportType", StringType(), True),
        StructField("format", StringType(), True),
        StructField("relations", ArrayType(relation_schema), True),
        StructField("extendedProperties", StringType(), True),
    ])

    workspace_schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("type", StringType(), True),
        StructField("state", StringType(), True),
        StructField("isOnDedicatedCapacity", BooleanType(), True),
        StructField("capacityId", StringType(), True),
        StructField("defaultDatasetStorageFormat", StringType(), True),

        # NOTE: these keys match your payload casing
        StructField("Lakehouse", ArrayType(artifact_schema), True),
        StructField("DataAgent", ArrayType(artifact_schema), True),
        StructField("DataPipeline", ArrayType(artifact_schema), True),
        StructField("Reflex", ArrayType(artifact_schema), True),
        StructField("Notebook", ArrayType(artifact_schema), True),

        # reports is lower-case in your payload
        StructField("reports", ArrayType(artifact_schema), True),
    ])

    scan_schema = StructType([
        StructField("workspaces", ArrayType(workspace_schema), True)
    ])

    # Parse the payload and explode the workspaces array ONCE; every node and edge
    # frame below derives from this single DataFrame (one row per workspace, column "w").
    workspaces_exploded = (spark.createDataFrame([Row(payload_json=payload_json)])
        .select(from_json(col("payload_json"), scan_schema).alias("scan"))
        .select(explode(col("scan.workspaces")).alias("w"))
    )

    def union_all(frames):
        # Left-fold unionByName; columns missing from a frame are filled with nulls.
        return reduce(
            lambda left, right: left.unionByName(right, allowMissingColumns=True),
            frames,
        )

    # 3) Node: Workspaces
    ws_df = (workspaces_exploded
        .select(
            lit(scan_id).alias("scan_id"),
            lit(run_dt).alias("run_dt"),
            col("w.id").alias("workspace_id"),
            col("w.name").alias("workspace_name"),
            col("w.state").alias("workspace_state"),
            col("w.capacityId").alias("capacity_id"),
            col("w.defaultDatasetStorageFormat").alias("default_dataset_storage_format"),
        )
        .where(col("workspace_id").isNotNull())
    )

    # 4) Node: Capacity (distinct workspace.capacityId)
    cap_df = (ws_df
        .select(
            lit(scan_id).alias("scan_id"),
            lit(run_dt).alias("run_dt"),
            col("capacity_id").alias("capacity_id"),
        )
        .where(col("capacity_id").isNotNull())
        .dropDuplicates(["capacity_id"])
    )

    # Explode one artifact array (e.g. "Lakehouse") into one row per artifact.
    def explode_artifacts(array_col_name: str, artifact_type: str):
        return (workspaces_exploded
            .select(
                col("w.id").alias("workspace_id"),
                explode(col(f"w.{array_col_name}")).alias("a")
            )
            .select(
                lit(scan_id).alias("scan_id"),
                lit(run_dt).alias("run_dt"),
                lit(artifact_type).alias("artifact_type"),
                col("workspace_id"),
                col("a.id").alias("artifact_id"),
                col("a.name").alias("artifact_name"),
                col("a.state").alias("artifact_state"),
                col("a.reportType").alias("report_type"),
                col("a.datasetId").alias("dataset_id"),
                col("a.format").alias("format"),
            )
            .where(col("artifact_id").isNotNull())
        )

    # 5) Node: Artifacts (lakehouse, data agent, pipeline, reflex, notebook, report)
    artifacts_df = union_all([
        explode_artifacts("Lakehouse", "Lakehouse"),
        explode_artifacts("DataAgent", "DataAgent"),
        explode_artifacts("DataPipeline", "DataPipeline"),
        explode_artifacts("Reflex", "Reflex"),
        explode_artifacts("Notebook", "Notebook"),
        explode_artifacts("reports", "Report"),
    ])

    # 6) Node: Datasets (distinct datasetIds from reports)
    datasets_df = (artifacts_df
        .where((col("artifact_type") == "Report") & col("dataset_id").isNotNull())
        .select(
            lit(scan_id).alias("scan_id"),
            lit(run_dt).alias("run_dt"),
            col("dataset_id").alias("dataset_id")
        )
        .dropDuplicates(["dataset_id"])
    )

    # 7) Edge: Workspace CONTAINS Artifact
    edge_ws_contains_df = (artifacts_df
        .select(
            lit(scan_id).alias("scan_id"),
            lit(run_dt).alias("run_dt"),
            col("workspace_id").alias("from_workspace_id"),
            col("artifact_id").alias("to_artifact_id"),
            col("artifact_type").alias("to_artifact_type"),
        )
        .where(col("from_workspace_id").isNotNull() & col("to_artifact_id").isNotNull())
    )

    # 8) Edge: Report USES Dataset
    edge_report_uses_dataset_df = (artifacts_df
        .where((col("artifact_type") == "Report") & col("dataset_id").isNotNull())
        .select(
            lit(scan_id).alias("scan_id"),
            lit(run_dt).alias("run_dt"),
            col("artifact_id").alias("from_report_id"),
            col("dataset_id").alias("to_dataset_id"),
        )
    )

    # 9) Edge: Artifact DEPENDS ON Artifact (one row per entry in an artifact's
    # "relations" array).
    def explode_depends(array_col_name: str):
        return (workspaces_exploded
            .select(
                col("w.id").alias("workspace_id"),
                explode(col(f"w.{array_col_name}")).alias("a")
            )
            .select(
                col("workspace_id"),
                col("a.id").alias("artifact_id"),
                explode(col("a.relations")).alias("rel")
            )
            .select(
                lit(scan_id).alias("scan_id"),
                lit(run_dt).alias("run_dt"),
                col("workspace_id"),
                col("artifact_id").alias("from_artifact_id"),
                col("rel.dependentOnArtifactId").alias("to_artifact_id"),
                col("rel.relationType").alias("relation_type"),
                col("rel.usage").alias("usage"),
            )
            .where(col("from_artifact_id").isNotNull() & col("to_artifact_id").isNotNull())
        )

    edge_depends_df = union_all([
        explode_depends("DataAgent"),
        explode_depends("Reflex"),
        # Add others if they have relations in your tenant later
    ])

    # 10) Edge: Workspace ON Capacity
    edge_ws_on_capacity_df = (ws_df
        .where(col("capacity_id").isNotNull())
        .select(
            lit(scan_id).alias("scan_id"),
            lit(run_dt).alias("run_dt"),
            col("workspace_id").alias("from_workspace_id"),
            col("capacity_id").alias("to_capacity_id"),
        )
    )

    # Write node/edge tables with the caller-selected mode
    # ("overwrite" to start over, "append" to accumulate scans).
    ws_df.write.format("delta").mode(mode).saveAsTable(node_workspaces)
    artifacts_df.write.format("delta").mode(mode).saveAsTable(node_artifacts)
    datasets_df.write.format("delta").mode(mode).saveAsTable(node_datasets)
    cap_df.write.format("delta").mode(mode).saveAsTable(node_capacities)

    edge_ws_contains_df.write.format("delta").mode(mode).saveAsTable(edge_ws_contains)
    edge_report_uses_dataset_df.write.format("delta").mode(mode).saveAsTable(edge_report_uses_dataset)
    edge_depends_df.write.format("delta").mode(mode).saveAsTable(edge_depends_on)
    edge_ws_on_capacity_df.write.format("delta").mode(mode).saveAsTable(edge_ws_on_capacity)

    return {
        "raw_table": raw_table,
        "nodes": [node_workspaces, node_artifacts, node_datasets, node_capacities],
        "edges": [edge_ws_contains, edge_report_uses_dataset, edge_depends_on, edge_ws_on_capacity],
    }

StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 8, Finished, Available, Finished)

In [7]:


def save_scan_result_as_delta_table(scan_id: str, scan_result: dict, table_name: str = "pbi_scan_results"):
    """
    Persist one scan result as a raw JSON file plus one appended row of a Delta table.

    Writes ``Files/pbi_scans/raw/scanResult_<scan_id>.json`` (existing file
    overwritten) and appends ``(scan_id, payload_json)`` to the managed Delta table
    ``table_name`` (under Tables/).

    Returns:
      (raw_path, table_name)

    NOTE: ``save_scan_result_as_delta_table_dt`` writes a different schema (adds
    run_dt) with the same default table name; appending both to one table would fail
    Delta's schema check.
    """
    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, StringType
    import json

    # Serialize once; the same text goes to the file and the table.
    payload_str = json.dumps(scan_result, ensure_ascii=False)

    # 1) Raw JSON file (debugging/audit). Create the directory we actually write to.
    raw_dir = "Files/pbi_scans/raw"
    raw_path = f"{raw_dir}/scanResult_{scan_id}.json"
    mssparkutils.fs.mkdirs(raw_dir)
    mssparkutils.fs.put(raw_path, payload_str, overwrite=True)

    # 2) Minimal, schema-stable "raw JSON" table: scan_id + full payload as a string.
    df = spark.createDataFrame(
        [Row(scan_id=scan_id, payload_json=payload_str)],
        schema=StructType([
            StructField("scan_id", StringType(), False),
            StructField("payload_json", StringType(), False),
        ])
    )

    (df.write
       .format("delta")
       .mode("append")
       .saveAsTable(table_name))

    return raw_path, table_name


StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 9, Finished, Available, Finished)

In [8]:


def save_scan_result(scan_id: str, scan_result: dict, folder: str = "Files/pbi_scans") -> tuple[str, bool]:
    """
    Write a Scanner API scan result to the Lakehouse Files area as pretty-printed JSON.

    The target is ``<folder>/scanResult_<scan_id>.json``. ``folder`` is created first
    (parents included), and an existing file at the target is overwritten.

    Returns:
      (raw_path, ok)
        raw_path: the path that was written.
        ok: the value returned by mssparkutils.fs.put for the write.
    """
    import json

    target_path = "{}/scanResult_{}.json".format(folder, scan_id)

    mssparkutils.fs.mkdirs(folder)

    body = json.dumps(scan_result, indent=2)
    written = mssparkutils.fs.put(target_path, body, overwrite=True)
    return target_path, written

StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 10, Finished, Available, Finished)

In [9]:
def get_access_token():
    """
    Acquire an app-only (client-credentials) access token with MSAL.

    Reads TENANT_ID, CLIENT_ID, CLIENT_SECRET and SCOPE from the notebook's globals
    (set in the configuration cell).

    Returns:
      The access token string from MSAL's response.

    Raises:
      RuntimeError: if the MSAL response has no "access_token"; the whole response is
      included in the message for troubleshooting.
    """
    import msal

    confidential_app = msal.ConfidentialClientApplication(
        client_id=CLIENT_ID,
        authority=f"https://login.microsoftonline.com/{TENANT_ID}",
        client_credential=CLIENT_SECRET,
    )
    token_response = confidential_app.acquire_token_for_client(scopes=SCOPE)

    if "access_token" in token_response:
        return token_response["access_token"]
    raise RuntimeError(f"Token acquisition failed: {token_response}")


StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 11, Finished, Available, Finished)

In [11]:
import json
import time
import uuid
import requests
from datetime import datetime, timezone, timedelta
from pyspark.sql import Row
from pyspark.sql.functions import col, explode, lit, from_json
from pyspark.sql.types import (
    StructType, StructField, StringType, BooleanType, ArrayType, TimestampType
)


# ---- CONFIG (fill these in from Fabric secrets / Key Vault) ----
# TODO: load these from Azure Key Vault instead of hardcoding, e.g.
#   mssparkutils.credentials.getSecret("https://<vault-name>.vault.azure.net/", "<secret-name>")
# Never commit real values to this notebook.

TENANT_ID     = "<Your Tenant ID>"
CLIENT_ID     = "<Your Client ID>"
CLIENT_SECRET = "<Your Secret>"


#
#  Initialize variables used for API calls
#
SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]
BASE = "https://api.powerbi.com/v1.0/myorg"

token = get_access_token()

headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

#
#  Run settings
#
# Fixed UTC-6 offset labelled "CST" (no daylight saving). The captured instant is still
# correct; only the displayed wall-clock time is an hour off during CDT.
CST = timezone(timedelta(hours=-6), name="CST")
now = datetime.now(CST)                 # one timestamp shared by every scan in this run
BATCH_SIZE = 100                        # getInfo accepts at most 100 workspaces per call
POLL_INTERVAL_SECONDS = 3               # delay between scanStatus polls
SCAN_TIMEOUT_SECONDS = 30 * 60          # give up on a scan not finished by then


def wait_for_scan(scan_id, poll_seconds=POLL_INTERVAL_SECONDS, timeout_seconds=SCAN_TIMEOUT_SECONDS):
    """Poll scanStatus until the scan is "Succeeded" and return the final status payload.

    Raises RuntimeError if the scan reports "Failed", and TimeoutError if it has not
    finished within ``timeout_seconds``.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        st = get_scan_status(scan_id)
        status = st.get("status")
        print(f"scanStatus[{scan_id}] =", status)
        if status == "Succeeded":
            return st
        if status == "Failed":
            raise RuntimeError(f"Scan failed: {st}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Scan {scan_id} did not finish within {timeout_seconds}s (last status: {status})"
            )
        time.sleep(poll_seconds)


#
#  Submit one scan request per batch of up to BATCH_SIZE workspaces
#  (full batches of 100 plus one final partial batch).
#
workspaces = list_fabric_workspaces(token)
workspace_ids = [ws.get("id") for ws in workspaces if ws.get("id")]

scan_requests = []
for start in range(0, len(workspace_ids), BATCH_SIZE):
    batch_ids = workspace_ids[start:start + BATCH_SIZE]
    scan_requests.append(start_scan(
        workspace_ids=batch_ids,
        lineage=True,
        datasourceDetails=True,
        datasetSchema=False,
        datasetExpressions=False,
        getArtifactUsers=False
    ))

print(f"Submitted {len(scan_requests)} scans for {len(workspace_ids)} workspaces")

#
#  Wait for EVERY scan request (not just the last batch), then write its result to
#  OneLake as Delta tables. Results are pulled only after a scan has Succeeded.
#  mode="append" so the batches accumulate; "overwrite" would keep only the last batch.
#
outputs = []
for scan_req in scan_requests:
    scan_id = scan_req["id"]
    wait_for_scan(scan_id)
    outputs.append(persist_scan_to_graph_tables(
        scan_id=scan_id,
        scan_result=get_scan_result(scan_id),
        run_dt=now,
        mode="append"))

print("run date time: ", now)
for out in outputs:
    print(out)




StatementMeta(, 4f808cfa-6cec-4877-96fb-624ee00d8d43, 13, Finished, Available, Finished)

{'raw_table': 'pbi_scan_results_raw', 'nodes': ['pbi_graph_workspaces', 'pbi_graph_artifacts', 'pbi_graph_datasets', 'pbi_graph_capacities'], 'edges': ['pbi_graph_edge_workspace_contains_artifact', 'pbi_graph_edge_report_uses_dataset', 'pbi_graph_edge_artifact_depends_on_artifact', 'pbi_graph_edge_workspace_on_capacity']}
