In [1]:
import numpy as np
import copy
import random

In [2]:
class GA:
    def __init__(self):
        self.crossover_prob = 0.7
        self.mutation_prob = 0.2
        self.ring_size = 3
        self.buffer = {}
    
    def _crossover(self, chromosome1, chromosome2):
            genes1 = chromosome1[1]
            genes2 = chromosome2[1]
            genes = np.random.choice(len(genes1), size=2, replace=False)
            genes.sort()
            child1_genes = genes1[:]
            child2_genes = genes2[:]
            for i in range(genes[0], genes[1] + 1):
                temp_read1 = child1_genes[i][1][:]
                temp_read2 = child2_genes[i][1][:]
                child1_genes[i] = (genes2[i][0], temp_read2)
                child2_genes[i] = (genes1[i][0], temp_read1)
            child1 = (0.0, child1_genes)
            child2 = (0.0, child2_genes)
            return child1, child2

    def _mutation(self, population):
            score = population[0]
            individuals = population[1].copy()
            index1, index2 = random.sample(range(len(individuals)), 2)
            individuals[index1], individuals[index2] = individuals[index2], individuals[index1]
            mutated_individuals = [
                (individuo[0], individuo[1]) if i == index1 or i == index2 else individuo
                for i, individuo in enumerate(individuals)
            ]
            mutated_population = (score, mutated_individuals)
            return mutated_population
    
    
    # Funzione di fitness, la quale calcola il punteggio per ogni individuo
    def _fitness(self, cromosoma):
            score = 0
            array_valori = [valori for _, valori in cromosoma]

            for i in range(len(array_valori) - 1):
                finger = array_valori[i]
                finger1 = array_valori[i + 1]
                # print("1 -> ",finger, "2 -> ",finger1)
                # Calcola l'overlap solo se le due sequenze sono identiche

                score += self.findOverlapNew(finger, finger1)
                # print("score", score)
            return score
    
    
    def _evaluatePopulation(self, population, gen):
            scores = np.zeros(len(population))
            for x in range(len(population)):
                scores[x] = self._fitness(population[x])
            return scores
    
    
    def findOverlapNew(self, finger, finger1):
            overlap = []
            min_len = min(len(finger), len(finger1))
            for i in range(1, min_len + 1):
                if finger[-i:] == finger1[:i]:
                    overlap = finger[-i:]
            return sum(overlap)
    
    
    # Usata per calcolare L'overlap tra due letture: return overlap
    def findOverlapGenoma(self, finger1, finger2):
            # print("Finger e finger 1", finger, finger1)
            # print("Siamo nel metodo findOverlapNew")
            overlap = []
            min_len = min(len(finger1), len(finger2))
            for i in range(1, min_len + 1):
                if finger1[-i:] == finger2[:i]:
                    # print(finger[-i:], "\n", finger1[:i])
                    overlap = finger1[-i:]
                    # print("Ancora qui", overlap)
            # print("Overlap individuato tra :",finger, "\n", "e ", finger1, "\n", "e di", sum(overlap))
            # print("somma:", sum(overlap))
            return overlap

In [3]:
def run_ga(self, env, pop, gen, iterazioni):
        pop_size = len(pop)
        population = pop
        pop_fitness = self._evaluatePopulation(population, gen)
        print(pop_fitness)
        c = np.argmax(pop_fitness)
        best_ind = copy.deepcopy(population[c])
        best_fit = self._fitness(best_ind)
        for generation in range(iterazioni):
            print('Iterazione n: ', generation)
            mappa_fitness = []
            for i in range(len(population)):
                individui = population[i]
                fitness_corrispondente = pop_fitness[i]
                tupla = (fitness_corrispondente, individui)
                mappa_fitness.append(tupla)
            sorted_data = sorted(mappa_fitness, key=lambda x: x[0], reverse=True)
            population = sorted_data
            n = len(population) // 2
            population = population[:n]
            for i in range(0, len(population) - 1, 2):
                if np.random.rand() < self.crossover_prob:
                    if i < len(population) - 1:
                        pop1, pop2 = self._crossover(population[i], population[i + 1])
                        population.append(pop1)
                        population.append(pop2)
            for i in range(len(population)):
                if np.random.rand() > self.mutation_prob:
                    population[i] = self._mutation(population[i])
            population.append(copy.deepcopy((0.0, best_ind)))
            population = [individuo for fitness, individuo in population]
            pop_fitness = self._evaluatePopulation(population, gen)
            print(pop_fitness)
            value = pop_fitness.max()
            best_ind = population[pop_fitness.argmax()].copy()
            if value < best_fit:
                best_ind = copy.deepcopy(population[np.argmax(pop_fitness)])
                population[0] = copy.deepcopy(best_ind)
                best_fit = value
            else:
                best_ind = population[pop_fitness.argmax()].copy()
                best_fit = pop_fitness.max()
                indexValue = np.argmax(pop_fitness)
                population[0] = copy.deepcopy(population[indexValue])
        return best_ind

