In [None]:
import pandas as pd
import sklearn
import numpy as np
import matplotlib.pyplot as plt
from sklearn.impute import SimpleImputer
from scipy.stats import shapiro
import seaborn as sns
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
import pickle
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, roc_auc_score, classification_report

In [None]:
#Las instrucciones dicen que debe ser una boleana para la muestra, pero es más eficiente pedir directamente el número y trabajarla como boleana dentro de la función. Además que sirve para detectar si el número ingresado es más grande que los datos

def cargar_datos(ruta, n_muestra=None, variables =None, separador= None, codificacion=None): 
    try:
        df=pd.read_csv(ruta, sep=separador, usecols= variables, encoding=codificacion)
        if n_muestra:
            if n_muestra > len(df):
                print(f"El número proporcionado: {n_muestra} es mayor que la cantidad de datos: {len(df)}")
            else:
                df=df.sample(n=n_muestra, random_state=25)
        return df
    except Exception as e:
        print(f"No se pudo importar el dataframe {e}")
        return None


#diagnóstico de los datos. 

def descriptivos(df, columnas=None, faltantes=None, media=None, mediana=None, desv_estandar=None, minimo=None, maximo=None):
    #Por si columnas está vacío, cargamos dataframe completo, pero seleccionamos las variables numéricas para ejecutar el diagnóstico.
    if columnas is None:
        columnas=df.select_dtypes(include="number").columns
    #Se intentó con listas, pero el diccionario resultó más manejable para la conversión en dataframe para presentar al final de la función.
    diagnostico={}

    for columna in columnas:
        #Hacemos un diccionario para guardar los números calculados
        medidas={}
        
        if faltantes:
            medidas['faltantes']=df[columna].isnull().sum()
        if media:
            medidas['media']=df[columna].mean()
        if mediana:
            medidas['mediana']=df[columna].median()
        if desv_estandar:
            medidas['desv.estandar']=df[columna].std()
        if minimo:
            medidas['mínimo']=df[columna].min()
        if maximo:
            medidas['máximo']=df[columna].max()

        #Poblamos el diccionario con los números del diagnóstico y las diferentes columnas que pasan por el ciclo
        diagnostico[columna]=medidas

    diagnostico=pd.DataFrame(diagnostico)
    return diagnostico

#Imputación de datos
def imputar_datos(df, descriptores, estrategias):
    #Hacemos copia profunda para evitar modificaciones no previstas
    df_imputado=df.copy(deep=True)
    #nos aseguramos que las listas de descriptores y estrategias tengan el mismo largo.
    if len(descriptores) != len(estrategias):
        raise ValueError("Las listas de descriptores y estrategias deben ser del mismo largo")
    for i, columna in enumerate(descriptores):
        if columna in df_imputado.columns:
            if estrategias[i] == 'media':
                imputacion=df_imputado[columna].mean()
            elif estrategias[i] =='mediana':
                imputacion=df_imputado[columna].median()
            else:
                print("Estrategia no válida")
            df_imputado[columna]=df_imputado[columna].fillna(imputacion)

    return df_imputado

#Graficadora.

def graficadora(df, variable_x, variable_y, categorica=None):
    plt.figure(figsize= (10, 5))
    sns.scatterplot(data=df, x=variable_x, y=variable_y)
    plt.title(f"Gráfico dispersión relación entre {variable_x} y {variable_y}")
    plt.xlabel(variable_x)
    plt.ylabel(variable_y)
    plt.savefig(f"Dispersión {variable_x}-{variable_y}.png")

    if categorica:
        plt.figure(figsize=(10, 5))
        sns.boxplot(data=df, x=categorica, y=variable_y)
        plt.title(f"Boxplot {categorica} y {variable_y}")
        plt.xlabel(categorica)
        plt.ylabel(variable_y)
        plt.savefig(f"boxplot {categorica}-{variable_y}.png")

        #Estandarización de datos.

