In [2]:
import pandas as pd
from pathlib import Path
import pyarrow.parquet as pq
from dataclasses import dataclass
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.preprocessing import StandardScaler

# from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, classification_report, confusion_matrix
# from sklearn.decomposition import PCA
# from sklearn.ensemble import IsolationForest

import warnings
warnings.filterwarnings("ignore")

# hv.renderer('bokeh').theme = 'dark_minimal'


In [3]:
# dataset_root = Path(r"C:\Users\Turquin\Documents\MLFPMA - Machine Learning for Predictive Maintenance Application\Project\Dataset") # Raw string works without escaping \
dataset_root = Path("./Dataset")

@dataclass
class Case():
    info: pd.DataFrame
    measurements: pd.DataFrame


class RawDataset():
    def __init__(self, root, unit = "VG4", load_training=False, load_synthetic=False, load_anomalies=False) -> None:
        read_pq_file = lambda f: pq.read_table(root / f).to_pandas()
        
        cases = {
            "test": [f"{unit}_generator_data_testing_real_measurements.parquet", root / f"{unit}_generator_data_testing_real_info.csv" ], 
        }

        if load_training:
            cases = {
                **cases,
                "train": [f"{unit}_generator_data_training_measurements.parquet", root / f"{unit}_generator_data_training_info.csv" ], 
            }

        if load_synthetic:
            cases = {
                **cases,
                "test_s01": [f"{unit}_generator_data_testing_synthetic_01_measurements.parquet", root / f"{unit}_generator_data_testing_synthetic_01_info.csv"], 
                "test_s02": [f"{unit}_generator_data_testing_synthetic_02_measurements.parquet", root / f"{unit}_generator_data_testing_synthetic_02_info.csv"]
            }

        if load_anomalies:
            anomaly_folder = Path("synthetic_anomalies")  # Relative path
            subdataset = ["01", "02"]
            anomaly_types = ["a", "b", "c"]
            for anomaly in subdataset:
                for subtype in anomaly_types:
                    anomaly_key = f"anomaly_{anomaly}_type_{subtype}"
                    anomaly_file = f"{unit}_anomaly_{anomaly}_type_{subtype}.parquet"
                    full_anomaly_path = root / anomaly_folder / anomaly_file
                    if full_anomaly_path.exists():
                        cases[anomaly_key] = [anomaly_folder / anomaly_file, None]

        
        self.data_dict = dict()
        
        for id_c, c in cases.items():
            # if you need to verify the parquet header:
            # pq_rows = RawDataset.read_parquet_schema_df(root / c[0])
            measurements = read_pq_file(c[0])
            info = pd.read_csv(c[1]) if c[1] is not None else None
            self.data_dict[id_c] = Case(info, measurements)
    
    @staticmethod
    def read_parquet_schema_df(uri: str) -> pd.DataFrame:
        """Return a Pandas dataframe corresponding to the schema of a local URI of a parquet file.

        The returned dataframe has the columns: column, pa_dtype
        """
        # Ref: https://stackoverflow.com/a/64288036/
        schema = pq.read_schema(uri, memory_map=True)
        schema = pd.DataFrame(({"column": name, "pa_dtype": str(pa_dtype)} for name, pa_dtype in zip(schema.names, schema.types)))
        schema = schema.reindex(columns=["column", "pa_dtype"], fill_value=pd.NA)  # Ensures columns in case the parquet file has an empty dataframe.
        return schema
    

rds_u4 = RawDataset(dataset_root, "VG4", load_synthetic=False, load_training=True)
rds_u5 = RawDataset(dataset_root, "VG5", load_synthetic=True, load_training=True, load_anomalies=True)
rds_u6 = RawDataset(dataset_root, "VG6", load_synthetic=True, load_training=True, load_anomalies=True)

In [4]:
def add_anomaly_ground_truth(rds):
    subdataset = ["01", "02"]
    anomaly_types = ["a", "b", "c"]

    results = []
    for anomaly in subdataset:
        test_s012 = rds.data_dict[f'test_s{anomaly}'].measurements

        for subtype in anomaly_types:
            anomaly_key = f"anomaly_{anomaly}_type_{subtype}"
            labeled_df = rds.data_dict[anomaly_key].measurements
            test_s012.loc[labeled_df['ground_truth'] == 1, anomaly_key] = 1
        
        test_s012['anomaly'] = (test_s012[[f'anomaly_{anomaly}_type_a',f'anomaly_{anomaly}_type_b',f'anomaly_{anomaly}_type_c']].max(axis=1) == 1).astype(int)
        results.append(test_s012)

    return results

