# Import Packages

In [None]:
import pandas            as pd
import numpy             as np
import matplotlib.pyplot as plt
import seaborn           as sb
import scipy.stats       as sp
import tensorflow        as tf
from tensorflow                 import keras
from keras.models    import Model
from sklearn.model_selection    import train_test_split

In [None]:
# Mount Google Drive at /content/drive; later cells read from and write to
# paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

# Prepare Dataset for Visualization

In [None]:
# Names of the operating conditions; entry k of `cond` is paired with
# row k of `NoOfData` by the slicing and plotting cells further down.
cond = [
    'Tool Change',
    'Chip Conveyer',
    'Moving X axis',
    'Moving Y axis',
    'Moving Z axis',
    'Spindle Movement',
]

# Number of samples recorded for each condition, stored as a one-column table
samples_per_condition = [96, 161, 144, 41, 34, 147]
NoOfData = pd.DataFrame(samples_per_condition)
NoOfData

In [None]:
# Read the STFT array that was saved in ML8, then drop the last entry
# along the third axis before using it as the working dataset.
stft_file = '/content/drive/MyDrive/SoundSTFT.npy'
stft_loaded = np.load(stft_file)
DataSet = stft_loaded[:, :, :-1]
DataSet.shape

In [None]:
# Sizes of the second and third axes of the STFT stack; later cells use
# them to reshape single samples back to 2-D for plotting.
width = DataSet.shape[1]
height = DataSet.shape[2]
(width, height)

In [None]:
# Scale the Dataset
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from joblib import dump, load

n_samples = DataSet.shape[0]

# Flatten every spectrogram into one row so each bin becomes a feature column
DataSet_2d = DataSet.reshape(n_samples, -1)
print(DataSet_2d.shape)

# Two-stage scaling: StandardScaler first, then MinMaxScaler on its output
scaler_std = StandardScaler()
DataSet_2d_scaled_1 = scaler_std.fit_transform(DataSet_2d)

scaler_mMx = MinMaxScaler()
DataSet_2d_scaled_2 = scaler_mMx.fit_transform(DataSet_2d_scaled_1)

# Restore the (sample, axis-1, axis-2) layout of the original array
DataSet_scaled = DataSet_2d_scaled_2.reshape(n_samples, DataSet.shape[1], DataSet.shape[2])
print(DataSet_scaled.shape)

# Append a trailing axis of length 1 (the channel axis fed to the Conv2D layers)
DataSet_rsp = np.expand_dims(DataSet_scaled, axis=-1)
print(DataSet_rsp.shape)

In [None]:
# Prepare train/test dataset for Conv_AE modeling
Train_Test_Ratio = 0.2  # handed to train_test_split as test_size

# The slicing below treats DataSet_rsp as six consecutive blocks whose
# lengths are the first six rows of NoOfData (one block per condition).
class_counts = NoOfData.iloc[:6, 0].to_numpy()
class_stops = np.cumsum(class_counts)
class_blocks = [
    DataSet_rsp[stop - count:stop]
    for count, stop in zip(class_counts, class_stops)
]
DataSet_1, DataSet_2, DataSet_3, DataSet_4, DataSet_5, DataSet_6 = class_blocks

# Split every condition separately (same ratio, same seed) so each one is
# represented in both the training and the test portion.
class_splits = [
    train_test_split(block, test_size=Train_Test_Ratio, random_state=777)
    for block in class_blocks
]
((TrainD1, TestD1), (TrainD2, TestD2), (TrainD3, TestD3),
 (TrainD4, TestD4), (TrainD5, TestD5), (TrainD6, TestD6)) = class_splits

# Stack the per-condition parts back together, keeping the condition order
TrainData = np.concatenate([train_part for train_part, test_part in class_splits], axis=0)
TestData = np.concatenate([test_part for train_part, test_part in class_splits], axis=0)

TrainData.shape, TestData.shape

# Convolutional Autoencoder (Conv_AE) Model Training

In [None]:
# Build a class for customized training process confirmation
PrintAccPerEpochs = 100

