In [None]:
# LLM Observability using LangSmith, Langfuse, and OpenTelemetry

# Install necessary libraries
!pip install langchain-core langchain-openai langsmith langfuse openinference-instrumentation[openai] opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-otlp-proto-http opentelemetry-sdk-extension-aws # Install necessary libraries
!pip install --upgrade openai # Ensure you have a recent openai library

import os

# --- Set up API Keys ---
# Every value below is only a fallback: `setdefault` leaves a variable untouched when
# it is already defined (Colab secrets, a shell export, an earlier cell), so real
# credentials are never overwritten by these placeholders.
# Prefer supplying keys through the environment rather than editing them in here,
# and never commit a notebook that contains real keys.

# For LangSmith
os.environ.setdefault("LANGCHAIN_TRACING_V2", "true")
os.environ.setdefault("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com")  # Or your custom endpoint
os.environ.setdefault("LANGCHAIN_API_KEY", "YOUR_LANGSMITH_API_KEY")
os.environ.setdefault("LANGCHAIN_PROJECT", "colab-llm-observability-demo-langsmith")  # Replace with your project name

# For LangFuse
os.environ.setdefault("LANGFUSE_HOST", "https://cloud.langfuse.com")  # Or your custom endpoint
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "YOUR_LANGFUSE_PUBLIC_KEY")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "YOUR_LANGFUSE_SECRET_KEY")
# os.environ["LANGFUSE_DEBUG"] = "true" # Uncomment for verbose logging

# For OpenAI
os.environ.setdefault("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")

# --- Imports ---
from langchain_openai import ChatOpenAI
from langsmith import traceable
from langfuse import Langfuse
from openai import OpenAI
from openinference.instrumentation.openai import OpenAIInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# --- Initialize the OpenAI LLM ---
# Two clients are created up front:
#   * llm_openai    - the plain OpenAI SDK client, used by the OpenTelemetry example
#                     to show instrumentation that does not depend on LangChain.
#   * llm_langchain - LangChain's ChatOpenAI wrapper, used by the LangSmith and
#                     LangFuse examples.
llm_openai = OpenAI()
llm_langchain = ChatOpenAI(model="gpt-3.5-turbo")


In [None]:
# --- Example 1: LangSmith ---

# With LANGCHAIN_TRACING_V2 set to true and the LangSmith endpoint / API key
# configured, LangChain calls are traced without extra code. @traceable is optional;
# it wraps the function so the call shows up with its own structure and metadata.
@traceable
def call_openai_langsmith(prompt: str):
    """Send `prompt` to the LangChain chat model and hand back the model's response."""
    return llm_langchain.invoke(prompt)

# Run the LangSmith example once and print the text of the returned message
# (the function returns the response object; `.content` is read here).
print("--- Calling OpenAI via LangSmith ---")
langsmith_response = call_openai_langsmith("Write a short poem about a cat.")
print(langsmith_response.content)
print("\nCheck your LangSmith project for traces.")


# --- Example 2: LangFuse ---

# Initialize LangFuse client
# No arguments are passed; the client is presumably configured from the
# LANGFUSE_HOST / LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY variables set in the
# first cell -- confirm against the Langfuse SDK version in use.
langfuse = Langfuse()

def call_openai_langfuse(prompt: str):
    """Call the LangChain chat model and record the call as a Langfuse trace.

    Args:
        prompt: Text sent to the model; also stored as the trace input.

    Returns:
        The LangChain response message (read ``.content`` for the text).

    Raises:
        Exception: Whatever ``llm_langchain.invoke`` raises is propagated unchanged.
    """
    # Named `lf_trace` so it cannot be confused with the module-level
    # `opentelemetry.trace` import.
    lf_trace = langfuse.trace(
        name="openai-call-langfuse",
        input=prompt,
    )
    # Langfuse only records LangChain calls that carry its callback handler. A
    # handler obtained from the trace nests the LLM generation under that trace.
    response = llm_langchain.invoke(
        prompt,
        config={"callbacks": [lf_trace.get_langchain_handler()]},
    )
    # The Langfuse v2 trace client has no `end()` (only spans/generations do), so
    # the result is attached with `update()`; `langfuse.flush()` sends it.
    lf_trace.update(output=response.content)
    return response

# Run the LangFuse example once and print the text of the returned message.
print("\n--- Calling OpenAI via LangFuse ---")
langfuse_response = call_openai_langfuse("Write a short story about a dog.")
print(langfuse_response.content)
print("\nCheck your LangFuse project for traces.")

