In [1]:
## This file contains a prototype idea of trying to learn a value function that represents the viable region
## for a LIP model. 
## Author : Avadesh Meduri
## Date : 20/02/2020

import numpy as np
import IPython
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.animation import FuncAnimation
import pickle as p
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


In [14]:
## LIPM Environment

class LipmEnv:
    def __init__(self, h):
        self.omega = np.sqrt(9.81/h)
        self.max_leg_length = 0.6
        self.dt = 0.001
        self.h = h
        self.A = np.matrix([[1, self.dt], [(self.omega**2)*self.dt, 1]])
        self.B = np.matrix([0, -(self.omega**2)*self.dt])
        self.t = 0
                                 
    def integrate_lip_dynamics(self, x_t, u_t):
        ## integrates dynamics for one step
        assert np.shape(x_t) == (2,)
        x_t_1 = np.matmul(self.A, np.transpose(x_t)) + np.matmul(self.B.transpose(), [u_t])
        return x_t_1

    def reset_env(self, x0, u0, epi_time):
        ## initialises environment
        self.t = 0
        self.sim_data = np.zeros((4, int(epi_time/self.dt)+1))
        self.sim_data[:,0][0:2] = x0
        self.sim_data[:,0][2] = u0
        self.sim_data[:,0][3] = self.h
    def step_env(self):
        ## integrates the simulation one step
        self.sim_data[:,self.t + 1][0:2] = self.integrate_lip_dynamics(self.sim_data[:,self.t][0:2],\
                                                   self.sim_data[:,self.t][2])
        self.sim_data[:,self.t + 1][2] = self.sim_data[:,self.t][2]
        self.sim_data[:,self.t + 1][3] = self.sim_data[:,self.t][3] 
        self.t += 1
    
    def set_action(self, u):
        self.sim_data[:,self.t][2] = u
        
    def return_sample_data(self):
        return self.sim_data[:,0:self.t]
           
    def show_episode(self, freq, i_no):
        ## Input:
            ## Freq : frame rate (if freq = 5 one in 5 is shown)
            ## i_no : iteration number 
        sim_data = self.sim_data[:,::freq]

        fig = plt.figure()
        ax = plt.axes(xlim=(-5, 5), ylim=(0, sim_data[:,0][3] + 0.2))
        text_str = "iter - " + str(i_no)
        line, = ax.plot([], [], lw=3)
        def init():
            line.set_data([], [])
            return line,
        def animate(i):
            x = sim_data[:,i][0]
            y = sim_data[:,i][3]
            u = sim_data[:,i][2]
            line.set_data([u,x], [0,y])
            return line,
        props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        ax.text(0.05, 0.95, text_str, transform=ax.transAxes, fontsize=15,
        verticalalignment='top', bbox=props)
        
        anim = FuncAnimation(fig, animate, init_func=init,
                                       frames=np.shape(sim_data)[1], interval=25, blit=True)

        plt.close(fig)
        plt.close(anim._fig)
        IPython.display.display_html(IPython.core.display.HTML(anim.to_html5_video()))

    def compute_reward(self, step_time):
        ## Computes the reward after step
        r = 0
        step_data = self.sim_data[:,int(self.t - step_time*1000):int(self.t)].copy()
        step_data[0] = np.subtract(step_data[0], step_data[2])
        min_dist = step_data[0].argmin()
        r += step_data[0][min_dist]**2 ## min distance between COM and COP
        r += step_data[1][min_dist]**2 ## Min velocity when min dist is achieved

        r = np.power(5, -r)
        if self.sim_data[:,int(self.t - step_time*1000)-1][2] != self.sim_data[:,int(self.t - step_time*1000)][2]:
            r -= 1 ## penalises if step is taken
        else:
            r += 1 ## rewards if no step is taken
            
        return r


In [46]:
### This block samples and store data using epsillon greedy algorithm

