# References
https://gymnasium.farama.org/environments/toy_text/

https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/#sphx-glr-tutorials-gymnasium-basics-environment-creation-py

https://cs.stanford.edu/people/karpathy/reinforcejs/

In [18]:
import sys
import pickle
import random
import contextlib
from collections import defaultdict

import numpy as np
import matplotlib.pyplot as plt

In [19]:
class Object:
    """A single entity placed on the grid (drone, package, obstacle, destination).

    Each instance stores its ``type`` string, its ``(x, y)`` cell, the emoji
    ``symbol`` used when the grid is drawn, and the ``point`` value associated
    with its type. Unknown types fall back to the symbol '❓' and 0 points.
    """

    # Point value per object type. Kept as a single class-level table so that
    # assign_point() and print_points() can never disagree.
    POINTS = {
        'drone': 0,
        'package_1': 25,
        'package_2': 25,
        'bird': -50,
        'wind_up': -10,
        'wind_down': -10,
        'wind_right': -10,
        'wind_left': -10,
        'tornado': -100,
        'destination_1': 100,
        'destination_2': 100,
    }

    # Emoji drawn on the grid per object type.
    SYMBOLS = {
        'drone': '🚁',
        'package_1': '🎁',
        'package_2': '📦',
        'bird': '🦅',
        'wind_up': '⬆️',
        'wind_down': '⬇️',
        'wind_right': '➡️',
        'wind_left': '⬅️',
        'tornado': '🌪️',
        'destination_1': '🏠',
        'destination_2': '🛖',
        'package_delivered': '✅',
    }

    def __init__(self, type, x, y):
        """Create an object of the given ``type`` located at cell ``(x, y)``."""
        # NOTE: the parameter name ``type`` shadows the builtin; it is kept
        # because callers may pass it by keyword.
        self.type = type
        self.x = x
        self.y = y
        self.symbol = self.assign_symbol()
        self.point = 0
        self.assign_point()

    def get_position(self):
        """Return the current cell as an ``(x, y)`` tuple."""
        return self.x, self.y

    def move(self, dx, dy):
        """Shift the object by ``(dx, dy)``; no bounds checking is done here."""
        self.x += dx
        self.y += dy

    def is_drone_allowed(self):
        """Return the ``drone_flyable`` attribute, or True when it was never set."""
        return getattr(self, 'drone_flyable', True)

    def assign_point(self):
        """Set ``self.point`` from the type's entry in POINTS (0 if unknown)."""
        self.point = self.POINTS.get(self.type, 0)

    def assign_symbol(self):
        """Return the emoji for this object's type ('❓' if unknown)."""
        return self.SYMBOLS.get(self.type, '❓')

    @staticmethod
    def print_points():
        """Print a two-column table of every object type and its point value."""
        print("Object Type | Points")
        print("--------------------")
        for obj_type, point in Object.POINTS.items():
            print(f"{obj_type.ljust(12)} | {point}")

