In [1]:
%matplotlib inline

In [2]:
!pip install torch torchvision

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import numpy as np
import pandas as pd
import sklearn
import torch
import librosa
import librosa.display

  _resample_loop_p(x, t_out, interp_win, interp_delta, num_table, scale, y)


In [4]:
import torchaudio
import os
import random
import matplotlib.pyplot as plt

In [5]:
# Attach Google Drive to the Colab filesystem; every dataset path used in this
# notebook lives below this mount point.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [6]:
# Read the per-clip metadata table (fname, directory, model, label) from Drive
# and preview its first rows.
metadata_csv = "/content/drive/MyDrive/UStest/UAV-velocity-prediction-master/information.csv"
metadata = pd.read_csv(metadata_csv)
metadata.head()

Unnamed: 0.1,Unnamed: 0,fname,directory,model,label
0,0,2022-05-14_06-18-26-0.wav,./dataset/big_fast_0719/,X8SW,1
1,1,2022-05-14_06-18-26-1.wav,./dataset/big_fast_0719/,X8SW,1
2,2,2022-05-14_06-18-26-2.wav,./dataset/big_fast_0719/,X8SW,1
3,3,2022-05-14_06-18-26-3.wav,./dataset/big_fast_0719/,X8SW,1
4,4,2022-05-14_06-18-26-4.wav,./dataset/big_fast_0719/,X8SW,1


In [7]:
class AudioUtil():
    """Stateless helpers for loading audio clips and extracting features.

    Every helper is a static method and is meant to be called on the class
    itself, e.g. ``AudioUtil.open(path)``.
    """

    @staticmethod
    def open(audio_file):
        """Load an audio file with ``librosa.load`` using its default settings.

        Args:
            audio_file: Path of the audio file to read.

        Returns:
            tuple: ``(y, sr)`` - the decoded signal and its sampling rate,
            exactly as returned by ``librosa.load``.
        """
        y, sr = librosa.load(audio_file)
        return y, sr

    @staticmethod
    def time_shift(aud, shift_limit):
        """Data augmentation: circularly shift a signal along its time axis.

        The shift is drawn uniformly from ``[0, shift_limit * length)`` samples
        using the ``random`` module; samples pushed past the end wrap around
        to the start.

        Args:
            aud: ``(y, sr)`` pair. ``y`` may be a NumPy array (as produced by
                ``open``) or any object exposing a torch-style
                ``roll(shifts, dims)`` method. The last axis is treated as time,
                so both 1-D ``(samples,)`` and 2-D ``(channels, samples)``
                signals are accepted.
            shift_limit: Maximum shift expressed as a fraction of the signal
                length.

        Returns:
            tuple: ``(shifted_y, sr)`` with ``shifted_y`` shaped like ``y``.
        """
        y, sr = aud
        # Use the last axis as the signal length so that the 1-D arrays
        # returned by `open` work too (the previous 2-D unpack rejected them).
        sig_len = y.shape[-1]
        shift_amt = int(random.random() * shift_limit * sig_len)
        if isinstance(y, np.ndarray):
            # ndarray has no .roll() method; np.roll is the NumPy equivalent.
            return np.roll(y, shift_amt, axis=-1), sr
        # Tensor-like input: roll along time only, never across channels.
        return y.roll(shift_amt, -1), sr

    @staticmethod
    def MFCCs(y, sr):
        """Summarise a signal as one averaged MFCC vector.

        Args:
            y: Audio signal passed to ``librosa.feature.mfcc``.
            sr: Sampling rate of ``y``.

        Returns:
            1-D array obtained by averaging the MFCC matrix over its second
            axis (presumably the frame axis, giving one value per
            coefficient - confirm against the librosa version in use).
        """
        mfccs = librosa.feature.mfcc(y=y, sr=sr)
        mfccs_scaled = np.mean(mfccs.T, axis=0)
        return mfccs_scaled

In [8]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

