In [None]:
!pip install -r requirements.txt

# Creating the Kafka Clients

In [None]:
from kafka import KafkaConsumer, TopicPartition, KafkaProducer
import json

def create_producer():
    """Return a KafkaProducer for localhost:9092 whose values are JSON-encoded as UTF-8 bytes."""
    def _serialize(value):
        return json.dumps(value).encode('utf-8')

    return KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=_serialize,
    )

def create_consumer(topic):
    """Create a JSON-decoding KafkaConsumer manually assigned to every partition of ``topic``.

    Partitions are assigned with ``assign()`` (see the linked issue) so callers
    can call ``seek_to_beginning()`` on the returned consumer right away.

    Args:
        topic: name of the Kafka topic to read.

    Returns:
        A ``KafkaConsumer`` connected to localhost:9092 and assigned to all of
        the topic's partitions.

    Raises:
        ValueError: if the broker reports no partition metadata for ``topic``
            (``partitions_for_topic`` returned None, e.g. the topic does not exist).
    """
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    # Manually assign partitions
    # https://github.com/dpkp/kafka-python/issues/601#issuecomment-331419097
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # Previously this surfaced as "TypeError: 'NoneType' object is not iterable".
        consumer.close()
        raise ValueError("Kafka topic {!r} not found on localhost:9092".format(topic))
    assignments = []
    for p in sorted(partitions):
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))
    consumer.assign(assignments)

    return consumer

# Telemetry Producer

In [None]:
import json
import os

producer = create_producer()

foldername = 'rov_data'
files = [str(i) for i in range(4)]

# Motor-target fields, sent as strings.
MOTOR_TARGET_KEYS = ['mtarg1', 'mtarg2', 'mtarg3']
# Fields copied unchanged from each raw sample, in the order they are emitted.
PASSTHROUGH_KEYS = [
    'roll', 'pitch', 'yaw',
    'LACCX', 'LACCY', 'LACCZ',
    'GYROX', 'GYROY', 'GYROZ',
    'SC1I', 'SC2I', 'SC3I',
    'BT1I', 'BT2I',
    'vout', 'iout', 'cpuUsage',
    'timestamp',
]


def to_telemetry_sample(raw):
    """Build the telemetry message for one raw sample dict.

    The motor targets are converted with ``str``; every key in
    ``PASSTHROUGH_KEYS`` is copied as-is. Raises KeyError if ``raw`` lacks any
    of these keys.
    """
    sample = {key: str(raw[key]) for key in MOTOR_TARGET_KEYS}
    sample.update({key: raw[key] for key in PASSTHROUGH_KEYS})
    return sample


sessions = []
for file in files:
    path = os.path.join(foldername, '{}_injected.json'.format(file))
    with open(path, 'r') as f:
        raw_data = json.load(f)
    sessions.append([to_telemetry_sample(x) for x in raw_data['0']['data']])

total = 0
for session in sessions:
    for sample in session:
        producer.send('telemetry-rov', sample)
        total += 1
# flush() blocks until every buffered record has been sent, so calling it
# once after queueing everything avoids a blocking round trip per record.
producer.flush()
print("Total>>",total)

Total>> 7912


# Predict Producer

## Model Features

In [None]:
# Model input columns. calculate_residual builds the regressor's input row in
# exactly this order, so the order matters. GYROZ (the regression target) and
# yaw are not part of the input.
features = [
    "mtarg1", "mtarg2", "mtarg3",
    "roll", "pitch",
    "LACCX", "LACCY", "LACCZ",
    "GYROX", "GYROY",
    "SC1I", "SC2I", "SC3I",
    "BT1I", "BT2I",
    "vout", "iout", "cpuUsage",
]

# NOTE(review): presumably the fault-injection label columns of the
# *_injected.json files - confirm against the data source.
fault_features = ['fault', 'fault_type', 'fault_value', 'fault_duration']

In [None]:
import json
import joblib
import pandas as pd
import pickle
import numpy as np