def estandarizadora(df, descriptores, estrategias):
    #podemos reciclar desde las imputaciones
    df_estandarizado =df.copy(deep=True)
    #nos aseguramos que las listas de descriptores y estrategias tengan el mismo largo, también reciclado.
    if len(descriptores) != len(estrategias):
        raise ValueError("Las listas de descriptores y estrategias deben ser del mismo largo")
    
    for i, columna in enumerate(descriptores):
        if columna in df_estandarizado.columns:
            estrategia=estrategias[i]
            if estrategia == "z":
                scaler=StandardScaler()
                df_estandarizado[columna]=scaler.fit_transform(df_estandarizado[[columna]])
            elif estrategia == "minmax":
                scaler=MinMaxScaler()
                df_estandarizado[columna]=scaler.fit_transform(df_estandarizado[[columna]])
            else:
                print("Debe ser estandarización Z o tipo Minmax")
        else:
            print("Columna no encontrada en el dataframe")
    return df_estandarizado

#Creación de train y test.

def generadora_set(df, target, test_size=0.2, random_state=20):
    X=df.drop(columns=[target])
    y=df[target]
    X_train, X_test, y_train, y_test=train_test_split(X, y, test_size=test_size, random_state=random_state, stratify=y)

    return X_train, X_test, y_train, y_test


#Función para entrenar y guardar un modelo de clasificación.

def clasificador(tipo_clasificador, ruta_salida, datos_entrenar):
    X_train, y_train= datos_entrenar

    if tipo_clasificador =="GaussianNB":
        modelo= GaussianNB()
    elif tipo_clasificador=="LogisticRegression":
        modelo=LogisticRegression(max_iter=2000)
    else:
        print("Clasificador incorrecto")
    modelo.fit(X_train, y_train)

    #Guardamos el modelo para poderlo usar en la siguiente pregunta. usamos with.

    with open(ruta_salida, "wb") as ruta_archivo:
        pickle.dump(modelo, ruta_archivo)

        print(f"{tipo_clasificador} guardado en {ruta_salida}")


#Función para evaluar los modelos

def evaluar_modelo(modelo, X_test, y_test, metrica):
    predicciones = modelo.predict(X_test)

    #Métricas
    accuracy = accuracy_score(y_test, predicciones)
    f1_macro = f1_score(y_test, predicciones, average="macro")
    f1_weighted = f1_score(y_test, predicciones, average="weighted")
    matriz_confusion = confusion_matrix(y_test, predicciones)
    reporte_clasificacion = classification_report(y_test, predicciones)

    
    
    if metrica=="accuracy":
        print("Accuracy:", accuracy)
    elif metrica =="f1_macro":
        print("F1 Score (macro):", f1_macro)
    elif metrica =="f1_weighted":
        print("F1 Score (weighted):", f1_weighted)
    elif metrica=="matriz_confusion":
        print("Matriz de Confusión:\n", matriz_confusion)
    elif metrica=="reporte_clasificacion":
        print("Reporte de Clasificación:\n", reporte_clasificacion)


In [None]:
df=cargar_datos("Gaia_NaN.csv", separador=";") # Instrucciones 3.1.2

#borramos id de la base de datos.

df=df.drop("ID", axis=1)

#Vamos a eliminar las filas que tengan NA en Class.
df=df.dropna(subset="Class")

Función de diagnóstico # Instrucciones 3.1.3

In [None]:
def descriptivos(df, columnas=None, faltantes=None, media=None, mediana=None, desv_estandar=None, minimo=None, maximo=None):
    #Por si columnas está vacío, cargamos dataframe completo, pero seleccionamos las variables numéricas para ejecutar el diagnóstico.
    if columnas is None:
        columnas=df.select_dtypes(include="number").columns
    #Se intentó con listas, pero el diccionario resultó más manejable para la conversión en dataframe para presentar al final de la función.
    diagnostico={}

    for columna in columnas:
        #Hacemos un diccionario para guardar los números calculados
        medidas={}
        
        if faltantes:
            medidas['faltantes']=df[columna].isnull().sum()
        if media:
            medidas['media']=df[columna].mean()
        if mediana:
            medidas['mediana']=df[columna].median()
        if desv_estandar:
            medidas['desv.estandar']=df[columna].std()
        if minimo:
            medidas['mínimo']=df[columna].min()
        if maximo:
            medidas['máximo']=df[columna].max()

        #Poblamos el diccionario con los números del diagnóstico y las diferentes columnas que pasan por el ciclo
        diagnostico[columna]=medidas

    diagnostico=pd.DataFrame(diagnostico)
    return diagnostico

In [None]:
prueba_funcion=descriptivos(df, media=True, mediana=True, faltantes=True, desv_estandar=True, minimo=True, maximo=True)

In [None]:
prueba_funcion.T

