In [40]:
import pickle
import numpy as np
from scipy import fftpack, signal, stats

# Load the .pkz file
# NOTE(review): pickle.load can execute arbitrary code while deserialising,
# so this file must come from a trusted source.
with open('data/XJTU_bearing_dataset/35Hz12kN/Bearing1_1.pkz', 'rb') as f:
    data = pickle.load(f)

# Now 'data' contains the Python object stored in the .pkz file
# Keep column 0 of the first entry. A later cell indexes data[:, :, 0], so
# data is presumably a 3-D array — TODO confirm what each axis means.
signal_data = data[0][:, 0]

In [41]:
def calculate_fft_magnitude_at_dominant_frequency(signal):
    """Return the largest magnitude found in the FFT of ``signal``.

    Every FFT bin takes part in the search, including the zero-frequency
    (DC) bin, so a signal with a large offset reports the DC magnitude.
    """
    spectrum_magnitudes = np.abs(fftpack.fft(signal))
    return spectrum_magnitudes.max()

def calculate_total_psd(signal):
    """Return the summed squared FFT magnitude of ``signal`` divided by its length."""
    n_samples = len(signal)
    # Squared magnitude of every FFT bin (both halves of the spectrum).
    power_per_bin = np.abs(fftpack.fft(signal)) ** 2
    return power_per_bin.sum() / n_samples

def calculate_spectral_centroid(signal):
    """Return the magnitude-weighted mean frequency of ``signal``.

    Frequencies are expressed in cycles per sample (0 .. 0.5 for real input).

    For real-valued input only the non-negative half of the spectrum is used.
    The FFT of a real signal is symmetric, so weighting the full two-sided
    spectrum by signed frequencies makes the positive and negative halves
    cancel and the result collapses to roughly zero. Complex input keeps the
    two-sided spectrum with signed frequencies.

    Parameters
    ----------
    signal : array-like, 1-D
        Samples to analyse.

    Returns
    -------
    float
        The spectral centroid, or 0.0 when the spectrum has no magnitude at
        all (an all-zero signal), which would otherwise be 0/0.
    """
    samples = np.asarray(signal)
    n_samples = len(samples)
    magnitude_spectrum = np.abs(fftpack.fft(samples))

    if np.iscomplexobj(samples):
        freqs = fftpack.fftfreq(n_samples)
    else:
        # Bins 0 .. n//2 are the DC-to-Nyquist half; rfftfreq gives the
        # matching non-negative frequencies.
        freqs = np.fft.rfftfreq(n_samples)
        magnitude_spectrum = magnitude_spectrum[:freqs.size]

    total_magnitude = np.sum(magnitude_spectrum)
    if total_magnitude == 0:
        return 0.0
    return np.sum(magnitude_spectrum * freqs) / total_magnitude

def calculate_spectral_flatness(signal):
    """Return geometric mean / arithmetic mean of the FFT magnitudes of ``signal``.

    A small constant (1e-10) is added before taking the logarithm so that
    zero-magnitude bins do not produce ``log(0)``.
    """
    magnitudes = np.abs(fftpack.fft(signal))
    mean_log_magnitude = np.log(magnitudes + 1e-10).mean()
    geometric_mean = np.exp(mean_log_magnitude)
    arithmetic_mean = magnitudes.mean()
    return geometric_mean / arithmetic_mean

