In [1]:
import pandas as pd
import numpy as np
import json
import scipy.signal
import matplotlib.pyplot as plt

def vel_filter(ang_vel_all):
    """
    Smooth a 1-D signal with a zero-phase low-pass Butterworth filter.

    A 2nd-order Butterworth low-pass with normalized cutoff 0.1 (fraction of
    the Nyquist frequency) is designed and applied forward and backward with
    ``scipy.signal.filtfilt``.

    Args:
        ang_vel_all: array-like, the samples to smooth.

    Returns:
        np.ndarray: the filtered signal, same shape as the input.
    """
    numerator, denominator = scipy.signal.butter(2, 0.1, btype='lowpass')
    smoothed = scipy.signal.filtfilt(numerator, denominator, ang_vel_all)
    return smoothed

# Load the gaze recording (comma-separated) into a DataFrame.
# The path is relative to the notebook's working directory.
data = pd.read_csv('GazeDataEval/gaze_data_eval_video_testing.csv', sep=',')

# Load the JSON file describing the trials; process_segments() below reads
# 'trial_id', 'video_path' and 'initial_crosshair_placement' from each entry.
with open("eval_trial_videos.json", "r") as f:
    trial_details = json.load(f)

# Process segment JSON data
def process_segments(trial_details):
    """
    Flatten the trial-detail records into a DataFrame with one row per trial.

    Args:
        trial_details: iterable of dicts, each with the keys 'trial_id',
            'video_path' and 'initial_crosshair_placement' (itself a dict
            with 'x' and 'y').

    Returns:
        pd.DataFrame: columns 'trial_id', 'video_path',
        'initial_crosshair_x', 'initial_crosshair_y'.

    Raises:
        KeyError: if an entry lacks one of the keys listed above.
    """
    rows = [
        {
            'trial_id': entry['trial_id'],
            'video_path': entry['video_path'],
            'initial_crosshair_x': entry['initial_crosshair_placement']['x'],
            'initial_crosshair_y': entry['initial_crosshair_placement']['y'],
        }
        for entry in trial_details
    ]
    return pd.DataFrame(rows)

# Build the per-trial metadata table from the loaded JSON trial details.
segment_df = process_segments(trial_details)

# Add columns if needed
def add_video_columns(df):
    """
    Return a copy of ``df`` with an extra 'associated_video' column.

    The column is produced by left-joining ``df`` with its own
    ('trial_id', 'video_path') pairs on 'trial_id', with 'video_path'
    renamed to 'associated_video'. The input frame is not modified.

    NOTE(review): because this is a self-join, repeated 'trial_id' values
    would multiply rows -- presumably trial ids are unique; verify.

    Args:
        df: pd.DataFrame with at least 'trial_id' and 'video_path' columns.

    Returns:
        pd.DataFrame: the merged frame, 'associated_video' appended last.
    """
    video_lookup = df[['trial_id', 'video_path']]
    video_lookup = video_lookup.rename(columns={'video_path': 'associated_video'})
    return df.merge(video_lookup, how='left', on='trial_id')

# Append 'associated_video' to the trial table. Not idempotent: re-running this
# cell on the already-augmented frame makes the merge see 'associated_video' on
# both sides and emit suffixed 'associated_video_x' / 'associated_video_y'.
segment_df = add_video_columns(segment_df)

# Map TrialID to segments in CSV data
def map_camera_motion_and_kick_force(data_df, segment_df):
    """
    Attach per-trial metadata to every gaze sample.

    Left-joins ``data_df`` (key 'TrialID') with the 'trial_id',
    'initial_crosshair_x', 'initial_crosshair_y' and 'video_path' columns of
    ``segment_df`` (key 'trial_id'). Samples whose TrialID has no match keep
    their row and get NaN in the added columns. Despite the function's name,
    no camera-motion or kick-force columns are involved here.

    Args:
        data_df: pd.DataFrame of gaze samples with a 'TrialID' column.
        segment_df: pd.DataFrame of trial metadata (see process_segments).

    Returns:
        pd.DataFrame: the merged frame (contains both 'TrialID' and 'trial_id').
    """
    metadata_columns = ['trial_id', 'initial_crosshair_x', 'initial_crosshair_y', 'video_path']
    trial_metadata = segment_df[metadata_columns]
    return data_df.merge(trial_metadata, how='left', left_on='TrialID', right_on='trial_id')

