# Pi Robot Exploration

## Setup

In [1]:
import random
from tqdm import tqdm
import numpy as np
import math
import os

%matplotlib inline
from matplotlib import pyplot as plt

import magnum as mn
from PIL import Image
import imageio
from IPython.display import Video

import habitat_sim
from habitat_sim.utils import common as utils
from habitat_sim.utils import viz_utils as vut

def display_sample(rgb_obs, semantic_obs=np.array([]), depth_obs=np.array([])):
    from habitat_sim.utils.common import d3_40_colors_rgb
    
    

    rgb_img = Image.fromarray(rgb_obs, mode="RGBA")

    arr = [rgb_img]
    titles = ["rgb"]

    plt.figure(figsize=(12, 8))
    for i, data in enumerate(arr):
        ax = plt.subplot(1, 3, i + 1)
        ax.axis("off")
        ax.set_title(titles[i])
        plt.imshow(data)
    plt.show(block=False)

### Actual PiCar Stats
- Width: 5", 0.127m
- Length: 7", 0.178
- Height: 4", 0.1m
- Camera: 3", 0.076m

In [None]:
sim_cfg = habitat_sim.SimulatorConfiguration()
sim_cfg.scene_id = './data//versioned_data/replica_cad_baked_lighting_1.0/remake_v0_v3_sc4_staging_05.glb'
sim_cfg.enable_physics = True
sim_cfg.allow_sliding = False

picam = habitat_sim.CameraSensorSpec()
picam.uuid = "color_sensor"
picam.sensor_type = habitat_sim.SensorType.COLOR
# TODO: Noise model?
picam.resolution = [720, 1280]
picam.hfov = 62.2 # https://elinux.org/Rpi_Camera_Module#Technical_Parameters_.28v.2_board.29
picam.position = [0.0, 0.076, 0.0] # TODO: maybe change y position since it's not in the center?

# agent
agent_cfg = habitat_sim.agent.AgentConfiguration()
agent_cfg.sensor_specifications = [picam]
agent_cfg.height = 0.1
agent_cfg.mass = 0.5
agent_cfg.linear_acceleration = 1.4


picar = habitat_sim.Configuration(sim_cfg, [agent_cfg])

In [None]:
try:  # Needed to handle out of order cell run in Colab
    sim.close()
except NameError:
    pass
sim = habitat_sim.Simulator(picar)

In [None]:
# initialize an agent
agent = sim.initialize_agent(0)

# Set agent state
agent_state = habitat_sim.AgentState()
agent_state.position = np.zeros(3)  # in world space
agent.set_state(agent_state)

# Get agent state
agent_state = agent.get_state()
print("agent_state: position", agent_state.position, "rotation", agent_state.rotation)

## Test Moving Around

In [None]:
# obtain the default, discrete actions that an agent can perform
# default action space contains 3 actions: move_forward, turn_left, and turn_right
action_names = list(picar.agents[0].action_space.keys())
print("Discrete action space: ", action_names)


def navigateAndSee(action=""):
    if action in action_names:
        observations = sim.step(action)
        print("action: ", action)
        if display:
            display_sample(observations["color_sensor"])

In [None]:
navigateAndSee('move_forward')

## More Sophistication

### NavMesh

In [None]:
navmesh_settings = habitat_sim.NavMeshSettings()

navmesh_settings.set_defaults()
navmesh_settings.cell_size = 0.01 #@param {type:"slider", min:0.01, max:0.2, step:0.01}
#default = 0.05
navmesh_settings.cell_height = 0.01 #@param {type:"slider", min:0.01, max:0.4, step:0.01}
#default = 0.2

