# Remove capabilities from CDF groups

This playbook removes capabilities from IAM groups in Cognite Data Fusion (CDF).
Authentication uses the same method as **Get_Cognite_Groups** (device-code flow first, then interactive).

## Setup path and imports

In [None]:
import sys
from pathlib import Path

# Same as Get_Cognite_Groups: add utils to path
utils_path = (Path("..") / ".." / "utils").resolve()
if str(utils_path) not in sys.path:
    sys.path.insert(0, str(utils_path))

from cognite_auth import device_code_client, interactive_client, CUSTOMER_CONFIGS
from cognite_groups_export import extract_capability_key, get_group_capability_keys
from remove_capabilities import (
    LEGACY_RESOURCE_NAMES,
    capability_keys_to_remove,
    filter_capabilities_for_removal,
    update_group_capabilities,
)
from group_backup_restore import backup_groups_to_archive, DEFAULT_ARCHIVE_DIR

## Parameters

In [None]:
# Customer (must exist in cognite_auth_config.json)
customer = "oxy-oog-dev"

# Which groups to update: None = all groups, or list of group names or IDs
group_names_or_ids = None  # e.g. ["My Group"] or [123, 456]

# Remove capabilities tied to legacy entities (assets, timeseries, events, files, etc.)
remove_legacy = True

# Additionally remove these specific capability keys (resource:action or resource:action:scope)
specific_capability_keys_to_remove = []  # e.g. ["assets:write", "timeseries:read"]

# If True, only print what would be removed; do not call the API
dry_run = True

token_cache_dir = Path.home() / ".cognite" / "token_cache"

## Authenticate and list groups

In [None]:
cache_path = token_cache_dir / f"{customer}.json" if token_cache_dir else None
last_exc = None
for use_device_code in (True, False):
    try:
        if use_device_code:
            client = device_code_client(customer, cache_path)
        else:
            client = interactive_client(customer, cache_path)
        break
    except Exception as e:
        last_exc = e
        if use_device_code:
            print(f"Device-code failed ({e}), trying interactive...")
        continue
else:
    raise last_exc

groups = client.iam.groups.list(all=True)
print(f"Found {len(groups)} groups for {customer}")

if group_names_or_ids is not None:
    by_name = {g.name: g for g in groups}
    by_id = {g.id: g for g in groups}
    target_groups = []
    for x in group_names_or_ids:
        if isinstance(x, int) and x in by_id:
            target_groups.append(by_id[x])
        elif isinstance(x, str) and x in by_name:
            target_groups.append(by_name[x])
        else:
            print(f"Warning: group {x!r} not found")
    groups = target_groups
    print(f"Targeting {len(groups)} groups")
else:
    groups = list(groups)

## Backup current groups

Back up current group permissions to the archive (Excel + JSON) before removing capabilities. Files are timestamped and stored in the archive folder.

In [None]:
groups_by_customer = {customer: list(groups)}
excel_path, json_path = backup_groups_to_archive(groups_by_customer, archive_dir=DEFAULT_ARCHIVE_DIR)
print(f"Excel: {excel_path}")
print(f"JSON:  {json_path}")

## Compute capabilities to remove and update groups

In [None]:
to_remove_set = capability_keys_to_remove(
    legacy_resources=remove_legacy,
    specific_keys=specific_capability_keys_to_remove or None,
    legacy_list=LEGACY_RESOURCE_NAMES,
)

if remove_legacy:
    print("Legacy resources (capabilities for these will be removed):", LEGACY_RESOURCE_NAMES)
if specific_capability_keys_to_remove:
    print("Specific keys to remove:", specific_capability_keys_to_remove)
print()

for group in groups:
    current_keys = get_group_capability_keys(group)
    new_caps = filter_capabilities_for_removal(
        group, to_remove_set, remove_legacy, extract_capability_key
    )
    removed_count = len(getattr(group, "capabilities", None) or []) - len(new_caps)
    if removed_count == 0:
        print(f"Group {group.name!r} (id={group.id}): no matching capabilities to remove")
        continue
    print(f"Group {group.name!r} (id={group.id}): would remove {removed_count} capability(ies)")
    if dry_run:
        print("  [dry run] Skipping API call")
        continue
    try:
        update_group_capabilities(client, group, new_caps)
        print(f"  Updated successfully")
    except Exception as e:
        print(f"  Error: {e}")

if dry_run and any(get_group_capability_keys(g) for g in groups):
    print("\nSet dry_run = False and re-run to apply changes.")