# Iris ML Streaming

In [179]:
from hops import jobs, hdfs, serving, featurestore
import tensorflow as tf
from functools import reduce
import time, random
import numpy as np

# Name of the jar that contains the monitoring application.
FILE_NAME = 'model-monitoring-1.0-SNAPSHOT.jar'
# Project-relative directory used for the jar and for the parquet logs read later on.
IRIS_RESOURCES_DIR_NAME = "Resources/Iris/"
# Absolute HDFS location of that directory inside the current project.
IRIS_RESOURCES_DIR = "hdfs:///Projects/" + hdfs.project_name() + "/" + IRIS_RESOURCES_DIR_NAME
# Full path of the jar; passed to the job configuration as 'appPath'.
APP_PATH = IRIS_RESOURCES_DIR + FILE_NAME
# Name of the served model that receives the inference requests.
IRIS_MODEL_NAME="Iris"
# Training dataset and feature group whose statistics are inspected below.
IRIS_TRAIN_DATASET_NAME = "iris_train_dataset"
IRIS_FG_NAME = "iris_train_all_features"

# Structured Streaming
# Job name and main class used when creating and starting the monitoring job.
JOB_NAME = 'iris_ml_monitoring_struct'
CLASS_NAME = 'io.hops.ml.monitoring.examples.iris.IrisMLMonitoring'

## Spark streaming job

### Job config

In [180]:
def get_spark_dyn_alloc_config(dyn_alloc_enabled=True, dyn_alloc_min_exec=1, dyn_alloc_max_exec=2, dyn_alloc_init_exec=1):
    """Return the Spark dynamic-allocation properties as a dict.

    Args:
        dyn_alloc_enabled: value for "spark.dynamicAllocation.enabled".
        dyn_alloc_min_exec: value for "spark.dynamicAllocation.minExecutors".
        dyn_alloc_max_exec: value for "spark.dynamicAllocation.maxExecutors".
        dyn_alloc_init_exec: value for "spark.dynamicAllocation.initialExecutors".

    Returns:
        A new dict mapping the four property names to the given values.
    """
    dyn_alloc = {}
    dyn_alloc["spark.dynamicAllocation.enabled"] = dyn_alloc_enabled
    dyn_alloc["spark.dynamicAllocation.minExecutors"] = dyn_alloc_min_exec
    dyn_alloc["spark.dynamicAllocation.maxExecutors"] = dyn_alloc_max_exec
    dyn_alloc["spark.dynamicAllocation.initialExecutors"] = dyn_alloc_init_exec
    return dyn_alloc

def get_spark_job_config(dyn_alloc_config, exec_instances=1, exec_gpus=0, exec_cores=1, exec_mem=2048, tf_num_ps=1, black_list_enabled=False):
    """Return the Spark executor properties merged with dynamic-allocation ones.

    Args:
        dyn_alloc_config: properties applied on top of the executor settings
            with ``dict.update``; on a key clash these values win.
        exec_instances: value for "spark.executor.instances".
        exec_gpus: value for "spark.executor.gpus".
        exec_cores: value for "spark.executor.cores".
        exec_mem: value for "spark.executor.memory".
        tf_num_ps: value for "spark.tensorflow.num.ps".
        black_list_enabled: value for "spark.blacklist.enabled".

    Returns:
        A new dict; ``dyn_alloc_config`` itself is not modified.
    """
    executor_settings = [
        ("spark.executor.instances", exec_instances),
        ("spark.executor.cores", exec_cores),
        ("spark.executor.memory", exec_mem),
        ("spark.executor.gpus", exec_gpus),
        ("spark.tensorflow.num.ps", tf_num_ps),
        ("spark.blacklist.enabled", black_list_enabled),
    ]
    merged = dict(executor_settings)
    merged.update(dyn_alloc_config)
    return merged