Prueba de normalidad. Forma de identificar si es mejor imputar por la media o la mediana dependiendo de la normalidad de los datos.

In [None]:
def normalidad_Shapiro(df, columnas=None):
    if columnas is None:
        columnas=df.select_dtypes(include="number").columns.tolist()

    resultados = []

    for columna in columnas:
        try:
            stat, p=shapiro(df[columna].dropna())
            resultados.append({
                "columna": columna, "p_valor": p, "normalidad": "Normal" if p >0.05 else "No normal"
            })
        except Exception as e:
            resultados.append({"columna": columna, "error": str(e)})
        
    resultados= pd.DataFrame(resultados)
    return resultados

In [None]:
normalidad=normalidad_Shapiro(df)

In [None]:
normalidad[normalidad['normalidad']=="Normal"]

Imputaciones # Instrucciones 3.1.4

In [None]:
descriptores_i = ["N", "Amplitude", "Rcs", "Meanvariance", "Autocor_length", "SlottedA_length", "Con", "SmallKurtosis", "Std", "Skew", "MaxSlope", "MedianAbsDev", "MedianBRP", "PairSlopeTrend", "FluxPercentileRatioMid20", "FluxPercentileRatioMid35", "FluxPercentileRatioMid50", "FluxPercentileRatioMid65", "FluxPercentileRatioMid80", "PercentDifferenceFluxPercentile", "PercentAmplitude", "LinearTrend", "Eta_e", "Mean", "Q31", "AndersonDarling", "PeriodLS", "Period_fit", "Psi_CS", "Psi_eta", "Freq1_harmonics_amplitude_0", "Freq1_harmonics_amplitude_1", "Freq1_harmonics_amplitude_2", "Freq1_harmonics_amplitude_3", "Freq2_harmonics_amplitude_0", "Freq2_harmonics_amplitude_1", "Freq2_harmonics_amplitude_2", "Freq2_harmonics_amplitude_3", "Freq3_harmonics_amplitude_0", "Freq3_harmonics_amplitude_1", "Freq3_harmonics_amplitude_2", "Freq3_harmonics_amplitude_3", "Freq1_harmonics_rel_phase_0", "Freq1_harmonics_rel_phase_1", "Freq1_harmonics_rel_phase_2", "Freq1_harmonics_rel_phase_3", "Freq2_harmonics_rel_phase_0", "Freq2_harmonics_rel_phase_1", "Freq2_harmonics_rel_phase_2", "Freq2_harmonics_rel_phase_3", "Freq3_harmonics_rel_phase_0", "Freq3_harmonics_rel_phase_1", "Freq3_harmonics_rel_phase_2", "Freq3_harmonics_rel_phase_3", ""]

estrategias_i=["mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "media", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "media", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "media", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana"]


#print(pd.DataFrame({"columna a": descriptores_i, "columna 2": estrategias_i}).head(25))

In [None]:
def imputar_datos(df, descriptores, estrategias):
    #Hacemos copia profunda para evitar modificaciones no previstas
    df_imputado=df.copy(deep=True)
    #nos aseguramos que las listas de descriptores y estrategias tengan el mismo largo.
    if len(descriptores) != len(estrategias):
        raise ValueError("Las listas de descriptores y estrategias deben ser del mismo largo")
    for i, columna in enumerate(descriptores):
        if columna in df_imputado.columns:
            if estrategias[i] == 'media':
                imputacion=df_imputado[columna].mean()
            elif estrategias[i] =='mediana':
                imputacion=df_imputado[columna].median()
            else:
                print("Estrategia no válida")
            df_imputado[columna]=df_imputado[columna].fillna(imputacion)

    return df_imputado

In [None]:
imputados=imputar_datos(df, descriptores_i, estrategias_i)

In [None]:
imputaciones=descriptivos(imputados, media=True, mediana=True, faltantes=True, desv_estandar=True, minimo=True, maximo=True)

prueba imputaciones

In [None]:
imputaciones.T

función graficadora # Instrucciones 3.1.5 y 3.1.6

