In [430]:
from typing import List, Tuple, Set, Dict

# Definizione del tipo Hypothesis come tupla di interi
Hypothesis = Tuple[int, ...]


def empty_hypothesis(n: int):
    """Crea un'ipotesi vuota di lunghezza n."""
    return tuple([0]*n)

In [431]:
def successore_immediato(h: Hypothesis):
    """
    Restituisce la lista di tutte le ipotesi che 
    differiscono da h per un solo bit (da 0 a 1)
    :param h: ipotesi di partenza
    :return: lista di ipotesi successori
    """
    successori = []
    h_list = list(h)
    for i in range(len(h_list)):
        if h_list[i] == 0:
            nuovo = h_list.copy()
            nuovo[i] = 1
            successori.append(nuovo)
    return successori

In [432]:
def predecessore_immediato(h: Hypothesis):
    """
    Il numero di predecessori immediati corrisponde al numero di 1 in h
    :param h: ipotesi di partenza
    :return: lista di ipotesi predecessori
    """
    predecessori = []
    for i in reversed(range(len(h))):  # Scorri gli indici al contrario
        if h[i] == 1:
            nuovo = list(h)
            nuovo[i] = 0
            predecessori.append(tuple(nuovo))
    return predecessori

In [433]:
def SET_FIELDS(h):
    """Metodo per impostare il campo vector(h) di un'ipotesi h.
    :param h: ipotesi di cui si vuole impostare il campo vector
    """
    ipotesi_vuota = empty_hypothesis(len(h))
    successori = successore_immediato(ipotesi_vuota)
    if list(h) in successori:
        vector_h = list(h)
    else:
        vector_h = [0] * len(h)
    return vector_h

In [434]:
def PROPAGATE(h: Hypothesis, h1: Hypothesis):
    """
    Propaga le informazioni dall'ipotesi h1 all'ipotesi h.
    :param h: ipotesi di partenza
    :param h1: ipotesi da cui propagare le informazioni
    :return: nuova ipotesi risultante dalla propagazione
    """
    risultato = tuple(x | y for x, y in zip(h, h1))

    return risultato

In [435]:
def LM1(h: Hypothesis) -> int:
    """
    Restituisce la posizione (indice) del primo 1 nella tupla h.
    Se non c'è nessun 1, restituisce -1.
    """
    try:
        return h.index(1)+1
    except ValueError:
        return -1

In [436]:
def globalInitial(h: Hypothesis):
    """
    Metodo che restituisce l'ipotesi iniziale globale.
    :param h: ipotesi di partenza
    :return: ipotesi iniziale globale
    """
    h_list = list(h)
    if not all(x == 0 for x in h_list) and h_list[0] == 0:
        h_list[0] = 1
        for i in range(len(h_list)-1, 0, -1):
            if h_list[i] == 1:
                h_list[i] = 0
                break
        return tuple(h_list)
    return tuple(h_list)

In [437]:
def initial(h: Hypothesis, h1: Hypothesis):
    """
    Restituisce il predecessore immediato di h1
    più a sinistra tra tutti i predecessori immediati di h1
    distinti da h
    :param h: ipotesi di partenza
    :param h1: ipotesi di cui si vogliono trovare i predecessori
    :return: predecessore immediato di h1 più a sinistra
    """
    predecessori = predecessore_immediato(h1)
    return predecessori[0]

In [438]:
def final(h: Hypothesis, h1: Hypothesis):
    """
    Restituisce il predecessore immediato di h1
    più a destra tra tutti i predecessori immediati di h1
    distinti da h
    :param h: ipotesi di partenza
    :param h1: ipotesi di cui si vogliono trovare i predecessori
    :return: predecessore immediato di h1 più a destra
    """
    predecessori = predecessore_immediato(h1)
    return predecessori[len(predecessori)-1]

In [439]:
def dist(h: Hypothesis, h1: Hypothesis) -> int:
    """
    Calcola la distanza tra due ipotesi h e h1
    :param h: ipotesi di partenza
    :param h1: ipotesi di arrivo
    :return: distanza tra h e h1
    """
    return sum(el1 != el2 for el1, el2 in zip(h, h1))

In [440]:
def card(h: Hypothesis) -> int:
    """
    Calcola la cardinalità di un'ipotesi h, intesa come numero di 1 in h
    :param h: ipotesi di cui si vuole calcolare la cardinalità
    :return: cardinalità di h
    """
    return sum(h)

In [441]:
def binario(h: Hypothesis) -> int:
    """
    Restituisce il valore intero corrispondente alla rappresentazione binaria di h
    :param h: ipotesi da convertire in intero
    :return: valore intero corrispondente a h
    """
    return int("".join(map(str, h)), 2)

In [442]:
def MERGE(seq1: List[Hypothesis], seq2: List[Hypothesis]) -> List[Hypothesis]:
    """
    Unisce due liste ordinate di ipotesi in un'unica lista ordinata,
    confrontando con la funzione binario(h).
    :param seq1: prima lista di ipotesi ordinate
    :param seq2: seconda lista di ipotesi ordinate
    :return: lista unita di ipotesi ordinate
    """
    i, j = 0, 0
    merged = []
    
    while i < len(seq1) and j < len(seq2):
        if binario(seq1[i]) <= binario(seq2[j]):
            merged.append(seq1[i])
            i += 1
        else:
            merged.append(seq2[j])
            j += 1
    
    # aggiungo gli elementi rimanenti
    while i < len(seq1):
        merged.append(seq1[i])
        i += 1
    while j < len(seq2):
        merged.append(seq2[j])
        j += 1
    
    return merged

In [443]:
def CHECK(h: Hypothesis, N: List[List[str]], M: List[str]):
    """
    Controlla se un ipotesi h è una soluzione per il problema
    :param h: ipotesi da controllare
    :param N: lista di liste di stringhe, dove ogni lista interna rappresenta un
    insieme di stringhe che devono essere hittati dall'ipotesi h
    :param M: lista di stringhe che rappresentano gli elementi associati agli indici
    di h
    """
    for lista_interna in N:
        if not any(h[i] == 1 and M[i] in lista_interna for i in range(len(M))):
            return False
    return True

In [444]:
def is_minimal(h, solutions):
    """
    Metodo che controlla se un'ipotesi h è minimale
    rispetto a una lista di soluzioni
    :param h: ipotesi da controllare
    :param solutions: lista di soluzioni
    :return: True se h è minimale, False altrimenti
    """
    for other in solutions:
        if other != h:
            # se other è sottoinsieme di h
            if all((h[i] >= other[i]) for i in range(len(h))) and any(h[i] > other[i] for i in range(len(h))):
                return False
    return True

In [445]:
def transpose(matrix: List[List[int]]) -> List[List[int]]:
    """
    Restituisce la trasposizione della matrice data.
    :param matrix: matrice da trasporre
    :return: matrice trasposta
    """
    if not matrix:
        return []
    return [list(row) for row in zip(*matrix)]

In [446]:
def parse_matrix_files(path):
    M = []   # domini (colonne)
    N = []   # insiemi (righe)

    with open(path, "r") as f:
        lines = f.readlines()

    # --- 1) Trova la riga con "Map"
    for line in lines:
        if line.strip().startswith(";;; Map"):
            parts = line.strip().split()[2:]  # salta ";;; Map"
            for p in parts:
                idx, name = p.split("(")
                name = name.strip(")")
                M.append(name)
            break

    # --- 2) Trova le righe numeriche
    for line in lines:
        if line.strip() and line[0].isdigit():
            values = line.strip().split()
            if values[-1] == "-":
                values = values[:-1]

            length = min(len(values), len(M))
            # lista ordinata di nomi corrispondenti a 1
            subset = [M[i] for i, v in enumerate(values[:length]) if v == "1"]
            N.append(subset)

    return M, N


In [447]:
import os

def get_filename(path: str, with_extension: bool = True) -> str:
    """
    Estrae il nome del file dal percorso completo.
    :param path: percorso completo del file
    :param with_extension: se True, include l'estensione del file
    :return: nome del file
    """
    base = os.path.basename(path)
    if not with_extension:
        base = os.path.splitext(base)[0]
    return base

In [448]:
M, N = parse_matrix_files("../benchmarks1/74L85.000.matrix")

