In [1]:
import tensorflow as tf
# tf.debugging.set_log_device_placement(True)
import tensorflow_addons as tfa
import sys
import os
if os.path.abspath('../') not in sys.path:
    sys.path.append(os.path.abspath('../'))
if os.path.abspath('../../tt_keras') not in sys.path:
    sys.path.append(os.path.abspath('../../tt_keras'))
if os.path.abspath('../../tf2-gradient-checkpointing') not in sys.path:
    sys.path.append(os.path.abspath('../../tf2-gradient-checkpointing'))

if os.path.abspath('../../t3f') not in sys.path:
    sys.path.append(os.path.abspath('../../t3f'))

import automatic_speech_recognition as asr
from automatic_speech_recognition.utils import wrap_call_methods
import time
from datetime import datetime
import argparse
import pickle
from checkpointing import checkpointable
from functools import partial

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from tqdm.notebook import tqdm
from h5_to_tflite import TF_CUSTOM_OBJECTS

In [3]:
from tensorflow import keras
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.mixed_precision import experimental as mixed_precision

In [4]:
import horovod.tensorflow.keras as hvd

In [5]:
#%load_ext tensorboard
#%tensorboard --logdir=./models/ --port=32779

In [6]:
%pdb on

Automatic pdb calling has been turned ON


## Train

In [5]:
#os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [6]:
# Bring up Horovod before any rank/size query is made.
hvd.init()

# Enumerate the physical GPUs; for each one enable on-demand memory growth,
# then expose only the device indexed by this process's local rank
# (one GPU per process).
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

In [7]:
def get_pipeline(model, optimizer=None):
    """Assemble the CTC pipeline (alphabet, MFCC features, decoder) around ``model``.

    Args:
        model: Model object handed to ``asr.pipeline.CTCPipeline``.
        optimizer: Optimizer to train with. When ``None``, an Adam optimizer
            with learning rate 1e-3, beta_1=0.9 and beta_2=0.999 is created.

    Returns:
        The constructed ``asr.pipeline.CTCPipeline``.
    """
    alphabet = asr.text.Alphabet(lang='en')
    features_extractor = asr.features.MFCC(
        features_num=26,
        sample_rate=16000,
        standardize=None,
        winlen=0.032,
        winstep=0.02,
    )
    if optimizer is None:
        # `learning_rate` is the documented keyword; `lr` is only a deprecated alias.
        optimizer = tf.optimizers.Adam(learning_rate=1e-3, beta_1=0.9, beta_2=0.999)
    decoder = asr.decoder.GreedyDecoder()
    return asr.pipeline.CTCPipeline(
        alphabet, features_extractor, model, optimizer, decoder
    )

In [8]:
def tf_func_wrapper(layer):
    """Return ``layer.call`` wrapped by ``tf.function`` with relaxed shapes enabled."""
    original_call = layer.call
    compiled_call = tf.function(original_call, experimental_relax_shapes=True)
    return compiled_call

def checkpointing_wrapper(layer):
    """Return ``layer.call`` made checkpointable, with checkpointing switched on.

    The returned object is a ``functools.partial`` that pre-binds
    ``_checkpoint=True`` and ``_watch_vars`` (the layer's trainable variables
    at wrapping time) on top of ``checkpointable(layer.call)``.
    """
    bound_kwargs = {
        '_checkpoint': True,
        '_watch_vars': layer.trainable_variables,
    }
    return partial(checkpointable(layer.call), **bound_kwargs)