def get_job_config(app_path, main_class, experiment_type="EXPERIMENT", schedule=None, local_resources=[], dist_strategy="COLLECTIVE_ALL_REDUCE", spark_config=None):
    """Assemble the job configuration dict.

    Args:
        app_path: stored under 'appPath'.
        main_class: stored under 'mainClass'.
        experiment_type: stored under 'experimentType'.
        schedule: stored under 'schedule'.
        local_resources: stored under 'localResources'. The value is copied
            into a new list, so neither the shared default list nor a list
            owned by the caller is aliased by the returned configuration.
        dist_strategy: stored under 'distributionStrategy'.
        spark_config: optional dict of Spark properties. When truthy, the base
            Spark job settings are added first and ``spark_config`` is applied
            on top, so its values win on a key clash.

    Returns:
        A new dict with the job configuration.
    """
    # The default [] is created once at definition time; storing it directly
    # would make every returned config share (and be able to mutate) that list.
    resources = list(local_resources) if local_resources is not None else None
    config = { 'appPath': app_path, 'mainClass': main_class, 'experimentType': experiment_type, 'distributionStrategy': dist_strategy, 'schedule': schedule, 'localResources': resources }
    if spark_config:
        base_spark_config = {'type': 'sparkJobConfiguration', 'amQueue': 'default', 'amMemory': 2048, 'amVCores': 1, 'jobType': 'SPARK',
                             'mainClass': main_class}
        config.update(base_spark_config)
        config.update(spark_config)
    return config

### Create monitoring job

In [181]:
# generic job config
spk_jb_dyn_alloc_conf = get_spark_dyn_alloc_config()
spk_jb_config = get_spark_job_config(spk_jb_dyn_alloc_conf)
job_config = get_job_config(APP_PATH, CLASS_NAME, spark_config=spk_jb_config)

# check job existence: a truthy result from get_executions is treated as
# "the job already exists"
executions = jobs.get_executions(JOB_NAME, "")
if executions:
    print("Job '{}' already exists".format(JOB_NAME))
else:
    # create streaming job
    response = jobs.create_job(JOB_NAME, job_config)
    # .get() so that a response without an 'id' key reaches the failure
    # message below instead of raising KeyError
    if response and response.get('id'):
        print("Job created with ID", response['id'])
    else:
        print("Something went wrong")

Job created with ID 1089

## Simulate requests

### Start monitoring job

In [182]:
# job arguments:
# NOTE: Avoid doubles
# The values are formatted into a space-separated argument string below;
# presumably the job parses them as integers (TODO confirm against the job's main class).
job_timeout = 10*60 # 10 min (seconds)
window_duration = 4*1000 # 4s (milliseconds)
slide_duration = 2*1000 # 2s (milliseconds)
watermark_delay = 2*1000 # 2s (milliseconds)
# Not passed to the job: upper bound of the random pause between simulated requests.
max_request_delay = 8 # seconds

# Kafka topic of the served model; it is the first job argument and is also
# used later to build the parquet log directory names.
kfk_topic = serving.get_kafka_topic(IRIS_MODEL_NAME)
job_args = "{} {} {} {} {}".format(kfk_topic, job_timeout, window_duration, slide_duration, watermark_delay)

In [183]:
# check executions
# Reuse an execution whose finalStatus is still "UNDEFINED" (treated here as
# "still running") instead of starting a second one.
executions = jobs.get_executions(JOB_NAME, "")
job_execution_id = None
if executions['count'] != 0:    
    for item in executions['items']:
        if item['finalStatus'] == "UNDEFINED":
            job_execution_id = item['id']
            print("Job '{}' already running with ID {}".format(JOB_NAME, job_execution_id))
            print("State: {} - Args: '{}'".format(item['state'], item['args']))
            # only the first matching execution is used
            break

# start job if necessary
# job_execution_id is kept for the "Stop monitoring job" section at the end.
if job_execution_id is None:    
    response = jobs.start_job(JOB_NAME, job_args)
    job_execution_id = response['id']
    print("Job execution started with ID", job_execution_id)
    print("State: {} - Args: '{}'".format(response['state'], response['args']))

