# Clustering de imágenes y búsqueda de imágenes

INCOMPLETO.

**Referencias**

- [Image Clustering Implementation with PyTorch](https://towardsdatascience.com/image-clustering-implementation-with-pytorch-587af1d14123)

In [3]:
import torch
from torch import nn
from torchvision import models

## Encoder

In [4]:
class EncoderVGG(nn.Module):
    '''Encoder of image based on the architecture of VGG-16 with batch normalization.

    The modules of the template's `features` container are reused one by one; only the max-pooling
    modules are rebuilt so that they additionally return their pooling indices.

    Args:
        pretrained_params (bool, optional): If the network should be populated with pre-trained VGG parameters.
            Defaults to True.
    '''
    channels_in = 3
    channels_code = 512

    def __init__(self, pretrained_params=True):
        super(EncoderVGG, self).__init__()

        # NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of `weights=`.
        # The call is left unchanged because the installed torchvision version is not known here.
        vgg = models.vgg16_bn(pretrained=pretrained_params)
        # Only the convolutional feature extractor is needed for the encoder
        del vgg.classifier
        del vgg.avgpool

        self.encoder = self._encodify_(vgg)

    def _encodify_(self, encoder):
        '''Create list of modules for encoder based on the architecture in VGG template model.
        In the encoder-decoder architecture, the unpooling operations in the decoder require pooling
        indices from the corresponding pooling operation in the encoder. In VGG template, these indices
        are not returned. Hence the need for this method to extend the pooling operations.
        Args:
            encoder : the template VGG model; only its `features` attribute is read
        Returns:
            modules (ModuleList): the list of modules that define the encoder corresponding to the VGG model
        '''
        modules = nn.ModuleList()
        for module in encoder.features:
            if isinstance(module, nn.MaxPool2d):
                # Rebuild the pooling with every geometric setting of the template preserved (including
                # dilation and ceil_mode), so the only difference is that the indices are returned too.
                module = nn.MaxPool2d(kernel_size=module.kernel_size,
                                      stride=module.stride,
                                      padding=module.padding,
                                      dilation=module.dilation,
                                      ceil_mode=module.ceil_mode,
                                      return_indices=True)
            # Non-pooling modules are reused as-is, i.e. they keep the template's parameters
            modules.append(module)

        return modules

    def forward(self, x):
        '''Execute the encoder on the image input
        Args:
            x (Tensor): image tensor
        Returns:
            x_code (Tensor): code tensor
            pool_indices (list): Pool indices tensors in order of the pooling modules
        '''
        pool_indices = []
        x_current = x
        for module_encode in self.encoder:
            output = module_encode(x_current)

            # If the module is pooling, there are two outputs, the second the pool indices
            if isinstance(output, tuple) and len(output) == 2:
                x_current = output[0]
                pool_indices.append(output[1])
            else:
                x_current = output

        return x_current, pool_indices

## Decoder

In [5]:
class DecoderVGG(nn.Module):
    '''Decoder that mirrors an encoder built from the VGG-16 (batch normalization) template.
    Args:
        encoder: iterable of encoder modules (the module list held by an `EncoderVGG`) that is to be
            inverted into a decoder
    '''
    channels_in = EncoderVGG.channels_code
    channels_out = 3

    def __init__(self, encoder):
        super().__init__()

        self.decoder = self._invert_(encoder)

    def _invert_(self, encoder):
        '''Build the decoder by walking the encoder modules back to front.
        Each convolution of the encoder becomes a transposed convolution (input and output channels
        swapped, same kernel size, stride and padding) followed by batch normalization and ReLU. Each
        max-pooling becomes a max-unpooling with the same kernel size, stride and padding. Every other
        encoder module is skipped.
        Args:
            encoder (ModuleList): the encoder
        Returns:
            decoder (ModuleList): the decoder obtained by "inversion" of encoder
        '''
        mirrored = []
        for layer in reversed(encoder):
            if isinstance(layer, nn.Conv2d):
                mirrored.append(nn.ConvTranspose2d(in_channels=layer.out_channels,
                                                   out_channels=layer.in_channels,
                                                   kernel_size=layer.kernel_size,
                                                   stride=layer.stride,
                                                   padding=layer.padding))
                mirrored.append(nn.BatchNorm2d(layer.in_channels))
                mirrored.append(nn.ReLU(inplace=True))
            elif isinstance(layer, nn.MaxPool2d):
                mirrored.append(nn.MaxUnpool2d(kernel_size=layer.kernel_size,
                                               stride=layer.stride,
                                               padding=layer.padding))

        # Drop the last two modules: when the encoder starts with a convolution these are the trailing
        # normalization and activation, which leaves a transposed convolution as the final module
        del mirrored[-2:]

        return nn.ModuleList(mirrored)

    def forward(self, x, pool_indices):
        '''Run the decoder on a code tensor
        Args:
            x (Tensor): code tensor obtained from encoder
            pool_indices (list): pool indices tensors, in the order of the pooling modules in the encoder
        Returns:
            x (Tensor): decoded image tensor
        '''
        # The decoder meets the poolings in the opposite order to the encoder
        indices_in_decode_order = list(reversed(pool_indices))
        n_unpooled = 0

        decoded = x
        for layer in self.decoder:
            if not isinstance(layer, nn.MaxUnpool2d):
                decoded = layer(decoded)
                continue

            # Unpooling additionally needs the indices recorded by the matching pooling
            decoded = layer(decoded, indices=indices_in_decode_order[n_unpooled])
            n_unpooled += 1

        return decoded

## Autoencoder

In [None]:
class AutoEncoderVGG(nn.Module):
    '''Auto-encoder assembled from an `EncoderVGG` and the `DecoderVGG` obtained by inverting it.
    Args:
        pretrained_params (bool, optional): forwarded to `EncoderVGG`; if the network should be populated
            with pre-trained VGG parameters. Defaults to True.
    '''
    channels_in = EncoderVGG.channels_in
    channels_code = EncoderVGG.channels_code
    channels_out = DecoderVGG.channels_out

    def __init__(self, pretrained_params=True):
        super().__init__()

        encoder = EncoderVGG(pretrained_params=pretrained_params)
        self.encoder = encoder
        # The decoder is derived from the module list of the encoder just created
        self.decoder = DecoderVGG(encoder.encoder)

    def forward(self, x):
        '''Encode the image input and decode the resulting code again
        Args:
            x (Tensor): image tensor
        Returns:
            x_prime (Tensor): image tensor following encoding and decoding
        '''
        latent, unpool_indices = self.encoder(x)
        return self.decoder(latent, unpool_indices)

## Local aggregation loss

In [10]:
import torch.nn.functional as F

import numpy as np

from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from sklearn.preprocessing import normalize
from scipy.spatial.distance import cosine as cosine_distance

class LocalAggregationLoss(nn.Module):
    '''Local Aggregation Loss module from "Local Aggregation for Unsupervised Learning of Visual Embeddings" by
    Zhuang, Zhai and Yamins (2019), arXiv:1903.12355v2

    NOTE(review): `forward` calls `_nearest_neighbours`, `_close_grouper`, `_intersecter` and `_prob_density`,
    none of which is defined in the part of the class visible here; they have to be provided before the
    module can be executed.

    Args:
        temperature: stored as `self.temperature`; not read by the code visible in this class
        k_nearest_neighbours (int): number of neighbours; the neighbour finder is built with one more
            (presumably because a query point is returned as its own neighbour - TODO confirm)
        clustering_repeats (int): number of `KMeans` instances to create
        number_of_centroids (int): `n_clusters` of every `KMeans` instance
        memory_bank: object with an `update_memory(vectors, indices)` method
        kmeans_n_init (int, optional): `n_init` of every `KMeans` instance. Defaults to 1.
        nn_metric (callable, optional): metric handed to `NearestNeighbors`. Defaults to scipy's cosine distance.
        nn_metric_params (dict, optional): `metric_params` handed to `NearestNeighbors`. Defaults to None,
            which is replaced by a new empty dictionary.
    '''
    def __init__(self, temperature,
                 k_nearest_neighbours, clustering_repeats, number_of_centroids,
                 memory_bank,
                 kmeans_n_init=1, nn_metric=cosine_distance, nn_metric_params=None):
        super(LocalAggregationLoss, self).__init__()

        # A fresh dictionary per instance: a `{}` default in the signature would be one object shared
        # by every instance created without an explicit value
        if nn_metric_params is None:
            nn_metric_params = {}

        self.temperature = temperature
        self.memory_bank = memory_bank

        self.neighbour_finder = NearestNeighbors(n_neighbors=k_nearest_neighbours + 1,
                                                 algorithm='ball_tree',
                                                 metric=nn_metric, metric_params=nn_metric_params)
        self.clusterer = [KMeans(n_clusters=number_of_centroids,
                                 init='random', n_init=kmeans_n_init)
                          for _ in range(clustering_repeats)]

    def forward(self, codes, indices):
        '''Forward pass for the local aggregation loss module
        Args:
            codes (Tensor): one code per row
            indices: one entry per row of `codes`; passed on to the memory bank and the neighbour helpers
        Returns:
            loss (Tensor): sum over the batch of log(d1) - log(d2), divided by the number of codes
        '''
        assert codes.shape[0] == len(indices), 'codes and indices must have the same length'

        # Convert to the CPU double precision tensor type; the detached, row-normalized NumPy copy
        # carries no gradient information
        codes = codes.type(torch.DoubleTensor)
        code_data = normalize(codes.detach().numpy(), axis=1)

        # Compute and collect arrays of indices that define the constants in the loss function. Note that
        # no gradients are computed for these data values in backward pass
        self.memory_bank.update_memory(code_data, indices)

        background_neighbours = self._nearest_neighbours(code_data, indices)
        close_neighbours = self._close_grouper(indices)
        neighbour_intersect = self._intersecter(background_neighbours, close_neighbours)

        # Compute the probability density for the codes given the constants of the memory bank
        v = F.normalize(codes, p=2, dim=1)
        d1 = self._prob_density(v, background_neighbours)
        d2 = self._prob_density(v, neighbour_intersect)

        return torch.sum(torch.log(d1) - torch.log(d2)) / codes.shape[0]

## Memory bank

In [None]:
class MemoryBank(object):
    '''Memory bank holding a fixed number of vectors that are updated by mixing in new vectors
    Args:
        n_vectors (int): Number of vectors the memory bank should hold
        dim_vector (int): Dimension of the vectors the memory bank should hold
        memory_mixing_rate (float): Fraction of new vector to add to currently stored vector. The value
            should be between 0.0 and 1.0, the greater the value the more rapid the update. The rate is
            fixed at construction and used by every call to `update_memory`.
    '''
    def __init__(self, n_vectors, dim_vector, memory_mixing_rate):

        self.dim_vector = dim_vector
        # `marsaglia` is defined outside this class; presumably it draws a random vector of the
        # given dimension - TODO confirm
        self.vectors = np.array([marsaglia(dim_vector) for _ in range(n_vectors)])
        self.memory_mixing_rate = memory_mixing_rate
        self.mask_init = np.zeros(n_vectors, dtype=bool)

    def update_memory(self, vectors, index):
        '''Update the memory with new vectors
        Args:
            vectors: a single vector if `index` is a scalar, otherwise one vector per entry of `index`
            index: a single integer position (Python or NumPy integer), or a sequence/array of positions.
                Positions are processed in order, so a repeated position is updated repeatedly. If
                `vectors` and `index` differ in length the surplus entries are ignored.
        '''
        if isinstance(index, (int, np.integer)):
            self.vectors[index] = self._update_(vectors, self.vectors[index])
            return

        # Lists, tuples and other array-likes used to be ignored without any error; convert them so
        # that they are handled exactly like an ndarray of positions
        positions = np.asarray(index)
        if positions.ndim == 0:
            position = int(positions)
            self.vectors[position] = self._update_(vectors, self.vectors[position])
            return

        for ind, vector in zip(positions, vectors):
            self.vectors[ind] = self._update_(vector, self.vectors[ind])

    def mask(self, inds_int):
        '''Construct a Boolean mask given integer indices
        Args:
            inds_int: iterable of rows, each row an array or sequence of positions into the memory bank
        Returns:
            mask (ndarray): one Boolean row per input row, with one entry per stored vector that is True
                at the positions listed in that row
        '''
        n_vectors = self.vectors.shape[0]
        ret_mask = []
        for row in inds_int:
            row_mask = np.full(n_vectors, False)
            # np.asarray lets plain lists and tuples be used as rows as well as ndarrays
            row_mask[np.asarray(row).astype(int)] = True
            ret_mask.append(row_mask)

        return np.array(ret_mask)

    def _update_(self, vector_new, vector_recall):
        '''Mix the new vector into the stored one, weighting the new vector by the mixing rate'''
        return vector_new * self.memory_mixing_rate + vector_recall * (1.0 - self.memory_mixing_rate)