### Funzioni di utilità

In [4]:
# Definizione stub della funzione qlearning
def qlearning(reads, episodes, genome=None, test_each_episode=False):
    factor = 1.0 / (len(reads) * max([len(read) for read in reads]))
    generations = episodes

    _cromosomeInt = []
    # print("------------")
    print(reads)
    sottosequenza_lunghezza = 100

    # Caso in cui abbiamo un dataset diviso in piu letture
    # readsGenoma = ''.join(reads)
    # print(readsGenoma)
    i = 0
    # print(reads)
    genomePartenza = copy.copy(reads)
    reads = createDataset(reads, sottosequenza_lunghezza)
    # print(reads)
    individuo = copy.copy(reads)
    # print(len(count_repeats(reads)))

    # for x in reads:
    # print("Lettura: ", x)

    # Dimensione del marcatore -> da 4 a 8
    countRepeat = 7
    dict = count_repeats(reads, countRepeat)
    # print(dict)
    # print(count_repeats(reads))

    # print("------------")
    marker = []
    # print("Le letture sono:", reads)

    #  Marcatori Indipendenti
    marksIndependent = 6
    max_readss = find_unique_markers(dict, marksIndependent)
    # print('3 marcatori distinti', max_readss)

    # max_readss = find_unique_markers(dict, 2)
    # print('2 marcatori distinti', max_readss)

    markers = chiavi = [sequenza for sequenza, valore in max_readss]
    # print("Combinazione dei marcatori", markers)

    #print("------------")
    results = []
    results2 = []
    for x in range(len(reads)):
        # print("Lettura", reads[x])
        resul = apply_CFL_to_reads(reads[x], markers)
        # risp = apply_CFL_to_reads(reads[x], markers)
        # print("Due marcatori:", risp)
        # print("Tre marcatori:", resul)
        results.append(resul)
        # results2.append(risp)

    # print("Risultato ottenuto:", results)

    _intA = compute_fingerprint_by_list_factors(results)
    # _intB = compute_fingerprint_by_list_factors(results2)
    # print("------------")

    # Popolazione
    num_ind = 1500
    iterazioni = 1500
    ga = GA()

    # Creo una lista di indici
    indices = list(range(len(_intA)))
    # print(_intA)
    indConfront = copy.copy(_intA)
    popolazioni_mescolate = generate_random_populations(_intA, num_ind, len(_intA))
    # popolazioni_mescolate2 = generate_random_populations(_intB, num_ind, len(_intB))
    # print(popolazioni_mescolate)

    # print("----")
    for i, sublist in enumerate(popolazioni_mescolate):
        #print(sublist)
        chiavi = [tupla[0] for tupla in sublist]
        array_di_interi = [tupla[1] for tupla in sublist]
        # Conversione della lista di chiavi in una singola stringa
        stringa_chiavi = ''.join(chiavi)
        chunks = [stringa_chiavi[i:i + len(individuo[0])] for i in range(0, len(stringa_chiavi), len(individuo[0]))]
        # print(chunks)
        # if individuo == chunks:
        # print("Trovato", chunks)


In [None]:
def generate_random_populations(data, num_ind, length):
    # implementation of the function goes here
    pass


In [None]:
def remove_zeros(array):
    no_zeros = [element for element in array if element != 0]
    return no_zeros