In [9]:
def train_model(filename, dataset_idx, val_dataset_idx=None, batch_size=10, epochs=25, tensorboard=False, restart_filename=None):
    """Load a saved Keras model and train it through the CTC pipeline with Horovod.

    Relies on the module-level Horovod handle (``hvd.size()``, ``hvd.rank()``).

    Args:
        filename: Path of the saved model given to ``keras.models.load_model``.
            Training artefacts are written to ``<dirname>/<basename>_train``,
            where ``<basename>`` is the file name up to its first dot.
        dataset_idx: CSV index of the training set.
        val_dataset_idx: Optional CSV index of the validation set. When falsy,
            ``dev_dataset=None`` is passed to ``pipeline.fit``.
        batch_size: Batch size used when building both datasets.
        epochs: Number of epochs passed to ``pipeline.fit``.
        tensorboard: When true, rank 0 additionally attaches a TensorBoard
            callback logging to ``<model_dir>/tb/<timestamp>``.
        restart_filename: Optional weights file loaded into the model before
            training starts.
    """
    basename = os.path.basename(filename).split('.')[0]
    model_dir = os.path.join(os.path.dirname(filename), basename + '_train')
    os.makedirs(model_dir, exist_ok=True)

    model = keras.models.load_model(filename, custom_objects=TF_CUSTOM_OBJECTS)
    # Wrap layers in tf func and checkpoints
    model = wrap_call_methods(model, tf_func_wrapper,
                              wrap_rnn_cells=True,
                              wrap_time_distributed_inner=True)
    # NOTE(review): these layer names are hard-coded; presumably they match the
    # specific model this notebook was developed against -- verify for other models.
    model = wrap_call_methods(model, checkpointing_wrapper,
                              names=['time_distributed_144', 'rnn_72', 'time_distributed_145'],
                              wrap_rnn_cells=True,
                              wrap_time_distributed_inner=True)

    if restart_filename:
        model.load_weights(restart_filename)
    dataset = asr.dataset.Audio.from_csv(dataset_idx, batch_size=batch_size, use_filesizes=True)
    dataset.sort_by_length()
    dataset.shuffle_indices()

    # Bind val_dataset unconditionally: it is always read by the fit() call
    # below, so leaving it unset when no validation index is given would raise
    # UnboundLocalError.
    val_dataset = None
    if val_dataset_idx:
        val_dataset = asr.dataset.Audio.from_csv(val_dataset_idx, batch_size=batch_size, use_filesizes=True)

    # Base learning rate is scaled by the number of Horovod workers.
    opt_instance = tf.optimizers.Adam(0.0001 * hvd.size(), beta_1=0.9, beta_2=0.999)
#     opt_instance = tfa.optimizers.NovoGrad(0.001, beta_1=0.95, beta_2=0.5, weight_decay=0.001)

    opt = hvd.DistributedOptimizer(opt_instance)
    pipeline = get_pipeline(model, opt)

    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        hvd.callbacks.MetricAverageCallback(),
    ]
    # NOTE(review): the schedule starts from 0.001 * hvd.size() whereas the
    # optimizer above is built with 0.0001 * hvd.size() -- confirm which base
    # rate is intended.
    schedule = tf.keras.experimental.CosineDecayRestarts(
        0.001 * hvd.size(), 10, t_mul=2.0, m_mul=1.0, alpha=0.0,
    )
    callbacks.append(LearningRateScheduler(schedule))
    # Checkpointing and TensorBoard logging are attached on rank 0 only.
    if hvd.rank() == 0:
        prefix = datetime.now().strftime("%Y%m%d-%H%M%S")
        monitor_metric_name = 'loss' # if not val_dataset_idx else 'val_loss'  # val_loss is wrong and broken
        callbacks.append(
            keras.callbacks.ModelCheckpoint(
                os.path.join(model_dir, prefix + '_best.ckpt'),
                monitor=monitor_metric_name, save_weights_only=True,
                save_best_only=True))
        if tensorboard:
            logdir = os.path.join(model_dir, 'tb', prefix)
            tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir, profile_batch=1)
            callbacks.append(tensorboard_callback)

    time_start = time.time()

    hist = pipeline.fit(dataset, epochs=epochs, dev_dataset=val_dataset,
                        callbacks=callbacks,
                        verbose=1 if hvd.rank() == 0 else 0,
                        validation_steps=10)
    elapsed = time.time() - time_start

    if hvd.rank() == 0:
        print(f'Elapsed time: {elapsed}')
        #np.save(os.path.join(model_dir, prefix + '_hist.p'), np.array(hist))

