In [None]:
import os
import librosa
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import Wav2Vec2ForCTC, Wav2Vec2Tokenizer

In [None]:
# Use the GPU whenever CUDA is usable, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

In [None]:
# Set the path to your audio files and corresponding labels
data_dir = "./data/recordings"
split_names = ("train", "validate", "test")

# File names actually present on disk, keyed by split directory.
files_on_disk = {
    split: set(os.listdir(os.path.join(data_dir, split))) for split in split_names
}


def _split_for(file_name):
    """Return the split whose directory contains ``file_name``, or None if none does."""
    # Checked in train -> validate -> test order, so a name that appears in
    # several directories resolves to the earliest one, as it did before.
    for split in split_names:
        if file_name in files_on_disk[split]:
            return split
    return None


record_df = pd.read_csv("./data/overview-of-recordings.csv")
record_df["split"] = record_df["file_name"].map(_split_for)

# Rows whose recording is in none of the split directories used to fall through
# to "test", producing paths to files that do not exist. They now keep a null
# split and are left out of every subset.
n_unassigned = int(record_df["split"].isna().sum())
if n_unassigned:
    print(f"Warning: {n_unassigned} recordings listed in the CSV were not found in any split directory")

train_df = record_df[record_df["split"] == "train"]
valid_df = record_df[record_df["split"] == "validate"]
test_df = record_df[record_df["split"] == "test"]

# Full path of every recording, in the same row order as the matching frame
train_files = [os.path.join(data_dir, "train", f) for f in train_df["file_name"]]
valid_files = [os.path.join(data_dir, "validate", f) for f in valid_df["file_name"]]
test_files = [os.path.join(data_dir, "test", f) for f in test_df["file_name"]]

# Class ids are assigned from the complete CSV (order of first appearance), so
# they do not depend on which recordings are present on disk.
prompt_to_id = {prompt: i for i, prompt in enumerate(record_df.prompt.unique())}

train_labels = train_df.prompt.apply(lambda x: prompt_to_id[x]).values
valid_labels = valid_df.prompt.apply(lambda x: prompt_to_id[x]).values
test_labels = test_df.prompt.apply(lambda x: prompt_to_id[x]).values


In [None]:
# Load the pre-trained Wav2Vec 2.0 model and its tokenizer from one shared checkpoint
pretrained_checkpoint = "facebook/wav2vec2-base-960h"
wave2vecmodel = Wav2Vec2ForCTC.from_pretrained(pretrained_checkpoint)
tokenizer = Wav2Vec2Tokenizer.from_pretrained(pretrained_checkpoint)

In [None]:
class AudioDataset(Dataset):
    """Dataset that turns audio files into fixed-length vectors of predicted token ids.

    Each item is a ``(features, label)`` pair. ``features`` is a 1-D float
    tensor of length ``max_sequence_length`` holding the argmax of the model's
    logits, zero-padded on the right or cropped if longer. ``label`` is a
    scalar ``torch.long`` tensor.

    Args:
        file_paths: Sequence of audio file paths, one per sample.
        labels: Sequence of integer class ids aligned with ``file_paths``.
        tokenizer: Callable that converts a waveform into keyword inputs for
            ``model``.
        model: Callable returning an object with a ``.logits`` attribute; it
            is always invoked under ``torch.no_grad()``.
        max_sequence_length: Length every feature vector is padded or cropped to.
    """

    def __init__(self, file_paths, labels, tokenizer, model, max_sequence_length):
        self.file_paths = file_paths
        self.labels = labels
        self.tokenizer = tokenizer
        self.model = model
        self.max_sequence_length = max_sequence_length

    def __len__(self):
        """Return the number of audio files in the dataset."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(features, label)`` as described on the class."""
        audio_path = self.file_paths[idx]
        label = self.labels[idx]

        # Load audio using librosa, requesting a 16 kHz sample rate
        audio, _ = librosa.load(audio_path, sr=16000)

        # Tokenize and run the pretrained model without tracking gradients
        inputs = self.tokenizer(audio, return_tensors="pt", padding="longest", truncation=True)
        with torch.no_grad():
            logits = self.model(**inputs).logits

        # Drop only the leading batch axis (assumes logits are
        # (1, frames, vocab) -- TODO confirm for other models). A bare
        # squeeze() would also drop the frame axis of a one-frame clip and
        # make the argmax over dim=1 fail.
        features = torch.argmax(logits.squeeze(0), dim=1)

        # Crop sequences that are too long, then zero-pad on the right so
        # every sample has exactly max_sequence_length entries.
        features = features[: self.max_sequence_length]
        pad_size = self.max_sequence_length - features.size(0)
        features = torch.nn.functional.pad(features, (0, pad_size))

        label = torch.tensor(label, dtype=torch.long)

        return features.float(), label  # Convert features to float type

