<a href="https://colab.research.google.com/github/qihuazhong/cnn-gunshot-detection/blob/main/Gunshots.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import librosa
from functools import lru_cache
import random
from tqdm import tqdm
from numpy import linalg as LA

**Local dataset**

In [None]:
# # Path to downloaded dataset 
# ROOT_DIR = './participants_dataset/'
# DF_PATH = f'{ROOT_DIR}/participant_urbansound8k.csv'

**Alternatively, download the dataset from Google Drive**

In [None]:
!gdown 1izWL6k1DtS0FyiB1iTbKY7vjWn0CyqE1

Access denied with the following error:

 	Cannot retrieve the public link of the file. You may need to change
	the permission to 'Anyone with the link', or have had many accesses. 

You may still be able to access the file from the browser:

	 https://drive.google.com/uc?id=1izWL6k1DtS0FyiB1iTbKY7vjWn0CyqE1 



In [None]:
!unzip participants_dataset.zip

ln: failed to create symbolic link 'participants_dataset.zip': File exists


In [None]:
# Root directory that holds the fold<N>/ audio sub-directories read by make_dataframe.
ROOT_DIR = '.'
# Metadata CSV loaded by make_dataframe; expected directly under ROOT_DIR.
DF_PATH = f'{ROOT_DIR}/participant_urbansound8k.csv'

**Data preprocessing**

In [None]:
# Number of MFCC coefficients per frame: passed to MFCCFeatureExtractor and used as the
# height of SimpleCNN's first convolution kernel.
N_MFCC = 60 # 30 or 60, both will produce the same scores given enough training


def make_dataframe(df_path):
    """Load the metadata CSV and attach each clip's path and duration.

    Args:
        df_path: path of the CSV file; it must provide the ``fold`` and
            ``slice_file_name`` columns.

    Returns:
        The loaded DataFrame with two extra columns: ``duration`` (as reported
        by ``librosa.get_duration``) and ``path`` (built from the module-level
        ``ROOT_DIR``, the fold number and the file name).
    """
    frame = pd.read_csv(df_path)

    # Build every clip path first, then query the durations in the same row order.
    clip_paths = [
        f"{ROOT_DIR}/fold{record.fold}/{record.slice_file_name}"
        for _, record in frame.iterrows()
    ]
    clip_durations = [librosa.get_duration(filename=clip_path) for clip_path in clip_paths]

    frame['duration'] = clip_durations
    frame['path'] = clip_paths
    return frame

class FixedTimeSRPreprocessor:
    """Bring every waveform to one sampling rate and one fixed length.

    Attributes are set from the constructor arguments; ``max_length`` is the
    number of samples corresponding to ``max_duration`` seconds at ``target_sr``.
    """

    def __init__(self, max_duration, target_sr):
        self.max_duration, self.target_sr = max_duration, target_sr
        # Sample count every clip is forced to (fractional counts are truncated by int()).
        self.max_length = int(target_sr * max_duration)

    def preprocess(self, y, sr):
        """Resample ``y`` from ``sr`` to ``target_sr`` and fix its length.

        Returns:
            A ``(waveform, target_sr)`` tuple where the waveform has exactly
            ``max_length`` samples (``librosa.util.fix_length`` pads or trims).
        """
        resampled = librosa.resample(y, orig_sr=sr, target_sr=self.target_sr)
        fixed = librosa.util.fix_length(resampled, size=self.max_length)
        return fixed, self.target_sr

class MFCCFeatureExtractor:
    """Turn a waveform into a 2-D MFCC matrix with ``n_mfcc`` coefficients."""

    def __init__(self, n_mfcc):
        # Number of MFCC coefficients requested from librosa.
        self.n_mfcc = n_mfcc

    def process(self, y, sr):
        """Return ``librosa.feature.mfcc`` of ``y`` (sampled at ``sr``), not flattened."""
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc)
        return mfcc