In [20]:
class Environment:
    """Grid world in which a drone collects two packages and delivers them.

    Cells are addressed as ``grid[x][y]``. ``render`` prints the transpose of
    the grid, so on screen ``x`` runs left-to-right and ``y`` top-to-bottom.

    Actions: 0 = left, 1 = right, 2 = up, 3 = down, 4 = pick up, 5 = drop off.

    ``total_reward`` accumulates only the rewards of successful moves (i.e.
    obstacle penalties); pickup/dropoff rewards and the out-of-bounds penalty
    are returned to the caller but not added to it.
    """

    # Object types that penalise the drone when it lands on their cell.
    _OBSTACLE_TYPES = frozenset(
        ['tornado', 'wind_up', 'wind_down', 'wind_right', 'wind_left', 'bird']
    )

    # Position of each wind cell relative to the tornado centre. Insertion
    # order is also the order in which the winds are registered for drawing.
    _WIND_OFFSETS = {
        'wind_up': (1, 0),
        'wind_down': (-1, 0),
        'wind_right': (0, 1),
        'wind_left': (0, -1),
    }

    def __init__(self, drone_position_x, drone_position_y, cols=6, rows=6, stochastic=False):
        """Build the environment and lay out the first episode.

        Args:
            drone_position_x: Currently ignored; the drone start cell is chosen
                by ``init_positions``. Accepted for backward compatibility.
            drone_position_y: Currently ignored (see above).
            cols: Size of the second grid index (``y``).
            rows: Size of the first grid index (``x``).
            stochastic: When True, positions are randomised, moves may slip,
                the tornado wanders and birds appear/disappear.
        """
        self.total_reward = 0
        self.rows = rows
        self.cols = cols
        self.stochastic = stochastic
        self.empty = '⬜'
        self.package_delivered_1 = False
        self.package_delivered_2 = False
        self.package_picked_1 = False
        self.package_picked_2 = False
        self.terminal = False
        self.reset_env()

    def reset_env(self):
        """Start a fresh episode: clear rewards and flags, then re-place everything."""
        self.total_reward = 0
        # The delivery flags must be cleared as well; otherwise a reset episode
        # would still consider the packages picked up or delivered.
        self.package_delivered_1 = False
        self.package_delivered_2 = False
        self.package_picked_1 = False
        self.package_picked_2 = False
        self.grid = [[self.empty for _ in range(self.cols)] for _ in range(self.rows)]
        self.init_positions()    # Initializing drone, packages, and destinations.
        self.init_obstacles()     # Adding tornado, winds, and birds.
        self.terminal = False

    def reset(self, seed=None, options=None):
        """Gymnasium-compatible reset: returns ``(observation, info)``.

        Seeds the ``random`` module when ``seed`` is given, resets the episode,
        prints the grid, and returns the grid together with an empty info dict.
        ``options`` is accepted for API compatibility and not used.
        """
        if seed is not None:
            random.seed(seed)
        self.reset_env()
        self.render()
        return self.grid, {}

    def init_positions(self):
        """Create the drone, both packages and both destinations.

        In stochastic mode the five cells are drawn at random without overlap;
        otherwise a fixed layout is used.
        """
        used = set()

        def get_pos():
            # Rejection-sample until a cell not yet handed out is found.
            while True:
                pos = (random.randint(0, self.rows - 1), random.randint(0, self.cols - 1))
                if pos not in used:
                    used.add(pos)
                    return pos

        if self.stochastic:
            drone_pos = get_pos()
            pkg1_pos = get_pos()
            dest1_pos = get_pos()
            pkg2_pos = get_pos()
            dest2_pos = get_pos()
        else:
            drone_pos = (0, 0)
            pkg1_pos = (2, 2)
            dest1_pos = (5, 5)
            pkg2_pos = (4, 1)
            dest2_pos = (0, 5)
        self.drone = Object('drone', drone_pos[0], drone_pos[1])
        self.package_1 = Object('package_1', pkg1_pos[0], pkg1_pos[1])
        self.destination_1 = Object('destination_1', dest1_pos[0], dest1_pos[1])
        self.package_2 = Object('package_2', pkg2_pos[0], pkg2_pos[1])
        self.destination_2 = Object('destination_2', dest2_pos[0], dest2_pos[1])
        # Saving delivery positions to avoid overlaps with obstacles.
        self.delivery_positions = {drone_pos, pkg1_pos, dest1_pos, pkg2_pos, dest2_pos}

    def init_obstacles(self):
        """Place the tornado with its four wind cells and two birds, then draw the grid.

        Also (re)builds ``object_positions``, the draw-ordered list of every
        object on the grid; later entries overwrite earlier ones in a cell.
        """
        # Starting with delivery-related objects.
        self.object_positions = [self.destination_1, self.package_1, self.destination_2, self.package_2]
        # Placing tornado (and its winds) avoiding delivery positions. The
        # centre stays one cell away from the border so the winds fit.
        if self.stochastic:
            while True:
                x = random.randint(1, self.rows - 2)
                y = random.randint(1, self.cols - 2)
                if (x, y) not in self.delivery_positions:
                    break
        else:
            x, y = 3, 3
        self.tornado = Object('tornado', x, y)
        winds = [Object(kind, x + dx, y + dy) for kind, (dx, dy) in self._WIND_OFFSETS.items()]
        # Placing two birds. In stochastic mode, assign random positions avoiding delivery positions.
        if self.stochastic:
            birds = []
            for _ in range(2):
                while True:
                    bx = random.randint(0, self.rows - 1)
                    by = random.randint(0, self.cols - 1)
                    if (bx, by) not in self.delivery_positions:
                        birds.append(Object('bird', bx, by))
                        break
        else:
            birds = [Object('bird', 5, 1), Object('bird', 1, 5)]
        self.object_positions.extend(birds)
        self.object_positions.append(self.tornado)
        self.object_positions.extend(winds)
        self.object_positions.append(self.drone)
        self.update_grid()

    def update_grid(self):
        """Redraw ``self.grid`` from ``object_positions``, skipping out-of-bounds objects."""
        self.grid = [[self.empty for _ in range(self.cols)] for _ in range(self.rows)]
        for obj in self.object_positions:
            if 0 <= obj.x < self.rows and 0 <= obj.y < self.cols:
                self.grid[obj.x][obj.y] = obj.symbol

    def render(self, instant=True):
        """Print the grid, one line per ``y`` value. ``instant`` is unused."""
        for row in zip(*self.grid):
            print(" ".join(row))
        print()

    def print_positions(self):
        """Print the type and coordinates of every registered object."""
        for obj in self.object_positions:
            print(f"{obj.type}: ({obj.x}, {obj.y})")

    def update_drone_position(self):
        """Copy ``self.drone``'s coordinates onto every registered drone object."""
        for obj in self.object_positions:
            if obj.type == 'drone':
                obj.x, obj.y = self.drone.x, self.drone.y

    def update_tornado_positions(self):
        """Move the tornado (randomly if stochastic) and drag its winds along."""
        if self.stochastic:
            # Pick one axis at random and try a single step along it; the step
            # is dropped if it would put the centre on the border.
            if random.choice([True, False]):
                dx = random.choice([-1, 1])
                new_x = self.tornado.x + dx
                if 1 <= new_x <= self.rows - 2:
                    self.tornado.x = new_x
            else:
                dy = random.choice([-1, 1])
                new_y = self.tornado.y + dy
                if 1 <= new_y <= self.cols - 2:
                    self.tornado.y = new_y
        else:
            self.tornado.x, self.tornado.y = 3, 3
        for obj in self.object_positions:
            if obj.type == 'tornado':
                obj.x, obj.y = self.tornado.x, self.tornado.y
                continue
            offset = self._WIND_OFFSETS.get(obj.type)
            if offset is not None:
                obj.x = self.tornado.x + offset[0]
                obj.y = self.tornado.y + offset[1]

    def update_no_fly_zones(self):
        """In stochastic mode, randomly remove existing birds and maybe spawn one."""
        if not self.stochastic:
            return
        # Iterate over a snapshot so removal does not disturb the loop.
        birds = [obj for obj in self.object_positions if obj.type == 'bird']
        for bird in birds:
            if random.random() < 0.1:
                self.object_positions.remove(bird)
        if random.random() < 0.1:
            empties = [(i, j) for i in range(self.rows) for j in range(self.cols)
                       if self.grid[i][j] == self.empty]
            if empties:
                pos = random.choice(empties)
                self.object_positions.append(Object('bird', pos[0], pos[1]))

    def get_actual_move(self, dx, dy):
        """Return the move actually executed for the requested ``(dx, dy)``.

        Deterministic mode returns the request unchanged. In stochastic mode
        the request is honoured with probability 0.9; otherwise the drone
        slips one cell along the perpendicular axis.
        """
        if not self.stochastic or random.random() < 0.9:
            return dx, dy
        if dx != 0:
            return 0, random.choice([1, -1])
        if dy != 0:
            return random.choice([1, -1]), 0
        return dx, dy

    def check_reward(self, x, y):
        """Return the point value of the first obstacle at ``(x, y)``, or 0."""
        for obj in self.object_positions:
            if obj.x == x and obj.y == y and obj.type in self._OBSTACLE_TYPES:
                return obj.point
        return 0

    def attempt_pickup(self, print_log=False):
        """Try to pick up a package at the drone's cell.

        Returns 25 when a not-yet-picked, not-yet-delivered package is on the
        drone's cell (at most one package per call), otherwise -5.
        """
        drone_cell = (self.drone.x, self.drone.y)
        if (not self.package_picked_1 and not self.package_delivered_1
                and drone_cell == (self.package_1.x, self.package_1.y)):
            self.package_picked_1 = True
            reward = 25  # reward for successful pickup
            if self.package_1 in self.object_positions:
                self.object_positions.remove(self.package_1)
            if print_log:
                print("Picked up package 1")
        elif (not self.package_picked_2 and not self.package_delivered_2
                and drone_cell == (self.package_2.x, self.package_2.y)):
            self.package_picked_2 = True
            reward = 25
            if self.package_2 in self.object_positions:
                self.object_positions.remove(self.package_2)
            if print_log:
                print("Picked up package 2")
        else:
            reward = -5  # penalty for unsuccessful pickup
            if print_log:
                print("Attempted pickup failed")
        # Keep the grid (and therefore the step() observation) in sync.
        self.update_grid()
        if print_log:
            self.package_status()
        return reward

    def _drop_package(self, number, print_log):
        """Drop carried package ``number`` (1 or 2) at the drone's cell; return its reward."""
        package = getattr(self, f"package_{number}")
        destination = getattr(self, f"destination_{number}")
        setattr(self, f"package_picked_{number}", False)
        drone_cell = (self.drone.x, self.drone.y)
        if drone_cell == (destination.x, destination.y):
            setattr(self, f"package_delivered_{number}", True)
            # Update the destination to show the delivered package.
            destination.symbol = Object('package_delivered', 0, 0).symbol
            if print_log:
                print(f"Delivered package {number}")
            return 100
        # Dropped incorrectly: put the real package object down on this cell so
        # that attempt_pickup() can find it again where it is drawn.
        package.x, package.y = drone_cell
        if package not in self.object_positions:
            self.object_positions.append(package)
        if print_log:
            print(f"Dropped package {number} incorrectly")
        return -5  # penalty for wrong dropoff

    def attempt_dropoff(self, print_log=False):
        """Drop every carried package at the drone's cell.

        Each package dropped on its own destination yields +100, each package
        dropped elsewhere yields -5 and stays on that cell. Dropping while
        carrying nothing yields -5. Marks the episode terminal once both
        packages are delivered.
        """
        # Capture this before dropping: the picked flags are cleared below, so
        # testing them afterwards would mistake a delivery for an empty drop.
        carried_1 = self.package_picked_1
        carried_2 = self.package_picked_2
        reward = 0
        if carried_1:
            reward += self._drop_package(1, print_log)
        # Not an elif: the drone may carry both packages at once.
        if carried_2:
            reward += self._drop_package(2, print_log)
        if not carried_1 and not carried_2:
            # No package was carried.
            reward = -5
            if print_log:
                print("Attempted dropoff failed (no package carried)")

        if self.package_delivered_1 and self.package_delivered_2:
            self.terminal = True
            if print_log:
                print("Task complete: All packages delivered 😎")
        # Keep the grid (and therefore the step() observation) in sync.
        self.update_grid()
        if print_log:
            self.package_status()
        return reward

    def move_drone(self, dx, dy, render_env=False, print_log=False):
        """Move the drone by ``(dx, dy)`` (subject to slip) and return ``(reward, done)``.

        A move that would leave the grid is cancelled and costs -10; in that
        case the obstacles do not move either. Otherwise the tornado and birds
        are updated first, then the drone moves and receives the point value
        of any obstacle on its new cell.
        """
        if self.task_complete():
            return 0, True
        adx, ady = self.get_actual_move(dx, dy)
        new_x = self.drone.x + adx
        new_y = self.drone.y + ady
        if not (0 <= new_x < self.rows and 0 <= new_y < self.cols):
            return -10, self.task_complete()
        # Free the old cell before obstacles update so a new bird may use it.
        self.grid[self.drone.x][self.drone.y] = self.empty
        self.update_tornado_positions()
        self.update_no_fly_zones()
        self.drone.x, self.drone.y = new_x, new_y
        self.update_drone_position()
        rwd = self.check_reward(new_x, new_y)
        self.total_reward += rwd
        self.update_grid()
        if print_log:
            print(f"Drone moved to ({new_x}, {new_y}) Accumulated Reward: {self.total_reward}")
            self.package_status()
        if render_env:
            self.render()
        return rwd, self.task_complete()

    def _apply_action(self, action, render_env=False, print_log=False):
        """Execute one action code and return ``(reward, done)``; unknown codes cost -10."""
        if action in (0, 1, 2, 3):
            dx, dy = self.action_to_delta(action)
            return self.move_drone(dx, dy, render_env, print_log)
        if action == 4:
            reward = self.attempt_pickup(print_log)
        elif action == 5:
            reward = self.attempt_dropoff(print_log)
        else:
            reward = -10
        return reward, self.task_complete()

    # ----- step() method with six actions:
    #  0: left, 1: right, 2: up, 3: down, 4: pick up, 5: drop off -----
    def step_enhanced(self, action, render_env=False, print_log=False):
        """Execute ``action`` and return ``(reward, done)``.

        Returns ``(0, True)`` without doing anything once the task is complete.
        """
        if self.task_complete():
            return 0, True
        return self._apply_action(action, render_env, print_log)

    def step(self, action):
        """
        Gymnasium-compatible step: accepts a single action and returns
        (observation, reward, terminated, truncated, info).

        The observation is the grid, ``truncated`` is always False, and
        ``info`` is an empty dict. The grid is printed after every step taken
        before the task is complete.
        """
        if self.task_complete():
            return self.grid, 0, True, False, {}
        reward, done = self._apply_action(action)
        self.render()
        truncated = False  # No truncation logic implemented
        return self.grid, reward, done, truncated, {}

    def action_to_delta(self, action):
        """Map a movement action (0-3) to ``(dx, dy)``; returns None for other codes."""
        if action == 0:
            return -1, 0
        elif action == 1:
            return 1, 0
        elif action == 2:
            return 0, -1
        elif action == 3:
            return 0, 1
        return None

    def package_status(self):
        """Print the picked/delivered flags of both packages."""
        print(f"P1 - Picked: {self.package_picked_1}, Delivered: {self.package_delivered_1}")
        print(f"P2 - Picked: {self.package_picked_2}, Delivered: {self.package_delivered_2}")

    def task_complete(self):
        """Return True once both packages have been delivered."""
        return self.terminal

    # Convenience wrappers for movement actions. They go through
    # step_enhanced(), which is the entry point that accepts the
    # render_env/print_log options, and return (reward, done).
    def drone_move_left(self, render_env=False, print_log=False):
        """Move one cell left (x - 1)."""
        return self.step_enhanced(0, render_env, print_log)

    def drone_move_right(self, render_env=False, print_log=False):
        """Move one cell right (x + 1)."""
        return self.step_enhanced(1, render_env, print_log)

    def drone_move_up(self, render_env=False, print_log=False):
        """Move one cell up (y - 1)."""
        return self.step_enhanced(2, render_env, print_log)

    def drone_move_down(self, render_env=False, print_log=False):
        """Move one cell down (y + 1)."""
        return self.step_enhanced(3, render_env, print_log)

    def random_drone_movement(self, steps=100):
        """Take up to ``steps`` uniformly random actions, rendering and logging each one."""
        # Actions: 0: left, 1: right, 2: up, 3: down, 4: pickup, 5: dropoff.
        actions = [0, 1, 2, 3, 4, 5]
        for _ in range(steps):
            action = random.choice(actions)
            self.step_enhanced(action, render_env=True, print_log=True)
            if self.task_complete():
                break


