# Imports

In [51]:
import os, json, zipfile, time, boto3, textwrap
import sagemaker
import boto3
from sagemaker.session import Session
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import ModelMetrics  # opcional aqui
from sagemaker import image_uris

from sagemaker.workflow.lambda_step import (
    Lambda, LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)


# Initialize sessions, execution role, and S3 location

In [52]:

# Sessions: one boto3 session, pinned to an explicit region, shared by the plain
# SageMaker session (direct API calls, default bucket) and the PipelineSession
# (which the pipeline steps below are built against).
# The region is resolved first so that `region` always matches the region the
# sessions actually use; a bare boto3.Session() may have no region configured,
# in which case the fallback would otherwise only be printed/used by later cells
# while the sessions themselves had none.
region = boto3.Session().region_name or "us-east-2"
boto_sess = boto3.Session(region_name=region)
sm_sess = Session(boto_session=boto_sess)
pipe_sess = PipelineSession(boto_session=boto_sess)

# Execution role: try the SDK's role lookup (using the same session); if that
# fails (e.g. when not running inside SageMaker), fall back to the
# SAGEMAKER_EXECUTION_ROLE environment variable.
try:
    role = sagemaker.get_execution_role(sagemaker_session=sm_sess)
except Exception:
    role = os.environ.get("SAGEMAKER_EXECUTION_ROLE")
    assert role, "Defina SAGEMAKER_EXECUTION_ROLE com a role ARN do SageMaker."

# S3 bucket and key prefix under which all pipeline data/artifacts live.
bucket = sm_sess.default_bucket()
prefix = "xgb-pipeline-do-mark"

print("region:", region)
print("bucket:", bucket)
print("role:", role)

region: us-east-2
bucket: sagemaker-us-east-2-650961196544
role: arn:aws:iam::650961196544:role/service-role/AmazonSageMaker-ExecutionRole-20250818T220926


# Pipeline Params

In [53]:
# Pipeline parameters. Each can be overridden per execution through
# pipeline.start(parameters={...}); the defaults below apply otherwise.

# S3 locations of the CSV training / validation data (adjust to your data).
_data_root = f"s3://{bucket}/{prefix}"
param_train_s3 = ParameterString(name="TrainS3Uri", default_value=f"{_data_root}/train/")
param_val_s3 = ParameterString(name="ValidationS3Uri", default_value=f"{_data_root}/validation/")

# Model registry group and deployment target.
param_model_pkg_group = ParameterString(name="ModelPackageGroupName", default_value="XGBDemoPkgGroup")
param_endpoint_name = ParameterString(name="EndpointName", default_value="xgb-pipeline-endpoint")
param_instance_type = ParameterString(name="InstanceType", default_value="ml.m5.large")
param_initial_count = ParameterInteger(name="InitialInstanceCount", default_value=1)

# Training

In [54]:
# Built-in SageMaker XGBoost container, version 1.5-1, for this region.
xgb_image = image_uris.retrieve(framework="xgboost", region=region, version="1.5-1")

# The estimator is only a step definition here (no .fit call in this cell);
# it is bound to the PipelineSession used to build the pipeline.
xgb = Estimator(
    image_uri=xgb_image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"s3://{bucket}/{prefix}/training-output",
    sagemaker_session=pipe_sess,
)

# Binary classifier evaluated by AUC on the "validation" channel.
xgb_hyperparameters = {
    "objective": "binary:logistic",
    "eval_metric": "auc",
    "num_round": 100,
    "max_depth": 5,
    "eta": 0.2,
    "subsample": 0.8,
}
xgb.set_hyperparameters(**xgb_hyperparameters)

# Both channels are CSV; their S3 URIs come from the pipeline parameters.
_csv_channel_uris = {"train": param_train_s3, "validation": param_val_s3}
train_step = TrainingStep(
    name="XGBoostTrain",
    estimator=xgb,
    inputs={
        channel: TrainingInput(s3_data=uri, content_type="text/csv")
        for channel, uri in _csv_channel_uris.items()
    },
)


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


# Register Model

In [55]:
# Registry approval status for the new model package version. It is set to
# "Approved" explicitly (NOT the registry default "PendingManualApproval")
# because the following LambdaStep deploys the package straight to an endpoint
# without any manual review gate. Set to "PendingManualApproval" to require review.
MODEL_APPROVAL_STATUS = "Approved"

