In [None]:
# default_exp trainer

In [29]:
# NOTE(review): presumably re-exports the notebook's `#export` cells into the
# package named by `default_exp` and reloads the resulting modules — confirm
# against gcp_runner.core. `silent=True` is passed through unchanged.
import gcp_runner.core
gcp_runner.core.export_and_reload_all(silent=True)

In [30]:
#export
from criteo_nbdev.constants import *
import datetime
import tensorflow as tf

class TrainTimeCallback(tf.keras.callbacks.Callback):
    """Keras callback that logs how long each training epoch took.

    At the end of every epoch it logs the elapsed wall-clock time. When
    ``self.params`` carries a non-zero ``'steps'`` entry it additionally logs
    the average milliseconds per step and, if ``BATCH_SIZE`` is set and
    non-zero, the average microseconds per example.
    """

    def on_epoch_begin(self, epoch, logs=None):
        """Remember the wall-clock time at which the epoch started."""
        self.epoch_start_time = datetime.datetime.now()

    def on_epoch_end(self, epoch, logs=None):
        """Log the epoch duration and the derived per-step/per-example timings."""
        # Sample the clock exactly once so that the epoch time and the
        # per-step / per-example figures are all derived from the same
        # measurement (logging itself takes time).
        elapsed = datetime.datetime.now() - self.epoch_start_time
        logging.info('\nepoch train time: (hh:mm:ss.ms) {}'.format(elapsed))

        steps = self.params.get('steps') if self.params is not None else None
        if not steps:
            # No step count available (or zero steps): nothing to average over.
            return

        epoch_milliseconds = elapsed.total_seconds() * 1000
        logging.info('{} ms/step'.format(epoch_milliseconds / steps))

        # Truthiness check skips both None and 0, avoiding a ZeroDivisionError.
        if BATCH_SIZE:
            logging.info('{} microseconds/example'.format(
                1000 * epoch_milliseconds / steps / BATCH_SIZE))

In [31]:
#export
from criteo_nbdev.constants import *
import tensorflow as tf


def create_categorical_feature_column_with_hash_bucket(corpus_dict, key):
    """Create a string-typed hash-bucket categorical column for ``key``.

    The number of hash buckets equals the number of entries in
    ``corpus_dict[key]``, capped at 100000.

    Args:
        corpus_dict: mapping from feature key to a sized collection of values.
        key: feature name; also used as the column key.

    Returns:
        The column built by
        ``tf.feature_column.categorical_column_with_hash_bucket``.
    """
    bucket_count = min(len(corpus_dict[key]), 100000)
    column = tf.feature_column.categorical_column_with_hash_bucket(
        key, hash_bucket_size=bucket_count, dtype=tf.dtypes.string)
    logging.info('categorical column %s hash_bucket_size %d',
                 key, bucket_count)
    return column


def create_categorical_feature_column_with_vocabulary_list(corpus_dict, key):
    """Create a string-typed vocabulary-list categorical column for ``key``.

    The vocabulary is the list of keys of ``corpus_dict[key]``; the number of
    out-of-vocabulary buckets is set to the size of that vocabulary.

    Args:
        corpus_dict: mapping from feature key to a mapping whose keys are the
            vocabulary entries.
        key: feature name; also used as the column key.

    Returns:
        The column built by
        ``tf.feature_column.categorical_column_with_vocabulary_list``.
    """
    vocabulary = corpus_dict[key]
    corpus_size = len(vocabulary)
    column = tf.feature_column.categorical_column_with_vocabulary_list(
        key,
        vocabulary_list=[*vocabulary.keys()],
        dtype=tf.dtypes.string,
        num_oov_buckets=corpus_size)
    logging.info(
        'categorical column with vocabular %s corpus_size %d', key, corpus_size)
    return column

def create_embedding(vocabulary_size_dict, key, categorical_feature_column):
    """Wrap a categorical column as an indicator or an embedding column.

    Vocabularies smaller than 10 become an indicator column; larger ones
    become an embedding column whose dimension is
    ``floor(6 * vocabulary_size ** 0.25)``, capped at 50.

    Args:
        vocabulary_size_dict: mapping from feature key to vocabulary size.
        key: feature name used to look up the vocabulary size.
        categorical_feature_column: the categorical column to wrap.

    Returns:
        An indicator column or an embedding column over
        ``categorical_feature_column``.
    """
    size = vocabulary_size_dict[key]

    if size < 10:
        logging.info(
            'categorical column %s vocabulary_size %d - creating indicator column', key, size)
        return tf.feature_column.indicator_column(categorical_feature_column)

    uncapped_dimension = math.floor(6 * size**0.25)
    dimension = int(min(50, uncapped_dimension))
    return tf.feature_column.embedding_column(
        categorical_feature_column, dimension=dimension)