class MAE_PerEpoch(keras.callbacks.Callback):
    """Keras callback that reports the training MAE every `PrintAccPerEpochs` epochs.

    Intended for use with `verbose=0` in `fit`, so that only a sparse
    progress line is printed instead of Keras' per-epoch output.
    """

    def on_epoch_end(self, epoch, logs=None):
        """Print the "mae" entry of `logs` when `epoch` is a multiple of `PrintAccPerEpochs`.

        Args:
            epoch: zero-based index of the epoch that just finished.
            logs: metric dictionary supplied by Keras; may be None.
        """
        if epoch % PrintAccPerEpochs != 0:
            return

        # `logs` defaults to None and only contains "mae" when that metric was
        # compiled into the model; skip the report instead of crashing training.
        mae = (logs or {}).get("mae")
        if mae is None:
            return

        print("[{} Epochs] Loss(MAE) : {:.4f} ".format(epoch, mae))

In [None]:
# pytictoc is a useful tool for measuring elapsed time
!pip install pytictoc
from pytictoc import TicToc

In [None]:
# Define hyperparameters
Epoch = 100  # number of epochs passed to Conv_AE.fit

DenseUnits_1 = 300  # units of the Dense layers directly before and after the bottleneck
DenseUnits_2 = 30   # units of the bottleneck Dense layer (= latent space dimension)

In [None]:
t = TicToc()

# Seed TensorFlow BEFORE any layer is created: the layers below are built
# (and their weights initialised) as soon as they are called on the input
# tensor, so seeding only right before fit() would leave the initial weights
# unseeded.
tf.random.set_seed(777)

# Design a Conv_AE model
################################################################################
# One sample = (axis-1 size, axis-2 size, channels) of TrainData
inputs = keras.layers.Input(shape=TrainData.shape[1:4])

# Encoder: three Conv2D + 2x2 max-pooling stages, each halving both spatial axes
x = keras.layers.Conv2D(4 , kernel_size=(3,3), padding = 'same', activation = 'selu')(inputs)
x = keras.layers.MaxPool2D(pool_size=(2,2), padding='same')(x)
x = keras.layers.Conv2D(8 , kernel_size=(3,3), padding = 'same', activation = 'selu')(x)
x = keras.layers.MaxPool2D(pool_size=(2,2), padding='same')(x)
x = keras.layers.Conv2D(16, kernel_size=(3,3), padding = 'same', activation = 'selu')(x)
x = keras.layers.MaxPool2D(pool_size=(2,2), padding='same')(x)

# Remember the shape of the last encoder feature map so the decoder can
# rebuild it; this is (5, 88, 16) -> 7040 units for 40 x 704 x 1 inputs.
encoded_map_shape = tuple(x.shape[1:])
encoded_map_units = int(np.prod(encoded_map_shape))

x = keras.layers.Flatten()(x)
x = keras.layers.Dense(units=DenseUnits_1, activation = 'selu')(x)

# Bottle neck layer (Latent space)
x = keras.layers.Dense(units=DenseUnits_2 , activation = 'selu')(x)

# Decoder: mirror of the encoder. Each stride-2 transposed convolution
# doubles both spatial axes, so the output matches the input size only when
# both input axes are multiples of 8.
x = keras.layers.Dense(units=DenseUnits_1 , activation = 'selu')(x)
x = keras.layers.Dense(units=encoded_map_units , activation = 'selu')(x)
x = keras.layers.Reshape(encoded_map_shape)(x)
x = keras.layers.Conv2DTranspose(16, kernel_size=(3,3), padding = 'same', activation = 'selu', strides = 2)(x)
x = keras.layers.Conv2DTranspose(8 , kernel_size=(3,3), padding = 'same', activation = 'selu', strides = 2)(x)
x = keras.layers.Conv2DTranspose(1 , kernel_size=(3,3), padding = 'same', activation = 'selu', strides = 2)(x)

Conv_AE = Model(inputs, x)

# Metric for optimization: Mean Absolute Error (MAE) = Reconstruction Error
Conv_AE.compile(loss="mae", optimizer='adam', metrics=['mae'])
Conv_AE.summary()

print('Latent space dimension : %d'%(DenseUnits_2))
################################################################################


# Conv_AE training
################################################################################
print("\n↓↓↓↓↓ Start Conv_AE training ↓↓↓↓↓\n")

t.tic() # training start time

# The autoencoder learns to reproduce its own input, hence x == y.
# verbose=0 silences Keras; progress is reported by the MAE_PerEpoch callback.
history = Conv_AE.fit(TrainData, TrainData, epochs=Epoch, verbose=0,
                      validation_data=(TestData, TestData), callbacks=[MAE_PerEpoch()])

t.toc() # training end time
time_s = t.tocvalue()
################################################################################