In [21]:
# For a deterministic episode: with stochastic=False the environment uses its
# fixed layout. Show the initial grid, then let the drone take up to 50
# uniformly random actions, rendering and logging each one.
env_det = Environment(0, 0, stochastic=False)
env_det.render()
env_det.random_drone_movement(steps=50)

🚁 ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ ⬜ ⬜ 📦 🦅
⬜ ⬜ 🎁 ⬅️ ⬜ ⬜
⬜ ⬜ ⬇️ 🌪️ ⬆️ ⬜
⬜ ⬜ ⬜ ➡️ ⬜ ⬜
🛖 🦅 ⬜ ⬜ ⬜ 🏠

Drone moved to (1, 0) Accumulated Reward: 0
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
⬜ 🚁 ⬜ ⬜ ⬜ ⬜
⬜ ⬜ ⬜ ⬜ 📦 🦅
⬜ ⬜ 🎁 ⬅️ ⬜ ⬜
⬜ ⬜ ⬇️ 🌪️ ⬆️ ⬜
⬜ ⬜ ⬜ ➡️ ⬜ ⬜
🛖 🦅 ⬜ ⬜ ⬜ 🏠

Drone moved to (0, 0) Accumulated Reward: 0
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
🚁 ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ ⬜ ⬜ 📦 🦅
⬜ ⬜ 🎁 ⬅️ ⬜ ⬜
⬜ ⬜ ⬇️ 🌪️ ⬆️ ⬜
⬜ ⬜ ⬜ ➡️ ⬜ ⬜
🛖 🦅 ⬜ ⬜ ⬜ 🏠

