In [None]:
from tensorflow.keras.models import load_model
from kafka import KafkaConsumer
from joblib import load
import numpy as np
import time

In [None]:
model_lstm = load_model('./models/kdd_model_lstm.keras')

In [None]:
model_gnb = load('./models/kdd_model_gnb.joblib')

In [None]:
def create_sequences(data, seq_length=1):
    xs = []
    dataLen = 40
    for i in range(dataLen - seq_length + 1):
        x = data[i:(i + seq_length)]
        xs.append(x)
    return np.array(xs)

In [None]:
class PredictionModel:
    def __init__(self, model):
        self.model = model
        self.correct = 0
        self.total = 0
        
    def predict_GNB(self, data):
        dataList = data.split(',')
        dataNumeric = np.array(dataList, dtype=float)
        label = dataNumeric[-1]
        features = dataNumeric[:-1]
        prediction = self.model.predict(features.reshape(1, -1))
        self.total += 1        
        print('Time: {} Total predictions: {}'.format(time.time(), self.total))
        return prediction

    def predict_LSTM(self, data):
        dataList = data.split(',')
        dataNumeric = np.array(dataList, dtype=float)
        label = dataNumeric[-1]
        features = dataNumeric[:-1]
        features = features.reshape((1, 1, 40))
        prediction = self.model.predict(features)
        self.total += 1        
        print('Time: {} Total predictions: {}'.format(time.time(), self.total))
        return prediction

In [None]:
class RealTimePredictor:
    def __init__(self, model, topic, brokers):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=brokers,
            auto_offset_reset='earliest',  # Start reading at the earliest message
            value_deserializer=lambda x: x.decode('utf-8')  # Deserialize messages as UTF-8 encoded strings
        )
        self.model = model
        
    def consume_messages_lstm(self):
        for message in self.consumer:
            data = message.value
            prediction = self.model.predict_LSTM(data)
    
    def consume_messages_gnb(self):
        for message in self.consumer:
            data = message.value
            prediction = self.model.predict_GNB(data)

In [None]:
topic = "streamOut25A"
brokers = "localhost:9092"

lstm = PredictionModel(model_lstm)
gnb = PredictionModel(model_gnb)

In [None]:
kafkaConsumerGNB = RealTimePredictor(gnb, topic, brokers)
kafkaConsumerGNB.consume_messages_gnb()

In [None]:
print("GNB took: " )

In [None]:
kafkaConsumerLSTM = RealTimePredictor(lstm, topic, brokers)
kafkaConsumerLSTM.consume_messages_lstm()