In [1]:
import json
import pandas as pd
from pathlib import Path
import cudf
import shutil
import rmm
import gc
import cupy as cp
import numpy as np
from pandas.tests.groupby.conftest import dropna
# from cuml.ensemble import IsolationForest NO furula, probablemente no compilado todavía! Estamos en el bleeding edge
from sklearn.ensemble import IsolationForest

# Sección de configuración de entorno
Variables de entorno


In [2]:
BASE_DIR = Path("~/Projects/proyecto-computacion-I/data/datasets/raw").expanduser()
OUTPUT_DIR = Path("~/Projects/proyecto-computacion-I/data/datasets/processed/measurements.parquet").expanduser()
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"
#
# Limpieza previa de archivos Usar si se cambian nombres de atributos por el Hive style
if OUTPUT_DIR.exists():
    print(f"Limpiando directorio antiguo: {OUTPUT_DIR}")
    shutil.rmtree(OUTPUT_DIR)

Limpiando directorio antiguo: /home/duo/Projects/proyecto-computacion-I/data/datasets/processed/measurements.parquet


Verificación de GPU pre cudf

In [3]:
import pynvml

try:
    pynvml.nvmlInit()
    device_count = pynvml.nvmlDeviceGetCount()
    print(f"GPUs detectadas: {device_count}")
    for i in range(device_count):
        handle = pynvml.nvmlDeviceGetHandleByIndex(i)
        name = pynvml.nvmlDeviceGetName(handle)
        print(f"GPU {i}: {name}")
except Exception as e:
    print(f"Error detectando GPU: {e}")

GPUs detectadas: 1
GPU 0: NVIDIA GeForce RTX 5090


Activar memoria RAM de apoyo

In [4]:
rmm.reinitialize(managed_memory=True)

Parseador JSON -> DF para concadenar posteriormente, aseguramos acceso O(N)

In [5]:
def process_json(filepath, batch_name):
    """Lee un JSON y devuelve un DataFrame aplanado."""
    with open(filepath, 'r') as f:
        data = json.load(f)

    # Extraer metadatos globales del archivo
    resp = data.get('response', {})
    metric_name = resp.get('nombre')
    unit = resp.get('unidad')

    # Extraer la lista de valores
    valores = resp.get('valores', [])

    if not valores:
        return pd.DataFrame()

    # Crear DataFrame
    df = pd.DataFrame(valores)

    # Limpieza y Tipado (CRUCIAL para Parquet/cuDF)
    # 1. Convertir tiempo a datetime real
    if 'tiempo' in df.columns:
        df['timestamp'] = pd.to_datetime(df['tiempo'], format='%d/%m/%Y %H:%M')

    # 2. Asegurar que valor sea float
    if 'valor' in df.columns:
        df['value'] = pd.to_numeric(df['valor'], errors='coerce')

    # 3. Añadir columnas de contexto
    df['metric'] = metric_name
    df['unit'] = unit
    df['batch'] = batch_name

    # Opcional: Extraer sensor del nombre del archivo si no está en el JSON
    # C302_AMONIO.json -> C302
    sensor_from_filename = filepath.stem.split('_')[0]
    df['sensor_id'] = sensor_from_filename

    # Seleccionar solo columnas útiles para ahorrar espacio
    # Tag se puede obtener combinando unit con sensor_id así que lo descartamos
    cols_to_keep = ['timestamp', 'value', 'sensor_id', 'metric', 'unit', 'batch']
    return df[cols_to_keep]

In [6]:
all_dfs = []

for batch_dir in BASE_DIR.glob('batch*'):
    print(f"Procesando {batch_dir.name}...")

    # Recorrer jsons
    for json_file in batch_dir.glob('*.json'):
        try:
            df_temp = process_json(json_file, batch_dir.name)
            if not df_temp.empty:
                all_dfs.append(df_temp)
        except Exception as e:
            print(f"Error en {json_file}: {e}")

Procesando batch2...
Procesando batch5...
Procesando batch4...
Procesando batch1...
Procesando batch3...


# Inicio de la lógica de desduplicación

Concadenamos y ordenamos por batch y fecha, así nos aseguramos de que la última escritura manda.

In [7]:
raw_df = pd.concat(all_dfs, ignore_index=True)
raw_df.sort_values(by=['batch', 'timestamp'], inplace=True)

In [8]:
print(f"Registros crudos totales: {len(raw_df):,}")
print("Muestra de datos crudos:")
display(raw_df.head())

Registros crudos totales: 261,163
Muestra de datos crudos:


Unnamed: 0,timestamp,value,sensor_id,metric,unit,batch
156726,2025-10-02 14:00:00,7.4,C323,PH,ph,batch1
156967,2025-10-02 14:00:00,0.1,C322,AMONIO,ppm,batch1
157208,2025-10-02 14:00:00,2.2,C342,CLOROFILA,µg/l,batch1
158409,2025-10-02 14:00:00,1.9,C323,CARBONO ORGANICO,ppm,batch1
158890,2025-10-02 14:00:00,12.6,C313,CLOROFILA,ppb,batch1


Definimos qué hace única a una medición y detectamos duplicados para saber cuántos hay

Le dice a pandas (cu pandas) que busque duplicados en las columnas elegidas que devuelve una boooleana. Marca todas las filas como duplicados False y a la última fila de los duplicados la marca como True

la suma de booleanos dan el número de duplicados

In [9]:
subset_unique = ['timestamp', 'sensor_id', 'metric']
dupes_count = raw_df.duplicated(subset=subset_unique, keep='last').sum()

print(f"Duplicados detectados (solapamiento de batches): {dupes_count:,}")

