In [1]:
import os
import openai
from trulens.core import Feedback, Select, TruSession
from trulens.providers.openai import OpenAI as TruOpenAI
import numpy as np
import pandas as pd
from typing import List, Dict, Any
import warnings
warnings.filterwarnings('ignore')


In [2]:
try:
    from trulens.apps.app import TruApp, instrument
    print("✅ Successfully imported TruApp and instrument!")
    TruCustomApp = TruApp
except ImportError as e:
    print(f"⚠️ TruApp import failed: {e}")
    try:
        from trulens.apps.basic import TruBasicApp
        from trulens.apps.app import instrument
        print("✅ Using TruBasicApp as fallback")
        TruCustomApp = TruBasicApp
    except ImportError as e2:
        print(f"⚠️ Both TruApp and TruBasicApp failed: {e2}")
        print("Using manual approach")
        TruCustomApp = None
        def instrument(func):
            return func



✅ Successfully imported TruApp and instrument!


In [3]:
tru = TruSession()

🦑 Initialized with db url sqlite:///default.sqlite .
🛑 Secret keys may be written to the database. See the `database_redact_keys` option of `TruSession` to prevent this.


In [4]:
if hasattr(openai, 'api_key'):
    openai.api_key = os.getenv("OPENAI_API_KEY")


In [5]:
knowledge_base = [
    {
        "id": 1,
        "content": "Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed.",
        "topic": "machine_learning"
    },
    {
        "id": 2,
        "content": "Deep learning uses artificial neural networks with multiple layers to model and understand complex patterns in data.",
        "topic": "deep_learning"
    },
    {
        "id": 3,
        "content": "Natural language processing (NLP) is a branch of AI that helps computers understand, interpret and manipulate human language.",
        "topic": "nlp"
    },
    {
        "id": 4,
        "content": "Computer vision enables machines to interpret and make decisions based on visual information from the world.",
        "topic": "computer_vision"
    },
    {
        "id": 5,
        "content": "Reinforcement learning is a type of machine learning where an agent learns to make decisions by taking actions in an environment to maximize reward.",
        "topic": "reinforcement_learning"
    }
]


class SimpleRAGApp:
    
    def __init__(self, knowledge_base: List[Dict]):
        self.knowledge_base = knowledge_base
        
    @instrument
    def retrieve_context(self, query: str, top_k: int = 2) -> List[str]:
        query_lower = query.lower()
        scored_docs = []
        
        for doc in self.knowledge_base:
            content_words = set(doc['content'].lower().split())
            query_words = set(query_lower.split())
            overlap = len(content_words.intersection(query_words))
            
            if overlap > 0:
                scored_docs.append((doc['content'], overlap))
        
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        return [doc[0] for doc in scored_docs[:top_k]]
    
    @instrument
    def generate_answer(self, query: str, context: List[str]) -> str:
        context_text = "\n\n".join(context)
        
        prompt = f"""Based on the following context, please answer the question.
        
Context:
{context_text}

Question: {query}

Answer: """
        
        try:
            response = openai.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful AI assistant. Answer questions based on the provided context."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=200,
                temperature=0.1
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            return f"Error generating response: {str(e)}"
    
    @instrument
    def query(self, question: str) -> str:
        context = self.retrieve_context(question)
        
        answer = self.generate_answer(question, context)
        
        return answer



In [6]:
rag_app = SimpleRAGApp(knowledge_base)


In [7]:
test_question = "What is machine learning?"
result = rag_app.query(test_question)

print(f"Question: {test_question}")
print(f"Generated Answer: {result}")

Question: What is machine learning?
Generated Answer: Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed.


In [8]:
provider = TruOpenAI()

f_groundedness = (
    Feedback(
        provider.groundedness_measure_with_cot_reasons, 
        name="Groundedness"
    )
    .on(Select.RecordCalls.retrieve_context.rets.collect()) #context
    .on_output() # actual output
)

