In [None]:
import matplotlib.pyplot as plt
import numpy
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

In [None]:
# hyperparams
inp_dim = 28 * 28
hidden = 256
out_dim = 10
device_id = 0
device = 'cpu' if device_id == -1 else f'cuda:{device_id}' # 'cuda:0' id GPU
n_epochs = 10
batch_size = 128

In [None]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5), (0.5)),
     ])

# зашружаем тренировочный сет
dataset_train = datasets.MNIST('.', 
                               train=True,            
                               download=True, 
                               transform=transform)

dataset_test = datasets.MNIST('.', 
                              train=False,
                              download=True, 
                              transform=transform)

In [None]:
plt.imshow(dataset_train.data[1].detach().numpy(), cmap='gray')
plt.show()

In [None]:
figure = plt.figure(figsize=(10, 8))
cols, rows = 5, 5
for i in range(1, cols * rows + 1):
 sample_idx = torch.randint(len(dataset_train), size=(1,)).item()
 img, label = dataset_train[sample_idx]
 figure.add_subplot(rows, cols, i)
 plt.title(label)
 plt.axis("off")
 plt.imshow(img.squeeze(), cmap="gray")
plt.show()

In [None]:
plt.imshow(dataset_test.data[1].detach().numpy())
plt.show()

In [None]:
class LinearModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_p=0.1):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        # custom init nn.init.xavier_uniform(self.linear1.weight) 
        self.do = nn.Dropout(dropout_p)
        self.linear2 = nn.Linear(hidden_dim, output_dim)
        self.activ = nn.ReLU()

    def forward(self, x):
        x = self.linear1(x)
        x = self.activ(x)
        x = self.do(x)
        x = self.linear2(x)

        return x

In [None]:
dataset_train = datasets.MNIST('.', train=True, download=True)

def collate_fn(data: list):
    # data = [(pic, target)...]
    pics = []
    targets = []
    for item in data:
        pics.append(numpy.array(item[0]))
        targets.append(item[1])
    pics = torch.from_numpy(numpy.array(pics)).float() / 255
    pics = pics.view(pics.size(0), -1) # mtx 28x28 to vec 7xx
    targets = torch.from_numpy(numpy.array(targets))

    return {
        'data': pics,
        'target': targets,
    }

In [None]:
dataloader_train = DataLoader(dataset_train, 
                        batch_size, 
                        shuffle=True, 
                        collate_fn=collate_fn,
                        drop_last = True
                        )

In [None]:
dataset_test = datasets.MNIST('.', train=False, download=True)

In [None]:
dataloader_test = DataLoader(dataset_test, 
                        batch_size, 
                        shuffle=True, 
                        collate_fn=collate_fn,
                        drop_last = True,
                        )

In [None]:
model = LinearModel(inp_dim, hidden, out_dim).to(device)
optim = torch.optim.Adam(model.parameters())
loss_func = nn.CrossEntropyLoss()

In [None]:
for epoch in range(n_epochs):
    # train model
    model.train()
    for i, batch in enumerate(dataloader_train):
        optim.zero_grad()
        batch['target'] = batch['target'].type(torch.LongTensor)
        predict = model(batch['data'].to(device))
        loss = loss_func(predict, batch['target'].to(device))
        loss.backward()
        optim.step()
        if i % 200 == 0:
            print(f'epoch: {epoch}, step: {i}, loss: {loss.item()}')

    # test model
    model.eval()
    for i, batch in enumerate(dataloader_test):
              batch['target'] = batch['target'].type(torch.LongTensor)
              inputs, labels = batch
              loss_test = 0
              with torch.no_grad():
                  predict = model(batch['data'].to(device))
                  loss_test += loss_func(predict, batch['target'].to(device))
    print('test loss:', (loss_test / i).item())
    
    #save every epoch
    torch.save(model.state_dict(), f'./chkpt_cv1_{epoch}.pth')

In [None]:
i=1
image = dataset_test.data[i].detach().numpy()
data = dataset_test.data[i].unsqueeze(0).view(1, -1).to(device).float()
target = dataset_test.targets[i].tolist()
model.eval()
plt.imshow(image)
plt.show()
predict = torch.argmax(model(data)).squeeze().detach()
print(f"predict: {predict}")
print(f"true:    {target}")

