In [1]:
from pathlib import Path
import random

import os
import sys
import glob


import librosa
import librosa.display

import simplejpeg
import numpy as np

import torch
import torchvision as tv

import matplotlib.pyplot as plt

from PIL import Image
from IPython.display import Audio, display

# sys.path.append(os.path.abspath(f'{os.getcwd()}/..'))
sys.path.append(os.path.join(os.getcwd(), 'AudioCLIP-master'))

from model import AudioCLIP
from utils.transforms import ToTensor1D

MODEL_FILENAME = 'AudioCLIP-Full-Training.pt'
# derived from ESResNeXt
SAMPLE_RATE = 44100
# derived from CLIP
IMAGE_SIZE = 224
IMAGE_MEAN = 0.48145466, 0.4578275, 0.40821073
IMAGE_STD = 0.26862954, 0.26130258, 0.27577711

LABELS = ['cat', 'thunderstorm', 'coughing', 'alarm clock', 'car horn']

Model Instantiation

In [2]:
aclp = AudioCLIP(pretrained=f'AudioCLIP-master/assets/{MODEL_FILENAME}')

Audio & Image Transforms

In [3]:
image_transforms = tv.transforms.Compose([
    tv.transforms.ToTensor(),
    tv.transforms.Resize(IMAGE_SIZE, interpolation=Image.BICUBIC),
    tv.transforms.CenterCrop(IMAGE_SIZE),
    tv.transforms.Normalize(IMAGE_MEAN, IMAGE_STD)
])

  "Argument 'interpolation' of type int is deprecated since 0.13 and will be removed in 0.15. "


Image Loading

In [4]:
from torchvision import datasets
import time

data_path = Path("data/")
image_path = data_path / "only_images/only_images_demo"

train_dir = image_path / "train"
test_dir = image_path / "test"

train_data = datasets.ImageFolder(root=train_dir)
test_data = datasets.ImageFolder(root=test_dir)

device = "cuda" if torch.cuda.is_available() else "cpu"

def process_audioclip(data, device):
    images = []
    labels = []
    for i in range(len(data)):
        img = data[i][0]
        if img.mode != 'RGB':
            img = img.convert('RGB')
        img = np.array(img)
        images.append(img)
        labels.append(data[i][1])
    
    with torch.no_grad():
        # AudioCLIP handles raw audio on input, so the input shape is [batch x channels x duration]
        #audio = torch.stack([audio_transforms(track.reshape(1, -1)) for track, _ in audio])
        # standard channel-first shape [batch x channels x height x width]
        images = torch.stack([image_transforms(image) for image in images])
        # textual input is processed internally, so no need to transform it beforehand
        #text = [[label] for label in LABELS]

        # images.to(device)
        
        # AudioCLIP's output: Tuple[Tuple[Features, Logits], Loss]
        # Features = Tuple[AudioFeatures, ImageFeatures, TextFeatures]
        # Logits = Tuple[AudioImageLogits, AudioTextLogits, ImageTextLogits]
        #((audio_features, _, _), _), _ = aclp(audio=audio)
        ((_, image_features, _), _), _ = aclp(image=images)
        #((_, _, text_features), _), _ = aclp(text=text)

        end_time = time.time()
        print(f"aclp完成: {end_time - start_time:.4f} seconds")
        
        #audio_features = audio_features / torch.linalg.norm(audio_features, dim=-1, keepdim=True)
        image_features = image_features / torch.linalg.norm(image_features, dim=-1, keepdim=True)
        #text_features = text_features / torch.linalg.norm(text_features, dim=-1, keepdim=True)

    return image_features, labels

start_time = time.time()

train_features_embedding, train_labels = process_audioclip(train_data, device)
test_features_embedding, test_labels = process_audioclip(test_data, device)

end_time = time.time()
print(f"Running time: {end_time - start_time:.4f} seconds")

aclp完成: 138.3516 seconds
aclp完成: 178.9609 seconds
Running time: 178.9622 seconds


In [5]:
from torch.utils.data import TensorDataset, DataLoader

# 转换成DataLoader可接受类型
train_tensor_data = TensorDataset(train_features_embedding, torch.tensor(train_labels))
test_tensor_data = TensorDataset(test_features_embedding, torch.tensor(test_labels))