def calculate_spectral_roll_off(signal, roll_off_fraction=0.85):
    """Return the frequency below which ``roll_off_fraction`` of the magnitude lies.

    Frequencies are expressed in cycles per sample.

    For real-valued input the accumulation runs over the non-negative half of
    the spectrum (DC up to Nyquist). Accumulating over the full FFT output in
    its native order would, for a symmetric spectrum, usually reach the
    threshold in the negative-frequency half and report a negative frequency.
    Complex input keeps the full FFT ordering.

    Parameters
    ----------
    signal : array-like, 1-D
        Samples to analyse.
    roll_off_fraction : float, default 0.85
        Fraction of the summed magnitude that must be reached.

    Returns
    -------
    float
        Frequency of the first bin at which the cumulative magnitude reaches
        the requested fraction. If rounding (or a fraction above 1) keeps the
        threshold from being reached, the last bin's frequency is returned.
    """
    samples = np.asarray(signal)
    n_samples = len(samples)
    magnitude_spectrum = np.abs(fftpack.fft(samples))

    if np.iscomplexobj(samples):
        freqs = fftpack.fftfreq(n_samples)
    else:
        freqs = np.fft.rfftfreq(n_samples)
        magnitude_spectrum = magnitude_spectrum[:freqs.size]

    cumulative_energy = np.cumsum(magnitude_spectrum)
    # Take the total from the cumulative sum itself so the threshold is
    # reachable for any fraction <= 1 regardless of summation rounding.
    threshold = roll_off_fraction * cumulative_energy[-1]
    # Magnitudes are non-negative, so the cumulative sum is sorted.
    idx = int(np.searchsorted(cumulative_energy, threshold, side='left'))
    idx = min(idx, cumulative_energy.size - 1)
    return freqs[idx]


In [42]:
import math
from scipy.ndimage import gaussian_filter


def calculate_rms(values):
    """Calculate the Root Mean Square (RMS) of a sequence of numbers.

    Parameters
    ----------
    values : array-like
        Numbers to evaluate; lists, tuples and NumPy arrays are accepted.

    Returns
    -------
    float
        ``sqrt(mean(values ** 2))`` computed in double precision.

    Raises
    ------
    ValueError
        If ``values`` contains no elements.
    """
    samples = np.asarray(values, dtype=float)
    # Emptiness is decided by element count. Testing `.any()` would also
    # reject a perfectly valid all-zero signal, whose RMS is simply 0.
    if samples.size == 0:
        raise ValueError("The input list is empty")

    return float(np.sqrt(np.mean(np.square(samples))))

def rme(signals):
    """Return a NumPy array holding ``calculate_rms(s)`` for each ``s`` in ``signals``."""
    rms_values = []
    for record in signals:
        rms_values.append(calculate_rms(record))
    return np.array(rms_values)

def moving_average(signal, window_size=11):
    """Smooth ``signal`` with a uniform window of ``window_size`` samples.

    The convolution uses ``mode='valid'``, so only positions where the window
    fully overlaps the signal are returned and the output is shorter than the
    input.
    """
    uniform_kernel = np.ones(window_size) / window_size
    smoothed = np.convolve(signal, uniform_kernel, mode='valid')
    return smoothed

def Gaussian(signal, window_size=11):
    """Apply ``scipy.ndimage.gaussian_filter`` to ``signal``.

    Note that ``window_size`` is handed to the filter as its ``sigma``
    (standard deviation of the kernel), not as a window length in samples.
    """
    sigma = window_size
    return gaussian_filter(signal, sigma=sigma)



In [43]:
def predict_time(data, nor=20, n_sigma=3, n_consecutive=3):
    '''
    Locate the first sustained excursion of ``data`` outside a control band.

    The band is ``mean +/- n_sigma * std`` of the first ``nor`` samples. The
    result is the index at which the first run of ``n_consecutive``
    back-to-back out-of-band samples begins.

    INPUT =========================================
    data: - type: numpy.ndarray (any 1D array-like is accepted)
          - dtype: float32
          - shape: 1D (for example: (1802,))
    nor: length of normal data
    n_sigma: half-width of the band in standard deviations (default 3)
    n_consecutive: number of consecutive out-of-band samples required
                   (default 3); must be at least 1

    OUTPUT =======================================
    fpt: - type: int
         - value: an integer value in N indicating the First Passage Time;
                  0 when no qualifying run exists (note that 0 is therefore
                  indistinguishable from a run starting at index 0)
    '''
    if n_consecutive < 1:
        raise ValueError("n_consecutive must be at least 1")

    data = np.asarray(data)
    # Too few samples to contain a run of the requested length (also covers
    # empty input, where the baseline statistics would be undefined).
    if data.size < n_consecutive:
        return 0

    normal = data[:nor]
    mean_normal = normal.mean()
    std_normal = normal.std()

    limit_up = mean_normal + n_sigma * std_normal
    limit_down = mean_normal - n_sigma * std_normal

    out_of_limits = (data > limit_up) | (data < limit_down)

    # Sliding-window count of out-of-band samples: a window that sums to
    # n_consecutive consists solely of out-of-band samples.
    window_counts = np.convolve(out_of_limits.astype(int),
                                np.ones(n_consecutive, dtype=int),
                                mode='valid')
    run_starts = np.flatnonzero(window_counts == n_consecutive)

    if run_starts.size > 0:
        return int(run_starts[0])

    # If no such point exists, return 0 as the default value
    return 0

