# Approximated Harmonic Centrality

### Libraries import & graph retrieval

In [1]:
import pickle               # to read the Amazon DiGraph
import networkx as nx
import pandas as pd
import numpy as np
import time
import gc

graph_path = "../data/processed/amazon_graph.pickle"
with open(graph_path, "rb") as f:
    G = pickle.load(f)

---

## Hyperball Algorithm

HyperBall è l'algoritmo che usa i contatori HLL per calcolare la centralità su tutto il grafo in modo efficiente. L'idea geniale è che non serve fare una visita (BFS) per ogni nodo. Si possono "muovere" i contatori.

Il Funzionamento Passo-Passo:
1. Inizializzazione ($t=0$): Ogni nodo ha un contatore HLL che contiene solo se stesso. Quindi ogni nodo sa di poter raggiungere 1 persona (se stesso) a distanza 0.
2. Iterazione ($t=1, 2, \dots$): Immagina che ogni nodo dica ai suoi vicini: "Ehi, questi sono tutti i nodi che io so raggiungere". In termini tecnici: il contatore HLL di un nodo $x$ viene aggiornato facendone l'unione con i contatori HLL dei suoi vicini. Grazie alla matematica degli HLL, l'unione è un'operazione velocissima (massimizzazione dei registri).
3. Stima delle Distanze: Al passo $t$, il contatore del nodo $x$ stima quanti nodi sono raggiungibili in $\le t$ passi (chiamiamo questa stima $N_t$). Al passo precedente ($t-1$), sapevi quanti erano raggiungibili in $\le t-1$ passi ($N_{t-1}$). La differenza $N_t - N_{t-1}$ ti dice approssimativamente quanti nodi sono esattamente a distanza $t$.
4. Calcolo della Centralità: Sapendo quanti nodi sono a distanza $t$, puoi aggiungere quel numero moltiplicato per $\frac{1}{t}$ al totale della tua Harmonic Centrality.

### Dettaglio del funzionamento di HyperBall
Quando hai pochi nodi (1, 10, 100), l'algoritmo funziona in modo diverso rispetto a quando ne hai milioni. Non guarda tanto l'altezza degli zeri, ma **quanti registri sono rimasti a zero**.