#@markdown **Agent parameters**:
navmesh_settings.agent_height = 0.1 #@param {type:"slider", min:0.01, max:3.0, step:0.01}
#default = 1.5
navmesh_settings.agent_radius = 0.1 #@param {type:"slider", min:0.01, max:0.5, step:0.01}
#default = 0.1
navmesh_settings.agent_max_climb = 0.2 #@param {type:"slider", min:0.01, max:0.5, step:0.01}
#default = 0.2
navmesh_settings.agent_max_slope = 45 #@param {type:"slider", min:0, max:85, step:1.0}
# default = 45.0
# fmt: on
navmesh_settings.filter_low_hanging_obstacles = True  # @param {type:"boolean"}
# default = True
navmesh_settings.filter_ledge_spans = True  # @param {type:"boolean"}
# default = True
navmesh_settings.filter_walkable_low_height_spans = True  # @param {type:"boolean"}
# default = True

navmesh_settings.region_min_size = 0 #@param {type:"slider", min:0, max:50, step:1}
#default = 20
navmesh_settings.region_merge_size = 0 #@param {type:"slider", min:0, max:50, step:1}
#default = 20
navmesh_settings.edge_max_len = 0 #@param {type:"slider", min:0, max:50, step:1}
#default = 12.0
navmesh_settings.edge_max_error = 0.1 #@param {type:"slider", min:0, max:5, step:0.1}
#default = 1.3
navmesh_settings.verts_per_poly = 6.0 #@param {type:"slider", min:3, max:6, step:1}
#default = 6.0
navmesh_settings.detail_sample_dist = 6.0 #@param {type:"slider", min:0, max:10.0, step:0.1}
#default = 6.0
navmesh_settings.detail_sample_max_error = 1.0 #@param {type:"slider", min:0, max:10.0, step:0.1}
# default = 1.0
# fmt: on

navmesh_success = sim.recompute_navmesh(
    sim.pathfinder, navmesh_settings, include_static_objects=True
)

if not navmesh_success:
    print("Failed to build the navmesh! Try different parameters?")

In [None]:
def random_position():
    agent_state = sim.agents[0].get_state()
    agent_state.position = sim.pathfinder.get_random_navigable_point()
    orientation = random.random() * math.pi * 2.0
    agent_state.rotation = utils.quat_from_magnum(
        mn.Quaternion.rotation(-mn.Rad(orientation), mn.Vector3(0, 1.0, 0))
    )

    sim.agents[0].set_state(agent_state)

In [None]:
random_position()
plt.imshow(sim.get_sensor_observations()['color_sensor'])

## Continuous Navigation

In [None]:
vel_control = habitat_sim.physics.VelocityControl()
vel_control.controlling_lin_vel = True
vel_control.lin_vel_is_local = True
vel_control.controlling_ang_vel = True
vel_control.ang_vel_is_local = True


agent = sim.agents[0]

ACCEL = 0.1 # m/s^2
MAX_VEL = 1.5 # m/s
target_linear_vel = 0. # m/s

def move_linear(direction):
    global target_linear_vel
    target_linear_vel= float(direction)
    
def turn(direction):
    vel_control.angular_velocity = np.array([0, float(direction), 0])
    
def update_position(time_step):
    current_vel = vel_control.linear_velocity[2]
    
    next_step_vel = None
    if target_linear_vel > current_vel:
        next_step_vel = min(MAX_VEL, current_vel + ACCEL * time_step)
    else: 
        next_step_vel = max(-1 * MAX_VEL, current_vel - ACCEL * time_step)
    vel_control.linear_velocity = np.array([0., 0., next_step_vel])
    
    # TODO: probably just directly integrate acceleration into state??

    # Integrate the velocity and apply the transform.
    # Note: this can be done at a higher frequency for more accuracy
    agent_state = agent.state
    previous_rigid_state = habitat_sim.RigidState(
        utils.quat_to_magnum(agent_state.rotation), agent_state.position
    )

    # manually integrate the rigid state
    target_rigid_state = vel_control.integrate_transform(
        time_step, previous_rigid_state
    )

    # snap rigid state to navmesh and set state to object/agent
    # calls pathfinder.try_step or self.pathfinder.try_step_no_sliding
    end_pos = sim.step_filter(
        previous_rigid_state.translation, target_rigid_state.translation
    )

    # set the computed state
    agent_state.position = end_pos
    agent_state.rotation = utils.quat_from_magnum(
        target_rigid_state.rotation
    )
    agent.set_state(agent_state)

    # Check if a collision occured
    dist_moved_before_filter = (
        target_rigid_state.translation - previous_rigid_state.translation
    ).dot()
    dist_moved_after_filter = (
        end_pos - previous_rigid_state.translation
    ).dot()

    # NB: There are some cases where ||filter_end - end_pos|| > 0 when a
    # collision _didn't_ happen. One such case is going up stairs.  Instead,
    # we check to see if the the amount moved after the application of the filter
    # is _less_ than the amount moved before the application of the filter
    EPS = 1e-5
    collided = (dist_moved_after_filter + EPS) < dist_moved_before_filter

    # run any dynamics simulation
    sim.step_physics(time_step)
    
    return collided

