# `hops-util-py` Integration Tests

This notebook can be converted to a Python file and submitted as a Spark job to run the integration tests.

## Imports

In [None]:
from hops import experiment, hdfs, tensorboard, devices, kafka, featurestore, tls, util, serving, constants
import stat
import os
import shutil
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, FloatType
import pandas as pd
import numpy as np
import datetime
import time
from pyspark.sql import DataFrame
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql import SparkSession
import tensorflow as tf
import sys
import random
from confluent_kafka import Producer, Consumer, KafkaError

## Experiment API Tests

In [None]:
def exp_asserts():
    """Check the environment seen from inside an experiment wrapper function.

    Asserts that a TensorBoard log directory is set, that the reported GPU
    count is non-negative, that the default project path equals the path
    resolved from the project name, and that the log directory exists on the
    filesystem matching its kind (local disk or HDFS).
    """
    # Imports are local to the function, so it does not rely on module globals.
    from hops import tensorboard
    from hops import devices
    from hops import hdfs
    import os
    assert tensorboard.logdir() is not None
    assert devices.get_num_gpus() >= 0
    assert hdfs.project_path() == hdfs.project_path(hdfs.project_name())
    if tensorboard.local_logdir_bool:
        # Local log directory: a plain path that must exist on local disk.
        assert "hdfs://" not in tensorboard.logdir()
        assert os.path.exists(tensorboard.logdir())
    else:
        # HDFS log directory: an hdfs:// URI that must exist in HDFS.
        assert "hdfs://" in tensorboard.logdir()
        assert hdfs.exists(tensorboard.logdir())

In [None]:
def no_ret():
    """Wrapper without parameters or return value; only runs the shared checks."""
    exp_asserts()

In [None]:
def no_ret_params(a, b):
    """Wrapper taking two parameters (unused) that returns nothing.

    Only the shared environment checks are executed.
    """
    exp_asserts()

In [None]:
def single_ret_no_name():
    """Run the shared checks and return the bare (unnamed) value 10."""
    exp_asserts()
    result = 10
    return result

In [None]:
def single_ret_no_name_params(a, b):
    """Run the shared checks and return the bare (unnamed) sum of ``a`` and ``b``."""
    exp_asserts()
    total = a + b
    return total

In [None]:
def single_ret_path():
    """Run the shared checks, write a small log file and return its path.

    Returns:
        A dict with the single key 'logfile' pointing at the written file.
    """
    exp_asserts()
    # Context manager closes the file even if the write raises.
    with open('testfile.txt', 'w') as f:
        f.write('stuff happened')
    return {'logfile': 'testfile.txt'}

In [None]:
def single_ret_path_params(a, b):
    """Run the shared checks, write a small log file and return its path.

    The parameters ``a`` and ``b`` are accepted but not used.

    Returns:
        A dict with the single key 'logfile' pointing at the written file.
    """
    exp_asserts()
    # Context manager closes the file even if the write raises.
    with open('testfile.txt', 'w') as f:
        f.write('stuff happened')
    return {'logfile': 'testfile.txt'}

In [None]:
def single_ret_val():
    """Run the shared checks and return one named value: ``{'value': 10}``."""
    exp_asserts()
    outcome = {'value': 10}
    return outcome

In [None]:
def single_ret_val_params(a, b):
    """Run the shared checks and return ``a + b`` under the key 'value'."""
    exp_asserts()
    outcome = {'value': a + b}
    return outcome

In [None]:
def multi_ret():
    """Run the shared checks, write a log file and return several named results.

    Returns:
        A dict with two numeric entries ('value', 'morevals') and the path of
        the written file under 'logfile'.
    """
    exp_asserts()
    # Context manager closes the file even if the write raises.
    with open('testfile.txt', 'w') as f:
        f.write('stuff happened')
    return {'value': 10, 'morevals': 0.5, 'logfile': 'testfile.txt'}

In [None]:
def multi_ret_params(a, b):
    """Run the shared checks, write a log file and return several named results.

    Returns:
        A dict with ``a`` under 'value', ``b`` under 'morevals', the path of
        the written file under 'logfile' and the name 'img.png' under
        'diagram'.
    """
    exp_asserts()
    # Context manager closes the file even if the write raises.
    with open('testfile.txt', 'w') as f:
        f.write('stuff happened')
    # NOTE(review): 'img.png' is never created in this function - confirm that
    # returning a path to a missing file is intended.
    return {'value': a, 'morevals': b, 'logfile': 'testfile.txt', 'diagram': 'img.png'}

##### Test `experiment.launch`

In [None]:
# Hyperparameter values shared by every parameterised experiment in this notebook.
params = {'a': [-55, 43], 'b': [89, -13]}

# Each wrapper function is launched with both an HDFS and a local log directory;
# the *_params variants additionally receive the hyperparameter dict.
logdir, hp, metric = experiment.launch(no_ret, local_logdir=False)
logdir, hp, metric = experiment.launch(no_ret, local_logdir=True)
logdir, hp, metric = experiment.launch(no_ret_params, params, local_logdir=True)
logdir, hp, metric = experiment.launch(no_ret_params, params, local_logdir=False)

logdir, hp, metric = experiment.launch(single_ret_no_name, local_logdir=False, name='some custom name')
logdir, hp, metric = experiment.launch(single_ret_no_name, local_logdir=True)
logdir, hp, metric = experiment.launch(single_ret_no_name_params, params, local_logdir=True)
logdir, hp, metric = experiment.launch(single_ret_no_name_params, params, local_logdir=False)

logdir, hp, metric = experiment.launch(single_ret_path, local_logdir=False, description='some custom desc')
logdir, hp, metric = experiment.launch(single_ret_path, local_logdir=True)
logdir, hp, metric = experiment.launch(single_ret_path_params, params, local_logdir=True)
logdir, hp, metric = experiment.launch(single_ret_path_params, params, local_logdir=False)

logdir, hp, metric = experiment.launch(single_ret_val, local_logdir=False, name='some custom name', description='some custom desc')
logdir, hp, metric = experiment.launch(single_ret_val, local_logdir=True)
logdir, hp, metric = experiment.launch(single_ret_val_params, params, local_logdir=True)
logdir, hp, metric = experiment.launch(single_ret_val_params, params, local_logdir=False)

logdir, hp, metric = experiment.launch(multi_ret, local_logdir=False)
logdir, hp, metric = experiment.launch(multi_ret, local_logdir=True)
logdir, hp, metric = experiment.launch(multi_ret_params, params, local_logdir=False)
logdir, hp, metric = experiment.launch(multi_ret_params, params, local_logdir=True)

##### Test Parallel Experiments `experiment.grid_search`

In [None]:
# Negative case: a wrapper without a return value must be rejected.
# The failure is raised from the `else` branch so that it cannot be swallowed
# by the handler that absorbs the expected error.
try:
    logdir, hp, metric = experiment.grid_search(no_ret_params, params)
except Exception:
    pass
else:
    raise AssertionError('should fail due to no return value')

# Negative case: several returned values but no optimization_key.
try:
    logdir, hp, metric = experiment.grid_search(multi_ret_params, params)
except Exception:
    pass
else:
    raise AssertionError('should fail due to optimization_key not being set')

logdir, hp, metric = experiment.grid_search(single_ret_no_name_params, params, local_logdir=True, direction='min')
logdir, hp, metric = experiment.grid_search(single_ret_no_name_params, params, local_logdir=False, direction='max')

logdir, hp, metric = experiment.grid_search(single_ret_val_params, params, local_logdir=True, direction='min')
logdir, hp, metric = experiment.grid_search(single_ret_val_params, params, local_logdir=False, direction='max')

# Multi-value wrappers name the metric to optimise, since omitting it is the
# failure case checked above.
logdir, hp, metric = experiment.grid_search(multi_ret_params, params, local_logdir=False, direction='max', optimization_key='value')
logdir, hp, metric = experiment.grid_search(multi_ret_params, params, local_logdir=True, direction='min', optimization_key='value')

##### Test Parallel Experiments `experiment.random_search`

In [None]:
# Negative case: a wrapper without a return value must be rejected.
# The failure is raised from the `else` branch so that it cannot be swallowed
# by the handler that absorbs the expected error.
try:
    logdir, hp, metric = experiment.random_search(no_ret_params, params, samples=2)
except Exception:
    pass
else:
    raise AssertionError('should fail due to no return value')

# Negative case: several returned values but no optimization_key.
try:
    logdir, hp, metric = experiment.random_search(multi_ret_params, params, samples=2)
except Exception:
    pass
else:
    raise AssertionError('should fail due to optimization_key not being set')

logdir, hp, metric = experiment.random_search(single_ret_no_name_params, params, samples=2, local_logdir=True, direction='min')
logdir, hp, metric = experiment.random_search(single_ret_no_name_params, params, samples=2, local_logdir=False, direction='max')

logdir, hp, metric = experiment.random_search(single_ret_path_params, params, samples=2, local_logdir=True, direction='max')
logdir, hp, metric = experiment.random_search(single_ret_path_params, params, samples=2, local_logdir=False, direction='min')

logdir, hp, metric = experiment.random_search(single_ret_val_params, params, samples=2, local_logdir=True, direction='min')
logdir, hp, metric = experiment.random_search(single_ret_val_params, params, samples=2, local_logdir=False, direction='max')

# Multi-value wrappers name the metric to optimise, since omitting it is the
# failure case checked above.
logdir, hp, metric = experiment.random_search(multi_ret_params, params, samples=2, local_logdir=False, direction='max', optimization_key='value')
logdir, hp, metric = experiment.random_search(multi_ret_params, params, samples=2, local_logdir=True, direction='min', optimization_key='value')

##### Test Parallel Experiments `experiment.differential_evolution`

In [2]:
# Negative case: a wrapper without a return value must be rejected.
# The failure is raised from the `else` branch so that it cannot be swallowed
# by the handler that absorbs the expected error.
try:
    logdir, hp, metric = experiment.differential_evolution(no_ret_params, params)
except Exception:
    pass
else:
    raise AssertionError('should fail due to no return value')

# Negative case: several returned values but no optimization_key.
try:
    logdir, hp, metric = experiment.differential_evolution(multi_ret_params, params)
except Exception:
    pass
else:
    raise AssertionError('should fail due to optimization_key not being set')

logdir, hp, metric = experiment.differential_evolution(single_ret_no_name_params, params, local_logdir=True, direction='min')
logdir, hp, metric = experiment.differential_evolution(single_ret_no_name_params, params, local_logdir=False, direction='max')

logdir, hp, metric = experiment.differential_evolution(single_ret_path_params, params, local_logdir=True, direction='max')
logdir, hp, metric = experiment.differential_evolution(single_ret_path_params, params, local_logdir=False, direction='min')

logdir, hp, metric = experiment.differential_evolution(single_ret_val_params, params, local_logdir=True, direction='min')
logdir, hp, metric = experiment.differential_evolution(single_ret_val_params, params, local_logdir=False, direction='max')

# Multi-value wrappers name the metric to optimise, since omitting it is the
# failure case checked above.
logdir, hp, metric = experiment.differential_evolution(multi_ret_params, params, local_logdir=False, direction='max', optimization_key='value')
logdir, hp, metric = experiment.differential_evolution(multi_ret_params, params, local_logdir=True, direction='min', optimization_key='value')

name 'experiment' is not defined
Traceback (most recent call last):
NameError: name 'experiment' is not defined



##### Test Distributed Training `experiment.collective_all_reduce`

## HopsFS Tests

##### Test HopsFS operations

- `hdfs.project_user()`
- `hdfs.project_name()`
- `hdfs.project_path()`
- `hdfs.exists()`
- `hdfs.load()`
- `hdfs.copy_to_hdfs()`
- `hdfs.copy_to_local()`
- `hdfs.ls()`
- `hdfs.lsl()`
- `hdfs.glob()`
- `hdfs.cp()`
- `hdfs.rmr()`
- `hdfs.rename()`
- `hdfs.stat()`
- `hdfs.isdir()`
- `hdfs.isfile()`
- `hdfs.add_module()`
- `hdfs.delete()`
- `hdfs.get_plain_path()`

In [None]:
# The project name must appear in both the project user and the project path.
project_user = hdfs.project_user()
project_name = hdfs.project_name()
assert project_name in project_user
project_path = hdfs.project_path()
assert project_name in project_path

In [None]:
# Loading Logs/README.md must return non-empty content.
logs_README = hdfs.load("Logs/README.md")
assert len(logs_README) > 0

In [None]:
# Write a small file with hdfs.dump and check that it now exists.
hdfs.dump("test", "Logs/README_dump_test.md")
assert hdfs.exists("Logs/README_dump_test.md")

In [None]:
# Read the dumped file back; the loaded value is decoded as UTF-8 before comparing.
logs_README_dumped = hdfs.load("Logs/README_dump_test.md")
assert logs_README_dumped.decode("utf-8") == "test"

In [None]:
# copy_to_hdfs: single file, addressed by a path relative to the working directory.

# First upload: the file does not exist in Resources yet.
with open('upload.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs("upload.txt", "Resources")
assert hdfs.exists("Resources/upload.txt")
hdfs_copied_file = hdfs.load("Resources/upload.txt")
assert "first upload" == hdfs_copied_file.decode("utf-8"), "first content does not match"

# Second upload with overwrite=True must replace the content.
with open('upload.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs("upload.txt", "Resources", overwrite=True)
assert hdfs.exists("Resources/upload.txt")
hdfs_copied_file = hdfs.load("Resources/upload.txt")
assert "second upload" == hdfs_copied_file.decode("utf-8"), "second content does not match"

# Uploading again without overwrite is expected to raise IOError; the
# AssertionError from `assert False` is not an IOError, so it propagates.
try:
    hdfs.copy_to_hdfs("upload.txt", "Resources")
    assert False
except IOError:
    pass

# Clean up the remote and the local copy.
hdfs.rmr("Resources/upload.txt")
os.remove("upload.txt")

In [None]:
# copy_to_hdfs: single file, addressed by an absolute local path.

# First upload: the file does not exist in Resources yet.
with open('upload_absolute.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_absolute.txt", "Resources")
assert hdfs.exists("Resources/upload_absolute.txt")
hdfs_copied_file = hdfs.load("Resources/upload_absolute.txt")
assert "first upload" == hdfs_copied_file.decode("utf-8"), "first content does not match"

# Second upload with overwrite=True must replace the content.
with open('upload_absolute.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_absolute.txt", "Resources", overwrite=True)
assert hdfs.exists("Resources/upload_absolute.txt")
hdfs_copied_file = hdfs.load("Resources/upload_absolute.txt")
assert "second upload" == hdfs_copied_file.decode("utf-8"), "second content does not match"

# Uploading again without overwrite is expected to raise IOError. The absolute
# path is used here as well, so the whole cell exercises the same code path.
try:
    hdfs.copy_to_hdfs(os.getcwd() + "/upload_absolute.txt", "Resources")
    assert False, "copy_to_hdfs without overwrite should raise IOError"
except IOError:
    pass

# Clean up the remote and the local copy.
hdfs.rmr("Resources/upload_absolute.txt")
os.remove("upload_absolute.txt")

In [None]:
# copy_to_hdfs: whole directory, addressed by a relative local path.

if not os.path.exists("upload_dir"):
    os.mkdir("upload_dir")

# Precondition: the directory must not already exist in Resources.
assert not hdfs.exists("Resources/upload_dir")
with open('upload_dir/upload.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs("upload_dir", "Resources")
hdfs_copied_file = hdfs.load("Resources/upload_dir/upload.txt")
assert hdfs.exists("Resources/upload_dir")
with open('upload_dir/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "first content compare failed"

# Change the local file and upload again with overwrite=True.
with open('upload_dir/upload.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs("upload_dir", "Resources", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/upload_dir/upload.txt")
assert hdfs.exists("Resources/upload_dir")
with open('upload_dir/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "second content compare failed"

# Clean up the local and the remote directory.
shutil.rmtree("upload_dir")
hdfs.rmr("Resources/upload_dir")

In [None]:
# copy_to_hdfs: whole directory, addressed by an absolute local path.

if not os.path.exists("upload_dir_absolute"):
    os.mkdir("upload_dir_absolute")

# Precondition: the directory must not already exist in Resources.
assert not hdfs.exists("Resources/upload_dir_absolute")
with open('upload_dir_absolute/upload.txt', 'w') as f:
    f.write("first upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_dir_absolute", "Resources")
hdfs_copied_file = hdfs.load("Resources/upload_dir_absolute/upload.txt")
assert hdfs.exists("Resources/upload_dir_absolute")
with open('upload_dir_absolute/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "first content compare failed"

# Change the local file and upload again with overwrite=True.
with open('upload_dir_absolute/upload.txt', 'w') as f:
    f.write("second upload")
hdfs.copy_to_hdfs(os.getcwd() + "/upload_dir_absolute", "Resources", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/upload_dir_absolute/upload.txt")
assert hdfs.exists("Resources/upload_dir_absolute")
with open('upload_dir_absolute/upload.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "second content compare failed"

# Clean up the local and the remote directory.
shutil.rmtree("upload_dir_absolute")
hdfs.rmr("Resources/upload_dir_absolute")

In [None]:
# copy_to_local: single file. The local modification time is used to tell
# whether a call rewrote the local copy.

# Download first time
hdfs.dump("initial content", "Resources/somefile.txt")
hdfs.copy_to_local("Resources/somefile.txt")
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "first content compare failed"
first_modified = os.path.getmtime("somefile.txt")

# Download second time: nothing changed remotely, so the local file is
# expected to keep its modification time.
hdfs.copy_to_local("Resources/somefile.txt")
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "second content compare failed"
second_modified = os.path.getmtime("somefile.txt")
assert first_modified == second_modified, "modified time not matching"

# Content changes in HDFS: the next download (without overwrite) is expected
# to pick up the new content, so the modification time must differ.
hdfs.dump("content changed at some point", "Resources/somefile.txt")
hdfs_new_content = hdfs.load("Resources/somefile.txt")
hdfs.copy_to_local("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_new_content.decode("utf-8") == local_copied_file, "third content compare failed"
third_modified = os.path.getmtime("somefile.txt")
assert not second_modified == third_modified, "modified time not matching"

# Download last time with overwrite, file should have changed on disk
hdfs.copy_to_local("Resources/somefile.txt", overwrite=True)
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "fourth content compare failed"
fourth_modified = os.path.getmtime("somefile.txt")
assert not third_modified == fourth_modified, "modified time not matching"

# Download again to make sure overwrite did not cause problems
hdfs.copy_to_local("Resources/somefile.txt")
hdfs_copied_file = hdfs.load("Resources/somefile.txt")
with open('somefile.txt', 'r') as f:
    local_copied_file = f.read()
assert hdfs_copied_file.decode("utf-8") == local_copied_file, "fifth content compare failed"
fifth_modified = os.path.getmtime("somefile.txt")
assert fourth_modified == fifth_modified, "modified time not matching"

# Clean up the remote and the local copy.
hdfs.rmr("Resources/somefile.txt")
os.remove("somefile.txt")

In [None]:
# copy_to_local: whole directory. The local modification time of the
# directory is used to tell whether a call rewrote the local copy.

# Precondition: no local "Resources" directory yet.
assert not os.path.exists("Resources")
hdfs.copy_to_local("Resources")
first_modified = os.path.getmtime("Resources")
assert os.path.exists("Resources")
assert os.path.isdir("Resources")

# A repeated download without changes is expected to leave the directory untouched.
hdfs.copy_to_local("Resources")
second_modified = os.path.getmtime("Resources")
assert first_modified == second_modified

# With overwrite=True the local directory is expected to be rewritten.
localized_dir = hdfs.copy_to_local("Resources", overwrite=True)
third_modified = os.path.getmtime("Resources")
assert not second_modified == third_modified
num_files_first = len(os.listdir(localized_dir))

# Add a new file, it should also be localized
hdfs.dump("a wild file appeared", "Resources/newfile.txt")
hdfs.copy_to_local("Resources")
fourth_modified = os.path.getmtime("Resources")
# NOTE(review): this repeats the first/second comparison made above and does
# not involve fourth_modified - confirm whether another check was intended.
assert first_modified == second_modified
num_files_second = len(os.listdir(localized_dir))
assert (num_files_first + 1) == num_files_second
assert not third_modified == fourth_modified

# Clean up the remote file and the local directory.
hdfs.rmr("Resources/newfile.txt")
shutil.rmtree("Resources")

In [None]:
# Call glob and lsl on the Logs directory (their results are not asserted),
# then remove any stale Logs/test.txt so the copy check further down starts clean.
logs_files_md = hdfs.glob("Logs/*.md")
logs_path_names = hdfs.lsl("Logs/")
if hdfs.exists("Logs/test.txt"):
    hdfs.rmr("Logs/test.txt")
assert not hdfs.exists("Logs/test.txt")

In [None]:
# Create Resources/test.txt, copy it into Logs/ and check that it shows up in the listing.
hdfs.dump("dummy", "Resources/test.txt")
hdfs.cp("Resources/test.txt", "Logs/")
logs_files = hdfs.ls("Logs/")
# Substring match over the joined listing, so the path prefix of the entries does not matter.
assert "test.txt" in ",".join(logs_files)

In [None]:
# Create a directory and check that it exists.
hdfs.mkdir("Logs/test_dir")
assert hdfs.exists("Logs/test_dir")

In [None]:
# Removing the directory created above must shrink the listing of Logs/.
logs_files_prior_delete = hdfs.ls("Logs/")
hdfs.rmr("Logs/test_dir")
logs_files_after_delete = hdfs.ls("Logs/")
assert len(logs_files_prior_delete) > len(logs_files_after_delete)

In [None]:
# Precondition for the move test: the previously dumped file is listed in Logs/.
logs_files_prior_move = hdfs.ls("Logs/")
assert "README_dump_test.md" in ",".join(logs_files_prior_move)

In [None]:
# After the move only the new name may appear in the listing.
hdfs.move("Logs/README_dump_test.md", "Logs/README_dump_test2.md")
logs_files_after_move = hdfs.ls("Logs/")
assert "README_dump_test.md" not in ",".join(logs_files_after_move)
assert "README_dump_test2.md" in ",".join(logs_files_after_move)

In [None]:
# Precondition for the rename test: the moved file is listed in Logs/.
logs_files_prior_rename = hdfs.ls("Logs/")
assert "README_dump_test2.md" in ",".join(logs_files_prior_rename)

In [None]:
# Rename the file back to its original name; only that name may remain listed.
hdfs.rename("Logs/README_dump_test2.md", "Logs/README_dump_test.md")
logs_files_after_rename = hdfs.ls("Logs/")
assert "Logs/README_dump_test2.md" not in ",".join(logs_files_after_rename)
assert "Logs/README_dump_test.md" in ",".join(logs_files_after_rename)

In [None]:
# Change the mode of Logs/README.md and read it back through stat.
# NOTE(review): 775 is a decimal literal, not the octal 0o775 - confirm this is
# the representation hdfs.chmod and st_mode use.
file_stat = hdfs.stat("Logs/README.md")
hdfs.chmod("Logs/README.md", 775)
file_stat = hdfs.stat("Logs/README.md")
assert 775 == file_stat.st_mode

In [None]:
# Same round trip with mode 777 (decimal literal, as written).
hdfs.chmod("Logs/README.md", 777)
file_stat = hdfs.stat("Logs/README.md")
assert 777 == file_stat.st_mode

In [None]:
# Read the owner from the stat result of the previous cell (only the attribute
# access is exercised), then check exists() for a present and a missing path.
file_owner = file_stat.st_uid
assert hdfs.exists("Logs/")
assert not hdfs.exists("Not_Existing/neither_am_i")

In [None]:
# isdir must be true for a directory and false for a file.
assert hdfs.isdir("Resources")
assert not hdfs.isdir("Resources/README.md")

In [None]:
# isfile must be true for a file and false for a directory.
assert hdfs.isfile("Resources/README.md")
assert not hdfs.isfile("Resources")

In [None]:
# Write a tiny module to HDFS, register it with add_module and import it.
hdfs.dump("def simple():\n\treturn 5", "Resources/my_module.py")
py_path = hdfs.add_module("Resources/my_module.py")
# The returned path must have been put on sys.path for the import below to work.
assert py_path in sys.path
import my_module
assert my_module.simple() == 5

In [None]:
# get_plain_path must strip the scheme, host and port from an hdfs:// URI.
plain = hdfs.get_plain_path("hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/")
assert plain == "/Projects/demo_deep_learning_admin000/Models/"

In [None]:
# Create a directory, delete it with hdfs.delete and check that it is gone.
hdfs.mkdir("Logs/test_delete_dir")
assert hdfs.exists("Logs/test_delete_dir")
hdfs.delete("Logs/test_delete_dir")
assert not hdfs.exists("Logs/test_delete_dir")

## Feature Store Tests

These tests require that you have the following files in the Resources directory:

- `attendances_features.csv`
- `games_features.csv`
- `players_features.csv`
- `season_scores_features.csv`
- `teams_features.csv`

These files can be downloaded from here: `http://snurran.sics.se/hops/hops-util-py_test/`

##### Test Featurestore Create Feature Group Operations (`featurestore.create_featuregroup()`)

In [None]:
def load_fs_sample_data():
    """Read the five sample feature CSV files from the project's Resources directory.

    Every file is read through the global ``spark`` object with a header row
    and schema inference enabled.

    Returns:
        A tuple of the loaded data frames in the order (games, players,
        teams, season_scores, attendances).
    """
    base_path = hdfs.project_path() + "Resources/"

    def read_features(file_name):
        # A fresh reader per file, configured identically for all five datasets.
        reader = spark.read.format("csv")
        reader = reader.option("header", "true")
        reader = reader.option("inferSchema", "true")
        return reader.load(base_path + file_name)

    games = read_features("games_features.csv")
    players = read_features("players_features.csv")
    teams = read_features("teams_features.csv")
    season_scores = read_features("season_scores_features.csv")
    attendances = read_features("attendances_features.csv")
    return games, players, teams, season_scores, attendances


# Load the sample data once; the cells below refer to these names.
games_features_df,players_features_df,teams_features_df,season_scores_features_df, attendances_features_df = load_fs_sample_data()

In [None]:
# Create the games_features feature group from the games sample data.
# The description previously duplicated the season-scores text.
featurestore.create_featuregroup(
    games_features_df,
    "games_features",
    description="Features of games"
)

In [None]:
# Create the teams_features feature group from the teams sample data.
# The description previously duplicated the text of the Spanish variant.
featurestore.create_featuregroup(
    teams_features_df,
    "teams_features",
    description="Features of football teams"
)

In [None]:
# Create the season_scores_features feature group from the season-scores sample data.
featurestore.create_featuregroup(
    season_scores_features_df,
    "season_scores_features",
    description="Features of average season scores for football teams"
)

In [None]:
# Create the attendances_features feature group from the attendances sample data.
featurestore.create_featuregroup(
    attendances_features_df,
    "attendances_features",
    description="Features of average attendance of games of football teams"
)

In [None]:
# Read teams_features back and rename its three columns to Spanish names;
# the result is the input for the teams_features_spanish feature groups.
teams_features_1_df = featurestore.get_featuregroup("teams_features")
teams_features_2_df = teams_features_1_df.withColumnRenamed(
    "team_id", "equipo_id").withColumnRenamed(
    "team_budget", "equipo_presupuesto").withColumnRenamed(
    "team_position", "equipo_posicion")

In [None]:
# Create teams_features_spanish with all statistics computations switched off.
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

In [None]:
# Same call again, this time naming the feature store and version 1 explicitly.
# NOTE(review): the previous cell already created this feature group - confirm
# that creating version 1 a second time is expected to succeed.
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    featurestore=featurestore.project_featurestore(),
    featuregroup_version=1
)

In [None]:
# Create version 2 of teams_features_spanish from the same data.
featurestore.create_featuregroup(
    teams_features_2_df,
    "teams_features_spanish",
    description="a spanish version of teams_features",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    featuregroup_version=2
)

In [None]:
# Create an on-demand feature group defined by a SQL query; the storage
# connector name is built as "<project name>_featurestore".
from hops import hdfs
query = "SELECT * FROM games_features_1 WHERE score > 1"
storage_connector = hdfs.project_name() + "_featurestore"
featuregroup_name = "games_features_on_demand"
featurestore.create_on_demand_featuregroup(query, featuregroup_name, storage_connector)

In [None]:
# Every feature group created above must be listed under "<name>_<version>".
assert "games_features_1" in featurestore.get_featuregroups()
assert "teams_features_1" in featurestore.get_featuregroups()
assert "season_scores_features_1" in featurestore.get_featuregroups()
assert "attendances_features_1" in featurestore.get_featuregroups()
assert "teams_features_spanish_1" in featurestore.get_featuregroups()
assert "teams_features_spanish_2" in featurestore.get_featuregroups()
assert "games_features_on_demand_1" in featurestore.get_featuregroups()

##### Test Featurestore Utility Operations

- `featurestore.get_featurestore_metadata()`
- `featurestore.project_featurestore()`
- `featurestore.get_latest_featuregroup_version()`
- `featurestore.get_features_list()`

In [None]:
# Fetch the feature store metadata with update_cache=True; only the call itself is exercised.
featurestore.get_featurestore_metadata(update_cache=True)

In [None]:
# The project's feature store must be named "<project name>_featurestore".
assert featurestore.project_featurestore() == hdfs.project_name() + "_featurestore"

In [None]:
# The project's own feature store must be among the feature stores of the project.
assert featurestore.project_featurestore() in featurestore.get_project_featurestores()

In [None]:
# Exactly one feature store is expected.
# NOTE(review): presumably this fails in projects with shared feature stores - confirm.
assert len(featurestore.get_project_featurestores()) == 1

In [None]:
# Versions 1 and 2 of teams_features_spanish were created above, so the latest is 2.
assert featurestore.get_latest_featuregroup_version("teams_features_spanish") == 2

In [None]:
# teams_features was created without an explicit version; its latest version is expected to be 1.
assert featurestore.get_latest_featuregroup_version("teams_features") == 1

In [None]:
# The feature list must contain away_team_id.
assert "away_team_id" in featurestore.get_features_list()

In [None]:
# The feature list must contain home_team_id.
assert "home_team_id" in featurestore.get_features_list()

In [None]:
# A ("<project name>_featurestore", 'JDBC') entry must be among the storage connectors.
assert (hdfs.project_name() + "_featurestore", 'JDBC') in featurestore.get_storage_connectors()

In [None]:
# At least three storage connectors are expected.
assert len(featurestore.get_storage_connectors()) >= 3

##### Test Read Operations of Features and Feature Groups

- `featurestore.get_feature()`
- `featurestore.get_features()`
- `featurestore.get_featuregroup()`

In [None]:
# Read a single feature by name only: one column, 50 rows expected.
tmp = featurestore.get_feature("team_budget")
assert tmp.count() == 50
assert len(tmp.columns) == 1
assert "team_budget" in tmp.columns

In [None]:
# Same read with feature store, feature group, version and dataframe type
# passed explicitly; the result must match the default call.
tmp = featurestore.get_feature(
    "team_budget", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup="teams_features", 
    featuregroup_version = 1,
    dataframe_type = "spark"
)
assert tmp.count() == 50
assert len(tmp.columns) == 1
assert "team_budget" in tmp.columns

In [None]:
# Read a whole feature group by name only: three columns, 50 rows expected.
tmp = featurestore.get_featuregroup("teams_features")
assert tmp.count() == 50
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns

In [None]:
# Same read with feature store, version and dataframe type passed explicitly.
tmp = featurestore.get_featuregroup(
    "teams_features", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup_version = 1,
    dataframe_type = "spark"
)
assert tmp.count() == 50
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns

In [None]:
# Request two features by name only; the result must contain exactly those
# columns and 50 rows.
features = ["team_budget", "average_attendance"]
tmp = featurestore.get_features(
    features
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

In [None]:
# Request features as "<featuregroup>_<version>.<feature>"; the returned
# columns are expected to carry the plain feature names.
features = ["teams_features_1.team_budget", "attendances_features_1.average_attendance"]
tmp = featurestore.get_features(features)
assert set(["team_budget", "average_attendance"]) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

In [None]:
# Request features with the feature store and the feature group versions given explicitly.
features = ["team_budget", "average_attendance"]
tmp = featurestore.get_features(
    features,
    featurestore=featurestore.project_featurestore(),
    featuregroups_version_dict={
        "teams_features": 1, 
        "attendances_features": 1
    }
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

In [None]:
# Same request with an explicit join key and dataframe type; reuses the
# `features` list defined in the previous cell.
tmp = featurestore.get_features(
    features,
    featurestore=featurestore.project_featurestore(),
    featuregroups_version_dict={
        "teams_features": 1, 
        "attendances_features": 1
    },
    join_key = "team_id",
    dataframe_type = "spark"
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

In [None]:
# Request four features by name only; exactly those columns and 50 rows are expected.
features = ["team_budget", "average_attendance",
    "team_position", "sum_attendance"
    ]
tmp = featurestore.get_features(
   features
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

In [None]:
# Request two features while pinning only the teams_features version.
features = ["team_budget", "team_id"]
tmp = featurestore.get_features(
    features,
    featuregroups_version_dict = {
        "teams_features" : 1
    }
)
assert set(features) == set(tmp.columns)
assert tmp.count() == 50
assert len(tmp.columns) == len(features)

In [None]:
# Run a SQL join of teams and games on the home team id; two columns and
# 49 rows are expected.
tmp = featurestore.sql(
    "SELECT team_budget, score " \
    "FROM teams_features_1 JOIN games_features_1 ON " \
    "games_features_1.home_team_id = teams_features_1.team_id")
features = ['team_budget', 'score']
assert set(features) == set(tmp.columns)
assert tmp.count() == 49
assert len(tmp.columns) == len(features)

In [None]:
# Run a filtered SQL query and verify both the columns and the filter itself.
tmp = featurestore.sql("SELECT * FROM teams_features_1 WHERE team_position < 5")
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns
# Every returned row must satisfy the WHERE clause.
for x in tmp.toPandas()["team_position"].values:
    assert x < 5

In [None]:
# Same filtered query with feature store and dataframe type passed explicitly.
tmp = featurestore.sql("SELECT * FROM teams_features_1 WHERE team_position < 5",
                featurestore=featurestore.project_featurestore(), 
                 dataframe_type = "spark")
assert len(tmp.columns) == 3
assert "team_budget" in tmp.columns
assert "team_id" in tmp.columns
assert "team_position" in tmp.columns
# Every returned row must satisfy the WHERE clause.
for x in tmp.toPandas()["team_position"].values:
    assert x < 5

#####  Test Insert Operations in Existing Feature Groups, `featurestore.insert_into_featuregroup()`

In [None]:
# Build a three-row data frame with the Spanish column names; it is the
# payload for the insert tests below.
sqlContext = SQLContext(spark.sparkContext)
schema = StructType([StructField("equipo_id", IntegerType(), True),
                     StructField("equipo_presupuesto", FloatType(), True),
                     StructField("equipo_posicion", IntegerType(), True)
                        ])
sample_df = sqlContext.createDataFrame([(999, 41251.52, 1), (998, 1319.4, 8), (997, 21219.1, 2)], schema)
insert_count = sample_df.count()
assert insert_count == 3

In [None]:
# Baseline before inserting: the feature group read by name holds 50 rows.
spanish_team_features_df = featurestore.get_featuregroup(
    "teams_features_spanish")
pre_insert_count = spanish_team_features_df.count()
assert pre_insert_count == 50

In [None]:
# Insert the three sample rows without specifying a mode; the row count is
# expected to grow from 50 to 53.
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish", 
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
spanish_team_features_df_updated = featurestore.get_featuregroup(
    "teams_features_spanish")

after_insert_count = spanish_team_features_df_updated.count()
assert after_insert_count == 53

In [None]:
# Insert the same rows again with every argument spelled out and
# mode="append"; the row count is expected to grow from 53 to 56.
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish", 
    featurestore=featurestore.project_featurestore(), 
    featuregroup_version=1, 
    mode="append",
    descriptive_statistics=False, 
    feature_correlation=False, 
    feature_histograms=False,
    cluster_analysis=False, 
    stat_columns=None, 
    num_bins=20, 
    corr_method='pearson',
    num_clusters=5
)

after_insert_count2 = featurestore.get_featuregroup("teams_features_spanish").count()
assert after_insert_count2 == 56

In [None]:
# Insert with mode="overwrite"; afterwards only the three sample rows may remain.
featurestore.insert_into_featuregroup(
    sample_df, 
    "teams_features_spanish",
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")

count_after_overwrite = featurestore.get_featuregroup("teams_features_spanish").count()
assert count_after_overwrite == 3

##### Test integration of feature store with Numpy, Pandas and plain Python

In [None]:
# dataframe_type="pandas" must return a pandas DataFrame with both features and 50 rows.
pandas_df = featurestore.get_features(["team_budget", "average_attendance"], dataframe_type="pandas")
assert "team_budget" in pandas_df.columns.values
assert "average_attendance" in pandas_df.columns.values
assert len(pandas_df) == 50
assert len(pandas_df.columns.values) == 2
assert isinstance(pandas_df, pd.DataFrame)

In [None]:
# dataframe_type="numpy" must return a 50 x 2 numpy array.
numpy_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="numpy")
assert numpy_df.shape[0] == 50
assert numpy_df.shape[1] == 2
assert isinstance(numpy_df, np.ndarray)

In [None]:
# dataframe_type="python" must return a plain list with 50 entries.
python_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="python")
assert len(python_df) == 50
assert isinstance(python_df, list)

In [None]:
# dataframe_type="spark" must return a Spark DataFrame with 50 rows.
spark_df = featurestore.get_features(["team_budget", "average_attendance"], 
                                      dataframe_type="spark")
assert spark_df.count() == 50
assert isinstance(spark_df, DataFrame)

In [None]:
# Rename the columns (in place, on pandas_df) so that this feature group's
# features do not clash with features that already exist in the feature store.
pandas_df.columns = ["team_budget_test", "average_attendance_test"]

# Create a feature group directly from the pandas DataFrame, statistics disabled.
featurestore.create_featuregroup(
    pandas_df,
    "pandas_test_example",
    description="test featuregroup created from pandas dataframe",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
# Feature groups are listed as "<name>_<version>"; the new group must be present.
assert "pandas_test_example_1" in featurestore.get_featuregroups()

In [None]:
# Overwriting the feature group with the same pandas DataFrame it was created
# from must leave the row count unchanged.
count_pre_pandas_insert_overwrite = featurestore.get_featuregroup("pandas_test_example").count()
featurestore.insert_into_featuregroup(
    pandas_df, 
    "pandas_test_example",
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")
count_after_pandas_insert_overwrite = featurestore.get_featuregroup("pandas_test_example").count()
assert count_pre_pandas_insert_overwrite == count_after_pandas_insert_overwrite

In [None]:
# Create a feature group directly from the numpy matrix, statistics disabled.
featurestore.create_featuregroup(
    numpy_df,
    "numpy_test_example",
    description="test featuregroup created from numpy matrix",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)
# The new group must be listed under its "<name>_<version>" identifier.
assert "numpy_test_example_1" in featurestore.get_featuregroups()

In [None]:
# Overwriting the feature group with the same numpy matrix it was created from
# must leave the row count unchanged.
numpy_test_df_count_pre_insert_overwrite = featurestore.get_featuregroup("numpy_test_example", dataframe_type="spark").count()
featurestore.insert_into_featuregroup(
    numpy_df,
    "numpy_test_example",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")
numpy_test_df_count_after_insert_overwrite = featurestore.get_featuregroup("numpy_test_example", dataframe_type="spark").count()
# Compare the count taken before the overwrite with the count taken after it
# (previously the "pre" count was compared with itself, which can never fail).
assert numpy_test_df_count_pre_insert_overwrite == numpy_test_df_count_after_insert_overwrite

In [None]:
# Create a feature group directly from the plain Python 2D list, statistics disabled.
featurestore.create_featuregroup(
    python_df,
    "python_test_example",
    description="test featuregroup created from python 2D list",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

# Record the row count now; a later cell compares it with the count after an overwrite.
python_test_df_count_pre_insert_overwrite = featurestore.get_featuregroup("python_test_example", dataframe_type="spark").count()
assert "python_test_example_1" in featurestore.get_featuregroups()

In [None]:
# Overwrite the feature group with the same Python list it was created from.
featurestore.insert_into_featuregroup(
    python_df, 
    "python_test_example",
    descriptive_statistics=False, 
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    mode="overwrite")

# The row count must match the one recorded right after creation.
python_test_df_count_after_insert_overwrite = featurestore.get_featuregroup("python_test_example", dataframe_type="spark").count()
assert python_test_df_count_pre_insert_overwrite == python_test_df_count_after_insert_overwrite

##### Test update Feature Store Statistics `featurestore.update_featuregroup_stats()`

In [None]:
# Recompute statistics for "teams_features" using only default arguments.
featurestore.update_featuregroup_stats("teams_features")

In [None]:
# Recompute statistics for "teams_features" with every argument spelled out and
# all four statistics kinds enabled; a later cell reads these statistics back.
featurestore.update_featuregroup_stats(
    "teams_features", 
    featuregroup_version=1, 
    featurestore=featurestore.project_featurestore(), 
    descriptive_statistics=True,
    feature_correlation=True, 
    feature_histograms=True,
    cluster_analysis=True,
    stat_columns=None)

##### Test Write Training Dataset Operations 

- `featurestore.get_latest_training_dataset_version()`
- `featurestore.create_training_dataset()`

In [None]:
# Features used as the content of every training dataset created below.
features_df = featurestore.get_features(
    ["team_budget", "average_attendance",
    "team_position"]
)
# Exercises get_latest_training_dataset_version(); the result is not used by
# the cells that follow (they pin training_dataset_version=1 explicitly).
latest_version = featurestore.get_latest_training_dataset_version("team_position_prediction")

In [None]:
# Create version 1 of "team_position_prediction" without specifying a data
# format (library default), with all statistics computations disabled.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
)

In [None]:
# Materialize the same features as a CSV training dataset (version 1),
# passing the feature store explicitly and keeping statistics disabled.
featurestore.create_training_dataset(
    features_df,
    "team_position_prediction_csv",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="csv",
    training_dataset_version=1,
    stat_columns=None,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
)

In [None]:
# Same features materialized in TSV format (version 1, statistics disabled).
featurestore.create_training_dataset(
    features_df, "team_position_prediction_tsv",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="tsv",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

In [None]:
# Same features materialized in Parquet format (version 1, statistics disabled).
featurestore.create_training_dataset(
    features_df, "team_position_prediction_parquet",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="parquet",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

In [None]:
# Same features materialized in ORC format (version 1, statistics disabled).
featurestore.create_training_dataset(
    features_df, "team_position_prediction_orc",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="orc",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

In [None]:
# Same features materialized in Avro format (version 1, statistics disabled).
featurestore.create_training_dataset(
    features_df, "team_position_prediction_avro",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="avro",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

In [None]:
# Same features materialized in HDF5 format (version 1, statistics disabled).
featurestore.create_training_dataset(
    features_df, "team_position_prediction_hdf5",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="hdf5",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

In [None]:
# Same features materialized in NumPy .npy format (version 1, statistics disabled).
featurestore.create_training_dataset(
    features_df, "team_position_prediction_npy",
    description="a dataset with features for football teams, used for training a model to predict league-position",
    featurestore=featurestore.project_featurestore(),
    data_format="npy",
    training_dataset_version=1,
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    stat_columns=None)

In [None]:
# Petastorm is only supported in python 3, so this cell is a no-op on python 2.
if sys.version_info[0] >= 3:
    # Petastorm needs an explicit Unischema: three scalar fields (shape `()`),
    # two float32 features and the int32 label. The trailing `False` is
    # presumably the `nullable` flag -- confirm against the petastorm docs.
    PetastormSchema = Unischema('team_position_prediction_petastorm_schema', [
        UnischemaField('team_budget', np.float32, (), ScalarCodec(FloatType()), False),
        UnischemaField('average_attendance', np.float32, (), ScalarCodec(FloatType()), False),
        UnischemaField('team_position', np.int32, (), ScalarCodec(IntegerType()), False)
    ])

    # Format-specific arguments are handed over through `petastorm_args`.
    petastorm_args = {
        "schema": PetastormSchema
    }

    # Same features materialized in Petastorm format (version 1, statistics disabled).
    featurestore.create_training_dataset(
        features_df, "team_position_prediction_petastorm",
        description="a dataset with features for football teams, used for training a model to predict league-position",
        featurestore=featurestore.project_featurestore(),
        data_format="petastorm",
        training_dataset_version=1,
        descriptive_statistics=False,
        feature_correlation=False,
        feature_histograms=False,
        cluster_analysis=False,
        stat_columns=None,
        petastorm_args=petastorm_args
    )

In [None]:
# Every training dataset created by the preceding cells must be listed under
# its "<name>_<version>" identifier.
tds = featurestore.get_training_datasets()
for expected_td in (
    'team_position_prediction_1',
    'team_position_prediction_csv_1',
    'team_position_prediction_tsv_1',
    'team_position_prediction_parquet_1',
    'team_position_prediction_orc_1',
    'team_position_prediction_avro_1',
    'team_position_prediction_hdf5_1',
    'team_position_prediction_npy_1',
):
    assert expected_td in tds
# The petastorm dataset is only created on python 3.
if sys.version_info[0] >= 3:
    assert 'team_position_prediction_petastorm_1' in tds

##### Test Insert into an existing training dataset, `featurestore.insert_into_training_dataset()`

In [None]:
# Insert the same features into the latest version of the CSV training dataset
# and compare the row counts before and after.
count_pre_insert = featurestore.get_training_dataset("team_position_prediction_csv").count()
featurestore.insert_into_training_dataset(
    features_df, 
    "team_position_prediction_csv",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False,
    training_dataset_version=featurestore.get_latest_training_dataset_version("team_position_prediction_csv")
)
count_after_insert = featurestore.get_training_dataset("team_position_prediction_csv").count()
assert count_pre_insert == count_after_insert # training datasets only support overwrites, so the count must not grow

##### Test Training Dataset Utility Methods

- `featurestore.get_training_dataset_path()`
- `featurestore.get_training_dataset_tf_record_schema()`

In [None]:
# The training dataset path must be located under the project's HDFS path.
assert hdfs.project_path() in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [None]:
# The path must contain the "<project>_Training_Datasets" directory name.
assert hdfs.project_name() + "_Training_Datasets" in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [None]:
# The path must contain the training dataset's own name.
assert "team_position_prediction_csv" in featurestore.get_training_dataset_path("team_position_prediction_csv")

In [None]:
# The TFRecord schema stored for the training dataset must describe the two
# float32 features and the int64 label as scalar FixedLenFeature entries.
tf_schema = featurestore.get_training_dataset_tf_record_schema("team_position_prediction")
assert tf_schema == {'team_budget': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'average_attendance': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'team_position': tf.FixedLenFeature(shape=[], dtype=tf.int64, default_value=None)}

In [None]:
# NOTE: this rebinds `features_df` to the dataframe read back from the training dataset.
features_df = featurestore.get_training_dataset("team_position_prediction")
# The schema inferred from the dataframe must equal the expected TFRecord schema.
tf_schema = featurestore.get_dataframe_tf_record_schema(features_df)
assert tf_schema == {'team_budget': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'average_attendance': tf.FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 
                     'team_position': tf.FixedLenFeature(shape=[], dtype=tf.int64, default_value=None)}

##### Test update Training Dataset stats

- `featurestore.update_training_dataset_stats()`

In [None]:
# Recompute statistics for "team_position_prediction" using only default arguments.
featurestore.update_training_dataset_stats("team_position_prediction")

In [None]:
# Recompute statistics with every argument spelled out and all four statistics
# kinds enabled; a later cell reads these statistics back.
featurestore.update_training_dataset_stats(
    "team_position_prediction", 
    training_dataset_version=1, 
    featurestore=featurestore.project_featurestore(), 
    descriptive_statistics=True,
    feature_correlation=True, 
    feature_histograms=True,
    cluster_analysis=True,
    stat_columns=None)

##### Test Read Training Datasets API `featurestore.get_training_dataset()`

In [None]:
# Expected column names; `cols` is reused by the read-back cells that follow.
cols = ['team_budget', 'average_attendance', 'team_position']
# Read the CSV training dataset back: same columns (order-insensitive), 50 rows.
tmp = featurestore.get_training_dataset("team_position_prediction_csv")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
# Read the HDF5 training dataset back; only the row count is checked here.
tmp = featurestore.get_training_dataset("team_position_prediction_hdf5")
assert tmp.count() == 50

In [None]:
# The petastorm dataset only exists on python 3 (it is only created there).
if sys.version_info[0] >= 3:
    # Read it back: same columns (order-insensitive), 50 rows.
    tmp = featurestore.get_training_dataset("team_position_prediction_petastorm")
    assert set(tmp.columns) == set(cols)
    assert tmp.count() == 50

In [None]:
# Read the Avro training dataset back: same columns (order-insensitive), 50 rows.
tmp = featurestore.get_training_dataset("team_position_prediction_avro")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
# Read the ORC training dataset back: same columns (order-insensitive), 50 rows.
tmp = featurestore.get_training_dataset("team_position_prediction_orc")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
# Read the TSV training dataset back: same columns (order-insensitive), 50 rows.
tmp = featurestore.get_training_dataset("team_position_prediction_tsv")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

In [None]:
# Read the .npy training dataset back; only the row count is checked here.
tmp = featurestore.get_training_dataset("team_position_prediction_npy")
assert tmp.count() == 50

In [None]:
# Read the Parquet training dataset back: same columns (order-insensitive), 50 rows.
tmp = featurestore.get_training_dataset("team_position_prediction_parquet")
assert set(tmp.columns) == set(cols)
assert tmp.count() == 50

##### Test Featurestore Get Statistics

- `featurestore.get_featuregroup_statistics()`
- `featurestore.get_training_dataset_statistics()`

In [None]:
# Statistics for "teams_features": every section (cluster analysis, correlation
# matrix, descriptive statistics, histograms) must be present and non-empty.
stats = featurestore.get_featuregroup_statistics("teams_features")

# Cluster analysis: one cluster assignment per datapoint.
assert stats.cluster_analysis is not None
assert stats.cluster_analysis.clusters is not None
assert stats.cluster_analysis.datapoints is not None
assert len(stats.cluster_analysis.clusters) == len(stats.cluster_analysis.datapoints)
assert stats.cluster_analysis.clusters[0].datapoint_name is not None
assert stats.cluster_analysis.clusters[0].cluster is not None

# Correlation matrix: non-empty, below the column limit, and square.
assert stats.correlation_matrix is not None
assert stats.correlation_matrix.feature_correlations is not None
assert len(stats.correlation_matrix.feature_correlations) > 0
assert len(stats.correlation_matrix.feature_correlations) < constants.FEATURE_STORE.MAX_CORRELATION_MATRIX_COLUMNS
assert stats.correlation_matrix.feature_correlations[0].feature_name is not None
assert stats.correlation_matrix.feature_correlations[0].correlation_values is not None
assert (
    len(stats.correlation_matrix.feature_correlations[0].correlation_values)
    == len(stats.correlation_matrix.feature_correlations)
)

# Descriptive statistics: at least one feature with at least one named metric.
assert stats.descriptive_stats is not None
assert stats.descriptive_stats.descriptive_stats is not None
assert len(stats.descriptive_stats.descriptive_stats) > 0
assert stats.descriptive_stats.descriptive_stats[0].feature_name is not None
assert stats.descriptive_stats.descriptive_stats[0].metric_values is not None
assert len(stats.descriptive_stats.descriptive_stats[0].metric_values) > 0
assert stats.descriptive_stats.descriptive_stats[0].metric_values[0].metric_name is not None
assert stats.descriptive_stats.descriptive_stats[0].metric_values[0].value is not None

# Histograms: at least one feature with at least one (bin, frequency) entry.
assert stats.feature_histograms is not None
assert stats.feature_histograms.feature_distributions is not None
assert len(stats.feature_histograms.feature_distributions) > 0
assert stats.feature_histograms.feature_distributions[0].feature_name is not None
assert stats.feature_histograms.feature_distributions[0].frequency_distribution is not None
assert len(stats.feature_histograms.feature_distributions[0].frequency_distribution) > 0
assert stats.feature_histograms.feature_distributions[0].frequency_distribution[0].bin is not None
assert stats.feature_histograms.feature_distributions[0].frequency_distribution[0].frequency is not None

In [None]:
# Statistics for the "team_position_prediction" training dataset: every section
# (cluster analysis, correlation matrix, descriptive statistics, histograms)
# must be present and non-empty.
stats = featurestore.get_training_dataset_statistics("team_position_prediction")

# Cluster analysis: one cluster assignment per datapoint.
assert stats.cluster_analysis is not None
assert stats.cluster_analysis.clusters is not None
assert stats.cluster_analysis.datapoints is not None
assert len(stats.cluster_analysis.clusters) == len(stats.cluster_analysis.datapoints)
assert stats.cluster_analysis.clusters[0].datapoint_name is not None
assert stats.cluster_analysis.clusters[0].cluster is not None

# Correlation matrix: non-empty, below the column limit, and square.
assert stats.correlation_matrix is not None
assert stats.correlation_matrix.feature_correlations is not None
assert len(stats.correlation_matrix.feature_correlations) > 0
assert len(stats.correlation_matrix.feature_correlations) < constants.FEATURE_STORE.MAX_CORRELATION_MATRIX_COLUMNS
assert stats.correlation_matrix.feature_correlations[0].feature_name is not None
assert stats.correlation_matrix.feature_correlations[0].correlation_values is not None
assert (
    len(stats.correlation_matrix.feature_correlations[0].correlation_values)
    == len(stats.correlation_matrix.feature_correlations)
)

# Descriptive statistics: at least one feature with at least one named metric.
assert stats.descriptive_stats is not None
assert stats.descriptive_stats.descriptive_stats is not None
assert len(stats.descriptive_stats.descriptive_stats) > 0
assert stats.descriptive_stats.descriptive_stats[0].feature_name is not None
assert stats.descriptive_stats.descriptive_stats[0].metric_values is not None
assert len(stats.descriptive_stats.descriptive_stats[0].metric_values) > 0
assert stats.descriptive_stats.descriptive_stats[0].metric_values[0].metric_name is not None
assert stats.descriptive_stats.descriptive_stats[0].metric_values[0].value is not None

# Histograms: at least one feature with at least one (bin, frequency) entry.
assert stats.feature_histograms is not None
assert stats.feature_histograms.feature_distributions is not None
assert len(stats.feature_histograms.feature_distributions) > 0
assert stats.feature_histograms.feature_distributions[0].feature_name is not None
assert stats.feature_histograms.feature_distributions[0].frequency_distribution is not None
assert len(stats.feature_histograms.feature_distributions[0].frequency_distribution) > 0
assert stats.feature_histograms.feature_distributions[0].frequency_distribution[0].bin is not None
assert stats.feature_histograms.feature_distributions[0].frequency_distribution[0].frequency is not None

##### Test Featurestore Visualizations

- `featurestore.visualize_featuregroup_distributions()`
- `featurestore.visualize_featuregroup_correlations()`
- `featurestore.visualize_featuregroup_clusters()`
- `featurestore.visualize_featuregroup_descriptive_stats()`
- `featurestore.visualize_training_dataset_distributions()`
- `featurestore.visualize_training_dataset_correlations()`
- `featurestore.visualize_training_dataset_clusters()`
- `featurestore.visualize_training_dataset_descriptive_stats()`

In [None]:
# Build the distributions figure without displaying it (plot=False) and save it as a PNG.
fig = featurestore.visualize_featuregroup_distributions("teams_features", plot=False)
fig.savefig("teams_features_distributions.png")

In [None]:
# Build the correlations figure without displaying it (plot=False) and save it as a PNG.
fig = featurestore.visualize_featuregroup_correlations("teams_features", plot=False)
fig.savefig("teams_features_correlations.png")

In [None]:
# Build the clusters figure without displaying it (plot=False) and save it as a PNG.
fig = featurestore.visualize_featuregroup_clusters("teams_features", plot=False)
fig.savefig("teams_features_clusters.png")

In [None]:
# The descriptive statistics come back as an object supporting .head()
# (presumably a pandas DataFrame -- confirm against the hops API).
desc_stats_df = featurestore.visualize_featuregroup_descriptive_stats("teams_features")
desc_stats_df.head()

In [None]:
# Build the distributions figure without displaying it (plot=False) and save it as a PNG.
fig = featurestore.visualize_training_dataset_distributions("team_position_prediction", plot=False)
fig.savefig("team_position_prediction_distributions.png")

In [None]:
# Build the correlations figure without displaying it (plot=False) and save it as a PNG.
fig = featurestore.visualize_training_dataset_correlations("team_position_prediction", plot=False)
fig.savefig("team_position_prediction_correlations.png")

In [None]:
# Build the clusters figure without displaying it (plot=False) and save it as a PNG.
fig = featurestore.visualize_training_dataset_clusters("team_position_prediction", plot=False)
fig.savefig("team_position_prediction_clusters.png")

In [None]:
# The descriptive statistics come back as an object supporting .head()
# (presumably a pandas DataFrame -- confirm against the hops API).
desc_stats_df = featurestore.visualize_training_dataset_descriptive_stats("team_position_prediction")
desc_stats_df.head()

##### Cleanup (Delete FS Contents so that next test run works the same)

In [None]:
# Delete feature groups
spark.sql('use ' + featurestore.project_featurestore())
for fg in featurestore.get_featuregroups():
    try:
        spark.sql("drop table " + fg)
    except:
        pass

In [None]:
# Delete training datasets
td_dir = hdfs.project_name() + "_Training_Datasets/"
for td in featurestore.get_training_datasets():
    try:
        hdfs.rmr(td_dir + td)
    except:
        pass

In [None]:
# Refresh the cached metadata so the checks below see the state after cleanup.
featurestore.get_featurestore_metadata(update_cache=True)
# The on-demand feature group is still present after cleanup, so the feature
# group check stays disabled (TODO: maybe add a delete endpoint to the python SDK).
#assert featurestore.get_featuregroups() == [] 
assert featurestore.get_training_datasets() == []

## Kafka Tests

##### Test default config 

- `kafka.get_kafka_default_config()`
- `kafka.get_security_protocol()`
- `kafka.get_broker_endpoints_list()`

In [None]:
# The default Kafka config must carry the broker list plus all TLS settings.
config = kafka.get_kafka_default_config()
for required_key in (
    "bootstrap.servers",
    "security.protocol",
    "ssl.ca.location",
    "ssl.key.location",
    "ssl.certificate.location",
):
    assert required_key in config

In [None]:
# Both helpers must return a non-empty value.
assert len(kafka.get_security_protocol()) > 0
assert len(kafka.get_broker_endpoints_list()) > 0

## TLS Tests

##### Test access to TLS tokens

- `tls.get_key_store()`
- `tls.get_trust_store()`
- `tls.get_key_store_pwd()`
- `tls.get_trust_store_pwd()`
- `tls.get_client_certificate_location()`
- `tls.get_client_key_location()`
- `tls.get_ca_chain_location()`

In [None]:
# Every TLS accessor (stores, passwords, certificate/key/CA locations) must
# return a non-empty value.
assert len(tls.get_key_store()) > 0
assert len(tls.get_trust_store()) > 0
assert len(tls.get_key_store_pwd()) > 0
assert len(tls.get_trust_store_pwd()) > 0
assert len(tls.get_client_certificate_location()) > 0
assert len(tls.get_client_key_location()) > 0
assert len(tls.get_ca_chain_location()) > 0

## Serving Tests

These tests require that you have the following files in the Resources directory:

- `iris_knn.pkl`
- `iris_flower_classifier.py`
- `mnist`

Where mnist is a directory containing a tensorflow model.

These files can be downloaded from here: `http://snurran.sics.se/hops/hops-util-py_test/`

##### Test Export Model

In [None]:
# Export the pickled KNN model as version 1 of "IrisFlowerClassifier" and check
# that it landed in the versioned Models directory.
model_path = "Resources/iris_knn.pkl"
serving.export(model_path, "IrisFlowerClassifier", 1, overwrite=True)
assert hdfs.exists("Models/IrisFlowerClassifier/1/iris_knn.pkl")

In [None]:
# Export the classifier script into the same model version directory.
model_path = "Resources/iris_flower_classifier.py"
serving.export(model_path, "IrisFlowerClassifier", 1, overwrite=True)
assert hdfs.exists("Models/IrisFlowerClassifier/1/iris_flower_classifier.py")

In [None]:
# Export the whole mnist model directory as version 2 of "mnist".
model_path = "Resources/mnist/"
serving.export(model_path, "mnist", 2, overwrite=True)
assert hdfs.exists("Models/mnist/2/")

##### Test Serve Model

In [None]:
# Start from a clean slate: remove any serving left over from an earlier run,
# then create the SKLEARN serving from the exported script (model version 1).
script_path = "Models/IrisFlowerClassifier/1/iris_flower_classifier.py"
if serving.exists("IrisFlowerClassifier"):
    serving.delete("IrisFlowerClassifier")
serving.create_or_update(
    script_path,
    "IrisFlowerClassifier",
    serving_type="SKLEARN",
    model_version=1,
)

In [None]:
# The serving created in the previous cell must now be registered.
assert serving.exists("IrisFlowerClassifier")

In [None]:
# Start from a clean slate: remove any serving left over from an earlier run,
# then create the TENSORFLOW serving from the exported model (model version 2).
model_path = "Models/mnist/2/"
if serving.exists("mnist"):
    serving.delete("mnist")
serving.create_or_update(
    model_path,
    "mnist",
    serving_type="TENSORFLOW",
    model_version=2,
)

In [None]:
# The serving created in the previous cell must now be registered.
assert serving.exists("mnist")

##### Test Data Access Operations on Model

In [None]:
# Metadata accessors for both servings: id, artifact path, type, version,
# Kafka topic and status must reflect what was passed to create_or_update().
assert serving.get_id("IrisFlowerClassifier") is not None
assert serving.get_id("mnist") is not None
assert "Models/IrisFlowerClassifier/1/iris_flower_classifier.py" in serving.get_artifact_path("IrisFlowerClassifier")
assert "Models/mnist/2/" in serving.get_artifact_path("mnist")
assert serving.get_type("IrisFlowerClassifier") == "SKLEARN"
assert serving.get_type("mnist") == "TENSORFLOW"
assert serving.get_version("IrisFlowerClassifier") == 1
assert serving.get_version("mnist") == 2
assert serving.get_kafka_topic("IrisFlowerClassifier") is not None
assert serving.get_kafka_topic("mnist") is not None
# Neither serving has been started yet at this point.
assert serving.get_status("IrisFlowerClassifier") == "Stopped"
assert serving.get_status("mnist") == "Stopped"

##### Test Start/Stop Serving

In [None]:
# Start both servings.
serving.start("IrisFlowerClassifier")
serving.start("mnist")

In [None]:
# After start() both servings are expected to report "Running".
assert serving.get_status("IrisFlowerClassifier") == "Running"
assert serving.get_status("mnist") == "Running"

In [None]:
# Stop both servings again.
serving.stop("IrisFlowerClassifier")
serving.stop("mnist")

In [None]:
# After stop() both servings are expected to report "Stopped".
assert serving.get_status("IrisFlowerClassifier") == "Stopped"
assert serving.get_status("mnist") == "Stopped"

##### Test Send Inference Requests

In [None]:
# Restart both servings so that inference requests can be sent to them.
serving.start("IrisFlowerClassifier")
serving.start("mnist")

In [None]:
# Send 20 inference requests, each with one random 4-feature sample, to the
# sklearn serving and validate every response.
for _ in range(20):
    # The inner loop uses its own variable name: a list comprehension reusing
    # the outer name would overwrite it on python 2.
    data = {"inputs": [[random.uniform(1, 8) for _feature in range(4)]]}
    response = serving.make_inference_request("IrisFlowerClassifier", data)
    assert response is not None
    # Either key is accepted. Each membership test must be written out:
    # `"predictions" or "prediction" in response` is always true because the
    # non-empty string literal is truthy on its own.
    assert "predictions" in response or "prediction" in response

In [None]:
# Send 20 inference requests, each with one random 784-value instance, to the
# tensorflow serving and validate every response.
for _ in range(20):
    data = {
        "signature_name": 'predict_images',
        "instances": [np.random.rand(784).tolist()],
    }
    response = serving.make_inference_request("mnist", data)
    assert response is not None
    assert "predictions" in response

##### Test Kafka Inference Log

In [None]:
# Avro Python is only supported in python 2
if sys.version_info[0] < 3:
    topic = serving.get_kafka_topic("IrisFlowerClassifier")
    config = kafka.get_kafka_default_config()
    config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
    consumer = Consumer(config)
    topics = [topic]
    consumer.subscribe(topics)
    json_schema = kafka.get_schema(topic)
    avro_schema = kafka.convert_json_schema_to_avro(json_schema)

In [None]:
# Avro Python is only supported in python 2
if sys.version_info[0] < 3:
    for i in range(0, 10):
        msg = consumer.poll(timeout=1.5)
        if msg is not None:
            value = msg.value()
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            assert "modelName" in event_dict
            assert "requestTimestamp" in event_dict
            assert "servingType" in event_dict
            assert "inferenceResponse" in event_dict
            assert event_dict["modelName"] == "IrisFlowerClassifier"
            assert event_dict["servingType"] == "SKLEARN"

In [None]:
# Avro Python is only supported in python 2
if sys.version_info[0] < 3:
    topic = serving.get_kafka_topic("mnist")
    config = kafka.get_kafka_default_config()
    config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
    consumer = Consumer(config)
    topics = [topic]
    consumer.subscribe(topics)
    json_schema = kafka.get_schema(topic)
    avro_schema = kafka.convert_json_schema_to_avro(json_schema)

In [None]:
# Avro Python is only supported in python 2
if sys.version_info[0] < 3:
    for i in range(0, 10):
        msg = consumer.poll(timeout=1.5)
        if msg is not None:
            value = msg.value()
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            assert "modelName" in event_dict
            assert "requestTimestamp" in event_dict
            assert "servingType" in event_dict
            assert "inferenceResponse" in event_dict
            assert event_dict["modelName"] == "mnist"
            assert event_dict["servingType"] == "TENSORFLOW"

##### Test Delete Serving

In [None]:
# Remove both servings so that the next test run starts from a clean state.
serving.delete("IrisFlowerClassifier")
serving.delete("mnist")

In [None]:
# After delete() neither serving may be registered any more.
assert not serving.exists("IrisFlowerClassifier")
assert not serving.exists("mnist")