# Expand groups to users

## Config and Imports

In [None]:
# Switch the V-Order parquet setting on for this Spark session.
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")

# Resource-unit budget that the group-expansion loop works against before it
# pauses; background: https://learn.microsoft.com/en-us/graph/throttling-limits#pattern
getMembersFromGroupResourceUnits = 8_000

# Attributes of a Graph directory object this notebook is interested in.
columnsOfInterest = ["@odata.type", "id", "userPrincipalName"]

# Libraries used throughout this notebook
from notebookutils import mssparkutils

from pyspark.sql.types import StringType

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

from pyspark.sql.functions import col, when

import msal
import requests
import time

## User defined functions

In [None]:
def get_access_token():
    """Request a Microsoft Graph token for the notebook's MSAL client.

    Uses the module-level ``app`` object and asks for the Graph ``.default``
    scope.

    Returns:
        Whatever ``app.acquire_token_for_client`` returns, unmodified. The
        caller checks this result for an ``"access_token"`` key.
    """
    graph_scopes = ["https://graph.microsoft.com/.default"]
    return app.acquire_token_for_client(scopes=graph_scopes)

## Read secret from Azure Key Vault

In [None]:
# Tenant in which the service principal lives.
tenant_id = "<here goes the tenant id>"

# Application (client) id of the service principal that calls the Graph API.
client_id = "here goes the client id that is able to call the graph api"

# URL of the Azure Key Vault that stores the service principal's secret.
key_vault = "<here goes the url to your azure key vault>"

# Client secret of the service principal, read from the Key Vault entry
# named "GraphAPISecret".
client_secret = mssparkutils.credentials.getSecret(key_vault, "GraphAPISecret")

## Create MSAL client application

In [None]:
# Tenant-specific login endpoint used as the MSAL authority.
authority_url = f"https://login.microsoftonline.com/{tenant_id}"

# Confidential client that authenticates with the client id and client secret
# configured earlier in this notebook.
app = msal.ConfidentialClientApplication(
    client_id=client_id,
    client_credential=client_secret,
    authority=authority_url,
)

## Get the groups from the 'workspaces_scanned_users' table

In [None]:
# One row per distinct group id. "parentpath" and "theRealAncestor" both start
# out as the group's own id.
df = (
    spark.sql("SELECT GraphId FROM FUAM_Lakehouse.workspaces_scanned_users WHERE PrincipalType = 'Group'")
    .distinct()
    .select(
        col("GraphId"),
        col("GraphId").alias("parentpath"),
        col("GraphId").alias("theRealAncestor"),
    )
)

In [None]:
# Expand every group from `df` into its user members, following nested groups.
#
# Each queue entry is a dict with three keys:
#   GraphId         - id of the group to expand
#   parentpath      - "|"-separated chain of group ids from the top-level
#                     group down to this group
#   theRealAncestor - id of the top-level group the chain started from
#
# The result, `users_from_group`, is a list with one dict per
# (user, path) combination.
#
# The Graph throttling limits should be considered when running this cell:
#   https://learn.microsoft.com/en-us/graph/throttling-limits
#   https://learn.microsoft.com/en-us/graph/throttling-limits#identity-and-access-service-limits
from collections import deque

graph_api_url = "https://graph.microsoft.com/v1.0"

# Abort a request that gets no answer instead of blocking the notebook forever.
request_timeout_seconds = 60
# Number of attempts for a request that fails with an unexpected status
# before the group is given up on (prevents an endless retry loop).
max_attempts_per_request = 3

token_response = get_access_token()
if "access_token" not in token_response:
    # Fail here with the reason instead of leaving `users_from_group`
    # undefined for the following cells.
    raise RuntimeError(
        "Could not acquire an access token for Microsoft Graph: "
        f"{token_response.get('error')}: {token_response.get('error_description')}"
    )
access_token = token_response["access_token"]
headers = {"Authorization": f"Bearer {access_token}"}

getMembersFromGroupResourceUnits_costs = 0

# Work queue of groups; nested groups found on the way are appended to it.
theListOfGroups = deque(df.toPandas().to_dict('records'))
users_from_group = []

