In [None]:
from google.colab import drive
drive.mount("/content/drive")

In [None]:
!pip install pybullet stable-baselines3[extra] motion-imitation==0.0.2 requests

# import a1 model

In [None]:
import requests
url = "https://raw.githubusercontent.com/xiaoa5/robotics_control/main/a1_sim.py"
r = requests.get(url)

with open('a1_sim.py','w') as f:
  f.write(r.text)
# from google.colab import files
# # files.upload()
# src = list(files.upload().values())[0]
# open('a1_sim.py','wb').write(src)

# gym style enviorment

In [None]:
from __future__ import absolute_import
from __future__ import division
#from __future__ import google_type_annotations
from __future__ import print_function

import os
import inspect

from absl import app
from absl import flags
import scipy.interpolate
import numpy as np
import pybullet_data as pd
from pybullet_utils import bullet_client

import time
import pybullet
import random

from mpc_controller import com_velocity_estimator
from mpc_controller import gait_generator as gait_generator_lib
from mpc_controller import locomotion_controller
from mpc_controller import openloop_gait_generator
from mpc_controller import raibert_swing_leg_controller
from mpc_controller import torque_stance_leg_controller
from google.colab import files
# files.upload()
# src = list(files.upload().values())[0]
# open('a1_sim.py','wb').write(src)
import a1_sim as laikago_sim

FLAGS = flags.FLAGS


_NUM_SIMULATION_ITERATION_STEPS = 300
_BODY_HEIGHT = 0.24
# _STANCE_DURATION_SECONDS = [
#     0.3
# ] * 4  # For faster trotting (v > 1.5 ms reduce this to 0.13s).
# _DUTY_FACTOR = [0.6] * 4

_STANCE_DURATION_SECONDS = [0.15]*4   #(0.3, 0.3, 0.3, 0.3)
_DUTY_FACTOR = [0.6]*4
_INIT_PHASE_FULL_CYCLE = [0., 0.25, 0.5, 0.25]
_MAX_TIME_SECONDS = 250
_MOTOR_KD = [1.0, 2.0, 2.0] * 4

LAIKAGO_THREE = (
    gait_generator_lib.LegState.STANCE,
    gait_generator_lib.LegState.STANCE,
    gait_generator_lib.LegState.STANCE,
    gait_generator_lib.LegState.SWING,
)
LAIKAGO_TROTTING = (
    gait_generator_lib.LegState.SWING,
    gait_generator_lib.LegState.STANCE,
    gait_generator_lib.LegState.STANCE,
    gait_generator_lib.LegState.SWING,
)
def _generate_example_linear_angular_speed(t):
  """Creates an example speed profile based on time for demo purpose."""
  vx = 0.6
  vy = 0.2
  wz = 0.8
  time_points = (0, 5, 10, 15, 20, 25,30)
  speed_points = ((0, 0, 0, 0), (0, 0, 0, wz), (vx, 0, 0, 0), (0, 0, 0, -wz), (0, -vy, 0, 0),
                  (0, 0, 0, 0), (0, 0, 0, wz))

  speed = scipy.interpolate.interp1d(
      time_points,
      speed_points,
      kind="previous",
      fill_value="extrapolate",
      axis=0)(
          t)

  return speed[0:3], speed[3]


