In [1]:
%matplotlib inline
import matplotlib.image as mp_image
import matplotlib.pyplot as plt
import subprocess
import numpy as np
import pandas as pd
import tensorflow as tf
import cv2
from functools import partial

In [2]:
# Open a TensorFlow session.
# NOTE(review): `sess` is rebound to a new tf.Session() in the variable-initialisation
# cell further down, and nothing between here and there uses this one; no close() call
# for it is visible in this notebook.
sess = tf.Session()

In [3]:
def extract_filename(full_name, filename_extension):
    """Return the file name of a '/'-separated path without its extension.

    Parameters
    ----------
    full_name : str
        Path such as ``/some/dir/img001.png``.
    filename_extension : str
        Extension to strip, given without the leading dot (e.g. ``'png'``).

    Returns
    -------
    str
        The last path component, with a trailing ``.<filename_extension>``
        removed when present; otherwise the last path component unchanged.
    """
    # Isolate the last path component first, so that a directory whose name
    # happens to contain ".<ext>" cannot truncate the result.
    base_name = full_name.rsplit('/', 1)[-1]
    suffix = "." + filename_extension
    if base_name.endswith(suffix):
        base_name = base_name[:-len(suffix)]
    return base_name

# Convenience wrapper for PNG paths.
extract_png_filename = partial(extract_filename, filename_extension='png')

## 讀取訓練資料到陣列

In [4]:
# Ground truth: read the pain-score table, put the highest PSPI rows first,
# then renumber the rows from zero (the old index is discarded).
df_UNBC_pain_score = (
    pd.read_csv('./UNBC_pain_score_table.csv')
    .sort_values(by=['PSPI'], ascending=False)
    .reset_index(drop=True)
)

In [5]:
# feed training data

# Location of the cropped-face images.
# croped_face_folder = '../datalake/crop_face_from_UNBC_pain/'
file_ext = '.png'
# Collect the path of every regular file ending in `file_ext` below the folder.
# `find` is called without a shell and only its stdout is parsed, so error
# messages never end up in the path list; check=True makes a failing `find`
# raise instead of silently producing a bogus list, and splitlines() yields
# an empty list (not ['']) when nothing is found.
paths_croped_face_img = subprocess.run(
    ['find', '/home/lky/crop_face_from_UNBC_pain_GCP/',
     '-type', 'f', '-name', '*' + file_ext],
    stdout=subprocess.PIPE,
    universal_newlines=True,
    check=True,
).stdout.splitlines()
print("count = {}".format(len(paths_croped_face_img)))

count = 46376


In [6]:
# Only images in which a face was detected are used.
# One row per image file: the full path plus the bare file name derived from it.
df_croped_face_img = pd.DataFrame(
    data={
        'full_name': paths_croped_face_img,
        'file_name': [extract_png_filename(path) for path in paths_croped_face_img],
    })

In [7]:
# Inner-join the file-path table with the score table on 'file_name'.
# Only the merged table is used from here on, so the two inputs are emptied.
df_croped_face_img = pd.merge(
    left=df_croped_face_img,
    right=df_UNBC_pain_score,
    on='file_name',
    how='inner')
paths_croped_face_img, df_UNBC_pain_score = [], []
print(len(df_croped_face_img.index))

46376


In [8]:
# Turn a face ROI into a flat vector that can be fed straight into the network.
def rect_img_to_input_vector(raw_img):
    """Convert a BGR face crop into a flat 64*64 grayscale vector.

    The crop is converted to grayscale, resized to 128x128, reduced with
    2x2 max pooling (giving 64x64) and flattened.

    Parameters
    ----------
    raw_img : numpy.ndarray
        Colour image in OpenCV's BGR channel order (as returned by
        ``cv2.imread``).

    Returns
    -------
    numpy.ndarray
        1-D array of 4096 values.
    """
    # Import the submodule explicitly: a plain `import skimage` does not
    # guarantee that `skimage.measure` has been loaded.
    import skimage.measure
    gray_img = cv2.cvtColor(raw_img, cv2.COLOR_BGR2GRAY)  # to grayscale
    # Brightness normalisation via cv2.equalizeHist(gray_img) was tried here and is disabled.
    norm_size_img = cv2.resize(gray_img, (128, 128))  # common size
    max_pooling_img = skimage.measure.block_reduce(norm_size_img, (2, 2), np.max)  # 2x2 max pooling
    return np.reshape(max_pooling_img, [-1])  # flatten

