# RLS Copier for Power BI semantic models
This notebook provides an automated way to copy Row-Level Security (RLS) definitions from a reference semantic model to a target semantic model.

**Key Features**
- Copy role members between semantic models
- Support for role definition replacement
- Automatic backups before modifications
- Dry-run mode for safe testing

**Current Limitations**
- DirectQuery and composite (mixed storage mode) models are not tested yet; the storage-mode check in this notebook currently rejects them.

**Notes**
- ✨ Microsoft removed the XMLA download restriction!
- Full automation now works for both Import and Direct Lake models.

**Author**: Neil Chen

**Version**: 1.5.0

**Date**: 2026-03-01

# PARAMETERS

In [144]:
# Target model (where role members will be copied TO)
MODEL = "RLS_Target"
MODEL_WORKSPACE = "zSandbox_Fabric"

# Reference model (where role members will be copied FROM)
REF_MODEL = "RLS_Ref"
REF_MODEL_WORKSPACE = "zSandbox_Fabric"

# Backup configuration
BACKUP_FOLDER = "Neil"  # Folder name or path (e.g., "parent/child")

# Safety: keep this True for the first run and review the printed plan.
# Switch to False only after the dry-run output looks right, because a real
# run removes roles and role members from the target model.
DRY_RUN = True

# IMPORTS

In [23]:
%pip install -q semantic-link-labs

Note: you may need to restart the kernel to use updated packages.


In [145]:
from dataclasses import dataclass, field
from datetime import datetime, tzinfo
from typing import Literal

import pytz
import pandas as pd
import sempy.fabric as fabric
import notebookutils as nbutl
from sempy_labs.tom import connect_semantic_model
from sempy_labs import get_semantic_model_bim, create_semantic_model_from_bim
from sempy_labs.report import clone_report

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# HELPER FUNCTIONS

In [146]:
# OS helper functions
def _resolve_workspace(workspace: str) -> dict:
    """Look up a workspace and return its name and ID.

    Args:
        workspace: Workspace name or ID, forwarded unchanged to
            ``fabric.resolve_workspace_name_and_id``.

    Returns:
        Dict with the keys ``workspace_name`` and ``workspace_id``.
    """
    name, ws_id = fabric.resolve_workspace_name_and_id(workspace)
    return dict(workspace_name=name, workspace_id=ws_id)

def _resolve_item(item: str, item_type: str, workspace: str = None) -> dict:
    """Look up an item and its workspace.

    Args:
        item: Item name or ID.
        item_type: Item type string passed to the ``fabric`` resolvers
            (this notebook uses "SemanticModel" and "Report").
        workspace: Workspace name or ID.

    Returns:
        Dict with the keys ``item_name``, ``item_id``, ``workspace_name``
        and ``workspace_id``.
    """
    ws = _resolve_workspace(workspace)
    resolved = {
        "item_name": fabric.resolve_item_name(item, item_type, workspace),
        "item_id": fabric.resolve_item_id(item, item_type, workspace),
    }
    # Carry the workspace details along so callers need only one lookup result.
    resolved["workspace_name"] = ws["workspace_name"]
    resolved["workspace_id"] = ws["workspace_id"]
    return resolved

def _resolve_folder(folder: str = None, workspace: str = None) -> dict:
    """Look up a workspace folder and its workspace.

    Args:
        folder: Folder name or path; ``None`` means "no folder".
        workspace: Workspace name or ID.

    Returns:
        Dict with the keys ``folder_path``, ``folder_id``, ``workspace_name``
        and ``workspace_id``. Both folder values are ``None`` when ``folder``
        is ``None``.
    """
    ws = _resolve_workspace(workspace)

    # Default to "no folder"; only query fabric when a folder was requested.
    path, fid = None, None
    if folder is not None:
        path = fabric.resolve_folder_path(folder, workspace)
        fid = fabric.resolve_folder_id(folder, workspace)

    return {
        "folder_path": path,
        "folder_id": fid,
        "workspace_name": ws["workspace_name"],
        "workspace_id": ws["workspace_id"],
    }

def _now_str(timezone: str) -> str:
    """Return the current time in ``timezone`` formatted as ``YYYYMMDDTHHMMSS``.

    Args:
        timezone: Timezone name understood by ``pytz.timezone`` (e.g. "UTC").
    """
    tz = pytz.timezone(timezone)
    current = datetime.now(tz)
    return current.strftime("%Y%m%dT%H%M%S")

# CONFIGURATION CLASSES

