In [2]:
import numpy as np
import math
import torch

In [3]:
class DroneEnv:
    def __init__(self, width=1000, height=1000, checkpoint_radius=60):
        # ENV:
        self.width = width
        self.height = height
        self.checkpoint_radius = checkpoint_radius

        # ENV PARAMS:
        self.GRAVITY = 2
        self.THRUST_POWER = 3.5
        self.X_AXIS_SENS = 50
        self.Y_AXIS_SENS = 1
        self.ROTATION_SPEED = 1
        self.ROTATION_DRAG = 0.7
        self.MOVEMENT_DRAG = 0.5
        self.ACC_THROTLE = 0.4

        # initiate drone:
        self.reset(x=self.width / 2, y=self.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

    def reset(self, x, y, vx, vy, ax, ay, a, va):
        self.x = x
        self.y = y
        self.vx = vx
        self.vy = vy
        self.ax = ax
        self.ay = ay
        self.a = a # in degrees
        self.va = va

        self.spawn_checkpoint()

    def spawn_checkpoint(self):
        self.checkpoint_x = np.random.uniform(0.1 * self.width, 0.9 * self.width)
        self.checkpoint_y = np.random.uniform(0.1 * self.height, 0.9 * self.height)

    def step(self, action):
        # get thrust vaues from FFN outputs (actions):
        thrust_left = action[0]
        thrust_right = action[1]

        # PHYSICS:
        torque = thrust_left - thrust_right # factor
        thrust = self.THRUST_POWER * math.cos(math.radians(self.a)) * ((thrust_left + thrust_right) / 2)

        self.va -= self.ROTATION_SPEED * torque
        self.va *= self.ROTATION_DRAG

        self.a += self.va * self.ACC_THROTLE
        self.a %= 360.0

        self.vy += self.GRAVITY * self.ACC_THROTLE
        self.vy -= self.Y_AXIS_SENS * thrust * self.ACC_THROTLE

        angle_rad = math.radians(self.a)
        self.vx = self.X_AXIS_SENS * math.sin(angle_rad)

        self.x += self.MOVEMENT_DRAG * self.vx
        self.y += self.MOVEMENT_DRAG * self.vy

        # STATE SPACE FOR FFN:
        dx = self.checkpoint_x - self.x
        dy = self.checkpoint_y - self.y
        optimal_vector = (dx, dy)
        vel_vector = (self.vx, self.vy)
        acc_vector = (self.ax, self.ay)
        t = torch.nn.Tanh()
        d = self.a - 360 if self.a > 180 else self.a
        ang = t(torch.tensor(d))
        ang_vel = self.va # normalise

        dist = math.sqrt(dx * dx + dy * dy)
        reached = False
        out = False

        if dist < self.checkpoint_radius:
            reached = True
            self.spawn_checkpoint()

        out = True if self.x < 0 or self.x > self.width or self.y < 0 or self.y > self.height else False

        state = {
            "opt": optimal_vector, # when closer, optimally, also slower!
            "vel": vel_vector,
            "acc": acc_vector,
            "ang": ang,
            "ang_vel": ang_vel,
            "reached": reached,
            "out": out
        } # reached, out should not be inputs to FFN, only for PyGame
        
        reward = 0 # -math.dist(optimal_vector, (vel_vector)) * 0.5 - ang * 0.1 - dist * 0.01 + reached * 5 - out

        return state, reward

# state space:
# coordinates relative to checkpoints coordinates C = (x, y) -> the optimal vector from drone coordinates D to C: C-D
# drone velocity relative to C as vector
# drone acceleration relative to C as vector
# angular velocity relativ to global coordinate system

# reached
# out: if drone out, the rollout for FFN simply resets, no loss for it?

# loss:
# difference between drone vectors and optimal vector (dist = vec length -> adjusts for velocity, should slow down when close)


In [None]:
import torch
import torch.nn.functional as F
import math

# PID controller
def opt_thrusts(state, min_thrust=0.0, max_thrust=1.0):
    # states:
    opt_vec = torch.tensor(state["opt"], dtype=torch.float32)
    vel_vec = torch.tensor(state["vel"], dtype=torch.float32)
    acc_vec = torch.tensor(state["acc"], dtype=torch.float32)
    a = state["ang"]
    va = torch.tanh(torch.tensor(state["ang_vel"]))

    # computing more values
    limiter = torch.relu(-vel_vec[1]) # factor inside because should not be capped until large
    ya = torch.sigmoid(opt_vec[1] - acc_vec[1])
    yv = torch.sigmoid(opt_vec[1] - vel_vec[1])
    xa = torch.tanh(opt_vec[0] - acc_vec[0])
    xv = torch.tanh(opt_vec[0] - vel_vec[0])

    # factors
    f0 = 1/2
    f1 = 1/10
    f2 = 1/5
    f3 = 1/4.5
    f4 = 0 # not needed
    f5 = 1/35

    # thrusts
    l = 1 - yv + ya*f0 - xv*f1 - xa*f2 + a*f3 + va*f4 - limiter*f5
    r = 1 - yv + ya*f0 + xv*f1 + xa*f2 - a*f3 - va*f4 - limiter*f5

    return l, r


In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets 
import torchvision.transforms as transforms

print(torch.version.cuda)
device = torch.device("cuda")
torch.cuda.is_available()

12.1


True

In [66]:
input_size = 8
hidden_size0 = 32
hidden_size1 = 16
out_size = 2

In [67]:
class Net(nn.Module):
    def __init__(self, input_size, hidden_size0, hidden_size1, out_size):
        super(Net, self).__init__()
        self.fc0 = nn.Linear(input_size, hidden_size0)     # 8 - 128
        self.fc1 = nn.Linear(hidden_size0, hidden_size1)   # 128 - 64
        self.fc2 = nn.Linear(hidden_size1, out_size)       # 64 - 2
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.fc0.weight, gain=nn.init.calculate_gain('tanh')) 
        nn.init.xavier_uniform_(self.fc1.weight, gain=nn.init.calculate_gain('tanh'))
        nn.init.xavier_uniform_(self.fc2.weight, gain=nn.init.calculate_gain('tanh'))

    def forward(self, x):
        out = self.tanh(self.fc0(x))
        out = self.tanh(self.fc1(out))
        out = self.sigmoid(self.fc2(out))
        return out

