In [1]:
import os
from datetime import datetime, timezone

from azure.ai.ml import MLClient, command, dsl, Input, Output
from azure.ai.ml.entities import Environment, BuildContext, AmlCompute
from azure.identity import InteractiveBrowserCredential

# Workspace configuration file read by MLClient.from_config below.
PATH_TO_CONFIG_FILE = "./config.json"

# Log in through an interactive browser prompt, then build a workspace-scoped
# client from the config file above.
credential = InteractiveBrowserCredential()
ml_client = MLClient.from_config(credential, path=PATH_TO_CONFIG_FILE)

Found the config file in: config.json


In [2]:
# Short "<gpu count>x<gpu model>" aliases mapped to Azure VM sizes.
# Flash Attention 2 needs an A-, H- or L-series GPU (e.g. A10, A100, H100, L40),
# so only sizes with such GPUs are listed here.
COMPUTE_MAPPING = dict(
    [
        ("1xA10", "Standard_NV36adms_A10_v5"),
        ("2xA10", "Standard_NV72ads_A10_v5"),
        ("8xA100", "Standard_ND96amsr_A100_v4"),
    ]
)

In [4]:
# Model / run identifier used to build output paths.
MODEL_NAME = "mistral7b-summarization"

# Training assets: local source folder, registered environment, compute cluster.
TRAIN_DIR = "train"
TRAIN_ENV_NAME = "fa2_train_env"
# AzureML compute names may contain only letters, digits and hyphens, so no "_".
TRAIN_COMPUTE_NAME = "fa2-a10"
# Key into COMPUTE_MAPPING; its "<N>x" prefix is the GPU count per node.
TRAIN_INSTANCE_KEY = "2xA10"
TRAIN_INSTANCE_TYPE = COMPUTE_MAPPING[TRAIN_INSTANCE_KEY]
TRAIN_DISPLAY_NAME = "fa2 clm training"
TRAIN_DESCRIPTION = "Training a causal language model using Flash Attention 2"

DATASTORE_NAME = "workspaceartifactstore"

# One `accelerate` process per GPU. Derived from the instance key so the
# process count and the VM size cannot drift apart.
NUM_GPUS = int(TRAIN_INSTANCE_KEY.split("x", 1)[0])

# UTC timestamp as YearMonthDayHourMinute; makes each run's output path unique.
timenow = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")

TRAINING_OUTPUT_PATH = MODEL_NAME + "__" + timenow

In [5]:
def get_or_create_compute_target(
    ml_client,
    compute_name,
    instance_type="STANDARD_DS3_v2",
    min_nodes=0,
    max_nodes=1,
    idle_time=300,
):
    """Return the name of an AmlCompute cluster, provisioning it if it is missing.

    Args:
        ml_client: Workspace-scoped ``MLClient``.
        compute_name: Name of the cluster to look up or create.
        instance_type: VM size, used only when a new cluster is created.
        min_nodes: Minimum node count for a newly created cluster.
        max_nodes: Maximum node count for a newly created cluster.
        idle_time: Idle time before a new cluster scales down
            (``idle_time_before_scale_down``).

    Returns:
        The cluster name, suitable for ``command(compute=...)``.

    Note:
        An existing cluster is returned as-is; ``instance_type`` and the node
        limits are not reconciled against it.
    """
    try:
        return ml_client.compute.get(compute_name).name
    except Exception:
        # NOTE(review): any lookup failure (not only "not found") falls through
        # to creation; narrow this to the SDK's not-found error if possible.
        print(f"Creating a new {instance_type} compute target...")

    cluster = AmlCompute(
        name=compute_name,
        size=instance_type,
        min_instances=min_nodes,
        max_instances=max_nodes,
        idle_time_before_scale_down=idle_time,
    )
    # begin_create_or_update starts a long-running operation and returns a
    # poller; wait for it so jobs submitted right after this call can use it.
    provisioned = ml_client.compute.begin_create_or_update(cluster).result()
    return provisioned.name