In [147]:
@dataclass
class ModelConfig:
    """
    Reference to a semantic model in Fabric.

    On construction the model and its workspace are resolved once through
    ``_resolve_item`` and the resulting names and IDs are stored on the
    instance.

    Args:
        model: Model name or ID.
        workspace: Workspace name or ID.

    Raises:
        ValueError: If ``model`` or ``workspace`` is empty.

    Example:
        model = ModelConfig("Sales_Model", "ModelWorkspace")
    """
    model: str  # Model name or ID
    workspace: str  # Workspace name or ID

    # Auto-populated fields
    workspace_name: str = field(init=False)
    workspace_id: str = field(init=False)
    model_name: str = field(init=False)
    model_id: str = field(init=False)

    def __post_init__(self):
        if not self.model:
            raise ValueError("model name or ID must be provided")
        if not self.workspace:
            raise ValueError("workspace name or ID must be provided")

        # _resolve_item resolves the workspace itself and returns its name/ID
        # next to the item's, so a separate _resolve_workspace call would only
        # repeat the same lookup.
        item_info = _resolve_item(self.model, "SemanticModel", self.workspace)

        self.workspace_name = item_info["workspace_name"]
        self.workspace_id = item_info["workspace_id"]
        self.model_name = item_info["item_name"]
        self.model_id = item_info["item_id"]

In [148]:
@dataclass
class BackupConfig:
    """
    Configuration for model backups.
    
    Args:
        workspace: Where to store backups
        folder: Optional folder (use "parent/child" for nested folders)
        backup_prefix: Prefix for backup names (default: "zBackup")
        timezone: Timezone for timestamps (default: "UTC")
    
    Example:
        backup = BackupConfig("Backups", folder="RLS/Monthly")
    """
    workspace: str
    folder: str | None = None
    backup_prefix: str = "zBackup"
    timezone: str = "UTC"

    # Auto-populated fields
    workspace_name: str = field(init=False)
    workspace_id: str = field(init=False)
    folder_path: str | None = field(init=False)
    folder_id: str | None = field(init=False)

    def __post_init__(self):
        if not self.workspace:
            raise ValueError("workspace name or ID must be provided")

        # Resolve workspace and folder
        workspace_info = _resolve_workspace(self.workspace)
        folder_info = _resolve_folder(self.folder, self.workspace)

        self.workspace_name = workspace_info["workspace_name"]
        self.workspace_id = workspace_info["workspace_id"]
        self.folder_path = folder_info["folder_path"]
        self.folder_id = folder_info["folder_id"]

In [149]:
@dataclass
class RoleMemberConfig:
    """
    Represents a single RLS role member.

    Plain value holder: the fields are stored as given, with no validation.
    The field names match the dict keys produced by ``get_role_members``
    (``role_name``, ``member_name``, ``member_type``).

    Example:
        member = RoleMemberConfig("Sales", "user@company.com", "User")
    """
    role_name: str  # Name of the role the member belongs to
    member_name: str  # Member identifier (the example above uses an e-mail address)
    member_type: str  # "User" or "Group"

# FUNCTIONS

## Read Operations

In [150]:
def get_roles_df(model: ModelConfig, include_members: bool = False) -> pd.DataFrame:
    """
    Read the roles of a semantic model into a DataFrame.

    Args:
        model: Model configuration
        include_members: When True, emit one row per (role, member) pair with
            the extra columns "Member" and "Identity Provider"; a role without
            members then produces no row. When False, emit one row per role.

    Returns:
        DataFrame with the columns "Role", "Description", "Model Permission"
        and "Modified Time" (plus the member columns when requested). An empty
        DataFrame without columns is returned when there is nothing to report.

    Example:
        >>> model = ModelConfig("Sales_Model", "Prod_Workspace")
        >>> roles_df = get_roles_df(model, include_members=True)
        >>> display(roles_df)
    """
    records = []

    with connect_semantic_model(
        dataset=model.model_name,
        workspace=model.workspace_name,
        readonly=True
    ) as tom:
        for role in tom.model.Roles:
            base = {
                "Role": role.Name,
                "Description": role.Description,
                "Model Permission": str(role.ModelPermission),
                "Modified Time": role.ModifiedTime
            }
            if not include_members:
                records.append(base)
                continue

            # One record per member, each repeating the role-level columns.
            records.extend(
                {
                    **base,
                    "Member": member.MemberName,
                    "Identity Provider": member.IdentityProvider
                }
                for member in role.Members
            )

    return pd.DataFrame(records)

In [151]:
def get_role_definition(model: str, workspace: str = None) -> list[dict]:
    """
    Get RLS role definitions (table filters) of a semantic model.

    Args:
        model: Model name or ID
        workspace: Workspace name or ID

    Returns:
        One dict per RLS table permission with the keys "Role", "Table" and
        "FilterExpression". Roles without any table permission do not appear.

    Example:
        >>> definitions = get_role_definition("Sales_Model", "Prod_Workspace")
        >>> for role in definitions:
        ...     print(f"{role['Role']}: {role['FilterExpression']}")
    """
    # Connect to the model the caller asked for. Reading the notebook-level
    # MODEL / MODEL_WORKSPACE here would make every caller see the target
    # model, even when it passes the reference model.
    with connect_semantic_model(dataset=model, workspace=workspace, readonly=True) as tom:
        model_rls = [
            {
                "Role": permission.Role.Name,
                "Table": permission.Table.Name,
                "FilterExpression": permission.FilterExpression
            }
            for permission in tom.all_rls()
        ]
    return model_rls

