# Emotion Detection Proof of Concept

In [1]:
import warnings
warnings.filterwarnings('ignore')
import dataclasses

from fer import Video
from fer import FER
import scenedetect
import cv2
from pytube import YouTube

from numpy import ndarray
import pandas

2024-02-08 22:14:24.632383: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-02-08 22:14:24.632429: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-02-08 22:14:24.633424: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-02-08 22:14:24.639512: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


#### Download video

In [2]:
# YouTube id of the video to analyse. It is also used to name the CSV outputs
# and to look the video up in the database.
yt_id = "aQoFrRq6Ds8"

In [3]:
# Watch-page URL for the video id; this is what gets handed to pytube's YouTube().
url = f"https://www.youtube.com/watch?v={yt_id}"

###### Get the first 720p stream url

In [4]:
yt = YouTube(url)
video_stream = yt.streams
# first() yields None when the filter matches no stream, so check before
# dereferencing .url; otherwise a video without a 720p rendition fails with an
# opaque "'NoneType' object has no attribute 'url'".
stream_720p = video_stream.filter(res="720p").first()
if stream_720p is None:
    raise ValueError(f"No 720p stream is available for {url}")
stream_url = stream_720p.url

##### stream the video for cv2 and scenedetect

In [5]:
# Open the remote stream with OpenCV. The same capture object is rewound and
# read again frame-by-frame by the emotion-detection loop later on.
capture = cv2.VideoCapture(stream_url)
# Wrap the capture so that scenedetect can read frames from it.
video = scenedetect.VideoCaptureAdapter(capture)

##### detect the scenes

In [6]:
scene_manager = scenedetect.SceneManager()
scene_manager.add_detector(scenedetect.ContentDetector())
scene_manager.detect_scenes(video=video)
# start_in_scene=True: per the PySceneDetect docs, when no cuts are detected the
# result is then a single scene spanning the whole video instead of an empty
# list. An empty list would leave the later frame -> scene lookup with nothing
# to index into.
scene_list = scene_manager.get_scene_list(start_in_scene=True)

INFO:pyscenedetect:Downscale factor set to 5, effective resolution: 256 x 137
INFO:pyscenedetect:Detecting scenes...


Restart the video so that we can run emotion detection on it

In [7]:
# Seek the shared capture back to frame 0; set() returns a bool, which is this cell's output.
capture.set(cv2.CAP_PROP_POS_FRAMES, 0)

True

#### Emotion detection

In [8]:
# NOTE(review): per the fer README, mtcnn=True makes FER locate faces with the
# MTCNN network rather than its default OpenCV Haar cascade — confirm against
# the installed version.
detector = FER(mtcnn=True)

2024-02-08 22:14:37.158122: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:274] failed call to cuInit: CUDA_ERROR_UNKNOWN: unknown error
2024-02-08 22:14:37.158148: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:129] retrieving CUDA diagnostic information for host: max
2024-02-08 22:14:37.158156: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:136] hostname: max
2024-02-08 22:14:37.158244: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:159] libcuda reported version is: 545.29.6
2024-02-08 22:14:37.158267: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:163] kernel reported version is: 545.29.6
2024-02-08 22:14:37.158273: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:241] kernel version seems to match DSO: 545.29.6


In [9]:
# Results table schema: frame index, timestamp (frame index / fps), the top
# emotion label for that frame and that label's score.
predictions = pandas.DataFrame(columns=["frame", "timestamp", "emotion", "score"])

In [10]:
# To save time, emotion detection runs on only 1 out of every FRAME_STRIDE frames.
FRAME_STRIDE = 3

fps = capture.get(cv2.CAP_PROP_FPS)
# Frame count as reported by the capture; informational only — the loop below
# stops when read() reports failure, not at this count.
length = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

