# Distributed training using the Kubeflow PyTorch operator


In [None]:
! pip install -r requirements.txt

In [None]:
# Import necessary libraries

import json
import os
from typing import NamedTuple
from collections import namedtuple
import kfp
import kfp.dsl as dsl
from kfp import components
from kfp.dsl.types import Integer

# Display the installed KFP SDK version (rendered as this cell's output).
kfp.__version__

In [None]:
# Global configuration shared by the notebook cells below.
ROOT_DIR = os.path.abspath("/home/jovyan")  # notebook home directory
efs_mount_point = "efs-data"  # EFS directory name, joined onto ROOT_DIR later
# Training container image in ECR. A public sample alternative is
#   public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest
train_image = (
    "974643886555.dkr.ecr.ap-northeast-2.amazonaws.com"
    "/aladin-runtime:pytorch-llama"
)
train_script = "klue_ynat_2_finetune_4_13b-chat.py"  # copied onto EFS and run by every replica

## Copy the training script to the EFS mount path

In [None]:
# Absolute path of the EFS mount directory; the `!cp` shell command in this
# cell copies the training script into it. os.path.join avoids a doubled
# separator if ROOT_DIR ever ends with "/".
efs_mount_dir = os.path.join(ROOT_DIR, efs_mount_point)
print(efs_mount_dir)
!cp ../klue/$train_script $efs_mount_dir/

In [None]:
# Get the Kubernetes PVC claim ID for the provisioned EFS volume (also listed
# under Kubeflow Volumes on the dashboard), e.g. efs-sc-claim.
# `var = !cmd` is IPython shell capture: `var` becomes the list of output lines.
# awk prints the 3rd whitespace-separated column of every `kubectl get pvc`
# row that mentions "efs-data".
# NOTE(review): in `kubectl get pvc` output the 3rd column is normally VOLUME
# (the bound PV name), not the claim name -- confirm which value is intended.
# The pod specs in this notebook hard-code claimName "efs-data" and do not
# read this variable. `pvc_claim_id[0]` raises IndexError if nothing matched.
pvc_claim_id=!(kubectl get pvc --no-headers=true | awk '/efs-data/{print$3}' )
pvc_claim_id[0]

## Get the KFP client

In [None]:
def kfp_client():
    """Build a Kubeflow Pipelines client for use from inside the cluster.

    The client authenticates with service-account token-volume credentials
    and targets the in-cluster ``ml-pipeline`` API service in the
    ``kubeflow`` namespace.

    Returns:
        kfp.Client: a client bound to the in-cluster pipelines endpoint.
    """
    pipelines_api = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
    token_credentials = kfp.auth.ServiceAccountTokenVolumeCredentials(path=None)
    return kfp.Client(host=pipelines_api, credentials=token_credentials)


## Base Image

In [None]:
def base_image() -> str:
    """Return the ECR URI of the CPU runtime image for this AWS account/region.

    The AWS account ID is parsed from the ``AWS_ROLE_ARN`` environment
    variable and the region is read from ``AWS_REGION``. The result has the
    form ``<account>.dkr.ecr.<region>.amazonaws.com/aladin-runtime:anaconda-cpu``
    and is also printed.

    Raises:
        ValueError: if ``AWS_ROLE_ARN`` or ``AWS_REGION`` is unset or empty,
            or if ``AWS_ROLE_ARN`` does not look like an IAM role ARN.
    """
    import os
    import re

    iam = os.environ.get("AWS_ROLE_ARN")
    region = os.environ.get("AWS_REGION")
    if not iam:
        raise ValueError("AWS_ROLE_ARN is not set; cannot derive the AWS account ID")
    if not region:
        # Without this check the URI would silently contain "ecr.None." and
        # only fail later, when the image is pulled.
        raise ValueError("AWS_REGION is not set; cannot build the ECR image URI")

    # Raw-string pattern; AWS account IDs are always exactly 12 digits.
    match = re.search(r"arn:aws:iam::([0-9]{12}):role", iam)
    if match is None:
        raise ValueError("AWS_ROLE_ARN is not an IAM role ARN: {!r}".format(iam))
    account = match.group(1)

    # Named `image` so it does not shadow this function's own name.
    image = "{}.dkr.ecr.{}.amazonaws.com/aladin-runtime:anaconda-cpu".format(account, region)
    print("base_image = {}".format(image))
    return image

