In [1]:
# import packages

import msal
import requests
import json
import os
import uuid
from datetime import datetime, timezone

Edit the cell below, noting the definite change fields.

In [2]:
# Enter the Key Vault URL - DEFINITE CHANGE

keyvaulturl = "https://kv-coolcustomer-dev.vault.azure.net/"

# Enter the details of your AAD app registration - shouldn't have to change

client_id = notebookutils.credentials.getSecret(keyvaulturl, 'graph-core-client-id')
client_secret = notebookutils.credentials.getSecret(keyvaulturl, 'graph-core-client-secret')

# Enter the Entra Tenant ID below - DEFINITE CHANGE

entra_tenant_id = "daaee2d7-84e4-45e7-9907-8238d02b6e5a"

# Leave the below alone

scope = ["https://graph.microsoft.com/.default"]
authority = f'https://login.microsoftonline.com/{entra_tenant_id}'

In [3]:
# Define the base path for the Lakehouse - top level folder for all Graph data
lakehouse_base_path = "/lakehouse/default/Files/MSGraphAPItest/"

# Ensure the directory exists
os.makedirs(lakehouse_base_path, exist_ok=True)

config_file_name = "graphapiconfigusers.json"

# Generate the current date - used in folder and file generation
current_date = datetime.now().strftime("%Y%m%d")
year = datetime.now().strftime("%Y")
month = datetime.now().strftime("%m")
hour = datetime.now().strftime("%H")


In [4]:
# Read configuration file

with open(f"{lakehouse_base_path}{config_file_name}", "r") as file:
    config_json = json.load(file)

config_list = config_json

i = 1
for config in config_list:
    print(f"** CONFIG {str(i)} **")
    print()
    for c in config:
        print(f"{str(c).ljust(20)}-  {config[c]}")
    print()
    i += 1

** CONFIG 1 **

name                -  Users
description         -  lists all users within the Microsoft 365 directory
endpoint            -  https://graph.microsoft.com/v1.0/users
requesttype         -  Get
folder              -  Users
destinationtable    -  users
destinationschema   -  graphapi



In [5]:
# This cell creates an access token using the details above

# Create an MSAL instance providing the client_id, authority and client_credential parameters
client = msal.ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)
print(client)

# Acquire an access token
token_result = client.acquire_token_for_client(scopes=scope)

print(token_result)
access_token = 'Bearer ' + token_result['access_token']

print('New access token was acquired from Azure AD')

