In [4]:
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
import librosa
import cv2
from transformers import AutoTokenizer, AutoModel 
from sklearn.preprocessing import LabelEncoder
from collections import Counter
from typing import Dict, List, Tuple

class MELDDataPreprocessor:
    """Handles data preprocessing and balancing for the MELD dataset"""
    
    def __init__(self, random_state: int = 42):
        self.random_state = random_state
        self.label_encoder = LabelEncoder()
        
    def load_meld_data(self, data_path: str) -> pd.DataFrame:
        """Load MELD dataset and perform initial preprocessing"""
        data_path="data_set"
        df = pd.read_csv(data_path)
        df['sentiment_encoded'] = self.label_encoder.fit_transform(df['sentiment'])
        return df
    
    def random_undersample(self, df: pd.DataFrame) -> pd.DataFrame:
        """Perform random undersampling to balance the dataset"""
        class_counts = Counter(df['sentiment_encoded'])
        min_class_count = min(class_counts.values())
        
        balanced_dfs = []
        for class_label in class_counts.keys():
            class_df = df[df['sentiment_encoded'] == class_label]
            if len(class_df) > min_class_count:
                class_df = class_df.sample(n=min_class_count, 
                                         random_state=self.random_state)
            balanced_dfs.append(class_df)
        
        return pd.concat(balanced_dfs, axis=0).reset_index(drop=True)

class MELDFeatureExtractor:
    """Extracts features from multimodal MELD data"""
    
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
        
    def extract_audio_features(self, audio_path: str) -> np.ndarray:
        """Extract MFCC and other audio features"""
        y, sr = librosa.load(audio_path)
        features = []
        
        # MFCC features
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
        features.append(mfcc)
        
        # Spectral features
        spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
        features.append(spectral_centroids)
        
        # Chromagram
        chroma = librosa.feature.chroma_stft(y=y, sr=sr)
        features.append(chroma)
        
        return np.concatenate(features, axis=0)
    
    def extract_video_features(self, video_path: str) -> np.ndarray:
        """Extract visual features including facial expressions"""
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        
        cap = cv2.VideoCapture(video_path)
        frames_features = []
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
                
            # Detect faces
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)
            
            for (x, y, w, h) in faces:
                face_roi = frame[y:y+h, x:x+w]
                # Extract face embeddings or features here
                # For now, we'll use basic statistics of the face region
                face_features = cv2.resize(face_roi, (64, 64)).flatten()
                frames_features.append(face_features)
        
        cap.release()
        return np.array(frames_features)

