In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by later cells.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import tensorflow as tf
import numpy as np
import math
import os
import json

In [None]:
# @markdown #Load *UNBC-McMaster shoulder pain* Dataset

!mkdir -p '/content/dataset/UNBCMcMaster/'
!cp '/content/drive/MyDrive/UNBC-McMaster/Images.zip' './dataset/UNBCMcMaster/'
!cp '/content/drive/MyDrive/UNBC-McMaster/AAM_landmarks.zip' './dataset/UNBCMcMaster/'
!cp '/content/drive/MyDrive/UNBC-McMaster/Frame_Labels.zip' './dataset/UNBCMcMaster/'
!cp '/content/drive/MyDrive/UNBC-McMaster/Sequence_Labels.zip' './dataset/UNBCMcMaster/'

!unzip -q './dataset/UNBCMcMaster/Images.zip' -d './dataset/UNBCMcMaster/'
!unzip -q './dataset/UNBCMcMaster/AAM_landmarks.zip' -d './dataset/UNBCMcMaster/'
!unzip -q './dataset/UNBCMcMaster/Frame_Labels.zip' -d '/content/dataset/UNBCMcMaster/'
!unzip -q './dataset/UNBCMcMaster/Sequence_Labels.zip' -d '/content/dataset/UNBCMcMaster/'

# Data Loader & Model

In [None]:
class UNBCMcMaster(tf.keras.utils.Sequence):
    """Keras ``Sequence`` over the UNBC-McMaster AAM landmark files.

    Every landmark ``.txt`` file found below ``x_dir`` whose subject id (characters 2-4 of
    the file name) passes the subset filter is one sample. Each item of the sequence is a
    batch ``(landmarks, scores)`` holding ``batch_size`` samples (the last batch may be
    smaller).
    """

    def __init__(self, x_dir, y_dir, keys, test_subj_id, subset, label='vas', batch_size=1):
        """ UNBC McMaster Shoulder Pain Dataset
        Args:
            x_dir (str): path to the landmark data
            y_dir (str): path to the label (pain level); must contain
                'Sequence_Labels/VAS' and 'Frame_Labels/PSPI'
            keys ([str]): list of subject ids to use when subset == 'train'. ex) ['042']
            test_subj_id ([str]): list of held-out subject ids. ex) ['042']
            subset (str): 'train' or 'test'. 'train' keeps the subjects in `keys` that are
                not test subjects (a validation split is built by passing the validation
                subjects as `keys`); 'test' keeps only the test subjects.
            label (str): 'vas' (one score per video) or 'pspi' (one score per frame)
            batch_size (int): number of samples per item; the default 1 yields one frame
                per batch
        Raises:
            AssertionError: if `subset` or `label` is not one of the allowed values
            ValueError: if `batch_size` is smaller than 1
        """
        assert subset in ['train', 'test']
        assert label in ['vas', 'pspi']
        if batch_size < 1:
            raise ValueError(f'batch_size must be >= 1, got {batch_size}')

        # Let the Keras base class initialise itself (newer Keras versions expect this call).
        super().__init__()

        self.LMpath = x_dir
        self.VASpath = os.path.join(y_dir, 'Sequence_Labels', 'VAS')
        self.PSPIpath = os.path.join(y_dir, 'Frame_Labels', 'PSPI')
        self.label = label
        self.batch_size = int(batch_size)

        def is_selected(name):
            # Characters 2-4 of the file name carry the subject id, e.g. 'nm106...' -> '106'.
            subj = name[2:5]
            if subset == 'test':
                return subj in test_subj_id
            return subj not in test_subj_id and subj in keys

        self.landmark_files = []
        for root, dirs, files in os.walk(self.LMpath):
            dirs.sort()  # os.walk order is filesystem dependent; sort for a reproducible sample order
            for name in sorted(files):
                if name.endswith('.txt') and is_selected(name):
                    self.landmark_files.append((root, name))

    def __len__(self):
        """Number of batches (the last one may hold fewer than `batch_size` samples)."""
        return math.ceil(len(self.landmark_files) / self.batch_size)

    @staticmethod
    def _read_score(path):
        """Read one score stored as text in scientific notation, e.g. '3.0000000e+00'."""
        with open(path, 'r') as score_txt:
            # float() parses scientific notation directly, including negative exponents.
            return float(score_txt.read().strip())

    @staticmethod
    def _read_landmark(path):
        """Read all whitespace-separated numbers of a landmark file into one flat list."""
        with open(path, 'r') as lm_txt:
            return [float(value) for value in lm_txt.read().split()]

    def _load_sample(self, lm_dir, lm_name):
        """Load the landmark vector and the selected label for one landmark file.

        The directory layout is <x_dir>/<subj_id>/<video_id>/<landmark file>,
        ex) subj_id '106-nm106', video_id 'nm106t1aeunaff'.
        """
        landmark = self._read_landmark(os.path.join(lm_dir, lm_name))

        subj_root, video_id = os.path.split(lm_dir)
        subj_id = os.path.basename(subj_root)

        # Only the label file that is actually returned gets read.
        if self.label == 'vas':
            score_path = os.path.join(self.VASpath, subj_id, video_id + '.txt')
        else:
            # lm_name[:-8] drops the 8-character suffix of the landmark file name
            # (presumably '_aam.txt' -- TODO confirm) to get the frame id.
            score_path = os.path.join(self.PSPIpath, subj_id, video_id,
                                      lm_name[:-8] + '_facs.txt')
        return landmark, self._read_score(score_path)

    def __getitem__(self, idx):
        """return batch `idx`
            landmarks: np.ndarray (n, features); 132 values per frame for aam
            scores: np.ndarray (n,); vas float [0.0, 1.0, ... , 10.0] or
                    pspi float [0.0, 1.0, ... , 15.0], depending on `label`
            with n == batch_size (smaller for the last batch)

        Raises:
            IndexError: if `idx` is outside the range of available batches
        """
        n_batches = len(self)
        if idx < 0:
            idx += n_batches  # keep list-like negative indexing
        if not 0 <= idx < n_batches:
            raise IndexError(f'batch index {idx} out of range for {n_batches} batches')

        start = idx * self.batch_size
        landmarks, scores = [], []
        for lm_dir, lm_name in self.landmark_files[start:start + self.batch_size]:
            landmark, score = self._load_sample(lm_dir, lm_name)
            landmarks.append(landmark)
            scores.append(score)

        return np.array(landmarks), np.array(scores)