# Loading regression model. Passing the path lets joblib open and close the
# file itself, so no file handle is left open for the lifetime of the kernel.
# NOTE(review): joblib.load unpickles the file - only load trusted model files.
reg_model = joblib.load('model_files/BL_GYROZ.joblib')
target = 'GYROZ'

# Initializing Kafka consumer (for telemetry topic) and producer (for prediction topic)
topic = 'telemetry-rov'
producer = create_producer()
consumer = create_consumer(topic)


def main():
    """Replay the telemetry topic from the start and publish one residual per sample.

    Each telemetry record is paired with the record processed just before it.
    ``calculate_residual(current, prev)`` is sent to ``prediction-rov`` together
    with the current record's timestamp. The loop ends on the first poll that
    yields no records within 500 ms. It then prints how many residuals were sent.
    """
    consumer.seek_to_beginning()
    prev = None
    total = 0
    while True:
        records = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
        batch_size = 0
        for rows in records.values():
            for row in rows:
                current = row.value
                res = calculate_residual(current, prev)
                to_send = {'residual': res, 'timestamp': current.get("timestamp")}
                producer.send('prediction-rov', to_send)
                total += 1
                batch_size += 1
                prev = current
        if batch_size == 0:
            break
        # flush() blocks until buffered records are delivered; doing it once per
        # polled batch avoids a blocking round trip for every single record.
        producer.flush()

    print("Total>>",total)

# Unpickled scaler objects keyed by file path, so each file is read once per
# kernel instead of on every calculate_residual call.
_SCALER_CACHE = {}


def _load_scaler(path):
    """Return the object unpickled from ``path``, reading the file only on first use."""
    # NOTE(review): pickle.load can execute arbitrary code; only load scaler
    # files from a trusted source.
    if path not in _SCALER_CACHE:
        with open(path, 'rb') as f:
            _SCALER_CACHE[path] = pickle.load(f)
    return _SCALER_CACHE[path]


def calculate_residual(current,prev):
    """Return |predicted - actual| of ``target`` for ``current``, predicted from ``prev``.

    The regressor's input is the ``features`` columns of the previous sample
    (cast to float, scaled with ``BL_x.pickle``). Its output is unscaled with
    ``BL_y.pickle`` and compared against ``float(current[target])``.

    Returns:
        The absolute residual as a float. ``np.nan`` when there is no previous
        sample, or when more than 500 timestamp units (presumably ms, i.e. 0.5 s)
        separate ``prev`` and ``current``.
    """
    if not prev:
        return np.nan
    # More than 0.5s has passed without the ROV sending new telemetry data
    if current['timestamp'] - prev['timestamp'] > 500:
        print("OVER TIME LIMIT")
        return np.nan

    x = np.array([float(prev[ft]) for ft in features]).reshape(1, -1)
    # Min-max scaler for input features
    x = _load_scaler('model_files/BL_x.pickle').transform(x)

    py = reg_model.predict(x).reshape(1, -1)
    # Min-max scaler for the target - GYROZ
    py = _load_scaler('model_files/BL_y.pickle').inverse_transform(py).ravel()
    y = float(current[target])
    # return residual (diff. between predicted and current)
    return abs(py[0] - y)


if (__name__=='__main__'):
    main()

topic telemetry-rov - partition 0
OVER TIME LIMIT
OVER TIME LIMIT
OVER TIME LIMIT
OVER TIME LIMIT
OVER TIME LIMIT
OVER TIME LIMIT
OVER TIME LIMIT
Total>> 7912


# Session Logger

In [None]:
from datetime import datetime
from whylogs import get_or_create_session

