# Create Datastore

Create Datastore that connects to Azure blob container that contains raw text data.

In [1]:
from azureml.core import Workspace, Datastore
from azureml.core.dataset import Dataset

ws = Workspace.from_config()

# Register a new datastore from blob container with raw data
blob_store = Datastore.register_azure_blob_container(workspace=ws, 
                                                  datastore_name='lis_artifacts', 
                                                  container_name='lis-artifacts',
                                                  account_name='lisml8132196936',
                                                  account_key='vhUtGVMoHyzG0NBy86EkG+WYjXUfTiDxhDZvy4mJr2I5e432Lq1ynpyXMAP5z6fxK2Zf/woO8L2n1FJIThx1lA==')

blob_store = Datastore.get(ws, datastore_name='lis_artifacts')

ws.set_default_datastore('lis_artifacts')

Upload csv files to datastore.

In [2]:
# upload the local files from src_dir to the datastore

blob_store.upload_files(files=['../data/train.csv', '../data/eval.csv'], target_path='data')

Uploading an estimated of 2 files
Target already exists. Skipping upload for data/train.csv
Target already exists. Skipping upload for data/eval.csv
Uploaded 0 files


$AZUREML_DATAREFERENCE_ff0c89f895844ba2a61be9989fcb719b

# Create Components Directories

Create a directory for the components of the pipeline. Create subdirectories for each component/step (train, register, deploy ...)

In [3]:
import os, shutil

# Set the directory for the experiment files 
components_dir = 'lis_components'
os.makedirs(components_dir, exist_ok=True)

# Create a directory inside for the train component
os.makedirs(os.path.join(components_dir, "train"), exist_ok=True)
# Create a directory inside for the register component
os.makedirs(os.path.join(components_dir, "register"), exist_ok=True)
# Create a directory inside for the deploy component
os.makedirs(os.path.join(components_dir, "deploy"), exist_ok=True)

# Step 1: train.py

This script train load a pretrained GPT2 model and fine tune the model over the train & eval datasets provided in the datastore.

In [4]:
%%writefile $components_dir/train/train.py

import random
import logging
from importlib import reload  # Not needed in Python 2
from typing import Optional

import numpy as np
import tensorflow as tf
import math
from functools import partial

from transformers import TFTrainingArguments, HfArgumentParser
from transformers import AutoConfig, AutoTokenizer
from transformers import TFAutoModelForCausalLM
from transformers import create_optimizer
from datasets import load_dataset

from dataclasses import dataclass, field

from azureml.core import Run

#reload(logging)
#logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG, datefmt='%I:%M:%S')
#logger = logging.getLogger()