In [None]:
def get_current_namespace(
    path: str = "/var/run/secrets/kubernetes.io/serviceaccount/namespace",
    default: str = "kubeflow",
) -> str:
    """Return the Kubernetes namespace this notebook runs in.

    Reads the service-account namespace file. Falls back to ``default`` when
    that file cannot be read (e.g. outside a cluster) or is empty.

    Args:
        path: File holding the namespace name.
        default: Namespace returned when ``path`` is unreadable or empty.

    Returns:
        The namespace with surrounding whitespace removed, or ``default``.
    """
    try:
        # Context manager so the file handle is closed immediately.
        with open(path, encoding="utf-8") as ns_file:
            namespace = ns_file.read().strip()
    except OSError:
        # Only I/O failures mean "no namespace file"; a bare `except:` would
        # also swallow KeyboardInterrupt/SystemExit and real bugs.
        return default
    return namespace or default

## Worker spec

In [None]:
# Create worker spec
def create_worker_spec(
        worker_num: int = 0,
        train_image_name: str = None,
        train_script_name: str = None
) -> NamedTuple(
    "CreatWorkerSpec", [("worker_spec", dict)]
):
    """
    Creates the PyTorchJob worker replica spec.

    This function is wrapped as a KFP lightweight component, so it is kept
    self-contained: its imports live inside the body.

    Args:
        worker_num: Number of worker replicas. When it is <= 0 an empty
            spec ({}) is returned.
        train_image_name: Container image for the worker pods.
        train_script_name: File name of the training script under /efs-data.

    Returns:
        A named tuple whose ``worker_spec`` field holds the replica spec dict.
    """
    from collections import namedtuple

    worker = {}
    if worker_num > 0:
        # GPUs are an extended resource: Kubernetes rejects a container that
        # requests them without an equal limit, so both are set below.
        gpus_per_replica = 2
        worker = {
            "replicas": worker_num,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    "annotations": {
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            # sh runs the script with $0="python" and $@ set to
                            # the script path plus the container args, so pip
                            # installs first and then the training script runs.
                            "command": [
                                "sh",
                                "-ec",
                                'python3 -m pip install --user --no-warn-script-location tensorboardX && "$0" "$@"',
                                "python",
                                f"/efs-data/{train_script_name}"
                            ],
                            "args": [
                                "--backend",
                                "gloo",
                            ],
                            "image": f"{train_image_name}",
                            "name": "pytorch",
                            "resources": {
                                "requests": {
                                    "memory": "40Gi",
                                    "cpu": "7",
                                    "nvidia.com/gpu": gpus_per_replica,
                                },
                                "limits": {
                                    "nvidia.com/gpu": gpus_per_replica,
                                },
                            },
                            "volumeMounts": [
                                {
                                    "mountPath": "/efs-data",
                                    "name": "efs-data"
                                }
                            ],
                        }
                    ],
                    "volumes": [
                        {
                            "name": "efs-data",
                            "persistentVolumeClaim": {
                                "claimName": "efs-data"
                            }
                        }
                    ]
                },
            },
        }

    worker_spec_output = namedtuple(
        "MyWorkerOutput", ["worker_spec"]
    )
    return worker_spec_output(worker)


In [None]:
# Wrap create_worker_spec as a KFP lightweight component. base_image() runs
# here, in the notebook, so the component image is resolved from this
# environment's AWS_ROLE_ARN / AWS_REGION when the cell executes.
worker_spec_op = components.func_to_container_op(create_worker_spec, base_image=base_image())

## Create the Kubeflow pipeline

