
# Memorization vs RAG, and 4 Layers of Protection

In this notebook, you will:
  - Understand model memorization and how Differential Privacy (DP)
  mitigates it
  - Implement four security layers that work together to protect
  sensitive information while maintaining system utility.

  By combining these layers, we create a defense-in-depth approach
  where even if one layer fails, others provide backup protection.




# 1. Model Memorization & Differential Privacy

  Large language models can sometimes memorize rare or sensitive data
  seen during training, a phenomenon known as **model memorization**. This
  creates **a risk that private information could unintentionally
  resurface in model outputs**. For example, an LLM trained on customer
  support tickets might inadvertently reproduce specific email
  addresses, account numbers, or confidential details when prompted in
  certain ways.


  A technique called **differential privacy**
  (DP) is designed to **limit such leakage by adding controlled
  noise during training.** This pushes the model to learn general patterns
   rather than exact records.


Imagine a group of employees wants to compute their average salary
  without revealing individual salaries:
  1. Each person adds a small random offset (e.g., +50€ or -120€)
  to their actual salary before reporting it
  2. Individually, the reported salaries are "noisy", so no one can
  determine a person's true salary
  3. However, when averaged across many people, the random offsets
  cancel out statistically
  4. The final average remains accurate while each individual's privacy
   is protected

  This is the intuition behind differential privacy: by
  injecting carefully calibrated noise during training, we make it
  unlikely that any single data point (like one person's salary or one
  customer's email) can be reliably extracted from the model, while the
  overall learned patterns remain useful.
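
To make the salary analogy concrete, here is a minimal simulation in plain Python. The salary range, the noise scale and the helper name `noisy_report` are assumptions chosen purely for illustration. The sketch only shows the intuition that zero-mean noise averages out; it is not a formal DP mechanism and says nothing about privacy budgets.

```python
import random

random.seed(0)  # fixed seed so the sketch is reproducible

# Hypothetical data: 10,000 salaries drawn uniformly between 30k and 90k
true_salaries = [random.uniform(30_000, 90_000) for _ in range(10_000)]

def noisy_report(salary, scale=500.0):
    """Return the salary plus zero-mean Gaussian noise (illustrative scale)."""
    return salary + random.gauss(0.0, scale)

reported = [noisy_report(s) for s in true_salaries]

true_mean = sum(true_salaries) / len(true_salaries)
noisy_mean = sum(reported) / len(reported)

print(f"True mean:  {true_mean:,.2f}")
print(f"Noisy mean: {noisy_mean:,.2f}")
print(f"Error on one individual report: {abs(reported[0] - true_salaries[0]):,.2f}")
```

Individual reports can be off by hundreds of euros, while the two averages should differ only slightly because the per-person noise largely cancels.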


However, applying differential privacy requires retraining the
  model from scratch with specialized algorithms. This is:
  - **Computationally expensive** - Requires massive GPU resources
  - **Time-intensive** - Training large models takes weeks or months
  - **Access-limited** - Most organizations don't control model training

**Our Solution: Retrieval-Augmented Generation (RAG)**

Instead of retraining models with DP, we take a more practical
  approach using Retrieval-Augmented Generation (RAG), where sensitive
  data remains outside the model weights. The model never sees private
  information during training - instead, it retrieves relevant context
  at query time from a controlled, secure database.

  Through complementary measures like:
  - **Data anonymization** - Removing PII before indexing
  - **Access control** - Filtering retrieval based on user permissions
  - **Prompt constraints** - Instructing the model how to use retrieved
  information safely
  - **Output filtering** - Redacting any sensitive details that slip
  through

  ...we keep private information secure while still enabling the model
  to generate useful, context-aware responses.




In [None]:
# Installing libraries (sentence-transformers is needed for the embedding model in Section 2.5)
!pip install openai pinecone tiktoken sentence-transformers

In [None]:
# Importing
import json, re, math, uuid, ipaddress
import numpy as np
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Any

# 2. Securing RAG


## 2.1 Loading the Data

We're loading the raw tickets dataset, which contains unprocessed support ticket data as it would appear in a production system.

In [None]:
import json

# Load raw tickets (skip blank lines so a trailing newline does not break parsing)
raw_tickets = []
with open('support_tickets_raw.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        if line.strip():
            raw_tickets.append(json.loads(line))

# Load sanitized tickets (Section 2.2 regenerates this file)
sanitized_tickets = []
with open('support_tickets_sanitized.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        if line.strip():
            sanitized_tickets.append(json.loads(line))

print(f"Loaded {len(raw_tickets)} raw tickets")
print(f"Loaded {len(sanitized_tickets)} sanitized tickets")

Let's examine what this data looks like.

Each ticket contains standard support information:
  - Organizational metadata: org, department, severity, issue_type
  - Ticket identifiers: ticket_id, customer_id, asset
  - Contact information: contact_name, contact_email
  - Technical details: account_number, source_ip, tags
  - Content: body (initial report) and conversation (back-and-forth
  messages)


Notice the sensitive information scattered throughout this ticket:
  - Personal names: "Grace Williams"
  - Email addresses: grace.williams@ironpeakinsurance.com
  - Account numbers: 74353833 (8-digit identifier)
  - IP addresses: 233.168.89.205
  - Asset identifiers: WS-1053 (potentially links to a specific
  person's workstation)

In [None]:
raw_tickets[14]

If we were to index this data directly into a vector database and expose it through a RAG system, we'd face several risks such as privacy violations, compliance issues, data leakage, etc.

In the next section, we'll tackle the first layer: Data anonymization.


## 2.2 Data Anonymization

The first step is protecting sensitive data by removing PII (Personally Identifiable Information) so that it never enters the system. PII includes details like names, emails, phone numbers, account numbers and IP addresses - anything that could identify a specific person. We will **replace each sensitive element with a consistent placeholder**. This way, the data still retains enough structure to be useful for analysis and retrieval.

This approach has two key benefits:
1. **Risk reduction**: even if a query slips through access control or a model tries to reveal more than it should, the sensitive data simply isn’t there to give away.
2. **Utility preservation**: placeholders remain consistent across documents, so the system can still link related tickets or detect patterns without exposing private details.

In [None]:
# Define the token formats we’ll use for each PII type
PLACEHOLDER_FMT = {
    "email":  "EMAIL_{:04d}",
 #   "phone":  "PHONE_{:04d}",
    "account":"ACCT_{:04d}",
    "ip":     "IP_{:04d}",
    "person": "PERSON_{:04d}",
}

# Global token vault:
# Keeps a STABLE, GLOBAL mapping: original value -> placeholder (per type)
# Ensures the same email, etc. always maps to the same token across tickets
class TokenVault:
    """
    Maps original values -> stable placeholders, per type.
    Example: vault.get_token("email", "alice@example.com") -> "{{EMAIL_0001}}"
    """
    def __init__(self):
        self._maps: Dict[str, Dict[str, str]] = {t:{} for t in PLACEHOLDER_FMT}
        self._counters: Dict[str, int] = {t:0 for t in PLACEHOLDER_FMT}

    def _normalize_key(self, typ: str, value: str) -> str:
        # Normalize by type (e.g., emails to lowercase)
        v = value.strip()
        if typ == "email":
            v = v.lower()
        elif typ == "ip":
            v = v.strip()
        elif typ == "account":
            v = re.sub(r"\D", "", v)
        elif typ == "person":
            v = re.sub(r"\s+", " ", v)
        return v

    def get_token(self, typ: str, value: str) -> str:
        # Return the existing token for (typ,value) OR create a new one
        # Final tokens are wrapped like "{{EMAIL_0001}}"
        assert typ in PLACEHOLDER_FMT, f"Unknown token type: {typ}"
        key = self._normalize_key(typ, value)
        if not key:
            return value
        if key not in self._maps[typ]:
            self._counters[typ] += 1
            token = "{{" + PLACEHOLDER_FMT[typ].format(self._counters[typ]) + "}}"
            self._maps[typ][key] = token
        return self._maps[typ][key]

    def stats(self) -> Dict[str, int]:
        return {k: len(v) for k, v in self._maps.items()}


# Regexes (free-text pass)
# -------------------------
# Patterns to find PII inside unstructured text fields (body, messages)
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
# Long numbers that might be account IDs
ACCOUNT_RE = re.compile(r"\b\d{8,12}\b")  # generic long number (use with caution in free text)
# IPv4 candidates in text. We’ll validate them with ipaddress before replacing.
IP_CANDIDATE_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


# Helpers
# -------------------------
# Find/replace emails in free text
def replace_emails(text: str, vault: TokenVault) -> str:
    return EMAIL_RE.sub(lambda m: vault.get_token("email", m.group(0)), text)

def replace_accounts_text(text: str, vault: TokenVault) -> str:
    # Find/replace long numeric sequences as account IDs in free text.
    def repl(m):
        num = m.group(0)
        return vault.get_token("account", num)
    return ACCOUNT_RE.sub(repl, text)

def replace_ips_text(text: str, vault: TokenVault) -> str:
    # Find/validate/replace IPv4 addresses in free text.
    def repl(m):
        cand = m.group(0)
        try:
            ipaddress.IPv4Address(cand)
            return vault.get_token("ip", cand)
        except Exception:
            return cand
    return IP_CANDIDATE_RE.sub(repl, text)

def anonymize_free_text(text: str, vault: TokenVault) -> str:
    # Apply free-text anonymization
    t = text
    t = replace_emails(t, vault)
 #   t = replace_phones_text(t, vault)
    t = replace_ips_text(t, vault)
    t = replace_accounts_text(t, vault)
    return t


# Field-level anonymization
# -------------------------
STRUCTURED_FIELD_RULES = [
    # Define which structured fields to anonymize, with an optional validator
    (("contact_email",), "email",  None),
    (("account_number",),"account", None),
    (("source_ip",),     "ip",     lambda s: _is_ipv4(str(s))),
    (("contact_name",),  "person", None),
]

def _is_ipv4(s: str) -> bool:
    # True if s is a valid IPv4 address
    try:
        ipaddress.IPv4Address(s)
        return True
    except Exception:
        return False

def set_in(obj: Dict[str, Any], path: tuple, value: Any) -> None:
    # Write a value at a key path (tuple of keys), creating intermediate dicts as needed
    d = obj
    for p in path[:-1]:
        d = d.setdefault(p, {})
    d[path[-1]] = value

def get_in(obj: Dict[str, Any], path: tuple) -> Any:
    # Read a value at a key path (tuple of keys); return None if missing
    d = obj
    for p in path:
        if not isinstance(d, dict) or p not in d:
            return None
        d = d[p]
    return d

def anonymize_ticket(ticket: Dict[str, Any], vault: TokenVault) -> Dict[str, Any]:
    # Produce an anonymized copy of a single ticket:
    # 1) Structured fields: precise tokenization using rules above.
    # 2) Free-text fields: regex-based replacement (body + conversation messages)
    t = json.loads(json.dumps(ticket))

    # 1) Structured fields (email, account, IP, person)
    for path, typ, validator in STRUCTURED_FIELD_RULES:
        val = get_in(t, path)
        if not isinstance(val, str):
            continue
        if validator is None or validator(val):
            token = vault.get_token(typ, val)
            set_in(t, path, token)

    # 2) Free text fields (body)
    if isinstance(t.get("body"), str):
        t["body"] = anonymize_free_text(t["body"], vault)

    # 2b) Free text in conversation messages
    conv = t.get("conversation", [])
    if isinstance(conv, list):
        new_conv = []
        for turn in conv:
            if not isinstance(turn, dict):
                new_conv.append(turn)
                continue
            msg = turn.get("message")
            if isinstance(msg, str):
                turn = dict(turn)
                turn["message"] = anonymize_free_text(msg, vault)
            new_conv.append(turn)
        t["conversation"] = new_conv

    return t

def anonymize_corpus(tickets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Anonymize a list of tickets
    vault = TokenVault()
    out = [anonymize_ticket(t, vault) for t in tickets]
    # You can inspect vault.stats() if you want counts per type
    return out

Now we'll call the `anonymize_corpus` function on `raw_tickets`. This returns a new list of tickets where PII fields have been replaced with placeholders, which we then write out in JSON Lines format:

In [None]:
sanitized = anonymize_corpus(raw_tickets)
out_path = Path("support_tickets_sanitized.jsonl")
with out_path.open("w", encoding="utf-8") as f:
    for doc in sanitized:
        f.write(json.dumps(doc, ensure_ascii=False) + "\n")

print("Wrote:", out_path)

Next, we load the sanitized tickets back from the JSON Lines file:

In [None]:
sanitized_path = "support_tickets_sanitized.jsonl"

def load_jsonl(file_path):
    """Load a JSONL file into a list of dicts, skipping blank lines."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]

# Load the sanitized tickets
sanitized_tickets = load_jsonl(sanitized_path)

print(f"✅ Loaded {len(sanitized_tickets)} sanitized tickets")

Now let's take a look at one support ticket (raw and sanitized) to confirm the placeholders look right:

In [None]:
# Raw ticket
raw_tickets[10]

Notice what was sanitized:
  - 'contact_name': '{{PERSON_0011}}' - Real name replaced
  - 'contact_email': '{{EMAIL_0011}}' - Email anonymized
  - 'account_number': '{{ACCT_0011}}' - Account number tokenized
  - 'source_ip': '{{IP_0011}}' - IP address replaced

In [None]:
# Sanitized ticket
sanitized_tickets[10]
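
Eyeballing one ticket is a useful start, but a quick automated scan can catch values the regexes missed. The sketch below is one possible check, not part of the original pipeline: the pattern set and the helper names `iter_strings` and `find_residual_pii` are assumptions, and the demo tickets are made up. Note that purely numeric identifiers such as `ticket_id` may trigger the `long_number` pattern, so you may want to exclude such fields.

```python
import re

# Illustrative patterns mirroring the PII types discussed above;
# this is an assumption-laden sketch, not an exhaustive detector.
RESIDUAL_PATTERNS = {
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "ipv4": re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
    "long_number": re.compile(r"\b\d{8,12}\b"),
}

def iter_strings(obj):
    """Yield every string nested anywhere inside dicts and lists."""
    if isinstance(obj, str):
        yield obj
    elif isinstance(obj, dict):
        for value in obj.values():
            yield from iter_strings(value)
    elif isinstance(obj, list):
        for item in obj:
            yield from iter_strings(item)

def find_residual_pii(ticket):
    """Return {pattern_name: [matches]} for anything that still looks like PII."""
    hits = {}
    for text in iter_strings(ticket):
        for name, pattern in RESIDUAL_PATTERNS.items():
            found = pattern.findall(text)
            if found:
                hits.setdefault(name, []).extend(found)
    return hits

# Hypothetical tickets for demonstration only
clean = {"contact_email": "{{EMAIL_0001}}", "body": "Reset done for {{ACCT_0001}}."}
leaky = {
    "body": "Reach me at bob@example.com from 10.0.0.7",
    "conversation": [{"message": "acct 12345678"}],
}

print(find_residual_pii(clean))
print(find_residual_pii(leaky))
```

Running `find_residual_pii` over every item in `sanitized_tickets` and reviewing any non-empty result is a cheap regression check whenever the anonymization rules change.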


## 2.3 Access Control - Preparing Metadata

The second layer is about **ensuring that users can only retrieve information they're authorized to see**.

  Access control in RAG works differently than in traditional
  databases. Instead of checking permissions after retrieval, **we embed
  access rules directly into the metadata that travels with each
  document chunk**. When a user queries the system, their permissions are
   translated into filters that the vector database applies during the
  search itself - so unauthorized data never even reaches the retrieval
   stage.

  Think of metadata as a set of security labels attached to each
  ticket. These labels answer questions like:
  - Who owns this data? (Which organization?)
  - What department handles this? (HR, IT, Security?)
  - Can customers see this? (Internal-only vs. customer-facing?)

  At query time, we translate the user's permissions into database
  filters. For example:


1. **Tenant isolation** (multi-customer separation):
  - Business rule: Users from Zephyr Telecom should never see tickets
  from IronPeak Insurance
  - How it works: Each ticket has an **org** field in metadata
  - Filter logic: ticket.org == user.org
  - Example: When a Zephyr Telecom employee queries the system, we add
  a filter `{"org": "Zephyr Telecom"}` - Pinecone will only search within
   Zephyr's tickets

2. **Department scoping**
  - Business rule: HR staff should only see HR-related tickets, not
  Security or IT tickets
  - How it works: Each ticket has a **department** field in metadata
  - Filter logic: ticket.department IN user.allowed_departments
  - Example: An HR employee's query includes filter `{"department":
  {"$in": ["HR"]}}` - they can't accidentally retrieve Security incident
   reports
  
3. **Visibility flags** (internal vs. customer-facing)
  
  - Business rule: External customers shouldn't see internal
  troubleshooting notes
  - How it works: Each ticket has a **customer_visible** boolean flag
  - Filter logic: If user is a customer, filter ticket.customer_visible
   == true
  - Example: Internal analysts see everything. Customers only see
  tickets marked as customer-facing
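
The three rules above can be combined into a single filter. The following is a minimal sketch, assuming a hypothetical helper named `build_acl_filter` and a simple `is_customer` flag on the user; the `$and`, `$eq` and `$in` operators are the same ones used in the queries later in this notebook.

```python
from typing import Any, Dict, Iterable

def build_acl_filter(user_org: str,
                     allowed_departments: Iterable[str],
                     is_customer: bool) -> Dict[str, Any]:
    """Translate a user's permissions into a metadata filter dict."""
    departments = sorted(set(allowed_departments))
    if not user_org or not departments:
        # Fail closed: never build a filter that silently drops a restriction
        raise ValueError("user_org and at least one department are required")

    clauses = [
        {"org": {"$eq": user_org}},                # tenant isolation
        {"department": {"$in": departments}},      # department scoping
    ]
    if is_customer:
        clauses.append({"customer_visible": {"$eq": True}})  # visibility flag
    return {"$and": clauses}

# Hypothetical users
analyst_filter = build_acl_filter("Zephyr Telecom", {"Security", "IT"}, is_customer=False)
customer_filter = build_acl_filter("Zephyr Telecom", {"Security"}, is_customer=True)
print(analyst_filter)
print(customer_filter)
```

Raising an error on missing permissions is a deliberate choice in this sketch: an empty or partial filter would widen access instead of narrowing it.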

 Let's attach these access-control labels to our sanitized tickets:

In [None]:
def add_metadata(sanitized_tickets):
    enriched = []
    for t in sanitized_tickets:
        t2 = dict(t)  # making a copy
        t2["metadata"] = {
            "org": t.get("org"),
            "department": t.get("department"),
            "customer_visible": True,
            "tags": t.get("tags", []),
        }
        enriched.append(t2)
    return enriched

In [None]:
# Adding metadata to sanitized tickets
enriched_tickets = add_metadata(sanitized_tickets)

  Let's inspect a ticket after enrichment. Notice the new metadata section at the bottom of the ticket.

In [None]:
# Inspecting
enriched_tickets[10]

## 2.4 Preparing for Embeddings

Now that our tickets are sanitized and enriched with metadata, the next step is to prepare them for embeddings. Instead of embedding an entire ticket as one big block of text, we’ll **break it into smaller, more meaningful pieces**.

Each ticket will become multiple rows:
- 1 row for the ticket body
- 1 row for each message in the conversation

Treating these smaller chunks as separate rows has two big benefits:
1. it **improves retrieval accuracy**: we can return the exact part of a ticket that matches a query
2. it **respects model limitations on input length**

Along with the text, each row will carry its own metadata, such as organization, department and visibility, which will later let us enforce access control when querying the database.

In [None]:
## Break it into smaller, more meaningful pieces

def ticket_to_rows(t):
    rows = []
    tid = t["ticket_id"]

    # this is the per-chunk "catalog metadata"
    base_md = {
        **t["metadata"],            # org, department, tags, customer_visible
        "ticket_id": tid,           # for provenance & catalog writes
        "created_at": t.get("created_at"),
    }

    body = (t.get("body") or "").strip()
    if body:
        rows.append({
            "id": f"{tid}:body",
            "text": body,
            "metadata": {**base_md, "part": "body"}
        })

    for i, turn in enumerate(t.get("conversation", [])):
        # Guard against a missing turn or a message explicitly set to None
        msg = ((turn or {}).get("message") or "").strip()
        if not msg:
            continue
        rows.append({
            "id": f"{tid}:conv:{i}",
            "text": msg,
            "metadata": {**base_md, "part": "conversation", "role": turn.get("role"), "idx": i}
        })
    return rows

In [None]:
# Apply ticket_to_rows() function to every ticket in the dataset
rows = [r for t in enriched_tickets for r in ticket_to_rows(t)]

In [None]:
# Peek at the first few rows
for r in rows[:5]:
    print(json.dumps(r, indent=2))

There's an important security benefit to this chunking strategy:
  **fine-grained access control enforcement**. By splitting tickets into
  individual message-level chunks, each with its own metadata, we
  ensure that access control filters apply to every single piece of
  content independently.

  For example, if a support conversation contains both customer
  messages and internal notes, we could theoretically mark them with
  different customer_visible flags. When a customer queries the system,
   they'd only retrieve their own messages, not the internal
  discussion. While our current dataset marks everything as
  customer-visible for simplicity, this architecture supports much more
   granular policies in production systems.

  Additionally, smaller chunks mean **less context leakage** - if a query
  retrieves 3 conversation turns instead of an entire 20-message
  ticket, there's less chance of accidentally exposing unrelated
  sensitive information through the LLM's response.
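
As a sketch of what per-chunk visibility could look like, the snippet below derives `customer_visible` from the `role` stored in each chunk's metadata. The role labels in `INTERNAL_ROLES` and the helper name `with_chunk_visibility` are hypothetical; the actual `role` values in the dataset may differ.

```python
# Hypothetical role labels; check the `role` values in your own dataset.
INTERNAL_ROLES = {"internal_note", "agent_private"}

def with_chunk_visibility(row):
    """Return a copy of a chunk row with a role-dependent customer_visible flag."""
    md = dict(row["metadata"])  # copy so the input row is not mutated
    md["customer_visible"] = md.get("role") not in INTERNAL_ROLES
    return {**row, "metadata": md}

# Made-up rows in the same shape that ticket_to_rows() produces
rows_demo = [
    {"id": "t1:body", "text": "...", "metadata": {"part": "body", "customer_visible": True}},
    {"id": "t1:conv:0", "text": "...", "metadata": {"role": "customer", "customer_visible": True}},
    {"id": "t1:conv:1", "text": "...", "metadata": {"role": "internal_note", "customer_visible": True}},
]

flags = [with_chunk_visibility(r)["metadata"]["customer_visible"] for r in rows_demo]
print(flags)
```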

## 2.5 Creating Embeddings

Now that we have our chunks prepared with their metadata, we need to convert the text into embeddings. We will use the embedding model `all-MiniLM-L6-v2`, which produces 384-dimensional vectors:

In [None]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2", device = "cpu")

We batch-encode all our chunks at once for efficiency. The model processes 64 chunks at a time.

In [None]:
texts = [r["text"] for r in rows]

# Embedding in batches
embeddings = model.encode(
    texts,
    batch_size = 64,
    show_progress_bar = True,
    convert_to_numpy = True,
    normalize_embeddings = True
)

# Attaching embeddings back to rows as Python lists
for r, vec in zip(rows, embeddings):
    r["values"] = vec.tolist()

In [None]:
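# Pinecone does not accept null metadata values, so drop keys whose value is None
vectors = [
    {
        "id": r["id"],
        "values": r["values"],
        "metadata": {k: v for k, v in r["metadata"].items() if v is not None},
    }
    for r in rows
]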

Each row contains:
  - "id": Unique identifier
  - "metadata": Access control labels (org, department, etc.)
  - "values": 384-dimensional embedding vector

In [None]:
vectors[0]
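
The embeddings above are L2-normalized, and the index in the next section uses the cosine metric. The plain-Python sketch below, with made-up two-dimensional vectors and hypothetical helper names, illustrates why that pairing is convenient: for unit-length vectors, cosine similarity and the dot product coincide.

```python
import math

def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def l2_normalize(vec):
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(dot(vec, vec))
    return [x / norm for x in vec] if norm else list(vec)

def cosine(a, b):
    """Cosine similarity of two non-zero vectors."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

# Toy vectors standing in for real 384-dimensional embeddings
a, b = [3.0, 4.0], [4.0, 3.0]
a_hat, b_hat = l2_normalize(a), l2_normalize(b)

print("cosine(a, b):        ", cosine(a, b))
print("dot(a_hat, b_hat):   ", dot(a_hat, b_hat))
```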

## 2.6 Upserting Data into Pinecone

Now we'll upload the embeddings into Pinecone:

In [None]:
from google.colab import userdata
from pinecone import Pinecone, ServerlessSpec

# Load API key from Google Colab secrets
pinecone_client = Pinecone(api_key=userdata.get('PINECONE_API_KEY'))

In [None]:
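# Creating an Index (skipped if it already exists, so this cell can be re-run)
if "support-tickets-demo" not in pinecone_client.list_indexes().names():
    pinecone_client.create_index(name = "support-tickets-demo",
                                 dimension = 384,
                                 metric = "cosine",
                                 spec = ServerlessSpec(
                                     cloud = "aws",
                                     region = "us-east-1"
                                 ))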

In [None]:
index = pinecone_client.Index("support-tickets-demo")

 We upload our vectors in batches of 200 to avoid overwhelming the API:

In [None]:
BATCH = 200
total = len(vectors)

for i in range(0, total, BATCH):
    batch = vectors[i:i+BATCH]
    index.upsert(vectors=batch)
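
The slicing loop above can also be factored into a small reusable generator. This is a sketch with a hypothetical helper name, `batched`; it works on any list and does not depend on Pinecone.

```python
def batched(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Toy demonstration with integers instead of vectors
demo_batches = list(batched(list(range(5)), 2))
print(demo_batches)
```

With this helper, the upload becomes `for batch in batched(vectors, BATCH): index.upsert(vectors=batch)`, and the last, shorter batch is handled automatically.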

### 2.6.1 Storing Raw Data with SQLite

In production, the authoritative copy of our documents would live in a primary data store, usually a relational database like Postgres/MySQL or object storage (S3/GCS) referenced from a small catalog table. Pinecone holds only the embeddings plus minimal filterable metadata and a pointer back to the source.

In this demo we use `sqlite3` to create a simple local data store.

Here is how the pipeline will actually use it:
1. We'll query Pinecone with ACL filters and get back chunk IDs/pointers.
2. We'll fetch the raw text for those IDs from sqlite3.
3. We'll pass only those snippets into the LLM to compose the answer.

So this sets us up for the next section of this notebook - constraining the model - where we instruct the LLM to answer strictly from those snippets.

Let's create a content catalog which stores the raw text for each chunk (plus basic provenance like ticket ID, org, department and timestamps):

In [None]:
import sqlite3
from pathlib import Path

CATALOG_PATH = Path("chunk_catalog.sqlite")

def init_catalog(db_path=CATALOG_PATH):
    # Open (or create) the SQLite database file
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.executescript("""
    PRAGMA journal_mode = WAL;
    PRAGMA synchronous = NORMAL;

    CREATE TABLE IF NOT EXISTS chunks (
      id         TEXT PRIMARY KEY,   -- e.g. "87428682:conv:2"
      ticket_id  TEXT,
      part       TEXT,               -- 'body' | 'conversation'
      idx        INTEGER,            -- turn index for conversation
      org        TEXT,
      department TEXT,
      created_at TEXT,
      text       TEXT                -- raw chunk text lives here (not in Pinecone)
    );

    CREATE INDEX IF NOT EXISTS idx_chunks_ticket ON chunks(ticket_id);
    CREATE INDEX IF NOT EXISTS idx_chunks_org_dept ON chunks(org, department);
    """)
    conn.commit()
    conn.close()

def write_chunks_to_catalog(rows, db_path=CATALOG_PATH):
    # Insert/overwrite chunk rows produced by ticket_to_rows()
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    data = []
    for r in rows:
        md = r["metadata"]
        data.append((
            r["id"],
            md.get("ticket_id"),
            md.get("part"),
            md.get("idx"),
            md.get("org"),
            md.get("department"),
            md.get("created_at"),
            r["text"],
        ))
    cur.executemany(
        "INSERT OR REPLACE INTO chunks (id, ticket_id, part, idx, org, department, created_at, text) VALUES (?,?,?,?,?,?,?,?)",
        data
    )
    conn.commit()
    conn.close()

# Initialization + load the current batch of chunks
init_catalog()
write_chunks_to_catalog(rows)
print("Catalog ready")

Before we run real queries, we sanity-check the vector index: confirm the index is ready and try fetching a single known vector by ID:

In [None]:
# Index readiness
desc = pinecone_client.describe_index("support-tickets-demo")
print("Index ready:", desc.status.get("ready"))

probe_id = rows[0]["id"]
fetched = index.fetch(ids=[probe_id])

exists = probe_id in (fetched.vectors or {})
print("Fetch exists in Pinecone:", exists)

if exists:
    vec_obj = fetched.vectors[probe_id]
    # Values
    vals = vec_obj.values
    print("Dimensions:", len(vals))

    # Metadata
    md = vec_obj.metadata
    print("Metadata:", md)

    # ACL filters
    if md:
        print("organization:", md.get("org"), "| department:", md.get("department"),
              "| part:", md.get("part"), "| idx:", md.get("idx"))

In [None]:
# Check some organization/department we expect to exist
conn = sqlite3.connect(CATALOG_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM chunks WHERE org=? AND department=?", ("Zephyr Telecom", "Security"))
print("Zephyr/Security chunks:", cursor.fetchone()[0])
conn.close()  # release the file handle once the check is done

## 2.7 Querying the Database - Testing ACL Filtering

Now it's time to test that our access control layer is working correctly before we add the LLM.

 In this section we're not generating answers yet - we need to verify that:
  1. ✅ Different users see different data based on their organization
  2. ✅ Filters are applied at the database level (not after retrieval)
  3. ✅ The metadata we added in Section 2.3 successfully controls what
   gets retrieved

We'll ask the same question twice, but pretend to be users from two different organizations:



In [None]:
# TEST 1: User from Zephyr Telecom
user_org = "Zephyr Telecom"
allowed_depts = {"Security"}
acl_filter = {
    "$and": [
        {"org": {"$eq": user_org}},
        {"department": {"$in": list(allowed_depts)}},
        {"customer_visible": {"$eq": True}},
    ]
}

We'll encode the question into an embedding with the same model used for our data:

In [None]:
# Encoding the question
question = "Okta group membership was wrong; what was the fix?"
# Normalize the query vector the same way as the indexed chunks
q_vec = model.encode([question], convert_to_numpy=True, normalize_embeddings=True)[0].tolist()

Finally, we query Pinecone with `acl_filter`:

In [None]:
# Querying
res = index.query(
    vector=q_vec,
    top_k=8,
    include_metadata=True,
    filter=acl_filter
)

The expected result is that only Zephyr Telecom's Security tickets are returned.

In [None]:
# Returned matches
matches = res.get("matches", [])
print(f"Got {len(matches)} match(es).")
for m in matches:
    mid = m["id"]
    score = m["score"]
    md = m.get("metadata", {})
    print(f"{mid:25s}  score={score:.3f}  org={md.get('org')}  dept={md.get('department')}  part={md.get('part')}  role={md.get('role')}")

We can run the same query with a different organization in the filter, such as "BlueShield Bank". Pinecone should now return chunks from BlueShield Bank tickets only. If it does, our ACL filter is working: we no longer see Zephyr data, only BlueShield's own relevant tickets about Okta-like issues.

In [None]:
# TEST 2: User from BlueShield Bank
user_org = "BlueShield Bank"
allowed_depts = {"Security"}  # tweak to test
acl_filter = {
    "$and": [
        {"org": {"$eq": user_org}},
        {"department": {"$in": list(allowed_depts)}},
        {"customer_visible": {"$eq": True}},
    ]
}

# Encoding the question
question = "Okta group membership was wrong; what was the fix?"
q_vec = model.encode([question], convert_to_numpy=True, normalize_embeddings=True)[0].tolist()

# Querying
res = index.query(
    vector=q_vec,
    top_k=8,
    include_metadata=True,
    filter=acl_filter
)

# Returned matches
matches = res.get("matches", [])
print(f"Got {len(matches)} match(es).")
for m in matches:
    mid = m["id"]
    score = m["score"]
    md = m.get("metadata", {})
    print(f"{mid:25s}  score={score:.3f}  org={md.get('org')}  dept={md.get('department')}  part={md.get('part')}  role={md.get('role')}")
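
Reading the printed matches works for a demo, but the same check can be expressed as an assertion. The sketch below assumes a hypothetical helper, `acl_violations`, and uses plain dicts in the same shape as the matches printed above; the demo data is made up.

```python
def acl_violations(matches, user_org, allowed_depts):
    """Return IDs of matches whose metadata falls outside the caller's permissions."""
    bad = []
    for m in matches:
        md = m.get("metadata") or {}
        if md.get("org") != user_org or md.get("department") not in allowed_depts:
            bad.append(m["id"])
    return bad

# Made-up matches: one allowed, one from another org, one from another department
demo_matches = [
    {"id": "a:body", "metadata": {"org": "Zephyr Telecom", "department": "Security"}},
    {"id": "b:body", "metadata": {"org": "BlueShield Bank", "department": "Security"}},
    {"id": "c:conv:0", "metadata": {"org": "Zephyr Telecom", "department": "HR"}},
]

violations = acl_violations(demo_matches, "Zephyr Telecom", {"Security"})
print(violations)
```

In the notebook, `assert not acl_violations(matches, user_org, allowed_depts)` after each query would turn the visual check into a test that fails loudly if a filter is ever dropped.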


## 2.8 Prompt Constraints

We've now confirmed that our access control filters work - users only retrieve data they're authorized to see. But there's still a risk: what if the LLM invents information, quotes sensitive details verbatim, or generates content beyond what's in our retrieved chunks?

This is where Layer 3: Prompt Constraints (the "Chaperone") comes in. **We'll tell the LLM exactly how it's allowed to behave through strict instructions in the system prompt.**

Why Constraints Matter:

  Without constraints, an LLM might:
  - ❌ Hallucinate plausible-sounding but false information
  - ❌ Quote anonymized placeholders like {{EMAIL_0011}} verbatim
  (looks odd to users)
  - ❌ Mix retrieved facts with its training knowledge (breaking trust)
  - ❌ Fail to cite sources (making answers unverifiable)

  With constraints, we guide the model to:
  - ✅ Answer only from retrieved snippets
  - ✅ Summarize naturally (don't quote placeholder tokens)
  - ✅ Avoid mentioning any PII or specific identifiers
  - ✅ Cite sources by number (e.g., [1], [2])
  - ✅ Admit when evidence is insufficient


**Building the Context Pipeline**

Before we can constrain the LLM, we need to prepare the retrieved chunks into a clean, numbered format that the model can reference.



**Step 1: Fetch the raw text**

We have chunk IDs from Pinecone - now we fetch their actual text from SQLite:



In [None]:
CATALOG_PATH = Path("chunk_catalog.sqlite")

def fetch_texts_by_ids(ids, db_path=CATALOG_PATH):
    """Fetch raw chunk text for a list of chunk IDs, preserving input order."""
    if not ids:
        return {}
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    qmarks = ",".join("?" * len(ids))
    cur.execute(f"SELECT id, text FROM chunks WHERE id IN ({qmarks})", ids)
    rows = cur.fetchall()
    conn.close()
    # Map found ids -> text
    found = dict(rows)
    # Preserve Pinecone order; missing ids map to ""
    return {i: found.get(i, "") for i in ids}
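
In keeping with the defense-in-depth idea, the catalog lookup can re-check the caller's organization rather than trusting the IDs alone. The following is a sketch under assumptions: the helper name `fetch_texts_for_org` is hypothetical, and the in-memory table uses a reduced version of the `chunks` schema purely for demonstration.

```python
import sqlite3

def fetch_texts_for_org(conn, ids, user_org):
    """Return {id: text} only for chunks owned by user_org, in input order."""
    if not ids:
        return {}
    qmarks = ",".join("?" * len(ids))
    cur = conn.execute(
        f"SELECT id, text FROM chunks WHERE org = ? AND id IN ({qmarks})",
        [user_org, *ids],
    )
    found = dict(cur.fetchall())
    # Unlike a plain ID lookup, IDs from other orgs are dropped entirely
    return {i: found[i] for i in ids if i in found}

# In-memory demo with a reduced, hypothetical schema and made-up rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (id TEXT PRIMARY KEY, org TEXT, text TEXT)")
conn.executemany(
    "INSERT INTO chunks VALUES (?, ?, ?)",
    [
        ("t1:body", "Zephyr Telecom", "VPN reset steps"),
        ("t2:body", "BlueShield Bank", "Okta group fix"),
    ],
)

result = fetch_texts_for_org(conn, ["t2:body", "t1:body"], "Zephyr Telecom")
print(result)
conn.close()
```

If a filter were ever omitted at the vector-search stage, this second check would still keep another tenant's text out of the prompt.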

**Step 2: Format chunks with labels**

Next, we turn each chunk into a numbered snippet that the LLM can cite.

We'll create a function `build_context_from_matches()` that turns each Pinecone match into a small, numbered snippet such as `[1 | f5bfddc9:body]\n<text>`. The numbers preserve relevance order and give the model (and us) stable citation handles like `[1], [2]` to reference in the answer. We trim long snippets to keep the prompt compact so the LLM focuses on the most useful details, and we skip any IDs that didn’t resolve to text to avoid noise.

The result is a clean context: answers can cite exactly which chunk supported each claim, making the system more explainable.

In [None]:
def build_context_from_matches(matches, id_to_text, max_chars=4000):
    """
    Create labeled context blocks like:
    [1 | f5bfddc9:body]
    <snippet>

    Returns (context_str, used_ids)
    """
    blocks = []
    used_ids = []
    total = 0
    for i, m in enumerate(matches, start=1):
        cid = m["id"]
        txt = (id_to_text.get(cid) or "").strip()
        if not txt:
            continue
        # Light truncation to avoid oversized prompts
        snippet = txt if len(txt) <= 800 else (txt[:800] + " …")
        block = f"[{i} | {cid}]\n{snippet}"
        if total + len(block) + 2 > max_chars:
            break
        blocks.append(block)
        used_ids.append(cid)
        total += len(block) + 2
    return "\n\n".join(blocks), used_ids

**The Constrained Answer Function**

Now we create the function that actually calls the LLM with strict rules.




In [None]:
import os

# Configure OpenAI API key
OPENAI_API_KEY = None

try:
    from google.colab import userdata  # type: ignore
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    if OPENAI_API_KEY:
        print('✅ API key loaded from Colab secrets')
except Exception:
    pass

if not OPENAI_API_KEY:
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

if not OPENAI_API_KEY:
    try:
        from getpass import getpass
        print('💡 To use Colab secrets: Go to 🔑 (left sidebar) → Add new secret → Name: OPENAI_API_KEY')
        OPENAI_API_KEY = getpass('Enter your OpenAI API Key: ')
    except Exception as exc:
        raise ValueError('❌ ERROR: No API key provided! Set OPENAI_API_KEY as an environment variable or Colab secret.') from exc

if not OPENAI_API_KEY or OPENAI_API_KEY.strip() == '':
    raise ValueError('❌ ERROR: No API key provided!')

os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY

print('✅ Authentication configured!')

OPENAI_MODEL = 'gpt-5-nano'  # Using gpt-5-nano for cost efficiency
print(f'🤖 Selected Model: {OPENAI_MODEL}')


The rules for the model are simple:
  - 🔒 Scope limitation: "Answer ONLY using the provided context
  snippets"
  - 🔒 Style guidance: "Summarize; do not quote verbatim" (avoids
  echoing placeholder tokens such as {{EMAIL_0011}})
  - 🔒 PII protection: Explicit list of what NOT to include
  - 🔒 Citation requirement: Forces transparency with numbered
  references
  - 🔒 Fallback behavior: If context is insufficient, say so (no
  hallucination)


In [None]:
from openai import OpenAI

client = OpenAI()

def answer_with_constrained_model(question, context):
    if not context.strip():
        return (
            "No eligible context was found for this user. "
            "Please refine the query or check permissions."
        )

    resp = client.responses.create(
        model=OPENAI_MODEL,
        input=(
            "You are a helpful security support assistant. "
            "Answer ONLY using the provided context snippets. "
            "Summarize; do not quote verbatim. "
            "Do NOT include PII or specifics (no names, emails, phone numbers, "
            "IPs, account numbers, dates, project codes, or dollar figures). "
            "If specifics are essential, use generic placeholders like [REDACTED]. "
            "If the context is insufficient, say so and suggest safe next steps.\n\n"
            f"SOURCES:\n{context}\n\n"
            f"QUESTION: {question}\n\n"
            "Answer in 3-5 sentences and cite sources by their bracket labels, e.g., [1], [2]."
        ),
    )

    return resp.output_text
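One possible variation on the function above is to keep the fixed rules apart from the retrieved text and the user's question, which makes the prompt easier to inspect and unit-test without calling the API. The sketch below is only an illustration: `RULES` and `build_constrained_prompt` are hypothetical names, the rule text is abbreviated, and no request is sent.

```python
# Abbreviated version of the rules used in answer_with_constrained_model.
RULES = (
    "You are a helpful security support assistant. "
    "Answer ONLY using the provided context snippets. "
    "Summarize; do not quote verbatim. "
    "Do NOT include PII or specifics. "
    "If the context is insufficient, say so and suggest safe next steps."
)


def build_constrained_prompt(question: str, context: str) -> tuple[str, str]:
    """Return (rules, user_input). Hypothetical helper; makes no API call."""
    user_input = (
        f"SOURCES:\n{context}\n\n"
        f"QUESTION: {question}\n\n"
        "Answer in 3-5 sentences and cite sources by their bracket labels, e.g., [1], [2]."
    )
    return RULES, user_input


rules, user_input = build_constrained_prompt(
    "What should I do after clicking a phishing link?",
    "[1 | demo:body]\nIsolate the device and reset credentials.",  # made-up snippet
)
print(user_input)
```

If you adopt this split, the two strings could be passed to `client.responses.create` as `instructions=rules` and `input=user_input`; treat that wiring as an assumption to verify against the SDK version you have installed.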


**Testing the Constrained Pipeline**

Let's see the full workflow in action. We'll simulate a Zephyr Telecom Security user asking about a phishing incident:





In [None]:
# Step 1: Set up access control for the user
user_org = "Zephyr Telecom"
allowed_depts = {"Security"}
acl_filter = {
    "$and": [
        {"org": {"$eq": user_org}},
        {"department": {"$in": list(allowed_depts)}},
        {"customer_visible": {"$eq": True}},
    ]
}
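To build intuition for what this filter admits, here is a small local evaluator for the three operators used above (`$and`, `$eq`, `$in`). It is a sketch for illustration only: Pinecone applies the filter server-side, `matches_filter` is a hypothetical helper, and the "Billing" department is a made-up value.

```python
def matches_filter(metadata: dict, flt: dict) -> bool:
    """Evaluate a small subset of Pinecone-style filters: $and, $eq, $in."""
    for key, cond in flt.items():
        if key == "$and":
            # Every sub-filter must hold
            if not all(matches_filter(metadata, sub) for sub in cond):
                return False
            continue
        value = metadata.get(key)
        for op, expected in cond.items():
            if op == "$eq":
                ok = value == expected
            elif op == "$in":
                ok = value in expected
            else:
                raise ValueError(f"Unsupported operator: {op}")
            if not ok:
                return False
    return True


demo_filter = {
    "$and": [
        {"org": {"$eq": "Zephyr Telecom"}},
        {"department": {"$in": ["Security"]}},
        {"customer_visible": {"$eq": True}},
    ]
}

allowed = {"org": "Zephyr Telecom", "department": "Security", "customer_visible": True}
wrong_dept = {"org": "Zephyr Telecom", "department": "Billing", "customer_visible": True}
internal_only = {"org": "Zephyr Telecom", "department": "Security", "customer_visible": False}

print(matches_filter(allowed, demo_filter))        # True
print(matches_filter(wrong_dept, demo_filter))     # False
print(matches_filter(internal_only, demo_filter))  # False
```

A chunk has to satisfy all three conditions at once; failing any single one excludes it from the search.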

Then we turn the question into an embedding and run a similarity search, asking Pinecone to return metadata and applying the ACL filter:

In [None]:
# Step 2: Encode the question and search Pinecone (with ACL filters applied)

# User question
question = "I clicked a phishing link and my endpoint is acting weird"

# Encoding
q_vec = model.encode([question], convert_to_numpy = True, normalize_embeddings = False)[0].tolist()

# Querying
res = index.query(
    vector = q_vec,
    top_k = 8,
    include_metadata = True,
    filter = acl_filter)

In [None]:
# ✅ Let's display and verify which organizations/departments were actually retrieved
print("=" * 80)
print("RETRIEVAL RESULTS - Access Control Verification")
print("=" * 80)

matches = res.get("matches", [])
print(f"Total matches found: {len(matches)}\n")

if matches:
    # Show organization and department for each match
    print("Retrieved chunks (verifying ACL filtering):")
    for i, m in enumerate(matches, 1):
        md = m.get("metadata", {})
        print(
            f"  [{i}] {m['id']:25s} | "
            f"Org: {md.get('org', 'N/A'):20s} | "
            f"Dept: {md.get('department', 'N/A'):10s} | "
            f"Score: {m['score']:.3f}"
        )

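    # Verify all are from the expected org/dept
    orgs = {m.get("metadata", {}).get("org") for m in matches}
    depts = {m.get("metadata", {}).get("department") for m in matches}
    acl_ok = orgs <= {user_org} and depts <= set(allowed_depts)

    # Report success only if every chunk actually matches the ACL
    status = "✅ All retrieved chunks match the ACL" if acl_ok else "❌ ACL mismatch detected"
    print(f"\n{status}:")
    print(f"   Organizations: {orgs}")
    print(f"   Departments: {depts}")
    print(f"   (Expected: {user_org} / {allowed_depts})")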
else:
    print("⚠️ No matches found with the given ACL filter")

print("=" * 80 + "\n")



Finally, let's put that together: matches → context → answer

In [None]:
# Step 3: Build the numbered context from matches

# We grab the returned chunk IDs in ranked order. These are just pointers; they don't contain the raw text.
matches = res.get("matches", [])
ids = [m["id"] for m in matches]

# Fetch raw text for those IDs
id_to_text = fetch_texts_by_ids(ids)

# Build labeled, trimmed context blocks
context, used_ids = build_context_from_matches(matches, id_to_text, max_chars=4000)
print("Context used:\n", context[:600], "...\n","\n-----------------\n")


In [None]:
# Step 4: Generate a constrained answer

# Ask the model under strict rules
final_answer = answer_with_constrained_model(question, context)
print("Model's answer: ","\n", final_answer)

What happens here:

  1. ACL filtering ensures only customer-visible Zephyr Telecom Security tickets are searched
  2. Semantic search finds up to 8 of the most relevant chunks about
  phishing/endpoint issues
  3. Context building formats them with citation numbers
  4. Constrained LLM generates an answer that:
    - Cites specific chunks (e.g., "According to [1] and [3]...")
    - Summarizes the fix steps without exposing PII
    - Stays within the boundaries of retrieved content
    - Provides actionable guidance
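
The citation behavior in step 4 can be spot-checked mechanically. The sketch below compares the bracket labels cited in an answer with the labels present in the context. It is a minimal illustration with a hypothetical helper name (`check_citations`) and made-up strings, and it only recognizes citations written as a single number in brackets, such as `[1]`.

```python
import re

# Context labels look like "[1 | f5bfddc9:body]" on their own line.
LABEL_IN_CONTEXT = re.compile(r"^\[(\d+) \| [^\]]+\]$", re.MULTILINE)
# Citations in the answer look like "[1]".
CITATION_IN_ANSWER = re.compile(r"\[(\d+)\]")


def check_citations(answer: str, context: str) -> tuple[set[int], set[int]]:
    """Return (valid_citations, unknown_citations)."""
    available = {int(n) for n in LABEL_IN_CONTEXT.findall(context)}
    cited = {int(n) for n in CITATION_IN_ANSWER.findall(answer)}
    return cited & available, cited - available


demo_context = "[1 | aaa:body]\nIsolate the endpoint.\n\n[3 | ccc:body]\nReset credentials."
demo_answer = "Isolate the device first [1], then rotate passwords [2]."

valid, unknown = check_citations(demo_answer, demo_context)
print("valid:", valid)      # valid: {1}
print("unknown:", unknown)  # unknown: {2}
```

A non-empty `unknown` set means the answer cites a label that was never in the context, which is a useful signal to log or to trigger a retry.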

Even with these constraints, there's still a small risk that something slips through. In Section 2.9, we'll add Layer 4 (Output Filtering) as a final safety net.

## 2.9 Output Filtering and Guardrails


The last layer scans the model's output just before showing it to the user and redacts any suspicious patterns that match PII formats.

We use regex patterns to catch:
  - Emails: user@example.com → [REDACTED_EMAIL]
  - Account numbers: 8-12 digit sequences → [REDACTED_ACCOUNT]
  - IP addresses: IPv4/IPv6 → [REDACTED_IP]
  - Customer IDs: CUST-2407 → [REDACTED_CUSTOMER_ID]
  - Asset IDs: WS-1054 → [REDACTED_ASSET_ID]
  - Names: Two capitalized words (heuristic) → [REDACTED_NAME]

In [None]:
import re

# Emails
EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

# 8–12 digit account-like numbers
ACCOUNT_RE = re.compile(r'(?<!\d)\d{8,12}(?!\d)')

# Customer IDs like CUST-2407
CUSTOMER_ID_RE = re.compile(r'\bCUST-\d{3,6}\b', re.IGNORECASE)

# Asset IDs like WS-1054 (two to four uppercase letters + dash + 3–6 digits)
ASSET_ID_RE = re.compile(r'\b[A-Z]{2,4}-\d{3,6}\b')

# IPv4 0–255 per octet
IPV4_RE = re.compile(
    r'\b(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}'
    r'(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\b'
)

# Basic IPv6 (not exhaustive but useful)
IPV6_RE = re.compile(r'\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b')

# Very light "name-like" pattern (two capitalized words).
NAME_RE = re.compile(r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+)\b')

  ⚠️ Note on NAME_RE: This pattern catches two consecutive capitalized
  words, which is a simple heuristic for person names. However, it can
  also match non-person phrases like "Active Directory" or "Conditional
   Access". To reduce false positives, you could:
  - Add a whitelist of common technical terms to exclude
  - Only apply name detection when "person cues" appear nearby (e.g.,
  "contact", "reported by")
  - Use a lightweight NER model like spaCy for more accurate person
  detection
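
As a rough sketch of the first option, the snippet below wraps the same name pattern with an allowlist of technical terms. The names `TECH_TERMS` and `redact_names` are hypothetical, and the allowlist holds only the two phrases mentioned above; you would extend it with terms from your own tickets.

```python
import re

# Same heuristic as NAME_RE above: two consecutive capitalized words.
NAME_RE = re.compile(r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+)\b')

# Hypothetical allowlist of technical phrases that should never be redacted.
TECH_TERMS = frozenset({"Active Directory", "Conditional Access"})


def redact_names(text: str, allowlist: frozenset = TECH_TERMS) -> str:
    """Redact name-like phrases unless they appear in the allowlist."""
    def _replace(match: re.Match) -> str:
        phrase = f"{match.group(1)} {match.group(2)}"
        return match.group(0) if phrase in allowlist else "[REDACTED_NAME]"

    return NAME_RE.sub(_replace, text)


sample = "Jane Doe reset her Active Directory password."
print(redact_names(sample))
# [REDACTED_NAME] reset her Active Directory password.
```

The heuristic still has blind spots. For instance, a capitalized word directly before an allowlisted term (as in "Open Active Directory") is paired with the first word of the term, so the allowlist lookup fails for that match. That is one reason an NER model can be the more robust choice.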

In [None]:
def redact_output(output: str) -> str:
    """
    Replace PII-ish patterns in the model's output with placeholders.
    Designed around our dataset: emails, 8–12 digit accounts, IPs, customer & asset IDs,
    and (optionally) name-like strings.
    """
    s = output

    # Order can reduce false positives: more specific first, then broader
    s = EMAIL_RE.sub('[REDACTED_EMAIL]', s)
    s = IPV4_RE.sub('[REDACTED_IP]', s)
    s = IPV6_RE.sub('[REDACTED_IP]', s)
    s = CUSTOMER_ID_RE.sub('[REDACTED_CUSTOMER_ID]', s)
    s = ASSET_ID_RE.sub('[REDACTED_ASSET_ID]', s)
    s = ACCOUNT_RE.sub('[REDACTED_ACCOUNT]', s)
    s = NAME_RE.sub('[REDACTED_NAME]', s)

    return s

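Why the order matters: We apply specific patterns (like CUSTOMER_ID_RE) before broader ones (like ASSET_ID_RE) so that each match gets the most accurate placeholder. For example, "CUST-2407" also fits the asset-ID pattern (two to four uppercase letters, a dash, then digits), so it has to be replaced with [REDACTED_CUSTOMER_ID] before ASSET_ID_RE runs; otherwise it would be labeled [REDACTED_ASSET_ID].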
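
To see an ordering effect concretely, the short demo below applies the customer-ID and asset-ID patterns in both orders to a made-up sentence. The asset-ID pattern is broad enough to match customer IDs as well, so running it first changes the placeholder that ends up in the text.

```python
import re

# Same definitions as in the cell above
CUSTOMER_ID_RE = re.compile(r'\bCUST-\d{3,6}\b', re.IGNORECASE)
ASSET_ID_RE = re.compile(r'\b[A-Z]{2,4}-\d{3,6}\b')

text = "Customer CUST-2407 reported workstation WS-1054."

# Specific pattern first (the order used in redact_output)
step = CUSTOMER_ID_RE.sub('[REDACTED_CUSTOMER_ID]', text)
specific_first = ASSET_ID_RE.sub('[REDACTED_ASSET_ID]', step)

# Broad pattern first: the customer ID is consumed by the asset-ID pattern
step = ASSET_ID_RE.sub('[REDACTED_ASSET_ID]', text)
broad_first = CUSTOMER_ID_RE.sub('[REDACTED_CUSTOMER_ID]', step)

print(specific_first)
# Customer [REDACTED_CUSTOMER_ID] reported workstation [REDACTED_ASSET_ID].
print(broad_first)
# Customer [REDACTED_ASSET_ID] reported workstation [REDACTED_ASSET_ID].
```

Both orders hide the identifier, but only the specific-first order keeps the placeholder informative for whoever reads the redacted answer.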



Now we tie all four layers together into a single function.

  What happens behind the scenes:

  1. Layer 1 (Anonymization): The indexed data already has PII replaced
   with tokens like {{EMAIL_0011}}
  2. Layer 2 (ACL): Only Zephyr Telecom Security tickets are searched
  3. Layer 3 (Constraints): The LLM summarizes without quoting exact
  tokens, cites sources by number
  4. Layer 4 (Redaction): If any PII-like patterns appear in the answer
   (e.g., the model says "Contact john.doe@example.com"), they're
  caught and replaced with [REDACTED_EMAIL].


In [None]:
def secure_rag_answer(
    *,
    question: str,
    index,
    model,
    acl_filter: dict,
    top_k: int = 8,
    namespace: str | None = None,
    max_context_chars: int = 4000,
):
    """
    End-to-end solution: ACL-filtered search -> fetch texts -> labeled context -> constrained LLM -> redaction
    Returns (context, safe_answer)
    """
    # 1) Encode and query Pinecone with ACLs
    q_vec = model.encode([question], convert_to_numpy=True, normalize_embeddings=False)[0].tolist()
    res = index.query(
        vector = q_vec,
        top_k = top_k,
        include_metadata = True,
        filter = acl_filter,
        **({"namespace": namespace} if namespace is not None else {})
    )
    matches = res.get("matches", [])
    ids = [m["id"] for m in matches]

    # 2) Fetch raw text from SQLite and build labeled context
    id_to_text = fetch_texts_by_ids(ids)
    context, _used_ids = build_context_from_matches(matches, id_to_text, max_chars=max_context_chars)

    # 3) Constrained answer, then redact
    draft = answer_with_constrained_model(question, context)
    safe_answer = redact_output(draft)

    return context, safe_answer


**Testing the Complete Pipeline**

Let's see all four layers working together.

In [None]:
# User question
question = "I clicked a phishing link and my endpoint is acting weird"

context, safe_answer = secure_rag_answer(
    question = question,
    index = index,
    model = model,
    acl_filter = acl_filter,
    top_k = 8,
    namespace = None,
    max_context_chars = 4000
)

print("Context used:\n", context, "\n-----------------\n")
print("Model's answer:\n", safe_answer)

In [None]:
# User question
question = "Hi, my workstation popped up a malware alert from the endpoint agent."

context, safe_answer = secure_rag_answer(
    question = question,
    index = index,
    model = model,
    acl_filter = acl_filter,
    top_k = 8,
    namespace = None,
    max_context_chars = 4000
)

print("Context used:\n", context, "\n-----------------\n")
print("Model's answer:\n", safe_answer)