class MultimodalTransformer(nn.Module):
    """Enhanced Transformer model for multimodal sentiment analysis"""
    
    def __init__(self, num_classes: int, dropout_rate: float = 0.3):
        super().__init__()
        
        # Text encoder
        self.text_encoder = AutoModel.from_pretrained('bert-base-uncased')
        
        # Audio encoder with attention
        self.audio_encoder = nn.Sequential(
            nn.Conv1d(20, 64, kernel_size=3),  # Increased input channels for more features
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(dropout_rate),
            nn.Conv1d(64, 128, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.AdaptiveAvgPool1d(1)
        )
        
        # Video encoder with 3D convolutions
        self.video_encoder = nn.Sequential(
            nn.Conv3d(3, 64, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm3d(64),
            nn.Dropout(dropout_rate),
            nn.Conv3d(64, 128, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm3d(128),
            nn.AdaptiveAvgPool3d(1)
        )
        
        # Cross-modal attention
        self.cross_attention = nn.MultiheadAttention(
            embed_dim=128,
            num_heads=8,
            dropout=dropout_rate
        )
        
        # Final classification
        self.classifier = nn.Sequential(
            nn.Linear(768 + 256, 512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(512, num_classes)
        )
        
    def forward(self, text_ids, text_mask, audio_features, video_features):
        # Process text
        text_output = self.text_encoder(text_ids, attention_mask=text_mask)
        text_embeddings = text_output.last_hidden_state[:, 0, :]
        
        # Process audio and video
        audio_embeddings = self.audio_encoder(audio_features)
        video_embeddings = self.video_encoder(video_features)
        
        # Cross-modal attention between audio and video
        av_features, _ = self.cross_attention(
            audio_embeddings, 
            video_embeddings, 
            video_embeddings
        )
        
        # Concatenate all features
        combined = torch.cat([text_embeddings, av_features], dim=1)
        
        return self.classifier(combined)

class SentimentTrainer:
    """Handles model training and evaluation"""
    
    def __init__(self, model: nn.Module, device: str = 'cuda'):
        self.model = model.to(device)
        self.device = device
        
        # Use weighted cross entropy for class imbalance
        class_weights = torch.FloatTensor([1.0, 1.0, 1.0]).to(device)  # Adjust based on class distribution
        self.criterion = nn.CrossEntropyLoss(weight=class_weights)
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
        
    def train_epoch(self, train_loader: DataLoader) -> float:
        self.model.train()
        total_loss = 0
        
        for batch in train_loader:
            self.optimizer.zero_grad()
            
            # Move batch to device
            text_ids = batch['text_ids'].to(self.device)
            text_mask = batch['text_mask'].to(self.device)
            audio_features = batch['audio_features'].to(self.device)
            video_features = batch['video_features'].to(self.device)
            labels = batch['label'].to(self.device)
            
            outputs = self.model(text_ids, text_mask, audio_features, video_features)
            loss = self.criterion(outputs, labels)
            
            loss.backward()
            self.optimizer.step()
            
            total_loss += loss.item()
            
        return total_loss / len(train_loader)
    
    def evaluate(self, val_loader: DataLoader) -> Dict[str, float]:
        self.model.eval()
        correct = 0
        total = 0
        val_loss = 0
        
        with torch.no_grad():
            for batch in val_loader:
                text_ids = batch['text_ids'].to(self.device)
                text_mask = batch['text_mask'].to(self.device)
                audio_features = batch['audio_features'].to(self.device)
                video_features = batch['video_features'].to(self.device)
                labels = batch['label'].to(self.device)
                
                outputs = self.model(text_ids, text_mask, audio_features, video_features)
                loss = self.criterion(outputs, labels)
                
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                val_loss += loss.item()
        
        return {
            'accuracy': correct / total,
            'val_loss': val_loss / len(val_loader)
        }

#print("Accuracy:", accuracy)
# print("Validation loss:", val_loss)    


In [9]:
# Print column names
print(train_df.columns)

# Print first few rows of data
print(train_df.head())

Index(['Sr No.', 'Utterance', 'Speaker', 'Emotion', 'Sentiment', 'Dialogue_ID',
       'Utterance_ID', 'Season', 'Episode', 'StartTime', 'EndTime'],
      dtype='object')
   Sr No.                                          Utterance          Speaker  \
0       1  also I was the point person on my company’s tr...         Chandler   
1       2                   You must’ve had your hands full.  The Interviewer   
2       3                            That I did. That I did.         Chandler   
3       4      So let’s talk a little bit about your duties.  The Interviewer   
4       5                             My duties?  All right.         Chandler   

    Emotion Sentiment  Dialogue_ID  Utterance_ID  Season  Episode  \
0   neutral   neutral            0             0       8       21   
1   neutral   neutral            0             1       8       21   
2   neutral   neutral            0             2       8       21   
3   neutral   neutral            0             3       8       21 

In [11]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
import pandas as pd

class SentimentEmotionDataset(Dataset):
    def __init__(self, dataframe, max_length=128):
        self.data = dataframe
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.max_length = max_length
        
        # Create label mappings
        self.emotion_map = {label: idx for idx, label in enumerate(dataframe['Emotion'].unique())}
        self.sentiment_map = {label: idx for idx, label in enumerate(dataframe['Sentiment'].unique())}
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        text = str(row['Utterance'])
        emotion = self.emotion_map[row['Emotion']]
        sentiment = self.sentiment_map[row['Sentiment']]
        
        # Tokenize the text
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'emotion': torch.tensor(emotion, dtype=torch.long),
            'sentiment': torch.tensor(sentiment, dtype=torch.long)
        }

# Load your data
train_df = pd.read_csv('data_set/train_sent_emo.csv')
val_df = pd.read_csv('data_set/dev_sent_emo.csv')  # if you have this

# Create dataset instances
train_dataset = SentimentEmotionDataset(train_df)
val_dataset = SentimentEmotionDataset(val_df)  # if you have validation data

# Create data loaders
train_loader = DataLoader(
    train_dataset,
    batch_size=16,  # adjust based on your GPU memory
    shuffle=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=16,
    shuffle=False
)

# Print some information about the data
print(f"Number of training samples: {len(train_dataset)}")
print(f"Emotion classes: {train_dataset.emotion_map}")
print(f"Sentiment classes: {train_dataset.sentiment_map}")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Number of training samples: 9989
Emotion classes: {'neutral': 0, 'surprise': 1, 'fear': 2, 'sadness': 3, 'joy': 4, 'disgust': 5, 'anger': 6}
Sentiment classes: {'neutral': 0, 'positive': 1, 'negative': 2}


In [13]:
optimizer = AccuracyOptimizer(model)
history = optimizer.train_with_validation(train_loader, val_loader)
test_metrics = optimizer.evaluate_test_set(test_loader)

NameError: name 'AccuracyOptimizer' is not defined

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer
import numpy as np
from sklearn.metrics import f1_score
from torch.optim.lr_scheduler import ReduceLROnPlateau

# 1. Modified model for text-only sentiment and emotion classification
class SentimentEmotionModel(nn.Module):
    def __init__(self, num_emotions, num_sentiments):
        super().__init__()
        
        # Text encoder (BERT)
        self.text_encoder = AutoModel.from_pretrained('bert-base-uncased')
        
        # Separate classification heads for emotion and sentiment
        self.emotion_classifier = nn.Sequential(
            nn.Linear(768, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_emotions)
        )
        
        self.sentiment_classifier = nn.Sequential(
            nn.Linear(768, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_sentiments)
        )
    
    def forward(self, input_ids, attention_mask):
        # Process text
        outputs = self.text_encoder(input_ids, attention_mask=attention_mask)
        embeddings = outputs.last_hidden_state[:, 0, :]  # Get CLS token
        
        # Generate predictions
        emotion_output = self.emotion_classifier(embeddings)
        sentiment_output = self.sentiment_classifier(embeddings)
        
        return emotion_output, sentiment_output

# 2. Dataset class for your data
class SentimentEmotionDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=128):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length
        
        # Create label mappings
        self.emotion_map = {label: idx for idx, label in enumerate(dataframe['Emotion'].unique())}
        self.sentiment_map = {label: idx for idx, label in enumerate(dataframe['Sentiment'].unique())}
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        text = str(row['Utterance'])
        
        # Tokenize text
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'emotion': torch.tensor(self.emotion_map[row['Emotion']]),
            'sentiment': torch.tensor(self.sentiment_map[row['Sentiment']])
        }

# 3. Modified AccuracyOptimizer for dual task learning
class AccuracyOptimizer:
    def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.model = model.to(device)
        self.device = device
        
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=2e-5,
            weight_decay=0.01
        )
        
        self.scheduler = ReduceLROnPlateau(
            self.optimizer,
            mode='max',
            factor=0.5,
            patience=2,
            verbose=True
        )
        
        self.criterion = nn.CrossEntropyLoss()
    
    def train_epoch(self, train_loader):
        self.model.train()
        total_loss = 0
        
        for batch in train_loader:
            self.optimizer.zero_grad()
            
            # Move data to device
            input_ids = batch['input_ids'].to(self.device)
            attention_mask = batch['attention_mask'].to(self.device)
            emotion_labels = batch['emotion'].to(self.device)
            sentiment_labels = batch['sentiment'].to(self.device)
            
            # Forward pass
            emotion_outputs, sentiment_outputs = self.model(input_ids, attention_mask)
            
            # Calculate losses
            emotion_loss = self.criterion(emotion_outputs, emotion_labels)
            sentiment_loss = self.criterion(sentiment_outputs, sentiment_labels)
            
            # Combined loss
            loss = emotion_loss + sentiment_loss
            
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            
            total_loss += loss.item()
        
        return total_loss / len(train_loader)

    def validate(self, val_loader):
        self.model.eval()
        total_loss = 0
        emotion_preds, sentiment_preds = [], []
        emotion_labels, sentiment_labels = [], []
        
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                batch_emotion_labels = batch['emotion'].to(self.device)
                batch_sentiment_labels = batch['sentiment'].to(self.device)
                
                emotion_outputs, sentiment_outputs = self.model(input_ids, attention_mask)
                
                # Calculate losses
                emotion_loss = self.criterion(emotion_outputs, batch_emotion_labels)
                sentiment_loss = self.criterion(sentiment_outputs, batch_sentiment_labels)
                loss = emotion_loss + sentiment_loss
                
                total_loss += loss.item()
                
                # Get predictions
                _, emotion_pred = torch.max(emotion_outputs, 1)
                _, sentiment_pred = torch.max(sentiment_outputs, 1)
                
                emotion_preds.extend(emotion_pred.cpu().numpy())
                sentiment_preds.extend(sentiment_pred.cpu().numpy())
                emotion_labels.extend(batch_emotion_labels.cpu().numpy())
                sentiment_labels.extend(batch_sentiment_labels.cpu().numpy())
        
        # Calculate metrics
        emotion_f1 = f1_score(emotion_labels, emotion_preds, average='weighted')
        sentiment_f1 = f1_score(sentiment_labels, sentiment_preds, average='weighted')
        
        return {
            'val_loss': total_loss / len(val_loader),
            'emotion_f1': emotion_f1,
            'sentiment_f1': sentiment_f1
        }

