Pobieranie danych z bucketa

In [1]:
# Download the dataset archive with wget ("url" looks like a placeholder — replace with the real bucket address).
# NOTE(review): --no-check-certificate turns off TLS certificate verification — confirm this is intended.
!wget --no-check-certificate --no-proxy "url"

Wypakowanie danych do folderu /content/ na Google Colab

In [2]:
import tarfile

# Unpack the downloaded archive into the current working directory.
# The context manager closes the archive even if extraction raises.
# NOTE(review): extractall() trusts the member paths stored in the archive;
# only run it on archives from a trusted source.
with tarfile.open('/content/audioSpectrograms2.tar.gz') as tar:
    tar.extractall()

Funkcje służące do wizualizacji wyników:

In [4]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import copy
from sklearn import svm, datasets
from sklearn.metrics import roc_curve, auc


def visualise_learining_process(epochs, losses_on_test, losses_on_train, xtick):
    '''
    Plots the loss recorded on the test and train sets against the epoch number.

    Parameters:
    -----------
    epochs: x-axis values, one per recorded loss
    losses_on_test: losses drawn as yellow dots
    losses_on_train: losses drawn as blue dots
    xtick: step between consecutive x-axis ticks
    '''
    # Test series first, then train, so the legend order below matches.
    for losses, marker in ((losses_on_test, 'yo'), (losses_on_train, 'bo')):
        plt.plot(epochs, losses, marker)
    plt.title("Value of the cost function")
    plt.legend(['test', 'train'])
    plt.xlabel("Epoch")
    tick_positions = np.arange(0, len(epochs), xtick)
    plt.xticks(tick_positions)
    plt.show()

def visualise_errors_for_class(classify_table, class_index):
    '''
    Plots number of incorrect classifications of elements from specified class with division where elements were
    classified by network.

    Parameters:
    -----------
    classify_table: square 2-D array of counts, indexed [true class, class returned by network]
                    (the layout the other plotting helpers in this file rely on)
    class_index: class number (row of classify_table to plot)
    '''
    # Work on a copy so the caller's table is left untouched.
    table = np.array(classify_table)
    # Correct classifications sit on the diagonal; clear them so only errors remain.
    np.fill_diagonal(table, 0)
    indices = np.arange(table.shape[1])
    # Row `class_index` holds the elements that truly belong to that class,
    # split by the class the network returned for them.
    plt.bar(indices, table[class_index, :])
    plt.xticks(indices)
    plt.title("Number of incorrect classifications of elements from class {}".format(class_index))
    plt.xlabel("Class returned by network")
    plt.show()
    
