In [1]:
import torch
import torch.nn as nn
from torch.nn import Linear, Conv2d, BatchNorm1d, BatchNorm2d, PReLU, Sequential, Module
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
from model import EmotionClassifier
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image
import numpy as np
import matplotlib.pyplot as plt
import cv2

import imageio
import dlib

In [2]:
class Hook():
    ''' 
    A simple hook class that returns the input and output of a layer during forward and backward pass
    '''
    def __init__(self):
        self.hook_forward = None
        self.hook_backward = None
        self.forward_out = None
        self.backward_out = None

    def hook_fn_forward(self, module, input, output):
        self.forward_out = output

    def hook_fn_backward(self, module, grad_input, grad_output):
        self.backward_out = grad_output[0] 

    def register_hook(self, module):
        self.hook_forward = module.register_forward_hook(self.hook_fn_forward)
        self.hook_backward = module.register_full_backward_hook(self.hook_fn_backward)

    def unregister_hook(self):
        self.hook_forward.remove()
        self.hook_backward.remove()

In [3]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
device = torch.device("cpu")

class_labels = ['happiness', 'surprise', 'sadness', 'anger', 'disgust', 'fear']

model = EmotionClassifier().to(device)
model.load_state_dict(torch.load('best_RAF.pth', map_location=device))
model.eval()

final_layer = model.conv5
hook = Hook()
hook.register_hook(final_layer)

transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def classify_image(image_path):
    image = Image.open(image_path).convert('RGB')
    image_tensor = transform(image).unsqueeze(0).to(device)
    image_array = np.array(image)
    with torch.no_grad():
        outputs = model(image_tensor)
        probabilities = F.softmax(outputs, dim=1)
    scores = probabilities.cpu().numpy().flatten()
    rounded_scores = [round(score, 2) for score in scores]
    return rounded_scores, image, image_array, image_tensor

In [4]:
# # cascade Landmarks, reading images / videos

# # OpenCV Real-Time Face Detection
# ## class_labels = ['happiness', 'surprise', 'sadness', 'anger', 'disgust', 'fear']

# face_classifier = cv2.CascadeClassifier(
#     cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

# # access webcam as numpy.ndarray
# video_capture = cv2.VideoCapture(0)

# # text settings
# font = cv2.FONT_HERSHEY_SIMPLEX
# font_scale = 1
# font_color = (0, 255, 0)  # BGR color
# thickness = 2
# line_type = cv2.LINE_AA

# max_emotion = ''

# def detect_emotion(video_frame):
#     vid_fr_tensor = transform(video_frame).unsqueeze(0).to(device)
#     with torch.no_grad():
#         outputs = model(vid_fr_tensor)
#         probabilities = F.softmax(outputs, dim=1)
#     scores = probabilities.cpu().numpy().flatten()
#     rounded_scores = [round(score, 2) for score in scores]
#     # print(f'rounded_scores in detect_emotion {rounded_scores}')
#     return rounded_scores

# def get_max_emotion(x, y, w, h, video_frame):
#     crop_img = video_frame[y : y + h, x : x + w]
#     pil_crop_img = Image.fromarray(crop_img)
#     # slower cropping
#     rounded_scores = detect_emotion(pil_crop_img)    
#     # get index from max value in rounded_scores
#     max_index = np.argmax(rounded_scores)
#     max_emotion = class_labels[max_index]
#     # print(f'max_emotion: {max_emotion}')

#     return max_emotion

# def print_max_emotion(x, y, video_frame, max_emotion):
#     # position to put the text for the max emotion
#     org = (x, y - 15)
#     cv2.putText(video_frame, max_emotion, org, font, font_scale, font_color, thickness, line_type)
    
# def print_all_emotion(x, y, w, h, video_frame):
#     crop_img = video_frame[y : y + h, x : x + w]
#     pil_crop_img = Image.fromarray(crop_img)
#     # slower cropping
#     rounded_scores = detect_emotion(pil_crop_img)
#     # print(f'rounded_scores in detect_bounding_box: {rounded_scores}')
#     # create text to be displayed
#     org = (x + w + 10, y - 20)
#     for index, value in enumerate(class_labels):
#         emotion_str = (f'{value}: {rounded_scores[index]:.2f}')
#         y = org[1] + 40
#         org = (org[0], y)
#         cv2.putText(video_frame, emotion_str, org, font, font_scale, font_color, thickness, line_type)
    
# # identify Face in Video Stream
# def detect_bounding_box(video_frame, counter):
#     global max_emotion
#     gray_image = cv2.cvtColor(video_frame, cv2.COLOR_BGR2GRAY)
#     faces = face_classifier.detectMultiScale(gray_image, 1.1, 5, minSize=(40, 40))
#     for (x, y, w, h) in faces:
#         # draw bounding box on face
#         cv2.rectangle(video_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
#         # crop bounding box
#         if counter == 0:
#             max_emotion = get_max_emotion(x, y, w, h, video_frame) 
        