# Join each gaze sample with its trial's crosshair position and video path
# (matched on TrialID == trial_id).
data = map_camera_motion_and_kick_force(data, segment_df)

# Screen parameters
# screen_width = 2560
# screen_height = 1440
# screen_diagonal_size = 27.5  # inches
# distance_to_screen = 0.5842  # meters

# Resolution (pixels) that gaze coordinates and segmentation points are mapped onto.
screen_width = 3840
screen_height = 2160

# FILTERING FOR VALID TRIALS
# Keep only rows from the "TRIAL" section of trial 1. The explicit .copy()
# makes `data` an independent frame, so the column assignments below write to
# `data` itself instead of to a slice of the merged frame (which triggers
# pandas' SettingWithCopyWarning and leaves the write target ambiguous).
valid_rows = (data['TrialSection'] == "TRIAL") & (data['TrialID'] == 1)
data = data[valid_rows].copy()

# Process data
# Time elapsed since the earliest sample within each TrialNumber group.
data['RelativeTime'] = data.groupby('TrialNumber')['Time'].transform(lambda x: x - x.min())
# Scale gaze coordinates to screen pixels.
# NOTE(review): assumes BestPogX/BestPogY are normalized to [0, 1] -- confirm against the recorder.
data['MappedPogX'] = data['BestPogX'] * screen_width
data['MappedPogY'] = data['BestPogY'] * screen_height

# Apply filter to each trial
def apply_filter_per_trial(df, column_name, new_column_name):
    """
    Low-pass filter one column independently within each trial.

    Rows are grouped by 'TrialNumber'; within each group, missing values in
    ``column_name`` are replaced by 0 and the result is passed through
    ``vel_filter``. The output is stored in ``new_column_name``.

    Args:
        df: pd.DataFrame with a 'TrialNumber' column and ``column_name``.
            It is modified in place.
        column_name: str, the column to filter.
        new_column_name: str, the column that receives the filtered values.

    Returns:
        pd.DataFrame: the same ``df`` object, with the new column added.
    """
    def _smooth(trial_samples):
        return vel_filter(trial_samples.fillna(0))

    per_trial = df.groupby('TrialNumber')[column_name]
    df[new_column_name] = per_trial.transform(_smooth)
    return df

# Add FilteredPogX / FilteredPogY: low-pass filtered versions of the mapped
# gaze coordinates, filtered separately within each TrialNumber.
data = apply_filter_per_trial(data, 'MappedPogX', 'FilteredPogX')
data = apply_filter_per_trial(data, 'MappedPogY', 'FilteredPogY')

# Discard trials with high angular error
def discard_trials_with_high_angular_error(df, angular_error_threshold):
    """
    Drop every trial that contains at least one sample above the threshold.

    A trial (identified by 'TrialNumber') is removed entirely as soon as any
    of its rows has 'AngularError' strictly greater than
    ``angular_error_threshold``.

    Args:
        df: pd.DataFrame with 'TrialNumber' and 'AngularError' columns.
        angular_error_threshold: float, largest acceptable angular error.

    Returns:
        pd.DataFrame: the rows of the remaining trials (original index kept).
    """
    exceeds_threshold = df['AngularError'] > angular_error_threshold
    rejected_trials = df.loc[exceeds_threshold, 'TrialNumber'].unique()
    keep_row = ~df['TrialNumber'].isin(rejected_trials)
    return df[keep_row]

