<a href="https://colab.research.google.com/github/pang-lee/Siamese-twins-network-with-3DCNN/blob/main/3dcnn_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import argparse
import os

In [None]:
import matplotlib
matplotlib.use('AGG')
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from keras import Input, Model
from keras.datasets import cifar10
from keras.layers import (Activation, Conv3D, Dense, Dropout, Flatten,
                          MaxPooling3D, concatenate,Lambda,BatchNormalization)
from keras.layers.activation import LeakyReLU
from keras.losses import categorical_crossentropy
from keras.models import Sequential
from keras.optimizers import adam_v2
from keras.utils import np_utils
from keras.utils.vis_utils import plot_model
from sklearn.model_selection import train_test_split

In [None]:
import videoto3d
from tqdm import tqdm
# Restrict CUDA to device index 0 for this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
def plot_history(history, result_dir):
    """Save the accuracy and loss training curves as PNG images.

    Two figures are written into ``result_dir``: ``model_accuracy.png``
    and ``model_loss.png``. Each figure overlays the training series and
    the matching validation series taken from ``history.history``.

    Arguments:
        history: Object whose ``history`` attribute maps the keys
            'accuracy', 'val_accuracy', 'loss' and 'val_loss' to
            per-epoch sequences.
        result_dir: Directory the two images are saved into.
    """
    # One entry per figure:
    # (train key, validation key, title, y label, legend, legend loc, file)
    figure_specs = (
        ('accuracy', 'val_accuracy', 'model accuracy', 'accuracy',
         ['acc', 'val_acc'], 'lower right', 'model_accuracy.png'),
        ('loss', 'val_loss', 'model loss', 'loss',
         ['loss', 'val_loss'], 'upper right', 'model_loss.png'),
    )

    for (train_key, val_key, title, y_label,
         legend_entries, legend_loc, image_name) in figure_specs:
        for series_key in (train_key, val_key):
            plt.plot(history.history[series_key], marker='.')
        plt.title(title)
        plt.xlabel('epoch')
        plt.ylabel(y_label)
        plt.grid()  # draw the grid lines
        plt.legend(legend_entries, loc=legend_loc)
        plt.savefig(os.path.join(result_dir, image_name))
        # Close the figure so the next one starts from a clean canvas.
        plt.close()

In [None]:
def save_history(history, result_dir):
    """Write the per-epoch training metrics to ``result.txt``.

    The file is tab-separated: a header line followed by one line per
    epoch holding the epoch index, training loss, training accuracy,
    validation loss and validation accuracy.

    Arguments:
        history: Object whose ``history`` attribute maps the keys 'loss',
            'accuracy', 'val_loss' and 'val_accuracy' to per-epoch
            sequences.
        result_dir: Directory in which ``result.txt`` is created
            (an existing file is overwritten).
    """
    metrics = history.history
    train_loss = metrics['loss']
    train_acc = metrics['accuracy']
    valid_loss = metrics['val_loss']
    valid_acc = metrics['val_accuracy']

    row_format = '{}\t{}\t{}\t{}\t{}\n'
    report_path = os.path.join(result_dir, 'result.txt')
    with open(report_path, 'w') as fp:
        fp.write('epoch\tloss\tacc\tval_loss\tval_acc\n')
        # The accuracy series determines how many epochs are reported.
        for epoch, acc_value in enumerate(train_acc):
            fp.write(row_format.format(
                epoch, train_loss[epoch], acc_value,
                valid_loss[epoch], valid_acc[epoch]))

