In [2]:
import delta_sharing
import os
from pyspark.sql import SparkSession
from tqdm import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold, train_test_split
from sklearn.linear_model import Lasso
from sklearn.metrics import classification_report, accuracy_score
import xgboost as xgb

In [3]:
df = pd.read_csv("data_sets/almost_full_data4.csv")
df_tmp = df.iloc[:1000000]
df_test = df.iloc[1000000:1500000]

In [56]:
df_tmp = df.iloc[7000000:8000000]

In [57]:
print(df_tmp["Error_Status_Cloud"].value_counts())

0.0       694703
36.0      179939
192.0      31319
4.0        28435
64.0       18466
3076.0     17944
16.0        1489
1031.0       135
512.0         74
3.0            1
8.0            1
Name: Error_Status_Cloud, dtype: int64


In [58]:
#basic data cleaning

bit_position = 5
df_tmp.dropna(subset=["Error_Status_Cloud"], inplace=True)
df_tmp["bit_5"] = df_tmp["Error_Status_Cloud"].apply(lambda x: (int(x) >> bit_position) & 1)
df_tmp["error_in_next_30_days"] = 0

df_tmp["day_difference_from_first"] = 0  # ! what does this do?

# Reference time from row 0
time_zero = df_tmp["sample_time"].iloc[0]

df_tmp["sample_time"] = pd.to_datetime(df_tmp["sample_time"], errors='coerce')
df_tmp["day_difference_from_first"] = (df_tmp["sample_time"] - df_tmp["sample_time"].iloc[0]).dt.days

df_tmp = df_tmp.sort_values(by=["device_id", "sample_time"], ascending=[True, True])


#adding error_in_next_2_days column

bit_5 = df_tmp["bit_5"].to_numpy()
device_id = df_tmp["device_id"].to_numpy()
sample_time = df_tmp["sample_time"].to_numpy()
day_difference = df_tmp["day_difference_from_first"].to_numpy()
error_flag = np.zeros(len(df_tmp), dtype=int)  # Initialize the target column as a NumPy array

# Iterate efficiently with NumPy
for i in tqdm(range(len(df_tmp) - 1, -1, -1), desc="Processing rows"):
    
    if bit_5[i] == 1:
        latest_err = i
        j = i
        curId = device_id[i]
        curDay = day_difference[i]
        #(device_id == curID) & (day_difference > day_difference[latest_day] - 2) & (day_difference < day_difference[latest_day])

        while ((j > -1) and (day_difference[j] > curDay - 2 ) and (device_id[j] == curId) and (day_difference[j] <= curDay) ):

            error_flag[j] = 1 

            if(bit_5[j] == 1):

                latest_err = j
                curDay = day_difference[j]
                
            j = j-1
        
        i = j
        

df_tmp["error_in_next_2_days"] = error_flag  # Assign the optimized result back to DataFrame


#adding rolling average columns

# Step 1: Calculate the daily average for each device
df_daily_avg = df_tmp.groupby(["device_id", "day_difference_from_first"]).agg(
    avg_DeltaT_K=("DeltaT_K", "mean"),
    avg_Flow_Volume_total_m3=("Flow_Volume_total_m3", "mean")
).reset_index()

# Step 2: Sort data correctly
df_daily_avg.sort_values(by=["device_id", "day_difference_from_first"], inplace=True)


# Select relevant numeric columns for rolling average calculation
rolling_columns = [
    "T1_remote_K", "T2_embeded_K", "RelFlow_Fb_Rel2Vmax", "RelPower_Fb_Rel2Pmax", 
    "AbsFlow_Fb_m3s", "AbsPower_Fb_W", "Heating_E_J", "Glycol_Concentration_Rel",
    "Cooling_E_J", "RelPos_Fb", "DeltaT_Limitation_Write", "SpDeltaT_K_Write", 
    "Pmax_Rel_Write", "Vmax_Rel_Write", "SpFlow_DeltaT_lmin_Write", "DDC_Sp_Rel",
    "SpDeltaT_applied_K", "DDC_BUS_Sp_Write", "dT_Manager_Ste", "Active_dT_Manager_total_h",
    "DeltaT_K", "DDC_Sp_V", "OperatingHours", "Flow_Volume_total_m3", "Y3AnalogInputValue"
]

# Step 1: Calculate daily averages for each relevant column
df_daily_avg = df_tmp.groupby(["device_id", "day_difference_from_first"])[rolling_columns].mean().reset_index()

# Step 2: Apply rolling average (7-day window) for each column
df_daily_avg = df_daily_avg.sort_values(by=["device_id", "day_difference_from_first"])