In [None]:
epochs = 10000
rollout_length = 20
learning_rate = 0.001

In [69]:
net = Net(input_size, hidden_size0, hidden_size1, out_size)
CUDA = torch.cuda.is_available()
if CUDA:
    net = net.cuda()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

In [71]:
# training

drone = DroneEnv()
drone.reset(x=drone.width / 2, y=drone.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)
state, _ = drone.step([0, 0])

for epoch in range(epochs):
    loss = 0
    running_loss = 0
    
    for _ in range (rollout_length):
        opt_vec = torch.tensor(state["opt"], dtype=torch.float32)
        vel_vec = torch.tensor(state["vel"], dtype=torch.float32)
        acc_vec = torch.tensor(state["acc"], dtype=torch.float32)
        a = state["ang"]
        va = torch.tanh(torch.tensor(state["ang_vel"]))
        
        opt = opt_thrusts(state)
        labels = torch.tensor([opt[0], opt[1]])
        input_tensor = torch.cat([opt_vec[0].unsqueeze(0), opt_vec[1].unsqueeze(0), 
                                vel_vec[0].unsqueeze(0), vel_vec[1].unsqueeze(0), 
                                acc_vec[0].unsqueeze(0), acc_vec[1].unsqueeze(0), 
                                a.unsqueeze(0), va.unsqueeze(0)])

        if CUDA:
            input_tensor = input_tensor.cuda()
            labels = labels.cuda()

        outputs = net(input_tensor)
        state, _ = drone.step([float(outputs[0]), float(outputs[1])])
        
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
                
        print(f"loss: {loss}, running_loss: {running_loss}")

    # init new state for next rollout -> function takes in epoch as index and returns new starting state for reset function.

print("done!")

torch.save(net.state_dict(), f'drone_AI.pth')