# Registers the trained model artifact as a new version in the model package
# group. No image_uri is given, so the inference image is presumably taken from
# `estimator=xgb` (the XGBoost training image) — confirm against SDK docs.
register_step = RegisterModel(
    name="MarksRegisterModel",
    estimator=xgb,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=param_model_pkg_group,
    approval_status=MODEL_APPROVAL_STATUS,
)

# Create or update the deploy Lambda function (used by the LambdaStep)

In [56]:
# Deploy function invoked by the pipeline's LambdaStep: it creates a SageMaker
# model + endpoint config from the registered model package and then creates or
# updates the endpoint. This cell writes the handler source, zips it, and
# creates (or updates) the Lambda function, leaving its ARN in `fn_arn`.
lambda_client = boto3.client("lambda", region_name=region)

# Role assumed by the Lambda function itself (needs SageMaker permissions; it
# presumably also needs iam:PassRole on `role`, which the handler passes to
# CreateModel — confirm). Overridable through the LAMBDA_ROLE_ARN environment
# variable so the account-specific ARN does not have to be edited here.
lambda_role_arn = os.environ.get(
    "LAMBDA_ROLE_ARN",
    "arn:aws:iam::650961196544:role/role-do-mark-lambdastep-sagemaker",
)
assert lambda_role_arn, "Defina LAMBDA_ROLE_ARN com a role ARN do Lambda (com permissões de SageMaker)."

pipeline_name = "mark-xgboost-pipeline-demo"
lambda_fn_name = f"{pipeline_name}-deploy-fn"

# Handler source. A plain (non-f) string: nothing is interpolated, so braces are
# written normally instead of being doubled.
lambda_code = textwrap.dedent("""
import os
import time

import boto3
from botocore.exceptions import ClientError

sm = boto3.client("sagemaker")


def _endpoint_exists(endpoint_name):
    # A missing endpoint is reported by DescribeEndpoint as a ClientError;
    # any other error code is re-raised.
    try:
        sm.describe_endpoint(EndpointName=endpoint_name)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") in ["ValidationException", "ResourceNotFound"]:
            return False
        raise
    return True


def handler(event, context):
    endpoint_name = event["EndpointName"]
    model_package_arn = event["ModelPackageArn"]
    instance_type = event.get("InstanceType", "ml.m5.large")
    initial_instance_count = int(event.get("InitialInstanceCount", 1))
    role_arn = os.environ["SAGEMAKER_ROLE"]

    # One timestamp so the model and endpoint config of a deployment share a suffix.
    suffix = int(time.time())

    model_name = f"{endpoint_name}-model-{suffix}"
    sm.create_model(
        ModelName=model_name,
        PrimaryContainer={"ModelPackageName": model_package_arn},
        ExecutionRoleArn=role_arn,
    )

    endpoint_config_name = f"{endpoint_name}-cfg-{suffix}"
    sm.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[{
            "ModelName": model_name,
            "InstanceType": instance_type,
            "InitialInstanceCount": initial_instance_count,
            "VariantName": "AllTraffic",
        }],
    )

    # Only the existence check is guarded: an error from update_endpoint (e.g.
    # the endpoint is still being created/updated) must surface instead of
    # falling through to create_endpoint.
    if _endpoint_exists(endpoint_name):
        sm.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
        action = "UPDATED"
    else:
        sm.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
        action = "CREATED"

    return {"EndpointName": endpoint_name, "Action": action, "EndpointConfigName": endpoint_config_name}
""")

# Package the handler (lambda_function.py is also left on disk for inspection).
zip_path = "lambda_deploy.zip"
with open("lambda_function.py", "w") as f:
    f.write(lambda_code)
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.write("lambda_function.py", arcname="lambda_function.py")
with open(zip_path, "rb") as f:  # read once, with the handle closed afterwards
    zip_bytes = f.read()

# Function settings shared by the create and the update path.
lambda_settings = {
    "Role": lambda_role_arn,
    "Runtime": "python3.11",
    "Handler": "lambda_function.handler",
    "Timeout": 900,
    "MemorySize": 512,
    "Environment": {"Variables": {"SAGEMAKER_ROLE": role}},
}

try:
    lambda_client.create_function(
        FunctionName=lambda_fn_name,
        Code={"ZipFile": zip_bytes},
        Publish=True,
        **lambda_settings,
    )