Job execution started with ID 1194
State: INITIALIZING - Args: 'Iris-inf7733 600 4000 2000 2000'

In [184]:
# see all executions
# Prints the number of executions of the job followed by one line per
# execution with its ID, state and arguments.
response = jobs.get_executions(JOB_NAME, "")
print("All executions:", response['count'])
for execution in response['items']:
    print("Job execution with ID {}, State: {} - Args: {}".format(execution['id'], execution['state'], execution['args']))

All executions: 1
Job execution with ID 1194, State: INITIALIZING - Args: Iris-inf7733 600 4000 2000 2000

### Start served model

In [185]:
# verify model is served and running
# Only the 'Stopped' status triggers a start; every other status is reported
# as "already running".
if serving.get_status(IRIS_MODEL_NAME) == 'Stopped':
    serving.start(IRIS_MODEL_NAME)
    time.sleep(10) # Let the serving startup correctly
else:
    print("Model '{}' already running".format(IRIS_MODEL_NAME))

Model 'Iris' already running

### Check train data statistics

In [186]:
def get_stats(name, store_type):
    """Fetch the feature store statistics for a feature group or training dataset.

    Args:
        name: name of the feature group or training dataset.
        store_type: 'FEATUREGROUP' or 'TRAINING_DATASET'.

    Returns:
        Whatever the matching ``featurestore`` statistics call returns.

    Raises:
        Exception: if ``store_type`` is neither of the two supported values.
    """
    if store_type == 'TRAINING_DATASET':
        return featurestore.get_training_dataset_statistics(name)
    if store_type == 'FEATUREGROUP':
        return featurestore.get_featuregroup_statistics(name)
    raise Exception('Unknown store type')

def get_clusters(name, store_type, stats=None):
    """List the cluster assignment of every datapoint in the cluster analysis.

    Args:
        name: feature group / training dataset name, used only when ``stats``
            is not supplied.
        store_type: 'FEATUREGROUP' or 'TRAINING_DATASET', used only when
            ``stats`` is not supplied.
        stats: statistics object fetched beforehand; when falsy the
            statistics are looked up with ``get_stats``.

    Returns:
        A list of ``(datapoint_name, cluster)`` tuples in the order of
        ``stats.cluster_analysis.clusters``.
    """
    if not stats:
        stats = get_stats(name, store_type)
    assignments = []
    for point in stats.cluster_analysis.clusters:
        assignments.append((point.datapoint_name, point.cluster))
    return assignments

def get_correlation_matrix(name, store_type, stats=None):
    """Extract the feature correlation matrix from the statistics.

    The column order is taken from the correlation values of the first row.
    The rows are then reordered so that row ``i`` belongs to the same feature
    as column ``i``.

    Args:
        name: feature group / training dataset name, used only when ``stats``
            is not supplied.
        store_type: 'FEATUREGROUP' or 'TRAINING_DATASET', used only when
            ``stats`` is not supplied.
        stats: statistics object fetched beforehand; when falsy the
            statistics are looked up with ``get_stats``.

    Returns:
        A tuple ``(features, correlations)``: the list of feature names and a
        numpy array with the reordered correlation rows.
    """
    if not stats:
        stats = get_stats(name, store_type)
    column_names = []
    row_names = []
    matrix_rows = []
    for row_pos, fea_corr in enumerate(stats.correlation_matrix.feature_correlations):
        row_names.append(fea_corr.feature_name)
        values = list(fea_corr.correlation_values)
        if row_pos == 0:
            # only the first row defines the column order
            column_names.extend(val.feature_name for val in values)
        matrix_rows.append([val.correlation for val in values])
    # position of each column's feature among the rows
    row_order = [row_names.index(col) for col in column_names]
    return column_names, np.array(matrix_rows)[row_order,:]