def break_into_batches(session_data):
    """Split time-ordered samples into consecutive batches that each span < 60 s.

    A new batch starts when a sample's timestamp is 60 000 or more (60 s, given
    the ``/1000`` conversion to seconds below) after the first sample of the
    current batch.

    Args:
        session_data: iterable of dicts, each with a numeric ``'timestamp'``
            key in milliseconds since the epoch.

    Returns:
        A list of ``(samples, dataset_timestamp)`` tuples.
        ``dataset_timestamp`` is the naive local-time ``datetime`` of the
        batch's first sample, truncated to the minute. The final, partially
        filled batch is included; it was previously dropped. An empty input
        yields ``[]``.
    """
    batch_span_ms = 60 * 1000

    def _minute_label(first_ts):
        # Same result as the former strftime('%Y-%m-%d %H:%M')/strptime round trip.
        return datetime.fromtimestamp(first_ts / 1000).replace(second=0, microsecond=0)

    min_sessions = []
    min_session = []
    for sample in session_data:
        sample_ts = sample['timestamp']
        if min_session and sample_ts - min_session[0]['timestamp'] >= batch_span_ms:
            min_sessions.append((min_session, _minute_label(min_session[0]['timestamp'])))
            min_session = []
        min_session.append(sample)
    # Emit the last batch too, even though it never crossed the 60 s boundary.
    if min_session:
        min_sessions.append((min_session, _minute_label(min_session[0]['timestamp'])))
    return min_sessions

def log_session(dataset_name,session_data):
    """Log ``session_data`` to whylogs as one profile per ~1-minute batch.

    The samples are split with ``break_into_batches``. Each batch is logged
    under ``dataset_name``, with the batch's minute-truncated start time as
    ``dataset_timestamp``. Every sample is logged without its ``'timestamp'``
    key. The caller's dicts are left unmodified.

    Args:
        dataset_name: whylogs dataset name (e.g. ``rov_telemetry_0``).
        session_data: list of sample dicts, each with a ``'timestamp'`` key.
    """
    session = get_or_create_session()
    for samples, timestamp in break_into_batches(session_data):
        with session.logger(dataset_name=dataset_name,dataset_timestamp=timestamp) as logger:
            for r in samples:
                # Log a copy without the timestamp rather than `del`-ing it from
                # the caller's sample dict.
                logger.log({k: v for k, v in r.items() if k != 'timestamp'})


# Telemetry Logger

In [1]:
import json
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# consumer.seek_to_beginning workaround
# https://github.com/dpkp/kafka-python/issues/601#issuecomment-331419097
assignments = []
topics=['telemetry-rov']
for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        raise ValueError("Kafka topic {!r} not found".format(topic))
    for p in sorted(partitions):
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))
consumer.assign(assignments)

consumer.seek_to_beginning()

# A gap of more than 5 minutes (in ms) between consecutive samples starts a new session.
SESSION_GAP_MS = 5*60*1000


def _log_telemetry_session(number, data):
    """Print progress and log ``data`` as whylogs dataset ``rov_telemetry_<number>``."""
    print("Logging session ",number)
    log_session("rov_telemetry_{}".format(number), data)


session_number = 0
prev_ts = None
total = 0
session_data = []
while True:
    finished = True
    record = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
    for rows in record.values():
        for row in rows:
            total+=1
            current_ts = row.value.get('timestamp',None)
            # `is None` rather than falsiness, so a timestamp of 0 still counts
            # as a real previous sample.
            if prev_ts is None:
                prev_ts = current_ts

            if abs(current_ts - prev_ts) > SESSION_GAP_MS:
                _log_telemetry_session(session_number, session_data)
                session_data = []
                session_number+=1

            session_data.append(row.value)
            prev_ts = current_ts
            finished = False

    if finished:
        # Topic drained: log whatever belongs to the last session.
        _log_telemetry_session(session_number, session_data)
        break

print("Total:",total)


topic telemetry-rov - partition 0
Logging session  0
Logging session  1
Logging session  2
Logging session  3
Total: 7912


# Predict Logger

## Calculate Moving Averages

In [None]:
def moving_average(slice_,wd):
    """Return the mean of ``slice_`` if it holds exactly ``wd`` values, else ``np.nan``.

    Also returns ``np.nan`` when ``slice_`` contains ``None``. NaN entries are
    not filtered out, so they propagate into the sum.
    """
    if None in slice_ or len(slice_) != wd:
        return np.nan
    return sum(slice_)/wd