def sample_data(no_episodes, epi_t, h, action_set, value_function, show_episode = False):
    # this function samples data
    env = LipmEnv(h)
    sample_data = []
    for e in range(no_episodes):
        print("running iter number - " + str(e))
        x = [0.0, 4*np.random.random() - 2]
        u0 = action_set[np.random.randint(9)]
        #espillon greedy
        if np.random.random() > 0.2:
            x_in = np.tile([u0 - x[0], x[1], 0],(len(action_set),1)) 
            x_in[:,2] = action_set
            a = np.argmax(value_function(torch.tensor((x_in), dtype=torch.float)).cpu().detach().numpy())
        else:
            a = np.random.randint(9)
        step_time = 0.15
        env.reset_env(x, u0, epi_t)

        ## sars_t = s_t, a_t, r_t, s_t+1
        sars_t = np.zeros(6)
        sars_t[0] = u0 - x[0]
        sars_t[1] = x[1]
        sars_t[2] = action_set[a]
        for t in range(0, int(epi_t*1000) - 1):
            if t % int(step_time * 1000) == 0 and t > 0:
                sars_t[3] = env.compute_reward(step_time)
                env.set_action(env.sim_data[:,env.t][0] + action_set[a]) ## setting action
                sars_t[4] = env.sim_data[:,env.t][2] - env.sim_data[:,env.t][0]
                sars_t[5] = env.sim_data[:,env.t][1]
                sample_data.append(sars_t)
                
                sars_t = np.zeros(6)
                sars_t[0] = env.sim_data[:,env.t][2] - env.sim_data[:,env.t][0]
                sars_t[1] = env.sim_data[:,env.t][1]
                # epsillon greedy
                if np.random.random() > 0.2:
                    x_in = [sars_t[0], sars_t[1], 0].copy()
                    x_in = np.tile(x_in,((len(action_set),1)))
                    x_in[:,2] = action_set
                    a = np.argmax(value_function(torch.tensor((x_in), dtype=torch.float)).cpu().detach().numpy())
                else:
                    a = np.random.randint(9)
    
                sars_t[2] = action_set[a]
        
            env.step_env()
    
        if show_episode: 
            env.show_episode(5, e)
            
    return np.asarray(sample_data)

def store_data(data_array, file_name, dir):
    batch_no = str(len(os.listdir(dir)))
    f = open(dir + file_name + "_" + batch_no + ".pkl", 'wb')
    print("dumping data ...")
    p.dump(data_array, f, -1)  
    f.close()    
    print("finished dumping...")

In [18]:
## this block is for the Q function
class ANN(nn.Module):
    
    def __init__(self, input_size, outputs):
        super(ANN, self).__init__()
        self.l1 = nn.Linear(input_size, 128)
        self.l2 = nn.Linear(128, 128)
        self.action_value = nn.Linear(128, outputs)
        
    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        return self.action_value(x)
        
        

In [47]:
## This block shows how data sampling is done
device = torch.device("cpu")
dq_sampler = ANN(3, 1).to(device) 
## input to the ANN is u - x (u is cop, x is com location), xd, a_set(possible set of actions)
action_set = np.linspace(-0.2, 0.2, 9)
sample = sample_data(1, 1.5, 0.2, action_set, dq_sampler, True)
sample[0:20]
## The simulation below shows stepping sequences using an epsilon greedy policy.

running iter number - 0


array([[ 0.15      ,  1.52710762,  0.        , -0.97739342,  0.        ,
         1.12791586],
       [ 0.        ,  1.12791586, -0.15      , -0.87094512, -0.15      ,
         1.80308171],
       [-0.15      ,  1.80308171,  0.05      , -0.9948495 ,  0.05      ,
         4.19571131],
       [ 0.05      ,  4.19571131,  0.2       , -1.        ,  0.2       ,
         6.26947635],
       [ 0.2       ,  6.26947635,  0.2       , -1.        ,  0.2       ,
         8.27127652],
       [ 0.2       ,  8.27127652,  0.2       , -1.        ,  0.2       ,
        11.47134617],
       [ 0.2       , 11.47134617,  0.2       , -1.        ,  0.2       ,
        16.58696452],
       [ 0.2       , 16.58696452,  0.2       , -1.        ,  0.2       ,
        24.76477127],
       [ 0.2       , 24.76477127,  0.2       , -1.        ,  0.2       ,
        37.83777997]])