Attempted pickup failed
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
Attempted pickup failed
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
Drone moved to (0, 1) Accumulated Reward: 0
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
⬜ ⬜ ⬜ ⬜ ⬜ ⬜
🚁 ⬜ ⬜ ⬜ 📦 🦅
⬜ ⬜ 🎁 ⬅️ ⬜ ⬜
⬜ ⬜ ⬇️ 🌪️ ⬆️ ⬜
⬜ ⬜ ⬜ ➡️ ⬜ ⬜
🛖 🦅 ⬜ ⬜ ⬜ 🏠

Drone moved to (0, 0) Accumulated Reward: 0
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
🚁 ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ ⬜ ⬜ 

In [22]:
# For a stochastic episode: with stochastic=True positions are randomised and
# the tornado, birds and drone moves are subject to randomness. Show the
# initial grid, then let the drone take up to 50 uniformly random actions.
env_sto = Environment(0, 0, stochastic=True)
env_sto.render()
env_sto.random_drone_movement(steps=50)

⬜ ⬜ ⬜ ⬜ 🏠 🦅
⬜ ⬜ ⬜ ⬜ ⬜ ⬜
📦 ⬜ ⬜ ⬜ ⬜ 🚁
⬜ ⬜ ⬜ ⬜ ⬅️ ⬜
🎁 ⬜ ⬜ ⬇️ 🌪️ ⬆️
⬜ ⬜ 🦅 🛖 ➡️ ⬜

