In [None]:
# 引入所需要的机器学习库
import numpy as np
import cv2
import os
import pandas as pd
import sklearn
import sys
import time
import tensorflow as tf
from tensorflow import keras
from ast import literal_eval
from keras.preprocessing.image import img_to_array, load_img

In [None]:
# Directories and file names used throughout the experiment.
# The root path is written as a raw string: in a normal string literal the
# backslash followed by a non-escape character is an invalid escape sequence
# (a warning today, an error in future Python versions). The value is unchanged.
root_path = r"H:\视频数据"
video_dir = "video"                                 # sub-folder holding the source videos
picture_dir = "picture"                             # sub-folder receiving the extracted frames
project_info = "original_info.csv"                  # input table, read from root_path
project_process_info = "project_process_info.csv"   # generated table, written to root_path
model_path = "model.h5"                             # file the trained model is saved to

In [None]:
# Create the folder when it does not exist yet
def mkdir(path):
    """Create ``path`` (including missing parents) unless it already exists.

    A short status message is printed in both cases.

    Args:
        path: Location of the folder to create.
    """
    if os.path.exists(path):
        print("The director exists.")
        return
    os.makedirs(path)
    print("Make new director.")

In [None]:
# Convert every video into a sequence of still images: one frame is written to
# disk every `frame_step` frames and the resulting image paths are recorded per
# video. The collected records are finally saved as a CSV file.
# (Sampling proportionally to the video duration would be a possible refinement.)
frame_step = 256
video_format = "mp4"
video_path = root_path + "/" + video_dir
seq_picture_path = root_path + "/" + picture_dir
df = pd.read_csv(root_path + "/" + project_info, sep=',')
result_df = []
for index in range(len(df)):
    # Positional access: a non-default DataFrame index can no longer make
    # iloc pick the wrong row (or fail).
    row = df.iloc[index]
    print("Load " + str(index) + "th video.")
    # splitext only strips the extension, so dots inside the name survive
    # ("a.b.mp4" -> "a.b" instead of "a").
    video_name = os.path.splitext(row["name"])[0]
    video_file = video_path + "/" + video_name + "." + video_format
    # Folder that receives the frames of this video (named after the video file).
    video_seq_path = seq_picture_path + "/" + video_name + ".mp4"
    mkdir(video_seq_path)
    video_label = row["score"]
    video_seq_list = []
    cap = cv2.VideoCapture(video_file)
    print(video_file)
    if not cap.isOpened():
        # Make the failure visible; the video is still recorded with no frames.
        print("Cannot open " + video_file)
    frame_num = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_num += 1
            if frame_num % frame_step != 0:
                continue
            # Integer division keeps file names like "1.jpg" rather than "1.0.jpg".
            seq_no = frame_num // frame_step
            print("Watch the " + str(seq_no) + " video")
            fram_path = video_seq_path + "/" + str(seq_no) + ".jpg"
            # imencode + tofile is kept from the original; the frame path is
            # only recorded when encoding succeeded.
            encoded_ok, encoded = cv2.imencode(".jpg", frame)
            if not encoded_ok:
                print("Cannot encode frame " + str(frame_num) + " of " + video_file)
                continue
            encoded.tofile(fram_path)
            video_seq_list.append(fram_path)
    finally:
        # Always free the capture handle, even when reading or writing fails.
        # No window is ever opened in this cell, so the former
        # cv2.waitKey / cv2.destroyAllWindows calls were dropped.
        cap.release()
    result_df.append((video_name, video_format, video_seq_list, video_label))
result_pdf = pd.DataFrame(result_df, columns = ["video_name", "video_format", "video_seq_list", "video_label"])
result_pdf.to_csv(root_path + "/" + project_process_info, index = False)

In [None]:
# Generate the configuration file needed for training the model:
# one row per video with its name, format, frame-path list and label.
process_info_columns = ["video_name", "video_format", "video_seq_list", "video_label"]
process_info_file = root_path + "/" + project_process_info
result_pdf = pd.DataFrame(data=result_df, columns=process_info_columns)
result_pdf.to_csv(process_info_file, index=False)

