In [None]:
import gym
import time
import matplotlib.pyplot as plt
import numpy as np
import math
from quanser_robots import GentlyTerminating
from quanser_robots.qube import SwingUpCtrl

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import torch.utils.data as data

# Create the 'Qube-v0' gym environment. The variant wrapped in
# GentlyTerminating is left here, commented out, as an alternative.
#env = GentlyTerminating(gym.make('Qube-v0'))
env = gym.make('Qube-v0')

In [None]:
"""
The minimal program that shows the basic control loop on the simulated swing-up.
"""

import gym
from quanser_robots import GentlyTerminating
from quanser_robots.qube import SwingUpCtrl

env = GentlyTerminating(gym.make('Qube-v0'))

# Swing-up controller; it maps an observation to the next action.
ctrl = SwingUpCtrl(ref_energy=0.035, energy_gain=70.0, acc_max=7.0)
obs = env.reset()
done = False
try:
    # Run a single episode: stop as soon as the environment reports `done`
    # instead of looping forever and relying on a manual kernel interrupt.
    while not done:
        env.render()
        act = ctrl(obs)
        obs, _, done, _ = env.step(act)
finally:
    # Always release the environment, including when the cell is interrupted.
    env.close()

KeyboardInterrupt: 

In [7]:
# Display the environment's `reward_range` attribute.
env.reward_range

(0.0, 0.02)

In [None]:
"""
Dependencies:
torch: 0.4
gym: 0.8.1
numpy
"""
# ---- Agent hyper-parameters ----
BATCH_SIZE = 32             # transitions drawn per learning step
LR = 0.01                   # learning rate handed to the optimizer
EPSILON = 0.9               # probability of picking the greedy action
GAMMA = 0.9                 # reward discount factor
TARGET_REPLACE_ITER = 100   # learning steps between target-network syncs
MEMORY_CAPACITY = 5000      # number of rows in the replay memory

# ---- Problem sizes ----
N_ACTIONS = 25              # number of discrete action indices the agent chooses among
N_STATES = env.observation_space.shape[0]

# Shape of a sampled action: 0 when the sample is a plain int, otherwise
# the shape of the sampled array.
if isinstance(env.action_space.sample(), int):
    ENV_A_SHAPE = 0
else:
    ENV_A_SHAPE = env.action_space.sample().shape


