In [3]:
#Nombre: ETLTextos
#Autor: PhD (C) Manuel Alejandro Pastrana Pardo
#Fecha de Creación: 2025-02-13
#Descripción: Ejemplo descomposición por tareas (funcional).
#Curso: Infraestructuras Paralelas y Distribuidas
#Código: 750023C

import multiprocessing
import time

def leer_datos(archivo_entrada, cola):
    """Lee datos del archivo y los envía a la cola."""
    with open(archivo_entrada, 'r') as f:
        for linea in f:
            cola.put(linea.strip())
    cola.put(None)  # Señal de fin

def limpiar_datos(cola_entrada, cola_salida):
    """Limpia los datos y los envía a la siguiente cola."""
    while True:
        dato = cola_entrada.get()
        if dato is None:
            cola_salida.put(None)
            break
        dato_limpio = dato.replace(',', '.')  # Ejemplo de limpieza
        cola_salida.put(dato_limpio)

def transformar_datos(cola_entrada, cola_salida):
    """Transforma los datos y los envía a la siguiente cola."""
    while True:
        dato = cola_entrada.get()
        if dato is None:
            cola_salida.put(None)
            break
        dato_transformado = float(dato) * 2  # Ejemplo de transformación
        cola_salida.put(dato_transformado)

def escribir_datos(cola_entrada, archivo_salida):
    """Escribe los datos transformados al archivo de salida."""
    with open(archivo_salida, 'w') as f:
        while True:
            dato = cola_entrada.get()
            if dato is None:
                break
            f.write(str(dato) + '\n')

if __name__ == '__main__':
    # Crear colas
    cola_lectura = multiprocessing.Queue()
    cola_limpieza = multiprocessing.Queue()
    cola_transformacion = multiprocessing.Queue()

    # Crear procesos
    proceso_lectura = multiprocessing.Process(target=leer_datos, args=('02_SesiónIV/entrada.txt', cola_lectura))
    proceso_limpieza = multiprocessing.Process(target=limpiar_datos, args=(cola_lectura, cola_limpieza))
    proceso_transformacion = multiprocessing.Process(target=transformar_datos, args=(cola_limpieza, cola_transformacion))
    proceso_escritura = multiprocessing.Process(target=escribir_datos, args=(cola_transformacion, '02_SesiónIV/salida.txt'))

    # Iniciar procesos
    proceso_lectura.start()
    proceso_limpieza.start()
    proceso_transformacion.start()
    proceso_escritura.start()

    # Esperar a que terminen
    proceso_lectura.join()
    proceso_limpieza.join()
    proceso_transformacion.join()
    proceso_escritura.join()

    print("Proceso completado.")


Proceso completado.