def _setup_controller(robot):
  """Demonstrates how to create a locomotion controller."""
  desired_speed = (0, 0)
  desired_twisting_speed = 0

  gait_generator = openloop_gait_generator.OpenloopGaitGenerator(
      robot,
      stance_duration=_STANCE_DURATION_SECONDS,
      duty_factor=_DUTY_FACTOR,
      initial_leg_phase=_INIT_PHASE_FULL_CYCLE
  )
  state_estimator = com_velocity_estimator.COMVelocityEstimator(robot)
  sw_controller = raibert_swing_leg_controller.RaibertSwingLegController(
      robot,
      gait_generator,
      state_estimator,
      desired_speed=desired_speed,
      desired_twisting_speed=desired_twisting_speed,

      desired_height=_BODY_HEIGHT,
  )
  st_controller = torque_stance_leg_controller.TorqueStanceLegController(
      robot,
      gait_generator,
      state_estimator,
      desired_speed=desired_speed,
      desired_twisting_speed=desired_twisting_speed,


      desired_body_height=_BODY_HEIGHT,
      body_mass= 108 / 9.8,  #68 / 9.8,
      # body_inertia=(0.07335, 0, 0, 0, 0.25068, 0, 0, 0, 0.25447),
      body_inertia=(0.068, 0, 0, 0, 0.228, 0, 0, 0, 0.256),   #np.array((0.017, 0, 0, 0, 0.057, 0, 0, 0, 0.064)) * 0.1  #
  )

  controller = locomotion_controller.LocomotionController(
      robot=robot,
      gait_generator=gait_generator,
      state_estimator=state_estimator,
      swing_leg_controller=sw_controller,
      stance_leg_controller=st_controller,
      clock=robot.GetTimeSinceReset)
  return controller


def _update_controller_params(controller, lin_speed, ang_speed):
  controller.swing_leg_controller.desired_speed = lin_speed
  controller.swing_leg_controller.desired_twisting_speed = ang_speed
  controller.stance_leg_controller.desired_speed = lin_speed
  controller.stance_leg_controller.desired_twisting_speed = ang_speed

