In [None]:
!pip install -U sagemaker --quiet # Ensure latest version of SageMaker is installed

# Deepfake Pipeline

In [2]:
import os
import time
import sagemaker
import sagemaker.session
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import (
    ProcessingStep, 
    CacheConfig,
    TrainingStep,
    TuningStep 
)
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel, CreateModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.pytorch import PyTorch, PyTorchModel

from sagemaker.workflow.pipeline_context import PipelineSession

In [3]:
# Regular SageMaker session: source of the region and the default bucket.
session = sagemaker.Session()
region = session.boto_region_name

# Execution role passed to the processor, the estimators and pipeline.upsert().
role = sagemaker.get_execution_role()

# Default bucket of the session (distinct from `bucket_name` defined below).
bucket = session.default_bucket()

# Pipeline-aware session handed to the preprocessing FrameworkProcessor.
pipeline_session = PipelineSession()

In [4]:
# S3 layout of the project data.
bucket_name = "deepfake-detection"
dataset_prefix = "dev_datasets"
output_prefix = "/".join([dataset_prefix, "preprocessed_data"])

# Names of the model package group and of the pipeline.
model_package_group_name = "deepfake-detection-models"
pipeline_name = "Deepfake-detection-pipeline"

<a id='parameters'></a>

### Pipeline input parameters

In [5]:
# Instance type used by the training estimator.
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
)

# Approval status given to the model version when it is registered
# with the model registry.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Optional: cache pipeline steps to reduce execution time on subsequent executions.
# cache_config = CacheConfig(enable_caching=True, expire_after="30d")

<a id='preprocess'></a>

## Preprocess data step

In [6]:
# Processor that runs the preprocessing script in a PyTorch framework container.
pytorch_processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version="1.12.0",
    py_version="py38",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,  # a single instance; a value > 1 would request several machines
    base_job_name="deepfake-processing",
    sagemaker_session=pipeline_session,
)

In [7]:
# Settings forwarded to the preprocessing script as
# --frames_per_video / --batch_size / --face_size.
frames_per_video: int = 15
batch_size: int = 32
face_size: int = 224

In [8]:
# NOTE: evaluated once, when this cell runs. The value is embedded in the
# pipeline definition as a literal, so every execution of that definition
# writes under the same prefix.
timestamp_prefix = str(int(time.time()))

# One S3 output per dataset split, laid out as
# s3://<bucket_name>/<output_prefix>/<timestamp_prefix>/<split>.
preprocess_outputs = [
    ProcessingOutput(
        output_name=split,
        source=f"/opt/ml/processing/output/{split}",
        destination=Join(
            on="/",
            values=[f"s3://{bucket_name}", output_prefix, timestamp_prefix, split],
        ),
    )
    for split in ("train", "validation", "test")
]

# Command-line arguments passed to preprocess_deepfake.py (all as strings).
preprocess_arguments = [
    "--frames_per_video", str(frames_per_video),
    "--batch_size", str(batch_size),
    "--face_size", str(face_size),
]

step_args = pytorch_processor.run(
    code="preprocess_deepfake.py",
    source_dir="preprocess",
    outputs=preprocess_outputs,
    arguments=preprocess_arguments,
)



In [9]:
# Pipeline step built from the arguments returned by pytorch_processor.run(...).
step_preprocess_data = ProcessingStep(name="preprocess-deepfake-data", step_args=step_args)

<a id='training'></a>

## Train model step
In the second step, the train and validation outputs from the previous processing step are used to train a model.

In [10]:
# TrainingInput, TrainingStep and PyTorch come from the imports cell at the top
# of the notebook; they are not re-imported here.

# Output path for the model artifacts produced by the training job.
model_path = f"s3://{bucket_name}/{dataset_prefix}/training"

# Estimator running ./code/train.py in the PyTorch 1.12.1 container.
est = PyTorch(
    entry_point="train.py",
    source_dir="./code",  # directory of your training script
    role=role,
    framework_version="1.12.1",
    py_version="py38",
    instance_type=training_instance_type,  # pipeline parameter
    instance_count=1,
    output_path=model_path,
    hyperparameters={
        "batch-size": 32,
        "epochs": 1,
        "learning-rate": 1e-4,
    },
)

# NOTE how the inputs to the training job directly reference the outputs of
# the preprocessing step.
step_train_model = TrainingStep(
    name="train-deepfake-detection-model",
    estimator=est,
    inputs={
        "train": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "validation": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri
        ),
    },
)

<a id='tuning'></a>

## Hyperparameter tuning 
Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose.

In [None]:
# Output path for the model artifacts of the hyperparameter tuning / training jobs.
# NOTE: running this cell rebinds `model_path` and `est`.
model_path = "s3://{}/{}/training".format(bucket_name, dataset_prefix)

