# Context Engine

Copyright 2025, Denis Rothman


**Building the Context Engine**

*From a Team of Agents to an Intelligent System*

In the previous chapters, we engineered individual contexts and specialist agents. However, as our system grows, managing these agents in a fixed, linear sequence becomes challenging and rigid. The next evolutionary step is to create a system that can think, plan, and orchestrate these agents dynamically to achieve a high-level goal.

This notebook introduces the **Context Engine**, an intelligent controller designed to transform a vague user request into a carefully generated, context-aware output. It acts as an orchestrator, delegating responsibilities to specialized components rather than solving tasks by itself.

**Key Innovation: Dynamic, LLM-Powered Planning**

The true innovation in this chapter is moving away from hardcoded workflows. We will build a Planner that uses an external LLM to analyze a user's goal. By consulting a registry of available tools, this Planner generates a custom, multi-step JSON plan on the fly. This powerful design separates the "what to do" (the goal) from the "how to do it" (the plan).

In this notebook, you will build:

**The Specialist Agents:** The `Librarian`, `Researcher`, and `Writer` from our previous work, who handle style, facts, and content generation.

**The Agent Registry:** A "toolkit" that describes the capabilities of each agent, making them discoverable to the Planner.

The **Engine's "Brain"**: The core orchestrator, which includes:
The Planner that creates the strategic plan.
The Executor that follows the plan and manages Context Chaining, where one agent's output seamlessly becomes the next agent's input.
The Tracer that logs the entire process for transparency and debugging.



# 1.Installation and Setup

In [1]:
# 1.Installation and Setup
# -------------------------------------------------------------------------
# We install specific versions for stability and reproducibility.
# We include tiktoken for token-based chunking and tenacity for robust API calls.

In [2]:
# Imports and API Key Setup
# We will use the Google GenAI library to interact with the LLM.
# We load the API key from a local .env file.

import os
from google import genai
from google.genai import types
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

try:
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY not found in environment variables. Please check your .env file.")

    # Create client
    client = genai.Client(api_key=api_key)
    print("Gemini client initialized successfully.")

except Exception as e:
    print(f"An error occurred while loading the API key: {e}")

# Configuration
EMBEDDING_MODEL = "models/gemini-embedding-001"
EMBEDDING_DIM = 3072 # Dimension for text-embedding-004
GENERATION_MODEL = "gemini-2.5-flash"

Gemini client initialized successfully.


In [3]:
# Imports for this notebook
import json
import time
from tqdm.auto import tqdm
import tiktoken
from pinecone import Pinecone, ServerlessSpec
from tenacity import retry, stop_after_attempt, wait_random_exponential
# general imports required in the notebooks of this book
import re
import textwrap
from IPython.display import display, Markdown
import copy

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Retrieve the key using the name found in your .env file
PINECONE_API_KEY = os.getenv('PINECONE_API')
if not PINECONE_API_KEY:
    print("Warning: 'PINECONE_API' not found in environment variables. Please check your .env file.")
else:
    print("Pinecone API key loaded successfully.")

Pinecone API key loaded successfully.


## 2.Initialize Clients

In [5]:
# 2.Initialize Clients
# --- Initialize Clients (assuming this is already done) ---

# --- Initialize Pinecone Client ---
pc = Pinecone(api_key=PINECONE_API_KEY)

# --- Define Index and Namespaces (assuming this is already done) ---
INDEX_NAME = 'genai-mas-mcp-ch3'
NAMESPACE_KNOWLEDGE = "KnowledgeStore"
NAMESPACE_CONTEXT = "ContextLibrary"
spec = ServerlessSpec(cloud='aws', region='us-east-1')

# Check if index exists
if INDEX_NAME not in pc.list_indexes().names():
    print(f"Index '{INDEX_NAME}' not found. Creating new serverless index...")
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIM, # Make sure EMBEDDING_DIM is defined
        metric='cosine',
        spec=spec
    )
    # Wait for index to be ready
    while not pc.describe_index(INDEX_NAME).status['ready']:
        print("Waiting for index to be ready...")
        time.sleep(1)
    print("Index created successfully. It is new and empty.")
