In [1]:
# Parâmetros do pipeline
bucket_name = "experimento-lucas-barbosa"  # Nome do bucket S3
pipeline_name = "ExperimentoPipeline"  # Nome do pipeline
instance_type = "ml.m5.large"  # Tipo de instância para processamento
wait_for_completion = True  # Se deve aguardar conclusão ou executar assíncrono
# Notebooks a serem executados
config_notebook = "00_config.ipynb"
validacao_notebook = "01_validar_dados.ipynb"
metricas_notebook = "02_calcular_metricas.ipynb"
salvar_notebook = "03_salvar_resultados.ipynb"

In [2]:
import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.notebook_job_step import NotebookJobStep
from sagemaker.processing import ScriptProcessor
from datetime import datetime
import time

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Configuração inicial
sess = Session()
role = get_execution_role()
region = sess.boto_region_name
bucket = bucket_name

print(f"🔧 Configuração do ambiente:")
print(f"  Role: {role}")
print(f"  Region: {region}")
print(f"  Bucket: {bucket}")
print(f"  Pipeline: {pipeline_name}")
print(f"  Instance type: {instance_type}")

🔧 Configuração do ambiente:
  Role: arn:aws:iam::657444906686:role/service-role/AmazonSageMaker-ExecutionRole-20250708T150740
  Region: sa-east-1
  Bucket: experimento-lucas-barbosa
  Pipeline: ExperimentoPipeline
  Instance type: ml.m5.large


In [4]:
# Para sa-east-1 - SageMaker Distribution
image_uri = "567556641782.dkr.ecr.sa-east-1.amazonaws.com/sagemaker-distribution-prod:1-cpu"

In [5]:
# Etapa 1: Configuração
# Gera o arquivo config.json com as configurações do experimento
step_config = NotebookJobStep(
    name="ConfigStep",
    notebook_job_name=f"config-job-{int(time.time())}",
    input_notebook=config_notebook,
    image_uri=image_uri,
    role=role,
    instance_type=instance_type,
    kernel_name="python3",
    parameters={
        "BUCKET_NAME": bucket_name,
        "PASTA_RAW": "raw/",
        "PASTA_PROCESSED": "processed/",
        "PASTA_ARCHIVE": "archive/"
    }
)

print("✓ Etapa 1 (Config) definida")
print(f"  Notebook: {config_notebook}")
print(f"  Parâmetros: bucket={bucket_name}")

✓ Etapa 1 (Config) definida
  Notebook: 00_config.ipynb
  Parâmetros: bucket=experimento-lucas-barbosa


In [6]:
# Etapa 2: Validação de dados
# Valida arquivos CSV na pasta raw/ e prepara para processamento
step_validacao = NotebookJobStep(
    name="ValidacaoStep",
    notebook_job_name=f"validacao-job-{int(time.time())}",
    input_notebook=validacao_notebook,
    image_uri=image_uri,
    role=role,
    instance_type=instance_type,
    kernel_name="python3",
    parameters={
        "input_config_path": "config.json",
        "bucket_name": bucket_name,
        "pasta_raw": "raw/",
        "output_validation_file": "validacao_resultado.json"
    }
)

print("✓ Etapa 2 (Validação) definida")
print(f"  Notebook: {validacao_notebook}")
print(f"  Depende de: ConfigStep")

✓ Etapa 2 (Validação) definida
  Notebook: 01_validar_dados.ipynb
  Depende de: ConfigStep


In [7]:
# Etapa 3: Cálculo de métricas
# Calcula métricas básicas e lift para cada grupo do experimento
step_metricas = NotebookJobStep(
    name="MetricasStep",
    notebook_job_name=f"metricas-job-{int(time.time())}",
    input_notebook=metricas_notebook,
    image_uri=image_uri,
    role=role,
    instance_type=instance_type,
    kernel_name="python3",
    parameters={
        "input_config_path": "config.json",
        "input_validacao_path": "validacao_resultado.json",
        "output_metricas_path": "metricas_completas.csv"
    }
)

print("✓ Etapa 3 (Métricas) definida")
print(f"  Notebook: {metricas_notebook}")
print(f"  Depende de: ValidacaoStep")

✓ Etapa 3 (Métricas) definida
  Notebook: 02_calcular_metricas.ipynb
  Depende de: ValidacaoStep


In [8]:
# Etapa 4: Salvar resultados
# Salva métricas finais no S3 e gera logs de execução
step_salvar = NotebookJobStep(
    name="SalvarStep",
    notebook_job_name=f"salvar-job-{int(time.time())}",
    input_notebook=salvar_notebook,
    image_uri=image_uri,
    role=role,
    instance_type=instance_type,
    kernel_name="python3",
    parameters={
        "input_config_path": "config.json",
        "input_metricas_path": "metricas_completas.csv",
        "bucket_name": bucket_name,
        "pasta_processed": "processed/",
        "pasta_logs": "logs/"
    }
)

print("✓ Etapa 4 (Salvar) definida")
print(f"  Notebook: {salvar_notebook}")
print(f"  Depende de: MetricasStep")

✓ Etapa 4 (Salvar) definida
  Notebook: 03_salvar_resultados.ipynb
  Depende de: MetricasStep