# 4. Training setup
def train_model(train_df, val_df, num_epochs=10):
    # Initialize tokenizer
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    
    # Create datasets
    train_dataset = SentimentEmotionDataset(train_df, tokenizer)
    val_dataset = SentimentEmotionDataset(val_df, tokenizer)
    
    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
    
    # Initialize model
    num_emotions = len(train_dataset.emotion_map)
    num_sentiments = len(train_dataset.sentiment_map)
    model = SentimentEmotionModel(num_emotions, num_sentiments)
    
    # Initialize optimizer
    optimizer = AccuracyOptimizer(model)
    
    # Training loop
    best_f1 = 0
    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        
        # Train for one epoch
        train_loss = optimizer.train_epoch(train_loader)
        print(f"Training Loss: {train_loss:.4f}")
        
        # Evaluate
        metrics = optimizer.validate(val_loader)
        print(f"Validation Loss: {metrics['val_loss']:.4f}")
        print(f"Emotion F1: {metrics['emotion_f1']:.4f}")
        print(f"Sentiment F1: {metrics['sentiment_f1']:.4f}")
        print("-" * 50)
        
        # Save best model
        avg_f1 = (metrics['emotion_f1'] + metrics['sentiment_f1']) / 2
        if avg_f1 > best_f1:
            best_f1 = avg_f1
            torch.save(model.state_dict(), 'best_model.pth')
    
    return model