f_answer_relevance = (
    Feedback(
        provider.relevance_with_cot_reasons, 
        name="Answer Relevance"
    )
    .on_input() #question
    .on_output() #AI response
)

f_context_relevance = (
    Feedback(
        provider.context_relevance_with_cot_reasons, 
        name="Context Relevance"
    )
    .on_input() #question
    .on(Select.RecordCalls.retrieve_context.rets.collect()) #context
)


✅ In Groundedness, input source will be set to __record__.app.retrieve_context.rets.collect() .
✅ In Groundedness, input statement will be set to __record__.main_output or `Select.RecordOutput` .
✅ In Answer Relevance, input prompt will be set to __record__.main_input or `Select.RecordInput` .
✅ In Answer Relevance, input response will be set to __record__.main_output or `Select.RecordOutput` .
✅ In Context Relevance, input question will be set to __record__.main_input or `Select.RecordInput` .
✅ In Context Relevance, input context will be set to __record__.app.retrieve_context.rets.collect() .


In [9]:
def completeness_feedback(input_text: str, output_text: str) -> float:
    """
    measures if answer addressses question
    """
    question_words = set(input_text.lower().split())
    answer_words = set(output_text.lower().split())
    
    # Remove common stop words
    stop_words = {'what', 'is', 'the', 'how', 'why', 'when', 'where', 'a', 'an', 'and', 'or', 'but'}
    question_words = question_words - stop_words
    
    if not question_words:
        return 0.5
    
    overlap = len(question_words.intersection(answer_words))
    return min(1.0, overlap / len(question_words))

f_completeness = Feedback(completeness_feedback, name="Completeness").on_input().on_output()


Feedback implementation <function completeness_feedback at 0x172d0db20> cannot be serialized: Module __main__ is not importable. This may be ok unless you are using the deferred feedback mode.


✅ In Completeness, input input_text will be set to __record__.main_input or `Select.RecordInput` .
✅ In Completeness, input output_text will be set to __record__.main_output or `Select.RecordOutput` .


In [None]:
if TruCustomApp is not None:
    try:
        tru_rag = TruCustomApp(
            rag_app,
            app_name='SimpleRAG_Demo', 
            app_version='2025_v1',
            feedbacks=[
                f_groundedness,
                f_answer_relevance,
                f_context_relevance,
                f_completeness
            ]
        )
        print("🔗 TruApp wrapper created successfully!")
        print(f"   App name: SimpleRAG_Demo")
        print(f"   App version: 2025_v1")
        print(f"   Feedbacks: {len([f_groundedness, f_answer_relevance, f_context_relevance, f_completeness])}")
        
    except Exception as e:
        print(f"⚠️ TruApp creation failed: {e}")
        print("Falling back to manual approach")
        TruCustomApp = None

if TruCustomApp is None:
    print("⚠️ Using manual instrumentation approach")
    
    class SimpleRAGWrapper:
        def __init__(self, app):
            self.app = app
            self.app_name = 'SimpleRAG_Demo'
            self.app_version = '2025_v1'
        
        def query(self, question: str):
            return self.app.query(question)
    
    tru_rag = SimpleRAGWrapper(rag_app)
    print("🔗 Simple wrapper created!")

print("📊 Ready to run evaluations!")


instrumenting <class '__main__.SimpleRAGApp'> for base <class '__main__.SimpleRAGApp'>
	instrumenting retrieve_context
	instrumenting generate_answer
	instrumenting query
🔗 TruApp wrapper created successfully!
   App name: SimpleRAG_Demo
   App version: 2025_v1
   Feedbacks: 4
📊 Ready to run evaluations!


In [11]:
test_questions = [
    "What is machine learning?",
    "How does deep learning work?", 
    "What is the difference between machine learning and deep learning?",
    "Can you explain natural language processing?",
    "What are the applications of computer vision?",
    "How does reinforcement learning work?",
    "What is artificial intelligence?",
]
results = []


