In [None]:
# Import required libraries for Snowflake connectivity and data manipulation
import os

import pandas as pd
import snowflake.connector

In [None]:
def run_query_(query, snowflakeaccount, snowflakeuser, snowflakepassword,
               warehouse='COMPUTE_WH', params=None, disable_ocsp_checks=True):
    """
    Execute a SQL query against Snowflake and return the results.

    A fresh connection is opened for every call. Both the connection and
    the cursor are used as context managers, so they are released when the
    call finishes, whether it succeeds or raises.

    Args:
        query (str): SQL query to execute
        snowflakeaccount (str): Snowflake account identifier
        snowflakeuser (str): Username for authentication
        snowflakepassword (str): Password for authentication
        warehouse (str): Warehouse the query runs on. Defaults to
            'COMPUTE_WH', the value that used to be hard-coded.
        params (sequence | dict | None): Optional bind values forwarded to
            ``cursor.execute``. Prefer this over formatting values into
            ``query`` by hand. ``None`` (default) executes ``query`` as-is.
        disable_ocsp_checks (bool): Forwarded to the connector unchanged.
            NOTE(review): the default of True is kept only for backward
            compatibility; it turns off certificate revocation checking,
            so pass False wherever the network allows it.

    Returns:
        list: Query results as a list of tuples
    """
    with snowflake.connector.connect(account=snowflakeaccount,
                                     user=snowflakeuser,
                                     password=snowflakepassword,
                                     warehouse=warehouse,
                                     disable_ocsp_checks=disable_ocsp_checks) as conn:
        with conn.cursor() as cur:
            # Fetch everything before the cursor/connection are closed.
            return cur.execute(query, params).fetchall()

In [None]:
# Snowflake connection parameters
# Read from the environment (SNOWFLAKE_USER / SNOWFLAKE_PASSWORD /
# SNOWFLAKE_ACCOUNT) so real credentials never have to be typed into, or
# committed with, this notebook.
# The literal fallbacks are the values this cell used to hard-code; they look
# like redacted placeholders and are kept only so the cell behaves as before
# when the variables are unset. Do not replace them with real credentials.
user = os.environ.get("SNOWFLAKE_USER", "a")
password = os.environ.get("SNOWFLAKE_PASSWORD", "b?")
account = os.environ.get("SNOWFLAKE_ACCOUNT", "d-c")

In [None]:
# Target database whose grants are turned into policies
database = "POLICYDB"

# DataFrame column names, listed in the same order as the SELECT list of the
# query below (the connector returns plain tuples, so the order must match)
columns = [
    "PRIVILEGE",
    "GRANTED_ON",
    "NAME",
    "TABLE_CATALOG",
    "TABLE_SCHEMA",
    "GRANTED_TO",
    "GRANTEE_NAME",
]

# Active (DELETED_ON is null) SELECT / USAGE / OWNERSHIP grants held by roles
# on the schemas, tables and the database itself, restricted to `database`
query = f"""            select  PRIVILEGE,
                    GRANTED_ON,
                    "NAME",
                    TABLE_CATALOG,
                    TABLE_SCHEMA,
                    GRANTED_TO,
                    GRANTEE_NAME,
            from    SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES
            where   GRANTED_ON IN ('SCHEMA', 'TABLE', 'DATABASE') and
                    PRIVILEGE in ('SELECT', 'USAGE', 'OWNERSHIP') and
                    TABLE_CATALOG = '{database}' and
                    DELETED_ON is null"""

# Run the query and load the returned rows into a DataFrame
results = run_query_(query, account, user, password)
df = pd.DataFrame(data=results, columns=columns)

# Filter for table-level grants only
table_grants = df[df['GRANTED_ON'] == 'TABLE']

# Validate each table grant to ensure proper privilege hierarchy
# In Snowflake, to access a table, you need:
# 1. SELECT/OWNERSHIP on the table
# 2. USAGE/OWNERSHIP on the schema containing the table
# 3. USAGE/OWNERSHIP on the database containing the schema
#
# Pre-compute once which role can reach which schema / database instead of
# rescanning the whole frame for every table row. The GRANTED_ON filter is
# essential: matching on NAME alone would let a table (or schema) that merely
# shares its name with the schema (or database) satisfy the container check.
container_grants = df[df['PRIVILEGE'].isin(["USAGE", "OWNERSHIP"])]
schema_grants = container_grants[container_grants['GRANTED_ON'] == 'SCHEMA']
database_grants = container_grants[container_grants['GRANTED_ON'] == 'DATABASE']

# (catalog, schema name, role) triples with USAGE/OWNERSHIP on the schema
schema_access = set(zip(schema_grants['TABLE_CATALOG'],
                        schema_grants['NAME'],
                        schema_grants['GRANTEE_NAME']))
# (database name, role) pairs with USAGE/OWNERSHIP on the database
database_access = set(zip(database_grants['NAME'],
                          database_grants['GRANTEE_NAME']))

grant_columns = ["catalog", "schema", "table", "role_user", "is_user"]
grant_records = []
seen_grants = set()

for row in table_grants.itertuples():
    table_name = row.NAME
    table_schema = row.TABLE_SCHEMA
    table_catalog = row.TABLE_CATALOG
    role = row.GRANTEE_NAME
    privilege = row.PRIVILEGE
    is_user = row.GRANTED_TO == "USER"

    print(f"Table: {table_name}, Schema: {table_schema}, Catalog: {table_catalog}, Role: {role} Is User: {row.GRANTED_TO == 'USER'}")

    # Only include grants that satisfy the complete privilege hierarchy
    if privilege not in ("SELECT", "OWNERSHIP"):
        continue
    if (table_catalog, table_schema, role) not in schema_access:
        continue
    if (table_catalog, role) not in database_access:
        continue

    # A role holding both SELECT and OWNERSHIP on the same table would
    # otherwise be recorded twice.
    grant_key = (table_catalog, table_schema, table_name, role, is_user)
    if grant_key in seen_grants:
        continue
    seen_grants.add(grant_key)

    grant_records.append({"catalog": table_catalog,
                          "schema": table_schema,
                          "table": table_name,
                          "role_user": role,
                          "is_user": is_user})

