In [8]:
import torch

# Check if CUDA is available
if torch.cuda.is_available():
    print(f"CUDA is available. {torch.cuda.device_count()} GPU(s) detected:")
    for i in range(torch.cuda.device_count()):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
else:
    print("CUDA is not available. Running on CPU.")

CUDA is not available. Running on CPU.


In [9]:
import torch
print(torch.cuda.is_available())  # Should return True if CUDA is available
print(torch.version.cuda)         # Should show the CUDA version
print(torch.backends.cudnn.enabled)  # Should be True if cuDNN is available

False
None
True


In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoFeatureExtractor, AutoTokenizer, AutoModel
from datasets import load_dataset
import librosa
import numpy as np

# Load the AudioCLIP model and its components
model_name = "microsoft/audio-clip"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

# Create a custom dataset for VGGSound
class VGGSoundDataset(Dataset):
    def __init__(self, dataset, feature_extractor, tokenizer):
        self.dataset = dataset
        self.feature_extractor = feature_extractor
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        audio, sr = librosa.load(item['audio_path'], sr=16000, mono=True)
        audio_features = self.feature_extractor(audio, sampling_rate=sr, return_tensors="pt")
        text_features = self.tokenizer(item['label'], padding=True, truncation=True, return_tensors="pt")
        
        return {
            'audio_features': audio_features['input_values'].squeeze(),
            'text_features': text_features['input_ids'].squeeze(),
            'label': item['label']
        }

# Load the VGGSound dataset
vggsound = load_dataset("harrym/vggsound", split="train[:1000]")  # Using a subset for demonstration

# Create the custom dataset and dataloader
dataset = VGGSoundDataset(vggsound, feature_extractor, tokenizer)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# Function to compute similarity scores
def compute_similarity(audio_features, text_features):
    audio_embeds = model.get_audio_features(audio_features)
    text_embeds = model.get_text_features(text_features)
    
    # Normalize embeddings
    audio_embeds = audio_embeds / audio_embeds.norm(dim=-1, keepdim=True)
    text_embeds = text_embeds / text_embeds.norm(dim=-1, keepdim=True)
    
    # Compute similarity
    similarity = torch.matmul(audio_embeds, text_embeds.t())
    return similarity

# Training loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(5):  # 5 epochs for demonstration
    for batch in dataloader:
        audio_features = batch['audio_features'].to(device)
        text_features = batch['text_features'].to(device)
        
        similarity = compute_similarity(audio_features, text_features)
        
        # Implement your loss function and optimization step here
        # For example, you could use a contrastive loss or cross-entropy loss
        
        # Print some statistics
        print(f"Epoch {epoch}, Batch similarity: {similarity.mean().item()}")

# Example of using the model for inference
def predict(audio_path, candidate_labels):
    audio, sr = librosa.load(audio_path, sr=16000, mono=True)
    audio_features = feature_extractor(audio, sampling_rate=sr, return_tensors="pt").to(device)
    text_features = tokenizer(candidate_labels, padding=True, truncation=True, return_tensors="pt").to(device)
    
    with torch.no_grad():
        similarity = compute_similarity(audio_features['input_values'], text_features['input_ids'])
    
    predicted_label = candidate_labels[similarity.argmax().item()]
    return predicted_label

# Example usage
audio_path = "path/to/your/audio/file.wav"
candidate_labels = ["dog barking", "car horn", "people talking", "music playing"]
predicted_label = predict(audio_path, candidate_labels)
print(f"Predicted label: {predicted_label}")

  from .autonotebook import tqdm as notebook_tqdm


OSError: microsoft/audio-clip is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoFeatureExtractor, AutoTokenizer
from torchvision import models, transforms
import torchaudio
from datasets import load_dataset
import numpy as np

