In [1]:
import numpy as np
import math
import torch

In [2]:
class DroneEnv:
    def __init__(self, width=2000, height=2000, checkpoint_radius=60):
        # ENV:
        self.width = width
        self.height = height
        self.checkpoint_radius = checkpoint_radius
        self.score = 0

        # ENV PARAMS:
        self.GRAVITY = 2
        self.THRUST_POWER = 3.5
        self.X_AXIS_SENS = 50
        self.Y_AXIS_SENS = 1
        self.ROTATION_SPEED = 1
        self.ROTATION_DRAG = 0.7
        self.MOVEMENT_DRAG = 0.5
        self.ACC_THROTLE = 0.4

        # initiate drone:
        self.reset(x=self.width / 2, y=self.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

    def reset(self, x, y, vx, vy, ax, ay, a, va):
        self.x = x
        self.y = y
        self.vx = vx
        self.vy = vy
        self.ax = ax
        self.ay = ay
        self.a = a # in degrees
        self.va = va

        self.spawn_checkpoint()

    def spawn_checkpoint(self):
        self.checkpoint_x = np.random.uniform(0.2 * self.width, 0.8 * self.width)
        self.checkpoint_y = np.random.uniform(0.2 * self.height, 0.8 * self.height)

    def step(self, action):
        # get thrust vaues from FFN outputs (actions):
        thrust_left = action[0]
        thrust_right = action[1]

        # PHYSICS:
        torque = thrust_left - thrust_right # factor
        thrust = self.THRUST_POWER * math.cos(math.radians(self.a)) * ((thrust_left + thrust_right) / 2)

        self.va -= self.ROTATION_SPEED * torque
        self.va *= self.ROTATION_DRAG

        self.a += self.va * self.ACC_THROTLE
        self.a %= 360.0

        self.vy += self.GRAVITY * self.ACC_THROTLE
        self.vy -= self.Y_AXIS_SENS * thrust * self.ACC_THROTLE

        angle_rad = math.radians(self.a)
        self.vx = self.X_AXIS_SENS * math.sin(angle_rad)

        self.x += self.MOVEMENT_DRAG * self.vx
        self.y += self.MOVEMENT_DRAG * self.vy

        # STATE SPACE FOR FFN:
        dx = self.checkpoint_x - self.x
        dy = self.checkpoint_y - self.y
        optimal_vector = (dx, dy)
        vel_vector = (self.vx, self.vy)
        acc_vector = (self.ax, self.ay)
        t = torch.nn.Tanh()
        d = self.a - 360 if self.a > 180 else self.a
        ang = t(torch.tensor(d))
        ang_vel = self.va # normalise

        dist = math.sqrt(dx * dx + dy * dy)
        reached = False
        out = False

        if dist < self.checkpoint_radius:
            reached = True
            self.score += 1
            self.spawn_checkpoint()

        out = True if self.x < 0 or self.x > self.width or self.y < 0 or self.y > self.height else False

        state = {
            "opt": optimal_vector, # when closer, optimally, also slower!
            "vel": vel_vector,
            "acc": acc_vector,
            "ang": ang,
            "ang_vel": ang_vel,
            "reached": reached,
            "out": out
        } # reached, out should not be inputs to FFN, only for PyGame
        
        reward = 0 # -math.dist(optimal_vector, (vel_vector)) * 0.5 - ang * 0.1 - dist * 0.01 + reached * 5 - out

        return state, reward

# state space:
# coordinates relative to checkpoints coordinates C = (x, y) -> the optimal vector from drone coordinates D to C: C-D
# drone velocity relative to C as vector
# drone acceleration relative to C as vector
# angular velocity relativ to global coordinate system

# reached
# out: if drone out, the rollout for FFN simply resets, no loss for it?

# loss:
# difference between drone vectors and optimal vector (dist = vec length -> adjusts for velocity, should slow down when close)


In [3]:
import torch
import torch.nn.functional as F
import math

# PID controller
def opt_thrusts(state, min_thrust=0.0, max_thrust=1.0):
    # states:
    opt_vec = torch.tensor(state["opt"], dtype=torch.float32)
    vel_vec = torch.tensor(state["vel"], dtype=torch.float32)
    acc_vec = torch.tensor(state["acc"], dtype=torch.float32)
    a = state["ang"]
    va = torch.tanh(torch.tensor(state["ang_vel"]))

    # computing more values
    limiter = torch.relu(-vel_vec[1]) # factor inside because should not be capped until large
    ya = torch.sigmoid(opt_vec[1] - acc_vec[1])
    yv = torch.sigmoid(opt_vec[1] - vel_vec[1])
    xa = torch.tanh(opt_vec[0] - acc_vec[0])
    xv = torch.tanh(opt_vec[0] - vel_vec[0])

    # factors
    f0 = 1/2
    f1 = 1/10
    f2 = 1/5
    f3 = 1/4.5
    f4 = 0 # not needed
    f5 = 1/35

    # thrusts
    l = 1 - yv + ya*f0 - xv*f1 - xa*f2 + a*f3 + va*f4 - limiter*f5
    r = 1 - yv + ya*f0 + xv*f1 + xa*f2 - a*f3 - va*f4 - limiter*f5

    return l, r


In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets 
import torchvision.transforms as transforms

print(torch.version.cuda)
device = torch.device("cuda")
torch.cuda.is_available()

11.8


False

In [5]:
input_size = 8
hidden_size0 = 64
hidden_size1 = 32
out_size = 2

In [6]:
class Net(nn.Module):
    def __init__(self, input_size, hidden_size0, hidden_size1, out_size):
        super(Net, self).__init__()
        self.fc0 = nn.Linear(input_size, hidden_size0)     # 8 - 64
        self.fc1 = nn.Linear(hidden_size0, hidden_size1)   # 64 - 32
        self.fc2 = nn.Linear(hidden_size1, out_size)       # 64 - 2
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.fc0.weight, gain=nn.init.calculate_gain('tanh')) 
        nn.init.xavier_uniform_(self.fc1.weight, gain=nn.init.calculate_gain('tanh'))
        nn.init.xavier_uniform_(self.fc2.weight, gain=nn.init.calculate_gain('tanh'))

    def forward(self, x):
        out = self.tanh(self.fc0(x))
        out = self.tanh(self.fc1(out))
        out = self.sigmoid(self.fc2(out))
        return out

