### Client Setup

Accepts streaming data from the server, then trains an ensemble model built from the following ML models:
- Random Forest Classifier
- Logistic Regression
- Linear Discriminant Analysis
- K-Neighbors Classifier
- Classification And Regression Trees
- Support Vector Classifier
- Gaussian Naive Bayes

These two tasks each run in their own thread, so they do not interfere with each other's work. This notebook shows how we can use the windowing technique and multithreading to process and analyze big data. <br>
<br>

In [19]:
# define data chunk parameters
INITIAL_CHUNK_SIZE = 50   # rows required in the buffer before the first chunk is trained
FINAL_CHUNK_SIZE = 20    # rows required per chunk once a chunk has been accepted
CHUNK_SIZE_INCREMENT_FACTOR = 2    # chunk size is multiplied by this when training a chunk raises ValueError
# scales the trainer's wait between buffer checks;
# presumably the server's delay between rows, in seconds - confirm against the server
SERVER_DELAY = 0.05

In [20]:
# Importing required modules

# data streaming
import websockets
import asyncio
import threading

# data processing
from datetime import datetime
import pandas as pd

# ensemble modelling
import time
import sys
import numpy as np
import pickle
from sklearn.model_selection import train_test_split
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# visualize
import plotly.graph_objects as go


In [21]:
# Loading prebuilt model structures stored as '[MODEL].h5'
# NOTE: despite the '.h5' extension these files are read with pickle.
# NOTE(security): unpickling executes arbitrary code, so only load model files from a trusted source.

def _load_model(path):
    """
    Unpickles one prebuilt model

    :param path: path of the pickled model file
    :return: the unpickled object
    """
    # the context manager closes the file even if unpickling fails
    with open(path, 'rb') as model_file:
        return pickle.load(model_file)


rf = _load_model('../models/h5s/RF.h5')
lr = _load_model('../models/h5s/LR.h5')
lda = _load_model('../models/h5s/LDA.h5')
knn = _load_model('../models/h5s/kNN.h5')
cart = _load_model('../models/h5s/CART.h5')
svm = _load_model('../models/h5s/SVC.h5')

In [22]:
# rows received from the server that have not been consumed by a chunk yet;
# appended to by the client and trimmed by ensemble()
buffer = pd.DataFrame()    # init dataframe for storing the chunk
# the chunk most recently trained on by ensemble(); reused when fitting the combined model
latest_accepted_chunk = pd.DataFrame()

# column names applied to every received comma-separated row
columns = ["date", "time", "state", "sphone_signal", "label", "type"]

chunk_count = 0    # realtime chunk count (number of chunks accepted so far)
initial_chunk = True    # initial chunk flag; cleared once the first chunk is accepted
total_rows_received = 0    # incremented by the client for every row received
# never set in this notebook; the trainer uses event.wait(timeout) as its sleep between buffer checks
event = threading.Event()

In [23]:
# Function to log specified content in specified file

def log(content="", new=False, file='stream.log', timestamp=False, print_line=0):
    """
    Logs specified content to the specified file in the current directory

    :param content: text to write
    :param new: if True then overwrites the file, else appends. Default is False
    :param file: destination file to save logs into
    :param timestamp: if True, writes the content on a new line prefixed with the current timestamp
    :param print_line: if non-zero, also writes a separator line built with that many '-'
    :return: None
    """
    # 'w' truncates the file before writing, which is what overwriting needs;
    # a single open replaces the former truncate-then-reopen sequence
    mode = 'w' if new else 'a'

    # the context manager closes the file even when a write fails
    with open(f'./{file}', mode) as log_file:
        # to add time stamp
        if timestamp:
            log_file.write(f"\n{datetime.now()} ~ {content}")
        # without timestamp and new lines
        else:
            log_file.write(content)

        # to print a line
        if print_line:
            line = "-" * print_line
            log_file.write(f"\n{line}\n\n")


In [24]:
# Pandas 3D DataFrame
# https://stackoverflow.com/questions/24290495/constructing-3d-pandas-dataframe

# metric names (outer column level) and model names (inner column level)
_METRIC_NAMES = ('accuracy', 'precision', 'recall', 'f1_score', 'time_taken')
_MODEL_NAMES = ('RF', 'LR', 'LDA', 'kNN', 'CART', 'SVM', 'VC')

# every metric name repeated once per model: accuracy x7, precision x7, ...
metrics_df_columns = np.array([metric for metric in _METRIC_NAMES for _ in _MODEL_NAMES])
# the model names cycled once per metric: RF, LR, ..., VC, RF, LR, ...
models = np.array(list(_MODEL_NAMES) * len(_METRIC_NAMES))

