# T5 TPU Tensorflow 

In [1]:
%tensorflow_version 2.x
!pip install transformers



### Imports

We'll only be importing the components that we'll use during this tutorial: the TensorFlow model alongside the model specific tokenizer. The last two imports will manage the pre-processing of our data.

In [2]:
import tensorflow as tf
print(tf.__version__)

import os
from transformers import ( 
    T5Tokenizer, 
    TFT5Model, 
    TFT5ForConditionalGeneration
)

2.3.0


In [3]:
BATCH_SIZE = 8

SHUFFEL_SIZE = 1024

learning_rate = 3e-5

model_size = "t5-base"

# Building the training system

## Strategy

We make use of TensorFlow's strategies, which handle the data distribution as well as the distributed training that happens on the devices available. In this example we'll be using a `MirroredStrategy` which can be used to train on a multiple GPUs in a distributed manner. 

In [4]:
tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)

INFO:absl:Entering into master device scope: /job:worker/replica:0/task:0/device:CPU:0


INFO:tensorflow:Initializing the TPU system: grpc://10.25.24.114:8470


INFO:tensorflow:Initializing the TPU system: grpc://10.25.24.114:8470


INFO:tensorflow:Clearing out eager caches


INFO:tensorflow:Clearing out eager caches


INFO:tensorflow:Finished initializing TPU system.


INFO:tensorflow:Finished initializing TPU system.


<tensorflow.python.tpu.topology.Topology at 0x7f5901a68400>

In [5]:
strategy = tf.distribute.experimental.TPUStrategy(tpu)
print("Number of accelerators: ", strategy.num_replicas_in_sync)



INFO:tensorflow:Found TPU system:


INFO:tensorflow:Found TPU system:


INFO:tensorflow:*** Num TPU Cores: 8


INFO:tensorflow:*** Num TPU Cores: 8


INFO:tensorflow:*** Num TPU Workers: 1


INFO:tensorflow:*** Num TPU Workers: 1


INFO:tensorflow:*** Num TPU Cores Per Worker: 8


INFO:tensorflow:*** Num TPU Cores Per Worker: 8


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)


INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)


Number of accelerators:  8


## Loading the Dataset with the strategy

