In [1]:
import pandas as pd
import multiprocessing
import time
import numpy as np
import csv
import sys
from itertools import chain

# Aumentar el límite del campo CSV, esencial para archivos grandes
csv.field_size_limit(sys.maxsize)
pd.options.mode.chained_assignment = None

# --- ALGORITMOS DE ORDENAMIENTO Y DEPURACIÓN ---

def merge_sort(arr):
    if len(arr) <= 1:
        return arr
    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])
    return merge(left, right)

def merge(left, right):
    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        # Criterio de ordenamiento: de mayor a menor temperatura
        if left[i][2] >= right[j][2]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    result.extend(left[i:])
    result.extend(right[j:])
    return result

# Quick Sort para depurar: filtra valores no numéricos o 0.0
def depurar_y_particionar(arr):
    n = len(arr)
    i = -1
    for j in range(n):
        temp_val = arr[j][2]
        # Aplica la depuración sobre los promedios calculados
        if isinstance(temp_val, (int, float)) and temp_val != 0.0:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    return i + 1

# --- IMPLEMENTACIÓN DE LA FÓRMULA DE PROMEDIO ANUAL (Lógica Java) ---

def calcular_promedios_anuales(datos_list):
    """
    Calcula la temperatura promedio anual por ciudad usando la lógica de HashMap.
    Recibe: Lista de tuplas (Ciudad, Año, Temp).
    Devuelve: Lista de tuplas (Ciudad, Año, Promedio).
    """
    # map para guardar la suma de las temperaturas de ciudad-anio
    sumas = {}
    # conteo de dias por ciudad
    conteos = {}

    for ciudad, anio, temp in datos_list:
        clave = f"{ciudad}|{anio}"

        # Lógica para sumar las temperaturas
        # Si la clave existe, actualiza; si no, inicializa.
        sumas[clave] = sumas.get(clave, 0.0) + temp
        
        # Lógica para contar los días
        # Si la clave existe, incrementa; si no, inicializa a 1.
        conteos[clave] = conteos.get(clave, 0) + 1

    lista_promedios = []

    # Por cada clave, aplica la fórmula: Promedio = Suma / Conteo
    for clave in sumas.keys():
        suma_total = sumas[clave]
        cantidad_dias = conteos[clave]

        # FÓRMULA: Suma de Temperaturas / N (Días)
        promedio = suma_total / cantidad_dias

        # Descomponer la clave
        ciudad, anio = clave.split("|")

        # Agregar el nuevo registro de promedio anual
        # Formato (Ciudad, Año, Promedio) para que sea compatible con Merge Sort
        lista_promedios.append((ciudad, anio, promedio))

    return lista_promedios

# --- LÓGICA DE PROCESAMIENTO (CON MULTIPROCESAMIENTO) ---

def process_chunk(chunk):
    """
    Realiza la limpieza a nivel de fila de forma paralela y devuelve filas limpias.
    """
    if chunk.empty:
        return pd.DataFrame()

    try:
        chunk = chunk.copy()

        # 1. Limpieza y conversión a numérico
        chunk['AvgTemperature'] = chunk['AvgTemperature'].replace(r'["\(\)-]', '', regex=True)
        chunk['AvgTemperature'] = pd.to_numeric(chunk['AvgTemperature'], errors='coerce')

        # 2. Limpieza de fecha y extracción del año
        chunk['Fecha'] = chunk['Fecha'].replace(r'["\(\)]', '', regex=True)
        chunk = chunk.dropna(subset=['AvgTemperature', 'Fecha'])
        chunk['Year'] = chunk['Fecha'].str.split('/').str[2]
        chunk = chunk.dropna(subset=['Year'])

        # 3. Filtrado extremo
        chunk = chunk[chunk['AvgTemperature'] > -90]

        # 4. Devolver las columnas necesarias para el cálculo del promedio:
        return chunk[['City', 'Year', 'AvgTemperature']]

    except Exception as e:
        return pd.DataFrame()


def parallel_processing_with_chunks(chunks_to_process, func):
    """
    Divide y mapea la función 'func' a los chunks en paralelo.
    """
    num_cores = multiprocessing.cpu_count()
    print(f"\n[PROCESAMIENTO PARALELO] Usando {num_cores} núcleos para {len(chunks_to_process)} bloques.")

    with multiprocessing.Pool(num_cores) as pool:
        results = pool.map(func, chunks_to_process)

    return results

# --- FUNCIÓN PRINCIPAL (MAIN) ---

