In [2]:
import os
import pandas as pd
import torch
import numpy as np
import json

import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
import scaleogram as scg

from torch.utils.data import Dataset
from skimage import io

from torch.utils.data import DataLoader

In [3]:
def get_signal(sound_path: str):
    """Read a WAV file and return its sample rate and samples.

    Args:
        sound_path: Path of the WAV file to read.

    Returns:
        The ``(samplerate, x)`` pair produced by ``scipy.io.wavfile.read``.
    """
    # `wavfile` is not imported by any cell visible in this notebook, so the
    # import is done here to keep the function usable on a fresh kernel.
    from scipy.io import wavfile

    samplerate, x = wavfile.read(sound_path)
    return samplerate, x

def from_string_to_label(object_name:str):
    """Translate an object class name into its integer class index.

    The known names 'big_drone', 'bird', 'free_space', 'human' and
    'small_copter' are numbered 0 through 4 in that order. Any other
    name yields -1.
    """
    class_names = ('big_drone', 'bird', 'free_space', 'human', 'small_copter')
    name_to_index = {name: position for position, name in enumerate(class_names)}

    return name_to_index.get(object_name, -1)

def convert_gray2rgb(image):
    """Expand a 2-D array into a three-channel ``uint8`` image.

    The input is copied unchanged into each of the three channels of a new
    array whose shape is the input shape followed by 3. Values are cast to
    ``uint8`` when they are written into the output.
    """
    # Unpacking also enforces that the input is exactly two-dimensional.
    rows, cols = image.shape

    rgb = np.empty((rows, cols, 3), dtype=np.uint8)
    # Broadcast the single plane across the trailing channel axis.
    rgb[...] = image[:, :, np.newaxis]

    return rgb

def normalize_scaleogram(coefs):
    """Min-max scale an array of coefficients into an 8-bit image.

    The smallest value maps to 0 and the largest to 255; values in between
    are scaled linearly and truncated toward zero.

    Args:
        coefs: Array-like of real numbers.

    Returns:
        ``np.uint8`` array with the same shape as ``coefs``. A constant
        input (max == min) yields an all-zero array.

    Raises:
        ValueError: If ``coefs`` is empty (propagated from ``np.min``).
    """
    coefs = np.asarray(coefs)
    min_coefs = np.min(coefs)
    value_range = np.max(coefs) - min_coefs

    if value_range == 0:
        # Constant input: the scaling below would be 0/0 (NaN), so return a
        # well-defined black image instead.
        return np.zeros(coefs.shape, dtype=np.uint8)

    scaled = (coefs - min_coefs) / value_range * 255

    # Cast straight to uint8. Going through int8 (max 127) first would rely
    # on out-of-range float-to-int conversion wrapping around.
    return scaled.astype(np.uint8)

def load_json(file_path):
    """Parse the JSON document stored at ``file_path`` and return the result."""
    with open(file_path, 'r') as json_handle:
        return json.load(json_handle)

def get_scaleogram(sound_path, spectrum=None, wavelet=None, scales=None):
    """Compute the continuous wavelet transform of a WAV file.

    Args:
        sound_path: Path of the WAV file, read through ``get_signal``.
        spectrum: Which view of the coefficients to return: ``'amp'`` for
            the absolute value, ``'real'`` for the real part, ``'imag'``
            for the imaginary part. Any other value returns the
            coefficients unchanged.
        wavelet: Forwarded to ``scg.periods2scales`` and ``scg.CWT``.
        scales: Scales forwarded to ``scg.CWT``. When ``None`` or empty,
            scales are derived from log-spaced periods between 2 and 1000.

    Returns:
        A ``(coefs, scales_freq)`` tuple taken from the CWT object.
    """
    sample_rate, signal = get_signal(sound_path)

    # `not scales` raises ValueError for NumPy arrays with more than one
    # element, so test for "not provided" explicitly.
    if scales is None or np.size(scales) == 0:
        scales = scg.periods2scales(np.logspace(np.log10(2), np.log10(1000)), wavelet)

    signal_length = signal.shape[0] / sample_rate
    # NOTE(review): linspace includes the endpoint, so the sample spacing is
    # signal_length / (N - 1) rather than 1 / sample_rate. Confirm intended.
    time = np.linspace(0, signal_length, signal.shape[0])
    cwt = scg.CWT(time=time, signal=signal, scales=scales, wavelet=wavelet)

    if spectrum == 'amp':
        return np.abs(cwt.coefs), cwt.scales_freq
    elif spectrum == 'real':
        return np.real(cwt.coefs), cwt.scales_freq
    elif spectrum == 'imag':
        return np.imag(cwt.coefs), cwt.scales_freq
    return cwt.coefs, cwt.scales_freq