In [None]:
if hasattr(tru_rag, 'app_name') and hasattr(tru_rag, 'feedbacks'):
    print("🚀 Using full TruApp integration!")
    
    for question in test_questions:
        print(f"\n📝 Processing: {question}")
        
        # Automatically trigger TruLens evaluation
        try:
            with tru_rag as recording:
                if hasattr(tru_rag, 'app'):
                    answer = tru_rag.app.query(question)
                else:
                    # For TruBasicApp, the app itself is callable
                    answer = tru_rag(question)
                    
        except Exception as e:
            print(f"⚠️ TruApp context failed: {e}")
            # Fallback to direct query
            try:
                if hasattr(tru_rag, 'app'):
                    answer = tru_rag.app.query(question)
                else:
                    answer = tru_rag(question)
            except:
                answer = rag_app.query(question)  # Ultimate fallback
            
        print(f"✅ Answer: {answer[:100]}...")
        results.append({
            'question': question,
            'answer': answer
        })
        
else:

    print("Manual")
    

🚀 Using full TruApp integration!

📝 Processing: What is machine learning?
✅ Answer: Machine learning is a subset of artificial intelligence that enables computers to learn and improve ...

📝 Processing: How does deep learning work?
✅ Answer: Deep learning works by using artificial neural networks with multiple layers to model and understand...

📝 Processing: What is the difference between machine learning and deep learning?
✅ Answer: The main difference between machine learning and deep learning is that deep learning is a subset of ...

📝 Processing: Can you explain natural language processing?
✅ Answer: Natural language processing (NLP) is a branch of artificial intelligence (AI) that focuses on enabli...

📝 Processing: What are the applications of computer vision?
✅ Answer: Some applications of computer vision include facial recognition, object detection, image classificat...

📝 Processing: How does reinforcement learning work?
✅ Answer: Reinforcement learning works by having an agen

In [16]:
try:
    # Try multiple API parameter variations to find the right one
    try:
        # Try without parameters first (gets all records)
        records, feedback_results = tru.get_records_and_feedback()
        print(f"✅ Retrieved {len(records)} records from TruLens database (all apps)")
    except:
        # Try with app_id parameter (most common)
        records, feedback_results = tru.get_records_and_feedback(app_ids=['SimpleRAG_Demo'])
        print(f"✅ Retrieved {len(records)} records using app_ids parameter")
    
    has_trulens_records = True
    
    # *** EXPLICIT CHANGE: Fixed filtering logic to find correct app records ***
    # Filter records if we got more than expected
    if hasattr(tru_rag, 'app_name') and len(records) > 7:
        print(f"Found {len(records)} total records, looking for SimpleRAG_Demo records...")
        
        # Check what app names/IDs are actually in the database
        if 'app_name' in records.columns:
            unique_apps = records['app_name'].unique()
            print(f"Apps in database by app_name: {unique_apps}")
            # Filter by app_name column
            records = records[records['app_name'] == 'SimpleRAG_Demo']
            
        elif 'app_json' in records.columns:
            print("Checking app_json column for app identifiers...")
            # Look at a few app_json entries to see the structure
            sample_apps = records['app_json'].head(3)
            for i, app_json in enumerate(sample_apps):
                print(f"Sample app_json {i+1}: {str(app_json)[:200]}...")
            
            # Try broader filtering - look for any of our possible names
            possible_names = ['SimpleRAG_Demo', 'SimpleRAG', 'RAG']
            mask = pd.Series([False] * len(records))
            
            for name in possible_names:
                mask = mask | records['app_json'].str.contains(name, na=False)
            
            records = records[mask]
            
        elif 'record_id' in records.columns:
            # If no app filtering available, just take the most recent 7 records
            print("No app name column found, taking most recent 7 records")
            records = records.tail(7)
        
        print(f"After filtering: {len(records)} records found")
        
        # If filtering resulted in 0 records, don't filter - use all records
        if len(records) == 0:
            print("Filtering resulted in 0 records, using all records instead")
            records, _ = tru.get_records_and_feedback()  # Get all records again
            print(f"EXPLICIT DEBUG: After fallback, got {len(records)} records")
            print(f"EXPLICIT DEBUG: has_trulens_records = {has_trulens_records}")
            print(f"EXPLICIT DEBUG: records.empty = {records.empty}")
    
    # EXPLICIT DEBUG: Check final state before proceeding to results
    print(f"EXPLICIT DEBUG FINAL: About to process {len(records)} records")
    print(f"EXPLICIT DEBUG FINAL: has_trulens_records = {has_trulens_records}")
    print(f"EXPLICIT DEBUG FINAL: records.empty = {records.empty}")
    