In [None]:
def loaddata(video_dir, vid3d, nclass, result_dir, color=False, skip=True):
    """Load every video under ``video_dir`` and encode its class label.

    ``video_dir`` is expected to contain one sub-directory per class, each
    holding the video files of that class (e.g. the UCF-101 layout). At
    most ``nclass`` distinct classes are kept; videos belonging to any
    further class are skipped. The kept class names are written, one per
    line, to ``classes.txt`` in ``result_dir``; the line number of a class
    is its integer label.

    Arguments:
        video_dir: Directory containing one sub-directory per class.
        vid3d: Object providing ``get_UCF_classname(filename)`` and
            ``video3d(path, color=..., skip=...)``.
        nclass: Maximum number of distinct classes to load.
        result_dir: Directory that receives ``classes.txt``; it is created
            if it does not exist.
        color: Forwarded to ``vid3d.video3d``; also selects the axis
            permutation applied to the stacked array.
        skip: Forwarded to ``vid3d.video3d``.

    Returns:
        A tuple ``(X, labels)``. ``X`` is the stacked per-video arrays with
        axis 1 moved to the last position (per the original author's notes
        that axis indexes the extracted frames, giving e.g.
        (videos, height, width, channels, frames) in colour mode).
        ``labels`` is a list with one integer class index per video.
    """
    # Create the output directory up front so a missing directory fails
    # fast instead of after every video has been decoded.
    if result_dir:
        os.makedirs(result_dir, exist_ok=True)

    # Entries of the dataset directory; each one should be a class folder.
    entries = os.listdir(video_dir)
    # One array per loaded video, as returned by vid3d.video3d.
    X = []
    # Class name of each loaded video (same order as X).
    labels = []
    # Distinct class names in first-seen order; capped at nclass entries.
    labellist = []

    # The context manager closes the progress bar even if decoding fails.
    with tqdm(total=len(entries)) as pbar:
        for filename in entries:
            print(filename)
            pbar.update(1)
            class_dir = os.path.join(video_dir, filename)
            # Stray files such as '.DS_Store' are not class folders; trying
            # to list them would raise, so skip anything not a directory.
            if not os.path.isdir(class_dir):
                continue
            for video_name in os.listdir(class_dir):
                name = os.path.join(class_dir, video_name)
                print("dir is ", name)
                label = vid3d.get_UCF_classname(video_name)
                if label not in labellist:
                    # Class budget exhausted: ignore videos of new classes.
                    if len(labellist) >= nclass:
                        continue
                    labellist.append(label)
                labels.append(label)
                X.append(vid3d.video3d(name, color=color, skip=skip))

    with open(os.path.join(result_dir, 'classes.txt'), 'w') as fp:
        fp.writelines('{}\n'.format(label) for label in labellist)

    # Replace each class name by its position in labellist (0, 1, 2, ...).
    # Assumes class names are hashable (e.g. strings) -- TODO confirm
    # against vid3d.get_UCF_classname.
    class_index = {label: num for num, label in enumerate(labellist)}
    labels = [class_index[label] for label in labels]

    # Move axis 1 of the stacked array to the end.
    if color:
        return np.array(X).transpose((0, 2, 3, 4, 1)), labels
    return np.array(X).transpose((0, 2, 3, 1)), labels

In [None]:
# Given two tensors t1 and t2:
# Euclidean distance = sqrt(sum(square(t1 - t2)))
def euclidean_distance(vects):  # NOTE: adapted from an online example; still to be studied
    """Compute the Euclidean distance between two batches of vectors.

    Arguments:
        vects: Sequence holding exactly two tensors of the same shape.

    Returns:
        A tensor with the distance between the two inputs, reduced over
        axis 1 (that axis is kept with size 1).
    """
    left, right = vects
    squared_diff = tf.math.square(left - right)
    summed = tf.math.reduce_sum(squared_diff, axis=1, keepdims=True)
    # Clamp to epsilon so the square root never receives an exact zero.
    floor = tf.keras.backend.epsilon()
    return tf.math.sqrt(tf.math.maximum(summed, floor))

In [None]:
def Loss(margin=1):  # NOTE: adapted from an online example; still to be studied
    """Build a contrastive loss function bound to ``margin``.

    Arguments:
        margin: Number that sets the distance baseline used in the
            ``max(margin - prediction, 0)`` term (default is 1).

    Returns:
        The ``contrastive_loss`` function with ``margin`` captured in its
        enclosing scope.
    """

    # Contrastive loss = mean( (1 - true_value) * square(prediction) +
    #                          true_value * square(max(margin - prediction, 0)) )
    def contrastive_loss(y_true, y_pred):
        """Compute the contrastive loss for a batch.

        Arguments:
            y_true: Tensor of labels.
            y_pred: Tensor of predictions, compatible in shape with
                ``y_true``.

        Returns:
            A tensor holding the mean contrastive loss.
        """
        # Term weighted by (1 - y_true): the squared prediction.
        unlabelled_term = (1 - y_true) * tf.math.square(y_pred)
        # Term weighted by y_true: the squared shortfall below the margin.
        shortfall = tf.math.maximum(margin - (y_pred), 0)
        labelled_term = (y_true) * tf.math.square(shortfall)
        return tf.math.reduce_mean(unlabelled_term + labelled_term)

    return contrastive_loss

