In [1]:
import numpy as np
import pandas as pd
import random
import time
import sklearn as skl

from sklearn.cluster import KMeans

random.seed(1234)

In [33]:
class experimento:
    # pontos é um np array com pontos nas linhas
    # resposta é um np array com as labels dos pontos
    def __init__(self, K, pontos, labels, p=1):
        # Parâmetro da distancia de Minkowski
        self.p = p
        self.N = pontos.shape[0]
        self.K = K
        
        self.pontos = pontos
        self.labels = labels
        
        # Calcula a matriz de distância usando norma p 
        t_inicial = time.time()
        self._calcular_distancias()
        t_final = time.time()
        print("Tempo para calcular matrix de dist: ", t_final - t_inicial)
        
    def _calcular_distancias(self):
        """
        # Calculando dist matrix
        self.dist_matrix = np.zeros((self.N, self.N))
        for i in range(self.N):
            for j in range(self.N):
                self.dist_matrix[i][j] = np.sum(np.abs(self.pontos[i] - self.pontos[j]) ** self.p) ** (1 / self.p)
                
        # Salvando na memória
        np.save('test_dist.npy', self.dist_matrix)
        """
        # Carregando pre computado da memória
        self.dist_matrix = np.load('test_dist.npy')
                
    # Constrói uma solução que possui raio, no máximo 2*r
    # Note que pode ser retornada uma lista de centros com tamanho > K
    def aprox_raio(self, r):
        centros = []
        ok = [False]*self.N
        i_lista = list(range(self.N))
        random.shuffle(i_lista)
        for i in range(len(i_lista)):
            if ok[i_lista[i]]:
                continue
            centros.append(i_lista[i])
            for j in range(i+1, self.N):
                if ok[i_lista[j]]:
                    continue
                if self.dist_matrix[i_lista[i]][i_lista[j]] <= 2*r:
                    ok[i_lista[j]] = True

        return centros

    # Retorna solução aproximada no máximo 2*ótimo com busca binária
    def aprox_refinamento(self, largura=1e-6):
        lo = 0
        hi = 0
        for i in range(len(pontos)):
            for j in range(i+1, len(pontos)):
                hi = max(hi, self.dist_matrix[i][j])

        largura_final = largura*(hi - lo)
        
        while abs(hi - lo) > largura_final:
            mid = (lo+hi)/2
            if len(self.aprox_raio(mid)) <= K:
                hi = mid
            else:
                lo = mid

        # Corrige re erro numérico causa raio ser muito pequeno
        r = (hi+lo)/2
        centros = self.aprox_raio(r)
        while len(centros) > K:
            centros = self.aprox_raio(r)
            r += largura
            
        # Adiciona um centro arbitrário se só encontrou 1 único
        # Isso é feito pois silhueta não funciona para só um centro
        if len(centros) == 1:
            centros.append((centros[0]+1)%self.N)
        
        return centros 

    # Retorna solução aproximada no máximo 2*ótimo com método incremental
    def aprox_incremental(self):
        if K >= self.N:
            return list(range(self.N))
        
        start = random.randint(0, self.N-1)
        centros = [ start ]
        
        falta = list(range(self.N))
        falta.remove(start)
        while len(centros) < self.K:
            idx = -1 
            mxdist_matrix = -1
            for i in falta:
                idist_matrix = 0
                for c in centros:
                    idist_matrix = max(idist_matrix, self.dist_matrix[i][c])
                if idist_matrix > mxdist_matrix:
                    idx = i
                    mxdist_matrix = idist_matrix

            centros.append(idx)
            falta.remove(idx)

        return centros
    
    def aprox_kmeans(self):
        kmeans = KMeans(n_clusters=self.K, n_init=10, random_state=random.randint(0, 1000))
        resposta = kmeans.fit_predict(self.pontos)
        centros = kmeans.cluster_centers_
        return [ centros, resposta ]
    
    def _calcular_particao(self, centros):
        resposta = [-1]*self.N
        for i in range(self.N):
            mn = 0
            for j in range(1, len(centros)):
                if self.dist_matrix[i][centros[j]] < self.dist_matrix[i][mn]:
                    mn = j
            resposta[i] = mn
        return resposta
    
    def _calcular_raio(self, resposta):
        raio = 0
        for i in range(self.N):
            c = resposta[i]
            if self.dist_matrix[i][c] > raio:
                raio = self.dist_matrix[i][c]
        return raio

    def teste_incremental(self):
        NUM_IT = 30
        tempo = np.zeros(NUM_IT)
        raio = np.zeros(NUM_IT)
        silhueta = np.zeros(NUM_IT)
        rand = np.zeros(NUM_IT)
        for i in range(NUM_IT):
            inicial = time.time()
            centros = self.aprox_incremental()
            resposta = self._calcular_particao(centros)
            final = time.time()
            
            tempo[i] = final - inicial
            raio[i] = self._calcular_raio(resposta)
            silhueta[i] = skl.metrics.silhouette_score(self.pontos, resposta)
            rand[i] = skl.metrics.adjusted_rand_score(self.labels, resposta)
            
            #print(f"tempo = {tempo[i]}, raio = {raio[i]}, silhueta = { silhueta[i] }, rand = {rand[i]}")
            #print()
            
        dados_tempo = [ np.mean(tempo), np.std(tempo) ]
        dados_raio = [ np.mean(raio), np.std(raio) ]
        dados_silhueta = [ np.mean(silhueta), np.std(silhueta) ]
        dados_rand = [ np.mean(rand), np.std(rand) ]
        
        print("Teste incremental feito:")
        print("\t\t Média \t\t\t Desvio padrão")
        print(f"tempo: \t\t {dados_tempo[0]} \t {dados_tempo[1]}")
        print(f"raio: \t\t {dados_raio[0]} \t {dados_raio[1]}")
        print(f"silhueta: \t {dados_silhueta[0]} \t {dados_silhueta[1]}")
        print(f"rand: \t\t {dados_rand[0]} \t {dados_rand[1]}")
        
    def teste_kmeans(self):
        NUM_IT = 30
        tempo = np.zeros(NUM_IT)
        raio = np.zeros(NUM_IT)
        silhueta = np.zeros(NUM_IT)
        rand = np.zeros(NUM_IT)
        for i in range(NUM_IT):
            inicial = time.time()
            centros, resposta = self.aprox_kmeans()
            final = time.time()
            
            tempo[i] = final - inicial
            raio[i] = self._calcular_raio(resposta)
            silhueta[i] = skl.metrics.silhouette_score(self.pontos, resposta)
            rand[i] = skl.metrics.adjusted_rand_score(self.labels, resposta)
            
            #print(f"tempo = {tempo[i]}, raio = {raio[i]}, silhueta = { silhueta[i] }, rand = {rand[i]}")
            #print()
            
        dados_tempo = [ np.mean(tempo), np.std(tempo) ]
        dados_raio = [ np.mean(raio), np.std(raio) ]
        dados_silhueta = [ np.mean(silhueta), np.std(silhueta) ]
        dados_rand = [ np.mean(rand), np.std(rand) ]
        
        print("Teste kmeans feito:")
        print("\t\t Média \t\t\t Desvio padrão")
        print(f"tempo: \t\t {dados_tempo[0]} \t {dados_tempo[1]}")
        print(f"raio: \t\t {dados_raio[0]} \t {dados_raio[1]}")
        print(f"silhueta: \t {dados_silhueta[0]} \t {dados_silhueta[1]}")
        print(f"rand: \t\t {dados_rand[0]} \t {dados_rand[1]}")
        
    def teste_refinamento(self, largura):
        NUM_IT = 30
        tempo = np.zeros(NUM_IT)
        raio = np.zeros(NUM_IT)
        silhueta = np.zeros(NUM_IT)
        rand = np.zeros(NUM_IT)
        for i in range(NUM_IT):
            inicial = time.time()
            centros = self.aprox_refinamento(largura)
            resposta = self._calcular_particao(centros)
            final = time.time()
            
            tempo[i] = final - inicial
            raio[i] = self._calcular_raio(resposta)
            silhueta[i] = skl.metrics.silhouette_score(self.pontos, resposta)
            rand[i] = skl.metrics.adjusted_rand_score(self.labels, resposta)
            
            #print(f"tempo = {tempo[i]}, raio = {raio[i]}, silhueta = { silhueta[i] }, rand = {rand[i]}")
            #print()
            
        dados_tempo = [ np.mean(tempo), np.std(tempo) ]
        dados_raio = [ np.mean(raio), np.std(raio) ]
        dados_silhueta = [ np.mean(silhueta), np.std(silhueta) ]
        dados_rand = [ np.mean(rand), np.std(rand) ]
        
        print(f"Teste refinamento com largura {largura:.0%} feito:")
        print("\t\t Média \t\t\t Desvio padrão")
        print(f"tempo: \t\t {dados_tempo[0]} \t {dados_tempo[1]}")
        print(f"raio: \t\t {dados_raio[0]:.15f} \t {dados_raio[1]}")
        print(f"silhueta: \t {dados_silhueta[0]} \t {dados_silhueta[1]}")
        print(f"rand: \t\t {dados_rand[0]} \t {dados_rand[1]}")