except Exception as e:
    print(f"⚠️ Could not get TruLens records: {e}")
    print("Let's check what methods are available on the TruSession object")
    available_methods = [method for method in dir(tru) if 'record' in method.lower()]
    print(f"Available record methods: {available_methods}")
    
    records = pd.DataFrame()  # Empty dataframe
    feedback_results = pd.DataFrame()  
    has_trulens_records = False

print("📈 Evaluation Results Summary:")
print("=" * 50)

if has_trulens_records and not records.empty:
    # Display TruLens results
    print(f"Total TruLens evaluations: {len(records)}")
    
    # Look for actual feedback score columns (not just cost columns)
    feedback_cols = []
    cost_cols = []
    
    for col in records.columns:
        if 'feedback' in col.lower():
            if 'cost' in col.lower():
                cost_cols.append(col)
            else:
                feedback_cols.append(col)
    
    print(f"Feedback score columns found: {feedback_cols}")
    print(f"Cost columns found: {cost_cols}")
    
    # Check if we have actual scores
    if feedback_cols:
        print("\n📊 Feedback Scores:")
        for col in feedback_cols:
            if col in records.columns and records[col].notna().any():
                avg_score = records[col].mean()
                print(f"{col}: {avg_score:.3f}")
    else:
        print("\n⚠️ No feedback scores found, only cost information.")
        print("This suggests the feedback functions didn't complete evaluation.")
        
    # Check what's actually in the records
    print(f"\n🔍 Record columns: {list(records.columns)}")
    
    # *** EXPLICIT CHANGE: Added manual feedback evaluation check ***
    # Let's also try running one feedback function manually to test OpenAI connectivity
    if not feedback_cols:
        print("\n6. Testing OpenAI connectivity with manual feedback evaluation...")
        try:
            # Get a sample question and answer from our results
            test_question = results[0]['question'] if results else "What is machine learning?"
            test_answer = results[0]['answer'] if results else "Machine learning is AI."
            
            # Try running our completeness feedback function
            manual_score = completeness_feedback(test_question, test_answer)
            print(f"   Manual completeness score: {manual_score:.3f} (This works)")
            
            # Try running an OpenAI-based feedback function manually
            if 'provider' in locals():
                print("   Testing OpenAI-based feedback...")
                try:
                    # Simple relevance test
                    relevance_score = provider.relevance(test_question, test_answer)
                    print(f"   Manual relevance score: {relevance_score:.3f} (OpenAI working)")
                except Exception as openai_error:
                    print(f"   OpenAI feedback failed: {openai_error}")
                    print("   This suggests an issue with the OpenAI provider setup")
            
        except Exception as manual_error:
            print(f"   Manual evaluation failed: {manual_error}")

    # Show a sample record structure
    if len(records) > 0:
        print(f"\n📋 Sample record structure:")
        sample_record = records.iloc[0]
        for col in sample_record.index:
            if pd.notna(sample_record[col]) and col not in ['app_json', 'record_json']:
                val = str(sample_record[col])[:100] + "..." if len(str(sample_record[col])) > 100 else sample_record[col]
                print(f"  {col}: {val}")
    
    # *** EXPLICIT CHANGE: Added detailed debugging section ***
    # Check if feedback functions are running
    print(f"\n🔬 Diagnosis:")
    if not feedback_cols:
        print("- Issue: Feedback functions aren't producing scores")
        print("- Status: Functions are being invoked (costs tracked) but not completing")
        
        # Check if evaluations are pending/deferred
        print("\nDebugging steps:")
        print("1. Checking if evaluations are in deferred mode...")
        
        # Check the TruSession for any pending evaluations
        try:
            # Try to run any deferred feedback evaluations
            print("2. Attempting to run deferred evaluations...")
            tru.start_evaluator()  # This starts the evaluation process
            import time
            time.sleep(5)  # Wait a bit for evaluations to complete
            
            # Try to get records again
            records_retry, feedback_retry = tru.get_records_and_feedback()
            
            # Check for score columns again
            feedback_cols_retry = [col for col in records_retry.columns if 'feedback' in col.lower() and 'cost' not in col.lower()]
            
            if feedback_cols_retry:
                print("✅ Deferred evaluations completed!")
                records = records_retry  # Update records
                feedback_cols = feedback_cols_retry  # Update feedback columns
            else:
                print("❌ Deferred evaluations still not producing scores")
                
        except Exception as eval_error:
            print(f"3. Evaluation attempt failed: {eval_error}")
            
        # Additional diagnostic: Check feedback mode
        if hasattr(tru_rag, 'feedback_mode'):
            print(f"4. Feedback mode: {tru_rag.feedback_mode}")
        
        print("5. Possible causes:")
        print("   - OpenAI API rate limits or permissions")
        print("   - Network connectivity issues")
        print("   - Feedback functions timing out")
        print("   - Evaluation running in background (deferred mode)")
        
    else:
        print("- Status: Feedback functions are working correctly!")
        
    print("\n📊 Detailed Results:")
    print("-" * 30)
    
    # Show results for each question (limit to first 3 for readability)
    for idx, row in records.head(3).iterrows():
        print(f"\nQuestion {idx + 1}:")
        if 'input' in row:
            print(f"  Input: {str(row['input'])[:80]}...")
        if 'output' in row:
            print(f"  Output: {str(row['output'])[:80]}...")
        
        # Show actual feedback scores if they exist
        for col in feedback_cols:
            if col in row and pd.notna(row[col]):
                print(f"  {col}: {row[col]:.3f}")
        
        # If no scores, show what we have
        if not feedback_cols:
            print("  Status: Feedback scores not available (see diagnosis above)")
            