In [9]:
# Definir dependências entre etapas
step_validacao.add_depends_on([step_config])
step_metricas.add_depends_on([step_validacao])
step_salvar.add_depends_on([step_metricas])

print("✓ Dependências configuradas:")
print("  ConfigStep → ValidacaoStep → MetricasStep → SalvarStep")

✓ Dependências configuradas:
  ConfigStep → ValidacaoStep → MetricasStep → SalvarStep


In [10]:
# Criar o pipeline
pipeline = Pipeline(
    name=pipeline_name,
    steps=[
        step_config,
        step_validacao,
        step_metricas,
        step_salvar
    ],
    sagemaker_session=sess
)

print("Pipeline criado com sucesso")
print(f"  Nome: {pipeline.name}")
print(f"  Etapas: {len(pipeline.steps)}")
print(f"  Sessão: {sess.boto_region_name}")

# Mostrar resumo das etapas
print("\n Resumo das etapas:")
for i, step in enumerate(pipeline.steps, 1):
    print(f"  {i}. {step.name}")

Pipeline criado com sucesso
  Nome: ExperimentoPipeline
  Etapas: 4
  Sessão: sa-east-1

 Resumo das etapas:
  1. ConfigStep
  2. ValidacaoStep
  3. MetricasStep
  4. SalvarStep


In [11]:
# Registrar o pipeline no SageMaker
print("🚀 Registrando pipeline no SageMaker...")
pipeline.upsert(role_arn=role)
print("✓ Pipeline registrado no SageMaker")

# Iniciar execução manual
print("🚀 Iniciando execução manual do pipeline...")
execution = pipeline.start()
print(f"✓ Execução iniciada: {execution.arn}")

# Aguardar conclusão se solicitado
if wait_for_completion:
    print("⏳ Aguardando conclusão do pipeline...")
    print("   (Isso pode levar alguns minutos)")
    
    # Monitorar progresso
    start_time = time.time()
    while True:
        status = execution.describe()['PipelineExecutionStatus']
        elapsed = int(time.time() - start_time)
        print(f"   Status: {status} (tempo: {elapsed}s)")
        
        if status in ['Succeeded', 'Failed', 'Stopped']:
            break
            
        time.sleep(30)  # Aguardar 30 segundos antes de verificar novamente
    
    final_status = execution.describe()['PipelineExecutionStatus']
    if final_status == 'Succeeded':
        print(f"✅ Pipeline concluído com sucesso!")
    else:
        print(f"❌ Pipeline falhou com status: {final_status}")
        
else:
    print("🔄 Pipeline executando em modo assíncrono")
    print(f"   Acompanhe o progresso no SageMaker Studio")
    print(f"   ARN: {execution.arn}")

🚀 Registrando pipeline no SageMaker...
✓ Pipeline registrado no SageMaker
🚀 Iniciando execução manual do pipeline...
✓ Execução iniciada: arn:aws:sagemaker:sa-east-1:657444906686:pipeline/ExperimentoPipeline/execution/sgrjvz6w2dqi
⏳ Aguardando conclusão do pipeline...
   (Isso pode levar alguns minutos)
   Status: Executing (tempo: 0s)
   Status: Executing (tempo: 30s)
   Status: Executing (tempo: 60s)
   Status: Executing (tempo: 90s)
   Status: Executing (tempo: 120s)
   Status: Executing (tempo: 150s)
   Status: Executing (tempo: 180s)
   Status: Executing (tempo: 210s)
   Status: Executing (tempo: 240s)
   Status: Executing (tempo: 270s)
   Status: Executing (tempo: 301s)
   Status: Executing (tempo: 331s)
   Status: Executing (tempo: 361s)
   Status: Executing (tempo: 391s)
   Status: Executing (tempo: 421s)
   Status: Executing (tempo: 451s)
   Status: Executing (tempo: 481s)
   Status: Executing (tempo: 511s)
   Status: Executing (tempo: 541s)
   Status: Executing (tempo: 571s)


In [15]:
# Configurar agendamento automático a cada 12 horas - VERSÃO SIMPLES
print("Configurando agendamento automático...")

# Usar EventBridge Rules (mais simples que Scheduler)
events_client = boto3.client('events', region_name=region)

# Nome da regra
rule_name = f"{pipeline_name}-schedule-12h"

# Criar regra com cron
events_client.put_rule(
    Name=rule_name,
    ScheduleExpression="cron(0 0,12 * * ? *)",  # A cada 12 horas
    Description="Executa pipeline ExperimentoPipeline a cada 12 horas",
    State='ENABLED'
)

# Adicionar target (SageMaker Pipeline)
events_client.put_targets(
    Rule=rule_name,
    Targets=[
        {
            'Id': '1',
            'Arn': f'arn:aws:sagemaker:{region}:{account_id}:pipeline/{pipeline_name}',
            'RoleArn': role,
            'SageMakerPipelineParameters': {
                'PipelineParameterList': []
            }
        }
    ]
)

Configurando agendamento automático...


{'FailedEntryCount': 0,
 'FailedEntries': [],
 'ResponseMetadata': {'RequestId': '8884fa74-6174-45c1-8535-9396b9a70dfe',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '8884fa74-6174-45c1-8535-9396b9a70dfe',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '41',
   'date': 'Mon, 21 Jul 2025 19:31:36 GMT'},
  'RetryAttempts': 0}}