# In [4]:
# --------------------------------------------
# 0. Importar librerías necesarias
# --------------------------------------------
from sagemaker.workflow.parameters import ParameterString
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker import get_execution_role
# --------------------------------------------
# 1. SageMaker environment configuration
# --------------------------------------------
# IAM role used by the pipeline, and the PipelineSession shared by the
# processor below and by the Pipeline object in section 5.
role = get_execution_role()
pipeline_session = PipelineSession()

# A single scikit-learn processor reused by every ProcessingStep in this cell.
_sklearn_processor_settings = dict(
    framework_version="1.0-1",
    instance_type="ml.t3.medium",  # change this if a different instance type is available
    instance_count=1,
    base_job_name="verificacion-insumos",
    role=role,
    sagemaker_session=pipeline_session,
)
sklearn_processor = SKLearnProcessor(**_sklearn_processor_settings)

# --------------------------------------------
# 2. Pipeline input parameters pointing at S3
# --------------------------------------------
# Every default URI below lives under this raw-data prefix. The parameters are
# registered on the Pipeline in section 5, which is what allows them to be
# supplied per execution.
base_data_path = "s3://itam-analytics-danielmichell/coco/raw"

parametros_s3_uri = ParameterString(
    name="ParametrosInput",
    default_value=base_data_path + "/solicitudes/Parametros/parametros.xlsx",
)
experiencia_global_s3_uri = ParameterString(
    name="ExperienciaGlobalInput",
    default_value=base_data_path + "/experiencia_global.xlsx",
)
emisiones_s3_uri = ParameterString(
    name="EmisionesInput",
    default_value=base_data_path + "/emisiones.xlsx",
)
cotizaciones_s3_uri = ParameterString(
    name="CotizacionesInput",
    default_value=base_data_path + "/cotizaciones.xlsx",
)
# Unlike the others, this default is a prefix (trailing slash), not one file.
solicitudes_s3_uri = ParameterString(
    name="SolicitudesInput",
    default_value=base_data_path + "/solicitudes/Base_Datos/",
)

# Processing date; passed to calculo_primas.py as `--fecha_proceso`.
fecha_proceso = ParameterString(name="FechaProceso", default_value="2025-05-27")
# --------------------------------------------
# 3. Check that the input objects exist in S3 (boto3)
# --------------------------------------------
# Nothing is mounted into this job's container; per the section title the
# script presumably queries S3 directly with boto3.
# NOTE(review): no job_arguments are passed, so the script cannot see values
# overridden for the *Input parameters at execution time — it presumably checks
# hard-coded paths; confirm against code/verificar_paths_s3.py.
verificar_paths_step = ProcessingStep(
    name="VerificarPathsStep",
    code="code/verificar_paths_s3.py",
    processor=sklearn_processor,
    inputs=[],  # intentionally empty: no S3 inputs are mounted
)

# --------------------------------------------
# 4. Read and validate the mounted files, clean them, compute premiums
# --------------------------------------------
# SageMaker Pipelines does NOT run steps in the order they appear in
# `Pipeline(steps=[...])`. Execution order comes only from `depends_on` and
# from data dependencies (one step consuming another step's `properties`).
# Without them every step would start at the same time: validation could
# finish after cleaning, and the premium step could read the
# `coco/processing/` prefix before cleaning had written it. The chain is:
#   VerificarPathsStep -> LeerArchivosStep -> CleaningDataStep -> CalculoPrimasStep

# The five S3 inputs, each placed in its own directory under
# /opt/ml/processing/. Shared by the read/validate and cleaning steps.
inputs = [
    ProcessingInput(source=parametros_s3_uri, destination="/opt/ml/processing/parametros"),
    ProcessingInput(source=experiencia_global_s3_uri, destination="/opt/ml/processing/experiencia"),
    ProcessingInput(source=emisiones_s3_uri, destination="/opt/ml/processing/emisiones"),
    ProcessingInput(source=cotizaciones_s3_uri, destination="/opt/ml/processing/cotizaciones"),
    ProcessingInput(source=solicitudes_s3_uri, destination="/opt/ml/processing/solicitudes"),
]

leer_archivos_step = ProcessingStep(
    name="LeerArchivosStep",
    processor=sklearn_processor,
    inputs=inputs,
    code="code/read_and_clean_data.py",
    # Read the inputs only after their existence in S3 has been checked.
    depends_on=[verificar_paths_step.name],
)


cleaning_step = ProcessingStep(
    name="CleaningDataStep",
    processor=sklearn_processor,
    inputs=inputs,
    outputs=[
        ProcessingOutput(
            # Named so the premium step can reference this output by name.
            output_name="cleaned_data",
            source="/opt/ml/processing/output",
            destination="s3://itam-analytics-danielmichell/coco/processing/"
        )
    ],
    code="code/clean_and_save.py",
    # Clean only once the read/validation step has succeeded.
    depends_on=[leer_archivos_step.name],
)


# Consume the cleaning output through the step's properties instead of
# repeating its S3 URI. At run time this resolves to the same
# s3://itam-analytics-danielmichell/coco/processing/ destination, and it also
# creates the data dependency that makes CalculoPrimasStep wait for
# CleaningDataStep.
primas_inputs = [
    ProcessingInput(
        source=cleaning_step.properties.ProcessingOutputConfig.Outputs[
            "cleaned_data"
        ].S3Output.S3Uri,
        destination="/opt/ml/processing/input"
    )
]


# NOTE(review): the first output's local path (/opt/ml/processing/output) is
# the parent of the json/, memory/ and master/ outputs. Files in those
# sub-folders therefore presumably also get uploaded under coco/results/.
# Confirm that the duplication is intended and that the SDK/API accepts
# nested output paths.
calculo_primas_step = ProcessingStep(
    name="CalculoPrimasStep",
    processor=sklearn_processor,
    inputs=primas_inputs,
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination="s3://itam-analytics-danielmichell/coco/results/"
        ),
        ProcessingOutput(
            source="/opt/ml/processing/output/json",
            destination="s3://itam-analytics-danielmichell/coco/results/json/"
        ),
        ProcessingOutput(
            source="/opt/ml/processing/output/memory",
            destination="s3://itam-analytics-danielmichell/coco/results/memory/"
        ),
        ProcessingOutput(
            source="/opt/ml/processing/output/master",
            destination="s3://itam-analytics-danielmichell/coco/master/"
        )
    ],
    code="code/calculo_primas.py",
    # The processing date reaches the script as a CLI argument.
    job_arguments=["--fecha_proceso", fecha_proceso]
)

# --------------------------------------------
# 5. Build, register and run the pipeline
# --------------------------------------------
# Pipeline parameters referenced by the steps above (see section 2).
pipeline_parameters = [
    parametros_s3_uri,
    experiencia_global_s3_uri,
    emisiones_s3_uri,
    cotizaciones_s3_uri,
    solicitudes_s3_uri,
    fecha_proceso,
]

# NOTE: this list's order is not the execution order. SageMaker Pipelines
# derives ordering from depends_on / data dependencies between the steps.
pipeline_steps = [
    verificar_paths_step,
    leer_archivos_step,
    cleaning_step,
    calculo_primas_step,
]

pipeline = Pipeline(
    name="PipelineSeguros",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=pipeline_session,
)

# Create or update the pipeline definition, start one execution with the
# default parameter values, then wait for it.
# NOTE(review): wait() uses the SDK's default polling limits (delay=30 s,
# max_attempts=60, i.e. roughly 30 min, in recent SDK versions). It raises if
# they are exceeded even though the execution keeps running — confirm against
# the installed SDK.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()
