In [1]:
import os
import sys
os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2"
sys.path.append('/root/code')

import h5py as h5
from definitions import LOG_DIR, WEIGHT_DIR, DATASET_DIR
import tensorflow as tf
from tensorflow import keras
import tensorflow.keras.backend as K
import time
import datetime
import numpy as np
from utils.contrastiveLoss import ContrastiveLoss
from models.SCNN18 import SCNN18
from models.SCNN18Regression import SCNN18Regression
# from models.SCNN18_Sigmoid_oneSecond import SCNN18_Sigmoid
from models.Complex_SCNN18_Sigmoid import Complex_SCNN18_Sigmoid
from models.Complex_SCNN18 import Complex_SCNN18
from models.SCNN18_Sigmoid_First4Modified import SCNN18_Sigmoid_First4Modified
from models.SCNN18_sigmoid_res_21pts import SCNN18_sigmoid_res_21pts
from models.SCNN18_Sigmoid import SCNN18_Sigmoid
from models.SCNN18_Sigmoid_threeSecond import SCNN18_Sigmoid_threeSecond
from models.SCNN18_Target_Task import SCNN18_Target_Task
from models.SCNN18_Sigmoid_Target_Task import SCNN18_Sigmoid_Target_Task
from models.SCNN18_random_aug import SCNN18_random_aug
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.callbacks import Callback
import utils.dataset as dataset
import logging
from logging import handlers

In [2]:
LOG = logging.getLogger('root')

def initLog(debug=False):
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M',
        handlers=[logging.StreamHandler(), handlers.RotatingFileHandler('SCNN18_0.1second.log', "w", 1024 * 1024 * 100, 3, "utf-8")]
    )
    LOG.setLevel(logging.DEBUG if debug else logging.INFO)
    tf.get_logger().setLevel('ERROR')

In [3]:
initLog()

In [4]:
def get_optimizer(optimizer, lr):
    optimizer = optimizer.lower()
    if optimizer == 'adadelta':
        return tf.optimizers.Adadelta() if lr == 0 else tf.optimizers.Adadelta(learning_rate=lr)
    elif optimizer == 'adagrad':
        return tf.optimizers.Adagrad() if lr == 0 else tf.optimizers.Adagrad(learning_rate=lr)
    elif optimizer == 'adam':
        return tf.optimizers.Adam() if lr == 0 else tf.optimizers.Adam(learning_rate=lr)
    elif optimizer == 'adamax':
        return tf.optimizers.Adamax() if lr == 0 else tf.optimizers.Adamax(learning_rate=lr)
    elif optimizer == 'sgd':
        return tf.optimizers.SGD() if lr == 0 else tf.optimizers.SGD(learning_rate=lr)
    elif optimizer == 'rmsprop':
        return tf.optimizers.RMSprop() if lr == 0 else tf.optimizers.RMSprop(learning_rate=lr)

In [5]:
# Define hyper-parameter
lr = 1e-4
max_epochs = 160
batch_size = 64
input_height = 32000
input_width = 1
nb_classes = 10

In [6]:
test_label_range = ['0.5s - 0.6s', '0.6s - 0.7s', '0.7s - 0.8s', '0.8s - 0.9s', '0.9s - 1.0s', 
                    '1.0s - 1.1s', '1.1s - 1.2s', '1.2s - 1.3s', '1.3s - 1.4s', '1.4s - 1.5s']

# test_ds = dataset.load('./SCNN18_0.1second/SCNNR-Jamendo-test.h5')
test_data = dataset.get_dataset_without_label('./SCNN18_0.1second/SCNNR-FMA-test.h5')
test_label = dataset.get_ground_truth('./SCNN18_0.1second/SCNNR-FMA-test.h5')
test_data = test_data[:]
test_label = test_label[:]

# test_ds = tf.data.Dataset.from_tensor_slices((test_data, test_label[:, 0]))