def get_descriptive_stats(name, store_type, stats=None):
    """Collect the descriptive statistics of every feature into nested dicts.

    Args:
        name: feature group / training dataset name, used only when ``stats``
            is not supplied.
        store_type: 'FEATUREGROUP' or 'TRAINING_DATASET', used only when
            ``stats`` is not supplied.
        stats: statistics object fetched beforehand; when falsy the
            statistics are looked up with ``get_stats``.

    Returns:
        A dict ``{feature_name: {metric_name: value}}``. A feature without any
        metric values maps to an empty dict.
    """
    stats = stats or get_stats(name, store_type)

    desc_stats = {}
    for st in stats.descriptive_stats.descriptive_stats:
        # A comprehension instead of reduce(): reduce() without an initial
        # value raises TypeError when a feature has no metric values.
        desc_stats[st.feature_name] = {mv.metric_name: mv.value for mv in st.metric_values}
    return desc_stats

def get_feature_histograms(name, store_type, stats=None):
    """Collect the frequency distribution of every feature.

    Args:
        name: feature group / training dataset name, used only when ``stats``
            is not supplied.
        store_type: 'FEATUREGROUP' or 'TRAINING_DATASET', used only when
            ``stats`` is not supplied.
        stats: statistics object fetched beforehand; when falsy the
            statistics are looked up with ``get_stats``.

    Returns:
        A dict mapping each feature name to a list with the ``vars()`` of
        every entry of its frequency distribution.
    """
    if not stats:
        stats = get_stats(name, store_type)
    histograms = {}
    for distribution in stats.feature_histograms.feature_distributions:
        bins = [vars(entry) for entry in distribution.frequency_distribution]
        histograms[distribution.feature_name] = bins
    return histograms

In [187]:
# stats
# Fetch the statistics once; the cells below pass them to the helper functions
# through the `stats` argument so they are not fetched again.
fg_stats = featurestore.get_featuregroup_statistics(IRIS_FG_NAME)
td_stats = featurestore.get_training_dataset_statistics(IRIS_TRAIN_DATASET_NAME)

In [188]:
# clusters
# Lists of (datapoint_name, cluster) tuples for the training dataset and the feature group.
td_clusters = get_clusters(IRIS_TRAIN_DATASET_NAME, 'TRAINING_DATASET', stats=td_stats)
fg_clusters = get_clusters(IRIS_FG_NAME, 'FEATUREGROUP', stats=fg_stats)

In [189]:
# correlation matrix
# Each call returns the feature names together with the matching correlation array.
td_features, td_correlations = get_correlation_matrix(IRIS_TRAIN_DATASET_NAME, 'TRAINING_DATASET', stats=td_stats)
fg_features, fg_correlations = get_correlation_matrix(IRIS_FG_NAME, 'FEATUREGROUP', stats=fg_stats)

In [190]:
# descriptive statistics
# Dicts of the form {feature_name: {metric_name: value}}.
td_desc_stats = get_descriptive_stats(IRIS_TRAIN_DATASET_NAME, 'TRAINING_DATASET', stats=td_stats)
fg_desc_stats = get_descriptive_stats(IRIS_FG_NAME, 'FEATUREGROUP', stats=fg_stats)

In [191]:
# feature histograms
# Dicts of the form {feature_name: [one dict per histogram entry]}.
td_feature_hist = get_feature_histograms(IRIS_TRAIN_DATASET_NAME, 'TRAINING_DATASET', stats=td_stats)
fg_feature_hist = get_feature_histograms(IRIS_FG_NAME, 'FEATUREGROUP', stats=fg_stats)

In [192]:
# statistics per feature
# NOTE(review): this dict is created empty and is not filled by any cell
# visible in this notebook - confirm whether it is still needed.
feature_stats = {}

In [193]:
# Descriptive statistics of the training dataset, keyed by feature name.
print(td_desc_stats)

