In [None]:
!pip install boto3 faiss-cpu sentence-transformers pypdf PyPDF2 PyCryptodome weaviate-client nltk pdfminer.six scikit-learn

import io
import os
import uuid
import boto3
import nltk
import string
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from google.colab import userdata
import weaviate
from weaviate.classes.init import Auth
import weaviate.classes.config as wc

# Tokenizer data used by nltk's sent_tokenize when chunking documents.
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource)

# AWS credentials are read from the Colab secret store, not hard-coded.
AWS_ACCESS_KEY, AWS_SECRET_KEY = (
    userdata.get(secret_name) for secret_name in ("AWS_ACCESS_KEY", "AWS_SECRET_KEY")
)
S3_BUCKET = "rag-vector-db-poc"
embed_model = SentenceTransformer("all-MiniLM-L6-v2")


s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

# Self-hosted Weaviate reached over unencrypted HTTP (8080) and gRPC (50051).
# Setting weaviate_api_key switches on API-key authentication.
host = "34.228.255.23"
weaviate_api_key = None

weaviate_client = weaviate.connect_to_custom(
    http_host=host,
    http_port=8080,
    http_secure=False,
    grpc_host=host,
    grpc_port=50051,
    grpc_secure=False,
    auth_credentials=None if not weaviate_api_key else Auth.api_key(weaviate_api_key),
)

print("Client is ready:", weaviate_client.is_ready())

# Name of the Weaviate collection holding the document chunks.
WEAVIATE_COLLECTION = "docs_chunks"

def create_weaviate_schema():
    """Create the chunk collection in Weaviate unless it is already present.

    The collection has no server-side vectorizer (vectors are supplied by the client at
    insert time) and no generative module. It stores the chunk text plus its source and
    metadata properties. Safe to call repeatedly: it returns early when the collection exists.
    """
    collections = weaviate_client.collections
    if collections.exists(WEAVIATE_COLLECTION):
        print(f"✅ Collection '{WEAVIATE_COLLECTION}' already exists.")
        return

    # (property name, Weaviate data type), in the order the properties are declared.
    property_specs = (
        ("text", wc.DataType.TEXT),
        ("s3_path", wc.DataType.TEXT),
        ("file_name", wc.DataType.TEXT),
        ("chunk_id", wc.DataType.TEXT),
        ("chunk_index", wc.DataType.INT),
        ("token_count", wc.DataType.INT),
        ("department", wc.DataType.TEXT),
        ("top_keywords", wc.DataType.TEXT_ARRAY),
    )

    collections.create(
        name=WEAVIATE_COLLECTION,
        properties=[wc.Property(name=prop_name, data_type=prop_type)
                    for prop_name, prop_type in property_specs],
        vectorizer_config=wc.Configure.Vectorizer.none(),
        generative_config=None,
    )

    print(f"✅ Collection '{WEAVIATE_COLLECTION}' created.")

create_weaviate_schema()




