In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F


import numpy as np

In [2]:
def getdev():
    return 'cpu'
#     dev = 'cuda' if torch.cuda.is_available() else 'cpu'
#     return dev

class accuracy:
    """Returns count of correct classes.
    
    output: predicted probability of each class
    target: ground truth class label (index value of the correct class)
    """
    
    def __init__(self, reduction = None):
        super().__init__()
        self.reduction = reduction


    def __repr__(self):
        return 'Accuracy'

    def __str__(self):
        return 'Accuracy'
    
    def __call__(self, output, target_classes):
        _, predicted_class = torch.max(output.data, -1)
        if (target_classes.shape != predicted_class.shape):
            print('Warning: predicted_class shape does not match target_class shape')
            print('predicted_class shape:', predicted_class.shape, ' target_classes shape:', target_classes.shape)
        if (self.reduction == 'sum'):
            return (predicted_class == target_classes).sum()
        else:
            return (predicted_class == target_classes).sum() / torch.numel(target_classes)

In [3]:
# Testing functions

def evaluate(evaluation_models, dataloader, metrics, dev = None):
    if dev is None:
        dev = getdev()
    if not isinstance(evaluation_models, list):
        evaluation_models = [evaluation_models]
    for m in evaluation_models:
        m.eval()

    loss_eval = np.stack([np.zeros_like(metrics, dtype = float) for m in evaluation_models])
    count_eval = np.stack([np.zeros_like(metrics, dtype = float) for m in evaluation_models])

    with torch.no_grad():
        # loop through batches first, since I think it takes longer to create batches
        for batch_num, (batch_in, batch_target) in enumerate(dataloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            batch_in, batch_target = batch_in.to(dev), batch_target.to(dev)

            for model_num, model in enumerate(evaluation_models, 0):
                output = model(batch_in)

                for i, m in enumerate(metrics):
                    loss_eval[model_num, i] += m(output, batch_target)
                    count_eval[model_num, i] += np.prod(batch_target.shape)

    loss_eval = loss_eval/count_eval
    return loss_eval


In [4]:
def _MNIST(transform):
    data = torchvision.datasets.MNIST(root='./data', train=True,
                                            download=True, transform=transform)
    train_set, val_set = torch.utils.data.random_split(data, [50000, 10000])

    test_set = torchvision.datasets.MNIST(root='./data', train=False,
                                        download=True, transform=transform)
    return train_set, val_set, test_set

def MNIST():
    input_size = (1, 28, 28)

    transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize(0.5, 0.5)])

    return _MNIST(transform) + (input_size,)

In [5]:
def load_data(dataset_name, params):
    batch_size = params['batch_size']

    if dataset_name == 'MNIST':
        train_set, val_set, test_set, input_size = MNIST()
    else:
        raise(Exception("Unknown dataset name: " + dataset_name))

    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size,
                                            shuffle=True, num_workers=6, pin_memory=torch.cuda.is_available())
    val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size,
                                            shuffle=True, num_workers=6, pin_memory=torch.cuda.is_available())

    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size,
                                            shuffle=False, num_workers=6, pin_memory=torch.cuda.is_available())
    return train_loader, val_loader, test_loader, input_size

In [6]:
train_loader, val_loader, test_loader, input_size = load_data('MNIST', {'batch_size':200})

  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [51]:
class FCNet(nn.Module):
    '''
    A simple fully connected model.
    '''
    def __init__(self, layer_size, dropout_prob):
        super(FCNet, self).__init__()
        self.layer_size = layer_size

        self.linears = nn.ModuleList([nn.Linear(n_in, n_out) for n_in, n_out in zip(self.layer_size[:-1], self.layer_size[1::])])
        self.dropout = nn.Dropout(p = dropout_prob)

    def forward(self, x):
        x = x.view(x.shape[0], -1)
        for l in self.linears[:-1]:
            x = F.relu(l(x))
            x = self.dropout(x)
        
        # no relu on last layer
        out = self.linears[-1](x)
        # out = F.relu(self.fc1())
        return out

In [52]:
evaluate([net], val_loader, [accuracy()])

array([[0.000442]])

In [134]:
def evaluate_fitness(genome):
    net = FCNet((784, 100, 10), 0)
    
    sd = net.state_dict()
    
    for i in range(len(net.linears)):
        sd[f'linears.{i}.weight'] = torch.tensor(genome[2*i].T)
        sd[f'linears.{i}.bias'] = torch.tensor(genome[2*i+1])
    
    net.load_state_dict(sd)
    return evaluate([net], val_loader, [accuracy()])

In [135]:
g = [np.random.randn(28*28, 100), np.random.randn(100), np.random.randn(100, 10), np.random.randn(10)]

In [136]:
evaluate_fitness(g)

array([[0.0007265]])