In [2]:
!pip install --upgrade "sagemaker>=2"

[0m

In [3]:
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
import boto3
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.amazon.amazon_estimator import get_image_uri

# Initialize SageMaker session and role
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
bucket = sagemaker_session.default_bucket()
# Initialize SageMaker client
sagemaker_client = boto3.client('sagemaker')
s3 = boto3.resource("s3")
pipeline_session = PipelineSession()
prefix = 'olist_review_score'
cicd = 'CICD'

# Define pipeline parameters
train_data_param = ParameterString(name='TrainData', default_value=f's3://{bucket}/{prefix}/data/train/train_data.csv')
validation_data_param = ParameterString(name='ValidationData', default_value=f's3://{bucket}/{prefix}/data/validation/validation_data.csv')
test_data_param = ParameterString(name='TestData', default_value=f's3://{bucket}/{prefix}/data/batch/batch_data.csv')
model_output_param = ParameterString(name='ModelOutput', default_value=f's3://{bucket}/{prefix}/models/')

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


## Model Training Step ##

In [4]:
xgb_image = sagemaker.image_uris.retrieve(
    framework="xgboost", region=boto3.Session().region_name, version="1.7-1", py_version="py3",
)


model_path = f's3://{bucket}/{prefix}/models/'

eval_metrics = 'auc,aucpr,f1,precision,recall'

xgb_estimator = sagemaker.estimator.Estimator(
    xgb_image,
    role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=50,
    output_path=model_path,
    sagemaker_session=pipeline_session,
)

xgb_estimator.set_hyperparameters(
    objective= 'binary:logistic',  # Binary classification
    max_depth= 5,  # Maximum depth of each tree
    eta= 0.2,  # Learning rate
    gamma= 4,  # Minimum loss reduction required to make a further partition on a leaf node
    subsample= 0.8,  # Subsample ratio of the training instances
    colsample_bytree= 0.8,  # Subsample ratio of columns when constructing each tree
    eval_metric= eval_metrics,  # Evaluation metric
    alpha= 0.1,  # L1 regularization term
    num_round=100
)

train_args = xgb_estimator.fit(
    inputs= {
        'train': TrainingInput(train_data_param, content_type='text/csv'),
        'validation': TrainingInput(validation_data_param, content_type='text/csv')
    }
)

# Define the training step
train_step = TrainingStep(
    name='TrainXGBModel',
    step_args=train_args,
    
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


## Model Evaluation Step ##

In [5]:
!mkdir -p code

In [6]:
%%writefile code/evaluation.py

import argparse
import os
import json
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve, precision_recall_curve, auc
import joblib
import pickle
import tarfile
import logging

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-dir', type=str, default='/opt/ml/processing/model')
    parser.add_argument('--test-data', type=str, default='/opt/ml/processing/input/validation/validation_data.csv')
    parser.add_argument('--output-dir', type=str, default='/opt/ml/processing/output')
    args = parser.parse_args()
    return args

def main():
    args = parse_args()
    
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()
    
    logger.info('Loading test data from: %s', args.test_data)
    
    # Load test data with no headers
    df = pd.read_csv(args.test_data, header=None)
    
    # Separate features and labels
    y_test = df.iloc[:, 0]  # First column is the label
    X_test = df.iloc[:, 1:]  # Rest of the columns are features
    
    logger.info('Test data loaded. Number of samples: %d', len(df))
    
    # Load the model
    model_tar_path = os.path.join(args.model_dir, 'model.tar.gz')
    
    logger.info('Extracting model from: %s', model_tar_path)
    
    # Extract the model tar file
    with tarfile.open(model_tar_path) as tar:
        tar.extractall('.')
    
    model = pickle.load(open("xgboost-model", "rb"))
   #  model = joblib.load("model.pkl")
    
    # Make predictions
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    
    logger.info(f'Predictions: {y_pred}') 
    
    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    roc_auc = roc_auc_score(y_test, y_prob)
    
    # Calculate ROC and Precision-Recall curves
    fpr, tpr, _ = roc_curve(y_test, y_prob)
    precision_curve, recall_curve, _ = precision_recall_curve(y_test, y_prob)
    pr_auc = auc(recall_curve, precision_curve)
    
    # Save the evaluation result
    evaluation_result = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'roc_auc': roc_auc,
        'pr_auc': pr_auc,
        'roc_curve': {'fpr': fpr.tolist(), 'tpr': tpr.tolist()},
        'precision_recall_curve': {'precision': precision_curve.tolist(), 'recall': recall_curve.tolist()}
    }
    
    output_path = os.path.join(args.output_dir, 'evaluation.json')
    with open(output_path, 'w') as f:
        json.dump(evaluation_result, f)

if __name__ == '__main__':
    main()



Overwriting code/evaluation.py


In [7]:
# model_tar_param = f'{model_output_param}/model.tar.gz'

# Define the ScriptProcessor for evaluation
eval_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve('sklearn', region, version='0.23-1'),
    command=['python3'],
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name='xgb-val',
    role=role,
    sagemaker_session=pipeline_session
)