In [None]:
def CFL(word, T):
    CFL_list = []
    k = 0
    while k < len(word):
        i = k + 1
        j = k + 2
        while True:
            if j == len(word) + 1 or word[j - 1] < word[i - 1]:
                while k < i:
                    CFL_list.append(word[k:k + j - i])
                    k = k + j - i
                break
            else:
                if word[j - 1] > word[i - 1]:
                    i = k + 1
                else:
                    i = i + 1
                j = j + 1
    return CFL_list

In [None]:
def count_repeats(reads, countRepeat):
    repeats_count = {}
    max_length = len(reads[0])
    for k in range(countRepeat, max_length + 1):
        for read in reads:
            prefix = read[:k]
            repeats_count.setdefault(prefix, 0)
            repeats_count[prefix] += sum(1 for r in reads if prefix in r)
    return repeats_count

In [None]:
def apply_CFL_to_reads(reads, markers):
    cfl_list = []
    marker_indices = []

    # Trova tutti gli indici dei marcatori nella sequenza
    start = 0
    while start < len(reads):
        found_indices = [(reads.find(marker, start), marker) for marker in markers if reads.find(marker, start) != -1]
        if not found_indices:
            break
        idx, marker = min(found_indices, key=lambda x: x[0])
        marker_indices.append((idx, marker))
        start = idx + len(marker)

    if not marker_indices:
        cfl_list.append(reads)
        segments = CFL(reads[:], None)
        result = (reads, segments)
        return result

    # Se la sequenza inizia senza un marcatore, aggiungi la parte iniziale come CFL
    if marker_indices and marker_indices[0][0] > 0:
        cfl_list.append(CFL(reads[:marker_indices[0][0]], None))

    # Calcola le CFL sulla base degli indici dei marcatori
    i = 0
    while i < len(marker_indices) - 1:
        start_idx, current_marker = marker_indices[i]
        next_idx, next_marker = marker_indices[i + 1]

        # Controlla se i marcatori non sono consecutivi
        if start_idx + len(current_marker) <= next_idx:
            cfl_list.append(CFL(reads[start_idx:next_idx], None))

        else:
            # Trova il prossimo marcatore che non si sovrappone
            j = i + 1
            while j < len(marker_indices) and start_idx + len(current_marker) > marker_indices[j][0]:
                j += 1
            if j < len(marker_indices):
                cfl_list.append(CFL(reads[start_idx:marker_indices[j][0]], None))
            else:
                cfl_list.append(CFL(reads[start_idx:], None))
                break
        i += 1

    # Aggiungi l'ultimo segmento se esiste
    if marker_indices:
        last_idx, last_marker = marker_indices[-1]
        if last_idx + len(last_marker) <= len(reads):
            cfl_list.append(CFL(reads[last_idx:], None))

    lista_appiattita = [elemento for sottolista in cfl_list for elemento in sottolista]
    result = (reads, lista_appiattita)
    # print("Lettura: ", reads, "\nMarcatori Utilizzati: ", markers, "\nRappresentazione CFL: ", cfl_list)
    return result

In [None]:
def compute_fingerprint_by_list_factors(original_list):
    oggetto_trasformato = []
    for chiave, lista_stringhe in original_list:
        # Converti ogni stringa nella lista nella sua lunghezza
        lunghezze = [len(stringa) for stringa in lista_stringhe]
        oggetto_trasformato.append((chiave, lunghezze))
    return oggetto_trasformato

In [None]:
def find_unique_markers(reads, num_markers):
    # Ordina le sequenze per frequenza e lunghezza in ordine decrescente
    sorted_reads = sorted(reads.items(), key=lambda x: (-x[1], -len(x[0])))

    unique_markers = []

    for current_marker in sorted_reads:
        # Verifica che il marcatore corrente non sia un sottoinsieme di nessuno dei marcatori selezionati
        is_subset = False
        for selected_marker in unique_markers:
            if current_marker[0] in selected_marker[0] or selected_marker[0] in current_marker[0]:
                is_subset = True
                break
        if not is_subset:
            unique_markers.append(current_marker)
        if len(unique_markers) == num_markers:
            break

    return unique_markers