# Convert valid grants to DataFrame for easier manipulation.
# Passing `columns` keeps the expected columns even when nothing validated;
# a frame built from an empty list alone has no columns at all, and any later
# `valid_grants['role_user']` lookup would raise KeyError.
valid_grants = pd.DataFrame(grant_records, columns=grant_columns)

# Effective roles per user, resolved inside Snowflake.
# ROLE_MEMBERSHIPS lists role-to-role grants, USER_MEMBERSHIPS lists
# user-to-role grants; the final SELECT left-joins the two.
# NOTE: the single left join follows only ONE level of role-to-role grants,
# so roles reached through longer inheritance chains are not expanded.
# NOTE: when a user's role has roles granted to it, the CASE returns those
# granted roles INSTEAD of the user's own role; the direct assignments are
# added back by the second query further down.
query = """with
   -- CTE gets all the roles each role is granted
   ROLE_MEMBERSHIPS(ROLE_GRANTEE, ROLE_GRANTED_THROUGH_ROLE)
   as
    (
    select   GRANTEE_NAME, "NAME"
    from     SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES
    where    GRANTED_TO = 'ROLE' and
             GRANTED_ON = 'ROLE' and
             DELETED_ON is null
    ),
    -- CTE gets all roles a user is granted
    USER_MEMBERSHIPS(ROLE_GRANTED_TO_USER, USER_GRANTEE, GRANTED_BY)
    as
     (
     select ROLE,
            GRANTEE_NAME,
            GRANTED_BY
     from SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS
     where DELETED_ON is null
     )
-- Get effective roles (including inherited ones) for each user
select
        USER_GRANTEE,
        case
            when ROLE_GRANTED_THROUGH_ROLE is null
                then ROLE_GRANTED_TO_USER 
            else ROLE_GRANTED_THROUGH_ROLE
        end
        EFFECTIVE_ROLE,
from    USER_MEMBERSHIPS U
    left join ROLE_MEMBERSHIPS R
        on U.ROLE_GRANTED_TO_USER = R.ROLE_GRANTEE
;
"""

# User -> effective role pairs produced by the query above
results = run_query_(query, account, user, password)
columns = ["USER_GRANTEE", "EFFECTIVE_ROLE"]
user_roles = pd.DataFrame(data=results, columns=columns)

# Direct user -> role assignments (active grants only)
query = """    select 
            GRANTEE_NAME,
             ROLE,
     from SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS
     where DELETED_ON is null"""

results = run_query_(query, account, user, password)
columns = ["GRANTEE_NAME", "ROLE"]
# Rename to the column names used by `user_roles` so the frames can be stacked
user_roles2 = pd.DataFrame(data=results, columns=columns).rename(
    columns={"GRANTEE_NAME": "USER_GRANTEE", "ROLE": "EFFECTIVE_ROLE"}
)

# Stack inherited and direct assignments and drop exact duplicate pairs
combined_user_roles = pd.concat(
    [user_roles, user_roles2], ignore_index=True
).drop_duplicates()

# Get unique roles that have valid grants.
# A DataFrame built from an empty list has no columns, so guard the lookup
# instead of raising KeyError when no grant passed validation.
if 'role_user' in valid_grants.columns:
    roles = valid_grants['role_user'].unique()
else:
    roles = []
policies = []

# Build policies by mapping roles to their granted tables and associated users
for role in list(roles):
    print(f"Checking role: {role}")
    valid_grants_per_role = valid_grants[valid_grants['role_user'] == role]

    # Collect EVERY table granted to this role. Previously the append happened
    # after this loop, so only the last table of each role reached the policy.
    role_tables = []
    seen_tables = set()
    is_user = False
    for row in valid_grants_per_role.itertuples():
        table_name = row.table
        table_schema = row.schema
        table_catalog = row.catalog
        print(f"Table: {table_name}, Schema: {table_schema}, Catalog: {table_catalog}, Role: {role} Is User: {row.is_user}")

        # Same flag the policy carried before (value of the role's last grant
        # row), now captured explicitly instead of via the leaked loop variable.
        is_user = row.is_user

        table_key = (table_catalog, table_schema, table_name)
        if table_key in seen_tables:
            continue
        seen_tables.add(table_key)
        role_tables.append({
            "table": table_name,
            "schema": table_schema,
            "catalog": table_catalog,
        })

    # Find all users who have this role (directly or through inheritance)
    users_for_this_role = combined_user_roles[combined_user_roles['EFFECTIVE_ROLE'] == role]
    if users_for_this_role.empty:
        # As before, a role nobody holds produces no policy.
        continue
    print(f"Role {role} is effective for user {users_for_this_role['USER_GRANTEE'].values}")

    # Create policy entry if there are tables and users associated with this role
    if role_tables:
        policy = {
            "role": role,
            "tables": role_tables,
            "is_user": is_user,
            "users": users_for_this_role['USER_GRANTEE'].values.tolist()
        }
        policies.append(policy)

In [None]:
# Display the extracted policies
# Each policy is a dict with the keys "role", "tables" (a list of
# {"table", "schema", "catalog"} dicts), "is_user" (True when the grant's
# GRANTED_TO value is 'USER') and "users" (list of user names holding the role)
policies

In [None]:
# Final display of extracted policies for verification
# (shows the same `policies` list as the previous cell)
policies