def add_moving_averages(resid_window,current,windows=(5, 10, 15)):
    """Add ``residual_m<w>`` moving averages to ``current`` and return it.

    Args:
        resid_window: iterable of the most recent residuals, oldest first.
        current: dict to update in place (also returned).
        windows: positive window sizes. For each size ``w``, the mean of the last
            ``w`` residuals is stored under ``'residual_m<w>'``. It is ``np.nan``
            when fewer than ``w`` residuals are available, including when ``w``
            exceeds the deque's ``maxlen``. Defaults to the original 5/10/15.

    Returns:
        ``current``, with one new key per window.
    """
    history = list(resid_window)
    for wd in windows:
        current['residual_m{}'.format(wd)] = moving_average(history[-wd:], wd)
    return current

In [None]:
import json
from collections import deque
from logger_tools import log_session
import numpy as np
from kafkaConnector import create_consumer

topic = 'prediction-rov'
consumer = create_consumer(topic)

# A gap of more than 5 minutes (in ms) between consecutive samples starts a new session.
SESSION_GAP_MS = 5*60*1000


def _log_prediction_session(number, data):
    """Print progress and log ``data`` as whylogs dataset ``rov_prediction_<number>``."""
    print("Logging session ",number)
    log_session("rov_prediction_{}".format(number), data)


# Keep the last 15 residuals: enough for the largest default moving-average window.
resid_window = deque(maxlen=15)
session_number = 0
prev_ts = None
session_data = []
consumer.seek_to_beginning()
while True:
    finished = True
    record = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
    for rows in record.values():
        for row in rows:
            current = row.value
            current_ts = current['timestamp']
            resid_window.append(current['residual'])
            current = add_moving_averages(resid_window,current)

            # `is None` rather than falsiness, so a timestamp of 0 still counts
            # as a real previous sample.
            if prev_ts is None:
                prev_ts = current_ts

            if abs(current_ts - prev_ts) > SESSION_GAP_MS:
                _log_prediction_session(session_number, session_data)
                session_data = []
                session_number+=1
            session_data.append(current)
            prev_ts = current_ts
            finished = False
    if finished:
        # Topic drained: log whatever belongs to the last session.
        _log_prediction_session(session_number, session_data)
        break



topic prediction-rov - partition 0
Logging session  0
Logging session  1
Logging session  2
Logging session  3


# The Monitoring Dashboard

Let's first import some required libraries:

In [1]:
from IPython.display import display, clear_output
from ipywidgets import interact, fixed, IntSlider, HBox, Layout, Output, VBox
import ipywidgets as widgets
from IPython.display import display,Markdown

We need to be able to identify how many sessions we have, so the user can select the appropriate one.
Let's scan the subfolders of the `whylogs-output` folder and pick out the relevant dataset names:

In [2]:
import os

# Immediate sub-directories of whylogs-output (one per logged dataset name).
dataset_dirs = next(os.walk('whylogs-output'))[1]
telemetry_sessions = list(filter(lambda name: 'rov_telemetry' in name, dataset_dirs))
prediction_sessions = list(filter(lambda name: 'rov_prediction' in name, dataset_dirs))
# NOTE(review): assumes the telemetry sessions are numbered 0..N-1 with no gaps.
session_numbers = [*range(len(telemetry_sessions))]
print(session_numbers)

[0, 1, 2, 3]


We also need to identify all of the available features in the __rov_telemetry__ sessions, because that will also be a user input.

We'll do that by merging the profiles within each __telemetry__ session, reading the merged profile's __columns__ property, and sorting the column names alphabetically:

In [3]:
import glob
from whylogs import DatasetProfile
from whylogs.viz import ProfileVisualizer

from functools import reduce

# For each session, merge all of its whylogs profiles and record the sorted
# column names. features_per_session[i] corresponds to session_numbers[i].
features_per_session = []
for s_no in session_numbers:
    pattern = 'whylogs-output/rov_telemetry_{}/**/*.bin'.format(s_no)
    binaries = glob.glob(pattern, recursive=True)
    if not binaries:
        # reduce() over an empty list would fail with an unhelpful TypeError.
        raise FileNotFoundError('No whylogs profiles match {}'.format(pattern))
    # currently, whylogs writer writes non-delimited files
    profiles = [DatasetProfile.read_protobuf(x, delimited_file=False) for x in binaries]
    merged = reduce(lambda x, y: x.merge(y), profiles)
    features_per_session.append(sorted(merged.columns))