def visualise_errors_by_class(classify_table):
    '''
    Plots a barplot with classes on x-axis and number of bad classifications in each class. Colors division represents
    proportion showing to which class were classified examples that were classified incorrect.

    Parameters:
    -----------
    classify_table: square 2-D array of counts, indexed [correct class, class returned by network]
    '''
    # Work on a copy so the caller's table is left untouched.
    table = np.array(classify_table)
    # Correct classifications sit on the diagonal; clear them so only errors remain.
    np.fill_diagonal(table, 0)
    n_classes = table.shape[0]
    indices = np.arange(n_classes)
    # Running height of the stack, so each segment starts where the previous one ended.
    bottom = np.zeros(n_classes)
    for returned_class in range(table.shape[1]):
        segment = table[:, returned_class]
        plt.bar(indices, segment, bottom=bottom)
        bottom = bottom + segment
    plt.title("Number of incorrect classifications")
    plt.xticks(indices)
    plt.xlabel("Correct class")
    plt.legend(list(range(table.shape[1])), title="class returned by network",
               bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.show()
    
def visualise_effectiveness_by_class(classify_table):
    '''
    Plots a barplot showing rate of correct classified examples for each class separately.

    Parameters:
    -----------
    classify_table: square 2-D array of counts, indexed [correct class, class returned by network]
    '''
    table = np.asarray(classify_table, dtype=float)
    correct = np.diag(table)
    totals = table.sum(axis=1)
    # A class without any examples gets rate 0 instead of a 0/0 NaN (and its RuntimeWarning).
    results = np.divide(correct, totals, out=np.zeros_like(totals), where=totals > 0)
    indices = np.arange(table.shape[0])
    plt.bar(indices, results, color=['#2ca25f', '#2c7fb8'])
    plt.title("Rate of correct classifications to each class")
    plt.xlabel('Class')
    plt.xticks(indices)
    plt.show()
    
def roc_analysis(labels, results, xlim, ylim, draw_diag=False):
    '''
    Plots ROC curve with AUC values calculated for elements from testing set with class division.

    Parameters:
    -----------
    labels: 2-D array with one row per class; row i is passed to roc_curve as the true binary labels
    results: 2-D array with one row per class; row i is passed to roc_curve as the scores
    xlim, ylim: lists of two parameters specifing start and end of respectively x and y axis
    draw_diag: when truthy, also draws the dashed diagonal from (0, 0) to (1, 1)
    '''
    # One curve per row of `labels` instead of a fixed count of 10.
    n_classes = len(labels)
    # Compute every curve before opening the figure, so a failure leaves no empty figure behind.
    curves = []
    for i in range(n_classes):
        fpr, tpr, _ = roc_curve(labels[i, :], results[i, :])
        curves.append((fpr, tpr, auc(fpr, tpr)))
    plt.figure(figsize=(7, 5), dpi=80)
    for i, (fpr, tpr, roc_auc) in enumerate(curves):
        plt.plot(fpr, tpr, lw=2, label='ROC curve (AUC = {:0.5f}) for {} class'.format(roc_auc, i))
    if draw_diag:
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim(xlim)
    plt.ylim(ylim)
    plt.xlabel('False Positive Rate', size=12)
    plt.ylabel('True Positive Rate', size=12)
    plt.title('ROC curve for binary classification task', size=13)
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.show()

Dedykowany Dataset:

In [7]:
#https://github.com/utkuozbulak/pytorch-custom-dataset-examples/blob/master/src/custom_datasets.py
import pandas as pd
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data.dataset import Dataset  # For custom datasets


class AudioDataset(Dataset):
    """Dataset of images listed in a ';'-separated CSV file.

    Column 0 of the CSV holds the image path and column 2 holds the label.
    """

    def __init__(self, csv_path, transform):
        """
        Args:
            csv_path (string): path to the ';'-separated csv file (its first row is the header)
            transform: callable applied to every opened image in __getitem__
        """
        self.trans = transform

        def to_colab_path(line):
            # For Colab: drop the first character of the stored path and root it at /content.
            return '/content' + line[1:]

        frame = pd.read_csv(csv_path, header=0, sep=';')
        frame.iloc[:, 0] = frame.iloc[:, 0].apply(to_colab_path)
        self.data_info = frame
        # Image paths come from the first column, labels from the third.
        self.image_arr = np.asarray(frame.iloc[:, 0])
        self.label_arr = np.asarray(frame.iloc[:, 2])
        self.data_len = len(frame.index)

    def __getitem__(self, index):
        """Return the pair (transformed image, label) stored at position `index`."""
        image = Image.open(self.image_arr[index])
        return (self.trans(image), self.label_arr[index])

    def __len__(self):
        """Return the number of rows read from the CSV file."""
        return self.data_len

Model sieci konwolucyjnej:

In [None]:
import torch
import torch.nn as nn

# First draft: a plain convnet; the LSTM variant will be tried once this works
class NetModel(nn.Module):
    """Two conv/ReLU/max-pool stages followed by a two-layer fully connected head.

    forward() reshapes the conv features to (-1, batch, 16 * 16 * 16), so when every
    sample yields exactly 16 * 16 * 16 features the output is (1, batch, number_of_classes).
    """

    def __init__(self, number_of_classes = 10):
        super().__init__()
        # Stage 1: 1 -> 8 channels, 5x5 kernel, padding 1, then 2x2 max-pooling.
        self.conv1 = nn.Conv2d(1, 8, kernel_size=5, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Stage 2: 8 -> 16 channels, 5x5 kernel, no padding, then 2x2 max-pooling.
        self.conv2 = nn.Conv2d(8, 16, kernel_size=5, stride=1, padding=0)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head. TODO: the input size has to be recalculated if the input shape changes.
        self.linear1 = nn.Linear(16 * 16 * 16, 300)
        self.relu3 = nn.ReLU()
        self.drop1 = nn.Dropout(p=0.5)
        self.linear2 = nn.Linear(300, number_of_classes)

        feature_layers = [self.conv1, self.relu1, self.maxpool1,
                          self.conv2, self.relu2, self.maxpool2]
        self.net = nn.Sequential(*feature_layers)

    def forward(self, x):
        """Run the conv stack, flatten per sample and return the class scores."""
        features = self.net(x)
        batch = features.shape[0]
        # TODO: the flattened size has to be changed together with linear1.
        flat = features.view(-1, batch, 16 * 16 * 16)
        hidden = self.drop1(self.relu3(self.linear1(flat)))
        return self.linear2(hidden)
      


Model sieci LSTM:

In [None]:
class LSTMModel(nn.Module):
    """One conv/ReLU/max-pool stage feeding an LSTM and a two-layer fully connected head.

    forward() reshapes the conv features to (-1, batch, 36 * 36 * 8) before the LSTM, so
    when every sample yields exactly 36 * 36 * 8 features the output is
    (1, batch, number_of_classes).
    """

    def __init__(self, number_of_classes = 10):
        super().__init__()
        # Feature extractor: 1 -> 8 channels, 5x5 kernel, padding 1, then 2x2 max-pooling.
        self.conv1 = nn.Conv2d(1, 8, kernel_size=5, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Recurrent layer over the flattened conv features.
        self.LSTM = nn.LSTM(input_size=36 * 36 * 8, hidden_size=300)
        self.relu_lstm = nn.ReLU()
        # Classifier head. TODO: these sizes have to be recalculated if the input shape changes.
        self.linear1 = nn.Linear(300, 84)
        self.relu3 = nn.ReLU()
        self.drop1 = nn.Dropout(p=0.5)
        self.linear2 = nn.Linear(84, number_of_classes)

        feature_layers = [self.conv1, self.relu1, self.maxpool1]
        self.net = nn.Sequential(*feature_layers)

    def forward(self, x):
        """Run the conv stage, the LSTM and the head; return the class scores."""
        features = self.net(x)
        batch = features.shape[0]
        # TODO: the flattened size has to be changed together with the LSTM input_size.
        sequence = features.view(-1, batch, 36 * 36 * 8)
        # nn.LSTM returns (output, (h_n, c_n)); only the per-step output is used.
        lstm_out, _state = self.LSTM(sequence)
        hidden = self.relu3(self.linear1(self.relu_lstm(lstm_out)))
        return self.linear2(self.drop1(hidden))

Klasa sieci:

In [None]:
# torch libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import numpy as np
import torchvision.datasets as datasets
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adadelta, Adam

# Others
import matplotlib.pyplot as plt
from IPython.core.debugger import set_trace


# Network
class Network:
    """Training and evaluation harness around NetModel.

    Builds the train/test datasets and loaders, the model, the Adam optimizer and the
    cross-entropy loss, and keeps a 10x10 classification table (rows: true label,
    columns: predicted label) plus the per-epoch loss history.
    """

    def __init__(self, batch_size, learning_rate, l2_reg):
        """
        Parameters:
        -----------
        batch_size: batch size used by the training loader
        learning_rate: learning rate passed to Adam
        l2_reg: value passed to Adam as weight_decay
        """
        self.batch_size = batch_size
        self.lr = learning_rate
        self.l2_reg = l2_reg
        # Use the first GPU when there is one, otherwise fall back to the CPU.
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.train_set, self.test_set = self.datasets()
        self.train_loader, self.test_loader = self.data_loaders(self.batch_size)
        self.model = NetModel(number_of_classes=10).to(self.device)
        self.optimizer = Adam(self.model.parameters(), lr=self.lr, weight_decay=self.l2_reg)
        self.loss_fun = nn.CrossEntropyLoss()
        self.classify_table = np.zeros((10, 10))
        # [losses on the test set, losses on the train set], one entry per epoch.
        self.training_tracking = [list(), list()]

    def data_loaders(self, batch_train_size):
        """Return (train_loader, test_loader); the test loader serves the whole test set as one batch."""
        train_loader = DataLoader(self.train_set, batch_size=batch_train_size, shuffle=True, num_workers=4)
        test_loader = DataLoader(self.test_set, batch_size=len(self.test_set), shuffle=False, num_workers=4)
        return train_loader, test_loader

    def datasets(self):
        """Return (train_set, test_set) read from /content/train.csv and /content/test.csv."""
        train_set = AudioDataset('/content/train.csv', transform=transforms.ToTensor())
        test_set = AudioDataset('/content/test.csv', transform=transforms.ToTensor())
        return train_set, test_set

    def train(self, epochs):
        """Train for `epochs` epochs, printing accuracies and recording losses after each one.

        The learning curves are plotted once training has finished.
        """
        for epoch in range(1, epochs + 1):
            # The evaluation at the end of the previous epoch left the model in eval mode.
            self.model.train()
            for img, label in self.train_loader:
                img = img.to(self.device)
                label = label.to(self.device)
                self.optimizer.zero_grad()
                prediction = self.model(img)
                # The model returns (1, batch, classes); [0] selects the (batch, classes) scores.
                # NOTE: this indexing may break for images that are not single-channel.
                loss = self.loss_fun(prediction[0], label)
                loss.backward()
                self.optimizer.step()
            # Evaluate once per epoch (not after every batch, whose result was never used).
            test_eval = self.test_evaluate()
            train_eval = self.train_evaluate()
            self.training_tracking[0].append(float(test_eval[1]))
            self.training_tracking[1].append(float(train_eval[1]))
            print('Epoch {}: acc on train: {}, acc on test {}'.format(epoch, train_eval[0], test_eval[0]))
        self.visualise_learing_process()

    def _evaluate(self, loader, dataset, class_table):
        """Return (accuracy, mean loss) of the model over `loader`, filling `class_table` in place.

        Each batch loss is weighted by the batch's sample count, so the mean is exact
        even when the last batch is smaller than the others.
        """
        self.model.eval()
        total_loss = 0.0
        true_counter = 0
        with torch.no_grad():
            for img, label in loader:
                img = img.to(self.device)
                label = label.to(self.device)
                # (1, batch, classes) -> (batch, classes)
                scores = self.model(img)[0]
                total_loss += self.loss_fun(scores, label) * label.size(0)
                _, predicted = torch.max(scores, 1)
                self.update_classify_table(class_table, predicted, label)
                true_counter += int(torch.sum(predicted == label))
        return np.float64(true_counter) / len(dataset), total_loss / len(dataset)

    def train_evaluate(self):
        """Return (accuracy, mean loss) on the training set; self.classify_table is not touched."""
        return self._evaluate(self.train_loader, self.train_set, np.zeros((10, 10)))

    def test_evaluate(self):
        """Return (accuracy, mean loss) on the test set and rebuild self.classify_table from it."""
        self.classify_table = np.zeros((10, 10))
        return self._evaluate(self.test_loader, self.test_set, self.classify_table)

    @staticmethod
    def _as_index_array(values):
        """Convert a tensor or sequence of class ids to a NumPy integer index array."""
        if isinstance(values, torch.Tensor):
            values = values.detach().cpu().numpy()
        return np.asarray(values, dtype=np.intp)

    def update_classify_table(self, classify_table, predictions, labels):
        """Add 1 to classify_table[label, prediction] for every (label, prediction) pair, in place."""
        rows = self._as_index_array(labels)
        cols = self._as_index_array(predictions)
        # np.add.at accumulates repeated (row, col) pairs, unlike plain fancy-index +=.
        np.add.at(classify_table, (rows, cols), 1)

    def vector_label(self, labels):
        """Return a (10, len(labels)) one-hot matrix with a 1 at [labels[j], j] for every j."""
        lab = np.zeros((10, len(labels)))
        lab[labels, range(len(labels))] = 1
        return lab

    #visualisation
    def visualise_learing_process(self):
        """Plot the recorded test/train losses against the epoch number."""
        epochs = range(1, len(self.training_tracking[0]) + 1)
        visualise_learining_process(epochs, self.training_tracking[0], self.training_tracking[1], 3)
    

In [8]:
# Build the training harness: training batches of 100, Adam learning rate 0.001, weight decay 0.001.
net = Network(batch_size = 100, learning_rate = 0.001, l2_reg = 0.001)

In [None]:
# Number of training epochs; passed to train() so the value is defined in one place only.
num_epochs = 30
net.train(num_epochs)