# Apply rolling mean grouped by device_id
for col in rolling_columns:
    df_daily_avg[f"rolling_avg_{col}"] = (
        df_daily_avg.groupby("device_id")[col]
        .rolling(2, min_periods=1)
        .mean()
        .reset_index(level=0, drop=True)
    )

# Step 3: Merge rolling averages back into df_tmp
rolling_avg_columns = [f"rolling_avg_{col}" for col in rolling_columns]

df_tmp = df_tmp.merge(
    df_daily_avg[["device_id", "day_difference_from_first"] + rolling_avg_columns],
    on=["device_id", "day_difference_from_first"],
    how="left"
)

df_tmp["error_in_next_2_days"] = df_tmp["error_in_next_2_days"].fillna(0)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tmp.dropna(subset=["Error_Status_Cloud"], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tmp["bit_5"] = df_tmp["Error_Status_Cloud"].apply(lambda x: (int(x) >> bit_position) & 1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tmp["error_in_next_30_days"] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try usi

KeyboardInterrupt: 

In [45]:
df_tmp.to_csv("preprocessed_data_bit5.csv", index=False)

In [9]:
print(df_tmp["error_in_next_2_days"].value_counts())

0    999898
Name: error_in_next_2_days, dtype: int64


In [24]:
df_tmp = pd.read_csv("preprocessed_data.csv")

In [33]:
df_test = pd.read_csv("preprocessed_test_data.csv")

In [54]:
#X_train, X_test, y_train, y_test = train_test_split(df_tmp[['rolling_avg_DeltaT_K', 'rolling_avg_Flow_Volume_total_m3']],df_tmp["error_in_next_2_days"], test_size=0.25, random_state=42)
# Select all rolling average columns dynamically
rolling_avg_features = [col for col in df_tmp.columns if col.startswith("rolling_avg_")]

# Ensure no NaN values in features
df_tmp[rolling_avg_features] = df_tmp[rolling_avg_features].fillna(0)

# Split features (X) and target variable (y)
X = df_tmp[rolling_avg_features]
y = df_tmp["error_in_next_2_days"]

ratio = len(df_tmp[df_tmp['error_in_next_2_days'] == 0]) / len(df_tmp[df_tmp['error_in_next_2_days'] == 1])

# Train XGBoost Model
model = xgb.XGBClassifier(
    scale_pos_weight=ratio,
    n_estimators=1000, 
    max_depth=25, 
    learning_rate=0.01,
    subsample=0.8, 
    colsample_bytree=0.8, 
    use_label_encoder=False,
    eval_metric="logloss"
)

# Fit the model
model.fit(X, y)

Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


In [17]:
print(df_tmp["Error_Status_Cloud"].value_counts())

0.0       462496
16.0       17688
64.0       11534
4.0         4154
1028.0      3916
192.0         94
256.0         11
2048.0         2
512.0          1
Name: Error_Status_Cloud, dtype: int64


In [27]:
#basic data cleaning

bit_position = 6
df_tmp.dropna(subset=["Error_Status_Cloud"], inplace=True)
df_tmp["bit_6"] = df_tmp["Error_Status_Cloud"].apply(lambda x: (int(x) >> bit_position) & 1)
df_tmp["error_in_next_30_days"] = 0

df_tmp["day_difference_from_first"] = 0  # Initialize with 0

# Reference time from row 0
time_zero = df_tmp["sample_time"].iloc[0]

df_tmp["sample_time"] = pd.to_datetime(df_tmp["sample_time"], errors='coerce')
df_tmp["day_difference_from_first"] = (df_tmp["sample_time"] - df_tmp["sample_time"].iloc[0]).dt.days

df_tmp = df_tmp.sort_values(by=["device_id", "sample_time"], ascending=[True, True])


#adding error_in_next_2_days column

bit_6 = df_tmp["bit_6"].to_numpy()
device_id = df_tmp["device_id"].to_numpy()
sample_time = df_tmp["sample_time"].to_numpy()
day_difference = df_tmp["day_difference_from_first"].to_numpy()
error_flag = np.zeros(len(df_tmp), dtype=int)  # Initialize the target column as a NumPy array

# Iterate efficiently with NumPy
for i in tqdm(range(len(df_tmp) - 1, -1, -1), desc="Processing rows"):
    if bit_6[i] == 1:
        curID = device_id[i]
        latest_day = i
        
        # Find indices where device_id matches & day difference is within 2 days
        mask = (device_id == curID) & (day_difference > day_difference[latest_day] - 2) & (day_difference < day_difference[latest_day])
        error_flag[mask] = 1  # Set error flag for all matching rows

df_tmp["error_in_next_2_days"] = error_flag  # Assign the optimized result back to DataFrame


#adding rolling average columns

# Step 1: Calculate the daily average for each device
df_daily_avg = df_tmp.groupby(["device_id", "day_difference_from_first"]).agg(
    avg_DeltaT_K=("DeltaT_K", "mean"),
    avg_Flow_Volume_total_m3=("Flow_Volume_total_m3", "mean")
).reset_index()

# Step 2: Sort data correctly
df_daily_avg.sort_values(by=["device_id", "day_difference_from_first"], inplace=True)

'''
# Step 3: Apply rolling window (without "on=")
df_daily_avg["rolling_avg_DeltaT_K"] = (
    df_daily_avg.groupby("device_id")["avg_DeltaT_K"]
    .rolling(7, min_periods=1)
    .mean()
    .reset_index(level=0, drop=True)
)

df_daily_avg["rolling_avg_Flow_Volume_total_m3"] = (
    df_daily_avg.groupby("device_id")["avg_Flow_Volume_total_m3"]
    .rolling(7, min_periods=1)
    .mean()
    .reset_index(level=0, drop=True)
)

# Step 4: Merge rolling averages back into the original DataFrame
df_tmp = df_tmp.merge(
    df_daily_avg[["device_id", "day_difference_from_first", "rolling_avg_DeltaT_K", "rolling_avg_Flow_Volume_total_m3"]],
    on=["device_id", "day_difference_from_first"],
    how="left"
)
'''
# Select relevant numeric columns for rolling average calculation
rolling_columns = [
    "T1_remote_K", "T2_embeded_K", "RelFlow_Fb_Rel2Vmax", "RelPower_Fb_Rel2Pmax", 
    "AbsFlow_Fb_m3s", "AbsPower_Fb_W", "Heating_E_J", "Glycol_Concentration_Rel",
    "Cooling_E_J", "RelPos_Fb", "DeltaT_Limitation_Write", "SpDeltaT_K_Write", 
    "Pmax_Rel_Write", "Vmax_Rel_Write", "SpFlow_DeltaT_lmin_Write", "DDC_Sp_Rel",
    "SpDeltaT_applied_K", "DDC_BUS_Sp_Write", "dT_Manager_Ste", "Active_dT_Manager_total_h",
    "DeltaT_K", "DDC_Sp_V", "OperatingHours", "Flow_Volume_total_m3", "Y3AnalogInputValue"
]

# Step 1: Calculate daily averages for each relevant column
df_daily_avg = df_tmp.groupby(["device_id", "day_difference_from_first"])[rolling_columns].mean().reset_index()

# Step 2: Apply rolling average (7-day window) for each column
df_daily_avg = df_daily_avg.sort_values(by=["device_id", "day_difference_from_first"])

# Apply rolling mean grouped by device_id
for col in rolling_columns:
    df_daily_avg[f"rolling_avg_{col}"] = (
        df_daily_avg.groupby("device_id")[col]
        .rolling(2, min_periods=1)
        .mean()
        .reset_index(level=0, drop=True)
    )

# Step 3: Merge rolling averages back into df_tmp
rolling_avg_columns = [f"rolling_avg_{col}" for col in rolling_columns]

df_tmp = df_tmp.merge(
    df_daily_avg[["device_id", "day_difference_from_first"] + rolling_avg_columns],
    on=["device_id", "day_difference_from_first"],
    how="left"
)

df_tmp["error_in_next_2_days"] = df_tmp["error_in_next_2_days"].fillna(0)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tmp.dropna(subset=["Error_Status_Cloud"], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tmp["bit_6"] = df_tmp["Error_Status_Cloud"].apply(lambda x: (int(x) >> bit_position) & 1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tmp["error_in_next_30_days"] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try usi

In [51]:
df_tmp.to_csv("preprocessed_test_data_bit5.csv", index=False)

In [53]:
df_tmp = pd.read_csv("preprocessed_data.csv")

In [50]:
df_test = df_tmp

In [55]:
# Select all rolling average features dynamically
rolling_avg_features = [col for col in df_tmp.columns if col.startswith("rolling_avg_")]

# Ensure no missing values in feature columns before prediction
df_test[rolling_avg_features] = df_test[rolling_avg_features].fillna(0)

# Make predictions using the trained model
y_pred = model.predict(df_test[rolling_avg_features])

# Compute classification report
report = classification_report(df_test['error_in_next_2_days'], y_pred)

# Print results
print("Classification Report:")
print(report)


Classification Report:
              precision    recall  f1-score   support

           0       0.92      0.91      0.92    903617
           1       0.00      0.00      0.00     72238

    accuracy                           0.84    975855
   macro avg       0.46      0.46      0.46    975855
weighted avg       0.85      0.84      0.85    975855



In [42]:
num_devices_with_error = df_tmp[df_tmp["bit_6"] == 1]["device_id"].nunique()
print(num_devices_with_error)

16


In [None]:
#basic data cleaning

bit_position = 5
df_tmp.dropna(subset=["Error_Status_Cloud"], inplace=True)
df_tmp["bit_5"] = df_tmp["Error_Status_Cloud"].apply(lambda x: (int(x) >> bit_position) & 1)
df_tmp["error_in_next_30_days"] = 0

df_tmp["day_difference_from_first"] = 0  # ! what does this do?

# Reference time from row 0
time_zero = df_tmp["sample_time"].iloc[0]

df_tmp["sample_time"] = pd.to_datetime(df_tmp["sample_time"], errors='coerce')
df_tmp["day_difference_from_first"] = (df_tmp["sample_time"] - df_tmp["sample_time"].iloc[0]).dt.days

df_tmp = df_tmp.sort_values(by=["device_id", "sample_time"], ascending=[True, True])


#adding error_in_next_2_days column

bit_5 = df_tmp["bit_5"].to_numpy()
device_id = df_tmp["device_id"].to_numpy()
sample_time = df_tmp["sample_time"].to_numpy()
day_difference = df_tmp["day_difference_from_first"].to_numpy()
error_flag = np.zeros(len(df_tmp), dtype=int)  # Initialize the target column as a NumPy array

# Iterate efficiently with NumPy
for i in tqdm(range(len(df_tmp) - 1, -1, -1), desc="Processing rows"):
    if bit_5[i] == 1:
        curID = device_id[i]
        latest_day = i
        
        # Find indices where device_id matches & day difference is within 2 days
        mask = (device_id == curID) & (day_difference > day_difference[latest_day] - 2) & (day_difference <= day_difference[latest_day])
        error_flag[mask] = 1  # Set error flag for all matching rows

df_tmp["error_in_next_2_days"] = error_flag  # Assign the optimized result back to DataFrame


#adding rolling average columns

# Step 1: Calculate the daily average for each device
df_daily_avg = df_tmp.groupby(["device_id", "day_difference_from_first"]).agg(
    avg_DeltaT_K=("DeltaT_K", "mean"),
    avg_Flow_Volume_total_m3=("Flow_Volume_total_m3", "mean")
).reset_index()

# Step 2: Sort data correctly
df_daily_avg.sort_values(by=["device_id", "day_difference_from_first"], inplace=True)

# Select relevant numeric columns for rolling average calculation
rolling_columns = [
    "T1_remote_K", "T2_embeded_K", "RelFlow_Fb_Rel2Vmax", "RelPower_Fb_Rel2Pmax", 
    "AbsFlow_Fb_m3s", "AbsPower_Fb_W", "Heating_E_J", "Glycol_Concentration_Rel",
    "Cooling_E_J", "RelPos_Fb", "DeltaT_Limitation_Write", "SpDeltaT_K_Write", 
    "Pmax_Rel_Write", "Vmax_Rel_Write", "SpFlow_DeltaT_lmin_Write", "DDC_Sp_Rel",
    "SpDeltaT_applied_K", "DDC_BUS_Sp_Write", "dT_Manager_Ste", "Active_dT_Manager_total_h",
    "DeltaT_K", "DDC_Sp_V", "OperatingHours", "Flow_Volume_total_m3", "Y3AnalogInputValue"
]

# Step 1: Calculate daily averages for each relevant column
df_daily_avg = df_tmp.groupby(["device_id", "day_difference_from_first"])[rolling_columns].mean().reset_index()

# Step 2: Apply rolling average (7-day window) for each column
df_daily_avg = df_daily_avg.sort_values(by=["device_id", "day_difference_from_first"])

# Apply rolling mean grouped by device_id
for col in rolling_columns:
    df_daily_avg[f"rolling_avg_{col}"] = (
        df_daily_avg.groupby("device_id")[col]
        .rolling(2, min_periods=1)
        .mean()
        .reset_index(level=0, drop=True)
    )

# Step 3: Merge rolling averages back into df_tmp
rolling_avg_columns = [f"rolling_avg_{col}" for col in rolling_columns]

df_tmp = df_tmp.merge(
    df_daily_avg[["device_id", "day_difference_from_first"] + rolling_avg_columns],
    on=["device_id", "day_difference_from_first"],
    how="left"
)

df_tmp["error_in_next_2_days"] = df_tmp["error_in_next_2_days"].fillna(0)
