# Cerebras ModelZoo PyTorch Training Workflow


Neocortex, a system that captures groundbreaking new hardware technologies, is designed to accelerate Artificial Intelligence (AI) research in pursuit of science, discovery, and societal good. Neocortex is a highly innovative resource that will accelerate AI-powered scientific discovery by vastly shortening the time required for deep learning training, foster greater integration of artificial deep learning with scientific workflows, and provide revolutionary new hardware for the development of more efficient algorithms for artificial intelligence and graph analytics.

In this notebook, we will create a Pegasus workflow to validate, compile and train the reference PyTorch model in the Cerebras modelzoo repository, to run on Neocortex.

## Pegasus Concepts and Training

Before working through this notebook, we highly recommend completing the Pegasus training at [ACCESS Pegasus](https://pegasus.access-ci.org), which covers how to define a workflow in Pegasus and how to configure the various catalogs. Click the Try Pegasus button on that page to get started.


## Container

All the jobs run inside a Cerebras-provided Singularity container that is available on the shared filesystem on Neocortex (`/ocean/neocortex/cerebras/cbcore_latest.sif`).


## Setting up the input data for the workflow

The workflow requires the **params.yaml** training configuration file to be placed in the input directory. The training datasets are tracked as part of the workflow and downloaded at runtime from http://yann.lecun.com/exdb/mnist/ (the workflow also registers https://ossci-datasets.s3.amazonaws.com/mnist/ as a second source for the same files).

These are the inputs used for the training:

modelzoo/modelzoo/fc_mnist/pytorch/data/mnist/train/MNIST/raw/train-images-idx3-ubyte.gz
modelzoo/modelzoo/fc_mnist/pytorch/data/mnist/train/MNIST/raw/train-labels-idx1-ubyte.gz
modelzoo/modelzoo/fc_mnist/pytorch/data/mnist/train/MNIST/raw/t10k-images-idx3-ubyte.gz
modelzoo/modelzoo/fc_mnist/pytorch/data/mnist/train/MNIST/raw/t10k-labels-idx1-ubyte.gz
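
As a rough illustration of how the dataset files listed above relate to their download locations, the sketch below maps each logical file name to its candidate URLs. The prefix, file names, and both URL bases are taken from the workflow code later in this notebook; the helper name `mnist_replicas` is hypothetical and is not part of Pegasus.

```python
# Logical file names (LFNs) mirror the paths listed above; the URL bases are
# the two sources that the workflow registers in its replica catalog.
PREFIX = "modelzoo/modelzoo/fc_mnist/pytorch/data/mnist/train/MNIST/raw"
SOURCES = [
    "http://yann.lecun.com/exdb/mnist",
    "https://ossci-datasets.s3.amazonaws.com/mnist",
]
MNIST_FILES = [
    "train-images-idx3-ubyte.gz",
    "train-labels-idx1-ubyte.gz",
    "t10k-images-idx3-ubyte.gz",
    "t10k-labels-idx1-ubyte.gz",
]


def mnist_replicas():
    """Map each LFN to the list of URLs it can be fetched from (hypothetical helper)."""
    return {
        "{}/{}".format(PREFIX, name): ["{}/{}".format(src, name) for src in SOURCES]
        for name in MNIST_FILES
    }


for lfn, urls in mnist_replicas().items():
    print(lfn)
    for url in urls:
        print("    " + url)
```

Registering more than one URL per file gives the planner an alternative location to consider for the same logical file.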

The compute jobs run on the Cerebras CS-2 nodes. Pegasus adds auxiliary jobs that run on the login node and retrieve the input data required by the workflow.

In [None]:
!./executables/prepare_inputs.sh 

## Workflow

The Pegasus workflow starts with a tar file containing the Git checkout of the modelzoo repo and iterates on it through three stages: validate, compile, and train. The `prepare_inputs.sh` script run in the previous cell checks out the modelzoo GitHub repository and tars it.

We then track it as an input for the workflow.
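
The contents of `prepare_inputs.sh` are not shown in this notebook, so the following is only a sketch of the packaging step it performs, written with Python's standard `tarfile` module. The directory name `modelzoo` and the helper `pack_checkout` are assumptions for illustration; the archive name `modelzoo-raw.tgz` matches the file the workflow registers as its input.

```python
import tarfile
import tempfile
from pathlib import Path


def pack_checkout(checkout_dir, archive_path):
    """Pack checkout_dir into a gzip-compressed tar file and return its member names."""
    checkout_dir = Path(checkout_dir)
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname stores paths relative to the checkout's own directory name
        tar.add(checkout_dir, arcname=checkout_dir.name)
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()


# Demonstration on a throwaway directory standing in for the real checkout.
with tempfile.TemporaryDirectory() as tmp:
    checkout = Path(tmp) / "modelzoo"
    checkout.mkdir()
    (checkout / "README.md").write_text("placeholder for the repository contents")
    members = pack_checkout(checkout, Path(tmp) / "modelzoo-raw.tgz")
    print(members)
```

Each stage of the workflow then unpacks the archive it receives, does its work, and emits a new archive for the next stage.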

![Pegasus Cerebras Pytorch Workflow](./images/workflow.png)

### Set the Jupyter Environment

We set some environment variables, including PYTHONPATH, so that the Pegasus libraries can be imported successfully. This is temporary until the Jupyter Notebook setup is fixed.

In [None]:
%env PYTHONPATH=/ocean/projects/cis240026p/vahi/software/install/pegasus/default/lib64/pegasus/python:/jet/home/vahi/pegasus-env/lib/python3.6/site-packages:$PYTHONPATH

In [None]:
#!/usr/bin/env python3

import sys
import os
sys.path.append("/ocean/projects/cis240026p/vahi/software/install/pegasus/default/lib64/pegasus/python")
sys.path.append("/jet/home/vahi/pegasus-env/lib/python3.6/site-packages")
print(os.environ.get('PYTHONPATH'))
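
Before running the next cell, it can help to confirm that the path changes took effect. The sketch below uses the standard library's `importlib.util.find_spec` to test whether a top-level package can be located without importing it; the helper name `is_importable` is hypothetical.

```python
import importlib.util


def is_importable(module_name):
    """Return True if a top-level module or package can be found on sys.path."""
    return importlib.util.find_spec(module_name) is not None


# "Pegasus" is the top-level package that the workflow cell imports from.
print("Pegasus importable:", is_importable("Pegasus"))
```

If this prints `False`, the `from Pegasus.api import *` line in the next cell will fail, and the paths above need to be adjusted for your installation.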

In [None]:
"""
Sample Pegasus workflow for training a model on the Cerebras resource
at Neocortex.

This workflow validates, compiles and trains the model as part
of a single workflow setup to run on Neocortex.

https://portal.neocortex.psc.edu/docs/running-jobs.html
"""

import argparse
import datetime
import logging
import os
import shutil
from pathlib import Path

from Pegasus.api import *

logging.basicConfig(level=logging.DEBUG)
BASE_DIR = str(Path(".").resolve())

# need to know where Pegasus is installed for notifications
PEGASUS_HOME = shutil.which("pegasus-version")
PEGASUS_HOME = os.path.dirname(os.path.dirname(PEGASUS_HOME))

# the PROJECT you are part of
PROJECT="cis240026p"

class CerebrasPyTorchWorkflow():
    wf = None
    sc = None
    tc = None
    rc = None
    props = None
    wf_name = "cerebras-model-zoo-pt"
    project = None
    # Log
    log = logging.getLogger(__name__)

    # --- Init ---------------------------------------------------------------------
    def __init__(self, dagfile="workflow.yml", project=None):
        self.dagfile = dagfile
        self.project = project

    # --- Write files in directory -------------------------------------------------
    def write(self):
        if self.sc is not None:
            self.sc.write()
        self.props.write()
        self.rc.write()
        self.tc.write()

        try:
            self.wf.write()
        except PegasusClientError as e:
            print(e)

    # --- Plan and Submit the workflow ----------------------------------------------
    def plan_submit(self):
        try:
            self.wf.plan(
                conf="pegasus.properties",
                sites=["neocortex"],
                output_site="local",
                dir="submit",
                cleanup="none",
                force=True,
                verbose=5,
                submit=True,
            )
        except PegasusClientError as e:
            print(e)

    # --- Get status of the workflow -----------------------------------------------
    def status(self):
        try:
            self.wf.status(long=True)
        except PegasusClientError as e:
            print(e)

    # --- Wait for the workflow to finish -----------------------------------------------
    def wait(self):
        try:
            self.wf.wait()
        except PegasusClientError as e:
            print(e)

    # --- Get statistics of the workflow -----------------------------------------------
    def statistics(self):
        try:
            self.wf.statistics()
        except PegasusClientError as e:
            print(e)

    # --- Configuration (Pegasus Properties) ---------------------------------------
    def create_pegasus_properties(self):
        self.props = Properties()
        self.props["pegasus.integrity.checking"] = "none"
        self.props[
            "pegasus.catalog.workflow.amqp.url"
        ] = "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows"
        self.props["pegasus.data.configuration"] = "nonsharedfs"
        self.props["pegasus.mode"] = "development"
        # data transfers for the jobs should happen
        # on the HOSTOS not inside the container
        self.props["pegasus.transfer.container.onhost"] = True
        # we don't want any pegasus worker package
        # to be installed inside the container
        self.props["pegasus.transfer.worker.package"] = True
        self.props["pegasus.transfer.worker.package.autodownload"] = False
        # enable symlinking
        # props["pegasus.transfer.links"] = True
        self.props.write()
        return

    # --- Site Catalog -------------------------------------------------------------
    def create_sites_catalog(self, exec_site_name="condorpool"):
        self.sc = SiteCatalog()
        # add a local site with an optional job env file to use for compute jobs
        shared_scratch_dir = "/{}/workflows/LOCAL/scratch".format("${PROJECT}")
        local_storage_dir = "{}/outputs".format(BASE_DIR)
        local = Site("local").add_directories(
            Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
                FileServer("file://" + shared_scratch_dir, Operation.ALL)
            ),
            Directory(Directory.LOCAL_STORAGE, local_storage_dir).add_file_servers(
                FileServer("file://" + local_storage_dir, Operation.ALL)
            ),
        )

        self.sc.add_sites(local)

        shared_scratch_dir = "/{}/workflows/NEOCORTEX/scratch".format("${PROJECT}")
        local_scratch_dir = "/local4/{}".format("${SALLOC_ACCOUNT}")
        neocortex = Site("neocortex").add_directories(
            Directory(
                Directory.SHARED_SCRATCH, shared_scratch_dir, shared_file_system=True
            ).add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),
            Directory(Directory.LOCAL_SCRATCH, local_scratch_dir).add_file_servers(
                FileServer("file://" + local_scratch_dir, Operation.ALL)
            ),
        )
        neocortex.add_condor_profile(grid_resource="batch slurm")
        #    neocortex.add_env("PEGASUS_HOME", "/ocean/projects/cis240026p/vahi/software/install/pegasus/default")
        neocortex.add_pegasus_profile(
            style="glite",
            queue="sdf",
            auxillary_local=True,
            runtime=1800,
            project=self.project,
        )
        self.sc.add_sites(neocortex)

    # --- Transformation Catalog (Executables and Containers) ----------------------
    def create_transformation_catalog(self, exec_site_name="condorpool"):
        self.tc = TransformationCatalog()
        container = Container(
            "cerebras",
            Container.SINGULARITY,
            "file:///ocean/neocortex/cerebras/cbcore_latest.sif",
            image_site="neocortex",
            # mounts=['/${PROJECT}/workflows/NEOCORTEX/scratch:/${PROJECT}/workflows/NEOCORTEX/scratch:rw'],
        )
        self.tc.add_containers(container)

        validate = Transformation(
            "validate",
            site="local",
            pfn=BASE_DIR + "/executables/validate.sh",
            is_stageable=True,
            container=container,
        )
        validate.add_profiles(Namespace.PEGASUS, key="cores", value="1")
        validate.add_profiles(Namespace.PEGASUS, key="runtime", value="900")
        validate.add_profiles(
            Namespace.PEGASUS,
            key="glite.arguments",
            value="--cpus-per-task=14 --gres=cs:cerebras:1 --qos=low",
        )
        self.tc.add_transformations(validate)

        compile = Transformation(
            "compile",
            site="local",
            pfn=BASE_DIR + "/executables/compile.sh",
            is_stageable=True,
            container=container,
        )
        compile.add_profiles(Namespace.PEGASUS, key="cores", value="1")
        compile.add_profiles(Namespace.PEGASUS, key="runtime", value="900")
        compile.add_profiles(
            Namespace.PEGASUS,
            key="glite.arguments",
            value="--cpus-per-task=14 --gres=cs:cerebras:1 --qos=low",
        )
        self.tc.add_transformations(compile)

        train = Transformation(
            "train", site="local", pfn=BASE_DIR + "/executables/train.sh", is_stageable=True, container=container
        )
        train.add_profiles(Namespace.PEGASUS, key="cores", value="1")
        train.add_profiles(Namespace.PEGASUS, key="runtime", value="3600")
        train.add_profiles(Namespace.PEGASUS, key="container.launcher", value="srun")
        train.add_profiles(Namespace.PEGASUS, key="container.launcher.arguments", value="--kill-on-bad-exit")
        train.add_profiles(
            Namespace.PEGASUS,
            key="glite.arguments",
            value="--cpus-per-task=14 --gres=cs:cerebras:1 --qos=low"
        )
        self.tc.add_transformations(train)

    # --- Replica Catalog ----------------------------------------------------------
    def create_replica_catalog(self):
        self.rc = ReplicaCatalog()
        # most of the replicas are added when creating the workflow

    # --- Create Workflow ----------------------------------------------------------
    def create_workflow(self):
        self.wf = Workflow(self.wf_name)

        # --- Workflow -----------------------------------------------------
        # the main input for the workflow is config file and the modelzoo checkout
        modelzoo_config_params = File(
            "modelzoo/modelzoo/fc_mnist/pytorch/configs/params.yaml"
        )
        modelzoo_raw = File("modelzoo-raw.tgz")
        self.rc.add_replica(
            "local", modelzoo_config_params.lfn, "{}/input/params.yaml".format(BASE_DIR)
        )
        self.rc.add_replica(
            "local", modelzoo_raw.lfn, "{}/input/modelzoo-raw.tgz".format(BASE_DIR)
        )

        # some output of modelzoo checkout at each stage
        modelzoo_validated = File("modelzoo-validated.tgz")
        modelzoo_compiled = File("modelzoo-compiled.tgz")
        modelzoo_trained = File("modelzoo-trained.tgz")
        modelzoo_trained_checkpoints = File("model-checkpoints.tgz")

        # some logs that we always stageout
        cerebras_logs = ["fabric.json", "run_summary.json", "params.yaml"]

        # validate job
        validate_job = Job("validate", node_label="validate_model")
        validate_job.add_args(
            "--mode train --validate_only --params configs/params.yaml  --model_dir model"
        )
        validate_job.add_inputs(modelzoo_raw)
        validate_job.add_inputs(modelzoo_config_params)
        validate_job.add_outputs(modelzoo_validated, stage_out=True)
        # add files against which we will train as inputs
        # instead of letting the code download automatically
        prefix = "modelzoo/modelzoo/fc_mnist/pytorch/data/mnist/train/MNIST/raw"
        for file in [
            "train-images-idx3-ubyte.gz",
            "train-labels-idx1-ubyte.gz",
            "t10k-images-idx3-ubyte.gz",
            "t10k-labels-idx1-ubyte.gz",
        ]:
            train_file = File("{}/{}".format(prefix, file))
            self.rc.add_replica(
                "nonlocal",
                train_file.lfn,
                "http://yann.lecun.com/exdb/mnist/{}".format(file),
            )
            self.rc.add_replica(
                "nonlocal",
                train_file.lfn,
                "https://ossci-datasets.s3.amazonaws.com/mnist/{}".format(file),
            )
            validate_job.add_inputs(train_file)

        # track some cerebras log files as outputs
        for file in cerebras_logs:
            # scripts do rename of the files after job completes
            validate_job.add_outputs(File("{}_{}".format("validate", file)), stage_out=True)

        self.wf.add_jobs(validate_job)

        # compile job
        compile_job = Job("compile", node_label="compile_model")
        compile_job.add_args(
            "--mode train --compile_only --params configs/params.yaml --model_dir model"
        )
        compile_job.add_inputs(modelzoo_validated)
        compile_job.add_outputs(modelzoo_compiled, stage_out=True)

        # track some cerebras log files as outputs
        for file in cerebras_logs:
            # scripts do rename of the files after job completes
            compile_job.add_outputs(File("{}_{}".format("compile", file)), stage_out=True)

        self.wf.add_jobs(compile_job)

        # training job
        now = datetime.datetime.now().strftime("%s")
        training_job = Job("train", node_label="train_model")
        training_job.add_args(
            "--mode train --params configs/params.yaml --model_dir model --cs_ip $CS_IP_ADDR"
        )
        training_job.add_inputs(modelzoo_compiled)
        training_job.add_outputs(modelzoo_trained, stage_out=True)
        # training_job.add_outputs(modelzoo_trained_checkpoints, stage_out=True)
        training_job.set_stdout("train-{}.out".format(now))
        training_job.set_stderr("train-{}.err".format(now))

        # track some cerebras log files as outputs
        for file in cerebras_logs:
            # scripts do rename of the files after job completes
            if file == "fabric.json":
                # we don't copy fabric.json
                continue
            training_job.add_outputs(File("{}_{}".format("train", file)), stage_out=True)

        training_job.add_outputs(File("train_performance.json"), stage_out=True)
        self.wf.add_jobs(training_job)

    def __call__(self):

        self.log.info("Creating workflow properties...")
        self.create_pegasus_properties()

        self.log.info("Creating execution sites...")
        self.create_sites_catalog()

        self.log.info("Creating transformation catalog...")
        self.create_transformation_catalog()

        self.log.info("Creating replica catalog...")
        self.create_replica_catalog()

        self.log.info("Creating workflow ...")
        self.create_workflow()

        self.write()
        self.log.info("Workflow has been created. Will be planned and submitted ...")

        #self.plan_submit()



logging.basicConfig(level=logging.DEBUG)
workflow = CerebrasPyTorchWorkflow(project=PROJECT)
try:
    workflow()
except PegasusClientError as e:
    workflow.log.debug("", exc_info=True)
    print(e.output)
    sys.exit(1)
except Exception:
    workflow.log.debug("", exc_info=True)
    sys.exit(1)
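
Note that the workflow code never declares that compile depends on validate, or train on compile. Pegasus infers job dependencies from the files each job consumes and produces. The sketch below illustrates that idea in plain Python using the archive names from the workflow above; `JOBS` and `infer_edges` are hypothetical names for illustration and are not Pegasus API.

```python
# Plain-Python illustration of data-flow dependency inference.
# Only the archive files are listed; log outputs are omitted for brevity.
JOBS = {
    "validate": {"inputs": {"modelzoo-raw.tgz"}, "outputs": {"modelzoo-validated.tgz"}},
    "compile": {"inputs": {"modelzoo-validated.tgz"}, "outputs": {"modelzoo-compiled.tgz"}},
    "train": {"inputs": {"modelzoo-compiled.tgz"}, "outputs": {"modelzoo-trained.tgz"}},
}


def infer_edges(jobs):
    """Return sorted (parent, child) pairs where the child reads a file the parent writes."""
    edges = []
    for parent, parent_files in jobs.items():
        for child, child_files in jobs.items():
            if parent != child and parent_files["outputs"] & child_files["inputs"]:
                edges.append((parent, child))
    return sorted(edges)


for parent, child in infer_edges(JOBS):
    print("{} -> {}".format(parent, child))
```

The result is the linear chain shown in the workflow figure: validate feeds compile, and compile feeds train.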

## Plan and Submit the Workflow

We will now plan and submit the workflow for execution. The jobs run on the **neocortex** site, which is the execution site passed to `plan()` inside `plan_submit()`.

In [None]:
workflow.plan_submit()

After the workflow has been successfully planned and submitted, you can use the Python `Workflow` object to monitor its status. The status output shows the number of jobs in each state, including whether jobs are idle or running.

In [None]:
workflow.status()

## Wait for the workflow to finish

In [None]:
workflow.wait()

## Inspect the outputs

The outputs of the workflow run can be found in the `./outputs` directory.

In [None]:
!ls outputs
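
To check the listing against what the workflow declares with `stage_out=True`, the sketch below rebuilds the expected file names from the workflow code and reports any that are absent. It assumes the files land directly under `outputs` with no subdirectories, and it leaves out the timestamped training stdout and stderr files; the helper names are hypothetical.

```python
from pathlib import Path

STAGES = ["validate", "compile", "train"]
CEREBRAS_LOGS = ["fabric.json", "run_summary.json", "params.yaml"]


def expected_outputs():
    """List the output file names the workflow marks for stage-out."""
    names = ["modelzoo-validated.tgz", "modelzoo-compiled.tgz", "modelzoo-trained.tgz"]
    for stage in STAGES:
        for log in CEREBRAS_LOGS:
            if stage == "train" and log == "fabric.json":
                continue  # the training job does not stage out fabric.json
            names.append("{}_{}".format(stage, log))
    names.append("train_performance.json")
    return names


def missing_outputs(output_dir="outputs"):
    """Return the expected names that are not present in output_dir (flat layout assumed)."""
    out = Path(output_dir)
    return [name for name in expected_outputs() if not (out / name).exists()]


print("Missing:", missing_outputs())
```

An empty list means every declared output was staged back to the local site.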