In [32]:
import scipy

import numpy as np


from numba import njit
from copy import deepcopy
from sklearn import metrics
from abc import abstractmethod

In [None]:
class Collaborator:
    """
    An abstract class representing a collaborator.
    Collaborators can be of different types (running with different algorithms)
    e.g. Gtms, gaussian mixture models, other mixture models, or any kind of probabilistic model.
    """
    
    
    @njit
    def __init__(self, data_Id:np.array, X:np.array, Y=None, K=3, use_criterion=False, *args, **kwargs):
        """
        Parameters:
        data_Id: np.array(int)
            N-dimensional array, Id of each individual.
        X: np.ndarray(float)
            N*D array of features.
        use_criterion: bool
            Whether to use criterion to accept collaboration or not. Default to False.
            
        Optional:
        Y : np.array(int)
            N-dimensional array, labels. Default to None.
        K : int
            Number of clusters. Default to 3.
        add_noise: bool
            Whether to add white noise or not. Default to False.
        noise_scale: float
            If add_noise==True, the variance of the noise in each dimension is equal to this value
            multiplied by the variance of the data. Default to 0.1.
            
        define:
        self.Id: int
            Id of the collaborator. Set by the setter method.
        self.K: int
            The number of clusters.
        self.N: int
            Number of lines in the dataset.
        self.D: int
            Dimensionality of the data.
        self.data_Id: np.array(int)
            N-dimensional array, Id of each individual.
        self.X: np.array
        self.R: np.array
            Partition matrix.
        self.history_R: list(np.array)
            List of all partition matrices.
        self.H: np.array
            N-dimensional array. Entropy of the classification of each individual.
        self.use_criterion: bool
            Whether to use criterion to accept collaboration or not. Default to False.
        self.criterion: float
            Current value of the criterion used to decide whether to accept collaboration.
            Computed with self.get_criterion()
        """
        
        self.Id = None # set by the master algorithm using self.set_id
        self.K = K
        self.N, self.D= None, None
        self.data_Id, self.X = data_Id, self.parseX(X, **kwargs)
        if Y:
            self.Y = deepcopy(Y)
        self.R = np.zeros((N, K))
        self.history_R = []
        self.H = np.zeros(N)
        self.use_criterion, self.criterion = use_criterion, None
        self.db, self.purity, self.silhouette = None, None, None
        
            
    
    @njit
    def parseX(self, X, *args, **kwargs):
        """
        parse the dataset
        """
        
        res = deepcopy(X)
        
        # we want a 2-D array
        if res.ndim == 1:
            res = res.reshape(-1, 1)
        self.N, self.D = res.shape
        
        # If add_noise is set to True, then add noise
        if kwargs.get('add_noise', False):
            std = np.std(res, axis=0)
            noise_std = kwargs.get('noise_scale', .1) * np.diag(std)
            noise = scipy.random.multivariate_normal(mean=np.zeros(self.D), cov=noise_std, size=self.N)
        res += noise
        
        return res
        
        
    @abstractmethod
    def local_step(self):
        """
        Fit the parameters to the dataset.
        Add first partition matrix to history.
        Also initialize the validation indices, and in particular the criterion.
        """
        pass
    
    
    @abstractmethod
    def log_local(self):
        """
        First log, after local step. Log the values of the various validation indices (db, purity, silhouette).
        
        Returns:
            log: dict
            Dictionary with the values to log: at least the validation indices (db, purity, silhouette).
        """
        
        self.db = self.compute_db()
        self.purity = self.compute_purity()
        self.silhouette = self.compute_silhouette()
        
        res = {
            "db":self.db,
            "purity":self.purity,
            "silhouette":self.silhouette
        }
        
        """
        TODO:
        add Normalized Mutual Information.
        """
        
        return res
    
    
    @njit
    def collaborate(self, remote_Ids, remote_Rs): # horizontal collab for now
        """
        Compute a new collaboration matrix, given all remote matrices.
        
        Parameters:
        remote_Ids: list(int)
            List of Ids of the collaborators.
        remote_Rs: list(np.array)
            List of P N*K(p)-dimensional arrays.
            Where K(p) is the number of clusters in data site number p (p=1,...,P).
            
        returns:
            res: np.array
                N*K array, the collaborated partition matrix.
            confidence_coefficients: np.array
                P-dimensional array. The confidence coefficient of collab with each remote site.
        """
        
        # number of collaborators
        P = len(remote_Rs) + 1
        
        # vector of confidence coefficients.
        confidence_coefficients = np.zeros(P)
        
        # res
        res = np.zeros_like(self.R)
        
        # entropy of local classification
        local_H = self.compute_entropy(self.R)
            
        for p, (remote_Id, remote_R) in enumerate(zip(remote_Ids, remote_Rs)):
            # optimal transport
            remote_R = self.optimal_transport(remote_R)
            remote_H = self.compute_entropy(remote_R)
            # compute the local and remote coefficients (one coeff for each individual)
            l, r = (1/(P-1)) * remote_H * (1-local_H), local_H * (1-remote_H)
            res += l * self.R + r * remote_R
            # update confidence vector
            confidence_coefficients[remote_Id] += r.sum()
            confidence_coefficients[self.Id] += l.sum()
            
        # normalize partition matrix
        res /= res.sum(axis=1, keepdims=True)
        
        # decide whether to accept collaboration
        if self.use_criterion:
            self.refit(res)
            
        
            
            
    @njit
    def future_collaborate(self, remote_Ids, remote_data_Ids, remote_Rs): # horizontal collab for now
        """
        Compute a new collaboration matrix, given all remote matrices.
        
        Parameters:
        remote_Ids: list(int)
            List of Ids of the collaborators.
        remote_data_Ids: list(np.array)
            List of P N(p)-dimensional arrays, Id of each individual.
            Where N(p) is the number of observations available on data site number p (p=1,...,P).
        remote_Rs: list(np.array)
            List of P N(p)*K(p)-dimensional arrays.
            Where K(p) is the number of clusters in data site number p (p=1,...,P).
            
        returns:
            The collaborated partition matrix
        """
        
        # number of collaborators
        P = len(remote_Rs) + 1
        
        # Ids and partition matrices after left join.
        lj_remote_data_Ids, lj_remote_Rs = [], []
        # partition matrices after optimal transport.
        transported_remote_Rs = []
        # vector of confidence coefficients.
        confidence_coefficient = np.zeros(P)
        
        #local_component, remote_component = np.zeros_like(self.R), np.zeros_like(self.R)
        
        # we need to know, for each individual, how many data sites have it.
        count = np.zeros_like(self.data_Id)
        
        for p, (remote_data_Id, remote_R) in enumerate(zip(remote_data_Ids, remote_Rs)):
            # keep only common observations
            remote_data_Id, remote_R = self.left_join(remote_data_Id, remote_R)
            lj_remote_data_Ids.append(deepcopy(remote_data_Id))
            lj_remote_Rs.append(deepcopy(remote_R))
            # update the count variable
            count += self.find_indices(self.data_Id, lj_remote_data_Id)
            
            
        
        for p, (remote_data_Id, remote_R) in enumerate(zip(remote_data_Ids, remote_Rs)):
            
            
            # optimal transport
            remote_R = self.optimal_transport(remote_R)
            remote_H = self.compute_entropy(remote_R)
            # compute the local and remote coefficients (one coeff for each individual)
            l, r = remote_H * (1-self.H), self.H * (1-remote_H)
            local_component += l
            remote_component += r
            
    
    @abstractmethod
    def log_collab(self):
        """
        Log the results of a collaboration step:
        the validation indices (db, purity, silhouette) and the confidence vector.
        """
        pass
    
    
    def get_partition_matrix(self):
        """
        Accessor. Returns the partition matrix.
        """
        
        return self.R
    
    
    def set_id(self, Id):
        """
        Mutator
        """
        self.Id = Id
        
        
    def get_id(self):
        """
        Accessor
        """
        return self.Id
    
        
    def compute_db(self, resp=None):
        """
        compute the DB index of a dataset, given a clustering for this dataset

        Args:
            resp: array-like, (n_samples, n_clusters)
                reponsibility matrix

        Returns:
            float, the DB index
        """

        resp = resp if resp is not None else self.R
        
        try:
            # a hard partition is required
            y_pred = resp.argmax(axis=1)

            return metrics.davies_bouldin_score(self.X, y_pred)

        except:
            return None
    
    
    def compute_purity(self, y_true=None, y_pred=None):
        """
        compute the purity score of a clustering

        Args:
            y_true: array-like, (n_samples,)
                labels of each observation
            y_pred: array-like, (n_samples,) or (n_samples, n_clusters)
                predicted hard clustering

        Returns: float
                purity score.
        """
        
        # if we do not have the labels, return None.
        if y_true == None:
            return None
        
        y_pred = y_pred if y_pred is not None else self.R
        if y_pred.ndim == 2:
            y_pred = np.argmax(y_pred, axis=1)

        # compute contingency matrix (also called confusion matrix).
        contingency_matrix = metrics.cluster.contingency_matrix(y_true, y_pred)

        return np.sum(np.amax(contingency_matrix, axis=0)) / np.sum(contingency_matrix)
    
    
    def compute_silhouette(self, y_pred=None):
        """
        Compute the silhouette index of the classification.
        Args:
            y_pred: array-like, (n_samples,) or (n_samples, n_clusters)
                predicted hard clustering or partition matrix.
            
        Returns: float
            silhouette index.
        """
        
        y_pred = y_pred if y_pred is not None else self.R
        if y_pred.ndim == 2:
            y_pred = np.argmax(y_pred, axis=1)
        
        try:
            return metrics.silhouette_score(self.X, y_pred)
        except:
            return None

