Skip to content

Commit

Permalink
improve stability
Browse files Browse the repository at this point in the history
  • Loading branch information
miraculixx committed Jul 11, 2019
1 parent 0fc8a36 commit 2272564
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 48 deletions.
2 changes: 1 addition & 1 deletion omegaml/backends/tensorflow/__init__.py
@@ -1,4 +1,4 @@
from .tfestimatormodel import TFEstimatorModelBackend, TFEstimatorModel
from .tfkeras import TensorflowKerasBackend
from .tfsavedmodel import TensorflowSavedModelPredictor, TensorflowSavedModelBackend
from .tfsavedmodel import TensorflowSavedModelPredictor, TensorflowSavedModelBackend, ServingInput
from .tfkerassavedmodel import TensorflowKerasSavedModelBackend
12 changes: 12 additions & 0 deletions omegaml/backends/tensorflow/tfdataset.py
@@ -0,0 +1,12 @@
from omegaml.backends.basedata import BaseDataBackend

class TFDatasetBackend(BaseDataBackend):
KIND = 'tf.dataset'

@classmethod
def supports(self, obj, name, **kwargs):
import tensorflow as tf
return isinstance(obj, )

def put(self, obj, name, attributes=None, **kwargs):
pass
28 changes: 19 additions & 9 deletions omegaml/backends/tensorflow/tfestimatormodel.py
Expand Up @@ -9,7 +9,7 @@

from omegaml.backends import BaseModelBackend

ok = lambda v: v is not None
ok = lambda v, vtype: isinstance(v, vtype)


class TFEstimatorModel(object):
Expand Down Expand Up @@ -44,7 +44,7 @@ def restore(self, model_dir):
self._model_dir = model_dir
return self

def make_input_fn(self, X, Y, batch_size=None):
def make_input_fn(self, X, Y, batch_size=1):
"""
Return a tf.data.Dataset from the input provided
Expand All @@ -64,6 +64,8 @@ def make_input_fn(self, X, Y, batch_size=None):
to the .fit/.predict methods using the input_fn= kwarg
"""
import tensorflow as tf
import pandas as pd
import numpy as np

def input_fn():
# if we have a dataset, use that
Expand All @@ -72,13 +74,19 @@ def input_fn():
return X
elif isinstance(Y, tf.data.Dataset):
return X.zip(Y)
# if we have something else, create a dataset from it
if ok(X) and ok(Y):
# if we have a dataframe, create a dataset from it
if ok(X, pd.DataFrame) and ok(Y, pd.Series):
dataset = tf.data.Dataset.from_tensor_slices((dict(X), Y))
elif ok(X):
result = dataset.batch(batch_size)
elif ok(X, pd.DataFrame):
dataset = tf.data.Dataset.from_tensor_slices(dict(X))
return dataset.batch(batch_size)
result = dataset.batch(batch_size)
else:
result = X, Y
return result

if isinstance(X, (dict, np.ndarray)):
input_fn = tf.estimator.inputs.numpy_input_fn(x=X, y=Y, num_epochs=1, shuffle=False)
return input_fn