print("Dominio M:", M)
print("Insiemi N:")
for i, subset in enumerate(N, 1):
    print(f"  N{i} = {subset}")

Dominio M: ['z1', 'z2', 'z3', 'z4', 'z5', 'z6', 'z7', 'z8', 'z9', 'z10', 'z11', 'z12', 'z13', 'z14', 'z15', 'z16', 'z17', 'z18', 'z19', 'z20', 'z21', 'z22', 'z23', 'z24', 'z25', 'z26', 'z27', 'z28', 'z29', 'z30', 'o1', 'o2', 'o3']
Insiemi N:
  N1 = ['z4', 'z9', 'z10', 'z17', 'z22', 'z24', 'o2']
  N2 = ['z2', 'z22', 'o2']


In [449]:
def GENERATE_CHILDREN(h: Hypothesis, current: List[Hypothesis]):
    children = set()

    # Caso: ipotesi vuota → genera tutti i vicini a distanza 1
    if h == empty_hypothesis(len(h)):
        for i in range(len(h)):
            if h[i] == 0:
                h1 = list(h)
                h1[i] = 1
                SET_FIELDS(h1)
                children.add(tuple(h1))
        return list(children)

    # Mappa per accesso rapido agli indici
    index_map = {tuple(hyp): idx for idx, hyp in enumerate(current)}

    for i in range(LM1(h) - 1):
        if h[i] == 0:
            h1 = list(h)
            h1[i] = 1
            SET_FIELDS(h1)
            PROPAGATE(h, h1)

            h_i = initial(h, h1)
            h_f = final(h, h1)
            count = 0

            # prendo indici in sicurezza
            start_idx = index_map.get(tuple(h_f), 0)
            end_idx   = index_map.get(tuple(h_i), len(current) - 1)

            # scorro su una sottolista di current SENZA modificarla
            for hp in current[start_idx:end_idx + 1]:
                if dist(hp, h1) == 1 and dist(hp, h) == 2:
                    PROPAGATE(hp, h1)
                    count += 1

            if count == card(h) and tuple(h1) != tuple(h):
                children.add(tuple(h1))

    return list(children)
# TEST 1

Versione corretta ma teniamo anche l'altra

In [450]:
def GENERATE_CHILDREN(h: Hypothesis, current: List[Hypothesis]):
    """
    Genera i figli dell'ipotesi h secondo la logica originale.
    :param h: ipotesi di partenza
    :param current: lista delle ipotesi correnti
    :return: lista di ipotesi figlie
    """
    children = []

    # Caso ipotesi vuota → genera tutti i successori immediati
    if card(h) == 0:
        for i in range(len(h)):
            if h[i] == 0:
                h1 = list(h)
                h1[i] = 1
                children.append(tuple(h1))
        return children

    # Creazione mappa per accesso rapido
    index_map = {hyp: idx for idx, hyp in enumerate(current)}

    # Scorri fino al leftmost 1 di h
    for i in range(LM1(h) - 1):
        if h[i] == 0:
            h1 = list(h)
            h1[i] = 1
            # Propagazione
            h1 = PROPAGATE(h, tuple(h1))

            h_i = initial(h, tuple(h1))
            h_f = final(h, tuple(h1))

            # Indici nella lista corrente
            start_idx = index_map.get(h_f, 0)
            end_idx = index_map.get(h_i, len(current) - 1)

            count = 0
            for hp in current[start_idx:end_idx + 1]:
                if dist(hp, tuple(h1)) == 1 and dist(hp, h) == 2:
                    h1 = PROPAGATE(hp, tuple(h1))
                    count += 1

            if count == card(h) and tuple(h1) != tuple(h):
                children.append(tuple(h1))

    return children
# TEST 2

In [451]:
import time

def MAIN(N: List[List[str]], M: List[str], input_path:str, max_seconds: int = None, output_dir: str = "compito_1/outputs"):
    start_time = time.time() # tempo di inizio

    h_0 = tuple(empty_hypothesis(len(M)))
    current = [h_0]
    solutions = set()

    # Variabili per gestione timeout
    interrupted = False
    interrupted_level = None

    while current:
        next_level = []

        for h in list(current):
            # Controllo timeout
            if max_seconds and (time.time() - start_time) > max_seconds:
                interrupted = True
                interrupted_level = card(h)
                current = []  # forza uscita dal while
                break

            # Controllo se h è una soluzione
            if CHECK(h, N, M):
                solutions.add(h)
                current.remove(h)

            elif h == empty_hypothesis(len(M)):
                next_level.extend(GENERATE_CHILDREN(h, current))

            elif LM1(h) != 1:
                h_s_2 = globalInitial(h)

                # Rimuovi da current tutte le ipotesi h_r tali che bin(h_r) > bin(h_s_2)
                current = [h_r for h_r in current if binario(h_r) <= binario(h_s_2)]

                # Prendo la prima ipotesi in current
                if current:
                    h_p = current[0]
                    if h_p != h:
                        next_level = MERGE(next_level, GENERATE_CHILDREN(h, current))
        
        current = next_level
        minimal_solutions = [h for h in solutions if is_minimal(h, solutions)]
        

    # --- Prepara directory di output ---
    os.makedirs(output_dir, exist_ok=True)

    # Nome del file in uscita
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(output_dir, base_name + ".mhs")

    # --- Scrittura file ---
    with open(output_path, "w") as f:
        # intestazione
        f.write(f";;; Generated from {os.path.basename(input_path)}\n")
        f.write(f";;; MHS computation\n")

        # Map (colonne come nel file originale)
        f.write(";;; Map " + " ".join(f"{i+1}({M[i]})" for i in range(len(M))) + "\n")

        # scrivi i MHS in formato matriciale
        for sol in minimal_solutions:
            row = " ".join(str(x) for x in sol) + " -"
            f.write(row + "\n")

        # --- Riassunto ---
        f.write(";;;\n")
        f.write(f";;; Input matrix size: {len(N)} x {len(M)}\n")
        f.write(f";;; Number of MHS: {len(minimal_solutions)}\n")

        if minimal_solutions:
            min_card = min(card(h) for h in minimal_solutions)
            max_card = max(card(h) for h in minimal_solutions)
            f.write(f";;; Min cardinality: {min_card}\n")
            f.write(f";;; Max cardinality: {max_card}\n")

        if interrupted:
            f.write(f";;; Computation interrupted (timeout {max_seconds}s)\n")
            f.write(f";;; Interrupted at level H = {interrupted_level}\n")

    return minimal_solutions, output_path


Prova per vedere se funziona


In [452]:
# --- parsing del file .matrix ---
# M, N = parse_matrix_files("../benchmarks1/74L85.000.matrix")
# M, N = parse_matrix_files("../benchmarks1/74L85.008.matrix")
M, N = parse_matrix_files("../benchmarks1/74L85.002.matrix")

print("Dominio M:", M)
print("Insiemi N:")
for i, subset in enumerate(N, 1):
    print(f"  N{i} = {subset}")

Dominio M: ['z1', 'z2', 'z3', 'z4', 'z5', 'z6', 'z7', 'z8', 'z9', 'z10', 'z11', 'z12', 'z13', 'z14', 'z15', 'z16', 'z17', 'z18', 'z19', 'z20', 'z21', 'z22', 'z23', 'z24', 'z25', 'z26', 'z27', 'z28', 'z29', 'z30', 'o1', 'o2', 'o3']
Insiemi N:
  N1 = ['z1', 'z2', 'z5', 'z9', 'z10', 'z11', 'z12', 'z17', 'z18', 'z24', 'z25', 'o2', 'o3']
  N2 = ['z1', 'z4', 'z9', 'z10', 'z17', 'z22', 'z24', 'o2']
  N3 = ['z2', 'z4', 'z9', 'z10', 'z17', 'z21', 'z24', 'o2', 'o3']
  N4 = ['z1', 'z2', 'z4', 'z9', 'z10', 'z17', 'z24', 'o2']


In [453]:
# --- esecuzione algoritmo MAIN con output in ./outputs ---
# solutions, output_file = MAIN(N, M, "../benchmarks1/74L85.000.matrix", max_seconds=30) # test completo
# solutions, output_file = MAIN(N, M, "../benchmarks1/74L85.008.matrix", max_seconds=0.6) # test per interruzione
solutions, output_file = MAIN(N, M, "../benchmarks1/74L85.002.matrix", max_seconds=120)


