# Sets iniciais do projeto.
Instalação de bibliotecas necessárias, imports e criação dos datasets de teste
| Data set | Classes | Dimension | Points |
|----------|---------|-----------|--------|
| g241c    | 2       | 241       | 1500   |
| g241d    | 2       | 241       | 1500   |
| Digit1   | 2       | 241       | 1500   |
| USPS     | 2       | 241       | 1500   |
| COIL     | 6       | 241       | 1500   |
| Text     | 2       | 11,960    | 1500   |


In [163]:
# Instalação das bibliotecas

# %pip install sslbookdata
# %pip install scikit-learn
# %pip install matplotlib
# %pip install networkx

In [164]:
# Importação das bibliotecas

from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import accuracy_score
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import sslbookdata
import matplotlib.pyplot as plt


In [165]:
# Upload dos datasets

datasets = {
    'USPS': sslbookdata.load_usps(0),
    'DIGIT': sslbookdata.load_digit1(0),
    'GC': sslbookdata.load_g241c(0),
    'GN': sslbookdata.load_g241n(0),
    # 'COIL': sslbookdata.load_coil2(0),
    # 'TEXT': sslbookdata.load_text(0)
}

In [166]:
# Função de PCA, redução da dimensionalidade
def analise_de_componente_principal(dataset, num_componentes_principais=50, plotar_imagem=False):
    scaler = StandardScaler()
    dados_normalizados = scaler.fit_transform(dataset)
    pca = PCA(n_components=num_componentes_principais)
    dados_reduzidos = pca.fit_transform(dados_normalizados)
    variancia_explicada_cumulativa = np.cumsum(pca.explained_variance_ratio_)
    if plotar_imagem:
        plt.plot(variancia_explicada_cumulativa)
        plt.xlabel('Número de Componentes Principais')
        plt.ylabel('Variância Explicada Cumulativa')
        plt.show()
    return dados_reduzidos

In [167]:
plotar_imagens = False

# Quais serão os passos da construção desse trabalho?
1) Geração de um grafo a partir do dataset
2) Difusão dos rótulos a partir do grafo
3) Resultados obtidos


# 1) Geração de um grafo a partir do dataset

O conceito abordado é a utilização dos dois primeiros passos do HDBSCAN* a fim de gerar uma árvore geradora mínima, esse é nosso objetivo

## Primeira etapa do HDBSCAN* - Computar core distance para todas instâncias do dataset em relação a um número mínimo de pontos

Conceito de Core Distance: Raio mínimo necessário para que uma instância qualquer X seja considerado um objeto core, tendo em vista um número mínimo de pontos próximos (incluindo o próprio ponto)

In [168]:
# Codigo que computa o core distance e retorna um array que contem os raios minimos 
# e as 3 instâncias que são as mais próximas de determinado ponto

def compute_core_distance(datasetToComputeDistance, min_samples):
    list_de_raios_minimos = []
    list_de_indices_vizinhos = []
    neighbors = NearestNeighbors(n_neighbors=min_samples).fit(datasetToComputeDistance)

    distances, indices = neighbors.kneighbors(datasetToComputeDistance)
    for i in range(len(datasetToComputeDistance)):
        list_de_raios_minimos.append(distances[i, -1])
        list_de_indices_vizinhos.append(indices[i])
    return list_de_raios_minimos, list_de_indices_vizinhos

In [169]:
# Print dos raios minimos, das 3 instancias mais proximas de cada ponto 
# e o dataset em 2d com os 3 pontos mais próximos da primeira instâncias

def plotGraphAndStats(dataset_name, datasetForPlot, listOfNeighboringIndices, listOfMinimumRadius):
    plt.figure()
    for i in range(1, len(datasetForPlot)):
        plt.scatter(datasetForPlot[i, 0], datasetForPlot[i, 1], c='b', marker='o', s=8)
    for i in listOfNeighboringIndices[0]:
        plt.scatter(datasetForPlot[i, 0], datasetForPlot[i, 1], c='r', marker='o', s=8)
    plt.scatter(datasetForPlot[0, 0], datasetForPlot[0, 1], c='r', marker='x', s=32, label='Instância 1')
    print(f'Raio mínimo para instância 1 ser um core: {listOfMinimumRadius[0]}')
    print(f'Lista das intâncias mais perto da instância 1: {listOfNeighboringIndices[0]}')
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.title(f'Instâncias do dataset {dataset_name}')
    plt.show()

In [170]:
def first_hdbscanstar_step(datasetTreino, numeroMinimoDePontos):
    lista_de_raios_minimos, listaDeIndicesVizinhos = compute_core_distance(datasetTreino, numeroMinimoDePontos)
    if plotar_imagens:
        plotGraphAndStats('USPS', datasetTreino, listaDeIndicesVizinhos, lista_de_raios_minimos)
    return lista_de_raios_minimos

## Segunda etapa do HDBSCAN* - Gerar uma árvore geradora mínima a partir do grafo de alcançabilidade mútua

