In [1]:
import gymnasium as gym
import math
import os
from gym import Env
from gym import spaces
from gym.spaces import Box, Discrete
import numpy as np
from stable_baselines3.common.env_checker import check_env
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from gym.envs.classic_control import rendering
import pygame
from pygame.locals import *

In [2]:
def dist_calculator(point_a, point_b):
    """Return the Euclidean distance between two points.

    Parameters
    ----------
    point_a, point_b : array-like
        Coordinates of the two points. Any array-like (ndarray, list, tuple)
        is accepted; both must have compatible shapes for subtraction.

    Returns
    -------
    float
        The norm of ``point_a - point_b`` as computed by ``np.linalg.norm``.
    """
    # Coerce first so that plain lists/tuples can be subtracted element-wise.
    offset = np.asarray(point_a) - np.asarray(point_b)
    return np.linalg.norm(offset)

In [3]:

class GazeEnv(Env):
    """Gaze-based target selection environment.

    The agent repeatedly chooses a 2-D fixation point (an action in
    ``[-1, 1]^2``). The executed fixation is the action plus oculomotor noise
    whose scale grows with the length of the saccade. After every fixation that
    misses the target the agent receives a noisy observation of the target
    position (noise scale grows with the gaze-to-target distance) and fuses it
    with its previous belief. The value returned to the agent from ``reset`` and
    ``step`` is that belief about the target position.

    Parameters
    ----------
    width : float
        Target diameter; converted with ``math.radians`` on construction.
    distance : float
        Distance from the start position to the target; converted with
        ``math.radians`` on construction.
    ocular_noise : float
        Multiplier applied to the saccade length to get the oculomotor noise scale.
    spatial_noise : float
        Multiplier applied to the gaze-to-target distance to get the visual
        spatial noise scale.
    """

    # Size of the pygame window (pixels) used by ``render``.
    SCREEN_WIDTH = 1080
    SCREEN_HEIGHT = 720

    def __init__(self, width, distance, ocular_noise, spatial_noise):
        self.width = math.radians(width)  # Diameter of target in radians
        self.distance = math.radians(distance)  # Distance from start to target in radians
        self.ocular_noise = ocular_noise  # Oculomotor noise
        self.spatial_noise = spatial_noise  # Visual spatial noise
        self.theta = None  # Angle of target from start position
        self.target_pos = None  # Set by reset()
        self.current_pos = np.zeros(2)  # Current gaze position or fixation
        self.num_fixations = 0
        self.max_fixations = 10000  # Maximum number of saccades allowed

        # Attribute names keep the original spelling ("uncertainity") because
        # they are part of the object's visible state.
        self.observation = None
        self.observation_uncertainity = None
        self.belief = None
        self.belief_uncertainity = None

        self.action_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float64)  # Gaze fixation
        self.observation_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float64)  # Current gaze position
        self.state_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float64)  # Target location
        self.belief_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float64)
        self.reward_range = (-1, 0)  # Reward range

        # The pygame window is created lazily on the first render() call so that
        # constructing an environment (e.g. for training) does not open a window.
        self.screen = None
        self.clock = None

    def _ensure_display(self):
        """Create the pygame window if it does not exist or has been shut down."""
        if self.screen is None or not pygame.display.get_init():
            pygame.init()
            self.screen = pygame.display.set_mode((self.SCREEN_WIDTH, self.SCREEN_HEIGHT))
            self.clock = pygame.time.Clock()

    def reset(self):
        """Start a new episode and return the initial belief.

        The target is placed at ``self.distance`` from the origin at a uniformly
        random angle, the gaze is put back at the origin, and the initial belief
        is set equal to the first observation.
        """
        self.theta = np.random.uniform(low=0, high=2*np.pi)
        self.target_pos = np.array([self.distance * np.cos(self.theta), self.distance * np.sin(self.theta)])
        self.current_pos = np.zeros(2)
        self.num_fixations = 1  # 1 because gaze is fixated at [0,0] initially
        self.observation, self.observation_uncertainity = self.get_observation()
        # The initial belief is simply the first observation.
        self.belief, self.belief_uncertainity = self.observation, self.observation_uncertainity

        return self.belief

    def step(self, action):
        """Execute one saccade towards ``action``.

        Returns
        -------
        tuple
            ``(belief, reward, done, info)``. ``reward`` is 0 when the gaze lands
            within half the target width of the target and -1 otherwise. ``done``
            is True on a hit or once ``num_fixations`` exceeds ``max_fixations``.
            ``info`` holds the target position, gaze position, belief, the action
            as passed in, and the fixation count.

        Raises
        ------
        RuntimeError
            If called before ``reset()``.
        """
        if self.target_pos is None:
            raise RuntimeError("reset() must be called before step()")

        # Accept any array-like action; the caller's object is reported back unchanged.
        aim_point = np.asarray(action, dtype=np.float64)

        saccade_movement = dist_calculator(self.current_pos, aim_point)
        # Oculomotor noise: zero-mean Gaussian whose scale grows with saccade length.
        motor_noise = np.random.normal(0, self.ocular_noise * saccade_movement, aim_point.shape)
        self.current_pos = np.clip(aim_point + motor_noise, -1, 1)

        self.num_fixations += 1

        # Check the distance between gaze & target.
        dist_btw_gaze_and_target = dist_calculator(self.current_pos, self.target_pos)

        if dist_btw_gaze_and_target < self.width/2:
            # Gaze is inside the target region (closer than half the target width).
            reward = 0
            done = True
        else:
            reward = -1
            done = False
            self.observation, self.observation_uncertainity = self.get_observation()
            self.belief, self.belief_uncertainity = self.get_belief()

        if self.num_fixations > self.max_fixations:
            done = True

        # dict to store other essential values.
        addon_dict = {'target_pos': self.target_pos,
                      'current_pos': self.current_pos,
                      'belief': self.belief,
                      'action': action,
                      'num_fixation': self.num_fixations}

        return self.belief, reward, done, addon_dict

    def get_observation(self):
        """Return a noisy observation of the target and its uncertainty.

        The uncertainty is the gaze-to-target distance (eccentricity); the
        observation is the target position plus Gaussian noise with scale
        ``spatial_noise * eccentricity``, clipped to ``[-1, 1]``.
        """
        gaze_displacement = dist_calculator(self.target_pos, self.current_pos)  # gaze eccentricity
        observation_uncertainty = gaze_displacement
        # Visual spatial noise scales with the gaze-to-target eccentricity.
        spatial_noise = np.random.normal(0, self.spatial_noise * gaze_displacement, self.target_pos.shape)
        observation = np.clip(self.target_pos + spatial_noise, -1, 1)

        return observation, observation_uncertainty

    def get_belief(self):
        """Fuse the latest observation with the previous belief.

        Both uncertainties are treated as standard deviations. The estimates are
        combined by inverse-variance weighting, so the estimate with the smaller
        uncertainty receives the larger weight, and the returned uncertainty is
        the standard deviation of the fused estimate.

        Returns
        -------
        tuple
            ``(new_belief, new_belief_uncertainty)``.
        """
        new_observation, obs_sigma = self.observation, self.observation_uncertainity
        prev_belief, belief_sigma = self.belief, self.belief_uncertainity

        obs_var = obs_sigma ** 2
        belief_var = belief_sigma ** 2
        total_var = obs_var + belief_var
        if total_var == 0:
            # Both estimates claim zero uncertainty; avoid 0/0 and trust the newest one.
            return new_observation, 0.0

        # Weight on the observation grows with the uncertainty of the previous belief.
        weight_obs = belief_var / total_var
        weight_belief = obs_var / total_var
        new_belief = weight_belief * prev_belief + weight_obs * new_observation
        # Square root keeps the stored value a standard deviation, since it is
        # squared again on the next update.
        new_belief_uncertainity = np.sqrt(belief_var * obs_var / total_var)

        return new_belief, new_belief_uncertainity

    def render(self, mode='human'):
        """Draw the target (red) and the current gaze position (blue).

        Returns the drawing parameters and positions: radius, scale, gaze and
        target positions in pixels, the individual target and gaze coordinates,
        and the target and gaze position arrays.
        """
        self._ensure_display()

        # Fill the screen with white
        self.screen.fill((255, 255, 255))
        screen_width = self.SCREEN_WIDTH
        screen_height = self.SCREEN_HEIGHT

        world_width = 2  # To be consistent with (x,y) coordinates of agent's position
        # The same pixels-per-unit scale is used on both axes.
        scale = screen_width/world_width
        radius = int(self.width*scale/2)
        target_pos_pix = np.array([int(self.target_pos[0]*scale+screen_width/2), int(self.target_pos[1]*scale+screen_height/2)])
        current_pos_pix = np.array([int(self.current_pos[0]*scale+screen_width/2), int(self.current_pos[1]*scale+screen_height/2)])

        # To nullify pygame not responding problem.
        pygame.event.get()

        # Draw target position
        pygame.draw.circle(self.screen, (255, 0, 0), target_pos_pix, radius)

        # Draw gaze position
        pygame.draw.circle(self.screen, (0, 0, 255), current_pos_pix, 10)

        # Update the display
        pygame.display.update()

        return radius, scale, current_pos_pix, target_pos_pix, self.target_pos[0], self.target_pos[1], self.current_pos[0], self.current_pos[1], self.target_pos, self.current_pos

    def close(self):
        """Close the pygame window, if one was opened; render() will reopen it on demand."""
        if self.screen is not None:
            if pygame.display.get_init():
                pygame.display.quit()
            self.screen = None
            self.clock = None

