In [1]:
import sys
import os
BASE_PATH = './../dynaphos-core/'
sys.path.append(BASE_PATH)

import time

import cv2
import numpy as np
import torch

from dynaphos.image_processing import sobel_processor, canny_processor
from dynaphos.simulator import GaussianSimulator
from dynaphos.utils import load_params, load_coordinates_from_yaml, Map
from dynaphos.cortex_models import \
    get_visual_field_coordinates_from_cortex_full

from tqdm import tqdm
import matplotlib.pyplot as plt

import pandas as pd

In [2]:
params = load_params('../dynaphos-core/config/params.yaml')

In [3]:
EYE_FRAMERATE = 200

params['thresholding']['use_threshold'] = False
coordinates_cortex = load_coordinates_from_yaml(
    '../dynaphos-core/config/grid_coords_dipole_valid.yaml', n_coordinates=1000)
coordinates_cortex = Map(*coordinates_cortex)
coordinates_visual_field = get_visual_field_coordinates_from_cortex_full(
    params['cortex_model'], coordinates_cortex)
simulator = GaussianSimulator(params, coordinates_visual_field)
resolution = params['run']['resolution']
params['run']['fps'] = EYE_FRAMERATE
fps = params['run']['fps']

device='cuda:0'

In [4]:
def edge_detection(frame, method='sobel', sigma = 2, canny_threshold=200):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    ksize = int(sigma // 2 * 6 + 3) # (odd integer > 3 and > 3 * sigma) 
    blurred = cv2.GaussianBlur(gray, (ksize, ksize), sigma)
    if method == 'sobel':
        edges = sobel_processor(blurred)
    elif method == 'canny':
        edges = canny_processor(blurred, canny_threshold // 2,canny_threshold)
    else:
        raise NotImplementedError("choose method='sobel' or method='canny'")
    edges = (edges - edges.min()) / (edges.max()-edges.min()) # normalize to range [0, 1]
    return edges

def float2CV(img):
    """converting float img with values in range [0,1] to openCV uint8 format )"""
    return np.clip((255*img),0,255).astype('uint8')


In [15]:
def process_video(): 
    x_offset, y_offset = resolution[0]//2, resolution[1] //2
    padded = np.zeros((height + resolution[1], width + resolution[0]))
    
    video_frame_idx = 0
    gaze_frame_idx = 0


    # Video writers 
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    frame_writer = cv2.VideoWriter(os.path.join(save_dir,'orig.avi'), fourcc, EYE_FRAMERATE, (width, height), True)
    edge_writer = cv2.VideoWriter(os.path.join(save_dir,'edges.avi'), fourcc, EYE_FRAMERATE, (width, height), False)
    patch_writer = cv2.VideoWriter(os.path.join(save_dir,'edge_patches.avi'), fourcc, EYE_FRAMERATE, resolution, False)
    phs_writer = cv2.VideoWriter(os.path.join(save_dir,'phosphene_patches.avi'), fourcc, EYE_FRAMERATE, resolution, False)
    out_writer = cv2.VideoWriter(os.path.join(save_dir,'phosphenes.avi'), fourcc, EYE_FRAMERATE, (width, height), False)
    
    merged_writer = cv2.VideoWriter(os.path.join(save_dir,'merged.avi'), fourcc, EYE_FRAMERATE, (width*3, height), True)
    merged_patch_writer = cv2.VideoWriter(os.path.join(save_dir,'merged_patch.avi'), fourcc, EYE_FRAMERATE, (resolution[0]*2, resolution[1]), False)

    # Read video frame-by-frame
    ret, frame = cap.read()
    while(cap.isOpened() and ret): 
        
        frame = cv2.resize(frame, (width,height))

        # Do edge-detection and padding
        edges = edge_detection(frame)
        padded[y_offset:y_offset+height, 
               x_offset:x_offset+width] = edges



        # Read all gaze frames for this video frame
        while video_frame_lookup[gaze_frame_idx] == video_frame_idx:

            # Gaze-coordinates in original image
            x_pos = int(np.round(gaze_x[gaze_frame_idx] * width))
            y_pos = int(np.round(gaze_y[gaze_frame_idx] * height))

            # Draw circle for gaze location
            frame_with_gaze = cv2.circle(frame.copy(), (x_pos, y_pos), radius=x_offset, color=(0,0,150), thickness=2)
            edges_with_gaze = cv2.circle(float2CV(edges.copy()), (x_pos, y_pos), radius=x_offset, color=(255,255,255), thickness=2)

            # Gaze-coordinates in padded images
            x_pos = np.clip(x_pos, 0, width) + x_offset 
            y_pos = np.clip(y_pos, 0, height) + y_offset 

            # Extract patch from padded image (edges)
            patch = padded[y_pos-y_offset:y_pos+y_offset, # Square patch centered at x_pos, y_pos 
                           x_pos-x_offset:x_pos+x_offset]

            # Create phosphenes from input patch (edges)
            with torch.no_grad():
                act_mask = torch.from_numpy(patch).to(device)
                stim = simulator.sample_receptive_fields(act_mask) * 90e-6
                phs = simulator(stim).cpu().numpy()

            # Put the phosphenes in an output frame using the gaze-coordinates
            output = np.zeros_like(padded)
            output[y_pos-y_offset:y_pos+y_offset, 
                   x_pos-x_offset:x_pos+x_offset] = phs 
            
            output = output[y_offset:y_offset+height,x_offset:x_offset+width] # exclude the padding


            
            # Write frames to video files 
            frame_writer.write(frame_with_gaze)
            edge_writer.write(edges_with_gaze)
            patch_writer.write(float2CV(patch))
            phs_writer.write(float2CV(phs))
            out_writer.write(float2CV(output)) 
            
            # Write to merged video file
            edges_bgr = cv2.cvtColor(edges_with_gaze,cv2.COLOR_GRAY2BGR)
            output_bgr = cv2.cvtColor(float2CV(output),cv2.COLOR_GRAY2BGR)
            merged = np.concatenate([frame_with_gaze,edges_bgr,output_bgr], axis=1)
            merged_writer.write(merged)
            merged_patch_writer.write(np.concatenate([float2CV(patch),float2CV(phs)], axis=1))

            # load next gaze frame
            if gaze_frame_idx < len(gaze_data)-1:
                gaze_frame_idx += 1
            else:
                break

        # load next video frame
        ret, frame = cap.read()
        video_frame_idx +=1

    cap.release()
    edge_writer.release()
    patch_writer.release()
    phs_writer.release()
    out_writer.release()

In [None]:
for folder in ['ObjectGrabbing', 'ObjectViewing', 'Walking', 'Cycling']:
    # Data Directory
    data_dir = f'../../_Datasets/MobileEyeTracker/{folder}/'
    save_dir = f'./out/{folder}'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)


    # Load video data
    rescale = 0.8
    cap = cv2.VideoCapture(os.path.join(data_dir, 'world.mp4'))
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    width = int(np.round(width * rescale))
    height = int(np.round(height * rescale))
    


    # Load gaze data
    gaze_data = pd.read_csv(os.path.join(data_dir, 'gaze_positions.csv'))
    gaze_x, gaze_y = gaze_data.norm_pos_x, gaze_data.norm_pos_y
    gaze_y = 1-gaze_y # the coordinate origin is bottom left
    video_frame_lookup = gaze_data.world_index - gaze_data.world_index.min()
    
    simulator.reset()
    process_video()