In [1]:
import numpy as np
from stable_baselines3 import PPO
from gymnasium import spaces, Env

In [5]:
class MultiMarkEnv(Env):
    def __init__(self, config):
        self.min_x = -400
        self.max_x = 400
        self.min_y = -400
        self.max_y = 400
        self.MAX_SPEED = 10
        self.TURNING_RATE = 12.5 * 180 / np.pi
        self.ITERS_PER_ACTION = 5
        self.SPEED_PENALTY = 0.4
        self.SPEED_RECOVERY_IN_SECONDS = 4
        self.MAX_REMAINING_SECONDS = config['max_remaining_seconds']
        self.target_x = 0
        self.target_y = 0

        self.MAX_DISTANCE = np.sqrt((self.max_x - self.min_x) ** 2 + (self.max_y - self.min_y) ** 2)

        self.coords = config['coords']
        
        self.observation = np.zeros((6,))
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,))
        
        # fields = [
        #   :distance,
        #   :vmg,
        #   :heading,
        #   :angle_to_target,
        #   :has_tacked,
        #   :has_reached_target
        # ]
        
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,))
        self.reward_range = spaces.Box(low=-1, high=self.MAX_SPEED, shape=())

    def reset(self, seed = None, options = None):
        super().reset(seed=seed)
        # Initialization logic
        # Initialize state variables: x, y, speed, etc.
        # Return the initial observation

        self.tack_count = 0
        self.heading = 0
        self.angle_to_mark = 0
        self.speed = 0
        self.vmg = 0
        self.has_tacked = False
        self.remaining_seconds = self.MAX_REMAINING_SECONDS
        self.delta_t = 0

        random_observation = np.zeros(self.observation_space.shape)  

        coord_idx = np.random.choice(np.arange(0, self.coords.shape[0]))
        coord = self.coords[coord_idx]

        self.x = coord[0]
        self.y = coord[1]
        self.target_x = coord[2]
        self.target_y = coord[3]
        self.distance = np.sqrt((self.target_x - self.x) ** 2 + self.target_y ** 2)
        self.initial_distance = self.distance

        self.observation[0] = self.distance / self.MAX_DISTANCE
        
        return self.observation, {}

    def step(self, action):
        self.apply_action(action).calculate_reward().is_terminal()

        self.observation = np.stack([
            self.distance / self.MAX_DISTANCE,
            self.vmg,
            self.heading,
            self.angle_to_mark,
            self.has_tacked,
            self.is_terminal and not self.is_truncated
        ])
        
        return self.observation, self.reward, self.is_terminal, self.is_truncated, {}

    def render(self, mode='human'):
        pass

    def apply_action(self, action):
        dtheta = action * np.pi
        prev_heading = self.heading
        
        turning_time = np.abs(dtheta) / self.TURNING_RATE
        dt = turning_time / self.ITERS_PER_ACTION
        
        dtheta_steps = dt * np.sign(dtheta) * np.broadcast_to(self.TURNING_RATE, (self.ITERS_PER_ACTION))
        heading_steps = np.cumsum(dtheta_steps) + prev_heading
        heading_steps = self.wrap_phase(heading_steps)

        speed_steps = self.speed_from_heading(heading_steps)

        tacking_mask = heading_steps < np.pi != prev_heading < np.pi
        # cumulative max for boolean:
        tacking_mask = np.cumsum(tacking_mask) > 0

        speed_penalty_multiplier = 1 - self.SPEED_PENALTY

        penalized_speed_steps = np.where(tacking_mask, speed_penalty_multiplier * speed_steps, speed_steps)
        has_tacked = np.any(tacking_mask)
        tack_count = self.tack_count + has_tacked

        dy = dt * np.cos(heading_steps) * penalized_speed_steps
        dx = dt * np.sin(heading_steps) * penalized_speed_steps

        x = self.x + np.sum(dx)
        y = self.y + np.sum(dy)

        heading = heading_steps[-1]
        speed = speed_steps[-1]
        
        speed_steps = np.linspace(speed, speed / speed_penalty_multiplier, num=self.SPEED_RECOVERY_IN_SECONDS)
        x = x + np.sin(heading) * np.sum(speed_steps)
        y = y + np.cos(heading) * np.sum(speed_steps)
        speed = speed_steps[-1]

        dx = self.target_x - x
        dy = self.target_y - y

        angle_to_mark = self.wrap_phase(np.atan2(dx, dy))

        target_unit = np.stack([np.cos(angle_to_mark), np.sin(angle_to_mark)])
        heading_unit = np.stack([np.cos(heading), np.sin(heading)])

        vmg = (target_unit @ heading_unit) * speed
        delta_t = turning_time + self.SPEED_RECOVERY_IN_SECONDS

        distance = np.sqrt((self.target_x - x) ** 2 + (self.target_y - y) ** 2)

        self.x = x
        self.y = y
        self.tack_count = tack_count
        self.heading = heading
        self.angle_to_mark = angle_to_mark
        self.speed = speed
        self.vmg = vmg
        self.has_tacked = has_tacked
        self.distance = distance
        self.remaining_seconds -= delta_t
        self.delta_t = delta_t

        return self
        
    def wrap_phase(self, angles):
        return np.remainder(np.remainder(angles, 2 * np.pi) + 2 * np.pi, 2 * np.pi)

    def speed_from_heading(self, headings):
        raise "not implemented"

    def is_terminal(self):
        if self.distance < 20:
            self.is_terminal = True
            self.is_truncated = False
            return self

        has_collided = self.x < self.MIN_X or self.x > self.MAX_X or self.y < self.MIN_Y or self.y > self.MAX_Y
        
        if has_collided or self.remaining_seconds < 1:
            self.is_terminal = True
            self.is_truncated = True
            return self

        self.is_terminal = False
        self.is_truncated = False
        return self

    def calculate_reward(self):
        if self.is_terminal and not self.is_truncated:
            # good ending
            self.reward = 1.0
            return self

        if self.is_terminal and self.is_truncated:
            self.reward = -self.distance / self.initial_distance
            return self

        if self.has_tacked:
            self.reward = -0.1
            return self

        self.reward = -0.01 * self.vmg / self.MAX_SPEED
        return self

In [6]:
# Configuration

r = 250

config = {
    'max_tacks': 2,
    'coords': np.array([[0, 0, 0, r], [0, r, 0, 0]]),
    'max_remaining_seconds': 500
}

# Initialize Environment
env = MultiMarkEnv(config)

# Initialize PPO model
model = PPO("MlpPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=20000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


TypeError: exceptions must derive from BaseException