In [None]:
class DeepFaceLIFT(tf.keras.Model):
    """Fully connected regressor mapping one landmark vector to a single score.

    Layer widths are 300 -> 100 -> 10 -> 100 (all ReLU) followed by a linear unit.
    """

    def __init__(self, input_dim=132):
        """
        Args:
            input_dim (int): length of one input feature vector. Only used by `summary`
                to build a symbolic input; 132 matches the AAM landmark files.
        """
        super().__init__()
        self.num_features = input_dim
        self.dense_1 = tf.keras.layers.Dense(units=300, activation='relu')
        self.dense_2 = tf.keras.layers.Dense(units=100, activation='relu')
        self.dense_3 = tf.keras.layers.Dense(units=10, activation='relu')
        self.dense_4 = tf.keras.layers.Dense(units=100, activation='relu')
        self.linear = tf.keras.layers.Dense(units=1, activation='linear')

    def call(self, inputs):
        """Forward pass: (batch, features) -> (batch, 1)."""
        out = self.dense_1(inputs)
        out = self.dense_2(out)
        out = self.dense_3(out)
        out = self.dense_4(out)
        out = self.linear(out)
        return out

    def summary(self, print_fn=None, **kwargs):
        """Print a layer-by-layer summary.

        A subclassed model has no static graph, so a functional model is traced from a
        symbolic input and summarised instead. Extra keyword arguments are forwarded to
        `tf.keras.Model.summary`.
        """
        # shape must be a tuple: (n,) -- a bare (n) is just an int.
        x = tf.keras.Input(shape=(self.num_features,))
        model = tf.keras.Model(x, self.call(x))
        return model.summary(print_fn=print_fn, **kwargs)

# Settings

In [None]:
# Distribution strategy currently in effect; the model is later built inside its scope.
strategy = tf.distribute.get_strategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
AUTOTUNE = tf.data.experimental.AUTOTUNE

# --- Training hyper-parameters (editable through the Colab form) ---
BATCH_SIZE = 8 #@param{type:'number'}
print('BATCH_SIZE:', BATCH_SIZE)

EPOCHS = 100  #@param {type: 'number'}
LEARNING_RATE = 1e-5  #@param {type: 'number'}

# --- Cross-validation over subjects ---
KFOLDS = 5  #@param {type:'number'}
SHUFFLE = True  #@param {type:'boolean'}
RANDOM_STATE =   100#@param {type:'raw'}

# --- Dataset locations ---
X_DIR = '/content/dataset/UNBCMcMaster/AAM_landmarks'  #@param {type: 'string'}
Y_DIR = '/content/dataset/UNBCMcMaster/'  #@param {type: 'string'}

# Comma-separated ids of the held-out subjects. Splitting on ',' and stripping each item
# accepts '042, 123' as well as '042,123' or stray spaces typed into the form.
TEST_SUBJECTS = '042, 123' #@param {type: 'string'}
test_subjects = [subject.strip() for subject in TEST_SUBJECTS.split(',') if subject.strip()]
print('test_subjects:', test_subjects)

# Callbacks