In [9]:
# Image-loading helper
def read_dataset(list_data_path):
    """Read every image in ``list_data_path`` and convert it to an input vector.

    Progress is printed at the start, after every 5000 loaded images and at
    the end.

    Parameters
    ----------
    list_data_path : iterable of str
        Paths of the image files to load.

    Returns
    -------
    list
        One vector per path (produced by ``rect_img_to_input_vector``), in
        the same order as ``list_data_path``.

    Raises
    ------
    IOError
        If one or more images could not be read. Each failing path is printed
        as it is met; the error is raised after the whole list has been
        tried, because returning a shortened list would no longer line up
        position-by-position with the input paths.
    """
    # Kept so that `skimage.measure` is loaded before the conversion helper runs.
    import skimage.measure
    dataset = []
    read_fail_set = []
    print("read img {:5d}, {}".format(len(dataset), subprocess.getoutput('date')))
    #-------------------------------------------------------------------
    for img_full_name in list_data_path:
        try:
            raw_img = cv2.imread(img_full_name)
        except Exception:
            raw_img = None

        # cv2.imread reports an unreadable file by returning None, not by raising.
        if raw_img is None:
            read_fail_set.append(img_full_name)
            print(img_full_name, len(read_fail_set))
            continue

        dataset.append(rect_img_to_input_vector(raw_img))

        if 0 == len(dataset) % 5000:
            print("read img {:5d}, {}".format(len(dataset), subprocess.getoutput('date')))
    #-------------------------------------------------------------------
    print("read img {:5d}, {}".format(len(dataset), subprocess.getoutput('date')))
    if read_fail_set:
        raise IOError(
            "{} image(s) could not be read, first one: {}".format(
                len(read_fail_set), read_fail_set[0]))
    return dataset

## 分割 [ 訓練集 , 測試集 ]

In [10]:
# Shuffle: draw a random permutation of the index labels and reorder the
# table to follow it.
shuffled_index = np.random.permutation(df_croped_face_img.index)
df_croped_face_img = df_croped_face_img.reindex(index=shuffled_index)

In [11]:
# Order the rows by PSPI only, highest score first.
df_croped_face_img = df_croped_face_img.sort_values(by=['PSPI'], ascending=False)

In [12]:
# Renumber the rows from zero; the previous index is dropped, not kept as a column.
df_croped_face_img = df_croped_face_img.reset_index(drop=True)

In [13]:
# Even-numbered rows become the training set.
even_positions = [pos for pos in df_croped_face_img.index if pos % 2 == 0]
df_croped_face_img_training = df_croped_face_img.iloc[even_positions, :]

# Odd-numbered rows become the held-out (validation) set.
odd_positions = [pos for pos in df_croped_face_img.index if pos % 2 == 1]
df_croped_face_img_validation = df_croped_face_img.iloc[odd_positions, :]

## 讀取資料集影像內容

In [14]:
# Load and vectorise the images of each split from their 'full_name' paths;
# each call prints its own progress log.
training_img_dataset = read_dataset(df_croped_face_img_training['full_name'])
validation_img_dataset = read_dataset(df_croped_face_img_validation['full_name'])

read img     0, 五  9月  8 16:14:08 CST 2017
read img  5000, 五  9月  8 16:14:22 CST 2017
read img 10000, 五  9月  8 16:14:40 CST 2017
read img 15000, 五  9月  8 16:14:57 CST 2017
read img 20000, 五  9月  8 16:15:14 CST 2017
read img 23188, 五  9月  8 16:15:23 CST 2017
read img     0, 五  9月  8 16:15:23 CST 2017
read img  5000, 五  9月  8 16:15:38 CST 2017
read img 10000, 五  9月  8 16:15:52 CST 2017
read img 15000, 五  9月  8 16:16:10 CST 2017
read img 20000, 五  9月  8 16:16:27 CST 2017
read img 23188, 五  9月  8 16:16:36 CST 2017


In [15]:
# Normalise the datasets.
## The unit of normalisation is a pixel position, not an image: each position
## is rescaled using the values found at that same position across all images.
from sklearn import preprocessing
# The two datasets are Python lists here, so `+` concatenates them.
# NOTE(review): the scaler is fitted on training and validation data together,
# so validation data influences the preprocessing - confirm this is intended.
normalizer = preprocessing.MinMaxScaler(feature_range=(-0.9, 0.9)).fit(
    training_img_dataset + validation_img_dataset)
training_img_dataset = normalizer.transform(training_img_dataset)      # training set
validation_img_dataset = normalizer.transform(validation_img_dataset)  # validation set
print(subprocess.getoutput('date'))

五  9月  8 16:16:43 CST 2017


## Neural network construction

In [17]:
# Build the neural network