#         print_max_emotion(x, y, video_frame, max_emotion) # displays the max_emotion according to evaluation_frequency
#         print_all_emotion(x, y, w, h, video_frame) # evaluates every video_frame for debugging

#     return faces

# counter = 0
# evaluation_frequency = 5

# # Loop for Real-Time Face Detection
# while True:

#     result, video_frame = video_capture.read()  # read frames from the video
#     if result is False:
#         break  # terminate the loop if the frame is not read successfully
    
#     faces = detect_bounding_box(video_frame, counter)  # apply the function we created to the video frame, faces as variable not used
    
#     cv2.imshow("My Face Detection Project", video_frame)  # display the processed frame in a window named "My Face Detection Project"

#     # print(type(video_frame))
#     if cv2.waitKey(1) & 0xFF == ord("q"):
#         break
    
#     counter += 1
#     if counter == evaluation_frequency:
#         counter = 0
        
# video_capture.release()
# cv2.destroyAllWindows()

In [5]:
# LBF Landmarks, saving video

# OpenCV Real-Time Face Detection
## class_labels = ['happiness', 'surprise', 'sadness', 'anger', 'disgust', 'fear']

import dlib
face_classifier = cv2.CascadeClassifier(
    cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

# save facial landmark detection model's name as LBFmodel
LBFmodel = "lbfmodel.yaml"

# create an instance of the Facial landmark Detector with the model
landmark_detector  = cv2.face.createFacemarkLBF()
landmark_detector.loadModel(LBFmodel)
print(f'landmark_detector {landmark_detector}')

# text settings
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
# font_color = (255, 217, 4) # BGR color neon blue
# font_color = (96, 252, 253) # BGR color neon yellow
font_color = (154, 1, 254) # BGR color neon pink 254,1,154
thickness = 2
line_type = cv2.LINE_AA


max_emotion = ''
transparency = 0.4
landmark_threshhold = 65

def lbf_check(gray_image, faces):
    # detect landmarks on "image_gray"
    _, landmarks = landmark_detector.fit(gray_image, faces)
#     # print(type(landmarks))
#     # print(landmarks)
#     # print(landmarks[0])
#     # print(type(landmarks[0]))
#     # print(landmarks[0].shape)
#     # print(f'landmarks in lbf_check {len(landmarks)}')
#     # print(f'landmarks in lbf_check {landmarks}')
#     # print(f'Dimentsions:', landmarks.shape)
#     # for landmark in landmarks:
#     #     for x,y in landmark[0]:
#     #         # display landmarks on "image_cropped"
#     #         # with white colour in BGR and thickness 1
#     #         cv2.circle(video_frame, (x + face[0], y + face[1]), 1, (255, 255, 255), 1)
#     #         # cv2.circle(image_cropped, ((int)x, (int)y), 1, (255, 255, 255), 1)
#     number_of_landmarks = landmarks[0].shape[1]
#     # print(f'number_of_landmarks {number_of_landmarks}')
#     return number_of_landmarks

def detect_emotion(pil_crop_img):
    vid_fr_tensor = transform(pil_crop_img).unsqueeze(0).to(device)
    # with torch.no_grad():
    logits = model(vid_fr_tensor)
    probabilities = F.softmax(logits, dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)

    predicted_class_idx = predicted_class.item()

    one_hot_output = torch.FloatTensor(1, probabilities.shape[1]).zero_()
    one_hot_output[0][predicted_class_idx] = 1
    logits.backward(one_hot_output, retain_graph=True)

    gradients = hook.backward_out
    feature_maps = hook.forward_out

    weights = torch.mean(gradients, dim=[2, 3], keepdim=True)
    cam = torch.sum(weights * feature_maps, dim=1, keepdim=True)
    cam = cam.clamp(min=0).squeeze() 

    cam -= cam.min()
    cam /= cam.max()
    cam = cam.cpu().detach().numpy()

    # scores = probabilities.cpu().numpy().flatten()
    scores = probabilities.cpu().detach().numpy().flatten()
    rounded_scores = [round(score, 2) for score in scores]
    # print(f'rounded_scores in detect_emotion {rounded_scores}')
    return rounded_scores, cam

def plot_heatmap(x, y, w, h, cam, pil_crop_img, video_frame):
    # resize cam to w, h
    cam = cv2.resize(cam, (w, h))
    
    # apply color map to resized cam
    heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    heatmap = np.float32(heatmap) / 255
    
    # Get the region of interest on the video frame
    roi = video_frame[y:y+h, x:x+w, :]

    # Blend the heatmap with the ROI
    overlay = heatmap * transparency + roi / 255 * (1 - transparency)
    overlay = np.clip(overlay, 0, 1)

    # Replace the ROI with the blended overlay
    video_frame[y:y+h, x:x+w, :] = np.uint8(255 * overlay)
        
def update_max_emotion(rounded_scores):  
    # get index from max value in rounded_scores
    max_index = np.argmax(rounded_scores)
    max_emotion = class_labels[max_index]
    return max_emotion # returns max_emotion as string

def print_max_emotion(x, y, max_emotion, video_frame):
    # position to put the text for the max emotion
    org = (x, y - 15)
    cv2.putText(video_frame, max_emotion, org, font, font_scale, font_color, thickness, line_type)
    
def print_all_emotion(x, y, w, rounded_scores, video_frame):
    # create text to be displayed
    org = (x + w + 10, y - 20)
    for index, value in enumerate(class_labels):
        emotion_str = (f'{value}: {rounded_scores[index]:.2f}')
        y = org[1] + 40
        org = (org[0], y)
        cv2.putText(video_frame, emotion_str, org, font, font_scale, font_color, thickness, line_type)
    
# identify Face in Video Stream
def detect_bounding_box(video_frame, counter):
    global max_emotion
    gray_image = cv2.cvtColor(video_frame, cv2.COLOR_BGR2GRAY)
    
    faces = face_classifier.detectMultiScale(gray_image, 1.1, 5, minSize=(40, 40))
    for (x, y, w, h) in faces:
        roi = np.array([[x, y, w, h]])  # Convert (x, y, w, h) to a NumPy array
        # if lbf_check(gray_image, roi) > landmark_threshhold:
        crop_img = video_frame[y : y + h, x : x + w]
        pil_crop_img = Image.fromarray(crop_img)
        rounded_scores, cam = detect_emotion(pil_crop_img)  
        
        if counter == 0:
            max_emotion = update_max_emotion(rounded_scores) 
        
        plot_heatmap(x, y, w, h, cam, pil_crop_img, video_frame)
        print_max_emotion(x, y, max_emotion, video_frame) # displays the max_emotion according to evaluation_frequency
        print_all_emotion(x, y, w, rounded_scores, video_frame) # evaluates every video_frame for debugging
        # draw bounding box on face
        cv2.rectangle(video_frame, (x, y), (x + w, y + h), font_color, 2)

    return faces

cam_or_video = 'camera'
video_path = 'test_video/test_video_noemotions02.mp4'

def create_video(output_file='eval_video.mp4'):
    video_capture = cv2.VideoCapture(video_path)
    fps = int(video_capture.get(cv2.CAP_PROP_FPS))
    frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    # output_path = 'eval_video.avi'
    out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
    return out, video_capture 

# def create_webcam_video(output_file='output_video.avi', duration_seconds=10):
def create_cam_video(output_file='cam_eval_video.mp4'):
    video_capture = cv2.VideoCapture(0)
    frame_width = int(video_capture.get(3))
    frame_height = int(video_capture.get(4))
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_file, fourcc, 10.0, (frame_width, frame_height))
    print(out.isOpened())
    return out, video_capture


