In [None]:
!pip install dotenv==0.9.9 llama_stack==0.2.6 requests==2.32.3

In [None]:
import logging
import uuid
import os
import random
import string

from dotenv import load_dotenv
from llama_stack_client import Agent, LlamaStackClient, RAGDocument
from llama_stack_client.lib.agents.event_logger import EventLogger
from requests import post
from termcolor import cprint

from src.loki import query_loki_logs
from src.utils import step_printer

# Load settings from a .env file (if present) so that the os.getenv lookups
# further down (REMOTE_BASE_URL, VDB_*, STREAM, ...) can pick them up.
load_dotenv()

# Root logging is limited to WARNING and above; `logger` is the notebook's own logger.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

### Kick off customer onboarding

In [None]:
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Build a random string of ``size`` characters picked from ``chars``.

    With the defaults this yields six characters drawn from the uppercase
    ASCII letters and the decimal digits.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)


def onboard_customer(service_url=None, timeout=10):
    """Submit one randomly generated customer to the onboarding service.

    Args:
        service_url: Endpoint to POST the customer to. When omitted, the demo
            cluster's customer onboarding service is used.
        timeout: Seconds to wait for the service. Without a timeout an
            unreachable endpoint would block the notebook indefinitely.

    Returns:
        The ``requests.Response`` produced by the service.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    customer_onboarding_service_url = service_url or 'http://customer-onboarding-service-customer-onboarding.apps.cluster-srb7z.srb7z.sandbox2014.opentlc.com/'
    payload = {
        'customerId': f'CUST-{id_generator(chars=string.digits)}',
        'fullName': id_generator(chars=string.ascii_letters),
        'nationalId': id_generator(),
        'birthDate': id_generator(chars=string.digits),
    }
    response = post(customer_onboarding_service_url, json=payload, timeout=timeout)
    if not response.ok:
        # Surface rejected submissions instead of silently discarding them,
        # without aborting a batch of submissions.
        logger.warning(
            "Onboarding request for %s failed with HTTP %s",
            payload['customerId'],
            response.status_code,
        )
    return response


n_customers = 10
# Submitting customers calls the live onboarding service, so it is opt-in.
submit_customers = False

if submit_customers:
    for _ in range(n_customers):
        onboard_customer()
    print(f'submitted {n_customers} new customers')
else:
    # Report what actually happened instead of claiming a submission that never ran.
    print(f'skipped submitting {n_customers} new customers (set submit_customers = True to enable)')

### RAG agent

In [None]:
# Llama Stack endpoint; os.getenv returns None when REMOTE_BASE_URL is not set.
base_url = os.getenv("REMOTE_BASE_URL")

client = LlamaStackClient(base_url=base_url)

# NOTE(review): printed unconditionally right after constructing the client;
# nothing in this cell checks that the server is actually reachable.
print("Connected to Llama Stack server")

# Model identifier passed to every agent created in this notebook.
model_id = "llama32-3b"

# Sampling strategy: a positive TEMPERATURE selects top_p sampling (TOP_P
# defaults to 0.95); otherwise greedy decoding is used.
temperature = float(os.getenv("TEMPERATURE", 0.0))
if temperature > 0.0:
    top_p = float(os.getenv("TOP_P", 0.95))
    strategy = {"type": "top_p", "temperature": temperature, "top_p": top_p}
else:
    strategy = {"type": "greedy"}

# Hard-coded for this demo; the trailing comment keeps the disabled
# environment-driven alternative (MAX_TOKENS, default 512).
max_tokens = 100000  # int(os.getenv("MAX_TOKENS", 512))

# sampling_params bundles the strategy and token limit; it is intended to be
# passed to the Llama Stack Agents/Inference APIs.
sampling_params = {
    "strategy": strategy,
    "max_tokens": max_tokens,
}

# For this demo, we are using Milvus Lite, which is our preferred solution. Any other Vector DB supported by Llama Stack can be used.

# RAG vector DB settings, read from the environment.
# VDB_EMBEDDING and VDB_PROVIDER have no default and are None when unset.
VECTOR_DB_EMBEDDING_MODEL = os.getenv("VDB_EMBEDDING")
VECTOR_DB_EMBEDDING_DIMENSION = int(os.getenv("VDB_EMBEDDING_DIMENSION", 384))
VECTOR_DB_CHUNK_SIZE = int(os.getenv("VECTOR_DB_CHUNK_SIZE", 512))
VECTOR_DB_PROVIDER_ID = os.getenv("VDB_PROVIDER")

# Unique DB ID for this session (a fresh UUID on every run of the cell).
vector_db_id = f"test_vector_db_{uuid.uuid4()}"

stream_env = os.getenv("STREAM", "False")
# The Boolean 'stream' parameter will later be passed to Llama Stack Agents/Inference APIs.
# Any value not equal to the exact string 'False' is treated as True
# (the comparison is case-sensitive, so e.g. 'false' enables streaming).
stream = (stream_env != "False")

# Register a fresh vector DB for this session. The VECTOR_DB_* settings
# defined earlier in this cell are reused so the environment is read and
# parsed in exactly one place.
client.vector_dbs.register(
    vector_db_id=vector_db_id,
    embedding_model=VECTOR_DB_EMBEDDING_MODEL,
    embedding_dimension=VECTOR_DB_EMBEDDING_DIMENSION,
    provider_id=VECTOR_DB_PROVIDER_ID,
)

# Ingest the documents into the newly created document collection.
# Each entry is a (url, mime type) pair; the URL itself is passed as the
# document content.
urls = [
    ("https://raw.githubusercontent.com/mamurak/error-identification-demo/refs/heads/main/source_docs/onboarding-application.pdf", "application/pdf"),
]
documents = [
    RAGDocument(
        document_id=f"num-{i}",
        content=url,
        mime_type=url_type,
        metadata={},
    )
    for i, (url, url_type) in enumerate(urls)
]
client.tool_runtime.rag_tool.insert(
    documents=documents,
    vector_db_id=vector_db_id,
    chunk_size_in_tokens=VECTOR_DB_CHUNK_SIZE,
)

print(f"Inference Parameters:\n\tModel: {model_id}\n\tSampling Parameters: {sampling_params}\n\tstream: {stream}")

In [None]:
# Collect the toolgroup ID of every tool currently registered on the server.
registered_tools = client.tools.list()
registered_toolgroups = []
for tool in registered_tools:
    registered_toolgroups.append(tool.toolgroup_id)

# Register the built-in RAG toolgroup only when the server does not expose it yet.
if "builtin::rag" not in registered_toolgroups:
    # NOTE(review): "milvus" must name a provider configured on the server — confirm.
    client.toolgroups.register(toolgroup_id="builtin::rag", provider_id="milvus")

# Report the distinct toolgroups seen in the listing fetched above
# (taken before any registration performed in this cell).
print(f"Your Llama Stack server is already registered with the following tool groups: {set(registered_toolgroups)}\n")

In [None]:
# System instructions for the RAG agent: answer through the knowledge search tool.
rag_prompt = """
    You are a helpful assistant. You must use the knowledge search tool to answer user questions.
