# Testing: spectrogram preprocessing and Transformer model experiments

In [None]:
import os
import numpy as np
import scipy.io
import matplotlib.pyplot as plt

def load_and_pad_data(file_path, n_time_slices, max_length=999):
    """Load a vibration recording and return a zero-padded magnitude spectrogram.

    Args:
        file_path: Path to a MATLAB ``.mat`` file. It must contain a ``'data'``
            array (flattened to 1-D here) and may contain a sampling rate
            under ``'fs'``; a rate of 1 is used when ``'fs'`` is absent.
        n_time_slices: Number of leading spectrogram columns (time segments)
            to keep.
        max_length: Number of columns in the returned array. No more than
            this many slices are kept, whatever ``n_time_slices`` says.

    Returns:
        Array of shape ``(n_freqs, max_length)``: the kept spectrogram columns
        followed by zero padding.
    """
    # Load data
    data = scipy.io.loadmat(file_path)
    vibration_data = data['data'].flatten()

    # Sampling frequency. np.asarray lets the scalar fallback (1) go through
    # the same flatten/index path as the array stored in the file; a bare int
    # has no .flatten().
    fs = np.asarray(data.get('fs', 1)).flatten()[0]

    # Generate spectrogram.
    # Pxx has one row per frequency and one column per time segment; the
    # frequency vector, time-bin centres and image handle are not needed.
    Pxx, _freqs, _bins, _im = plt.specgram(
        vibration_data, NFFT=1024, Fs=fs, noverlap=512, scale='dB', mode='magnitude'
    )

    # Close the plot as we only need the data
    plt.close()

    # Select the first n time slices, capped so they always fit the output
    selected_slices = Pxx[:, :min(n_time_slices, max_length)]

    # Padding: copy the selection into the left of an all-zero array
    padded_sequence = np.zeros((Pxx.shape[0], max_length))
    padded_sequence[:, :selected_slices.shape[1]] = selected_slices

    return padded_sequence

def normalize_data(data):
    """Min-max scale an array so its values span the range [0, 1].

    Args:
        data: Array-like of numbers, any shape.

    Returns:
        Float array of the same shape as ``data``. When every element is
        equal there is no range to scale by, so an all-zero array is
        returned instead of dividing by zero. An empty input is returned
        unchanged (as floats).
    """
    data = np.asarray(data)

    # Nothing to scale; np.min/np.max would raise on an empty array.
    if data.size == 0:
        return data.astype(float)

    lowest = np.min(data)
    value_range = np.max(data) - lowest

    # Constant input: avoid 0/0, which would fill the result with NaN.
    if value_range == 0:
        return np.zeros(data.shape, dtype=float)

    # Element-wise scaling keeps the original shape, so no flatten/reshape
    # round trip is needed.
    return (data - lowest) / value_range

# Path to your folder
data_folder = r"C:\Users\simon\signal_analysis\vibration_anal\vibration_analysis_nov\data\HUST bearing a practical dataset for ball bearing fault diagnosis\HUST bearing a practical dataset for ball bearing fault diagnosis\HUST bearing dataset"

# List of .mat files
mat_files = [os.path.join(data_folder, file) for file in os.listdir(data_folder) if file.endswith('.mat')]

# Number of time slices you want to consider
n_time_slices = 50  # Adjust this based on your requirements

# Process each file
for file in mat_files:
    # Load and extract sequence from spectrogram.
    # The loader defined in this cell is load_and_pad_data; there is no
    # function called load_data.
    spectrogram_sequence = load_and_pad_data(file, n_time_slices)

    # Normalize the sequence
    normalized_sequence = normalize_data(spectrogram_sequence)
    print(normalized_sequence.shape)

    # Smoke test only: stop after the first file.
    break

    # Now, normalized_sequence is ready to be used as input to your model
    # You can proceed with feeding this into your Transformer model


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, LayerNormalization, Dropout