# one row per chunk; columns are (metric, model) pairs
chunk_wise_6M_ensemble_metrics = pd.DataFrame(
    columns=pd.MultiIndex.from_tuples(zip(metrics_df_columns, models))
)
# one row per combined (old + new) ensemble model
chunk_wise_final_ensemble_metrics = pd.DataFrame(
    columns=['model_version', 'accuracy', 'precision', 'recall', 'f1_score_value', 'time_consumed']
)

In [25]:
async def ensemble(chunk_count, initial=False):
    """
    Trains a hard-voting ensemble on the rows currently held in 'buffer'

    Takes a snapshot of 'buffer' as the chunk, fits the voting ensemble on an
    80/20 train/test split of it, removes the consumed rows from 'buffer' and
    appends one row of per-model metrics to 'chunk_wise_6M_ensemble_metrics'.
    An exception raised while fitting the ensemble propagates before 'buffer'
    or 'latest_accepted_chunk' are modified.

    :param chunk_count: number of the chunk being trained; only used in the log message
    :param initial: accepted from callers but not used by this function
    :return: VotingClassifier fitted on the training split of the chunk
    """

    global buffer, latest_accepted_chunk, chunk_wise_6M_ensemble_metrics, metrics_df_columns

    # snapshot the buffer so the chunk has a fixed size while this function runs
    IoT = buffer.copy()
    current_row_count = len(IoT)

    # data preparation: everything except these columns is a feature, 'type' is the target
    x = IoT.drop(['label', 'date', 'time', 'type'], axis=1)
    y = IoT['type']

    # data split into test and train sets
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

    # ensemble model
    start = time.time()
    voting = VotingClassifier(estimators=[('RF', rf), ('LR', lr), ('LDA', lda), ('KNN', knn), ('CART', cart), ('SVM', svm)], voting='hard')
    voting.fit(x_train, y_train)
    end = time.time()
    # fit time of the voting ensemble; the same value is recorded for every model below
    time_consumed = end - start

    latest_accepted_chunk = IoT
    # log chunk created
    log(f"Chunk-{chunk_count} created with {current_row_count} records.", timestamp=True, print_line=50)

    # Drop exactly the rows that went into this chunk. Rows the client appended after
    # the snapshot sit at positions >= current_row_count and are kept; when none
    # arrived the slice is an empty frame with the same columns.
    # NOTE(review): the client thread rebinds 'buffer' concurrently, so a row appended
    # while this assignment runs could be lost - confirm whether a lock is needed.
    buffer = buffer.iloc[current_row_count:]

    # Collect each metric in its own list so the final row can be laid out
    # metric-major (accuracy for all models, then precision, ...), matching the
    # column order of 'chunk_wise_6M_ensemble_metrics'.
    accuracies, precisions, recalls, f1_scores, times = [], [], [], [], []
    for model in (rf, lr, lda, knn, cart, svm, voting):
        model.fit(x_train, y_train)
        y_pred = model.predict(x_test)
        # score only the labels that were actually predicted
        predicted_labels = np.unique(y_pred)

        # calculate required metrics
        accuracies.append(accuracy_score(y_test, y_pred))
        precisions.append(precision_score(y_test, y_pred, average='weighted', labels=predicted_labels))
        recalls.append(recall_score(y_test, y_pred, average='weighted', labels=predicted_labels))
        f1_scores.append(f1_score(y_test, y_pred, average='weighted', labels=predicted_labels))
        times.append(time_consumed)

    # append to 'chunk_wise_6M_ensemble_metrics'
    metrics_row = np.array(accuracies + precisions + recalls + f1_scores + times)
    chunk_wise_6M_ensemble_metrics = pd.concat([chunk_wise_6M_ensemble_metrics, pd.DataFrame([metrics_row], columns=pd.MultiIndex.from_tuples(zip(metrics_df_columns, models)))])
    return voting

In [26]:
# start this run with an empty 'stream.log': new=True overwrites the file instead of appending
log("", True)    # init 'stream.log' file

