This notebook captures and saves frames that are associated with left or right turns in the DR(eye)VE dataset. Turns identified by the automated turn detection pipeline provide insight into approximately where a turn takes place.

### Code execution and intended outcome
The automated turn detection pipeline uses speed, relative car position (course), longitude, and latitude information to identify the frames likely associated with a left or right turn.

The final frames used for model training are extracted based on the Excel file located at: $\verb|../data/dreyeve/participant_driving_data.xlsx|$, which contains the manually annotated turns.

In [1]:
import os
import configparser
import sys

# Directory the notebook is executed from. Note that '__file__' is a string literal here (notebooks
# do not define __file__), so abspath() resolves it against the current working directory and
# dirname() strips it off again.
parent_directory = os.path.dirname(os.path.abspath('__file__'))

# Construct the path to the config.ini file in the parent directory
config_file_path = os.path.join(parent_directory, '..', 'config.ini')

# Load the configuration file. ConfigParser.read() silently skips files it cannot open and returns
# the list of files it actually parsed, so fail loudly here instead of with an opaque KeyError below.
config = configparser.ConfigParser()
if not config.read(config_file_path):
    raise FileNotFoundError(f"Could not read configuration file: {config_file_path}")

# Access configuration parameters
dreyeve_data_path = config['paths']['dreyeve_data_path']

# Make the repository root importable (needed for the `preprocessing` package imports).
# Guarded so that re-running the cell does not keep appending the same entry.
parent_dir = os.path.abspath(os.path.join(os.path.abspath(''), '..'))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

In [3]:
import datetime
import glob
import os
import pickle
import shutil
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from preprocessing.heatmap_generator import *
from preprocessing.process_driving_video import *
from IPython.display import Video

# Root directory of the DR(eye)VE data. The previous value, f'..data/dreyeve_data_path/', was an
# f-string without any placeholder, i.e. a literal path that ignored the `dreyeve_data_path`
# setting loaded from config.ini. Use the configured location instead.
# NOTE(review): assumes `dreyeve_data_path` in config.ini points directly at the data root - confirm.
dreyeve_data_dir = dreyeve_data_path
# '[!.txt]' is a glob character class: it keeps entries whose LAST character is not '.', 't' or 'x'
# (so *.txt and *.xlsx files are skipped); it is not a "does not end with .txt" test.
data_dir_list = sorted(glob.glob(os.path.join(dreyeve_data_dir, '*[!.txt]')))
# all extracted frames / derived images are written below this directory
result_output_path = 'real_motor_epochs/'

# set to True to run the (slow) frame extraction loop in the last cell
get_turn_frame = False

# Automated turn detection pipeline