In [10]:
# Launch a training run with explicit settings. Note that the same index file
# (dev-clean) is passed as both the training set and the validation set.
# The output recorded for this cell ends in a GPU ResourceExhaustedError (OOM)
# during the first epoch.
train_model(
    filename='./models/tt_ds_uniform_initall_r14.h5', 
    dataset_idx='./dev-clean-index.csv',
    val_dataset_idx='./dev-clean-index.csv',
    batch_size=10,
    epochs=25,
    tensorboard=False,
    restart_filename=None,
)

Epoch 1/25
Instructions for updating:
Use ref() instead.
Instructions for updating:
Use ref() instead.
Instructions for updating:
Prefer tf.tensor_scatter_nd_add, which offers the same functionality with well-defined read-write semantics.
Instructions for updating:
Prefer tf.tensor_scatter_nd_update, which offers the same functionality with well-defined read-write semantics.
 47/270 [====>.........................] - ETA: 11:08 - loss: 443.6087

ResourceExhaustedError: OOM when allocating tensor with shape[10240,2,14] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc [Op:ZerosLike]

In [None]:
# Command-line entry point, intended for when this notebook is exported to a script.
# NOTE(review): inside a live notebook kernel `__name__` is also '__main__', so
# parse_args() would read the kernel's own argv -- confirm this cell is skipped there.
if __name__ == '__main__':
    def _str2bool(value):
        """Parse a command-line boolean ('true'/'false', 'yes'/'no', '1'/'0').

        ``type=bool`` is not usable with argparse: ``bool('False')`` is ``True``
        because any non-empty string is truthy.
        """
        lowered = str(value).strip().lower()
        if lowered in ('true', 't', 'yes', 'y', '1'):
            return True
        if lowered in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise argparse.ArgumentTypeError(f'expected a boolean value, got {value!r}')

    parser = argparse.ArgumentParser(
        description='Train ctc asr model')
    # Both paths are mandatory: train_model cannot run without them.
    parser.add_argument('--filename', type=str, required=True,
                        help='filename of the model')
    parser.add_argument('--dataset', type=str, required=True,
                        help='path to the dataset index',)
    parser.add_argument('--val_dataset', type=str,
                        help='path to the validation dataset index (optional)',
                        default=None)
    parser.add_argument('--batch_size', type=int,
                        help='batch size for training and validation',
                        default=10)
    parser.add_argument('--epochs', type=int,
                        help='number of epochs to use for training',
                        default=25)
    parser.add_argument('--tensorboard', type=_str2bool,
                        help='if tensorboard log will be written',
                        default=False)
    parser.add_argument('--restart_filename', type=str,
                        help='filename of the checkpoint to restart from',
                        default=None)
    args = parser.parse_args()

    # Pass everything by keyword so each option reaches the parameter it is
    # meant for (batch_size included).
    train_model(
        filename=args.filename,
        dataset_idx=args.dataset,
        val_dataset_idx=args.val_dataset,
        batch_size=args.batch_size,
        epochs=args.epochs,
        tensorboard=args.tensorboard,
        restart_filename=args.restart_filename,
    )

Training times (25 epochs, 10 batches per epoch):
 tt mixed - gpu - 9427s - batch = 1
 tt mixed - cpu - 29900s - batch = 1
 tt full  - cpu - 14050s - batch = 1

In [13]:
# Module-level values named after train_model's parameters. Nothing visible in
# this notebook reads them -- presumably leftovers for running train_model's
# body step by step; TODO confirm or remove.
filename='./models/ds.h5'
dataset_idx='./data/train-clean-100-index.csv'
val_dataset_idx='./data/dev-clean-index.csv'
epochs=25
tensorboard=True
restart_filename=None