In [None]:
import msal
import requests
import pandas as pd
import time
import json
import os
from datetime import datetime, timedelta, timezone 
import urllib.parse

# For Fabric Lakehouse operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, col
from pyspark.sql.types import StringType, IntegerType, BooleanType, TimestampType, StructType, StructField, MapType

# For explicit Delta table operations
from delta.tables import DeltaTable 

try:
    spark = SparkSession.builder.getOrCreate()
    print("Spark session initialized.")
except Exception as e:
    print(f"Error initializing Spark session: {e}")
    pass 


# --- Configuration ---
CONFIG_FILE_PATH = "/lakehouse/default/Files/utils/config.json"

# Lakehouse table name - This is the name your Delta table will have in the 'Tables' section
LAKEHOUSE_TABLE_NAME = "activity_events" 

# Load configuration from config.json
try:
    with open(CONFIG_FILE_PATH, 'r') as f:
        config = json.load(f)
    print(f"Successfully loaded configuration from {CONFIG_FILE_PATH}")
except FileNotFoundError:
    print(f"Error: {CONFIG_FILE_PATH} not found. Please ensure the config.json file exists at the specified path.")
    exit(1)
except json.JSONDecodeError:
    print(f"Error: Could not decode JSON from {CONFIG_FILE_PATH}. Please check the file's format.")
    exit(1)
except Exception as e:
    print(f"An unexpected error occurred while loading config.json: {e}")
    exit(1)

# Get Power BI credentials from config.json.
CLIENT_ID = config.get("CLIENT_ID", "YOUR_APPLICATION_CLIENT_ID")
CLIENT_SECRET = config.get("CLIENT_SECRET", "YOUR_CLIENT_SECRET")
TENANT_ID = config.get("TENANT_ID", "YOUR_DIRECTORY_TENANT_ID")

# --- Security & Verification ---
# It's recommended to keep VERIFY_SSL = True in production.
VERIFY_SSL = False

if not VERIFY_SSL:
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    print("!!! WARNING: SSL CERTIFICATE VERIFICATION IS DISABLED.                     !!!")
    print("!!! This is a security risk and should only be a temporary workaround.     !!!")
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# --- API Constants ---
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]
BASE_URL = "https://api.powerbi.com/v1.0/myorg"
ADMIN_BASE_URL = f"{BASE_URL}/admin"

# --- Authentication ---
def get_access_token():
    """Authenticates using MSAL with client credentials and returns an access token."""
    if "YOUR_" in CLIENT_ID or "YOUR_" in CLIENT_SECRET or "YOUR_" in TENANT_ID:
        raise ValueError(
            "Placeholder Power BI credentials detected. Please set CLIENT_ID, "
            "CLIENT_SECRET, and TENANT_ID in your config.json file."
        )

    app = msal.ConfidentialClientApplication(
        CLIENT_ID,
        authority=AUTHORITY,
        client_credential=CLIENT_SECRET
    )
    result = app.acquire_token_silent(SCOPE, account=None)
    if not result:
        print("No suitable token in cache, acquiring a new one...")
        result = app.acquire_token_for_client(scopes=SCOPE)

    if "access_token" in result:
        return result["access_token"]
    else:
        print("Error acquiring token:")
        print(result.get("error"))
        print(result.get("error_description"))
        raise Exception("Failed to acquire access token for Service Principal.")

# --- Data Fetching for Activity Events ---
def get_paginated_activity_data(token, start_date_time, end_date_time):
    """
    Retrieves all activity events from the Power BI Admin API, handling
    pagination using 'continuationUri' and 'lastResultSet'.
    """
    all_events = []
    headers = {"Authorization": f"Bearer {token}"}
    
    base_url_activity_events = f"{ADMIN_BASE_URL}/activityevents"
    
    initial_request_params = {
        'startDateTime': start_date_time, 
        'endDateTime': end_date_time
    }
    
    next_page_uri = None
    is_first_request = True

    while True:
        request_url = ""
        request_params = {}

        if is_first_request:
            request_url = base_url_activity_events
            request_params = initial_request_params.copy()
            is_first_request = False
        elif next_page_uri:
            request_url = next_page_uri
            request_params = {}
        else:
            print("     > No more activity events or continuation URI found. Completing fetch.")
            break 
        
        try:
            print(f"Fetching activity events from: {request_url}")
            if request_params:
                print(f"     > With params: {request_params}")

            response = requests.get(request_url, headers=headers, params=request_params, verify=VERIFY_SSL)
            response.raise_for_status()
            data = response.json()

            page_events = data.get("activityEventEntities", [])
            all_events.extend(page_events)
            print(f"     > Retrieved {len(page_events)} items. Total so far: {len(all_events)}.")

            next_page_uri = data.get("continuationUri")
            last_result_set = data.get("lastResultSet", False)

            if last_result_set:
                print("     > Reached the last result set. Completing fetch.")
                break

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get("Retry-After", 30))
                print(f"     > Rate limited. Retrying after {retry_after} seconds...")
                time.sleep(retry_after)
            else:
                print(f"HTTP Error fetching activity events: {e.response.status_code} - {e.response.text}")
                break
        except Exception as e:
            print(f"An unexpected error occurred while fetching activity events: {e}")
            break
        
        time.sleep(1)

    return all_events