In [2]:
def read_speedcourse_df(speedcourse_dir, et_dir, fps=25):

    """
    Load the driving data and eye tracking data of one run and flag preliminary turns.

    Args:
        speedcourse_dir (str): path to the tab-separated speed, course, and coordinate .txt file.
            Only its first five columns are used (frame, speed, course, lat, lon).
        et_dir (str): path to the whitespace-separated eye tracking .txt file (first line is the header)
        fps (float): frame rate used to convert the frame number into seconds (default 25)

    Return:
        speedcourse_df_sec (df): dataframe indexed by frame with all driving/car related data as well
        as the turn identified from that information (column `turn_dir`: 'Left', 'Right' or 'Straight').
        Only rows without missing values (i.e. with latitude and longitude) are kept (~ every 1 second).
        et_df (df): dataframe with the eye tracking data, returned as read from the file

    Note:
        np.gradient needs at least two rows with coordinates; shorter inputs raise a ValueError.
    """

    # load .txt files; sep=r'\s+' replaces the deprecated delim_whitespace=True
    speedcourse_df = pd.read_csv(speedcourse_dir, delimiter='\t', header=None)
    et_df = pd.read_csv(et_dir, sep=r'\s+', header=0)

    # keep the first five columns, name them and add time columns.
    # copy() so the following column assignments do not write into a slice of the raw frame
    speedcourse_df = speedcourse_df.iloc[:, 0:5].copy()
    speedcourse_df.columns = ['frame', 'speed', 'course', 'lat', 'lon']
    speedcourse_df['time_sec'] = speedcourse_df.frame / fps
    speedcourse_df['time_min'] = speedcourse_df.time_sec.apply(lambda x: str(datetime.timedelta(seconds=x)))
    speedcourse_df = speedcourse_df.set_index(['frame'])

    # drop rows without latitude or longitude information
    speedcourse_df_sec = speedcourse_df.dropna().copy()

    # change in latitude / longitude and the change of that change
    speedcourse_df_sec['diff_lat'] = np.gradient(speedcourse_df_sec['lat'])
    speedcourse_df_sec['diff_lat_delta'] = speedcourse_df_sec.diff_lat.diff()
    speedcourse_df_sec['diff_lon'] = np.gradient(speedcourse_df_sec['lon'])
    speedcourse_df_sec['diff_lon_delta'] = speedcourse_df_sec.diff_lon.diff()

    # change in course between consecutive rows
    course_delta = speedcourse_df_sec.course.diff()
    speedcourse_df_sec['course_delta'] = course_delta
    # wrap the course difference into [-180, 180]: a raw difference of 358 becomes -2 and
    # a raw difference of -358 becomes 2. NaN (first row) stays NaN.
    speedcourse_df_sec['course_delta_adj'] = np.select(
        [(course_delta > 180).to_numpy(), (course_delta < -180).to_numpy()],
        [(course_delta - 360).to_numpy(), (course_delta + 360).to_numpy()],
        default=course_delta.to_numpy(),
    )

    # Turn detection from speed and change in course. The car returns to its initial heading quicker
    # at high speed, so the detection is more sensitive (smaller course change required) at
    # speed >= 50 than below it. Below a speed of 20 nothing is classified as a turn.
    speed = speedcourse_df_sec['speed'].to_numpy()
    delta = speedcourse_df_sec['course_delta_adj'].to_numpy()
    high_speed = speed >= 50
    low_speed = speed < 50
    # first matching condition wins; rows matching none (including NaN deltas) are 'Straight'
    speedcourse_df_sec['turn_dir'] = np.select(
        [
            speed < 20,
            high_speed & (delta <= -4),
            high_speed & (delta >= 4),
            low_speed & (delta < -5),
            low_speed & (delta > 5),
        ],
        ['Straight', 'Left', 'Right', 'Left', 'Right'],
        default='Straight',
    )

    return speedcourse_df_sec, et_df

In [3]:
def find_turn_frame(speedcourse_df_sec, convert2etg = True, frame_offset = 15, min_frame_gap = 100,
                    src_fps = 25, etg_fps = 30):

    '''
    Reduce the preliminary per-row turn labels to one start frame per detected turn.

    Args:
        speedcourse_df_sec (df): dataframe indexed by frame number with the columns `turn_dir`,
            `diff_lon_delta` and `diff_lat_delta` (as produced by read_speedcourse_df).
            Side effect: a column `turn_dir_adj` is written for rows whose label was carried over
            from the previous row.
        convert2etg (bool): if True, rescale the frame numbers from `src_fps` to `etg_fps`
        frame_offset (int): number of frames subtracted from the turn row to capture the driving
            scene before the turn happens
        min_frame_gap (int): a turn is only kept if it starts at least this many frames after the
            previously kept turn
        src_fps (int): frame rate of the frame numbers in the dataframe index
        etg_fps (int): frame rate of the eye tracking video
    Returns:
        turn_frame_list (list): list of frames where the turn takes place
        turn_dir_list (list): list of turn actions corresponding to the turn_frame_list
    '''

    turn_frame_list = []
    turn_dir_list = []
    turn_dir_prev = None

    # snapshot the needed columns first: the loop adds a column to the dataframe, and a dataframe
    # should not be modified while it is being iterated
    rows = zip(speedcourse_df_sec.index.tolist(),
               speedcourse_df_sec['turn_dir'].tolist(),
               speedcourse_df_sec['diff_lon_delta'].tolist(),
               speedcourse_df_sec['diff_lat_delta'].tolist())

    for frame_no, turn_dir, diff_lon_delta, diff_lat_delta in rows:

        turn_dir_curr = turn_dir

        # a 'Straight' row whose longitude or latitude change is (almost) constant inherits the
        # label of the previous row. NaN deltas never satisfy the comparison.
        if turn_dir_curr == "Straight" and (abs(diff_lon_delta) < 1e-10 or abs(diff_lat_delta) < 1e-10):
            turn_dir_curr = turn_dir_prev
            speedcourse_df_sec.at[frame_no, 'turn_dir_adj'] = turn_dir_prev

        # drop all straight rows and keep only the first row of a run of consecutive rows with
        # the same turn action
        if turn_dir_curr != 'Straight' and turn_dir_curr != turn_dir_prev:
            # offset to capture the scene before the turn; clamped so that a turn within the
            # first `frame_offset` frames does not produce a negative frame index
            frame_saved = max(frame_no - frame_offset, 0)
            # eliminate frames close to each other
            if (not turn_frame_list) or (frame_saved - turn_frame_list[-1] >= min_frame_gap):
                turn_frame_list.append(frame_saved)
                turn_dir_list.append(turn_dir_curr)

        turn_dir_prev = turn_dir_curr

    # Frames for each turn come from speed_course_coord.txt (25 Hz, 7500 frames) while the eye
    # tracking coordinates come from etg_samples.txt (30 Hz, 9000 frames). To overlay eye tracking
    # information on the turn frames, the frame numbers are rescaled to the eye tracking rate.
    if convert2etg:
        turn_frame_list = [int(turn_frame * etg_fps / src_fps) for turn_frame in turn_frame_list]

    return turn_frame_list, turn_dir_list