# Flush Langfuse data to ensure it's sent
# (called explicitly here rather than relying on any background delivery).
langfuse.flush()


# --- Example 3: OpenTelemetry (OpenInference) ---

# Configure OpenTelemetry TracerProvider
# For a real application, you would likely export traces to an OTLP collector
# (e.g., Jaeger, Honeycomb, Datadog) instead of the console.
# The OTLP exporters (opentelemetry-exporter-otlp-proto-grpc / -http) are already
# part of the install cell at the top of the notebook.

# The resource attaches the service name "colab-openai-demo" to this provider.
provider = TracerProvider(
    resource=Resource.create({"service.name": "colab-openai-demo"})
)
# To export to a collector via gRPC, swap the processor below for:
# from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# processor = BatchSpanProcessor(OTLPSpanExporter())

# Demonstration setup: spans are batched and written by ConsoleSpanExporter.
processor = BatchSpanProcessor(ConsoleSpanExporter())

# Order matters: the processor is attached first, then the provider is registered
# globally, and only then is the OpenAI client instrumented.
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Instrument the OpenAI client
# This will automatically create spans for OpenAI API calls.
OpenAIInstrumentor().instrument()

# Use the raw OpenAI client (not the LangChain wrapper) for a single chat completion.
print("\n--- Calling OpenAI via OpenTelemetry Instrumented Client ---")
otel_response = llm_openai.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Write a short haiku about a bird."}]
)
# Only the first choice's message text is printed.
print(otel_response.choices[0].message.content)
print("\nCheck the console output or your OpenTelemetry collector for spans.")

# Force export of remaining spans (useful for ConsoleExporter or interactive sessions);
# the BatchSpanProcessor configured above would otherwise export on its own schedule.
provider.force_flush()


In [None]:
# --- Extensive Examples ---

print("\n--- Extensive Examples ---")

# LangSmith: the prompt is assembled inside the traced function.
print("\n--- LangSmith Example with Complex Prompt ---")
@traceable
def analyze_sentiment_langsmith(text: str):
    """Ask the model whether `text` is positive, negative, or neutral.

    Returns the LangChain response message; the function is wrapped by @traceable.
    """
    sentiment_prompt = (
        f"Analyze the sentiment of the following text: '{text}'. "
        "Is it positive, negative, or neutral? Explain briefly."
    )
    return llm_langchain.invoke(sentiment_prompt)

# Mixed-sentiment sample sentence. Note that the model call happens before the
# "Analyzing text" line is printed.
complex_text_langsmith = "I loved the movie! The acting was superb, but the ending was a bit disappointing."
langsmith_complex_response = analyze_sentiment_langsmith(complex_text_langsmith)
print(f"Analyzing text: '{complex_text_langsmith}'")
print(f"Sentiment Analysis Result: {langsmith_complex_response.content}")
print("Check your LangSmith project for the trace of this call.")


# Example with LangFuse and a chained call or sequence
print("\n--- LangFuse Example with Chained Calls ---")

def _invoke_in_langfuse_span(parent, span_name: str, prompt: str) -> str:
    """Run one LLM call inside a Langfuse span nested under `parent`; return the text.

    The span is always closed: with the output on success, or with level "ERROR"
    and the exception text on failure (the exception is then re-raised).
    """
    # Langfuse v2 span clients are not context managers, so the span is opened and
    # closed explicitly instead of with a `with` statement.
    span = parent.span(name=span_name, input=prompt)
    try:
        # The handler obtained from the span nests the LLM generation under it.
        response = llm_langchain.invoke(
            prompt,
            config={"callbacks": [span.get_langchain_handler()]},
        )
    except Exception as exc:
        span.end(level="ERROR", status_message=str(exc))
        raise
    span.end(output=response.content)
    return response.content

def generate_and_summarize_langfuse(topic: str):
    """Generate a paragraph about `topic`, summarize it, and trace both steps in Langfuse.

    One trace named "generate-and-summarize" is created; each LLM call is recorded
    in its own span ("generate-text", "summarize-text") under that trace.

    Args:
        topic: Subject to write about; also stored as the trace input.

    Returns:
        The summary text produced by the second LLM call.

    Raises:
        Exception: Errors from either LLM call propagate after the failing span
            has been closed with level "ERROR".
    """
    parent_trace = langfuse.trace(
        name="generate-and-summarize",
        input=topic,
    )

    # First call: generate text
    generated_text = _invoke_in_langfuse_span(
        parent_trace, "generate-text", f"Write a paragraph about {topic}."
    )
    print(f"Generated text about {topic}:\n{generated_text}\n")

    # Second call: summarize text
    summarized_text = _invoke_in_langfuse_span(
        parent_trace, "summarize-text", f"Summarize the following text:\n{generated_text}"
    )
    print(f"Summarized text:\n{summarized_text}\n")

    # The v2 trace client has no `end()`; attaching the output completes the record.
    parent_trace.update(output=summarized_text)

    return summarized_text

