In [None]:
import pandas as pd
import numpy as np

# ML libraries
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import IsolationForest

# Model and evaluation
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, accuracy_score

In [2]:
# Replace this with the path or method to load your actual dataset
df = pd.read_csv("../Dataset/preprocessed_trendedpointalarm.csv")

# Quick look at the data
print("Initial DataFrame shape:", df.shape)
df.head()


Initial DataFrame shape: (102319, 15)


Unnamed: 0,DateTime,ProcessID,AssetID,AlarmSeverityName,State,TransactionMessage,Stage,AlarmClassName,Year,Month,Day,DayOfWeek,Season,Hour,ProcessedMessage
0,2/1/2018 22:45,IBMS/201801024100783,1-JK1-JK1-01-E.02-AC-ACON-VAVU-0047,3 - Low,A2N,VAV-J09-01-029 SPACE TEMP ALARM,Cancelled,General-ELV,2018,1,2,Tuesday,Winter,22,vavj0901029 space temp alarm
1,3/1/2018 1:41,IBMS/201801024101029,1-JK1-JK1-00-C.27-AC-ACON-VAVU-0019,3 - Low,A2N,VAV-J09-00-027 SPACE TEMP ALARM,Cancelled,General-ELV,2018,1,3,Wednesday,Winter,1,vavj0900027 space temp alarm
2,3/1/2018 3:08,IBMS/201801034101175,1-JK1-JK1-00-C.27-AC-ACON-VAVU-0019,3 - Low,A2N,VAV-J09-00-027 SPACE TEMP ALARM,Cancelled,General-ELV,2018,1,3,Wednesday,Winter,3,vavj0900027 space temp alarm
3,3/1/2018 7:12,IBMS/201801034101667,1-JK1-JK1-00-D.01-AC-ACON-VAVU-0021,3 - Low,A2N,VAV-J09-00-029 SPACE TEMP ALARM,Cancelled,General-ELV,2018,1,3,Wednesday,Winter,7,vavj0900029 space temp alarm
4,3/1/2018 9:04,IBMS/201801034102043,0-JK1-JK1-B2-1.01-AC-ACON-MAHU-0003,2 - Medium,A2N,MAU-JK1-B1-001 Dis Air Temp,Cancelled,General-ELV,2018,1,3,Wednesday,Winter,9,maujk1b1001 dis air temp


In [3]:
# 3.1 Convert DateTime to datetime type
df['DateTime'] = pd.to_datetime(df['DateTime'], errors='coerce')

# 3.2 Drop unnecessary columns (example columns to drop—modify as needed)
cols_to_drop = ['ProcessID', 'TransactionMessage', 'ProcessedMessage']
df.drop(columns=cols_to_drop, inplace=True, errors='ignore')

# 3.3 Check for missing values
print("Missing values before imputation:")
print(df.isnull().sum())


Missing values before imputation:
DateTime             63331
AssetID                271
AlarmSeverityName        0
State                    0
Stage                 2984
AlarmClassName           0
Year                     0
Month                    0
Day                      0
DayOfWeek                0
Season                   0
Hour                     0
dtype: int64


In [4]:
# 4.1 Target variable
target_col = 'AlarmSeverityName'

# 4.2 Features
feature_cols = [col for col in df.columns if col not in [target_col, 'DateTime']]

X = df[feature_cols]
y = df[target_col]

print("Features:", feature_cols)
print("Target:", target_col)


Features: ['AssetID', 'State', 'Stage', 'AlarmClassName', 'Year', 'Month', 'Day', 'DayOfWeek', 'Season', 'Hour']
Target: AlarmSeverityName


In [5]:
# 5.1 Detect categorical columns (object or category dtype)
cat_cols = X.select_dtypes(include=['object', 'category']).columns

# Convert them to category if not already
for col in cat_cols:
    X[col] = X[col].astype('category')

# Label-encode the categorical columns to ensure your final dataset X has only numeric columns
label_encoders = {}
for col in cat_cols:
    le = LabelEncoder()
    X[col] = le.fit_transform(X[col].astype(str))
    label_encoders[col] = le

# 5.2 Remove rare classes(for example if "Low" repeats only once that would be a rare class) in the target (any class with <=1 occurrence, for example)
y_counts = y.value_counts()
rare_classes = y_counts[y_counts <= 1].index
df = df[~df[target_col].isin(rare_classes)]

# Update X and y after removing rare-class rows
X = df[feature_cols]
y = df[target_col]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[col] = X[col].astype('category')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[col] = X[col].astype('category')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[col] = X[col].astype('category')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_in