from gym.utils import seeding
import gym
from gym import spaces
class CustomGymEnv(gym.Env):

    metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 100}
    def __init__(self, render=False):
        # Set up logging.
        self._is_render = render
        if self._is_render:
            self.p = bullet_client.BulletClient() #(connection_mode=pybullet.GUI)
        else:
            self.p = bullet_client.BulletClient()
        # self.p = bullet_client.BulletClient(connection_mode=pybullet.GUI)
        self.steering1Slider = self.p.addUserDebugParameter("forward  backward", -0.6, 3.6, 0)
        self.steering2Slider = self.p.addUserDebugParameter("left  right", -0.2, 0.2, 0)
        self.steering3Slider = self.p.addUserDebugParameter("turnLeft  tureRight", -0.8, 0.8, 0)
        self.forcexSlider = self.p.addUserDebugParameter("X Richtung", -2.8, 2.8, 0)
        self.forceySlider = self.p.addUserDebugParameter("Y Richtung", -2.8, 2.8, 0)
        self.forcezSlider = self.p.addUserDebugParameter("Z Richtung", -2.8, 2.8, 0)
        self.force1Slider = self.p.addUserDebugParameter("force applied on Dog", 0, 3000, 0)
        self.stanceDurationSlider = self.p.addUserDebugParameter("stance Duration", 0.05, 0.5, 0.18)       
        self.observation_space = spaces.Box(low=-20, high=20,shape=(37,), dtype=np.float32)  # 12 + 12 + 3 + 3 =30
        self.action_space = spaces.Box(low=-0.3, high=0.3,shape=(12,), dtype=np.float32)
        self.obs = None

        self.seed()
        self.reset()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]



    def reset(self):
        self.p.resetSimulation()
        self.p.setAdditionalSearchPath(pd.getDataPath())
        num_bullet_solver_iterations = int(_NUM_SIMULATION_ITERATION_STEPS /laikago_sim.ACTION_REPEAT)
        self.p.setPhysicsEngineParameter(numSolverIterations=num_bullet_solver_iterations)
        self.p.setTimeStep(laikago_sim.time_step)
        self.p.setGravity(0, 0, -10)
        self.p.setPhysicsEngineParameter(enableConeFriction=0)
        planeShape = self.p.createCollisionShape(shapeType = self.p.GEOM_PLANE)
        ground_id  = self.p.createMultiBody(0, planeShape)
        self.p.resetBasePositionAndOrientation(ground_id,[0,0,0], [0,0,0,1])
        self.p.changeVisualShape(ground_id, -1, rgbaColor=[0,0.7,0.5,0.8])
        self.p.changeDynamics(ground_id, -1, lateralFriction=2.0)
        deltaH = random.random()/20
        self.robot_uid = self.p.loadURDF("a1/a1.urdf", [0, 0, 0.28+deltaH])
        self.robot = laikago_sim.SimpleRobot(self.p, self.robot_uid)
        self.controller = _setup_controller(self.robot)
        self.controller.reset()

        self.current_time = self.robot.GetTimeSinceReset()
        self.p.configureDebugVisualizer(self.p.COV_ENABLE_RENDERING, 1)
        self._env_step_counter = 0
        obs = self.mpc_action()
        # self.obs = obs
        return np.clip(obs,-20,20)

    def is_fallen(self):
        
        pos, orientation = self.p.getBasePositionAndOrientation(self.robot.quadruped)
        # rot_mat = self.p.getMatrixFromQuaternion(orientation)
        # local_up = rot_mat[6:]
        roll_pitch_yaw = self.p.getEulerFromQuaternion(orientation)

        # print(np.dot(np.asarray([0, 0, 1]), np.asarray(local_up)))
        # pos = self.robot.GetBasePosition()
        # print(roll_pitch_yaw)
        return  pos[2] < 0.135 or pos[2] > 0.8 #np.dot(np.asarray([0, 0, 1]), np.asarray(local_up)) < 0 and

    def _termination(self):
        # position = self.robot.GetTrueBaseOrientation()
        # distance = math.sqrt(position[0]**2 + position[1]**2)
        # print(self.is_fallen())
        return self.is_fallen()  # or distance > self._distance_limit
    
    def get_obs(self,acts):
        # acts = self.obs

        obs0 = []
        obs1 = []  # swing 0 stance 1
        for i in range(12):
            if acts[i*5] == 0 and acts[i*5 + 1] == 0:
                obs0.extend([1])
                obs1.extend([acts[i*5+4]])
            else:
                obs0.extend([0])
                obs1.extend([acts[i*5]])

        pos, orientation = self.p.getBasePositionAndOrientation(self.robot.quadruped)
        posV, orientationV = self.p.getBaseVelocity(self.robot.quadruped)
        # print(pos, orientation,posV, orientationV)
        # print(acts)
        # print(pos, orientation)
        obs0.extend(obs1)
        obs0.extend(pos)
        obs0.extend(posV)
        obs0.extend(orientation)
        obs0.extend(orientationV)
        # print(np.shape(obs0))
        return obs0
        
        
        


    def mpc_action(self):
        if self.current_time < 3000:
            if self._is_render:

                # steeringAngle1 = self.p.readUserDebugParameter(self.steering1Slider)
                # steeringAngle2 = self.p.readUserDebugParameter(self.steering2Slider)
                # steeringAngle3 = self.p.readUserDebugParameter(self.steering3Slider)

                # forcex = self.p.readUserDebugParameter(self.forcexSlider)
                # forcey = self.p.readUserDebugParameter(self.forceySlider)
                # forcez = self.p.readUserDebugParameter(self.forcezSlider)

                # force1 = self.p.readUserDebugParameter(self.force1Slider)
                # stanceDuration1 = self.p.readUserDebugParameter(self.stanceDurationSlider)
                # stanceDuration = [stanceDuration1] * 4 
                pass
            else:
                steeringAngle1 = 3.6
                steeringAngle2 = 0
                steeringAngle3 = 0

                forcex = 0
                forcey = 0
                forcez = 0

                force1 = 0
                stanceDuration1 = 0.18
                stanceDuration = [stanceDuration1] * 4 

            steeringAngle1 = 3.6
            steeringAngle2 = 0
            steeringAngle3 = 0

            forcex = 0
            forcey = 0
            forcez = 0

            force1 = 0
            stanceDuration1 = 0.18
            stanceDuration = [stanceDuration1] * 4 


            lin_speed = [steeringAngle1,steeringAngle2,0]
            ang_speed = steeringAngle3

            rpy = (forcex,forcey,forcez)

            start = time.time()
    
            _update_controller_params(self.controller, lin_speed, ang_speed)
            # print(rpy,"-----------------")
            
            # Needed before every call to get_action().
            self.controller.update()

            start1 = time.time()

            try:


                hybrid_action = self.controller.get_action()
            except:
                hybrid_action = self.obs
                print('+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
            obs0 = []
            obs1 = []  # swing 0 stance 1
            for i in range(12):
                if hybrid_action[i*5] == 0 and hybrid_action[i*5 + 1] == 0:
                    obs0.extend([1])
                    obs1.extend([hybrid_action[i*5+4]])
                else:
                    obs0.extend([0])
                    obs1.extend([hybrid_action[i*5]])

            obs3 = self.robot.GetBaseRollPitchYawRate()                     # [-0.29003763 -0.15708871  0.03630358]
            obs4 = self.controller.state_estimator.com_velocity_body_frame  #(-0.08136581629514694, -0.06520375609397888, 0.02040734514594078)
            # print(obs4)  
            out = obs1

            end = time.time()

            # self.render()

            gemPos, gemOrn = self.p.getBasePositionAndOrientation(self.robot_uid)

            self.p.resetDebugVisualizerCamera( cameraDistance=1.5, cameraYaw=-30, cameraPitch=-30, cameraTargetPosition=gemPos)
            
            force = force1*np.array([forcex,forcey,forcez])

            
            self.p.applyExternalForce(objectUniqueId=self.robot_uid, linkIndex=-1, forceObj=force
                ,posObj=gemPos, flags=self.p.WORLD_FRAME)
            # getExtendedObservation(robot_uid)
            # time.sleep(0.001)
            # for i in range(12):
            #     if hybrid_action[i*5] == 0 and hybrid_action[i*5 + 1] == 0:
            #         hybrid_action[i*5 + 4] += obs1[i]
            #     else:
            #         hybrid_action[i*5] += obs1[i]
            self.obs = hybrid_action
            self.get_obs(hybrid_action)

            return self.get_obs(hybrid_action) #hybrid_action

    def step(self,action):

        acts = self.obs

        obs0 = []
        obs1 = []  # swing 0 stance 1
        # print(acts)
        for i in range(12):
            if acts[i*5] == 0 and acts[i*5 + 1] == 0:
                obs0.extend([1])
                # obs1.extend([obs[i*5+4]])
            else:
                obs0.extend([0])
                # obs1.extend([obs[i*5]])

        penalty_action = 0

        for i in range(12):
            if obs0[i] == 1:
                acts[i*5+4] += action[i]
                penalty_action += action[i]*action[i]
            else:
                acts[i*5] += action[i]
                penalty_action += action[i]*action[i]

        action = acts
        
        try:
            self.robot.Step(action)
        except:
            print(action,self.obs)
        
        
        current_time = self.robot.GetTimeSinceReset()
        
        # done = self._termination()

        if self._env_step_counter > 2500 or  self._termination():
            done = True
        else:
            done = False

        # done = self.is_fallen()
        
        self._env_step_counter += 1
        reward = self._reward()
        if self._termination():
            reward -= 10
        else:
            reward += 0.5

        reward -= penalty_action*10
        # elif penity_action > 0.01*6:
        # else:
        #     reward += 0.3


        obs = self.mpc_action()
        # self.obs = obs 
        return np.clip(obs,-20,20), reward, done, {}

    def render(self, mode="rgb_array", close=False):

        cam_dist = 1.0
        cam_yaw = 0
        cam_pitch = -30

        RENDER_HEIGHT = 360
        RENDER_WIDTH = 480

        gemPos, gemOrn = self.p.getBasePositionAndOrientation(self.robot_uid)
        # if mode != "rgb_array":
        #   self.p.resetDebugVisualizerCamera( cameraDistance=1.5, cameraYaw=-30, cameraPitch=-30, cameraTargetPosition=gemPos)
        #   return np.array([])

        # gemPos, gemOrn = self.p.getBasePositionAndOrientation(self.robot_uid)

        # self.p.resetDebugVisualizerCamera( cameraDistance=1.5, cameraYaw=-30, cameraPitch=-30, cameraTargetPosition=gemPos)


        
        view_matrix = self.p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=gemPos,
            distance=cam_dist,
            yaw=cam_yaw,
            pitch=cam_pitch,
            roll=0,
            upAxisIndex=2)
        proj_matrix = self.p.computeProjectionMatrixFOV(fov=60,
                                                                      aspect=float(RENDER_WIDTH) /
                                                                      RENDER_HEIGHT,
                                                                      nearVal=0.1,
                                                                      farVal=100.0)
        (_, _, px, _, _) = self.p.getCameraImage(
            width=RENDER_WIDTH,
            height=RENDER_HEIGHT,
            renderer=self.p.ER_BULLET_HARDWARE_OPENGL,
            viewMatrix=view_matrix,
            projectionMatrix=proj_matrix)
        rgb_array = np.array(px)
        rgb_array = rgb_array[:, :, :3]
        return rgb_array

    def _reward(self):
        current_base_velocity = self.robot.GetBaseVelocity()
        # if self._is_render:
        #     desire_base_velocity = self.p.readUserDebugParameter(self.steering1Slider)
        # else:
        #     desire_base_velocity = 3.6
        desire_base_velocity = 3.6
        co = abs(current_base_velocity[0]-desire_base_velocity) + 1
        reward = (current_base_velocity[0]- 0.0)/(co*10)
        reward -= (abs(current_base_velocity[1]) + abs(current_base_velocity[2]))/(co*20)

        orientation = self.robot.GetTrueBaseOrientation()
        rot_matrix = self.p.getMatrixFromQuaternion(orientation)
        local_up_vec = rot_matrix[6:]
        shake_reward = -abs(np.dot(np.asarray([1, 1, 0]), np.asarray(local_up_vec)))

        reward += shake_reward*0.5
        

        # if current_base_velocity[0]-desire_base_velocity > 0:
        #     reward = 1
        # elif abs(current_base_velocity[0]-desire_base_velocity) < 0.5:
        #     reward = 0.3
        # elif abs(current_base_velocity[0]-desire_base_velocity) < 1:
        #     reward = 0.
        # elif abs(current_base_velocity[0]-desire_base_velocity) < 1.5:
        #     reward = -0.1
        # else:
        #     reward = -0.3
        
        

        # forward_reward = current_base_position[0] - self._last_base_position[0]
        # # Cap the forward reward if a cap is set.
        # forward_reward = min(forward_reward, self._forward_reward_cap)
        # # Penalty for sideways translation.
        # drift_reward = -abs(current_base_position[1] - self._last_base_position[1])
        # # Penalty for sideways rotation of the body.
        # orientation = self.minitaur.GetBaseOrientation()
        # rot_matrix = pybullet.getMatrixFromQuaternion(orientation)
        # local_up_vec = rot_matrix[6:]
        # shake_reward = -abs(np.dot(np.asarray([1, 1, 0]), np.asarray(local_up_vec)))
        # energy_reward = -np.abs(
        #     np.dot(self.minitaur.GetMotorTorques(),
        #         self.minitaur.GetMotorVelocities())) * self._time_step
        # objectives = [forward_reward, energy_reward, drift_reward, shake_reward]
        # weighted_objectives = [o * w for o, w in zip(objectives, self._objective_weights)]
        # reward = sum(weighted_objectives)
        # self._objectives.append(objectives)
        return reward