# Collect one plain dict per analysed frame and build the DataFrame once at the
# end. Growing a DataFrame with `.loc[len(df)] = ...` re-allocates on every
# insert and relies on dict-to-row alignment.
records = []
count = 0
success, image = capture.read()
while success:
    if count % FRAME_STRIDE == 0:
        # e.g. ('happy', 0.99); presumably (None, None) when no face is found —
        # those rows are relabelled "no_emotion" during clean-up.
        emotion, score = detector.top_emotion(image)
        records.append(
            {
                "frame": count,
                "timestamp": count / fps,  # seconds from the start of the video
                "emotion": emotion,
                "score": score,
            }
        )
    success, image = capture.read()
    count += 1

predictions = pandas.DataFrame(
    records, columns=["frame", "timestamp", "emotion", "score"]
)

#### Collect tags into groups of "scenes"
Why are we using scene detection? We split the video into many images and those images are assigned emotions by the emotion detection algorithm. If every image in a single scene was assigned "happy" then we want only one "happy" tag to be created at the beginning of the scene. However, if the next scene was assigned "happy" we want to create an additional tag on the video to indicate that it is a separate "happy" scene.

In [11]:
@dataclasses.dataclass
class Scene:
    """Frame range covered by one detected scene."""
    # Position of the scene in the detected scene list (0-based, from enumerate).
    scene_number: int
    # Frame number of the scene's start timecode.
    start_frame: int
    # Frame number of the scene's end timecode minus one, i.e. stored as the
    # last frame that still belongs to this scene.
    end_frame: int

In [12]:
# Convert each (start, end) timecode pair from scenedetect into a Scene with
# plain frame numbers; the end timecode is shifted back by one frame.
scene_boundaries = [
    Scene(
        number,
        start_timecode.get_frames(),
        end_timecode.get_frames() - 1,
    )
    for number, (start_timecode, end_timecode) in enumerate(scene_list)
]

In [13]:
def get_scene(frame, boundaries=None):
    """
    Return the scene number that a frame belongs to.

    frame: frame index to look up.
    boundaries: ordered list of objects with ``scene_number`` and ``end_frame``
        attributes. Defaults to the notebook-level ``scene_boundaries``.

    ``end_frame`` is the last frame that belongs to a scene (inclusive), so the
    comparison must be ``<=``; with ``<`` the final frame of every scene was
    attributed to the following scene.

    Frames past the last boundary are assigned to the last scene. If there are
    no boundaries at all, the whole video is treated as scene 0.
    """
    if boundaries is None:
        boundaries = scene_boundaries
    if not boundaries:
        return 0
    for scene in boundaries:
        if frame <= scene.end_frame:
            return scene.scene_number
    return boundaries[-1].scene_number

In [14]:
# Label every analysed frame with the number of the scene it falls in.
predictions["scene"] = predictions["frame"].map(get_scene)

In [15]:
# Checkpoint the raw predictions so the clean-up steps can be re-run without repeating detection.
predictions.to_csv(f"initial_predictions - {yt_id}.csv", index=False)

#### If you don't want to rerun the predictions, start here
### cleaning up the output predictions
The emotion detection algorithm will produce some very low score predictions, and sometimes it will produce no prediction at all. We assign low score and missing predictions a value of "no_emotion" based on the parameters we set in the cell below. The "neutral" prediction is extremely common, so to avoid filling the data with an unnecessary number of "neutral" tags, we set a higher minimum threshold so that "neutral" is only included when it is a very strong prediction.

In [16]:
# Predictions scoring below this are relabelled "no_emotion".
minimum_score = .8
# "neutral" predictions scoring below this are relabelled "no_emotion" as well.
exclude_neutral_min_score = .96

In [17]:
# Reload the saved raw predictions so everything from here on works without the detection cells.
output = pandas.read_csv(f"initial_predictions - {yt_id}.csv")

In [18]:
output["old_emotion"] = output["emotion"]  # untouched detector label, kept for debugging
# Rows with no prediction have no score; treat them as 0 so they fall below minimum_score.
output["score"] = output["score"].fillna(0)