In [18]:
! pip install torch transformers sklearn numpy

Collecting sklearn
  Downloading sklearn-0.0.post12.tar.gz (2.6 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'error'


  error: subprocess-exited-with-error
  
  × python setup.py egg_info did not run successfully.
  │ exit code: 1
  ╰─> [15 lines of output]
      The 'sklearn' PyPI package is deprecated, use 'scikit-learn'
      rather than 'sklearn' for pip commands.
      
      Here is how to fix this error in the main use cases:
      - use 'pip install scikit-learn' rather than 'pip install sklearn'
      - replace 'sklearn' by 'scikit-learn' in your pip requirements files
        (requirements.txt, setup.py, setup.cfg, Pipfile, etc ...)
      - if the 'sklearn' package is used by one of your dependencies,
        it would be great if you take some time to track which package uses
        'sklearn' instead of 'scikit-learn' and report it to their issue tracker
      - as a last resort, set the environment variable
        SKLEARN_ALLOW_DEPRECATED_SKLEARN_PACKAGE_INSTALL=True to avoid this error
      
      More information is available at
      https://github.com/scikit-learn/sklearn-pypi-packag

In [10]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import numpy as np
import librosa
import cv2
from transformers import AutoTokenizer
from typing import Dict, List, Tuple
import os

class MELDDataset(Dataset):
    def __init__(self, data_path: str, video_dir: str, audio_dir: str, max_length: int = 128):
        """
        Initialize MELD dataset
        
        Args:
            data_path: Path to the CSV file containing MELD annotations
            video_dir: Directory containing video files
            audio_dir: Directory containing audio files
            max_length: Maximum length for text tokenization
        """
        self.data = pd.read_csv(data_set/train_sent_emo.csv)
        self.video_dir = video_dir
        self.audio_dir = audio_dir
        self.max_length = max_length
        
        # Initialize tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
        
        # Map sentiment labels to integers
        self.sentiment_map = {
            'neutral': 0,
            'positive': 1,
            'negative': 2
        }
        
    def __len__(self) -> int:
        return len(self.data)
    
    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        row = self.data.iloc[idx]
        
        # Process text
        text_encoding = self._process_text(row['Utterance'])
        
        # Process audio
        audio_features = self._process_audio(os.path.join(self.audio_dir, row['Audio_ID']))
        
        # Process video
        video_features = self._process_video(os.path.join(self.video_dir, row['Video_ID']))
        
        # Get label
        label = self.sentiment_map[row['Sentiment']]
        
        return {
            'text_ids': text_encoding['input_ids'],
            'text_mask': text_encoding['attention_mask'],
            'audio_features': audio_features,
            'video_features': video_features,
            'label': torch.tensor(label, dtype=torch.long)
        }
    
    def _process_text(self, text: str) -> Dict[str, torch.Tensor]:
        """Tokenize text and convert to tensor"""
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze()
        }
    
    def _process_audio(self, audio_path: str) -> torch.Tensor:
        """Extract audio features using librosa"""
        try:
            # Load audio file
            y, sr = librosa.load(audio_path, duration=10)  # Load first 10 seconds
            
            # Extract features
            mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            mfcc_delta = librosa.feature.delta(mfcc)
            mfcc_delta2 = librosa.feature.delta(mfcc, order=2)
            
            # Combine features
            features = np.concatenate([mfcc, mfcc_delta, mfcc_delta2], axis=0)
            
            # Pad or truncate to fixed length
            target_length = 1000
            if features.shape[1] < target_length:
                features = np.pad(features, ((0, 0), (0, target_length - features.shape[1])))
            else:
                features = features[:, :target_length]
            
            return torch.FloatTensor(features)
            
        except Exception as e:
            print(f"Error processing audio file {audio_path}: {str(e)}")
            return torch.zeros((39, 1000))  # Return zero tensor with expected shape
    
    def _process_video(self, video_path: str) -> torch.Tensor:
        """Extract video features using OpenCV"""
        try:
            cap = cv2.VideoCapture(video_path)
            frames = []
            max_frames = 30  # Extract features from first 30 frames
            
            while len(frames) < max_frames and cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                    
                # Resize frame
                frame = cv2.resize(frame, (112, 112))
                
                # Convert to RGB
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                
                # Normalize
                frame = frame / 255.0
                
                frames.append(frame)
            
            cap.release()
            
            # Pad if necessary
            while len(frames) < max_frames:
                frames.append(np.zeros((112, 112, 3)))
            
            # Convert to tensor
            frames_tensor = torch.FloatTensor(np.array(frames))
            
            # Reshape to [C, T, H, W]
            frames_tensor = frames_tensor.permute(3, 0, 1, 2)
            
            return frames_tensor
            
        except Exception as e:
            print(f"Error processing video file {video_path}: {str(e)}")
            return torch.zeros((3, 30, 112, 112))  # Return zero tensor with expected shape

