## PQ <-> Kafka Integration  

In [None]:
## CONNECT YOUR KAFKA STREAM TO YOUR PYTHON SCRIPT
# Start with creating a client for kafka port
from ksql import KSQLAPI
import os

# ksqlDB REST endpoint. Override it with the KSQL_URL environment variable
# instead of editing the notebook; defaults to a local server.
KSQL_URL = os.environ.get('KSQL_URL', 'http://localhost:8088')
client = KSQLAPI(KSQL_URL)

# Describe the stream; as the last expression of the cell the result is displayed
client.ksql('describe i4Q_DataStream')

In [None]:
# By creating a query from your Kafka Stream, you can directly print current streamed messages
# Set to True to print every message of the push query below. The query has
# no LIMIT (EMIT CHANGES keeps streaming), so the loop only stops when the
# kernel is interrupted.
SHOW_LIVE_STREAM = False

query = client.query('SELECT * from I4Q_DATASTREAM EMIT CHANGES;')

if SHOW_LIVE_STREAM:
    for item in query:
        print(item)

In [None]:
## DEFINE FUNCTIONS THAT BUILD DATA FRAMES FROM THE LATEST STREAM MESSAGES
# These functions run limited push queries on the stream and store the returned messages in pandas data frames
# create_df_from_kafka() creates a pandas data frame from a 5-record push query (LIMIT 5)
# read_latest_message() creates a single-row pandas data frame from a 1-record push query (LIMIT 1)


from ksql import KSQLAPI
import pandas as pd
import json
import numpy as np
import re
import schedule
import time
from itertools import count

def parse_ksql_response(raw):
    """Turn the raw text of a ksqlDB push-query response into a DataFrame.

    The text is expected to be a JSON array whose first record is the header
    (its schema names every column as `NAME` TYPE), followed by records of
    the form {"row": {"columns": [...]}}. Any other records, such as the
    closing message of a limited query, are ignored.

    Parameters
    ----------
    raw : str
        Concatenated text chunks returned by ``client.query``.

    Returns
    -------
    pandas.DataFrame
        One row per data record, with columns named after the schema.
        An empty frame if ``raw`` contains no records.
    """
    text = raw.strip().replace('\n\n\n\n', '')
    if not text:
        return pd.DataFrame()

    records = json.loads(text)
    if not records:
        return pd.DataFrame()

    # Column names are the back-quoted identifiers in the header record.
    # Parsing them directly keeps names with special characters intact.
    cols = re.findall(r'`([^`]*)`', str(records[0]))

    # Keep only records that carry a row. This skips the closing status record
    # without assuming it is always the last element.
    data = [r['row']['columns'] for r in records[1:] if 'row' in r]

    return pd.DataFrame(data=data, columns=cols)


def fetch_ksql_rows(limit, ksql_client=None, stream='I4Q_DATASTREAM'):
    """Run a limited push query on ``stream`` and return the rows as a DataFrame.

    Parameters
    ----------
    limit : int
        Number of messages to request (the query's ``LIMIT`` clause).
    ksql_client : optional
        Object whose ``query(sql)`` returns an iterable of text chunks.
        Defaults to the notebook-level ``client``.
    stream : str
        Name of the stream to query.

    Returns
    -------
    pandas.DataFrame
        See ``parse_ksql_response``.
    """
    if ksql_client is None:
        ksql_client = client

    sql = 'SELECT * from {} EMIT CHANGES LIMIT {};'.format(stream, int(limit))
    query = ksql_client.query(sql)

    # Collect all chunks as text and parse them once the query has finished
    chunks = []
    try:
        for item in query:
            chunks.append(item)
    except RuntimeError:
        # Iteration may end with a RuntimeError once the limited query closes
        # the stream (TODO confirm for the ksql client version in use);
        # keep whatever was received before it.
        pass

    return parse_ksql_response(''.join(chunks))


def create_df_from_kafka(limit=5, ksql_client=None):
    """Return a DataFrame with the ``limit`` records of a limited push query.

    Parameters
    ----------
    limit : int
        Number of records to request (default 5).
    ksql_client : optional
        Client to use instead of the notebook-level ``client``.
    """
    return fetch_ksql_rows(limit, ksql_client)


def read_latest_message(ksql_client=None):
    """Return a single-row DataFrame from a ``LIMIT 1`` push query.

    Parameters
    ----------
    ksql_client : optional
        Client to use instead of the notebook-level ``client``.
    """
    return fetch_ksql_rows(1, ksql_client)



In [None]:
## CREATE DATA FRAME WHICH IS CONTINUOUSLY UPDATED BY EACH NEW MESSAGE