# NOTE(review): "num_workers" uses an underscore while the other keys are
# hyphenated — confirm this matches the argument names train.py expects.
tuning_hyperparameters = {
    "batch-size": 32,
    "epochs": 1,
    "learning-rate": 1e-4,
    "num_workers": 2,
}

est = PyTorch(
    entry_point="train.py",
    source_dir="./code",  # directory of your training script
    framework_version="1.12.1",
    py_version="py38",
    role=role,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    hyperparameters=tuning_hyperparameters,
)

In [None]:
# Search space for the tuner. Only the batch size is tuned here; other
# candidate ranges, for reference:
#   "batch-size": CategoricalParameter([32, 64]),
#   "learning-rate": ContinuousParameter(0.01, 0.1)
hyperparameter_ranges = {"batch-size": CategoricalParameter([32, 64])}

In [None]:
# Objective metric for the tuner and the regex that extracts it from the logs.
# NOTE(review): the metric is named "accuracy" but the regex matches an
# 'ndcg@5' entry — confirm this is what train.py actually logs.
objective_metric_name, objective_type = "accuracy", "Maximize"
metric_definitions = [
    {"Name": objective_metric_name, "Regex": "'ndcg@5': ([0-9\\.]+)"},
]

In [None]:
# Tuner that launches training jobs from `est` over `hyperparameter_ranges`,
# at most 2 jobs in total and 2 in parallel.
#
# Early stopping is available through `early_stopping_type` (str), which
# specifies whether early stopping is enabled for the job: training jobs that
# are not improving significantly on the objective metric are stopped early,
# which can reduce compute time and help avoid overfitting. It is not set here.
tuner = HyperparameterTuner(
    estimator=est,
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    objective_type=objective_type,
    max_jobs=2,
    max_parallel_jobs=2,
)

In [None]:
# Train/validation channels, each fed from the matching output of the
# preprocessing step.
tuning_inputs = {
    channel: TrainingInput(
        s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
            channel
        ].S3Output.S3Uri
    )
    for channel in ("train", "validation")
}

# Add a cache config if necessary:
# cache_config=cache_config
step_tuning = TuningStep(
    name="deepfake-HPTuning",
    tuner=tuner,
    inputs=tuning_inputs,
)

<a id='evaluate'></a>

### Evaluate top model HPO
After the hyperparameter tuning job completes successfully, you can either create SageMaker models from the model artifacts created by the training jobs from the TuningStep or register the models into the Model Registry.

When using the model Registry, if you register multiple models from the TuningStep, they will be registered as versions within the same model package group unless unique model package groups are specified for each RegisterModelStep that is part of the pipeline.

In this notebook, the two best models from the TuningStep are added to the same model package group in the Model Registry as v0 and v1.

You use the get_top_model_s3_uri method of the TuningStep class to get the model artifact from one of the top performing model versions


In [None]:
# A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. In this case, the top performing model
# is evaluated. Based on the results of the evaluation, the model is registered into the Model Registry using a ConditionStep.
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role

# Location of the tuning job artifacts (same as the estimator's output_path).
model_key_prefix = f"{dataset_prefix}/training"
model_bucket_key = f"s3://{bucket_name}/{model_key_prefix}"

# Image for the evaluation container, resolved for the session's region
# rather than a hard-coded one.
image_uri = retrieve(
    framework='pytorch',
    region=region,
    version='1.12.1',
    py_version='py38',
    image_scope='training',
    instance_type='ml.m5.xlarge'
)

evaluate_model_processor = ScriptProcessor(
    role=get_execution_role(),
    image_uri=image_uri,
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge'
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="BestTuningModelEvaluationReport", output_name="evaluation", path="evaluation.json"
)

# This can be extended to evaluate multiple models from the HPO step
# cache_config=cache_config
step_evaluate_model = ProcessingStep(
    name="Evaluate-Top-Deepfake-Detection-Model",
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            # get_top_model_s3_uri() prepends "s3://" and appends
            # "<training job name>/output/model.tar.gz" itself, so it takes the
            # bare bucket name and the key prefix, not a full "s3://..." URI.
            source=step_tuning.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket_name, prefix=model_key_prefix
            ),
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/train",
        ),
        ProcessingInput(
            source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            destination="/opt/ml/processing/validation",
        ),
        ProcessingInput(
            source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            # NOTE(review): this report goes to the session's default bucket
            # (`bucket`), not `bucket_name` — confirm that is intended.
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    "deepfake/output",
                    timestamp_prefix,
                    "evaluation-report",
                ],
            ),
        ),
    ],
    code="evaluate/evaluate.py",
    property_files=[evaluation_report]
)

## Evaluate model

In [11]:
# A ProcessingStep is used to evaluate the model produced by the training step on the test split.
# Based on the results of the evaluation, the model is registered into the Model Registry using a ConditionStep.
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role

# Image for the evaluation container, resolved for the session's region
# rather than a hard-coded one.
image_uri = retrieve(
    framework='pytorch',
    region=region,
    version='1.12.1',
    py_version='py38',
    image_scope='training',
    instance_type='ml.m5.xlarge'
)

