# Revoke Inactive Gemini Enterprise Licenses

The purpose of this script is to automatically detect user inactivity in the last 30 days and automatically revoke licenses.

## Imports and Configuration

In [13]:
import os
import requests
import json
import google.auth
import google.auth.transport.requests
from google.cloud import bigquery
from datetime import datetime, timedelta
from subprocess import check_output # Must be imported here for the fallback to work

# Define the specific API audience needed for Discovery Engine
DISCOVERY_ENGINE_AUDIENCE = "https://discoveryengine.googleapis.com/"

try:
    # Use standard default authentication (simpler, less prone to library errors)
    creds, project = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"] # Standard scope
    )
    
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req) # Refresh to ensure a fresh token is used
    
    if not creds.token:
        raise Exception("Standard token fetch failed.")

    AUTH_HEADERS = {
        "Authorization": f"Bearer {creds.token}",
        "Content-Type": "application/json"
    }
    print("Authentication successful with standard token.")

except Exception as e:
    # The standard token failed, proceed to the reliable gcloud fallback
    print(f"Standard authentication failed: {e}")
    print("Attempting fallback authentication using gcloud shell...")
    try:
        # Use gcloud to generate an audience-specific token, which we know works
        token_bytes = check_output(['gcloud', 'auth', 'print-access-token', f'--audiences={DISCOVERY_ENGINE_AUDIENCE}'])
        creds_token = token_bytes.decode().strip()
        
        AUTH_HEADERS = {
            "Authorization": f"Bearer {creds_token}",
            "Content-Type": "application/json"
        }
        print("Authentication successful via gcloud token (FALLBACK).")

    except Exception as fallback_e:
        print(f"Fallback authentication failed: {fallback_e}")
        raise # Re-raise the exception if the fallback also fails
        
# --- Configuration Variables ---
PROJECT_ID = "meera-demos"
PROJECT_NUMBER = "797032279537"
LOCATION = "global"
SUBSCRIPTION_ID = "default_license_config"
USER_STORE_ID = "meera_ent_data"

# Log Analytics table - fully qualified path
LOG_ANALYTICS_TABLE = f"projects/{PROJECT_ID}/datasets/logs_default_analytics/tables/_AllLogs"

# Agentspace API endpoint definitions
LICENSE_API_ENDPOINT = f"https://{LOCATION}-discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/userStores/{USER_STORE_ID}:batchUpdateUserLicenses"
LIST_API_ENDPOINT = f"https://{LOCATION}-discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/userStores/{USER_STORE_ID}/userLicenses"
LICENSE_CONFIG_PATH = f"projects/{PROJECT_NUMBER}/locations/{LOCATION}/licenseConfigs/{SUBSCRIPTION_ID}"

AUTH_HEADERS["X-Goog-User-Project"] = PROJECT_ID

Authentication successful with standard token.


In [6]:
# NEW AUTH BLOCK

import os
import requests
import json
from subprocess import check_output # Used to run shell commands

# --- NEW AUTHENTICATION BLOCK ---
try:
    # Run the gcloud command to get a valid, scoped access token
    # NOTE: This requires the 'gcloud' tool to be installed and authenticated
    token_bytes = check_output(['gcloud', 'auth', 'print-access-token'])
    creds_token = token_bytes.decode().strip()
    
    AUTH_HEADERS = {
        "Authorization": f"Bearer {creds_token}",
        "Content-Type": "application/json"
    }
    print("Authentication successful via gcloud token.")

except Exception as e:
    print(f"Authentication failed using gcloud: {e}")
    raise
# --- END NEW AUTHENTICATION BLOCK ---

# Fill these values based on your project setup... (rest of your code)

PROJECT_ID = "meera-demos"
PROJECT_NUMBER = "797032279537"
LOCATION = "global" 
SUBSCRIPTION_ID = "default_license_config"
USER_STORE_ID = "meera_ent_data"

# Log Analytics table - may need to be configured
#LOG_ANALYTICS_TABLE = "logs_default_analytics._AllLogs"
LOG_ANALYTICS_TABLE = f"projects/{PROJECT_ID}/datasets/logs_default_analytics/tables/_AllLogs"

# Agentspace API endpoint for assigning/revoking licenses
# Agentspace API endpoint for assigning/revoking licenses
LICENSE_API_ENDPOINT = f"https://{LOCATION}-discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/userStores/{USER_STORE_ID}:batchUpdateUserLicenses"