In [None]:
def _str2bool(value):
    """Parse a command-line boolean such as 'true'/'false', 'yes'/'no', '1'/'0'."""
    if isinstance(value, bool):
        return value
    text = value.strip().lower()
    if text in ('true', 't', 'yes', 'y', '1'):
        return True
    if text in ('false', 'f', 'no', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got {!r}'.format(value))


def main():
    """Train the Siamese 3D-CNN from the command line and save the results.

    Steps:
        1. Parse the command-line options.
        2. Load the dataset from the ``.npz`` cache if present, otherwise
           build it with ``loaddata`` and write the cache.
        3. Build the 3D-CNN tower and wrap it in a two-input Siamese model.
        4. Train, then evaluate on the held-out 20% split.
        5. Write the model JSON, weights, diagrams, curves and metric table
           into the ``--output`` directory.
    """
    parser = argparse.ArgumentParser(
        description='simple 3D convolution for action recognition')
    parser.add_argument('--batch', type=int, default=128)
    parser.add_argument('--epoch', type=int, default=100)
    parser.add_argument('--videos', type=str, default='UCF101',
                        help='directory where videos are stored')
    parser.add_argument('--nclass', type=int, default=101)
    parser.add_argument('--output', type=str, required=True)
    # argparse's type=bool treats every non-empty string (including
    # 'False') as True, so the flags are parsed explicitly instead.
    parser.add_argument('--color', type=_str2bool, default=False)
    parser.add_argument('--skip', type=_str2bool, default=True)
    parser.add_argument('--depth', type=int, default=10)
    args = parser.parse_args()

    # loaddata and plot_model write into the output directory long before
    # the model is saved, so make sure it exists from the start.
    os.makedirs(args.output, exist_ok=True)

    img_rows, img_cols, frames = 32, 32, args.depth
    channel = 3 if args.color else 1
    # NOTE(review): the cache name does not include --color, so a cache
    # built in one colour mode is reused for the other -- confirm intent.
    fname_npz = 'dataset_{}_{}_{}.npz'.format(
        args.nclass, args.depth, args.skip)

    vid3d = videoto3d.Videoto3D(img_rows, img_cols, frames)
    nb_classes = args.nclass
    if os.path.exists(fname_npz):
        loadeddata = np.load(fname_npz)
        X, Y = loadeddata["X"], loadeddata["Y"]
    else:
        x, y = loaddata(args.videos, vid3d, args.nclass,
                        args.output, args.color, args.skip)
        X = x.reshape((x.shape[0], img_rows, img_cols, frames, channel))
        Y = np_utils.to_categorical(y, nb_classes)

        X = X.astype('float32')
        np.savez(fname_npz, X=X, Y=Y)
        print('Saved dataset to dataset.npz.')
    print('X_shape:{}\nY_shape:{}'.format(X.shape, Y.shape))

    # Build the 3D-CNN tower.
    # NOTE(review): the second convolution of each pair uses a softmax
    # activation -- confirm this is intended rather than relu.
    input_ = Input(shape=(X.shape[1:]))

    model_3dcnn = Conv3D(32, kernel_size=(3, 3, 3), activation='relu', padding='same')(input_)
    model_3dcnn = Conv3D(32, kernel_size=(3, 3, 3), activation='softmax', padding='same')(model_3dcnn)
    model_3dcnn = MaxPooling3D(pool_size=(3, 3, 3), padding='SAME')(model_3dcnn)
    model_3dcnn = Dropout(0.25)(model_3dcnn)
    model_3dcnn = Conv3D(64, kernel_size=(3, 3, 3), activation='relu', padding='same')(model_3dcnn)
    model_3dcnn = Conv3D(64, kernel_size=(3, 3, 3), activation='softmax', padding='same')(model_3dcnn)
    model_3dcnn = MaxPooling3D(pool_size=(3, 3, 3), padding='SAME')(model_3dcnn)
    model_3dcnn = Dropout(0.25)(model_3dcnn)
    model_3dcnn = Flatten()(model_3dcnn)
    model_3dcnn = Dense(512, activation='sigmoid')(model_3dcnn)
    model_3dcnn = Dropout(0.5)(model_3dcnn)
    output_ = Dense(nb_classes, activation='softmax')(model_3dcnn)

    cnn3d_model = Model(input_, output_)
    plot_model(cnn3d_model, show_shapes=True,
               to_file=os.path.join(args.output, 'model_3dcnn.png'))

    # Build the Siamese network: both inputs go through the same tower,
    # so the two branches share weights.
    input_1 = Input(shape=(X.shape[1:]))
    input_2 = Input(shape=(X.shape[1:]))

    tower_1 = cnn3d_model(input_1)
    tower_2 = cnn3d_model(input_2)

    merge_layer = Lambda(euclidean_distance)([tower_1, tower_2])
    normal_layer = BatchNormalization()(merge_layer)
    output_layer = Dense(1, activation="sigmoid")(normal_layer)

    siamese_model = Model([input_1, input_2], output_layer)

    siamese_model.compile(loss=Loss(margin=1),
                          optimizer='RMSprop', metrics=['accuracy'])
    siamese_model.summary()

    # Draw the model architecture diagram.
    plot_model(siamese_model, show_shapes=True,
               to_file=os.path.join(args.output, 'model_siamese.png'))

    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=0.2, random_state=43)

    # NOTE(review): both towers receive the same batch, and the targets are
    # the class matrix Y while the model emits one value per sample --
    # confirm this is the pairing the contrastive loss is meant to see.
    # verbose=1 prints a progress bar; shuffle=True reorders the training
    # samples during training.
    history = siamese_model.fit(
        [X_train, X_train], Y_train,
        validation_data=([X_test, X_test], Y_test),
        batch_size=args.batch, epochs=args.epoch, verbose=1, shuffle=True)

    model_json = siamese_model.to_json()
    with open(os.path.join(args.output, 'ucf101_3dcnnmodel.json'), 'w') as json_file:
        json_file.write(model_json)
    siamese_model.save_weights(os.path.join(args.output, 'ucf101_3dcnnmodel.hd5'))

    # verbose=0 suppresses the evaluation log output.
    loss, acc = siamese_model.evaluate([X_test, X_test], Y_test, verbose=0)
    print('Test loss:', loss)
    print('Test accuracy:', acc)
    print(history.history.keys())
    plot_history(history, args.output)
    save_history(history, args.output)

In [None]:
# Run the training pipeline only when executed as a script, not on import.
if __name__ == '__main__':
    main()