In [None]:
observations = []
random_position()

sim_time = 10
control_frequency = 1  # @param {type:"slider", min:1, max:30, step:1}
frame_skip = 60  # @param {type:"slider", min:1, max:30, step:1}


fps = control_frequency * frame_skip
print("fps = " + str(fps))

time_step = 1.0 / (frame_skip * control_frequency)
print("time_step = " + str(time_step))
movement = 1
for _ in range(int(sim_time * control_frequency)):
    move_linear(movement)
    turn(random.choice([-1,1,0]))

    for _frame in range(frame_skip):
        collided = update_position(time_step)
        observations.append(sim.get_sensor_observations())
        
        if (collided):
            movement = -1 * movement

print("frames = " + str(len(observations)))

In [None]:
vut.make_video(observations, 'color_sensor', 'color', './out.mp4', fps=fps)

## Gym Env

In [49]:
%load_ext autoreload
from picar_env import PiCarEnv
%autoreload
try:
    test.close()
except NameError:
    pass
test = PiCarEnv('./data//versioned_data/hm3d-1.0/hm3d/example/00337-CFVBbU9Rsyb/CFVBbU9Rsyb.basis.glb')

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


I0122 23:28:16.309239 87360896 BulletPhysicsManager.cpp:35] Deconstructing BulletPhysicsManager
I0122 23:28:16.310658 87360896 PhysicsManager.cpp:50] Deconstructing PhysicsManager
I0122 23:28:16.310675 87360896 SemanticScene.h:43] Deconstructing SemanticScene
I0122 23:28:16.310678 87360896 SceneManager.h:25] Deconstructing SceneManager
I0122 23:28:16.310681 87360896 SceneGraph.h:25] Deconstructing SceneGraph
I0122 23:28:16.310866 87360896 Sensor.cpp:69] Deconstructing Sensor
I0122 23:28:16.328969 87360896 Renderer.cpp:71] Deconstructing Renderer
I0122 23:28:16.328996 87360896 WindowlessContext.h:17] Deconstructing WindowlessContext
I0122 23:28:16.339128 87360896 ManagedFileBasedContainer.h:210] <Dataset>::convertFilenameToPassedExt : Filename : default changed to proposed scene_dataset_config.json filename : default.scene_dataset_config.json
I0122 23:28:16.339165 87360896 AttributesManagerBase.h:365] <Dataset>::createFromJsonOrDefaultInternal : Proposing JSON name : default.scene_datas

Renderer: Apple M1 by Apple
OpenGL version: 4.1 Metal - 76.1
Using optional features:
    GL_ARB_vertex_array_object
    GL_ARB_ES2_compatibility
    GL_ARB_separate_shader_objects
    GL_ARB_texture_storage
    GL_EXT_texture_filter_anisotropic
    GL_EXT_debug_label
    GL_EXT_debug_marker
Using driver workarounds:
    no-layout-qualifiers-on-old-glsl
    apple-buffer-texture-unbind-on-buffer-modify


