## Fabric Activity Event API
This notebook calls the [Power BI Activity Events API](https://learn.microsoft.com/en-us/rest/api/power-bi/admin/get-activity-events) to maintain the following files in a Lakehouse:
- A fact table in Delta format containing a subset of fields from each activity record, appended to each time the notebook is run
- A .json file containing a complete record of all the results from the Activity Events API for each day the notebook is run

**Important Notes:**
- **Schedule this notebook to run nightly after midnight UTC** to capture all activities every day

**Prerequisites:**
- A Power BI Admin must turn on the "[Allow Service Principals to use read-only Admin APIs](https://learn.microsoft.com/en-us/power-bi/enterprise/read-only-apis-service-principal-authentication)" feature. 
- If the Allow Service Principals to use read-only Admin APIs switch is turned on _and limited to specific security groups_, the service principal used to acquire the bearer token must be in one of the group(s) allowed to use the read-only Admin APIs
- The service principal used to acquire the bearer token must have Tenant.Read.All or Tenant.ReadWrite.All permissions
- Multi-Factor Authentication (MFA) must be disabled on the service account/user used to acquire the API bearer tokens
- **This notebook assumes you are storing the Tenant ID and the service principal's Client ID and Client Secret in an Azure Key Vault**. Fabric retrieves them securely, uses them during notebook runs, and obscures the values in logs and cell outputs. Avoid hard-coding service principal secrets in notebooks

**Future Work:**
- Workarounds for known issues with the Activity Events API, including:
    - Analyzed By External Application activities don't show the Workspace

#### Define parameters for Key Vault, Fabric base URIs, write mode, time zone, ignored activities and Lakehouse File directories

In [None]:
# Name of the Key Vault and names of the secrets
nameOfKeyVault = 'xxx' # Name of the Key Vault
tenantId_SecretName = 'xxx'   # Tenant ID secret name
clientId_SecretName = 'xxx'   # Name for Client ID of Service Principal
clientSecret_SecretName = 'xxx' # Name for Client Secret of Service Principal

# Base URLs for Power BI and Fabric
powerBIAPIBaseUri = 'https://api.powerbi.com/v1.0/myorg/'
fabricAPIBaseUri = 'https://api.fabric.microsoft.com/v1/'

# Time zone for UTC conversion - see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones for list of possible values
timeZone = 'America/New_York'

# Set writeMode = 'Overwrite' if you want to completely repopulate the table from the archived json files, 'Append' if you want to append yesterday's results only
writeMode = 'Append'

# list of activities to ignore - add/remove as necessary for your requirements but in my experience these tend to make a lot of noise in the API and provide little value
ignoreActivities = ['GenerateCustomVisualAADAccessToken','GenerateCustomVisualWACAccessToken','GenerateDataflowSasToken','GenerateScreenshot']

# folder/file names
nameOfActivitiesFileFolder = 'Files/ActivityEventsAPI/'
nameOfActivitiesDeltaTable = 'Activities'
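
The later cells concatenate `nameOfActivitiesFileFolder` directly with file names and compare `writeMode` against the exact strings `'Overwrite'` and `'Append'`, so a missing trailing slash or different casing can misbehave without an obvious error. A minimal sketch of a pre-flight check follows; `fxValidateParameters` is a hypothetical helper, not part of the original notebook.

```python
def fxValidateParameters(write_mode, folder):
    """Return a list of problems found in the notebook parameters (empty list = OK).

    Hypothetical helper: the checks mirror how the later cells use these values.
    """
    problems = []
    # Later cells test writeMode with == against these exact strings
    if write_mode not in ('Append', 'Overwrite'):
        problems.append(f"writeMode must be 'Append' or 'Overwrite', got {write_mode!r}")
    # File paths are built as folder + file name, so the folder needs a trailing slash
    if not folder.endswith('/'):
        problems.append(f"folder should end with '/', got {folder!r}")
    return problems

# With the default values from the cell above, no problems are reported
print(fxValidateParameters('Append', 'Files/ActivityEventsAPI/'))
```

In the notebook this could be called as `fxValidateParameters(writeMode, nameOfActivitiesFileFolder)` right after the parameters cell.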

#### Import libraries and set Spark session settings

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from delta.tables import *
import requests
import json
import time  # needed for time.sleep in fxRequestWithRetry
from datetime import datetime, date, timedelta, timezone

#### Define functions to get bearer token, make API calls and recreate/append to Activities table

In [None]:
def fxGetFabricBearerToken():
    keyvault = f'https://{nameOfKeyVault}.vault.azure.net/'
    # Begin don't hard code this stuff
    tenant_id = mssparkutils.credentials.getSecret(keyvault,tenantId_SecretName)
    client_id = mssparkutils.credentials.getSecret(keyvault,clientId_SecretName)
    client_secret = mssparkutils.credentials.getSecret(keyvault,clientSecret_SecretName)
    # End don't hard code this stuff
    url = f'https://login.microsoftonline.com/{tenant_id}/oauth2/token'
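    # Pass the form fields as a dict so requests URL-encodes them (a secret may contain characters such as '+' or '&')
    data = {'grant_type': 'client_credentials', 'client_id': client_id, 'client_secret': client_secret, 'resource': 'https://analysis.windows.net/powerbi/api'}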
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    response = requests.post(url, headers=headers, data=data)
    return response.json()["access_token"]

def fxRequestWithRetry(method, url, data, num_retries=3, success_list=[200, 202, 404], **kwargs):
    bearer_token = fxGetFabricBearerToken()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {bearer_token}'}
    for i in range(num_retries):
        try:
            response = requests.request(method, url, headers=headers, data=data, **kwargs)
            if response.status_code in success_list:
                ## Return response if successful
                return response
            ## Captures the 500 requests in an hour limit
            if response.status_code == 429 and response.headers.get('Retry-After',None) is not None:
                waitTime = int(response.headers['Retry-After'])
                print(f'Hit the 500 requests per hour rate limit - waiting {str(waitTime)} seconds until next retry')
                time.sleep(waitTime)
            ## Captures the 16 simultaneous requests limit
            if response.status_code == 429 and response.headers.get('Retry-After',None) is None:
                waitTime = 120
                print(f'Hit the 16 simultaneous requests limit - waiting {str(waitTime)} seconds until next retry')
                time.sleep(waitTime)
        except requests.exceptions.ConnectionError:
            pass
    return None

def fxCalculateActivitiesTable(df):
    activities = df.withColumn('WorkspaceID',coalesce(col('WorkspaceId'),col('FolderObjectId')))\
    .withColumn('ReportID',coalesce(col('AppReportId'),col('ReportId')))\
    .withColumn('ActivityDateUTC',col('CreationTime').cast('date'))\
    .withColumn('ActivityDatetimeUTC',col('CreationTime').cast('timestamp'))\
    .withColumn('ActivityDateLocal',from_utc_timestamp(col('CreationTime'),timeZone).cast('date'))\
    .withColumn('ActivityDatetimeLocal',from_utc_timestamp(col('CreationTime'),timeZone).cast('timestamp'))\
    .filter(~col('Activity').isin(ignoreActivities))\
    .select(col('Id').alias('ActivityID'), 
        col('UserId').alias('UserID'), 
        col('UserAgent'),
        col('ClientIP'),
        col('Activity'),
        col('ActivityDatetimeLocal'),
        col('ActivityDateLocal'),
        col('DistributionMethod'),
        col('ConsumptionMethod'),
        col('ItemName'),
        col('CapacityId').alias('CapacityID'),
        col('DomainId').alias('DomainID'),
        col('WorkspaceID'),
        col('AppId').alias('AppID'),
        col('ArtifactId').alias('ArtifactID'),
        col('ObjectId').alias('ObjectID'),
        col('ReportID'),
        col('DatasetId').alias('SemanticModelID'),
        col('DataflowId').alias('DataflowID'),
        col('DatamartId').alias('DatamartID'),
        col('DashboardId').alias('DashboardID'),
        col('TileId').alias('DashboardTileID'))\
    .fillna({'DomainID':'00000000-0000-0000-0000-00000000',
        'CapacityID':'00000000-0000-0000-0000-00000000',
        'WorkspaceID':'00000000-0000-0000-0000-00000000',
        'DistributionMethod':'N/A',
        'ConsumptionMethod':'N/A',
        'ItemName':'N/A',
        'AppID':'00000000-0000-0000-0000-00000000',
        'ArtifactID':'00000000-0000-0000-0000-00000000',
        'ObjectID':'00000000-0000-0000-0000-00000000',
        'ReportID':'00000000-0000-0000-0000-00000000',
        'SemanticModelID':'00000000-0000-0000-0000-00000000',
        'DataflowID':'00000000-0000-0000-0000-00000000',
        'DatamartID':'00000000-0000-0000-0000-00000000',
        'DashboardID':'00000000-0000-0000-0000-00000000',
        'DashboardTileID':'00000000-0000-0000-0000-00000000'})\
    .write.mode(writeMode).format("delta").save("Tables/" + nameOfActivitiesDeltaTable)
    if writeMode == 'Overwrite':
        return print(f'{nameOfActivitiesDeltaTable} overwritten from archived JSON files')
    if writeMode == 'Append':
        return print(f'Appended results from {str(yesterday)} into {nameOfActivitiesDeltaTable}')
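
`fxRequestWithRetry` returns `None` once its retries are used up, so a caller that immediately calls `.json()` on the result fails with an unrelated-looking error. The waiting logic can also be separated from the HTTP call, which makes it testable without a network. The sketch below assumes a caller-supplied `send` callable that returns an object with `status_code` and `headers`, as a `requests` response does; `retry_with_wait` and `FakeResponse` are illustrative names, not part of the notebook.

```python
import time

def retry_with_wait(send, num_retries=3, success=(200, 202, 404),
                    default_wait=120, sleep=time.sleep):
    """Call send() up to num_retries times, honouring Retry-After on HTTP 429.

    Hypothetical helper: `send` is any zero-argument callable returning an
    object with .status_code and .headers (as a requests.Response has).
    Raises RuntimeError instead of returning None when retries run out.
    """
    for _ in range(num_retries):
        response = send()
        if response.status_code in success:
            return response
        if response.status_code == 429:
            # Use the server's Retry-After value when present, else a fixed wait
            wait = int(response.headers.get('Retry-After', default_wait))
            sleep(wait)
    raise RuntimeError(f'No successful response after {num_retries} attempts')


class FakeResponse:
    """Stand-in for a requests.Response, for illustration only."""
    def __init__(self, status_code, headers=None):
        self.status_code = status_code
        self.headers = headers or {}

# Simulate one throttled call followed by a success, recording waits instead of sleeping
replies = iter([FakeResponse(429, {'Retry-After': '5'}), FakeResponse(200)])
waits = []
result = retry_with_wait(lambda: next(replies), sleep=waits.append)
print(result.status_code, waits)
```

Raising on exhaustion is a design choice made for this sketch; the notebook's own function keeps returning `None`.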

#### Define schema of Activities payload

In [None]:
## define initial dataframe schema
schema = StructType([
  StructField("Id",StringType(),True),
  StructField("RequestId",StringType(),True),
  StructField("CreationTime",StringType(),True),
  StructField("UserId",StringType(),True),
  StructField("UserAgent",StringType(),True),
  StructField("ClientIP",StringType(),True),
  StructField("Activity",StringType(),True),
  StructField("DistributionMethod",StringType(),True),
  StructField("ConsumptionMethod",StringType(),True),
  StructField("ItemName",StringType(),True),
  StructField("CapacityId",StringType(),True),
  StructField("DomainId",StringType(),True),
  StructField("WorkspaceId",StringType(),True),
  StructField("FolderObjectId",StringType(),True),
  StructField("DatasetId",StringType(),True),
  StructField("ReportId",StringType(),True),
  StructField("ArtifactId",StringType(),True),
  StructField("ObjectId",StringType(),True),
  StructField("AppId",StringType(),True),
  StructField("AppReportId",StringType(),True),
  StructField("DataflowId",StringType(),True),
  StructField("DatamartId",StringType(),True),
  StructField("DashboardId",StringType(),True),
  StructField("TileId",StringType(),True)
])
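
The schema keeps only a subset of the keys in each activity record; keys that are not listed do not reach the Delta table but remain in the archived JSON. A sketch for spotting which payload keys are not covered follows. The field list repeats the schema's names as plain strings, `fxFindUnmappedFields` is a hypothetical helper, and `sample_events` is made-up data with placeholder key names.

```python
# Field names repeated from the StructType above, as plain strings
schema_fields = {
    'Id', 'RequestId', 'CreationTime', 'UserId', 'UserAgent', 'ClientIP',
    'Activity', 'DistributionMethod', 'ConsumptionMethod', 'ItemName',
    'CapacityId', 'DomainId', 'WorkspaceId', 'FolderObjectId', 'DatasetId',
    'ReportId', 'ArtifactId', 'ObjectId', 'AppId', 'AppReportId',
    'DataflowId', 'DatamartId', 'DashboardId', 'TileId',
}

def fxFindUnmappedFields(events, known_fields):
    """Return the sorted keys that appear in any event but not in known_fields."""
    seen = set()
    for event in events:
        seen.update(event.keys())
    return sorted(seen - known_fields)

# Made-up records; 'ExtraFieldA' and 'ExtraFieldB' are placeholder key names
sample_events = [
    {'Id': '1', 'Activity': 'ViewReport', 'ExtraFieldA': 'x'},
    {'Id': '2', 'Activity': 'ViewReport', 'ExtraFieldB': 'y'},
]
print(fxFindUnmappedFields(sample_events, schema_fields))
```

In the notebook, the list `j` built by the retrieval cell could be passed in place of `sample_events`.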

#### Define URI for Activity Events API based on current date in UTC

In [None]:
## define initial batch number (Activity Events API chunks responses)
batch_number = 1
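## define yesterday in UTC (the Activity Events API expects both timestamps to fall within the same UTC day)
yesterday = datetime.now(timezone.utc).date() - timedelta(days = 1)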
## create string for yesterday in format Activity Events API expects
yesterday_uri = yesterday.strftime("%Y-%m-%d")
## create string for yesterday in format for file name
yesterday_file = yesterday.strftime("%Y%m%d")
## create full Uri for Activity Events API
activities_Uri = powerBIAPIBaseUri + "admin/activityevents?startDateTime='" + yesterday_uri + "T00:00:00'&endDateTime='" + yesterday_uri + "T23:59:59'"
print(f'Using {activities_Uri} to retrieve activities')
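
The URI construction can be wrapped in a function so the same logic works for any day, for example when re-running a missed date. This is a sketch only: `fxBuildActivitiesUri` is a hypothetical helper, and the base URI and query string format are copied from the cells above.

```python
from datetime import date, datetime, timedelta, timezone

def fxBuildActivitiesUri(base_uri, day):
    """Build the Activity Events URI covering one whole day.

    Hypothetical helper; the query string format is the one used in this notebook.
    """
    day_str = day.strftime('%Y-%m-%d')
    return (f"{base_uri}admin/activityevents"
            f"?startDateTime='{day_str}T00:00:00'"
            f"&endDateTime='{day_str}T23:59:59'")

# Yesterday, computed in UTC so the result does not depend on the local time zone
yesterday_utc = datetime.now(timezone.utc).date() - timedelta(days=1)
print(fxBuildActivitiesUri('https://api.powerbi.com/v1.0/myorg/', yesterday_utc))

# A fixed date, to show the exact shape of the result
example_uri = fxBuildActivitiesUri('https://api.powerbi.com/v1.0/myorg/', date(2024, 1, 15))
print(example_uri)
```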

#### Get all batches of yesterday's activities and store in a dataframe

In [None]:
print(f'Starting to retrieve activities for {str(yesterday)}')
## get first batch of activity data
data = fxRequestWithRetry('GET', activities_Uri,'')
## set the continuation uri for next batch
contUri = data.json()['continuationUri']
## create dict with the batches' activities
j = data.json()['activityEventEntities']
## create data frame for delta table
df = spark.createDataFrame(spark.sparkContext.parallelize(j),schema)
print(f'Batch {str(batch_number)} completed - {str(len(j))} activities found')
## do until last batch is completed
while contUri is not None: 
    ## increment batch number       
    batch_number = batch_number + 1
    ## get next batch of activity data
    data_cont = fxRequestWithRetry('GET', contUri,'')
    ## set the continuation uri for next batch
    contUri = data_cont.json()['continuationUri']
    ## append json
    d = data_cont.json()['activityEventEntities'] 
    j.extend(d)
    ## create data frame for delta table
    df_cont = spark.createDataFrame(spark.sparkContext.parallelize(d),schema)    
    ## union dataframes
    df = df.union(df_cont)
    print(f'Batch {str(batch_number)} completed - {str(len(d))} activities found')
print('All Batches Retrieved and appended')
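
The paging loop follows `continuationUri` until the API stops returning one. The same pattern can be written as a generator with the HTTP call injected, so it can be exercised without network access. In this sketch `fxIterateBatches` and `fake_fetch` are hypothetical, and only the `activityEventEntities` and `continuationUri` keys already used above are assumed.

```python
def fxIterateBatches(fetch, first_uri):
    """Yield each batch of activities, following continuationUri until it is empty.

    Hypothetical helper: `fetch` takes a URI and returns the parsed JSON body (a dict).
    """
    uri = first_uri
    while uri:
        body = fetch(uri)
        yield body.get('activityEventEntities', [])
        uri = body.get('continuationUri')

# Canned pages standing in for API responses, for illustration only
pages = {
    'page1': {'activityEventEntities': [{'Id': 'a'}, {'Id': 'b'}], 'continuationUri': 'page2'},
    'page2': {'activityEventEntities': [{'Id': 'c'}], 'continuationUri': None},
}

def fake_fetch(uri):
    return pages[uri]

all_events = []
for batch_number, batch in enumerate(fxIterateBatches(fake_fetch, 'page1'), start=1):
    all_events.extend(batch)
    print(f'Batch {batch_number} completed - {len(batch)} activities found')
print(f'{len(all_events)} activities retrieved in total')
```

In the notebook, `fetch` could be `lambda uri: fxRequestWithRetry('GET', uri, '').json()`, with the first URI set to `activities_Uri`.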

#### Check if Activities Lakehouse folder exists and create if it doesn't; then write activity batches to JSON file in Lakehouse

In [None]:
mssparkutils.fs.mkdirs(nameOfActivitiesFileFolder)
with open(f'/lakehouse/default/{nameOfActivitiesFileFolder}Activities_{yesterday_file}.json', 'w') as f:
    json.dump(j, f)  # the with block closes the file, so no explicit close() is needed
    print(f'/lakehouse/default/{nameOfActivitiesFileFolder}Activities_{yesterday_file}.json created')
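
Because each day's full payload is archived as JSON, the file can be read back later to check counts or to inspect keys that are not in the Delta table. The sketch below uses a temporary directory in place of the Lakehouse path, and the records are made up for illustration.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path

# Made-up records standing in for the list `j` built by the retrieval cell
events = [
    {'Id': '1', 'Activity': 'ViewReport'},
    {'Id': '2', 'Activity': 'ViewReport'},
    {'Id': '3', 'Activity': 'ExportReport'},
]

with tempfile.TemporaryDirectory() as tmp:
    # Temporary folder used here instead of /lakehouse/default/Files/...
    archive_path = Path(tmp) / 'Activities_20240115.json'
    with open(archive_path, 'w') as f:
        json.dump(events, f)

    # Read the archive back, as a later check or reprocessing step would
    with open(archive_path) as f:
        restored = json.load(f)

# Count events per activity type
activity_counts = Counter(event['Activity'] for event in restored)
print(activity_counts.most_common())
```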

#### Overwrite or append to Activities table

In [None]:
archive_df = spark.read.json(f'{nameOfActivitiesFileFolder}*.json',schema = schema)  # folder name already ends with '/'
if writeMode == 'Overwrite':
    fxCalculateActivitiesTable(archive_df) 
if writeMode == 'Append': 
    fxCalculateActivitiesTable(df)