# Text Classification Pipelines with Azure Machine Learning

In this example, we fine-tune and evaluate several pretrained models on a subset of the [MultiNLI](https://www.nyu.edu/projects/bowman/multinli/) dataset using [Azure Machine Learning Pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines). Pipelines let us chain sequential steps for preprocessing and training, and run independent steps in parallel on a cluster of nodes. We demonstrate how to submit training jobs for multiple models, each consisting of multiple steps.

We use a [sequence classifier](../../../utils_nlp/models/transformers/sequence_classification.py) that wraps [Hugging Face's PyTorch implementation](https://github.com/huggingface/transformers) of different transformers, like [BERT](https://github.com/google-research/bert), [XLNet](https://github.com/zihangdai/xlnet), and [RoBERTa](https://github.com/pytorch/fairseq).

Below is a general illustration of the pipeline and its preprocessing and training steps.

<img src="https://nlpbp.blob.core.windows.net/images/tc_pipeline_graph.PNG" width=500>

The pipeline steps we chose are generic [Python script steps](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py) of the Azure ML SDK. This allows us to run parametrized Python scripts on a remote target. For this example, we will create pipeline steps that execute the preprocessing and training scripts provided in the [scripts](scripts) folder, with different arguments for different model types.

# Table of Contents

- [Define Parameters](#Define-Parameters)
- [Create AML Workspace and Compute Target](#Create-AML-Workspace-and-Compute-Target)
- [Upload Training Data to Workspace](#Upload-Training-Data-to-Workspace)
- [Setup Execution Environment](#Setup-Execution-Environment)
- [Define Pipeline Graph](#Define-Pipeline-Graph)
- [Run Pipeline](#Run-Pipeline)
- [Retrieve a Trained Model from Pipeline](#Retrieve-a-Trained-Model-from-Pipeline)
- [Test Model](#Test-Model)


In [1]:
from datetime import datetime
import os
import pandas as pd
import pickle
from azureml.core import Datastore, Environment, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.data.data_reference import DataReference
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData, PipelineRun
from azureml.pipeline.steps import PythonScriptStep
from azureml.widgets import RunDetails
from utils_nlp.azureml import azureml_utils
from utils_nlp.dataset.multinli import load_pandas_df
from utils_nlp.models.transformers.sequence_classification import Processor, SequenceClassifier

## Define Parameters

In [2]:
SUBSCRIPTION_ID = ""
RESOURCE_GROUP = "ignite-demo"
WORKSPACE_NAME = "ignite-nlp-amlws"
WORKSPACE_REGION = "eastus"

# remote target
CLUSTER_NAME = "ignite-nlp-clstr"  # 2-16 chars
VM_SIZE = "STANDARD_NC12"
MIN_NODES = 0
MAX_NODES = 2

# local data
TEMP_DIR = "temp"
TRAIN_FILE = "train.csv"
TEXT_COL = "text"
LABEL_COL = "label"
TRAIN_SAMPLE_SIZE = 10000
# remote data
REMOTE_DATA_CONTAINER = "data"

# remote env config
PIP_PACKAGES = ["azureml-sdk==1.0.65", "torch==1.1", "tqdm==4.31.1", "transformers==2.1.1"]
CONDA_PACKAGES = ["numpy", "scikit-learn", "pandas"]
UTILS_NLP_WHL_DIR = "../../../dist"
PYTHON_VERSION = "3.6.8"
USE_GPU = True

# pipeline scripts
SCRIPTS_DIR = "scripts"
PREPROCESS_SCRIPT = "preprocess.py"
TRAIN_SCRIPT = "train.py"

# pretrained models
MODEL_NAMES = ["bert-base-uncased", "xlnet-base-cased"]

## Create AML Workspace and Compute Target

The following code block creates a new Azure ML workspace or retrieves an existing one, along with a corresponding Azure ML compute target. For deep learning tasks, GPU-enabled compute nodes are recommended. Here, we use an autoscaling cluster whose size ranges over *(min_nodes, max_nodes)*. Setting *min_nodes* to zero ensures that the nodes are shut down when not in use. Azure ML allocates nodes as needed, up to *max_nodes*, based on the jobs submitted to the compute target.

In [3]:
# create/get AML workspace
ws = azureml_utils.get_or_create_workspace(
    subscription_id=SUBSCRIPTION_ID,
    resource_group=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
    workspace_region=WORKSPACE_REGION,
)

# create/get compute target
try:
    compute_target = ComputeTarget(workspace=ws, name=CLUSTER_NAME)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE, min_nodes=MIN_NODES, max_nodes=MAX_NODES, vm_priority="lowpriority"
    )
    compute_target = ComputeTarget.create(
        workspace=ws, name=CLUSTER_NAME, provisioning_configuration=compute_config
    )
    compute_target.wait_for_completion(show_output=True)


## Upload Training Data to Workspace

In this example, we use a subset of the MultiNLI dataset to fine-tune the specified pretrained models. The dataset contains a column of sentences (*sentence1*), which we use as text input, and a *genre* column, which we use as class labels.

In [4]:
# create training data sample
os.makedirs(TEMP_DIR, exist_ok=True)
df = load_pandas_df(TEMP_DIR, "train")
df = df[df["gold_label"] == "neutral"]  # filter duplicate sentences
df = df.sample(TRAIN_SAMPLE_SIZE)
df[TEXT_COL] = df["sentence1"]
df[LABEL_COL] = df["genre"]
df[[TEXT_COL, LABEL_COL]].to_csv(
    os.path.join(TEMP_DIR, TRAIN_FILE), header=True, index=None, quoting=1
)
# inspect dataset
df[[TEXT_COL, LABEL_COL]].head()

|  | text | label |
|---|---|---|
| 31802 | As Buchanan put it, the boys in the War Room h... | slate |
| 380426 | But if we cited your telephone number instead,... | slate |
| 321741 | And while some, such as Goodwin, argue that th... | slate |
| 58393 | Many Poles, especially among the intellectual ... | travel |
| 138205 | The small but significant Craft and Folk Art M... | travel |
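
The `quoting=1` argument passed to `to_csv` corresponds to `csv.QUOTE_ALL` in Python's standard `csv` module: every field is wrapped in quotes, so commas and quotes inside a sentence do not break the two-column layout. The following is a small standard-library sketch of that behavior; the two rows are made up for illustration and are not taken from MultiNLI.

```python
import csv
import io

# Made-up rows for illustration; the real file is written by df.to_csv above.
rows = [
    {"text": "First, second, and third.", "label": "travel"},
    {"text": 'He said "no", then left.', "label": "fiction"},
]

# Write the rows with every field quoted (csv.QUOTE_ALL has the integer value 1).
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["text", "label"], quoting=csv.QUOTE_ALL)
writer.writeheader()
writer.writerows(rows)
csv_text = buffer.getvalue()
print(csv_text)

# Reading the text back recovers the original fields, commas and quotes included.
restored = [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]
```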


The Azure ML workspace comes with a default datastore that is linked to an Azure Blob Storage container in the same resource group. We use this datastore to upload the CSV data file, to hold the intermediate output of the pipeline steps, and to store the final output of the training step. In practice, one can create other datastores and link them to existing Blob Storage containers.

In [5]:
# upload data to datastore
ds = ws.get_default_datastore()
ds.upload_files(
    files=[os.path.join(TEMP_DIR, TRAIN_FILE)],
    target_path=REMOTE_DATA_CONTAINER,
    overwrite=True,
    show_progress=True,
)

Uploading an estimated of 1 files
Uploading temp/train.csv
Uploaded temp/train.csv, 1 files out of an estimated total of 1
Uploaded 1 files


$AZUREML_DATAREFERENCE_6c40ab853af64e0b968e85f0cd51d0e5

## Setup Execution Environment

In addition to the *pip* and *conda* dependencies listed in the parameters section, we need to include the packaged *utils_nlp* wheel file. The *utils_nlp* folder of this repo includes the transformer processor and the classifier that we will fine-tune on the remote target. The *preprocess.py* and *train.py* [scripts](scripts) import the *utils_nlp* package, as they call the preprocessing and classification functions of its wrapper classes.

In [6]:
# locate utils_nlp whl file
utils_nlp_whl_file = [x for x in os.listdir(UTILS_NLP_WHL_DIR) if x.endswith(".whl")][0]
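
Indexing the list comprehension with `[0]` raises a bare `IndexError` when the directory contains no wheel. As a possible alternative, the sketch below wraps the same lookup in a hypothetical helper, `find_wheel` (not part of *utils_nlp*), that fails with a more descriptive message. It is demonstrated on a temporary directory with a dummy file name.

```python
import os
import tempfile


def find_wheel(whl_dir):
    """Return the first .whl file name in whl_dir (sorted), or raise a clear error."""
    wheels = sorted(x for x in os.listdir(whl_dir) if x.endswith(".whl"))
    if not wheels:
        raise FileNotFoundError("No .whl file found in {!r}".format(whl_dir))
    return wheels[0]


# Demonstrate on a throwaway directory that holds one empty, dummy wheel file.
with tempfile.TemporaryDirectory() as tmp_dir:
    open(os.path.join(tmp_dir, "example_pkg-0.0.1-py3-none-any.whl"), "w").close()
    found = find_wheel(tmp_dir)
    print(found)
```

In the notebook, the call would look like `find_wheel(UTILS_NLP_WHL_DIR)`.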

In [7]:
# conda env setup
conda_dependencies = CondaDependencies.create(
    conda_packages=CONDA_PACKAGES,
    pip_packages=PIP_PACKAGES,
    python_version=PYTHON_VERSION,
)
nlp_repo_whl = Environment.add_private_pip_wheel(
    workspace=ws,
    file_path=os.path.join(UTILS_NLP_WHL_DIR, utils_nlp_whl_file),
    exist_ok=True,
)
conda_dependencies.add_pip_package(nlp_repo_whl)
run_config = RunConfiguration(conda_dependencies=conda_dependencies)
run_config.environment.docker.enabled = True
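if USE_GPU:
    # the imports above do not bind the bare `azureml` name, so import the constant directly
    from azureml.core.runconfig import DEFAULT_GPU_IMAGE
    run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE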

## Define Pipeline Graph

As shown in the diagram earlier, the pipeline can be represented as a graph, where nodes represent execution steps. In this example we create a pipeline with two steps for each pretrained model we want to fine-tune. The processing and fine-tuning steps need to be executed in order. However, each sequence of these two steps can be executed in parallel for many types of models on multiple nodes of the compute cluster.
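
To make the ordering concrete, the dependency structure described above can be sketched with the standard-library `graphlib` module (Python 3.9+). This is only an illustration of the graph shape, not how Azure ML schedules steps: the step names mirror the naming pattern used in this example, and the "batches" are a simplification, since a training step can in practice start as soon as its own preprocessing step finishes.

```python
from graphlib import TopologicalSorter

MODEL_NAMES = ["bert-base-uncased", "xlnet-base-cased"]

# Map each step name to the set of steps it depends on.
graph = {}
for model_name in MODEL_NAMES:
    preprocess = "preprocess_step_{}".format(model_name)
    train = "train_step_{}".format(model_name)
    graph[preprocess] = set()
    graph[train] = {preprocess}

# Group steps into batches whose members have no unmet dependencies.
sorter = TopologicalSorter(graph)
sorter.prepare()
batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

for i, batch in enumerate(batches):
    print("batch", i, batch)
```

Both preprocessing steps land in the first batch and both training steps in the second, which reflects that the two model chains are independent of each other.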

For text classification, a number of pretrained models are available from [Hugging Face's transformers package](https://github.com/huggingface/transformers), which is used within *utils_nlp*. Here, we include preprocessing and training steps for the *MODEL_NAMES* defined in the parameters section. You can list the supported pretrained models using the following code.

In [8]:
print(SequenceClassifier.list_supported_models())

['bert-base-uncased', 'bert-large-uncased', 'bert-base-cased', 'bert-large-cased', 'bert-base-multilingual-uncased', 'bert-base-multilingual-cased', 'bert-base-chinese', 'bert-base-german-cased', 'bert-large-uncased-whole-word-masking', 'bert-large-cased-whole-word-masking', 'bert-large-uncased-whole-word-masking-finetuned-squad', 'bert-large-cased-whole-word-masking-finetuned-squad', 'bert-base-cased-finetuned-mrpc', 'bert-base-german-dbmdz-cased', 'bert-base-german-dbmdz-uncased', 'roberta-base', 'roberta-large', 'roberta-large-mnli', 'xlnet-base-cased', 'xlnet-large-cased', 'distilbert-base-uncased', 'distilbert-base-uncased-distilled-squad']


In [9]:
input_dir = DataReference(
    datastore=ds,
    data_reference_name="input_dir",
    path_on_datastore=REMOTE_DATA_CONTAINER,
    overwrite=False,
)

# create pipeline steps
all_steps = []

for model_name in MODEL_NAMES:

    preprocess_dir = PipelineData(
        name="preprocessed",
        datastore=ds,
        output_path_on_compute=REMOTE_DATA_CONTAINER + "/" + "preprocessed_" + model_name,
    )

    output_dir = PipelineData(
        name="trained",
        datastore=ds,
        output_path_on_compute=REMOTE_DATA_CONTAINER + "/" + "trained_" + model_name,
    )

    preprocess_step = PythonScriptStep(
        name="preprocess_step_{}".format(model_name),
        arguments=[input_dir, TRAIN_FILE, preprocess_dir, TEXT_COL, LABEL_COL, model_name],
        script_name=PREPROCESS_SCRIPT,
        inputs=[input_dir],
        outputs=[preprocess_dir],
        source_directory=SCRIPTS_DIR,
        compute_target=compute_target,
        runconfig=run_config,
        allow_reuse=False,
    )

    train_step = PythonScriptStep(
        name="train_step_{}".format(model_name),
        arguments=[preprocess_dir, output_dir, model_name, MAX_NODES],
        script_name=TRAIN_SCRIPT,
        inputs=[preprocess_dir],
        outputs=[output_dir],
        source_directory=SCRIPTS_DIR,
        compute_target=compute_target,
        runconfig=run_config,
        allow_reuse=False,
    )

    train_step.run_after(preprocess_step)

    all_steps.append(preprocess_step)
    all_steps.append(train_step)
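
Each entry in `arguments` reaches the script as a command-line argument, with the `DataReference` and `PipelineData` objects resolved to paths on the compute target at run time. The scripts themselves are not reproduced here, so the following is a hypothetical sketch of how a script could read the six positional arguments of the preprocessing step. The function name, argument names, and example paths are assumptions for illustration.

```python
import argparse


def parse_preprocess_args(argv):
    """Parse positional arguments in the order used by the preprocessing step."""
    parser = argparse.ArgumentParser(description="Hypothetical preprocessing entry point")
    parser.add_argument("input_dir", help="directory that holds the training file")
    parser.add_argument("input_file", help="name of the CSV file inside input_dir")
    parser.add_argument("output_dir", help="directory for the preprocessed output")
    parser.add_argument("text_col", help="name of the text column")
    parser.add_argument("label_col", help="name of the label column")
    parser.add_argument("model_name", help="pretrained model name")
    return parser.parse_args(argv)


# Example values; on the remote target the two directories would be resolved paths.
args = parse_preprocess_args(
    ["/mnt/input", "train.csv", "/mnt/preprocessed", "text", "label", "bert-base-uncased"]
)
print(args.input_file, args.model_name)
```

Positional arguments keep the script call aligned with the order of the `arguments` list, at the cost of being less self-describing than named flags.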

## Run Pipeline

Once the pipeline and its steps are defined, we can create an experiment in the Azure ML workspace and submit a pipeline run as shown below.

In [10]:
# create pipeline
pipeline = Pipeline(workspace=ws, steps=all_steps)
experiment_name = "nlpatIgnite_" + datetime.now().strftime("%H%M%S")
experiment = Experiment(ws, experiment_name)
pipeline_run = experiment.submit(pipeline)
RunDetails(pipeline_run).show()
pipeline_run_id = pipeline_run.id

Created step preprocess_step_bert-base-uncased [9e0b4f42][d967ea09-c7a8-45e7-adba-44c9a3ccc8a9], (This step will run and generate new outputs)
Created step train_step_bert-base-uncased [4eb92d51][5ebdf951-fecf-4384-a497-ec2be1878246], (This step will run and generate new outputs)
Created step preprocess_step_xlnet-base-cased [2f85a9f4][df2b656c-04c0-4246-b1a0-a9dd479656a7], (This step will run and generate new outputs)
Created step train_step_xlnet-base-cased [28ddb914][82bfe567-ddb1-4b6a-ac5b-09a89bbba91d], (This step will run and generate new outputs)
Using data reference input_dir for StepId [142359a7][c1b309f5-1049-4014-8fd7-794c0b242c46], (Consumers of this data are eligible to reuse prior runs.)
Using data reference input_dir for StepId [326d0b6b][c1b309f5-1049-4014-8fd7-794c0b242c46], (Consumers of this data are eligible to reuse prior runs.)
Submitted pipeline run: a8c5b459-3dd0-4cea-9f8f-003df5fd3309


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

## Retrieve a Trained Model from Pipeline

The Azure ML SDK lets us retrieve pipeline runs and their steps by run ID and step name. The following example downloads the output of the training step of the first model in *MODEL_NAMES*, which includes the fine-tuned classifier and the label encoder used earlier.

In [11]:
# retrieve an existing training step & download corresponding model
# (from an existing experiment and pipeline run)
experiment = Experiment(ws, experiment_name)
pipeline_run = PipelineRun(experiment, pipeline_run_id)

In [13]:
train_step_run = pipeline_run.find_step_run("train_step_{}".format(MODEL_NAMES[0]))[0]
train_step_run

| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| nlpatIgnite_214131 | 7784250a-bbf1-47d3-85df-942b98c9b244 | azureml.StepRun | Completed | Link to Azure Portal | Link to Documentation |


In [14]:
# download the output of the training step
train_step_run.get_output_data(output_dir.name).download(local_path=TEMP_DIR)

# load classifier and label encoder from the downloaded folder
trained_dir = os.path.join(TEMP_DIR, "azureml", train_step_run.id, output_dir.name)
with open(os.path.join(trained_dir, MODEL_NAMES[0] + "_clf"), "rb") as f:
    classifier = pickle.load(f)
with open(os.path.join(trained_dir, MODEL_NAMES[0] + "_le"), "rb") as f:
    label_encoder = pickle.load(f)



## Test Model

Finally, we can test the model by scoring some text input.

In [15]:
# test
test_input = ["Let's go to Orlando. I've heard it's a nice place"]
processor = Processor(model_name=MODEL_NAMES[0], cache_dir=TEMP_DIR)
test_ds = processor.preprocess(test_input, max_len=150)
pred = classifier.predict(test_ds, device="cpu")
label_encoder.inverse_transform(pred)

Evaluating: 100%|██████████| 1/1 [00:01<00:00,  1.61s/it]


array(['fiction'], dtype=object)
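
The last line of the cell maps the predicted integer class ids back to genre names. Assuming the label encoder behaves like scikit-learn's `LabelEncoder` (sorted unique labels receive consecutive integer ids), the round trip can be sketched in plain Python. `TinyLabelEncoder` is a hypothetical stand-in and the labels are illustrative; the real encoder comes from the downloaded training step output.

```python
class TinyLabelEncoder:
    """Minimal stand-in for a label encoder: sorted unique labels get ids 0..n-1."""

    def fit(self, labels):
        self.classes_ = sorted(set(labels))
        self._index = {label: i for i, label in enumerate(self.classes_)}
        return self

    def transform(self, labels):
        # Map each label string to its integer id.
        return [self._index[label] for label in labels]

    def inverse_transform(self, ids):
        # Map each integer id back to its label string.
        return [self.classes_[i] for i in ids]


# Illustrative genre labels only.
encoder = TinyLabelEncoder().fit(["slate", "travel", "fiction", "travel"])
encoded = encoder.transform(["travel", "fiction"])
decoded = encoder.inverse_transform(encoded)
print(encoded, decoded)
```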