In [40]:
import os
import networkx as nx
import community as community_louvain
import matplotlib.pyplot as plt
import random
import time
import pandas as pd
from typing import List, Dict, Tuple

In [58]:
def incarca_retea(cale_fisier: str) -> nx.Graph:
    """
    Incarca graful din fisierul GML specificat si il transforma intr-un graf simplu,
    eliminand eventualele muchii duplicate. Afiseaza numarul de noduri si muchii.
    """
    G = nx.read_gml(cale_fisier, label='id')
    G = nx.Graph(G)
    print(f"Graful {os.path.basename(cale_fisier)} are {G.number_of_nodes()} noduri și {G.number_of_edges()} muchii după curățare.")
    return G


In [59]:
def detectare_louvain(G: nx.Graph) -> Tuple[Dict[int,int], int, float]:
    """
    Ruleaza algoritmul Louvain pentru detectarea comunitatilor in graf.
    Returneaza partitia nodurilor, numarul de comunitati si timpul de executie.
    """
    start = time.time()
    partitie = community_louvain.best_partition(G)
    durata = time.time() - start
    n_comunitati = len(set(partitie.values()))
    print(f"Louvain: {n_comunitati} comunități găsite în {durata:.4f} secunde.")
    return partitie, n_comunitati, durata

In [60]:
def init_populatie(G: nx.Graph, dim_pop=50) -> List[List[int]]:
    """
    Genereaza o populatie initiala de cromozomi (solutii) random,
    fiecare cromozom reprezinta o partitie a nodurilor in comunitati.
    """
    n = G.number_of_nodes()
    return [[random.randint(0, n-1) for _ in range(n)] for _ in range(dim_pop)]

def fitness_modularitate(G: nx.Graph, cromozom: List[int]) -> float:
    """
    Calculeaza fitness-ul cromozomului ca modularitatea partitiilor
    in graf, masurand calitatea comunitatilor detectate.
    """
    mapping = {i: c for i, c in enumerate(cromozom)}
    return community_louvain.modularity(mapping, G)

def fitness_densitate(G: nx.Graph, cromozom: List[int]) -> float:
    """
    Calculeaza fitness-ul ca densitatea medie a subgrafurilor
    formate de comunitatile din cromozom.
    """
    comunitati = {}
    for i, c in enumerate(cromozom):
        comunitati.setdefault(c, []).append(i)
    densitati = []
    for noduri in comunitati.values():
        subg = G.subgraph(noduri)
        if subg.number_of_nodes() <= 1:
            densitati.append(0)
        else:
            densitati.append(nx.density(subg))
    return sum(densitati) / len(densitati)

def selectie_tournament(populatie: List[List[int]], scoruri: List[float], k=3) -> List[int]:
    """
    Selecteaza un cromozom din populatie folosind turneul competitiv:
    se alege cel mai bun din k indivizi random.
    """
    competitori = random.sample(range(len(populatie)), k)
    best_idx = max(competitori, key=lambda i: scoruri[i])
    return populatie[best_idx]

def crossover(p1: List[int], p2: List[int]) -> List[int]:
    """
    Realizeaza recombinarea prin selectie aleatorie pe fiecare gena
    din cromozom intre parintii p1 si p2.
    """
    return [random.choice([g1, g2]) for g1, g2 in zip(p1, p2)]

def mutatie(cromozom: List[int], rata_mut=0.1) -> List[int]:
    """
    Aplica mutatie cromozomului prin schimbarea aleatoare a genei
    cu o probabilitate rata_mut.
    """
    n = len(cromozom)
    for i in range(n):
        if random.random() < rata_mut:
            cromozom[i] = random.randint(0, n-1)
    return cromozom

def algoritm_genetic(
    G: nx.Graph,
    dim_pop=50,
    generatii=100,
    rata_mut=0.1,
    fitness_func=fitness_modularitate
) -> Tuple[Dict[int,int], int, float, float]:
    """
    Ruleaza algoritmul genetic pentru detectarea comunitatilor:
    - initializeaza populatia
    - evalueaza fitness-ul fiecarei solutii
    - aplica selectie, crossover si mutatie iterativ pe generatii
    - returneaza cea mai buna solutie gasita, numarul de comunitati,
      timpul de rulare si valoarea fitness-ului.
    """
    noduri = list(G.nodes())
    populatie = init_populatie(G, dim_pop)
    scoruri = [fitness_func(G, c) for c in populatie]
    best = max(zip(scoruri, populatie), key=lambda x: x[0])

    start = time.time()
    for gen in range(generatii):
        new_pop = []
        for _ in range(dim_pop):
            p1 = selectie_tournament(populatie, scoruri)
            p2 = selectie_tournament(populatie, scoruri)
            copil = crossover(p1, p2)
            copil = mutatie(copil, rata_mut)
            new_pop.append(copil)
        populatie = new_pop
        scoruri = [fitness_func(G, c) for c in populatie]
        current_best = max(zip(scoruri, populatie), key=lambda x: x[0])
        if current_best[0] > best[0]:
            best = current_best
        if gen % 10 == 0 or gen == generatii - 1:
            print(f"Generatia {gen}: fitness = {best[0]:.4f}")
    durata = time.time() - start

    cromozom_best = best[1]
    mapping_best = {noduri[i]: c for i, c in enumerate(cromozom_best)}
    n_comunitati = len(set(cromozom_best))
    print(f"Algoritm genetic: {n_comunitati} comunitati, timp {durata:.4f} sec, fitness {best[0]:.4f}")
    return mapping_best, n_comunitati, durata, best[0]