def update_df(n_updates=None):
    """Build a data frame from the stream and extend it with further messages.

    Starts from ``create_df_from_kafka()`` plus one ``read_latest_message()``
    row, then appends ``n_updates`` more single-message frames.

    Parameters
    ----------
    n_updates : int, optional
        Number of additional messages to read. Defaults to the number of rows
        in the initial frame (the behaviour of the former ``range(len(df))``
        loop).

    Returns
    -------
    pandas.DataFrame
        All collected rows. The index is not reset (each piece keeps its own
        index), so callers typically call ``reset_index`` afterwards.
    """
    # Start with an initial batch plus the latest message
    frames = [create_df_from_kafka(), read_latest_message()]
    if n_updates is None:
        n_updates = sum(len(frame) for frame in frames)

    # Each call runs a LIMIT 1 push query, which waits for the next message.
    # Collecting the frames and concatenating once avoids re-copying a growing
    # frame on every iteration.
    for _ in range(n_updates):
        frames.append(read_latest_message())

    # To keep only a fixed-size window of recent messages, slice off the
    # oldest rows of the result (e.g. ``.iloc[n_updates:]``).
    return pd.concat(frames)

# Collect the messages and turn the concatenated pieces' index into a regular
# column, giving a fresh 0..n-1 index; the frame is displayed as the last expression.
df = update_df().reset_index()
df

In [None]:
## CREATE FUNCTIONS TO PLOT LAST UPDATED KAFKA MESSAGE

import numpy as np
from matplotlib import pyplot as plt

# Wrapping the chart in a class separates the initial drawing (__init__) from the per-message redraws (update), so the plot is easy to reuse later
class LivePlotNotebook(object):
    """Process control chart that is redrawn as new response values arrive.

    ``__init__`` draws the chart with the control limits and the process mean;
    ``update`` replaces the plotted response values and redraws the figure.
    """

    def __init__(self, ucl=0.75, lcl=0.3, process_mean=None, n_points=20):
        """Draw the initial chart.

        Parameters
        ----------
        ucl, lcl : float
            Upper / lower control limits, drawn as horizontal lines.
        process_mean : float, optional
            Position of the mean line. Defaults to the mean of the global
            ``df['RESPONSE']`` column.
        n_points : int
            Number of placeholder points in the initial all-zero line. The
            limit labels are placed just to the right of this x position.
        """
        # Use the interactive notebook backend when running under IPython
        # (same effect as the `%matplotlib notebook` magic).
        try:
            get_ipython().run_line_magic('matplotlib', 'notebook')
        except NameError:
            pass

        # Create the figure frame (default size can be adjusted)
        fig, ax = plt.subplots(figsize=(9, 4))

        # Placeholder line for the response values; update() replaces its data
        self.line, = ax.plot([0] * n_points, label='response', color='#595959')

        # Initial axis limits, legend, label and grid
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.legend()
        ax.set_xlabel('values')
        ax.grid()

        if process_mean is None:
            process_mean = df['RESPONSE'].mean()

        # Reference lines. They are kept as attributes and never modified
        # by update().
        self.ucl_line = ax.axhline(ucl, color='#8B569C')
        self.lcl_line = ax.axhline(lcl, color='#8B569C')
        self.mean_line = ax.axhline(process_mean, color='#EAD6FF')

        ax.set_title('Process Control Chart', color='#8064a2')

        # Value labels just right of the initial data range
        label_x = n_points + 0.2
        ax.text(label_x, ucl, "UCL = " + "{:.2f}".format(ucl), color='#8B569C')
        ax.text(label_x, process_mean, r'$\bar{x}$' + " = " + "{:.2f}".format(process_mean), color='#EAD6FF')
        ax.text(label_x, lcl, "LCL = " + "{:.2f}".format(lcl), color='#8B569C')

        self.ax = ax
        self.fig = fig

    def update(self, x, actions):
        """Show ``x`` as the response line and redraw the chart.

        Parameters
        ----------
        x : array-like
            Response values, plotted at x positions 0 .. len(x) - 1.
        actions : array-like
            Current value window. Its length sets the x-axis range
            (``len(actions) + 5``).

        Returns
        -------
        ``actions`` unchanged, so the caller can use it as the next window.
        """
        self.line.set_xdata(range(len(x)))
        self.line.set_ydata(x)

        # Only the response line changes; the control-limit and mean lines
        # stay where __init__ put them.
        self.ax.set_xlim(0, len(actions) + 5)
        self.ax.set_ylim(0, 1)

        self.fig.canvas.draw()
        return actions
     

In [None]:
# Now, we can finally create our continuously updated plot 

import time

# call former defined class
# Draw the initial control chart
liveplot = LivePlotNotebook()

# Column to visualize
response = df['RESPONSE']

# Number of values shown at once, and the pause between redraws in seconds
WINDOW_SIZE = 15
UPDATE_INTERVAL_S = 3

# Sliding window of the most recent values, initially all zeros
actions = np.zeros(WINDOW_SIZE)

# Feed every response value, including the first one, into the window
for value in response:
    time.sleep(UPDATE_INTERVAL_S)

    # Drop the oldest value and append the newest one
    window = np.append(actions[1:], value)

    # update() redraws the chart and returns the window for the next step
    actions = liveplot.update(
        x=window,
        actions=window
    )


## HISTOGRAM

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.ticker import StrMethodFormatter

# Enable interactive plot
%matplotlib notebook


