In [1]:
import requests
import json
from pyspark.sql import SparkSession
from notebookutils import mssparkutils
from datetime import datetime, timedelta, timezone
from dateutil import parser
from pyspark.sql.functions import max
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
import time

StatementMeta(, 366a8731-231a-4446-a500-85c908414754, 3, Finished, Available, Finished)

In [2]:
# 1. Prepare Schema and Current Time
config_table_name = "OAP_Load_Config"

# Tunables (kept here instead of as magic numbers inside the logic below).
OVERLAP_HOURS = 1   # how far each run re-reads behind the newest recorded window end
MIGRATION_DAYS = 3  # history to back-fill when the config table is missing or empty

schema = StructType([
    StructField("start_date_time", TimestampType(), True),
    StructField("end_date_time", TimestampType(), True),
    StructField("completion_time", TimestampType(), True),
    StructField("status", StringType(), True)
])

# Use UTC for all time calculations
current_time = datetime.now(timezone.utc)

# 2. Determine Start Point and Overlap
rows_to_insert = []

# On the very first run the table does not exist yet (the append-mode saveAsTable in
# step 4 creates it), so treat "missing" like "empty" instead of letting
# spark.table() raise.
# NOTE: Catalog.tableExists requires PySpark >= 3.3.
if spark.catalog.tableExists(config_table_name):
    df_config = spark.table(config_table_name)
    # `max` here is pyspark.sql.functions.max (imported at the top), not the builtin.
    max_row = df_config.agg(max("end_date_time").alias("max_end")).collect()
    max_end_date = max_row[0]["max_end"]
else:
    max_end_date = None

if max_end_date:
    # Table has data: instead of a separate "catch-up" window, back up the start
    # time so late-arriving events are read again. The loop below still splits
    # windows at midnight.
    last_end = max_end_date
    if last_end.tzinfo is None:
        # NOTE(review): assumes the collected (naive) timestamp is UTC, i.e. the
        # driver / Spark session time zone is UTC. Confirm for this workspace.
        last_end = last_end.replace(tzinfo=timezone.utc)

    current_start = last_end - timedelta(hours=OVERLAP_HOURS)

    print(f"Found existing data. Backing up {OVERLAP_HOURS}h to overlap and catch late arrivals. Resuming from: {current_start}")

