In [164]:
import math
import numpy as np

def parseRDD(point):
    """ Converte um ponto de metricas da base de dados para uma tupla (id, vetor de floats).
        Recebe um ponto onde o primeiro campo eh o id do vertice os 13 seguintes sao os valores para cada metrica
        e retorna uma tupla composta pelo id e pelas 13 metricas (vetor de floats).
    Args:
        point (str): uma string onde os termos estao separados por ',', sendo o primeiro campo o id do vertice e 
        os 13 seguintes os valores para cada metrica
    Returns:
        (id, []): uma tupla composta pelo id do vertice e uma lista dos valores de 13 metricas
    """
    data = point.split(';')
    floatMetrics = [float(i) for i in data[1:]]
    return (data[0], floatMetrics)

def isNotZero(parsedPoint):
    """ Retorna true se o ponto contem alguma metrica diferente de 0.
    Args:
        parsedPoint (str, []): uma tupla composta pelo id do vertice e lista de metricas
    Returns:
        bool: True se a lista contém pelo menos um valor nao nulo ou False se a somatoria da lista eh nula
    """
    return sum(parsedPoint[1]) > 0

def normalize(parsedPoint, means, standardDeviations, maxs, mins, normalizationType):
    """ Normaliza um ponto. Recebe um ponto cujas valores maximo e minimo para as metricas 
        podem ser muito amplos e retorna um ponto cujas metricas estao entre 0 e 1.0.
    Args:
        parsedPoint (str, []): uma tupla composta pelo id do vertice e lista de metricas
        means ([]): lista de medias das metricas
        standardDeviations (list): lista de desvios-padrao das metricas
        maxs ([]): lista de maximos das metricas
        mins ([]): lista de minimos das metricas
        normalizationType (str): tipo de normalizacao ('reescaling' | 'standard_score')
    Returns:
        (str, []): uma tupla de id (str) e metricas normalizadas ([])
    """
    nodeId = parsedPoint[0]
    metrics = parsedPoint[1]
    numberOfMetrics = len(means) # numero de metricas == numero de medias == numero de desvios-padrao
    if normalizationType == 'standard_score':
        normalizedMetrics = [(metrics[i] - means[i])/standardDeviations[i] for i in range(numberOfMetrics)]
    elif normalizationType == 'reescaling':
         normalizedMetrics = [(metrics[i] - mins[i])/(maxs[i]-mins[i]) for i in range(numberOfMetrics)]
    return (nodeId, normalizedMetrics)

def euclidianDistance(pointA, pointB):
    """ Calcula a distancia euclidiana entre dois pontos. Recebe dois pontos e retorna a distancia
        euclidiana entre eles.
    Args:
        pointA ([]): lista de floats
        pointB ([]): lista de floats
    Returns:
        float: a distancia entre dois pontos
    """
    numberOfMetrics = len(pointA) # numero de metricas de A e B eh igual a 13
    squaredDifferenceBetweenMetrics = [math.pow(pointA[i] - pointB[i], 2) for i in range(numberOfMetrics)]
    return math.sqrt(sum(squaredDifferenceBetweenMetrics))

def chooseRandomPoints(points, numberOfPoints):
    """ Escolhe um ponto aleatorio dentre uma lista de pontos. 
        Recebe uma lista de pontos e retorna um ponto dessa lista.
    Args:
        points (RDD): RDD de pontos
        numberOfPoints (int): numero de pontos a serem escolhidos
    Returns:
        points ([[]]): uma lista de lista de metricas representando n pontos
    """
    return points.takeSample(False, numberOfPoints)
    
def areCentroidsDifferent(pointA, pointB):
    """ Verifica se dois pontos sao diferentes, com base nas listas de metricas.
        Recebe dois pontos e returna True ou False.
    Args:
        pointA: um ponto do tipo [], em que contem metricas do tipo float
        ponttB: um ponto do tipo [], em que contem metricas do tipo float
    Returns:
        bool: True se pointA[i] != pointB[i] para todo i ou False caso contrario
    """
    for i in range(len(pointA)):
        if pointA[i] != pointB[i]:
            return True
    return False
        