else:
    # This block runs ONLY if the index already existed.
    print(f"Index '{INDEX_NAME}' already exists.")
    print("Clearing namespaces for a fresh start...")

    # Connect to the index to perform operations
    index = pc.Index(INDEX_NAME)

Index 'genai-mas-mcp-ch3' already exists.
Clearing namespaces for a fresh start...


# 3.Helper Functions (LLM, Embeddings, and MCP)

In [6]:
#3. Helper Functions (LLM, Embeddings, MCP, Pinecone)
# -------------------------------------------------------------------------
# Utility functions to standardize interactions.
# -------------------------------------------------------------------------

# === LLM Interaction ===
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def call_llm_robust(system_prompt, user_prompt, json_mode=False):
    """A centralized function to handle all LLM interactions with retries (Gemini Version)."""
    try:
        # 1. Configure for JSON or Text
        mime_type = "application/json" if json_mode else "text/plain"
        
        # 2. Make the Gemini API call
        response = client.models.generate_content(
            model=GENERATION_MODEL, # e.g. "gemini-2.0-flash-exp"
            contents=user_prompt,
            config=types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=1.0, # Adjust as needed
                response_mime_type=mime_type
            )
        )
        
        # 3. Return text (Gemini doesn't use .choices[0].message)
        return response.text.strip()
        
    except Exception as e:
        print(f"Error calling LLM: {e}")
        raise e

# === Embeddings ===
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def get_embedding(text):
    text = text.replace("\n", " ")
    response = client.models.embed_content(
        model=EMBEDDING_MODEL,
        contents=text
    )
    # Gemini 2.0 SDK returns a list of embeddings. We take the first one.
    return response.embeddings[0].values

# === Model Context Protocol (MCP) ===
def create_mcp_message(sender, content, metadata=None):
    """Creates a standardized MCP message."""
    return {
        "protocol_version": "2.0 (Context Engine)",
        "sender": sender,
        "content": content, # The actual payload/context
        "metadata": metadata or {}
    }

# === Pinecone Interaction ===
def query_pinecone(query_text, namespace, top_k=1):
    """Embeds the query text and searches the specified Pinecone namespace."""
    try:
        query_embedding = get_embedding(query_text)
        response = index.query(
            vector=query_embedding,
            namespace=namespace,
            top_k=top_k,
            include_metadata=True
        )
        return response['matches']
    except Exception as e:
        print(f"Error querying Pinecone (Namespace: {namespace}): {e}")
        raise e

print("Helper functions defined.")

Helper functions defined.


In [7]:
#@title 4.The Specialist Agents (The Handlers)
# -------------------------------------------------------------------------
# We define the specialist agents. These are largely reused from Chapter 3,
# but enhanced to handle more flexible inputs required for dynamic planning.
# Agents return the raw data (string) as the MCP 'content' for simplicity.
# -------------------------------------------------------------------------

# === 4.1. Context Librarian Agent (Procedural RAG) ===
def agent_context_librarian(mcp_message):
    """
    Retrieves the appropriate Semantic Blueprint from the Context Library.
    """
    print("\n[Librarian] Activated. Analyzing intent...")
    # Extract the specific input required by this agent
    requested_intent = mcp_message['content'].get('intent_query')

    if not requested_intent:
        raise ValueError("Librarian requires 'intent_query' in the input content.")

    # Query Pinecone Context Namespace
    results = query_pinecone(requested_intent, NAMESPACE_CONTEXT, top_k=1)

    if results:
        match = results[0]
        print(f"[Librarian] Found blueprint '{match['id']}' (Score: {match['score']:.2f})")
        # Retrieve the blueprint JSON string stored in metadata
        blueprint_json = match['metadata']['blueprint_json']
        # The output content IS the blueprint itself (as a string)
        content = blueprint_json
    else:
        print("[Librarian] No specific blueprint found. Returning default.")
        # Fallback default
        content = json.dumps({"instruction": "Generate the content neutrally."})

    return create_mcp_message("Librarian", content)

