In [5]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
from torchvision.io import read_image
import torch
import torch.nn as nn
from tqdm.notebook import tqdm
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset

from cv2 import putText, rectangle
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# Fixed seed, passed as random_state wherever rows are sampled.
seed = 2022
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

[here](https://github.com/usef-kh/fer).  [here](https://arxiv.org/abs/2105.03588), written by authors Yousif Khaireddin and Zhuofa Chen.

In [6]:
!git clone https://github.com/usef-kh/fer

import fer

from fer.models import vgg

![network_architecture](https://github.com/usef-kh/fer/raw/master/images/architecture.jpeg)

In [7]:
# Class names, ordered so that the position of each name is its integer label.
emotions = ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral']

# Lookup table from class name to integer label.
emotions_mapping = {name: label for label, name in enumerate(emotions)}

print(emotions_mapping)

In [8]:
# Function to load a dataframe given a filepath (string).  Works for both training and test sets.
def create_dataframe(dataset_filepath, class_names=None):
    """Build a dataframe listing every image file under a dataset directory.

    The directory is expected to hold one sub-folder per class; every file found
    under a class folder (at any depth) becomes one row.

    Args:
        dataset_filepath: Path of the dataset root, with or without a trailing slash.
        class_names: Optional ordered list of sub-folder names to scan; the position of
            a name in the list is used as its integer label. When omitted, the
            module-level ``emotions`` list is scanned and labels are taken from
            ``emotions_mapping``.

    Returns:
        A DataFrame with the columns ``image_path`` (path to the file), ``emotion``
        (class name) and ``encoded_emotion`` (integer label). Rows are grouped by class
        in the order of the class list, with file names sorted inside each folder.
    """
    if class_names is None:
        class_names, encoding = emotions, emotions_mapping
    else:
        encoding = {name: label for label, name in enumerate(class_names)}

    records = []
    for name in class_names:
        class_dir = os.path.join(dataset_filepath, name)
        for root, dirs, files in os.walk(class_dir):
            # os.walk yields entries in filesystem order; sorting keeps the row order
            # (and therefore any seeded sampling of the dataframe) the same on every run.
            dirs.sort()
            for fname in sorted(files):
                # Join with the folder the file was actually found in, so files in
                # nested folders get a valid path too.
                records.append((os.path.join(root, fname), name))

    df = pd.DataFrame(records, columns=['image_path', 'emotion'])
    df['encoded_emotion'] = df['emotion'].map(encoding)
    return df

In [9]:
train_path = '/kaggle/input/fer2013/train/'

# Call the function to create the dataframe
train_df = create_dataframe(train_path)
train_df.head()

In [10]:
# Print the number of faces for each expression
train_df['emotion'].value_counts()

In [11]:
# Plot 5 randomly sampled faces from each class, one class per row.
samples_per_class = 5
plt.figure(figsize=(12, 12))
idx = train_df.groupby('emotion').sample(n=samples_per_class, random_state=seed).index
for i, index in enumerate(idx):
    # The grid height follows the class list instead of a hard-coded row count
    plt.subplot(len(emotions), samples_per_class, i+1)
    plt.imshow(np.squeeze(read_image(train_df.loc[index, 'image_path'])), cmap='gray')
    plt.axis('off')
    plt.title(f'{train_df.loc[index, "emotion"]}')

In [12]:
# Take a closer look at the neutral class with a 5x5 grid of random samples
neutral_faces = train_df.loc[train_df['emotion'] == 'neutral']
idx = neutral_faces.sample(n=25, random_state=seed).index
plt.figure(figsize=(12, 12))
for i, index in enumerate(idx, start=1):
    plt.subplot(5, 5, i)
    face = read_image(train_df.loc[index, 'image_path'])
    plt.imshow(np.squeeze(face), cmap='gray')
    plt.axis('off')
    plt.title(f'{train_df.loc[index, "emotion"]}')

In [13]:
# Declare the model and load the pretrained weights
model = vgg.Vgg()

# Location of the pretrained checkpoint file
fpath = '../input/facial-emotion-recognition-vggnet-state/VGGNet'

# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
# map_location places the loaded tensors on the device selected earlier in the notebook.
checkpoint = torch.load(fpath, map_location=device)

# The weights are stored under the 'params' key of the checkpoint
model.load_state_dict(checkpoint['params'])

# NOTE(review): .eval() is commented out here, so the model as a whole is not switched to
# evaluation mode; presumably this affects any batch-norm layers inside Vgg -- confirm
# that this is intended before relying on the reported metrics.
model = model.to(device)#.eval()

# Only the `drop` sub-module (presumably the dropout layer) is put into evaluation mode
model.drop = model.drop.eval()

In [14]:
# Normalisation constants; read by _normalize_crops each time the transform runs
mu, st = 0, 255


def _stack_crops(crops):
    """Convert every cropped PIL image to a tensor and stack them along a new first axis."""
    return torch.stack([transforms.ToTensor()(crop) for crop in crops])


def _normalize_crops(tensors):
    """Normalise every crop tensor with the module-level ``mu`` and ``st`` and re-stack them."""
    return torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])


