# ABAC on Elasticsearch

## Install Prerequisites

In [None]:
! pip install -q -U -r requirements.txt

## Create Deployment

In [37]:
import os
from dotenv import load_dotenv
import requests
import time
from elasticsearch import Elasticsearch

load_dotenv(override=True)

# Read the cloud settings once. A missing variable raises KeyError here instead
# of silently producing a request to "None?template_id=...".
ECH_URL = os.environ["ECH_URL"]
ECH_API_KEY = os.environ["ECH_API_KEY"]

# Tunables for talking to the deployment API.
REQUEST_TIMEOUT_SECONDS = 60   # per HTTP request
POLL_INTERVAL_SECONDS = 30     # pause between status checks
MAX_POLL_ATTEMPTS = 60         # give up after 60 checks (~30 minutes)

payload = {
    "name": "demo-deployment",
    "region": "gcp-us-central1"
}

headers = {
    "Authorization": f"ApiKey {ECH_API_KEY}",
    "Content-Type": "application/json"
}

# Create the deployment and keep the identifiers needed by the later cells.
response = requests.post(
    f"{ECH_URL}?template_id=gcp-general-purpose",
    headers=headers,
    json=payload,
    timeout=REQUEST_TIMEOUT_SECONDS,
)
response.raise_for_status()
created = response.json()
DEPLOYMENT_ID = created["id"]
# The first entry of the creation response's "resources" list carries the
# credentials and cloud id used to build the client below.
CREDENTIALS = created["resources"][0]["credentials"]
CLOUD_ID = created["resources"][0]["cloud_id"]

