### Client Setup

Accepts streaming data from the server, then trains an ensemble model built from the following ML models:
- Random Forest Classifier
- Logistic Regression
- Linear Discriminant Analysis
- K-Neighbors Classifier
- Classification And Regression Trees
- Support Vector Classifier
- Gaussian Naive Bayes

Each of these tasks runs in its own thread, so they do not interfere with each other's work. This notebook shows how we can use a windowing technique and multithreading to process and analyze big data. <br>
<br>

In [1]:
# --- data chunk parameters ---
# Rows required before the very first ensemble model is trained.
INITIAL_CHUNK_SIZE = 50
# Rows per chunk once the first chunk has been accepted.
FINAL_CHUNK_SIZE = 20
# Multiplier applied to the current chunk size whenever training a chunk fails.
CHUNK_SIZE_INCREMENT_FACTOR = 2

In [2]:
# Importing required modules

# data streaming
import websockets
import asyncio
import threading

# data processing
from datetime import datetime
import numpy as np
import pandas as pd

# ensemble modelling
import pickle
from sklearn.model_selection import train_test_split
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score, precision_score, recall_score
# from time import sleep
# from tensorflow import keras
# from keras.models import load_model
# from keras.wrappers.scikit_learn import KerasClassifier
import time

In [3]:
# Loading prebuilt model structures stored as '[MODEL].h5'
# NOTE: despite the '.h5' extension, the active loaders below read these files
# with pickle (not HDF5/Keras). Unpickling can execute arbitrary code, so only
# load model files that come from a trusted source.

# # using keras.model.load_model and setting estimator type to 'classifier'
# model1 = load_model('../models/h5s/LSTM.h5')
# model1 = KerasClassifier(build_fn=model1)
# model1._estimator_type = "classifier"
# model2 = load_model('../models/h5s/GRU.h5')
# model2 = KerasClassifier(build_fn=model2)
# model2._estimator_type = "classifier"

# # using pickle.load
# model1 = pickle.load(open('../models/h5s/LSTM.h5', 'rb'))
# model1._estimator_type = "classifier"
# model2 = pickle.load(open('../models/h5s/GRU.h5', 'rb'))
# model2._estimator_type = "classifier"


def _load_pickled_model(path):
    """
    Unpickle and return the object stored at ``path``.

    The file is opened in a ``with`` block so the handle is always closed;
    the previous ``pickle.load(open(...))`` form left it to the garbage collector.

    :param path: path of the pickled model file
    :return: the unpickled object
    """
    with open(path, 'rb') as model_file:
        return pickle.load(model_file)


model3 = _load_pickled_model('../models/h5s/random-forest.h5')
model4 = _load_pickled_model('../models/h5s/logistic-regression.h5')
model5 = _load_pickled_model('../models/h5s/linear-discriminant-analysis.h5')
model6 = _load_pickled_model('../models/h5s/kNN.h5')
model7 = _load_pickled_model('../models/h5s/CART.h5')
model8 = _load_pickled_model('../models/h5s/support-vector-machine.h5')
model9 = _load_pickled_model('../models/h5s/naive-bayes.h5')

In [4]:
# Field names of each streamed comma-separated record. They are assigned
# positionally, so the order must match the server's field order.
columns = [
    "date-time",
    "fridge_temperature",
    "fridge_temp_condition",
    "door_state",
    "door_sphone_signal",
    "gps_latitude",
    "gps_longitude",
    "modbus_FC1_Read_Input_Register",
    "modbus_FC2_Read_Discrete_Value",
    "modbus_FC3_Read_Holding_Register",
    "modbus_FC4_Read_Coil",
    "ml_motion_status",
    "ml_light_status",
    "thermostat_current_temperature",
    "thermostat_status",
    "weather_temperature",
    "weather_pressure",
    "weather_humidity",
    "weather_type",
]

# Shared state between the streaming thread and the training thread.
buffer = pd.DataFrame()    # rows received but not yet consumed by a training chunk
chunk_count = 0    # number of chunks accepted so far
initial_chunk = True    # True until the first chunk has been accepted
total_rows_received = 0    # running count of rows received from the server
event = threading.Event()    # waited on (with a timeout) by the training loop between buffer checks

In [5]:
# Function to log specified content in specified file