Duplicados detectados (solapamiento de batches): 78,172


In [10]:
clean_df = raw_df.drop_duplicates(subset=subset_unique, keep='last').copy()
print(f"Registros finales únicos: {len(clean_df):,}")
print(f"Se han eliminado {len(raw_df) - len(clean_df)} filas redundantes.")

Registros finales únicos: 182,991
Se han eliminado 78172 filas redundantes.


# Inicio serialización

Convertimos los tipos de datos a category, al guardar en Parquet, esto creará un "Dictionary Encoding" automático.

1. Guarda los Valores Únicos (El Diccionario): pandas crea una lista interna de todos los valores únicos que aparecen en esa columna.

2. Guarda Índices (Los Códigos): En lugar de almacenar el texto repetido en cada fila, la columna solo almacena un código numérico (un entero pequeño) que apunta a la posición del valor real en esa lista de valores únicos.

In [11]:
cols_to_convert = ['tag', 'sensor_id', 'metric', 'unit', 'batch']

for col in cols_to_convert:
    if col in clean_df.columns:
        clean_df[col] = clean_df[col].astype('category')

    print("Guardando a Parquet con categorías nativas...")

clean_df.to_parquet(
    OUTPUT_DIR,
    partition_cols=['batch'],
    engine='pyarrow',
    compression='snappy',
    index=False
)
print(f"Datos guardados en {OUTPUT_DIR}")

Guardando a Parquet con categorías nativas...
Guardando a Parquet con categorías nativas...
Guardando a Parquet con categorías nativas...
Guardando a Parquet con categorías nativas...
Guardando a Parquet con categorías nativas...
Datos guardados en /home/duo/Projects/proyecto-computacion-I/data/datasets/processed/measurements.parquet


# Limpieza

Aunque en producción usaremos el script de limpieza antes de guardar, aquí aprovechamos la potencia de la rtx5090 para limpiar y analizar antes de redactar el pipeline definitivo de preprocesamiento de datos.

## Pruebas iniciales

Encontramos primero a los culpables para el posterior infilling

In [12]:
# 1. Recargar los datos limpios
df_gpu = cudf.read_parquet(OUTPUT_DIR)
print(f"Analizando {len(df_gpu):,} filas en GPU...\n")

Analizando 182,991 filas en GPU...



### Duplicados

Los duplicados los matamos directamente sin miramientos. Aunque no deberían haberse guardado pues los duplicados se tiran antes de escribir a parquet.

In [13]:
print(f"Filas antes de limpiar: {len(df_gpu):,}")

# Identificamos duplicados basándonos en la CLAVE PRIMARIA compuesta
# Mismo sensor + Misma métrica + Mismo instante de tiempo = Dato duplicado
subset_unique = ['sensor_id', 'metric', 'timestamp']
df_gpu = df_gpu.drop_duplicates(subset=subset_unique, keep='last')

print(f"Filas tras limpiar:     {len(df_gpu):,}")

Filas antes de limpiar: 182,991
Filas tras limpiar:     182,991


### Nulos
A veces al parsear los tipos aparecen nulos

In [14]:
nulos = df_gpu.isna().sum()
if nulos.sum() > 0:
    print("ADVERTENCIA: Se han encontrado valores nulos:")
    print(nulos[nulos > 0])
else:
    print("No hay valores nulos (NaN).")

No hay valores nulos (NaN).


### Valores negativos
Según la física todas nuestras magnitudes manejadas no pueden ser negativas:



Ver tipos de magnitudes

In [15]:
print(df_gpu['metric'].unique())

0                   PH
1               AMONIO
2            CLOROFILA
3     CARBONO ORGANICO
4     OXIGENO DISUELTO
5        CONDUCTIVIDAD
6             TURBIDEZ
7          TEMPERATURA
8         FICOCIANINAS
9                NIVEL
10            FOSFATOS
11            NITRATOS
Name: metric, dtype: object


Ninguna magnitud puede ser negativa

In [16]:
# PH, Turbidez, etc., no pueden ser negativos.
# Filtramos solo columnas numéricas
negativos = df_gpu[df_gpu['value'] < 0]

if len(negativos) > 0:
    print(f"\nADVERTENCIA: Hay {len(negativos)} mediciones con valor negativo.")
    # Mostramos qué métricas son las culpables
    print(negativos['metric'].value_counts())
else:
    print("\nNo hay valores negativos imposibles.")


ADVERTENCIA: Hay 28 mediciones con valor negativo.
metric
FICOCIANINAS    28
Name: count, dtype: int64


### Investigación de ceros

Sensores rotos o ríos secos

In [17]:
ceros = df_gpu[df_gpu['value'] == 0]
if len(ceros) > 0:
    print(f"\nINFO: Hay {len(ceros)} mediciones con valor 0.0.")
    print("Revisar si son reales (ej: lluvia) o error de sensor.")


INFO: Hay 27207 mediciones con valor 0.0.
Revisar si son reales (ej: lluvia) o error de sensor.


### Investigación de outliers

Nuestras magnitudes siguen distribuciones normales o log-normales

Temperatura, nivel, PH y concentración de oxígeno siguen distribuciones normales (esperable)

Todas las demás magnitudes siguen distribuciones log normales, es decir, son de cola pesada, donde media >> mediana

Utilizaremos Rolling Z-Score estándar para encontrar outliers en distribuciones normales y transformación logarítmica + rolling z-score para las demás

Para cada valor calculamos un Z score que es básicamente (valor - media)/ desviación típica -> Si supera umbral de aceptación es un outlier

Recordamos que estamos intentando encontrar vertidos ilegales (aunque muy raros) que son outliers por naturaleza, así que vamos a ser generosos con el Z-score