# test_data = dataset.get_dataset_without_label('./SCNN18_0.1second/SCNN-FMA-nogap-test.h5')
# test_label = dataset.get_ground_truth('./SCNN18_0.1second/SCNN-FMA-nogap-test.h5')
# test_ds = tf.data.Dataset.from_tensor_slices((test_data, test_label[:, 9, :]))

# dataset_length = [i for i, _ in enumerate(test_ds)][-1] + 1
# print(dataset_length)

test_dataset = 'SCNNR-FMA-test.h5'
# test_dataset = 'SCNN-RWC-ignoreGap-test.h5'

# test_ds = test_ds.batch(batch_size)
print(test_data.shape)
print(test_label.shape)

(6314, 32000, 1)
(6314, 10)


In [7]:
"""
模型測試用參考程式
----------------------------------------
有些被註解的地方只是方便後續使用先行註解掉，
model.evaluate 被註解的原因是對於多輸出的模型而言，
這個 tensorflow api 無法處理多輸出模型的正確率評估，
它只能處理原始單個輸出模型的正確率評估。
"""

input_shape = (input_height, input_width)

training_weight = "2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5"

strategy = tf.distribute.MirroredStrategy(devices=[f'/gpu:{i}' for i in range(3)])
with strategy.scope():
    # model = SCNN18(input_shape, nb_classes).model()
    # model = SCNN18Regression(input_shape, nb_classes).model()
    model = SCNN18_Sigmoid_Target_Task(input_shape, nb_classes).model()
    # model = SCNN18_sigmoid_res_21pts(input_shape, nb_classes).model()
    # model = Complex_SCNN18(input_shape, nb_classes).model()
    # model = Complex_SCNN18_Sigmoid(input_shape, nb_classes).model()

    optimizer = get_optimizer('adam', lr)

    # model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    model.summary()
    
    model.load_weights(os.path.join(WEIGHT_DIR, training_weight))

# 
# score, acc = model.evaluate(test_ds, verbose=0)

# LOG.info(f'loaded_weight={training_weight}, test_dataset={test_dataset}, score={score:.6f}, acc={acc:.6f}')
# LOG.info(f'loaded_weight={training_weight}, test_dataset={test_dataset}, second_range={test_label_range[9]}, score={score:.6f}, acc={acc:.6f}')

Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input (InputLayer)           [(None, 32000, 1)]        0         
_________________________________________________________________
stsa (STSA)                  (None, 63, 1024, 1)       4194304   
_________________________________________________________________
random_zoom (RandomZoom)     (None, 63, 1024, 1)       0         
_________________________________________________________________
uscl_conv1 (USCLLayer)       (None, 21, 512, 64)       704       
_________________________________________________________________
uscl_conv2 (USCLLayer)       (None, 21, 512, 64)       8512      
_________________________________________________________________
uscl_conv3 (USCLLayer)       (None, 21, 512, 64)       8512      
_________________________________________________________________
uscl_conv4 (USCLLayer)       (None, 21, 256, 64)      

In [8]:
# test_data = test_data[:]
# test_label = test_label[:, 9, :]

# results = model.predict(test_data)
# predict_true = 0
# data_size = results.shape[0]

# for index, result in enumerate(results):
#     if result[1] < 0.5 and test_label[index][1] == 0:
#         predict_true += 1
#     elif result[1] >= 0.5 and test_label[index][1] == 1:
#         predict_true += 1
#     elif test_label[index][1] == 0.5:
#         data_size -= 1
#         print(0.5)
#         print(data_size)

# acc = predict_true/data_size

# LOG.info(f'loaded_weight={training_weight}, test_dataset={test_dataset}, second_range={test_label_range[9]}, acc={acc:.6f}')

In [9]:
"""
此部分程式碼為實作 model.evaluate 程式的部分
以處理多個輸出模型的正確率評估
"""
results = model.predict(test_data)
print(results.shape)