[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


Client is ready: True
✅ Collection 'docs_chunks' already exists.


In [None]:
  # === S3 Utilities ===
  def list_text_and_pdf_keys(bucket, prefix=""):
      """Return the keys of all ``.pdf`` and ``.txt`` objects under ``prefix`` in ``bucket``.

      Walks every result page of ``list_objects_v2`` via the boto3 paginator. The suffix
      match is case-sensitive, and keys are returned in listing order.
      """
      pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
      return [
          obj["Key"]
          for page in pages
          for obj in page.get("Contents", [])  # empty pages have no "Contents" entry
          if obj["Key"].endswith((".pdf", ".txt"))
      ]

  def download_file_from_s3(bucket, key):
      """Read the whole object ``s3://bucket/key`` into memory as a seekable ``BytesIO``."""
      body = s3.get_object(Bucket=bucket, Key=key)["Body"]
      payload = body.read()
      return io.BytesIO(payload)

  # === Text Extraction ===
  def extract_text_from_pdf(pdf_io):
      """Extract the text layer of a PDF, one page per line group, joined with newlines.

      Pages whose extraction yields no text (empty string or None) are skipped. Exceptions
      raised by PyPDF2 while decoding a page propagate to the caller.

      Args:
          pdf_io: A binary file-like object containing the PDF.

      Returns:
          The concatenated page texts.
      """
      reader = PdfReader(pdf_io)
      # Call extract_text() exactly once per page. A filter-then-use comprehension would
      # parse every page's content stream twice.
      page_texts = (page.extract_text() for page in reader.pages)
      return "\n".join(text for text in page_texts if text)

  def extract_text_from_txt(txt_io):
      """Read the full binary stream and decode it as strict UTF-8.

      Raises:
          UnicodeDecodeError: If the bytes are not valid UTF-8.
      """
      raw_bytes = txt_io.read()
      return str(raw_bytes, encoding="utf-8")

  # === Chunking ===
  def chunk_paragraphs(text, s3_path, chunk_token_limit=500):
      """Split ``text`` into sentence-aligned chunks and attach retrieval metadata.

      Sentences from NLTK's ``sent_tokenize`` are packed greedily. A chunk is closed as soon
      as adding the next sentence would push it past ``chunk_token_limit``
      whitespace-separated tokens. A single sentence longer than the limit is kept whole as
      its own chunk, so a chunk can exceed the limit.

      Args:
          text: Full extracted document text.
          s3_path: ``s3://bucket/key`` URI of the source object. The first segment of the
              key becomes the ``department`` (``"root"`` when the key has no ``/``).
          chunk_token_limit: Soft cap on whitespace tokens per chunk.

      Returns:
          One dict per chunk with keys ``text``, ``s3_path``, ``file_name``, ``chunk_id``
          (``"<s3_path>_<index>"``), ``chunk_index``, ``token_count``, ``department`` and
          ``top_keywords``. Empty text yields an empty list.
      """
      # Drop only the leading scheme, then the bucket name. A URI with no key
      # ("s3://bucket") is treated as a root-level object instead of raising IndexError.
      bucket_and_key = s3_path[len("s3://"):] if s3_path.startswith("s3://") else s3_path
      key_path = bucket_and_key.split("/", 1)[1] if "/" in bucket_and_key else ""
      department = key_path.split("/")[0] if "/" in key_path else "root"
      file_name = os.path.basename(s3_path)

      chunks, current_chunk = [], []
      token_count = 0

      for sentence in sent_tokenize(text):
          token_len = len(sentence.split())
          # Only flush a non-empty chunk. Without the `current_chunk` check, a first
          # sentence longer than the limit would emit an empty "" chunk, which would then
          # be embedded and uploaded as a blank search result.
          if current_chunk and token_count + token_len > chunk_token_limit:
              chunks.append(" ".join(current_chunk))
              current_chunk, token_count = [], 0
          current_chunk.append(sentence)
          token_count += token_len

      if current_chunk:
          chunks.append(" ".join(current_chunk))

      return [
          {
              "text": chunk,
              "s3_path": s3_path,
              "file_name": file_name,
              "chunk_id": f"{s3_path}_{i}",
              "chunk_index": i,
              "token_count": len(chunk.split()),
              "department": department,
              "top_keywords": extract_top_keywords(chunk)
          }
          for i, chunk in enumerate(chunks)
      ]


  # === Keyword Extraction ===
  def extract_top_keywords(text, top_n=5):
      """Return up to ``top_n`` keywords of ``text`` with their TF-IDF weights, best first.

      The vectorizer is fit on this single text with English stop words removed.
      ``max_features`` keeps the ``top_n`` most frequent terms. The result is then ordered
      by score, highest first. scikit-learn's feature names come back in alphabetical
      order, so without the sort the "top" keywords would not be ranked. Ties keep
      alphabetical order.

      Args:
          text: The chunk text.
          top_n: Maximum number of keywords to return.

      Returns:
          A dict ``{keyword: weight}`` in descending weight order (weights rounded to 4 dp).
          Returns ``{}`` when the text has no usable (non-stop-word) tokens.
      """
      vectorizer = TfidfVectorizer(stop_words="english", max_features=top_n)
      try:
          X = vectorizer.fit_transform([text])
      except ValueError:
          # scikit-learn raises ValueError ("empty vocabulary") for empty or all-stop-word text.
          return {}
      scores = X.toarray().flatten()
      ranked = sorted(
          zip(vectorizer.get_feature_names_out(), scores),
          key=lambda pair: pair[1],
          reverse=True,
      )
      return {word: round(float(score), 4) for word, score in ranked}

  # === Embedding & Upload ===
  def embed_chunks(chunks):
      """Encode every chunk's ``"text"`` with the shared SentenceTransformer model.

      Returns whatever ``embed_model.encode`` returns for the list of texts (one embedding
      per chunk, in order), with the progress bar enabled.
      """
      chunk_texts = [chunk["text"] for chunk in chunks]
      return embed_model.encode(chunk_texts, show_progress_bar=True)

def upload_chunks_to_weaviate(chunks, embeddings):
    """Upsert chunk objects with their precomputed vectors into the Weaviate collection.

    Each object's UUID is derived deterministically from its ``chunk_id`` (UUIDv5), and all
    objects are written in one ``insert_many`` batch. Batch writes replace an existing
    object with the same UUID, so re-running ingestion overwrites chunks instead of
    duplicating the whole corpus. Per-object ``insert`` with random UUIDs duplicated it on
    every run.

    NOTE(review): objects written by earlier runs under random UUIDs are not removed. If a
    document now yields fewer chunks, its old trailing chunks also remain. Clear the
    collection once to start clean.

    Args:
        chunks: Chunk dicts as produced by ``chunk_paragraphs``.
        embeddings: One vector per chunk, in the same order (list or NumPy rows).

    Raises:
        RuntimeError: If Weaviate reports an error for any object in the batch.
    """
    from weaviate.classes.data import DataObject
    from weaviate.util import generate_uuid5

    collection = weaviate_client.collections.get(WEAVIATE_COLLECTION)

    objects = [
        DataObject(
            properties={
                "text": chunk["text"],
                "s3_path": chunk["s3_path"],
                "file_name": chunk["file_name"],
                "chunk_id": chunk["chunk_id"],
                "chunk_index": chunk["chunk_index"],
                "token_count": chunk["token_count"],
                "department": chunk["department"],
                # TEXT_ARRAY property: store only the keyword strings (the dict keys).
                "top_keywords": list(chunk.get("top_keywords", {})),
            },
            # Plain Python floats work for both list and NumPy-row embeddings.
            vector=[float(x) for x in embedding],
            uuid=generate_uuid5(chunk["chunk_id"]),
        )
        for chunk, embedding in zip(chunks, embeddings)
    ]
    if not objects:
        return  # nothing extracted (e.g. an empty document); skip the round trip

    result = collection.data.insert_many(objects)
    if result.has_errors:
        sample = list(result.errors.values())[:3]
        raise RuntimeError(
            f"Weaviate rejected {len(result.errors)} of {len(objects)} chunks, e.g. {sample}"
        )

  # === Orchestration ===
  def process_and_upload_files(bucket, keys):
      """Download, extract, chunk, embed and upload each S3 object in ``keys``.

      The file type is chosen by the key suffix (``.pdf`` or ``.txt``). Other suffixes are
      downloaded, then reported and skipped. Any exception for one key is printed and
      processing moves on to the next key.
      """
      extractors = (
          (".pdf", extract_text_from_pdf),
          (".txt", extract_text_from_txt),
      )
      for key in keys:
          try:
              print(f"📄 Processing: {key}")
              file_io = download_file_from_s3(bucket, key)

              extractor = next(
                  (extract for suffix, extract in extractors if key.endswith(suffix)),
                  None,
              )
              if extractor is None:
                  print(f"Skipping unsupported file type: {key}")
                  continue

              text = extractor(file_io)
              chunks = chunk_paragraphs(text, s3_path=f"s3://{bucket}/{key}")
              upload_chunks_to_weaviate(chunks, embed_chunks(chunks))
              print(f"✅ Uploaded: {key}")
          except Exception as err:
              # Boundary handler: one bad file must not stop the whole ingestion run.
              print(f"❌ Failed: {key}\n{err}")

  # === Run ===
  # Ingest every .pdf/.txt object in the bucket into Weaviate.
  file_keys = list_text_and_pdf_keys(bucket=S3_BUCKET)
  process_and_upload_files(bucket=S3_BUCKET, keys=file_keys)

📄 Processing: compliance/Arrakis.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: compliance/Arrakis.pdf
📄 Processing: compliance/Crash_Consistency.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: compliance/Crash_Consistency.pdf
📄 Processing: engineering/Exokernel.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: engineering/Exokernel.pdf
📄 Processing: engineering/FFS_Unix.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: engineering/FFS_Unix.pdf
📄 Processing: engineering/FireCracker.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: engineering/FireCracker.pdf
📄 Processing: engineering/ghOSt.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: engineering/ghOSt.pdf
📄 Processing: finance/Demikernel.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: finance/Demikernel.pdf
📄 Processing: finance/HeMem.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: finance/HeMem.pdf
📄 Processing: finance/scheduler-activations.pdf
❌ Failed: finance/scheduler-activations.pdf
Missed the stop code in LZWDecode!
📄 Processing: hr/Unix.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: hr/Unix.pdf
📄 Processing: hr/the_linux_schedule_a_decade_of_wasted_cores.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: hr/the_linux_schedule_a_decade_of_wasted_cores.pdf
📄 Processing: hr/xen_and_the_art_of_virtualization.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: hr/xen_and_the_art_of_virtualization.pdf
📄 Processing: legal/LogFS.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: legal/LogFS.pdf
📄 Processing: legal/Pond.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: legal/Pond.pdf
📄 Processing: legal/navarro.pdf


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Uploaded: legal/navarro.pdf


In [None]:
import uuid, time
import numpy as np
from collections import defaultdict
from sentence_transformers import SentenceTransformer
from openai import AzureOpenAI
from time import time
import random

AZURE_OPENAI_ENDPOINT = "https://ironclad-openai-001.openai.azure.com/"
# The API key comes from the Colab secret store, like the AWS credentials in the first
# cell, so it is never committed with the notebook.
# NOTE(review): the key that was previously hard-coded here is exposed and must be rotated.
AZURE_OPENAI_API_KEY = userdata.get("AZURE_OPENAI_API_KEY")
AZURE_DEPLOYMENT_NAME = "gpt-4o"

azure_client = AzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version="2023-05-15",
    azure_endpoint=AZURE_OPENAI_ENDPOINT
)