"""

In [None]:
# Tool spec that points the built-in RAG tool at this session's vector DB.
builtin_rag = dict(
    name="builtin::rag",
    args={"vector_db_ids": [vector_db_id]},
)

# Pass the shared sampling_params (strategy + max_tokens) so the agent really
# runs with the inference parameters configured and reported earlier, instead
# of an ad-hoc dict that silently drops the sampling strategy.
rag_agent = Agent(
    client=client,
    model=model_id,
    instructions=rag_prompt,
    tools=[builtin_rag],
    sampling_params=sampling_params,
)

In [None]:
user_prompts = [
    """What is the AmlValidationService?"""
]

banner = "=" * 50
for prompt in user_prompts:
    # Visually separate each query in the cell output.
    print("\n" + banner)
    cprint(f"Processing user query: {prompt}", "blue")
    print(banner)

    # Every prompt is answered in its own, freshly created session.
    turn_session_id = rag_agent.create_session(f"rag-session_{uuid.uuid4()}")
    user_message = {"role": "user", "content": prompt}
    response = rag_agent.create_turn(
        messages=[user_message],
        session_id=turn_session_id,
        stream=stream,
    )

    # Streaming responses are printed event by event; otherwise print the steps.
    if not stream:
        step_printer(response.steps)
    else:
        for log in EventLogger().log(response):
            log.print()

### Loki tool

In [None]:
# System instructions for an agent that extracts error logs via the Loki tool.
instructions = """
You are a helpful assistant that retrieves error logs from Kubernetes containers using Loki.

When users ask for error logs:
1. Use the query_loki_logs tool to retrieve actual logs from containers from the past 1 hour
2. Extract error and failure messages found within the retrieved logs
3. Present the error messages in a readable format with timestamps and parsed messages
4. If no logs are found, suggest checking container/namespace names

