---
title: "Inference Analysis"
date: 2021-04-25
type: technical_note
draft: false
---

# Monitor the Prediction Logs

![overview-6.png](./images/overview-6.png)

To monitor the prediction logs in a streaming fashion, we can run a streaming job from the Hopsworks UI. The job reads the prediction logs from the Kafka topic specified previously, analyzes them, and writes statistics, outliers, and drift detection metrics to another Kafka topic, Parquet files, or CSV files.

### Start the Monitoring Job

To achieve this, we need to create a streaming job using the jar file `job-1.0-SNAPSHOT.jar` located together with the demo notebooks and the following job configuration:

- **Main class name:** `io.hops.ml.monitoring.job.Monitor`
- **Default arguments:** `--conf card_fraud_monitoring_job_config.json`

Then, in the advanced configuration, add the JSON file named `card_fraud_monitoring_job_config.json`, which is stored together with the demo notebooks. You can customize the monitoring job by modifying this configuration file. Among other things, you can define which statistics to compute, which algorithms to use for detecting data drift, and where to store the resulting analysis.

Once the monitoring job is running and the previous notebook has made some predictions, we can access the statistics, outliers, and drift detection results that are continuously computed.

In [1]:
from hops import hdfs
import pyarrow.parquet as pq
from hops import kafka
from hops import tls
from confluent_kafka import Producer, Consumer
import json

import pandas as pd
pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.set_option('display.max_colwidth', None)

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log |
|----|---------------------|------|-------|----------|------------|
| 64 | application_1623853832952_0049 | pyspark | idle | Link | Link |

SparkSession available as 'spark'.


### Inference Statistics

Read the inference statistics from the Parquet files written by the monitoring job.

In [2]:
MONITORING_DIR = "hdfs:///Projects/" + hdfs.project_name() + "/Resources/CardFraudDetection/Monitoring/"
LOGS_STATS_DIR = MONITORING_DIR + "credit_card_activity_stats-parquet/"
hdfs.mkdir(LOGS_STATS_DIR)

In [3]:
credit_card_activity_stats = spark.read.parquet(LOGS_STATS_DIR + "*.parquet")

In [4]:
credit_card_activity_stats.createOrReplaceTempView("credit_card_activity_stats")

In [5]:
desc_stats_df = spark.sql("SELECT window, feature, min, max, mean, stddev FROM credit_card_activity_stats ORDER BY window")
distr_stats_df = spark.sql("SELECT feature, distr FROM credit_card_activity_stats ORDER BY window")
corr_stats_df = spark.sql("SELECT window, feature, corr FROM credit_card_activity_stats ORDER BY window")
cov_stats_df = spark.sql("SELECT feature, cov FROM credit_card_activity_stats ORDER BY window")

#### Descriptive statistics

In [6]:
desc_stats_df.show(6, truncate=False)

+------------------------------------------+-----------------+------------------+------------------+----+------+
|window                                    |feature          |min               |max               |mean|stddev|
+------------------------------------------+-----------------+------------------+------------------+----+------+
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|num_trans_per_1h |1.0005            |2.4701648761537442|0.05|0.27  |
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|avg_amt_per_12h  |1.0005            |1.3102870163717324|0.01|0.1   |
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|avg_amt_per_1h   |1.00099           |5.791895          |0.17|1.35  |
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|avg_amt_per_10m  |1.001             |1.0015            |0.0 |0.05  |
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|stdev_amt_per_12h|1.0105183333333334|2.2248025         |0.04|0.33  |
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|stdev_amt_per_1h |1.000745          |4.4910049999999

#### Distributions

In [7]:
distr_stats_df.show(6, truncate=False)

+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|feature          |distr                                                                                                                                                                  |
+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|num_trans_per_1h |{1.0016000509262085 -> 0.0, 1.0010000467300415 -> 0.0, 1.001900053024292 -> 0.0, 1.0022000551223755 -> 0.0, 1.001300048828125 -> 0.0}                                  |
|avg_amt_per_12h  |{1.5285710096359253 -> 0.0, 2.5619930028915405 -> 0.0, 3.0787039995193481 -> 0.0, 1.0118600130081177 -> 2.0, 2.0452820062637329 -> 0.0}                                |
|avg_amt_per_1h   |{1.0005899667739868 -> 24.0, 5.1820502281

#### Correlations

In [8]:
corr_stats_df.show(6, truncate=False)

+------------------------------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|window                                    |feature          |corr                                                                                                                                                                                                                |
+------------------------------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{2021-06-22 13:00:44, 2021-06-22 13:00:50}|num_trans_per_1h |{avg_amt_per_10m -> -0.05, avg_amt_per_12h -> -0.05, stdev_amt_per_1h -> -0.11, avg_amt_per_1h -> 0.01, num_tr

#### Covariance

In [9]:
cov_stats_df.show(6, truncate=False)

+-----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|feature          |cov                                                                                                                                                                                                                                                    |
+-----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|num_trans_per_1h |{avg_amt_per_10m -> 6361.52, avg_amt_per_12h -> -124049.62, stdev_amt_per_1h -> -2562591.0, avg_amt_per_1h -> 2809013.47, num_trans_per_12h -> 9551.43, stdev_amt_per_12h -> 1517
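
The `corr` and `cov` columns are maps from feature name to value, one map per feature. When rows are collected to the driver, PySpark returns a map column as a Python `dict`, so the pairwise values can be flattened with plain Python. The sketch below assumes that shape. `flatten_pairs` and the sample values are illustrative only and are not part of the monitoring job.

```python
def flatten_pairs(rows, key="corr"):
    """Flatten per-feature maps into unique (feature_a, feature_b, value) tuples.

    `rows` is an iterable of dicts like {"feature": str, key: {other: value}}.
    Each unordered pair is kept once; self-pairs and missing values are dropped.
    """
    seen = set()
    pairs = []
    for row in rows:
        for other, value in row[key].items():
            pair = tuple(sorted((row["feature"], other)))
            if value is None or pair[0] == pair[1] or pair in seen:
                continue
            seen.add(pair)
            pairs.append((pair[0], pair[1], value))
    # Strongest relationships first, regardless of sign.
    return sorted(pairs, key=lambda item: abs(item[2]), reverse=True)


# Illustrative rows (assumption); in the notebook they could come from
# [r.asDict() for r in corr_stats_df.collect()].
rows = [
    {"feature": "num_trans_per_1h",
     "corr": {"avg_amt_per_1h": 0.01, "stdev_amt_per_1h": -0.11,
              "num_trans_per_1h": 1.0}},
    {"feature": "avg_amt_per_1h",
     "corr": {"num_trans_per_1h": 0.01, "stdev_amt_per_1h": 0.4}},
]
for feature_a, feature_b, value in flatten_pairs(rows):
    print(feature_a, feature_b, value)
```

Passing `key="cov"` applies the same flattening to the covariance maps.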

## Outliers and Data Drift Detection (Kafka)

In [10]:
def get_consumer(topic):
    config = kafka.get_kafka_default_config()
    config['default.topic.config'] = {'auto.offset.reset': 'latest'}
    consumer = Consumer(config)
    consumer.subscribe([topic])
    return consumer

In [11]:
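def poll(consumer, n=2):
    """Poll up to n messages and return them as a DataFrame, one row per message."""
    frames = []
    for _ in range(n):
        # poll() returns None if no message arrives within the timeout
        msg = consumer.poll(timeout=5.0)
        if msg is not None:
            value = msg.value()
            try:
                d = json.loads(value.decode('utf-8'))
                # First row holds the keys, second row the values
                df_msg = pd.DataFrame(d.items()).transpose()
                df_msg.columns = df_msg.iloc[0]
                frames.append(df_msg.drop(df_msg.index[[0]]))
            except Exception as e:
                print("A message was read but there was an error parsing it")
                print(e)
    # DataFrame.append was removed in pandas 2.0, so concatenate once at the end
    return pd.concat(frames) if frames else pd.DataFrame([])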
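
The parsing step inside `poll` can be exercised without a Kafka broker. The following is a minimal standalone sketch: `parse_messages` is a hypothetical helper (it is not part of `hops` or `confluent_kafka`) that takes raw message payloads as bytes, decodes each one as UTF-8 JSON, and skips any payload that fails to parse. The sample payloads are invented for illustration.

```python
import json


def parse_messages(payloads):
    """Decode raw payloads (bytes) into dicts, skipping malformed ones.

    Hypothetical helper for illustration; it mirrors the try/except in `poll`.
    Returns the parsed records and a list of error descriptions.
    """
    records, errors = [], []
    for raw in payloads:
        try:
            record = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            errors.append(str(exc))
            continue
        if isinstance(record, dict):
            records.append(record)
        else:
            errors.append("expected a JSON object, got " + type(record).__name__)
    return records, errors


# Invented sample payloads: one valid record and one truncated JSON document.
sample = [
    b'{"feature": "num_trans_per_1h", "drift": "wasserstein", "value": 0.73}',
    b'{"feature": "num_trans_per_1h", "drift": ',
]
records, errors = parse_messages(sample)
print(len(records), "parsed,", len(errors), "skipped")
```

Collecting the errors instead of printing them makes it easier to see how many messages were dropped in a polling round.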

### Outliers detected

In [12]:
outliers_consumer = get_consumer("credit_card_activity_outliers")

In [13]:
outliers = poll(outliers_consumer, 20)

In [14]:
outliers.head(10)

0            feature    value  type           outlier  \
1  stdev_amt_per_12h    1.003   min  descriptiveStats   
1  num_trans_per_10m  2.50457  mean  descriptiveStats   
1  num_trans_per_10m  2.50457   max  descriptiveStats   
1  stdev_amt_per_10m  3.04124  mean  descriptiveStats   
1    avg_amt_per_12h    1.001   min  descriptiveStats   
1   num_trans_per_1h   1.0005   min  descriptiveStats   
1  num_trans_per_12h    1.001   min  descriptiveStats   
1  stdev_amt_per_12h   1.0045   min  descriptiveStats   
1  num_trans_per_10m  1.18522  mean  descriptiveStats   
1  num_trans_per_10m  1.18522   max  descriptiveStats   

0               requestTime             detectionTime  
1  2021-06-17T13:58:19.000Z  2021-06-17T14:15:20.382Z  
1  2021-06-17T13:58:19.000Z  2021-06-17T14:15:20.382Z  
1  2021-06-17T13:58:19.000Z  2021-06-17T14:15:20.382Z  
1  2021-06-17T13:58:19.000Z  2021-06-17T14:15:20.382Z  
1  2021-06-17T13:58:19.000Z  2021-06-17T14:15:20.386Z  
1  2021-06-17T13:58:19.000Z  2021-06

### Data drift detected

In [25]:
drift_consumer = get_consumer("credit_card_activity_drift")

In [28]:
drift = poll(drift_consumer, 10)

In [29]:
drift.head(5)

0                                                                    window  \
1  {'start': '2021-06-17T14:13:36.000Z', 'end': '2021-06-17T14:13:42.000Z'}   
1  {'start': '2021-06-17T14:13:36.000Z', 'end': '2021-06-17T14:13:42.000Z'}   
1  {'start': '2021-06-17T14:13:36.000Z', 'end': '2021-06-17T14:13:42.000Z'}   
1  {'start': '2021-06-17T14:13:36.000Z', 'end': '2021-06-17T14:13:42.000Z'}   
1  {'start': '2021-06-17T14:13:36.000Z', 'end': '2021-06-17T14:13:42.000Z'}   

0            feature            drift     value             detectionTime  
1   num_trans_per_1h      wasserstein  0.733333  2021-06-17T14:15:54.584Z  
1   num_trans_per_1h  kullbackLeibler  0.972924  2021-06-17T14:15:54.584Z  
1   num_trans_per_1h    jensenShannon  0.282642  2021-06-17T14:15:54.584Z  
1  num_trans_per_12h      wasserstein         2  2021-06-17T14:15:54.584Z  
1  num_trans_per_12h  kullbackLeibler   1.60944  2021-06-17T14:15:54.585Z
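
The three names in the `drift` column are standard measures of how far one distribution is from another. The binning, logarithm base, and direction of comparison used by the monitoring job are not shown in this notebook, so the following is only a sketch under stated assumptions: both distributions are histograms over the same equal-width bins, natural logarithms are used, and a small epsilon guards against division by zero. The function names and sample histograms are illustrative.

```python
import math


def normalize(counts):
    """Turn raw bin counts into probabilities that sum to 1."""
    total = float(sum(counts))
    return [c / total for c in counts]


def kl_divergence(p, q, eps=1e-12):
    """Kullback-Leibler divergence KL(p || q), natural logarithm."""
    return sum(pi * math.log(pi / max(qi, eps))
               for pi, qi in zip(p, q) if pi > 0)


def js_divergence(p, q):
    """Jensen-Shannon divergence: symmetric and bounded by ln(2)."""
    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl_divergence(p, m) + 0.5 * kl_divergence(q, m)


def wasserstein_distance(p, q, bin_width=1.0):
    """1-D Wasserstein distance between histograms on the same bins.

    Computed as the area between the two cumulative distributions.
    """
    cdf_p = cdf_q = 0.0
    area = 0.0
    for pi, qi in zip(p, q):
        cdf_p += pi
        cdf_q += qi
        area += abs(cdf_p - cdf_q) * bin_width
    return area


# Invented histograms (assumption): baseline vs. one observed window.
baseline = normalize([10, 20, 40, 20, 10])
observed = normalize([40, 30, 20, 5, 5])
print("wasserstein    ", round(wasserstein_distance(baseline, observed), 4))
print("kullbackLeibler", round(kl_divergence(observed, baseline), 4))
print("jensenShannon  ", round(js_divergence(observed, baseline), 4))
```

Under these assumptions, all three values are zero for identical distributions and grow as the observed window moves away from the baseline. Only the Jensen-Shannon divergence has a fixed upper bound.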