In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
import tarfile
from tqdm import tqdm
import os
# Workspace layout: the project lives in <path>/Convolutionalnet and its
# 'data' subdirectory is created up front (no error if it already exists).
path = './'
root_dir = os.path.join(path, 'Convolutionalnet')
os.makedirs(os.path.join(root_dir, 'data'), exist_ok=True)


!pip3 install --upgrade gdown --quiet
!gdown 1z3B1GR7UtHZGrqNjUaep6thHwrN3IYSI

# Unpack data.tar.gz into <root_dir>/data, reporting progress in bytes.
# The archive is opened with a context manager so the handle is released even
# if extraction fails part-way through.
# NOTE(review): extraction trusts the member paths stored in the archive; on
# Python versions that support extraction filters, consider passing
# filter="data" to tar.extract().
extract_dir = os.path.join(root_dir, 'data')
with tarfile.open("data.tar.gz", "r:gz") as tar:
    members = tar.getmembers()  # fetch the member list once and reuse it
    total_size = sum(member.size for member in members)
    with tqdm(total=total_size, unit="B", unit_scale=True, desc="Extracting tar.gz file") as pbar:
        for member in members:
            tar.extract(member, extract_dir)
            pbar.update(member.size)


Downloading...
From (original): https://drive.google.com/uc?id=1z3B1GR7UtHZGrqNjUaep6thHwrN3IYSI
From (redirected): https://drive.google.com/uc?id=1z3B1GR7UtHZGrqNjUaep6thHwrN3IYSI&confirm=t&uuid=685706d2-803c-4552-b221-99bae2dd2d23
To: /content/data.tar.gz
100% 423M/423M [00:11<00:00, 36.4MB/s]


Extracting tar.gz file: 100%|██████████| 526M/526M [00:17<00:00, 29.3MB/s]