In [152]:
def get_role_members(model: ModelConfig, output_option: str = 'list') -> list[dict] | pd.DataFrame:
    """
    Read every role member of a semantic model.

    Args:
        model: Model configuration
        output_option: 'df' returns a Pandas DataFrame; any other value
            returns a list of dicts

    Returns:
        One entry per (role, member) pair with: role_name, member_name,
        member_type. The member type is the last dot-separated part of
        ``str(member.MemberType)``.

    Example:
        >>> model = ModelConfig("Sales_Model", "Prod_Workspace")
        >>> members = get_role_members(model)
        >>> display(members)
    """
    with connect_semantic_model(
        dataset=model.model_name,
        workspace=model.workspace_name,
        readonly=True
    ) as tom:
        members = [
            {
                "role_name": role.Name,
                "member_name": member.MemberName,
                "member_type": str(member.MemberType).split(".")[-1]
            }
            for role in tom.model.Roles
            for member in role.Members
        ]

    return pd.DataFrame(members) if output_option == 'df' else members

In [153]:
def get_storage_mode(model_name: str, workspace: str) -> str | list[str]:
    """
    Get the storage mode of a semantic model
    
    Args:
        model_name: Semantic Model Name or Id
        workspace: Semantic Model Workspace Name or Id
      
    Returns:
        String or List of storage mode(s) for tables in a semantic model

    Example:
        >>> model_name, model_workspace = "ModelName", "Model_Workspace"
        >>> storage_mode = get_storage_mode(model_name, model_workspace)
    """
    supported_storage_mode_list = ["Import", "DirectLake"]

    with connect_semantic_model(dataset=model_name, workspace=workspace, readonly=True) as tom:
        storage_mode_list = list({str(p.Mode) for t in tom.model.Tables for p in t.Partitions})

    if len(storage_mode_list) > 1:
        raise ValueError(
            f"Multiple storage modes detected: {str(storage_mode_list)}. Mixed models not supported."
        )
    elif len(storage_mode_list) < 1:
        raise ValueError(
            f"Storage Mode not found."
        )
    else:
        storage_mode = storage_mode_list[0]
        if storage_mode not in supported_storage_mode_list:
            raise ValueError(
                f"Unsupported storage mode: {storage_mode}"
            )
        return storage_mode

## Audit & Comparison

In [154]:
def compare_roles(
    target: ModelConfig,
    reference: ModelConfig,
    include_definition: bool = True
) -> tuple[bool, dict]:
    """
    Compare the roles of the target model with those of the reference model.

    Args:
        target: Target model configuration
        reference: Reference model configuration
        include_definition: When True, compare the RLS definitions returned by
            ``get_role_definition`` role by role; when False, compare role
            names only.

    Returns:
        (has_differences, comparison_dict). ``comparison_dict`` has the keys
        "reference_only", "target_only" and "shared". With definitions, each
        value is a list of ``{"role": ..., "definitions": [...]}`` entries;
        without, each value is a sorted list of role names.
        ``has_differences`` is True when either "*_only" list is non-empty.
    """
    def index_definitions(rows: list[dict]) -> dict[str, dict]:
        """Map role name -> {hashable definition key: definition dict}."""
        by_role = {}
        for row in rows:
            body = {name: value for name, value in row.items() if name != "Role"}
            # Sorted item tuple: a hashable stand-in for the definition dict.
            by_role.setdefault(row["Role"], {})[tuple(sorted(body.items()))] = body
        return by_role

    if not include_definition:
        # Names only: plain set arithmetic on the "Role" columns.
        ref_roles = set(get_roles_df(reference)["Role"])
        target_roles = set(get_roles_df(target)["Role"])

        comparison = {
            "reference_only": sorted(ref_roles - target_roles),
            "target_only": sorted(target_roles - ref_roles),
            "shared": sorted(ref_roles & target_roles)
        }
    else:
        ref_raw = get_role_definition(
            model=reference.model_id,
            workspace=reference.workspace_id
        )
        target_raw = get_role_definition(
            model=target.model_id,
            workspace=target.workspace_id
        )

        ref_index = index_definitions(ref_raw)
        target_index = index_definitions(target_raw)

        comparison = {"reference_only": [], "target_only": [], "shared": []}

        for role in sorted(set(ref_index) | set(target_index)):
            ref_defs = ref_index.get(role, {})
            target_defs = target_index.get(role, {})

            # (bucket, definition keys in that bucket, dict to read them from)
            buckets = (
                ("reference_only", ref_defs.keys() - target_defs.keys(), ref_defs),
                ("target_only", target_defs.keys() - ref_defs.keys(), target_defs),
                ("shared", ref_defs.keys() & target_defs.keys(), ref_defs),
            )
            for bucket, keys, source in buckets:
                if keys:
                    comparison[bucket].append({
                        "role": role,
                        "definitions": [source[key] for key in sorted(keys)]
                    })

    has_differences = bool(
        comparison["reference_only"] or comparison["target_only"]
    )

    return has_differences, comparison