def main():
    tiempo_INICIAL = time.time()
    archivo_entrada = 'temperaturas.csv'
    archivo_salida = 'procesamiento_paralelo_python_promedios.csv'
    CHUNK_SIZE = 100000

    print("\n=======================================================")
    print("Iniciando Cálculo de Promedios Anuales por Ciudad")
    print("=======================================================")

    # 1. LECTURA POR CHUNKS Y LIMPIEZA PARALELA
    print("\n[PASO 1] Leyendo y Limpiando filas crudas en paralelo...")
    start_lectura = time.time()
    chunks = []
    try:
        csv_iterator = pd.read_csv(
            archivo_entrada,
            usecols=['City', 'AvgTemperature', 'Fecha'],
            chunksize=CHUNK_SIZE,
            encoding='utf-8',
            low_memory=True,
            skip_blank_lines=True
        )
        for chunk in csv_iterator:
            chunks.append(chunk)
    except Exception as e:
        print(f"\n ERROR: Al leer el archivo {archivo_entrada}. Detalles: {e}")
        return
    
    results = parallel_processing_with_chunks(chunks, process_chunk)
    valid_results = [res for res in results if not res.empty]
    if not valid_results:
        print("\n Advertencia: No se obtuvieron datos válidos de los procesos paralelos.")
        return

    # Consolidar todos los DataFrames de filas limpias
    final_df_limpio = pd.concat(valid_results, ignore_index=True)
    
    # Convertir el DataFrame a lista de tuplas para el algoritmo de agregación
    data_list_limpia = final_df_limpio.to_records(index=False).tolist()
    end_lectura = time.time()
    
    print("-------------------------------------------------------")
    print(f"   Tiempo de lectura y limpieza paralela: {end_lectura - start_lectura:.4f} segundos")
    print(f"   Filas crudas válidas consolidadas: {len(data_list_limpia):,} filas.")
    print("-------------------------------------------------------")


    # 2. CÁLCULO DE PROMEDIOS ANUALES (se implementa la formula)
    print("\n[PASO 2] Aplicando Fórmula de Promedio Anual...")
    start_promedios = time.time()
    
    # Se utiliza la lista limpia y se aplica la lógica de agregación
    # El resultado es la lista de promedios anuales
    arr_promedios = calcular_promedios_anuales(data_list_limpia)
    
    end_promedios = time.time()
    
    print(f"  Tiempo de cálculo de promedios: {end_promedios - start_promedios:.4f} segundos")
    print(f"  Registros de promedios anuales: {len(arr_promedios):,} registros (Resultado de la agregación).")
    print("-------------------------------------------------------")


    # 3. DEPURACIÓN NIVEL 2 (QuickSort)
    print("\n[PASO 3] Depuración (QuickSort-style) de los promedios...")
    start_depuracion = time.time()
    n_validos = depurar_y_particionar(arr_promedios)
    arr_depurado = arr_promedios[:n_validos]
    end_depuracion = time.time()
    print(f"   Tiempo de depuración: {end_depuracion - start_depuracion:.4f} segundos")
    print(f"   Promedios finales a ordenar: {n_validos:,} registros.")
    print("-------------------------------------------------------")


    # 4. ORDENAMIENTO (Merge Sort)
    print("\n[PASO 4] Ordenamiento (Merge Sort) de los promedios...")
    start_merge = time.time()
    datos_ordenados = merge_sort(arr_depurado)
    end_merge = time.time()
    print(f"  Tiempo de ordenamiento: {end_merge - start_merge:.4f} segundos")
    print("-------------------------------------------------------")


    # 5. ESCRITURA DE REPORTE
    print(f"\n[PASO 5] Generando archivo CSV: {archivo_salida}")
    start_escritura = time.time()
    try:
        with open(archivo_salida, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Anio', 'Ciudad', 'TemperaturaPromedio'])
            
            for row in datos_ordenados:
                ciudad, anio, temp = row
                writer.writerow([anio, f'"{ciudad}"', f'{temp:.2f}'])

    except Exception as e:
        print(f"\n ERROR: Al escribir el archivo. Detalles: {e}")
        return
    end_escritura = time.time()
    print(f"  Tiempo de escritura: {end_escritura - start_escritura:.4f} segundos")


    tiempo_FINAL = time.time()
    print("\n=======================================================")
    print(f" PROCESO COMPLETADO. Tiempo total: {tiempo_FINAL - tiempo_INICIAL:.4f} segundos.")
    print("=======================================================")


if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()


Iniciando Cálculo de Promedios Anuales por Ciudad

[PASO 1] Leyendo y Limpiando filas crudas en paralelo...

[PROCESAMIENTO PARALELO] Usando 8 núcleos para 11 bloques.
-------------------------------------------------------
   Tiempo de lectura y limpieza paralela: 4.6973 segundos
   Filas crudas válidas consolidadas: 992,090 filas.
-------------------------------------------------------

[PASO 2] Aplicando Fórmula de Promedio Anual...
  Tiempo de cálculo de promedios: 0.5525 segundos
  Registros de promedios anuales: 2,912 registros (Resultado de la agregación).
-------------------------------------------------------

[PASO 3] Depuración (QuickSort-style) de los promedios...
   Tiempo de depuración: 0.0016 segundos
   Promedios finales a ordenar: 2,912 registros.
-------------------------------------------------------

[PASO 4] Ordenamiento (Merge Sort) de los promedios...
  Tiempo de ordenamiento: 0.0170 segundos
-------------------------------------------------------

[PASO 5] Gene