In [8]:
import os
from datetime import datetime, timezone

from dotenv import load_dotenv

# Read variables from the .env file two directories up and print the boolean
# that load_dotenv returns.
loaded = load_dotenv(dotenv_path="../../.env")
print("Loaded .env: {}".format(loaded))


from azure.ai.ml import MLClient, command, dsl, Input, Output
from azure.ai.ml.entities import Environment, BuildContext, AmlCompute
from azure.identity import InteractiveBrowserCredential

# Workspace connection details are read from this file.
PATH_TO_CONFIG_FILE = "config.json"

# Authenticate with an interactive browser login, then build the workspace client.
credential = InteractiveBrowserCredential()
ml_client = MLClient.from_config(credential=credential, path=PATH_TO_CONFIG_FILE)

Found the config file in: /Users/nicholasbroad/huggingface/azure-ml-transformers/config.json


Loaded .env: True


In [9]:
# Friendly "<count>x<GPU>" labels mapped to Azure VM size names.
# Flash attention only works on GPUs that start with A (A10, A100), L (L4, L40), or H (H100),
# so the V100 and T4 sizes below cannot use it.
COMPUTE_MAPPING = {
    # V100
    "4xV100": "Standard_NC24s_v3",
    "2xV100": "Standard_NC12s_v3",
    "1xV100": "Standard_NC6s_v3",
    # T4
    "4xT4": "Standard_NC64as_T4_v3",
    "1xT4": "Standard_NC4as_T4_v3",
    # A10
    "1xA10": "Standard_NV36adms_A10_v5",
    "2xA10": "Standard_NV72ads_A10_v5",
    # A100
    "1xA100-80GB": "Standard_NC24ads_A100_v4",
    # The 40 GB ND A100 v4 size has no "_A100_" infix; only the 80 GB NDm variant does.
    "8xA100-40GB": "Standard_ND96asr_v4",
    "8xA100-80GB": "Standard_ND96amsr_A100_v4",
}

In [10]:
# Minute-resolution UTC timestamp (YearMonthDayHourMinute) appended to the model name.
# datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now" formats
# to exactly the same string.
timenow = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")

MODEL_NAME = "llama_clm__" + timenow
# Datastore that holds every input and output folder used by the pipeline.
DATASTORE_NAME = "workspaceblobstore"

# --- Step 1: train the tokenizer -------------------------------------------
TRAIN_TOKENIZER_COMMAND_NAME = "train_tokenizer"
TRAIN_TOKENIZER_DISPLAY_NAME = "Train Tokenizer"
TRAIN_TOKENIZER_DIR = "train_tokenizer"
TRAIN_TOKENIZER_ENV_NAME = "train_tokenizer_env"
TRAIN_TOKENIZER_COMPUTE_NAME = "train-tokenization-compute"
TRAIN_TOKENIZER_INSTANCE_TYPE = "Standard_F4s_v2"
TEXT_FILES_BLOB_PATH = "robertamlm"
TEXT_FILES_PATH = "azureml://datastores/" + DATASTORE_NAME + "/paths/" + TEXT_FILES_BLOB_PATH
TOKENIZER_BLOB_PATH = "custom_llama_tokenizer"
TOKENIZER_OUTPUT_PATH = "azureml://datastores/" + DATASTORE_NAME + "/paths/" + TOKENIZER_BLOB_PATH

# --- Step 2: tokenize the data ---------------------------------------------
DATA_TOKENIZATION_COMMAND_NAME = "data_tokenization"
DATA_TOKENIZATION_DISPLAY_NAME = "Data Tokenization"
DATA_TOKENIZATION_DIR = "data_tokenization"
DATA_TOKENIZATION_ENV_NAME = "data_tokenization_env"
# Tokenization reuses the cluster (name and VM size) of the tokenizer-training step.
DATA_TOKENIZATION_COMPUTE_NAME = TRAIN_TOKENIZER_COMPUTE_NAME
DATA_TOKENIZATION_INSTANCE_TYPE = TRAIN_TOKENIZER_INSTANCE_TYPE
TOKENIZED_DATA_BLOB_PATH = "custom_llama_tokenized_data"
TOKENIZED_DATA_OUTPUT_PATH = "azureml://datastores/" + DATASTORE_NAME + "/paths/" + TOKENIZED_DATA_BLOB_PATH

# --- Step 3: train the model -----------------------------------------------
TRAIN_COMMAND_NAME = "train"
TRAIN_DISPLAY_NAME = "Train Model"
TRAIN_DIR = "train_model"
TRAIN_ENV_NAME = "llama_clm_env"
TRAIN_COMPUTE_NAME = "roberta-mlm-compute-a100"
TRAIN_INSTANCE_TYPE = COMPUTE_MAPPING["1xA100-80GB"]
# The trained model is written to a folder named after the timestamped model name.
TRAINED_MODEL_OUTPUT_PATH = "azureml://datastores/" + DATASTORE_NAME + "/paths/" + MODEL_NAME