u5_s01, u5_s02 = add_anomaly_ground_truth(rds_u5)
u6_s01, u6_s02 = add_anomaly_ground_truth(rds_u6)

In [5]:
def anomaly_na(df):
    cols_anomaly = [col for col in df.columns if 'anomaly' in col]
    df[cols_anomaly] = df[cols_anomaly].fillna(0)
    return df 

anomaly_na(u5_s01)
anomaly_na(u5_s02)
anomaly_na(u6_s01)
anomaly_na(u6_s02)


Unnamed: 0,tot_activepower,ext_tmp,plant_tmp,charge,coupler_position,injector_01_opening,injector_02_opening,injector_03_opening,injector_04_opening,injector_05_opening,...,equilibrium_turbine_mode,dyn_only_on,pump_mode,short_circuit_mode,equilibrium_pump_mode,equilibrium_short_circuit_mode,anomaly_02_type_a,anomaly_02_type_b,anomaly_02_type_c,anomaly
2021-11-01 00:00:00+01:00,0.000000,14.845771,18.216559,4.311914,11.803000,0.0,0.0,0.0,0.0,0.0,...,False,True,False,False,False,False,0.0,0.0,0.0,0
2021-11-01 00:00:30+01:00,0.000000,14.852612,18.233796,4.464852,11.803000,0.0,0.0,0.0,0.0,0.0,...,False,True,False,False,False,False,0.0,0.0,0.0,0
2021-11-01 00:01:00+01:00,0.000000,14.859453,18.251033,4.445999,11.803000,0.0,0.0,0.0,0.0,0.0,...,False,True,False,False,False,False,0.0,0.0,0.0,0
2021-11-01 00:01:30+01:00,0.000000,14.866293,18.268270,4.140497,11.803000,0.0,0.0,0.0,0.0,0.0,...,False,True,False,False,False,False,0.0,0.0,0.0,0
2021-11-01 00:02:00+01:00,0.000000,14.873134,18.285507,4.453990,11.803000,0.0,0.0,0.0,0.0,0.0,...,False,True,False,False,False,False,0.0,0.0,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-12-30 23:58:00+01:00,-120.287556,10.201256,16.065450,-1.854672,185.406032,0.0,0.0,0.0,0.0,0.0,...,False,False,True,False,True,False,0.0,0.0,0.0,0
2021-12-30 23:58:30+01:00,-76.832558,10.203748,16.010313,-1.785339,185.403888,0.0,0.0,0.0,0.0,0.0,...,False,False,False,True,False,True,0.0,0.0,0.0,0
2021-12-30 23:59:00+01:00,0.000000,10.206240,15.955176,4.045595,185.401744,0.0,0.0,0.0,0.0,0.0,...,False,False,False,False,False,False,0.0,0.0,0.0,0
2021-12-30 23:59:30+01:00,0.000000,10.208732,15.900039,4.116665,185.399600,0.0,0.0,0.0,0.0,0.0,...,False,False,False,False,False,False,0.0,0.0,0.0,0


#### Control Variables

In [90]:
def get_control_vars(df):
    control_vars = df[(df['control_signal'] == True) | (df['input_feature'] == True)].attribute_name.values
    nature_vars = ['canal_level', 'canal_tmp', 'lake_tmp', 'ext_tmp', 'plant_tmp']
    add_control_vars = [temp for temp in nature_vars if temp in df.attribute_name.values]
    return np.append(control_vars, add_control_vars)

def get_all_measurement_vars(df):
    ''' Only these variables will get standaridized. '''
    measurement_vars = df[(df['output_feature'] == True)].attribute_name.values
    return measurement_vars


u4_control_vars = get_control_vars(rds_u4.data_dict["train"].info)
u5_control_vars = get_control_vars(rds_u5.data_dict["train"].info)
u6_control_vars = get_control_vars(rds_u6.data_dict["train"].info)

u4_measurement_vars = get_all_measurement_vars(rds_u4.data_dict["train"].info)
u5_measurement_vars = get_all_measurement_vars(rds_u5.data_dict["train"].info)

u5_control_vars
# u5_measurement_vars

