## Final Project

Jupyter Notebook version of a chat bot with emotion detection.

#### How to run the notebook
    1. Create a new virtual environment and install all the packages imported below
    2. Select that environment as the kernel of the Jupyter notebook
    3. Set up the API keys in the `.env.local` file, or set them in the notebook
    4. Run all the cells
    
#### input 
An audio file containing a human voice

#### output
- Emotion analysis result
- Text transcribed from the voice
- Audio and text reply to the voice

### Import Packages

In [14]:
from __future__ import annotations

# System and environment
import io
import os
import shutil
import subprocess
import tempfile
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

# Third-party libraries
import librosa
import nest_asyncio
import numpy as np
import openai
import requests
import soundfile as sf
import torch
import torch.nn as nn
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from torch.utils.data import DataLoader, Dataset, TensorDataset, random_split

# Jupyter environment
from IPython.display import Audio, display

### API Setup 

In [2]:
load_dotenv('.env.local')  # load the environment variables from the .env.local file

# Placeholders keep later os.getenv() lookups non-empty when no real key is configured.
if not os.getenv('OPENAI_API_KEY'):
    os.environ['OPENAI_API_KEY'] = 'your_openai_api_key_here'
if not os.getenv('ELEVENLABS_API_KEY'):
    os.environ['ELEVENLABS_API_KEY'] = 'your_elevenlabs_api_key_here'


def _describe_secret(name: str) -> str:
    """Report whether the environment variable `name` holds a real secret, without revealing it.

    Printing the raw value would store the key in the notebook's saved output,
    so only a configured/not-configured status and the length are shown.
    """
    value = os.getenv(name)
    if not value or value.startswith('your_'):
        return "<not configured>"
    return f"<set, {len(value)} chars>"


print("OPENAI_API_KEY:", _describe_secret('OPENAI_API_KEY'))
print("ELEVENLABS_API_KEY:", _describe_secret('ELEVENLABS_API_KEY'))

OPENAI_API_KEY: <redacted>
ELEVENLABS_API_KEY: <redacted>


### Type Definition 

In [3]:
class EmotionType(str, Enum):
    """Emotion categories used throughout the notebook.

    Mixing in `str` makes every member compare equal to its lowercase string
    value (e.g. ``EmotionType.HAPPY == "happy"``).
    """
    HAPPY = "happy"
    SAD = "sad"
    ANGRY = "angry"
    FEAR = "fear"
    SURPRISE = "surprise"
    DISGUST = "disgust"
    NEUTRAL = "neutral"
    EXCITED = "excited"
    CALM = "calm"

class ChatMessage(BaseModel):
    """One entry of a conversation history: the text plus optional emotion metadata."""
    # Message content.
    text: str
    # Emotion attached to the message, if any.
    emotion: Optional[EmotionType] = None
    # Confidence associated with `emotion`, if any.
    confidence: Optional[float] = None
    # ChatService._add_to_history fills this with datetime.now().isoformat().
    timestamp: Optional[str] = None

class EmotionResponse(BaseModel):
    """Result of an emotion analysis: the detected emotion and its confidence."""
    emotion: EmotionType
    confidence: float
    # Free-form extras; EmotionService stores a "method" key (and "error" on fallback) here.
    features: Optional[dict] = None

class ChatResponse(BaseModel):
    """Reply produced by the chat service."""
    # Text of the reply.
    message: str
    # ChatService.generate_response sets True for a model reply and False for a canned fallback.
    emotion_adapted: bool
    suggested_emotion: Optional[EmotionType] = None
    confidence: float

class VoiceResponse(BaseModel):
    """Container pairing raw audio bytes with their text and an optional duration."""
    audio_data: bytes
    text: str
    # NOTE(review): unit is not stated anywhere in this file - presumably seconds; confirm.
    duration: Optional[float] = None

class ConversationSession(BaseModel):
    """State of one conversation: its messages, emotion history and timestamps.

    The timestamps default to the moment the instance is created. They use
    `default_factory` because a plain ``= datetime.now().isoformat()`` default
    is evaluated only once, when the class is defined, which would stamp every
    session with the same time.
    """
    session_id: str
    messages: List[ChatMessage] = Field(default_factory=list)
    current_emotion: Optional[EmotionType] = None
    emotion_history: List[EmotionResponse] = Field(default_factory=list)
    # ISO-8601 strings produced by datetime.now().isoformat() at instantiation.
    created_at: str = Field(default_factory=lambda: datetime.now().isoformat())
    updated_at: str = Field(default_factory=lambda: datetime.now().isoformat())