# --- Pipeline / experiment --------------------------------------------------
PIPELINE_NAME = "clm_pretraining_from_scratch"
PIPELINE_DESCRIPTION = "CLM pretraining pipeline from scratch"
EXPERIMENT_NAME = MODEL_NAME

# Base Docker images used when an environment is built from a conda file.
CPU_ENV_IMAGE = "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu22.04"
GPU_ENV_IMAGE = "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.8-cudnn8-ubuntu22.04"

# Training topology.
NUM_NODES = 1
NUM_GPUS = 1

In [11]:
def get_or_create_compute_target(
    ml_client,
    compute_name,
    instance_type="STANDARD_DS3_v2",
    min_nodes=0,
    max_nodes=1,
    idle_time=300,
):
    """Return the name of a compute cluster, creating the cluster if the lookup fails.

    Args:
        ml_client: Client exposing ``compute.get`` and ``compute.begin_create_or_update``.
        compute_name: Name of the cluster to look up or create.
        instance_type: VM size used only when a new cluster is created.
        min_nodes: Minimum instance count of a newly created cluster.
        max_nodes: Maximum instance count of a newly created cluster.
        idle_time: Value passed as ``idle_time_before_scale_down`` for a new cluster.

    Returns:
        The name of the existing or newly created cluster.

    Raises:
        Whatever the creation operation raises; a failed provisioning now surfaces
        here instead of later, when a job is submitted to the cluster.
    """
    try:
        return ml_client.compute.get(compute_name).name
    except Exception as lookup_error:
        # Any lookup failure is treated as "the cluster does not exist" (best effort),
        # but the reason is reported so that e.g. auth problems are not hidden.
        print(f"Creating a new {instance_type} compute target...")
        print(f"(lookup of '{compute_name}' failed: {type(lookup_error).__name__})")

    cluster = AmlCompute(
        name=compute_name,
        size=instance_type,
        min_instances=min_nodes,
        max_instances=max_nodes,
        idle_time_before_scale_down=idle_time,
    )
    # begin_create_or_update only starts the operation; wait on the returned poller
    # so the cluster really exists (or the error is raised) before returning.
    ml_client.compute.begin_create_or_update(cluster).result()
    return cluster.name


def get_environment(
    environment_name,
    dependencies_dir,
    ml_client,
    gpu=False,
    dep_yaml=None,
    dockerfile_path=None,
):
    """Return the latest registered environment with this name, registering one if needed.

    Args:
        environment_name: Name of the environment to look up or register.
        dependencies_dir: Folder holding the conda file, or the Docker build context.
        ml_client: Client exposing ``environments.get`` and ``environments.create_or_update``.
        gpu: Selects ``GPU_ENV_IMAGE`` instead of ``CPU_ENV_IMAGE`` as the base image
            for a conda-based environment.
        dep_yaml: Conda file name inside ``dependencies_dir``; required when no
            ``dockerfile_path`` is given and the environment must be created.
        dockerfile_path: When set, the environment is built from this Dockerfile with
            ``dependencies_dir`` as build context; ``gpu`` and ``dep_yaml`` are ignored.

    Returns:
        The environment returned by the lookup or by ``create_or_update``.

    Raises:
        ValueError: A new environment must be created but neither ``dep_yaml`` nor
            ``dockerfile_path`` was supplied.
    """
    try:
        # The lookup needs a version or a label. Called with neither it always fails,
        # which previously caused a new environment to be registered on every run.
        return ml_client.environments.get(name=environment_name, label="latest")
    except Exception as lookup_error:
        # Any lookup failure is treated as "not registered yet" (best effort).
        print(f"Registering environment '{environment_name}'...")
        print(f"(lookup failed: {type(lookup_error).__name__})")

    if dockerfile_path is not None:
        env = Environment(
            name=environment_name,
            description="Custom environment",
            build=BuildContext(path=dependencies_dir, dockerfile_path=dockerfile_path),
        )
    else:
        if dep_yaml is None:
            # Fail with a clear message rather than a TypeError from os.path.join.
            raise ValueError(
                "get_environment needs either dep_yaml or dockerfile_path "
                f"to create environment '{environment_name}'"
            )
        env = Environment(
            name=environment_name,
            description="Custom environment",
            conda_file=os.path.join(dependencies_dir, dep_yaml),
            image=GPU_ENV_IMAGE if gpu else CPU_ENV_IMAGE,
        )

    return ml_client.environments.create_or_update(env)

# Train Tokenizer

In [12]:
# CPU environment for the tokenizer-training step, described by env.yaml in TRAIN_TOKENIZER_DIR.
train_tokenizer_environment = get_environment(
    TRAIN_TOKENIZER_ENV_NAME,
    TRAIN_TOKENIZER_DIR,
    ml_client,
    gpu=False,
    dep_yaml="env.yaml",
)

