## Peptídeos de Penetração Hematoencefálica

### Bibliotecas comumente utilizadas ao longo do código

In [None]:
import joblib as jl
import numpy as np
import pandas as pd
import pathlib as pl

### Funções para carregar a base de dados e retornar os dados em Data Frame

In [1]:
#load

def data_frame_from_excel(*args, **kwargs) -> pd.DataFrame:
    return pd.read_excel(*args, **kwargs)


def data_frame_from_csv(*args, **kwargs) -> pd.DataFrame:
    return pd.read_csv(*args, **kwargs)


def array_as_memory_map(array: np.ndarray, *, directory: pl.Path) -> np.memmap:
    if not directory.is_dir():
        directory.mkdir()

    memory_map_path = pl.Path(directory) / "memory_map"

    jl.dump(array, memory_map_path)

    return jl.load(memory_map_path, mmap_mode="r")


### Criação de um mapa de memória, onde os arquivos mapeados na memória são usados para acessar pequenos segmentos de arquivos grandes no disco, sem ler o arquivo inteiro na memória. 

In [None]:
def array_as_memory_map(array: np.ndarray, *, directory: pl.Path) -> np.memmap:
    if not directory.is_dir():
        directory.mkdir()

    memory_map_path = pl.Path(directory) / "memory_map"

    jl.dump(array, memory_map_path)

    return jl.load(memory_map_path, mmap_mode="r")


### Silhouette scores
O valor do coeficiente de silhueta é um valor de -1 a 1, onde um valor alto indica que o objeto está bem combinado com seu próprio cluster e mal combinado com os clusters vizinhos. Se a maioria dos objetos tiver um valor alto, a configuração de cluster é apropriada. Se muitos pontos tiverem um valor baixo ou negativo, a configuração de cluster pode ter muitos ou poucos clusters, ou seja, uma configuração de clusters diferente do apresentado.

In [2]:
#score

from sklearn import cluster, metrics


def get_silhouette_scores(data_frame: pd.DataFrame, *, parallel: jl.Parallel = None) -> pd.DataFrame:
    columns = [data_frame.loc[:, [column]] for column in data_frame.columns]

    if parallel is not None:
        scores = parallel(jl.delayed(__get_silhouette_score__)(column) for column in columns)
    else:
        scores = [__get_silhouette_score__(column) for column in columns]

    index = pd.Index(data_frame.columns.values, name="Feature")
    columns = pd.Index(("Silhouette score",), name="Measure")

    return pd.DataFrame(scores, index=index, columns=columns)


def evaluate_k_means_silhouette(individual: np.ndarray, *, descriptors: np.memmap, final_dim: int) -> tuple[float]:
    if individual.sum() != final_dim:
        return -1.,

    selected_descriptors = descriptors[:, individual]

    return __get_silhouette_score__(selected_descriptors),


# noinspection PyUnresolvedReferences
def __get_silhouette_score__(data_frame: pd.DataFrame) -> float:
    return metrics.silhouette_score(data_frame, cluster.KMeans(n_clusters=2).fit(data_frame).labels_)


### Pré-processamento

In [3]:
import typing as t
from feature_engine import selection

# Apaga atributos iguais
def drop_constant_features(data_frame: pd.DataFrame, *, tolerance: float) -> pd.DataFrame:
    return selection.DropConstantFeatures(tol=tolerance).fit_transform(data_frame)

# Apaga atributos correlacionados
def drop_correlated_features(
        data_frame: pd.DataFrame, sorted_scores: pd.DataFrame, *, tolerance: float) -> pd.DataFrame:
    flagged_feature_names = __get_correlated_feature_names__(data_frame, tolerance=tolerance)

    print(f"Correlated features found: {len(flagged_feature_names)}.")

    while len(flagged_feature_names) > 0:
        print(f"Remaining correlated features: {len(flagged_feature_names)}.", end="\r")

        worst_performing_column_name = sorted_scores.loc[flagged_feature_names, :].index.values[0]
        data_frame = data_frame.drop(worst_performing_column_name, axis=1)
        flagged_feature_names = __get_correlated_feature_names__(data_frame, tolerance=tolerance)

    print("All correlated features removed.")

    return data_frame

# Obtém os nomes dos atributos correlacionados
def __get_correlated_feature_names__(data_frame: pd.DataFrame, *, tolerance: float) -> tuple[t.Any, ...]:
    correlation_matrix = data_frame.corr(method="spearman").abs()
    upper_triangle = correlation_matrix.where(np.triu(np.ones(correlation_matrix.shape), k=1).astype(np.bool_))

    return tuple(column for column in upper_triangle.columns if any(upper_triangle[column] > tolerance))


### Otimização

In [4]:
from deap import algorithms, base, creator, tools