<msal.application.ConfidentialClientApplication object at 0x73c6ca6f9c50>
{'token_type': 'Bearer', 'expires_in': 3599, 'ext_expires_in': 3599, 'access_token': 'eyJ0eXAiOiJKV1QiLCJub25jZSI6IlBldnhMS1lZVWFpdEpVWS0yY1ZMeU5YMUxPR0NRcS1jaFVTMEllVTFZZW8iLCJhbGciOiJSUzI1NiIsIng1dCI6InJ0c0ZULWItN0x1WTdEVlllU05LY0lKN1ZuYyIsImtpZCI6InJ0c0ZULWItN0x1WTdEVlllU05LY0lKN1ZuYyJ9.eyJhdWQiOiJodHRwczovL2dyYXBoLm1pY3Jvc29mdC5jb20iLCJpc3MiOiJodHRwczovL3N0cy53aW5kb3dzLm5ldC9kYWFlZTJkNy04NGU0LTQ1ZTctOTkwNy04MjM4ZDAyYjZlNWEvIiwiaWF0IjoxNzYyOTA3NTQ1LCJuYmYiOjE3NjI5MDc1NDUsImV4cCI6MTc2MjkxMTQ0NSwiYWlvIjoiazJKZ1lFandsNzJ1TTJscitoUGhHMCs0TEQ1c0JBQT0iLCJhcHBfZGlzcGxheW5hbWUiOiJjb29sY3VzdG9tZXItZGV2LWdyYXBoLWNvcmUiLCJhcHBpZCI6Ijg1YTk2NzhlLTgyNTUtNDc1Yi1iZjU4LTZkOTFjMDY5YjZlMSIsImFwcGlkYWNyIjoiMSIsImlkcCI6Imh0dHBzOi8vc3RzLndpbmRvd3MubmV0L2RhYWVlMmQ3LTg0ZTQtNDVlNy05OTA3LTgyMzhkMDJiNmU1YS8iLCJpZHR5cCI6ImFwcCIsIm9pZCI6IjVjYWM4NTNjLWRhOGQtNDdlYi1iZTUwLTlkYWZkYzdlMTdhMSIsInJoIjoiMS5BVUlBMS1LdTJ1U0U1MFdaQjRJNDBDdHVXZ01BQUF

In [6]:
# This cell defines a function to query endpoint on the graph API and save into JSON files

def _iso_z(dt=None):
    dt = dt or datetime.now(timezone.utc)
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def query_graph_api(entry, access_token, output_root=None):
    """
    Full ingest (no $filter) for one metadata entry.
    Writes a JSON snapshot and returns a manifest dict.
    """
    # ---- paths & timestamps (self-contained) ----
    root = output_root or lakehouse_base_path  # falls back to your global
    now  = datetime.now(timezone.utc)
    year, month, day = now.strftime("%Y"), now.strftime("%m"), now.strftime("%d")
    stamp = now.strftime("%Y%m%dT%H%M%SZ")

    endpoint_dir = os.path.join(root, entry["folder"])
    ym_dir      = os.path.join(endpoint_dir, year, month)
    os.makedirs(ym_dir, exist_ok=True)

    run_id    = str(uuid.uuid4())
    ingest_ts = _iso_z(now)
    file_name = f"{entry['folder'].lower()}_{stamp}_{run_id[:8]}.json"
    file_path = os.path.join(ym_dir, file_name)

    # ---- HTTP (full snapshot with paging) ----
    url = entry["endpoint"]                 # don’t lowercase the URL
    headers = {
        "Authorization": access_token,
        "Accept": "application/json",
        "Prefer": "odata.maxpagesize=999"
    }
    params = entry.get("params")  # optional

    records, pages, first = [], 0, True
    print(f"\nRunning {entry['name']} → {entry['endpoint']}")
    while url:
        resp = requests.get(url, headers=headers, params=(params if first else None), timeout=60)
        first = False
        pages += 1
        if resp.status_code == 200:
            data = resp.json()
            records.extend(data.get("value", []))
            url = data.get("@odata.nextLink")
        else:
            err = resp.text
            print(f"ERROR {resp.status_code} on {entry['name']} p{pages}\n{err}\n{'*'*50}")
            return {
                "api_name": entry.get("name", entry.get("folder", "unknown")),
                "endpoint": entry.get("endpoint"),
                "destination_table": entry.get("destinationtable", entry.get("folder","")).lower(),
                "folder": entry.get("folder"),
                "file_path": None,
                "row_count": 0,
                "page_count": pages,
                "ingest_ts": ingest_ts,
                "run_id": run_id,
                "status": "error",
                "http_status": resp.status_code,
                "error": err[:5000]
            }

    # ---- write + manifest ----
    status = "empty"
    if records:
        with open(file_path, "w") as f:
            json.dump(records, f, indent=4)
        print(f"Ingested {len(records)} rows across {pages} page(s) for {entry['name']}\n{'*'*50}")
        status = "ok"
    else:
        file_path = None
        print(f"No data for {entry['name']} (0 rows)\n{'*'*50}")

    return {
        "api_name": entry.get("name", entry.get("folder", "unknown")),
        "endpoint": entry.get("endpoint"),
        "destination_table": entry.get("destinationtable", entry.get("folder","")).lower(),  # e.g., users/groups
        "folder": entry.get("folder"),
        "file_path": file_path,
        "row_count": len(records),
        "page_count": pages,
        "ingest_ts": ingest_ts,
        "run_id": run_id,
        "status": status,
        "mode": "full"
    }



In [7]:
# run the function for the standard graph API endpoints

entry = config_list

manifests = []

for entry in config_list:
    m = query_graph_api(entry, access_token)   # <- your function from earlier
    manifests.append(m)

run_manifest = {
    "run_id": str(uuid.uuid4()),
    "run_ts": _iso_z(),
    "manifests": manifests
}

print("ingestion complete")


Running Users → https://graph.microsoft.com/v1.0/users
Ingested 2 rows across 1 page(s) for Users
**************************************************
ingestion complete


In [8]:
notebookutils.notebook.exit(run_manifest)

ExitValue: {'run_id': '81553aa9-755f-48fc-867c-ed67017344ae', 'run_ts': '2025-11-12T00:37:26Z', 'manifests': [{'api_name': 'Users', 'endpoint': 'https://graph.microsoft.com/v1.0/users', 'destination_table': 'users', 'folder': 'Users', 'file_path': '/lakehouse/default/Files/MSGraphAPItest/Users/2025/11/users_20251112T003726Z_5bd286d3.json', 'row_count': 2, 'page_count': 1, 'ingest_ts': '2025-11-12T00:37:26Z', 'run_id': '5bd286d3-b929-4e4d-928a-018ae227ecac', 'status': 'ok', 'mode': 'full'}]}