In [None]:
def generate_random_populations(reads, num_populations, population_size):
    populations = [reads]
    for _ in range(num_populations - 1):
        population = random.sample(reads, population_size)
        populations.append(population)
    return populations

In [None]:
def assemble_genome_with_overlaps(reads):
    # print("reads:", reads)
    genome = reads[0][0]
    # print("qui", genome)
    for i in range(len(reads) - 1):
        current_read, current_overlaps = reads[i]
        current_read2, current_overlaps2 = reads[i + 1]
        # print(current_read, current_overlaps, "\n", current_read2, current_overlaps2)

        lista = GA.findOverlapGenoma(None, current_overlaps, current_overlaps2)
        # print("lista", lista)
        if (len(lista) > 0):
            stringa_concatenata = current_read2[sum(lista):]
            # print("Stringa conc", stringa_concatenata)
            genome += stringa_concatenata
            # print("Genoma", genome)
        else:
            genome += current_read2
            # print("Genoma", genome)
    return genome

In [None]:
def levenshtein(s, t, costs=(1, 1, 1)):
    """
                iterative_levenshtein(s, t) -> ldist
                ldist is the Levenshtein distance between the strings
                s and t.
                For all i and j, dist[i,j] will contain the Levenshtein
                distance between the first i characters of s and the
                first j characters of t

                costs: a tuple or a list with three integers (d, i, s)
                       where d defines the costs for a deletion
                             i defines the costs for an insertion and
                             s defines the costs for a substitution
            """
    rows = len(s) + 1
    cols = len(t) + 1
    deletes, inserts, substitutes = costs

    dist = [[0 for x in range(cols)] for x in range(rows)]
    # source prefixes can be transformed into empty strings
    # by deletions:
    for row in range(1, rows):
        dist[row][0] = row * deletes

    # target prefixes can be created from an empty source string
    # by inserting the characters
    for col in range(1, cols):
        dist[0][col] = col * inserts

    for col in range(1, cols):
        for row in range(1, rows):
            if s[row - 1] == t[col - 1]:
                cost = 0
            else:
                cost = substitutes
            dist[row][col] = min(dist[row - 1][col] + deletes,
                                 dist[row][col - 1] + inserts,
                                 dist[row - 1][col - 1] + cost)  # substitution
    return dist[rows - 1][cols - 1]

In [None]:
def createDataset(dataset, lunghezza_sottosequenza):
    # Definiamo i 4 caratteri
    # caratteri = ['A', 'C', 'G','T']

    # Creiamo un dataset di 2000 caratteri scelti casualmente tra i 4
    # dataset = ''.join(np.random.choice(caratteri) for _ in range(2000))

    # print(dataset)
    # Dividiamo il dataset in sottosequenze di 100 caratteri
    # sottosequenze = [dataset[i:i + 100] for i in range(0, len(dataset), 100)]
    # return sottosequenze

    # Calcola il numero massimo di pezzi sovrapposti
    sottosequenze = []
    indice_inizio = 0
    k = 0
    while indice_inizio < len(dataset) - lunghezza_sottosequenza + 1:
        # Genera la sottosequenza corrente
        sottosequenza = dataset[indice_inizio:indice_inizio + lunghezza_sottosequenza]
        # print("sottosequenza estratta", sottosequenza)
        sottosequenze.append(sottosequenza)
        # print("sottosequenza estratta", sottosequenze)
        # Seleziona un indice casuale per l'overlap
        indice_casuale = random.randint(1, lunghezza_sottosequenza - 1)
        # print("INDICE CASUALE", indice_casuale)
        # Aggiorna l'indice di inizio per la prossima sottosequenza
        indice_inizio += indice_casuale

        # Verifica se la fine della sequenza è stata raggiunta
        if indice_inizio + lunghezza_sottosequenza > len(dataset):
            # Aggiungi l'ultima sottosequenza che include la fine della sequenza
            sottosequenze.append(dataset[-lunghezza_sottosequenza:])
            break
        k = k + 1
    """
    i = 0
    car = 0
    print(sottosequenze)
    for x in sottosequenze:
        print(len(x))
        car += len(x)
        i += 1

    print("DIM", len(sottosequenze),i, car)
    """

    return sottosequenze