# ------------------- MONITORING -------------------
# In-process stats appended to by rag_query:
#   access_count: S3 path -> number of times it was retrieved as a source
#   latencies:    per-query end-to-end latency in milliseconds
#   query_log:    one {"query", "sources", "latency_ms"} record per query
monitoring = dict(
    access_count=defaultdict(int),
    latencies=[],
    query_log=[],
)

# ------------------- RAG FUNCTIONS -------------------
def search_weaviate(query, k=5, department_filter=None):
    """Return the ``k`` stored chunks nearest to ``query`` by embedding.

    Args:
        query: Natural-language query; embedded with the same model used at ingestion.
        k: Maximum number of chunks to return.
        department_filter: If truthy, only chunks whose ``department`` equals this value
            are searched.

    Returns:
        A list of dicts (in the order Weaviate returns them) with keys ``score``,
        ``distance``, ``text``, ``s3_path``, ``file_name``, ``department`` and
        ``top_keywords``. ``score`` is ``1 - distance``, or 0.0 when no distance came back.
    """
    vec = embed_model.encode([query])[0]
    collection = weaviate_client.collections.get(WEAVIATE_COLLECTION)

    filters = None
    if department_filter:
        filters = weaviate.classes.query.Filter.by_property("department").equal(department_filter)

    results = collection.query.near_vector(
        near_vector=vec,
        limit=k,
        filters=filters,
        # near_vector only fills in metadata that is explicitly requested. Without this,
        # metadata.score/distance are None and every chunk was reported with score 0.0.
        return_metadata=weaviate.classes.query.MetadataQuery(distance=True),
    )

    metadata_list = []
    for o in results.objects:
        props = o.properties
        distance = o.metadata.distance if o.metadata else None
        # NOTE(review): assumes the collection uses Weaviate's default cosine distance, for
        # which 1 - distance is the cosine similarity (higher = more relevant). Confirm the
        # metric if the collection was created with a custom vector index config.
        score = 1.0 - distance if distance is not None else 0.0

        metadata_list.append({
            "score": round(score, 4),
            "distance": round(distance, 4) if distance is not None else None,
            "text": props.get("text", ""),
            "s3_path": props.get("s3_path", ""),
            "file_name": props.get("file_name", ""),
            "department": props.get("department", ""),
            "top_keywords": props.get("top_keywords", []),
        })

    return metadata_list