In [None]:
from torch.utils.data import Dataset
from PIL import Image
# Rebinds root_dir: it previously named <path>/Convolutionalnet and now names
# the directory the archive was extracted into. Later cells derive their data
# paths from this new value.
root_dir = os.path.join(path, 'Convolutionalnet/data')
class MiniPlaces(Dataset):
    """Image dataset read from ``<root_dir>/images`` and per-split index files.

    For the 'train' and 'val' splits the samples come from
    ``<root_dir>/train.txt`` / ``<root_dir>/val.txt``; each non-blank line holds
    a relative image path followed by an integer label. For the 'test' split
    every file in ``<root_dir>/images/test`` is a sample and its "label" is
    simply the file name.

    Args:
        root_dir: Directory containing the index files and the ``images`` folder.
        split: One of 'train', 'val' or 'test'.
        transform: Optional callable applied to each loaded image.
        label_dict: Optional existing mapping ``label -> category name``. When
            omitted, the 'train' split builds one from ``train.txt``; other
            splits start with an empty dict.
        max_index: Only images whose 8-digit number (characters ``[-12:-4]`` of
            the relative path) is ``<= max_index`` are kept for 'train'/'val'.
            ``None`` keeps every listed image. Defaults to 900.

    Attributes:
        filenames: Image paths relative to ``<root_dir>/images``.
        labels: Integer labels ('train'/'val') or file names ('test').
        label_dict: Mapping from integer label to category name.
    """

    def __init__(self, root_dir, split, transform=None, label_dict=None, max_index=900):
        assert split in ['train', 'val', 'test']
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.filenames = []
        self.labels = []
        self.label_dict = label_dict if label_dict is not None else {}

        # The mapping is only derived when the caller did not supply one, and
        # only from the training index (it is built from *all* listed lines,
        # including those dropped by the max_index filter).
        build_label_dict = label_dict is None and split == "train"

        if split == "train" or split == "val":
            with open(os.path.join(root_dir, split + ".txt")) as f:
                for line in f:
                    fields = line.split()
                    if not fields:
                        # Tolerate blank lines instead of failing on fields[0].
                        continue
                    rel_path, label = fields[0], int(fields[1])

                    if build_label_dict and label not in self.label_dict:
                        # The category name is the path component starting at
                        # character 8, i.e. paths are assumed to look like
                        # 'train/<c>/<category>/...' -- TODO confirm.
                        # Keying on "not seen yet" (rather than on a running
                        # counter) keeps every class even if labels are not
                        # listed in ascending order.
                        self.label_dict[label] = rel_path[8:rel_path.find("/", 8)]

                    image_number = int(rel_path[-12:-4])
                    if max_index is None or image_number <= max_index:
                        self.filenames.append(rel_path)
                        self.labels.append(label)
        elif split == "test":
            # No index file for the test split: file names double as labels.
            self.labels = os.listdir(os.path.join(root_dir, "images", "test"))
            self.filenames = ["test/" + name for name in self.labels]

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB; ``transform`` is applied when one was given.
        """
        image_path = os.path.join(self.root_dir, "images", self.filenames[idx])
        # convert() loads the pixel data, so the file can be closed immediately,
        # and guarantees three channels so per-channel transforms (e.g. a
        # 3-value Normalize) do not fail on grayscale or palette images.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        return image, self.labels[idx]

from torchvision import transforms

# Preprocessing applied to every image: resize to 64x64, convert to a tensor,
# then normalise each of the three channels with a fixed mean / std.
data_transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Dataset root (index files and images) is the 'data' folder inside root_dir.
data_root = os.path.join(root_dir, 'data')

# The training split derives the label -> category mapping from its index
# file; the validation split is handed that same mapping.
miniplaces_train = MiniPlaces(root_dir=data_root, split='train', transform=data_transform)
miniplaces_val = MiniPlaces(
    root_dir=data_root,
    split='val',
    label_dict=miniplaces_train.label_dict,
    transform=data_transform,
)


In [None]:
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

Using device: cuda. Good to go!


In [None]:
def train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs):
    """Train ``model`` for up to ``num_epochs`` epochs with simple early stopping.

    After each epoch the model is scored on ``val_loader`` via ``evaluate`` and
    the result is printed. Training stops early once the validation accuracy
    has been strictly below the best accuracy seen so far for two epochs in a
    row.

    Args:
        model: Module to optimise; it is moved to ``device`` first.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        optimizer: Optimiser stepping the model's parameters.
        criterion: Loss callable taking ``(logits, labels)``.
        device: Device on which batches and the model are placed.
        num_epochs: Maximum number of passes over ``train_loader``.

    Returns:
        None.
    """
    model = model.to(device)
    best_acc = 0
    stalled = False  # True when the previous epoch scored below best_acc

    for epoch in range(num_epochs):
        model.train()
        with tqdm(total=len(train_loader), desc=f'Epoch {epoch + 1}/{num_epochs}') as pbar:
            for batch_inputs, batch_labels in train_loader:
                batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

                optimizer.zero_grad()
                batch_loss = criterion(model(batch_inputs), batch_labels)
                batch_loss.backward()
                optimizer.step()

                pbar.update(1)
                pbar.set_postfix(loss=batch_loss.item())

        avg_loss, accuracy = evaluate(model, val_loader, criterion, device)
        # Reported exactly once per epoch, including the epoch that stops early.
        print(f'Validation set: Average loss = {avg_loss:.4f}, Accuracy = {accuracy:.4f}')

        regressed = best_acc > accuracy
        if regressed and stalled:
            # Second consecutive epoch below the best accuracy: give up.
            break
        if not regressed:
            best_acc = accuracy
        stalled = regressed

def evaluate(model, test_loader, criterion, device):
    """Score ``model`` on ``test_loader`` without tracking gradients.

    The model is switched to evaluation mode and left in that mode.

    Args:
        model: Module producing per-class logits of shape (batch, classes).
        test_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(logits, labels)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Tuple ``(avg_loss, accuracy)``: the summed per-batch loss divided by
        the number of batches, and correct predictions divided by the number
        of samples.
    """
    model.eval()

    loss_sum = 0.0
    correct = 0
    seen = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Predicted class = index of the largest logit in each row.
            predicted = torch.max(logits, dim=1).indices
            correct += (predicted == batch_labels).sum().item()
            seen += len(batch_inputs)

    return loss_sum / len(test_loader), correct / seen

def predict(model, test_dataloader):
    """Return the predicted class index for every sample in ``test_dataloader``.

    The model is moved to the CPU (once) and run in evaluation mode with
    gradient tracking disabled, so layers such as dropout do not perturb the
    predictions. The model's previous train/eval mode is restored afterwards.

    Args:
        model: Module producing per-class logits.
        test_dataloader: Iterable of batches whose first element is the input
            tensor; any remaining elements are ignored.

    Returns:
        A flat list of ints, one predicted class per sample, in loader order.
        Batches of any size are supported.
    """
    model = model.to('cpu')
    was_training = model.training
    model.eval()

    out = []
    try:
        with torch.no_grad():
            for batch in test_dataloader:
                logits = model(batch[0])
                if logits.dim() == 1:
                    # Un-batched output: treat it as a batch of one sample.
                    logits = logits.unsqueeze(0)
                # Row-wise argmax, so each sample gets its own prediction
                # instead of one index into the flattened batch.
                out.extend(torch.argmax(logits, dim=1).tolist())
    finally:
        model.train(was_training)

    return out

# Same preprocessing as data_transform, followed by torch.flatten so each
# image ends up as a 1-D tensor.
data_transform_flatten = transforms.Compose(
    [data_transform, torch.flatten]
)

In [None]:
class Conv(nn.Module):
    """Small CNN: conv -> ReLU -> conv -> max-pool -> dropout -> fc -> dropout -> ReLU -> fc.

    Both convolutions and the max-pool share the same ``kernel_size``,
    ``stride`` and ``padding``.

    Args:
        input_channels: Channels of the input image.
        conv_hidden_channels: Output channels of the first convolution.
        conv_out_channels: Output channels of the second convolution.
        input_size: ``(height, width)`` of the input images.
        dropout_rate1: Dropout probability applied after the max-pool.
        dropout_rate2: Dropout probability applied after the first linear layer.
        fc_out_channels: Width of the hidden linear layer.
        num_classes: Number of output logits.
        kernel_size: Integer kernel size shared by conv1, conv2 and the pool.
        stride: Integer stride shared by conv1, conv2 and the pool.
        padding: Integer padding shared by conv1, conv2 and the pool.
    """

    def __init__(
        self,
        input_channels, conv_hidden_channels, conv_out_channels,
        input_size=(64,64),
        dropout_rate1=0.25, dropout_rate2=0.5,
        fc_out_channels=128, num_classes=100,
        kernel_size=3, stride=1, padding=1):

        super().__init__()

        # The feature map passes through three layers with identical
        # kernel/stride/padding (conv1, conv2, max-pool), so the output-size
        # rule has to be applied three times, not once, to size fc1 correctly
        # for configurations that change the spatial size.
        height, width = input_size
        for _ in range(3):
            height = self._output_size(height, kernel_size, stride, padding)
            width = self._output_size(width, kernel_size, stride, padding)
        flatten_size = conv_out_channels * height * width

        self.conv1 = nn.Conv2d(input_channels, conv_hidden_channels, kernel_size, stride, padding)
        self.conv2 = nn.Conv2d(conv_hidden_channels, conv_out_channels, kernel_size, stride, padding)
        self.max_pooling = nn.MaxPool2d(kernel_size, stride, padding)
        self.dropout1 = nn.Dropout(dropout_rate1)
        self.dropout2 = nn.Dropout(dropout_rate2)
        self.fc1 = nn.Linear(flatten_size, fc_out_channels)
        self.fc2 = nn.Linear(fc_out_channels, num_classes)
        self.activation = nn.ReLU()

    @staticmethod
    def _output_size(size, kernel_size, stride, padding):
        """Spatial size after one conv/pool layer (dilation 1, floor rounding)."""
        return (size + 2 * padding - kernel_size) // stride + 1

    def forward(self, x, return_intermediate=False):
        """Compute class logits for a batch of images.

        Args:
            x: Input batch; flattened from dimension 1 onward before fc1.
            return_intermediate: Accepted for interface compatibility;
                currently ignored.

        Returns:
            Tensor of logits with ``num_classes`` entries per sample.
        """
        x = self.conv1(x)
        x = self.activation(x)
        x = self.conv2(x)
        x = self.max_pooling(x)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.dropout2(x)
        x = self.activation(x)
        x = self.fc2(x)

        return x

In [None]:
# Datasets for the convolutional model; both splits use data_transform.
conv_train_dataset = MiniPlaces(root_dir=data_root, split='train', transform=data_transform)
conv_val_dataset = MiniPlaces(root_dir=data_root, split='val', transform=data_transform)

# Mini-batches of 64 with no worker processes; only the training data is shuffled.
conv_train_loader = torch.utils.data.DataLoader(
    conv_train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=0,
)
conv_val_loader = torch.utils.data.DataLoader(
    conv_val_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=0,
)

# One output logit per entry in the training split's label dictionary.
model = Conv(
    input_channels=3,
    conv_hidden_channels=64,
    conv_out_channels=128,
    input_size=(64, 64),
    kernel_size=3,
    stride=1,
    padding=1,
    dropout_rate1=0.25,
    dropout_rate2=0.5,
    fc_out_channels=128,
    num_classes=len(miniplaces_train.label_dict),
)

criterion = nn.CrossEntropyLoss()

In [None]:
# SGD with momentum over all of the model's parameters.
optimizer = torch.optim.SGD(
    params=model.parameters(),
    lr=0.01,
    momentum=0.6,
)

In [None]:
# Run a single training epoch, validating on conv_val_loader afterwards.
train(
    model=model,
    train_loader=conv_train_loader,
    val_loader=conv_val_loader,
    optimizer=optimizer,
    criterion=criterion,
    device=device,
    num_epochs=1,
)

Epoch 1/1: 100%|██████████| 1407/1407 [02:34<00:00,  9.13it/s, loss=2.08]


Validation set: Average loss = 3.2352, Accuracy = 0.2300