def create_linear_feature_columns():
    """Build float32 numeric columns for the integer fields of ``CSV_SCHEMA``.

    Every schema field whose ``field_type`` is ``'INTEGER'`` gets a numeric
    column, except the field named ``'label'``.

    Returns:
        A list of ``tf.feature_column.numeric_column`` objects, in schema order.
    """
    numeric_columns = []
    for field in CSV_SCHEMA:
        if field.field_type != 'INTEGER' or field.name == 'label':
            continue
        numeric_columns.append(
            tf.feature_column.numeric_column(field.name, dtype=tf.dtypes.float32))
    return numeric_columns

def create_categorical_embeddings_feature_columns(corpus_dict, vocabulary_size_dict, embeddings_mode: EMBEDDINGS_MODE_TYPE):
    """Build the embedded categorical feature columns for every corpus key.

    Args:
        corpus_dict: mapping from feature key to that feature's corpus; one
            column is produced per key.
        vocabulary_size_dict: mapping from feature key to vocabulary size,
            passed on to ``create_embedding``.
        embeddings_mode: how the underlying categorical columns are built:
            ``none`` yields no columns, ``hashbucket`` uses hash buckets and
            ``vocabular`` uses an explicit vocabulary list.

    Returns:
        A list of feature columns (empty for ``EMBEDDINGS_MODE_TYPE.none``).

    Raises:
        ValueError: if ``embeddings_mode`` is not one of the handled modes.
    """
    if embeddings_mode == EMBEDDINGS_MODE_TYPE.none:
        return []

    if embeddings_mode == EMBEDDINGS_MODE_TYPE.hashbucket:
        create_categorical_column = create_categorical_feature_column_with_hash_bucket
    elif embeddings_mode == EMBEDDINGS_MODE_TYPE.vocabular:
        create_categorical_column = create_categorical_feature_column_with_vocabulary_list
    else:
        # The original formatted the undefined name `embedding_mode` here,
        # which raised NameError instead of the intended ValueError.
        raise ValueError('invalid embedding_mode: {}'.format(embeddings_mode))

    return [
        create_embedding(
            vocabulary_size_dict,
            key,
            create_categorical_column(corpus_dict, key))
        for key in corpus_dict
    ]


In [32]:
#export
import criteo_nbdev.data_reader

def create_feature_columns(embedding_mode: EMBEDDINGS_MODE_TYPE):
    """Assemble all model feature columns: numeric first, then categorical.

    The corpus and vocabulary-size dictionaries are read from
    ``criteo_nbdev.data_reader``.

    Args:
        embedding_mode: mode forwarded to
            ``create_categorical_embeddings_feature_columns``.

    Returns:
        A new list holding the linear (numeric) columns followed by the
        categorical embedding columns.
    """
    corpus = criteo_nbdev.data_reader.get_corpus_dict()
    vocabulary_sizes = criteo_nbdev.data_reader.get_vocabulary_size_dict()

    linear_columns = create_linear_feature_columns()
    categorical_columns = create_categorical_embeddings_feature_columns(
        corpus, vocabulary_sizes, embedding_mode)
    return [*linear_columns, *categorical_columns]

In [33]:
#export

from criteo_nbdev.constants import *
import tensorflow as tf

def create_keras_model_sequential():
    """Build and compile the sequential Keras model.

    The model consists of a ``DenseFeatures`` layer over the hash-bucket
    feature columns, a 0.3 dropout, three 598-unit ReLU dense layers and a
    single sigmoid output unit. It is compiled with SGD (learning rate 0.05),
    binary cross-entropy loss and the ``accuracy`` metric.

    Returns:
        The compiled ``tf.keras.Sequential`` model.
    """
    feature_columns = create_feature_columns(EMBEDDINGS_MODE_TYPE.hashbucket)

    layers = tf.keras.layers
    model = tf.keras.Sequential(
        [
            layers.DenseFeatures(feature_columns, name="feature_layer"),
            layers.Dropout(0.3),
            layers.Dense(598, activation=tf.nn.relu),
            layers.Dense(598, activation=tf.nn.relu),
            layers.Dense(598, activation=tf.nn.relu),
            layers.Dense(1, activation=tf.nn.sigmoid),
        ])

    logging.info('compiling sequential keras model')
    # Compile Keras model
    model.compile(
        # cannot use Adagrad with MirroredStrategy https://github.com/tensorflow/tensorflow/issues/19551
        # Alternatives that were tried and left disabled:
        #   tf.optimizers.Adagrad(learning_rate=0.05)
        #   tf.optimizers.Adam(learning_rate=0.0005)
        #   tf.optimizers.Adam()
        #   tfa.optimizers.AdamW(learning_rate=0.001, weight_decay=0.1)
        optimizer=tf.optimizers.SGD(learning_rate=0.05),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=['accuracy'])
    return model

