In [1]:
import numpy as np
import scipy
import os
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
class SignalMetrics():

    def __init__(self, pulses: np.ndarray):
        """
        Inicializa a classe com um array de pulsos.

        Args:
            pulses(ndarray): Array NumPy contendo valores dos pulsos.
        """

        if not isinstance(pulses, np.ndarray): # Validações de entrada
            raise TypeError("Pulses devem ser um array NumPy.")
        if pulses.size == 0:
            raise ValueError("Pulses não pode ser vazio.")
        
        self.pulses = pulses
        self._peak_value = None # prefixo _ indica que são variáveis internas (convenção Python)
        self._rms = None
        self._avg_amplitude = None

    @property # permite acessar o método como se fosse um atributo (sem parênteses)
    def peak_value(self) -> float:
        """Calcula e retorna o valor de pico do sinal"""
        if self._peak_value is None:
            self._peak_value = np.max(np.abs(self.pulses), axis=1)
        return self._peak_value
    
    @property
    def rms(self) -> float:
        """Calcula e retorna o valor RMS do sinal"""
        if self._rms is None:
            self._rms = np.sqrt(np.mean(self.pulses**2, axis=1))
        return self._rms
    
    @property
    def avg_amplitude(self) -> float:
        """Calcula e retorna a amplitude média de cada linha de pulsos"""
        if self._avg_amplitude is None:
            self._avg_amplitude = np.mean(np.abs(self.pulses), axis=1)
        return self._avg_amplitude
    
    @property
    def sqrt_amplitude(self) -> float:
        """Calcula e retorna a raiz quadrada da amplitude média da matriz"""
        return np.sqrt(self.avg_amplitude)

    def get_crest_factor(self) -> float:
        """Calcula e retorna o fator de crista da matriz"""
        return self.peak_value / self.rms
    
    def get_clearance_factor(self) -> float:
        """Calcula e retorna o fator de liberação da matriz"""
        return self.peak_value / self.sqrt_amplitude
    
    def impulse_factor(self) -> float:
        """Calcula e retorna o fator de impulso da matriz"""
        return self.peak_value / self.avg_amplitude
    
    def shape_factor(self) -> float:
        """Calcula e retorna o fator de forma da matriz"""
        return self.rms / self.avg_amplitude
    
    def skewness(self) -> float:
        """Calcula e retorna a assimetria da matriz"""
        return scipy.stats.skew(self.pulses, axis=1)    

    def kurtosis(self) -> float:
        """Calcula e retorna a curtose da matriz"""
        return scipy.stats.kurtosis(self.pulses, axis=1)