In [None]:
def graficadora(df, variable_x, variable_y, categorica=None):
    plt.figure(figsize= (10, 5))
    sns.scatterplot(data=df, x=variable_x, y=variable_y)
    plt.title(f"Gráfico dispersión relación entre {variable_x} y {variable_y}")
    plt.xlabel(variable_x)
    plt.ylabel(variable_y)
    plt.savefig(f"Dispersión {variable_x}-{variable_y}.png")

    if categorica:
        plt.figure(figsize=(10, 5))
        sns.boxplot(data=df, x=categorica, y=variable_y)
        plt.title(f"Boxplot {categorica} y {variable_y}")
        plt.xlabel(categorica)
        plt.ylabel(variable_y)
        plt.savefig(f"boxplot {categorica}-{variable_y}.png")

In [None]:
graficadora(imputados, variable_x="N", variable_y= "Amplitude", categorica="Class")

prueba atipicos. Usamos esto para analizar la presencia de datos atípicos y juzgar que operación es la más conveniente antes de estandarizar. particularmente relevante, pues si hay muchos atípicos la estandarización no es capaz de manejar este problema. incluiremos esto en la clase final

In [None]:
def contar_atipicos(df, columna, metodo="iqr", umbral=1.5):

    valores = df[columna].dropna()  
    if metodo == "iqr":
        #esto es IQR
        Q1 = np.percentile(valores, 25) 
        Q3 = np.percentile(valores, 75) 
        IQR = Q3 - Q1
        limite_inferior = Q1 - umbral * IQR
        limite_superior = Q3 + umbral * IQR
        
        atipicos_inferior= valores[valores< limite_inferior].count()
        atipicos_superior = valores[valores> limite_superior].count()
        atipicos_totales = atipicos_inferior+atipicos_superior
    elif metodo == "zscore":
        #probamos con z
        z_scores = (valores - valores.mean()) / valores.std()
        atipicos = valores[np.abs(z_scores) > umbral]
    else:
        raise ValueError("Método no válido. Usa 'iqr' o 'zscore'.")
    
    return {"Total atípicos": atipicos_totales, "Atípicos por debajo": atipicos_inferior, "Atípicos por encima": atipicos_superior}

estandarización # Instrucciones 3.1.7

In [None]:
def estandarizadora(df, descriptores, estrategias):
    #podemos reciclar desde las imputaciones
    df_estandarizado =df.copy(deep=True)
    #nos aseguramos que las listas de descriptores y estrategias tengan el mismo largo, también reciclado.
    if len(descriptores) != len(estrategias):
        raise ValueError("Las listas de descriptores y estrategias deben ser del mismo largo")
    
    for i, columna in enumerate(descriptores):
        if columna in df_estandarizado.columns:
            estrategia=estrategias[i]
            if estrategia == "z":
                scaler=StandardScaler()
                df_estandarizado[columna]=scaler.fit_transform(df_estandarizado[[columna]])
            elif estrategia == "minmax":
                scaler=MinMaxScaler()
                df_estandarizado[columna]=scaler.fit_transform(df_estandarizado[[columna]])
            else:
                print("Debe ser estandarización Z o tipo Minmax")
        else:
            print("Columna no encontrada en el dataframe")
    return df_estandarizado

In [None]:
estandarizada=estandarizadora(imputados, ["N", "Amplitude"], ["z", "z"])

generación de train y test # Instrucciones 3.1.8

In [None]:
def generadora_set(df, target, test_size=0.2, random_state=20):
    X=df.drop(columns=[target])
    y=df[target]
    X_train, X_test, y_train, y_test=train_test_split(X, y, test_size=test_size, random_state=random_state, stratify=y)

    return X_train, X_test, y_train, y_test

In [None]:
X_train, X_test, y_train, y_test=generadora_set(estandarizada, "Class")

clase # Instrucciones 3.1.9

