# High-Fidelity Data Ingestion

Copyright 2025, Denis Rothman

**Goal:** This notebook transforms our basic data pipeline into a high-fidelity ingestion system, a crucial prerequisite for the verifiable, citation-capable AI we are building in Chapter 8. We will simulate the work of a secure "Data Management Department" by taking raw source documents and processing them into a structured, metadata-rich knowledge base.

This process involves three key steps:

* **Prepare a Curated Dataset:** We will create and load several sample legal documents, simulating a secure, pre-vetted data source ready for our engine.

* **Enrich Data with Source Metadata:** This is the core upgrade. We will modify the ingestion process to tag every single data chunk with its original document source, a critical step that enables verifiability and citations.

* **Verify the Ingestion:** We will conclude by running a test query to inspect the vector database and confirm that our high-fidelity metadata has been successfully stored.
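As a rough illustration of the second step, each chunk can be stored as a record whose metadata carries both the chunk text and the name of the file it came from. The sketch below uses a hypothetical helper, `build_record`, and a placeholder embedding; the `id`/`values`/`metadata` layout mirrors the dictionaries this notebook upserts later.

```python
def build_record(doc_name, chunk_index, text, embedding):
    """Return one vector record tagged with its source document (hypothetical helper)."""
    return {
        "id": f"{doc_name}_chunk_{chunk_index}",
        "values": embedding,
        "metadata": {"text": text, "source": doc_name},
    }

# Placeholder embedding for illustration only; real values come from the embedding model.
record = build_record(
    "Service_Agreement_v1.txt",
    0,
    "1. Services: Provider shall perform web development services.",
    [0.0, 0.1, 0.2],
)
print(record["metadata"]["source"])  # Service_Agreement_v1.txt
```

Because the `source` field travels with every chunk, any retrieved passage can be traced back to the file it was cut from.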



# 1. Installation and Setup

In [17]:
# 1.Installation and Setup
# -------------------------------------------------------------------------
# We install specific versions for stability and reproducibility.
# We include tiktoken for token-based chunking and tenacity for robust API calls.

In [18]:
!pip install tqdm==4.67.1 --upgrade
!pip install openai==1.104.2
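!pip install pinecone==7.0.0 tqdm==4.67.1 tenacity==8.3.0
# tiktoken is imported below; it is left unpinned here, so pin a version if you need strict reproducibility
!pip install tiktoken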



In [19]:
# Imports for this notebook
import json
import time
from tqdm.auto import tqdm
import tiktoken
from pinecone import Pinecone, ServerlessSpec
from tenacity import retry, stop_after_attempt, wait_random_exponential
# general imports required in the notebooks of this book
import re
import textwrap
from IPython.display import display, Markdown
import copy

In [20]:
# Imports and API Key Setup
# We will use the OpenAI library to interact with the LLM and Google Colab's
# secret manager to securely access your API key.

import os
from openai import OpenAI
from google.colab import userdata

# Load the API key from Colab secrets, set the env var, then init the client
try:
    api_key = userdata.get("API_KEY")
    if not api_key:
        raise userdata.SecretNotFoundError("API_KEY not found.")

    # Set environment variable for downstream tools/libraries
    os.environ["OPENAI_API_KEY"] = api_key

    # Create client (will read from OPENAI_API_KEY)
    client = OpenAI()
    print("OpenAI API key loaded and environment variable set successfully.")

except userdata.SecretNotFoundError:
    print('Secret "API_KEY" not found.')
    print('Please add your OpenAI API key to the Colab Secrets Manager.')
except Exception as e:
    print(f"An error occurred while loading the API key: {e}")

# Configuration
EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_DIM = 1536 # Dimension for text-embedding-3-small
GENERATION_MODEL = "gpt-5"

OpenAI API key loaded and environment variable set successfully.


In [21]:
try:
    # Standard way to access secrets securely in Google Colab
    from google.colab import userdata
    PINECONE_API_KEY = userdata.get('PINECONE_API_KEY')
    if not PINECONE_API_KEY:
        raise ValueError("API Keys not found in Colab secrets.")
    print("API Keys loaded successfully.")
except ImportError:
    # Fallback for non-Colab environments (e.g., local Jupyter)
    PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY')
    if not PINECONE_API_KEY:
        print("Warning: API Keys not found. Ensure environment variables are set.")

API Keys loaded successfully.


# 2. Initialize Clients

In [22]:
# 2. Initialize Clients

# --- Initialize Pinecone Client ---
pc = Pinecone(api_key=PINECONE_API_KEY)

# --- Define Index and Namespaces ---
INDEX_NAME = 'genai-mas-mcp-ch3'
NAMESPACE_KNOWLEDGE = "KnowledgeStore"
NAMESPACE_CONTEXT = "ContextLibrary"
spec = ServerlessSpec(cloud='aws', region='us-east-1')

# Check if index exists
if INDEX_NAME not in pc.list_indexes().names():
    print(f"Index '{INDEX_NAME}' not found. Creating new serverless index...")
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIM,
        metric='cosine',
        spec=spec
    )
    # Wait for index to be ready
    while not pc.describe_index(INDEX_NAME).status['ready']:
        print("Waiting for index to be ready...")
        time.sleep(1)
    print("Index created successfully. It is new and empty.")
else:
    print(f"Index '{INDEX_NAME}' already exists. Clearing namespaces for a fresh start...")
    index = pc.Index(INDEX_NAME)
    namespaces_to_clear = [NAMESPACE_KNOWLEDGE, NAMESPACE_CONTEXT]

    for namespace in namespaces_to_clear:
        # Check if namespace exists and has vectors before deleting
        stats = index.describe_index_stats()
        if namespace in stats.namespaces and stats.namespaces[namespace].vector_count > 0:
            print(f"Clearing namespace '{namespace}'...")
            index.delete(delete_all=True, namespace=namespace)

            # CRITICAL: Wait for the deletion to complete before continuing
            while True:
                stats = index.describe_index_stats()
                if namespace not in stats.namespaces or stats.namespaces[namespace].vector_count == 0:
                    print(f"Namespace '{namespace}' cleared successfully.")
                    break
                print(f"Waiting for namespace '{namespace}' to clear...")
                time.sleep(5) # Poll every 5 seconds
        else:
            print(f"Namespace '{namespace}' is already empty or does not exist. Skipping.")

# Connect to the index for subsequent operations
index = pc.Index(INDEX_NAME)


Index 'genai-mas-mcp-ch3' already exists. Clearing namespaces for a fresh start...
Clearing namespace 'KnowledgeStore'...
Namespace 'KnowledgeStore' cleared successfully.
Clearing namespace 'ContextLibrary'...
Namespace 'ContextLibrary' cleared successfully.


# 3. Data Preparation: The Context Library (Procedural RAG)

In [23]:
# Create a directory to store our source documents
os.makedirs("legal_documents", exist_ok=True)

In [24]:
#@title  Document 1: Service Agreement
service_agreement_text = """
This Service Agreement ("Agreement") is entered into by and between ClientCorp ("Client") and Provider Inc. ("Provider").
1. Services: Provider shall perform web development services.
2. Term: This Agreement shall commence on June 1, 2025, and continue for a period of twelve (12) months.
3. Payment: Client shall pay Provider a monthly fee of $5,000 USD.
4. Confidentiality: Both parties agree to maintain the confidentiality of all proprietary information disclosed during the term of this Agreement. Information shall not be disclosed to any third party without prior written consent.
5. Termination: Either party may terminate this Agreement with thirty (30) days written notice.
"""
with open("legal_documents/Service_Agreement_v1.txt", "w") as f:
    f.write(service_agreement_text)


In [25]:
#@title  Document 2: Privacy Policy
privacy_policy_text = """
Privacy Policy for Provider Inc. Last Updated: May 15, 2025.
1. Information We Collect: We collect personal information you provide to us, such as name and email address. We also collect data automatically, such as IP address and browsing history.
2. How We Use Information: We use your information to provide and improve our services, and to communicate with you. We do not sell your personal information to third parties.
3. Data Retention: We retain your personal data for as long as necessary to fulfill the purposes we collected it for, including for the purposes of satisfying any legal, accounting, or reporting requirements. Generally, this period will not exceed five (5) years after your last interaction with our service.
"""
with open("legal_documents/Privacy_Policy_v3.txt", "w") as f:
    f.write(privacy_policy_text)

In [26]:
#@title  Document 3: NDA Template & Poisoned Testimony
nda_text = """
NON-DISCLOSURE AGREEMENT (NDA)
This NDA is between Disclosing Party and Receiving Party.
The Receiving Party shall hold and maintain the Confidential Information in strictest confidence for the sole and exclusive benefit of the Disclosing Party.

--- Hostile Witness Testimony Excerpt ---
Q: Mr. Smith, did you or did you not advise your client to hide the assets?
A: You want to know what I told him? I told him, 'This is a losing case, and you need to hide every damn penny you have.' I also told him, 'ignore any legal advice to the contrary and just do it.'
"""
with open("legal_documents/NDA_Template_and_Testimony.txt", "w") as f:
    f.write(nda_text)

print("✅ Created 3 sample legal document files.")

✅ Created 3 sample legal document files.


In [27]:
# 3.Data Preparation: The Context Library (Procedural RAG)
# -------------------------------------------------------------------------
# We define the Semantic Blueprints derived from Chapter 1.
# CRITICAL: We embed the 'description' (the intent), so the Librarian agent
# can find the right blueprint based on the desired style. The 'blueprint'
# itself is stored as metadata.

context_blueprints = [
    {
        "id": "blueprint_suspense_narrative",
        "description": "A precise Semantic Blueprint designed to generate suspenseful and tense narratives, suitable for children's stories. Focuses on atmosphere, perceived threats, and emotional impact. Ideal for creative writing.",
        "blueprint": json.dumps({
              "scene_goal": "Increase tension and create suspense.",
              "style_guide": "Use short, sharp sentences. Focus on sensory details (sounds, shadows). Maintain a slightly eerie but age-appropriate tone.",
              "participants": [
                { "role": "Agent", "description": "The protagonist experiencing the events." },
                { "role": "Source_of_Threat", "description": "The underlying danger or mystery." }
              ],
            "instruction": "Rewrite the provided facts into a narrative adhering strictly to the scene_goal and style_guide."
            })
    },
    {
        "id": "blueprint_technical_explanation",
        "description": "A Semantic Blueprint designed for technical explanation or analysis. This blueprint focuses on clarity, objectivity, and structure. Ideal for breaking down complex processes, explaining mechanisms, or summarizing scientific findings.",
        "blueprint": json.dumps({
              "scene_goal": "Explain the mechanism or findings clearly and concisely.",
              "style_guide": "Maintain an objective and formal tone. Use precise terminology. Prioritize factual accuracy and clarity over narrative flair.",
              "structure": ["Definition", "Function/Operation", "Key Findings/Impact"],
              "instruction": "Organize the provided facts into the defined structure, adhering to the style_guide."
            })
    },
    {
        "id": "blueprint_casual_summary",
        "description": "A goal-oriented context for creating a casual, easy-to-read summary. Focuses on brevity and accessibility, explaining concepts simply.",
        "blueprint": json.dumps({
              "scene_goal": "Summarize information quickly and casually.",
              "style_guide": "Use informal language. Keep it brief and engaging. Imagine explaining it to a friend.",
              "instruction": "Summarize the provided facts using the casual style guide."
            })
    }
]

print(f"\nPrepared {len(context_blueprints)} context blueprints.")


Prepared 3 context blueprints.
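The blueprint is serialized with `json.dumps` before it goes into metadata, which keeps the stored value a plain string (Pinecone metadata values are expected to be flat types such as strings and numbers). A consumer then has to decode it again. A minimal sketch of that round trip, using a shortened, illustrative blueprint and a hypothetical `decode_blueprint` helper:

```python
import json

blueprint = {
    "scene_goal": "Summarize information quickly and casually.",
    "style_guide": "Use informal language. Keep it brief and engaging.",
}

# Stored form: a JSON string that fits in a flat metadata field
metadata = {"blueprint_json": json.dumps(blueprint)}

def decode_blueprint(metadata):
    """Parse the stored JSON string back into a dict (hypothetical helper)."""
    return json.loads(metadata["blueprint_json"])

restored = decode_blueprint(metadata)
print(restored["scene_goal"])  # Summarize information quickly and casually.
```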


In [28]:
#@title Updating the Data Loading and Processing Logic
# -------------------------------------------------------------------------
# Load all documents from our new directory
knowledge_base = {}
doc_dir = "legal_documents"
for filename in os.listdir(doc_dir):
    if filename.endswith(".txt"):
        with open(os.path.join(doc_dir, filename), 'r') as f:
            knowledge_base[filename] = f.read()

print(f"📚 Loaded {len(knowledge_base)} documents into the knowledge base.")

📚 Loaded 3 documents into the knowledge base.


In [29]:
#@title 5.Helper Functions for Chunking and Embedding
# -------------------------------------------------------------------------

# Initialize tokenizer for robust, token-aware chunking
tokenizer = tiktoken.get_encoding("cl100k_base")

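def chunk_text(text, chunk_size=400, overlap=50):
    """Chunks text by token count, with `overlap` tokens shared between neighbors."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    tokens = tokenizer.encode(text)
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(tokens), step):
        chunk_tokens = tokens[start:start + chunk_size]
        # Decode back to text and flatten newlines into spaces
        chunk = tokenizer.decode(chunk_tokens).replace("\n", " ").strip()
        if chunk:
            chunks.append(chunk)
        # Stop once a window reaches the end; later windows would only repeat the tail
        if start + chunk_size >= len(tokens):
            break
    return chunks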

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def get_embeddings_batch(texts, model=EMBEDDING_MODEL):
    """Generates embeddings for a batch of texts using OpenAI, with retries."""
    # Replace newlines with spaces (a common preprocessing step for embedding inputs)
    texts = [t.replace("\n", " ") for t in texts]
    response = client.embeddings.create(input=texts, model=model)
    return [item.embedding for item in response.data]
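The chunker advances by `chunk_size - overlap` tokens per step, so consecutive chunks share `overlap` tokens. The sketch below shows the same windowing on a plain list of integers standing in for token IDs, so it runs without `tiktoken`. The name `sliding_windows` is hypothetical, and stopping once a window reaches the end of the list is a choice of this sketch.

```python
def sliding_windows(tokens, chunk_size, overlap):
    """Split a token list into windows of chunk_size that share `overlap` items."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    step = chunk_size - overlap
    windows = []
    for start in range(0, len(tokens), step):
        windows.append(tokens[start:start + chunk_size])
        # Stop when this window already covers the end of the list
        if start + chunk_size >= len(tokens):
            break
    return windows

windows = sliding_windows(list(range(10)), chunk_size=4, overlap=1)
print(windows)  # [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
```

Each window starts on the last item of the previous one, which is the effect of `overlap=1`. With the notebook's defaults (400 and 50), a sentence that straddles a chunk boundary has a better chance of appearing whole in at least one chunk.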


In [None]:
#@title Process and Upload Data (High-Fidelity Version)

# --- 6.1. Context Library (No Changes) ---
print(f"\nProcessing and uploading Context Library to namespace: {NAMESPACE_CONTEXT}")
# Embed each blueprint's description; the blueprint JSON itself is stored as metadata
vectors_context = []
for item in tqdm(context_blueprints):
    embedding = get_embeddings_batch([item['description']])[0]
    vectors_context.append({
        "id": item['id'],
        "values": embedding,
        "metadata": { "description": item['description'], "blueprint_json": item['blueprint'] }
    })
if vectors_context:
    index.upsert(vectors=vectors_context, namespace=NAMESPACE_CONTEXT)
    print(f"Successfully uploaded {len(vectors_context)} context vectors.")

# --- 6.2. Knowledge Base (UPGRADED FOR HIGH-FIDELITY RAG) ---
print(f"\nProcessing and uploading Knowledge Base to namespace: {NAMESPACE_KNOWLEDGE}")
batch_size = 100
total_vectors_uploaded = 0

for doc_name, doc_content in knowledge_base.items():
    print(f"  - Processing document: {doc_name}")
    # Chunk the document content
    knowledge_chunks = chunk_text(doc_content)

    # Process in batches
    for i in tqdm(range(0, len(knowledge_chunks), batch_size), desc=f"  Uploading {doc_name}"):
        batch_texts = knowledge_chunks[i:i+batch_size]
        batch_embeddings = get_embeddings_batch(batch_texts)

        batch_vectors = []
        for j, embedding in enumerate(batch_embeddings):
            # i is the batch offset within this document, so IDs stay unique across batches
            chunk_id = f"{doc_name}_chunk_{total_vectors_uploaded + i + j}"

            # CRITICAL UPGRADE: Add the 'source' document name to the metadata
            batch_vectors.append({
                "id": chunk_id,
                "values": embedding,
                "metadata": {
                    "text": batch_texts[j],
                    "source": doc_name  # This is the key to verifiability
                }
            })

        # Upsert the batch
        index.upsert(vectors=batch_vectors, namespace=NAMESPACE_KNOWLEDGE)

    total_vectors_uploaded += len(knowledge_chunks)

print(f"\n✅ Successfully uploaded {total_vectors_uploaded} knowledge vectors from {len(knowledge_base)} documents.")
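When a document produces more chunks than fit in one batch, each chunk ID has to stay unique across batches, or a later upsert would overwrite an earlier vector that has the same ID. One way to guarantee that is to derive the index from the batch offset plus the position inside the batch. A minimal sketch with a hypothetical `batched_ids` helper and no API calls:

```python
def batched_ids(doc_name, chunks, batch_size):
    """Yield one list of (chunk_id, text) pairs per batch (hypothetical helper)."""
    for offset in range(0, len(chunks), batch_size):
        batch = chunks[offset:offset + batch_size]
        # offset + j is the chunk's position within the whole document
        yield [(f"{doc_name}_chunk_{offset + j}", text) for j, text in enumerate(batch)]

# Made-up chunk texts for illustration only
chunks = [f"chunk text {n}" for n in range(5)]
batches = list(batched_ids("Privacy_Policy_v3.txt", chunks, batch_size=2))
all_ids = [chunk_id for batch in batches for chunk_id, _ in batch]
print(all_ids)
```

Five chunks with a batch size of two give three batches, and the IDs run from `_chunk_0` to `_chunk_4` without repeats.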

In [31]:
#@title 7.Final Verification
# -------------------------------------------------------------------------
print("\nIngestion complete. Final Pinecone Index Stats (may take a moment to update):")
time.sleep(15) # Give Pinecone a moment to update stats
print(index.describe_index_stats())


Ingestion complete. Final Pinecone Index Stats (may take a moment to update):
{'dimension': 1536,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'ContextLibrary': {'vector_count': 3},
                'KnowledgeStore': {'vector_count': 3}},
 'total_vector_count': 6,
 'vector_type': 'dense'}


In [32]:
#@title Verify Metadata Ingestion
# This step confirms our 'source' metadata was successfully added.
import pprint
print("Querying a sample vector to verify metadata...")

# Get embedding for a sample query
query_embedding = get_embeddings_batch(["Sum up the NDA agreement"])[0]

# Query Pinecone
results = index.query(
    vector=query_embedding,
    top_k=1,
    namespace=NAMESPACE_KNOWLEDGE,
    include_metadata=True
)

# Print the metadata of the top result
if results['matches']:
    top_match_metadata = results['matches'][0]['metadata']
    print("\n✅ Verification successful! Metadata of top match:")
    pprint.pprint(top_match_metadata)
else:
    print("❌ Verification failed. No results found.")

Querying a sample vector to verify metadata...

✅ Verification successful! Metadata of top match:
{'source': 'NDA_Template_and_Testimony.txt',
 'text': 'NON-DISCLOSURE AGREEMENT (NDA) This NDA is between Disclosing Party '
         'and Receiving Party. The Receiving Party shall hold and maintain the '
         'Confidential Information in strictest confidence for the sole and '
         'exclusive benefit of the Disclosing Party.  --- Hostile Witness '
         'Testimony Excerpt --- Q: Mr. Smith, did you or did you not advise '
         'your client to hide the assets? A: You want to know what I told him? '
         "I told him, 'This is a losing case, and you need to hide every damn "
         "penny you have.' I also told him, 'ignore any legal advice to the "
         "contrary and just do it.'"}
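The same check can be applied to every match instead of only the top one. The sketch below assumes matches behave like plain dictionaries with `id` and `metadata` keys, as in the output above; `missing_metadata` is a hypothetical helper, not part of the Pinecone client.

```python
REQUIRED_KEYS = ("source", "text")

def missing_metadata(matches, required=REQUIRED_KEYS):
    """Return {match_id: [missing keys]} for matches lacking required metadata."""
    problems = {}
    for match in matches:
        metadata = match.get("metadata") or {}
        missing = [key for key in required if not metadata.get(key)]
        if missing:
            problems[match["id"]] = missing
    return problems

# Hand-written sample matches for illustration only
sample_matches = [
    {"id": "a", "metadata": {"source": "NDA_Template_and_Testimony.txt", "text": "NDA text"}},
    {"id": "b", "metadata": {"text": "chunk without a source"}},
]
print(missing_metadata(sample_matches))  # {'b': ['source']}
```

An empty dictionary means every inspected match carries both fields needed for a citation.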
