In [1]:
import json
import os
import sys
import time
import warnings
from datetime import datetime
from json import loads

import numpy as np  # Import NumPy for array manipulation
from confluent_kafka import Consumer, KafkaError
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from kafka import KafkaConsumer
from tabulate import tabulate

# Suppress sklearn warning about missing feature names
warnings.filterwarnings("ignore", message="X does not have valid feature names, but IsolationForest")
# NOTE(review): no 5-second read interval is implemented; the streaming cell processes each message as soon as the consumer yields it.

In [2]:
import joblib

# Deserialize the trained anomaly-detection model (an IsolationForest, per the
# file name) used by the streaming cell. joblib files are pickles, so loading
# one can execute arbitrary code: only load model files from a trusted source.
MODEL_PATH = 'isolation_forest_model.joblib'
loaded_model = joblib.load(MODEL_PATH)

In [None]:
# Connection settings. Every value can be overridden through an environment
# variable; the defaults are the values this notebook previously hard-coded.
# Secrets (InfluxDB password / token) are never stored in the notebook.

# Kafka broker address ("host:port", or a comma-separated list of them)
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093")

# Kafka topic to consume messages from
kafka_topic = os.environ.get("KAFKA_TOPIC", "Topic_test")

# Consumer group ID -- intentionally left unset. Without a group id Kafka
# offsets are not committed, so every run re-reads the topic from 'earliest'.
# group_id = 'group1'

# InfluxDB connection details
influxdb_host = os.environ.get("INFLUXDB_HOST", "localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT", "8086"))
# Username/password are optional here: the InfluxDB client authenticates with the token.
influxdb_username = os.environ.get("INFLUXDB_USERNAME")
influxdb_password = os.environ.get("INFLUXDB_PASSWORD")
bucket = os.environ.get("INFLUXDB_BUCKET", "BloodPressure")
organization = os.environ.get("INFLUXDB_ORG", "OstProject")
# The API token must come from the environment. A token was previously
# committed in this notebook; it should be rotated.
influxdb_token = os.environ.get("INFLUXDB_TOKEN", "")
if not influxdb_token:
    warnings.warn("INFLUXDB_TOKEN is not set; writes to InfluxDB will be rejected.")

# Create Kafka consumer configuration (confluent-kafka key style).
# NOTE(review): this dict does not appear to be used -- the streaming cell uses
# kafka-python's KafkaConsumer. confluent_kafka.Consumer would also require a
# 'group.id' entry.
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    # 'group.id': group_id,
    'auto.offset.reset': 'earliest'
}
 
# Streaming loop: score every Kafka message with the model loaded earlier and
# write the result to InfluxDB. Runs until the cell is interrupted
# (Kernel -> Interrupt); the Kafka consumer is closed on exit either way.
#
# NOTE(review): each message value is assumed to be a JSON object with numeric
# "SBP", "DBP", "HR", "RR" and "CO" entries (inferred from the keys read below;
# confirm against the producer). Only SBP and DBP are fed to the model.
REQUIRED_FIELDS = ("SBP", "DBP", "HR", "RR", "CO")

with InfluxDBClient(url=f"http://{influxdb_host}:{influxdb_port}", token=influxdb_token, org=organization) as client:
    # A single synchronous write API is reused for every point.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    consumer = KafkaConsumer(
        kafka_topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        api_version=(0, 10),
        # NOTE(review): no group_id is passed, and kafka-python disables offset
        # commits without one, so this flag has no effect and each run replays
        # the topic from the earliest offset.
        enable_auto_commit=True,
        value_deserializer=lambda x: loads(x.decode('utf-8')),
    )
    print("Connection established")
    i = 0
    try:
        for message in consumer:
            print(f"{message.value}")
            record = message.value

            # Skip (rather than crash the whole stream on) messages that are not
            # objects or lack a required reading; np.array(None) would otherwise
            # blow up in the model call / float() conversion below.
            if not isinstance(record, dict):
                print(f"Skipping message that is not a JSON object: {record!r}")
                continue
            missing = [name for name in REQUIRED_FIELDS if record.get(name) is None]
            if missing:
                print(f"Skipping message with missing fields {missing}: {record!r}")
                continue

            # Reshape each reading into a column vector (one row per sample).
            sbp = np.array(record['SBP']).reshape(-1, 1)
            dbp = np.array(record['DBP']).reshape(-1, 1)
            hr = np.array(record['HR']).reshape(-1, 1)
            rr = np.array(record['RR']).reshape(-1, 1)
            co = np.array(record['CO']).reshape(-1, 1)

            # The model is fed (SBP, DBP) columns; build that matrix once and
            # use it for both the score and the prediction.
            features = np.concatenate((sbp, dbp), axis=1)
            score = loaded_model.decision_function(features)[0]
            anomaly_value = loaded_model.predict(features)[0]

            # IsolationForest.predict returns -1 for outliers and 1 for inliers,
            # so -1 is the anomalous case for both the flag and the label.
            # (Label strings are unchanged so existing queries keep matching.)
            detection = 1 if anomaly_value == -1 else 0
            label = "Anormal" if detection else "Normal"
            # Constant 1 per point -- presumably so a sum() in InfluxDB yields a record count.
            row_count = 1

            # Create InfluxDB point
            point = (
                Point("IsolationForestv4")
                .field("Detection", int(detection))
                .field("Label", label)
                .field("SBP", float(sbp[0][0]))
                .field("DBP", float(dbp[0][0]))
                .field("HR", float(hr[0][0]))
                .field("RR", float(rr[0][0]))
                .field("CO", float(co[0][0]))
                .field("All records", row_count)
                .field("AnomalyScore", float(score))
            )
            print(point)

            # Print the table (scalar values, not the 2-D arrays)
            table = [["SBP", "DBP", "Anomaly Score", "Anomaly"],
                     [float(sbp[0][0]), float(dbp[0][0]), score, detection]]
            print(tabulate(table, headers="firstrow", tablefmt="grid"))

            # Write the point to InfluxDB
            write_api.write(bucket=bucket, org=organization, record=point)

            i += 1
            print("Message sent to influxDB", i)
    finally:
        consumer.close()
        