else:
    # Migrate old data (table is missing or empty).
    print(f"No existing data found. Running migration logic for the last {MIGRATION_DAYS} days.")

    # Start MIGRATION_DAYS ago, aligned to midnight so windows are clean whole days.
    # current_time is UTC-aware, so the aligned start is UTC-aware as well.
    current_start = (current_time - timedelta(days=MIGRATION_DAYS)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    print(f"Migration start date (aligned to midnight): {current_start}")

# 3. Generate Windows until Current Time
# Constraint: a single window cannot cross midnight UTC (start and end must be on the
# same calendar day).
while current_start < current_time:
    # Midnight of the day after current_start: the hard upper bound for this window.
    next_day_date = current_start.date() + timedelta(days=1)
    next_midnight = datetime(next_day_date.year, next_day_date.month, next_day_date.day, tzinfo=timezone.utc)

    # The window ends 1 ms before that midnight, or at current_time if that comes first.
    window_end = min(next_midnight - timedelta(milliseconds=1), current_time)

    # Safety break to prevent infinite loops if the window would be empty or negative.
    if window_end <= current_start:
        break

    # Row: start, end, completion (None), status ('Pending')
    rows_to_insert.append((current_start, window_end, None, "Pending"))

    # Advance to the next millisecond (usually midnight, or just past current_time).
    current_start = window_end + timedelta(milliseconds=1)


# 4. Write to Lakehouse
if rows_to_insert:
    print(f"Populating {len(rows_to_insert)} new time windows into {config_table_name}...")
    new_df = spark.createDataFrame(rows_to_insert, schema=schema)
    new_df.write.format("delta").mode("append").saveAsTable(config_table_name)
    print("Population complete.")
else:
    print("Configuration table is up to date. No new windows needed.")

StatementMeta(, 366a8731-231a-4446-a500-85c908414754, 4, Finished, Available, Finished)

Found existing data. Backing up 1h to overlap and catch late arrivals. Resuming from: 2026-02-05 11:32:43.556479+00:00
Populating 1 new time windows into OAP_Load_Config...
Population complete.


In [None]:
# Authentication using Service Principal
# Replace these values with your actual Tenant and App ID.
tenant_id = "your-tenant-id"
client_id = "your-client-id-app-id"

# Client secret: prefer Azure Key Vault so the secret never lives in the notebook
# source (notebooks and their saved outputs are routinely shared and committed).
# Set BOTH values below to enable Key Vault retrieval.
# NOTE(review): depending on the runtime, getSecret may expect the vault URL
# (https://<name>.vault.azure.net/) rather than the bare name - confirm.
akv_name = ""     # e.g. "your-key-vault-name"
secret_name = ""  # e.g. "your-spn-secret-name"

if akv_name and secret_name:
    client_secret = mssparkutils.credentials.getSecret(akv_name, secret_name)
else:
    # Fallback for local testing only - never put a real secret here.
    client_secret = "your-client-secret-value"

def get_spn_token(scope, timeout=30):
    """
    Acquire an access token with the OAuth2 client-credentials flow (Service Principal).

    Uses the module-level ``tenant_id``, ``client_id`` and ``client_secret``.

    Args:
        scope: Resource scope ending in "/.default",
            e.g. "https://analysis.windows.net/powerbi/api/.default".
        timeout: Seconds to wait for the token endpoint. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection and hang
            the notebook.

    Returns:
        The access token string, or None if the request failed (the error is printed).
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    payload = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'scope': scope
    }
    try:
        response = requests.post(url, data=payload, timeout=timeout)
        response.raise_for_status()
        return response.json().get("access_token")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error retrieving SPN token for scope {scope}: {e}")
        # The Azure AD error body carries the specific AADSTS error code.
        print(f"Response Body: {e.response.text}")
        return None
    except Exception as e:
        # Network errors, timeouts, malformed JSON, etc. - best effort, report and return None.
        print(f"Failed to retrieve SPN token for scope {scope}: {e}")
        return None

# 1. Get Power BI Token
# Note the scope includes /.default for client credentials flow
pbi_scope = "https://analysis.windows.net/powerbi/api/.default"
auth_token = get_spn_token(pbi_scope)

if not auth_token:
    # Fail loudly: every Activity API call in later cells depends on this token.
    # Continuing would send "Bearer None", each window would fail, and the
    # processing loop would just stop after printing a message - easy to
    # mistake for a successful run.
    print("Failed to retrieve Power BI token.")
    raise RuntimeError("Failed to retrieve Power BI token via Service Principal.")

print("Successfully retrieved Power BI token via Service Principal.")

# Request headers for the Power BI REST API (used by the processing function).
headers = {
    "Authorization": f"Bearer {auth_token}",
    "Content-Type": "application/json"
}

StatementMeta(, 366a8731-231a-4446-a500-85c908414754, 5, Finished, Available, Finished)

Successfully retrieved token from mssparkutils.


In [None]:
# KQL Configuration: Eventhouse endpoint, database and the two tables used by the merge.
kusto_cluster_uri = "https://trd-6uegjpfbf030eemxtw.z1.kusto.fabric.microsoft.com"
kusto_database = "MonitoringEventhouse"
target_table = "WorkspaceOutboundAccessProtection"
staging_table = "WorkspaceOutboundAccessProtection_Staging"

# Acquire a Kusto token through get_spn_token from the authentication cell.
# Client-credential scopes for Kusto/ADX take the form "{ClusterURI}/.default".
# Any failure (including get_spn_token not having been defined yet) leaves
# kusto_token as None rather than stopping the notebook.
try:
    kusto_scope = kusto_cluster_uri + "/.default"
    kusto_token = get_spn_token(kusto_scope)
    print(
        "Kusto Token retrieved via Service Principal."
        if kusto_token
        else "Failed to get Kusto token (returned None)."
    )
except Exception as token_error:
    print(f"Failed to get Kusto token: {token_error}")
    kusto_token = None

def run_kusto_command(query, timeout=600):
    """
    Execute a KQL management (control) command via the Kusto REST API.

    Posts ``{"db": kusto_database, "csl": query}`` to ``{kusto_cluster_uri}/v1/rest/mgmt``
    with the module-level ``kusto_token``, using plain ``requests`` so the
    azure-kusto-data library is not needed.

    Args:
        query: Control command text, e.g. ".clear table T data".
        timeout: Seconds ``requests`` waits for the connection / response.
            NOTE(review): 600s is an assumption sized for long-running merges -
            confirm against the slowest expected command.

    Returns:
        True if the service answered with a success status; False if there is no
        token, the request could not be sent, or the service returned an HTTP error
        (details are printed). Always a bool, so callers can use it directly in ``if``.
    """
    if not kusto_token:
        print("Cannot run command: No token available.")
        return False

    # Kusto management endpoint; strip a trailing slash to avoid "//v1".
    mgmt_endpoint = f"{kusto_cluster_uri.rstrip('/')}/v1/rest/mgmt"

    # Local name differs from the global Power BI `headers` to avoid confusion.
    kusto_headers = {
        "Authorization": f"Bearer {kusto_token}",
        "Content-Type": "application/json"
    }

    body = {
        "db": kusto_database,
        "csl": query
    }

    try:
        response = requests.post(mgmt_endpoint, headers=kusto_headers, json=body, timeout=timeout)
    except Exception as e:
        # No response object exists here (network error, timeout, ...).
        print(f"Failed to execute KQL command via REST API: {e}")
        return False

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"Failed to execute KQL command via REST API: {e}")
        if response.content:
            print(f"Details: {response.content}")
        return False

    # print(response.json()) # Uncomment to see detailed result
    return True

StatementMeta(, 366a8731-231a-4446-a500-85c908414754, 6, Finished, Available, Finished)

Kusto Token retrieved successfully.


In [5]:
# Define Processing Function
def process_activity_events_window(
    start_dt_obj,
    end_dt_obj,
    activities=(
        "EnableWorkspaceOutboundAccessProtection",
        "DisableWorkspaceOutboundAccessProtection",
    ),
):
    """
    Fetch Power BI activity events for one time window and load them into KQL.

    For each activity, pages through the Power BI Admin Activity Events API
    (following ``continuationUri`` until ``lastResultSet``). It then writes all
    events to the staging table, appends to the target table only those rows whose
    ``Id`` is not already there (leftanti join), and finally clears the staging table.

    Relies on module-level ``headers`` (Power BI auth), ``spark``, ``kusto_cluster_uri``,
    ``kusto_database``, ``kusto_token``, ``staging_table``, ``target_table`` and
    ``run_kusto_command``.

    Args:
        start_dt_obj: Window start (datetime). Tz-aware values are converted to UTC;
            naive values are treated as already being UTC.
        end_dt_obj: Window end (datetime), same conventions as ``start_dt_obj``.
        activities: Activity names to fetch; blank entries are skipped. Defaults to
            the workspace outbound-access-protection enable/disable activities.

    Returns:
        True if every step succeeded (or no events were found), False otherwise.
    """
    http_timeout = 60  # seconds to wait for each Activity API page

    def _to_api_timestamp(dt):
        # The API expects UTC. Convert tz-aware values; naive values are assumed UTC.
        # NOTE(review): Spark collect() yields naive datetimes in the driver's local
        # time zone - confirm that is UTC in this environment.
        if dt.tzinfo is not None:
            dt = dt.astimezone(timezone.utc)
        # %f gives microseconds (6 digits); dropping the last 3 keeps exactly 3 decimal
        # places (milliseconds), so an end time like 23:59:59.999 is passed through intact.
        return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    # 1. Format dates for the API (single-quoted ISO strings).
    s_str = f"'{_to_api_timestamp(start_dt_obj)}'"
    e_str = f"'{_to_api_timestamp(end_dt_obj)}'"

    print(f"\n--- Processing Window: {s_str} to {e_str} ---")

    # 2. Get data from PBI REST API
    base_url = "https://api.powerbi.com/v1.0/myorg/admin/activityevents"
    all_local_events = []

    for activity in activities:
        if not activity or not activity.strip():
            continue

        # OData string literal: embedded single quotes are escaped by doubling them.
        activity_filter = "Activity eq '{}'".format(activity.replace("'", "''"))
        url = f"{base_url}?startDateTime={s_str}&endDateTime={e_str}&$filter={activity_filter}"

        try:
            while url:
                response = requests.get(url, headers=headers, timeout=http_timeout)
                response.raise_for_status()
                data = response.json()

                if 'activityEventEntities' in data:
                    all_local_events.extend(data['activityEventEntities'])
                elif 'value' in data:
                    all_local_events.extend(data['value'])

                # The final page is flagged by lastResultSet; stop there even if a
                # continuationUri is still present.
                if data.get('lastResultSet'):
                    break
                url = data.get('continuationUri')
        except Exception as e:
            print(f"Error processing filter '{activity_filter}': {e}")
            return False

    print(f"Total events fetched: {len(all_local_events)}")

    if not all_local_events:
        # Nothing to load for this window; treat as success so it is marked complete.
        return True

    # 3. Load data to KQL Table
    try:
        # Schema is inferred from the event dicts returned by the API.
        spark_df_local = spark.createDataFrame(all_local_events)

        spark_df_local.write \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("kustoCluster", kusto_cluster_uri) \
            .option("kustoDatabase", kusto_database) \
            .option("kustoTable", staging_table) \
            .option("accessToken", kusto_token) \
            .option("tableCreateOptions", "CreateIfNotExist") \
            .mode("Append") \
            .save()

        # Merge: append staging rows whose Id is not yet in the target. This is what
        # makes the 1h overlap between windows safe to re-read.
        # NOTE(review): leftanti only removes Ids already in the target; duplicate Ids
        # within the staging batch itself (e.g. rows left by an earlier failed cleanup)
        # would all be appended.
        merge_query = f"""
        .set-or-append {target_table} <| 
        {staging_table} 
        | join kind=leftanti {target_table} on Id
        """

        cleanup_query = f".clear table {staging_table} data"

        if run_kusto_command(merge_query):
            print("Merge to target complete.")
            if run_kusto_command(cleanup_query):
                print("Staging cleanup complete.")
                return True
            # Data is merged, but report failure so the window is retried; the
            # leftanti merge prevents duplicates in the target on the retry.
            print("Warning: Staging cleanup failed.")
            return False
        print("Merge failed.")
        return False

    except Exception as e:
        print(f"Error (Load/Merge): {e}")
        return False

StatementMeta(, 366a8731-231a-4446-a500-85c908414754, 7, Finished, Available, Finished)

In [6]:


# Process Pending Windows
# Pause between consecutive windows (presumably to stay within Activity API rate
# limits - confirm the intended budget).
WINDOW_PAUSE_SECONDS = 10

# Query pending items from the configuration table.
# start_key / end_key are the timestamps rendered to strings by Spark itself. Casting
# those strings back in the UPDATE uses the same Spark session time zone in both
# directions. Formatting the collected Python datetimes with str() would instead
# depend on the driver's local time zone matching the session time zone; if they
# differ, the UPDATE silently matches no rows and the window is reprocessed every run.
df_pending = spark.sql(f"""
    SELECT *,
           CAST(start_date_time AS STRING) AS start_key,
           CAST(end_date_time AS STRING) AS end_key
    FROM {config_table_name} 
    WHERE status != 'Completed' OR status IS NULL 
    ORDER BY start_date_time ASC
""")

pending_list = df_pending.collect()

if not pending_list:
    print("No pending time windows found in configuration table.")
else:
    total_items = len(pending_list)
    print(f"Found {total_items} pending windows. Starting processing loop...")

    for i, row in enumerate(pending_list):
        raw_start = row['start_date_time']
        raw_end = row['end_date_time']

        if not process_activity_events_window(raw_start, raw_end):
            print("Processing failed for this window. Stopping loop.")
            break

        # Mark this window as completed in the Delta table.
        start_key = row['start_key']
        end_key = row['end_key']
        try:
            update_cmd = f"""
                UPDATE {config_table_name}
                SET status = 'Completed', completion_time = current_timestamp()
                WHERE start_date_time = CAST('{start_key}' AS TIMESTAMP)
                  AND end_date_time = CAST('{end_key}' AS TIMESTAMP)
            """
            spark.sql(update_cmd)
        except Exception as e:
            print(f"Failed to update status in Delta table: {e}")
            # Continuing without recorded progress would cause the same windows to be
            # re-fetched next run, so stop here.
            print("Stopping loop to prevent reprocessing on next run.")
            break

        # Pause between windows, but not after the last one.
        if i < total_items - 1:
            print(f"Waiting {WINDOW_PAUSE_SECONDS}s...")
            time.sleep(WINDOW_PAUSE_SECONDS)

    print("Batch processing finished.")

StatementMeta(, 366a8731-231a-4446-a500-85c908414754, 8, Finished, Available, Finished)

Found 1 pending windows. Starting processing loop...

--- Processing Window: '2026-02-05T11:32:43.556Z' to '2026-02-05T13:00:39.944Z' ---
Total events fetched: 1
Merge to target complete.
Staging cleanup complete.
Batch processing finished.
