In [1]:
import os
import time
import torch
import random
import logging
import argparse

import numpy as np 
import pandas as pd
import pytorch_lightning as pl

from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

from transformers import (
    AdamW,
    AutoConfig,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
)

In [2]:
logging.basicConfig(level = logging.INFO)

In [3]:
logger = logging.getLogger(__name__)

### Load Dataset

In [4]:
train = pd.read_csv('train.csv').dropna()
test = pd.read_csv('test.csv')

train, val = train_test_split(train, test_size=0.13, random_state=42)

INFO:numexpr.utils:Note: NumExpr detected 24 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:NumExpr defaulting to 8 threads.


In [5]:
# Input
for a,b,_ in zip(train.sentiment.values[:10], train.text.values[:10], train.selected_text.values[:10]):
    print("sentiment:", a, "tweet:", b)

sentiment: negative tweet:  How did we just get paid and still be broke as hell?! No shopping spree for me today
sentiment: positive tweet: i no i no bt i had only been a gamer for like 2 years when i made that attempt  lol yea i luvd F1 to an extent 
sentiment: positive tweet: I love when my ipod shuffles so all the good songs are all together
sentiment: neutral tweet:  no i mean 2moz. I`m workin` 7-1 in a bakers then 6-4 later in a pub
sentiment: positive tweet: Lovely walk this morning with the missus; drizzle didn`t matter
sentiment: neutral tweet:  , just dont understand what`s it got to do with me. I`m just a nice girl
sentiment: negative tweet: getting bored of walking up and down the stairs
sentiment: positive tweet:  have your own style. it just might work.
sentiment: negative tweet: fighting with mum on mothers day
sentiment: neutral tweet:  & I got too much work to do


In [6]:
# Target
for _,_,c in zip(train.sentiment.values[:10], train.text.values[:10], train.selected_text.values[:10]):
    print(c)

broke as hell?!
luvd
love
no i mean 2moz. I`m workin` 7-1 in a bakers then 6-4 later in a pub
Lovely walk this morning with the missus; drizzle didn`t matter
, just dont understand what`s it got to do with me. I`m just a nice girl
getting bored of walking up and down the stairs
it just might work.
fighting
I got too much work to do


### Preprocess Dataset

In [7]:
# Append EOS token to target text, This is the standard format for T5 targets
train['selected_text'] = train['selected_text'] + ' </s>'
val['selected_text'] = val['selected_text'] + ' </s>'

# Apply Q&A structure
# From Appendix D in the T5 paper
processed_input_train = ("question: " + train.sentiment + " context: " + train.text)
processed_input_test = ("question: " + test.sentiment + " context: " + test.text)
processed_input_val = ("question: " + val.sentiment + " context: " + val.text)

# Save data as string separated by \n (new line)
processed_input_str_train = '\n'.join(processed_input_train.values.tolist())
processed_input_str_test = '\n'.join(processed_input_test.values.tolist())
selected_text_str_train = '\n'.join(train['selected_text'].values.tolist())
processed_input_str_val = '\n'.join(processed_input_val.values.tolist())
selected_text_str_val = '\n'.join(val['selected_text'].values.tolist())

In [8]:
processed_input_train[0], train['selected_text'][0]

('question: neutral context:  I`d have responded, if I were going',
 'I`d have responded, if I were going </s>')

In [9]:
processed_input_test[0]

'question: neutral context: Last session of the day  http://twitpic.com/67ezh'

In [10]:
with open('train.source', 'w') as f:
    f.write(processed_input_str_train)

with open('test.source', 'w') as f:
    f.write(processed_input_str_test)
    
with open('val.source', 'w') as f:
    f.write(processed_input_str_val)

In [11]:
with open('train.target', 'w') as f:
    f.write(selected_text_str_train)
    
with open('val.target', 'w') as f:
    f.write(selected_text_str_val)

### Prep the T5 Dataset