# --- Fabric Lakehouse Operations ---
def write_activity_events_to_lakehouse(activity_events, bDate_str):
    """
    Converts activity events to a Spark DataFrame and writes/merges them
    into the specified Fabric Lakehouse table using explicit delete and append.
    """
    if not activity_events:
        print("No activity events to write to Lakehouse.")
        return

    # Define the schema explicitly for the DataFrame to ensure correct types
    schema = StructType([
        StructField("Id", StringType(), True),
        StructField("RecordType", IntegerType(), True),
        StructField("CreationTime", StringType(), True), 
        StructField("Operation", StringType(), True),
        StructField("OrganizationId", StringType(), True),
        StructField("UserType", StringType(), True),
        StructField("UserKey", StringType(), True),
        StructField("Workload", StringType(), True),
        StructField("UserId", StringType(), True),
        StructField("Activity", StringType(), True),
        StructField("EmbedTokenId", StringType(), True),
        StructField("IsSuccess", BooleanType(), True),
        StructField("RequestId", StringType(), True),
        StructField("ActivityId", StringType(), True),
        StructField("BillingType", StringType(), True),
        StructField("ClientIP", StringType(), True),
        StructField("UserAgent", StringType(), True),
        StructField("ItemName", StringType(), True),
        StructField("WorkSpaceName", StringType(), True),
        StructField("CapacityId", StringType(), True),
        StructField("CapacityName", StringType(), True),
        StructField("WorkspaceId", StringType(), True),
        StructField("ObjectId", StringType(), True),
        StructField("DataflowId", StringType(), True),
        StructField("DataflowName", StringType(), True),
        StructField("DataflowType", StringType(), True),
        StructField("DatasetName", StringType(), True),
        StructField("DatasetId", StringType(), True),
        StructField("DataConnectivityMode", StringType(), True),
        StructField("ArtifactId", StringType(), True),
        StructField("ArtifactName", StringType(), True),
        StructField("RefreshType", StringType(), True),
        StructField("LastRefreshTime", StringType(), True),
        StructField("ArtifactKind", StringType(), True),
        StructField("ItemId", StringType(), True),
        StructField("ReportName", StringType(), True),
        StructField("AppName", StringType(), True),
        StructField("ReportId", StringType(), True),
        StructField("ReportType", StringType(), True),
        StructField("AppReportId", StringType(), True),
        StructField("DistributionMethod", StringType(), True),
        StructField("ConsumptionMethod", StringType(), True),
        StructField("AppId", StringType(), True),
        StructField("DataflowRefreshScheduleType", StringType(), True),
        StructField("ExportedArtifactInfo", StringType(), True),
        StructField("ExportedArtifactDownloadInfo", StringType(), True),
        StructField("EndPoint", StringType(), True),
        StructField("HasFullReportAttachment", StringType(), True), 
        StructField("SubscriptionDetails", StringType(), True),
        StructField("GatewayId", StringType(), True),
        StructField("DatasourceId", StringType(), True),
        StructField("SubfolderId", StringType(), True),
        StructField("SubfolderObjectId", StringType(), True),
        StructField("SubfolderName", StringType(), True),
        StructField("FolderObjectId", StringType(), True),
        StructField("FolderDisplayName", StringType(), True),
        StructField("FolderAccessRequests", StringType(), True),
        StructField("TableName", StringType(), True),
        StructField("ArtifactAccessRequestInfo", StringType(), True),
        StructField("Schedules", StringType(), True),
        StructField("OriginalOwner", StringType(), True),
        StructField("TakingOverOwner", StringType(), True),
        StructField("SubscribeeInformation", StringType(), True),
        StructField("ExternalSubscribeeInformation", StringType(), True),
        StructField("SubscriptionSchedule", StringType(), True),
        StructField("IsTenantAdminApi", StringType(), True), 
        StructField("GatewayClustersObjectIds", StringType(), True),
        StructField("DatasourceInformations", StringType(), True),
        StructField("ArtifactObjectId", StringType(), True),
        StructField("AdditionalInfo", StringType(), True) # Store as JSON string
    ])

    # Transform list of dictionaries into a list of tuples/lists that match the schema
    data_for_df = []
    explicit_keys = [field.name for field in schema.fields if field.name != "AdditionalInfo"]

    for event in activity_events:
        row = []
        additional_info_dict = {}
        for key in explicit_keys:
            value = event.get(key)
            if key == "IsSuccess":
                row.append(bool(value)) 
            elif key in ["HasFullReportAttachment", "IsTenantAdminApi"] and isinstance(value, str):
                row.append(value.lower() == 'true' if value else None)
            else:
                row.append(value)
        
        for k, v in event.items():
            if k not in explicit_keys:
                if isinstance(v, (list, dict)):
                    try:
                        additional_info_dict[k] = json.dumps(v)
                    except TypeError:
                        additional_info_dict[k] = str(v)
                else:
                    additional_info_dict[k] = v
        
        row.append(json.dumps(additional_info_dict)) 
        data_for_df.append(row)

    # Create Spark DataFrame
    df = spark.createDataFrame(data_for_df, schema=schema)
    
    # Add a 'LoadDate' column for tracking when the data was loaded.
    df = df.withColumn("LoadDate", lit(bDate_str)) 
    df = df.withColumn("ProcessingTimestamp", current_timestamp()) 

    print(f"DataFrame created with {df.count()} rows and schema:")
    df.printSchema()

    # --- Explicit Delete and Append Logic ---
    try:
        # Check if the table already exists
        if spark.catalog.tableExists(LAKEHOUSE_TABLE_NAME):
            print(f"\nDelta table '{LAKEHOUSE_TABLE_NAME}' exists. Proceeding with delete and append.")
            delta_table = DeltaTable.forName(spark, LAKEHOUSE_TABLE_NAME)

            # Delete existing records for the current LoadDate
            print(f"Attempting to delete existing records for LoadDate = '{bDate_str}'...")
            delete_condition = f"LoadDate = '{bDate_str}'"
            delta_table.delete(delete_condition)
            print(f"Successfully deleted existing records for LoadDate = '{bDate_str}'.")

            # Append new data
            df.write \
              .format("delta") \
              .mode("append") \
              .partitionBy("LoadDate") \
              .saveAsTable(LAKEHOUSE_TABLE_NAME)
            print(f"\nSuccessfully appended new Power BI activity events for {bDate_str} to Lakehouse table: {LAKEHOUSE_TABLE_NAME}")

        else:
            # If the table doesn't exist, create it with the first set of data
            print(f"\nDelta table '{LAKEHOUSE_TABLE_NAME}' does not exist. Creating it now.")
            df.write \
              .format("delta") \
              .mode("append") \
              .option("overwriteSchema", "true") \
              .partitionBy("LoadDate") \
              .saveAsTable(LAKEHOUSE_TABLE_NAME)
            print(f"\nSuccessfully created and written Power BI activity events for {bDate_str} to Lakehouse table: {LAKEHOUSE_TABLE_NAME}")

    except Exception as e:
        print(f"Error writing to Lakehouse: {e}")
        print("Please ensure your Fabric workspace and lakehouse are correctly configured and you have write permissions.")
        print("Also, check if the schema matches and that you have a 'LoadDate' column for partitioning.")