Here we define a batch size for each replica. We set it to be a multiple of 8 to best leverage the systolic array as defined in the [Google TPU performance guide](https://cloud.google.com/tpu/docs/performance-guide#rule_of_thumb_pick_efficient_values_for_batch_and_feature_dimensions).

In [6]:
BATCH_SIZE_PER_REPLICA = 6
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 5

MAX_ARTICLE_LEN = 512
MAX_HIGHLIGHT_LEN = 150

In [7]:
GLOBAL_BATCH_SIZE

48

### Retrieving the TFRecord dataset

The TFRecord dataset is now entirely processed and ready to be used as input by our training loop. We load it, shuffle it and batch it.

In [8]:
tokenizer = T5Tokenizer.from_pretrained(model_size)
en_de_prefix = tf.reshape(tokenizer.encode("summarize: en_to_ger ", return_tensors="tf"), (-1,))
de_en_prefix = tf.reshape(tokenizer.encode("summarize: ger_to_en ", return_tensors="tf"), (-1,))
en_en_prefix = tf.reshape(tokenizer.encode("summarize: en_to_en ", return_tensors="tf"), (-1,))
de_de_prefix = tf.reshape(tokenizer.encode("summarize: ger_to_ger ", return_tensors="tf"), (-1,))

In [9]:
import numpy as np
bucket = "gs://tpu-bucket-cnn-daily-mail"

def get_tfrecord_dataset(drive_path, file_name):
    features = {
        'ger_x': tf.io.FixedLenFeature([MAX_ARTICLE_LEN-8], tf.int64),
        'ger_x_mask': tf.io.FixedLenFeature([MAX_ARTICLE_LEN-8], tf.int64),
        'ger_y': tf.io.FixedLenFeature([MAX_HIGHLIGHT_LEN], tf.int64),
        'ger_y_ids': tf.io.FixedLenFeature([MAX_HIGHLIGHT_LEN - 1], tf.int64),
        'ger_y_labels': tf.io.FixedLenFeature([MAX_HIGHLIGHT_LEN - 1], tf.int64),

        'en_x': tf.io.FixedLenFeature([MAX_ARTICLE_LEN-8], tf.int64),
        'en_x_mask': tf.io.FixedLenFeature([MAX_ARTICLE_LEN-8], tf.int64),
        'en_y': tf.io.FixedLenFeature([MAX_HIGHLIGHT_LEN], tf.int64),
        'en_y_ids': tf.io.FixedLenFeature([MAX_HIGHLIGHT_LEN - 1], tf.int64),
        'en_y_labels': tf.io.FixedLenFeature([MAX_HIGHLIGHT_LEN - 1], tf.int64),
    }

    dataset = tf.data.TFRecordDataset(f"{drive_path}/{file_name}.tfrecord")

    # Taken from the TensorFlow models repository: https://github.com/tensorflow/models/blob/befbe0f9fe02d6bc1efb1c462689d069dae23af1/official/nlp/bert/input_pipeline.py#L24
    def decode_record(record, features):
        """Decodes a record to a TensorFlow example."""
        example = tf.io.parse_single_example(record, features)

        # tf.Example only supports tf.int64, but the TPU only supports tf.int32.
        # So cast all int64 to int32.
        for name in list(example.keys()):
            t = example[name]
            if t.dtype == tf.int64:
                t = tf.cast(t, tf.int32)
            example[name] = t
        return example


    def select_data_from_record(record):
        i  = np.random.randint(4) 
        if i == 0:
            return tf.concat([de_de_prefix, record['ger_x']], axis=0), tf.concat([tf.ones(8, dtype=tf.int32), record['ger_x_mask']], axis=0), record['ger_y'], record['ger_y_ids'], record['ger_y_labels']
        elif i == 1:
            return tf.concat([en_de_prefix, record['en_x']], axis=0), tf.concat([tf.ones(8, dtype=tf.int32), record['en_x_mask']], axis=0), record['ger_y'], record['ger_y_ids'], record['ger_y_labels']
        elif i == 2:
            return tf.concat([de_en_prefix, record['ger_x']], axis=0), tf.concat([tf.ones(8, dtype=tf.int32), record['ger_x_mask']], axis=0), record['en_y'], record['en_y_ids'], record['en_y_labels']
        elif i == 3:
            return tf.concat([en_en_prefix, record['en_x']], axis=0), tf.concat([tf.ones(8, dtype=tf.int32), record['en_x_mask']], axis=0), record['en_y'], record['en_y_ids'], record['en_y_labels']
 
    dataset = dataset.map(lambda record: decode_record(record, features))
    dataset = dataset.map(select_data_from_record)
    dataset = dataset.shuffle(100)
    return dataset.batch(GLOBAL_BATCH_SIZE)

train_dataset = get_tfrecord_dataset(bucket, "corss_lingual_train_cnn_daily_mail")
train_dataset.prefetch(1024)

validation_dataset = get_tfrecord_dataset(bucket, "corss_lingual_val_cnn_daily_mail")
test_dataset = get_tfrecord_dataset(bucket, "corss_lingual_test_cnn_daily_mail")


In [10]:
from google.colab import auth
auth.authenticate_user()

There is an additional step here to distribute the dataset among the different TPU cores. We make use of a strategy method to do so.

Every item held in the dataset (which is a batched dataset) will now be split over the TPU workers. As the TPU we're using has 8 workers and our batch is of size 64, every example will be evenly split in batches of (64 / 8 =) 8 and distributed across workers.

In [11]:
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
validation_dist_dataset = strategy.experimental_distribute_dataset(validation_dataset)

## Model creation

We create a function that will instantiate a new model when called.

In [12]:
def model_fn():
    return TFT5ForConditionalGeneration.from_pretrained(model_size)

## Hyperparameters initialization

While in the strategy's scope, we define a sparse categorical crossentropy loss. We define a method `compute_loss` which will be called to compute the loss between the model's prediction and the expected result (or label).

In order to measure the accuracy during training and evaluation, we define two metrics which are both sparse categorical accuracy.

Finally, we initialize a model and create an optimizer object using the Adam optimizer.


In [13]:
with strategy.scope():
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction=tf.keras.losses.Reduction.NONE, from_logits=True)

    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

    test_loss_metric = tf.keras.metrics.Mean(name='test_loss')
    test_accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

    train_loss_metric = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
    train_accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy('training_accuracy')
    
    model = model_fn()
    optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5, epsilon=1e-08)