In [None]:
from moviepy.editor import *
import cv2
from matplotlib import animation
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from tempfile import NamedTemporaryFile

def moviefy_results(test_frames_path, test_true, test_pred, label, out_path, fps=30):
    """Write a side-by-side video: the sequence's frames next to an animated score plot.

    Args:
        test_frames_path: directory with the frame images (.jpg/.jpeg/.png) of one
            sequence; frames are used in sorted file-name order
        test_true: a list of ground truth labels. For 'pspi' one value per frame; for
            'vas' a one-element list with the sequence score (repeated for every frame)
        test_pred: a list of predicted values, one per frame
        label: 'vas' or 'pspi'
        out_path: path of the resulting .mp4 file
        fps: frame rate used for the frame clip, the plot animation and the output
    Return:
        None. The result video is written to `out_path`.
    Raises:
        AssertionError: if `label` is not 'vas' or 'pspi'
        ValueError: if the number of ground truth values does not match `test_pred`
    """
    assert label in ['vas', 'pspi']

    frames = len(test_pred)

    # max_score is the upper limit of the y axis (leaves headroom above the score range).
    if label == 'vas':
        max_score, ylabel = 15, 'VAS'
        test_true = test_true * frames  # one sequence-level score -> one value per frame
    else:
        max_score, ylabel = 20, 'PSPI'

    if len(test_true) != frames:
        raise ValueError(
            f'got {len(test_true)} ground truth values for {frames} predictions')

    test_frames = sorted(
        os.path.join(test_frames_path, f) for f in os.listdir(test_frames_path)
        if f.lower().endswith(('.jpg', '.jpeg', '.png')))

    patient_clip = ImageSequenceClip(test_frames, fps=fps)

    fig, ax = plt.subplots()
    try:
        x = np.arange(frames)
        ax.plot(x, test_true, label='Ground Truth')
        line_1, = ax.plot(x, test_pred, label='Prediction')

        # Static decoration is set once; only the prediction line and title change per frame.
        ax.set_xlim(0, max(frames, 1))
        ax.set_ylim(0, max_score)
        ax.set_xlabel('Frames')
        ax.set_ylabel(ylabel)
        ax.legend(loc='upper right')

        def update(i):
            # Reveal the prediction up to and including frame i, matching the title.
            line_1.set_data(x[:i + 1], test_pred[:i + 1])
            ax.set_title(f'{ylabel}: {test_pred[i]:.2f}')
            return line_1,

        plot_ani = FuncAnimation(fig, update,
                                 frames=frames,
                                 interval=int(1000 / fps),
                                 blit=False,
                                 save_count=frames)

        with NamedTemporaryFile(suffix='.mp4') as f1, NamedTemporaryFile(suffix='.mp4') as f2:
            patient_clip.write_videofile(f1.name)
            plot_ani.save(f2.name, writer=animation.FFMpegWriter(fps=fps))

            source_clips = [VideoFileClip(f1.name), VideoFileClip(f2.name)]
            try:
                # The clips read lazily from the temporary files, so the final video has
                # to be written while those files still exist.
                resized = [clip.resize(height=240) for clip in source_clips]
                clips_array([resized]).write_videofile(out_path)
            finally:
                for clip in source_clips:
                    clip.close()
    finally:
        plt.close(fig)

In [None]:
# Checkpoint file name template; the {epoch:04d} placeholder is left for the callback to fill.
checkpoint_path = "training/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.split(checkpoint_path)[0]

# Callback that saves the model weights (not the full model) at the end of every epoch.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_path,
    save_weights_only=True,
    save_freq='epoch',
    verbose=1,
)

# Train

In [None]:
from sklearn.model_selection import KFold

tf.keras.backend.clear_session()

# Subject ids are the first three characters of each subject folder name
# (e.g. '042' from '042-ll042').
all_subjects = sorted(d[:3] for d in next(os.walk(X_DIR))[1])

# Hold out the test subjects. Filtering (rather than list.remove) tolerates a test id
# without a folder instead of raising ValueError.
subjects = np.array([s for s in all_subjects if s not in test_subjects])

print('train/val subjects cnt:', len(subjects), '\ntest_subject:', test_subjects)

# scikit-learn rejects a random_state when shuffle is False, so only pass it when shuffling.
kf = KFold(n_splits=KFOLDS, shuffle=SHUFFLE,
           random_state=RANDOM_STATE if SHUFFLE else None)


def _make_fold_dataset(subject_ids):
    """Build the PSPI dataset restricted to `subject_ids` (test subjects stay excluded)."""
    return UNBCMcMaster(x_dir=X_DIR,
                        y_dir=Y_DIR,
                        keys=subject_ids,
                        test_subj_id=test_subjects,
                        subset='train',
                        label='pspi')


fold_histories = []  # one History per fold, in fold order