# Agentspace API endpoint for listing licenses
LIST_API_ENDPOINT = f"https://{LOCATION}-discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/userStores/{USER_STORE_ID}/userLicenses"
LICENSE_CONFIG_PATH = f"projects/{PROJECT_NUMBER}/locations/{LOCATION}/licenseConfigs/{SUBSCRIPTION_ID}"

AUTH_HEADERS["X-Goog-User-Project"] = PROJECT_ID

Authentication successful via gcloud token.


## Functions to get user lists
Retrieve two essential lists: **licensed users** and **recently active users**

In [23]:
# OLD - DELETE ME

def get_licensed_users():
    """Fetches and returns a set of all user emails with assigned Agentspace licenses."""
    print("--- Fetching All Licensed Users... ---")
    
    try:
        response = requests.get(LIST_API_ENDPOINT, headers=AUTH_HEADERS)
        response.raise_for_status()
        
        data = response.json()
        
        # Extract userPrincipal from the userLicenses list
        licensed_users = {
            license_info['userPrincipal']
            for license_info in data.get('userLicenses', [])
        }
        
        print(f"Found {len(licensed_users)} total licensed users.")
        return licensed_users

    except requests.exceptions.RequestException as e:
        print(f"Error fetching licensed users list: {e}")
        return set()