In [None]:
class Preprocesamiento:
    def __init__(self):
        self.df = None
        self.label_encoder = None

    def cargar_datos(self, ruta, n_muestra=None, variables=None, separador=",", codificacion="utf-8"):
        try:
            self.df = pd.read_csv(ruta, sep=separador, usecols=variables, encoding=codificacion)
            
            # Eliminar columna 'ID'
            if 'ID' in self.df.columns:
                self.df.drop(columns=['ID'], inplace=True)
                print("Columna 'ID' eliminada.")
            
            
            if n_muestra:
                if n_muestra > len(self.df):
                    print(f"El número proporcionado: {n_muestra} es mayor que la cantidad de datos: {len(self.df)}")
                else:
                    self.df = self.df.sample(n=n_muestra, random_state=25)
            
            print("Datos cargados exitosamente.")
            return self.df
        except Exception as e:
            print(f"No se pudo importar el dataframe: {e}")
            return None

    def resetear_datos(self):
        #Por problemas en la ejecución repetida,  mejor resetear el asunto
        self.df = None
        print("Datos reseteados. Ahora puedes cargar nuevos datos.")

    def verificar_df(self):
        
        if self.df is None:
            raise ValueError("No se ha cargado ningún DataFrame. Usa el método `cargar_datos` primero.")

    def descriptivos(self, columnas=None, faltantes=None, media=None, mediana=None, desv_estandar=None, minimo=None, maximo=None):
        self.verificar_df()  # Verifica si hay un DataFrame cargado
        if columnas is None:
            columnas = self.df.select_dtypes(include="number").columns
        
        diagnostico = {}
        for columna in columnas:
            medidas = {}
            if faltantes:
                medidas['faltantes'] = self.df[columna].isnull().sum()
            if media:
                medidas['media'] = self.df[columna].mean()
            if mediana:
                medidas['mediana'] = self.df[columna].median()
            if desv_estandar:
                medidas['desv.estandar'] = self.df[columna].std()
            if minimo:
                medidas['mínimo'] = self.df[columna].min()
            if maximo:
                medidas['máximo'] = self.df[columna].max()
            
            diagnostico[columna] = medidas
        
        diagnostico = pd.DataFrame(diagnostico)
        return diagnostico

    def imputar_datos(self, descriptores, estrategias):
        df_imputado = self.df.copy(deep=True)
        if len(descriptores) != len(estrategias):
            raise ValueError("Las listas de descriptores y estrategias deben ser del mismo largo")

        for i, columna in enumerate(descriptores):
            if columna in df_imputado.columns:
                if pd.api.types.is_numeric_dtype(df_imputado[columna]):
                    # Imputación numérica
                    if estrategias[i] == 'media':
                        imputacion = df_imputado[columna].mean()
                    elif estrategias[i] == 'mediana':
                        imputacion = df_imputado[columna].median()
                    else:
                        print(f"Estrategia '{estrategias[i]}' no válida para la columna numérica '{columna}'")
                    df_imputado[columna] = df_imputado[columna].fillna(imputacion)
                else:
                    # Imputación categórica o no numérica
                    if estrategias[i] == 'moda':
                        imputacion = df_imputado[columna].mode()[0]
                        df_imputado[columna] = df_imputado[columna].fillna(imputacion)
                    else:
                        print(f"Estrategia '{estrategias[i]}' no válida para la columna categórica '{columna}'")
            else:
                print(f"Columna '{columna}' no encontrada en el DataFrame.")

        self.df = df_imputado
        return df_imputado

    def graficadora(self, variable_x, variable_y, categorica=None):
        plt.figure(figsize=(10, 5))
        sns.scatterplot(data=self.df, x=variable_x, y=variable_y)
        plt.title(f"Gráfico dispersión relación entre {variable_x} y {variable_y}")
        plt.xlabel(variable_x)
        plt.ylabel(variable_y)
        plt.savefig(f"Dispersión {variable_x}-{variable_y}.png")

        if categorica:
            plt.figure(figsize=(10, 5))
            sns.boxplot(data=self.df, x=categorica, y=variable_y)
            plt.title(f"Boxplot {categorica} y {variable_y}")
            plt.xlabel(categorica)
            plt.ylabel(variable_y)
            plt.savefig(f"boxplot {categorica}-{variable_y}.png")

    
    #Hay muchos atípicos en la base de datos cuya estandarización con Z o minmax no elimina el problema de lpeso de las colas. Incorporamos un método para eliminar columnas que tengan más de un X % de atípicos. Usaremos el 5 % como base de trabajo
    def eliminar_atipicas(self, umbral=None):
        self.verificar_df()
        columnas_eliminadas=[]
        columnas_numericas = self.df.select_dtypes(include="number").columns
        self.umbral=umbral
        
        for columna in columnas_numericas:
            q1=self.df[columna].quantile(0.25)
            q3=self.df[columna].quantile(0.75)
            iqr= q3-q1
            datos_atipicos= self.df[(self.df[columna] < q1 - 1.5 * iqr) | (self.df[columna] > q3 + 1.5* iqr)]
            porcentaje_atipicos= len(datos_atipicos) / len(self.df)

            if porcentaje_atipicos > umbral:
                columnas_eliminadas.append(columna)
        self.df=self.df.drop(columns=columnas_eliminadas)
        return self.df
    
    def estandarizadora(self, descriptores, estrategias):
        df_estandarizado = self.df.copy(deep=True)
        if len(descriptores) != len(estrategias):
            raise ValueError("Las listas de descriptores y estrategias deben ser del mismo largo")
        
        for i, columna in enumerate(descriptores):
            if columna in df_estandarizado.columns:
                estrategia = estrategias[i]
                if estrategia == "z":
                    scaler = StandardScaler()
                    df_estandarizado[columna] = scaler.fit_transform(df_estandarizado[[columna]])
                elif estrategia == "minmax":
                    scaler = MinMaxScaler()
                    df_estandarizado[columna] = scaler.fit_transform(df_estandarizado[[columna]])
                else:
                    print("Debe ser estandarización Z o tipo Minmax")
            else:
                print("Columna no encontrada en el dataframe")
        
        self.df = df_estandarizado
        return df_estandarizado

    def generadora_set(self, target, test_size=0.2, random_state=20):
        X = self.df.drop(columns=[target])
        y = self.df[target]
        
        #Transformamos la variable objetivo antes de separar los datos. Esto como elemento para introducirla al modelo. podría incluri un if para controlar los tipos, pero por ahora esto funciona bien.
        self.label_encoder=LabelEncoder()
        y=self.label_encoder.fit_transform(y)

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, stratify=y)
        return X_train, X_test, y_train, y_test

    def ejecutar_preprocesamiento(self, ruta, n_muestra=None, variables=None, separador=None, codificacion=None, 
                          descriptores=None, estrategias_imputacion=None, umbral=None, estandarizacion=None, 
                          graficos=None, target=None, test_size=0.2, random_state=20):
        # Cargar datos
        self.cargar_datos(ruta, n_muestra, variables, separador, codificacion)
        print("Datos cargados.")

        # Descriptivos
        descriptivo = self.descriptivos(faltantes=True, media=True, mediana=True, desv_estandar=True, minimo=True, maximo=True)
        print("Descriptivos calculados.")
        print(descriptivo)

        # Imputación de datos
        if descriptores and estrategias_imputacion:
            self.imputar_datos(descriptores, estrategias_imputacion)
            print("Imputación realizada.")

        #eliminacion atipicos
        self.eliminar_atipicas(umbral=umbral)
                
        
        # Estandarización
        if descriptores and estandarizacion:
            self.estandarizadora(descriptores, estandarizacion)
            print("Estandarización realizada.")

        # Graficadora
        if graficos:
            for grafico in graficos:
                self.graficadora(**grafico)
            print("Gráficos generados.")

        # Generación de datos de entrenamiento/test
        if target:
            X_train, X_test, y_train, y_test = self.generadora_set(target, test_size, random_state)
            print("Sets generados.")
            return X_train, X_test, y_train, y_test


