# (05) Deployment Pipeline (New Dataset, Unsupervised, no Annotations ONLY)

In [1]:
import boto3
import sagemaker
from sagemaker import get_execution_role
import pandas as pd
from datetime import datetime

# Region of the default boto3 session; used later when looking up the
# evaluation container image URI.
region = boto3.Session().region_name

# Execution role returned by the SageMaker SDK; handed to the model, the
# evaluation processor and the pipeline upsert call further down.
role = get_execution_role()

# Low-level boto3 clients for SageMaker and S3.
sm_client = boto3.client("sagemaker")
s3_client = boto3.client("s3")

#sess = sagemaker.Session()


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Production pipeline input parameters.
#
# Root S3 bucket under which all data and experiment artifacts are stored
# during the development phase.
bucket_name = "aai-590-tmp2"


In [17]:
# S3 locations for the new dataset and the label mapping (all under `bucket_name`).

# Label mapping JSON shared by the train/validation split.
s3_label_map_uri = f"s3://{bucket_name}/data_split/train_val/label_mapping.json"

# Folder that holds the new data set.
s3_newdata_dir = f's3://{bucket_name}/data_split/train_val/validation2'

# Manifest for the new data set; expected to have been produced by the
# data-preprocessing pipeline.
s3_newdata_manifest = f's3://{bucket_name}/data_split/train_val/validation2/val-meta.manifest'

# Metadata CSV; used only to extract annotations later.
s3_newdata_csv = f's3://{bucket_name}/data_split/train_val/validation2/val-meta.csv'

-----
## Generate Monthly Ground Truth Data from Production Set

In [None]:
import subprocess
import sys
import os

def get_repo_root():
    """Return the top-level directory of the current git repository.

    Runs ``git rev-parse --show-toplevel`` and returns its standard output
    decoded as UTF-8 with surrounding whitespace removed.

    Raises:
        subprocess.CalledProcessError: if the git command exits with a
            non-zero status.
    """
    git_cmd = ['git', 'rev-parse', '--show-toplevel']
    raw_output = subprocess.check_output(git_cmd)
    return raw_output.decode('utf-8').strip()
# Resolve the repository root and print it for reference.
repo_root = get_repo_root()
print(repo_root)

# Put <repo_root>/src at the front of the import search path, unless it is
# already listed there.
src_path = os.path.join(repo_root, 'src')
if sys.path.count(src_path) == 0:
    sys.path.insert(0, src_path)

In [None]:
# try importing src/utils
from utils.utils import parse_s3_uri
from utils.utils import generate_manifest_file

----
## Design Monitoring Pipeline

In [4]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import TransformStep, ProcessingStep
from sagemaker.transformer import Transformer
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.functions import Join
from sagemaker.inputs import TransformInput
from sagemaker.model import Model
from sagemaker import ModelPackage
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

### Pipeline Input Parameters

In [5]:
# Display the manifest URI defined earlier in the notebook.
# NOTE(review): the transform step below reads "labels.manifest" under the
# s3InputDir parameter, not this URI — confirm which manifest is intended.
s3_newdata_manifest

's3://aai-590-tmp2/data_split/train_val/validation2/val-meta.manifest'

In [14]:
# Pipeline parameters

# S3 folder for the month's newly collected images. It is expected to hold the
# CSV and manifest files (with annotations) for those images, i.e. it is the
# ground-truth folder; batch-transform and evaluation outputs are written
# beneath the same folder.
s3_new_input_dir = ParameterString(
    name="s3InputDir",
    default_value=s3_newdata_dir,  # folder must contain the manifest file
)

# ARN of the model package to load for inference.
model_package_arn = ParameterString(
    default_value='arn:aws:sagemaker:us-east-1:324183265896:model-package/wildscan-image-classifier-fixed-locs/1',
    name="ModelPackageArn",
)

# Idea kept for later (not enabled): a "PipelineTimestamp" ParameterString with
# an empty default, to simulate production-set time-series performance on
# CloudWatch.

### Create Model Step 

In [10]:
# Pipeline-scoped session shared by the model, the transformer and the
# pipeline object defined in later cells.
pipeline_session = PipelineSession()

# Model resource built from the model package ARN.
# NOTE(review): `.default_value` is a plain string read when this cell runs, so
# a different "ModelPackageArn" supplied when the pipeline is started does not
# reach this step — confirm whether that is intended.
model = ModelPackage(
    role=role,
    model_package_arn=model_package_arn.default_value,
    sagemaker_session=pipeline_session,
)

# Pipeline step that creates the SageMaker model; its ModelName property is
# consumed by the batch-transform step.
create_model_step = ModelStep(
    step_args=model.create(instance_type="ml.m5.large"),
    name="LoadSpecifiedModel",
)



### Configure Transformer Step 