def build_prompt(query, top_chunks):
    """Build the grounded-answer prompt and the citation map for the retrieved chunks.

    Chunks are numbered from 1. Each is rendered as its label, its S3 path in parentheses
    (``"unknown"`` when the chunk has no ``s3_path`` key), and then its text.

    Args:
        query: The user question, placed after the context.
        top_chunks: Chunk dicts; each must have a ``"text"`` key.

    Returns:
        ``(prompt, source_refs)`` where ``source_refs`` maps labels such as ``"[1]"`` to the
        chunk's S3 path.
    """
    numbered = [
        (f"[{n}]", chunk.get("s3_path", "unknown"), chunk["text"])
        for n, chunk in enumerate(top_chunks, start=1)
    ]
    context = "".join(f"{label} ({source}):\n{text}\n\n" for label, source, text in numbered)
    source_refs = {label: source for label, source, _ in numbered}

    prompt = f"""You are a helpful assistant. Use only the following context to answer the question.
Cite sources using [1], [2], etc., based only on the exact chunks below. Do not make up citations. Do not include sources not explicitly mentioned.

Context:
{context}

Question: {query}

Answer:"""
    return prompt, source_refs

def rag_query(query, k=5, department_filter='root'):
    """Answer ``query`` with retrieval-augmented generation and record monitoring data.

    Retrieves the nearest chunks with ``search_weaviate`` and builds a cited prompt with
    ``build_prompt``. It then asks the Azure OpenAI deployment for an answer and appends
    latency and source usage to the module-level ``monitoring`` dict.

    Args:
        query: The user question.
        k: Number of chunks to retrieve.
        department_filter: Department to restrict retrieval to; a falsy value searches
            every department. NOTE(review): the default ``'root'`` only matches chunks from
            objects stored at the bucket root (``chunk_paragraphs`` labels those "root").
            Confirm this default is intended.

    Returns:
        ``(answer, refs, latency_ms, chunks)``. ``refs`` maps "[n]" labels to S3 paths, and
        ``latency_ms`` covers both retrieval and generation.
    """
    start = time()
    chunks = search_weaviate(query, k, department_filter=department_filter)
    prompt, refs = build_prompt(query, chunks)

    response = azure_client.chat.completions.create(
        model=AZURE_DEPLOYMENT_NAME,
        messages=[
            {"role": "system", "content": "You are a helpful assistant. Use only the following context to answer the question. Cite sources using [1], [2], etc., based only on the exact chunks below. Do not make up citations. Do not include sources not explicitly mentioned."},
            {"role": "user", "content": prompt}
        ],
    )

    choice = response.choices[0]
    content = choice.message.content
    # The SDK types message.content as Optional[str]. Calling .strip() on None would abort
    # the whole evaluation loop, so report the finish reason instead.
    if content is None:
        answer = f"[No answer returned (finish_reason={choice.finish_reason})]"
    else:
        answer = content.strip()
    latency = round((time() - start) * 1000, 2)

    # One access is counted per retrieved chunk, so a file cited twice is counted twice.
    for ref in refs.values():
        monitoring["access_count"][ref] += 1
    monitoring["latencies"].append(latency)
    monitoring["query_log"].append({
        "query": query,
        "sources": list(refs.values()),
        "latency_ms": latency
    })

    return answer, refs, latency, chunks