In [61]:
def afiseaza_comunitati(G: nx.Graph, partitie: Dict[int,int], titlu="Comunități detectate"):
    """
    Afiseaza grafic graful cu nodurile colorate dupa comunitatile din partitie.
    """
    culori = [partitie[nod] for nod in G.nodes()]
    plt.figure(figsize=(10,7))
    nx.draw_spring(G, node_color=culori, cmap=plt.cm.Set3, with_labels=True)
    plt.title(titlu)
    plt.show()

In [64]:
def ruleaza_pe_folder(
    folder: str,
    dim_pop=50,
    generatii=100,
    rata_mut=0.1,
    fitness_funcs=[fitness_modularitate],
    afisare_grafica=False
):
    """
    Proceseaza toate fisierele .gml dintr-un folder folosind Louvain si algoritm genetic,
    afiseaza rezultatele si returneaza un raport cu performantele.
    """
    fisiere = [f for f in os.listdir(folder) if f.endswith(".gml")]
    raport = []

    for f in fisiere:
        print(f"\nRețea: {f}")
        cale = os.path.join(folder, f)
        try:
            G = incarca_retea(cale)
        except Exception as e:
            print(f"EROARE la încărcarea rețelei {f}: {e}")
            continue 

        part_louvain, n_louvain, t_louvain = detectare_louvain(G)
        if afisare_grafica:
            afiseaza_comunitati(G, part_louvain, f"Louvain - {f}")

        for idx, fit_func in enumerate(fitness_funcs):
            print(f"Algoritm genetic cu fitness #{idx+1}: {fit_func.__name__}")
            try:
                part_ga, n_ga, t_ga, fit_ga = algoritm_genetic(G, dim_pop, generatii, rata_mut, fit_func)
            except Exception as e:
                print(f"EROARE la algoritmul genetic pe rețeaua {f} cu funcția {fit_func.__name__}: {e}")
                continue
            if afisare_grafica:
                afiseaza_comunitati(G, part_ga, f"GA {fit_func.__name__} - {f}")

            raport.append({
                "fisier": f,
                "metoda": "Louvain",
                "nr_comunitati": n_louvain,
                "timp": t_louvain,
                "fitness": None,
            })
            raport.append({
                "fisier": f,
                "metoda": f"GA_{fit_func.__name__}",
                "nr_comunitati": n_ga,
                "timp": t_ga,
                "fitness": fit_ga,
            })

    return raport


In [None]:
def ruleaza_si_salveaza_raport(folder1: str, folder2: str, fitness_funcs: list, dim_pop=50, generatii=100, rata_mut=0.1, afisare_grafica=True, nume_fisier="raport_complet_detectie_comunitati.csv"):
    """
    Ruleaza detectia comunitatilor pe retele din doua foldere diferite,
    combina rezultatele si le salveaza intr-un fisier CSV.
    """
    raport1 = ruleaza_pe_folder(
        folder=folder1,
        dim_pop=dim_pop,
        generatii=generatii,
        rata_mut=rata_mut,
        fitness_funcs=fitness_funcs,
        afisare_grafica=afisare_grafica
    )

    raport2 = ruleaza_pe_folder(
        folder=folder2,
        dim_pop=dim_pop,
        generatii=generatii,
        rata_mut=rata_mut,
        fitness_funcs=fitness_funcs,
        afisare_grafica=afisare_grafica
    )

    raport_total = raport1 + raport2
    df_raport = pd.DataFrame(raport_total)
    print(df_raport)

    df_raport.to_csv(nume_fisier, index=False)
    print(f"Raportul complet a fost salvat in '{nume_fisier}'")

folder_real = "/Users/ungur/OneDrive/Desktop/facultate_materiale/AN_2/SEM_2/ai/lab10/real"
folder_real2 = "/Users/ungur/OneDrive/Desktop/facultate_materiale/AN_2/SEM_2/ai/lab10/real2"
fitnessuri = [fitness_modularitate, fitness_densitate]

ruleaza_si_salveaza_raport(folder_real, folder_real2, fitnessuri)