# env2 = MyEnv()
def make_env(rank, seed=0):
    """
    Utility function for multiprocessed env.
    :param env_id: (str) the environment ID
    :param num_env: (int) the number of environments you wish to have in subprocesses
    :param seed: (int) the inital seed for RNG
    :param rank: (int) index of the subprocess
    """
    def _init():
        env = CustomGymEnv()
        env.seed(seed + rank)
        return env
    set_random_seed(seed)
    return _init


# run tensorboard

In [None]:
%load_ext tensorboard
%tensorboard --logdir '/content/drive/My Drive/Colab Notebooks/files/tb/mpc/'

# video setting

In [None]:
from typing import Any, Dict
from stable_baselines3.common.callbacks import BaseCallback

class VideoRecorderCallback(BaseCallback):
    def __init__(self, eval_env: gym.Env, render_freq: int, n_eval_episodes: int = 1, deterministic: bool = True):
        """
        Records a video of an agent's trajectory traversing ``eval_env`` and logs it to TensorBoard

        :param eval_env: A gym environment from which the trajectory is recorded
        :param render_freq: Render the agent's trajectory every eval_freq call of the callback.
        :param n_eval_episodes: Number of episodes to render
        :param deterministic: Whether to use deterministic or stochastic policy
        """
        super().__init__()
        self._eval_env = eval_env
        self._render_freq = render_freq
        self._n_eval_episodes = n_eval_episodes
        self._deterministic = deterministic

    def _on_step(self) -> bool:
        if self.n_calls % self._render_freq == 0:
            screens = []

            def grab_screens(_locals: Dict[str, Any], _globals: Dict[str, Any]) -> None:
                """
                Renders the environment in its current state, recording the screen in the captured `screens` list

                :param _locals: A dictionary containing all local variables of the callback's scope
                :param _globals: A dictionary containing all global variables of the callback's scope
                """
                screen = self._eval_env.render(mode="rgb_array")
                # PyTorch uses CxHxW vs HxWxC gym (and tensorflow) image convention
                screens.append(screen.transpose(2, 0, 1))

            evaluate_policy(
                self.model,
                self._eval_env,
                callback=grab_screens,
                n_eval_episodes=self._n_eval_episodes,
                deterministic=self._deterministic,
            )
            self.logger.record(
                "trajectory/video",
                Video(th.ByteTensor([screens]), fps=40),
                exclude=("stdout", "log", "json", "csv"),
            )
        return True