In [16]:
# Batch-transform predictions are written to <s3InputDir>/batch_transform_out.
s3_transform_out = Join(on='/', values=[s3_new_input_dir, "batch_transform_out"])

# Manifest listing the S3 objects to transform: <s3InputDir>/labels.manifest.
s3_manifest_file = Join(on='/', values=[s3_new_input_dir, "labels.manifest"])

# Initialize the Transformer for the model created by the previous step.
transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_count=1,                # number of instances
    instance_type="ml.g4dn.xlarge",  # instance type
    output_path=s3_transform_out,    # where predictions are written
    max_payload=10,                  # max payload size (MB)
    # MultiRecord is used here for faster processing; in a real-world setting
    # an ml.m5.xlarge instance with the single-record strategy would be fine.
    strategy="MultiRecord",
    max_concurrent_transforms=10,
    sagemaker_session=pipeline_session,

    # Response MIME type requested from the model container. This was
    # previously 'txt/csv', which is not a valid MIME type.
    accept='text/csv',
    assemble_with='Line',  # each prediction is written on its own line
)

# Configure the batch-transform pipeline step.
transform_step = TransformStep(
    name='BatchTransform',
    transformer=transformer,
    inputs=TransformInput(
        data=s3_manifest_file,
        data_type='ManifestFile',  # input is a list of S3 URIs of the objects to transform
        content_type='application/x-image',
        split_type='None',
    ),
)

### Configure Evaluation Step

In [18]:
# Look up the container image used to run the evaluation script:
# scikit-learn framework image, version 1.2-1, Python 3, in the current region.
image_uri = sagemaker.image_uris.retrieve(
    region=region,
    framework='sklearn',
    version='1.2-1',
    py_version='py3',
)

# Processor that runs a Python script inside that image on a single
# ml.m5.xlarge instance.
evaluation_processor = ScriptProcessor(
    image_uri=image_uri,
    command=['python3'],
    instance_type='ml.m5.xlarge',
    instance_count=1,
    role=role,
)

# Ground-truth metadata is read from <s3InputDir>/labels.csv and evaluation
# results are written to <s3InputDir>/evaluation.
s3_true_meta_uri = Join(on='/', values=[s3_new_input_dir, "labels.csv"])
s3_evaluation_out = Join(on='/', values=[s3_new_input_dir, "evaluation"])

# Processing step that runs the evaluation script against the batch-transform
# predictions.
evaluation_step = ProcessingStep(
    name="ModelEvaluation",
    processor=evaluation_processor,
    code='../src/evaluation/evaluate.py',  # evaluation script, relative to the working directory
    inputs=[
        # Prediction files produced by the batch-transform step.
        ProcessingInput(
            source=transform_step.properties.TransformOutput.S3OutputPath,
            destination='/opt/ml/processing/input_predictions',  # container path the script reads from
        ),
        # Ground-truth labels for the images in this set.
        ProcessingInput(
            source=s3_true_meta_uri,
            destination='/opt/ml/processing/true_labels',
        ),
        # Label mapping file.
        ProcessingInput(
            source=s3_label_map_uri,
            destination='/opt/ml/processing/label_mapping',
        ),
    ],
    outputs=[
        # Files the script writes to this container path are stored at the
        # S3 evaluation location.
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination=s3_evaluation_out,
        ),
    ],
)


INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


-----
## Assemble the Pipeline 
### (TODO: add more steps later, e.g. a Condition step for continuous deployment)

In [19]:
# Assemble the pipeline: create model -> batch transform -> evaluation.
pipeline = Pipeline(
    name="CapstonePipelineTrial",
    steps=[
        create_model_step,
        transform_step,
        evaluation_step,
    ],
    parameters=[
        s3_new_input_dir,
        model_package_arn,
    ],
    sagemaker_session=pipeline_session,
)

# Create the pipeline in SageMaker, or update it if it already exists; the
# service response is displayed as the cell output.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:324183265896:pipeline/CapstonePipelineTrial',
 'ResponseMetadata': {'RequestId': '1ad842dd-cc8b-4ea5-afbc-af693a9f5819',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1ad842dd-cc8b-4ea5-afbc-af693a9f5819',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Tue, 15 Jul 2025 14:26:51 GMT'},
  'RetryAttempts': 0}}

-----
## Execute Pipeline on Validation Set

In [None]:
# Start a pipeline run on the validation set. The values are taken from the
# variables the pipeline parameters were built from, so they cannot drift from
# the definitions earlier in the notebook.
execution = pipeline.start(
    parameters={
        "s3InputDir": s3_newdata_dir,
        "ModelPackageArn": model_package_arn.default_value,
    }
)

# Block until the run has finished.
execution.wait()

# Show the execution summary. This is called after wait() and is the last
# expression of the cell, so the final state is displayed rather than an
# in-progress snapshot whose result was discarded.
execution.describe()
