<a href="https://colab.research.google.com/github/misabhishek/gcp-iam-recommender/blob/main/create_new_roles_based_on_common_usage_pattern.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Goal

Help users create a custom role based on the permission usage of the given `principals` across an entire organization, a set of folders, and/or a set of projects.

## Install packages

In [None]:
#@title
! pip install apache-beam[gcp]  

## Import Modules

In [None]:
#@title
from google.colab import auth
import json
import subprocess
import itertools
import concurrent
import logging
import pickle
import os
import sys
import pandas as pd

import IPython
import uuid
from google.colab import output

logging.basicConfig(format="%(levelname)s[%(asctime)s]:%(message)s")

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import PipelineOptions

## Set Configurations

In [None]:
#@title Principal and container specific Configurations

# Containers to analyze. Each value is a comma-separated list. Entries that
# start with "projects" are used directly as a single project; any other entry
# is expanded to its projects with `gcloud asset search-all-resources`.
organization_ids = "" #@param {type:"string"}
folder_ids = "" #@param {type:"string"}
project_ids = "projects/gcplearning-230005" #@param {type:"string"}
# do_you_want_get_a_new_custom_role = False #@param {type: "boolean"}
# NOTE(review): no cell visible in this notebook reads the slider below; the
# role-selection cell passes a literal 5 instead -- confirm whether it should
# be wired in.
how_many_maximum_predefined_roles_you_want_for_permission_usage = 5 #@param {type:"slider", min:1, max:10, step:1}
# Comma-separated principals. An insight is kept when its `member` field is
# equal to one of these entries.
principals_whose_permission_usage_you_want_to_analyze = "user:misabhishek@google.com,user:jsytsma@google.com" #@param {type:"string"}
# Broader selectors read by `is_insight_useful`: the first three select
# insights by the kind of member, the last three by the basic role
# (roles/owner, roles/editor, roles/viewer) named in the insight.
do_you_want_to_analyze_all_service_accounts = False #@param {type: "boolean"}
do_you_want_to_analyze_all_user_accounts = False #@param {type: "boolean"}
do_you_want_to_analyze_all_group_accounts = False #@param {type: "boolean"}
do_you_want_to_analyze_all_project_owner = False #@param {type: "boolean"}
do_you_want_to_analyze_all_project_editor = False #@param {type: "boolean"}
do_you_want_to_analyze_all_project_viewer = False #@param {type: "boolean"}

## Configuration for GCP Dataflow.

