# Pruebas pre-validación datos presión atmosférica - Clase

> Elaborado por Paola Álvarez, profesional contratista IDEAM, contrato 196 de 2024. Comentarios o inquietudes, remitir a *palvarez@ideam.gov.co* 

**Librerías**

In [6]:
import pandas as pd
import numpy as np
import os
import re
import logging
from functools import wraps

----

A continuación, se encuentran las pruebas de pre-validación de datos de EMA para verificar su capacidad de detección de datos

## Clase con métodos de aplicación de QC

In [8]:
# Configuración del logger para guardar en el directorio de archivos y escribir cada vez
def setup_logger(log_file_path):
    logger = logging.getLogger('RawUnmodified_Patm')
    logger.setLevel(logging.INFO)
    # Clear existing handlers to avoid duplicate logs
    if logger.hasHandlers():
        logger.handlers.clear()
    file_handler = logging.FileHandler(log_file_path, mode='a')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(file_handler)
    return logger

def log_failures(func):
    @wraps(func)
    def wrapper(self, chunk, archivo):
        try:
            result, mask = func(self, chunk, archivo)
            if mask is not None:
                try:
                    aligned_mask = mask.reindex(chunk.index, fill_value=False)  # Asegura que la máscara esté alineada con el índice del DataFrame
                except AttributeError:
                    aligned_mask = mask  # Si no se puede reindexar, usa la máscara tal como está
                for index, row in chunk[aligned_mask].iterrows():
                    self.logger.info('File: %s - Row: %s - Failed in %s: %s', archivo, index, func.__name__, row['Valor'])
            return result
        except ValueError as e:
            self.logger.error('Error procesando el archivo %s: %s', archivo, str(e))
            return chunk  # Devuelve una máscara falsa para manejar el error
    return wrapper