def get_environment(
    environment_name,
    dependencies_dir,
    ml_client,
    gpu=False,
    dep_yaml=None,
    dockerfile_path=None,
):
    """Return the latest registered AzureML environment, registering it if absent.

    Args:
        environment_name: Name of the environment asset.
        dependencies_dir: Folder holding the Dockerfile or the conda YAML.
        ml_client: Workspace-scoped ``MLClient``.
        gpu: Pick the CUDA base image (conda-file path only; ignored when a
            Dockerfile is used).
        dep_yaml: Conda YAML file name inside ``dependencies_dir``.
        dockerfile_path: Dockerfile path passed to ``BuildContext`` together with
            ``dependencies_dir`` as the build context. Takes precedence over
            ``dep_yaml``.

    Returns:
        The existing ``latest`` environment, or the newly registered one.

    Raises:
        ValueError: If the environment must be created but neither
            ``dockerfile_path`` nor ``dep_yaml`` is given.
    """
    try:
        # `environments.get` needs a version or a label; without one it raises,
        # which previously sent every call down the re-register path.
        return ml_client.environments.get(name=environment_name, label="latest")
    except Exception:
        # Not registered yet (or the lookup failed): register it below.
        # NOTE(review): this also catches auth/network errors.
        pass

    if dockerfile_path is not None:
        env = Environment(
            name=environment_name,
            description="Custom environment",
            build=BuildContext(path=dependencies_dir, dockerfile_path=dockerfile_path),
        )
    elif dep_yaml is not None:
        image = (
            "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.8-cudnn8-ubuntu22.04"
            if gpu
            else "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest"
        )
        env = Environment(
            name=environment_name,
            description="Custom environment",
            conda_file=os.path.join(dependencies_dir, dep_yaml),
            image=image,
        )
    else:
        raise ValueError(
            "Either dockerfile_path or dep_yaml must be provided to build the environment."
        )

    return ml_client.environments.create_or_update(env)

In [None]:
# Register (or reuse) the training environment built from TRAIN_DIR/Dockerfile.
train_environment = get_environment(
    TRAIN_ENV_NAME,
    TRAIN_DIR,
    ml_client,
    gpu=True,
    dockerfile_path="Dockerfile",
)

# Reuse the training cluster if it exists; otherwise create one with 0 to 2 nodes.
train_compute = get_or_create_compute_target(
    ml_client,
    TRAIN_COMPUTE_NAME,
    instance_type=TRAIN_INSTANCE_TYPE,
    min_nodes=0,
    max_nodes=2,
)

In [None]:
# Single-node, multi-GPU training step: `accelerate launch` starts NUM_GPUS
# processes, each running train.py from the TRAIN_DIR source folder.
# NOTE(review): newer `transformers` releases renamed `--evaluation_strategy`
# to `--eval_strategy`; confirm against the version the Dockerfile installs.
train_command = command(
    name="train",
    display_name=TRAIN_DISPLAY_NAME,
    inputs={
        "num_gpus": NUM_GPUS,
    },
    outputs={
        # Timestamped folder on the datastore, mounted read-write; train.py's
        # --output_dir and --logging_dir both point here.
        "output_dir": Output(
            type="uri_folder",
            path=f"azureml://datastores/{DATASTORE_NAME}/paths/{TRAINING_OUTPUT_PATH}",
            mode="rw_mount",
        ),
    },
    # The source folder of the component (same folder the environment is built from).
    code=TRAIN_DIR,
    command="""accelerate launch \
             --num_machines 1 \
             --multi_gpu \
             --num_processes  ${{inputs.num_gpus}} \
                train.py \
                --model_name_or_path "mistralai/Mistral-7B-v0.1" \
                --max_seq_length 4096 \
                --evaluation_strategy epoch \
                --save_strategy epoch \
                --save_total_limit 2 \
                --logging_steps 25 \
                --per_device_train_batch_size 8 \
                --per_device_eval_batch_size 8 \
                --learning_rate 1e-4 \
                --num_train_epochs 3 \
                --weight_decay 0.01 \
                --optim paged_adamw_8bit \
                --warmup_ratio 0.05 \
                --bf16 \
                --output_dir ${{outputs.output_dir}} \
                --logging_dir ${{outputs.output_dir}} \
                --dataloader_num_workers 4 \
                --gradient_accumulation_steps 1 \
                --seed 42 \
                --report_to mlflow \
                --attn_implementation "flash_attention_2" \
                --num_proc 16 \
                --gradient_checkpointing \
                --ddp_find_unused_parameters False
            """,
    environment=f"{train_environment.name}:{train_environment.version}",
    compute=train_compute,
    # Must stay in sync with `--num_machines 1` in the launch command above.
    instance_count=1,
    shm_size="16g",
)

In [None]:
# Wrap the training command in a one-step pipeline whose step output is
# exposed as a pipeline-level output.
@dsl.pipeline(
    description=TRAIN_DESCRIPTION,
    display_name=TRAIN_DISPLAY_NAME,
)
def pipeline_func():
    # NOTE(review): the dsl builder may derive the step name from this
    # variable name - keep it stable.
    train_job = train_command()

    # NOTE(review): despite its key, this output is the training step's
    # output folder (presumably checkpoints/logs), not training data.
    return {
        "pipeline_job_train_data": train_job.outputs.output_dir,
    }


# Build the pipeline job object from the decorated function.
pipeline = pipeline_func()


# Submit the pipeline to the workspace.
# NOTE(review): TRAINING_OUTPUT_PATH embeds a timestamp, so every submission
# lands in its own new experiment; MODEL_NAME would group runs together.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name=TRAINING_OUTPUT_PATH,
)

In [None]:
# Display the submitted pipeline job as this cell's output.
pipeline_job