In [None]:
class LinearModel1(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_p=0.1):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.do = nn.Dropout(dropout_p)
        self.act = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, hidden_dim // 2)
        self.linear3 = nn.Linear(hidden_dim // 2, output_dim)

    def forward(self, X):
        X = self.act(self.do(self.linear1(X)))
        X = self.act(self.do(self.linear2(X)))
        X = self.linear3(X)

        return X

In [None]:
model1 = LinearModel1(inp_dim, hidden, out_dim).to(device)
optim = torch.optim.Adam(model1.parameters())
loss_func = nn.CrossEntropyLoss()

In [None]:
for epoch in range(n_epochs):
    # train model
    model1.train()
    for i, batch in enumerate(dataloader_train):
        optim.zero_grad()
        batch['target'] = batch['target'].type(torch.LongTensor)
        predict = model1(batch['data'].to(device))
        loss = loss_func(predict, batch['target'].to(device))
        loss.backward()
        optim.step()
        if i % 200 == 0:
            print(f'epoch: {epoch}, step: {i}, loss: {loss.item()}')

    # test model
    model1.eval()
    for i, batch in enumerate(dataloader_test):
              inputs, labels = batch
              batch['target'] = batch['target'].type(torch.LongTensor)
              loss_test = 0
              with torch.no_grad():
                  predict = model1(batch['data'].to(device))
                  loss_test += loss_func(predict, batch['target'].to(device))
    print('test loss:', loss_test.item() / i)
    
    #save every epoch
    torch.save(model.state_dict(), f'./chkpt_cv1_{epoch}.pth')

In [None]:
i=1
image = dataset_test.data[i].detach().numpy()
data = dataset_test.data[i].unsqueeze(0).view(1, -1).to(device).float()
target = dataset_test.targets[i].tolist()
model1.eval()
plt.imshow(image)
plt.show()
predict = torch.argmax(model1(data)).squeeze().detach()
print(f"predict: {predict}")
print(f"true:    {target}")

In [None]:
# hyperparams
input_ch = 1
hidden_ch = 128
out_dim = 10
device_id = 0
device = 'cpu' if device_id == -1 else f'cuda:{device_id}'
n_epochs = 10
batch_size = 128

In [None]:
class ConvDO(nn.Module):
    def __init__(self, input_ch, output_ch, kernel_size, stride, padding, 
                 dropout_p=0.1):
        super().__init__()
        self.conv = nn.Conv2d(
            input_ch, 
            output_ch, 
            kernel_size=kernel_size,
            stride=stride,
            padding=padding, 
        )
        # TODO добавить батч норм
        self.do = nn.Dropout(dropout_p)

    def forward(self, x):

        return self.do(self.conv(x))

class ConvModel(nn.Module):
    def __init__(self, input_ch, hidden_ch, output_dim, dropout_p=0.1):
        super().__init__()
        self.conv1 = ConvDO(input_ch, hidden_ch, 5, 2, 2) # уменьшим размер выходной фичматрицы в 2 раза
        # TODO еще уменьшить размер фичматрицы
        self.conv2 = ConvDO(hidden_ch, hidden_ch, 3, 1, 1)
        # TODO увеличить число выходных каналов
        self.conv3 = ConvDO(hidden_ch, 1, 3, 1, 1)
        self.linear = nn.Linear(
            14 * 14, # TODO поправить при изменении числа каналов и размера фич матриц
            output_dim,
        )  
        self.activ = nn.ReLU()    

    def forward(self, x):
        x = self.activ(self.conv1(x))
        x = self.activ(self.conv2(x))
        x = self.activ(self.conv3(x))
        x = self.linear(x.view(x.size(0), -1))

        return x

In [None]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5), (0.5)),
     ])

# зашружаем тренировочный сет
dataset_train = datasets.MNIST('.', 
                               train=True,            
                               download=True, 
                               transform=transform)

dataset_test = datasets.MNIST('.', 
                              train=False,
                              download=True, 
                              transform=transform)
trainloader = torch.utils.data.DataLoader(dataset_train, 
                                          batch_size=batch_size,
                                          shuffle=True, 
                                          num_workers=2, 
                                          drop_last = True,)

testloader = torch.utils.data.DataLoader(dataset_test, 
                                          batch_size=batch_size,
                                          shuffle=True, 
                                          num_workers=2, 
                                          drop_last = True,)

In [None]:
model_conv = ConvModel(input_ch, hidden_ch, out_dim).to(device)
optim = torch.optim.Adam(model_conv.parameters())
loss_func = nn.CrossEntropyLoss()

In [None]:
for epoch in range(n_epochs):
    #train model
    model_conv.train()
    for i, batch in enumerate(trainloader):
        inputs, labels = batch
        optim.zero_grad()

        predict = model_conv(inputs.to(device))
        loss = loss_func(predict, labels.to(device))
        loss.backward()
        optim.step()
        if i % 200 == 0:
            print(f'epoch: {epoch}, step: {i}, loss: {loss.item()}')
    
    #test model
    model_conv.eval()
    for i, batch in enumerate(testloader):
        inputs, labels = batch
        loss_test = 0
        with torch.no_grad():
            predict = model_conv(inputs.to(device))
            loss_test += loss_func(predict, labels.to(device))
    
    print('test loss:', loss_test.item() / i)
    
    #save every epoch
    torch.save(model_conv.state_dict(), f'./chkpt_cv1_conv_{epoch}.pth')