def log(content="", new=False, file='stream.log', timestamp=False, print_line=0):
    """
    Write ``content`` to the log file ``./<file>``.

    :param content: text to write; written verbatim unless ``timestamp`` is set
    :param new: if True the file is truncated before writing, else the content is appended. Default is False
    :param file: name of the log file, resolved relative to the current working directory
    :param timestamp: if True, write a newline, the current ``datetime.now()``, " ~ " and then the content
    :param print_line: if non-zero, afterwards write a newline, a line of that many '-' and a blank line
    :return: None
    """
    # Mode 'w' truncates, which is equivalent to the former truncate-then-append
    # sequence but opens the file only once. The context manager guarantees the
    # handle is closed even if a write fails.
    with open(f'./{file}', 'w' if new else 'a') as log_file:
        if timestamp:
            log_file.write(f"\n{datetime.now()} ~ {content}")
        else:
            log_file.write(content)

        if print_line:
            separator = "-" * print_line
            log_file.write(f"\n{separator}\n\n")


In [6]:
async def ensemble(chunk_count, initial=False):
    """
    Train and evaluate an ensemble model on the rows currently in ``buffer``.

    A snapshot of the global ``buffer`` is split 80/20 into train/test sets.
    A hard-voting ``VotingClassifier`` over the pre-loaded models
    (model3 ... model9) is then fitted on the training set.

    If that fit succeeds:

    * the consumed rows are removed from ``buffer``;
    * the snapshot is stored in the global ``latest_accepted_chunk``;
    * accuracy, precision, recall, F1 and train/test times of every base model
      and of the voting model are appended to ``ensemble.log``.

    :param chunk_count: number of the chunk being trained; used in log messages
    :param initial: True for the first chunk, in which case ``ensemble.log`` is
        overwritten instead of appended to
    :return: the fitted voting ensemble
    :raises ValueError: propagated from ``train_test_split``/``fit`` (e.g. too
        few rows or classes in the chunk). When raised by the voting fit,
        ``buffer`` is left untouched so the caller can retry with a larger chunk.
    """
    global buffer, latest_accepted_chunk

    # Snapshot the shared buffer: the streaming client keeps appending to it.
    IoT = buffer.copy()
    current_row_count = len(IoT)

    # Every column except the label and the timestamp is used as a feature.
    x = IoT.drop(['weather_type', 'date-time'], axis=1)
    y = IoT['weather_type']
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

    # (Keras LSTM/GRU models were tried as additional voters but are disabled.)
    base_models = (model3, model4, model5, model6, model7, model8, model9)
    voting = VotingClassifier(
        estimators=[('RF', model3), ('LR', model4), ('LDA', model5), ('KNN', model6),
                    ('CART', model7), ('SVM', model8), ('NB', model9)],
        voting='hard',
    )
    # This fit doubles as the acceptance check for the chunk. If it raises,
    # nothing below runs and the buffer keeps its rows.
    voting.fit(x_train, y_train)

    latest_accepted_chunk = IoT
    log(f"Chunk-{chunk_count} created with {current_row_count} records.", timestamp=True, print_line=50)

    # Remove exactly the rows that were consumed. The old slice started at
    # current_row_count - 1, so whenever new rows had arrived the last consumed
    # row was fed into the next chunk again (an off-by-one). An exactly-consumed
    # buffer was simply reset.
    # NOTE(review): the streaming client reassigns `buffer` via pd.concat without
    # a lock. A row appended between the copy above and this assignment can
    # therefore still be lost (or a consumed row can reappear). A lock shared
    # by both threads would close that window.
    if len(buffer) == current_row_count:
        buffer = pd.DataFrame(columns=columns)
    else:
        buffer = buffer.iloc[current_row_count:]

    if not initial:
        log("", file="ensemble.log", print_line=20)
    # `initial` is passed as `new`, so the first model version starts a fresh ensemble.log.
    log(f"Model v{chunk_count}\n", initial, file="ensemble.log", timestamp=True)

    # Refit and score each base model and the voting model individually so
    # their metrics and timings can be compared.
    for clf in (*base_models, voting):
        name = clf.__class__.__name__

        train_start_time = time.time()
        clf.fit(x_train, y_train)
        train_end_time = time.time()
        test_start_time = time.time()
        y_pred = clf.predict(x_test)
        test_end_time = time.time()

        # NOTE(review): restricting the scores to the labels that were actually
        # predicted ignores classes the model never predicts. This can inflate
        # weighted precision/recall/F1; it is kept as-is so logged values stay
        # comparable with earlier runs.
        predicted_labels = np.unique(y_pred)
        accuracy = accuracy_score(y_test, y_pred)

        log(f"{name}: {accuracy}\n", file="ensemble.log")
        log(f"{name} accuracy : {accuracy}\n", file="ensemble.log")
        log(f"{name} precision : {precision_score(y_test, y_pred, average='weighted', labels=predicted_labels)}\n", file="ensemble.log")
        log(f"{name} recall : {recall_score(y_test, y_pred, average='weighted', labels=predicted_labels)}\n", file="ensemble.log")
        log(f"{name} f1_score : {f1_score(y_test, y_pred, average='weighted', labels=predicted_labels)}\n", file="ensemble.log")
        log(f"{name} train time : {train_end_time - train_start_time}, sec\n", file="ensemble.log")
        log(f"{name} test time : {test_end_time - test_start_time}, sec\n", file="ensemble.log")
        log("\n\n", file="ensemble.log")

    return voting