ersioned_data/hm3d-1.0/hm3d/example/00337-CFVBbU9Rsyb/CFVBbU9Rsyb.basis.stage_config.json from original name : ./data//versioned_data/hm3d-1.0/hm3d/example/00337-CFVBbU9Rsyb/CFVBbU9Rsyb.basis.glb | This file  does not exist.
I0122 23:28:16.361654 87360896 AbstractObjectAttributesManagerBase.h:183] File (./data//versioned_data/hm3d-1.0/hm3d/example/00337-CFVBbU9Rsyb/CFVBbU9Rsyb.basis.glb) exists but is not a recognized config filename extension, so new default Stage Template attributes created and registered.
I0122 23:28:16.361667 87360896 SceneDatasetAttributes.cpp:45] ::addNewSceneInstanceToDataset : Dataset : 'default' : Stage Attributes './data//versioned_data/hm3d-1.0/hm3d/example/00337-CFVBbU9Rsyb/CFVBbU9Rsyb.basis.glb' specified in Scene Attributes exists in dataset library.
I0122 23:28:16.361670 87360896 SceneDatasetAttributes.cpp:79] ::addNewSceneInstanceToDataset : Dataset : 'default' : Lighting Layout Attributes 'no_lights' specified in Scene Attributes but does not exist in 

### Random Baseline

In [53]:
test.reset()

observations = []
rewards = []

sim_time = 30
FPS = 30
direction = 1
for i in tqdm(range(sim_time * 10)):
    controls = np.random.rand(2,) * 2 - 1
        
    obs, reward, _, _ = test.step(controls)
    observations.append(obs)
    rewards.append(reward)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 300/300 [00:01<00:00, 199.87it/s]


In [54]:
print('REWARD')
print('Mean:', np.mean(rewards))
print('Std', np.std(rewards))

with imageio.get_writer('./random.mp4', mode='I', fps=FPS) as writer:
    for obs in tqdm(observations):
        writer.append_data(obs)
Video('./random.mp4')

REWARD
Mean: 0.0030482998242530356
Std 0.002943935637300123


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 300/300 [00:01<00:00, 293.46it/s]


### Naive Baseline

In [55]:
test.reset()

observations = []
rewards = [0]

sim_time = 30
FPS = 30
direction = 1
for i in tqdm(range(sim_time * 10)):
    if i % 30 == 0: 
        if rewards[-1] < 0:
            direction = -1 * direction
    controls = np.array([float(direction), random.random()*2-1])
        
    obs, reward, _, _ = test.step(controls)
    observations.append(obs)
    rewards.append(reward)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 300/300 [00:01<00:00, 223.69it/s]


In [56]:
print('REWARD')
print('Mean:', np.mean(rewards))
print('Std', np.std(rewards))

with imageio.get_writer('./naive.mp4', mode='I', fps=FPS) as writer:
    for obs in tqdm(observations):
        writer.append_data(obs)
Video('./naive.mp4')

REWARD
Mean: -3.4542872832369955
Std 5.089864992422782


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 300/300 [00:01<00:00, 282.06it/s]


## Learning

In [59]:
from stable_baselines3 import SAC
model = SAC('CnnPolicy', test, tensorboard_log="tensorboard")
for i in range(20): # 10 minutes of world time
    model.learn(total_timesteps=30 * 30, log_interval=10) # 30 "seconds"
    test.reset()



KeyboardInterrupt: 

In [None]:
observations = [test.reset()]
rewards = []

sim_time = 30
FPS = 30
controls = None
for i in tqdm(range(sim_time * FPS)):
    controls = model.predict(observations[-1])[0]
        
    obs, reward, _, _ = test.step(controls)
    observations.append(obs)
    rewards.append(reward)
       
print('RENDERING VIDEO')
with imageio.get_writer('./trained.mp4', mode='I', fps=FPS) as writer:
    for obs in tqdm(observations):
        writer.append_data(obs)
Video('./trained.mp4')

In [None]:
print('REWARD')
print(np.mean(rewards))
print(np.std(rewards))

In [None]:
?model.learn