In [18]:
metrics_normal = ['PH', 'TEMPERATURA', 'OXIGENO DISUELTO', 'NIVEL']
metrics_log = ['AMONIO', 'TURBIDEZ', 'NITRATOS', 'FOSFATOS',
               'CLOROFILA', 'FICOCIANINAS', 'CONDUCTIVIDAD', 'CARBONO ORGANICO']
WINDOW_SIZE = 96
MIN_PERIODS = 24
MAX_ZSCORE = 6
EPSILON = 1e-6

Atención trabajando con el dataset he encontrado que Window_size = 24 (un día) hacía matemáticamente inexacto al modelo Si tenemos una ventana con pocos datos o si el dato es un pico muy fuerte, el pico dispara la media hacia arriba o desploma la desviación estándar. Se ha incrementado a 4 días para por ejemplo, minimizar los eventos como lluvias para que el z-score no se acostumbre

Resultado: El Z-Score se auto-limita. Matemáticamente, nunca puede superar sqrt(N−1) (aproximadamente) en una ventana de tamaño N.

Preparar los datos

In [19]:
df_gpu = df_gpu.reset_index(drop=True)

In [20]:
df_gpu = df_gpu.sort_values(['sensor_id', 'metric', 'timestamp'])
df_gpu['trans_value'] = df_gpu['value']

Creamos una columna con la copia de los valores y le aplicamos el logaritmo natural de 1 + x a las métricas que pueden seguir una distribución exponencial

Recordamos que no existen valores negativos medición, pero sí puede existir valor 0. Así que esto protege a la GPU en las matemáticas.

In [21]:
mask_log = df_gpu['metric'].isin(metrics_log)
df_gpu.loc[mask_log, 'trans_value'] = cp.log1p(df_gpu.loc[mask_log, 'value'])

Calcular z-score

Comparamos con las 12h antes y 12h después porque center=true

Usamos la función de pandas (implementación RAPIDS) de rolling

In [22]:
grouped = df_gpu.groupby(['sensor_id', 'metric'])
roll_mean = grouped['trans_value'].rolling(WINDOW_SIZE, center=True, min_periods=MIN_PERIODS).mean()
roll_std = grouped['trans_value'].rolling(WINDOW_SIZE, center=True, min_periods=MIN_PERIODS).std()


En cudf/pandas, el resultado de groupby().rolling() tiene un MultiIndex (claves de grupo + índice original).
Necesitamos soltar los niveles del grupo para alinear con df_gpu.
El nivel del índice original suele estar al final.

In [23]:
roll_mean_values = roll_mean.reset_index(level=[0, 1], drop=True)
roll_std_values  = roll_std.reset_index(level=[0, 1], drop=True)

Aseguramos que estamos alineados (ordenamos ambos por índice por si acaso)
Esto es costoso pero seguro. Si confías en el sort inicial, puedes omitir el sort_index final.

In [24]:
roll_mean_values = roll_mean_values.sort_index()
roll_std_values = roll_std_values.sort_index()
df_gpu = df_gpu.sort_index()

Calculamos cuántas desviaciones estándar se aleja un valor

Lógica inteligente con CuPy:
Si std > EPSILON, usamos std. Si es 0 o muy chico, usamos EPSILON para no dividir por 0.

Evitamos aplicar epsilon a todo porque atenúa los datos (aumenta el denominador de z-score)

In [25]:
numerator = df_gpu['trans_value'] - roll_mean_values
denominator = cp.where(roll_std_values > EPSILON, roll_std_values, EPSILON)

z_scores = numerator / denominator

Si std era NaN (ej. ventana de 1 dato), el z-score será NaN. Convertir a 0.

In [26]:
z_scores = z_scores.fillna(0)

Analizamos la sensibilidad que queremos aplicar

Sigma 3: Muy estricto. Eliminará muchas cosas (quizás datos reales pero raros).

Sigma 10: Muy laxo. Solo eliminará errores catastróficos.

In [27]:
total_rows = len(df_gpu)
print(f"Total registros: {total_rows:,}")
print("\nImpacto de diferentes umbrales (Sigma):")

for sigma in np.arange(1.0, 10.0, 0.1):
    outliers = (z_scores.abs() > sigma).sum()
    percent = (outliers / total_rows) * 100
    print(f"Sigma {sigma}: Eliminaría {outliers:,} registros ({percent:.4f}%)")

    if sigma == MAX_ZSCORE:
        df_gpu['is_outlier'] = z_scores.abs() > sigma


Total registros: 182,991

Impacto de diferentes umbrales (Sigma):
Sigma 1.0: Eliminaría 31,161 registros (17.0287%)
Sigma 1.1: Eliminaría 25,831 registros (14.1160%)
Sigma 1.2000000000000002: Eliminaría 20,982 registros (11.4661%)
Sigma 1.3000000000000003: Eliminaría 17,028 registros (9.3054%)
Sigma 1.4000000000000004: Eliminaría 13,749 registros (7.5135%)
Sigma 1.5000000000000004: Eliminaría 11,097 registros (6.0642%)
Sigma 1.6000000000000005: Eliminaría 8,974 registros (4.9041%)
Sigma 1.7000000000000006: Eliminaría 7,213 registros (3.9417%)
Sigma 1.8000000000000007: Eliminaría 5,747 registros (3.1406%)
Sigma 1.9000000000000008: Eliminaría 4,664 registros (2.5488%)
Sigma 2.000000000000001: Eliminaría 3,858 registros (2.1083%)
Sigma 2.100000000000001: Eliminaría 3,179 registros (1.7372%)
Sigma 2.200000000000001: Eliminaría 2,658 registros (1.4525%)
Sigma 2.300000000000001: Eliminaría 2,276 registros (1.2438%)
Sigma 2.4000000000000012: Eliminaría 1,909 registros (1.0432%)
Sigma 2.500000

