# `hops-util-py` Integration Tests

This notebook can be converted to a python file and submitted as a spark job for integration tests

## Imports

In [1]:
from hops import experiment, hdfs, tensorboard, devices, kafka, featurestore, tls, util, serving, model, constants
from hops.experiment import Direction
from hops.model import Metric
import stat
import os
import shutil
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, FloatType
import pandas as pd
import numpy as np
import datetime
import time
import json
from pyspark.sql import DataFrame
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql import SparkSession
import tensorflow as tf
import sys
import random
from confluent_kafka import Producer, Consumer, KafkaError

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
62,application_1582118837056_0052,pyspark,idle,Link,Link


SparkSession available as 'spark'.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])

## Experiment API Tests

In [2]:
def exp_asserts():
    from hops import tensorboard
    from hops import devices
    from hops import hdfs
    import os
    assert tensorboard.logdir() != None
    assert devices.get_num_gpus() == 0
    assert hdfs.project_path() == hdfs.project_path(hdfs.project_name())
    if tensorboard.local_logdir_bool:
        assert "hdfs://" not in tensorboard.logdir()
        assert os.path.exists(tensorboard.logdir())
    else:
        assert "hdfs://" in tensorboard.logdir()
        assert hdfs.exists(tensorboard.logdir())

In [3]:
def no_ret():
    exp_asserts()

In [4]:
def no_ret_params(a, b):
    exp_asserts()

In [5]:
def single_ret_raw_value():
    exp_asserts()
    return 10

In [6]:
def single_ret_raw_value_params(a, b):
    exp_asserts()
    return a+b

In [7]:
def single_ret_path():
    exp_asserts()
    f = open('testfile.txt', 'w')
    f.write('stuff happened')
    f.close()
    return {'logfile': 'testfile.txt'}

In [8]:
def single_ret_path_params(a, b):
    exp_asserts()
    f = open('testfile.txt', 'w')
    f.write('stuff happened')
    f.close()
    return {'logfile': 'testfile.txt'}

In [9]:
def single_ret_val():
    exp_asserts()
    return {'value': -10.3}

In [10]:
def single_ret_val_params(a, b):
    exp_asserts()
    return {'value': a+b}

In [11]:
def multi_ret():
    exp_asserts()
    f = open('testfile.txt', 'w')
    f.write('stuff happened')
    f.close()
    return {'value': 10, 'morevals': 0.5, 'logfile': 'testfile.txt'}

In [12]:
def multi_ret_params(a, b):
    exp_asserts()
    f = open('testfile.txt', 'w')
    f.write('stuff happened')
    f.close()
    return {'value': a+b, 'morevals': b, 'logfile': 'testfile.txt', 'diagram': 'img.png'}

In [13]:
def create_model_in_tensorboard_logdir():
    from hops import tensorboard
    from hops import hdfs
    import os
    import uuid
    model_name = str(uuid.uuid4())
    
    if tensorboard.logdir():
        if os.path.exists(tensorboard.logdir()):
            os.mkdir(tensorboard.logdir() + '/model')
            f = open(tensorboard.logdir() + '/model/model.pb', 'w')
            f.write('model')
            f.close()
        else:
            hdfs.mkdir(tensorboard.logdir() + '/model')
            hdfs.dump("model", tensorboard.logdir() + '/model/model.pb')
    
    return {'name': model_name}

def create_model_in_tensorboard_logdir_params(a, b):
    from hops import tensorboard
    from hops import hdfs
    import os
    import uuid
    model_name = str(uuid.uuid4())
    model_path = tensorboard.logdir() + '/model/' + model_name
    
    #create a 'model'
    if tensorboard.logdir():
        if os.path.exists(tensorboard.logdir()):
            os.mkdir(tensorboard.logdir() + '/model')
            f = open(tensorboard.logdir() + '/model/model.pb', 'w')
            f.write('model')
            f.close()
        else:
            hdfs.mkdir(tensorboard.logdir() + '/model')
            hdfs.dump("model", tensorboard.logdir() + '/model/model.pb')
    
    return {'name': model_name, 'optval': a+b}

In [14]:
def export_model_in_wrapper():
    ret_dict = create_model_in_tensorboard_logdir()
    from hops import model
    from hops import tensorboard
    if tensorboard.logdir():
        model.export(tensorboard.logdir() + '/model', ret_dict['name'])

In [15]:
def assert_return_values(logdir, hp_dict, should_return_hp_dict, return_dict, should_return_return_dict):
    assert hdfs.exists(logdir)
    
    if should_return_hp_dict:
        print('Asserting hp_dict {} is a dict'.format(hp_dict))
        assert type(hp_dict) == dict
    else:
        assert not hp_dict
        
    if should_return_return_dict:
        print('Asserting return_dict {} is a dict'.format(return_dict))
        assert type(return_dict) == dict    
    else:
        assert not return_dict

##### Test `experiment.launch`

In [16]:
params={'a': [-5, 4.9], 'b': [-8, 10.3]}

logdir, return_dict = experiment.launch(no_ret, local_logdir=False, name='no ret')
assert_return_values(logdir, None, False, return_dict, True)                

Finished Experiment 

Asserting return_dict {'log': 'Experiments/application_1576760828949_0042_1/output.log'} is a dict

In [17]:
logdir, return_dict = experiment.launch(no_ret, local_logdir=True, name='no ret')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'log': 'Experiments/application_1576760828949_0042_2/output.log'} is a dict

In [18]:
logdir, return_dict = experiment.launch(no_ret_params, params, local_logdir=True, name='no ret params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [19]:
logdir, return_dict = experiment.launch(no_ret_params, params, local_logdir=False, name='no ret params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [20]:
logdir, return_dict = experiment.launch(single_ret_raw_value, local_logdir=False, name='single ret raw value')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'metric': 10, 'log': 'Experiments/application_1576760828949_0042_5/output.log'} is a dict

In [21]:
logdir, return_dict = experiment.launch(single_ret_raw_value, local_logdir=True, name='single ret raw value')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'metric': 10, 'log': 'Experiments/application_1576760828949_0042_6/output.log'} is a dict

In [22]:
logdir, return_dict = experiment.launch(single_ret_raw_value_params, params, local_logdir=True, name='single ret raw value params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [23]:
logdir, return_dict = experiment.launch(single_ret_raw_value_params, params, local_logdir=False, name='single ret raw value params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [24]:
logdir, return_dict = experiment.launch(single_ret_path, local_logdir=False, description='some custom desc', name='single ret path')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'logfile': 'Experiments/application_1576760828949_0042_9/testfile.txt', 'log': 'Experiments/application_1576760828949_0042_9/output.log'} is a dict

In [25]:
logdir, return_dict = experiment.launch(single_ret_path, local_logdir=True, name='single ret path')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'logfile': 'Experiments/application_1576760828949_0042_10/testfile.txt', 'log': 'Experiments/application_1576760828949_0042_10/output.log'} is a dict

In [26]:
logdir, return_dict = experiment.launch(single_ret_path_params, params, local_logdir=True, name='single ret path params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [27]:
logdir, return_dict = experiment.launch(single_ret_path_params, params, local_logdir=False, name='single ret path params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [28]:
logdir, return_dict = experiment.launch(single_ret_val, local_logdir=False, name='single ret val', description='some custom desc')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'value': -10.3, 'log': 'Experiments/application_1576760828949_0042_13/output.log'} is a dict

In [29]:
logdir, return_dict = experiment.launch(single_ret_val, local_logdir=True, name='single ret val')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'value': -10.3, 'log': 'Experiments/application_1576760828949_0042_14/output.log'} is a dict

In [30]:
logdir, return_dict = experiment.launch(single_ret_val_params, params, local_logdir=True, name='single ret val params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [31]:
logdir, return_dict = experiment.launch(single_ret_val_params, params, local_logdir=False, name='single ret val params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [32]:
logdir, return_dict = experiment.launch(multi_ret, local_logdir=False, name='multi ret', metric_key='value')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'value': 10, 'morevals': 0.5, 'logfile': 'Experiments/application_1576760828949_0042_17/testfile.txt', 'log': 'Experiments/application_1576760828949_0042_17/output.log'} is a dict

In [33]:
logdir, return_dict = experiment.launch(multi_ret, local_logdir=True, name='multi ret', metric_key='morevals')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'value': 10, 'morevals': 0.5, 'logfile': 'Experiments/application_1576760828949_0042_18/testfile.txt', 'log': 'Experiments/application_1576760828949_0042_18/output.log'} is a dict

In [34]:
logdir, return_dict = experiment.launch(multi_ret_params, params, local_logdir=False, name='multi ret params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [35]:
logdir, return_dict = experiment.launch(multi_ret_params, params, local_logdir=True, name='multi ret params')
assert_return_values(logdir, None, False, return_dict, False)

Finished Experiment

In [36]:
params={'a': [-5], 'b': [-8]}

logdir, return_dict = experiment.launch(single_ret_raw_value_params, params, local_logdir=True, name='multi ret params single comb')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'metric': -13, 'log': 'Experiments/application_1576760828949_0042_21/a=-5&b=-8/output.log'} is a dict

In [37]:
logdir, return_dict = experiment.launch(multi_ret_params, params, local_logdir=True, name='multi ret params single comb', metric_key='value')
assert_return_values(logdir, None, False, return_dict, True)

Finished Experiment 

Asserting return_dict {'value': -13, 'morevals': -8, 'logfile': 'Experiments/application_1576760828949_0042_22/a=-5&b=-8/testfile.txt', 'diagram': 'img.png', 'log': 'Experiments/application_1576760828949_0042_22/a=-5&b=-8/output.log'} is a dict

In [38]:
experiment.launch(export_model_in_wrapper, name='model exported in wrapper', local_logdir=False)

experiment.launch(export_model_in_wrapper, name='model exported in wrapper', local_logdir=True)

logdir, return_dict = experiment.launch(create_model_in_tensorboard_logdir, local_logdir=True, name='model exported from local logdir')
model.export(logdir + '/model', return_dict['name'])

logdir, return_dict = experiment.launch(create_model_in_tensorboard_logdir, local_logdir=False, name='model exported from hdfs logdir')
model.export(logdir + '/model', return_dict['name'])

Finished Experiment 

Finished Experiment 

Finished Experiment 

Exported model 4cbdabe6-be0f-4f13-aa00-6617a3d89494 as version 1 successfully.
Polling 4cbdabe6-be0f-4f13-aa00-6617a3d89494 version 1 for model availability.
Model now available.
Finished Experiment 

Exported model 36798f2e-5b54-40a2-821e-c59f794d4694 as version 1 successfully.
Polling 36798f2e-5b54-40a2-821e-c59f794d4694 version 1 for model availability.
Model now available.

In [39]:
def assert_best_hyperparameters(return_dict, best_hyperparameters):
    print('Asserting best hyperparameters in return_dict {} are {}'.format(return_dict, best_hyperparameters))
    for key in best_hyperparameters.keys():
        assert float(best_hyperparameters[key]) == float(return_dict[key]), '{} not equal to {}'.format(best_hyperparameters[key], return_dict[key])

In [40]:
def assert_return_dict(logdir, return_dict):
    return_dict_contents = hdfs.load(logdir + '/.outputs.json')
    logdir_return_dict = json.loads(return_dict_contents)
    print('Assserting returned dict {} is equal to .return in best logdir {}'.format(return_dict, logdir_return_dict))
    assert return_dict == logdir_return_dict, 'dicts are not the same {} - {}'.format(return_dict, logdir_return_dict)

##### Test Parallel Experiments `experiment.grid_search`

In [41]:
params={'a': [-5, 4.9], 'b': [-8, 10.3]}
try:
    experiment.grid_search(no_ret_params, params, name='fail no ret val')
    assert False, 'should fail due to no return value'
except:
    pass

try:
    experiment.grid_search(multi_ret_params, params, name='fail no opt key')
    assert False, 'should fail due to optimization_key not being set'
except:
    pass
    
logdir, hp_dict, return_dict = experiment.grid_search(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MIN)
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.grid_search(single_ret_raw_value_params, params, local_logdir=False, direction=Direction.MAX)
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.grid_search(single_ret_val_params, params, local_logdir=True, direction=Direction.MIN, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.grid_search(single_ret_val_params, params, local_logdir=False, direction=Direction.MAX, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.grid_search(multi_ret_params, params, local_logdir=False, direction=Direction.MIN, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, metric = experiment.grid_search(multi_ret_params, params, local_logdir=True, direction=Direction.MAX, optimization_key='morevals')
assert_return_values(logdir, hp_dict, True, return_dict, True)

params={'a': [-1, 1.5], 'b': [-1.5, 1]}

# Make sure minimization work
logdir, hp_dict, return_dict = experiment.grid_search(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MIN)
assert_return_values(logdir, hp_dict, True, return_dict, True)
assert_best_hyperparameters(hp_dict, {'a': -1, 'b': -1.5})
assert_return_dict(logdir, return_dict)

# Make sure maximization work
logdir, hp_dict, return_dict = experiment.grid_search(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MAX)
assert_return_values(logdir, hp_dict, True, return_dict, True)
assert_best_hyperparameters(hp_dict, {'a': 1.5, 'b': 1})
assert_return_dict(logdir, return_dict)

best_logdir, hp_dict, return_dict = experiment.grid_search(create_model_in_tensorboard_logdir_params, params, local_logdir=True, name='grid search model exported from local logdir', optimization_key='optval', direction=Direction.MIN)
model.export(best_logdir + '/model', return_dict['name'])

best_logdir, hp_dict, return_dict = experiment.grid_search(create_model_in_tensorboard_logdir_params, params, local_logdir=False, name='grid search model exported from hdfs logdir', optimization_key='optval', direction=Direction.MAX)
model.export(best_logdir + '/model', return_dict['name'])

Finished Experiment 

Asserting hp_dict {'a': -5, 'b': -8} is a dict
Asserting return_dict {'metric': -13, 'log': 'Experiments/application_1576760828949_0038_29/a=-5&b=-8/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': 4.9, 'b': 10.3} is a dict
Asserting return_dict {'metric': 15.200000000000001, 'log': 'Experiments/application_1576760828949_0038_30/a=4.9&b=10.3/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': -5, 'b': -8} is a dict
Asserting return_dict {'value': -13, 'log': 'Experiments/application_1576760828949_0038_31/a=-5&b=-8/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': 4.9, 'b': 10.3} is a dict
Asserting return_dict {'value': 15.200000000000001, 'log': 'Experiments/application_1576760828949_0038_32/a=4.9&b=10.3/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': -5, 'b': -8} is a dict
Asserting return_dict {'value': -13, 'morevals': -8, 'logfile': 'Experiments/application_1576760828949_0038_33/a=-5

##### Test Parallel Experiments `experiment.random_search`

In [42]:
params={'a': [-5, 4.9], 'b': [-8, 10.3]}
try:
    experiment.random_search(no_ret_params, params, samples=2, name='fail opt no ret')
    assert False, 'should fail due to no return value'
except:
    pass

try:
    experiment.random_search(multi_ret_params, params, samples=2, name='fail opt no key')
    assert False, 'should fail due to optimization_key not being set'
except:
    pass

logdir, hp_dict, return_dict = experiment.random_search(single_ret_raw_value_params, params, samples=2, local_logdir=True, direction=Direction.MIN)
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.random_search(single_ret_raw_value_params, params, samples=2, local_logdir=False, direction=Direction.MAX)
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.random_search(single_ret_val_params, params, samples=2, local_logdir=True, direction=Direction.MIN, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.random_search(single_ret_val_params, params, samples=2, local_logdir=False, direction=Direction.MAX, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.random_search(multi_ret_params, params, samples=2, local_logdir=False, direction=Direction.MAX, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.random_search(multi_ret_params, params, samples=2, local_logdir=True, direction=Direction.MIN, optimization_key='morevals')
assert_return_values(logdir, hp_dict, True, return_dict, True)

params={'a': [-1, 1], 'b': [-1, 1]}

# Make sure minimization work
logdir, hp_dict, return_dict = experiment.random_search(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MIN, samples=100)
assert_return_values(logdir, hp_dict, True, return_dict, True)
assert_best_hyperparameters(hp_dict, {'a': -1, 'b': -1})
assert_return_dict(logdir, return_dict)

# Make sure maximization work
logdir, hp_dict, return_dict = experiment.random_search(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MAX, samples=100)
assert_return_values(logdir, hp_dict, True, return_dict, True)
assert_best_hyperparameters(hp_dict, {'a': 1, 'b': 1})
assert_return_dict(logdir, return_dict)

best_logdir, hp_dict, return_dict = experiment.random_search(create_model_in_tensorboard_logdir_params, params, local_logdir=True, name='random search model exported from local logdir', optimization_key='optval', direction=Direction.MIN)
model.export(best_logdir + '/model', return_dict['name'])

best_logdir, hp_dict, return_dict = experiment.random_search(create_model_in_tensorboard_logdir_params, params, local_logdir=False, name='random search model exported from hdfs logdir', optimization_key='optval', direction=Direction.MAX)
model.export(best_logdir + '/model', return_dict['name'])

Finished Experiment 

Asserting hp_dict {'a': 1.2390300985714848, 'b': -6.451446027582621} is a dict
Asserting return_dict {'metric': -5.2124159290111365, 'log': 'Experiments/application_1576760828949_0038_41/a=1.2390300985714848&b=-6.451446027582621/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': -0.032969382452864515, 'b': 8.682981990601842} is a dict
Asserting return_dict {'metric': 8.650012608148977, 'log': 'Experiments/application_1576760828949_0038_42/a=-0.032969382452864515&b=8.682981990601842/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': -3.228158062746669, 'b': -5.604547101493724} is a dict
Asserting return_dict {'value': -8.832705164240393, 'log': 'Experiments/application_1576760828949_0038_43/a=-3.228158062746669&b=-5.604547101493724/output.log'} is a dict
Finished Experiment 

Asserting hp_dict {'a': -0.7379360799420356, 'b': 8.274378875045688} is a dict
Asserting return_dict {'value': 7.536442795103652, 'log': 'Experiments/applic

##### Test Parallel Experiments `experiment.differential_evolution`

In [43]:
params={'a': [1, 4.9], 'b': [3, 10.3]}
try:
    experiment.differential_evolution(no_ret_params, params, name='fail opt no ret')
    assert False, 'should fail due to no return value'
except:
    pass

try:
    experiment.differential_evolution(multi_ret_params, params, name='fail opt no key')
    assert False, 'should fail due to optimization_key not being set'
except:
    pass

logdir, hp_dict, return_dict = experiment.differential_evolution(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MIN)
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.differential_evolution(single_ret_raw_value_params, params, local_logdir=False, direction=Direction.MAX)
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.differential_evolution(single_ret_val_params, params, local_logdir=True, direction=Direction.MIN, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.differential_evolution(single_ret_val_params, params,local_logdir=False, direction=Direction.MAX, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.differential_evolution(multi_ret_params, params, local_logdir=False, direction=Direction.MAX, optimization_key='value')
assert_return_values(logdir, hp_dict, True, return_dict, True)

logdir, hp_dict, return_dict = experiment.differential_evolution(multi_ret_params, params, local_logdir=True, direction=Direction.MIN, optimization_key='morevals')
assert_return_values(logdir, hp_dict, True, return_dict, True)

params={'a': [4, 5], 'b': [1, 2]}

# Make sure minimization work
logdir, hp_dict, return_dict = experiment.differential_evolution(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MIN)
assert_return_values(logdir, hp_dict, True, return_dict, True)
assert_best_hyperparameters(hp_dict, {'a': 4, 'b': 1})
assert_return_dict(logdir, return_dict)

# Make sure maximization work
logdir, hp_dict, return_dict = experiment.differential_evolution(single_ret_raw_value_params, params, local_logdir=True, direction=Direction.MAX)
assert_return_values(logdir, hp_dict, True, return_dict, True)
assert_best_hyperparameters(hp_dict, {'a': 5, 'b': 2})
assert_return_dict(logdir, return_dict)

best_logdir, hp_dict, return_dict = experiment.differential_evolution(create_model_in_tensorboard_logdir_params, params, local_logdir=True, name='diff evo model exported from local logdir', optimization_key='optval', direction=Direction.MIN)
model.export(best_logdir + '/model', return_dict['name'])

best_logdir, hp_dict, return_dict = experiment.differential_evolution(create_model_in_tensorboard_logdir_params, params, local_logdir=False, name='diff evo model exported from hdfs logdir', optimization_key='optval', direction=Direction.MAX)
model.export(best_logdir + '/model', return_dict['name'])

Generation 1 || average metric: 8.166666666666666, best metric: 7.0, best parameter combination: ['a=3', 'b=4']

Generation 2 || average metric: 7.166666666666667, best metric: 7.0, best parameter combination: ['a=3', 'b=4']

Generation 3 || average metric: 7.0, best metric: 7.0, best parameter combination: ['a=3', 'b=4']

Generation 4 || average metric: 7.0, best metric: 7.0, best parameter combination: ['a=3', 'b=4']

Finished Experiment 

Asserting hp_dict {'a': 2, 'b': 5} is a dict
Asserting return_dict {'metric': 7, 'log': 'Experiments/application_1576760828949_0038_53/generation.0/a=2&b=5/output.log'} is a dict
Generation 1 || average metric: 9.0, best metric: 11.0, best parameter combination: ['a=4', 'b=7']

Generation 2 || average metric: 9.666666666666666, best metric: 11.0, best parameter combination: ['a=3', 'b=8']

Generation 3 || average metric: 10.5, best metric: 12.0, best parameter combination: ['a=4', 'b=8']

Generation 4 || average metric: 10.833333333333334, best met

In [41]:
def dist_exp_asserts():
    from hops import tensorboard
    from hops import devices
    from hops import hdfs
    import os
    assert devices.get_num_gpus()==0
    assert hdfs.project_path() == hdfs.project_path(hdfs.project_name())
    
    tf_config = json.loads(os.environ['TF_CONFIG'])
    
    role = tf_config['task']['type']
    
    print(tensorboard.logdir())
    
    # Only chief and evaluator role should have access to TB logdir to write checkpoints/summary/evaluation etc
    if role == 'chief':
        assert tensorboard.logdir() != None, 'chief TB is None'
        if tensorboard.local_logdir_bool:
            assert "hdfs://" not in tensorboard.logdir(), 'chief TB is not local'
            assert os.path.exists(tensorboard.logdir()), 'chief local TB path does not exists'
        else:
            assert "hdfs://" in tensorboard.logdir(), 'chief TB is not in HDFS'
            assert hdfs.exists(tensorboard.logdir()), 'chief hdfs TB path does not exists'
    elif role == 'worker' or role == 'ps':
        assert tensorboard.logdir() == None, 'ps or worker TB is not None {}'.format(tf_config)
    elif role == 'evaluator':
        assert tensorboard.logdir() != None, 'evaluator TB is None'
        assert hdfs.exists(tensorboard.logdir()), 'evaluator TB path does not exists'

In [42]:
def no_ret():
    dist_exp_asserts()

In [43]:
def multi_return():
    dist_exp_asserts()
    f = open('testfile.txt', 'w')
    f.write('stuff happened')
    f.close()
    f = open('img.png', 'w')
    f.write('stuff happened')
    f.close()
    return {'value': 10, 'morevals': 3}

In [47]:
experiment.mirrored(no_ret)

Finished Experiment 

('hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Experiments/application_1576760828949_0042_33', {'log': 'Experiments/application_1576760828949_0042_33/chief_0_output.log'})

In [48]:
experiment.mirrored(multi_return)

Finished Experiment 

('hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Experiments/application_1576760828949_0042_34', {'value': 10, 'morevals': 3, 'log': 'Experiments/application_1576760828949_0042_34/output.log'})

In [49]:
experiment.mirrored(export_model_in_wrapper, name='mirrored model exported in wrapper', local_logdir=False)

experiment.mirrored(export_model_in_wrapper, name='mirrored model exported in wrapper', local_logdir=True)

logdir, return_dict = experiment.mirrored(create_model_in_tensorboard_logdir, local_logdir=True, name='mirrored model exported from local logdir')
model.export(logdir + '/model', return_dict['name'])

logdir, return_dict = experiment.mirrored(create_model_in_tensorboard_logdir, local_logdir=False, name='mirrored model exported from hdfs logdir')
model.export(logdir + '/model', return_dict['name'])

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 34.0 failed 1 times, most recent failure: Lost task 1.0 in stage 34.0 (TID 61, hopsworks0.logicalclocks.com, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:593)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(Interruptibl

## HopsFS Tests

##### Test HopsFS operations

- `hdfs.project_user()`
- `hdfs.project_name()`
- `hdfs.project_path()`
- `hdfs.exists()`
- `hdfs.load()`
- `hdfs.copy_to_hdfs()`
- `hdfs.copy_to_local()`
- `hdfs.ls()`
- `hdfs.lsl()`
- `hdfs.glob()`
- `hdfs.cp()`
- `hdfs.rmr()`
- `hdfs.rename()`
- `hdfs.stat()`
- `hdfs.isdir()`
- `hdfs.isfile()`
- `hdfs.add_module()`
- `hdfs.delete()`
- `hdfs.get_plain_path()`

In [56]:
project_user = hdfs.project_user()
project_name = hdfs.project_name()
assert project_name in project_user
project_path = hdfs.project_path()
assert project_name in project_path

In [57]:
logs_README = hdfs.load("Logs/README.md")
assert len(logs_README) > 0

In [58]:
hdfs.dump("test", "Logs/README_dump_test.md")
assert hdfs.exists("Logs/README_dump_test.md")

In [59]:
logs_README_dumped = hdfs.load("Logs/README_dump_test.md")
assert logs_README_dumped.decode("utf-8") == "test"

In [60]:
# copy_to_hdfs file relative path

with open('upload.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs("upload.txt", "Resources")
assert hdfs.exists("Resources/upload.txt")
hdfs_copied_file = hdfs.load("Resources/upload.txt")
assert "first upload" == hdfs_copied_file.decode("utf-8"), "first content does not match"

with open('upload.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs("upload.txt", "Resources", overwrite=True)
assert hdfs.exists("Resources/upload.txt")
hdfs_copied_file = hdfs.load("Resources/upload.txt")
assert "second upload" == hdfs_copied_file.decode("utf-8"), "second content does not match"

try:
    hdfs.copy_to_hdfs("upload.txt", "Resources")
    assert False
except IOError:
    pass

hdfs.rmr("Resources/upload.txt")
os.remove("upload.txt")

Started copying local path upload.txt to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources

Finished copying

Started copying local path upload.txt to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/upload.txt

Finished copying

Started copying local path upload.txt to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources

In [61]:
# copy_to_hdfs file absolute path

with open('upload_absolute.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_absolute.txt", "Resources")
assert hdfs.exists("Resources/upload_absolute.txt")
hdfs_copied_file = hdfs.load("Resources/upload_absolute.txt")
assert "first upload" == hdfs_copied_file.decode("utf-8"), "first content does not match"

with open('upload_absolute.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_absolute.txt", "Resources", overwrite=True)
assert hdfs.exists("Resources/upload_absolute.txt")
hdfs_copied_file = hdfs.load("Resources/upload_absolute.txt")
assert "second upload" == hdfs_copied_file.decode("utf-8"), "second content does not match"

try:
    hdfs.copy_to_hdfs("upload_absolute.txt", "Resources")
    assert False
except IOError:
    pass

hdfs.rmr("Resources/upload_absolute.txt")
os.remove("upload_absolute.txt")

Started copying local path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/upload_absolute.txt to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources

Finished copying

Started copying local path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/upload_absolute.txt to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/upload_absolute.txt

Finished copying

Started copying local path upload_absolute.txt to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources

In [62]:
# copy_to_hdfs directory relative path

if not os.path.exists("upload_dir"):
    os.mkdir("upload_dir")

assert not hdfs.exists("Resources/upload_dir")
with open('upload_dir/upload.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs("upload_dir", "Resources")
hdfs_copied_file = hdfs.load("Resources/upload_dir/upload.txt")
assert hdfs.exists("Resources/upload_dir")
with open('upload_dir/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "first content compare failed"

with open('upload_dir/upload.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs("upload_dir", "Resources", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/upload_dir/upload.txt")
assert hdfs.exists("Resources/upload_dir")
with open('upload_dir/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "second content compare failed"

shutil.rmtree("upload_dir")
hdfs.rmr("Resources/upload_dir")

Started copying local path upload_dir to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources

Finished copying

Started copying local path upload_dir to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/upload_dir

Finished copying

In [63]:
# copy_to_hdfs directory absolute path

if not os.path.exists("upload_dir_absolute"):
    os.mkdir("upload_dir_absolute")
    
assert not hdfs.exists("Resources/upload_dir_absolute")
with open('upload_dir_absolute/upload.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_dir_absolute", "Resources")
hdfs_copied_file = hdfs.load("Resources/upload_dir_absolute/upload.txt")
assert hdfs.exists("Resources/upload_dir_absolute")
with open('upload_dir_absolute/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "first content compare failed"

with open('upload_dir_absolute/upload.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_dir_absolute", "Resources", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/upload_dir_absolute/upload.txt")
assert hdfs.exists("Resources/upload_dir_absolute")
with open('upload_dir_absolute/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "second content compare failed"

shutil.rmtree("upload_dir_absolute")
hdfs.rmr("Resources/upload_dir_absolute")

Started copying local path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/upload_dir_absolute to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources

Finished copying

Started copying local path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/upload_dir_absolute to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/upload_dir_absolute

Finished copying

In [64]:
#copy_to_local file

# Download first time
hdfs.dump("initial content", "Resources/somefile.txt")
hdfs.copy_to_local("Resources/somefile.txt")
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "first content compare failed"
first_modified = os.path.getmtime("somefile.txt")

# Download second time
hdfs.copy_to_local("Resources/somefile.txt")
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "second content compare failed"
second_modified = os.path.getmtime("somefile.txt")
assert first_modified == second_modified, "modified time not matching"

# Content changing on disk
hdfs.dump("content changed at some point", "Resources/somefile.txt")
hdfs_new_content = hdfs.load("Resources/somefile.txt")
hdfs.copy_to_local("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_new_content.decode("utf-8") == local_copied_file, "third content compare failed"
third_modified = os.path.getmtime("somefile.txt")
assert not second_modified == third_modified, "modified time not matching"

# Download last time with overwrite, file should have changed on disk
hdfs.copy_to_local("Resources/somefile.txt", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "fourth content compare failed"
fourth_modified = os.path.getmtime("somefile.txt")
assert not third_modified == fourth_modified, "modified time not matching"

# Download again to make sure overwrite did not cause problems
hdfs.copy_to_local("Resources/somefile.txt")
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "fifth content compare failed"
fifth_modified = os.path.getmtime("somefile.txt")
assert fourth_modified == fifth_modified, "modified time not matching"

hdfs.rmr("Resources/somefile.txt")
os.remove("somefile.txt")

Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/somefile.txt to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/

Finished copying

File hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/somefile.txt is already localized, skipping download...
Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/somefile.txt to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/

Finished copying

Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/somefile.txt to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949

In [65]:
#copy_to_local directory

assert not os.path.exists("Resources")
hdfs.copy_to_local("Resources")
first_modified = os.path.getmtime("Resources")
assert os.path.exists("Resources")
assert os.path.isdir("Resources")

hdfs.copy_to_local("Resources")
second_modified = os.path.getmtime("Resources")
assert first_modified == second_modified

localized_dir = hdfs.copy_to_local("Resources", overwrite=True)
third_modified = os.path.getmtime("Resources")
assert not second_modified == third_modified
num_files_first = len(os.listdir(localized_dir))

# Add a new file, it should also be localized
hdfs.dump("a wild file appeared", "Resources/newfile.txt")
hdfs.copy_to_local("Resources")
fourth_modified = os.path.getmtime("Resources")
assert first_modified == second_modified
num_files_second = len(os.listdir(localized_dir))
assert (num_files_first + 1) == num_files_second
assert not third_modified == fourth_modified

hdfs.rmr("Resources/newfile.txt")
shutil.rmtree("Resources")

Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/

Finished copying

Full directory subtree already on local disk and unchanged. Set overwrite=True to force download
Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/

Finished copying

Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/

Finished copying

In [66]:
logs_files_md = hdfs.glob("Logs/*.md")
logs_path_names = hdfs.lsl("Logs/")
if hdfs.exists("Logs/test.txt"):
    hdfs.rmr("Logs/test.txt")
assert not hdfs.exists("Logs/test.txt")

In [67]:
hdfs.dump("dummy", "Resources/test.txt")
hdfs.cp("Resources/test.txt", "Logs/")
logs_files = hdfs.ls("Logs/")
assert "test.txt" in ",".join(logs_files)

In [68]:
hdfs.mkdir("Logs/test_dir")
assert hdfs.exists("Logs/test_dir")

In [69]:
logs_files_prior_delete = hdfs.ls("Logs/")
hdfs.rmr("Logs/test_dir")
logs_files_after_delete = hdfs.ls("Logs/")
assert len(logs_files_prior_delete) > len(logs_files_after_delete)

In [70]:
logs_files_prior_move = hdfs.ls("Logs/")
assert "README_dump_test.md" in ",".join(logs_files_prior_move)

In [71]:
hdfs.move("Logs/README_dump_test.md", "Logs/README_dump_test2.md")
logs_files_after_move = hdfs.ls("Logs/")
assert "README_dump_test.md" not in ",".join(logs_files_after_move)
assert "README_dump_test2.md" in ",".join(logs_files_after_move)

In [72]:
logs_files_prior_rename = hdfs.ls("Logs/")
assert "README_dump_test2.md" in ",".join(logs_files_prior_rename)

In [73]:
hdfs.rename("Logs/README_dump_test2.md", "Logs/README_dump_test.md")
logs_files_after_rename = hdfs.ls("Logs/")
assert "Logs/README_dump_test2.md" not in ",".join(logs_files_after_rename)
assert "Logs/README_dump_test.md" in ",".join(logs_files_after_rename)

In [74]:
file_stat = hdfs.stat("Logs/README.md")
hdfs.chmod("Logs/README.md", 775)
file_stat = hdfs.stat("Logs/README.md")
assert 775 == file_stat.st_mode

In [75]:
hdfs.chmod("Logs/README.md", 777)
file_stat = hdfs.stat("Logs/README.md")
assert 777 == file_stat.st_mode

In [76]:
file_owner = file_stat.st_uid
assert hdfs.exists("Logs/")
assert not hdfs.exists("Not_Existing/neither_am_i")

In [77]:
assert hdfs.isdir("Resources")
assert not hdfs.isdir("Resources/README.md")

In [78]:
assert hdfs.isfile("Resources/README.md")
assert not hdfs.isfile("Resources")

In [79]:
hdfs.dump("def simple():\n\treturn 5", "Resources/my_module.py")
py_path = hdfs.add_module("Resources/my_module.py")
assert py_path in sys.path
import my_module
assert my_module.simple() == 5

Started copying hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/my_module.py to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/localized_deps

Finished copying

In [80]:
plain = hdfs.get_plain_path("hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/")
assert plain == "/Projects/demo_deep_learning_admin000/Models/"

In [81]:
hdfs.mkdir("Logs/test_delete_dir")
assert hdfs.exists("Logs/test_delete_dir")
hdfs.delete("Logs/test_delete_dir")
assert not hdfs.exists("Logs/test_delete_dir")

## Feature Store Tests

These tests require that you have the following files in the Resources directory:

- `attendances_features.csv`
- `games_features.csv`
- `players_features.csv`
- `season_scores_features.csv`
- `teams_features.csv`

These files can be downloaded from here: `http://snurran.sics.se/hops/hops-util-py_test/`

##### Test Featurestore Create Feature Group Operations (`featurestore.create_featuregroup()`)

In [2]:
def load_fs_sample_data():
    resources_path = hdfs.project_path() + "Resources/"
    games_features_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(resources_path + "games_features.csv")
    players_features_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(resources_path + "players_features.csv")
    teams_features_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(resources_path + "teams_features.csv")
    season_scores_features_df = spark.read.format("csv").option("header", "true").option("inferSchema","true").load(resources_path + "season_scores_features.csv")
    attendances_features_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(resources_path + "attendances_features.csv")
    return games_features_df,players_features_df,teams_features_df,season_scores_features_df, attendances_features_df
games_features_df,players_features_df,teams_features_df,season_scores_features_df, attendances_features_df = load_fs_sample_data()

In [3]:
featurestore.create_featuregroup(
    games_features_df,
    "games_features",
    description="Features of average season scores for football teams"
)

computing descriptive statistics for : games_features, version: 1
computing feature correlation for: games_features, version: 1
computing feature histograms for: games_features, version: 1
computing cluster analysis for: games_features, version: 1
Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [4]:
featurestore.create_featuregroup(
    teams_features_df,
    "teams_features",
    description="a spanish version of teams_features"
)

computing descriptive statistics for : teams_features, version: 1
computing feature correlation for: teams_features, version: 1
computing feature histograms for: teams_features, version: 1
computing cluster analysis for: teams_features, version: 1
Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [5]:
featurestore.create_featuregroup(
    season_scores_features_df,
    "season_scores_features",
    description="Features of average season scores for football teams"
)

computing descriptive statistics for : season_scores_features, version: 1
computing feature correlation for: season_scores_features, version: 1
computing feature histograms for: season_scores_features, version: 1
computing cluster analysis for: season_scores_features, version: 1
Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [6]:
featurestore.create_featuregroup(
    attendances_features_df,
    "attendances_features",
    description="Features of average attendance of games of football teams"
)

computing descriptive statistics for : attendances_features, version: 1
computing feature correlation for: attendances_features, version: 1
computing feature histograms for: attendances_features, version: 1
computing cluster analysis for: attendances_features, version: 1
Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [7]:
teams_features_1_df = featurestore.get_featuregroup("teams_features")
teams_features_2_df = teams_features_1_df.withColumnRenamed(
    "team_id", "equipo_id").withColumnRenamed(
    "team_budget", "equipo_presupuesto").withColumnRenamed(
    "team_position", "equipo_posicion")

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_1 against offline feature store

In [8]:
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [9]:
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    featurestore=featurestore.project_featurestore(),
    featuregroup_version=1
)

Could not create feature group (url: /hopsworks-api/api/project/183/featurestores/131/featuregroups), server response: 
 HTTP code: 400, HTTP reason: Bad Request, error code: 270089, error msg: The feature group you are trying to create does already exist., user msg: project: ittests, featurestoreId: 131
Traceback (most recent call last):
  File "/srv/hops/anaconda/anaconda/envs/ittests/lib/python3.6/site-packages/hops/featurestore.py", line 648, in create_featuregroup
    online=online, online_types=online_types, offline=offline)
  File "/srv/hops/anaconda/anaconda/envs/ittests/lib/python3.6/site-packages/hops/featurestore_impl/core.py", line 1841, in _do_create_featuregroup
    None, None, online)
  File "/srv/hops/anaconda/anaconda/envs/ittests/lib/python3.6/site-packages/hops/featurestore_impl/rest/rest_rpc.py", line 291, in _create_featuregroup_rest
    resource_url, response.status_code, response.reason, error_code, error_msg, user_msg))
hops.exceptions.RestAPIError: Could not cr

In [10]:
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    featuregroup_version=2
)

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [11]:
from hops import hdfs
query = "SELECT * FROM games_features_1 WHERE score > 1"
storage_connector = hdfs.project_name() + "_featurestore"
featuregroup_name = "games_features_on_demand"
featurestore.create_on_demand_featuregroup(query, featuregroup_name, storage_connector)

Feature group created successfully

In [12]:
assert "games_features_1" in featurestore.get_featuregroups()
assert "teams_features_1" in featurestore.get_featuregroups()
assert "season_scores_features_1" in featurestore.get_featuregroups()
assert "attendances_features_1" in featurestore.get_featuregroups()
assert "teams_features_spanish_1" in featurestore.get_featuregroups()
assert "teams_features_spanish_2" in featurestore.get_featuregroups()
assert "games_features_on_demand_1" in featurestore.get_featuregroups()

##### Test Featurestore Utility Operations, 

- `featurestore.get_metadata()`,
- `featurestore.project_featurestore()`, 
- `featurestore.get_latest_featuregroup_version()`, 
- `featurestore.get_features_list()`

In [13]:
featurestore.get_featurestore_metadata(update_cache=True)

<hops.featurestore_impl.dao.common.featurestore_metadata.FeaturestoreMetadata object at 0x7f80d5eeb908>

In [14]:
assert featurestore.project_featurestore() == hdfs.project_name() + "_featurestore"

In [15]:
assert featurestore.project_featurestore() in featurestore.get_project_featurestores()

In [16]:
assert len(featurestore.get_project_featurestores()) == 1

In [17]:
assert featurestore.get_latest_featuregroup_version("teams_features_spanish") == 2

In [18]:
assert featurestore.get_latest_featuregroup_version("teams_features") == 1

In [19]:
assert "away_team_id" in featurestore.get_features_list()

In [20]:
assert "home_team_id" in featurestore.get_features_list()

In [21]:
assert (hdfs.project_name() + "_featurestore", 'JDBC') in featurestore.get_storage_connectors()

In [22]:
assert len(featurestore.get_storage_connectors()) >= 3

##### Test Read operations of Features and Feature Groups, 

- `featurestore.get_feature()`, 
- `featurestore.get_features()`, 
- `featurestore.get_featuregroup()`

In [23]:
tmp = featurestore.get_feature("team_budget")
assert tmp.count() == 50
assert len(tmp.columns) == 1
assert "team_budget" in tmp.columns

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 1 feature from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget FROM teams_features_1 against offline feature store

In [24]:
tmp = featurestore.get_feature(
    "team_budget", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup="teams_features", 
    featuregroup_version = 1,
    dataframe_type = "spark"
)
assert tmp.count() == 50
assert len(tmp.columns) == 1
assert "team_budget" in tmp.columns

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 1 feature from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget FROM teams_features_1 against offline feature store

In [25]:
tmp = featurestore.get_featuregroup("teams_features")
assert tmp.count() == 50
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_1 against offline feature store

In [26]:
tmp = featurestore.get_featuregroup(
    "teams_features", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup_version = 1,
    dataframe_type = "spark"
)
assert tmp.count() == 50
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_1 against offline feature store

In [27]:
features = ["team_budget", "average_attendance"]
tmp = featurestore.get_features(
    features
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [28]:
features = ["teams_features_1.team_budget", "attendances_features_1.average_attendance"]
tmp = featurestore.get_features(features)
assert set(["team_budget", "average_attendance"]) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT attendances_features_1.average_attendance, teams_features_1.team_budget FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store

In [29]:
features = ["team_budget", "average_attendance"]
tmp = featurestore.get_features(
    features,
    featurestore=featurestore.project_featurestore(),
    featuregroups_version_dict={
        "teams_features": 1, 
        "attendances_features": 1
    }
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store

In [30]:
tmp = featurestore.get_features(
    features,
    featurestore=featurestore.project_featurestore(),
    featuregroups_version_dict={
        "teams_features": 1, 
        "attendances_features": 1
    },
    join_key = "team_id",
    dataframe_type = "spark"
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [31]:
features = ["team_budget", "average_attendance",
    "team_position", "sum_attendance"
    ]
tmp = featurestore.get_features(
   features
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 4 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT sum_attendance, team_budget, average_attendance, team_position FROM attendances_features_1 JOIN teams_features_1 ON attendances_features_1.`team_id`=teams_features_1.`team_id` against offline feature store

In [32]:
features = ["team_budget", "team_id"]
tmp = featurestore.get_features(
    features,
    featuregroups_version_dict = {
        "teams_features" : 1
    }
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, team_id FROM teams_features_1 against offline feature store

In [33]:
tmp = featurestore.sql(
    "SELECT team_budget, score " \
    "FROM teams_features_1 JOIN games_features_1 ON " \
    "games_features_1.home_team_id = teams_features_1.team_id")
features = ['team_budget', 'score']
assert set(features) == set(tmp.columns)
assert tmp.count() == 49
assert len(tmp.columns) == len(features)

Running sql: use ittests_featurestore against offline feature store
Running sql: SELECT team_budget, score FROM teams_features_1 JOIN games_features_1 ON games_features_1.home_team_id = teams_features_1.team_id against offline feature store

In [34]:
tmp = featurestore.sql("SELECT * FROM teams_features_1 WHERE team_position < 5")
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns
for x in tmp.toPandas()["team_position"].values:
    assert x < 5

Running sql: use ittests_featurestore against offline feature store
Running sql: SELECT * FROM teams_features_1 WHERE team_position < 5 against offline feature store

In [35]:
tmp = featurestore.sql("SELECT * FROM teams_features_1 WHERE team_position < 5",
                featurestore=featurestore.project_featurestore(), 
                 dataframe_type = "spark")
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns
for x in tmp.toPandas()["team_position"].values:
    assert x < 5

Running sql: use ittests_featurestore against offline feature store
Running sql: SELECT * FROM teams_features_1 WHERE team_position < 5 against offline feature store

#####  Test Insert Operations in Existing Feature Groups, `featurestore.insert_into_featuregroup()`

In [36]:
sqlContext = SQLContext(spark.sparkContext)
schema = StructType([StructField("equipo_id", IntegerType(), True),
                     StructField("equipo_presupuesto", FloatType(), True),
                     StructField("equipo_posicion", IntegerType(), True)
                        ])
sample_df = sqlContext.createDataFrame([(999, 41251.52, 1), (998, 1319.4, 8), (997, 21219.1, 2)], schema)
insert_count = sample_df.count()
assert insert_count == 3

In [37]:
spanish_team_features_df = featurestore.get_featuregroup(
    "teams_features_spanish")
pre_insert_count = spanish_team_features_df.count()
assert pre_insert_count == 50

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_spanish_1 against offline feature store

In [38]:
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish"
)
spanish_team_features_df_updated = featurestore.get_featuregroup(
    "teams_features_spanish")

after_insert_count = spanish_team_features_df_updated.count()
assert after_insert_count == 53

Inserting data into offline feature group teams_features_spanish...
Running sql: use ittests_featurestore against offline feature store
Inserting data into offline feature group teams_features_spanish... [COMPLETE]
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_spanish_1 against offline feature store
Insertion into feature group was successful
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_spanish_1 against offline feature store

In [40]:
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup_version=1, 
    mode="append"
)

after_insert_count2 = featurestore.get_featuregroup("teams_features_spanish").count()
assert after_insert_count2 == 56


Traceback (most recent call last):
AssertionError



In [41]:
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish",
    mode="overwrite")

count_after_overwrite = featurestore.get_featuregroup("teams_features_spanish").count()
assert count_after_overwrite == 3

Inserting data into offline feature group teams_features_spanish...
Running sql: use ittests_featurestore against offline feature store
Inserting data into offline feature group teams_features_spanish... [COMPLETE]
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_spanish_1 against offline feature store
Insertion into feature group was successful
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_spanish_1 against offline feature store

##### Test integration of feature store with Numpy, Pandas and plain Python

In [42]:
pandas_df = featurestore.get_features(["team_budget", "average_attendance"], dataframe_type="pandas")
assert "team_budget" in pandas_df.columns.values
assert "average_attendance" in pandas_df.columns.values
assert len(pandas_df) == 50
assert len(pandas_df.columns.values) == 2
assert isinstance(pandas_df, pd.DataFrame)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [43]:
numpy_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="numpy")
assert numpy_df.shape[0] == 50
assert numpy_df.shape[1] == 2
assert isinstance(numpy_df, np.ndarray)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [44]:
python_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="python")
assert len(python_df) == 50
assert isinstance(python_df, list)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [45]:
spark_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="spark")
assert spark_df.count() == 50
assert isinstance(spark_df, DataFrame)

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 2 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [46]:
# Let's rename the columns to differentiate this feature group from existing ones in the feature store
pandas_df.columns = ["team_budget_test", "average_attendance_test"]

featurestore.create_featuregroup(
    pandas_df,
    "pandas_test_example",
    description="test featuregroup created from pandas dataframe",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
assert "pandas_test_example_1" in featurestore.get_featuregroups()

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [47]:
count_pre_pandas_insert_overwrite = featurestore.get_featuregroup("pandas_test_example").count()
featurestore.insert_into_featuregroup(
    pandas_df, 
    "pandas_test_example",
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")
count_after_pandas_insert_overwrite = featurestore.get_featuregroup("pandas_test_example").count()
assert count_pre_pandas_insert_overwrite == count_after_pandas_insert_overwrite

insert_into_featuregroup() got an unexpected keyword argument 'descriptive_statistics'
Traceback (most recent call last):
TypeError: insert_into_featuregroup() got an unexpected keyword argument 'descriptive_statistics'



In [48]:
featurestore.create_featuregroup(
    numpy_df,
    "numpy_test_example",
    description="test featuregroup created from numpy matrix",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
assert "numpy_test_example_1" in featurestore.get_featuregroups()

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [50]:
numpy_test_df_count_pre_insert_overwrite = featurestore.get_featuregroup("numpy_test_example", dataframe_type="spark").count()
featurestore.insert_into_featuregroup(
    numpy_df, 
    "numpy_test_example",

    mode="overwrite")
numpy_test_df_count_after_insert_overwrite = featurestore.get_featuregroup("numpy_test_example", dataframe_type="spark").count()
assert numpy_test_df_count_pre_insert_overwrite == numpy_test_df_count_pre_insert_overwrite

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM numpy_test_example_1 against offline feature store
Inserting data into offline feature group numpy_test_example...
Running sql: use ittests_featurestore against offline feature store
Inserting data into offline feature group numpy_test_example... [COMPLETE]
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM numpy_test_example_1 against offline feature store
Insertion into feature group was successful
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM numpy_test_example_1 against offline feature store

In [51]:
featurestore.create_featuregroup(
    python_df,
    "python_test_example",
    description="test featuregroup created from python 2D list",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

python_test_df_count_pre_insert_overwrite = featurestore.get_featuregroup("python_test_example", dataframe_type="spark").count()
assert "python_test_example_1" in featurestore.get_featuregroups()

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use ittests_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM python_test_example_1 against offline feature store

In [53]:
featurestore.insert_into_featuregroup(
    python_df, 
    "python_test_example",
    mode="overwrite")

python_test_df_count_after_insert_overwrite = featurestore.get_featuregroup("python_test_example", dataframe_type="spark").count()
assert python_test_df_count_pre_insert_overwrite == python_test_df_count_after_insert_overwrite

Inserting data into offline feature group python_test_example...
Running sql: use ittests_featurestore against offline feature store
Inserting data into offline feature group python_test_example... [COMPLETE]
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM python_test_example_1 against offline feature store
Insertion into feature group was successful
Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM python_test_example_1 against offline feature store

##### Test update Feature Store Statistics `featurestore.update_featuregroup_stats()`

In [54]:
featurestore.update_featuregroup_stats("teams_features")

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_1 against offline feature store
computing descriptive statistics for : teams_features, version: 1
computing feature correlation for: teams_features, version: 1
computing feature histograms for: teams_features, version: 1
computing cluster analysis for: teams_features, version: 1

In [55]:
featurestore.update_featuregroup_stats(
    "teams_features", 
    featuregroup_version=1, 
    featurestore=featurestore.project_featurestore(), 
    descriptive_statistics=True,
    feature_correlation=True, 
    feature_histograms=True,
    cluster_analysis=True,
    stat_columns=None)

Running sql: use ittests_featurestore against offline feature store
SQL string for the query created successfully
Running sql: SELECT * FROM teams_features_1 against offline feature store
computing descriptive statistics for : teams_features, version: 1
computing feature correlation for: teams_features, version: 1
computing feature histograms for: teams_features, version: 1
computing cluster analysis for: teams_features, version: 1

##### Test Write Training Dataset Operations 

- `featurestore.get_latest_training_dataset_version()`
- `create_training_dataset()`

In [56]:
features_df = featurestore.get_features(
    ["team_budget", "average_attendance",
    "team_position"]
)
latest_version = featurestore.get_latest_training_dataset_version("team_position_prediction")

Running sql: use ittests_featurestore against offline feature store
Logical query plan for getting 3 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT team_budget, average_attendance, team_position FROM teams_features_1 JOIN attendances_features_1 ON teams_features_1.`team_id`=attendances_features_1.`team_id` against offline feature store

In [57]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    training_dataset_version = 1
)

write feature frame, write_mode: overwrite
Training Dataset created successfully

In [58]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_csv",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="csv",
    training_dataset_version= 1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully

In [59]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_tsv",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="tsv",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully

In [60]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_parquet",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="parquet",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully

In [61]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_orc",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="orc",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully

In [62]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_avro",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="avro",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully

In [63]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_hdf5",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="hdf5",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully
  hdf5_file = h5py.File(tf)

In [64]:
featurestore.create_training_dataset(
    features_df, "team_position_prediction_npy",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="npy",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

Training Dataset created successfully

In [65]:
# Petastorm is only supported in python 3

PetastormSchema = Unischema('team_position_prediction_petastorm_schema', [
    UnischemaField('team_budget', np.float32, (), ScalarCodec(FloatType()), False),
    UnischemaField('average_attendance', np.float32, (), ScalarCodec(FloatType()), False),
    UnischemaField('team_position', np.int32, (), ScalarCodec(IntegerType()), False)
])

petastorm_args = {
    "schema": PetastormSchema
}

featurestore.create_training_dataset(
    features_df, "team_position_prediction_petastorm",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="petastorm",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None,
    petastorm_args=petastorm_args
)

Training Dataset created successfully

In [66]:
tds = featurestore.get_training_datasets()
assert 'team_position_prediction_1' in tds
assert 'team_position_prediction_csv_1' in tds
assert 'team_position_prediction_tsv_1' in tds
assert 'team_position_prediction_parquet_1' in tds
assert 'team_position_prediction_orc_1' in tds
assert 'team_position_prediction_avro_1' in tds
assert 'team_position_prediction_hdf5_1'in tds
assert 'team_position_prediction_npy_1' in tds
assert 'team_position_prediction_petastorm_1' in tds

##### Test Insert into an existing training dataset, `featurestore.insert_into_training_dataset()`

In [67]:
count_pre_insert = featurestore.get_training_dataset("team_position_prediction_csv").count()
featurestore.insert_into_training_dataset(
    features_df, 
    "team_position_prediction_csv",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    training_dataset_version=featurestore.get_latest_training_dataset_version("team_position_prediction_csv")
)
count_after_insert = featurestore.get_training_dataset("team_position_prediction_csv").count()
assert count_pre_insert == count_after_insert # td only support overwrites

insert_into_training_dataset
Writing Feature Frame, data format: csv
Insertion into training dataset was successful

##### Test Training Dataset Utility Methods

- `featurestore.get_training_dataset_path()`
- `featurestore.get_training_dataset_tf_record_schema`

In [68]:
assert hdfs.project_path() in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [69]:
assert hdfs.project_name() + "_Training_Datasets" in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [70]:
assert "team_position_prediction_csv" in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [71]:
tf_schema = featurestore.get_training_dataset_tf_record_schema("team_position_prediction")
assert tf_schema == {'team_budget': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'average_attendance': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'team_position': tf.FixedLenFeature(shape=[], dtype=tf.int64, default_value=None)}

In [72]:
features_df = featurestore.get_training_dataset("team_position_prediction")
tf_schema = featurestore.get_dataframe_tf_record_schema(features_df)
assert tf_schema == {'team_budget': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'average_attendance': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'team_position': tf.FixedLenFeature(shape=[], dtype=tf.int64, default_value=None)}

##### Test update Training Dataset stats

- `featurestore.update_training_dataset_stats()`

In [73]:
featurestore.update_training_dataset_stats("team_position_prediction")

computing descriptive statistics for : team_position_prediction, version: 1
computing feature correlation for: team_position_prediction, version: 1
computing feature histograms for: team_position_prediction, version: 1
computing cluster analysis for: team_position_prediction, version: 1

In [74]:
featurestore.update_training_dataset_stats(
    "team_position_prediction", 
    training_dataset_version=1, 
    featurestore=featurestore.project_featurestore(), 
    descriptive_statistics=True,
    feature_correlation=True, 
    feature_histograms=True,
    cluster_analysis=True,
    stat_columns=None)

computing descriptive statistics for : team_position_prediction, version: 1
computing feature correlation for: team_position_prediction, version: 1
computing feature histograms for: team_position_prediction, version: 1
computing cluster analysis for: team_position_prediction, version: 1

##### Test Read Training Datasets API `featurestore.get_training_dataset()`

In [None]:
cols = ['team_budget', 'average_attendance', 'team_position']
tmp = featurestore.get_training_dataset("team_position_prediction_csv")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_hdf5")
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_petastorm")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_avro")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_orc")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_tsv")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_npy")
assert tmp.count() == 50

In [None]:
tmp = featurestore.get_training_dataset("team_position_prediction_parquet")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

##### Test Featurestore Get Statistics

- `featurestore.get_featuregroup_statistics()`
- `featurestore.get_training_dataset_statistics()`

In [None]:
stats = featurestore.get_featuregroup_statistics("teams_features")
assert not stats.cluster_analysis is None
assert not stats.cluster_analysis.clusters is None
assert not stats.cluster_analysis.datapoints is None
assert len(stats.cluster_analysis.clusters) == len(stats.cluster_analysis.datapoints)
assert not stats.cluster_analysis.clusters[0].datapoint_name is None
assert not stats.cluster_analysis.clusters[0].cluster is None
assert not stats.correlation_matrix is None
assert not stats.correlation_matrix.feature_correlations is None
assert len(stats.correlation_matrix.feature_correlations) > 0
assert len(stats.correlation_matrix.feature_correlations) < constants.FEATURE_STORE.MAX_CORRELATION_MATRIX_COLUMNS
assert not stats.correlation_matrix.feature_correlations[0].feature_name is None
assert not stats.correlation_matrix.feature_correlations[0].correlation_values is None
assert len(stats.correlation_matrix.feature_correlations[0].correlation_values) == \
len(stats.correlation_matrix.feature_correlations)
assert not stats.descriptive_stats is None
assert not stats.descriptive_stats.descriptive_stats is None
assert len(stats.descriptive_stats.descriptive_stats) > 0
assert not stats.descriptive_stats.descriptive_stats[0].feature_name is None
assert not stats.descriptive_stats.descriptive_stats[0].metric_values is None
assert len(stats.descriptive_stats.descriptive_stats[0].metric_values) > 0
assert not stats.descriptive_stats.descriptive_stats[0].metric_values[0].metric_name is None
assert not stats.descriptive_stats.descriptive_stats[0].metric_values[0].value is None
assert not stats.feature_histograms is None
assert not stats.feature_histograms.feature_distributions is None
assert len(stats.feature_histograms.feature_distributions) > 0
assert not stats.feature_histograms.feature_distributions[0].feature_name is None
assert not stats.feature_histograms.feature_distributions[0].frequency_distribution is None
assert len(stats.feature_histograms.feature_distributions[0].frequency_distribution) > 0
assert not stats.feature_histograms.feature_distributions[0].frequency_distribution[0].bin is None
assert not stats.feature_histograms.feature_distributions[0].frequency_distribution[0].frequency is None

In [None]:
stats = featurestore.get_training_dataset_statistics("team_position_prediction")
assert not stats.cluster_analysis is None
assert not stats.cluster_analysis.clusters is None
assert not stats.cluster_analysis.datapoints is None
assert len(stats.cluster_analysis.clusters) == len(stats.cluster_analysis.datapoints)
assert not stats.cluster_analysis.clusters[0].datapoint_name is None
assert not stats.cluster_analysis.clusters[0].cluster is None
assert not stats.correlation_matrix is None
assert not stats.correlation_matrix.feature_correlations is None
assert len(stats.correlation_matrix.feature_correlations) > 0
assert len(stats.correlation_matrix.feature_correlations) < constants.FEATURE_STORE.MAX_CORRELATION_MATRIX_COLUMNS
assert not stats.correlation_matrix.feature_correlations[0].feature_name is None
assert not stats.correlation_matrix.feature_correlations[0].correlation_values is None
assert len(stats.correlation_matrix.feature_correlations[0].correlation_values) == len(stats.correlation_matrix.feature_correlations)
assert not stats.descriptive_stats is None
assert not stats.descriptive_stats.descriptive_stats is None
assert len(stats.descriptive_stats.descriptive_stats) > 0
assert not stats.descriptive_stats.descriptive_stats[0].feature_name is None
assert not stats.descriptive_stats.descriptive_stats[0].metric_values is None
assert len(stats.descriptive_stats.descriptive_stats[0].metric_values) > 0
assert not stats.descriptive_stats.descriptive_stats[0].metric_values[0].metric_name is None
assert not stats.descriptive_stats.descriptive_stats[0].metric_values[0].value is None
assert not stats.feature_histograms is None
assert not stats.feature_histograms.feature_distributions is None
assert len(stats.feature_histograms.feature_distributions) > 0
assert not stats.feature_histograms.feature_distributions[0].feature_name is None
assert not stats.feature_histograms.feature_distributions[0].frequency_distribution is None
assert len(stats.feature_histograms.feature_distributions[0].frequency_distribution) > 0
assert not stats.feature_histograms.feature_distributions[0].frequency_distribution[0].bin is None
assert not stats.feature_histograms.feature_distributions[0].frequency_distribution[0].frequency is None

##### Test Featurestore Visualizations

- `featurestore.visualize_featuregroup_distributions()`
- `featurestore.visualize_featuregroup_correlations()`
- `featurestore.visualize_featuregroup_clusters()`
- `featurestore.visualize_featuregroup_descriptive_stats()`
- `featurestore.visualize_training_dataset_distributions()`
- `featurestore.visualize_training_dataset_correlations()`
- `featurestore.visualize_traniing_dataset_clusters()`
- `featurestore.visualize_training_dataset_descriptive_stats()`

In [None]:
fig = featurestore.visualize_featuregroup_distributions("teams_features", plot=False)
fig.savefig("teams_features_distributions.png")

In [None]:
fig = featurestore.visualize_featuregroup_correlations("teams_features", plot=False)
fig.savefig("teams_features_correlations.png")

In [None]:
fig = featurestore.visualize_featuregroup_clusters("teams_features", plot=False)
fig.savefig("teams_features_clusters.png")

In [None]:
desc_stats_df = featurestore.visualize_featuregroup_descriptive_stats("teams_features")
desc_stats_df.head()

In [None]:
fig = featurestore.visualize_training_dataset_distributions("team_position_prediction", plot=False)
fig.savefig("team_position_prediction_distributions.png")

In [None]:
fig = featurestore.visualize_training_dataset_correlations("team_position_prediction", plot=False)
fig.savefig("team_position_prediction_correlations.png")

In [None]:
fig = featurestore.visualize_training_dataset_clusters("team_position_prediction", plot=False)
fig.savefig("team_position_prediction_clusters.png")

In [None]:
desc_stats_df = featurestore.visualize_training_dataset_descriptive_stats("team_position_prediction")
desc_stats_df.head()

##### Cleanup (Delete FS Contents so that next test run works the same)

In [None]:
# Delete feature groups
spark.sql('use ' + featurestore.project_featurestore())
for fg in featurestore.get_featuregroups():
    try:
        spark.sql("drop table " + fg)
    except:
        pass

In [None]:
# Delete training datasets
td_dir = hdfs.project_name() + "_Training_Datasets/"
for td in featurestore.get_training_datasets():
    try:
        hdfs.rmr(td_dir + td)
    except:
        pass

In [None]:
featurestore.get_featurestore_metadata(update_cache=True)
# on demand feature group will still be there.. maybe add delete endpoint in the python SDK?
#assert featurestore.get_featuregroups() == [] 
assert featurestore.get_training_datasets() == []

##### Test Metadata operations of Feature Groups, 

- `featurestore.add_metadata()`, 
- `featurestore.get_metadata()`, 
- `featurestore.remove_metadata()`

In [None]:
featurestore.add_metadata("teams_features", {"attr1" : "attr1 value", "attr2" : "attr2 value"})
md = featurestore.get_metadata("teams_features", ["attr1"])
assert len(md) == 1
assert md["attr1"] == "attr1 value"
md = featurestore.get_metadata("teams_features")
assert len(md) == 2
assert md["attr1"] == "attr1 value"
assert md["attr2"] == "attr2 value"
featurestore.remove_metadata("teams_features", ["attr1"])
md = featurestore.get_metadata("teams_features")
assert len(md) == 1
assert md["attr2"] == "attr2 value"

## Kafka Tests

##### Test default config 

- `kafka.get_default_config()`, 
- `kafka.get_security_protocol()`,
- `kafka.get_broker_endpoints_list()`

In [174]:
config = kafka.get_kafka_default_config()
assert "bootstrap.servers" in config
assert "security.protocol" in config
assert "ssl.ca.location" in config
assert "ssl.key.location" in config
assert "ssl.certificate.location" in config

In [175]:
assert len(kafka.get_security_protocol()) > 0
assert len(kafka.get_broker_endpoints_list()) > 0

## TLS Tests

##### Test access to TLS tokens

- `tls.get_key_store()`
- `tls.get_trust_store()`
- `tls.get_key_store_pwd()`
- `tls.get_trust_store_pwd()`
- `tls.get_client_certificate_location()`
- `tls.get_client_key_location()`
- `tls.get_ca_chain_location()`

In [176]:
assert len(tls.get_key_store()) > 0
assert len(tls.get_trust_store()) > 0
assert len(tls.get_key_store_pwd()) > 0
assert len(tls.get_trust_store_pwd()) > 0
assert len(tls.get_client_certificate_location()) > 0
assert len(tls.get_client_key_location()) > 0
assert len(tls.get_ca_chain_location()) > 0

## Serving Tests

These tests require that you have the following files in the Resources directory:

- `iris_model.knn`
- `iris_flower_classifier.py`
- `mnist`

Where mnist is a directory containing a tensorflow model.

These files can be downloaded from here: `http://snurran.sics.se/hops/hops-util-py_test/`

##### Test Export Model HDFS

In [177]:
model_path_relative = "Resources"

model.export(model_path_relative, "IrisFlowerClassifier", model_version=1, overwrite=True)
model.export(model_path_relative, "IrisFlowerClassifier", model_version=2, overwrite=True, metrics={'accuracy': 21.5, 'loss': 31.3})
model.export(model_path_relative, "IrisFlowerClassifier", model_version=3, overwrite=True, metrics={'accuracy': 0.5})
model.export(model_path_relative, "IrisFlowerClassifier", model_version=4, overwrite=True, metrics={'accuracy': 9})
model.export(model_path_relative, "IrisFlowerClassifier", model_version=5, overwrite=True, metrics={'accuracy': 30})

model_path_abs = hdfs.project_path() + "Resources"

model.export(model_path_abs, "IrisFlowerClassifier_abs", metrics={'accuracy': 10.9})
model.export(model_path_abs, "IrisFlowerClassifier_abs", metrics={'accuracy': 21.5, 'loss': 31.3})
model.export(model_path_abs, "IrisFlowerClassifier_abs", metrics={'accuracy': 0.5})
model.export(model_path_abs, "IrisFlowerClassifier_abs")
model.export(model_path_abs, "IrisFlowerClassifier_abs", metrics={'accuracy': 30})

Exported model IrisFlowerClassifier as version 1 successfully.
Polling IrisFlowerClassifier version 1 for model availability.
Model now available.
Exported model IrisFlowerClassifier as version 2 successfully.
Polling IrisFlowerClassifier version 2 for model availability.
Model now available.
Exported model IrisFlowerClassifier as version 3 successfully.
Polling IrisFlowerClassifier version 3 for model availability.
Model now available.
Exported model IrisFlowerClassifier as version 4 successfully.
Polling IrisFlowerClassifier version 4 for model availability.
Model now available.
Exported model IrisFlowerClassifier as version 5 successfully.
Polling IrisFlowerClassifier version 5 for model availability.
Model now available.
Exported model IrisFlowerClassifier_abs as version 1 successfully.
Polling IrisFlowerClassifier_abs version 1 for model availability.
Model now available.
Exported model IrisFlowerClassifier_abs as version 2 successfully.
Polling IrisFlowerClassifier_abs version 2 

##### Test Export Model Local

In [178]:
local_model_dir = os.getcwd() + '/model'
local_model_file = local_model_dir + '/model.pb'
if not os.path.exists(local_model_dir):
    os.mkdir(local_model_dir)
    f = open(local_model_file, "w")
    f.write("model")
    f.close()
    
model.export(local_model_dir, 'local_model_dir')
model.export('model', 'local_model_dir')

model.export(local_model_file, 'local_model_file')

Started copying local path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/model/model.pb to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/local_model_dir/1

Finished copying

Exported model local_model_dir as version 1 successfully.
Polling local_model_dir version 1 for model availability.
Model now available.
Started copying local path model/model.pb to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/local_model_dir/2

Finished copying

Exported model local_model_dir as version 2 successfully.
Polling local_model_dir version 2 for model availability.
Model now available.
Started copying local path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/6rleftXNS7jE4GPq0aancNC_IQkiuF3dRnRBytwdAR4/appcache/application_1576760828949_0038/container_e01_1576760828949_0038_01_000001/model/model.pb to hdfs path hdfs://10

In [179]:
try:
    model.export(model_path_relative, "IrisFlowerClassifier", model_version=4, overwrite=True, metrics={'accuracy': "not number"})
    assert False
except AssertionError:
    assert True    

In [180]:
try:
    model.export(model_path_abs, "IrisFlowerClassifier", model_version=4, overwrite=True, metrics={1337: "0.5"})
    assert False
except AssertionError:
    assert True    

In [181]:
assert hdfs.exists("Models/IrisFlowerClassifier/1/iris_knn.pkl")
assert hdfs.exists("Models/IrisFlowerClassifier/1/iris_flower_classifier.py")

In [182]:
best_model = model.get_best_model("IrisFlowerClassifier", 'accuracy', Metric.MAX)
print(best_model)
assert best_model['name'] == "IrisFlowerClassifier"
assert best_model['version'] == 5

{'type': 'modelDTO', 'href': 'https://hopsworks0.logicalclocks.com:8181/hopsworks-api/api/project/120/models/IrisFlowerClassifier_5', 'created': 1576776038946, 'description': 'A collection of models for IrisFlowerClassifier', 'environment': ['Models/IrisFlowerClassifier/5/environment_cpu_1576776042073.yml'], 'experimentId': 'application_1576760828949_0038_73', 'id': 'IrisFlowerClassifier_5', 'metrics': {'accuracy': '30'}, 'name': 'IrisFlowerClassifier', 'program': 'Models/IrisFlowerClassifier/5/program.ipynb', 'userFullName': 'Admin Admin', 'version': 5}

In [183]:
best_model = model.get_best_model("IrisFlowerClassifier", 'accuracy', Metric.MIN)
print(best_model)
assert best_model['name'] == "IrisFlowerClassifier"
assert best_model['version'] == 3

{'type': 'modelDTO', 'href': 'https://hopsworks0.logicalclocks.com:8181/hopsworks-api/api/project/120/models/IrisFlowerClassifier_3', 'created': 1576776022276, 'description': 'A collection of models for IrisFlowerClassifier', 'environment': ['Models/IrisFlowerClassifier/3/environment_cpu_1576776025357.yml'], 'experimentId': 'application_1576760828949_0038_73', 'id': 'IrisFlowerClassifier_3', 'metrics': {'accuracy': '0.5'}, 'name': 'IrisFlowerClassifier', 'program': 'Models/IrisFlowerClassifier/3/program.ipynb', 'userFullName': 'Admin Admin', 'version': 3}

In [184]:
try:
    best_model = model.get_best_model("not_exist", 'accuracy', Metric.MIN)
    assert False
except model.ModelNotFound:
    assert True

In [185]:
try:
    best_model = model.get_best_model("IrisFlowerClassifier", 'not_exist', Metric.MIN)
    assert False
except model.ModelNotFound:
    assert True

In [186]:
try:
    model.get_model("mnist", 3)
    assert False
except model.ModelNotFound:
    assert True

In [None]:
model_path = "Resources/mnist/1"
model.export(model_path, "mnist", model_version=1, overwrite=True)
model_path = "Resources/mnist/2"
model.export(model_path, "mnist", model_version=2, overwrite=True)
assert hdfs.exists("Models/mnist/1")
assert hdfs.exists("Models/mnist/2")

##### Test Serve Model

In [188]:
script_path = "Models/IrisFlowerClassifier/1/iris_flower_classifier.py"
serving.exists("IrisFlowerClassifier")
if serving.exists("IrisFlowerClassifier"):
    serving.delete("IrisFlowerClassifier")
serving.create_or_update(script_path, "IrisFlowerClassifier", model_server="PYTHON", model_version=1)

Deleting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully deleted
Creating a serving for model IrisFlowerClassifier ...
Serving for model IrisFlowerClassifier successfully created

In [189]:
assert serving.exists("IrisFlowerClassifier")

In [190]:
model_path = "Models/mnist/2/"
if serving.exists("mnist"):
    serving.delete("mnist")
serving.create_or_update(model_path, "mnist", model_server="TENSORFLOW_SERVING", serving_tool="KSERVE", model_version=2)

Creating a serving for model mnist ...
Serving for model mnist successfully created

In [191]:
assert serving.exists("mnist")

##### Test Data Access Operations on Model

In [192]:
assert serving.get_id("IrisFlowerClassifier") is not None
assert serving.get_id("mnist") is not None
assert "Models/IrisFlowerClassifier/1/iris_flower_classifier.py" in serving.get_artifact_path("IrisFlowerClassifier")
assert "Models/mnist/2/" in serving.get_artifact_path("mnist")
assert serving.get_model_server("IrisFlowerClassifier") == "PYTHON"
assert serving.get_model_server("mnist") == "TENSORFLOW_SERVING"
assert serving.get_version("IrisFlowerClassifier") == 1
assert serving.get_version("mnist") == 2
assert serving.get_kafka_topic("IrisFlowerClassifier") is not None
assert serving.get_kafka_topic("mnist") is not None
assert serving.get_status("IrisFlowerClassifier") == "Stopped"
assert serving.get_status("mnist") == "Stopped"

##### Test Start/Stop Serving

In [193]:
serving.start("IrisFlowerClassifier")
serving.start("mnist")

Starting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully started
Starting serving with name: mnist...
Serving with name: mnist successfully started

In [194]:
assert serving.get_status("IrisFlowerClassifier") == "Running"
assert serving.get_status("mnist") == "Running"

In [195]:
serving.stop("IrisFlowerClassifier")
serving.stop("mnist")

Stopping serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully stopped
Stopping serving with name: mnist...
Serving with name: mnist successfully stopped

In [196]:
assert serving.get_status("IrisFlowerClassifier") == "Stopped"
assert serving.get_status("mnist") == "Stopped"

##### Test Send Inference Requests

In [197]:
serving.start("IrisFlowerClassifier")
serving.start("mnist")

Starting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully started
Starting serving with name: mnist...
Serving with name: mnist successfully started

In [198]:
for i in range(20):
    data = {"inputs" : [[random.uniform(1, 8) for i in range(4)]]}
    response = serving.make_inference_request("IrisFlowerClassifier", data)
    print(response)
    assert response is not None
    assert "predictions" or "prediction" in response

Could not create or update serving (url: /hopsworks-api/api/project/120/inference/models/IrisFlowerClassifier:predict), server response: 
 HTTP code: 500, HTTP reason: Internal Server Error, error code: 250007, error msg: Serving instance internal error, user msg: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>500 Internal Server Error</title>
<h1>Internal Server Error</h1>
<p>The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.</p>

Traceback (most recent call last):
  File "/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/serving.py", line 527, in make_inference_request
    return _make_inference_request_rest(serving_name, data, verb)
  File "/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/serving.py", line 562, in _make_inference_request_rest
    error_code, error_msg, user_msg))
hops.exceptions.RestAPIError: Could

In [199]:
for i in range(20):
    data = {
                "signature_name": 'predict_images',
                "instances": [np.random.rand(784).tolist()]
            }
    response = serving.make_inference_request("mnist", data)
    print(response)
    assert response is not None
    assert "predictions" in response

{'predictions': [[0.000496011868, 1.18008785e-08, 0.201137573, 0.580856144, 2.75102479e-07, 0.214045227, 0.00014854352, 4.37322706e-05, 0.00325523899, 1.73046719e-05]]}
{'predictions': [[0.000641128456, 1.4078928e-09, 0.0745601952, 0.490354359, 8.57360305e-07, 0.387143135, 0.00227252091, 6.10170246e-05, 0.0447829627, 0.000183804674]]}
{'predictions': [[0.000418708398, 2.07440487e-09, 0.145863384, 0.377752811, 1.90975516e-06, 0.469116181, 0.000256557396, 5.82233115e-05, 0.00651905406, 1.31911293e-05]]}
{'predictions': [[4.32615634e-05, 2.3104592e-08, 0.115783177, 0.704663873, 1.35617867e-07, 0.174041361, 4.74509507e-05, 4.05858827e-06, 0.00541397836, 2.67096084e-06]]}
{'predictions': [[0.00078638579, 8.21143598e-09, 0.357391894, 0.280190051, 1.31995034e-06, 0.35193646, 0.000322822307, 3.73325965e-05, 0.00927347504, 6.03416083e-05]]}
{'predictions': [[0.00021984303, 5.10015497e-09, 0.081924893, 0.460112602, 3.47095443e-06, 0.427535713, 0.0067972932, 1.43612915e-05, 0.0232881252, 0.000103

##### Test Kafka Inference Log

In [200]:
topic = serving.get_kafka_topic("IrisFlowerClassifier")
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [topic]
consumer.subscribe(topics)
json_schema = kafka.get_schema(topic)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

In [201]:
for i in range(0, 10):
    msg = consumer.poll(timeout=1.5)
    if msg is not None:
        value = msg.value()
        event_dict = kafka.parse_avro_msg(value, avro_schema)
        assert "modelName" in event_dict
        assert "requestTimestamp" in event_dict
        assert "modelServer" in event_dict
        assert "servingTool" in event_dict
        assert "inferenceResponse" in event_dict
        assert event_dict["modelName"] == "IrisFlowerClassifier"
        assert event_dict["modelServer"] == "PYTHON"
        assert event_dict["servingTool"] == "DEFAULT"

In [202]:
topic = serving.get_kafka_topic("mnist")
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [topic]
consumer.subscribe(topics)
json_schema = kafka.get_schema(topic)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

In [203]:
for i in range(0, 10):
    msg = consumer.poll(timeout=1.5)
    if msg is not None:
        value = msg.value()
        event_dict = kafka.parse_avro_msg(value, avro_schema)
        assert "modelName" in event_dict
        assert "requestTimestamp" in event_dict
        assert "modelServer" in event_dict
        assert "servingTool" in event_dict
        assert "inferenceResponse" in event_dict
        assert event_dict["modelName"] == "mnist"
        assert event_dict["modelServer"] == "TENSORFLOW_SERVING"
        assert event_dict["servingTool"] == "KSERVE"

##### Test Delete Serving

In [204]:
serving.delete("IrisFlowerClassifier")
serving.delete("mnist")

Deleting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully deleted
Deleting serving with name: mnist...
Serving with name: mnist successfully deleted

In [205]:
assert not serving.exists("IrisFlowerClassifier")
assert not serving.exists("mnist")

## Pandas and Numpy helper

In [206]:
from hops import pandas_helper as pandas
import pandas as pd

lst = ['Geeks', 'For', 'Geeks', 'is', 'portal', 'for', 'Geeks']

data = {'Name':['Tom', 'nick', 'krish', 'jack'], 'Age':[20, 21, 19, 18]}

data1 = {'Name':['Jai', 'Princi', 'Gaurav', 'Anuj'],
        'Age':[27, 24, 22, 32],
        'Address':['Delhi', 'Kanpur', 'Allahabad', 'Kannauj'],
        'Qualification':['Msc', 'MA', 'MCA', 'Phd']}

pandas_df =  pd.DataFrame(data)
pandas.write_csv("Resources/team-pandas.csv", pandas_df)
pandas.write_parquet("Resources/team-pandas.parquet", pandas_df)
pandas.write_json("Resources/team-pandas.json", pandas_df)

test_df = pandas.read_csv("Resources/team-pandas.csv")
test_df.count()

test_df = pandas.read_json("Resources/team-pandas.json")
test_df.count()

#test_df = pandas.read_parquet("Resources/team-pandas.parquet")
#test_df.count()

Name    4
Age     4
dtype: int64

In [207]:
from hops import numpy_helper as numpy
import numpy as np

numpy_df = np.array([1, 2, 3])
x = np.arange(10)

numpy_df.shape

numpy.save("Resources/numpy-path.npy", numpy_df)
numpy.savez("Resources/numpy-path.npz", numpy_df, x )
numpy.savez("Resources/numpy-compressed.npz", numpy_df, x )

npzfile = numpy.load("Resources/numpy-path.npz")
npzfile.files

compressed = numpy.load("Resources/numpy-compressed.npz")
compressed.files

Started copying local path numpy-path.npy to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources//numpy-path.npy

Finished copying

Started copying local path numpy-path.npz to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources//numpy-path.npz

Finished copying

Started copying local path numpy-compressed.npz to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources//numpy-compressed.npz

Finished copying

File hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/numpy-path.npz is already localized, skipping download...
File hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/numpy-compressed.npz is already localized, skipping download...
['arr_0', 'arr_1']

## Upload REST API tests

Test of the UploadService client

In [None]:
import os
from hops import project, dataset

# Create a a file locally and upload it to the Resources dataset
filename = "test.txt"
lines = 1000000
with open(filename, "w") as file: # Use file to refer to the file object
    text = "0123456789\n" #10 byte string
    #write a 1MB rows (~10MB file)
    for i in range (0, lines):
        file.write(text)

# Create API key
response = util.send_request("POST", "/hopsworks-api/api/users/apiKey?name=test-key&scope=DATASET_VIEW&scope=DATASET_CREATE&scope=DATASET_DELETE&scope=PROJECT")
if response.status_code <300:
    print(response)
    response_body = response.json()
    print(response_body)
    api_key = response_body['key']
    print(api_key)

    # Write API key to local file
    with open("api_key", "w") as file:
        file.write(api_key)

project.connect(hdfs.project_name(), api_key="api_key")

dataset.upload(filename, "Models")
# Unset this env var so next cells will use JWT
if 'API_KEY' in os.environ:
    del os.environ['API_KEY']

hdfs.copy_to_local(os.path.join("Resources", filename), overwrite=True)
# Copy the file from hdfs and assert number of lines
assert len(open(filename).readlines(  )) == lines

## Download REST API tests

Test of the DownloadService client

In [None]:
import os
from hops import project, dataset

# Create a file locally and upload it to the Resources dataset
filename = "test.txt"
lines = 1000000
with open(filename, "w") as file: # Use file to refer to the file object
    text = "0123456789\n" #10 byte string
    #write a 1MB rows (~10MB file)
    for i in range (0, lines):
        file.write(text)

# Copy the file to HDFS
hdfs.copy_to_hdfs(filename, "Resources", overwrite=True)

# Create API key
response = util.send_request("POST", "/hopsworks-api/api/users/apiKey?name=test-key&scope=DATASET_VIEW")
if response.status_code <300:
    print(response)
    response_body = response.json()
    print(response_body)
    api_key = response_body['key']
    print(api_key)

    # Write API key to local file
    with open("api_key", "w") as file:
        file.write(api_key)

project.connect(hdfs.project_name(), api_key="api_key")

remote_path = "Projects/" + hdfs.project_name() + "/Resources/" + filename 
downloaded_filename = "downloaded_test.txt"
dataset.download(remote_path, downloaded_filename, chunk_size=2)
# Unset this env var so next cells will use JWT
if 'API_KEY' in os.environ:
    del os.environ['API_KEY']

# Assert the number of lines
assert len(open(downloaded_filename).readlines()) == lines