def get_toolbox(
        *, descriptors: np.memmap, final_dim: int, ind_size: int, pop_size: int, parallel: jl.Parallel = None
) -> base.Toolbox:
    creator.create("Score", base.Fitness, weights=(1.0,))
    # noinspection PyUnresolvedReferences
    creator.create("Individual", np.ndarray, dtype="bool", fitness=creator.Score)

    toolbox = base.Toolbox()

    # if parallel is not None:
    #     toolbox.register("map", joblib_map, parallel=parallel)

    # noinspection PyUnresolvedReferences
    toolbox.register("individual", create_individual, size=ind_size, final_dim=final_dim)
    # noinspection PyUnresolvedReferences
    toolbox.register("population", tools.initRepeat, list, toolbox.individual, n=pop_size)

    toolbox.register("evaluate", evaluate_k_means_silhouette, descriptors=descriptors, final_dim=final_dim)
    toolbox.register("mate", tools.cxPartialyMatched)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
    toolbox.register("select", tools.selTournament, tournsize=3)

    return toolbox


# noinspection PyUnresolvedReferences
def create_individual(size: int, *, final_dim: int):
    array = np.zeros(size, dtype=np.bool_)
    random_indices = np.random.randint(0, size, size=final_dim)
    array[random_indices] = np.True_

    # noinspection PyUnresolvedReferences
    return creator.Individual(array)


def joblib_map(function: t.Callable, *arg_lists, parallel: jl.Parallel):
    return parallel(jl.delayed(function)(*args) for args in zip(*arg_lists))


def genetic_algorithm(toolbox: base.Toolbox) -> list[np.ndarray]:
    # noinspection PyUnresolvedReferences
    population = toolbox.population()
    best_dim, best_fit = 0, 0

    gen, gens_without_progress = 0, 0

    while gens_without_progress < 100:
        gen += 1
        print_prefix = f"[{gen} ({gens_without_progress} without progress) | " \
                       f"Best individual: (D = {best_dim}, F = {best_fit})]"

        print(f"{print_prefix} Selecting parents...", end="\r")
        # noinspection PyUnresolvedReferences
        offspring = toolbox.select(population, k=len(population) // 2) + toolbox.population(n=len(population) // 2)
        print(f"{print_prefix} Mating and mutating selected parents...", end="\r")
        offspring = algorithms.varAnd(offspring, toolbox, cxpb=0.5, mutpb=0.2)
        print(f"{print_prefix} Scoring offspring fitness...", end="\r")
        # noinspection PyUnresolvedReferences
        values = toolbox.map(toolbox.evaluate, offspring)

        for value, individual in zip(values, offspring):
            individual.fitness.values = value

        print(f"{print_prefix} Selecting best from parents and children...", end="\r")
        # noinspection PyUnresolvedReferences
        population = toolbox.select(population + offspring, k=len(population))

        best_ind = tools.selBest(population, k=1)
        dim, fit = np.where(best_ind[0] == 1)[0].size, best_ind[0].fitness.values[0]

        if dim == best_dim and fit == best_fit:
            gens_without_progress += 1
        else:
            best_dim, best_fit = dim, fit
            gens_without_progress = 0

    print(f"{print_prefix} Execution finished.")

    return tools.selBest(population, k=1)[0]


### Execução

In [8]:
#playground
import pathlib as pl

import joblib as jl

#from pipeline import load, preprocess, score, optimize

#for i in range(50):
descriptors_file = pl.Path("data/descriptors.xlsx")
output_directory = pl.Path("results")#+str(i))

if not output_directory.is_dir():
    output_directory.mkdir()

variance_threshold, correlation_threshold = .95, .7
n_jobs = -1
final_dimensionality = 5
population_size = 100


with jl.Parallel(n_jobs=n_jobs) as parallel:
    if not (output_directory / "final_descriptors.csv").is_file():
        descriptors = data_frame_from_excel(
            descriptors_file, sheet_name="Blad1", header=2, index_col=0
        ).reset_index(drop=True)
        filtered_descriptors = drop_constant_features(descriptors, tolerance=variance_threshold)
        scores = get_silhouette_scores(filtered_descriptors, parallel=parallel)
        final_descriptors = drop_correlated_features(
            filtered_descriptors, scores.sort_values(by="Silhouette score"), tolerance=correlation_threshold)
        final_descriptors.to_csv(output_directory / "final_descriptors.csv", sep=";", index=False)
    else:
        final_descriptors = data_frame_from_csv(output_directory / "final_descriptors.csv", sep=";")

    data_memory_map = array_as_memory_map(final_descriptors.to_numpy(), directory=output_directory)

    toolbox = get_toolbox(
        descriptors=data_memory_map,
        final_dim=final_dimensionality,
        ind_size=final_descriptors.shape[1],
        pop_size=population_size,
        parallel=parallel
    )

    best_solution = genetic_algorithm(toolbox)
    final_descriptors.iloc[:, best_solution].to_csv(output_directory / "selected_descriptors.csv", sep=";", index=False)

    with open(output_directory / "selected_descriptors_fitness.txt", "w", encoding="utf-8") as file:
        # noinspection PyUnresolvedReferences
        file.write(str(best_solution.fitness.values[0]))


Correlated features found: 1360.
All correlated features removed.....
[1 (0 without progress) | Best individual: (D = 0, F = 0)] Scoring offspring fitness... parents...

AttributeError: 'NoneType' object has no attribute 'split'