In [12]:
import sys
sys.path.append("/home/jovyan/TF_NEW/tf-transformers/src/")


import tensorflow as tf
import tqdm
from tf_transformers.models import BertModel
from tf_transformers.core import Trainer

In [75]:
# Configure the run: pretrained checkpoint name and the distribution strategy.
# The model itself is instantiated in a later cell.

model_name = 'bert-base-cased'
num_gpus = 1
# Pass the variable instead of repeating the literal so the two cannot drift apart.
trainer = Trainer(distribution_strategy="mirrored",
                  num_gpus=num_gpus)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


In [76]:
# Define Model
def get_model():
    """Return a `BertModel` built from the pretrained checkpoint named by the global `model_name`."""
    return BertModel.from_pretrained(model_name)
# Instantiate the model inside the trainer's distribution-strategy scope.
with trainer.distribution_strategy.scope():
    model = get_model()

You are using a model of type bert to instantiate a model of type . This is not supported for all configurations of models and can yield errors.
INFO:absl:Successful: Model checkpoints matched and loaded from /tmp/tf_transformers_cache/bert-base-cased/ckpt-1


In [None]:
# 1. mirrored Strategy (normal batch and distributed batch)
# 2. mirrored Strategy (uneven batch)
# 3. mirrored Strategy (less batch)
# 4. off Strategy
# 5. one device Strategy

In [77]:
### Synthetic benchmark inputs: 10001 examples.
### 10001 is not a multiple of batch_size, so when batching with
### drop_remainder=False the last batch is smaller than the others.

num_examples = 10001
batch_size = 64
sequence_length = 128

# Fixed op-level seed so a fresh-kernel re-run feeds the model the same token ids.
input_ids = tf.random.uniform(
    shape=(num_examples, sequence_length),
    minval=0,
    maxval=3000,
    dtype=tf.int32,
    seed=0,
)
# Every position is attended to (mask of ones) and belongs to segment 0.
input_mask = tf.ones_like(input_ids)
input_type_ids = tf.zeros_like(input_ids)

# Feature dict; the keys are the names the model is called with downstream.
ds = {"input_ids": input_ids, "input_mask": input_mask, "input_type_ids": input_type_ids}

In [78]:
# Plain batched dataset; the final partial batch is kept.
dataset = (
    tf.data.Dataset.from_tensor_slices(ds)
    .batch(batch_size, drop_remainder=False)
)
# The same dataset, wrapped by the trainer's distribution strategy.
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)

### Test num_gpus=1 on dataset and dataset distributed

In [79]:
# Iterate the plain (non-distributed) dataset, calling the model directly on each batch.
for batch_inputs in tqdm.tqdm(dataset):
    outputs = model(batch_inputs)

100%|██████████| 157/157 [00:36<00:00,  4.26it/s]


In [53]:
# Iterate the distributed dataset, still calling the model directly on each element.
for batch_inputs in tqdm.tqdm(dataset_distributed):
    outputs = model(batch_inputs)

79it [00:18,  4.25it/s]


### Test num_gpus=2 on dataset and dataset distributed

In [80]:
# Rebuild the trainer for two GPUs. `num_gpus` is updated as well so the
# variable keeps matching the strategy that is actually in use.
num_gpus = 2
trainer = Trainer(distribution_strategy="mirrored",
                  num_gpus=num_gpus)
# Define Model
def get_model():
    """Return a `BertModel` built from the pretrained checkpoint named by the global `model_name`."""
    return BertModel.from_pretrained(model_name)
# Re-instantiate the model inside the scope of the trainer created just above.
with trainer.distribution_strategy.scope():
    model = get_model()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
You are using a model of type bert to instantiate a model of type . This is not supported for all configurations of models and can yield errors.
INFO:absl:Successful: Model checkpoints matched and loaded from /tmp/tf_transformers_cache/bert-base-cased/ckpt-1


In [81]:
# Rebuild both datasets so the distributed one is bound to the current trainer's strategy.
dataset = (
    tf.data.Dataset.from_tensor_slices(ds)
    .batch(batch_size, drop_remainder=False)
)
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)