def get_active_users(days_inactive=7):
    """Queries BigQuery Log Analytics for users active in the last 'days_inactive'."""
    print(f"\n--- Querying Active Users (Last {days_inactive} days)... ---")
    
    # BigQuery SQL Query - Fixed and working version
    active_query = f"""
    SELECT DISTINCT
        

In [14]:
def get_licensed_users():
    print("--- Fetching All Licensed Users... ---")
    
    try:
        response = requests.get(LIST_API_ENDPOINT, headers=AUTH_HEADERS)
        
        # 1. Check for empty response text *first*
        if not response.text:
            data = {}
        else:
            # 2. Try to parse JSON
            data = response.json()
        
        # 3. NOW check the HTTP status. If the body was empty, we safely set data={} 
        #    and the check here will pass if the status code is a success (200).
        response.raise_for_status() 

        # Extract userPrincipal from the userLicenses list (safe because data is defined)
        licensed_users = {
            license_info['userPrincipal']
            for license_info in data.get('userLicenses', [])
        }
        
        print(f"Found {len(licensed_users)} total licensed users.")
        return licensed_users

    except requests.exceptions.RequestException as e:
        # This now only catches actual request failures, not empty bodies
        print(f"Error fetching licensed users list: {e}")
        return set()

## Determine Inactive Users
Set difference to identify target list for revocation

In [8]:
def determine_inactive_users(licensed_users, active_users):
    """
    Calculates the set of users who have a license but were NOT active recently.
    
    Set Difference: (All Licensed Users) - (All Active Users) = (Inactive Licensed Users)
    """
    
    # Use set difference to find users who are licensed but not in the active set
    inactive_users_to_revoke = licensed_users.difference(active_users)
    
    print("\n--- Inactive User Analysis ---")
    print(f"Total Licensed Users: {len(licensed_users)}")
    print(f"Total Active Users:   {len(active_users)}")
    print(f"üéØ Users to Revoke:    {len(inactive_users_to_revoke)}")
    
    return list(inactive_users_to_revoke)

## License Revocation Function
Send the API call to revoke licenses for the final list of inactive users

In [15]:
def revoke_licenses_for_inactive(inactive_users):
    """Prepares and executes the batch license revocation API call."""
    
    if not inactive_users:
        print("\n--- No inactive users found. Revocation skipped. ---")
        return

    print(f"\n--- REVOKING licenses for {len(inactive_users)} users... ---")
    
    # Prepare the list of users with an empty licenseConfig to revoke
    revoke_entries = [
        {"userPrincipal": user_email, "licenseConfig": ""} 
        for user_email in inactive_users
    ]

    revoke_payload = {
        "inlineSource": {
            "userLicenses": revoke_entries,
            "updateMask": {"paths": ["licenseConfig"]}
        }
    }
    
    try:
        response = requests.post(LICENSE_API_ENDPOINT, headers=AUTH_HEADERS, json=revoke_payload)
        response.raise_for_status()
        
        print(f"‚úÖ Revocation request submitted successfully (Status: {response.status_code}).")
        print("Response JSON:", response.json())
    except requests.exceptions.RequestException as e:
        print(f"Error during revocation API call: {e}")
        if hasattr(e.response, 'text'):
            print(f"Response Text: {e.response.text}")

## Orchestration
Execute your license cleanup policy

In [16]:
print("\n****************************************************")
print("STARTING INACTIVE LICENSE REVOCATION PROCESS")
print("****************************************************")

# 1. Get all currently licensed users
all_licensed = get_licensed_users()

if not all_licensed:
    print("Process aborted: No licenses found to manage.")
else:
    # 2. Get all recently active users (defined as active in the last 30 days)
    recently_active = get_active_users(days_inactive=30)

    # 3. Determine the list of inactive users
    targets_for_revocation = determine_inactive_users(all_licensed, recently_active)

    # 4. Revoke licenses for the inactive users
    # Uncomment the line below when you are ready to execute the revocation:
    # revoke_licenses_for_inactive(targets_for_revocation) 
    
    # For safety, we print the list before running the revocation
    if targets_for_revocation:
        print("\n--- INACTIVE USERS (Ready for Revocation) ---")
        for user in targets_for_revocation:
            print(f"    - {user}")
        print("\nTO REVOKE LICENSES, UNCOMMENT THE 'revoke_licenses_for_inactive' LINE IN THIS BLOCK.")


****************************************************
STARTING INACTIVE LICENSE REVOCATION PROCESS
****************************************************
--- Fetching All Licensed Users... ---
Error fetching licensed users list: 403 Client Error: Forbidden for url: https://global-discoveryengine.googleapis.com/v1/projects/meera-demos/locations/global/userStores/meera_ent_data/userLicenses
Process aborted: No licenses found to manage.


In [4]:
def find_subscription_id_from_user(user_email_to_check):
    """Fetches the license details for a single known user to extract the Subscription ID."""
    print(f"\n--- Checking license for {user_email_to_check} ---")
    # The API endpoint is the same as the list_licenses() function, but we add a filter
    list_endpoint = (
        f"https://{LOCATION}-discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/"
        f"locations/{LOCATION}/userStores/default_user_store/userLicenses?filter=userPrincipal%3D%22{user_email_to_check}%22"
    )
    
    try:
        response = requests.get(list_endpoint, headers=AUTH_HEADERS)
        response.raise_for_status()
        data = response.json()
        
        # Check if the user has a license
        if data.get('userLicenses'):
            license_path = data['userLicenses'][0]['licenseConfig']
            print("‚úÖ Found License Path:")
            print(f"Path: {license_path}")
            
            # The Subscription ID is the last part of the path
            subscription_id = license_path.split('/')[-1]
            print("\nüåü Your Subscription ID is:")
            print(f"--- {subscription_id} ---")
            return subscription_id
        else:
            print(f"‚ùå Could not find an active license for {user_email_to_check}.")
            return None
            
    except requests.exceptions.RequestException as e:
        print(f"‚ùå Error finding license: {e}")
        return None

# Replace 'known_user@example.com' with an actual email that has a license
find_subscription_id_from_user('known_user@example.com')


--- Checking license for known_user@example.com ---
‚ùå Error finding license: 400 Client Error: Bad Request for url: https://global-discoveryengine.googleapis.com/v1/projects/meera-demos/locations/global/userStores/default_user_store/userLicenses?filter=userPrincipal%3D%22known_user@example.com%22


In [5]:
def list_license_configs():
    """Fetches all available License Configs in the project to extract the Subscription ID."""
    print("\n--- Fetching All License Configurations ---")
    
    # This endpoint is specifically for listing the available license types/IDs
    list_configs_endpoint = (
        f"https://{LOCATION}-discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/"
        f"locations/{LOCATION}/licenseConfigs"
    )
    
    try:
        response = requests.get(list_configs_endpoint, headers=AUTH_HEADERS)
        response.raise_for_status()
        
        data = response.json()
        
        if data.get('licenseConfigs'):
            print("‚úÖ Found License Configs:")
            
            # Print all available license names (resources)
            for config in data['licenseConfigs']:
                license_path = config.get('name', 'N/A')
                
                # The Subscription ID is the last part of the resource path
                subscription_id = license_path.split('/')[-1]
                
                print(f"  - Resource Name: {license_path}")
                print(f"  - **Subscription ID:** {subscription_id}")
            
            # Return the first found ID for convenience
            return data['licenseConfigs'][0]['name'].split('/')[-1]
        else:
            print("‚ùå No license configurations found in this project.")
            return None
            
    except requests.exceptions.RequestException as e:
        print(f"‚ùå Error listing license configurations: {e}")
        return None

# Run the function to get your ID
found_id = list_license_configs()


--- Fetching All License Configurations ---
‚ùå Error listing license configurations: 404 Client Error: Not Found for url: https://global-discoveryengine.googleapis.com/v1/projects/meera-demos/locations/global/licenseConfigs