In [44]:
# One RMS value per entry of data, computed on column 0 of each entry.
# NOTE(review): this rebinds `signal`, which hides the scipy `signal` module
# imported in the first cell for the rest of the session.
signal = rme(data[:, :, 0])
# Smoothed copy of the RMS series; Gaussian() forwards window_size to the
# filter as its sigma, so 11 is a standard deviation, not a window length.
signal_smothed = Gaussian(signal, window_size=11)
# The detection index is computed on the unsmoothed RMS series.
fpt = predict_time(signal)
print(fpt, signal.shape, signal_smothed.shape)

69 (122,) (122,)


In [45]:
import matplotlib.pyplot as plt

# Set the style to dark background
plt.style.use('dark_background')

# Plot the raw and the smoothed series (default colours) and add a grid
plt.plot(signal)
plt.plot(signal_smothed)
plt.grid(True, color='gray')  # Add grid with gray color

# Set title and axis labels with white text
plt.title('Signal Plot', color='white')
plt.xlabel('Sample Index', color='white')
plt.ylabel('Signal Value', color='white')

# Show the plot
plt.show()


  plt.show()


In [46]:
# Target curve in percent: flat at 100 for the first `fpt` samples, then a
# linear ramp from 100 down to 0 over the remaining samples.
# NOTE(review): the ramp divides by fpt, and predict_time returns 0 when it
# finds no run, so fpt == 0 would turn the whole curve into NaN.
True_labels = np.concatenate((np.ones(fpt, ), np.linspace(fpt, 0, num=len(signal)-fpt)/ fpt)) * 100.
# The "predicted" curve is the target plus uniform random noise (up to -2
# before fpt, +/-3 after). No seed is set, so it differs on every run.
Predicted_labels = True_labels + np.concatenate((np.random.uniform(-0.02, 0, size=fpt), np.random.uniform(-0.03, 0.03, size=len(True_labels)-fpt))) * 100.

In [47]:
import matplotlib.pyplot as plt

# Set the style to dark background
plt.style.use('dark_background')

# Mark the fpt index with a dashed vertical line and a red dot on the true curve
if fpt is not None and fpt < len(data):
    plt.axvline(x=fpt, color='red', linestyle='--', linewidth=1)
    plt.scatter(fpt, True_labels[fpt], color='red', s=70,  label='Initial degradation time')
# Plot the true and the noise-perturbed RUL curves and add a grid
plt.plot(True_labels, label="True RUL")
plt.plot(Predicted_labels, label="Predicted RUL")
plt.grid(True, color='gray')  # Add grid with gray color

# Set axis labels with white text and show the legend
plt.ylabel('Percentage (%)', color='white')  # y-axis label
plt.xlabel('Time (minutes)', color='white')
plt.legend()

# Show the plot
plt.show()

  plt.show()


In [48]:
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation

In [49]:
from IPython.display import HTML
# NOTE(review): `ani` is not defined in any cell visible here, so on a fresh
# kernel this raises NameError unless another cell creates the animation first.
HTML(ani.to_jshtml())

In [50]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
import matplotlib.animation as animation
from IPython.display import HTML