Another user input is the sliding window of the moving average. This will depend on the __predict_logger__, so we'll need to inspect the available features of the __rov_prediction__ topic, which will have features of the form:

residual_m{}

where {} is an integer indicating the window size of the moving average.

To do that, we'll merge the profiles within each __prediction__ session and get the list of columns. We then split each column name on the `_` character and parse the window size from the last element (the plain `residual` column maps to a window of 1):

In [4]:
import glob
from whylogs import DatasetProfile
from whylogs.viz import ProfileVisualizer

def to_numeric(avg_windows_per_session):
    """Convert per-session residual column names into sorted moving-average window sizes.

    ``'residual_m<N>'`` maps to ``N``. Any name whose last ``_``-separated part
    is not one character followed by an integer (e.g. the plain ``'residual'``
    column) maps to ``1``, the raw residual.

    Args:
        avg_windows_per_session: list (one entry per session) of lists of column names.

    Returns:
        A list with one sorted, de-duplicated list of ints per session, in the
        same session order.
    """
    def _window_size(column):
        try:
            return int(column.split("_")[-1][1:])
        except ValueError:
            # Not of the form "<letter><int>": treat as the raw residual.
            return 1

    # A set avoids repeated 1s when several columns are not residual_m<N>.
    return [sorted({_window_size(col) for col in cols}) for cols in avg_windows_per_session]

from functools import reduce

# For each session, merge its prediction profiles and collect the column
# names; to_numeric then turns them into window sizes.
# avg_windows_per_session[i] corresponds to session_numbers[i].
avg_windows_per_session = []
for s_no in session_numbers:
    pattern = 'whylogs-output/rov_prediction_{}/**/*.bin'.format(s_no)
    binaries = glob.glob(pattern, recursive=True)
    if not binaries:
        # reduce() over an empty list would fail with an unhelpful TypeError.
        raise FileNotFoundError('No whylogs profiles match {}'.format(pattern))
    # currently, whylogs writer writes non-delimited files
    profiles = [DatasetProfile.read_protobuf(x, delimited_file=False) for x in binaries]
    merged = reduce(lambda x, y: x.merge(y), profiles)
    avg_windows_per_session.append(list(merged.columns))

avg_windows_per_session = to_numeric(avg_windows_per_session)

Ok, now we have all the information to actually build the widget.

Let's define our user inputs and how we will make them available for the user:
- Session Number : Dropdown
- Feature List for Telemetry Topic: Dropdown
- Plot Type for Telemetry Features: Dropdown
- Window Size for the Residual's Moving Average: Slider
- Plot Type for Prediction Features: Dropdown


In [5]:
import ipywidgets as widgets
import glob
from whylogs import DatasetProfile
from whylogs.viz import ProfileVisualizer
from datetime import datetime

plot_types = ["Distribution","Data Types","Missing Values","Uniqueness"]
plot_types_res = ["Distribution","Missing Values"]

dropdown_plots = widgets.Dropdown(options = plot_types,value="Distribution",description='Plot Type')
dropdown_plots_res = widgets.Dropdown(options = plot_types_res,value="Distribution",description='Plot Type')

dropdown_session = widgets.Dropdown(options = session_numbers,description='Session No.')
initial_features = features_per_session[dropdown_session.value]
# Start on GYROZ (the regression target) when the session has it. Otherwise fall
# back to the first column, because Dropdown rejects a value not in its options.
if "GYROZ" in initial_features:
    default_feature = "GYROZ"
else:
    default_feature = initial_features[0] if initial_features else None
dropdown_features = widgets.Dropdown(options = initial_features,value=default_feature,description='Feature:')
slider_avg=widgets.SelectionSlider(description='Mov. Avg.',   options=avg_windows_per_session[dropdown_session.value])


We will have two main output areas:
- output_telemetry
- output_prediction

In `output_telemetry`, we'll display:
- Session Date
- The chosen Plot for the chosen Feature