> Use [Dataflow](https://cloud.google.com/dataflow) if you have many projects and folders and you want to analyze permission utilization across all of them. Dataflow runs the Apache Beam pipeline for you on Google Cloud Platform and manages the resources for it. It then stores the output of the pipeline in a GCS bucket for you. If you just want to experiment with this Colab on a few projects, the Colab runtime resources should be sufficient.

In [None]:
# Project passed to `gcloud config set project` and as `--billing-project` to
# the gcloud calls made by this notebook.
billing_project_id = "xiangwa-playground" #@param {type:"string"}
job_name = "custom-role-creation-job" #@param {type:"string"}
# NOTE(review): this default is descriptive placeholder text, not a region
# name -- replace it with a real region before relying on it.
region = "Region where the dataflow job is running." #@param {type:"string"}
staging_location = "gs://xiangwa-playground/staging" #@param {type:"string"}
temp_location = 'gs://xiangwa-playground/temp' #@param {type:"string"}
# Prefix under which the pipelines write their text output, and from which
# `gsutil cp` copies it back to /tmp.
output_location = 'gs://xiangwa-playground' #@param {type:"string"}

## Setup Dataflow workflow parameters

In [None]:
#@title
# Build the Beam pipeline options from the form values configured above.
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = billing_project_id
google_cloud_options.staging_location = staging_location
google_cloud_options.job_name = job_name
google_cloud_options.region = region
google_cloud_options.temp_location = temp_location
# NOTE(review): 'DirectRunner' presumably executes the pipeline inside this
# notebook's runtime rather than on Dataflow -- confirm against the Beam docs
# before expecting the Google Cloud options above to take effect.
options.view_as(StandardOptions).runner = 'DirectRunner'

## Let's authenticate and set the billing project for calling APIs

In [None]:
#@title
# Authenticate the Colab user so the gcloud/gsutil commands below run with
# their credentials.
auth.authenticate_user()
print('Authenticated')

# Make the billing project gcloud's configured project for later commands.
! gcloud config set project $billing_project_id

## Use IAM Policy Insight and Cloud Asset Inventory API to get all needed permissions for the principals

In [None]:
#@title
def execute_command(command):
  """Run a command line and return its JSON output, parsed.

  Args:
    command: The full command as a single string. It is tokenized on any run
      of whitespace (spaces, tabs, newlines), so multi-line command templates
      work. Arguments that themselves contain whitespace are not supported.

  Returns:
    The Python object decoded from the command's stdout, which must be JSON
    (e.g. produced with `--format=json`).

  Raises:
    subprocess.CalledProcessError: If the command exits with a non-zero status.
    json.JSONDecodeError: If stdout is not valid JSON.
  """
  # str.split() with no separator drops empty tokens and also treats tabs and
  # newlines as separators; splitting on " " alone left "\n" glued to arguments.
  argv = command.split()
  raw_output = subprocess.check_output(argv)
  return json.loads(raw_output.decode("utf-8"))

# Template for listing every project under a container. The two `{}`
# placeholders are filled with the search scope (the container id) and the
# billing project, in that order.
get_all_projects_command = """gcloud asset search-all-resources \
                               --asset-types=cloudresourcemanager.googleapis.com/Project \
                               --scope={} --format=json --billing-project={}"""
# Template for listing the IAM policy insights of one project. The two `{}`
# placeholders are filled with the project id and the billing project, in
# that order.
insight_command = """gcloud recommender insights list \
    --project={} \
    --location=global \
    --insight-type=google.iam.policy.Insight \
    --format=json \
    --billing-project={}"""

def get_all_projects(container_id):
  """Resolve a container id to the list of project ids it covers.

  Args:
    container_id: A resource id such as "projects/<id>", or any other scope
      accepted by `gcloud asset search-all-resources`. Surrounding whitespace
      is ignored.

  Returns:
    A list of project ids. An id of the form "projects/<id>" yields `[<id>]`
    without calling gcloud. A blank id yields `[]`. Any other id is expanded
    with gcloud; if that lookup fails the failure is logged and `[]` is
    returned so the remaining containers can still be processed.
  """
  container_id = container_id.strip()
  # Unset form fields arrive here as "" (because "".split(",") == [""]); there
  # is nothing to look up for them.
  if not container_id:
    return []
  if container_id.startswith("projects/"):
    project_id = container_id.split("/")[1]
    return [project_id] if project_id else []
  try:
    projects = execute_command(get_all_projects_command.format(
        container_id, billing_project_id))
    return [p["additionalAttributes"]["projectId"] for p in projects]
  except Exception as error:  # Best effort: one bad container must not abort the run.
    logging.warning(
        "Could not list the projects of container:`%s`; check that the id is "
        "valid and that you have permissions to access it (%s)",
        container_id, error)
    return []

def get_insights(project_id):
  """Fetch the IAM policy insights of one project.

  Args:
    project_id: Id of the project whose insights are listed.

  Returns:
    The decoded JSON output of `gcloud recommender insights list` for the
    project. If the command fails, the failure is logged and `[]` is returned
    so that the remaining projects are still processed.
  """
  try:
    return execute_command(insight_command.format(project_id, billing_project_id))
  except Exception as error:  # Best effort, but never silent.
    logging.warning(
        "Could not fetch IAM policy insights for project `%s`; skipping it (%s)",
        project_id, error)
    return []

def is_insight_useful(insight):
  """Decide whether an insight belongs to a principal or role under analysis.

  An insight is kept when any of the following holds:
    * its member is one of the explicitly configured principals;
    * its member's type (service account / user / group) is enabled by the
      matching "analyze all ..." switch;
    * its role is a basic role (owner / editor / viewer) enabled by the
      matching "analyze all project ..." switch.

  Args:
    insight: A decoded insight with `content.member` and `content.role`.

  Returns:
    True if the insight should be analyzed, False otherwise.
  """
  content = insight["content"]
  member = content["member"]
  role = content["role"]

  # Tolerate spaces around the commas of the configured principal list.
  wanted_principals = {
      principal.strip()
      for principal in principals_whose_permission_usage_you_want_to_analyze.split(",")
      if principal.strip()}
  if member in wanted_principals:
    return True

  # IAM member identifiers look like "user:...", "serviceAccount:..." and
  # "group:..." (the principals configured for this notebook use "user:").
  # Compare the type prefix case-insensitively: the previous checks for
  # "User:", "ServiceAccount:" and "Group:" could never match those values.
  member_type = member.partition(":")[0].lower()
  member_type_switches = {
      "serviceaccount": do_you_want_to_analyze_all_service_accounts,
      "user": do_you_want_to_analyze_all_user_accounts,
      "group": do_you_want_to_analyze_all_group_accounts,
  }
  if member_type_switches.get(member_type, False):
    return True

  basic_role_switches = {
      "roles/owner": do_you_want_to_analyze_all_project_owner,
      "roles/editor": do_you_want_to_analyze_all_project_editor,
      "roles/viewer": do_you_want_to_analyze_all_project_viewer,
  }
  return bool(basic_role_switches.get(role, False))

def get_needed_permissions(insight):
  """Return the names of the permissions an insight reports as needed.

  Args:
    insight: A decoded insight whose `content` may carry the lists
      `exercisedPermissions` and `inferredPermissions`, each entry being a
      dict with a `permission` key.

  Returns:
    The permission names, exercised ones first and inferred ones after. A
    list that is absent from the insight (or null) counts as empty.
  """
  content = insight["content"]
  # Either list may be missing from an insight; treat that as "no permissions"
  # instead of failing the whole pipeline with a KeyError.
  exercised = content.get("exercisedPermissions") or []
  inferred = content.get("inferredPermissions") or []
  return [entry["permission"] for entry in exercised + inferred]

container_ids = (organization_ids.split(",") + folder_ids.split(",")
                + project_ids.split(","))
with beam.Pipeline(options=options) as pipeline:
  needed_permissions = (
      pipeline
      | beam.Create(container_ids)
      | beam.FlatMap(get_all_projects)
      | beam.FlatMap(get_insights)
      | beam.Filter(is_insight_useful)
      | beam.FlatMap(get_needed_permissions)
      | beam.Distinct()
      | beam.io.WriteToText(os.path.join(output_location, "needed_permissions"))
  )

# Get all the recommendation from GCS bucket to current location
!gsutil cp -r $output_location/needed_permissions* /tmp/
needed_permissions = {permission.strip() for permission in 
                      open("/tmp/needed_permissions-00000-of-00001").readlines()
                      }

## Show the needed permissions by members

In [None]:
#@title
import IPython
from google.colab import output

# Render a button in the cell output. Its click handler asks the kernel to
# invoke the callback named 'notebook.DisplayNeededPermissions' with no
# arguments.
display(IPython.display.HTML('''
    <button id='button'>Show the Needed IAM Pemissions by Members</button>
    <script>
      document.querySelector('#button').onclick = () => {
        google.colab.kernel.invokeFunction('notebook.DisplayNeededPermissions', 
        [], {});
      };
    </script>
    '''))

def DisplayNeededPermissions():
  """Show the needed permissions as a one-column HTML table, sorted by name."""
  permission_column = sorted(needed_permissions)
  table = pd.DataFrame({"needed_permissions" : permission_column})
  rendered_table = IPython.display.HTML(table.to_html())
  display(rendered_table)

# Expose the function to the notebook front end under the name the button uses.
output.register_callback('notebook.DisplayNeededPermissions', DisplayNeededPermissions)

## Do you want to create a custom role out of it?

In [None]:
# Placeholder form for the custom role. The backslash in the id pattern is
# written as an explicit `\\` so the literal keeps the same value without
# relying on the invalid escape sequence `\.`.
# NOTE(review): `create_cusom_role` reads `role_id_of_custom_role`, not
# `id_of_custom_role`; no visible cell reads the latter -- confirm it is needed.
id_of_custom_role = "[Enter-a-unique-id-of-custom-role with pattern '[a-zA-Z0-9_\\.]{3,64}']" #@param {type:"string"}
project_to_create_custom_role = "xiangwa-playground" #@param {type:"string"}
title_of_custom_role = "[Custom-role-title]" #@param {type:"string"}
description = "[description-for-custom-role]" #@param {type:"string"}
launch_stage = "ALPHA" #@param ["ALPHA", "BETA", "GA"]

In [None]:
# Settings of the custom role to create; all five are substituted into the
# `gcloud iam roles create` command issued by `create_cusom_role`.
role_id_of_custom_role = "eng_role_4" #@param {type:"string"}
project_to_create_custom_role = "gcplearning-230005" #@param {type:"string"}
title_of_custom_role = "Eng-role" #@param {type:"string"}
description = "Role assigned to all engineerins in my organization." #@param {type:"string"}
launch_stage = "ALPHA" #@param ["ALPHA", "BETA", "GA"]

## Process for creating custom roles

In [None]:
#@title
def print_output(s):
  """Write `s` followed by a newline to stdout and flush immediately."""
  line = "".join((s, "\n"))
  stream = sys.stdout
  stream.write(line)
  stream.flush()

def get_testable_permissions(project_id):
  """List the permissions that can be tested on a project resource.

  Progress messages are printed under the 'remove_outputs' tag and cleared
  again once the listing has been fetched.

  Args:
    project_id: Id of the project whose testable permissions are listed.

  Returns:
    The decoded JSON output of `gcloud iam list-testable-permissions`.
  """
  resource_name = f"//cloudresourcemanager.googleapis.com/projects/{project_id}"
  with output.use_tags('remove_outputs'):
    print_output(f"Getting permissions that can be applied on the resource -- {resource_name}.")
    list_command = f"""gcloud iam list-testable-permissions \
      {resource_name} \
      --format=json"""
    testable = execute_command(list_command)
    print_output(f"Got {len(testable)} for the resource -- {resource_name}")
  output.clear(output_tags='remove_outputs')
  return testable


def user_interactions(project_id, permissions, message):
  """Show a list of permissions and ask whether to proceed without them.

  Args:
    project_id: Project id shown in the heading line.
    permissions: Iterable of permission names to display (shown sorted).
    message: Heading that explains why the permissions are listed.

  Returns:
    The user's answer, stripped and lower-cased: one of "n", "no", "y", "yes".
    The question is repeated until a valid answer is entered.
  """
  valid_responses = ("n", "no", "y", "yes")
  with output.use_tags('remove_outputs'):
    print_output(f"{message} at the project: {project_id}")
    print_output("\n".join(sorted(permissions)))
    print_output("Do you want to create a custom role without these permissions (Y/n)")
    # Normalize before validating AND before returning. Callers compare the
    # result against lower-case literals, so returning the raw text made an
    # answer such as "N" or "No" behave like "yes".
    user_response = input().strip().lower()
    while user_response not in valid_responses:
      print_output("Please enter a valid response.")
      user_response = input().strip().lower()
  return user_response

def create_cusom_role(project_id : str, needed_permissions):
  testable_permissions = get_testable_permissions(project_id)
  not_granted_permissions = needed_permissions - {
      p["name"] for p in testable_permissions}
  if not_granted_permissions:
    response = user_interactions(project_id, not_granted_permissions, 
                        "The following permissions cannot be granted")
    if response in ("n", "no"):
      print_output("We cannot create a custom role for not-grantable permissions.")
      print_output("skipping the custom role creation process.")
      print_output("Good Bye!")
      return
  # Remoing the not granted permissions.
  needed_permissions = needed_permissions - not_granted_permissions

  permissions_not_supported_by_custom_roles = needed_permissions.intersection({
    p["name"] for p in testable_permissions
    if p.get("customRolesSupportLevel", "SUPPORTED") == "NOT_SUPPORTED"
    })
  
  if permissions_not_supported_by_custom_roles:
    response = user_interactions(project_id, permissions_not_supported_by_custom_roles,
                      "The following permissions are not supported for creating a custom role")
    if response in ("n", "no"):
      print_output("We cannot create a custom role for permissions not supported for custom roles.")
      print_output("skipping the custom role creation process.")
      print_output("Good Bye!")
      return

  # Removing the not supported permissions
  needed_permissions = needed_permissions - permissions_not_supported_by_custom_roles

  permissions_in_test_mode_for_custom_roles = needed_permissions.intersection({
    p["name"] for p in testable_permissions
    if p.get("customRolesSupportLevel", "SUPPORTED") == "TESTING"
    })
  
  if permissions_in_test_mode_for_custom_roles:
    response = user_interactions(project_id, permissions_in_test_mode_for_custom_roles,
                      "The following permissions are in test mode and may not be backward compatible")
    if response in ("n", "no"):
      print_output("We are going to create a custom role for permissions in test mode. Be aware that they may not be backward compatible.")
    else:
      needed_permissions = needed_permissions - permissions_in_test_mode_for_custom_roles

  needed_permissions_string = ",".join(needed_permissions)

  !gcloud iam roles create \
    $role_id_of_custom_role \
    --project=$project_to_create_custom_role \
    --title=$title_of_custom_role \
    --description="$description" \
    --permissions=$needed_permissions_string \
    --stage=$launch_stage
  print_output("Please go to the Pantheon for further inspection of the role. ")
  print_output(f"https://pantheon.corp.google.com/iam-admin/roles/details/projects%3C{project_id}%3Croles%3C{role_id_of_custom_role}?project={project_id}")

In [None]:
create_cusom_role(project_to_create_custom_role, set(needed_permissions))

In [None]:
import time
import sys
from google.colab import output

# Demonstrates tagged cell output: lines written under a tag can be removed
# later without touching the rest of the cell's output.
print('Starting.')

with output.use_tags('some_outputs'):
  for progress_line in ('working....\n', 'still working...\n'):
    sys.stdout.write(progress_line)
    sys.stdout.flush()
    time.sleep(2)

# Now clear the previous outputs.
output.clear(output_tags='some_outputs')
print('All done!')

In [None]:
create_cusom_role("gcplearning-230005", set(needed_permissions))

In [None]:
# NOTE(review): `t` is not defined in any cell visible in this notebook; this
# looks like a leftover scratch cell -- confirm before running it.
df = pd.DataFrame(t.values())

In [None]:
# NOTE(review): scratch command. The `${needed_permissions` on the
# --permissions line is never closed with `}`, so the substitution looks
# malformed -- confirm the intended value before running this cell.
!gcloud iam roles create \
 test-role \
 --project="gcplearning-230005" \
 --title="" \
 --description="" \
 --permissions=${needed_permissions \
 --stage="ALPHA"

## Use IAM Role APIs to get all predefined roles

In [None]:
def get_role_permission_mapping(role):
  """Describe one role and return its permissions as a JSON string.

  Args:
    role: Role name passed to `gcloud iam roles describe`.

  Returns:
    A JSON object string mapping `role` to the list found under
    `includedPermissions` in the description (an empty list when absent).
  """
  role_description = execute_command(
      f"gcloud iam roles describe {role} --format=json")
  return json.dumps({role: role_description.get("includedPermissions", [])})

In [None]:
def get_role_names():
  """Return the `name` of every role listed by `gcloud iam roles list`."""
  listed_roles = execute_command("gcloud iam roles list --format=json")
  return [listed_role["name"] for listed_role in listed_roles]

In [None]:
# Names of all roles returned by `gcloud iam roles list`.
predefined_roles = get_role_names()

# Describe every role and write one JSON object per line ({role: permissions})
# as text under `output_location` with the prefix "role_to_permission".
with beam.Pipeline(options=options) as pipeline:
  role_permission_mappings = (
      pipeline
      | beam.Create(predefined_roles)
      | beam.Map(get_role_permission_mapping)
      | beam.io.WriteToText(os.path.join(output_location, "role_to_permission"))
  )

## Find a set of predefined roles



In [None]:
def get_all_role_permission_mappings():
  !gsutil cp -r $output_location/role_to_permission-00000-of-00001 /tmp
  basic_roles = {"roles/owner", "roles/editor", "roles/viewer"}
  role_permission_mapping = {}
  with open("/tmp/role_to_permission-00000-of-00001") as f:
    for line in f:
      for role, permission in json.loads(line).items():
        if role in basic_roles:
          continue
        role_permission_mapping[role] = set(permission)
  return role_permission_mapping

In [None]:
role_permission_mapping = get_all_role_permission_mappings()

In [None]:
class RoleCandidate:
  """A role scored by how well it covers the permissions still needed.

  Candidates sort by: more needed permissions granted first, then fewer
  permissions granted overall, then name. `min(candidates)` is therefore the
  best next pick.

  Attributes are plain and mutable; `needed_permissions_granted` is updated in
  place as the selection progresses.
  """

  def __init__(self, needed_permissions_granted, granted_permissons, name):
    # name: role name.
    # granted_permissons: total number of permissions the role grants.
    # needed_permissions_granted: how many still-needed permissions it grants.
    self.name = name
    self.granted_permissons = granted_permissons
    self.needed_permissions_granted = needed_permissions_granted

  def _sort_key(self):
    # Negated coverage so that higher coverage sorts first.
    return (-self.needed_permissions_granted, self.granted_permissons, self.name)

  def __repr__(self):
    return self.name

  def __lt__(self, other):
    if not isinstance(other, RoleCandidate):
      return NotImplemented
    return self._sort_key() < other._sort_key()

  def __eq__(self, other):
    # Same key as __lt__, so ordering and equality cannot disagree; foreign
    # types compare unequal instead of raising AttributeError.
    if not isinstance(other, RoleCandidate):
      return NotImplemented
    return self._sort_key() == other._sort_key()

  # Instances are mutable and compare by value, so they stay unhashable.
  __hash__ = None


class GetRoles(object):
  """Greedy selection of roles that together cover a set of permissions."""

  def __init__(self, needed_permissions, role_to_permissions, max_number_role):
    """Set up the selection.

    Args:
      needed_permissions: Iterable of permission names to cover (copied).
      role_to_permissions: Mapping of role name to the permissions it grants.
      max_number_role: Maximum number of roles `get_roles` may return.
    """
    self.needed_permissions = set(needed_permissions)
    self.role_to_permissions = role_to_permissions
    self.max_number_role = max_number_role
    self.candidates = self.get_candidates()

  def get_roles(self):
    """Pick roles greedily until the permissions are covered.

    Each round takes the candidate that covers the most still-needed
    permissions (ties: fewest granted permissions, then name).

    Returns:
      The names of the selected roles, in selection order. Selection stops
      when `max_number_role` roles were picked, nothing is left to cover, no
      candidate is left, or no remaining candidate covers any needed
      permission. What could not be covered stays in `self.needed_permissions`.
    """
    roles = []
    while (len(roles) < self.max_number_role
           and self.needed_permissions
           and self.candidates):
      optimal_role = min(self.candidates)
      # A role that adds no coverage would only widen access; stop instead of
      # padding the result with useless roles.
      if optimal_role.needed_permissions_granted == 0:
        break
      roles.append(optimal_role.name)
      self.update_needed_permissions(optimal_role)
      self.update_candidates(optimal_role)
    return roles

  def get_candidates(self):
    """Build one scored candidate per role in `role_to_permissions`."""
    return [
        RoleCandidate(
            len(self.needed_permissions.intersection(granted_permissions)),
            len(granted_permissions),
            role)
        for role, granted_permissions in self.role_to_permissions.items()]

  def update_needed_permissions(self, optimal_role):
    """Remove the permissions granted by `optimal_role` from the needed set."""
    self.needed_permissions = self.needed_permissions.difference(
        self.role_to_permissions[optimal_role.name])

  def update_candidates(self, optimal_role):
    """Drop the chosen role and re-score the rest against what is still needed."""
    self.candidates.remove(optimal_role)
    for candidate in self.candidates:
      candidate.needed_permissions_granted = len(
          self.needed_permissions.intersection(
              self.role_to_permissions[candidate.name]))

In [None]:
g = GetRoles(needed_permissions, role_permission_mapping, 5)

In [None]:
d = g.get_roles()

In [None]:
g.needed_permissions

In [None]:
len(needed_permissions)

### Create a custom role using the needed permissions 

In [None]:
# NOTE(review): `all_roles` is not defined in any cell visible in this
# notebook; this looks like a leftover scratch cell -- confirm before running.
all_roles.remove("roles/owner")
all_roles.remove("roles/editor")
all_roles.remove("roles/viewer") 

In [None]:
!gsutil mkdir gs://xiangwa-playground/misabhishek-iam-curated-role

In [None]:
ls

In [None]:
! gsutil ls gs://xiangwa-playground/misabhishek-iam-curated-role/

In [None]:
# NOTE(review): `permission_ids` is not defined in any cell visible in this
# notebook -- confirm where it is supposed to come from.
len(permission_ids)