for i in range(nb_classes):
    predict_true = 0

    for index, result in enumerate(results):
        if result[i] < 0.5 and test_label[index][i] == 0:
            predict_true += 1
        elif result[i] >= 0.5 and test_label[index][i] == 1:
            predict_true += 1

    acc = predict_true/results.shape[0]

    LOG.info(f'loaded_weight={training_weight}, test_dataset={test_dataset}, second_range={test_label_range[i]}, acc={acc:.6f}')

2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=0.5s - 0.6s, acc=0.818340
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=0.6s - 0.7s, acc=0.805195
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=0.7s - 0.8s, acc=0.821191
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=0.8s - 0.9s, acc=0.820874
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=0.9s - 1.0s, acc=0.81

(6314, 10)


2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=1.0s - 1.1s, acc=0.815965
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=1.1s - 1.2s, acc=0.817548
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=1.2s - 1.3s, acc=0.826734
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=1.3s - 1.4s, acc=0.818657
2024-08-06 15:10 INFO loaded_weight=2024-08-06_SCNN18_SimCLR_Sigmoid_down_stream_target_task_Jamendo_15epochs_all_layer_train.h5, test_dataset=SCNNR-FMA-test.h5, second_range=1.4s - 1.5s, acc=0.81

In [10]:
# results = model.predict(test_data)
# print(results.shape)

# for i in range(1, 10):
#     threshold = i * 0.1
#     accs = []
#     for j in range(nb_classes):
#         predict_true = 0

#         for index, result in enumerate(results):
#             if result[j] < threshold and test_label[index][j] == 0:
#                 predict_true += 1
#             elif result[j] >= threshold and test_label[index][j] == 1:
#                 predict_true += 1

#         acc = predict_true/results.shape[0]
#         accs.append(acc)

#     LOG.info(f'threshold = {threshold:.1f}, loaded_weight={training_weight}, test_dataset={test_dataset}, acc={np.round(np.mean(accs)*100, 4)}%')

In [11]:
# complex output using softmax

# predictions = model.predict(test_data)
# results = np.square(predictions[0]) + np.square(predictions[1])

# predict_true = 0
# data_size = results.shape[0]
# print(data_size)

# for index, result in enumerate(results):
#     if result[0] > result[1] and test_label[index][1] == 0:
#         predict_true += 1
#     elif result[1] > result[0] and test_label[index][1] == 1:
#         predict_true += 1
#     elif result[1] == result[0]:
#         data_size -= 1

# acc = predict_true/data_size

# LOG.info(f'loaded_weight={training_weight}, test_dataset={test_dataset}, acc={acc:.6f}')

In [12]:
# determinate threshold for complex sigmoid

# test_data_for_threshold = test_data[:int(len(test_data)/10)]
# test_label_for_threshold = test_label[:int(len(test_label)/10)]

# predictions = model.predict(test_data_for_threshold)
# results = np.sqrt(np.square(predictions[0]) + np.square(predictions[1]))

# accs = []
# data_size = results.shape[0]
# print(data_size)

# for i in range(0, int(np.sqrt(2)*1000)):
#     predict_true = 0
#     t = i / 1000
#     for index, result in enumerate(results):
#         if result <= t and test_label_for_threshold[index] == 0:
#             predict_true += 1
#         elif result > t and test_label_for_threshold[index] == 1:
#             predict_true += 1

#     acc = predict_true/data_size
#     accs.append(acc)

# threshold = np.argmax(np.array(accs))/1000
# predictions = model.predict(test_data)
# results = np.sqrt(np.square(predictions[0]) + np.square(predictions[1]))
# predict_true = 0
# data_size = results.shape[0]
# print(data_size)

# for index, result in enumerate(results):
#     if result <= threshold and test_label[index] == 0:
#         predict_true += 1
#     elif result > threshold and test_label[index] == 1:
#         predict_true += 1

# acc = predict_true/data_size

# LOG.info(f'loaded_weight={training_weight}, test_dataset={test_dataset}, acc={acc:.6f}')