{'species': {'count': 120.0, 'mean': 1.0, 'stddev': 0.84016806, 'min': 0.0, 'max': 2.0}, 'petal_width': {'count': 120.0, 'mean': 1.1966667, 'stddev': 0.7820393, 'min': 0.1, 'max': 2.5}, 'petal_length': {'count': 120.0, 'mean': 3.7391667, 'stddev': 1.8221004, 'min': 1.0, 'max': 6.9}, 'sepal_width': {'count': 120.0, 'mean': 3.065, 'stddev': 0.42715594, 'min': 2.0, 'max': 4.4}, 'sepal_length': {'count': 120.0, 'mean': 5.845, 'stddev': 0.86857843, 'min': 4.4, 'max': 7.9}}

In [194]:
# Histogram entries of the training dataset, keyed by feature name.
print(td_feature_hist)

{'species': [{'bin': '0.0', 'frequency': 42}, {'bin': '0.1', 'frequency': 0}, {'bin': '0.2', 'frequency': 0}, {'bin': '0.30000000000000004', 'frequency': 0}, {'bin': '0.4', 'frequency': 0}, {'bin': '0.5', 'frequency': 0}, {'bin': '0.6000000000000001', 'frequency': 0}, {'bin': '0.7000000000000001', 'frequency': 0}, {'bin': '0.8', 'frequency': 0}, {'bin': '0.9', 'frequency': 0}, {'bin': '1.0', 'frequency': 36}, {'bin': '1.1', 'frequency': 0}, {'bin': '1.2000000000000002', 'frequency': 0}, {'bin': '1.3', 'frequency': 0}, {'bin': '1.4000000000000001', 'frequency': 0}, {'bin': '1.5', 'frequency': 0}, {'bin': '1.6', 'frequency': 0}, {'bin': '1.7000000000000002', 'frequency': 0}, {'bin': '1.8', 'frequency': 0}, {'bin': '1.9000000000000001', 'frequency': 42}], 'petal_width': [{'bin': '0.10000000149011612', 'frequency': 27}, {'bin': '0.22000000141561032', 'frequency': 7}, {'bin': '0.34000000134110453', 'frequency': 7}, {'bin': '0.4600000012665987', 'frequency': 0}, {'bin': '0.5800000011920929',

### Send requests

In [195]:
def generate_instance(verbose=False):
    """Generate one random Iris instance to send to the served model.

    Each value is drawn uniformly from a fixed range and rounded to one
    decimal: sepal length in [3, 9), sepal width in [1, 6), petal length in
    [0.1, 8) and petal width in [0.1, 3.5).

    Args:
        verbose: when true, print the generated instance. The flag used to be
            ignored and the instance was always printed.

    Returns:
        The list ``[sl, sw, pl, pw]``.
    """
    sl = round(np.random.uniform(3,9), 1)
    sw = round(np.random.uniform(1,6), 1)
    pl = round(np.random.uniform(0.1,8), 1)
    pw = round(np.random.uniform(0.1,3.5), 1)
    instance = [sl, sw, pl, pw]
    if verbose:
        print("Request: ", instance)
    return instance

In [196]:
def send_request(n_instances, signature_name, verbose=False):
    """Send one inference request with randomly generated instances.

    Args:
        n_instances: number of random instances to include in the request.
        signature_name: value sent as "signature_name" in the request body.
        verbose: forwarded to ``generate_instance``.

    Returns:
        The 'predictions' entry of the inference response.
    """
    batch = []
    for _ in range(n_instances):
        batch.append(generate_instance(verbose=verbose))
    payload = {"signature_name": signature_name, "instances": batch}
    reply = serving.make_inference_request(IRIS_MODEL_NAME, payload)
    return reply['predictions']

In [None]:
# Number of inference requests to simulate.
N_REQUESTS = 120

time.sleep(15) # Let the job initiate completely

for i in range(N_REQUESTS):
    # random pause of 0 to max_request_delay seconds before each request
    time.sleep(round(np.random.uniform(0, max_request_delay), 2))
    # choose api randomly
    signature = random.choice([tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY, 'predict_instances'])
    # choose the number of instances randomly (1 to 10, both inclusive)
    n_instances = random.randint(1, 10)
    # send request
    # the predictions are not used afterwards
    preds = send_request(n_instances, signature, verbose=False)

# Blocks for the full job_timeout, counted from the end of the request loop.
time.sleep(job_timeout) # wait until job finishes

#### Statistics

In [None]:
import pyarrow.parquet as pq
from hops import hdfs

# Directory "<kafka topic>-stats-parquet/" under the Iris resources directory;
# presumably written by the monitoring job (TODO confirm).
LOGS_STATS_DIR = IRIS_RESOURCES_DIR + kfk_topic + "-stats-parquet/"
# `spark` is the session provided by the notebook kernel.
logs_stats_parquet_file = spark.read.parquet(LOGS_STATS_DIR + "*.parquet")

In [None]:
# Register the statistics logs as a temp view and split them into one
# DataFrame per kind of statistic, each ordered by window.
logs_stats_parquet_file.createOrReplaceTempView("logs_stats_parquet_file")
desc_stats_df = spark.sql("SELECT window, feature, min, max, mean, avg, count, stddev FROM logs_stats_parquet_file ORDER BY window")
distr_stats_df = spark.sql("SELECT window, feature, distr FROM logs_stats_parquet_file ORDER BY window")
corr_stats_df = spark.sql("SELECT window, feature, corr FROM logs_stats_parquet_file ORDER BY window")
cov_stats_df = spark.sql("SELECT window, feature, cov FROM logs_stats_parquet_file ORDER BY window")

In [None]:
# DataFrame.show() prints the rows itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
desc_stats_df.show(5, truncate=False)

In [None]:
# DataFrame.show() prints the rows itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
distr_stats_df.show(1, truncate=False)

In [None]:
# DataFrame.show() prints the rows itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
corr_stats_df.show(5, truncate=False)

In [None]:
# DataFrame.show() prints the rows itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
cov_stats_df.show(5, truncate=False)

#### Outliers

In [None]:
import pyarrow.parquet as pq
from hops import hdfs

# Directory "<kafka topic>-outliers-parquet/" under the Iris resources directory;
# presumably written by the monitoring job (TODO confirm).
LOGS_OUTLIERS_DIR = IRIS_RESOURCES_DIR + kfk_topic + "-outliers-parquet/"
# `spark` is the session provided by the notebook kernel.
logs_outliers_parquet_file = spark.read.parquet(LOGS_OUTLIERS_DIR + "*.parquet")

In [None]:
# Register the outlier logs as a temp view and read them back ordered by
# window and then by feature.
logs_outliers_parquet_file.createOrReplaceTempView("logs_outliers_parquet_file")
outliers_df = spark.sql("SELECT * FROM logs_outliers_parquet_file ORDER BY window ASC, feature ASC")

In [None]:
# DataFrame.show() prints the rows itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
outliers_df.show(5, truncate=False)

#### Drift

In [None]:
import pyarrow.parquet as pq
from hops import hdfs

# Directory "<kafka topic>-drift-parquet/" under the Iris resources directory;
# presumably written by the monitoring job (TODO confirm).
LOGS_DRIFT_DIR = IRIS_RESOURCES_DIR + kfk_topic + "-drift-parquet/"
# `spark` is the session provided by the notebook kernel.
logs_drift_parquet_file = spark.read.parquet(LOGS_DRIFT_DIR + "*.parquet")

In [None]:
# Register the drift logs as a temp view and read them back ordered by window.
logs_drift_parquet_file.createOrReplaceTempView("logs_drift_parquet_file")
drift_df = spark.sql("SELECT * FROM logs_drift_parquet_file ORDER BY window")

In [None]:
# DataFrame.show() prints the rows itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
drift_df.show(5, truncate=False)

### Visualize logs

In [None]:
from pyspark.sql.functions import col, unix_timestamp, explode
    
# Flatten the distribution statistics for plotting:
#  - window.start / window.end become epoch milliseconds (unix_timestamp gives seconds),
#  - the `distr` column is exploded into one (key, value) row per entry,
#  - key and value are cast to float.
distr_df = distr_stats_df \
    .withColumn("window_start", unix_timestamp(col("window.start")) * 1000) \
    .withColumn("window_end", unix_timestamp(col("window.end")) * 1000) \
    .drop("window") \
    .select(col("window_start"), col("window_end"), col("feature"), explode("distr")) \
    .select(col("window_start"), col("window_end"), col("feature").cast("string"), col("key").cast("float"), col("value").cast("float"))
distr_df.printSchema()

# Same window conversion for the drift logs, keeping the feature, drift and value columns.
proc_drift_df = drift_df \
    .withColumn("window_start", unix_timestamp(col("window.start")) * 1000) \
    .withColumn("window_end", unix_timestamp(col("window.end")) * 1000) \
    .drop("window") \
    .select(col("window_start"), col("window_end"), col("feature").cast("string"), col("drift").cast("string"), col("value").cast("float"))

proc_drift_df.printSchema()

proc_drift_df.show(5)

In [None]:
%%local
# Remove copies of both frames left in the local kernel by a previous run,
# presumably so the following `%%spark -o` cells export fresh ones (TODO confirm).
if 'distr_df' in locals() or 'distr_df' in globals(): del distr_df
if 'proc_drift_df' in locals() or 'proc_drift_df' in globals(): del proc_drift_df

In [None]:
%%spark -o distr_df

In [None]:
%%spark -o proc_drift_df

In [None]:
%%local
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# fix column types after conversion from spark df
# NOTE: the frames are modified in place, so re-running this cell operates on
# the already converted columns.
distr_df.feature = distr_df.feature.astype("string")
proc_drift_df.feature = proc_drift_df.feature.astype("string")
proc_drift_df.drift = proc_drift_df.drift.astype("string")
proc_drift_df.value = proc_drift_df.value.values.astype("float")

In [None]:
%%local

# Feature distributions
# One bar plot per feature on a 2x2 grid; zip() stops at the shorter input,
# so at most four features are drawn.

f, axes = plt.subplots(2, 2, figsize=(20, 10))
for ax, feature in zip(axes.flat, distr_df["feature"].unique()):
    feature_distr_df = distr_df[distr_df["feature"]==feature]
    # x: distribution key rounded to two decimals, y: its value
    plot = sns.barplot(x=round(feature_distr_df["key"], 2), y=feature_distr_df["value"], ax=ax)
    # rotate the tick labels so they do not overlap
    plot.set_xticklabels(plot.get_xticklabels(), visible=True, rotation=50)

In [None]:
%%local

# Drift
# One line plot per distinct value of the `drift` column on a 1x3 grid (zip()
# stops at the shorter input, so at most three are drawn), one line per feature.

f, axes = plt.subplots(1, 3, figsize=(40, 10), sharex=True)
for ax, drift in zip(axes.flat, proc_drift_df["drift"].unique()):
    features_drift_df = proc_drift_df[proc_drift_df["drift"]==drift]
    ax = sns.lineplot(x=features_drift_df["window_start"], y=features_drift_df["value"], hue=features_drift_df["feature"], ax=ax)
    # window_start is in epoch milliseconds; relabel the ticks as date and time
    xticks = ax.get_xticks()
    ax.set_xticklabels([pd.to_datetime(tm, unit='ms').strftime('%Y-%m-%d\n %H:%M:%S') for tm in xticks], rotation=50)

### Stop served model

In [None]:
# stop the model
# Any status other than 'Stopped' triggers a stop request.
if serving.get_status(IRIS_MODEL_NAME) != 'Stopped':
    serving.stop(IRIS_MODEL_NAME)
    print("Model '{}' stopped".format(IRIS_MODEL_NAME))
else:
    print("Model '{}' already stopped".format(IRIS_MODEL_NAME))


### Stop monitoring job

> **WARNING**: The `stop_job` method currently does not work: the URL is not built properly, the data is not serialized, and the header is not added.

> The URL "/hopsworks-api/api/project/119/jobs/iris_ml_monitoring_dstream/executions/status" is missing the execution ID.

> It should be "/hopsworks-api/api/project/119/jobs/iris_ml_monitoring_dstream/executions/<EXECUTION_ID>/status".

In [None]:
# NOT WORKING

# stop job
# response = jobs.stop_job(JOB_NAME)
# print(response)

In [None]:
# Source: https://github.com/logicalclocks/hops-util-py/blob/7804a0d6734fe6e8a23c2598547316d40776e94c/hops/jobs.py#L96

# Modification of stop_job method
from hops import constants, util, hdfs
from hops.exceptions import RestAPIError
import json
def stop_job(name, execution_id):
    """
    Request that one execution of a job be stopped.

    Sends an HTTP PUT with the JSON body {"status": "stopped"} to the
    ".../<jobs>/<name>/<executions>/<execution_id>/status" resource of the
    current project.

    Args:
        name: name of the job.
        execution_id: identifier of the execution to stop; converted with str().
    Returns:
        The decoded JSON body of the response.
    Raises:
        RestAPIError: if the response status code is 400 or higher.
    """
    slash = constants.DELIMITERS.SLASH_DELIMITER
    path_segments = [
        constants.REST_CONFIG.HOPSWORKS_REST_RESOURCE,
        constants.REST_CONFIG.HOPSWORKS_PROJECT_RESOURCE,
        hdfs.project_id(),
        constants.REST_CONFIG.HOPSWORKS_JOBS_RESOURCE,
        name,
        constants.REST_CONFIG.HOPSWORKS_EXECUTIONS_RESOURCE,
        str(execution_id),
        "status",
    ]
    # the URL starts with a delimiter, then the segments joined by it
    resource_url = slash + slash.join(path_segments)

    json_headers = {constants.HTTP_CONFIG.HTTP_CONTENT_TYPE: constants.HTTP_CONFIG.HTTP_APPLICATION_JSON}
    payload = json.dumps({"status":"stopped"})
    response = util.send_request(constants.HTTP_CONFIG.HTTP_PUT, resource_url, data=payload, headers=json_headers)
    # the body is decoded before the status check because it is needed in both outcomes
    response_object = response.json()
    if response.status_code < 400:
        return response_object
    error_code, error_msg, user_msg = util._parse_rest_error(response_object)
    raise RestAPIError("Could not perform action on job's execution (url: {}), server response: \n "
                       "HTTP code: {}, HTTP reason: {}, error code: {}, error msg: {}, user msg: {}".format(
        resource_url, response.status_code, response.reason, error_code, error_msg, user_msg))

In [None]:
# stop the job
executions = jobs.get_executions(JOB_NAME, "")
# Only the execution tracked in job_execution_id matters here. Previously every
# other execution of the job fell into the "already stopped" branch and was
# printed with job_execution_id instead of its own ID.
# .get() avoids a KeyError if the response carries no 'items' entry.
for item in executions.get('items', []):
    if item['id'] != job_execution_id:
        continue
    if item['finalStatus'] == 'UNDEFINED':
        # still running: request the stop and report what the server answered
        response = stop_job(JOB_NAME, job_execution_id)
        print("JOB execution with ID {} stopped when: \n - Duration: {} - Progress: {}".format(job_execution_id, response['duration'], response['progress']))
    else:
        print("JOB execution with ID {} already stopped: \n - Duration: {} - Progress: {} - Final status: {} - State: {}".format(job_execution_id, item['duration'], item['progress'], item['finalStatus'], item['state']))
    break