In [56]:
# Dataset iter (Single GPU usage only): the model is called directly, not via strategy.run.
for batch_inputs in tqdm.tqdm(dataset):
    outputs = model(batch_inputs)

100%|██████████| 79/79 [00:18<00:00,  4.28it/s]


In [82]:
# Dataset iter

@tf.function
def step_call(inputs):
    """Forward `inputs` through the global `model` and return its outputs."""
    return model(inputs)

def unwrap_prereplica(output_dict):
    """Merge the per-replica results of `strategy.run` into one value per key.

    Args:
        output_dict: Mapping from output name to the value `strategy.run`
            returned for that output. A value is either a
            `tf.distribute.DistributedValues` (one component per replica) or a
            plain tensor, which is what a single-replica run hands back.

    Returns:
        A new dict with the same keys. Distributed values are concatenated
        along axis 0 (the row axis) in replica order; any other value is
        passed through unchanged.
    """
    result = {}
    for key, value in output_dict.items():
        if isinstance(value, tf.distribute.DistributedValues):
            # `.values` holds the per-replica components; concatenate over the row axis.
            result[key] = tf.concat(value.values, axis=0)
        else:
            # Nothing to merge: the value is not split across replicas.
            result[key] = value
    return result

# Global batch of `batch_size`, run through strategy.run and merged back per key.
dataset = (
    tf.data.Dataset.from_tensor_slices(ds)
    .batch(batch_size, drop_remainder=False)
)
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)
for batch_inputs in tqdm.tqdm(dataset_distributed):
    outputs = trainer.distribution_strategy.run(
        step_call,
        args=(batch_inputs,),
    )
    outputs = unwrap_prereplica(outputs)
    

157it [00:46,  3.35it/s]


In [83]:
# Dataset iter (Double batch size)

@tf.function
def step_call(inputs):
    """Forward `inputs` through the global `model` and return its outputs."""
    return model(inputs)

def unwrap_prereplica(output_dict):
    """Merge the per-replica results of `strategy.run` into one value per key.

    Args:
        output_dict: Mapping from output name to the value `strategy.run`
            returned for that output. A value is either a
            `tf.distribute.DistributedValues` (one component per replica) or a
            plain tensor, which is what a single-replica run hands back.

    Returns:
        A new dict with the same keys. Distributed values are concatenated
        along axis 0 (the row axis) in replica order; any other value is
        passed through unchanged.
    """
    result = {}
    for key, value in output_dict.items():
        if isinstance(value, tf.distribute.DistributedValues):
            # `.values` holds the per-replica components; concatenate over the row axis.
            result[key] = tf.concat(value.values, axis=0)
        else:
            # Nothing to merge: the value is not split across replicas.
            result[key] = value
    return result

# Global batch is 2 * batch_size for the 2-GPU strategy; outputs are merged per key.
dataset = (
    tf.data.Dataset.from_tensor_slices(ds)
    .batch(2 * batch_size, drop_remainder=False)
)
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)
for batch_inputs in tqdm.tqdm(dataset_distributed):
    outputs = trainer.distribution_strategy.run(
        step_call,
        args=(batch_inputs,),
    )
    outputs = unwrap_prereplica(outputs)
    

79it [00:57,  1.37it/s]


In [85]:
# Dataset iter (Double batch size)

@tf.function
def step_call(inputs):
    """Forward `inputs` through the global `model` and return its outputs."""
    return model(inputs)

def unwrap_prereplica(output_dict):
    """Merge the per-replica results of `strategy.run` into one value per key.

    Args:
        output_dict: Mapping from output name to the value `strategy.run`
            returned for that output. A value is either a
            `tf.distribute.DistributedValues` (one component per replica) or a
            plain tensor, which is what a single-replica run hands back.

    Returns:
        A new dict with the same keys. Distributed values are concatenated
        along axis 0 (the row axis) in replica order; any other value is
        passed through unchanged.
    """
    result = {}
    for key, value in output_dict.items():
        if isinstance(value, tf.distribute.DistributedValues):
            # `.values` holds the per-replica components; concatenate over the row axis.
            result[key] = tf.concat(value.values, axis=0)
        else:
            # Nothing to merge: the value is not split across replicas.
            result[key] = value
    return result

