In [1]:
import logging 
import numpy as np 
import time

import gym
from gym import spaces

from ipynb.fs.full.preprocessing  import Detector, Transformer, Calibrator
'''from ipynb.fs.full.camera import CameraClient
from ipynb.fs.full.axes import AcsServiceClient
from ipynb.fs.full.scanner import ScannerClient'''

import matplotlib.pyplot as plt
import scipy
from skimage.measure import label, regionprops
import skimage.filters as skfilt
import random

logging.getLogger().setLevel(logging.INFO)

In [4]:
class AxesReachEnv(gym.GoalEnv):   
   
    def __init__(self, distance_threshold, img_size, axes_range, dot_detection_threshold, max_episode_steps, 
                 transform_matrix, init_mirr_pos, training=True, reward_type='shaped', move_velocity=200):
        
        self.img_size = img_size
        self.axes_range = axes_range
        self.training = training
        self.reward_type = reward_type
        self.distance_threshold = distance_threshold 
        self.dot_detection_threshold = dot_detection_threshold
        self.max_episode_steps = max_episode_steps 
        self.move_velocity = move_velocity
        self.transform_matrix = transform_matrix
        self.init_mirr_pos = init_mirr_pos
        
        logging.info("Hardware initialization")
       
        # -- establish connection to hardware devices
        self.scanner = ScannerClient()
        self.cam = CameraClient()
        self.axes = AcsServiceClient()
        
        # -- initialize the transformer 
        self.transformer = Transformer(self.transform_matrix)
        
        # -- initialize the spaces
        self.action_space = spaces.Box(-1., 1., shape = (2,), dtype=np.float32) # stage_x, stage_y (absolute movements)           
        self.observation_space = spaces.Dict({
             'observation': spaces.Box(-1., 1., shape=(2,),  dtype='float32'), # current stage_x and stage_y positions 
             'achieved_goal': spaces.Box(-1., 1., shape=(2,), dtype='float32'), # the dot where the laser beam is pointing now
             'desired_goal': spaces.Box(-1., 1., shape=(2,), dtype='float32')  # current black dot position on the image
            })
        
        # scanner move to the initial static position
        
        self.scanner.jump(np.array(self.init_mirr_pos)) 
        self.init_beam_pos = self.detect_beam()
       

    def step(self, action):
        previous_axes_pos = self._rescale_action(self.current_axes_position) # in real stages coordintaes
        # take an action and move the stages
        rescaled_action = self._rescale_action(action)
        self.axes.move_to_position_with_scalar_velocity(np.array(rescaled_action), self.move_velocity)
        
        self.current_axes_position = action # scaled to [-1, 1]
        time.sleep(0.1)
        
        ''' TRANFORMATIONS'''
        ''' compute where on the image the laser beam is pointing after the stages performed a move '''
        delta_move = np.array([np.array(rescaled_action) - previous_axes_pos])
        # -- laser beam movemenet would inverse from the real dot on the object 
        self.current_beam_pos -= self.transformer.transform(delta_move)
         
        obs = self._get_obs()
        
        self.current_step += 1
        done = False
        info = {
            'is_success': self._is_success(obs['achieved_goal'], obs['desired_goal']),
        }
        if self.current_step >= self.max_episode_steps or info['is_success']:
            done = True
            
        reward = self.compute_reward(obs['achieved_goal'], obs['desired_goal'], info)
        return obs, reward, done, info
     
        
    def compute_reward(self, achieved_goal, goal, info):        
        # Compute distance between goal and the achieved goal.
        d = np.linalg.norm(achieved_goal - goal, axis=-1)
        if self.reward_type == 'shaped':
            return -d
        else:
            return (d < self.distance_threshold).astype(np.float)
        
        
    # start up function prior every episode
    def reset(self):
        self.current_step = 0
        if self.training:
            self.goal = self._sample_dot().copy() # sample random dot position in pixels
        else:
            self.goal = self._detect_dot().copy() # detect the black dot position
        # -- move stages to the initial position
        self.current_axes_position = self._sample_random_axes_pos_and_move() # init axes stages 
        self.current_beam_pos = self.init_beam_pos
        obs = self._get_obs()

        return obs   
  

    def detect_beam(self):
        img = self.cam.take_picture()
        # -- smooth and find a threshold
        thresh_val = skfilt.threshold_otsu(img)
        img = scipy.ndimage.gaussian_filter(img, sigma=1)
        laser_mask = img > thresh_val
        
        # -- label image regions 
        label_laser = label(laser_mask)
        props_laser = regionprops(label_laser)
        if len(props_laser) > 1 or len(props_laser) == 0: 
            return None
        return np.array(props_laser[0].centroid, dtype=np.float32)
    
    
    def render(self):
        raise NotImplementedError()

 
    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None   
        
        
    def _get_obs(self):
        desired_goal_scaled = self._scale_dot(self.goal.copy())
        achieved_goal_scaled = self._scale_dot(self.current_beam_pos.copy())
        return {
            'observation': self.current_axes_position.copy(),      
            'achieved_goal': achieved_goal_scaled,         
            'desired_goal': desired_goal_scaled                    
        }        
        
        
    def _detect_dot(self):
        img = self.cam.take_picture()
        
        dot_mask = img < self.dot_detection_threshold
        # label the region
        label_dot = label(dot_mask)
        props_dot = regionprops(label_dot)

        return np.array(props_dot[0].centroid, dtype=np.float32)
    
    
    def _sample_dot(self):
        dot = [random.uniform(0, self.img_size[0]), random.uniform(0, self.img_size[1])]
        return dot.copy()
    
    
    def _sample_random_axes_pos_and_move(self):
        """
        Sample moving to the reference position
        """
        init_pos = np.array([0, 0], dtype=np.float32) # TODO
        self.axes.move_to_position_with_scalar_velocity(init_pos, self.move_velocity)
        time.sleep(0.1)
        return init_pos
    
    
    def _is_success(self, achieved_goal, desired_goal):
        d = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        return (d < self.distance_threshold)
    
    
    def _rescale_action(self, action): # -- to the real stages movements 
        x_min = self.axes_range[0][0]
        x_max = self.axes_range[0][1]
        
        y_min = self.axes_range[1][0]
        y_max = self.axes_range[1][1]        
        
        low=np.array([x_min, y_min])
        high=np.array([x_max, y_max]) 
        return low + (0.5 * (action + 1.0) * (high - low))
    
    
    def _scale_dot(self, goal): # -- to [-1, 1]
        low = np.array([0, 0])
        high = np.array([self.img_size[0], self.img_size[1]])
        return 2.0 * ((goal - low) / (high - low)) - 1.0