print("\nNumero di MHS trovati:", len(solutions))
print("File di output generato:", output_file)

# Se vuoi dare un'occhiata rapida ai primi MHS trovati.
for s in solutions:
    elementi = [M[i] for i, bit in enumerate(s) if bit == 1]
    print(f"{s} -> {elementi}")


Numero di MHS trovati: 5
File di output generato: compito_1/outputs/74L85.002.mhs
(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0) -> ['o2']
(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0) -> ['z24']
(0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) -> ['z9']
(0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) -> ['z10']
(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) -> ['z17']


### Compito 2 (Update 22/09)

In [454]:
def reduce_domain_remove_empty_cols(M: List[str], N: List[Set[str]]):
    """
    Rimuove colonne di M che non compaiono in alcun subset di N.
    Restituisce:
      - M_prime: lista colonne non vuote (ordine preservato)
      - N_prime: lista di set coerenti con M_prime
      - removed_indices: lista di indici originali (0-based) rimossi
      - old_to_new: lista di lunghezza len(M) con -1 per rimossi, altrimenti indice in M_prime
      - new_to_old: lista di indici originali per ogni indice di M_prime
    """
    name_to_index = {name: i for i, name in enumerate(M)}   # mappa nome -> indice
    used = [False] * len(M)                                 # tracciamento colonne usate

    for subset in N:
        for name in subset:
            idx = name_to_index.get(name)
            if idx is not None:
                used[idx] = True

    removed_indices = [i for i, u in enumerate(used) if not u]      # indici rimossi
    new_to_old = [i for i, u in enumerate(used) if u]       
    old_to_new = [-1] * len(M)  
    for new_idx, old_idx in enumerate(new_to_old):                  
        old_to_new[old_idx] = new_idx

    M_prime = [M[i] for i in new_to_old]    # colonne non vuote di M

    # costruisco N_prime mantenendo solo nomi presenti in M_prime
    N_prime = []
    for subset in N:
        subset_prime = {name for name in subset if name in name_to_index and old_to_new[name_to_index[name]] != -1}
        N_prime.append(subset_prime)

    return M_prime, N_prime, removed_indices, old_to_new, new_to_old

In [455]:
def lift_solution_to_full(sol_reduced: Tuple[int, ...], M_len: int, old_to_new: List[int]):
    """
    Rialza una soluzione su M' (tupla len(M')) alla rappresentazione completa len(M),
    inserendo 0 nelle colonne rimosse.
    :param sol_reduced: soluzione su M' (tupla di 0/1)
    :param M_len: lunghezza originale di M
    :param old_to_new: mappa da indici originali a nuovi indici (-1 se rimosso)
    :return: tupla di lunghezza M_len con 0
    """
    full = [0] * M_len  # soluzione completa inizializzata a 0
    for old_idx, new_idx in enumerate(old_to_new):      # scorro indici originali
        if new_idx != -1:                               # controllo se non è stato rimosso   
            full[old_idx] = sol_reduced[new_idx]        
        else:
            full[old_idx] = 0
    return tuple(full)              

In [456]:
import tracemalloc
import time
import os
from typing import Optional

try:                            
    import psutil               # libreria per monitorare utilizzo della memoria
    _HAS_PSUTIL = True
except Exception:
    _HAS_PSUTIL = False

In [457]:
def start_perf_tracking():
    """Avvia tracciamento memoria (tracemalloc) e prende tempo iniziale."""
    tracemalloc.start()
    return time.perf_counter()

In [458]:
def stop_perf_tracking(start_time: float):
    """Ferma tracemalloc e ritorna elapsed_seconds, peak_mem_kb."""
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    elapsed = time.perf_counter() - start_time  # tempo trascorso in secondi
    peak_kb = int(peak / 1024) if peak is not None else None    # picco memoria in KB 

    # se psutil disponibile forniamo anche rss come fallback
    rss_kb = None
    if _HAS_PSUTIL:
        p = psutil.Process(os.getpid())
        try:
            rss_kb = int(p.memory_info().rss / 1024)
        except Exception:
            rss_kb = None

    return elapsed, peak_kb, rss_kb

In [459]:
def write_mhs_file(output_path: str,
                   input_basename: str,
                   M: List[str],
                   minimal_solutions_full: List[Tuple[int, ...]],
                   stats: dict,
                   removed_indices_1based: Optional[List[int]] = None):
    """Metodo per scrivere il file .mhs con intestazione e riepilogo.
    :param output_path: percorso completo del file di output
    :param input_basename: nome del file di input (solo nome, senza path)
    :param M: lista completa delle colonne (dominio originale)
    :param minimal_solutions_full: lista di tuple rappresentanti le soluzioni su M completo
    :param stats: dizionario con statistiche da includere nel file
    :param removed_indices_1based: lista di indici (1-based) delle colonne rimosse
    """

    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    with open(output_path, "w") as f:
        f.write(f";;; Generated from {input_basename}\n")
        f.write(";;; MHS computation\n")
        f.write(f";;; Input matrix {stats.get('len_N')} X {stats.get('len_M')}\n")
        f.write(f";;; Number of MHS found: {stats.get('total_solutions_found')}\n")
        f.write(f";;; Minimum cardinality: {stats.get('min_cardinality')}\n")
        f.write(f";;; Maximum cardinality: {stats.get('max_cardinality')}\n")
        f.write(f";;; Elapsed time: {stats.get('elapsed_seconds')}s\n")
        f.write(f";;; Memory used: {stats.get('peak_mem_kb')} KB\n")

        # Colonne rimosse
        if removed_indices_1based:
            f.write(f";;; Computation done removing the columns: {removed_indices_1based}\n")
            f.write(f";;; -> The dimensions of the matrix really used are {stats.get('len_N')} X {stats.get('len_Mp')}\n")
        else:
            f.write(f";;; Computation done without removing columns\n")

        # Livello di interruzione (se presente)
        if stats.get("interrupted"):
            f.write(f";;; Computation stopped at level {stats.get('interrupted_level')}\n")
        else:
            f.write(f";;; Computation completed successfully\n")
        
        f.write(";;;\n")
        f.write(";;; Hypotesis generated for each level:\n")

        # --- Statistiche per livello ---
        level_stats = stats.get("level_stats", {})
        for lvl in sorted(level_stats.keys()):
            g = level_stats[lvl].get("generated", 0)
            label = "hypotesis" if g == 1 else "hypoteses"
            f.write(f";;; Level {lvl}: {g} {label}\n")

        f.write(";;;\n")
    
        f.write(";;; Map " + " ".join(f"{i+1}({M[i]})" for i in range(len(M))) + "\n")

        # --- Scrittura delle soluzioni ---
        for sol in minimal_solutions_full:
            f.write(" ".join(str(x) for x in sol) + " -\n")
                
    print(f"[INFO] File scritto correttamente: {output_path}")

In [460]:
#TEST 2.1
def GENERATE_CHILDREN(h: Hypothesis, current: List[Hypothesis]):
    """
    Genera i figli dell'ipotesi h in ordine decrescente binaria.
    """
    children = []

    # Caso ipotesi vuota → genera tutti i successori immediati
    if card(h) == 0:
        for i in reversed(range(len(h))):
            if h[i] == 0:
                h1 = list(h)
                h1[i] = 1
                children.append(tuple(h1))
        return children

    # Creazione mappa per accesso rapido
    index_map = {hyp: idx for idx, hyp in enumerate(current)}

    # Scorri tutti i bit da destra verso sinistra fino al leftmost 1
    for i in reversed(range(LM1(h))):
        if h[i] == 0:
            h1 = list(h)
            h1[i] = 1
            h1 = PROPAGATE(h, tuple(h1))

            h_i = initial(h, tuple(h1))
            h_f = final(h, tuple(h1))

            start_idx = index_map.get(h_f, 0)
            end_idx = index_map.get(h_i, len(current) - 1)

            count = 0
            for hp in current[start_idx:end_idx + 1]:
                if dist(hp, tuple(h1)) == 1 and dist(hp, h) == 2:
                    h1 = PROPAGATE(hp, tuple(h1))
                    count += 1

            if count == card(h) and tuple(h1) != tuple(h):
                children.append(tuple(h1))

    # Già in ordine decrescente, non serve ulteriore sort
    return children


