In [None]:
import io
import threading
import time
import base64
import cv2
import numpy as np
import tensorflow as tf
import sounddevice as sd
import librosa
import librosa.display
import matplotlib.pyplot as plt
import matplotlib
import ipywidgets as widgets
from IPython.display import display, HTML
from transformers import pipeline

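# 1. FIX: Compatibility patch for the Keras 'batch_shape' loading error.
# The patched layer class must be defined BEFORE the face model is loaded,
# because it is handed to load_model() through custom_objects.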
from tensorflow.keras.layers import InputLayer

class PatchedInputLayer(InputLayer):
    """InputLayer subclass that accepts a ``batch_shape`` keyword.

    When ``batch_shape`` is supplied it is forwarded to the parent
    constructor under the name ``batch_input_shape``; every other argument
    is passed through untouched.
    """

    def __init__(self, *args, **kwargs):
        try:
            # pop() raises before the assignment happens, so kwargs is left
            # untouched when the key is absent.
            kwargs['batch_input_shape'] = kwargs.pop('batch_shape')
        except KeyError:
            pass
        super().__init__(*args, **kwargs)

# Switch Matplotlib to the non-GUI 'Agg' backend: this notebook renders its
# waveform figures off-screen from a background thread and only needs PNG bytes.
matplotlib.use('Agg')

: 

In [None]:
# ==========================================
# 2. Load Models
# ==========================================
print("Loading Models (Face, Audio, Text)...")

# Text emotion classifier (transformers pipeline).
text_classifier = pipeline(
    task="text-classification",
    model="michellejieli/emotion_text_classifier",
)

# Speech emotion classifier (transformers pipeline).
audio_classifier = pipeline(
    task="audio-classification",
    model="firdhokk/speech-emotion-recognition-with-openai-whisper-large-v3",
)

# Labels for the face model: the predictor indexes this list with the argmax
# of the model output, so the order must match the model's output order.
face_class_names = [
    'angry',
    'disgust',
    'fear',
    'happy',
    'neutral',
    'sad',
    'surprise',
]

class FacePredictor:
    """Detects a face in an RGB frame and classifies its emotion.

    Loads a Keras model from ``model_path`` and OpenCV's frontal-face Haar
    cascade. If either fails to load, ``self.loaded`` is False and
    ``predict`` returns the frame untouched with the label ``"N/A"``.
    """

    def __init__(self, model_path="best_cnn_aug.h5"):
        """Load the emotion model and the face detector.

        Args:
            model_path: Path of the saved Keras model file.
        """
        # Define both attributes up front so they exist even when loading fails.
        self.model = None
        self.loaded = False
        try:
            # FIX: Use custom_objects to inject our PatchedInputLayer
            self.model = tf.keras.models.load_model(
                model_path,
                custom_objects={'InputLayer': PatchedInputLayer},
                compile=False
            )
            self.loaded = True
            print("‚úÖ Face Model Loaded with Compatibility Patch.")
        except Exception as e:
            # Best effort: the rest of the notebook still works without the face model.
            print(f"‚ùå Face Model Loading Failed: {e}")

        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
        if self.face_cascade.empty():
            # An empty cascade makes detectMultiScale raise on every frame, so
            # disable prediction instead of failing later inside the video thread.
            print("Face detector cascade could not be loaded; face prediction disabled.")
            self.loaded = False

    def predict(self, frame_rgb):
        """Annotate the first detected face in ``frame_rgb`` with its emotion.

        Args:
            frame_rgb: Image array in RGB channel order.

        Returns:
            Tuple ``(annotated, label, conf)``. When the predictor is not
            loaded this is ``(frame_rgb, "N/A", 0)``; when no face is found it
            is an unmodified copy of the frame with ``("None", 0)``.
        """
        if not self.loaded:
            return frame_rgb, "N/A", 0
        gray = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.1, 5, minSize=(30,30))

        label, conf = "None", 0
        annotated = frame_rgb.copy()

        # Only the first detection is classified.
        for (x, y, w, h) in faces[:1]:
            face_roi = cv2.resize(gray[y:y+h, x:x+w], (48, 48))
            inp = face_roi.astype("float32") / 255.0
            inp = inp.reshape(1, 48, 48, 1)

            # Run prediction
            probs = self.model(inp, training=False).numpy()[0]
            idx = int(np.argmax(probs))
            label, conf = face_class_names[idx], probs[idx]

            cv2.rectangle(annotated, (x,y), (x+w,y+h), (0,255,0), 2)
            # Keep the caption inside the image when the face touches the top edge.
            text_y = max(y - 10, 20)
            cv2.putText(annotated, f"{label} ({conf:.2f})", (x, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)
        return annotated, label, conf

In [None]:
import os
import google.generativeai as genai

# Read GEMINI_API_KEY from the .env file located by find_dotenv()
from dotenv import load_dotenv, find_dotenv
# Load variables from the discovered .env file (if any) into the environment.
load_dotenv(find_dotenv())
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Start from "not configured" so later cells can rely on `gemini_model is None`.
gemini_model = None
if not (GEMINI_API_KEY or os.getenv("GOOGLE_API_KEY")):
    # genai.configure() does not fail on a missing key, so without this check
    # the success message would be printed and the first chat call would fail.
    print("‚ö†Ô∏è Gemini API configuration failed: GEMINI_API_KEY is not set")
else:
    try:
        # Passing None lets the SDK use its own GOOGLE_API_KEY fallback.
        genai.configure(api_key=GEMINI_API_KEY or None)
        gemini_model = genai.GenerativeModel('gemini-2.5-flash')
        print("‚úÖ Gemini API configured successfully")
    except Exception as e:
        print(f"‚ö†Ô∏è Gemini API configuration failed: {e}")
        gemini_model = None


In [None]:
# Print the name of every model that supports the generateContent method.
for model_info in genai.list_models():
    if 'generateContent' not in model_info.supported_generation_methods:
        continue
    print(f" - {model_info.name}")

In [None]:
# ==========================================
# 3. Unified Multimodal Logic
# ==========================================
class EmotionIntelligenceApp:
    """Notebook dashboard that combines face, voice and text emotion signals.

    Builds the ipywidgets controls, runs webcam and microphone capture in
    background threads while the system is started, and renders either a
    combined emotion report or an emotion-aware Gemini reply into
    ``report_out``.

    Attributes:
        fs: Microphone sample rate in Hz.
        is_running: True while the capture threads should keep looping.
        cur_face: Latest label produced by the face predictor.
        cur_audio: Latest label produced by the audio classifier.
        audio_chunks: Most recent microphone chunks; bounded to
            ``MAX_BUFFERED_CHUNKS`` entries.
        chat_history: One dict per completed chat exchange.
    """

    CHUNK_SECONDS = 0.5          # length of each microphone read
    ANALYSIS_SECONDS = 3         # trailing window passed to the audio classifier
    VIEW_SECONDS = 10            # trailing window drawn in the waveform image
    VOLUME_THRESHOLD = 0.005     # RMS below this is treated as silence
    THREAD_JOIN_TIMEOUT = 2.0    # seconds to wait for old workers on restart
    # ceil(VIEW_SECONDS / CHUNK_SECONDS): nothing older is ever read back.
    MAX_BUFFERED_CHUNKS = int(-(-VIEW_SECONDS // CHUNK_SECONDS))

    def __init__(self):
        """Create the predictor, the shared state and all widgets."""
        self.fs = 16000
        self.is_running = False
        self.face_predictor = FacePredictor()

        # State storage
        self.cur_face = "N/A"
        self.cur_audio = "N/A"
        self.audio_chunks = []
        self.chat_history = []
        # Worker bookkeeping for the current/previous run
        self._workers = []
        self._stop_event = None
        # UI Elements
        self.btn_toggle = widgets.Button(description='‚ñ∂Ô∏è Start System', button_style='success', layout={'width': '315px'})

        self.status = widgets.HTML("<b>Status:</b> Ready")

        self.video_out = widgets.Image(format='jpeg', width=640)
        self.audio_out = widgets.Image(format='png', width=640)

        self.text_in = widgets.Textarea(placeholder='Type your thoughts here...', layout={'width': '635px', 'height': '70px'})

        self.btn_report = widgets.Button(description='üìä Get Multimodal Report', button_style='info', layout={'width': '315px'})
        self.btn_chat = widgets.Button(description='üó£Ô∏è Chat with AI', button_style='info', layout={'width': '315px'})
        self.report_out = widgets.Output()

        # Connections
        self.btn_toggle.on_click(self.toggle_process)
        self.btn_report.on_click(self.generate_report)
        self.btn_chat.on_click(self.chat_with_ai)

    @staticmethod
    def _html_text(value):
        """Return ``value`` as HTML-safe text with newlines turned into ``<br>``."""
        # Local import: the ``html`` module is not imported at file level.
        from html import escape
        return escape(str(value)).replace(chr(10), '<br>')

    def _keep_running(self, stop_event):
        """Return True while a worker started with ``stop_event`` should loop."""
        if stop_event is not None and stop_event.is_set():
            return False
        return self.is_running

    def _append_chunk(self, chunk):
        """Append ``chunk`` to the audio buffer, drop stale chunks, return the buffer."""
        # Bind the list once so a concurrent reset of self.audio_chunks cannot
        # hand back an empty buffer.
        chunks = self.audio_chunks
        chunks.append(chunk)
        del chunks[:-self.MAX_BUFFERED_CHUNKS]
        return chunks

    def toggle_process(self, _):
        """Button callback: start the capture threads, or signal them to stop."""
        if not self.is_running:
            # Let workers from a previous run finish (and release the camera)
            # before new ones are started. The wait is bounded; a worker that
            # is still busy afterwards exits on its own stop event.
            for worker in self._workers:
                worker.join(timeout=self.THREAD_JOIN_TIMEOUT)

            stop_event = threading.Event()
            self._stop_event = stop_event
            self.is_running = True
            self.btn_toggle.description = '‚èπÔ∏è Stop System'
            self.btn_toggle.button_style = 'danger'
            self.audio_chunks = []
            self.status.value = "<b style='color:red;'>üî¥ LIVE MONITORING</b>"
            self._workers = [
                threading.Thread(target=self._video_thread, args=(stop_event,), daemon=True),
                threading.Thread(target=self._audio_thread, args=(stop_event,), daemon=True),
            ]
            for worker in self._workers:
                worker.start()
        else:
            self.is_running = False
            if self._stop_event is not None:
                # Per-run flag: a later restart cannot revive these workers.
                self._stop_event.set()
            self.btn_toggle.description = '‚ñ∂Ô∏è Start System'
            self.btn_toggle.button_style = 'success'
            self.status.value = "<b>Status:</b> System Stopped"

    def _video_thread(self, stop_event=None):
        """Read webcam frames, classify the face and publish annotated JPEGs."""
        cap = cv2.VideoCapture(0)
        try:
            while self._keep_running(stop_event):
                ret, frame = cap.read()
                if not ret: break

                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                annotated, label, _conf = self.face_predictor.predict(rgb)
                self.cur_face = label

                ok, jpeg = cv2.imencode('.jpg', cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
                if ok:
                    self.video_out.value = jpeg.tobytes()
                time.sleep(0.03)
        finally:
            # Always free the camera, even if prediction or encoding raised.
            cap.release()

    def _audio_thread(self, stop_event=None):
        """Record microphone audio, classify the recent window and plot it."""
        chunk_frames = int(self.fs * self.CHUNK_SECONDS)
        analysis_frames = int(self.fs * self.ANALYSIS_SECONDS)
        view_frames = int(self.fs * self.VIEW_SECONDS)

        with sd.InputStream(samplerate=self.fs, channels=1) as stream:
            while self._keep_running(stop_event):
                chunk, _overflowed = stream.read(chunk_frames)
                chunks = self._append_chunk(chunk)

                # Plot/Analyze last bits
                buffered = np.concatenate(chunks, axis=0).flatten()
                recent_y = buffered[-analysis_frames:]  # Analysis window
                view_y = buffered[-view_frames:]        # View window

                # Calculate the Root Mean Square (average volume)
                rms = np.sqrt(np.mean(recent_y**2))
                if rms < self.VOLUME_THRESHOLD:
                    # Audio is not audible (silence)
                    self.cur_audio = "neutral"
                    # Still update the waveform so the user sees it's moving
                    self._draw_wave(view_y, "neutral (silence)")
                else:
                    try:
                        res = audio_classifier(recent_y)
                        self.cur_audio = res[0]['label']
                        self._draw_wave(view_y, self.cur_audio)
                    except Exception:
                        # Best effort: keep the previous label and waveform and
                        # retry on the next chunk.
                        pass

    def _draw_wave(self, y, label):
        """Render ``y`` as a waveform PNG watermarked with ``label``."""
        # Normalize for visibility
        peak = np.max(np.abs(y))
        y_scaled = y / peak * 0.8 if peak > 0.01 else y

        fig, ax = plt.subplots(figsize=(10, 2))
        try:
            librosa.display.waveshow(y_scaled, sr=self.fs, ax=ax, color='#3498db', alpha=0.6)
            ax.set_ylim(-1, 1)
            ax.axis('off')
            ax.text(0.5, 0.5, f"AUDIO: {label.upper()}", transform=ax.transAxes, color='red',
                    fontsize=30, fontweight='bold', alpha=0.1, ha='center', va='center')

            buf = io.BytesIO()
            # Save through the figure object rather than pyplot's global
            # "current figure", since this runs in a background thread.
            fig.savefig(buf, format='png', transparent=False)
        finally:
            plt.close(fig)
        self.audio_out.value = buf.getvalue()

    def generate_report(self, _):
        """Button callback: show the current face, voice and text emotions."""
        with self.report_out:
            self.report_out.clear_output()
            txt = self.text_in.value.strip()
            text_emo = "N/A"
            if txt:
                t_res = text_classifier(txt)
                text_emo = t_res[0]['label'].upper()

            display(HTML(f"""
                <div style="border-left: 5px solid #3498db; background: #f9f9f9; padding: 15px; border-radius: 5px; width: 620px;">
                    <h3 style="margin-top:0;">Multimodal Emotion Report</h3>
                    <b>üì∑ Face:</b> {self.cur_face.upper()}<br>
                    <b>üéôÔ∏è Voice:</b> {self.cur_audio.upper()}<br>
                    <b>üí¨ Text:</b> {text_emo}
                    <hr>
                    <small>Timestamp: {time.ctime()}</small>
                </div>
            """))

    def _user_message_html(self, user_text, face_emotion, voice_emotion, text_emotion):
        """Build the HTML bubble for the user's message and emotion summary."""
        return f"""
                <div style="background: #e3f2fd; padding: 12px; border-radius: 8px; margin-bottom: 10px; width: 620px;">
                    <b>üßë You:</b> {self._html_text(user_text)}
                    <div style="font-size: 0.85em; color: #666; margin-top: 5px;">
                        üòä Face: {self._html_text(face_emotion)} | üé§ Voice: {self._html_text(voice_emotion)} | üí≠ Text: {self._html_text(text_emotion)}
                    </div>
                </div>
            """

    def chat_with_ai(self, _):
        """Chat with Gemini AI with emotion context."""
        with self.report_out:
            self.report_out.clear_output()

            # Get user input
            user_text = self.text_in.value.strip()
            if not user_text:
                display(HTML("<p style='color: red;'>Please enter some text to chat with AI.</p>"))
                return

            # Check if Gemini is configured
            if gemini_model is None:
                display(HTML("<p style='color: red;'>‚ùå Gemini API not configured. Please set your API key.</p>"))
                return

            # Snapshot the live labels once: the capture threads keep updating
            # them, and prompt, display and history should all agree.
            face_emotion, voice_emotion = self.cur_face, self.cur_audio

            # Get current text emotion
            text_emotion = "N/A"
            try:
                t_res = text_classifier(user_text)
                text_emotion = t_res[0]['label']
            except Exception:
                # Text emotion is optional context; fall back to "N/A".
                pass

            # Build emotion context
            emotion_context = f"""
                ## Current User Emotional State:
                - Facial Expression: {face_emotion}
                - Voice Tone: {voice_emotion}
                - Text Sentiment: {text_emotion}
                """

            # Create system prompt with emotion awareness
            system_prompt = """You are an empathetic AI assistant with access to multimodal emotion detection.
                You can see the user's emotional state through their facial expressions, voice tone, and text sentiment.

                Your role is to:
                1. Acknowledge and validate the user's emotions
                2. Provide supportive and appropriate responses based on their emotional state
                3. Offer helpful suggestions or coping strategies when appropriate
                4. Maintain a warm, understanding, and non-judgmental tone

                When the user appears distressed (sad, angry, fearful), be extra compassionate and supportive.
                When they appear happy or neutral, be engaging and positive.
                Always consider the emotional context in your responses."""

            # Combine system prompt, emotion context, and user message
            full_prompt = f"""{system_prompt}

                {emotion_context}

                User message: {user_text}

                Please respond in a supportive and contextually appropriate way, taking into account their current emotional state."""

            # User-supplied text is HTML-escaped inside the helper.
            user_bubble = self._user_message_html(user_text, face_emotion, voice_emotion, text_emotion)

            # Display user message
            display(HTML(user_bubble))

            # Show loading indicator
            display(HTML("<p>ü§ñ AI is thinking...</p>"))

            try:
                # Call Gemini API
                response = gemini_model.generate_content(full_prompt)
                ai_response = response.text

                # Store in chat history
                self.chat_history.append({
                    'user': user_text,
                    'ai': ai_response,
                    'emotions': {
                        'face': face_emotion,
                        'audio': voice_emotion,
                        'text': text_emotion
                    },
                    'timestamp': time.ctime()
                })

                # Clear and display response
                self.report_out.clear_output()

                # Display user message again
                display(HTML(user_bubble))

                # Display AI response (escaped so model output cannot inject markup)
                display(HTML(f"""
                    <div style="background: #f1f8e9; padding: 12px; border-radius: 8px; margin-bottom: 10px; width: 620px; border-left: 4px solid #8bc34a;">
                        <b>ü§ñ AI Assistant:</b><br>
                        <div style="margin-top: 8px; line-height: 1.6;">{self._html_text(ai_response)}</div>
                    </div>
                """))

                # Clear text input for next message
                self.text_in.value = ""

            except Exception as e:
                self.report_out.clear_output()
                display(HTML(f"""
                    <div style="background: #ffebee; padding: 12px; border-radius: 8px; width: 620px; border-left: 4px solid #f44336;">
                        <b>‚ùå Error:</b> {self._html_text(e)}<br>
                        <small>Please check your API key and internet connection.</small>
                    </div>
                """))

# ==========================================
# 4. Display Application
# ==========================================
# Instantiate the app; this also builds its widgets and wires the callbacks.
app = EmotionIntelligenceApp()

title = HTML("<h2 style='text-align:center;'>Multimodal Emotion Intelligence</h2>")

# Single centered column: controls, live video, waveform, text box, actions, output.
ui_layout = widgets.VBox(
    children=[
        widgets.HBox(children=[app.btn_toggle], layout=dict(justify_content='center')),
        app.status,
        app.video_out,
        app.audio_out,
        widgets.HTML(value="<b>Analyze Text Sentiment:</b>"),
        app.text_in,
        widgets.HBox(children=[app.btn_chat, app.btn_report], layout=dict(justify_content='center')),
        app.report_out,
    ],
    layout=dict(align_items='center', width='100%'),
)

display(title, ui_layout)