# Trino OpenFGA Integration Testing

This notebook demonstrates how to query Trino with OpenFGA acting as the authorization system. We'll test table-level read access, row-level security, and column masking.

## Setup

First, we need to install the Python packages required to connect to Trino and OpenFGA:

In [ ]:
# Install required packages
!pip install trino requests pandas openfga-sdk

## Connect to Trino

Let's establish a connection to our Trino server:

In [ ]:
import trino
import pandas as pd

# Create a connection to Trino
conn = trino.dbapi.connect(
    host="trino",  # Service name in docker-compose
    port=8080,     # Port inside the container
    user="admin",  # Using admin user initially
    catalog="memory",
    schema="default"
)

cursor = conn.cursor()
print("Connected to Trino!")
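
A second connection with the same settings is opened later for `test_user`, so a small factory can keep those settings in one place. This is a minimal sketch: `connect_as` is a hypothetical helper rather than part of the `trino` package, and its `connect` parameter exists only so the function can be tried without a running server.

```python
def connect_as(user, connect=None, host="trino", port=8080,
               catalog="memory", schema="default"):
    """Open a Trino DB-API connection for `user` with this notebook's settings."""
    if connect is None:
        # Imported lazily so the helper can be defined before the package is installed
        import trino
        connect = trino.dbapi.connect
    return connect(host=host, port=port, user=user,
                   catalog=catalog, schema=schema)
```

With this in place, `connect_as("admin")` and `connect_as("test_user")` differ only in the user name.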

## OpenFGA Setup

Now let's create an OpenFGA store and authorization model, then print the values that need to go into the Trino configuration:

In [ ]:
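# The openfga-sdk package is imported as `openfga_sdk`
from openfga_sdk.client import ClientConfiguration
from openfga_sdk.sync import OpenFgaClient  # synchronous client, so no `await` is needed
from openfga_sdk.models import CreateStoreRequest, WriteAuthorizationModelRequest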
import uuid
import json

# Initialize OpenFGA client
config = ClientConfiguration(
    api_url="http://openfga:8080",  # Service name in docker compose
    store_id=""
)
client = OpenFgaClient(config)

# Create a store
store_request = CreateStoreRequest(name=f"trino-test-{uuid.uuid4()}")
store_response = client.create_store(store_request)
store_id = store_response.id
print(f"Created OpenFGA store with ID: {store_id}")

# Update client with store ID
config = ClientConfiguration(
    api_url="http://openfga:8080", 
    store_id=store_id
)
client = OpenFgaClient(config)

# Create authorization model
# We'll use a simple model for testing row-level security and column masking
model_json = '''
{
  "schema_version": "1.1",
  "type_definitions": [
    {
      "type": "user",
      "relations": {}
    },
    {
      "type": "table",
      "relations": {
        "can_read": {
          "this": {}
        },
        "has_filter": {
          "this": {}
        }
      }
    },
    {
      "type": "column",
      "relations": {
        "can_read": {
          "this": {}
        },
        "has_mask": {
          "this": {}
        }
      }
    },
    {
      "type": "filter",
      "relations": {
        "has_expression": {
          "this": {}
        }
      }
    },
    {
      "type": "mask",
      "relations": {
        "has_expression": {
          "this": {}
        }
      }
    }
  ]
}
'''

model_request = WriteAuthorizationModelRequest(schema_version="1.1", type_definitions=json.loads(model_json)["type_definitions"])
model_response = client.write_authorization_model(body=model_request)
model_id = model_response.authorization_model_id
print(f"Created OpenFGA authorization model with ID: {model_id}")

# Now we need to update the Trino access-control.properties file with these values
# In production, you would modify the file directly, but for a notebook demo
# we'll just print the values to use in the Docker configuration
print("\nUpdate your /etc/access-control.properties file with these values:")
print(f"openfga.store.id={store_id}")
print(f"openfga.model.id={model_id}")
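
If you prefer to paste the values into the file in one go, the two lines printed above can be rendered as a single fragment. This is a sketch only: `render_openfga_properties` is a hypothetical helper, the property names are simply the ones printed above, and the IDs below are placeholders.

```python
def render_openfga_properties(store_id, model_id):
    """Return the two property lines printed above as one text block."""
    lines = [
        f"openfga.store.id={store_id}",
        f"openfga.model.id={model_id}",
    ]
    return "\n".join(lines) + "\n"

# Placeholder values for illustration; pass the real store_id and model_id instead
snippet = render_openfga_properties("EXAMPLE_STORE_ID", "EXAMPLE_MODEL_ID")
print(snippet, end="")
```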

## Create Test Data

Let's create some test data in the memory connector:

In [ ]:
# Create a test table with sample data
cursor.execute("""
CREATE TABLE memory.default.employee (
  id INTEGER,
  name VARCHAR,
  department VARCHAR,
  salary INTEGER,
  ssn VARCHAR
)
""")

# Insert test data
cursor.execute("""
INSERT INTO memory.default.employee VALUES
  (1, 'Alice', 'HR', 100000, '123-45-6789'),
  (2, 'Bob', 'Engineering', 120000, '234-56-7890'),
  (3, 'Carol', 'HR', 90000, '345-67-8901'),
  (4, 'Dave', 'Finance', 130000, '456-78-9012')
""")

# Verify data was inserted
cursor.execute("SELECT * FROM memory.default.employee")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(rows, columns=columns)
df
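
The execute, fetch, and build-a-DataFrame steps above recur every time the table is queried in this notebook. A minimal sketch of a reusable wrapper follows; `query_df` is a hypothetical name, and it assumes only a standard DB-API cursor.

```python
import pandas as pd

def query_df(cursor, sql):
    """Execute `sql` on a DB-API cursor and return the result as a DataFrame."""
    cursor.execute(sql)
    rows = cursor.fetchall()
    # cursor.description holds one tuple per column; the name is its first item
    columns = [desc[0] for desc in cursor.description]
    return pd.DataFrame(rows, columns=columns)
```

For example, `query_df(cursor, "SELECT * FROM memory.default.employee")` would return the same frame as the cell above.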

## Configure Row-Level Security

Let's set up row-level security in OpenFGA so test_user can only see HR department rows:

In [ ]:
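# client.write() takes the SDK's client-level request types; they are aliased
# here so the names used in the cells below stay unchanged
from openfga_sdk.client.models import ClientTuple as TupleKey, ClientWriteRequest as WriteRequest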
import uuid

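# Table path and user identifier
# Note: OpenFGA expects objects and users in `type:id` form (for example
# `table:...` and `user:...`); add the type prefixes if these writes are rejected
table_path = "memory/default/employee"
user_id = "test_user"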

# Give test_user read access to the table
tuple_key = TupleKey(
    object=table_path,
    relation="can_read",
    user=user_id
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Create a filter expression ID
filter_expr_id = f"dept_filter_{uuid.uuid4().hex}"
row_filter = "department = 'HR'"

# Add the filter expression
tuple_key = TupleKey(
    object=f"filter:{filter_expr_id}",
    relation="has_expression",
    user=row_filter
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Link filter to table and user
tuple_key = TupleKey(
    object=table_path,
    relation="has_filter",
    user=f"filter:{filter_expr_id}#has_expression@{user_id}"
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

print(f"Row-level security configured for {user_id} to only see HR department rows")
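
The cell above writes three tuples: read access, the expression itself, and the link between the two. The column masks follow the same shape, so the pattern can be sketched as one function. `expression_tuples` is a hypothetical helper that returns plain dicts and mirrors the tuple layout used in this notebook; it makes no claim about what the Trino plugin requires.

```python
def expression_tuples(object_path, user_id, kind, expr_id, expression):
    """Build the three tuples used above as plain dicts.

    `kind` is "filter" or "mask"; the linking relation is "has_<kind>".
    """
    expr_object = f"{kind}:{expr_id}"
    return [
        # 1. Read access on the table or column
        {"object": object_path, "relation": "can_read", "user": user_id},
        # 2. The SQL expression attached to the filter or mask object
        {"object": expr_object, "relation": "has_expression", "user": expression},
        # 3. The link from the table or column to the expression, scoped to the user
        {"object": object_path, "relation": f"has_{kind}",
         "user": f"{expr_object}#has_expression@{user_id}"},
    ]
```

Each dict carries the same `object`, `relation`, and `user` keywords that are passed to `TupleKey` above.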

## Configure Column Masking

Now let's set up column masking for sensitive columns like salary and SSN:

In [ ]:
# Configure masking for salary column
salary_mask_id = f"salary_mask_{uuid.uuid4().hex}"
salary_mask = "0"  # Completely mask salary
salary_col_path = f"{table_path}/salary"

# Give test_user read access to the column
tuple_key = TupleKey(
    object=salary_col_path,
    relation="can_read",
    user=user_id
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Add mask expression
tuple_key = TupleKey(
    object=f"mask:{salary_mask_id}",
    relation="has_expression",
    user=salary_mask
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Link mask to column and user
tuple_key = TupleKey(
    object=salary_col_path,
    relation="has_mask",
    user=f"mask:{salary_mask_id}#has_expression@{user_id}"
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Now configure masking for SSN column
ssn_mask_id = f"ssn_mask_{uuid.uuid4().hex}"
ssn_mask = "regexp_replace(ssn, '(\\d{3})-(\\d{2})-(\\d{4})', 'XXX-XX-$3')"  # Show only last 4 digits
ssn_col_path = f"{table_path}/ssn"

# Give test_user read access to the column
tuple_key = TupleKey(
    object=ssn_col_path,
    relation="can_read",
    user=user_id
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Add mask expression
tuple_key = TupleKey(
    object=f"mask:{ssn_mask_id}",
    relation="has_expression",
    user=ssn_mask
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

# Link mask to column and user
tuple_key = TupleKey(
    object=ssn_col_path,
    relation="has_mask",
    user=f"mask:{ssn_mask_id}#has_expression@{user_id}"
)
write_request = WriteRequest(writes=[tuple_key])
client.write(body=write_request)

print("Column masking configured for salary and SSN columns")
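
Before relying on the SSN mask, it can help to preview what the pattern produces. The sketch below approximates the Trino expression with Python's `re` module; `preview_ssn_mask` is a hypothetical helper, and it runs locally rather than in Trino. Note that Python writes the third capture group as `\3`, where the Trino expression above uses `$3`.

```python
import re

def preview_ssn_mask(ssn):
    """Locally approximate the SSN mask: keep the last four digits, hide the rest."""
    return re.sub(r"(\d{3})-(\d{2})-(\d{4})", r"XXX-XX-\3", ssn)

print(preview_ssn_mask("123-45-6789"))  # XXX-XX-6789
```

Values that do not match the pattern are returned unchanged, which is also how a regular-expression replacement behaves when nothing matches.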

## Test Access Controls

Now let's test the access controls we've set up. First, we'll query over the existing admin connection to see the unfiltered, unmasked data:

In [ ]:
# Query as admin
cursor.execute("SELECT * FROM memory.default.employee")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
print("Results as admin user:")
pd.DataFrame(rows, columns=columns)

Now let's connect as test_user and see the effects of row-level security and column masking:

In [ ]:
# Connect as test_user
test_conn = trino.dbapi.connect(
    host="trino",
    port=8080,
    user="test_user",  # Connect as test_user
    catalog="memory",
    schema="default"
)

test_cursor = test_conn.cursor()

# Query as test_user
test_cursor.execute("SELECT * FROM memory.default.employee")
rows = test_cursor.fetchall()
columns = [desc[0] for desc in test_cursor.description]
print("Results as test_user:")
pd.DataFrame(rows, columns=columns)
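
Rather than inspecting the output by eye, the expectations can be written down as checks. This is a sketch under the assumption that the filter and masks are applied exactly as configured above (HR rows only, salary replaced by 0, SSN reduced to its last four digits); `check_restricted_view` is a hypothetical helper.

```python
import pandas as pd

def check_restricted_view(df):
    """Return a list of problems in the test_user result; an empty list means none."""
    problems = []
    if not (df["department"] == "HR").all():
        problems.append("rows outside HR are visible")
    if not (df["salary"] == 0).all():
        problems.append("salary is not masked to 0")
    if not df["ssn"].str.fullmatch(r"XXX-XX-\d{4}").all():
        problems.append("ssn is not masked")
    return problems

# What the test_user result should look like if every control is applied
expected = pd.DataFrame(
    [(1, "Alice", "HR", 0, "XXX-XX-6789"), (3, "Carol", "HR", 0, "XXX-XX-8901")],
    columns=["id", "name", "department", "salary", "ssn"],
)
print(check_restricted_view(expected))  # []
```

Passing the DataFrame built from the `test_user` query to the same function reports which control, if any, was not applied.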

## Cleanup

Let's clean up our test data:

In [ ]:
# Drop the test table
cursor.execute("DROP TABLE memory.default.employee")
print("Test table dropped")

# Close connections
cursor.close()
conn.close()
test_cursor.close()
test_conn.close()
print("Connections closed")
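
If the `DROP TABLE` statement fails, the `close()` calls above are never reached. A minimal sketch using `contextlib.closing` guarantees that the cursor and connection are closed either way; `execute_and_close` is a hypothetical helper and assumes only that both objects expose a `close()` method.

```python
from contextlib import closing

def execute_and_close(connection, sql):
    """Run one statement, then close the cursor and connection even if it raises."""
    # Context managers exit in reverse order: the cursor closes before the connection
    with closing(connection) as conn, closing(conn.cursor()) as cur:
        cur.execute(sql)
```

For example, `execute_and_close(conn, "DROP TABLE memory.default.employee")` would replace the drop and the two admin `close()` calls.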

## Conclusion

In this notebook, we've demonstrated:

1. Setting up an OpenFGA store and authorization model
2. Creating test data in Trino's memory connector
3. Configuring row-level security to filter data based on department
4. Configuring column masking for sensitive data like salary and SSN
5. Testing access controls with different users

Together, these steps show how relationship tuples stored in OpenFGA can drive table access, row filters, and column masks for queries run through Trino.