# Define a simple Transformer block
def transformer_block(inputs, num_heads, dff, rate=0.1):
    """Run ``inputs`` through one attention + feed-forward encoder block.

    Each of the two sub-layers is followed by dropout, added back onto its
    own input, and layer-normalised.

    Args:
        inputs: Keras tensor whose last dimension is the feature width.
        num_heads: Number of attention heads.
        dff: Width of the hidden feed-forward layer. It is also passed as
            ``key_dim`` to the attention layer.
        rate: Dropout rate used after both sub-layers.

    Returns:
        Keras tensor with the same last dimension as ``inputs``.
    """
    # NOTE(review): per the Keras docs key_dim is the per-head query/key
    # size, so key_dim=dff makes every head dff wide - confirm intended.
    attention_layer = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=dff)

    # Self-attention: the sequence is both the query and the value.
    attended = attention_layer(inputs, inputs)
    attended = Dropout(rate)(attended)
    attention_out = LayerNormalization(epsilon=1e-6)(inputs + attended)

    # Position-wise feed-forward: expand to dff, then project back to the
    # input feature width so the residual addition lines up.
    feature_width = inputs.shape[-1]
    hidden = Dense(dff, activation='relu')(attention_out)
    projected = Dense(feature_width)(hidden)
    projected = Dropout(rate)(projected)

    return LayerNormalization(epsilon=1e-6)(attention_out + projected)

# Define your model
def create_transformer_model(num_time_slices, d_model, num_heads, dff):
    """Build a single-block Transformer model with one sigmoid output.

    Args:
        num_time_slices: Number of time steps per input sequence.
        d_model: Number of features per time step.
        num_heads: Number of attention heads in the Transformer block.
        dff: Hidden width handed to the Transformer block.

    Returns:
        An uncompiled Keras ``Model`` taking input of shape
        ``(num_time_slices, d_model)`` per sample.
    """
    sequence_input = Input(shape=(num_time_slices, d_model))
    encoded = transformer_block(sequence_input, num_heads, dff)

    # Only the feature vector of the first time step feeds the output unit.
    first_step = encoded[:, 0, :]
    prediction = Dense(1, activation='sigmoid')(first_step)

    return Model(inputs=sequence_input, outputs=prediction)

# Initialize the model
# Sequences of 50 time steps with 1024 features each, 8 attention heads and
# a hidden width of 2048.
# NOTE(review): the spectrogram loader in this file calls specgram with
# NFFT=1024 and returns arrays laid out as (frequency, time); confirm that
# d_model=1024 and the (time, feature) input layout match the data fed in.
model = create_transformer_model(num_time_slices=50, d_model=1024, num_heads=8, dff=2048)
# Binary cross-entropy goes with the model's single sigmoid output unit.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


In [None]:
import random
import math

num_epochs = 5

# Shuffle a copy so the order of mat_files itself is left untouched
# (plain assignment would only create a second name for the same list).
shuffled_files = list(mat_files)
random.shuffle(shuffled_files)

# 80/20 split at a single index so the two sets cover every file exactly
# once and never overlap.
split_index = math.floor(len(shuffled_files) * 0.8)
training_files = shuffled_files[:split_index]
testing_files = shuffled_files[split_index:]

# Example of training loop
for epoch in range(num_epochs):
    for file in training_files:
        # Load and pad data; n_time_slices is a required argument of the
        # loader and is defined alongside it.
        padded_sequence = load_and_pad_data(file, n_time_slices)

        # Prepare labels and other necessary preprocessing steps
        # ...
        # TODO: `labels` is never assigned in this cell; it must be built
        # here (one target per batch sample) before this loop can run.
        # NOTE(review): padded_sequence is a 2-D (frequency, time) array;
        # confirm it is reshaped to the model's (batch, time, feature) input.

        # Train your model
        model.train_on_batch(padded_sequence, labels)


# TRAINING

In [None]:
import os
import random
import math
import numpy as np
import scipy.io
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, LayerNormalization, Dropout, MultiHeadAttention

# Data loading and preprocessing
def load_and_pad_data(file_path, max_length=999):
    """Load a recording and return its normalised, zero-padded spectrogram.

    Args:
        file_path: Path to a MATLAB ``.mat`` file. It must contain a ``'data'``
            array (flattened to 1-D here) and may contain a sampling rate
            under ``'fs'``; a rate of 1 is used when ``'fs'`` is absent.
        max_length: Number of columns (time slices) in the returned array.
            Spectrograms with more slices are truncated to this many.

    Returns:
        Tuple ``(padded_sequence, n_slices)``. ``padded_sequence`` has shape
        ``(n_freqs, max_length)`` and holds the min-max normalised magnitude
        spectrogram followed by zero padding; ``n_slices`` is the number of
        leading columns that contain real data.
    """
    data = scipy.io.loadmat(file_path)
    vibration_data = data['data'].flatten()

    # np.asarray lets the scalar fallback (1) go through the same
    # flatten/index path as the array stored in the file; a bare int has no
    # .flatten().
    fs = np.asarray(data.get('fs', 1)).flatten()[0]

    # Generate spectrogram: rows are frequencies, columns are time segments.
    # The frequency vector, time-bin centres and image handle are unused.
    Pxx, _freqs, _bins, _im = plt.specgram(
        vibration_data, NFFT=1024, Fs=fs, noverlap=512, scale='dB', mode='magnitude'
    )
    plt.close()

    # Normalize spectrogram to [0, 1]. A perfectly flat spectrogram has no
    # range to scale by, so it maps to zeros instead of 0/0 = NaN.
    lowest = np.min(Pxx)
    value_range = np.max(Pxx) - lowest
    if value_range == 0:
        Pxx_normalized = np.zeros(Pxx.shape, dtype=float)
    else:
        Pxx_normalized = (Pxx - lowest) / value_range

    # Drop slices that would not fit; otherwise the assignment below fails
    # with a shape mismatch for long recordings.
    Pxx_normalized = Pxx_normalized[:, :max_length]

    # Pad spectrogram
    padded_sequence = np.zeros((Pxx.shape[0], max_length))
    padded_sequence[:, :Pxx_normalized.shape[1]] = Pxx_normalized

    return padded_sequence, Pxx_normalized.shape[1]