In [34]:
#export
from criteo_nbdev.constants import *
from gcp_runner.ai_platform_constants import *
import criteo_nbdev.data_reader
import nbdev.imports
import tensorflow as tf
import logging
import math
import os

def train_and_evaluate_keras_model(
    model,
    model_dir,
    epochs,
    dataset_source: DATASET_SOURCE_TYPE,
    dataset_size: DATASET_SIZE_TYPE,
    embeddings_mode: EMBEDDINGS_MODE_TYPE,
    distribution_strategy: DistributionStrategyType):
    """Train ``model`` and then evaluate it on the validation dataset.

    TensorBoard logs are written to ``<model_dir>/logs`` and checkpoints to
    ``<model_dir>/checkpoints``.

    Args:
        model: compiled Keras model to fit and evaluate.
        model_dir: base directory for logs and checkpoints.
        epochs: number of training epochs; the datasets are repeated this
            many times.
        dataset_source: forwarded to ``criteo_nbdev.data_reader.get_dataset``.
        dataset_size: forwarded to ``get_dataset`` and ``get_steps_per_epoch``.
        embeddings_mode: forwarded to ``get_dataset``; also influences the
            checkpoint file name.
        distribution_strategy: strategy the model runs under; used here only
            to pick a checkpoint file name that strategy supports.

    Returns:
        A dict ``{'accuracy': ..., 'loss': ...}`` with the evaluation results.
    """
    log_dir = os.path.join(model_dir, "logs")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir,
        histogram_freq=1,
        embeddings_freq=1,
        profile_batch=min(epochs, 2))

    checkpoints_dir = os.path.join(model_dir, "checkpoints")
    # crashing https://github.com/tensorflow/tensorflow/issues/27688
    # exist_ok avoids the check-then-create race when several processes
    # start with the same model_dir.
    os.makedirs(checkpoints_dir, exist_ok=True)

    # Compare the *argument*, not the enum class itself: the original tested
    # `DistributionStrategyType == DistributionStrategyType.TPU_STRATEGY`,
    # which is always False and made the TPU branch unreachable.
    if distribution_strategy == DistributionStrategyType.TPU_STRATEGY:
        # epoch and accuracy constants are not supported when training on TPU.
        checkpoint_file_name = "epochs_tpu.hdf5"
    elif (embeddings_mode == EMBEDDINGS_MODE_TYPE.manual
          or distribution_strategy == DistributionStrategyType.MULTI_WORKER_MIRRORED_STRATEGY):
        # accuracy fails for adagrad
        # for some reason accuracy is not available for EMBEDDINGS_MODE_TYPE.manual
        # for some reason accuracy is not available for MultiWorkerMirroredStrategy
        checkpoint_file_name = "epochs:{epoch:03d}.hdf5"
    else:
        checkpoint_file_name = "epochs:{epoch:03d}-accuracy:{accuracy:.3f}.hdf5"
    checkpoints_file_path = checkpoints_dir + "/" + checkpoint_file_name

    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        checkpoints_file_path, verbose=1, mode='max')
    callbacks = [tensorboard_callback, checkpoint_callback, TrainTimeCallback()]

    verbosity = 1 if nbdev.imports.in_ipython() else 2
    logging.info('training keras model')
    training_ds = criteo_nbdev.data_reader.get_dataset(
        dataset_source, dataset_size, DATASET_TYPE.training, embeddings_mode).repeat(epochs)
    eval_ds = criteo_nbdev.data_reader.get_dataset(
        dataset_source, dataset_size, DATASET_TYPE.validation, embeddings_mode).repeat(epochs)

    training_steps = criteo_nbdev.data_reader.get_steps_per_epoch(
        dataset_size, DATASET_TYPE.training)
    validation_steps = criteo_nbdev.data_reader.get_steps_per_epoch(
        dataset_size, DATASET_TYPE.validation)

    # steps_per_epoch and validation_steps are required for MultiWorkerMirroredStrategy
    model.fit(
        training_ds,
        epochs=epochs,
        verbose=verbosity,
        callbacks=callbacks,
        steps_per_epoch=training_steps,
        validation_data=eval_ds,
        validation_steps=validation_steps)

    logging.info("done training keras model, evaluating model")
    loss, accuracy = model.evaluate(
        eval_ds,
        verbose=verbosity,
        steps=validation_steps,
        callbacks=[tensorboard_callback])
    logging.info("Eval - Loss: {}, Accuracy: {}".format(loss, accuracy))
    # model.summary() prints and returns None, so logging its return value
    # only logged "None". Route each summary line through the logger instead;
    # **_ tolerates Keras versions that pass extra keyword arguments.
    model.summary(print_fn=lambda line, **_: logging.info(line))
    logging.info("done evaluating keras model")
    return {'accuracy': accuracy, 'loss': loss}


