In [1]:
import glob
import os

import pandas as pd
import scipy.io

import numpy as np

In [2]:
# Read the parquet file at data/cwru.parquet into a DataFrame.
cwruData = pd.read_parquet(path='data/cwru.parquet')

In [3]:
from sklearn.base import TransformerMixin, BaseEstimator

class dataSelector(TransformerMixin, BaseEstimator):
    """Select rows and columns of a DataFrame.

    A row is kept when, for every ``column=values`` keyword supplied at
    construction, the row's entry in ``column`` is one of ``values``.
    After row filtering only ``columns`` are returned.

    Parameters
    ----------
    columns : 'all', label or list of labels, default='all'
        Columns to return. The string ``'all'`` keeps every column.
    **column_values
        Row filters given as ``column_name=allowed_values``.
        ``allowed_values`` may be list-like or a single scalar.
    """

    def __init__(self, columns = 'all', **column_values):
        # Stored untouched (no validation or normalisation here) so that
        # get_params() hands back exactly the objects that were passed in.
        self.columns = columns
        self.column_values = column_values


    def get_params(self, deep=True):
        """Return the constructor arguments, row filters included.

        BaseEstimator only reports explicitly named ``__init__`` arguments,
        so without this override the ``**column_values`` filters would be
        lost whenever the estimator is cloned.
        """
        params = super().get_params(deep=deep)
        params.update(self.column_values)
        return params


    def set_params(self, **params):
        """Set ``columns`` and/or row filters; any other key is a filter."""
        if 'columns' in params:
            self.columns = params.pop('columns')
        # Build a new dict instead of mutating one that may be shared.
        self.column_values = {**self.column_values, **params}
        return self


    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self


    def transform(self, X):
        """Return the filtered rows of ``X`` restricted to ``columns``.

        The result is a copy, so modifying it never touches ``X``.
        """
        selected = X
        for column, values in self.column_values.items():
            # isin() rejects scalars, so wrap a lone value in a list.
            allowed = values if pd.api.types.is_list_like(values) else [values]
            selected = selected[selected[column].isin(allowed)]

        # Only the string 'all' is the sentinel. The isinstance guard avoids
        # an element-wise comparison when columns is an array or Index.
        if isinstance(self.columns, str) and self.columns == 'all':
            columns = slice(None)
        else:
            columns = self.columns

        return selected.loc[:, columns].copy()
    
    
class numpyConverter(TransformerMixin, BaseEstimator):
    """Stateless transformer that turns its input into a NumPy array."""

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Copy ``X`` and return the copy's ``to_numpy()`` result."""
        duplicate = X.copy()
        as_array = duplicate.to_numpy()
        return as_array
    