else:
    # Show basic results without full TruLens integration  
    print("📊 Basic Evaluation Results:")
    print(f"Total questions processed: {len(results)}")
    print("\nSample questions and answers:")
    
    for i, result in enumerate(results[:3]):  # Show first 3
        print(f"\n{i+1}. Question: {result['question']}")
        print(f"   Answer: {result['answer'][:100]}...")
        
        # If we ran manual evaluations, show those results
        if 'feedback_results' in locals():
            print(f"   Manual evaluation available")
    
    print(f"\n💡 To get full TruLens integration:")
    print("1. Ensure your OpenAI API key is set correctly")
    print("2. Check that all TruLens packages are installed") 
    print("3. Try running the notebook again")
    
    # Create a simple summary
    print(f"\n📋 Summary:")
    print(f"  Questions processed: {len(results)}")
    print(f"  All questions received responses: {'✅' if all('answer' in r for r in results) else '❌'}")
    print(f"  Average answer length: {np.mean([len(r['answer']) for r in results]):.0f} characters")

## Advanced Analysis Examples

def analyze_performance_trends():
    """Analyze performance trends across different question types"""
    if records.empty:
        print("No data available for trend analysis")
        return
    
    print("\n🔍 Performance Analysis:")
    print("=" * 40)
    
    # Group by question type (simple keyword analysis)
    question_types = []
    for question in test_questions[:len(records)]:  # Match length with records
        if 'what is' in question.lower():
            question_types.append('Definition')
        elif 'how does' in question.lower():
            question_types.append('Explanation')
        elif 'difference' in question.lower():
            question_types.append('Comparison')
        else:
            question_types.append('Other')
    
    # Add question types to results
    if len(question_types) == len(records):
        records['question_type'] = question_types
        
        # Analyze by question type
        feedback_metrics = [col for col in records.columns if 'feedback' in col.lower()]
        
        for qtype in set(question_types):
            subset = records[records['question_type'] == qtype]
            if not subset.empty:
                print(f"\n{qtype} Questions (n={len(subset)}):")
                
                for metric in feedback_metrics:
                    if metric in subset.columns and subset[metric].notna().any():
                        avg_score = subset[metric].mean()
                        print(f"  {metric}: {avg_score:.3f}")

