In [1]:
#transformers 4.6.1
#pip freeze | cut -d'=' -f1 | xargs -n1 pip install -U
#nvidia-smi
#torch.cuda.is_available()
#torch.cuda.empty_cache()
import nltk
#nltk.download('punkt')
import numpy as np
#import pandas as pd
import torch
import argparse
import mlflow
#import azureml.core
#import logging
#import sys
#import time
import os
from datasets import load_from_disk, load_metric
from transformers.integrations import MLflowCallback, AzureMLCallback
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer,
    DataCollatorForSeq2Seq,
    #set_seed,
    AutoConfig,
    #HfArgumentParser
)

# Make sure the NLTK "punkt" resource is present: look it up locally first and
# only download it (quietly) when the lookup raises LookupError or OSError.
# NOTE(review): presumably required by the nltk.sent_tokenize calls used for
# ROUGE formatting later in the notebook - confirm.
try:
    nltk.data.find("tokenizers/punkt")
except (LookupError, OSError):
    nltk.download("punkt", quiet=True)

In [19]:
class MLflowNoParamCallback(MLflowCallback):
    """
    A :class:`~transformers.TrainerCallback` that sends the logs to `MLflow <https://www.mlflow.org/>`__.

    Variant of :class:`MLflowCallback` whose :meth:`setup` logs only the training
    arguments as run parameters. The model config is deliberately NOT merged in,
    which keeps the number of logged parameters low (the notebook hit a backend
    limit of 100 parameters per run when the config was included).
    """

    def setup(self, args, state, model):
        """
        Start the MLflow run (if needed) and log the training arguments as parameters.

        Args:
            args: training arguments object; only ``args.to_dict()`` is used.
            state: trainer state; parameters are logged only when
                ``state.is_world_process_zero`` is true.
            model: unused here (its config is intentionally not logged).
        """
        # Local import: the notebook's import cell does not define a logger, and the
        # warning below would otherwise raise NameError.
        import logging

        logger = logging.getLogger(__name__)

        log_artifacts = os.getenv("HF_MLFLOW_LOG_ARTIFACTS", "FALSE").upper()
        if log_artifacts in {"TRUE", "1"}:
            self._log_artifacts = True
        if state.is_world_process_zero:
            # Reuse a run that is already active instead of failing on a second start_run().
            if self._ml_flow.active_run() is None:
                self._ml_flow.start_run()
            combined_dict = args.to_dict()
            # Intentionally disabled: merging the model config into the parameters.
            # if hasattr(model, "config") and model.config is not None:
            #     model_config = model.config.to_dict()
            #     combined_dict = {**model_config, **combined_dict}

            # remove params that are too long for MLflow
            for name, value in list(combined_dict.items()):
                # internally, all values are converted to str in MLflow
                if len(str(value)) > self._MAX_PARAM_VAL_LENGTH:
                    logger.warning(
                        f"Trainer is attempting to log a value of "
                        f'"{value}" for key "{name}" as a parameter. '
                        f"MLflow's log_param() only accepts values no longer than "
                        f"250 characters so we dropped this attribute."
                    )
                    del combined_dict[name]
            # MLflow cannot log more than 100 values in one go, so we have to split it
            combined_dict_items = list(combined_dict.items())
            for i in range(0, len(combined_dict_items), self._MAX_PARAMS_TAGS_PER_BATCH):
                self._ml_flow.log_params(dict(combined_dict_items[i : i + self._MAX_PARAMS_TAGS_PER_BATCH]))
        self._initialized = True

In [None]:
# HfArgumentParser wip
# Command-line interface: a single optional --data argument holding the dataset path.
parser = argparse.ArgumentParser(description="HuggingFace Trainer (WIP)")
parser.add_argument(
    "--data",
    type=str,
    default=None,
    help="data path")
# cmd args
# parse_known_args() instead of parse_args(): inside a Jupyter kernel sys.argv
# carries kernel arguments (e.g. "-f <connection file>") that parse_args()
# rejects with SystemExit. Unrecognised arguments are ignored here.
args = parser.parse_known_args()[0]

