Skip to content

Commit

Permalink
Analysis: Add typing hinting and validation for the results (#35)
Browse files Browse the repository at this point in the history
* Analysis: Add typing annotations and pydantic models to analysis functions

* Documentation: Update sensor validation notebook to use the new types
  • Loading branch information
lucianolorenti committed Feb 27, 2024
1 parent 2212495 commit 0db97ae
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 444 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 3.0.2
current_version = 3.0.3
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion ceruleo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
CACHE_PATH.mkdir(parents=True, exist_ok=True)


__version__ = "3.0.2"
__version__ = "3.0.3"
95 changes: 74 additions & 21 deletions ceruleo/dataset/analysis/correlation.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,58 @@
from itertools import combinations
from typing import List, Optional, Tuple
from typing import Dict, List, Optional, Tuple

import pandas as pd
from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features
from pydantic import BaseModel


class CorrelationAnalysisElement(BaseModel):
mean_correlation: float
std_correlation: float
max_correlation: float
min_correlation: float
abs_mean_correlation: float
std_abs_mean_correlation: float


class CorrelationAnalysis(BaseModel):
data: Dict[Tuple[str, str], CorrelationAnalysisElement]

def get(self, feature_1: str, feature_2: str) -> CorrelationAnalysisElement:
needle = (feature_1, feature_2)
if needle not in self.data:
needle = (feature_2, feature_1)

if needle not in self.data:
raise KeyError(f"Correlation between {feature_1} and {feature_2} not found")
return self.data[needle]

def to_pandas(self) -> pd.DataFrame:
return (
pd.DataFrame.from_dict(
{(k[0], k[1]): v.model_dump() for k, v in self.data.items()},
orient="index",
)
.reset_index()
.rename(columns={"level_0": "feature_1", "level_1": "feature_2"})
)


def correlation_analysis(
dataset: AbstractPDMDataset,
corr_threshold: float = 0.7,
features: Optional[List[str]] = None,
) -> pd.DataFrame:
) -> CorrelationAnalysis:
"""
Correlation Analysis
Compute the correlation between all the features given an Iterable of executions.
Parameters:
dataset: Dataset of time series
corr_threshold: Threshold to consider two features of a single execution highly correlated
features: List of features to consider when computing the correlations
Returns:
A DataFrame indexed with the column names with the following columns:
A CorrelationAnalysis object with map indexed by two colun names and the following information:s
- Mean Correlation
- Std Correlation
Expand Down Expand Up @@ -51,23 +82,45 @@ def correlation_analysis(
correlated_features.extend(correlated_features_for_execution)

df = pd.DataFrame(correlated_features, columns=["Feature 1", "Feature 2", "Corr"])
output = df.groupby(by=["Feature 1", "Feature 2"]).mean()
output.rename(columns={"Corr": "Mean Correlation"}, inplace=True)
output["Std Correlation"] = df.groupby(by=["Feature 1", "Feature 2"]).std()
output = df.groupby(by=["Feature 1", "Feature 2"]).agg(
{
"Corr": [
"mean",
"std",
"max",
"min",
]
}
)

def percentage_above_treshold(x):
return (x["Corr"].abs() > corr_threshold).mean() * 100
# Calculate additional statistics
output["Abs mean correlation"] = df.groupby(by=["Feature 1", "Feature 2"])[
"Corr"
].apply(lambda x: x.abs().mean())
output["Std abs mean correlation"] = df.groupby(by=["Feature 1", "Feature 2"])[
"Corr"
].apply(lambda x: x.abs().std())

output["Percentage of lives with a high correlation"] = df.groupby(
by=["Feature 1", "Feature 2"]
).apply(percentage_above_treshold)
output.columns = [
"mean_correlation",
"std_correlation",
"max_correlation",
"min_correlation",
"abs_mean_correlation",
"std_abs_mean_correlation",
]

output["Abs mean correlation"] = df.groupby(by=["Feature 1", "Feature 2"]).apply(
lambda x: x.abs().mean()
)
output["Std mean correlation"] = df.groupby(by=["Feature 1", "Feature 2"]).apply(
lambda x: x.abs().std()
output = output.fillna(0)
return CorrelationAnalysis(
data={
(k[0], k[1]): CorrelationAnalysisElement(
mean_correlation=v["mean_correlation"],
std_correlation=v["std_correlation"],
max_correlation=v["max_correlation"],
min_correlation=v["min_correlation"],
abs_mean_correlation=v["abs_mean_correlation"],
std_abs_mean_correlation=v["std_abs_mean_correlation"],
)
for k, v in output.iterrows()
}
)
output["Max correlation"] = df.groupby(by=["Feature 1", "Feature 2"]).max()
output["Min correlation"] = df.groupby(by=["Feature 1", "Feature 2"]).min()
return output
85 changes: 61 additions & 24 deletions ceruleo/dataset/analysis/numerical_features.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,46 @@
from collections import defaultdict

from enum import Enum
from typing import Dict, List, Optional, Union

import antropy as ant
import numpy as np
import pandas as pd
from pydantic import BaseModel
from scipy.stats import spearmanr
from sklearn.feature_selection import mutual_info_regression
from tqdm.auto import tqdm
from uncertainties import ufloat

from ceruleo.dataset.transformed import TransformedDataset
from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features_and_target


class MetricType(str, Enum):
std = "std"
correlation = "correlation"
autocorrelation = "autocorrelation"
monotonicity = "monotonicity"
number_of_unique_elements = "number_of_unique_elements"
mutual_information = "mutual_information"
null = "null"
entropy = "entropy"

@staticmethod
def from_str(s: str) -> "MetricType":
return MetricType(s)


class MetricValues(BaseModel):
mean: float
std: float
max: float
min: float


class NumericalFeaturesAnalysis(BaseModel):
feature: str
metric: Dict[MetricType, MetricValues]


def entropy(s: np.ndarray) -> float:
"""
Approximate entropy
Expand Down Expand Up @@ -134,16 +161,18 @@ def mutual_information(x: np.ndarray, y: np.ndarray) -> float:
}


