# Databricks Notebook: Ingest Microsoft 365 Group Owners and Maintain SCD Type 2 History (Delta + ADLS Gen2)

This notebook fetches the current owners of all active Microsoft 365 groups through the Microsoft Graph API and persists the ownership history to a Delta table using SCD Type 2 semantics. The pipeline is **idempotent**: every SCD date in a run comes from a single `run_timestamp`, and each run compares the fetched snapshot against the `is_current = true` records, so re-running against an unchanged snapshot leaves the table unchanged.

### How to use
1. Configure the Databricks widgets at the top of the notebook (or set them before running as a job).  
2. Ensure the secret scope referenced by `key_vault_scope` contains the keys: `m365_tenant_id`, `m365_client_id`, `m365_client_secret`.  
3. Run the notebook on a cluster with network access to Microsoft Graph and your ADLS Gen2 storage account.  

### Key principles implemented
- **Idempotency** via single `run_timestamp` and stateful comparison to `is_current = true` records.  
- **Atomicity** by using a single Delta `MERGE`.  
- **Resilience** through pagination and exponential-backoff retries when calling Graph API.  
- **Scalability** by parallelizing API calls using `mapInPandas`.

## 1) Setup & Configuration

Create Databricks widgets for parameterization. We capture a single `run_timestamp` (UTC) at the start of the notebook; this value will be used consistently for all SCD `start_date` / `end_date` assignments to guarantee idempotency.

In [None]:
# Widgets (parameterize path and secret scope; do not hardcode values)
dbutils.widgets.text("source_group_table_path", "abfss://bronze@your_storage_account.dfs.core.windows.net/m365/groups/", "Source group table path (ABFSS)")
dbutils.widgets.text("target_owner_table_path", "abfss://silver@your_storage_account.dfs.core.windows.net/m365/group_owners_history/", "Target owner table path (ABFSS)")
dbutils.widgets.text("key_vault_scope", "my_keyvault_scope", "Databricks secret scope for AAD app credentials")

source_group_table_path = dbutils.widgets.get("source_group_table_path")
target_owner_table_path = dbutils.widgets.get("target_owner_table_path")
key_vault_scope = dbutils.widgets.get("key_vault_scope")

from datetime import datetime, timezone

# Single consistent run timestamp for this execution (UTC). This ensures idempotency.
run_timestamp = datetime.now(timezone.utc).replace(microsecond=0)
run_timestamp_str = run_timestamp.strftime('%Y-%m-%d %H:%M:%S')
print(f"run_timestamp (UTC): {run_timestamp_str}")

print(f"Source path: {source_group_table_path}")
print(f"Target path: {target_owner_table_path}")
print(f"Secret scope: {key_vault_scope}")
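
To repeat a past execution with the same `run_timestamp`, the value has to come from outside the notebook rather than from the clock. A minimal sketch, assuming a hypothetical optional parameter (for example a widget named `run_timestamp_override`, which is not part of this notebook) passed to a hypothetical helper `resolve_run_timestamp`:

```python
from datetime import datetime, timezone
from typing import Optional

TS_FORMAT = '%Y-%m-%d %H:%M:%S'

def resolve_run_timestamp(override: Optional[str] = None) -> datetime:
    """Return the UTC run timestamp, truncated to whole seconds.

    `override` is a hypothetical parameter (e.g. the value of an optional
    widget). When it is empty, the current UTC time is used instead.
    """
    if override and override.strip():
        # Interpret the supplied string as a UTC timestamp
        parsed = datetime.strptime(override.strip(), TS_FORMAT)
        return parsed.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc).replace(microsecond=0)

# A fixed timestamp round-trips to the same string
fixed = resolve_run_timestamp('2024-01-15 06:00:00')
print(fixed.strftime(TS_FORMAT))
```

With an override in place, a job retry can pass the timestamp of the original attempt so that both attempts write the same `start_date` / `end_date` values.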

## 2) Dependencies

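Install the required Python packages. If they are already installed, `%pip install` leaves them as they are. On Databricks, a `%pip` command that modifies the environment resets the notebook's Python state, so variables defined in earlier cells (including those from section 1) are lost; run this cell before section 1, or re-run section 1 afterwards. If `%pip` is restricted, install the libraries at the cluster level or via an init script.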


In [None]:
# Install msal and requests if not present
%pip install msal requests

## 3) Imports

Import PySpark and other helpers used throughout the notebook.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType
from delta.tables import DeltaTable
import json
import time
import requests
import msal
import pyspark
print("Imports OK")

## 4) Retrieve AAD App Credentials (from Databricks secret scope)

Expect the following secrets inside the `key_vault_scope`:
- `m365_tenant_id`
- `m365_client_id`
- `m365_client_secret`

This prevents hardcoding secrets in the notebook.

In [None]:
# Retrieve secrets
tenant_id = dbutils.secrets.get(scope=key_vault_scope, key="m365_tenant_id")
client_id = dbutils.secrets.get(scope=key_vault_scope, key="m365_client_id")
client_secret = dbutils.secrets.get(scope=key_vault_scope, key="m365_client_secret")

if not (tenant_id and client_id and client_secret):
    raise Exception("Missing one or more AAD app credentials in the secret scope.")

print("Successfully retrieved AAD app credentials from secret scope (not showing values).")

## 5) Authentication helper (MSAL)

We use the client credentials (service principal) flow to obtain an access token for Microsoft Graph (`https://graph.microsoft.com/.default`). The token is broadcast to executors for use by `mapInPandas` workers.

In [None]:
AUTHORITY = f"https://login.microsoftonline.com/{tenant_id}"
SCOPE = ["https://graph.microsoft.com/.default"]

app = msal.ConfidentialClientApplication(
    client_id,
    authority=AUTHORITY,
    client_credential=client_secret,
)

# Acquire token at driver
token_result = app.acquire_token_for_client(scopes=SCOPE)
if "access_token" not in token_result:
    raise Exception(f"Failed to acquire access token: {token_result}")
access_token = token_result["access_token"]
print("Successfully acquired access token (not shown).")

# Broadcast token to workers
access_token_b = spark.sparkContext.broadcast(access_token)

## 6) Read source groups table and prepare group id list

Read the `m365_groups_delta` Delta table from the supplied path. Filter for active groups using common heuristics. Only the `id` column (aliased to `group_id`) is selected for subsequent Graph API calls.

In [None]:
try:
    groups_df = spark.read.format("delta").load(source_group_table_path)
except Exception as e:
    raise Exception(f"Failed to read source group table at {source_group_table_path}: {e}")

print(f"Source groups columns: {groups_df.columns}")

# Heuristic filter for active groups
if 'deletedDate' in groups_df.columns:
    groups_active_df = groups_df.filter(F.col('deletedDate').isNull())
elif 'deletedDateTime' in groups_df.columns:
    groups_active_df = groups_df.filter(F.col('deletedDateTime').isNull())
elif 'state' in groups_df.columns:
    groups_active_df = groups_df.filter(F.col('state') == 'active')
else:
    groups_active_df = groups_df

group_ids_df = groups_active_df.select(F.col('id').alias('group_id')).distinct()
print(f"Number of distinct groups to process: {group_ids_df.count()}")

## 7) Graph API fetching helper (pagination + retries) and distributed fetch

Define a robust `get_group_owners` function (handles pagination `@odata.nextLink` and exponential backoff for HTTP 429/5xx). Use `mapInPandas` to run across the cluster in parallel and return a flattened DataFrame with schema: `group_id`, `owner_id`, `owner_displayName`, `owner_userPrincipalName`.

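Notes:
- For very large numbers of groups, tune the partition count in `repartition(...)` below according to cluster size.
- If the Graph API throttles, the function backs off and retries up to `max_retries` times per page.
- A group whose fetch fails (non-retriable error or retries exhausted) yields no rows or only partial rows, so section 8 will treat its missing owners as removed. The failure messages are printed on the executors; check the executor logs before trusting closures from a run that had errors.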
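
The delay rule behind the throttling note can be sketched in isolation. This is a simplified model, not the notebook's fetch function: `backoff_delay` is a hypothetical helper name, and the `cap` parameter is an assumption added here to bound the wait.

```python
from typing import Optional

def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based).

    A positive integer `Retry-After` header value takes precedence.
    Otherwise the delay doubles with each attempt, starting at `base`.
    `cap` is an assumption of this sketch and bounds the exponential wait.
    """
    if retry_after is not None:
        try:
            seconds = int(retry_after)
            if seconds > 0:
                return float(seconds)
        except ValueError:
            pass  # e.g. an HTTP-date value: fall back to exponential backoff
    return min(cap, base * (2 ** attempt))

delays = [backoff_delay(a) for a in range(5)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

With five attempts and a one-second base, a page that keeps failing costs about 31 seconds of waiting before the function gives up.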

In [None]:
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd

output_schema = StructType([
    StructField('group_id', StringType(), True),
    StructField('owner_id', StringType(), True),
    StructField('owner_displayName', StringType(), True),
    StructField('owner_userPrincipalName', StringType(), True),
])

def fetch_owners_map(iterator):
    import requests, time
    access_token = access_token_b.value
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json'
    }

    def get_group_owners(group_id):
        owners = []
        url = f'https://graph.microsoft.com/v1.0/groups/{group_id}/owners?$select=id,displayName,userPrincipalName'
        max_retries = 5
        backoff_base = 1.0
        while url:
            for attempt in range(max_retries):
                try:
                    resp = requests.get(url, headers=headers, timeout=30)
                except requests.exceptions.RequestException:
                    sleep_sec = backoff_base * (2 ** attempt)
                    time.sleep(sleep_sec)
                    continue

                if resp.status_code == 200:
                    data = resp.json()
                    if 'value' in data:
                        for o in data['value']:
                            owners.append({
                                'id': o.get('id'),
                                'displayName': o.get('displayName'),
                                'userPrincipalName': o.get('userPrincipalName')
                            })
                    url = data.get('@odata.nextLink')
                    break

                elif resp.status_code in (429, 503):
                    retry_after = None
                    try:
                        retry_after = int(resp.headers.get('Retry-After', 0))
                    except Exception:
                        retry_after = None
                    if retry_after and retry_after > 0:
                        time.sleep(retry_after)
                    else:
                        sleep_sec = backoff_base * (2 ** attempt)
                        time.sleep(sleep_sec)
                    continue

                elif 500 <= resp.status_code < 600:
                    sleep_sec = backoff_base * (2 ** attempt)
                    time.sleep(sleep_sec)
                    continue

                else:
                    # Non-retriable client error - log and return empty list
                    try:
                        _err = resp.json()
                    except Exception:
                        _err = resp.text
                    print(f"Non-retriable HTTP error for group {group_id}: {resp.status_code} - {_err}")
                    return []
            else:
                print(f"Exceeded retries fetching owners for group {group_id}. Returning partial results.")
                return owners
        return owners

    for pdf in iterator:
        out_rows = []
        for gid in pdf['group_id'].tolist():
            try:
                owners = get_group_owners(gid)
            except Exception as e:
                print(f"Error fetching owners for group {gid}: {e}")
                owners = []
            if not owners:
                continue
            for o in owners:
                out_rows.append({
                    'group_id': gid,
                    'owner_id': o.get('id'),
                    'owner_displayName': o.get('displayName'),
                    'owner_userPrincipalName': o.get('userPrincipalName')
                })
        if len(out_rows) == 0:
            yield pd.DataFrame(columns=['group_id','owner_id','owner_displayName','owner_userPrincipalName'])
        else:
            yield pd.DataFrame(out_rows)

# Adjust repartition number according to cluster size and number of groups
owners_df = group_ids_df.repartition(200).mapInPandas(fetch_owners_map, schema=output_schema)
owners_df = owners_df.persist()
print(f"Fetched owner rows: {owners_df.count()}")
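
The pagination loop is easier to follow without the retry handling around it. A minimal sketch, assuming a hypothetical `get_page` callable that returns the decoded JSON body for a URL; a dictionary of stubbed pages stands in for the Graph API here:

```python
from typing import Callable, Dict, List, Optional

def collect_pages(first_url: str, get_page: Callable[[str], Dict]) -> List[Dict]:
    """Follow `@odata.nextLink` until the last page and return all items.

    `get_page` is a hypothetical callable: it takes a URL and returns the
    decoded JSON body of that page.
    """
    items: List[Dict] = []
    url: Optional[str] = first_url
    while url:
        page = get_page(url)
        items.extend(page.get('value', []))
        url = page.get('@odata.nextLink')  # absent on the last page
    return items

# Stubbed two-page response standing in for the Graph API
fake_pages = {
    'page1': {'value': [{'id': 'a'}, {'id': 'b'}], '@odata.nextLink': 'page2'},
    'page2': {'value': [{'id': 'c'}]},
}
owners = collect_pages('page1', fake_pages.__getitem__)
print([o['id'] for o in owners])  # ['a', 'b', 'c']
```

Passing the fetcher in as a parameter keeps the loop testable without network access.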

## 8) Prepare SCD Type 2 source

We will create a source that contains:
- All owners fetched from Graph (flagged `present = true`).
- All currently active owners from the target table that are _not_ present in today's snapshot (flagged `present = false`).

This union allows performing all SCD transitions (insert new, close removed, handle attribute changes) using a single atomic `MERGE`.

In [None]:
from pyspark.sql.functions import lit

# Add present flag to owners fetched this run
owners_present_df = owners_df.withColumn('present', lit(True))

# If the target delta table doesn't exist yet, create it from the current snapshot
def delta_table_exists(path):
    try:
        return DeltaTable.isDeltaTable(spark, path)
    except Exception:
        return False

if not delta_table_exists(target_owner_table_path):
    print(f"Target Delta table not found at {target_owner_table_path}. Creating initial table...")
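    # Drop the helper `present` flag so it does not become part of the target schema
    initial_df = owners_present_df.drop('present') \
                                     .withColumn('start_date', F.to_timestamp(F.lit(run_timestamp_str))) \
                                     .withColumn('end_date', F.lit(None).cast(TimestampType())) \
                                     .withColumn('is_current', lit(True))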
    initial_df.write.format('delta').mode('overwrite').save(target_owner_table_path)
    print("Initial target table created.")
else:
    print(f"Target Delta table exists at {target_owner_table_path}. Proceeding to SCD MERGE flow.")

target_df = spark.read.format('delta').load(target_owner_table_path)
target_current_df = target_df.filter(F.col('is_current') == True).select(
    F.col('group_id'), F.col('owner_id'), F.col('owner_displayName'), F.col('owner_userPrincipalName'), F.col('start_date'), F.col('end_date')
)
# owners currently active in target but missing from source = removed owners
removed_owners_df = target_current_df.join(
    owners_present_df.select('group_id', 'owner_id'),
    on=['group_id', 'owner_id'],
    how='left_anti'
).select('group_id', 'owner_id', 'owner_displayName', 'owner_userPrincipalName')
removed_owners_df = removed_owners_df.withColumn('present', lit(False))

scd_source_df = owners_present_df.unionByName(removed_owners_df)
scd_source_df = scd_source_df.persist()
scd_source_view = 'vw_scd_owners_source'
scd_source_df.createOrReplaceTempView(scd_source_view)
print(f"SCD source rows (present + removed): {scd_source_df.count()}")
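
The comparison between the current target records and the snapshot can be illustrated on plain dictionaries. This is a sketch of the classification only, not of the Spark code; `classify_owners` is a hypothetical helper, and it compares attributes with plain `!=` rather than the null-tolerant comparison used in the `MERGE`.

```python
from typing import Dict, Set, Tuple

Key = Tuple[str, str]    # (group_id, owner_id)
Attrs = Tuple[str, str]  # (displayName, userPrincipalName)

def classify_owners(current: Dict[Key, Attrs],
                    snapshot: Dict[Key, Attrs]) -> Dict[str, Set[Key]]:
    """Split owner keys into new, removed, and changed.

    `current` models the `is_current = true` rows of the target table and
    `snapshot` models the rows fetched in this run.
    """
    shared = set(current) & set(snapshot)
    return {
        'new': set(snapshot) - set(current),      # only in the snapshot
        'removed': set(current) - set(snapshot),  # the left anti join
        'changed': {k for k in shared if current[k] != snapshot[k]},
    }

current = {('g1', 'u1'): ('Ann', 'ann@x.test'), ('g1', 'u2'): ('Bob', 'bob@x.test')}
snapshot = {('g1', 'u1'): ('Ann B.', 'ann@x.test'), ('g1', 'u3'): ('Cy', 'cy@x.test')}
result = classify_owners(current, snapshot)
print(result['new'], result['removed'], result['changed'])
```

Here `u3` is new, `u2` is removed, and `u1` is changed because the display name differs.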

## 9) SCD Type 2 MERGE (single atomic operation)

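This single `MERGE` handles:
1. **Removed owners**: `present = false` → close the existing active record by setting `is_current=false` and `end_date=run_timestamp`.
2. **Attribute changes**: the active record's attributes differ from the snapshot → close the old record the same way and insert a new version with `is_current=true` and `start_date=run_timestamp`.
3. **New or re-added owners**: no active record exists → insert a new record with `is_current=true` and `start_date=run_timestamp`.

Each source row triggers at most one action in a `MERGE`, so a changed owner needs two source rows: one keyed normally (it matches and closes the old record) and one staged with `NULL` merge keys (it never matches, so it falls through to the insert). The `ON` clause matches only `is_current = true` records; otherwise an owner who was removed and later re-added would match a closed record and never be inserted.

Every SCD date comes from `run_timestamp_str`, and each run compares against the current records, so re-running against an unchanged snapshot makes no further changes.

In [None]:
merge_sql = f"""
MERGE INTO delta.`{target_owner_table_path}` AS tgt
USING (
  -- Rows keyed normally: matched against the current target records
  SELECT group_id AS merge_group_id, owner_id AS merge_owner_id,
         group_id, owner_id, owner_displayName, owner_userPrincipalName, present
  FROM {scd_source_view}
  UNION ALL
  -- Changed owners staged with NULL merge keys so that the new version is inserted
  SELECT CAST(NULL AS STRING), CAST(NULL AS STRING),
         s.group_id, s.owner_id, s.owner_displayName, s.owner_userPrincipalName, s.present
  FROM {scd_source_view} AS s
  JOIN delta.`{target_owner_table_path}` AS t
    ON t.group_id = s.group_id AND t.owner_id = s.owner_id
  WHERE t.is_current = true AND s.present = true AND (
        (coalesce(t.owner_displayName, '') <> coalesce(s.owner_displayName, ''))
     OR (coalesce(t.owner_userPrincipalName, '') <> coalesce(s.owner_userPrincipalName, ''))
  )
) AS src
ON tgt.group_id = src.merge_group_id AND tgt.owner_id = src.merge_owner_id AND tgt.is_current = true

-- 1) Close out records that are currently active in target but missing from today's snapshot
WHEN MATCHED AND src.present = false
  THEN UPDATE SET tgt.is_current = false, tgt.end_date = TIMESTAMP('{run_timestamp_str}')

-- 2) Handle attribute changes for an owner (close existing record)
WHEN MATCHED AND src.present = true AND (
      (coalesce(tgt.owner_displayName, '') <> coalesce(src.owner_displayName, ''))
   OR (coalesce(tgt.owner_userPrincipalName, '') <> coalesce(src.owner_userPrincipalName, ''))
)
  THEN UPDATE SET tgt.is_current = false, tgt.end_date = TIMESTAMP('{run_timestamp_str}')

-- 3) Insert new owners, re-added owners, and the new versions of changed owners
WHEN NOT MATCHED AND src.present = true
  THEN INSERT (group_id, owner_id, owner_displayName, owner_userPrincipalName, start_date, end_date, is_current)
       VALUES (src.group_id, src.owner_id, src.owner_displayName, src.owner_userPrincipalName, TIMESTAMP('{run_timestamp_str}'), NULL, true)
"""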

print("Executing MERGE ...")
spark.sql(merge_sql)
print("MERGE completed.")
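
The intended SCD Type 2 transitions and the idempotency property can be checked on a small in-memory model. The sketch below is plain Python, not the Delta implementation; `apply_scd2` and its simplified row shape (one `name` attribute per owner) are assumptions made for illustration.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str]  # (group_id, owner_id)

def apply_scd2(history: List[dict], snapshot: Dict[Key, str], run_ts: str) -> List[dict]:
    """Return a new history list after applying one snapshot.

    `history` rows have the keys: key, name, start_date, end_date, is_current.
    `snapshot` maps (group_id, owner_id) to the owner's display name.
    """
    result = [dict(row) for row in history]  # copy so the input is not mutated
    current = {row['key']: row for row in result if row['is_current']}
    # Close current rows that were removed or whose attribute changed
    for key, row in current.items():
        if key not in snapshot or snapshot[key] != row['name']:
            row['is_current'] = False
            row['end_date'] = run_ts
    # Open a row for every new owner and every changed owner
    for key, name in snapshot.items():
        if key not in current or current[key]['name'] != name:
            result.append({'key': key, 'name': name, 'start_date': run_ts,
                           'end_date': None, 'is_current': True})
    return result

history = [{'key': ('g1', 'u1'), 'name': 'Ann', 'start_date': '2024-01-01 00:00:00',
            'end_date': None, 'is_current': True}]
snapshot = {('g1', 'u1'): 'Ann B.', ('g1', 'u2'): 'Bob'}
once = apply_scd2(history, snapshot, '2024-02-01 00:00:00')
twice = apply_scd2(once, snapshot, '2024-02-01 00:00:00')
print(len(once), once == twice)  # 3 True
```

The first application closes the old `Ann` row and opens two new rows; the second application finds nothing to change, which is the behavior expected from a re-run.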

## 10) Post-merge maintenance: OPTIMIZE + ZORDER

Run `OPTIMIZE` with `ZORDER BY (group_id)` to keep the Delta table performant for queries. If your runtime does not support `OPTIMIZE`, the command raises an error, which the cell below catches and prints; you can then run it manually later.

In [None]:
try:
    print("Running OPTIMIZE and ZORDER... This may take time depending on table size.")
    spark.sql(f"OPTIMIZE delta.`{target_owner_table_path}` ZORDER BY (group_id)")
    print("OPTIMIZE completed.")
except Exception as e:
    print(f"OPTIMIZE failed or not supported on this runtime: {e}")

## 11) Validation / Quick checks and example queries

Run a few quick checks and then example queries you can use to validate the pipeline (counts, current owners, history for a group).

In [None]:
def quick_counts():
    tgt = spark.read.format('delta').load(target_owner_table_path)
    total = tgt.count()
    current = tgt.filter(F.col('is_current') == True).count()
    closed = tgt.filter(F.col('is_current') == False).count()
    print(f"Target total rows: {total}, current: {current}, closed: {closed}")

quick_counts()

# Example validation queries (uncomment to run):
# 1) Show some currently active owners
# display(spark.read.format('delta').load(target_owner_table_path).filter('is_current = true').limit(50))

# 2) Show history of a specific group ID (replace GROUP_ID)
# display(spark.sql(f"SELECT * FROM delta.`{target_owner_table_path}` WHERE group_id = 'GROUP_ID' ORDER BY start_date DESC"))

# 3) Show Delta history for the table (useful for debugging)
# display(spark.sql(f"DESCRIBE HISTORY delta.`{target_owner_table_path}`"))
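
Beyond counts, two invariants are worth checking: at most one current row per `(group_id, owner_id)`, and `end_date` empty exactly when `is_current` is true. A minimal sketch on plain dictionaries, assuming a hypothetical helper `find_scd_violations`; for a small table the rows could be obtained with `[r.asDict() for r in df.collect()]`, while a large table would need the same checks expressed as Spark aggregations.

```python
from collections import Counter
from typing import Iterable, List

def find_scd_violations(rows: Iterable[dict]) -> List[str]:
    """Return human-readable descriptions of broken SCD Type 2 invariants."""
    rows = list(rows)
    problems: List[str] = []
    # Invariant 1: at most one current row per (group_id, owner_id)
    current_counts = Counter(
        (r['group_id'], r['owner_id']) for r in rows if r['is_current']
    )
    for key, n in sorted(current_counts.items()):
        if n > 1:
            problems.append(f"{key}: {n} current rows")
    # Invariant 2: end_date is None exactly when the row is current
    for r in rows:
        if r['is_current'] != (r['end_date'] is None):
            problems.append(f"{(r['group_id'], r['owner_id'])}: is_current/end_date mismatch")
    return problems

sample = [
    {'group_id': 'g1', 'owner_id': 'u1', 'end_date': None, 'is_current': True},
    {'group_id': 'g1', 'owner_id': 'u1', 'end_date': None, 'is_current': True},
    {'group_id': 'g1', 'owner_id': 'u2', 'end_date': '2024-02-01 00:00:00', 'is_current': True},
]
for problem in find_scd_violations(sample):
    print(problem)
```

An empty result means both invariants hold for the rows that were checked.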

## 12) Operational notes / recommended production enhancements

- Replace `print` statements with structured logging (send to Azure Monitor / Log Analytics).  
- Consider chunking the `group_ids_df` into batches if you have thousands of groups and the Graph API throttles heavily.  
- Monitor Delta table sizes and optimize file compaction/retention according to your operational needs.  
- Ensure the AAD app has the minimal required permissions (e.g., `Group.Read.All`).  
- Parameterize and schedule the notebook as a Databricks job, and set retry/backoff policies on the job level for failover handling.