In [12]:
def encode_file(tokenizer, data_path, max_length, padding='max_length', return_tensors="pt"):
    """
    This function reads the text files that we prepared and returns them in tokenized form.

    Actually tokenizer.batch_encode_plus returns these as a list of dictionaries where 
    each dictionary contains the word piece indices among other relevant inputs for training & inference
    """
    examples = []
    with open(data_path, "r") as f:
        for text in f.readlines():
            tokenized = tokenizer.batch_encode_plus(
                [text], max_length=max_length, padding=padding, return_tensors=return_tensors,
            )
            examples.append(tokenized)
    return examples

In [13]:
class T5Dataset(Dataset):
    """
    This is the T5 dataset that can read our train, test, and dev files separately

    This was patterned after the SummarizationDataset from the `transformer` library's 
    summarization example (compatible with T5)
    """
    def __init__(
        self,
        tokenizer,
        data_dir="./",
        type_path="train",
        max_source_length=1024,
        max_target_length=56,
    ):
        super().__init__()
        # Store the tokenizer
        self.tokenizer = tokenizer
        self.type_path = type_path
        # Read the source and target files for the type of file (train, test, or val)
        self.source = encode_file(tokenizer, os.path.join(data_dir, type_path + ".source"), max_source_length)
        self.target = None
        if self.type_path != "test":
            self.target = encode_file(tokenizer, os.path.join(data_dir, type_path + ".target"), max_target_length)

    def __len__(self):
        return len(self.source)

    def __getitem__(self, index):
        # Return example as a dictionary containing source_ids, src_mask, and target_ids
        source_ids = self.source[index]["input_ids"].squeeze() # (1024,)
        # We need masks for transformers to:
        # 1) ignore padding for both the encoder and decoder stages (src_mask)
        # 2) ignore future tokens at the decoder stage
        src_mask = self.source[index]["attention_mask"].squeeze()

        if self.type_path == "test":
            return {"source_ids": source_ids, "source_mask": src_mask}

        target_ids = self.target[index]["input_ids"].squeeze() # (56, )
        return {"source_ids": source_ids, "source_mask": src_mask, "target_ids": target_ids}

    def collate_fn(self, batch):
        """
        The tensors are stacked together as they are yielded.

        Collate function is applied to the output of a DataLoader as it is yielded.
        """
        input_ids = torch.stack([x["source_ids"] for x in batch]) # BS x SL
        masks = torch.stack([x["source_mask"] for x in batch]) # BS x SL
        pad_token_id = self.tokenizer.pad_token_id
        if self.type_path == "test":
            return {"source_ids": source_ids, "source_mask": source_mask}

        target_ids = torch.stack([x["target_ids"] for x in batch]) # BS x SL
        return {"source_ids": input_ids, "source_mask": masks, "target_ids": target_ids}

### Model

In [14]:
def set_seed(args: argparse.Namespace):
    """
    Set all the seeds to make results replicable
    """
    random.seed(args.seed)
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    if args.n_gpu > 0:
        torch.cuda.manual_seed_all(args.seed)

def jaccard(str1, str2): 
    a = set(str1.lower().split()) 
    b = set(str2.lower().split())
    c = a.intersection(b)
    return float(len(c)) / (len(a) + len(b) - len(c))

