In [None]:
import os
import pandas as pd
import cv2
import numpy as np
import re

# Define the directory containing the movies
stimuli_dir = "Stimuli"

# Get a list of all .mp4 files in the directory
movie_files = [f for f in os.listdir(stimuli_dir) if f.endswith(".mp4")]

# Create a pandas DataFrame with the filenames
filenames = pd.DataFrame(movie_files, columns=["Filename"])

# Read emotion_ratings.csv and keep one (video, high_low_code) row per video.
# The index is reset so the frame carries plain 0..n-1 row labels rather than
# the row labels left over from the CSV; later column-wise joins rely on this.
ratings = (
    pd.read_csv("emotion_ratings.csv", sep=',')[['video', 'high_low_code']]
    .drop_duplicates(subset='video')
    .reset_index(drop=True)
)

# Movie files on disk that have no row in the ratings table
missing = filenames[~filenames['Filename'].isin(ratings['video'])]

if missing.empty:
    print("✅ All filenames in df have a match in ratings['video']")
else:
    print("❌ Some filenames are missing in ratings['video']:")
    print(missing)
    # Stop here with an explicit error: continuing would leave `df` undefined
    # and the cell would die with an unrelated NameError a few lines below.
    raise ValueError(
        f"{len(missing)} movie file(s) in {stimuli_dir!r} have no entry in emotion_ratings.csv"
    )

df = ratings.copy()

# Keep only the trailing "high"/"low" token of high_low_code.
# expand=False returns a Series (not a one-column DataFrame) for the single group.
df['high_low'] = df['high_low_code'].str.extract(r'_(high|low)$', expand=False)

# Select only the Filename and high_low column
# (video column renamed 'Filename' as required by code further down)
df = df[['video', 'high_low']].rename(columns={'video': 'Filename'})
df.head()

✅ All filenames in df have a match in ratings['video']


Unnamed: 0,Filename,high_low
0,AK_A_32.mp4,high
1,AK_A_04.mp4,low
2,AK_D_12.mp4,high
3,AK_D_02.mp4,low
4,AK_F_24.mp4,high


In [None]:
# Add columns for the emotion type
import re

# Function to extract the emotion from the filename
def extract_emotion(filename):
    """
    Pull the emotion code out of a stimulus filename.

    The code is the text between the first and second underscore
    (e.g. "AK_A_32.mp4" -> "A"). Returns None when the filename does not
    contain two underscore-separated leading fields.
    """
    found = re.match(r"^[^_]+_([^_]+)_", filename)
    return found.group(1) if found else None

# Lookup table: single-letter emotion code -> full emotion name
emotion_map = {
    'D': 'Disgust',
    'N': 'Neutral',
    'F': 'Fear',
    'H': 'Happy',
    'P': 'Pain',
    'A': 'Anger',
    'S': 'Sadness',
}

# Emotion letter parsed from each filename (None when the name does not match)
df['Emotion'] = df['Filename'].map(extract_emotion)

# Human-readable emotion name; codes absent from emotion_map become NaN
df['Emotion_Type'] = df['Emotion'].map(emotion_map)
df

Unnamed: 0,Filename,high_low,Emotion,Emotion_Type
0,AK_A_32.mp4,high,A,Anger
1,AK_A_04.mp4,low,A,Anger
2,AK_D_12.mp4,high,D,Disgust
3,AK_D_02.mp4,low,D,Disgust
4,AK_F_24.mp4,high,F,Fear
5,AK_F_02.mp4,low,F,Fear
6,AK_H_15.mp4,high,H,Happy
7,AK_H_28.mp4,low,H,Happy
8,AK_N_09.mp4,high,N,Neutral
9,AK_N_01.mp4,low,N,Neutral


In [None]:
# Downsample the initial frames. The new frames will have for each "block" of
# pixels the average value over all the pixels in the area
def downsample_frame(frame, new_size):
    """
    Shrink a 2D grayscale frame using OpenCV's INTER_AREA interpolation.

    Parameters:
        frame (np.ndarray): 2D array holding the frame to shrink.
        new_size (tuple): Target size as (new_width, new_height).

    Returns:
        np.ndarray: The resized frame, shape (new_height, new_width).
    """
    return cv2.resize(frame, new_size, interpolation=cv2.INTER_AREA)


# Calculate motion energy on the resampled frames
def calculate_motion_energy(movie_path, downsample_size):
    """
    Calculates motion energy for a movie after downsampling frames.

    Every frame is converted to grayscale and downsampled with
    downsample_frame; the motion energy of each downsampled pixel
    ("megapixel") is the standard deviation of its value across all frames.

    Parameters:
        movie_path (str): Path to the movie file.
        downsample_size (tuple): (new_width, new_height) after downsampling.

    Returns:
        motion_energy (np.ndarray): 1D array of motion energy values
            (one per megapixel), i.e. the (new_height, new_width) map of
            per-pixel standard deviations flattened in row-major order.

    Raises:
        ValueError: If the video cannot be opened, or if it opens but no
            frame can be read from it.
    """
    cap = cv2.VideoCapture(movie_path)

    # try/finally guarantees the capture handle is released even if decoding
    # or resizing a frame raises part-way through the movie.
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {movie_path}")

        frames = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            small_frame = downsample_frame(gray_frame, downsample_size)
            frames.append(small_frame)
    finally:
        cap.release()

    # Without this guard an empty movie would yield a single NaN instead of a
    # vector with one entry per megapixel, breaking the stacking done later.
    if not frames:
        raise ValueError(f"No frames could be read from video: {movie_path}")

    frames = np.array(frames)  # Shape: (time, height_small, width_small)

    # Calculate standard deviation across time axis (axis=0)
    pixel_std = np.std(frames, axis=0)  # Shape: (height_small, width_small)

    # Flatten to a 1D vector
    motion_energy = pixel_std.flatten()

    return motion_energy


