In [2]:
from typing import List, Optional
from overrides import overrides
import numpy as np
import pandas as pd
import torch
import torch.nn as nn


In [3]:
from archai.discrete_search.api import ArchaiModel


In [4]:
class MyModel(nn.Module):
    def __init__(self,nb_layers: int,kernel_size: int,hidden_dim:int):
        super().__init__()
        self.nb_layers=nb_layers
        self.kernel_size=kernel_size
        self.hidden_dim=hidden_dim

        layer_list = []

        for i in range(nb_layers):
            in_ch = (1 if i == 0 else hidden_dim)

            layer_list += [
                nn.Conv2d(in_ch, hidden_dim, kernel_size=kernel_size, padding=(kernel_size-1)//2),
                nn.BatchNorm2d(hidden_dim),
                nn.ReLU(),
            ]

        layer_list += [
            nn.AdaptiveAvgPool2d(output_size=(1, 1)),
            nn.Conv2d(hidden_dim, 10, kernel_size=1)
        ]

        self.model = nn.Sequential(*layer_list)
    def forward(self,x):
        return self.model(x).squeeze()
    def get_archid(self):
        return f'({self.nb_layers}, {self.kernel_size}, {self.hidden_dim})'


In [10]:
model_obj=MyModel(3,3,32)
model=ArchaiModel(arch=model_obj,archid=f'L={model_obj.nb_layers}, K={model_obj.kernel_size}, H={model_obj.hidden_dim}')


In [11]:
model.archid

'L=3, K=3, H=32'

In [12]:
model.arch

MyModel(
  (model): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU()
    (9): AdaptiveAvgPool2d(output_size=(1, 1))
    (10): Conv2d(32, 10, kernel_size=(1, 1), stride=(1, 1))
  )
)

In [13]:
import json
from random import Random
from archai.discrete_search.api import DiscreteSearchSpace

import json
from random import Random
from archai.discrete_search.api import DiscreteSearchSpace


class CNNSearchSpace(DiscreteSearchSpace):
    def __init__(self, min_layers: int = 1, max_layers: int = 12,
                 kernel_list=(1, 3, 5, 7), hidden_list=(16, 32, 64, 128),
                 seed: int = 1):

        self.min_layers = min_layers
        self.max_layers = max_layers
        self.kernel_list = kernel_list
        self.hidden_list = hidden_list

        self.rng = Random(seed)

    def get_archid(self, model: MyModel) -> str:
        return f'L={model.nb_layers}, K={model.kernel_size}, H={model.hidden_dim}'

    @overrides
    def random_sample(self) -> ArchaiModel:
        # Randomly chooses architecture parameters
        nb_layers = self.rng.randint(1, self.max_layers)
        kernel_size = self.rng.choice(self.kernel_list)
        hidden_dim = self.rng.choice(self.hidden_list)

        model = MyModel(nb_layers, kernel_size, hidden_dim)

        # Wraps model into ArchaiModel
        return ArchaiModel(arch=model, archid=self.get_archid(model))

    @overrides
    def save_arch(self, model: ArchaiModel, file_path: str=r'C:\Users\Aryan\Downloads\archai-main\archai-main\ff_test.json'):
        with open(file_path, 'w') as fp:
            json.dump({
                'nb_layers': model.arch.nb_layers,
                'kernel_size': model.arch.kernel_size,
                'hidden_dim': model.arch.hidden_dim
            }, fp)

    @overrides
    def load_arch(self, file_path: str)->ArchaiModel:
        config = json.load(open(file_path))
        model = MyModel(**config)

        return ArchaiModel(arch=model, archid=self.get_archid(model))

    @overrides
    def save_model_weights(self, model: ArchaiModel, file_path: str=r'C:\Users\Aryan\Downloads\archai-main\archai-main\ff_test.json'):
        state_dict = model.arch.get_state_dict()
        torch.save(state_dict, file_path)

    @overrides
    def load_model_weights(self, model: ArchaiModel, file_path: str=r'C:\Users\Aryan\Downloads\archai-main\archai-main\ff_test.json'):
        model.arch.load_state_dict(torch.load(file_path))


