In [2]:
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

sagemaker.__version__

'2.165.0'

In [3]:
%store -r 

%store

try:
    initialized
except NameError:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")

Stored variables and their in-db values:
baseline_s3_url                        -> 's3://sagemaker-eu-central-1-567821811420/from-ide
bucket_name                            -> 'sagemaker-eu-central-1-567821811420'
bucket_prefix                          -> 'from-idea-to-prod/xgboost'
domain_id                              -> 'd-ivd5gnez0yil'
evaluation_s3_url                      -> 's3://sagemaker-eu-central-1-567821811420/from-ide
experiment_name                        -> 'from-idea-to-prod-experiment-19-09-16-12'
initialized                            -> True
input_s3_url                           -> 's3://sagemaker-eu-central-1-567821811420/from-ide
model_package_group_name               -> 'from-idea-to-prod-model-group'
output_s3_url                          -> 's3://sagemaker-eu-central-1-567821811420/from-ide
prediction_baseline_s3_url             -> 's3://sagemaker-eu-central-1-567821811420/from-ide
region                                 -> 'eu-central-1'
sm_role               

In [4]:
# Set names of pipeline objects
project = "emo-video"

pipeline_name = f"{project}-pipeline"
pipeline_model_name = f"{project}-model-resnet"
model_package_group_name = f"{project}-model-group"
endpoint_config_name = f"{project}-endpoint-config"
endpoint_name = f"{project}-endpoint"

In [5]:
# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

In [6]:
# Exercise 1 - write code here
boto_session = boto3.Session()
region = boto_session.region_name
bucket_name = sagemaker.Session().default_bucket()
bucket_prefix ="emovideo/resnet"  
sm_session = sagemaker.Session()
sm_client = boto_session.client("sagemaker")
sm_role = sagemaker.get_execution_role()

initialized =True
print(sm_role)

arn:aws:iam::567821811420:role/service-role/AmazonSageMaker-ExecutionRole-20230619T084765


In [7]:
# Set S3 urls for processed data
train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
test_s3_url = f"s3://{bucket_name}/{bucket_prefix}/test"
baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/baseline"

evaluation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/evaluation"
prediction_baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/prediction_baseline"

output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

In [8]:
%store train_s3_url
%store validation_s3_url
%store test_s3_url
%store baseline_s3_url
%store model_package_group_name
%store evaluation_s3_url
%store prediction_baseline_s3_url
%store output_s3_url

Stored 'train_s3_url' (str)
Stored 'validation_s3_url' (str)
Stored 'test_s3_url' (str)
Stored 'baseline_s3_url' (str)
Stored 'model_package_group_name' (str)
Stored 'evaluation_s3_url' (str)
Stored 'prediction_baseline_s3_url' (str)
Stored 'output_s3_url' (str)


In [9]:
try:
    input_s3_url
except NameError:      
    # If input_s3_url is not defined, upload the dataset to S3 and store the path
    input_s3_url = sagemaker.Session().upload_data(
        path="data/fer2013.csv",
        bucket=bucket_name,
        key_prefix=f"{bucket_prefix}/input"
    )
    print(f"Upload the dataset to {input_s3_url}")

    %store input_s3_url

In [10]:
# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count
)

# Set model approval param
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# Minimal threshold for model performance on the test dataset
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold", 
    default_value=0.75
)

# Set S3 url for input dataset
input_s3_url_param = ParameterString(
    name="InputDataUrl",
    default_value=input_s3_url,
)

In [11]:
session = PipelineSession()

Preprocess step

In [12]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is a S3 path under the default bucket.
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='fer2013.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    all_data= pd.read_csv(os.path.join(args.filepath, args.filename), sep=";")
    groups = [g for _, g in all_data.groupby('Usage')]
    training_data = groups[2]
    validation_data = groups[1]
    testing_data = groups[0]
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    label_names = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
    def make_dataloader(data, batch_size, shuffle):
        images, labels = data['pixels'], data['emotion']
        images = np.array([np.fromstring(image, np.uint8, sep=' ') for image in images]) / 255.0 # normalizing data to be between 0 and 1
        images = torch.FloatTensor(images.reshape(images.shape[0], 1, 48, 48)).to(device) # 1 color channel, 48x48 images
        dataset = torch.utils.data.TensorDataset(images, torch.LongTensor(np.array(labels)).to(device))
        return torch.utils.data.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle)
    train_loader = make_dataloader(training_data, 100, True)
    valid_loader = make_dataloader(validation_data, 100, False)
    
    