# Evaluate the reconstruction error of the trained model on TestData
Loss, Final_MAE = Conv_AE.evaluate(TestData,  TestData, verbose=0)
print("\n[Final Epochs] MAE : %.4f"%(Final_MAE))
# time_s is in seconds; the second figure is the same duration in minutes
print("Training time : %.4f seconds / %.4f minutes"%(time_s, time_s/60))

In [None]:
# Save the trained model
Conv_AE.save("/content/drive/MyDrive/ML10/ConvAE_LsDim_%d.h5"%(DenseUnits_2))

# Save the training history: one row per recorded epoch,
# column 0 = training loss, column 1 = validation loss.
# The table is built straight from `history` so its length always matches
# what was actually recorded, even if `Epoch` was edited after training.
Hist = pd.DataFrame({
    0: history.history['loss'],
    1: history.history['val_loss'],
})
Hist.to_csv("/content/drive/MyDrive/ML10/ConvAE_LsDim_%d_history.csv"%(DenseUnits_2),header=None,index=None)

# Compare Original Data vs Reconstructed Data

In [None]:
# Restore the autoencoder from the .h5 file written for the current latent dimension
saved_model_path = "/content/drive/MyDrive/ML10/ConvAE_LsDim_%d.h5"%(DenseUnits_2)
Conv_AE_load = keras.models.load_model(saved_model_path)

In [None]:
# Randomly select a data sample.
# np.random.randint excludes the upper bound, so passing the sample count
# itself makes every test sample (including the last one) selectable.
datanum = np.random.randint(0, TestData.shape[0])

# Drop the channel axis so both arrays are 2-D (width x height) for plotting
Original = TestData[datanum].reshape(width, height)
Restored = Conv_AE_load.predict(TestData[datanum:datanum+1]).reshape(width, height)

print('Test data sample : %d'%(datanum))

# Axis coordinates: `height` points spanning 0-1 s and `width` points starting
# at 0 kHz in steps of 24/width kHz. The array is named `time_axis` so it does
# not clash with the `time` module imported further down the notebook.
time_axis = np.linspace(0, 1, height)
freq = np.linspace(0, 24, width, endpoint=False)

fig, (ax_original, ax_restored) = plt.subplots(1, 2, figsize=(10,4))

ax_original.set_title('Orignal Data (Test_%d)'%(datanum))
ax_original.pcolormesh(time_axis, freq, Original, cmap = 'summer')
ax_original.set_xlabel('Time (s)')
ax_original.set_ylabel('Frequency (kHz)')
ax_original.set_ylim([0,6])

ax_restored.set_title('Restored Data (Dim: %d)'%(DenseUnits_2))
ax_restored.pcolormesh(time_axis, freq, Restored, cmap = 'YlGn_r')
ax_restored.set_xlabel('Time (s)')
ax_restored.set_ylim([0,6])

plt.show()

# Extract Latent Space Features

In [None]:
from sklearn.decomposition import KernelPCA
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
import joblib

In [None]:
# Set ranges for each class
# Class k covers rows [range_k_1, range_k_2): it starts after all samples of
# the preceding classes and spans as many rows as NoOfData lists for it.
class_sizes = NoOfData.iloc[:6, 0].to_numpy()
class_stops = np.cumsum(class_sizes)
class_starts = class_stops - class_sizes

range_1_1, range_2_1, range_3_1, range_4_1, range_5_1, range_6_1 = class_starts
range_1_2, range_2_2, range_3_2, range_4_2, range_5_2, range_6_2 = class_stops

In [None]:
# Set LS_dim (latent space's dimensionality)
LS_dim = DenseUnits_2

# Search the bottle neck layer (= latent space): the layer whose output has
# the fewest values per sample. Column 0 = layer index, column 1 = that count.
# The size is read from `layer.output.shape` (batch axis dropped), which has
# the same form for every layer, so the input layer needs no special case.
layer_sizes = [
    [layer_idx, int(np.prod(tuple(layer.output.shape)[1:]))]
    for layer_idx, layer in enumerate(Conv_AE_load.layers)
]
x = pd.DataFrame(layer_sizes, dtype=np.int64)

x_rank            = x.sort_values([1],ascending=True)
bottle_neck_layer = int(x_rank.iloc[0,0])
bottle_neck_dim   = int(x_rank.iloc[0,1])
layer_name        = Conv_AE_load.layers[bottle_neck_layer].name

print('\n\n########################## \n')
print('Latent space dimension : %d'%(LS_dim))
print('\n########################## \n\n')