In [4]:
class DroneDataset(Dataset):
    """Dataset of scaleogram records loaded from a JSON file.

    Each record is accessed with ``.get('coefs')`` for the coefficient
    array and ``.get('object')`` for the class name.
    """

    def __init__(self, json_file, root_dir, transform=None):
        """Load the records stored at ``root_dir`` joined with ``json_file``.

        Args:
            json_file: File name (or path) of the JSON file.
            root_dir: Directory the file name is joined onto.
            transform: Optional callable applied to every RGB image.
        """
        self.transform = transform
        self.data = load_json(os.path.join(root_dir, json_file))

    def __len__(self):
        """Return the number of loaded records."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair for record ``index``.

        The coefficients are normalized to 8 bits and replicated to three
        channels; the class name is converted to an integer tensor.
        """
        record = self.data[index]

        gray_image = normalize_scaleogram(np.array(record.get('coefs')))
        image = convert_gray2rgb(gray_image)

        y_label = torch.tensor(int(from_string_to_label(record.get('object'))))

        if self.transform:
            image = self.transform(image)

        return (image, y_label)

In [4]:
# Locate the dataset directory relative to the current working directory.
cur_folder_path = os.getcwd()
datasets_path = os.path.join(cur_folder_path, 'new_datasets')

# os.listdir(cur_folder_path)

In [5]:
# Preprocessing applied to every sample; the resize step is currently disabled.
transform = transforms.Compose(
    [
        # transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
)

# Dataset description files, all located inside `datasets_path`.
json_file = os.path.join(datasets_path, 'dataset.json')
json_train = os.path.join(datasets_path, 'train_triple_shan.json')
json_valid = os.path.join(datasets_path, 'valid_triple_shan.json')

# data_set = DroneDataset(json_file, cur_folder_path, transform)

# The paths above are already absolute (they start from os.getcwd()), so the
# os.path.join inside DroneDataset returns them unchanged and root_dir has no
# effect on which file is opened.
train_set = DroneDataset(json_file=json_train, root_dir=cur_folder_path, transform=transform)
valid_set = DroneDataset(json_file=json_valid, root_dir=cur_folder_path, transform=transform)

In [6]:
# Mini-batch size shared by both loaders.
batch_size = 30

# Both loaders are created with shuffling enabled.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=True)

In [6]:
# Size of the classifier output (the label map has five entries).
num_classes = 5

# Start from torchvision's pretrained ResNet-18 and swap its final fully
# connected layer for one that produces `num_classes` scores.
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour
# of `weights=`; confirm the installed version before changing it.
model = models.resnet18(pretrained=True)
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=num_classes)

# model.load_state_dict(torch.load('resnet18_cmor_model.pth'))



In [7]:
# Prefer the second GPU ("cuda:1"), as before, but only when it actually
# exists: on a single-GPU machine torch.cuda.is_available() is True while
# "cuda:1" is not a valid device. Fall back to "cuda:0", then to the CPU.
if torch.cuda.device_count() > 1:
    device = torch.device("cuda:1")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [8]:
# Training hyperparameters.
in_channel = 3          # not referenced by any other cell visible here
learning_rate = 1e-3    # step size handed to the optimizer
num_epochs = 10         # number of passes made by the training loop

In [9]:
# Loss function and optimizer used by the training loop.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [21]:
import time
from tqdm import tqdm

start = time.time()

# Per-epoch history: training accuracy and mean batch loss.
accuracy = []
avarage_loss = []

for epoch in tqdm(range(1, num_epochs + 1)):
    model.train()
    losses, num_correct, num_samples = [], 0, 0

    for data, targets in train_loader:
        data = data.to(device=device)
        targets = targets.to(device=device)

        # Forward pass and loss for this batch.
        scores = model(data)
        loss = criterion(scores, targets)
        losses.append(loss.item())

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Running accuracy, measured on the pre-update predictions.
        predictions = scores.max(dim=1).indices
        num_correct += (predictions == targets).sum().item()
        num_samples += predictions.size(0)

    train_accuracy = num_correct / num_samples
    avg_loss = sum(losses) / len(losses)
    accuracy.append(train_accuracy)
    avarage_loss.append(avg_loss)

    print(f'Epoch {epoch}: Average Loss = {avg_loss:.4f}, Accuracy = {train_accuracy * 100:.2f}%')

finish = time.time()
print('Finished Training in ', finish - start)