In [4]:
def capture_turn_frame(sbj_data_dir, runid, turn_frame_list):

    '''
    Save the video frames at the given frame indices as .jpg images.

    The frames are read from `video_etg.avi` inside `sbj_data_dir` and written to
    `<result_output_path>/run_<runid>/saved_frames/frame_<index>.jpg`. The output directory is
    created even when the video cannot be opened.

    Args:
        sbj_data_dir (str): directory of the run
        runid (int): number of the run
        turn_frame_list (list): list of frames where the turn takes place

    '''

    save_frames_path = os.path.join(result_output_path,f'run_{runid}/saved_frames')
    Path(save_frames_path).mkdir(parents=True, exist_ok=True)

    speedcourse_vid = os.path.join(sbj_data_dir,'video_etg.avi')
    video = cv2.VideoCapture(speedcourse_vid)

    try:
        # nothing can be extracted if the video file was not opened successfully
        if not video.isOpened():
            print("Error opening video file")
            return

        for turn_frame_idx in turn_frame_list:
            # jump directly to the requested frame - avoids looping through the entire video
            video.set(cv2.CAP_PROP_POS_FRAMES, turn_frame_idx)
            ret, frame = video.read()
            if not ret:
                # report the failure and keep going so one unreadable frame does not
                # silently drop every remaining frame of the run
                print(f"Could not read frame {turn_frame_idx} from {speedcourse_vid}; skipping")
                continue
            cv2.imwrite(os.path.join(save_frames_path,f"frame_{turn_frame_idx}.jpg"), frame)
    finally:
        # always free the capture handle, also on errors
        video.release()

# Annotated frames extraction and processing

In [5]:
# A regular turn's premotor period is defined as 1 second before the turn action is executed.
# For turns that are executed when the car approaches a full stop before making the turn, the
# premotor period is longer than 1 second: the decision to turn is likely made when the participant
# decides to slow down the car to prepare for the turn.
# The code below identifies the premotor frame for turns made at low speed / approaching a stop:
# the last frame before the turn at which the speed was still at or above `speed_threshold`.

# file with the annotated turns
all_driving_data_path = os.path.join(dreyeve_data_dir,"run_driving_data.xlsx")

# With a threshold of 20 or 15, two consecutive "low speed" turns with different directions
# (original note: rows 3602 and 3602, or rows 4808 and 4810) conflict with each other and end up
# with the same premotor period, hence the lower threshold.
speed_threshold = 10
# load file with annotated turns
driving_data_df = pd.read_excel(all_driving_data_path)