Writing preprocessing.py


In [13]:
sklearn_processor = SKLearnProcessor(
        framework_version="0.23-1",
        role=sm_role,
        instance_type=process_instance_type_param,
        instance_count=1,
        base_job_name=f"{pipeline_name}/preprocess",
        sagemaker_session=session,
    )
    
processing_inputs=[
    ProcessingInput(source=input_s3_url_param, destination="/opt/ml/processing/input")
]

processing_outputs=[
    ProcessingOutput(output_name="train_data", source="/opt/ml/processing/output/train", 
                     destination=train_s3_url),
    ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation",
                     destination=validation_s3_url),
    ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test",
                     destination=test_s3_url),
    ProcessingOutput(output_name="baseline_data", source="/opt/ml/processing/output/baseline", 
                     destination=baseline_s3_url),
]

processor_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
)
    
# Define processing step
step_process = ProcessingStep(
    name=f"{pipeline_name}-preprocess-data",
    step_args=processor_args,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


Training step

In [14]:
%%writefile pyorch-train.py
from torch import optim
from torchvision import models
import torch.nn as nn
import random
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.utils.data
import matplotlib.pyplot as plt

random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed(0)
torch.backends.cudnn.deterministic = True

def adjust_model(model):
    model.conv1 = nn.Conv2d(1, 64, model.conv1.kernel_size, model.conv1.stride, model.conv1.padding, bias=False)
    model.fc = nn.Linear(model.fc.in_features, 7, bias=False)
    return model

def train_model(model, criterion, optimizer, data_loader, eval_loader):
    model = model.to(device)
    test_accuracy_history = []
    test_loss_history = []
    for epoch in range(epochs):
        print(f"Training epoch {epoch}")
        
        model.train()
        for data, labels in data_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            
        accuracy, loss = eval_model(model, eval_loader, criterion)
        print(f"Acc: {accuracy}, loss: {loss}")
        test_accuracy_history.append(accuracy)
        test_loss_history.append(loss)
    return test_accuracy_history, test_loss_history

def eval_model(model, data_loader, criterion):
    model.eval()
    with torch.no_grad():
        accuracy = 0
        loss = 0
        for data, labels in data_loader:
            output = model(data)
            _, preds = torch.max(output.data, 1)
            equals = (preds == labels).cpu()
            accuracy += torch.mean(equals.type(torch.FloatTensor)).item()
            loss += criterion(output, labels).data.cpu()
        return accuracy/len(data_loader), loss/len(data_loader)


Writing pyorch-train.py


In [15]:
epochs = 10
learning_rate = 0.1

In [None]:
!pip install -q opencv-python torchvision facenet_pytorch torch gaze-tracking opencv-python ffmpeg-python

In [21]:
from sagemaker.pytorch.estimator import PyTorch
import torch

ModuleNotFoundError: No module named 'torch'

In [19]:
pytorch_estimator = PyTorch('pytorch-train.py',
                            instance_type='ml.p3.2xlarge',
                            instance_count=1,
                            framework_version='1.8.0',
                            py_version='py3',
                            role = sm_role,
                            hyperparameters = {'epochs': 20, 'batch-size': 64, 'learning-rate': 0.1})

Evaluation step

Register step

In [20]:
model = Model(
    image_uri=pytorch-train.py,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    name=f"emovideo",
    sagemaker_session=session,
    role=sm_role,
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge", "ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name=f"{pipeline_name}-register",
    step_args=register_args
)

NameError: name 'pytorch' is not defined

Pipeline constructer

In [None]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
    ],
    steps=[step_process, step_train, step_eval],
    sagemaker_session=session,
)

In [None]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sm_role)

In [None]:
pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
pipeline_definition

Execution pipeline

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType=process_instance_type,
        TrainingInstanceType=train_instance_type,
        TrainingInstanceCount=train_instance_count,
        ModelApprovalStatus="PendingManualApproval",
        TestScoreThreshold=0.75,
        InputDataUrl=input_s3_url
    )
)

In [None]:
execution.list_steps()