In [1]:
import torch
import torch.nn as nn
from transformers import ASTConfig, ASTModel, ASTFeatureExtractor
import numpy as np
import librosa
import os
import glob
from torch.utils.data import TensorDataset, DataLoader, random_split
from tqdm import tqdm
import torch.optim as optim
import matplotlib.pyplot as plt

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Pick the compute device once: CUDA when PyTorch can see a GPU, otherwise the CPU.
use_gpu = torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")
print("PyTorch is using GPU 🟢" if use_gpu else "PyTorch is using CPU 🟡")

PyTorch is using GPU 🟢


In [3]:
# Root folder of the labelled audio set; later cells read one sub-folder per class from it.
dir_path = r'C:../dataset/audioonly/labeled/set 2.2'
# Shown as the cell output to confirm that the folder can be found.
os.path.exists(dir_path)

True

In [4]:
# Class labels are the names of the immediate sub-folders of the dataset root.
# Hidden folders (e.g. '.ipynb_checkpoints') are filtered out rather than removed
# with list.remove(), which raises ValueError when the folder does not exist.
# Sorting fixes the order, because later cells pick folders by position.
class_name = sorted(
    name for name in next(os.walk(dir_path))[1] if not name.startswith('.')
)
class_name

['bpain', 'discomfort', 'hungry', 'tired']

In [5]:
# One folder path per class, in the same order as `class_name`.
audio_dir = [os.path.join(dir_path, label) for label in class_name]
audio_dir

['C:../dataset/audioonly/labeled/set 2.2\\bpain',
 'C:../dataset/audioonly/labeled/set 2.2\\discomfort',
 'C:../dataset/audioonly/labeled/set 2.2\\hungry',
 'C:../dataset/audioonly/labeled/set 2.2\\tired']

In [6]:
# Collect the .wav files of the first four class folders; the position of a
# folder in `audio_dir` decides which list its files end up in.
bpain_audio, discomf_audio, hungry_audio, tired_audio = (
    glob.glob(os.path.join(folder, '*.wav')) for folder in audio_dir[:4]
)

In [7]:
# Number of .wav files found in the third class folder.
len(hungry_audio)

100

In [8]:
# Map a short class key to the list of .wav paths collected for that class.
audio_path_class = dict(
    bpain=bpain_audio,
    discomf=discomf_audio,
    hungry=hungry_audio,
    tired=tired_audio,
)

## **전처리 하기**

In [9]:
def convert_audio(pathdict, sr=20000):
    """Load every audio file listed in ``pathdict``.

    Args:
        pathdict: Mapping of class name -> iterable of audio file paths.
        sr: Sampling rate handed to ``librosa.load`` for every file.
            Defaults to 20000, the rate used by the rest of this notebook.

    Returns:
        dict: Same keys as ``pathdict``; each value is the list of audio
        arrays returned by ``librosa.load``, in the order of the input paths.
    """
    audio_load = {}

    # The progress bar advances once per class, not once per file.
    for label, paths in tqdm(pathdict.items()):
        # librosa.load returns (audio, sampling_rate); only the audio is kept.
        audio_load[label] = [librosa.load(file, sr=sr)[0] for file in paths]
    return audio_load

In [10]:
# Load the audio of every listed file, grouped under the keys of `audio_path_class`.
loaded_audio = convert_audio(audio_path_class)

100%|████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:02<00:00,  1.36it/s]


In [13]:
from transformers import AutoProcessor

# Processor of the pretrained AST checkpoint, configured with the 20 kHz rate
# that the audio was loaded at.
processor = AutoProcessor.from_pretrained(
    "MIT/ast-finetuned-audioset-10-10-0.4593",
    sampling_rate=20000,
)



## **훈련데이터 및 테스트셋 만들기**

In [14]:
# Run every waveform through the AST processor and stack the results per class.
# Loop variables use fresh names so the cell no longer overwrites the global
# `class_name` list or shadows the builtin `input`.
input_byclass = {}
for label_key, waveforms in loaded_audio.items():
    class_features = []
    for waveform in waveforms:
        # The keyword is `return_tensors` (plural); the misspelled
        # `return_tensor` never requested NumPy output.
        features = processor(waveform, sampling_rate=20000, return_tensors='np')
        class_features.append(features['input_values'])
    input_byclass[label_key] = np.array(class_features)