The logs are returned in a parsed format showing timestamp and the actual log message content.
Always use the tool when log data is requested rather than giving general explanations.
"""

# Pass the shared sampling_params (strategy + max_tokens) so this agent runs
# with the configured inference parameters rather than an ad-hoc dict that
# omits the sampling strategy. `client` is passed by keyword, matching rag_agent.
loki_agent = Agent(
    client=client,
    model=model_id,
    instructions=instructions,
    tools=[query_loki_logs],
    sampling_params=sampling_params,
)

In [None]:
user_prompts = [
    """Get error logs from container customer-validation-service in namespace customer-onboarding"""
]

banner = "=" * 50
for prompt in user_prompts:
    # Visually separate each query in the cell output.
    print("\n" + banner)
    cprint(f"Processing user query: {prompt}", "blue")
    print(banner)

    # Every prompt is answered in its own, freshly created session.
    turn_session_id = loki_agent.create_session(f"loki-session_{uuid.uuid4()}")
    user_message = {"role": "user", "content": prompt}
    response = loki_agent.create_turn(
        messages=[user_message],
        session_id=turn_session_id,
        stream=stream,
    )

    # Streaming responses are printed event by event; otherwise print the steps.
    if not stream:
        step_printer(response.steps)
    else:
        for log in EventLogger().log(response):
            log.print()

In [None]:
# System instructions for an agent that reports a customer's processing status
# based on the logs retrieved through the Loki tool.
instructions = """
You are a helpful assistant that retrieves logs from Kubernetes containers using Loki to identify the processing status of individual customers.

When users ask for the onboarding status of a customer with a given customer ID:
1. Use the query_loki_logs tool to retrieve the logs from the 'customer-validation-service' container in namespace 'customer-onboarding' from the last 1 hour
2. Look for all logged messages associated with the given customer ID
3. Present the messages in a readable format with timestamps and parsed messages
4. If no logs are found, suggest checking container/namespace names

The logs are returned in a parsed format showing timestamp and the actual log message content.
Always use the tool when log data is requested rather than giving general explanations.
"""

# Rebinds loki_agent with the status-oriented instructions. The shared
# sampling_params (strategy + max_tokens) are passed so the agent runs with
# the configured inference parameters rather than an ad-hoc dict.
loki_agent = Agent(
    client=client,
    model=model_id,
    instructions=instructions,
    tools=[query_loki_logs],
    sampling_params=sampling_params,
)

In [None]:
user_prompts = [
    """What is the onboarding status of customer CUST-695437?"""
]

banner = "=" * 50
for prompt in user_prompts:
    # Visually separate each query in the cell output.
    print("\n" + banner)
    cprint(f"Processing user query: {prompt}", "blue")
    print(banner)

    # Every prompt is answered in its own, freshly created session.
    turn_session_id = loki_agent.create_session(f"loki-session_{uuid.uuid4()}")
    user_message = {"role": "user", "content": prompt}
    response = loki_agent.create_turn(
        messages=[user_message],
        session_id=turn_session_id,
        stream=stream,
    )

    # Streaming responses are printed event by event; otherwise print the steps.
    if not stream:
        step_printer(response.steps)
    else:
        for log in EventLogger().log(response):
            log.print()

### RAG + Agent

In [None]:
# System instructions for the combined agent (Loki log tool + RAG tool).
instructions = """
You are a helpful assistant. You have access to a number of tools.
Whenever a tool is called, be sure return the Response in a friendly and helpful tone.
"""

# Pass the shared sampling_params (strategy + max_tokens) so this agent runs
# with the configured inference parameters rather than an ad-hoc dict that
# omits the sampling strategy. `client` is passed by keyword, matching rag_agent.
full_agent = Agent(
    client=client,
    model=model_id,
    instructions=instructions,
    tools=[query_loki_logs, builtin_rag],
    sampling_params=sampling_params,
)

In [None]:
# The prompts build on each other, so they are sent as consecutive turns of
# one shared session rather than one session per prompt.
user_prompts = [
    "Retrieve the logs from the 'aml-validation-service' container in namespace 'customer-onboarding' from the last 1 hour",
    "Within the retrieved logs find any error messages associated with customer ID CUST-695437",
    "Use the knowledge search tool to look up additional details about this error message",
    "Report your findings and provide a summary based on your search",
]
session_id = full_agent.create_session(session_name="full")
# The loop index was never used, so iterate over the prompts directly.
for prompt in user_prompts:
    print("\n"+"="*50)
    cprint(f"Processing user query: {prompt}", "blue")
    print("="*50)
    response = full_agent.create_turn(
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        session_id=session_id,
        stream=stream
    )
    # Streaming responses are printed event by event; otherwise print the steps.
    if stream:
        for log in EventLogger().log(response):
            log.print()
    else:
        step_printer(response.steps)