# Fixing random state for reproducibility
np.random.seed(19680801)

# Fixing bin edges
HIST_BINS = np.linspace(0.2, 2, 100)

# histogram our data from dataframe
data = response
n, _ = np.histogram(data, HIST_BINS)

In [None]:
def prepare_animation(bar_container, values=None, bins=None):
    """Build the per-frame callback for ``animation.FuncAnimation``.

    Frame ``k`` sets each bar's height to the histogram count of the first
    ``k`` values, simulating data arriving one value at a time.

    Parameters
    ----------
    bar_container : BarContainer
        Bars returned by ``ax.hist``; their ``patches`` are resized in place.
    values : array-like, optional
        Data to animate. Defaults to the global ``response`` series.
    bins : array-like, optional
        Bin edges. Defaults to the global ``HIST_BINS``.

    Returns
    -------
    callable
        ``animate(frame_number)``, which returns the updated patches (needed
        for ``blit=True``).
    """
    def animate(frame_number):
        # Resolve defaults at call time, like the original global lookups
        source = response if values is None else values
        edges = HIST_BINS if bins is None else bins

        # Values "seen so far" (positional slice)
        seen = np.asarray(source)[:frame_number]
        heights, _ = np.histogram(seen, edges)

        # Update the height of each bin's bar
        for height, rect in zip(heights, bar_container.patches):
            rect.set_height(height)
        return bar_container.patches

    return animate

In [None]:
#Draw Histogram and define the figure size 
fig, ax = plt.subplots(figsize=(9, 9))
_, _, bar_container = ax.hist(data, HIST_BINS, lw=1,
                              ec="#8B569C", fc="purple", alpha=0.5)

# Fixed upper y limit so the bars stay visible while the animation runs.
# 227 is a hand-picked value; adjust it to the expected maximum bin count.
ax.set_ylim(top=227)

# Despine
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.spines['left'].set_visible(False)

# Switch off the tick marks but keep the tick labels. tick_params expects
# booleans; current Matplotlib does not translate the strings "off"/"on".
ax.tick_params(axis="both", which="both", bottom=False, top=False, labelbottom=True,
               left=False, right=False, labelleft=True)

# Draw faint horizontal grid lines at the y ticks
for tick in ax.get_yticks():
    ax.axhline(y=tick, linestyle='dashed', alpha=0.4, color='#eeeeee', zorder=1)

# Title and axis labels
ax.set_title("Histogram for Process Control")
ax.set_xlabel("Sensor Response", labelpad=20, weight='bold', size=12)
ax.set_ylabel(" Counter ", labelpad=20, weight='bold', size=12)

# Format y-axis tick labels
ax.yaxis.set_major_formatter(StrMethodFormatter('{x:,g}'))

# Upper and Lower Control Limit lines
UCL = 0.8
line2 = ax.axvline(UCL, color='#8B569C', linestyle='--', label='UCL')
LCL = 0.3
line3 = ax.axvline(LCL, color='#8B569C', linestyle='dashdot', label='LCL')

# Legend for the control-limit lines
ax.legend()

# One frame per prefix length of the response series (0 .. len(response)),
# so the animation stops once every value has been added
ani = animation.FuncAnimation(fig, prepare_animation(bar_container), len(response) + 1,
                              repeat=False, blit=True)

plt.show()

In [None]:
import joblib

# Models loaded so far, keyed by file path. Re-running this cell resets the
# cache, so a retrained model file is picked up.
_MODEL_CACHE = {}


def _load_model(model_path):
    """Load a joblib-serialized model once per path and cache it.

    joblib.load unpickles the file, which can execute arbitrary code, so
    only load model files from a trusted source.
    """
    if model_path not in _MODEL_CACHE:
        _MODEL_CACHE[model_path] = joblib.load(model_path)
    return _MODEL_CACHE[model_path]


def deploy_model(model_path='rf_model.pkl', feature_columns=slice(6, 12)):
    """Predict on the latest stream message with the pre-trained model.

    Parameters
    ----------
    model_path : str
        Path of the joblib file holding the model. The model is loaded once
        and reused, instead of being re-read on every scheduled call.
    feature_columns : slice
        Positional column slice of the message frame that holds the sensor
        features (columns 6-11 by default).

    Returns
    -------
    The model's predictions for the latest message (also printed).
    """
    # Read the latest message with the function defined above
    last_message = read_latest_message()

    # Positional slice: relies on the stream's column order. TODO confirm it
    # matches the feature order used when the model was trained.
    features = np.array(last_message)[:, feature_columns]

    rf_model = _load_model(model_path)
    output = rf_model.predict(features)

    print(output)

    return output

# schedule keeps jobs in its module-level default scheduler, so a job left
# over from an earlier run of this cell would keep firing alongside the new
# one. Remove it before registering the job again.
schedule.clear('deploy_model')
schedule.every(1).seconds.do(deploy_model).tag('deploy_model')

# Run the scheduler until the kernel is interrupted, then unregister the job
try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    schedule.clear('deploy_model')