In [15]:
class T5Module(pl.LightningModule):
    """
    Base Transformer model that uses Pytorch Lightning as a PyTorch wrapper.

    T5 specific methods are implemented in T5Trainer
    """
    def __init__(self, hparams: argparse.Namespace, **config_kwargs):
        "Initialize a model."

        super().__init__()
        self.hparams = hparams
        cache_dir = self.hparams.cache_dir if self.hparams.cache_dir else None
        self.config = AutoConfig.from_pretrained(self.hparams.model_name_or_path)
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.hparams.model_name_or_path,
            cache_dir=cache_dir,
        )
        self.model = AutoModelForSeq2SeqLM.from_pretrained(
            self.hparams.model_name_or_path,
            from_tf=bool(".ckpt" in self.hparams.model_name_or_path), # Checkpoint is a TF format
            config=self.config,
            cache_dir=cache_dir,
        )

        # Save dataset params
        self.dataset_kwargs: dict = dict(
            data_dir=self.hparams.data_dir,
            max_source_length=self.hparams.max_source_length,
            max_target_length=self.hparams.max_target_length,
        )

    # Forward function
    # Defines the forward pass of the module

    def forward(
        self,
        input_ids, 
        attention_mask=None, 
        decoder_input_ids=None, 
        labels=None
        ):
        """
         loss (torch.FloatTensor of shape (1,), optional, returned when lm_label is provided
        """
        
        return self.model(
            input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            labels=labels,
        )

    # Data preparation

    def get_dataloader(self, type_path: str, batch_size: int, \
                       shuffle: bool = False, num_workers: int = 24)-> DataLoader:
        dataset = T5Dataset(self.tokenizer, type_path=type_path, **self.dataset_kwargs)
        dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=dataset.collate_fn, shuffle=shuffle,
                               num_workers = num_workers)
        return dataloader

    def train_dataloader(self) -> DataLoader:
        dataloader = self.get_dataloader("train", batch_size=self.hparams.train_batch_size, shuffle=True)
        t_total = (
            (len(dataloader.dataset) // (self.hparams.train_batch_size * max(1, self.hparams.n_gpu)))
            // self.hparams.gradient_accumulation_steps
            * float(self.hparams.num_train_epochs)
        )
        scheduler = get_linear_schedule_with_warmup(
            self.opt, num_warmup_steps=self.hparams.warmup_steps, num_training_steps=t_total
        )
        self.lr_scheduler = scheduler
        return dataloader

    def val_dataloader(self) -> DataLoader:
        return self.get_dataloader("val", batch_size=self.hparams.eval_batch_size)

    def test_dataloader(self) -> DataLoader:
        return self.get_dataloader("test", batch_size=self.hparams.eval_batch_size)

    # Configure optimizers

    def configure_optimizers(self):
        "Prepare optimizer and schedule (linear warmup and decay)"

        model = self.model
        # Weight decay explanation:
        # Weight decay will not be applied to "bias" and "LayerNorm.weight" parameters
        # When training neural networks, it is common to use "weight decay," where after each update,
        # the weights are multiplied by a factor slightly less than 1.
        # This prevents the weights from growing too large, and can be seen as gradient descent 
        # on a quadratic regularization term.
        # https://metacademy.org/graphs/concepts/weight_decay_neural_networks
        no_decay = ["bias", "LayerNorm.weight"]

        # Group parameters to those that will and will not have weight decay applied
        optimizer_grouped_parameters = [
            {
                "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
                "weight_decay": self.hparams.weight_decay,
            },
            {
                "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
        ]

        optimizer = AdamW(optimizer_grouped_parameters, lr=self.hparams.learning_rate,\
                          eps=self.hparams.adam_epsilon)
        self.opt = optimizer
        return [optimizer]

    # Forward pass and calculate loss per batch (step)

    def _step(self, batch, return_text=False):
        """
        Runs forward pass and calculates loss per batch. Applied for training_step, and validation_step
        """
        pad_token_id = self.tokenizer.pad_token_id
        source_ids, source_mask, y = batch["source_ids"], batch["source_mask"], batch["target_ids"]
        y_ids = y[:, :-1].contiguous()
        labels = y[:, 1:].clone()
        # Change pad_token_id to -100
        labels[y[:, 1:] == pad_token_id] = -100
        # Run forward pass and calculate loss
        outputs = self(source_ids, attention_mask=source_mask, decoder_input_ids=y_ids, labels=labels,)
        # Only get loss from the output since that's all we need to apply our optimizer
        loss = outputs[0]
        if return_text:
            target_text = [self.tokenizer.decode(ids) for ids in y_ids]
            return loss, target_text
        else:
            return loss

    # Step during training

    def training_step(self, batch, batch_idx):
        """
        Runs forward pass, calculates loss, and returns loss (and logs) in a dict
        """
        loss = self._step(batch)

        # Notice that each training step loss is recorded on tensorboard,
        # which makes sense since we're tracking loss per batch
        tensorboard_logs = {"train_loss": loss}
        return {"loss": loss, "log": tensorboard_logs}

    # Adjust weights based on calculated gradients and learning rate scheduler

    def optimizer_step(self, epoch, batch_idx, optimizer, optimizer_idx, second_order_closure=None):
        """
        Adjust weights based on calculated gradients + learning rate scheduler, and refresh gradients
        """
        optimizer.step()

        # Refresh gradients (to zero)
        optimizer.zero_grad()
        # Update the learning rate scheduler
        self.lr_scheduler.step()

    # Step during validation

    def validation_step(self, batch, batch_idx):
        """
        Runs forward pass, calculates loss, and returns loss in a dict
        """

        # Return source and target text to calculate jaccard score only for validation
        loss, target_text = self._step(batch, return_text=True)

        preds = self.test_step(batch, batch_idx)
        preds_text = preds["preds"]
        # Track jaccard score to get validation accuracy
        jaccard_score = [jaccard(p, t) for p, t in zip(preds_text, target_text)]

        return {"val_loss": loss, "jaccard_score": jaccard_score}

    # Show loss after validation

    def validation_end(self, outputs):
        """
        Calculate average loss for all the validation batches
        """
        avg_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        jaccard_scores = sum([x["jaccard_score"] for x in outputs], [])
        avg_jaccard_score = np.mean(jaccard_scores)
        tensorboard_logs = {"val_loss": avg_loss, "jaccard_score": avg_jaccard_score}
        return {"avg_val_loss": avg_loss, "avg_jaccard_score": avg_jaccard_score, "log": tensorboard_logs}

    # Step during testing

    def test_step(self, batch, batch_idx):
        """
        Runs forward pass on test set and returns calculated loss, predictions, and targets
        Note: this assumes that your test set has targets (doesn't have for kaggle).
        """
        pad_token_id = self.tokenizer.pad_token_id
        source_ids, source_mask = batch["source_ids"], batch["source_mask"]
        # NOTE: the following kwargs get more speed and lower quality summaries than those in evaluate_cnn.py
        # For the sentiment span extraction task, turning off early stopping proved superior
        generated_ids = self.model.generate(
            input_ids=source_ids,
            attention_mask=source_mask,
            num_beams=1,
            max_length=80,
            repetition_penalty=2.5,
            length_penalty=1.0,
            early_stopping=True,
            use_cache=True,
        )
        preds = [
            self.tokenizer.decode(g, skip_special_tokens=True, clean_up_tokenization_spaces=True)
            for g in generated_ids
        ]

        return {"preds": preds}

    # Note: we don't attempt to print the loss from the test set, 
    # because it's assumed that we don't have the test targets
    def test_end(self, outputs):
        """
        """
        preds = []
        for pred in outputs:
            preds += pred["preds"]
        return {"preds": preds}

    def test_epoch_end(self, outputs):
        """
        Save test predictions and targets as text files and return the calculated loss for the test set
        """
        output_test_predictions_file = os.path.join(self.hparams.output_dir, "test_predictions.txt")
        # write predictions and targets for later rouge evaluation.
        with open(output_test_predictions_file, "w+") as p_writer:
            for output_batch in outputs:
                p_writer.writelines(s + "\n" for s in output_batch["preds"])
            p_writer.close()

        return self.test_end(outputs)

    def get_tqdm_dict(self):
        """
        Print average loss and learning rate at each step
        """
        avg_loss = getattr(self.trainer, "avg_loss", 0.0)
        tqdm_dict = {"loss": "{:.3f}".format(avg_loss), "lr": self.lr_scheduler.get_last_lr()[-1]}
        return tqdm_dict

    def _feature_file(self, mode):
        return os.path.join(
            self.hparams.data_dir,
            "cached_{}_{}_{}".format(
                mode,
                list(filter(None, self.hparams.model_name_or_path.split("/"))).pop(),
                str(self.hparams.max_seq_length),
            ),
        )

    def is_logger(self):
        return True # for single GPU

    @staticmethod
    def add_model_specific_args(parser, root_dir):
        parser.add_argument(
            "--model_name_or_path",
            default=None,
            type=str,
            required=True,
            help="Path to pretrained model or model identifier from huggingface.co/models",
        )
        parser.add_argument(
            "--config_name", default="", type=str, help="Pretrained config \
            name or path if not the same as model_name"
        )
        parser.add_argument(
            "--tokenizer_name",
            default="",
            type=str,
            help="Pretrained tokenizer name or path if not the same as model_name",
        )
        parser.add_argument(
            "--cache_dir",
            default="",
            type=str,
            help="Where do you want to store the pre-trained models downloaded from s3",
        )
        parser.add_argument("--learning_rate", default=5e-5, type=float, help="The initial learning\
        rate for Adam.")
        parser.add_argument("--weight_decay", default=0.0, type=float, help="Weight decay if we apply some.")
        parser.add_argument("--adam_epsilon", default=1e-8, type=float, help="Epsilon for Adam optimizer.")
        parser.add_argument("--warmup_steps", default=0, type=int, help="Linear warmup over warmup_steps.")
        parser.add_argument(
            "--num_train_epochs", default=3, type=int, help="Total number of training epochs to perform."
        )

        parser.add_argument("--train_batch_size", default=32, type=int)
        parser.add_argument("--eval_batch_size", default=32, type=int)

        parser.add_argument(
            "--max_source_length",
            default=1024,
            type=int,
            help="The maximum total input sequence length after tokenization. Sequences longer "
            "than this will be truncated, sequences shorter will be padded.",
        )
        parser.add_argument(
            "--max_target_length",
            default=56,
            type=int,
            help="The maximum total input sequence length after tokenization. Sequences longer "
            "than this will be truncated, sequences shorter will be padded.",
        )

        parser.add_argument(
            "--data_dir",
            default=None,
            type=str,
            required=True,
            help="The input data dir. Should contain the dataset files for the text generation task.",
        )
        return parser


class LoggingCallback(pl.Callback):
    def on_validation_end(self, trainer: pl.Trainer, pl_module: pl.LightningModule):
        logger.info("***** Validation results *****")
        if pl_module.is_logger():
            metrics = trainer.callback_metrics
            # Log results
            for key in sorted(metrics):
                if key not in ["log", "progress_bar"]:
                    logger.info("{} = {}\n".format(key, str(metrics[key])))


def add_generic_args(parser, root_dir):
    parser.add_argument(
        "--output_dir",
        default=None,
        type=str,
        required=True,
        help="The output directory where the model predictions and checkpoints will be written.",
    )

    parser.add_argument(
        "--fp16",
        action="store_true",
        help="Whether to use 16-bit (mixed) precision (through NVIDIA apex) instead of 32-bit",
    )

    parser.add_argument(
        "--fp16_opt_level",
        type=str,
        default="O1",
        help="For fp16: Apex AMP optimization level selected in ['O0', 'O1', 'O2', and 'O3']."
        "See details at https://nvidia.github.io/apex/amp.html",
    )

    parser.add_argument("--n_gpu", type=int, default=1)
    parser.add_argument("--max_grad_norm", default=1.0, type=float, help="Max gradient norm.")
    parser.add_argument("--do_train", action="store_true", help="Whether to run training.")
    parser.add_argument("--do_predict", action="store_true", help="Whether to run predictions on the test set.")
    parser.add_argument(
        "--gradient_accumulation_steps",
        type=int,
        default=1,
        help="Number of updates steps to accumulate before performing a backward/update pass.",
    )

    parser.add_argument("--seed", type=int, default=42, help="random seed for initialization")


def generic_train(model: T5Module, args: argparse.Namespace):
    # init model
    set_seed(args)

    if os.path.exists(args.output_dir) and os.listdir(args.output_dir) and args.do_train:
        raise ValueError("Output directory ({}) already exists and is not empty.".format(args.output_dir))

    # Can take out checkpoint saving after each epoch to save memory
    checkpoint_callback = pl.callbacks.ModelCheckpoint(
        filepath=args.output_dir, prefix="checkpoint", monitor="val_loss", mode="min", save_top_k=5
    )

    train_params = dict(
        accumulate_grad_batches=args.gradient_accumulation_steps,
        gpus=args.n_gpu,
        max_epochs=args.num_train_epochs,
        gradient_clip_val=args.max_grad_norm,
        checkpoint_callback=checkpoint_callback,
        callbacks=[LoggingCallback()],
        enable_pl_optimizer=False
    )

    if args.fp16:
        train_params["use_amp"] = args.fp16
        train_params["amp_level"] = args.fp16_opt_level

    if args.n_gpu > 1:
        train_params["distributed_backend"] = "ddp"

    trainer = pl.Trainer(**train_params)

    if args.do_train:
        trainer.fit(model)

    return trainer

In [16]:
def main(args):

    # If output_dir not provided, a folder will be generated in pwd
    if not args.output_dir:
        args.output_dir = os.path.join("./results", f"{args.task}_{time.strftime('%Y%m%d_%H%M%S')}",)
        os.makedirs(args.output_dir)
    model = T5Module(args)
    trainer = generic_train(model, args)

    # Save the last model as model.bin
    #checkpoints = list(sorted(glob.glob(os.path.join(args.output_dir, "checkpointepoch=*.ckpt"), recursive=True)))
    #model = model.load_from_checkpoint(checkpoints[-1])
    model.model.save_pretrained(args.output_dir)
    # Save tokenizer files
    model.tokenizer.save_pretrained('./')
    
    # Optionally, predict on dev set and write to output_dir
    if args.do_predict:
        # See https://github.com/huggingface/transformers/issues/3159
        # pl use this format to create a checkpoint:
        # https://github.com/PyTorchLightning/pytorch-lightning/blob/master\
        # /pytorch_lightning/callbacks/model_checkpoint.py#L169
        trainer.test(model)
    return trainer

In [17]:
!mkdir output

mkdir: cannot create directory ‘output’: File exists


In [18]:
ARGS_STR = """
--data_dir=./ \
--model_name_or_path=t5-base \
--learning_rate=3e-5 \
--train_batch_size=32 \
--output_dir=output/ \
--do_train \
--n_gpu=1 \
--num_train_epochs 5 \
--max_source_length 80 \
"""
#
#--eval_batch_size=3 \
#--do_predict \

parser = argparse.ArgumentParser()
add_generic_args(parser, os.getcwd())
parser = T5Module.add_model_specific_args(parser, os.getcwd())
args = parser.parse_args(ARGS_STR.split())
trainer = main(args)

Some weights of the model checkpoint at t5-base were not used when initializing T5ForConditionalGeneration: ['decoder.block.0.layer.1.EncDecAttention.relative_attention_bias.weight']
- This IS expected if you are initializing T5ForConditionalGeneration from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing T5ForConditionalGeneration from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
GPU available: True, used: True
INFO:lightning:GPU available: True, used: True
TPU available: None, using: 0 TPU cores
INFO:lightning:TPU available: None, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type                       | Params
--------

HBox(children=(HTML(value='Validation sanity check'), FloatProgress(value=1.0, bar_style='info', layout=Layout…

INFO:__main__:***** Validation results *****


HBox(children=(HTML(value='Training'), FloatProgress(value=1.0, bar_style='info', layout=Layout(flex='2'), max…




TypeError: optimizer_step() got an unexpected keyword argument 'on_tpu'