# Test function to find indices

In [73]:
Id = np.array([3, 5, 1, 7, 9])
count = np.zeros_like(Id)
present = np.array([1, 7])

In [74]:
def find_indices(Id, present):
    res = np.zeros_like(Id)
    for x in present:
        res[np.where(Id==x)]+=1
    return res

@njit
def fast_find_indices(Id, present):
    res = np.zeros_like(Id)
    for x in present:
        res[np.where(Id==x)]+=1
    return res

In [78]:
%%timeit
find_indices(Id, present)

9.06 µs ± 270 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [79]:
%%timeit
fast_find_indices(Id, present)

772 ns ± 12.1 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [24]:
class my_list:
    def __init__(self, *args, **kwargs):
        self.values = []
        for e in args:
            self.values.append(e)
            
    def __eq__(self, other):
        try:
            assert isinstance(other, my_list)
        except:
            print("only compare to another instance of my_list")
            return False
        if len(self.values) != len(other.values):
            print("length difference: {} != {}".format(len(self.values), len(other.values)))
            return False
        for s, o in zip(self.values, other.values):
            if s != o:
                print("{} != {}".format(s, o))
                return False
        return True

In [71]:
a = my_list(1, 2)
b = my_list(1, 2)
c = my_list(1, 2, 3)
d = [1, 2]
e = my_list(1, 3)

In [72]:
a==b

True