# ------------------- MAIN SCRIPT -------------------

# Evaluation questions about the systems papers in the bucket (Arrakis, OptFS, Demikernel,
# Pond, HeMem, Firecracker, superpages, Sprite LFS).
# NOTE(review): the papers are spread across several department prefixes. Under a single
# RBAC role, most questions therefore cannot retrieve their source paper. Expect
# "context does not address" answers for papers outside the chosen department.
questions = [
    "How does Arrakis ensure security and isolation for applications that have direct access to hardware devices, bypassing traditional kernel mediation?",
    "What are the key hardware support features required to enable Arrakis's direct I/O access model, and how do these features impact hardware complexity and cost?",
    "In what ways can the Arrakis architecture be extended or adapted to virtualized environments and multi-tenant cloud data centers, and what challenges might arise in such contexts?",
    "How does OptFS's approach to decoupling ordering and durability via osync() and dsync() primitives impact overall system performance and reliability?",
    "In what ways does optimistic crash consistency differ from traditional journaling or soft updates methods, and what are its primary advantages and potential drawbacks?",
    "How do the case studies with gedit and SQLite demonstrate the practical benefits and limitations of the proposed optimistic crash consistency techniques?",
    "How does Demikernel achieve nanosecond-scale I/O processing overheads while maintaining portability across heterogeneous kernel-bypass devices?",
    "In what ways does the PDPIX API improve programmability for µs-scale datacenter systems compared to traditional POSIX or existing kernel-bypass APIs?",
    "What challenges did the authors face when integrating networking (e.g., DPDK, RDMA) and storage (e.g., SPDK) libOSes in a single Demikernel datapath OS, and how were they addressed?",
    "How does Pond balance the trade-off between latency sensitivity and DRAM savings when determining VM memory allocation, and what role do its machine learning models play in this process?",
    "Given the increasing access latency with larger CXL memory pool sizes, what are the practical scalability limits of Pond's architecture, and how do these limits impact overall datacenter design?",
    "How does Pond’s zNUMA approach differ from traditional NUMA memory management, and what mechanisms ensure performance remains consistent even with incorrect memory usage predictions?",
    "How does HeMem's asynchronous sampling via PEBS compare in scalability and accuracy to traditional page table scanning for hot data identification, especially as memory capacity reaches terabyte scale?",
    "Given HeMem’s user-space implementation and its reliance on userfaultfd and DMA migration, what are the potential challenges or limitations in extending it to support kernel-level memory or shared memory scenarios in multi-tenant cloud environments?",
    "Considering that HeMem achieves significant reductions in NVM wear, what design principles can be abstracted and applied to emerging memory technologies (e.g., MRAM, ReRAM) with similar asymmetries in read/write performance or endurance?",
    "What architectural and implementation choices enabled Firecracker to achieve both low overhead and strong isolation compared to traditional hypervisors like QEMU?",
    "How does Firecracker’s design and integration with AWS Lambda enable fast function startup and efficient resource utilization at massive scale?",
    "In what ways does Firecracker’s minimal device model and use of Rust for VMM development contribute to reducing the trusted computing base (TCB) and improving security?",
    "What key design strategies did the authors implement to manage physical memory fragmentation and ensure sustained superpage performance under memory pressure?",
    "How does the reservation-based allocation mechanism proposed in the paper differ from eager promotion or relocation-based superpage management strategies used in other operating systems like HP-UX or IRIX?",
    "What are the primary trade-offs involved in the incremental promotion and speculative demotion of superpages, and how do these impact system performance and memory overhead?",
    "What are the main advantages of using a log-structured file system (LFS) compared to traditional Unix file systems, especially in handling small file workloads?",
    "How does Sprite LFS manage free space using segment cleaning, and what is the role of the cost-benefit policy in optimizing write performance?",
    "In what ways does the crash recovery mechanism in Sprite LFS leverage the log structure to improve reliability and recovery time, and how does this differ from traditional file system recovery approaches?"

]