# === 4.2. Researcher Agent (Factual RAG) ===
def agent_researcher(mcp_message):
    """
    Retrieves and synthesizes factual information from the Knowledge Base.
    """
    print("\n[Researcher] Activated. Investigating topic...")
    # Extract the specific input required by this agent
    topic = mcp_message['content'].get('topic_query')

    if not topic:
        raise ValueError("Researcher requires 'topic_query' in the input content.")

    # Query Pinecone Knowledge Namespace
    results = query_pinecone(topic, NAMESPACE_KNOWLEDGE, top_k=3)

    if not results:
        print("[Researcher] No relevant information found.")
        # Return a string indicating no data found
        return create_mcp_message("Researcher", "No data found on the topic.")

    # Synthesize the findings (Retrieve-and-Synthesize)
    print(f"[Researcher] Found {len(results)} relevant chunks. Synthesizing...")
    source_texts = [match['metadata']['text'] for match in results]

    system_prompt = """You are an expert research synthesis AI.
    Synthesize the provided source texts into a concise, bullet-pointed summary relevant to the user's topic. Focus strictly on the facts provided in the sources. Do not add outside information."""

    user_prompt = f"Topic: {topic}\n\nSources:\n" + "\n\n---\n\n".join(source_texts)

    # Use a low temperature for factual synthesis
    findings = call_llm_robust(system_prompt, user_prompt)

    # The output content IS the findings (as a string)
    return create_mcp_message("Researcher", findings)

# === 4.3. Writer Agent (Generation) ===
def agent_writer(mcp_message):
    """
    Combines the factual research with the semantic blueprint to generate the final output.
    Crucially enhanced to handle either raw facts OR previous content for rewriting tasks.
    """
    print("\n[Writer] Activated. Applying blueprint to source material...")

    # Extract inputs.
    blueprint_json_string = mcp_message['content'].get('blueprint')
    # Check for 'facts' first, then 'previous_content'
    facts = mcp_message['content'].get('facts')
    previous_content = mcp_message['content'].get('previous_content')

    if not blueprint_json_string:
         raise ValueError("Writer requires 'blueprint' in the input content.")

    # Determine the source material and label for the prompt
    if facts: 
        source_material = facts
        source_label = "RESEARCH FINDINGS"
    elif previous_content:
        source_material = previous_content
        source_label = "PREVIOUS CONTENT (For Rewriting)"
    else:
        raise ValueError("Writer requires either 'facts' or 'previous_content'.")


    # The Writer's System Prompt incorporates the dynamically retrieved blueprint
    system_prompt = f"""You are an expert content generation AI.
    Your task is to generate content based on the provided SOURCE MATERIAL.
    Crucially, you MUST structure, style, and constrain your output according to the rules defined in the SEMANTIC BLUEPRINT provided below.

    --- SEMANTIC BLUEPRINT (JSON) ---
    {blueprint_json_string}
    --- END SEMANTIC BLUEPRINT ---

    Adhere strictly to the blueprint's instructions, style guides, and goals. The blueprint defines HOW you write; the source material defines WHAT you write about.
    """

    user_prompt = f"""
    --- SOURCE MATERIAL ({source_label}) ---
    {source_material}
    --- END SOURCE MATERIAL ---

    Generate the content now, following the blueprint precisely.
    """

    # Generate the final content (slightly higher temperature for potential creativity)
    final_output = call_llm_robust(system_prompt, user_prompt)

    # The output content IS the generated text (as a string)
    return create_mcp_message("Writer", final_output)

print("Specialist Agents defined.")

Specialist Agents defined.


In [8]:
#@title 5.The Agent Registry (The Toolkit)
# -------------------------------------------------------------------------
# We formalize the "Handler Registry" into an AgentRegistry.
# This catalogs agents and describes their capabilities to the Planner.
# -------------------------------------------------------------------------