## Modify Operations

In [155]:
def copy_role(
    target_model: ModelConfig,
    ref_model: ModelConfig,
    dry_run: bool = True
) -> list[dict]:
    """
    Copy roles and their RLS table filters from reference to target model.

    Every role of the reference model is created in the target if it is
    missing, including roles that have no table filter. Each non-empty filter
    expression is then applied to its table. Roles are created with model
    permission 'Read'.

    Args:
        target_model: Target model configuration
        ref_model: Reference model configuration
        dry_run: If True, only show what would be copied

    Returns:
        List of role definitions that were (or would be) copied

    Example:
        >>> target_model = ModelConfig("Sales_Model", "Test_WS")
        >>> ref_model = ModelConfig("Ref_Model", "Test_WS")

        >>> # Dry run first
        >>> ref_role_defs = copy_role(target_model, ref_model, dry_run=True)

        >>> # Then execute
        >>> ref_role_defs = copy_role(target_model, ref_model, dry_run=False)
    """
    # Get reference roles
    ref_role_defs = get_role_definition(ref_model.model_id, ref_model.workspace_id)
    ref_roles = get_roles_df(ref_model)

    # get_roles_df returns a column-less DataFrame when there are no roles,
    # so guard before reading the "Role" column.
    ref_role_names = [] if ref_roles.empty else ref_roles["Role"].tolist()

    if dry_run:
        print("DRY RUN - No changes will be made")
        print(f"Would copy {len(ref_roles)} roles ({len(ref_role_defs)} role definitions) from [{ref_model.model_name}] to [{target_model.model_name}]")

        for role in ref_role_defs[:10]:
            print(f"  - {role['Role']}: {role['Table']} - {role.get('FilterExpression', 'N/A')}")
        if len(ref_role_defs) > 10:
            print(f"  ... and {len(ref_role_defs) - 10} more")
        return ref_role_defs

    # Execute copy
    with connect_semantic_model(
        dataset=target_model.model_id,
        workspace=target_model.workspace_id,
        readonly=False
    ) as tom:
        existing_roles = {r.Name for r in tom.model.Roles}

        def ensure_role(role_name: str) -> None:
            """Create the role in the target unless it already exists."""
            if role_name not in existing_roles:
                tom.add_role(role_name=role_name, model_permission='Read')
                existing_roles.add(role_name)

        # Roles without any table filter never appear in ref_role_defs, so
        # create all reference roles by name first; otherwise their members
        # could not be added to the target afterwards.
        for role_name in ref_role_names:
            ensure_role(role_name)

        for role_def in ref_role_defs:
            role_name = role_def['Role']
            ensure_role(role_name)

            if role_def.get('FilterExpression'):
                tom.set_rls(
                    role_name=role_name,
                    table_name=role_def['Table'],
                    filter_expression=role_def['FilterExpression']
                )

    print(f"Copied {len(ref_role_defs)} role definitions from [{ref_model.model_name}] to [{target_model.model_name}]")
    return ref_role_defs

In [156]:
def remove_role(
    model: ModelConfig,
    remove_all: bool = False,
    remove_roles: list[str] = None,
    dry_run: bool = True
) -> list[str]:
    """
    Remove roles from a semantic model.

    WARNING: This removes the roles themselves, not just their members!

    Args:
        model: Model configuration
        remove_all: Remove every role currently in the model
        remove_roles: Specific role names to remove (used only when
            ``remove_all`` is False)
        dry_run: If True, only show what would be removed

    Returns:
        List of role names that were targeted for removal. Per-role failures
        are printed and do not change the returned list. An empty list is
        returned when there is nothing to remove.

    Example:
        >>> model = ModelConfig("Sales_Model", "Production")
        >>> # Dry run first
        >>> removed = remove_role(model, remove_all=True, dry_run=True)
        >>> # Then execute
        >>> removed = remove_role(model, remove_all=True, dry_run=False)
    """
    # Work out the target list; None means "nothing to do".
    if remove_all:
        existing = get_roles_df(model)
        targets = None if existing.empty else existing["Role"].tolist()
    elif remove_roles:
        targets = remove_roles
    else:
        targets = None

    if targets is None:
        print("No roles to remove")
        return []

    total = len(targets)

    if dry_run:
        print("DRY RUN - No changes will be made")
        print(f"Would remove {total} roles from '{model.model_name}':")
        # Preview at most ten names.
        for name in targets[:10]:
            print(f"  - {name}")
        if total > 10:
            print(f"  ... and {total - 10} more")
        return targets

    print(f"Removing {total} roles from '{model.model_name}'...")
    succeeded = 0

    with connect_semantic_model(
        dataset=model.model_name,
        workspace=model.workspace_name,
        readonly=False
    ) as tom:
        for name in targets:
            try:
                tom.model.Roles.Remove(name)
            except Exception as err:
                # Best effort: report the failure and carry on with the rest.
                print(f"Failed to remove role '{name}': {str(err)}")
            else:
                succeeded += 1

    print(f"Removed {succeeded}/{total} roles")
    return targets

