# NB Get Azure Resources Tenant - Notebooks Summary

## Overview
These notebooks (Tenant 1 through Tenant 8) extract and process Azure data: cloud resource consumption, licensing, and infrastructure metadata from multiple Azure tenants. Each notebook runs independently against its own tenant and that tenant's subscriptions.

## Key Functionalities

### 1. **Authentication & Authorization**
- Retrieves credentials from Azure Key Vault (Client ID, Client Secret, Tenant ID)
- Obtains OAuth 2.0 Bearer tokens for both Azure Management and Microsoft Graph APIs
- Supports multi-tenant authentication
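
The token handling described above can be sketched without network access. This is a minimal illustration, not the notebooks' exact code: the `TokenCache` class and the `fetch_token` callable are hypothetical names, and `fake_fetch` stands in for the real HTTP request to the identity endpoint. The 300-second buffer is assumed to match the refresh margin used in the notebooks.

```python
import threading
import time


class TokenCache:
    """Caches a bearer token and refreshes it shortly before it expires."""

    def __init__(self, fetch_token, buffer_seconds=300):
        # fetch_token is a hypothetical callable returning
        # {"access_token": str, "expires_in": seconds}
        self._fetch_token = fetch_token
        self._buffer = buffer_seconds
        self._lock = threading.Lock()
        self._token = None
        self._expires_at = 0.0

    def _is_valid(self):
        return self._token is not None and time.time() < self._expires_at - self._buffer

    def get(self):
        if self._is_valid():
            return self._token
        with self._lock:
            # Re-check inside the lock: another thread may have refreshed already
            if self._is_valid():
                return self._token
            response = self._fetch_token()
            self._token = response["access_token"]
            self._expires_at = time.time() + int(response["expires_in"])
            return self._token


calls = []


def fake_fetch():
    # Stand-in for the real token request; records how often it is called
    calls.append(1)
    return {"access_token": f"token-{len(calls)}", "expires_in": "3600"}


cache = TokenCache(fake_fetch)
first = cache.get()
second = cache.get()  # served from the cache, no second fetch
```

Checking validity once before taking the lock keeps the common path cheap, while the second check inside the lock prevents several worker threads from requesting a new token at the same time.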

### 2. **Azure Subscriptions & Resources**
- **Get Azure Subscriptions**: Fetches all subscriptions associated with the tenant
- Creates `StagingdimAzSubscriptions` Delta table with subscription metadata (ID, name, state, quota type)

### 3. **Azure Consumption Data**
- **Actual Cost & Amortized Cost Reports**: 
  - Extracts cost consumption data using Azure Cost Management API
  - Supports configurable time periods (current month, last month, or custom lookback periods)
  - Downloads cost reports as CSV files to lakehouse storage
  - Implements concurrent processing (ThreadPoolExecutor) for efficient multi-subscription handling
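  - Handles rate limiting (HTTP 429) by waiting for the `Retry-After` interval (30 seconds if the header is absent) and skips the subscription when the requested wait exceeds 60 seconds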
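
The default time-period selection can be illustrated with the standard library alone. This is a sketch under assumptions: the function names and the `grace_days` parameter are hypothetical, and the six-day window is assumed to mirror the notebooks' default behavior of re-pulling the previous month at the start of a new one.

```python
import calendar
from datetime import date, timedelta


def month_periods(today, grace_days=6):
    """Return (start, end) date pairs: the current month to date, plus the
    full previous month while `today` is within the first `grace_days` days."""
    start_current = today.replace(day=1)
    periods = [(start_current, today)]
    if today.day <= grace_days:
        end_previous = start_current - timedelta(days=1)
        periods.append((end_previous.replace(day=1), end_previous))
    return periods


def storage_folder(start):
    """Folder name covering the whole month of `start`, e.g. 20240301-20240331."""
    last_day = calendar.monthrange(start.year, start.month)[1]
    return f"{start.replace(day=1):%Y%m%d}-{start.replace(day=last_day):%Y%m%d}"


periods = month_periods(date(2024, 3, 4))
folder = storage_folder(date(2024, 3, 4))
```

Naming the folder after the full month, even when the report only covers the month to date, means a later run for the same month writes to the same location.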

### 4. **Azure Reservations**
- Retrieves all Azure Reserved Instance details
- Extracts utilization metrics (current usage, trend, grain metrics)
- Flattens nested JSON data for CSV export
- Captures billing plans and scope properties
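
To make the flattening step concrete, the sketch below turns one nested record into a single-level dictionary whose keys can serve as CSV column names. The `sample` record is invented for illustration and is not real API output.

```python
def flatten_json(nested, parent_key="", sep="."):
    """Flatten nested dicts and lists into one dict with dotted/indexed keys."""
    flattened = {}
    for key, value in nested.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flattened.update(flatten_json(value, new_key, sep=sep))
        elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    flattened.update(flatten_json(item, f"{new_key}[{i}]", sep=sep))
                else:
                    flattened[f"{new_key}[{i}]"] = item
        else:
            flattened[new_key] = value
    return flattened


# Hypothetical reservation-like record
sample = {
    "name": "res-1",
    "properties": {
        "billingPlan": "Monthly",
        "utilization": {
            "trend": "UP",
            "aggregates": [
                {"grain": 1, "grainUnit": "days", "value": 87.5, "valueUnit": "percentage"}
            ],
        },
    },
}

flat = flatten_json(sample)
for column in sorted(flat):
    print(column, "=", flat[column])
```

List elements keep their position in the key (`aggregates[0]`), which is why the exported CSV has one column per aggregate index.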

### 5. **Azure Savings Plans**
- Fetches Azure Savings Plan commitment details
- Extracts commitment amounts, currency, and utilization data
- Handles missing/null fields with default values
- Exports comprehensive schema to CSV
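
Filling missing fields with defaults might look like the sketch below. The helper names are hypothetical, and the default values (0, `percentage`, `'0'`, `days`, `N/A`) are assumed from the notebooks' conventions. Unlike a column-level check, this version fills gaps per record.

```python
def default_for(column):
    """Pick a default for a missing column from its final path segment."""
    suffix = column.rsplit(".", 1)[-1]
    if column.startswith("properties.utilization.aggregates"):
        defaults = {"value": 0, "valueUnit": "percentage", "grain": "0", "grainUnit": "days"}
        return defaults.get(suffix)
    if column in ("properties.billingPlan", "properties.utilization.trend"):
        return "N/A"
    return None


def fill_missing(records, expected_columns):
    """Give every record every expected column; return the sorted CSV header."""
    header = set(expected_columns)
    for record in records:
        header.update(record)
    for record in records:
        for column in expected_columns:
            record.setdefault(column, default_for(column))
    return sorted(header)


records = [{"id": "plan-1"}]
expected = [
    "properties.utilization.aggregates[0].value",
    "properties.utilization.aggregates[0].valueUnit",
    "properties.billingPlan",
    "properties.term",
]
header = fill_missing(records, expected)
```

Matching on the exact final segment matters here: a substring test for `value` would also match `valueUnit` and assign it the numeric default.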

### 6. **Azure Tags & Metadata**
- **Resource Group Tags**: Extracts tags from all resource groups across subscriptions
- **Subscription Tags**: Collects subscription-level tags
- Uses multi-threaded approach for efficient processing
- Implements retry logic for API resilience
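
The multi-threaded collection pattern can be sketched as follows. `collect_tags` and `fake_fetch_tags` are hypothetical names; the fake function stands in for the Resource Manager request so the example runs offline.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def collect_tags(subscription_ids, fetch_tags, max_workers=10):
    """Run fetch_tags for each subscription in parallel.

    Returns (results, errors): results maps subscription ID to its tags,
    errors maps subscription ID to the exception message.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_tags, sub_id): sub_id for sub_id in subscription_ids}
        for future in as_completed(futures):
            sub_id = futures[future]
            try:
                results[sub_id] = future.result()
            except Exception as exc:  # one failure must not stop the others
                errors[sub_id] = str(exc)
    return results, errors


def fake_fetch_tags(sub_id):
    # Hypothetical stand-in for the real API call
    if sub_id == "sub-bad":
        raise RuntimeError("simulated API failure")
    return {"costCenter": f"cc-{sub_id}"}


results, errors = collect_tags(["sub-1", "sub-2", "sub-bad"], fake_fetch_tags)
```

Collecting errors per subscription instead of raising lets the run finish for the healthy subscriptions and report the failed ones afterwards.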

### 7. **Microsoft 365 Licensing**
- **Tenant Licenses**: Queries Microsoft Graph API for M365 SKU data
  - Available, consumed, and remaining license counts
  - Stores in `StagingdimAzMSTenantLicences` table

- **User Licenses & Overlap Detection**:
  - Retrieves detailed user license assignments
  - Detects over-licensing scenarios (e.g., user with both E3 and E5)
  - Captures user metadata (name, UPN, department, company)
  - Stores in `StagingdimAzMSUserLicenses` table
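
A simple form of overlap detection is a set comparison per user. The rule table below is an assumption for illustration: it treats holding both `SPE_E3` and `SPE_E5` as over-licensing, and the user names are invented. The notebooks' actual rules may differ.

```python
# Hypothetical rule: holding every SKU in a group counts as over-licensing
OVERLAP_GROUPS = [
    {"SPE_E3", "SPE_E5"},
]


def find_overlaps(user_skus, overlap_groups=OVERLAP_GROUPS):
    """Return {user: [overlapping SKU lists]} for users holding a whole group."""
    overlaps = {}
    for user, skus in user_skus.items():
        held = set(skus)
        hits = [sorted(group) for group in overlap_groups if group <= held]
        if hits:
            overlaps[user] = hits
    return overlaps


assignments = {
    "anna@example.com": ["SPE_E3", "SPE_E5"],
    "ben@example.com": ["SPE_E3"],
}

overlapping_users = find_overlaps(assignments)
```

Keeping the rules in a list of sets makes it easy to add further combinations without touching the detection logic.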

## Data Storage

**Lakehouse**: lakehouse01

**Tables Created**:
- `StagingdimAzSubscriptions` - Subscription metadata
- `StagingdimAzMSTenantLicences` - Tenant-level license summary
- `StagingdimAzMSUserLicenses` - User-level license details with overlap detection

**File Paths**:
- `/Files/azure-Usages/actualcost/` - Cost consumption files
- `/Files/azure-Usages/amortizedcost/` - Amortized cost files
- `/Files/azure/reservations/` - Reservation details
- `/Files/azure/saving-plans/` - Saving plan details
- `/Files/azure/resource-import-tags/` - Resource group tags
- `/Files/azure/subscription-tags/` - Subscription tags

## Technical Details

**Runtime Environment**: Synapse PySpark

**Key Libraries**:
- `requests` - Azure API calls
- `pandas` - Data manipulation
- `pyspark.sql` - Distributed processing
- `mssparkutils` - Fabric integration
- `concurrent.futures` - Multi-threading

**Error Handling**:
- Rate limit (429) retry logic with configurable backoff
- Request timeout handling
- Missing/null value defaults
- Comprehensive logging and error reporting
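
The 429 handling can be sketched as a small retry loop. This is an illustration under assumptions, not the notebooks' implementation: `send` is a hypothetical callable returning `(status_code, headers, body)`, and the exponential fallback and 60-second cap are assumed parameter choices.

```python
import time


def wait_seconds(retry_after, attempt, backoff_factor=2, max_wait=60):
    """Seconds to wait before the next attempt, or None to give up."""
    if retry_after is not None and str(retry_after).isdigit():
        wait = int(retry_after)
    else:
        wait = backoff_factor ** attempt  # exponential fallback without a header
    return wait if wait <= max_wait else None


def call_with_retries(send, retries=3, sleep=time.sleep):
    """Call send() until it returns a non-429 status or the retries run out."""
    for attempt in range(retries):
        status, headers, body = send()
        if status != 429:
            return status, body
        wait = wait_seconds(headers.get("Retry-After"), attempt)
        if wait is None:
            break  # the server asked for a longer pause than we accept
        sleep(wait)
    raise RuntimeError("rate limit not cleared within the retry budget")


# Simulated server: one rate-limit response, then success
responses = iter([(429, {"Retry-After": "1"}, None), (200, {}, {"value": []})])
slept = []
status, body = call_with_retries(lambda: next(responses), sleep=slept.append)
```

Passing `sleep` as a parameter keeps the loop testable: the simulated run records the requested pause instead of actually waiting.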

## Execution Flow

1. Load workspace variables and credentials
2. Authenticate to Azure Management and Graph APIs
3. Retrieve subscriptions for the tenant
4. Extract consumption data (cost metrics)
5. Fetch reservations and saving plans
6. Collect tags at resource and subscription level
7. Query M365 licensing information
8. Detect license overlaps and over-licensing
9. Write all data to Delta tables and CSV files for downstream ETL


# Import Libraries


In [25]:
# Standard library
import os
import time
import json
import csv
import calendar
import hashlib
import logging
import threading
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

# Third-party libraries
import requests
import pandas as pd
from pandas.tseries.offsets import DateOffset
from msal import ConfidentialClientApplication

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, upper
from pyspark.sql.types import IntegerType


# Import Workspace Variables

In [26]:
cfg = notebookutils.variableLibrary.getLibrary("ws_variables")

key_vault_name = cfg.key_vault_name
kvtenantid_1 = cfg.kvtenantid_1
kvclientid_1 = cfg.kvclientid_1
kvclientkey_1 = cfg.kvclientkey_1
sku_mapping_path = cfg.sku_mapping_path


# Parameters

In [27]:
LakehousePathActualCost = "/lakehouse/default/Files/azure-Usages/actualcost"
LakehousePathAmortizedCost = "/lakehouse/default/Files/azure-Usages/amortizedcost"
ReservationPath = "/lakehouse/default/Files/azure/reservations"
SavingPlansPath = "/lakehouse/default/Files/azure/saving-plans"
LakehouseSubscriptionsPath = "lakehouse01.StagingdimAzSubscriptions"
ResourceTagsPath = "/lakehouse/default/Files/azure/resource-import-tags"
SubscriptionTagsPath = "/lakehouse/default/Files/azure/subscription-tags"


# Get Azure Key Vault Keys

In [28]:
app_client_id = mssparkutils.credentials.getSecret(f"https://{key_vault_name}.vault.azure.net/", kvclientid_1)

app_client_secret = mssparkutils.credentials.getSecret(f"https://{key_vault_name}.vault.azure.net/", kvclientkey_1)

microsoft_tenant_id = mssparkutils.credentials.getSecret(f"https://{key_vault_name}.vault.azure.net/", kvtenantid_1)



# Get Access Token

In [29]:
TOKEN_CACHE = {
    "access_token": None,
    "expires_at": 0
}

TOKEN_LOCK = threading.Lock()

def get_access_token():
    now = time.time()

    # Token still valid (5-minute buffer)
    if TOKEN_CACHE["access_token"] and now < TOKEN_CACHE["expires_at"] - 300:
        return TOKEN_CACHE["access_token"]

    with TOKEN_LOCK:
        if TOKEN_CACHE["access_token"] and now < TOKEN_CACHE["expires_at"] - 300:
            return TOKEN_CACHE["access_token"]

        uri = f"https://login.microsoftonline.com/{microsoft_tenant_id}/oauth2/token"

        data = {
            "client_id": app_client_id,
            "grant_type": "client_credentials",
            "client_secret": app_client_secret,
            "resource": "https://management.core.windows.net"
        }

        response = requests.post(uri, data=data)
        response.raise_for_status()

        token_response = response.json()
        TOKEN_CACHE["access_token"] = token_response["access_token"]
        TOKEN_CACHE["expires_at"] = now + int(token_response["expires_in"])

        return TOKEN_CACHE["access_token"]


def get_headers():
    return {
        "Authorization": f"Bearer {get_access_token()}",
        "Content-Type": "application/json"
    }



# Get Azure Subscriptions

In [None]:
# API URL
url = "https://management.azure.com/subscriptions?api-version=2022-12-01"

# Send the API request
response = requests.get(url, headers=get_headers())

# Check whether the request succeeded
if response.status_code == 200:
    data = response.json()

    if 'value' in data:
        df_pandas = pd.json_normalize(data['value'])
    else:
        df_pandas = pd.DataFrame(data)

    required_columns = [
        'id',
        'subscriptionId',
        'tenantId',
        'displayName',
        'state',
        'subscriptionPolicies.quotaId'
    ]
    df_pandas = df_pandas[required_columns]

    df_pandas.columns = ['value.' + col for col in df_pandas.columns]

    df_spark = spark.createDataFrame(df_pandas)

    print("Data loaded into PySpark DataFrame successfully.")

else:
    # Stop here: without df_spark the following Delta write cell cannot run
    raise RuntimeError(f"Error: {response.status_code}, {response.text}")


#display(df_spark)


# Write Delta Table StagingdimAzSubscriptions

In [31]:
# Path to the Delta table
delta_table_path_stagingdimazsubscriptions = "Tables/StagingdimAzSubscriptions"

# Write to the Delta table
df_spark.write.format("delta").mode("overwrite").save(delta_table_path_stagingdimazsubscriptions)

print("Data written to the Delta table successfully.")


Data written to the Delta table successfully.


# Get Azure Consumption

In [None]:
# -------------------------------------------------
# Helper: determine the time periods
# -------------------------------------------------
def get_month_periods(months_back=None):
    today = datetime.now()
    periods = []

    # ---------------------------------------------
    # OPTIONAL: the last X months
    # ---------------------------------------------
    if months_back is not None:
        for i in range(months_back):
            ref_date = today.replace(day=1) - DateOffset(months=i)
            start_date = ref_date.to_pydatetime()

            last_day = calendar.monthrange(start_date.year, start_date.month)[1]

            # Current month only up to today
            if i == 0:
                end_date = today
            else:
                end_date = start_date.replace(day=last_day)

            periods.append((start_date, end_date))

        return periods

    # ---------------------------------------------
    # DEFAULT: existing behavior
    # ---------------------------------------------
    start_current = today.replace(day=1)
    end_current = today
    periods.append((start_current, end_current))

    if 1 <= today.day <= 6:
        last_month_date = start_current - timedelta(days=1)
        start_last_month = last_month_date.replace(day=1)
        last_day_of_last_month = calendar.monthrange(
            start_last_month.year, start_last_month.month
        )[1]
        end_last_month = start_last_month.replace(day=last_day_of_last_month)
        periods.append((start_last_month, end_last_month))

    return periods


# -------------------------------------------------
# Report function
# -------------------------------------------------
def fetch_report_data(subscriptionid, TenantId, Vertragsart, metric_type, file_path, months_back=None):

    months = get_month_periods(months_back)

    # -------------------------------------------------
    # Pull the report for every period
    # -------------------------------------------------
    for start_date, end_date in months:
        start_str = start_date.strftime('%Y-%m-%d')
        end_str = end_date.strftime('%Y-%m-%d')
        ym_str = start_date.strftime("%Y-%m")

        body = {
            "metric": metric_type,
            "timePeriod": {
                "start": start_str,
                "end": end_str
            }
        }

        url = (
            f"https://management.azure.com/subscriptions/{subscriptionid}"
            f"/providers/Microsoft.CostManagement/generateCostDetailsReport"
            f"?api-version=2022-05-01"
        )

        # -------------------------------------------------
        # Storage path (always the full month)
        # -------------------------------------------------
        first_day_of_month = start_date.replace(day=1)
        last_day_num = calendar.monthrange(start_date.year, start_date.month)[1]
        last_day_of_month = start_date.replace(day=last_day_num)

        period_str = f"{first_day_of_month.strftime('%Y%m%d')}-{last_day_of_month.strftime('%Y%m%d')}"
        date_path = f"{file_path}/{period_str}"
        os.makedirs(date_path, exist_ok=True)

        filename = f"{TenantId}-{Vertragsart}-{subscriptionid}-{ym_str}.csv"
        path = f"{date_path}/{filename}"


        # =================================================
        # Skip the month if the file already exists
        # (ONLY when months_back is set)
        # =================================================
        if months_back is not None and os.path.exists(path):
            print(f"Skipped (already exists): {path}")
            continue

        # -------------------------------------------------
        # Step 1: create the report
        # -------------------------------------------------
        report_requested = False
        while True:
            response = requests.post(url, headers=get_headers(), json=body)

            if response.status_code == 204:
                print(f"No content: {subscriptionid} ({start_str} to {end_str})")
                break

            elif response.status_code == 202:
                report_requested = True
                break

            elif response.status_code == 429:
                retry_after = response.headers.get("Retry-After")
                wait_time = int(retry_after) if retry_after and retry_after.isdigit() else 30

                if wait_time > 60:
                    print(f"Rate limit - subscription skipped: {subscriptionid}")
                    return

                print(f"Rate limit - waiting {wait_time}s")
                time.sleep(wait_time)

            else:
                raise Exception(f"Error creating the report: {response.status_code}")

        # A 204 response carries no Location header, so there is nothing to poll
        if not report_requested:
            continue

        # -------------------------------------------------
        # Step 2: retrieve and save the report
        # -------------------------------------------------
        max_attempts = 12
        attempts = 0
        while attempts < max_attempts:
            check_response = requests.get(response.headers["location"], headers=get_headers())

            if check_response.status_code == 200:
                response_dict = check_response.json()

                if 'manifest' in response_dict and 'blobs' in response_dict['manifest']:
                    blob_link = response_dict['manifest']['blobs'][0]['blobLink']
                    csv_df = pd.read_csv(blob_link, low_memory=False)

                    if not csv_df.empty:
                        csv_df.to_csv(path, index=False)
                        print(f"Saved: {path}")
                    else:
                        print(f"No data: {subscriptionid} ({start_str} to {end_str})")
                else:
                    raise Exception("No blob link in the response")

                break

            elif check_response.status_code == 204:
                print(f"No content to retrieve: {subscriptionid}")
                break

            elif check_response.status_code == 429:
                retry_after = check_response.headers.get("Retry-After")
                wait_time = int(retry_after) if retry_after and retry_after.isdigit() else 30

                if wait_time > 60:
                    print(f"Rate limit - skipping {subscriptionid}")
                    return

                time.sleep(wait_time)
                attempts += 1

            elif check_response.status_code == 202:
                attempts += 1
                time.sleep(30)

            else:
                raise Exception(f"Error retrieving the report: {check_response.status_code}")

        # Report when polling ran out of attempts (the loop exits at max_attempts)
        if attempts >= max_attempts:
            print(f"Maximum attempts reached: {subscriptionid}")


# -------------------------------------------------
# Runner
# -------------------------------------------------
def run_script(metric_type, file_path, months_back=None):
    spark = SparkSession.builder.appName("CostReport").getOrCreate()

    SubscriptionId_df = spark.sql(
        f"""
        SELECT `value.subscriptionId` as SubscriptionId,
               `value.tenantId` as CustomerTenantId,
               `value.subscriptionPolicies.quotaId` as Vertragsart
        FROM {LakehouseSubscriptionsPath}
        WHERE `value.tenantId` = '{microsoft_tenant_id}'
          AND (
                `value.subscriptionPolicies.quotaId` LIKE 'CSP%'
             OR `value.subscriptionPolicies.quotaId` LIKE 'Pay%'
             OR `value.subscriptionPolicies.quotaId` LIKE 'Enterprise%'
          )
        """
    )

    SubscriptionId_list = [
        [row["SubscriptionId"], row["CustomerTenantId"], row["Vertragsart"]]
        for row in SubscriptionId_df.collect()
    ]

    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_subscription = {
            executor.submit(
                fetch_report_data,
                subscriptionid,
                TenantId,
                Vertragsart,
                metric_type,
                file_path,
                months_back
            ): subscriptionid
            for subscriptionid, TenantId, Vertragsart in SubscriptionId_list
        }

        for future in as_completed(future_to_subscription):
            try:
                future.result()
            except Exception as exc:
                print(f"Error for {future_to_subscription[future]}: {exc}")


# -------------------------------------------------
# Main calls
# -------------------------------------------------

# Default (existing behavior)
run_script("ActualCost", LakehousePathActualCost)
run_script("AmortizedCost", LakehousePathAmortizedCost)

# Optional (e.g. the last 12 months)
#run_script("ActualCost", LakehousePathActualCost, months_back=12)
#run_script("AmortizedCost", LakehousePathAmortizedCost, months_back=12)


# Get Azure Reservations

In [33]:
# API version for Azure Resource Manager
api_version = '2022-11-01'  # Being tried as an alternative to '2021-10-01'

# Request headers come from get_headers(), which refreshes the bearer token as needed

# Target path for the CSV file
destination_path = ReservationPath
csv_filename = os.path.join(destination_path, f'Reservations-{microsoft_tenant_id}.csv')

# Create the target directory if it does not exist yet
os.makedirs(destination_path, exist_ok=True)
os.chdir(destination_path)
print(os.getcwd())

# URL for Azure Reservations
reservations_url = f'https://management.azure.com/providers/Microsoft.Capacity/reservations?api-version={api_version}'

# Flatten nested JSON objects into a single-level dict
def flatten_json(nested_json, parent_key='', sep='.'):
    flattened = {}
    for key, value in nested_json.items():
        new_key = f'{parent_key}{sep}{key}' if parent_key else key
        if isinstance(value, dict):
            flattened.update(flatten_json(value, new_key, sep=sep))
        elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    flattened.update(flatten_json(item, f'{new_key}[{i}]', sep=sep))
                else:
                    flattened[f'{new_key}[{i}]'] = item
        else:
            flattened[new_key] = value
    return flattened

# Fetch all pages by following the pagination links
def fetch_all_reservations_with_pagination(initial_url):
    all_data = []
    current_url = initial_url
    while current_url:
        response = requests.get(current_url, headers=get_headers())
        if response.status_code == 200:
            result = response.json()
            if 'value' in result:
                all_data.extend(result['value'])
            current_url = result.get('nextLink')  # Follow the next link, if present
        else:
            print(f"Error: {response.status_code}, message: {response.text}")
            current_url = None  # Stop on error
    return all_data

# Fetch the reservations and write them straight to a CSV file
try:
    reservations = fetch_all_reservations_with_pagination(reservations_url)
    
    print("Status code:", 200)  # Hard-coded: the helper only collects data from 200 responses
    if reservations:
        print("Reservations retrieved successfully.")
        
        # Extract the data and write it straight to a CSV file
        data = reservations
        
        # Check whether there are any reservations
        if data:
            # Prepare the flattened list
            flattened_data = [flatten_json(item) for item in data]
            
            # Add the Filename column
            for item in flattened_data:
                item['Filename'] = os.path.basename(csv_filename)  # File name only, without the path
            
            # Create the CSV file and write the data
            with open(csv_filename, mode='w', newline='', encoding='utf-8') as csv_file:
                # CSV header: all unique keys from the flattened data
                fieldnames = set()
                for item in flattened_data:
                    fieldnames.update(item.keys())
                
                # List of columns to check and create if not exist
                cols_to_check = [
                    'properties.utilization.aggregates[0].value', 
                    'properties.utilization.aggregates[1].value', 
                    'properties.utilization.aggregates[2].value', 
                    'properties.utilization.aggregates[0].valueUnit',
                    'properties.utilization.aggregates[1].valueUnit', 
                    'properties.utilization.aggregates[2].valueUnit', 
                    'properties.utilization.aggregates[0].grain', 
                    'properties.utilization.aggregates[1].grain',
                    'properties.utilization.aggregates[2].grain', 
                    'properties.utilization.aggregates[0].grainUnit', 
                    'properties.utilization.aggregates[1].grainUnit', 
                    'properties.utilization.aggregates[2].grainUnit',
                    'properties.billingPlan',
                    'properties.utilization.trend',
                    'properties.appliedScopeProperties.subscriptionId',
                    'properties.appliedScopeProperties.resourceGroupId',
                    'properties.appliedScopeProperties.TenantId',
                    'properties.appliedScopeProperties.ManagementGroupId',
                    'properties.appliedScopeProperties.displayName'
                ]

                # Create columns with default values if they don't exist
                for col_name in cols_to_check:
                    if col_name not in fieldnames:
                        fieldnames.add(col_name)

                        # Choose the default value from the column name's suffix
                        if col_name.endswith('.value'):
                            default_value = 0
                        elif col_name.endswith('.valueUnit'):
                            default_value = 'percentage'
                        elif col_name.endswith('.grain'):
                            default_value = '0'
                        elif col_name.endswith('.grainUnit'):
                            default_value = 'days'
                        elif col_name in ('properties.billingPlan', 'properties.utilization.trend'):
                            default_value = 'N/A'
                        else:
                            default_value = None

                        # The column is absent from every record, so set the default on all of them
                        for item in flattened_data:
                            item[col_name] = default_value

                # Write the CSV
                writer = csv.DictWriter(csv_file, fieldnames=sorted(fieldnames))  # Use the keys as the header
                writer.writeheader()
                writer.writerows(flattened_data)
            
            print(f"CSV file created successfully at {csv_filename}")
        else:
            print("No reservation data available to write to the CSV file.")
    else:
        print("No reservations found.")
except Exception as e:
    # Report the failure, then re-raise so the notebook run is marked as failed
    print(f"An error occurred: {e}")
    raise

/lakehouse/default/Files/azure/reservations
Status code: 200
Reservations retrieved successfully.
CSV file created successfully at /lakehouse/default/Files/azure/reservations/Reservations-[REDACTED].csv


# Get Azure Savings Plans

In [34]:
# API version for Azure Resource Manager
api_version = '2022-11-01'  # Being tried as an alternative to '2021-10-01'

# Request headers come from get_headers(), which refreshes the bearer token as needed

# Target path for the CSV file
destination_path = SavingPlansPath
csv_filename = os.path.join(destination_path, f'Saving-Plan-{microsoft_tenant_id}.csv')

# Create the target directory if it does not exist yet
os.makedirs(destination_path, exist_ok=True)

# URL for Azure savings plans
savingplan_url = f'https://management.azure.com/providers/Microsoft.BillingBenefits/savingsPlans?api-version={api_version}'

# Flatten nested JSON objects into a single-level dict
def flatten_json(nested_json, parent_key='', sep='.'):
    flattened = {}
    for key, value in nested_json.items():
        new_key = f'{parent_key}{sep}{key}' if parent_key else key
        if isinstance(value, dict):
            flattened.update(flatten_json(value, new_key, sep=sep))
        elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    flattened.update(flatten_json(item, f'{new_key}[{i}]', sep=sep))
                else:
                    flattened[f'{new_key}[{i}]'] = item
        else:
            flattened[new_key] = value
    return flattened

# Fetch the savings plans and write them straight to a CSV file
try:
    savingplans_response = requests.get(savingplan_url, headers=get_headers())
    
    print("Status code:", savingplans_response.status_code)  # Print the status code
    if savingplans_response.status_code == 200:
        savingplan = savingplans_response.json()  # Keep the whole response as a JSON object
        print("Savings plans retrieved successfully.")
        
        # Extract the data and write it straight to a CSV file
        if 'value' in savingplan:  # Assumption: the savings plans are in the "value" field
            data = savingplan['value']
            
            # Check whether there are any savings plans
            if data:
                # Prepare the flattened list
                flattened_data = [flatten_json(item) for item in data]
                
                # Add the Filename column
                for item in flattened_data:
                    item['Filename'] = os.path.basename(csv_filename)  # File name only, without the path
                
                # CSV header: all unique keys from the flattened data
                fieldnames = set()
                for item in flattened_data:
                    fieldnames.update(item.keys())
                
                # List of columns to check and create if not exist
                cols_to_check = [
                    'Filename',
                    'id',
                    'name',
                    'properties.appliedScopeProperties.displayName',
                    'properties.appliedScopeProperties.subscriptionId',
                    'properties.appliedScopeProperties.resourceGroupId',
                    'properties.appliedScopeProperties.TenantId',
                    'properties.appliedScopeProperties.ManagementGroupId',
                    'properties.appliedScopeType',
                    'properties.benefitStartTime',
                    'properties.billingAccountId',
                    'properties.billingPlan',
                    'properties.billingProfileId',
                    'properties.billingScopeId',
                    'properties.commitment.amount',
                    'properties.commitment.currencyCode',
                    'properties.commitment.grain',
                    'properties.customerId',
                    'properties.displayName',
                    'properties.displayProvisioningState',
                    'properties.effectiveDateTime',
                    'properties.expiryDateTime',
                    'properties.provisioningState',
                    'properties.purchaseDateTime',
                    'properties.renew',
                    'properties.term',
                    'properties.userFriendlyAppliedScopeType',
                    'properties.utilization.aggregates[0].grain',
                    'properties.utilization.aggregates[0].grainUnit',
                    'properties.utilization.aggregates[0].value',
                    'properties.utilization.aggregates[0].valueUnit',
                    'properties.utilization.aggregates[1].grain',
                    'properties.utilization.aggregates[1].grainUnit',
                    'properties.utilization.aggregates[1].value',
                    'properties.utilization.aggregates[1].valueUnit',
                    'properties.utilization.aggregates[2].grain',
                    'properties.utilization.aggregates[2].grainUnit',
                    'properties.utilization.aggregates[2].value',
                    'properties.utilization.aggregates[2].valueUnit',
                    'properties.utilization.trend',
                    'sku.name',
                    'type'
                ]

                # Create columns with default values if they don't exist
                for col_name in cols_to_check:
                    if col_name not in fieldnames:
                        fieldnames.add(col_name)
                        # Set default values based on the column name
                       
                        if col_name.startswith('properties.utilization.aggregates'):
                            if 'value' in col_name:
                                default_value = 0
                            elif 'valueUnit' in col_name:
                                default_value = 'percentage'
                            elif 'grain' in col_name:
                                default_value = '0'
                            elif 'grainUnit' in col_name:
                                default_value = 'days'
                        elif col_name == 'properties.billingPlan' or col_name == 'properties.utilization.trend':
                            default_value = 'N/A'
                        else:
                            default_value = None
                        
                        # Update each record with the default value only if the value is NULL or empty
                        for item in flattened_data:
                            item[col_name] = default_value
                
                # Print the schema
                print("Schema of the CSV file (column names):")
                for index, fieldname in enumerate(sorted(fieldnames), start=1):
                    print(f"{index}. {fieldname}")
                
                # Create the CSV file and write the data
                with open(csv_filename, mode='w', newline='', encoding='utf-8') as csv_file:
                    writer = csv.DictWriter(csv_file, fieldnames=sorted(fieldnames))  # Use the keys as the header
                    writer.writeheader()
                    writer.writerows(flattened_data)
                
                print(f"CSV file successfully created at {csv_filename}.")
            else:
                print("No reservation data available to write to the CSV file.")
        else:
            print("The JSON response does not contain a 'value' field.")
    
    elif savingplans_response.status_code == 403:
        print("No savings plans available or access denied.")
    
    else:
        print(f"Error retrieving the savings plans: {savingplans_response.status_code}")
        print("Error message:", savingplans_response.text)  # Show the error message

except Exception as e:
    print(f"An error occurred: {e}")

StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 36, Finished, Available, Finished)

Status code: 200
Savings plan retrieved successfully.
No reservation data available to write to the CSV file.
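
The default-filling step in the cell above can be isolated into a small helper. The following is a minimal sketch rather than the notebook's own code: the function names `default_for_column` and `fill_missing_columns` are hypothetical, the sample record is invented for illustration, and the default values are the ones used in the cell. It matches on the column suffix so that `valueUnit` is not mistaken for `value`.

```python
def default_for_column(col_name):
    """Return the fallback value for a missing column (defaults taken from the cell above)."""
    if col_name.startswith("properties.utilization.aggregates"):
        # Check the longer suffixes first so '.valueUnit' is not treated as '.value'.
        if col_name.endswith(".valueUnit"):
            return "percentage"
        if col_name.endswith(".value"):
            return 0
        if col_name.endswith(".grainUnit"):
            return "days"
        if col_name.endswith(".grain"):
            return "0"
    if col_name in ("properties.billingPlan", "properties.utilization.trend"):
        return "N/A"
    return None


def fill_missing_columns(records, columns):
    """Set the default for every column that is absent, None, or empty in a record."""
    for record in records:
        for col in columns:
            if record.get(col) in (None, ""):
                record[col] = default_for_column(col)
    return records


# Hypothetical flattened record, for illustration only.
rows = [{"sku.name": "Compute_Savings_Plan"}]
fill_missing_columns(rows, [
    "properties.utilization.aggregates[2].value",
    "properties.utilization.aggregates[2].valueUnit",
    "properties.utilization.trend",
])
print(rows[0])
```

Keeping the rule in one function makes it straightforward to check each suffix on its own before the data reaches `csv.DictWriter`.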


# Get Azure Resource Tags and ResourceGroup Tags

In [35]:
# API version for Azure Resource Manager
api_version = '2021-04-01'

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Headers with bearer token (insert token here)
#headers = {
    #'Authorization': f'Bearer {token}',
    #'Content-Type': 'application/json'
#}

def perform_request_with_retries(url, method="GET", retries=3, backoff_factor=2, max_retry_after=60, **kwargs):
    """Performs an HTTP request and retries automatically on rate-limit errors (429)."""
    for attempt in range(retries):
        try:
            response = requests.request(method, url, **kwargs)
            if response.status_code == 429:
                # Retry-After may be missing or an HTTP date; fall back to backoff_factor in that case
                try:
                    retry_after = int(response.headers.get("Retry-After", backoff_factor))
                except ValueError:
                    retry_after = backoff_factor
                retry_after = min(retry_after, max_retry_after)  # Wait at most max_retry_after seconds
                logger.warning(f"Rate limit reached. Retrying in {retry_after} seconds (attempt {attempt + 1} of {retries})...")
                time.sleep(retry_after)
            else:
                response.raise_for_status()
                return response
        except requests.RequestException as e:
            logger.error(f"Error requesting {url}: {e}")
            if attempt < retries - 1:
                sleep_time = backoff_factor * (2 ** attempt)
                logger.info(f"Waiting {sleep_time} seconds before retrying...")
                time.sleep(sleep_time)
            else:
                logger.error("Maximum number of retries reached.")
                raise

    # Only reached when every attempt was rate limited (429)
    logger.error(f"Still rate limited after {retries} attempts: {url}")
    return None

def get_resource_groups(subscription_id):
    """Retrieves all resource groups of a subscription."""
    resource_groups_url = f'https://management.azure.com/subscriptions/{subscription_id}/resourcegroups?api-version={api_version}'
    response = perform_request_with_retries(resource_groups_url, headers=get_headers(), timeout=10) # NEW get_headers() #
    if response:
        return response.json().get('value', [])
    return []

def get_rg_tags(subscription_id, resource_group_name):
    """Retrieves the tags of a resource group."""
    rg_tags_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourcegroups/{resource_group_name}?api-version={api_version}"
    response = perform_request_with_retries(rg_tags_url, headers=get_headers(), timeout=10) # NEW get_headers() #
    if response:
        return response.json().get('tags', {})
    return {}

def write_to_csv(subscription_id, csv_data):
    """Writes the CSV rows to a file."""
    destination_path = "/lakehouse/default/Files/azure/resource-import-tags"
    os.makedirs(destination_path, exist_ok=True)
    csv_filename = os.path.join(destination_path, f'ResourceTags-{subscription_id}.csv')

    if csv_data:
        try:
            with open(csv_filename, mode='w', newline='', encoding='utf-8') as file:
                writer = csv.DictWriter(file, fieldnames=['ResourceGroup', 'ResourceId', 'ResourceName', 'TagKey', 'TagValue'], delimiter=';')
                writer.writeheader()
                writer.writerows(csv_data)
            logger.info(f"CSV file successfully created at {csv_filename}.")
        except Exception as e:
            logger.error(f"Error writing the CSV file for subscription {subscription_id}: {e}")
    else:
        logger.info(f"No data found to write to the CSV file for subscription {subscription_id}.")

def process_subscription(subscription):
    """Collects the tags of all resource groups in a subscription and writes them to CSV."""
    subscription_id = subscription['subscriptionId']
    logger.info(f"Processing Subscription ID: {subscription_id}, Name: {subscription['displayName']}")

    csv_data = []
    resource_groups = get_resource_groups(subscription_id)

    if not resource_groups:
        logger.info(f"No resource groups found for subscription {subscription_id}.")
        return

    for resource_group in resource_groups:
        logger.info(f"Resource Group: {resource_group['name']}")

        rg_tags = get_rg_tags(subscription_id, resource_group['name'])

        if not rg_tags:
            logger.info(f"No tags found for resource group {resource_group['name']}.")
        else:
            for key, value in rg_tags.items():
                csv_data.append({
                    'ResourceGroup': resource_group['name'],
                    'ResourceId': resource_group['id'],
                    'ResourceName': 'KEINE RESSOURCE',  # Placeholder value ("no resource"): the tag belongs to the group itself
                    'TagKey': key,
                    'TagValue': value
                })
                logger.info(f"Tag added for resource group: {key} = {value}")

    write_to_csv(subscription_id, csv_data)

def get_subscriptions():
    """Retrieves all subscriptions."""
    subscriptions_url = f'https://management.azure.com/subscriptions?api-version={api_version}'
    response = perform_request_with_retries(subscriptions_url, headers=get_headers(), timeout=10) # NEW get_headers() #
    if response:
        return response.json().get('value', [])
    return []

def main():
    subscriptions = get_subscriptions()

    if not subscriptions:
        logger.info("No subscriptions found.")
        return

    # Process up to five subscriptions concurrently
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(process_subscription, sub): sub for sub in subscriptions}

        for future in as_completed(futures):
            subscription = futures[future]
            try:
                future.result()
            except Exception as e:
                logger.error(f"Error while processing subscription {subscription['subscriptionId']}: {e}")

if __name__ == "__main__":
    main()


StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 37, Finished, Available, Finished)

2026-02-02 14:44:00,371 - INFO - Processing Subscription ID: c7b39f3a-054f-456d-a883-83b7a00ad1cd, Name: Recucare Microsoft Azure
2026-02-02 14:44:00,384 - INFO - Processing Subscription ID: f1359d62-957b-43b5-b106-5d874b4e91be, Name: MCMS RECU - 001
2026-02-02 14:44:00,607 - INFO - Resource Group: rg-BusinessCentralIntegration
2026-02-02 14:44:00,750 - INFO - Resource Group: rg-recu-vdworkspace-p1-ts1-mde01
2026-02-02 14:44:01,191 - INFO - No tags found for resource group rg-recu-vdworkspace-p1-ts1-mde01.
2026-02-02 14:44:01,192 - INFO - Resource Group: rg-recu-vdbackup01-mde01
2026-02-02 14:44:01,845 - INFO - Tag added for resource group: responsible = vd
2026-02-02 14:44:01,848 - INFO - Resource Group: rg-customer-foundry-mde01
2026-02-02 14:44:01,919 - INFO - No tags found for resource group rg-BusinessCentralIntegration.
2026-02-02 14:44:01,928 - INFO - Resource Group: VisualStudioOnline-EEE1CD28FEAF4380AB6AE5FAB3725DE9
2026-02-02 14:44:02,238 - INFO - No 
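
The waiting logic inside `perform_request_with_retries` can be looked at separately from the HTTP call. The sketch below is an illustration under assumptions, not part of the notebook: the helper name `next_delay` is hypothetical, and it assumes that a `Retry-After` value which is not a plain number of seconds (for example an HTTP date) should fall back to the base delay.

```python
def next_delay(status_code, retry_after_header, attempt, backoff_factor=2, max_retry_after=60):
    """Return the number of seconds to wait before the next attempt.

    attempt is zero-based. For 429 responses the Retry-After header is used when
    it is a plain number of seconds; all other failures back off exponentially.
    """
    if status_code == 429:
        try:
            delay = int(retry_after_header)
        except (TypeError, ValueError):
            # Header missing or given as an HTTP date: fall back to the base delay.
            delay = backoff_factor
        return min(delay, max_retry_after)
    return backoff_factor * (2 ** attempt)


for attempt in range(3):
    print(attempt, next_delay(500, None, attempt))  # waits of 2, 4 and 8 seconds
print(next_delay(429, "120", 0))                     # capped at max_retry_after (60)
```

Separating the delay calculation from the request makes the back-off behaviour testable without sleeping or calling Azure.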

# Get Azure Subscription Tags

In [36]:
# API version for Azure Resource Manager
api_version = '2021-04-01'

# Headers with bearer token
#headers = {
    #'Authorization': f'Bearer {token}',
    #'Content-Type': 'application/json'
#}

# Destination path for the CSV file (e.g. a lakehouse path)
destination_path = "/lakehouse/default/Files/azure/subscription-tags"
csv_filename = os.path.join(destination_path, f'SubscriptionTags-{microsoft_tenant_id}.csv')

# Create the path if it does not exist yet
os.makedirs(destination_path, exist_ok=True)

# Retrieve all subscriptions
subscriptions_url = f'https://management.azure.com/subscriptions?api-version={api_version}'
subscriptions_response = requests.get(subscriptions_url, headers=get_headers()) # NEW get_headers() #

# Collect the CSV rows for all subscriptions
csv_data = []

if subscriptions_response.status_code == 200:
    subscriptions = subscriptions_response.json().get('value', [])
    
    if subscriptions:
        for subscription in subscriptions:
            subscription_id = subscription['subscriptionId']

            # Retrieve the tags of the subscription
            tags_url = f"https://management.azure.com/subscriptions/{subscription_id}/providers/Microsoft.Resources/tags/default?api-version={api_version}"
            tags_response = requests.get(tags_url, headers=get_headers()) # NEW get_headers() #
            
            if tags_response.status_code == 200:
                # 'properties' or 'tags' may be missing or null when nothing is tagged
                tags = (tags_response.json().get('properties') or {}).get('tags') or {}

                # Add the tags to the CSV data list
                for key, value in tags.items():
                    csv_data.append({
                        'Id': subscription_id,
                        'TagKey': key,
                        'Value': value
                    })

# Check whether there is any data to write
if csv_data:
    # Create the CSV file directly in the destination path
    with open(csv_filename, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=['Id', 'TagKey', 'Value'], delimiter=',')
        writer.writeheader()
        writer.writerows(csv_data)
    print(f"CSV file successfully created at {csv_filename}.")
else:
    print("No tags found, CSV file was not created.")


StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 38, Finished, Available, Finished)

CSV file successfully created at /lakehouse/default/Files/azure/subscription-tags/SubscriptionTags-[REDACTED].csv.
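
Turning a tags response into CSV rows is the core of the cell above, and it can be sketched as a pure function. This is a hedged illustration: `tag_rows` is a hypothetical name, the sample payload is invented, and the sketch assumes the response body carries the tags under `properties.tags`, as the cell's own lookup does.

```python
def tag_rows(subscription_id, tags_payload):
    """Convert a parsed tags response into rows with the columns Id, TagKey, Value."""
    # 'properties' or 'tags' can be missing or null when nothing is tagged.
    tags = (tags_payload.get("properties") or {}).get("tags") or {}
    return [{"Id": subscription_id, "TagKey": key, "Value": value} for key, value in tags.items()]


# Invented payload and subscription ID, for illustration only.
sample_payload = {"properties": {"tags": {"environment": "production"}}}
print(tag_rows("00000000-0000-0000-0000-000000000000", sample_payload))
print(tag_rows("00000000-0000-0000-0000-000000000000", {}))  # no tags yields an empty list
```

Chaining `.get` with `or {}` avoids a `KeyError` when a subscription has no tags object at all.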


# Get Access Token for Graph API

In [37]:
# App ID, client secret, and tenant ID from Key Vault or the environment
# app_client_id = mssparkutils.credentials.getSecret(f"https://{key_vault_name}.vault.azure.net/", kvclientid_1)
# app_client_secret = mssparkutils.credentials.getSecret(f"https://{key_vault_name}.vault.azure.net/", kvclientkey_1)
# microsoft_tenant_id = mssparkutils.credentials.getSecret(f"https://{key_vault_name}.vault.azure.net/", kvtenantid_1)

# URL for requesting the bearer token for the Microsoft Graph API
uri = f"https://login.microsoftonline.com/{microsoft_tenant_id}/oauth2/v2.0/token"

# Parameters required for the token request
data = {
    'client_id': app_client_id,
    'client_secret': app_client_secret,
    'grant_type': 'client_credentials',
    'scope': 'https://graph.microsoft.com/.default'  # Scope for the Microsoft Graph API
}

# API call to request the bearer token
response = requests.post(uri, data=data)
response = response.json()

# Set variables from the API response: access_token and token_type
ms_token_type = response.get('token_type', None)
token = response.get('access_token', None)

if token:
    print("Token retrieved successfully!")
else:
    print("Error retrieving the token:", response)


StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 39, Finished, Available, Finished)

Token retrieved successfully!
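
The cell above prints the raw response when no token comes back and then continues. One possible alternative, sketched here under assumptions, is to stop early and build the request headers in one place. The helper name `build_auth_headers` is hypothetical; the `error` and `error_description` keys are the standard OAuth 2.0 error fields, and the token value in the example is a placeholder.

```python
def build_auth_headers(token_response):
    """Build request headers from a parsed token endpoint response (a dict)."""
    access_token = token_response.get("access_token")
    if not access_token:
        # Error responses carry 'error' / 'error_description' instead of a token.
        reason = (
            token_response.get("error_description")
            or token_response.get("error")
            or "unknown error"
        )
        raise RuntimeError(f"Token request failed: {reason}")
    token_type = token_response.get("token_type", "Bearer")
    return {
        "Authorization": f"{token_type} {access_token}",
        "Content-Type": "application/json",
    }


# Placeholder token, for illustration only.
headers = build_auth_headers({"token_type": "Bearer", "access_token": "placeholder-token"})
print(headers["Authorization"])
```

Raising on a failed token request keeps later cells from calling Graph with `Bearer None`.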


# Query Microsoft Licenses

In [38]:
# Read the CSV mapping
#mapping_path = "abfss://2044bd38-1f90-4cd0-b1b1-5f7a7d26e734@onelake.dfs.fabric.microsoft.com/44b967f6-ac81-40a7-bc53-128091aba32b/Files/m365/product-mapping/sku_mapping.csv"
mapping_path = sku_mapping_path
mapping_df = pd.read_csv(mapping_path)
sku_mapping = dict(zip(mapping_df['skuPartNumber'], mapping_df['DisplayName']))

# Authentication and setup
headers = {
    'Authorization': f"Bearer {token}",
    'Content-Type': 'application/json'
}

# Retrieve the licenses
sku_url = 'https://graph.microsoft.com/v1.0/subscribedSkus'

# v1.0 API
response = requests.get(sku_url, headers=headers)
skus = response.json().get('value', [])

#display(skus)

# Extract the data and add display names
license_data = []
for sku in skus:
    sku_part_number = sku.get('skuPartNumber', '')
    product_name = sku_mapping.get(sku_part_number, sku_part_number)  # Mapping with fallback to the SKU part number
    enabled_units = sku.get('prepaidUnits', {}).get('enabled', 0)
    consumed_units = sku.get('consumedUnits', 0)
    license_data.append({
        #'SkuId': sku.get('skuId', ''),
        'ProductName': product_name,
        'AvailableLicenses': enabled_units,
        'UsedLicences': consumed_units,
        'StillAvailableLicenses': enabled_units - consumed_units,
        'Tenant_Id': microsoft_tenant_id  # Add the tenant ID to every entry
    })

# Create the DataFrame
df = pd.DataFrame(license_data)

# Convert the pandas DataFrame to a PySpark DataFrame
df_sparkms_tenant_licenses = spark.createDataFrame(df)

#display(df_sparkms_tenant_licenses)


StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 40, Finished, Available, Finished)

2026-02-02 14:44:19,858 - INFO - _get_kwargs_from_urls: out:{'account_name': 'onelake', 'account_host': 'onelake.blob.fabric.microsoft.com'}
2026-02-02 14:44:19,863 - INFO - _get_kwargs_from_urls: out:{'account_name': 'onelake', 'account_host': 'onelake.blob.fabric.microsoft.com'}
2026-02-02 14:44:19,871 - INFO - Request URL: 'https://onelake.blob.fabric.microsoft.com/2044bd38-1f90-4cd0-b1b1-5f7a7d26e734/44b967f6-ac81-40a7-bc53-128091aba32b/Files/m365/product-mapping/sku_mapping.csv'
Request method: 'HEAD'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'a821865e-0045-11f1-a18a-6045bd170581'
    'User-Agent': 'azsdk-python-storage-blob/12.19.0 Python/3.11.8 (Linux-5.15.186.1-1.cm2-x86_64-with-glibc2.35)'
    'Authorization': 'REDACTED'
No body was attached to the request
2026-02-02 14:44:19,934 - INFO - Response status: 200
Response headers:
    'Content-Length': '4379'
    'Content-Type': 'text/p
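
The per-SKU arithmetic in the license cell is simple enough to show on its own. The following sketch is illustrative: `license_row` is a hypothetical helper, the sample SKU entry and tenant placeholder are invented, and only the field names (`skuPartNumber`, `prepaidUnits.enabled`, `consumedUnits`) and column names come from the cell.

```python
def license_row(sku, sku_mapping, tenant_id):
    """Build one output row for a subscribedSkus entry."""
    part_number = sku.get("skuPartNumber", "")
    enabled = (sku.get("prepaidUnits") or {}).get("enabled", 0)
    consumed = sku.get("consumedUnits", 0)
    return {
        # Fall back to the raw part number when the mapping has no display name.
        "ProductName": sku_mapping.get(part_number, part_number),
        "AvailableLicenses": enabled,
        "UsedLicences": consumed,
        "StillAvailableLicenses": enabled - consumed,
        "Tenant_Id": tenant_id,
    }


# Invented sample entry, for illustration only.
sample_sku = {"skuPartNumber": "POWER_BI_PRO", "prepaidUnits": {"enabled": 25}, "consumedUnits": 18}
row = license_row(sample_sku, {"POWER_BI_PRO": "Power BI Pro"}, "tenant-placeholder")
print(row["ProductName"], row["StillAvailableLicenses"])  # Power BI Pro 7
```

A negative `StillAvailableLicenses` value would indicate more assigned than purchased units, which may be worth flagging in downstream reports.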

# Create Staging Table Microsoft Tenant Licenses

In [39]:
# Path to the Delta table
delta_table_path_stagingdimazmstenantlicenses = "Tables/StagingdimAzMSTenantLicences"

# Write to the Delta table
df_sparkms_tenant_licenses.write.format("delta").mode("overwrite").save(delta_table_path_stagingdimazmstenantlicenses)

print("Data successfully written to the Delta table.")

StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 41, Finished, Available, Finished)

Data successfully written to the Delta table.


In [40]:
# === Embedded license mapping ===
mapping_json = {
    "ENTERPRISEPACK": {
        "name": "Microsoft 365 E3",
        "includes": ["Office_Apps", "Exchange_Online", "SharePoint_Online", "Teams",
                     "Intune", "Azure_AD_P1", "PowerApps_Standard", "PowerAutomate_Standard"]
    },
    "E5": {
        "name": "Microsoft 365 E5",
        "includes": ["Office_Apps", "Exchange_Online", "SharePoint_Online", "Teams",
                     "Intune", "Azure_AD_P2", "Defender", "PowerApps_Standard",
                     "PowerAutomate_Standard", "POWER_BI_PRO"]
    },
    "BUSINESS_PREMIUM": {
        "name": "Microsoft 365 Business Premium",
        "includes": ["Office_Apps", "Exchange_Online", "SharePoint_Online", "Teams",
                     "PowerApps_Standard", "PowerAutomate_Standard"]
    },
    "POWER_BI_PRO": {
        "name": "Power BI Pro",
        "includes": ["POWER_BI_PRO"]
    },
    "PBI_PREMIUM_PER_USER": {
        "name": "Power BI Premium per User",
        "includes": ["POWER_BI_PRO", "PowerBI_Premium"]
    },
    "POWERAPPS_PER_USER": {
        "name": "Power Apps per User",
        "includes": ["PowerApps_Premium"]
    },
    "O365_BUSINESS_PREMIUM": {
        "name": "Office 365 Business Premium",
        "includes": ["Office_Apps", "Exchange_Online", "SharePoint_Online", "Teams",
                     "PowerApps_Standard", "PowerAutomate_Standard"]
    }
}

# === Load the SKU mapping CSV ===
mapping_path = (
    "abfss://2044bd38-1f90-4cd0-b1b1-5f7a7d26e734@onelake.dfs.fabric.microsoft.com/"
    "44b967f6-ac81-40a7-bc53-128091aba32b/Files/m365/product-mapping/sku_mapping.csv"
)
mapping_df = pd.read_csv(mapping_path)
sku_mapping = dict(zip(mapping_df["skuPartNumber"], mapping_df["DisplayName"]))

# === Prepare the auth headers ===
session = requests.Session()
session.headers.update({
    'Authorization': f"Bearer {token}",
    'Content-Type': 'application/json'
})

# === Query users (load all pages) ===
users = []
users_url = 'https://graph.microsoft.com/v1.0/users?$select=id,displayName,userPrincipalName,mail,givenName,surname,companyName,department,userType'
while users_url:
    r = session.get(users_url)
    data = r.json()
    users.extend(data.get('value', []))
    users_url = data.get('@odata.nextLink')  # Absent on the last page, which ends the loop

# === Functions ===
def get_license_details(user):
    """Fetches a user's license details; any request error is treated as "no licenses"."""
    uid = user['id']
    url = f"https://graph.microsoft.com/v1.0/users/{uid}/licenseDetails"
    try:
        resp = session.get(url, timeout=10).json()
    except Exception:
        resp = {}

    licenses = resp.get('value', [])
    results = []

    if not licenses:
        results.append({
            "sku": "None",
            "name": "Keine Lizenz",
            "features": set()
        })
        return uid, results

    for lic in licenses:
        sku = lic.get('skuPartNumber', 'Unbekannt')
        readable_name = sku_mapping.get(sku, sku)
        features = set(mapping_json.get(sku, {}).get("includes", []))
        results.append({
            "sku": sku,
            "name": readable_name,
            "features": features
        })
    return uid, results


# === Load license details in parallel (considerably faster) ===
user_license_map = {}
with ThreadPoolExecutor(max_workers=20) as executor:  # 20 threads; lower this if the Graph API starts throttling
    futures = {executor.submit(get_license_details, u): u for u in users}
    for f in as_completed(futures):
        uid, licenses = f.result()
        user_license_map[uid] = licenses

# === Check for over-licensing ===
user_license_data = []
for user in users:
    uid = user['id']
    display_name = user.get('displayName', '')
    upn = user.get('userPrincipalName', '')
    mail = user.get('mail') or 'N/A'
    given_name = user.get('givenName') or 'N/A'
    surname = user.get('surname', '')
    companyName = user.get('companyName') or 'N/A'
    department = user.get('department') or 'N/A'
    userType = user.get('userType', '')

    licenses = user_license_map.get(uid, [])
    covered_by = {}

    # Pairwise feature comparison using the feature sets precomputed in get_license_details
    for i, lic in enumerate(licenses):
        features_i = lic["features"]
        if not features_i:
            continue
        for j, other in enumerate(licenses):
            if i == j:
                continue
            if features_i.issubset(other["features"]) and features_i != other["features"]:
                covered_by[lic["name"]] = other["name"]

    for lic in licenses:
        overlic = covered_by.get(lic["name"], "Keine")
        if overlic not in sku_mapping.values():
            overlic = "Keine"

        user_license_data.append({
            'ID': uid,
            'Name': display_name,
            'UPN': upn,
            'License': lic["name"],
            'SKU_Code': lic["sku"],
            'Tenant_Id': microsoft_tenant_id,
            'E-Mail': mail,
            'GivenName': given_name,
            'SurName': surname,
            'CompanyName': companyName,
            'Department': department,
            'UserType': userType,
            'OverLicensing': overlic
        })

# === Convert to a DataFrame ===
df = pd.DataFrame(user_license_data)
df_sparkms_user_licenses = spark.createDataFrame(df)

#display(df_sparkms_user_licenses)


StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 42, Finished, Available, Finished)

2026-02-02 14:44:27,098 - INFO - _get_kwargs_from_urls: out:{'account_name': 'onelake', 'account_host': 'onelake.blob.fabric.microsoft.com'}
2026-02-02 14:44:27,101 - INFO - _get_kwargs_from_urls: out:{'account_name': 'onelake', 'account_host': 'onelake.blob.fabric.microsoft.com'}
2026-02-02 14:44:27,118 - INFO - Request URL: 'https://onelake.blob.fabric.microsoft.com/2044bd38-1f90-4cd0-b1b1-5f7a7d26e734/44b967f6-ac81-40a7-bc53-128091aba32b/Files/m365/product-mapping/sku_mapping.csv'
Request method: 'HEAD'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'ac737208-0045-11f1-a18a-6045bd170581'
    'User-Agent': 'azsdk-python-storage-blob/12.19.0 Python/3.11.8 (Linux-5.15.186.1-1.cm2-x86_64-with-glibc2.35)'
    'Authorization': 'REDACTED'
No body was attached to the request
2026-02-02 14:44:27,129 - INFO - Response status: 200
Response headers:
    'Content-Length': '4379'
    'Content-Type': 'text/p
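
The over-licensing check in the cell above treats a license as redundant when its feature set is a proper subset of another license held by the same user. A minimal sketch of that rule follows; `find_covered_licenses` is a hypothetical name, and the two sample licenses reuse feature sets from `mapping_json`.

```python
def find_covered_licenses(licenses):
    """Map each license name to a license whose features strictly contain its own."""
    covered_by = {}
    for lic in licenses:
        if not lic["features"]:
            continue  # Licenses without known features cannot be compared
        for other in licenses:
            # '<' on sets is the proper-subset test: contained in, but not equal to
            if other is not lic and lic["features"] < other["features"]:
                covered_by[lic["name"]] = other["name"]
    return covered_by


licenses = [
    {"name": "Power BI Pro", "features": {"POWER_BI_PRO"}},
    {"name": "Power BI Premium per User", "features": {"POWER_BI_PRO", "PowerBI_Premium"}},
]
print(find_covered_licenses(licenses))
# {'Power BI Pro': 'Power BI Premium per User'}
```

Two licenses with identical feature sets are not reported, because neither is a proper subset of the other.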

In [41]:
# Path to the Delta table
delta_table_path_stagingdimazmsuserlicenses = "Tables/StagingdimAzMSUserLicenses"

# Write to the Delta table
df_sparkms_user_licenses.write.format("delta").mode("overwrite").save(delta_table_path_stagingdimazmsuserlicenses)

print("Data successfully written to the Delta table.")

StatementMeta(, 564724c6-dbe5-4a67-9f18-ef0800e8f833, 43, Finished, Available, Finished)

Data successfully written to the Delta table.