# angular_error_threshold = 30.0
# data = discard_trials_with_high_angular_error(data, angular_error_threshold)


Helper functions


In [2]:
import os
import cv2
import json
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon
from matplotlib.animation import FFMpegWriter

def remap_filled_segmentation(object_points, video_width, video_height, screen_width, screen_height):
    """
    Remap segmentation points from video pixel coordinates to screen coordinates.

    The two leading columns are swapped while scaling: input column 0 is
    divided by ``video_height`` and becomes output column 1 (scaled by
    ``screen_height``), input column 1 is divided by ``video_width`` and
    becomes output column 0 (scaled by ``screen_width``). In other words the
    input is read as (row, col) / (y, x) and the output is (x, y).

    The computation is done in floating point so integer inputs are not
    truncated when the scaled values are stored. The input is never modified.

    Args:
        object_points: array-like of shape (N, 2) (or wider; extra columns are
            passed through unchanged), the points of the object segmentation.
            An empty input is accepted.
        video_width: int, width of the video.
        video_height: int, height of the video.
        screen_width: int, width of the screen.
        screen_height: int, height of the screen.

    Returns:
        np.ndarray: float array of remapped points; shape (0, 2) for empty input.
    """
    points = np.asarray(object_points, dtype=float)

    # An empty point list arrives as a 1-D array, which cannot be indexed by
    # column; return an empty (0, 2) array so callers can treat it uniformly.
    if points.size == 0:
        return np.empty((0, 2), dtype=float)

    # Work on a copy: np.asarray returns the caller's own array when it is
    # already float, and that array must stay untouched.
    remapped_points = points.copy()
    remapped_points[:, 1] = points[:, 0] / video_height * screen_height
    remapped_points[:, 0] = points[:, 1] / video_width * screen_width

    return remapped_points


def get_video_dimensions(mask_directory, default_size=(512, 320)):
    """
    Detect video dimensions from the first readable mask image in a directory.

    Files are visited in sorted name order so the result does not depend on
    the order in which the filesystem lists them. Files that do not end in
    ".png", and PNG files that OpenCV cannot decode, are skipped.

    Args:
        mask_directory: str, path to the directory containing mask images.
        default_size: tuple (width, height) returned when no readable mask is
            found. Defaults to (512, 320).

    Returns:
        tuple: (video_width, video_height)

    Raises:
        OSError: propagated from os.listdir if the directory cannot be listed.
    """
    for file_name in sorted(os.listdir(mask_directory)):
        if not file_name.endswith(".png"):
            continue
        mask_path = os.path.join(mask_directory, file_name)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            # cv2.imread reports an unreadable/corrupt file by returning None
            # rather than raising; try the next mask instead of crashing.
            continue
        return mask.shape[1], mask.shape[0]  # (width, height)
    return tuple(default_size)  # No readable mask found

def calculate_centroid(points):
    """
    Compute the mean position of a set of 2-D points.

    Args:
        points: np.array of shape (N, 2) holding (x, y) coordinates.

    Returns:
        tuple: (centroid_x, centroid_y), or None when ``points`` is empty.
    """
    if len(points) > 0:
        xs = points[:, 0]
        ys = points[:, 1]
        return np.mean(xs), np.mean(ys)
    return None