class AgentRegistry:
    def __init__(self):
        # Mapping of agent names to their corresponding functions
        self.registry = {
            "Librarian": agent_context_librarian,
            "Researcher": agent_researcher,
            "Writer": agent_writer,
        }

    def get_handler(self, agent_name):
        """Retrieves the function associated with an agent name."""
        handler = self.registry.get(agent_name)
        if not handler:
            raise ValueError(f"Agent '{agent_name}' not found in registry.")
        return handler

    def get_capabilities_description(self):
        """
        Returns a structured description of the agents for the Planner LLM.
        This is crucial for the Planner to understand how to use the agents.
        """
        return """
        Available Agents and their required inputs:

        1. AGENT: Librarian
           ROLE: Retrieves Semantic Blueprints (style/structure instructions).
           INPUTS:
             - "intent_query": (String) A descriptive phrase of the desired style or format.
           OUTPUT: The blueprint structure (JSON string).

        2. AGENT: Researcher
           ROLE: Retrieves and synthesizes factual information on a topic.
           INPUTS:
             - "topic_query": (String) The subject matter to research.
           OUTPUT: Synthesized facts (String).

        3. AGENT: Writer
           ROLE: Generates or rewrites content by applying a Blueprint to source material.
           INPUTS:
             - "blueprint": (String/Reference) The style instructions (usually from Librarian).
             - "facts": (String/Reference) Factual information (usually from Researcher). Use this for new content generation.
             - "previous_content": (String/Reference) Existing text (usually from a prior Writer step). Use this for rewriting/adapting content.
           OUTPUT: The final generated text (String).
        """

# Initialize the global toolkit
AGENT_TOOLKIT = AgentRegistry()
print("Agent Registry initialized.")

Agent Registry initialized.


In [9]:
#@title 6.The Context Engine (Planner, Executor, Tracer)
# -------------------------------------------------------------------------
# This is the core innovation of Chapter 4. It replaces the linear
# Orchestrator with a dynamic, LLM-driven planning and execution system.
# -------------------------------------------------------------------------

# === 6.1. The Tracer (Debugging Implementation) ===
class ExecutionTrace:
    """Logs the entire execution flow for debugging and analysis."""
    def __init__(self, goal):
        self.goal = goal
        self.plan = None
        self.steps = []
        self.status = "Initialized"
        self.final_output = None
        self.start_time = time.time()

    def log_plan(self, plan):
        self.plan = plan

    def log_step(self, step_num, agent, planned_input, mcp_output, resolved_input):
        """Logs the details of a single execution step."""
        self.steps.append({
            "step": step_num,
            "agent": agent,
             # The raw input definitions from the plan (including $$REFS$$)
            "planned_input": planned_input,
            # Crucial for debugging: What exact context did the agent receive?
            "resolved_context": resolved_input,
            "output": mcp_output['content']
        })

    def finalize(self, status, final_output=None):
        self.status = status
        self.final_output = final_output
        self.duration = time.time() - self.start_time

    def display_trace(self):
        """Displays the trace in a readable format."""
        display(Markdown(f"### Execution Trace\n**Goal:** {self.goal}\n**Status:** {self.status} (Duration: {self.duration:.2f}s)"))
        if self.plan:
            # Display the raw plan JSON
            display(Markdown(f"#### Plan:\n```json\n{json.dumps(self.plan, indent=2)}\n```"))

        display(Markdown("#### Execution Steps:"))
        for step in self.steps:
            print(f"--- Step {step['step']}: {step['agent']} ---")
            print("  [Planned Input]:", step['planned_input'])
            # print("  [Resolved Context]:", textwrap.shorten(str(step['resolved_context']), width=150))
            print("  [Output Snippet]:", textwrap.shorten(str(step['output']), width=150))
            print("-" * 20)


