In [None]:
!pip install nltk google-generativeai dotenv

In [None]:
import json
import re
from typing import Dict, List, Tuple, Any, Optional
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
import string
import google.generativeai as genai
import os
from datetime import datetime
import time
from dotenv import load_dotenv

load_dotenv()

In [None]:
# Download required NLTK data.
# Each resource is checked on its own so that only the missing ones are fetched.
# Keys are the downloader package ids; values are the paths nltk.data.find() expects.
_NLTK_RESOURCES = {
    'punkt': 'tokenizers/punkt',
    # Newer NLTK releases load the tokenizer tables from punkt_tab instead of punkt.
    'punkt_tab': 'tokenizers/punkt_tab',
    'stopwords': 'corpora/stopwords',
    # The VADER lexicon is stored (and read) as a zip under sentiment/.
    'vader_lexicon': 'sentiment/vader_lexicon.zip',
}

for _package, _resource_path in _NLTK_RESOURCES.items():
    try:
        nltk.data.find(_resource_path)
    except LookupError:
        nltk.download(_package)


In [None]:
class AdvancedSentimentAnalyzer:
    def __init__(self, gemini_api_key: Optional[str] = None):
        self.sia = SentimentIntensityAnalyzer()
        self.stop_words = set(stopwords.words('english'))
        
        # Initialize Gemini API
        self.gemini_api_key = gemini_api_key or os.getenv('GEMINI_API_KEY')
        self.gemini_model = None
        self.gemini_enabled = False
        
        if self.gemini_api_key:
            try:
                genai.configure(api_key=self.gemini_api_key)
                self.gemini_model = genai.GenerativeModel('gemini-flash-1.5')
                self.gemini_enabled = True
                print("✅ Gemini API initialized successfully")
            except Exception as e:
                print(f"⚠️ Gemini API initialization failed: {str(e)}")
                self.gemini_enabled = False
        else:
            print("⚠️ No Gemini API key provided. Advanced LLM analysis disabled.")
        
        # Advanced emotion lexicon
        self.emotion_lexicon = {
            'happiness': {
                'words': ['happy', 'joy', 'pleased', 'delighted', 'satisfied', 'glad', 'cheerful', 
                         'excited', 'thrilled', 'elated', 'content', 'grateful', 'thankful', 'appreciate',
                         'wonderful', 'fantastic', 'excellent', 'great', 'amazing', 'awesome', 'perfect'],
                'intensifiers': ['very', 'extremely', 'incredibly', 'absolutely', 'truly', 'really']
            },
            'sadness': {
                'words': ['sad', 'disappointed', 'unhappy', 'depressed', 'miserable', 'sorrowful',
                         'heartbroken', 'devastated', 'down', 'blue', 'gloomy', 'melancholy'],
                'intensifiers': ['very', 'deeply', 'extremely', 'utterly', 'completely']
            },
            'anger': {
                'words': ['angry', 'furious', 'mad', 'irritated', 'annoyed', 'frustrated', 'outraged',
                         'livid', 'enraged', 'incensed', 'irate', 'disgusted', 'hate', 'despise'],
                'intensifiers': ['extremely', 'absolutely', 'totally', 'completely', 'utterly']
            },
            'fear': {
                'words': ['afraid', 'scared', 'worried', 'anxious', 'nervous', 'concerned', 'terrified',
                         'frightened', 'alarmed', 'panicked', 'distressed', 'uneasy'],
                'intensifiers': ['very', 'extremely', 'deeply', 'seriously', 'highly']
            },
            'surprise': {
                'words': ['surprised', 'shocked', 'amazed', 'astonished', 'stunned', 'bewildered',
                         'startled', 'unexpected', 'sudden', 'wow', 'incredible', 'unbelievable'],
                'intensifiers': ['totally', 'completely', 'absolutely', 'utterly']
            },
            'disgust': {
                'words': ['disgusted', 'revolted', 'repulsed', 'sickened', 'appalled', 'horrified',
                         'nauseated', 'repugnant', 'awful', 'terrible', 'horrible', 'dreadful'],
                'intensifiers': ['absolutely', 'completely', 'totally', 'utterly']
            },
            'relief': {
                'words': ['relief', 'relieved', 'thankful', 'grateful', 'better', 'resolved',
                         'settled', 'calm', 'peaceful', 'relaxed', 'comfortable', 'reassured'],
                'intensifiers': ['much', 'greatly', 'significantly', 'considerably']
            },
            'frustration': {
                'words': ['frustrated', 'annoyed', 'irritated', 'bothered', 'vexed', 'exasperated',
                         'fed up', 'tired', 'sick', 'enough', 'problem', 'issue', 'difficulty'],
                'intensifiers': ['very', 'extremely', 'really', 'totally', 'completely']
            }
        }
        
        # Context patterns for better understanding
        self.context_patterns = {
            'conditional_positive': r'\b(should|could|would)\s+\w+\s+(better|faster|improved?)\b',
            'mixed_sentiment': r'\b(but|however|although|though|yet)\b',
            'resolution': r'\b(resolved?|fixed?|solved?|settled?)\b',
            'outcome': r'\b(outcome|result|end)\b',
            'speed_concern': r'\b(faster|quicker|speed|slow|delay)\b'
        }

    def preprocess_text(self, text: str) -> str:
        """Normalize raw text for lexicon matching.

        Characters other than word characters, whitespace and ``. ! ? , ; :``
        become spaces, whitespace runs collapse to a single space, and the
        result is trimmed and lower-cased. Falsy input yields an empty string.
        """
        if text:
            without_symbols = re.sub(r'[^\w\s\.\!\?\,\;\:]', ' ', text)
            single_spaced = re.sub(r'\s+', ' ', without_symbols)
            return single_spaced.strip().lower()
        return ""

    def detect_emotions(self, text: str) -> Dict[str, float]:
        """Detect emotions and calculate scores"""
        text_lower = self.preprocess_text(text)
        words = word_tokenize(text_lower)
        
        emotion_scores = {}
        
        for emotion, data in self.emotion_lexicon.items():
            score = 0
            emotion_words = data['words']
            intensifiers = data['intensifiers']
            
            for i, word in enumerate(words):
                if word in emotion_words:
                    base_score = 1.0
                    
                    # Check for intensifiers before the emotion word
                    if i > 0 and words[i-1] in intensifiers:
                        base_score *= 1.5
                    
                    # Check for negation
                    if i > 0 and words[i-1] in ['not', 'no', 'never', 'hardly', 'barely']:
                        base_score *= -0.5
                    
                    score += base_score
            
            # Normalize by text length
            if len(words) > 0:
                emotion_scores[emotion] = min(score / len(words) * 10, 1.0)
            else:
                emotion_scores[emotion] = 0.0
        
        return emotion_scores

    def analyze_context(self, text: str) -> Dict[str, Any]:
        """Analyze contextual patterns in the text"""
        context_info = {}
        
        for pattern_name, pattern in self.context_patterns.items():
            matches = re.findall(pattern, text.lower())
            context_info[pattern_name] = len(matches) > 0
        
        return context_info

    def determine_primary_secondary_emotions(self, emotion_scores: Dict[str, float]) -> Tuple[str, str]:
        """Pick the two strongest emotions with a positive score.

        Args:
            emotion_scores: Mapping of emotion name to score.

        Returns:
            ``(primary, secondary)`` as title-cased names. ``primary`` is
            "Neutral" and ``secondary`` is "None" when no (or only one) emotion
            has a score above zero, including when ``emotion_scores`` is empty.
        """
        ranked = sorted(emotion_scores.items(), key=lambda item: item[1], reverse=True)

        # Only the top two matter; anything at or below zero does not count as present.
        present = [name.title() for name, score in ranked[:2] if score > 0]

        primary = present[0] if present else "Neutral"
        secondary = present[1] if len(present) > 1 else "None"

        return primary, secondary

    def generate_reason(self, text: str, emotion_scores: Dict[str, float], context: Dict[str, Any]) -> str:
        """Build a comma-separated explanation for the detected sentiment.

        The explanation combines a headline derived from the two highest-scoring
        emotions (only those above 0.1), notes for the context flags that are
        set, and notes for a few literal phrases found in the text. When nothing
        applies, a generic fallback sentence is returned.
        """
        ranked = sorted(emotion_scores.items(), key=lambda pair: pair[1], reverse=True)
        dominant = {name for name, value in ranked[:2] if value > 0.1}

        findings = []

        # Headline based on the dominant emotion pair
        has_happiness = 'happiness' in dominant
        has_frustration = 'frustration' in dominant
        if has_happiness and has_frustration:
            findings.append("Mixed emotions with satisfaction despite concerns")
        elif has_happiness and 'relief' in dominant:
            findings.append("Positive outcome with relief")
        elif has_frustration:
            findings.append(
                "Frustration about speed/efficiency"
                if context.get('speed_concern')
                else "General frustration expressed"
            )

        # Notes for the context flags, in a fixed order
        flag_notes = (
            ('resolution', "mentions resolution"),
            ('outcome', "discusses outcome"),
            ('mixed_sentiment', "contains contrasting elements"),
        )
        for flag, note in flag_notes:
            if context.get(flag):
                findings.append(note)

        # Notes for specific phrases
        lowered = text.lower()
        if 'happy about' in lowered:
            findings.append("explicit happiness about outcome")
        if any(phrase in lowered for phrase in ('should do faster', 'could be faster')):
            findings.append("suggests improvement needed")

        if not findings:
            return "General sentiment expressed"
        return ", ".join(findings)

    def generate_recommendation(self, primary_emotion: str, secondary_emotion: str, 
                              context: Dict[str, Any], text: str) -> str:
        """Turn the detected emotions into a comma-separated list of next steps.

        Happiness, Frustration and Relief each have dedicated advice; any other
        primary emotion falls back to a generic professional-communication hint,
        preceded by an acknowledgement when the text contains a thank-you.
        """
        steps = []

        if primary_emotion == "Happiness":
            if secondary_emotion not in ("Relief", "Frustration"):
                steps.append("Reinforce positive experience")
            else:
                steps.append("Acknowledge positive outcome")
                if context.get('speed_concern') or 'faster' in text.lower():
                    steps.append("address speed concerns")
                steps.append("maintain positive tone")
        elif primary_emotion == "Frustration":
            steps.append("Address concerns promptly")
            if context.get('speed_concern'):
                steps.append("focus on efficiency improvements")
        elif primary_emotion == "Relief":
            steps += ["Confirm resolution", "ensure continued satisfaction"]

        # Fallback when no emotion-specific advice was produced
        if not steps:
            lowered = text.lower()
            if 'thanks' in lowered or 'thank' in lowered:
                steps.append("Acknowledge appreciation")
            steps.append("maintain professional communication")

        return ", ".join(steps)

    def analyze_with_gemini(self, email_text: str) -> Dict[str, Any]:
        """
        Use Gemini LLM for advanced sentiment analysis
        
        Args:
            email_text: The email content to analyze
            
        Returns:
            Dictionary with Gemini's analysis results
        """
        if not self.gemini_enabled:
            return {
                "gemini_available": False,
                "error": "Gemini API not configured",
                "analysis": None
            }
        
        try:
            # Construct prompt for Gemini
            prompt = f"""
            Analyze the following email text for advanced sentiment analysis. Please provide a detailed JSON response with the following structure:

            {{
                "primary_emotion": "dominant emotion (Happiness, Sadness, Anger, Fear, Surprise, Disgust, Relief, Frustration, Neutral)",
                "secondary_emotion": "secondary emotion or None",
                "emotion_intensity": "score from 1-10",
                "sentiment_polarity": "Positive, Negative, or Neutral",
                "confidence_score": "confidence level from 0.0 to 1.0",
                "emotional_complexity": "Simple, Moderate, or Complex",
                "contextual_analysis": {{
                    "tone": "Professional, Casual, Formal, Informal, etc.",
                    "urgency_level": "Low, Medium, High",
                    "relationship_indicators": "signs of relationship dynamic",
                    "business_context": "relevant business implications"
                }},
                "psychological_insights": {{
                    "stress_indicators": "signs of stress or pressure",
                    "satisfaction_level": "indicators of satisfaction/dissatisfaction",
                    "expectation_management": "expectations expressed or implied",
                    "communication_style": "direct, indirect, assertive, passive, etc."
                }},
                "detailed_reasoning": "comprehensive explanation of the analysis",
                "actionable_recommendations": [
                    "specific recommendation 1",
                    "specific recommendation 2"
                ],
                "key_phrases": ["important phrases that influenced the analysis"],
                "potential_follow_up": "suggested follow-up actions"
            }}

            Email text to analyze:
            "{email_text}"

            Provide only the JSON response, no additional text.
            """
            
            # Generate response from Gemini
            response = self.gemini_model.generate_content(prompt)
            
            # Parse the JSON response
            try:
                gemini_analysis = json.loads(response.text)
                return {
                    "gemini_available": True,
                    "analysis": gemini_analysis,
                    "timestamp": datetime.now().isoformat()
                }
            except json.JSONDecodeError:
                # If JSON parsing fails, try to extract useful information
                return {
                    "gemini_available": True,
                    "analysis": {
                        "raw_response": response.text,
                        "parsing_error": "Could not parse JSON response"
                    },
                    "timestamp": datetime.now().isoformat()
                }
                
        except Exception as e:
            return {
                "gemini_available": True,
                "error": f"Gemini API error: {str(e)}",
                "analysis": None,
                "timestamp": datetime.now().isoformat()
            }

    def combine_analyses(self, traditional_analysis: Dict[str, Any], 
                        gemini_analysis: Dict[str, Any]) -> Dict[str, Any]:
        """
        Combine traditional sentiment analysis with Gemini's advanced analysis
        
        Args:
            traditional_analysis: Results from traditional analysis
            gemini_analysis: Results from Gemini analysis
            
        Returns:
            Combined comprehensive analysis
        """
        combined = traditional_analysis.copy()
        
        if gemini_analysis.get("gemini_available") and gemini_analysis.get("analysis"):
            gemini_data = gemini_analysis["analysis"]
            
            # Add Gemini insights
            combined["advanced_analysis"] = {
                "llm_primary_emotion": gemini_data.get("primary_emotion"),
                "llm_secondary_emotion": gemini_data.get("secondary_emotion"),
                "emotion_intensity": gemini_data.get("emotion_intensity"),
                "confidence_score": gemini_data.get("confidence_score"),
                "emotional_complexity": gemini_data.get("emotional_complexity"),
                "contextual_analysis": gemini_data.get("contextual_analysis", {}),
                "psychological_insights": gemini_data.get("psychological_insights", {}),
                "detailed_reasoning": gemini_data.get("detailed_reasoning"),
                "actionable_recommendations": gemini_data.get("actionable_recommendations", []),
                "key_phrases": gemini_data.get("key_phrases", []),
                "potential_follow_up": gemini_data.get("potential_follow_up")
            }
            
            # Update primary analysis with LLM insights if available
            if gemini_data.get("primary_emotion"):
                combined["LLM_Primary_Emotion"] = gemini_data["primary_emotion"]
            if gemini_data.get("secondary_emotion"):
                combined["LLM_Secondary_Emotion"] = gemini_data["secondary_emotion"]
            
            # Enhanced recommendations combining both analyses
            traditional_rec = combined.get("Recommendation", "")
            llm_recs = gemini_data.get("actionable_recommendations", [])
            
            if llm_recs:
                enhanced_rec = traditional_rec + "; " + "; ".join(llm_recs[:2])
                combined["Enhanced_Recommendation"] = enhanced_rec
            
            # Consensus scoring
            combined["Analysis_Consensus"] = self.calculate_consensus(traditional_analysis, gemini_data)
            
        else:
            combined["advanced_analysis"] = {
                "status": "LLM analysis unavailable",
                "reason": gemini_analysis.get("error", "Unknown error")
            }
        
        return combined

    def calculate_consensus(self, traditional: Dict[str, Any], gemini: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate consensus between traditional and LLM analysis

        Labels are compared case-insensitively after trimming whitespace.
        Missing, null or non-string labels are treated as empty strings, so
        they never raise.

        Args:
            traditional: Traditional result; ``Primary_Emotion`` and
                ``Sentiment`` are read.
            gemini: Parsed LLM result; ``primary_emotion`` and
                ``sentiment_polarity`` are read.

        Returns:
            Dict with ``emotion_agreement`` (bool), ``confidence_level``
            ("High" when the emotions agree, otherwise "Medium") and
            ``sentiment_agreement`` (bool).
        """
        def normalize(label: Any) -> str:
            # The LLM may return null or a non-string for any field.
            return label.strip().lower() if isinstance(label, str) else ""

        consensus = {}

        # Emotion consensus
        trad_primary = normalize(traditional.get("Primary_Emotion", ""))
        llm_primary = normalize(gemini.get("primary_emotion", ""))

        consensus["emotion_agreement"] = trad_primary == llm_primary
        consensus["confidence_level"] = "High" if consensus["emotion_agreement"] else "Medium"

        # Sentiment consensus
        trad_sentiment = normalize(traditional.get("Sentiment", ""))
        llm_sentiment = normalize(gemini.get("sentiment_polarity", ""))

        consensus["sentiment_agreement"] = trad_sentiment == llm_sentiment

        return consensus

    def calculate_emotion_score(self, emotion_scores: Dict[str, float]) -> str:
        """Calculate overall emotion intensity score

        The per-emotion scores are summed, scaled by 10, truncated to an
        integer and clamped to the 0-10 range. Negative totals (possible when
        negated emotions dominate) are reported as 0 rather than as a negative
        intensity.

        Args:
            emotion_scores: Mapping of emotion name to score; may be empty.

        Returns:
            The intensity formatted as ``"<n>/10"``.
        """
        total_score = sum(emotion_scores.values())

        # Normalize to 0-10 scale
        intensity = max(0, min(int(total_score * 10), 10))
        return f"{intensity}/10"

    def analyze_sentiment_with_llm(self, email_data: Dict[str, str], use_gemini: bool = True) -> Dict[str, Any]:
        """
        Perform comprehensive sentiment analysis using both traditional methods and Gemini LLM
        
        Args:
            email_data: Dictionary with 'email' key containing the email text
            use_gemini: Whether to use Gemini LLM for advanced analysis
            
        Returns:
            Dictionary with comprehensive sentiment analysis
        """
        # Perform traditional analysis first
        traditional_result = self.analyze_sentiment(email_data)
        
        if not use_gemini or not self.gemini_enabled:
            return traditional_result
        
        email_text = email_data.get('email', '')
        
        # Perform Gemini analysis
        gemini_result = self.analyze_with_gemini(email_text)
        
        # Combine both analyses
        combined_result = self.combine_analyses(traditional_result, gemini_result)
        
        return combined_result

    def process_json_input_with_llm(self, json_input: str, use_gemini: bool = True) -> str:
        """
        Process JSON input with LLM enhancement and return comprehensive JSON output
        
        Args:
            json_input: JSON string with email data
            use_gemini: Whether to use Gemini LLM for advanced analysis
            
        Returns:
            JSON string with comprehensive sentiment analysis results
        """
        try:
            # Parse input JSON
            input_data = json.loads(json_input)
            
            # Perform comprehensive analysis
            result = self.analyze_sentiment_with_llm(input_data, use_gemini)
            
            # Return formatted JSON
            return json.dumps(result, indent=2)
        
        except json.JSONDecodeError as e:
            return json.dumps({
                "error": f"Invalid JSON input: {str(e)}",
                "Primary_Emotion": "Error",
                "Secondary_Emotion": "None",
                "Reason": "JSON parsing failed",
                "Sentiment": "Neutral",
                "Recommendation": "Fix input format",
                "Emotion_score": "0/10"
            }, indent=2)
        
        except Exception as e:
            return json.dumps({
                "error": f"Analysis error: {str(e)}",
                "Primary_Emotion": "Error",
                "Secondary_Emotion": "None", 
                "Reason": "Processing failed",
                "Sentiment": "Neutral",
                "Recommendation": "Check input data",
                "Emotion_score": "0/10"
            }, indent=2)
    def analyze_sentiment(self, email_data: Dict[str, str]) -> Dict[str, Any]:
        """
        Perform advanced sentiment analysis on email data
        
        Args:
            email_data: Dictionary with 'email' key containing the email text
            
        Returns:
            Dictionary with detailed sentiment analysis
        """
        email_text = email_data.get('email', '')
        
        if not email_text.strip():
            return {
                "Primary_Emotion": "Neutral",
                "Secondary_Emotion": "None",
                "Reason": "No content to analyze",
                "Sentiment": "Neutral",
                "Recommendation": "No action needed",
                "Emotion_score": "0/10"
            }
        
        # Detect emotions
        emotion_scores = self.detect_emotions(email_text)
        
        # Analyze context
        context = self.analyze_context(email_text)
        
        # Determine primary and secondary emotions
        primary_emotion, secondary_emotion = self.determine_primary_secondary_emotions(emotion_scores)
        
        # Determine overall sentiment
        vader_scores = self.sia.polarity_scores(email_text)
        if vader_scores['compound'] >= 0.05:
            sentiment = "Positive"
        elif vader_scores['compound'] <= -0.05:
            sentiment = "Negative"
        else:
            sentiment = "Neutral"
        
        # Generate reason and recommendation
        reason = self.generate_reason(email_text, emotion_scores, context)
        recommendation = self.generate_recommendation(primary_emotion, secondary_emotion, context, email_text)
        emotion_score = self.calculate_emotion_score(emotion_scores)
        
        return {
            "Primary_Emotion": primary_emotion,
            "Secondary_Emotion": secondary_emotion,
            "Reason": reason,
            "Sentiment": sentiment,
            "Recommendation": recommendation,
            "Emotion_score": emotion_score
        }

    def process_json_input(self, json_input: str) -> str:
        """
        Process JSON input and return JSON output
        
        Args:
            json_input: JSON string with email data
            
        Returns:
            JSON string with sentiment analysis results
        """
        try:
            # Parse input JSON
            input_data = json.loads(json_input)
            
            # Perform analysis
            result = self.analyze_sentiment(input_data)
            
            # Return formatted JSON
            return json.dumps(result, indent=2)
        
        except json.JSONDecodeError as e:
            return json.dumps({
                "error": f"Invalid JSON input: {str(e)}",
                "Primary_Emotion": "Error",
                "Secondary_Emotion": "None",
                "Reason": "JSON parsing failed",
                "Sentiment": "Neutral",
                "Recommendation": "Fix input format",
                "Emotion_score": "0/10"
            }, indent=2)
        
        except Exception as e:
            return json.dumps({
                "error": f"Analysis error: {str(e)}",
                "Primary_Emotion": "Error",
                "Secondary_Emotion": "None", 
                "Reason": "Processing failed",
                "Sentiment": "Neutral",
                "Recommendation": "Check input data",
                "Emotion_score": "0/10"
            }, indent=2)


In [None]:

# Example usage and testing
def main():
    """Run the demo: traditional vs. Gemini-enhanced analysis on sample emails.

    Prints the JSON output of both analysis paths for one example email, then
    a short side-by-side comparison for the first two additional test cases.
    """
    # The API key is read from the GEMINI_API_KEY environment variable (a .env
    # file works too, since load_dotenv() runs at import time). A literal
    # placeholder here would take precedence over the real key and make every
    # Gemini request fail, so no key is passed explicitly.
    analyzer = AdvancedSentimentAnalyzer()

    # Test with the provided example
    test_input = '{"email": "Thanks for the resolution..should do faster but I am very happy about the outcome."}'

    print("=== Traditional Sentiment Analysis ===")
    print("Input:")
    print(test_input)
    print("\nTraditional Output:")
    traditional_result = analyzer.process_json_input(test_input)
    print(traditional_result)

    print("\n" + "="*60)
    print("=== Enhanced Analysis with Gemini LLM ===")
    print("Input:")
    print(test_input)
    print("\nEnhanced Output:")
    enhanced_result = analyzer.process_json_input_with_llm(test_input, use_gemini=True)
    print(enhanced_result)

    # Additional test cases
    test_cases = [
        '{"email": "I am extremely frustrated with the constant delays and poor service quality!"}',
        '{"email": "What a wonderful surprise! The project exceeded all our expectations."}',
        '{"email": "I was worried about the deadline, but thankfully everything worked out fine."}',
        '{"email": "The meeting was okay, nothing special to report."}',
        '{"email": "This is absolutely disgusting! I cannot believe how terrible this is."}'
    ]

    print("\n" + "="*60)
    print("=== Comparison: Traditional vs LLM-Enhanced Analysis ===")

    for i, test_case in enumerate(test_cases[:2], 1):  # Limit to 2 for demo
        print(f"\n--- Test Case {i} ---")
        print("Input:", test_case)

        print("\n📊 Traditional Analysis:")
        trad_result = analyzer.process_json_input(test_case)
        trad_data = json.loads(trad_result)
        print(f"Emotion: {trad_data.get('Primary_Emotion')} | Sentiment: {trad_data.get('Sentiment')}")

        print("🤖 LLM-Enhanced Analysis:")
        enhanced_result = analyzer.process_json_input_with_llm(test_case, use_gemini=True)
        enhanced_data = json.loads(enhanced_result)
        print(f"Traditional Emotion: {enhanced_data.get('Primary_Emotion')}")
        if 'advanced_analysis' in enhanced_data:
            llm_emotion = enhanced_data['advanced_analysis'].get('llm_primary_emotion')
            complexity = enhanced_data['advanced_analysis'].get('emotional_complexity')
            print(f"LLM Emotion: {llm_emotion} | Complexity: {complexity}")

def setup_instructions():
    """Print setup instructions for Gemini API.

    The text covers obtaining an API key, installing the packages this
    notebook imports, the supported ways of supplying the key, and basic usage.
    """
    print("""
    🚀 SETUP INSTRUCTIONS FOR GEMINI LLM INTEGRATION:
    
    1. Get Gemini API Key:
       - Go to https://aistudio.google.com/app/apikey
       - Create a new API key
    
    2. Install required packages:
       pip install google-generativeai nltk python-dotenv
    
    3. Set up API key (choose one method):
       Method A - Environment Variable:
       export GEMINI_API_KEY="your_api_key_here"
       
       Method B - Direct initialization:
       analyzer = AdvancedSentimentAnalyzer(gemini_api_key="your_api_key_here")
       
       Method C - .env file (loaded automatically via python-dotenv):
       GEMINI_API_KEY=your_api_key_here
    
    4. Usage Examples:
       # Traditional analysis only
       result = analyzer.process_json_input(json_string)
       
       # Enhanced analysis with Gemini LLM
       result = analyzer.process_json_input_with_llm(json_string, use_gemini=True)
    
    📈 ENHANCED FEATURES WITH GEMINI:
    - Contextual tone analysis (Professional, Casual, etc.)
    - Psychological insights and stress indicators
    - Business context understanding
    - Detailed reasoning and explanations
    - Advanced actionable recommendations
    - Consensus scoring between traditional and LLM analysis
    - Key phrase extraction
    - Follow-up suggestions
    """)

if __name__ == "__main__":
    # Show the setup help first, then a separator line, then run the demo.
    setup_instructions()
    print("\n" + "=" * 60)
    main()