# Invocación Paralela de Lambda para Procesamiento de Películas
 
Este notebook automatiza el procesamiento masivo de archivos JSON almacenados en S3 mediante la invocación paralela de funciones AWS Lambda. El objetivo es cargar grandes volúmenes de datos de películas en una base de datos de forma eficiente, controlada y monitorizada.

**Flujo principal:**
- Obtiene la lista de archivos JSON a procesar desde un bucket S3, excluyendo archivos irrelevantes (como `genres.json`).
- Inicializa un objeto de estadísticas thread-safe para monitorizar el progreso y los resultados.
- Define una función robusta para invocar la Lambda con reintentos automáticos y registro de métricas.
- Lanza el procesamiento paralelo de todos los archivos usando un pool de hasta 3 Lambdas concurrentes, con reintentos automáticos en caso de error.
- Muestra el progreso en tiempo real y reporta estadísticas finales de éxito, fallo y rendimiento.

**Configuración Recomendada para AWS Lambda:**
- **Timeout:** 900 segundos (15 minutos) - máximo permitido
- **Memoria:** 1024 MB (1 GB) - balance óptimo rendimiento/costo
- **Runtime:** Python 3.11 o superior
- **Arquitectura:** x86_64

**Variables necesarias:**
- Credenciales de base de datos (desde Secrets Manager)
- Configuración de región AWS y nombre del secreto en variables del entorno

El reprocesamiento de archivos fallidos se realiza en una fase posterior, reutilizando la misma lógica de procesamiento paralelo.

In [None]:
import boto3
import json
import os
from dotenv import load_dotenv
from datetime import datetime
import concurrent.futures
import time
import threading
from threading import Lock

# Cargar variables de entorno
load_dotenv()

In [None]:
# Configuración de AWS
bucket_name = os.getenv('BUCKET_NAME_E')  # Data lake original
lambda_function_name = 'volcado-completo'
region_name = os.getenv('REGION')

# Configuración de procesamiento paralelo
MAX_CONCURRENT_LAMBDAS = 3
RETRY_ATTEMPTS = 3  # Reintentos automáticos
LAMBDA_TIMEOUT = 900  # 15 minutos timeout para Lambda (máximo permitido)

# Inicializar clientes AWS
s3 = boto3.client('s3')
lambda_client = boto3.client('lambda', region_name=region_name)

In [None]:
# Clase para estadísticas thread-safe
class ProcessingStats:
    def __init__(self):
        self.lock = Lock()
        self.total_files = 0
        self.completed_files = 0
        self.failed_files = []
        self.successful_files = []
        self.total_movies_processed = 0
        self.total_movies_inserted = 0
        self.total_movies_updated = 0
        self.start_time = datetime.now()
        
    def add_success(self, filename, processed, inserted, updated):
        with self.lock:
            self.completed_files += 1
            self.successful_files.append(filename)
            self.total_movies_processed += processed
            self.total_movies_inserted += inserted
            self.total_movies_updated += updated
    
    def add_failure(self, filename, error):
        with self.lock:
            self.completed_files += 1
            self.failed_files.append({'file': filename, 'error': str(error)})
    
    def get_progress(self):
        with self.lock:
            elapsed_time = (datetime.now() - self.start_time).total_seconds()
            completion_rate = self.completed_files / self.total_files if self.total_files > 0 else 0
            eta_seconds = (elapsed_time / completion_rate * (1 - completion_rate)) if completion_rate > 0 else 0
            
            return {
                'completed': self.completed_files,
                'total': self.total_files,
                'successful': len(self.successful_files),
                'failed': len(self.failed_files),
                'movies_processed': self.total_movies_processed,
                'movies_inserted': self.total_movies_inserted,
                'movies_updated': self.total_movies_updated,
                'elapsed_minutes': elapsed_time / 60,
                'eta_minutes': eta_seconds / 60,
                'completion_percentage': completion_rate * 100
            }

# Inicializar estadísticas
stats = ProcessingStats()

