# Demonstration of using neural networks

In this demonstration we will explore how neural networks can be used to solve data classification problems. We will use the PyTorch library for Python, which requires Python 3.8+ and PyTorch 1.6+. You also need to install the packages `numpy`, `opencv-python`, `wandb`, `matplotlib`, `torchsummary` and `tqdm`.

We will introduce the components of PyTorch step by step. If you want to learn more, we recommend the following tutorial: https://pytorch.org/tutorials/beginner/deep_learning_60min_blitz.html

To set up the working environment, run the following commands:

In [None]:
!pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118
!pip install torchsummary
!pip install numpy matplotlib opencv-python
!pip install tqdm wandb

First, we import all the modules we will use in this demonstration.

In [None]:
import os

import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, Dataset, DataLoader

from torchsummary import summary

import cv2
import numpy as np
import matplotlib.pyplot as plt
import wandb

import tqdm

Weights & Biases is an online system for managing machine learning and deep learning experiments. You can create an account for free on their website if you want to use this (optional, but recommended) part of the exercise. Once you have an account, you can create an API key for it under "Profile picture" -> "Settings" -> "API keys". Write that API key into the "wandb_key.env" file.

In [None]:
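with open("wandb_key.env", 'r') as f:
    # strip() removes the trailing newline that editors usually add, which would corrupt the key
    os.environ['WANDB_API_KEY'] = f.read().strip()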

## Experiments with neural networks on points and clusters

The first problem we will try to solve is the following: given a new point, can we tell which cluster it belongs to?

Let's see what exactly this means in the following code, where we generate the dataset. We will generate `num_of_clusters` clusters. A "cluster" is defined as "a group of similar things or people positioned or occurring closely together". Although the first idea might be to simply compute the distance from the center of each cluster, that would not cover all the cases we can encounter in the data. For example, some clusters may be elongated along only one dimension, which such a naive model would not handle correctly.
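To make the elongated-cluster argument concrete, here is a minimal NumPy sketch with two hand-picked clusters. The centers, standard deviations and the test point are invented for illustration and are not produced by the generator below. It compares the naive nearest-center rule with a distance that divides each axis by the cluster's standard deviation (a diagonal, Mahalanobis-style distance).

```python
import numpy as np

# Hypothetical example: cluster 0 is stretched along x, cluster 1 is compact.
centers = np.array([[0.0, 0.0], [8.0, 3.0]])
stds = np.array([[10.0, 0.5], [0.5, 0.5]])  # per-axis standard deviations


def nearest_center(point):
    """Naive rule: pick the cluster whose center is closest (Euclidean distance)."""
    dists = np.linalg.norm(centers - point, axis=1)
    return int(np.argmin(dists))


def nearest_scaled(point):
    """Divide each axis by the cluster's std before measuring the distance."""
    dists = np.linalg.norm((centers - point) / stds, axis=1)
    return int(np.argmin(dists))


p = np.array([9.0, 0.0])  # within one std of cluster 0 along x, 6 stds from cluster 1 along y
print(nearest_center(p))  # 1
print(nearest_scaled(p))  # 0
```

The naive rule picks the compact cluster only because its center happens to be closer. The network in this notebook computes neither rule explicitly; the sketch only shows why the plain distance to the center is not enough.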

Note also the `train_test_ratio` variable. It determines what fraction of the data the networks are trained on and what fraction is used to evaluate them. Recall: **we want a model that works well on unseen data**. To test this, while preparing the data we set aside a subset that stays unseen during training.

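We generate the clusters so that each cluster is a two-dimensional normal distribution with independent x and y components (each with its own mean and standard deviation), and the data points are samples drawn from that distribution.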

In [None]:
num_of_clusters = 5
num_of_samples = 50
train_test_ratio = 0.8

mean_xs = np.random.uniform(0, 10 * num_of_clusters, num_of_clusters).astype(np.uint8).astype(np.float32)
std_xs = np.random.uniform(0, num_of_clusters, num_of_clusters)

mean_ys = np.random.uniform(0, 10 * num_of_clusters, num_of_clusters).astype(np.uint8).astype(np.float32)
std_ys = np.random.uniform(0, num_of_clusters, num_of_clusters)

number_of_train_samples = int(num_of_clusters * num_of_samples * train_test_ratio)
number_of_test_samples = num_of_clusters * num_of_samples - number_of_train_samples

data_x_train = []
data_y_train = []
labels_train = []

data_x_test = []
data_y_test = []
labels_test = []

for idx in range(0, num_of_clusters):
    data_x_train.extend(np.random.normal(mean_xs[idx], std_xs[idx], number_of_train_samples // num_of_clusters))
    data_y_train.extend(np.random.normal(mean_ys[idx], std_ys[idx], number_of_train_samples // num_of_clusters))
    labels_train.extend([idx] * (number_of_train_samples // num_of_clusters))

    data_x_test.extend(np.random.normal(mean_xs[idx], std_xs[idx], number_of_test_samples // num_of_clusters))
    data_y_test.extend(np.random.normal(mean_ys[idx], std_ys[idx], number_of_test_samples // num_of_clusters))
    labels_test.extend([idx] * (number_of_test_samples // num_of_clusters))
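The cell above draws the train and test samples separately from the same per-cluster distributions, which keeps every cluster equally represented in both sets. When the data already exist as a single pool, a common alternative is a random split. A minimal NumPy sketch, where the helper `split_train_test` and the stand-in data are hypothetical:

```python
import numpy as np


def split_train_test(x, y, train_ratio, rng):
    """Shuffle the sample indices once and cut them into a train and a test part."""
    n = len(x)
    perm = rng.permutation(n)
    n_train = int(n * train_ratio)
    train_idx, test_idx = perm[:n_train], perm[n_train:]
    return x[train_idx], y[train_idx], x[test_idx], y[test_idx]


rng = np.random.default_rng(0)
points = rng.normal(size=(250, 2)).astype(np.float32)  # stand-in for 5 clusters * 50 samples
labels = np.repeat(np.arange(5), 50)                    # 50 labels per cluster
x_tr, y_tr, x_te, y_te = split_train_test(points, labels, 0.8, rng)
print(x_tr.shape, x_te.shape)  # (200, 2) (50, 2)
```

With a random split the class proportions in the test set can deviate slightly from those in the training set; the per-cluster generation used in this notebook avoids that.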

The following code visualizes the data we generated.

Note: if two or more of your clusters overlap heavily, we recommend re-running the code above. Repeat until you are satisfied with how well the clusters are separated.

In [None]:
for idx in range(0, num_of_clusters):
    plt.scatter(data_x_train[(idx)*number_of_train_samples//num_of_clusters:(idx+1)*number_of_train_samples//num_of_clusters], data_y_train[(idx)*number_of_train_samples//num_of_clusters:(idx+1)*number_of_train_samples//num_of_clusters], label=idx)
plt.legend()
plt.title("Samples from the train set");

Let's also look at the test set. We can see that these are not the same points and that there are fewer of them, but they have the same character as the examples from the train set.

In [None]:
for idx in range(0, num_of_clusters):
    plt.scatter(data_x_test[(idx)*number_of_test_samples//num_of_clusters:(idx+1)*number_of_test_samples//num_of_clusters], data_y_test[(idx)*number_of_test_samples//num_of_clusters:(idx+1)*number_of_test_samples//num_of_clusters], label=idx)
plt.legend()
plt.title("Samples from the test set");

Let's prepare the dataset. To use the data in PyTorch, we need a `DataLoader` built on a `Dataset` of that data. First we store our data in `torch.Tensor` objects (the PyTorch counterpart of a NumPy array). From these tensors we create a `TensorDataset`, which does exactly what its name says: it turns `Tensor`s into a `Dataset`. `DataLoader` takes a `Dataset` as its first argument, and the remaining arguments define its behaviour. Here these are `batch_size`, which sets the size of a mini-batch, and `shuffle`, which makes the `DataLoader` return the samples in a different order each time we iterate over it.

In [None]:
batch_size = 1

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32)) # dstack pairs the x and y lists into shape (1, N, 2); reshape turns that into N rows of (x, y)
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
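The `np.dstack(...).reshape(N, 2)` idiom in the cell above pairs every x coordinate with its y coordinate. A small sketch on made-up values shows the intermediate shape and an equivalent, more direct call, `np.column_stack`:

```python
import numpy as np

xs = [1.0, 2.0, 3.0]     # x coordinates (like data_x_train)
ys = [10.0, 20.0, 30.0]  # y coordinates (like data_y_train)

stacked = np.dstack([xs, ys])        # 1-D inputs become shape (1, N, 2)
print(stacked.shape)                 # (1, 3, 2)
pairs = stacked.reshape(len(xs), 2)  # one (x, y) row per sample
print(pairs.tolist())                # [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]

# np.column_stack builds the same (N, 2) array in one call
assert np.array_equal(pairs, np.column_stack([xs, ys]))
```

With the default settings (5 clusters, 200 training and 50 test samples) the per-cluster counts divide evenly, so the hard-coded `number_of_train_samples` in `reshape` matches the data. If `num_of_samples` or `train_test_ratio` are changed so that they do not divide evenly, the generator produces fewer points than `number_of_train_samples` and `reshape` raises an error; using the actual list length, as in the sketch, avoids that.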

Now let's define a simple network. To define a network in PyTorch, we create a class that inherits from `nn.Module`. To implement the model, we need to implement two methods: the constructor `__init__(self)` and the method `forward(self, ...)`.

We want a single-layer neural network that takes the coordinates of the point being classified (i.e. 2 numbers) and returns, for each class (a cluster, in this case), the log-probability that the point belongs to it. Let's take a look:

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 5)

    def forward(self, x):
        x = self.fc1(x)

        return F.log_softmax(x, dim=1)
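The network returns `F.log_softmax` of the linear layer's output, i.e. log-probabilities, and the training loop later uses `F.nll_loss`, which picks out the negative log-probability of the correct class. A minimal NumPy sketch of these two operations (a re-implementation for illustration, not PyTorch's code):

```python
import numpy as np


def log_softmax(logits):
    """Numerically stable log-softmax over the class axis."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))


def nll_loss(log_probs, targets):
    """Mean negative log-probability of the target class (like F.nll_loss with reduction='mean')."""
    return -log_probs[np.arange(len(targets)), targets].mean()


logits = np.array([[2.0, 0.5, -1.0, 0.0, 0.1]])  # one sample, 5 classes (made-up values)
log_probs = log_softmax(logits)
probs = np.exp(log_probs)
print(probs.sum())                         # approximately 1.0
print(nll_loss(log_probs, np.array([0])))  # small: class 0 has the largest logit
print(nll_loss(log_probs, np.array([2])))  # larger: class 2 has the smallest logit
```

Applying `F.log_softmax` followed by `F.nll_loss` is equivalent to applying `F.cross_entropy` directly to the raw outputs of the last layer.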

Now that we have a model, let's prepare for training. First we define how many epochs we train for (`n_epochs`), the learning rate (`learning_rate`), and the device on which we will train the network (`device`). Large neural networks are trained on GPUs or TPUs, but for textbook examples a CPU is sufficient.

Next we instantiate our network and move it to the device on which it will be trained (`.to(device)`). We also need an optimizer. In PyTorch, optimizers are in the `optim` submodule. We will use basic Stochastic Gradient Descent (`SGD`).

The function `summary(model, input_size, device)` prints what our model looks like. For each layer, it prints the layer's output shape and the number of parameters in that layer.

In [None]:
n_epochs = 5
learning_rate = 1
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device) # print each layer's output shape and parameter count
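The parameter counts printed by `summary` can be checked by hand: an `nn.Linear(n_in, n_out)` layer has an `n_out x n_in` weight matrix and, with the default `bias=True`, `n_out` biases. A small arithmetic sketch for two of the architectures used in this notebook (`linear_params` is a hypothetical helper name):

```python
def linear_params(n_in, n_out):
    """Weights (n_out * n_in) plus biases (n_out) of a linear layer with bias."""
    return n_in * n_out + n_out


one_layer = linear_params(2, 5)                         # fc1 = nn.Linear(2, 5)
two_layers = linear_params(2, 2) + linear_params(2, 5)  # fc1 = nn.Linear(2, 2), fc2 = nn.Linear(2, 5)
print(one_layer)   # 15
print(two_layers)  # 21
```

Activation functions such as the sigmoid or ReLU used later have no parameters, so they do not change these totals.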

Now it is time to train. First we start a Weights & Biases experiment. `wandb.watch(model, ...)` tracks the model during training and gives us deeper insight into the learning process.

Next we prepare lists in which we store results during training. We will visualize these results later.

Then, for each epoch, we go through every mini-batch of our training `DataLoader`: we move the data to the appropriate device, clear all gradients in the network (in case anything is left over from before), run the network on the data, compute the loss, run backpropagation, and perform an optimization step.

After the optimization steps over the whole training set, we need to evaluate on unseen data. For this we switch the model to `.eval()` mode and use `with torch.no_grad()` to define a scope in which operations on the network do not compute gradients. The data is moved and the network is run in the same way as during training, and from the results we compute the metrics we are interested in. Since this is a classification problem, here these are the loss and the accuracy.

wandb will print a link where you can follow the model's progress live and inspect the gradient values during training.

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()

        if epoch_idx == 0:
            # Gradients are already computed here, but optimizer.step() has not run yet
            if batch_idx == 0:
                print("# Epoch 1, batch 0")
                print("## Layer 1 before the first optimizer step:")
                print("### Weights")
                print(network.fc1.weight)
                print("### Weights - gradient")
                print(network.fc1.weight.grad)

                print("### Bias")
                print(network.fc1.bias)
                print("### Bias - gradient")
                print(network.fc1.bias.grad)

            if batch_idx == 1:
                print("--------------")
                print("# Epoch 1, batch 1")
                print("## Layer 1 after the first optimizer step:")
                print("### Weights")
                print(network.fc1.weight)

                print("### Bias")
                print(network.fc1.bias)

        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)  # index of the most probable class
            correct += pred.eq(target.view_as(pred)).sum().item()  # .item() keeps `correct` a plain number

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()
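The loop above delegates the gradient computation to `loss.backward()`. To show what one iteration computes, here is a NumPy re-implementation of the same steps (shuffled mini-batches, forward pass, gradient, SGD update, evaluation) for a single linear layer with log-softmax and mean NLL loss, on made-up toy data with 3 clusters. It is an illustrative sketch, not how PyTorch's autograd works, and for brevity it reports loss and accuracy on the training points themselves, whereas the notebook evaluates on a held-out test set. Unlike these NumPy arrays, PyTorch accumulates gradients in `.grad` across `backward()` calls, which is why the loop above calls `network.zero_grad()` before every backward pass.

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy data (assumed for illustration): 3 compact 2-D clusters, 30 points each
centers = np.array([[0.0, 0.0], [2.0, 2.0], [0.0, 2.0]])
y = np.repeat(np.arange(3), 30)
x = centers[y] + rng.normal(scale=0.3, size=(len(y), 2))

W = np.zeros((3, 2))  # plays the role of nn.Linear(2, 3).weight
b = np.zeros(3)       # plays the role of nn.Linear(2, 3).bias
lr, batch_size, n_epochs = 0.1, 4, 30


def forward(xb):
    """Linear layer followed by a numerically stable log-softmax."""
    logits = xb @ W.T + b
    shifted = logits - logits.max(axis=1, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))


def mean_nll(log_probs, targets):
    """Mean negative log-likelihood, like F.nll_loss with reduction='mean'."""
    return -log_probs[np.arange(len(targets)), targets].mean()


initial_loss = mean_nll(forward(x), y)

for epoch in range(n_epochs):
    order = rng.permutation(len(y))  # new order every epoch (shuffle=True)
    for start in range(0, len(y), batch_size):
        idx = order[start:start + batch_size]
        xb, yb = x[idx], y[idx]
        log_probs = forward(xb)  # forward pass
        # Backward pass: d(mean NLL)/d(logits) = (softmax - one_hot) / batch size
        grad_logits = np.exp(log_probs)
        grad_logits[np.arange(len(yb)), yb] -= 1.0
        grad_logits /= len(yb)
        grad_W = grad_logits.T @ xb
        grad_b = grad_logits.sum(axis=0)
        # SGD step; the gradients are fresh arrays every batch, so there is nothing to zero
        W -= lr * grad_W
        b -= lr * grad_b

# Evaluation: the predicted class is the one with the largest log-probability
log_probs = forward(x)
final_loss = mean_nll(log_probs, y)
accuracy = 100.0 * np.mean(log_probs.argmax(axis=1) == y)
print(f"loss {initial_loss:.3f} -> {final_loss:.3f}, accuracy {accuracy:.1f}%")
```

With all weights at zero the initial loss is exactly log(3), the loss of a uniform guess over 3 classes.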

Let's visualize the results. The `_steps` and `_acc` lists always come in pairs: the `_steps` list records which training step a value belongs to, and the matching `_acc` list holds the value of our metric at that step.

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Train loss")
plt.plot(test_steps, test_loss_acc, label="Test loss")

plt.legend()
plt.grid()
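The cell above defines `rolling_average_width_percentage` but never uses it. Presumably (this is an assumption) it was meant for smoothing the noisy per-mini-batch training loss with a moving average whose window is a percentage of the number of steps. A minimal NumPy sketch of such a moving average, where `rolling_average` is a hypothetical helper and the loss curve is synthetic:

```python
import numpy as np


def rolling_average(values, width_percentage=0.05):
    """Simple moving average with a window equal to a fraction of the series length.

    mode="valid" only keeps positions where the window fits completely,
    so the result is shorter than the input by window - 1 values.
    """
    values = np.asarray(values, dtype=float)
    window = max(1, int(len(values) * width_percentage))
    kernel = np.ones(window) / window
    return np.convolve(values, kernel, mode="valid")


# Synthetic noisy loss curve: 1000 steps of decaying loss plus noise
rng = np.random.default_rng(0)
steps = np.arange(1, 1001)
noisy_loss = np.exp(-steps / 300) + rng.normal(scale=0.2, size=steps.size)
smooth = rolling_average(noisy_loss)
print(len(smooth))  # 951 (window of 50 steps)
```

To plot the smoothed curve against the notebook's data, the values would be paired with `train_steps[window - 1:]`, where `window = max(1, int(len(loss_acc) * rolling_average_width_percentage))`, so that the x and y lengths match.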

There are many training values, because the loss is recorded for every mini-batch. Let's look at the test accuracy on its own.

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

That is the basic principle of working with neural networks in PyTorch. The rest of the demonstration uses the same framework, only with different parameters depending on what we want to observe.

Let's now check how our model behaves if we increase `batch_size` to 4.


In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
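Changing `batch_size` also changes how many optimizer steps happen per epoch. With the default 200 training samples, `batch_size=1` gives 200 steps per epoch and `batch_size=4` gives 50, so over 5 epochs the x-axis of the plots below spans 250 steps instead of 1000, and each step uses the mean loss over 4 samples. A NumPy sketch of how a shuffled `DataLoader` with the default `drop_last=False` walks through the data (the generator `iterate_minibatches` is hypothetical, not PyTorch's implementation; batch size 7 is added only to show a smaller last batch):

```python
import math

import numpy as np


def iterate_minibatches(n_samples, batch_size, rng):
    """Yield index arrays in a fresh random order; the last batch may be smaller."""
    order = rng.permutation(n_samples)
    for start in range(0, n_samples, batch_size):
        yield order[start:start + batch_size]


rng = np.random.default_rng(0)
steps_per_epoch = {}
for batch_size in (1, 4, 7):
    batches = list(iterate_minibatches(200, batch_size, rng))
    steps_per_epoch[batch_size] = len(batches)
    # every sample is visited exactly once per epoch
    assert np.array_equal(np.sort(np.concatenate(batches)), np.arange(200))
    print(batch_size, len(batches), math.ceil(200 / batch_size), len(batches[-1]))
# 1 200 200 1
# 4 50 50 4
# 7 29 29 4
```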

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 5)

    def forward(self, x):
        x = self.fc1(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 5
learning_rate = 1
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())

    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)  # index of the most probable class
            correct += pred.eq(target.view_as(pred)).sum().item()  # .item() keeps `correct` a plain number

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

We see different behaviour! The loss shows that the network is not really making progress, but mostly "oscillates" around some fixed value. This is often a sign that the learning rate is too high. Let's reduce it to 0.01!
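Why a too-large learning rate makes the loss bounce instead of decreasing can be seen on the simplest possible loss, f(w) = w^2, whose gradient is 2w. One gradient-descent step gives w <- w - lr * 2w = (1 - 2 * lr) * w. The real loss surface of the network is not this simple, so this is only an analogy, but it uses the same learning rates as this notebook:

```python
def gd_trajectory(lr, w0=1.0, steps=100):
    """Plain gradient descent on f(w) = w**2, whose gradient is 2*w."""
    w = w0
    history = [w]
    for _ in range(steps):
        w = w - lr * 2 * w  # each step multiplies w by (1 - 2*lr)
        history.append(w)
    return history


for lr in (1.0, 0.01, 0.0001):  # the learning rates tried in this notebook
    h = gd_trajectory(lr)
    print(lr, [round(v, 4) for v in h[:4]], round(h[-1], 4))
```

With lr = 1 the factor is -1, so w flips sign forever without getting any closer to the minimum at 0; with 0.01 it shrinks by 2% per step; with 0.0001 it barely moves in 100 steps. For this function any lr above 1 makes |w| grow, i.e. gradient descent diverges.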

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 5)

    def forward(self, x):
        x = self.fc1(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 5
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())

    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)  # index of the most probable class
            correct += pred.eq(target.view_as(pred)).sum().item()  # .item() keeps `correct` a plain number

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

Once again we see different behaviour. Let's try an even smaller learning rate, 0.0001.

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 5)

    def forward(self, x):
        x = self.fc1(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 5
learning_rate = 0.0001
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)  # index of the most probable class
            correct += pred.eq(target.view_as(pred)).sum().item()  # .item() keeps `correct` a plain number

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")


plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

We can see that the loss is decreasing, but slowly. It is possible that our model is not expressive enough. Let's repeat the experiment with a learning rate of 0.01, but this time with two layers.

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 2)
        self.fc2 = nn.Linear(2, 5)

    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)
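Note that this two-layer network has no activation function between `fc1` and `fc2`. Two linear layers applied one after another are still a single linear map: fc2(fc1(x)) = W2 (W1 x + b1) + b2 = (W2 W1) x + (W2 b1 + b2). A short NumPy check with random weights of the same shapes (the weights are arbitrary, not taken from a trained model):

```python
import numpy as np

rng = np.random.default_rng(0)
# Random weights with the shapes of fc1 = nn.Linear(2, 2) and fc2 = nn.Linear(2, 5)
W1, b1 = rng.normal(size=(2, 2)), rng.normal(size=2)
W2, b2 = rng.normal(size=(5, 2)), rng.normal(size=5)

x = rng.normal(size=(10, 2))                # a batch of 10 points
two_layers = (x @ W1.T + b1) @ W2.T + b2    # fc2(fc1(x)) with no activation

# The same mapping as ONE linear layer: W = W2 @ W1, b = W2 @ b1 + b2
W, b = W2 @ W1, W2 @ b1 + b2
one_layer = x @ W.T + b

print(np.allclose(two_layers, one_layer))   # True
```

So this model can represent exactly the same decision boundaries as the single `nn.Linear(2, 5)` layer before it; only the parametrisation, and therefore the training dynamics, differ.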

In [None]:
n_epochs = 5
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)  # index of the most probable class
            correct += pred.eq(target.view_as(pred)).sum().item()  # .item() keeps `correct` a plain number

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

Let's look at the results:

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Train loss")
plt.plot(test_steps, test_loss_acc, label="Test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

One of the main strengths of neural networks is their use of nonlinearities. Let's run our experiment again, but this time with the sigmoid as the activation function.

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 2)
        self.fc2 = nn.Linear(2, 5)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.sigmoid(x)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 5
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()

        if epoch_idx == 0:
            # Gradients are already computed here, but optimizer.step() has not run yet
            if batch_idx == 10:
                print("# Epoch 1, batch 10")
                print("## Layer 1 before the optimizer step on batch 10:")
                print("### Weights")
                print(network.fc1.weight)
                print("### Weights - gradient")
                print(network.fc1.weight.grad)

                print("### Bias")
                print(network.fc1.bias)
                print("### Bias - gradient")
                print(network.fc1.bias.grad)

            if batch_idx == 11:
                print("--------------")
                print("# Epoch 1, batch 11")
                print("## Layer 1 after the optimizer step on batch 10:")
                print("### Weights")
                print(network.fc1.weight)

                print("### Bias")
                print(network.fc1.bias)

        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)  # index of the most probable class
            correct += pred.eq(target.view_as(pred)).sum().item()  # .item() keeps `correct` a plain number

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

The sigmoid has a problem: in saturation its gradients are almost 0. That is a problem when we use gradient descent as the optimization method! Let's try today's most widely used activation function: ReLU! ReLU is defined as ReLU(x) = max(0, x): it is the identity line for x > 0 and 0 for all values less than or equal to zero.
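The saturation problem can be seen by evaluating the derivatives directly. The derivative of the sigmoid is s(x)(1 - s(x)), which is at most 0.25 (at x = 0) and nearly zero for large |x|. This can matter here because the raw coordinates, which reach values of several tens, are fed into `fc1` without normalisation. ReLU's derivative is 1 for every positive input. A NumPy sketch, with helper functions written only for illustration:

```python
import numpy as np


def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def sigmoid_grad(x):
    s = sigmoid(x)
    return s * (1.0 - s)  # derivative of the sigmoid, at most 0.25 (at x = 0)


def relu(x):
    return np.maximum(0.0, x)  # ReLU(x) = max(0, x)


def relu_grad(x):
    return (x > 0).astype(float)  # 1 for x > 0, 0 otherwise (this sketch uses 0 at x = 0)


xs = np.array([-10.0, -2.0, 0.0, 2.0, 10.0])
print(sigmoid_grad(xs))  # tiny at |x| = 10, largest (0.25) at x = 0
print(relu_grad(xs))     # [0. 0. 0. 1. 1.]
```

The flip side is that ReLU's derivative is exactly 0 for negative inputs, so a unit whose input stays negative stops learning.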

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 2)
        self.fc2 = nn.Linear(2, 5)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x, inplace=True)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 5
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

Let's visualize the results:

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Train loss")
plt.plot(test_steps, test_loss_acc, label="Test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

Let's check what happens if we give the model with the sigmoid activation more time to train.

Note: this model occasionally diverges because of the problem described earlier. Out of 2-3 runs, at least one usually trains well.
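Whether a run diverges depends on the random initial weights and on the shuffling order of the `DataLoader`. One way to make a good (or bad) run repeatable is to fix the random seeds before building the model and the `DataLoader`. A minimal sketch; `set_seed` and `SigmoidNet` are hypothetical names, and `SigmoidNet` only mirrors the layer sizes of the sigmoid `Net` used in the next cells:

```python
import random

import numpy as np
import torch
import torch.nn as nn


def set_seed(seed: int) -> None:
    """Seed the Python, NumPy and PyTorch RNGs so a run can be repeated."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


class SigmoidNet(nn.Module):
    # Hypothetical stand-in with the same shape as the sigmoid model: 2 -> 8 -> 5
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(2, 8)
        self.fc2 = nn.Linear(8, 5)

    def forward(self, x):
        return torch.log_softmax(self.fc2(torch.sigmoid(self.fc1(x))), dim=1)


set_seed(0)
net_a = SigmoidNet()
set_seed(0)
net_b = SigmoidNet()

# The same seed gives identical initial weights
same = all(torch.equal(p, q) for p, q in zip(net_a.parameters(), net_b.parameters()))
print(same)  # True
```

To compare several runs, call `set_seed` with a different value before each experiment and note which seeds train well.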

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 8)
        self.fc2 = nn.Linear(8, 5)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.sigmoid(x)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 10
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
# plt.plot(train_steps[int(len(loss_acc) - len(loss_acc) * (1-rolling_average_width_percentage)) - 1:],
#          moving_average(loss_acc, int(len(loss_acc) * rolling_average_width_percentage)),
#          label=f"Smoothed ({rolling_average_width_percentage * 100:.2f}%) train loss")

plt.plot(test_steps, test_loss_acc, label="Raw test loss")
# plt.plot(test_steps[int(len(test_loss_acc) - len(test_loss_acc) * (1-rolling_average_width_percentage)) - 1:],
#          moving_average(test_loss_acc, int(len(test_loss_acc) * rolling_average_width_percentage)),
#          label=f"Smoothed ({rolling_average_width_percentage * 100:.2f}%) test loss")

plt.legend()
plt.grid()
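The commented-out lines above call a `moving_average` helper that is not defined in this part of the notebook. If it is not available, a minimal sketch could look like this (an assumption: a simple "valid"-mode moving average built on `np.convolve`, whose output has `len(values) - window + 1` points and therefore lines up with slices like `train_steps[window - 1:]`):

```python
import numpy as np


def moving_average(values, window):
    """Simple moving average over `window` consecutive points ('valid' mode).

    Returns len(values) - window + 1 points, so it lines up with
    steps[window - 1:].
    """
    window = max(int(window), 1)  # guard against a window of 0 for short runs
    kernel = np.ones(window) / window
    return np.convolve(np.asarray(values, dtype=float), kernel, mode="valid")


losses = [4.0, 2.0, 3.0, 1.0, 0.0]
print(moving_average(losses, 2))  # [3.  2.5 2.  0.5]
```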

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

Progress! Now let's see what happens if we train for longer.

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 8)
        self.fc2 = nn.Linear(8, 5)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.sigmoid(x)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 50
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

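Finally, let's see what happens if we add two more layers to our most successful model (this version also uses ReLU activations instead of sigmoid, and a wider first hidden layer of 16 units).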
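To get a feel for how much bigger the deeper model is, we can count trainable parameters. A minimal sketch: both models are re-expressed here with `nn.Sequential` using the same layer sizes as the notebook's `Net` classes (the final `log_softmax` has no parameters, so it is omitted), and `count_parameters` is a hypothetical helper:

```python
import torch.nn as nn

# Sigmoid model from the previous experiments: 2 -> 8 -> 5
sigmoid_mlp = nn.Sequential(nn.Linear(2, 8), nn.Sigmoid(), nn.Linear(8, 5))

# Deeper ReLU model: 2 -> 16 -> 8 -> 8 -> 5
deep_mlp = nn.Sequential(
    nn.Linear(2, 16), nn.ReLU(),
    nn.Linear(16, 8), nn.ReLU(),
    nn.Linear(8, 8), nn.ReLU(),
    nn.Linear(8, 5),
)


def count_parameters(model: nn.Module) -> int:
    # Weights + biases of every trainable layer
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


print(count_parameters(sigmoid_mlp))  # 69
print(count_parameters(deep_mlp))     # 301
```

Each `nn.Linear(n_in, n_out)` contributes `n_in * n_out + n_out` parameters, e.g. `2 * 16 + 16 = 48` for the first layer of the deeper model.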

In [None]:
batch_size = 4

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 8)
        self.fc4 = nn.Linear(8, 5)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x, inplace=True)

        x = self.fc2(x)
        x = F.relu(x, inplace=True)

        x = self.fc3(x)
        x = F.relu(x, inplace=True)

        x = self.fc4(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 50
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

This approach looks good and reasonably stable. The only thing left is to increase the mini-batch size. Let's repeat the previous experiment with a mini-batch size of 20.
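Since `F.nll_loss` averages the loss over the mini-batch by default (`reduction='mean'`), a larger batch gives a less noisy gradient per step, but there are fewer steps per epoch, so the "Training step" axis in the plots gets shorter. A minimal sketch with a hypothetical dataset size of 1000 samples (the notebook's `number_of_train_samples` may differ):

```python
import math

import torch
from torch.utils.data import DataLoader, TensorDataset

# Hypothetical dataset size; not the notebook's actual number of samples
n_samples = 1000
dataset = TensorDataset(torch.zeros(n_samples, 2), torch.zeros(n_samples, dtype=torch.int64))

for bs in (4, 20):
    loader = DataLoader(dataset, batch_size=bs, shuffle=True)
    # Each training step consumes one mini-batch; the last one may be smaller
    steps_per_epoch = len(loader)
    assert steps_per_epoch == math.ceil(n_samples / bs)
    print(bs, steps_per_epoch)
# 4 250
# 20 50
```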

In [None]:
batch_size = 20

tensor_x_train = torch.Tensor(np.dstack([data_x_train, data_y_train]).reshape(number_of_train_samples, 2).astype(np.float32))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.dstack([data_x_test, data_y_test]).reshape(number_of_test_samples, 2).astype(np.float32))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 8)
        self.fc4 = nn.Linear(8, 5)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x, inplace=True)

        x = self.fc2(x)
        x = F.relu(x, inplace=True)

        x = self.fc3(x)
        x = F.relu(x, inplace=True)

        x = self.fc4(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 50
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
summary(network, input_size=(2, ), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in range(0, n_epochs):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)

        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))
plt.plot(train_steps, loss_acc, label="Raw train loss")
# plt.plot(train_steps[int(len(loss_acc) - len(loss_acc) * (1-rolling_average_width_percentage)) - 1:],
#          moving_average(loss_acc, int(len(loss_acc) * rolling_average_width_percentage)),
#          label=f"Smoothed ({rolling_average_width_percentage * 100:.2f}%) train loss")

plt.plot(test_steps, test_loss_acc, label="Raw test loss")
# plt.plot(test_steps[int(len(test_loss_acc) - len(test_loss_acc) * (1-rolling_average_width_percentage)) - 1:],
#          moving_average(test_loss_acc, int(len(test_loss_acc) * rolling_average_width_percentage)),
#          label=f"Smoothed ({rolling_average_width_percentage * 100:.2f}%) test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

Training is now more stable and the results are solid. From here, it is up to the researcher to find a good set of hyperparameters for the model (architecture, training settings, and the like) that yields the best possible result on unseen data.

## Image classification example
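A simple way to search for such hyperparameters is a grid search: train one model per combination of candidate values and keep the best one. A minimal sketch under stated assumptions: it uses a synthetic stand-in dataset of five Gaussian blobs rather than the notebook's clusters, a hypothetical `train_and_evaluate` helper, and `F.cross_entropy` on raw logits (equivalent to the notebook's `log_softmax` followed by `F.nll_loss`). Candidates are scored on a held-out validation split, so the final test set stays untouched until a configuration is chosen:

```python
import itertools

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(0)

# Hypothetical stand-in data: 5 Gaussian blobs in 2D (not the notebook's dataset)
centers = torch.tensor([[0., 0.], [4., 0.], [0., 4.], [4., 4.], [2., 2.]])
labels = torch.arange(5).repeat_interleave(100)
points = centers[labels] + 0.5 * torch.randn(len(labels), 2)
perm = torch.randperm(len(labels))
train_idx, val_idx = perm[:400], perm[400:]


def train_and_evaluate(hidden, lr, batch_size, n_epochs=20):
    """Train a small 2 -> hidden -> 5 MLP and return validation accuracy in percent."""
    model = nn.Sequential(nn.Linear(2, hidden), nn.ReLU(), nn.Linear(hidden, 5))
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    loader = DataLoader(TensorDataset(points[train_idx], labels[train_idx]),
                        batch_size=batch_size, shuffle=True)
    for _ in range(n_epochs):
        model.train()
        for x, y in loader:
            optimizer.zero_grad()
            loss = F.cross_entropy(model(x), y)
            loss.backward()
            optimizer.step()
    model.eval()
    with torch.no_grad():
        pred = model(points[val_idx]).argmax(dim=1)
    return 100.0 * (pred == labels[val_idx]).float().mean().item()


# Every combination of the candidate values is trained and scored
grid = {"hidden": [8, 16], "lr": [0.01, 0.1], "batch_size": [4, 20]}
results = {}
for hidden, lr, bs in itertools.product(*grid.values()):
    results[(hidden, lr, bs)] = train_and_evaluate(hidden, lr, bs)

best = max(results, key=results.get)
print("best (hidden, lr, batch_size):", best, f"{results[best]:.1f}%")
```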

This example has only two classes: images of triangles and images of circles. The goal is to build a neural network that can tell circles from triangles. To generate the data we write the functions `drawRandomTriangle()` and `drawRandomCircle()`, which return a filled triangle or circle of random size and position on a single-channel 128x128 px image.

In [None]:
def drawRandomTriangle():
    canvas = np.zeros((128, 128, 3))
    # Three random vertices; OpenCV expects contour points as int32
    pt1 = np.random.uniform(0, 128, 2).astype(np.int32)
    pt2 = np.random.uniform(0, 128, 2).astype(np.int32)
    pt3 = np.random.uniform(0, 128, 2).astype(np.int32)

    triangle_cnt = np.array([pt1, pt2, pt3])
    # thickness=-1 fills the triangle
    canvas = cv2.drawContours(canvas, [triangle_cnt], 0, (255, 255, 255), -1)
    # Keep one channel and add a channel dimension: (1, 128, 128)
    return canvas[..., 0].reshape(1, 128, 128)

In [None]:
def drawRandomCircle():
    canvas = np.zeros((128, 128, 3))
    radius = int(np.random.uniform(10, 40))
    # Keep the whole circle inside the image; cv2.circle expects the center as plain ints
    pt = tuple(int(v) for v in np.random.uniform(0 + radius, 128 - radius, 2))

    # thickness=-1 fills the circle
    canvas = cv2.circle(canvas, pt, radius, (255, 255, 255), -1)
    return canvas[..., 0].reshape(1, 128, 128)

The dataset is generated much like in the previous task, except that we now use the triangle and circle functions instead of sampling random points from a normal distribution. Triangles get label 0 and circles get label 1.
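A quick check of the split arithmetic with these settings, assuming each loop iteration appends one triangle and one circle (so the number of iterations is half the number of samples):

```python
num_of_samples_per_class = 128
train_test_ratio = 0.8

# Totals over both classes (triangles + circles)
number_of_train_samples = int(2 * num_of_samples_per_class * train_test_ratio)
number_of_test_samples = 2 * num_of_samples_per_class - number_of_train_samples

# Each iteration adds one triangle AND one circle
train_iterations = number_of_train_samples // 2
test_iterations = number_of_test_samples // 2

print(number_of_train_samples, number_of_test_samples)  # 204 52
print(train_iterations, test_iterations)                # 102 26
```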

In [None]:
num_of_samples_per_class = 128
train_test_ratio = 0.8

number_of_train_samples = int(2 * num_of_samples_per_class * train_test_ratio)
number_of_test_samples = 2 * num_of_samples_per_class - number_of_train_samples

data_samples_train = []
labels_train = []

data_samples_test = []
labels_test = []

# Each iteration adds one triangle and one circle, so run half as many iterations
for idx in range(0, number_of_train_samples // 2):
    data_samples_train.append(drawRandomTriangle())
    labels_train.append(0)

    data_samples_train.append(drawRandomCircle())
    labels_train.append(1)


for idx in range(0, number_of_test_samples // 2):
    data_samples_test.append(drawRandomTriangle())
    labels_test.append(0)

    data_samples_test.append(drawRandomCircle())
    labels_test.append(1)

Let's visualize our dataset (each image's title is its label: 0 for a triangle, 1 for a circle).

In [None]:
plt.figure(figsize=(32, 32))
for idx in range(0, 100):
    plt.subplot(10, 10, idx + 1)
    plt.imshow(data_samples_train[idx][0, ...])
    plt.axis('off')
    plt.colorbar()
    plt.title(labels_train[idx])

Building the `DataLoader`s works the same way as in the previous steps. We chose a mini-batch size of 16.
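Each batch has the `(batch, channels, height, width)` layout that `nn.Conv2d` expects. A minimal sketch with hypothetical all-zero stand-in images of the same shape as the ones returned by the draw functions; stacking the list with `np.stack` first avoids converting a Python list of arrays element by element, which PyTorch may warn is slow:

```python
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

# Hypothetical stand-ins for the (1, 128, 128) images returned by the draw functions
images = [np.zeros((1, 128, 128)) for _ in range(40)]
image_labels = [idx % 2 for idx in range(40)]

# np.stack turns the list into one (N, 1, 128, 128) array before conversion
tensor_x = torch.from_numpy(np.stack(images)).float()
tensor_y = torch.tensor(image_labels, dtype=torch.int64)

loader = DataLoader(TensorDataset(tensor_x, tensor_y), batch_size=16, shuffle=True)
data, target = next(iter(loader))
print(data.shape, target.shape)  # torch.Size([16, 1, 128, 128]) torch.Size([16])
```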

In [None]:
batch_size = 16

# Stack the list of (1, 128, 128) arrays into one (N, 1, 128, 128) array first
tensor_x_train = torch.Tensor(np.stack(data_samples_train))
tensor_y_train = torch.Tensor(labels_train).to(dtype=torch.int64)

train_dataset = TensorDataset(tensor_x_train, tensor_y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


tensor_x_test = torch.Tensor(np.stack(data_samples_test))
tensor_y_test = torch.Tensor(labels_test).to(dtype=torch.int64)

test_dataset = TensorDataset(tensor_x_test, tensor_y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

This is one proposed model. You can experiment with its parameters here to see how they affect the result.
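The `1024` in `nn.Linear(1024, 2)` and `x.view(-1, 1024)` is the flattened size of the last feature map. One way to check (or compute) it after changing the filters is to pass a dummy input through the convolutional part. A minimal sketch; `features` is a hypothetical `nn.Sequential` with the same layer sizes as the model's convolutions:

```python
import torch
import torch.nn as nn

# Convolutional part of the proposed model (same layer sizes as in Net)
features = nn.Sequential(
    nn.Conv2d(1, 4, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
    nn.Conv2d(4, 8, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
    nn.Conv2d(8, 4, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
)

# padding=1 keeps the size through each 3x3 conv; each MaxPool2d(2) halves it:
# 128 -> 64 -> 32 -> 16, leaving 4 channels of 16x16
with torch.no_grad():
    out = features(torch.zeros(1, 1, 128, 128))
flat_size = out[0].numel()
print(out.shape, flat_size)  # torch.Size([1, 4, 16, 16]) 1024
```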

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 4, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(4, 8, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(8, 4, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(1024, 2)

        self.maxpool = nn.MaxPool2d(kernel_size=2)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.conv3(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = x.view(-1, 1024)

        x = self.fc1(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 10
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.Adam(network.parameters(), lr=learning_rate)
summary(network, input_size=(1, 128, 128), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in tqdm.tqdm(range(0, n_epochs)):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)
        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

Let's visualize the results:

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))

plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

Here we can observe an interesting phenomenon: **overfitting**. At first both the train and the test loss decrease, but after a while the test loss starts to rise while the train loss keeps falling. This means the network is no longer learning a general way to recognize the shapes; instead it **memorizes the specific training examples**. That is undesirable, and we want to prevent overfitting at all costs.

To counter overfitting here, we can make the network deeper. This reduces the spatial size (the height and width of the tensor) while increasing the depth (the number of filters in the last convolution, i.e. the "number of features"): the final feature map goes from 4x16x16 to 64x4x4, so the flattened size stays 1024. We extend the model with two additional convolutions and increase the number of filters in all convolutions.
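The pattern can also be located numerically from per-epoch losses, e.g. `test_loss_acc` and the per-epoch mean train losses that are logged to W&B. A minimal sketch; `overfitting_onset` is a hypothetical helper, and the loss values below are made up for illustration, not results from this notebook:

```python
def overfitting_onset(train_losses, test_losses):
    """Return the epoch with the lowest test loss and whether train loss kept falling after it.

    After that epoch the test loss no longer improves; if the train loss
    still decreases afterwards, that is the overfitting pattern.
    """
    best_epoch = min(range(len(test_losses)), key=lambda i: test_losses[i])
    train_still_falling = train_losses[-1] < train_losses[best_epoch]
    return best_epoch, train_still_falling


# Illustrative per-epoch losses (made up, not results from this notebook)
train = [0.69, 0.50, 0.35, 0.22, 0.12, 0.06]
test = [0.68, 0.52, 0.41, 0.45, 0.55, 0.70]
print(overfitting_onset(train, test))  # (2, True)
```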
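The shape of the tensor after each of the five conv + ReLU + max-pool stages of the deeper model can be traced with a dummy input. A minimal sketch; `channels` lists the filter counts used in the model's convolutions, and the stages are rebuilt here with fresh random weights only to inspect shapes:

```python
import torch
import torch.nn as nn

# Channel sizes of the five conv stages in the deeper model
channels = [1, 16, 32, 64, 64, 64]
x = torch.zeros(1, 1, 128, 128)

with torch.no_grad():
    for c_in, c_out in zip(channels[:-1], channels[1:]):
        stage = nn.Sequential(nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                              nn.ReLU(), nn.MaxPool2d(2))
        x = stage(x)
        print(tuple(x.shape[1:]))
# (16, 64, 64)
# (32, 32, 32)
# (64, 16, 16)
# (64, 8, 8)
# (64, 4, 4)

print(x[0].numel())  # 1024
```

Each extra conv and pooling stage also enlarges the image region that a single output feature depends on, so the final 4x4 map summarizes larger parts of the image than the 16x16 map of the first model.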

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(1024, 2)

        self.maxpool = nn.MaxPool2d(kernel_size=2)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.conv3(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.conv4(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.conv5(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = x.view(-1, 1024)

        x = self.fc1(x)

        return F.log_softmax(x, dim=1)

In [None]:
n_epochs = 10
learning_rate = 0.01
device = 'cpu'

network = Net().to(device)
optimizer = optim.Adam(network.parameters(), lr=learning_rate)
summary(network, input_size=(1, 128, 128), device=device)

In [None]:
run = wandb.init(project="oi_demo", reinit=True)
wandb.watch(network, log_freq=1)

loss_acc = []
test_loss_acc = []
test_accuracy_acc = []
train_steps = []
test_steps = []
current_step = 0

for epoch_idx in tqdm.tqdm(range(0, n_epochs)):
    network.train()

    per_epoch_trainloss = []
    for batch_idx, (data, target) in enumerate(train_dataloader):
        data = data.to(device)
        target = target.to(device)
        network.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        loss_acc.append(loss.item())
        current_step += 1
        train_steps.append(current_step)
        per_epoch_trainloss.append(loss.item())


    network.eval()
    with torch.no_grad():
        test_loss = 0
        correct = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)

            output = network(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()

    test_loss /= len(test_dataloader.dataset)
    accuracy = 100. * correct / len(test_dataloader.dataset)

    test_loss_acc.append(test_loss)
    test_accuracy_acc.append(accuracy)
    test_steps.append(current_step)

    wandb.log({
        'test_loss': test_loss,
        'test_accuracy': accuracy,
        'train_loss': np.mean(per_epoch_trainloss)
    })

run.finish()

Let's visualize the results:

In [None]:
rolling_average_width_percentage = 0.05
plt.figure(figsize=(16, 4))

plt.plot(train_steps, loss_acc, label="Raw train loss")
plt.plot(test_steps, test_loss_acc, label="Raw test loss")

plt.legend()
plt.grid()

In [None]:
plt.figure(figsize=(16, 4))
plt.plot(test_steps, test_accuracy_acc, label="Test accuracy")
plt.ylabel("Accuracy [%]")
plt.xlabel("Training step")
plt.grid()
plt.legend()

The network starts to generalize relatively quickly. There is no significant difference in trend between the train and the test loss, and the test accuracy is high even though the test images were never seen by the model during training. With this we have built a model that can distinguish images of circles from images of triangles.
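To use a trained model on a single new image, switch it to evaluation mode, add a batch dimension and take the class with the highest log-probability. A minimal sketch; `predict_shape` and `CLASS_NAMES` are hypothetical names, and the untrained `stand_in` model only makes the example runnable on its own. In the notebook you would call something like `predict_shape(network, drawRandomCircle())` instead:

```python
import numpy as np
import torch
import torch.nn as nn

CLASS_NAMES = {0: "triangle", 1: "circle"}  # labels used when building the dataset


def predict_shape(model: nn.Module, image: np.ndarray) -> str:
    """Classify one (1, 128, 128) image with a model that outputs log-probabilities."""
    model.eval()
    with torch.no_grad():
        x = torch.from_numpy(image).float().unsqueeze(0)  # add batch dim -> (1, 1, 128, 128)
        log_probs = model(x)
    return CLASS_NAMES[int(log_probs.argmax(dim=1).item())]


# Untrained stand-in model, only so the sketch runs on its own
stand_in = nn.Sequential(nn.Flatten(), nn.Linear(128 * 128, 2), nn.LogSoftmax(dim=1))
print(predict_shape(stand_in, np.zeros((1, 128, 128))))
```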