except lambda_client.exceptions.ResourceConflictException:
    # The function already exists: update it in place. Lambda rejects an update
    # while a previous one is still in progress, so wait for each update to
    # finish before issuing the next. Configuration goes first so the version
    # published by the code update includes it.
    updated_waiter = lambda_client.get_waiter("function_updated")
    updated_waiter.wait(FunctionName=lambda_fn_name)
    lambda_client.update_function_configuration(FunctionName=lambda_fn_name, **lambda_settings)
    updated_waiter.wait(FunctionName=lambda_fn_name)
    lambda_client.update_function_code(
        FunctionName=lambda_fn_name,
        ZipFile=zip_bytes,
        Publish=True,
    )
    updated_waiter.wait(FunctionName=lambda_fn_name)
else:
    # A freshly created function is not invocable until it leaves the Pending state.
    lambda_client.get_waiter("function_active").wait(FunctionName=lambda_fn_name)

# Resolve the ARN the same way on both paths (unqualified function ARN).
fn_arn = lambda_client.get_function(FunctionName=lambda_fn_name)["Configuration"]["FunctionArn"]

print("Lambda ARN:", fn_arn)


Lambda ARN: arn:aws:lambda:us-east-2:650961196544:function:mark-xgboost-pipeline-demo-deploy-fn


# Connect Lambda function to LambdaStep

In [57]:
# %%
# Reference the already-deployed Lambda function by ARN.
deploy_lambda = Lambda(function_arn=fn_arn, session=pipe_sess)

# Event payload handed to the handler. Referencing register_step's
# ModelPackageArn also orders this step after registration (SageMaker Pipelines
# infers the dependency from the property reference).
deploy_inputs = {
    "ModelPackageArn": register_step.properties.ModelPackageArn,
    "EndpointName": param_endpoint_name,
    "InstanceType": param_instance_type,
    "InitialInstanceCount": param_initial_count,
}

# Keys of the handler's return dict exposed as (string) step outputs.
deploy_outputs = [
    LambdaOutput(output_name=key, output_type=LambdaOutputTypeEnum.String)
    for key in ("EndpointName", "Action")
]

deploy_step = LambdaStep(
    name="DeployToEndpoint",
    lambda_func=deploy_lambda,
    inputs=deploy_inputs,
    outputs=deploy_outputs,
)


# Upsert Pipeline

In [58]:
# %%
# Every Parameter referenced by the steps, in declaration order.
pipeline_parameters = [
    param_train_s3,
    param_val_s3,
    param_model_pkg_group,
    param_endpoint_name,
    param_instance_type,
    param_initial_count,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[train_step, register_step, deploy_step],
    sagemaker_session=pipe_sess,
)

# Create the pipeline definition in SageMaker, or update it if it already exists.
pipeline.upsert(role_arn=role)




{'PipelineArn': 'arn:aws:sagemaker:us-east-2:650961196544:pipeline/mark-xgboost-pipeline-demo',
 'ResponseMetadata': {'RequestId': '3e44a7e5-03c8-441d-9dab-1a53563b9c5b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3e44a7e5-03c8-441d-9dab-1a53563b9c5b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Sun, 07 Sep 2025 18:11:25 GMT'},
  'RetryAttempts': 0}}

# Start Pipeline

In [59]:
# %%
# Per-run parameter values, taken from the parameter objects declared above so
# the defaults live in exactly one place (previously several were duplicated as
# literals here). To override a value for this run only, edit the dict before
# starting, e.g. run_parameters["EndpointName"] = "my-endpoint".
run_parameters = {
    param.name: param.default_value
    for param in (
        param_train_s3,
        param_val_s3,
        param_model_pkg_group,
        param_endpoint_name,
        param_instance_type,
        param_initial_count,
    )
}

execution = pipeline.start(parameters=run_parameters)

print("Execution ARN:", execution.arn)
# Optional: block the notebook until the execution finishes.
# execution.wait()


Execution ARN: arn:aws:sagemaker:us-east-2:650961196544:pipeline/mark-xgboost-pipeline-demo/execution/9mkozc4zv1b1


# Check Endpoint Status

In [None]:
# %%
# Query the deployed endpoint's status. The name comes from the pipeline
# parameter default rather than a duplicated literal. If the pipeline has not
# created the endpoint yet, DescribeEndpoint fails with a ValidationException;
# report that instead of raising.
sm = boto3.client("sagemaker", region_name=region)
endpoint_name = param_endpoint_name.default_value
try:
    desc = sm.describe_endpoint(EndpointName=endpoint_name)
    print(desc["EndpointStatus"])
except sm.exceptions.ClientError as e:
    if e.response.get("Error", {}).get("Code") != "ValidationException":
        raise
    print(f"Endpoint {endpoint_name!r} not found yet; the pipeline may still be running.")