In [28]:
print(f"Z-Score Máximo encontrado en todo el dataset: {z_scores.max():.4f}")
print(f"Z-Score Mínimo encontrado en todo el dataset: {z_scores.min():.4f}")

print("\nINSPECCIÓN DE LOS 'PEORES' CASOS")
# Vamos a ver las filas que tienen los Z-Scores más altos (los que están en ese limbo de 1.x)
df_gpu['z_score_abs'] = z_scores.abs()
top_outliers = df_gpu.nlargest(10, 'z_score_abs')
print(top_outliers[['sensor_id', 'metric', 'timestamp', 'value', 'trans_value', 'z_score_abs']].to_pandas())

Z-Score Máximo encontrado en todo el dataset: 9.6959
Z-Score Mínimo encontrado en todo el dataset: -9.6956

INSPECCIÓN DE LOS 'PEORES' CASOS
       sensor_id            metric           timestamp     value  trans_value  \
70276       C326            AMONIO 2025-11-03 05:00:00  0.019803     0.019803   
134144      C343      FICOCIANINAS 2025-12-02 05:00:00  0.095310     0.095310   
40969       C327     CONDUCTIVIDAD 2025-10-10 11:00:00  0.000000     0.000000   
114003      C315                PH 2025-11-28 09:00:00  0.000000     0.000000   
25442       C344     CONDUCTIVIDAD 2025-10-07 11:00:00  4.394449     4.394449   
114117      C315     CONDUCTIVIDAD 2025-11-28 09:00:00  0.000000     0.000000   
130380      C314     CONDUCTIVIDAD 2025-12-01 12:00:00  2.995732     2.995732   
109286      C343     CONDUCTIVIDAD 2025-11-27 11:00:00  8.006701     8.006701   
114058      C315  CARBONO ORGANICO 2025-11-28 09:00:00  0.000000     0.000000   
82232       C315          NITRATOS 2025-11-05 12:

### Investigación profunda de outliers

Utilizamos IsolationForest para encontrar datos outliers tanto por métrica simple como con z-score, como por métricas compuestas.

Debemos pasar de un formato largo a un formato ancho para este análisis. Esto generará muchas columnas huecas. Nuestra estrategia es entrenar un IsolationForest para cada conjunto de estaciones con métricas similares

Isolation forest busca qué tan fácil es aislar un dato del resto aplicando cortes arbitraarios en el dataset. Encuentra rápidamente a outliers. Es un algoritmo no supervisado.


NOS TRAEMOS LOS DATOS A CPU POR DEPENDENCY HELL DE RAPIDS QUE NO ME CONSIGUE ENCONTRAR LA CLASE ISOLATION FOREST

Pivotar la tabla puede tardar u poco

In [29]:
df_pd = df_gpu.to_pandas()
df_pivot = df_pd.pivot_table(
    index=['sensor_id', 'timestamp'],
    columns='metric',
    values='trans_value' # Usamos el valor transformado (log) que es más "digerible"
)
print(f"Dimensiones de la tabla pivotada: {df_pivot.shape}")

Dimensiones de la tabla pivotada: (25300, 12)


Identificar el "Perfil" de cada estación, para cada sensor_id, vemos qué columnas NO son nulas nunca (o casi nunca)

Un truco rápido: Agrupamos por sensor_id y vemos si la columna tiene datos (podemos hacerlo porque sé como vienen)

In [30]:
sensor_profiles = df_pivot.groupby('sensor_id').apply(
    lambda x: tuple(sorted(x.dropna(axis=1, how='all').columns))
)

Agrupamos por perfil

