In [31]:
import os
import pickle
import matplotlib.pyplot as plt  # import plt before librosa, conflict!
import librosa
import numpy as np
from tqdm import tqdm
import librosa.display
from sklearn.decomposition import PCA

PICKLE_DIR = 'D:\9999_OneDrive_ZHAW\OneDrive - ZHAW\BA_ZHAW_RTO\pickle'
root = 'Z:\\BA\\mimii_baseline\\dataset'

# Functions

In [32]:
def file_names(root, db_levels=None, ids=None, file_types=None, num_files=None):
    files = []
    
    if db_levels is None:
        db_levels = ['0dB', '6dB', 'min6dB']
    
    if ids is None:
        ids = ['id_00', 'id_02', 'id_04', 'id_06']
        
    if file_types is None:
        file_types = ['normal', 'abnormal']

    combinations = len(db_levels) * len(ids) * len(file_types)
    
    if num_files is not None:
        files_per_combination = num_files // combinations
    else:
        files_per_combination = None

    for db_level in db_levels:
        for id_ in ids:
            for file_type in file_types:
                folder_path = os.path.join(root, db_level, 'pump', id_, file_type)
                
                for idx, file_name in enumerate(os.listdir(folder_path)):
                    if files_per_combination is not None and idx >= files_per_combination:
                        break
                    
                    file_path = os.path.join(folder_path, file_name)
                    files.append(file_path)

    return files


def dataloader(files_list, n_fft=1024, hop_length=512, n_mels=64, frames=5):
    dims = n_mels * frames
    
    for idx in tqdm(range(len(files_list)), desc='Dataloader: '):
        signal, sr = file_load(files_list[idx])
        features = extract_features(
        signal,
        sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        frames=frames,
        )
        
        if idx == 0:
            dataset = np.zeros((features.shape[0] * len(files_list), dims), np.float32)
        
        dataset[
            features.shape[0] * idx : features.shape[0] * (idx + 1), :
        ] = features
        
    return dataset
        

def file_load(wav_name, mono=False, channel=0):
    signal, sr = librosa.load(wav_name, mono=False, sr=None)
    if signal.ndim <= 1:
        sound_file = signal, sr
    else:
        sound_file = signal[channel, :], sr

    return sound_file
  
    
def extract_features(signal, sr, n_fft=1024, hop_length=512, 
                     n_mels=64, frames=5):
    mel_spectrogram = librosa.feature.melspectrogram(
        y=signal, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
    )
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
    features_vector_size = log_mel_spectrogram.shape[1] - frames + 1
    
    dims = frames * n_mels

    if features_vector_size < 1:
        return np.empty((0, dims), np.float32)

    features = np.zeros((features_vector_size, dims), np.float32)
    for time in range(frames):
        features[:, n_mels * time : n_mels * (time + 1)] = log_mel_spectrogram[
            :, time : time + features_vector_size
        ].T

    return features

# pickle I/O
def save_pickle(filename, save_data, root=PICKLE_DIR):
    """
    picklenize the data.

    filename : str
        pickle filename
    data : free datatype
        some data will be picklenized

    return : None
    """
    filepath = os.path.join(root, filename) + '.pickle'
    with open(filepath, 'wb') as sf:
        pickle.dump(save_data, sf)


def load_pickle(filename, root=PICKLE_DIR):
    """
    unpicklenize the data.

    filename : str
        pickle filename

    return : data
    """
    filepath = os.path.join(root, filename) + '.pickle'
    with open(filepath, 'rb') as lf:
        load_data = pickle.load(lf)
    return load_data

In [23]:
# Load all files
all_files = file_names(root)

# Load files from specific dB levels and IDs
specific_files = file_names(root, db_levels=['0dB', '6dB'], ids=['id_00', 'id_04'])

# Load normal files only
normal_files = file_names(root, file_types=['normal'])

# Load a limited number of files from each folder
limited_files = file_names(root, num_files=10)

# Load evenly distributed files across specified dB levels, IDs, and file types
evenly_distributed_files = file_names(root, db_levels=['0dB', '6dB'], ids=['id_00', 'id_04'], num_files=20)

# load data

In [24]:
six_00_norm = file_names(root, db_levels=['6dB'], ids=['id_00'], file_types=['normal'])
six_00_abnorm = file_names(root, db_levels=['6dB'], ids=['id_00'], file_types=['abnormal'])