In [9]:
class CustomDataset(Dataset):
    """Map-style dataset over the audio files of one directory, all sharing one label.

    Each item is ``(mfcc, label)`` where ``mfcc`` is the vector produced by
    ``AudioUtil.MFCCs`` for the file and ``label`` is a 0-dim tensor.

    Attributes:
        root: Directory that was scanned.
        data_files: Paths of the regular files found directly in ``root``,
            in sorted order.
        label: List holding the class label once per file.
    """

    def __init__(self, root, label):
        """Index the files in ``root``.

        Args:
            root: Directory containing the audio files of a single class.
            label: Class label applied to every file (slow = 0, fast = 1).
        """
        self.root = root

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable, so a seeded random_split selects the
        # same files on every run.
        candidates = (os.path.join(root, name) for name in sorted(os.listdir(root)))
        # Keep regular files only; sub-directories are skipped.
        self.data_files = [path for path in candidates if os.path.isfile(path)]
        self.label = [label] * len(self.data_files)

    def __len__(self):
        """Return the number of indexed files."""
        return len(self.data_files)

    def __getitem__(self, idx):
        """Load file ``idx`` and return ``(mfcc_vector, label_tensor)``.

        The audio is decoded and its MFCCs recomputed on every access.
        """
        y, sr = AudioUtil.open(self.data_files[idx])
        mfcc = AudioUtil.MFCCs(y, sr)
        return mfcc, torch.tensor(self.label[idx])

In [10]:
# Drive folders holding the recordings, one folder per speed class
# (loaded below with label 0 for "slow" and label 1 for "fast").
big_slow_path = "/content/drive/MyDrive/UStest/UAV-velocity-prediction-master/dataset/big_slow"
big_fast_path = "/content/drive/MyDrive/UStest/UAV-velocity-prediction-master/dataset/big_fast"

In [11]:
def split_dataset(dataset, train_frac=0.8, valid_frac=0.1, seed=42):
    """Randomly split ``dataset`` into train / validation / test subsets.

    Args:
        dataset: Any sized dataset accepted by ``torch.utils.data.random_split``.
        train_frac: Fraction of items assigned to the training subset.
        valid_frac: Fraction of items assigned to the validation subset.
        seed: Seed of the generator that drives the shuffle.

    Returns:
        list: ``[train, valid, test]`` subsets; the test subset receives
        whatever is left after the two truncated fractions.
    """
    n_total = len(dataset)
    n_train = int(n_total * train_frac)
    n_valid = int(n_total * valid_frac)
    # Remainder goes to test so the three sizes always sum to n_total.
    n_test = n_total - n_train - n_valid
    return torch.utils.data.random_split(
        dataset,
        [n_train, n_valid, n_test],
        generator=torch.Generator().manual_seed(seed),
    )


# One dataset per speed class (slow = 0, fast = 1). Each class is split
# separately, so every subset keeps the same class ratio as the full data.
slow_dataset = CustomDataset(big_slow_path, label=0)
fast_dataset = CustomDataset(big_fast_path, label=1)

slow_train, slow_valid, slow_test = split_dataset(slow_dataset)
fast_train, fast_valid, fast_test = split_dataset(fast_dataset)

In [12]:
train_dataset = torch.utils.data.ConcatDataset([slow_train, fast_train])
val_dataset = torch.utils.data.ConcatDataset([slow_valid, fast_valid])
test_dataset = torch.utils.data.ConcatDataset([slow_test, fast_test])

In [13]:
# Batch the three splits. Only the training loader shuffles; validation and
# test keep a fixed order so their predictions line up between runs.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
# This line previously rebuilt val_loader a second time, which left
# test_dataset without any loader.
test_loader = DataLoader(test_dataset, batch_size=16)

In [14]:
import torch
import torch.nn as nn
from tqdm import tqdm
from sklearn.metrics import accuracy_score

In [15]:
# JY - depthwise-separable convolution (lightweight CNN building block)
class depthwise_separable_conv(nn.Module):
    """1-D depthwise-separable convolution.

    A per-channel ("depthwise") convolution is followed by a 1x1
    ("pointwise") convolution that mixes the channels.

    Args:
        nin: Number of input channels.
        nout: Number of output channels.
        kernel_size: Kernel size of the depthwise convolution.
        padding: Padding of the depthwise convolution.
        bias: Whether both convolutions carry a bias term.
    """

    def __init__(self, nin, nout, kernel_size, padding, bias=False):
        super().__init__()
        # groups == in_channels gives every input channel its own filter.
        self.depthwise = nn.Conv1d(
            in_channels=nin,
            out_channels=nin,
            kernel_size=kernel_size,
            padding=padding,
            groups=nin,
            bias=bias,
        )
        # The 1x1 convolution recombines the channels into `nout` outputs.
        self.pointwise = nn.Conv1d(
            in_channels=nin,
            out_channels=nout,
            kernel_size=1,
            bias=bias,
        )

    def forward(self, x):
        """Apply the depthwise filter, then the pointwise channel mix."""
        return self.pointwise(self.depthwise(x))