Versione corretta Generate Children ripresa da sopra dove c'è scritto #TEST2

In [461]:
def GENERATE_CHILDREN(h: Hypothesis, current: List[Hypothesis]):
    """
    Genera i figli dell'ipotesi h secondo la logica originale.
    :param h: ipotesi di partenza
    :param current: lista delle ipotesi correnti
    :return: lista di ipotesi figlie
    """
    children = []

    # Caso ipotesi vuota → genera tutti i successori immediati
    if card(h) == 0:
        for i in range(len(h)):
            if h[i] == 0:
                h1 = list(h)
                h1[i] = 1
                children.append(tuple(h1))
        return children

    # Creazione mappa per accesso rapido
    index_map = {hyp: idx for idx, hyp in enumerate(current)}

    # Scorri fino al leftmost 1 di h
    for i in range(LM1(h) - 1):
        if h[i] == 0:
            h1 = list(h)
            h1[i] = 1
            # Propagazione
            h1 = PROPAGATE(h, tuple(h1))

            h_i = initial(h, tuple(h1))
            h_f = final(h, tuple(h1))

            # Indici nella lista corrente
            start_idx = index_map.get(h_f, 0)
            end_idx = index_map.get(h_i, len(current) - 1)

            count = 0
            for hp in current[start_idx:end_idx + 1]:
                if dist(hp, tuple(h1)) == 1 and dist(hp, h) == 2:
                    h1 = PROPAGATE(hp, tuple(h1))
                    count += 1

            if count == card(h) and tuple(h1) != tuple(h):
                children.append(tuple(h1))

    return children
# TEST 2

In [462]:
def MAIN_COMPITO_2(input_path: str, max_seconds: float = None, output_dir: str = "outputs"):
    
    """
    Wrapper MAIN:
      - legge M,N dal file (parse_matrix_file)
      - riduce dominio a M'
      - esegue la ricerca MHS su M' (utilizza le funzioni CHECK/GENERATE_CHILDREN/... già presenti)
      - registra statistiche temporali/spaziali e per-livello
      - scrive file .mhs in output_dir con soluzioni rialzate su M
    :param input_path: percorso completo del file .matrix di input
    :param max_seconds: timeout in secondi (None = nessun timeout)
    :param output_dir: directory di output per il file .mhs
    :return: (minimal_solutions_full, output_path, stats)
        - minimal_solutions_full: lista di tuple rappresentanti le soluzioni su M completo
        - output_path: percorso completo del file .mhs generato
        - stats: dizionario con statistiche della computazione
    """

    # --- parsing del file di input e riduzione delle colonne vuote ---
    M, N = parse_matrix_files(input_path)          # M: List[str], N: List[Set[str]]
    len_M = len(M)
    len_N = len(N)

    M_prime, N_prime, removed_indices, old_to_new, new_to_old = reduce_domain_remove_empty_cols(M, N)
    len_Mp = len(M_prime)

    # Livello H_max = max(len_N, len_Mp)
    H_max = max(len_N, len_Mp)

    # Conversione liste per la funzione CHECK
    N_list_prime = [list(s) for s in N_prime]
    M_prime_list = list(M_prime)

    # dizionario per statistiche
    stats: Dict = {
        "input_file": os.path.basename(input_path),
        "len_M": len_M,
        "len_Mp": len_Mp,
        "len_N": len_N,
        "removed_indices": removed_indices,
        "H_max": H_max,
        "elapsed_seconds": None,
        "peak_mem_kb": None,
        "interrupted": False,
        "interrupted_level": None,
        "max_seconds": max_seconds,
        "level_stats": {}, 
        "total_solutions_found": 0,
        "min_cardinality": None,
        "max_cardinality": None,
    }

    # --- start perf tracking ---
    start_time = start_perf_tracking()
    stats["start_time"] = start_time
    stats["start_time_str"] = time.ctime()


    h_0 = empty_hypothesis(len_Mp)
    current: List[Hypothesis] = [h_0]
    solutions_reduced: Set[Hypothesis] = set()

    # timeout flags
    interrupted = False
    interrupted_level = None

    level_stats: Dict[int, Dict[str, int]] = {} 

    # --- ciclo principale algoritmo ---
    try:
        while current:
            next_level: List[Hypothesis] = []

            for h in list(current):
                # timeout
                if max_seconds and (time.perf_counter() - start_time) > max_seconds:
                    interrupted = True
                    interrupted_level = card(h)
                    current = []  # forza uscita
                    break

                lvl = card(h)
                level_stats.setdefault(lvl, {"generated": 0, "checked": 0})
                level_stats[lvl]["checked"] += 1

                # prune: non generare figli oltre H_max
                if lvl >= H_max:
                    # verifico se è soluzione su dominio ridotto 
                    if CHECK(h, N_list_prime, M_prime_list):
                        solutions_reduced.add(h)
                    continue

                # verifica soluzione su dominio ridotto
                if CHECK(h, N_list_prime, M_prime_list):
                    solutions_reduced.add(h)
                    try:
                        current.remove(h)
                    except ValueError:
                        pass

                elif h == empty_hypothesis(len_Mp):
                    children = GENERATE_CHILDREN(h, current)
                    nxt_level = lvl + 1
                    level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
                    level_stats[nxt_level]["generated"] += len(children)
                    next_level.extend(children)

                elif LM1(h) != 1:
                    h_s_2 = globalInitial(h)
                    # filtra current
                    current = [h_r for h_r in current if binario(h_r) <= binario(h_s_2)]

                    if current:
                        h_p = current[0]
                        if h_p != h:
                            children = GENERATE_CHILDREN(h, current)
                            nxt_level = lvl + 1
                            level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
                            level_stats[nxt_level]["generated"] += len(children)
                            next_level = MERGE(next_level, children)

            # livello successivo esplorazione
            current = next_level
            stats["level_stats"] = level_stats

            # controllo interruzione per statistiche
            if interrupted: 
                stats["interrupted"] = True
                stats["interrupted_level"] = interrupted_level
                break

    except Exception as e:
        interrupted = True
        stats["exception"] = repr(e)

    # --- end tracking ---
    elapsed, peak_kb, rss_kb = stop_perf_tracking(start_time)
    stats["end_time"] = time.perf_counter()
    stats["end_time_str"] = time.ctime()
    stats["elapsed_seconds"] = elapsed
    stats["peak_mem_kb"] = peak_kb
    stats["rss_kb"] = rss_kb
    stats["interrupted"] = interrupted
    stats["interrupted_level"] = interrupted_level
    stats["level_stats"] = level_stats

    # --- estrai MHS e rialza su dominio M originale ---
    minimal_solutions_reduced = [h for h in solutions_reduced if is_minimal(h, solutions_reduced)]
    minimal_solutions_full = [lift_solution_to_full(h, len_M, old_to_new) for h in minimal_solutions_reduced]

    stats["total_solutions_found"] = len(minimal_solutions_full)
    if minimal_solutions_full:
        cards = [card(s) for s in minimal_solutions_full]
        stats["min_cardinality"] = min(cards)
        stats["max_cardinality"] = max(cards)
    else:
        stats["min_cardinality"] = 0
        stats["max_cardinality"] = 0

    # --- scrivi output .mhs --- -- SOLO SE HA TROVATO ALMENO UNA SOLUZIONE
        
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(output_dir, base_name + ".mhs")

    # indici delle colonne rimosse (1-based)
    removed_1based = [i + 1 for i in removed_indices] if removed_indices else []

    write_mhs_file(output_path, os.path.basename(input_path), M, minimal_solutions_full, stats, removed_1based)

    return minimal_solutions_full, output_path, stats

Versione corretta ma teniamo anche l'altra si sa mai