loss: 1.295528769493103, running_loss: 1.295528769493103
loss: 1.3187562227249146, running_loss: 2.6142849922180176
loss: 1.346513271331787, running_loss: 3.9607982635498047
loss: 1.368686318397522, running_loss: 5.329484581947327
loss: 1.3815593719482422, running_loss: 6.711043953895569
loss: 1.386620044708252, running_loss: 8.09766399860382
loss: 1.387122392654419, running_loss: 9.48478639125824
loss: 1.3855278491973877, running_loss: 10.870314240455627
loss: 1.38326096534729, running_loss: 12.253575205802917
loss: 1.381231427192688, running_loss: 13.634806632995605
loss: 1.3801844120025635, running_loss: 15.014991044998169
loss: 1.3805575370788574, running_loss: 16.395548582077026
loss: 1.3818185329437256, running_loss: 17.777367115020752
loss: 1.3827582597732544, running_loss: 19.160125374794006
loss: 1.3830164670944214, running_loss: 20.543141841888428
loss: 1.3827431201934814, running_loss: 21.92588496208191
loss: 1.3821711540222168, running_loss: 23.308056116104126
loss: 1.37700

KeyboardInterrupt: 

In [38]:
# initialise new droneAI with trained params





In [None]:
# VISUALISATION FOR DEBUGGING & INFERENCE:
import pygame
from IPython.display import display, clear_output # because this is .ipynb

# drone
env = DroneEnv(width=1000, height=1000)
env.reset(x=env.width / 2, y=env.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

# pygame env
pygame.init()
screen = pygame.display.set_mode((env.width, env.height))
pygame.display.set_caption("DroneEnv Visualisation")
clock = pygame.time.Clock()

# constants
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
RED   = (255, 0, 0)
GREEN = (0, 255, 0)
DRONE_W = 60
DRONE_H = 20

# DRAWING FUNC FOR PYGAME:
def draw_drone(screen, x, y, angle_deg):
    angle = math.radians(angle_deg)
    hw, hh = DRONE_W / 2, DRONE_H / 2
    points = [(-hw, -hh), ( hw, -hh), ( hw,  hh), (-hw,  hh)] # vertices of drone (just a block)

    rot = []
    for px, py in points:
        rx = px * math.cos(angle) - py * math.sin(angle)
        ry = px * math.sin(angle) + py * math.cos(angle)
        rot.append((float(x) + rx, float(y) + ry))
        #print(x, y)

    pygame.draw.polygon(screen, WHITE, rot) # (pygame screen, colour, absolute drone vertices)

# PYGAME VARS:
running = True
pause = False
state, reward = env.step([0, 0])

while running:
    # USER CONTROL:
    running = not any(event.type == pygame.QUIT for event in pygame.event.get())

    keys = pygame.key.get_pressed() # user input / alternative to FFN inputs
    thrust_left  = 1 if keys[pygame.K_LEFT]  else 0 # thrust control (will both be continuous [0, 1] for FFN outputs, here, for simplicity, only binary)
    thrust_right = 1 if keys[pygame.K_RIGHT] else 0 # note: 0.5 thrust should perfectly balance gravity at any height in this env
    running = False if keys[pygame.K_ESCAPE] else running
    pause, timeout = (not pause, 200) if keys[pygame.K_p] else pause, 0
    pygame.time.wait(timeout) # to prevent spamming keys

    # FRAME UPDATES:
    thrust_left, thrust_right = opt_thrusts(state) # PID controller

    opt_vec = torch.tensor(state["opt"], dtype=torch.float32)
    vel_vec = torch.tensor(state["vel"], dtype=torch.float32)
    acc_vec = torch.tensor(state["acc"], dtype=torch.float32)
    a = state["ang"]
    va = torch.tanh(torch.tensor(state["ang_vel"]))
    labels = opt_thrusts(state)
    input_tensor = torch.tensor([opt_vec, vel_vec, acc_vec, a, va])

    if CUDA:
        input_tensor = input_tensor.cuda()
        
    #output = droneAI(input_tensor)
    #thrust_left, thrust_right = float(output[0]), float(output[0])

    state, reward = env.step([thrust_left, thrust_right])

    if not pause and state["out"]: # when outside of box
        env.reset(x=env.width / 2, y=env.height / 2, vx=0, vy=0, ax=0, ay=0, a=0, va=0)

    screen.fill(BLACK)
    pygame.draw.circle(screen, GREEN, (int(env.checkpoint_x), int(env.checkpoint_y)), env.checkpoint_radius, 2)
    draw_drone(screen, env.x, env.y, env.a) # drone with new state
    pygame.display.flip()
    clock.tick(60)

pygame.quit()


  ang = t(torch.tensor(d))
  va = torch.tanh(torch.tensor(state["ang_vel"]))