def create_data_loaders(
    data_path: str,
    video_dir: str,
    audio_dir: str,
    batch_size: int = 32,
    train_ratio: float = 0.7,
    val_ratio: float = 0.15,
    num_workers: int = 4
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """
    Create train, validation, and test data loaders
    
    Args:
        data_path: Path to the CSV file containing MELD annotations
        video_dir: Directory containing video files
        audio_dir: Directory containing audio files
        batch_size: Batch size for data loaders
        train_ratio: Proportion of data to use for training
        val_ratio: Proportion of data to use for validation
        num_workers: Number of worker processes for data loading
    
    Returns:
        Tuple of (train_loader, val_loader, test_loader)
    """
    # Create dataset
    dataset = MELDDataset(data_path, video_dir, audio_dir)
    
    # Calculate split sizes
    total_size = len(dataset)
    train_size = int(train_ratio * total_size)
    val_size = int(val_ratio * total_size)
    test_size = total_size - train_size - val_size
    
    # Split dataset
    train_dataset, val_dataset, test_dataset = random_split(
        dataset, 
        [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(42)
    )
    
    # Create data loaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    
    return train_loader, val_loader, test_loader

In [11]:

# Create your data loaders first (train_loader, val_loader, test_loader)
# Then:

# Initialize model and optimizer
model = MultimodalSentimentModel(num_classes=3)     
#model = MultimodalTransformer(num_classes=3) 
optimizer = AccuracyOptimizer(model)

# Train the model
history = optimizer.train_with_validation(train_loader, val_loader)

# Get test metrics
test_metrics = optimizer.evaluate_test_set(test_loader)

NameError: name 'train_loader' is not defined

In [3]:
!pip install ipywidgets
#conda install -c conda-forge ipywidgets

Collecting ipywidgets
  Using cached ipywidgets-8.1.5-py3-none-any.whl.metadata (2.3 kB)
Collecting widgetsnbextension~=4.0.12 (from ipywidgets)
  Using cached widgetsnbextension-4.0.13-py3-none-any.whl.metadata (1.6 kB)
Collecting jupyterlab-widgets~=3.0.12 (from ipywidgets)
  Using cached jupyterlab_widgets-3.0.13-py3-none-any.whl.metadata (4.1 kB)
Using cached ipywidgets-8.1.5-py3-none-any.whl (139 kB)
Using cached jupyterlab_widgets-3.0.13-py3-none-any.whl (214 kB)
Using cached widgetsnbextension-4.0.13-py3-none-any.whl (2.3 MB)
Installing collected packages: widgetsnbextension, jupyterlab-widgets, ipywidgets
Successfully installed ipywidgets-8.1.5 jupyterlab-widgets-3.0.13 widgetsnbextension-4.0.13


In [12]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report, f1_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
from typing import Dict, List, Tuple

class AccuracyOptimizer:
    def __init__(self, model: nn.Module, device: str = 'cuda'):
        self.model = model.to(device)
        self.device = device
        
        # Initialize optimizer with weight decay
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=2e-5,
            weight_decay=0.01  # L2 regularization
        )
        
        # Learning rate scheduler
        self.scheduler = ReduceLROnPlateau(
            self.optimizer,
            mode='max',
            factor=0.5,
            patience=2,
            verbose=True
        )
        
        # Focal Loss for better handling of class imbalance
        self.criterion = FocalLoss(gamma=2.0)
        
    def train_with_validation(
        self,
        train_loader: torch.utils.data.DataLoader,
        val_loader: torch.utils.data.DataLoader,
        num_epochs: int = 10
    ) -> Dict[str, List[float]]:
        history = {
            'train_loss': [],
            'val_loss': [],
            'val_accuracy': [],
            'val_f1': []
        }
        
        best_f1 = 0.0
        patience_counter = 0
        
        for epoch in range(num_epochs):
            # Training phase
            train_loss = self._train_epoch(train_loader)
            history['train_loss'].append(train_loss)
            
            # Validation phase
            metrics = self._validate(val_loader)
            history['val_loss'].append(metrics['val_loss'])
            history['val_accuracy'].append(metrics['accuracy'])
            history['val_f1'].append(metrics['f1_score'])
            
            # Learning rate scheduling
            self.scheduler.step(metrics['f1_score'])
            
            # Early stopping check
            if metrics['f1_score'] > best_f1:
                best_f1 = metrics['f1_score']
                patience_counter = 0
                # Save best model
                torch.save(self.model.state_dict(), 'best_model.pth')
            else:
                patience_counter += 1
                if patience_counter >= 5:  # Early stopping patience
                    print("Early stopping triggered")
                    break
                    
            print(f"Epoch {epoch+1}/{num_epochs}")
            print(f"Train Loss: {train_loss:.4f}")
            print(f"Val Loss: {metrics['val_loss']:.4f}")
            print(f"Val Accuracy: {metrics['accuracy']:.4f}")
            print(f"Val F1 Score: {metrics['f1_score']:.4f}")
            print("--------------------")
            
        return history
    
    def _train_epoch(self, train_loader) -> float:
        self.model.train()
        total_loss = 0
        
        for batch in train_loader:
            self.optimizer.zero_grad()
            
            outputs = self.model(
                batch['text_ids'].to(self.device),
                batch['text_mask'].to(self.device),
                batch['audio_features'].to(self.device),
                batch['video_features'].to(self.device)
            )
            
            loss = self.criterion(outputs, batch['label'].to(self.device))
            loss.backward()
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            total_loss += loss.item()
            
        return total_loss / len(train_loader)
    
    def _validate(self, val_loader) -> Dict[str, float]:
        self.model.eval()
        val_loss = 0
        all_preds = []
        all_labels = []
        
        with torch.no_grad():
            for batch in val_loader:
                outputs = self.model(
                    batch['text_ids'].to(self.device),
                    batch['text_mask'].to(self.device),
                    batch['audio_features'].to(self.device),
                    batch['video_features'].to(self.device)
                )
                
                labels = batch['label'].to(self.device)
                loss = self.criterion(outputs, labels)
                val_loss += loss.item()
                
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
        
        # Calculate metrics
        accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
        f1 = f1_score(all_labels, all_preds, average='weighted')
        
        return {
            'val_loss': val_loss / len(val_loader),
            'accuracy': accuracy,
            'f1_score': f1
        }
    
    def evaluate_test_set(self, test_loader) -> Dict[str, float]:
        self.model.eval()
        all_preds = []
        all_labels = []
        
        with torch.no_grad():
            for batch in test_loader:
                outputs = self.model(
                    batch['text_ids'].to(self.device),
                    batch['text_mask'].to(self.device),
                    batch['audio_features'].to(self.device),
                    batch['video_features'].to(self.device)
                )
                
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch['label'].cpu().numpy())
        
        # Generate detailed metrics
        conf_matrix = confusion_matrix(all_labels, all_preds)
        class_report = classification_report(all_labels, all_preds, output_dict=True)
        
        return {
            'confusion_matrix': conf_matrix,
            'classification_report': class_report,
            'accuracy': class_report['accuracy'],
            'macro_f1': class_report['macro avg']['f1-score']
        }

class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0):
        super().__init__()
        self.gamma = gamma
        
    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss(reduction='none')(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = ((1 - pt) ** self.gamma * ce_loss).mean()
        return focal_loss

def get_optimization_suggestions(metrics: Dict[str, float]) -> List[str]:
    suggestions = []
    
    if metrics['accuracy'] < 0.7:
        suggestions.extend([
            "Consider increasing model complexity",
            "Try different learning rates",
            "Add more regularization"
        ])
    
    if metrics['macro_f1'] < 0.6:
        suggestions.extend([
            "Check class distribution",
            "Adjust class weights",
            "Try data augmentation"
        ])
    
    return suggestions