# Label generation and normalization
def calculate_and_normalize_labels(sequence_length, max_length, max_time_to_failure):
    """Build a scaled countdown label per time slice, zero-padded to ``max_length``.

    Slice ``i`` (for ``i < sequence_length``) gets
    ``(max_length - i) / max_time_to_failure``; the remaining
    ``max_length - sequence_length`` positions are filled with 0.

    Args:
        sequence_length: Number of slices that receive a countdown value.
        max_length: Total length of the returned array and starting value
            of the countdown.
        max_time_to_failure: Divisor used to scale the countdown.

    Returns:
        1-D float array of length ``max_length``.
    """
    time_per_slice = 1  # Adjust this based on your data sampling rate

    # Countdown max_length, max_length - 1, ... for the real slices.
    slice_indices = np.arange(sequence_length)
    countdown = (max_length - slice_indices) * time_per_slice
    scaled = countdown / max_time_to_failure

    # Right-pad with zeros up to max_length.
    n_padding = max_length - sequence_length
    return np.pad(scaled, (0, n_padding), 'constant', constant_values=0)

# Transformer block
def transformer_block(inputs, num_heads, dff, rate=0.1):
    """Run ``inputs`` through one attention + feed-forward encoder block.

    Each of the two sub-layers is followed by dropout, added back onto its
    own input, and layer-normalised.

    Args:
        inputs: Keras tensor whose last dimension is the feature width.
        num_heads: Number of attention heads.
        dff: Width of the hidden feed-forward layer. It is also passed as
            ``key_dim`` to the attention layer.
        rate: Dropout rate used after both sub-layers.

    Returns:
        Keras tensor with the same last dimension as ``inputs``.
    """
    # NOTE(review): per the Keras docs key_dim is the per-head query/key
    # size, so key_dim=dff makes every head dff wide - confirm intended.
    attention_layer = MultiHeadAttention(num_heads=num_heads, key_dim=dff)

    # Self-attention: the sequence is both the query and the value.
    attended = attention_layer(inputs, inputs)
    attended = Dropout(rate)(attended)
    attention_out = LayerNormalization(epsilon=1e-6)(inputs + attended)

    # Feed-forward: expand to dff, then project back to the input feature
    # width so the residual addition lines up.
    feature_width = inputs.shape[-1]
    hidden = Dense(dff, activation='relu')(attention_out)
    projected = Dense(feature_width)(hidden)
    projected = Dropout(rate)(projected)

    return LayerNormalization(epsilon=1e-6)(attention_out + projected)

# Create Transformer model
def create_transformer_model(num_time_slices, d_model, num_heads, dff):
    """Build a single-block Transformer regression model.

    Args:
        num_time_slices: Number of time steps per input sequence.
        d_model: Number of features per time step.
        num_heads: Number of attention heads in the Transformer block.
        dff: Hidden width handed to the Transformer block.

    Returns:
        An uncompiled Keras ``Model`` taking input of shape
        ``(num_time_slices, d_model)`` per sample and emitting one value.
    """
    sequence_input = Input(shape=(num_time_slices, d_model))
    encoded = transformer_block(sequence_input, num_heads, dff)

    # Only the feature vector of the first time step feeds the output unit,
    # which has no activation (linear) for regression.
    first_step = encoded[:, 0, :]
    prediction = Dense(1)(first_step)

    return Model(inputs=sequence_input, outputs=prediction)

