# Conversational Threads

Many LLM applications have a chatbot-like interface in which the user and the LLM application engage in a multi-turn conversation. In order to track these conversations, you can use the Threads feature in LangSmith.

This is relevant to our RAG application, which should keep track of the earlier turns in a user's conversation.

### Setup

Make sure you set your environment variables, including your Mistral API key.

In [10]:
# You can set them inline
import os
os.environ["MISTRAL_API_KEY"] = "MISTRAL_API_KEY"  # replace the placeholder with your key
os.environ["LANGSMITH_API_KEY"] = "LANGSMITH_API_KEY"  # replace the placeholder with your key
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langsmith-academy"  # If you don't set this, traces will go to the Default project

In [11]:
# Or you can use a .env file
from dotenv import load_dotenv
load_dotenv(dotenv_path="../../.env", override=True)

True
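
Before running the rest of the notebook, it can help to confirm that the keys are actually set and are not still the placeholder strings from the inline cell. The following is a minimal sketch; `missing_env_vars` and `REQUIRED_VARS` are hypothetical names introduced here for illustration, not part of LangSmith or `python-dotenv`.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from the setup cell above.
REQUIRED_VARS = ("MISTRAL_API_KEY", "LANGSMITH_API_KEY")


def missing_env_vars(
    names: Iterable[str] = REQUIRED_VARS,
    env: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the names that are unset, empty, or still equal to their own name."""
    missing = []
    for name in names:
        value = env.get(name, "").strip()
        # A value identical to the variable name is treated as an unfilled placeholder.
        if not value or value == name:
            missing.append(name)
    return missing


problems = missing_env_vars()
if problems:
    print(f"Set these before continuing: {', '.join(problems)}")
```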

### Group traces into threads


A Thread is a sequence of traces representing a single conversation. Each response is represented as its own trace, but these traces are linked together by being part of the same thread.

To link traces together, pass a special metadata key whose value is the unique identifier for that conversation. The key name should be one of:

- session_id
- thread_id
- conversation_id

The value should be a UUID.
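
The rule above can be wrapped in a small helper that builds the `langsmith_extra` argument used later in this notebook. This is a sketch under stated assumptions: `thread_extra` and `THREAD_KEYS` are hypothetical names, and the UUID check simply enforces the "value should be a UUID" guidance.

```python
import uuid
from typing import Any, Dict, Optional

# The three metadata key names listed above.
THREAD_KEYS = ("session_id", "thread_id", "conversation_id")


def thread_extra(
    thread_id: Optional[str] = None, key: str = "thread_id", **extra_metadata: Any
) -> Dict[str, Dict[str, Any]]:
    """Build a dict suitable for passing as langsmith_extra=... to a traced call."""
    if key not in THREAD_KEYS:
        raise ValueError(f"key must be one of {THREAD_KEYS}, got {key!r}")
    if thread_id is None:
        thread_id = str(uuid.uuid4())  # start a new conversation
    else:
        thread_id = str(uuid.UUID(str(thread_id)))  # raises ValueError if not a UUID
    # The thread key is written last so extra metadata cannot overwrite it.
    return {"metadata": {**extra_metadata, key: thread_id}}
```

A traced function could then be called as `langsmith_rag(question, langsmith_extra=thread_extra(existing_id))`, reusing the same ID for every turn of one conversation.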

In [12]:
import uuid
thread_id = uuid.uuid4()

In [13]:
from langsmith import traceable
from langchain_mistralai.chat_models import ChatMistralAI
from langchain_core.messages import HumanMessage, SystemMessage
from typing import List
import nest_asyncio
from utils import get_vector_db_retriever

mistral_client = ChatMistralAI(model="mistral-small-latest")
nest_asyncio.apply()
retriever = get_vector_db_retriever()

@traceable(run_type="chain")
def retrieve_documents(question: str):
    return retriever.invoke(question)

@traceable(run_type="chain")
def generate_response(question: str, documents):
    formatted_docs = "\n\n".join(doc.page_content for doc in documents)
    rag_system_prompt = """You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the latest question in the conversation. 
    If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise.
    """
    messages = [
        {
            "role": "system",
            "content": rag_system_prompt
        },
        {
            "role": "user",
            "content": f"Context: {formatted_docs} \n\n Question: {question}"
        }
    ]
    return call_mistral(messages)

@traceable(run_type="llm")
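def call_mistral(
    messages: List[dict], model: str = "mistral-small-latest", temperature: float = 0.0
):
    # Returns the LangChain message object; the caller reads its .content.
    # model and temperature are not forwarded: mistral_client uses its own settings.
    # Convert dict messages to LangChain message objects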
    langchain_messages = []
    for msg in messages:
        if msg["role"] == "system":
            langchain_messages.append(SystemMessage(content=msg["content"]))
        elif msg["role"] == "user":
            langchain_messages.append(HumanMessage(content=msg["content"]))
    
    return mistral_client.invoke(langchain_messages)

@traceable(run_type="chain")
def langsmith_rag(question: str):
    documents = retrieve_documents(question)
    response = generate_response(question, documents)
    return response.content


### Now let's run our application twice with this thread_id

In [14]:
question = "What are the best practices for implementing tracing in production ML systems?"
ai_answer = langsmith_rag(question, langsmith_extra={"metadata": {"thread_id": thread_id}})
print(f"Question 1: {question}")
print(f"Answer: {ai_answer}")
print(f"Thread ID: {thread_id}")
print("-" * 80)

Question 1: What are the best practices for implementing tracing in production ML systems?
Answer: To implement tracing in production ML systems, you can ingest traces via the Python or JavaScript LangSmith SDK, the @traceable wrapper, or by submitting traces via the /runs/multipart endpoint. Key services for trace ingestion include the platform backend service, Redis cache, queue service, and ClickHouse. To scale up the write path, monitor these services and consider increasing resources for ClickHouse, platform-backend pods, queue service replicas, or using a larger Redis cache.
Thread ID: f05f4d93-51e0-4f73-8fbd-5de36f6cd1a5
--------------------------------------------------------------------------------


In [15]:
question = "How can I use metadata and tags to organize my traces for better debugging?"
ai_answer = langsmith_rag(question, langsmith_extra={"metadata": {"thread_id": thread_id}})
print(f"Question 2: {question}")
print(f"Answer: {ai_answer}")
print(f"Thread ID: {thread_id}")
print("-" * 80)

Question 2: How can I use metadata and tags to organize my traces for better debugging?
Answer: You can use tags to categorize and filter traces for easier search and analysis. Metadata allows you to attach key-value pairs to traces, storing additional information like environment details or application versions. By combining these, you can effectively organize and query traces to streamline debugging.
Thread ID: f05f4d93-51e0-4f73-8fbd-5de36f6cd1a5
--------------------------------------------------------------------------------
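
Note that the two calls above share a thread in LangSmith, but each call still sends only the latest question to the model. If the application should also answer with earlier turns in view, the message list has to carry them. The sketch below shows one way to do that with plain dicts; `build_messages` and `record_turn` are hypothetical helpers, and `call_mistral` as written would need an extra branch that maps the `assistant` role to `AIMessage` before it could consume this list.

```python
from typing import Dict, List

Message = Dict[str, str]


def record_turn(history: List[Message], question: str, answer: str) -> List[Message]:
    """Return a new history list with one completed user/assistant exchange appended."""
    return history + [
        {"role": "user", "content": question},
        {"role": "assistant", "content": answer},
    ]


def build_messages(
    system_prompt: str, history: List[Message], question: str, context: str
) -> List[Message]:
    """Assemble system prompt, earlier turns, and the latest question with its context."""
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(history)
    # Same user-message layout as generate_response above.
    messages.append(
        {"role": "user", "content": f"Context: {context} \n\n Question: {question}"}
    )
    return messages


history = record_turn([], "What is a thread?", "A sequence of traces.")
messages = build_messages("You are an assistant.", history, "How do I create one?", "docs")
print([m["role"] for m in messages])
```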


### Let's take a look in LangSmith!
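
Besides the Threads view in the UI, the runs of a thread can also be fetched through the SDK. The following is a sketch that assumes `Client.list_runs` accepts a `filter` string in the form shown, a pattern recalled from the LangSmith threads guide, so check it against your installed SDK version. `thread_filter` and `fetch_thread_runs` are hypothetical helper names, and the project name is the one set in the setup cell.

```python
from typing import Any, List


def thread_filter(thread_id: str) -> str:
    """Build a run filter that matches any of the three thread metadata keys."""
    return (
        'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), '
        f'eq(metadata_value, "{thread_id}"))'
    )


def fetch_thread_runs(thread_id: str, project_name: str = "langsmith-academy") -> List[Any]:
    """Return the root runs of one thread, oldest first (requires network access)."""
    # Imported lazily so thread_filter can be used without the SDK installed.
    from langsmith import Client

    client = Client()  # reads LANGSMITH_API_KEY from the environment
    runs = client.list_runs(
        project_name=project_name,
        filter=thread_filter(thread_id),
        is_root=True,  # assumption: only top-level traces are wanted
    )
    return sorted(runs, key=lambda run: run.start_time)
```

Calling `fetch_thread_runs(str(thread_id))` after the two questions above should return one root run per turn, assuming the traces have finished uploading.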

### Custom Conversational Thread Experiments

Let's create a more realistic conversation flow with multiple turns and different thread scenarios.

In [16]:
# Custom Experiment: Multi-Turn Conversation Simulation
import uuid
import time

def simulate_conversation(questions: List[str], thread_name: str = "default"):
    """Simulate a multi-turn conversation with a unique thread ID"""
    conversation_thread_id = uuid.uuid4()
    print(f"\nStarting conversation thread: {thread_name}")
    print(f"Thread ID: {conversation_thread_id}")
    print("=" * 60)
    
    conversation_history = []
    
    for i, question in enumerate(questions, 1):
        print(f"\nTurn {i}: {question}")
        
        # Add some realistic timing between questions
        if i > 1:
            time.sleep(0.5)
        
        try:
            ai_answer = langsmith_rag(
                question, 
                langsmith_extra={
                    "metadata": {
                        "thread_id": conversation_thread_id,
                        "turn_number": i,
                        "conversation_name": thread_name,
                        "user_id": "rakshit"
                    }
                }
            )
            
            conversation_history.append({
                "turn": i,
                "question": question,
                "answer": ai_answer,
                "timestamp": time.time()
            })
            
            print(f"Response: {ai_answer}")
            print("-" * 40)
            
        except Exception as e:
            print(f"Error in turn {i}: {e}")
            conversation_history.append({
                "turn": i,
                "question": question,
                "error": str(e),
                "timestamp": time.time()
            })
    
    return conversation_thread_id, conversation_history

# Conversation 1: Technical Discussion
tech_questions = [
    "What is the difference between supervised and unsupervised learning?",
    "Can you explain how neural networks learn from data?",
    "What are some common challenges in training deep learning models?"
]

thread1_id, history1 = simulate_conversation(tech_questions, "Technical ML Discussion")


Starting conversation thread: Technical ML Discussion
Thread ID: be8df3e8-94ca-4f6c-a16a-3bd6a13f5db7

Turn 1: What is the difference between supervised and unsupervised learning?
Response: I don't know.
----------------------------------------

Turn 2: Can you explain how neural networks learn from data?
Response: I don't know.
----------------------------------------

Turn 3: What are some common challenges in training deep learning models?
Response: I don't know.
----------------------------------------

Turn 3: What are some common challenges in training deep learning models?
Response: The context provided does not discuss challenges in training deep learning models. I don't know the answer to your question.
----------------------------------------

In [17]:
# Conversation 2: Practical Implementation
practical_questions = [
    "How do I choose the right evaluation metrics for my ML model?",
    "What are the key considerations for deploying ML models in production?",
    "How can I monitor model performance after deployment?"
]

thread2_id, history2 = simulate_conversation(practical_questions, "ML Implementation Guide")

# Conversation 3: Debugging and Troubleshooting
debug_questions = [
    "My model is overfitting, what should I do?",
    "How can I improve model performance when I have limited data?"
]

thread3_id, history3 = simulate_conversation(debug_questions, "Debugging Session")

# Summary of all conversations
print("\n" + "="*80)
print("CONVERSATION SUMMARY")
print("="*80)

all_conversations = [
    (thread1_id, history1, "Technical ML Discussion"),
    (thread2_id, history2, "ML Implementation Guide"), 
    (thread3_id, history3, "Debugging Session")
]

for conv_thread_id, history, name in all_conversations:  # avoid overwriting the earlier thread_id
    print(f"\n{name}")
    print(f"Thread ID: {conv_thread_id}")
    print(f"Total turns: {len(history)}")
    
    if history:
        duration = history[-1]["timestamp"] - history[0]["timestamp"]
        print(f"Duration: {duration:.2f} seconds")
        
        # Count successful vs failed responses
        successful = len([h for h in history if "answer" in h])
        failed = len([h for h in history if "error" in h])
        print(f"Successful responses: {successful}, Failed: {failed}")
        
    print("-" * 50)


Starting conversation thread: ML Implementation Guide
Thread ID: cc095561-7f55-4c03-878e-eabd70e71035

Turn 1: How do I choose the right evaluation metrics for my ML model?
Response: To choose the right evaluation metrics for your ML model, consider the following:

1. **Define Your Goals**: Determine what you want to evaluate. For example, you might want to check correctness, concision, or valid reasoning of the model's outputs.
2. **Use Appropriate Metrics**: For complex evaluations like correctness, use an LLM-as-a-judge approach. For simpler tasks like concision, a simple Python function might suffice.
3. **Align with Human Preferences**: Use few-shot examples to align the LLM-as-a-judge evaluator with human preferences, ensuring the metrics reflect what you value in the model's performance.
----------------------------------------

Turn 2: What are the key considerations for deploying ML models in production?
Response: To choose the right evaluation metrics for your ML model, cons

In [18]:
# Advanced Thread Analytics and Metadata Demo
from datetime import datetime
import json

class ConversationAnalyzer:
    """Custom class to analyze conversation threads with enhanced metadata"""
    
    def __init__(self, user_id: str = "rakshit"):
        self.user_id = user_id
        self.session_start = datetime.now()
        
    def analyze_conversation_quality(self, questions: List[str], thread_name: str):
        """Analyze conversation with quality metrics and detailed metadata"""
        
        thread_id = uuid.uuid4()
        session_metadata = {
            "user_id": self.user_id,
            "session_start": self.session_start.isoformat(),
            "thread_name": thread_name,
            "total_questions": len(questions),
            "analysis_version": "v1.2"
        }
        
        print(f"\nAdvanced Analysis: {thread_name}")
        print(f"Session Metadata: {json.dumps(session_metadata, indent=2)}")
        print("-" * 60)
        
        results = []
        
        for i, question in enumerate(questions, 1):
            question_metadata = {
                **session_metadata,
                "thread_id": str(thread_id),
                "question_number": i,
                "question_length": len(question),
                "question_complexity": "high" if len(question.split()) > 10 else "medium" if len(question.split()) > 5 else "low",
                "timestamp": datetime.now().isoformat()
            }
            
            try:
                start_time = time.time()
                
                response = langsmith_rag(
                    question,
                    langsmith_extra={"metadata": question_metadata}
                )
                
                end_time = time.time()
                response_time = end_time - start_time
                
                result = {
                    "question_num": i,
                    "question": question,
                    "response": response,
                    "response_time": response_time,
                    "response_length": len(response),
                    "success": True,
                    "metadata": question_metadata
                }
                
                print(f"Q{i}: {question}")
                print(f"A{i}: {response[:150]}{'...' if len(response) > 150 else ''}")
                print(f"Response time: {response_time:.2f}s | Length: {len(response)} chars")
                print()
                
            except Exception as e:
                result = {
                    "question_num": i,
                    "question": question,
                    "error": str(e),
                    "success": False,
                    "metadata": question_metadata
                }
                print(f"Q{i}: {question}")
                print(f"Error: {e}")
                print()
            
            results.append(result)
        
        # Generate analytics
        successful_responses = [r for r in results if r["success"]]
        if successful_responses:
            avg_response_time = sum(r["response_time"] for r in successful_responses) / len(successful_responses)
            avg_response_length = sum(r["response_length"] for r in successful_responses) / len(successful_responses)
            
            print("Analytics Summary:")
            print(f"Success rate: {len(successful_responses)}/{len(results)} ({len(successful_responses)/len(results)*100:.1f}%)")
            print(f"Average response time: {avg_response_time:.2f}s")
            print(f"Average response length: {avg_response_length:.0f} characters")
        
        return thread_id, results

# Test the advanced analyzer
analyzer = ConversationAnalyzer(user_id="rakshit")

advanced_questions = [
    "Explain the concept of transfer learning and its applications",
    "What are the ethical considerations in AI development?",
    "How do I implement model versioning and experiment tracking?"
]

advanced_thread_id, advanced_results = analyzer.analyze_conversation_quality(
    advanced_questions, 
    "Advanced ML Ethics & Implementation"
)

print(f"\nFinal Thread ID for Advanced Analysis: {advanced_thread_id}")


Advanced Analysis: Advanced ML Ethics & Implementation
Session Metadata: {
  "user_id": "rakshit",
  "session_start": "2025-10-04T17:18:53.931900",
  "thread_name": "Advanced ML Ethics & Implementation",
  "total_questions": 3,
  "analysis_version": "v1.2"
}
------------------------------------------------------------
Q1: Explain the concept of transfer learning and its applications
A1: I don't know.
Response time: 0.43s | Length: 13 chars

Q2: What are the ethical considerations in AI development?
A2: The context provided does not contain information about the ethical considerations in AI development. I don't know the answer to your question.
Response time: 0.50s | Length: 143 chars

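
The `question_complexity` value in the analyzer is a word-count heuristic written as a nested conditional expression. Pulled out into a function it is easier to read and test. `classify_complexity` is a hypothetical name; the thresholds are the ones used in the cell above.

```python
def classify_complexity(question: str) -> str:
    """Label a question by word count: more than 10 is high, more than 5 is medium, else low."""
    word_count = len(question.split())
    if word_count > 10:
        return "high"
    if word_count > 5:
        return "medium"
    return "low"


for q in [
    "Explain the concept of transfer learning and its applications",
    "What are the ethical considerations in AI development?",
    "How do I implement model versioning and experiment tracking?",
]:
    print(classify_complexity(q), "|", q)
```

All three sample questions have eight or nine words, so each is labeled `medium`; with these thresholds the label does not separate them.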

## Summary

This notebook helped me understand how multi-turn conversations are tracked in LLM applications. I learned that LangSmith's Threads feature groups the traces of one conversation, so all the back-and-forth exchanges between the user and the AI show up together. The link is a single metadata key, `thread_id`, `session_id`, or `conversation_id`, that carries the same value on every trace in the conversation.

**What I Tweaked and Learned:**
- Switched from OpenAI to Mistral AI while keeping the conversation tracking working
- Updated the API keys (`MISTRAL_API_KEY` instead of the OpenAI key) and checked the environment setup
- Built conversation simulations with a short delay between questions, so they feel closer to real chat sessions
- Created a ConversationAnalyzer class that tracks success rate, response time, and response length for each thread
- Added extra metadata to each conversation turn, including user IDs, timestamps, question complexity, and conversation names
- Experimented with different conversation types: technical discussions, practical implementation guides, and debugging sessions
- Saw how useful it is to compare success rates and response times across multiple threads

The best part was seeing how little it takes to track an entire conversation in LangSmith. Whether someone asks three quick questions or has a long technical discussion, every trace that carries the same thread ID is grouped into one thread. The custom experiments also showed me how to layer conversation analytics on top, from response times to success rates, which helps in judging how well an AI assistant performs in real conversations.
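
To compare threads side by side, the per-turn result dicts can be rolled up into one summary per thread. This is a minimal sketch that assumes records shaped like the ones `analyze_conversation_quality` returns (a `success` flag, plus `response_time` on successful turns); `summarize_threads` is a hypothetical name.

```python
from typing import Any, Dict, List


def summarize_threads(
    threads: Dict[str, List[Dict[str, Any]]]
) -> Dict[str, Dict[str, float]]:
    """Compute turn count, success rate, and mean response time for each thread."""
    summary = {}
    for name, results in threads.items():
        successful = [r for r in results if r.get("success")]
        summary[name] = {
            "turns": len(results),
            # Guard against empty threads and threads with no successful turns.
            "success_rate": len(successful) / len(results) if results else 0.0,
            "avg_response_time": (
                sum(r["response_time"] for r in successful) / len(successful)
                if successful
                else 0.0
            ),
        }
    return summary
```

For example, `summarize_threads({"Advanced ML Ethics & Implementation": advanced_results})` would summarize the last experiment above.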