evaluate_model_processor = ScriptProcessor(
    role=get_execution_role(),
    image_uri=image_uri,
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge'
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Add a cache config if necessary:
# cache_config=cache_config
step_evaluate_model = ProcessingStep(
    name="evaluate-deepfake-detection-model",
    processor=evaluate_model_processor,
    inputs=[
        # Model artifacts written by the training step.
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # Test split written by the preprocessing step.
        ProcessingInput(
            source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket_name),
                    dataset_prefix,
                    "evaluation",
                    timestamp_prefix,
                    "evaluation-report",
                ],
            ),
        ),
    ],
    code="evaluate/evaluate.py",
    property_files=[evaluation_report]
)

<a id='register'></a>

## Register model step
If the trained model meets the model performance requirements a new model version is registered with the model registry for further analysis. To attach model metrics to the model version, create a [ModelMetrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html) object using the evaluation report created in the evaluation step. Then, create the RegisterModel step.

In [12]:
# MetricsSource, ModelMetrics and RegisterModel come from the imports cell at
# the top of the notebook; they are not re-imported here.

# Create ModelMetrics object using the evaluation report from the evaluation step
# A ModelMetrics object contains metrics captured from a model.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        # <S3 URI of the evaluation step's first output>/evaluation.json
        s3_uri=Join(
            on="/",
            values=[
                step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                    "S3Uri"
                ],
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

# Create a RegisterModel step, which registers the model with SageMaker Model Registry.
step_register_model = RegisterModel(
    name="deepfake-detection",
    estimator=est,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    # NOTE(review): "application/x-recordio-protobuf" may be carried over from
    # another example — confirm the inference code accepts this content type.
    content_types=["application/x-recordio-protobuf"],
    response_types=["application/json"],
    inference_instances=["ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

<a id='condition'></a>

## Condition step
Adding conditions to the pipeline is done with a ConditionStep.
In this case, we only want to register the new model version with the model registry if the new model meets a minimum accuracy condition.

In [13]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Accuracy value looked up in the evaluation step's property file.
evaluated_accuracy = JsonGet(
    step_name=step_evaluate_model.name,
    property_file=evaluation_report,
    json_path="deepfake_detection_metrics.accuracy.value",
)

# Accuracy gate: models scoring below 0.40 are not registered with the
# model registry.
cond_gte = ConditionGreaterThanOrEqualTo(left=evaluated_accuracy, right=0.40)

# ConditionStep: register the model when the condition holds; otherwise
# run nothing.
step_cond = ConditionStep(
    name="check-accuracy-evaluation",
    conditions=[cond_gte],
    if_steps=[step_register_model],
    else_steps=[],
)

<a id='orchestrate'></a>

## Pipeline Creation: Orchestrate all steps

Now that all pipeline steps are created, a pipeline is created.

In [14]:
from sagemaker.workflow.pipeline import Pipeline

# Every pipeline parameter has to be declared explicitly on the pipeline.
pipeline_parameters = [training_instance_type, model_approval_status]

# The listing order does not matter: the execution order is determined from
# each step's dependencies on other steps.
pipeline_steps = [
    step_preprocess_data,
    step_train_model,
    step_evaluate_model,
    step_cond,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [15]:
# import json

# definition = json.loads(pipeline.definition())
# definition

In [16]:
# Submit the pipeline definition.
pipeline.upsert(role_arn=role)

# Start an execution with the default parameter values and wait for it.
execution = pipeline.start()
execution.wait()

# List the execution steps to inspect their status and artifacts.
execution.list_steps()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.



Job Name:  deepfake-processing-2023-01-04-17-49-38-956
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-674518009863/Deepfake-detection-pipeline/code/364f60ed962f6f568205ec12d8519390/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-674518009863/Deepfake-detection-pipeline/code/aba9cd410894d4dc1aa101779eb7c8dd/runproc.sh', 'LocalPath': '/opt/ml/processing/input/entrypoint', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3://deepfake-detection', 'dev_datasets/preprocessed_data', '1672854545', 'train']), 'LocalPath': '/opt/ml/pr

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


[{'StepName': 'deepfake-detection-model-RegisterModel',
  'StartTime': datetime.datetime(2023, 1, 4, 18, 3, 34, 190000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 1, 4, 18, 3, 34, 883000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:674518009863:model-package/deepfake-detection-models/2'}}},
 {'StepName': 'check-accuracy-evaluation',
  'StartTime': datetime.datetime(2023, 1, 4, 18, 3, 33, 693000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 1, 4, 18, 3, 33, 929000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Condition': {'Outcome': 'True'}}},
 {'StepName': 'evaluate-deepfake-detection-model',
  'StartTime': datetime.datetime(2023, 1, 4, 17, 58, 48, 378000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 1, 4, 18, 3, 32, 994000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJