Dbaas 3992 (#63)
* working on metadata, cleanup other code

* added metadata function

* forgot to call

* syntax

* syntax

* case sensitive

* syntax

* case sensitive

* new table design

* syntax change
Epstein committed Jun 22, 2020
1 parent c05d67a commit 29d0ed2
Showing 3 changed files with 129 additions and 42 deletions.
9 changes: 9 additions & 0 deletions splicemachine/mlflow_support/constants.py
@@ -62,3 +62,12 @@ def get_valid() -> tuple:
return (
FileExtensions.spark, FileExtensions.keras, FileExtensions.h2o, FileExtensions.sklearn
)

class ModelStatuses():
"""
Class containing names
for In Database Model Deployments
"""
deployed: str = 'DEPLOYED'
deleted: str = 'DELETED'
SUPPORTED_STATUSES = [deployed, deleted]
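
For context, a minimal sketch (not part of this commit) of how these status constants might be used to validate a status string before it is written to the metadata table; the validate_status helper below is hypothetical:

def validate_status(status: str) -> str:
    # Hypothetical helper, assuming the ModelStatuses class defined above.
    status = status.upper()
    if status not in ModelStatuses.SUPPORTED_STATUSES:
        raise ValueError(f"Status '{status}' is not supported. "
                         f"Choose one of {ModelStatuses.SUPPORTED_STATUSES}")
    return status

# validate_status('deployed') returns 'DEPLOYED'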
33 changes: 23 additions & 10 deletions splicemachine/mlflow_support/mlflow_support.py
@@ -625,13 +625,10 @@ def _deploy_db(db_schema_name,
if library == DBLibraries.MLeap:
# Mleap needs a dataframe in order to serialize the model
df = get_df_for_mleap(mlflow._splice_context, schema_table_name, df)
model_type, classes = SparkUtils.prep_model_for_deployment(mlflow._splice_context, fitted_model, df, classes, run_id)
elif library == DBLibraries.H2OMOJO:
model_type, classes = H2OUtils.prep_model_for_deployment(mlflow._splice_context, fitted_model, classes, run_id)
elif library == DBLibraries.SKLearn:
model_type, classes = SKUtils.prep_model_for_deployment(mlflow._splice_context, fitted_model, classes, run_id, sklearn_args)
elif library == DBLibraries.Keras:
model_type, classes = KerasUtils.prep_model_for_deployment(mlflow._splice_context, fitted_model, classes, run_id, pred_threshold)

model_type, classes, model_already_exists = ModelUtils[library].prep_model_for_deployment(mlflow._splice_context,
fitted_model, classes, run_id,
df, pred_threshold, sklearn_args)


print(f'Deploying model {run_id} to table {schema_table_name}')
@@ -668,18 +665,34 @@ def _deploy_db(db_schema_name,
create_parsing_trigger(mlflow._splice_context, schema_table_name, primary_key, run_id, classes, model_type, verbose)
print('Done.')

add_model_to_metadata(mlflow._splice_context, run_id, schema_table_name)


except Exception as e:
import traceback
print('Model deployment failed. Rolling back transactions')
# drop_tables_on_failure(mlflow._splice_context, schema_table_name, run_id)
exc = 'Model deployment failed. Rolling back transactions.\n'
print(exc)
drop_tables_on_failure(mlflow._splice_context, schema_table_name, run_id, model_already_exists)
if not verbose:
exc += 'For more insight into the SQL statement that generated this error, rerun with verbose=True'
traceback.print_exc()
raise SpliceMachineException(exc)

print('Model Deployed.')

@_mlflow_patch('get_deployed_models')
def _get_deployed_models() -> PandasDF:
"""
Get the currently deployed models in the database
:return: Pandas df
"""

return mlflow._splice_context.df(
"""
SELECT * FROM MLMANAGER.LIVE_MODEL_STATUS
"""
).toPandas()
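
As a usage sketch (assuming a live PySpliceContext has already been registered via mlflow.register_splice_context, and that the MLMANAGER.LIVE_MODEL_STATUS view exists in the target database), the new helper becomes reachable on the patched mlflow module:

from splicemachine.mlflow_support import *   # patches mlflow on import

# mlflow.register_splice_context(splice) must have been called with a live
# PySpliceContext before this query will work.
deployed = mlflow.get_deployed_models()      # pandas DataFrame of live deployments
print(deployed.head())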


def apply_patches():
"""
@@ -689,7 +702,7 @@ def apply_patches():
targets = [_register_splice_context, _lp, _lm, _timer, _log_artifact, _log_feature_transformations,
_log_model_params, _log_pipeline_stages, _log_model, _load_model, _download_artifact,
_start_run, _current_run_id, _current_exp_id, _deploy_aws, _deploy_azure, _deploy_db, _login_director,
_get_run_ids_by_name]
_get_run_ids_by_name, _get_deployed_models]

for target in targets:
gorilla.apply(gorilla.Patch(mlflow, target.__name__.lstrip('_'), target, settings=_GORILLA_SETTINGS))
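
For reference, a small standalone sketch of the gorilla patching pattern used above (assumes the gorilla package is installed): each private helper is attached to the mlflow module with its leading underscore stripped, e.g. _get_deployed_models becomes mlflow.get_deployed_models.

import gorilla
import mlflow

# Sketch only: attach a function to the mlflow module under a public name,
# mirroring the loop above. _GORILLA_SETTINGS in this repo plays the role of
# the settings object below.
settings = gorilla.Settings(allow_hit=True, store_hit=True)

def _hello_patch():
    return 'patched'

gorilla.apply(gorilla.Patch(mlflow, 'hello_patch', _hello_patch, settings=settings))
print(mlflow.hello_patch())  # -> 'patched'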
129 changes: 97 additions & 32 deletions splicemachine/mlflow_support/utilities.py
@@ -53,7 +53,10 @@ class H2OUtils:
def prep_model_for_deployment(splice_context: PySpliceContext,
model: H2OModel,
classes: List[str],
run_id: str) -> (H2OModelType, List[str]):
run_id: str,
df: SparkDF or None,
pred_threshold: float or None,
sklearn_args: Dict[str,str] or None) -> (H2OModelType, List[str]):
"""
Gets the H2O mojo model
Gets the model type
@@ -69,7 +72,7 @@ def prep_model_for_deployment(splice_context: PySpliceContext,

# Get the H2O MOJO model and insert it into the MODELS table
h2omojo, rawmojo = H2OUtils.get_h2omojo_model(splice_context, model)
H2OUtils.insert_h2omojo_model(splice_context, run_id, h2omojo)
model_already_exists = H2OUtils.insert_h2omojo_model(splice_context, run_id, h2omojo)

# Get model type
model_type, model_category = H2OUtils.get_model_type(h2omojo)
@@ -106,7 +109,7 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
elif model_category == 'AnomalyDetection':
classes = ['score', 'normalizedScore']

return model_type, classes
return model_type, classes, model_already_exists

@staticmethod
def get_model_type(h2omojo: object) -> (H2OModelType, str):
@@ -153,14 +156,14 @@ def load_h2o_model(model_blob: bytes) -> H2OModel:
return model

@staticmethod
def insert_h2omojo_model(splice_context: PySpliceContext, run_id: str, model: object) -> None:
def insert_h2omojo_model(splice_context: PySpliceContext, run_id: str, model: object) -> bool:
baos = splice_context.jvm.java.io.ByteArrayOutputStream()
oos = splice_context.jvm.java.io.ObjectOutputStream(baos)
oos.writeObject(model)
oos.flush()
oos.close()
byte_array = baos.toByteArray()
insert_model(splice_context, run_id, byte_array, 'h2omojo', h2o.__version__)
return insert_model(splice_context, run_id, byte_array, 'h2omojo', h2o.__version__)


class SKUtils:
@@ -174,9 +177,9 @@ def load_sklearn_model(model_blob: bytes):
return load_pickle_string(model_blob)

@staticmethod
def insert_sklearn_model(splice_context: PySpliceContext, run_id: str, model: ScikitModel) -> None:
def insert_sklearn_model(splice_context: PySpliceContext, run_id: str, model: ScikitModel) -> bool:
byte_stream = save_pickle_string(model)
insert_model(splice_context, run_id, byte_stream, 'sklearn', sklearn_version)
return insert_model(splice_context, run_id, byte_stream, 'sklearn', sklearn_version)

@staticmethod
def validate_sklearn_args(model: ScikitModel, sklearn_args: Dict[str, str]) -> Dict[str, str]:
@@ -207,7 +210,11 @@ def validate_sklearn_args(model: ScikitModel, sklearn_args: Dict[str, str]) -> D
t = ('return_std', 'return_cov')
exc = f'predict_args value is invalid. Available options are {t}'
else:
model_params = get_model_params(model.predict) if hasattr(model, 'predict') else get_model_params(model.transform)
if isinstance(model, SKPipeline): # If we are working with a Pipeline, we want to check the last step for arguments
m = model.steps[-1][-1]
model_params = get_model_params(m.predict) if hasattr(m, 'predict') else get_model_params(m.transform)
else:
model_params = get_model_params(model.predict) if hasattr(model, 'predict') else get_model_params(model.transform)
if p not in model_params.parameters:
exc = f'predict_args set to {p} but that parameter is not available for this model!'
elif sklearn_args and 'predict_args' not in sklearn_args and 'predict_call' not in sklearn_args:
@@ -224,12 +231,14 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
model: ScikitModel,
classes: List[str],
run_id: str,
sklearn_args: Dict[str, str]) -> (SklearnModelType, List[str]):
df: SparkDF or None,
pred_threshold: float or None,
sklearn_args: Dict[str,str] or None) -> (SklearnModelType, List[str]):

sklearn_args = SKUtils.validate_sklearn_args(model, sklearn_args)

model_type = SKUtils.get_model_type(model, sklearn_args)
SKUtils.insert_sklearn_model(splice_context, run_id, model)
model_already_exists = SKUtils.insert_sklearn_model(splice_context, run_id, model)
if classes and model_type != SklearnModelType.KEY_VALUE:
print('Prediction labels found but model is not type Classification. Removing labels')
classes = None
@@ -260,7 +269,7 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
if classes:
print(f'Prediction labels found. Using {classes} as labels for predictions {list(range(0, len(classes)))} respectively')

return model_type, classes
return model_type, classes, model_already_exists

@staticmethod
def get_pipeline_model_type(pipeline: SKPipeline) -> SklearnModelType:
@@ -322,12 +331,12 @@ def load_keras_model(model_blob):
return load_kr_model(hfile)

@staticmethod
def insert_keras_model(splice_context: PySpliceContext, run_id: str, model: KerasModel) -> None:
def insert_keras_model(splice_context: PySpliceContext, run_id: str, model: KerasModel) -> bool:
model.save('/tmp/model.h5')
with open('/tmp/model.h5', 'rb') as f:
byte_stream = bytearray(bytes(f.read()))
insert_model(splice_context, run_id, byte_stream, 'keras', KERAS_VERSION)
remove('/tmp/model.h5')
return insert_model(splice_context, run_id, byte_stream, 'keras', KERAS_VERSION)

@staticmethod
def get_keras_model_type(model: KerasModel, pred_threshold: float) -> KerasModelType:
@@ -364,7 +373,9 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
model: KerasModel,
classes: List[str],
run_id: str,
pred_threshold: float) -> (KerasModelType, List[str]):
df: SparkDF or None,
pred_threshold: float or None,
sklearn_args: Dict[str,str] or None) -> (KerasModelType, List[str]):
"""
Inserts the model into the MODELS table for deployment
Gets the Keras model type
@@ -377,7 +388,7 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
:return: (KerasModelType, List[str]) the modelType and the classes
"""
KerasUtils.validate_keras_model(model)
KerasUtils.insert_keras_model(splice_context, run_id, model)
model_already_exists = KerasUtils.insert_keras_model(splice_context, run_id, model)
model_type: KerasModelType = KerasUtils.get_keras_model_type(model, pred_threshold)
if model_type == KerasModelType.KEY_VALUE:
output_shape = model.layers[-1].output_shape
@@ -390,7 +401,7 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
classes = ['prediction'] + classes
if len(classes) > 2 and pred_threshold:
print(f"Found multiclass model with pred_threshold {pred_threshold}. Ignoring threshold.")
return model_type, classes
return model_type, classes, model_already_exists



@@ -610,9 +621,11 @@ def load_spark_model(splice_ctx, spark_pipeline_blob):
@staticmethod
def prep_model_for_deployment(splice_context: PySpliceContext,
fittedPipe: PipelineModel,
df: SparkDF,
classes: List[str],
run_id: str) -> (SparkModelType, List[str]):
run_id: str,
df: SparkDF,
pred_threshold: float or None,
sklearn_args: Dict[str,str] or None) -> (SparkModelType, List[str]):
"""
All preprocessing steps to prepare for in DB deployment. Get the mleap model, get class labels
:param fittedPipe:
@@ -644,9 +657,9 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
df = fittedPipe.transform(df)
# Get the Mleap model and insert it into the MODELS table
mleap_model = get_mleap_model(splice_context, fittedPipe, df, run_id)
insert_mleap_model(splice_context, run_id, mleap_model)
model_already_exists = insert_mleap_model(splice_context, run_id, mleap_model)

return model_type, classes
return model_type, classes, model_already_exists


def get_model_library(model) -> DBLibraries:
@@ -697,7 +710,7 @@ def get_user():
" Cloud Jupyter is currently unsupported")


def insert_model(splice_context: PySpliceContext, run_id: str, byte_array: bytearray, library: str, version: str) -> None:
def insert_model(splice_context: PySpliceContext, run_id: str, byte_array: bytearray, library: str, version: str) -> bool:
"""
Insert a serialized model into the Mlmanager models table
:param splice_context: pysplicectx
@@ -713,6 +726,7 @@ def insert_model(splice_context: PySpliceContext, run_id: str, byte_array: bytea
if model_exists:
print(
'A model with this run ID is already deployed. We are NOT replacing it. We will use the currently existing model.\nTo replace, use a new run_id')
return True

else:
db_connection = splice_context.getConnection()
@@ -728,6 +742,7 @@ def insert_model(splice_context: PySpliceContext, run_id: str, byte_array: bytea

prepared_statement.execute()
prepared_statement.close()
return False


def insert_artifact(splice_context, name, byte_array, run_uuid, file_ext=None):
@@ -793,7 +808,9 @@ def get_mleap_model(splice_context, fittedPipe, df, run_id: str):
return obj


def insert_mleap_model(splice_context, run_id, model):
def insert_mleap_model(splice_context: PySpliceContext,
run_id: str,
model: PipelineModel or SparkModel) -> bool:
"""
Insert an MLeap Transformer model into the database as a Blob
:param splice_context: pysplicectx
@@ -809,7 +826,7 @@ def insert_mleap_model(splice_context, run_id, model):
oos.flush()
oos.close()
byte_array = baos.toByteArray()
insert_model(splice_context, run_id, byte_array, 'mleap', MLEAP_VERSION)
return insert_model(splice_context, run_id, byte_array, 'mleap', MLEAP_VERSION)


def validate_primary_key(splice_ctx: PySpliceContext,
@@ -994,8 +1011,8 @@ def create_vti_prediction_trigger(splice_context: PySpliceContext,
prediction_call += f", '{pred_threshold}'"

prediction_call += ')'

SQL_PRED_TRIGGER = f'CREATE TRIGGER runModel_{schema_table_name.replace(".", "_")}_{run_id}\n \tAFTER INSERT\n ' \
schema = schema_table_name.split('.')[0]
SQL_PRED_TRIGGER = f'CREATE TRIGGER {schema}.runModel_{schema_table_name.replace(".", "_")}_{run_id}\n \tAFTER INSERT\n ' \
f'\tON {schema_table_name}\n \tREFERENCING NEW AS NEWROW\n \tFOR EACH ROW\n \t\tUPDATE ' \
f'{schema_table_name} SET ('

@@ -1068,7 +1085,8 @@ def create_prediction_trigger(splice_context, schema_table_name, run_id, feature
elif model_type == H2OModelType.KEY_VALUE:
prediction_call = 'MLMANAGER.PREDICT_KEY_VALUE'

SQL_PRED_TRIGGER = f'CREATE TRIGGER runModel_{schema_table_name.replace(".", "_")}_{run_id}\n \tBEFORE INSERT\n ' \
schema = schema_table_name.split('.')[0]
SQL_PRED_TRIGGER = f'CREATE TRIGGER {schema}.runModel_{schema_table_name.replace(".", "_")}_{run_id}\n \tBEFORE INSERT\n ' \
f'\tON {schema_table_name}\n \tREFERENCING NEW AS NEWROW\n \tFOR EACH ROW\n \tBEGIN ATOMIC \t\t' \
f'SET NEWROW.PREDICTION='

@@ -1105,7 +1123,8 @@ def create_parsing_trigger(splice_context, schema_table_name, primary_key, run_i
:param model_type: (Enum) the model type (H2OModelType or SparkModelType)
:param verbose: (bool) whether to print the SQL query
"""
SQL_PARSE_TRIGGER = f'CREATE TRIGGER PARSERESULT_{schema_table_name.replace(".", "_")}_{run_id}' \
schema = schema_table_name.split('.')[0]
SQL_PARSE_TRIGGER = f'CREATE TRIGGER {schema}.PARSERESULT_{schema_table_name.replace(".", "_")}_{run_id}' \
f'\n \tBEFORE INSERT\n \tON {schema_table_name}\n \tREFERENCING NEW AS NEWROW\n' \
f' \tFOR EACH ROW\n \t\tBEGIN ATOMIC\n\t set '
set_prediction_case_str = 'NEWROW.PREDICTION=\n\t\tCASE\n'
@@ -1187,11 +1206,57 @@ def get_df_for_mleap(splice_ctx: PySpliceContext,

return df

def add_model_to_metadata(splice_context: PySpliceContext,
run_id: str,
schema_table_name: str) -> None:

if splice_context.tableExists(f'{SQL.MLMANAGER_SCHEMA}.MODEL_METADATA'):
schema_table_name = schema_table_name.upper()
schema, table = schema_table_name.split('.')

table_id = splice_context.df(f"select a.tableid from sys.systables a join sys.sysschemas b on a.schemaid=b.schemaid "
f"where a.tablename='{table}' and b.schemaname='{schema}'").collect()[0][0]

trigger_name_1 = f"RUNMODEL_{schema_table_name.replace('.','_')}_{run_id}".upper()
trigger_id_1, create_ts = splice_context.df(f"select triggerid, varchar(creationtimestamp) from sys.systriggers "
f"where triggername='{trigger_name_1}' and tableid='{table_id}'")\
.collect()[0]

def drop_tables_on_failure(splice_context, schema_table_name, run_id) -> None:
# Not all models will have a second trigger
trigger_name_2 = f"PARSERESULT_{schema_table_name.replace('.', '_')}_{run_id}".upper()
trigger_id_2 = splice_context.df(f"select triggerid from sys.systriggers where triggername='{trigger_name_2}' "
f"and tableid='{table_id}'").collect()

# Adding extra single quote to trigger_id_2 case NULL
trigger_id_2 = f"'{trigger_id_2[0][0]}'" if trigger_id_2 else 'NULL'

# We don't add the quotes around trigger_id_2 here because we handle it above in the NULL case
splice_context.execute(f"INSERT INTO {SQL.MLMANAGER_SCHEMA}.MODEL_METADATA"
f"(RUN_UUID, ACTION, TABLEID, TRIGGER_TYPE, TRIGGERID, TRIGGERID_2, DB_ENV, DB_USER, ACTION_DATE)"
f"values ('{run_id}', 'DEPLOYED', '{table_id}', 'INSERT', '{trigger_id_1}', {trigger_id_2},"
f"'PROD', '{get_user()}', '{create_ts}')")



def drop_tables_on_failure(splice_context: PySpliceContext,
schema_table_name: str,
run_id: str,
model_already_exists: bool) -> None:
"""
Drop the tables if the db deployment fails
Due to some limitations (DB-7726) we can't fully utilize a single consistent JDBC connection using NSDS,
so we will try to roll back on failure using basic logic.
If the model was already in the MODELS table (i.e. it had been deployed before), we will leave it. Otherwise, delete it.
Leave the tables.
"""
splice_context.execute(f'DROP TABLE IF EXISTS {schema_table_name}')
splice_context.execute(f'DROP TABLE IF EXISTS {schema_table_name}_preds')
splice_context.execute(f'DELETE FROM {SQL.MLMANAGER_SCHEMA}.MODELS WHERE RUN_UUID=\'{run_id}\'')

# splice_context.execute(f'DROP TABLE IF EXISTS {schema_table_name}')
if not model_already_exists:
splice_context.execute(f'DELETE FROM {SQL.MLMANAGER_SCHEMA}.MODELS WHERE RUN_UUID=\'{run_id}\'')

ModelUtils = {
DBLibraries.MLeap: SparkUtils,
DBLibraries.H2OMOJO: H2OUtils,
DBLibraries.Keras: KerasUtils,
DBLibraries.SKLearn: SKUtils,
}
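
This registry replaces the per-library if/elif chain that _deploy_db used previously; because every prep_model_for_deployment signature is now unified, the caller can dispatch in a single call. A sketch of that call site (variable names mirror the mlflow_support.py hunk earlier in this commit):

# Sketch of the dispatch in _deploy_db: the library enum selects the utility
# class, and every utility now returns (model_type, classes, model_already_exists).
model_type, classes, model_already_exists = ModelUtils[library].prep_model_for_deployment(
    splice_context, fitted_model, classes, run_id, df, pred_threshold, sklearn_args)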