# PropertyFile to retrieve the evaluation result
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json'
)

# Define the evaluation step
eval_step = ProcessingStep(
    name='EvaluateXGBModel',
    processor=eval_processor,
    inputs=[
        ProcessingInput(source=train_step.properties.ModelArtifacts.S3ModelArtifacts, destination='/opt/ml/processing/model/'),
        ProcessingInput(source=validation_data_param, destination='/opt/ml/processing/input/validation/')
    ],
    outputs=[
        ProcessingOutput(output_name='evaluation', source='/opt/ml/processing/output'),
    ],
    property_files=[evaluation_report],
    code='code/evaluation.py',
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


## Define a Create Model Step ##

In [8]:
from sagemaker.model import Model

model = Model(
    image_uri=xgb_image,
    model_data=model_output_param,
    sagemaker_session=pipeline_session,
    role=role,
)

In [9]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="ReviewScoreCreateModel",
    step_args=model.create(instance_type="ml.m5.xlarge", accelerator_type="ml.eia1.medium"),
)

## Define a Create Batch Transform Step ##

In [10]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{bucket}/{prefix}/ReviewScoreTransform",
)

In [11]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

step_transform = TransformStep(
    name="ReviewScoreTransform", transformer=transformer, inputs=TransformInput(data=test_data_param)
)

## Define a Model Register Step ##

In [12]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_package_group_name = f"ReviewScoreModelPackageGroupName"
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            eval_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ReviewScoreRegisterModel", step_args=register_args)



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed ##

In [13]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="ReviewScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to'f1_score >", 0.80]),
)

## Condition Step for Evaluation Metrics ##

In [14]:
# Define the condition step
cond_step = ConditionStep(
    name='CheckEvaluationMetrics',
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=eval_step.name,
                property_file=evaluation_report,
                json_path='accuracy'
            ),
            right=0.85  # Set your accuracy threshold
        ),
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=eval_step.name,
                property_file=evaluation_report,
                json_path='f1_score'
            ),
            right=0.80  # Set your F1-score threshold
        ),
        # Add more conditions for other metrics if needed
    ],
    if_steps=[
        # Add steps to register the model if the condition is met
    ],
    else_steps=[]
)

## Define Pipeline ##

In [15]:
## Define the pipeline
pipeline = Pipeline(
    name='XGBModelPipeline',
    parameters=[
        train_data_param,
        validation_data_param,
        test_data_param,
        model_output_param
    ],
    steps=[train_step, eval_step], # model_output_param
)

# Submit the pipeline
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:757088774357:pipeline/XGBModelPipeline',
 'ResponseMetadata': {'RequestId': 'a812529e-42c6-462f-b279-1d7837f9ba93',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a812529e-42c6-462f-b279-1d7837f9ba93',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Tue, 11 Jun 2024 05:00:35 GMT'},
  'RetryAttempts': 0}}