# region Command-line arguments
@dataclass
class ModelArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune, or train from scratch.
    """

    model_name: str = field(
        default="gpt2",
        metadata={
            "help": "The model checkpoint for weights initialization."
        },
    )
        
    def __post_init__(self):
        if self.model_name is None:
            raise ValueError(
                "--cannot call script without model_name argument"
            )

@dataclass
class DataTrainingArguments:
    """
    Arguments pertaining to what data we are going to input our model for training and eval.
    """

    train_file: str = field(default="data/train.csv", metadata={"help": "The input training data file (a csv file)."})
    eval_file: str = field(
        default="data/eval.csv",
        metadata={"help": "The input evaluation data file to evaluate the perplexity on (a csv file)."},
    )
    overwrite_cache: bool = field(
        default=True, metadata={"help": "Overwrite the cached training and evaluation sets"}
    )
    preprocessing_num_workers: Optional[int] = field(
        default=None,
        metadata={"help": "The number of processes to use for the preprocessing."},
    )
    max_train_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": "For debugging purposes or quicker training, truncate the number of training examples to this "
            "value if set."
        },
    )
    max_eval_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": "For debugging purposes or quicker training, truncate the number of evaluation examples to this "
            "value if set."
        },
    )

    def __post_init__(self):
        if self.train_file is None or self.eval_file is None:
            raise ValueError(
                "--cannot call scripts without train_file & eval_file arguments"
            )

def sample_generator(dataset, tokenizer):
    # Trim off the last partial batch if present
    sample_ordering = np.random.permutation(len(dataset))
    for sample_idx in sample_ordering:
        example = dataset[int(sample_idx)]
        # Handle dicts with proper padding and conversion to tensor.
        example = {key: tf.convert_to_tensor(arr, dtype_hint=tf.int64) for key, arr in example.items()}
        yield example, example["labels"]  # TF needs some kind of labels, even if we don't use them
    return

# region Helper classes
class SavePretrainedCallback(tf.keras.callbacks.Callback):
    # Hugging Face models have a save_pretrained() method that saves both the weights and the necessary
    # metadata to allow them to be loaded as a pretrained model in future. This is a simple Keras callback
    # that saves the model with this method after each epoch.
    def __init__(self, output_dir, **kwargs):
        super().__init__()
        self.output_dir = output_dir

    def on_epoch_end(self, epoch, logs=None):
        self.model.save_pretrained(self.output_dir)
        
        
def main():
    # region Argument Parsing
    parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TFTrainingArguments))

    # region Setup logging
    #logger.setLevel(logging.INFO)
    
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    # Load the dataset from the datastore.
    raw_datasets = load_dataset('csv', data_files={'train': data_args.train_file, 'test': data_args.eval_file})

    # Testing loading datasets
    index = random.sample(range(len(raw_datasets["train"])), 1)
    #logger.info(f"  Example raw dataset: %s", raw_datasets["train"][index])

    # Load pretrained model and tokenizer
    
    config = AutoConfig.from_pretrained(model_args.model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_args.model_name)

    text_column_name = "text"
    column_names = raw_datasets["train"].column_names
    
    # Preprocess Dataset & add eos_token 
    # Main data processing function that will add eos_token to each text in the dataset
    def add_eos_token(examples):
        examples_with_eos = examples
        examples_with_eos[text_column_name] = [x + tokenizer.eos_token for x in examples[text_column_name]]  
        return examples_with_eos

    raw_datasets = raw_datasets.map(
        add_eos_token,
        batched=True,
        num_proc=data_args.preprocessing_num_workers,
        load_from_cache_file=not data_args.overwrite_cache,
        desc=f"Adding eos_token to each example in the dataset",
    )
    
    # Testing preprocess
    #logger.info(f"  Example raw dataset with eos token: %s", raw_datasets["train"][index])

    ## Tokenize dataset using gpt2 tokenizer
    def tokenize_function(examples):
        return tokenizer(examples[text_column_name])

    tokenized_datasets = raw_datasets.map(
        tokenize_function,
        batched=True,
        num_proc=data_args.preprocessing_num_workers,
        remove_columns=column_names,
        load_from_cache_file=not data_args.overwrite_cache,
        desc="Running tokenizer on dataset",
    )
    
    # Testing Tokenization
    #logger.info(f"  Example tokenized dataset: %s", tokenized_datasets["train"][index])

    # Concatenate all texts from our dataset and generate chunks of block_size
    
    block_size = tokenizer.model_max_length
    if block_size > 1024:
        # The tokenizer picked seems to have a very large `model_max_length`
        block_size = 1024

    # Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size.
    def group_texts(examples):
        # Concatenate all texts.
        concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
        total_length = len(concatenated_examples[list(examples.keys())[0]])
        # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
        # customize this part to your needs.
        if total_length >= block_size:
            total_length = (total_length // block_size) * block_size
        # Split by chunks of max_len.
        result = {
            k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
            for k, t in concatenated_examples.items()
        }
        result["labels"] = result["input_ids"].copy()
        return result

    lm_datasets = tokenized_datasets.map(
        group_texts,
        batched=True,
        batch_size=len(tokenized_datasets["train"]), # if training size is very small, like in our case.
        num_proc=data_args.preprocessing_num_workers,
        load_from_cache_file=not data_args.overwrite_cache,
        desc=f"Grouping texts in chunks of {block_size}",
    )
    
    # Testing Grouping Texts
    
    #logger.info(f"  Example 0 raw dataset: %s", raw_datasets["train"][0])
    #logger.info(f"  Example 0 raw dataset: %s", raw_datasets["train"][1])
    #logger.info(f"  Example 0 raw dataset: %s", raw_datasets["train"][2])
    #logger.info(f"  Example 0 raw dataset: %s", raw_datasets["train"][3])

    #logger.info(f"  Example 0 tokenized dataset: %s", tokenized_datasets["train"][0])
    #logger.info(f"  Example 0 tokenized dataset: %s", tokenized_datasets["train"][1])
    #logger.info(f"  Example 0 tokenized dataset: %s", tokenized_datasets["train"][2])
    #logger.info(f"  Example 0 tokenized dataset: %s", tokenized_datasets["train"][3])

   
    #logger.info(f"  Example 0 concatenated tokenized dataset: %s", lm_datasets["train"][0]['input_ids'][:40])

    
    # Prepare Training & Evaluation Datasets
    train_dataset = lm_datasets["train"]
    eval_dataset = lm_datasets["test"]
    
    if data_args.max_train_samples is not None:
        train_dataset = train_dataset.select(range(data_args.max_train_samples))
    if data_args.max_eval_samples is not None:
        eval_dataset = eval_dataset.select(range(data_args.max_eval_samples))
        
    # Logging Training Parameters
    
    num_replicas = training_args.strategy.num_replicas_in_sync
    batches_per_epoch = len(train_dataset) // (num_replicas * training_args.per_device_train_batch_size)
    """
    logger.info(f"  Training Arguments: %s",
    {
        "init_lr": training_args.learning_rate,
        "num_replicas": num_replicas,
        "strategy": training_args.strategy,
        "num_train_epochs": training_args.num_train_epochs,
        "per_device_train_batch_size": training_args.per_device_train_batch_size,
        "batches_per_epoch": len(train_dataset) // (num_replicas * training_args.per_device_train_batch_size),
        "num_train_steps": int(training_args.num_train_epochs * batches_per_epoch),
        "num_warmup_steps": training_args.warmup_steps,
        "adam_beta1": training_args.adam_beta1,
        "adam_beta2": training_args.adam_beta2,
        "adam_epsilon": training_args.adam_epsilon,
        "weight_decay_rate": training_args.weight_decay
    }
    )
    """

    
    # Train Model

    with training_args.strategy.scope():

        config = AutoConfig.from_pretrained(model_args.model_name)
        model = TFAutoModelForCausalLM.from_pretrained(model_args.model_name, config=config)

        model.resize_token_embeddings(len(tokenizer))

        num_replicas = training_args.strategy.num_replicas_in_sync

        # region TF Dataset preparation
        train_generator = partial(sample_generator, train_dataset, tokenizer)
        train_signature = {
            feature: tf.TensorSpec(shape=(None,), dtype=tf.int64)
            for feature in train_dataset.features
            if feature != "special_tokens_mask"
        }
        train_sig = (train_signature, train_signature["labels"])
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
        tf_train_dataset = (
            tf.data.Dataset.from_generator(train_generator, output_signature=train_sig)
            .with_options(options)
            .batch(batch_size=num_replicas * training_args.per_device_train_batch_size, drop_remainder=True)
            .repeat(int(training_args.num_train_epochs))
        )
        eval_generator = partial(sample_generator, eval_dataset, tokenizer)
        eval_signature = {
            feature: tf.TensorSpec(shape=(None,), dtype=tf.int64)
            for feature in eval_dataset.features
            if feature != "special_tokens_mask"
        }
        eval_sig = (eval_signature, eval_signature["labels"])
        tf_eval_dataset = (
            tf.data.Dataset.from_generator(eval_generator, output_signature=eval_sig)
            .with_options(options)
            .batch(batch_size=num_replicas * training_args.per_device_eval_batch_size, drop_remainder=True)
            .repeat(int(training_args.num_train_epochs))
        )
        # endregion
        # region Optimizer and loss

        batches_per_epoch = len(train_dataset) // (num_replicas * training_args.per_device_train_batch_size)
        # Bias and layernorm weights are automatically excluded from the decay
        optimizer, lr_schedule = create_optimizer(
            init_lr=training_args.learning_rate,
            num_train_steps=int(training_args.num_train_epochs * batches_per_epoch),
            num_warmup_steps=training_args.warmup_steps,
            adam_beta1=training_args.adam_beta1,
            adam_beta2=training_args.adam_beta2,
            adam_epsilon=training_args.adam_epsilon,
            weight_decay_rate=training_args.weight_decay,
        )

        def dummy_loss(y_true, y_pred):
            return tf.reduce_mean(y_pred)

        model.compile(optimizer=optimizer, loss={"loss": dummy_loss})
        # endregion

        # region Training and validation
        #logger.info("***** Running training *****")
        #logger.info(f"  Num examples = {len(train_dataset)}")
        #logger.info(f"  Num Epochs = {training_args.num_train_epochs}")
        #logger.info(f"  Instantaneous batch size per device = {training_args.per_device_train_batch_size}")
        #logger.info(f"  Total train batch size = {training_args.per_device_train_batch_size * num_replicas}")

        history = model.fit(
            tf_train_dataset,
            validation_data=tf_eval_dataset,
            epochs=int(training_args.num_train_epochs),
            steps_per_epoch=len(train_dataset) // (training_args.per_device_train_batch_size * num_replicas),
            callbacks=[SavePretrainedCallback(output_dir=training_args.output_dir)],
        )
        try:
            train_perplexity = math.exp(history.history["loss"][-1])
        except OverflowError:
            train_perplexity = math.inf
        try:
            validation_perplexity = math.exp(history.history["val_loss"][-1])
        except OverflowError:
            validation_perplexity = math.inf
        #logger.info(f"  Final train loss: {history.history['loss'][-1]:.3f}")
        #logger.info(f"  Final train perplexity: {train_perplexity:.3f}")
        #logger.info(f"  Final validation loss: {history.history['val_loss'][-1]:.3f}")
        #logger.info(f"  Final validation perplexity: {validation_perplexity:.3f}")
        # endregion
        
        # log metrics to AML
        run = Run.get_context()

        run.log("Final train loss", history.history['loss'][-1])
        run.log("Final validation loss", history.history['val_loss'][-1])
        run.log("Final train perplexity", train_perplexity)
        run.log("Final validation perplexity", validation_perplexity)

        run.parent.log("Final train loss", history.history['loss'][-1])
        run.parent.log("Final validation loss", history.history['val_loss'][-1])
        run.parent.log("Final train perplexity", train_perplexity)
        run.parent.log("Final validation perplexity", validation_perplexity)
                
        if training_args.output_dir is not None:
            model.save_pretrained(training_args.output_dir)

if __name__ == "__main__":
    main()

Overwriting lis_components/train/train.py


# Step 2: register.py

This script uploads the saved h5 model from the blob store and register it as an AML model.

In [5]:
%%writefile $components_dir/register/register.py

# Import libraries
import argparse
from azureml.core import Run

def main():
    # Get parameters
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_dir', 
                        type=str, 
                        dest='model_dir', 
                        default="outputs",
                        help='model location')
    parser.add_argument("--model_name",
                        type=str,
                        help="Name of the Registered Model",
                        default="lis-gpt2-model")
    parser.add_argument("--register_deploy_link",
                        type=str,
                        help="register_deploy_link",
                        default="register_deploy_link")

   
    args = parser.parse_args()
    model_dir = args.model_dir
    model_name = args.model_name

    # Get the experiment run context
    run = Run.get_context()

    # load the model
    print("Loading model from " + model_dir)
    model_file = os.path.join(model_dir, "tf_model.h5")
    model_config_file = os.path.join(model_dir, "config.json")

    # Get metrics for registration
    metrics = run.parent.get_metrics()

    # Register the model
    run.upload_file("outputs/tf_model.h5", model_file)
    run.upload_file("outputs/config.json", model_config_file)
    
    run.register_model(
        model_path="outputs/",
        model_name=model_name,
        tags=metrics)

    run.complete()


if __name__ == '__main__':
    main()

Overwriting lis_components/register/register.py


# Step 3: deploy.py

This step needs two scripts:

A script deploy.py that will use azureml api to deploy an ACIservice. 

A script score.py that will be used by the service to perform inference.

In [6]:
%%writefile $components_dir/deploy/score.py

import json
from transformers import TFGPT2LMHeadModel
from transformers import GPT2Tokenizer
from azureml.core.model import Model

# Called when the service is loaded
def init():
    ## TODO
    global model, tokenizer
    # Get the path to the deployed model file and load it
    model_path = Model.get_model_path('lis-gpt2-model')    
    model = TFGPT2LMHeadModel.from_pretrained(model_path)
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Called when a request is received
def run(raw_data):

    input_ids = tokenizer.encode(json.loads(raw_data)['data'], return_tensors='tf')

    generated_text_samples = model.generate(
        input_ids, 
        max_length=30,  
        num_return_sequences=5,
        no_repeat_ngram_size=2,
        do_sample=True,
        early_stopping=True
    )

    json_output = {}
    for i, beam in enumerate(generated_text_samples):
        json_output[i+1] = tokenizer.decode(beam, skip_special_tokens=True)
        
    # Return the predictions as JSON
    return json.dumps(json_output)

Overwriting lis_components/deploy/score.py


In [7]:
%%writefile $components_dir/deploy/deploy.py

from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig
from azureml.core import Run
from azureml.core.model import Model

import argparse
import os

def main():
    # Get parameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--service_name",
                        type=str,
                        help="Name of the Web Service",
                        default="lis-gpt2-webservice")
    parser.add_argument("--model_name",
                        type=str,
                        help="Name of the registered model name",
                        default="lis-gpt2-model")
    parser.add_argument("--cpu_cores",
                        type=int,
                        help="CPU reserve capacity",
                        default=1)
    parser.add_argument("--memory_gb",
                        type=float,
                        help="Memory reserve capacity",
                        default=2)
    parser.add_argument("--register_deploy_link",
                        type=str,
                        help="register_deploy_link",
                        default="register_deploy_link")
    args = parser.parse_args()
    service_name = args.service_name
    model_name = args.model_name
    cpu_cores = args.cpu_cores
    memory_gb = args.memory_gb    
    components_dir = "lis_components"
    
    # Configure the scoring environment
    inference_config = InferenceConfig(runtime= "python",
                                       entry_script=os.path.join(components_dir, "deploy", "score.py"),
                                       conda_file=os.path.join(components_dir, "dependencies_scoring.yml"))

    deployment_config = AciWebservice.deploy_configuration(cpu_cores = cpu_cores, memory_gb = memory_gb)
    
    # Get the experiment run context
    run = Run.get_context()
    ws = run.experiment.workspace
    
    model = ws.models[model_name]
    print(model.name, 'version', model.version)

    service = Model.deploy(ws, service_name, [model], inference_config, deployment_config)

    service.wait_for_deployment(True)
    
    print(service.state)
    
if __name__ == '__main__':
    main()

Overwriting lis_components/deploy/deploy.py


# Create dependencies.yml

Define an environment YAML file with the components steps script dependencies and create an Azure ML environment for the pipeline.

In [8]:
%%writefile $components_dir/dependencies.yml

name: lis_env
    
dependencies:
  # The python interpreter version.
  # Currently Azure ML Workbench only supports 3.5.2 and later.
  - python=3.6.9
  - pip

  - pip:
      - transformers == 3.5.1
      - datasets == 1.10.2
      - tensorflow == 2.5.0
      - azureml-defaults==1.30.0

Overwriting lis_components/dependencies.yml


# Create dependencies_scoring.yml

Define an environment YAML file with dependencies for the web service.

In [9]:
%%writefile $components_dir/dependencies_scoring.yml

name: lis_env_scoring
    
dependencies:
  # The python interpreter version.
  # Currently Azure ML Workbench only supports 3.5.2 and later.
  - python=3.6.9
  - pip

  - pip:
      - transformers == 3.5.1
      - tensorflow == 2.5.0
      - azureml-defaults==1.30.0

Overwriting lis_components/dependencies_scoring.yml


# Create an Azure Machine Learning Pipeline to Run the Scripts as a Pipeline¶


In [10]:
import azureml.core
from azureml.core import Workspace

# Load the workspace
ws = Workspace.from_config()

## Prepare a Compute Environment for the Pipeline Steps

Create a compute target for training your model. We use Azure ML managed compute (AmlCompute) for remote compute resource.

In [11]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "pipeline-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_NC6', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


# Prepare Pipeline Envirnoment

Prepare Pipeline environment, python dependencies & compute used by aml pipeline.

In [12]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the pipeline experiment
pipeline_environment = Environment.from_conda_specification(name = 'pipeline-env', 
                                                          file_path = os.path.join(components_dir, 
                                                                                   "dependencies.yml"))
pipeline_environment.python.user_managed_dependencies = False # Let Azure ML manage dependencies
pipeline_environment.docker.enabled = True # Use a docker container

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = pipeline_environment

print ("Pipeline configuration created.")

'enabled' is deprecated. Please use the azureml.core.runconfig.DockerConfiguration object with the 'use_docker' param instead.


Pipeline configuration created.


In [13]:
from azureml.pipeline.core.graph import PipelineParameter
from azureml.pipeline.core import PipelineData
from azureml.data import OutputFileDatasetConfig
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import PythonScriptStep


# train_file, eval_file and output_file are passed as a datastore path between steps
train_datastore_path = DataReference(data_reference_name = "train_datastore_path", datastore=ws.datastores['lis_artifacts'], path_on_datastore = "data/train.csv")
eval_datastore_path = DataReference(data_reference_name = "eval_datastore_path", datastore=ws.datastores['lis_artifacts'], path_on_datastore = "data/eval.csv")
output_datastore_path = OutputFileDatasetConfig("output_datastore_path", destination=(ws.datastores['lis_artifacts'], "outputs")).as_mount()

num_train_epochs_param = PipelineParameter(name="num_train_epochs", default_value=3)

aml_model_name_param = PipelineParameter(name="aml_model_name", default_value="lis-gpt2-model")
register_deploy_link = PipelineData("register_deploy_link")

aml_service_name_param = PipelineParameter(name="aml_service_name", default_value="lis-gpt2-serviceapp")
cpu_cores_param = PipelineParameter(name="cpu_cores", default_value=1)
memory_gb_param = PipelineParameter(name="memory_gb", default_value=2)
    
    
# Create Step 1, which runs the PythonScriptStep to train / finetune
train_step = PythonScriptStep(name = "Train",
                                source_directory = ".",
                                script_name = os.path.join(components_dir, "train/train.py"),
                                arguments=['--model_name', "gpt2", 
                                           '--output_dir', output_datastore_path,
                                           '--train_file', train_datastore_path,
                                           '--eval_file', eval_datastore_path,
                                           '--num_train_epochs', num_train_epochs_param,
                                           '--max_train_samples', 8,
                                           '--max_eval_samples', 8,
                                           #'--per_gpu_train_batch_size', 8,  
                                           #'--per_gpu_eval_batch_size', 8, 
                                           #'--fp16'
                                          ],
                                inputs=[train_datastore_path, eval_datastore_path],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)

# Create Step 2, which runs the PythonScriptStep to register model
register_step = PythonScriptStep(name = "Register",
                                source_directory = ".",
                                script_name = os.path.join(components_dir, "register/register.py"),
                                arguments=['--model_name', aml_model_name_param, 
                                           '--model_dir', output_datastore_path.as_input(),
                                           '--register_deploy_link', register_deploy_link
                                          ],
                                outputs=[register_deploy_link],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)

# Create Step 3, which runs the PythonScriptStep to deploy model as a web service
deploy_step = PythonScriptStep(name = "Deploy",
                                source_directory = ".",
                                script_name = os.path.join(components_dir, "deploy/deploy.py"),
                                arguments=['--service_name', aml_service_name_param, 
                                           '--model_name', aml_model_name_param, 
                                           '--cpu_cores', cpu_cores_param,
                                           '--memory_gb', memory_gb_param,
                                           '--register_deploy_link', register_deploy_link
                                          ],
                                inputs=[register_deploy_link],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = False)


In [14]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [train_step, register_step, deploy_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

Pipeline is built.


In [15]:
# Create an experiment and run the pipeline

experiment = Experiment(workspace = ws, name = 'lis-ct-pipeline')

pipeline_parameters = {"num_train_epochs": 1,
                       "aml_model_name": "lis-gpt2-model",
                       "aml_service_name": "lis-gpt2-serviceapp",
                       "cpu_cores": 1,
                       "memory_gb": 2,                       
                       }

pipeline_run = experiment.submit(pipeline, 
                                 pipeline_parameters=pipeline_parameters)

print("Pipeline submitted for execution.")
pipeline_run.wait_for_completion(show_output=True)

Created step Train [17b50458][9d96330e-3873-4610-83b7-957b4016babf], (This step will run and generate new outputs)
Created step Register [ab43254f][ff9b1db8-f8bf-4c25-8b71-70de214e2b41], (This step will run and generate new outputs)
Created step Deploy [25177c8c][0ac4e4a0-8edc-497f-a572-a64a8ac862b6], (This step will run and generate new outputs)
Using data reference train_datastore_path for StepId [7bea0512][ab4bddc8-5eb4-4709-ad5c-dc555df7d52f], (Consumers of this data are eligible to reuse prior runs.)
Using data reference eval_datastore_path for StepId [e56fbf22][197693d2-331a-4bf3-a2b4-c18bf5d4bb63], (Consumers of this data are eligible to reuse prior runs.)
Submitted PipelineRun f29233bb-ae00-4b8c-bf50-7e1cd833cd29
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/f29233bb-ae00-4b8c-bf50-7e1cd833cd29?wsid=/subscriptions/2f091423-f84d-4062-8e67-1437a0c50045/resourcegroups/lis/workspaces/lis-ml&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
Pipeline submitted for execution

'Finished'

# Consume Web Service

In [19]:
import urllib.request
import json
import os
import ssl

def allowSelfSignedHttps(allowed):
    # bypass the server certificate verification on client side
    if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
        ssl._create_default_https_context = ssl._create_unverified_context

allowSelfSignedHttps(True) # this line is needed if you use self-signed certificate in your scoring service.

# Request data goes here
data = {"data":"Hello"
}

body = str.encode(json.dumps(data))

url = 'http://f87c9bb3-aded-45e3-99d8-e9de9bad63f8.westeurope.azurecontainer.io/score'
api_key = '' # Replace this with the API key for the web service
headers = {'Content-Type':'application/json', 'Authorization':('Bearer '+ api_key)}

req = urllib.request.Request(url, body, headers)

try:
    response = urllib.request.urlopen(req)

    result = response.read()
    print(json.loads(json.loads(result)))
except urllib.error.HTTPError as error:
    print("The request failed with status code: " + str(error.code))

    # Print the headers - they include the requert ID and the timestamp, which are useful for debugging the failure
    print(error.info())
    print(json.loads(error.read().decode("utf8", 'ignore')))

{'1': 'Hello have a fantastic day @Nagpur."\n\nAt the time, she was on vacation in Switzerland.\n.@NagaKi', '2': 'Hello, what a shame, if my brother is in the hospital? Are you still the kind of guy who gives us hope?" Marge giggled', '3': 'Hello\n\nGreetings\n', '4': 'Hello and thank you all for your support!!!', '5': "Hello in the past. I am glad if your name wasn't mentioned in this comment. This guy started with my wife's name and he's done"}