In [15]:
# Collect the feature array of each class together with an equally long vector
# of integer labels; the label is the class position in `input_byclass`.
all_data = []
all_labels = []

for label, class_inputs in enumerate(input_byclass.values()):
    all_data.append(class_inputs)
    all_labels.append(np.full(len(class_inputs), label))

In [16]:
# Merge the per-class pieces into one feature array and one label array.
# NOTE(review): both names are rebound from the lists built in the previous cell,
# so re-run that cell first before running this one a second time.
all_data = np.concatenate(all_data, axis=0)
all_labels = np.concatenate(all_labels, axis=0)

In [17]:
# Shape of the merged feature array, shown as the cell output.
all_data.shape

(340, 1, 1024, 128)

In [18]:
# Drop the length-1 axis at position 1 so each sample is a 2-D feature map.
squeezed_data_np = all_data.squeeze(axis=1)
squeezed_data_np.shape

(340, 1024, 128)

In [19]:
from sklearn.model_selection import train_test_split

# First split: hold out 20% of all samples as the validation set.
x_train, x_val, y_train, y_val = train_test_split(
    squeezed_data_np,
    all_labels,
    test_size=0.2,
    random_state=33,
)

In [20]:
# Second split: carve a test set (20%) out of the remaining training data.
# NOTE(review): x_train / y_train are both input and output here, so running this
# cell twice shrinks the training set again; re-run the previous cell first.
x_train, x_test, y_train, y_test = train_test_split(x_train, y_train, test_size = 0.2, random_state = 33)

In [21]:
# Convert to PyTorch tensors
xtrain_tensor = torch.tensor(x_train, dtype=torch.float32)
xval_tensor = torch.tensor(x_val, dtype=torch.float32)

ytrain_tensor = torch.tensor(y_train, dtype=torch.long)
yval_tensor = torch.tensor(y_val, dtype=torch.long)

xtest_tensor = torch.tensor(x_test, dtype=torch.float32)
ytest_tensor = torch.tensor(y_test, dtype=torch.long)

In [22]:
# Print the shape of every split, in the order train/val features, train/val labels, test.
for split_tensor in (
    xtrain_tensor,
    xval_tensor,
    ytrain_tensor,
    yval_tensor,
    xtest_tensor,
    ytest_tensor,
):
    print(split_tensor.shape)

torch.Size([217, 1024, 128])
torch.Size([68, 1024, 128])
torch.Size([217])
torch.Size([68])
torch.Size([55, 1024, 128])
torch.Size([55])


In [23]:
# Pair features with labels for each split.
train_dataset = TensorDataset(xtrain_tensor, ytrain_tensor)
val_dataset = TensorDataset(xval_tensor, yval_tensor)
test_dataset = TensorDataset(xtest_tensor, ytest_tensor)

# Only the training data is shuffled. The training loop averages the loss per
# batch, so a fixed batch composition for validation/test keeps the reported
# metrics identical from one pass to the next.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

## **모델 불러오기**

In [None]:
import torch.nn.functional as F


