In [None]:
import torch
import stable_baselines3
import sys

print("python version:", sys.version)
print("stable_baselines3 version:", stable_baselines3.__version__)
print("torch version:", torch.__version__)
print("cuda available:", torch.cuda.is_available())
print("cuda version:", torch.version.cuda)
print("cudnn version:", torch.backends.cudnn.version())

# 2D Quadcopter Gym Environment

In [None]:
import gym
from gym import spaces
import numpy as np
import matplotlib.pyplot as plt
import cv2

class QuadcopterEnv(gym.Env):
    def __init__(self):
        # Define the action space and observation space
        self.action_space = spaces.MultiBinary(2) #spaces.Box(low=0, high=1, shape=(2,))
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,))

        # Define any other environment-specific parameters
        self.max_steps = 500  # Maximum number of steps in an episode
        self.dt = 0.01  # Time step duration
        self.goal_threshold = 0.1  # Threshold for considering the goal reached

        # Define any other necessary variables
        self.state = np.zeros(6)
        self.step_count = 0

        # Initialize the display window
        self.window_created = False
        self.window_name = "Quadcopter Position"
        self.img_size = 800
        self.scale = self.img_size / 20.0
        self.action = (0, 0)

        # videowriter
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.writer = cv2.VideoWriter('output.mp4', fourcc, 100, (self.img_size, self.img_size))
        self.recording = False

    def reset(self):
        # Initialize the state and step count
        x0     = np.random.uniform(-5,5)
        y0     = np.random.uniform(-5,5)
        vx0    = np.random.uniform(-1,1)
        vy0    = np.random.uniform(-1,1)
        theta0 = np.random.uniform(-1,1)
        q0     = np.random.uniform(-1,1)
        
        self.state = np.array([x0,y0,vx0,vy0,theta0,q0])  # Initial state of the quadcopter
        self.step_count = 0
        return self.state

    def step(self, action):
        # Update the state based on the action
        u1, u2 = action
        x, z, vx, vz, theta, q = self.state

        x += vx * self.dt
        z += vz * self.dt
        vx += (-0.5 * vx + (-1.51670951156812 * u1 - 1.51670951156812 * u2 - 9.04884318766067) * np.sin(theta)) * self.dt
        vz += (-0.5 * vz + (1.51670951156812 * u1 + 1.51670951156812 * u2 + 9.04884318766067) * np.cos(theta) - 9.81) * self.dt
        theta += q * self.dt
        q += (-38.0032206119163 * u1 + 38.0032206119163 * u2) * self.dt

        # Update the state and step count
        self.state = np.array([x, z, vx, vz, theta, q])
        self.step_count += 1

        # Update the action (for rendering purposes)
        self.action = action

        # Calculate the reward
        reward = -0.1 * x**2 - 0.1 * z**2
        
        goal_reached = np.linalg.norm(self.state) < self.goal_threshold
        if goal_reached:
            reward = 100
        
        out_of_bound = np.abs(x) > 10 or np.abs(z) > 10
        if out_of_bound:
            reward = -100

        # Check if the episode is done
        done = self.step_count >= self.max_steps or goal_reached or out_of_bound

        return self.state, reward, done, {}
    
    def map_to_image(self, x, z):
        # Map the quadcopter's position to the image coordinates
        x = int(x * self.scale + self.img_size / 2)
        z = int(self.img_size - z * self.scale - self.img_size / 2)
        return x, z

    def render(self, mode='human'):
        if not self.window_created:
            # Create the named window and move it to a specific location
            cv2.namedWindow(self.window_name)
            self.window_created = True
        # Create a black image as the background
        image = 255*np.ones((self.img_size, self.img_size, 3), dtype=np.uint8)

        # Define left and right motors' positions
        x, z, vx, vz, theta, q = self.state
        length = 0.2 # Length of the quadcopter's arm
        direction = np.array([np.cos(theta), np.sin(theta)])
        left_motor = np.array([x, z]) - length * direction
        right_motor = np.array([x, z]) + length * direction

        # Define the thrust vectors
        u1, u2 = self.action
        left_thrust = u1*np.array([np.cos(theta+np.pi/2), np.sin(theta+np.pi/2)])*length
        right_thrust = u2*np.array([np.cos(theta+np.pi/2), np.sin(theta+np.pi/2)])*length

        # Draw the quadcopter as a line segment
        x1, z1 = self.map_to_image(left_motor[0], left_motor[1])
        x2, z2 = self.map_to_image(right_motor[0], right_motor[1])
        
        # Draw cross in the origin
        size = 0.2
        cv2.line(image, self.map_to_image(0,size), self.map_to_image(0, -size), (200, 200, 200), 2)
        cv2.line(image, self.map_to_image(size,0), self.map_to_image(-size, 0), (200, 200, 200), 2)

        # Draw the action as an arrow
        cv2.arrowedLine(image, (x1, z1), (x1+int(left_thrust[0]*self.scale), z1-int(left_thrust[1]*self.scale)), (0, 0, 255), 2)
        cv2.arrowedLine(image, (x2, z2), (x2+int(right_thrust[0]*self.scale), z2-int(right_thrust[1]*self.scale)), (0, 0, 255), 2)
        cv2.line(image, (x1, z1), (x2, z2), (255, 0, 0), 2)

        key = cv2.waitKeyEx(1)

        # Check if the "Esc" key is pressed
        if key == 27:  # 27 is the ASCII code for the "Esc" key
            cv2.destroyAllWindows()
            self.window_created = False
            raise KeyboardInterrupt("Esc key pressed")
        # record if 'r' is pressed
        if key == ord('r'):
            self.recording = not self.recording
            if self.recording:
                print("Recording started")
            else:
                self.writer.release()
        if self.recording:
            self.writer.write(image)
            # write text on the image
            cv2.putText(image, "Recording", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Show the image in the OpenCV window
        cv2.imshow(self.window_name, image)
        return image
    
    def get_drone_state(self):
        # Outputs a dict containing all information for rendering
        state_dict = dict(zip(['x', 'z', 'vx', 'vz', 'theta', 'q'], self.state))
        u1, u2 = self.action
        action = u2, u2, u1, u1 
        action_dict = dict(zip(['u1', 'u2', 'u3', 'u4'], action))
        return {**state_dict, **action_dict}

# Define and Train the PPO agent

In [None]:
import os
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from datetime import datetime

models_dir = 'models/ppo_2Dquad'
log_dir = 'logs/ppo_2Dquad'

if not os.path.exists(models_dir):
    os.makedirs(models_dir)
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# Date and time string for unique folder names
datetime_str = datetime.now().strftime("%Y%m%d-%H%M%S")

# Create SubprocVecEnv for parallelization
env = make_vec_env(lambda: QuadcopterEnv(), n_envs=20)

# ReLU net with 3 hidden layers of size 120
policy_kwargs = dict(activation_fn=torch.nn.ReLU, net_arch=[120, 120, 120])
model = PPO("MlpPolicy", env, policy_kwargs=policy_kwargs, verbose=0, tensorboard_log=log_dir)

# Model summary
print(model.policy)

# Train the agent
TIMESTEPS = 10000
for i in range(1,100):
    model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name='PPO_'+datetime_str)
    model.save(models_dir + '/PPO_' + datetime_str + '/' + str(i))