In [None]:
# download model (cache)
#model_name = "google/pegasus-large"
#model = AutoModelForSeq2SeqLM.from_pretrained(model_name, cache_dir = "./cache")
#tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir = "./cache")

# save model
#model.save_pretrained("../model/hf-pegasus")
#tokenizer.save_pretrained("../model/hf-pegasus")

In [16]:
# td: model -> workspace blob storage -> mount -> compute
# Load config, tokenizer and seq2seq model from the local directory `model_path`,
# then move the model to the GPU when CUDA is available, otherwise keep it on CPU.
model_path = "../model/hf-pegasus/"
device = "cuda" if torch.cuda.is_available() else "cpu"
config = AutoConfig.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSeq2SeqLM.from_pretrained(model_path, config=config).to(device)

In [8]:
# td? FileSystems integration for cloud storage (abfs for Azure Blob service)
# load dataset from workspace
# (disabled) variant that reads the dataset path from the --data argument:
#data = load_from_disk(args.data, keep_in_memory=True)

# Load the dataset saved on disk under ../data/xsum, keeping it in memory.
data = load_from_disk("../data/xsum", keep_in_memory=True)
# Bare expression: displays the loaded object as the cell output.
data

DatasetDict({
    train: Dataset({
        features: ['document', 'id', 'summary'],
        num_rows: 204045
    })
    validation: Dataset({
        features: ['document', 'id', 'summary'],
        num_rows: 11332
    })
    test: Dataset({
        features: ['document', 'id', 'summary'],
        num_rows: 11334
    })
})

In [9]:
# train/eval/test sets
# max samples wip
# Tiny dev subsets: the same four row indices (1000..1003) are taken from every split.
sample_indices = range(1000, 1004)
train_data = data["train"].select(sample_indices)
eval_data = data["validation"].select(sample_indices)
test_data = data["test"].select(sample_indices)
# Show the first selected training example as the cell output.
train_data[0]