In `output_prediction`, we'll display:
- The chosen Plot for the residual, applying the chosen moving average

We keep the two areas separate by creating one `widgets.Output` per area and wrapping the display calls in `with output_xxx:`. This way everything is rendered in the right place and in the right order:

In [6]:

# Both output areas share the same layout settings; each gets its own copy.
_OUTPUT_LAYOUT = {
    'width': 'auto',
    'align_items': 'center',
    'object_fit': 'fill',
    'margin': '0px 10px 10px 0px',
}
output_telemetry = widgets.Output(layout=dict(_OUTPUT_LAYOUT))
output_prediction = widgets.Output(layout=dict(_OUTPUT_LAYOUT))

def plot_feature_distribution(session_number,feature_name,plot_type):
    """Show the session date and a whylogs plot of one telemetry feature.

    Replaces the contents of ``output_telemetry`` with the start date of
    ``rov_telemetry_<session_number>`` and the ``plot_type`` plot of
    ``feature_name`` across all of that session's profiles. ``plot_type`` is
    one of "Distribution", "Data Types", "Missing Values", "Uniqueness"; any
    other value shows only the date. If no profiles exist, a short message is
    shown instead.
    """
    with output_telemetry:
        output_telemetry.clear_output(wait=True)

        telemetry_session = "rov_telemetry_{}".format(session_number)
        binaries = glob.glob('whylogs-output/{}/**/*.bin'.format(telemetry_session), recursive=True)
        if not binaries:
            display(Markdown("No whylogs profiles found for `{}`.".format(telemetry_session)))
            return
        # currently, whylogs writer writes non-delimited files
        profiles = [DatasetProfile.read_protobuf(x, delimited_file=False) for x in binaries]

        display(Markdown("# Session Date:"))
        # glob's result order depends on the filesystem, so use the earliest
        # profile rather than whichever file happened to be listed first.
        dt_object = min(p.dataset_timestamp for p in profiles)
        dt_str = datetime.strftime(dt_object, '%Y-%m-%d %H:%M')
        display(Markdown("%s"%(dt_str)))
        display(Markdown("# Telemetry"))

        viz = ProfileVisualizer()
        viz.set_profiles(profiles)
        plotters = {
            "Distribution": viz.plot_distribution,
            "Data Types": viz.plot_data_types,
            "Missing Values": viz.plot_missing_values,
            "Uniqueness": viz.plot_uniqueness,
        }
        plotter = plotters.get(plot_type)
        if plotter is not None:
            display(plotter(feature_name,ts_format='%H:%M'))
            

def plot_residual_distribution(session_number,wd,plot_type):
    """Show a whylogs plot of the residual (or a moving average of it) for one session.

    Replaces the contents of ``output_prediction``. ``wd == 1`` selects the raw
    ``residual`` column; any other window size selects ``residual_m<wd>``.
    ``plot_type`` is "Distribution" or "Missing Values"; any other value shows
    only the heading.
    """
    with output_prediction:
        output_prediction.clear_output(wait=True)
        display(Markdown("# Residuals"))

        prediction_session = "rov_prediction_{}".format(session_number)
        pattern = 'whylogs-output/{}/**/*.bin'.format(prediction_session)
        # currently, whylogs writer writes non-delimited files
        profiles = [
            DatasetProfile.read_protobuf(path, delimited_file=False)
            for path in glob.glob(pattern, recursive=True)
        ]
        viz = ProfileVisualizer()
        viz.set_profiles(profiles)

        resid_name = "residual" if wd == 1 else "residual_m{}".format(wd)
        plotters = {
            "Distribution": viz.plot_distribution,
            "Missing Values": viz.plot_missing_values,
        }
        if plot_type in plotters:
            display(plotters[plot_type](resid_name,ts_format='%H:%M'))



Now, we need to listen for changes in the input widgets (dropdowns and sliders), and call the `plot_feature_distribution` and `plot_residual_distribution` whenever there is a change, which will display our plots in the appropriate outputs.