In [6]:
# Reapply Label Encoding to categorical columns in X
cat_cols = X.select_dtypes(include=['object']).columns
for col in cat_cols:
    le = LabelEncoder()
    X[col] = le.fit_transform(X[col].astype(str))
    # Optionally, store the encoder for later use:
    label_encoders[col] = le

# Label-encode the target variable as well
target_encoder = LabelEncoder()
y = target_encoder.fit_transform(y.astype(str))

# Impute missing values
imputer = SimpleImputer(strategy='most_frequent')
X_imputed = imputer.fit_transform(X)

# Proceed to split the data
X_train, X_test, y_train, y_test = train_test_split(
    X_imputed,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y  # Preserves class distribution
)

# Convert X_train and X_test to DataFrames with proper column names
if not isinstance(X_train, pd.DataFrame):
    X_train = pd.DataFrame(X_train, columns=feature_cols)
if not isinstance(X_test, pd.DataFrame):
    X_test = pd.DataFrame(X_test, columns=feature_cols)

# Optionally, convert all columns to numeric (they should already be numeric now)
X_train = X_train.apply(pd.to_numeric, errors='coerce')
X_test = X_test.apply(pd.to_numeric, errors='coerce')


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[col] = le.fit_transform(X[col].astype(str))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[col] = le.fit_transform(X[col].astype(str))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[col] = le.fit_transform(X[col].astype(str))
A value is trying to be set on a copy of a slice from a DataFrame.


In [7]:
# Ensure X_train is a DataFrame (if it isn’t already)
if not isinstance(X_train, pd.DataFrame):
    X_train = pd.DataFrame(X_train, columns=feature_cols)  # assign appropriate column names

# Identify categorical columns (those of object type)
categorical_cols = X_train.select_dtypes(include=['object']).columns

# Apply Label Encoding to each categorical column
label_encoders = {}
for col in categorical_cols:
    le = LabelEncoder()
    X_train[col] = le.fit_transform(X_train[col].astype(str))
    label_encoders[col] = le

# Do the same for your test set if needed:
if isinstance(X_test, np.ndarray):
    X_test = pd.DataFrame(X_test, columns=X_train.columns)
else:
    for col in categorical_cols:
        X_test[col] = label_encoders[col].transform(X_test[col].astype(str))

# Now, try SMOTE again by generating synthetic samples for the minority classes
smote = SMOTE(random_state=42)
X_train_sm, y_train_sm = smote.fit_resample(X_train, y_train)

print("Class distribution before SMOTE:", np.bincount(y_train))
print("Class distribution after SMOTE:", np.bincount(y_train_sm))

Class distribution before SMOTE: [51233 13449 16928    83   125    36]
Class distribution after SMOTE: [51233 51233 51233 51233 51233 51233]


In [8]:
from sklearn.ensemble import IsolationForest

# Apply IsolationForest on the SMOTE-resampled training data
iso_forest = IsolationForest(contamination=0.01, random_state=42)
y_train_outliers = iso_forest.fit_predict(X_train_sm)  # X_train_sm from SMOTE

# Keep only inliers (labels == 1)
inlier_mask = (y_train_outliers == 1)
X_train_clean = X_train_sm[inlier_mask]
y_train_clean = y_train_sm[inlier_mask]

print("Training set size before outlier removal:", X_train_sm.shape[0])
print("Training set size after outlier removal:", X_train_clean.shape[0])


Training set size before outlier removal: 307398
Training set size after outlier removal: 304329


In [9]:
# Outlier detection BEFORE scaling
iso_forest_orig = IsolationForest(contamination=0.01, random_state=42)
outlier_preds_orig = iso_forest_orig.fit_predict(X_train_sm)

In [10]:
from sklearn.preprocessing import RobustScaler


# Apply RobustScaler on the SMOTE-resampled training data
scaler = RobustScaler()
X_train_sm_scaled = scaler.fit_transform(X_train_sm)

# Initialize IsolationForest for the scaled data
iso_forest_scaled = IsolationForest(contamination=0.01, random_state=42)
outlier_preds_scaled = iso_forest_scaled.fit_predict(X_train_sm_scaled)

# Outlier Detection BEFORE and AFTER Scaling

In [None]:
# import lightningchart as lc
# import numpy as np
# import pandas as pd
# from sklearn.preprocessing import RobustScaler
# from sklearn.ensemble import IsolationForest

# lc.set_license('my-license-key')


