# Load Metadata from Power BI using the REST API

### Import libraries

In [1]:
import requests

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Import of the Trident Token Library
import json
from trident_token_library_wrapper import PyTridentTokenLibrary

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 3, Finished, Available)

### Parameters

In [2]:
# Parameters
fabricREST = 'https://api.fabric.microsoft.com/v1'
powerbiREST = 'https://api.powerbi.com/v1.0/myorg'
url = powerbiREST

#Target Lakehouse is where the json files are landed
targetLakehouse = f'abfss://{your-tennant}]@onelake.dfs.fabric.microsoft.com/{your-lakehouse}}'
targetRelativePath = '/Files/PowerBI/'
targetFolder = targetLakehouse + targetRelativePath

#Staging Lakehouse is where the delta tables are loaded
stagingLakehouse = f'abfss://{your-tennant}]@onelake.dfs.fabric.microsoft.com/{your-lakehouse}}'
stagingRelativePath = '/Tables/'
stagingPath = stagingLakehouse + stagingRelativePath

# Name of the Key Vault
nameOfKeyVault = f'{your-keyvault-name}'
keyvault = f'https://{nameOfKeyVault}.vault.azure.net/'

# Names of the secrets saved in Key Vault
tenantId_SecretName = 'FabricTennantID' # Tenant ID
clientId_SecretName = 'FabricSPClientID'    # Client ID of Service Principal
clientSecret_SecretName = 'FabricSPClientSecret' # Client Secret of Service Principal


StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 4, Finished, Available)

### Functions

In [3]:

# Function definition
## Function for getting secret from Key Vault
def get_secret_from_keyvault(secret_name):
    access_token=PyTridentTokenLibrary.get_access_token('keyvault') # Get Access Token for Key Vault
    return PyTridentTokenLibrary.get_secret_with_token(keyvault,secret_name,access_token) # Return Secret

## Function for getting Bearer Token
def get_bearer_token():
    resource = "https://analysis.windows.net/powerbi/api" # Resource for Power BI API
    tenant_id = get_secret_from_keyvault(tenantId_SecretName) # Get Tenant ID
    client_id = get_secret_from_keyvault(clientId_SecretName) # Get Client ID
    client_secret = get_secret_from_keyvault(clientSecret_SecretName) # Get Client Secret
    url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/token' # URL for OAuth2
    data = f'grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&resource={resource}'  # Content that will be sent for OAuth2 
    headers = {'Content-Type': 'application/x-www-form-urlencoded'} # Headers for OAuth2
    response = requests.post(url, headers=headers, data=data)
    return response.json()["access_token"] # Return Bearer Token

## Function for getting response from API
def get_response_json(relativePath, method, data, payload_object):
    fullurl = url + '/' + relativePath # Full URL for API call
    bearer_token = get_bearer_token() # Get Bearer Token
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {bearer_token}'} # Headers for API call
    response = requests.request(method, fullurl, headers=headers, data=data)
    if payload_object == "": # If payload_object parameter is empty, return response as a JSON
        return [response.text] # Return response as a text
    else:
        return response.json()[payload_object] # Return response as a JSON and select only the payload object

        
#Function to get reports or datasets metadata
def get_data_and_save_to_file(x,endpoint):
    sourcePath = 'groups/'+x.id+'/'+ endpoint
    targetPath = targetFolder + endpoint + '_' +  x.id + '.json'     
    payload = get_response_json(sourcePath,"GET", "", "value")
    data = json.dumps(payload)
    mssparkutils.fs.put(targetPath,data,True) 

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 5, Finished, Available)

## Ingest

### Get response from API and write JSON to file File

In [4]:

#Set target file
json_path = targetFolder + 'groups.json' 

#Get the payload
payload = get_response_json('groups',"GET", "", "value")

#Create a dataframe for use in the following report and dataset steps
df = spark.createDataFrame(payload)

#Write to file as json string
data = json.dumps(payload)
mssparkutils.fs.put(json_path,data,True)


#Loop through all the groups (workspace) in df to get reports and datasets for each 
#Create an array of workspace ids to loop through
groupArray = df.select('id').collect()

#Loop through all the groups getting all the reports and datasets from each one
for g in groupArray:
    get_data_and_save_to_file(g,'reports')
    get_data_and_save_to_file(g,'datasets')


StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 6, Finished, Available)

# Load

## Load JSON data to simple delta tables

In [5]:
#Load groups
filename = 'groups.json'
tablename = 'pbi_workspaces' 
sourcePath = targetFolder + filename
stagingTable = stagingPath + tablename

df = spark.read.json(sourcePath)

df.write.format("delta").mode("overwrite").save(stagingTable)

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 7, Finished, Available)