In [463]:
# AGGIORNATA
def MAIN_COMPITO_2(input_path: str, max_seconds: float = None, output_dir: str = "outputs"):
    """
    Wrapper MAIN:
      - legge M,N dal file (parse_matrix_file)
      - riduce dominio a M'
      - esegue la ricerca MHS su M' (utilizza le funzioni CHECK/GENERATE_CHILDREN/... già presenti)
      - registra statistiche temporali/spaziali e per-livello
      - scrive file .mhs in output_dir con soluzioni rialzate su M
    :param input_path: percorso completo del file .matrix di input
    :param max_seconds: timeout in secondi (None = nessun timeout)
    :param output_dir: directory di output per il file .mhs
    :return: (minimal_solutions_full, output_path, stats)
    """
    # --- parsing del file di input e riduzione delle colonne vuote ---
    M, N = parse_matrix_files(input_path)          # M: List[str], N: List[Set[str]]
    len_M = len(M)
    len_N = len(N)

    M_prime, N_prime, removed_indices, old_to_new, new_to_old = reduce_domain_remove_empty_cols(M, N)
    len_Mp = len(M_prime)

    # Livello H_max = max(len_N, len_Mp)
    H_max = max(len_N, len_Mp)

    # Conversione liste per la funzione CHECK
    N_list_prime = [list(s) for s in N_prime]
    M_prime_list = list(M_prime)

    # dizionario per statistiche
    stats: Dict = {
        "input_file": os.path.basename(input_path),
        "len_M": len_M,
        "len_Mp": len_Mp,
        "len_N": len_N,
        "removed_indices": removed_indices,
        "H_max": H_max,
        "start_time": None,
        "end_time": None,
        "start_time_str": None,
        "end_time_str": None,
        "elapsed_seconds": None,
        "peak_mem_kb": None,
        "rss_kb": None,
        "interrupted": False,
        "interrupted_level": None,
        "max_seconds": max_seconds,
        "level_stats": {},   # livello -> {'generated', 'checked'}
        "total_solutions_found": 0,
        "min_cardinality": None,
        "max_cardinality": None,
    }

    # --- start perf tracking ---
    start_time = start_perf_tracking()
    stats["start_time"] = start_time
    stats["start_time_str"] = time.ctime()

    h_0 = empty_hypothesis(len_Mp)
    current: List[Hypothesis] = [h_0]
    solutions_reduced: Set[Hypothesis] = set()

    # timeout flags
    interrupted = False
    interrupted_level = None

    level_stats: Dict[int, Dict[str, int]] = {} 

    # --- ciclo principale algoritmo ---
    try:
        while current:
            next_level: List[Hypothesis] = []

            for h in list(current):
                # timeout
                if max_seconds and (time.perf_counter() - start_time) > max_seconds:
                    interrupted = True
                    interrupted_level = card(h)
                    current = []  # forza uscita
                    break

                lvl = card(h)
                level_stats.setdefault(lvl, {"generated": 0, "checked": 0})
                level_stats[lvl]["checked"] += 1

                # prune: non generare figli oltre H_max
                if lvl >= H_max:
                    if CHECK(h, N_list_prime, M_prime_list):
                        solutions_reduced.add(h)
                    continue

                # verifica soluzione su dominio ridotto
                if CHECK(h, N_list_prime, M_prime_list):
                    solutions_reduced.add(h)
                    try:
                        current.remove(h)
                    except ValueError:
                        pass

                elif h == empty_hypothesis(len_Mp):
                    children = GENERATE_CHILDREN(h, current)
                    nxt_level = lvl + 1
                    level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
                    level_stats[nxt_level]["generated"] += len(children)
                    next_level.extend(children)

                elif LM1(h) != 1:
                    h_s_2 = globalInitial(h)
                    current = [h_r for h_r in current if binario(h_r) <= binario(h_s_2)]

                    if current:
                        h_p = current[0]
                        if h_p != h:
                            children = GENERATE_CHILDREN(h, current)
                            nxt_level = lvl + 1
                            level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
                            level_stats[nxt_level]["generated"] += len(children)
                            next_level = MERGE(next_level, children)

            current = next_level
            stats["level_stats"] = level_stats

            if interrupted: 
                stats["interrupted"] = True
                stats["interrupted_level"] = interrupted_level
                break

    except Exception as e:
        interrupted = True
        stats["exception"] = repr(e)

    # --- end tracking ---
    elapsed, peak_kb, rss_kb = stop_perf_tracking(start_time)
    stats["end_time"] = time.perf_counter()
    stats["end_time_str"] = time.ctime()
    stats["elapsed_seconds"] = elapsed
    stats["peak_mem_kb"] = peak_kb
    stats["rss_kb"] = rss_kb
    stats["interrupted"] = interrupted
    stats["interrupted_level"] = interrupted_level
    stats["level_stats"] = level_stats

    # --- estrai MHS e rialza su dominio M originale ---
    minimal_solutions_reduced = [h for h in solutions_reduced if is_minimal(h, solutions_reduced)]
    minimal_solutions_full = [lift_solution_to_full(h, len_M, old_to_new) for h in minimal_solutions_reduced]

    # --- ORDINA LE SOLUZIONI IN ORDINE DECRESCENTE BINARIO (sx→dx) ---
    minimal_solutions_full.sort(key=lambda x: int("".join(str(b) for b in x), 2), reverse=True)

    stats["total_solutions_found"] = len(minimal_solutions_full)
    if minimal_solutions_full:
        cards = [card(s) for s in minimal_solutions_full]
        stats["min_cardinality"] = min(cards)
        stats["max_cardinality"] = max(cards)
    else:
        stats["min_cardinality"] = 0
        stats["max_cardinality"] = 0

    # --- scrivi output .mhs ---
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(output_dir, base_name + ".mhs")

    removed_1based = [i + 1 for i in removed_indices] if removed_indices else []

    write_mhs_file(output_path, os.path.basename(input_path), M, minimal_solutions_full, stats, removed_1based)

    return minimal_solutions_full, output_path, stats


In [464]:
# # codice per file singolo

# # --- parsing e riduzione dominio ---
# # M, N = parse_matrix_files("../benchmarks1/74L85.000.matrix")
# # M, N = parse_matrix_files("../benchmarks1/74L85.008.matrix")
# M, N = parse_matrix_files("../benchmarks1/74L85.002.matrix")
# M_prime, N_prime, removed_indices, old_to_new, new_to_old = reduce_domain_remove_empty_cols(M, N)
# print("len(M)=", len(M), "len(M')=", len(M_prime), "len(N)=", len(N))
# print("removed (1-based):", [i+1 for i in removed_indices])

# # MAIN con output e timeout impostato a 60s
# # solutions, outpath, stats = MAIN("../benchmarks1/74L85.000.matrix", max_seconds=60, output_dir="outputs")
# solutions, outpath, stats = MAIN_COMPITO_2("../benchmarks1/74L85.002.matrix", max_seconds=120, output_dir="outputs")
# print("Output scritto in:", outpath)
# print("Soluzioni trovate:", len(solutions))

# for s in solutions:
#     elementi = [M[i] for i, bit in enumerate(s) if bit == 1]
#     print(f"{s} -> {elementi}")

# print("Stats elapsed (s):", stats['elapsed_seconds'])

-----------------------------------------------------------------------------------------------------

Faccio le sperimentazioni su:
-benchmark1: per ogni classe di file

c1908: 98 file
74181: 54 file
c5315: 248 file
c2670: 168 file
c1355: 98 file
c880: 198 file
74L85: 28 file
c7552: 176 file
c499: 141 file
74283: 30 file
c432: 45 file
74182: 50 file
c3540: 36 file
c6288: 1 file

scelgo una media ragionevole dei file da analizzare
e scrivo i risultati in outputs/compito_2/benchmarks1/classe/{un file.txt per ogni file matrix}

-benchmark2: per ogni classe di file:
c1908: 127 file
74181: 54 file
c5315: 248 file
c2670: 168 file
c1355: 98 file
c880: 198 file
74L85: 28 file
c7552: 176 file
c499: 141 file
74283: 30 file
c432: 45 file
74182: 50 file
c3540: 36 file
c6288: 1 file

scelgo una media ragionevole dei file da analizzare
chiamo il metodo delle permutazioni su ciascun file di ciascuna classe
e scrivo i risultati in outputs/compito_2/benchmarks2/classe/{un file.txt per ogni file matrix}




In [465]:
benchmarcks_1_classes = { "74181": 20, "c5315": 20, "c2670": 20, "c1355": 20, "c880": 20, "74L85": 20, "c7552": 20, "c499": 20, "74283": 20, "c432": 20, "74182": 20, "c3540": 20, "c6288": 1, }