class TrainingSet:
    """Map-style dataset over the training folds (``fold`` in 0..8).

    Each item is a ``(features, label)`` tuple: the clip is loaded from its
    ``path`` column, run through ``preprocessor.preprocess`` and then through
    ``feature_extractor.process``; the label is the row's ``Label`` value.

    Args:
        df: metadata frame with at least ``fold``, ``Label`` and ``path`` columns.
        preprocessor: object exposing ``preprocess(y, sr) -> (y_hat, sr)``.
        feature_extractor: object exposing ``process(y, sr) -> features``.
    """

    def __init__(self, df, preprocessor, feature_extractor):
        self.train_idxs = df['fold'].isin(range(9))
        self.train_df = df[self.train_idxs].copy()
        self.preprocessor = preprocessor
        self.feature_extractor = feature_extractor

    def __len__(self):
        return self.train_df.shape[0]

    # The default lru_cache size is 128 entries, far fewer than the number of
    # training clips; with shuffled access almost every lookup would miss and the
    # audio would be re-decoded each epoch. An unbounded cache computes each
    # item's features once. Note the cached array object is shared between calls.
    @lru_cache(maxsize=None)
    def __getitem__(self, idx):
        """Return ``(features, label)`` for the row at position ``idx`` of ``train_df``."""
        sample = self.train_df.iloc[idx].to_dict()
        label = sample['Label']
        y, sr = librosa.load(sample['path'])
        y_hat, target_sr = self.preprocessor.preprocess(y, sr)
        features = self.feature_extractor.process(y_hat, target_sr)
        return features, label
    
    