def fit(self, X=None, Y=None, input_fn=None, batch_size=100, **kwargs):
Expand All @@ -87,7 +95,7 @@ def fit(self, X=None, Y=None, input_fn=None, batch_size=100, **kwargs):
X (Dataset|ndarray): features
Y (Dataset|ndarray): labels, optional
"""
assert (ok(X) or ok(input_fn)), "specify either X, Y or input_fn, not both"
assert (ok(X, object) or ok(input_fn, object)), "specify either X, Y or input_fn, not both"
if input_fn is None:
input_fn = self.make_input_fn(X, Y, batch_size=batch_size)
return self.estimator.train(input_fn=input_fn)
Expand All @@ -98,7 +106,7 @@ def score(self, X=None, Y=None, input_fn=None, batch_size=100, **kwargs):
X (Dataset|ndarray): features
Y (Dataset|ndarray): labels, optional
"""
assert (ok(X) or ok(input_fn)), "specify either X, Y or input_fn, not both"
assert (ok(X, object) or ok(input_fn, object)), "specify either X, Y or input_fn, not both"
if input_fn is None:
input_fn = self.make_input_fn(X, Y, batch_size=batch_size)
return self.estimator.evaluate(input_fn=input_fn)
Expand All @@ -109,7 +117,9 @@ def predict(self, X=None, Y=None, input_fn=None, batch_size=1, **kwargs):
X (Dataset|ndarray): features
Y (Dataset|ndarray): labels, optional
"""
assert (ok(X) or ok(input_fn)), "specify either X, Y or input_fn, not both"
options1 = (X is None) and (input_fn is not None)
options2 = (X is not None) and (input_fn is None)
assert options1 or options2, "specify either X, Y or input_fn, not both"
if input_fn is None:
input_fn = self.make_input_fn(X, Y, batch_size=batch_size)
return self.estimator.predict(input_fn=input_fn)
Expand Down
13 changes: 9 additions & 4 deletions omegaml/backends/tensorflow/tfkeras.py
Expand Up @@ -29,13 +29,18 @@ def _fix_model_for_saving(self, model):
import tensorflow as tf
from tensorflow.python.keras import backend as K
with K.name_scope(model.optimizer.__class__.__name__):
for i, var in enumerate(model.optimizer.weights):
name = 'variable{}'.format(i)
model.optimizer.weights[i] = tf.Variable(var, name=name)
try:
for i, var in enumerate(model.optimizer.weights):
name = 'variable{}'.format(i)
model.optimizer.weights[i] = tf.Variable(var, name=name)
except NotImplementedError:
pass

def _load_model(self, fn):
# override to implement model loading
from keras.engine.saving import load_model
import tensorflow as tf
from tensorflow import keras
load_model = keras.engine.saving.load_model
return load_model(fn)

def _load_model(self, fn):
Expand Down
1 change: 0 additions & 1 deletion omegaml/backends/tensorflow/tfkerassavedmodel.py
@@ -1,7 +1,6 @@
import tempfile

from omegaml.backends.tensorflow import TensorflowSavedModelBackend
from omegaml.util import temp_filename


class TensorflowKerasSavedModelBackend(TensorflowSavedModelBackend):
Expand Down
129 changes: 125 additions & 4 deletions omegaml/backends/tensorflow/tfsavedmodel.py
Expand Up @@ -4,9 +4,12 @@
from shutil import rmtree
from zipfile import ZipFile, ZIP_DEFLATED

import numpy as np
import tensorflow as tf
from mongoengine import GridFSProxy

from omegaml.backends import BaseModelBackend
from omegaml.backends.tensorflow.tfestimatormodel import ok


class TensorflowSavedModelPredictor(object):
Expand All @@ -17,9 +20,27 @@ class TensorflowSavedModelPredictor(object):
def __init__(self, model_dir):
from tensorflow.contrib import predictor
self.predict_fn = predictor.from_saved_model(model_dir)
self.input_names = list(self.predict_fn.feed_tensors.keys())
self.output_names = list(self.predict_fn.fetch_tensors.keys())

def _convert_to_model_input(self, X):
# coerce input into expected feature mapping
if len(self.input_names) > 1:
raise ValueError('multiple inputs not supported')
model_input = {
self.input_names[0]: X
}
return model_input

def _convert_to_model_output(self, yhat):
# coerce output into dict or array-like response
if len(self.output_names) == 1:
yhat = yhat[self.output_names[0]]
return yhat

def predict(self, X):
return self.predict_fn(X)
yhat = self.predict_fn(self._convert_to_model_input(X))
return self._convert_to_model_output(yhat)


class TensorflowSavedModelBackend(BaseModelBackend):
Expand All @@ -34,7 +55,14 @@ def supports(self, obj, name, **kwargs):
def _package_savedmodel(self, export_base_dir, filename):
fname = os.path.basename(filename)
zipfname = os.path.join(self.model_store.tmppath, fname)
export_base_dir = glob.glob(os.path.join(export_base_dir, '*'))[0]
# check if we have an intermediate directory (timestamp)
# as in export_base_dir/<timestamp>, if so, use this as the base directory
# see https://www.tensorflow.org/guide/saved_model#perform_the_export
# we need this check because not all SavedModel exports create a timestamp
# directory. e.g. keras.save_keras_model() does not, while Estimator.export_saved_model does
files = glob.glob(os.path.join(export_base_dir, '*'))
if len(files) == 1:
export_base_dir = files[0]
with ZipFile(zipfname, 'w', compression=ZIP_DEFLATED) as zipf:
for part in glob.glob(os.path.join(export_base_dir, '**'), recursive=True):
zipf.write(part, os.path.relpath(part, export_base_dir))
Expand Down Expand Up @@ -108,11 +136,104 @@ def predict(
model = self.get_model(modelname)
X = self.data_store.get(Xname)
result = model.predict(X)
if not pure_python:
result = pd.Series(result)
if pure_python:
result = result.tolist()
if rName:
result = self.data_store.put(result, rName)
return result

def fit(self, modelname, Xname, Yname=None, pure_python=True, tpu_specs=None, **kwargs):
raise ValueError('cannot fit a saved model')


class ServingInput(object):
# FIXME this is not working yet
def __init__(self, model=None, features=None, like=None, shape=None, dtype=None,
batchsize=1, from_keras=False):
"""
Helper to create serving_input_fn
Uses tf.build_raw_serving_input_receiver_fn to build a ServingInputReceiver
from the given inputs
Usage:
# use existing ndarray e.g. training or test data to specify a single input feature
ServingInput(features=['x']], like=ndarray)
# specify the dtype and shape explicitely
ServingInput(features=['x']], shape=(1, 28, 28))
# use multiple features
ServingInput(features={'f1': tf.Feature(...))
# for tf.keras models turned estimator, specify from_keras
# to ensure the input features are renamed correctly.
ServingInput(features=['x'], like=ndarray, from_keras=True)
Args:
model:
features:
like:
shape:
dtype:
batchsize:
from_keras:
"""
self.model = model
self.features = features or ['X']
self.like = like
self.shape = shape
self.dtype = dtype
self.batchsize = batchsize
self.from_keras = from_keras

def build(self):
if isinstance(self.features, dict):
input_fn = self.from_features()
elif isinstance(self.like, np.ndarray):
shape = tuple((self.batchsize, *self.like.shape[1:])) # assume (rows, *cols)
input_fn = self.from_ndarray(shape, self.like.dtype)
elif isinstance(self.shape, (list,tuple,np.ndarray)):
input_fn = self.from_ndarray(self.shape, self.dtype)
return input_fn

def __call__(self):
input_fn = self.build()
return input_fn()

def from_features(self):
input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
self.features,
default_batch_size=self.batchsize
)
return input_fn

def from_ndarray(self, shape, dtype):
if self.from_keras:
input_layer_name = '{}_input'.format(self.features[0])
else:
input_layer_name = self.features[0]
features = {
input_layer_name: tf.placeholder(dtype=dtype, shape=shape, )
}
input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
features,
default_batch_size=None
)
return input_fn

def from_dataframe(self, columns, input_layer_name='X',
batch_size=1, dtype=np.float32):
def serving_input_fn():
ndim = len(columns)
X_name = '{}_input'.format(input_layer_name)
placeholder = tf.placeholder(dtype=np.float32,
shape=(batch_size, ndim),
name=X_name)
receiver_tensors = {X_name: placeholder}
features = {X_name: placeholder}
return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)




7 changes: 7 additions & 0 deletions omegaml/backends/virtualobj.py
Expand Up @@ -87,6 +87,13 @@ def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
obj = dill.load(outf)
return obj

def predict(self, modelname, xName, rName, **kwargs):
# make this work as a model backend too
meta = self.model_store.metadata(modelname)
handler = self.get(modelname)
X = self.data_store.get(xName)
return handler(method='predict', data=X, meta=meta)

def virtualobj(fn):
"""
function decorator to create a virtual object handler from any
Expand Down
6 changes: 2 additions & 4 deletions omegaml/defaults.py
Expand Up @@ -44,12 +44,12 @@
#: storage backends
OMEGA_STORE_BACKENDS = {
'sklearn.joblib': 'omegaml.backends.ScikitLearnBackend',
'keras.h5': 'omegaml.backends.keras.KerasBackend',
'ndarray.bin': 'omegaml.backends.npndarray.NumpyNDArrayBackend',
'tfkeras.h5': 'omegaml.backends.tensorflow.TensorflowKerasBackend',
'tfkeras.savedmodel': 'omegaml.backends.tensorflow.TensorflowKerasSavedModelBackend',
'tf.savedmodel': 'omegaml.backends.tensorflow.TensorflowSavedModelBackend',
'tfestimator.model': 'omegaml.backends.tensorflow.TFEstimatorModelBackend',
'keras.h5': 'omegaml.backends.keras.KerasBackend',
'ndarray.bin': 'omegaml.backends.npndarray.NumpyNDArrayBackend',
'virtualobj.dill': 'omegaml.backends.virtualobj.VirtualObjectBackend',
}
#: storage mixins
Expand Down Expand Up @@ -77,8 +77,6 @@
]




