# 距離センサ3つのみでAI RC CARを実現できるか実験してみる。

## このノートブックの概要
ロボット前方に3つの距離センサを搭載し、AI RC CARと同様に、NNに BCできるかを実験する。

## 方針

単純な対抗二輪ロボットと距離センサのモデル化を行い、シミュレーション環境とする。まず、単純な比例制御でロボットを制御し、教師データを収集する。収集したデータを３層のNNへ学習させて、走行データをBCさせる。最終的に、比例制御器を学習したNNに置き換えて自動走行が実現できていることを確認する。


## シミュレータ環境の構築

[詳解 確率ロボティクス]( https://www.amazon.co.jp/dp/4065170060/ref=cm_sw_em_r_mt_dp_U_S8Z9EbWNTEBJ5)で実装されているシミュレータ環境をベースに実装を進める。シミュレータ部分のコード詳細はそちらを参考にする。

壁状の障害物を距離センサで検出できるように拡張を行なっている。

### Worldクラスの実装


In [None]:
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('nbagg')
import matplotlib.animation as anm
from matplotlib import rc
from IPython.display import HTML
%matplotlib inline

class World:
  def __init__(self, time_span, time_interval, debug=False):
    self.objects = []
    self.debug = debug
    self.time_span = time_span
    self.time_interval = time_interval

  def append(self, obs):
    self.objects.append(obs)
  
  def draw(self):
    fig = plt.figure(figsize=(8,8))
    ax = fig.add_subplot(111)
    ax.set_aspect('equal')
    ax.set_xlim(-5,5)
    ax.set_ylim(-5,5)
    ax.set_xlabel("X", fontsize=20)
    ax.set_ylabel("Y", fontsize=20)

    elems = []
    if self.debug:
      for i in range(1000): self.one_step(i, elems,ax)
    else:
      self.ani = anm.FuncAnimation(fig, self.one_step, fargs=(elems,ax), 
                                   frames=int(self.time_span/self.time_interval), 
                                   interval = int(self.time_interval * 1000), repeat=False)
        
  def one_step(self, i, elems, ax):
    while elems: elems.pop().remove()
    time_str = "t = %.2f[s]" % (self.time_interval * i)
    elems.append(ax.text(-4.4,4.5, time_str, fontsize=10))
    for obj in self.objects:
      obj.draw(ax, elems)
      if hasattr(obj, "one_step"):obj.one_step(self.time_interval)

### Robotクラスの実装

IdealSensorは距離センサのモデル化
壁との当たり判定は以下のサイトを参考に実装した。
http://konapower.hateblo.jp/entry/2017/09/14/180949

In [None]:
import math
import matplotlib.patches as patches
import numpy as np
from scipy.stats import expon, norm

class IdealRobot:
  def __init__(self, pose, agent=None, sensors=None, color="black"):
    self.pose = pose
    self.r = 0.2
    self.color = color
    self.agent = agent
    self.poses = [pose]
    self.sensor_pose = None
    self.sensors = sensors

  def draw(self, ax, elems):
    x,y,theta = self.pose
    xn = x + self.r * math.cos(theta)
    yn = y + self.r * math.sin(theta)
    self.sensor_pose = np.array([[x, y], [xn, yn]])
    elems += ax.plot([x, xn], [y, yn], color = self.color)
    c = patches.Circle(xy=(x,y), radius=self.r, fill=False, color = self.color)
    elems.append(ax.add_patch(c))
    self.poses.append(self.pose)
    elems += ax.plot([e[0] for e in self.poses], [e[1] for e in self.poses], linewidth=0.5, color ="black")
    if len(self.sensors) > 1 and len(self.poses) > 1:
      for sensor in self.sensors:
        sensor.draw(ax,elems,self.poses[-2])

  def one_step(self, time_interval):
    if not self.agent:return    
    obs = []
    for sensor in self.sensors:
      obs.append(sensor.data(self.pose))
    nu,omega = self.agent.decision(obs)
    self.pose = self.state_trantision(nu,omega,time_interval,self.pose)

  @classmethod
  def state_trantision(cls, nu, omega, time, pose):
    t0 = pose[2]
    if math.fabs(omega) < 1e-10:
      value = np.array([nu*math.cos(t0),
                              nu*math.sin(t0),
                              omega]) * time
      return pose + value
    else:
      value = np.array([nu/omega*(math.sin(t0 + omega*time) - math.sin(t0)),
                              nu/omega*(-math.cos(t0+omega*time)+math.cos(t0)),
                              omega*time])
      return pose + value

class Robot(IdealRobot):

  def __init__(self, pose, agent=None, sensors = None, color = "black",
               noise_per_meter=5, noise_std=math.pi/60,
               bias_rate_std=(0.1,0.1)):
    super().__init__(pose, agent, sensors, color)
    self.noise_pdf = expon(scale=1.0/(1e-100+noise_per_meter))
    self.distance_until_noise = self.noise_pdf.rvs()
    self.theta_noise = norm(scale=noise_std)
    self.bias_rate_nu = norm.rvs(loc=1.0, scale=bias_rate_std[0])
    self.bias_rate_omega = norm.rvs(loc=1.0, scale=bias_rate_std[1])

  def noise(self, pose, nu, omega, time_interval):
    self.distance_until_noise -= abs(nu) * time_interval + self.r * abs(omega) * time_interval
    if self.distance_until_noise <= 0.0:
      self.distance_until_noise += self.noise_pdf.rvs()
      pose[2] += self.theta_noise.rvs()
    return pose

  def bias(self, nu, omega):
    return nu * self.bias_rate_nu, omega * self.bias_rate_omega

  def one_step(self, time_interval):
    if not self.agent:return    
    obs = []
    for sensor in self.sensors:
      obs.append(sensor.data(self.pose))
    nu,omega = self.agent.decision(obs)
    nu,omega = self.bias(nu, omega)
    self.pose = self.state_trantision(nu, omega, time_interval, self.pose)
    self.pose = self.noise(self.pose, nu, omega, time_interval)

class IdealRangeSensor:

  def __init__(self, env_map, install_angle):
    self.env_map = env_map
    self.install_angle = install_angle
    self.lastdata = []

  def data(self, sensor_pose):
    observed = []
    ls = self.line_segment(sensor_pose)
    for w in self.env_map.walls:
      p = self.intersection(ls[0], ls[1], w.points[0], w.points[1])
      observed.append(p)
    self.lastdata = self._select_min_distance(np.array(observed))
    return self.lastdata

  def _select_min_distance(self, observed):
    np.argsort(observed[:, 2])
    a_2d_sort_col_num = observed[np.argsort(observed[:, 2])]
    return a_2d_sort_col_num[0]
  
  def line_segment(self, sensor_pose):
    x,y,theta = sensor_pose
    corners = np.array([[5, 5], [5,- 5], [-5, 5], [-5, -5]])
    l = max([math.hypot(*(np.array([x, y]-corner))) for corner in corners])
    nx = x + l * math.cos(theta+self.install_angle)
    ny = y + l * math.sin(theta+self.install_angle)
    return np.array([[x, y], [nx, ny]])

  def intersection(self, p1,p2,p3,p4):
    ab = p2-p1; ac = p3-p1; cd = p4-p3
    c = np.cross(ab,cd)
    if c != 0:
      r1 = np.cross(ac,ab)/c
      r2 = np.cross(ac,cd)/c
      if 0 <= r1 and r1 <= 1and 0 <= r2 and r2 <= 1:
        position = (ab*r2)+p1
        diff = p1 - position  
        distance = np.hypot(*diff)
        return np.array([position[0], position[1], distance])
    return np.array([float('inf'), float('inf'), float('inf')])
    
  def draw(self, ax, elems, sensor_pose):
    x,y,theta = sensor_pose
    if len(self.lastdata) > 0:
      elems += ax.plot([x,self.lastdata[0]],[y,self.lastdata[1]], color="pink")

class RangeSensor(IdealRangeSensor):
  def __init__(self, env_map,
               install_angle,
               distance_noise_rate=0.1):
    super().__init__(env_map, install_angle)
    self.distance_noise_rate = distance_noise_rate

  def noise(self, relpose):
    ell = norm.rvs(loc=relpose[2], scale=relpose[2]*self.distance_noise_rate)
    return np.array([relpose[0],relpose[1],ell]).T

  def data(self, sensor_pose):
    observed = []
    ls = self.line_segment(sensor_pose)
    for w in self.env_map.walls:
      p = self.intersection(ls[0], ls[1], w.points[0], w.points[1])
      observed.append(p)
    self.lastdata = self._select_min_distance(np.array(observed))
    self.lastdata = self.noise(self.lastdata)
    return self.lastdata

    



### 壁とマップクラスの実装

障害物として壁を定義、２点間の線分として表現する。

In [None]:
class Wall:
  def __init__(self,p1,p2):
    self.points = np.vstack([p1,p2])
    self.id = None
  
  def draw(self, ax, elems):
    elems += ax.plot([e[0] for e in self.points], 
                     [e[1] for e in self.points], color="blue",linewidth=0.5)
    
class Map:
  def __init__(self):
    self.walls=[]
  def append_walls(self, wall):
    wall.id = len(self.walls) + 1
    self.walls.append(wall)
  
  def draw(self, ax, elems):
    for w in self.walls: w.draw(ax,elems)
     

### Agentクラス実装

Agentクラスは右センサ、左センサの差分を入力として比例制御を行う、左右のセンサの値が同じになる（コース中央を通る）ように制御を行う。速度は今の所固定。比例制御のパラメータはヒューリスティックに適当に決定。

In [None]:
class Agent:
  def __init__(self,nu,omega):
    self.nu = nu
    self.omega = omega
    self.record = []
    self.kp = 20
  
  def decision(self, observation=None):
    e = (observation[2][2] - observation[1][2]) * self.kp
    self.omega = (e / 180 * math.pi) # 角度指令omegaに対する制御
    #速度指令は初期値で固定、実際は上記制御に加えて速度制御用の式が必要になる。
    self.record.append([observation[0][2],observation[1][2],observation[2][2], self.omega, self.nu]) # 学習データの記録、入力とそれに対応する出力をセットで記録する。
    return self.nu, self.omega
  




## シミュレーションの実行

Google Colabではリアルタイムの表示ができないため、動画で確認。実行は結構時間かかる。

In [None]:
time = 300 #シミュレーション時間
interval = 0.1 #制御周期(10Hz)
world=World(time, interval)

#外壁
w1 = Wall([-3.5, -3],[-3.5, 3])
w2 = Wall([-3.5, 3],[-2.5, 4])
w3 = Wall([-2.5,4],[2.5,4])
w4 = Wall([2.5,4],[3.5,3])
w5 = Wall([3.5, 3],[3.5,-3])
w6 = Wall([3.5,-3],[2.5,-4])
w7 = Wall([2.5,-4],[-2.5,-4])
w8 = Wall([-2.5,-4],[-3.5,-3])

#内壁
w9 = Wall([-2, -1.5],[-2, 1.5])
w10 = Wall([-2,1.5],[-1,2.5])
w11 = Wall([-1,2.5],[1,2.5])
w12 = Wall([1, 2.5],[2,1.5])
w13 = Wall([2, 1.5],[2,-1.5])
w14 = Wall([2,-1.5],[1,-2.5])
w15 = Wall([1,-2.5],[-1,-2.5])
w16 = Wall([-1,-2.5],[-2, -1.5])

map = Map()
map.append_walls(w1)
map.append_walls(w2)
map.append_walls(w3)
map.append_walls(w4)
map.append_walls(w5)
map.append_walls(w6)
map.append_walls(w7)
map.append_walls(w8)
map.append_walls(w9)
map.append_walls(w10)
map.append_walls(w11)
map.append_walls(w12)
map.append_walls(w13)
map.append_walls(w14)
map.append_walls(w15)
map.append_walls(w16)

agent = Agent(0.6, 0.0)
front_sensor = RangeSensor(map, 0)
right_sensor = RangeSensor(map, -math.pi/6)
left_sensor = RangeSensor(map, math.pi/6)
sensors = [front_sensor, right_sensor, left_sensor]
robot1 = Robot(np.array([-2.5, 0, math.pi/2]).T, agent, sensors)
world.append(map)
world.append(robot1)
world.draw()
rc('animation', html='jshtml')
world.ani
#print(len(agent.record))

## 学習データの準備

学習データはagent.recordにリストとして溜まっている。以下の並びのデータが収集されている。


```[中央センサの値,右センサの値,左センサの値,舵角,速度]```

以下ではPytorchで学習させるためにデータの整形と変換を行う。正規化とかしてないけど多分動く。

In [None]:
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import layers
x = np.array(agent.record)

# xを2個の配列に水平分割
train_data, train_labels = np.hsplit(x, [3])

print(train_data)
print(train_labels)

## NNのモデル定義

NNのネットワークを構築する。入力3,出力2の全結合3層のニューラルネットワーク、口が裂けても深層学習とか言ってはいけない。入力3はセンサの値をそのまま突っ込む。出力２は速度と舵角が出力される。中間層は16あるけど、多分、さらに半分に落としても問題ない。

In [None]:
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import layers

model = keras.Sequential([
    layers.Dense(8, activation='relu', input_shape=(3,)),
    layers.Dense(8, activation='relu'),
    layers.Dense(2),
  ])

optimizer = keras.optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(loss='mse',
              optimizer = optimizer,
              metrics = ['mae'])
model.summary()

## 学習ループ

Tensorflowの学習ループ


In [None]:
EPOCHS = 20

history = model.fit(
  train_data, train_labels,
  epochs=EPOCHS, validation_split = 0.2, verbose=1,
  callbacks=[])
print(history)

## 学習モデルを利用した制御関数の実装

学習済みのモデルを使って制御を行う。Agentクラスの比例制御を```model(o)```で置き換えるだけで実装が終わる。
つまり、線形比例の制御関数を大量のデータを学習したNNで近似したことになる。回帰学習だから当たり前なんだけど、ある種本質をついている結果だと思う。

In [None]:
class DeepAgent:
  def __init__(self, model):
    self.model = model

  def decision(self, observation=None):
    o = np.array([[observation[0][2],observation[1][2],observation[2][2]]])
    result = model.predict(o).flatten()
    return np.array(result[1]), np.array(result[0])

## 学習済みモデルによる自動走行

In [None]:
#Mapやセンサの定義は学習時のオブジェクトを使い回す。

world=World(150, 0.1)
deep_agent = DeepAgent(model)
robot = Robot(np.array([-3, 0, math.pi/2]).T, deep_agent, sensors)
world.append(map)
world.append(robot)
world.draw()
rc('animation', html='jshtml')
world.ani

## TensorFlow lite for microcontrollerへモデル変換



In [None]:
# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert() #量子化はせずFP32のまま処理していことに注意!

open("converted_model.tflite", "wb").write(tflite_model)


!apt-get install xxd
!xxd -i converted_model.tflite > airc_model.cpp

## Download FlatBuffer C files.

TensorFlow lite for microcontroller向けに保存したモデルファイルをダウンロードする。

In [None]:
from google.colab import files
files.download('airc_model.cpp')