In [157]:
def add_role_members(
    model: ModelConfig,
    add_members: list[dict],
    dry_run: bool = True
) -> list[dict]:
    """
    Add members to roles of a semantic model.

    Args:
        model: Model configuration
        add_members: Members to add (list of dicts with keys: role_name, member_name, member_type)
        dry_run: If True, only show what would be added

    Returns:
        The ``add_members`` list that was (or would be) processed. Per-member
        failures are printed and do not change the returned list. An empty
        list is returned when ``add_members`` is empty.

    Example:
        >>> model = ModelConfig("Sales_Model", "Production")
        >>> members = [
        ...     {"role_name": "Sales", "member_name": "user@company.com", "member_type": "User"}
        ... ]
        >>> # Dry run first
        >>> added = add_role_members(model, members, dry_run=True)
        >>> # Then execute
        >>> added = add_role_members(model, members, dry_run=False)
    """
    if not add_members:
        print("No members to add")
        return []

    total = len(add_members)

    if dry_run:
        print("DRY RUN - No changes will be made")
        print(f"Would add {total} members to '{model.model_name}':")

        # Preview at most ten entries.
        for entry in add_members[:10]:
            print(f"  + {entry['role_name']}: {entry['member_name']} ({entry['member_type']})")

        if total > 10:
            print(f"  ... and {total - 10} more")

        return add_members

    print(f"Adding {total} members to '{model.model_name}'...")
    succeeded = 0

    with connect_semantic_model(
        dataset=model.model_name,
        workspace=model.workspace_name,
        readonly=False
    ) as tom:
        for entry in add_members:
            try:
                tom.add_role_member(
                    role_name=entry["role_name"],
                    member=entry["member_name"],
                    role_member_type=entry["member_type"]
                )
            except Exception as err:
                # Best effort: report the failure and carry on with the rest.
                print(f"Failed to add {entry['member_name']}: {str(err)}")
            else:
                succeeded += 1

    print(f"Added {succeeded}/{total} members")
    return add_members

In [158]:
def remove_role_members(model: ModelConfig, remove_all: bool = False, remove_members: list[RoleMemberConfig | dict] = None, dry_run: bool = True) -> list[dict]:
    """
    Remove role members from a semantic model.

    Args:
        model: Model configuration
        remove_all: Remove all members currently in the model
        remove_members: Specific members to remove (used only when
            ``remove_all`` is False). Each entry may be a ``RoleMemberConfig``
            or a dict with the keys role_name, member_name, member_type.
        dry_run: If True, only show what would be removed

    Returns:
        List of member dicts that were (or would be) targeted for removal.
        Per-member failures are printed and do not change the returned list.
        An empty list is returned when there is nothing to remove.

    Example:
        >>> model = ModelConfig("Sales_Model", "Production")
        >>> # Dry run first
        >>> removed = remove_role_members(model, remove_all=True, dry_run=True)
        >>> # Then execute
        >>> removed = remove_role_members(model, remove_all=True, dry_run=False)
    """
    # Determine what to remove
    if remove_all:
        to_remove = get_role_members(model, output_option="list")
    elif remove_members:
        # The code below subscripts entries like dicts, so convert
        # RoleMemberConfig-style objects (attribute access) to dicts first.
        to_remove = [
            m if isinstance(m, dict) else {
                "role_name": m.role_name,
                "member_name": m.member_name,
                "member_type": m.member_type
            }
            for m in remove_members
        ]
    else:
        to_remove = []

    # Also covers remove_all on a model without members: no need to open a
    # writable connection for zero removals.
    if not to_remove:
        print("No members to remove")
        return []

    if dry_run:
        print("DRY RUN - No changes will be made")
        print(f"Would remove {len(to_remove)} members from '{model.model_name}':")

        for m in to_remove[:10]:  # Show first 10
            print(f"  - {m['role_name']}: {m['member_name']} ({m['member_type']})")

        if len(to_remove) > 10:
            print(f"  ... and {len(to_remove) - 10} more")

        return to_remove

    # Execute removal
    print(f"Removing {len(to_remove)} members from '{model.model_name}'...")
    removed_count = 0

    with connect_semantic_model(
        dataset=model.model_name,
        workspace=model.workspace_name,
        readonly=False
    ) as tom:
        for m in to_remove:
            try:
                tom.remove_role_member(
                    role_name=m["role_name"],
                    member=m["member_name"]
                )
                removed_count += 1
            except Exception as e:
                # Best effort: report the failure and carry on with the rest.
                print(f"Failed to remove {m['member_name']}: {str(e)}")

    print(f"Removed {removed_count}/{len(to_remove)} members")
    return to_remove