six_02_norm = file_names(root, db_levels=['6dB'], ids=['id_02'], file_types=['normal'])
six_02_abnorm = file_names(root, db_levels=['6dB'], ids=['id_02'], file_types=['abnormal'])

six_04_norm = file_names(root, db_levels=['6dB'], ids=['id_04'], file_types=['normal'])
six_04_abnorm = file_names(root, db_levels=['6dB'], ids=['id_04'], file_types=['abnormal'])

six_06_norm = file_names(root, db_levels=['6dB'], ids=['id_06'], file_types=['normal'])
six_06_abnorm = file_names(root, db_levels=['6dB'], ids=['id_06'], file_types=['abnormal'])

six_norm = file_names(root, db_levels=['6dB'], ids=['id_00', 'id_02', 'id_04', 'id_06'], file_types=['normal'])
six_abnorm = file_names(root, db_levels=['6dB'], ids=['id_00', 'id_02', 'id_04', 'id_06'], file_types=['abnormal'])

six_all = file_names(root, db_levels=['6dB'], ids=['id_00', 'id_02', 'id_04', 'id_06'])

print(len(six_00_norm), len(six_00_abnorm))
print(len(six_02_norm), len(six_02_abnorm))
print(len(six_04_norm), len(six_04_abnorm))
print(len(six_06_norm), len(six_06_abnorm))
print(len(six_norm), len(six_abnorm))
print(len(six_all))

1006 143
1005 111
702 100
1036 102
3749 456
4205


In [26]:
# takes about 5 minutes
n_fft = 1024
hop_length = 512
n_mels = 64
frames = 5

dim_1 = 313 - frames + 1
dim_2 = n_mels * frames

# Load file loacations
six_norm = file_names(root, db_levels=['6dB'], ids=['id_00', 'id_02', 'id_04', 'id_06'], file_types=['normal'])
six_abnorm = file_names(root, db_levels=['6dB'], ids=['id_00', 'id_02', 'id_04', 'id_06'], file_types=['abnormal'])

# Feature labelling
six_norm_labels = np.zeros(len(six_norm))
six_abnorm_labels = np.ones(len(six_abnorm))

# load six_norm_data normal and abnormal
six_norm_data = dataloader(
    six_norm, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels, frames=frames)
six_abnorm_data = dataloader(
    six_abnorm, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels, frames=frames)


Dataloader: 100%|██████████████████████████████████████████████████████████████████| 3749/3749 [04:07<00:00, 15.14it/s]
Dataloader: 100%|████████████████████████████████████████████████████████████████████| 456/456 [00:31<00:00, 14.67it/s]


In [27]:
print(six_norm_data)
print(len(six_norm_data[0]))

[[-20.155865  -20.236065  -16.026154  ... -31.498413  -33.182472
  -33.891582 ]
 [-12.563705  -14.5558815 -10.9439125 ... -28.492458  -29.713936
  -27.130947 ]
 [-14.09877   -16.142235  -10.725767  ... -30.762085  -32.16601
  -31.544228 ]
 ...
 [-10.915076   -9.411437  -13.841331  ... -45.56337   -46.086052
  -49.170116 ]
 [ -8.277532  -12.288207  -12.049769  ... -44.709644  -47.02185
  -46.44263  ]
 [ -7.603442  -11.675008  -12.90816   ... -47.67592   -48.4262
  -47.76262  ]]
320


In [33]:
save_pickle('six_norm', six_norm)
save_pickle('six_abnorm', six_abnorm)
save_pickle('six_norm_data', six_norm_data)
save_pickle('six_abnorm_data', six_abnorm_data)

In [18]:
six_norm = load_pickle('six_norm')
six_abnorm = load_pickle('six_abnorm')
six_norm_data = load_pickle('six_norm_data')
six_abnorm_data = load_pickle('six_abnorm_data')

FileNotFoundError: [Errno 2] No such file or directory: '/pickle\\six_norm'

# reshape data to 3D

The reshaping step is done to reorganize the data into a 3D format that makes it easier to work with when analyzing audio data. In this case, it seems that dim_1 corresponds to the number of time frames, and dim_2 corresponds to the number of mel frequency bands multiplied by the number of frames.