def calculate_velocity(centroid1, centroid2, time_delta, screen_width, screen_height, distance_from_screen,
                       physical_width=None, physical_height=None):
    """
    Calculate the velocity (on-screen and angular) of an object.

    The on-screen velocity is the Euclidean distance between the two
    centroids, with x normalized by ``screen_width`` and y by
    ``screen_height``, divided by ``time_delta`` (screen fractions per time
    unit).

    Angular velocity has two modes:

    * With ``physical_width`` and ``physical_height`` (the screen's physical
      size, in the same length unit as ``distance_from_screen``): the
      displacement is converted to that length unit, turned into a visual
      angle ``2 * atan(d / (2 * distance))`` and divided by ``time_delta``,
      giving degrees per time unit.
    * Without them (legacy behaviour, kept for existing callers):
      ``degrees(atan(on_screen_velocity / distance_from_screen))``. This
      mixes normalized screen units with a physical distance and can never
      exceed 90, so treat it as a monotonic score, not a physical angular
      velocity.

    Args:
        centroid1: tuple, previous centroid (x, y), or None.
        centroid2: tuple, current centroid (x, y), or None.
        time_delta: float, time difference between frames.
        screen_width: int, width of the screen (pixels).
        screen_height: int, height of the screen (pixels).
        distance_from_screen: float, distance from the screen (meters).
        physical_width: float, optional physical screen width.
        physical_height: float, optional physical screen height.

    Returns:
        tuple: (on_screen_velocity, angular_velocity); (0.0, 0.0) when either
        centroid is None or ``time_delta`` is 0.

    Raises:
        ValueError: if only one of ``physical_width`` / ``physical_height``
            is supplied.
    """
    if (physical_width is None) != (physical_height is None):
        raise ValueError("physical_width and physical_height must be given together")

    if centroid1 is None or centroid2 is None or time_delta == 0:
        return 0.0, 0.0

    dx = (centroid2[0] - centroid1[0]) / screen_width
    dy = (centroid2[1] - centroid1[1]) / screen_height

    # On-screen velocity in normalized units
    on_screen_velocity = np.sqrt(dx**2 + dy**2) / time_delta

    if physical_width is None:
        # Legacy value: arctan of (normalized speed / viewing distance).
        angular_velocity_degrees = np.degrees(np.arctan(on_screen_velocity / distance_from_screen))
    else:
        # Physical displacement on the screen surface, then the visual angle it
        # subtends at the viewing distance, then the rate of change.
        displacement = np.hypot(dx * physical_width, dy * physical_height)
        visual_angle_radians = 2.0 * np.arctan(displacement / (2.0 * distance_from_screen))
        angular_velocity_degrees = np.degrees(visual_angle_radians) / time_delta

    return on_screen_velocity, angular_velocity_degrees


Params


In [3]:
# File the annotated video is written to by the rendering cell.
output_video_path = "fps_clip_1_clip_annotated_video.mp4"
# Directory scanned for .png masks (to detect the video size) and for the
# per-frame "frame_<n>_points.json" files read by the rendering cell.
mask_directory = "fps_clip_1_clip_masks"
# Directory that receives one PNG per rendered frame.
frame_output_directory = "fps_clip_1_clip_frames_output"
# Key (as a string) of the tracked object inside each frame's points JSON.
object_id = 8
distance_from_screen = 0.5  # Distance from screen in meters

# Detect video dimensions from masks
video_width, video_height = get_video_dimensions(mask_directory)

In [4]:
import os
import cv2
import json
import numpy as np

# Ensure the output directory for frames exists
os.makedirs(frame_output_directory, exist_ok=True)

# Prepare video writer
fps = 8
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (screen_width, screen_height))

# Variables to track previous centroid and time
previous_centroid = None
previous_time = None

# Rows without a frame number cannot be matched to a mask file, and NaN never
# compares equal to itself, so selecting them below would yield an empty frame.
unique_frames = data['VideoFrameNumber'].dropna().unique()