In [7]:
rollout_length = 100
learning_rate = 0.00075

In [8]:
net = Net(input_size, hidden_size0, hidden_size1, out_size)
CUDA = torch.cuda.is_available()
if CUDA:
    net = net.cuda()

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

In [9]:
# training
from IPython.display import clear_output, display
from itertools import product
import random as ra

drone = DroneEnv()
drone.reset(x=drone.width / 2, y=drone.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)
state, _ = drone.step([0, 0])

xcoo = [xcoo for xcoo in range(30, drone.width - 30, 65)]
ycoo = [ycoo for ycoo in range(30, drone.height - 30, 65)]
vxs = [vx * 5 for vx in range(-3, 4)] # 7 coordinates
vys = [vy * 5 for vy in range(-3, 4)] # 7 coordinates
angs = [ang / 1.5 for ang in range(-3, 4)] # 7 angles
# axs, ays, vangs will be randomly sampled before each rollout

states = list(product(xcoo, ycoo, vxs, vys, angs))
ra.shuffle(states) # shuffle to sample in random states

for idx0, (x, y, vx, vy, ang) in range(0): #enumerate(states):
    loss = 0
    running_loss = 0

    vang = np.random.uniform(-0.7, 0.7)
    ax, ay = 0, 0
    drone.reset(x, y, vx, vy, ax, ay, ang, vang)
    
    for idx1 in range(rollout_length): # simulate few steps with step function from here:
        opt_vec = torch.tensor(state["opt"], dtype=torch.float32)
        vel_vec = torch.tensor(state["vel"], dtype=torch.float32)
        acc_vec = torch.tensor(state["acc"], dtype=torch.float32)
        a = state["ang"]
        va = torch.tanh(torch.tensor(state["ang_vel"]))
        
        opt = opt_thrusts(state)
        labels = torch.tensor([opt[0], opt[1]])
        input_tensor = torch.cat([opt_vec[0].unsqueeze(0), opt_vec[1].unsqueeze(0), 
                                vel_vec[0].unsqueeze(0), vel_vec[1].unsqueeze(0), 
                                acc_vec[0].unsqueeze(0), acc_vec[1].unsqueeze(0), 
                                a.unsqueeze(0), va.unsqueeze(0)])
        input_tensor = input_tensor.unsqueeze(0)

        if CUDA:
            input_tensor = input_tensor.cuda()
            labels = labels.cuda()

        outputs = net(input_tensor)
        outputs = outputs.squeeze()
        state, _ = drone.step([float(outputs[0]), float(outputs[1])])
        
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if not (idx0 % 10):
        clear_output(wait=True)
        display(f"epoch: {"ignore:.0f"}, rollout: {idx0} loss: {loss:.4f}, running_loss: {running_loss:.4f}")
        
print("done!")

# torch.save(net.state_dict(), f'drone_AI.pth')


done!


In [10]:
# torch.save(net.state_dict(), f'drone_AI.pth')

In [11]:
# initialise new droneAI with trained params
inf_net = Net(input_size, hidden_size0, hidden_size1, out_size)
CUDA = torch.cuda.is_available()

if CUDA:
    inf_net = inf_net.cuda()

state_dict = torch.load("drone_AI.pth", map_location="cuda" if CUDA else "cpu")
inf_net.load_state_dict(state_dict)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(inf_net.parameters(), lr=learning_rate)

inf_net.eval() # ready for inference


Net(
  (fc0): Linear(in_features=8, out_features=64, bias=True)
  (fc1): Linear(in_features=64, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=2, bias=True)
  (tanh): Tanh()
  (sigmoid): Sigmoid()
)

In [14]:
# VISUALISATION FOR DEBUGGING & INFERENCE:
import pygame
from IPython.display import display, clear_output # because this is .ipynb