# Factory for a single fully connected layer
def make_layer(inputs, in_size, out_size, activation_function=None):
    """Add one dense layer, ``activation(inputs @ W + b)``, to the graph.

    Parameters
    ----------
    inputs : tensor
        Layer input; it is matrix-multiplied with an ``[in_size, out_size]``
        weight matrix.
    in_size : int
        Number of input features.
    out_size : int
        Number of output units.
    activation_function : callable, optional
        Applied to the affine output. When ``None`` the affine output is
        returned as is.

    Returns
    -------
    tensor
        The layer output.
    """
    # Weights start from a normal distribution, biases from a constant 0.1.
    kernel = tf.Variable(tf.random_normal([in_size, out_size]))
    offset = tf.Variable(tf.zeros([1, out_size]) + 0.1)
    pre_activation = tf.add(tf.matmul(inputs, kernel), offset)
    if activation_function is not None:
        return activation_function(pre_activation)
    return pre_activation

# Length of one input vector: rect_img_to_input_vector resizes to 128x128 and
# applies 2x2 max pooling, which leaves 64*64 values per image.
in_size = 64*64
# The network predicts a single value per image (the PSPI score).
out_size = 1 

# Placeholders for the input vectors and the target values; the first
# dimension (batch size) is left open.
xs = tf.placeholder(tf.float32, [None, in_size])
ys = tf.placeholder(tf.float32, [None, out_size])

# Wire the layers together: one tanh hidden layer with 64 units, followed by
# an output layer without activation.
layer_hidden_1 = make_layer(xs, in_size, 64, activation_function = tf.nn.tanh)
layer_output   = make_layer(layer_hidden_1, 64, out_size) 

In [18]:
# Loss function.
# tf.reduce_sum without an axis collapses everything to a scalar, so the outer
# tf.reduce_mean does not change the value: this is the SUM of squared errors
# over whatever batch is fed. The training cell divides by the dataset size
# itself when it prints the cost.
loss = tf.reduce_mean(tf.reduce_sum(tf.square(ys - layer_output)))

In [19]:
# Learning method: gradient descent with momentum, minimising `loss`.
learning_rate = 0.003
momentum = 0.1
train_step = tf.train.MomentumOptimizer(learning_rate, momentum).minimize(loss)

In [20]:
# Variable initialisation.
init = tf.global_variables_initializer()
# Rebinds `sess` to a new session (replacing the one opened at the top of the
# notebook) and runs the initialiser in it.
sess = tf.Session()
sess.run(init)

# Training

In [23]:
# Targets as an (n, 1) column, built once. Previously a list of lists was
# rebuilt for every cost evaluation and each SGD step went through pandas
# `.iloc`; both are loop-invariant.
train_labels = df_croped_face_img_training['PSPI'].values.reshape(-1, 1)
n_train = len(training_img_dataset)
# Feed for evaluating the loss over the whole training set.
full_train_feed = {xs: training_img_dataset, ys: train_labels}

# Overall error before any training. `loss` is a sum of squared errors, so it
# is divided by the number of samples to report a per-sample value.
cost = sess.run(loss, feed_dict=full_train_feed)
print("epoch = None, cost = {}, {}".format(cost/n_train, subprocess.getoutput('date')))

# Training
for epoch in range(20):
    # Fresh random visiting order for this epoch.
    train_squence = np.random.permutation(n_train)

    # One optimiser step per sample (batch size 1).
    for i in train_squence:
        sess.run(train_step, feed_dict={
            xs: [training_img_dataset[i]],
            ys: train_labels[i:i+1]})

    # Overall error after this epoch.
    cost = sess.run(loss, feed_dict=full_train_feed)
    print("epoch = {}, cost = {}, {}".format(epoch, cost/n_train, subprocess.getoutput('date')))

epoch = None, cost = 91.07775573572538, 五  9月  8 16:18:03 CST 2017
epoch = 0, cost = 1.492387630455408, 五  9月  8 16:18:34 CST 2017
epoch = 1, cost = 1.1962593293136536, 五  9月  8 16:19:02 CST 2017
epoch = 2, cost = 1.1783268487471967, 五  9月  8 16:19:37 CST 2017
epoch = 3, cost = 1.0893211871442126, 五  9月  8 16:20:14 CST 2017
epoch = 4, cost = 1.215447679025789, 五  9月  8 16:20:47 CST 2017
epoch = 5, cost = 0.9060680632223564, 五  9月  8 16:21:23 CST 2017
epoch = 6, cost = 0.9370267959181473, 五  9月  8 16:21:51 CST 2017
epoch = 7, cost = 1.0395725968173193, 五  9月  8 16:22:19 CST 2017
epoch = 8, cost = 0.8534354380498533, 五  9月  8 16:22:50 CST 2017
epoch = 9, cost = 0.9237418734905986, 五  9月  8 16:23:22 CST 2017
epoch = 10, cost = 1.3375233822451267, 五  9月  8 16:23:51 CST 2017
epoch = 11, cost = 0.8334725935828877, 五  9月  8 16:24:23 CST 2017
epoch = 12, cost = 0.7065374938006728, 五  9月  8 16:24:54 CST 2017
epoch = 13, cost = 0.6889851852925004, 五  9月  8 16:25:30 CST 2017
epoch = 14, cost = 0.