for fold, (train_idx, valid_idx) in enumerate(kf.split(subjects)):
    print(f'\n++++++++++ Fold {fold+1}/{KFOLDS} ++++++++++')

    # A fresh model per fold so no weights leak between folds.
    with strategy.scope():
        model = DeepFaceLIFT()
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
                      loss='mean_squared_error',
                      metrics=['mean_absolute_error'])

    # Each fold writes into its own sub-directory; a shared path would let every fold
    # overwrite the previous fold's cp-<epoch> files.
    fold_cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(checkpoint_dir, f'fold-{fold + 1}',
                              os.path.basename(checkpoint_path)),
        verbose=1,
        save_weights_only=True,
        save_freq='epoch')

    # batch_size is deliberately not passed: with a keras Sequence the batches are the
    # items the Sequence yields, and the Keras docs say not to specify batch_size here.
    # NOTE(review): to train with BATCH_SIZE the Sequence itself must yield such batches.
    history = model.fit(_make_fold_dataset(subjects[train_idx]),
                        epochs=EPOCHS,
                        validation_data=_make_fold_dataset(subjects[valid_idx]),
                        callbacks=[fold_cp_callback])
    fold_histories.append(history)


In [None]:
# Copy the /content/training directory recursively to Google Drive.
# NOTE(review): presumably this is the checkpoint folder written during training; that holds
# only if the notebook's working directory is /content -- confirm.
!cp /content/training -r /content/drive/MyDrive/smctmp-ckpt

In [None]:
## AAM
# Sequences to visualise. The three lists are index-aligned: frame images, AAM landmark
# files and per-frame PSPI labels of the same videos.
# NOTE(review): these subjects (043, 047, 092) are not the default held-out TEST_SUBJECTS,
# so they may have been part of the training folds -- confirm this is intended.
sample_paths = [
                '/content/dataset/UNBCMcMaster/Images/043-jh043/jh043t2aaaff',
                '/content/dataset/UNBCMcMaster/Images/047-jl047/jl047t1aaaff',
                '/content/dataset/UNBCMcMaster/Images/092-ch092/ch092t1aiaff',
                ]

# Despite the name, these directories hold AAM landmark .txt files.
sample_json_paths = ['/content/dataset/UNBCMcMaster/AAM_landmarks/043-jh043/jh043t2aaaff',
                    '/content/dataset/UNBCMcMaster/AAM_landmarks/047-jl047/jl047t1aaaff',
                    '/content/dataset/UNBCMcMaster/AAM_landmarks/092-ch092/ch092t1aiaff',
                     ]

# PSPI
# (For label='vas', sample_true[i] would instead be a one-element list with the score read
# from Sequence_Labels/VAS/<subject>/<video>.txt.)
sample_label_paths = ['/content/dataset/UNBCMcMaster/Frame_Labels/PSPI/043-jh043/jh043t2aaaff',
                      '/content/dataset/UNBCMcMaster/Frame_Labels/PSPI/047-jl047/jl047t1aaaff',
                      '/content/dataset/UNBCMcMaster/Frame_Labels/PSPI/092-ch092/ch092t1aiaff',
                      ]


def _sorted_txt_files(directory):
    """Return the .txt files of `directory` as full paths, sorted by file name.

    os.listdir returns entries in arbitrary order; sorting keeps labels and landmarks
    in the same order as the (sorted) frame images used for the video.
    Assumes file names sort in frame order (zero-padded frame index) -- TODO confirm.
    """
    return sorted(os.path.join(directory, name)
                  for name in os.listdir(directory) if name.endswith('.txt'))


def _read_score(path):
    """Read one score stored as text in scientific notation, e.g. '3.0000000e+00'."""
    with open(path, 'r') as score_txt:
        return float(score_txt.read().strip())


def _read_landmark(path):
    """Read all whitespace-separated numbers of a landmark file into one flat list."""
    with open(path, 'r') as lm_txt:
        return [float(value) for value in lm_txt.read().split()]


# Per sequence: one PSPI score per frame.
sample_true = [[_read_score(path) for path in _sorted_txt_files(label_dir)]
               for label_dir in sample_label_paths]

## AAM
# Per sequence: one flat landmark vector per frame.
sample_landmark = [[_read_landmark(path) for path in _sorted_txt_files(lm_dir)]
                   for lm_dir in sample_json_paths]

# `model` is the one left by the training cell, i.e. the model of the last fold.
for i, frames_dir in enumerate(sample_paths):
    # Predict the whole sequence in one call instead of one call per frame.
    features = np.asarray(sample_landmark[i], dtype=np.float32)
    test_pred_seq = model.predict(features).reshape(-1).tolist()

    moviefy_results(frames_dir,
                    test_true=sample_true[i],
                    test_pred=test_pred_seq,
                    label='pspi',
                    out_path=f'/content/tmp-{i}.mp4'
                    )