In [None]:
procesador=Preprocesamiento()
descriptores_i = ["N", "Amplitude", "Rcs", "Meanvariance", "Autocor_length", "SlottedA_length", "Con", "SmallKurtosis", "Std", "Skew", "MaxSlope", "MedianAbsDev", "MedianBRP", "PairSlopeTrend", "FluxPercentileRatioMid20", "FluxPercentileRatioMid35", "FluxPercentileRatioMid50", "FluxPercentileRatioMid65", "FluxPercentileRatioMid80", "PercentDifferenceFluxPercentile", "PercentAmplitude", "LinearTrend", "Eta_e", "Mean", "Q31", "AndersonDarling", "PeriodLS", "Period_fit", "Psi_CS", "Psi_eta", "Freq1_harmonics_amplitude_0", "Freq1_harmonics_amplitude_1", "Freq1_harmonics_amplitude_2", "Freq1_harmonics_amplitude_3", "Freq2_harmonics_amplitude_0", "Freq2_harmonics_amplitude_1", "Freq2_harmonics_amplitude_2", "Freq2_harmonics_amplitude_3", "Freq3_harmonics_amplitude_0", "Freq3_harmonics_amplitude_1", "Freq3_harmonics_amplitude_2", "Freq3_harmonics_amplitude_3", "Freq1_harmonics_rel_phase_0", "Freq1_harmonics_rel_phase_1", "Freq1_harmonics_rel_phase_2", "Freq1_harmonics_rel_phase_3", "Freq2_harmonics_rel_phase_0", "Freq2_harmonics_rel_phase_1", "Freq2_harmonics_rel_phase_2", "Freq2_harmonics_rel_phase_3", "Freq3_harmonics_rel_phase_0", "Freq3_harmonics_rel_phase_1", "Freq3_harmonics_rel_phase_2", "Freq3_harmonics_rel_phase_3", "", "Class"]