def kmeans(data, k, iteractions):
    """ Base do bisect-kmeans. Recebe uma RDD de dados, um k=2 e o numero de iteracoes maximo e retorna dois clusters.
    Args:
        data (RDD): RDD de pontos do tipo (str, [])
        k (int): numero de clusters a serem gerados. Padrao para bisect é 2 por vez
        iteractions (int): numero de interacoes maximo
    Returns:
        clusters (RDD): um RDD contendo dois clusters. 
                        Cada ponto eh mapeado para (idClusters, idPonto, metricas)
                        idClusters eh ou 0 ou 1
                        metricas eh uma lista de floats
        centroids ([[]]): lista de listas de metricas representando os centroides
    """
    # seleciona k pontos aleatorios e atribui a lista inicial de centroides
    centroids = [p[1] for p in chooseRandomPoints(data, k)] 
    
    # roda kmeans ateh o limite de iteractions
    for i in range(iteractions):
        print('\tK-MEANS %s de %s' %(i+1, iteractions))
        # calcula distancia entre cada ponto (segundo elemento da tupla) e os centroides, e atribui a cada ponto o id do centroide cuja distanca eh menor
        clustersRDD = data.map(lambda x:(np.argmin([euclidianDistance(x[1], c) for c in centroids]), x[0], x[1]))
        
        # refaz lista de centroides, calculando o ponto medio dos pontos de cada cluster
        newCentroids = (clustersRDD
                        .map(lambda x:(x[0], [x[2],1])) # mapeia cada ponto para (indice do centroide, [vetor de metricas, 1])
                        .reduceByKey(lambda x,y:([(np.array(x[0]) + np.array(y[0])),(x[1] + y[1])])) # reduz conjunto de pontos com mesmo indice do centroide para a somatoria dos vetores de metricas
                        .map(lambda x:list(np.array(x[1][0])/(x[1][1]))) # divide a somatoria dos vetores pelo numero de pontos
                       ).collect()

        # caso numero de centroides mude, seleciona centroides faltantes de forma aleatoria
        if len(newCentroids) != len(centroids):
            diff = len(centroids) - len(newCentroids)
            newCentroids.extend([p[1] for p in chooseRandomPoints(data, diff)])
        
        # verifica se centroides mudaram
        centroidsHaveChanged = [areCentroidsDifferent(centroids[i], newCentroids[i]) for i in range(len(centroids))]
        
        if True not in centroidsHaveChanged:
            return clustersRDD, centroids
        else:
            centroids = newCentroids
    return clustersRDD, centroids

def getClustersByCount(cluster1, cluster2):
    """ Verifica qual dos clusters possui mais elementos e os retorna ordenado pela quantidade crescente de elementos.
    Args:
        cluster1 (RDD): RDD contendo pontos cujas chaves sao 0
        cluster2 (RDD): RDD contendo pontos cujas chaves sao 1
    Returns:
        clusterMaior (RDD), clusterMenor (RDD): retorna o maior e o menor dos clusters, nesta ordem
    """
    if cluster1.count() > cluster2.count():
        return cluster1, cluster2
    else:
        return cluster2, cluster1
    
def getSSE(cluster, centroid):
    """ Calcula a soma das distancias entre cada ponto e o centroide do cluster.
        Recebe um cluster e retorna a somatoria das distancias euclidianas de cada ponto ao centroide do cluster.
    Args:
        cluster (RDD): RDD contendo pontos
        centroid ([]): centroide do cluster
    Returns:
        sse (float): a somatoria das distancias euclidianas entre cada ponto e o centroide
    """
    return cluster.map(lambda x:euclidianDistance(x[2], centroid)).reduce(lambda x,y:x+y)
    