---

Connessione al broker MQTT e sottoscrizione al topic MESSAGGI. Quando gli arriva un messaggio da parte della Web App, lo intercetta e si ferma

Aggiunto controllo stop flag in GENERATE_CHILDREN

In [466]:
# def GENERATE_CHILDREN(h: Hypothesis, current: List[Hypothesis]):
#     global stop_flag
#     children = set()

#     # Controllo STOP immediato
#     if stop_flag:
#         return list(children)

#     # Caso: ipotesi vuota → genera tutti i vicini a distanza 1
#     if h == empty_hypothesis(len(h)):
#         for i in range(len(h)):
#             if stop_flag:
#                 break
#             if h[i] == 0:
#                 h1 = list(h)
#                 h1[i] = 1
#                 SET_FIELDS(h1)
#                 children.add(tuple(h1))
#         return list(children)

#     # Mappa per accesso rapido agli indici
#     index_map = {tuple(hyp): idx for idx, hyp in enumerate(current)}

#     for i in range(LM1(h) - 1):
#         if stop_flag:
#             break
#         if h[i] == 0:
#             h1 = list(h)
#             h1[i] = 1
#             SET_FIELDS(h1)
#             PROPAGATE(h, h1)

#             h_i = initial(h, h1)
#             h_f = final(h, h1)
#             count = 0

#             # prendo indici in sicurezza
#             start_idx = index_map.get(tuple(h_f), 0)
#             end_idx   = index_map.get(tuple(h_i), len(current) - 1)

#             # scorro su una sottolista di current SENZA modificarla
#             for hp in current[start_idx:end_idx + 1]:
#                 if stop_flag:
#                     break
#                 if dist(hp, h1) == 1 and dist(hp, h) == 2:
#                     PROPAGATE(hp, h1)
#                     count += 1

#             if count == card(h) and tuple(h1) != tuple(h):
#                 children.add(tuple(h1))

#     return list(children)


In [467]:
# def MAIN_COMPITO_2(input_path: str, max_seconds: float = None, output_dir: str = "outputs"):
#     global stop_flag
#     """
#     Wrapper MAIN:
#       - legge M,N dal file (parse_matrix_file)
#       - riduce dominio a M'
#       - esegue la ricerca MHS su M' (utilizza le funzioni CHECK/GENERATE_CHILDREN/... già presenti)
#       - registra statistiche temporali/spaziali e per-livello
#       - scrive file .mhs in output_dir con soluzioni rialzate su M
#     :param input_path: percorso completo del file .matrix di input
#     :param max_seconds: timeout in secondi (None = nessun timeout)
#     :param output_dir: directory di output per il file .mhs
#     :return: (minimal_solutions_full, output_path, stats)
#         - minimal_solutions_full: lista di tuple rappresentanti le soluzioni su M completo
#         - output_path: percorso completo del file .mhs generato
#         - stats: dizionario con statistiche della computazione
#     """

#     # --- parsing del file di input e riduzione delle colonne vuote ---
#     M, N = parse_matrix_files(input_path)          # M: List[str], N: List[Set[str]]
#     len_M = len(M)
#     len_N = len(N)

#     M_prime, N_prime, removed_indices, old_to_new, new_to_old = reduce_domain_remove_empty_cols(M, N)
#     len_Mp = len(M_prime)

#     # Livello H_max = max(len_N, len_Mp)
#     H_max = max(len_N, len_Mp)

#     # Conversione liste per la funzione CHECK
#     N_list_prime = [list(s) for s in N_prime]
#     M_prime_list = list(M_prime)

#     # dizionario per statistiche
#     stats: Dict = {
#         "input_file": os.path.basename(input_path),
#         "len_M": len_M,
#         "len_Mp": len_Mp,
#         "len_N": len_N,
#         "removed_indices": removed_indices,
#         "H_max": H_max,
#         "start_time": None,
#         "end_time": None,
#         "start_time_str": None,
#         "end_time_str": None,
#         "elapsed_seconds": None,
#         "peak_mem_kb": None,
#         "rss_kb": None,
#         "interrupted": False,
#         "interrupted_level": None,
#         "max_seconds": max_seconds,
#         "level_stats": {},   # livello -> {'generated', 'checked'}
#         "total_solutions_found": 0,
#         "min_cardinality": None,
#         "max_cardinality": None,
#     }

#     # --- start perf tracking ---
#     start_time = start_perf_tracking()
#     stats["start_time"] = start_time
#     stats["start_time_str"] = time.ctime()


#     h_0 = empty_hypothesis(len_Mp)
#     current: List[Hypothesis] = [h_0]
#     solutions_reduced: Set[Hypothesis] = set()

#     # timeout flags
#     interrupted = False
#     interrupted_level = None

#     level_stats: Dict[int, Dict[str, int]] = {} 

#     # --- ciclo principale algoritmo ---
#     try:
#         while current:
#             #Controllo se c'è lo STOP
#             if stop_flag: 
#                 print("STOP ricevuto, interrompo il benchmark!")
#                 interrupted = True
#                 break

#             next_level: List[Hypothesis] = []

#             for h in list(current):
#                 if stop_flag:
#                     interrupted = True
#                     break
#                 # timeout
#                 if max_seconds and (time.perf_counter() - start_time) > max_seconds:
#                     interrupted = True
#                     interrupted_level = card(h)
#                     current = []  # forza uscita
#                     break

#                 lvl = card(h)
#                 level_stats.setdefault(lvl, {"generated": 0, "checked": 0})
#                 level_stats[lvl]["checked"] += 1

#                 # prune: non generare figli oltre H_max
#                 if lvl >= H_max:
#                     # verifico se è soluzione su dominio ridotto 
#                     if CHECK(h, N_list_prime, M_prime_list):
#                         solutions_reduced.add(h)
#                     continue

#                 # verifica soluzione su dominio ridotto
#                 if CHECK(h, N_list_prime, M_prime_list):
#                     solutions_reduced.add(h)
#                     try:
#                         current.remove(h)
#                     except ValueError:
#                         pass

#                 elif h == empty_hypothesis(len_Mp):
#                     children = GENERATE_CHILDREN(h, current)
#                     nxt_level = lvl + 1
#                     level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
#                     level_stats[nxt_level]["generated"] += len(children)
#                     next_level.extend(children)

#                 elif LM1(h) != 1:
#                     h_s_2 = globalInitial(h)
#                     # filtra current
#                     current = [h_r for h_r in current if binario(h_r) <= binario(h_s_2)]

#                     if current:
#                         h_p = current[0]
#                         if h_p != h:
#                             children = GENERATE_CHILDREN(h, current)
#                             nxt_level = lvl + 1
#                             level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
#                             level_stats[nxt_level]["generated"] += len(children)
#                             next_level = MERGE(next_level, children)

#             # livello successivo esplorazione
#             current = next_level
#             stats["level_stats"] = level_stats

#             # controllo interruzione per statistiche
#             if interrupted: 
#                 stats["interrupted"] = True
#                 stats["interrupted_level"] = interrupted_level
#                 break

#     except Exception as e:
#         interrupted = True
#         stats["exception"] = repr(e)

#     # --- end tracking ---
#     elapsed, peak_kb, rss_kb = stop_perf_tracking(start_time)
#     stats["end_time"] = time.perf_counter()
#     stats["end_time_str"] = time.ctime()
#     stats["elapsed_seconds"] = elapsed
#     stats["peak_mem_kb"] = peak_kb
#     stats["rss_kb"] = rss_kb
#     stats["interrupted"] = interrupted
#     stats["interrupted_level"] = interrupted_level
#     stats["level_stats"] = level_stats

#     # --- estrai MHS e rialza su dominio M originale ---
#     minimal_solutions_reduced = [h for h in solutions_reduced if is_minimal(h, solutions_reduced)]
#     minimal_solutions_full = [lift_solution_to_full(h, len_M, old_to_new) for h in minimal_solutions_reduced]

#     stats["total_solutions_found"] = len(minimal_solutions_full)
#     if minimal_solutions_full:
#         cards = [card(s) for s in minimal_solutions_full]
#         stats["min_cardinality"] = min(cards)
#         stats["max_cardinality"] = max(cards)
#     else:
#         stats["min_cardinality"] = 0
#         stats["max_cardinality"] = 0