In [14]:
from archai.discrete_search.api.search_space import EvolutionarySearchSpace, BayesOptSearchSpace
from random import random
class CNNSearchSpaceExt(CNNSearchSpace, EvolutionarySearchSpace, BayesOptSearchSpace):
    ''' We are subclassing CNNSearchSpace just to save up space'''

    @overrides
    def mutate(self, arch: ArchaiModel) -> ArchaiModel:
        print(arch.arch.nb_layers)
        self.config = {
            'nb_layers': arch.arch.nb_layers,
            'kernel_size': arch.arch.kernel_size,
            'hidden_dim': arch.arch.hidden_dim
        }
        print(self.config)
        if random() < 0.2:
            self.config['nb_layers'] = self.rng.randint(self.min_layers, self.max_layers)

        if random() < 0.2:
            self.config['kernel_size'] = self.rng.choice(self.kernel_list)

        if random() < 0.2:
            self.config['hidden_dim'] = self.rng.choice(self.hidden_list)

        mutated_model = MyModel(**self.config)

        return ArchaiModel(
            arch=mutated_model, archid=self.get_archid(mutated_model)
        )

    @overrides
    def crossover(self, arch_list: List[ArchaiModel]) -> ArchaiModel:
        new_config = {
            'nb_layers': self.rng.choice([m.arch.nb_layers for m in arch_list]),
            'kernel_size': self.rng.choice([m.arch.kernel_size for m in arch_list]),
            'hidden_dim': self.rng.choice([m.arch.hidden_dim for m in arch_list]),
        }

        crossover_model = MyModel(**new_config)

        return ArchaiModel(
            arch=crossover_model, archid=self.get_archid(crossover_model)
        )

    @overrides
    def encode(self,arch: ArchaiModel) -> np.ndarray:
        return np.array([arch.nb_layers,arch.kernel_size, arch.hidden_dim])

In [15]:

ss = CNNSearchSpaceExt(max_layers=10, kernel_list=[3, 5, 7], hidden_list=[16, 32, 64])
m=ss.random_sample()
print(m.arch.nb_layers)

3


In [16]:
models = ss.random_sample()
print(m.archid) 
ss.mutate(models)

L=3, K=7, H=16
5
{'nb_layers': 5, 'kernel_size': 3, 'hidden_dim': 32}


ArchaiModel(
	archid=L=5, K=5, H=32, 
	metadata={}, 
	arch=MyModel(
  (model): Sequential(
    (0): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (7): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU()
    (9): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (10): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU()
    (12): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (13): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): ReLU()
    (

In [17]:
m1=ss.mutate(m)
print(m1.arch.nb_layers)

3
{'nb_layers': 3, 'kernel_size': 7, 'hidden_dim': 16}
3


Building a configsearchspace for initial experiments

In [18]:
import torch
import torch.nn as nn

from einops import rearrange


def conv_1x1_bn(inp, oup):
    return nn.Sequential(
        nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
        nn.BatchNorm2d(oup),
        nn.SiLU()
    )


def conv_nxn_bn(inp, oup, kernal_size=3, stride=1):
    return nn.Sequential(
        nn.Conv2d(inp, oup, kernal_size, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
        nn.SiLU()
    )


class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn
    
    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)


class FeedForward(nn.Module):
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.SiLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )
    
    def forward(self, x):
        return self.net(x)


class Attention(nn.Module):
    def __init__(self, dim, heads=8, dim_head=64, dropout=0.):
        super().__init__()
        inner_dim = dim_head *  heads
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim = -1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout)
        ) if project_out else nn.Identity()

    def forward(self, x):
        qkv = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = map(lambda t: rearrange(t, 'b p n (h d) -> b p h n d', h = self.heads), qkv)

        dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        attn = self.attend(dots)
        out = torch.matmul(attn, v)
        out = rearrange(out, 'b p h n d -> b p n (h d)')
        return self.to_out(out)


class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                PreNorm(dim, Attention(dim, heads, dim_head, dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout))
            ]))
    
    def forward(self, x):
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
        return x