The reshaping is performed as follows:

normal_data and abnormal_data are reshaped into 3D arrays with dimensions: (samples, dim_1, dim_2). The samples dimension represents the number of audio files, dim_1 represents the time frames, and dim_2 represents the mel frequency bands multiplied by the number of frames.
normal_data_resh and abnormal_data_resh are then concatenated along the first axis (samples) to create a single 3D array called data_resh.

In [4]:
def reshape_data(normal_data, abnormal_data, frames, n_mels):
    dim_1 = 313 - frames + 1
    dim_2 = n_mels * frames

    normal_data_resh = normal_data.reshape(int(normal_data.shape[0] / dim_1), dim_1, dim_2)
    abnormal_data_resh = abnormal_data.reshape(int(abnormal_data.shape[0] / dim_1), dim_1, dim_2)

    return normal_data_resh, abnormal_data_resh


In [5]:
n_mels = 64
frames = 5
six_norm_resh, six_abnorm_resh = reshape_data(six_norm_data, six_abnorm_data, frames=frames, n_mels=n_mels)


In [6]:
print(six_norm_resh[0])
print(len(six_norm_resh[0]))

[[-20.155865  -20.236065  -16.026154  ... -31.498413  -33.182472
  -33.891582 ]
 [-12.563705  -14.5558815 -10.9439125 ... -28.492458  -29.713936
  -27.130947 ]
 [-14.09877   -16.142235  -10.725767  ... -30.762085  -32.16601
  -31.544228 ]
 ...
 [-15.199739  -12.418951  -12.241966  ... -31.563438  -31.347225
  -31.381538 ]
 [-12.651146  -12.740814   -5.7199745 ... -29.44743   -29.09539
  -28.265282 ]
 [-10.292263  -13.19899    -8.907797  ... -29.134132  -28.441452
  -30.727715 ]]
309


# Plot

In [7]:
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import random
import os

def plot_examples(normal_files, abnormal_files, n_examples=3, hop_length=512, n_fft=1024, random_samples=False):
    fig, axs = plt.subplots(n_examples, 2, figsize=(24, 10 * n_examples))

    if random_samples:
        normal_indices = random.sample(range(len(normal_files)), n_examples)
        abnormal_indices = random.sample(range(len(abnormal_files)), n_examples)
    else:
        normal_indices = list(range(n_examples))
        abnormal_indices = list(range(n_examples))

    for i in range(n_examples):
        # Load and process normal example
        file_norm = normal_files[normal_indices[i]]
        id_norm = os.path.dirname(file_norm).split("\\")[-2]
        print()
        y_norm, sr_norm = librosa.load(file_norm)
        D_norm = librosa.stft(y_norm, hop_length=hop_length, n_fft=n_fft)
        S_db_norm = librosa.amplitude_to_db(np.abs(D_norm), ref=np.max)

        # Plot normal example
        img_norm = librosa.display.specshow(
            S_db_norm,
            sr=sr_norm,
            hop_length=hop_length,
            x_axis='time',
            y_axis='log',
            ax=axs[i, 0]
        )
        axs[i, 0].set_title(f'Pump: {id_norm} - Normal {i+1} - Index #{normal_indices[i]}')
        fig.colorbar(img_norm, ax=axs[i, 0], format="%+2.f dB")

        # Load and process abnormal example
        file_abnorm = abnormal_files[abnormal_indices[i]]
        id_abnorm = os.path.dirname(file_abnorm).split("\\")[-2]
        y_abnorm, sr_abnorm = librosa.load(file_abnorm)
        D_abnorm = librosa.stft(y_abnorm, hop_length=hop_length, n_fft=n_fft)
        S_db_abnorm = librosa.amplitude_to_db(np.abs(D_abnorm), ref=np.max)

        # Plot abnormal example
        img_abnorm = librosa.display.specshow(
            S_db_abnorm,
            sr=sr_abnorm,
            hop_length=hop_length,
            x_axis='time',
            y_axis='log',
            ax=axs[i, 1]
        )
        axs[i, 1].set_title(f'Pump: {id_abnorm} - Abnormal {i+1} - Index #{abnormal_indices[i]}')
        fig.colorbar(img_abnorm, ax=axs[i, 1], format="%+2.f dB")

    plt.tight_layout()
    plt.show()

