# Iceberg Classification Step 2: Distributed Model Training
The following code demonstrates how to:
- get data from the ``feature store``
- train with ``TFRecord`` files
- run distributed training

In [1]:
import tensorflow as tf
# Print the TensorFlow version seen by this session; the cells below rely on
# TF 1.x names such as tf.gfile and tf.parse_single_example.
print("Version of TensorFlow is {}".format(tf.__version__))

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
72,application_1574692443370_0076,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
Version of TensorFlow is 1.14.0

In [2]:
from hops import featurestore
from hops import experiment
from hops import tensorboard

In [3]:
def create_tf_dataset_train():
    """
    Build the training input pipeline from the TFRecords training dataset
    stored in the feature store.

    Reads the module-level settings SHUFFLE_BUFFER_SIZE, BATCH_SIZE and
    NUM_EPOCHS at call time.

    Returns:
        A tf.data dataset of (x, y) batches: x is each example's three bands
        stacked and reshaped to [75, 75, 3]; y is the "is_iceberg" value cast
        to float32 and wrapped in a one-element list.
    """
    training_dataset_name = "train_tfrecords_iceberg_classification_dataset"
    band_keys = ["band_1", "band_2", "band_avg"]
    label_key = "is_iceberg"

    # Locate the TFRecord part files belonging to the training dataset
    records_dir = featurestore.get_training_dataset_path(training_dataset_name)
    record_files = tf.gfile.Glob(records_dir + "/part-r-*")
    records = tf.data.TFRecordDataset(record_files)

    # The schema tells tf.parse_single_example how to parse one serialized record
    record_schema = featurestore.get_training_dataset_tf_record_schema(training_dataset_name)

    def decode(serialized_example):
        parsed = tf.parse_single_example(serialized_example, record_schema)
        bands = [parsed[band_keys[0]], parsed[band_keys[1]], parsed[band_keys[2]]]
        # Stack the bands along axis 1, then reshape the result to 75 x 75 x 3
        image = tf.reshape(tf.stack(bands, axis=1), [75, 75, 3])
        label = [tf.cast(parsed[label_key], tf.float32)]
        return image, label

    # Parse -> shuffle -> batch -> repeat, in that order
    parsed_records = records.map(decode)
    shuffled_records = parsed_records.shuffle(SHUFFLE_BUFFER_SIZE)
    batched_records = shuffled_records.batch(BATCH_SIZE)
    return batched_records.repeat(NUM_EPOCHS)

In [4]:
def create_tf_dataset_test():
    """
    Build the evaluation input pipeline from the TFRecords test dataset
    stored in the feature store.

    The records are parsed and batched only. They are deliberately neither
    shuffled nor repeated: evaluation needs a single ordered pass over the
    data, so filling a shuffle buffer and re-reading the same records for
    several epochs is wasted work and makes the evaluated subset
    non-deterministic when the number of evaluation steps is capped.

    Reads the module-level setting BATCH_SIZE at call time.

    Returns:
        A tf.data dataset of (x, y) batches: x is each example's three bands
        stacked and reshaped to [75, 75, 3]; y is the "is_iceberg" value cast
        to float32 and wrapped in a one-element list.
    """
    tfrecord_path = "test_tfrecords_iceberg_classification_dataset"
    name_list = ["band_1", "band_2", "band_avg", "is_iceberg"]
    dataset_dir = featurestore.get_training_dataset_path(tfrecord_path)
    input_files = tf.gfile.Glob(dataset_dir + "/part-r-*")
    dataset = tf.data.TFRecordDataset(input_files)
    # 'tf_record_schema' is needed to parse a single example out of the TFRecords
    tf_record_schema = featurestore.get_training_dataset_tf_record_schema(tfrecord_path)

    def decode(example_proto):
        example = tf.parse_single_example(example_proto, tf_record_schema)
        # Stack the three bands along axis 1, then reshape the result to 75 x 75 x 3
        x = tf.stack([example[name_list[0]], example[name_list[1]], example[name_list[2]]], axis=1)
        x = tf.reshape(x, [75, 75, 3])
        y = [tf.cast(example[name_list[3]], tf.float32)]
        return x, y

    # One ordered pass over the test data: no shuffle, no repeat
    return dataset.map(decode).batch(BATCH_SIZE)