print('bottle neck layer name : ' + layer_name)
print('bottle neck dimension  : %d'%(bottle_neck_dim))

# Set a model that outputs both latent space features and final prediction.
# `model.inputs` is already a list, so it is passed as-is (not wrapped again).
model        = Conv_AE_load
Visual_model = tf.keras.models.Model(model.inputs, [model.get_layer(layer_name).output, model.output])

# Get latent space features from DataSet using Visual_model.
# A single batched predict() replaces one eager model call per sample; the
# result is cast to float64 to keep the dtype of the saved features unchanged.
Ls_output, pred = Visual_model.predict(DataSet_rsp, verbose=0)
Latent_space = np.asarray(Ls_output, dtype=np.float64)

# Scale the latent space features and save the scaler and features
# (note: despite its name, Latent_space_std holds MinMaxScaler output)
scaler = MinMaxScaler()
Latent_space_std = scaler.fit_transform(Latent_space)

VslModel_name = 'ConvAE_LatentDim_%d'%(LS_dim)
joblib.dump(scaler, "/content/drive/MyDrive/ML10/scaler_" + VslModel_name + ".save")

pd.DataFrame(Latent_space).to_csv('/content/drive/MyDrive/ML10/LS_output_D%d.csv'%(LS_dim), index=None)
pd.DataFrame(Latent_space_std).to_csv('/content/drive/MyDrive/ML10/LS_output_D%d_scaled.csv'%(LS_dim), index=None)

# Print results
print('\nLatent space feature (scaled) data shape :')
print(Latent_space_std.shape)

In [None]:
# Read the unscaled latent features back from the CSV saved for this latent
# dimension; from here on `Latent_space` is a DataFrame.
latent_csv_path = '/content/drive/MyDrive/ML10/LS_output_D%d.csv'%(LS_dim)
Latent_space = pd.read_csv(latent_csv_path)
Latent_space

# Visualization of Latent Space Features by t-SNE

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.manifold      import TSNE
import time

In [None]:
import inspect

####### Report which latent space is embedded (features come from `Latent_space`) #######
print('\n\n########################## \n')
print('Latent space dimension : %d'%(LS_dim))
print('\n########################## \n\n')

############ Execute t-SNE ##################
# scikit-learn 1.5 renamed TSNE's iteration-count argument from `n_iter` to
# `max_iter` (and later drops the old name), so use whichever keyword the
# installed version accepts. The iteration count itself stays at 500.
_tsne_iter_kw = 'max_iter' if 'max_iter' in inspect.signature(TSNE).parameters else 'n_iter'

tsne         = TSNE(n_components=2, verbose=1, perplexity=100, random_state=1, **{_tsne_iter_kw: 500})
tsne_results = tsne.fit_transform(Latent_space)
print('\nt-SNE result data shape :')
print(tsne_results.shape)

# Keep the 2-D embedding on Drive so the plot can be redrawn without re-running t-SNE
np.save(f'/content/drive/MyDrive/ML10/tSNE_D{LS_dim}.npy', tsne_results)

In [None]:
VslModel_name = 'ConvAE_LatentDim_%d'%(LS_dim)

plt.figure(figsize=(7,5))

# Row range and colour of every condition, listed in the same order as `cond`
class_slices = [
    (range_1_1, range_1_2),
    (range_2_1, range_2_2),
    (range_3_1, range_3_2),
    (range_4_1, range_4_2),
    (range_5_1, range_5_2),
    (range_6_1, range_6_2),
]
class_colors = ['tab:red', 'tab:orange', 'tab:green', 'tab:cyan', 'tab:blue', 'tab:gray']

# One scatter call per condition so each gets its own colour and legend entry
for (row_start, row_stop), color, label in zip(class_slices, class_colors, cond):
    class_points = tsne_results[row_start:row_stop]
    plt.scatter(class_points[:, 0], class_points[:, 1], c=color, label=label, s=7)

plt.title('Latent space (t-SNE) - ' + VslModel_name, fontsize=12)
plt.grid(alpha=0.3)
plt.legend(fontsize=8)
plt.xlabel('t-SNE_1')
plt.ylabel('t-SNE_2')

# Save the figure to Drive before showing it
fig = plt.gcf()
fig.savefig("/content/drive/MyDrive/ML10/tSNE_LSdim_%d.png"%(LS_dim), dpi=fig.dpi)

plt.show()