In [1]:
import os
from datetime import datetime, timezone

from azure.ai.ml import MLClient, command, dsl, Input, Output
from azure.ai.ml.entities import Environment, BuildContext, AmlCompute
from azure.identity import InteractiveBrowserCredential

# Credential object handed to the workspace client below.
# NOTE(review): presumably triggers an interactive browser sign-in — confirm this is
# acceptable wherever the notebook is run (it will not work headless).
credential = InteractiveBrowserCredential()

# Path passed to MLClient.from_config; left empty here, so fill it in before running.
PATH_TO_CONFIG_FILE = ""

# Workspace client used by every later cell (compute, environments, jobs).
ml_client = MLClient.from_config(credential, path=PATH_TO_CONFIG_FILE)

Found the config file in: east2.json


In [2]:
# Short "<gpu count>x<gpu model>" labels mapped to the Azure VM size string that is
# later passed as the compute cluster's instance type.
COMPUTE_MAPPING = {
    # **Flash attention does not work on V100s**
    "4xV100": "Standard_NC24s_v3",
    "2xV100": "Standard_NC12s_v3",
    "1xV100": "Standard_NC6s_v3",
    "4xT4": "Standard_NC64as_T4_v3",
    "1xT4": "Standard_NC4as_T4_v3",
    "1xA10": "Standard_NV36adms_A10_v5",
    "2xA10": "Standard_NV72ads_A10_v5",
}

In [6]:
# Prefix of the timestamped output path / experiment name built below.
MODEL_NAME = "pix2struct-cord"
# Local folder passed as the environment's dependencies / Docker build-context directory.
TRAIN_DIR = "train"
# Name under which the training environment is looked up or registered.
TRAIN_ENV_NAME = "p2s_train_env"
# Name of the compute cluster to reuse or create.
TRAIN_COMPUTE_NAME = "p2s-t4"
# VM size for the cluster; must be a key of COMPUTE_MAPPING.
TRAIN_INSTANCE_TYPE = COMPUTE_MAPPING["1xT4"]

# Datastore that receives the training output folder.
DATASTORE_NAME = "workspaceartifactstore"

# Node count: used as the job's instance_count and as torchrun's --nnodes.
NUM_NODES = 1
# Processes per node: used as torchrun's --nproc_per_node.
# NOTE(review): presumably should equal the GPU count of TRAIN_INSTANCE_TYPE — confirm
# whenever the instance type changes.
NUM_GPUS = 1

# UTC timestamp formatted as YearMonthDayHourMinute; it distinguishes one run from the next.
# Built from a timezone-aware datetime because datetime.utcnow() is deprecated since
# Python 3.12; the formatted string is identical.
timenow = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")

# "<model name>__<timestamp>": used as both the output folder and the experiment name.
TRAINING_OUTPUT_PATH = f"{MODEL_NAME}__{timenow}"

In [7]:
def get_or_create_compute_target(
    ml_client,
    compute_name,
    instance_type="STANDARD_DS3_v2",
    min_nodes=0,
    max_nodes=1,
    idle_time=300,
):
    """Return the name of the compute target ``compute_name``, creating it if needed.

    If the target already exists it is reused as-is: ``instance_type``, ``min_nodes``,
    ``max_nodes`` and ``idle_time`` are only applied when a new cluster is created.

    Args:
        ml_client: Workspace client exposing ``compute.get`` and
            ``compute.begin_create_or_update``.
        compute_name: Name of the compute target to look up or create.
        instance_type: VM size for a newly created cluster.
        min_nodes: Minimum number of instances for a newly created cluster.
        max_nodes: Maximum number of instances for a newly created cluster.
        idle_time: Value passed as ``idle_time_before_scale_down`` for a new cluster.

    Returns:
        The name of the existing or newly created compute target.
    """
    try:
        return ml_client.compute.get(compute_name).name
    except Exception:
        # NOTE(review): every lookup failure (not only "not found") leads to a create
        # attempt; narrow this once the SDK's not-found exception type can be imported.
        print(f"Creating a new {instance_type} compute target...")

    compute = AmlCompute(
        name=compute_name,
        size=instance_type,
        min_instances=min_nodes,
        max_instances=max_nodes,
        idle_time_before_scale_down=idle_time,
    )
    # begin_create_or_update only starts the operation and hands back a poller. Wait for
    # it so provisioning errors surface here and the target exists before a job uses it.
    ml_client.compute.begin_create_or_update(compute).result()
    return compute.name


def get_environment(
    environment_name, dependencies_dir, ml_client, gpu=False, dep_yaml=None, dockerfile_path=None
):
    """Return the registered environment ``environment_name``, registering it if needed.

    The lookup asks for the ``latest`` label. ``environments.get`` needs a version or a
    label, so the previous name-only lookup always failed and the broad ``except``
    re-registered (and re-uploaded) the environment on every call.

    Args:
        environment_name: Name of the environment to look up or register.
        dependencies_dir: Folder holding the Docker build context or the conda file.
        ml_client: Workspace client exposing ``environments.get`` and
            ``environments.create_or_update``.
        gpu: Selects the CUDA base image instead of the CPU one. Only used when the
            environment is built from ``dep_yaml``.
        dep_yaml: Conda file name inside ``dependencies_dir``. Used when
            ``dockerfile_path`` is not given.
        dockerfile_path: Dockerfile path handed to the build context. Takes precedence
            over ``dep_yaml``.

    Returns:
        The existing environment, or the one returned by ``create_or_update``.

    Raises:
        ValueError: If the environment must be created and neither ``dockerfile_path``
            nor ``dep_yaml`` was supplied.
    """
    try:
        return ml_client.environments.get(name=environment_name, label="latest")
    except Exception:
        # Deliberate best effort: any lookup failure falls through to registration.
        # NOTE(review): this also hides auth/network errors; narrow when possible.
        pass

    if dockerfile_path is not None:
        env = Environment(
            name=environment_name,
            description="Custom environment",
            build=BuildContext(path=dependencies_dir, dockerfile_path=dockerfile_path),
        )
    elif dep_yaml is not None:
        # The base image only matters for conda-based environments.
        if gpu:
            image = "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.8-cudnn8-ubuntu22.04"
        else:
            image = "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest"
        env = Environment(
            name=environment_name,
            description="Custom environment",
            conda_file=os.path.join(dependencies_dir, dep_yaml),
            image=image,
        )
    else:
        raise ValueError(
            f"Cannot create environment {environment_name!r}: "
            "provide either dockerfile_path or dep_yaml."
        )

    return ml_client.environments.create_or_update(env)