In [None]:
# NOTE(review): `train_and_evaluate_keras_model_small` is defined in a later
# cell of this notebook, so on a fresh kernel this cell raises NameError unless
# that cell has been run first.
import gcp_runner.local_runner
gcp_runner.local_runner.run_python(train_and_evaluate_keras_model_small, python_binary='python3')

In [1]:
# Set the root logger's level to INFO so that `logging.info(...)` records are
# not filtered out by the logger level.
import logging
logging.getLogger().setLevel(logging.INFO)

In [2]:
#export

# NOTE(review): this cell is tagged `#export`, so this call is presumably
# copied into the exported module as well and would then run on import —
# confirm that is intended.
import gcp_runner.core
gcp_runner.core.export_and_reload_all(silent=True)

# def train_keras_sequential(strategy, model_dir):
#     return train_and_evaluate_keras_model(create_keras_model_sequential(), model_dir)

# train_keras_sequential(None, './models/model1')

def train_and_evaluate_keras_model_small(distribution_strategy=None, **kwargs):
    """Entry-point stub that echoes the arguments it was invoked with.

    Prints the ``distribution_strategy`` value and the dict of any additional
    keyword arguments, each preceded by a label line. Returns ``None``.

    Args:
        distribution_strategy: value to echo; defaults to ``None``.
        **kwargs: any further keyword arguments, echoed as a dict.
    """
    labelled_values = (
        ('distribution_strategy:', distribution_strategy),
        ('kwargs:', kwargs),
    )
    for label, value in labelled_values:
        print(label)
        print(value)
    # The real training call is currently disabled:
    # train_and_evaluate_keras_model(create_keras_model_sequential(), './models/model1', 2, DATASET_SOURCE_TYPE.bq, DATASET_SIZE_TYPE.full, EMBEDDINGS_MODE_TYPE.hashbucket, None)
    
#train_and_evaluate_keras_model_small()



In [3]:
# Smoke check of argument binding: 'distribution_strategy' binds to the named
# parameter, while 'other_arg' is collected into **kwargs.
train_and_evaluate_keras_model_small(**{'distribution_strategy':'d', 'other_arg': 'o'})

distribution_strategy:
d
kwargs:
{'other_arg': 'o'}


In [4]:
import gcp_runner.core
from gcp_runner.ai_platform_constants import DistributionStrategyType


# Display the command-line argument list gcp_runner builds for invoking the
# function with these keyword arguments (see the recorded output below).
gcp_runner.core.get_run_python_args(train_and_evaluate_keras_model_small, distribution_strategy=DistributionStrategyType.ONE_DEVICE_STRATEGY, arg_1='arg_1_value')

['python',
 '-u',
 '-m',
 'gcp_runner.entry_point',
 '--module-name=criteo_nbdev.trainer',
 '--function-name=train_and_evaluate_keras_model_small',
 '--distribution-strategy=tf.distribute.OneDeviceStrategy',
 '--arg-1=arg_1_value']

In [5]:
# Build the Docker image from ./Dockerfile and run the function through
# gcp_runner's local Docker runner.
# NOTE: relies on `DistributionStrategyType` imported in an earlier cell and on
# `train_and_evaluate_keras_model_small` being defined in the kernel.
import gcp_runner.local_runner
gcp_runner.local_runner.run_docker(
    train_and_evaluate_keras_model_small,
    'gcr.io/alekseyv-scalableai-dev/criteo-nbdev',
    build_docker_file='./Dockerfile', distribution_strategy=DistributionStrategyType.ONE_DEVICE_STRATEGY, arg_1='arg_1_value')

Building Docker image:
docker build -f ./Dockerfile -t gcr.io/alekseyv-scalableai-dev/criteo-nbdev ./
Sending build context to Docker daemon  2.922MB
Step 1/12 : FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-1
 ---> 4f3009408e35