if cam_or_video == 'camera':
    out, video_capture = create_cam_video()
elif cam_or_video == 'video':
    out, video_capture = create_video()
else: 
    print('unknown input')
    print('please enter camera or video')

counter = 0
evaluation_frequency = 5

# loop for Real-Time Face Detection
while True:

    result, video_frame = video_capture.read()  # read frames from the video
    if result is False:
        break  # terminate the loop if the frame is not read successfully
    
    faces = detect_bounding_box(video_frame, counter)  # apply the function we created to the video frame, faces as variable not used
    
    cv2.imshow("My Face Detection Project", video_frame)  # display the processed frame in a window named "My Face Detection Project"

    out.write(video_frame)  # write the processed frame to the output video file
    
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
    
    counter += 1
    if counter == evaluation_frequency:
        counter = 0

hook.unregister_hook()        
video_capture.release()
out.release()
cv2.destroyAllWindows()

loading data from : lbfmodel.yaml
landmark_detector < cv2.face.Facemark 0x111a737d0>


OpenCV: FFMPEG: tag 0x44495658/'XVID' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


True


  heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)


In [6]:
# # def create_webcam_video(output_file='output_video.avi', duration_seconds=10):
# def create_webcam_video(output_file='webcam_eval_video.avi'):
#     # Open the webcam (default camera, 0)
#     cap = cv2.VideoCapture(0)

#     # fps_start_time = 0
#     # fps = 0
    
#     # Get the default Width and Height of the frames
#     frame_width = int(cap.get(3))
#     frame_height = int(cap.get(4))

#     # Define the codec and create a VideoWriter object
#     fourcc = cv2.VideoWriter_fourcc(*'XVID')
#     out = cv2.VideoWriter(output_file, fourcc, 23.0, (frame_width, frame_height))

#     # # Get the start time
#     # start_time = time.time()

#     # Capture video for the specified duration
#     while True:
#         ret, frame = cap.read()

#         # fps_end_time = time.time()

#         # Write the frame into the file
#         out.write(frame)

#         # Display the resulting frame
#         cv2.imshow('Webcam Video', frame)

#         # # Check if the specified duration has passed
#         # elapsed_time = time.time() - start_time
#         # if elapsed_time >= duration_seconds:
#         #     break

#         # Break the loop if 'q' key is pressed
#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break

#     # Release everything when the job is finished
#     cap.release()
#     out.release()
#     cv2.destroyAllWindows()

# if __name__ == "__main__":
#     create_webcam_video()