# train loop

In [None]:
from stable_baselines3.common.callbacks import CallbackList, CheckpointCallback, EvalCallback
from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv, VecNormalize
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3 import A2C,PPO,SAC

from stable_baselines3.common.logger import Video
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.evaluation import evaluate_policy



eval_env = CustomGymEnv()
eval_env = DummyVecEnv([lambda: eval_env])
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False,clip_obs=20., gamma=0.99) # when training norm_reward = True

num_cpu = 4
env = SubprocVecEnv([make_env(i) for i in range(num_cpu)])
env = VecNormalize(env, norm_obs=True, norm_reward=False,clip_obs=20., gamma=0.99) # when training norm_reward = True




video_recorder = VideoRecorderCallback(eval_env, render_freq=10000)



model_load_name = "mpc_ppo0410"
path = F"/content/drive/My Drive/Colab Notebooks/files/{model_load_name}" 
model = PPO.load(path, env,  tensorboard_log="/content/drive/My Drive/Colab Notebooks/files/tb/mpc/", verbose=0)

import torch as th

#### new model
# policy_kwargs = dict(activation_fn=th.nn.Tanh,net_arch=[dict(pi=[256, 256], vf=[256, 256])])
# model = PPO('MlpPolicy', env, policy_kwargs=policy_kwargs, tensorboard_log="/content/drive/My Drive/Colab Notebooks/files/tb/mpc/", verbose=0)

checkpoint_callback = CheckpointCallback(save_freq=50000, save_path='/content/drive/My Drive/Colab Notebooks/files/temp_model_logs/tmp/')
eval_callback = EvalCallback(eval_env, best_model_save_path='/content/drive/My Drive/Colab Notebooks/files/temp_model_logs/best_model',
                            log_path='/content/drive/My Drive/Colab Notebooks/files/temp_model_logs/results', eval_freq=10000)
callback = CallbackList([checkpoint_callback, eval_callback, video_recorder])
tmsp = 1000*6000
model.learn(total_timesteps=tmsp, callback=callback)  # , eval_freq=10, n_eval_episodes=1, eval_log_path="/content/drive/My Drive/Colab Notebooks/files/ev/"

model_save_name = "mpc_ppo0411"

path = F"/content/drive/My Drive/Colab Notebooks/files/{model_save_name}"

model.save(path)

stats_name = "vn_mpc_ppo03.pkl"
stats_path = F"/content/drive/My Drive/Colab Notebooks/files/{stats_name}"
env.save(stats_path)
