# Fabric Monitor Hub Extraction
This notebook calls internal (unofficial) Microsoft Fabric APIs to extract the monitoring hub history into a lakehouse table.

## Setup

In [1]:
# Parameters and variables
table_schema = "logging"              # schema part of the target table path / name
table_name = "fabric_monitoring_hub"  # name of the target table
drop_existing = False                 # when True, the target table is dropped before loading
extract_limit = 50                    # value sent as the `limit` query parameter of the history request

StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 3, Finished, Available, Finished)

In [2]:
# Imports
import requests
from datetime import datetime, timezone
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType
from pyspark.sql.functions import to_timestamp, unix_timestamp, col, when, expr,udf

StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 4, Finished, Available, Finished)

In [3]:
# Optional reset: when drop_existing is set, remove the target table
# (IF EXISTS keeps this a no-op when the table is not there yet).
if drop_existing:
    spark.sql("DROP TABLE IF EXISTS {}.{}".format(table_schema, table_name))

StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 5, Finished, Available, Finished)

In [4]:
# Acquire a bearer token for the Fabric API audience
token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com/")

# Current UTC time as an ISO-8601 string with millisecond precision and a 'Z' suffix
utc_now = (
    datetime.now(timezone.utc)
    .isoformat(timespec="milliseconds")
    .replace("+00:00", "Z")
)

# Relative path of the destination Delta table
delta_table_path = "Tables/{}/{}".format(table_schema, table_name)

StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 6, Finished, Available, Finished)

## Get Fabric Cluster
Note: this API is unofficial.

In [5]:
def get_cluster(token: str) -> dict:
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json'
    }

    url = "https://api.powerbi.com/powerbi/globalservice/v201606/clusterdetails"


    try:
        response = requests.get(url, headers=headers,  timeout=10)
        print(f"Status Code: {response.status_code}")

        if response.status_code == 200:
            data = response.json()
            return data.get("clusterUrl")  # Only return clusterUrl
        else:
            print(f"Request failed with status {response.status_code}: {response.text}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
        return None


# Resolve the cluster URL once; the monitoring request later in the notebook is built on it.
cluster = get_cluster(token)

# Exit notebook if cluster is null: get_cluster returns None on any failure,
# and without a cluster URL the monitoring endpoint cannot be addressed.
if cluster is None:
    raise RuntimeError("Failed to retrieve Power BI cluster URL. Aborting notebook.")


StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 7, Finished, Available, Finished)

Status Code: 200


## Extract data from monitoring API
Note: this API is unofficial.

In [6]:

def fetch_monitoring_history(cluster_url, token: str, end_time, limit) -> dict:
    """Request monitoring-hub history entries from the (unofficial) cluster API.

    Issues a GET against ``{cluster_url}/metadata/monitoringhub/histories``
    with ``limit``, ``endTime`` and a fixed ``startTime`` of the Unix epoch as
    query parameters.

    Args:
        cluster_url: Base URL that the endpoint path is appended to.
        token: Bearer token placed in the ``Authorization`` header.
        end_time: Value sent as the ``endTime`` query parameter.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The decoded JSON body on HTTP 200; an empty dict when the status is
        not 200 or a ``requests`` exception is raised.
        NOTE(review): the decoded body is handed straight to
        ``spark.createDataFrame`` by the caller, so it is presumably a list of
        records rather than a dict - confirm against the API.
    """
    endpoint = f"{cluster_url}/metadata/monitoringhub/histories"
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
    }
    # startTime is pinned to the epoch, so only endTime and limit bound the result.
    query = dict(limit=limit, endTime=end_time, startTime='1970-01-01T00:00:00.000Z')

    try:
        reply = requests.get(endpoint, headers=auth_headers, params=query, timeout=10)
        print(f"Status Code: {reply.status_code}")

        # Anything other than 200 is reported and treated as "no data".
        if reply.status_code != 200:
            print(f"Request failed with status {reply.status_code}: {reply.text}")
            return {}

        return reply.json()
    except requests.exceptions.RequestException as err:
        print(f"Request error: {err}")
        return {}


# Request the history with limit=extract_limit and endTime=utc_now (captured earlier in the notebook).
history = fetch_monitoring_history(cluster, token, utc_now, extract_limit)

StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 8, Finished, Available, Finished)

Status Code: 200


## Convert API response to data frame
Perform simple data transformations

In [7]:
# Formatting helper for durations (wrapped as a Spark UDF in create_df)
def format_duration(seconds):
    """Render a duration in seconds as a compact ``h/m/s`` string.

    Args:
        seconds: Duration in seconds, or ``None``. The value is truncated to
            an integer with ``int()`` before formatting.

    Returns:
        ``None`` for ``None`` input; otherwise ``"<s>s"`` below one minute,
        ``"<m>m <s>s"`` below one hour, and ``"<h>h <m>m <s>s"`` from one
        hour upwards.
    """
    if seconds is None:
        return None

    total = int(seconds)
    if total < 60:
        return f"{total}s"

    # Split off the seconds remainder once; it is shared by both longer formats.
    whole_minutes, secs = divmod(total, 60)
    if total < 3600:
        return f"{whole_minutes}m {secs}s"

    hours, minutes = divmod(whole_minutes, 60)
    return f"{hours}h {minutes}m {secs}s"
        