start_idx = 0
premotor_rows = []  # one Series per low speed turn; combined once after the loop

# annotated turn frames at low speed
low_speed_frames = (pd.notna(driving_data_df['Direction'])) & (driving_data_df['speed'] < speed_threshold)
low_speed_frames_idx = list(driving_data_df[low_speed_frames].index)

for idx in low_speed_frames_idx:
    low_speed_row = driving_data_df.iloc[idx]
    low_speed_frame_no = low_speed_row['frame']
    low_speed_frame_runid = low_speed_row['run_id']

    # rows between the previously handled low speed turn and this one that are still at or above
    # the threshold; the last of them is the premotor frame
    window_df = driving_data_df.iloc[start_idx:idx]
    candidate_df = window_df[window_df.speed >= speed_threshold]
    if candidate_df.empty:
        # no preceding row at/above the threshold (previously an IndexError on .iloc[-1])
        print(f'Low speed turn frame detected in frame {low_speed_frame_no} for partcipant {low_speed_frame_runid}. Premotor period not available')
        continue

    # copy() so that adding the extra field does not write into a slice of driving_data_df
    premotor_frame = candidate_df.iloc[-1].copy()
    # the low speed frame associated with the premotor period frame
    premotor_frame['low_speed_frames'] = low_speed_frame_no
    premotor_frame_runid = premotor_frame['run_id']

    # to ensure the frames identified are for the same run
    if low_speed_frame_runid == premotor_frame_runid:
        start_idx = idx
        premotor_rows.append(premotor_frame)
    else:
        # e.g. "Low speed turn frame detected in frame 21 for partcipant 26. Premotor period not available"
        print(f'Low speed turn frame detected in frame {low_speed_frame_no} for partcipant {low_speed_frame_runid}. Premotor period not available')

if premotor_rows:
    # one row per low speed turn, columns = spreadsheet columns + 'low_speed_frames'
    premotor_frame_ls_df = pd.concat(premotor_rows, axis=1).T
else:
    # no low speed turn at all: keep an empty frame with the same columns so later cells still work
    premotor_frame_ls_df = driving_data_df.iloc[:0].copy()
    premotor_frame_ls_df['low_speed_frames'] = pd.Series(dtype=driving_data_df['frame'].dtype)

# positional column selection; per the original note these are frame, id, speed, time, and low
# speed frame - NOTE(review): depends on the column order of the spreadsheet, confirm.
premotor_frame_ls_df = premotor_frame_ls_df.iloc[:, [0,1,2,4,-1]]

In [6]:
# Find the annotated turn frames and their premotor frames for each run