In [3]:
class Processor:
    """Classe base para processamento de arquivos .mat"""
    def __init__(self, folder: str):
        self.folder = folder
        self.files = self.list_mat_files()

    def list_mat_files(self) -> list:
        """ Lista arquivos .mat no diretório """
        files = [os.path.join(self.folder, f) for f in os.listdir(self.folder) if f.endswith('.mat')]
        return files
    
    def load_mat_data(self, file: str) -> dict:
        """ Carrega dados de um arquivo .mat """
        mat_data = scipy.io.loadmat(file) # carrega arquivo como dicionário
        valid_keys = [key for key in mat_data.keys() if not key.startswith('_')] # pega chaves que não começam com '_'
        
        if not valid_keys:
            raise Exception(f'Arquivo {file} não contém dados válidos')
        return np.array(mat_data[valid_keys[0]])
    
    def process_single_file(self, file: str) -> np.ndarray:
        """ Processa um único arquivo .mat """
        file_path = os.path.join(self.folder, file)

        if not os.path.exists(file_path):
            raise Exception(f'Arquivo {file} não encontrado')
    
        return self.load_mat_data(file_path)
        
    def half_cycle(self, data, file_name):
        # Separação dos dados em semiciclo positivo e negativo
        classes = np.where(data[1, :] <= 180, 'positivo', 'negativo')
        data_w_classes = np.vstack((data, classes))
        
        # Seleção dos índices
        mask_positivo = data_w_classes[-1] == 'positivo'
        mask_negativo = ~mask_positivo
        data_pos = data_w_classes[:, mask_positivo]
        data_neg = data_w_classes[:, mask_negativo]

        # Debugging
        #print("Antes de np.delete - data_pos shape:", data_pos.shape)
        #print("Antes de np.delete - data_neg shape:", data_neg.shape)

        # Remover a linha de classes e converter para float64
        data_pos = np.delete(data_pos, -1, axis=0).astype(np.float64) if data_pos.size > 0 else np.array([])
        data_neg = np.delete(data_neg, -1, axis=0).astype(np.float64) if data_neg.size > 0 else np.array([])

        # Garantir que os arrays sejam bidimensionais
        data_pos = np.atleast_2d(data_pos)
        data_neg = np.atleast_2d(data_neg)
        
        # Mais debugging
        #print("Após np.delete - data_pos shape:", data_pos.shape)
        #print("Após np.delete - data_neg shape:", data_neg.shape)

        # Caso algum dos arrays esteja vazio, ignoramos a normalização de fases
        if data_pos.size > 0 and data_pos.shape[0] > 1:
            data_pos[1, :] = data_pos[1, :] - 180
        else:
            print(f"No arquivo {file_name}, data_pos está vazio ou não tem mais de uma linha")

        if data_neg.size > 0 and data_neg.shape[0] > 1:
            data_neg[1, :] = (data_neg[1, :] - 180)/180
        else:
            print(f"No arquivo {file_name}, data_neg está vazio ou não tem mais de uma linha")   

        return data_pos, data_neg


    def process_files(self) -> list:
        """Implementação específica para processamento de arquivos de barra"""
        processed_files = []
        for file in self.files:
            data = self.load_mat_data(file)
            data_pos, data_neg = self.half_cycle(data, file_name=file)
            processed_files.append((data_pos, data_neg))
        return processed_files

In [4]:
bar_folder_path = r"C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\arquivos"
bar_processor = Processor(bar_folder_path)
barra_processed_files = bar_processor.process_files()
barra_processed_files[0][1].shape
""" print(processed_files[0][1]);
    primeiro indice: arquivo, segundo indice: semiciclo (0 = positivo, 1 = negativo) """

' print(processed_files[0][1]);\n    primeiro indice: arquivo, segundo indice: semiciclo (0 = positivo, 1 = negativo) '

In [5]:
tp_folder_path = r"C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\arquivosTP"
tp_processor = Processor(tp_folder_path)
tp_processed_files = tp_processor.process_files()

In [6]:
euler3_folder_path = r"C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\Euler3"
euler3_processor = Processor(euler3_folder_path)
euler3_processed_files = euler3_processor.process_files()
euler3_processed_files[1][0]

No arquivo C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\Euler3\euler_cp3_condi_2.mat, data_pos está vazio ou não tem mais de uma linha
No arquivo C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\Euler3\euler_cp3_condi_3.mat, data_pos está vazio ou não tem mais de uma linha


array([], shape=(1, 0), dtype=float64)

In [7]:
euler5_folder_path = r"C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\Euler5"
euler5_processor = Processor(euler5_folder_path)
euler5_processed_files = euler5_processor.process_files()
print(f"euler5_processed_files[0][0].shape: {euler5_processed_files[0][0].shape}")
print(f"euler5_processed_files[0][1].shape: {euler5_processed_files[0][1].shape}")

euler5_processed_files[0][0].shape: (403, 1)
euler5_processed_files[0][1].shape: (403, 2)


In [8]:
ponta_folder_path = r"C:\Users\jmlnn\OneDrive\Engenharia\0. Pesquisa DP\ponta"
ponta_processor = Processor(ponta_folder_path)
ponta_processed_files = ponta_processor.process_files()
print(f"ponta_processed_files[0][0].shape: {ponta_processed_files[0][0].shape}")
print(f"ponta_processed_files[0][1].shape: {ponta_processed_files[0][1].shape}")

