In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import callbacks
import os
import time
import pickle
from sklearn.model_selection import train_test_split
from model.expert_5_lbp import MyModel


In [None]:
# 进行gpu的设置
tf.debugging.get_log_device_placement()  # 会将运算属于哪个gpu给打印出来
gpus = tf.config.experimental.list_physical_devices("GPU")  # 物理gpu
tf.config.experimental.set_visible_devices(gpus[0], "GPU")  # 设置某个物理gpu为可见，即为逻辑gpu
# tf.config.experimental.set_virtual_device_configuration(  # 对GPU进行逻辑拆分
#     gpus[0],
#     [
#         tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048),
#         tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048),
#     ]
# )
# for gpu in gpus:
#     tf.config.experimental.set_memory_growth(gpu, True)  # 设置gpu占用内存为自增长
logical_gpus = tf.config.experimental.list_logical_devices("GPU")  # 逻辑gpu
print(f"物理GPU数量：{len(gpus)}，逻辑GPU数量：{len(logical_gpus)}")


In [None]:
# 初始化变量
# 训练常量
batch_size = 64
image_h = 48
image_w = 48
image_target_h = 48
image_target_w = 48
seed = 7
class_num = 7
epochs = 200
data_path = r"pickle_dataset"
# 固定随机种子
tf.keras.utils.set_random_seed(seed)
tf.config.experimental.enable_op_determinism()
def scheduler(epoch):
    # 前5个epoch学习率保持不变，5个epoch后学习率按比例衰减
    if epoch < 10:
        return 0.01
    elif epoch < 100:
        return 0.001
    elif epoch < 150:
        return 0.0001
    else:
        return 0.0001

In [None]:
# 将全部数据读入进来，然后使用sklearn的函数进行拆分
with open(os.path.join(data_path, "ori_data_simple_static_strong_lbp.pickle"), 'rb') as fp:
    data, lbp_data, label = pickle.load(fp)
print("已读入")

# 统一转换
data = [tf.constant(i) / 255 for i in data]
lbp_data = [tf.constant(i) / 255 for i in lbp_data]
label = [tf.constant(i) for i in label]

# 分割数据
lst = list(range(len(data)))
train_lst, valid_lst, train_lst, valid_lst  = train_test_split(lst, lst, test_size=0.2, random_state=seed, stratify=label)
train_data = [data[i] for i in train_lst]
train_data_lbp = [lbp_data[i] for i in train_lst]
train_label = [label[i] for i in train_lst]
valid_test_data = [data[i] for i in valid_lst]
valid_test_data_lbp= [lbp_data[i] for i in valid_lst]
valid_test_label = [label[i] for i in valid_lst]

lst = list(range(len(valid_test_data)))
valid_lst, test_lst, valid_lst, test_lst  = train_test_split(lst, lst, test_size=0.5, random_state=seed, stratify=valid_test_label)
valid_data = [valid_test_data[i] for i in valid_lst]
valid_data_lbp = [valid_test_data_lbp[i] for i in valid_lst]
valid_label = [valid_test_label[i] for i in valid_lst]
test_data = [valid_test_data[i] for i in test_lst]
test_data_lbp = [valid_test_data_lbp[i] for i in test_lst]
test_label = [valid_test_label[i] for i in test_lst]

print(f"可用数据量：trian:{len(train_data)}, valid:{len(valid_data)}, test:{len(test_data)}")


In [None]:
def augment_image(image, label):
    target = tf.one_hot(label, class_num)
    return image, target


# 将数据处理称Dataset对象
time1 = time.time()
train_dataset = tf.data.Dataset.from_tensor_slices(((train_data, train_data_lbp), train_label)).map(augment_image,
                                                                                    num_parallel_calls=tf.data.experimental.AUTOTUNE) \
    .shuffle(50000, seed=seed).batch(batch_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
print(f"训练数据集train_dataset准备完毕, 用时：{time.time() - time1:.2f}s")

time1 = time.time()
valid_dataset = tf.data.Dataset.from_tensor_slices(((valid_data, valid_data_lbp), valid_label)).map(augment_image,
                                                                                    num_parallel_calls=tf.data.experimental.AUTOTUNE) \
    .batch(batch_size)
print(f"测试数据集valid_dataset准备完毕, 用时{time.time() - time1:.2f}s")

time1 = time.time()
test_dataset = tf.data.Dataset.from_tensor_slices(((test_data, test_data_lbp), test_label)).map(augment_image,
                                                                                    num_parallel_calls=tf.data.experimental.AUTOTUNE) \
    .batch(batch_size)
print(f"测试数据集test_dataset准备完毕, 用时{time.time() - time1:.2f}s")


In [None]:
temp = dict()
for _, j in train_dataset:
    for i in tf.argmax(j, axis=1).numpy():
        if temp.get(i) is None:
            temp[i] = 1
        else:
            temp[i] += 1
print(temp)
temp = dict()
for _, j in valid_dataset:
    for i in tf.argmax(j, axis=1).numpy():
        if temp.get(i) is None:
            temp[i] = 1
        else:
            temp[i] += 1
print(temp)
temp = dict()
for _, j in test_dataset:
    for i in tf.argmax(j, axis=1).numpy():
        if temp.get(i) is None:
            temp[i] = 1
        else:
            temp[i] += 1
print(temp)

In [None]:
model = MyModel()
model.build(input_shape=[(None, 48, 48, 1), (None, 48, 48, 1)])
model.summary()
model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=['accuracy']
)

In [None]:

def train(model, model_sign, data_sign):
    # 训练
    # tensorboard
    dir_path = os.path.join("./train_results",
                            f"train_result_sign_{model_sign}_{data_sign}")
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    cnt = 0
    for i in os.listdir(dir_path):
        cnt = max(cnt, int(i))
    cnt += 1
    os.mkdir(os.path.join(dir_path, str(cnt)))
    tensor_board_dir = os.path.join(dir_path, str(cnt), f"tensor_board_seed_{seed}")
    model_check_point_dir = os.path.join(dir_path, str(cnt), f"model_check_point_seed_{seed}")
    callback = [
        callbacks.TensorBoard(tensor_board_dir),
        callbacks.ModelCheckpoint(filepath=model_check_point_dir, save_best_only=True, monitor="val_accuracy"),
        callbacks.ReduceLROnPlateau(monitor='val_accuracy', patience=10, mode='auto', factor=0.7),
        # callbacks.LearningRateScheduler(scheduler),
        callbacks.EarlyStopping(monitor='loss', patience=20),
    ]
    print()
    # 开始训练
    history = model.fit(train_dataset, epochs=epochs, validation_data=valid_dataset, callbacks=callback)  # , shuffle=False, workers=1
    print("训练完毕")
    # 评估
    model = keras.models.load_model(model_check_point_dir)
    score = model.evaluate(test_dataset)
    print(f"最优模型评估分数：{score}")
    # 模型保存
    model.save_weights(
        os.path.join(dir_path, str(cnt), f"model_weights_{time.strftime('%Y_%m_%d')}_seed_{seed}.h5"))
    with open(os.path.join("测试结果.txt"), 'a') as fp:
        fp.write(f"{model_sign}_{data_sign}_seed_{seed}" + str(score) + "\n")
    return history.history, score


In [None]:

history, score = train(model, "expert_5_lbp", "static_strong_fer2013")