# NOTE(review): the file name says "resnet50" but the model built in this
# notebook is a resnet18. Confirm before renaming the checkpoint.
torch.save(model.state_dict(), './resnet50_shan_triple_model.pth')

 10%|█         | 1/10 [00:35<05:23, 35.89s/it]

Epoch 1: Average Loss = 0.0265, Accuracy = 99.01%


 20%|██        | 2/10 [01:15<05:05, 38.15s/it]

Epoch 2: Average Loss = 0.0243, Accuracy = 99.09%


 30%|███       | 3/10 [01:46<04:05, 35.02s/it]

Epoch 3: Average Loss = 0.0173, Accuracy = 99.45%


 40%|████      | 4/10 [02:24<03:36, 36.07s/it]

Epoch 4: Average Loss = 0.0248, Accuracy = 99.18%


 50%|█████     | 5/10 [03:03<03:06, 37.24s/it]

Epoch 5: Average Loss = 0.0226, Accuracy = 99.21%


 60%|██████    | 6/10 [03:42<02:30, 37.55s/it]

Epoch 6: Average Loss = 0.0209, Accuracy = 99.29%


 70%|███████   | 7/10 [04:22<01:55, 38.45s/it]

Epoch 7: Average Loss = 0.0104, Accuracy = 99.63%


 80%|████████  | 8/10 [05:03<01:18, 39.15s/it]

Epoch 8: Average Loss = 0.0239, Accuracy = 99.21%


 90%|█████████ | 9/10 [05:37<00:37, 37.63s/it]

Epoch 9: Average Loss = 0.0266, Accuracy = 99.06%


100%|██████████| 10/10 [06:10<00:00, 37.03s/it]

Epoch 10: Average Loss = 0.0131, Accuracy = 99.61%
Finished Training in  370.3462772369385





In [16]:
def check_accuracy(loader, model):
    """Print the classification accuracy of ``model`` over ``loader``.

    Args:
        loader: Iterable yielding ``(inputs, labels)`` batches.
        model: ``nn.Module`` that returns one row of class scores per input.

    Side effects:
        Switches ``model`` to evaluation mode and leaves it there, then
        prints the accuracy as a percentage. If the loader yields no
        samples, a short notice is printed instead of dividing by zero.
    """
    num_correct = 0
    num_samples = 0
    model.eval()

    # Follow the model's own device rather than a notebook-global `device`,
    # so the function works wherever the model currently lives.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    with torch.no_grad():
        for x, y in loader:
            if model_device is not None:
                x, y = x.to(device=model_device), y.to(device=model_device)

            scores = model(x)
            _, predictions = scores.max(1)
            # .item() keeps the counters as Python ints (as the training loop
            # does) instead of accumulating a tensor on the device.
            num_correct += (predictions == y).sum().item()
            num_samples += predictions.size(0)

    if num_samples == 0:
        print('Accuracy: n/a (loader yielded no samples)')
        return

    accuracy = num_correct / num_samples
    print(f'Accuracy: {accuracy * 100:.5f}%')

In [24]:
# Accuracy over the batches of the training loader.
check_accuracy(train_loader, model)

Accuracy: 99.70383%


In [25]:
# Accuracy over the batches of the validation loader.
check_accuracy(valid_loader, model)

Accuracy: 92.13166%


In [49]:
# Re-declares `transform` with the same pipeline as the earlier dataset cell:
# only ToTensor is active, the resize step stays disabled.
transform = transforms.Compose([
    # transforms.Resize((224, 224)),  
    transforms.ToTensor()
])

In [58]:
# Time a single prediction for one audio file.
# NOTE(review): `test_audio` and `wavelet` are not defined in any cell visible
# here. Confirm they exist on a fresh kernel (Restart & Run All).
start = time.time()
pred_class, probabilities = test_audio(model, 'bird_1_99.wav', transform, wavelet)
end = time.time()

# Wall-clock duration of the call, in seconds.
elapsed = end - start
print("\nProcessing for: {:.3f} seconds".format(elapsed))


Processing for: 0.156 seconds


In [53]:
# Display the two values returned by test_audio.
pred_class, probabilities

(1,
 array([4.9301896e-09, 9.9999940e-01, 4.2976815e-08, 5.0224122e-07,
        8.8973543e-08], dtype=float32))

In [33]:
    # Class-name -> index table; repeats the mapping that lives inside
    # from_string_to_label.
    label_map = {
        'big_drone': 0,
        'bird': 1,
        'free_space': 2,
        'human': 3,
        'small_copter': 4
    }