class TrainingSetAugmented:
    """Training dataset that mixes every requested clip with a random background clip.

    Args:
        df: metadata frame with at least ``fold``, ``Label`` and ``path`` columns;
            only rows with ``fold`` in 0..8 are kept.
        preprocessor: object exposing ``preprocess(y, sr) -> (y_hat, sr)``.
        feature_extractor: object exposing ``process(y, sr) -> features``.
        nagetive_samples: list of integer positions (used with ``.iloc`` on the
            training rows) of clips to draw background noise from. The
            misspelled name is kept for backward compatibility.

    Items are not cached because a new background clip is drawn on every access.
    """

    def __init__(self, df, preprocessor, feature_extractor, nagetive_samples):
        self.train_idxs = df['fold'].isin(range(9))
        self.train_df = df[self.train_idxs].copy()
        self.preprocessor = preprocessor
        self.feature_extractor = feature_extractor
        self.nagetive_samples = nagetive_samples

    def __len__(self):
        return self.train_df.shape[0]

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` after noise augmentation."""
        sample = self.train_df.iloc[idx].to_dict()
        label = sample['Label']
        y, sr = librosa.load(sample['path'])
        y_hat, target_sr = self.preprocessor.preprocess(y, sr)

        # Draw from the list handed to the constructor rather than from a
        # module-level global of the same name.
        neg_sample_idx = random.sample(self.nagetive_samples, k=1)
        neg_sample = self.train_df.iloc[neg_sample_idx[0]].to_dict()
        y_neg, sr_neg = librosa.load(neg_sample['path'])
        y_hat_neg, _ = self.preprocessor.preprocess(y_neg, sr_neg)

        y_hat = self.augment(y_hat, y_hat_neg)
        features = self.feature_extractor.process(y_hat, target_sr)
        return features, label

    def augment(self, pos_sample, neg_sample):
        """
        Augment with background noises (from negative samples).
        positive sample + background noise should still be positive

        The positive signal is rescaled so that its L2 norm is ``snr`` times
        the noise norm (20 dB), then the two signals are averaged.

        https://pytorch.org/tutorials/beginner/audio_data_augmentation_tutorial.html#adding-background-noise
        """
        pos_sample_norm = LA.norm(pos_sample, 2)
        neg_sample_norm = LA.norm(neg_sample, 2)

        snr_db = 20 # Signal to Noise Ratio
        snr = 10 ** (snr_db / 20)

        if pos_sample_norm == 0:
            # A silent clip cannot be rescaled; avoid 0/0 -> NaN and return
            # just the (halved) background noise.
            scale = 0.0
        else:
            scale = snr * neg_sample_norm / pos_sample_norm

        noisy_pos_sample = (pos_sample * scale + neg_sample) / 2

        return noisy_pos_sample

In [None]:
class TestSet:
    """Map-style dataset over the held-out folds (``fold`` in 9..12).

    Unlike the training datasets, items are the feature matrix only; no label
    is returned.
    """

    def __init__(self, df, preprocessor, feature_extractor):
        self.preprocessor = preprocessor
        self.feature_extractor = feature_extractor
        # Boolean mask over ``df`` and the matching (copied) sub-frame.
        self.test_idxs = df['fold'].isin(range(9, 13))
        self.test_df = df[self.test_idxs].copy()

    def __len__(self):
        n_rows, _n_cols = self.test_df.shape
        return n_rows

    @lru_cache()
    def __getitem__(self, idx):
        """Return the extracted features of the row at position ``idx``."""
        record = self.test_df.iloc[idx].to_dict()
        waveform, native_sr = librosa.load(record['path'])
        fixed_waveform, fixed_sr = self.preprocessor.preprocess(waveform, native_sr)
        return self.feature_extractor.process(fixed_waveform, fixed_sr)

        

In [None]:
import numpy as np


df = make_dataframe(DF_PATH)
preprocessor = FixedTimeSRPreprocessor(max_duration=4.0, target_sr=22050)
feature_extractor = MFCCFeatureExtractor(n_mfcc=N_MFCC)

training_set = TrainingSet(df, preprocessor, feature_extractor)
test_set = TestSet(df, preprocessor, feature_extractor)

# The datasets look rows up by *position* (.iloc) inside the fold-filtered
# training frame, so the sample lists must hold positions within that frame,
# not index labels of the full `df`. Labels and positions coincide only when
# the training rows happen to be the leading rows of the CSV.
train_labels = training_set.train_df['Label']
positive_samples = np.flatnonzero((train_labels == True).to_numpy()).tolist()
nagetive_samples = np.flatnonzero((train_labels == False).to_numpy()).tolist()

training_set_augmented = TrainingSetAugmented(df, preprocessor, feature_extractor, nagetive_samples)


In [None]:
def get_positive_samples(k=60):
    """
    Draw ``k`` distinct positive training samples at random.

    Indices come from the module-level ``positive_samples`` list and are looked
    up in ``training_set``; only the features are kept.

    Returns:
        A tensor stacking the ``k`` feature matrices along a new first dimension.
    """
    chosen_indices = random.sample(positive_samples, k=k)

    feature_tensors = []
    for sample_idx in chosen_indices:
        features = training_set[sample_idx][0]  # item is (features, label)
        feature_tensors.append(torch.Tensor(features))

    return torch.stack(feature_tensors)

def get_noisy_positive_samples(k=60):
    """
    Draw ``k`` distinct positive training samples at random, with background noise.

    Same as drawing plain positive samples, except that items are read from
    ``training_set_augmented``, which mixes each clip with a random background
    clip before feature extraction.

    Returns:
        A tensor stacking the ``k`` feature matrices along a new first dimension.
    """
    chosen_indices = random.sample(positive_samples, k=k)

    feature_tensors = []
    for sample_idx in chosen_indices:
        features = training_set_augmented[sample_idx][0]  # item is (features, label)
        feature_tensors.append(torch.Tensor(features))

    return torch.stack(feature_tensors)

In [None]:
!pip install torchmetrics

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchmetrics
  Downloading torchmetrics-0.10.0-py3-none-any.whl (529 kB)
[K     |████████████████████████████████| 529 kB 5.2 MB/s 
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.10.0


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

import torch.nn.functional as F
import torchaudio.transforms as T

from torch.utils.data import DataLoader
from torchmetrics import Accuracy, F1Score

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

**Simple Custom CNN Model**

In [None]:
class SimpleCNN(nn.Module):
    """Two-layer CNN over MFCC matrices that outputs two class scores.

    Expected input is ``(batch, 1, n_mfcc, frames)``. The first convolution
    spans the whole MFCC axis, so from then on only the time axis is convolved.
    ``fc1`` is sized for 15 time steps after ``conv2``, which is what a
    173-frame input produces.

    Args:
        n_mfcc: height of the input / of the first kernel. Defaults to the
            module-level ``N_MFCC`` when omitted.
    """

    def __init__(self, n_mfcc=None):
        super().__init__()

        if n_mfcc is None:
            n_mfcc = N_MFCC

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=256, kernel_size=(n_mfcc, 12), stride=(3))
        self.conv2 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=(1, 12), stride=(3))

        self.bn = nn.BatchNorm2d(512)
        self.drop = nn.Dropout2d(p=0.5)

        self.fc1 = nn.Linear(512 * 15, 1024)
        self.fc2 = nn.Linear(1024, 2)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Return raw class scores (logits) of shape ``(batch, 2)``.

        No softmax is applied here: ``nn.CrossEntropyLoss`` performs
        log-softmax itself, and applying softmax first squashes the scores into
        [0, 1], which bounds the loss away from zero and weakens the gradients.
        The arg-max over the logits equals the arg-max over the probabilities;
        use ``F.softmax(logits, dim=1)`` when probabilities are needed.
        """
        x = F.relu(self.conv1(x))
        x = self.drop(x)
        x = F.relu(self.conv2(x))
        x = self.drop(x)
        x = self.bn(x)

        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        return self.fc2(x)


# Instantiate the model and move its parameters to the selected device.
net = SimpleCNN().to(device)

In [None]:
net

SimpleCNN(
  (conv1): Conv2d(1, 256, kernel_size=(60, 12), stride=(3, 3))
  (conv2): Conv2d(256, 512, kernel_size=(1, 12), stride=(3, 3))
  (bn): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (drop): Dropout2d(p=0.5, inplace=False)
  (fc1): Linear(in_features=7680, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=2, bias=True)
  (flatten): Flatten(start_dim=1, end_dim=-1)
)

In [None]:
from torch.utils.tensorboard import SummaryWriter

In [None]:
# nn.CrossEntropyLoss applies log-softmax internally, so it is meant to receive raw,
# unnormalised class scores (logits) together with integer class labels.
criterion = nn.CrossEntropyLoss()
# SGD with momentum over all parameters of `net`.
optimizer = optim.SGD(net.parameters(), lr=3e-4, momentum=0.9)

In [None]:
# Number of dataset items per mini-batch for both loaders.
batch_size = 64
# Training items are drawn in shuffled order.
train_dataloader = DataLoader(training_set, batch_size=batch_size, shuffle=True)
# Test order is kept fixed so the predictions line up with the rows of test_set.test_df.
test_dataloader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
accuracy_metric = Accuracy().to(device)
# F1Score defaults to micro averaging, which for single-label predictions is
# numerically identical to accuracy and therefore adds no information. Macro
# averaging weights both classes equally, so mistakes on the rare positive
# class show up in the score.
f1_metric = F1Score(num_classes=2, average='macro').to(device)
# Number of mini-batches between two progress reports printed by train().
report_freq = 20

def train(n_epochs=100, augment=False):
    """
    Train the module-level ``net`` on ``train_dataloader`` with positive over-sampling.

    Adapted from the basic pytorch training script:
    https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#train-the-network

    Every mini-batch coming from the loader is extended with extra positive
    samples (60 with the helper defaults) before the forward pass. Running
    loss, accuracy and F1 are printed every ``report_freq`` mini-batches and
    reset afterwards (and at the start of each epoch).

    Args:
        n_epochs: number of passes over ``train_dataloader``.
        augment: when True, 5 of the 60 extra positives are drawn through
            ``get_noisy_positive_samples`` (mixed with background noise) and the
            remaining 55 through ``get_positive_samples``.

    Relies on the globals ``net``, ``optimizer``, ``criterion``, ``device``,
    ``accuracy_metric``, ``f1_metric`` and ``report_freq``.
    """

    net.train()
    for epoch in range(n_epochs):  # loop over the dataset multiple times
        running_acc = 0.0
        running_f1 = 0.0
        running_loss = 0.0

        for i, (inputs, labels) in enumerate(train_dataloader):
            # Because the samples are imbalanced (only ~5% positive ones), we
            # over-sample the positive instances.
            if augment:
                extra_positives = torch.concat([get_noisy_positive_samples(5), get_positive_samples(55)])
            else:
                extra_positives = get_positive_samples()
            inputs = torch.concat([inputs, extra_positives])

            # Unsqueeze the channel dimension and (roughly) normalize inputs
            inputs = torch.unsqueeze(inputs, dim=1).to(device) / 1000

            # One positive label per appended sample; derived from the tensor
            # itself so it cannot drift from the number of samples drawn above.
            extra_labels = torch.ones(extra_positives.shape[0], dtype=torch.long)
            labels = torch.concat([labels.long(), extra_labels]).to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # accumulate statistics as plain floats
            running_loss += loss.item()

            preds = outputs.max(1).indices
            running_acc += accuracy_metric(preds, labels).item()
            running_f1 += f1_metric(preds, labels).item()

            if i % report_freq == report_freq-1:    # print every `report_freq` mini-batches
                print(f'epoch:{epoch + 1}, batch:{i + 1:5d} loss: {running_loss / report_freq:.3f}')
                print(f'Acc={running_acc / report_freq:.3f}, f1= {running_f1 / report_freq:.3f}')

                running_loss = 0.0
                running_acc = 0.0
                running_f1 = 0.0

    print('Finished Training')

In [None]:
# Train without background noise augmentation
# (positive over-sampling inside train() is still applied).
train(n_epochs=1300, augment=False)

  x = F.softmax(self.fc2(x))


epoch:1, batch:   20 loss: 0.680
Acc=0.545, f1= 0.545
epoch:1, batch:   40 loss: 0.640
Acc=0.714, f1= 0.714
epoch:1, batch:   60 loss: 0.603
Acc=0.789, f1= 0.789
epoch:1, batch:   80 loss: 0.567
Acc=0.832, f1= 0.832
epoch:1, batch:  100 loss: 0.538
Acc=0.847, f1= 0.847
epoch:2, batch:   20 loss: 0.508
Acc=0.857, f1= 0.857
epoch:2, batch:   40 loss: 0.494
Acc=0.867, f1= 0.867
epoch:2, batch:   60 loss: 0.478
Acc=0.872, f1= 0.872
epoch:2, batch:   80 loss: 0.462
Acc=0.890, f1= 0.890
epoch:2, batch:  100 loss: 0.467
Acc=0.871, f1= 0.871
epoch:3, batch:   20 loss: 0.441
Acc=0.899, f1= 0.899
epoch:3, batch:   40 loss: 0.454
Acc=0.878, f1= 0.878
epoch:3, batch:   60 loss: 0.445
Acc=0.886, f1= 0.886
epoch:3, batch:   80 loss: 0.439
Acc=0.893, f1= 0.893
epoch:3, batch:  100 loss: 0.431
Acc=0.897, f1= 0.897
epoch:4, batch:   20 loss: 0.434
Acc=0.893, f1= 0.893
epoch:4, batch:   40 loss: 0.433
Acc=0.891, f1= 0.891
epoch:4, batch:   60 loss: 0.435
Acc=0.891, f1= 0.891
epoch:4, batch:   80 loss: 0

In [None]:
# Train with background noise augmentation

# Augmentation requires extracting MFCC features on the fly, which is very CPU-heavy and slow, so only 50 epochs were tried.
# Unfortunately this gave worse results, so this part was skipped in the highest-scoring submission.

# train(n_epochs=50, augment=True)

  x = F.softmax(self.fc2(x))


epoch:1, batch:   20 loss: 0.321
Acc=0.993, f1= 0.993
epoch:1, batch:   40 loss: 0.314
Acc=0.999, f1= 0.999
epoch:1, batch:   60 loss: 0.317
Acc=0.996, f1= 0.996
epoch:1, batch:   80 loss: 0.319
Acc=0.995, f1= 0.995
epoch:1, batch:  100 loss: 0.319
Acc=0.995, f1= 0.995
epoch:2, batch:   20 loss: 0.319
Acc=0.995, f1= 0.995
epoch:2, batch:   40 loss: 0.317
Acc=0.997, f1= 0.997
epoch:2, batch:   60 loss: 0.319
Acc=0.995, f1= 0.995
epoch:2, batch:   80 loss: 0.316
Acc=0.998, f1= 0.998
epoch:2, batch:  100 loss: 0.319
Acc=0.994, f1= 0.994
epoch:3, batch:   20 loss: 0.317
Acc=0.997, f1= 0.997
epoch:3, batch:   40 loss: 0.317
Acc=0.996, f1= 0.996
epoch:3, batch:   60 loss: 0.319
Acc=0.995, f1= 0.995
epoch:3, batch:   80 loss: 0.318
Acc=0.996, f1= 0.996
epoch:3, batch:  100 loss: 0.317
Acc=0.997, f1= 0.997
epoch:4, batch:   20 loss: 0.317
Acc=0.997, f1= 0.997
epoch:4, batch:   40 loss: 0.317
Acc=0.996, f1= 0.996
epoch:4, batch:   60 loss: 0.318
Acc=0.996, f1= 0.996
epoch:4, batch:   80 loss: 0

In [None]:
# Switch dropout / batch-norm to inference behaviour.
net.eval()
preds_all = []

# Inference only: without no_grad() every forward pass would record an autograd
# graph, wasting memory and time for gradients that are never used.
with torch.no_grad():
    for inputs in test_dataloader:
        # Same input preparation as in training: add the channel dimension and
        # apply the rough 1/1000 scaling.
        inputs = torch.unsqueeze(torch.Tensor(inputs), dim=1).to(device) / 1000
        outputs = net(inputs)
        # Predicted class = index of the largest score per row.
        preds = outputs.max(1).indices
        preds_all.append(preds)
    

  x = F.softmax(self.fc2(x))


In [None]:
# Convert the predictions to a NumPy bool array explicitly instead of relying on
# pandas to coerce a torch tensor when it is assigned as a column.
test_set.test_df['Label'] = torch.concat(preds_all).bool().cpu().numpy()
# Keep only the columns written to the submission file.
pred_df = test_set.test_df[['slice_file_name', 'ID', 'fold', 'Label']]

In [None]:
# Numeric suffix identifying this run; used in the prediction CSV and saved-model file names.
pred_idx = 12
# Write the predictions without the DataFrame index column.
pred_df.to_csv(f'preds{pred_idx}.csv', index=False)

In [None]:
# Persist only the model parameters (state_dict), not the module object itself.
torch.save(net.state_dict(), f'./model_{pred_idx}.m')

In [None]:
# To load trained model
# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved on a GPU runtime can also be restored on a CPU-only one.
net.load_state_dict(torch.load(f'./model_{12}.m', map_location=device))

<All keys matched successfully>