# === 6.2. The Planner (Strategic Analysis) ===
def planner(goal, capabilities):
    """
    Analyzes the goal and generates a structured Execution Plan using the LLM.
    """
    print("[Engine: Planner] Analyzing goal and generating execution plan...")
    system_prompt = f"""
    You are the strategic core of the Context Engine. Analyze the user's high-level goal and create a structured Execution Plan using the available agents.

    --- AVAILABLE CAPABILITIES ---
    {capabilities}
    --- END CAPABILITIES ---

    INSTRUCTIONS:
    1. The plan MUST be a JSON list of objects, where each object is a "step".
    2. You MUST use Context Chaining. If a step requires input from a previous step, reference it using the syntax $$STEP_X_OUTPUT$$.
    3. Be strategic. Break down complex goals (like sequential rewriting) into distinct steps. Use the correct input keys ('facts' vs 'previous_content') for the Writer agent.

    EXAMPLE GOAL: "Write a suspenseful story about Apollo 11."
    EXAMPLE PLAN (JSON LIST):
    [
        {{"step": 1, "agent": "Librarian", "input": {{"intent_query": "suspenseful narrative blueprint"}}}},
        {{"step": 2, "agent": "Researcher", "input": {{"topic_query": "Apollo 11 landing details"}}}},
        {{"step": 3, "agent": "Writer", "input": {{"blueprint": "$$STEP_1_OUTPUT$$", "facts": "$$STEP_2_OUTPUT$$"}}}}
    ]

    EXAMPLE GOAL: "Write a technical report on Juno, then rewrite it casually."
    EXAMPLE PLAN (JSON LIST):
    [
        {{"step": 1, "agent": "Librarian", "input": {{"intent_query": "technical report structure"}}}},
        {{"step": 2, "agent": "Researcher", "input": {{"topic_query": "Juno mission technology"}}}},
        {{"step": 3, "agent": "Writer", "input": {{"blueprint": "$$STEP_1_OUTPUT$$", "facts": "$$STEP_2_OUTPUT$$"}}}},
        {{"step": 4, "agent": "Librarian", "input": {{"intent_query": "casual summary style"}}}},
        {{"step": 5, "agent": "Writer", "input": {{"blueprint": "$$STEP_4_OUTPUT$$", "previous_content": "$$STEP_3_OUTPUT$$"}}}}
    ]

    Respond ONLY with the JSON list.
    """
    # Call LLM in JSON mode for reliability
    plan_json = ""
    try:
        plan_json = call_llm_robust(system_prompt, goal, json_mode=True)
        plan = json.loads(plan_json)

        if not isinstance(plan, list):
             # Handle cases where the LLM wraps the list in a dictionary
             if isinstance(plan, dict):
                 if "plan" in plan and isinstance(plan["plan"], list):
                     plan = plan["plan"]
                 elif "steps" in plan and isinstance(plan["steps"], list): # <--- ADD THIS CHECK
                     plan = plan["steps"]
                 else:
                    raise ValueError("Planner returned a dict, but missing 'plan' or 'steps' key.")
             else:
                raise ValueError("Planner did not return a valid JSON list structure.")

        print("[Engine: Planner] Plan generated successfully.")
        return plan
    except Exception as e:
        print(f"[Engine: Planner] Failed to generate a valid plan. Error: {e}. Raw LLM Output: {plan_json}")
        raise e


# === 6.3. The Executor (Context Assembly and Execution) ===