In [13]:
#This block contains the deep Q stepper
class DeepQStepper:
    
    def __init__(self, no_inputs, no_outputs, action_set, h):
        '''
        Input:
            no_inputs : size of the feature vector into the ANN
            no_outputs: Size of the output array from the ANN
            action_set: The list of all possible actions
            h : height of lipm above ground
        '''
        self.device = torch.device("cpu")
        ## input to the ANN is u - x (u is cop, x is com location), xd, a_set(possible set of actions)
        self.dq_stepper = ANN(no_inputs, no_outputs).to(self.device) 
        ## creating a target network to stabilize training
        self.dq_stepper_tar = ANN(no_inputs, no_outputs).to(self.device) 
        self.dq_stepper_tar.load_state_dict(self.dq_stepper.state_dict())
        self.dq_stepper_tar.eval() ## check what this does
        self.action_set = action_set
        self.h = h
        
    def sample_data(self, no_episodes, epi_t, h, action_set, q_function, show_episode = False):
        '''
        This method samples data and returns the data in the SARS form [state, action, reward, state_t+1].
        Input:
            no_episodes : number of episodes of sample data
            epi_t : duration of each episode
            h : height of LIPM from the ground
            action_set : the array of possible actions
            q_function : the ANN that predicts value function given current state, action. Q(s,a)
            show_episode : shows a simulation of the episode.
        '''
        # this function samples data
        env = LipmEnv(h)
        sample_data = []
        for e in range(no_episodes):
            print("running iter number - " + str(e))
            x = [0.0, 4*np.random.random() - 2]
            u0 = action_set[np.random.randint(9)]
            #espillon greedy
            if np.random.random() > 0.2:
                x_in = np.tile([u0 - x[0], x[1], 0],(len(action_set),1)) 
                x_in[:,2] = action_set
                a = np.argmax(q_function(torch.tensor((x_in), dtype=torch.float)).cpu().detach().numpy())
            else:
                a = np.random.randint(9)
            step_time = 0.15
            env.reset_env(x, u0, epi_t)

            ## sars_t = s_t, a_t, r_t, s_t+1
            sars_t = np.zeros(6)
            sars_t[0] = u0 - x[0]
            sars_t[1] = x[1]
            sars_t[2] = action_set[a]
            for t in range(0, int(epi_t*1000) - 1):
                if t % int(step_time * 1000) == 0 and t > 0:
                    sars_t[3] = env.compute_reward(step_time)
                    env.set_action(env.sim_data[:,env.t][0] + action_set[a]) ## setting action
                    sars_t[4] = env.sim_data[:,env.t][2] - env.sim_data[:,env.t][0]
                    sars_t[5] = env.sim_data[:,env.t][1]
                    sample_data.append(sars_t)

                    sars_t = np.zeros(6)
                    sars_t[0] = env.sim_data[:,env.t][2] - env.sim_data[:,env.t][0]
                    sars_t[1] = env.sim_data[:,env.t][1]
                    # epsillon greedy
                    if np.random.random() > 0.2:
                        x_in = [sars_t[0], sars_t[1], 0].copy()
                        x_in = np.tile(x_in,((len(action_set),1)))
                        x_in[:,2] = action_set
                        a = np.argmax(q_function(torch.tensor((x_in), dtype=torch.float)).cpu().detach().numpy())
                    else:
                        a = np.random.randint(9)

                    sars_t[2] = action_set[a]

                env.step_env()

            if show_episode: 
                env.show_episode(5, e)

        return np.asarray(sample_data)
    
    def compute_max_Q(self, x, action_set, q_function):
        '''
        This function returns the max Q value for the given state for the set of possible actions.
        It also returns the action that has maximum value
        Input:
            x : state to be evaluated
            action_set : the array of possible actions
            q_function: model to compute Q value
        '''
            x_in = [x[0], x[1], 0]
            x_in = np.tile(x_in, ((len(action_set),1)))
            x_in[:,2] = action_set
            state_values = q_function(torch.tensor((x_in), dtype=torch.float)).cpu().detach().numpy()
            a_opt = np.argmax(state_values)
            q_opt = np.max(state_values)
            
            return q_opt, a_opt
            
    def train_model(self, no_episodes, epi_t, no_iter):
        '''
        This method trains the deepQstepper model.
        Input:
            no_episodes : number of episodes to sample per round
            epi_t : time of each episode
            no_iter: number of iteration of training
        '''
                
        for i in range(no_iter):
            sample_data = self.sample_data(no_episodes, epi_t, self.h, self.action_set, self.dq_stepper)
            X_train = sample_data[:,[0:3]]
            Y_train = sample_data[:,3]
            
            for j in range(len(X_train)):
                Y_train[j] += self.compute_max_Q(X_train[j], self.action_set, self.dq_stepper_tar)
        
        ## optimizing model
        
        ## updating target function
        
        return self.dq_stepper
                
        
        
    
action_set = np.linspace(-0.2, 0.2, 9)
sample = sample_data(1, 1.5, 0.2, action_set, dq_stepper, False)





running iter number - 0