Step 2/12 : WORKDIR /root
 ---> Using cache
 ---> ffd9474da319
Step 3/12 : ENV PROJECT_ID=alekseyv-scalableai-dev
 ---> Using cache
 ---> e65292b3e3c8
Step 4/12 : ENV GOOGLE_APPLICATION_CREDENTIALS=/root/service_account_key.json
 ---> Using cache
 ---> 41bad1b2743d
Step 5/12 : COPY service_account_key.json /root/
 ---> Using cache
 ---> cf7f34d90229
Step 6/12 : RUN pip install nbdev
 ---> Using cache
 ---> b1d87c2fcf23
Step 7/12 : ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache


 ---> 5269cc28acd2
Step 8/12 : RUN git clone https://github.com/vlasenkoalexey/gcp_runner
 ---> Running in 6d3c2a6ee67e
[91mCloning into 'gcp_runner'...
[0mRemoving intermediate container 6d3c2a6ee67e
 ---> 71cacad26f01
Step 9/12 : RUN pip install -e 

0

In [7]:
# Submit the function as an AI Platform training job packaged from the local
# source tree, using the given GCS job directory and region.
# NOTE: the recorded output of this cell shows the submission failing while
# packaging, due to an invalid `install_requires` entry in the package setup.
import gcp_runner.ai_platform_runner
gcp_runner.ai_platform_runner.run_package(
     train_and_evaluate_keras_model_small,
     'gs://alekseyv-scalableai-dev-criteo-model-bucket/test-job-dir',
     use_chief_in_tf_config=None,
     region='us-west1')

running training job using package on Google Cloud Platform AI:
gcloud ai-platform jobs submit training ai_platform_runner_train_package_20200331_183505 \ 
 --runtime-version=2.1 \ 
 --python-version=3.7 \ 
 --stream-logs \ 
 --module-name=criteo_nbdev.entry_point \ 
 --package-path=/Users/alekseyv/vlasenkoalexey/criteo_nbdev/criteo_nbdev \ 
 --scale-tier=basic \ 
 --region=us-west1 \ 
 --job-dir=gs://alekseyv-scalableai-dev-criteo-model-bucket/test-job-dir \ 
 -- \ 
 --job-dir=gs://alekseyv-scalableai-dev-criteo-model-bucket/test-job-dir \ 
 --module-name=criteo_nbdev.trainer \ 
 --function-name=train_and_evaluate_keras_model_small
[31mERROR: (gcloud.ai-platform.jobs.submit.training) Packaging of user Python code failed with message:[0m
[31m[0m
[31merror in criteo_nbdev setup command: 'install_requires' must be a string or list of strings containing valid project/version requirement specifiers; Invalid requirement, parse error at "',pandas='"[0m
[31m[0m
[31m[0m
[31mTry manu

1

In [None]:
# Build the Docker image from ./Dockerfile and hand it to gcp_runner's
# AI Platform runner as the master image, with the given GCS job directory
# and region.
import gcp_runner.ai_platform_runner
gcp_runner.ai_platform_runner.run_docker_image(
     train_and_evaluate_keras_model_small,
     'gs://alekseyv-scalableai-dev-criteo-model-bucket/test-job-dir',
     master_image_uri='gcr.io/alekseyv-scalableai-dev/criteo-nbdev',
     build_docker_file='./Dockerfile',
     region='us-west1')

Building Docker image:
docker build -f ./Dockerfile -t gcr.io/alekseyv-scalableai-dev/criteo-nbdev ./
Sending build context to Docker daemon  2.922MB
Step 1/12 : FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-1
 ---> 4f3009408e35
Step 2/12 : WORKDIR /root
 ---> Using cache
 ---> ffd9474da319
Step 3/12 : ENV PROJECT_ID=alekseyv-scalableai-dev
 ---> Using cache
 ---> e65292b3e3c8
Step 4/12 : ENV GOOGLE_APPLICATION_CREDENTIALS=/root/service_account_key.json
 ---> Using cache
 ---> 41bad1b2743d
Step 5/12 : COPY service_account_key.json /root/
 ---> Using cache
 ---> cf7f34d90229
Step 6/12 : RUN pip install nbdev
 ---> Using cache
 ---> b1d87c2fcf23
Step 7/12 : ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache


 ---> 90870c6dfd00
Step 8/12 : RUN git clone https://github.com/vlasenkoalexey/gcp_runner
 ---> Running in 12b3e07e4ab5
[91mCloning into 'gcp_runner'...
[0mRemoving intermediate container 12b3e07e4ab5
 ---> b157e9b59059
Step 9/12 : RUN pip install -e 