#     # --- scrivi output .mhs ---
#     os.makedirs(output_dir, exist_ok=True)
#     base_name = os.path.splitext(os.path.basename(input_path))[0]
#     output_path = os.path.join(output_dir, base_name + ".mhs")

#     # indici delle colonne rimosse (1-based)
#     removed_1based = [i + 1 for i in removed_indices] if removed_indices else []

#     write_mhs_file(output_path, os.path.basename(input_path), M, minimal_solutions_full, stats, removed_1based)

#     return minimal_solutions_full, output_path, stats

PARTE CONNESSIONE CON BROKER MQTT CON METODI NUOVI AGGIORNATI

In [468]:
# Versione corretta
def GENERATE_CHILDREN_aggiornato(h: Hypothesis, current: List[Hypothesis]) -> List[Hypothesis]:
    global stop_flag
    """
    Genera i figli dell'ipotesi h in ordine da sinistra a destra,
    seguendo la logica del metodo Node.generate_children, e mantenendo SET_FIELDS.
    """
    children = []

    # Controllo STOP immediato
    if stop_flag:
        return children

    # Caso iniziale: ipotesi vuota → genera tutti i vicini a distanza 1
    if h == empty_hypothesis(len(h)):
        for i in range(len(h)):
            if stop_flag:
                break
            if h[i] == 0:
                h1 = list(h)
                h1[i] = 1
                SET_FIELDS(h1)
                children.append(tuple(h1))
        return children

    # --- Costruisco una mappa delle posizioni delle ipotesi correnti
    index_map = {tuple(hyp): idx for idx, hyp in enumerate(current)}

    # --- Ciclo da sinistra a destra
    for i in range(LM1(h) - 1):
        if stop_flag:
            break
        if h[i] == 0:
            h1 = list(h)
            h1[i] = 1
            SET_FIELDS(h1)
            h1 = tuple(PROPAGATE(h, h1))

            # Calcolo hi (iniziale) e hf (finale)
            hi = initial(h, h1)
            hf = final(h, h1)

            # Recupero posizioni in current
            try:
                pos_hi = index_map[tuple(hi)]
            except KeyError:
                pos_hi = len(current) - 1
            try:
                pos_hf = index_map[tuple(hf)]
            except KeyError:
                pos_hf = 0

            count = 0
            for hp in current[pos_hf:pos_hi + 1]:
                if stop_flag:
                    break
                if dist(hp, h1) == 1 and dist(hp, h) == 2:
                    h1 = tuple(PROPAGATE(hp, h1))
                    count += 1

            # Condizione di ammissibilità 
            # if count == card(h) and tuple(h1) != tuple(h):
            #     children.append(tuple(h1))

            if count == card(h):
                children.append(tuple(h1))

    # Ordina in (sinistra → destra)
    children.sort(key=binario)
    return children

In [469]:
# AGGIORNATA
def MAIN_COMPITO_2_aggiornato(input_path: str, max_seconds: float = None, output_dir: str = "outputs"):
    global stop_flag
    """
    Wrapper MAIN:
      - legge M,N dal file (parse_matrix_file)
      - riduce dominio a M'
      - esegue la ricerca MHS su M' (utilizza le funzioni CHECK/GENERATE_CHILDREN/... già presenti)
      - registra statistiche temporali/spaziali e per-livello
      - scrive file .mhs in output_dir con soluzioni rialzate su M
    :param input_path: percorso completo del file .matrix di input
    :param max_seconds: timeout in secondi (None = nessun timeout)
    :param output_dir: directory di output per il file .mhs
    :return: (minimal_solutions_full, output_path, stats)
    """
    # --- parsing del file di input e riduzione delle colonne vuote ---
    M, N = parse_matrix_files(input_path)          # M: List[str], N: List[Set[str]]
    len_M = len(M)
    len_N = len(N)

    M_prime, N_prime, removed_indices, old_to_new, new_to_old = reduce_domain_remove_empty_cols(M, N)
    len_Mp = len(M_prime)

    # Livello H_max = max(len_N, len_Mp)
    H_max = max(len_N, len_Mp)

    # Conversione liste per la funzione CHECK
    N_list_prime = [list(s) for s in N_prime]
    M_prime_list = list(M_prime)

    # dizionario per statistiche
    stats: Dict = {
        "input_file": os.path.basename(input_path),
        "len_M": len_M,
        "len_Mp": len_Mp,
        "len_N": len_N,
        "removed_indices": removed_indices,
        "H_max": H_max,
        "start_time": None,
        "end_time": None,
        "start_time_str": None,
        "end_time_str": None,
        "elapsed_seconds": None,
        "peak_mem_kb": None,
        "rss_kb": None,
        "interrupted": False,
        "interrupted_level": None,
        "max_seconds": max_seconds,
        "level_stats": {},   # livello -> {'generated', 'checked'}
        "total_solutions_found": 0,
        "min_cardinality": None,
        "max_cardinality": None,
    }

    # --- start perf tracking ---
    start_time = start_perf_tracking()
    stats["start_time"] = start_time
    stats["start_time_str"] = time.ctime()

    h_0 = empty_hypothesis(len_Mp)
    current: List[Hypothesis] = [h_0]
    solutions_reduced: Set[Hypothesis] = set()

    # timeout flags
    interrupted = False
    interrupted_level = None

    level_stats: Dict[int, Dict[str, int]] = {} 

    # --- ciclo principale algoritmo ---
    try:
        while current:
            #Controllo se c'è lo STOP
            if stop_flag: 
                print("STOP ricevuto, interrompo il benchmark!")
                interrupted = True
                break

            next_level: List[Hypothesis] = []

            for h in list(current):
                if stop_flag:
                    interrupted = True
                    break
                # timeout
                if max_seconds and (time.perf_counter() - start_time) > max_seconds:
                    interrupted = True
                    interrupted_level = card(h)
                    current = []  # forza uscita
                    break

                lvl = card(h)
                level_stats.setdefault(lvl, {"generated": 0, "checked": 0})
                level_stats[lvl]["checked"] += 1

                # prune: non generare figli oltre H_max
                if lvl >= H_max:
                    if CHECK(h, N_list_prime, M_prime_list):
                        solutions_reduced.add(h)
                    continue

                # verifica soluzione su dominio ridotto
                if CHECK(h, N_list_prime, M_prime_list):
                    solutions_reduced.add(h)
                    try:
                        current.remove(h)
                    except ValueError:
                        pass

                elif h == empty_hypothesis(len_Mp):
                    children = GENERATE_CHILDREN_aggiornato(h, current)
                    nxt_level = lvl + 1
                    level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
                    level_stats[nxt_level]["generated"] += len(children)
                    next_level.extend(children)

                elif LM1(h) != 1:
                    h_s_2 = globalInitial(h)
                    current = [h_r for h_r in current if binario(h_r) <= binario(h_s_2)]

                    if current:
                        h_p = current[0]
                        if h_p != h:
                            children = GENERATE_CHILDREN_aggiornato(h, current)
                            nxt_level = lvl + 1
                            level_stats.setdefault(nxt_level, {"generated": 0, "checked": 0})
                            level_stats[nxt_level]["generated"] += len(children)
                            next_level = MERGE(next_level, children)

            current = next_level
            stats["level_stats"] = level_stats

            if interrupted: 
                stats["interrupted"] = True
                stats["interrupted_level"] = interrupted_level
                break

    except Exception as e:
        interrupted = True
        stats["exception"] = repr(e)

    # --- end tracking ---
    elapsed, peak_kb, rss_kb = stop_perf_tracking(start_time)
    stats["end_time"] = time.perf_counter()
    stats["end_time_str"] = time.ctime()
    stats["elapsed_seconds"] = elapsed
    stats["peak_mem_kb"] = peak_kb
    stats["rss_kb"] = rss_kb
    stats["interrupted"] = interrupted
    stats["interrupted_level"] = interrupted_level
    stats["level_stats"] = level_stats

    # --- estrai MHS e rialza su dominio M originale ---
    minimal_solutions_reduced = [h for h in solutions_reduced if is_minimal(h, solutions_reduced)]
    minimal_solutions_full = [lift_solution_to_full(h, len_M, old_to_new) for h in minimal_solutions_reduced]

    # --- ORDINA LE SOLUZIONI IN ORDINE DECRESCENTE BINARIO (sx→dx) ---
    minimal_solutions_full.sort(key=lambda x: int("".join(str(b) for b in x), 2), reverse=True)

    stats["total_solutions_found"] = len(minimal_solutions_full)
    if minimal_solutions_full:
        cards = [card(s) for s in minimal_solutions_full]
        stats["min_cardinality"] = min(cards)
        stats["max_cardinality"] = max(cards)
    else:
        stats["min_cardinality"] = 0
        stats["max_cardinality"] = 0

    # --- scrivi output .mhs ---    
    if stats["total_solutions_found"] > 0:
        os.makedirs(output_dir, exist_ok=True)
        base_name = os.path.splitext(os.path.basename(input_path))[0]
        output_path = os.path.join(output_dir, base_name + ".mhs")

        removed_1based = [i + 1 for i in removed_indices] if removed_indices else []

        write_mhs_file(output_path, os.path.basename(input_path), M, minimal_solutions_full, stats, removed_1based)

        return minimal_solutions_full, output_path, stats
    else:
        return [], None, stats