estrategias_i=["mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "media", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "media", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "media", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "mediana", "moda"]


#Función para ejecutar los métodos. En este caso usamos lo necesario para producir los insumos para el entrenamiento y prueba de los modelos
X_train, X_test, y_train, y_test=procesador.ejecutar_preprocesamiento(ruta="Gaia_NaN.csv", separador=";", descriptores=descriptores_i, estrategias_imputacion=estrategias_i, umbral= 0.05, test_size=0.2, random_state=30, target="Class")

modelo de clasificación GaussianNB # Instrucciones 3.2.1

In [None]:
clasificador(tipo_clasificador="GaussianNB", ruta_salida="modelo_g.pkl", datos_entrenar=(X_train, y_train))

In [None]:
with open("modelo_g.pkl", "rb") as modelo_g:
    modelo_ga=pickle.load(modelo_g)

In [None]:
evaluar_modelo(modelo_ga, X_test, y_test, metrica="f1_macro")

GaussianNB()

In [None]:
#Según la documentación, modificar los priors sirve para asignar manualmente un peso cuando las clases son desbalanceadas. En este caso probamos asignando la misma probabilidad a todas las clases. no obstante, este modelo funciona peor que el entrenado por defecto, con un 0.30 en el f1_macro frent a un 0.37 del otro modelo.

priors = [1/9] * 9

modelo_ga2= GaussianNB(priors=priors)
modelo_ga2.fit(X_train, y_train)

In [None]:
evaluar_modelo(modelo_ga2, X_test, y_test, metrica="f1_macro")

Modelo LogisticRegression

In [None]:
clasificador(tipo_clasificador="LogisticRegression", ruta_salida="modelo_lg.pkl", datos_entrenar=(X_train, y_train))

In [None]:
with open("modelo_lg.pkl", "rb") as modelo_rl:
    modelo_rl=pickle.load(modelo_rl)

In [None]:
evaluar_modelo(modelo_rl, X_test, y_test, metrica="f1_macro")

prueba de modelos de regresión

In [None]:
logistica_ajustada = LogisticRegression(penalty='l1', solver= "liblinear", class_weight='balanced')
logistica_ajustada.fit(X_train, y_train)

evaluar_modelo(logistica_ajustada, X_test, y_test, metrica="f1_weighted")

#Este modelo rinde bien con las clases grandes, pero falla con las minoritarias. con un f1_macro de 0.42 y un f1 f1_weighted cercano al 90 %, podemos decir que es un modelo regular. predice bien las clases con mayor frecuencia, pero falla en las minoritarias

In [None]:
#logistica_ajustada2 = LogisticRegression(penalty='l2', solver= "liblinear", class_weight='balanced')
#logistica_ajustada2.fit(X_train, y_train)

evaluar_modelo(logistica_ajustada2, X_test, y_test, metrica="f1_macro")

#Este tiene el mismo problema que el anterior. buena capacidad general, pero bajo rendimiento cuando se trata de específicas. 0.89 y f1_macro de 0.42 lo demuestran.

In [None]:
#logistica_ajustada3 = LogisticRegression(penalty='elasticnet', solver= 'saga', l1_ratio=0.5, class_weight='balanced', C=1.0, random_state=30)
#logistica_ajustada3.fit(X_train, y_train)

evaluar_modelo(logistica_ajustada3, X_test, y_test, metrica="reporte_clasificacion")

#Este tiene un menor rendimiento, tanto en f1_macro como en f1_weighted, 0.33en el primero y 0.74 en el segundo. Peor rendimiento hasta el momento.

En general y luego de haber analizado varios modelos con distintos parámetros, el que tiene mejor rendimiento es la regresión logística por defecto. Si bien se experimentan cambios al ajustar ciertos parámetros como le penalización, el rendimiento general se mantiene. En general predice bien las clases mayoritarias, pero le cuesta mucho en aquellas donde no hay tantos datos.