In [16]:
# Test the Pipeline
# Start the pipeline execution
execution = pipeline.start()
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:757088774357:pipeline/XGBModelPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:757088774357:pipeline/XGBModelPipeline/execution/8t0z9109buz7',
 'PipelineExecutionDisplayName': 'execution-1718082036665',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 6, 11, 5, 0, 36, 617000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 6, 11, 5, 0, 36, 617000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:757088774357:user-profile/d-kzsg2nqmfwoc/default-1716397287064',
  'UserProfileName': 'default-1716397287064',
  'DomainId': 'd-kzsg2nqmfwoc',
  'IamIdentity': {'Arn': 'arn:aws:sts::757088774357:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROA3ARQB5DKRSEFITSVX:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:757088774357:user-profile/d-kzsg2nqmfwoc/default-1716397287064',
  'UserProfileName': 'default-1716397287064'

In [17]:
# Get the execution ARN
execution_arn = execution.arn

# Wait for the execution to complete (optional)
execution.wait()

# Describe the execution to get details
execution_details = sagemaker_client.describe_pipeline_execution(
    PipelineExecutionArn=execution_arn
)

print(execution_details)

# Print execution details
print(f"Pipeline Execution Status: {execution_details['PipelineExecutionStatus']}")
# print(f"Start Time: {execution_details['StartTime']}")
# print(f"End Time: {execution_details['EndTime']}")
if 'FailureReason' in execution_details:
    print(f"Failure Reason: {execution_details['FailureReason']}")


WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
execution_arn = execution_details['PipelineExecutionArn']

# List the steps in the execution
steps = sagemaker_client.list_pipeline_execution_steps(
    PipelineExecutionArn=execution_arn
)

# Fetch details for each step
for step in steps['PipelineExecutionSteps']:
    print(f"Step Name: {step['StepName']}")
    print(f"Step Status: {step['StepStatus']}")
    if 'FailureReason' in step:
        print(f"Failure Reason: {step['FailureReason']}")
    
    # Fetch processing job details if it's a processing step
    if step['StepStatus'] == ('Processing' or 'Executing'):
        processing_job_arn = step['Metadata']['ProcessingJob']['Arn']
        processing_job_details = sagemaker_client.describe_processing_job(
            ProcessingJobName=processing_job_arn.split('/')[-1]
        )
        log_group_name = processing_job_details['ProcessingJob']['ProcessingJobName']
        log_stream_name = processing_job_details['ProcessingJob']['ProcessingJobStatus']
        
        print(f"Log Group Name: {log_group_name}")
        print(f"Log Stream Name: {log_stream_name}")
        print("---------")

In [None]:
from sagemaker.s3 import S3Downloader

# List the steps in the execution
steps = sagemaker_client.list_pipeline_execution_steps(
    PipelineExecutionArn=execution_arn
)

# Find the evaluation step
evaluation_step = next(step for step in steps['PipelineExecutionSteps'] if step['StepName'] == 'EvaluateXGBModel')

print(execution_arn)
print(evaluation_step)



# Get the Arn of the evaluation report
evaluation_output = evaluation_step['Metadata']['ProcessingJob']['Arn']

# Describe the processing job
processing_job_details = sagemaker_client.describe_processing_job(
    ProcessingJobName=evaluation_output.split('/')[-1]
)

# Get the S3 output location
s3_output_uri = processing_job_details['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print(f"S3 Output URI: {s3_output_uri}")

# Define local path for downloading the evaluation report
local_evaluation_file = 'evaluation.json'

# Download the evaluation report using S3Downloader
S3Downloader.download(s3_uri=f'{s3_output_uri}/evaluation.json', local_path=local_evaluation_file)

# Now you can read and print the evaluation report
with open(local_evaluation_file, 'r') as f:
    evaluation_report = json.load(f)
    print(json.dumps(evaluation_report, indent=4))