In [None]:
import os
#from azure.ai.ml import MLClient, Input, MpiDistribution, command
from azure.ai.ml import MLClient, Input, Output, PyTorchDistribution, command
from azure.ai.ml.entities import (
    AmlCompute, Environment, BuildContext, Data,
    ManagedOnlineEndpoint, ManagedOnlineDeployment, CodeConfiguration, OnlineRequestSettings
)
from azure.identity import DefaultAzureCredential
from azure.ai.ml.constants import AssetTypes
import datetime

from dotenv import load_dotenv
load_dotenv(override=True)  # values from .env override variables already set in the process

# Azure ML workspace configuration (read from the environment / .env, never hard-coded)
SUBSCRIPTION_ID = os.getenv("SUBSCRIPTION_ID")
RESOURCE_GROUP = os.getenv("RESOURCE_GROUP")
WORKSPACE_NAME = os.getenv("WORKSPACE_NAME")
COMPUTE_CLUSTER = "demo-gpucluster01"
HF_TOKEN = os.getenv("HF_TOKEN")  # optional; forwarded to the training job when set

# Fail fast with a clear message instead of an obscure SDK error on the first call.
_missing_settings = [
    setting_name
    for setting_name, setting_value in (
        ("SUBSCRIPTION_ID", SUBSCRIPTION_ID),
        ("RESOURCE_GROUP", RESOURCE_GROUP),
        ("WORKSPACE_NAME", WORKSPACE_NAME),
    )
    if not setting_value
]
if _missing_settings:
    raise RuntimeError(f"Missing required environment variables: {', '.join(_missing_settings)}")

# authentication via DefaultAzureCredential (managed identity, service principal, CLI login, ...; no hard-coded creds)
ml_client = MLClient(DefaultAzureCredential(), SUBSCRIPTION_ID, RESOURCE_GROUP, WORKSPACE_NAME)

# Check that the compute cluster exists. This cell does NOT create it; create the
# cluster (e.g. with AmlCompute) before submitting the training job below.
try:
    ml_client.compute.get(COMPUTE_CLUSTER)
except Exception as err:
    print(f"{COMPUTE_CLUSTER} was not found: {err}")

In [None]:
# Register the local DPO preference pairs as a versioned data asset; the
# training job below refers to it as "dpo_pairs@latest".
data_uri = "./dpo_pairs.parquet"

data = Data(
    name="dpo_pairs",
    version='1',
    type=AssetTypes.URI_FILE,   # a single file, not a folder
    path=data_uri,
    description="dpo dataset for gpt-oss-20b",
)
ml_client.data.create_or_update(data)


<h5>Training code</h5>

In [None]:
%%writefile ./src/dpo.py
import os
import argparse
import json
import torch

from datasets import load_dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Mxfp4Config,
)
from peft import LoraConfig, get_peft_model, PeftModel
from trl import DPOTrainer, DPOConfig


def get_args():
    """Build the CLI for the DPO training script and parse ``sys.argv``.

    Returns:
        argparse.Namespace holding the data/output/Hub options and the
        training hyper-parameters.
    """
    ap = argparse.ArgumentParser()

    # --- data, output and Hugging Face Hub options ---
    ap.add_argument(
        "--train_file",
        type=str,
        required=True,
        help="Converted Harmony (or DPO) data (parquet or jsonl)",
    )
    ap.add_argument(
        "--output_dir",
        type=str,
        default="./outputs",
        help="Output directory",
    )
    ap.add_argument(
        "--push_to_hub",
        action="store_true",
        help="Specify to push to Hugging Face Hub",
    )
    ap.add_argument(
        "--hub_model_id",
        type=str,
        default=None,
        help="Model id on the Hub (org/name). If omitted, uses output_dir name",
    )
    ap.add_argument(
        "--hf_token",
        type=str,
        default=None,
        help="Hugging Face access token (or via env var HF_TOKEN)",
    )

    # --- training hyper-parameters, declared as (flag, type, default) ---
    hyperparams = (
        ("--lr", float, 2e-4),
        ("--epochs", int, 1),
        ("--per_device_train_batch_size", int, 2),
        ("--grad_accum_steps", int, 8),
        ("--max_seq_len", int, 2048),
        ("--warmup_ratio", float, 0.03),
        ("--cosine_min_lr_rate", float, 0.1),
        ("--logging_steps", int, 10),
        ("--seed", int, 42),
    )
    for flag, cast, default in hyperparams:
        ap.add_argument(flag, type=cast, default=default)

    return ap.parse_args()


def load_dpo_dataset(train_file):
    """Load a DPO preference dataset from a local parquet or JSON/JSONL file.

    Args:
        train_file: Path to a ``.parquet``, ``.jsonl`` or ``.json`` file
            (extension matched case-insensitively).

    Returns:
        The ``train`` split as a ``datasets.Dataset``.

    Raises:
        ValueError: If the extension is unsupported, or if any of the
            ``prompt`` / ``chosen`` / ``rejected`` columns is missing.
    """
    train_file = os.path.abspath(train_file)
    ext = os.path.splitext(train_file)[1].lower()
    if ext == ".parquet":
        builder = "parquet"
    elif ext in (".jsonl", ".json"):
        builder = "json"
    else:
        raise ValueError(f"train_file must be .parquet, .jsonl or .json (got {train_file!r})")

    # A single data_files entry is exposed by `datasets` as the "train" split.
    ds = load_dataset(builder, data_files=train_file, split="train")

    needed = {"prompt", "chosen", "rejected"}
    if not needed.issubset(ds.column_names):
        missing = sorted(needed - set(ds.column_names))
        raise ValueError(f"DPO requires columns {needed}. Missing: {missing}. Found: {ds.column_names}")
    return ds


def ensure_chat_form(ds, tokenizer):
    """Wrap plain-text prompts as a single user turn using the tokenizer's chat template.

    A prompt is left unchanged when:
      * it already contains chat-template control tokens, including the
        Harmony markers (``<|start|>``, ``<|message|>``) used by gpt-oss; or
      * it is not a string (e.g. a list of ``{"role", "content"}`` messages),
        which would otherwise be nested inside another message.

    Args:
        ds: Dataset with a ``prompt`` column (anything exposing ``.map``).
        tokenizer: Tokenizer providing ``apply_chat_template``.

    Returns:
        The result of ``ds.map`` with the ``prompt`` column rewritten.
    """
    # Substrings that show a prompt was already rendered with some chat template.
    template_markers = (
        "<|assistant|>", "<|user|>", "<|system|>",
        "<|im_start|>", "<|start_header_id|>",
        "<|start|>", "<|message|>",  # Harmony (gpt-oss)
    )

    def _map(ex):
        p = ex["prompt"]
        # Conversational (non-string) prompts are passed through untouched.
        if not isinstance(p, str):
            return {"prompt": p}
        # Skip if already templated
        if any(m in p for m in template_markers):
            return {"prompt": p}
        msgs = [{"role": "user", "content": p}]
        templated = tokenizer.apply_chat_template(
            msgs, tokenize=False, add_generation_prompt=True
        )
        return {"prompt": templated}

    return ds.map(_map)


def main():
    """Fine-tune openai/gpt-oss-20b with LoRA + DPO and save adapter and merged weights.

    Steps:
      1. parse CLI args, load the base model and tokenizer;
      2. wrap the model with a LoRA adapter;
      3. load the preference dataset and chat-template plain prompts;
      4. train with ``DPOTrainer`` and save the adapter + tokenizer to
         ``--output_dir``;
      5. merge the adapter into the base weights under ``<output_dir>/merged``.

    In a multi-process run (e.g. the multi-node AzureML job), only the global
    main process saves the tokenizer and builds/saves the merged model, so the
    ranks do not all write the same files.
    """
    args = get_args()

    # Hub token (needed only when pushing)
    hf_token = args.hf_token or os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACEHUB_API_TOKEN")

    # Flexible dtype/device so it runs even on a CPU smoke test:
    # bf16 when supported, otherwise fp16 on GPU, otherwise fp32 on CPU.
    use_bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
    dtype = torch.bfloat16 if use_bf16 else torch.float16 if torch.cuda.is_available() else torch.float32

    # In AzureML distributed environments accelerate is used; omit device_map here
    model = AutoModelForCausalLM.from_pretrained(
        "openai/gpt-oss-20b",
        attn_implementation="eager",
        dtype=dtype,
        #quantization_config=quantization_config,
        use_cache=False,  # KV cache is not used in training and conflicts with gradient checkpointing
        trust_remote_code=True,
    )

    tokenizer = AutoTokenizer.from_pretrained(
        "openai/gpt-oss-20b",
        trust_remote_code=True,
        use_fast=True,
    )
    # Minimal settings for chat / long text
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # LoRA on the attention projections and the MLP projections
    peft_config = LoraConfig(
        r=8,
        lora_alpha=16,
        target_modules=[
            "q_proj",
            "k_proj",
            "v_proj",
            "o_proj",
            "gate_up_proj",
            "down_proj"
        ],
        bias="none", task_type="CAUSAL_LM",
    )

    model = get_peft_model(model, peft_config)
    model.print_trainable_parameters()

    # --- Dataset ---
    dpo_ds = load_dpo_dataset(args.train_file)
    dpo_ds = ensure_chat_form(dpo_ds, tokenizer)

    # --- DPO config ---
    # report_to may be "none" / "tensorboard" / "wandb" / "trackio" etc.
    dpo_args = DPOConfig(
        learning_rate=args.lr,
        gradient_checkpointing=True,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        #ddp_find_unused_parameters=False,                        # may cause OOM
        #packing=False,                                           # optional
        beta=0.1,                                                 # DPO
        loss_type="ipo",                                          # DPO variant
        max_length=args.max_seq_len,                              # DPO
        max_prompt_length=min(1024, args.max_seq_len // 2),       # DPO
        num_train_epochs=args.epochs,
        logging_steps=args.logging_steps,
        per_device_train_batch_size=args.per_device_train_batch_size,
        gradient_accumulation_steps=args.grad_accum_steps,
        warmup_ratio=args.warmup_ratio,
        lr_scheduler_type="cosine_with_min_lr",
        lr_scheduler_kwargs={"min_lr_rate": args.cosine_min_lr_rate},
        output_dir=args.output_dir,
        report_to="trackio",
        bf16=use_bf16,  # enable only if GPU supports bf16
        fp16=(torch.cuda.is_available() and not use_bf16),
        seed=args.seed,
        push_to_hub=args.push_to_hub,
        hub_model_id=(args.hub_model_id or os.path.basename(args.output_dir)) if args.push_to_hub else None,
        hub_token=hf_token if args.push_to_hub else None,
        save_total_limit=2,
        save_steps=500,
    )

    # No ref_model is passed: the model passed in is already LoRA-wrapped.
    trainer = DPOTrainer(
        model=model,
        args=dpo_args,
        processing_class=tokenizer,
        train_dataset=dpo_ds,
    )

    trainer.train()
    # Called on every rank, as before; transformers' Trainer.save_model handles
    # which process actually writes.
    trainer.save_model(args.output_dir)

    # The remaining saves/merge must happen exactly once, not once per rank.
    if not trainer.is_world_process_zero():
        return

    tokenizer.save_pretrained(args.output_dir)

    # === Merge LoRA into base model and save ===
    # Best effort: the adapter is already saved above, so a failed merge is
    # reported as a warning instead of failing the job.
    try:
        print("Merging LoRA adapter into base model...")
        merged_model = model.merge_and_unload()
        merged_dir = os.path.join(args.output_dir, "merged")
        os.makedirs(merged_dir, exist_ok=True)
        merged_model.save_pretrained(merged_dir)
        tokenizer.save_pretrained(merged_dir)
        print(f"Merged model saved at: {merged_dir}")
    except Exception as e:
        print(f"[WARN] Could not merge model automatically: {e}")


if __name__ == "__main__":
    main()


In [None]:
# job configuration: NUM_NODES nodes x NUM_GPU_PER_NODE processes per node
NUM_NODES = 2
NUM_GPU_PER_NODE = 1

# define distributed training job (one PyTorch process per GPU on every node)
dist = PyTorchDistribution(
    process_count_per_instance=NUM_GPU_PER_NODE,
    node_count=NUM_NODES
)

# Environment variables for every training process.
# - HF_HOME is kept outside ./outputs. ./outputs is uploaded as job artifacts
#   (the registration cell reads artifacts/outputs/merged). A Hugging Face cache
#   there would also upload the downloaded base-model weights.
# - HF_TOKEN is forwarded only when it is set, so no None value reaches the job spec.
job_env = {
    "NCCL_DEBUG": "WARN",
    "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
    "HF_HOME": "./hf_home",
    "TRACKIO_PROJECT": "ft-project",
}
if HF_TOKEN:
    job_env["HF_TOKEN"] = HF_TOKEN

job = command(
    code="./src",  # Root (refers to src/dpo.py)
    command=(
        "python dpo.py "
        "--train_file ${{inputs.train_file}} "
        # --output_dir is not passed, so dpo.py writes to its default "./outputs"
        "--epochs 8 "
        "--per_device_train_batch_size 1 "
        "--grad_accum_steps 16 "
        "--max_seq_len 1024 "
    ),
    inputs={
        "train_file": Input(
            type=AssetTypes.URI_FILE,
            path="dpo_pairs@latest"
        )
    },
    # NOTE(review): "model_dir" is not referenced by the command above (no
    # --output_dir flag), so nothing is written here; the model goes to ./outputs.
    outputs={"model_dir": {"mode": "rw_mount", "path": "azureml://datastores/workspaceblobstore/paths/models/gpt-oss-20b-jp-reasoner"}},
    # NOTE(review): dpo.py imports Mxfp4Config and loads gpt-oss-20b, which need a
    # newer transformers than 4.44. Confirm the versions pinned in this environment.
    environment="env-gpt-oss-01:2",
    compute=COMPUTE_CLUSTER,
    display_name="dpo-gpt-oss-20b-01",
    experiment_name="dpo-gpt-oss-20b-01-exp",
    instance_count=NUM_NODES,
    distribution=dist,
    environment_variables=job_env,
)
returned_job = ml_client.jobs.create_or_update(job)
print(returned_job.studio_url)

<h5>Register Model</h5>

In [None]:
# dpo.py saves the merged model to ./outputs/merged, which is expected under the
# training job's default "artifacts" output.
# NOTE(review): Azure ML docs show job artifact URIs as
# azureml://jobs/<job>/outputs/artifacts/paths/<path>; confirm this form resolves.
ARTIFACT_SUBPATH = "artifacts/outputs/merged"
model_path_from_job = f"azureml://jobs/{returned_job.name}/outputs/{ARTIFACT_SUBPATH}"

print("path to register model: ", model_path_from_job)

In [None]:
from azure.ai.ml.entities import Model

# Register the merged model folder written by the training job.
model = Model(
    name="gpt-oss-20b-jp-reasoner-01-dpo",
    version="1",                    # explicit version (omit for auto increment)
    path=model_path_from_job,       # reference to the training job output
    type=AssetTypes.CUSTOM_MODEL,   # save_pretrained folder, not an MLflow model
    description="LoRA fine-tuned gpt-oss-20b model for Japanese reasoning",
)

registered_model = ml_client.models.create_or_update(model)
print(f"Registered: {registered_model.name}:{registered_model.version}")

<h5>Deploy model</h5>

In [None]:
%%writefile ./src/score.py
# score.py
import os, json, torch
from transformers import AutoTokenizer, AutoModelForCausalLM

REASONING_LANGUAGE_DEFAULT = "Japanese"

def init():
    """Load the tokenizer and model once when the scoring container starts.

    Files are read from ``$AZUREML_MODEL_DIR/<MODEL_SUBDIR>`` (``AZUREML_MODEL_DIR``
    defaults to ``.``, ``MODEL_SUBDIR`` to ``merged``) with ``local_files_only``,
    so nothing is downloaded. Sets the module globals ``model`` and ``tokenizer``.
    """
    global model, tokenizer

    has_cuda = torch.cuda.is_available()
    # bf16 on GPUs that support it; otherwise None leaves the dtype to transformers.
    load_dtype = torch.bfloat16 if has_cuda and torch.cuda.is_bf16_supported() else None
    placement = "auto" if has_cuda else None

    base_dir = os.environ.get("AZUREML_MODEL_DIR", ".")
    model_dir = os.path.join(base_dir, os.getenv("MODEL_SUBDIR", "merged"))

    tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True, local_files_only=True)
    model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        trust_remote_code=True,
        dtype=load_dtype,
        device_map=placement,
        local_files_only=True,
    )
    model.eval()

def _build_messages(data):
    """Turn a request payload into a chat ``messages`` list.

    Accepted payloads:
      1) ``{"prompt": "..."}``   -> ``[system, user]``
      2) ``{"messages": [...]}`` -> that list, with a system message prepended
         when the first entry is not already a system message.

    The system text is ``data["system_prompt"]`` when given, otherwise
    ``"reasoning language: <reasoning_language or REASONING_LANGUAGE_DEFAULT>"``.
    """
    language = data.get("reasoning_language", REASONING_LANGUAGE_DEFAULT)
    system_prompt = data.get("system_prompt", f"reasoning language: {language}")
    system_msg = {"role": "system", "content": system_prompt}

    conversation = data.get("messages") if "messages" in data else None
    if not isinstance(conversation, list):
        # No usable "messages": build a single-turn conversation from "prompt".
        return [system_msg, {"role": "user", "content": data.get("prompt", "")}]

    starts_with_system = bool(conversation) and conversation[0].get("role") == "system"
    return conversation if starts_with_system else [system_msg, *conversation]

def _as_bool(value):
    """Interpret a request flag as a boolean.

    Strings are parsed ("1"/"true"/"yes"/"on", case-insensitive -> True,
    anything else -> False) so that ``"false"`` is not treated as truthy;
    every other value goes through ``bool()``.
    """
    if isinstance(value, str):
        return value.strip().lower() in ("1", "true", "yes", "on")
    return bool(value)


def run(raw_data):
    """Handle one scoring request.

    Args:
        raw_data: JSON string (or an already-decoded dict) with either
            ``prompt`` or ``messages``. Optional keys:
            ``max_new_tokens`` (default 128), ``do_sample`` (default False),
            ``temperature`` (used only when sampling), ``system_prompt`` and
            ``reasoning_language``.

    Returns:
        JSON string ``{"output": <generated text>}`` on success, or
        ``{"error": <message>}`` if anything raises. Non-ASCII text is not escaped.
    """
    try:
        data = json.loads(raw_data) if isinstance(raw_data, str) else raw_data

        # ---- Build messages ----
        messages = _build_messages(data)

        # Generation parameters (moderate defaults)
        max_new = int(data.get("max_new_tokens", 128))
        do_sample = _as_bool(data.get("do_sample", False))
        gen_kwargs = {
            "max_new_tokens": max_new,
            "do_sample": do_sample,
        }
        # Only set temperature if sampling; otherwise transformers warns it is unused
        if do_sample and "temperature" in data:
            gen_kwargs["temperature"] = float(data["temperature"])

        # ---- chat template -> input_ids ----
        input_ids = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt",
        ).to(model.device)
        # One unpadded sequence: every position is a real token. Passing the mask
        # explicitly avoids generate() inferring it from the pad token id.
        attention_mask = torch.ones_like(input_ids)

        # ---- Generate ----
        with torch.no_grad():
            output_ids = model.generate(input_ids, attention_mask=attention_mask, **gen_kwargs)

        # ---- Remove prompt echo (keep only generated continuation) ----
        generated_ids = output_ids[0, input_ids.shape[-1]:]
        text = tokenizer.decode(generated_ids, skip_special_tokens=True)

        # Return JSON (UTF-8)
        return json.dumps({"output": text}, ensure_ascii=False)

    except Exception as e:
        # Return error also as JSON
        return json.dumps({"error": str(e)}, ensure_ascii=False)


In [None]:
# 1) Create a key-authenticated online endpoint with a timestamped (MMDDHHMM) name
endpoint_name = "gptoss-jp-" + datetime.datetime.now().strftime("%m%d%H%M")

endpoint = ManagedOnlineEndpoint(
    name=endpoint_name,
    description="gpt-oss-20b",
    auth_mode="key",
)
endpoint_poller = ml_client.begin_create_or_update(endpoint)
endpoint_poller.result()

In [None]:
# 2) Deploy (refer to the already registered merged model in Model Registry)
registered = ml_client.models.get("gpt-oss-20b-jp-reasoner-01-dpo", version="1")

scoring_code = CodeConfiguration(code="./src", scoring_script="score.py")
# Up to 180 s per request and in the queue; one concurrent request per instance.
request_limits = OnlineRequestSettings(
    request_timeout_ms=180000,
    max_concurrent_requests_per_instance=1,
    max_queue_wait_ms=180000,
)

deploy = ManagedOnlineDeployment(
    name="blue",
    endpoint_name=endpoint_name,
    model=registered.id,
    environment="env-gpt-oss-01@latest",
    code_configuration=scoring_code,
    instance_type="Standard_NC40ads_H100_v5",
    instance_count=1,
    request_settings=request_limits,
)
ml_client.begin_create_or_update(deploy).result()

In [None]:
# 3) Traffic routing: send all endpoint traffic to the "blue" deployment
live_endpoint = ml_client.online_endpoints.get(endpoint_name)
live_endpoint.traffic = {"blue": 100}
ml_client.begin_create_or_update(live_endpoint).result()

print("endpoint:", endpoint_name)

<h5>Try the deployment</h5>

In [None]:
from azure.ai.ml.entities import OnlineRequestSettings
from azure.ai.ml import MLClient
import json, requests

# Call the endpoint with key auth and a chat-style payload.
api_key = ml_client.online_endpoints.get_keys(name=endpoint_name).primary_key
scoring_url = ml_client.online_endpoints.get(endpoint_name).scoring_uri
headers = {"Authorization": f"Bearer {api_key}"}

payload = {
  "messages": [
    {"role": "system", "content": "reasoning language: Japanese"},
    {"role": "user", "content": "オーストラリアの首都はどこですか？"}
  ],
  "max_new_tokens": 500
}

res = requests.post(scoring_url, headers=headers, json=payload, timeout=300)
print(res.status_code)
# Surface HTTP failures directly; `res` stays defined for the troubleshooting cells.
res.raise_for_status()

data = res.json()                 # First-level decode
if isinstance(data, str):         # score.py returns a JSON string, so the body may be double-encoded
    data = json.loads(data)

# score.py reports failures as {"error": ...} instead of an HTTP error status.
if "error" in data:
    raise RuntimeError(f"Scoring failed: {data['error']}")
print(data["output"])

In [None]:
# Tear down the endpoint once testing is finished.
# NOTE: the troubleshooting cells below use endpoint_name; run them before this one.
delete_poller = ml_client.online_endpoints.begin_delete(name=endpoint_name)
delete_poller.wait()

<h5>問題解決 (Troubleshooting)</h5>

In [None]:
# Raw view of the last scoring response: status code, content type, first 1000 chars of the body
for detail in (res.status_code, res.headers.get("content-type"), res.text[:1000]):
    print(detail)

In [None]:
# Fetch the last 200 lines of the "blue" deployment's container log.
# print() renders the newlines; a bare expression would display an escaped string repr.
deployment_logs = ml_client.online_deployments.get_logs(
    name="blue", endpoint_name=endpoint_name, lines=200
)
print(deployment_logs)

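<h5>Original Code (reference only: an earlier Megatron-LM data-preprocessing example, unrelated to the DPO flow above)</h5>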

In [None]:
# job configuration
# NOTE(review): this is the earlier Megatron-LM preprocessing example (Llama2Tokenizer,
# wiki dump). It is unrelated to the gpt-oss DPO flow above. Running it reassigns
# NUM_NODES, NUM_GPU_PER_NODE, dist and job.
NUM_NODES = 1
NUM_GPU_PER_NODE = 1

# define distributed training job
# (a single process on a single node; the command below runs one preprocessing script)
dist = PyTorchDistribution(
    process_count_per_instance=NUM_GPU_PER_NODE,
    node_count=NUM_NODES
)

# The command does four things:
#   1. tokenizes/indexes the wiki dump with Megatron-LM's preprocess_data.py;
#   2. copies the raw input file next to the index;
#   3. renames wikidump_text_document.bin to wikidump.jsonl.bin;
#   4. renames wikidump_text_document.idx to wikidump.jsonl.idx.
# The backslash-newlines inside the first string literal are continuations within
# the string, so those arguments end up on one shell line (with extra spaces).
job = command(
    code="./azureml",
    command=(
        "python megatron_lm/tools/preprocess_data.py \
        --input ${{inputs.train_data}} \
        --output-prefix ${{outputs.indexed}}/wikidump \
        --tokenizer-type Llama2Tokenizer \
        --tokenizer-model ${{inputs.model_dir}} \
        --workers 1 && "
        "cp ${{inputs.train_data}} ${{outputs.indexed}} && "
        "mv ${{outputs.indexed}}/wikidump_text_document.bin ${{outputs.indexed}}/wikidump.jsonl.bin && "
        "mv ${{outputs.indexed}}/wikidump_text_document.idx ${{outputs.indexed}}/wikidump.jsonl.idx"
    ),
    inputs={
        # raw wiki dump file (registered data asset)
        "train_data": Input(
            type=AssetTypes.URI_FILE, 
            path="wiki_dump@latest"
        ),
        # folder passed to --tokenizer-model
        "model_dir": Input(
            type=AssetTypes.URI_FOLDER, 
            path="llama3-8b@latest"
        )
    },
    outputs={
        "indexed": Output(
            type=AssetTypes.URI_FOLDER,
            path="azureml://datastores/workspaceblobstore/paths/wiki-indexed-dataset1/",
            mode="rw_mount"
        )                      # Mountable from subsequent jobs
    },
    environment="llama3-8b-wiki_env@latest",
    compute=COMPUTE_CLUSTER,
    instance_count=NUM_NODES,
    distribution=dist,
    environment_variables={
        "LOGLEVEL": "INFO",
        "NCCL_DEBUG": "WARN",
        "NCCL_DEBUG_SUBSYS": "WARN",
        "PYTHONFAULTHANDLER": "1",
        "CUDA_LAUNCH_BLOCKING": "0"
    },
    display_name="llama3-8b-wiki-index",
    experiment_name="llama3-8b-wiki-index-exp"
)

In [None]:
# submit the job
# Guarded so "Run All" neither submits this reference job by accident nor
# overwrites `returned_job` from the DPO training run above.
SUBMIT_ORIGINAL_EXAMPLE = False  # set to True to actually submit the preprocessing job

if SUBMIT_ORIGINAL_EXAMPLE:
    preprocess_returned_job = ml_client.jobs.create_or_update(job)
    print(f"Job submitted: {preprocess_returned_job.name}")
    print(f"Monitor at: {preprocess_returned_job.studio_url}")
else:
    print("Skipped submitting the original example job (SUBMIT_ORIGINAL_EXAMPLE = False).")