If your task is similar to the task the model of the ckeckpoint was trained on, you can already use TFT5ForConditionalGeneration for predictions without further training.


## Steps

We create two functions that will be called during the training and test steps. 

In [14]:
with strategy.scope():
    def train_step(inputs):
        input_ids, input_mask, y, y_ids, lm_labels = inputs

        with tf.GradientTape() as tape:
            predictions = model(input_ids, attention_mask=input_mask, decoder_input_ids=y_ids, lm_labels=lm_labels, training=True)[0]  # Gather only the outputs of the text-classification head
            loss = compute_loss(y[:, 1:], predictions)

        gradients = tape.gradient(loss, model.trainable_variables)

        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_loss_metric.update_state(loss)
        train_accuracy_metric.update_state(y[:, 1:], predictions)

    def test_step(inputs):
        input_ids, input_mask, y, y_ids, lm_labels = inputs

        predictions = model(input_ids, attention_mask=input_mask, decoder_input_ids=y_ids, lm_labels=lm_labels, training=False)[0]  # Gather only the outputs of the text-classification head
        t_loss = compute_loss(y[:, 1:], predictions)

        test_loss_metric.update_state(t_loss)
        test_accuracy_metric.update_state(y[:, 1:], predictions)

## Training & Evaluation

Finally, using all the previously defined attributes, we create two traced tf.function which will execute the training and test steps in a distributed manner. There is no need for them to return anything as the metrics will directly be updated in the steps described beforehand.

We loop over the number of epochs, training the model and evaluating it at the end of each epoch.

In [None]:
from tqdm import tqdm_notebook as tqdm

with strategy.scope():
    @tf.function
    def distributed_train_step(dataset):
        strategy.run(train_step, args=(dataset,))
 

    @tf.function
    def distributed_test_step(dataset):
        strategy.run(test_step, args=(dataset,))


    global_step = 0
    for epoch in range(EPOCHS):
        total_loss = 0.0
        training_steps = 10
        epoch_step = 0
        print_every = 1000

        ### Training loop ###
        for tensor in tqdm(train_dist_dataset, desc="Training"):
            distributed_train_step(tensor)  

            train_loss = train_loss_metric.result().numpy().astype(float)
            train_accuracy = train_accuracy_metric.result().numpy()

            global_step += 1
            epoch_step += 1

            if epoch_step % print_every == 0:
                print(f"Training step {epoch_step} Accuracy: {train_accuracy}, Training loss: {train_loss}")


        ### Test loop ###
        for tensor in tqdm(validation_dist_dataset, desc="Evaluating"):
            distributed_test_step(tensor)
            
        
        ### Output results ###
        test_accuracy = test_accuracy_metric.result().numpy()
        test_loss = test_loss_metric.result().numpy()
        print(f'Epoch: [{epoch}] Validation accuracy = {test_accuracy}')

        ### Reset metrics ###
        test_loss_metric.reset_states()
        train_accuracy_metric.reset_states()
        train_loss_metric.reset_states()
        test_accuracy_metric.reset_states()
        epoch_step = 0

        

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Training', max=1.0, style=ProgressStyle…

Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.


Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.


Training step 1000 Accuracy: 0.6952540278434753, Training loss: 31.111953735351562


In [None]:
ckpt_file = os.path.join(bucket, "checkpoint.ckpt")

In [None]:
model.save_weights(ckpt_file) 


In [None]:
model.load_weights(ckpt_file)