# LLMOps: Observability

LLMOps involves a comprehensive set of activities, including:
* Model deployment and maintenance: deploying and managing LLMs on cloud platforms or on-premises infrastructure
* Data management: curating and preparing training data, as well as monitoring and maintaining data quality
* Model training and fine-tuning: training and refining LLMs to improve their performance on specific tasks
* Monitoring and evaluation: tracking LLM performance, identifying errors, and optimizing models
* Security and compliance: ensuring the security and regulatory compliance of LLM operations

Source: [What is LLMOps (large language model operations)?](https://cloud.google.com/discover/what-is-llmops?hl=en)


Observability is the broader concept of understanding what is happening under the hood of your LLM application. Traces are the objects used to achieve deep observability.
A trace is a piece of logged data from a GenAI workflow (agent, RAG, and so on) that supports development, debugging, monitoring, and explainability:
* Developing and debugging: complex agentic workflows require an easy way to inspect inputs and outputs
* Monitoring: the ability to drill down into latency and token usage
* Explainability: tracing LLM outputs back through a coherent lineage

In addition, a dedicated platform eases this process and brings the desired level of **reproducibility to experiments**.

Most popular platforms:

![llmops-platforms](docs/llmops-platforms.png)


**STOP PRINTING IN NOTEBOOKS**



# Langfuse

Open Source LLM Engineering Platform: Traces, evals, prompt management and metrics to debug and improve your LLM application
![langufse-landscape](docs/langufse-landscape.png)

In [1]:
%load_ext autoreload
%autoreload 2

In [19]:
import os
from pathlib import Path
from dotenv import load_dotenv
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_qdrant import QdrantVectorStore
from langchain_core.prompts import ChatPromptTemplate
from src import conf

# Params

In [22]:
conf_infra = conf.load(file="infra.yaml")
conf_settings = conf.load(file="settings.yaml")

LLM_WORKHORSE = conf_settings.llm_workhorse
EMBEDDINGS = conf_settings.embeddings
INDEX_NAME = conf_settings.vdb_index
LANGFUSE_HOST = conf_infra.llmops_url
VDB_URL = conf_infra.vdb_url


# Environment Variables

In [23]:
load_dotenv()

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
# LANGFUSE_SECRET_KEY = 
# LANGFUSE_PUBLIC_KEY = 
os.environ['LANGFUSE_HOST'] = LANGFUSE_HOST

# Clients

In [25]:
llm = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model=LLM_WORKHORSE,
)
embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY, model=EMBEDDINGS)
vector_store = QdrantVectorStore.from_existing_collection(
    embedding=embeddings,
    collection_name=INDEX_NAME,
    url=VDB_URL,
    api_key=QDRANT_API_KEY,
)

In [9]:
os.environ["LANGFUSE_SECRET_KEY"][:5]

'sk-lf'

In [10]:
os.environ["LANGFUSE_PUBLIC_KEY"][:5]

'pk-lf'

In [11]:
os.environ['LANGFUSE_HOST']

'https://cloud.langfuse.com'

# Tracing

A **trace** typically represents a single request or operation. It contains the overall input and output of the function, as well as metadata about the request (e.g. user, session, tags).  
Each trace can contain **multiple observations** that log the individual steps of the execution. Usually, a trace corresponds to a single API call of an application.  
**Sessions** group traces that are part of the same user interaction. A common example is a thread in a chat interface.  
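
The relationship between sessions, traces, and observations can be pictured with a small data model. This is only an illustrative sketch in plain Python: the class and field names (`Observation`, `Trace`, `group_by_session`, `latency_s`, `tokens`) are hypothetical and are not the Langfuse schema.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Observation:
    """One step inside a trace (e.g. a retrieval or an LLM call)."""
    name: str
    latency_s: float
    tokens: int = 0


@dataclass
class Trace:
    """One request: overall input/output, metadata, and its observations."""
    id: str
    input: str
    output: str
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    observations: List[Observation] = field(default_factory=list)

    @property
    def total_tokens(self) -> int:
        return sum(obs.tokens for obs in self.observations)

    @property
    def total_latency_s(self) -> float:
        # Assumes sequential steps; parallel steps would need timestamps.
        return sum(obs.latency_s for obs in self.observations)


def group_by_session(traces):
    """Group traces that belong to the same user interaction."""
    sessions = defaultdict(list)
    for trace in traces:
        sessions[trace.session_id].append(trace)
    return dict(sessions)


traces = [
    Trace("t1", "hi", "hello", session_id="chat-1",
          observations=[Observation("retrieve", 0.25),
                        Observation("generate", 1.5, tokens=120)]),
    Trace("t2", "more?", "sure", session_id="chat-1",
          observations=[Observation("generate", 0.75, tokens=80)]),
    Trace("t3", "hey", "hi", session_id="chat-2"),
]
sessions = group_by_session(traces)
```

Drilling down on latency or tokens then amounts to aggregating over the observations of a trace, and a chat thread is simply the list of traces that share a session id.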

## Spans
* Manually
* Decorator @observe
* Context manager
* Integrations

In [8]:
from langfuse import get_client
 
langfuse = get_client()
 
# Create a span without a context manager
span = langfuse.start_span(name="user-request")
 
# Your processing logic here
span.update(output="Request processed")
 
# Child spans must be created using the parent span object
nested_span = span.start_span(name="nested-span")
nested_span.update(output="Nested span output")
 
# Important: Manually end the span
nested_span.end()
 
# Important: Manually end the parent span
span.end()
 
# Flush events in short-lived applications
langfuse.flush()

In [None]:
from langfuse import observe, get_client

langfuse = get_client()  # shared client, so the flush() below also works in a fresh kernel

@observe
def my_function():
    return "Hello, world!" # Input/output and timings are automatically captured
 
my_function()
 
# Flush events in short-lived applications
langfuse.flush()

In [14]:
from langfuse.langchain import CallbackHandler
 
langfuse_handler = CallbackHandler()

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
 
llm = ChatOpenAI(model_name=LLM_WORKHORSE)
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
chain = prompt | llm
 
response = chain.invoke(
    {"topic": "cats"},
    config={"callbacks": [langfuse_handler]},
)

## Sessions, users, tags and other metadata

In [17]:
from langfuse.langchain import CallbackHandler
 
handler = CallbackHandler()
 
ID_SESSION = "2025-07-30"
ID_USER = "manualrg"
# Pass langfuse_session_id as metadata to the chain invocation
chain.invoke(
    {"topic": "devops"},
    config={
        "callbacks": [handler],
        "metadata": {
            "langfuse_session_id": ID_SESSION,
            "langfuse_user_id": ID_USER,
            "langfuse_tags": ["mbit", "retrieve-k=3"],
            # Any other key is stored as regular trace metadata
            "foo": "bar"
        },
    },
)

AIMessage(content='- Why did the DevOps engineer cross the road? To automate the chicken so it could cross in every environment.\n\n- "It works on my machine" is a valid service-level agreement — if you have exactly one user.\n\n- How many DevOps engineers does it take to change a light bulb? None — they just declare a new bulb in code, apply, and roll out a new immutable bulb.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 410, 'prompt_tokens': 13, 'total_tokens': 423, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 320, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-5-mini-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-CEOpeqlvmCeU9YxxZ5XnQKquYEOpe', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--64a7138c-24a7-4e66-b1c1-22c37d0d1f67-0', usage_metadata={'input_tokens': 1
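
Since the same `config` structure is repeated on every invocation, it can be handy to build it in one place. The helper below is a hypothetical convenience function (`langfuse_config` is not part of Langfuse or LangChain); it only assembles the dictionary with the `langfuse_session_id`, `langfuse_user_id`, and `langfuse_tags` keys used in the cell above.

```python
def langfuse_config(handler, session_id=None, user_id=None, tags=None, **extra_metadata):
    """Build the `config` dict passed to `.invoke()`."""
    metadata = dict(extra_metadata)  # e.g. foo="bar", kept as regular metadata
    if session_id is not None:
        metadata["langfuse_session_id"] = session_id
    if user_id is not None:
        metadata["langfuse_user_id"] = user_id
    if tags:
        metadata["langfuse_tags"] = list(tags)
    return {"callbacks": [handler], "metadata": metadata}


# A placeholder object stands in for the real CallbackHandler to keep the sketch runnable.
placeholder_handler = object()
config = langfuse_config(
    placeholder_handler,
    session_id="2025-07-30",
    user_id="manualrg",
    tags=["mbit", "retrieve-k=3"],
    foo="bar",
)
```

In the notebook it would be used as `chain.invoke({"topic": "devops"}, config=langfuse_config(handler, session_id=ID_SESSION, user_id=ID_USER))`.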

# Integration in LangGraph

As with LangChain, the handler can be passed at invocation time; it can also be attached once, at compile time.


In [39]:
from rag import main

handler = CallbackHandler()  # same name as used in the invoke() config and last_trace_id below

rag_graph = main(
    "space", 3
)

rag_graph.invoke(
    {"question": "is Althera A larger than the Sun?"},
    config={
        "callbacks": [handler],
        "metadata": {
            "langfuse_session_id": ID_SESSION,
            "langfuse_user_id": ID_USER,
            "langfuse_tags": ["mbit", "retrieve-k=3"],
        },
    },
)

handler.last_trace_id  # check in UI



'5f4f16e1f93d010b2041d2c801b91697'

# Scores

There are several ways of setting scores, mainly:
* online: the scoring code has access to the current trace
* offline: a trace id has to be retrieved first in order to attach the score

The offline version is the most common in chatbots, because a user can give feedback at any moment.
The flow follows this diagram:

```
     frontend                   backend                   langfuse
        |--------------hu----------->|                        |
        |                            |                        |
        |                            |--------trace[i]------->|
        |                            |<----trace[i].id--------|
        |<-----ai, trace_id----------|                        |
        |-----feedback, trace_id---->|                        |
        |                            |----score, trace_id---->|
```


In [40]:
langfuse.create_score(
    trace_id="5f4f16e1f93d010b2041d2c801b91697",
    name="user-feedback",
    value=1,
    data_type="NUMERIC",
    comment="This was correct, thank you"
)
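
The backend side of the diagram above can be sketched roughly as follows. The function names and payload shapes (`answer_payload`, `score_from_feedback`, the `thumbs_up` field) are hypothetical; only the keyword arguments of the resulting score mirror the `create_score` call in the previous cell.

```python
def answer_payload(ai_message, trace_id):
    """Backend -> frontend: return the answer together with its trace id."""
    return {"ai": ai_message, "trace_id": trace_id}


def score_from_feedback(feedback):
    """Frontend -> backend: turn late user feedback into create_score kwargs."""
    score = {
        "trace_id": feedback["trace_id"],  # the id the frontend received earlier
        "name": "user-feedback",
        "value": 1 if feedback["thumbs_up"] else 0,
        "data_type": "NUMERIC",
    }
    if feedback.get("comment"):
        score["comment"] = feedback["comment"]
    return score


# 1) The backend answers and hands the trace id to the frontend.
payload = answer_payload("Some answer", "5f4f16e1f93d010b2041d2c801b91697")

# 2) Later, the frontend sends feedback that carries the same trace id.
feedback = {
    "trace_id": payload["trace_id"],
    "thumbs_up": True,
    "comment": "This was correct, thank you",
}
score_kwargs = score_from_feedback(feedback)
# 3) The backend would then forward it: langfuse.create_score(**score_kwargs)
```

Keeping the trace id in the response payload means the backend does not need to store any state between the answer and the feedback.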