ponta_processed_files[0][0].shape: (403, 4)
ponta_processed_files[0][1].shape: (403, 4)


In [9]:
barra_dict_pulses = {}
for i in range(len(barra_processed_files)):
    for j in range(2):
        if j == 0:
            barra_dict_pulses[f"sinal {i} semiciclo positivo"] = barra_processed_files[i][j]
        else:
            barra_dict_pulses[f"sinal {i} semiciclo negativo"] = barra_processed_files[i][j]
    
barra_dict_pulses.keys()

dict_keys(['sinal 0 semiciclo positivo', 'sinal 0 semiciclo negativo', 'sinal 1 semiciclo positivo', 'sinal 1 semiciclo negativo', 'sinal 2 semiciclo positivo', 'sinal 2 semiciclo negativo', 'sinal 3 semiciclo positivo', 'sinal 3 semiciclo negativo'])

In [10]:
tp_dict_pulses = {}
for i in range(len(barra_processed_files)):
    for j in range(2):
        if j == 0:
            tp_dict_pulses[f"sinal {i} semiciclo positivo"] = tp_processed_files[i][j]
        else:
            tp_dict_pulses[f"sinal {i} semiciclo negativo"] = tp_processed_files[i][j]
    
tp_dict_pulses.keys()

dict_keys(['sinal 0 semiciclo positivo', 'sinal 0 semiciclo negativo', 'sinal 1 semiciclo positivo', 'sinal 1 semiciclo negativo', 'sinal 2 semiciclo positivo', 'sinal 2 semiciclo negativo', 'sinal 3 semiciclo positivo', 'sinal 3 semiciclo negativo'])

In [12]:
for i, j in barra_processed_files:
    print(i.shape, j.shape) # i: positivo, j: negativo

(403, 29) (403, 23)
(403, 1986) (403, 1986)
(403, 11) (403, 53)
(403, 20) (403, 50)


In [16]:
for i, j in euler3_processed_files:
    print(i.shape, j.shape) # i: positivo, j: negativo

(403, 5) (403, 9)
(1, 0) (403, 5)
(1, 0) (403, 7)
(403, 9) (403, 10)


In [17]:
for i, j in euler5_processed_files:
    print(i.shape, j.shape) # i: positivo, j: negativo

(403, 1) (403, 2)
(403, 2) (403, 4)
(403, 5) (403, 2)
(403, 3) (403, 2)


In [18]:
for i, j in ponta_processed_files:
    print(i.shape, j.shape) # i: positivo, j: negativo

(403, 4) (403, 4)
(403, 665) (403, 477)
(403, 1) (403, 4)
(403, 95) (403, 126)


In [21]:
barra_pos, barra_neg = barra_processed_files[0]
tp_pos, tp_neg = tp_processed_files[0]
# euler3_pos, euler3_neg = euler3_processed_files[0] # stand-by até resolver o que fazer com dados vazios
euler5_pos, euler5_neg = euler5_processed_files[0]
ponta_pos, ponta_neg = ponta_processed_files[0]

In [22]:
from sklearn.model_selection import train_test_split

# Divisão dos dados em treino e teste da barra
barra_pos_train, barra_pos_test = train_test_split(barra_pos, test_size=0.3, random_state=42)
barra_neg_train, barra_neg_test = train_test_split(barra_neg, test_size=0.3, random_state=42)

# Divisão dos dados em treino e teste do TP
tp_pos_train, tp_pos_test = train_test_split(tp_pos, test_size=0.3, random_state=42)
tp_neg_train, tp_neg_test = train_test_split(tp_neg, test_size=0.3, random_state=42)

# Euler3