# # E.g. calculating motion_energy on the resampled movie for one movie
#
# filename = df['Filename'][0]
# movie_path = os.path.join(stimuli_dir, filename)
#
# new_size = (24, 20)  # (width, height)
# energy_vector = calculate_motion_energy(movie_path, downsample_size=new_size)
# print(energy_vector.shape)  # Should print (24 * 20,) = (480,)





In [None]:
# Compute the motion energy of every movie and collect the results in an
# n-by-b np.array, where n is the number of movies and b is the number of
# "megapixels" in the resampled frames.

# Downsample size as (width, height)
# new_size = (5, 4)  # FOR TESTING ONLY
new_size = (24, 20)

# One motion energy vector per movie, in the row order of df
motion_energy_list = [
    calculate_motion_energy(os.path.join(stimuli_dir, movie_name), downsample_size=new_size)
    for movie_name in df["Filename"]
]

# Rows = movies, columns = megapixels
motion_energy_matrix = np.vstack(motion_energy_list)

# Sanity check on the resulting shape
print(motion_energy_matrix.shape)


(56, 480)


In [None]:
# Add the motion energy columns to the df

# First, generate column names
num_megapixels = motion_energy_matrix.shape[1]
motion_energy_columns = [f"Motion_Energy_{i+1}" for i in range(num_megapixels)]

# The matrix was built by iterating over df["Filename"], so row i of the
# matrix belongs to row i (by position) of df. Fail loudly if that no longer holds.
if motion_energy_matrix.shape[0] != len(df):
    raise ValueError(
        f"motion_energy_matrix has {motion_energy_matrix.shape[0]} rows but df has {len(df)} rows"
    )

# Create a new DataFrame with the motion energy data.
# It reuses df's index because pd.concat(axis=1) aligns rows on index labels,
# not on position; a default RangeIndex here would misalign (and NaN-pad) the
# result whenever df's labels are not exactly 0..n-1.
motion_energy_df = pd.DataFrame(
    motion_energy_matrix,
    columns=motion_energy_columns,
    index=df.index,
)

# Concatenate the original df and the motion_energy_df horizontally
df_full = pd.concat([df, motion_energy_df], axis=1)

df_full.head()


Unnamed: 0,Filename,high_low,Emotion,Emotion_Type,Motion_Energy_1,Motion_Energy_2,Motion_Energy_3,Motion_Energy_4,Motion_Energy_5,Motion_Energy_6,...,Motion_Energy_471,Motion_Energy_472,Motion_Energy_473,Motion_Energy_474,Motion_Energy_475,Motion_Energy_476,Motion_Energy_477,Motion_Energy_478,Motion_Energy_479,Motion_Energy_480
0,AK_A_32.mp4,high,A,Anger,0.425145,0.0,0.0,0.499307,0.453518,0.0,...,3.315476,3.739065,10.512491,1.480249,1.789667,3.328401,22.444021,45.90775,3.022194,0.510282
1,AK_A_04.mp4,low,A,Anger,0.453518,0.0,0.0,0.497222,0.407682,0.0,...,1.560639,1.348283,3.831637,0.474415,0.800363,1.004147,15.033964,12.544988,0.387656,0.223297
2,AK_D_12.mp4,high,D,Disgust,0.306892,0.223297,0.0,0.407682,0.160073,0.223297,...,3.848589,2.443404,5.355764,0.425145,0.440347,0.521026,0.965112,12.256379,0.782425,0.364642
3,AK_D_02.mp4,low,D,Disgust,0.0,0.269657,0.160073,0.160073,0.0,0.338032,...,0.387656,0.499307,15.824698,0.493728,0.623301,0.547596,3.462102,7.347951,0.497222,0.160073
4,AK_F_24.mp4,high,F,Fear,0.0,0.269657,0.407682,0.160073,0.269657,0.0,...,4.715742,55.605954,5.638215,1.299904,5.190018,4.045448,12.102975,55.89504,60.073247,5.742211


In [None]:
# Write df_full to disk as CSV, leaving the row index out of the file
output_filename = "motion_energy_SUBSAMP.csv"
df_full.to_csv(path_or_buf=output_filename, index=False)

# Confirm where the data went
print(f"Saved motion energy data to {output_filename}")

Saved motion energy data to motion_energy_SUBSAMP.csv