class Net(nn.Module):
    """Single-hidden-layer network producing one value per discrete action.

    Layer sizes are read from the module-level ``N_STATES`` and ``N_ACTIONS``
    when an instance is created; the hidden layer has 100 units. Both weight
    matrices are drawn from a normal distribution with mean 0 and std 0.1.
    """

    def __init__(self):
        super().__init__()
        hidden_units = 100
        self.fc1 = nn.Linear(N_STATES, hidden_units)
        nn.init.normal_(self.fc1.weight, mean=0.0, std=0.1)
        self.out = nn.Linear(hidden_units, N_ACTIONS)
        nn.init.normal_(self.out.weight, mean=0.0, std=0.1)

    def forward(self, x):
        """Return the action values for the batch of states ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.out(hidden)


class DQN(object):
    """Deep Q-learning agent: online network, target network and replay memory.

    Relies on the module-level hyper-parameters (``BATCH_SIZE``, ``LR``,
    ``EPSILON``, ``GAMMA``, ``TARGET_REPLACE_ITER``, ``MEMORY_CAPACITY``,
    ``N_STATES``, ``N_ACTIONS``) and on the ``Net`` class.
    """

    def __init__(self):
        self.eval_net, self.target_net = Net(), Net()

        self.learn_step_counter = 0     # completed learn() updates; drives target syncing
        self.memory_counter = 0         # total number of transitions stored so far
        # One row per transition: [s (N_STATES values), a, r, s_ (N_STATES values)].
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        self.loss_func = nn.MSELoss()

    def choose_action(self, x):
        """Pick an action index for the single state ``x``.

        With probability ``EPSILON`` the index with the highest value under
        ``eval_net`` is returned; otherwise a uniformly random index in
        ``[0, N_ACTIONS)`` is returned.

        Returns:
            int: the chosen action index.
        """
        # Add a batch dimension: the network receives exactly one sample.
        x = torch.unsqueeze(torch.FloatTensor(x), 0)
        if np.random.uniform() < EPSILON:   # greedy
            # Action selection needs no gradients, so skip building a graph.
            with torch.no_grad():
                actions_value = self.eval_net(x)
            action = int(torch.max(actions_value, 1)[1][0])
        else:   # random
            action = int(np.random.randint(0, N_ACTIONS))
        return action

    def store_transition(self, s, a, r, s_):
        """Write the transition ``(s, a, r, s_)`` into the replay memory.

        Once ``MEMORY_CAPACITY`` rows have been written, new transitions
        overwrite the oldest rows.
        """
        transition = np.hstack((s, [a, r], s_))
        index = self.memory_counter % MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        """Run one optimisation step on a random batch of stored transitions.

        Only rows that were actually written are sampled, so calling this
        before the memory is full no longer trains on all-zero placeholder
        rows. With an empty memory the call does nothing.
        """
        n_stored = min(self.memory_counter, MEMORY_CAPACITY)
        if n_stored == 0:
            return

        # Copy the online weights into the target network every
        # TARGET_REPLACE_ITER learning steps.
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # Sample (with replacement) among the rows that hold real transitions.
        sample_index = np.random.choice(n_stored, BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES + 1].astype(np.int64))
        b_r = torch.FloatTensor(b_memory[:, N_STATES + 1:N_STATES + 2])
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])

        # Value of the action actually taken in each sampled transition.
        q_eval = self.eval_net(b_s).gather(1, b_a)      # shape (batch, 1)
        # The target network only provides the bootstrap value; no gradients.
        with torch.no_grad():
            q_next = self.target_net(b_s_)
        q_target = b_r + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1)   # shape (batch, 1)
        loss = self.loss_func(q_eval, q_target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()



In [None]:
# Reset the environment and keep the returned observation in `s`.
s = env.reset()  

In [None]:
# Apply a single manual action and show the resulting observation.
# NOTE(review): `f_action` is not assigned anywhere above this cell; it is only
# set in later cells, so on a fresh top-to-bottom run this line raises
# NameError. Run the cell that assigns `f_action` first.
s_, r, done, info = env.step(np.array([f_action]))
env.render()
s_

In [None]:
# Constant action value; the manual stepping cell passes it to env.step.
f_action = 5

In [None]:
dqn = DQN()

print('\nCollecting experience...')
for i_episode in range(400):
    # Start a new episode. Entries 4 and 5 of the observation are scaled down
    # in place before the observation is used.
    s = env.reset()
    s[4] /= 30
    s[5] /= 40
    ep_r = 0
    count = 0
    done = False
    while not done:
        env.render()

        # Choose a discrete action index and map it linearly onto [-5, 5].
        a = dqn.choose_action(s)
        half_span = (N_ACTIONS - 1) / 2
        f_action = 5 * (a - half_span) / half_span
        if count % 20 == 0:
            print(a,"     ",f_action)
        count += 1

        # Apply the action and scale the new observation the same way.
        s_, r, done, info = env.step(np.array([f_action]))
        s_[4] /= 30
        s_[5] /= 40

        dqn.store_transition(s, a, r, s_)
        ep_r += r

        # Learning (and the episode summary) only starts once more than
        # MEMORY_CAPACITY transitions have been stored.
        if dqn.memory_counter > MEMORY_CAPACITY:
            dqn.learn()
            if done:
                print('Ep: ', i_episode,
                      '| Ep_r: ', round(ep_r, 2))

        # Advance the current state only while the episode continues.
        if not done:
            s = s_

In [None]:
class mlp(nn.Module):
    """Tanh multi-layer perceptron with built-in input/label normalisation.

    Architecture: ``fc_in`` (input_size -> h), ``num_units`` hidden layers
    (h -> h) and ``fc_out`` (h -> output_size). Every weight matrix is
    initialised uniformly in [-1, 1].

    Args:
        input_size: number of input features per sample.
        output_size: number of outputs per sample.
        h: width of every hidden layer.
        num_units: number of h -> h layers that follow ``fc_in``.
        device: stored as ``self.device``. The constructor does not move the
            module anywhere; call ``.to(...)`` to place the weights.
    """

    def __init__(self, input_size=4, output_size=3, h=100, num_units=2, device = torch.device('cuda')):
        super(mlp,self).__init__()
        self.input_size = input_size
        self.device = device
        self.fc_list= nn.ModuleList()
        self.fc_in = nn.Linear(input_size, h)
        self.act = nn.Tanh()
        for _ in range(num_units):
            self.fc_list.append(nn.Linear(h,h))
        self.fc_out = nn.Linear(h,output_size)
        # Initialize weights
        nn.init.uniform_(self.fc_in.weight,-1,1)
        nn.init.uniform_(self.fc_out.weight,-1,1)
        self.fc_list.apply(self.init_normal)
        # Normalisation statistics; identity until normlize_datasets is called.
        self.mean_data = 0
        self.std_data = 1
        self.mean_label = 0
        self.std_label = 1

    def forward(self,x):
        """Run the network on ``x`` after reshaping it to (-1, input_size)."""
        out = x.view(-1,self.input_size)
        out = self.fc_in(out)
        out = self.act(out)
        # Dropout is applied after the first layer only, and only in training mode.
        out = F.dropout(out, p=0.05, training=self.training)
        for layer in self.fc_list:
            out = layer(out)
            out = self.act(out)
        out = self.fc_out(out)
        return out

    def init_normal(self, m):
        """Initialise the weight of a linear layer uniformly in [-1, 1].

        The name is historical; the distribution used is uniform.
        """
        if isinstance(m, nn.Linear):
            nn.init.uniform_(m.weight,-1,1)

    def predict(self, x):
        """Map a numpy array of raw inputs to a numpy array of raw outputs.

        The input is normalised with the stored data statistics, passed
        through the network without gradient tracking, and de-normalised
        with the stored label statistics.
        """
        x = self.pre_process(x)
        # Send the input to wherever the weights actually live. ``self.device``
        # is not used here because the constructor never moves the module, so
        # the two can disagree (e.g. the default 'cuda' on a CPU-only model).
        param_device = next(self.parameters()).device
        x_tensor = torch.as_tensor(x, dtype=torch.float32).to(param_device)
        with torch.no_grad():
            out_tensor = self.forward(x_tensor)
        out = out_tensor.cpu().numpy()
        out = self.after_process(out)
        return out

    def pre_process(self,x):
        """Normalise raw inputs with the stored data mean and std."""
        x = (x-self.mean_data)/self.std_data
        return x

    def after_process(self,x):
        """Map normalised network outputs back to the label scale."""
        x = x * self.std_label + self.mean_label
        return x

    def normlize_datasets(self,datas,labels):
        """Compute and store per-column statistics, then normalise both arrays.

        Columns whose standard deviation is zero are left centred but
        unscaled (their stored std is 1), which avoids dividing by zero.

        Returns:
            tuple: the normalised ``(datas, labels)``.
        """
        self.mean_data = np.mean(datas,axis=0)
        self.mean_label = np.mean(labels,axis=0)
        self.std_data = self._safe_std(datas)
        self.std_label = self._safe_std(labels)
        datas = (datas-self.mean_data)/self.std_data
        labels = (labels-self.mean_label)/self.std_label
        return datas, labels

    @staticmethod
    def _safe_std(values):
        """Per-column std of ``values`` with zeros replaced by 1."""
        std = np.std(values,axis=0)
        return np.where(std == 0, 1.0, std)