Grafo de alcançabilidade mútua: grafo completo em que os pontos são os objetos do dataset e os pesos das areastas é a distância de alcançabilidade entre os pontos, dado os pontos x1 e x2 essa ditância é calculada pelo maior valor entre: CoreDistance(X1), CoreDistance(X2) e Distância(X1, X2)

In [171]:
# TEMOS QUE CRIAR UMA FUNÇÃO QUE CALCULA A DISTÂNCIA DE ALCANÇABILIDADE ENTRE DOIS PONTOS

def mutualReachabilityDistanceCalculation(datasetParaAlcancabilidade, numero_minimo_pontos, printarDistancias=False):
    distancia_alcancabilidade_mutua = []
    listaDeRaiosMinimos = first_hdbscanstar_step(datasetParaAlcancabilidade, numero_minimo_pontos)
    for i in range(len(datasetParaAlcancabilidade)):
        alcancabilidade_mutua = []
        for j in range(len(datasetParaAlcancabilidade)):
            if i != j:
                distancia = np.linalg.norm(datasetParaAlcancabilidade[i] - datasetParaAlcancabilidade[j])
                reachability_distance = max(distancia, listaDeRaiosMinimos[j])
                alcancabilidade_mutua.append(reachability_distance)
        distancia_alcancabilidade_mutua.append(alcancabilidade_mutua)

    if printarDistancias:
        print("Distância de Alcançabilidade Mútua:")
        for i in range(len(distancia_alcancabilidade_mutua)):
            for j in range(len(distancia_alcancabilidade_mutua[i])):
                print(f"Ponto {i} para Ponto {j}: {distancia_alcancabilidade_mutua[i][j]}")
                
    return distancia_alcancabilidade_mutua

In [172]:
# Criação de um grafo de alcançabilidade mutua

def createReachabilityGraph(mutual_reachability_distance):
    G = nx.Graph()
    num_instancias = len(mutual_reachability_distance)
    G.add_nodes_from(range(num_instancias))
    for i in range(len(mutual_reachability_distance)):
        for j in range(len(mutual_reachability_distance[i])):
            peso = mutual_reachability_distance[i][j]
            G.add_edge(i, j, weight=peso)
    return G


In [173]:
def plotWeightedGraph(graphForPlot, alreadyWithLabels=False):
    if alreadyWithLabels:
        node_colors = ['blue' if graphForPlot.nodes[node]['label'] == 1 else 'red' for node in graphForPlot.nodes()]
        blue_patch = plt.Line2D([0], [0], marker='o', color='w', label='Label 1', markerfacecolor='blue', markersize=10)
        red_patch = plt.Line2D([0], [0], marker='o', color='w', label='Label -1', markerfacecolor='red', markersize=10)
        plt.legend(handles=[blue_patch, red_patch])
    else:
        node_colors = 'lightblue'

    pos = nx.spring_layout(graphForPlot)
    labels = nx.get_edge_attributes(graphForPlot, 'weight')
    edge_labels = {e: f'{labels[e]:.2f}' for e in graphForPlot.edges}
    plt.figure(figsize=(20, 16))
    nx.draw(graphForPlot, pos, with_labels=False, node_size=40, node_color=node_colors, edge_color='green')
    nx.draw_networkx_edges(graphForPlot, pos)
    nx.draw_networkx_edge_labels(graphForPlot, pos, edge_labels=edge_labels, font_size=6)
    plt.title("Árvore Geradora Mínima")
    plt.show()

In [174]:
# Criação de uma árvore geradora minima a partir de um grafo

def createMinimumSpanningTree(baseGraph, printGraph=False):
    minimum_spanning_tree = nx.minimum_spanning_tree(baseGraph)
    if printGraph:
        plotWeightedGraph(minimum_spanning_tree)
    return minimum_spanning_tree


In [175]:
def graph_generation(datasetTreino, numero_minimo_de_pontos):
    distancia_alcancabilidade_mutua = mutualReachabilityDistanceCalculation(datasetTreino, numero_minimo_de_pontos)
    reachability_graph = createReachabilityGraph(distancia_alcancabilidade_mutua)
    mst = createMinimumSpanningTree(reachability_graph, plotar_imagens)
    return mst

# 2) Difusão dos rótulos a partir do grafo

Recebendo a MST do primeiro passo vamos usar um método de Gaussian Field Harmonic Function para propagar os 
rótulos

In [176]:
# Função para distribuir os rótulos no grafo

def assign_labels_to_graph(graph, labels, number_of_labels=10, assign_id=False, half_nodes=False):
    if half_nodes:
        positive_nodes = np.where(labels == 1)[0]
        negative_nodes = np.where(labels == -1)[0]

        random_positive_nodes = np.random.choice(positive_nodes, 5, replace=False)
        random_negative_nodes = np.random.choice(negative_nodes, 5, replace=False)
        random_nodes = np.concatenate([random_positive_nodes, random_negative_nodes])
    else:
        random_nodes = np.random.choice(graph.nodes, number_of_labels, replace=False)

    nx.set_node_attributes(graph, None, 'label')
    for node, label in zip(random_nodes, labels[random_nodes]):
        graph.nodes[node]['label'] = label
    if assign_id:
        nx.set_node_attributes(graph, dict(zip(graph.nodes, range(len(graph.nodes)))), 'ID')
    result_dict = {
        'random_nodes': random_nodes,
        'graph': graph,
        'labels': labels[random_nodes]
    }
    return result_dict