In [None]:
# Read the generated configuration file back from disk
seq_length = 64  # number of frames per video sequence
process_info_file = root_path + "/" + project_process_info
result_pdf = pd.read_csv(process_info_file).assign(
    # Each cell of this column is parsed with literal_eval so it becomes a
    # Python object again instead of its string form.
    video_seq_list=lambda frame: frame['video_seq_list'].map(literal_eval)
)
result_pdf

In [None]:
# Image preprocessing
def process_image(image, target_shape):
    """Load one image, resize it and scale its pixel values by 1/255.

    Args:
        image: Image location, passed straight to ``load_img``.
        target_shape: Three-element ``(height, width, channels)`` tuple; only
            height and width are used, as the resize target.

    Returns:
        The ``img_to_array`` result divided by 255, cast to ``np.float32``.
    """
    height, width, _channels = target_shape
    pixels = img_to_array(load_img(image, target_size=(height, width)))
    scaled = pixels / 255.
    return scaled.astype(np.float32)

In [None]:
# Shuffle the dataset rows.
# A fixed random_state makes the row order, and therefore the train/test split
# taken from it afterwards, reproducible after a kernel restart.
# Note: running this cell again shuffles the already shuffled frame once more.
result_pdf = sklearn.utils.shuffle(result_pdf, random_state=42)
result_pdf

In [None]:
# Split into training and test sets: the last 50 rows form the test set,
# everything before them the training set.
train_result_pdf = result_pdf.head(-50)
test_result_pdf = result_pdf.tail(50)

In [None]:
# Convert the training data into one tensor of shape
# (videos, seq_length, 128, 128, 3).
frame_shape = (128, 128, 3)
video_seq_list = train_result_pdf["video_seq_list"].values

video_seq_array = list()
kept_rows = list()  # one flag per video: True when it contributes a sample
for samples in video_seq_list:
    # Only the first seq_length frames are used, so nothing beyond them is loaded.
    frame_paths = samples[:seq_length]
    if len(frame_paths) == 0:
        # A video without any extracted frame cannot form a sequence.
        print("Skip a video without extracted frames.")
        kept_rows.append(False)
        continue
    # Pre-allocated float32 buffer: shorter sequences stay zero-padded at the
    # end and the padding no longer upcasts the data to float64.
    sample_arrays = np.zeros((seq_length,) + frame_shape, dtype=np.float32)
    for position, frame_path in enumerate(frame_paths):
        sample_arrays[position] = process_image(frame_path, frame_shape)
    video_seq_array.append(sample_arrays)
    kept_rows.append(True)
# Drop the skipped videos from the table as well, so labels derived from it
# afterwards line up one-to-one with the rows of the tensor.
train_result_pdf = train_result_pdf[np.array(kept_rows, dtype=bool)]
video_seq_array = np.array(video_seq_array)
print(video_seq_array.shape)

In [None]:
# Training-set label handling; the videos currently fall into four classes.
labels = train_result_pdf["video_label"].values
label_dict = {'A':0,'B':1,'C':2,'D':3}
# Class index of every label (an unknown label raises KeyError).
class_ids = np.array([label_dict[label] for label in labels], dtype=int)
one_hot_labels = np.zeros([labels.shape[0], 4])
# Set the column matching each row's class index to 1.
one_hot_labels[np.arange(len(labels)), class_ids] = 1
one_hot_labels

In [None]:
# Model definition: a per-frame CNN applied through TimeDistributed, an LSTM over
# the frame encodings, and a dense softmax classifier with four outputs.
seq_length = 64
image_size = 128
channel = 3
frame_shape = (image_size, image_size, channel)

inputs = keras.Input(shape=(seq_length,) + frame_shape)

