In [None]:
"""
Created on 15-10-2024
"""

import os
import glob
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.fftpack import fft
from scipy.fft import fft, fftfreq
from scipy.signal import cwt, find_peaks_cwt, ricker
from scipy.signal import butter, lfilter, savgol_filter, hilbert

from IntelliMaint.data_analysis import SOM
from IntelliMaint.rul_models import GPRDegradationModel
from IntelliMaint.eda import ExploratoryAnalysis
from IntelliMaint.health_assessment import HealthIndicator


import pickle as pkl
from imblearn.over_sampling import SMOTE

import sys
sys.path.insert(0, r'C:\Users\DELL\Desktop\Template\IntelliMaint')

from data_acquistion import DataAcquisition
from feature_engineering import TimeDomain, FrequencyDomain
from anomaly_detection import AnomalyDetection
from bearing import Bearing
from diagnostics import Diagnostic



#########################3Data Acquisition##############################################
# Get data from the directory
data_dir_path = r'C:\Users\DELL\Desktop\Template\data\2nd_test\2nd_test/'
data_acquisition = DataAcquisition()

# Get list of files
files = DataAcquisition.get_file_list(data_dir_path)
print(f"Total files found: {len(files)}")
##################################################################################
ber = 0
n_cols = 4

#############Feature Extraction #####################################################
# Lists to store all features and timestamps
all_features = []

for file_path in files:
    df = DataAcquisition.get_data_from_file(file_path)
    bearing = Bearing(files)
    signal = pd.to_numeric(df[str(ber)], errors='coerce').dropna().values
     
    # Extract features from the raw signal
    features = bearing.extract_features(signal)
    all_features.append(features)

# Convert the list of all features into a NumPy array
all_features = np.vstack(all_features)

# Convert all_features to a DataFrame for easier viewing with column name
feature_df = pd.DataFrame(all_features, columns=['kurtosis', 'skewness', 'ftf', 'bpfi', 'bpfo', 'bsf'])

# Print the first few rows of the DataFrame
print("First few rows of all_features:")
print(feature_df.head())
#########################################################################################

# Construct Health Indicator with SOM
normal_data_pts_som = 200
train_som = feature_df[:normal_data_pts_som]
test_som = feature_df
data_analysis = SOM()
som, scaler_ = data_analysis.train(train_som)
mqe = data_analysis.predict(som, test_som, scaler_).reshape(-1, 1)

plt.figure(figsize=(16, 8))
plt.plot(mqe)

# Train Anomaly detection with normal health indicator
normal_data_pts_cosmo = 200
hi = mqe

hi_train = hi[normal_data_pts_som:normal_data_pts_som+normal_data_pts_cosmo]
hi_test = hi[normal_data_pts_som:]
anomaly_detection = AnomalyDetection()
hi_train = pd.DataFrame(hi_train)
anomaly_detection.train_cosmo(hi_train)

# Evaluate health score using health indicator test
hi_test = pd.DataFrame(hi_test)
health_score_test, _ = anomaly_detection.test_cosmo(hi_test)
health_score_test = health_score_test.squeeze()

hi_train = pd.DataFrame(hi_train)
health_score_train, _ = anomaly_detection.test_cosmo(hi_train)
health_score_train = health_score_train.squeeze()

score_train = []
score_test = []

for i in range(hi_train.shape[0]):
    score_train.append(health_score_train[i])

for i in range(hi_test.shape[0]):
    score_test.append(health_score_test[i])

h_score = score_train + score_test
h_score = np.array(h_score)

window_length_filter = 50
h_score_filtered = savgol_filter(h_score, window_length_filter, 1)
plt.figure(figsize=(20, 10))
plt.plot([i for i in range(100, h_score.shape[0])], h_score[100:h_score.shape[0]], label='hi')
plt.plot([i for i in range(100, h_score.shape[0])], h_score_filtered[100:h_score.shape[0]], label='hi_filtered')
plt.ylabel('health score')
plt.xlabel('samples')
plt.legend()
plt.show()

########################### Perform anomaly detection ##################################
incipient_fault_threshold = 0.6
normal_data_pts_som = 200
deg_start_idx, score_till_incipient, initial_deg_pts = anomaly_detection.detect_anomaly(incipient_fault_threshold, normal_data_pts_som, files, ber, n_cols, som, scaler_, data_analysis, anomaly_detection)

min_continuous_deg_pts = 10
score_till_incipient_filtered = savgol_filter(score_till_incipient, 51, 1)
initial_deg_pts_filtered = savgol_filter(initial_deg_pts, 3, 1)

plt.figure(figsize=(20,10))
plt.plot(score_till_incipient, label='data below threshold')
plt.plot(score_till_incipient_filtered, label='data below threshold filtered')
plt.plot([i for i in range(len(score_till_incipient) - min_continuous_deg_pts, len(score_till_incipient))], initial_deg_pts, label='data above threshold')
plt.plot([i for i in range(len(score_till_incipient) - min_continuous_deg_pts, len(score_till_incipient))], initial_deg_pts_filtered, label='data above threshold filtered')
plt.plot([incipient_fault_threshold for i in range(len(score_till_incipient))], linestyle='dashed', color='#ffff00')
plt.text(0, 0.55, "incipient fault threshold")
plt.ylabel('health_score')
plt.xlabel('samples')
plt.legend()
plt.show()

######################### Diagnostics ################################3
#Prepare Data for Diagnostics
df = bearing.prepare_data1(feature_df, deg_start_idx, normal_data_pts_som, 'outer_race')
df.to_csv('2nd_test_bearing_1_outer_race1.csv', index=False)

df_features, labels = bearing.prepare_data2(feature_df, deg_start_idx, normal_data_pts_som, 'outer_race')
print(df_features.head(5))
print(labels.head(5))

# Evaluate Features for Diagnostics
diagnostics = Diagnostic()
labels = df['fault_modes']
diagnostic_features = diagnostics.evaluate_diagnostic_features(df_features, labels)
print("Diagnostic Features (sorted by FDR):")
for feature, fdr in diagnostic_features:
    print(f"{feature}: FDR = {fdr:.4f}")
diagnostics.plot_diagnostic_features(diagnostic_features, top_n=5)