# Install kymatio

In [None]:
!pip install kymatio

In [None]:
from google.colab import drive
drive.mount('/gdrive')

In [None]:
%cd /gdrive/My Drive/phuc_code_file

In [None]:
import kymatio
kymatio.__version__

# Import libraries

In [None]:
import tensorflow as tf
from sklearn.model_selection import train_test_split
import numpy as np 
import pandas as pd 
from google.colab import widgets
import matplotlib.pyplot as plt
from sklearn import preprocessing 
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.metrics import precision_score, recall_score, f1_score
from utils import file_helper, feature_extraction

# Data Pipeline

In [None]:
train_X = np.load('Dataset/split_data/Binary Split/X_train.npy')
train_y = np.load('Dataset/split_data/Binary Split/y_train.npy')
val_X = np.load('Dataset/split_data/Binary Split/X_val.npy')
val_y = np.load('Dataset/split_data/Binary Split/y_val.npy')
test_X = np.load('Dataset/split_data/Binary Split/X_test.npy')
test_y = np.load('Dataset/split_data/Binary Split/y_test.npy')

#Note:
#class       label_encoded
# Human:          1
# Non_human:      0 

In [None]:
# Reshape feature: 
train_X = np.reshape(train_X,(-1,5704))
val_X = np.reshape(val_X,(-1,5704))
test_X = np.reshape(test_X,(-1,5704))
print(train_X.shape)

In [None]:
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 100

train_dataset = tf.data.Dataset.from_tensor_slices((train_X, train_y)).shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
val_dataset = tf.data.Dataset.from_tensor_slices((val_X, val_y)).shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_X, test_y)).batch(BATCH_SIZE)


# Wavelet Scattering Transform Model

In [None]:
J = 8
T = 5704
Q = 12
log_eps = 1e-6

In [None]:
from kymatio.keras import Scattering1D
from tensorflow.keras import layers
tf.keras.backend.clear_session()
def Wavelet_Simple(): 
  input = layers.Input(shape=(T))
  x = Scattering1D(J, Q = Q)(input)
  x = layers.Lambda(lambda x: x[..., 1:, :])(x)
  # To increase discriminability, we take the logarithm of the scattering
  # coefficients (after adding a small constant to make sure nothing blows up
  # when scattering coefficients are close to zero). This is known as the
  # log-scattering transform.
  x = layers.Lambda(lambda x: tf.math.log(tf.abs(x) + log_eps))(x)
  x = layers.GlobalAveragePooling1D(data_format='channels_first')(x)
  x = layers.BatchNormalization(axis=1)(x)
  x = layers.Flatten()(x)
  x = layers.Dense(100, activation="relu")(x)
  output = layers.Dense(1, activation='sigmoid')(x)
  return tf.keras.models.Model(input, output)



In [None]:
model = Wavelet_Simple()

In [None]:
model.summary()

In [None]:
METRICS = [
      tf.keras.metrics.BinaryAccuracy(name='accuracy'),
      tf.keras.metrics.Precision(name='precision'),
      tf.keras.metrics.Recall(name='recall'),
      tf.keras.metrics.AUC(name='auc'),

]

# Optimizers, Callbacks, Reduce LR

In [None]:
from tensorflow.keras.optimizers import RMSprop,Adam
model.compile(optimizer = RMSprop(learning_rate = 1e-3), loss = tf.keras.losses.binary_crossentropy,metrics = [METRICS])

In [None]:
import os 
#Provides unique names for checkpoints and adjust the checkpointing frequency
checkpoint_path = "Wavelet_Bi_RMSprop/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

#Create a call back that save the model's weights
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath = checkpoint_path,
                                                 save_weights_only = True,
                                                 verbose = 1,
                                                 period = 10)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4 ,verbose = 1, min_lr=1e-7)

## Train / Valid

In [None]:
model_train = model.fit(train_dataset,
                        epochs = 50,
                        validation_data = val_dataset,
                        )#,callbacks = [lr_schedule]callbacks = [early_stop],callbacks = [cp_callback]

In [None]:
model.save("Wavelet_Binary")
model.save_weights("Wavelet_Binary_weights")

# Plot Loss vs Accuracy

In [None]:
plt.figure(figsize =(15,4))
plt.plot(model_train.epoch, model_train.history["loss"], label="Train")
plt.plot(model_train.epoch, model_train.history["val_loss"], label="Valid")
plt.xlabel("Epoch")
plt.ylabel("loss")
plt.legend()
plt.show()

In [None]:
plt.figure(figsize =(15,4))
plt.plot(model_train.epoch, model_train.history["accuracy"], label="Train")
plt.plot(model_train.epoch, model_train.history["val_accuracy"], label="Valid")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

## Test

In [None]:
test_eval = model.predict(test_X, batch_size = 64)
model.evaluate(test_X, test_y, batch_size= 64, verbose = 1)


## Classification Report

In [None]:
from sklearn.metrics import classification_report
pred = np.round(test_eval, 0)
pred[pred >= 0.5] = 1
pred[pred < 0.5] = 0
label = ["NON_HUMAN","HUMAN"]
target = ["Class {}".format(i) for i in range(5)]
classification_metrics = classification_report(test_y ,pred, target_names = label)
print(classification_metrics)



In [None]:
from utils.confusion_matrix_pretty_print import plot_confusion_matrix_from_data

columns = ["NON_HUMAN","HUMAN"]
# columns = []
annot = True
cmap = 'Oranges'
fmt = '.2f'
lw = 0.5
cbar = False
show_null_values = 2
pred_val_axis = 'y'
#size::
fz = 12
figsize = [9,9]
if(len(test_y) > 10):
    fz=9; figsize=[14,14]
plot_confusion_matrix_from_data(test_y, pred, columns,annot, cmap, fmt, fz, lw, cbar, figsize, show_null_values, pred_val_axis)