train_dataloader = DataLoader(dataset=train_tensor_data,
                              batch_size=16, # how many samples per batch?
                              num_workers=1, # how many subprocesses to use for data loading? (higher = more)
                              shuffle=True) # shuffle the data?
test_dataloader = DataLoader(dataset=test_tensor_data,
                              batch_size=16, # how many samples per batch?
                              num_workers=1, # how many subprocesses to use for data loading? (higher = more)
                              shuffle=False) # shuffle the data?



In [26]:
train_labels

[0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,


Create model

In [6]:
from torch import nn

device = "cuda" if torch.cuda.is_available() else "cpu"

class MLP(nn.Module):
    def __init__(self, input_shape: int, output_shape: int):
        super().__init__()
        self.layer_1 = nn.Linear(in_features=input_shape, out_features=output_shape)
        #self.layer_2 = nn.Linear(in_features=10, out_features=10)
        #self.relu = nn.ReLU()

    def forward(self, x):
        return self.layer_1(x)


Create train & test loop functions

In [7]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device
              ):
    # Put model in train mode
    model.train()

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metrics across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device
             ):
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

Creating a train() function

In [8]:
# from tqdm.auto import tqdm

# 1. Take in various parameters required for training and test steps
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          device,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5,
         ):

    # 2. Create empty results dictionary
    results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }

    # 3. Loop through training and testing steps for a number of epochs
    #for epoch in tqdm(range(epochs)):
    for epoch in range(epochs):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer,
                                           device = device
                                          )
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device = device
                                       )

        # 4. Print out what's happening
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        # 5. Update results dictionary
        # Ensure all data is moved to CPU and converted to float for storage
        results["train_loss"].append(train_loss.item() if isinstance(train_loss, torch.Tensor) else train_loss)
        results["train_acc"].append(train_acc.item() if isinstance(train_acc, torch.Tensor) else train_acc)
        results["test_loss"].append(test_loss.item() if isinstance(test_loss, torch.Tensor) else test_loss)
        results["test_acc"].append(test_acc.item() if isinstance(test_acc, torch.Tensor) else test_acc)

    # 6. Return the filled results at the end of the epochs
    return results

Train and Evaluate Model

In [9]:
# Set random seeds
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Set number of epochs
NUM_EPOCHS = 10

# Recreate an instance of TinyVGG
model_my = MLP(input_shape=1024, output_shape=len(train_data.classes)).to(device)

# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_my.parameters(), lr=0.001)

# Start the timer
from timeit import default_timer as timer
start_time = timer()

# Train model_0
model_my_results = train(model=model_my,
                         train_dataloader=train_dataloader,
                         test_dataloader=test_dataloader,
                         optimizer=optimizer,
                         device = device,
                         loss_fn=loss_fn,
                         epochs=NUM_EPOCHS)

# End the timer and print out how long it took
end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")

Epoch: 1 | train_loss: 0.6962 | train_acc: 0.4575 | test_loss: 0.6913 | test_acc: 0.4688
Epoch: 2 | train_loss: 0.6865 | train_acc: 0.5950 | test_loss: 0.6834 | test_acc: 0.6094
Epoch: 3 | train_loss: 0.6808 | train_acc: 0.6700 | test_loss: 0.6798 | test_acc: 0.6484
Epoch: 4 | train_loss: 0.6763 | train_acc: 0.6650 | test_loss: 0.6768 | test_acc: 0.6797
Epoch: 5 | train_loss: 0.6694 | train_acc: 0.7050 | test_loss: 0.6736 | test_acc: 0.6641
Epoch: 6 | train_loss: 0.6657 | train_acc: 0.7175 | test_loss: 0.6691 | test_acc: 0.6797
Epoch: 7 | train_loss: 0.6598 | train_acc: 0.7000 | test_loss: 0.6666 | test_acc: 0.6641
Epoch: 8 | train_loss: 0.6552 | train_acc: 0.7050 | test_loss: 0.6635 | test_acc: 0.6641
Epoch: 9 | train_loss: 0.6506 | train_acc: 0.7375 | test_loss: 0.6605 | test_acc: 0.6562
Epoch: 10 | train_loss: 0.6467 | train_acc: 0.7225 | test_loss: 0.6591 | test_acc: 0.6641
Total training time: 15.928 seconds
