In [1]:
import os
import time
import numpy as np
from numpy.random import RandomState
from PIL import Image
from tensorflow.keras.utils import Progbar
import torch
import torch.nn.functional as F
import torch.optim
from torchvision import datasets, transforms
import kymatio.datasets as scattering_datasets
from contourlet_cnn import ContourletCNN

In [2]:
# Train and test
def train(model, device, train_loader, optimizer, epoch, pbar):
    train_loss = 0
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target.long())
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        pbar.update(batch_idx, values=[("loss", loss.item())])

            
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target.long(), reduction='sum').item() # sum up batch loss
            pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    return test_loss

## Training settings and loading training data

In [3]:
###### Training settings
n_epochs = 10
val_size = 0.2
lr = 0.001
seed = 2021
img_dim = (224, 224)
batch_size = 4

# Random seed
torch.manual_seed(seed)
np.random.seed(seed)

In [4]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# DataLoaders
if use_cuda:
    num_workers = 1
    pin_memory = True
else:
    num_workers = 1
    pin_memory = False

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

# CIFAR 10 data
cifar_data = datasets.CIFAR10(root=scattering_datasets.get_dataset_dir('CIFAR'), train=True, transform=transforms.Compose([
        transforms.RandomCrop(224, 224),
        transforms.ToTensor(),
        normalize,
    ]), download=True)
train_cifar_data = datasets.CIFAR10(root=scattering_datasets.get_dataset_dir('CIFAR'), train=True, transform=transforms.Compose([
        transforms.RandomCrop(224, 224),
        transforms.ToTensor(),
        normalize,
    ]), download=True)
test_cifar_data = datasets.CIFAR10(root=scattering_datasets.get_dataset_dir('CIFAR'), train=False, transform=transforms.Compose([
        transforms.RandomCrop(224, 224),
        transforms.ToTensor(),
        normalize,
    ]), download=True)

# Extract a subset of X samples per class
prng = RandomState(seed)
permuted = prng.permutation(np.arange(0, 5000))[0:50]
train_random_permute = permuted[0:25]
test_random_permute = permuted[25:50]
train_indx = np.concatenate([np.where(np.array(cifar_data.targets) == classe)[0][train_random_permute] for classe in range(0, 10)])
test_indx = np.concatenate([np.where(np.array(cifar_data.targets) == classe)[0][test_random_permute] for classe in range(0, 10)])

train_cifar_data.data, train_cifar_data.targets = cifar_data.data[train_indx], list(np.array(cifar_data.targets)[train_indx])
test_cifar_data.data, test_cifar_data.targets = cifar_data.data[test_indx], list(np.array(cifar_data.targets)[test_indx])

train_loader = torch.utils.data.DataLoader(train_cifar_data,
                                           batch_size=batch_size, shuffle=True, num_workers=num_workers,
                                           pin_memory=pin_memory)
test_loader = torch.utils.data.DataLoader(test_cifar_data,
                                           batch_size=batch_size, shuffle=True, num_workers=num_workers,
                                           pin_memory=pin_memory)

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


## Initialize Contourlet-CNN model

In [5]:
model = ContourletCNN(input_dim=(3, 224, 224), num_classes=10, variant="SSF", spec_type="all").to(device)
print(model)

ContourletCNN(
  (conv_1): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv_2): Conv2d(88, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv_3): Conv2d(88, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv_4): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv_5): Conv2d(152, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv_6): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv_7): Conv2d(268, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv_8): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv_9): Conv2d(256, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (gap): AvgPool2d(kernel_size=7, stride=7, padding=0)
  (fc_1): Linear(in_features=128, out_features=2048, bias=True)
  (fc_2): Linear(in_features=2216, out_features=10, bias=True)
)


In [6]:
model_parameters = filter(lambda p: p.requires_grad, model.parameters())
params = sum([np.prod(p.size()) for p in model_parameters])
print("Number of parameters", params)

Number of parameters 3031386


## Start training

In [7]:
# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=lr, betas=(0.9, 0.999), 
                             eps=1e-08, weight_decay=0, amsgrad=False)

start_time = time.perf_counter()
for epoch in range(n_epochs):
    n_batches = len(train_loader)
    pbar = Progbar(target=n_batches)
    print(f'Epoch {epoch+1}/{n_epochs}')
    train(model, device, train_loader, optimizer, epoch+1, pbar)
    test(model, device, test_loader)
    print()
end_time = time.perf_counter()
print(end_time - start_time, "seconds")

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
920.300196266995 seconds