# # 1) CREATE A DASHBOARD WITH 1 ROW AND 2 COLUMNS
# dashboard = lc.Dashboard(theme=lc.Themes.Light, rows=1, columns=2)

# # 2) OUTLIER DETECTION BEFORE SCALING (LEFT CHART)
# chart_before = dashboard.ChartXY(
#     row_index=0,          # First (and only) row
#     column_index=0,       # Left column    
# ).set_title("Outlier Detection BEFORE Scaling")

# scatter_series_before = chart_before.add_point_series(lookup_values=True)

# # (Adjust your logic if you have a different scheme.)
# lookup_values_before = np.where(outlier_preds_orig == -1, 1, 0)  # 1=Outlier (Red), 0=Inlier (Blue)

# scatter_series_before.append_samples(
#     x_values=list(range(len(X_train_sm))),
#     y_values=X_train_sm.iloc[:, 0].tolist(),
#     lookup_values=lookup_values_before.tolist()
# )

# scatter_series_before.set_palette_point_coloring(
#     steps=[
#         {"value": 0.0, "color": lc.Color(0, 0, 255)},    # Blue for inliers
#         {"value": 1.0, "color": lc.Color(255, 0, 0)},    # Red for outliers
#     ],
#     look_up_property="value",
#     percentage_values=False
# )

# chart_before.get_default_x_axis().set_title("Sample Index")
# chart_before.get_default_y_axis().set_title("Feature 0 Value (Original)")
# chart_before.add_legend(data=scatter_series_before).set_title("Outlier Detection")


# # 3) OUTLIER DETECTION AFTER SCALING (RIGHT CHART)

# chart_after = dashboard.ChartXY(
#     row_index=0,          # Same row
#     column_index=1,       # Right column
# ).set_title(title="Outlier Detection AFTER Scaling")

# # Make sure X_train_sm is a DataFrame
# if not isinstance(X_train_sm, pd.DataFrame):
#     X_train_sm_df = pd.DataFrame(X_train_sm, columns=feature_cols)
# else:
#     X_train_sm_df = X_train_sm.copy()

# # Apply RobustScaler
# scaler = RobustScaler()
# X_train_sm_scaled = scaler.fit_transform(X_train_sm_df)
# X_train_sm_scaled_df = pd.DataFrame(X_train_sm_scaled, columns=feature_cols)

# # Apply IsolationForest to detect outliers in scaled data
# iso_forest_scaled = IsolationForest(contamination=0.01, random_state=42)
# outlier_preds_scaled = iso_forest_scaled.fit_predict(X_train_sm_scaled_df)

# # Convert predictions to int (inliers=+1, outliers=-1)
# lookup_values_after = [int(val) for val in outlier_preds_scaled]

# series_after = chart_after.add_point_series(lookup_values=True)
# series_after.append_samples(
#     x_values=list(range(len(X_train_sm_scaled_df))),
#     y_values=X_train_sm_scaled_df.iloc[:, 0].tolist(),  # Plot feature-0
#     lookup_values=lookup_values_after
# )

# series_after.set_palette_point_coloring(
#     steps=[
#         {"value": -1.0, "color": lc.Color(255, 0, 0)},  # Red for outliers
#         {"value":  1.0, "color": lc.Color(0, 0, 255)}   # Blue for inliers
#     ],
#     look_up_property="value",
#     percentage_values=False
# )

# chart_after.get_default_x_axis().set_title("Sample Index")
# chart_after.get_default_y_axis().set_title("Feature 0 Value (Scaled)")
# chart_after.add_legend(data=series_after).set_title("Outlier Detection")

# dashboard.open(method='browser')


In [11]:
from xgboost import XGBClassifier

# Initialize the XGBoost classifier
xgb_model = XGBClassifier(
    random_state=42,
    use_label_encoder=False,
    eval_metric='logloss'
)

# Train the model using the cleaned training set
xgb_model.fit(X_train_clean, y_train_clean)


Parameters: { "use_label_encoder" } are not used.



In [12]:
# Ensure that X_test is numeric and in DataFrame form (should already be the case)
y_pred = xgb_model.predict(X_test)

In [13]:
from sklearn.metrics import accuracy_score, classification_report