{'document': 'Media playback is not supported on this device\nLee, 25, made a course-record 10-under-par 62 in the first round, and is ahead of China\'s Shanshan Feng and Thailand\'s Ariya Jutanugarn who are tied second.\nScotland\'s Catriona Matthew scored a stunning second-round 65 to go nine under par, two shots behind leader Lee.\nEngland\'s Charley Hull is tied in 10th place on five under at Woburn.\nPlaying at her home event, Hull made five birdies between holes seven and 12, but was put on the clock from 13 to 16 because of slow play.\nShe told BBC Sport: "It was raining so I had to rush and bogeyed the 16th hole, it is stuff you have to deal with.\n"I am usually a fast player but happy with the way I came through the middle part."\nMeanwhile, England\'s Bronte Law - the leading amateur who has had to borrow her clubs - carded consecutive rounds of 70 for a place in tied-17th position on four under par.\nNew Zealand\'s world number one Lydia Ko made the cut on equal par after a 

In [10]:
# max sequence length
# (xsum: 512, 56), (reddit-tifu: 512, 128)
max_source_length, max_target_length = 512, 56

# td parse: source & target columns
def preprocess_function(examples):
    """
    Tokenize a batch of examples for seq2seq training.

    Reads the "document" column as model input and the "summary" column as
    target, truncating to `max_source_length` / `max_target_length` tokens.
    Returns the tokenized inputs with the target token ids stored under "labels".
    """
    documents = list(examples["document"])
    model_inputs = tokenizer(documents, max_length=max_source_length, truncation=True)

    # Targets are tokenized inside the tokenizer's target-side context.
    with tokenizer.as_target_tokenizer():
        target_encoding = tokenizer(
            examples["summary"],
            max_length=max_target_length,
            truncation=True,
        )

    model_inputs["labels"] = target_encoding["input_ids"]
    return model_inputs

# Apply the batched preprocessing to each split, in train/eval/test order.
tokenized_train, tokenized_eval, tokenized_test = [
    split.map(preprocess_function, batched=True)
    for split in (train_data, eval_data, test_data)
]

# Number of tokenized training examples, shown as the cell output.
len(tokenized_train)

  0%|          | 0/1 [00:00<?, ?ba/s]

  0%|          | 0/1 [00:00<?, ?ba/s]

  0%|          | 0/1 [00:00<?, ?ba/s]

4

In [11]:
# task performance metric
metric = load_metric("rouge")
def compute_metrics(eval_pred):
    """
    Compute ROUGE F-measures (scaled to 0-100) and the mean generated length.

    Args:
        eval_pred: pair ``(predictions, labels)`` of token-id arrays. ``predictions``
            may also be a tuple, in which case its first element is used.

    Returns:
        dict mapping each ROUGE key to its mid F-measure * 100, plus ``"sum_len"``
        (mean count of non-pad tokens per prediction); all values rounded to 4 decimals.
    """
    predictions, labels = eval_pred
    # Some models hand back a tuple of outputs; the token ids are the first element.
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    pad_id = tokenizer.pad_token_id
    # -100 is the ignore/padding marker and is not a valid token id, so it must be
    # replaced before decoding. Doing this for predictions too keeps batch_decode from
    # failing on padded generations and keeps -100 out of the length count below.
    predictions = np.where(predictions != -100, predictions, pad_id)
    labels = np.where(labels != -100, labels, pad_id)

    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # rouge sentence newline formatting (data post processing)
    decoded_preds = ["\n".join(nltk.sent_tokenize(pred.strip())) for pred in decoded_preds]
    decoded_labels = ["\n".join(nltk.sent_tokenize(label.strip())) for label in decoded_labels]

    result = metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
    result = {key: value.mid.fmeasure * 100 for key, value in result.items()}

    # mean summary length metric
    prediction_lens = [np.count_nonzero(pred != pad_id) for pred in predictions]
    result["sum_len"] = np.mean(prediction_lens)

    return {k: round(v, 4) for k, v in result.items()}

metric

Metric(name: "rouge", features: {'predictions': Value(dtype='string', id='sequence'), 'references': Value(dtype='string', id='sequence')}, usage: """
Calculates average rouge scores for a list of hypotheses and references
Args:
    predictions: list of predictions to score. Each predictions
        should be a string with tokens separated by spaces.
    references: list of reference for each prediction. Each
        reference should be a string with tokens separated by spaces.
    rouge_types: A list of rouge types to calculate.
        Valid names:
        `"rouge{n}"` (e.g. `"rouge1"`, `"rouge2"`) where: {n} is the n-gram based scoring,
        `"rougeL"`: Longest common subsequence based scoring.
        `"rougeLSum"`: rougeLsum splits text using `"
"`.
        See details in https://github.com/huggingface/datasets/issues/617
    use_stemmer: Bool indicating whether Porter stemmer should be used to strip word suffixes.
    use_agregator: Return aggregates if this is set to True
Retu

In [12]:
# For optimization, both pre-training and fine-tuning used Adafactor (Shazeer & Stern, 2018) with square root learning rate decay and dropout rate of 0.1.
#num_beams = 8, # PEGASUS -> 8
#gradient_accumulation_steps = (256/batch_size/nodes) # PEGASUS effective batch size -> 256

# for early stopping callback
#load_best_model_at_end = True,
#metric_for_best_model = "eval_rouge2",
#greater_is_better = True,

#freeze_embeds = True,
batch_size = 4
# Target effective batch size of 256 samples per optimizer step (PEGASUS setting noted above).
# Integer division: gradient_accumulation_steps must be an int, and `256 / batch_size`
# would pass a float (64.0). max(1, ...) keeps it valid if batch_size exceeds 256.
effective_batch_size = 256
train_args = Seq2SeqTrainingArguments(
    per_device_train_batch_size = batch_size,
    per_device_eval_batch_size = batch_size,
    gradient_accumulation_steps = max(1, effective_batch_size // batch_size),
    adafactor = True, # PEGASUS -> Adafactor
    fp16 = False, # hf-PEGASUS ONLY FP32
    label_smoothing_factor = 0.1,
    weight_decay = 0.01,
    learning_rate = 0.0001, # (1e-4) -> 256 effective batch size
    num_train_epochs = 1.0,
    evaluation_strategy = "epoch",
    logging_strategy = "steps",
    logging_steps = 1,
    logging_dir='./logs',
    save_strategy = "epoch",
    save_total_limit = 1,
    output_dir = "./outputs/pegasus-dev",
    overwrite_output_dir = True,
    predict_with_generate = True, # evaluation/prediction use generate() so ROUGE sees decoded text
    #do_eval = True,
    do_train = True,
    do_predict = True
)

In [13]:
#optimizers=[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR]
#[torch.optim.Adam(params=model.parameters(), lr=args.learning_rate), None]
# Batch collator built from the tokenizer and the model.
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

# Trainer wiring: model, arguments, tokenized train/eval splits, collator and ROUGE metrics.
trainer = Seq2SeqTrainer(
    model=model,
    #optimizers=optimizers,
    args=train_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_eval,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# add EarlyStoppingCallback
#from transformers import EarlyStoppingCallback
#trainer.add_callback(EarlyStoppingCallback)

In [None]:
# azureml-mlflow param overflow
# Swap the stock MLflow callback for the variant that does not log the model config.
# Remove first, then add: remove_callback() given a class drops the first callback
# that is an instance of it, and MLflowNoParamCallback subclasses MLflowCallback.
# Removing after adding would therefore discard the new callback whenever the stock
# one is not registered.
trainer.remove_callback(MLflowCallback)
trainer.add_callback(MLflowNoParamCallback)
#trainer.remove_callback(AzureMLCallback)

In [23]:
# for dev testing
# End the currently active MLflow run (used between repeated dev runs of the notebook).
#torch.cuda.empty_cache()
mlflow.end_run()

In [14]:
# evaluate before fine-tuning
# Baseline metrics on the trainer's eval dataset: generation is capped at
# max_target_length tokens, 8 beams are used, and metric keys get the "eval" prefix.
trainer.evaluate(
    max_length = max_target_length, # cap on generated length (was data_args.val_max_target_length)
    metric_key_prefix = "eval",
    num_beams = 8 # beam count (was data_args.num_beams)
)

Attempted to log scalar metric eval_loss:
4.544429779052734
Attempted to log scalar metric eval_rouge1:
23.1517
Attempted to log scalar metric eval_rouge2:
7.2789
Attempted to log scalar metric eval_rougeL:
17.3721
Attempted to log scalar metric eval_rougeLsum:
18.0478
Attempted to log scalar metric eval_sum_len:
42.25
Attempted to log scalar metric eval_runtime:
125.9204
Attempted to log scalar metric eval_samples_per_second:
0.032


{'eval_loss': 4.544429779052734,
 'eval_rouge1': 23.1517,
 'eval_rouge2': 7.2789,
 'eval_rougeL': 17.3721,
 'eval_rougeLsum': 18.0478,
 'eval_sum_len': 42.25,
 'eval_runtime': 125.9204,
 'eval_samples_per_second': 0.032,
 'init_mem_cpu_alloc_delta': 8192,
 'init_mem_cpu_peaked_delta': 0,
 'eval_mem_cpu_alloc_delta': 139485184,
 'eval_mem_cpu_peaked_delta': 321056768}

In [16]:
# fine-tune
# Run training with the arguments and datasets the trainer was constructed with;
# the returned TrainOutput is displayed as the cell output.
trainer.train()

Trainer is attempting to log a value of "{'summarization_aeslc': {'length_penalty': 0.6, 'max_length': 32, 'max_position_embeddings': 512}, 'summarization_arxiv': {'length_penalty': 0.8, 'max_length': 256, 'max_position_embeddings': 1024}, 'summarization_big_patent': {'length_penalty': 0.7, 'max_length': 256, 'max_position_embeddings': 1024}, 'summarization_billsum': {'length_penalty': 0.6, 'max_length': 256, 'max_position_embeddings': 1024}, 'summarization_cnn_dailymail': {'length_penalty': 0.8, 'max_length': 128, 'max_position_embeddings': 1024}, 'summarization_gigaword': {'length_penalty': 0.6, 'max_length': 32, 'max_position_embeddings': 128}, 'summarization_large': {'length_penalty': 0.8, 'max_length': 256, 'max_position_embeddings': 1024}, 'summarization_multi_news': {'length_penalty': 0.8, 'max_length': 256, 'max_position_embeddings': 1024}, 'summarization_newsroom': {'length_penalty': 0.8, 'max_length': 128, 'max_position_embeddings': 512}, 'summarization_pubmed': {'length_pena

Epoch,Training Loss,Validation Loss,Rouge1,Rouge2,Rougel,Rougelsum,Sum Len
1,0.0741,4.351368,22.8346,6.9813,16.8227,18.1157,47.0


Attempted to log scalar metric loss:
0.0741
Attempted to log scalar metric learning_rate:
0.0
Attempted to log scalar metric epoch:
1.0
Attempted to log scalar metric eval_loss:
4.351367950439453
Attempted to log scalar metric eval_rouge1:
22.8346
Attempted to log scalar metric eval_rouge2:
6.9813
Attempted to log scalar metric eval_rougeL:
16.8227
Attempted to log scalar metric eval_rougeLsum:
18.1157
Attempted to log scalar metric eval_sum_len:
47.0
Attempted to log scalar metric eval_runtime:
554.8311
Attempted to log scalar metric eval_samples_per_second:
0.007
Attempted to log scalar metric epoch:
1.0
Attempted to log scalar metric train_runtime:
649.8689
Attempted to log scalar metric train_samples_per_second:
0.002
Attempted to log scalar metric total_flos:
0
Attempted to log scalar metric epoch:
1.0


TrainOutput(global_step=1, training_loss=0.07407309859991074, metrics={'train_runtime': 649.8689, 'train_samples_per_second': 0.002, 'total_flos': 0, 'epoch': 1.0, 'init_mem_cpu_alloc_delta': 0, 'init_mem_cpu_peaked_delta': 0, 'train_mem_cpu_alloc_delta': 4480581632, 'train_mem_cpu_peaked_delta': 4798541824})

In [None]:
# simple dev test
# Generate predictions for the tokenized test subset with 8 beams, capping the
# generated length at max_target_length; the result is kept in `results`.
results = trainer.predict(
    tokenized_test,
    max_length = max_target_length,
    num_beams = 8)

In [None]:
# eval
if train_args.do_eval:
    #logger.info("*** Evaluate ***")

    # Evaluate with generation capped at max_target_length and 8 beams.
    metrics = trainer.evaluate(
        metric_key_prefix="eval",
        max_length=max_target_length,  #data_args.val_max_target_length
        num_beams=8,  #data_args.num_beams
    )
    # Record how many evaluation examples were scored.
    metrics.update(eval_samples=len(tokenized_eval))
    #max_eval_samples = data_args.max_eval_samples if data_args.max_eval_samples is not None else len(eval_dataset)
    #metrics["eval_samples"] = min(max_eval_samples, len(eval_dataset))

    # Report the metrics, then persist them.
    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)

In [None]:
# train wip
if train_args.do_train:
    # `last_checkpoint` was never defined in this notebook (NameError whenever
    # resume_from_checkpoint is None). Detect it here: only look for an existing
    # checkpoint when the output directory exists and is not going to be overwritten.
    last_checkpoint = None
    if os.path.isdir(train_args.output_dir) and not train_args.overwrite_output_dir:
        from transformers.trainer_utils import get_last_checkpoint

        last_checkpoint = get_last_checkpoint(train_args.output_dir)

    # Resume priority: explicit resume_from_checkpoint, then the detected checkpoint,
    # otherwise start from scratch (checkpoint stays None).
    checkpoint = None
    if train_args.resume_from_checkpoint is not None:
        checkpoint = train_args.resume_from_checkpoint
    elif last_checkpoint is not None:
        checkpoint = last_checkpoint
    train_result = trainer.train(resume_from_checkpoint=checkpoint)
    trainer.save_model()

    metrics = train_result.metrics
    metrics["train_samples"] = len(tokenized_train)
    #max_train_samples = data_args.max_train_samples if data_args.max_train_samples is not None else len(train_dataset)
    #metrics["train_samples"] = min(max_train_samples, len(train_dataset))

    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()

In [22]:
# inference summary generation
if train_args.do_predict:
    #logger.info("*** Predict ***")

    # Generate summaries for the test subset (8 beams, length capped at max_target_length).
    results = trainer.predict(
        tokenized_test,
        metric_key_prefix = "test",
        max_length = max_target_length, #data_args.val_max_target_length
        num_beams = 8 #data_args.num_beams
    )
    metrics = results.metrics
    metrics["test_samples"] = len(tokenized_test)
    #max_predict_samples = data_args.max_predict_samples if data_args.max_predict_samples is not None else len(tokenized_test)
    #metrics["predict_samples"] = min(max_predict_samples, len(tokenized_test))

    trainer.log_metrics("test", metrics)
    trainer.save_metrics("test", metrics)

    # output text summaries (main process only, and only when predictions are generated token ids)
    if trainer.is_world_process_zero():
        if train_args.predict_with_generate:
            generated_ids = results.predictions
            # Predictions may arrive as a tuple; the token ids are the first element.
            if isinstance(generated_ids, tuple):
                generated_ids = generated_ids[0]
            # -100 is a padding marker, not a token id; replace it before decoding.
            generated_ids = np.where(generated_ids != -100, generated_ids, tokenizer.pad_token_id)
            summary_texts = tokenizer.batch_decode(
                generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True # docs
            )
            summary_texts = [text.strip() for text in summary_texts] # necessary.?
            os.makedirs(train_args.output_dir, exist_ok=True)
            summary_file = os.path.join(train_args.output_dir, "generated_summaries.txt")
            # Explicit UTF-8: summaries can contain non-ASCII characters and the platform
            # default encoding is not guaranteed to represent them.
            with open(summary_file, "w", encoding="utf-8") as writer:
                writer.write("\n".join(summary_texts))

In [None]:
# track runtime
#start_time = time.time()
# action
#print(f"action took {(time.time() - start_time) / 60:.2f}mn")

mlflow.exceptions.RestException: BAD_REQUEST: Response: {'Error': {'Code': 'UserError', 'Severity': None, 'Message': 'A field of the entity is over the size limit. FieldName=Parameters, Limit=100, Size=148. See https://aka.ms/azure-machine-learning-limits for service limits documentation.', 'MessageFormat': None, 'MessageParameters': None, 'ReferenceCode': None, 'DetailsUri': None, 'Target': None, 'Details': [], 'InnerError': None, 'DebugInfo': None}, 'Correlation': {'operation': '97dc438dde54304987b7615e3ff523fd', 'request': '17d9cfdbe83ad14a'}, 'Environment': 'eastus', 'Location': 'eastus', 'Time': '2021-04-30T16:16:38.6563505+00:00', 'ComponentName': 'mlflow', 'error_code': 'BAD_REQUEST'}

In [None]:
#!az ml job create --file hf-train.yml

# https://github.com/google-research/pegasus/blob/939830367bcf411193d2b5eca2f2f90f3f9260ca/pegasus/params/public_params.py
# adafactor optimizer blocked (pegasus, t5)
# deepspeed & fp16 blocked (pegasus)
#sshleifer/distill-pegasus-xsum-16-4
#sshleifer/distill-pegasus-xsum-16-8
#sshleifer/distill-pegasus-xsum-12-12
#google/pegasus-xsum