In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so that
# files stored on Drive can be read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install pyspark



In [None]:
import pyspark
from pyspark.sql import SparkSession
# Build the Spark session for this notebook (or reuse one that is already running).
# Driver/executor memory and executor cores are passed as session configuration.
spark = (
    SparkSession.builder
    .appName("Pricing and Load data")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

In [None]:
# Read every CSV file in the shared combined_data folder into one DataFrame,
# using the first line of each file as the column header.
combined_data = spark.read.csv(
    "/content/drive/Shareddrives/DATS6450-Project4/combined_data/*.csv",
    header=True,
)

In [None]:
from pyspark.sql.functions import month, to_timestamp

# Parse the raw timestamp strings so the month can be extracted below.
combined_data = combined_data.withColumn(
    "timestamp", to_timestamp("timestamp", "MM/dd/yyyy HH:mm:ss")
)

# Rows from March, June, September and December form the streaming split;
# rows from every other month form the training split.
in_streaming_month = month("timestamp").isin([3, 6, 9, 12])
streaming_data = combined_data.where(in_streaming_month)
training_data = combined_data.where(~in_streaming_month)

print("Streaming Data Sample")
streaming_data.show(5)

print("Training Data Sample")
training_data.show(5)


Streaming Data Sample
+-------------------+------+-----+-----+------+----------+---------+----+
|          timestamp|  Name| PTID| LBMP|Losses|Congestion|Time Zone|Load|
+-------------------+------+-----+-----+------+----------+---------+----+
|2001-06-01 00:00:00|CENTRL|61754|22.62| -0.41|      0.00|      EDT|1498|
|2001-06-01 00:00:00|GENESE|61753|21.99| -1.04|      0.00|      EDT| 841|
|2001-06-01 00:00:00|MHK VL|61756|23.27|  0.24|      0.00|      EDT| 648|
|2001-06-01 00:00:00| NORTH|61755|23.01| -0.02|      0.00|      EDT| 661|
|2001-06-01 00:00:53|CAPITL|61757|16.44|  0.49|      0.00|      EDT| 965|
+-------------------+------+-----+-----+------+----------+---------+----+
only showing top 5 rows

Training Data Sample
+-------------------+------+-----+-----+------+----------+---------+----+
|          timestamp|  Name| PTID| LBMP|Losses|Congestion|Time Zone|Load|
+-------------------+------+-----+-----+------+----------+---------+----+
|2001-05-26 00:00:00|CAPITL|61757|20.52|  1.

In [None]:
from pyspark.sql.functions import col

# Cast the numeric columns of the streaming split to integers, one column at a time.
# NOTE(review): cast("int") drops the fractional part rather than rounding it;
# confirm that truncation is what is wanted for Losses and Congestion.
for int_column in ("Losses", "Congestion", "Load"):
    streaming_data = streaming_data.withColumn(int_column, col(int_column).cast("int"))



In [None]:
pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.2.2-py2.py3-none-any.whl.metadata (10.0 kB)
Downloading kafka_python-2.2.2-py2.py3-none-any.whl (307 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.3/307.3 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.2.2


## Question 1: Random forest Consumer code

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row, Window
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import PipelineModel
from kafka import KafkaConsumer
from collections import defaultdict
import time, json, math


def calculate_rmse(actuals, predictions):
    """Return the root-mean-square error between two equal-length sequences.

    Returns NaN when either sequence is empty (or None) or when the two
    sequences differ in length.
    """
    if not (actuals and predictions):
        return float('nan')
    count = len(actuals)
    if count != len(predictions):
        return float('nan')
    total = 0
    for actual, predicted in zip(actuals, predictions):
        total += (actual - predicted) ** 2
    return math.sqrt(total / count)

# Kafka Configuration
import os
from getpass import getpass

TOPIC_NAME = "test_streaming"
SASL_MECHANISM = 'SCRAM-SHA-256'
bootstrap_servers = "kafka-28c8ad04-rachanak0611-3f37.j.aivencloud.com:19404"
# The SASL password is a secret and must not be stored in the notebook.
# It is read from the KAFKA_PASSWORD environment variable, falling back to an
# interactive prompt. Any password that was previously committed in this cell
# should be rotated on the Kafka service.
kafka_password = os.environ.get("KAFKA_PASSWORD") or getpass("Kafka SASL password: ")

# Kafka Consumer Setup
# Authenticates over SASL_SSL with the CA certificate stored on the shared drive.
consumer = KafkaConsumer(
    TOPIC_NAME,
    auto_offset_reset="earliest",
    bootstrap_servers=bootstrap_servers,
    client_id="CONSUMER_CLIENT_ID",
    group_id="CONSUMER_GROUP_ID",
    sasl_mechanism=SASL_MECHANISM,
    sasl_plain_username="avnadmin",
    sasl_plain_password=kafka_password,
    security_protocol="SASL_SSL",
    ssl_cafile="/content/drive/Shareddrives/DATS6450-Project4/ca.pem",
    max_poll_interval_ms=300000,
    heartbeat_interval_ms=3000,
    session_timeout_ms=10000,
    request_timeout_ms=20000
)

# Spark session
# getOrCreate() hands back the session that is already running, if there is one.
spark = SparkSession.builder.appName("LBMPStreamingModel").getOrCreate()
# Saved Spark ML pipeline used for scoring (a random forest, judging by the path name).
model = PipelineModel.load("/content/drive/Shareddrives/DATS6450-Project4/rf_lbmp_model_spark")

# Settings
MIN_ROWS_TO_PROCESS = 100  # rows handed to process_global_data per chunk
lag_periods = 3  # lag features are built for lags 1..lag_periods
rolling_window_span = 5  # rows (current row included) in the rolling-average window
buffer_max_size = 10000  # rows kept when the consumer loop trims the buffer

# Tracking
global_buffer = []  # decoded Kafka messages waiting to be processed
actuals = []  # observed LBMP values that have been scored so far
predictions = []  # model predictions, index-aligned with `actuals`
start_time = time.time()  # reference point for the elapsed-seconds prefix in log lines

def process_global_data(data_chunk):
    """Score one chunk of buffered Kafka rows with the loaded pipeline model.

    Builds lag and rolling-average features over the chunk in arrival order,
    predicts LBMP for every row whose features are complete, appends each
    (actual, predicted) pair to the module-level ``actuals`` / ``predictions``
    lists, and prints the running RMSE whenever the number of accumulated
    predictions crosses a multiple of 100.

    Args:
        data_chunk: list of dicts decoded from Kafka messages. The keys read
            here are ``timestamp``, ``LBMP``, ``Load``, ``Losses`` and
            ``Congestion``; ``Name``, ``PTID`` and ``Time Zone`` are excluded
            from the feature set if present.

    Returns:
        None. Exceptions are caught and printed so the consumer loop keeps running.
    """
    try:
        df = spark.createDataFrame([Row(**row) for row in data_chunk])

        df = df.withColumn("timestamp", F.to_timestamp("timestamp")) \
               .withColumn("LBMP", F.col("LBMP").cast("float")) \
               .withColumn("Load", F.col("Load").cast("float")) \
               .withColumn("Losses", F.col("Losses").cast("float")) \
               .withColumn("Congestion", F.col("Congestion").cast("float"))

        # Sequential window with row indices
        df = df.withColumn("row_idx", F.monotonically_increasing_id())
        w = Window.orderBy("row_idx")

        for col_name in ["Load", "Losses", "Congestion"]:
            for lag_num in range(1, lag_periods + 1):
                df = df.withColumn(f"{col_name}_lag_{lag_num}", F.lag(col_name, lag_num).over(w))

        # Despite the "_ewma" suffix this is an unweighted mean over the last
        # `rolling_window_span` rows; the column names are kept because the
        # saved pipeline presumably expects them.
        rolling_window = w.rowsBetween(-(rolling_window_span - 1), 0)
        for col_name in ["Load", "Losses", "Congestion"]:
            df = df.withColumn(f"{col_name}_ewma", F.avg(col_name).over(rolling_window))

        # dropna() removes the first `lag_periods` rows of the chunk, whose lag
        # columns are null, along with any row that has a null value.
        df = df.withColumn("day_of_week", F.dayofweek("timestamp")) \
               .withColumn("hour", F.hour("timestamp")) \
               .dropna()

        feature_cols = [c for c in df.columns if c not in ["timestamp", "Name", "PTID", "Time Zone", "LBMP", "row_idx"]]

        # Carry LBMP through the transform and collect (actual, prediction)
        # pairs with a single action. Collecting the two columns separately
        # would evaluate the lineage repeatedly and pair values by position only.
        scored_rows = model.transform(df.select("LBMP", *feature_cols)) \
                           .select("LBMP", "prediction") \
                           .collect()

        if not scored_rows:
            print("No valid rows after processing global buffer")
            return

        already_scored = len(actuals)
        for row in scored_rows:
            act, pred = row["LBMP"], row["prediction"]
            actuals.append(act)
            predictions.append(pred)
            print(f"[{time.time()-start_time:.1f}s] Actual: {act:.2f}, Predicted: {pred:.2f}")

        # A chunk seldom contributes an exact multiple of 100 rows (rows with
        # null lags are dropped), so report whenever the running total crosses
        # a multiple of 100 instead of testing for exact divisibility.
        if len(actuals) // 100 > already_scored // 100:
            rmse = calculate_rmse(actuals, predictions)
            print(f"\n RMSE after {len(actuals)} predictions: {rmse:.4f}\n")

    except Exception as e:
        print(f"Error processing global data: {str(e)}")

# Main consumer loop: poll Kafka, buffer decoded rows, and score the buffer in
# fixed-size chunks until the cell is interrupted.
try:
    while True:
        polled = consumer.poll(timeout_ms=1000)

        # poll() maps each topic-partition to its list of records; only the
        # records themselves are needed here.
        for batch in polled.values():
            for message in batch:
                try:
                    global_buffer.append(json.loads(message.value.decode('utf-8')))
                    print(f"Buffered row (total: {len(global_buffer)} rows)")

                    if len(global_buffer) < MIN_ROWS_TO_PROCESS:
                        continue

                    # Score the oldest chunk, then drop it from the buffer.
                    data_chunk = global_buffer[:MIN_ROWS_TO_PROCESS]
                    process_global_data(data_chunk)
                    global_buffer = global_buffer[MIN_ROWS_TO_PROCESS:]
                except Exception as e:
                    # A bad message is reported and skipped; the loop carries on.
                    print(f"Error processing message: {str(e)}")

        backlog = len(global_buffer)
        if backlog > buffer_max_size:
            print(f"Trimming global buffer (size: {backlog})")
            global_buffer = global_buffer[-buffer_max_size:]

        time.sleep(0.1)

except KeyboardInterrupt:
    # Manual stop: summarise what was scored before the consumer is closed.
    print("\nShutting down...")
    print(f"Unprocessed rows remaining: {len(global_buffer)}")
    if actuals:
        rmse = calculate_rmse(actuals, predictions)
        print(f"Final RMSE: {rmse:.4f} from {len(actuals)} predictions")

finally:
    consumer.close()


ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to node 2 failed: NodeNotReadyError: 2
ERROR:kafka.consumer.fetcher:Fetch to 

Buffered row (total: 1 rows)
Buffered row (total: 2 rows)
Buffered row (total: 3 rows)
Buffered row (total: 4 rows)
Buffered row (total: 5 rows)
Buffered row (total: 6 rows)
Buffered row (total: 7 rows)
Buffered row (total: 8 rows)
Buffered row (total: 9 rows)
Buffered row (total: 10 rows)
Buffered row (total: 11 rows)
Buffered row (total: 12 rows)
Buffered row (total: 13 rows)
Buffered row (total: 14 rows)
Buffered row (total: 15 rows)
Buffered row (total: 16 rows)
Buffered row (total: 17 rows)
Buffered row (total: 18 rows)
Buffered row (total: 19 rows)
Buffered row (total: 20 rows)
Buffered row (total: 21 rows)
Buffered row (total: 22 rows)
Buffered row (total: 23 rows)
Buffered row (total: 24 rows)
Buffered row (total: 25 rows)
Buffered row (total: 26 rows)
Buffered row (total: 27 rows)
Buffered row (total: 28 rows)
Buffered row (total: 29 rows)
Buffered row (total: 30 rows)
Buffered row (total: 31 rows)
Buffered row (total: 32 rows)
Buffered row (total: 33 rows)
Buffered row (total



The random forest model seems to be overfitted; hence, let's work with a different model: linear regression.

## Question 1: Consumer with Linear regression model

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row, Window
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import PipelineModel
from kafka import KafkaConsumer
from collections import defaultdict
import time, json, math


def calculate_rmse(actuals, predictions):
    """Compute the root-mean-square error of `predictions` against `actuals`.

    NaN is returned if either argument is empty (or None) or if the two
    sequences do not have the same length.
    """
    usable = bool(actuals) and bool(predictions) and len(actuals) == len(predictions)
    if not usable:
        return float('nan')
    mean_squared_error = sum((a - p) ** 2 for a, p in zip(actuals, predictions)) / len(actuals)
    return math.sqrt(mean_squared_error)

# Kafka Configuration
import os
from getpass import getpass

TOPIC_NAME = "test_streaming"
SASL_MECHANISM = 'SCRAM-SHA-256'
bootstrap_servers = "kafka-28c8ad04-rachanak0611-3f37.j.aivencloud.com:19404"
# The SASL password is a secret and must not be stored in the notebook.
# It is read from the KAFKA_PASSWORD environment variable, falling back to an
# interactive prompt. Any password that was previously committed in this cell
# should be rotated on the Kafka service.
kafka_password = os.environ.get("KAFKA_PASSWORD") or getpass("Kafka SASL password: ")

# Kafka Consumer Setup
# Authenticates over SASL_SSL with the CA certificate stored on the shared drive.
consumer = KafkaConsumer(
    TOPIC_NAME,
    auto_offset_reset="earliest",
    bootstrap_servers=bootstrap_servers,
    client_id="CONSUMER_CLIENT_ID",
    group_id="CONSUMER_GROUP_ID",
    sasl_mechanism=SASL_MECHANISM,
    sasl_plain_username="avnadmin",
    sasl_plain_password=kafka_password,
    security_protocol="SASL_SSL",
    ssl_cafile="/content/drive/Shareddrives/DATS6450-Project4/ca.pem",
    max_poll_interval_ms=300000,
    heartbeat_interval_ms=3000,
    session_timeout_ms=10000,
    request_timeout_ms=20000
)

last_processed_index = 0  # offset into global_buffer; starts at the first row


# Spark session
# getOrCreate() hands back the session that is already running, if there is one.
spark = SparkSession.builder.appName("LBMPStreamingModel").getOrCreate()
# Saved Spark ML pipeline used for scoring (linear regression, judging by the path name).
model = PipelineModel.load("/content/drive/Shareddrives/DATS6450-Project4/lr_lbmp_model_spark")

# Settings
MIN_ROWS_TO_PROCESS = 100  # buffered rows required before scoring starts
lag_periods = 3  # lag features are built for lags 1..lag_periods
rolling_window_span = 5  # rows (current row included) in the rolling-average window
buffer_max_size = 10000  # cap on how many rows the buffer is meant to hold

# Tracking
global_buffer = []  # decoded Kafka messages received so far
actuals = []  # observed LBMP values that have been scored so far
predictions = []  # model predictions, index-aligned with `actuals`
start_time = time.time()  # reference point for the elapsed-seconds prefix in log lines

def process_global_data(data_chunk):
    """Predict LBMP for the newest row of the buffered data.

    Builds lag and rolling-average features over ``data_chunk`` in arrival
    order, scores only the most recent row with the loaded pipeline model,
    appends the (actual, predicted) pair to the module-level ``actuals`` /
    ``predictions`` lists, and prints the running RMSE after every 10th
    prediction.

    Args:
        data_chunk: list of dicts decoded from Kafka messages. The keys read
            here are ``timestamp``, ``LBMP``, ``Load``, ``Losses`` and
            ``Congestion``; ``Name``, ``PTID`` and ``Time Zone`` are excluded
            from the feature set if present.

    Returns:
        None. Exceptions are caught and printed so the consumer loop keeps running.
    """
    try:
        df = spark.createDataFrame([Row(**row) for row in data_chunk])

        df = df.withColumn("timestamp", F.to_timestamp("timestamp")) \
               .withColumn("LBMP", F.col("LBMP").cast("float")) \
               .withColumn("Load", F.col("Load").cast("float")) \
               .withColumn("Losses", F.col("Losses").cast("float")) \
               .withColumn("Congestion", F.col("Congestion").cast("float"))

        # Sequential window with row indices
        df = df.withColumn("row_idx", F.monotonically_increasing_id())
        w = Window.orderBy("row_idx")

        for col_name in ["Load", "Losses", "Congestion"]:
            for lag_num in range(1, lag_periods + 1):
                df = df.withColumn(f"{col_name}_lag_{lag_num}", F.lag(col_name, lag_num).over(w))

        # Despite the "_ewma" suffix this is an unweighted mean over the last
        # `rolling_window_span` rows; the column names are kept because the
        # saved pipeline presumably expects them.
        rolling_window = w.rowsBetween(-(rolling_window_span - 1), 0)
        for col_name in ["Load", "Losses", "Congestion"]:
            df = df.withColumn(f"{col_name}_ewma", F.avg(col_name).over(rolling_window))

        # dropna() removes rows whose lag columns are null (the first
        # `lag_periods` rows) along with any row that has a null value.
        df = df.withColumn("day_of_week", F.dayofweek("timestamp")) \
               .withColumn("hour", F.hour("timestamp")) \
               .dropna()

        # Newest row: latest timestamp, with ties (several rows can share one
        # timestamp) broken by arrival order so the choice is deterministic.
        latest_row = df.orderBy(F.col("timestamp").desc(), F.col("row_idx").desc()).limit(1)
        feature_cols = [c for c in latest_row.columns if c not in ["timestamp", "Name", "PTID", "Time Zone", "LBMP", "row_idx"]]

        # Carry LBMP through the transform and fetch actual and prediction with
        # one action. `latest_row` is lazy, so reading the two values with
        # separate actions could pick different rows when timestamps tie.
        scored = model.transform(latest_row.select("LBMP", *feature_cols)) \
                      .select("LBMP", "prediction") \
                      .first()

        if scored is None:
            print("No valid rows after processing global buffer")
            return

        lbmp_value = scored["LBMP"]
        predicted = scored["prediction"]

        actuals.append(lbmp_value)
        predictions.append(predicted)

        print(f"[{time.time()-start_time:.1f}s] Actual: {lbmp_value:.2f}, Predicted: {predicted:.2f}")

        if len(actuals) % 10 == 0:
            rmse = calculate_rmse(actuals, predictions)
            print(f"Current RMSE: {rmse:.4f}")

    except Exception as e:
        print(f"Error processing global data: {str(e)}")

# Main consumer loop: poll Kafka, buffer each decoded row, and predict the
# newest row once enough history has been collected. Runs until interrupted.
try:
    while True:
        records = consumer.poll(timeout_ms=1000)

        for _, messages in records.items():
            for message in messages:
                try:
                    data = json.loads(message.value.decode('utf-8'))
                    global_buffer.append(data)

                    # Keep only the most recent rows so memory use and the
                    # per-message scoring cost stay bounded.
                    if len(global_buffer) > buffer_max_size:
                        del global_buffer[:-buffer_max_size]

                    print(f"Buffered row (total: {len(global_buffer)} rows)")

                    # Score exactly once per message. A second call on only
                    # the rows added since the last call would see a single
                    # row, which has no lag history and can never be scored.
                    if len(global_buffer) >= MIN_ROWS_TO_PROCESS:
                        process_global_data(global_buffer)

                except Exception as e:
                    # A bad message is reported and skipped; the loop carries on.
                    print(f"Error processing message: {str(e)}")
                    continue

        time.sleep(0.1)

except KeyboardInterrupt:
    print("\nShutting down...")
    # Buffered rows serve as history for feature building; they have already
    # been used for scoring, so they are reported as buffered, not unprocessed.
    print(f"Rows held in buffer at shutdown: {len(global_buffer)}")
    if len(actuals) > 0:
        rmse = calculate_rmse(actuals, predictions)
        print(f"Final RMSE: {rmse:.4f} from {len(actuals)} predictions")

finally:
    consumer.close()


Buffered row (total: 1 rows)
Buffered row (total: 2 rows)
Buffered row (total: 3 rows)
Buffered row (total: 4 rows)
Buffered row (total: 5 rows)
Buffered row (total: 6 rows)
Buffered row (total: 7 rows)
Buffered row (total: 8 rows)
Buffered row (total: 9 rows)
Buffered row (total: 10 rows)
Buffered row (total: 11 rows)
Buffered row (total: 12 rows)
Buffered row (total: 13 rows)
Buffered row (total: 14 rows)
Buffered row (total: 15 rows)
Buffered row (total: 16 rows)
Buffered row (total: 17 rows)
Buffered row (total: 18 rows)
Buffered row (total: 19 rows)
Buffered row (total: 20 rows)
Buffered row (total: 21 rows)
Buffered row (total: 22 rows)
Buffered row (total: 23 rows)
Buffered row (total: 24 rows)
Buffered row (total: 25 rows)
Buffered row (total: 26 rows)
Buffered row (total: 27 rows)
Buffered row (total: 28 rows)
Buffered row (total: 29 rows)
Buffered row (total: 30 rows)
Buffered row (total: 31 rows)
Buffered row (total: 32 rows)
Buffered row (total: 33 rows)
Buffered row (total

ERROR:kafka.coordinator:Heartbeat thread for group CONSUMER_GROUP_ID failed due to unexpected error: cannot release un-acquired lock


No valid rows after processing global buffer
Buffered row (total: 574 rows)
[988.9s] Actual: 24.18, Predicted: 28.35
No valid rows after processing global buffer
Buffered row (total: 575 rows)
[990.6s] Actual: 24.18, Predicted: 28.35
No valid rows after processing global buffer
Buffered row (total: 576 rows)
[992.2s] Actual: 24.18, Predicted: 28.35
No valid rows after processing global buffer
Buffered row (total: 577 rows)
[994.1s] Actual: 24.18, Predicted: 28.35
No valid rows after processing global buffer
Buffered row (total: 578 rows)
[996.9s] Actual: 26.40, Predicted: 41.22
Current RMSE: 9.1043
No valid rows after processing global buffer
Buffered row (total: 579 rows)
[998.9s] Actual: 26.40, Predicted: 41.22
No valid rows after processing global buffer
Buffered row (total: 580 rows)
[1000.5s] Actual: 25.87, Predicted: 35.29
No valid rows after processing global buffer
Buffered row (total: 581 rows)
[1002.1s] Actual: 25.87, Predicted: 35.29
No valid rows after processing global buf

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt



Shutting down...
Unprocessed rows remaining: 686
Final RMSE: 9.0013 from 587 predictions


As observed, linear regression performs better than random forest, and the RMSE is also far better for the linear regression model.

## Train ARIMA model

## Question 2: Consumer code for ARIMA model

In [None]:
from kafka import KafkaConsumer
import json
import pandas as pd
import joblib
from statsmodels.tsa.arima.model import ARIMAResults
from sklearn.preprocessing import StandardScaler
import numpy as np

# Load ARIMA model
# Restores a previously fitted ARIMA results object from Drive.
# NOTE(review): this path is under "My Drive" while the other cells read from
# "Shareddrives" — confirm the artifact location.
model_fit = ARIMAResults.load("/content/drive/My Drive/DATS6450-Project4/arima_model.pkl")

# Load the scaler
# NOTE(review): joblib.load unpickles the file, so only load artifacts from a trusted location.
scaler = joblib.load("/content/drive/My Drive/DATS6450-Project4/arima_scaler.pkl")

# Kafka setup
import os
from getpass import getpass

TOPIC_NAME = "streaming_11"
SASL_MECHANISM = 'SCRAM-SHA-256'
bootstrap_servers = "kafka-28c8ad04-rachanak0611-3f37.j.aivencloud.com:19404"
# The SASL password is a secret and must not be stored in the notebook.
# It is read from the KAFKA_PASSWORD environment variable, falling back to an
# interactive prompt. Any password that was previously committed in this cell
# should be rotated on the Kafka service.
kafka_password = os.environ.get("KAFKA_PASSWORD") or getpass("Kafka SASL password: ")

# Authenticates over SASL_SSL with the CA certificate stored on Drive.
consumer = KafkaConsumer(
    TOPIC_NAME,
    auto_offset_reset="earliest",
    bootstrap_servers=bootstrap_servers,
    client_id="CONSUMER_CLIENT_ID",
    group_id="CONSUMER_GROUP_ID",
    sasl_mechanism=SASL_MECHANISM,
    sasl_plain_username="avnadmin",
    sasl_plain_password=kafka_password,
    security_protocol="SASL_SSL",
    ssl_cafile="/content/drive/My Drive/DATS6450-Project4/ca.pem"
)

# Most recent scaled LBMP observations, oldest first.
lbmp_series = []
# Cap on retained history so memory use and per-message model work stay bounded.
MAX_HISTORY = 500
# Observations required before residual statistics are computed.
MIN_HISTORY = 10

print("Listening to Kafka stream...")

# Stream loop: scale each incoming LBMP value, score the recent history with
# the fitted ARIMA parameters, and flag the newest point as an anomaly when its
# residual falls outside median +/- 3 * MAD of the recent residuals.
try:
    for message in consumer:
        message_str = message.value.decode('utf-8')

        try:
            data = json.loads(message_str)
        except json.JSONDecodeError:
            print(f"Failed to decode message: {message_str}")
            continue

        try:
            lbmp_val = pd.to_numeric(data["LBMP"], errors='coerce')

            if pd.isna(lbmp_val):
                continue

            try:
                lbmp_scaled = scaler.transform([[lbmp_val]])[0][0]
            except ValueError as e:
                # Do NOT refit the scaler on this single observation and save
                # it back to Drive: a one-point fit only subtracts that value
                # and would silently replace the scaler used at training time.
                # Skip the message and surface the problem instead.
                print(f"Scaler could not transform LBMP value {lbmp_val}: {e}")
                continue

            lbmp_series.append(lbmp_scaled)
            # Drop the oldest entries beyond the history cap (no-op while short).
            del lbmp_series[:-MAX_HISTORY]
            print(f"Scaled LBMP value: {lbmp_scaled}")

        except KeyError:
            print(f"Key 'LBMP' not found in the message: {data}")
            continue
        except Exception as e:
            print(f"Error processing LBMP value: {e}")
            continue

        # Process the LBMP series once enough data is collected
        if len(lbmp_series) >= MIN_HISTORY:
            observed = np.asarray(lbmp_series, dtype=float)

            # Apply the fitted parameters to the streamed observations.
            # Calling model_fit.predict(start=0, ...) directly would return the
            # fitted values of the *training* sample, which are unrelated to
            # the data arriving on the stream.
            applied = model_fit.apply(observed)
            pred = np.asarray(applied.predict(start=0, end=len(observed) - 1))
            residuals = observed - pred
            residual = residuals[-1]

            # Calculate MAD (Median Absolute Deviation) for dynamic thresholding
            median_residual = np.median(residuals)
            mad = np.median(np.abs(residuals - median_residual))

            # Set dynamic thresholds based on MAD
            anomaly_threshold_upper = median_residual + 3 * mad
            anomaly_threshold_lower = median_residual - 3 * mad

            # Determine if anomaly based on residual and dynamic thresholds
            is_anomaly = int((residual > anomaly_threshold_upper) or (residual < anomaly_threshold_lower))

            # Output the results
            print(f"Time: {data.get('timestamp', 'N/A')}, LBMP: {lbmp_val:.2f}, Scaled: {lbmp_scaled:.2f}, Residual: {residual:.2f}, Anomaly: {is_anomaly}")
        else:
            print(f"Waiting for more data... ({len(lbmp_series)} collected)")

except KeyboardInterrupt:
    print("\nShutting down...")

finally:
    # Always release the Kafka connection, including on manual interruption.
    consumer.close()


Listening to Kafka stream...


ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=2 host=35.203.26.124:19404 <connecting> [IPv4 ('35.203.26.124', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=2 host=35.203.26.124:19404 <connecting> [IPv4 ('35.203.26.124', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=1 host=34.118.173.179:19404 <connecting> [IPv4 ('34.118.173.179', 19404)]>: Connect attempt returned error 110. Disconnecting.
ER

Scaled LBMP value: 11.100000000000001
Waiting for more data... (1 collected)
Scaled LBMP value: 10.66
Waiting for more data... (2 collected)
Scaled LBMP value: 9.3
Waiting for more data... (3 collected)
Scaled LBMP value: 7.420000000000002
Waiting for more data... (4 collected)
Scaled LBMP value: 11.120000000000001
Waiting for more data... (5 collected)
Scaled LBMP value: 11.830000000000002
Waiting for more data... (6 collected)
Scaled LBMP value: 8.95
Waiting for more data... (7 collected)
Scaled LBMP value: 11.379999999999999
Waiting for more data... (8 collected)
Scaled LBMP value: 12.600000000000001
Waiting for more data... (9 collected)
Scaled LBMP value: 11.61
Time: 2001-06-02T20:32:32Z, LBMP: 24.47, Scaled: 11.61, Residual: 11.86, Anomaly: 0
Scaled LBMP value: 11.129999999999999
Time: 2001-06-02T20:32:32Z, LBMP: 23.99, Scaled: 11.13, Residual: 11.39, Anomaly: 0
Scaled LBMP value: 9.010000000000002
Time: 2001-06-02T20:32:32Z, LBMP: 21.87, Scaled: 9.01, Residual: 9.57, Anomaly: 0


ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT


Scaled LBMP value: -13.82
Time: 2001-06-03T00:31:32Z, LBMP: -0.96, Scaled: -13.82, Residual: -13.52, Anomaly: 1
Scaled LBMP value: -13.83
Time: 2001-06-03T00:31:32Z, LBMP: -0.97, Scaled: -13.83, Residual: -13.53, Anomaly: 1
Scaled LBMP value: -13.82
Time: 2001-06-03T00:31:32Z, LBMP: -0.96, Scaled: -13.82, Residual: -13.52, Anomaly: 1
Scaled LBMP value: -13.799999999999999
Time: 2001-06-03T00:31:32Z, LBMP: -0.94, Scaled: -13.80, Residual: -13.47, Anomaly: 1
Scaled LBMP value: -13.809999999999999
Time: 2001-06-03T00:31:32Z, LBMP: -0.95, Scaled: -13.81, Residual: -13.45, Anomaly: 1
Scaled LBMP value: -13.719999999999999
Time: 2001-06-03T00:31:32Z, LBMP: -0.86, Scaled: -13.72, Residual: -13.34, Anomaly: 1
Scaled LBMP value: 8.990000000000002
Time: 2001-06-03T00:36:32Z, LBMP: 21.85, Scaled: 8.99, Residual: 9.38, Anomaly: 0
Scaled LBMP value: 8.060000000000002
Time: 2001-06-03T00:36:32Z, LBMP: 20.92, Scaled: 8.06, Residual: 8.46, Anomaly: 0
Scaled LBMP value: 6.580000000000002
Time: 2001-06-

ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=1 host=34.118.173.179:19404 <connecting> [IPv4 ('34.118.173.179', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=1 host=34.118.173.179:19404 <connecting> [IPv4 ('34.118.173.179', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=2 host=35.203.26.124:19404 <connecting> [IPv4 ('35.203.26.124', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=2 host=35.203.26.124:19404 <connecting> [IPv4 ('35.203.26.124', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT


Scaled LBMP value: 22.65
Time: 2001-06-03T01:45:32Z, LBMP: 35.51, Scaled: 22.65, Residual: 22.95, Anomaly: 0
Scaled LBMP value: 26.630000000000003
Time: 2001-06-03T01:50:32Z, LBMP: 39.49, Scaled: 26.63, Residual: 26.93, Anomaly: 0
Scaled LBMP value: 23.560000000000002
Time: 2001-06-03T01:50:32Z, LBMP: 36.42, Scaled: 23.56, Residual: 23.87, Anomaly: 0
Scaled LBMP value: 26.07
Time: 2001-06-03T01:50:32Z, LBMP: 38.93, Scaled: 26.07, Residual: 26.38, Anomaly: 0
Scaled LBMP value: 23.200000000000003
Time: 2001-06-03T01:50:32Z, LBMP: 36.06, Scaled: 23.20, Residual: 23.52, Anomaly: 0
Scaled LBMP value: 3.379999999999999
Time: 2001-06-03T01:55:02Z, LBMP: 16.24, Scaled: 3.38, Residual: 3.70, Anomaly: 0
Scaled LBMP value: 4.539999999999999
Time: 2001-06-03T01:55:02Z, LBMP: 17.40, Scaled: 4.54, Residual: 4.86, Anomaly: 0
Scaled LBMP value: 4.460000000000001
Time: 2001-06-03T01:55:02Z, LBMP: 17.32, Scaled: 4.46, Residual: 4.78, Anomaly: 0
Scaled LBMP value: 4.93
Time: 2001-06-03T02:00:00Z, LBMP: 1

ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT


Scaled LBMP value: -13.879999999999999
Time: 2001-06-03T02:41:32Z, LBMP: -1.02, Scaled: -13.88, Residual: -12.48, Anomaly: 1
Scaled LBMP value: -13.93
Time: 2001-06-03T02:41:32Z, LBMP: -1.07, Scaled: -13.93, Residual: -12.46, Anomaly: 1
Scaled LBMP value: -13.94
Time: 2001-06-03T02:41:32Z, LBMP: -1.08, Scaled: -13.94, Residual: -12.44, Anomaly: 1
Scaled LBMP value: -13.91
Time: 2001-06-03T02:41:32Z, LBMP: -1.05, Scaled: -13.91, Residual: -12.40, Anomaly: 1
Scaled LBMP value: -13.959999999999999
Time: 2001-06-03T02:46:32Z, LBMP: -1.10, Scaled: -13.96, Residual: -12.45, Anomaly: 1
Scaled LBMP value: -13.889999999999999
Time: 2001-06-03T02:46:32Z, LBMP: -1.03, Scaled: -13.89, Residual: -12.62, Anomaly: 1
Scaled LBMP value: -13.959999999999999
Time: 2001-06-03T02:46:32Z, LBMP: -1.10, Scaled: -13.96, Residual: -12.97, Anomaly: 1
Scaled LBMP value: -13.879999999999999
Time: 2001-06-03T02:46:32Z, LBMP: -1.02, Scaled: -13.88, Residual: -13.09, Anomaly: 1
Scaled LBMP value: -13.95
Time: 2001-06

ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=1 host=34.118.173.179:19404 <connecting> [IPv4 ('34.118.173.179', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=1 host=34.118.173.179:19404 <connecting> [IPv4 ('34.118.173.179', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=2 host=35.203.26.124:19404 <connecting> [IPv4 ('35.203.26.124', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=2 host=35.203.26.124:19404 <connecting> [IPv4 ('35.203.26.124', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT


Scaled LBMP value: -13.879999999999999
Time: 2001-06-03T03:40:32Z, LBMP: -1.02, Scaled: -13.88, Residual: -13.53, Anomaly: 1
Scaled LBMP value: -13.94
Time: 2001-06-03T03:40:32Z, LBMP: -1.08, Scaled: -13.94, Residual: -13.59, Anomaly: 1
Scaled LBMP value: -13.879999999999999
Time: 2001-06-03T03:40:32Z, LBMP: -1.02, Scaled: -13.88, Residual: -13.54, Anomaly: 1
Scaled LBMP value: -13.879999999999999
Time: 2001-06-03T03:44:32Z, LBMP: -1.02, Scaled: -13.88, Residual: -13.54, Anomaly: 0
Scaled LBMP value: -13.91
Time: 2001-06-03T03:44:32Z, LBMP: -1.05, Scaled: -13.91, Residual: -13.57, Anomaly: 0
Scaled LBMP value: -13.95
Time: 2001-06-03T03:49:32Z, LBMP: -1.09, Scaled: -13.95, Residual: -13.61, Anomaly: 0
Scaled LBMP value: -13.879999999999999
Time: 2001-06-03T03:49:32Z, LBMP: -1.02, Scaled: -13.88, Residual: -13.54, Anomaly: 0
Scaled LBMP value: -13.93
Time: 2001-06-03T03:49:32Z, LBMP: -1.07, Scaled: -13.93, Residual: -13.60, Anomaly: 0
Scaled LBMP value: -13.879999999999999
Time: 2001-06

ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Connect attempt returned error 110. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=CONSUMER_CLIENT_ID, node_id=3 host=34.95.23.216:19404 <connecting> [IPv4 ('34.95.23.216', 19404)]>: Closing connection. KafkaConnectionError: 110 ETIMEDOUT


Scaled LBMP value: 9.3
Time: 2001-06-03T04:34:08Z, LBMP: 22.16, Scaled: 9.30, Residual: 9.78, Anomaly: 0
Scaled LBMP value: 7.670000000000002
Time: 2001-06-03T04:34:08Z, LBMP: 20.53, Scaled: 7.67, Residual: 8.15, Anomaly: 0
Scaled LBMP value: 8.650000000000002
Time: 2001-06-03T04:34:08Z, LBMP: 21.51, Scaled: 8.65, Residual: 9.11, Anomaly: 0
Scaled LBMP value: 8.719999999999999
Time: 2001-06-03T04:34:08Z, LBMP: 21.58, Scaled: 8.72, Residual: 9.17, Anomaly: 0
Scaled LBMP value: 8.41
Time: 2001-06-03T04:34:08Z, LBMP: 21.27, Scaled: 8.41, Residual: 8.85, Anomaly: 0
Scaled LBMP value: 7.100000000000001
Time: 2001-06-03T04:34:08Z, LBMP: 19.96, Scaled: 7.10, Residual: 7.53, Anomaly: 0
Scaled LBMP value: 9.170000000000002
Time: 2001-06-03T04:39:02Z, LBMP: 22.03, Scaled: 9.17, Residual: 9.59, Anomaly: 0
Scaled LBMP value: 7.57
Time: 2001-06-03T04:39:02Z, LBMP: 20.43, Scaled: 7.57, Residual: 7.99, Anomaly: 0
Scaled LBMP value: 8.830000000000002
Time: 2001-06-03T04:39:02Z, LBMP: 21.69, Scaled: 8.

KeyboardInterrupt: 

As observed from the results, the anomaly detection, though not correct in certain areas, does correctly flag the most obvious anomalies, which is important.