# Global batch is 2 * batch_size for the 2-GPU strategy.
dataset = (
    tf.data.Dataset.from_tensor_slices(ds)
    .batch(2 * batch_size, drop_remainder=False)
)
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)
for batch_inputs in tqdm.tqdm(dataset_distributed):
    outputs = trainer.distribution_strategy.run(
        step_call,
        args=(batch_inputs,),
    )
    # The per-replica merge is switched off in this variant:
    #outputs = unwrap_prereplica(outputs)
    

79it [00:38,  2.05it/s]


In [84]:
# Dataset iter (Triple batch size)

@tf.function
def step_call(inputs):
    """Forward `inputs` through the global `model` and return its outputs."""
    return model(inputs)

def unwrap_prereplica(output_dict):
    """Merge the per-replica results of `strategy.run` into one value per key.

    Args:
        output_dict: Mapping from output name to the value `strategy.run`
            returned for that output. A value is either a
            `tf.distribute.DistributedValues` (one component per replica) or a
            plain tensor, which is what a single-replica run hands back.

    Returns:
        A new dict with the same keys. Distributed values are concatenated
        along axis 0 (the row axis) in replica order; any other value is
        passed through unchanged.
    """
    result = {}
    for key, value in output_dict.items():
        if isinstance(value, tf.distribute.DistributedValues):
            # `.values` holds the per-replica components; concatenate over the row axis.
            result[key] = tf.concat(value.values, axis=0)
        else:
            # Nothing to merge: the value is not split across replicas.
            result[key] = value
    return result

# Triple batch size: the global batch here is 3 * batch_size.
dataset = (
    tf.data.Dataset.from_tensor_slices(ds)
    .batch(3 * batch_size, drop_remainder=False)
)
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)
for batch_inputs in tqdm.tqdm(dataset_distributed):
    outputs = trainer.distribution_strategy.run(
        step_call,
        args=(batch_inputs,),
    )
    # The per-replica merge is switched off in this variant:
    #outputs = unwrap_prereplica(outputs)
    

53it [00:40,  1.30it/s]


In [94]:
# Export the in-memory model to a freshly created temporary directory.
# `saved_model_dir` is kept as a global; it is what tf.saved_model.load reads later.
# NOTE(review): nothing in the visible cells removes the directory created by
# mkdtemp(), and `shutil` is imported but unused here - presumably meant for
# cleanup (shutil.rmtree) once the reloaded model is no longer needed; confirm.

import tempfile
import shutil

saved_model_dir = tempfile.mkdtemp()
model.save_as_serialize_module(saved_model_dir, overwrite=True)



INFO:tensorflow:Assets written to: /tmp/tmp6xq2r3jr/assets


INFO:tensorflow:Assets written to: /tmp/tmp6xq2r3jr/assets


In [92]:
# Load saved model inside strategy
# Drop the notebook's reference to the in-memory model first; the inference
# below goes through the reloaded SavedModel signature instead.
# NOTE(review): this makes the cell non-idempotent - running it a second time
# raises NameError because `model` no longer exists.
del model

@tf.function
def step_call_inference(inputs):
    """Run the loaded SavedModel signature on one batch of features.

    Looks up `input_ids`, `input_mask` and `input_type_ids` in `inputs` and
    forwards them as keyword arguments to the global `inference_func`,
    returning whatever that call returns.
    """
    feature_names = ('input_ids', 'input_mask', 'input_type_ids')
    return inference_func(**{name: inputs[name] for name in feature_names})

# Reload the exported SavedModel inside the strategy scope and keep a handle on
# its 'serving_default' signature; step_call_inference calls it via this global.
with trainer.distribution_strategy.scope():
    loaded = tf.saved_model.load(saved_model_dir)
    inference_func = loaded.signatures['serving_default']
    
# `dataset` is not rebuilt here: it is whichever dataset the last executed cell left behind.
dataset_distributed = trainer.distribution_strategy.experimental_distribute_dataset(
    dataset
)
for batch_inputs in tqdm.tqdm(dataset_distributed):
    outputs = trainer.distribution_strategy.run(
        step_call_inference,
        args=(batch_inputs,),
    )