In [16]:
# JY - CNN laid out after the Xception structure (entry / middle / exit flow)
class ClassifireCNN(nn.Module):
    """Binary classifier over 20-value feature vectors.

    Three stages (entry, middle, exit) each add a 1x1-convolution shortcut to
    their main branch. The result goes through a sigmoid, so the outputs lie
    in (0, 1) and can be fed to ``nn.BCELoss``.

    Args:
        drop_out: Dropout probability applied to the features entering the
            exit flow. Only active in training mode; ``0.0`` disables it.
    """

    def __init__(self, drop_out=0.0):
        super().__init__()
        self.entry_flow = nn.Sequential(
            nn.Conv1d(20, 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        )
        self.entry_flow_residual = nn.Conv1d(20, 32, kernel_size=1, stride=2, padding=0)
        self.middle_flow = nn.Sequential(
            nn.ReLU(True),
            depthwise_separable_conv(32, 32, 3, 1)
        )
        self.middle_flow_residual = nn.Conv1d(32, 32, kernel_size=1, stride=2, padding=0)
        self.exit_flow = nn.Sequential(
            nn.ReLU(True),
            depthwise_separable_conv(32, 8, 3, 1),
            nn.ReLU(True),
            depthwise_separable_conv(8, 1, 3, 1)
        )
        self.exit_flow_residual = nn.Conv1d(32, 1, kernel_size=1, stride=2, padding=0)
        # cnn1..cnn3 and relu are not used by forward(). They are kept so the
        # state_dict keys and the order of parameter initialisation stay the
        # same as before.
        self.cnn1 = depthwise_separable_conv(nin=20, nout=32, kernel_size=5, padding=2)
        self.cnn2 = depthwise_separable_conv(nin=32, nout=8, kernel_size=5, padding=2)
        self.cnn3 = depthwise_separable_conv(nin=8, nout=1, kernel_size=5, padding=2)
        self.relu = nn.ReLU()

        self.drop_out = nn.Dropout(p=drop_out)

    def forward(self, x):
        """Return one sigmoid score per sample.

        Args:
            x: Tensor reshapeable to ``(batch, 20, 1)``, e.g. ``(batch, 20)``.

        Returns:
            Flattened 1-D tensor of sigmoid outputs. With the length-1
            sequences produced by the reshape, every layer keeps length 1, so
            this is one value per sample.
        """
        # input : [16, 20], [batch, feature]
        x = torch.reshape(x, (-1, 20, 1))   # [batch, feature, 1]
        x1 = self.entry_flow(x) + self.entry_flow_residual(x)
        # NOTE: middle_flow and exit_flow start with an in-place ReLU
        # (nn.ReLU(True)) and are evaluated before their shortcut, so each
        # shortcut convolution receives the already-rectified tensor.
        x2 = self.middle_flow(x1) + self.middle_flow_residual(x1)
        # Apply the configured dropout. Previously the layer was constructed
        # but never called, so the drop_out argument had no effect.
        x2 = self.drop_out(x2)
        x3 = self.exit_flow(x2) + self.exit_flow_residual(x2)
        x = torch.sigmoid(x3)

        return x.view(-1)

In [17]:
# Run on the first CUDA GPU when one is visible, otherwise fall back to CPU.
device_name = 'cuda:0' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)
device

device(type='cuda', index=0)

In [18]:
# Training hyper-parameters.
EPOCHS = 100      # number of passes over the training loader
LR = 0.045        # learning rate handed to the Adam optimizer
DROP_OUT = 0.3    # dropout probability passed to ClassifireCNN
# The two values below are not referenced by the cells shown here; presumably
# meant for an LR scheduler or early stopping - TODO confirm or remove.
PATIENCE = 3
FACTOR = 0.95

In [19]:
# Binary cross-entropy on the sigmoid outputs of the network.
criterion = nn.BCELoss()

# Build the classifier, move it to the selected device, and let Adam
# optimise all of its parameters.
model = ClassifireCNN(drop_out=DROP_OUT)
model = model.to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR)

In [20]:
# Best-epoch bookkeeping: best score so far, the epoch it occurred in, and
# the predictions recorded at that epoch.
best_auc, best_epoch = 0, -1
best_pred = []

# Path of the most recently saved checkpoint (None until the first save).
prev_model = None

# Per-epoch history, consumed by the plotting cells.
train_losses, val_losses = [], []
train_accuracy, val_accuracy = [], []