In [177]:
# Função que reordena as instâncias, colocando as rotuladas na frente

def reorder_nodes(graph, random_nodes):
    if any(node not in graph.nodes for node in random_nodes):
        raise ValueError("Um ou mais nós aleatórios não pertencem ao grafo.")
    remaining_nodes = [node for node in graph.nodes if node not in random_nodes]
    result_array = np.concatenate((random_nodes, remaining_nodes))
    return result_array

In [178]:
# Função do Gaussian Field Harmonic Function

def gfhf(ordemObjetos, yl, W):
  nRotulado = yl.shape[0]
  nNaoRotulado = W.shape[0]-nRotulado
  nObjetos = W.shape[0]

  D = np.zeros(W.shape)
  np.fill_diagonal(D, np.sum(W, axis=1))

  L= D - W

  L = L[ordemObjetos,:]
  L = L[:, ordemObjetos]

  matRotulado = L[0:nRotulado, 0:nRotulado]
  matNaoRotuladoRotulado = L[nRotulado:nObjetos, 0:nRotulado]
  matNaoRotulado = L[nRotulado:nObjetos, nRotulado:nObjetos]

  yl = np.asmatrix(yl)
  matNaoRotuladoRotulado = np.asmatrix(matNaoRotuladoRotulado)
  matNaoRotulado = np.asmatrix(matNaoRotulado)

  f= -np.linalg.inv(matNaoRotulado)*matNaoRotuladoRotulado*yl
  resultado = [0]*nObjetos

  for i in range(nRotulado):
    resultado[ordemObjetos[i]] = yl[i,0]

  ordemNaoRotulado = ordemObjetos[nRotulado:]

  for i in range(nNaoRotulado):
    resultado[ordemNaoRotulado[i]]= int(1*np.sign(f[i,0]))

  return resultado

In [179]:
def propagate_labels(mst, label_array, plotar_imagens=False):
    resultado_rotulado = assign_labels_to_graph(mst, label_array, number_of_labels=10, assign_id=True)
    random_nodes = sorted(resultado_rotulado['random_nodes'])
    
    ordem_objetos = reorder_nodes(mst, random_nodes)
    yl = resultado_rotulado['labels']
    # W = nx.linalg.graphmatrix.adjacency_matrix(mst).todense()
    W = nx.linalg.graphmatrix.adjacency_matrix(mst)

    for i in range(W.shape[0]):
        for j in range(W.shape[1]):
            if W[i, j] != 0:
                W[i, j] = 1


    resultado_GFHF = gfhf(ordem_objetos, yl, W)

    # if plotar_imagens:
        # print(f'Esses são os objetos rotulados: {random_nodes}')
        # np.savetxt('./txt-para-analise/ordem_objetos.txt', ordem_objetos, fmt='%d')
        # np.savetxt('./txt-para-analise/label_array.txt', label_array, fmt='%d')
        # np.savetxt('./txt-para-analise/yl.txt', yl, fmt='%d')
    # np.savetxt('./txt-para-analise/matriz_ajacencia.txt', W, fmt='%d')
        # np.savetxt('./txt-para-analise/resultado.txt', resultado_GFHF, fmt='%d')
    return resultado_GFHF

# 3) Resultados obtidos

In [180]:
numeroMinimoDePontos = 3
reduzir_dimensionalidade = False

def calcular_acuracia_media(dataset_treino, label_array, numero_minimo_de_pontos):
    acuracias = []
    for _ in range(5):
        primeiro_resultado = graph_generation(dataset_treino, numero_minimo_de_pontos)
        resultado_dos_rotulos = propagate_labels(primeiro_resultado, label_array)

        acuracia = accuracy_score(resultado_dos_rotulos, label_array)
        acuracias.append(acuracia)
    acuracia_media = sum(acuracias) / len(acuracias)
    return acuracia_media

for dataset_name, dataset_data in datasets.items():
    if reduzir_dimensionalidade and dataset_name != "TEXT":
        dataset_treino = analise_de_componente_principal(dataset_data['data'])
    dataset_treino = dataset_data['data']
    label_array = dataset_data['target']
    acuracia_media = calcular_acuracia_media(dataset_treino, label_array, numeroMinimoDePontos)
    error_rate = (1 - acuracia_media) * 100
    print(f"Average error rate for dataset {dataset_name}: {error_rate:.2f}")

# Como resolver Coil, 


Average error rate for dataset USPS: 28.84
Average error rate for dataset DIGIT: 55.04
Average error rate for dataset GC: 50.00
Average error rate for dataset GN: 49.60