# Divisão dos dados em treino e teste do Euler5
euler5_pos_train, euler5_pos_test = train_test_split(euler5_pos, test_size=0.3, random_state=42)
euler5_neg_train, euler5_neg_test = train_test_split(euler5_neg, test_size=0.3, random_state=42)

# Divisão dos dados em treino e teste da ponta
ponta_pos_train, ponta_pos_test = train_test_split(ponta_pos, test_size=0.3, random_state=42)
ponta_neg_train, ponta_neg_test = train_test_split(ponta_neg, test_size=0.3, random_state=42)

In [33]:
barra_metrics_sp = SignalMetrics(barra_pos_train)
barra_pos_train_feat = np.array([barra_metrics_sp.peak_value, barra_metrics_sp.rms, barra_metrics_sp.avg_amplitude,
                                barra_metrics_sp.sqrt_amplitude, barra_metrics_sp.get_crest_factor(),
                                barra_metrics_sp.get_clearance_factor(), barra_metrics_sp.impulse_factor(),
                                barra_metrics_sp.shape_factor() ]) # barra_metrics_sp.skewness(), barra_metrics_sp.kurtosis()])

barra_metrics_sn = SignalMetrics(barra_neg_train)
barra_neg_train_feat = np.array([barra_metrics_sn.peak_value, barra_metrics_sn.rms, barra_metrics_sn.avg_amplitude,
                                barra_metrics_sn.sqrt_amplitude, barra_metrics_sn.get_crest_factor(),
                                barra_metrics_sn.get_clearance_factor(), barra_metrics_sn.impulse_factor(),
                                barra_metrics_sn.shape_factor() ]) # barra_metrics_sn.skewness(), barra_metrics_sn.kurtosis()])

barra_metrics_sp_test = SignalMetrics(barra_pos_test)
barra_pos_test_feat = np.array([barra_metrics_sp_test.peak_value, barra_metrics_sp_test.rms, barra_metrics_sp_test.avg_amplitude,
                                barra_metrics_sp_test.sqrt_amplitude, barra_metrics_sp_test.get_crest_factor(),
                                barra_metrics_sp_test.get_clearance_factor(), barra_metrics_sp_test.impulse_factor(),
                                barra_metrics_sp_test.shape_factor() ]) # barra_metrics_sp_test.skewness(), barra_metrics_sp_test.kurtosis()])

barra_metrics_sn_test = SignalMetrics(barra_neg_test)
barra_neg_test_feat = np.array([barra_metrics_sn_test.peak_value, barra_metrics_sn_test.rms, barra_metrics_sn_test.avg_amplitude,
                                barra_metrics_sn_test.sqrt_amplitude, barra_metrics_sn_test.get_crest_factor(),
                                barra_metrics_sn_test.get_clearance_factor(), barra_metrics_sn_test.impulse_factor(),
                                barra_metrics_sn_test.shape_factor() ]) # barra_metrics_sn_test.skewness(), barra_metrics_sn_test.kurtosis()])

In [None]:
tp_metrics_sp = SignalMetrics(tp_pos_train)
tp_pos_train_feat = np.array([tp_metrics_sp.peak_value, tp_metrics_sp.rms, tp_metrics_sp.avg_amplitude,
                                tp_metrics_sp.sqrt_amplitude, tp_metrics_sp.get_crest_factor(),
                                tp_metrics_sp.get_clearance_factor(), tp_metrics_sp.impulse_factor(),
                                tp_metrics_sp.shape_factor() ]) # tp_metrics_sp.skewness(), tp_metrics_sp.kurtosis()])

tp_metrics_sn = SignalMetrics(tp_neg_train)
tp_neg_train_feat = np.array([tp_metrics_sn.peak_value, tp_metrics_sn.rms, tp_metrics_sn.avg_amplitude,
                                tp_metrics_sn.sqrt_amplitude, tp_metrics_sn.get_crest_factor(),
                                tp_metrics_sn.get_clearance_factor(), tp_metrics_sn.impulse_factor(),
                                tp_metrics_sn.shape_factor() ]) # tp_metrics_sn.skewness(), tp_metrics_sn.kurtosis()])