# --- Main Execution ---
def main(date_to_process: str = None):
    """
    Main function to run the activity events extraction and Lakehouse insertion process.
    :param date_to_process: Optional date string in 'YYYY-MM-DD' format. If None,
                            defaults to the previous day (UTC).
    """
    try:
        print("Attempting to authenticate Power BI with Service Principal...")
        access_token = get_access_token()
        print("Successfully authenticated and acquired access token.\n")

        if date_to_process:
            try:
                selected_date = datetime.strptime(date_to_process, "%Y-%m-%d").date()
                bDate_str = selected_date.strftime("%Y-%m-%d")
                print(f"Running for manually entered date: {bDate_str}")
            except ValueError:
                print(f"Invalid date format '{date_to_process}'. Expected YYYY-MM-DD. "
                      "Using previous day's date instead.")
                previous_day_utc = datetime.now(timezone.utc).date() - timedelta(days=1)
                bDate_str = previous_day_utc.strftime("%Y-%m-%d")
                print(f"Running for previous day (UTC): {bDate_str}")
        else:
            previous_day_utc = datetime.now(timezone.utc).date() - timedelta(days=1)
            bDate_str = previous_day_utc.strftime("%Y-%m-%d")
            print(f"No date provided. Running for previous day (UTC): {bDate_str}")
        
        # Date-time format as required by the API (enclosed in single quotes, with .000Z)
        start_datetime_str = f"'{bDate_str}T00:00:00.000Z'" 
        end_datetime_str = f"'{bDate_str}T23:59:59.000Z'"
        
        print(f"\n--- Fetching all activity events for {bDate_str} ---")
        activity_events = get_paginated_activity_data(
            access_token,
            start_datetime_str,
            end_datetime_str
        )

        if activity_events:
            write_activity_events_to_lakehouse(activity_events, bDate_str)
        else:
            print(f"No activity events found for {bDate_str}. Skipping Lakehouse insertion.")

    except Exception as e:
        print(f"\nAn error occurred during the process: {e}")
        print("Please check your configuration, permissions, and network connectivity.")


In [60]:
main() 

StatementMeta(, a5d7601d-e37f-4496-80fb-8f613b9c4a79, 61, Finished, Available, Finished)

Attempting to authenticate Power BI with Service Principal...
No suitable token in cache, acquiring a new one...
Successfully authenticated and acquired access token.

Running for manually entered date: 2025-06-18

--- Fetching all activity events for 2025-06-18 ---
Fetching activity events from: https://api.powerbi.com/v1.0/myorg/admin/activityevents
     > With params: {'startDateTime': "'2025-06-18T00:00:00.000Z'", 'endDateTime': "'2025-06-18T23:59:59.000Z'"}
HTTP Error fetching activity events: 400 - 
No activity events found for 2025-06-18. Skipping Lakehouse insertion.