Esto nos da algo como:
* C001: ('PH', 'TEMPERATURA')
* C002: ('AMONIO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
* C003: ('PH', 'TEMPERATURA')  <-- Mismo perfil que C001

In [31]:
print("\nAgrupando estaciones por tipos de sensores...")
unique_profiles = sensor_profiles.unique()
print(f"Se han encontrado {len(unique_profiles)} configuraciones de sensores distintas.")
print(unique_profiles)


Agrupando estaciones por tipos de sensores...
Se han encontrado 13 configuraciones de sensores distintas.
[('AMONIO', 'CONDUCTIVIDAD', 'NITRATOS', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('CLOROFILA', 'CONDUCTIVIDAD', 'FICOCIANINAS', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'FOSFATOS', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('AMONIO', 'CONDUCTIVIDAD', 'NITRATOS', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'FOSFATOS', 'NITRATOS', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('CONDUCTIVIDAD', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ')
 ('TEMPERATURA',)
 ('CONDUCTIVIDAD', 'OXIGE

Iteramos cada modelo y entrenamos un modelo específico

In [32]:
results = []

for i, profile in enumerate(unique_profiles):
    # Sensores que tienen este perfil exacto
    sensors_in_profile = sensor_profiles[sensor_profiles == profile].index

    print(f"\n--- Grupo {i+1}/{len(unique_profiles)}: {profile} ---")

    # .loc devuelve una vista o copia dependiendo del contexto.
    # Añadimos .copy() para asegurarnos de que es un objeto independiente y evitar Warnings
    subset = df_pivot.loc[df_pivot.index.get_level_values('sensor_id').isin(sensors_in_profile), list(profile)].copy()

    # Limpieza de huecos temporales
    subset = subset.ffill().dropna()

    if len(subset) < 100:
        print(f"   Muy pocos datos para entrenar. Saltando perfil.")
        continue

    # Entrenar Isolation Forest
    iso_forest = IsolationForest(
        n_estimators=100,
        contamination=0.005,
        n_jobs=-1,
        random_state=69420
    )

    iso_forest.fit(subset)

    # 1. Predicción binaria (Anomaly: True/False)
    preds = iso_forest.predict(subset)

    # 2. Puntuación continua (Normality Score)
    scores = iso_forest.decision_function(subset)

    # Guardamos AMBOS resultados en el dataframe
    subset['is_anomaly_multivariate'] = preds == -1
    subset['normality_score'] = scores

    # Logging: Contamos anomalías solo para informar por pantalla
    n_anomalies = subset['is_anomaly_multivariate'].sum()
    print(f"   Procesado. Detectadas {n_anomalies} anomalías.")

    # Guardamos el subset completo (con anomalías Y datos normales puntuados)
    results.append(subset)


--- Grupo 1/13: ('AMONIO', 'CONDUCTIVIDAD', 'NITRATOS', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ') ---
   Procesado. Detectadas 13 anomalías.

--- Grupo 2/13: ('CLOROFILA', 'CONDUCTIVIDAD', 'FICOCIANINAS', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ') ---
   Procesado. Detectadas 34 anomalías.

--- Grupo 3/13: ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'FOSFATOS', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ') ---
   Procesado. Detectadas 17 anomalías.

--- Grupo 4/13: ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ') ---
   Procesado. Detectadas 22 anomalías.

--- Grupo 5/13: ('AMONIO', 'CONDUCTIVIDAD', 'NITRATOS', 'NIVEL', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ') ---
   Procesado. Detectadas 5 anomalías.

--- Grupo 6/13: ('AMONIO', 'CARBONO ORGANICO', 'CONDUCTIVIDAD', 'OXIGENO DISUELTO', 'PH', 'TEMPERATURA', 'TURBIDEZ') ---
   Procesado. Detectadas 9 anomalías.

--- Grupo 7/1

Concadenar resultados

In [33]:
df_final_scored = pd.concat(results)
print(f"\nProceso terminado. Total registros puntuados: {len(df_final_scored):,}")
print("\nTop 5 Anomalías más extremas (Score más negativo):")
print(df_final_scored.sort_values('normality_score', ascending=True).head(5))


Proceso terminado. Total registros puntuados: 25,300

Top 5 Anomalías más extremas (Score más negativo):
metric                         AMONIO  CONDUCTIVIDAD  NITRATOS  \
sensor_id timestamp                                              
C315      2025-11-28 09:00:00     0.0       0.000000   0.00000   
C306      2025-12-10 11:00:00     0.0       0.000000       NaN   
C317      2025-11-06 07:00:00     NaN            NaN       NaN   
C333      2025-12-03 10:00:00     0.0       5.587249   6.25575   
C327      2025-10-10 11:00:00     0.0       0.000000       NaN   

metric                         OXIGENO DISUELTO   PH  TEMPERATURA  TURBIDEZ  \
sensor_id timestamp                                                           
C315      2025-11-28 09:00:00              0.00  0.0          0.0  0.000000   
C306      2025-12-10 11:00:00              0.00  0.0          0.0  0.000000   
C317      2025-11-06 07:00:00               NaN  NaN         11.9       NaN   
C333      2025-12-03 10:00:00       

Muy raro la observación en la central C310:

La Hipótesis del Modelo: El Isolation Forest ha detectado un patrón extraño:

Oxígeno Disuelto subiendo rápido: Pasar de 5.5 a 7.3 en 4 horas sin que cambie la temperatura es raro (normalmente el oxígeno sube si baja la temperatura o si hay mucha fotosíntesis, pero la Turbidez es constante).

Nitratos clavados en 0.0: En un río con Amonio presente (1.8 - 1.9, que es detectable), tener Nitratos absolutos en 0.0 es químicamente sospechoso (el ciclo del nitrógeno suele convertir algo de amonio en nitratos).

Se investiga más abajo en la sección de investigación

Procedemos a usar isolation forest para la tarea contraria - el registro más representativo del dataset

Ordenar los datos más altos, los más densos, más representativos:

In [34]:
print("\nTop 5 Registros más 'Normales/Centrales' (Score más alto):")
print(df_final_scored.sort_values('normality_score', ascending=False).head(5))


Top 5 Registros más 'Normales/Centrales' (Score más alto):
metric                         AMONIO  CONDUCTIVIDAD  NITRATOS  \
sensor_id timestamp                                              
C317      2025-10-30 18:00:00     NaN            NaN       NaN   
          2025-11-29 12:00:00     NaN            NaN       NaN   
          2025-11-30 07:00:00     NaN            NaN       NaN   
          2025-11-30 08:00:00     NaN            NaN       NaN   
          2025-11-30 15:00:00     NaN            NaN       NaN   

metric                         OXIGENO DISUELTO  PH  TEMPERATURA  TURBIDEZ  \
sensor_id timestamp                                                          
C317      2025-10-30 18:00:00               NaN NaN         26.0       NaN   
          2025-11-29 12:00:00               NaN NaN         26.0       NaN   
          2025-11-30 07:00:00               NaN NaN         26.0       NaN   
          2025-11-30 08:00:00               NaN NaN         26.0       NaN   
         

Estación C333 (Noviembre 2025):

* PH: 7.2 (Perfectamente neutro/sano).

* TEMPERATURA: ~16ºC (Razonable para otoño).

* OXIGENO: ~4.0 (Un poco bajo, pero estable).

* AMONIO: 0.0 (Limpio).

* NITRATOS: ~6.2 (Estables).

Diagnóstico: Este es el "Golden Batch". Así se ve el río cuando está tranquilo. Es muy valioso saber esto, porque cualquier desviación futura se debe comparar contra estos valores, no contra la media global (que podría estar sucia).

Se profundizará con esto en el análisis exploratorio pues necesitamos calibrar el río

## Investigación

In [46]:
PATH_SAICA_METADATA = '../data/datasets/raw/saica_stations_master_list.json'
df_stations = pd.read_json(PATH_SAICA_METADATA)

In [47]:
df_stations

Unnamed: 0,codigo,nombre,subcuenca,url_detalles
0,C322,SAICA CARCABOSO,ALAGÓN,https://saihtajo.chtajo.es/index.php?w=get-est...
1,C323,SAICA BEJAR,ALAGÓN,https://saihtajo.chtajo.es/index.php?w=get-est...
2,C313,SAICA CAZALEGAS,ALBERCHE,https://saihtajo.chtajo.es/index.php?w=get-est...
3,C326,SAICA ESCALONA,ALBERCHE,https://saihtajo.chtajo.es/index.php?w=get-est...
4,C342,SAICA PICADAS,ALBERCHE,https://saihtajo.chtajo.es/index.php?w=get-est...
5,C317,SAICA ALMARAZ,BAJO TAJO,https://saihtajo.chtajo.es/index.php?w=get-est...
6,C331,SAICA CEDILLO,BAJO TAJO,https://saihtajo.chtajo.es/index.php?w=get-est...
7,C343,SAICA VALDECAÑAS,BAJO TAJO,https://saihtajo.chtajo.es/index.php?w=get-est...
8,C345,SAICA TORREJON-TAJO,BAJO TAJO,https://saihtajo.chtajo.es/index.php?w=get-est...
9,C302,SAICA ARANJUEZ,CABECERA,https://saihtajo.chtajo.es/index.php?w=get-est...


Vamos a verificar los registros que estaríamos eliminando utilizando el z-score

Permitiendo un MAX_ZSCORE de 6

In [48]:
mask_outliers = z_scores.abs() > MAX_ZSCORE

In [50]:
df_to_delete = df_gpu[mask_outliers].copy()
df_to_delete['z_score_calculated'] = z_scores[mask_outliers]

print(f"Usando Sigma {MAX_ZSCORE}, hemos aislado {len(df_to_delete)} registros sospechosos.")
print("-" * 60)

Usando Sigma 6, hemos aislado 107 registros sospechosos.
------------------------------------------------------------


In [53]:
print("Top 10 registros más extremos que serán eliminados:")
cols_to_show = ['sensor_id', 'metric', 'timestamp', 'value', 'trans_value', 'z_score_calculated']
print(df_to_delete.sort_values('z_score_calculated', ascending=False)[cols_to_show].head(10).to_pandas())

print("-" * 60)
print("Muestra aleatoria de registros a eliminar (para ver casos no tan extremos):")
print(df_to_delete.sample(5)[cols_to_show].to_pandas())

Top 10 registros más extremos que serán eliminados:
       sensor_id            metric           timestamp      value  \
70276       C326            AMONIO 2025-11-03 05:00:00   0.019803   
134144      C343      FICOCIANINAS 2025-12-02 05:00:00   0.095310   
109286      C343     CONDUCTIVIDAD 2025-11-27 11:00:00   8.006701   
140749      C315  CARBONO ORGANICO 2025-12-03 12:00:00   2.945491   
141100      C314                PH 2025-12-03 13:00:00  14.000000   
131683      C315          FOSFATOS 2025-12-01 18:00:00   2.988708   
163189      C333          TURBIDEZ 2025-12-07 19:00:00   4.714921   
36238       C323          TURBIDEZ 2025-10-09 13:00:00   2.639057   
114798      C333          TURBIDEZ 2025-11-28 12:00:00   5.045359   
55616       C307            AMONIO 2025-10-31 10:00:00   2.557227   

        trans_value  z_score_calculated  
70276      0.019803            9.695897  
134144     0.095310            9.695897  
109286     8.006701            9.685767  
140749     2.945491 

#### Análisis científico de resultados

**PH 14.0 (Z-Score 9.60)** - Imposible
Ya aquí vemos por ejemplo que se ha detectado un PH de 14, imposible, saturación de voltaje del sensor:
El pH va de 0 a 14. Un valor de 14.0 exacto indica una solución saturada de Hidróxido de Sodio (sosa cáustica pura). Es imposible que un río alcance este valor de forma natural o incluso por un vertido normal. Es un fallo típico de saturación del sensor (el electrodo leyó el voltaje máximo posible).

**Conductividad 8.0 µS/cm (Z-Score 9.68)** - Muy sospechoso, error rango bajo
El agua de río normal suele tener entre 50 y 1500 µS/cm (dependiendo de la salinidad). Un valor de 8 µS/cm es casi agua destilada o agua de deshielo muy pura en alta montaña. En la cuenca del Tajo (zonas agrícolas/urbanas), bajar a 8 µS/cm es prácticamente imposible. Indica sensor desconectado o fallo de lectura.

**Turbidez 5.04 NTU (Z-Score 9.05)** - Posible falso positivo
5 NTU es un agua bastante clara (el agua potable permite hasta 1-5 NTU). ¿Por qué es un outlier? Porque probablemente ese río suele estar cristalino (0-1 NTU). Al subir a 5 NTU (una lluvia ligera), matemáticamente es un salto de 500%, pero físicamente es un valor perfectamente normal.

**Amonio 0.019 mg/L (Z-Score 9.69)** - Posible falso positivo
0.019 mg/L es prácticamente cero (límite de detección). Si el sensor normalmente marca 0.001, subir a 0.019 es estadísticamente enorme (Z=9), pero ambientalmente es insignificante. Borrar este dato no afecta al estudio de contaminación.

**Carbono Orgánico = 0.00 mg/L (Z-Score -8.81)** - Imposible
Siempre hay algo de materia orgánica en un río (hojas, algas, bacterias). Un TOC (Carbono Orgánico Total) de 0.00 exacto es imposible en la naturaleza. Indica fallo de sensor (lectura plana).

**Conductividad = 5.73 µS/cm (Z-Score -6.39)** - Imposible
Igual que el caso anterior. Agua ultra-pura (casi destilada) en un tramo medio de río. Error de sensor.

**Amonio = 1.18 mg/L (Z-Score 6.78)** - POSIBLE (Evento Real de Contaminación)
El límite de buen estado suele estar en 0.5 - 1.0 mg/L. Un valor de 1.18 mg/L indica un vertido o contaminación moderada.

# Política Integral de Calidad del Dato (QC) e Imputación Inteligente

Esta celda define las reglas de negocio para la limpieza (Hard Limits) y la estrategia diferenciada de relleno de huecos (Smart Infilling) según la naturaleza física de cada variable.

## 1. Fase de Limpieza: Límites Físicos (Hard Limits)

**Acción:** Todo valor fuera de estos rangos se considera imposible o error crítico y se convierte inmediatamente a `NaN`.

| Métrica | Unidad | Min | Max | Restricción Extra | Justificación |
| :--- | :--- | :--- | :--- | :--- | :--- |
| **AMONIO** | $\text{mg}/\text{L}$ | 0.0 | 20.0 | - | $>20$ indica toxicidad extrema o error. |
| **CARBONO ORG.** | $\text{mg}/\text{L}$ | 0.0 | 50.0 | No Cero Exacto | Siempre hay carbono. $>50$ es lodo. |
| **CLOROFILA** | $\mu\text{g}/\text{L}$ | 0.0 | 500.0 | - | Permite blooms de algas intensos. |
| **CONDUCTIVIDAD** | $\mu\text{S}/\text{cm}$ | 20.0 | 5000.0 | - | $<20$ agua destilada. $>5000$ salmuera. |
| **FICOCIANINAS** | $\mu\text{g}/\text{L}$ | 0.0 | 1000.0 | - | Tope algas verde-azuladas. |
| **FOSFATOS** | $\text{mg}/\text{L}$ | 0.0 | 10.0 | - | $>10$ vertido industrial directo. |
| **NITRATOS** | $\text{mg}/\text{L}$ | 0.0 | 250.0 | - | Margen amplio para contaminación agrícola. |
| **NIVEL** | $\text{m}$ | 0.0 | 20.0 | - | Niveles negativos imposibles. |
| **O. DISUELTO** | $\text{mg}/\text{L}$ | 0.0 | 20.0 | - | $>20$ sobresaturación imposible. |
| **PH** | ud. | 4.0 | 10.5 | No Cero Exacto | Rango vital. Fuera es ácido/base fuerte. |
| **TEMPERATURA** | $\text{ºC}$ | 0.0 | 38.0 | No Cero Exacto | $38\text{ºC}$ límite biológico. |
| **TURBIDEZ** | $\text{NTU}$ | 0.0 | 2000.0 | - | $2000$ es saturación de sedimentos. |

## 2. Fase de Limpieza: Coherencia Temporal

Se aplica sobre la variable **TEMPERATURA** antes del infilling.
* **Regla:** Si $\Delta T > 10 \text{ºC}$ respecto al registro anterior inmediato.
* **Acción:** Marcar como __NaN__.
* **Razón:** Inercia térmica del agua (imposible calentar un río 10 grados en 1 hora).

---

## 3. Fase de Imputación: Estrategia Diferenciada (Smart Infilling)

A diferencia de la interpolación genérica, aquí aplicamos métodos específicos según la "física" de la variable.

### Grupo A: Variables de Alta Inercia ("Los Lentos")
Magnitudes con gran capacidad de almacenamiento o cambios lentos.
* **Métricas:** TEMPERATURA, NIVEL, CONDUCTIVIDAD.
* **Método:** Linear (Interpolación Lineal).
* **Ventana Máxima (Gap):** **6 horas**.
* **Justificación:** Unir dos puntos separados por 6h con una recta introduce un error mínimo debido a la alta estabilidad de estas variables.

### Grupo B: Variables Biológicas / Cíclicas ("Ritmos Circadianos")
Magnitudes gobernadas por la luz solar y la fotosíntesis (ciclos día/noche senoidales).
* **Métricas:** OXIGENO DISUELTO, CLOROFILA, FICOCIANINAS, PH.
* **Método:** Spline o Polynomial (orden 2 o 3).
* **Ventana Máxima (Gap):** **3 horas**.
* **Justificación:** La interpolación lineal "aplana" las curvas, borrando picos vitales de producción de oxígeno. Se necesita una curva que respete la tendencia inercial. El límite es estricto para no inventar ciclos falsos.

### Grupo C: Variables de Evento ("Los Nerviosos")
Magnitudes con ruido de fondo bajo y picos violentos (lluvias, vertidos).
* **Métricas:** TURBIDEZ, AMONIO, NITRATOS, FOSFATOS, CARBONO ORGANICO.
* **Método:** Linear (Muy Conservador).
* **Ventana Máxima (Gap):** **2 horas**.
* **Justificación:** Riesgo crítico. Interpolar un hueco grande durante una tormenta puede ocultar una riada o un vertido tóxico. Si el sensor falla más de 2 horas durante un evento, el dato se considera perdido para no falsear la carga contaminante.

---

### Resumen de Implementación Técnica

| Grupo | Variables | Método `interpolate()` | `limit` (Horas) |
| :--- | :--- | :--- | :--- |
| **Inerciales** | Temp, Nivel, Cond | `method='linear'` | 6 |
| **Biológicas** | OD, Clorofila, Ficocianinas, pH | `method='spline', order=3` | 3 |
| **Eventos** | Turbidez, Nutrientes (N/P), TOC | `method='linear'` | 2 |

*Nota: Cualquier hueco superior al `limit` especificado permanecerá como `NaN`.*

#### Conclusiones

Errores Físico-Químicos (Hard Errors): Valores como pH 14.0 o Carbono Orgánico 0.00 mg/L violan los principios básicos de la limnología en ríos naturales. Representan fallos instrumentales (saturación o desconexión).

Inconsistencias Estadísticas Extremas (Soft Errors): Valores como Conductividad < 10 µS/cm en tramos medios de la cuenca del Tajo (donde la media supera los 500 µS/cm) indican lecturas erróneas por falta de mineralización detectada.

#### Políticas a aplicar


### ZSCORE

### Isolation forest

Investigación sospechosos identificado por Isolation Forest

In [35]:
# Verificamos los datos crudos originales de la C310 antes de pivotar
print("Inspección de la estación C310:")
inspect_c310 = df_gpu[df_gpu['sensor_id'] == 'C310'].to_pandas()

# Vemos qué métricas tiene y cuántos datos hay de cada una
metrics_count = inspect_c310.groupby('metric')['value'].count()
print(metrics_count)

# Vemos una muestra de esos Nitratos sospechosos
print("\nMuestra de Nitratos en C310:")
print(inspect_c310[inspect_c310['metric'] == 'NITRATOS'].tail(5))
print(inspect_c310[inspect_c310['metric'] == 'OXIGENO DISUELTO'].tail(5))
print(inspect_c310[inspect_c310['metric'] == 'TEMPERATURA'].tail(5))

Inspección de la estación C310:
metric
AMONIO              843
CONDUCTIVIDAD       843
NITRATOS            843
OXIGENO DISUELTO    843
PH                  843
TEMPERATURA         843
TURBIDEZ            843
Name: value, dtype: int64

Muestra de Nitratos en C310:
                 timestamp  value sensor_id    metric unit   batch  \
181952 2025-12-11 10:00:00    0.0      C310  NITRATOS  ppm  batch5   
182169 2025-12-11 11:00:00    0.0      C310  NITRATOS  ppm  batch5   
182386 2025-12-11 12:00:00    0.0      C310  NITRATOS  ppm  batch5   
182603 2025-12-11 13:00:00    0.0      C310  NITRATOS  ppm  batch5   
182820 2025-12-11 14:00:00    0.0      C310  NITRATOS  ppm  batch5   

        trans_value  z_score_abs  
181952          0.0          0.0  
182169          0.0          0.0  
182386          0.0          0.0  
182603          0.0          0.0  
182820          0.0          0.0  
                 timestamp  value sensor_id            metric unit   batch  \
182051 2025-12-11 10:00:00  

Diagnóstico: Seguramente ese sensor esté roto o offline

In [36]:
print("INVESTIGACIÓN FICOCIANINAS")
ficos_neg = df_gpu[
    (df_gpu['metric'] == 'FICOCIANINAS') &
    (df_gpu['value'] < 0)
    ]

print(ficos_neg['value'].describe())

print("\nEjemplos de valores negativos:")
print(ficos_neg['value'].head(5))

INVESTIGACIÓN FICOCIANINAS
count    28.000000
mean     -0.109567
std       0.022259
min      -0.223144
25%      -0.105361
50%      -0.105361
75%      -0.105361
max      -0.105361
Name: value, dtype: float64

Ejemplos de valores negativos:
51882    -0.105361
52099    -0.105361
104613   -0.105361
104830   -0.105361
105047   -0.105361
Name: value, dtype: float64


In [37]:
df_gpu

Unnamed: 0,timestamp,value,sensor_id,metric,unit,batch,trans_value,z_score_abs
0,2025-10-02 14:00:00,7.400000,C323,PH,ph,batch1,7.400000,4.440892e-09
1,2025-10-02 14:00:00,0.095310,C322,AMONIO,ppm,batch1,0.095310,1.774644e+00
2,2025-10-02 14:00:00,1.163151,C342,CLOROFILA,µg/l,batch1,1.163151,6.388911e-01
3,2025-10-02 14:00:00,1.064711,C323,CARBONO ORGANICO,ppm,batch1,1.064711,1.691932e+00
4,2025-10-02 14:00:00,2.610070,C313,CLOROFILA,ppb,batch1,2.610070,3.181695e+00
...,...,...,...,...,...,...,...,...
182986,2025-12-11 14:00:00,12.500000,C314,TEMPERATURA,ºC,batch5,12.500000,9.104366e-01
182987,2025-12-11 14:00:00,0.000000,C308,AMONIO,mg/l,batch5,0.000000,0.000000e+00
182988,2025-12-11 14:00:00,8.300000,C329,PH,ph,batch5,8.300000,7.213932e-01
182989,2025-12-11 14:00:00,0.955511,C304,FOSFATOS,ppm,batch5,0.955511,9.449859e-01


In [38]:
print(df_gpu[df_gpu['metric'] == 'AMONIO']['value'].describe())

count    14335.000000
mean         0.878308
std          1.036280
min          0.000000
25%          0.095310
50%          0.405465
75%          1.335001
max          4.875197
Name: value, dtype: float64
