merge danny fork
danielenricocahall committed Jan 14, 2021
2 parents 92ff63b + 4f3b16e commit 588f722
Showing 45 changed files with 617 additions and 656 deletions.
22 changes: 12 additions & 10 deletions .travis.yml
@@ -1,15 +1,18 @@
sudo: required
dist: trusty
dist: bionic
language: python
python:
- "2.7"
- "3.4"
- "3.7"
before_install:
- sudo add-apt-repository -y ppa:openjdk-r/ppa
- sudo apt-get -qq update
- sudo apt-get install -y openjdk-8-jdk --no-install-recommends
- sudo update-java-alternatives -s java-1.8.0-openjdk-amd64
- export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
env:
- SPARK_LOCAL_IP=0.0.0.0
install:
- if [[ "$TRAVIS_PYTHON_VERSION" == "2.7" ]]; then
wget https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh -O miniconda.sh;
else
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh;
fi
- wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh;
- bash miniconda.sh -b -p $HOME/miniconda
- export PATH="$HOME/miniconda/bin:$PATH"
- hash -r
@@ -22,7 +25,6 @@ install:
- pip install -e .[tests]

script:
- python -c "import keras.backend"
- py.test --pep8 -m pep8 tests/
- pytest tests/
after_success:
- coveralls
24 changes: 11 additions & 13 deletions README.md
@@ -1,5 +1,3 @@
This project is now archived. It's been fun working on it, but it's time for me to move on. Thank you for all the support and feedback over the last couple of years. Daniel Cahall has kindly agreed to continue development for this project [in his fork](https://github.com/danielenricocahall/elephas) and will handle future releases. Thank you! :v:
---

# Elephas: Distributed Deep Learning with Keras & Spark

@@ -75,9 +73,9 @@ sc = SparkContext(conf=conf)

Next, you define and compile a Keras model
```python
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
@@ -98,7 +96,7 @@ rdd = to_simple_rdd(sc, x_train, y_train)

The basic model in Elephas is the `SparkModel`. You initialize a `SparkModel` by passing in a compiled Keras model,
an update frequency and a parallelization mode. After that you can simply `fit` the model on your RDD. Elephas `fit`
has the same options as a Keras model, so you can pass `epochs`, `batch_size` etc. as you're used to from Keras.
has the same options as a Keras model, so you can pass `epochs`, `batch_size` etc. as you're used to from tensorflow.keras.

```python
from elephas.spark_model import SparkModel
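# The rest of this snippet is collapsed in the diff above; a hedged sketch of
# how it typically continues (argument names follow common elephas usage and
# may differ between versions):
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)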
@@ -181,8 +179,8 @@ from hyperopt import STATUS_OK
from hyperas.distributions import choice, uniform

def data():
from keras.datasets import mnist
from keras.utils import np_utils
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
@@ -191,15 +189,15 @@ def data():
x_train /= 255
x_test /= 255
nb_classes = 10
y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)
y_train = to_categorical(y_train, nb_classes)
y_test = to_categorical(y_test, nb_classes)
return x_train, y_train, x_test, y_test


def model(x_train, y_train, x_test, y_test):
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import RMSprop
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import RMSprop

model = Sequential()
model.add(Dense(512, input_shape=(784,)))
3 changes: 0 additions & 3 deletions docs/autogen.py
@@ -1,8 +1,5 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
from __future__ import unicode_literals

import re
import inspect
import os
4 changes: 1 addition & 3 deletions elephas/hyperparam.py
@@ -1,10 +1,8 @@
from __future__ import print_function
from __future__ import absolute_import
from hyperopt import Trials, rand
from hyperas.ensemble import VotingModel
from hyperas.optim import get_hyperopt_model_string, base_minimizer
import numpy as np
from keras.models import model_from_yaml
from tensorflow.keras.models import model_from_yaml
import six.moves.cPickle as pickle
from six.moves import range
# depend on hyperas, boto etc. is optional
2 changes: 0 additions & 2 deletions elephas/ml/adapter.py
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from pyspark.sql import SQLContext
from pyspark.mllib.regression import LabeledPoint
from ..utils.rdd_utils import from_labeled_point, to_labeled_point, lp_to_simple_rdd
1 change: 0 additions & 1 deletion elephas/ml/params.py
@@ -1,4 +1,3 @@
from __future__ import absolute_import
from pyspark.ml.param.shared import Param, Params


102 changes: 90 additions & 12 deletions elephas/ml_model.py
@@ -1,18 +1,18 @@
from __future__ import absolute_import, print_function
from enum import Enum

import tensorflow.keras as keras
import numpy as np
import copy
import h5py
import json

from pyspark.ml.param.shared import HasOutputCol, HasFeaturesCol, HasLabelCol
from pyspark import keyword_only
from pyspark import keyword_only, RDD
from pyspark.ml import Estimator, Model
from pyspark.sql.types import StringType, DoubleType, StructField

from keras.models import model_from_yaml
from keras.optimizers import get as get_optimizer

from tensorflow.keras.models import model_from_yaml
from tensorflow.keras.optimizers import get as get_optimizer

from .spark_model import SparkModel
from .utils.rdd_utils import from_vector
@@ -30,6 +30,7 @@ class ElephasEstimator(Estimator, HasCategoricalLabels, HasValidationSplit, HasK
Returns a trained model in form of a SparkML Model, which is also a Transformer.
"""

@keyword_only
def __init__(self, **kwargs):
super(ElephasEstimator, self).__init__()
@@ -69,6 +70,9 @@ def set_params(self, **kwargs):
"""
return self._set(**kwargs)

def get_model(self):
return model_from_yaml(self.get_keras_model_config())

def _fit(self, df):
"""Private fit method of the Estimator, which trains the model.
"""
@@ -96,7 +100,8 @@ def _fit(self, df):
return ElephasTransformer(labelCol=self.getLabelCol(),
outputCol='prediction',
keras_model_config=spark_model.master_network.to_yaml(),
weights=weights)
weights=weights,
loss=loss)


def load_ml_estimator(file_name):
@@ -110,12 +115,16 @@ class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol):
"""SparkML Transformer implementation. Contains a trained model,
with which new feature data can be transformed into labels.
"""

@keyword_only
def __init__(self, **kwargs):
super(ElephasTransformer, self).__init__()
if "weights" in kwargs.keys():
# Strip model weights from parameters to init Transformer
self.weights = kwargs.pop('weights')
if "loss" in kwargs.keys():
# Extract loss from parameters
self.model_type = LossModelTypeMapper().get_model_type(kwargs.pop('loss'))
self.set_params(**kwargs)

@keyword_only
@@ -152,16 +161,12 @@ def _transform(self, df):
new_schema.add(StructField(output_col, StringType(), True))

rdd = df.rdd.coalesce(1)
features = np.asarray(
rdd.map(lambda x: from_vector(x.features)).collect())
features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect())
# Note that we collect, since executing this on the rdd would require model serialization once again
model = model_from_yaml(self.get_keras_model_config())
model.set_weights(self.weights.value)
predictions = rdd.ctx.parallelize(
model.predict_classes(features)).coalesce(1)
predictions = predictions.map(lambda x: tuple(str(x)))

results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1])
results_rdd = compute_predictions(model, self.model_type, rdd, features)
results_df = df.sql_ctx.createDataFrame(results_rdd, new_schema)
results_df = results_df.withColumn(
output_col, results_df[output_col].cast(DoubleType()))
@@ -176,3 +181,76 @@ def load_ml_transformer(file_name):
elephas_conf = json.loads(f.attrs.get('distributed_config'))
config = elephas_conf.get('config')
return ElephasTransformer(**config)


class ModelType(Enum):
CLASSIFICATION = 1
REGRESSION = 2


class _Singleton(type):
""" A metaclass that creates a Singleton base class when called. """
_instances = {}

def __call__(cls, *args):
if cls not in cls._instances:
cls._instances[cls] = super(_Singleton, cls).__call__(*args)
return cls._instances[cls]


class Singleton(_Singleton('SingletonMeta', (object,), {})):
pass


class LossModelTypeMapper(Singleton):
"""
Mapper for losses -> model type
"""
def __init__(self):
loss_to_model_type = {}
loss_to_model_type.update(
{'mean_squared_error': ModelType.REGRESSION,
'mean_absolute_error': ModelType.REGRESSION,
'mse': ModelType.REGRESSION,
'mae': ModelType.REGRESSION,
'cosine_proximity': ModelType.REGRESSION,
'mean_absolute_percentage_error': ModelType.REGRESSION,
'mean_squared_logarithmic_error': ModelType.REGRESSION,
'logcosh': ModelType.REGRESSION,
'binary_crossentropy': ModelType.CLASSIFICATION,
'categorical_crossentropy': ModelType.CLASSIFICATION,
'sparse_categorical_crossentropy': ModelType.CLASSIFICATION})
self.__mapping = loss_to_model_type

def get_model_type(self, loss):
return self.__mapping.get(loss)

def register_loss(self, loss, model_type):
if callable(loss):
loss = loss.__name__
self.__mapping.update({loss: model_type})


def compute_predictions(model, model_type, rdd, features):
predict_function = determine_predict_function(model, model_type)
predictions = rdd.ctx.parallelize(predict_function(features)).coalesce(1)
if model_type == ModelType.CLASSIFICATION:
predictions = predictions.map(lambda x: tuple(str(x)))
else:
predictions = predictions.map(lambda x: tuple([float(x)]))
results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1])
return results_rdd


def determine_predict_function(model,
model_type):
if model_type == ModelType.CLASSIFICATION:
if isinstance(model, keras.models.Sequential):
predict_function = model.predict_classes
else:
# support for functional API
predict_function = lambda x: model.predict(x).argmax(axis=-1)
else:
predict_function = model.predict

return predict_function
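
The new `LossModelTypeMapper` is a singleton, so calling code can register extra losses before fitting and the transformer will then pick the matching prediction path (class indices for classification, raw `predict` output for regression). A minimal sketch of that usage, assuming these classes are importable from `elephas.ml_model` as added above (the custom loss name is purely illustrative):

```python
from elephas.ml_model import LossModelTypeMapper, ModelType


# Hypothetical custom regression loss; only its __name__ matters to the mapper.
def huber_like(y_true, y_pred):
    pass


# Callables are registered under their __name__, plain strings as given.
LossModelTypeMapper().register_loss(huber_like, ModelType.REGRESSION)
assert LossModelTypeMapper().get_model_type('huber_like') is ModelType.REGRESSION
```
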
2 changes: 0 additions & 2 deletions elephas/mllib/adapter.py
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from pyspark.mllib.linalg import Matrices, Vectors


48 changes: 28 additions & 20 deletions elephas/parameter/client.py
@@ -1,10 +1,10 @@
from __future__ import absolute_import
from __future__ import print_function

import abc

import numpy as np
import socket
import six.moves.cPickle as pickle


try:
import urllib.request as urllib2
except ImportError:
@@ -13,16 +13,20 @@
from ..utils.sockets import determine_master, send, receive


class BaseParameterClient(object):
class BaseParameterClient(abc.ABC):
"""BaseParameterClient
Parameter-server clients can do two things: retrieve the current parameters
from the corresponding server, and send updates (`delta`) to the server.
"""
__metaclass__ = abc.ABCMeta
client_type = 'base'

def __init__(self):
raise NotImplementedError
@classmethod
def get_client(cls, client_type, port=4000):
try:
return next(cl for cl in cls.__subclasses__() if cl.client_type == client_type)(port)
except StopIteration:
raise ValueError("Parameter server mode has to be either `http` or `socket`, "
"got {}".format(client_type))

@abc.abstractmethod
def update_parameters(self, delta):
@@ -39,14 +43,14 @@ def get_parameters(self):

class HttpClient(BaseParameterClient):
"""HttpClient
Uses HTTP protocol for communication with its corresponding parameter server,
namely HttpServer. The HTTP server provides two endpoints, `/parameters` to
get parameters and `/update` to update the server's parameters.
"""

def __init__(self, port=4000):
client_type = 'http'

def __init__(self, port=4000):
self.master_url = determine_master(port=port)
self.headers = {'Content-Type': 'application/elephas'}

@@ -64,23 +68,27 @@ def update_parameters(self, delta):

class SocketClient(BaseParameterClient):
"""SocketClient
Uses a socket connection to communicate with an instance of `SocketServer`.
The socket server listens to two types of events. Those with a `g` prefix
indicate a get-request, those with a `u` indicate a parameter update.
"""
client_type = 'socket'

def __init__(self, port=4000):

host = self.master_url.split(':')[0]
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))
self.port = port

def get_parameters(self):
self.socket.sendall(b'g')
return np.asarray(receive(self.socket))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
host = determine_master(port=self.port).split(':')[0]
sock.connect((host, self.port))
sock.sendall(b'g')
data = np.asarray(receive(sock))
return data

def update_parameters(self, delta):
data = {'delta': delta}
self.socket.sendall(b'u')
send(self.socket, data)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
host = determine_master(port=self.port).split(':')[0]
sock.connect((host, self.port))
data = {'delta': delta}
sock.sendall(b'u')
send(sock, data)
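
The reworked `BaseParameterClient` doubles as a factory: each subclass advertises a `client_type`, and `get_client` resolves it by name, so callers no longer need to instantiate `HttpClient` or `SocketClient` directly. A short usage sketch, assuming a parameter server is already running on the default port:

```python
from elephas.parameter.client import BaseParameterClient

# 'http' resolves to HttpClient, 'socket' to SocketClient; any other name raises ValueError.
client = BaseParameterClient.get_client('http', port=4000)
weights = client.get_parameters()  # requires a reachable parameter server
```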