# Ten 40x40 crops per image -> stacked tensors -> normalised tensors
test_transform = transforms.Compose([
    transforms.TenCrop(40),
    transforms.Lambda(_stack_crops),
    transforms.Lambda(_normalize_crops),
])

In [15]:
# Load the first image in the dataset and plot the crops as a visualisation of the model's input
img = Image.open(train_df['image_path'][0])
img = test_transform(img)

plt.figure(figsize=(12, 4))
for i in range(10):
    # Select the subplot first so that the title is attached to the crop being drawn;
    # calling plt.title before plt.subplot titles whichever axes was current before.
    plt.subplot(2, 5, i+1)
    plt.title(f'{train_df["emotion"][0]}')
    plt.imshow(np.squeeze(img[i, :, :, :].numpy()), cmap='gray')
    plt.axis('off')

In [16]:
# Create a dataset class to load batched images
class FER2013Dataset(Dataset):
    """Dataset that opens images from disk by path and pairs each with its integer label.

    Args:
        images: Sequence of image file paths (for example a list or a pandas Series).
        labels: Sequence of integer class labels, aligned element-wise with ``images``.
        transform: Optional callable applied to the opened PIL image.

    Raises:
        ValueError: If ``images`` and ``labels`` differ in length.
    """

    def __init__(self, images, labels, transform=None):
        # Copy into plain lists so that __getitem__ always indexes by position. Indexing a
        # pandas Series with an integer looks the value up by index label, which fails (or
        # picks the wrong row) when the Series does not carry the default 0..n-1 index.
        self.images = list(images)
        self.labels = list(labels)
        if len(self.images) != len(self.labels):
            raise ValueError(
                f'images and labels must have the same length, '
                f'got {len(self.images)} and {len(self.labels)}'
            )
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``; the label is a ``torch.long`` tensor."""
        image = Image.open(self.images[idx])

        if self.transform:
            image = self.transform(image)

        label = torch.tensor(self.labels[idx]).type(torch.long)

        return image, label

In [17]:
# Load the test set
test_path = '../input/fer2013/test/'
test_df = create_dataframe(test_path)
test_df.head()

In [18]:
# Create the dataloader to pass batches to the model
test_dataset = FER2013Dataset(test_df['image_path'], test_df['encoded_emotion'], test_transform)
# Shuffling is kept, but drawn from a seeded generator so that the composition of the
# batches is the same on every run.
test_dataloader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=2,
    generator=torch.Generator().manual_seed(seed),
)

In [43]:
def evaluate(model, dataloader, criterion):
    """Evaluate a multi-crop classifier on a dataloader and report accuracy and mean loss.

    Every batch of images is unpacked as (batch, ncrops, channels, height, width). The crops
    are folded into the batch axis for the forward pass, and the model outputs are averaged
    over the crops of each image before the loss and the prediction are computed.

    The model's train/eval mode is not touched here; it is used exactly as the caller set it.

    Args:
        model: Callable mapping a (N, C, H, W) tensor to per-class outputs.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)`` and returning a scalar tensor.

    Returns:
        Tuple ``(accuracy, loss)`` where both values are averaged over the samples that
        were actually seen. Both are ``nan`` when the dataloader yields no samples.
    """
    total_loss, n_correct, n_samples = 0.0, 0, 0
    # A 'sum'-reduced criterion already returns the batch total; any other scalar loss is
    # treated as a per-sample mean and weighted by the batch size before accumulating.
    loss_is_batch_total = getattr(criterion, 'reduction', 'mean') == 'sum'

    with torch.no_grad():
        for images, labels in tqdm(dataloader):
            images, labels = images.to(device), labels.to(device)

            # fuse crops and batchsize
            bs, ncrops, c, h, w = images.shape
            n_samples += bs

            # forward
            outputs = model(images.view(-1, c, h, w))

            # combine results across the crops
            outputs = outputs.view(bs, ncrops, -1)
            outputs = (torch.sum(outputs, dim=1) / ncrops)

            batch_loss = criterion(outputs, labels).item()
            total_loss += batch_loss if loss_is_batch_total else batch_loss * bs

            # .item() keeps the running count a plain int instead of a device tensor
            n_correct += (outputs.argmax(dim=1) == labels).sum().item()

    if n_samples == 0:
        print('No samples to evaluate.')
        return float('nan'), float('nan')

    # Average over the samples seen through `dataloader`, not over a global dataloader
    acc = n_correct / n_samples
    loss = total_loss / n_samples

    print(n_correct)
    print(acc, loss, n_samples)
    return acc, loss

In [44]:
evaluate(model, test_dataloader, torch.nn.CrossEntropyLoss())

In [45]:
!pip install pytube

from pytube import YouTube

In [46]:
yt = YouTube('https://www.youtube.com/watch?v=luLpdr4n8m4')

In [47]:
from IPython.display import Markdown as md

# Show the title and thumbnail of the video url
print(yt.title)
md(f"![]({yt.thumbnail_url})")

In [48]:
# Print all the streams available for download
for x in yt.streams:
    print(x)

In [49]:
# Declare a filepath and download the stream 
video_clips_path = '/kaggle/working/video_clips/'

stream = yt.streams.filter(file_extension='mp4', progressive=True).get_highest_resolution()

stream.download(video_clips_path, 'test.mp4')

In [50]:
!pip install moviepy
from moviepy.editor import *

clip = VideoFileClip('./video_clips/test.mp4')

In [51]:
from IPython.display import Video

Video('./video_clips/test.mp4')

In [59]:
def flip_frame(gf, t):
    """Return the frame produced by ``gf`` at time ``t`` after ``cv2.flip`` with flip code 0."""
    return cv2.flip(gf(t), 0)

from moviepy.tools import find_extension
from moviepy.video.io.ffmpeg_writer import ffmpeg_write_video

def create_videofile(clip, filename):
    """Write a clip to a video file, including its audio track when it has one.

    The audio is first written to a temporary mp3 file in the working directory, passed to
    ``ffmpeg_write_video`` and removed afterwards, even if writing fails.

    Args:
        clip: Clip to write; ``clip.fps`` is used as the output frame rate.
        filename: Path of the video file to create.
    """
    if clip.audio is None:
        # Nothing to mux: write the frames only
        ffmpeg_write_video(clip, filename, fps=clip.fps)
        return

    audio_codec = 'libmp3lame'
    audio_extension = find_extension(audio_codec)
    audiofile = f'./temp.{audio_extension}'

    try:
        clip.audio.write_audiofile(audiofile,
                                   fps=44100,
                                   nbytes=4,
                                   buffersize=2000,
                                   codec=audio_codec)
        ffmpeg_write_video(clip, filename, fps=clip.fps, audiofile=audiofile)
    finally:
        # Do not leave the temporary audio file behind when either write step raises
        if os.path.exists(audiofile):
            os.remove(audiofile)

In [60]:
# Print the duration of what should be a 10 second subclip.
print(clip.subclip(10, 20).duration)

In [61]:
def create_subclip(clip, t_start, t_end):
    """Return a copy of ``clip`` that plays the interval from ``t_start`` to ``t_end``.

    The time axis is shifted by ``t_start`` (for the mask and the audio as well) and the
    duration and end of the new clip are set from the requested interval.

    Args:
        clip: Source clip; must provide ``fl_time``.
        t_start: Start of the interval, in the time units of the clip.
        t_end: End of the interval, in the time units of the clip.

    Returns:
        The time-shifted clip with ``duration`` and ``end`` set.

    Raises:
        ValueError: If ``t_end`` is earlier than ``t_start``.
    """
    if t_end < t_start:
        raise ValueError(f't_end ({t_end}) must not be earlier than t_start ({t_start})')

    newclip = clip.fl_time(lambda t: t + t_start, apply_to=['mask', 'audio'])
    newclip.duration = t_end - t_start
    newclip.end = newclip.start + newclip.duration
    # Clips without an audio track have audio set to None; only fix up the duration when
    # there is a track to fix.
    if getattr(newclip, 'audio', None) is not None:
        newclip.audio.duration = newclip.duration
    return newclip

In [62]:
# Cut the original video down to the 30 seconds or so in which Clinton talks directly to the camera.
newclip = create_subclip(clip, 60, 92)
create_videofile(newclip, './test.mp4')
Video('./test.mp4')

 [here](https://github.com/ipazc/mtcnn).

In [63]:
!pip install mtcnn
from mtcnn.mtcnn import MTCNN

img = newclip.get_frame(1)
clf = MTCNN()
bbox, *_ = clf.detect_faces(img)
print(bbox)

In [64]:
x, y, width, height = bbox['box']
x2, y2 = x + width, y + height
face_crop = img[y:y2, x:x2, :]
plt.imshow(face_crop)

In [65]:
def crop_face(gf, t):
    """Return a 48x48 grayscale crop around the first face detected in the frame at time ``t``.

    Args:
        gf: Frame getter; ``gf(t)`` returns the RGB frame at time ``t``.
        t: Time of the frame to process.

    Returns:
        The cropped face, resized to 48x48 and converted from RGB to grayscale.

    Raises:
        ValueError: If the detector finds no face in the frame.
    """
    frame = gf(t)
    detections = clf.detect_faces(frame)
    if not detections:
        # Unpacking an empty detection list would raise an opaque "not enough values to
        # unpack" error; say which frame is the problem instead.
        raise ValueError(f'No face detected in the frame at t={t}')

    # Only the first detection is used; the box area is doubled before cropping
    x, y, x2, y2 = scale_bbox(detections[0], frame.shape[:2], 2)
    frame = frame[y:y2, x:x2, :]
    frame = cv2.resize(frame, (48, 48))
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    return frame

def scale_bbox(bbox, img_dims, scale=1.0):
    """Grow a bounding box about its centre so that its area is multiplied by ``scale``.

    Args:
        bbox: Either a detection dict holding the box under the ``'box'`` key, or the box
            itself as an ``(x, y, width, height)`` sequence.
        img_dims: ``(height, width)`` of the image; the far corner is clipped to it and the
            near corner is clipped to 0.
        scale: Area scale factor; width and height are each multiplied by ``sqrt(scale)``.

    Returns:
        ``(x, y, x2, y2)``: integer coordinates of the top-left and bottom-right corners.
    """
    # Accept both call styles used in this notebook (detection dict or bare box)
    box = bbox['box'] if isinstance(bbox, dict) else bbox
    x, y, width, height = box

    ratio = np.sqrt(scale)
    cx, cy = x + width // 2, y + height // 2
    new_w, new_h = width * ratio, height * ratio
    x, y = max(0, cx - (new_w // 2)), max(0, cy - (new_h // 2))
    x2, y2 = min(img_dims[1], x + new_w), min(img_dims[0], y + new_h)
    return int(x), int(y), int(x2), int(y2)

In [66]:
frame = clip.get_frame(0)

In [67]:
import cv2

In [68]:
faceclip = newclip.fl(crop_face)

In [69]:
plt.figure(figsize=(12, 4))
for i in range(5):
    plt.subplot(1, 5, i+1)
    plt.imshow(faceclip.get_frame((faceclip.duration // 5) * i), cmap='gray')
    plt.title(f'Sample {i}')
    plt.axis('off')

In [70]:
preds = np.zeros(shape=(int(faceclip.duration * faceclip.fps), 7))

In [71]:
import PIL

n_frames = preds.shape[0]
for i, frame in tqdm(enumerate(faceclip.iter_frames()), total=n_frames):
    if i >= n_frames:
        # preds was sized from int(duration * fps); stop rather than index past its last
        # row should the clip yield more frames than that.
        break
    img = test_transform(Image.fromarray(frame)).to(device)
    with torch.no_grad():
        outputs = model(img)
    # Average the model outputs over the ten crops of the frame
    outputs = outputs.view(1, 10, -1)
    outputs = (torch.sum(outputs, dim=1) / 10)
    preds[i, :] = outputs.squeeze(0).cpu().numpy()

In [72]:
# Tabulate the per-frame model outputs, one column per emotion
preds_df = pd.DataFrame(data=preds, columns=emotions)
# The predicted emotion of a frame is the column holding its largest output
preds_df['emotions'] = preds_df[emotions].idxmax(axis=1)
# Timestamp of every frame, from its position and the frame rate of the clip
preds_df['time'] = preds_df.index / faceclip.fps
preds_df.head()

In [73]:
# NOTE(review): this cell rebuilds preds_df from `preds` in the same way as the cell before it.
# One output column per emotion, then the arg-max column name and a per-frame timestamp.
preds_df = pd.DataFrame(preds, columns=emotions)
preds_df['emotions'] = preds_df.idxmax(axis=1)
preds_df['time'] = preds_df.index * 1/faceclip.fps
preds_df.head()

In [74]:
preds_df['emotions'].hist()
plt.title('Emotion Frequency')
plt.show()

In [75]:
# Count, for every frame, how often each emotion was predicted over the preceding second.
# The window is one second's worth of frames, so it follows the frame rate of the clip
# instead of assuming 25 frames per second.
frames_per_second = max(1, int(round(faceclip.fps)))
for emotion in emotions:
    preds_df[emotion] = (preds_df['emotions'] == emotion).rolling(window=frames_per_second, min_periods=1).sum()

In [76]:
preds_df.head()

In [77]:
colourmap = {
    'angry':'red',
    'disgust':'pink',
    'fear':'purple',
    'happy':'green',
    'sad':'blue',
    'surprise':'yellow',
    'neutral':'white'
}

In [78]:
# Divide the per-emotion columns by the frame rate of the clip.
# NOTE(review): the division happens in place, so running this cell again divides again.
preds_df[emotions] /= faceclip.fps

In [79]:
preds_df

In [80]:
# Plot against time without re-assigning preds_df, so the cell can be re-run safely:
# set_index('time') raises once the column has already been moved into the index.
preds_df.set_index('time')[emotions].plot(figsize=(16, 8))
plt.title('Proportion of Expression Predictions in the Previous Second')
plt.show()

In [81]:
def annotate_frame(gf, t):
    """Print the timestamp ``t`` and return the frame ``gf(t)`` unchanged."""
    print(t)
    frame = gf(t)
    return frame

# NOTE(review): the clip returned by fl() is not assigned to a variable here.
faceclip.fl(annotate_frame, apply_to=['mask'], keep_duration=True)

In [82]:
from matplotlib import colors

# Convert the named colours to 0-255 RGB tuples. Entries that are no longer strings were
# converted by an earlier run of this cell and are left alone, so re-running the cell does
# not fail on (or re-scale) values that are already tuples.
for key, value in list(colourmap.items()):
    if isinstance(value, str):
        colourmap[key] = tuple(255 * x for x in colors.to_rgb(value))

In [83]:
colourmap

In [84]:
frame = newclip.get_frame(0)
bbox, *_ = clf.detect_faces(frame)
x, y, x2, y2 = scale_bbox(bbox, frame.shape[:2], 2)
emotion = preds_df['emotions'][0]
rectangle(frame, (x, y), (x2, y2), colourmap[emotion], 1)
font = cv2.FONT_HERSHEY_SIMPLEX
(width, height), baseline = cv2.getTextSize(f'{emotion}', font, 0.9, 1)
cv2.putText(frame, f'{emotion}', (x, y+height), font, 0.9, colourmap[emotion], 1, 
            bottomLeftOrigin=False)
plt.imshow(frame)

In [85]:
def scale_bbox(bbox, img_dims, scale=1.0):
    """Grow a bounding box about its centre so that its area is multiplied by ``scale``.

    Args:
        bbox: Either the box itself as an ``(x, y, width, height)`` sequence, or a detection
            dict holding the box under the ``'box'`` key.
        img_dims: ``(height, width)`` of the image; the far corner is clipped to it and the
            near corner is clipped to 0.
        scale: Area scale factor; width and height are each multiplied by ``sqrt(scale)``.

    Returns:
        ``(x, y, x2, y2)``: integer coordinates of the top-left and bottom-right corners.
    """
    # Accept both call styles used in this notebook, so that redefining the function here
    # does not break callers that pass the whole detection dict.
    box = bbox['box'] if isinstance(bbox, dict) else bbox
    x, y, width, height = box

    ratio = np.sqrt(scale)
    cx, cy = x + width // 2, y + height // 2
    new_w, new_h = width * ratio, height * ratio
    x, y = max(0, cx - (new_w // 2)), max(0, cy - (new_h // 2))
    x2, y2 = min(img_dims[1], x + new_w), min(img_dims[0], y + new_h)
    return int(x), int(y), int(x2), int(y2)
    
def process_frames(get_frame, t):
    """Annotate every face in the frame at time ``t`` with a box and its predicted emotion.

    Each detected face is cropped (with the box area doubled), resized to 48x48 grayscale,
    passed through ``test_transform`` and the model, and the outputs are averaged over the
    ten crops to pick the emotion that is drawn on the frame.

    Args:
        get_frame: Frame getter; ``get_frame(t)`` returns the RGB frame at time ``t``.
        t: Time of the frame to process.

    Returns:
        A copy of the frame with one rectangle and one text label per detected face. The
        frame is returned without annotations when no face is detected.
    """
    source = get_frame(t)
    # Draw on a copy: the drawing calls write into the array they are given, and crops for
    # later faces should come from the untouched pixels, not from earlier annotations.
    frame = source.copy()
    boxes = [detection['box'] for detection in clf.detect_faces(source)]
    font = cv2.FONT_HERSHEY_SIMPLEX

    for box in boxes:

        x, y, x2, y2 = scale_bbox(box, source.shape[:2], 2)

        face_crop = source[y:y2, x:x2, :]
        if face_crop.size == 0:
            # Nothing of the box is left inside the image; cv2.resize fails on an empty crop
            continue
        face_crop = cv2.resize(face_crop, (48, 48))
        face_crop = cv2.cvtColor(face_crop, cv2.COLOR_RGB2GRAY)

        # Inference only, so no autograd graph needs to be recorded
        with torch.no_grad():
            outputs = model(test_transform(Image.fromarray(face_crop)).to(device))
        outputs = outputs.view(1, 10, -1)
        outputs = (torch.sum(outputs, dim=1) / 10)
        emotion = emotions[outputs.argmax().item()]

        frame = rectangle(frame, (x, y), (x2, y2), colourmap[emotion], 1)
        # The text height is used to place the label just inside the top edge of the box
        (width, height), baseline = cv2.getTextSize(f'{emotion}', font, 0.9, 1)
        frame = putText(frame, f'{emotion}', (x, y+height), font, 0.9, (255, 255, 255), 1,
                        bottomLeftOrigin=False)

    return frame

def annotate_clip(video, save_path=None, t_start=0, t_end=None):
    """Annotate the faces in a video with their predicted emotions.

    Args:
        video: Either a YouTube URL, which is downloaded to ``./temp.mp4`` first, or the
            path of a local video file.
        save_path: Optional path to write the annotated video to.
        t_start: Start of the interval to annotate.
        t_end: End of the interval to annotate; defaults to the end of the clip.

    Returns:
        The annotated clip. Frames are processed lazily, when they are requested.

    Raises:
        ValueError: Re-raised from ``YouTube(video)`` after printing a hint.
    """
    temp_path = './temp.mp4'
    is_url = bool(validators.url(video))

    if is_url:
        try:
            yt = YouTube(video)
        except ValueError:
            print('URL must be a youtube link.')
            # Carrying on would only fail a line later on the unbound `yt`
            raise
        stream = yt.streams.filter(file_extension='mp4', progressive=True).get_highest_resolution()
        stream.download('./', 'temp.mp4')
        source_path = temp_path
    else:
        # Not a URL: treat the argument as a local file instead of silently opening
        # whatever ./temp.mp4 happens to exist.
        source_path = video

    try:
        clip = VideoFileClip(source_path)

        if t_end is None:
            t_end = clip.end
        clip = create_subclip(clip, t_start, t_end)

        clip = clip.fl(process_frames)

        if save_path is not None:
            create_videofile(clip, save_path)
    finally:
        # Only remove the file this function downloaded itself, never a caller's file
        if is_url and os.path.exists(temp_path):
            os.remove(temp_path)

    return clip

In [86]:
plt.figure(figsize=(16, 10))
frame = newclip.get_frame(32)
plt.subplot(1, 2, 1)
plt.imshow(frame)
plt.subplot(1, 2, 2)
frame = process_frames(newclip.get_frame, 32)
plt.imshow(frame)

In [87]:
!pip install validators
import validators

In [88]:
clip = annotate_clip('https://www.youtube.com/watch?v=luLpdr4n8m4', './video_clips/t.mp4', t_start=60, t_end=92)
Video('./video_clips/t.mp4')

In [90]:
clip = annotate_clip('https://www.youtube.com/watch?v=NdAoQvqh7eY', './video_clips/keating.mp4')
Video('./video_clips/keating.mp4')

In [91]:
clip = annotate_clip('https://www.youtube.com/watch?v=ufhKWfPSQOw', './video_clips/yeonmipark.mp4', t_start=2, t_end=70)
Video('./video_clips/yeonmipark.mp4')

In [None]:
clip = annotate_clip('https://www.youtube.com/watch?v=dRQBtDtZTGA', './video_clips/davidmorrison.mp4', t_start=10, t_end=124)
Video('./video_clips/davidmorrison.mp4')

In [89]:
clip = annotate_clip('https://www.youtube.com/watch?v=eKSgq-hxlaI', './video_clips/dlift.mp4', t_start=706, t_end=760)
Video('./video_clips/dlift.mp4')