In [None]:
from stable_baselines3 import PPO
folder = 'models/ppo_2Dquad/PPO_20230627-180454'
env = QuadcopterEnv()
model = PPO.load(folder + "/999", env=env)

episodes = 100

for ep in range(episodes):
	obs = env.reset()
	done = False
	while True:
		# pass observation to model to get predicted action
		action, _states = model.predict(obs)

		# pass action to env and get info back
		obs, rewards, done, info = env.step(action)
		
		# reset if done
		if done:
			obs = env.reset()

		# reset env if 'r' key is pressed
		# if cv2.waitKey(1) & 0xFF == ord('r') or done:
		# 	obs = env.reset()
			
		# show the environment on the screen
		env.render()
env.close()
cv2.destroyAllWindows()

In [6]:
from quadcopter_animation import animation
# Run the trained agent
env.reset()
done = False

def run():
    global done
    if done:
        env.reset()
    # try model predict and otherwise print env.state
    try:
        action, _ = model.predict(env.state)
    except:
        print(env.state)
        raise Exception("Model prediction failed")
    state, reward, done, _ = env.step(action)
    return env.get_drone_state()

animation.view(run)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is not the object's thread (0xbb32f260).
Cannot move to target thread (0x691245c0)

QObject::moveToThread: Current thread (0x691245c0) is n