## Orchestrate model evaluation with Amazon SageMaker Pipelines
  
The goal of this notebook is to provide an implementation of a multi-step SageMaker pipeline that evaluates multiple models, selects the best one, and registers it in the SageMaker Model Registry.  
For this example we use a model from [Amazon SageMaker JumpStart](https://aws.amazon.com/sagemaker/jumpstart/). We fine-tune it and then evaluate it against the same model with its default weights.  
This notebook also uses the following Amazon SageMaker components:  

[SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) is a purpose-built workflow orchestration service to automate all phases of machine learning (ML) from data pre-processing to model monitoring. With an intuitive UI and Python SDK you can manage repeatable end-to-end ML pipelines at scale. The native integration with multiple AWS services allows you to customize the ML lifecycle based on your MLOps requirements.

[Amazon SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html) is a purpose-built metadata store to manage the entire lifecycle of ML models from training to inference. Whether you prefer to store your model artifacts (model framework files, container image) in AWS (Amazon ECR) or outside of AWS in any third-party Docker repository, you can now track them all in Amazon SageMaker Model Registry. You also have the flexibility to register a model without read/write permissions to the associated container image. If you want to track an ML model in a private repository, set the optional `SkipModelValidation` parameter to `All` at the time of registration. Later you can also deploy these models for inference in Amazon SageMaker.

[Amazon SageMaker Clarify](https://aws.amazon.com/sagemaker/clarify/) provides purpose-built tools to gain greater insights into your ML models and data, based on metrics such as accuracy, robustness, toxicity, and bias, to improve model quality and support responsible AI initiatives. With the rise of generative AI, data scientists and ML engineers can use publicly available foundation models (FMs) to shorten time to market. To remove the heavy lifting of evaluating and selecting the right FM for your use case, Amazon SageMaker Clarify supports FM evaluation, helping you quickly evaluate, compare, and select the best FM for your use case based on a variety of criteria across different tasks, within minutes. This allows you to adopt FMs faster and with confidence.
To perform the evaluation we use the open-source library [FMEval](https://github.com/aws/fmeval), which powers SageMaker Clarify FM evaluation.

This example is built by following the best practices explained in the blog post [Operationalize LLM Evaluation at Scale using Amazon SageMaker Clarify and MLOps services](https://aws.amazon.com/blogs/machine-learning/operationalize-llm-evaluation-at-scale-using-amazon-sagemaker-clarify-and-mlops-services/). 

### Environment setup
First we install the required dependencies and import the required libraries.  
We also create a SageMaker Python SDK *config.yaml* file with the basic pipeline configuration.
This *config.yaml* file contains general pipeline parameters, such as the default instance type of the pipeline step containers and the path to the *requirements.txt* file listing the required dependencies.
These dependencies are installed automatically in the pipeline container at the start of each pipeline step. We create the *requirements.txt* file later in the notebook.

In [None]:
!pip3 install fmeval==0.4.0
!pip3 install sagemaker==2.208

In [None]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.function_step import step
import os

In [None]:
%%writefile config.yaml
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # A role ARN is not required in a SageMaker notebook instance or SageMaker Studio.
        # In a local IDE, uncomment the following line and replace the placeholder with the right execution role ARN
        # RoleArn: arn:aws:iam::<account_id>:role/<role_name>
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        CustomFileFilter:
          IgnoreNamePatterns: # files or directories to ignore
          - "*.ipynb" # all notebook files
          - "__pycache__"

In [None]:
# Point the SageMaker Python SDK to the config.yaml file in the current working directory
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
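As an optional sanity check, a small hypothetical helper (`use_local_sagemaker_config`, not part of the SageMaker SDK) can confirm that *config.yaml* exists before setting `SAGEMAKER_USER_CONFIG_OVERRIDE`, so a misnamed file (for example *config.yml*) is reported right away instead of going unnoticed. This is a sketch under that assumption, not a required step.

```python
import os
from pathlib import Path


def use_local_sagemaker_config(directory: str) -> str:
    """Point the SageMaker Python SDK at the config.yaml stored in `directory`.

    Hypothetical helper: it only verifies that config.yaml exists, then sets
    SAGEMAKER_USER_CONFIG_OVERRIDE to the directory, as the cell above does.
    """
    config_path = Path(directory) / "config.yaml"
    if not config_path.is_file():
        raise FileNotFoundError(f"No config.yaml found in {directory!r}")
    os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = str(Path(directory).resolve())
    return str(config_path)
```

In the notebook, `use_local_sagemaker_config(os.getcwd())` would replace the plain assignment above.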

### Evaluation and Fine-Tuning dataset preparation - preprocess step
*output_data_path* will contain the output artifacts of the pipeline.
We then add **preprocess** as our first pipeline step. This step takes care of any data preprocessing needed to create the fine-tuning and evaluation datasets.
In this example we download the [SCIQ](https://huggingface.co/datasets/sciq) dataset, sample 400 records from its training split, and create two datasets from them: a plain-text dataset for domain-adaptation fine-tuning (DAFT) and a JSON Lines dataset for evaluation.
The step returns the S3 paths of the datasets, which we store in the *preprocess_step_ret* object.
Take note of *pipeline_name*, as you will use it to find the pipeline in the SageMaker Studio UI.
Also take note of the S3 output path, so you can review the output artifacts at the end.

In [None]:
@step(name="preprocess")
def preprocess_dft(s3_output_path):
    
    from sagemaker.s3 import S3Uploader
    from datasets import load_dataset
    import os
    
    def safe_open_w(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return open(path, 'w')

    def write_to_file(input_str, file_path):
        with safe_open_w(file_path) as text_file:
            text_file.write(input_str)

    dataset_path = 'allenai/sciq'
    dataset = load_dataset(dataset_path)

    dataset_training_df = dataset['train'].to_pandas()
    dataset_training_df = dataset_training_df.sample(n=400, random_state=42, ignore_index=True)

    finetuning_dataset_filename = "dataset_finetune_daft.txt"
    evaluation_dataset_filename = "dataset_evaluation.jsonl"
    finetuning_dataset_local_path = f"./output/{dataset_path}/{finetuning_dataset_filename}"
    evaluation_dataset_local_path = f"./output/{dataset_path}/{evaluation_dataset_filename}"

    # Create DAFT dataset
    data_train_daft = " \n".join(((dataset_training_df.drop_duplicates(subset=['support']))['support']))
    write_to_file(data_train_daft, finetuning_dataset_local_path)

    # Create evaluation dataset
    dataset_evaluation_df = dataset_training_df[['question', 'correct_answer']].copy()
    dataset_evaluation_df = dataset_evaluation_df.rename(
        columns={"correct_answer": "target_output", "question": "model_input"})
    dataset_evaluation_df.to_json(evaluation_dataset_local_path, orient="records", lines=True)

    finetuning_dataset_s3_path = f"{s3_output_path}/{dataset_path}/finetuning/dft"
    evaluation_dataset_s3_path = f"{s3_output_path}/{dataset_path}/evaluation"
    print("Uploading finetuning dataset...")
    S3Uploader.upload(finetuning_dataset_local_path, f"{finetuning_dataset_s3_path}")
    print("Uploading evaluation dataset...")
    S3Uploader.upload(evaluation_dataset_local_path, f"{evaluation_dataset_s3_path}")

    return {"s3_output_path": s3_output_path,
            "s3_finetune_dataset_path": finetuning_dataset_s3_path,
            "s3_evaluation_data_location": f"{evaluation_dataset_s3_path}/{evaluation_dataset_filename}"}

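To make the two output formats concrete, here is a dependency-free sketch of the same transformations applied to a list of dicts shaped like SciQ records (toy data, not real SciQ entries; the helper names are hypothetical). The JSON Lines keys match `model_input_location` and `target_output_location` in the `DataConfig` of the evaluate step.

```python
import json


def build_daft_text(rows):
    """Join the unique `support` passages (first occurrence wins) into one text corpus."""
    seen = set()
    passages = []
    for row in rows:
        support = row["support"]
        if support not in seen:
            seen.add(support)
            passages.append(support)
    return " \n".join(passages)


def build_evaluation_jsonl(rows):
    """Map SciQ columns to the keys FMEval reads: one JSON object per line."""
    lines = [
        json.dumps({"model_input": row["question"], "target_output": row["correct_answer"]})
        for row in rows
    ]
    return "\n".join(lines) + "\n"
```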
In [None]:
pipeline_name = "pipeline-example-finetune-evaluate-dft"

default_bucket = sagemaker.Session().default_bucket()
output_data_path = ("s3://"+ default_bucket + "/pipelines_examples/" + pipeline_name)

# You can add your own evaluation dataset code into this step
preprocess_step_ret = preprocess_dft(output_data_path)

print("The pipeline name is "+pipeline_name)
# Note the name of this bucket to review the artifacts generated by this pipeline at the end of the execution
print("Output S3 bucket: "+output_data_path)

### Set up the SageMaker JumpStart models for fine-tuning and baseline evaluation
Set up the `model_id` of the SageMaker JumpStart model that we fine-tune with the domain dataset, using the default JumpStart training parameters. The second model is the same JumpStart model with its default weights, used as a baseline.

In [None]:
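# Set up the model parameters: the same JumpStart model is used twice,
# once fine-tuned ("_dft") and once with its default weights ("_base")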
model_1 = {
    "model_id": "meta-textgeneration-llama-2-7b"
}
model_1["model_name"] = model_1["model_id"]+"_dft"

model_2 = {
    "model_id": "meta-textgeneration-llama-2-7b",
}
model_2["model_name"] = model_2["model_id"]+"_base"

model_list = [model_1, model_2]
evaluation_results_ret_list = []
deploy_ret_list = []

We now create the pipeline steps: a **finetune** step followed by **deploy** and **evaluate** steps.
The evaluation result is stored in the *evaluate_ret* object and appended to *evaluation_results_ret_list* for later use.
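Each step returns a plain dictionary, and downstream steps read specific keys from it. The sketch below is a local dry run of that wiring with hypothetical stand-in functions (no SageMaker calls, made-up endpoint and job names): it only checks that every key a step reads is produced upstream. In the real pipeline, calling a `@step`-decorated function returns a `DelayedReturn` placeholder rather than a dictionary, and Pipelines derives the step order from which placeholders are passed as arguments.

```python
# Local dry run of the step wiring: plain functions stand in for the @step
# functions and return dictionaries with the same keys (fake values only).

def preprocess(s3_output_path):
    prefix = f"{s3_output_path}/allenai/sciq"
    return {
        "s3_output_path": s3_output_path,
        "s3_finetune_dataset_path": f"{prefix}/finetuning/dft",
        "s3_evaluation_data_location": f"{prefix}/evaluation/dataset_evaluation.jsonl",
    }


def finetune(model, preprocess_ret):
    _ = preprocess_ret["s3_finetune_dataset_path"]  # key read by the real step
    return {"training_job_name": f"train-{model['model_name']}"}  # fake job name


def deploy(model, finetune_ret=None):
    ret = {
        "model_endpoint": f"endpoint-{model['model_name']}",  # fake endpoint name
        "model_deployed": True,
        "is_finetuned_model": finetune_ret is not None,
    }
    if finetune_ret is not None:
        ret["training_job_name"] = finetune_ret["training_job_name"]
    return ret


def evaluate(model, preprocess_ret, deploy_ret):
    # Keys read by the real step; no scoring happens here
    _ = preprocess_ret["s3_evaluation_data_location"], deploy_ret["model_endpoint"]
    return {"model_name": model["model_name"], "model_id": model["model_id"]}


model_dft = {"model_id": "meta-textgeneration-llama-2-7b", "model_name": "meta-textgeneration-llama-2-7b_dft"}
model_base = {"model_id": "meta-textgeneration-llama-2-7b", "model_name": "meta-textgeneration-llama-2-7b_base"}

pre = preprocess("s3://example-bucket/pipelines_examples/example")  # hypothetical bucket
deploy_dft = deploy(model_dft, finetune(model_dft, pre))
deploy_base = deploy(model_base)
results = [evaluate(model_dft, pre, deploy_dft), evaluate(model_base, pre, deploy_base)]
```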

In [None]:
@step(name="finetune")
def jumpstart_finetune(model, preprocess_step_ret):
    
    from sagemaker.jumpstart.estimator import JumpStartEstimator

    model_id = model["model_id"]

    train_data_path = preprocess_step_ret["s3_finetune_dataset_path"]

    estimator = JumpStartEstimator(
        model_id=model_id,
        environment={"accept_eula": "true"},
        disable_output_compression=False)

    estimator.fit(inputs={"training": train_data_path})
    training_job_name = estimator.latest_training_job.name

    return {"training_job_name": training_job_name}


@step(name="deploy")
def jumpstart_deploy(model, finetune_step_ret=None):
    
    import sagemaker
    from sagemaker.jumpstart.model import JumpStartModel
    from sagemaker.jumpstart.estimator import JumpStartEstimator

    model_id = model["model_id"]

    if finetune_step_ret is None:
        # Baseline: deploy the JumpStart model with its default weights
        js_model = JumpStartModel(model_id=model_id)
        predictor = js_model.deploy(accept_eula=True)
        return {"model_endpoint": predictor.endpoint_name, "model_deployed": True, "is_finetuned_model": False}
    
    else:
        # Fine-tuned model: attach to the finished training job and deploy its artifacts
        training_job_name = finetune_step_ret["training_job_name"]

        estimator = JumpStartEstimator.attach(training_job_name, model_id=model_id)
        estimator.logs()
        predictor = estimator.deploy(serializer=sagemaker.serializers.JSONSerializer(),
                                     deserializer=sagemaker.deserializers.JSONDeserializer())

        return {"model_endpoint": predictor.endpoint_name, "model_deployed": True, "is_finetuned_model": True, "training_job_name": training_job_name}

    
@step(name="evaluate")
def evaluation(model, preprocess_step_ret, deploy_step_ret):
    
    import boto3
    import markdown
    from sagemaker.s3_utils import parse_s3_url
    from fmeval.data_loaders.data_config import DataConfig
    from fmeval.reporting.eval_output_cells import EvalOutputCell
    from fmeval.constants import MIME_TYPE_JSONLINES
    from fmeval.model_runners.sm_jumpstart_model_runner import JumpStartModelRunner
    from fmeval.eval_algorithms.factual_knowledge import FactualKnowledge, FactualKnowledgeConfig

    s3 = boto3.client("s3")

    model_id = model["model_id"]
    model_name = model["model_name"]
    
    # The FMEval library needs three components:
    # - The evaluation dataset
    # - A model runner
    # - An algorithm to use
    # We configure each of these components below.

    # Download the evaluation dataset produced by the preprocess step
    data_s3_path = preprocess_step_ret["s3_evaluation_data_location"]
    bucket, object_key = parse_s3_url(data_s3_path)
    print(f"Downloading evaluation dataset from s3://{bucket}/{object_key}")
    s3.download_file(bucket, object_key, "dataset.jsonl")

    # Configure FMEval for reading the dataset
    config = DataConfig(
        dataset_name="dataset",
        dataset_uri="dataset.jsonl",
        dataset_mime_type=MIME_TYPE_JSONLINES,
        model_input_location="model_input",
        target_output_location="target_output",
    )

    # Create a JumpStartModelRunner that will be used by FMEval library to perform the call to the model 
    # and the evaluation of each entry of the evaluation dataset
    endpoint_name = deploy_step_ret["model_endpoint"]
    js_model_runner = JumpStartModelRunner(
        endpoint_name=endpoint_name,
        model_id=model_id,
        custom_attributes="accept_eula=true"
    )

    # Configure and launch FactualKnowledge evaluation algorithm
    eval_output_all = []
    eval_algo = FactualKnowledge(FactualKnowledgeConfig("<OR>"))
    eval_output = eval_algo.evaluate(
        model=js_model_runner,
        dataset_config=config,
        prompt_template="$feature",
        save=True,
    )
    eval_output_all.append(eval_output)

    # Save the HTML evaluation report to S3
    s3_resource = boto3.resource("s3")
    output_bucket, output_index = parse_s3_url(preprocess_step_ret["s3_output_path"])
    
    html = markdown.markdown(str(EvalOutputCell(eval_output[0])))
    file_index = (
            output_index
            + "/"
            + model_name
            + "_"
            + eval_algo.eval_name
            + ".html"
    )

    s3_object = s3_resource.Object(bucket_name=output_bucket, key=file_index)
    s3_object.put(Body=html)

    return {"evaluation_output": eval_output_all, "model_name": model_name, "model_id":  model_id}
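The evaluate step scores each model with FMEval's `FactualKnowledge` algorithm, configured with `<OR>` as the delimiter between acceptable answers in `target_output`. As rough intuition only, the sketch below implements a simplified exact-inclusion score: a record scores 1.0 when any acceptable answer appears in the model output (case-insensitive) and 0.0 otherwise, and the dataset score is the mean. This is an assumption-labeled illustration with hypothetical helper names, not FMEval's implementation; its normalization rules may differ.

```python
def exact_inclusion_score(model_output: str, target_output: str, delimiter: str = "<OR>") -> float:
    """1.0 if any delimiter-separated target appears in the output (case-insensitive), else 0.0."""
    output = model_output.lower()
    targets = [t.strip().lower() for t in target_output.split(delimiter)]
    # Empty targets (e.g. from a trailing delimiter) never count as a match
    return 1.0 if any(t and t in output for t in targets) else 0.0


def dataset_score(pairs, delimiter: str = "<OR>") -> float:
    """Mean of the per-record scores over (model_output, target_output) pairs."""
    scores = [exact_inclusion_score(out, tgt, delimiter) for out, tgt in pairs]
    return sum(scores) / len(scores) if scores else 0.0
```

Because the evaluation dataset built in the preprocess step holds a single `correct_answer` per record, the delimiter never splits anything there; it only matters for datasets with several acceptable answers.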

In [None]:
finetune_ret = jumpstart_finetune(model_1, preprocess_step_ret)

# Deploy step is using the output from the finetune step (the training job name)
deploy_ret = jumpstart_deploy(model_1, finetune_ret)

# Evaluation step is using the output from preprocess (the S3 location of the evaluation dataset file) 
# and the output of the deploy step (the endpoint name)
evaluate_ret = evaluation(model_1, preprocess_step_ret, deploy_ret)

evaluation_results_ret_list.append(evaluate_ret)
deploy_ret_list.append(deploy_ret)

We repeat the process for the second model, which is deployed with its default weights: a **deploy** step followed by an **evaluate** step.
As before, the evaluation result is stored in the *evaluate_ret* object and appended to *evaluation_results_ret_list* for later use.

In [None]:
deploy_ret = jumpstart_deploy(model_2)

# Evaluation step is using the output from preprocess (the S3 location of the evaluation dataset file) 
# and the output of the deploy step (the endpoint name)
evaluate_ret = evaluation(model_2, preprocess_step_ret, deploy_ret)

evaluation_results_ret_list.append(evaluate_ret)
deploy_ret_list.append(deploy_ret)

### Select the best model and register it in SageMaker Model Registry
Now it's time to select the best model. To do so, we create a pipeline step dedicated to best-model **selection**.
The selection step uses the outputs of all the models' evaluation steps.
The output of the **selection** step is the best model's name and evaluation output, which we use in the **register** step.  
The **register** step also needs a model package group name and description.
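Conceptually, selection is an argmax over the evaluation scores, and the register step then needs the deploy output that belongs to the winning model (to know whether it was fine-tuned and, if so, its training job). A minimal sketch with hypothetical helpers and simplified inputs (`{"model_name", "score"}` dicts instead of FMEval objects, toy scores); the lookup assumes the deploy outputs are kept in the same order as the model names, as `model_list` and `deploy_ret_list` are in this notebook.

```python
def select_best(evaluations):
    """Return the evaluation with the highest score, or None for an empty list.

    Strict `>` keeps the first model on ties, like the selection step.
    """
    best = None
    for evaluation in evaluations:
        if best is None or evaluation["score"] > best["score"]:
            best = evaluation
    return best


def deploy_output_for(model_name, model_names, deploy_outputs):
    """Return the deploy output of `model_name`, assuming both lists share one order."""
    return deploy_outputs[model_names.index(model_name)]


evaluations = [
    {"model_name": "llama-2-7b_dft", "score": 0.62},  # toy scores, not real results
    {"model_name": "llama-2-7b_base", "score": 0.48},
]
best = select_best(evaluations)
deploy_outputs = [{"is_finetuned_model": True}, {"is_finetuned_model": False}]
best_deploy = deploy_output_for(best["model_name"], [e["model_name"] for e in evaluations], deploy_outputs)
```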

In [None]:
@step(name="selection")
def selection(*evaluate_ret_list):

    # Select best model.
    # We are evaluating Factual Knowledge thus we are looking for the maximum score

    max_score = -1
    best_model_name = None
    best_evaluation_output = None

    for evaluate_ret in evaluate_ret_list:
        print(evaluate_ret)
        model_name = evaluate_ret['model_name']

        eval_result = evaluate_ret['evaluation_output']
        eval_output = eval_result[0][0]
        eval_score = eval_output.dataset_scores[0].value
        print(eval_score)

        if eval_score > max_score:
            max_score = eval_score
            best_model_name = model_name
            best_evaluation_output = eval_result

    return {"evaluation_output": best_evaluation_output, "model_name": best_model_name}


@step(name="register_model")
def register(evaluate_ret, output_data_path, model_package_group_name, model_package_group_description, models, *deploy_rets):
    
    from sagemaker.model_metrics import ModelMetrics, MetricsSource
    from sagemaker.jumpstart.model import JumpStartModel
    from sagemaker.jumpstart.estimator import JumpStartEstimator
    from sagemaker.s3_utils import parse_s3_url
    from botocore.exceptions import ClientError
    import boto3
    import json

    sm_client = boto3.client("sagemaker")
    s3_client = boto3.client("s3")
    eval_result = evaluate_ret["evaluation_output"][0][0]

    # Look up the selected model and its deploy output.
    # deploy_rets must be in the same order as models (model_list in this notebook)
    model_index = [m["model_name"] for m in models].index(evaluate_ret["model_name"])
    model_id = models[model_index]["model_id"]
    deploy_ret = deploy_rets[model_index]

    # Upload the evaluation report of the best model to S3
    eval_report_s3_uri = output_data_path + "/evaluation-report/" + evaluate_ret["model_name"] + ".json"
    bucket, object_key = parse_s3_url(eval_report_s3_uri)
    eval_report_str = json.dumps(
        {
            "score": eval_result.dataset_scores[0].value,
            "algorithm": eval_result.dataset_scores[0].name,
        }
    )

    s3_client.put_object(Body=eval_report_str, Bucket=bucket, Key=object_key)

    # Create model_metrics from the evaluation report in S3
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=eval_report_s3_uri,
            content_type="application/json",
        )
    )

    # Create the model package group only if it does not exist yet
    try:
        sm_client.describe_model_package_group(
            ModelPackageGroupName=model_package_group_name
        )
    except ClientError:
        model_package_group_input_dict = {
            "ModelPackageGroupName": model_package_group_name,
            "ModelPackageGroupDescription": model_package_group_description,
        }
        create_model_package_group_response = sm_client.create_model_package_group(
            **model_package_group_input_dict
        )
        print(
            "ModelPackageGroup Arn : {}".format(
                create_model_package_group_response["ModelPackageGroupArn"]
            )
        )

    # Register the model
    if deploy_ret["is_finetuned_model"] is True:
        # Fine-tuned model: register the artifacts of its training job
        training_job_name = deploy_ret["training_job_name"]

        estimator = JumpStartEstimator.attach(training_job_name, model_id=model_id)
        model_package = estimator.register(
            model_package_group_name=model_package_group_name,
            content_types=["application/json"],
            response_types=["application/json"],
            customer_metadata_properties={
                "score": str(eval_result.dataset_scores[0].value),
                "algorithm": eval_result.dataset_scores[0].name,
            },
            model_metrics=model_metrics,
        )
    else:
        # Base model: register the JumpStart model with its default weights
        js_model = JumpStartModel(model_id=model_id)

        if isinstance(js_model.model_data, dict) and 'S3DataSource' in js_model.model_data:
            js_model.model_data = js_model.model_data['S3DataSource']['S3Uri']

        model_package = js_model.register(
            model_package_group_name=model_package_group_name,
            image_uri=js_model.image_uri,
            content_types=["application/json"],
            response_types=["application/json"],
            customer_metadata_properties={
                "score": str(eval_result.dataset_scores[0].value),
                "algorithm": eval_result.dataset_scores[0].name,
            },
            skip_model_validation="All",
            model_metrics=model_metrics,
        )

    model_package_arn = model_package.model_package_arn

    return {"model_package_arn": model_package_arn}

In [None]:
# The selection step uses the outputs of the evaluation steps of all the models
selection_ret = selection(*evaluation_results_ret_list)

# Set a model package group name and description
model_package_group_name = "PipelineExampleDFTFinetune"
model_package_group_description = "Pipeline Example DFT Finetune"

# Register the best model in SageMaker Model Registry. The selection step returns the best model name;
# model_list and deploy_ret_list (in the same order) let the register step find its model ID and deployment
register_ret = register(selection_ret,
                        output_data_path,
                        model_package_group_name,
                        model_package_group_description,
                        model_list,
                        *deploy_ret_list)
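The register step writes a small JSON report next to the pipeline outputs and references it as model statistics in the model package. The stdlib sketch below reproduces the report's S3 location and body with hypothetical helpers; `split_s3_uri` is a stand-in that behaves like the `parse_s3_url` helper imported from the SageMaker SDK for plain `s3://bucket/key` URIs, and the values in the test are toy values.

```python
import json
from urllib.parse import urlparse


def split_s3_uri(uri: str):
    """Split s3://bucket/key into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def evaluation_report(output_data_path: str, model_name: str, score: float, algorithm: str):
    """Build the bucket, key and JSON body of the evaluation report for one model."""
    uri = f"{output_data_path}/evaluation-report/{model_name}.json"
    bucket, key = split_s3_uri(uri)
    body = json.dumps({"score": score, "algorithm": algorithm})
    return bucket, key, body
```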

### Cleanup
The last pipeline step, **cleanup**, deletes the endpoints and endpoint configurations that the pipeline creates.

In [None]:
@step(name="cleanup")
def cleanup(register_ret, *deploy_rets):
    import boto3

    client = boto3.client('sagemaker')
    for deploy_ret in deploy_rets:
        # Assumes each endpoint config was created with the same name as its endpoint
        client.delete_endpoint(EndpointName=deploy_ret["model_endpoint"])
        client.delete_endpoint_config(EndpointConfigName=deploy_ret["model_endpoint"])

    return {"cleanup_done": True}
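The cleanup step deletes endpoints and endpoint configurations, but the SageMaker model objects created at deploy time are left in place. If you also want to remove those, one possible sketch (hypothetical helper `cleanup_endpoints`) reads each endpoint config before deleting it and then deletes the models listed in its `ProductionVariants`. It takes the SageMaker client as a parameter so it can be tried with a stub; with a real `boto3.client("sagemaker")` it issues the same `delete_endpoint` and `delete_endpoint_config` calls as the step above, plus `describe_endpoint_config` and `delete_model`. Like the step above, it assumes each endpoint config shares its endpoint's name.

```python
def cleanup_endpoints(client, endpoint_names):
    """Delete each endpoint, its endpoint config, and the models it serves.

    `client` is a boto3 SageMaker client or any object exposing the same
    four methods. Returns the names of the deleted models.
    """
    deleted_models = []
    for name in endpoint_names:
        # Read the config first: it cannot be described once it is deleted
        config = client.describe_endpoint_config(EndpointConfigName=name)
        client.delete_endpoint(EndpointName=name)
        client.delete_endpoint_config(EndpointConfigName=name)
        for variant in config["ProductionVariants"]:
            client.delete_model(ModelName=variant["ModelName"])
            deleted_models.append(variant["ModelName"])
    return deleted_models
```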

In [None]:
# Passing register_ret makes the cleanup step run only after the register step has finished
cleanup_ret = cleanup(register_ret, *deploy_ret_list)

### Creating and launching the pipeline
We are finally ready to create and launch the pipeline, but first we need to create the *requirements.txt* file.
As a best practice, we read the version of the sagemaker library used to create the pipeline and pin it in the requirements file.
Using the same sagemaker version when creating and when running the pipeline avoids deserialization issues.

In [None]:
if os.path.exists("requirements.txt"):
    os.remove("requirements.txt")

with open('requirements.txt', 'w') as req_file:
    req_file.write("fmeval==0.4.0\n")
    req_file.write("sagemaker==" + str(sagemaker.__version__) + "\n")
    req_file.write("datasets\n")
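The cell above writes the pins directly. A slightly more defensive sketch (hypothetical helpers `render_requirements` and `find_pin_mismatches`) renders the same file content and compares each `==` pin with the version installed in the notebook environment via `importlib.metadata`, so a mismatch between the authoring environment and the pipeline containers shows up before the pipeline is upserted.

```python
from importlib.metadata import PackageNotFoundError, version


def render_requirements(pinned, unpinned=()):
    """Render requirements.txt content: exact `name==version` pins, then unpinned names."""
    lines = [f"{name}=={ver}" for name, ver in pinned.items()]
    lines.extend(unpinned)
    return "\n".join(lines) + "\n"


def find_pin_mismatches(requirements_text, lookup=version):
    """Return {package: (pinned, installed)} for every pin that differs locally.

    `installed` is None when the package is not installed in this environment.
    """
    mismatches = {}
    for line in requirements_text.splitlines():
        if "==" not in line:
            continue  # unpinned requirement, nothing to compare
        name, pinned = (part.strip() for part in line.split("==", 1))
        try:
            installed = lookup(name)
        except PackageNotFoundError:
            installed = None
        if installed != pinned:
            mismatches[name] = (pinned, installed)
    return mismatches
```

In the notebook you would build the text with `render_requirements({"fmeval": "0.4.0", "sagemaker": sagemaker.__version__}, ["datasets"])` and expect `find_pin_mismatches` on it to return an empty dict.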

In the last cell of this notebook we create the pipeline, serialize it to S3, and start it.
Don't forget to pass an execution role with sufficient permissions, and to pass the return value of the last step of our pipeline (*cleanup_ret*) in `steps`: the other steps are included through their data dependencies.
We are now ready to start the pipeline execution!

In [None]:
from sagemaker import get_execution_role
role = get_execution_role()

pipeline = Pipeline(name=pipeline_name, steps=[cleanup_ret])
pipeline.upsert(role)
pipeline.start()