## LlamaStack + CrewAI Integration Tutorial

This notebook guides you through integrating **LlamaStack** with **CrewAI** to build a complete Retrieval-Augmented Generation (RAG) system.

### Overview

- **LlamaStack**: Provides the infrastructure for running LLMs and vector databases.
- **CrewAI**: Offers a framework for orchestrating agents and tasks.
- **Integration**: Leverages LlamaStack's OpenAI-compatible API with CrewAI.

### What You Will Learn

1.  How to set up and start the LlamaStack server using the Together AI provider.
2.  How to create and manage vector databases within LlamaStack.
3.  How to build RAG chains with CrewAI by utilizing the LlamaStack server.
4.  How to query the RAG chain for effective information retrieval and generation.

### Prerequisites

A Together AI API key is required to run the examples in this notebook.

---

### 1. Installation and Setup
#### Install Required Dependencies

Begin by installing the packages needed for the CrewAI integration. If `TOGETHER_API_KEY` is not already set as an environment variable, the cell below prompts you for it.

In [None]:
!pip install uv
!uv tool install crewai
import os
import getpass

try:
    from google.colab import userdata
    os.environ['TOGETHER_API_KEY'] = userdata.get('TOGETHER_API_KEY')
except ImportError:
    print("Not in Google Colab environment")

for key in ['TOGETHER_API_KEY']:
    # Prompt for the key when the variable is missing or empty
    if not os.environ.get(key):
        os.environ[key] = getpass.getpass(f"{key} environment variable is not set. Please enter your API key: ")

`crewai` is already installed
Not in Google Colab environment


TOGETHER_API_KEY environment variable is not set. Please enter your API key:  ········




### 2. LlamaStack Server Setup

#### Build and Start LlamaStack Server

This section sets up the LlamaStack server with:
- **Together AI** as the inference provider
- **FAISS** as the vector database
- **Sentence Transformers** for embeddings

The server runs on `localhost:8321` and provides OpenAI-compatible endpoints.

In [None]:
import os
import subprocess
import time


if "UV_SYSTEM_PYTHON" in os.environ:
    del os.environ["UV_SYSTEM_PYTHON"]

# this command installs all the dependencies needed for the llama stack server with the together inference provider
!uv run --with llama-stack llama stack build --distro starter --image-type venv


def run_llama_stack_server_background():
    log_file = open("llama_stack_server.log", "w")
    process = subprocess.Popen(
        "uv run --with llama-stack llama stack run starter --image-type venv",
        shell=True,
        stdout=log_file,
        stderr=log_file,
        text=True,
    )

    print(f"Starting Llama Stack server with PID: {process.pid}")
    return process


def wait_for_server_to_start():
    import requests
    from requests.exceptions import ConnectionError

    url = "http://0.0.0.0:8321/v1/health"
    max_retries = 30
    retry_interval = 2

    print("Waiting for server to start", end="")
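    for _ in range(max_retries):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                print("\nServer is ready!")
                return True
        except ConnectionError:
            pass  # the server is not accepting connections yet
        # Wait after every unsuccessful attempt, including non-200 responses
        print(".", end="", flush=True)
        time.sleep(retry_interval)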

    print("\nServer failed to start after", max_retries * retry_interval, "seconds")
    return False


# Use this helper if needed to kill the server
def kill_llama_stack_server():
    # Kill any existing llama stack server processes
    os.system("ps aux | grep -v grep | grep llama_stack.core.server.server | awk '{print $2}' | xargs kill -9")


server_process = run_llama_stack_server_background()
assert wait_for_server_to_start()

Environment '/Users/kaiwu/work/kaiwu/llama-stack/.venv' already exists, re-using it.
Virtual environment /Users/kaiwu/work/kaiwu/llama-stack/.venv is already active
Using Python 3.12.9 environment at: /Users/kaiwu/work/kaiwu/llama-stack/.venv
Audited 1 package in 211ms
Installing pip dependencies
Using Python 3.12.9 environment at: /Users/kaiwu/work/kaiwu/llama-stack/.venv
Resolved 185 packages in 1.84s
Uninstalled 3 packages in 133ms
Installed 3 packages in 59ms
 - protobuf==5.29.5
 + protobuf==5.29.4
 - ruamel-yaml==0.18.14
 + ruamel-yaml==0.17.40
 - ruff==0.12.5
 + ruff==0.9.10
Installing special provider module: torch torchvision tor
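
The readiness check in this section is one instance of a retry-until-ready loop. As a minimal sketch of the general pattern, the hypothetical `wait_until` helper below (not part of LlamaStack) takes any caller-supplied probe function and sleeps between unsuccessful attempts. The fake probe is only there so the example runs without a server.

```python
import time
from typing import Callable


def wait_until(probe: Callable[[], bool], max_retries: int = 30, retry_interval: float = 2.0) -> bool:
    """Call `probe` until it returns True or the retries run out (hypothetical helper)."""
    for attempt in range(max_retries):
        try:
            if probe():
                return True
        except OSError:
            # Connection-level failures are treated as "not ready yet".
            pass
        # Sleep between attempts, but not after the final one.
        if attempt < max_retries - 1:
            time.sleep(retry_interval)
    return False


# Fake probe that only succeeds on the third call.
calls = {"count": 0}


def fake_probe() -> bool:
    calls["count"] += 1
    return calls["count"] >= 3


ready = wait_until(fake_probe, max_retries=5, retry_interval=0.01)
print(ready, calls["count"])
```

In the notebook, the probe would wrap the `requests.get` call against the `/v1/health` endpoint and return whether the status code is 200.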

### 3. Initialize LlamaStack Client

Create a client connection to the LlamaStack server, passing the Together AI API key as provider data.



In [None]:
from llama_stack_client import LlamaStackClient

client = LlamaStackClient(
    base_url="http://0.0.0.0:8321",
    provider_data={"together_api_key": os.environ["TOGETHER_API_KEY"]},
)

#### Explore Available Models and Safety Features

Check what models and safety shields are available through your LlamaStack instance.

In [None]:
print("Available models:")
for m in client.models.list():
    print(f"- {m.identifier}")

print("----")
print("Available shields (safety models):")
for s in client.shields.list():
    print(s.identifier)
print("----")

INFO:httpx:HTTP Request: GET http://0.0.0.0:8321/v1/models "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://0.0.0.0:8321/v1/shields "HTTP/1.1 200 OK"


Available models:
- all-minilm
- ollama/all-minilm:l6-v2
- ollama/codellama:7b-code
- ollama/llama3.2:1b-instruct-fp16
- ollama/nomic-embed-text
- fireworks/accounts/fireworks/models/llama-v3p1-8b-instruct
- fireworks/accounts/fireworks/models/llama-v3p1-70b-instruct
- fireworks/accounts/fireworks/models/llama-v3p1-405b-instruct
- fireworks/accounts/fireworks/models/llama-v3p2-3b-instruct
- fireworks/accounts/fireworks/models/llama-v3p2-11b-vision-instruct
- fireworks/accounts/fireworks/models/llama-v3p2-90b-vision-instruct
- fireworks/accounts/fireworks/models/llama-v3p3-70b-instruct
- fireworks/accounts/fireworks/models/llama4-scout-instruct-basic
- fireworks/accounts/fireworks/models/llama4-maverick-instruct-basic
- fireworks/nomic-ai/nomic-embed-text-v1.5
- fireworks/accounts/fireworks/models/llama-guard-3-8b
- fireworks/accounts/fireworks/models/llama-guard-3-11b-vision
- together/meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo
- together/meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo
-

### 4. Vector Database Setup

#### Register a Vector Database

Create a FAISS vector database for storing document embeddings:

- **Vector DB ID**: Unique identifier for the database
- **Provider**: FAISS (Facebook AI Similarity Search)
- **Embedding Model**: Sentence Transformers model for text embeddings
- **Dimensions**: 384-dimensional embeddings

In [None]:
# Register a new clean vector database
vector_db = client.vector_dbs.register(
    vector_db_id="acme_docs",  # Use a new unique name
    provider_id="faiss",
    provider_vector_db_id="acme_docs_v2",
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    embedding_dimension=384,
)
print("Registered new vector DB:", vector_db)

# List all registered vector databases
dbs = client.vector_dbs.list()
print("Existing vector DBs:", dbs)

INFO:httpx:HTTP Request: POST http://0.0.0.0:8321/v1/vector-dbs "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://0.0.0.0:8321/v1/vector-dbs "HTTP/1.1 200 OK"


Registered new vector DB: VectorDBRegisterResponse(embedding_dimension=384, embedding_model='sentence-transformers/all-MiniLM-L6-v2', identifier='acme_docs', provider_id='faiss', type='vector_db', provider_resource_id='acme_docs_v2', vector_db_name=None, owner=None, source='via_register_api')
Existing vector DBs: [VectorDBListResponseItem(embedding_dimension=384, embedding_model='sentence-transformers/all-MiniLM-L6-v2', identifier='acme_docs', provider_id='faiss', type='vector_db', provider_resource_id='acme_docs_v2', vector_db_name=None)]


#### Prepare Sample Documents

Create Llama Stack `Chunk` objects to insert into the FAISS vector store.

In [None]:
from llama_stack_client.types.vector_io_insert_params import Chunk

docs = [
    ("Acme ships globally in 3-5 business days.", {"title": "Shipping Policy"}),
    ("Returns are accepted within 30 days of purchase.", {"title": "Returns Policy"}),
    ("Support is available 24/7 via chat and email.", {"title": "Support"}),
]

# Convert to Chunk objects
chunks = []
for content, metadata in docs:
    # Transform metadata to the required format, using the title as document_id
    metadata = {"document_id": metadata["title"]}
    chunk = Chunk(
        content=content,  # Required[InterleavedContent]
        metadata=metadata,  # Required[Dict]
    )
    chunks.append(chunk)
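
The sample documents above are single sentences, so each one maps to exactly one chunk. For longer documents, a splitting step usually comes first. The sketch below uses a hypothetical `split_into_chunks` helper (not part of LlamaStack) that cuts text into overlapping word windows and emits plain dicts with the same `content` and `metadata` fields used above. Whether such dicts can be passed directly to the client is an assumption; wrapping each one in `Chunk`, as in the cell above, is the form this notebook actually uses.

```python
from typing import Dict, List


def split_into_chunks(text: str, document_id: str, max_words: int = 50, overlap: int = 10) -> List[Dict]:
    """Split text into overlapping word windows (hypothetical helper)."""
    if overlap >= max_words:
        raise ValueError("overlap must be smaller than max_words")
    words = text.split()
    step = max_words - overlap
    result: List[Dict] = []
    for start in range(0, len(words), step):
        window = words[start:start + max_words]
        result.append({"content": " ".join(window), "metadata": {"document_id": document_id}})
        # Stop once the current window reaches the end of the text.
        if start + max_words >= len(words):
            break
    return result


# Synthetic 120-word text, used only to show the window boundaries.
long_text = " ".join(f"w{i}" for i in range(120))
pieces = split_into_chunks(long_text, document_id="Example Doc")
print(len(pieces), [p["content"].split()[0] for p in pieces])
```

The window size and overlap here are arbitrary; suitable values depend on the embedding model and the documents.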

#### Insert Documents into Vector Database

Store the prepared documents in the FAISS vector database. This process:
1. Generates embeddings for each document
2. Stores embeddings with metadata
3. Enables semantic search capabilities

In [None]:
# Insert chunks into FAISS vector store

response = client.vector_io.insert(vector_db_id="acme_docs", chunks=chunks)
print("Documents inserted:", response)

INFO:httpx:HTTP Request: POST http://0.0.0.0:8321/v1/vector-io/insert "HTTP/1.1 200 OK"


Documents inserted: None


#### Test Vector Search

Query the vector database to verify it's working correctly. This performs semantic search to find relevant documents based on the query.

In [None]:
# Query chunks from FAISS vector store

query_chunk_response = client.vector_io.query(
    vector_db_id="acme_docs",
    query="How long does Acme take to ship orders?",
)
for chunk in query_chunk_response.chunks:
    print("metadata", ":", chunk.metadata)
    print("content", ":", chunk.content)

INFO:httpx:HTTP Request: POST http://0.0.0.0:8321/v1/vector-io/query "HTTP/1.1 200 OK"


metadata : {'document_id': 'Shipping Policy'}
content : Acme ships globally in 3-5 business days.
metadata : {'document_id': 'Shipping Policy'}
content : Acme ships globally in 3-5 business days.
metadata : {'document_id': 'Shipping Policy'}
content : Acme ships globally in 3-5 business days.
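
To make the idea of similarity-based retrieval concrete, here is a toy sketch that ranks the same three sample sentences against the query. It uses a bag-of-words vector and cosine similarity as a stand-in; it is not what FAISS or the Sentence Transformers model do internally, and the names `embed`, `cosine`, and `search` are illustrative only.

```python
import math
import re
from collections import Counter
from typing import List


def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding' (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def search(query: str, documents: List[str], top_k: int = 1) -> List[str]:
    """Return the top_k documents most similar to the query."""
    query_vector = embed(query)
    ranked = sorted(documents, key=lambda d: cosine(query_vector, embed(d)), reverse=True)
    return ranked[:top_k]


sample_docs = [
    "Acme ships globally in 3-5 business days.",
    "Returns are accepted within 30 days of purchase.",
    "Support is available 24/7 via chat and email.",
]
best = search("How long does Acme take to ship orders?", sample_docs)
print(best)
```

A learned embedding model also matches paraphrases that share no words with the document, which this word-overlap toy cannot do.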


### 5. CrewAI Integration

#### Configure CrewAI with LlamaStack

Set up CrewAI to use LlamaStack's OpenAI-compatible API:

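- **Base URL**: Points to LlamaStack's OpenAI-compatible endpoint
- **API key**: A placeholder value, since the `LLM` class expects one; the code below does not send the Together AI key from the client
- **Model**: Meta Llama 3.3 70B Instruct Turbo, served via Together AI; the `openai/` prefix selects the OpenAI-compatible provider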

In [None]:
import os
from crewai.llm import LLM

# Point the LLM class to the LlamaStack server
llamastack_llm = LLM(
    model="openai/together/meta-llama/Llama-3.3-70B-Instruct-Turbo",  # served through the OpenAI-compatible API
    base_url="http://localhost:8321/v1/openai/v1",
    api_key=os.getenv("OPENAI_API_KEY", "dummy"),
)

INFO:httpx:HTTP Request: GET https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json "HTTP/1.1 200 OK"


#### Test LLM Connection

Verify that CrewAI LLM can successfully communicate with the LlamaStack server.

In [None]:
# Test llm with simple message
messages = [
    {"role": "system", "content": "You are a friendly assistant."},
    {"role": "user", "content": "Write a two-sentence poem about llama."},
]
llamastack_llm.call(messages)

11:25:55 - LiteLLM:INFO: utils.py:3258 - 
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:LiteLLM:
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:httpx:HTTP Request: POST http://localhost:8321/v1/openai/v1/chat/completions "HTTP/1.1 200 OK"
11:25:59 - LiteLLM:INFO: utils.py:1260 - Wrapper: Completed Call, calling success_handler
INFO:LiteLLM:Wrapper: Completed Call, calling success_handler


'With gentle eyes and a soft, fuzzy face, the llama roams the Andes with a peaceful, gentle pace. Its long neck bends as it grazes with ease, a symbol of serenity in the mountain breeze.'
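
The log lines above show the call going to `/v1/openai/v1/chat/completions` with the `openai/` prefix removed from the model name. As a rough sketch of what such a request looks like, the hypothetical `build_chat_request` helper below builds one with the standard library only. The payload fields follow the usual OpenAI chat completions shape, which is an assumption about what the server accepts rather than something this notebook verifies. The request is built but not sent, so the example runs without a server.

```python
import json
import urllib.request


def build_chat_request(base_url: str, model: str, messages: list, api_key: str = "dummy") -> urllib.request.Request:
    """Build (but do not send) a chat completions request (hypothetical helper)."""
    payload = {"model": model, "messages": messages}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
        method="POST",
    )


request = build_chat_request(
    base_url="http://localhost:8321/v1/openai/v1",
    model="together/meta-llama/Llama-3.3-70B-Instruct-Turbo",
    messages=[{"role": "user", "content": "Write a two-sentence poem about llama."}],
)
print(request.get_method(), request.full_url)

# Sending it requires the LlamaStack server to be running, and the response
# shape below is assumed to follow the OpenAI chat completions format:
# with urllib.request.urlopen(request) as resp:
#     print(json.load(resp)["choices"][0]["message"]["content"])
```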

#### Create CrewAI Custom Tool

Define a custom CrewAI tool, `LlamaStackRAGTool`, to encapsulate the logic for querying the LlamaStack vector database. This tool will be used by the CrewAI agent to perform retrieval during the RAG process.

-   **Input Schema**: Defines the expected input parameters for the tool, such as the user query, the vector database ID, and optional parameters like `top_k` and `score_threshold`.
-   **Tool Logic**: Implements the `_run` method, which takes the user query and vector database ID, calls the LlamaStack client's `vector_io.query` method, and formats the retrieved documents into a human-readable string for the LLM to use as context.

In [None]:
from crewai.tools import BaseTool
from typing import Any, List, Optional, Type
from pydantic import BaseModel, Field

# ---------- 1. Input schema ----------
class RAGToolInput(BaseModel):
    """Input schema for LlamaStackRAGTool."""
    query: str = Field(..., description="The user query for RAG search")
    vector_db_id: str = Field(...,
        description="ID of the vector database to search inside the Llama-Stack server",
    )
    top_k: Optional[int] = Field(
        default=5,
        description="How many documents to return",
    )
    score_threshold: Optional[float] = Field(
        default=None,
        description="Optional similarity score cut-off (0-1).",
    )

# ---------- 2. The tool ----------
class LlamaStackRAGTool(BaseTool):
    name: str = "Llama Stack RAG tool"
    description: str = (
        "This tool calls a Llama-Stack endpoint for retrieval-augmented generation. "
        "It takes a natural-language query and returns the most relevant documents."
    )
    args_schema: Type[BaseModel] = RAGToolInput
    client: Any
    vector_db_id: str = ""
    top_k: int = 5
    score_threshold: Optional[float] = None

    def _run(self, **kwargs: Any) -> str:
        # 1. Resolve parameters (use instance defaults when not supplied)
        query: str = kwargs.get("query")  # Required; the schema enforces presence
        vector_db_id: str = kwargs.get("vector_db_id", self.vector_db_id)
        top_k: int = kwargs.get("top_k", self.top_k)
        score_threshold: Optional[float] = kwargs.get(
            "score_threshold", self.score_threshold
        )
        if not vector_db_id:
            print("vector_db_id is empty, please specify which vector_db to search")
            return "No documents found."

        # 2. Issue request to Llama-Stack, omitting score_threshold when unset
        params: dict = {"max_chunks": top_k}
        if score_threshold is not None:
            params["score_threshold"] = score_threshold
        response = self.client.vector_io.query(
            vector_db_id=vector_db_id,
            query=query,
            params=params,
        )

        # 3. Massage results into a single human-readable string.
        # Retrieved chunks are exposed via `response.chunks`, as in the
        # vector search test earlier in this notebook.
        if not response or not response.chunks:
            return "No documents found."

        docs: List[str] = []
        for chunk in response.chunks:
            docs.append(f"metadata: {chunk.metadata}, content: {chunk.content}")
        return "\n".join(docs)
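
The formatting step of the tool can be tried without a running server. The sketch below assumes, as in the vector search test earlier, that each retrieved chunk exposes `metadata` and `content` attributes. The `format_chunks` helper and the `SimpleNamespace` stand-ins are hypothetical and only mimic that shape.

```python
from types import SimpleNamespace
from typing import Iterable, List


def format_chunks(chunks: Iterable) -> str:
    """Join retrieved chunks into one context string (hypothetical helper)."""
    lines: List[str] = [f"metadata: {c.metadata}, content: {c.content}" for c in chunks]
    return "\n".join(lines) if lines else "No documents found."


# Stand-in for a query response; a real response comes from client.vector_io.query.
fake_response = SimpleNamespace(
    chunks=[
        SimpleNamespace(
            content="Acme ships globally in 3-5 business days.",
            metadata={"document_id": "Shipping Policy"},
        )
    ]
)
context = format_chunks(fake_response.chunks)
print(context)
```

Keeping the formatting in a small pure function like this makes it easy to check what the agent will actually see as context.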


### 6. Building the RAG Chain

#### Create a Complete RAG Pipeline

Construct a CrewAI pipeline that orchestrates the RAG process. This pipeline includes:

1.  **Agent Definition**: Define a CrewAI agent with a role (`RAG assistant`), a goal, a backstory, the LlamaStack LLM, and the custom RAG tool.
2.  **Task Definition**: Define a CrewAI task for the agent to perform. The task description contains placeholders for the user query and the vector database ID, which are filled in at execution time. The expected output is an answer to the question based on the retrieved context.
3.  **Crew Definition**: Create a CrewAI `Crew` object from the task and the agent. This crew represents the complete RAG pipeline.

**CrewAI workflow**:
`User Query → CrewAI Task → Agent invokes LlamaStackRAGTool → LlamaStack Vector Search → Retrieved Context → Agent uses Context + Question → LLM Generation → Final Response`

In [None]:
from crewai import Agent, Crew, Task

# ---- 3. Define the agent -----------------------------------------
agent = Agent(
    role="RAG assistant",
    goal="Answer user's question with provided context",
    backstory="You are an experienced search assistant specializing in finding relevant information from documentation and vector_db to answer user questions accurately.",
    allow_delegation=False,
    llm=llamastack_llm,
    tools=[LlamaStackRAGTool(client=client)],
)

# ---- 4. Wrap everything in a Crew task ---------------------------
task = Task(
    description="Answer the following questions: {query}, using the RAG_tool to search the provided vector_db_id {vector_db_id} if needed",
    expected_output="An answer to the question with provided context",
    agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=True)
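
The `{query}` and `{vector_db_id}` placeholders in the task description are filled from the `inputs` dictionary passed to `crew.kickoff`. As a rough illustration of the resulting text, the sketch below uses Python's `str.format` as a stand-in; CrewAI performs its own interpolation internally, so this only approximates what the agent receives.

```python
description_template = (
    "Answer the following questions: {query}, using the RAG_tool to search "
    "the provided vector_db_id {vector_db_id} if needed"
)
inputs = {"query": "How long does shipping take?", "vector_db_id": "acme_docs"}

# str.format is used here only as a stand-in for CrewAI's own interpolation.
rendered = description_template.format(**inputs)
print(rendered)
```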


### 7. Testing the RAG System

#### Example 1: Shipping Query

In [None]:
query = "How long does shipping take?"
response = crew.kickoff(inputs={"query": query,"vector_db_id": "acme_docs"})
print("❓", query)
print("💡", response)

11:27:26 - LiteLLM:INFO: utils.py:3258 - 
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:LiteLLM:
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:httpx:HTTP Request: POST http://localhost:8321/v1/openai/v1/chat/completions "HTTP/1.1 200 OK"
11:27:29 - LiteLLM:INFO: utils.py:1260 - Wrapper: Completed Call, calling success_handler
INFO:LiteLLM:Wrapper: Completed Call, calling success_handler


INFO:httpx:HTTP Request: POST http://0.0.0.0:8321/v1/vector-io/query "HTTP/1.1 200 OK"
11:27:29 - LiteLLM:INFO: utils.py:3258 - 
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:LiteLLM:
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:httpx:HTTP Request: POST http://localhost:8321/v1/openai/v1/chat/completions "HTTP/1.1 200 OK"
11:27:31 - LiteLLM:INFO: utils.py:1260 - Wrapper: Completed Call, calling success_handler
INFO:LiteLLM:Wrapper: Completed Call, calling success_handler


❓ How long does shipping take?
💡 Acme ships globally in 3-5 business days.


#### Example 2: Returns Policy Query

In [None]:
query = "Can I return a product after 40 days?"
response = crew.kickoff(inputs={"query": query,"vector_db_id": "acme_docs"})
print("❓", query)
print("💡", response)

11:28:03 - LiteLLM:INFO: utils.py:3258 - 
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:LiteLLM:
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:httpx:HTTP Request: POST http://localhost:8321/v1/openai/v1/chat/completions "HTTP/1.1 200 OK"
11:28:07 - LiteLLM:INFO: utils.py:1260 - Wrapper: Completed Call, calling success_handler
INFO:LiteLLM:Wrapper: Completed Call, calling success_handler
11:28:07 - LiteLLM:INFO: utils.py:3258 - 
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:LiteLLM:
LiteLLM completion() model= together/meta-llama/Llama-3.3-70B-Instruct-Turbo; provider = openai
INFO:httpx:HTTP Request: POST http://localhost:8321/v1/openai/v1/chat/completions "HTTP/1.1 200 OK"
11:28:08 - LiteLLM:INFO: utils.py:1260 - Wrapper: Completed Call, calling success_handler
INFO:LiteLLM:Wrapper: Comp

❓ Can I return a product after 40 days?
💡 Returns are accepted within 30 days of purchase. Since the question asks about returning a product after 40 days, it is clear that this falls outside of the specified return window. The return policy does not specify any exceptions or extensions, so it is likely that returns will not be accepted after 40 days. However, it's always best to contact the seller or retailer directly to confirm their return policy and see if they can make any exceptions. The exact wording of the return policy is: "Returns are accepted within 30 days of purchase."
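
Both examples follow the same pattern: call `kickoff` with a query and a vector database ID, then print the result. A small wrapper can run several questions in one go. The sketch below assumes only that the crew object has a `kickoff(inputs=...)` method, as used above. The `ask_all` helper and the `EchoCrew` stand-in are hypothetical, and the stand-in exists only so the example runs without a server.

```python
from typing import Any, Dict, Iterable


def ask_all(crew: Any, queries: Iterable[str], vector_db_id: str) -> Dict[str, str]:
    """Run each query through the crew and collect the answers (hypothetical helper)."""
    answers: Dict[str, str] = {}
    for query in queries:
        result = crew.kickoff(inputs={"query": query, "vector_db_id": vector_db_id})
        answers[query] = str(result)
    return answers


class EchoCrew:
    """Stand-in for a real Crew; it just echoes its inputs."""

    def kickoff(self, inputs: Dict[str, str]) -> str:
        return f"[{inputs['vector_db_id']}] {inputs['query']}"


answers = ask_all(
    EchoCrew(),
    ["How long does shipping take?", "Can I return a product after 40 days?"],
    "acme_docs",
)
for question, answer in answers.items():
    print("❓", question)
    print("💡", answer)
```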


---

We have successfully built a RAG system that combines:

-   **LlamaStack** for infrastructure (LLM serving + vector database)
-   **CrewAI** for orchestration (agents, tasks, and tools)
-   **Together AI** for high-quality language models

### Key Benefits

1.  **Unified Infrastructure**: A single server for LLMs and vector databases simplifies deployment and management.
2.  **OpenAI Compatibility**: Enables easy integration with existing libraries and frameworks that support the OpenAI API standard, such as CrewAI.
3.  **Multi-Provider Support**: Offers the flexibility to switch between different LLM and embedding providers without altering the core application logic.
4.  **Production Ready**: LlamaStack includes features designed for production environments, such as built-in safety shields and monitoring capabilities.

### Next Steps

-   Implement more sophisticated document processing and chunking techniques.
-   Add conversation memory within the CrewAI pipeline for multi-turn interactions.
-   Integrate advanced safety filtering and monitoring using LlamaStack's features.
-   Scale the system to handle larger document collections and higher query volumes.
-   Integrate the RAG system with web frameworks like FastAPI or Streamlit to build interactive applications.

---

### 🔧 Cleanup

Remember to stop the LlamaStack server process when you are finished to free up resources. You can use the `kill_llama_stack_server()` helper function defined earlier in the notebook.
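
An alternative to matching on the process name is to stop a process through its `subprocess.Popen` handle. The sketch below shows that pattern with a hypothetical `stop_process` helper and a short-lived dummy process. Note that the server in this notebook was started with `shell=True`, so `server_process` refers to the shell; terminating it may not stop the server itself, and the notebook's `kill_llama_stack_server()` helper remains the fallback.

```python
import subprocess
import sys


def stop_process(process: subprocess.Popen, timeout: float = 10.0) -> int:
    """Terminate a process, escalating to kill if it does not exit (hypothetical helper)."""
    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # The process ignored the terminate request; force it to stop.
            process.kill()
            process.wait()
    return process.returncode


# Dummy long-running process, used only to demonstrate the helper.
demo = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_process(demo)
print("Exited with code:", exit_code)
```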