user_roles = ["finance", "hr", "engineering", "legal", "compliance"]
# NOTE(review): a random role is drawn on every run. Retrieval is then limited to that one
# department's documents, and results are not reproducible between runs. Pin the role
# (or seed `random`) when comparing runs.
selected_role = random.choice(user_roles)
print(f"Using RBAC role: {selected_role}\n")

TOP_K = 5  # chunks retrieved per question

start_time = time()
for q_num, q in enumerate(questions, start=1):
    print(f"\n========== QUESTION {q_num}: {q} ==========\n")
    answer, refs, latency, retrieved_chunks = rag_query(q, TOP_K, selected_role)

    print("->> Answer:\n")
    print(answer)

    # Report the number of chunks actually returned; it can be fewer than TOP_K.
    print(f"\n->> Top {len(retrieved_chunks)} Retrieved Chunks:\n")
    # A separate loop variable keeps the question counter from being clobbered.
    for rank, chunk in enumerate(retrieved_chunks, start=1):
        print(f"[{rank}] File: {chunk['file_name']}")
        print(f"    Path: {chunk['s3_path']}")
        print(f"    Department: {chunk.get('department', 'unknown')}")
        print(f"    Text: {chunk['text'][:50].strip()}...\n")

    print(f"->>Latency: {latency} ms")
    print("\n" + "=" * 50)