tp_metrics_sp_test = SignalMetrics(tp_pos_test)
tp_pos_test_feat = np.array([tp_metrics_sp_test.peak_value, tp_metrics_sp_test.rms, tp_metrics_sp_test.avg_amplitude,
                                tp_metrics_sp_test.sqrt_amplitude, tp_metrics_sp_test.get_crest_factor(),
                                tp_metrics_sp_test.get_clearance_factor(), tp_metrics_sp_test.impulse_factor(),
                                tp_metrics_sp_test.shape_factor() ]) # tp_metrics_sp_test.skewness(), tp_metrics_sp_test.kurtosis()])
tp_metrics_sn_test = SignalMetrics(tp_neg_test)
tp_neg_test_feat = np.array([tp_metrics_sn_test.peak_value, tp_metrics_sn_test.rms, tp_metrics_sn_test.avg_amplitude,
                                tp_metrics_sn_test.sqrt_amplitude, tp_metrics_sn_test.get_crest_factor(),
                                tp_metrics_sn_test.get_clearance_factor(), tp_metrics_sn_test.impulse_factor(),
                                tp_metrics_sn_test.shape_factor() ]) # tp_metrics_sn_test.skewness(), tp_metrics_sn_test.kurtosis()])

In [32]:
euler5_metrics_sp = SignalMetrics(euler5_pos_train)
# cada parâmetro estatística é uma linha da matriz X_pos_features
euler5_pos_train_feat = np.array([euler5_metrics_sp.peak_value, euler5_metrics_sp.rms, euler5_metrics_sp.avg_amplitude,
                                euler5_metrics_sp.sqrt_amplitude, euler5_metrics_sp.get_crest_factor(),
                                euler5_metrics_sp.get_clearance_factor(), euler5_metrics_sp.impulse_factor(),
                                euler5_metrics_sp.shape_factor() ]) # euler5_metrics_sp.skewness(), euler5_metrics_sp.kurtosis()])
euler5_metrics_sn = SignalMetrics(euler5_neg_train)
# cada parâmetro estatística é uma linha da matriz X_neg_features
euler5_neg_train_feat = np.array([euler5_metrics_sn.peak_value, euler5_metrics_sn.rms, euler5_metrics_sn.avg_amplitude,
                                euler5_metrics_sn.sqrt_amplitude, euler5_metrics_sn.get_crest_factor(),
                                euler5_metrics_sn.get_clearance_factor(), euler5_metrics_sn.impulse_factor(),
                                euler5_metrics_sn.shape_factor() ]) # euler5_metrics_sn.skewness(), euler5_metrics_sn.kurtosis()])

In [None]:
# Create the DataFrame with labeled statistical parameters
feature_names = [
    'Average Amplitude',
    'RMS',
    'Peak Value',
    'Square Root Amplitude',
    'Crest Factor',
    'Clearance Factor',
    'Impulse Factor',
    'Shape Factor',
    'Skewness',
    'Kurtosis'
]

# Create DataFrame with features as columns
df_features = pd.DataFrame(
    barra_pos_train_feat.T,
    columns=feature_names
)

print("Características Estatísticas do semiciclo positivo")
df_features

Características Estatísticas do semiciclo positivo


