In [1]:
import torch
import random
import numpy as np
import tensorflow.keras.datasets as datasets

2023-09-18 23:43:05.020457: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [28]:
def MNIST_data():
    (x_train, y_train), (x_test, y_test) = datasets.mnist.load_data()
    x_train = x_train.reshape((60000, 28, 28, 1))
    x_train = x_train / 255.0

    return x_train, y_train

def MNIST_test_data():
    (x_train, y_train), (x_test, y_test) = datasets.mnist.load_data()
    x_test = x_test.reshape((10000, 28, 28, 1))
    x_test = x_test / 255.0

    return x_test, y_test

In [3]:
def data_to_noisy(labels, rate):
    data_cnt = len(labels)
    noisy_cnt = int(rate*data_cnt)
    
    noisy_index = random.sample(range(0, data_cnt), noisy_cnt)
    for index in noisy_index:
        now_label = labels[index]
        new_label = random.randint(0,9)
        while (new_label == now_label):
            new_label = random.randint(0,9)
        labels[index] = new_label
    
    noisy_index.sort()
    
    return labels, noisy_index

In [4]:
class CNN(torch.nn.Module):

    def __init__(self):
        super(CNN, self).__init__()
        self.keep_prob = 0.5
        self.layer1 = torch.nn.Sequential(
            torch.nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = torch.nn.Sequential(
            torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer3 = torch.nn.Sequential(
            torch.nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=1))
        self.fc1 = torch.nn.Linear(4 * 4 * 128, 625, bias=True)
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        self.layer4 = torch.nn.Sequential(
            self.fc1,
            torch.nn.ReLU(),
            torch.nn.Dropout(p=1 - self.keep_prob))
        self.fc2 = torch.nn.Linear(625, 10, bias=True)
        torch.nn.init.xavier_uniform_(self.fc2.weight)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.view(out.size(0), -1)
        out = self.layer4(out)
        out = self.fc2(out)
        return out

In [25]:
epochs = 10
batch_size = 100
noisy_rate = 0.3

true_loss = []
noisy_loss = []
true_loss_10 = []
noisy_loss_10 = []

true_labels = np.eye(10)
X_noisy, Y_ = MNIST_data()
X, Y = MNIST_data()
X = np.array_split(X, len(X) / batch_size)
Y = np.array_split(Y, len(Y) / batch_size)

model = CNN()
model2 = CNN()
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
optimizer2 = torch.optim.Adam(model2.parameters(), lr=0.01)

model.train()
model2.train()

cnt = 0
Y_noisy, noisy_index = data_to_noisy(Y_, noisy_rate)
noisy = noisy_index.pop(0)
for x_train, y_train in zip(X_noisy, Y_noisy):
    x_train = torch.Tensor(x_train.reshape(1, 1, 28, 28))
    y_train = torch.Tensor(true_labels[y_train].reshape(1, 10))

    optimizer.zero_grad()
    y_pred = model(x_train)
    loss = loss_fn(y_pred, y_train)
    loss.backward()
    optimizer.step()

    if cnt == noisy:
        noisy_loss.append(loss.item())
        if len(noisy_index):
            noisy = noisy_index.pop(0)
    else: 
        true_loss.append(loss.item())

    cnt += 1

#============================================================================
for epoch in range(epochs):
    for x_train, y_train in zip(X, Y):
        x_train = torch.Tensor(x_train.reshape(batch_size, 1, 28, 28))
        y_train = torch.Tensor(true_labels[y_train].reshape(batch_size, 10))

        optimizer2.zero_grad()
        y_pred = model2(x_train)
        loss = loss_fn(y_pred, y_train)
        loss.backward()
        optimizer.step()
#============================================================================
cnt = 0
Y_noisy, noisy_index = data_to_noisy(Y_, noisy_rate)
noisy = noisy_index.pop(0)
for x_train, y_train in zip(X_noisy, Y_noisy):
    x_train = torch.Tensor(x_train.reshape(1, 1, 28, 28))
    y_train = torch.Tensor(true_labels[y_train].reshape(1, 10))

    optimizer2.zero_grad()
    y_pred = model2(x_train)
    loss = loss_fn(y_pred, y_train)
    loss.backward()
    optimizer.step()

    if cnt == noisy:
        noisy_loss_10.append(loss.item())
        if len(noisy_index):
            noisy = noisy_index.pop(0)
    else: 
        true_loss_10.append(loss.item())

    cnt += 1

In [26]:
print(sum(true_loss) / len(true_loss))
print(sum(noisy_loss) / len(noisy_loss))
print(sum(true_loss_10) / len(true_loss_10))
print(sum(noisy_loss_10) / len(noisy_loss_10))

2.3107564985837254
2.3104733042120933
2.3109339704116185
2.3074475865032937


In [29]:
x_test, y_test = MNIST_test_data()
model.eval()
model2.eval()
with torch.no_grad():
        x_test = torch.Tensor(x_test.reshape(len(x_test), 1, 28, 28))
        y_test = torch.Tensor(y_test)

        prediction = model(x_test)
        correct_prediction = torch.argmax(prediction, 1) == y_test
        accuracy = correct_prediction.float().mean()
        print('Accuracy:', accuracy.item())
        
        prediction = model2(x_test)
        correct_prediction = torch.argmax(prediction, 1) == y_test
        accuracy = correct_prediction.float().mean()
        print('Accuracy:', accuracy.item())

Accuracy: 0.09740000218153
Accuracy: 0.09570000320672989