# Validation

In [24]:
# Evaluate on the validation split: Pearson correlation matrix and mean
# squared error between the true PSPI values and the network output.
from sklearn.metrics import mean_squared_error
ground_truth = df_croped_face_img_validation['PSPI'].values
# The network returns one column; take it as a 1-D array.
predict_result = sess.run(layer_output, feed_dict={xs: validation_img_dataset})[:, 0]
pcc_text = "PCCs = {}".format(np.corrcoef(x=ground_truth, y=predict_result))
# Indent the continuation lines of the matrix so they line up after "PCCs = ".
print(pcc_text.replace('\n', '\n' + ' '*7))
print("MSE  = {}".format(mean_squared_error(ground_truth, predict_result)))

PCCs = [[ 1.          0.75508123]
        [ 0.75508123  1.        ]]
MSE  = 0.7373299312125363


In [25]:
# Save the trained model.
import os

checkpoint_prefix = "trained_model/train_net_by_pain_face.ckpt"
# Make sure the target folder exists first, so that a finished training run
# is not lost to a missing directory at save time.
os.makedirs(os.path.dirname(checkpoint_prefix), exist_ok=True)
saver = tf.train.Saver()
saver_path = saver.save(sess, checkpoint_prefix)
print(saver_path)

trained_model/train_net_by_pain_face.ckpt


# 即時推論檢驗

In [None]:
%matplotlib inline

import cv2
import matplotlib.pyplot as plt
from IPython import display

In [None]:
# len(training_dataset[0])

In [37]:
# Scratch check of StandardScaler on a toy 3x3 matrix.
# A separate name is used so that `normalizer`, the MinMaxScaler fitted on the
# image data, is not overwritten by this experiment.
toy_scaler = preprocessing.StandardScaler(
#     with_mean=True, with_std=True
)
toy_scaler.fit_transform([[1,2,3],[4,5,6],[7,8,9]])

array([[-1.22474487, -1.22474487, -1.22474487],
       [ 0.        ,  0.        ,  0.        ],
       [ 1.22474487,  1.22474487,  1.22474487]])

In [None]:
vc = cv2.VideoCapture(0)

# Face detector (Haar cascade).
face_cascade = cv2.CascadeClassifier('../haarcascades/haarcascade_frontalface_alt2.xml')

webcam_preview = None
is_capturing = vc.isOpened()

try:
    while is_capturing:
        is_capturing, frame_bgr = vc.read()
        # A failed read returns no frame; stop instead of passing it to OpenCV.
        if not is_capturing or frame_bgr is None:
            break
        # RGB copy for detection/drawing/display; the BGR original is kept for
        # the network input, because rect_img_to_input_vector converts from BGR
        # (the channel order the training images had when read with cv2.imread).
        frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

        #----------------------
        _faces = face_cascade.detectMultiScale(frame, 1.1, 3, minSize = (96, 96))
        # A face was detected (len() works for both the empty result and an array).
        if len(_faces) > 0:
            x,y,w,h = _faces[0]
            face_vector = rect_img_to_input_vector(frame_bgr[y:y+h, x:x+w])
            # NOTE(review): the training inputs were min-max normalised before
            # being fed to the network; no such scaling is applied here, and
            # `normalizer` is rebound by the StandardScaler scratch cell above.
            # TODO: apply the fitted training scaler to `face_vector`.
            pspi = sess.run(
                    layer_output,
                    feed_dict={xs: [face_vector]})  # wrap as a batch of one
            print(pspi)
            cv2.rectangle(frame,(x,y), (x+w,y+h), (0xFF,0,0), thickness =2)
            cv2.putText(
                img = frame,
                text = "PSPI={}".format(pspi),
                org = (20,20),
                fontFace = cv2.FONT_HERSHEY_SIMPLEX,
                fontScale = 2,
                color = (255 ,0 ,0))
        #----------------------

        # Create the preview image on the first frame, update it afterwards.
        if webcam_preview is None:
            webcam_preview = plt.imshow(frame)
        else:
            webcam_preview.set_data(frame)
        plt.draw()

        display.clear_output(wait=True)
        display.display(plt.gcf())

        plt.pause(1/30)    # the pause time is = 1 / framerate
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop the preview.
    pass
finally:
    # Always give the camera back, however the loop ended.
    vc.release()