In [None]:
# Fixed length every feature vector is padded to. The value is hard-coded here
# (presumably the longest sequence in the data -- TODO confirm).
max_sequence_length = 295730

# Arguments shared by the three splits
_shared_dataset_kwargs = dict(
    tokenizer=tokenizer,
    model=wave2vecmodel,
    max_sequence_length=max_sequence_length,
)

train_dataset = AudioDataset(train_files, train_labels, **_shared_dataset_kwargs)
valid_dataset = AudioDataset(valid_files, valid_labels, **_shared_dataset_kwargs)
test_dataset = AudioDataset(test_files, test_labels, **_shared_dataset_kwargs)

# Only the training split is shuffled; validation and test keep file order
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=8)
valid_loader = DataLoader(valid_dataset, shuffle=False, batch_size=8)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=8)

In [None]:
class AudioClassifier(nn.Module):
    """Two-layer fully connected classifier.

    Maps an ``input_size``-wide input through a 128-unit hidden layer with a
    ReLU activation to ``num_classes`` raw scores (no softmax is applied).
    """

    def __init__(self, input_size, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_size, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for ``x``."""
        hidden = nn.functional.relu(self.fc1(x))
        return self.fc2(hidden)

In [None]:

# Function to train the model
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` and print validation metrics after every epoch.

    Each epoch makes one optimisation pass over ``train_loader`` and then one
    gradient-free pass over ``val_loader``. A single line is printed per epoch
    with the mean training loss per batch, the mean validation loss per batch
    and the validation accuracy. A metric whose loader is empty is reported as
    NaN instead of raising ``ZeroDivisionError``.

    Args:
        model: Module to optimise. Batches are moved to the device of its
            first parameter (CPU if it has none), so the function does not
            rely on a notebook-level ``device`` variable.
        train_loader: Iterable of ``(inputs, labels)`` batches supporting ``len()``.
        val_loader: Iterable of ``(inputs, labels)`` batches supporting
            ``len()`` and exposing a ``.dataset`` with a length.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of epochs to run.

    Returns:
        None. When at least one epoch runs, the model is left in eval mode.
    """
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')
    nan = float('nan')

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            # Send inputs and labels to the model's device
            inputs, labels = inputs.to(target_device), labels.to(target_device)
            optimizer.zero_grad()

            outputs = model(inputs)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Validate the model
        model.eval()
        val_loss = 0.0
        correct_predictions = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(target_device), labels.to(target_device)

                outputs = model(inputs)
                val_loss += criterion(outputs, labels).item()

                predicted = outputs.argmax(dim=1)
                correct_predictions += (predicted == labels).sum().item()

        # Guard each average so an empty loader yields NaN rather than a crash
        num_train_batches = len(train_loader)
        num_val_batches = len(val_loader)
        num_val_samples = len(val_loader.dataset)
        train_loss_avg = running_loss / num_train_batches if num_train_batches else nan
        val_loss_avg = val_loss / num_val_batches if num_val_batches else nan
        val_accuracy = correct_predictions / num_val_samples if num_val_samples else nan

        print(f'Epoch {epoch + 1}/{num_epochs} Training Loss: {train_loss_avg} Validation Loss: {val_loss_avg} Validation Accuracy: {val_accuracy * 100:.2f}%')

# Function to test the model
def test_model(model, test_loader, criterion):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            # Send inputs and labels to the device
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()

    accuracy = correct_predictions / len(test_loader.dataset)
    print(f'Test Loss: {running_loss / len(test_loader)} Accuracy: {accuracy * 100:.2f}%')

In [None]:
# Send the model to the device.
# The output layer gets one unit per distinct prompt so every label id produced
# by prompt_to_id is a valid class index for CrossEntropyLoss.
model = AudioClassifier(max_sequence_length, num_classes=len(prompt_to_id)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model on the training split, validating after every epoch
# (the training and test loaders were previously swapped here)
train_model(model, train_loader, valid_loader, criterion, optimizer, num_epochs=10)

# Test the model on the held-out test split
test_model(model, test_loader, criterion)