In [8]:
def plot_single_example(file, example_type='normal', hop_length=512, n_fft=1024):
    # Extract information from the file path
    id_num = os.path.dirname(file).split("\\")[-2]
    file_index = os.path.basename(file).split('.')[0]

    # Load and process the example
    y, sr = librosa.load(file)
    D = librosa.stft(y, hop_length=hop_length, n_fft=n_fft)
    S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)

    # Plot the example
    fig, ax = plt.subplots(figsize=(24, 10))
    img = librosa.display.specshow(
        S_db,
        sr=sr,
        hop_length=hop_length,
        x_axis='time',
        y_axis='log',
        ax=ax
    )
    ax.set_title(f'Pump: {id_num} - {example_type.capitalize()} - Index #{file_index}')
    fig.colorbar(img, ax=ax, format="%+2.f dB")
    plt.tight_layout()
    plt.show()


In [9]:
# Plot a normal example
plot_single_example(six_norm[0], example_type='normal', hop_length=512, n_fft=1024)

# Plot an abnormal example
plot_single_example(six_abnorm[0], example_type='abnormal', hop_length=512, n_fft=1024)


AttributeError: module 'matplotlib' has no attribute 'axes'

<Figure size 2400x1000 with 0 Axes>

AttributeError: module 'matplotlib' has no attribute 'pyplot'

In [None]:
# Call the function with random_samples=True (default) for random samples
# plot_examples(six_norm, six_abnorm, n_examples=3, hop_length=512, n_fft=1024)

# Call the function with random_samples=False for the first n_examples samples
plot_examples(six_norm, six_abnorm, n_examples=3, hop_length=512, n_fft=1024, random_samples=True)

In [None]:
plot_examples(six_00_norm, six_00_abnorm, n_examples=3, hop_length=512, n_fft=1024, random_samples=False)

plot_examples(six_00_norm, six_00_abnorm, n_examples=3, hop_length=512, n_fft=1024, random_samples=True)


In [None]:
plot_examples(six_02_norm, six_02_abnorm, n_examples=3, hop_length=512, n_fft=1024, random_samples=False)

plot_examples(six_02_norm, six_02_abnorm, n_examples=3, hop_length=512, n_fft=1024, random_samples=True)


# Normalization

In [None]:
def normalize_data(x, lb, ub, max_v=1.0, min_v=-1.0):
    '''
    Max-Min normalize of 'x' with max value 'max_v' min value 'min_v'
    '''

    # Set-up
    if len(ub)==0:
        ub = x.max(0) # OPTION 1
        # applied to the first dimension (0) columns of the data
        #ub = np.percentile(x, 99.9, axis=0, keepdims=True) # OPTION 2:
        
    if len(lb)==0:
        lb = x.min(0) 
        #lb = np.percentile(x, 0.1, axis=0, keepdims=True)
    
    ub.shape = (1,-1)
    lb.shape = (1,-1)           
    max_min = max_v - min_v
    delta = ub-lb

    # Compute
    x_n = max_min * (x - lb) / delta + min_v
    if 0 in delta:
        idx = np.ravel(delta == 0)
        x_n[:,idx] = x[:,idx] - lb[:, idx]

    return x_n, lb, ub 

In [None]:
six_norm_data_n, lb, ub = normalize_data(six_norm_data, [], [], max_v=1.0, min_v=0.0)
print(six_norm_data_n)

six_abnorm_data_n, lb, ub = normalize_data(six_abnorm_data, [], [], max_v=1.0, min_v=0.0)
print(six_abnorm_data_n)


# PCA

In [None]:
pca = PCA(n_components=2)
pca_result = pca.fit_transform(six_norm_data_n)


## PCA of 6dB - normal - all machines 

In [None]:
machine_ids = [os.path.dirname(file).split("\\")[-2] for file in six_norm]

colors = {"id_00": "red", "id_02": "blue", "id_04": "green", "id_06": "purple"}

fig, ax = plt.subplots()
for i, machine_id in enumerate(machine_ids):
    ax.scatter(pca_result[i, 0], pca_result[i, 1], color=colors[machine_id], label=machine_id)

# Create a legend with unique labels
handles, labels = ax.get_legend_handles_labels()
unique_labels = dict(zip(labels, handles))
ax.legend(unique_labels.values(), unique_labels.keys())