def format_status(statusString):
    """Translate a raw status value into its display label.

    ``"Completed"`` becomes ``"Succeeded"``, ``"InProgress"`` becomes
    ``"In Progress"`` and ``"NotStarted"`` becomes ``"Not Started"``; any
    other value (including ``None``) is returned unchanged.
    """
    if statusString == "Completed":
        return "Succeeded"
    if statusString == "InProgress":
        return "In Progress"
    if statusString == "NotStarted":
        return "Not Started"
    # Unknown statuses pass through untouched.
    return statusString

def format_artifact_type(itemType):
    """Translate a raw artifact type into its display label.

    ``"dataset"`` becomes ``"Semantic Model"``, ``"Pipeline"`` becomes
    ``"Data Pipeline"`` and ``"SynapseNotebook"`` becomes ``"Notebook"``; any
    other value (including ``None``) is returned unchanged. Matching is
    case-sensitive.
    """
    if itemType == "dataset":
        return "Semantic Model"
    if itemType == "Pipeline":
        return "Data Pipeline"
    if itemType == "SynapseNotebook":
        return "Notebook"
    # Unknown artifact types pass through untouched.
    return itemType

def create_df(data):
    """Build the typed, flattened monitoring-hub DataFrame from the API payload.

    Steps, in order:
      1. Load ``data`` with an explicit schema (only the listed fields).
      2. Parse the three job time columns from strings to timestamps.
      3. Flatten the nested ``ownerUser`` struct into ``ownerObjectId``,
         ``ownerName`` and ``ownerUserPrincipalName`` and drop the struct.
      4. Add ``durationInSeconds`` (end minus start, null when the end time
         is null).
      5. Add the display columns ``durationFormatted``,
         ``artifactTypeFormatted`` and ``statusStringFormatted`` via UDFs.

    Args:
        data: Payload passed to ``spark.createDataFrame`` - presumably the
            list of job-history records returned by the monitoring API;
            confirm against the caller.

    Returns:
        The transformed Spark DataFrame.
    """
    # Nested struct describing the job owner.
    owner_struct = StructType([
        StructField("objectId", StringType(), True),
        StructField("userPrincipalName", StringType(), True),
        StructField("name", StringType(), True),
    ])

    history_schema = StructType([
        StructField("id", LongType(), True),
        StructField("artifactJobInstanceId", StringType(), True),
        StructField("artifactId", LongType(), True),
        StructField("artifactName", StringType(), True),
        StructField("artifactType", StringType(), True),
        StructField("jobScheduleTimeUtc", StringType(), True),
        StructField("jobStartTimeUtc", StringType(), True),
        StructField("jobEndTimeUtc", StringType(), True),
        StructField("statusString", StringType(), True),
        StructField("ownerUser", owner_struct),
        StructField("triggeredByArtifactName", StringType(), True),
        StructField("workspaceName", StringType(), True),
        StructField("isSuccessful", BooleanType(), True),
    ])

    frame = spark.createDataFrame(data, schema=history_schema)

    # Time columns arrive as strings; replace each one in place with a timestamp.
    for time_column in ("jobScheduleTimeUtc", "jobStartTimeUtc", "jobEndTimeUtc"):
        frame = frame.withColumn(time_column, to_timestamp(time_column))

    # Promote the owner attributes to top-level columns, then drop the struct.
    frame = (
        frame
        .withColumn("ownerObjectId", col("ownerUser.objectId"))
        .withColumn("ownerName", col("ownerUser.name"))
        .withColumn("ownerUserPrincipalName", col("ownerUser.userPrincipalName"))
        .drop("ownerUser")
    )

    # Duration is only computed when an end time is present; otherwise it stays null.
    elapsed_seconds = unix_timestamp("jobEndTimeUtc") - unix_timestamp("jobStartTimeUtc")
    frame = frame.withColumn(
        "durationInSeconds",
        when(col("jobEndTimeUtc").isNotNull(), elapsed_seconds).otherwise(None),
    )

    # Wrap the plain-Python formatters as string-returning UDFs.
    duration_label = udf(format_duration, StringType())
    artifact_type_label = udf(format_artifact_type, StringType())
    status_label = udf(format_status, StringType())

    # Append the display columns.
    return (
        frame
        .withColumn("durationFormatted", duration_label(col("durationInSeconds")))
        .withColumn("artifactTypeFormatted", artifact_type_label(col("artifactType")))
        .withColumn("statusStringFormatted", status_label(col("statusString")))
    )

# Turn the raw API payload into the typed, flattened DataFrame that is saved below.
df = create_df(history)

StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 9, Finished, Available, Finished)

## Save data frame to table

In [8]:
# Import only the class this cell uses; a wildcard import hides where names come from.
from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, delta_table_path):
    # Table already exists: upsert on `id` so re-extracted jobs update their
    # existing row and new jobs are inserted.
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    (
        delta_table.alias("t")
        .merge(df.alias("s"), 's.id = t.id')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # No Delta table at the path yet: create it from this extract.
    df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save(delta_table_path)




StatementMeta(, 15549095-de79-4160-aa13-7790a0912e2d, 10, Finished, Available, Finished)