In [None]:
!pip install timm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import torch
import torch.nn as nn
import timm
from torch.nn import functional as F
from torchsummary import summary
from sklearn.neighbors import NearestNeighbors
import numpy as np
# from model import Head, MultiCrop,DinoLoss
# from Augmentation import DataAugmentation
# from PIL import ImagePath
# from torchvision.datasets import ImageFolder
# import pathlib
# from torch.utils.data import DataLoader, SubsetRandomSampler
# model and dim values


# Lookup table: backbone name -> feature dimension.
# NOTE(review): every timm entry carries the same value (640); verify each one
# against the backbone's `num_features` before using it to size a head.
mobile_models = dict.fromkeys(
    (
        'mobilevit_s',
        'mobilevit_xs',
        'mobilevit_xxs',
        'mobilenetv2_035',
        'mobilenetv2_075',
        'mobilenetv2_100',
    ),
    640,
)
mobile_models['resnet5m'] = 512

class mobilenet(nn.Module):
    """Wrap a timm model as a feature extractor without a classifier.

    The model is created through ``timm.create_model``, its classifier is
    reset with ``reset_classifier(0)``, and the backbone's ``num_features``
    is re-exported on the wrapper.

    Args:
        model: Model name handed to ``timm.create_model``.
        pretrained: Forwarded unchanged to ``timm.create_model``.
    """

    def __init__(self, model: str = 'mobilevit_s', pretrained=True):
        super().__init__()
        net = timm.create_model(model, pretrained=pretrained)
        net.reset_classifier(0)
        self.backbone = net
        self.num_features = net.num_features

    def forward(self, x):
        """Return the backbone output for ``x``."""
        return self.backbone(x)

class MultiCrop(nn.Module):
    """Push all crops through the backbone in one pass, then split per crop.

    Args:
        backbone: Feature extractor. Its ``head`` attribute is replaced with
            ``nn.Identity`` on construction.
        new_head: Module applied to the backbone output.
        mobile_head: Stored on the instance; this class does not read it.
    """

    def __init__(self, backbone, new_head, mobile_head=False) -> None:
        super().__init__()
        self.mobile_head = mobile_head

        # Neutralise whatever `head` the backbone exposes before registering it.
        backbone.head = nn.Identity()
        self.backbone = backbone
        self.new_head = new_head

    def forward(self, x):
        """Embed and project a list of crop batches.

        Args:
            x: List of tensors, described by the original author as
                ``(n_samples, 3, size, size)`` each. They are concatenated
                along dim 0, so all other dimensions must match.

        Returns:
            Tuple of ``len(x)`` tensors obtained by chunking the head output
            along dim 0, one per input crop.
        """
        # (n_samples * n_crops, ...) -> backbone -> head
        stacked = torch.cat(x, dim=0)
        projected = self.new_head(self.backbone(stacked))
        return projected.chunk(len(x))