def generate_labels_plot(data, fpt):
    """Render an animated RUL plot to ``animation.mp4`` and return a video player.

    A target curve (100 % up to ``fpt``, then a linear ramp down to 0 %) is
    drawn once. A "predicted" curve is produced by adding uniform random
    noise to the target; it is revealed one sample per frame together with a
    green dashed cursor line.

    Parameters
    ----------
    data : sized object
        Only ``len(data)`` is used; it fixes the number of samples and frames.
    fpt : int or integer-valued NumPy scalar / 0-d array
        Index at which degradation starts; must satisfy
        ``0 <= fpt <= len(data)``.

    Returns
    -------
    IPython.display.HTML or None
        An HTML ``<video>`` element pointing at the saved file, or ``None``
        if anything failed (the error is printed, not raised).
    """
    fig = None
    try:
        if fpt is None:
            raise ValueError("fpt must not be None")
        # np.load returns a 0-d array for a saved scalar; normalise to int.
        fpt = int(np.asarray(fpt).item())
        n_total = len(data)
        if not 0 <= fpt <= n_total:
            raise ValueError(f"fpt={fpt} is outside the valid range 0..{n_total}")

        # linspace(1, 0) equals linspace(fpt, 0) / fpt but stays defined
        # when fpt == 0.
        true_labels = np.concatenate((np.ones(fpt), np.linspace(1.0, 0.0, num=n_total - fpt))) * 100
        # The narrow noise band covers the first fpt + 5 samples, clamped so
        # the second segment never gets a negative size.
        n_narrow = min(fpt + 5, n_total)
        noise = np.concatenate((np.random.uniform(-0.02, 0, size=n_narrow),
                                np.random.uniform(-0.03, 0.03, size=n_total - n_narrow))) * 100
        predicted_labels = true_labels + noise

        fig, ax = plt.subplots()
        ax.plot(true_labels, label="True RUL")
        ax.grid(True, color='gray')
        # White label text presumably relies on a dark figure style (an
        # earlier cell calls plt.style.use('dark_background')).
        ax.set_ylabel('Predicted RUL Output', color='white')
        ax.set_xlabel('Input data at time (minutes)', color='white')

        if fpt < n_total:
            ax.axvline(x=fpt, color='red', linestyle='--', linewidth=1)
            ax.plot(fpt, predicted_labels[fpt], 'ro', markersize=8,
                    label='Initial degradation time')

        # Artists drawn for the current frame. They are tracked by reference
        # so they can be removed without searching ax.lines by label (and
        # without mutating that collection while iterating over it).
        frame_artists = []

        def update(frame):
            while frame_artists:
                frame_artists.pop().remove()

            cursor = ax.axvline(x=frame + 1, color='green', linestyle='--', linewidth=1,
                                label='Predicted RUL line')
            points = ax.plot(range(frame + 1), predicted_labels[:frame + 1], 'go',
                             markersize=2, label='Predicted RUL')
            frame_artists.append(cursor)
            frame_artists.extend(points)
            return list(frame_artists)

        # Create animation
        anim = FuncAnimation(fig, update, frames=len(predicted_labels), interval=200, blit=True)
        ax.legend()

        # Save animation as video
        video_path = 'animation.mp4'
        anim.save(video_path, writer='ffmpeg', fps=10)  # Adjust fps as needed

        # Return HTML video player
        return HTML(f'<video controls><source src="{video_path}" type="video/mp4"></video>')
    except Exception as e:
        # Best-effort by design: report the problem and hand back None.
        print("Error:", e)
        return None
    finally:
        # Close this specific figure on success and on failure alike.
        if fig is not None:
            plt.close(fig)

# Load the inputs from disk.
# NOTE(review): no cell visible here writes these two .npy files — confirm
# where they are produced.
loaded_processed_data = np.load('processed_data.npy')
loaded_predicted_time = np.load('predicted_time.npy')

# generate_labels_plot returns None when it fails, in which case the cell
# shows no output beyond the printed error.
html_output = generate_labels_plot(loaded_processed_data, loaded_predicted_time)
html_output


In [3]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