In [None]:
i=1
image = dataset_test.data[i].detach().numpy()
data = dataset_test.data[i].unsqueeze(0).unsqueeze(0).to(device).float()
target = dataset_test.targets[i].tolist()
model_conv.eval()
plt.imshow(image)
plt.show()
predict = torch.argmax(model_conv(data)).squeeze().detach()
print(f"predict: {predict}")
print(f"true:    {target}")

In [None]:
class ConvDO1(nn.Module):
    def __init__(self, input_ch, output_ch, kernel_size, stride, padding, 
                 dropout_p=0.1):
        super().__init__()
        self.conv = nn.Conv2d(
            input_ch, 
            output_ch, 
            kernel_size=kernel_size,
            stride=stride,
            padding=padding, 
        )
        self.do = nn.Dropout(dropout_p)

    def forward(self, x):
                
        return self.do(self.conv(x))

class ConvModel1(nn.Module):
    def __init__(self, input_ch, hidden_ch, output_dim, dropout_p=0.1):
        super().__init__()
        self.conv1 = ConvDO1(input_ch, hidden_ch, 5, 2, 2) # уменьшим размер выходной фичматрицы в 2 раза 14x14
        self.pool = nn.MaxPool2d(kernel_size=2) # TODO еще уменьшить размер фичматрицы 7x7
        self.conv2 = ConvDO1(hidden_ch, hidden_ch, 3, 1, 1)
        self.conv3 = ConvDO1(hidden_ch, 10, 3, 1, 1) # TODO увеличить число выходных каналов
        self.linear1 = nn.Linear(
            10 * 7 * 7, # TODO поправить при изменении числа каналов и размера фич матриц
            200
        )  
        self.linear2 = nn.Linear(200, output_dim)
        self.activ = nn.ReLU()    
        self.bn = nn.BatchNorm2d(hidden_ch) # TODO добавить батч норм

    def forward(self, x):
        x = self.activ(self.conv1(x))
        x = self.pool(x)
        x = self.activ(self.conv2(x))
        x = self.bn(x)
        x = self.activ(self.conv3(x))
        x = self.linear1(x.view(x.size(0), -1))
        x = self.activ(x)
        x = self.linear2(x)
        
        return x

In [None]:
model_conv1 = ConvModel1(input_ch, hidden_ch, out_dim).to(device)
optim = torch.optim.Adam(model_conv1.parameters())
loss_func = nn.CrossEntropyLoss()

In [None]:
for epoch in range(n_epochs):
    #train model
    model_conv1.train()
    for i, batch in enumerate(trainloader):
        inputs, labels = batch
        optim.zero_grad()

        predict = model_conv1(inputs.to(device))
        loss = loss_func(predict, labels.to(device))
        loss.backward()
        optim.step()
        if i % 200 == 0:
            print(f'epoch: {epoch}, step: {i}, loss: {loss.item()}')
    
    #test model
    model_conv1.eval()
    for i, batch in enumerate(testloader):
        inputs, labels = batch
        loss_test = 0
        with torch.no_grad():
            predict = model_conv1(inputs.to(device))
            loss_test += loss_func(predict, labels.to(device))
    
    print('test loss:', loss_test / i)
    
    #save every epoch
    torch.save(model_conv1.state_dict(), f'./chkpt_cv1_conv_{epoch}.pth')

In [None]:
i=1
image = dataset_test.data[i].detach().numpy()
data = dataset_test.data[i].unsqueeze(0).unsqueeze(0).to(device).float()
target = dataset_test.targets[i]
model_conv1.eval()
plt.imshow(image)
plt.show()
predict = torch.argmax(model_conv1(data)).squeeze().detach()
print(f"predict: {predict}")
print(f"true:    {target}")

In [None]:
figure = plt.figure(figsize=(10, 8))
cols, rows = 5, 5
for i in range(1, cols * rows + 1):
 sample_idx = torch.randint(len(dataset_test), size=(1,)).item()
 img, label = dataset_test[sample_idx]
 data = dataset_test.data[sample_idx].unsqueeze(0).unsqueeze(0).to(device).float()
 predict = torch.argmax(model_conv1(data)).squeeze().detach().item()
 figure.add_subplot(rows, cols, i)
 plt.title(predict)
 plt.axis("off")
 plt.imshow(img.squeeze(), cmap="gray")
plt.show()