In [6]:
#Load reports
filename = 'reports_*.json'
tablename = 'pbi_reports' 
sourcePath = targetFolder + filename
stagingTable = stagingPath + tablename

df = spark.read.json(sourcePath)

df.write.format("delta").mode("overwrite").save(stagingTable)

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 8, Finished, Available)

In [7]:
#Load datasets
filename = 'datasets_*.json'
tablename = 'pbi_datasets' 
sourcePath = targetFolder + filename
stagingTable = stagingPath + tablename

df = spark.read.json(sourcePath)

df.write.format("delta").mode("overwrite").save(stagingTable)

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 9, Finished, Available)

## Transform and Load Reports and Datasets

### Cleaning Functions

In [8]:
def clean_workspace_data(df_workspaces):    
    #Rename columns
    df_workspaces = df_workspaces.withColumnRenamed('id','WorkspaceId')
    df_workspaces = df_workspaces.withColumnRenamed('name','WorkspaceName')
    df_workspaces = df_workspaces.withColumnRenamed('type','WorkspaceType')

    return df_workspaces

def clean_dataset_data(df_datasets):
   
    # Derive column 'WorkspaceId' from column: 'webUrl'
    df_datasets = df_datasets.withColumn('WorkspaceId', F.substring(df_datasets.webUrl, 32, 36))

    #Rename columns
    df_datasets = df_datasets.withColumnRenamed('id','DatasetId')
    df_datasets = df_datasets.withColumnRenamed('name','DatasetName')

    return df_datasets

def clean_report_data(df_reports):
    # Derive column 'WorkspaceId' from column: 'webUrl'
    df_reports = df_reports.withColumn('WorkspaceId', F.substring(df_reports.webUrl, 32, 36))
    
    #Rename columns
    df_reports = df_reports.withColumnRenamed('id','ReportId')
    df_reports = df_reports.withColumnRenamed('name','ReportName')

    return df_reports

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 10, Finished, Available)

### Load and Clean data

In [9]:
#Load Reports
tablename = 'pbi_reports' 
sourceTable = stagingPath+tablename
df_reports = spark.read.format('delta').load(sourceTable)

#Load Datasets
tablename = 'pbi_datasets' 
sourceTable = stagingPath+tablename
df_datasets = spark.read.format('delta').load(sourceTable)

#Load Workspaces
tablename = 'pbi_workspaces' 
sourceTable = stagingPath+tablename
df_workspaces = spark.read.format('delta').load(sourceTable)

#Clean dataframes
df_reports_clean = clean_report_data(df_reports)
df_datasets_clean = clean_dataset_data(df_datasets)
df_workspaces_clean = clean_workspace_data(df_workspaces)


StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 11, Finished, Available)

### Write clean data to table

In [10]:
#Reports
tablename = 'dim_pbi_reports'
targetTable = stagingPath + tablename

df = df_reports_clean.join(df_workspaces_clean,df_reports_clean.WorkspaceId == df_workspaces_clean.WorkspaceId, 'outer').select(
    df_reports_clean.ReportId, 
    df_reports_clean.ReportName, 
    df_reports_clean.reportType,
    df_reports_clean.webUrl,
    df_reports_clean.embedUrl,
    df_reports_clean.datasetId,
    df_reports_clean.datasetWorkspaceId,
    df_reports_clean.WorkspaceId,
    df_workspaces_clean.WorkspaceName,
    df_workspaces_clean.WorkspaceType,
    df_workspaces_clean.isOnDedicatedCapacity
)

df.write.format("delta").mode("overwrite").save(targetTable)

#Datasets
tablename = 'dim_pbi_datasets'
targetTable = stagingPath + tablename

df = df_datasets_clean.join(df_workspaces_clean,df_datasets_clean.WorkspaceId == df_workspaces_clean.WorkspaceId, 'outer').select(
    df_datasets_clean.DatasetId,
    df_datasets_clean.DatasetName,
    df_datasets_clean.createdDate,
    df_datasets_clean.configuredBy,
    df_datasets_clean.isOnPremGatewayRequired,
    df_datasets_clean.isRefreshable,
    df_datasets_clean.targetStorageMode,
    df_datasets_clean.webUrl,
    df_datasets_clean.WorkspaceId,
    df_workspaces_clean.WorkspaceName,
    df_workspaces_clean.WorkspaceType,
    df_workspaces_clean.isOnDedicatedCapacity
)

#isplay(df)
df.write.format("delta").mode("overwrite").save(targetTable)

StatementMeta(, 0ec7c8ba-d2ad-4ce2-8ada-d85569984516, 12, Finished, Available)