# Per-frame backbone: MobileNetV2 with ImageNet weights.
# A ResNet50 (include_top=False, pooling='max', weights='imagenet') could be
# swapped in here instead.
# NOTE(review): MobileNetV2 is created with its default top, so each frame is
# encoded by the network's final output layer — confirm this is intended
# rather than include_top=False features.
# NOTE(review): process_image scales pixels by 1/255; check this against the
# input range MobileNetV2's own preprocess_input produces.
mobilenet = keras.applications.mobilenet_v2.MobileNetV2(input_shape=frame_shape, weights='imagenet')

# Freeze every layer except the last two.
for frozen_layer in mobilenet.layers[:-2]:
    frozen_layer.trainable = False

cnn = keras.Model(inputs=mobilenet.input, outputs=mobilenet.output)
encoded_frames = keras.layers.TimeDistributed(cnn)(inputs)
encoded_sequence = keras.layers.LSTM(32)(encoded_frames)

hidden_layers = keras.layers.Dense(64, activation="relu")(encoded_sequence)
outputs = keras.layers.Dense(4, activation="softmax")(hidden_layers)
model = keras.Model([inputs], outputs)
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
model.summary()

In [None]:
# Train the model; 20% of the supplied samples are held out for validation.
hist = model.fit(
    x=[video_seq_array],
    y=one_hot_labels,
    batch_size=10,
    epochs=30,
    verbose=1,
    validation_split=0.2,
    shuffle=True,
)
hist.history

In [None]:
# Save the model to the file named by model_path
model.save(filepath=model_path)

In [None]:
# Convert the test data into one tensor of shape
# (videos, seq_length, 128, 128, 3).
frame_shape = (128, 128, 3)
video_seq_list = test_result_pdf["video_seq_list"].values

video_seq_array = list()
kept_rows = list()  # one flag per video: True when it contributes a sample
for samples in video_seq_list:
    # Only the first seq_length frames are used, so nothing beyond them is loaded.
    frame_paths = samples[:seq_length]
    if len(frame_paths) == 0:
        # A video without any extracted frame cannot form a sequence.
        print("Skip a video without extracted frames.")
        kept_rows.append(False)
        continue
    # Pre-allocated float32 buffer: shorter sequences stay zero-padded at the
    # end and the padding no longer upcasts the data to float64.
    sample_arrays = np.zeros((seq_length,) + frame_shape, dtype=np.float32)
    for position, frame_path in enumerate(frame_paths):
        sample_arrays[position] = process_image(frame_path, frame_shape)
    video_seq_array.append(sample_arrays)
    kept_rows.append(True)
# Drop the skipped videos from the table as well, so labels derived from it
# afterwards line up one-to-one with the rows of the tensor.
test_result_pdf = test_result_pdf[np.array(kept_rows, dtype=bool)]
video_seq_array = np.array(video_seq_array)
print(video_seq_array.shape)

In [None]:
# Test-set label handling
labels = test_result_pdf["video_label"].values
label_dict = {'A':0,'B':1,'C':2,'D':3}
# Class index of every label (an unknown label raises KeyError).
class_ids = np.array([label_dict[label] for label in labels], dtype=int)
one_hot_labels = np.zeros([labels.shape[0], 4])
# Set the column matching each row's class index to 1.
one_hot_labels[np.arange(len(labels)), class_ids] = 1
one_hot_labels

In [None]:
# Model evaluation on the current tensor and one-hot labels
result = model.evaluate(
    x=[video_seq_array],
    y=one_hot_labels,
    batch_size=10,
    verbose=1,
)
result

In [None]:
# Model prediction: take the index of the highest output per sample as its class
result = model.predict(x=[video_seq_array], batch_size=10, verbose=1)
result_label = result.argmax(axis=1)
result_label

In [None]:
# Compute the mean score error: average absolute difference between the class
# index of the true label and the predicted class index.
label_dict = {'A':0,'B':1,'C':2,'D':3}
convert_label = np.array(list(map(label_dict.__getitem__, labels)))
np.abs(convert_label - result_label).mean()