# drone
drone_ai = DroneEnv(width=1000, height=1000)
drone_ai.reset(x=drone_ai.width / 2, y=drone_ai.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)
drone_pid = DroneEnv(width=1000, height=1000)
drone_pid.reset(x=drone_pid.width / 2, y=drone_pid.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

# pygame drone_ai
pygame.init()
screen = pygame.display.set_mode((drone_ai.width, drone_ai.height))
pygame.display.set_caption("DroneEnv Visualisation")
clock = pygame.time.Clock()

# constants
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
RED = (255, 0, 0)
TURQUOISE   = (64,224,208)
GREEN = (0, 255, 0)
DRONE_W = 60
DRONE_H = 20

# DRAWING FUNC FOR PYGAME:
def draw_drone(screen, x, y, angle_deg, l, r, col):
    angle = math.radians(angle_deg)
    hw, hh = DRONE_W / 2, DRONE_H / 2
    points = [(-hw, -hh), (hw, -hh), ( hw,  hh), (-hw,  hh)] # vertices of drone (just a block)

    rot = []
    for px, py in points:
        rx = px * math.cos(angle) - py * math.sin(angle)
        ry = px * math.sin(angle) + py * math.cos(angle)
        rot.append((float(x) + rx, float(y) + ry))

    r_start = (rot[3][0] - 5, rot[3][1] - 5)
    l_start = (rot[2][0] + 5, rot[2][1] - 5)
    r_end = (rot[3][0] - 5, rot[3][1] + r*40)
    l_end = (rot[2][0] + 5, rot[2][1] + l*40)

    pygame.draw.polygon(screen, col, rot) # (pygame screen, colour, absolute drone vertices)
    pygame.draw.line(screen, TURQUOISE, r_start, r_end, 3)
    pygame.draw.line(screen, TURQUOISE, l_start, l_end, 3)

# PYGAME VARS:
running = True
pause = False
state, _ = drone_ai.step([0, 0])
state_pid, _ = drone_pid.step([0, 0])

while running:
    # USER CONTROL:
    running = not any(event.type == pygame.QUIT for event in pygame.event.get())

    keys = pygame.key.get_pressed() # user input / alternative to FFN inputs
    thrust_left_human  = 1 if keys[pygame.K_LEFT]  else 0 # thrust control (will both be continuous [0, 1] for FFN outputs, here, for simplicity, only binary)
    thrust_right_human = 1 if keys[pygame.K_RIGHT] else 0 # note: 0.5 thrust should perfectly balance gravity at any height in this drone_ai
    running = False if keys[pygame.K_ESCAPE] else running
    pause, timeout = (not pause, 200) if keys[pygame.K_p] else pause, 0
    pygame.time.wait(timeout) # to prevent spamming keys

    # FRAME UPDATES:
    opt_vec = torch.tensor(state["opt"], dtype=torch.float32)
    vel_vec = torch.tensor(state["vel"], dtype=torch.float32)
    acc_vec = torch.tensor(state["acc"], dtype=torch.float32)
    a = state["ang"]
    va = torch.tanh(torch.tensor(state["ang_vel"]))
    
    # state vector to tensor for FFN:
    input_tensor = torch.cat([opt_vec[0].unsqueeze(0), opt_vec[1].unsqueeze(0), 
                            vel_vec[0].unsqueeze(0), vel_vec[1].unsqueeze(0), 
                            acc_vec[0].unsqueeze(0), acc_vec[1].unsqueeze(0), 
                            a.unsqueeze(0), va.unsqueeze(0)])

    if CUDA:
        input_tensor = input_tensor.cuda()
        
    output = inf_net(input_tensor) # FFN prediction
    thrust_left, thrust_right = float(output[0]), float(output[1])
    pid_output = opt_thrusts(state_pid)
    thrust_left_pid, thrust_right_pid = float(pid_output[0]), float(pid_output[1]) # PID controller

    state, _ = drone_ai.step([thrust_left, thrust_right])
    state_pid, _ = drone_pid.step([thrust_left_human, thrust_right_human])

    if not pause and state["out"]: # when outside of box
        drone_ai.reset(x=drone_ai.width / 2, y=drone_ai.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

    elif not pause and state_pid["out"]: # when outside of box
        drone_pid.reset(x=drone_pid.width / 2, y=drone_pid.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

    screen.fill(BLACK)
    
    pygame.draw.circle(screen, GREEN, (int(drone_ai.checkpoint_x), int(drone_ai.checkpoint_y)), drone_ai.checkpoint_radius, 2)
    draw_drone(screen, drone_ai.x, drone_ai.y, drone_ai.a, thrust_left, thrust_right, GREEN) # drone with new state
    
    pygame.draw.circle(screen, RED, (int(drone_pid.checkpoint_x), int(drone_pid.checkpoint_y)), drone_pid.checkpoint_radius, 2)
    draw_drone(screen, drone_pid.x, drone_pid.y, drone_pid.a, thrust_left_human, thrust_right_human, RED) # drone with new state
    
    pygame.display.flip()
    clock.tick(60)

pygame.quit()

print(f"AI: {drone_ai.score}, PID: {drone_pid.score}" )


AI: 8, PID: 2
