In [1]:
import glob, os
import pandas as pd
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import itertools

from collections import Counter

from sklearn.svm import SVR, SVC
from sklearn.model_selection import GridSearchCV
from sklearn.kernel_ridge import KernelRidge
from sklearn import  metrics

from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score

import torch
import torch.nn as nn
from happy_szczurki.models.CNN import ConvNet

NameError: name 's' is not defined

In [2]:
from types import SimpleNamespace 
from typing import Tuple


class Dataset:
    def __init__(self, path):
        self.path = path

        dataset = dict(np.load(self.path, allow_pickle=True))
        
        self.X = dataset.pop('X')
        self.y = dataset.pop('y')
        self.meta = dataset.pop('meta')[()]
        
        assert len(dataset) == 0
        
        self.y_binary = np.where(self.y == None, 0, 1)
    
    def normalize(self, mean=None, std=None):
        # TODO: czy powinniśmy normalizować per współrzędna czy globalnie?
        if mean is None:
            mean = np.mean(self.X, axis=0)
            
        if std is None:
            std = np.std(self.X, axis=0, ddof=1)
            
        # NOTE: inplace operators to prevent memory allocations
        self.X -= mean
        self.X /= std
        
        return mean, std
    
    def sample(self, n, *, balanced=False, with_idx=False, random_state=None, x_with_frame=False) -> Tuple[np.ndarray, np.ndarray]:
        """
        Choice `n` random samples from dataset.
        
        @param n: number of random samples to choose,
        @param balanced: if True number of samples for each class will be aprox. equal,
        @param with_idx: return data indexes of sampled records,
        @param random_state: random state used to generate samples indices,
        """
        assert len(self.y.shape) == 1
        
        if balanced:
            counts = Counter(self.y)
            class_count = len(counts)
            
            # NOTE: https://stackoverflow.com/questions/35215161/most-efficient-way-to-map-function-over-numpy-array/35216364
            probs = np.array([1.0 / (class_count * counts[x]) for x in self.y])
        else:
            probs = None
            
        idx = np.random.RandomState(random_state).choice(self.y.size, size=n, p=probs)
        
        new_X = np.pad(self.X, pad_width=[(128, 128), (0, 0)], mode='edge') 
        
        if x_with_frame:
            X = np.array([new_X[index: index + 257] for index in idx])
        else:
            X = new_X[idx + 128]
        
        if with_idx:
            return idx, X, self.y[idx]
        
        return X, self.y[idx]
    
    def frame_to_time(self, frame: int) -> float:
        """Convert frame id to time in sec."""
        return librosa.core.frames_to_time(
            frame,
            sr=self.meta['sampling_rate'],
            hop_length=self.meta['hop_length'],
            n_fft=self.meta['n_fft']
        )

In [3]:
train_data = test_data = None
train_data = Dataset('ch1-2018-11-20_10-29-02_0000012.wav.trimed.npz')
print(train_data.path, Counter(train_data.y_binary))

mean, std = train_data.normalize()
print("mean: %f; std: %f" % (np.mean(train_data.X), np.std(train_data.X, ddof=1)))

#################################################################
print()
test_data = Dataset('ch1-2018-11-20_10-31-42_0000014.wav.trimed.npz')
print(test_data.path, Counter(test_data.y_binary))

mean, std = test_data.normalize()
print("mean: %f; std: %f" % (np.mean(test_data.X), np.std(test_data.X, ddof=1)))

ch1-2018-11-20_10-29-02_0000012.wav.trimed.npz Counter({0: 196372, 1: 38013})
mean: 0.000000; std: 1.000209

ch1-2018-11-20_10-31-42_0000014.wav.trimed.npz Counter({0: 146541, 1: 87844})
mean: 0.000000; std: 1.000337


In [4]:
np.place(train_data.y, train_data.y == None, "None")
np.place(test_data.y, test_data.y == None, "None")


In [5]:
from sklearn.preprocessing import OrdinalEncoder
enc_train = OrdinalEncoder(dtype=int)
enc_train.fit(train_data.y.reshape(-1, 1))
enc_train.categories_

[array(['CMP', 'FL', 'FM', 'IU', 'None', 'RP', 'SH', 'ST', 'TR'],
       dtype=object)]

In [6]:
enc_test = OrdinalEncoder(dtype=int)
enc_test.fit(test_data.y.reshape(-1, 1))
enc_test.categories_

[array(['22kHz', 'CMP', 'FL', 'FM', 'None', 'RP', 'SH', 'ST', 'TR'],
       dtype=object)]

In [7]:
layer_params = [
    {
        "output_size": 32,
        "kernel_size": 5,
        "stride": 1,
        "padding": 2,
        "max_pool_kernel": 3,
        "max_pool_stride": 2
    },
    {
        "output_size": 64,
        "kernel_size": 5,
        "stride": 1,
        "padding": 2,
        "max_pool_kernel": 2,
        "max_pool_stride": 2
    },
    {
        "output_size": 128,
        "kernel_size": 5,
        "stride": 1,
        "padding": 2,
        "max_pool_kernel": 5,
        "max_pool_stride": 3
    },
    {
        "output_size": 256,
        "kernel_size": 5,
        "stride": 1,
        "padding": 2,
        "max_pool_kernel": 5,
        "max_pool_stride": 2
    }
]

hidden_dims = [
    (8 * 8 * 256), 100, 9
]

model = ConvNet(layer_params, hidden_dims)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

NameError: name 'ConvNet' is not defined

In [None]:
# Train the model

num_epochs = 20
total_step = 20
loss_list = []
acc_list = []
test_acc_list = []
for epoch in range(num_epochs):
    
    for i in range(total_step):
        images, labels = train_data.sample(1, balanced=True, with_idx=False, x_with_frame=True)
        images = torch.tensor(np.expand_dims(images, axis=1))
        labels = torch.tensor(enc_train.transform(labels.reshape(-1, 1)).reshape(-1), dtype=torch.long)
        
        # Run the forward pass
        outputs = model(images)
        
        loss = criterion(outputs, labels)
#         print(loss)
        loss_list.append(loss.item())

        # Backprop and perform Adam optimisation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track the accuracy
        total = labels.size(0)
        _, predicted = torch.max(outputs.data, 1)
        print(predicted)
        print(labels)
        correct = (predicted == labels).sum().item()
        acc_list.append(correct / total)
        
        print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'
                  .format(epoch + 1, num_epochs, i + 1, total_step, loss.item(),
                          (correct / total) * 100))
        
    # Test after epoch
    
    images, labels = test_data.sample(1, balanced=True, with_idx=False, x_with_frame=True)
    images = torch.tensor(np.expand_dims(images, axis=1))
    labels = torch.tensor(enc_test.transform(labels.reshape(-1, 1)).reshape(-1), dtype=torch.long)

    # Run the forward pass
    outputs = model(images)
    
    total = labels.size(0)
    _, predicted = torch.max(outputs.data, 1)
    print(predicted)
    print(labels)
    acc = (predicted == labels).sum().item() / total
    print("TEST ACC: ", acc )
    test_acc_list.append(acc)
    
    

try:
- without equal sampling
- different test file
- larger train and/or test data