Gestione RISPOSTA che mando alla WebApp

In [471]:
import datetime
import json
def risposta(comando_ricevuto, GUID):

    global benchmark_in_corso, benchmark_owner

    #Se il benchmarck è fermo 
    if not benchmark_in_corso:
        if comando_ricevuto == 1: #start
            esito = "COMANDO ACCETTATO"
        elif comando_ricevuto == 2:
            esito = "COMANDO RIFIUTATO"
        else:
            esito = "COMANDO RIFIUTATO" 

    #Se sil benchmarck è in corso e ottengo START --> esito: no e rimango in = RUNNING
    else:
        if benchmark_in_corso:
            if comando_ricevuto == 1:
                esito = "COMANDO RIFIUTATO"
            elif comando_ricevuto == 2:
            #ricevo uno STOP 
                esito = "COMANDO ACCETTATO"
            else:
                esito = "COMANDO RIFIUTATO" 

  
    return esito


In [482]:
import paho.mqtt.client as mqtt
from pathlib import Path
import os, random
import sys
import json
import threading

stop_flag = False  # variabile globale
start_flag = False


def on_message(client, userdata, msg):
    global stop_flag
    global start_flag
    global stato_corrente
    global benchmark_in_corso
    global benchmark_owner

    payload = msg.payload.decode()

    data = json.loads(payload)
    codice_comando = data["codice_comando"]
    GUID = data['GUID']

    #print(f"Messaggio ricevuto: {payload}")
    esito = risposta(codice_comando, GUID)
    


    if esito == "COMANDO ACCETTATO": 
        # Se un benchmark è già in corso e arriva un START
        if codice_comando == 1:
            if benchmark_in_corso:
                # Rispondo con rifiuto
                ANSW = {
                    "Stato corrente": stato_corrente,
                    "esito": "COMANDO RIFIUTATO"
                }
                client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))
                print(f"[{GUID}] START rifiutato, benchmark già in corso")
                return
            
            # Se un benchmark NON è in corso e arriva un START
            else:
                start_flag = True
                benchmark_in_corso = True
                benchmark_owner = GUID  

                # Rispondo positivo
                ANSW = {
                    "Stato corrente": "RUNNING",
                    "esito": "COMANDO ACCETTATO"
                }

                #Aggiorno lo stato corrente
                stato_corrente = ANSW['Stato corrente']

                client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))
                print(f"[{GUID}] COMANDO START ricevuto")


                # Avvia il benchmark in un thread separato con wrapper
                threading.Thread(
                    target=run_benchmark_wrapper,
                    args=(GUID, client),
                    daemon=True
                ).start()
        
       
        elif codice_comando == 2: #Ricevo lo STOP
            if not benchmark_in_corso:

                ANSW = {
                    "Stato corrente": "STAND_BY",
                    "esito": "COMANDO RIFIUTATO"
                }

                client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))
                print(f"[{GUID}] COMANDO STOP rifiutato, NESSUN BENCHMARCK IN CORSO")
                return

            else: #se c'è un becnhmarck in corso, solo chi lo ha avviato può terminarlo
                if GUID != benchmark_owner:
                    stop_flag = False
                
                    ANSW = {
                        "Stato corrente": "STAND_BY",
                        "esito": "COMANDO RIFIUTATO: NON AUTORIZZATO"
                    }
                    client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))
                    print(f"[{GUID}] STOP rifiutato: benchmark avviato da {benchmark_owner}")
                    return
        
                else:
                    stop_flag = True
                    benchmark_in_corso = False
                    benchmark_owner = None
                    stato_corrente = "STAND_BY"

                    ANSW = {
                        "Stato corrente": stato_corrente,
                        "esito": "COMANDO ACCETTATO"
                    }
                    client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))
                    print(f"[{GUID}] COMANDO STOP accettato → benchmark interrotto")
    else:
        #COMANDO RIFIUTATO
        ANSW = {
                "Stato corrente": "STAND_BY",
                "esito": "COMANDO RIFIUTATO"
                }
        
        client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))

    #client.publish("ANSWER", json.dumps(ANSW))
    #client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))



def run_benchmark_wrapper(GUID, client):
    global benchmark_in_corso, stop_flag
    try:
        run_benchmark("../benchmarks1", "output/compito_2/benchmarks1", benchmarcks_1_classes)
    finally:
        benchmark_in_corso = False  # libero il benchmark
        stop_flag = False  # reset stop
        ANSW = {
            "Stato corrente": stato_corrente,
            "esito": "ELABORAZIONE TERMINATA"
        }
        client.publish(f"ANSWER/{GUID}", json.dumps(ANSW))
        print(f"[{GUID}] Elaborazione completata, benchmark libero")


import shutil
def run_benchmark(benchmark_folder: str, output_root: str, classes: dict):
    global stop_flag

    if os.path.exists("output"):
        shutil.rmtree("output")  # cancella la cartella e il suo contenuto
        os.makedirs("output")    # la ricrea vuota

        print("\nCartella svuotata con successo -- INIZIO NUOVA ELABORAZIONE --\n")

    
    benchmark_folder = Path(benchmark_folder)
    output_root = Path(output_root)

    for classe, n_files in classes.items():
        if stop_flag: 
            print("Esecuzione sul benchmark interrotta!")
            return

        all_files = list(benchmark_folder.glob(f"{classe}.*.matrix"))
        if not all_files:
            continue
        
        selected_files = random.sample(all_files, min(n_files, len(all_files)))

        for file in selected_files:
            if stop_flag:
                print("Esecuzione interrotta durante l'elaborazione dei file!")
                return

            # Esecuzione algoritmo Main
            sols, path, stats = MAIN_COMPITO_2_aggiornato(file, max_seconds=120)

            # Crea le cartelle solo se ci sono soluzioni
            if sols:
                output_folder = output_root / classe
                output_folder_file = output_folder / file.stem
                os.makedirs(output_folder_file, exist_ok=True)
                # Ora scrivi l'output nel posto giusto
                write_mhs_file(
                    output_folder_file / f"{file.stem}.mhs",
                    os.path.basename(file),
                    [],
                    sols,
                    stats,
                    []
                )


In [483]:
client = mqtt.Client()
client.on_message = on_message

  client = mqtt.Client()


In [484]:
import socket

def get_local_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # l'indirizzo non deve essere raggiungibile; serve solo per ottenere l'IP locale
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"
    finally:
        s.close()

if __name__ == "__main__":
    print(get_local_ip())


192.168.1.33


In [476]:
myIP = get_local_ip()


In [487]:
stop_flag = False
start_flag = False
benchmark_in_corso = False
benchmark_owner = None

stato_corrente = "STAND_BY"

#client.connect("192.168.1.22", 1883)

client.connect(myIP, 1883)

client.subscribe("MESSAGGI")

try:
    client.loop_forever()
except KeyboardInterrupt:
    print("\n🛑 Client MQTT fermato manualmente.")
    client.disconnect()
    print("Connessione chiusa con successo.")




🛑 Client MQTT fermato manualmente.
Connessione chiusa con successo.