In [None]:
# Train for EPOCHS epochs and validate after each one.
#
# Appends to train_losses / val_losses (per-epoch sums of batch losses) and
# train_accuracy / val_accuracy, and maintains best_auc / best_epoch /
# best_pred / prev_model for the epoch with the highest validation accuracy.
lr = LR

for epoch in tqdm(range(EPOCHS)):

    # Step decay: multiply the learning rate by 0.94 every second epoch.
    # The rate is written into the existing optimizer, which keeps Adam's
    # running moment estimates. (Previously a new optimizer was built with
    # the constant LR, so the decay never took effect and the state was lost.)
    if epoch > 0 and epoch % 2 == 0:
        lr *= 0.94
        for param_group in optimizer.param_groups:
            param_group["lr"] = lr

    # ---- Train ----
    model.train()
    train_loss_sum = 0.0
    t_true_labels = []
    t_pred_labels = []

    for x, y in train_loader:
        x = x.to(device, dtype=torch.float32)
        y = y.to(device, dtype=torch.float32)

        optimizer.zero_grad()
        pred_y = model(x)
        tloss = criterion(pred_y, y)
        tloss.backward()
        optimizer.step()

        # Accumulate plain floats: no tensor aliasing, no graph kept alive.
        train_loss_sum += tloss.item()
        t_true_labels.extend(y.cpu().numpy())
        # Rounding the sigmoid output yields the hard 0/1 prediction.
        t_pred_labels.extend(np.around(pred_y.detach().cpu().numpy()))

    train_auc = accuracy_score(t_true_labels, t_pred_labels)
    # float() accepts both numpy scalars and plain Python floats.
    train_accuracy.append(float(train_auc))
    train_losses.append(train_loss_sum)

    # ---- Validate ----
    # eval() switches dropout off; no_grad() skips building the autograd graph.
    model.eval()
    val_loss_sum = 0.0
    v_true_labels = []
    v_pred_labels = []

    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device, dtype=torch.float32)
            y = y.to(device, dtype=torch.float32)

            pred_y = model(x)
            vloss = criterion(pred_y, y)

            val_loss_sum += vloss.item()
            v_true_labels.extend(y.cpu().numpy())
            v_pred_labels.extend(np.around(pred_y.cpu().numpy()))

    val_auc = accuracy_score(v_true_labels, v_pred_labels)
    val_accuracy.append(float(val_auc))
    val_losses.append(val_loss_sum)

    # Combined train + validation figures, reported for reference only.
    loss_sum = train_loss_sum + val_loss_sum
    true_labels = t_true_labels + v_true_labels
    pred_labels = t_pred_labels + v_pred_labels
    auc = accuracy_score(true_labels, pred_labels)

    # Model selection uses validation accuracy alone, matching the
    # "best validation acc" message. (It previously used the combined
    # train + validation accuracy.)
    if val_auc > best_auc:
        best_pred = v_pred_labels
        best_auc = val_auc
        best_epoch = epoch

        # Keep only the best checkpoint on disk.
        if prev_model is not None and os.path.exists(prev_model):
            os.remove(prev_model)
        # NOTE: the file holds a PyTorch state_dict despite the .h5 suffix.
        prev_model = f'cnn_model_{best_auc}.h5'
        torch.save(model.state_dict(), prev_model)

    if epoch % 5 == 4:
        print(f"best validation acc = {best_auc}, in epoch {best_epoch}")
        print("loss = ", loss_sum)
        print("train_loss = ", train_loss_sum, "val_loss", val_loss_sum)
        print("auc = ", auc)
        print("train_auc = ", train_auc, "val_auc = ", val_auc)

print(f"best validation acc = {best_auc}, in epoch {best_epoch}")

  2%|▏         | 2/100 [06:32<4:47:54, 176.27s/it]

In [None]:
# JY - loss curves: per-epoch loss on the validation and training sets
fig, ax = plt.subplots(figsize=(10, 5))
ax.set_title("Training and Validation Loss")
ax.plot(val_losses, label="val")
ax.plot(train_losses, label="train")
ax.set_xlabel("EPOCHS")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

In [None]:
# JY - accuracy curves: per-epoch accuracy on the validation and training sets
fig, ax = plt.subplots(figsize=(10, 5))
ax.set_title("Training and Validation Accuracy")
ax.plot(val_accuracy, label="val")
ax.plot(train_accuracy, label="train")
ax.set_xlabel("EPOCHS")
ax.set_ylabel("Accuracy")
ax.legend()
plt.show()