analyze_performance_trends()


✅ Retrieved 14 records from TruLens database (all apps)
Found 14 total records, looking for SimpleRAG_Demo records...
Apps in database by app_name: ['SimpleRAG_Demo']
After filtering: 14 records found
EXPLICIT DEBUG FINAL: About to process 14 records
EXPLICIT DEBUG FINAL: has_trulens_records = True
EXPLICIT DEBUG FINAL: records.empty = False
📈 Evaluation Results Summary:
Total TruLens evaluations: 14
Feedback score columns found: []
Cost columns found: ['Groundedness feedback cost in USD', 'Answer Relevance feedback cost in USD', 'Context Relevance feedback cost in USD', 'Completeness feedback cost in USD']

⚠️ No feedback scores found, only cost information.
This suggests the feedback functions didn't complete evaluation.

🔍 Record columns: ['app_name', 'app_version', 'app_id', 'app_json', 'type', 'record_id', 'input', 'output', 'tags', 'record_json', 'cost_json', 'perf_json', 'ts', 'Groundedness', 'Answer Relevance', 'Context Relevance', 'Completeness', 'Groundedness_calls', 'Answer 

In [37]:
def analyze_performance_trends():
    """Analyze performance trends across different question types"""
    if records.empty:
        print("No data available for trend analysis")
        return
    
    print("\n🔍 Performance Analysis:")
    print("=" * 40)
    
    # Group by question type (simple keyword analysis)
    question_types = []
    for question in test_questions[:len(records)]:  # Match length with records
        if 'what is' in question.lower():
            question_types.append('Definition')
        elif 'how does' in question.lower():
            question_types.append('Explanation')
        elif 'difference' in question.lower():
            question_types.append('Comparison')
        else:
            question_types.append('Other')
    
    # Add question types to results
    if len(question_types) == len(records):
        records['question_type'] = question_types
        
        # Analyze by question type
        feedback_metrics = [col for col in records.columns if 'feedback' in col.lower()]
        
        for qtype in set(question_types):
            subset = records[records['question_type'] == qtype]
            if not subset.empty:
                print(f"\n{qtype} Questions (n={len(subset)}):")
                
                for metric in feedback_metrics:
                    if metric in subset.columns and subset[metric].notna().any():
                        avg_score = subset[metric].mean()
                        print(f"  {metric}: {avg_score:.3f}")

analyze_performance_trends()


🔍 Performance Analysis:

Explanation Questions (n=2):
  Groundedness feedback cost in USD: 0.000
  Answer Relevance feedback cost in USD: 0.000
  Context Relevance feedback cost in USD: 0.000
  Completeness feedback cost in USD: 0.000

Definition Questions (n=3):
  Groundedness feedback cost in USD: 0.000
  Answer Relevance feedback cost in USD: 0.000
  Context Relevance feedback cost in USD: 0.000
  Completeness feedback cost in USD: 0.000

Other Questions (n=2):
  Groundedness feedback cost in USD: 0.000
  Answer Relevance feedback cost in USD: 0.000
  Context Relevance feedback cost in USD: 0.000
  Completeness feedback cost in USD: 0.000


In [17]:
import os
print("OpenAI API Key set:", "OPENAI_API_KEY" in os.environ)
print("Key preview:", os.getenv("OPENAI_API_KEY", "NOT_SET")[:10] + "..." if os.getenv("OPENAI_API_KEY") else "NOT_SET")


OpenAI API Key set: True
Key preview: sk-proj-sb...