async def get_data():
    """
    Receives streamed rows over a websocket and appends each one to 'buffer'

    Runs until the connection raises: every received comma separated row is
    logged by its running count, split into fields, appended to the shared
    'buffer' dataframe and acknowledged back to the server with "1".
    """
    global buffer, total_rows_received

    endpoint = "ws://localhost:8765"    # websocket endpoint

    async with websockets.connect(endpoint, ping_interval=None) as connection:
        # infinite loop for receiving streamed data
        while True:
            message = await connection.recv()    # wait for one record
            log(f"{total_rows_received} ")    # log row count
            fields = message.split(",")    # comma separated string -> list of values
            total_rows_received += 1

            # append the record to 'buffer' as a single-row dataframe
            buffer = pd.concat([buffer, pd.DataFrame([fields], columns = columns)])

            # acknowledge that the row was received successfully
            await connection.send("1")

def client_loop():
    """
    Thread target that runs get_data() on an event loop owned by the calling thread

    :return: None
    """
    # each thread needs its own event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(get_data())
    finally:
        # close the loop even when get_data() raises (e.g. the connection drops)
        loop.close()


# thread that runs the websocket client; created here, started in a later cell
client_thread = threading.Thread(target=client_loop, daemon = True)

In [36]:
# https://stackoverflow.com/questions/66923279/python-plotly-dynamic-plot

def _metric_trace(metric_column, legend_name):
    """
    Builds a line trace of one metrics column against the model version

    :param metric_column: column of 'chunk_wise_final_ensemble_metrics' plotted on the y axis
    :param legend_name: name shown for the trace
    :return: plotly Scatter trace
    """
    return go.Scatter(
        x=np.array(list(chunk_wise_final_ensemble_metrics['model_version'])),
        y=chunk_wise_final_ensemble_metrics[metric_column],
        mode='lines',
        name=legend_name,
        line=dict(width=1.5),
    )


def _live_figure(traces, title):
    """
    Wraps traces in a FigureWidget using the white plotly template

    :param traces: list of traces to show
    :param title: figure title
    :return: plotly FigureWidget
    """
    return go.FigureWidget(data=traces, layout=go.Layout(template="plotly_white", title=title))


# quality metrics of the combined ensemble model, one trace each
trace1 = _metric_trace('accuracy', 'Accuracy')
trace2 = _metric_trace('precision', 'Precision')
trace3 = _metric_trace('f1_score_value', 'F1 Score')
trace4 = _metric_trace('recall', 'Recall')

fig1 = _live_figure([trace1, trace2, trace3, trace4], "Ensemble Model Metrics")
display(fig1)

# time taken to build the combined ensemble model
trace5 = _metric_trace('time_consumed', 'Time')
fig2 = _live_figure([trace5], "Ensemble Model Time Consumed")
display(fig2)

async def update_plot():
    """
    Pushes the current 'chunk_wise_final_ensemble_metrics' into both figures

    Every trace gets the model versions as x values and its own metrics
    column as y values.
    """
    # (figure, trace position, metrics column), in the order the traces were added
    trace_sources = (
        (fig1, 0, 'accuracy'),
        (fig1, 1, 'precision'),
        (fig1, 2, 'f1_score_value'),
        (fig1, 3, 'recall'),
        (fig2, 0, 'time_consumed'),
    )

    for figure, position, column in trace_sources:
        figure.data[position].x = chunk_wise_final_ensemble_metrics['model_version']
        figure.data[position].y = chunk_wise_final_ensemble_metrics[column]