In [None]:
def invoke_lambda_for_file(file_key, attempt=1, retry_attempts=3, delay_base=1, stats_obj=None, show_skipped=False):
    # Invoca Lambda para procesar un archivo específico con reintentos automáticos
    if stats_obj is None:
        stats_obj = stats
    try:
        # Preparar payload para Lambda
        payload = {
            'bucket_name': bucket_name,
            'file_key': file_key
        }
        
        print(f"[{attempt}/{retry_attempts}] Invocando Lambda para: {file_key}")
        
        # Invocar Lambda de forma síncrona
        response = lambda_client.invoke(
            FunctionName=lambda_function_name,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )
        
        # Leer respuesta
        response_payload = json.loads(response['Payload'].read().decode('utf-8'))
        
        if response['StatusCode'] == 200 and response_payload.get('statusCode') == 200:
            # Procesar respuesta exitosa
            body = json.loads(response_payload['body'])
            processed = body.get('processed', 0)
            inserted = body.get('inserted', 0)
            updated = body.get('updated', 0)
            skipped = body.get('skipped', 0) if show_skipped else 0
            
            stats_obj.add_success(file_key, processed, inserted, updated)
            print(f"ÉXITO {file_key}: {processed} películas ({inserted} nuevas, {updated} actualizadas)")
            if show_skipped and skipped > 0:
                print(f"   Saltadas {skipped} películas sin título válido")
            return True
        else:
            # Error en la respuesta
            error_msg = response_payload.get('body', 'Error desconocido')
            raise Exception(f"Lambda error: {error_msg}")
    except Exception as e:
        if attempt < retry_attempts:
            delay = delay_base * (2 ** (attempt - 1))
            print(f"Error en intento {attempt}, esperando {delay}s... {str(e)[:100]}")
            time.sleep(delay)
            return invoke_lambda_for_file(file_key, attempt + 1, retry_attempts, delay_base, stats_obj, show_skipped)
        else:
            print(f"ERROR DEFINITIVO en {file_key}: {str(e)}")
            stats_obj.add_failure(file_key, e)
            return False

def process_file_wrapper(file_key):
    # Wrapper para el procesamiento en ThreadPoolExecutor
    return invoke_lambda_for_file(file_key)

In [None]:
def run_parallel_processing(file_list, stats_obj, max_workers=MAX_CONCURRENT_LAMBDAS, retry_attempts=3, delay_base=1, show_skipped=False):
    # Procesa en paralelo una lista de archivos usando invoke_lambda_for_file y ThreadPoolExecutor.
    stop_event = threading.Event()
    monitor_thread = threading.Thread(target=monitor_progress_generic, args=(stop_event, stats_obj))
    monitor_thread.daemon = True
    monitor_thread.start()

    stats_obj.start_time = datetime.now()
    start_time = datetime.now()

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {
                executor.submit(invoke_lambda_for_file, file_key, 1, retry_attempts, delay_base, stats_obj, show_skipped): file_key
                for file_key in file_list
            }
            for future in concurrent.futures.as_completed(future_to_file):
                file_key = future_to_file[future]
                try:
                    result = future.result()
                except Exception as e:
                    print(f"Error inesperado en {file_key}: {str(e)}")
                    stats_obj.add_failure(file_key, e)

    except KeyboardInterrupt:
        print("\nProcesamiento interrumpido por el usuario")
        stop_event.set()

    except Exception as e:
        print(f"\nError crítico en el procesamiento: {str(e)}")
        stop_event.set()
        raise e

    finally:
        stop_event.set()
        if monitor_thread.is_alive():
            monitor_thread.join(timeout=2)
        end_time = datetime.now()
        total_duration = (end_time - start_time).total_seconds()
        return total_duration

def monitor_progress_generic(stop_event, stats_obj):
    # Funcion para monitoreo
    while not stop_event.is_set():
        progress = stats_obj.get_progress()
        if progress['total'] > 0:
            print(f"\nPROGRESO:")
            print(f" Archivos: {progress['completed']}/{progress['total']} ({progress['completion_percentage']:.1f}%)")
            print(f" Exitosos: {progress['successful']} | Fallidos: {progress['failed']}")
            print(f" Películas: {progress['movies_processed']} ({progress['movies_inserted']} nuevas, {progress['movies_updated']} actualizadas)")
            print(f" Tiempo: {progress['elapsed_minutes']:.1f} min | ETA: {progress['eta_minutes']:.1f} min")
            print("-" * 50)
        stop_event.wait(30)

In [None]:
# Obtener lista de archivos del bucket
print("Listando archivos del bucket S3...")

try:
    response = s3.list_objects_v2(Bucket=bucket_name)
    all_keys = [obj['Key'] for obj in response.get('Contents', [])]
    
    # Extensiones y archivos específicos a ignorar
    excluidos_ext = ['.txt']
    excluidos_files = ['genres.json']
    
    # Filtrar archivos: excluir extensiones no deseadas Y archivos específicos
    keys = [key for key in all_keys 
            if not any(key.endswith(ext) for ext in excluidos_ext)
            and not any(key.lower().endswith(filename.lower()) for filename in excluidos_files)]
    
    # Configurar estadísticas
    stats.total_files = len(keys)
    
    print(f"RESUMEN DE ARCHIVOS:")
    print(f"Total archivos en bucket: {len(all_keys)}")
    print(f"Archivos JSON a procesar: {len(keys)}")
    print(f"Archivos excluidos: {len(all_keys) - len(keys)}")
    
    # Mostrar archivos excluidos si los hay
    excluidos_encontrados = [key for key in all_keys if key not in keys]
    if excluidos_encontrados:
        print(f"Archivos excluidos: {excluidos_encontrados}")
    
    print(f"\nEstimación de películas totales: {len(keys) * 8500:,} películas")
    