# Run the chained example for one topic; the function returns the summary string.
topic_langfuse = "the benefits of machine learning"
langfuse_chained_response = generate_and_summarize_langfuse(topic_langfuse)
print(f"Result of Generate and Summarize for '{topic_langfuse}':\n{langfuse_chained_response}")
print("Check your LangFuse project for traces, including the parent trace and its nested spans.")

# Flush Langfuse data again after the chained example
langfuse.flush()


# Example with OpenTelemetry and a more complex interaction
print("\n--- OpenTelemetry Example with Multiple Calls ---")

# Each chat.completions.create call on llm_openai is expected to be captured by the
# OpenAIInstrumentor set up earlier. The function below adds one manual span around
# both calls so they are grouped together.

def process_query_otel(query: str):
    """Classify `query`, then answer it with a topic-specific prompt, inside one span.

    A "process-query" span wraps both OpenAI calls. The query, the classification
    and the final answer are stored as span attributes, and an event is added
    before and after each call. Returns the text of the second completion.
    """
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("process-query") as root_span:
        root_span.set_attribute("query.input", query)

        # Step 1: ask the model which topic the query belongs to.
        classify_prompt = f"Classify the following query: '{query}'. Is it about programming, cooking, or history?"
        root_span.add_event("calling_openai_classify", attributes={"prompt": classify_prompt})
        classify_completion = llm_openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": classify_prompt}],
        )
        classification = classify_completion.choices[0].message.content
        root_span.set_attribute("query.classification", classification)
        root_span.add_event("received_classification", attributes={"classification": classification})

        print(f"Query Classification: {classification}")

        # Step 2: pick the follow-up prompt. Keywords are checked in this order and
        # the first one found in the lower-cased classification wins; if none
        # matches, the general prompt is kept.
        lowered = classification.lower()
        prompt_by_keyword = (
            ("programming", f"Provide a brief programming tip related to: {query}"),
            ("cooking", f"Give a quick cooking tip related to: {query}"),
            ("history", f"Mention a historical fact related to: {query}"),
        )
        response_prompt = f"Provide a general response to the query: {query}"
        for keyword, candidate_prompt in prompt_by_keyword:
            if keyword in lowered:
                response_prompt = candidate_prompt
                break

        root_span.add_event("calling_openai_response", attributes={"prompt": response_prompt})
        answer_completion = llm_openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": response_prompt}],
        )
        final_response = answer_completion.choices[0].message.content
        root_span.set_attribute("query.output", final_response)
        root_span.add_event("received_response", attributes={"response": final_response})

        print(f"Generated Response:\n{final_response}\n")

    return final_response

# Run the two-step OpenTelemetry example for one programming question.
query_otel = "How do I sort a list in Python?"
print(f"Processing query via OpenTelemetry: '{query_otel}'")
otel_process_response = process_query_otel(query_otel)
print(f"Final processed response: {otel_process_response}")
print("Check the console output or your OpenTelemetry collector for spans, including the 'process-query' span and nested OpenAI call spans.")

# Force export again after the OpenTelemetry example
provider.force_flush()

# Closing reminders for the reader.
print("--- Examples Complete ---")
print("Remember to replace placeholder API keys and project names.")
print("Refer to the documentation for LangSmith, LangFuse, and OpenTelemetry for advanced configurations and export options.")


In [None]:
# 1. adding another LLM call, 2. integrating a different tool, 3. visualizing data, 4. handling errors

# Add these imports at the beginning of your code
import pandas as pd
import matplotlib.pyplot as plt
import json # To potentially handle tool output if it's JSON
from langchain.agents import tool, initialize_agent, AgentType
from langchain.pydantic_v1 import BaseModel, Field
from typing import Optional

# --- 1. Adding another LLM Call ---
# You already have multiple LLM calls shown in the LangFuse and OpenTelemetry examples.
# Let's add another simple one demonstrating a different task, still using LangChain for ease of tracing integration.