array(['tot_activepower', 'charge', 'coupler_position',
       'injector_01_opening', 'injector_02_opening',
       'injector_03_opening', 'injector_04_opening',
       'injector_05_opening', 'pump_calculated_flow',
       'pump_pressure_diff', 'pump_rotspeed', 'turbine_pressure',
       'turbine_rotspeed', 'water_primary_pump_01_opening',
       'water_primary_pump_02_opening', 'timer_turbine_on_off',
       'timer_injector_opening', 'ext_tmp', 'plant_tmp'], dtype=object)

Separate dataframes by operating conditions and remove unnecessary columns about operating conditions.

In [7]:
def get_operating_modes(df):
    df_equilibrium_turbine_mode = df[df['equilibrium_turbine_mode'] == True]
    df_equilibrium_pump_mode = df[df['equilibrium_pump_mode'] == True]

    operating_cond_cols = ['machine_on', 'turbine_mode', 'all',
       'equilibrium_turbine_mode', 'dyn_only_on', 'pump_mode',
       'equilibrium_pump_mode']
    
    if 'short_circuit_mode' in df.columns:
        operating_cond_cols += ['short_circuit_mode', 'equilibrium_short_circuit_mode']
    
    df_equilibrium_turbine_mode = df_equilibrium_turbine_mode.drop(columns = operating_cond_cols)
    df_equilibrium_pump_mode = df_equilibrium_pump_mode.drop(columns = operating_cond_cols)

    return df_equilibrium_turbine_mode, df_equilibrium_pump_mode


In [36]:
# train sets
u4_train_equil_turbine, u4_train_equil_pump = get_operating_modes(rds_u4.data_dict["train"].measurements)
u5_train_equil_turbine, u5_train_equil_pump = get_operating_modes(rds_u5.data_dict["train"].measurements)
u6_train_equil_turbine, u6_train_equil_pump = get_operating_modes(rds_u6.data_dict["train"].measurements)

# synethetic test sets
u5_s01_equil_turbine, u5_s01_equil_pump = get_operating_modes(u5_s01)
u5_s02_equil_turbine, u5_s02_equil_pump = get_operating_modes(u5_s02)
u6_s01_equil_turbine, u6_s01_equil_pump = get_operating_modes(u6_s01)
u6_s02_equil_turbine, u6_s02_equil_pump = get_operating_modes(u6_s02)

# real test sets
u4_test_equil_turbine, u4_test_equil_pump = get_operating_modes(rds_u4.data_dict["test"].measurements)
u5_test_equil_turbine, u5_test_equil_pump = get_operating_modes(rds_u5.data_dict["test"].measurements)
u6_test_equil_turbine, u6_test_equil_pump = get_operating_modes(rds_u6.data_dict["test"].measurements)


In [76]:
# Separating the list into the required categories
u4_pump_dfs = [
    u4_train_equil_pump,
    u4_test_equil_pump
]

u4_turbine_dfs = [
    u4_train_equil_turbine,
    u4_test_equil_turbine
]

u5_pump_dfs = [
    u5_train_equil_pump,
    u5_s01_equil_pump,
    u5_s02_equil_pump,
    u5_test_equil_pump
]

u5_turbine_dfs = [
    u5_train_equil_turbine,
    u5_s01_equil_turbine,
    u5_s02_equil_turbine,
    u5_test_equil_turbine
]

u6_pump_dfs = [
    u6_train_equil_pump,
    u6_s01_equil_pump,
    u6_s02_equil_pump,
    u6_test_equil_pump
]

u6_turbine_dfs = [
    u6_train_equil_turbine,
    u6_s01_equil_turbine,
    u6_s02_equil_turbine,
    u6_test_equil_turbine
]

all_dfs = u5_pump_dfs + u5_turbine_dfs + u6_pump_dfs + u6_turbine_dfs


# Additional Features

Add time features

In [45]:
def add_time_features(df):
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index)

    df['minute'] = df.index.minute
    df['hour'] = df.index.hour
    df['day'] = df.index.day
    df['month'] = df.index.month
    df['year'] = df.index.year
    df['dayofweek'] = df.index.dayofweek
    df['dayofyear'] = df.index.dayofyear
    df['is_weekend'] = df.index.dayofweek.isin([5, 6]).astype(int)
    return df

for i in range(len(all_dfs)):
    all_dfs[i] = add_time_features(all_dfs[i])

##### Injector Openings
Must range from 0 to 100%

In [52]:
def create_injector_opening_feature(df):
    injector_columns = [col for col in df.columns if ('injector' in col and 'opening' in col)]
    # print(injector_columns)
    df['injector_sum'] = df[injector_columns].sum(axis=1)/ len(injector_columns)
    return df