In [7]:
# # define data chunk parameters
# INITIAL_CHUNK_SIZE = 50   # size of chunk at start of data streaming
# FINAL_CHUNK_SIZE = 20    # chunk size during data streaming
# CHUNK_SIZE_INCREMENT_FACTOR = 2    # chunk size increment factor when error occurs while training ensemble model

# async def client():
#     """
#     Client function for streaming and processing data simultaneously
#     """

#     uri = "ws://localhost:8765"    # websocket endpoint
#     temp_row_count = 0    # realtime chunk size
#     chunk_size = INITIAL_CHUNK_SIZE
#     chunk_count = 0    # realtime chunk count
#     initial_chunk = True    # initial chunk flag 
#     model = 0    # init model
#     log("", True)    # init 'stream.log' file
#     async with websockets.connect(uri, ping_interval=None) as websocket:
#         df = pd.DataFrame()    # init dataframe for storing the chunk

#         # infinite loop for receiving streamed data
#         while(True):
#             # wait to receive a record of data
#             row = await websocket.recv()
#             log(f"{temp_row_count} ")    # log row count
#             data_list = row.split(",")    # split the row(comma separated format) string into a list 

#             # if starting a new chunk, reinitialize 'df' to empty dataframe
#             if(temp_row_count == 0):
#                 df = pd.DataFrame()
#                 # add row into the dataframe
#                 df = pd.DataFrame([data_list], columns = ["date-time","fridge_temperature","fridge_temp_condition","fridge_label","fridge_type","door_state","door_sphone_signal","door_label","gps_type","ml_motion_status_x","ml_light_status_x","ml_label_x","ml_type_x","gps_latitude","gps_longitude","gps_label","gps_type","modbus_FC1_Read_Input_Register","modbus_FC2_Read_Discrete_Value","modbus_FC3_Read_Holding_Register","modbus_FC4_Read_Coil","modbus_label","modbus_type","ml_motion_status_y","ml_light_status_y","ml_label_y","ml_type_y","thermostat_current_temperature","thermostat_status","thermostat_label","thermostat_type","weather_temperature","weather_pressure","weather_humidity","weather_label","weather_type"])
            
#             #  else old chunk
#             else:
#                 # append to 'df'
#                 df = pd.concat([df, pd.DataFrame([data_list], columns = ["date-time","fridge_temperature","fridge_temp_condition","fridge_label","fridge_type","door_state","door_sphone_signal","door_label","gps_type","ml_motion_status_x","ml_light_status_x","ml_label_x","ml_type_x","gps_latitude","gps_longitude","gps_label","gps_type","modbus_FC1_Read_Input_Register","modbus_FC2_Read_Discrete_Value","modbus_FC3_Read_Holding_Register","modbus_FC4_Read_Coil","modbus_label","modbus_type","ml_motion_status_y","ml_light_status_y","ml_label_y","ml_type_y","thermostat_current_temperature","thermostat_status","thermostat_label","thermostat_type","weather_temperature","weather_pressure","weather_humidity","weather_label","weather_type"])])
#             temp_row_count += 1    # increment chunk size
            
#             # 'tem_row_count' is equal to desired 'chunk_size'
#             if (temp_row_count == chunk_size):
#                 # if initial chunk
#                 if(initial_chunk):
#                     try:
#                         # train ensemble model only once
#                         model = await ensemble(df, chunk_count+1, initial=True)
#                         # CHUNK ACCEPTED BY MODEL, NO ERRORS
#                         initial_chunk = False    # initial chunk accepted, hence set to False
#                         chunk_count += 1    # increment 'chunk_count'

#                         # log chunk created
#                         log(f"Chunk-{chunk_count} created with {temp_row_count} records.", timestamp=True, print_line=50)

#                         chunk_size = FINAL_CHUNK_SIZE    # new chunk size reset to desired chunk size
#                         temp_row_count = 0    # reset 'temp_row_count' for new chunk

#                     # chunk rejected by ensemble model trainer, hence raise error
#                     except ValueError:
#                         # print(sys.exc_info())    # prints error statement/message

