Prepare for release 0.3.0 (#29)
Prepares Maggy for the release of 0.3.0

- Adds developer documentation
- Fixes bugs
- Adds Ablation example
moritzmeister committed Oct 15, 2019
1 parent 799d8b2 commit f8e5a90
Showing 12 changed files with 651 additions and 28 deletions.
37 changes: 24 additions & 13 deletions README.rst
@@ -1,28 +1,28 @@
Maggy
=====

|Downloads| |PypiStatus| |PythonVersions| |Docs|


Maggy is a framework for efficient asynchronous optimization of expensive
black-box functions on top of Apache Spark. Compared to existing frameworks,
maggy is not bound to stage-based optimization algorithms and therefore it is
able to make extensive use of early stopping in order to achieve efficient
resource utilization.

Right now, maggy supports asynchronous hyperparameter tuning of machine
learning and deep learning models, but other use cases include ablation studies
and asynchronous distributed training.
learning and deep learning models, and ablation studies on neural network
layers as well as input features.

Moreover, it provides a developer API that allows advanced usage by
implementing custom optimization algorithms and early stopping criteria.

In order to make decisions on early stopping, the Spark executors are sending
heart beats with the current performance of the model they are training to the
maggy experiment driver which is running on the Spark driver. We call the
process of training a model with a certain hyperparameter combination a
*trial*. The experiment driver then uses all information of finished trials and
the currently running ones to check in a specified interval, which of the
trials should be stopped early.
Subsequently, the experiment driver provides a new trial to the Spark
executor.
To accommodate asynchronous algorithms, support for communication between the
Driver and Executors via RPCs through Maggy was added. The Optimizer that guides
hyperparameter search is located on the Driver and it assigns trials to
Executors. Executors periodically send back to the Driver the current
performance of their trial, and the Optimizer can decide to early-stop any
ongoing trial and send the Executor a new trial instead.
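The following sketch is purely illustrative of this loop. The names (`optimizer`, `executors`, `should_early_stop`) are hypothetical stand-ins rather than Maggy's actual driver code; only `get_suggestion(trial=None)` mirrors the developer API documented further down in this diff.

# Conceptual sketch of the driver-side loop described above.
# All names are hypothetical; this is not the Maggy implementation.
def drive_experiment(optimizer, executors, should_early_stop):
    # Hand out one initial trial (parameter combination) per executor.
    for executor in executors:
        executor.assign(optimizer.get_suggestion())
    while any(not executor.is_idle() for executor in executors):
        for executor in executors:
            # Executors periodically report the current performance of their trial.
            heartbeat = executor.poll_heartbeat()
            if heartbeat is not None and should_early_stop(heartbeat):
                executor.stop_trial()
            if executor.is_idle():
                # Replace a finished or stopped trial with a new suggestion, if any.
                trial = optimizer.get_suggestion(executor.last_trial)
                if trial is not None:
                    executor.assign(trial)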

Quick Start
-----------
@@ -31,8 +31,8 @@ To Install:

>>> pip install maggy

The programming model is that you wrap the code containing the model training
inside a wrapper function. Inside that wrapper function provide all imports and
The programming model consists of wrapping the code containing the model training
inside a function. Inside that wrapper function provide all imports and
parts that make up your experiment.

There are three requirements for this wrapper function:
@@ -94,4 +94,15 @@ see the Jupyter Notebook in the `examples` folder.
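As a rough illustration of the wrapper-function pattern described above (a hedged sketch: the hyperparameter names `kernel` and `pool` and the exact signature are assumptions; only the `KerasBatchEnd` reporter callback and the returned final metric follow the ablation example further down in this diff):

>>> from maggy.callbacks import KerasBatchEnd
>>> def train(kernel, pool, reporter):
>>>     # Sketch only: `kernel` and `pool` stand for the hyperparameters
>>>     # suggested for this trial; replace them with your own arguments.
>>>     model = ...        # build a Keras model from the suggested values
>>>     train_data = ...   # load or build the training dataset
>>>     callbacks = [KerasBatchEnd(reporter, metric='acc')]  # heartbeat metrics to the driver
>>>     history = model.fit(train_data, epochs=5, callbacks=callbacks)
>>>     return float(history.history['acc'][-1])  # final metric handed back to maggy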
Documentation
-------------

Read our `blog post <https://www.logicalclocks.com/blog/scaling-machine-learning-and-deep-learning-with-pyspark-on-hopsworks>`_ for more details.

API documentation is available `here <https://maggy.readthedocs.io/en/latest/>`_.

.. |Downloads| image:: https://pepy.tech/badge/maggy/month
:target: https://pepy.tech/project/maggy
.. |PypiStatus| image:: https://img.shields.io/pypi/v/maggy?color=blue
:target: https://pypi.org/project/hops
.. |PythonVersions| image:: https://img.shields.io/pypi/pyversions/maggy.svg
:target: https://pypi.org/project/hops
.. |Docs| image:: https://img.shields.io/readthedocs/maggy
:target: https://maggy.readthedocs.io/en/latest/
19 changes: 18 additions & 1 deletion docs/developer.rst
@@ -1,2 +1,19 @@
Maggy Developer API
===================
===================

As a developer, you can implement custom optimizers or ablators. To do so,
subclass the corresponding abstract base class, implement its abstract methods,
and pass your implementation as an argument when launching the experiment. For
examples, have a look at the existing optimizers and ablators.
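For instance, a custom ablator could be plugged in roughly like this (hedged sketch: passing an instance in place of the built-in `'loco'` string is an assumption based on the `experiment.lagom()` call shown in the ablation example):

>>> from maggy import experiment
>>> my_ablator = MyAblator()   # a subclass of AbstractAblator
>>> result = experiment.lagom(map_fun=training_function,
>>>                           experiment_type='ablation',
>>>                           ablation_study=ablation_study,
>>>                           ablator=my_ablator,   # assumption: instance instead of 'loco'
>>>                           name='Custom-Ablator')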

maggy.optimizer module
-----------------------

.. autoclass:: maggy.optimizer.AbstractOptimizer
:members:

maggy.ablation.ablator module
-----------------------------

.. autoclass:: maggy.ablation.ablator.abstractablator.AbstractAblator
:members:
8 changes: 8 additions & 0 deletions docs/user.rst
@@ -23,3 +23,11 @@ maggy.callbacks module

.. autoclass:: maggy.callbacks.KerasEpochEnd
:members:

maggy.ablation module
---------------------

.. autoclass:: maggy.ablation.AblationStudy
:members:

.. automethod:: __init__
469 changes: 469 additions & 0 deletions examples/maggy-ablation-titanic-example.ipynb

Large diffs are not rendered by default.

94 changes: 90 additions & 4 deletions maggy/ablation/ablationstudy.py
@@ -1,6 +1,91 @@
class AblationStudy(object):
def __init__(self, training_dataset_name, training_dataset_version,
label_name, **kwargs):
"""The `AblationStudy` object is the entry point to define an ablation
study with maggy. This object can subsequently be passed as an argument
when the experiment is launched with `experiment.lagom()`.
Sample usage:
>>> from maggy.ablation import AblationStudy
>>> ablation_study = AblationStudy('titanic_train_dataset',
>>> label_name='survived')
Define your study by including layers and features, which should be
ablated:
>>> ablation_study.features.include('pclass', 'fare')
>>> ablation_study.model.layers.include('my_dense_two',
>>> 'my_dense_three')
You can also add a layer group using a list:
>>> ablation_study.model.layers.include_groups(['my_dense_two',
>>> 'my_dense_four'])
Or add a layer group using a prefix:
>>> ablation_study.model.layers.include_groups(prefix='my_dense')
Next you should define a base model function using the layer and feature
names you previously specified:
>>> # you only need to add the `name` parameter to layer initializers
>>> def base_model_generator():
>>> model = tf.keras.Sequential()
>>> model.add(tf.keras.layers.Dense(64, activation='relu'))
>>> model.add(tf.keras.layers.Dense(..., name='my_dense_two', ...))
>>> model.add(tf.keras.layers.Dense(32, activation='relu'))
>>> model.add(tf.keras.layers.Dense(..., name='my_dense_sigmoid', ...))
>>> # output layer
>>> model.add(tf.keras.layers.Dense(1, activation='linear'))
>>> return model
Make sure to include the generator function in the study:
>>> ablation_study.model.set_base_model_generator(base_model_generator)
Last but not least you can define your actual training function:
>>> from maggy import experiment
>>> from maggy.callbacks import KerasBatchEnd
>>> def training_function(dataset_function, model_function, reporter):
>>> import tensorflow as tf
>>> epochs = 5
>>> batch_size = 10
>>> tf_dataset = dataset_function(epochs, batch_size)
>>> model = model_function()
>>> model.compile(optimizer=tf.train.AdamOptimizer(0.001),
>>> loss='binary_crossentropy',
>>> metrics=['accuracy'])
>>> ### Maggy REPORTER
>>> callbacks = [KerasBatchEnd(reporter, metric='acc')]
>>> history = model.fit(tf_dataset, epochs=5, steps_per_epoch=30)
>>> return float(history.history['acc'][-1])
Lagom the experiment:
>>> result = experiment.lagom(map_fun=training_function,
>>> experiment_type='ablation',
>>> ablation_study=ablation_study,
>>> ablator='loco',
>>> name='Titanic-LOCO',
>>> hb_interval=5)
"""

def __init__(
self, training_dataset_name, training_dataset_version, label_name,
**kwargs):
"""Initializes the ablation study.
:param training_dataset_name: Name of the training dataset in the
featurestore.
:type training_dataset_name: str
:param training_dataset_version: Version of the training dataset to be
used.
:type training_dataset_version: int
:param label_name: Name of the target prediction label.
:type label_name: str
"""
self.features = Features()
self.model = Model()
self.hops_training_dataset_name = training_dataset_name
@@ -11,8 +96,9 @@ def __init__(self, training_dataset_name, training_dataset_version,
def to_dict(self):
"""
Returns the ablation study configuration as a Python dictionary.
:return: A dictionary with ablation study configuration parameters as keys
(i.e. 'training_dataset_name', 'included_features', etc.)
:return: A dictionary with ablation study configuration parameters as
keys (i.e. 'training_dataset_name', 'included_features', etc.)
:rtype: dict
"""
ablation_dict = {
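As a hedged illustration of the result (key names taken from the docstring above; the full set of keys is collapsed in this diff, and the value types shown here are assumptions):

>>> ablation_study.to_dict()
{'training_dataset_name': 'titanic_train_dataset',
 'label_name': 'survived',
 'included_features': ['pclass', 'fare'],
 ...}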
2 changes: 1 addition & 1 deletion maggy/ablation/ablator/__init__.py
@@ -1,2 +1,2 @@
from maggy.ablation.ablator.abstractablator import AbstractAblator
from maggy.ablation.ablator.loco import LOCO
# from maggy.ablation.ablator.loco import LOCO
4 changes: 3 additions & 1 deletion maggy/ablation/ablator/abstractablator.py
@@ -13,6 +13,7 @@ def get_number_of_trials(self):
"""
If applicable, calculate and return the total number of trials of the ablation experiment.
Make sure to also include the base (reference) trial in the count.
:return: total number of trials of the ablation study experiment
:rtype: int
"""
@@ -25,7 +26,7 @@ def get_dataset_generator(self, ablated_feature, dataset_type='tfrecord'):
The returned function will be executed on the executor per each trial.
:param ablated_feature: the name of the feature to be excluded from the training dataset.
Must match a feature name in the corresponding feature group in the feature store.
Must match a feature name in the corresponding feature group in the feature store.
:type ablated_feature: str
:param dataset_type: type of the dataset. For now, we only support 'tfrecord'.
:return: A function that generates a TFRecordDataset
@@ -53,6 +54,7 @@ def get_trial(self, ablation_trial=None):
The trial should contain a dataset generator and a model generator.
Depending on the ablator policy, the trials could come from a list (buffer) of pre-made trials,
or generated on the fly.
:rtype: `Trial` or `None`
"""
pass
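To make the required surface concrete, here is a hedged skeleton of a custom ablator based only on the abstract methods visible in this file (further methods may exist in the collapsed parts of the diff; the class and its trivial logic are illustrative):

from maggy.ablation.ablator.abstractablator import AbstractAblator

class MyAblator(AbstractAblator):
    """Illustrative skeleton only, not a working ablation policy."""

    def get_number_of_trials(self):
        # Total number of trials, including the base (reference) trial.
        return 1

    def get_dataset_generator(self, ablated_feature, dataset_type='tfrecord'):
        # Return a function that is executed on the executor for each trial.
        def create_dataset(num_epochs, batch_size):
            # Build and return a TFRecordDataset that excludes `ablated_feature`.
            raise NotImplementedError
        return create_dataset

    def get_trial(self, ablation_trial=None):
        # Return the next Trial to run, or None when the experiment is finished.
        return None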
3 changes: 2 additions & 1 deletion maggy/core/experimentdriver.py
@@ -15,7 +15,8 @@
from maggy.earlystop import AbstractEarlyStop, MedianStoppingRule, NoStoppingRule
from maggy.searchspace import Searchspace

from maggy.ablation.ablator import AbstractAblator, LOCO
from maggy.ablation.ablator import AbstractAblator
from maggy.ablation.ablator.loco import LOCO
from maggy.ablation.ablationstudy import AblationStudy

from hops import constants as hopsconstants
6 changes: 3 additions & 3 deletions maggy/core/rpc.py
@@ -329,7 +329,6 @@ def start(self, exp_driver):
app_id = str(sc.applicationId)

method = hopsconstants.HTTP_CONFIG.HTTP_POST
connection = hopsutil._get_http_connection(https=True)
resource_url = hopsconstants.DELIMITERS.SLASH_DELIMITER + \
hopsconstants.REST_CONFIG.HOPSWORKS_REST_RESOURCE + hopsconstants.DELIMITERS.SLASH_DELIMITER + \
"maggy" + hopsconstants.DELIMITERS.SLASH_DELIMITER + "drivers"
@@ -341,8 +340,9 @@
headers = {hopsconstants.HTTP_CONFIG.HTTP_CONTENT_TYPE: hopsconstants.HTTP_CONFIG.HTTP_APPLICATION_JSON}

try:
response = hopsutil.send_request(connection, method, resource_url, body=json_embeddable, headers=headers)
if (response.status != 200):
response = hopsutil.send_request(method, resource_url, data=json_embeddable, headers=headers)

if (response.status_code // 100) != 2:
print("No connection to Hopsworks for logging.")
exp_driver._log("No connection to Hopsworks for logging.")
except Exception as e:
13 changes: 13 additions & 0 deletions maggy/optimizer/abstractoptimizer.py
@@ -10,12 +10,25 @@ def __init__(self):

@abstractmethod
def initialize(self):
"""
A hook for the developer to initialize the optimizer.
"""
pass

@abstractmethod
def get_suggestion(self, trial=None):
"""
Return a `Trial` to be assigned to an executor, or `None` if there are
no trials remaining in the experiment.
:rtype: `Trial` or `None`
"""
pass

@abstractmethod
def finalize_experiment(self, trials):
"""
This method will be called before finishing the experiment. Developers
can implement this method e.g. for cleanup or extra logging.
"""
pass
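For comparison, a hedged skeleton of a custom optimizer built on the three abstract methods above (the class name and its trivial logic are illustrative; a real optimizer would construct Trial objects from its searchspace):

from maggy.optimizer import AbstractOptimizer

class SingleShotOptimizer(AbstractOptimizer):
    """Illustrative skeleton only, not a real search strategy."""

    def initialize(self):
        # Set up internal state before the first suggestion is requested.
        self.done = False

    def get_suggestion(self, trial=None):
        # Return the next Trial to assign to an executor, or None when no
        # trials remain. This sketch ends the experiment immediately; a real
        # optimizer would build a Trial from its searchspace here.
        self.done = True
        return None

    def finalize_experiment(self, trials):
        # Called once before the experiment finishes, e.g. for cleanup or logging.
        pass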
2 changes: 1 addition & 1 deletion maggy/version.py
@@ -1 +1 @@
__version__ = '0.2'
__version__ = '0.3.0'
22 changes: 19 additions & 3 deletions setup.py
@@ -11,15 +11,31 @@ def read(fname):
name='maggy',
version=__version__,
install_requires=[
'numpy'
'numpy==1.16.5'
],
extras_require={
'pydoop': ['pydoop'],
'tf': ['tensorflow==1.14.0'],
'docs': [
'sphinx==1.8.5',
'sphinx-autobuild',
'recommonmark',
'sphinx_rtd_theme',
'jupyter_sphinx_theme'
],
'test': [
'pylint',
'pytest',
],
'spark': ['pyspark==2.4.3']
},
author='Moritz Meister',
author_email='meister.mo@gmail.com',
description='',
description='Efficient asynchronous optimization of expensive black-box functions on top of Apache Spark',
license='GNU Affero General Public License v3',
keywords='Hyperparameter, Optimization, Auto-ML, Hops, Hadoop, TensorFlow, Spark',
url='https://github.com/logicalclocks/maggy',
# download_url = 'http://snurran.sics.se/hops/hops-util-py/hops-' + __version__ + '.tar.gz',
download_url='http://snurran.sics.se/hops/maggy/maggy-' + __version__ + '.tar.gz',
packages=find_packages(),
long_description=read('README.rst'),
classifiers=[
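With these extras declared, the optional dependencies can be installed per use case, for example `pip install maggy[tf]` for the pinned TensorFlow version or `pip install maggy[docs,test]` for a documentation and test setup (extras names taken from the `extras_require` block above).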