class AudioCLIPForVGGSound(nn.Module):
    def __init__(self, text_model_name, image_model_name, audio_model_name, projection_dim=512):
        super(AudioCLIPForVGGSound, self).__init__()
        
        # Text encoder (for labels)
        self.text_model = AutoModel.from_pretrained(text_model_name)
        self.text_projection = nn.Linear(self.text_model.config.hidden_size, projection_dim)
        
        # Image encoder
        self.image_model = models.resnet50(pretrained=True)
        self.image_model.fc = nn.Identity()  # Remove the final fully connected layer
        self.image_projection = nn.Linear(2048, projection_dim)
        
        # Audio encoder
        self.audio_model = AutoModel.from_pretrained(audio_model_name)
        self.audio_projection = nn.Linear(self.audio_model.config.hidden_size, projection_dim)
        
        # Temperature parameter
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        
    def encode_text(self, text):
        text_features = self.text_model(**text)[0]  # [batch_size, sequence_length, hidden_size]
        text_features = text_features[:, 0, :]  # Take the [CLS] token representation
        return self.text_projection(text_features)
    
    def encode_image(self, image):
        image_features = self.image_model(image)
        return self.image_projection(image_features)
    
    def encode_audio(self, audio):
        audio_features = self.audio_model(**audio)[0]
        audio_features = audio_features.mean(dim=1)  # Average pooling over time
        return self.audio_projection(audio_features)
    
    def forward(self, text, image, audio):
        text_features = self.encode_text(text)
        image_features = self.encode_image(image)
        audio_features = self.encode_audio(audio)
        
        # Normalize features
        text_features = F.normalize(text_features, dim=-1)
        image_features = F.normalize(image_features, dim=-1)
        audio_features = F.normalize(audio_features, dim=-1)
        
        # Compute similarity scores
        logit_scale = self.logit_scale.exp()
        logits_per_text_image = logit_scale * text_features @ image_features.t()
        logits_per_text_audio = logit_scale * text_features @ audio_features.t()
        logits_per_image_audio = logit_scale * image_features @ audio_features.t()
        
        return logits_per_text_image, logits_per_text_audio, logits_per_image_audio

class VGGSoundDataset(torch.utils.data.Dataset):
    def __init__(self, dataset, text_tokenizer, image_processor, audio_processor):
        self.dataset = dataset
        self.text_tokenizer = text_tokenizer
        self.image_processor = image_processor
        self.audio_processor = audio_processor
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        item = self.dataset[idx]
        
        # Process label as text
        text = self.text_tokenizer(item['label'], padding='max_length', truncation=True, return_tensors='pt')
        
        # Load and process image
        image = self.image_processor(item['image'].convert('RGB'))
        
        # Load and process audio
        audio, sr = torchaudio.load(item['audio'])
        audio = self.audio_processor(audio, sampling_rate=sr, return_tensors='pt')
        
        return {
            'text': text,
            'image': image,
            'audio': audio,
            'label': item['label']
        }

def train(model, dataloader, optimizer, device):
    model.train()
    for batch in dataloader:
        optimizer.zero_grad()
        
        text = {k: v.to(device) for k, v in batch['text'].items()}
        image = batch['image'].to(device)
        audio = {k: v.to(device) for k, v in batch['audio'].items()}
        
        logits_per_text_image, logits_per_text_audio, logits_per_image_audio = model(text, image, audio)
        
        # Compute contrastive loss
        labels = torch.arange(logits_per_text_image.shape[0]).to(device)
        loss_ti = F.cross_entropy(logits_per_text_image, labels)
        loss_ta = F.cross_entropy(logits_per_text_audio, labels)
        loss_ia = F.cross_entropy(logits_per_image_audio, labels)
        
        total_loss = (loss_ti + loss_ta + loss_ia) / 3
        
        total_loss.backward()
        optimizer.step()
        
        print(f"Loss: {total_loss.item()}")