# Cluster for the tokenizer-training step: 0 to 1 nodes of the configured VM size.
train_tokenizer_compute = get_or_create_compute_target(
    ml_client,
    TRAIN_TOKENIZER_COMPUTE_NAME,
    instance_type=TRAIN_TOKENIZER_INSTANCE_TYPE,
    min_nodes=0,
    max_nodes=1,
)

In [34]:
"""
I need to pass the HF_TOKEN environment variable to the job because I use a gated repo (llama3) for a tokenizer.
"""

# Command step that trains the tokenizer: it reads the raw text files from the
# datastore and writes the result to TOKENIZER_OUTPUT_PATH.
train_tokenizer_command = command(
    name=TRAIN_TOKENIZER_COMMAND_NAME,
    display_name=TRAIN_TOKENIZER_DISPLAY_NAME,
    inputs={
        "text_files_dir": Input(
            type="uri_folder",
            path=TEXT_FILES_PATH,
            mode="mount",
        ),
    },
    outputs={
        "output_dir": Output(
            type="uri_folder",
            path=TOKENIZER_OUTPUT_PATH,
            mode="rw_mount",
        ),
    },
    # The source folder of the component; derived from the same constant that
    # locates the step's env.yaml so the two cannot drift apart.
    code=f"./{TRAIN_TOKENIZER_DIR}",
    command="""
python run.py \
    --tokenizer_name "meta-llama/Meta-Llama-3-8B-Instruct" \
    --vocab_size 128256 \
    --text_files_dir ${{inputs.text_files_dir}} \
    --glob_pattern "*.parquet" \
    --num_samples 10000 \
    --output_dir ${{outputs.output_dir}}
            """,
    environment=f"{train_tokenizer_environment.name}:{train_tokenizer_environment.version}",
    compute=train_tokenizer_compute,
    # Raises KeyError if HF_TOKEN is not set locally.
    # NOTE(review): the token is passed as a plain job environment variable and is
    # presumably visible in the job definition - confirm this is acceptable.
    environment_variables={"HF_TOKEN": os.environ["HF_TOKEN"]},
    instance_count=1,
)

# Data Tokenization

In [14]:
# CPU environment for the data-tokenization step, described by env.yaml in DATA_TOKENIZATION_DIR.
data_tokenization_environment = get_environment(
    DATA_TOKENIZATION_ENV_NAME,
    DATA_TOKENIZATION_DIR,
    ml_client,
    gpu=False,
    dep_yaml="env.yaml",
)

# Cluster for the data-tokenization step: 0 to 1 nodes of the configured VM size.
data_tokenization_compute = get_or_create_compute_target(
    ml_client,
    DATA_TOKENIZATION_COMPUTE_NAME,
    instance_type=DATA_TOKENIZATION_INSTANCE_TYPE,
    min_nodes=0,
    max_nodes=1,
)

In [19]:
# Command step that tokenizes the raw text files with the tokenizer stored at
# TOKENIZER_OUTPUT_PATH and writes the result to TOKENIZED_DATA_OUTPUT_PATH.
data_tokenization_command = command(
    # Use the step's configuration constants instead of repeating their literal values.
    name=DATA_TOKENIZATION_COMMAND_NAME,
    display_name=DATA_TOKENIZATION_DISPLAY_NAME,
    inputs={
        "text_files_dir": Input(
            type="uri_folder",
            path=TEXT_FILES_PATH,
            mode="mount",
        ),
        "tokenizer_name_or_path": Input(
            type="uri_folder",
            path=TOKENIZER_OUTPUT_PATH,
            mode="mount",
        ),
    },
    outputs={
        "output_dir": Output(
            type="uri_folder",
            path=TOKENIZED_DATA_OUTPUT_PATH,
            mode="rw_mount",
        ),
    },
    # The source folder of the component; derived from the same constant that
    # locates the step's env.yaml so the two cannot drift apart.
    code=f"./{DATA_TOKENIZATION_DIR}",
    command="""
python run.py \
    --tokenizer_name_or_path ${{inputs.tokenizer_name_or_path}} \
    --file_type "parquet" \
    --text_files_dir ${{inputs.text_files_dir}} \
    --glob_pattern "*.parquet" \
    --output_dir ${{outputs.output_dir}} \
    --num_proc 4 \
    --max_seq_length 1024
            """,
    environment=f"{data_tokenization_environment.name}:{data_tokenization_environment.version}",
    compute=data_tokenization_compute,
    instance_count=1,
)

# Train

In [25]:
# GPU environment for the training step, described by env.yaml in TRAIN_DIR.
train_environment = get_environment(
    TRAIN_ENV_NAME,
    TRAIN_DIR,
    ml_client,
    gpu=True,
    dep_yaml="env.yaml",
)