def analysis_single_time_series(
def analysis_single_cycle(
X: np.ndarray,
y: np.ndarray,
out: Dict[str, Dict[MetricType, List[float]]],
column_names: List[str],
data: Optional[Dict] = None,
what_to_compute: List[str] = [],
) -> dict:
):
"""
Compute the analysis for a single run-to-failure cycle
Parameters:
X: Input Features
y: RUL Target
Expand All @@ -152,11 +181,10 @@ def analysis_single_time_series(
what_to_compute: Features to compute
Returns:
Dictionary containing the computed info
A dictionary with the analysis of the features
"""

if data is None:
data = defaultdict(lambda: defaultdict(list))
if len(what_to_compute) == 0:
what_to_compute = list(sorted(metrics.keys()))
for column_index in range(len(column_names)):
Expand All @@ -165,30 +193,34 @@ def analysis_single_time_series(
x_ts = np.squeeze(X.loc[:, column_name].values)

m = metrics[what](x_ts, y)
metric_type = MetricType.from_str(what)
out[column_name][metric_type].append(m)

data[column_name][what].append(m)
return data
return out


def merge_analysis(data: dict) -> pd.DataFrame:
data_df = defaultdict(lambda: defaultdict(list))
def merge_cycle_analysis(
data: Dict[str, Dict[MetricType, List[float]]],
) -> Dict[str, NumericalFeaturesAnalysis]:
out = {k: NumericalFeaturesAnalysis(feature=k, metric={}) for k in data.keys()}
for column_name in data.keys():
for what in data[column_name]:
data_df[column_name][f"{what} Mean"] = ufloat(
np.nanmean(data[column_name][what]),
np.nanstd(data[column_name][what]),
metric_type = MetricType.from_str(what)
out[column_name].metric[metric_type] = MetricValues(
mean=np.nanmean(data[column_name][what]),
std=np.nanstd(data[column_name][what]),
max=np.nanmax(data[column_name][what]),
min=np.nanmin(data[column_name][what]),
)
data_df[column_name][f"{what} Max"] = np.nanmax(data[column_name][what])
data_df[column_name][f"{what} Min"] = np.nanmin(data[column_name][what])
return pd.DataFrame(data_df).T
return out


def analysis(
dataset: Union[TransformedDataset, AbstractPDMDataset],
*,
show_progress: bool = False,
what_to_compute: List[str] = [],
) -> pd.DataFrame:
) -> NumericalFeaturesAnalysis:
"""
Compute analysis of numerical features
Expand All @@ -208,12 +240,11 @@ def analysis(
Returns:
Dataframe with the columns specified by what_to_compute
NumericalFeaturesAnalysis
"""

if len(what_to_compute) == 0:
what_to_compute = list(sorted(metrics.keys()))
data = defaultdict(lambda: defaultdict(list))
iterator = dataset
if show_progress:
iterator = tqdm(iterator)
Expand All @@ -222,7 +253,13 @@ def analysis(
column_names = dataset.transformer.column_names
else:
column_names = dataset.numeric_features()

data_per_cycle = {
k: {MetricType.from_str(what): [] for what in what_to_compute}
for k in column_names
}
for X, y in iterate_over_features_and_target(dataset):
y = np.squeeze(y)
data = analysis_single_time_series(X, y, column_names, data, what_to_compute)
return merge_analysis(data)
analysis_single_cycle(X, y, data_per_cycle, column_names, what_to_compute)

return merge_cycle_analysis(data_per_cycle)
29 changes: 19 additions & 10 deletions ceruleo/dataset/analysis/sample_rate.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import logging
from typing import List, Optional, Tuple
from typing import Optional

import numpy as np
import pandas as pd
from pydantic import BaseModel

from ceruleo.dataset.ts_dataset import AbstractPDMDataset

logger = logging.getLogger(__name__)


class SampleRateAnalysis(BaseModel):
mode: float
mean: float
std: float

def to_pandas(self) -> pd.Series:
return pd.Series(self.model_dump()).to_frame().T


def sample_rate(ds: AbstractPDMDataset, unit: str = "s") -> np.ndarray:
"""Obtain an array of time difference between two consecutive samples
Expand All @@ -30,9 +41,10 @@ def sample_rate(ds: AbstractPDMDataset, unit: str = "s") -> np.ndarray:
return np.array(time_diff)



def sample_rate_summary(
ds: AbstractPDMDataset, unit: Optional[str] = "s"
) -> pd.DataFrame:
) -> SampleRateAnalysis:
"""
Obtain the mean, mode and standard deviation of the sample rate of the dataset
Expand All @@ -41,14 +53,11 @@ def sample_rate_summary(
unit: Unit to convert the time differences
Returns:
A Dataframe with the following columns: Mean sample rate, Std sample rate, Mode sample rate
A SampleRateAnalysis with the following information: Mean sample rate, Std sample rate, Mode sample rate
"""
sr = sample_rate(ds, unit)
return pd.DataFrame(
{
"Mean sample rate": np.mean(sr),
"Std sample rate": np.std(sr),
"Mode sample rate": pd.Series(sr).mode().values[0],
},
index=["Dataset"],
return SampleRateAnalysis(
mean=np.mean(sr),
std=np.std(sr),
mode=pd.Series(sr).mode().values[0],
)
9 changes: 0 additions & 9 deletions ceruleo/dataset/catalog/PHMDataset2018.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,6 @@ def get_key_from_filename(filename: str) -> str:
)
)


def _load_life(self, filename: str) -> pd.DataFrame:
return pd.read_parquet(filename)

def get_time_series(self, i: int) -> pd.DataFrame:
df = self._load_life(self.cycles_metadata.iloc[i]["Filename"])
return df


def prepare_raw_dataset(self):
"""Download and unzip the raw files
Expand Down
Loading

0 comments on commit 0db97ae

Please sign in to comment.