print("Awaiting deployment build")
# Poll until both Elasticsearch and Kibana report "started"; the for/else
# raises if that never happens within MAX_POLL_ATTEMPTS checks.
for _ in range(MAX_POLL_ATTEMPTS):
    time.sleep(POLL_INTERVAL_SECONDS)
    response = requests.get(
        f"{ECH_URL}/{DEPLOYMENT_ID}",
        headers=headers,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    resources = response.json()["resources"]
    es_status = resources["elasticsearch"][0]["info"]["status"]
    kibana_status = resources["kibana"][0]["info"]["status"]
    print(f"Elasticsearch status: {es_status}, Kibana status: {kibana_status}")
    if es_status == "started" and kibana_status == "started":
        break
else:
    raise TimeoutError(
        f"Deployment {DEPLOYMENT_ID} was not ready after "
        f"{MAX_POLL_ATTEMPTS * POLL_INTERVAL_SECONDS} seconds"
    )
print("Deployment ready")

es = Elasticsearch(cloud_id=CLOUD_ID, basic_auth=(CREDENTIALS["username"], CREDENTIALS["password"]))

Awaiting deployment build
Elasticsearch status: initializing, Kibana status: initializing
Elasticsearch status: initializing, Kibana status: initializing
Elasticsearch status: started, Kibana status: initializing
Elasticsearch status: started, Kibana status: started
Deployment ready


## Load Data

In [38]:
from elasticsearch.helpers import bulk

INDEX_NAME = "nuke_docs"

# Sub-fields of the `attributes` object stored on every document.
attribute_fields = {
    "departments": {"type": "keyword"},
    "training": {"type": "keyword"},
    "min_training": {"type": "integer"},
}

mappings = {
    "properties": {
        "title": {"type": "text"},
        "attributes": {"type": "object", "properties": attribute_fields},
    }
}

# Remove the index if it exists (a 404 response is ignored), then recreate it
# with the mappings above.
es.options(ignore_status=[404]).indices.delete(index=INDEX_NAME)
index_body = {"mappings": mappings}
es.indices.create(index=INDEX_NAME, body=index_body)

def gen_data(path="assets/data.jsonl"):
    """Yield the documents of a JSON Lines file, one stripped line at a time.

    Args:
        path: Location of the JSONL file. Defaults to "assets/data.jsonl".

    Yields:
        str: Each non-blank line with surrounding whitespace removed.
    """
    # Read as UTF-8 explicitly so the result does not depend on the platform's
    # default encoding.
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            doc = line.strip()
            # Skip blank lines (e.g. a trailing newline at end of file) so no
            # empty action is emitted.
            if doc:
                yield doc
            
# Stream every document from gen_data() into the index; the first element of
# the helper's result is what gets reported as the indexed count.
result = bulk(es, gen_data(), index=INDEX_NAME)
indexed_count = result[0]
print(indexed_count, "documents indexed")

5 documents indexed


## Create ABAC Role

In [39]:
# Role query template. The {{#toJson}}...{{/toJson}} sections are filled from
# the user's metadata (`_user.metadata.attributes.*`), producing a bool filter
# with two clauses:
#   * terms_set on attributes.training, with attributes.min_training as the
#     minimum_should_match_field
#   * terms on attributes.departments
abac_query_source = "{\"bool\": {\"filter\": [{\"terms_set\": {\"attributes.training\": {\"terms\": {{#toJson}}_user.metadata.attributes.training{{/toJson}},\"minimum_should_match_field\": \"attributes.min_training\"}}}, {\"terms\": {\"attributes.departments\": {{#toJson}}_user.metadata.attributes.departments{{/toJson}}}}]}}"

# Read-only access to the demo index, restricted by the templated query.
abac_index_privileges = {
    "names": [INDEX_NAME],
    "privileges": ["read"],
    "query": {"template": {"source": abac_query_source}},
}

es.security.put_role(name="abac_role", body={"indices": [abac_index_privileges]})

ObjectApiResponse({'role': {'created': True}})

## Create Users

In [40]:
import string
import secrets
def random_password(length=12):
    """Build a random password with the `secrets` module.

    Args:
        length: Number of characters to generate (default 12).

    Returns:
        str: `length` characters, each picked from ASCII letters, digits and
        punctuation.
    """
    pool = string.ascii_letters + string.digits + string.punctuation
    picked = [secrets.choice(pool) for _ in range(length)]
    return ''.join(picked)

def _make_creds(username):
    """Pair `username` with a freshly generated random password."""
    return {"username": username, "password": random_password()}


booger_creds = _make_creds("Booger")
fritz_creds = _make_creds("Fritz")
gork_creds = _make_creds("Gork")

# One row per user: (credentials, departments, training).
user_specs = [
    (booger_creds, ["Safety Oversight"], ["Radiation Safety", "Regulatory Compliance"]),
    (fritz_creds, ["Reactor Operations"], ["Core Procedures", "Radiation Safety"]),
    (gork_creds, ["Nuclear Materials"], ["Fuel Handling", "Radiation Safety"]),
]

# Every user gets the same role; only the metadata attributes differ.
for creds, departments, training in user_specs:
    put_user_response = es.security.put_user(
        username=creds["username"],
        password=creds["password"],
        roles=["abac_role"],
        metadata={
            "attributes": {
                "departments": departments,
                "training": training,
            }
        },
    )

# Keep the last response as the cell's final expression so it is displayed.
put_user_response

ObjectApiResponse({'created': True})

## Test Users

In [41]:
import json
def search_with_user(client, index_name, creds, description):
    """Run a match_all search as a given user and print the returned titles.

    Args:
        client: Client object; `client.options(basic_auth=...)` is used to
            issue the search with the user's credentials.
        index_name: Name of the index to search.
        creds: Dict with "username" and "password" keys.
        description: Label printed as a banner above the results.

    Returns:
        None. The banner and the hits' `_source` values are printed, the
        latter as indented JSON.
    """
    user_client = client.options(basic_auth=(creds["username"], creds["password"]))
    # Only the title is requested back for each hit.
    request_body = {"_source": ["title"], "query": {"match_all": {}}}
    response = user_client.search(index=index_name, body=request_body)

    print(f"\n*** {description} ***")
    sources = [hit['_source'] for hit in response['hits']['hits']]
    print(json.dumps(sources, indent=2))

# Run the same search once per user to compare what each one can see.
for user_creds, label in (
    (booger_creds, "Booger's Docs"),
    (fritz_creds, "Fritz's Docs"),
    (gork_creds, "Gork's Docs"),
):
    search_with_user(es, INDEX_NAME, user_creds, label)


*** Booger's Docs ***
[
  {
    "title": "Radiation Safety Manual"
  }
]

*** Fritz's Docs ***
[
  {
    "title": "Reactor Startup Protocol"
  },
  {
    "title": "Radiation Safety Manual"
  },
  {
    "title": "Emergency Shutdown Procedures"
  }
]

*** Gork's Docs ***
[
  {
    "title": "Fuel Rod Handling Guidelines"
  },
  {
    "title": "Radiation Safety Manual"
  },
  {
    "title": "Waste Storage Protocol"
  }
]


## Delete Deployment

In [42]:
# Authenticate with the API key from the environment. Single quotes inside the
# f-strings keep them valid on Python versions before 3.12, and os.environ
# raises KeyError if a variable is missing instead of sending "None".
headers = {
    "Authorization": f"ApiKey {os.environ['ECH_API_KEY']}",
    "Content-Type": "application/json"
}
# Ask the API to shut the deployment down; the timeout stops the cell from
# hanging indefinitely on a stalled connection.
response = requests.post(
    f"{os.environ['ECH_URL']}/{DEPLOYMENT_ID}/_shutdown",
    headers=headers,
    timeout=60,
)
response.raise_for_status()
print(f"{response.json()['name']} is shut down")

demo-deployment is shut down