class MV2Block(nn.Module):
    def __init__(self, inp, oup, stride=1, expansion=4):
        super().__init__()
        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = int(inp * expansion)
        self.use_res_connect = self.stride == 1 and inp == oup

        if expansion == 1:
            self.conv = nn.Sequential(
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.SiLU(),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )
        else:
            self.conv = nn.Sequential(
                # pw
                nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.SiLU(),
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.SiLU(),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )

    def forward(self, x):
        if self.use_res_connect:
            return x + self.conv(x)
        else:
            return self.conv(x)


class MobileViTBlock(nn.Module):
    def __init__(self, dim, depth, channel, kernel_size, patch_size, mlp_dim, dropout=0.):
        super().__init__()
        self.ph, self.pw = patch_size

        self.conv1 = conv_nxn_bn(channel, channel, kernel_size)
        self.conv2 = conv_1x1_bn(channel, dim)

        self.transformer = Transformer(dim, depth, 4, 8, mlp_dim, dropout)

        self.conv3 = conv_1x1_bn(dim, channel)
        self.conv4 = conv_nxn_bn(2 * channel, channel, kernel_size)
    
    def forward(self, x):
        y = x.clone()

        # Local representations
        x = self.conv1(x)
        x = self.conv2(x)
        
        # Global representations
        _, _, h, w = x.shape
        x = rearrange(x, 'b d (h ph) (w pw) -> b (ph pw) (h w) d', ph=self.ph, pw=self.pw)
        x = self.transformer(x)
        x = rearrange(x, 'b (ph pw) (h w) d -> b d (h ph) (w pw)', h=h//self.ph, w=w//self.pw, ph=self.ph, pw=self.pw)

        # Fusion
        x = self.conv3(x)
        x = torch.cat((x, y), 1)
        x = self.conv4(x)
        return x


class MobileViT(nn.Module):
    def __init__(self, image_size, dims, channels, num_classes, expansion=4, kernel_size=3, patch_size=(2, 2)):
        super().__init__()
        ih, iw = image_size
        ph, pw = patch_size
        assert ih % ph == 0 and iw % pw == 0

        L = [2, 4, 3]

        self.conv1 = conv_nxn_bn(3, channels[0], stride=2)

        self.mv2 = nn.ModuleList([])
        self.mv2.append(MV2Block(channels[0], channels[1], 1, expansion))
        self.mv2.append(MV2Block(channels[1], channels[2], 2, expansion))
        self.mv2.append(MV2Block(channels[2], channels[3], 1, expansion))
        self.mv2.append(MV2Block(channels[2], channels[3], 1, expansion))   # Repeat
        self.mv2.append(MV2Block(channels[3], channels[4], 2, expansion))
        self.mv2.append(MV2Block(channels[5], channels[6], 2, expansion))
        self.mv2.append(MV2Block(channels[7], channels[8], 2, expansion))
        
        self.mvit = nn.ModuleList([])
        self.mvit.append(MobileViTBlock(dims[0], L[0], channels[5], kernel_size, patch_size, int(dims[0]*2)))
        self.mvit.append(MobileViTBlock(dims[1], L[1], channels[7], kernel_size, patch_size, int(dims[1]*4)))
        self.mvit.append(MobileViTBlock(dims[2], L[2], channels[9], kernel_size, patch_size, int(dims[2]*4)))

        self.conv2 = conv_1x1_bn(channels[-2], channels[-1])

        self.pool = nn.AvgPool2d(ih//32, 1)
        self.fc = nn.Linear(channels[-1], num_classes, bias=False)

    def forward(self, x):
        x = self.conv1(x)
        x = self.mv2[0](x)

        x = self.mv2[1](x)
        x = self.mv2[2](x)
        x = self.mv2[3](x)      # Repeat

        x = self.mv2[4](x)
        x = self.mvit[0](x)

        x = self.mv2[5](x)
        x = self.mvit[1](x)

        x = self.mv2[6](x)
        x = self.mvit[2](x)
        x = self.conv2(x)

        x = self.pool(x).view(-1, x.shape[1])
        x = self.fc(x)
        return x


def mobilevit_xxs():
    dims = [64, 80, 96]
    channels = [16, 16, 24, 24, 48, 48, 64, 64, 80, 80, 320]
    return MobileViT((256, 256), dims, channels, num_classes=1000, expansion=2)


def mobilevit_xs():
    dims = [96, 120, 144]
    channels = [16, 32, 48, 48, 64, 64, 80, 80, 96, 96, 384]
    return MobileViT((256, 256), dims, channels, num_classes=1000)


def mobilevit_s():
    dims = [144, 192, 240]
    channels = [16, 32, 64, 64, 96, 96, 128, 128, 160, 160, 640]
    return MobileViT((256, 256), dims, channels, num_classes=1000)


def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


if __name__ == '__main__':
    img = torch.randn(5, 3, 256, 256)

    vit = mobilevit_xxs()
    out = vit(img)
    print(out.shape)
    print(count_parameters(vit))

    vit = mobilevit_xs()
    out = vit(img)
    print(out.shape)
    print(count_parameters(vit))

    vit = mobilevit_s()
    out = vit(img)
    print(out.shape)
    print(count_parameters(vit))


torch.Size([5, 1000])
1331472
torch.Size([5, 1000])
2382944
torch.Size([5, 1000])
5636720


Search Objectives + Search Algorithm

In [19]:
from archai.discrete_search.api import SearchObjectives
from archai.discrete_search.evaluators import TorchFlops
objectives = SearchObjectives()

ImportError: cannot import name 'TorchFlops' from 'archai.discrete_search.evaluators' (c:\Users\Aryan\Downloads\archai-main\archai-main\archai\discrete_search\evaluators\__init__.py)

Define Objective, in this case Latency

In [29]:
from archai.discrete_search.evaluators.pt_profiler import TorchFlops
ss = CNNSearchSpaceExt(max_layers=10, kernel_list=[3, 5, 7], hidden_list=[16, 32, 64])
objectives = SearchObjectives()

In [30]:

arch=model
objectives.add_objective(
    'FLOPs', TorchFlops(torch.randn(1, 1, 28, 28)),
    higher_is_better=False,
    compute_intensive=False,
    # We may optionally add a constraint.
    # Architectures outside this range will be ignored by the search algorithm
    # constraint=(0.0, 1e9)
)

In [31]:
from archai.discrete_search.algos import EvolutionParetoSearch
algo = EvolutionParetoSearch(
    ss, objectives,
    output_dir='./out_evo',
    num_iters=5, num_crossovers=5,
    mutations_per_parent=5,
    max_unseen_population=10,
    save_pareto_model_weights=False,
    seed=42
)

In [32]:
search_results = algo.search()
# search_results

2023-07-04 12:18:28,997 - archai.discrete_search.algos.evolution_pareto — INFO —  Using 10 random architectures as the initial population ...
2023-07-04 12:18:29,016 - archai.discrete_search.algos.evolution_pareto — INFO —  Iteration 1/5
2023-07-04 12:18:29,018 - archai.discrete_search.algos.evolution_pareto — INFO —  Calculating search objectives ['FLOPs'] for 10 models ...
2023-07-04 12:18:29,094 - archai.discrete_search.algos.evolution_pareto — INFO —  Updating Pareto frontier ...
2023-07-04 12:18:29,095 - archai.discrete_search.algos.evolution_pareto — INFO —  Found 1 members.
2023-07-04 12:18:29,101 - archai.discrete_search.algos.evolution_pareto — INFO —  Optimzing memory usage ...
2023-07-04 12:18:29,104 - archai.discrete_search.algos.evolution_pareto — INFO —  Choosing 1 parents ...
2023-07-04 12:18:29,105 - archai.discrete_search.algos.evolution_pareto — INFO —  wtf man
2023-07-04 12:18:29,106 - archai.discrete_search.algos.evolution_pareto — INFO —  L=1, K=7, H=64
2023-07-04 

NameError: name 'mutated' is not defined

__main__.CNNSearchSpace