In [10]:
class AutomatPatmEMA:
    
    def __init__(self, dir_files, chunk_size=54000):
        self.dir_files = dir_files
        self.ruta_archivos = os.listdir(dir_files)
        self.chunk_size = chunk_size
        self.last_rows = None
        self.current_file = None
        # Sección configuración de logs
        log_file_path = os.path.join(dir_files, 'QC_Patm.log')
        self.logger = setup_logger(log_file_path)
        self.logger.info('Inicialización de AutomatPatmEMA en directorio: %s', dir_files)

    def p_transm(self, chunk, archivo):
        '''Esta prueba verifica si existe al menos el 70% de datos esperados por día y hora
        en la serie de datos; aquellos que no superen la prueba, son marcados como sospechosos'''
        # Se encontraron diferentes frecuencias en la transmisión de Patm, por lo tanto:
        freqinst100b = pd.read_csv('EMAPatm_allinfo.csv', encoding='latin-1', sep=';')
        
        # Se define un diccionario de frecuencias y cantidades esperadas
        frecuencias = {
            'min': {'cant_esperd_h': 60, 'cant_esperd_d': 1440},
            '2min': {'cant_esperd_h': 30, 'cant_esperd_d': 720},
            '10min': {'cant_esperd_h': 6, 'cant_esperd_d': 144},
            'h': {'cant_esperd_h': 1, 'cant_esperd_d': 24}
        }
        
        # Obtener la frecuencia de 'freqinst100b' basado en 'Station' y asignar a 'periodos'
        station_value = chunk['Station'].values[0]
        if pd.isna(station_value):
            periodos = None
        else:
            freqinst100b_station = freqinst100b.loc[freqinst100b['Station'] == station_value]
            if freqinst100b_station.empty:
                print(f"No se encontró la estación {station_value} en freqinst100b")
                return chunk
    
            freq_inf_value = freqinst100b_station['FreqInf'].values[0]
    
            if pd.isna(freq_inf_value):
                try:
                    periodos = pd.infer_freq(chunk['Fecha'][-25:])
                    print(periodos)
                    if periodos is None:
                        print(f"Frecuencia inferida es None para el archivo {archivo}")
                        return chunk
                except ValueError as e:
                    print(f'Error al inferir la frecuencia en el archivo {archivo}: {str(e)}')
                    return chunk
            else:
                periodos = freq_inf_value
    
        if periodos is None:
            print(f"Periodo es None para el archivo {archivo}")
            return chunk
    
        # Obtener las cantidades esperadas de acuerdo a la frecuencia
        cant_esperd_h = frecuencias[periodos]['cant_esperd_h']
        cant_esperd_d = frecuencias[periodos]['cant_esperd_d']
    
        # Se establecen los aceptables
        cant_aceptab_hora = 0.5 * cant_esperd_h
        cant_aceptab_dia = 0.5 * cant_esperd_d
    
        # Agregar columna de etiquetas al dataframe original
        chunk['Estado'] = ''
        
        # Definir función para asignar etiquetas
        def asignar_etiqueta(row):
            if row['count'] < cant_aceptab_hora:
                chunk.loc[chunk['Fecha'].dt.floor('h') == row['Fecha'].floor('h'), 'Estado'] = '0PSO0'
        
        # Evaluar por cada grupo de datos por hora y asignar la etiqueta
        canthora = chunk.groupby(chunk['Fecha'].dt.floor('h')).size().reset_index(name='count')
        canthora.apply(asignar_etiqueta, axis=1)
        
        # Definir función para asignar etiquetas de acumulado diario
        def asignar_etiqueta_diaria(row):
            if row['count'] < cant_aceptab_dia:
                chunk.loc[chunk['Fecha'].dt.floor('D') == row['Fecha'].floor('D'), 'Estado'] = '0PSO0'
        
        # Evaluar por cada grupo de datos por día y asignar la etiqueta
        cantdia = chunk.groupby(chunk['Fecha'].dt.floor('D')).size().reset_index(name='count')
        cantdia.apply(asignar_etiqueta_diaria, axis=1)
        
        return chunk
        
    @log_failures
    def p_estruct(self, chunk, archivo):
        '''Esta prueba verifica si los datos fueron transmitidos en horas y minutos exactos al ser el
        comportamiento esperado'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''

        freqinst100b = pd.read_csv('EMAPatm_allinfo.csv', sep=';', encoding='latin-1') # Antiguo 'freq10binst_manualcorrect.csv'
        
        # Define un diccionario de frecuencias y cantidades esperadas
        frecuencias = {
            '2min': {'num_para_modulo': 2},
            '10min': {'num_para_modulo': 10}
        }
        
        # Obtener la frecuencia de 'freqinst100b' basado en 'Station' y asignar a 'periodos'
        station_value = chunk['Station'].values[0]
        if pd.isna(station_value):
            periodos = None
        else:
            freqinst100b_station = freqinst100b.loc[freqinst100b['Station'] == station_value]
            if freqinst100b_station.empty:
                print(f"No se encontró la estación {station_value} en freqinst100b")
                return chunk
            freq_inf_value = freqinst100b_station['FreqInf'].values[0]
    
            if pd.isna(freq_inf_value):
                try:
                    periodos = pd.infer_freq(chunk['Fecha'][-25:])
                    print(periodos)
                    if periodos is None:
                        print(f"Frecuencia inferida es None para el archivo {archivo}")
                        return chunk
                except ValueError as e:
                    print(f'Error al inferir la frecuencia en el archivo {archivo}: {str(e)}')
                    return chunk
            else:
                periodos = freq_inf_value
    
        # Se hace frente al caso de no encontrar la estación
        if periodos is not None:
            
            # Generar la operación para observar si la estructura es exacta en minutos
            fecha = chunk['Fecha']
    
            # Se vectoriza la evaluación de la estructura por minuto
            # Para cada chunk:
            if periodos == 'min':
                mask = fecha.dt.second != 0
            elif periodos == 'h':
                mask = (fecha.dt.minute != 0) | (fecha.dt.second != 0)
            else:
                # Se obtiene num_para_modulo
                num_para_modulo = frecuencias[periodos]['num_para_modulo']
                mask = fecha.dt.minute % num_para_modulo != 0

            chunk['Estado'] = chunk['Estado'].fillna('')
            # Cambiar '0PSO0' a '0PSO1' donde la máscara se cumple
            chunk.loc[mask & (chunk['Estado'] == '0PSO0'), 'Estado'] = '0PSO1'
            # Asignar '0PSO0' a los NaN donde la máscara se cumple
            chunk.loc[mask & chunk['Estado'].isnull(), 'Estado'] = '0PSO0'
        
        else:  # Si el periodo es None, no se hace ninguna modificación al chunk, pero puedes imprimir un mensaje si quieres
            print(f"No se encontró la frecuencia de la estación {station_value} ni un proyecto correspondiente en freqinst100b")
            
        return chunk, mask

    @log_failures
    def p_limrigidos(self, chunk, archivo):
        '''Esta prueba verifica si los datos crudos se encuentran fuera del umbral físico inferior o superior'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''
        # Se genera la columna de estado anterior
        chunk['Estado_Anterior'] = ''
        
        # Se establecen los umbrales físicos/rígidos a datos crudos en nuevas colummnas para vectorizar
        chunk['umbr_crud_inf'] = 484.5
        chunk['umbr_crud_sup'] = 1013.26

        # Compara el dato con umbrales inferiores y superiores 
        mask_outbounds = (chunk['Valor'] < chunk['umbr_crud_inf']) | (chunk['Valor'] > chunk['umbr_crud_sup'])

        chunk['Estado'] = chunk['Estado'].fillna('')
        chunk['Estado_Anterior'] = chunk['Estado_Anterior'].fillna('')
        # Lógica de etiquetado para 'Estado_Anterior'
        condicion_0PSO0 = mask_outbounds & chunk['Estado'].isin(['0PSO0', '0PSO1'])
        chunk.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk.loc[condicion_0PSO0, 'Estado']
        # Lógica de etiquetado para 'Estado'
        condicion_0PER0 = mask_outbounds & (chunk['Estado'].isnull() | chunk['Estado'].isin(['0PSO0', '0PSO1']))
        chunk.loc[condicion_0PER0, 'Estado'] = '0PER0'
        
        # Se eliminan las columnas no deseadas
        if 'umbr_crud_inf' in chunk.columns:
            chunk.drop(columns=['umbr_crud_inf', 'umbr_crud_sup'], axis=1, inplace=True)
                
        return chunk, mask_outbounds

    @log_failures
    def p_limpsicrom(self, chunk, archivo):
        '''Esta prueba calcula, con base en la ubicación de la estación y la estimación de tmax y tmin según las normales
        climatológicas, la escala de altura y la presión con la fórmula psicrométrica para fijar límites superior e inferior
        válidos; y reconocer datos atípicos en los conjuntos de datos'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''
        # Se genera la columna 'Estado_anterior' si no existe
        if 'Estado_Anterior' not in chunk.columns:
            chunk['Estado_Anterior'] = ''

        # Se establece la ecuación de interpolación lineal
        def interp_lineal(x, x1, y1, x2, y2):
            if x1 == x2:  # Para evitar división por cero
                return y1
            return y1 + ((y2 - y1) / (x2 - x1)) * (x - x1)
        
        # Se lee el archivo con las altitudes por estación y ## medias máximas y mínimas estimadas según las normales
        EMN_TMaxMin = pd.read_csv('EMAPatm_allinfo.csv', sep=';', encoding='latin-1')
        # Se lee el archivo con el gradiente de temperatura calculado
        TempCalcGrad = pd.read_excel('TempCalcGrad.xlsx')

        # Se establecen los valores de altura y código de la estación
        Alt = EMN_TMaxMin['Altitud'].ravel()
        Statncod = chunk['Station']

        # Se itera sobre las altitudes y estación
        for dem, s in zip(Alt, Statncod):
            if s not in EMN_TMaxMin['Station'].values:
                print(f'La estación {s} no se encuentra en EMN_TMaxMin.')
                return chunk

            AltSttn = EMN_TMaxMin[EMN_TMaxMin['Station'] == s]['Altitud'].values[0]
            
            if AltSttn == 0.0:
                Talt = TempCalcGrad['TempCalcGradiente'].values[0]
            else:
                # Encontrar los puntos de interpolación
                alturas = TempCalcGrad['Altura']
                temp_grad = TempCalcGrad['TempCalcGradiente']
    
                # Verificar que la altura está dentro del rango de datos
                if AltSttn < alturas.min() or AltSttn > alturas.max():
                    print(f'La altura {AltSttn} de la estación {s} está fuera del rango de datos.')
                    return chunk
    
                # Encontrar los puntos de interpolación más cercanos
                alt_min = alturas[alturas <= AltSttn].max()
                alt_max = alturas[alturas >= AltSttn].min()
                temp_min = temp_grad[alturas == alt_min].values[0]
                temp_max = temp_grad[alturas == alt_max].values[0]
    
                # Realizar la interpolación lineal
                Talt = interp_lineal(AltSttn, alt_min, temp_min, alt_max, temp_max)
            
            # Se hacen los cálculos según fórmula psicrométrica
            EAsttn = (8.3144598 * (273.15 + Talt)) / (9.80665 * 0.0289644)
            Patmpsicrom = (1013.25 * np.exp(-(AltSttn / EAsttn)))

            # Se calculan los límites según fórmula psicrométrica, se tienen en cuenta dos valores de aumento y disminución
            chunk['LimSup_psicr_AT'] = (Patmpsicrom + 9.0) # Antes 8.0
            chunk['LimInf_psicr_AT'] = (Patmpsicrom - 9.0)
            chunk['LimSup_psicr_ER'] = (Patmpsicrom + 13.0) # Antes 10.0
            chunk['LimInf_psicr_ER'] = (Patmpsicrom - 13.0)


            chunk['Estado'] = chunk['Estado'].fillna('')
            chunk['Estado_Anterior'] = chunk['Estado_Anterior'].fillna('')
            ## Comparación con umbrales inferiores y superiores, caso erróneos (fuera de +-10.0hPa) 
            mask_outbPsicrom_ER = (chunk['Valor'] < chunk['LimInf_psicr_ER']) | (chunk['Valor'] > chunk['LimSup_psicr_ER'])
            # Lógica de etiquetado para 'Estado_Anterior'
            condicion_0PSO0 = mask_outbPsicrom_ER & chunk['Estado'].isin(['0PSO0', '0PSO1'])
            chunk.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk.loc[condicion_0PSO0, 'Estado']
            # Lógica de etiquetado para 'Estado', '0PER0'
            condicion_0PER0 = mask_outbPsicrom_ER & (chunk['Estado'].isnull() | chunk['Estado'].isin(['0PSO0', '0PSO1']))
            chunk.loc[condicion_0PER0, 'Estado'] = '0PER0'
            mask_outbPsicrom_ER = mask_outbPsicrom_ER & ~condicion_0PER0
            # Condición 0PER1
            condicion_0PER1 = mask_outbPsicrom_ER & (chunk['Estado'] == '0PER0')
            chunk.loc[condicion_0PER1, 'Estado'] = '0PER1'

            ## Comparación con umbrales inferiores y superiores, caso atípicos (fuera de +-8.0hPa e inf. a +-10.0hPa)
            # Se genera filtro para no considerar datos ya catalogados como erróneos
            if chunk['Estado'].notna().all(): # Se verifica que no hayan valores nulos en tal columna
                chunk_psic = chunk[~chunk['Estado'].str.startswith('0PER', na=False)].copy()
            else:
                # Si todos los valores son NaN, simplemente copia el chunk
                chunk_psic = chunk.copy()
            # Comparación
            mask_outbPsicrom_AT = ((chunk_psic['Valor'] < chunk_psic['LimInf_psicr_AT']) & (chunk_psic['Valor'] > chunk_psic['LimInf_psicr_ER'])) | ((chunk_psic['Valor'] > chunk_psic['LimSup_psicr_AT']) & (chunk_psic['Valor'] < chunk_psic['LimSup_psicr_ER']))
            
            chunk_psic['Estado'] = chunk_psic['Estado'].fillna('')
            chunk_psic['Estado_Anterior'] = chunk_psic['Estado_Anterior'].fillna('')
            # Lógica de etiquetado para 'Estado_Anterior'
            condicion_0PSO0 = mask_outbPsicrom_AT & chunk_psic['Estado'].isin(['0PSO0', '0PSO1'])
            chunk_psic.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk_psic.loc[condicion_0PSO0, 'Estado']
            # Lógica de etiquetado para 'Estado', '0PAT0'
            condicion_0PAT0 = mask_outbPsicrom_AT & (chunk_psic['Estado'].isnull() | chunk_psic['Estado'].isin(['0PSO0', '0PSO1']))
            chunk_psic.loc[condicion_0PAT0, 'Estado'] = '0PAT0'

            # Se copian los datos del filtro al chunk original
            chunk.loc[chunk_psic.index] = chunk_psic
            
            # Se eliminan las columnas no deseadas
            if 'LimSup_psicr_AT' in chunk.columns:
                chunk.drop(columns=['LimSup_psicr_AT', 'LimInf_psicr_AT', 'LimSup_psicr_ER','LimInf_psicr_ER'], axis=1, inplace=True)
                
            return chunk, (mask_outbPsicrom_ER | mask_outbPsicrom_AT)

    @log_failures
    def p_persist(self, chunk, archivo):
        '''Esta prueba verifica si los registros tienen datos persistentes, que podrían significar errores en el sensor'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''
                
        # Se genera la columna 'Estado_anterior' si no existe
        if 'Estado_Anterior' not in chunk.columns:
            chunk['Estado_Anterior'] = ''

        # Verificar si el archivo ha cambiado
        if self.current_file != archivo:
            # Si el archivo cambió, resetea self.last_rows y actualiza self.current_file
            self.last_rows = None
            self.current_file = archivo
            
        # Usar self.last_rows para concatenar con el chunk actual
        if self.last_rows is not None:
            chunk = pd.concat([self.last_rows, chunk])
            chunk.reset_index(drop=True)
            
        # Crear máscaras para cada comparación de las 6 filas consecutivas
        mask_1 = (chunk['Valor'] == chunk['Valor'].shift(1))
        mask_2 = (chunk['Valor'] == chunk['Valor'].shift(2))
        mask_3 = (chunk['Valor'] == chunk['Valor'].shift(3))
        mask_4 = (chunk['Valor'] == chunk['Valor'].shift(4))
        mask_5 = (chunk['Valor'] == chunk['Valor'].shift(5))
        
        # Combinar todas las máscaras para obtener la condición deseada
        mask_pers3datos = mask_1 & mask_2 & mask_3 & mask_4 & mask_5

        chunk['Estado'] = chunk['Estado'].fillna('')
        chunk['Estado_Anterior'] = chunk['Estado_Anterior'].fillna('')
        # Etiquetado de valores, se inicia con el Estado Anterior
        condicion_0PSO0 = mask_pers3datos & chunk['Estado'].isin(['0PSO0', '0PSO1'])
        chunk.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk.loc[condicion_0PSO0, 'Estado']

        # Lógica de etiquetado para 'Estado'
        condicion_0PER0 = mask_pers3datos & (chunk['Estado'].isnull() | chunk['Estado'].isin(['0PSO0', '0PSO1']))
        chunk.loc[condicion_0PER0, 'Estado'] = '0PER0'
        mask_pers3datos = mask_pers3datos & ~condicion_0PER0

        condicion_0PER1 = mask_pers3datos & (chunk['Estado'] == '0PER0')
        chunk.loc[condicion_0PER1, 'Estado'] = '0PER1'
        mask_pers3datos = mask_pers3datos & ~condicion_0PER1

        condicion_0PER2 = mask_pers3datos & (chunk['Estado'] == '0PER1')
        chunk.loc[condicion_0PER2, 'Estado'] = '0PER2'
        
        self.last_rows = chunk.tail(3)
        
        return chunk, mask_pers3datos

    @log_failures
    def p_salto(self, chunk, archivo):
        '''Esta prueba verifica si la variación entre valores consecutivos excede 3.0 hPa'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''
                
        # Se genera la columna 'Estado_anterior' si no existe
        if 'Estado_Anterior' not in chunk.columns:
            chunk['Estado_Anterior'] = ''
            
        # Se toma nuevamente el archivo de frecuencias para analizar datos estrictamente consecutivos
        freqinst100b = pd.read_csv('EMAPatm_allinfo.csv', encoding='latin-1', sep=';')

        # Obtener la frecuencia de 'freqinst100b' basado en 'Station' y asignar a 'periodos'
        sttn_code = chunk['Station'].values[0]
        if pd.isna(sttn_code):
            periodos = None
        else:
            freqinst100b_station = freqinst100b.loc[freqinst100b['Station'] == sttn_code]
            if freqinst100b_station.empty:
                print(f"No se encontró la estación {sttn_code} en freqinst100b")
                return chunk
    
            freq_inf_value = freqinst100b_station['FreqInf'].values[0]
    
            if pd.isna(freq_inf_value):
                try:
                    periodos = pd.infer_freq(chunk['Fecha'][-25:])
                    print(periodos)
                    if periodos is None:
                        print(f"Frecuencia inferida es None para el archivo {archivo}")
                        return chunk
                except ValueError as e:
                    print(f'Error al inferir la frecuencia en el archivo {archivo}: {str(e)}')
                    return chunk
            else:
                periodos = freq_inf_value
    
        if periodos is None:
            print(f"Periodo es None para el archivo {archivo}")
            return chunk

        # Asegurarse de que 'periodos' tenga un número antes de la unidad
        if periodos.isalpha():
            periodos = '1' + periodos
        
        # Ordenar el chunk por la columna 'Fecha'
        chunk = chunk.sort_values('Fecha').reset_index(drop=True)

        # Crear una columna de diferencia temporal
        chunk['Fecha_anterior'] = chunk['Fecha'].shift(1)
        chunk['Delta_tiempo'] = chunk['Fecha'] - chunk['Fecha_anterior']
        
        # Crear una máscara para identificar filas consecutivas según la frecuencia esperada
        mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
        
        # Calcular la diferencia absoluta entre los valores consecutivos
        chunk['Delta'] = chunk['Valor'].diff().abs()
        chunk['Delta'] = chunk['Delta'].where(mask_consecutivo)

        # Máscara para identificar variaciones mayores a 3.0
        mask_variacion = chunk['Delta'] > 3.0

        # Se genera filtro para no considerar datos ya catalogados como erróneos
        if chunk['Estado'].notna().all(): # Se verifica que no hayan valores nulos en tal columna
            chunk_jmp = chunk[~chunk['Estado'].str.startswith('0PER', na=False)].copy()
        else:
            # Si todos los valores son NaN, simplemente copia el chunk
            chunk_jmp = chunk.copy()

        chunk_jmp['Estado'] = chunk_jmp['Estado'].fillna('')
        chunk_jmp['Estado_Anterior'] = chunk_jmp['Estado_Anterior'].fillna('')
        # Lógica de etiquetado para 'Estado_Anterior'
        condicion_0PSO0 = mask_variacion & chunk_jmp['Estado'].isin(['0PSO0', '0PSO1'])
        chunk_jmp.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk_jmp.loc[condicion_0PSO0, 'Estado']

        # Se etiquetan los atípicos y para prueba de otra forma
        condicion_0PAT0 = mask_variacion & (chunk_jmp['Estado'].isnull() | chunk_jmp['Estado'].isin(['0PSO0', '0PSO1']))
        chunk_jmp.loc[condicion_0PAT0, 'Estado'] = '0PAT0'
        mask_variacion = mask_variacion & ~condicion_0PAT0
        # 0PAT1
        condicion_0PAT1 = mask_variacion & (chunk_jmp['Estado'] == '0PAT0')
        chunk_jmp.loc[condicion_0PAT1, 'Estado'] = '0PAT1'

        # Se copian los datos del filtro al chunk original
        chunk.loc[chunk_jmp.index] = chunk_jmp

        # Eliminar las columnas temporales antes de devolver el chunk
        chunk.drop(columns=['Delta', 'Fecha_anterior', 'Delta_tiempo'], axis=1, inplace=True)

        return chunk, mask_variacion
    
    @log_failures
    def p_limclimsigma(self, chunk, archivo):
        '''Esta prueba calcula, con los datos no etiquetados en la anterior prueba, la 3sigmas +- la media para detectar
        datos atípicos en los conjuntos de los datos'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''
        # Se genera la columna 'Estado_anterior' si no existe
        if 'Estado_Anterior' not in chunk.columns:
            chunk['Estado_Anterior'] = ''

        if chunk['Estado'].notna().all(): # Se verifica que no hayan valores nulos en tal columna
            chunk_sgm = chunk[~chunk['Estado'].str.startswith('0PER', na=False)].copy()
        else:
            # Si todos los valores son NaN, simplemente copia el chunk
            chunk_sgm = chunk.copy()

        # Se calculan los estadísticos para sigma
        mean = chunk_sgm['Valor'].mean()
        std = chunk_sgm['Valor'].std()
        # Con ellos, se establecen los límites superior e inferior
        chunk_sgm['LimSup_Sigma'] = (mean + (4 * std))
        chunk_sgm['LimInf_Sigma'] = (mean - (4 * std))

        # Se etiquetan los valores que sobrepasen el límite
        mask_outbsigma = (chunk_sgm['Valor'] < chunk_sgm['LimInf_Sigma']) | (chunk_sgm['Valor'] > chunk_sgm['LimSup_Sigma'])

        chunk_sgm['Estado'] = chunk_sgm['Estado'].fillna('')
        chunk_sgm['Estado_Anterior'] = chunk_sgm['Estado_Anterior'].fillna('')
        ## Actualización de estado
        # Condición llenado de 'Estado_Anterior', si aplica
        condicion_0PSO0 = mask_outbsigma & chunk_sgm['Estado'].isin(['0PSO0', '0PSO1'])
        chunk_sgm.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk_sgm.loc[condicion_0PSO0, 'Estado']

        # Se etiquetan los atípicos
        condicion_0PAT0 = mask_outbsigma & (chunk_sgm['Estado'].isnull() | chunk_sgm['Estado'].isin(['0PSO0', '0PSO1']))
        chunk_sgm.loc[condicion_0PAT0, 'Estado'] = '0PAT0'
        mask_outbsigma = mask_outbsigma & ~condicion_0PAT0
        # 0PAT1
        condicion_0PAT1 = mask_outbsigma & (chunk_sgm['Estado'] == '0PAT0')
        chunk_sgm.loc[condicion_0PAT1, 'Estado'] = '0PAT1'
        mask_outbsigma = mask_outbsigma & ~condicion_0PAT1
        # 0PAT2
        condicion_0PAT2 = mask_outbsigma & (chunk_sgm['Estado'] == '0PAT1')
        chunk_sgm.loc[condicion_0PAT2, 'Estado'] = '0PAT2'
                    
        # Se eliminan las columnas no deseadas
        if 'LimSup_Sigma' in chunk.columns:
            chunk.drop(columns=['LimSup_Sigma', 'LimInf_Sigma'], axis=1, inplace=True)
    
        chunk.loc[chunk_sgm.index] = chunk_sgm
        return chunk, mask_outbsigma

    @log_failures
    def p_horavmaxmin(self, chunk, archivo):
        '''Esta prueba detecta los datos que son máximos y mínimos en horarios distintos a los conocidos en cada periodo semidiurno
        y los marca como atípicos'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = ''
        # Se genera la columna 'Estado_anterior' si no existe
        if 'Estado_Anterior' not in chunk.columns:
            chunk['Estado_Anterior'] = ''

        chunk['Estado_Anterior'] = chunk['Estado_Anterior'].fillna('')
        if chunk['Estado'].notna().all(): # Se verifica que no hayan valores nulos en tal columna
            chunk_hmm = chunk[~chunk['Estado'].str.startswith('0PER', na=False)].copy()
        else:
            # Si todos los valores son NaN, simplemente copia el chunk
            chunk_hmm = chunk.copy()
            
        # Se crean máscaras para los intervalos de tiempo conocidos para valores máximos y mínimos
        mask_max_morning = (chunk_hmm['Fecha'].dt.hour >= 8) & (chunk_hmm['Fecha'].dt.hour < 13) | (chunk_hmm['Fecha'].dt.hour == 12)
        mask_max_evening = (chunk_hmm['Fecha'].dt.hour >= 20) & (chunk_hmm['Fecha'].dt.hour <= 23) | (chunk_hmm['Fecha'].dt.hour == 1)
        mask_min_morning = (chunk_hmm['Fecha'].dt.hour >= 2) & (chunk_hmm['Fecha'].dt.hour < 7)
        mask_min_afternoon = (chunk_hmm['Fecha'].dt.hour > 14) & (chunk_hmm['Fecha'].dt.hour < 19)
        
        # Se filtran los datos para esas horas
        max_validdata = chunk_hmm[mask_max_morning | mask_max_evening]
        min_validdata = chunk_hmm[mask_min_morning | mask_min_afternoon]

        # Encontrar los dos valores máximos por día
        max_values = chunk_hmm.groupby(chunk_hmm['Fecha'].dt.date).apply(lambda x: x.nlargest(2, 'Valor')).reset_index(level=0, drop=True)
        
        # Encontrar los dos valores mínimos por día
        min_values = chunk_hmm.groupby(chunk_hmm['Fecha'].dt.date).apply(lambda x: x.nsmallest(2, 'Valor')).reset_index(level=0, drop=True)

        # Verificar los máximos y mínimos y obtener las horas correspondientes
        notvalid_max_values = max_values[~max_values.index.isin(max_validdata.index)]
        notvalid_min_values = min_values[~min_values.index.isin(min_validdata.index)]

        # Combinar los valores no válidos en un solo DataFrame
        notvalid_values = pd.concat([notvalid_max_values, notvalid_min_values])
        # Crear una máscara para identificar los índices de los valores no válidos
        notval_maxmin = chunk_hmm.index.isin(notvalid_values.index)

        ## Actualización de estado
        # Condición llenado de 'Estado_Anterior', si aplica
        condicion_0PSO0 = notval_maxmin & chunk_hmm['Estado'].isin(['0PSO0', '0PSO1'])
        chunk_hmm.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk_hmm.loc[condicion_0PSO0, 'Estado']

        chunk_hmm['Estado'] = chunk_hmm['Estado'].fillna('')
        chunk_hmm['Estado_Anterior'] = chunk_hmm['Estado_Anterior'].fillna('')
        # Se etiquetan los atípicos
        condicion_0PAT0 = notval_maxmin & (chunk_hmm['Estado'].isnull() | chunk_hmm['Estado'].isin(['0PSO0', '0PSO1']))
        chunk_hmm.loc[condicion_0PAT0, 'Estado'] = '0PAT0'
        notval_maxmin = notval_maxmin & ~condicion_0PAT0
        # 0PAT1
        condicion_0PAT1 = notval_maxmin & (chunk_hmm['Estado'] == '0PAT0')
        chunk_hmm.loc[condicion_0PAT1, 'Estado'] = '0PAT1'
        notval_maxmin = notval_maxmin & ~condicion_0PAT1
        # 0PAT2
        condicion_0PAT2 = notval_maxmin & (chunk_hmm['Estado'] == '0PAT1')
        chunk_hmm.loc[condicion_0PAT2, 'Estado'] = '0PAT2'
        notval_maxmin = notval_maxmin & ~condicion_0PAT2
        # 0PAT3
        condicion_0PAT2 = notval_maxmin & (chunk_hmm['Estado'] == '0PAT2')
        chunk_hmm.loc[condicion_0PAT2, 'Estado'] = '0PAT3'

        chunk.loc[chunk_hmm.index] = chunk_hmm
        return chunk, notval_maxmin

    def llenado_0PCO0(self, chunk, archivo):
        '''Esta función permite el llenado de los NaN de la columna 'Estado' como '0PCO0' al haber superado las pruebas'''
        chunk.loc[chunk['Estado'].isnull(), 'Estado'] = '0PCO0'
        return chunk
       
    def procesar_archivos(self, funcion_evaluacion):
        '''Este método procesa la lectura y guardado de los archivos para todas las pruebas'''
        archivos = self.ruta_archivos

        archivos_salida = []  # Lista para almacenar nombres de archivos de salida

        # Se recorre cada archivo en la carpeta
        for archivo in archivos:
            if archivo.endswith('.csv'):
                ruta_archivo = os.path.join(self.dir_files, archivo)

                reader = pd.read_csv(ruta_archivo, encoding='latin-1', chunksize=self.chunk_size)#,dtype={7: 'str'}, low_memory=False)
                resultados = []

                for chunk in reader:
                    try:
                        chunk['Fecha'] = pd.to_datetime(chunk['Fecha'], format='%Y-%m-%d %H:%M:%S.%f')
                    except ValueError:
                        chunk['Fecha'] = pd.to_datetime(chunk['Fecha'], format='%Y-%m-%d %H:%M:%S')

                    chunk['Station'] = chunk['Station'].astype('int64')
                    chunk_resultado = funcion_evaluacion(chunk, archivo)
                    resultados.append(chunk_resultado)

                if not resultados:  # Se verifica si la lista está vacía
                    self.logger.warning('No hay resultados válidos para concatenar en el archivo %s. Continuando con el siguiente archivo.', archivo)
                    #print(f"No hay resultados válidos para concatenar en el archivo {archivo}. Continuando con el siguiente archivo.")
                    continue
                    
                resultados_consolidados = pd.concat(resultados)

                # Genera el nombre del archivo de salida conservando los primeros 19 caracteres del nombre del archivo original
                nombre_archivo_salida = archivo[:19] + '_qc.csv'

                resultados_consolidados.to_csv(os.path.join(self.dir_files, nombre_archivo_salida), encoding='latin-1', index=False)

                archivos_salida.append(nombre_archivo_salida)  # Agregar el nombre del archivo a la lista
            
        # Actualiza self.ruta_archivos para que la próxima prueba procese los resultados de esta prueba
        self.ruta_archivos = archivos_salida
        # Se fija el log de procesamiento completo de archivos
        self.logger.info('Procesamiento completo de archivos de estaciones Patm. Archivos generados: %s', archivos_salida)

In [12]:
procesador = AutomatPatmEMA('Test_QC') #RawUnmodified_Patm

In [14]:
procesador.procesar_archivos(procesador.p_transm)

min
h
None
Frecuencia inferida es None para el archivo Estacion_0048015040.csv
min
min
min
min
None
Frecuencia inferida es None para el archivo Estacion_0048015040.csv
min
min
min
min
2min
None
Frecuencia inferida es None para el archivo Estacion_0057015010.csv
h
2min
2min
2min
2min
2min
2min
2min


In [15]:
procesador.procesar_archivos(procesador.p_estruct)

min
h
None
Frecuencia inferida es None para el archivo Estacion_0048015040_qc.csv
min
min
min
min
None
Frecuencia inferida es None para el archivo Estacion_0048015040_qc.csv
min
min
min
min
2min
None
Frecuencia inferida es None para el archivo Estacion_0057015010_qc.csv
h
2min
2min
2min
2min
2min
2min
2min


In [16]:
procesador.procesar_archivos(procesador.p_limrigidos)

In [17]:
procesador.procesar_archivos(procesador.p_limpsicrom)

  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()
  Alt = EMN_TMaxMin['Altitud'].ravel()


In [18]:
procesador.procesar_archivos(procesador.p_persist)

In [19]:
procesador.procesar_archivos(procesador.p_salto)

min
h
h


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  chunk.loc[chunk_jmp.index] = chunk_jmp
  chunk.loc[chunk_jmp.index] = chunk_jmp
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')


min
min
min


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  chunk.loc[chunk_jmp.index] = chunk_jmp
  chunk.loc[chunk_jmp.index] = chunk_jmp


min
min
None
Frecuencia inferida es None para el archivo Estacion_0048015040_qc.csv
None
Frecuencia inferida es None para el archivo Estacion_0048015040_qc.csv
None
Frecuencia inferida es None para el archivo Estacion_0048015040_qc.csv
min


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')


2min
None
Frecuencia inferida es None para el archivo Estacion_0057015010_qc.csv
h


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')


2min
2min
2min


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')


2min
2min
2min


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')
  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')


2min


  mask_consecutivo = chunk['Delta_tiempo'] == pd.to_timedelta('1H')


In [20]:
procesador.procesar_archivos(procesador.p_limclimsigma)

  chunk.loc[chunk_sgm.index] = chunk_sgm
  chunk.loc[chunk_sgm.index] = chunk_sgm
  chunk.loc[chunk_sgm.index] = chunk_sgm
  chunk.loc[chunk_sgm.index] = chunk_sgm


In [21]:
procesador.procesar_archivos(procesador.p_horavmaxmin)

  chunk.loc[chunk_hmm.index] = chunk_hmm
  chunk.loc[chunk_hmm.index] = chunk_hmm


In [22]:
procesador.procesar_archivos(procesador.llenado_0PC0)

AttributeError: 'AutomatPatmEMA' object has no attribute 'llenado_0PC0'

## Alistamiento archivos Cassandra

In [None]:
# Se reemplaza con la carpeta de archivos del resultado de pruebas con columnas extra
input_dir = '_______'

# Se genera lista de todos los archivos CSV en la carpeta de salida de pruebas y que ya hayan sido corregidos 
# y por tanto inician con el prefijo "cor", si no es el caso, omitir 'and f.startswith('cor')'
files = [f for f in os.listdir(input_dir) if f.endswith('.csv') and f.startswith('cor')]

# Se itera por cada archivo
for file in files:
    toCass_filepath = os.path.join(input_dir, file)

    # Se lee el archivo a transformar a formato adecuado para Cassandra
    toCass_df = pd.read_csv(toCass_filepath, encoding='latin-1')
    toCass_df['Fecha'] = pd.to_datetime(toCass_df['Fecha'], format='%Y-%m-%d %H:%M:%S')
    
    # Se eliminan las columnas no aptas para Cassandra que se conoce que están en todos los archivos
    toCass_df.drop(columns=['Unnamed: 0','Name'], inplace=True)

    # Se reemplazan sensores por los acordes en el glosario de variables + 'QC'
    # Se desconoce
    # Se guarda el archivo listo para Cassandra
    toCassReady_filepath = os.path.join(input_dir, f"cass_{file}")
    toCass_df.to_csv(toCassReady_filepath, index=False, encoding='latin-1')

print("¡Proceso completado!")

In [None]:
import unittest

In [None]:
# V1 p_limpsicrom --fue reemplazada
    def p_limpsicrom(self, chunk, archivo):
        '''Esta prueba calcula, con base en la ubicación de la estación y la estimación de tmax y tmin según las normales
        climatológicas, la escala de altura y la presión con la fórmula psicrométrica para fijar límites superior e inferior
        válidos; y reconocer datos atípicos en los conjuntos de datos'''
        # Se crea la columna 'Estado' si no existe
        if 'Estado' not in chunk.columns:
            chunk['Estado'] = None
        # Se genera la columna 'Estado_anterior' si no existe
        if 'Estado_Anterior' not in chunk.columns:
            chunk['Estado_Anterior'] = None
        
        # Se lee el archivo con las medias máximas y mínimas estimadas según las normales
        EMN_TMaxMin = pd.read_table('EMA_Patm_TMaxMin.txt', sep=';', encoding='latin-1')

        # Se establecen los valores de Tmax y Tmin
        Tmax = EMN_TMaxMin['TmaxNorm'].ravel()
        Tmin = EMN_TMaxMin['TminNorm'].ravel()
        Alt = EMN_TMaxMin['Altitud'].ravel()
        Statncod = chunk['Station']

        # Se itera sobre los valores de temperatura, altitudes y estación debidos
        for i,j,dem,s in zip(Tmax,Tmin,Alt,Statncod):
            if s not in EMN_TMaxMin['Station'].values:
                print(f'La estación {s} no se encuentra en EMN_TMaxMin.')
                return chunk
            
            TmaxSttn = EMN_TMaxMin[EMN_TMaxMin['Station'] == s]['TmaxNorm'].values[0]
            TminSttn = EMN_TMaxMin[EMN_TMaxMin['Station'] == s]['TminNorm'].values[0]
            AltSttn = EMN_TMaxMin[EMN_TMaxMin['Station'] == s]['Altitud'].values[0]

            # Se hacen los cálculos según fórmula psicrométrica
            EAsttnSup = (8.3144598*(273.15+TmaxSttn))/(9.80665*0.0289644)
            PatmpsicromSup = (1013.25*np.exp(-(AltSttn/EAsttnSup))) + 15.0
            EAsttnInf = (8.3144598*(273.15+TminSttn))/(9.80665*0.0289644)
            PatmpsicromInf = (1013.25*np.exp(-(AltSttn/EAsttnInf))) - 15.0

            # Se establecen los umbrales según fórmula psicrométrica
            chunk['LimSup_Sttn'] = PatmpsicromSup
            chunk['LimInf_Sttn'] = PatmpsicromInf
    
            # Compara el dato con umbrales inferiores y superiores 
            mask_outbPsicrom = (chunk['Valor'] < chunk['LimInf_Sttn']) | (chunk['Valor'] > chunk['LimSup_Sttn'])
            # Lógica de etiquetado para 'Estado_Anterior'
            condicion_0PSO0 = mask_outbPsicrom & chunk['Estado'].isin(['0PSO0', '0PSO1'])
            chunk.loc[condicion_0PSO0, 'Estado_Anterior'] = chunk.loc[condicion_0PSO0, 'Estado']
    
            # Lógica de etiquetado para 'Estado'
            condicion_0PER0 = mask_outbPsicrom & (chunk['Estado'].isnull() | chunk['Estado'].isin(['0PSO0', '0PSO1']))
            chunk.loc[condicion_0PER0, 'Estado'] = '0PER0'
            mask_outbPsicrom = mask_outbPsicrom & ~condicion_0PER0

            condicion_0PER1 = mask_outbPsicrom & (chunk['Estado'] == '0PER0')
            chunk.loc[condicion_0PER1, 'Estado'] = '0PER1'
            
            # Se eliminan las columnas no deseadas
            if 'LimSup_Sttn' in chunk.columns:
                chunk.drop(columns=['LimSup_Sttn', 'LimInf_Sttn'], axis=1, inplace=True)
                
            return chunk