In [34]:
import dados

K, pontos, labels = dados.banknote_data()
E = experimento(K, pontos, labels, 1)

Tempo para calcular matrix de dist:  0.004670143127441406


In [35]:
E.teste_incremental()
print()
E.teste_kmeans()
print()

for largura in [ 0.01, 0.05, 0.10, 0.15, 0.20 ]:
    E.teste_refinamento(largura)
    print()

Teste incremental feito:
		 Média 			 Desvio padrão
tempo: 		 0.0030540307362874348 	 0.0003507827511791674
raio: 		 52.27014733333332 	 0.23403069178967642
silhueta: 	 0.4384136678423244 	 0.07876838427476952
rand: 		 0.09081044197793 	 0.03020081038645909

Teste kmeans feito:
		 Média 			 Desvio padrão
tempo: 		 0.046557044982910155 	 0.041289042383098176
raio: 		 52.61415766666666 	 0.46491647864738533
silhueta: 	 0.4322884268208396 	 5.551115123125783e-17
rand: 		 0.04853814205195779 	 1.3877787807814457e-17

Teste refinamento com largura 1% feito:
		 Média 			 Desvio padrão
tempo: 		 0.34612249533335365 	 0.01216141262103266
raio: 		 52.270147333333320 	 0.23403069178967642
silhueta: 	 0.38071084055224136 	 0.11106287915761945
rand: 		 0.11307243510675698 	 0.1287543551126146

Teste refinamento com largura 5% feito:
		 Média 			 Desvio padrão
tempo: 		 0.35073204040527345 	 0.01128133775732799
raio: 		 52.270147333333327 	 0.2340306917896764
silhueta: 	 0.4098861982797329 	 0.0762