# RBAC with RAG
This code accompanies the blog [RBAC with RAG - Best of Friends](https://www.elastic.co/search-labs/blog/rbac-and-rag-best-friends)

It is a simple demonstration of how users assigned to different groups are able to query the same index pattern, but only retrieve documents they should have access to.

Indices:
- `rbac_rag_demo-data_public` contains data that is not restricted
- `rbac_rag_demo-data_senstive` contains data is is restricted to only managers

Users:
- `engineer_role` will have access to the `rbac_rag_demo-data_public` index

> Add blockquote


- `manager_role` will have access to both `rbac_rag_demo-data_public` and `rbac_rag_demo-data_sensitive` indices

# Environment setup

## Install and import required python libraries

In [6]:
!pip install elasticsearch python-dotenv

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0mLooking in indexes: http://mirrors.aliyun.com/pypi/simple/
[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0m

In [7]:
from elasticsearch import Elasticsearch
from IPython.display import HTML, display
from pprint import pprint
from dotenv import load_dotenv
import os, json

# Elasticsearch Setup

## Create elasticsearch connection for index and user setup

In [15]:
load_dotenv()

ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")

url = f"https://{ES_USER}:{ES_PASSWORD}@{ES_ENDPOINT}:9200"
print(url)

es = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
print(es.info())

https://elastic:VDMlz5QnM_0g-349fFq7@localhost:9200
{'name': 'liuxgm.local', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'xuoGAd4TQ-urrB6jPMAWpg', 'version': {'number': '8.13.2', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '16cc90cd2d08a3147ce02b07e50894bc060a4cbf', 'build_date': '2024-04-05T14:45:26.420424304Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


## Delete demo indices if they previously existed

In [16]:
# Delete indices
def delete_indices():
    try:
        es.indices.delete(index="rbac_rag_demo-data_public")
        print("Deleted index: rbac_rag_demo-data_public")
    except Exception as e:
        print(f"Error deleting index rbac_rag_demo-data_public: {str(e)}")

    try:
        es.indices.delete(index="rbac_rag_demo-data_sensitive")
        print("Deleted index: rbac_rag_demo-data_sensitive")
    except Exception as e:
        print(f"Error deleting index rbac_rag_demo-data_sensitive: {str(e)}")


delete_indices()

Error deleting index rbac_rag_demo-data_public: NotFoundError(404, 'index_not_found_exception', 'no such index [rbac_rag_demo-data_public]', rbac_rag_demo-data_public, index_or_alias)
Error deleting index rbac_rag_demo-data_sensitive: NotFoundError(404, 'index_not_found_exception', 'no such index [rbac_rag_demo-data_sensitive]', rbac_rag_demo-data_sensitive, index_or_alias)


## Create and load data into indices


In [18]:
# Create indices
def create_indices():
    # Create data_public index
    es.indices.create(
        index="rbac_rag_demo-data_public",
        ignore=400,
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {"properties": {"info": {"type": "text"}}},
        },
    )

    # Create data_sensitive index
    es.indices.create(
        index="rbac_rag_demo-data_sensitive",
        ignore=400,
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {
                "properties": {
                    "document": {"type": "text"},
                    "confidentiality_level": {"type": "keyword"},
                }
            },
        },
    )


# Populate sample data
def populate_data():
    # Public HR information
    public_docs = [
        {"title": "Annual leave policies updated.", "confidentiality_level": "low"},
        {"title": "Remote work guidelines available.", "confidentiality_level": "low"},
        {
            "title": "Health benefits registration period starts next month.",
            "confidentiality_level": "low",
        },
    ]
    for doc in public_docs:
        es.index(index="rbac_rag_demo-data_public", document=doc)

    # Sensitive HR information
    sensitive_docs = [
        {
            "title": "Executive compensation details Q2 2024.",
            "confidentiality_level": "high",
        },
        {
            "title": "Bonus payout structure for all levels.",
            "confidentiality_level": "high",
        },
        {
            "title": "Employee stock options plan details.",
            "confidentiality_level": "high",
        },
    ]
    for doc in sensitive_docs:
        es.index(index="rbac_rag_demo-data_sensitive", document=doc)


create_indices()
populate_data()

  es.indices.create(
  es.indices.create(


## Create two users with different access levels


In [20]:
# Create roles
def create_roles():
    # Role for the engineer
    es.security.put_role(
        name="engineer_role",
        body={
            "indices": [
                {"names": ["rbac_rag_demo-data_public"], "privileges": ["read"]}
            ]
        },
    )

    # Role for the manager
    es.security.put_role(
        name="manager_role",
        body={
            "indices": [
                {
                    "names": [
                        "rbac_rag_demo-data_public",
                        "rbac_rag_demo-data_sensitive",
                    ],
                    "privileges": ["read"],
                }
            ]
        },
    )


# Create users with respective roles
def create_users():
    # User 'engineer'
    es.security.put_user(
        username="engineer",
        body={
            "password": "password123",
            "roles": ["engineer_role"],
            "full_name": "Engineer User",
        },
    )

    # User 'manager'
    es.security.put_user(
        username="manager",
        body={
            "password": "password123",
            "roles": ["manager_role"],
            "full_name": "Manager User",
        },
    )


create_roles()
create_users()

# Test how security roles affect ability to query data

## Create helper functions

Helper functions to query for each user

and some output formatting

In [22]:
"""
def get_es_connection(cid, username, password):
    return Elasticsearch(cloud_id=cid, basic_auth=(username, password))
"""

def get_es_connection(username, password):
    url = f"https://{username}:{password}@{ES_ENDPOINT}:9200"
    print(url)
    return Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)


def query_index(es, index_name, username):
    try:
        response = es.search(index=index_name, body={"query": {"match_all": {}}})

        # Prepare the message
        results_message = f'Results from querying as <span style="color: orange;">{username}:</span><br>'
        for hit in response["hits"]["hits"]:
            confidentiality_level = hit["_source"].get("confidentiality_level", "N/A")
            index_name = hit.get("_index", "N/A")
            title = hit["_source"].get("title", "No title")

            # Set color based on confidentiality level
            if confidentiality_level == "low":
                conf_color = "lightgreen"
            elif confidentiality_level == "high":
                conf_color = "red"
            else:
                conf_color = "black"

            # Set color based on index name
            if index_name == "rbac_rag_demo-data_public":
                index_color = "lightgreen"
            elif index_name == "rbac_rag_demo-data_sensitive":
                index_color = "red"
            else:
                index_color = "black"  # Default color

            results_message += (
                f'Index: <span style="color: {index_color};">{index_name}</span>\t '
                f'confidentiality level: <span style="color: {conf_color};">{confidentiality_level}</span> '
                f'title: <span style="color: lightblue;">{title}</span><br>'
            )

        display(HTML(results_message))

    except Exception as e:
        print(f"Error accessing {index_name}: {str(e)}")

## Siumlate querying as an "engineer"

In [23]:
index_pattern = "rbac_rag_demo-data*"
print(
    f"Each user will log in with their credentials and query the same index pattern: {index_pattern}\n\n"
)

for user in ["engineer", "manager"]:
    print(f"Logged in as {user}:")

    es_conn = get_es_connection(user, "password123")
    results = query_index(es_conn, index_pattern, user)
    print("\n\n")

Each user will log in with their credentials and query the same index pattern: rbac_rag_demo-data*


Logged in as engineer:
https://engineer:password123@localhost:9200





Logged in as manager:
https://manager:password123@localhost:9200