for i in range(len(all_dfs)):
    all_dfs[i] = create_injector_opening_feature(all_dfs[i])

In [None]:
# def baseline_anomaly_score(df):
#     '''Anomaly score based soley on injector opening. 
#     If an injector is less than 0% open or more than 100% open, it's an anomaly. 
#     This happens in synethtic anomaly type 1 b.'''

#     injector_columns = [col for col in df.columns if ('injector' in col and 'opening' in col)]
#     df['anomaly_score'] = df[injector_columns].apply(lambda row: (row < 0).any() or (row > 100).any(), axis=1).astype(int)
#     return df

# for i in range(len(u5_pump_dfs)):
#     u5_pump_dfs[i] = baseline_anomaly_score(u5_pump_dfs[i])

In [None]:
# injector_columns = [col for col in u5_s01_equil_pump.columns if ('injector' in col and 'opening' in col)]
# u5_s01_equil_pump[injector_columns].min()
# # u5_pump_dfs[3]['anomaly_score'].value_counts()

injector_01_opening    0.0
injector_02_opening    0.0
injector_03_opening    0.0
injector_04_opening    0.0
injector_05_opening    0.0
dtype: float64

##### Standardize columns

In [92]:
cols_to_scale4 = [col for col in u4_measurement_vars if 'injector' not in col]
cols_to_scale = [col for col in u5_measurement_vars if 'injector' not in col]

print(cols_to_scale4)

['tot_activepower', 'plant_tmp', 'ext_tmp', 'water_primary_cold_tmp', 'water_primary_hot_tmp', 'valve_opening', 'refri_bath_level', 'aspi_bath_level', 'canal_level', 'canal_tmp', 'water_primary_filter_out_pressure', 'water_primary_filter_in_pressure', 'lake_tmp', 'coupler_position', 'tot_reactivepower', 'pump_rotspeed', 'turbine_rotspeed', 'exc_freq', 'exc_current', 'exc_voltage', 'powerfactor', 'elec_freq', 'ph01_current', 'ph02_current', 'ph03_current', 'ph01_voltage', 'ph12_voltage', 'ph02_voltage', 'ph23_voltage', 'ph03_voltage', 'ph31_voltage', 'air_circ_hot_tmp', 'air_circ_cold_01_tmp', 'air_circ_cold_02_tmp', 'stat_magn_01_tmp', 'stat_magn_02_tmp', 'stat_coil_ph01_01_tmp', 'stat_coil_ph01_02_tmp', 'stat_coil_ph02_01_tmp', 'stat_coil_ph03_01_tmp', 'stat_coil_ph03_02_tmp', 'water_circ_hot_01_tmp', 'water_circ_hot_02_tmp', 'water_circ_cold_tmp']


In [93]:
# Define columns to scale
scaler = StandardScaler()

# Function to scale specific columns
def scale_columns(df, cols_to_scale, scaler):
    df_scaled = df.copy()
    if not set(cols_to_scale).issubset(df.columns):
        raise ValueError("Some columns in cols_to_scale are not present in the DataFrame.")
    # Scale only the specified columns
    df_scaled[cols_to_scale] = scaler.transform(df[cols_to_scale])
    return df_scaled

# # u4 pump
scaler.fit(u4_train_equil_pump[cols_to_scale4])
scaled_dfs_u4_pump = [
    scale_columns(df, cols_to_scale4, scaler) for df in u4_pump_dfs
]

# u4 turbine
scaler.fit(u4_train_equil_turbine[cols_to_scale4])
scaled_dfs_u4_turbine = [
    scale_columns(df, cols_to_scale4, scaler) for df in u4_turbine_dfs
]

# u5 pump
scaler.fit(u5_train_equil_pump[cols_to_scale])
scaled_dfs_u5_pump = [
    scale_columns(df, cols_to_scale, scaler) for df in u5_pump_dfs
]

# u5 turbine
scaler.fit(u5_train_equil_turbine[cols_to_scale])
scaled_dfs_u5_turbine = [
    scale_columns(df, cols_to_scale, scaler) for df in u5_turbine_dfs
]

# u6 pump
scaler.fit(u6_train_equil_pump[cols_to_scale])
scaled_dfs_u6_pump = [
    scale_columns(df, cols_to_scale, scaler) for df in u6_pump_dfs
]

# u6 turbine
scaler.fit(u6_train_equil_turbine[cols_to_scale])
scaled_dfs_u6_turbine = [
    scale_columns(df, cols_to_scale, scaler) for df in u6_turbine_dfs
]

