In [3]:
# imports for training data
!if ( ! ls . | grep pytransform ); then git clone https://github.com/mhask94/pytransform.git; fi
# !git clone https://github.com/mhask94/pytransform.git
from pytransform.common import skew
from pytransform.quaternion import Quaternion as Quat
import numpy as np

Cloning into 'pytransform'...
remote: Enumerating objects: 11, done.[K
remote: Counting objects: 100% (11/11), done.[K
remote: Compressing objects: 100% (6/6), done.[K
remote: Total 62 (delta 5), reused 11 (delta 5), pack-reused 51
Unpacking objects: 100% (62/62), done.


In [0]:
# imports for pytorch
import torch
import torch.nn as nn

In [0]:
# classes for quadrotor state and dynamics
class State():
    def __init__(self, arr=np.empty(0)):
        if len(arr) == 0:
            self.arr = np.zeros((10,1), dtype=np.float64)
            self.arr[3] = 1
        else:
            assert arr.shape == (10, 1)
            if not arr.dtype == np.float64:
              arr = np.array(arr, dtype=np.float64)
            arr.dtype = np.float64
            self.arr = arr

    def __getitem__(self, position):
        return self.arr[position]
    def __str__(self):
        s = 'p: ' + str(self.p.flatten()) + '\nq: ' + self.q.__str__() + \
                '\nv: ' + str(self.v.flatten())
        s = s.replace('[ ', '[')
        s = s.replace(', ', ' ')
        s = s.replace(' ]', ']')
        return s
    def __repr__(self):
        return self.__str__()
    def __add__(self, other):
        assert other.shape == (9, 1)
        out = np.empty(self.arr.shape)
        out[:3]  = self.p + other[:3]
        out[3:7] = (self.q + other[3:6]).elements
        out[7:]  = self.v + other[6:]
        return State(out)
    def __iadd__(self, other):
        assert other.shape == (9, 1)
        self.arr[:3] += other[:3]
        self.arr[3:7] = (self.q + other[3:6]).elements
        self.arr[7:] += other[6:]
        return self
    @property
    def p(self):
        return self.arr[:3]
    @property
    def q(self):
        return Quat(self.arr[3:7])
    @property
    def v(self):
        return self.arr[7:]
    @property
    def elements(self):
        return self.arr.copy()
    def copy(self):
        return State(self.arr.copy())

class Dynamics():
    def __init__(self):
        self.k1 = np.zeros((9,1))
        self.k2 = np.zeros((9,1))
        self.k3 = np.zeros((9,1))
        self.k4 = np.zeros((9,1))
        self.cd = 0.1
        e_z = np.array([[0,0,1]]).T
        self.g = 9.8065 * e_z
        self.se = 0.5

    def run(self, xu, dt):
        x,u = State(xu[:10]), xu[10:]
        self.k1 = self.f(x, u)
        self.k2 = self.f(x + self.k1*(dt/2), u)
        self.k3 = self.f(x + self.k2*(dt/2), u)
        self.k4 = self.f(x + self.k3*dt, u)
        x += (self.k1 + 2*(self.k2 + self.k3) + self.k4) * (dt/6)
        return x

    def f(self, x, u):
        s, w = u[0], u[1:]
        dx = np.empty(self.k1.shape)
        dx[:3] = x.q.rota(x.v)
        dx[3:6] = w
        dx[6:] = -self.g*(s/self.se) - self.cd*x.v + x.q.rotp(self.g) - \
                skew(w) @ x.v
        return dx

    @property
    def state(self):
        return self.x.copy()

In [0]:
class DataGenerator():
  def __init__(self, batch_size=50):
    self.batch_size = batch_size
    self.pos_lim = 300
    self.att_lim = np.pi
    self.vel_lim = 10
    self.rate_lim = 2*np.pi
    self.s_lim = 1
    self.dyn = Dynamics()

  def getRandomInput(self):
    xu = np.empty((14,1))
    xu[:2] = np.random.uniform(-self.pos_lim, self.pos_lim, (2,1))
    xu[2] = np.random.uniform(-self.pos_lim, 0)
    mask = np.random.uniform(size=3) > 0.2
    euler = np.random.uniform(-self.att_lim, self.att_lim, 3) * mask
    xu[3:7] = Quat.from_euler(*euler).elements
    xu[7:10] = np.random.uniform(-self.vel_lim, self.vel_lim, (3,1))
    xu[10] = np.random.uniform(0, self.s_lim)
    xu[11:] = np.random.uniform(-self.rate_lim, self.rate_lim, (3,1))
    return xu

  def getBatch(self):
    batch_in = np.empty((self.batch_size, 1, 14, 1))
    batch_out = np.empty((self.batch_size, 1, 10, 1))
    for i in range(self.batch_size):
      batch_in[i,0] = self.getRandomInput()
      batch_out[i,0] = self.dyn.run(batch_in[i,0], 0.01).elements
    return batch_in, batch_out

In [56]:
def testGen():
  data_gen = DataGenerator(batch_size=2)
  ran = data_gen.getRandomInput()
  print('rand: ', ran)

  x, truth = data_gen.getBatch()
  print('x: \n', x)
  print('truth: \n', truth)

testGen()

rand:  [[-2.77005722e+02]
 [-6.25429536e+00]
 [-2.40041210e+02]
 [ 3.01457333e-02]
 [-3.52175583e-01]
 [ 7.97813085e-02]
 [ 9.32039986e-01]
 [-7.02200149e+00]
 [-7.38796821e+00]
 [-8.85306413e+00]
 [ 5.30992424e-01]
 [-3.51855717e+00]
 [ 4.37743989e+00]
 [ 1.47724905e+00]]
x: 
 [[[[-4.00206898e+01]
   [-2.47253451e+02]
   [-1.92222032e+02]
   [ 5.41198380e-01]
   [-8.40516550e-01]
   [ 1.09723358e-03]
   [-2.51999578e-02]
   [ 3.51785727e+00]
   [-1.25172220e+00]
   [ 4.47370697e+00]
   [ 4.72194621e-01]
   [ 2.89448780e+00]
   [ 4.39333388e+00]
   [-2.48102119e+00]]]


 [[[-6.75265329e+01]
   [ 2.41993013e+02]
   [-1.36742061e+01]
   [ 4.72621039e-01]
   [ 4.86522496e-01]
   [-5.38981929e-01]
   [ 4.99423362e-01]
   [-7.25580703e+00]
   [ 1.02034327e+01]
   [ 9.43528821e+00]
   [ 6.82203299e-01]
   [ 4.17863570e+00]
   [-5.43169188e+00]
   [ 6.11870916e+00]]]]
truth: 
 [[[[-4.00206898e+01]
   [-2.47253451e+02]
   [-1.92222032e+02]
   [ 5.41198380e-01]
   [-8.40516550e-01]
   [ 1.09723

In [23]:
def testConvTransposeSize():
  inputs = 14
  x_test = torch.zeros(1,1,inputs,1)
  up = nn.ConvTranspose2d(1, 1, (2,2*inputs), padding=0, stride=2)
  up_test = up(x_test)
  print('up: ', up_test.size())

  down = nn.Conv2d(1,1, (1,inputs*2), padding=0, stride=3)
  down_test = down(up_test)
  print('down: ', down_test.size())

testConvTransposeSize()

up:  torch.Size([1, 1, 28, 28])
down:  torch.Size([1, 1, 10, 1])


In [0]:
from IPython.core.debugger import set_trace
class UBlock(nn.Module):
  def __init__(self, c_in, c_out, after='none'):
    super(UBlock, self).__init__()
    self.main_net = nn.Sequential(
        nn.Conv2d(c_in, c_out, (3,3), padding=(1,1)),
        nn.ReLU(),
        nn.Conv2d(c_out, c_out, (3,3), padding=(1,1)),
        nn.ReLU(),
    )
    self.after = after
    if after == 'up':
      self.after_net = nn.ConvTranspose2d(c_out, c_out//2, (2,2), padding=0, stride=2)
    elif after == 'down':
      self.after_net = nn.Conv2d(c_out, c_out, (3,3), padding=(1,1), stride=2)
    elif after == 'end':
      self.after_net = nn.Conv2d(c_out,1, (1,14*2), padding=0, stride=3)
    else: # none
      self.after_net = None

  def forward(self, x):
    main_out = self.main_net(x).squeeze(2).squeeze(2)
    after_out = self.after_net(main_out).squeeze(2).squeeze(2)
    if self.after == 'up':
      b,c,h,w = main_out.shape
      return main_out[:,:c//2,:,:], after_out
    else:
      return after_out

class DynamicsNN(nn.Module):
  def __init__(self, num_inputs=14):
    super(DynamicsNN, self).__init__()
    self.beg = nn.ConvTranspose2d(1, 1, (2,2*num_inputs), padding=0, stride=2)
    self.up1 = UBlock(1, 512, after='up')
    self.up2 = UBlock(256, 256, after='up')
    self.up3 = UBlock(128, 128, after='up')
    self.up4 = UBlock(64, 64, after='up')
    self.dn1 = UBlock(32, 32, after='down')
    self.dn2 = UBlock(64, 64, after='down')
    self.dn3 = UBlock(128, 128, after='down')
    self.dn4 = UBlock(256, 256, after='down')
    self.end = UBlock(512, 512, after='end')

  def forward(self, x_in):
    # set_trace()
    beg = self.beg(x_in)
    skip1, up1 = self.up1(beg)
    skip2, up2 = self.up2(up1)
    skip3, up3 = self.up3(up2)
    skip4, up4 = self.up4(up3)
    down1 = self.dn1(up4)
    down2 = self.dn2(torch.cat((skip4, down1), dim=1))
    down3 = self.dn3(torch.cat((skip3, down2), dim=1))
    down4 = self.dn4(torch.cat((skip2, down3), dim=1))
    out = self.end(torch.cat((skip1, down4), dim=1))
    out[:,:,3:7,:] /= torch.norm(out[:,:,3:7,:], dim=2).view(-1,1,1,1)
    mask = (out[:,:,3,:] < 0).squeeze()
    out[mask,:,3:7,:] *= -1
    return out

In [41]:
def testNet():
  x_test = torch.randn(1,1,14,1)
  print('x_test: ', x_test[0,0,:,0])
  net = DynamicsNN()
  test = net(x_test)
  print('shape: ', test.shape)
  print('output: \n', test[0,0,:,0])

testNet()

x_test:  tensor([ 0.3742, -0.1727,  0.3634,  0.5274, -1.4375,  0.4241, -0.1815, -1.6557,
         0.6278, -0.0034,  0.5984,  0.3879, -2.5938,  0.0033])
shape:  torch.Size([1, 1, 10, 1])
output: 
 tensor([0.0069, 0.0086, 0.0076, 0.5317, 0.4460, 0.5338, 0.4832, 0.0092, 0.0112,
        0.0088], grad_fn=<SelectBackward>)