In [None]:
@dsl.pipeline(
    name="klue-pytorchjob",
    description="An example to launch pytorch.",
)
def train_def(
        namespace: str = get_current_namespace(),
        worker_replicas: int = 1,
        ttl_seconds_after_finished: int = -1,
        job_timeout_minutes: int = 60,
        delete_after_done: bool = False,
):
    """Pipeline that launches and monitors a PyTorchJob for KLUE fine-tuning.

    The master replica spec is built here from the notebook globals
    ``train_image`` and ``train_script``. The worker spec is produced at run
    time by the ``worker_spec_op`` component. Both are passed to the launcher
    component loaded from ``../launcher/component.yaml``.

    Args:
        namespace: Namespace for the PyTorchJob. The default is evaluated
            once, when this cell defines the function.
        worker_replicas: Number of worker replicas, passed to the worker-spec
            component (which emits an empty spec when it is <= 0).
        ttl_seconds_after_finished: Forwarded unchanged to the launcher.
        job_timeout_minutes: Forwarded unchanged to the launcher.
        delete_after_done: Forwarded unchanged to the launcher.
    """
    print("train_pipeline: namespace={}, worker_replicas={}, ttl_seconds_after_finished={}, job_timeout_minutes={}, delete_after_done={}"
          .format(namespace, worker_replicas, ttl_seconds_after_finished, job_timeout_minutes, delete_after_done))
    pytorchjob_launcher_op = components.load_component_from_file(
        "../launcher/component.yaml"
    )

    # GPUs are an extended resource: Kubernetes rejects a container that
    # requests them without an equal limit, so both are set below.
    gpus_per_replica = 2
    master = {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
            "metadata": {
                "annotations": {
                    # See https://github.com/kubeflow/website/issues/2011
                    "sidecar.istio.io/inject": "false"
                }
            },
            "spec": {
                "containers": [
                    {
                        # Override the image's default command. sh runs the
                        # script with $0="python" and $@ set to the script path
                        # plus the container args, so pip installs first and
                        # then the training script runs.
                        "command": [
                            "sh",
                            "-ec",
                            'python3 -m pip install --user --no-warn-script-location tensorboardX && "$0" "$@"',
                            "python",
                            f"/efs-data/{train_script}"
                        ],
                        "args": [
                            "--backend",
                            "gloo",
                        ],
                        # Or, create your own image from
                        # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
                        "image": f"{train_image}",
                        "name": "pytorch",
                        "resources": {
                            "requests": {
                                "memory": "40Gi",
                                "cpu": "7",
                                "nvidia.com/gpu": gpus_per_replica,
                            },
                            "limits": {
                                "nvidia.com/gpu": gpus_per_replica,
                            },
                        },
                        "volumeMounts": [
                            {
                                "mountPath": "/efs-data",
                                "name": "efs-data"
                            }
                        ],
                    }
                ],
                "volumes": [
                    {
                        "name": "efs-data",
                        "persistentVolumeClaim": {
                            "claimName": "efs-data"
                        }
                    }
                ]
            },
        },
    }

    print(f"master_spec:\n{master}")

    worker_spec_create = worker_spec_op(
        worker_replicas, train_image, train_script
    )

    # Launch and monitor the job with the launcher
    pytorchjob_launcher_op(
        # Note: name needs to be a unique pytorchjob name in the namespace.
        # Using RUN_ID_PLACEHOLDER is one way of getting something unique.
        name=f"pytorch-klue-{kfp.dsl.RUN_ID_PLACEHOLDER}",
        namespace=namespace,
        master_spec=master,
        # The worker spec is built at run time by a component rather than as
        # a dict here. Embedding the worker_replicas pipeline-parameter
        # placeholder in a dict would serialize it as a quoted string, which
        # breaks the k8s `replicas` field.
        # See https://github.com/kubeflow/pipelines/issues/4776
        worker_spec=worker_spec_create.outputs[
            "worker_spec"
        ],
        ttl_seconds_after_finished=ttl_seconds_after_finished,
        job_timeout_minutes=job_timeout_minutes,
        delete_after_done=delete_after_done,
    )


## Upload the pipeline

In [None]:
import time

import kfp.compiler as compiler

pipeline_file = "pytorch_klue_pipeline.yaml"
pipeline_name = "pytorch_klue_notebook"
print(f"Compiling pipeline as {pipeline_file}")
compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY).compile(
    train_def, pipeline_file
)

client = kfp_client()
# Pipeline names must be unique, so a plain upload_pipeline fails when this
# cell is re-run. In that case, upload a new, timestamped version of the
# existing pipeline instead.
existing_pipeline_id = client.get_pipeline_id(pipeline_name)
if existing_pipeline_id is None:
    pipeline = client.upload_pipeline(
        pipeline_package_path=pipeline_file,
        pipeline_name=pipeline_name,
        description=pipeline_name,
    )
    print(f"Created pipeline {pipeline_name} (id={pipeline.id})")
else:
    version_name = f"{pipeline_name}-{time.strftime('%Y%m%d-%H%M%S')}"
    client.upload_pipeline_version(
        pipeline_package_path=pipeline_file,
        pipeline_version_name=version_name,
        pipeline_id=existing_pipeline_id,
    )
    print(f"Uploaded version {version_name} of existing pipeline {pipeline_name} (id={existing_pipeline_id})")