In [3]:
import random
import gym
import serial
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy

In [4]:
class MicroSwimmerV0(gym.Env):
    """Grid-world environment in which a microswimmer swims toward a target.

    The pool is a ``pool_width`` x ``pool_height`` grid. Each step the agent
    picks one of 9 discrete actions (stay, or move one cell in one of 8
    directions). An episode ends when the agent is closer than
    ``TARGET_RADIUS`` to the target or after ``max_episode_steps`` steps.

    Observations are a dict with ``'agent_location'`` and
    ``'target_location'``, each a float32 array of shape ``(2,)`` holding
    ``(x, y)``.

    This class implements the classic ``gym`` API: ``reset()`` returns only
    the observation and ``step()`` returns a 4-tuple
    ``(obs, reward, done, info)``.
    NOTE(review): training libraries that expect the Gymnasium API
    (``reset() -> (obs, info)``, 5-tuple ``step``) may need a compatibility
    wrapper -- confirm against the installed versions.
    """

    # Distance to the target below which the target counts as reached.
    TARGET_RADIUS = 1.5

    def __init__(self):
        """Create the action/observation spaces and the initial state."""
        self.action_space = gym.spaces.Discrete(9)

        self.observation_space = gym.spaces.Dict({
            'agent_location': gym.spaces.Box(low=0, high=10, shape=(2,), dtype=np.float32),
            'target_location': gym.spaces.Box(low=0, high=10, shape=(2,), dtype=np.float32),
        })

        self.pool_width = 10
        self.pool_height = 10
        # random.randint is inclusive on both ends, so the upper bound must be
        # the last valid grid index; otherwise render() would index outside
        # the pool image.
        self.target_location = np.array([
            random.randint(5, self.pool_width - 1),
            random.randint(5, self.pool_height - 1),
        ])
        self.agent_location = np.array([2, 2])
        self.max_episode_steps = 20
        self.steps = 0

    def _get_obs(self):
        """Return the current observation as float32 copies of the state.

        The internal locations stay integer arrays; they are converted here so
        the observation matches the dtype declared in ``observation_space``.
        """
        return {
            'agent_location': self.agent_location.astype(np.float32),
            'target_location': self.target_location.astype(np.float32),
        }

    def step(self, action):
        """Apply ``action`` and advance the episode by one step.

        Args:
            action: Discrete action index in ``[0, 8]`` (anything ``int()``
                accepts).

        Returns:
            Tuple ``(obs, reward, done, info)`` where ``obs`` is the
            observation dict, ``reward`` is an int, ``done`` is a bool and
            ``info`` is an empty dict.
        """
        previous_distance = np.linalg.norm(self.agent_location - self.target_location)

        movement = self.get_movement_from_action(action)

        # Keep the agent on the grid so observations stay inside the declared
        # observation space; a move into a wall leaves that coordinate as is.
        upper_bound = [self.pool_width - 1, self.pool_height - 1]
        self.agent_location = np.clip(self.agent_location + movement, 0, upper_bound)

        current_distance = np.linalg.norm(self.agent_location - self.target_location)

        reward = self.calculate_reward(previous_distance, current_distance)

        self.steps += 1

        # Episode ends on reaching the target or exhausting the step budget.
        done = bool(
            current_distance < self.TARGET_RADIUS
            or self.steps >= self.max_episode_steps
        )

        return self._get_obs(), reward, done, {}

    def calculate_reward(self, previous_distance, current_distance):
        """Return the reward for moving from one distance to another.

        +2 for getting closer, -1 for an unchanged distance, -2 for moving
        away; reaching the target (distance below ``TARGET_RADIUS``)
        overrides these with +15.
        """
        if current_distance < self.TARGET_RADIUS:
            return 15

        if current_distance < previous_distance:
            return 2
        if current_distance == previous_distance:
            return -1
        return -2

    def get_movement_from_action(self, action):
        """Map a discrete action index to an ``(dx, dy)`` displacement array.

        Raises:
            KeyError: If ``int(action)`` is not in ``[0, 8]``.
        """
        # NOTE(review): the direction labels below are the original ones and
        # mix two y-axis conventions (1/5 treat +y as "down", 2/4/6/8 treat
        # +y as "up"). render() draws +y downward. Confirm which convention
        # the hardware expects before relying on the labels.
        movement_dict = {
            0: np.array([0, 0]),    # no movement x
            1: np.array([0, 1]),    # down d
            2: np.array([1, 1]),    # up-right e
            3: np.array([1, 0]),    # right r
            4: np.array([1, -1]),   # down-right c
            5: np.array([0, -1]),   # up u
            6: np.array([-1, -1]),  # down-left z
            7: np.array([-1, 0]),   # left l
            8: np.array([-1, 1]),   # up-left q
        }
        return movement_dict[int(action)]

    def render(self):
        """Print the pool as a text grid.

        ``T`` marks the target, ``M`` the microswimmer and ``.`` an empty
        cell. Rows are printed in increasing ``y``; if both share a cell,
        ``M`` is shown.
        """
        max_index = [self.pool_width - 1, self.pool_height - 1]
        grid = [["."] * self.pool_width for _ in range(self.pool_height)]

        # Clip both positions so drawing can never index outside the grid.
        target_x, target_y = np.clip(self.target_location.astype(int), 0, max_index)
        grid[target_y][target_x] = "T"

        agent_x, agent_y = np.clip(self.agent_location.astype(int), 0, max_index)
        grid[agent_y][agent_x] = "M"

        for row in grid:
            for symbol in row:
                print(symbol, end=" ")
            print()

    def reset(self):
        """Start a new episode with random agent and target cells.

        Returns:
            The initial observation dict.
        """
        # Without resetting the counter, later episodes would inherit the
        # step count of earlier ones and terminate almost immediately.
        self.steps = 0

        self.agent_location = np.array([
            random.randint(0, self.pool_width - 1),
            random.randint(0, self.pool_height - 1),
        ])
        self.target_location = np.array([
            random.randint(0, self.pool_width - 1),
            random.randint(0, self.pool_height - 1),
        ])

        return self._get_obs()

    def close(self):
        """Release resources; nothing to do for this environment."""
        pass