We do that by defining an `eventhandler` for each input widget. These handlers will be called whenever there's a change in the respective input widget. In each `eventhandler`, we'll make it plot both the `telemetry` and `prediction` sections with the updated values (the changed value of the respective input widget, plus the current state of the other inputs).

In [7]:
def dropdown_session_eventhandler(change):
    """Redraw both panels for the newly selected session number."""
    new_session = change.new
    plot_feature_distribution(
        session_number=new_session,
        feature_name=dropdown_features.value,
        plot_type=dropdown_plots.value,
    )
    plot_residual_distribution(
        session_number=new_session,
        wd=slider_avg.value,
        plot_type=dropdown_plots_res.value,
    )

def dropdown_plots_eventhandler(change):
    """Redraw both panels after the telemetry plot type changes."""
    new_plot_type = change.new
    plot_feature_distribution(
        session_number=dropdown_session.value,
        feature_name=dropdown_features.value,
        plot_type=new_plot_type,
    )
    plot_residual_distribution(
        session_number=dropdown_session.value,
        wd=slider_avg.value,
        plot_type=dropdown_plots_res.value,
    )

def dropdown_plots_res_eventhandler(change):
    """Redraw the residual panel (then the telemetry panel) after the residual plot type changes."""
    new_res_plot_type = change.new
    plot_residual_distribution(
        session_number=dropdown_session.value,
        wd=slider_avg.value,
        plot_type=new_res_plot_type,
    )
    plot_feature_distribution(
        session_number=dropdown_session.value,
        feature_name=dropdown_features.value,
        plot_type=dropdown_plots.value,
    )

def dropdown_features_eventhandler(change):
    """Redraw both panels after a different telemetry feature is selected."""
    new_feature = change.new
    plot_feature_distribution(
        session_number=dropdown_session.value,
        feature_name=new_feature,
        plot_type=dropdown_plots.value,
    )
    plot_residual_distribution(
        session_number=dropdown_session.value,
        wd=slider_avg.value,
        plot_type=dropdown_plots_res.value,
    )

def slider_window_eventhandler(change):
    """Redraw the residual panel (then the telemetry panel) for the new moving-average window."""
    new_window = change.new
    plot_residual_distribution(
        session_number=dropdown_session.value,
        wd=new_window,
        plot_type=dropdown_plots_res.value,
    )
    plot_feature_distribution(
        session_number=dropdown_session.value,
        feature_name=dropdown_features.value,
        plot_type=dropdown_plots.value,
    )

After defining the `eventhandlers`, we need to register them as observers on the inputs:

In [8]:
# Each input widget redraws the dashboard whenever its `value` trait changes.
for _widget, _handler in [
    (dropdown_session, dropdown_session_eventhandler),
    (dropdown_features, dropdown_features_eventhandler),
    (dropdown_plots, dropdown_plots_eventhandler),
    (dropdown_plots_res, dropdown_plots_res_eventhandler),
    (slider_avg, slider_window_eventhandler),
]:
    _widget.observe(_handler, names='value')

Now, we need to centralize all inputs and outputs in a single Box.

We do that by creating a VBox, setting some layout configuration for it, and then specifying what goes inside the VBox.

V is for Vertical, so the arguments we send to it will be displayed vertically and in order.

We want to show the following elements in this order:
- Session Dropdown (INPUT)
- Telemetry Plot (OUTPUT)
- Feature List (INPUT)
- Plot type for Telemetry (INPUT)
- Prediction Plot (OUTPUT)
- Plot type for Prediction (INPUT)
- Moving-average window slider for Prediction (INPUT)

In [9]:
from ipywidgets import interact, fixed, IntSlider, HBox, Layout, Output, VBox
import ipywidgets as widgets
box_layout = Layout(display='flex', align_items='center', justify_content='space-around', border='solid')
# Rendered top to bottom in this order.
box_children = [
    dropdown_session,
    output_telemetry,
    dropdown_features,
    dropdown_plots,
    output_prediction,
    dropdown_plots_res,
    slider_avg,
]
box = VBox(box_children, layout=box_layout)
display(box)

VBox(children=(Dropdown(description='Session No.', index=1, options=(0, 1, 2, 3), value=1), Output(layout=Layo…