In [5]:
def create_model():
    """
    Assemble the (uncompiled) Keras CNN used for iceberg classification.

    Architecture, in order:
    - four stages of Conv2D(3x3, relu) -> MaxPooling2D(2x2, stride 2) -> Dropout(0.2)
      with 64, 128, 128 and 64 filters;
    - Flatten;
    - two stages of Dense -> relu Activation -> Dropout(0.2) with 512 and 256 units;
    - Dense(1) followed by a sigmoid Activation.

    The first convolution takes its input shape from the module-level INPUT_SHAPE.

    Returns:
        The tf.keras Sequential model.
    """
    layers = tf.keras.layers
    dropout_rate = 0.2
    model = tf.keras.models.Sequential()

    # Convolutional stages; only the very first layer declares the input shape
    for stage_index, filters in enumerate((64, 128, 128, 64)):
        conv_kwargs = {"input_shape": INPUT_SHAPE} if stage_index == 0 else {}
        model.add(layers.Conv2D(filters, kernel_size=(3, 3), activation='relu', **conv_kwargs))
        model.add(layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        model.add(layers.Dropout(dropout_rate))

    # Flatten the feature maps for the dense layers that follow
    model.add(layers.Flatten())

    # Fully connected stages
    for units in (512, 256):
        model.add(layers.Dense(units))
        model.add(layers.Activation('relu'))
        model.add(layers.Dropout(dropout_rate))

    # Single-unit output with a sigmoid activation
    model.add(layers.Dense(1))
    model.add(layers.Activation('sigmoid'))
    return model

In [6]:
def train_fn(learning_rate):
    """
    Train and evaluate the iceberg classifier as a TensorFlow Estimator.

    Steps:

    1. Build the Keras model
    2. Compile it (Adam optimizer, binary cross-entropy loss, accuracy metric)
    3. Convert it to a TF Estimator whose training uses MultiWorkerMirroredStrategy
    4. Run tf.estimator.train_and_evaluate on the train and test TFRecord datasets

    Args:
        learning_rate: learning rate handed to the Adam optimizer.

    Returns:
        None. The Estimator uses tensorboard.logdir() as its model_dir.
    """
    # Put Keras into training mode before the model is built
    tf.keras.backend.set_learning_phase(True)

    # 1. Build the model
    print("Defning the model")
    model = create_model()
    print("Defining the model complete")

    # 2. Compile the model
    print("Compiling the model")
    adam = tf.train.AdamOptimizer(learning_rate)
    model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])
    print("Compiling the model complete")

    # 3. Convert the Keras model to a TF Estimator.
    # Only training is given a distribution strategy; no eval_distribute is configured.
    # Alternative kept from the original draft: tf.contrib.distribute.MirroredStrategy()
    print("Convert keras model to a Tensorflow Estimator")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    run_config = tf.estimator.RunConfig(train_distribute=strategy)
    keras_estimator = tf.keras.estimator.model_to_estimator(
        keras_model=model, config=run_config, model_dir=tensorboard.logdir())
    print("Keras model to estimator conversion complete")

    # 4. Train on the training dataset and evaluate on the test dataset
    print("Starting training...")
    train_spec = tf.estimator.TrainSpec(input_fn=create_tf_dataset_train)
    eval_spec = tf.estimator.EvalSpec(input_fn=create_tf_dataset_test)
    tf.estimator.train_and_evaluate(keras_estimator, train_spec=train_spec, eval_spec=eval_spec)
    print("Training complete")
    
#     # 5. Evalute model on validation dataset
#     print("Evaluating model on validation dataset")
#     eval_results = keras_estimator.evaluate(lambda: create_tf_dataset(VAL_DATASET, SHUFFLE_BUFFER_SIZE, BATCH_SIZE, NUM_EPOCHS))    
#     val_top1acc = str(eval_results["accuracy"])
#     val_top3acc = str(eval_results["top3_acc"])
#     val_top5acc = str(eval_results["top5_acc"])
#     validation_results = {
#         "top1_acc": val_top1acc,
#         "val_top3_acc": val_top3acc,
#         "val_top5_acc": val_top5acc
#     }
#     print("Evaluation complete")
    
#     # 6. Save validation results to HopsFS
#     print("Saving validation results to HopsFS..")
#     val_results_path = hdfs.project_path() + "Resources/" + VALIDATION_RESULTS_FILE 
#     hdfs.dump(json.dumps(validation_results), val_results_path)
#     print("Saving validation results complete")

## Launch the experiment

In [8]:
# Input pipeline settings, read by the dataset functions when they are called
NUM_EPOCHS = 2
BATCH_SIZE = 32
SHUFFLE_BUFFER_SIZE = 10000
# Shape of a single input example, used by the first layer of the model
INPUT_SHAPE = (75, 75, 3)
# Learning rate; the arguments dictionary maps each argument name to a list of values
LEARNING_RATE = 0.001
args_d = {"learning_rate": [LEARNING_RATE]}

In [9]:
# Run train_fn via hops' experiment.launch, passing the learning rate(s) listed in args_d.
# The experiment name/description previously referred to TinyImageNet/ResNet (copied from
# another notebook); they now describe what this notebook trains.
# NOTE(review): train_fn builds a MultiWorkerMirroredStrategy -- confirm that
# experiment.launch provisions more than one worker; otherwise training is not
# actually distributed across workers.
experiment_result_path = experiment.launch(
    train_fn,
    args_dict=args_d,
    name='iceberg_classification_distributed_training',
    description="Training Iceberg Classification Using Distributed Training",
    local_logdir=True
)

Finished Experiment

# The END!