Drone moved to (5, 1) Accumulated Reward: 0
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
⬜ ⬜ ⬜ ⬜ 🏠 🦅
⬜ ⬜ ⬜ ⬜ ⬜ 🚁
📦 ⬜ ⬜ ⬜ ⬅️ ⬜
⬜ ⬜ ⬜ ⬇️ 🌪️ ⬆️
🎁 ⬜ ⬜ ⬜ ➡️ ⬜
⬜ ⬜ 🦅 🛖 ⬜ ⬜

Drone moved to (5, 2) Accumulated Reward: -10
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
⬜ ⬜ ⬜ ⬜ 🏠 🦅
⬜ ⬜ ⬜ ⬜ ⬅️ ⬜
📦 ⬜ ⬜ ⬇️ 🌪️ 🚁
⬜ ⬜ ⬜ ⬜ ➡️ ⬜
🎁 ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 🦅 🛖 ⬜ ⬜

Drone moved to (4, 2) Accumulated Reward: -20
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
⬜ ⬜ ⬜ ⬜ 🏠 🦅
⬜ ⬜ ⬜ ⬅️ ⬜ ⬜
📦 ⬜ ⬇️ 🌪️ 🚁 ⬜
⬜ ⬜ ⬜ ➡️ ⬜ ⬜
🎁 ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 🦅 🛖 ⬜ ⬜

Attempted pickup failed
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
Attempted dropoff failed (no package carried)
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: False
Drone moved to (4, 3) Accumulated Reward: -30
P1 - Picked: False, Delivered: False
P2 - Picked: False, Delivered: 