class numpyFlattener(TransformerMixin, BaseEstimator):
    """Stateless transformer that joins the items of ``X`` end to end."""

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Concatenate the items of a copy of ``X`` along the first axis."""
        pieces = X.copy()
        return np.concatenate(pieces, axis=0)
    
class numpyListFilter(TransformerMixin, BaseEstimator):
    """Keep only the elements of ``X`` that are NumPy arrays."""

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self


    def transform(self, X):
        """Return the ndarray elements of ``X``, dropping everything else.

        When all kept arrays share one shape they are stacked into a regular
        array. Otherwise a 1-D object array holding the individual arrays is
        returned, because recent NumPy versions refuse to build an array
        from differently sized arrays implicitly.
        """
        kept = [item for item in X if isinstance(item, np.ndarray)]

        # Zero or one distinct shape: a regular stacked array is possible.
        if len({item.shape for item in kept}) <= 1:
            return np.array(kept)

        # Ragged input: fill an object array slot by slot so NumPy never
        # tries to broadcast the arrays against each other.
        ragged = np.empty(len(kept), dtype=object)
        for position, item in enumerate(kept):
            ragged[position] = item
        return ragged
    
    
class timeSeriesChunker(TransformerMixin, BaseEstimator):
    """Cut every series in ``X`` into consecutive chunks of fixed length.

    Parameters
    ----------
    chunk_size : int
        Number of samples per chunk; must be positive.
    keep_rest : bool, default=False
        When True the trailing chunk is kept even if it is shorter than
        ``chunk_size``; when False it is discarded.
    """

    def __init__(self, chunk_size, keep_rest = False):
        self.chunk_size = chunk_size
        self.keep_rest = keep_rest

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self


    def transform(self, X):
        """Return one array of chunks per series in ``X``.

        Each series becomes an array of shape ``(n_chunks, chunk_size, ...)``
        or, when ``keep_rest`` leaves a shorter last chunk, a 1-D object
        array of chunks. The per-series results are stacked into a regular
        array when they all have the same shape and otherwise collected in
        a 1-D object array.

        Raises
        ------
        ValueError
            If ``chunk_size`` is not positive.
        """
        if self.chunk_size <= 0:
            raise ValueError(
                f"chunk_size must be a positive integer, got {self.chunk_size!r}"
            )

        per_series = []
        for series in X:
            chunks = list(self._chunk(series, self.chunk_size, self.keep_rest))
            if chunks:
                per_series.append(self._pack(chunks))
            else:
                # No chunk produced: return an empty array that still has
                # the chunk dimensions, so it can be concatenated with the
                # chunk arrays of the other series.
                template = np.asarray(series)
                empty_shape = (0, self.chunk_size) + template.shape[1:]
                per_series.append(np.empty(empty_shape, dtype=template.dtype))

        return self._pack(per_series)

    def _chunk(self, array, length, keep_rest):
        """Yield consecutive slices of ``array`` that are ``length`` long.

        A shorter final slice is yielded only when ``keep_rest`` is true.
        """
        for start in range(0, len(array), length):
            piece = array[start:start + length]
            if keep_rest or len(piece) == length:
                yield piece

    @staticmethod
    def _pack(items):
        """Stack equally shaped items, else wrap them in an object array."""
        items = [np.asarray(item) for item in items]
        if len({item.shape for item in items}) <= 1:
            return np.array(items)

        # Ragged: recent NumPy raises on np.array(...) here, so fill an
        # object array element by element instead.
        packed = np.empty(len(items), dtype=object)
        for position, item in enumerate(items):
            packed[position] = item
        return packed
                    
class FeatureExtractor(TransformerMixin, BaseEstimator):
    """Compute 16 time-domain summary features for every column of ``X``."""

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self


    def transform(self, X):
        """Return an array with one row per column of ``X`` and 16 features.

        Every column of ``X`` is treated as one signal. The feature order is
        fixed: maximum_value, standard_value, mean_value, minimum_value,
        mean_amplitude, root_mean_square_value, skewness_value,
        kurtosis_value, square_root_amplitude, peak_to_peak_value,
        waveform_indicator, pulse_indicator, kurtosis_index, peak_index,
        skewness_indicator, margin_indicator.

        Ratios whose denominator is zero (e.g. an all-zero signal) come out
        as NaN or inf rather than raising.
        """
        # Accept a DataFrame or anything DataFrame() can wrap column-wise.
        signals = pd.DataFrame(X)
        magnitude = signals.abs()

        # Column-wise reductions, computed for the whole frame at once
        # instead of one Python-level call per column.
        maximum = signals.max(axis=0)
        minimum = signals.min(axis=0)
        mean_amplitude = magnitude.mean(axis=0)
        rms = np.sqrt((signals ** 2).mean(axis=0))
        skewness = (signals ** 3).mean(axis=0)
        kurtosis = (signals ** 4).mean(axis=0)
        square_root_amplitude = np.sqrt(magnitude).mean(axis=0) ** 2

        # Dict insertion order defines the column order of the output.
        features = pd.DataFrame({
            'maximum_value': maximum,
            # Population standard deviation: spread around the mean.
            'standard_value': signals.std(axis=0, ddof=0),
            'mean_value': signals.mean(axis=0),
            'minimum_value': minimum,
            # Mean of the absolute values (not the absolute peak).
            'mean_amplitude': mean_amplitude,
            'root_mean_square_value': rms,
            'skewness_value': skewness,
            'kurtosis_value': kurtosis,
            'square_root_amplitude': square_root_amplitude,
            'peak_to_peak_value': maximum - minimum,
            'waveform_indicator': rms / mean_amplitude,
            'pulse_indicator': maximum / mean_amplitude,
            # Fourth moment over rms**4 and third moment over rms**3, so
            # both indicators are dimensionless.
            'kurtosis_index': kurtosis / rms ** 4,
            'peak_index': maximum / rms,
            'skewness_indicator': skewness / rms ** 3,
            'margin_indicator': maximum / square_root_amplitude,
        })

        return features.to_numpy()

In [4]:
from sklearn.preprocessing import FunctionTransformer
# Stateless step that wraps whatever it receives in a pandas DataFrame.
transformer = FunctionTransformer(pd.DataFrame)

In [5]:
from sklearn.pipeline import Pipeline

# Preprocessing chain, applied top to bottom by prep.transform().
prep = Pipeline(
    steps=[
        # Keep rows whose faultDiameter is 0.007 and only the two listed columns.
        ('dataSelector', dataSelector(columns=['baseData', 'driveEndData'],
                                      faultDiameter=[0.007])),
        ('numpyConverter', numpyConverter()),
        ('numpyFlattener', numpyFlattener()),
        # Drop every entry that is not a NumPy array.
        ('numpyListFilter', numpyListFilter()),
        # Cut each series into chunks of 2000 samples.
        ('timeSeriesChunker', timeSeriesChunker(2000)),
        ('numpyFlattener2', numpyFlattener()),
        # Transpose so that each chunk becomes one DataFrame column.
        ('transformer', FunctionTransformer(lambda chunks: pd.DataFrame(chunks).T)),
        ('featureExtractor', FeatureExtractor()),
        # Arrange the 16 features of each chunk as a 4x4 grid.
        ('reshaper', FunctionTransformer(
            lambda rows: [row.reshape(4, 4) for row in rows])),
    ]
)

X = prep.transform(cwruData)