end_time = time()
total_time = end_time - start_time
# Throughput of this run only (wall-clock, including retrieval and generation).
qps = len(questions) / total_time if total_time > 0 else 0

# ------------------- SUMMARY -------------------
# Totals for this run plus the contents of `monitoring`. `monitoring` is module-level and
# keeps accumulating if the cells are re-run.

print("\n========== SUMMARY ==========\n")
for summary_line in (
    f"Total Queries: {len(monitoring['query_log'])}",
    f"Total Time: {total_time:.2f} sec",
    f"Queries per Second (QPS): {qps:.2f}",
):
    print(summary_line)

if monitoring["latencies"]:
    print(f"Average Latency: {np.mean(monitoring['latencies']):.2f} ms")
    print("\nTop Accessed Sources:")
    # Most-retrieved sources first; ties keep their first-seen order.
    top_sources = sorted(
        monitoring["access_count"].items(),
        key=lambda source_and_hits: source_and_hits[1],
        reverse=True,
    )
    for src, count in top_sources:
        print(f"• {src}: {count}x")


Using RBAC role: legal



->> Answer:

The provided context does not specifically address how Arrakis ensures security and isolation for applications that bypass traditional kernel mediation with direct hardware access. There is no direct reference to Arrakis in the given text, and without additional specific information about its mechanisms, the question cannot be accurately answered based on the current context.

->> Top 5 Retrieved Chunks:

[1] File: Pond.pdf
    Path: s3://rag-vector-db-poc/legal/Pond.pdf
    Department: legal
    Text: In Proceedings of
the 14th USENIX Symposium on Ope...

[2] File: navarro.pdf
    Path: s3://rag-vector-db-poc/legal/navarro.pdf
    Department: legal
    Text: USENIX Association
Proceedings of the
5th Symposiu...

[3] File: Pond.pdf
    Path: s3://rag-vector-db-poc/legal/Pond.pdf
    Department: legal
    Text: The EMC offers multiple CXL ports
and appears to e...

[4] File: Pond.pdf
    Path: s3://rag-vector-db-poc/legal/Pond.pdf
    Department: l