In [27]:
import os
import torch
import torchvision
import torchvision.models as models  # Add this line
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import cv2
import json

In [28]:
# Directory containing the videos
video_dir = 'data/mmsd_raw_data/utterances_final/'

# Get a list of all .mp4 files in the directory.
# os.listdir() returns entries in arbitrary, filesystem-dependent order, so the
# list is sorted: dataset indices (and therefore any seeded random split built
# on top of them) are then the same on every machine and every run.
video_files = sorted(
    os.path.join(video_dir, f)
    for f in os.listdir(video_dir)
    if f.endswith('.mp4')
)

# Function to decode a video into a tensor of RGB frames for the 3D-CNN backbone
def extract_video_features(video_path):
    """Decode every frame of a video into one float32 tensor.

    Each decoded frame is resized to 224x224 and converted from OpenCV's BGR
    channel order to RGB. Pixel values are only cast to float32; no scaling or
    normalisation is applied here.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.

    Returns:
        torch.Tensor of dtype float32 laid out as
        (time, channels, height, width), i.e. (num_frames, 3, 224, 224).

    Raises:
        RuntimeError: If not a single frame could be decoded (missing file,
            unsupported codec, empty video).
    """
    import numpy as np  # local import: numpy is not imported at file level

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (224, 224))  # Resize frame to the network input size
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
            frames.append(frame)
    finally:
        # Always free the decoder handle, even if resize/cvtColor raised.
        cap.release()

    if not frames:
        # Without this guard the permute() below fails with an unrelated
        # "number of dims" message that hides the real cause.
        raise RuntimeError(f"No frames could be decoded from video: {video_path!r}")

    # Stack into one contiguous (T, H, W, C) array first: building a tensor
    # directly from a Python list of ndarrays is extremely slow.
    video_tensor = torch.from_numpy(np.stack(frames)).to(torch.float32)
    # Permute (T, H, W, C) -> (T, C, H, W)
    video_tensor = video_tensor.permute(0, 3, 1, 2)

    return video_tensor

# PyTorch Dataset for sarcasm detection
class VideoDataset(Dataset):
    """Map-style dataset yielding ``(clip, label)`` pairs for sarcasm detection.

    Each entry of ``data`` is a dict with the keys ``'video_path'`` and
    ``'sarcasm'``. The video is decoded on access by ``extract_video_features``,
    which returns raw 0-255 frames laid out as (T, C, H, W).

    Videos have different lengths, so raw tensors cannot be stacked into a
    batch, and torchvision's pretrained video ResNets expect normalised
    (C, T, H, W) clips. By default every video is therefore converted into a
    fixed-length, normalised clip.

    Args:
        data: Sequence of ``{'video_path': ..., 'sarcasm': ...}`` dicts.
        num_frames: Number of frames sampled uniformly over the whole video
            (frames repeat when the video is shorter). Pass ``None`` to get the
            unprocessed (T, C, H, W) tensor from ``extract_video_features``.
    """

    # Per-channel statistics torchvision documents for its Kinetics-400
    # pretrained video classification models (inputs scaled to [0, 1] first).
    _MEAN = (0.43216, 0.394666, 0.37645)
    _STD = (0.22803, 0.216989, 0.24679)

    def __init__(self, data, num_frames=16):
        self.data = data
        self.num_frames = num_frames

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(clip, label)`` for entry ``idx``; ``label`` is a Python int."""
        item = self.data[idx]
        video_path = item['video_path']
        sarcasm = int(item['sarcasm'])
        video_features = extract_video_features(video_path)  # (T, C, H, W), 0-255
        if self.num_frames is None:
            return video_features, sarcasm
        return self._to_clip(video_features), sarcasm

    def _to_clip(self, frames):
        """Sample ``num_frames`` frames, normalise, and return (C, T, H, W)."""
        total_frames = frames.shape[0]
        # Evenly spaced frame indices from the first to the last frame.
        indices = torch.linspace(0, total_frames - 1, steps=self.num_frames).round().long()
        clip = frames[indices] / 255.0  # (num_frames, C, H, W) in [0, 1]
        mean = torch.tensor(self._MEAN, dtype=clip.dtype, device=clip.device).view(1, -1, 1, 1)
        std = torch.tensor(self._STD, dtype=clip.dtype, device=clip.device).view(1, -1, 1, 1)
        clip = (clip - mean) / std
        # Channels first, then time: default collation then yields (B, C, T, H, W).
        return clip.permute(1, 0, 2, 3).contiguous()

# Load the annotations from the JSON file.
# JSON is UTF-8 by specification; without an explicit encoding open() falls
# back to the platform locale and can fail on non-ASCII text.
with open('sarcasm_data.json', encoding='utf-8') as f:
    text_data = json.load(f)

# Extract sarcasm labels: file stem of each JSON key -> integer label
sarcasm_labels = {os.path.splitext(os.path.basename(k))[0]: int(v['sarcasm']) for k, v in text_data.items()}

# Fail early with a readable message when a video has no annotation, instead of
# a bare KeyError raised from inside the comprehension below.
_unlabeled_videos = [
    path for path in video_files
    if os.path.splitext(os.path.basename(path))[0] not in sarcasm_labels
]
if _unlabeled_videos:
    raise KeyError(
        f"No sarcasm label found for {len(_unlabeled_videos)} video(s), "
        f"e.g. {_unlabeled_videos[:5]}"
    )

# Create the dataset: one {'video_path', 'sarcasm'} record per video file
data = [{'video_path': path, 'sarcasm': sarcasm_labels[os.path.splitext(os.path.basename(path))[0]]} for path in video_files]

dataset = VideoDataset(data)

# Create DataLoader over the full dataset
# NOTE(review): the visible training code builds its own train/test loaders and
# does not use this one - confirm whether it is still needed.
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [29]:
# Training configuration
learning_rate, num_epochs, batch_size = 1e-4, 10, 64
weight_decay = 5e-2  # weight-decay coefficient handed to the optimizer
dropout_prob = 0  # defined for completeness; not referenced by the visible code

# Fix the torch RNG so that runs are repeatable
torch.manual_seed(42)

# Run on the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Create a new model
class SarcasmDetectionModel(nn.Module):
    """Video classifier: torchvision R3D-18 backbone plus a small MLP head.

    The backbone's final fully connected layer is replaced by ``nn.Identity``
    so it outputs pooled features, which the two-layer ``classifier`` turns
    into ``num_classes`` logits. The backbone attribute keeps its original
    name ``i3d`` (so existing state dicts still load) although the network is
    torchvision's ``r3d_18``, not I3D.

    Args:
        num_classes: Number of output logits.
        pretrained: Load torchvision's pretrained weights for the backbone
            (default, same as before). The checkpoint is downloaded on first
            use, which needs network access and a working TLS certificate
            store; pass ``False`` to build the model with random weights and
            no download.
    """

    def __init__(self, num_classes, pretrained=True):
        super().__init__()
        # Newer torchvision replaces the deprecated ``pretrained=`` flag by a
        # ``weights=`` enum; use it when available and fall back otherwise.
        weights_enum = getattr(models.video, "R3D_18_Weights", None)
        if weights_enum is not None:
            backbone = models.video.r3d_18(weights=weights_enum.DEFAULT if pretrained else None)
        else:
            backbone = models.video.r3d_18(pretrained=pretrained)

        feature_dim = backbone.fc.in_features  # read from the backbone instead of hard-coding it
        backbone.fc = nn.Identity()  # Remove final fully connected layer
        self.i3d = backbone
        self.classifier = nn.Sequential(
            nn.Linear(feature_dim, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return logits of shape (batch, num_classes).

        Args:
            x: Batch of clips laid out as (batch, 3, time, height, width).

        Raises:
            ValueError: If ``x`` is not 5-D with 3 channels in dimension 1,
                e.g. when clips arrive as (batch, time, channels, H, W).
        """
        if x.dim() != 5 or x.size(1) != 3:
            raise ValueError(
                "expected input of shape (batch, 3, time, height, width), "
                f"got {tuple(x.shape)}"
            )
        features = self.i3d(x)
        logits = self.classifier(features)
        return logits