class Head(nn.Module):
    """MLP projection head followed by a weight-normalised linear layer.

    The MLP maps ``in_dim`` to ``bottleneck_dim``; its output is L2-normalised
    along the last dimension and projected to ``out_dim`` by a bias-free,
    weight-normalised ``nn.Linear``.

    Args:
        in_dim: Width of the incoming features.
        out_dim: Width of the final projection.
        hidden_dim: Width of the intermediate MLP layers.
        bottleneck_dim: Width of the MLP output.
        n_layers: ``1`` builds a single linear layer; any other value builds
            ``Linear -> SELU`` followed by ``n_layers - 2`` further
            ``Linear -> SELU`` pairs and a final ``Linear``.
        norm_last_layer: When truthy, the weight-norm magnitude (``weight_g``)
            of the last layer is frozen at 1.
        init_weights: Accepted but not used by this class.
    """

    def __init__(self,
                 in_dim,
                 out_dim,
                 hidden_dim=512,
                 bottleneck_dim=256,
                 n_layers=3,
                 norm_last_layer=False,
                 init_weights=["normal", ""]  # yet to define
                 ) -> None:
        super().__init__()

        if n_layers == 1:
            self.mlp = nn.Linear(in_dim, bottleneck_dim)
        else:
            stack = [nn.Linear(in_dim, hidden_dim), nn.SELU()]
            for _ in range(n_layers - 2):
                stack += [nn.Linear(hidden_dim, hidden_dim), nn.SELU()]
            stack.append(nn.Linear(hidden_dim, bottleneck_dim))
            self.mlp = nn.Sequential(*stack)

        # Runs before the last layer exists, so only the MLP is re-initialised.
        self.apply(self._init_weights)

        final = nn.utils.weight_norm(
            nn.Linear(bottleneck_dim, out_dim, bias=False)
        )
        final.weight_g.data.fill_(1)
        if norm_last_layer:
            final.weight_g.requires_grad_(False)
        self.last_layer = final

    def _init_weights(self, m):
        """Give linear layers N(0, 0.02) weights and zero biases."""
        if not isinstance(m, nn.Linear):
            return
        nn.init.normal_(m.weight, std=0.02)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return ``last_layer(l2_normalise(mlp(x)))``."""
        bottleneck = F.normalize(self.mlp(x), p=2, dim=-1)
        return self.last_layer(bottleneck)
    


import torch.nn as nn
import timm
from torch.nn import functional as F



class ResBlock(nn.Module):
    """Residual block: two conv/batch-norm stages plus a shortcut.

    The main path is ``Conv -> BN -> ReLU -> Conv -> BN``. The shortcut is the
    identity when ``skip`` is true, ``stride == 1`` and the channel counts
    match; otherwise it is a 1x1 strided convolution followed by batch norm.
    The sum of both paths goes through a final ReLU.

    Args:
        inchannels: Number of input channels.
        outchannels: Number of output channels.
        kernel_size: Kernel size of both main-path convolutions. Padding is
            ``kernel_size // 2``, which preserves spatial size for odd kernels
            at stride 1.
        stride: Stride of the first convolution and of the projection shortcut.
        skip: Request the identity shortcut. Passing ``False`` forces the
            projection shortcut even when shapes would allow the identity.

    Attributes:
        skip: ``True`` when the identity shortcut is used, ``False`` when the
            projection (``skip_conv`` + ``skip_bn``) is used.
    """

    def __init__(self,
                 inchannels,
                 outchannels,
                 kernel_size=3,
                 stride=1,
                 skip=True):
        super().__init__()

        # Tie padding to the kernel so the main path and the shortcut keep
        # matching spatial sizes for kernels other than 3.
        padding = kernel_size // 2

        self.block = nn.Sequential(
            nn.Conv2d(inchannels,
                      outchannels,
                      kernel_size=kernel_size,
                      stride=stride,
                      padding=padding,
                      bias=False),
            nn.BatchNorm2d(outchannels),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannels,
                      outchannels,
                      kernel_size=kernel_size,
                      padding=padding,
                      bias=False),
            nn.BatchNorm2d(outchannels),
        )

        # The identity shortcut is only valid when it was requested and the
        # main path changes neither resolution nor channel count.
        self.skip = bool(skip) and stride == 1 and inchannels == outchannels

        # Always build the projection when the identity is not used, so that
        # forward() never reaches for layers that were not created.
        if not self.skip:
            self.skip_conv = nn.Conv2d(inchannels, outchannels, kernel_size=1, stride=stride, bias=False)
            self.skip_bn = nn.BatchNorm2d(outchannels)

    def forward(self, x):
        """Return ``relu(block(x) + shortcut(x))``."""
        out = self.block(x)
        shortcut = x if self.skip else self.skip_bn(self.skip_conv(x))
        return F.relu(out + shortcut)


class ResNet5M(nn.Module):
    """Small ResNet-style feature extractor ending in a flattened 512-vector.

    Layout: 7x7 stride-2 conv stem with batch norm, ReLU and max pooling, ten
    residual blocks (64 -> 128 -> 512 channels), global average pooling and a
    flatten. No classification layer is attached.
    """

    # (attribute name, in channels, out channels, stride), in forward order.
    # Attribute names are part of the state_dict keys and must not change.
    _STAGES = (
        ('resblock3', 64, 64, 1),
        ('resblock6', 64, 64, 1),
        ('resblock7', 64, 64, 1),
        ('resblock8', 64, 128, 2),
        ('resblock9', 128, 128, 1),
        ('resblock10', 128, 128, 1),
        ('resblock11', 128, 128, 1),
        ('resblock12', 128, 128, 1),
        ('resblock13', 128, 128, 1),
        ('resblock14', 128, 512, 2),
    )

    def __init__(self):
        super().__init__()

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)

        # Residual stages, registered under their historical attribute names
        for name, cin, cout, stride in self._STAGES:
            setattr(self, name, ResBlock(cin, cout, stride=stride))

        # Pooling to one vector per image
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.flat = nn.Flatten()

    def forward(self, x):
        """Return the flattened, globally pooled features of ``x``."""
        stem = self.bn1(self.conv1(x))
        # The in-place ReLU is applied to a copy of the batch-norm output.
        x = self.maxpool(self.relu(stem.clone()))
        for name, *_ in self._STAGES:
            x = getattr(self, name)(x)
        return self.flat(self.avgpool(x))



In [None]:
# model = timm.create_model('mobilevit_s', pretrained=True).to('cuda')

# model.reset_classifier(0)

# model=torch.hub.load('facebookresearch/dino:main', 'dino_resnet50').to('cuda')
# model=torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14').to('cuda')

# print(model)

In [None]:
# Build the student: ResNet5M backbone plus a 512 -> 1024 projection head.
# (Alternative backbone: student_vit = mobilenet())
student_vit = ResNet5M()
model = MultiCrop(
    student_vit,
    Head(
        512,
        1024
    ),
)
# Wrapped before loading so parameter names carry the `module.` prefix.
# NOTE(review): presumably the checkpoint was saved from a DataParallel model — confirm.
model = nn.DataParallel(model)

# Choose the device once, so the cell also runs on CPU-only machines and does
# not depend on the model having been moved to the GPU implicitly.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# NOTE(security): torch.load unpickles its input; only load trusted checkpoints.
# map_location lets a GPU-saved checkpoint load on whatever device is available.
checkpoint = torch.load('./resnet5m_student_model_epoch32.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device)
model.eval()

# Sanity check on the loaded weights: one random image through the backbone.
m = torch.randn(1, 3, 224, 224, device=device)
with torch.no_grad():
    o1 = model.module.backbone(m)
o1.shape

torch.Size([1, 512])

In [None]:
import torchvision.datasets as datasets
import torchvision.transforms as transforms

# Preprocessing shared by the train and test splits:
# tensor conversion -> resize to 56x56 -> per-channel normalisation (mean 0.5, std 0.5).
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Resize(size=(56, 56)),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# CIFAR-10 training split, downloaded into ./data when missing.
cifar_dataset = datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:05<00:00, 29844033.92it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data


In [None]:
import torch.utils.data as data

# Unshuffled loader: batches arrive in dataset order, which the embedding
# extraction relies on when it fills rows sequentially.
batch_size = 32
dataloader = data.DataLoader(
    dataset=cifar_dataset,
    batch_size=batch_size,
    shuffle=False,
)


In [None]:
import numpy as np

# Pre-allocate one row per training image; 512 is the width used for the
# backbone output in this notebook.
embedding_size = 512
num_samples = len(cifar_dataset)
embeddings = np.zeros(shape=(num_samples, embedding_size))

In [None]:
# Fill `embeddings` with backbone features for every training image, in
# dataset order.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.eval()

with torch.no_grad():
    image_idx = 0
    for images, _ in dataloader:
        # Local count, so the notebook-level `batch_size` setting is not overwritten.
        n_images = images.size(0)

        outputs = model.module.backbone(images.to(device))

        # flatten(start_dim=1) keeps the batch axis even for a batch of one,
        # where a bare squeeze() would drop it.
        embeddings[image_idx:image_idx + n_images] = outputs.flatten(start_dim=1).cpu().numpy()

        image_idx += n_images

# Every pre-allocated row must have been written exactly once.
assert image_idx == len(embeddings), (image_idx, len(embeddings))




In [None]:
# Persist the training embeddings, then end on the shape so the cell displays
# it (only the last expression of a cell is shown).
np.save("embeddings.npy", embeddings)
embeddings.shape

In [None]:
# Index the training embeddings for nearest-neighbour queries.
k = 5
knn = NearestNeighbors(n_neighbors=k)
knn.fit(embeddings)

# CIFAR-10 test split, served one image at a time in dataset order.
test_dataset = datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)

# With batch_size=1, squeeze() removes the singleton batch axis, so stacking
# the per-image vectors yields one row per test image.
with torch.no_grad():
    output_tensor = np.stack(
        [
            model.module.backbone(images.to('cuda')).squeeze().cpu().numpy()
            for images, _ in test_loader
        ]
    )

# indices[i] holds the training-row indices of the k neighbours of test image i.
_, indices = knn.kneighbors(output_tensor)



Files already downloaded and verified




In [None]:
# Class indices of the training split (neighbour pool) and of the test split.
label_array, true_test_labels = cifar_dataset.targets, test_dataset.targets

In [None]:
# 1-NN prediction: each test image takes the label of the first neighbour
# returned for it (column 0 of `indices`).
test_labels = [label_array[neighbour_row[0]] for neighbour_row in indices]


In [None]:
from sklearn.metrics import accuracy_score

# Fraction of test images whose 1-NN label matches the ground truth.
accuracy_score(y_true=true_test_labels, y_pred=test_labels)

0.3433

In [None]:
from keras.models import Sequential
from keras.layers import Dense
from keras.utils import to_categorical

# Small classifier trained on the frozen embeddings.
# NOTE: this rebinds `model`, which previously referred to the PyTorch network;
# re-running the embedding cells afterwards requires rebuilding that model first.

# Derive both layer sizes from the data instead of hard-coding them, so the
# network matches the embedding width and the actual number of classes.
feature_dim = embeddings.shape[1]
num_classes = int(np.max(label_array)) + 1

# Define the model
model = Sequential()
model.add(Dense(512, input_dim=feature_dim, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# One-hot encode the integer class indices held in label_array
labels = to_categorical(label_array, num_classes=num_classes)

# Train the model
model.fit(embeddings, labels, epochs=20, batch_size=32)


In [None]:
# Per-class scores for the test embeddings, then the highest-scoring class per row.
linear_preds = model.predict(output_tensor)
predicted_labels = linear_preds.argmax(axis=1)



In [None]:
# Display the predicted class indices for the test images.
predicted_labels

array([3, 8, 8, ..., 5, 1, 7])

In [None]:
# Accuracy of the classifier on the test split. Arguments follow the
# (y_true, y_pred) order used by the earlier k-NN accuracy call.
accuracy_score(true_test_labels, predicted_labels)

0.7034