In [7]:
def send_action_to_arduino(act):
    """Translate an action into its direction command and print it.

    The actual serial write to the Arduino is currently disabled; the
    intended calls are kept below for reference.
    """
    # Disabled hardware path:
    #   ser = serial.Serial('/dev/cu.usbserial-10', 9600, timeout=1)
    #   ser.write(direction.encode())
    command = get_direction_from_action(act)
    print(command)


def get_direction_from_action(action):
    """Return the single-letter direction command for a discrete action.

    Args:
        action: Action index in ``[0, 8]``; anything ``int()`` accepts
            (Python int, NumPy integer scalar, ...).

    Returns:
        One of ``"x", "d", "e", "r", "c", "u", "z", "l", "q"``.

    Raises:
        KeyError: If ``int(action)`` is outside ``[0, 8]``.
    """
    direction_by_action = {
        0: "x",
        1: "d",
        2: "e",
        3: "r",
        4: "c",
        5: "u",
        6: "z",
        7: "l",
        8: "q",
    }
    # Convert once; actions may arrive as NumPy scalars rather than ints.
    return direction_by_action[int(action)]

In [8]:
# Training / evaluation settings, named so they can be tuned in one place.
SEED = 42
TOTAL_TIMESTEPS = 100_000
N_EVAL_EPISODES = 10

# The environment samples positions with the stdlib `random` module, so seed
# it before the environment is created to make runs repeatable.
random.seed(SEED)
env = MicroSwimmerV0()

# NOTE(review): the recorded output of this cell ends in
# "TypeError: string indices must be integers". This looks like a mismatch
# between the classic gym API implemented by the environment and the API the
# installed stable-baselines3 expects -- confirm the installed gym /
# stable-baselines3 versions before re-running.
model = DQN('MultiInputPolicy', env, verbose=1, learning_rate=1e-3, seed=SEED)
model.learn(total_timesteps=TOTAL_TIMESTEPS)
model.save("2D_Env")

# Bind the second return value to a real name: assigning to `_` in a notebook
# shadows IPython's "last output" variable.
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=N_EVAL_EPISODES)
print(f"Mean reward: {mean_reward}")

# The trained model has been saved to disk; drop the in-memory copy.
del model

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


TypeError: string indices must be integers