diff --git a/.travis.yml b/.travis.yml index 8aa36e4..ae99a77 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,15 +1,18 @@ sudo: required -dist: trusty +dist: bionic language: python python: - - "2.7" - - "3.4" + - "3.7" +before_install: + - sudo add-apt-repository -y ppa:openjdk-r/ppa + - sudo apt-get -qq update + - sudo apt-get install -y openjdk-8-jdk --no-install-recommends + - sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 + - export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 +env: + - SPARK_LOCAL_IP=0.0.0.0 install: - - if [[ "$TRAVIS_PYTHON_VERSION" == "2.7" ]]; then - wget https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh -O miniconda.sh; - else - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh; - fi + - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh; - bash miniconda.sh -b -p $HOME/miniconda - export PATH="$HOME/miniconda/bin:$PATH" - hash -r @@ -22,7 +25,6 @@ install: - pip install -e .[tests] script: - - python -c "import keras.backend" - - py.test --pep8 -m pep8 tests/ + - pytest tests/ after_success: - coveralls diff --git a/README.md b/README.md index 1b282ee..2be3d4a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,3 @@ -This project is now archived. It's been fun working on it, but it's time for me to move on. Thank you for all the support and feedback over the last couple of years. Daniel Cahall has kindly agreed to continue development for this project [in his fork](https://github.com/danielenricocahall/elephas) and will handle future releases. Thank you! :v: ---- # Elephas: Distributed Deep Learning with Keras & Spark @@ -75,9 +73,9 @@ sc = SparkContext(conf=conf) Next, you define and compile a Keras model ```python -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import SGD +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense, Dropout, Activation +from tensorflow.keras.optimizers import SGD model = Sequential() model.add(Dense(128, input_dim=784)) model.add(Activation('relu')) @@ -98,7 +96,7 @@ rdd = to_simple_rdd(sc, x_train, y_train) The basic model in Elephas is the `SparkModel`. You initialize a `SparkModel` by passing in a compiled Keras model, an update frequency and a parallelization mode. After that you can simply `fit` the model on your RDD. Elephas `fit` -has the same options as a Keras model, so you can pass `epochs`, `batch_size` etc. as you're used to from Keras. +has the same options as a Keras model, so you can pass `epochs`, `batch_size` etc. as you're used to from tensorflow.keras. 
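How those options reach Keras is changed later in this diff: `SparkModel.fit` now accepts `**kwargs` and forwards them to the Keras `fit` call made on each worker. A minimal sketch of that forwarding idea, with illustrative names only (`worker_train` and its arguments are not the actual Elephas API):

```python
# Sketch only: Keras-style options passed to SparkModel.fit are kept as a dict and
# handed through to tensorflow.keras' Model.fit on every worker, which is why
# epochs, batch_size, validation_split etc. behave exactly as in plain Keras.
from tensorflow.keras.models import model_from_yaml

def worker_train(yaml_config, weights, x_train, y_train, train_config):
    model = model_from_yaml(yaml_config)          # rebuild the master model on the worker
    model.compile(optimizer='sgd', loss='categorical_crossentropy', metrics=['acc'])
    model.set_weights(weights)                    # start from the master's current parameters
    model.fit(x_train, y_train, **train_config)   # e.g. {'epochs': 20, 'batch_size': 32}
    return model.get_weights()
```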
```python from elephas.spark_model import SparkModel @@ -181,8 +179,8 @@ from hyperopt import STATUS_OK from hyperas.distributions import choice, uniform def data(): - from keras.datasets import mnist - from keras.utils import np_utils + from tensorflow.keras.datasets import mnist + from tensorflow.keras.utils import to_categorical (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train = x_train.reshape(60000, 784) x_test = x_test.reshape(10000, 784) @@ -191,15 +189,15 @@ def data(): x_train /= 255 x_test /= 255 nb_classes = 10 - y_train = np_utils.to_categorical(y_train, nb_classes) - y_test = np_utils.to_categorical(y_test, nb_classes) + y_train = to_categorical(y_train, nb_classes) + y_test = to_categorical(y_test, nb_classes) return x_train, y_train, x_test, y_test def model(x_train, y_train, x_test, y_test): - from keras.models import Sequential - from keras.layers.core import Dense, Dropout, Activation - from keras.optimizers import RMSprop + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Dense, Dropout, Activation + from tensorflow.keras.optimizers import RMSprop model = Sequential() model.add(Dense(512, input_shape=(784,))) diff --git a/docs/autogen.py b/docs/autogen.py index 6fd4566..90406d5 100644 --- a/docs/autogen.py +++ b/docs/autogen.py @@ -1,8 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import print_function -from __future__ import unicode_literals - import re import inspect import os diff --git a/elephas/hyperparam.py b/elephas/hyperparam.py index 3b6725b..a4e433f 100644 --- a/elephas/hyperparam.py +++ b/elephas/hyperparam.py @@ -1,10 +1,8 @@ -from __future__ import print_function -from __future__ import absolute_import from hyperopt import Trials, rand from hyperas.ensemble import VotingModel from hyperas.optim import get_hyperopt_model_string, base_minimizer import numpy as np -from keras.models import model_from_yaml +from tensorflow.keras.models import model_from_yaml import six.moves.cPickle as pickle from six.moves import range # depend on hyperas, boto etc. 
is optional diff --git a/elephas/ml/adapter.py b/elephas/ml/adapter.py index 030d30c..0d5b293 100644 --- a/elephas/ml/adapter.py +++ b/elephas/ml/adapter.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from pyspark.sql import SQLContext from pyspark.mllib.regression import LabeledPoint from ..utils.rdd_utils import from_labeled_point, to_labeled_point, lp_to_simple_rdd diff --git a/elephas/ml/params.py b/elephas/ml/params.py index de8a043..fddb3e7 100644 --- a/elephas/ml/params.py +++ b/elephas/ml/params.py @@ -1,4 +1,3 @@ -from __future__ import absolute_import from pyspark.ml.param.shared import Param, Params diff --git a/elephas/ml_model.py b/elephas/ml_model.py index e9d3295..c47a12c 100644 --- a/elephas/ml_model.py +++ b/elephas/ml_model.py @@ -1,18 +1,18 @@ -from __future__ import absolute_import, print_function +from enum import Enum +import tensorflow.keras as keras import numpy as np import copy import h5py import json from pyspark.ml.param.shared import HasOutputCol, HasFeaturesCol, HasLabelCol -from pyspark import keyword_only +from pyspark import keyword_only, RDD from pyspark.ml import Estimator, Model from pyspark.sql.types import StringType, DoubleType, StructField -from keras.models import model_from_yaml -from keras.optimizers import get as get_optimizer - +from tensorflow.keras.models import model_from_yaml +from tensorflow.keras.optimizers import get as get_optimizer from .spark_model import SparkModel from .utils.rdd_utils import from_vector @@ -30,6 +30,7 @@ class ElephasEstimator(Estimator, HasCategoricalLabels, HasValidationSplit, HasK Returns a trained model in form of a SparkML Model, which is also a Transformer. """ + @keyword_only def __init__(self, **kwargs): super(ElephasEstimator, self).__init__() @@ -69,6 +70,9 @@ def set_params(self, **kwargs): """ return self._set(**kwargs) + def get_model(self): + return model_from_yaml(self.get_keras_model_config()) + def _fit(self, df): """Private fit method of the Estimator, which trains the model. """ @@ -96,7 +100,8 @@ def _fit(self, df): return ElephasTransformer(labelCol=self.getLabelCol(), outputCol='prediction', keras_model_config=spark_model.master_network.to_yaml(), - weights=weights) + weights=weights, + loss=loss) def load_ml_estimator(file_name): @@ -110,12 +115,16 @@ class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol): """SparkML Transformer implementation. Contains a trained model, with which new feature data can be transformed into labels. 
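The `loss` that `_fit` now hands to the transformer is what lets it decide whether predictions come back as class labels or as plain floats: the loss name is looked up in the `LossModelTypeMapper` singleton added further down in this file's diff. A short sketch of that lookup, with `my_loss` as a made-up custom loss:

```python
# The mapper is a singleton, so a registration made anywhere is visible everywhere.
from elephas.ml_model import LossModelTypeMapper, ModelType

mapper = LossModelTypeMapper()
assert mapper.get_model_type('mse') is ModelType.REGRESSION
assert mapper.get_model_type('categorical_crossentropy') is ModelType.CLASSIFICATION

def my_loss(y_true, y_pred):                           # hypothetical custom loss
    return (y_true - y_pred) ** 2

mapper.register_loss(my_loss, ModelType.REGRESSION)    # callables are keyed by __name__
assert mapper.get_model_type('my_loss') is ModelType.REGRESSION
```

This is the same behaviour the new `test_model_type_mapper_custom_callable` test exercises later in this diff.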
""" + @keyword_only def __init__(self, **kwargs): super(ElephasTransformer, self).__init__() if "weights" in kwargs.keys(): # Strip model weights from parameters to init Transformer self.weights = kwargs.pop('weights') + if "loss" in kwargs.keys(): + # Extract loss from parameters + self.model_type = LossModelTypeMapper().get_model_type(kwargs.pop('loss')) self.set_params(**kwargs) @keyword_only @@ -152,16 +161,12 @@ def _transform(self, df): new_schema.add(StructField(output_col, StringType(), True)) rdd = df.rdd.coalesce(1) - features = np.asarray( - rdd.map(lambda x: from_vector(x.features)).collect()) + features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect()) # Note that we collect, since executing this on the rdd would require model serialization once again model = model_from_yaml(self.get_keras_model_config()) model.set_weights(self.weights.value) - predictions = rdd.ctx.parallelize( - model.predict_classes(features)).coalesce(1) - predictions = predictions.map(lambda x: tuple(str(x))) - results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1]) + results_rdd = compute_predictions(model, self.model_type, rdd, features) results_df = df.sql_ctx.createDataFrame(results_rdd, new_schema) results_df = results_df.withColumn( output_col, results_df[output_col].cast(DoubleType())) @@ -176,3 +181,76 @@ def load_ml_transformer(file_name): elephas_conf = json.loads(f.attrs.get('distributed_config')) config = elephas_conf.get('config') return ElephasTransformer(**config) + + +class ModelType(Enum): + CLASSIFICATION = 1 + REGRESSION = 2 + + +class _Singleton(type): + """ A metaclass that creates a Singleton base class when called. """ + _instances = {} + + def __call__(cls, *args): + if cls not in cls._instances: + cls._instances[cls] = super(_Singleton, cls).__call__(*args) + return cls._instances[cls] + + +class Singleton(_Singleton('SingletonMeta', (object,), {})): + pass + + +class LossModelTypeMapper(Singleton): + """ + Mapper for losses -> model type + """ + def __init__(self): + loss_to_model_type = {} + loss_to_model_type.update( + {'mean_squared_error': ModelType.REGRESSION, + 'mean_absolute_error': ModelType.REGRESSION, + 'mse': ModelType.REGRESSION, + 'mae': ModelType.REGRESSION, + 'cosine_proximity': ModelType.REGRESSION, + 'mean_absolute_percentage_error': ModelType.REGRESSION, + 'mean_squared_logarithmic_error': ModelType.REGRESSION, + 'logcosh': ModelType.REGRESSION, + 'binary_crossentropy': ModelType.CLASSIFICATION, + 'categorical_crossentropy': ModelType.CLASSIFICATION, + 'sparse_categorical_crossentropy': ModelType.CLASSIFICATION}) + self.__mapping = loss_to_model_type + + def get_model_type(self, loss): + return self.__mapping.get(loss) + + def register_loss(self, loss, model_type): + if callable(loss): + loss = loss.__name__ + self.__mapping.update({loss: model_type}) + + +def compute_predictions(model, model_type, rdd, features): + predict_function = determine_predict_function(model, model_type) + predictions = rdd.ctx.parallelize(predict_function(features)).coalesce(1) + if model_type == ModelType.CLASSIFICATION: + predictions = predictions.map(lambda x: tuple(str(x))) + else: + predictions = predictions.map(lambda x: tuple([float(x)])) + results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1]) + return results_rdd + + +def determine_predict_function(model, + model_type): + if model_type == ModelType.CLASSIFICATION: + if isinstance(model, keras.models.Sequential): + predict_function = model.predict_classes + else: + # support for functional API + 
predict_function = lambda x: model.predict(x).argmax(axis=-1) + else: + predict_function = model.predict + + return predict_function diff --git a/elephas/mllib/adapter.py b/elephas/mllib/adapter.py index 9814faf..fdd62d5 100644 --- a/elephas/mllib/adapter.py +++ b/elephas/mllib/adapter.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from pyspark.mllib.linalg import Matrices, Vectors diff --git a/elephas/parameter/client.py b/elephas/parameter/client.py index 3e84527..7f8f741 100644 --- a/elephas/parameter/client.py +++ b/elephas/parameter/client.py @@ -1,10 +1,10 @@ -from __future__ import absolute_import -from __future__ import print_function - import abc + import numpy as np import socket import six.moves.cPickle as pickle + + try: import urllib.request as urllib2 except ImportError: @@ -13,16 +13,20 @@ from ..utils.sockets import determine_master, send, receive -class BaseParameterClient(object): +class BaseParameterClient(abc.ABC): """BaseParameterClient - Parameter-server clients can do two things: retrieve the current parameters from the corresponding server, and send updates (`delta`) to the server. """ - __metaclass__ = abc.ABCMeta + client_type = 'base' - def __init__(self): - raise NotImplementedError + @classmethod + def get_client(cls, client_type, port=4000): + try: + return next(cl for cl in cls.__subclasses__() if cl.client_type == client_type)(port) + except StopIteration: + raise ValueError("Parameter server mode has to be either `http` or `socket`, " + "got {}".format(client_type)) @abc.abstractmethod def update_parameters(self, delta): @@ -39,14 +43,14 @@ def get_parameters(self): class HttpClient(BaseParameterClient): """HttpClient - Uses HTTP protocol for communication with its corresponding parameter server, namely HttpServer. The HTTP server provides two endpoints, `/parameters` to get parameters and `/update` to update the server's parameters. """ - def __init__(self, port=4000): + client_type = 'http' + def __init__(self, port=4000): self.master_url = determine_master(port=port) self.headers = {'Content-Type': 'application/elephas'} @@ -64,23 +68,27 @@ def update_parameters(self, delta): class SocketClient(BaseParameterClient): """SocketClient - Uses a socket connection to communicate with an instance of `SocketServer`. The socket server listens to two types of events. Those with a `g` prefix indicate a get-request, those with a `u` indicate a parameter update. 
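With the `client_type` attribute in place, callers no longer need an if/elif on the mode string; the new `get_client` classmethod scans `BaseParameterClient.__subclasses__()` for a match. Usage, as exercised by the new `tests/parameter/test_client.py`:

```python
from elephas.parameter import BaseParameterClient, SocketClient

client = BaseParameterClient.get_client('socket', port=4000)   # resolves to SocketClient
assert isinstance(client, SocketClient)

# 'http' resolves to HttpClient in the same way (it determines the parameter-server
# address at construction time, which is why the new unit test patches the socket
# module); any other string raises ValueError.
```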
""" + client_type = 'socket' def __init__(self, port=4000): - - host = self.master_url.split(':')[0] - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((host, port)) + self.port = port def get_parameters(self): - self.socket.sendall(b'g') - return np.asarray(receive(self.socket)) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + host = determine_master(port=self.port).split(':')[0] + sock.connect((host, self.port)) + sock.sendall(b'g') + data = np.asarray(receive(sock)) + return data def update_parameters(self, delta): - data = {'delta': delta} - self.socket.sendall(b'u') - send(self.socket, data) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + host = determine_master(port=self.port).split(':')[0] + sock.connect((host, self.port)) + data = {'delta': delta} + sock.sendall(b'u') + send(sock, data) diff --git a/elephas/parameter/factory.py b/elephas/parameter/factory.py new file mode 100644 index 0000000..1459fd9 --- /dev/null +++ b/elephas/parameter/factory.py @@ -0,0 +1,42 @@ +from abc import ABC, abstractmethod + +from elephas.parameter import HttpClient, HttpServer, SocketClient, SocketServer + + +class ClientServerFactory(ABC): + _type = 'base' + + @classmethod + def get_factory(cls, _type): + try: + return next(cl for cl in cls.__subclasses__() if cl._type == _type)() + except StopIteration: + raise ValueError("Unknown factory type {}".format(_type)) + + @abstractmethod + def create_client(self, *args, **kwargs): + pass + + @abstractmethod + def create_server(self, *args, **kwargs): + pass + + +class HttpFactory(ClientServerFactory): + _type = 'http' + + def create_client(self, *args, **kwargs): + return HttpClient(*args, **kwargs) + + def create_server(self, *args, **kwargs): + return HttpServer(*args, **kwargs) + + +class SocketFactory(ClientServerFactory): + _type = 'socket' + + def create_client(self, *args, **kwargs): + return SocketClient(*args, **kwargs) + + def create_server(self, *args, **kwargs): + return SocketServer(*args, **kwargs) \ No newline at end of file diff --git a/elephas/parameter/server.py b/elephas/parameter/server.py index 77e610f..b3b58bb 100644 --- a/elephas/parameter/server.py +++ b/elephas/parameter/server.py @@ -19,10 +19,10 @@ class BaseParameterServer(object): Parameter servers can be started and stopped. Server implementations have to cater to the needs of their respective BaseParameterClient instances. """ - __metaclass__ = abc.ABCMeta - def __init__(self): - raise NotImplementedError + def __init__(self, model, port, **kwargs): + self.master_network = dict_to_model(model) + self.port = port @abc.abstractmethod def start(self): @@ -45,8 +45,7 @@ class HttpServer(BaseParameterServer): POST updates. """ - def __init__(self, model, mode, port=4000, debug=True, - threaded=True, use_reloader=False): + def __init__(self, model, port, **kwargs): """Initializes and HTTP server from a serialized Keras model a parallelisation mode and a port to run the Flask application on. 
In hogwild mode no read- or write-locks will be acquired, in asynchronous @@ -60,20 +59,18 @@ def __init__(self, model, mode, port=4000, debug=True, :param use_reloader: boolean, Flask `use_reloader` argument """ - self.master_network = dict_to_model(model) - self.mode = mode + super().__init__(model, port, **kwargs) + self.mode = kwargs.get('mode') self.master_url = None - self.port = port - if is_running_in_notebook(): self.threaded = False self.use_reloader = False self.debug = False else: - self.debug = debug - self.threaded = threaded - self.use_reloader = use_reloader + self.debug = kwargs.get("debug", True) + self.threaded = kwargs.get("threaded", True) + self.use_reloader = kwargs.get("use_reloader", False) self.lock = Lock() self.pickled_weights = None @@ -145,7 +142,7 @@ class SocketServer(BaseParameterServer): """ - def __init__(self, model, port=4000): + def __init__(self, model, port, **kwargs): """Initializes a Socket server instance from a serializer Keras model and a port to listen to. @@ -153,8 +150,7 @@ def __init__(self, model, port=4000): :param port: int, port to run the socket on """ - self.model = dict_to_model(model) - self.port = port + super().__init__(model, port, **kwargs) self.socket = None self.runs = False self.connections = [] @@ -175,7 +171,9 @@ def stop(self): def start_server(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sock.bind(('0.0.0.0', self.port)) + master_url = determine_master(port=self.port).split(':')[0] + host = master_url.split(':')[0] + sock.bind((host, self.port)) sock.listen(5) self.socket = sock self.runs = True @@ -190,7 +188,8 @@ def stop_server(self): self.socket.close() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: - sock.connect(("localhost", self.port)) + host = determine_master(port=self.port).split(':')[0] + sock.connect((host, self.port)) sock.close() except Exception: pass @@ -200,14 +199,17 @@ def stop_server(self): def update_parameters(self, conn): data = receive(conn) delta = data['delta'] - with self.lock: - weights = self.model.get_weights() + delta - self.model.set_weights(weights) + weights = self.master_network.get_weights() + self.lock.acquire_write() + # apply the gradient + self.master_network.set_weights(subtract_params(weights, delta)) + self.lock.release() def get_parameters(self, conn): - with self.lock: - weights = self.model.get_weights() + self.lock.acquire_read() + weights = self.master_network.get_weights() send(conn, weights) + self.lock.release() def action_listener(self, conn): while self.runs: @@ -216,15 +218,12 @@ def action_listener(self, conn): self.update_parameters(conn) elif get_or_update == 'g': self.get_parameters(conn) - else: - raise ValueError('Received invalid action') def run(self): while self.runs: - try: - conn, addr = self.socket.accept() - thread = Thread(target=self.action_listener, args=(conn, addr)) - thread.start() - self.connections.append(thread) - except Exception: - print("Failed to set up socket connection.") + conn, addr = self.socket.accept() + thread = Thread(target=self.action_listener, args=(conn, )) + thread.start() + self.connections.append(thread) + + diff --git a/elephas/spark_model.py b/elephas/spark_model.py index 24ed6b8..b64e9dd 100644 --- a/elephas/spark_model.py +++ b/elephas/spark_model.py @@ -1,20 +1,16 @@ -from __future__ import absolute_import -from __future__ import print_function - import pyspark import h5py import json -from keras.optimizers import serialize as 
serialize_optimizer -from keras.optimizers import get as get_optimizer -from keras.models import load_model +from tensorflow.keras.optimizers import serialize as serialize_optimizer +from tensorflow.keras.optimizers import get as get_optimizer +from tensorflow.keras.models import load_model +from .parameter.factory import ClientServerFactory from .utils import subtract_params from .utils import lp_to_simple_rdd from .utils import model_to_dict from .mllib import to_matrix, from_matrix, to_vector, from_vector from .worker import AsynchronousSparkWorker, SparkWorker -from .parameter import HttpServer, SocketServer -from .parameter import HttpClient, SocketClient class SparkModel(object): @@ -41,7 +37,7 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_ser if not hasattr(model, "loss"): raise Exception( "Compile your Keras model before initializing an Elephas model with it") - metrics = model.metrics + metrics = [metric.name for metric in model.metrics] loss = model.loss optimizer = serialize_optimizer(model.optimizer) @@ -65,23 +61,9 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_ser self.serialized_model = model_to_dict(model) if self.mode is not 'synchronous': - if self.parameter_server_mode == 'http': - self.parameter_server = HttpServer( - self.serialized_model, self.mode, self.port) - self.client = HttpClient(self.port) - elif self.parameter_server_mode == 'socket': - self.parameter_server = SocketServer(self.serialized_model) - self.client = SocketClient() - else: - raise ValueError("Parameter server mode has to be either `http` or `socket`, " - "got {}".format(self.parameter_server_mode)) - - @staticmethod - def get_train_config(epochs, batch_size, verbose, validation_split): - return {'epochs': epochs, - 'batch_size': batch_size, - 'verbose': verbose, - 'validation_split': validation_split} + factory = ClientServerFactory.get_factory(self.parameter_server_mode) + self.parameter_server = factory.create_server(self.serialized_model, self.port, mode=self.mode) + self.client = factory.create_client(self.port) def get_config(self): base_config = { @@ -131,8 +113,7 @@ def predict_classes(self, data): """ return self._master_network.predict_classes(data) - def fit(self, rdd, epochs=10, batch_size=32, - verbose=0, validation_split=0.1): + def fit(self, rdd, **kwargs): """ Train an elephas model on an RDD. 
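One easy-to-miss change in `SparkModel.__init__`: with `tensorflow.keras`, a compiled model's `metrics` attribute holds `Metric` objects rather than the strings passed to `compile`, so only the metric names are recorded before being shipped to workers. A minimal sketch of the metadata that gets extracted (exact contents of `model.metrics` vary between TensorFlow versions):

```python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import serialize as serialize_optimizer

model = Sequential([Dense(10, input_dim=4, activation='softmax')])
model.compile(optimizer='sgd', loss='categorical_crossentropy', metrics=['acc'])

metrics = [metric.name for metric in model.metrics]   # plain strings, safe to pickle
loss = model.loss                                      # 'categorical_crossentropy'
optimizer = serialize_optimizer(model.optimizer)       # config dict, rebuilt on workers
```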
The Keras model configuration as specified in the elephas model is sent to Spark workers, abd each worker will be trained @@ -149,12 +130,12 @@ def fit(self, rdd, epochs=10, batch_size=32, rdd = rdd.repartition(self.num_workers) if self.mode in ['asynchronous', 'synchronous', 'hogwild']: - self._fit(rdd, epochs, batch_size, verbose, validation_split) + self._fit(rdd, **kwargs) else: raise ValueError( "Choose from one of the modes: asynchronous, synchronous or hogwild") - def _fit(self, rdd, epochs, batch_size, verbose, validation_split): + def _fit(self, rdd, **kwargs): """Protected train method to make wrapping of modes easier """ self._master_network.compile(optimizer=get_optimizer(self.master_optimizer), @@ -162,9 +143,7 @@ def _fit(self, rdd, epochs, batch_size, verbose, validation_split): metrics=self.master_metrics) if self.mode in ['asynchronous', 'hogwild']: self.start_server() - train_config = self.get_train_config( - epochs, batch_size, verbose, validation_split) - mode = self.parameter_server_mode + train_config = kwargs freq = self.frequency optimizer = self.master_optimizer loss = self.master_loss @@ -178,7 +157,7 @@ def _fit(self, rdd, epochs, batch_size, verbose, validation_split): if self.mode in ['asynchronous', 'hogwild']: print('>>> Initialize workers') worker = AsynchronousSparkWorker( - yaml, parameters, mode, train_config, freq, optimizer, loss, metrics, custom) + yaml, parameters, self.client, train_config, freq, optimizer, loss, metrics, custom) print('>>> Distribute load') rdd.mapPartitions(worker.train).collect() print('>>> Async training complete.') diff --git a/elephas/utils/functional_utils.py b/elephas/utils/functional_utils.py index 83a11fb..1b3f478 100644 --- a/elephas/utils/functional_utils.py +++ b/elephas/utils/functional_utils.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import numpy as np from six.moves import zip @@ -11,10 +9,7 @@ def add_params(param_list_left, param_list_right): :param param_list_right: list of numpy arrays :return: list of numpy arrays """ - res = [] - for x, y in zip(param_list_left, param_list_right): - res.append(x + y) - return res + return [x + y for x, y in zip(param_list_left, param_list_right)] def subtract_params(param_list_left, param_list_right): @@ -24,10 +19,7 @@ def subtract_params(param_list_left, param_list_right): :param param_list_right: list of numpy arrays :return: list of numpy arrays """ - res = [] - for x, y in zip(param_list_left, param_list_right): - res.append(x - y) - return res + return [x - y for x, y in zip(param_list_left, param_list_right)] def get_neutral(array_list): @@ -37,10 +29,7 @@ def get_neutral(array_list): :param array_list: list of numpy arrays :return: list of zeros of same shape as input """ - res = [] - for x in array_list: - res.append(np.zeros_like(x)) - return res + return [np.zeros_like(x) for x in array_list] def divide_by(array_list, num_workers): @@ -50,6 +39,4 @@ def divide_by(array_list, num_workers): :param num_workers: :return: """ - for i, x in enumerate(array_list): - array_list[i] /= num_workers - return array_list + return [x / num_workers for x in array_list] diff --git a/elephas/utils/rdd_utils.py b/elephas/utils/rdd_utils.py index 1765dbc..f4c704f 100644 --- a/elephas/utils/rdd_utils.py +++ b/elephas/utils/rdd_utils.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - from pyspark.mllib.regression import LabeledPoint import numpy as np diff --git a/elephas/utils/rwlock.py b/elephas/utils/rwlock.py index f1e1e86..d4ceaf5 100644 --- 
a/elephas/utils/rwlock.py +++ b/elephas/utils/rwlock.py @@ -2,7 +2,6 @@ Many readers can hold the lock XOR one and only one writer http://majid.info/blog/a-reader-writer-lock-for-python/ """ -from __future__ import absolute_import import threading version = """$Id: 04-1.html,v 1.3 2006/12/05 17:45:12 majid Exp $""" diff --git a/elephas/utils/serialization.py b/elephas/utils/serialization.py index db5bc04..957d7ee 100644 --- a/elephas/utils/serialization.py +++ b/elephas/utils/serialization.py @@ -1,4 +1,4 @@ -from keras.models import model_from_json +from tensorflow.keras.models import model_from_json def model_to_dict(model): diff --git a/elephas/utils/sockets.py b/elephas/utils/sockets.py index 89541eb..23e39e1 100644 --- a/elephas/utils/sockets.py +++ b/elephas/utils/sockets.py @@ -30,7 +30,7 @@ def _receive_all(socket, num_bytes): :return: received data """ - buffer = '' + buffer = b'' buffer_size = 0 bytes_left = num_bytes while buffer_size < num_bytes: diff --git a/elephas/worker.py b/elephas/worker.py index 4e142d0..62480f9 100644 --- a/elephas/worker.py +++ b/elephas/worker.py @@ -1,11 +1,11 @@ import numpy as np from itertools import tee -from keras.utils.generic_utils import slice_arrays -from keras.models import model_from_yaml -from keras.optimizers import get as get_optimizer +from tensorflow.keras.models import model_from_yaml +from tensorflow.keras.optimizers import get as get_optimizer +from tensorflow.python.keras.utils.generic_utils import slice_arrays from .utils import subtract_params -from .parameter import SocketClient, HttpClient +from .parameter import BaseParameterClient class SparkWorker(object): @@ -53,16 +53,15 @@ class AsynchronousSparkWorker(object): """Asynchronous Spark worker. This code will be executed on workers. """ - def __init__(self, yaml, parameters, parameter_server_mode, train_config, frequency, + def __init__(self, yaml, parameters, client, train_config, frequency, master_optimizer, master_loss, master_metrics, custom_objects): - if parameter_server_mode == 'http': - self.client = HttpClient() - elif parameter_server_mode == 'socket': - self.client = SocketClient() + if isinstance(client, BaseParameterClient): + # either supply a client object directly + self.client = client else: - raise ValueError("Parameter server mode has to be either `http` or `socket`, " - "got {}".format(parameter_server_mode)) + # or a string to create a client + self.client = BaseParameterClient.get_client(client) self.train_config = train_config self.frequency = frequency @@ -85,7 +84,6 @@ def train(self, data_iterator): if x_train.size == 0: return - optimizer = get_optimizer(self.master_optimizer) self.model = model_from_yaml(self.yaml, self.custom_objects) self.model.compile(optimizer=get_optimizer(self.master_optimizer), loss=self.master_loss, metrics=self.master_metrics) diff --git a/examples/Spark_ML_Pipeline.ipynb b/examples/Spark_ML_Pipeline.ipynb index ed8be8e..0522ea9 100644 --- a/examples/Spark_ML_Pipeline.ipynb +++ b/examples/Spark_ML_Pipeline.ipynb @@ -301,9 +301,9 @@ }, "outputs": [], "source": [ - "from keras.models import Sequential\n", - "from keras.layers.core import Dense, Dropout, Activation\n", - "from keras.utils import np_utils, generic_utils\n", + "from tensorflow.keras.models import Sequential\n", + "from tensorflow.keras.layers import Dense, Dropout, Activation\n", + "from tensorflow.keras.utils import to_categorical, generic_utils\n", "\n", "nb_classes = train_df.select(\"category\").distinct().count()\n", "input_dim = 
len(train_df.select(\"features\").first()[0])\n", @@ -355,7 +355,7 @@ ], "source": [ "from elephas.ml_model import ElephasEstimator\n", - "from keras import optimizers\n", + "from tensorflow.keras import optimizers\n", "\n", "\n", "adam = optimizers.Adam(lr=0.01)\n", diff --git a/examples/basic_import.py b/examples/basic_import.py index 4556052..0889667 100644 --- a/examples/basic_import.py +++ b/examples/basic_import.py @@ -1,6 +1,6 @@ from elephas.java import java_classes, adapter -from keras.models import Sequential -from keras.layers import Dense +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense model = Sequential() diff --git a/examples/elephas_import.py b/examples/elephas_import.py index 5af85e7..9d8802d 100644 --- a/examples/elephas_import.py +++ b/examples/elephas_import.py @@ -2,7 +2,7 @@ from elephas.dl4j import ParameterAveragingModel from elephas.utils import rdd_utils import keras -from keras.utils import np_utils +from tensorflow.keras.utils import to_categorical def main(): @@ -27,8 +27,8 @@ def main(): x_test = x_test.astype("float64") # Convert class vectors to binary class matrices - y_train = np_utils.to_categorical(y_train, 10) - y_test = np_utils.to_categorical(y_test, 10) + y_train = to_categorical(y_train, 10) + y_test = to_categorical(y_test, 10) y_train = y_train.astype("float64") y_test = y_test.astype("float64") x_train /= 255 diff --git a/examples/hyperparam_optimization.py b/examples/hyperparam_optimization.py index b94d133..18f6adb 100644 --- a/examples/hyperparam_optimization.py +++ b/examples/hyperparam_optimization.py @@ -14,8 +14,8 @@ def data(): used in model function below. This function is separated from model() so that hyperopt won't reload data for each evaluation run. """ - from keras.datasets import mnist - from keras.utils import np_utils + from tensorflow.keras.datasets import mnist + from tensorflow.keras.utils import to_categorical (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train = x_train.reshape(60000, 784) x_test = x_test.reshape(10000, 784) @@ -24,8 +24,8 @@ def data(): x_train /= 255 x_test /= 255 nb_classes = 10 - y_train = np_utils.to_categorical(y_train, nb_classes) - y_test = np_utils.to_categorical(y_test, nb_classes) + y_train = to_categorical(y_train, nb_classes) + y_test = to_categorical(y_test, nb_classes) return x_train, y_train, x_test, y_test @@ -39,9 +39,9 @@ def model(x_train, y_train, x_test, y_test): The last one is optional, though recommended, namely: - model: specify the model just created so that we can later use it again. 
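The same utility substitution recurs throughout the notebook, examples and tests in this diff: the `np_utils` module path is dropped and `to_categorical` is imported directly from `tensorflow.keras.utils`. The call itself is unchanged:

```python
import numpy as np
from tensorflow.keras.utils import to_categorical   # was: from keras.utils import np_utils

y = np.array([0, 2, 1])
print(to_categorical(y, num_classes=3))
# [[1. 0. 0.]
#  [0. 0. 1.]
#  [0. 1. 0.]]
```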
""" - from keras.models import Sequential - from keras.layers.core import Dense, Dropout, Activation - from keras.optimizers import RMSprop + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Dense, Dropout, Activation + from tensorflow.keras.optimizers import RMSprop keras_model = Sequential() keras_model.add(Dense(512, input_shape=(784,))) diff --git a/examples/ml_mlp.py b/examples/ml_mlp.py index 6c7b029..ec9581b 100644 --- a/examples/ml_mlp.py +++ b/examples/ml_mlp.py @@ -1,11 +1,8 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.utils import np_utils -from keras import optimizers +from tensorflow.keras.datasets import mnist +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense, Dropout, Activation +from tensorflow.keras.utils import to_categorical +from tensorflow.keras import optimizers from elephas.ml_model import ElephasEstimator from elephas.ml.adapter import to_data_frame @@ -33,8 +30,8 @@ print(x_test.shape[0], 'test samples') # Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) +y_train = to_categorical(y_train, nb_classes) +y_test = to_categorical(y_test, nb_classes) model = Sequential() model.add(Dense(128, input_dim=784)) diff --git a/examples/ml_pipeline_otto.py b/examples/ml_pipeline_otto.py index 3341915..dce0fd2 100644 --- a/examples/ml_pipeline_otto.py +++ b/examples/ml_pipeline_otto.py @@ -1,6 +1,3 @@ -from __future__ import print_function -from __future__ import absolute_import - from pyspark.ml.linalg import Vectors import numpy as np import random @@ -10,9 +7,9 @@ from pyspark.ml.feature import StringIndexer, StandardScaler from pyspark.ml import Pipeline -from keras import optimizers -from keras.models import Sequential -from keras.layers import Dense, Dropout, Activation +from tensorflow.keras import optimizers +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense, Dropout, Activation from elephas.ml_model import ElephasEstimator diff --git a/examples/mllib_mlp.py b/examples/mllib_mlp.py index 70b1df5..f575b0c 100644 --- a/examples/mllib_mlp.py +++ b/examples/mllib_mlp.py @@ -1,11 +1,8 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import RMSprop -from keras.utils import np_utils +from tensorflow.keras.datasets import mnist +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense, Dropout, Activation +from tensorflow.keras.optimizers import RMSprop +from tensorflow.keras.utils import to_categorical from elephas.spark_model import SparkMLlibModel from elephas.utils.rdd_utils import to_labeled_point @@ -30,8 +27,8 @@ print(x_test.shape[0], 'test samples') # Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) +y_train = to_categorical(y_train, nb_classes) +y_test = to_categorical(y_test, nb_classes) model = Sequential() model.add(Dense(128, input_dim=784)) @@ -54,7 +51,7 @@ # Build RDD from numpy features and labels lp_rdd = to_labeled_point(sc, x_train, 
y_train, categorical=True) -# Initialize SparkModel from Keras model and Spark context +# Initialize SparkModel from tensorflow.keras model and Spark context spark_model = SparkMLlibModel(model=model, frequency='epoch', mode='synchronous') # Train Spark model diff --git a/examples/mnist_mlp_spark.py b/examples/mnist_mlp_spark.py index 34fff8d..b5f7a68 100644 --- a/examples/mnist_mlp_spark.py +++ b/examples/mnist_mlp_spark.py @@ -1,11 +1,8 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import SGD -from keras.utils import np_utils +from tensorflow.keras.datasets import mnist +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense, Dropout, Activation +from tensorflow.keras.optimizers import SGD +from tensorflow.keras.utils import to_categorical from elephas.spark_model import SparkModel from elephas.utils.rdd_utils import to_simple_rdd @@ -34,8 +31,8 @@ print(x_test.shape[0], 'test samples') # Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) +y_train = to_categorical(y_train, nb_classes) +y_test = to_categorical(y_test, nb_classes) model = Sequential() model.add(Dense(128, input_dim=784)) @@ -53,7 +50,7 @@ # Build RDD from numpy features and labels rdd = to_simple_rdd(sc, x_train, y_train) -# Initialize SparkModel from Keras model and Spark context +# Initialize SparkModel from tensorflow.keras model and Spark context spark_model = SparkModel(model, frequency='epoch', mode='asynchronous') # Train Spark model diff --git a/examples/mnist_mlp_spark_synchronous.py b/examples/mnist_mlp_spark_synchronous.py index 10a89c7..0798d3e 100644 --- a/examples/mnist_mlp_spark_synchronous.py +++ b/examples/mnist_mlp_spark_synchronous.py @@ -1,11 +1,8 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import SGD -from keras.utils import np_utils +from tensorflow.keras.datasets import mnist +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Dense, Dropout, Activation +from tensorflow.keras.optimizers import SGD +from tensorflow.keras.utils import to_categorical from elephas.spark_model import SparkModel from elephas.utils.rdd_utils import to_simple_rdd @@ -34,8 +31,8 @@ print(x_test.shape[0], 'test samples') # Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) +y_train = to_categorical(y_train, nb_classes) +y_test = to_categorical(y_test, nb_classes) model = Sequential() model.add(Dense(128, input_dim=784)) @@ -53,7 +50,7 @@ # Build RDD from numpy features and labels rdd = to_simple_rdd(sc, x_train, y_train) -# Initialize SparkModel from Keras model and Spark context +# Initialize SparkModel from tensorflow.keras model and Spark context spark_model = SparkModel(model, mode='synchronous') # Train Spark model diff --git a/release.sh b/release.sh index 8f5d122..5b66bea 100755 --- a/release.sh +++ b/release.sh @@ -3,8 +3,7 @@ # remove old wheels sudo rm -rf dist/* -# Build Python 2 & 3 wheels for current version -sudo python2 setup.py sdist bdist_wheel +# Build Python 
3 wheels for current version sudo python3 setup.py sdist bdist_wheel # Upload to PyPI with twine. Needs full "skymind" credentials in ~/.pypirc diff --git a/requirements.txt b/requirements.txt index bca255e..3c1441d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,7 @@ Flask==1.0.2 hyperas==0.4 -Keras==2.2.2 -Keras-Applications==1.0.4 -Keras-Preprocessing==1.0.2 -numpy==1.14.5 -pyspark==2.4.0 -six==1.11.0 -tensorflow==1.15.4 -networkx==1.1 -pydl4j>=0.1.3 \ No newline at end of file +Keras==2.2.5 +pyspark +tensorflow<2.2 +pydl4j>=0.1.3 +h5py==2.10.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 9d5ec19..ae3ee46 100644 --- a/setup.py +++ b/setup.py @@ -1,18 +1,24 @@ -from __future__ import absolute_import from setuptools import setup from setuptools import find_packages setup(name='elephas', - version='0.4.3', + version='1.0.0', description='Deep learning on Spark with Keras', - url='http://github.com/maxpumperla/elephas', - download_url='https://github.com/maxpumperla/elephas/tarball/0.4.3', - author='Max Pumperla', - author_email='max.pumperla@googlemail.com', - install_requires=['cython', 'tensorflow', 'keras', 'hyperas', 'flask', 'six', 'pyspark'], + url='http://github.com/danielenricocahall/elephas', + download_url='https://github.com/danielenricocahall/elephas/tarball/1.0.0', + author='Daniel Cahall', + author_email='danielenricocahall@gmail.com', + install_requires=['cython', + 'tensorflow<2.2', + 'keras==2.2.5', + 'hyperas', + 'flask', + 'six', + 'h5py==2.10.0', + 'pyspark<=3.1'], extras_require={ 'java': ['pydl4j>=0.1.3'], - 'tests': ['pytest', 'pytest-pep8', 'pytest-cov', 'mock'] + 'tests': ['pytest', 'pytest-pep8', 'pytest-cov', 'pytest-spark', 'mock'] }, packages=find_packages(), license='MIT', @@ -23,7 +29,5 @@ 'Environment :: Console', 'License :: OSI Approved :: Apache Software License', 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 3' ]) diff --git a/tests/conftest.py b/tests/conftest.py index 961724a..416cb99 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,41 +1,64 @@ -from pyspark import SparkContext, SparkConf -from pyspark.sql import SQLContext import pytest -import logging - - -def quiet_py4j(): - """ turn down spark logging for the test context """ - logger = logging.getLogger('py4j') - logger.setLevel(logging.WARN) - - -@pytest.fixture(scope="session") -def spark_context(request): - """ fixture for creating a SparkContext - Args: - request: pytest.FixtureRequest object - """ - conf = (SparkConf().setMaster("local[2]").setAppName( - "pytest-pyspark-local-testing")) - sc = SparkContext(conf=conf) - request.addfinalizer(lambda: sc.stop()) - - quiet_py4j() - return sc - - -@pytest.fixture(scope="session") -def sql_context(request): - """ fixture for creating a Spark SQLContext - Args: - request: pytest.FixtureRequest object - """ - conf = (SparkConf().setMaster("local[2]").setAppName( - "pytest-pyspark-local-testing")) - sc = SparkContext(conf=conf) - sql_context = SQLContext(sc) - request.addfinalizer(lambda: sc.stop()) - - quiet_py4j() - return sql_context +from tensorflow.keras import Sequential, Input, Model +from tensorflow.keras.datasets import mnist, boston_housing +from tensorflow.keras.layers import Dense, Activation, Dropout +from tensorflow.keras.utils import to_categorical + + +@pytest.fixture +def classification_model(): + model = Sequential() + model.add(Dense(128, input_dim=784)) + 
model.add(Activation('relu')) + model.add(Dropout(0.2)) + model.add(Dense(128)) + model.add(Activation('relu')) + model.add(Dropout(0.2)) + model.add(Dense(10)) + model.add(Activation('softmax')) + return model + + +@pytest.fixture +def regression_model(): + model = Sequential() + model.add(Dense(64, activation='relu', input_shape=(13,))) + model.add(Dense(64, activation='relu')) + model.add(Dense(1, activation='linear')) + return model + + +@pytest.fixture +def classification_model_functional(): + input_layer = Input(shape=(784, )) + hidden = Dense(128, activation='relu')(input_layer) + dropout = Dropout(0.2)(hidden) + hidden2 = Dense(128, activation='relu')(dropout) + dropout2 = Dropout(0.2)(hidden2) + output = Dense(10, activation='softmax')(dropout2) + model = Model(inputs=input_layer, outputs=output) + return model + + +@pytest.fixture(scope='session') +def mnist_data(): + (x_train, y_train), (x_test, y_test) = mnist.load_data() + + x_train = x_train.reshape(60000, 784) + x_test = x_test.reshape(10000, 784) + x_train = x_train.astype("float32") + x_test = x_test.astype("float32") + x_train /= 255 + x_test /= 255 + + # Convert class vectors to binary class matrices + y_train = to_categorical(y_train, 10) + y_test = to_categorical(y_test, 10) + return x_train, y_train, x_test, y_test + + +@pytest.fixture(scope='session') +def boston_housing_dataset(): + (x_train, y_train), (x_test, y_test) = boston_housing.load_data() + return x_train, y_train, x_test, y_test + diff --git a/tests/integration/test_mnist_mlp_spark.py b/tests/integration/test_mnist_mlp_spark.py deleted file mode 100644 index 02d31f3..0000000 --- a/tests/integration/test_mnist_mlp_spark.py +++ /dev/null @@ -1,67 +0,0 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import SGD -from keras.utils import np_utils - -from elephas.spark_model import SparkModel -from elephas.utils.rdd_utils import to_simple_rdd - -import pytest -pytest.mark.usefixtures("spark_context") - - -def test_async_mode(spark_context): - # Define basic parameters - batch_size = 64 - nb_classes = 10 - epochs = 1 - - # Load data - (x_train, y_train), (x_test, y_test) = mnist.load_data() - - x_train = x_train.reshape(60000, 784) - x_test = x_test.reshape(10000, 784) - x_train = x_train.astype("float32") - x_test = x_test.astype("float32") - x_train /= 255 - x_test /= 255 - print(x_train.shape[0], 'train samples') - print(x_test.shape[0], 'test samples') - - # Convert class vectors to binary class matrices - y_train = np_utils.to_categorical(y_train, nb_classes) - y_test = np_utils.to_categorical(y_test, nb_classes) - - model = Sequential() - model.add(Dense(128, input_dim=784)) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(128)) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(10)) - model.add(Activation('softmax')) - - sgd = SGD(lr=0.1) - model.compile(sgd, 'categorical_crossentropy', ['acc']) - - # Build RDD from numpy features and labels - rdd = to_simple_rdd(spark_context, x_train, y_train) - - # Initialize SparkModel from Keras model and Spark context - spark_model = SparkModel(model, frequency='epoch', mode='asynchronous') - - # Train Spark model - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, - verbose=0, validation_split=0.1) - # Evaluate Spark model by evaluating the underlying model - score = 
spark_model.master_network.evaluate(x_test, y_test, verbose=2) - assert score[1] >= 0.7 - - -if __name__ == '__main__': - pytest.main([__file__]) diff --git a/tests/integration/test_mnist_mlp_spark_synchronous.py b/tests/integration/test_mnist_mlp_spark_synchronous.py deleted file mode 100644 index 47fa542..0000000 --- a/tests/integration/test_mnist_mlp_spark_synchronous.py +++ /dev/null @@ -1,70 +0,0 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import SGD -from keras.utils import np_utils - -from elephas.spark_model import SparkModel -from elephas.utils.rdd_utils import to_simple_rdd - -from pyspark import SparkContext, SparkConf - -import pytest -pytest.mark.usefixtures("spark_context") - - -def test_sync_mode(spark_context): - # Define basic parameters - batch_size = 64 - nb_classes = 10 - epochs = 10 - - # Load data - (x_train, y_train), (x_test, y_test) = mnist.load_data() - - x_train = x_train.reshape(60000, 784) - x_test = x_test.reshape(10000, 784) - x_train = x_train.astype("float32") - x_test = x_test.astype("float32") - x_train /= 255 - x_test /= 255 - print(x_train.shape[0], 'train samples') - print(x_test.shape[0], 'test samples') - - # Convert class vectors to binary class matrices - y_train = np_utils.to_categorical(y_train, nb_classes) - y_test = np_utils.to_categorical(y_test, nb_classes) - - model = Sequential() - model.add(Dense(128, input_dim=784)) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(128)) - model.add(Activation('relu')) - model.add(Dropout(0.2)) - model.add(Dense(10)) - model.add(Activation('softmax')) - - sgd = SGD(lr=0.1) - model.compile(sgd, 'categorical_crossentropy', ['acc']) - - # Build RDD from numpy features and labels - rdd = to_simple_rdd(spark_context, x_train, y_train) - - # Initialize SparkModel from Keras model and Spark context - spark_model = SparkModel(model, mode='synchronous') - - # Train Spark model - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, - verbose=2, validation_split=0.1) - - # Evaluate Spark model by evaluating the underlying model - score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) - assert score[1] >= 0.70 - - -if __name__ == '__main__': - pytest.main([__file__]) diff --git a/tests/integration/test_training.py b/tests/integration/test_training.py new file mode 100644 index 0000000..d7c7041 --- /dev/null +++ b/tests/integration/test_training.py @@ -0,0 +1,83 @@ +from tensorflow.keras.optimizers import SGD + +from elephas.spark_model import SparkModel +from elephas.utils.rdd_utils import to_simple_rdd + +import pytest + + +@pytest.mark.parametrize('mode', ['synchronous', 'asynchronous', 'hogwild']) +def test_training_modes_classification(spark_context, mode, mnist_data, classification_model): + # Define basic parameters + batch_size = 64 + epochs = 10 + + # Load data + x_train, y_train, x_test, y_test = mnist_data + x_train = x_train[:1000] + y_train = y_train[:1000] + + sgd = SGD(lr=0.1) + classification_model.compile(sgd, 'categorical_crossentropy', ['acc']) + + # Build RDD from numpy features and labels + rdd = to_simple_rdd(spark_context, x_train, y_train) + + # Initialize SparkModel from keras model and Spark context + spark_model = SparkModel(classification_model, frequency='epoch', mode=mode) + + # Train Spark model + spark_model.fit(rdd, epochs=epochs, 
batch_size=batch_size, + verbose=0, validation_split=0.1) + # Evaluate Spark model by evaluating the underlying model + score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) + + assert score + + +@pytest.mark.parametrize('mode', ['synchronous', 'asynchronous', 'hogwild']) +def test_training_modes_regression(spark_context, mode, boston_housing_dataset, regression_model): + x_train, y_train, x_test, y_test = boston_housing_dataset + rdd = to_simple_rdd(spark_context, x_train, y_train) + + # Define basic parameters + batch_size = 64 + epochs = 10 + sgd = SGD(lr=0.0000001) + regression_model.compile(sgd, 'mse', ['mae']) + spark_model = SparkModel(regression_model, frequency='epoch', mode=mode) + + # Train Spark model + spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, + verbose=0, validation_split=0.1) + score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) + assert score + + +@pytest.mark.parametrize('mode', ['asynchronous', 'hogwild']) +def test_training_asynchronous_socket(spark_context, mode, mnist_data, classification_model): + # Define basic parameters + batch_size = 64 + epochs = 10 + # Load data + x_train, y_train, x_test, y_test = mnist_data + x_train = x_train[:1000] + y_train = y_train[:1000] + + sgd = SGD(lr=0.1) + classification_model.compile(sgd, 'categorical_crossentropy', ['acc']) + + # Build RDD from numpy features and labels + rdd = to_simple_rdd(spark_context, x_train, y_train) + + # Initialize SparkModel from keras model and Spark context + spark_model = SparkModel(classification_model, frequency='epoch', + mode=mode, parameter_server_mode='socket', port=4001) + + # Train Spark model + spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, + verbose=0, validation_split=0.1) + # Evaluate Spark model by evaluating the underlying model + score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) + + assert score diff --git a/tests/ml/test_adapter.py b/tests/ml/test_adapter.py index cfb4db3..89272f5 100644 --- a/tests/ml/test_adapter.py +++ b/tests/ml/test_adapter.py @@ -1,7 +1,5 @@ import numpy as np from elephas.ml import adapter -import pytest -pytest.mark.usefixtures("spark_context") def test_to_data_frame(spark_context): diff --git a/tests/parameter/test_client.py b/tests/parameter/test_client.py index 96c610a..70a87f2 100644 --- a/tests/parameter/test_client.py +++ b/tests/parameter/test_client.py @@ -1 +1,12 @@ -# TODO test clients +from unittest.mock import patch + +import pytest + +from elephas.parameter import BaseParameterClient, HttpClient, SocketClient + + +@pytest.mark.parametrize('client_type, obj', [('http', HttpClient), + ('socket', SocketClient)]) +def test_client_factory_method(client_type, obj): + with patch('elephas.parameter.client.socket'): + assert type(BaseParameterClient.get_client(client_type, 4000)) == obj diff --git a/tests/test_hyperparam.py b/tests/test_hyperparam.py index 3942094..51a5422 100644 --- a/tests/test_hyperparam.py +++ b/tests/test_hyperparam.py @@ -1,16 +1,13 @@ -import pytest from hyperopt import STATUS_OK from hyperas.distributions import choice, uniform import six.moves.cPickle as pickle from elephas.hyperparam import HyperParamModel -pytest.mark.usefixtures("spark_context") - def data(): - from keras.datasets import mnist - from keras.utils import np_utils + from tensorflow.keras.datasets import mnist + from tensorflow.keras.utils import to_categorical (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train = x_train.reshape(60000, 784) x_test = 
x_test.reshape(10000, 784) @@ -19,15 +16,15 @@ def data(): x_train /= 255 x_test /= 255 nb_classes = 10 - y_train = np_utils.to_categorical(y_train, nb_classes) - y_test = np_utils.to_categorical(y_test, nb_classes) + y_train = to_categorical(y_train, nb_classes) + y_test = to_categorical(y_test, nb_classes) return x_train, y_train, x_test, y_test def model(x_train, y_train, x_test, y_test): - from keras.models import Sequential - from keras.layers.core import Dense, Dropout, Activation - from keras.optimizers import RMSprop + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Dense, Dropout, Activation + from tensorflow.keras.optimizers import RMSprop keras_model = Sequential() keras_model.add(Dense(512, input_shape=(784,))) @@ -49,11 +46,10 @@ def model(x_train, y_train, x_test, y_test): verbose=2, validation_data=(x_test, y_test)) score, acc = keras_model.evaluate(x_test, y_test, verbose=0) - print('Test accuracy:', acc) return {'loss': -acc, 'status': STATUS_OK, 'model': keras_model.to_yaml(), 'weights': pickle.dumps(keras_model.get_weights())} def test_hyper_param_model(spark_context): hyperparam_model = HyperParamModel(spark_context) - hyperparam_model.minimize(model=model, data=data, max_evals=1) + assert hyperparam_model.minimize(model=model, data=data, max_evals=1) diff --git a/tests/test_ml_model.py b/tests/test_ml_model.py index 0eb9ee3..5dec0ed 100644 --- a/tests/test_ml_model.py +++ b/tests/test_ml_model.py @@ -1,71 +1,40 @@ -from __future__ import absolute_import -from __future__ import print_function - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.utils import np_utils -from keras import optimizers +import pytest +from tensorflow.keras import optimizers -from elephas.ml_model import ElephasEstimator, load_ml_estimator, ElephasTransformer, load_ml_transformer +from elephas.ml_model import ElephasEstimator, load_ml_estimator, ElephasTransformer, load_ml_transformer, \ + ModelType, LossModelTypeMapper from elephas.ml.adapter import to_data_frame -from pyspark.mllib.evaluation import MulticlassMetrics +from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics from pyspark.ml import Pipeline -import pytest -pytest.mark.usefixtures("spark_context") - -# Define basic parameters -batch_size = 64 -nb_classes = 10 -epochs = 1 - -# Load data -(x_train, y_train), (x_test, y_test) = mnist.load_data() - -x_train = x_train.reshape(60000, 784)[:1000] -x_test = x_test.reshape(10000, 784) -x_train = x_train.astype("float32") -x_test = x_test.astype("float32") -x_train /= 255 -x_test /= 255 -print(x_train.shape[0], 'train samples') -print(x_test.shape[0], 'test samples') - -# Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) - -model = Sequential() -model.add(Dense(128, input_dim=784)) -model.add(Activation('relu')) -model.add(Dropout(0.2)) -model.add(Dense(128)) -model.add(Activation('relu')) -model.add(Dropout(0.2)) -model.add(Dense(10)) -model.add(Activation('softmax')) - - -def test_serialization_transformer(): + +def test_serialization_transformer(classification_model): transformer = ElephasTransformer() - transformer.set_keras_model_config(model.to_yaml()) + transformer.set_keras_model_config(classification_model.to_yaml()) transformer.save("test.h5") - load_ml_transformer("test.h5") + loaded_model = load_ml_transformer("test.h5") + assert 
loaded_model.get_model().to_yaml() == classification_model.to_yaml() -def test_serialization_estimator(): +def test_serialization_estimator(classification_model): estimator = ElephasEstimator() - estimator.set_keras_model_config(model.to_yaml()) + estimator.set_keras_model_config(classification_model.to_yaml()) estimator.set_loss("categorical_crossentropy") estimator.save("test.h5") - load_ml_estimator("test.h5") + loaded_model = load_ml_estimator("test.h5") + assert loaded_model.get_model().to_yaml() == classification_model.to_yaml() -def test_spark_ml_model(spark_context): +def test_spark_ml_model_classification(spark_context, classification_model, mnist_data): + batch_size = 64 + nb_classes = 10 + epochs = 1 + x_train, y_train, x_test, y_test = mnist_data + x_train = x_train[:1000] + y_train = y_train[:1000] df = to_data_frame(spark_context, x_train, y_train, categorical=True) test_df = to_data_frame(spark_context, x_test, y_test, categorical=True) @@ -74,7 +43,7 @@ def test_spark_ml_model(spark_context): # Initialize Spark ML Estimator estimator = ElephasEstimator() - estimator.set_keras_model_config(model.to_yaml()) + estimator.set_keras_model_config(classification_model.to_yaml()) estimator.set_optimizer_config(sgd_conf) estimator.set_mode("synchronous") estimator.set_loss("categorical_crossentropy") @@ -96,5 +65,90 @@ def test_spark_ml_model(spark_context): prediction_and_label = pnl.rdd.map(lambda row: (row.label, row.prediction)) metrics = MulticlassMetrics(prediction_and_label) - print(metrics.precision()) - print(metrics.recall()) + print(metrics.accuracy) + + +def test_functional_model(spark_context, classification_model_functional, mnist_data): + batch_size = 64 + epochs = 1 + + x_train, y_train, x_test, y_test = mnist_data + x_train = x_train[:1000] + y_train = y_train[:1000] + df = to_data_frame(spark_context, x_train, y_train, categorical=True) + test_df = to_data_frame(spark_context, x_test, y_test, categorical=True) + + sgd = optimizers.SGD() + sgd_conf = optimizers.serialize(sgd) + estimator = ElephasEstimator() + estimator.set_keras_model_config(classification_model_functional.to_yaml()) + estimator.set_optimizer_config(sgd_conf) + estimator.set_mode("synchronous") + estimator.set_loss("categorical_crossentropy") + estimator.set_metrics(['acc']) + estimator.set_epochs(epochs) + estimator.set_batch_size(batch_size) + estimator.set_validation_split(0.1) + estimator.set_categorical_labels(True) + estimator.set_nb_classes(10) + pipeline = Pipeline(stages=[estimator]) + fitted_pipeline = pipeline.fit(df) + prediction = fitted_pipeline.transform(test_df) + pnl = prediction.select("label", "prediction") + pnl.show(100) + + prediction_and_label = pnl.rdd.map(lambda row: (row.label, row.prediction)) + metrics = MulticlassMetrics(prediction_and_label) + print(metrics.accuracy) + + +def test_regression_model(spark_context, regression_model, boston_housing_dataset): + batch_size = 64 + epochs = 10 + + x_train, y_train, x_test, y_test = boston_housing_dataset + df = to_data_frame(spark_context, x_train, y_train) + test_df = to_data_frame(spark_context, x_test, y_test) + + sgd = optimizers.SGD(lr=0.00001) + sgd_conf = optimizers.serialize(sgd) + estimator = ElephasEstimator() + estimator.set_keras_model_config(regression_model.to_yaml()) + estimator.set_optimizer_config(sgd_conf) + estimator.set_mode("synchronous") + estimator.set_loss("mae") + estimator.set_metrics(['mae']) + estimator.set_epochs(epochs) + estimator.set_batch_size(batch_size) + estimator.set_validation_split(0.01) + 
estimator.set_categorical_labels(False) + + pipeline = Pipeline(stages=[estimator]) + fitted_pipeline = pipeline.fit(df) + prediction = fitted_pipeline.transform(test_df) + pnl = prediction.select("label", "prediction") + pnl.show(100) + + prediction_and_observations = pnl.rdd.map(lambda row: (row.label, row.prediction)) + metrics = RegressionMetrics(prediction_and_observations) + print(metrics.r2) + + +@pytest.mark.parametrize('loss, model_type', [('binary_crossentropy', ModelType.CLASSIFICATION), + ('mean_squared_error', ModelType.REGRESSION), + ('categorical_crossentropy', ModelType.CLASSIFICATION), + ('mean_absolute_error', ModelType.REGRESSION)]) +def test_model_type_mapper(loss, model_type): + assert LossModelTypeMapper().get_model_type(loss) == model_type + + +def test_model_type_mapper_custom(): + LossModelTypeMapper().register_loss('test', ModelType.REGRESSION) + assert LossModelTypeMapper().get_model_type('test') == ModelType.REGRESSION + + +def test_model_type_mapper_custom_callable(): + def custom_loss(y_true, y_pred): + return y_true - y_pred + LossModelTypeMapper().register_loss(custom_loss, ModelType.REGRESSION) + assert LossModelTypeMapper().get_model_type('custom_loss') == ModelType.REGRESSION diff --git a/tests/test_mllib_model.py b/tests/test_mllib_model.py index d2b3006..01afc19 100644 --- a/tests/test_mllib_model.py +++ b/tests/test_mllib_model.py @@ -1,66 +1,40 @@ -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.optimizers import RMSprop -from keras.utils import np_utils + +from tensorflow.keras.optimizers import RMSprop from elephas.spark_model import SparkMLlibModel, load_spark_model from elephas.utils.rdd_utils import to_labeled_point -import pytest -pytest.mark.usefixtures("spark_context") - # Define basic parameters batch_size = 64 nb_classes = 10 epochs = 3 -# Load data -(x_train, y_train), (x_test, y_test) = mnist.load_data() - -x_train = x_train.reshape(60000, 784)[:1000] -x_test = x_test.reshape(10000, 784) -x_train = x_train.astype("float32") -x_test = x_test.astype("float32") -x_train /= 255 -x_test /= 255 -print(x_train.shape[0], 'train samples') -print(x_test.shape[0], 'test samples') - -# Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) - -model = Sequential() -model.add(Dense(128, input_dim=784)) -model.add(Activation('relu')) -model.add(Dropout(0.2)) -model.add(Dense(128)) -model.add(Activation('relu')) -model.add(Dropout(0.2)) -model.add(Dense(10)) -model.add(Activation('softmax')) - # Compile model -rms = RMSprop() -model.compile(rms, 'categorical_crossentropy', ['acc']) -def test_serialization(): +def test_serialization(classification_model): + rms = RMSprop() + classification_model.compile(rms, 'categorical_crossentropy', ['acc']) spark_model = SparkMLlibModel( - model, frequency='epoch', mode='synchronous', num_workers=2) + classification_model, frequency='epoch', mode='synchronous', num_workers=2) spark_model.save("test.h5") - load_spark_model("test.h5") + loaded_model = load_spark_model("test.h5") + assert loaded_model.master_network.to_yaml() -def test_mllib_model(spark_context): +def test_mllib_model(spark_context, classification_model, mnist_data): + rms = RMSprop() + classification_model.compile(rms, 'categorical_crossentropy', ['acc']) + x_train, y_train, x_test, y_test = mnist_data + x_train = x_train[:1000] + y_train = y_train[:1000] # Build 
RDD from numpy features and labels lp_rdd = to_labeled_point(spark_context, x_train, y_train, categorical=True) - # Initialize SparkModel from Keras model and Spark context + # Initialize SparkModel from tensorflow.keras model and Spark context spark_model = SparkMLlibModel( - model=model, frequency='epoch', mode='synchronous') + model=classification_model, frequency='epoch', mode='synchronous') # Train Spark model spark_model.fit(lp_rdd, epochs=5, batch_size=32, verbose=0, @@ -68,4 +42,4 @@ def test_mllib_model(spark_context): # Evaluate Spark model by evaluating the underlying model score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) - print('Test accuracy:', score[1]) + assert score \ No newline at end of file diff --git a/tests/test_model_serialization.py b/tests/test_model_serialization.py index fbed64c..d656f31 100644 --- a/tests/test_model_serialization.py +++ b/tests/test_model_serialization.py @@ -1,30 +1,15 @@ -from __future__ import absolute_import -from __future__ import print_function import pytest -from keras.models import Sequential, Model -from keras.layers import Dense, Dropout, Activation, Input +from tensorflow.keras.models import Model +from tensorflow.keras.layers import Dense, Input from elephas.spark_model import SparkModel -def test_sequential_serialization(): - # Create Spark context - pytest.mark.usefixtures("spark_context") - - seq_model = Sequential() - seq_model.add(Dense(128, input_dim=784)) - seq_model.add(Activation('relu')) - seq_model.add(Dropout(0.2)) - seq_model.add(Dense(128)) - seq_model.add(Activation('relu')) - seq_model.add(Dropout(0.2)) - seq_model.add(Dense(10)) - seq_model.add(Activation('softmax')) - - seq_model.compile( +def test_sequential_serialization(spark_context, classification_model): + classification_model.compile( optimizer="sgd", loss="categorical_crossentropy", metrics=["acc"]) - spark_model = SparkModel(seq_model, frequency='epoch', mode='synchronous') + spark_model = SparkModel(classification_model, frequency='epoch', mode='synchronous') spark_model.save("elephas_sequential.h5") @@ -51,7 +36,7 @@ def test_model_serialization(): @pytest.mark.skip(reason="not feasible on travis right now") def test_java_avg_serde(): - from elephas.dl4j import ParameterAveragingModel, ParameterSharingModel + from elephas.dl4j import ParameterAveragingModel inputs = Input(shape=(784,)) x = Dense(64, activation='relu')(inputs) @@ -73,7 +58,7 @@ def test_java_avg_serde(): @pytest.mark.skip(reason="not feasible on travis right now") def test_java_sharing_serde(): - from elephas.dl4j import ParameterAveragingModel, ParameterSharingModel + from elephas.dl4j import ParameterSharingModel inputs = Input(shape=(784,)) x = Dense(64, activation='relu')(inputs) @@ -91,6 +76,3 @@ def test_java_sharing_serde(): threshold_step=1e-5, collect_stats=False, save_file='temp.h5') spark_model.save("java_param_sharing_model.h5") - -if __name__ == '__main__': - pytest.main([__file__]) diff --git a/tests/test_spark_model.py b/tests/test_spark_model.py deleted file mode 100644 index 2829967..0000000 --- a/tests/test_spark_model.py +++ /dev/null @@ -1,84 +0,0 @@ -from __future__ import absolute_import -from __future__ import print_function -import pytest - -from keras.datasets import mnist -from keras.models import Sequential -from keras.layers.core import Dense, Dropout, Activation -from keras.utils import np_utils - -from elephas.spark_model import SparkModel -from elephas.utils.rdd_utils import to_simple_rdd - - -# Define basic parameters -batch_size = 64 
-nb_classes = 10 -epochs = 1 - -# Create Spark context -pytest.mark.usefixtures("spark_context") - - -# Load data -(x_train, y_train), (x_test, y_test) = mnist.load_data() - -x_train = x_train.reshape(60000, 784) -x_test = x_test.reshape(10000, 784) -x_train = x_train.astype("float32") -x_test = x_test.astype("float32") -x_train /= 255 -x_test /= 255 -print(x_train.shape[0], 'train samples') -print(x_test.shape[0], 'test samples') - -# Convert class vectors to binary class matrices -y_train = np_utils.to_categorical(y_train, nb_classes) -y_test = np_utils.to_categorical(y_test, nb_classes) - -model = Sequential() -model.add(Dense(128, input_dim=784)) -model.add(Activation('relu')) -model.add(Dropout(0.2)) -model.add(Dense(128)) -model.add(Activation('relu')) -model.add(Dropout(0.2)) -model.add(Dense(10)) -model.add(Activation('softmax')) - -model.compile(optimizer="sgd", - loss="categorical_crossentropy", metrics=["acc"]) - - -def test_spark_model_end_to_end(spark_context): - rdd = to_simple_rdd(spark_context, x_train, y_train) - - # sync epoch - spark_model = SparkModel(model, frequency='epoch', - mode='synchronous', num_workers=2) - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, - verbose=2, validation_split=0.1) - score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) - print('Test accuracy:', score[1]) - - # sync batch - spark_model = SparkModel(model, frequency='batch', - mode='synchronous', num_workers=2) - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, - verbose=2, validation_split=0.1) - score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) - print('Test accuracy:', score[1]) - - # async epoch - spark_model = SparkModel(model, frequency='epoch', mode='asynchronous') - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, - verbose=2, validation_split=0.1) - score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) - print('Test accuracy:', score[1]) - - # hog wild epoch - spark_model = SparkModel(model, frequency='epoch', mode='hogwild') - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, - verbose=2, validation_split=0.1) - score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) - print('Test accuracy:', score[1]) diff --git a/tests/test_worker.py b/tests/test_worker.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/utils/test_functional_utils.py b/tests/utils/test_functional_utils.py index a388b91..722f7c1 100644 --- a/tests/utils/test_functional_utils.py +++ b/tests/utils/test_functional_utils.py @@ -2,7 +2,6 @@ import numpy as np from elephas.utils import functional_utils -pytest.mark.usefixtures("spark_context") def test_add_params(): diff --git a/tests/utils/test_rdd_utils.py b/tests/utils/test_rdd_utils.py index 4b96298..7709390 100644 --- a/tests/utils/test_rdd_utils.py +++ b/tests/utils/test_rdd_utils.py @@ -1,9 +1,6 @@ -import pytest import numpy as np from elephas.utils import rdd_utils -pytest.mark.usefixtures("spark_context") - def test_to_simple_rdd(spark_context): features = np.ones((5, 10)) diff --git a/tests/utils/test_serialization.py b/tests/utils/test_serialization.py index a9f7319..c058da4 100644 --- a/tests/utils/test_serialization.py +++ b/tests/utils/test_serialization.py @@ -1,16 +1,19 @@ import pytest -from keras.models import Sequential +from tensorflow.keras.models import Sequential from elephas.utils import serialization def test_model_to_dict(): model = Sequential() + model.build((1,)) dict_model = serialization.model_to_dict(model) - 
assert dict_model.keys() == ['model', 'weights'] + assert list(dict_model.keys()) == ['model', 'weights'] def test_dict_to_model(): model = Sequential() + model.build((1,)) + dict_model = serialization.model_to_dict(model) recovered = serialization.dict_to_model(dict_model)
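
The rewritten tests depend on pytest fixtures (`spark_context`, `classification_model`, `classification_model_functional`, `regression_model`, `mnist_data`, `boston_housing_dataset`) that are referenced throughout but never defined in this diff; they presumably live in a `tests/conftest.py` outside the excerpt. Below is a minimal sketch, under the assumption that three of those fixtures simply repackage the module-level setup code removed from `tests/test_spark_model.py` and `tests/test_mllib_model.py`; the fixture bodies and scoping are illustrative, not the project's actual conftest.

```python
# Hypothetical tests/conftest.py -- NOT part of this diff. A sketch of the
# fixtures the rewritten tests rely on, reconstructed from the module-level
# setup code that the deleted test files used to contain.
import pytest
from pyspark import SparkConf, SparkContext
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.utils import to_categorical


@pytest.fixture(scope='session')
def spark_context():
    # Local Spark context shared across the whole test session.
    conf = SparkConf().setMaster('local[2]').setAppName('elephas-tests')
    sc = SparkContext(conf=conf)
    yield sc
    sc.stop()


@pytest.fixture
def classification_model():
    # The same uncompiled MLP the old tests built at module level.
    model = Sequential()
    model.add(Dense(128, input_dim=784))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(128))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(10))
    model.add(Activation('softmax'))
    return model


@pytest.fixture
def mnist_data():
    # Flattened, normalized MNIST with one-hot labels, mirroring the deleted setup code.
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(60000, 784).astype('float32') / 255
    x_test = x_test.reshape(10000, 784).astype('float32') / 255
    y_train = to_categorical(y_train, 10)
    y_test = to_categorical(y_test, 10)
    return x_train, y_train, x_test, y_test
```

Session-scoping `spark_context` keeps a single JVM-backed context alive across all tests, which matters because creating more than one SparkContext per process fails; the model and data fixtures stay function-scoped so each test gets a fresh, uncompiled network.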