# try/finally guarantees the writer is released (and the output file closed)
# even if a frame fails part-way through the loop.
try:
    for frame_idx in unique_frames:
        # Match gaze data for the current frame
        frame_data = data[data['VideoFrameNumber'] == frame_idx]
        time_tick = frame_data['TimeTick'].iloc[0]

        gaze_points = frame_data[['MappedPogX', 'MappedPogY']].to_numpy()

        # Load mask points for the object
        object_points = None
        mask_file = os.path.join(mask_directory, f"frame_{frame_idx}_points.json")
        if os.path.exists(mask_file):
            with open(mask_file, 'r') as f:
                points_data = json.load(f)

            if str(object_id) in points_data:
                object_points = np.array(points_data[str(object_id)]['points'])
                # An empty point list becomes a 1-D array that cannot be indexed
                # by column; treat it like "object not present in this frame".
                if object_points.ndim != 2 or len(object_points) == 0:
                    object_points = None

        # Draw annotations on a black background
        frame = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
        if object_points is not None:
            remapped_object_points = remap_filled_segmentation(object_points, video_width, video_height, screen_width, screen_height)
            for point in remapped_object_points:
                cv2.circle(frame, (int(point[0]), int(point[1])), radius=2, color=(255, 255, 255), thickness=-1)

            # Calculate centroid
            centroid = calculate_centroid(remapped_object_points)
            if centroid is not None:
                cv2.circle(frame, (int(centroid[0]), int(centroid[1])), radius=5, color=(0, 255, 0), thickness=-1)

                # Calculate velocity if we have a previous centroid
                if previous_centroid is not None and previous_time is not None:
                    # NOTE(review): assumes TimeTick is in microseconds -- confirm against the recorder.
                    time_delta = (time_tick - previous_time) / 1e6  # Convert TimeTick to seconds
                    on_screen_velocity, angular_velocity = calculate_velocity(
                        previous_centroid, centroid, time_delta, screen_width, screen_height, distance_from_screen
                    )
                    print(f"Frame {frame_idx}: On-Screen Velocity: {on_screen_velocity:.2f}, Angular Velocity: {angular_velocity:.2f} degrees/sec")

                # Update previous centroid and time
                previous_centroid = centroid
                previous_time = time_tick

        for gaze_point in gaze_points:
            # int() raises on NaN/inf, so skip gaze samples without a finite position.
            if not np.isfinite(np.asarray(gaze_point, dtype=float)).all():
                continue
            gaze_x, gaze_y = int(gaze_point[0]), int(gaze_point[1])
            cv2.circle(frame, (gaze_x, gaze_y), radius=5, color=(0, 0, 255), thickness=-1)

        # Write annotated frame to video
        video_writer.write(frame)

        # Save each frame as an image
        frame_filename = os.path.join(frame_output_directory, f"frame_{frame_idx}.png")
        cv2.imwrite(frame_filename, frame)

        print(f"Processed frame {frame_idx}", end='\r')
finally:
    video_writer.release()

print("Finished creating velocity-annotated video.")


Frame 1: On-Screen Velocity: 0.53, Angular Velocity: 46.44 degrees/sec
Frame 2: On-Screen Velocity: 0.20, Angular Velocity: 21.38 degrees/sec
Frame 3: On-Screen Velocity: 0.15, Angular Velocity: 16.88 degrees/sec
Frame 4: On-Screen Velocity: 0.09, Angular Velocity: 10.10 degrees/sec
Frame 5: On-Screen Velocity: 0.21, Angular Velocity: 22.52 degrees/sec
Frame 6: On-Screen Velocity: 0.00, Angular Velocity: 0.01 degrees/sec
Frame 7: On-Screen Velocity: 0.18, Angular Velocity: 19.94 degrees/sec
Frame 8: On-Screen Velocity: 0.07, Angular Velocity: 7.97 degrees/sec
Frame 9: On-Screen Velocity: 0.00, Angular Velocity: 0.02 degrees/sec
Frame 10: On-Screen Velocity: 0.10, Angular Velocity: 11.32 degrees/sec
Frame 11: On-Screen Velocity: 0.07, Angular Velocity: 7.58 degrees/sec
Frame 12: On-Screen Velocity: 0.03, Angular Velocity: 3.67 degrees/sec
Frame 13: On-Screen Velocity: 0.09, Angular Velocity: 10.41 degrees/sec
Frame 14: On-Screen Velocity: 0.06, Angular Velocity: 6.63 degrees/sec
Frame 1