def find_steering_frame(driving_data_df, premotor_frame_ls_df, convert2etg = True, n_runs = 74,
                        premotor_offset = 25, src_fps = 25, etg_fps = 30):

    '''
    Collect, for every run, the annotated turn frames and their premotor frames.

    Args:
        driving_data_df (df): the dataframe with annotated turns for each frame. Uses the columns
            `run_id`, `frame` and `Direction` (rows with a missing `Direction` are not turns)
        premotor_frame_ls_df (df): the premotor period dataframe for turns at low speed, with the
            columns `run_id`, `frame` (premotor frame) and `low_speed_frames` (the turn frame it
            belongs to). May be None or empty when there are no low speed turns.
        convert2etg (bool): if True, rescale the frame numbers from `src_fps` to `etg_fps`
        n_runs (int): runs 1..n_runs are processed (default 74)
        premotor_offset (int): number of frames before the turn frame used as premotor frame for
            turns that are not low speed turns (default 25)
        src_fps (int): frame rate of the annotated frame numbers
        etg_fps (int): frame rate of the eye tracking video

    Returns:
        all_turn_frame_list (dict): maps the run number to a tuple
            (premotor_frame_list, turn_frame_list, turn_dir_list, turn_dir_encoded_list),
            where the direction is encoded as 0 for "Left" and 1 for anything else.
            Runs without annotated turns map to four empty lists.
    '''

    has_low_speed_turns = premotor_frame_ls_df is not None and len(premotor_frame_ls_df) > 0

    all_turn_frame_list = {}

    for run in range(1, n_runs + 1):

        # use only the annotated turn rows that are specific to the run
        run_driving_data_df = driving_data_df[driving_data_df.run_id == run]
        run_turn_df = run_driving_data_df[pd.notna(run_driving_data_df['Direction'])]
        turn_frame_list = list(run_turn_df['frame'])
        turn_dir_list = list(run_turn_df['Direction'])
        turn_dir_encoded_list = [0 if steer == "Left" else 1 for steer in turn_dir_list]

        # low speed turn frame -> its premotor frame, for this run only
        # (first entry wins if a turn frame is listed more than once)
        low_speed_premotor = {}
        if has_low_speed_turns:
            run_premotor_frame_ls_df = premotor_frame_ls_df[premotor_frame_ls_df.run_id == run]
            for low_speed_frame, premotor_frame in zip(run_premotor_frame_ls_df.low_speed_frames,
                                                       run_premotor_frame_ls_df.frame):
                low_speed_premotor.setdefault(low_speed_frame, premotor_frame)

        # low speed turns use their dedicated premotor frame, all other turns use a fixed offset
        premotor_frame_list = [low_speed_premotor[frame] if frame in low_speed_premotor
                               else frame - premotor_offset
                               for frame in turn_frame_list]

        # turns within the first second have no full premotor period: clamp their premotor
        # frame to the first frame (the turn itself is kept)
        premotor_frame_list = [max(premotor_frame, 0) for premotor_frame in premotor_frame_list]

        # convert frame number to match eye tracking video that has sampling rate of 30Hz
        if convert2etg:
            turn_frame_list = [int(turn_frame * etg_fps / src_fps) for turn_frame in turn_frame_list]
            premotor_frame_list = [int(premotor_frame * etg_fps / src_fps) for premotor_frame in premotor_frame_list]

        # save turn frame related information to a dictionary by run number
        all_turn_frame_list[run] = (premotor_frame_list,
                                    turn_frame_list,
                                    turn_dir_list,
                                    turn_dir_encoded_list)

    return all_turn_frame_list
    

In [None]:
# premotor / turn frames, directions and encoded directions for every run, keyed by run number
all_steering_dict = find_steering_frame(driving_data_df, premotor_frame_ls_df, convert2etg=True)

# frame extraction is optional and only runs when get_turn_frame is switched on
if get_turn_frame:
    # runs are numbered from 1 in sorted directory order; the last two entries are left out
    for run_no, run_dir in enumerate(data_dir_list[:-2], start=1):

        print(f"Extracting run {run_no} motor frame ...")

        # per-run input files: car speed/course/coordinates, eye tracking samples, driving video
        speedcourse_dir = os.path.join(run_dir, 'speed_course_coord.txt')
        et_dir = os.path.join(run_dir, 'etg_samples.txt')
        speedcourse_vid = os.path.join(run_dir, 'video_etg.avi')

        # only the eye tracking dataframe is used further below
        speedcourse_df, et_df = read_speedcourse_df(speedcourse_dir, et_dir)

        # annotated frames and directions belonging to this run
        (premotor_frames_list,
         turn_frames_list,
         turn_dir_list,
         turn_dir_encoded_list) = all_steering_dict[run_no]

        # save the premotor frame of every annotated turn as an image
        capture_turn_frame(run_dir, run_no, premotor_frames_list)

        # process every turn of the run together with its associated information
        per_turn = zip(premotor_frames_list, turn_frames_list, turn_dir_list, turn_dir_encoded_list)
        for trial_no, (premotor_frame, turn_frame, steering_dir, steering_code) in enumerate(per_turn, start=1):
            frame_info = {
                "run": run_no,
                "trial": trial_no,
                "premotor_frame": premotor_frame,
                "turn_frame": turn_frame,
                "steering_dir": steering_dir,
                "steering_dir_encoded": steering_code,
            }
            # generate fixation map, edges, and original image ... etc
            process_driving_video(premotor_frame, et_df, frame_info,
                                  output_path=result_output_path,
                                  vid_path=speedcourse_vid,
                                  save_prepend=f'run_{run_no}/frame{premotor_frame}',
                                  show_inline_images=False)