ax.set_title(f'6dB - PCA all machines')
ax.set_xlabel("PC 1")
ax.set_ylabel("PC 2")
plt.show()


## PCA of 6dB - normal and abnormal - seperate machines

In [None]:
# Perform PCA on normalized data
pca = PCA(n_components=2)
pca_norm_result = pca.fit_transform(six_norm_data_n)
pca_abnorm_result = pca.fit_transform(six_norm_data_n)

# Create scatter plots for each machine ID with normal and abnormal data
machine_ids = ['id_00', 'id_02', 'id_04', 'id_06']
colors = {"normal": "red", "abnormal": "blue"}

fig, axs = plt.subplots(2, 2, figsize=(10, 10))

for idx, machine_id in enumerate(machine_ids):
    ax = axs[idx // 2, idx % 2]

    # Filter normal and abnormal data by machine ID
    norm_indices = [i for i, file in enumerate(six_norm) if machine_id in file]
    abnorm_indices = [i for i, file in enumerate(six_abnorm) if machine_id in file]

    # Plot normal data
    ax.scatter(pca_norm_result[norm_indices, 0], pca_norm_result[norm_indices, 1], color=colors["normal"], label="normal")
    # Plot abnormal data
    ax.scatter(pca_abnorm_result[abnorm_indices, 0], pca_abnorm_result[abnorm_indices, 1], color=colors["abnormal"], label="abnormal")

    ax.legend()
    ax.set_title(f'6dB - PCA machine {machine_id}')
    ax.set_xlabel("PC 1")
    ax.set_ylabel("PC 2")

plt.tight_layout()
plt.show()


# SNR

Found the formula ond wiki:
https://en.wikipedia.org/wiki/Signal-to-noise_ratio


In [None]:
def snr(signal, noise):
    signal_power = np.mean(signal ** 2)
    noise_power = np.mean(noise ** 2)
    return 10 * np.log10(signal_power / noise_power)

root = 'Z:/BA/mimii_baseline/dataset/'
machine_ids = ['id_00', 'id_02', 'id_04', 'id_06']
db_levels = ['6dB', '0dB', 'min6dB']

snr_results = {}

for db_level in db_levels:
    snr_results[db_level] = {}
    
    for machine_id in machine_ids:
        normal_files = file_names(root, db_levels=[db_level], ids=[machine_id], file_types=['normal'])
        abnormal_files = file_names(root, db_levels=[db_level], ids=[machine_id], file_types=['abnormal'])
        
        snr_values = []
        
        for normal_file, abnormal_file in zip(normal_files, abnormal_files):
            normal_signal, sr = librosa.load(normal_file)
            abnormal_signal, _ = librosa.load(abnormal_file)
            noise = abnormal_signal - normal_signal

            snr_value = snr(normal_signal, noise)
            snr_values.append(snr_value)

        snr_results[db_level][machine_id] = np.mean(snr_values)

print(snr_results)


In [None]:
# Prepare data
dB_levels = list(snr_results.keys())
machine_ids = list(snr_results[dB_levels[0]].keys())
snr_values = [[snr_results[dB_level][machine_id] for machine_id in machine_ids] for dB_level in dB_levels]

# Plot
fig, ax = plt.subplots(figsize=(10, 6))
bar_width = 0.25
x = np.arange(len(machine_ids))

for i, (dB_level, values) in enumerate(zip(dB_levels, snr_values)):
    ax.bar(x + i * bar_width, values, width=bar_width, label=dB_level)

ax.set_ylabel('SNR (dB)')
ax.set_xlabel('Machine ID')
ax.set_title('SNR for different machines and dB levels')
ax.set_xticks(x + bar_width / 2)
ax.set_xticklabels(machine_ids)
ax.legend()

plt.tight_layout()
plt.show()

I'm not sure what to do with the results or how i should interpret it:

negative SNR level means more noise is present in the signal, the noise level is higher than the signal level
--> 6dB should have the highest noise level


on lower dB levels the noise is lower because the SNR gets lower.

blue 6dB should have the highest noise level compared to the others?

green min6dB should have the lowest noise level

within the group 6dB (which has the highest noise level), machine id_04 should perfrom the best with the lowest noise level.