def resolve_dependencies(input_params, state):
    """
    Helper function to replace $$REF$$ placeholders with actual data from the execution state.
    This implements Context Chaining.
    """
    # Use copy.deepcopy to ensure the original plan structure is not modified
    resolved_input = copy.deepcopy(input_params)

    # Recursive function to handle potential nested structures
    def resolve(value):
        if isinstance(value, str) and value.startswith("$$") and value.endswith("$$"):
            ref_key = value[2:-2]
            if ref_key in state:
                # Retrieve the actual data (string) from the previous step's output
                print(f"[Engine: Executor] Resolved dependency {ref_key}.")
                return state[ref_key]
            else:
                raise ValueError(f"Dependency Error: Reference {ref_key} not found in execution state.")
        elif isinstance(value, dict):
            return {k: resolve(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [resolve(v) for v in value]
        return value

    return resolve(resolved_input)


def context_engine(goal):
    """
    The main entry point for the Context Engine. Manages Planning and Execution.
    """
    print(f"\n=== [Context Engine] Starting New Task ===\nGoal: {goal}\n")
    trace = ExecutionTrace(goal)
    registry = AGENT_TOOLKIT

    # Phase 1: Plan
    try:
        capabilities = registry.get_capabilities_description()
        plan = planner(goal, capabilities)
        trace.log_plan(plan)
    except Exception as e:
        trace.finalize("Failed during Planning")
        # Return the trace even in failure for debugging
        return None, trace

    # Phase 2: Execute
    # State stores the raw outputs (strings) of each step: { "STEP_X_OUTPUT": data_string }
    state = {}

    for step in plan:
        step_num = step.get("step")
        agent_name = step.get("agent")
        planned_input = step.get("input")

        print(f"\n[Engine: Executor] Starting Step {step_num}: {agent_name}")

        try:
            handler = registry.get_handler(agent_name)

            # Context Assembly: Resolve dependencies
            resolved_input = resolve_dependencies(planned_input, state)

            # Execute Agent via MCP
            # Create an MCP message with the RESOLVED input for the agent
            mcp_resolved_input = create_mcp_message("Engine", resolved_input)
            mcp_output = handler(mcp_resolved_input)

            # Update State and Log Trace
            output_data = mcp_output["content"]

            # Store the output data (the context itself)
            state[f"STEP_{step_num}_OUTPUT"] = output_data
            trace.log_step(step_num, agent_name, planned_input, mcp_output, resolved_input)
            print(f"[Engine: Executor] Step {step_num} completed.")

        except Exception as e:
            error_message = f"Execution failed at step {step_num} ({agent_name}): {e}"
            print(f"[Engine: Executor] ERROR: {error_message}")
            trace.finalize(f"Failed at Step {step_num}")
            # Return the trace for debugging the failure
            return None, trace

    # Finalization
    final_output = state.get(f"STEP_{len(plan)}_OUTPUT")
    trace.finalize("Success", final_output)
    print("\n=== [Context Engine] Task Complete ===")

    # Return the output of the final step AND the trace
    return final_output, trace

In [10]:
#@title 7.Execution (Standard Goal)
# -------------------------------------------------------------------------
# Demonstrate the engine with a standard goal similar to Chapter 3,
# showing how the Planner dynamically constructs the workflow.
# -------------------------------------------------------------------------

print("******** Example 1: STANDARD WORKFLOW (Suspenseful Narrative) **********\n")
goal_1 = "Write a short, suspenseful scene for a children's story about the Apollo 11 moon landing, highlighting the danger."

# Run the Context Engine
# Ensure the Pinecone index is populated (from Ch3 notebook) for this to work.
result_1, trace_1 = context_engine(goal_1)

if result_1:
    print("\n******** FINAL OUTPUT 1 **********\n")
    display(Markdown(result_1))
    print("\n\n" + "="*50 + "\n\n")
    # Optional: Display the trace to see the engine's process
    # trace_1.display_trace()

******** Example 1: STANDARD WORKFLOW (Suspenseful Narrative) **********


=== [Context Engine] Starting New Task ===
Goal: Write a short, suspenseful scene for a children's story about the Apollo 11 moon landing, highlighting the danger.

[Engine: Planner] Analyzing goal and generating execution plan...
[Engine: Planner] Plan generated successfully.

[Engine: Executor] Starting Step 1: Librarian

[Librarian] Activated. Analyzing intent...
[Librarian] Found blueprint 'blueprint_suspense_narrative' (Score: 0.81)
[Engine: Executor] Step 1 completed.

[Engine: Executor] Starting Step 2: Researcher

[Researcher] Activated. Investigating topic...
[Researcher] Found 2 relevant chunks. Synthesizing...
[Engine: Executor] Step 2 completed.

[Engine: Executor] Starting Step 3: Writer
[Engine: Executor] Resolved dependency STEP_1_OUTPUT.
[Engine: Executor] Resolved dependency STEP_2_OUTPUT.

[Writer] Activated. Applying blueprint to source material...
[Engine: Executor] Step 3 completed.

=== [Co

The Eagle plunged. Down it went. Something was wrong. The path shifted. Danger ahead. Armstrong saw it. He knew.

The readouts flashed. Warnings. The machine was lost. Its course, uncertain. A silent alarm screamed in the quiet capsule.

Then, a worse truth. The fuel gauge dropped. Low. So low. Every second bled precious reserves. No time for error. No room to breathe.

Armstrong reached. His hand moved. He took control. Manual override. The fate of the mission. In his hands.