# Column-wise boolean masks instead of two row-wise apply(axis=1) passes: same
# result for every row, a single pass over the data, and well-defined on an
# empty frame.
low_score = output["score"] < minimum_score
weak_neutral = (output["emotion"] == "neutral") & (
    output["score"] < exclude_neutral_min_score
)
output["emotion"] = output["emotion"].mask(low_score | weak_neutral, "no_emotion")

#### detect shifts in emotions
A shift in emotion is when the emotion changes within a given scene. If the first 10 seconds of a scene showed a happy person and the next 10 seconds showed the person being sad, then we would want 2 tags: one "happy" tag at the beginning of the scene and then a "sad" tag at the 10 second mark when the emotion shifts. In order to determine an emotion "shift" we want to look at a sliding window of predictions and take the mode of that window; a shift is recorded whenever the mode differs from the mode of the previous window.

In [19]:
# Window size: how many consecutive predictions are considered together when
# deciding whether the emotion has shifted.
steps = 3

In [20]:
def most_common_tag_and_frame(list_of_rows):
    """
    Find the dominant emotion in a collection of predictions.

    list_of_rows: sequence of mappings with at least an "emotion" and a
        "frame" key. Rows labelled "no_emotion" are ignored.

    Returns a tuple ``(emotion, frame)`` where ``frame`` is the frame of the
    first row carrying the dominant emotion.
    Returns None if every row is "no_emotion" (or the input is empty), or if
    two or more emotions are tied for the highest count.
    """
    counts = {}
    for row in list_of_rows:
        tag = row["emotion"]
        if tag != "no_emotion":
            counts[tag] = counts.get(tag, 0) + 1
    if not counts:
        return None

    # A tie only matters among the leaders: counts of 2/2/1 are a tie for first
    # place even though not all counts are equal.
    highest = max(counts.values())
    leaders = [tag for tag, occurrences in counts.items() if occurrences == highest]
    if len(leaders) > 1:
        return None

    most_common = leaders[0]
    for row in list_of_rows:
        if row["emotion"] == most_common:
            return most_common, row["frame"]

In [21]:
def get_emotion_shifts(dataframe_of_scenes):
    """
    Return the frames at which the dominant emotion changes within one scene.

    dataframe_of_scenes: DataFrame holding the predictions of a single scene,
        with at least "emotion" and "frame" columns.

    A window of ``steps`` consecutive predictions slides over the scene one row
    at a time. Whenever a window's dominant emotion differs from the previous
    dominant emotion, the first frame in that window carrying the new emotion
    is recorded. Windows with no dominant emotion (all "no_emotion", or a tie)
    are skipped and do not reset the previous emotion.

    Scenes shorter than ``steps`` rows are evaluated as a single window. They
    yield an empty list when no dominant emotion exists; previously this case
    raised TypeError by subscripting None.
    """
    rows = [row for _, row in dataframe_of_scenes.iterrows()]

    if len(rows) < steps:
        most_common = most_common_tag_and_frame(rows)
        if most_common is None:
            return []
        return [most_common[1]]

    output_list = []
    last_emotion = None
    for start in range(len(rows) - steps + 1):
        most_common = most_common_tag_and_frame(rows[start:start + steps])
        if most_common is None:
            continue
        current_emotion, frame = most_common
        if current_emotion != last_emotion:
            last_emotion = current_emotion
            output_list.append(frame)
    return output_list

In [22]:
emotion_shifts = []
# groupby splits the frame once and only yields non-empty groups; sort=False
# keeps scenes in order of first appearance, matching iteration over unique().
for _, scene_df in output.groupby("scene", sort=False):
    emotion_shifts.extend(get_emotion_shifts(scene_df))

# isin uses hashed lookups instead of scanning the list once per row.
output["emotion_shift"] = output["frame"].isin(emotion_shifts)