## Backup Operations

In [159]:
def move_items_to_folder(
    item_ids: list[str],
    folder: str = None,
    workspace: str = None,
    dry_run: bool = True
) -> dict | None:
    """
    Move items to a folder in bulk.
    
    Args:
        item_ids: List of item IDs to move
        folder: Folder name or path (e.g., "Backups" or "parent/child")
        workspace: Workspace name or ID
        dry_run: If True, only show what would be moved
        
    Returns:
        API response dict (or None if dry_run)
        
    Example:
        >>> model_id = "abc-123"
        >>> report_id = "def-456"
        >>> # Dry run first
        >>> result = move_items_to_folder(
        ...     item_ids=[model_id, report_id],
        ...     folder="Backups",
        ...     workspace="Production",
        ...     dry_run=True
        ... )
        >>> # Then execute
        >>> result = move_items_to_folder(
        ...     item_ids=[model_id, report_id],
        ...     folder="Backups",
        ...     workspace="Production",
        ...     dry_run=False
        ... )
    """
    
    if not item_ids:
        print("No items to move")
        return None
    
    # Resolve environment
    try:
        folder_info = _resolve_folder(folder, workspace)
        folder_exists = True
    except Exception as e:
        folder_exists = False
        # If dry_run, just note that folder would be created
        if dry_run:
            workspace_info = _resolve_workspace(workspace)
            workspace_name = workspace_info["workspace_name"]
            folder_path = f"/{folder}" if folder else ""
        else:
            # Only create folder when actually executing
            print(f"Creating folder '{folder}'...")
            folder_id = fabric.create_folder(folder, workspace, recursive=True)
            folder_info = _resolve_folder(folder, workspace)
            folder_exists = True
    
    if folder_exists:
        workspace_id = folder_info["workspace_id"]
        workspace_name = folder_info["workspace_name"]
        folder_id = folder_info["folder_id"]
        folder_path = folder_info["folder_path"] or ""
    
    if dry_run:
        print("DRY RUN - No changes will be made")
        if not folder_exists:
            print(f"Folder '{folder}' does not exist - would be created")
        print(f"Would move {len(item_ids)} items to [{workspace_name}{folder_path}]")
        print(f"Items to move:")
        for item_id in item_ids[:9]:  # Show first 10
            print(f"  - {item_id}")
        if len(item_ids) > 10:
            print(f"  ... and {len(item_ids) - 10} more")
        return None
    
    # Execute move
    print(f"Moving {len(item_ids)} items to [{workspace_name}{folder_path}]...")
    
    payload = {'items': item_ids}
    if folder_id:
        payload['targetFolderId'] = folder_id
    
    try:
        client = fabric.FabricRestClient()
        response = client.post(f"v1/workspaces/{workspace_id}/items/bulkMove", json=payload)
        print(f"Moved {len(item_ids)} items to [{workspace_name}{folder_path}]")
        return response.json()
    except Exception as e:
        print(f"Failed to move items: {str(e)}")
        return None