#                         # log for class error
#                         log("CLASS ERROR\n", timestamp=True)
#                         chunk_size *= CHUNK_SIZE_INCREMENT_FACTOR    # multiplicative increase of chunk size
                
#                 # not initial chunk
#                 else:
#                     # train ensemble model twice
#                     try:
#                         # 1. train ensemble model for chunk
#                         new_chunk_model = await ensemble(df, chunk_count+1)
#                         # 2. ensemble model of previous chunk('model') with model of new chunk('new_chunk_model)
#                         new_ensemble_model = VotingClassifier(estimators=[('New Model', new_chunk_model), ('Old Model', model)], voting='hard')
                        
#                         # ---------------------------------------------------
#                         # SAVING MODEL FOR BAYESIAN MODEL TRAINING
#                         # filename = './models/ensemble_model.h5'
#                         # pickle.dump(new_ensemble_model, open(filename, 'wb'))
#                         # ---------------------------------------------------

#                         model = new_ensemble_model    # set 'model' to new_ensemble_model'
#                         chunk_count += 1    # increment 'chunk_count'

#                         # log chunk created
#                         log(f"Chunk-{chunk_count} created with {temp_row_count} records.", timestamp=True, print_line=50)
#                         chunk_size = FINAL_CHUNK_SIZE    # new chunk size reset to desired chunk size
#                         temp_row_count = 0    # reset 'temp_row_count' for new chunk

#                     # chunk rejected by ensemble model trainer, hence raise error
#                     except ValueError:
#                         # print(sys.exc_info())    # prints error statement/message

#                         # log for class error
#                         log("CLASS ERROR\n", timestamp=True)
#                         chunk_size *= CHUNK_SIZE_INCREMENT_FACTOR    # multiplicative increase of chunk size

#             # send acknowledgement for receiving a row successfully
#             await websocket.send("1")

# await client()

In [8]:
# Start every run with an empty 'stream.log'.
log("", new=True)


async def get_data():
    """
    Receive rows from the websocket server and append them to ``buffer`` forever.

    Each message is treated as one comma-separated record. It is added to the
    global ``buffer`` as a single DataFrame row using the module-level
    ``columns`` and then acknowledged by sending "1" back to the server.
    """
    global buffer, total_rows_received

    endpoint = "ws://localhost:8765"    # websocket endpoint
    async with websockets.connect(endpoint, ping_interval=None) as connection:
        while True:
            message = await connection.recv()
            # Log the running row counter before this row is counted.
            log(f"{total_rows_received} ")
            fields = message.split(",")
            total_rows_received += 1
            buffer = pd.concat([buffer, pd.DataFrame([fields], columns=columns)])
            # Acknowledge the row so the server sends the next one.
            await connection.send("1")