@traceable
def translate_text_langsmith(text: str, target_language: str):
    """Ask the model to translate `text` into `target_language`.

    Returns the LangChain response message; the function is wrapped by @traceable.
    """
    translation_prompt = f"Translate the following text to {target_language}:\n'{text}'"
    return llm_langchain.invoke(translation_prompt)

# Translate one English sentence to French and print the text of the response.
print("\n--- Adding Another LLM Call: Translation ---")
text_to_translate = "Hello, how are you?"
target_lang = "French"
translation_response = translate_text_langsmith(text_to_translate, target_lang)
print(f"Original text: '{text_to_translate}'")
print(f"Translated to {target_lang}: {translation_response.content}")
print("Check your LangSmith project for the translation trace.")


# --- 2. Integrating a Different Tool ---
# We can use LangChain's agent capabilities to integrate a tool.
# Let's create a simple tool that looks up a definition (as an example).

# Argument schema passed to the @tool decorator below through `args_schema`.
# Documented with comments rather than a class docstring so that nothing is added
# to the model definition itself.
class DefinitionSearchInput(BaseModel):
    # The single argument of the tool: the term whose definition is requested.
    query: str = Field(description="The term to search the definition for")

@tool("definition-search", args_schema=DefinitionSearchInput)
def definition_search_tool(query: str) -> str:
    """Searches for the definition of a given term."""
    # Mock glossary keyed by lower-case term. A real tool would call a dictionary
    # API or a database instead.
    mock_glossary = {
        "python": "An interpreted, object-oriented, high-level programming language with dynamic semantics.",
        "colaboratory": "A Google product that allows anyone to write and execute arbitrary python code through the browser.",
        "llm": "Large Language Model: A type of artificial intelligence algorithm that uses deep learning techniques and massively large data sets to understand, summarize, generate and predict new content.",
        "observability": "The ability to understand the internal state of a system based on external data.",
    }
    term = query.lower()
    if term in mock_glossary:
        return mock_glossary[term]
    # The miss message echoes the query exactly as it was passed in (original casing).
    return f"Definition for '{query}' not found in our mock database."

# Initialize an agent that can use the tool and the LLM.
# The agent is given exactly one tool (definition_search_tool) and the LangChain
# chat model created in the first cell.
# Whether its steps appear in LangSmith/LangFuse depends on the tracing
# configuration made earlier in the notebook.
agent = initialize_agent(
    [definition_search_tool],
    llm_langchain,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True, # Set to True to see the agent's thought process
    handle_parsing_errors=True # Basic error handling for parsing agent output
)

print("\n--- Integrating a Tool (Definition Search) ---")
# Both questions run inside one try block, so a failure on the first one skips the
# second; the error is printed and the notebook continues.
try:
    tool_query = "What is a LLM?"
    print(f"Using agent to answer: '{tool_query}'")
    # `Chain.run` is deprecated in LangChain; `invoke` takes the inputs as a dict and
    # returns a dict whose "output" entry holds the agent's final answer.
    tool_response = agent.invoke({"input": tool_query})["output"]
    print(f"Agent's response: {tool_response}")
    print("Check your LangSmith/LangFuse project for the agent trace, including tool calls.")

    # A term that is absent from the mock glossary, to exercise the "not found" path.
    tool_query_unknown = "Define 'quux'."
    print(f"\nUsing agent to answer: '{tool_query_unknown}' (unknown term)")
    tool_response_unknown = agent.invoke({"input": tool_query_unknown})["output"]
    print(f"Agent's response: {tool_response_unknown}")
    print("Check your LangSmith/LangFuse project for the trace.")

except Exception as e:
    print(f"An error occurred during agent execution: {e}")
    # Tracing systems might capture this error, depending on configuration.
    # Tracing systems might capture this error, depending on configuration.


# --- 3. Visualizing Data ---
# Let's create some dummy data that we might get from processing LLM outputs or other sources
# and visualize it using matplotlib.

print("\n--- Visualizing Data ---")

# Bar chart: sentiment counts from hypothetical LLM calls (hard-coded example data).
sentiment_data = {'Positive': 15, 'Negative': 7, 'Neutral': 10}
sentiments = list(sentiment_data)
counts = [sentiment_data[label] for label in sentiments]

sentiment_fig, sentiment_ax = plt.subplots(figsize=(6, 4))
sentiment_ax.bar(sentiments, counts, color=['green', 'red', 'grey'])
sentiment_ax.set_xlabel("Sentiment")
sentiment_ax.set_ylabel("Count")
sentiment_ax.set_title("Distribution of Sentiments (Example Data)")
plt.show()