# Cluster for the training step: 0 to 1 nodes of the configured GPU VM size.
train_compute = get_or_create_compute_target(
    ml_client,
    TRAIN_COMPUTE_NAME,
    instance_type=TRAIN_INSTANCE_TYPE,
    min_nodes=0,
    max_nodes=1,
)

In [35]:
"""
Since I use a custom mlflow callback, I disable `report_to` in the training command.
"""

# Command step that launches training through `accelerate launch` with NUM_GPUS
# processes on one machine, reading the tokenizer and the tokenized data and
# writing the model to TRAINED_MODEL_OUTPUT_PATH.
# NOTE(review): the command sets both --num_train_epochs and --max_steps; if run.py
# forwards them to Hugging Face TrainingArguments, max_steps presumably takes
# precedence - confirm that the 100-step cap is intended.
train_command = command(
    name=TRAIN_COMMAND_NAME,
    display_name=TRAIN_DISPLAY_NAME,
    inputs={
        "num_processes": NUM_GPUS,
        "tokenizer_name_or_path": Input(
            type="uri_folder",
            path=TOKENIZER_OUTPUT_PATH,
            mode="mount",
        ),
        "tokenized_files_dir": Input(
            type="uri_folder",
            path=TOKENIZED_DATA_OUTPUT_PATH,
            mode="mount",
        ),
    },
    outputs={
        "output_dir": Output(
            type="uri_folder",
            path=TRAINED_MODEL_OUTPUT_PATH,
            mode="rw_mount",
        ),
    },
    # The source folder of the component; derived from the same constant that
    # locates the step's env.yaml so the two cannot drift apart.
    code=f"./{TRAIN_DIR}",
    command="""
accelerate launch --num_processes ${{inputs.num_processes}} --num_machines 1 \
    run.py \
--tokenizer_name_or_path ${{inputs.tokenizer_name_or_path}} \
--config_name_or_path ./llama_1b_config.json \
--tokenized_files_dir ${{inputs.tokenized_files_dir}} \
--glob_pattern "*.parquet" \
--output_dir ${{outputs.output_dir}} \
--do_train \
--do_eval \
--eval_strategy steps \
--eval_steps 50 \
--validation_split_num_samples_or_percentage 1000 \
--warmup_steps 100 \
--fp16 \
--attn_implementation sdpa \
--per_device_train_batch_size 8 \
--per_device_eval_batch_size 8 \
--gradient_accumulation_steps 4 \
--gradient_checkpointing True \
--num_train_epochs 3 \
--learning_rate 5e-5 \
--weight_decay 0.01 \
--optim adamw_torch \
--logging_steps 5 \
--save_strategy epoch \
--save_total_limit 3 \
--report_to none \
--torch_compile False \
--dataloader_num_workers 2 \
--ddp_find_unused_parameters False \
--max_steps 100
""",
    environment=f"{train_environment.name}:{train_environment.version}",
    compute=train_compute,
    shm_size="16g",
)

In [36]:
# Three-step pipeline: train tokenizer -> tokenize data -> train model.
# Takes no pipeline-level inputs; returns the training step's output folder.
# NOTE(review): the pipeline DSL may derive node names from the local variable
# names below - confirm before renaming them.
@dsl.pipeline(
    description=PIPELINE_DESCRIPTION,
    display_name=PIPELINE_NAME,
)
def pipeline_func():

    # Step 1: runs with the inputs declared on the command itself.
    train_tokenizer_job = train_tokenizer_command()

    # Step 2: consumes step 1's output as its tokenizer; text_files_dir is not overridden here.
    data_tokenization_job = data_tokenization_command(tokenizer_name_or_path=train_tokenizer_job.outputs.output_dir)

    # Step 3: consumes the tokenizer from step 1 and the tokenized data from step 2.
    train_job = train_command(
        tokenizer_name_or_path=train_tokenizer_job.outputs.output_dir,
        tokenized_files_dir=data_tokenization_job.outputs.output_dir,
        )

    # Pipeline output: the training step's output_dir.
    return {
        "pipeline_job_train_data": train_job.outputs.output_dir,
    }


# Build the pipeline job from the decorated function.
pipeline = pipeline_func()

# Submit it to the workspace under the experiment named after the model.
pipeline_job = ml_client.jobs.create_or_update(pipeline, experiment_name=EXPERIMENT_NAME)

[32mUploading train_model (0.02 MBs): 100%|██████████| 17729/17729 [00:00<00:00, 53908.05it/s]
[39m

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


In [17]:
# Display the submitted job; the saved output shows a summary row with a link to the Studio details page.
pipeline_job

Experiment,Name,Type,Status,Details Page
202402280146,happy_bulb_vrymb38623,pipeline,Preparing,Link to Azure Machine Learning studio