def bisectKmeans(data, k, bisects, iteractions):
    """ Algoritmo principal do bisect-kmeans. Recebe uma RDD de dados, o numero de clusters a serem gerados,
        o numero de iteracoes maximo do subalgoritmo kmeans e um boleano indicando se pode haver clusters vazios
    Args:
        data (RDD): RDD de pontos do tipo (idPonto, metricas), onde idPonto eh um str e metricas eh uma lista de floats
        k (int): numero de clusters
        bisects (int): numero de bifurcacoes
        iteractions (int): numero de iteracoes do k-means
    Returns:
        finalClusters ([]): lista de pontos do tipo (idPonto, metricas), 
                            len(finalClusters) eh garantido ser k, se forceAllClusterWithPoints for True
                            cada cluster contem x pontos
    """
    finalClusters = [data]
    while(len(finalClusters) != k):
        # coleta ultimo cluster para usar no k-means
        clusterToSplit = finalClusters.pop()   
        
        # lista de somatorios de erros quadraticos
        sse = []
        
        # lista temporaria de clusters
        tmpClusters = []
        
        # gera dois clusters bisect vezes e calcula os sseTotal para cada par de clusters
        for i in range(bisects):
            print('BISECT %s de %s' %(i+1, bisects))
            clustersRDD, centroids = kmeans(data=clusterToSplit, k=2, iteractions=iteractions)
            tmpClusters.append(clustersRDD.filter(lambda x:x[0] == 0))
            tmpClusters.append(clustersRDD.filter(lambda x:x[0] == 1))
            sse1 = getSSE(tmpClusters[-2], centroids[0])
            sse2 = getSSE(tmpClusters[-1], centroids[1])
            sse.append(sse1 + sse2)
        
        # coleta indice do menor sseTotal
        minSseIndex = np.argmin(sse)
        
        # ordena do maior para o menor os dois clusters cuja sseTotal eh menor
        largerCluster, minorCluster = getClustersByCount(tmpClusters[minSseIndex*2], tmpClusters[minSseIndex*2 + 1])

        # adiciona a lista de clusters o menor, depois o maior
        finalClusters.append(minorCluster.map(lambda x:(x[1], x[2])))
        finalClusters.append(largerCluster.map(lambda x:(x[1], x[2]))) 
        print('--------\nK %s de %s\n--------' %(len(finalClusters), k))
    return finalClusters

time: 37.6 ms


In [152]:
fileName = ('metricas_t.csv')
rawRDD = sc.textFile(fileName)
metricsHeader = rawRDD.take(1)[0]
metricsRDD = (rawRDD
              .filter(lambda x: x != metricsHeader)
              .map(lambda x:parseRDD(x))
              .filter(lambda x: isNotZero(x))
             )

time: 76.8 ms


In [149]:
means = []
maxs = []
mins = []
stdevs = []
for i in range(13):
    metricI = metricsRDD.map(lambda x: x[1][i])
    maxs.append(metricI.max())
    mins.append(metricI.min())
    means.append(metricI.mean())
    stdevs.append(metricI.stdev())

time: 4.31 s


In [142]:
normalizedMetricsRDD = metricsRDD.map(lambda x:normalize(x, means, stdevs, maxs, mins, 'standard_score'))

time: 1.92 ms


In [165]:
clusters = bisectKmeans(data=normalizedMetricsRDD, iteractions=5, k=4, bisects=6)

BISECT 1 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 2 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 3 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 4 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 5 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 6 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
--------
K 2 de 4
--------
BISECT 1 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
BISECT 2 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 3 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 4 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-MEANS 4 de 5
	K-MEANS 5 de 5
BISECT 5 de 6
	K-MEANS 1 de 5
	K-MEANS 2 de 5
	K-MEANS 3 de 5
	K-

In [166]:
tamanhos = []
for index, c in enumerate(clusters):
    tamanhos.append(c.count())
    print('cluster %s possui %s pontos' %(index + 1, tamanhos[index]))
#     c.saveAsTextFile('12threads_i5k10b3/%s' %index)
    
print('------------------------\ntotal possui %s pontos' %sum(tamanhos))

cluster 1 possui 429 pontos
cluster 2 possui 655 pontos
cluster 3 possui 177 pontos
cluster 4 possui 723 pontos
------------------------
total possui 1984 pontos
time: 383 ms