In [4]:
def train(w, d, ocular_noise, spatial_noise, run,
          total_timesteps=400000,
          log_path=r"D:\Research-Project\infotech2023_jayakumar\log-directory",
          model_name='PPO_Gaze_Model_4L_trained_all'):
    """Train a fresh PPO agent on one GazeEnv configuration and save it.

    Parameters
    ----------
    w, d : float
        Target width and start-to-target distance passed to ``GazeEnv``.
    ocular_noise, spatial_noise : float
        Noise parameters passed to ``GazeEnv``.
    run : int
        Run counter; only printed for progress reporting.
    total_timesteps : int, optional
        Number of environment steps to train for.
    log_path : str, optional
        TensorBoard log directory handed to PPO.
    model_name : str, optional
        File name (inside ``saved-models``) the trained model is saved under.

    Returns
    -------
    PPO
        The trained model.
    """
    env = GazeEnv(w, d, ocular_noise, spatial_noise)
    model = PPO('MlpPolicy', env, verbose=1, tensorboard_log=log_path)
    model.learn(total_timesteps=total_timesteps)
    print('run', run)
    # NOTE(review): with the default model_name every call writes to the same
    # path, so a sweep keeps only the last run's model. Pass a distinct
    # model_name per run if each model should be kept — confirm intent.
    model_path = os.path.join('saved-models', model_name)
    model.save(model_path)
    return model

