# Install Python packages

In [None]:
%pip -q install -U azure-ai-ml azure-identity datasets==4.0.0 azureml-mlflow mlflow

# Load environment variables from a .env file
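secret 노출을 피하고 여러 notebook 간에 일관된 환경 변수를 사용하기 위해 `dotenv` 를 이용한다. `.env` 파일에 `AZURE_AML_SUBSCRIPTION_ID`, `AZURE_AML_RESOURCE_GROUP`, `AZURE_AML_WORKSPACE`, `AZURE_AML_CLUSTER` 를 정의해 둔다.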

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file; override=True makes the .env values take
# precedence over variables already set in the process environment.
load_dotenv(override=True)

AZURE_AML_SUBSCRIPTION_ID = os.getenv("AZURE_AML_SUBSCRIPTION_ID")
AZURE_AML_RESOURCE_GROUP = os.getenv("AZURE_AML_RESOURCE_GROUP")
AZURE_AML_WORKSPACE = os.getenv("AZURE_AML_WORKSPACE")
AZURE_AML_CLUSTER = os.getenv("AZURE_AML_CLUSTER")

# Fail fast with an actionable message rather than passing None on to MLClient
# and to the pipeline's compute settings further down the notebook.
_missing_env_vars = [
    name
    for name, value in (
        ("AZURE_AML_SUBSCRIPTION_ID", AZURE_AML_SUBSCRIPTION_ID),
        ("AZURE_AML_RESOURCE_GROUP", AZURE_AML_RESOURCE_GROUP),
        ("AZURE_AML_WORKSPACE", AZURE_AML_WORKSPACE),
        ("AZURE_AML_CLUSTER", AZURE_AML_CLUSTER),
    )
    if not value
]
if _missing_env_vars:
    raise RuntimeError(
        "Missing required environment variables (define them in .env): "
        + ", ".join(_missing_env_vars)
    )

# Create an Azure Machine Learning client
Azure Machine Learning 의 Client 객체인 `MLClient` 를 생성한다. 본 예제는 Azure CLI 로그인 Credential(`AzureCliCredential`) 을 사용하므로, 터미널에서 `az login` 을 정상적으로 완료해 두어야 한다. `az` 명령어가 설치되어 있지 않다면 [Azure CLI 설치하는 방법](https://learn.microsoft.com/ko-kr/cli/azure/install-azure-cli?view=azure-cli-latest) 을 참고한다.

In [None]:
from azure.identity import AzureCliCredential
from azure.ai.ml import MLClient

# Workspace-scoped client, authenticated with the Azure CLI login (`az login`).
ml_client = MLClient(
    credential=AzureCliCredential(),
    subscription_id=AZURE_AML_SUBSCRIPTION_ID,
    resource_group_name=AZURE_AML_RESOURCE_GROUP,
    workspace_name=AZURE_AML_WORKSPACE,
)

# Model Fine Tuning
OpenAI 에서 공개한 오픈웨이트 모델인 gpt-oss 를 사용한다. AML model registry 에 등록되어 있는 모델을 사용하여 fine tuning 을 해본다.

## Search gpt-oss-20b model
먼저 gpt-oss-20b 를 조회해본다.

In [None]:
model_name = "gpt-oss-20b"
# The base model is fetched from the "azureml-openai-oss" registry, so use a
# registry-scoped client rather than the workspace client.
oss_registry_client = MLClient(AzureCliCredential(), registry_name="azureml-openai-oss")
model = oss_registry_client.models.get(model_name, label="latest")
# Display the compute SKU the model recommends for fine-tuning.
model.properties["finetune-recommended-sku"]

## Preparing Dataset
huggingface 의 `HuggingFaceH4/Multilingual-Thinking` dataset 을 내려받아 `messages` 만 남긴 JSONL 로 변환한 뒤, train/valid/test (기본 800/100/100 건) 로 분할한다.

In [None]:
import json
import random
from datasets import load_dataset


def save_hf_dataset_to_jsonl(dataset, output_file):
    """Write each record of *dataset* as one JSON object per line (JSON Lines).

    Works with any iterable of records (a HuggingFace ``Dataset``, a list, or a
    generator). Records are counted while writing instead of calling ``len()``,
    which generators do not support. Non-ASCII text is written as-is (UTF-8), and
    values ``json`` cannot serialize natively are converted with ``str``.

    Args:
        dataset: iterable of JSON-serializable records to save.
        output_file: destination path; overwritten if it already exists.

    Returns:
        The number of records written.
    """
    count = 0  # stays 0 for an empty iterable
    with open(output_file, "w", encoding="utf-8") as f:
        for count, record in enumerate(dataset, start=1):
            f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
    print(f"Saved {count} records to {output_file}")
    return count


def convert_multilingual_thinking(input_file, output_file):
    """Rewrite a JSONL conversation file keeping only a cleaned ``messages`` field.

    Every non-blank input line is parsed as JSON. Each output line is
    ``{"messages": [...]}``, where every message keeps only ``role`` and
    ``content``, plus ``thinking`` when that key is present. A record without
    ``messages`` becomes ``{"messages": []}``. All input is parsed before the
    output file is opened.
    """

    def _strip_message(message):
        # Key order (role, content, thinking) is preserved in the JSON output.
        kept = {"role": message["role"], "content": message["content"]}
        if "thinking" in message:
            kept["thinking"] = message["thinking"]
        return kept

    with open(input_file, "r", encoding="utf-8") as src:
        records = [json.loads(raw) for raw in src if raw.strip()]

    with open(output_file, "w", encoding="utf-8") as dst:
        for record in records:
            payload = {
                "messages": [_strip_message(m) for m in record.get("messages", [])]
            }
            dst.write(json.dumps(payload, ensure_ascii=False) + "\n")
    print(f"Converted {len(records)} records to {output_file}")


def split_jsonl(
    input_file,
    train_file,
    valid_file,
    test_file,
    train_size=800,
    valid_size=100,
    test_size=100,
    seed=42,
):
    """Shuffle the non-blank lines of a JSONL file and split them into three files.

    The first ``train_size`` shuffled lines go to *train_file*, the next
    ``valid_size`` to *valid_file* and the next ``test_size`` to *test_file*.
    Any remaining lines are dropped. If the input is shorter than the
    requested total, the later splits come out short or empty.

    The shuffle uses a private ``random.Random(seed)`` instance. The split is
    therefore reproducible for a given seed (and identical to the previous
    ``random.seed(seed); random.shuffle(...)`` behaviour), and the global
    ``random`` state is left untouched.
    """
    with open(input_file, "r", encoding="utf-8") as f:
        # Ensure every line ends with "\n". A final line without a newline would
        # otherwise be glued to its neighbour once shuffled into the middle of
        # an output file, producing an invalid JSONL record.
        lines = [
            line if line.endswith("\n") else line + "\n"
            for line in f
            if line.strip()
        ]
    random.Random(seed).shuffle(lines)

    valid_end = train_size + valid_size
    test_end = valid_end + test_size
    train_lines = lines[:train_size]
    valid_lines = lines[train_size:valid_end]
    test_lines = lines[valid_end:test_end]

    for path, chunk in (
        (train_file, train_lines),
        (valid_file, valid_lines),
        (test_file, test_lines),
    ):
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(chunk)
    print(
        f"Train: {len(train_lines)}, Valid: {len(valid_lines)}, Test: {len(test_lines)}"
    )


# Usage example: download the dataset, reduce it to messages-only JSONL, then split it.
# The raw dump holds JSON Lines content despite its ".json" extension.
raw_jsonl = "multilingual_thinking_raw_data.json"
formatted_jsonl = "multilingual_thinking_formatted_data.jsonl"
split_paths = (
    "multilingual_thinking_train.jsonl",
    "multilingual_thinking_valid.jsonl",
    "multilingual_thinking_test.jsonl",
)

dataset = load_dataset("HuggingFaceH4/Multilingual-Thinking", split="train")
save_hf_dataset_to_jsonl(dataset, raw_jsonl)
convert_multilingual_thinking(raw_jsonl, formatted_jsonl)
split_jsonl(formatted_jsonl, *split_paths)

데이터가 어떻게 생겼는지 확인해보자.

In [None]:
# load the ./multilingual_thinking_train.jsonl file into a pandas dataframe and show the first row
import pandas as pd

# Show full cell contents instead of truncating long text with "...".
# The pandas options reference documents None (not 0) as "no limit" for this option.
pd.set_option("display.max_colwidth", None)
# Load the training split and preview its first record.
df = pd.read_json("./multilingual_thinking_train.jsonl", lines=True)
df.head(1)

## Train model
본격적으로 모델을 훈련한다. 아래 compute cluster 와 노드당 GPU 개수(`gpus_per_node`)는 사용하는 cluster 의 VM 사양에 맞게 수정한다.

In [None]:
# Cluster that runs every pipeline stage, and the GPU count per node on it
# (adjust the second value to match the cluster's VM size).
compute_cluster, gpus_per_node = AZURE_AML_CLUSTER, 4

In [None]:
import ast

# Baseline hyper-parameters for the fine-tuning run.
training_parameters = {
    "num_train_epochs": 3,
    "per_device_train_batch_size": 1,
    "per_device_eval_batch_size": 1,
    "learning_rate": 5e-6,
    "lr_scheduler_type": "cosine",
}
# LoRA and DeepSpeed (stage 3) switches; the on/off flags are the strings "true"/"false".
optimization_parameters = {
    "apply_lora": "true",
    "apply_deepspeed": "true",
    "deepspeed_stage": 3,
}
# Combined parameter set; optimization keys win if a name appears in both.
finetune_parameters = dict(training_parameters)
finetune_parameters.update(optimization_parameters)

# A model may carry tuned defaults as a stringified dict in its
# `model_specific_defaults` tag; when present they take precedence over the values above.
if "model_specific_defaults" in model.tags:
    print("Warning! Model specific defaults exist. The defaults could be overridden.")
    # literal_eval only parses Python literals, so the tag value cannot run code.
    model_defaults = ast.literal_eval(model.tags["model_specific_defaults"])
    finetune_parameters.update(model_defaults)

print(
    "The following finetune parameters are going to be set for the run: "
    f"{finetune_parameters}"
)

In [None]:
def _flag_enabled(value):
    """Return True when a "true"/"false"-style option value means enabled.

    Accepts the string "true" (any case, surrounding whitespace ignored) as well
    as the boolean True. Values merged in from `model_specific_defaults` via
    `ast.literal_eval` may be real booleans rather than strings.
    """
    return str(value).strip().lower() == "true"


# Set the pipeline display name for distinguishing different runs from the name
def get_pipeline_display_name(*, params=None, num_gpus=None, base_model_name=None):
    """Build a run name that encodes the key fine-tuning settings.

    The name has the form
    ``<model>-reasoning-bs<B>-<scheduler>-<ds<stage>|nods>-<lora|nolora>-save_limit<S>-seqlen<L>``.
    ``B`` is the effective global batch size: per-device batch size x gradient
    accumulation steps x GPUs per node x number of nodes. Missing settings fall
    back to the defaults used below (e.g. ``-1`` for save limit / sequence length).

    Args:
        params: fine-tuning parameter dict; defaults to ``finetune_parameters``.
        num_gpus: GPUs per node; defaults to ``gpus_per_node``.
        base_model_name: name prefix; defaults to ``model_name``.

    Returns:
        The display name string.
    """
    # Fall back to the notebook-level settings so existing no-argument calls keep working.
    if params is None:
        params = finetune_parameters
    if num_gpus is None:
        num_gpus = gpus_per_node
    if base_model_name is None:
        base_model_name = model_name

    batch_size = (
        int(params.get("per_device_train_batch_size", 1))
        * int(params.get("gradient_accumulation_steps", 1))
        * int(num_gpus)
        * int(params.get("num_nodes_finetune", 1))
    )
    scheduler = params.get("lr_scheduler_type", "linear")
    ds_stage = params.get("deepspeed_stage", "2")
    if _flag_enabled(params.get("apply_deepspeed", "false")):
        ds_string = f"ds{ds_stage}"
    else:
        ds_string = "nods"
    lora_string = "lora" if _flag_enabled(params.get("apply_lora", "false")) else "nolora"
    save_limit = params.get("save_total_limit", -1)
    seq_len = params.get("max_seq_length", -1)
    return "-".join(
        [
            base_model_name,
            "reasoning",
            f"bs{batch_size}",
            f"{scheduler}",
            ds_string,
            lora_string,
            f"save_limit{save_limit}",
            f"seqlen{seq_len}",
        ]
    )


# Name used for the pipeline run, derived from the current fine-tuning settings.
pipeline_display_name = get_pipeline_display_name()
print("Display name used for the run: {}".format(pipeline_display_name))

## Create a training pipeline and submit it
AML 의 `pipeline` 기능을 통해 job 을 생성하여 AML 에 제출한다.

In [None]:
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import Input

# The chat-completion fine-tuning pipeline component is taken from the shared
# "azureml" registry, always at its latest version.
registry_ml_client = MLClient(AzureCliCredential(), registry_name="azureml")
pipeline_component_func = registry_ml_client.components.get(
    "chat_completion_pipeline", label="latest"
)


# Define the pipeline job: one chat-completion fine-tuning component whose
# stages all run on `compute_cluster`.
@pipeline(name=pipeline_display_name)
def create_pipeline():
    """Wire the registry fine-tuning component into a pipeline and expose its model.

    Returns a dict whose ``trained_model`` entry is the fine-tuning step's
    ``mlflow_model_folder`` output. Surfacing it at pipeline level makes the
    fine-tuned model easy to register, which is required before deploying it
    to an online or batch endpoint.
    """
    # Every stage of the component runs on the same cluster.
    stage_computes = {
        "compute_model_import": compute_cluster,
        "compute_preprocess": compute_cluster,
        "compute_finetune": compute_cluster,
        "compute_model_evaluation": compute_cluster,
    }
    # Map the local train/valid/test JSONL splits to the component's file inputs.
    dataset_inputs = {
        "train_file_path": Input(
            type="uri_file", path="./multilingual_thinking_train.jsonl"
        ),
        "validation_file_path": Input(
            type="uri_file", path="./multilingual_thinking_valid.jsonl"
        ),
        "test_file_path": Input(
            type="uri_file", path="./multilingual_thinking_test.jsonl"
        ),
    }
    # The local name doubles as the pipeline node name, so it is kept unchanged.
    chat_completion_pipeline = pipeline_component_func(
        pytorch_model_path=model.id,
        # set to the number of GPUs available in the compute
        number_of_gpu_to_use_finetuning=gpus_per_node,
        **stage_computes,
        **dataset_inputs,
        **finetune_parameters,
    )
    return {"trained_model": chat_completion_pipeline.outputs.mlflow_model_folder}


pipeline_object = create_pipeline()

run_settings = pipeline_object.settings
run_settings.force_rerun = True  # never reuse cached results from previous jobs
run_settings.continue_on_step_failure = False  # do not keep running after a step fails

# Submit under the "chat_completion_gpt_oss" experiment, then stream the job
# until it completes.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_object,
    experiment_name="chat_completion_gpt_oss",
)
ml_client.jobs.stream(pipeline_job.name)

## Register model from the job output
job 의 output 으로부터 model 을 찾아 이를 `Model` 로 등록한다.

In [None]:
import time

from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

# check if the `trained_model` output is available
print("pipeline job outputs: ", ml_client.jobs.get(pipeline_job.name).outputs)

# Reference the pipeline-level `trained_model` output (mapped in create_pipeline to
# the fine-tuning step's `mlflow_model_folder`) through its azureml:// job-output URI.
model_path_from_job = "azureml://jobs/{0}/outputs/{1}".format(
    pipeline_job.name, "trained_model"
)

# Join base name and suffix with "-"; plain concatenation produced names like
# "gpt-oss-20bmulti-lingual-reasoning". Any "/" in the base name becomes "-".
finetuned_model_name = (model_name + "-multi-lingual-reasoning").replace("/", "-")
print("path to register model: ", model_path_from_job)

prepare_to_register_model = Model(
    path=model_path_from_job,
    type=AssetTypes.MLFLOW_MODEL,
    name=finetuned_model_name,
    version=str(int(time.time())),  # use timestamp as version to avoid version conflict
    description=model_name
    + " fine tuned model for multi-lingual-thinking chat-completion",
)
print("prepare to register model: \n", prepare_to_register_model)
# register the model from pipeline job output
registered_model = ml_client.models.create_or_update(prepare_to_register_model)
print("registered model: \n", registered_model)

# Model Deployment
모델을 학습했으니, 마지막으로 모델 배포다. 모델 배포 모듈은 Endpoint 와 Deployment 로 구성된다. Endpoint 는 model deployment 에 대한 reverse proxy 로, 권한&인증/traffic 관리/모니터링 등의 기능을 가진다. Deployment 는 모델을 안정적으로 배포하는 환경을 구축하며, 모델에 따른 다양한 runtime 환경을 지원한다.

## Create a managed online endpoint
먼저 endpoint 를 배포해보자. 실시간 추론을 지원하는 online type 으로 배포한다.

In [None]:
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    ProbeSettings,
    OnlineRequestSettings,
)

# Endpoint names must be unique within a region, so suffix the name with the
# current Unix timestamp.
online_endpoint_name = f"chat-completion-{int(time.time())}"
endpoint = ManagedOnlineEndpoint(name=online_endpoint_name, auth_mode="key")
# Create (or update) the endpoint and wait for the operation to finish.
ml_client.online_endpoints.begin_create_or_update(endpoint).wait()

## Create an online deployment
다음은 Deployment 이다. Endpoint 와 동일하게 실시간 추론 타입으로 배포하자.

In [None]:

deployment_name = "demo"
demo_deployment = ManagedOnlineDeployment(
    name=deployment_name,
    endpoint_name=online_endpoint_name,
    model=registered_model.id,
    instance_type="Standard_NC40ads_H100_v5",
    instance_count=1,
    # Delay liveness probing by 600 (presumably to give the model time to load).
    liveness_probe=ProbeSettings(initial_delay=600),
    request_settings=OnlineRequestSettings(request_timeout_ms=90000),
)
ml_client.online_deployments.begin_create_or_update(demo_deployment).wait()

# Route all endpoint traffic to the new deployment and apply the change.
endpoint.traffic = {deployment_name: 100}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

In [None]:
# Pick one random conversation from the test split; the index is reset so the
# sampled row is addressable as row 0.
test_df = (
    pd.read_json("./multilingual_thinking_test.jsonl", lines=True)
    .sample(n=1)
    .reset_index(drop=True)
)
test_df.head(1)

In [None]:
import json

# Build the prompt from the sampled conversation: keep messages up to and including
# the first "user" turn and drop any later turns (e.g. the reference assistant
# answer), so the model has to generate the reply itself.
messages = []
for msg in test_df["messages"][0]:
    messages.append(msg)
    if msg["role"] == "user":
        break

# Generation parameters sent along with the request.
parameters = {
    "temperature": 0.6,
    "top_p": 0.9,
    "do_sample": True,
    "max_new_tokens": 200,
}
# Request body: `input_string` holds a batch with the single conversation built
# above, next to the generation `parameters`.
test_json = {
    "input_data": {
        "input_string": [messages],
        "parameters": parameters,
    },
    "params": {},
}
# Write the request body to ./sample_score.json, which `invoke` reads below.
with open("./sample_score.json", "w", encoding="utf-8") as f:
    json.dump(test_json, f)

# Score the request against the "demo" deployment of the online endpoint.
response = ml_client.online_endpoints.invoke(
    endpoint_name=online_endpoint_name,
    deployment_name="demo",
    request_file="./sample_score.json",
)
print("raw response: \n", response, "\n")
print("raw response: \n", response, "\n")

In [None]:
# Clean up: delete the online endpoint and wait for the deletion to finish.
delete_poller = ml_client.online_endpoints.begin_delete(name=online_endpoint_name)
delete_poller.wait()