# Fabric Capacity Metrics Long Term Retention

## Background and Purpose
The Fabric Capacity Metrics App enables Microsoft Fabric Capacity (and Premium Capacity) administrators to track utilization of Fabric Capacities over the most recent 14-day period.

Some scenarios require a longer history of capacity utilization in order to understand long-term trends, identify seasonal workload patterns, and support future capacity planning.

This notebook is a solution accelerator that imports summaries of Fabric Capacity metrics from the Fabric Capacity Metrics App and stores them in a Fabric Lakehouse for long-term storage and analysis. The data is imported at the granularity of one record per item per operation type per hour.

## Warning!
<mark>This solution is not approved and not supported by Microsoft! The structure and content of the Fabric Capacity Metrics App that serves as the data source for this solution may change at any time without notice, rendering this solution inoperable. Use at your own risk.</mark>

## Installation and Usage Instructions
1. You must be an administrator of one or more Fabric and/or Premium Capacities.
1. Install the [Fabric Capacity Metrics App](https://learn.microsoft.com/en-us/fabric/enterprise/metrics-app-install?tabs=1st)
1. Download and [import this Spark Notebook into a Fabric Workspace](https://learn.microsoft.com/en-us/fabric/data-engineering/how-to-use-notebook#import-existing-notebooks). The workspace must be hosted on a Fabric (or Premium) Capacity. This workspace can (and should) be different from the workspace where the Fabric Capacity Metrics App resides -- this will ensure that your Lakehouse with historical data remains unaffected if you choose to delete and reinstall the Fabric Capacity Metrics App.
1. [Connect a new or existing Fabric Lakehouse to your notebook](https://learn.microsoft.com/en-us/fabric/data-engineering/how-to-use-notebook#connect-lakehouses-and-notebooks).
1. If necessary, modify the parameter values in this notebook, such as the name of the Workspace and Semantic Model associated with the Fabric Capacity Metrics App that will serve as the source of your data.
1. Execute the notebook for the first time, which will create the following tables in your Lakehouse: 
    - **hourly_capacity_metrics** stores summaries of capacity operations at the granularity of one record per item per operation type per hour.
    - **items** stores a catalog of items that provides context for the metrics stored in the hourly_capacity_metrics table.
    - **processing_log** is a helper table that stores one record for each data load operation. This table facilitates efficient incremental retrieval of historical metrics.
1. Finally, schedule the Notebook to run on a recurring schedule (such as daily). Note: to avoid gaps in historical data, the notebook must be executed at least once per 14-day period (preferably at least weekly).
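
The 14-day constraint in the last step can be checked with a few lines of date arithmetic. The following is a minimal sketch, not part of the notebook: `days_until_gap` and `RETENTION_DAYS` are hypothetical names, and the 14-day window is taken from the description above. The input would be the most recent `WatermarkEnd` value from the processing_log table.

```python
from datetime import datetime, timedelta

# Assumption: the source app keeps 14 days of history, as stated above.
RETENTION_DAYS = 14

def days_until_gap(last_watermark_end: datetime, now: datetime) -> float:
    """Return how many days remain before data after last_watermark_end
    ages out of the source. A negative value means history was already lost."""
    oldest_available = now - timedelta(days=RETENTION_DAYS)
    return (last_watermark_end - oldest_available).total_seconds() / 86400

# Hypothetical example: the last successful load covered data up to 10 days ago.
now = datetime(2024, 6, 15, 12, 0, 0)
remaining = days_until_gap(now - timedelta(days=10), now)
overdue = days_until_gap(now - timedelta(days=20), now)
```

A scheduled run could log a warning when the remaining margin drops below a threshold of your choosing.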

## Usage Notes
- This solution imports summaries of data at the granularity of one record per item per operation type per hour (rather than a detailed log of individual operations). The granularity of this data may or may not be sufficient for your needs.
- This solution uses the identity context of the person who is running this notebook (or who owns the notebook in a scheduled/unattended execution scenario).
- This solution will retrieve data only from capacities in which the user is an administrator.
- By default, this solution will merge all data into delta tables in the connected Lakehouse and will also store raw data files in parquet format in the Files area of the Lakehouse. If desired, you may disable the storage of raw parquet files by setting the *store_raw_data* parameter to *False*.
- To improve efficiency, this solution filters out summary records where CU consumption, total duration and total operation counts are all 0.
- To analyze the data, you may use Spark-based notebooks, query the data using the built-in SQL Analytics Endpoints, create a semantic model and corresponding reports, or use any other suitable analytical tools.

### Acknowledgements
The design of this notebook has benefited from the work of [Kristian Bubalo](https://pbi-guy.com/2024/04/24/how-to-extract-data-from-the-fabric-metrics-app-part-2/). Please review his blog posts and sample solutions for additional perspectives on extracting and storing historical capacity metrics.

In [None]:
#Import required libraries
import sempy.fabric as fabric
from datetime import date, datetime, timedelta
from pyspark.sql import functions as F

#Set relevant spark configuration settings
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")

In [None]:
#Define parameters
source_workspace = 'Microsoft Fabric Capacity Metrics'
source_dataset = 'Fabric Capacity Metrics'

#Specify whether to store raw data in the Files area of the Lakehouse (default is True)
store_raw_data = True
path_prefix = 'Files/Raw Capacity Metrics History/'

In [None]:
#Initialize the processing log table (if needed)
sql_statement = """
    -- Create a table to store the processing log (if it does not already exist)
    CREATE TABLE IF NOT EXISTS processing_log AS 
    SELECT current_timestamp() AS Timestamp, CAST('2000-01-01' AS timestamp) AS WatermarkStart, CAST('2000-01-01' AS timestamp) AS WatermarkEnd;
    """
spark.sql(sql_statement)

#Get the latest ending watermark from the processing_log
sql_statement = " SELECT MAX(WatermarkEnd) AS Watermark FROM processing_log"
df = spark.sql(sql_statement)
watermark_start = df.first()['Watermark']

#Set the new watermark to a time point 25 hours ago (roughly one day, leaving room to reprocess late-arriving records)
watermark_end = datetime.now() - timedelta(hours=25)

current_timestamp = datetime.now()

#Add a record to the processing log to mark the start of a data loading operation (WatermarkEnd is set on completion)
sql_statement = """INSERT INTO processing_log (Timestamp, WatermarkStart) 
SELECT '""" + format(current_timestamp) + "', '" + format(watermark_start) + "'"
spark.sql(sql_statement)

## Extract and Store Item Catalog

In [None]:
#Define a function to extract the catalog of items from the Fabric Capacity Metrics App semantic model
def extractItems(store_raw_data, current_timestamp, path_prefix):
    dax_query = """
    EVALUATE
        SUMMARIZECOLUMNS (
            Items[Billable type],
            Items[capacityId],
            Items[dcount_Identity],
            Items[IsVirtualArtifactName],
            Items[IsVirtualArtifactStatus],
            Items[IsVirtualWorkspaceName],
            Items[IsVirtualWorkspaceStatus],
            Items[ItemId],
            Items[ItemKey],
            Items[ItemKind],
            Items[ItemName],
            Items[Timestamp],
            Items[UniqueKey],
            Items[WorkspaceId],
            Items[WorkspaceName]
        )"""

    df_items = fabric.evaluate_dax(
        workspace = source_workspace,
        dataset = source_dataset,
        dax_string = dax_query,
        verbose = 1
        )

    if not df_items.empty: 
        #Convert Fabric DataFrames into Spark DataFrames
        dfs_items = spark.createDataFrame(df_items)
        #Clean up column names
        dfs_items = dfs_items.select([F.col(x).alias(x.replace('Items[','').replace(' ', '_').replace('[', '').replace(']', '').replace('(', '').replace(')','')) for x in dfs_items.columns])
        #Create a temporary view on top of the data frame
        dfs_items.createOrReplaceTempView('current_items')

        sql_statement = """
            -- Create a table to store items catalog (if it does not already exist)
            CREATE TABLE IF NOT EXISTS items AS 
            SELECT * FROM current_items LIMIT 0; 
            """
        spark.sql(sql_statement)

        sql_statement = """
            -- Merge currently retrieved items into the permanent items table
            MERGE INTO items AS target
            USING current_items AS source
            ON source.UniqueKey = target.UniqueKey
            WHEN MATCHED THEN
            UPDATE SET *
            WHEN NOT MATCHED THEN
            INSERT *;
            """
        spark.sql(sql_statement)

        if store_raw_data:
            #Set folder path
            path = path_prefix + 'Items/' + current_timestamp.strftime("%Y/%m/%d") + '/' + current_timestamp.strftime("%H-%M-%S")
            #Save DataFrames to OneLake
            dfs_items.write.mode("overwrite").format("parquet").save(path)

#Execute the function to extract Items
extractItems(store_raw_data, current_timestamp, path_prefix)
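
The column cleanup inside `extractItems` is a long chain of `replace` calls. The following is a minimal sketch of the same transformation as a standalone function, assuming the returned column names have the form `Table[Column]`, as the notebook's own cleanup implies. `clean_column_name` is a hypothetical helper, not part of the notebook.

```python
import re

def clean_column_name(name: str, table_prefix: str) -> str:
    """Drop a leading table prefix, replace spaces with underscores,
    and remove square brackets and parentheses."""
    if name.startswith(table_prefix):
        name = name[len(table_prefix):]
    name = name.replace(" ", "_")
    return re.sub(r"[\[\]()]", "", name)

# Column names taken from the DAX queries in this notebook
items_cols = [clean_column_name(c, "Items")
              for c in ["Items[Billable type]", "Items[capacityId]"]]
metric_col = clean_column_name(
    "MetricsByItemandOperationandHour[Throttling (min)]",
    "MetricsByItemandOperationandHour",
)
```

Keeping the rule in one function means the items and metrics extractions clean names the same way, and the rule can be checked without a Spark session.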

In [None]:
%%sql
SELECT * FROM items 
ORDER BY ItemKey, capacityid

## Extract and Store Hourly Capacity Metrics

In [None]:
#Extract the hourly metrics by item by operation from the Fabric Capacity Metrics App semantic model
#The data is pre-filtered from start watermark forward
def extractMetrics(capacityId, watermark_start, store_raw_data, current_timestamp, path_prefix):

    #Parse the date parts of the start watermark for use in the DAX DATE() filter
    wm_year = str(watermark_start.year)
    wm_month = str(watermark_start.month)
    wm_day = str(watermark_start.day)

    dax_query = """
    DEFINE
            MPARAMETER 'CapacityID' = """ + '"' + capacityId + '"' + """
            VAR varFilter_Capacity =
                TREATAS ( { """ + '"' + capacityId + '"' + """ }, 'Capacities'[capacityId] )
    EVALUATE
        SUMMARIZECOLUMNS (
            'MetricsByItemandOperationandHour'[UniqueKey],
            'MetricsByItemandOperationandHour'[ItemId],
            'MetricsByItemandOperationandHour'[WorkspaceId],
            'MetricsByItemandOperationandHour'[PremiumCapacityId],
            'MetricsByItemandOperationandHour'[DateTime],
            'MetricsByItemandOperationandHour'[OperationName],
            'MetricsByItemandOperationandHour'[sum_CU],
            'MetricsByItemandOperationandHour'[sum_duration],
            'MetricsByItemandOperationandHour'[Throttling (min)],
            'MetricsByItemandOperationandHour'[count_users],
            'MetricsByItemandOperationandHour'[count_successful_operations],
            'MetricsByItemandOperationandHour'[count_rejected_operations],
            'MetricsByItemandOperationandHour'[count_operations],
            'MetricsByItemandOperationandHour'[count_Invalid_operations],
            'MetricsByItemandOperationandHour'[count_InProgress_operations],
            'MetricsByItemandOperationandHour'[count_failure_operations],
            'MetricsByItemandOperationandHour'[count_cancelled_operations],
            'MetricsByItemandOperationandHour'[avg_DurationMS],
            'MetricsByItemandOperationandHour'[percentile_DurationMs_50],
            'MetricsByItemandOperationandHour'[percentile_DurationMs_90],
            FILTER ('MetricsByItemandOperationandHour', 
                'MetricsByItemandOperationandHour'[DateTime] >= DATE(""" + wm_year + ", " + wm_month + ", " + wm_day + """)
                && ('MetricsByItemandOperationandHour'[sum_CU] > VALUE(0) 
                    || 'MetricsByItemandOperationandHour'[sum_duration] > VALUE(0)
                    || 'MetricsByItemandOperationandHour'[count_operations] > VALUE(0))
                ),
            varFilter_Capacity 
            )
        """

    df_metrics = fabric.evaluate_dax(
        workspace = source_workspace,
        dataset = source_dataset,
        dax_string = dax_query,
        verbose = 1
        )

    if not df_metrics.empty:        
        #Convert Fabric DataFrames into Spark DataFrames
        dfs_metrics = spark.createDataFrame(df_metrics)
        #Clean up column names
        dfs_metrics = dfs_metrics.select([F.col(x).alias(x.replace(' ', '_').replace('[', '').replace(']', '').replace('(', '').replace(')','').replace('MetricsByItemandOperationandHour','')) for x in dfs_metrics.columns])
        #Create a temporary view on top of the data frame
        dfs_metrics.createOrReplaceTempView('current_metrics')

        sql_statement_create = """
            -- Create a table to store hourly capacity metrics (if it does not already exist)
            CREATE TABLE IF NOT EXISTS hourly_capacity_metrics AS 
            SELECT * FROM current_metrics LIMIT 0;  
            """
        spark.sql(sql_statement_create)

        sql_statement_load = """
            -- Merge currently retrieved metrics into the permanent table
            MERGE INTO hourly_capacity_metrics AS target
            USING current_metrics AS source
            ON source.UniqueKey = target.UniqueKey
                AND source.DateTime = target.DateTime
                AND source.OperationName = target.OperationName
            WHEN MATCHED THEN
            UPDATE SET *
            WHEN NOT MATCHED THEN
            INSERT *;
            """
        spark.sql(sql_statement_load)

        if store_raw_data:
            #Set folder path
            path = 'Hourly Metrics/' + current_timestamp.strftime("%Y/%m/%d") + '/' \
                + capacityId + '-' + current_timestamp.strftime("%H-%M-%S")
            #Save DataFrames to OneLake
            dfs_metrics.write.mode("overwrite").format("parquet").save(path_prefix + path)


#Get a list of available capacities (excluding the logical Premium Per User - PPU capacity)
df_capacities = fabric.list_capacities()
df_capacities = df_capacities[df_capacities['Sku'] != 'PP3']

#Execute the metrics extraction function for each capacity
for capacity in df_capacities.Id:
    print('+ Started processing metrics for capacity ' + capacity)
    extractMetrics(capacity, watermark_start, store_raw_data, current_timestamp, path_prefix)
    print(' - Finished processing metrics for capacity ' + capacity)
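
Because the DAX filter uses only the date part of the start watermark, each run re-reads some hours that were already loaded. The `MERGE` keyed on `UniqueKey`, `DateTime` and `OperationName` is what keeps those re-reads from creating duplicates. The following is a minimal sketch of that upsert behavior using plain Python dictionaries rather than Delta tables. `merge_metrics` and the sample rows are hypothetical and for illustration only.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str, str]

def merge_metrics(target: Dict[Key, dict], batch: List[dict]) -> Dict[Key, dict]:
    """Upsert batch rows into target using the same three-column key
    as the MERGE statement above."""
    for row in batch:
        key = (row["UniqueKey"], row["DateTime"], row["OperationName"])
        target[key] = row  # matched: overwrite; not matched: insert
    return target

# Hypothetical rows, for illustration only
first_run = [
    {"UniqueKey": "a", "DateTime": "2024-06-01 10:00", "OperationName": "Query", "sum_CU": 5.0},
]
second_run = [
    # Same key as above, with a later (updated) value
    {"UniqueKey": "a", "DateTime": "2024-06-01 10:00", "OperationName": "Query", "sum_CU": 7.5},
    {"UniqueKey": "a", "DateTime": "2024-06-01 11:00", "OperationName": "Query", "sum_CU": 1.0},
]

table = merge_metrics({}, first_run)
table = merge_metrics(table, second_run)
```

After both runs the table holds two rows, and the overlapping hour carries the value from the later run.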

## Log Completion

In [None]:
#Update processing log to indicate completion of a data loading operation
sql_statement = """
UPDATE processing_log 
SET WatermarkEnd = '""" + format(watermark_end) + """'
WHERE Timestamp = '""" + format(current_timestamp) + "' AND WatermarkStart = '" + format(watermark_start) + "'"
spark.sql(sql_statement)
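
The processing log appears to work as a two-phase marker. A row is inserted at the start of a run without a `WatermarkEnd`, and the `UPDATE` above fills that value in only after all capacities have been processed. Since `MAX(WatermarkEnd)` ignores NULLs, a run that fails part-way should not advance the watermark. The following is a minimal sketch of that behavior in plain Python, with `None` standing in for NULL. `next_watermark_start` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Optional

def next_watermark_start(log: List[dict]) -> Optional[datetime]:
    """Mimic SELECT MAX(WatermarkEnd): rows with no WatermarkEnd are ignored."""
    ends = [r["WatermarkEnd"] for r in log if r["WatermarkEnd"] is not None]
    return max(ends) if ends else None

# Seed row, as created by the CREATE TABLE ... AS SELECT statement
seed = datetime(2000, 1, 1)
log = [{"WatermarkStart": seed, "WatermarkEnd": seed}]

# Run 1 starts, then completes and records its ending watermark
run1_now = datetime(2024, 6, 10, 8, 0)
log.append({"WatermarkStart": next_watermark_start(log), "WatermarkEnd": None})
log[-1]["WatermarkEnd"] = run1_now - timedelta(hours=25)

# Run 2 starts but fails before reaching the UPDATE
log.append({"WatermarkStart": next_watermark_start(log), "WatermarkEnd": None})

# Run 3 picks up from the end of run 1, not from the failed run 2
start_for_run3 = next_watermark_start(log)
```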

## Analyze Results

After importing the data, use your preferred tools to analyze the results. For example, the following Spark SQL query returns the top 100 items by total Capacity Unit consumption.

```
SELECT I.capacityId, I.WorkspaceName, I.ItemKind, I.ItemName, ROUND(SUM(M.sum_CU),2) AS CapacityUnitSecondsConsumed
FROM hourly_capacity_metrics M
    LEFT JOIN items I 
        ON M.UniqueKey = I.UniqueKey
GROUP BY I.capacityId, I.WorkspaceName, I.ItemKind, I.ItemName
ORDER BY CapacityUnitSecondsConsumed DESC
LIMIT 100
```
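
Hourly records also lend themselves to the seasonal analysis mentioned in the background section. The following is a minimal sketch that rolls hourly `sum_CU` values up by weekday, using only the Python standard library and hypothetical sample rows. In practice the same aggregation would more likely run as a Spark SQL query against hourly_capacity_metrics. `cu_by_weekday` is not part of the notebook.

```python
from collections import defaultdict
from datetime import datetime

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday",
            "Friday", "Saturday", "Sunday"]

def cu_by_weekday(rows):
    """Sum sum_CU per weekday name from (DateTime, sum_CU) pairs."""
    totals = defaultdict(float)
    for dt, cu in rows:
        totals[WEEKDAYS[dt.weekday()]] += cu
    return dict(totals)

# Hypothetical hourly records (DateTime, sum_CU), for illustration only
rows = [
    (datetime(2024, 6, 3, 9), 120.0),   # a Monday
    (datetime(2024, 6, 3, 10), 80.0),   # same Monday, next hour
    (datetime(2024, 6, 8, 9), 15.0),    # a Saturday
]
totals = cu_by_weekday(rows)
```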