# Initialize the model
model = SarcasmDetectionModel(num_classes=2).to(device)

# Create optimizer and criterion
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
criterion = nn.CrossEntropyLoss()

# Split the dataset built above 80/20 into a training and a held-out part.
# The split gets its own seeded generator: drawing from the global RNG would
# make the split depend on how many random numbers model initialisation
# consumed, so changing the architecture would silently change the split.
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(dataset, [train_size, test_size], generator=split_generator)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

# Define training function
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        dataloader: Iterable of ``(inputs, labels)`` tensor batches.
        criterion: Loss callable; assumed to return the mean loss over the
            batch (the default reduction of ``nn.CrossEntropyLoss``).
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean loss per sample and fraction of
        correctly classified samples over the epoch.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    correct_preds = 0
    total_preds = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Weight each batch mean by its size: a plain average of batch means
        # over-weights the (usually smaller) last batch.
        n_samples = labels.size(0)
        loss_sum += loss.item() * n_samples

        preds = outputs.argmax(dim=1)
        correct_preds += (preds == labels).sum().item()
        total_preds += n_samples

    if total_preds == 0:
        raise ValueError("train_epoch received a dataloader that yields no samples")

    epoch_loss = loss_sum / total_preds
    epoch_acc = correct_preds / total_preds

    return epoch_loss, epoch_acc

# Define evaluation function
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating its parameters.

    The model is switched to eval mode and gradients are disabled.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        dataloader: Iterable of ``(inputs, labels)`` tensor batches.
        criterion: Loss callable; assumed to return the mean loss over the
            batch (the default reduction of ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean loss per sample and fraction of
        correctly classified samples.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    correct_preds = 0
    total_preds = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Weight each batch mean by its size: a plain average of batch
            # means over-weights the (usually smaller) last batch.
            n_samples = labels.size(0)
            loss_sum += loss.item() * n_samples

            preds = outputs.argmax(dim=1)
            correct_preds += (preds == labels).sum().item()
            total_preds += n_samples

    if total_preds == 0:
        raise ValueError("evaluate_model received a dataloader that yields no samples")

    epoch_loss = loss_sum / total_preds
    epoch_acc = correct_preds / total_preds

    return epoch_loss, epoch_acc

# Training loop: train for num_epochs, evaluate after every epoch and keep the
# weights of the best-scoring epoch on disk.
# NOTE(review): the checkpoint is selected on the same split that is reported
# as "Test", so the reported test accuracy is optimistically biased - consider
# carving out a separate validation split for model selection.
best_val_acc = 0.0
best_model_path = 'best_sarcasm_model.pth'
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_dataloader, criterion, optimizer, device)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

    test_loss, test_acc = evaluate_model(model, test_dataloader, criterion, device)
    print(f"Epoch {epoch+1}/{num_epochs}, Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")

    # Save the model if validation accuracy improves. The original branch ended
    # in a bare `torch` expression, so nothing was ever written to disk.
    if test_acc > best_val_acc:
        best_val_acc = test_acc
        torch.save(model.state_dict(), best_model_path)


Downloading: "https://download.pytorch.org/models/r3d_18-b3b3357e.pth" to /Users/regulafrey/.cache/torch/hub/checkpoints/r3d_18-b3b3357e.pth


URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1000)>