In [160]:
def backup_model(
    model_cfg: ModelConfig,
    backup_cfg: BackupConfig,
    storage_mode: Literal["Import", "DirectLake"],
    dry_run: bool = True
) -> dict | None:
    """
    Create a backup of a semantic model (and report if Import mode).

    The backup is named ``<backup_prefix>_<model_name>_<timestamp>``. The
    model is exported as BIM and re-created under that name in the backup
    workspace. For "Import" a report is cloned under the same name and bound
    to the backup model. When ``backup_cfg.folder`` is set, the created items
    are then moved into that folder.

    Any exception raised during these steps is caught, printed as
    "Backup failed: ..." and turned into a ``None`` return value, so callers
    must check the result to know whether a backup exists.

    Args:
        model_cfg: Model to backup
        backup_cfg: Backup configuration
        storage_mode: 'Import' or 'DirectLake'
        dry_run: If True, only show what would be backed up

    Returns:
        Dict with the keys backup_model_id, backup_model_name,
        backup_report_id, backup_report_name, backup_workspace_id,
        backup_workspace_name and backup_folder; None for a dry run or when
        the backup failed. The report values are None unless a report was
        cloned.

    Example:
        >>> model = ModelConfig("Sales_Model", "Prod_WS")
        >>> backup = BackupConfig("Backups", folder="Monthly")
        >>> # Dry run first
        >>> result = backup_model(model, backup, storage_mode="Import", dry_run=True)
        >>> # Then execute
        >>> result = backup_model(model, backup, storage_mode="Import", dry_run=False)
        >>> print(f"Backup created: {result['backup_model_name']}")
    """

    # Prepare backup name
    timestamp = _now_str(backup_cfg.timezone)
    backup_name = f"{backup_cfg.backup_prefix}_{model_cfg.model_name}_{timestamp}"

    if dry_run:
        # Describe the planned backup without touching anything.
        print("DRY RUN - No changes will be made")
        print(f"Would create backup:")
        print(f"Source: {model_cfg.model_name}")
        print(f"Backup name: {backup_name}")
        print(f"Workspace: {backup_cfg.workspace_name}")
        if backup_cfg.folder_path:
            print(f"Folder: {backup_cfg.folder_path}")
        print(f"Storage mode: {storage_mode}")
        if storage_mode == "Import":
            print(f"Items: Semantic model + Report")
        else:
            print(f"Items: Semantic model only")
        return None

    print(f"Creating backup: '{model_cfg.model_name}' -> '{backup_name}'...")

    try:
        # Export semantic model as BIM
        print(f"Exporting BIM from '{model_cfg.model_name}'...")
        bim_file = get_semantic_model_bim(
            dataset=model_cfg.model_name,
            workspace=model_cfg.workspace_id
        )

        # Create backup semantic model
        print(f"Creating backup semantic model...")
        create_semantic_model_from_bim(
            dataset=backup_name,
            bim_file=bim_file,
            workspace=backup_cfg.workspace_id
        )

        # Resolve backup model ID
        backup_model_info = _resolve_item(backup_name, "SemanticModel", backup_cfg.workspace_id)
        backup_model_id = backup_model_info["item_id"]
        # IDs of everything created so far; handed to the folder move below.
        backup_items=[backup_model_id]

        # Clone report (Import mode only)
        backup_report_id = None
        backup_report_name = None

        if storage_mode == "Import":
            print(f"Cloning report...")
            # NOTE(review): the source report is looked up by the model's
            # name, so this assumes a report with that name exists in the
            # model's workspace - confirm.
            clone_report(
                report=model_cfg.model_name,
                cloned_report=backup_name,
                workspace=model_cfg.workspace_id,
                target_workspace=backup_cfg.workspace_id,
                target_dataset=backup_name,
                target_dataset_workspace=backup_cfg.workspace_id
            )
            backup_report_info = _resolve_item(backup_name, "Report", backup_cfg.workspace_id)
            backup_report_id = backup_report_info["item_id"]
            backup_report_name = backup_name
            backup_items.append(backup_report_id)

        # Move to folder
        if backup_cfg.folder:
            # folder_path may be unset; fall back to the requested folder for the message.
            print(f"Moving to folder '{backup_cfg.folder_path or backup_cfg.folder}'...")
            move_items_to_folder(
                item_ids=backup_items,
                folder=backup_cfg.folder,
                workspace=backup_cfg.workspace_id,
                dry_run=False
            )

        print(f"Backup created: {backup_name}")

        return {
            "backup_model_id": backup_model_id,
            "backup_model_name": backup_name,
            "backup_report_id": backup_report_id,
            "backup_report_name": backup_report_name,
            "backup_workspace_id": backup_cfg.workspace_id,
            "backup_workspace_name": backup_cfg.workspace_name,
            "backup_folder": backup_cfg.folder_path
        }

    except Exception as e:
        # Failure is reported, not raised: the caller sees None.
        print(f"Backup failed: {str(e)}")
        return None

## Main function

