In [2]:
pip install influxdb-client

Collecting influxdb-client
  Downloading influxdb_client-1.39.0-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.9/63.9 kB[0m [31m134.0 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hCollecting reactivex>=4.0.4 (from influxdb-client)
  Downloading reactivex-4.0.4-py3-none-any.whl (217 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m217.8/217.8 kB[0m [31m345.5 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Downloading influxdb_client-1.39.0-py3-none-any.whl (743 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m744.0/744.0 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: reactivex, influxdb-client
Successfully installed influxdb-client-1.39.0 reactivex-4.0.4
Note: you may need to restart the kernel to use updated packages.


In [1]:
pip install pyspark

Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m375.6 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.7
Note: you may need to restart the kernel to use updated packages.


## Real Time Data Extraction from InfluxDB

In [11]:
from influxdb_client import InfluxDBClient

# Replace with your InfluxDB details
influxdb_url = 'http://influxdb:8086'
token = 'ERKMHDv-dYn0HaOhHdTqr0eCLNm3kSXJ79aBgHqqAeAYLEDiCQk1zZYj6GBIbAFFLd-Iaeh85P4E5Y0tI5y4Vw=='
org = 'OST'
bucket = 'Epsymolo'

client = InfluxDBClient(url=influxdb_url, token=token, org=org)

query_api = client.query_api()
query = 'from(bucket: "Epsymolo")|> range(start: -1w)|> filter(fn: (r) => r["_field"] == "PowerFlowValue")|> keep(columns: ["_value", "_time", "PowerLineID"])'

# Execute the query
result = query_api.query(org=org, query=query)
for table in result:
    for record in table.records:
        print(record)
        break
    break

FluxRecord() table: 0, {'result': '_result', 'table': 0, '_time': datetime.datetime(2023, 12, 15, 0, 0, tzinfo=tzlocal()), '_value': 360.8533333333333, 'PowerLineID': '0'}


## Loading Data into Spark DataFrame

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("AnomalyDetection").getOrCreate()

# Define the schema based on the structure of your data from InfluxDB
schema = StructType([
    StructField("PowerLineID", IntegerType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("PowerFlowValue", DoubleType(), True)
])

# Initialize an empty list to hold data points
data_for_spark = []

for table in result:
    for record in table.records:
        # Extract the values and time for each record
        power_line_id = int(record['PowerLineID'])  # Cast PowerLineID to int
        timestamp = record['_time']
        power_flow_value = record['_value']
        data_point = (power_line_id, timestamp, power_flow_value)
        data_for_spark.append(data_point)

# Create a Spark DataFrame using the data and the schema
df_real_time = spark.createDataFrame(data_for_spark, schema=schema)
df_real_time.show()

+-----------+-------------------+------------------+
|PowerLineID|          Timestamp|    PowerFlowValue|
+-----------+-------------------+------------------+
|          0|2023-12-15 00:00:00| 360.8533333333333|
|          0|2023-12-15 01:00:00|            386.47|
|          0|2023-12-15 02:00:00|            400.74|
|          0|2023-12-15 03:00:00| 401.0808333333334|
|          0|2023-12-15 04:00:00|419.02416666666664|
|          0|2023-12-15 05:00:00| 434.6133333333333|
|          0|2023-12-15 06:00:00|          387.5525|
|          0|2023-12-15 07:00:00|353.23083333333335|
|          0|2023-12-15 08:00:00| 285.8616666666666|
|          0|2023-12-15 09:00:00|272.09166666666664|
|          0|2023-12-15 10:00:00|219.03166666666667|
|          0|2023-12-15 11:00:00|          165.2675|
|          0|2023-12-15 12:00:00|145.92166666666665|
|          0|2023-12-15 13:00:00|            210.71|
|          0|2023-12-15 14:00:00| 267.6741666666667|
|          0|2023-12-15 15:00:00|263.228333333

## Preprocess the Real Time data

## Load the trained model

In [21]:
import sklearn
print(sklearn.__version__)

1.3.1


In [23]:
import joblib

# Load the model
model = joblib.load('model_1.joblib')

In [24]:
# Convert the Spark DataFrame to a Pandas DataFrame for scikit-learn
features_for_prediction = ['PowerLineID','PowerFlowValue', 'PowerChange']  # Replace with the actual features used during training
real_time_data_for_prediction = df_real_time.select(*features_for_prediction).toPandas()
# Make predictions with the scikit-learn model
predictions = model.predict(real_time_data_for_prediction)

In [27]:
# If you want to add the predictions back to the Spark DataFrame:
from pyspark.sql.types import IntegerType

# Add a new column for predictions
df_real_time = df_real_time.withColumn("Predicted_IsSignificantChange", lit(None).cast(IntegerType()))

# Convert to Pandas DataFrame to merge the predictions easily (assuming the data is small enough to fit into memory)
df_real_time_pandas = df_real_time.toPandas()

# Attach the predictions
df_real_time_pandas['Predicted_IsSignificantChange'] = predictions

# If you need to convert it back to a PySpark DataFrame
df_real_time_with_predictions = spark.createDataFrame(df_real_time_pandas)

In [29]:
df_real_time_with_predictions.show(40)

+-----------+-------------------+------------------+------------------+-------------------+-----------------------------+
|PowerLineID|          Timestamp|    PowerFlowValue|PrevPowerFlowValue|        PowerChange|Predicted_IsSignificantChange|
+-----------+-------------------+------------------+------------------+-------------------+-----------------------------+
|          0|2023-12-15 02:00:00|            400.74|            386.47| 14.269999999999982|                            1|
|          0|2023-12-15 03:00:00| 401.0808333333334|            400.74| 0.3408333333333644|                            0|
|          0|2023-12-15 04:00:00|419.02416666666664| 401.0808333333334|  17.94333333333327|                            1|
|          0|2023-12-15 05:00:00| 434.6133333333333|419.02416666666664| 15.589166666666642|                            1|
|          0|2023-12-15 06:00:00|          387.5525| 434.6133333333333| -47.06083333333328|                            1|
|          0|2023-12-15 

## Send Results to Results Topic

In [30]:
# Drop the 'PrevPowerFlowValue' column
df_real_time_pandas = df_real_time_pandas.drop(columns=['PrevPowerFlowValue'])

In [33]:
df_real_time_pandas.shape

(34652, 5)

In [32]:
from confluent_kafka import Producer
import json
import pandas as pd
import numpy as np

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for message: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

def send_data_to_kafka(bootstrap_servers, topic, df):
    producer = Producer({'bootstrap.servers': bootstrap_servers})
    batch_size = 10000  # Define the batch size

    
    # Convert Timestamp to appropriate format if it's a datetime object
    if isinstance(df.iloc[0]['Timestamp'], pd.Timestamp):
        df['Timestamp'] = df['Timestamp'].apply(lambda x: int(x.timestamp()))

    # Split the DataFrame into chunks
    chunks = np.array_split(df, range(batch_size, len(df), batch_size))
    
    for chunk in chunks:
        print(f"Sending batch of {len(chunk)} rows")

        for index, row in chunk.iterrows():
            try:
                # Ensure 'PowerLineID' is a string and convert Timestamp
                row['PowerLineID'] = str(row['PowerLineID'])

                payload = json.dumps(row.to_dict())
                producer.produce(topic=topic, value=payload.encode('utf-8'), callback=delivery_report)
                producer.poll(0)

            except Exception as e:
                print(f"An error occurred: {e}")

        producer.flush()
        print(f"\nFinished sending batch of {len(chunk)} rows to Kafka topic {topic}")

if __name__ == "__main__":
    BOOTSTRAP_SERVER = 'kafka:9092'  # Replace with your actual bootstrap server
    TOPIC = 'Results'  # Replace with your actual topic

    # Assuming df_real_time_with_predictions is the DataFrame with your data
    send_data_to_kafka(BOOTSTRAP_SERVER, TOPIC, df_real_time_pandas)


Sending batch of 10000 rows
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message delivered to Results [0]
Message deliver

In [37]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB configuration
influxdb_url = 'http://influxdb:8086'  # Replace with your InfluxDB URL
token = 'ERKMHDv-dYn0HaOhHdTqr0eCLNm3kSXJ79aBgHqqAeAYLEDiCQk1zZYj6GBIbAFFLd-Iaeh85P4E5Y0tI5y4Vw=='  # Replace with your InfluxDB token
org = 'OST'  # Replace with your InfluxDB org
bucket = 'Epsymolo'  # Replace with your InfluxDB bucket

# Kafka configuration
kafka_conf = {
    'bootstrap.servers': 'kafka:9092',  # Replace with your Kafka bootstrap servers
    'group.id': 'my_group',  # Replace with your consumer group
    'auto.offset.reset': 'earliest',
}

# Create Kafka consumer
consumer = Consumer(kafka_conf)
# Subscribe to topic
consumer.subscribe(['Results'])  # Replace with your Kafka topic

# Create InfluxDB client
client = InfluxDBClient(url=influxdb_url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)

try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Proper message
            data_point = json.loads(msg.value().decode('utf-8'))
            #print(f"Received message: {data_point}")

            # Convert timestamp to nanoseconds to match InfluxDB precision
            data_point['Timestamp'] = int(float(data_point['Timestamp']) * 1e9)

            # Create a Point structure for InfluxDB
            point = Point("Results").tag("PowerLineID", data_point["PowerLineID"]) \
                                             .field("PowerFlowValue", data_point["PowerFlowValue"]) \
                                             .field("PowerChange", data_point["PowerChange"]) \
                                             .field("Predicted_IsSignificantChange", data_point["Predicted_IsSignificantChange"]) \
                                             .time(data_point['Timestamp'], WritePrecision.NS)

            print(f"Writing the following point to InfluxDB: {point.to_line_protocol()}")
            write_api.write(bucket=bucket, org=org, record=point)

except KeyboardInterrupt:
    pass
except Exception as e:
    print(f"Exception in consumption: {e}")
finally:
    # Close down consumer and InfluxDB client to commit final offsets.
    consumer.close()
    client.close()

Received message: {'PowerLineID': '0.0', 'Timestamp': 1702616400.0, 'PowerFlowValue': 434.6133333333333, 'PowerChange': 15.589166666666642, 'Predicted_IsSignificantChange': 1.0}
Writing the following point to InfluxDB: Results,PowerLineID=0.0 PowerChange=15.589166666666642,PowerFlowValue=434.6133333333333,Predicted_IsSignificantChange=1 1702616400000000000
Received message: {'PowerLineID': '0.0', 'Timestamp': 1702620000.0, 'PowerFlowValue': 387.5525, 'PowerChange': -47.06083333333328, 'Predicted_IsSignificantChange': 1.0}
Writing the following point to InfluxDB: Results,PowerLineID=0.0 PowerChange=-47.06083333333328,PowerFlowValue=387.5525,Predicted_IsSignificantChange=1 1702620000000000000
Received message: {'PowerLineID': '0.0', 'Timestamp': 1702623600.0, 'PowerFlowValue': 353.23083333333335, 'PowerChange': -34.32166666666666, 'Predicted_IsSignificantChange': 1.0}
Writing the following point to InfluxDB: Results,PowerLineID=0.0 PowerChange=-34.32166666666666,PowerFlowValue=353.230833