while theListOfGroups:
    thedict = theListOfGroups.popleft()
    group_id = thedict["GraphId"]

    # Ids of all groups on the way down to (and including) this group; used
    # to detect circular nesting.
    groups_on_path = set(thedict["parentpath"].split("|"))

    # The members endpoint is paged: keep requesting until no
    # '@odata.nextLink' is returned any more.
    page_url = f"{graph_api_url}/groups/{group_id}/members"
    failed_attempts = 0

    while page_url:
        response = requests.get(page_url, headers=headers, timeout=request_timeout_seconds)

        if response.status_code == 429:
            # Too many requests: https://learn.microsoft.com/en-us/graph/throttling
            # Wait as instructed and request the same page again.
            retry_after = int(response.headers.get("Retry-After", 1))  # Default to 1 second if not provided
            time.sleep(retry_after)
            continue

        if response.status_code != 200:
            try:
                error_data = response.json().get("error", {})
                error_code = error_data.get("code", "Unknown Error Code")
            except ValueError:
                # The error body was not JSON.
                error_code = "Unknown Error Code"

            if error_code == 'Request_ResourceNotFound':
                # The group does not exist (any more): nothing to expand.
                break

            failed_attempts += 1
            if failed_attempts >= max_attempts_per_request:
                print(f"Skipping group {group_id} after {failed_attempts} failed attempts. "
                      f"Error: {error_code}, {response.text}")
                break

            if response.status_code == 401:
                # The token may have expired during a long run; get a new one.
                refreshed_token = get_access_token()
                if "access_token" in refreshed_token:
                    access_token = refreshed_token["access_token"]
                    headers = {"Authorization": f"Bearer {access_token}"}
            else:
                # Short pause before retrying an unexpected error.
                time.sleep(failed_attempts)
            continue

        failed_attempts = 0
        payload = response.json()

        for member in payload.get("value", []):
            member_type = member.get("@odata.type")
            if member_type == "#microsoft.graph.user":
                # A user: record it together with the path it was found on.
                users_from_group.append({
                    "graphId_Path": thedict["parentpath"],
                    "graphId_Parent": group_id,
                    "graphId_UserPrincipalName": member.get('userPrincipalName'),
                    "graphId_theRealAncestor": thedict["theRealAncestor"]
                })
            elif member_type == "#microsoft.graph.group":
                member_id = member.get('id')
                if member_id in groups_on_path:
                    # Circular nesting: this group is already being expanded
                    # further up the same path.
                    continue
                # A nested group: queue it for expansion below the same ancestor.
                theListOfGroups.append({
                    "parentpath": thedict["parentpath"] + "|" + member_id,
                    "GraphId": member_id,
                    "theRealAncestor": thedict["theRealAncestor"]
                })

        # Absent on the last page, which ends the paging loop.
        page_url = payload.get("@odata.nextLink")

        # Self-throttling: every successful request is booked with 3 resource
        # units; shortly before the configured budget is used up the loop
        # pauses and the counter starts over. In large organizations other
        # applications share the Graph API limits, so a 429 response (handled
        # above) is still possible.
        getMembersFromGroupResourceUnits_costs = getMembersFromGroupResourceUnits_costs + 3
        if getMembersFromGroupResourceUnits - getMembersFromGroupResourceUnits_costs <= 3:
            getMembersFromGroupResourceUnits_costs = 0
            time.sleep(10)

#display(users_from_group)

### Joining 'workspaces_scanned_users' and 'df_expandedGroups'

In [None]:

# Schema of the expanded-groups list: four nullable string columns.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "graphId_Parent",
        "graphId_Path",
        "graphId_UserPrincipalName",
        "graphId_theRealAncestor",
    )
])

# Turn the list of user dictionaries into a DataFrame.
df_expandedGroups = spark.createDataFrame(users_from_group, schema)

df_workspaces_scanned_users = spark.sql("SELECT * FROM FUAM_Lakehouse.workspaces_scanned_users")

# Keep every scanned principal; rows whose GraphId matches a top-level group
# are paired with each user found below that group.
df_join = df_workspaces_scanned_users.join(
    df_expandedGroups,
    df_workspaces_scanned_users.GraphId == df_expandedGroups.graphId_theRealAncestor,
    "leftouter",
)

# Users keep their own Identifier; all other rows take the user principal
# name coming from the expanded groups.
df_join = df_join.withColumn(
    "UserPrincipalName_expanded",
    when(df_join.PrincipalType == 'User', df_join.Identifier)
    .otherwise(df_join.graphId_UserPrincipalName),
)

### Writing the dataframe 'df_join' as delta table 'workspaces_scanned_users_expandedGroups'

In [9]:
# Persist the joined result as a Delta table, replacing its previous content.
(
    df_join.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("workspaces_scanned_users_expandedGroups")
)