Unnamed: 0,Average Amplitude,RMS,Peak Value,Square Root Amplitude,Crest Factor,Clearance Factor,Impulse Factor,Shape Factor,Skewness,Kurtosis
0,0.134579,0.152483,0.277914,0.366850,1.822587,0.757568,2.065061,1.133038,-0.450837,-0.457900
1,0.156179,0.170933,0.304423,0.395195,1.780947,0.770309,1.949187,1.094466,-0.468266,0.124711
2,0.176750,0.198353,0.430664,0.420417,2.171197,1.024374,2.436570,1.122224,0.825240,0.923464
3,0.149511,0.179046,0.410456,0.386667,2.292460,1.061525,2.745323,1.197544,0.371186,-0.030124
4,3.443169,3.518325,5.333804,1.855578,1.516006,2.874470,1.549097,1.021828,0.474102,-0.281877
...,...,...,...,...,...,...,...,...,...,...
317,0.160445,0.184272,0.393162,0.400556,2.133598,0.981538,2.450437,1.148500,0.306100,-0.101035
318,7.572534,7.783048,11.414009,2.751824,1.466522,4.147798,1.507291,1.027800,-0.054869,-0.991979
319,0.143607,0.157392,0.332188,0.378955,2.110573,0.876590,2.313175,1.095994,0.477655,1.021759
320,0.158178,0.181110,0.351413,0.397716,1.940333,0.883579,2.221632,1.144974,0.392414,-0.421955


In [43]:
# Create DataFrame with features as columns
df_features = pd.DataFrame(
    X_neg_train_feat.T,
    columns=feature_names
)

print("Características Estatísticas do semiciclo negativo")
df_features

Características Estatísticas do semiciclo negativo


Unnamed: 0,Average Amplitude,RMS,Peak Value,Square Root Amplitude,Crest Factor,Clearance Factor,Impulse Factor,Shape Factor,Skewness,Kurtosis
0,0.092950,0.105953,0.198741,0.304877,1.875742,0.651874,2.138154,1.139898,-0.094811,-0.938270
1,0.088638,0.105390,0.203584,0.297722,1.931716,0.683805,2.296787,1.188988,0.266723,-0.783111
2,0.104874,0.120962,0.266526,0.323843,2.203392,0.823010,2.541384,1.153396,1.262964,1.018871
3,0.089874,0.106791,0.284928,0.299790,2.668087,0.950424,3.170300,1.188229,1.295931,2.804161
4,2.381755,2.467687,4.009614,1.543294,1.624847,2.598089,1.683470,1.036079,0.763161,-0.289745
...,...,...,...,...,...,...,...,...,...,...
317,0.113625,0.127687,0.242421,0.337084,1.898562,0.719171,2.133509,1.123750,0.581138,-0.390068
318,4.663186,4.866903,8.105532,2.159441,1.665439,3.753532,1.738196,1.043686,0.612379,-0.438495
319,0.101515,0.124813,0.316091,0.318614,2.532517,0.992081,3.113736,1.229503,1.634055,2.302725
320,0.107241,0.117860,0.266526,0.327476,2.261374,0.813880,2.485310,1.099026,1.796588,3.278033


In [44]:
metrics_sp_test = SignalMetrics(X_pos_test)

X_pos_test_feat = np.array([
    metrics_sp_test.avg_amplitude,
    metrics_sp_test.rms,
    metrics_sp_test.peak_value,
    metrics_sp_test.sqrt_amplitude,
    metrics_sp_test.get_crest_factor(),
    metrics_sp_test.get_clearance_factor(),
    metrics_sp_test.impulse_factor(),
    metrics_sp_test.shape_factor(),
    metrics_sp_test.skewness(),
    metrics_sp_test.kurtosis()
])

X_pos_test_feat.shape

(10, 81)

In [45]:
metrics_sn_test = SignalMetrics(X_neg_test)

X_neg_test_feat = np.array([
    metrics_sn_test.avg_amplitude,
    metrics_sn_test.rms,
    metrics_sn_test.peak_value,
    metrics_sn_test.sqrt_amplitude,
    metrics_sn_test.get_crest_factor(),
    metrics_sn_test.get_clearance_factor(),
    metrics_sn_test.impulse_factor(),
    metrics_sn_test.shape_factor(),
    metrics_sn_test.skewness(),
    metrics_sn_test.kurtosis()
])

X_neg_test_feat.shape

(10, 81)