# Line chart: hypothetical per-call latency in milliseconds (hard-coded example data).
latency_data = {'Call 1': 350, 'Call 2': 420, 'Call 3': 380, 'Call 4': 510, 'Call 5': 450}
calls = list(latency_data)
latencies = [latency_data[label] for label in calls]

latency_fig, latency_ax = plt.subplots(figsize=(8, 4))
latency_ax.plot(calls, latencies, marker='o', linestyle='-')
latency_ax.set_xlabel("LLM Call")
latency_ax.set_ylabel("Latency (ms)")
latency_ax.set_title("Hypothetical LLM Call Latency Over Time/Calls")
latency_ax.grid(True)
plt.show()

print("Visualization plots displayed above.")


# --- 4. Handling Errors ---
# Errors can occur at various stages: API calls, tool execution, parsing outputs.
# Implement basic try-except blocks. Observability tools help you see *where* errors happen.

print("\n--- Handling Errors ---")

def safe_llm_call(prompt: str, trace_name: str):
    """Call the LLM, returning its text on success or an "Error: ..." string on failure.

    The call is recorded in Langfuse as a trace named `trace_name` containing one
    span ("llm-call"). On failure that span is closed with level "ERROR" and the
    exception text.

    Args:
        prompt: Text sent to the model; also stored as the trace and span input.
        trace_name: Name given to the Langfuse trace.

    Returns:
        The response text, or ``f"Error: {exception}"`` if anything in the try block
        raised. Exceptions are deliberately not propagated.
    """
    # Bound before the `try` so the handler can tell whether the span was created
    # without inspecting `locals()`.
    llm_span = None
    try:
        # `lf_trace` avoids shadowing the module-level `opentelemetry.trace` import.
        lf_trace = langfuse.trace(
            name=trace_name,
            input=prompt,
        )
        llm_span = lf_trace.span(name="llm-call", input=prompt)
        print(f"Attempting LLM call with prompt: '{prompt[:50]}...'")
        # The handler obtained from the span nests the LLM generation under it.
        response = llm_langchain.invoke(
            prompt,
            config={"callbacks": [llm_span.get_langchain_handler()]},
        )
        # Only spans/generations can be ended in the Langfuse v2 client; the trace
        # itself just receives the output via `update()`.
        llm_span.end(output=response.content)
        lf_trace.update(output=response.content)
        print("LLM call successful.")
        return response.content
    except Exception as e:
        print(f"An error occurred during the LLM call: {e}")
        # Record the failure on the span (status lives on observations, not traces).
        if llm_span is not None:
            llm_span.end(level="ERROR", status_message=str(e))
            print("Error logged to LangFuse trace.")
        # OpenTelemetry example: In a manual span, you'd add the exception as an event or status
        # In the instrumented client case, the instrumentor might capture it.
        return f"Error: {e}"

# Example of a potentially problematic call: the same sentence repeated 1000 times
# to build a very large prompt that might exceed the model's context limit.
# Note: This specific prompt might not *always* fail depending on the model/API,
# but it serves as an example structure for error handling.

long_prompt = "Explain the entire history of the universe in one paragraph. " * 1000 # Very long prompt

# safe_llm_call returns a string in both the success and the failure case, so
# slicing the result below is safe either way.
error_prone_response = safe_llm_call(long_prompt, "potential-error-call")
print(f"Result of error-prone call: {error_prone_response[:100]}...") # Print only a snippet

# Example of handling errors from tool execution (already partially covered by agent's handle_parsing_errors)
# This shows the pattern for a direct tool call. The failure is simulated with an
# explicit raise because the mock tool itself never errors.
try:
    print("\nAttempting a tool call that might fail...")
    # Simulate a tool call that raises an error
    # For demonstration, let's just raise an exception here
    # definition_search_tool("this should cause an error") # This tool won't error, adding a manual raise
    raise ValueError("Simulated error in tool execution")

except Exception as e:
    # The error is only printed here; nothing is sent to any tracing backend.
    print(f"Caught an error during tool execution: {e}")
    # Log this error using your chosen observability tool

# Flush Langfuse data one last time
langfuse.flush()

print("\n--- Error Handling Demonstration Complete ---")
print("Check your observability dashboards (LangSmith, LangFuse, OpenTelemetry collector) to see how these errors were captured.")