In [12]:
# Training environment, defined by the Dockerfile inside the TRAIN_DIR build context.
train_environment = get_environment(
    TRAIN_ENV_NAME,
    TRAIN_DIR,
    ml_client,
    gpu=True,
    dockerfile_path="Dockerfile",
)

# Training cluster: scales between 0 and 2 nodes of the configured instance type.
train_compute = get_or_create_compute_target(
    ml_client,
    TRAIN_COMPUTE_NAME,
    instance_type=TRAIN_INSTANCE_TYPE,
    min_nodes=0,
    max_nodes=2,
)

[32mUploading train (0.37 MBs): 100%|██████████| 370195/370195 [00:00<00:00, 548237.43it/s]
[39m



In [13]:
# Distribution settings and NCCL variables are only supplied for multi-node runs;
# a single-node run passes None for both.
if NUM_NODES > 1:
    distribution = dict(type="PyTorch", process_count_per_instance=NUM_GPUS)
    environment_variables = dict(NCCL_SOCKET_IFNAME="eth0", NCCL_DEBUG="INFO")
else:
    distribution = None
    environment_variables = None


# Command job that trains pix2struct by launching run.py through torchrun.
# NOTE(review): the torchrun invocation passes no rendezvous / master-address arguments.
# When NUM_NODES > 1, confirm the launcher obtains them from the job environment, and
# that process_count_per_instance=NUM_GPUS does not start several torchrun launchers
# per node on top of --nproc_per_node.
train_command = command(
    name="train",
    display_name="Train pix2struct ",
    # Referenced in the command line below as ${{inputs.<name>}}.
    inputs={
        "nproc_per_node": NUM_GPUS,
        "nnodes": NUM_NODES,
    },
    # Output folder on the datastore; used below for both --output_dir and --logging_dir.
    outputs={
        "output_dir": Output(
            type="uri_folder",
            path=f"azureml://datastores/{DATASTORE_NAME}/paths/{TRAINING_OUTPUT_PATH}",
            mode="rw_mount",
        ),
    },
    # The source folder of the component
    code="./train",
    # Each trailing backslash is a line continuation inside this (non-raw) string, so
    # the command reaches the job as one line; do not add comments inside the string.
    command="""torchrun \
             --nnodes ${{inputs.nnodes}} \
             --nproc_per_node  ${{inputs.nproc_per_node}} \
                run.py \
                --model_name_or_path google/pix2struct-base \
                --evaluation_strategy epoch \
                --save_strategy epoch \
                --logging_steps 25 \
                --max_patches 1024 \
                --per_device_train_batch_size 2 \
                --per_device_eval_batch_size 2 \
                --learning_rate 3e-5 \
                --num_train_epochs 3 \
                --weight_decay 0.01 \
                --optim adamw_torch \
                --warmup_steps 100 \
                --fp16 \
                --output_dir ${{outputs.output_dir}} \
                --logging_dir ${{outputs.output_dir}} \
                --dataloader_num_workers 4 \
                --gradient_accumulation_steps 1 \
                --seed 42 \
                --report_to mlflow \
                --remove_unused_columns False \
                --prompt "Extract the items from the receipt in the image below:"
            """,
    # Pin the exact environment version that was resolved earlier.
    environment=f"{train_environment.name}:{train_environment.version}",
    compute=train_compute,
    instance_count=NUM_NODES,  # one instance per requested node
    # Both are None unless NUM_NODES > 1.
    distribution=distribution,
    environment_variables=environment_variables,
    # Shared-memory size requested for the job.
    shm_size="16g"
)


In [14]:
@dsl.pipeline(description="flash p2s", display_name="flash p2s")
def pipeline_func():
    # Single-step pipeline that exposes the training output folder as a pipeline output.
    # NOTE(review): the dsl may derive the step name from this local variable name —
    # rename with care.
    train_job = train_command()
    return {"pipeline_job_train_data": train_job.outputs.output_dir}


pipeline = pipeline_func()

# Submit the pipeline; the experiment is named after the timestamped output path.
pipeline_job = ml_client.jobs.create_or_update(pipeline, experiment_name=TRAINING_OUTPUT_PATH)

[32mUploading train (0.37 MBs): 100%|██████████| 370195/370195 [00:00<00:00, 723045.07it/s]
[39m



In [11]:
# Show the submitted job's summary (experiment, name, type, status, studio link).
pipeline_job

Experiment,Name,Type,Status,Details Page
pix2struct-cord__202307101651,busy_plow_v5c6cg3j2n,pipeline,Preparing,Link to Azure Machine Learning studio