# Main script
data_folder = "your_data_folder_path"
mat_files = [os.path.join(data_folder, file) for file in os.listdir(data_folder) if file.endswith('.mat')]

# Split data into train/test sets
random.shuffle(mat_files)
split_index = math.floor(len(mat_files) * 0.8)
training_files = mat_files[:split_index]
testing_files = mat_files[split_index:]

# Model parameters
num_epochs = 5
max_sequence_length = 999
max_time_to_failure = 1000  # Adjust based on your dataset
# load_and_pad_data computes the spectrogram with NFFT=1024; for a
# real-valued signal that gives NFFT // 2 + 1 one-sided frequency bins.
# Adjust if the loader's NFFT changes (assumes the recordings are real-valued).
num_features = 1024 // 2 + 1

# Initialize the model
model = create_transformer_model(max_sequence_length, num_features, num_heads=8, dff=2048)
model.compile(optimizer='adam', loss='mean_squared_error')

# Training loop
for epoch in range(num_epochs):
    for file in training_files:
        # Load and pad data
        padded_sequence, sequence_length = load_and_pad_data(file, max_sequence_length)

        # Calculate and normalize labels
        labels = calculate_and_normalize_labels(sequence_length, max_sequence_length, max_time_to_failure)

        # The loader returns (frequency, time) but the model input is
        # (time, features), so transpose before adding the batch dimension.
        padded_sequence = np.expand_dims(padded_sequence.T, axis=0)
        labels = np.expand_dims(labels, axis=0)

        # NOTE(review): the model emits a single value per sequence while
        # `labels` holds one value per time slice - confirm which target
        # the regression is meant to learn.

        # Train the model
        model.train_on_batch(padded_sequence, labels)

In [None]:
import os
import random
import math
import numpy as np
import scipy.io
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, LayerNormalization, Dropout, MultiHeadAttention

# Data loading and preprocessing
def load_and_process_data(file_path, sequence_length, start_index):
    """Load a stored spectrogram and return a min-max normalised window of it.

    Args:
        file_path: Path to a MATLAB ``.mat`` file holding a 2-D array under
            the key ``'spectrogram'``; the window is cut along axis 1.
        sequence_length: Requested number of columns in the window.
        start_index: First column of the window. Ignored when the
            spectrogram has ``sequence_length`` columns or fewer, in which
            case the whole spectrogram is used.

    Returns:
        Float array scaled to [0, 1]. It has fewer than ``sequence_length``
        columns when the spectrogram is shorter than requested or the window
        runs past its end. A window whose values are all equal maps to zeros.

    Raises:
        ValueError: If the selected window contains no elements.
    """
    data = scipy.io.loadmat(file_path)
    spectrogram = data['spectrogram']

    # Select a sequence starting at the caller-supplied index
    if spectrogram.shape[1] > sequence_length:
        selected_spectrogram = spectrogram[:, start_index:start_index + sequence_length]
    else:
        selected_spectrogram = spectrogram

    # Fail with a readable message instead of NumPy's reduction error.
    if selected_spectrogram.size == 0:
        raise ValueError(
            f"empty spectrogram window: start_index={start_index}, "
            f"sequence_length={sequence_length}, spectrogram shape={spectrogram.shape}"
        )

    # Normalize spectrogram; a constant window has no range to scale by, so
    # return zeros instead of dividing by zero.
    lowest = np.min(selected_spectrogram)
    value_range = np.max(selected_spectrogram) - lowest
    if value_range == 0:
        return np.zeros(selected_spectrogram.shape, dtype=float)

    Pxx_normalized = (selected_spectrogram - lowest) / value_range
    return Pxx_normalized

# Label generation and normalization
def calculate_and_normalize_labels(sequence_length, max_time_to_failure):
    """Build a scaled countdown label for every time slice of a sequence.

    Slice ``i`` gets ``(sequence_length - i) / max_time_to_failure``, so the
    values run from ``sequence_length`` down to 1 before scaling.

    Args:
        sequence_length: Number of slices, i.e. length of the result.
        max_time_to_failure: Divisor used to scale the countdown.

    Returns:
        1-D float array of length ``sequence_length``.
    """
    time_per_slice = 1  # Adjust this based on your data sampling rate

    # sequence_length, sequence_length - 1, ..., 1
    remaining_slices = np.arange(sequence_length, 0, -1)
    remaining_time = remaining_slices * time_per_slice

    return remaining_time / max_time_to_failure