accuracy = accuracy_score(y_test, y_pred)
print(f"XGBoost Accuracy: {accuracy:.5f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

XGBoost Accuracy: 0.97811

Classification Report:
              precision    recall  f1-score   support

           0       0.99      0.97      0.98     12809
           1       0.91      0.97      0.94      3362
           2       1.00      1.00      1.00      4232
           3       0.83      0.95      0.89        21
           4       0.69      0.87      0.77        31
           5       1.00      1.00      1.00         9

    accuracy                           0.98     20464
   macro avg       0.90      0.96      0.93     20464
weighted avg       0.98      0.98      0.98     20464



# XGBoost Feature Importance

In [None]:
# import lightningchart as lc
# import pandas as pd
# import numpy as np
# from xgboost import XGBClassifier

# lc.set_license('my-license-key')

# # Assume your feature names are defined:
# feature_columns = [
#     "AssetID", "State", "Stage", "AlarmClassName", 
#     "Year", "Month", "Day", "DayOfWeek", "Season", "Hour"
# ]

# # Retrieve feature importances from the XGBoost model
# importances = xgb_model.feature_importances_

# # Sort features by importance in descending order
# sorted_indices = np.argsort(importances)[::-1]
# sorted_features = np.array(feature_columns)[sorted_indices]
# sorted_importances = importances[sorted_indices]

# # Convert importance scores to Python floats for JSON serialization
# bar_chart_data = [
#     {"category": sorted_features[i], "value": float(sorted_importances[i])}
#     for i in range(len(sorted_features))
# ]

# # Create a horizontal Bar Chart using LightningChart with a dark theme
# chart = lc.BarChart(
#     theme=lc.Themes.Dark,
#     title="XGBoost Feature Importance",
#     vertical=False
# )

# chart.set_sorting("disabled")
# chart.set_data(bar_chart_data)

# chart.set_palette_colors(
#     steps=[
#         {"value": float(min(sorted_importances)), "color": lc.Color(0, 0, 255)},     # Blue
#         {"value": float(np.median(sorted_importances)), "color": lc.Color(255, 255, 0)},  # Yellow
#         {"value": float(max(sorted_importances)), "color": lc.Color(255, 0, 0)},       # Red
#     ],
#     percentage_values=False,
#     look_up_property="x",
# )
# chart.open(method='browser')


127.0.0.1 - - [04/Mar/2025 10:10:29] "GET / HTTP/1.1" 200 -


<lightningchart.charts.bar_chart.BarChart at 0x18493f3afd0>

In [None]:
import shap

# 1. Create a TreeExplainer for the trained XGBoost model
explainer = shap.TreeExplainer(xgb_model)

# 2. Compute SHAP values for the test set
# Note: For multi-class classification, shap_values is a list. Here, we assume either a binary model or we use the first class.
shap_values = explainer.shap_values(X_test)
if isinstance(shap_values, list):
    # For multi-class, choose the SHAP values for the first class as an example
    shap_values = shap_values[0]

In [15]:
# 6. SHAP Waterfall Plot for a single instance
shap_values_class0 = shap_values[:, :, 0]
instance_index = 0  # change this to choose a different test instance
print(f"Generating SHAP waterfall plot for instance index: {instance_index}")

# Retrieve the expected value (base value). For multi-class models, use the one corresponding to your chosen class.
base_value = (explainer.expected_value[0] if isinstance(explainer.expected_value, list)
              else explainer.expected_value)

# Create an Explanation object for the chosen instance
instance_shap = shap.Explanation(
    values=shap_values_class0[instance_index],
    base_values=base_value,
    data=X_test.iloc[instance_index],
    feature_names=X_test.columns
)

Generating SHAP waterfall plot for instance index: 0


In [None]:
import numpy as np
import pandas as pd
import time
import lightningchart as lc
from collections import deque
from sklearn.preprocessing import LabelEncoder

lc.set_license('my-license-key')

dashboard = lc.Dashboard(theme=lc.Themes.Dark, rows=8, columns=10)


# ROW 0: Real-Time Alarm Predictions
chart1 = dashboard.ChartXY(title="Real-Time Alarm Predictions", row_index=0, column_index=0, row_span=4, column_span=9)
actual_series = chart1.add_point_line_series().set_name("Actual Alarms")
predicted_series = chart1.add_point_line_series().set_name("Predicted Alarms")

actual_series.set_point_size(6).set_point_color(lc.Color("yellow")).set_line_color(lc.Color("yellow"))
predicted_series.set_point_size(6).set_point_color(lc.Color("red")).set_line_color(lc.Color("red"))

past_prediction_series = []

max_points = 50  # Limit displayed points
timestamps = deque(maxlen=max_points)
actual_values = deque(maxlen=max_points)
predicted_timestamps = deque(maxlen=2)
predicted_values = deque(maxlen=2)

unique_labels = np.unique(np.concatenate((y_test, y_pred)))
target_encoder = LabelEncoder()
target_encoder.fit(unique_labels)

y_test_encoded = target_encoder.transform(y_test)
y_pred_encoded = target_encoder.transform(y_pred)

total_points = len(y_pred_encoded)
time_delay_prediction = 2.0   # Delay for row 0 predicted points
time_delay_after_actual = 2.0

row0_time_counter = 0
row0_timestamps_list = []
row0_actual_values_list = []

# ----- Severity Count Text Chart -----
text_chart = dashboard.ChartXY(row_index=0, column_index=9, row_span=2, column_span=1)
text_chart.set_title("")
text_chart.get_default_x_axis().set_tick_strategy("Empty")
text_chart.get_default_y_axis().set_tick_strategy("Empty")
# Initialize textboxes with "0"
low_textbox_title = text_chart.add_textbox("Low severities", 5, 8)
low_textbox_title.set_text_font(16, weight="bold").set_stroke(thickness=0, color=lc.Color(0, 0, 0, 0))
median_textbox = text_chart.add_textbox("Medium severities", 5, 5)
median_textbox.set_text_font(15, weight="bold").set_stroke(thickness=0, color=lc.Color(0, 0, 0, 0))
high_textbox = text_chart.add_textbox("High severities", 5, 2)
high_textbox.set_text_font(16, weight="bold").set_stroke(thickness=0, color=lc.Color(0, 0, 0, 0))

# ----- Severity Situation Image Chart -----
image_chart = dashboard.ChartXY(row_index=2, column_index=9, row_span=2, column_span=1)
image_chart.get_default_x_axis().set_tick_strategy("Empty")
image_chart.get_default_y_axis().set_tick_strategy("Empty")
image_series = image_chart.add_point_series().add(5,5)

def update_row0_batch():
    global row0_time_counter, row0_timestamps_list, row0_actual_values_list
    if row0_time_counter >= total_points:
        return
    predicted_values.clear()
    predicted_timestamps.clear()
    for _ in range(2):
        if row0_time_counter < total_points:
            predicted_timestamps.append(row0_time_counter)
            predicted_values.append(int(y_pred_encoded[row0_time_counter]))
            row0_time_counter += 1
    predicted_series.clear().add(x=list(predicted_timestamps), y=list(predicted_values))
    time.sleep(time_delay_prediction)
    new_gray_series = chart1.add_point_line_series().set_name("Past Prediction")\
                         .set_point_color(lc.Color("orange")).set_line_color(lc.Color("orange"))
    new_gray_series.add(x=list(predicted_timestamps), y=list(predicted_values))
    past_prediction_series.append(new_gray_series)
    for t in predicted_timestamps:
        row0_timestamps_list.append(t)
        row0_actual_values_list.append(int(y_test_encoded[t]))
    actual_series.add(x=list(predicted_timestamps), y=[int(y_test_encoded[t]) for t in predicted_timestamps])
    predicted_series.clear()

    # Update the severity count textboxes as before.
    number_of_low = row0_actual_values_list.count(0)
    number_of_median = row0_actual_values_list.count(1)
    number_of_high = row0_actual_values_list.count(2)

    low_textbox_value = text_chart.add_textbox(str(number_of_low), 5, 7)
    low_textbox_value.set_text_font(15, weight="bold").set_stroke(thickness=0, color=lc.Color(0, 0, 0, 0))
    median_textbox = text_chart.add_textbox(str(number_of_median), 5, 4)
    median_textbox.set_text_font(15, weight="bold").set_stroke(thickness=0, color=lc.Color(0, 0, 0, 0))
    high_textbox = text_chart.add_textbox(str(number_of_high), 5, 1)
    high_textbox.set_text_font(15, weight="bold").set_stroke(thickness=0, color=lc.Color(0, 0, 0, 0))

    # NEW: Update the image based on the second predicted point only.
        # NEW: Update the image and title based on the second predicted point only.
    if len(predicted_values) >= 2:
        second_predicted = predicted_values[1]
        if second_predicted == 0:
            severity_image = "low.png"
            severity_title = "Low Severity"
        elif second_predicted == 1:
            severity_image = "median.png"
            severity_title = "Median Severity"
        elif second_predicted == 2:
            severity_image = "high.png"
            severity_title = "High Severity"
        else:
            severity_image = "low.png"
            severity_title = "Low Severity"
    else:
        severity_image = "low.png"
        severity_title = "Low Severity"

    image_series.set_point_image_style(
        source=f"D:/Computer Aplication/WorkPlacement/Projects/Project21/Images/{severity_image}"
    ).set_point_size(0.5)
    image_chart.set_title(severity_title).set_title_font(weight='bold', size=15)

    
    time.sleep(1)

# Create a dummy series for past predictions with the desired gray style.
dummy_past_series = chart1.add_point_line_series().set_name("Past Predictions")
dummy_past_series.set_point_size(6).set_point_color(lc.Color("orange")).set_line_color(lc.Color("orange"))
dummy_past_series.clear()  # Clear any points so it doesn't show on the chart

# Add a legend that includes the three series.
chart1.add_legend().add(actual_series).add(predicted_series).add(dummy_past_series).set_title("Legend")


# ROW 1: Real-Time Certainty Heatmap
chart2 = dashboard.ChartXY(title="Real-Time Certainty Heatmap", row_index=4, column_index=0, row_span=4, column_span=10)

raw_predictions = xgb_model.predict_proba(X_test)[:, 2]
min_val = raw_predictions.min()
if min_val < 0:
    raw_predictions -= min_val
new_max = raw_predictions.max()
if new_max > 1:
    raw_predictions /= new_max

predictions_series_pd = pd.Series(raw_predictions)
window_size = 10
predictions_smoothed = predictions_series_pd.rolling(window=window_size, min_periods=1).mean().values
predictions_smoothed = np.clip(predictions_smoothed, 0, 1)
predictions = predictions_smoothed

num_rows = 100
columns_count = len(predictions)
mat = np.zeros((num_rows, columns_count))

time_delay_heatmap = 0.01
lead_time = 400
batch_size = 10

def generate_certainty_heatmap(pred):
    row_values = np.linspace(0, 1, num_rows)
    column = 1 - np.abs(row_values - pred)
    return np.clip(column, 0, 1)

heatmap_series = chart2.add_heatmap_grid_series(
    columns=columns_count,
    rows=num_rows,
    data_order="rows"
).set_name("Real-Time Certainty Heatmap")

heatmap_series.set_start(0, 0)
heatmap_series.set_step(1, 1.0 / num_rows)
heatmap_series.set_palette_coloring(
    steps=[
        {"value": 0.5, "color": lc.Color(0, 0, 0, 0)},
        {"value": 0.75, "color": lc.Color(255, 255, 0, 64)},
        {"value": 0.9, "color": lc.Color(255, 192, 0, 128)},
        {"value": 1.0, "color": lc.Color(255, 0, 0, 128)},
    ],
    look_up_property="value",
    percentage_values=True,
    interpolate=True,
)
heatmap_series.hide_wireframe()
heatmap_series.set_intensity_interpolation(True)

predicted_series2 = chart2.add_line_series()
predicted_series2.set_name("Predicted Values (Smoothed_Low Severity)").set_line_color(lc.Color(0, 255, 255, 128))
predicted_series2.set_line_thickness(-1)

chart2.add_legend().add(predicted_series2).add(heatmap_series).set_title("Legend")
chart2.get_default_x_axis().set_interval(0, columns_count)
chart2.get_default_y_axis().set_interval(0, 1)

for t in range(lead_time):
    if t < columns_count:
        mat[:, t] = generate_certainty_heatmap(predictions[t])
heatmap_series.invalidate_intensity_values(mat)

heatmap_counter = lead_time

def update_heatmap_batch():
    global heatmap_counter
    if heatmap_counter >= columns_count:
        return
    # Do not clear predicted_series2; accumulate new points.
    for i in range(batch_size):
        idx = heatmap_counter + i
        if idx < columns_count:
            mat[:, idx] = generate_certainty_heatmap(predictions[idx])
    for i in range(batch_size):
        idx = heatmap_counter + i - lead_time
        if 0 <= idx < columns_count:
            predicted_series2.add(x=idx, y=float(predictions[idx]))
    heatmap_series.invalidate_intensity_values(mat)
    heatmap_counter += batch_size
    time.sleep(time_delay_heatmap)


# Open the Dashboard (only once)
dashboard.open(live=True, method='browser')


# Combined Streaming Loop (Sequentially interleaved updates)
while (row0_time_counter < total_points) or (heatmap_counter < columns_count):
    if row0_time_counter < total_points:
        update_row0_batch()
    if heatmap_counter < columns_count:
        update_heatmap_batch()


KeyboardInterrupt: 