FigureWidget({
    'data': [{'line': {'width': 1.5},
              'mode': 'lines',
              'name': 'Acc…

FigureWidget({
    'data': [{'line': {'width': 1.5},
              'mode': 'lines',
              'name': 'Tim…

: 

In [28]:
async def train():
    """
    Trains ensemble models chunk by chunk for as long as the thread runs

    Whenever 'buffer' holds at least 'chunk_size' rows a chunk is trained with
    ensemble(). The first accepted chunk becomes the running model. Every later
    chunk model is combined with the running model in a new hard-voting
    classifier, which is fitted on the latest chunk, evaluated, logged, plotted,
    pickled and then becomes the running model. When training raises ValueError
    the chunk size is multiplied by CHUNK_SIZE_INCREMENT_FACTOR and the chunk is
    retried once enough rows are buffered.
    """
    async def train_ensemble_model():
        global buffer, chunk_count, initial_chunk, chunk_wise_final_ensemble_metrics

        chunk_size = INITIAL_CHUNK_SIZE
        model = None    # running ensemble model; set once the initial chunk is accepted

        while(True):

            # enough rows buffered to form a chunk of the desired 'chunk_size'
            if (len(buffer) >= chunk_size):
                # if initial chunk
                if(initial_chunk):
                    try:
                        # train ensemble model only once
                        model = await ensemble(chunk_count+1, initial=True)

                        # CHUNK ACCEPTED BY MODEL, NO ERRORS
                        initial_chunk = False    # initial chunk accepted, hence set to False
                        chunk_count += 1    # increment 'chunk_count'
                        chunk_size = FINAL_CHUNK_SIZE    # new chunk size reset to desired chunk size

                    # chunk rejected by ensemble model trainer
                    except ValueError:
                        # log for class error
                        log("CLASS ERROR\n", timestamp=True)
                        chunk_size *= CHUNK_SIZE_INCREMENT_FACTOR    # multiplicative increase of chunk size

                # not initial chunk
                else:
                    # train ensemble model twice
                    try:
                        # 1. train ensemble model for chunk
                        new_chunk_model = await ensemble(chunk_count+1)

                        # 2. ensemble model of previous chunk('model') with model of new chunk('new_chunk_model')
                        start = time.time()
                        new_ensemble_model = VotingClassifier(estimators=[('New Model', new_chunk_model), ('Old Model', model)], voting='hard')

                        # fitting latest chunk into new model
                        IoT = latest_accepted_chunk
                        # data preparation
                        x = IoT.drop(['label', 'date', 'time', 'type'], axis=1)
                        y = IoT['type']

                        # data split into test and train sets
                        x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)
                        new_ensemble_model.fit(x_train, y_train)
                        end = time.time()

                        # calculate required metrics; only labels that were actually predicted are scored
                        y_pred = new_ensemble_model.predict(x_test)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, average='weighted', labels=np.unique(y_pred))
                        recall = recall_score(y_test, y_pred, average='weighted', labels=np.unique(y_pred))
                        f1_score_value = f1_score(y_test, y_pred, average='weighted', labels=np.unique(y_pred))
                        time_consumed = end-start

                        chunk_wise_final_ensemble_metrics = pd.concat([chunk_wise_final_ensemble_metrics, pd.DataFrame([[f'v{chunk_count}', accuracy, precision, recall, f1_score_value, time_consumed]], columns = ['model_version','accuracy', 'precision', 'recall', 'f1_score_value', 'time_consumed'])])
                        # rewrite 'ensemble.log' with the full metrics table
                        log(chunk_wise_final_ensemble_metrics.to_string(index=False, col_space=15), True, file="ensemble.log", timestamp=False)
                        await update_plot()

                        # SAVING MODEL FOR BAYESIAN MODEL TRAINING
                        filename = './models/ensemble_model_multithreaded.h5'
                        # the context manager flushes and closes the file even if pickling fails
                        with open(filename, 'wb') as model_file:
                            pickle.dump(new_ensemble_model, model_file)

                        model = new_ensemble_model    # set 'model' to 'new_ensemble_model'
                        chunk_count += 1    # increment 'chunk_count'
                        chunk_size = FINAL_CHUNK_SIZE    # new chunk size reset to desired chunk size

                    # chunk rejected by ensemble model trainer
                    except ValueError:
                        # log for class error
                        log("CLASS ERROR\n", timestamp=True)
                        chunk_size *= CHUNK_SIZE_INCREMENT_FACTOR    # multiplicative increase of chunk size

            # Sleep on every pass, not only after a training attempt, so the loop does
            # not spin at full speed while the buffer is still filling. The timeout
            # scales with the number of rows still missing for the next chunk and is
            # clamped so it never goes negative.
            event.wait(max(0, SERVER_DELAY*(chunk_size-len(buffer)*0.95)))

    await train_ensemble_model()

def train_ensemble_model_loop():
    """
    Thread target that runs train() on an event loop owned by the calling thread

    :return: None
    """
    # each thread needs its own event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(train())
    finally:
        # close the loop even when train() raises
        loop.close()

# thread that runs the ensemble trainer; created here, started in a later cell
trainEnsembleModel_thread = threading.Thread(target=train_ensemble_model_loop, daemon = True)

In [29]:
# start receiving rows and training chunks side by side; both threads were created as daemon threads
client_thread.start()
trainEnsembleModel_thread.start()

In [1]:
# chunk_wise_final_ensemble_metrics.to_csv('./chunk_wise_final_ensemble_metrics.csv')
# chunk_wise_6M_ensemble_metrics.to_csv('./chunk_wise_6M_ensemble_metrics.csv')

# fig1.write_html('./fig1.html')
# fig2.write_html('./fig2.html')

NameError: name 'fig1' is not defined

<br>
<br>
<center><b>End of File</b></center>