# Transformer block
def transformer_block(inputs, num_heads, dff, rate=0.1):
    """Run ``inputs`` through one attention + feed-forward encoder block.

    Each of the two sub-layers is followed by dropout, added back onto its
    own input, and layer-normalised.

    Args:
        inputs: Keras tensor whose last dimension is the feature width.
        num_heads: Number of attention heads.
        dff: Width of the hidden feed-forward layer. It is also passed as
            ``key_dim`` to the attention layer.
        rate: Dropout rate used after both sub-layers.

    Returns:
        Keras tensor with the same last dimension as ``inputs``.
    """
    # NOTE(review): per the Keras docs key_dim is the per-head query/key
    # size, so key_dim=dff makes every head dff wide - confirm intended.
    attention_layer = MultiHeadAttention(num_heads=num_heads, key_dim=dff)

    # Self-attention: the sequence is both the query and the value.
    attended = attention_layer(inputs, inputs)
    attended = Dropout(rate)(attended)
    attention_out = LayerNormalization(epsilon=1e-6)(inputs + attended)

    # Feed-forward: expand to dff, then project back to the input feature
    # width so the residual addition lines up.
    feature_width = inputs.shape[-1]
    hidden = Dense(dff, activation='relu')(attention_out)
    projected = Dense(feature_width)(hidden)
    projected = Dropout(rate)(projected)

    return LayerNormalization(epsilon=1e-6)(attention_out + projected)

# Create Transformer model
def create_transformer_model(num_time_slices, d_model, num_heads, dff):
    """Build a single-block Transformer regression model.

    Args:
        num_time_slices: Number of time steps per input sequence.
        d_model: Number of features per time step.
        num_heads: Number of attention heads in the Transformer block.
        dff: Hidden width handed to the Transformer block.

    Returns:
        An uncompiled Keras ``Model`` taking input of shape
        ``(num_time_slices, d_model)`` per sample and emitting one value.
    """
    sequence_input = Input(shape=(num_time_slices, d_model))
    encoded = transformer_block(sequence_input, num_heads, dff)

    # Only the feature vector of the first time step feeds the output unit,
    # which has no activation (linear) for regression.
    first_step = encoded[:, 0, :]
    prediction = Dense(1)(first_step)

    return Model(inputs=sequence_input, outputs=prediction)

# Main script
data_folder = r"C:\Users\simon\signal_analysis\vibration_anal\vibration_analysis_nov\data\IMS\processed\sep_spec"  # Replace with your data folder path
mat_files = [os.path.join(data_folder, file) for file in os.listdir(data_folder) if file.endswith('.mat')]

# Split data into train/test sets
random.shuffle(mat_files)
split_index = math.floor(len(mat_files) * 0.8)
training_files = mat_files[:split_index]
testing_files = mat_files[split_index:]

# Model parameters
num_epochs = 5
max_sequence_length = 999  # You can adjust this as needed
max_time_to_failure = 1000  # Adjust based on your dataset
num_features = 1024  # This should match the number of frequency bins in the spectrogram

# Initialize the model.
# The time dimension is left unspecified (None) because every training
# window below has a different, randomly drawn length; a fixed length of
# max_sequence_length would reject all shorter windows.
model = create_transformer_model(None, num_features, num_heads=8, dff=2048)
model.compile(optimizer='adam', loss='mean_squared_error')

# Training loop
for epoch in range(num_epochs):
    for file in training_files:
        # Determine the sequence length (could be variable) and starting index
        sequence_length = random.randint(100, max_sequence_length)  # Example range for variable sequence length
        start_index = random.randint(0, max_sequence_length - sequence_length)

        # Load and process data
        spectrogram_normalized = load_and_process_data(file, sequence_length, start_index)

        # Calculate and normalize labels. The loader can return fewer
        # columns than requested (short spectrogram or window past the end),
        # so size the labels from the window actually returned.
        labels = calculate_and_normalize_labels(spectrogram_normalized.shape[1], max_time_to_failure)

        # The loader returns (frequency, time) but the model input is
        # (time, features), so transpose before adding the batch dimension.
        spectrogram_normalized = np.expand_dims(spectrogram_normalized.T, axis=0)
        labels = np.expand_dims(labels, axis=0)

        # NOTE(review): the model emits a single value per sequence while
        # `labels` holds one value per time slice - confirm which target
        # the regression is meant to learn.

        # Train the model
        model.train_on_batch(spectrogram_normalized, labels)

# Save the model after training
# NOTE(review): newer Keras releases may require a '.keras' or '.h5' suffix
# on the save path - confirm against the installed version.
model.save(r'C:\Users\simon\signal_analysis\vibration_anal\vibration_analysis_nov\models\first_drafts\model')
