In [None]:
# Install the TruLens packages (all pinned to 2.1.2) and the Snowflake client libraries.
# NOTE(review): `tabulate` is the only package without a version pin — confirm that is intentional.
!pip install trulens==2.1.2 trulens-core==2.1.2 trulens-dashboard==2.1.2 trulens-feedback==2.1.2 trulens-otel-semconv==2.1.2 trulens_eval==2.1.2 trulens-connectors-snowflake==2.1.2 trulens-providers-cortex==2.1.2  
!pip install snowflake==1.7.0 snowflake-ml-python==1.14.0 tabulate

In [None]:
import os, json, math, textwrap, ast
import numpy as np
import pandas as pd
import streamlit as st
import json
import streamlit as st
import time
from functools import partial
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
#from trulens.apps.custom import instrument
from trulens.apps.app import instrument
from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback, SnowflakeFeedback, Select
from trulens.core.feedback import feedback as core_feedback
from trulens.apps.custom import TruCustomApp
from snowflake.core import Root


import hashlib
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col


from trulens.core import TruSession, Feedback, Select
from trulens.apps.basic import TruBasicApp
from trulens.providers.cortex import Cortex
# Flag read by TruLensCortexEvaluator to choose between provider-based scoring and the
# SQL fallback. It is hard-coded to True here; nothing in this notebook sets it to False.
TRULENS_AVAILABLE = True




In [None]:
# Fetch the currently active Snowpark session rather than opening a new connection.
session = get_active_session()
# snowflake.core entry point built on the same session; not referenced again in the visible cells.
root = Root(session)

print("✅ Snowpark session ready.")

In [None]:
-- Preview up to 10 rows of the pre-uploaded synthetic survey data.
SELECT *
FROM DEMO_DB.CRM.LULULEMON_PRODUCT_SURVEY
LIMIT 10;



In [None]:
-- Preview up to 10 rows of SURVEY_KEY_TOPICS, the source table for the keyword-extraction evaluation below.
select * from SURVEY_KEY_TOPICS limit 10;

In [None]:
# Display the flag value as the cell output.
TRULENS_AVAILABLE

In [None]:

class TruLensCortexEvaluator:
    """
    Scores Snowflake Cortex Complete prompt/output pairs with a TruLens feedback provider.

    This class provides methods to:
    1. Read prompt/output pairs from a Snowflake table
    2. Score each pair (relevance, sentiment, conciseness, language match) by
       calling the feedback provider's methods directly
    3. Cache scores in a Snowflake table keyed by MD5 hashes of prompt and output
    4. Summarise scores and list low-scoring records

    When the module-level ``TRULENS_AVAILABLE`` flag is false, or no feedback
    function could be set up, scoring falls back to a single
    SNOWFLAKE.CORTEX.COMPLETE call that is asked to return the scores as JSON.
    """

    # One entry per metric:
    # (score key, provider method, Feedback attribute, Feedback name,
    #  label used in warning messages, whether the prompt is passed with the output)
    _METRIC_SPECS = (
        ('relevance_score', 'relevance', 'f_answer_relevance', 'Answer Relevance', 'Relevance', True),
        ('sentiment_score', 'sentiment', 'f_sentiment', 'Sentiment', 'Sentiment', False),
        ('conciseness_score', 'conciseness', 'f_conciseness', 'Conciseness', 'Conciseness', False),
        ('language_match_score', 'language_match', 'f_language_match', 'Language Match', 'Language match', True),
    )

    # Score keys present in every result dictionary, in result-column order.
    METRIC_KEYS = (
        'relevance_score',
        'sentiment_score',
        'conciseness_score',
        'language_match_score',
    )

    # Cortex model used by the SQL fallback scorer.
    FALLBACK_MODEL = 'mistral-large'

    def __init__(
        self,
        session: "Session",
        provider: str = "cortex",
        model: str = "llama3.1-8b",
        cache_table: str = "TRULENS_EVALUATION_CACHE"
    ):
        """
        Initialize the evaluator.

        Args:
            session: Snowflake Snowpark session
            provider: Feedback provider name. Only 'cortex' is supported.
            model: Cortex model used by the feedback provider
                   (e.g. 'llama3.1-8b', 'mistral-large', 'mixtral-8x7b')
            cache_table: Table name for caching evaluation results

        Raises:
            ValueError: If TruLens is enabled and ``provider`` is not 'cortex',
                or the Cortex provider class is unavailable.
        """
        self.session = session
        self.provider_name = provider
        self.model = model
        self.cache_table = cache_table
        self._cache_table_checked = False

        # Defined up front so attribute access is safe on the fallback path as well.
        self.tru = None
        self.provider = None
        for spec in self._METRIC_SPECS:
            setattr(self, spec[2], None)

        if not TRULENS_AVAILABLE:
            print("TruLens not available. Using fallback Snowflake-native evaluation.")
            return

        # Validate before creating any TruLens state.
        if provider != "cortex":
            raise ValueError(
                f"Unsupported provider: {provider}. "
                f"Only 'cortex' is currently supported."
            )
        if Cortex is None:
            raise ValueError(
                "Cortex provider not available. "
                "Install with: pip install trulens-providers-cortex"
            )

        self.tru = TruSession()
        # Note: reset_database() is optional - only use if you want to clear previous data
        # self.tru.reset_database()

        # Use Snowflake Cortex as the evaluation provider.
        # NOTE(review): recent trulens-providers-cortex releases document a
        # `snowpark_session` argument; confirm that `snowflake_connection` is
        # accepted by the pinned version before relying on this call.
        self.provider = Cortex(
            snowflake_connection=self.session.connection,
            model_engine=model
        )
        print(f"✓ Using Snowflake Cortex provider with model: {model}")

        self._setup_feedback_functions()

    def _setup_feedback_functions(self):
        """Create a TruLens Feedback object for every metric the provider implements.

        Each ``f_*`` attribute is set to a Feedback, or to None when the
        provider has no method of that name.
        """
        available = []
        for _key, method_name, attr, display_name, _label, needs_prompt in self._METRIC_SPECS:
            if not hasattr(self.provider, method_name):
                setattr(self, attr, None)
                continue
            feedback = Feedback(getattr(self.provider, method_name), name=display_name)
            # Two-argument metrics see the prompt and the output; the rest only the output.
            feedback = feedback.on_input_output() if needs_prompt else feedback.on_output()
            setattr(self, attr, feedback)
            if feedback:
                available.append(method_name)
        print(f"✓ Available feedback functions: {', '.join(available)}")

    def _create_cache_table(self):
        """Create cache table if it doesn't exist."""
        create_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.cache_table} (
            prompt_hash STRING,
            output_hash STRING,
            prompt TEXT,
            output TEXT,
            relevance_score FLOAT,
            sentiment_score FLOAT,
            conciseness_score FLOAT,
            language_match_score FLOAT,
            evaluation_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            metadata VARIANT,
            CONSTRAINT unique_eval UNIQUE (prompt_hash, output_hash)
        )
        """
        self.session.sql(create_sql).collect()
        self._cache_table_checked = True

    def _ensure_cache_table(self):
        """Create the cache table once per instance, best effort.

        A failure is reported and not retried, so scoring continues without a cache.
        """
        if getattr(self, '_cache_table_checked', False):
            return
        self._cache_table_checked = True
        try:
            self._create_cache_table()
        except Exception as e:
            print(f"Warning: Could not create cache table {self.cache_table}: {e}")

    def _hash_text(self, text: str) -> str:
        """Return the MD5 hex digest of ``text``; used only as a cache key."""
        return hashlib.md5(text.encode()).hexdigest()

    def _check_cache(self, prompt: str, output: str) -> Optional[Dict[str, float]]:
        """Return cached scores for this prompt/output pair, or None on a miss or lookup error."""
        prompt_hash = self._hash_text(prompt)
        output_hash = self._hash_text(output)

        # The interpolated values are hex digests, so they cannot break out of the literal.
        lookup_sql = f"""
            SELECT
                relevance_score,
                sentiment_score,
                conciseness_score,
                language_match_score
            FROM {self.cache_table}
            WHERE prompt_hash = '{prompt_hash}'
            AND output_hash = '{output_hash}'
        """
        try:
            rows = self.session.sql(lookup_sql).collect()
        except Exception as e:
            # Best effort: a failed lookup is treated as a miss, but no longer silently.
            print(f"Warning: Cache lookup failed: {e}")
            return None

        if not rows:
            return None
        first = rows[0]
        return {key: first[key.upper()] for key in self.METRIC_KEYS}

    def _save_to_cache(
        self,
        prompt: str,
        output: str,
        scores: Dict[str, float],
        metadata: Optional[Dict] = None
    ):
        """Insert one evaluation into the cache table.

        All values are passed as bind parameters, so quotes, backslashes and
        JSON escapes in the prompt, output or metadata reach Snowflake unchanged.
        Failures are reported as a warning and never raised.
        """
        insert_sql = f"""
        INSERT INTO {self.cache_table} (
            prompt_hash,
            output_hash,
            prompt,
            output,
            relevance_score,
            sentiment_score,
            conciseness_score,
            language_match_score,
            metadata
        )
        SELECT ?, ?, ?, ?, ?, ?, ?, ?, PARSE_JSON(?)
        """
        params = [
            self._hash_text(prompt),
            self._hash_text(output),
            prompt,
            output,
        ]
        # Missing or None scores are stored as 0.0.
        params.extend(float(scores.get(key, 0) or 0) for key in self.METRIC_KEYS)
        params.append(json.dumps(metadata or {}))

        try:
            self.session.sql(insert_sql, params=params).collect()
        except Exception as e:
            print(f"Warning: Could not cache result: {e}")

    def evaluate_single(
        self,
        prompt: str,
        output: str,
        context: Optional[str] = None,
        use_cache: bool = True
    ) -> Dict[str, float]:
        """
        Evaluate a single prompt-output pair.

        Args:
            prompt: The input prompt
            output: The Cortex Complete output
            context: Optional context; only recorded in the cache metadata as a
                "context_provided" boolean
            use_cache: Whether to read from and write to the cache table

        Returns:
            Dictionary with the keys in ``METRIC_KEYS``. A metric the provider
            lacks, or whose call fails, is reported as 0.0.
        """
        if use_cache:
            self._ensure_cache_table()
            cached = self._check_cache(prompt, output)
            if cached:
                return cached

        if not TRULENS_AVAILABLE:
            return self._fallback_evaluation(prompt, output)

        has_feedback = any(
            getattr(self, spec[2], None) is not None for spec in self._METRIC_SPECS
        )
        if not has_feedback:
            print("Warning: No feedback functions available, using fallback")
            return self._fallback_evaluation(prompt, output)

        scores = dict.fromkeys(self.METRIC_KEYS, 0.0)
        available = []
        failed = []

        # Call provider methods directly instead of using TruBasicApp wrapper
        # This is more reliable and avoids the "No records found" error
        for key, method_name, _attr, _name, label, needs_prompt in self._METRIC_SPECS:
            if not hasattr(self.provider, method_name):
                continue
            available.append(method_name)
            call_args = (prompt, output) if needs_prompt else (output,)
            try:
                raw = getattr(self.provider, method_name)(*call_args)
                scores[key] = float(raw) if raw is not None else 0.0
            except Exception as e:
                print(f"Warning: {label} evaluation failed: {e}")
                failed.append(method_name)

        if use_cache:
            if failed:
                # A failed metric holds a 0.0 placeholder, not a real score; caching
                # it would make the failure permanent for this pair.
                print(f"Warning: Not caching result; failed metrics: {', '.join(failed)}")
            else:
                metadata = {
                    'provider': self.provider_name,
                    'model': self.model,
                    'evaluation_method': 'direct_provider_call',
                    'timestamp': datetime.now().isoformat(),
                    'available_metrics': available,
                    'context_provided': context is not None
                }
                self._save_to_cache(prompt, output, scores, metadata)

        return scores

    @staticmethod
    def _extract_json_object(text: Any) -> Dict[str, Any]:
        """Parse a JSON object from ``text``, tolerating surrounding prose or code fences."""
        text = str(text).strip()
        try:
            parsed = json.loads(text)
        except ValueError:
            start, end = text.find('{'), text.rfind('}')
            if start == -1 or end <= start:
                raise
            parsed = json.loads(text[start:end + 1])
        if not isinstance(parsed, dict):
            raise ValueError("evaluation response is not a JSON object")
        return parsed

    @staticmethod
    def _coerce_score(value: Any, default: float) -> float:
        """Convert ``value`` to float, returning ``default`` when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _fallback_evaluation(self, prompt: str, output: str) -> Dict[str, float]:
        """
        Score a pair with one SNOWFLAKE.CORTEX.COMPLETE call instead of TruLens.

        Returns 0.5 for any score that is missing or not numeric, and for all
        three scores when the call or the JSON parsing fails. Language match is
        not evaluated here and is always 0.0.
        """
        eval_prompt = f"""
        Evaluate the following prompt and response pair on these criteria (0.0 to 1.0):
        1. Relevance: How well does the response address the prompt?
        2. Sentiment: Is the tone appropriate?
        3. Conciseness: Is the response appropriately concise?
        
        Prompt: {prompt}
        Response: {output}
        
        Return ONLY a JSON object with keys: relevance_score, sentiment_score, conciseness_score
        Each value should be a float between 0.0 and 1.0.
        """

        scores = {
            'relevance_score': 0.5,
            'sentiment_score': 0.5,
            'conciseness_score': 0.5,
            'language_match_score': 0.0  # Not evaluated in fallback
        }

        try:
            # The prompt is bound as a parameter so quotes and backslashes in the
            # evaluated text cannot alter the statement.
            result = self.session.sql(
                f"SELECT SNOWFLAKE.CORTEX.COMPLETE('{self.FALLBACK_MODEL}', ?) AS evaluation",
                params=[eval_prompt]
            ).collect()
            eval_json = self._extract_json_object(result[0]['EVALUATION'])
        except Exception as e:
            print(f"Fallback evaluation failed: {e}")
            return scores

        for key in ('relevance_score', 'sentiment_score', 'conciseness_score'):
            scores[key] = self._coerce_score(eval_json.get(key, 0.5), 0.5)
        return scores

    def evaluate_batch(
        self,
        data: pd.DataFrame,
        prompt_column: str = 'prompt',
        output_column: str = 'output',
        id_column: str = 'id',
        batch_size: int = 100
    ) -> pd.DataFrame:
        """
        Evaluate a batch of prompt-output pairs, one row at a time.

        Args:
            data: DataFrame with prompts and outputs
            prompt_column: Name of prompt column
            output_column: Name of output column
            id_column: Name of ID column; the row's index label is used when
                the column is absent
            batch_size: Progress is printed every ``batch_size`` rows. Rows are
                not grouped for evaluation. Values below 1 disable the messages.

        Returns:
            DataFrame with the id, prompt, output and score columns. For empty
            input the frame has those columns and no rows.
        """
        total = len(data)
        results = []

        # Progress uses the row position, not the index label, which may be
        # non-numeric or non-sequential.
        for position, (idx, row) in enumerate(data.iterrows(), start=1):
            prompt = str(row[prompt_column])
            output = str(row[output_column])
            record_id = row[id_column] if id_column in row else idx

            scores = self.evaluate_single(prompt, output)

            results.append({
                id_column: record_id,
                prompt_column: prompt,
                output_column: output,
                **scores
            })

            if batch_size and batch_size > 0 and position % batch_size == 0:
                print(f"Processed {position}/{total} records")

        if not results:
            columns = list(dict.fromkeys(
                [id_column, prompt_column, output_column, *self.METRIC_KEYS]
            ))
            return pd.DataFrame(columns=columns)
        return pd.DataFrame(results)

    @staticmethod
    def _resolve_column(frame: pd.DataFrame, name: str) -> str:
        """Return the column of ``frame`` matching ``name``, ignoring case when unambiguous.

        Falls back to ``name`` itself when there is no single case-insensitive match.
        """
        if name in frame.columns:
            return name
        folded = str(name).upper()
        matches = [label for label in frame.columns if str(label).upper() == folded]
        return matches[0] if len(matches) == 1 else name

    def evaluate_snowflake_table(
        self,
        source_table: str,
        prompt_column: str,
        output_column: str,
        result_table: str,
        id_column: str = 'id',
        sample_size: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Evaluate Cortex Complete results from a Snowflake table.

        Table and column names are interpolated into the query as identifiers,
        so they must come from trusted configuration.

        Args:
            source_table: Source table name
            prompt_column: Column containing prompts
            output_column: Column containing Cortex outputs
            result_table: Table to write results to (overwritten)
            id_column: ID column name
            sample_size: Optional row limit (for testing)

        Returns:
            DataFrame with evaluation results
        """
        self._create_cache_table()

        query = f"""
        SELECT 
            {id_column},
            {prompt_column},
            {output_column}
        FROM {source_table}
        """

        if sample_size:
            query += f" LIMIT {int(sample_size)}"

        print(f"Querying {source_table}...")
        df = self.session.sql(query).to_pandas()
        print(f"Retrieved {len(df)} records")

        # The returned frame may spell column names differently from the
        # arguments (e.g. upper-cased), so match them case-insensitively.
        print("Evaluating with TruLens...")
        results = self.evaluate_batch(
            df,
            prompt_column=self._resolve_column(df, prompt_column),
            output_column=self._resolve_column(df, output_column),
            id_column=self._resolve_column(df, id_column)
        )

        # NOTE(review): the score columns are lower-case; if write_pandas quotes
        # identifiers they become case-sensitive in the result table — confirm.
        print(f"Writing results to {result_table}...")
        self.session.write_pandas(
            results,
            table_name=result_table,
            auto_create_table=True,
            overwrite=True
        )

        return results

    def generate_report(self, results: pd.DataFrame, threshold: float = 0.7) -> Dict[str, Any]:
        """
        Generate summary report from evaluation results.

        Args:
            results: DataFrame with evaluation scores
            threshold: Scores strictly below this value count as below threshold

        Returns:
            Dictionary with the record count, a timestamp, the threshold and,
            for each score column present, mean/median/std/min/max plus the
            below-threshold count and percentage (0.0 for empty input).
        """
        total = len(results)
        report = {
            'total_records': total,
            'timestamp': datetime.now().isoformat(),
            'threshold': threshold,
            'metrics': {}
        }

        for metric in self.METRIC_KEYS:
            if metric not in results.columns:
                continue
            # Non-numeric entries (e.g. None) become NaN and are ignored by the statistics.
            values = pd.to_numeric(results[metric], errors='coerce')
            below = int((values < threshold).sum())
            report['metrics'][metric] = {
                'mean': float(values.mean()),
                'median': float(values.median()),
                'std': float(values.std()),
                'min': float(values.min()),
                'max': float(values.max()),
                'below_threshold_count': below,
                'below_threshold_pct': float(below / total * 100) if total else 0.0
            }

        return report

    def get_low_quality_records(
        self,
        results: pd.DataFrame,
        threshold: float = 0.7,
        metric: str = 'relevance_score'
    ) -> pd.DataFrame:
        """
        Get records with low quality scores.

        Args:
            results: DataFrame with evaluation scores
            threshold: Minimum acceptable score
            metric: Metric to filter on

        Returns:
            Rows whose ``metric`` is strictly below ``threshold``, lowest first
        """
        below = results[metric] < threshold
        return results[below].sort_values(metric)


In [None]:
# Build the evaluator backed by the Snowflake Cortex feedback provider.
# "cortex" is the only provider name TruLensCortexEvaluator accepts; any other
# value raises ValueError in its constructor.
evaluator = TruLensCortexEvaluator(
    session=session,
    provider="cortex",  # Use Snowflake Cortex for evaluation
    model="claude-4-sonnet"  # or "mistral-large", "mixtral-8x7b", etc.
)

# Example 1: score one hand-written prompt/output pair and print the score dictionary.
print("\n=== Example 1: Single Evaluation ===")
prompt = "What is the return policy for online orders?"
output = "Our return policy allows returns within 30 days of purchase with original receipt."

scores = evaluator.evaluate_single(prompt, output)
print(f"Scores: {scores}")

In [None]:
-- Build a scratch table of up to 10 surveys, pairing each review's keyword-extraction
-- prompt text (PROMPT_GIVEN) with the keywords stored for it (KEYWORDS).
CREATE OR REPLACE TRANSIENT TABLE TEMP_SURVEY_KEY_TOPICS_PRMPT AS
SELECT
    survey_id,
    -- Fixed instruction text followed by the review itself.
    'Extract 1-6 concise product aspects from this retail review.\n'
        || 'Rules: short phrases (2-4 words), copy from text if possible, no sentiment words, no duplicates.\n'
        || 'Example: Great quality-husband loves them. Really comfy and true to size. -> ["quality", "comfy", "size"]\n'
        || 'Return ONLY a JSON array of strings.\n\n'
        || REVIEW_TEXT AS PROMPT_GIVEN,
    KEYWORDS::string AS KEYWORDS
FROM SURVEY_KEY_TOPICS
LIMIT 10
;

In [None]:
# Score the keyword-extraction prompt/output pairs held in the scratch table
# and write the scores to QA_SCORING_WITH_TRULENS.
print("\n=== Evaluate Snowflake Table ===")
# Source: a table holding Cortex Complete prompts and their results.
results = evaluator.evaluate_snowflake_table(
    source_table='TEMP_SURVEY_KEY_TOPICS_PRMPT',
    prompt_column='PROMPT_GIVEN',
    output_column='KEYWORDS',
    result_table='QA_SCORING_WITH_TRULENS',
    id_column='SURVEY_ID',
    sample_size=10  # Start with sample
)



In [None]:
# Display the scored keyword-extraction results returned by the previous cell.
results

In [None]:
-- Remove the scratch prompt table. IF EXISTS keeps this cell re-runnable after the table is already gone.
DROP TABLE IF EXISTS TEMP_SURVEY_KEY_TOPICS_PRMPT;

In [None]:
-- Inspect every row of the evaluator's cache table (the default cache_table name).
select * from TRULENS_EVALUATION_CACHE;

In [None]:
-- Preview up to 10 rows of SENTIMENT_OUTPUT, the source table for the sentiment evaluation below.
SELECT * FROM SENTIMENT_OUTPUT limit 10;

In [None]:
-- Build a scratch table of up to 10 surveys, pairing each review's sentiment prompt text
-- (PROMPT_GIVEN) with the sentiment result stored for it (SENTIMENT_OUTPUT).
-- NOTE(review): the prompt literal contains typos and ends the label section with
-- '<label>' rather than '</label>'. It is reproduced unchanged here; confirm whether
-- it is meant to mirror the prompt originally sent to the model.
CREATE OR REPLACE TRANSIENT TABLE TEMP_SENTIMENT_OUTPUT_PRMPT AS
SELECT
    survey_id,
    'provide overall sentiment and sentiment for the given review for classifcation labels deined in the lavel <review>'
        || review_text
        || '</review> <label>'
        || cluster_labels::string
        || '<label> . provide the output as JSON' AS PROMPT_GIVEN,
    SENTIMENT::string AS SENTIMENT_OUTPUT
FROM SENTIMENT_OUTPUT
LIMIT 10
;

In [None]:
# Score the sentiment prompt/output pairs held in the scratch table and write the
# scores to SENTIMENT_OUTPUT_SCORING_WITH_TRULENS. This rebinds `results`.
print("\n=== Evaluate Snowflake Table ===")
# Source: a table holding Cortex Complete prompts and their results.
results = evaluator.evaluate_snowflake_table(
    source_table='TEMP_SENTIMENT_OUTPUT_PRMPT',
    prompt_column='PROMPT_GIVEN',
    output_column='SENTIMENT_OUTPUT',
    result_table='SENTIMENT_OUTPUT_SCORING_WITH_TRULENS',
    id_column='SURVEY_ID',
    sample_size=10  # Start with sample
)



In [None]:
# Display the scored sentiment results returned by the previous cell.
results

In [None]:
-- Preview up to 10 rows of the result table written by the sentiment evaluation.
select * from SENTIMENT_OUTPUT_SCORING_WITH_TRULENS limit 10;

In [None]:
-- Remove the scratch prompt table. IF EXISTS keeps this cell re-runnable after the table is already gone.
DROP TABLE IF EXISTS TEMP_SENTIMENT_OUTPUT_PRMPT;