except Exception as e:
    print(f"Error listando archivos: {str(e)}")
    raise e

In [None]:
# EJECUTAR PROCESAMIENTO PARALELO
print("INICIANDO PROCESAMIENTO PARALELO CON LAMBDA")

# Procesar todos los archivos en paralelo
total_duration = run_parallel_processing(keys, stats, max_workers=MAX_CONCURRENT_LAMBDAS, retry_attempts=RETRY_ATTEMPTS, delay_base=1)

print("PROCESAMIENTO PARALELO COMPLETADO")

In [None]:
# ESTADÍSTICAS FINALES
print(f"TIEMPO TOTAL: {int(total_duration // 60)}m {int(total_duration % 60)}s")
print(f"ARCHIVOS PROCESADOS: {stats.completed_files}")
print(f"ÉXITOS: {len(stats.successful_files)}")
print(f"FALLOS: {len(stats.failed_files)}")

if len(stats.failed_files) > 0:
    print(f"\nARCHIVOS CON ERRORES:")
    for failed_info in stats.failed_files:
        file_key = failed_info['file']
        error = str(failed_info['error'])
        print(f"{file_key}: {error[:100]}...")

# Calcular estadísticas de rendimiento
files_per_minute = (stats.completed_files / total_duration) * 60 if total_duration > 0 else 0
estimated_movies = len(stats.successful_files) * 8500  # Promedio estimado por archivo

print(f"\nRENDIMIENTO:")
print(f"Velocidad: {files_per_minute:.1f} archivos/minuto")
print(f"Películas procesadas: {stats.total_movies_processed:,}")
print(f"Películas insertadas: {stats.total_movies_inserted:,}")
print(f"Películas actualizadas: {stats.total_movies_updated:,}")

# Reprocesamiento de Archivos Fallidos

Procesamiento para los archivos que fallaron en la ejecución anterior.

**Archivos identificados:**
- 4 por problemas de DB (necesitan correcciones de campo)

**Antes de ejecutar:** Aplicar las correcciones de base de datos ejecutando `fix-database-fields.sql`

In [None]:
# Lista de archivos fallidos identificados del procesamiento anterior
failed_files_list = [
    'movies1130000.json',
    'movies1140000.json', 
    'movies1160000.json',
    'movies1430000.json',
]

In [None]:
# Crear nueva instancia de estadísticas para el reprocesamiento
retry_stats = ProcessingStats()
retry_stats.total_files = len(failed_files_list)

In [None]:
# EJECUTAR REPROCESAMIENTO PARALELO DE ARCHIVOS FALLIDOS
print("INICIANDO REPROCESAMIENTO PARALELO DE ARCHIVOS FALLIDOS")

# Volvemos a usar la función invoke_lambda_for_file para el reprocesamiento.
retry_total_duration = run_parallel_processing(failed_files_list, retry_stats, max_workers=MAX_CONCURRENT_LAMBDAS, retry_attempts=5, delay_base=3)

print("REPROCESAMIENTO DE FALLIDOS COMPLETADO")

In [None]:
# ESTADÍSTICAS FINALES DEL REPROCESAMIENTO
print(f"TIEMPO TOTAL REPROCESAMIENTO: {int(retry_total_duration // 60)}m {int(retry_total_duration % 60)}s")
print(f"ARCHIVOS REPROCESADOS: {retry_stats.completed_files}")
print(f"ÉXITOS: {len(retry_stats.successful_files)}")
print(f"FALLOS: {len(retry_stats.failed_files)}")

if len(retry_stats.failed_files) > 0:
    print(f"\nARCHIVOS AÚN CON ERRORES:")
    for failed_info in retry_stats.failed_files:
        file_key = failed_info['file']
        error = str(failed_info['error'])
        print(f"{file_key}: {error[:100]}...")

# Calcular estadísticas de rendimiento del reprocesamiento
retry_files_per_minute = (retry_stats.completed_files / retry_total_duration) * 60 if retry_total_duration > 0 else 0

print(f"\nRENDIMIENTO REPROCESAMIENTO:")
print(f"Velocidad: {retry_files_per_minute:.1f} archivos/minuto")
print(f"Películas procesadas: {retry_stats.total_movies_processed:,}")
print(f"Películas insertadas: {retry_stats.total_movies_inserted:,}")
print(f"Películas actualizadas: {retry_stats.total_movies_updated:,}")

retry_success_rate = (len(retry_stats.successful_files) / retry_stats.completed_files * 100) if retry_stats.completed_files > 0 else 0
print(f"Tasa de éxito: {retry_success_rate:.1f}%")
