# Introduction

In this guide, we will walk you through building a powerful semantic search engine using [Couchbase](https://www.couchbase.com) as the backend database and [CrewAI](https://github.com/crewAIInc/crewAI) for agent-based RAG operations. CrewAI allows us to create specialized agents that can work together to handle different aspects of the RAG workflow, from document retrieval to response generation. This tutorial is designed to be beginner-friendly, with clear, step-by-step instructions that will equip you with the knowledge to create a fully functional semantic search system from scratch.

Before you start
---------------

1. Create and Deploy Your Free Tier Operational cluster on [Capella](https://cloud.couchbase.com/sign-up)
   - To get started with [Couchbase Capella](https://cloud.couchbase.com), create an account and use it to deploy
     a forever free tier operational cluster
   - This account provides you with an environment where you can explore and learn
     about Capella with no time constraint
   - To learn more, please follow the [Getting Started Guide](https://docs.couchbase.com/cloud/get-started/create-account.html)

2. Couchbase Capella Configuration
   When running Couchbase using Capella, the following prerequisites need to be met:
   - Create the database credentials to access the required bucket (Read and Write) used in the application
   - Allow access to the Cluster from the IP on which the application is running by following the [Network Security documentation](https://docs.couchbase.com/cloud/security/security.html#public-access)

# Viewing Your Crew's Performance in Arize AI

Once you've run your `process_query` function a few times (especially after the first run to allow Arize to set up the schema for your `MODEL_ID`), you can log into your Arize AI account to see the observability data.

**Steps to View Data:**

1.  **Log in to Arize AI:** Go to [app.arize.com](https://app.arize.com/) and log in.
2.  **Navigate to Your Model:**
    *   Find the Space you used (identified by your `ARIZE_SPACE_KEY`).
    *   Within that Space, you should see your model, which we named `crew-ai-rag-app` (or whatever `MODEL_ID` you configured). Click on it.
    *   If it's the first time sending data for this model, Arize might prompt you to **review and accept the schema**. This is where Arize understands the structure of your features (e.g., `query_text`) and predictions (e.g., `final_response`). Accept the schema to proceed. This might take a few minutes to appear after the first data is sent.

3.  **Explore the Data:**
    *   **Overview Tab:** Get a general summary of your model's performance, including prediction volumes and any detected drifts or issues (once you have more data).
    *   **Data Ingestion Tab:** Check the status of data being sent to Arize. You can see batches of data and any potential ingestion errors.
    *   **Prompt Playground & Tracing (for LLMs):** This is where the `ArizeCallbackHandler` data shines.
        *   You should be able to find individual traces for LLM calls made by your agents.
        *   Look for sections related to "Prompts" or "Traces".
        *   You can inspect the exact prompts sent to the LLM, the responses received, token counts, and latencies for each step in your agent's reasoning process.
        *   This is invaluable for debugging prompts, understanding agent behavior, and identifying performance bottlenecks.
    *   **Predictions Table:** View the explicit logs you sent using `arize_client.log()`. You'll see your `prediction_id`, `features` (like `query_text`), `prediction_label` (the `final_response`), and any `tags` you added (like `crew_execution_time_seconds`).
    *   **Embeddings (if applicable):** If you were logging embeddings (not explicitly covered in this basic setup but possible with Arize), you could visualize them here.

**What to Look For:**

*   **Prompt-Response Pairs:** Are the prompts what you expect? Are the LLM responses sensible?
*   **Latency:** How long is each LLM call taking? Where are the delays in your crew's execution?
*   **Token Usage:** Monitor token consumption to manage costs and stay within context limits.
*   **Errors:** The `ArizeCallbackHandler` should also log errors from LLM calls. Explicitly logged errors (like our `crew_error` example) will also appear.
*   **Overall Flow:** By correlating the `prediction_id` from your explicit log with the traces from the `ArizeCallbackHandler`, you can piece together the entire journey of a query through your CrewAI application.

Arize offers many more advanced features like drift detection, performance dashboards, and monitors. As you send more data, you'll be able to leverage these to get deeper insights into your CrewAI application's performance and behavior. Refer to the [Arize Documentation](https://docs.arize.com/) for more detailed guides on using the platform.

*(Optional: You could insert a placeholder here like `<!-- Placeholder for a screenshot of Arize UI -->` if you intend to add an image later)*

# Setting up Arize AI

To start sending your CrewAI data to Arize, you'll need two key pieces of information:

1.  **Arize API Key (`ARIZE_API_KEY`)**: This key authenticates your application when it sends data to Arize.
2.  **Arize Space Key (`ARIZE_SPACE_KEY`)**: This key identifies the specific space within your Arize account where the data will be logged.

**Steps to Get Your Keys:**

1.  **Sign Up for Arize AI:** If you don't have an account, go to the [Arize AI website](https://arize.com/signup/) and create one. They typically offer a free tier that's suitable for getting started.
2.  **Find Your API and Space Key:**
    *   Once logged in, navigate to the 'API Keys' section in your Arize account settings (the exact location might vary slightly, but it's usually under your organization or user settings).
    *   Here you will find your `API Key` and `Space Key`. Copy these values.

**Configuring the Keys:**

You need to make these keys available to your notebook environment. You can do this in a couple of ways:

*   **Environment Variables (Recommended):**
    Set the `ARIZE_API_KEY` and `ARIZE_SPACE_KEY` as environment variables in your system. If you're running this notebook locally, you can set them in your terminal before launching Jupyter, or add them to your `.bashrc` or `.zshrc` file.
    ```bash
    export ARIZE_API_KEY="YOUR_API_KEY"
    export ARIZE_SPACE_KEY="YOUR_SPACE_KEY"
    ```
    If you are using a `.env` file with `python-dotenv` (which is already installed in this notebook), you can add them there:
    ```
    ARIZE_API_KEY="YOUR_API_KEY"
    ARIZE_SPACE_KEY="YOUR_SPACE_KEY"
    ```
    Then, make sure `load_dotenv()` is called early in your script.

*   **Directly in the Notebook (Less Secure):**
    You can define these keys directly in a code cell. However, be cautious with this method, especially if you plan to share your notebook, as it exposes your keys.
    ```python
    # Not recommended for production or shared notebooks
    # ARIZE_API_KEY = "YOUR_API_KEY"
    # ARIZE_SPACE_KEY = "YOUR_SPACE_KEY"
    ```

In the next step, we'll use these keys to initialize the Arize client.

In [None]:
import os
from arize.api import ArizeClient
from dotenv import load_dotenv

# Load environment variables from .env file, if present
load_dotenv()

ARIZE_API_KEY = os.getenv("ARIZE_API_KEY")
ARIZE_SPACE_KEY = os.getenv("ARIZE_SPACE_KEY")

# It's good practice to check if the keys are loaded, especially before trying to use them.
if not ARIZE_API_KEY or not ARIZE_SPACE_KEY:
    print("🔴 Arize API Key or Space Key not found. Please set them as environment variables or in a .env file.")
    print("🔴 For now, Arize logging will be disabled.")
    arize_client = None
else:
    try:
        arize_client = ArizeClient(space_key=ARIZE_SPACE_KEY, api_key=ARIZE_API_KEY)
        print("✅ Arize client initialized successfully!")
        print("⚠️ If you are running this for the first time in a new space, make sure to send some data (e.g. by running a Crew) and then **accept the schema** in the Arize UI for the model 'crew-ai-rag-app'.")
    except Exception as e:
        print(f"🔴 Failed to initialize Arize client: {e}")
        arize_client = None

# Adding Observability with Arize AI

In complex AI systems like those built with CrewAI, understanding what's happening under the hood is crucial. This is where observability comes in. Arize AI is a powerful platform for ML observability, and it's particularly useful for applications involving Large Language Models (LLMs).

**Why is Observability Important for CrewAI?**

CrewAI orchestrates multiple agents, tools, and LLM calls. This can create a complex web of interactions. Observability helps you:

*   **Trace Execution:** Follow the flow of a query as it's processed by different agents and tools.
*   **Monitor Prompts & Responses:** See the exact prompts being sent to LLMs and the responses received. This is vital for debugging and improving prompt engineering.
*   **Evaluate Performance:** Track metrics like latency, token usage, and the quality of outputs.
*   **Detect Issues:** Quickly identify errors, bottlenecks, or unexpected behavior within your crew.
*   **Understand Agent Interactions:** Gain insights into how different agents in your crew are collaborating (or failing to collaborate).
*   **Ensure Data Quality:** For RAG systems, monitor the quality of retrieved documents and their impact on the final response.

By integrating Arize AI, you can gain these insights, leading to more robust, reliable, and performant CrewAI applications. In the following sections, we'll walk through how to set up Arize AI to monitor your CrewAI agents and tasks.

In [None]:
%pip install --quiet arize

# Setting the Stage: Installing Necessary Libraries

We'll install the following key libraries:
- `datasets`: For loading and managing our training data
- `langchain-couchbase`: To integrate Couchbase with LangChain for vector storage and caching
- `langchain-openai`: For accessing OpenAI's embedding and chat models
- `crewai`: To create and orchestrate our AI agents for RAG operations
- `python-dotenv`: For securely managing environment variables and API keys

These libraries provide the foundation for building a semantic search engine with vector embeddings,
database integration, and agent-based RAG capabilities.

In [1]:
%pip install --quiet datasets langchain-couchbase langchain-openai crewai python-dotenv

# Importing Necessary Libraries
The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, embedding generation, and dataset loading.

In [24]:
import getpass
import json
import logging
import os
import time
import uuid # Added for Arize logging prediction_id
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.diagnostics import PingState, ServiceType
from couchbase.exceptions import (InternalServerFailureException,
                                  QueryIndexAlreadyExistsException,
                                  ServiceUnavailableException)
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from datasets import load_dataset
from dotenv import load_dotenv
from crewai.tools import tool # Added import
from langchain_couchbase.vectorstores import CouchbaseVectorStore
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from crewai import Agent, Crew, Process, Task, LLM

# Setup Logging
Logging is configured to track the progress of the script and capture any errors or warnings.

In [25]:
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Suppress httpx logging
logging.getLogger('httpx').setLevel(logging.CRITICAL)

# Loading Sensitive Informnation
In this section, we prompt the user to input essential configuration settings needed. These settings include sensitive information like database credentials, and specific configuration names. Instead of hardcoding these details into the script, we request the user to provide them at runtime, ensuring flexibility and security.

The script uses environment variables to store sensitive information, enhancing the overall security and maintainability of your code by avoiding hardcoded values.

In [26]:
# Load environment variables
load_dotenv()

CB_HOST = os.getenv('CB_HOST') or input("Enter Couchbase host (default: couchbase://localhost): ") or 'couchbase://localhost'
CB_USERNAME = os.getenv('CB_USERNAME') or input("Enter Couchbase username (default: Administrator): ") or 'Administrator'
CB_PASSWORD = os.getenv('CB_PASSWORD') or getpass.getpass("Enter Couchbase password (default: password): ") or 'password'
CB_BUCKET_NAME = os.getenv('CB_BUCKET_NAME') or input("Enter bucket name (default: vector-search-testing): ") or 'vector-search-testing'
INDEX_NAME = os.getenv('INDEX_NAME') or input("Enter index name (default: vector_search_crew): ") or 'vector_search_crew'
SCOPE_NAME = os.getenv('SCOPE_NAME') or input("Enter scope name (default: shared): ") or 'shared'
COLLECTION_NAME = os.getenv('COLLECTION_NAME') or input("Enter collection name (default: crew): ") or 'crew'

print("Configuration loaded successfully")

Configuration loaded successfully


In [27]:
NEBIUS_AI_KEY=os.getenv('NEBIUS_AI_KEY') or input("Enter your Nebius AI key: ")
if not NEBIUS_AI_KEY:
    raise ValueError("NEBIUS_AI_KEY is not set")

# Load environment variables
load_dotenv()

# Connecting to the Couchbase Cluster
Connecting to a Couchbase cluster is the foundation of our project. Couchbase will serve as our primary data store, handling all the storage and retrieval operations required for our semantic search engine. By establishing this connection, we enable our application to interact with the database, allowing us to perform operations such as storing embeddings, querying data, and managing collections. This connection is the gateway through which all data will flow, so ensuring it's set up correctly is paramount.

In [6]:
# Connect to Couchbase
try:
    auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
    options = ClusterOptions(auth)
    cluster = Cluster(CB_HOST, options)
    cluster.wait_until_ready(timedelta(seconds=5))
    print("Successfully connected to Couchbase")
except Exception as e:
    print(f"Failed to connect to Couchbase: {str(e)}")
    raise

Successfully connected to Couchbase


## Setting Up Collections in Couchbase

The setup_collection() function handles creating and configuring the hierarchical data organization in Couchbase:

1. Bucket Creation:
   - Checks if specified bucket exists, creates it if not
   - Sets bucket properties like RAM quota (1024MB) and replication (disabled)
   - Note: If you are using Capella, create a bucket manually called vector-search-testing(or any name you prefer) with the same properties.

2. Scope Management:  
   - Verifies if requested scope exists within bucket
   - Creates new scope if needed (unless it's the default "_default" scope)

3. Collection Setup:
   - Checks for collection existence within scope
   - Creates collection if it doesn't exist
   - Waits 2 seconds for collection to be ready

Additional Tasks:
- Creates primary index on collection for query performance
- Clears any existing documents for clean state
- Implements comprehensive error handling and logging

The function is called twice to set up:
1. Main collection for vector embeddings
2. Cache collection for storing results


In [8]:
def setup_collection(cluster, bucket_name, scope_name, collection_name):
    try:
        # Check if bucket exists, create if it doesn't
        try:
            bucket = cluster.bucket(bucket_name)
            logging.info(f"Bucket '{bucket_name}' exists.")
        except Exception as e:
            logging.info(f"Bucket '{bucket_name}' does not exist. Creating it...")
            bucket_settings = CreateBucketSettings(
                name=bucket_name,
                bucket_type='couchbase',
                ram_quota_mb=1024,
                flush_enabled=True,
                num_replicas=0
            )
            cluster.buckets().create_bucket(bucket_settings)
            bucket = cluster.bucket(bucket_name)
            logging.info(f"Bucket '{bucket_name}' created successfully.")

        bucket_manager = bucket.collections()

        # Check if scope exists, create if it doesn't
        scopes = bucket_manager.get_all_scopes()
        scope_exists = any(scope.name == scope_name for scope in scopes)

        if not scope_exists and scope_name != "_default":
            logging.info(f"Scope '{scope_name}' does not exist. Creating it...")
            bucket_manager.create_scope(scope_name)
            logging.info(f"Scope '{scope_name}' created successfully.")

        # Check if collection exists, create if it doesn't
        collections = bucket_manager.get_all_scopes()
        collection_exists = any(
            scope.name == scope_name and collection_name in [col.name for col in scope.collections]
            for scope in collections
        )

        if not collection_exists:
            logging.info(f"Collection '{collection_name}' does not exist. Creating it...")
            bucket_manager.create_collection(scope_name, collection_name)
            logging.info(f"Collection '{collection_name}' created successfully.")
        else:
            logging.info(f"Collection '{collection_name}' already exists. Skipping creation.")

        # Wait for collection to be ready
        collection = bucket.scope(scope_name).collection(collection_name)
        time.sleep(2)  # Give the collection time to be ready for queries

        # Ensure primary index exists
        try:
            cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`").execute()
            logging.info("Primary index present or created successfully.")
        except Exception as e:
            logging.warning(f"Error creating primary index: {str(e)}")

        # Clear all documents in the collection
        try:
            query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
            cluster.query(query).execute()
            logging.info("All documents cleared from the collection.")
        except Exception as e:
            logging.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")

        return collection
    except Exception as e:
        raise RuntimeError(f"Error setting up collection: {str(e)}")

setup_collection(cluster, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)


<couchbase.collection.Collection at 0x7edc3b80ced0>

# Configuring and Initializing Couchbase Vector Search Index for Semantic Document Retrieval

Semantic search requires an efficient way to retrieve relevant documents based on a user's query. This is where the Couchbase Vector Search Index comes into play. In this step, we load the Vector Search Index definition from a JSON file, which specifies how the index should be structured. This includes the fields to be indexed, the dimensions of the vectors, and other parameters that determine how the search engine processes queries based on vector similarity.

This CrewAI vector search index configuration requires specific default settings to function properly. This tutorial uses the bucket named `vector-search-testing` with the scope `shared` and collection `crew`. The configuration is set up for vectors with exactly `1536 dimensions`, using `dot product` similarity and optimized for `recall`. If you want to use a different bucket, scope, or collection, you will need to modify the index configuration accordingly.

For more information on creating a vector search index, please follow the instructions at [Couchbase Vector Search Documentation](https://docs.couchbase.com/cloud/vector-search/create-vector-search-index-ui.html).

In [34]:
# Load index definition
try:
    with open('crew_index.json', 'r') as file:
        index_definition = json.load(file)
except FileNotFoundError as e:
    print(f"Error: crew_index.json file not found: {str(e)}")
    raise
except json.JSONDecodeError as e:
    print(f"Error: Invalid JSON in crew_index.json: {str(e)}")
    raise
except Exception as e:
    print(f"Error loading index definition: {str(e)}")
    raise

# Creating or Updating Search Indexes

With the index definition loaded, the next step is to create or update the **Vector Search Index** in Couchbase. This step is crucial because it optimizes our database for vector similarity search operations, allowing us to perform searches based on the semantic content of documents rather than just keywords. By creating or updating a Vector Search Index, we enable our search engine to handle complex queries that involve finding semantically similar documents using vector embeddings, which is essential for a robust semantic search engine.

In [35]:
try:
    scope_index_manager = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME).search_indexes()

    # Check if index already exists
    existing_indexes = scope_index_manager.get_all_indexes()
    index_name = index_definition["name"]

    if index_name in [index.name for index in existing_indexes]:
        logging.info(f"Index '{index_name}' found")
    else:
        logging.info(f"Creating new index '{index_name}'...")

    # Create SearchIndex object from JSON definition
    search_index = SearchIndex.from_json(index_definition)

    # Upsert the index (create if not exists, update if exists)
    scope_index_manager.upsert_index(search_index)
    logging.info(f"Index '{index_name}' successfully created/updated.")

except QueryIndexAlreadyExistsException:
    logging.info(f"Index '{index_name}' already exists. Skipping creation/update.")
except ServiceUnavailableException:
    raise RuntimeError("Search service is not available. Please ensure the Search service is enabled in your Couchbase cluster.")
except InternalServerFailureException as e:
    logging.error(f"Internal server error: {str(e)}")
    raise

# Creating Embeddings with Nebius AI

In [30]:
from typing import Any, Dict, List, Optional
import os
from openai import OpenAI
from langchain_core.embeddings import Embeddings


class NebiusE5MistralEmbeddings(Embeddings):
    """Nebius E5 Mistral 7B embedding model integration for LangChain.

    Key init args:
        model: str
            Name of Nebius model to use, defaults to "intfloat/e5-mistral-7b-instruct"
        api_key: Optional[str]
            API key for Nebius. If not provided, will look for NEBIUS_API_KEY env var.
        base_url: str
            Base URL for Nebius API, defaults to "https://api.studio.nebius.com/v1/"
        client: Optional[OpenAI]
            Pre-configured OpenAI client. If provided, other connection params are ignored.

    Instantiate:
        .. code-block:: python

            from nebius_embeddings import NebiusE5MistralEmbeddings

            # Basic usage with API key in environment variable
            embed = NebiusE5MistralEmbeddings()

            # Or with explicit parameters
            embed = NebiusE5MistralEmbeddings(
                model="intfloat/e5-mistral-7b-instruct",
                api_key="your-api-key"
            )

    Embed single text:
        .. code-block:: python

            input_text = "The meaning of life is 42"
            embedding = embed.embed_query(input_text)
            # Returns a list of floats representing the embedding

    Embed multiple texts:
        .. code-block:: python

            input_texts = ["Document 1...", "Document 2..."]
            embeddings = embed.embed_documents(input_texts)
            # Returns a list of lists of floats, each representing a document embedding
    """

    def __init__(
        self,
        model: str = "intfloat/e5-mistral-7b-instruct",
        api_key: Optional[str] = None,
        base_url: str = "https://api.studio.nebius.com/v1/",
        client: Optional[Any] = None,
        **kwargs: Any,
    ):
        """Initialize the Nebius E5 Mistral embeddings.

        Args:
            model: The model to use for embeddings
            api_key: The API key to use. If not provided, will look for NEBIUS_API_KEY env var
            base_url: The base URL for the Nebius API
            client: Pre-configured OpenAI client. If provided, other connection params are ignored
            **kwargs: Additional parameters to pass to the client
        """
        self.model = model

        if client is not None:
            self.client = client
        else:
            api_key = api_key or os.environ.get("NEBIUS_API_KEY")
            if not api_key:
                raise ValueError(
                    "Nebius API key must be provided as an argument or set as environment "
                    "variable NEBIUS_API_KEY"
                )

            self.client = OpenAI(
                base_url=base_url,
                api_key=api_key,
                **kwargs,
            )

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of documents using the Nebius API.

        Args:
            texts: The list of texts to embed

        Returns:
            List of embeddings, one for each text
        """
        # Handle empty inputs
        if not texts:
            return []

        # Process in batch
        embeddings = []
        for text in texts:
            response = self.client.embeddings.create(
                model=self.model,
                input=text,
            )
            embedding = response.data[0].embedding
            embeddings.append(embedding)

        return embeddings

    def embed_query(self, text: str) -> List[float]:
        """Embed a query using the Nebius API.

        Args:
            text: The text to embed

        Returns:
            Embedding for the text
        """
        response = self.client.embeddings.create(
            model=self.model,
            input=text,
        )
        return response.data[0].embedding

    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """Asynchronously embed a list of documents using the Nebius API.

        Note: This implementation falls back to the synchronous version as
        the Nebius OpenAI client might not have native async support.
        Consider implementing a fully async version if needed.

        Args:
            texts: The list of texts to embed

        Returns:
            List of embeddings, one for each text
        """
        return self.embed_documents(texts)

    async def aembed_query(self, text: str) -> List[float]:
        """Asynchronously embed a query using the Nebius API.

        Note: This implementation falls back to the synchronous version as
        the Nebius OpenAI client might not have native async support.
        Consider implementing a fully async version if needed.

        Args:
            text: The text to embed

        Returns:
            Embedding for the text
        """
        return self.embed_query(text)

## Setting Up LLM Components

This section initializes two key OpenAI components needed for our RAG system:

1. Nebius AI Embeddings:
   - Converts text into high-dimensional vector representations (embeddings)
   - These embeddings enable semantic search by capturing the meaning of text
   - Required for vector similarity search in Couchbase

2. Nebius AI Open Source LLM Model:
   - Serves as the cognitive engine for CrewAI agents
   - Powers agent reasoning, decision-making, and task execution
   - Enables agents to:
     - Process and understand retrieved context from vector search
     - Generate thoughtful responses based on that context
     - Follow instructions defined in agent roles and goals
     - Collaborate with other agents in the crew


In the CrewAI framework, the LLM acts as the "brain" for each agent, allowing them
to interpret tasks, retrieve relevant information via the RAG system, and generate
appropriate outputs based on their specialized roles and expertise.

In [31]:
embeddings = NebiusE5MistralEmbeddings(
    api_key=os.getenv('NEBIUS_AI_KEY'),  # Or set NEBIUS_API_KEY environment variable
    model="intfloat/e5-mistral-7b-instruct"
)

from arize_langchain.handlers import ArizeCallbackHandler

# Define the LLM with Nebius AI Studio
# First, ensure arize_client is defined from the previous cell (assumed to be executed prior).
# We'll create the callback handler only if the client was initialized.
arize_callback = None
if 'arize_client' in locals() and arize_client:
    try:
        # Define a unique model_id and model_version for your CrewAI application
        MODEL_ID = "crew-ai-rag-app"
        MODEL_VERSION = "1.0" # You can change this as you update your crew/prompts
        
        arize_callback = ArizeCallbackHandler(
            model_id=MODEL_ID,
            model_version=MODEL_VERSION, # Optional
            arize_client=arize_client
        )
        print("✅ ArizeCallbackHandler initialized.")
    except Exception as e:
        print(f"🔴 Failed to initialize ArizeCallbackHandler: {e}")
        arize_callback = None
else:
    print("🔴 Arize client not available or not initialized in a preceding cell, ArizeCallbackHandler will not be used.")

# If NEBIUS_AI_KEY is not set, this will fail. Add a check.
nebius_api_key = os.getenv('NEBIUS_AI_KEY')
if not nebius_api_key:
    print("🔴 NEBIUS_AI_KEY not set. Cannot initialize LLM.")
    llm = None # Or handle appropriately
else:
    llm = LLM(
        model="openai/meta-llama/Meta-Llama-3.1-70B-Instruct", # This is a model name, not a class
        base_url="https://api.studio.nebius.com/v1/",
        api_key=nebius_api_key 
        # Callbacks will be added to agents/crew, not here directly for crewai.LLM
    )
    print("OpenAI components initialized (LLM for CrewAI)")

# The original code also had OpenAIEmbeddings. This is separate from the Chat LLM.
# embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY) -> This was old
# Current embeddings:
# embeddings = NebiusE5MistralEmbeddings(...) - This is a custom class, not directly LangChain's OpenAIEmbeddings
# ArizeCallbackHandler is for LLM calls, not embedding calls typically.

OpenAI components initialized


# Setting Up the Couchbase Vector Store
A vector store is where we'll keep our embeddings. Unlike the FTS index, which is used for text-based search, the vector store is specifically designed to handle embeddings and perform similarity searches. When a user inputs a query, the search engine converts the query into an embedding and compares it against the embeddings stored in the vector store. This allows the engine to find documents that are semantically similar to the query, even if they don't contain the exact same words. By setting up the vector store in Couchbase, we create a powerful tool that enables our search engine to understand and retrieve information based on the meaning and context of the query, rather than just the specific words used.

In [36]:
# Setup vector store
vector_store = CouchbaseVectorStore(
    cluster=cluster,
    bucket_name=CB_BUCKET_NAME,
    scope_name=SCOPE_NAME,
    collection_name=COLLECTION_NAME,
    embedding=embeddings,
    index_name=INDEX_NAME,
)
print("Vector store initialized")

Vector store initialized


# Load the BBC News Dataset
To build a search engine, we need data to search through. We use the BBC News dataset from RealTimeData, which provides real-world news articles. This dataset contains news articles from BBC covering various topics and time periods. Loading the dataset is a crucial step because it provides the raw material that our search engine will work with. The quality and diversity of the news articles make it an excellent choice for testing and refining our search engine, ensuring it can handle real-world news content effectively.

The BBC News dataset allows us to work with authentic news articles, enabling us to build and test a search engine that can effectively process and retrieve relevant news content. The dataset is loaded using the Hugging Face datasets library, specifically accessing the "RealTimeData/bbc_news_alltime" dataset with the "2024-12" version.

In [37]:
try:
    news_dataset = load_dataset(
        "RealTimeData/bbc_news_alltime", "2024-12", split="train"
    )
    print(f"Loaded the BBC News dataset with {len(news_dataset)} rows")
    logging.info(f"Successfully loaded the BBC News dataset with {len(news_dataset)} rows.")
except Exception as e:
    raise ValueError(f"Error loading the BBC News dataset: {str(e)}")

Loaded the BBC News dataset with 2687 rows


## Cleaning up the Data
We will use the content of the news articles for our RAG system.

The dataset contains a few duplicate records. We are removing them to avoid duplicate results in the retrieval stage of our RAG system.

In [38]:
news_articles = news_dataset["content"]
unique_articles = set()
for article in news_articles:
    if article:
        unique_articles.add(article)
unique_news_articles = list(unique_articles)
print(f"We have {len(unique_news_articles)} unique articles in our database.")

We have 1749 unique articles in our database.


## Saving Data to the Vector Store
To efficiently handle the large number of articles, we process them in batches of articles at a time. This batch processing approach helps manage memory usage and provides better control over the ingestion process.

We first filter out any articles that exceed 50,000 characters to avoid potential issues with token limits. Then, using the vector store's add_texts method, we add the filtered articles to our vector database. The batch_size parameter controls how many articles are processed in each iteration.

This approach offers several benefits:
1. Memory Efficiency: Processing in smaller batches prevents memory overload
2. Error Handling: If an error occurs, only the current batch is affected
3. Progress Tracking: Easier to monitor and track the ingestion progress
4. Resource Management: Better control over CPU and network resource utilization

We use a conservative batch size of 100 to ensure reliable operation.
The optimal batch size depends on many factors including:
- Document sizes being inserted
- Available system resources
- Network conditions
- Concurrent workload

Consider measuring performance with your specific workload before adjusting.


In [39]:
batch_size = 100

# Automatic Batch Processing
articles = [article for article in unique_news_articles if article and len(article) <= 50000]

try:
    vector_store.add_texts(
        texts=articles,
        batch_size=batch_size
    )
    logging.info("Document ingestion completed successfully.")
except Exception as e:
    raise ValueError(f"Failed to save documents to vector store: {str(e)}")

## Creating a Vector Search Tool
After loading our data into the vector store, we need to create a tool that can efficiently search through these vector embeddings. This involves two key components:

### Vector Retriever
The vector retriever is configured to perform similarity searches. This creates a retriever that performs semantic similarity searches against our vector database. The similarity search finds documents whose vector embeddings are closest to the query's embedding in the vector space.

### Search Tool
The search tool wraps the retriever in a user-friendly interface that:
- Takes a query string as input
- Passes the query to the retriever to find relevant documents
- Formats the results with clear document separation using document numbers and dividers
- Returns the formatted results as a single string with each document clearly delineated

The tool is designed to integrate seamlessly with our AI agents, providing them with reliable access to our knowledge base through vector similarity search. The lambda function in the tool handles both direct string queries and structured query objects, ensuring flexibility in how the tool can be invoked.


In [40]:
# Create vector retriever
retriever = vector_store.as_retriever(
    search_type="similarity",
)

# Define the search tool using the @tool decorator
@tool("vector_search")
def search_tool(query: str) -> str:
    """Search for relevant documents using vector similarity.
    Input should be a simple text query string.
    Returns a list of relevant document contents.
    Use this tool to find detailed information about topics."""
    # Handle potential non-string query input if needed (similar to original lambda)
    # CrewAI usually passes the string directly based on task description
    # but checking doesn't hurt, though the Agent logic might handle this.
    # query_str = query if isinstance(query, str) else str(query.get('query', '')) # Simplified for now

    # Invoke the retriever
    docs = retriever.invoke(query)

    # Format the results
    formatted_docs = "\n\n".join([
        f"Document {i+1}:\n{'-'*40}\n{doc.page_content}"
        for i, doc in enumerate(docs)
    ])
    return formatted_docs

# Creating CrewAI Agents

We'll create two specialized AI agents using the CrewAI framework to handle different aspects of our information retrieval and analysis system:

## Research Expert Agent
This agent is designed to:
- Execute semantic searches using our vector store
- Analyze and evaluate search results
- Identify key information and insights
- Verify facts across multiple sources
- Synthesize findings into comprehensive research summaries

## Technical Writer Agent  
This agent is responsible for:
- Taking research findings and structuring them logically
- Converting technical concepts into clear explanations
- Ensuring proper citation and attribution
- Maintaining engaging yet informative tone
- Producing well-formatted final outputs

The agents work together in a coordinated way:
1. Research agent finds and analyzes relevant documents
2. Writer agent takes those findings and crafts polished responses
3. Both agents use a custom response template for consistent output

This multi-agent approach allows us to:
- Leverage specialized expertise for different tasks
- Maintain high quality through separation of concerns
- Create more comprehensive and reliable outputs
- Scale the system's capabilities efficiently

In [None]:
# Custom response template
response_template = """
Analysis Results
===============
{%- if .Response %}
{{ .Response }}
{%- endif %}

Sources
=======
{%- for tool in .Tools %}
* {{ tool.name }}
{%- endfor %}

Metadata
========
* Confidence: {{ .Confidence }}
* Analysis Time: {{ .ExecutionTime }}
"""

# Create research agent
# Ensure arize_callback is accessible from the LLM initialization cell
researcher_callbacks = [arize_callback] if 'arize_callback' in locals() and arize_callback else None
writer_callbacks = [arize_callback] if 'arize_callback' in locals() and arize_callback else None

researcher = Agent(
    role='Expert Research Analyst',
    goal='Conduct thorough research and analysis on the given topic and provide detailed insights.',
    backstory=(
        "As an Expert Research Analyst, you are adept at sifting through information to find "
        "relevant details, synthesizing them into a coherent analysis, and presenting your findings "
        "with clarity and precision. You are known for your meticulous approach and ability to "
        "uncover key insights that others might miss."
    ),
    llm=llm, # Using the centrally defined llm
    tools=[search_tool],
    verbose=True,
    allow_delegation=False,
    callbacks=researcher_callbacks # Added Arize callback
)

# Create writer agent
writer = Agent(
    role='Professional Technical Writer',
    goal='Take the research findings and craft a well-structured, clear, and concise response or article.',
    backstory=(
        "As a Professional Technical Writer, you specialize in transforming complex research findings "
        "into easily understandable and engaging content. You have a keen eye for detail, ensuring "
        "logical flow, proper formatting, and grammatical correctness to produce polished final outputs."
    ),
    llm=llm, # Using the centrally defined llm
    verbose=True,
    allow_delegation=False,
    callbacks=writer_callbacks # Added Arize callback
)

print("Agents created successfully. Arize callbacks configured if client and callback handler were available.")

Agents created successfully


## How CrewAI Agents Work in this RAG System

### Agent-Based RAG Architecture

This system uses a two-agent approach to implement Retrieval-Augmented Generation (RAG):

1. **Research Expert Agent**:
   - Receives the user query
   - Uses the vector search tool to retrieve relevant documents from Couchbase
   - Analyzes and synthesizes information from retrieved documents
   - Produces a comprehensive research summary with key findings

2. **Technical Writer Agent**:
   - Takes the research summary as input
   - Structures and formats the information
   - Creates a polished, user-friendly response
   - Ensures proper attribution and citation

#### How the Process Works:

1. **Query Processing**: User query is passed to the Research Agent
2. **Vector Search**: Query is converted to embeddings and matched against document vectors
3. **Document Retrieval**: Most similar documents are retrieved from Couchbase
4. **Analysis**: Research Agent analyzes documents for relevance and extracts key information
5. **Synthesis**: Research Agent combines findings into a coherent summary
6. **Refinement**: Writer Agent restructures and enhances the content
7. **Response Generation**: Final polished response is returned to the user

This multi-agent approach separates concerns (research vs. writing) and leverages
specialized expertise for each task, resulting in higher quality responses.


# Testing the Search System

Test the system with some example queries.

In [None]:
def process_query(query, researcher, writer):
    """
    Test the complete RAG system with a user query.

    This function tests both the vector search capability and the agent-based processing:
    1. Vector search: Retrieves relevant documents from Couchbase
    2. Agent processing: Uses CrewAI agents to analyze and format the response

    The function measures performance and displays detailed outputs from each step.
    """
    print(f"\nQuery: {query}")
    print("-" * 80)

    # Create tasks
    research_task = Task(
     ## add definition
    )

    writing_task = Task(
      ## add definition
    )

    # Create and execute crew
    crew = Crew(
     # define crew 
    )

    try:
        start_time = time.time()
        # Generate a unique ID for this prediction/run
        prediction_id = str(uuid.uuid4()) # Make sure uuid is imported

        result = crew.kickoff()
        elapsed_time = time.time() - start_time

        # Arize logging for successful crew execution
        # Assumes arize_client, MODEL_ID, MODEL_VERSION are available from preceding cells
        if 'arize_client' in globals() and arize_client and 'MODEL_ID' in globals() and MODEL_ID:
            try:
                features = {"query_text": query}
                prediction_label = {"final_response": str(result)} 
                
                tags = {
                    "crew_execution_time_seconds": str(round(elapsed_time, 2)),
                    "prediction_id": prediction_id 
                }
                if hasattr(crew, 'usage_metrics') and crew.usage_metrics:
                    tags["crew_total_tokens"] = str(crew.usage_metrics.get('total_tokens', 0))
                    tags["crew_successful_tasks"] = str(crew.usage_metrics.get('successful_tasks', 0))
                    tags["crew_tasks_iterations"] = str(crew.usage_metrics.get('tasks_iterations', 0))
                
                # Use globals() to access MODEL_VERSION if it's set globally
                model_version_to_log = globals().get('MODEL_VERSION') 

                response = arize_client.log(
                    model_id=MODEL_ID,
                    model_version=model_version_to_log, 
                    prediction_id=prediction_id,
                    features=features,
                    prediction_label=prediction_label,
                    tags=tags
                )
                
                if response and response.status_code == 200:
                    print(f"✅ Successfully logged main query/response to Arize with prediction_id: {prediction_id}")
                else:
                    print(f"🔴 Failed to log to Arize. Status: {response.status_code if response else 'Unknown'}, Response: {response.text if response else 'No response'}")

            except Exception as e_arize:
                print(f"🔴 Error logging to Arize: {e_arize}")
        else:
            print("ℹ️ Arize client or MODEL_ID not configured, skipping Arize logging for successful execution.")

        print(f"\nQuery completed in {elapsed_time:.2f} seconds")
        print("=" * 80)
        print("RESPONSE")
        print("=" * 80)
        print(result)

        # The original logic for printing detailed task outputs can remain if needed,
        # though ArizeCallbackHandler should capture individual LLM calls.
        # For simplicity, this part is removed as the primary output is 'result'.
        # if hasattr(crew, 'tasks') and all(hasattr(task, 'output') for task in crew.tasks):
        #     print("\n" + "=" * 80)
        #     print("DETAILED TASK OUTPUTS (from crew.tasks)")
        #     print("=" * 80)
        #     for i, task in enumerate(crew.tasks):
        #         if hasattr(task, 'output') and task.output:
        #             print(f"\nTask {i+1}: {task.description[:100]}...")
        #             print("-" * 40)
        #             # Accessing raw output if available, or the Pydantic model
        #             raw_output = task.output.raw if hasattr(task.output, 'raw') else str(task.output)
        #             print(f"Output: {raw_output}")
        #             print("-" * 40)

    except Exception as e:
        print(f"Error executing crew: {str(e)}")
        logging.error(f"Crew execution failed: {str(e)}", exc_info=True)
        # Arize logging for crew execution error
        if 'arize_client' in globals() and arize_client and 'MODEL_ID' in globals() and MODEL_ID:
            try:
                error_id = str(uuid.uuid4())
                model_version_to_log = globals().get('MODEL_VERSION')
                arize_client.log(
                    model_id=MODEL_ID,
                    model_version=model_version_to_log,
                    prediction_id=error_id, 
                    features={"query_text": query},
                    tags={"error_message": str(e), "event_type": "crew_error", "prediction_id": error_id}
                )
                print(f"✅ Logged crew execution error to Arize with id: {error_id}")
            except Exception as e_arize_err:
                print(f"🔴 Error logging error event to Arize: {e_arize_err}")
        else:
            print("ℹ️ Arize client or MODEL_ID not configured, skipping Arize logging for error.")

In [None]:
# Disable logging before running the query
logging.disable(logging.CRITICAL)

query = "What are the key details about the FA Cup third round draw? Include information about Manchester United vs Arsenal, Tamworth vs Tottenham, and other notable fixtures."
process_query(query, researcher, writer)


Query: What are the key details about the FA Cup third round draw? Include information about Manchester United vs Arsenal, Tamworth vs Tottenham, and other notable fixtures.
--------------------------------------------------------------------------------


[1m[93m 
[2025-04-08 15:45:31][INFO]: Planning the crew execution[00m


[1m[95m# Agent:[00m [1m[92mResearch Expert[00m
[95m## Task:[00m [92mResearch and analyze information relevant to: What are the key details about the FA Cup third round draw? Include information about Manchester United vs Arsenal, Tamworth vs Tottenham, and other notable fixtures.1. The Research Expert will identify the main query regarding the FA Cup third round draw, focusing on Manchester United vs Arsenal, Tamworth vs Tottenham, and other significant fixtures. 2. Using the 'vector_search' tool, the expert will input the query as a simple text string to search for relevant documents utilizing vector similarity algorithms. 3. The search results will consist of a list of documents containing insights and data about notable fixtures, including match previews, historical context, and expectations for the matches. 4. The expert will sift through the results, taking note of key findings and supporting evidence about each fixture, emphasizing the background and any associated narra



[1m[95m# Agent:[00m [1m[92mResearch Expert[00m
[95m## Thought:[00m [92mThought: I need to find detailed information about the FA Cup third round draw, including Manchester United vs Arsenal, Tamworth vs Tottenham, and other notable fixtures. I'll start by searching for relevant documents using the vector search tool.[00m
[95m## Using tool:[00m [92mvector_search[00m
[95m## Tool Input:[00m [92m
"{\"query\": \"FA Cup third round draw Manchester United vs Arsenal Tamworth vs Tottenham\"}"[00m
[95m## Tool Output:[00m [92m
[00m




[1m[95m# Agent:[00m [1m[92mResearch Expert[00m
[95m## Final Answer:[00m [92m
The FA Cup third round draw has thrown up some exciting matches, including Manchester United vs Arsenal and Tamworth vs Tottenham. Manchester United come into the match in good form, having won their last three league matches, while Arsenal have been struggling in recent weeks. Tamworth, meanwhile, will be looking to make the most of their day out against Tottenham, but they'll know it won't be easy against a team of Tottenham's quality.

Other notable fixtures include Liverpool vs Everton, Chelsea vs Morecambe, and Manchester City vs Swindon Town. All of these matches are set to be exciting encounters, with the potential for upsets on the cards.

In terms of history, Manchester United have won the FA Cup a record 12 times, while Arsenal have lifted the trophy 13 times. Tottenham, meanwhile, have won the FA Cup eight times, but they've not won it since 1991.

The match between Manchester United and 

[1m[95m# Agent:[00m [1m[92mTechnical Writer[00m
[95m## Task:[00m [92mCreate a comprehensive and well-structured response1. The Technical Writer will receive the detailed analysis produced by the Research Expert as a foundation for crafting a well-structured response. 2. The writer will begin by reviewing the material to ensure clarity and accuracy, marking main points and key findings that are essential for the structured response. 3. The writer will outline a draft structure, including an introduction to the FA Cup third round, sections for each of the highlighted fixtures (Manchester United vs Arsenal, Tamworth vs Tottenham), and any other notable matches. 4. In each section, the writer will incorporate the relevant insights, ensuring each fixture's importance and details are accentuated. 5. The writer will ensure logical flow and coherence in the response, linking each section effectively. 6. Finally, the response will be thoroughly proofread to correct any grammatical erro



[1m[95m# Agent:[00m [1m[92mTechnical Writer[00m
[95m## Final Answer:[00m [92m
The FA Cup third round draw has produced some thrilling matchups, showcasing the competition's unique ability to pit teams of varying stature against each other. Two fixtures, in particular, stand out: Manchester United vs Arsenal and Tamworth vs Tottenham. These matches promise to deliver exciting encounters, rich in history and laden with potential for upsets.

**Introduction to the FA Cup Third Round**

The FA Cup, renowned for its capacity to deliver surprising results, has once again lived up to its reputation with the third-round draw. This stage of the competition is often marked by David vs Goliath battles, where smaller clubs can make headlines by upsetting their more illustrious opponents. The draw has set the stage for several compelling matches, with some of the biggest names in English football taking on lesser-known opponents.

**Manchester United vs Arsenal**

One of the most anticip


Query completed in 84.77 seconds
RESPONSE
The FA Cup third round draw has produced some thrilling matchups, showcasing the competition's unique ability to pit teams of varying stature against each other. Two fixtures, in particular, stand out: Manchester United vs Arsenal and Tamworth vs Tottenham. These matches promise to deliver exciting encounters, rich in history and laden with potential for upsets.

**Introduction to the FA Cup Third Round**

The FA Cup, renowned for its capacity to deliver surprising results, has once again lived up to its reputation with the third-round draw. This stage of the competition is often marked by David vs Goliath battles, where smaller clubs can make headlines by upsetting their more illustrious opponents. The draw has set the stage for several compelling matches, with some of the biggest names in English football taking on lesser-known opponents.

**Manchester United vs Arsenal**

One of the most anticipated fixtures of the third round is the encoun

## Conclusion
By following these steps, you've built a powerful RAG system that combines Couchbase's vector storage capabilities with CrewAI's agent-based architecture. This multi-agent approach separates research and writing concerns, resulting in higher quality responses to user queries.

The system demonstrates several key advantages:
1. Efficient vector search using Couchbase's vector store
2. Specialized AI agents that focus on different aspects of the RAG pipeline
3. Collaborative workflow between agents to produce comprehensive, well-structured responses
4. Scalable architecture that can be extended with additional agents for more complex tasks

Whether you're building a customer support system, a research assistant, or a knowledge management solution, this agent-based RAG approach provides a flexible foundation that can be adapted to various use cases and domains.