class SystemStatus(BaseModel):
    """Boolean flags for each service and the model, plus an optional memory figure."""
    emotion_service: bool
    chat_service: bool
    voice_service: bool
    model_loaded: bool
    # NOTE(review): unit (bytes, MB, percent) is not defined in this file - confirm with the producer.
    memory_usage: Optional[float] = None




### Emotion Service

In [None]:
class EmotionCNN(nn.Module):
    """Small convolutional classifier over single-channel 2-D inputs.

    Three conv -> ReLU -> max-pool stages are followed by adaptive average
    pooling to 8x8, a 512-unit hidden layer with dropout, and a linear layer
    producing `num_classes` logits.
    """

    def __init__(self, num_classes: int = 8):
        super().__init__()
        # Layer names and creation order are kept as-is: they define the
        # state_dict keys and the order in which weights are initialised.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.5)
        self.adaptive_pool = nn.AdaptiveAvgPool2d((8, 8))
        self.fc1 = nn.Linear(128 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return logits of shape (batch, num_classes).

        A 3-D input is treated as (batch, height, width) and gets a channel
        axis inserted at position 1.
        """
        if x.dim() == 3:
            x = x.unsqueeze(1)
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.pool(self.relu(conv(x)))
        pooled = self.adaptive_pool(x)
        flat = pooled.view(pooled.size(0), -1)
        hidden = self.dropout(self.relu(self.fc1(flat)))
        return self.fc2(hidden)

class EmotionDataset(Dataset):
    """Index-aligned pairing of mel spectrograms with their labels."""

    def __init__(self, mel_specs, labels):
        self.mel_specs, self.labels = mel_specs, labels

    def __len__(self):
        # The dataset length follows the spectrogram container.
        return len(self.mel_specs)

    def __getitem__(self, idx):
        sample = self.mel_specs[idx]
        target = self.labels[idx]
        return sample, target


class EmotionService:
    def __init__(self):
        """Set preprocessing constants, locate ffmpeg, pick a device and load the model."""
        self.model: Optional[EmotionCNN] = None
        self.scaler = None
        # Position i is the emotion reported for class index i of the model output.
        self.emotion_labels = [
            EmotionType(value)
            for value in (
                "neutral",
                "calm",
                "happy",
                "sad",
                "angry",
                "fear",
                "disgust",
                "surprise",
            )
        ]
        self.sample_rate = 22050
        self.duration = 3
        located_ffmpeg = shutil.which('ffmpeg')
        # Fall back to a fixed path when ffmpeg is not found on PATH.
        self.ffmpeg_path = located_ffmpeg if located_ffmpeg else '/opt/homebrew/bin/ffmpeg'
        device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device_name)
        self._load_model()

    def _load_data(self,data_dir='data',batch_size=32) :
        files=os.listdir(data_dir)
        mel_specs=[]
        labels=[]
        for file in files:
          
            parts=file.split('-')
            emotion_id = int(parts[2]) - 1
            audio_path=os.path.join(data_dir,file)
            audio,sr=sf.read(audio_path)
            if len(audio)>self.sample_rate*self.duration:
                audio=audio[:self.sample_rate*self.duration]
            else:
                audio=np.pad(audio, (0, self.sample_rate*self.duration - len(audio)))

            mel_spec = librosa.feature.melspectrogram(y=audio, sr=self.sample_rate, n_mels=128)
            mel_db = librosa.power_to_db(mel_spec, ref=np.max)

            if mel_db.shape[1] < 128:
                mel_db = np.pad(mel_db, ((0, 0), (0, 128 - mel_db.shape[1])), mode='constant')
            elif mel_db.shape[1] > 128:
                mel_db = mel_db[:, :128]
            if( len(mel_db.shape)>2):
                print(file)
            mel_db = (mel_db - mel_db.mean()) / (mel_db.std() + 1e-6)
            if(len(mel_db.shape)==2):
                mel_specs.append(mel_db)
                labels.append(emotion_id)

        mel_specs = torch.tensor(mel_specs).unsqueeze(1).float()  # (N, 1, n_mels, T)
        labels = torch.tensor(labels, dtype=torch.long)

    # Split training and testing data
        dataset = EmotionDataset(mel_specs, labels)
        print(len(dataset))
        train_size = int(0.8 * len(dataset))
        test_size = len(dataset) - train_size
        train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        return train_loader, test_loader


    def _train_model(self,train_loader, test_loader, num_epochs=100, lr=0.01,device='cuda') -> None:
        self.model.to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        self.model.train()

        for epoch in range(num_epochs):
            train_loss = 0.0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            print(f"Epoch {epoch+1}, Train Loss: {train_loss/len(train_loader):.4f}")
        
        self.model.eval()
        test_loss = 0.0
        correct = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = self.model(inputs)
                test_loss += criterion(outputs, labels).item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
        test_loss /= len(test_loader)
        accuracy = 100 * correct / len(test_loader.dataset)
        print(f"Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%")
        torch.save(self.model.state_dict(), 'backend/app/models/emotion_model.pth')
        return test_loss, accuracy

    def _load_model(self) -> None:
        try:
            self.model = EmotionCNN(num_classes=len(self.emotion_labels))
            self.model.load_state_dict(torch.load('backend/app/models/emotion_model.pth'))
            self.model.eval()
        except Exception as e:
            print(f"Model loading failed: {e}")
            self.model = None

    def _convert_audio_to_wav(self, audio_data: bytes) -> bytes:
        try:
            with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as input_file:
                input_file.write(audio_data)
                input_path = input_file.name
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file:
                output_path = output_file.name
            cmd = [
                self.ffmpeg_path, "-y",
                "-i", input_path,
                "-acodec", "pcm_s16le",
                "-ar", str(self.sample_rate),
                "-ac", "1",
                "-af", "highpass=f=200,lowpass=f=3000,volume=1.5",
                output_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                with open(output_path, "rb") as f:
                    converted_audio = f.read()
                os.unlink(input_path)
                os.unlink(output_path)
                return converted_audio
            else:
                os.unlink(input_path)
                os.unlink(output_path)
                return audio_data
        except Exception as e:
            print(f"Audio conversion failed: {e}")
            return audio_data

    def _extract_features(self, audio_data: bytes) -> np.ndarray:
        try:
            wav_audio = self._convert_audio_to_wav(audio_data)
            audio_array, sr = sf.read(io.BytesIO(wav_audio))
            if sr != self.sample_rate:
                audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=self.sample_rate)
            target_length = self.sample_rate * self.duration
            if len(audio_array) > target_length:
                audio_array = audio_array[:target_length]
            else:
                audio_array = np.pad(audio_array, (0, target_length - len(audio_array)))
            mfccs = librosa.feature.mfcc(y=audio_array, sr=self.sample_rate, n_mfcc=13)
            spectral_centroids = librosa.feature.spectral_centroid(y=audio_array, sr=self.sample_rate)[0]
            spectral_rolloff = librosa.feature.spectral_rolloff(y=audio_array, sr=self.sample_rate)[0]
            zero_crossing_rate = librosa.feature.zero_crossing_rate(audio_array)[0]
            features: List[float] = []
            for feature in [mfccs, spectral_centroids, spectral_rolloff, zero_crossing_rate]:
                features.extend([
                    float(np.mean(feature)),
                    float(np.std(feature)),
                    float(np.min(feature)),
                    float(np.max(feature)),
                ])
            return np.array(features)
        except Exception as e:
            print(f"Feature extraction failed: {e}")
            return np.zeros(60)

    def _extract_mel_spectrogram(self, audio_data: bytes) -> torch.Tensor:
        try:
            wav_audio = self._convert_audio_to_wav(audio_data)
            try:
                audio_array, sr = sf.read(io.BytesIO(wav_audio))
            except Exception:
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
                    temp_file.write(wav_audio)
                    temp_file_path = temp_file.name
                try:
                    audio_array, sr = sf.read(temp_file_path)
                    os.unlink(temp_file_path)
                except Exception:
                    return torch.zeros(1, 1, 128, 128)
            if len(audio_array.shape) > 1:
                audio_array = np.mean(audio_array, axis=1)
            if sr != self.sample_rate:
                audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=self.sample_rate)
            min_length = self.sample_rate * 1
            if len(audio_array) < min_length:
                audio_array = np.pad(audio_array, (0, min_length - len(audio_array)))
            mel_spec = librosa.feature.melspectrogram(
                y=audio_array,
                sr=self.sample_rate,
                n_mels=128,
                n_fft=2048,
                hop_length=512,
            )
            mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
            if mel_spec_db.std() > 0:
                mel_spec_db = (mel_spec_db - mel_spec_db.mean()) / mel_spec_db.std()
            if mel_spec_db.shape[1] < 128:
                mel_spec_db = np.pad(mel_spec_db, ((0, 0), (0, 128 - mel_spec_db.shape[1])))
            elif mel_spec_db.shape[1] > 128:
                mel_spec_db = mel_spec_db[:, :128]
            mel_tensor = torch.FloatTensor(mel_spec_db).unsqueeze(0).unsqueeze(0)
            return mel_tensor
        except Exception as e:
            print(f"Mel spectrogram extraction failed: {e}")
            return torch.zeros(1, 1, 128, 128)

    def _rule_based_emotion_detection(self, features: np.ndarray) -> EmotionResponse:
        """Return a randomly chosen emotion with a confidence drawn from [0.6, 0.9).

        `features` is accepted but not consulted: the emotion is picked
        uniformly from every `EmotionType` member.
        """
        import random
        candidates = list(EmotionType)
        picked = random.choice(candidates)
        drawn = np.random.uniform(0.6, 0.9)
        bounded = np.clip(drawn, 0.0, 1.0)
        return EmotionResponse(
            emotion=picked,
            confidence=float(bounded),
            features={"method": "rule_based"},
        )

    async def analyze_emotion(self, audio_data: bytes) -> EmotionResponse:
        try:
            if self.model is not None:
                mel_tensor = self._extract_mel_spectrogram(audio_data)
                with torch.no_grad():
                    outputs = self.model(mel_tensor)
                    probabilities = torch.softmax(outputs, dim=1)
                    predicted_idx = int(torch.argmax(probabilities, dim=1).item())
                    confidence = float(probabilities[0][predicted_idx].item())
                emotion = self.emotion_labels[predicted_idx]
                return EmotionResponse(
                    emotion=emotion,
                    confidence=confidence,
                    features={"method": "deep_learning"},
                )
            else:
                features = self._extract_features(audio_data)
                return self._rule_based_emotion_detection(features)
        except Exception as e:
            print(f"Emotion analysis failed: {e}")
            return EmotionResponse(
                emotion=EmotionType.NEUTRAL,
                confidence=0.5,
                features={"method": "fallback", "error": str(e)},
            )

### Chat Service

In [5]:
class ChatService:
    def __init__(self):
        """Create the OpenAI client, an empty bounded history and the per-emotion prompts."""
        self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.conversation_history: List[ChatMessage] = []
        self.max_history = 10
        neutral_prompt = "The user is emotionally calm now. Please respond with a natural and friendly tone, maintaining the flow of conversation."
        self.emotion_prompts: Dict[EmotionType, str] = {
            EmotionType.HAPPY: "The user is in a good mood now. Please respond with a positive and cheerful tone, and you can share some interesting thoughts or suggestions.",
            EmotionType.SAD: "The user is feeling down now. Please respond with a warm and comforting tone, providing emotional support and encouragement.",
            EmotionType.ANGRY: "The user is emotionally agitated now. Please respond with a calm and understanding tone, helping the user to calm down.",
            EmotionType.FEAR: "The user is feeling scared or anxious now. Please respond with a safe and reassuring tone, providing a sense of security.",
            EmotionType.SURPRISE: "The user is feeling surprised now. Please respond with an equally surprised but positive tone, sharing this excitement.",
            EmotionType.DISGUST: "The user is feeling disgusted now. Please respond with an understanding and sympathetic tone, avoiding aggravating negative emotions.",
            EmotionType.NEUTRAL: neutral_prompt,
            # EmotionService lists CALM among its labels; without an entry the
            # system prompt would carry no tone guidance, so reuse the neutral text.
            EmotionType.CALM: neutral_prompt,
            EmotionType.EXCITED: "The user is very excited now. Please respond with an equally excited and enthusiastic tone, sharing this positive emotion.",
        }

    def _get_emotion_description(self, emotion: EmotionType) -> str:
        """Return the adjective used in prompts for `emotion`; "calm" for unlisted emotions."""
        adjective_by_emotion = {
            EmotionType.HAPPY: "happy",
            EmotionType.SAD: "sad",
            EmotionType.ANGRY: "angry",
            EmotionType.FEAR: "fearful",
            EmotionType.SURPRISE: "surprised",
            EmotionType.DISGUST: "disgusted",
            EmotionType.NEUTRAL: "calm",
            EmotionType.EXCITED: "excited",
        }
        if emotion in adjective_by_emotion:
            return adjective_by_emotion[emotion]
        return "calm"

    def _get_emotion_context(self, emotion: EmotionType, confidence: float) -> str:
        base_prompt = self.emotion_prompts.get(emotion, "")
        confidence_level = "very" if confidence > 0.8 else "quite" if confidence > 0.6 else "slightly"
        return f"Detected that the user is {confidence_level} {self._get_emotion_description(emotion)}. {base_prompt}"

    def _build_system_prompt(self, emotion: EmotionType, confidence: float) -> str:
        emotion_context = self._get_emotion_context(emotion, confidence)
        return (
            """You are an emotionally intelligent AI assistant, specifically designed to provide emotional support and meaningful conversations.

{emotion_context}

Please follow these principles:
1. Adjust your response style and tone based on the user's emotional state
2. Provide sincere and empathetic responses
3. Avoid overly formal or mechanical language
4. Offer emotional support and encouragement when appropriate
5. Maintain naturalness and coherence in conversation
6. Respond in English, unless the user uses another language

Remember: Your goal is to be an understanding and supportive friend, not just an information provider."""
        ).format(emotion_context=emotion_context)

    def _build_conversation_context(self) -> str:
        if not self.conversation_history:
            return ""
        context_lines: List[str] = ["Recent conversation history:"]
        for i, msg in enumerate(self.conversation_history[-5:], 1):
            context_lines.append(f"{i}. User: {msg.text}")
            if msg.emotion:
                context_lines.append(
                    f"   Emotion: {self._get_emotion_description(msg.emotion)} (confidence: {msg.confidence:.2f})"
                )
        return "\n".join(context_lines)

    def _add_to_history(self, text: str, emotion: Optional[EmotionType] = None, confidence: Optional[float] = None) -> None:
        """Append a timestamped message and keep only the newest `max_history` entries."""
        entry = ChatMessage(
            text=text,
            emotion=emotion,
            confidence=confidence,
            timestamp=datetime.now().isoformat(),
        )
        history = self.conversation_history
        history.append(entry)
        overflow = len(history) - self.max_history
        if overflow > 0:
            # Rebind to a trimmed copy holding the most recent entries.
            self.conversation_history = history[-self.max_history:]

    async def generate_response(self, user_text: str, emotion: EmotionType, confidence: float) -> ChatResponse:
        try:
            self._add_to_history(user_text, emotion, confidence)
            system_prompt = self._build_system_prompt(emotion, confidence)
            conversation_context = self._build_conversation_context()
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"{conversation_context}\n\nUser: {user_text}"},
                ],
                max_tokens=200,
                temperature=0.7,
                presence_penalty=0.1,
                frequency_penalty=0.1,
            )
            assistant_message = response.choices[0].message.content.strip()
            if "I'm unable to provide the help" in assistant_message:
                raise Exception("Safety check failed")
            self._add_to_history(assistant_message)
            return ChatResponse(
                message=assistant_message,
                emotion_adapted=True,
                suggested_emotion=emotion,
                confidence=confidence,
            )
        except Exception as e:
            print(f"Failed to generate response: {e}")
            # fallback
            fallback_response = self._generate_fallback_response(user_text, emotion)
            return ChatResponse(
                message=fallback_response,
                emotion_adapted=False,
                confidence=0.5,
            )

    def _generate_fallback_response(self, user_text: str, emotion: EmotionType) -> str:
        """Pick a random canned reply for `emotion`, used when the chat model is unavailable.

        `user_text` is not consulted. CALM shares the NEUTRAL replies; any
        other unlisted emotion gets a single generic sentence.
        """
        import random
        neutral_responses = [
            "I'm here to listen. What would you like to talk about?",
            "Okay, I understand. Is there anything else you'd like to say?",
            "Hmm, I understand what you mean.",
        ]
        fallback_responses: Dict[EmotionType, List[str]] = {
            EmotionType.HAPPY: [
                "It sounds like you're in a good mood! Is there anything happy you'd like to share?",
                "I'm glad to see you so happy! Keep up this good mood!",
                "Your good mood is contagious! What interesting things are happening?",
            ],
            EmotionType.SAD: [
                "I sense you might be feeling a bit down. Would you like to talk? I'm here to listen.",
                "Everyone has low moments, and that's completely normal. Would you like to share with me?",
                "I understand how you're feeling right now. If you need anything, I'm always here to support you.",
            ],
            EmotionType.ANGRY: [
                "I sense you're a bit angry. Take a deep breath and tell me slowly, okay?",
                "Anger is a normal emotion, but we can work together to calm down.",
                "I understand your feelings. Let's find a solution together.",
            ],
            EmotionType.FEAR: [
                "I sense you're a bit scared. It's okay, I'm here with you.",
                "Fear is a natural response. Would you like to tell me what happened?",
                "I'll always be here to support you. You're not alone.",
            ],
            EmotionType.SURPRISE: [
                "Wow! That sounds really surprising! Can you tell me what happened?",
                "That's really unexpected! Your reaction is adorable.",
                "I didn't expect something like this to happen!",
            ],
            EmotionType.DISGUST: [
                "I understand your feelings. Some things are indeed uncomfortable.",
                "Your reaction is normal. When we encounter things we don't like, this is how we feel.",
                "I understand your thoughts. Everyone has their own preferences.",
            ],
            EmotionType.NEUTRAL: neutral_responses,
            # EmotionService lists CALM among its labels; treat it like NEUTRAL here.
            EmotionType.CALM: neutral_responses,
            EmotionType.EXCITED: [
                "Wow! You seem really excited! What good things happened?",
                "Your excitement is contagious! Can you share with me?",
                "That's amazing! Your enthusiasm makes me happy too!",
            ],
        }
        responses = fallback_responses.get(emotion, ["I understand your feelings."])
        return random.choice(responses)

### Voice Service

In [6]:
class VoiceService:
    def __init__(self):
        self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
        self.elevenlabs_base_url = "https://api.elevenlabs.io/v1"
        self.ffmpeg_path = shutil.which('ffmpeg') or '/opt/homebrew/bin/ffmpeg'
        self.voice_settings: Dict[str, float] = {
            "stability": 0.5,
            "similarity_boost": 0.75,
            "style": 0.0,
            "use_speaker_boost": True,
        }
        self.default_voice_id = "ErXwobaYiN019PkySvjV"
        self.emotion_voice_settings: Dict[str, Dict[str, float]] = {
            "happy": {"stability": 0.6, "similarity_boost": 0.8, "style": 0.3},
            "sad": {"stability": 0.7, "similarity_boost": 0.6, "style": -0.2},
            "angry": {"stability": 0.4, "similarity_boost": 0.9, "style": 0.5},
            "fear": {"stability": 0.8, "similarity_boost": 0.5, "style": -0.3},
            "surprise": {"stability": 0.5, "similarity_boost": 0.8, "style": 0.4},
            "disgust": {"stability": 0.6, "similarity_boost": 0.7, "style": -0.1},
            "neutral": {"stability": 0.5, "similarity_boost": 0.75, "style": 0.0},
            "excited": {"stability": 0.4, "similarity_boost": 0.8, "style": 0.4},
        }

    def _convert_audio_format(self, audio_data: bytes, target_format: str = "wav") -> bytes:
        try:
            with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as input_file:
                input_file.write(audio_data)
                input_path = input_file.name
            with tempfile.NamedTemporaryFile(suffix=f".{target_format}", delete=False) as output_file:
                output_path = output_file.name
            cmd = [
                self.ffmpeg_path, "-y",
                "-i", input_path,
                "-acodec", "pcm_s16le",
                "-ar", "16000",
                "-ac", "1",
                "-af", "highpass=f=200,lowpass=f=3000,volume=1.5",
                output_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                with open(output_path, "rb") as f:
                    converted_audio = f.read()
                os.unlink(input_path)
                os.unlink(output_path)
                return converted_audio
            else:
                os.unlink(input_path)
                os.unlink(output_path)
                return audio_data
        except Exception as e:
            print(f"Audio conversion failed: {e}")
            return audio_data

    async def speech_to_text(self, audio_data: bytes) -> str:
        try:
            wav_audio = self._convert_audio_format(audio_data, "wav")
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
                temp_file.write(wav_audio)
                temp_file_path = temp_file.name
            try:
                with open(temp_file_path, "rb") as audio_file:
                    transcript = self.openai_client.audio.transcriptions.create(
                        model="whisper-1",
                        file=audio_file,
                       # language="zh",
                        response_format="text",
                        temperature=0.0,
                        prompt="This is a conversation in Chinese and English.",
                    )
            finally:
                try:
                    os.unlink(temp_file_path)
                except Exception:
                    pass
            return transcript.strip()
        except Exception as e:
            print(f"Speech to text failed: {e}")
            return "Sorry, I didn't catch that. Could you please repeat?"

    async def text_to_speech(self, text: str, emotion: Optional[str] = None) -> bytes:
        try:
            if not self.elevenlabs_api_key:
                return self._fallback_tts(text)
            voice_settings = dict(self.voice_settings)
            print(emotion)
            if emotion and emotion in self.emotion_voice_settings:
                voice_settings.update(self.emotion_voice_settings[emotion])
            url = f"{self.elevenlabs_base_url}/text-to-speech/{self.default_voice_id}"
            print(voice_settings)
            headers = {
                "Accept": "audio/mpeg",
                "Content-Type": "application/json",
                "xi-api-key": self.elevenlabs_api_key,
            }
            data = {
                "text": text,
                "model_id": "eleven_multilingual_v2",
                "voice_settings": voice_settings,
            }
            resp = requests.post(url, json=data, headers=headers)
            print(resp)
            if resp.status_code == 200:
                return resp.content
            else:
                print(f"ElevenLabs API error: {resp.status_code}")
                return self._fallback_tts(text)
        except Exception as e:
            print(f"Text to speech failed: {e}")
            return self._fallback_tts(text)

    def _fallback_tts(self, text: str) -> bytes:
        try:
            sample_rate = 22050
            duration = 1.0
            samples = int(sample_rate * duration)
            t = np.linspace(0, duration, samples, False)
            audio_data = np.sin(2 * np.pi * 440 * t) * 0.1
            with io.BytesIO() as audio_buffer:
                sf.write(audio_buffer, audio_data, sample_rate, format='WAV')
                return audio_buffer.getvalue()
        except Exception as e:
            print(f"Fallback TTS failed: {e}")
            return b""

    def get_available_voices(self) -> list:
        try:
            if not self.elevenlabs_api_key:
                return []
            url = f"{self.elevenlabs_base_url}/voices"
            headers = {"xi-api-key": self.elevenlabs_api_key}
            resp = requests.get(url, headers=headers)
            if resp.status_code == 200:
                voices = resp.json().get("voices", [])
                return [
                    {"id": v.get("voice_id"), "name": v.get("name"), "language": v.get("labels", {}).get("language", "unknown")}
                    for v in voices
                ]
            return []
        except Exception as e:
            print(f"Failed to get voice list: {e}")
            return []

    def set_voice(self, voice_id: str) -> None:
        self.default_voice_id = voice_id

    def set_voice_settings(self, settings: dict) -> None:
        self.voice_settings.update(settings)

    def get_audio_duration(self, audio_data: bytes) -> float:
        try:
            with io.BytesIO(audio_data) as audio_buffer:
                audio_array, sample_rate = sf.read(audio_buffer)
                return float(len(audio_array)) / float(sample_rate)
        except Exception as e:
            print(f"Failed to get audio duration: {e}")
            return 0.0

    def convert_audio_format(self, audio_data: bytes, target_format: str = "wav") -> bytes:
        try:
            with io.BytesIO(audio_data) as input_buffer:
                audio_array, sample_rate = sf.read(input_buffer)
                with io.BytesIO() as output_buffer:
                    sf.write(output_buffer, audio_array, sample_rate, format=target_format.upper())
                    return output_buffer.getvalue()
        except Exception as e:
            print(f"Audio format conversion failed: {e}")
            return audio_data

    def normalize_audio(self, audio_data: bytes) -> bytes:
        try:
            with io.BytesIO(audio_data) as audio_buffer:
                audio_array, sample_rate = sf.read(audio_buffer)
                if len(audio_array) > 0:
                    max_val = np.max(np.abs(audio_array))
                    if max_val > 0:
                        audio_array = audio_array / max_val * 0.8
                with io.BytesIO() as output_buffer:
                    sf.write(output_buffer, audio_array, sample_rate)
                    return output_buffer.getvalue()
        except Exception as e:
            print(f"Audio normalization failed: {e}")
            return audio_data


### Initialize service instances

In [41]:
class EmoChatBotService:
    """Ties emotion detection, transcription, chat and speech synthesis into one pipeline."""

    def __init__(self):
        self.emotion_service = EmotionService()
        self.chat_service = ChatService()
        self.voice_service = VoiceService()
        print('Service is ready')

    async def run_pipeline(self, audio_bytes: bytes):
        """Run one voice turn: detect emotion, transcribe, generate a reply, speak it.

        Progress and results are printed, and the synthesised reply is played
        inline in the notebook. Nothing is returned.
        """
        print('audio received, running pipeline...')
        emotion_result = await self.emotion_service.analyze_emotion(audio_bytes)
        print('Emotion:', emotion_result.emotion, 'confidence=', round(emotion_result.confidence, 3))

        text = await self.voice_service.speech_to_text(audio_bytes)
        print('ASR Text:', text)

        reply = await self.chat_service.generate_response(text, emotion_result.emotion, emotion_result.confidence)
        print('Assistant:', reply.message)

        tts_audio = await self.voice_service.text_to_speech(reply.message, emotion_result.emotion.value)
        if not tts_audio:
            # The TTS fallback can return empty bytes; there is nothing to play then.
            print('TTS bytes:', 0)
            return
        # WAV data starts with "RIFF"; anything else is treated as MP3.
        ext = "wav" if tts_audio[:4] == b"RIFF" else "mp3"
        with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as f:
            f.write(tts_audio)
            path = f.name
        try:
            # Audio() reads and embeds the file when it is constructed, so the
            # temp file can be removed right after display.
            display(Audio(path, autoplay=True))
        finally:
            try:
                os.unlink(path)
            except OSError:
                pass
        print('TTS bytes:', len(tts_audio))

In [43]:
nest_asyncio.apply()
chatbot=EmoChatBotService()
file=open("C:/Users/lwt/Documents/录音/录音.mp3", 'rb')
await chatbot.run_pipeline(file.read())

Service is ready
audio received, running pipeline...
Emotion: EmotionType.ANGRY confidence= 0.128
ASR Text: Hi, how are you today? I'm very happy right now.
Assistant: Hey there! I'm here and ready to chat as always. It sounds like you're experiencing a mix of emotions right now. It's completely normal to have multiple feelings at once. Do you want to talk about what's going on? I'm here to listen.
angry
{'stability': 0.4, 'similarity_boost': 0.9, 'style': 0.5, 'use_speaker_boost': True}
<Response [200]>


TTS bytes: 237027


### Model Training

In [34]:
# Build a service instance; its constructor also tries to load existing weights.
emo=EmotionService()
# Called with its defaults: audio is read from the 'data' directory, batch size 32.
train_loader, test_loader = emo._load_data()
# Trains, prints per-epoch loss, saves the weights to backend/app/models/emotion_model.pth
# and, as the cell's last expression, displays the returned (test_loss, accuracy).
emo._train_model(train_loader, test_loader)



03-01-02-01-01-02-01.wav
03-01-02-01-02-02-05.wav
03-01-03-01-02-01-20.wav
03-01-06-01-01-02-20.wav
03-01-08-01-02-02-01.wav
1435
Epoch 1, Train Loss: 5.1296
Epoch 2, Train Loss: 2.0679
Epoch 3, Train Loss: 2.0615
Epoch 4, Train Loss: 2.0466
Epoch 5, Train Loss: 1.9806
Epoch 6, Train Loss: 1.9694
Epoch 7, Train Loss: 1.9058
Epoch 8, Train Loss: 1.8452
Epoch 9, Train Loss: 1.8347
Epoch 10, Train Loss: 1.8154
Epoch 11, Train Loss: 1.7654
Epoch 12, Train Loss: 1.7721
Epoch 13, Train Loss: 1.7447
Epoch 14, Train Loss: 1.7610
Epoch 15, Train Loss: 1.6860
Epoch 16, Train Loss: 1.6736
Epoch 17, Train Loss: 1.6606
Epoch 18, Train Loss: 1.5855
Epoch 19, Train Loss: 1.5482
Epoch 20, Train Loss: 1.4927
Epoch 21, Train Loss: 1.5353
Epoch 22, Train Loss: 1.5040
Epoch 23, Train Loss: 1.4251
Epoch 24, Train Loss: 1.3854
Epoch 25, Train Loss: 1.3886
Epoch 26, Train Loss: 1.2922
Epoch 27, Train Loss: 1.2614
Epoch 28, Train Loss: 1.2337
Epoch 29, Train Loss: 1.1923
Epoch 30, Train Loss: 1.1984
Epoch 31,

(6.322009351518419, 39.37282229965157)