In [1]:
# Create training videos
import cv2
import numpy as np
from time import sleep
import glob
import os
import sys
from PIL import Image

NUM_FRAMES = 100
SEQ_LENGTH = 10
TAKES_PER = 3
MOODS = ['RELAXED', 'EXCITED']
HIDDEN_SIZE = 256

In [5]:
# Photo studio
# Here you can record movies to use as training data
# Get ready to act either excited or relaxed!
# Videos are stored in data dir
def capture(num_frames, path='out.avi', countdown=0):
    for i in reversed(range(max(0, countdown))):
        i = 'GO!' if i == 0 else '{}  '.format(i)
        sys.stdout.write("{}   \r".format(i))
        sys.stdout.flush()
        sleep(1)

    # Create a VideoCapture object
    cap = cv2.VideoCapture(0)

    # Check if camera opened successfully
    if (cap.isOpened() == False): 
        print("Unable to read camera feed")

    # Default resolutions of the frame are obtained.The default resolutions are system dependent.
    # We convert the resolutions from float to integer.
    frame_width = int(cap.get(3))
    frame_height = int(cap.get(4))

    # Define the codec and create VideoWriter object.The output is stored in 'outpy.avi' file.
    out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc('M','J','P','G'), 10, (frame_width,frame_height))

    print('Recording started')
    for i in range(num_frames):

        ret, frame = cap.read()

        if ret == True:     
            # Write the frame into the file 'output.avi'
            out.write(frame)


    # When everything done, release the video capture and video write objects
    cap.release()
    out.release()
    print('Recording stopped')
    
for take in range(TAKES_PER):
    for mood in MOODS:
        path = 'data/{}{}.avi'.format(mood, take)
        print('Get ready to act:', mood)
        capture(NUM_FRAMES, path=path, countdown=3)

Get ready to act: RELAXED
Recording started
Recording stopped
Get ready to act: EXCITED
Recording started
Recording stopped
Get ready to act: RELAXED
Recording started
Recording stopped
Get ready to act: EXCITED
Recording started
Recording stopped
Get ready to act: RELAXED
Recording started
Recording stopped
Get ready to act: EXCITED
Recording started
Recording stopped


In [8]:
# Process videos, create training data
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
import numpy as np

class VGGFramePreprocessor():
    
    def __init__(self, vgg_model):
        self.vgg_model = vgg_model
    
    def process(self, frame):
        img_data = cv2.resize(frame,(224,224))
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        x = self.vgg_model.predict(img_data).flatten()
        x = np.expand_dims(x, axis=0)
        return x

def get_video_frames(video_path):
    vidcap = cv2.VideoCapture(video_path)
    success, frame = vidcap.read()
    while success:
        yield frame
        success,frame = vidcap.read()
    vidcap.release()
    
frame_preprocessor = VGGFramePreprocessor(VGG16(weights='imagenet', include_top=False))

# Load movies and transform frames to features
movies = []
for mood in MOODS:
    y = np.array([0,1]) if mood == 'EXCITED' else np.array([1,0])
    for video_path in glob.glob('data/{}*.avi'.format(mood)):
        print('preprocessing', video_path)
        X = [frame_preprocessor.process(frame) for frame in get_video_frames(video_path)]
        X = np.concatenate(X)
        movies.append({
            'X': X,
            'y': y
        })



preprocessing data/RELAXED2.avi
preprocessing data/RELAXED1.avi
preprocessing data/RELAXED0.avi
preprocessing data/EXCITED1.avi
preprocessing data/EXCITED0.avi
preprocessing data/EXCITED2.avi


In [9]:
# Display an image
img = Image.fromarray(movies[0]['X'][0].reshape(49,512))
img.show()


In [10]:
# Create windows from movies
X, y = [], []
for movie in movies:
    movie_X = movie['X']
    sequences = []
    for i in range(len(movie_X) - SEQ_LENGTH):
        sequence = movie_X[i: i + SEQ_LENGTH]
        X.append(np.expand_dims(sequence, axis=0))
        y.append(np.expand_dims(movie['y'], axis=0))


X = np.concatenate(X)
y = np.concatenate(y)
print(X.shape)
print(y.shape)
print(X[0])
print(y[0])


(540, 10, 25088)
(540, 2)
[[ 0.         0.         0.        ... 24.366207  15.429358   0.       ]
 [ 0.         0.         0.        ... 33.379276  16.243172   0.       ]
 [ 0.         0.         0.        ... 36.367126  16.402796   0.       ]
 ...
 [ 0.         0.         0.        ... 35.094444  15.89079    0.       ]
 [ 0.         0.         0.        ... 32.163406  15.844413   1.9477578]
 [ 0.         0.         0.        ... 28.701618  14.477264   0.       ]]
[1 0]


In [11]:
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout, LSTM
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

HIDDEN_SIZE_LSTM = 16
HIDDEN_SIZE_DENSE = 16

model = Sequential()
model.add(LSTM(HIDDEN_SIZE_LSTM, return_sequences=True, input_shape=(SEQ_LENGTH, X.shape[2])))
#model.add(LSTM(HIDDEN_SIZE_LSTM, return_sequences=True))
model.add(LSTM(HIDDEN_SIZE_LSTM))
model.add(Dropout(0.5))
model.add(Dense(2, activation='softmax'))

model.compile(loss='categorical_crossentropy',
              optimizer='rmsprop',
              metrics=['accuracy'])

x_train, x_test, y_train, y_test = train_test_split(X, y, random_state=42)
model.fit(x_train, y_train,
          batch_size=10, epochs=5,
          validation_split=0.1)

Train on 364 samples, validate on 41 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0xb280125f8>

In [12]:
y_true = [np.argmax(y) for y in y_test]
y_pred = [np.argmax(pred) for pred in model.predict(x_test)]
score = f1_score(y_true, y_pred)
print('F1:', score)

F1: 1.0


In [13]:
# Infer on live video
from collections import deque
from math import ceil

test_frames = 200
buffer = deque(maxlen=SEQ_LENGTH)

# Initialize camera
cap = cv2.VideoCapture(0)
# Check if camera opened successfully
if (cap.isOpened() == False): 
    print("Unable to read camera feed")
    test_frames = 0

# Start processing video
for i in range(test_frames):
    ret, frame = cap.read()
    if ret:
        processed = frame_preprocessor.process(frame)
        buffer.append(processed)
    # enough data in buffer?
    if len(buffer) == SEQ_LENGTH:
        # predict
        x_buffer = np.expand_dims(np.concatenate(buffer), axis=0)
        prediction = model.predict(np.array(x_buffer))
        # compute a hash
        buffer_hash = hash(bytes(x_buffer))
        relaxed = prediction[0][0]
        excited = prediction[0][1]
        mood = MOODS[np.argmax(prediction[0])]
        progress = i
        message = 'frame {}  hash: {} . relaxed: {:.04f}%  excited: {:.04f}%  mood: {}   \r'.format(
            progress, buffer_hash, relaxed, excited, mood)
    else:
        # buffer more
        progress = ceil(100*i/SEQ_LENGTH)
        message = 'buffering {}%         \r'.format(progress)
    sys.stdout.write(message)
    sys.stdout.flush()

cap.release()

frame 199  hash: 5555158754670880279 . relaxed: 0.0014%  excited: 0.9986%  mood: EXCITED    