# Direct Preference Optimization (DPO) of Amazon Nova using Amazon SageMaker Training Job

You can customize Amazon Nova models through base recipes using Amazon SageMaker training jobs. These recipes support Supervised Fine-Tuning (SFT) and Direct Preference Optimization (DPO), with both Full-Rank and Low-Rank Adaptation (LoRA) options.

The end-to-end customization workflow involves stages like model training, model evaluation, and deployment for inference. This model customization approach on SageMaker AI provides greater flexibility and control to fine-tune its supported Amazon Nova models, optimize hyperparameters with precision, and implement techniques including LoRA Parameter-Efficient Fine-Tuning (PEFT), Full-Rank Supervised Fine-Tuning, and Direct Preference Optimization (DPO).


This notebook demonstrates Direct Preference Optimization (DPO) of Amazon Nova using Amazon SageMaker Training Job. DPO is a technique that allows fine-tuning language models based on human preferences, enabling the model to better align with human values and preferences.


> **Note:** This notebook demonstrates fine-tuning using Nova Lite, but the same techniques can be applied to Nova Pro or Nova Micro models with appropriate adjustments to the configuration.

## Installing Dependencies


The first cell installs the required Python packages for this notebook. For details on the other prerequisites, see the [AWS Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/nova-model-general-prerequisites.html).

In [1]:
! pip install -r ./requirements.txt --upgrade

Processing ./sagemaker-2.245.1.dev0.tar.gz
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Building wheels for collected packages: sagemaker
  Building wheel for sagemaker (pyproject.toml) ... done
  Created wheel for sagemaker: filename=sagemaker-2.245.1.dev0-py3-none-any.whl size=1651861 sha256=a3200be025a9fd1e2aba119b05d84e5372ca6ffa71734d74198a911e63ebdbe6
  Stored in directory: /home/ec2-user/.cache/pip/wheels/df/ad/4f/d0e921991270f56ea2eb5a16205186ec9076ae1f25fa66df41
Successfully built sagemaker
Installing collected packages: sagemaker
  Attempting uninstall: sagemaker
    Found existing installation: sagemaker 2.245.1.dev0
    Uninstalling sagemaker-2.245.1.dev0:
      Successfully uninstalled sagemaker-2.245.1.dev0
Successfully installed sagemaker-2.245.1.dev0


***

## Step 0: Prerequisites

This section sets up the necessary AWS credentials and SageMaker session to run the notebook. You'll need proper IAM permissions to use SageMaker.


If you are going to use SageMaker in a local environment, you need access to an IAM role with the required permissions for SageMaker. You can find out more in the [SageMaker roles documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html).

The code initializes a SageMaker session, sets up the IAM role, and configures the S3 bucket for storing training data and model artifacts.


In [2]:
import sagemaker
import boto3

sess = sagemaker.Session()
sagemaker_session_bucket = None

if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    role = iam.get_role(RoleName="sagemaker_execution_role")["Role"]["Arn"]

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
bucket_name = sess.default_bucket()
default_prefix = sess.default_bucket_prefix

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

  from pandas.core.computation.check import NUMEXPR_INSTALLED
Unable to load JumpStart region config.
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/jumpstart/constants.py", line 69, in _load_region_config
    with open(filepath) as f:
FileNotFoundError: [Errno 2] No such file or directory: '/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/jumpstart/region_config.json'


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
sagemaker role arn: arn:aws:iam::905418197933:role/service-role/AmazonSageMaker-ExecutionRole-20240807T161358
sagemaker bucket: sagemaker-us-east-1-905418197933
sagemaker session region: us-east-1


***

### Direct Preference Optimization
#### Preference optimization

Direct Preference Optimization (DPO) is an efficient fine-tuning method for Large Language Models (LLMs) that uses paired comparison data to align model outputs with human preferences. This approach enables direct optimization of model behavior based on human feedback about which responses are more desirable.

#### Why preference optimization matters
LLMs trained on large-scale data often generate outputs that may be factually correct but fail to align with specific user needs, organizational values, or safety requirements. Preference optimization addresses this gap by allowing organizations to:

* Fine-tune models toward desired behavior patterns
* Reduce unwanted outputs or harmful responses
* Align model responses with brand voice and communication guidelines
* Improve response quality based on domain expert feedback

#### How DPO works
DPO uses paired examples where human evaluators have indicated which of two possible responses is preferred. The model learns to maximize the likelihood of generating preferred responses while minimizing undesired ones. You can implement DPO using either:

* Full-rank DPO: Updates all model parameters to optimize for preferences
* LoRA-based DPO: Uses lightweight adapters to learn preference alignments, requiring fewer computational resources
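
To make the objective concrete, here is a minimal sketch of the per-example DPO loss from the paper cited below, written in plain Python. It assumes you already have the summed log-probability of each response under the policy being trained and under a frozen reference model. The function name, the example numbers, and the `beta` value are illustrative assumptions, not what the Nova recipe uses internally.

```python
import math


def dpo_loss(policy_chosen_logp, policy_rejected_logp,
             ref_chosen_logp, ref_rejected_logp, beta=0.1):
    """Per-example DPO loss from sequence log-probabilities (illustrative)."""
    # How much more the policy favors each response than the reference does
    chosen_logratio = policy_chosen_logp - ref_chosen_logp
    rejected_logratio = policy_rejected_logp - ref_rejected_logp
    margin = beta * (chosen_logratio - rejected_logratio)
    # -log(sigmoid(margin)), written as log(1 + exp(-margin))
    return math.log1p(math.exp(-margin))


# Policy identical to the reference: the margin is 0, so the loss is log(2)
loss_at_start = dpo_loss(-12.0, -15.0, -12.0, -15.0)

# Policy has shifted probability toward the preferred response: lower loss
loss_after_shift = dpo_loss(-10.0, -17.0, -12.0, -15.0)

print(loss_at_start, loss_after_shift)
```

The loss falls as the policy raises the preferred response's likelihood relative to the reference and lowers the non-preferred one, which is the behavior described above.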

#### When to choose preference optimization
We recommend using DPO under the following circumstances:

* When optimizing for subjective outputs that require alignment with specific human preferences
* When you need to adjust the model’s tone, style, or content characteristics to match desired response patterns
* When making targeted improvements to an existing model based on user feedback and error analysis
* When you need to maintain consistent output quality across different use cases
* When implementing safety guardrails through preferred response patterns

DPO is particularly effective for iterative refinement of model behavior through carefully curated preference datasets that demonstrate desired versus undesired outputs. The method’s flexibility in supporting both full-rank and LoRA-based approaches allows organizations to choose the most suitable implementation based on their computational resources and specific requirements.

![imgs/dpo_sft.png](imgs/dpo_sft.png)

Source: https://arxiv.org/pdf/2305.18290


## Step 1: Prepare the dataset

In this example, we load the [nvidia/When2Call](https://huggingface.co/datasets/nvidia/When2Call) dataset, an open-source dataset and model suite focused on enabling and improving function calling capabilities for large language models (LLMs).

### Step 1.1: Data Loading

This code loads the When2Call dataset from Hugging Face, specifically the "train_pref" split which contains preference data needed for DPO.

In [3]:
from datasets import load_dataset

dataset = load_dataset("nvidia/When2Call", "train_pref", split="train")

dataset

README.md: 0.00B [00:00, ?B/s]

when2call_train_pref.jsonl:   0%|          | 0.00/17.4M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/9000 [00:00<?, ? examples/s]

Dataset({
    features: ['tools', 'messages', 'chosen_response', 'rejected_response'],
    num_rows: 9000
})

Converting the dataset to a pandas DataFrame makes it easier to work with and manipulate.


In [4]:
import pandas as pd

df = pd.DataFrame(dataset)

df.head()

Unnamed: 0,tools,messages,chosen_response,rejected_response
0,"[{""name"": ""get_ico_calendar"", ""description"": ""...","[{'role': 'user', 'content': 'Show me complete...","{'role': 'assistant', 'content': '<TOOLCALL>[{...","{'role': 'assistant', 'content': 'Which langua..."
1,"[{""name"": ""monthly_mortgage_payment"", ""descrip...","[{'role': 'user', 'content': 'Find the 99% con...","{'role': 'assistant', 'content': 'Apologies, b...","{'role': 'assistant', 'content': 'To calculate..."
2,"[{""name"": ""rgb_to_cmyk"", ""description"": ""Conve...","[{'role': 'user', 'content': 'Convert RGB colo...","{'role': 'assistant', 'content': '<TOOLCALL>[{...","{'role': 'assistant', 'content': 'The CMYK equ..."
3,"[{""name"": ""whole_foods_order"", ""description"": ...","[{'role': 'user', 'content': 'Place a small or...","{'role': 'assistant', 'content': '<TOOLCALL>[{...","{'role': 'assistant', 'content': 'Which type o..."
4,"[{""name"": ""get_channels"", ""description"": ""Retr...","[{'role': 'user', 'content': 'I am interested ...","{'role': 'assistant', 'content': 'To assist yo...","{'role': 'assistant', 'content': '<TOOLCALL>[{..."


### Step 1.2: Train/Val/Test Split

The dataset is split in two steps: 10% is held out for testing, and 2% of the remaining rows are held out for validation. This yields training (88.2%), validation (1.8%), and test (10%) sets.

In [5]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(df, test_size=0.1, random_state=42)
train, val = train_test_split(train, test_size=0.02, random_state=42)

print("Number of train elements: ", len(train))
print("Number of test elements: ", len(test))
print("Number of val elements: ", len(val))

Number of train elements:  7938
Number of test elements:  900
Number of val elements:  162
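
The row counts above follow directly from applying the two fractions in sequence. The short check below reproduces the arithmetic for the 9,000-row dataset. It uses plain rounding as a simplification and does not call scikit-learn.

```python
total = 9000

test_n = round(total * 0.1)        # first split: 10% held out for test
remaining = total - test_n
val_n = round(remaining * 0.02)    # second split: 2% of the remaining rows
train_n = remaining - val_n

for name, n in [("train", train_n), ("val", val_n), ("test", test_n)]:
    print(f"{name}: {n} rows ({n / total:.1%})")
```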


### Understanding the Nova Format for DPO

Let's format the dataset by using the prompt style for Amazon Nova:

```
{
    "system": [{"text": Content of the System prompt}],
    "messages": [
        {
            "role": "user",
            "content": [{"text": Content of the user prompt}]
        },
        {
            "role": "assistant",
            "content": [{"text": Content of the answer}]
        },
        ...
        {
            "role": "assistant",
            "candidates": [
                {
                    "content": [{"text": Content of the preferred answer}],
                    "preferenceLabel": "preferred"
                },
                {
                    "content": [{"text": Content of the non-preferred answer}],
                    "preferenceLabel": "non-preferred"
                }
            ]
        }
    ]
}
```
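
As a sanity check on this structure, the sketch below inspects one record and reports anything that deviates from the layout described above. `validate_dpo_record` is a hypothetical helper written for this notebook, not part of any AWS SDK, and the rules it enforces are assumptions drawn only from the format shown here.

```python
def validate_dpo_record(record):
    """Return a list of problems found in one preference record (empty if none)."""
    messages = record.get("messages", [])
    if not messages:
        return ["record has no messages"]

    last = messages[-1]
    if last.get("role") != "assistant" or "candidates" not in last:
        return ["last message must be an assistant turn with 'candidates'"]

    problems = []
    labels = [c.get("preferenceLabel") for c in last["candidates"]]
    if len(labels) != 2 or labels.count("preferred") != 1 \
            or labels.count("non-preferred") != 1:
        problems.append("expected one 'preferred' and one 'non-preferred' candidate")

    for candidate in last["candidates"]:
        content = candidate.get("content") or []
        if not any(isinstance(p, dict) and p.get("text") for p in content):
            problems.append("candidate has no text content")
    return problems


# Illustrative record; the texts are placeholders
record = {
    "system": [{"text": "You are a helpful assistant."}],
    "messages": [
        {"role": "user", "content": [{"text": "Convert RGB 255,0,0 to CMYK."}]},
        {"role": "assistant", "candidates": [
            {"content": [{"text": "good answer"}], "preferenceLabel": "preferred"},
            {"content": [{"text": "bad answer"}], "preferenceLabel": "non-preferred"},
        ]},
    ],
}
print(validate_dpo_record(record))
```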

### Step 1.3: Data Preprocessing 
The notebook defines utility functions to clean the dataset content by removing prefixes and handling special cases:

```python
def clean_prefix(content):
    # Removes prefixes like "USER:", "ASSISTANT:", etc.
    ...

def clean_message_list(message_list):
    # Cleans message lists from None values and converts to proper format
    ...

def clean_numbered_conversation(message_list):
    # Removes "User:" / "Assistant:" role prefixes from numbered lines like "1. User: ..."
    ...
```

In [6]:
import json
import re


def clean_prefix(content):
    """Remove prefixes from content, according to Nova data_validator"""
    prefixes = [
        "SYSTEM:",
        "System:",
        "USER:",
        "User:",
        "ASSISTANT:",
        "Assistant:",
        "Bot:",
        "BOT:",
    ]

    # Handle array case (list of content items)
    if hasattr(content, "__iter__") and not isinstance(content, str):
        for i, item in enumerate(content):
            if isinstance(item, dict) and "text" in item:
                text = item["text"]
                if isinstance(text, str):
                    # Clean line by line for multi-line text
                    lines = text.split("\n")
                    cleaned_lines = []
                    for line in lines:
                        cleaned_line = line.strip()
                        for prefix in prefixes:
                            if cleaned_line.startswith(prefix):
                                cleaned_line = cleaned_line[len(prefix) :].strip()
                                break
                        cleaned_lines.append(cleaned_line)
                    item["text"] = "\n".join(cleaned_lines)
        return content

    # Handle string case
    if isinstance(content, str):
        lines = content.split("\n")
        cleaned_lines = []
        for line in lines:
            cleaned_line = line.strip()
            for prefix in prefixes:
                if cleaned_line.startswith(prefix):
                    cleaned_line = cleaned_line[len(prefix) :].strip()
                    break
            cleaned_lines.append(cleaned_line)
        return "\n".join(cleaned_lines)

    return content


def clean_message_list(message_list):
    """Clean message list from None values and convert to list of dicts if needed."""
    if isinstance(message_list, str):
        message_list = json.loads(message_list)

    tmp_cleaned = []
    for msg in message_list:
        new_msg = {}
        for key, value in msg.items():
            if key in ["candidates", "content"]:
                if value is None or str(value).lower() == "none":
                    continue
            new_msg[key] = value
        tmp_cleaned.append(new_msg)

    cleaned = []
    for item in tmp_cleaned:
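        # Only the final assistant turn carries candidates; earlier assistant
        # turns carry plain content and are handled by the else branch.
        if item["role"] == "assistant" and "candidates" in item: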
            # Clean prefixes from candidates content
            if "candidates" in item:
                candidates = item["candidates"]
                for candidate in candidates:
                    if isinstance(candidate, dict) and "content" in candidate:
                        content = candidate["content"]
                        for content_item in content:
                            if (
                                isinstance(content_item, dict)
                                and "text" in content_item
                            ):
                                # First clean numbered conversation format
                                text = clean_numbered_conversation(content_item["text"])
                                # Then clean regular prefixes
                                content_item["text"] = clean_prefix(text)
            cleaned.append({"role": item["role"], "candidates": item["candidates"]})
        else:
            content = item["content"]
            for content_item in content:
                if isinstance(content_item, dict) and "text" in content_item:
                    text = clean_numbered_conversation(content_item["text"])
                    content_item["text"] = clean_prefix(text)
            cleaned.append({"role": item["role"], "content": content})

    return cleaned


# Additional function to specifically handle the numbered conversation format
def clean_numbered_conversation(text):
    """Clean numbered conversation format like '1. User: ...'"""
    if not isinstance(text, str):
        return text

    # Pattern to match numbered items with User: or Assistant: prefixes
    pattern = r"(\d+\.\s*)(User:|Assistant:)\s*"

    # Replace the pattern, keeping the number but removing the role prefix
    cleaned_text = re.sub(pattern, r"\1", text)

    return cleaned_text
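
To see what the numbered-conversation cleanup does, the snippet below applies the same regular expression to a made-up two-line string. The input text is an illustrative assumption and does not come from the dataset.

```python
import re

# Same pattern as clean_numbered_conversation: keep the number, drop the role prefix
pattern = r"(\d+\.\s*)(User:|Assistant:)\s*"

raw = "1. User: What is the weather?\n2. Assistant: Let me check."
cleaned = re.sub(pattern, r"\1", raw)

print(cleaned)
```

The numbering is preserved while the `User:` and `Assistant:` markers are removed, so the text no longer contains the role prefixes that `clean_prefix` is written to strip.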

Now let's define the functions that parse the datasets.

### Dataset Parsing Functions

These functions transform the dataset into the format required by Nova models, handling tool calls and formatting:

```python
def transform_tool_format(tool):
    # Transforms tool format to Nova's expected format
    ...

def extract_toolcall_content(text):
    # Extracts content between <TOOLCALL> tags
    ...

def prepare_dataset(sample):
    # Prepares dataset in the required format for Nova models
    ...

def prepare_dataset_test(sample):
    # Formats validation dataset for evaluation
    ...
```

In [9]:
import json
import re


def transform_tool_format(tool):
    """Transform tool from old format to Nova format."""
    if isinstance(tool, str):
        tool = json.loads(tool)

    return {
        "toolSpec": {
            "name": tool["name"],
            "description": tool["description"],
            "inputSchema": {"json": tool["parameters"]},
        }
    }


def extract_toolcall_content(text):
    """Extract content between <TOOLCALL> tags if present."""
    if isinstance(text, dict):
        if text.get("content"):
            text = text.get("content")

    if "<TOOLCALL>" in text and "</TOOLCALL>" in text:
        pattern = r"<TOOLCALL>(.*?)</TOOLCALL>"
        match = re.search(pattern, text, re.DOTALL)
        if match:
            tool_calls_text = []
            if isinstance(match.group(1), str):
                tools = json.loads(match.group(1))

            for tool_call in tools:
                arguments = (
                    json.loads(tool_call["arguments"])
                    if isinstance(tool_call["arguments"], str)
                    else tool_call["arguments"]
                )
                tool_call_json = {
                    "name": tool_call["name"],
                    "parameters": arguments,
                }
                tool_calls_text.append(json.dumps(tool_call_json))

            return "".join(tool_calls_text)
    return text


def prepare_dataset(sample):
    """Prepare dataset in the required format for Nova models"""
    # Add user messages
    result = {"system": [], "messages": []}

    if isinstance(sample["tools"], str):
        tools = json.loads(sample["tools"]) if sample.get("tools") else []
    else:
        tools = sample["tools"]

    transformed_tools = [transform_tool_format(tool) for tool in tools]

    # Add system message with tools if tools exist
    if transformed_tools:
        tools_dict = {"tools": transformed_tools}
        system_text = (
            "You may call one or more functions to assist with the user query.\n\n"
            "You are provided with function signatures within <tools></tools> XML tags:\n"
            "<tools>\n"
            f"{json.dumps(tools_dict)}\n"
            "</tools>\n\n"
            "For each function call, return a json object with function name and parameters:\n"
            '{"name": function name, "parameters": dictionary of argument name and its value}'
        )
        result["system"] = [{"text": system_text}]

    for message in sample.get("messages", []):
        result["messages"].append(
            {
                "role": message["role"],
                "content": [{"text": extract_toolcall_content(message["content"])}],
            }
        )

    chosen = extract_toolcall_content(sample["chosen_response"])
    rejected = extract_toolcall_content(sample["rejected_response"])

    result["messages"].append(
        {
            "role": "assistant",
            "candidates": [
                {"content": [{"text": chosen}], "preferenceLabel": "preferred"},
                {"content": [{"text": rejected}], "preferenceLabel": "non-preferred"},
            ],
        }
    )

    return result
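
The tool-call conversion can be illustrated in isolation. The sketch below is a simplified, self-contained version of the `<TOOLCALL>` extraction above (it assumes `arguments` is already a dict), applied to a made-up response string. `toolcall_to_nova` is a hypothetical name used only for this example.

```python
import json
import re


def toolcall_to_nova(text):
    """Rewrite '<TOOLCALL>[...]</TOOLCALL>' into name/parameters JSON objects."""
    match = re.search(r"<TOOLCALL>(.*?)</TOOLCALL>", text, re.DOTALL)
    if not match:
        return text  # plain-text answers pass through unchanged
    calls = json.loads(match.group(1))
    return "".join(
        json.dumps({"name": call["name"], "parameters": call["arguments"]})
        for call in calls
    )


raw = (
    '<TOOLCALL>[{"name": "search_gifs", '
    '"arguments": {"query": "cute puppies", "number": 5}}]</TOOLCALL>'
)
converted = toolcall_to_nova(raw)
print(converted)
```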

In [10]:
def prepare_dataset_test(sample):
    """Parse sample and format it for validation dataset."""
    # Process tools upfront
    if isinstance(sample["tools"], str):
        tools = json.loads(sample["tools"]) if sample.get("tools") else []
    else:
        tools = sample["tools"]

    transformed_tools = [transform_tool_format(tool) for tool in tools]

    # Initialize variables
    system_content = ""
    current_input = ""

    # Add system message with tools if tools exist
    if transformed_tools:
        system_content = (
            "You may call one or more functions to assist with the user query.\n\n"
            "You are provided with function signatures within <tools></tools> XML tags:\n"
            "<tools>\n"
            f"{json.dumps({'tools': transformed_tools})}\n"
            "</tools>\n\n"
            "For each function call, return a json object with function name and parameters:\n"
            '{"name": function name, "parameters": dictionary of argument name and its value}'
        )

    for message in sample.get("messages", []):
        if message["role"] == "user":
            current_input = (
                current_input
                + "\n##User: "
                + extract_toolcall_content(message["content"])
            )
        else:
            current_input = (
                current_input
                + "\n##Assistant: "
                + extract_toolcall_content(message["content"])
            )

    if current_input.startswith("\n"):
        current_input = current_input[1:]

    current_input = current_input.strip()

    chosen = extract_toolcall_content(sample["chosen_response"])

    return {"system": system_content, "query": current_input, "response": chosen}
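
The `query` field flattens the conversation history into a single string with `##User:` and `##Assistant:` markers. A compact sketch of that flattening, using made-up messages with plain-string content, looks like this. `build_query` is a hypothetical helper name.

```python
def build_query(messages):
    """Join a message list into the '##User: ... / ##Assistant: ...' query string."""
    parts = []
    for message in messages:
        tag = "##User: " if message["role"] == "user" else "##Assistant: "
        parts.append(tag + message["content"])
    return "\n".join(parts).strip()


history = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello, how can I help?"},
    {"role": "user", "content": "Book a table for two."},
]
query = build_query(history)
print(query)
```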

### Step 1.4: Data Preparation in Converse Format for Train and Validation Datasets

The notebook applies the functions to transform the datasets into the required formats:


In [11]:
from datasets import Dataset, DatasetDict
from random import randint

train_dataset = Dataset.from_pandas(train)
test_dataset = Dataset.from_pandas(test)
val_dataset = Dataset.from_pandas(val)

dataset = DatasetDict(
    {"train": train_dataset, "test": test_dataset, "val": val_dataset}
)

train_dataset = dataset["train"].map(
    prepare_dataset, remove_columns=train_dataset.features
)

train_dataset = train_dataset.to_pandas()

train_dataset["messages"] = train_dataset["messages"].apply(clean_message_list)

# randint is inclusive on both ends, so the upper bound must be len - 1
print(train_dataset.iloc[randint(0, len(train_dataset) - 1)].to_json())

val_dataset = dataset["val"].map(
    prepare_dataset, remove_columns=val_dataset.features
)

val_dataset = val_dataset.to_pandas()

val_dataset["messages"] = val_dataset["messages"].apply(clean_message_list)

Map:   0%|          | 0/7938 [00:00<?, ? examples/s]

{"messages":[{"role":"user","content":[{"text":"Count the occurrences of each fruit in the given list."}]},{"role":"assistant","candidates":[{"content":[{"text":"To assist you better, could you please provide me with the list of fruits?"}],"preferenceLabel":"preferred"},{"content":[{"text":"I'm sorry for the inconvenience, but I'm currently unable to perform tasks such as counting occurrences in a list."}],"preferenceLabel":"non-preferred"}]}],"system":[{"text":"You may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools><\/tools> XML tags:\n<tools>\n{\"tools\": [{\"toolSpec\": {\"name\": \"get_range\", \"description\": \"Helper function to format the range string.\", \"inputSchema\": {\"json\": {\"type\": \"dict\", \"properties\": {\"start\": {\"description\": \"The start of the range.\", \"type\": \"int\"}, \"end\": {\"description\": \"The end of the range.\", \"type\": \"int\"}}}}}}, {\"toolSpec\": {\"name\": \"count_o

Map:   0%|          | 0/162 [00:00<?, ? examples/s]

### Step 1.5: Data Preparation of Test Data for Offline Evaluation After Fine-Tuning

Let's convert the test dataset into the `gen_qa` evaluation format, which uses the following fields:

Required Fields:

* query: String containing the question or instruction that needs an answer
* response: String containing the expected model output

Optional Fields:

* system: String containing the system prompt that sets the behavior, role, or personality of the AI model before it processes the query

Example Entry
```

{
   "system":"You are a english major with top marks in class who likes to give minimal word responses: ",
   "query":"What is the symbol that ends the sentence as a question",
   "response":"?"
}
{
   "system":"You are a pattern analysis specialist that provides succinct answers: ",
   "query":"What is the next number in this series? 1, 2, 4, 8, 16, ?",
   "response":"32"
}
{
   "system":"You have great attention to detail that follows instructions accurately: ",
   "query":"Repeat only the last two words of the following: I ate a hamburger today and it was kind of dry",
   "response":"of dry"
}
```
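
Before uploading an evaluation file, it can help to check each line against the field list above. The sketch below is a hypothetical checker (`check_gen_qa_line` is not an AWS API) that enforces only the required and optional fields described in this section.

```python
import json


def check_gen_qa_line(line):
    """Return a list of problems for one JSONL line (empty if it looks valid)."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError as err:
        return [f"not valid JSON: {err}"]
    if not isinstance(record, dict):
        return ["line is not a JSON object"]

    problems = []
    for field in ("query", "response"):
        if not isinstance(record.get(field), str):
            problems.append(f"missing or non-string required field: {field}")
    if "system" in record and not isinstance(record["system"], str):
        problems.append("optional field 'system' must be a string")
    return problems


good = '{"system": "Be brief.", "query": "1, 2, 4, 8, 16, ?", "response": "32"}'
bad = '{"query": "What ends a question?"}'
print(check_gen_qa_line(good))
print(check_gen_qa_line(bad))
```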

In [12]:
test_dataset = dataset["test"].map(
    prepare_dataset_test, remove_columns=test_dataset.features
)

# randint is inclusive on both ends, so the upper bound must be len - 1
print(test_dataset[randint(0, len(test_dataset) - 1)])

Map:   0%|          | 0/900 [00:00<?, ? examples/s]

{'system': 'You may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>\n{"tools": [{"toolSpec": {"name": "search_gifs", "description": "Search for gifs based on a query using the Humor API.", "inputSchema": {"json": {"type": "dict", "properties": {"query": {"description": "The search query to find matching gifs.", "type": "str", "default": "cat"}, "number": {"description": "The number of results to retrieve, between 1 and 10. Defaults to 3.", "type": "int, optional", "default": 3}}}}}}]}\n</tools>\n\nFor each function call, return a json object with function name and parameters:\n{"name": function name, "parameters": dictionary of argument name and its value}', 'query': '##User: I need 5 gifs of cute puppies.', 'response': '{"name": "search_gifs", "parameters": {"query": "cute puppies", "number": 5}}'}


### Step 1.6: Upload all 3 curated datasets (train, test, val) to Amazon S3

The processed datasets are saved locally and then uploaded to Amazon S3 for use in SageMaker training:



In [13]:
import boto3
import shutil

In [14]:
s3_client = boto3.client("s3")

# save train_dataset to s3 using our SageMaker session
if default_prefix:
    input_path = f"{default_prefix}/datasets/nova-dpo"
else:
    input_path = f"datasets/nova-dpo"

train_dataset_s3_path = f"s3://{bucket_name}/{input_path}/train/dataset.jsonl"
val_dataset_s3_path = f"s3://{bucket_name}/{input_path}/val/dataset.jsonl"
test_dataset_s3_path = f"s3://{bucket_name}/{input_path}/test/gen_qa.jsonl"
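
The three paths above differ only in the split folder and file name, with the optional session prefix placed in front. A small helper makes that layout explicit. `dataset_uri` and the bucket and prefix values are hypothetical and used only for illustration.

```python
def dataset_uri(bucket, split, filename, prefix=None):
    """Build s3://<bucket>/[<prefix>/]datasets/nova-dpo/<split>/<filename>."""
    parts = [p for p in (prefix, "datasets/nova-dpo", split, filename) if p]
    return f"s3://{bucket}/" + "/".join(parts)


# Without a default bucket prefix
print(dataset_uri("my-bucket", "train", "dataset.jsonl"))

# With a default bucket prefix configured on the session
print(dataset_uri("my-bucket", "test", "gen_qa.jsonl", prefix="team-a"))
```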

In [15]:
import os

# Save datasets to s3
os.makedirs("./data/train", exist_ok=True)
os.makedirs("./data/test", exist_ok=True)
os.makedirs("./data/val", exist_ok=True)

train_dataset.to_json("./data/train/dataset.jsonl", orient="records", lines=True)
val_dataset.to_json("./data/val/dataset.jsonl", orient="records", lines=True)
test_dataset.to_json("./data/test/gen_qa.jsonl")

s3_client.upload_file(
    "./data/train/dataset.jsonl", bucket_name, f"{input_path}/train/dataset.jsonl"
)

s3_client.upload_file(
    "./data/val/dataset.jsonl", bucket_name, f"{input_path}/val/dataset.jsonl"
)

s3_client.upload_file(
    "./data/test/gen_qa.jsonl", bucket_name, f"{input_path}/test/gen_qa.jsonl"
)

shutil.rmtree("./data")

print(f"Training data uploaded to:")
print(train_dataset_s3_path)
print(test_dataset_s3_path)
print(val_dataset_s3_path)

Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Training data uploaded to:
s3://sagemaker-us-east-1-905418197933/datasets/nova-dpo/train/dataset.jsonl
s3://sagemaker-us-east-1-905418197933/datasets/nova-dpo/test/gen_qa.jsonl
s3://sagemaker-us-east-1-905418197933/datasets/nova-dpo/val/dataset.jsonl


***

## Step 2: Model fine-tuning

We now define the PyTorch estimator that runs Direct Preference Optimization (DPO) on the tool-calling preference dataset for our Amazon Nova model.

This section sets up and runs the fine-tuning job using SageMaker. Whether the job updates all model weights or trains LoRA adapters depends on the recipe selected below.


#### Instance Type and Count

P5 instances are optimized for deep learning workloads, providing high-performance GPUs.


In [16]:
instance_type = "ml.p5.48xlarge"
instance_count = 2

instance_type

'ml.p5.48xlarge'

#### Image URI

This specifies the pre-built container for DPO fine-tuning, which is different from the SFT container.


In [17]:
image_uri = f"708977205387.dkr.ecr.{sess.boto_region_name}.amazonaws.com/nova-fine-tune-repo:SM-TJ-DPO-latest"

image_uri

'708977205387.dkr.ecr.us-east-1.amazonaws.com/nova-fine-tune-repo:SM-TJ-DPO-latest'

#### Configuring the Model and Recipe

This specifies which model to fine-tune and the recipe to use. Here the model is Nova Micro, and the "dpo" suffix in the recipe name indicates Direct Preference Optimization.


In [35]:
model_id = "nova-micro/prod"
recipe = "fine-tuning/nova/nova_micro_p5_gpu_dpo"

#### PyTorch Estimator

This creates a PyTorch estimator with the configuration to run the training job.


In [41]:
from sagemaker.pytorch import PyTorch

# define Training Job Name
job_name = f"train-{model_id.split('/')[0].replace('.', '-')}-dpo"

# define OutputDataConfig path
if default_prefix:
    output_path = f"s3://{bucket_name}/{default_prefix}/{job_name}"
else:
    output_path = f"s3://{bucket_name}/{job_name}"

recipe_overrides = {
    "run": {
        "replicas": instance_count,  # Required
    },
    "training_config": {"trainer": {"max_epochs": 1}},
}

estimator = PyTorch(
    output_path=output_path,
    base_job_name=job_name,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    training_recipe=recipe,
    recipe_overrides=recipe_overrides,
    max_run=432000,
    sagemaker_session=sess,
    image_uri=image_uri,
    disable_profiler=True,
    debugger_hook_config=False,
)

Using instance_count argument to estimator to set number of nodes. Ignoring run -> replicas in recipe.
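
Conceptually, `recipe_overrides` is a nested dictionary whose leaves replace the matching keys of the base recipe while everything else stays unchanged. The sketch below illustrates that idea with a generic recursive merge. It is not the SageMaker SDK's implementation, and the base recipe values are made up. As the log line above notes, the SDK takes the node count from `instance_count` rather than from `run.replicas`.

```python
import copy


def apply_overrides(recipe, overrides):
    """Return a copy of `recipe` with nested `overrides` applied leaf by leaf."""
    merged = copy.deepcopy(recipe)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = apply_overrides(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical base recipe; real recipes contain many more fields
base_recipe = {
    "run": {"name": "demo", "replicas": 1},
    "training_config": {"trainer": {"max_epochs": 2}, "max_length": 1024},
}
overrides = {
    "run": {"replicas": 2},
    "training_config": {"trainer": {"max_epochs": 1}},
}

merged = apply_overrides(base_recipe, overrides)
print(merged)
```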


#### Configuring the Data Channels

The training and validation datasets uploaded to S3 are passed to the job as the `train` and `validation` input channels.

In [42]:
from sagemaker.inputs import TrainingInput

train_input = TrainingInput(
    s3_data=train_dataset_s3_path,
    distribution="FullyReplicated",
    s3_data_type="Converse",
)

val_input = TrainingInput(
    s3_data=val_dataset_s3_path,
    distribution="FullyReplicated",
    s3_data_type="Converse",
)

In [43]:
# starting the train job with our uploaded datasets as input
estimator.fit(inputs={"train": train_input, "validation": val_input}, wait=False)

INFO:sagemaker:Creating training-job with name: train-nova-micro-dpo-2025-07-08-22-02-05-132


In [44]:
training_job_name = estimator.latest_training_job.name
print('Training Job Name:  {}'.format(training_job_name))

Training Job Name:  train-nova-micro-dpo-2025-07-08-22-02-05-132
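
Because the job was started with `wait=False`, the notebook does not block until training finishes. One way to wait is to poll the job status, as sketched below. The function takes the SageMaker client as a parameter so it can be run without AWS access. `wait_for_training_job` is a hypothetical helper, and the 60-second interval is an arbitrary choice.

```python
import time

# Job states after which the status will no longer change
TERMINAL_STATES = {"Completed", "Failed", "Stopped"}


def wait_for_training_job(sm_client, job_name, poll_seconds=60, sleep=time.sleep):
    """Poll describe_training_job until the job reaches a terminal state."""
    while True:
        response = sm_client.describe_training_job(TrainingJobName=job_name)
        status = response["TrainingJobStatus"]
        print(f"{job_name}: {status}")
        if status in TERMINAL_STATES:
            return status
        sleep(poll_seconds)


# Usage in this notebook (requires AWS credentials):
#   sm_client = boto3.client("sagemaker")
#   wait_for_training_job(sm_client, training_job_name)
```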


In [45]:
from IPython.display import HTML, Markdown, Image
display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/jobs/{}">Training Job</a> After About 5 Minutes</b>'.format(sess.boto_region_name, training_job_name)))




from IPython.display import HTML, Markdown, Image
display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/TrainingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(sess.boto_region_name, training_job_name)))




from IPython.display import HTML, Markdown, Image
display(HTML('<b>Review <a target="_blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Training Job Has Completed</b>'.format(bucket_name, training_job_name, sess.boto_region_name)))



### Reading the Output Content after training job completion

In [None]:
model_s3_uri = estimator.model_data
print(model_s3_uri)

output_s3_uri = "/".join(model_s3_uri.split("/")[:-1])+"/output.tar.gz"
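
The last line swaps the final path segment of the model artifact URI for `output.tar.gz`. With a made-up URI (the bucket and job names below are placeholders), the transformation looks like this:

```python
# Hypothetical model artifact URI, for illustration only
example_model_uri = "s3://my-bucket/train-nova-micro-dpo/output/model.tar.gz"

# Drop the last path segment and append output.tar.gz in its place
example_output_uri = "/".join(example_model_uri.split("/")[:-1]) + "/output.tar.gz"

print(example_output_uri)
```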

### Downloading and Extracting the Artifacts

In [None]:
!mkdir -p ./tmp/train_output/

In [None]:
!aws s3 cp $output_s3_uri ./tmp/train_output/output.tar.gz

In [None]:
!tar -xvzf ./tmp/train_output/output.tar.gz -C ./tmp/train_output/

In [None]:
# Read the checkpoint location from the manifest, closing the file afterwards
with open("./tmp/train_output/manifest.json") as f:
    escrow_model_uri = json.load(f)["checkpoint_s3_bucket"]

In [None]:
escrow_model_uri

### Plotting the Train/Loss Curve 

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Read the CSV files
train_df = pd.read_csv('./tmp/train_output/step_wise_training_metrics.csv')
val_df = pd.read_csv('./tmp/train_output/validation_metrics.csv')

# Create the plot
plt.figure(figsize=(10, 6))
plt.plot(train_df['step_number'], train_df['training_loss'], label='Training Loss', color='blue')
plt.plot(val_df['step_number'], val_df['validation_loss'], label='Validation Loss', color='red')

plt.xlabel('Step Number')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
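To complement the plot, the step with the lowest validation loss can be read directly from the metrics file. A minimal sketch using only the standard library, assuming the `step_number` and `validation_loss` columns used above; the helper name `best_validation_step` and the inline sample rows are made up for illustration:

```python
import csv
import io


def best_validation_step(csv_file):
    """Return (step_number, validation_loss) for the row with the lowest loss."""
    rows = list(csv.DictReader(csv_file))
    if not rows:
        raise ValueError("no validation rows found")
    best = min(rows, key=lambda r: float(r["validation_loss"]))
    return int(float(best["step_number"])), float(best["validation_loss"])


# Made-up sample data, for illustration only
sample = io.StringIO("step_number,validation_loss\n10,0.69\n20,0.52\n30,0.58\n")
print(best_validation_step(sample))  # (20, 0.52)
```

With the real file, pass an open handle: `with open('./tmp/train_output/validation_metrics.csv') as f: best_validation_step(f)`.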

***

## Model evaluation

Create a minimal recipe for `gen_qa` evaluation. With `gen_qa` evaluation, we bring our own dataset and measure the following metrics:

* rouge1
* rouge2
* rougeL
* exact_match
* quasi_exact_match
* f1_score
* f1_score_quasi
* bleu
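
For intuition about the match-based metrics, the sketch below computes an exact match, a normalized ("quasi") exact match, and a token-level F1 in the common SQuAD style. This is an assumption about what the metric names mean, not the evaluation container's actual implementation, and the normalization rules (lowercasing, stripping punctuation and English articles) are illustrative:

```python
import re
import string
from collections import Counter


def normalize(text: str) -> str:
    """Lowercase, drop punctuation and English articles, collapse whitespace."""
    text = text.lower()
    text = "".join(ch for ch in text if ch not in string.punctuation)
    text = re.sub(r"\b(a|an|the)\b", " ", text)
    return " ".join(text.split())


def exact_match(prediction: str, reference: str) -> float:
    """1.0 only when the raw strings are identical."""
    return float(prediction == reference)


def quasi_exact_match(prediction: str, reference: str) -> float:
    """1.0 when the strings are identical after normalization."""
    return float(normalize(prediction) == normalize(reference))


def f1_score(prediction: str, reference: str) -> float:
    """Harmonic mean of token precision and recall on normalized tokens."""
    pred_tokens = normalize(prediction).split()
    ref_tokens = normalize(reference).split()
    if not pred_tokens or not ref_tokens:
        return float(pred_tokens == ref_tokens)
    overlap = sum((Counter(pred_tokens) & Counter(ref_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)


print(exact_match("The answer is Rome.", "the answer is rome"))        # 0.0
print(quasi_exact_match("The answer is Rome.", "the answer is rome"))  # 1.0
print(round(f1_score("Rome Italy", "Rome"), 3))                        # 0.667
```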

Your fine-tuned model checkpoints are accessible through the `manifest.json` file in `output.tar.gz`.

In [None]:
recipe_job_name = "nova-lite-gen_qa-eval-job"

recipe_content = f"""
run:
  name: {recipe_job_name}
  model_type: amazon.nova-lite-v1:0:300k
  model_name_or_path: {escrow_model_uri}
  data_s3_path: "" # Empty string

evaluation:
  task: gen_qa
  strategy: gen_qa
  metric: all

inference:
  max_new_tokens: 4096
  top_p: 0.9
  temperature: 0.1
"""

with open("eval-recipe.yaml", "w") as f:
    f.write(recipe_content)

### Instance count and Instance Type

Defines the Instance type and count to use for Evaluation 

In [None]:
instance_type = "ml.g5.12xlarge" # Override the instance type if you want to get a different container version
instance_count = 1

instance_type

#### Image URI for Evaluation

This specifies the pre-built container for evaluation, which is different from the container used for training.


In [None]:
image_uri = f"708977205387.dkr.ecr.{sess.boto_region_name}.amazonaws.com/nova-evaluation-repo:SM-TJ-Eval-latest"

image_uri

#### Configuring the Model and Recipe

This specifies the model and the recipe file to use for evaluation.


In [None]:
model_id = "nova-lite/prod"
recipe = "./eval-recipe.yaml"

#### PyTorch Estimator

This creates a PyTorch estimator with the configuration to run the evaluation job.


In [None]:
from sagemaker.pytorch import PyTorch

# define the evaluation job name
job_name = f"train-{model_id.split('/')[0].replace('.', '-')}-dpo-eval"

# define OutputDataConfig path
if default_prefix:
    output_path = f"s3://{bucket_name}/{default_prefix}/{job_name}"
else:
    output_path = f"s3://{bucket_name}/{job_name}"

recipe_overrides = {
    "run": {
        "replicas": instance_count,  # Required
    },
}

eval_estimator = PyTorch(
    output_path=output_path,
    base_job_name=job_name,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    training_recipe=recipe,
    recipe_overrides=recipe_overrides,
    max_run=432000,
    sagemaker_session=sess,
    image_uri=image_uri,
    disable_profiler=True,
    debugger_hook_config=False,
)

### Configuring the Data Channel

In [None]:
from sagemaker.inputs import TrainingInput

eval_input = TrainingInput(
    s3_data=test_dataset_s3_path,
    distribution="FullyReplicated",
    s3_data_type="S3Prefix",
)

eval_input

### Starting the Evaluation Job
This starts the evaluation job with the configured estimator. Note that the test dataset is passed through the `train` input channel.


In [None]:
# starting the evaluation job with our uploaded test dataset as input
eval_estimator.fit(inputs={"train": eval_input}, wait=False)
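
Because `fit` is called with `wait=False`, the job is still running when the cell returns, and its artifacts only become available once it finishes. A minimal polling sketch, assuming the status values reported by the SageMaker `DescribeTrainingJob` API (`InProgress`, `Completed`, `Failed`, `Stopping`, `Stopped`); the helper `wait_for_job` and its `get_status` callable are hypothetical, which keeps the loop testable without AWS access:

```python
import time

TERMINAL_STATES = {"Completed", "Failed", "Stopped"}


def wait_for_job(get_status, poll_seconds=60, timeout_seconds=6 * 3600, sleep=time.sleep):
    """Poll `get_status()` until a terminal state or the timeout is reached."""
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"job still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in status source, for illustration only
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
print(wait_for_job(lambda: next(fake_statuses), sleep=lambda s: None))  # Completed
```

In the notebook, `get_status` could wrap a call such as `sess.sagemaker_client.describe_training_job(TrainingJobName=eval_estimator.latest_training_job.name)["TrainingJobStatus"]`.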

### Viewing the Evaluation Artifacts 
Download the evaluation output archive once the evaluation job has completed.


In [None]:
output = eval_estimator.model_data

In [None]:
output = '/'.join(output.split("/")[:-1]) +"/output.tar.gz"

In [None]:
! aws s3 cp $output .

### Visualize results

The following cells extract the evaluation output and define a function that plots the evaluation metrics as a bar chart:


In [None]:
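import tarfile
import os

# Extract the evaluation archive into ./output_folder and close the file afterwards
with tarfile.open('output.tar.gz', 'r:gz') as tar:
    tar.extractall('output_folder')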

In [None]:
results_path = "output_folder/" + recipe_job_name +"/eval_results"

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os


def plot_metrics(results):
    # Extract metrics and their standard errors
    metrics = {}
    for key, value in results.items():
        if not key.endswith("_stderr"):
            metrics[key] = {"value": value, "stderr": results.get(f"{key}_stderr", 0)}

    # Sort metrics by value for better visualization
    sorted_metrics = dict(
        sorted(metrics.items(), key=lambda x: x[1]["value"], reverse=True)
    )

    # Prepare data for plotting
    labels = list(sorted_metrics.keys())
    values = [sorted_metrics[label]["value"] for label in labels]
    errors = [sorted_metrics[label]["stderr"] for label in labels]

    # Normalize BLEU score to be on the same scale as other metrics (0-1)
    bleu_index = labels.index("bleu") if "bleu" in labels else -1
    if bleu_index >= 0:
        values[bleu_index] /= 100
        errors[bleu_index] /= 100

    # Create figure
    fig, ax = plt.subplots(figsize=(12, 8))

    # Create bar chart
    x = np.arange(len(labels))
    bars = ax.bar(
        x,
        values,
        yerr=errors,
        align="center",
        alpha=0.7,
        capsize=5,
        color="skyblue",
        ecolor="black",
    )

    # Add labels and title
    ax.set_ylabel("Score")
    ax.set_title("Evaluation Metrics")
    ax.set_xticks(x)
    ax.set_xticklabels(labels, rotation=45, ha="right")
    ax.set_ylim(0, 1.0)

    # Add value labels on top of bars
    for i, bar in enumerate(bars):
        height = bar.get_height()
        # Convert BLEU back to its original scale for display
        display_value = values[i] * 100 if labels[i] == "bleu" else values[i]
        ax.text(
            bar.get_x() + bar.get_width() / 2.0,
            height + 0.01,
            f"{display_value:.2f}",
            ha="center",
            va="bottom",
        )

    # Add a note about BLEU
    if bleu_index >= 0:
        ax.text(
            0.5,
            -0.15,
            "Note: BLEU score shown as percentage (original: {:.2f})".format(
                values[bleu_index] * 100
            ),
            transform=ax.transAxes,
            ha="center",
            fontsize=9,
        )

    plt.tight_layout()
    return fig

In [None]:
import glob
import os

def find_json_files(path):
    return glob.glob(os.path.join(path, "*.json"))

In [None]:
evaluation_results_path = find_json_files(results_path)[0]

### Visualize results

In [None]:
import json

with open(evaluation_results_path, "r") as f:
    data = json.load(f)

fig = plot_metrics(data["results"]["all"])

output_file = os.path.join("./", 'evaluation_metrics.png')
fig.savefig(output_file, bbox_inches='tight')
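
For a quick text view alongside the chart, the same pairing of metrics with their `_stderr` entries can be printed as a table. A minimal sketch, assuming the results dictionary has the shape that `plot_metrics` expects; the helper name `summarize_metrics` and the sample numbers are made up for illustration:

```python
def summarize_metrics(results: dict) -> list:
    """Return (name, value, stderr) tuples sorted by value, highest first."""
    rows = []
    for key, value in results.items():
        if key.endswith("_stderr"):
            continue  # standard errors are attached to their metric below
        rows.append((key, value, results.get(f"{key}_stderr", 0)))
    return sorted(rows, key=lambda row: row[1], reverse=True)


# Made-up numbers, for illustration only
sample = {"rouge1": 0.41, "rouge1_stderr": 0.02, "exact_match": 0.10}
for name, value, stderr in summarize_metrics(sample):
    print(f"{name:<20} {value:8.4f} ± {stderr:.4f}")
```

In the notebook this could be called as `summarize_metrics(data["results"]["all"])`.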

***

## Model deployment and inference

After training and evaluating our model, we want to make it available for inference. Amazon Bedrock provides a serverless endpoint for model deployment, allowing us to serve the model without managing infrastructure.

The Bedrock Custom Model feature of Amazon Bedrock lets us import our fine-tuned model and access it through the same API as other foundation models.

In [None]:
import boto3

# Initialize the Bedrock client
bedrock = boto3.client("bedrock", region_name=sess.boto_region_name)


model_path = escrow_model_uri

# Define name for imported model
imported_model_name = "nova-lite-sagemaker-dpo"

### Creating the Bedrock Custom Model

In [None]:
request_params = {
    "modelName": imported_model_name,
    "modelSourceConfig": {"s3DataSource": {"s3Uri": model_path}},
    "roleArn": role,
    "clientRequestToken": "NovaRecipeSageMaker",
}

# Create the custom model from the fine-tuned checkpoint
response = bedrock.create_custom_model(**request_params)

model_arn = response["modelArn"]

# Output the model ARN
print(f"Custom model creation started with ARN: {model_arn}")

### Monitoring the Model status

After initiating the model import, we need to monitor its progress. The status goes through several states:

* CREATING: Model is being imported
* ACTIVE: Import successful
* FAILED: Import encountered errors

This cell polls the Bedrock API every 10 seconds to check the status of the model import, continuing until it reaches a terminal state (ACTIVE or FAILED). Once the import completes successfully, we'll have the model ARN, which we can use for inference.

In [None]:
from IPython.display import clear_output
import time

# Poll the custom model status until it reaches a terminal state
while True:
    response = bedrock.list_custom_models(sortBy="CreationTime", sortOrder="Descending")
    model_summaries = response["modelSummaries"]
    status = ""
    for model in model_summaries:
        if model["modelName"] == imported_model_name:
            status = model["modelStatus"].upper()
            model_arn = model["modelArn"]
            print(f'{model["modelStatus"].upper()} {model["modelArn"]} ...')
            if status in ["ACTIVE", "FAILED"]:
                break
    if status in ["ACTIVE", "FAILED"]:
        break
    clear_output(wait=True)
    time.sleep(10)

model_arn

##### ⚠️ After the model is ACTIVE, create provisioned throughput before running the inference!

Please refer to the official [AWS Documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/prov-thru-purchase.html)

### Testing the Deployed Model

Now that our model is deployed to Amazon Bedrock, we can invoke it for inference. We'll set up the necessary clients and functions to interact with our model through the Bedrock Runtime API.

Inference Setup Components:
* Bedrock Runtime Client: AWS SDK client for making inference calls
* Helper Function: To handle retry logic and properly format requests

The generate function we're defining:
* Applies the proper chat template to user messages
* Handles retry logic for robustness
* Sets appropriate generation parameters like temperature and top-p

This setup allows us to easily test how well our training worked by sending queries to the model and evaluating its responses.
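
A common variant of the retry logic described above is exponential backoff with jitter, which spaces retries further apart after each failure instead of waiting a fixed interval. The sketch below is an alternative illustration, not the notebook's helper; the name `call_with_backoff` and its default delays are assumptions:

```python
import random
import time


def call_with_backoff(fn, max_retries=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call `fn()` and retry on any exception with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error to the caller
            # Double the delay on each attempt, cap it, then add random jitter
            delay = min(max_delay, base_delay * 2 ** attempt)
            delay *= random.uniform(0.5, 1.0)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
            sleep(delay)


# Stand-in for a flaky API call, for illustration only
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("throttled")
    return "ok"


print(call_with_backoff(flaky, sleep=lambda s: None))
```

Re-raising the final exception, rather than returning `None`, lets the caller distinguish a failed call from an empty response.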

In [None]:
import boto3
from botocore.config import Config


# Initialize Bedrock Runtime client
session = boto3.Session()
client = session.client(
    service_name="bedrock-runtime",
    region_name=sess.boto_region_name,
    config=Config(
        connect_timeout=300,  # 5 minutes
        read_timeout=300,  # 5 minutes
        retries={"max_attempts": 3},
    ),
)

In [None]:
import time


def generate(
    model_id,
    messages,
    system_prompt=None,
    tools=None,
    temperature=0.3,
    max_tokens=4096,
    top_p=0.9,
    max_retries=10,
):
    """
    Generate a response through the Bedrock Converse API with a retry mechanism

    Parameters:
        model_id (str): ID of the model to use
        messages (list): List of message dictionaries with 'role' and 'content'
        system_prompt (str, optional): System prompt to guide the model
        tools (dict, optional): Tool configuration for the model
        temperature (float): Controls randomness in generation (0.0-1.0)
        max_tokens (int): Maximum number of tokens to generate
        top_p (float): Nucleus sampling parameter (0.0-1.0)
        max_retries (int): Maximum number of retry attempts

    Returns:
        dict: Model response containing generated text and metadata
    """
    # Prepare base parameters for the API call
    kwargs = {
        "inferenceConfig": {
            "temperature": temperature,
            "maxTokens": max_tokens,
            "topP": top_p,
        },
    }

    # Add optional parameters if provided
    if tools:
        kwargs["toolConfig"] = tools
    if system_prompt:
        kwargs["system"] = [{"text": system_prompt}]

    # Retry logic
    for attempt in range(max_retries):
        try:
            return client.converse(modelId=model_id, messages=messages, **kwargs)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(30)
            else:
                print("Max retries reached. Unable to get response.")
                print(str(e))
                return None

In [None]:
import json

model_arn = "<PROVISIONED_THROUGHPUT_ARN>"

system_prompt = f"""
You are a helpful AI assistant that can answer questions and provide information.
You can use tools to help you with your tasks.

You have access to the following tools:

<tools>
{{tools}}
</tools>
For each function call, return a json object with function name and parameters:

{{{{\"name\": \"function name\", \"parameters\": \"dictionary of argument name and its value\"}}}}
"""

tools = [
    {
        "toolSpec": {
            "name": "calculate_bmi",
            "description": "Calculate BMI given weight in kg and height in meters",
            "inputSchema": {
                "json": {
                    "type": "object",
                    "properties": {
                        "weight_kg": {
                            "type": "number",
                            "description": "Property weight_kg",
                        },
                        "height_m": {
                            "type": "number",
                            "description": "Property height_m",
                        },
                    },
                    "required": ["weight_kg", "height_m"],
                },
            },
        }
    },
    {
        "toolSpec": {
            "name": "fetch_weather",
            "description": 'Fetch weather information\n\nArgs:\nquery: The weather query (e.g., "weather in New York")\nnum_results: Number of results to return (default: 1)\n\nReturns:\nJSON string containing weather information',
            "inputSchema": {
                "json": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Property query",
                        },
                        "num_results": {
                            "type": "integer",
                            "description": "Property num_results",
                        },
                    },
                    "required": ["query"],
                },
            },
        }
    },
]

system_prompt = system_prompt.format(tools=json.dumps({"tools": tools}))

messages = [
    {"role": "user", "content": [{"text": "What is the weather in Rome, Italy?"}]},
]

response = generate(
    model_id=model_arn,
    system_prompt=system_prompt,
    messages=messages,
    temperature=0.1,
    top_p=0.9,
)

response["output"]
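
To inspect the answer, the text blocks can be pulled out of the Converse response and, since the system prompt asks for a JSON object per function call, decoded as JSON. A minimal sketch, assuming the model replies with a bare JSON object (it may instead wrap the call in other text, in which case `parse_tool_call` returns `None`); both helper names and the sample response are hypothetical:

```python
import json


def extract_text(response: dict) -> str:
    """Concatenate the text blocks of a Converse-style response message."""
    content = response["output"]["message"]["content"]
    return "".join(block["text"] for block in content if "text" in block)


def parse_tool_call(text: str):
    """Return the decoded JSON object if `text` is a JSON tool call, else None."""
    try:
        call = json.loads(text.strip())
    except json.JSONDecodeError:
        return None
    return call if isinstance(call, dict) and "name" in call else None


# Made-up response in the Converse output shape, for illustration only
sample_response = {
    "output": {
        "message": {
            "role": "assistant",
            "content": [
                {"text": '{"name": "fetch_weather", "parameters": {"query": "weather in Rome, Italy"}}'}
            ],
        }
    }
}
call = parse_tool_call(extract_text(sample_response))
print(call["name"])  # fetch_weather
```

Because `generate` returns `None` when every retry fails, check the response before passing it to `extract_text`.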