In [5]:
# Environment instance used by the interactive cells that follow.
# Arguments are (width, distance, ocular_noise, spatial_noise).
# NOTE(review): ocular_noise is 0.08 here but 0.07 in the training-sweep cell — confirm this is intended.
env = GazeEnv(1.5, 10, 0.08, 0.09) #width and distance in degrees of visual angle as mentioned in our paper.

In [30]:
# Parameter sweep for training: every target width is combined with every distance.
width = np.array([5.0, 4.0, 3.0, 2.0, 1.5, 1.0])
distance = np.array([5, 10])

# Noise levels shared by all runs of the sweep.
ocular_noise = 0.07
spatial_noise = 0.09

In [None]:
# Train one model per (width, distance) combination; `run` counts from 1.
run = 0
param_grid = [(w, d) for w in width for d in distance]
for run, (w, d) in enumerate(param_grid, start=1):
    train(w, d, ocular_noise, spatial_noise, run)

In [None]:
# Run the Stable-Baselines3 environment checker on `env`, with warnings enabled.
check_env(env, warn=True)

In [None]:
# Roll out a random policy for a fixed number of episodes, rendering every step.
episodes = 20
for episode in range(1, episodes+1):
    state = env.reset()
    score = 0
    done = False
    while True:
        env.render()
        action = env.action_space.sample()  # random action drawn from the action space
        n_state, reward, done, info = env.step(action)
        score += reward
        if done:
            break
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

In [7]:
# Shut down pygame, which closes the rendering window.
pygame.quit()

In [None]:
# Inspect the info dict returned by the most recent env.step() call.
print(info)

Training the Model

In [None]:
# Train a PPO agent with an MLP policy on `env`, logging to TensorBoard.
log_path = r"D:\Research-Project\infotech2023_jayakumar\log-directory"
model = PPO(
    policy='MlpPolicy',
    env=env,
    verbose=1,
    tensorboard_log=log_path,
)
model.learn(total_timesteps=5_000_000)

Save the trained model.

In [63]:
# Save the current `model` under the relative `saved-models` directory.
# NOTE(review): the file name says "10ksteps" while the training cell uses
# total_timesteps=5000000 — confirm the name matches the model being saved.
model_path = os.path.join('saved-models', 'PPO_Gaze_Model_50L_trained_1w_10d_10ksteps')
model.save(model_path)

In [None]:
# Evaluate `model` on `env` for 10 episodes, rendering while it runs.
evaluate_policy(model, env, n_eval_episodes=10, render=True)

In [None]:
# Load a previously trained PPO model and attach the current environment to it.
# Raw string: the Windows path contains backslashes (\R, \i, \s, \P) that are
# not valid escape sequences and must be kept literally.
model = PPO.load(r'D:\Research-Project\infotech2023_jayakumar\saved-models\PPO_Gaze_Model_50L_trained.zip', env=env)

In [None]:
# Roll out the loaded/trained policy for a fixed number of episodes, rendering every step.
episodes = 10
for episode in range(1, episodes+1):
    obs = env.reset()
    score = 0
    done = False

    while True:
        env.render()
        # predict() returns (action, state); the second value is not needed here.
        action, no_use = model.predict(obs)
        obs, reward, done, info = env.step(action)
        score += reward
        if done:
            break
    print('Episode:{} Score:{}'.format(episode, score))
env.close()