if __name__ == "__main__":
    # Initialize model and components
    text_model_name = "bert-base-uncased"
    image_model_name = "resnet50"
    audio_model_name = "facebook/wav2vec2-base"
    
    model = AudioCLIPForVGGSound(text_model_name, image_model_name, audio_model_name)
    
    text_tokenizer = AutoTokenizer.from_pretrained(text_model_name)
    image_processor = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    audio_processor = AutoFeatureExtractor.from_pretrained(audio_model_name)
    
    # Load VGGSound dataset
    vggsound = load_dataset("harrym/vggsound", split="train[:1000]")  # Using a subset for demonstration
    
    dataset = VGGSoundDataset(vggsound, text_tokenizer, image_processor, audio_processor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # Train the model
    for epoch in range(5):  # 5 epochs for demonstration
        print(f"Epoch {epoch+1}")
        train(model, dataloader, optimizer, device)



DatasetNotFoundError: Dataset 'harrym/vggsound' doesn't exist on the Hub or cannot be accessed.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoFeatureExtractor, AutoTokenizer
from torchvision import models, transforms
import torchaudio
import os
import pandas as pd
from PIL import Image
import numpy as np

class AudioCLIPForVGGSound(nn.Module):
    # ... [The model class remains the same as in the previous implementation]

class VGGSoundDataset(torch.utils.data.Dataset):
    def __init__(self, csv_file, audio_dir, image_dir, text_tokenizer, image_processor, audio_processor):
        self.data = pd.read_csv(csv_file)
        self.audio_dir = audio_dir
        self.image_dir = image_dir
        self.text_tokenizer = text_tokenizer
        self.image_processor = image_processor
        self.audio_processor = audio_processor
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        
        # Process label as text
        text = self.text_tokenizer(row['label'], padding='max_length', truncation=True, return_tensors='pt')
        
        # Load and process image
        image_path = os.path.join(self.image_dir, f"{row['video_id']}_{row['start_time']}.jpg")
        image = Image.open(image_path).convert('RGB')
        image = self.image_processor(image)
        
        # Load and process audio
        audio_path = os.path.join(self.audio_dir, f"{row['video_id']}_{row['start_time']}.wav")
        audio, sr = torchaudio.load(audio_path)
        audio = self.audio_processor(audio, sampling_rate=sr, return_tensors='pt')
        
        return {
            'text': text,
            'image': image,
            'audio': audio,
            'label': row['label']
        }

def train(model, dataloader, optimizer, device):
    # ... [The training function remains the same as in the previous implementation]

if __name__ == "__main__":
    # Initialize model and components
    text_model_name = "bert-base-uncased"
    image_model_name = "resnet50"
    audio_model_name = "facebook/wav2vec2-base"
    
    model = AudioCLIPForVGGSound(text_model_name, image_model_name, audio_model_name)
    
    text_tokenizer = AutoTokenizer.from_pretrained(text_model_name)
    image_processor = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    audio_processor = AutoFeatureExtractor.from_pretrained(audio_model_name)
    
    # Set paths for VGGSound dataset
    csv_file = "path/to/vggsound.csv"
    audio_dir = "path/to/vggsound_audio"
    image_dir = "path/to/vggsound_frames"
    
    # Create dataset and dataloader
    dataset = VGGSoundDataset(csv_file, audio_dir, image_dir, text_tokenizer, image_processor, audio_processor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # Train the model
    for epoch in range(5):  # 5 epochs for demonstration
        print(f"Epoch {epoch+1}")
        train(model, dataloader, optimizer, device)

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoFeatureExtractor, AutoTokenizer
from torchvision import models, transforms
import torchaudio
from datasets import load_dataset
import numpy as np

class AudioCLIPForAudioSet(nn.Module):
    def __init__(self, text_model_name, audio_model_name, projection_dim=512):
        super(AudioCLIPForAudioSet, self).__init__()
        
        # Text encoder (for labels)
        self.text_model = AutoModel.from_pretrained(text_model_name)
        self.text_projection = nn.Linear(self.text_model.config.hidden_size, projection_dim)
        
        # Audio encoder
        self.audio_model = AutoModel.from_pretrained(audio_model_name)
        self.audio_projection = nn.Linear(self.audio_model.config.hidden_size, projection_dim)
        
        # Temperature parameter
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        
    def encode_text(self, text):
        text_features = self.text_model(**text)[0]  # [batch_size, sequence_length, hidden_size]
        text_features = text_features[:, 0, :]  # Take the [CLS] token representation
        return self.text_projection(text_features)
    
    def encode_audio(self, audio):
        audio_features = self.audio_model(**audio)[0]
        audio_features = audio_features.mean(dim=1)  # Average pooling over time
        return self.audio_projection(audio_features)
    
    def forward(self, text, audio):
        text_features = self.encode_text(text)
        audio_features = self.encode_audio(audio)
        
        # Normalize features
        text_features = F.normalize(text_features, dim=-1)
        audio_features = F.normalize(audio_features, dim=-1)
        
        # Compute similarity scores
        logit_scale = self.logit_scale.exp()
        logits_per_text_audio = logit_scale * text_features @ audio_features.t()
        
        return logits_per_text_audio

class AudioSetDataset(torch.utils.data.Dataset):
    def __init__(self, dataset, text_tokenizer, audio_processor):
        self.dataset = dataset
        self.text_tokenizer = text_tokenizer
        self.audio_processor = audio_processor
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        item = self.dataset[idx]
        
        # Process label as text
        text = self.text_tokenizer(item['labels'][0], padding='max_length', truncation=True, return_tensors='pt')
        
        # Process audio
        audio = self.audio_processor(item['audio']['array'], sampling_rate=item['audio']['sampling_rate'], return_tensors='pt')
        
        return {
            'text': text,
            'audio': audio,
            'label': item['labels'][0]
        }

def train(model, dataloader, optimizer, device):
    model.train()
    for batch in dataloader:
        optimizer.zero_grad()
        
        text = {k: v.to(device) for k, v in batch['text'].items()}
        audio = {k: v.to(device) for k, v in batch['audio'].items()}
        
        logits_per_text_audio = model(text, audio)
        
        # Compute contrastive loss
        labels = torch.arange(logits_per_text_audio.shape[0]).to(device)
        loss = F.cross_entropy(logits_per_text_audio, labels)
        
        loss.backward()
        optimizer.step()
        
        print(f"Loss: {loss.item()}")

if __name__ == "__main__":
    # Initialize model and components
    text_model_name = "bert-base-uncased"
    audio_model_name = "facebook/wav2vec2-base"
    
    model = AudioCLIPForAudioSet(text_model_name, audio_model_name)
    
    text_tokenizer = AutoTokenizer.from_pretrained(text_model_name)
    audio_processor = AutoFeatureExtractor.from_pretrained(audio_model_name)
    
    # Load AudioSet dataset
    audioset = load_dataset("jzumer/audioset", split="train[:1000]")  # Using a subset for demonstration
    
    dataset = AudioSetDataset(audioset, text_tokenizer, audio_processor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # Train the model
    for epoch in range(5):  # 5 epochs for demonstration
        print(f"Epoch {epoch+1}")
        train(model, dataloader, optimizer, device)

DatasetNotFoundError: Dataset 'jzumer/audioset' doesn't exist on the Hub or cannot be accessed.

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoFeatureExtractor, AutoTokenizer
from torchvision import models, transforms
import torchaudio
from datasets import load_dataset
import numpy as np

class AudioCLIPForAudioSet(nn.Module):
    def __init__(self, text_model_name, audio_model_name, projection_dim=512):
        super(AudioCLIPForAudioSet, self).__init__()
        
        # Text encoder (for labels)
        self.text_model = AutoModel.from_pretrained(text_model_name)
        self.text_projection = nn.Linear(self.text_model.config.hidden_size, projection_dim)
        
        # Audio encoder
        self.audio_model = AutoModel.from_pretrained(audio_model_name)
        self.audio_projection = nn.Linear(self.audio_model.config.hidden_size, projection_dim)
        
        # Temperature parameter
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        
    def encode_text(self, text):
        text_features = self.text_model(**text)[0]  # [batch_size, sequence_length, hidden_size]
        text_features = text_features[:, 0, :]  # Take the [CLS] token representation
        return self.text_projection(text_features)
    
    def encode_audio(self, audio):
        audio_features = self.audio_model(**audio)[0]
        audio_features = audio_features.mean(dim=1)  # Average pooling over time
        return self.audio_projection(audio_features)
    
    def forward(self, text, audio):
        text_features = self.encode_text(text)
        audio_features = self.encode_audio(audio)
        
        # Normalize features
        text_features = F.normalize(text_features, dim=-1)
        audio_features = F.normalize(audio_features, dim=-1)
        
        # Compute similarity scores
        logit_scale = self.logit_scale.exp()
        logits_per_text_audio = logit_scale * text_features @ audio_features.t()
        
        return logits_per_text_audio

class AudioSetDataset(torch.utils.data.Dataset):
    def __init__(self, dataset, text_tokenizer, audio_processor):
        self.dataset = dataset
        self.text_tokenizer = text_tokenizer
        self.audio_processor = audio_processor
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        item = self.dataset[idx]
        
        # Process label as text
        text = self.text_tokenizer(item['labels'][0], padding='max_length', truncation=True, return_tensors='pt')
        
        # Process audio
        audio = self.audio_processor(item['audio']['array'], sampling_rate=item['audio']['sampling_rate'], return_tensors='pt')
        
        return {
            'text': text,
            'audio': audio,
            'label': item['labels'][0]
        }

def train(model, dataloader, optimizer, device):
    model.train()
    for batch in dataloader:
        optimizer.zero_grad()
        
        text = {k: v.to(device) for k, v in batch['text'].items()}
        audio = {k: v.to(device) for k, v in batch['audio'].items()}
        
        logits_per_text_audio = model(text, audio)
        
        # Compute contrastive loss
        labels = torch.arange(logits_per_text_audio.shape[0]).to(device)
        loss = F.cross_entropy(logits_per_text_audio, labels)
        
        loss.backward()
        optimizer.step()
        
        print(f"Loss: {loss.item()}")

if __name__ == "__main__":
    # Initialize model and components
    text_model_name = "bert-base-uncased"
    audio_model_name = "facebook/wav2vec2-base"
    
    model = AudioCLIPForAudioSet(text_model_name, audio_model_name)
    
    text_tokenizer = AutoTokenizer.from_pretrained(text_model_name)
    audio_processor = AutoFeatureExtractor.from_pretrained(audio_model_name)
    
    # Load AudioSet dataset
    audioset = load_dataset("google/audioset", split="train[:1000]")  # Using a subset for demonstration
    
    dataset = AudioSetDataset(audioset, text_tokenizer, audio_processor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # Train the model
    for epoch in range(5):  # 5 epochs for demonstration
        print(f"Epoch {epoch+1}")
        train(model, dataloader, optimizer, device)

DatasetNotFoundError: Dataset 'google/audioset' doesn't exist on the Hub or cannot be accessed.

In [7]:
from datasets import load_dataset

# Load the VGGSound dataset
vggsound = load_dataset("harrym/vggsound", split="train")  # Change the split as needed

# Print some examples
print(vggsound[0])  # Print the first example


DatasetNotFoundError: Dataset 'harrym/vggsound' doesn't exist on the Hub or cannot be accessed.

In [8]:
#Speech command dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoFeatureExtractor, AutoTokenizer
from datasets import load_dataset
import numpy as np

class AudioCLIPForSpeechCommands(nn.Module):
    def __init__(self, text_model_name, audio_model_name, projection_dim=512):
        super(AudioCLIPForSpeechCommands, self).__init__()
        
        # Text encoder (for labels)
        self.text_model = AutoModel.from_pretrained(text_model_name)
        self.text_projection = nn.Linear(self.text_model.config.hidden_size, projection_dim)
        
        # Audio encoder
        self.audio_model = AutoModel.from_pretrained(audio_model_name)
        self.audio_projection = nn.Linear(self.audio_model.config.hidden_size, projection_dim)
        
        # Temperature parameter
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        
    def encode_text(self, text):
        text_features = self.text_model(**text)[0]  # [batch_size, sequence_length, hidden_size]
        text_features = text_features[:, 0, :]  # Take the [CLS] token representation
        return self.text_projection(text_features)
    
    def encode_audio(self, audio):
        audio_features = self.audio_model(**audio)[0]
        audio_features = audio_features.mean(dim=1)  # Average pooling over time
        return self.audio_projection(audio_features)
    
    def forward(self, text, audio):
        text_features = self.encode_text(text)
        audio_features = self.encode_audio(audio)
        
        # Normalize features
        text_features = F.normalize(text_features, dim=-1)
        audio_features = F.normalize(audio_features, dim=-1)
        
        # Compute similarity scores
        logit_scale = self.logit_scale.exp()
        logits_per_text_audio = logit_scale * text_features @ audio_features.t()
        
        return logits_per_text_audio

class SpeechCommandsDataset(torch.utils.data.Dataset):
    def __init__(self, dataset, text_tokenizer, audio_processor):
        self.dataset = dataset
        self.text_tokenizer = text_tokenizer
        self.audio_processor = audio_processor
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        item = self.dataset[idx]
        
        # Process label as text
        text = self.text_tokenizer(item['label'], padding='max_length', truncation=True, return_tensors='pt')
        
        # Process audio
        audio = self.audio_processor(item['audio']['array'], sampling_rate=item['audio']['sampling_rate'], return_tensors='pt')
        
        return {
            'text': text,
            'audio': audio,
            'label': item['label']
        }

def train(model, dataloader, optimizer, device):
    model.train()
    for batch in dataloader:
        optimizer.zero_grad()
        
        text = {k: v.to(device) for k, v in batch['text'].items()}
        audio = {k: v.to(device) for k, v in batch['audio'].items()}
        
        logits_per_text_audio = model(text, audio)
        
        # Compute contrastive loss
        labels = torch.arange(logits_per_text_audio.shape[0]).to(device)
        loss = F.cross_entropy(logits_per_text_audio, labels)
        
        loss.backward()
        optimizer.step()
        
        print(f"Loss: {loss.item()}")

if __name__ == "__main__":
    # Initialize model and components
    text_model_name = "bert-base-uncased"
    audio_model_name = "facebook/wav2vec2-base"
    
    model = AudioCLIPForSpeechCommands(text_model_name, audio_model_name)
    
    text_tokenizer = AutoTokenizer.from_pretrained(text_model_name)
    audio_processor = AutoFeatureExtractor.from_pretrained(audio_model_name)
    
    # Load Speech Commands dataset
    speech_commands = load_dataset("speech_commands", "v0.01", split="train[:1000]")  # Using a subset for demonstration
    
    dataset = SpeechCommandsDataset(speech_commands, text_tokenizer, audio_processor)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # Train the model
    for epoch in range(5):  # 5 epochs for demonstration
        print(f"Epoch {epoch+1}")
        train(model, dataloader, optimizer, device)

ValueError: The repository for speech_commands contains custom code which must be executed to correctly load the dataset. You can inspect the repository content at https://hf.co/datasets/speech_commands.
Please pass the argument `trust_remote_code=True` to allow custom code to be run.

In [9]:
#flicker30k-audio dataset

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
from transformers import BertTokenizer, BertModel, Wav2Vec2Model, Wav2Vec2FeatureExtractor
from datasets import load_dataset
from torch.utils.data import DataLoader, Dataset

class ImageEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        resnet = models.resnet50(pretrained=True)
        self.encoder = nn.Sequential(*list(resnet.children())[:-1])
        self.fc = nn.Linear(2048, 512)

    def forward(self, x):
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return F.normalize(x, p=2, dim=1)

class TextEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.encoder = BertModel.from_pretrained('bert-base-uncased')
        self.fc = nn.Linear(768, 512)

    def forward(self, text):
        inputs = self.tokenizer(text, return_tensors='pt', padding=True, truncation=True, max_length=128)
        inputs = {k: v.to(next(self.encoder.parameters()).device) for k, v in inputs.items()}
        outputs = self.encoder(**inputs)
        x = outputs.last_hidden_state[:, 0, :]  # CLS token
        x = self.fc(x)
        return F.normalize(x, p=2, dim=1)

class AudioEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("facebook/wav2vec2-base")
        self.encoder = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
        self.fc = nn.Linear(768, 512)

    def forward(self, audio):
        inputs = self.feature_extractor(audio, return_tensors="pt", padding=True)
        inputs = {k: v.to(next(self.encoder.parameters()).device) for k, v in inputs.items()}
        outputs = self.encoder(**inputs)
        x = outputs.last_hidden_state.mean(dim=1)  # Average pooling
        x = self.fc(x)
        return F.normalize(x, p=2, dim=1)

class ExtendedAudioCLIP(nn.Module):
    def __init__(self):
        super().__init__()
        self.image_encoder = ImageEncoder()
        self.text_encoder = TextEncoder()
        self.audio_encoder = AudioEncoder()
        self.fusion_layer = nn.Linear(512 * 3, 512)

    def forward(self, image, text, audio):
        image_features = self.image_encoder(image)
        text_features = self.text_encoder(text)
        audio_features = self.audio_encoder(audio)

        fused_features = torch.cat([image_features, text_features, audio_features], dim=1)
        fused_features = self.fusion_layer(fused_features)
        return F.normalize(fused_features, p=2, dim=1)

def multi_modal_contrastive_loss(fused_features, labels, temperature=0.07):
    similarity_matrix = torch.matmul(fused_features, fused_features.T) / temperature
    labels_expanded = labels.unsqueeze(0) == labels.unsqueeze(1)
    labels_expanded = labels_expanded.float()

    mask = torch.eye(labels_expanded.shape[0], device=fused_features.device).bool()
    labels_expanded.masked_fill_(mask, 0)

    similarity_matrix = torch.exp(similarity_matrix)
    numerator = similarity_matrix * labels_expanded
    denominator = similarity_matrix.sum(dim=1, keepdim=True) - similarity_matrix.diag().unsqueeze(1)

    loss = -torch.log(numerator.sum(dim=1) / denominator.squeeze(1))
    return loss.mean()

class Flickr30kAUDDataset(Dataset):
    def __init__(self, split='train'):
        self.dataset = load_dataset("google/flue", "image_caption_flickr30k_aud", split=split)
        self.image_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __getitem__(self, idx):
        item = self.dataset[idx]
        image = self.image_transform(item['image'])
        text = item['caption'][0]  # Use only the first caption for simplicity
        audio = item['audio']['array']
        return image, text, audio, idx

    def __len__(self):
        return len(self.dataset)

def train(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0

    for batch in dataloader:
        image, text, audio, labels = batch
        image, audio, labels = image.to(device), audio.to(device), labels.to(device)

        optimizer.zero_grad()
        fused_features = model(image, text, audio)
        loss = multi_modal_contrastive_loss(fused_features, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ExtendedAudioCLIP().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    dataset = Flickr30kAUDDataset(split='train')
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

    num_epochs = 10
    for epoch in range(num_epochs):
        loss = train(model, dataloader, optimizer, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

    torch.save(model.state_dict(), "extended_audioclip_model.pth")

if __name__ == "__main__":
    main()

DatasetNotFoundError: Dataset 'google/flue' doesn't exist on the Hub or cannot be accessed.

In [10]:
from datasets import load_dataset

ds = load_dataset("Loie/VGGSound")

Downloading data:   0%|          | 0/20 [00:00<?, ?files/s]