Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

Commit

Permalink
update notebooks for Experiment, Parallel Experiments and Distributed…
Browse files Browse the repository at this point in the history
… Training abstractions
  • Loading branch information
robzor92 committed Oct 4, 2018
1 parent 53ce61e commit 18f2a52
Show file tree
Hide file tree
Showing 38 changed files with 1,074 additions and 5,770 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Minimal mnist example on Hopsworks\n",
"---\n",
"\n",
"<font color='red'> <h3>Tested with TensorFlow 1.10</h3></font>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Hops Experiment paradigm <a class=\"anchor\" id='paradigm'></a>\n",
"\n",
"To be able to run your TensorFlow code on Hops, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function. If you wish to run gridsearch over a given set of hyperparameters, you can define arguments for this wrapper function that corresponds to the name of your hyperparameters.\n",
"\n",
"You can also submit one or more `.py`, `.zip` or `.egg` files that contain your code and import them in the wrapper function. To include files, navigate back to HopsWorks and restart restart Jupyter, you can then include files in the Jupyter configuration.\n",
"\n",
"## The `hops` python module\n",
"\n",
"Below you can see the aforementioned wrapper function, which is coincidently named `wrapper` but could potentially be named anything. You can see two imports from the `hops` module, a `tensorboard` and an `hdfs` module. These are the only two modules that you will need to use in your TensorFlow wrapper function. \n",
"\n",
"### Using the `tensorboard` module\n",
"The `tensorboard` module allow us to get the log directory for summaries and checkpoints to be written to the TensorBoard we will see in a bit. The only function that we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the content of this directory will be put in as a Dataset in your project in HopsFS after each hyperparameter configuration is finished. The `experiment.launch` function, that we will look at abit further down will return the exact path, which you can then navigate to using HopsWorks to inspect the files.\n",
"\n",
"The directory could in practice be used to store other data that should be accessible after each hyperparameter configuration is finished.\n",
"```python\n",
"# Use this module to get the TensorBoard logdir\n",
"from hops import tensorboard\n",
"tensorboard_logdir = tensorboard.logdir()\n",
"```\n",
"\n",
"\n",
"### Using the `hdfs` module\n",
"The `hdfs` module provides a single method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, which is created automatically for each project, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n",
"```python\n",
"# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n",
"from hops import hdfs\n",
"project_path = hdfs.project_path()\n",
"```\n",
"\n",
"![image11-Dataset-ProjectPath.png](../../images/datasets.png)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def allreduce_mnist():\n",
"\n",
" import tensorflow as tf\n",
" from hops import devices\n",
" from hops import tensorboard\n",
"\n",
" PREDICT = tf.estimator.ModeKeys.PREDICT\n",
" EVAL = tf.estimator.ModeKeys.EVAL\n",
" TRAIN = tf.estimator.ModeKeys.TRAIN\n",
" learning_rate=0.002\n",
" batch_size=128\n",
" training_steps=5000\n",
"\n",
" def build_estimator(config):\n",
" \"\"\"\n",
" Build the estimator based on the given config and params.\n",
" Args:\n",
" config (RunConfig): RunConfig object that defines how to run the Estimator.\n",
" params (object): hyper-parameters (can be argparse object).\n",
" \"\"\"\n",
" return tf.estimator.Estimator(\n",
" model_fn=model_fn,\n",
" config=config,\n",
" )\n",
"\n",
"\n",
" def model_fn(features, labels, mode):\n",
" \"\"\"Model function used in the estimator.\n",
" Args:\n",
" features (Tensor): Input features to the model.\n",
" labels (Tensor): Labels tensor for training and evaluation.\n",
" mode (ModeKeys): Specifies if training, evaluation or prediction.\n",
" params (object): hyper-parameters (can be argparse object).\n",
" Returns:\n",
" (EstimatorSpec): Model to be run by Estimator.\n",
" \"\"\"\n",
" \n",
" features = tf.cast(features, tf.float32)\n",
" # Define model's architecture\n",
" logits = architecture(features, mode)\n",
" class_predictions = tf.argmax(logits, axis=-1)\n",
" # Setup the estimator according to the phase (Train, eval, predict)\n",
" loss = None\n",
" train_op = None\n",
" eval_metric_ops = {}\n",
" predictions = class_predictions\n",
" # Loss will only be tracked during training or evaluation.\n",
" if mode in (TRAIN, EVAL):\n",
" loss = tf.losses.sparse_softmax_cross_entropy(\n",
" labels=tf.cast(labels, tf.int32),\n",
" logits=logits)\n",
" # Training operator only needed during training.\n",
" if mode == TRAIN:\n",
" train_op = get_train_op_fn(loss)\n",
" # Evaluation operator only needed during evaluation\n",
" if mode == EVAL:\n",
" eval_metric_ops = {\n",
" 'accuracy': tf.metrics.accuracy(\n",
" labels=labels,\n",
" predictions=class_predictions,\n",
" name='accuracy')\n",
" }\n",
" # Class predictions and probabilities only needed during inference.\n",
" if mode == PREDICT:\n",
" predictions = {\n",
" 'classes': class_predictions,\n",
" 'probabilities': tf.nn.softmax(logits, name='softmax_tensor')\n",
" }\n",
" return tf.estimator.EstimatorSpec(\n",
" mode=mode,\n",
" predictions=predictions,\n",
" loss=loss,\n",
" train_op=train_op,\n",
" eval_metric_ops=eval_metric_ops\n",
" )\n",
"\n",
"\n",
" def architecture(inputs, mode, scope='MnistConvNet'):\n",
" \"\"\"Return the output operation following the network architecture.\n",
" Args:\n",
" inputs (Tensor): Input Tensor\n",
" mode (ModeKeys): Runtime mode (train, eval, predict)\n",
" scope (str): Name of the scope of the architecture\n",
" Returns:\n",
" Logits output Op for the network.\n",
" \"\"\"\n",
" with tf.variable_scope(scope):\n",
" inputs = inputs / 255\n",
" input_layer = tf.reshape(inputs, [-1, 28, 28, 1])\n",
" conv1 = tf.layers.conv2d(\n",
" inputs=input_layer,\n",
" filters=20,\n",
" kernel_size=[5, 5],\n",
" padding='valid',\n",
" activation=tf.nn.relu)\n",
" pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)\n",
" conv2 = tf.layers.conv2d(\n",
" inputs=pool1,\n",
" filters=40,\n",
" kernel_size=[5, 5],\n",
" padding='valid',\n",
" activation=tf.nn.relu)\n",
" pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)\n",
" flatten = tf.reshape(pool2, [-1, 4 * 4 * 40])\n",
" dense1 = tf.layers.dense(inputs=flatten, units=256, activation=tf.nn.relu)\n",
" dropout = tf.layers.dropout(\n",
" inputs=dense1, rate=0.5, training=mode==tf.estimator.ModeKeys.TRAIN)\n",
" dense2 = tf.layers.dense(inputs=dropout, units=10)\n",
" return dense2\n",
"\n",
"\n",
" def get_train_op_fn(loss):\n",
" \"\"\"Get the training Op.\n",
" Args:\n",
" loss (Tensor): Scalar Tensor that represents the loss function.\n",
" params (object): Hyper-parameters (needs to have `learning_rate`)\n",
" Returns:\n",
" Training Op\n",
" \"\"\"\n",
" optimizer = tf.train.AdamOptimizer(learning_rate)\n",
" train_op = optimizer.minimize(\n",
" loss=loss,\n",
" global_step=tf.train.get_global_step())\n",
" return train_op\n",
"\n",
"\n",
" def get_train_inputs(batch_size, mnist_data):\n",
" \"\"\"Return the input function to get the training data.\n",
" Args:\n",
" batch_size (int): Batch size of training iterator that is returned\n",
" by the input function.\n",
" mnist_data ((array, array): Mnist training data as (inputs, labels).\n",
" Returns:\n",
" DataSet: A tensorflow DataSet object to represent the training input\n",
" pipeline.\n",
" \"\"\"\n",
" dataset = tf.data.Dataset.from_tensor_slices(mnist_data)\n",
" dataset = dataset.shuffle(\n",
" buffer_size=1000, reshuffle_each_iteration=True\n",
" ).repeat(count=None).batch(batch_size)\n",
" return dataset\n",
"\n",
"\n",
" def get_eval_inputs(batch_size, mnist_data):\n",
" \"\"\"Return the input function to get the validation data.\n",
" Args:\n",
" batch_size (int): Batch size of validation iterator that is returned\n",
" by the input function.\n",
" mnist_data ((array, array): Mnist test data as (inputs, labels).\n",
" Returns:\n",
" DataSet: A tensorflow DataSet object to represent the validation input\n",
" pipeline.\n",
" \"\"\"\n",
" dataset = tf.data.Dataset.from_tensor_slices(mnist_data)\n",
" dataset = dataset.batch(batch_size)\n",
" return dataset\n",
" \n",
"\n",
" # Read parameters and input data\n",
" mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()\n",
" \n",
" config = tf.estimator.RunConfig(\n",
" experimental_distribute=tf.contrib.distribute.DistributeConfig(\n",
" train_distribute=tf.contrib.distribute.CollectiveAllReduceStrategy(\n",
" num_gpus_per_worker=devices.get_num_gpus()),\n",
" eval_distribute=tf.contrib.distribute.MirroredStrategy(\n",
" num_gpus_per_worker=devices.get_num_gpus())),\n",
" model_dir=tensorboard.logdir(),\n",
" save_summary_steps=100,\n",
" log_step_count_steps=100,\n",
" save_checkpoints_steps=500)\n",
" # Setup the Estimator\n",
" model_estimator = build_estimator(config)\n",
" # Setup and start training and validation\n",
" train_spec = tf.estimator.TrainSpec(\n",
" input_fn=lambda: get_train_inputs(batch_size, mnist_train),\n",
" max_steps=training_steps)\n",
" eval_spec = tf.estimator.EvalSpec(\n",
" input_fn=lambda: get_eval_inputs(batch_size, mnist_test),\n",
" steps=None,\n",
" start_delay_secs=10, # Start evaluating after 10 sec.\n",
" throttle_secs=30 # Evaluate only every 30 sec\n",
" )\n",
" \n",
" tf.estimator.train_and_evaluate(model_estimator, train_spec, eval_spec)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from hops import experiment\n",
"from hops import hdfs\n",
"\n",
"notebook = hdfs.project_path() + \"Jupyter/Distributed_Training/collective_allreduce_strategy/mnist.ipynb\"\n",
"experiment.allreduce(allreduce_mnist,\n",
" name='mnist estimator', \n",
" description='A minimal mnist example with two hidden layers',\n",
" versioned_resources=[notebook], local_logdir=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Monitoring execution - TensorBoard <a class=\"anchor\" id='tensorboard'></a>\n",
"To find the TensorBoard for the execution, please go back to HopsWorks and enter the Experiments service.\n",
"Then copy & paste the experiment_id into the textbox and press enter to start a TensorBoard to see all experiments being run in parallel.\n",
"\n",
"![Image7-Monitor.png](../../images/experiments_service.png)\n",
"![Image7-Monitor.png](../../images/tensorboard.png)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ParameterServerStrategy on Hops\n",
"---\n",
"\n",
"<font color='red'> <h3>Tested with TensorFlow 1.11</h3></font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def distributed_training():\n",
"\n",
" import sys\n",
" import numpy as np\n",
" import tensorflow as tf\n",
" import os\n",
" \n",
" from hops import tensorboard\n",
" from hops import devices\n",
"\n",
" def input_fn():\n",
" x = np.random.random((1024, 10))\n",
" y = np.random.randint(2, size=(1024, 1))\n",
" x = tf.cast(x, tf.float32)\n",
" dataset = tf.data.Dataset.from_tensor_slices((x, y))\n",
" dataset = dataset.repeat(500)\n",
" dataset = dataset.batch(32)\n",
" return dataset\n",
"\n",
" model_dir = tensorboard.logdir()\n",
" print('Using %s to store checkpoints.' % model_dir)\n",
"\n",
" # Define a Keras Model.\n",
" model = tf.keras.Sequential()\n",
" model.add(tf.keras.layers.Dense(16, activation='relu', input_shape=(10,)))\n",
" model.add(tf.keras.layers.Dense(1, activation='sigmoid'))\n",
"\n",
" # Compile the model.\n",
" optimizer = tf.train.GradientDescentOptimizer(0.2)\n",
" model.compile(loss='binary_crossentropy', optimizer=optimizer)\n",
" model.summary()\n",
" tf.keras.backend.set_learning_phase(True)\n",
"\n",
" # Define DistributionStrategies and convert the Keras Model to an\n",
" # Estimator that utilizes these DistributionStrateges.\n",
" # Evaluator is a single worker, so using MirroredStrategy.\n",
" run_config = tf.estimator.RunConfig(\n",
" experimental_distribute=tf.contrib.distribute.DistributeConfig(\n",
" train_distribute=tf.contrib.distribute.ParameterServerStrategy(\n",
" num_gpus_per_worker=devices.get_num_gpus()),\n",
" eval_distribute=tf.contrib.distribute.MirroredStrategy(\n",
" num_gpus_per_worker=devices.get_num_gpus())))\n",
" keras_estimator = tf.keras.estimator.model_to_estimator(keras_model=model, config=run_config, model_dir=model_dir)\n",
"\n",
" # Train and evaluate the model. Evaluation will be skipped if there is not an\n",
" # \"evaluator\" job in the cluster.\n",
" tf.estimator.train_and_evaluate(keras_estimator, train_spec=tf.estimator.TrainSpec(input_fn=input_fn),\n",
" eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))\n",
" \n",
"from hops import experiment\n",
"experiment.parameter_server(distributed_training)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading

0 comments on commit 18f2a52

Please sign in to comment.