Unpack the dataframes

In [96]:
(u4_train_equil_pump, u4_test_equil_pump) = scaled_dfs_u4_pump
(u4_train_equil_turbine, u4_test_equil_turbine) = scaled_dfs_u4_turbine
(u5_train_equil_pump, u5_s01_equil_pump, u5_s02_equil_pump, u5_test_equil_pump) = scaled_dfs_u5_pump
(u5_train_equil_turbine, u5_s01_equil_turbine, u5_s02_equil_turbine, u5_test_equil_turbine) = scaled_dfs_u5_turbine
(u6_train_equil_pump, u6_s01_equil_pump, u6_s02_equil_pump, u6_test_equil_pump) = scaled_dfs_u6_pump
(u6_train_equil_turbine, u6_s01_equil_turbine, u6_s02_equil_turbine, u6_test_equil_turbine) = scaled_dfs_u6_turbine

In [98]:
u5_train_equil_pump.to_csv('u5_train_equil_pump.csv')
u5_s01_equil_pump.to_csv('u5_s01_equil_pump.csv')

Copy this into your notebook to import the variables

In [21]:
'''
# training data
u4_train_equil_turbine = data_preprocessing.u4_train_equil_turbine
u4_train_equil_pump = data_preprocessing.u4_train_equil_pump
u5_train_equil_turbine = data_preprocessing.u5_train_equil_turbine
u5_train_equil_pump = data_preprocessing.u5_train_equil_pump
u6_train_equil_turbine = data_preprocessing.u6_train_equil_turbine
u6_train_equil_pump = data_preprocessing.u6_train_equil_pump

# synethetic test sets
u5_s01_equil_turbine = data_preprocessing.u5_s01_equil_turbine
u5_s01_equil_pump = data_preprocessing.u5_s01_equil_pump
u5_s02_equil_turbine = data_preprocessing.u5_s02_equil_turbine
u5_s02_equil_pump = data_preprocessing.u5_s02_equil_pump
u6_s01_equil_turbine = data_preprocessing.u6_s01_equil_turbine
u6_s01_equil_pump = data_preprocessing.u6_s01_equil_pump
u6_s02_equil_turbine = data_preprocessing.u6_s02_equil_turbine
u6_s02_equil_pump = data_preprocessing.u6_s02_equil_pump

# real test sets
u4_test_equil_turbine = data_preprocessing.u4_test_equil_turbine
u4_test_equil_pump = data_preprocessing.u4_test_equil_pump
u5_test_equil_turbine = data_preprocessing.u5_test_equil_turbine
u5_test_equil_pump = data_preprocessing.u5_test_equil_pump
u6_test_equil_turbine = data_preprocessing.u6_test_equil_turbine
u6_test_equil_pump = data_preprocessing.u6_test_equil_pump
'''

'\n# training data\nu4_train_equil_turbine = data_preprocessing.u4_train_equil_turbine\nu4_train_equil_pump = data_preprocessing.u4_train_equil_pump\nu5_train_equil_turbine = data_preprocessing.u5_train_equil_turbine\nu5_train_equil_pump = data_preprocessing.u5_train_equil_pump\nu6_train_equil_turbine = data_preprocessing.u6_train_equil_turbine\nu6_train_equil_pump = data_preprocessing.u6_train_equil_pump\n\n# synethetic test sets\nu5_s01_equil_turbine = data_preprocessing.u5_s01_equil_turbine\nu5_s01_equil_pump = data_preprocessing.u5_s01_equil_pump\nu5_s02_equil_turbine = data_preprocessing.u5_s02_equil_turbine\nu5_s02_equil_pump = data_preprocessing.u5_s02_equil_pump\nu6_s01_equil_turbine = data_preprocessing.u6_s01_equil_turbine\nu6_s01_equil_pump = data_preprocessing.u6_s01_equil_pump\nu6_s02_equil_turbine = data_preprocessing.u6_s02_equil_turbine\nu6_s02_equil_pump = data_preprocessing.u6_s02_equil_pump\n\n# real test sets\nu4_test_equil_turbine = data_preprocessing.u4_test_equil

Some examples:

For Unit 4:
- train on u4_train_equil_pump, test on u4_test_equil_pump

For Unit 5:
- train on u5_train_equil_turbine, test on u5_s01_equil_turbine, u5_s02_equil_turbine, u5_test_equil_turbine