In [161]:
def copy_rls(
    target: ModelConfig,
    reference: ModelConfig,
    backup_folder: str = None,
    replace_roles: bool = True,
    timezone: str = "UTC",
    dry_run: bool = True
) -> str:
    """
    Main function to copy RLS role members from reference to target model.

    Workflow:
        1. Create backup (only described when dry_run)
        2. Compare roles (validate compatibility)
        3. Replace role definitions if requested and they differ
        4. Remove existing members from target
        5. Copy members from reference to target

    Args:
        target: Target model configuration
        reference: Reference model configuration
        backup_folder: Optional folder for backups
        replace_roles: If True and the role definitions differ, remove all
            target roles and re-create them from the reference
        timezone: Timezone for backup timestamps
        dry_run: If True, simulate without making changes

    Returns:
        Status message

    Raises:
        ValueError: If the target model's storage mode is not supported
            (raised by ``get_storage_mode``).
        RuntimeError: If the backup fails during a real run; nothing is
            changed in the target model in that case.

    Example:
        >>> target = ModelConfig("Sales_Prod", "Production")
        >>> reference = ModelConfig("RLS_Template", "Templates")
        >>> # Always dry run first!
        >>> result = copy_rls(target, reference, replace_roles=True, dry_run=True)
        >>> print(result)
        >>> # Then execute
        >>> result = copy_rls(target, reference, replace_roles=True, dry_run=False)
    """
    # Resolve target model storage mode
    target_storage_mode = get_storage_mode(
        target.model_id,
        target.workspace_id
    )

    print(f"RLS ROLE MEMBER COPIER - {'DRY RUN' if dry_run else 'EXECUTING'}")

    print("\n" + "="*70 + "\n")

    print(f"Reference model: {reference.model_name}")
    print(f"Target model: {target.model_name}")
    print(f"Target model storage Mode: {target_storage_mode}")
    print(f"Replace Roles: {replace_roles}")

    print("\n" + "="*70 + "\n")

    # Step 1: Backup (if executing)
    print("STEP 1: Creating backup...")
    backup_cfg = BackupConfig(
        workspace=target.workspace,
        folder=backup_folder,
        timezone=timezone
    )
    backup_result = backup_model(target, backup_cfg, target_storage_mode, dry_run=dry_run)

    # backup_model returns None when the backup failed (and for dry runs).
    # The steps below delete roles and members, so never run them for real
    # without a backup to fall back on.
    if not dry_run and backup_result is None:
        raise RuntimeError(
            f"Backup of '{target.model_name}' failed - aborting before any changes are made"
        )

    print("\n" + "="*70 + "\n")

    # Step 2: Compare roles
    print("STEP 2: Comparing roles...")
    has_diff, role_comparison = compare_roles(target, reference)

    if has_diff:
        print("WARNING: Role definitions don't match!")
        print(f"  Reference only: {role_comparison['reference_only']}")
        print(f"  Target only: {role_comparison['target_only']}")
    else:
        print("Roles match")

    print("\n" + "="*70 + "\n")

    # Step 3: Replace roles
    if replace_roles and has_diff:
        print("STEP 3: Replacing roles...")
        remove_role(model=target, remove_all=True, dry_run=dry_run)
        copy_role(target_model=target, ref_model=reference, dry_run=dry_run)
    else:
        print("STEP 3: Skip replacing roles...")

    print("\n" + "="*70 + "\n")

    # Step 4: Remove existing members
    print("STEP 4: Removing existing members from target...")
    remove_role_members(target, remove_all=True, dry_run=dry_run)

    print("\n" + "="*70 + "\n")

    # Step 5: Copy members
    print("STEP 5: Copying members from reference to target...")
    ref_members = get_role_members(reference, output_option='list')
    add_role_members(model=target, add_members=ref_members, dry_run=dry_run)

    print("\n" + "="*70 + "\n")

    # Summary
    if dry_run:
        output_message = "DRY RUN complete - Review output above"
    else:
        output_message = f"SUCCESS - Copied row-level security configuration from '{reference.model_name}' to '{target.model_name}'"

    print(output_message)
    return output_message

# MAIN

In [162]:
# Resolve both models up front; ModelConfig raises ValueError if a model or
# workspace value is empty.
target_model = ModelConfig(model=MODEL, workspace=MODEL_WORKSPACE)
ref_model = ModelConfig(model=REF_MODEL, workspace=REF_MODEL_WORKSPACE)

# replace_roles and timezone are not passed, so copy_rls uses its defaults
# (True and "UTC"). `result` is the status message used by the output cell.
result = copy_rls(
    target=target_model, 
    reference=ref_model, 
    backup_folder=BACKUP_FOLDER, 
    dry_run=DRY_RUN
)

RLS ROLE MEMBER COPIER - DRY RUN


Reference model: RLS_Ref
Target model: RLS_Target
Target model storage Mode: Import
Replace Roles: True


STEP 1: Creating backup...
DRY RUN - No changes will be made
Would create backup:
Source: RLS_Target
Backup name: zBackup_RLS_Target_20260301T124500
Workspace: zSandbox_Fabric
Folder: /Neil
Storage mode: Import
Items: Semantic model + Report


STEP 2: Comparing roles...
Roles match


STEP 3: Skip replacing roles...


STEP 4: Removing existing members from target...
DRY RUN - No changes will be made
Would remove 2 members from 'RLS_Target':
  - PKG_COM_ALL: T1-Admin@detmoldgroup.com (Group)
  - PKG_ALL: T1-Admin@detmoldgroup.com (Group)


STEP 5: Copying members from reference to target...
DRY RUN - No changes will be made
Would add 2 members to 'RLS_Target':
  + PKG_COM_ALL: T1-Admin@detmoldgroup.com (Group)
  + PKG_ALL: T1-Admin@detmoldgroup.com (Group)


DRY RUN complete - Review output above


# Output

In [163]:
# Check if notebook is interactive session
# (reads the "isForInteractive" flag from the notebookutils runtime context)
interactive_run = nbutl.runtime.context["isForInteractive"]

if interactive_run:
    # Interactive session: show the status message in the cell output.
    print(result)
else:
    # Otherwise end the notebook run, passing the status message to notebook.exit.
    nbutl.notebook.exit(result)

DRY RUN complete - Review output above