class CustomASTClassifier(nn.Module):
    """Frozen pretrained AST encoder followed by a small trainable head.

    The head is the example architecture that used to live in a commented-out
    string: one 2-D convolution over the encoder output, adaptive average
    pooling, and two linear layers. Replace it with your own convolution and
    classifier layers if you want a different head.

    Args:
        ast_model_name: Checkpoint name or path given to
            ``ASTModel.from_pretrained``.
        num_labels: Number of output classes.
    """

    def __init__(self, ast_model_name, num_labels):
        super().__init__()
        # Put custom configuration values inside ASTConfig(...) if needed.
        self.astconfig = ASTConfig()
        # The config must be passed by keyword: extra positional arguments of
        # from_pretrained are forwarded to the model constructor, which only
        # accepts a single config argument.
        self.ast = ASTModel.from_pretrained(ast_model_name, config=self.astconfig)
        self.num_labels = num_labels

        # Freeze the whole encoder; only the head below is trained.
        for param in self.ast.parameters():
            param.requires_grad = False

        # Width of the encoder embeddings (768 for the default ASTConfig).
        hidden_size = self.ast.config.hidden_size
        conv_channels = 32

        # Trainable head: convolution -> pooling -> two linear layers.
        self.conv2d = nn.Conv2d(in_channels=1,
                                out_channels=conv_channels,
                                kernel_size=(5, 5),
                                stride=1)
        # Collapse the sequence axis and bring the width back to hidden_size.
        self.pool = nn.AdaptiveAvgPool2d((1, hidden_size))
        # conv_channels * hidden_size == 24576 with the default configuration.
        self.classifier = nn.Linear(conv_channels * hidden_size, 48)
        self.add_layer_01 = nn.Linear(48, num_labels)

    def forward(self, input_values, labels=None):
        """Classify a batch of AST input features.

        Args:
            input_values: Batch of features accepted by the AST encoder.
            labels: Optional integer class labels. When given, the
                cross-entropy loss is returned as well.

        Returns:
            ``logits`` when ``labels`` is None, otherwise ``(loss, logits)``.
        """
        input_values = input_values.float()
        outputs = self.ast(input_values)
        embeddings = outputs.last_hidden_state

        # Add a channel axis so the embeddings can go through Conv2d.
        embeddings = embeddings.unsqueeze(1)

        # Convolution block
        conv_output = F.leaky_relu(self.conv2d(embeddings), negative_slope=0.1)
        pooled_output = self.pool(conv_output)

        # Classifier block
        flattened_output = pooled_output.flatten(start_dim=1)
        hidden = torch.tanh(self.classifier(flattened_output))
        logits = self.add_layer_01(hidden)

        if labels is not None:
            loss = F.cross_entropy(logits.view(-1, self.num_labels),
                                   labels.view(-1).long())
            return loss, logits
        return logits

In [None]:
# `optim` and `plt` are already imported in the first cell, so they are not
# imported a second time here.

# One output per class: labels were assigned as positions in `input_byclass`.
num_labels = len(input_byclass)
ast_model_name = "MIT/ast-finetuned-audioset-10-10-0.4593"
model = CustomASTClassifier(ast_model_name, num_labels).to(device)

# Define your optimizer, loss function, etc.
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Initialize metric lists (one entry is appended per epoch by the training loop)
train_losses, train_accuracies, val_losses, val_accuracies = [], [], [], []

In [None]:
# 여기서 ast 모델이 다 freeze 됐는지 확인하시면 되요
for name, param in model.ast.named_parameters():
    if param.requires_grad:
        print(f"{name}: requires_grad is True")
    else:
        print(f"{name}: requires_grad is False")

In [None]:
# Training loop
for epoch in range(10):  # 에폭은 임구님이 원하시는 값을 넣어주시면 됩니다.
    total_loss, correct, total = 0, 0, 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}")

    # Training phase
    for batch in train_loader:
        optimizer.zero_grad()
        input_values, labels = batch
        input_values, labels = input_values.to(device), labels.to(device)

        loss, logits = model(input_values, labels)
        total_loss += loss.item()

        # Calculate accuracy
        preds = logits.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

        loss.backward()
        optimizer.step()

        progress_bar.set_postfix(loss=loss.item())

    train_accuracy = correct / total
    train_losses.append(total_loss / len(train_loader))
    train_accuracies.append(train_accuracy)

    # Validation phase
    if val_loader is not None:
        model.eval()
        with torch.no_grad():
            val_loss, correct, total = 0, 0, 0
            for batch in val_loader:
                input_values, labels = batch
                input_values, labels = input_values.to(device), labels.to(device)

                loss, logits = model(input_values, labels)
                val_loss += loss.item()
                preds = logits.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)

            val_accuracy = correct / total
            val_losses.append(val_loss / len(val_loader))
            val_accuracies.append(val_accuracy)

        model.train()

In [None]:
# Plotting
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Training Loss')
if val_loader is not None:
    plt.plot(val_losses, label='Validation Loss')
plt.title("Loss Over Epochs")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label='Training Accuracy')
if val_loader is not None:
    plt.plot(val_accuracies, label='Validation Accuracy')
plt.title("Accuracy Over Epochs")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
# plt.savefig("win500_set2.2_test_epc200_flattened.png") # 그래프 사진은 주석풀어서 사진이름만 설정해주시면 됩니다.

plt.show()

In [None]:
# Switch to evaluation mode before measuring accuracy on the test split.
model.eval()

correct = 0
total = 0
with torch.no_grad():  # No gradients are needed for evaluation
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        # Without labels the model returns only the logits; the predicted
        # class is the index of the largest logit in each row.
        predicted = model(inputs).argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Accuracy: {accuracy * 100:.2f}%')