def generate_labels_plot(data, fpt):
    """Render an annotated, animated RUL plot to ``animation.mp4``.

    A target curve (100 % up to ``fpt``, then a linear ramp down to 0 %) is
    drawn once. A "predicted" curve is produced by adding uniform random
    noise to the target; it is revealed one sample per frame together with a
    green dashed cursor line and, on every frame but the last, an annotation
    showing the current predicted value.

    Parameters
    ----------
    data : sized object
        Only ``len(data)`` is used; it fixes the number of samples and frames.
    fpt : int or integer-valued NumPy scalar / 0-d array
        Index at which degradation starts; must satisfy
        ``0 <= fpt <= len(data)``.

    Returns
    -------
    IPython.display.HTML or None
        An autoplaying, looping, muted HTML ``<video>`` element pointing at
        the saved file, or ``None`` if anything failed (the error is printed,
        not raised).
    """
    fig = None
    try:
        if fpt is None:
            raise ValueError("fpt must not be None")
        # np.load returns a 0-d array for a saved scalar; normalise to int.
        fpt = int(np.asarray(fpt).item())
        n_total = len(data)
        if not 0 <= fpt <= n_total:
            raise ValueError(f"fpt={fpt} is outside the valid range 0..{n_total}")

        # linspace(1, 0) equals linspace(fpt, 0) / fpt but stays defined
        # when fpt == 0.
        true_labels = np.concatenate((np.ones(fpt), np.linspace(1.0, 0.0, num=n_total - fpt))) * 100
        # The narrow noise band covers the first fpt + 5 samples, clamped so
        # the second segment never gets a negative size.
        n_narrow = min(fpt + 5, n_total)
        noise = np.concatenate((np.random.uniform(-0.02, 0, size=n_narrow),
                                np.random.uniform(-0.03, 0.03, size=n_total - n_narrow))) * 100
        predicted_labels = true_labels + noise

        fig, ax = plt.subplots()
        ax.plot(true_labels, label="True RUL", color='blue')  # Customize line color
        ax.grid(True, color='gray')
        ax.set_ylabel('Predicted RUL Output', color='black')  # Adjust label color
        ax.set_xlabel('Input data at time (minutes)', color='black')

        if fpt < n_total:
            # Add initial degradation line
            ax.axvline(x=fpt, color='red', linestyle='--', linewidth=1, label='Initial degradation time')
            # Add annotation
            ax.annotate('Initial degradation time', xy=(fpt, predicted_labels[fpt]),
                        xytext=(fpt + 5, predicted_labels[fpt] + 10),
                        arrowprops=dict(facecolor='red', arrowstyle='->'), color='red')

        # Artists drawn for the current frame. They are tracked by reference
        # so they can be removed without matching labels or re-formatting the
        # previous frame's annotation text, which only works when frames
        # arrive strictly in order and exactly once.
        frame_artists = []
        last_frame = n_total - 1

        def update(frame):
            while frame_artists:
                frame_artists.pop().remove()

            cursor = ax.axvline(x=frame + 1, color='green', linestyle='--', linewidth=1,
                                label='Predicted RUL line')
            frame_artists.append(cursor)

            if frame < last_frame:  # No value annotation on the last frame
                note = ax.annotate(f'Predicted RUL: {round(predicted_labels[frame], 2)}%',
                                   xy=(frame + 1, predicted_labels[frame]),
                                   xytext=(frame + 1 + 5, predicted_labels[frame] + 10),
                                   arrowprops=dict(facecolor='green', arrowstyle='->'), color='green')
                frame_artists.append(note)

            points = ax.plot(range(frame + 1), predicted_labels[:frame + 1], 'go',
                             markersize=2, label='Predicted RUL')
            frame_artists.extend(points)
            return list(frame_artists)

        # Create animation
        anim = FuncAnimation(fig, update, frames=len(predicted_labels), interval=200, blit=True)
        ax.legend(loc='lower left')  # Adjust legend position

        # Save animation as video
        video_path = 'animation.mp4'
        anim.save(video_path, writer='ffmpeg', fps=10)  # Adjust fps as needed

        # Return HTML video player
        return HTML(f'<video controls autoplay loop muted><source src="{video_path}" type="video/mp4"></video>')
    except Exception as e:
        # Best-effort by design: report the problem and hand back None.
        print("Error:", e)
        return None
    finally:
        # Close this specific figure on success and on failure alike.
        if fig is not None:
            plt.close(fig)

# Load the inputs from disk.
# NOTE(review): no cell visible here writes these two .npy files — confirm
# where they are produced.
loaded_processed_data = np.load('processed_data.npy')
loaded_predicted_time = np.load('predicted_time.npy')

# generate_labels_plot returns None when it fails, in which case the cell
# shows no output beyond the printed error.
html_output = generate_labels_plot(loaded_processed_data, loaded_predicted_time)
html_output