Immagina di avere **1024 secchi** (i registri) allineati in un campo sotto la pioggia.
Ogni nodo è una goccia di pioggia che cade in un secchio casuale (determinato dai primi 10 bit dell'hash).

* **1 Nodo (1 goccia):** Bagna 1 secchio. 1023 secchi sono asciutti (valore 0).
* **2 Nodi (2 gocce):** Bagnano probabilmente 2 secchi diversi. 1022 secchi asciutti.
* **100 Nodi (100 gocce):** Bagnano circa 90-95 secchi. ~930 secchi sono ancora asciutti.

**La matematica della Media Armonica ():**
La formula che usi (`sum(2**-M)`) è sensibilissima ai valori bassi.

* Se un registro è vuoto (valore 0), contribuisce alla somma con .
* Se un registro ha un valore alto (es. 10), contribuisce pochissimo ().

Se hai 1 nodo "fortunato" che finisce nel registro #5 con valore 10, ma gli altri 1023 registri sono vuoti (valore 0), la somma sarà:



*(Nota: senza una correzione specifica chiamata "Linear Counting", l'HLL puro sovrastima un po' i numeri piccolissimi, ma l'ordine di grandezza rimane contenuto grazie alla massa di registri a zero).*

Quindi: **Finché ci sono tanti registri a zero, la stima rimane bassa**, indipendentemente da quanto è "fortunato" quel singolo nodo.

### 2. La transizione ai Grandi Numeri

Man mano che i nodi aumentano, i secchi si riempiono tutti. Non ci sono più registri a zero.
A questo punto, l'algoritmo smette di basarsi sui "vuoti" e inizia a basarsi sulla **rarità**.

* Con 10.000 nodi, è probabile che in ogni secchio sia caduta almeno una goccia.
* Ora guardiamo **dentro** ogni secchio: qual è la goccia "più rara" (più leading zeros) che è caduta qui?

È qui che entra in gioco la tua intuizione: su 1024 registri, la media dei valori massimi salirà statisticamente in modo molto preciso all'aumentare dei nodi unici.

### 3. Perché la Media Armonica?

L'uso della media armonica (invece di quella aritmetica) serve proprio a **proteggersi dalla fortuna**.

* **Media Aritmetica:** Se su 1024 studenti, 1023 hanno 0 euro e uno ha 1 milione di euro, la media aritmetica dice che sono tutti ricchi (~1000€ a testa). Sbagliato.
* **Media Armonica:** È pesantemente influenzata dai valori bassi. Nello stesso esempio, la media armonica direbbe che la ricchezza è vicina a 0.

Nell'HyperBall:
Se un nodo ha un hash rarissimo (30 zeri) ma è l'unico nodo che ho incontrato, gli altri 1023 registri a zero "tirano giù" la stima con una forza enorme, impedendo che quel singolo evento fortunato faccia schizzare la cardinalità stimata a miliardi.

### 4. Precisione e Limiti

Non è *perfetto*. È una stima probabilistica.
Con  registri (), l'errore standard è circa:

Significa che se il numero reale è 100, l'algoritmo potrebbe dirti 97 o 103. Se è 1.000.000, potrebbe dirti 1.030.000.
Per calcolare la centralità in un grafo, questo errore è assolutamente accettabile perché ti interessa l'ordine di grandezza e la classifica relativa dei nodi, non il numero atomico esatto.


### Versione 1: calcolo su CPU usando dizionari e HyperLogLog

In [2]:
from datasketch import HyperLogLog
import copy

def harmonic_v1_CPU(G, p=10):
    # =========================================================================
    # FASE 1: PREPARAZIONE E INIZIALIZZAZIONE
    # =========================================================================
    # Per calcolare la centralità "in entrata" (quanto sono importante),
    # dobbiamo contare chi può raggiungere ME. HyperBall propaga "in avanti",
    # quindi lavoriamo sul grafo trasposto (archi invertiti).
    print(f"--- FASE 1: Inversione grafo e Inizializzazione HLL (p={p}) ---")
    G_rev = G.reverse()
    nodes = list(G_rev.nodes())

    # =========================================================================
    # FASE 2: COSTRUZIONE DELLA MATRICE DI CONTATORI
    # =========================================================================

    # Dizionario per i contatori attuali: {nodo: HyperLogLog}
    # Inizialmente ogni nodo conosce solo se stesso (distanza 0).
    counters = {}
    for node in nodes:
        hll = HyperLogLog(p=p)
        # HLL richiede input in bytes. Convertiamo l'ID del nodo.
        node_id_encoded = str(node).encode('utf-8')
        # Aggiungo il nodo stesso al contatore (inizialmente l'insieme dei nodi raggiungibili contiene solo se stesso
        hll.update(node_id_encoded)
        # Aggiungo il contatore al dizionario che associa ogni nodo ad un contatore HyperLogLog
        counters[node] = hll

    # Dizionari per memorizzare i risultati
    # per ogni nodo in nodes, aggiungi al dizionario la coppia chiave-valore (node:0.0)
    harmonic_centrality = {node: 0.0 for node in nodes}

    # Memorizziamo la cardinalità al passo precedente (N_{t-1}).
    # Al tempo t=0, ogni nodo raggiunge solo se stesso, quindi count = 1.
    prev_cardinality = {node: 1.0 for node in nodes}

    print("Inizializzazione completata.")

    # =========================================================================
    # FASE 2: LOOP PRINCIPALE (Espansione della 'Palla') [cite: 771, 778]
    # =========================================================================
    # Iteriamo per t = 1, 2, ... fino a che le stime non cambiano più (stabilizzazione).
    # t rappresenta la distanza (raggiungibili in t passi).
    t = 0
    changed = True

    while changed:
        t += 1
        changed = False
        print(f"--- Inizio Iterazione t={t} ---")
        start_time = time.time()

        # Buffer per i nuovi contatori del passo t
        next_counters = {}

        # =====================================================================
        # FASE 3: UNIONE DEI CONTATORI (Propagazione) [cite: 775, 780]
        # =====================================================================
        # Logica: I nodi che posso raggiungere in t passi sono l'unione di:
        # 1. Quelli che raggiungevo già (me stesso e i vecchi percorsi)
        # 2. Quelli che raggiungono i miei vicini al passo t-1.

        for node in nodes:
            # Copiamo il contatore attuale del nodo (stato t-1)
            # NOTA: copy è necessario perché HLL è mutabile
            hll_new = copy.copy(counters[node])

            # Uniamo con i contatori dei vicini (successori nel grafo trasposto, G_rev.neighbors() = G.successors())
            # Questo simula il passaggio di informazioni "indietro" nel grafo originale
            neighbors = list(G_rev.neighbors(node))

            if neighbors:
                for neighbor in neighbors:
                    # L'operazione di unione HLL è molto veloce (bit-wise OR dei registri)
                    hll_new.merge(counters[neighbor])

            # Salviamo il nuovo stato
            next_counters[node] = hll_new

        # =====================================================================
        # FASE 4: AGGIORNAMENTO METRICA (Calcolo Harmonic)
        # =====================================================================
        # Formula: H(x) = sum [ (N_t - N_{t-1}) / t ]
        # Dove (N_t - N_{t-1}) è la stima dei nodi trovati ESATTAMENTE a distanza t.

        current_change_count = 0

        for node in nodes:
            old_count = prev_cardinality[node]
            new_count = next_counters[node].count()

            # Se la stima è aumentata, abbiamo trovato nuovi nodi a distanza t
            if new_count > old_count:
                delta = new_count - old_count

                # Aggiungiamo il contributo alla centralità armonica
                harmonic_centrality[node] += delta * (1.0 / t)

                # Aggiorniamo la memoria per il prossimo passo
                prev_cardinality[node] = new_count

                # Segnaliamo che c'è stato un cambiamento nel sistema
                changed = True
                current_change_count += 1

        # =====================================================================
        # FASE 5: CHIUSURA ITERAZIONE E CONTROLLO CONVERGENZA [cite: 833]
        # =====================================================================
        # Scambiamo i buffer: i contatori 'next' diventano quelli attuali per il prossimo t
        counters = next_counters

        elapsed = time.time() - start_time
        print(f"Fine t={t}. Nodi aggiornati: {current_change_count}. Tempo: {elapsed:.2f}s")

        # Sicurezza per evitare loop infiniti in grafi patologici,
        # anche se HLL converge tipicamente entro il diametro effettivo del grafo.
        if t > 1000: # Cutoff arbitrario, aumentabile
            print("Raggiunto limite massimo iterazioni.")
            break

    return pd.DataFrame(data = list(harmonic_centrality.items()),
                        columns=['ASIN', 'HarmonicCentrality'])

---

### Versione 2: CPU (matrici numPy e pre-allocazione buffer)

In [3]:
import hashlib

def harmonic_v2_CPU(G, p=10):

    # DEFINIZIONE STRUTTURE BASE PER TRATTARE IL GRAFO NETWORKX
    print(f"--- FASE 1: Setup CPU e Hashing ---")

    G_rev = G.reverse()
    nodes = list(G_rev.nodes())
    n_nodes = len(nodes)

    m = 1 << p
    """
    Numero di registri che compongono ciascun contatore: m = 2^p
    """

    node_to_idx = {node: i for i, node in enumerate(nodes)}
    """
    Mappatura nodo -> indice 0..N-1: dizionario del tipo (k, v) = (node, i), con i risultato dell'enumerazione dell' array di nodi
    """

    edges = np.array([(node_to_idx[u], node_to_idx[v]) for u, v in G_rev.edges()], dtype=np.int32)
    """
    Array di coppie (u_index, v_index), una per ogni edge del tipo (u, v) in G_rev
    """

    # DEFINIZIONE MATRICE DEI CONTATORI
    M_A = np.zeros((n_nodes, m), dtype=np.uint8)
    """
    Matrice [n_nodes x m] dei contatori;
    Dato che un registro deve contenere il numero di leading zeroes di un hash (64 bits), il massimo valore inseribile in ciascun registro sara' < 64 (dato che i primi b bits dell' hash servono a individuare il registro corretto tra gli m), quindi 1 byte (uint8) e' sufficiente.
    """

    # CALCOLO DEGLI HASH
    print("Calcolo hash iniziali su CPU...")
    for i, node in enumerate(nodes):
        # Hash del nodo (crea hash con algoritmo md5 e trasforma il risultato in stringa hex, poi converte in intero a partire da base 16 verso base 10)
        h = int(hashlib.md5(str(node).encode('utf8')).hexdigest(), 16)

        # AND binario tra l' hash h e il numero m-1 = (2^10 - 1) = 1023 = sequenza di zeri seguiti da 10 valori '1'
        # il risultato corrisponde agli ultimi 10 bit di h, che selezionano il registro in cui scrivere il numero di leading zeroes
        j = h & (m - 1)

        # Right shift per rimuovere da h gli ultimi 10 bit estratti in precedenza
        w = h >> p

        # Conteggio del numero di trailing zeroes della porzione di hash rimanente (+1)
        rho = 1
        while (w & 1) == 0 and rho < 32: # while l'ultimo bit di w è uno '0':
            w >>= 1
            rho += 1
        M_A[i, j] = rho

    # STRUTTURE DATI HYPERBALL E PREALLOCAZIONE
    M_B = M_A.copy()
    M_float = np.empty((n_nodes, m), dtype=np.float32)
    sources = edges[:, 1] # Array monodimensionale ottenuto prendendo SOLO (tutta) la colonna 1 di edges
    targets = edges[:, 0] # Array monodimensionale ottenuto prendendo SOLO (tutta) la colonna 0 di edges

    # Coppia di arrays per memorizzare le cardinalità al passo t-1 e t
    # Inizialmente ogni nodo ha cardinalità 1 (se stesso)
    prev_cardinality = np.ones(n_nodes, dtype=np.float32)
    harmonic_centrality = np.zeros(n_nodes, dtype=np.float32)

    alpha_m = 0.7213 / (1 + 1.079 / m)
    factor = alpha_m * (m ** 2)

    # PULIZIA RAM
    del G_rev, edges
    gc.collect()

    print("Dati pronti. Inizio loop CPU.")


    # -------------------------------------------------- #
    # LOOP PRINCIPALE HYPERBALL                          #
    # -------------------------------------------------- #
    t = 0
    changed = True

    while changed:
        # Time
        start_time = time.time()

        t += 1
        changed = False

        m_src = M_A if t % 2 == 1 else M_B
        m_target = M_B if t % 2 == 1 else M_A

        # PROPAGAZIONE IN AVANTI DEI CONTATORI
        m_target[:] = m_src[:]
        if len(sources) > 0:
            np.maximum.at(m_target, targets, m_src[sources])

        # Conversione in float sfruttando la memoria già allocata (M_float)
        np.copyto(M_float, m_target, casting='safe')

        # PRIMA STIMA - MEDIA ARMONICA (in-place)
        np.multiply(M_float, -1.0, out=M_float)
        np.power(2.0, M_float, out=M_float)
        sum_regs = np.sum(M_float, axis=1)
        estimate_raw = factor / sum_regs

        # CORREZIONE DELLA STIMA TRAMITE LINEAR COUNTING
        estimate = estimate_raw.copy()
        num_zeros = np.sum(m_target == 0, axis=1) # conta quanti registri sono rimasti a 0 per ciascun nodo
        threshold = 2.5 * m
        mask_lc = (estimate_raw <= threshold) & (num_zeros > 0) # Maschera booleana che determina quali nodi devono usare la correzione linear counting

        if np.any(mask_lc):
            # Formula: m * log(m / V)
            # Calcoliamo solo per i nodi nella maschera
            V = num_zeros[mask_lc].astype(np.float32)
            estimate[mask_lc] = m * np.log(m / V)

        # VERIFICA MODIFICHE RISPETTO ALLA STIMA DI CARDINALITA' PRECEDENTE
        diff = estimate - prev_cardinality
        mask_changed = diff > 0.001

        if np.any(mask_changed):
            changed = True
            harmonic_centrality[mask_changed] += diff[mask_changed] * (1.0 / t)
            prev_cardinality[mask_changed] = estimate[mask_changed]

        # Print finale
        elapsed = time.time() - start_time
        active_nodes = int(np.sum(mask_changed))
        print(f"Fine t={t}. Tempo CPU: {elapsed:.4f}s. Nodi attivi: {active_nodes}")

        if t > 1000:
            break

    # =========================================================================
    # FASE 4: RECUPERO RISULTATI
    # =========================================================================
    print("Calcolo finito. Restituzione dati...")
    return pd.DataFrame({
        'ASIN': nodes,
        'HarmonicCentrality': harmonic_centrality
    })


In [4]:
from numba import njit, prange

# [NUOVA] Funzione compilata con Numba per la propagazione ultra-veloce.
# L'uso di prange e parallel=True permette di usare tutti i core della CPU.
@njit(parallel=True, cache=True)
def numba_propagate(sources, targets, m_src, m_target, n_edges, m):
    # Inizializziamo il target con i valori correnti del sorgente (in-place)
    # Questa parte è seriale ma veloce in Numba
    for i in range(m_target.shape[0]):
        for j in range(m):
            m_target[i, j] = m_src[i, j]

    # Propagazione dei massimi attraverso gli archi
    # Parallelizziamo sugli archi per massimizzare il throughput
    for e in prange(n_edges):
        u = sources[e]
        v = targets[e]
        for j in range(m):
            # Operazione di massimo: se il registro del vicino è maggiore, aggiorna
            if m_src[u, j] > m_target[v, j]:
                m_target[v, j] = m_src[u, j]

def harmonic_v2_CPU_optimized(G, p=10):
    print(f"--- FASE 1: Setup CPU e Hashing ---")
    G_rev = G.reverse()
    nodes = list(G_rev.nodes())
    n_nodes = len(nodes)
    m = 1 << p

    node_to_idx = {node: i for i, node in enumerate(nodes)}
    edges = np.array([(node_to_idx[u], node_to_idx[v]) for u, v in G_rev.edges()], dtype=np.int32)

    # Usiamo uint8 (1 byte) per risparmiare RAM
    M_A = np.zeros((n_nodes, m), dtype=np.uint8)

    print("Calcolo hash iniziali...")
    for i, node in enumerate(nodes):
        h = int(hashlib.md5(str(node).encode('utf8')).hexdigest(), 16)
        j = h & (m - 1)
        w = h >> p
        rho = 1
        while (w & 1) == 0 and rho < 32:
            w >>= 1
            rho += 1
        M_A[i, j] = rho

    print(f"--- FASE 2: Pre-allocazione Buffer ---")
    M_B = M_A.copy()
    M_float = np.empty((n_nodes, m), dtype=np.float32)

    sources = edges[:, 1]
    targets = edges[:, 0]
    n_edges = len(sources)

    prev_cardinality = np.ones(n_nodes, dtype=np.float32)
    harmonic_centrality = np.zeros(n_nodes, dtype=np.float32)
    factor = (0.7213 / (1 + 1.079 / m)) * (m ** 2)

    del G_rev, edges
    gc.collect()

    # =========================================================================
    # FASE 3: LOOP PRINCIPALE
    # =========================================================================
    t = 0
    changed = True

    while changed:
        t += 1
        changed = False
        start_time = time.time()

        m_src = M_A if t % 2 == 1 else M_B
        m_target = M_B if t % 2 == 1 else M_A

        # [MODIFICATA] Sostituito np.maximum.at con la funzione Numba.
        # Questo elimina la creazione di matrici temporanee e usa il parallelismo.
        if n_edges > 0:
            numba_propagate(sources, targets, m_src, m_target, n_edges, m)
        else:
            m_target[:] = m_src[:]

        # Conversione in float in-place
        np.copyto(M_float, m_target, casting='safe')

        # Calcolo potenze in-place
        np.multiply(M_float, -1.0, out=M_float)
        np.power(2.0, M_float, out=M_float)

        sum_regs = np.sum(M_float, axis=1)
        estimate_raw = factor / sum_regs

        # Conteggio zeri e correzione Linear Counting
        num_zeros = np.sum(m_target == 0, axis=1)
        estimate = estimate_raw.copy()

        mask_lc = (estimate_raw <= 2.5 * m) & (num_zeros > 0)
        if np.any(mask_lc):
            V = num_zeros[mask_lc].astype(np.float32)
            estimate[mask_lc] = m * np.log(m / V)

        # Verifica modifiche
        diff = estimate - prev_cardinality
        mask_changed = diff > 0.001

        if np.any(mask_changed):
            changed = True
            harmonic_centrality[mask_changed] += diff[mask_changed] * (1.0 / t)
            prev_cardinality[mask_changed] = estimate[mask_changed]

        elapsed = time.time() - start_time
        print(f"t={t} | Tempo CPU: {elapsed:.2f}s | Nodi attivi: {int(np.sum(mask_changed))}")

        if t > 1000: break

    return pd.DataFrame({'ASIN': nodes, 'HarmonicCentrality': harmonic_centrality})

---

### Versione 3: GPU (matrici cuPy e pre-allocazione buffer)

In [5]:
import cupy as cp
import hashlib
import gc

def harmonic_v3_GPU(G, p=10):
    # =========================================================================
    # FASE 1: SETUP SU CPU
    # =========================================================================
    print(f"--- FASE 1: Setup CPU e Hashing ---")

    G_rev = G.reverse()
    nodes = list(G_rev.nodes())
    n_nodes = len(nodes)

    m = 1 << p
    """
    Numero di registri che compongono ciascun contatore: m = 2^p
    """

    node_to_idx = {node: i for i, node in enumerate(nodes)}
    """
    Mappatura nodo -> indice 0..N-1: dizionario del tipo (k, v) = (node, i), con i risultato dell'enumerazione dell' array di nodi
    """

    edges = np.array([(node_to_idx[u], node_to_idx[v]) for u, v in G_rev.edges()], dtype=np.int32)
    """
    Array di coppie (u_index, v_index), una per ogni edge del tipo (u, v) in G_rev
    """

    M_cpu = np.zeros((n_nodes, m), dtype=np.int32)
    """
    Matrice [n_nodes x m] dei contatori;
    Dato che un registro deve contenere il numero di leading zeroes di un hash (64 bits), il massimo valore inseribile in ciascun registro sara' < 64 (dato che i primi b bits dell' hash servono a individuare il registro corretto tra gli m), quindi 1 byte (uint8) e' sufficiente.
    """

    print("Calcolo hash iniziali su CPU...")

    # Pre-calcolo hash per ogni nodo per inizializzare M
    for i, node in enumerate(nodes):
        # Hash del nodo (crea hash con algoritmo md5 e trasforma il risultato in stringa hex, poi converte in intero a partire da base 16 verso base 10)
        h = int(hashlib.md5(str(node).encode('utf8')).hexdigest(), 16)

        # AND binario tra l' hash h e il numero m-1 = (2^10 - 1) = 1023 = sequenza di zeri seguiti da 10 valori '1'
        # il risultato corrisponde agli ultimi 10 bit di h, che selezionano il registro in cui scrivere il numero di leading zeroes
        j = h & (m - 1)

        # Right shift per rimuovere da h gli ultimi 10 bit estratti in precedenza
        w = h >> p

        # Conteggio del numero di trailing zeroes della porzione di hash rimanente (+1)
        rho = 1
        while (w & 1) == 0 and rho < 32: # while l'ultimo bit di w è uno '0':
            w >>= 1
            rho += 1
        M_cpu[i, j] = rho

    # =========================================================================
    # FASE 2: TRASFERIMENTO SU GPU
    # =========================================================================
    print(f"--- FASE 2: Spostamento dati su GPU, pre-allocazione ---")
    M_A = cp.asarray(M_cpu)         # Matrice corrente
    M_B = M_A.copy()                # Matrice per il prossimo passo (Double Buffering)
    M_float = cp.empty_like(M_A, dtype=cp.float32) # Buffer per i calcoli float

    if len(edges) > 0:
        sources_gpu = cp.asarray(edges[:, 1])
        """ Array monodimensionale ottenuto prendendo SOLO (tutta) la colonna 1 di edges """
        targets_gpu = cp.asarray(edges[:, 0])
        """ Array monodimensionale ottenuto prendendo SOLO (tutta) la colonna 0 di edges """
    else:
        sources_gpu = cp.array([], dtype=cp.int32)
        targets_gpu = cp.array([], dtype=cp.int32)

    # Coppia di arrays per memorizzare le cardinalità al passo t-1 e t
    # Inizialmente ogni nodo ha cardinalità 1 (se stesso)
    prev_cardinality_gpu = cp.ones(n_nodes, dtype=cp.float32)
    harmonic_centrality_gpu = cp.zeros(n_nodes, dtype=cp.float32)

    alpha_m = 0.7213 / (1 + 1.079 / m)
    factor = alpha_m * (m ** 2)

    del M_cpu, edges
    gc.collect()

    print(f"Dati pronti. VRAM inizialmente impegnata: ~{((M_A.nbytes * 3) / 1024**2):.2f} MB")

    # =========================================================================
    # FASE 3: LOOP PRINCIPALE SU GPU
    # =========================================================================
    t = 0
    changed = True

    while changed:
        t += 1
        changed = False

        # Time
        cp.cuda.Stream.null.synchronize()
        start_time = time.time()


        m_src = M_A if t % 2 == 1 else M_B
        m_target = M_B if t % 2 == 1 else M_A


        # PROPAGAZIONE IN AVANTI DEI CONTATORI
        # se il numero di archi e' > 0, calcola il massimo tra il registro del nodo sorgente e i registri dei nodi target
        m_target[:] = m_src[:]
        if len(sources_gpu) > 0:
            # Aggiorna i nodi target con il massimo
            cp.maximum.at(m_target, targets_gpu, m_src[sources_gpu])

        # CONVERSIONI DELLA MATRICE DEI CONTATORI
        M_float[:] = m_target.astype(cp.float32)

        # PRIMA STIMA - MEDIA ARMONICA
        cp.multiply(M_float, -1.0, out=M_float)
        cp.exp2(M_float, out=M_float)
        sum_regs = cp.sum(M_float, axis=1)
        estimate_raw = factor / sum_regs

        # CORREZIONE DELLA STIMA TRAMITE LINEAR COUNTING
        num_zeros = cp.sum(m_target == 0, axis=1) # conta quanti registri sono rimasti a 0 per ciascun nodo (dato che True e' considerato 1)
        estimate = estimate_raw.copy()

        threshold = 2.5 * m
        # Maschera booleana che determina quali nodi devono usare la correzione linear counting. un elemento del vettore e' true solo se entrambe le condizioni sono verificate (AND bitwise)
        mask_lc = (estimate_raw <= threshold) & (num_zeros > 0)

        if cp.any(mask_lc):
            # Formula: m * log(m / V)
            # Calcoliamo solo per i nodi nella maschera
            V = num_zeros[mask_lc].astype(cp.float32)
            estimate[mask_lc] = m * cp.log(m / V)

        # VERIFICA MODIFICHE RISPETTO ALLA STIMA DI CARDINALITA' PRECEDENTE
        diff = estimate - prev_cardinality_gpu
        mask_changed = diff > 0.001

        if cp.any(mask_changed):
            changed = True
            harmonic_centrality_gpu[mask_changed] += diff[mask_changed] * (1.0 / t)
            prev_cardinality_gpu[mask_changed] = estimate[mask_changed]

        # Time
        cp.cuda.Stream.null.synchronize()
        elapsed = time.time() - start_time

        active_nodes = int(cp.sum(mask_changed))
        print(f"Fine t={t}. Tempo GPU: {elapsed:.4f}s. Nodi attivi: {active_nodes}")

        if t > 1000:
            break

    # =========================================================================
    # FASE 4: RECUPERO RISULTATI
    # =========================================================================
    print("Calcolo terminato. Recupero dati dalla GPU...")
    return pd.DataFrame({
        'ASIN': nodes,
        'HarmonicCentrality': harmonic_centrality_gpu.get()
    })

In [6]:
import cupy as cp
import hashlib
import gc

# Definiamo il Kernel CUDA per la propagazione senza copie di memoria
# Questo kernel evita la creazione della matrice temporanea da 4.5GB
propagation_kernel = cp.RawKernel(r'''
extern "C" __global__
void propagate_max(const int* sources, const int* targets, const int* src_matrix, int* target_matrix, int n_edges, int m) {
    int edge_idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (edge_idx < n_edges) {
        int u_idx = sources[edge_idx];
        int v_idx = targets[edge_idx];

        for (int j = 0; j < m; j++) {
            // Esegue l'atomicMax per ogni registro tra il nodo sorgente e il target
            atomicMax(&target_matrix[v_idx * m + j], src_matrix[u_idx * m + j]);
        }
    }
}
''', 'propagate_max')

def harmonic_v3_GPU_optimized(G, p=10):
    print(f"--- FASE 1: Setup CPU e Hashing ---")
    G_rev = G.reverse()
    nodes = list(G_rev.nodes())
    n_nodes = len(nodes)
    m = 1 << p

    node_to_idx = {node: i for i, node in enumerate(nodes)}
    edges = np.array([(node_to_idx[u], node_to_idx[v]) for u, v in G_rev.edges()], dtype=np.int32)

    M_cpu = np.zeros((n_nodes, m), dtype=np.int32)
    for i, node in enumerate(nodes):
        h = int(hashlib.md5(str(node).encode('utf8')).hexdigest(), 16)
        j = h & (m - 1)
        w = h >> p
        rho = 1
        while (w & 1) == 0 and rho < 32:
            w >>= 1
            rho += 1
        M_cpu[i, j] = rho

    print(f"--- FASE 2: Trasferimento e Pre-allocazione ---")
    M_A = cp.asarray(M_cpu)
    M_B = M_A.copy()
    M_float = cp.empty_like(M_A, dtype=cp.float32)

    if len(edges) > 0:
        # Nota: usiamo i nomi originali ma la logica del kernel
        s_gpu = cp.asarray(edges[:, 1]) # sorgenti degli archi
        t_gpu = cp.asarray(edges[:, 0]) # target degli archi
        n_edges = len(edges)
    else:
        n_edges = 0

    prev_cardinality_gpu = cp.ones(n_nodes, dtype=cp.float32)
    harmonic_centrality_gpu = cp.zeros(n_nodes, dtype=cp.float32)
    alpha_m = 0.7213 / (1 + 1.079 / m)
    factor = alpha_m * (m ** 2)

    del M_cpu, edges, G_rev
    gc.collect()

    # =========================================================================
    # FASE 3: LOOP PRINCIPALE
    # =========================================================================
    t = 0
    changed = True

    # Configuriamo i blocchi per il Kernel (128 thread per blocco è uno standard bilanciato)
    threads_per_block = 128
    grid_size = (n_edges + threads_per_block - 1) // threads_per_block

    print(f"Inizio loop. VRAM statica: ~{((M_A.nbytes * 3) / 1024**2):.2f} MB. No extra allocs.")

    while changed:
        t += 1
        changed = False
        cp.cuda.Stream.null.synchronize()
        start_time = time.time()

        m_src = M_A if t % 2 == 1 else M_B
        m_target = M_B if t % 2 == 1 else M_A

        # 1. PROPAGAZIONE (Kernel personalizzato: ZERO copie extra)
        m_target[:] = m_src[:]
        if n_edges > 0:
            propagation_kernel((grid_size,), (threads_per_block,),
                               (s_gpu, t_gpu, m_src, m_target, n_edges, m))

        # 2. STIMA (Operazioni in-place)
        M_float[:] = m_target.astype(cp.float32)
        cp.multiply(M_float, -1.0, out=M_float)
        cp.exp2(M_float, out=M_float)
        sum_regs = cp.sum(M_float, axis=1)
        estimate_raw = factor / sum_regs

        # 3. CORREZIONE LC
        num_zeros = cp.sum(m_target == 0, axis=1)
        estimate = estimate_raw.copy()
        mask_lc = (estimate_raw <= 2.5 * m) & (num_zeros > 0)
        if cp.any(mask_lc):
            V = num_zeros[mask_lc].astype(cp.float32)
            estimate[mask_lc] = m * cp.log(m / V)

        # 4. AGGIORNAMENTO
        diff = estimate - prev_cardinality_gpu
        mask_changed = diff > 0.001
        if cp.any(mask_changed):
            changed = True
            harmonic_centrality_gpu[mask_changed] += diff[mask_changed] * (1.0 / t)
            prev_cardinality_gpu[mask_changed] = estimate[mask_changed]

        cp.cuda.Stream.null.synchronize()
        elapsed = time.time() - start_time
        print(f"t={t} | Tempo: {elapsed:.4f}s | Nodi Attivi: {int(cp.sum(mask_changed))}")

        if t > 1000: break

    # Pulizia
    del M_A, M_B, M_float, s_gpu, t_gpu
    cp.get_default_memory_pool().free_all_blocks()
    gc.collect()

    return pd.DataFrame({'ASIN': nodes, 'HarmonicCentrality': harmonic_centrality_gpu.get()})

---

# Calcolo Harmonic Centrality con metodo HyperBall

In [7]:
if __name__ == "__main__":
    # Compute Harmonic scores and save them to a csv file
    df_harmonic_scores = harmonic_v3_GPU_optimized(G)
    df_harmonic_scores.to_csv("../data/processed/harm_scores.csv", index=False)
    # cp.get_default_memory_pool().free_all_blocks()

    display(df_harmonic_scores.head(5))
    print(f"Computed hc for {len(df_harmonic_scores)} nodes.")

--- FASE 1: Setup CPU e Hashing ---
--- FASE 2: Trasferimento e Pre-allocazione ---
Inizio loop. VRAM statica: ~3923.94 MB. No extra allocs.
t=1 | Tempo: 0.2792s | Nodi Attivi: 212737
t=2 | Tempo: 0.2788s | Nodi Attivi: 177200
t=3 | Tempo: 0.2859s | Nodi Attivi: 151752
t=4 | Tempo: 0.2515s | Nodi Attivi: 135309
t=5 | Tempo: 0.2514s | Nodi Attivi: 123841
t=6 | Tempo: 0.2457s | Nodi Attivi: 115746
t=7 | Tempo: 0.2423s | Nodi Attivi: 109974
t=8 | Tempo: 0.2402s | Nodi Attivi: 105901
t=9 | Tempo: 0.2386s | Nodi Attivi: 102913
t=10 | Tempo: 0.2309s | Nodi Attivi: 100695
t=11 | Tempo: 0.2286s | Nodi Attivi: 98941
t=12 | Tempo: 0.2308s | Nodi Attivi: 97592
t=13 | Tempo: 0.2323s | Nodi Attivi: 96577
t=14 | Tempo: 0.2294s | Nodi Attivi: 95811
t=15 | Tempo: 0.2275s | Nodi Attivi: 95206
t=16 | Tempo: 0.2277s | Nodi Attivi: 94746
t=17 | Tempo: 0.2262s | Nodi Attivi: 94336
t=18 | Tempo: 0.2261s | Nodi Attivi: 93999
t=19 | Tempo: 0.2288s | Nodi Attivi: 93733
t=20 | Tempo: 0.2254s | Nodi Attivi: 9354

Unnamed: 0,ASIN,HarmonicCentrality
0,827229534,10890.977539
1,738700797,9056.976562
2,842328327,2.840921
3,1577943082,5.111011
4,486220125,0.0


Computed hc for 334843 nodes.