In [23]:
# Recompute the shift flag: True for every frame listed in emotion_shifts.
output["emotion_shift"] = output["frame"].isin(emotion_shifts)

In [24]:
# Persist the cleaned predictions together with the emotion_shift flag.
# NOTE(review): unlike the initial_predictions export, this call does not pass
# index=False, so the row index is written as an extra unnamed first column —
# confirm whether that is intended.
output.to_csv(f"emotion_detection - {yt_id}.csv")

#### Combine everything into a function that takes in a filename and outputs a mapping of timestamps to emotion tags

In [25]:
from videobookmarks.tag import get_video_details
from videobookmarks.datamodel.datamodel import PostgresDataModel
import os
from datetime import date

In [26]:
# Connection string comes from the environment so that no credentials live in the notebook.
# NOTE(review): os.getenv returns None when DB_URL is unset, and that None is
# passed straight to PostgresDataModel — confirm how the model handles it.
DB_URL = os.getenv("DB_URL")
datamodel = PostgresDataModel(DB_URL)

In [27]:
# Look up the database id for this YouTube video; the next cell treats None as "not registered yet".
video_id = datamodel.load_video_id(yt_id)

In [28]:
# Register the video when the lookup found nothing, using the thumbnail URL and
# title returned by get_video_details.
if video_id is None:
    video_details = get_video_details(yt_id)
    video_id = datamodel.create_video_id(
        yt_id,
        video_details["thumbnail_url"],
        video_details["title"],
    )

In [29]:
# Look the video id up again (repeats the earlier lookup).
video_id = datamodel.load_video_id(yt_id)
# NOTE(review): the user name is hard-coded, and `.id` is read without checking
# the lookup result — confirm what get_user_with_name returns for an unknown user.
user_id = datamodel.get_user_with_name("mesterhammerfic").id
# Record the clean-up parameters in the tag list description so the run can be identified later.
description = f"""
minimum score: {minimum_score} | 
exclude neutral: {exclude_neutral_min_score} | 
step size: {steps}
"""
# A new tag list is created on every run; its name carries today's date.
tag_list_id = datamodel.create_tag_list(f"emotion detection test {date.today()}", description, user_id)

In [30]:
# Only rows flagged as an emotion shift and carrying a real emotion become tags.
is_shift = output["emotion_shift"] == True
has_emotion = output.emotion != "no_emotion"
for _, tag_row in output[is_shift & has_emotion].iterrows():
    datamodel.add_tag(
        tag_row["emotion"],
        tag_row["timestamp"],
        tag_list_id,
        video_id,
        user_id,
    )

In [None]:
# NOTE(review): this unexecuted cell repeats, statement for statement, the four
# cells before it (In [27] to In [30]). Running it after them calls
# create_tag_list and add_tag a second time — presumably producing a second tag
# list with the same tags; confirm before running both.

# Look up the database id for this YouTube video.
video_id = datamodel.load_video_id(yt_id)

# Register the video when the lookup found nothing.
if video_id is None:
    video_details = get_video_details(yt_id)
    video_id = datamodel.create_video_id(
        yt_id,
        video_details["thumbnail_url"],
        video_details["title"],
    )

video_id = datamodel.load_video_id(yt_id)
user_id = datamodel.get_user_with_name("mesterhammerfic").id
# Record the clean-up parameters in the tag list description.
description = f"""
minimum score: {minimum_score} | 
exclude neutral: {exclude_neutral_min_score} | 
step size: {steps}
"""
tag_list_id = datamodel.create_tag_list(f"emotion detection test {date.today()}", description, user_id)

# One tag per row that is flagged as an emotion shift and carries a real emotion.
for index, row in output[(output["emotion_shift"] == True) & (output.emotion != "no_emotion")].iterrows():
    datamodel.add_tag(
        row["emotion"],
        row["timestamp"],
        tag_list_id,
        video_id,
        user_id,
    )