# =========================================
# ----- DO NOT MODIFY BELOW THIS LINE -----
# =========================================
Expand Down
26 changes: 20 additions & 6 deletions omegaml/restapi/__init__.py
Expand Up @@ -7,7 +7,7 @@
from mongoengine import DoesNotExist

import omegaml as om
from omegaml.restapi.util import strict
from omegaml.restapi.util import strict, ensure_json_serializable
from .app import api

import numpy as np
Expand All @@ -19,11 +19,13 @@
PredictInput = strict(api).model('ModelInputSchema', {
'columns': fields.List(fields.String),
'data': fields.List(fields.Raw),
'shape': fields.List(fields.Integer),
'dataset': fields.String,
})

PredictOutput = api.model('PredictOutput', {
'model': fields.String,
'result': fields.List(fields.Raw),
'result': fields.Raw,
})

DatasetInput = api.model('DatasetInput', {
Expand Down Expand Up @@ -55,11 +57,23 @@ class ModelResource(Resource):
@api.marshal_with(PredictOutput)
def put(self, model_id):
data = api.payload.get('data')
columns = api.payload.get('columns')
df = pd.DataFrame(data)[columns]
promise = om.runtime.model(model_id).predict(df)
dataset = api.payload.get('dataset')
if data is not None:
columns = api.payload.get('columns')
shape = api.payload.get('shape')
df = pd.DataFrame(data)[columns]
if shape is not None:
assert len(columns) == 1, "only 1 column allowed to be reshaped"
col = columns[0]
df[col] = df[col].apply(lambda v: np.array(v).reshape(shape))
df = np.stack(df[col])
promise = om.runtime.model(model_id).predict(df)
elif dataset:
promise = om.runtime.model(model_id).predict(dataset)
else:
raise ValueError('require either data or dataset')
result = promise.get()
return {'model': model_id, 'result': result.tolist()}
return {'model': model_id, 'result': ensure_json_serializable(result)}


@api.route('/v1/dataset/<string:dataset_id>')
Expand Down

0 comments on commit 2272564

Please sign in to comment.