def client_loop():
    """
    Thread target: run ``get_data()`` on a fresh event loop owned by this thread.

    The loop is closed in a ``finally`` block, so it is released even when
    ``get_data()`` raises (e.g. the websocket connection fails). Previously the
    ``loop.close()`` call was skipped in that case.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(get_data())
    finally:
        loop.close()


# daemon=True: the interpreter does not wait for this thread when exiting.
client_thread = threading.Thread(target=client_loop, daemon=True)

In [9]:
def _evaluate_final_ensemble(ensemble_model, chunk):
    """
    Fit ``ensemble_model`` on an 80/20 split of ``chunk`` and append its metrics to final_ensemble.log.

    :param ensemble_model: the combined (new + old) VotingClassifier to fit
    :param chunk: DataFrame with 'date-time' and 'weather_type' columns plus the features
    :raises ValueError: propagated from ``train_test_split``/``fit``
    """
    x = chunk.drop(['weather_type', 'date-time'], axis=1)
    y = chunk['weather_type']
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

    train_start_time = time.time()
    ensemble_model.fit(x_train, y_train)
    train_end_time = time.time()
    test_start_time = time.time()
    y_pred = ensemble_model.predict(x_test)
    test_end_time = time.time()

    # NOTE(review): scores are restricted to the predicted labels, which can
    # inflate weighted precision/recall/F1; kept for comparability of the logs.
    predicted_labels = np.unique(y_pred)
    log(f"Accuracy : {accuracy_score(y_test, y_pred)}\n", file="final_ensemble.log")
    log(f"Precision : {precision_score(y_test, y_pred, average='weighted', labels=predicted_labels)}\n", file="final_ensemble.log")
    log(f"Recall : {recall_score(y_test, y_pred, average='weighted', labels=predicted_labels)}\n", file="final_ensemble.log")
    log(f"F1 Score : {f1_score(y_test, y_pred, average='weighted', labels=predicted_labels)}\n", file="final_ensemble.log")
    log(f"Train time : {train_end_time - train_start_time}, sec\n", file="final_ensemble.log")
    log(f"Test time : {test_end_time - test_start_time}, sec\n", file="final_ensemble.log")
    log("\n\n", file="final_ensemble.log")


def _save_model(model_to_save, filename='./models/ensemble_model_multithreaded.h5'):
    """
    Pickle ``model_to_save`` to ``filename`` for later Bayesian model training.

    The file is written inside a ``with`` block so it is flushed and closed
    deterministically. Previously the handle from ``open(...)`` was never
    closed, so the pickle could be left incomplete on disk.
    """
    with open(filename, 'wb') as model_file:
        pickle.dump(model_to_save, model_file)


async def train(poll_floor_seconds=0.05):
    """
    Continuously train ensemble models on chunks of the shared ``buffer``.

    Runs forever. Whenever at least ``chunk_size`` rows are buffered it calls
    ``ensemble()`` on them:

    * first chunk: the returned voting model becomes the current model;
    * later chunks: the new chunk model and the previous model are combined in
      a hard-voting ``VotingClassifier``, which is fitted and evaluated on the
      latest accepted chunk, logged to final_ensemble.log and pickled.

    A ``ValueError`` during training is logged as "CLASS ERROR", and the chunk
    size is multiplied by ``CHUNK_SIZE_INCREMENT_FACTOR`` before the next
    attempt. After a success the chunk size is reset to ``FINAL_CHUNK_SIZE``.

    :param poll_floor_seconds: minimum delay between buffer checks, so the loop
        never busy-spins while waiting for rows (default 0.05 s)
    """
    global chunk_count, initial_chunk

    chunk_size = INITIAL_CHUNK_SIZE
    model = 0    # previous ensemble; replaced once the initial chunk is accepted

    while True:
        if len(buffer) >= chunk_size:
            if initial_chunk:
                try:
                    model = await ensemble(chunk_count + 1, initial=True)
                except ValueError:
                    # Chunk rejected by the trainer: retry later with more rows.
                    log("CLASS ERROR\n", timestamp=True)
                    chunk_size *= CHUNK_SIZE_INCREMENT_FACTOR
                else:
                    initial_chunk = False
                    chunk_count += 1
                    chunk_size = FINAL_CHUNK_SIZE
            else:
                try:
                    # 1. Train an ensemble model for the new chunk.
                    new_chunk_model = await ensemble(chunk_count + 1)
                    # 2. Combine it with the previous model.
                    # NOTE(review): 'Old Model' is itself the previous combined
                    # ensemble, and VotingClassifier.fit refits clones of every
                    # estimator. Nesting depth and refit cost therefore grow
                    # with each accepted chunk.
                    new_ensemble_model = VotingClassifier(
                        estimators=[('New Model', new_chunk_model), ('Old Model', model)],
                        voting='hard',
                    )
                    _evaluate_final_ensemble(new_ensemble_model, latest_accepted_chunk)
                    _save_model(new_ensemble_model)
                except ValueError:
                    # Chunk rejected by the trainer: retry later with more rows.
                    log("CLASS ERROR\n", timestamp=True)
                    chunk_size *= CHUNK_SIZE_INCREMENT_FACTOR
                else:
                    model = new_ensemble_model
                    chunk_count += 1
                    chunk_size = FINAL_CHUNK_SIZE

        # Wait on every iteration. Previously this wait ran only after a training
        # attempt, so while the buffer was short the loop spun without sleeping,
        # burning CPU and contending for the GIL with the streaming thread.
        # The delay is roughly 0.1 s per still-missing row, but never below
        # `poll_floor_seconds`.
        # `event` appears never to be set in this notebook, so this acts as a sleep.
        event.wait(max(0.1 * (chunk_size - len(buffer) * 0.95), poll_floor_seconds))

def train_ensemble_model_loop():
    """
    Thread target: run ``train()`` on a fresh event loop owned by this thread.

    The loop is closed in a ``finally`` block, so it is released even when
    ``train()`` raises. Previously the ``loop.close()`` call was skipped in
    that case.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(train())
    finally:
        loop.close()


# daemon=True: the interpreter does not wait for this thread when exiting.
trainEnsembleModel_thread = threading.Thread(target=train_ensemble_model_loop, daemon=True)

In [10]:
# Start both workers. The client thread appends streamed rows to `buffer`;
# the training thread consumes them in chunks.
client_thread.start()
trainEnsembleModel_thread.start()

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

<br>
<br>
<center><b>End of File</b></center>