In [16]:
import numpy as np
# Flat list of limits, read below as consecutive two-element pairs.
bound = [0, 10, -5, 5]
abound = bound + bound

# Stack the two `bound` pairs and [-1, 1] as rows, then transpose: the result
# has one column per pair, row 0 = first elements, row 1 = second elements.
_st_range = np.transpose([bound[0:2], bound[2:4], [-1, 1]])

# Same construction for the 8-element list, followed by four fixed pairs.
_abound_pairs = [abound[i:i + 2] for i in range(0, len(abound), 2)]
_fixed_pairs = [[-1, 0], [1, 0], [0, -1], [0, 1]]
strange = np.array(_abound_pairs + _fixed_pairs).T

# One value per line: the 2x8 table, its first row, the 2x3 table, and the
# element at row 0 / column 1 of the 2x3 table.
print(strange, strange[0], _st_range, _st_range[0, 1], sep="\n")


[[ 0 -5  0 -5 -1  1  0  0]
 [10  5 10  5  0  0 -1  1]]
[ 0 -5  0 -5 -1  1  0  0]
[[ 0 -5 -1]
 [10  5  1]]
-5


In [None]:
class RLagent(Marble):
    """Epsilon-greedy agent whose Q function is a one-hidden-layer tanh network.

    The network input is a state concatenated with an action value; the output
    is the estimated Q value. Weights are updated online with a SARSA-style
    temporal-difference error (see ``use``).

    All interaction with the environment goes through ``self.env``.

    NOTE(review): the class derives from ``Marble`` but never calls its
    constructor; the base class is kept only so existing ``isinstance`` checks
    keep working -- confirm the inheritance is intended. ``Marble`` must also be
    defined before this class statement runs.
    """

    def __init__(self,env,r=0.01,nInputs = 3,nOutputs = 1,nSamples = 1,nHiddens = 5):
        """Create the network weights and the figure used for progress plots.

        Parameters
        ----------
        env : environment object providing ``init``, ``next``, ``get_cur_state``,
            ``get_actions``, ``get_state_range`` and a ``Goal`` attribute.
        r : learning rate shared by the hidden and output layers.
        nInputs : number of network inputs (state size + action size).
        nOutputs : number of network outputs.
        nSamples : divisor applied (together with ``nOutputs``) to the learning rate.
        nHiddens : number of hidden tanh units.
        """
        self.env=env
        self.rhoh = self.rhoo = r
        self.rh = self.rhoh / (nSamples*nOutputs)
        self.ro = self.rhoo / (nSamples*nOutputs)
        # Weights start uniformly in [-0.1, 0.1); the extra row is the bias.
        self.V = 0.1*2*(np.random.uniform(size=(nInputs+1,nHiddens))-0.5)
        self.W = 0.1*2*(np.random.uniform(size=(1+nHiddens,nOutputs))-0.5)
        self.fig = plt.figure(figsize=(8, 8))
        self.stdX = Standardizer(self.env.get_state_range())

    def addOnes(self,A):
        """Return ``A`` with a constant 1 inserted at index 0 of its last axis (bias input)."""
        return np.insert(A, 0, 1, axis=-1)

    def forward(self,X):
        """Run the network on ``X`` (one sample or a batch of rows).

        Returns
        -------
        (Y, Z) : the network output and the hidden-layer activations.
        """
        #X = self.stdX.standardize(X)
        X1 = self.addOnes(X)
        Z = np.tanh(X1 @ self.V)
        Z1 = self.addOnes(Z)
        Y = Z1 @ self.W
        return Y, Z

    def as_array(self,A):
        """Convert ``A`` to an ndarray, turning a 1-D input into a single row."""
        A = np.array(A)
        if len(A.shape) == 1:
            return A.reshape((1, -1))
        return A

    def backward(self,error, Z, X):
        """Compute the weight increments for one backpropagation step.

        Parameters
        ----------
        error : output error (here the TD error), one row per sample.
        Z : hidden activations returned by ``forward`` for ``X``.
        X : network inputs that produced ``Z``.

        Returns
        -------
        (dV, dW) : increments to be *added* to ``self.V`` and ``self.W``.
        """
        # Make sure every operand is 2-D (samples x features).
        X = self.as_array(X)
        Z = self.as_array(Z)
        E = self.as_array(error)

        Z1 = self.addOnes(Z)
        X1 = self.addOnes(X)

        # (1 - Z**2) is the derivative of tanh; W[1:] skips the bias row.
        dV = self.rh * X1.T @ ( ( E @ self.W[1:,:].T) * (1-Z**2))
        dW = self.ro * Z1.T @ E
        return dV, dW

    def epsilon_greedy(self,e, s,n_actions):
        """Pick an action for state ``s``.

        With probability ``e`` a random action ``randint(n_actions) - 1`` is
        returned; otherwise the action whose Q value is maximal, ties broken
        at random. Row indices are mapped to action values by subtracting 1,
        which matches an action set of ``[-1, 0, 1]``.
        """
        if np.random.rand() < e:
            return np.random.randint(n_actions) - 1

        # Evaluate Q(s, a) for every action of this agent's own environment
        # (previously a module-level ``env`` was used here).
        actions = np.asarray(self.env.get_actions())
        inputs = np.hstack((np.tile(s, (len(actions), 1)), actions.reshape((-1, 1))))
        Q, _ = self.forward(inputs)
        max_as = np.where(Q == np.max(Q))[0] - 1 # index to action value
        return np.random.choice(max_as)

    def use(self,k=300,n=3,steps=500,g=.9,verb=False,epsilon=1,final=0.1):
        """Train the agent on ``self.env`` and plot progress in ``self.fig``.

        Parameters
        ----------
        k : number of episodes.
        n : number of actions handed to ``epsilon_greedy``.
        steps : number of environment steps per episode.
        g : discount factor.
        verb : print per-episode / per-step progress when true.
        epsilon : initial probability of a random action.
        final : factor by which ``epsilon`` has been scaled after ``k``
            episodes (the final epsilon when the initial one is 1).
        """
        env = self.env
        K = k
        n_actions = n
        max_steps = steps
        gamma = g
        verbose = verb
        final_epsilon = final
        # Per-episode multiplier such that epsilon_decay ** K == final_epsilon.
        epsilon_decay =  np.exp(np.log(final_epsilon) / K)
        rtrace = []
        etrace = [epsilon]
        for j in range(K):

            if verbose: print("\tepisode #", j, "   ",end="")
            env.init([3,0]) #[float(np.random.randint(-5, 5, 1)), 0])
            s = env.get_cur_state()
            # select the first action
            a = self.epsilon_greedy(epsilon, s,n_actions)

            rewards = []
            # Visited states are collected in a list and stacked once for
            # plotting instead of re-allocating an array on every step.
            visited = [np.asarray(s, dtype=float)]
            for step in range(max_steps):
                if verbose: print("\tstep #", step, "   ",end="")
                # move
                r1 = env.next(a)
                s1 = env.get_cur_state()
                a1 = self.epsilon_greedy(epsilon, s1,n_actions)

                rewards.append(r1)
                visited.append(np.asarray(s1, dtype=float))

                # update neural network
                Q1, _ = self.forward(np.hstack((s1, a1)))  # Q for the next state/action
                Q, Z = self.forward(np.hstack((s, a)))  # Q for the current state/action
                error = r1 + gamma * Q1 - Q  # TD error
                dV, dW = self.backward(error, Z, np.hstack((s, a)))
                self.V += dV
                self.W += dW

                s = s1
                a = a1

            epsilon *= epsilon_decay
            etrace.append(epsilon)

            if verbose: print("Done (", np.sum(rewards), ")", step)

            rtrace.append(np.sum(rewards))

            last_plot = (j == K-1)

            if j % 10 == 0 or last_plot:
                self._plot_progress(j, rtrace, etrace, np.array(visited))
        ipd.clear_output(wait=True)

    def _plot_progress(self, episode, rtrace, etrace, trace):
        """Redraw the four progress panels on ``self.fig`` and display it."""
        env = self.env
        fig = self.fig
        # Work on this agent's figure explicitly rather than on whatever
        # figure pyplot currently considers active.
        fig.clf()

        ax = fig.add_subplot(221)
        ax.plot(rtrace, "b-")
        ax.set_ylabel("sum of rewards")

        ax = fig.add_subplot(222)
        ax.plot(etrace, "-")
        # Raw string: "\e" is not a valid escape sequence in a normal literal.
        ax.set_ylabel(r"p(random action), $\epsilon$")

        # contour plot for Q
        ax = fig.add_subplot(223)
        xs, ys = np.meshgrid(np.linspace(0, 10, 100), np.linspace(-5, 5, 100))
        X = np.column_stack((xs.ravel(), ys.ravel()))
        # One batched forward pass per action instead of one pass per grid point.
        q_per_action = [
            self.forward(np.hstack((X, np.full((X.shape[0], 1), a))))[0][:, 0]
            for a in [-1, 0, 1]
        ]
        maxQ = np.max(q_per_action, axis=0)
        cs = ax.contourf(xs, ys, maxQ.reshape(xs.shape))
        fig.colorbar(cs, ax=ax)
        ax.text(env.Goal, 0, 'G')
        ax.set_ylabel("max Q")

        # plot traces
        ax = fig.add_subplot(224)
        ax.plot(trace[:, 0], trace[:, 1], "k-")
        ax.fill_between([env.Goal-1, env.Goal+1], [-5, -5],[5, 5], color='red', alpha=0.3)
        ax.set_title("trace of last episode")
        ax.set_xlim([0, 10])
        ax.set_ylim([-5, 5])

        fig.suptitle(''.join(["Episode ",str(episode)]))
        fig.tight_layout(rect=[0, 0.03, 1, 0.95])
        fig.canvas.draw_idle()

        ipd.clear_output(wait=True)
        ipd.display(fig)

In [None]:
class Marble():
    """ 1d marble problem
        
        states: x, dx
        action: action [-1,1]


        |            ___                     |
        |___________|///|____G_______________|
                    <- ->
    """

    def __init__(self,goal=5, **params):
        """Set up the track.

        Parameters
        ----------
        goal : x position of the goal.
        bound : (keyword, optional) ``[x_min, x_max, v_min, v_max]``. If it
            does not have exactly four entries, its first two are kept and the
            velocity limits default to ``[-5, 5]``.
        """
        self.n_state = 2
        self.n_action = 1
        self.Goal = goal
        self.bound = params.pop('bound', [0, 10, -5, 5])
        if len(self.bound) != 4:
             # list(...) so tuples and arrays concatenate instead of failing
             # or being added element-wise.
             self.bound = list(self.bound[:2]) + [-5, 5]

        # Row 0 holds the lower limits, row 1 the upper limits, for
        # (position, velocity, action).
        self._st_range = np.array([self.bound[:2], self.bound[2:], [-1, 1]]).T
        # len(state) + action (1)
        self.nnNI = self.n_state + 1
        self.goal_width = 1
        
        self._s = [0, 0]

    def init(self, start=None):
        """Reset the state to ``start``, or to a random integer position at rest.

        Returns the new state.
        """
        if start is not None:
            self._s = start
        else: 
            self._s = [np.random.randint(self.bound[0], self.bound[1]), 0.]
        return self._s
       
    def get_random_action(self):
        """Return one of -1.0, 0.0, 1.0 chosen uniformly at random."""
        return float(np.random.randint(3) -1) # discrete action

    def get_bound_act(self, a):
        """Return ``a[0]`` limited to the interval [-1, 1]."""
        if a[0] > 1:
            return 1
        elif a[0] < -1:
            return -1
        else:
            return a[0]

    def next(self, a) :
        """Advance the simulation by one time step with action ``a``.

        ``a`` may be a scalar or an indexable iterable, in which case its
        first element is used. Returns the reward of the new state.
        """
        # collections.Iterable was removed in Python 3.10; the ABC lives in
        # collections.abc.
        from collections.abc import Iterable

        s = self._s
        if isinstance(a, Iterable):
            a = a[0]
        s1 = copy(s)
        dT = 0.1
        s1[0] += dT * s[1]  
        s1[1] += dT * ( 2*a - 0.2 * s[1] )

        # stop the marble (zero velocity) at the edge when it leaves the track
        if s1[0] < self.bound[0]:
            s1[:]  = [self.bound[0], 0]
        elif s1[0] > self.bound[1] :
            s1[:] = [self.bound[1], 0]
        # clipping the velocity
        s1[1] = np.clip(s1[1], self._st_range[0, 1],
                               self._st_range[1, 1])

        self._s =  s1
        return self.get_reward(s, s1, a)

    def get_cur_state(self):
        """Return the current state ``[x, dx]``."""
        return self._s

    def get_reward(self,s,s1,a):
        """Return 1 when the new position is within ``goal_width`` of the goal, else 0.

        Only ``s1`` is used; ``s`` and ``a`` are accepted but ignored.
        """
        return 1 if abs(s1[0] - self.Goal) < self.goal_width else 0

    def get_state_range(self):
        """Return the 2 x 3 array of lower/upper limits built in ``__init__``."""
        return self._st_range

    def get_actions(self):
        """Return the discrete action values as a float array."""
        return np.array([-1., 0., 1.])

    def get_action_index(self, action):
        """Return the index of ``action`` within ``[-1, 0, 1]``."""
        return np.where(np.array([-1, 0, 1]) == action)[0][0]

    def draw_trajectory(self, smplX):
        """Plot a trajectory (rows of ``[x, dx]``) together with the goal region.

        Does nothing when ``smplX`` has a single column.
        """
        if smplX.shape[1] == 1: return
        # Velocity limits follow the configured bounds rather than a fixed
        # -5..5 (identical for the default bounds).
        v_lo, v_hi = self.bound[2], self.bound[3]
        plt.plot(smplX[:,0],smplX[:,1])
        plt.axis([self.bound[0], self.bound[1], v_lo, v_hi])
        plt.plot(smplX[0,0],smplX[0,1],'go')
        plt.plot(self.Goal,0,'ro')
        # draw a goal region
        plt.fill_between([self.Goal-self.goal_width, self.Goal+self.goal_width],
                         [v_lo, v_lo], [v_hi, v_hi],
                         color="red", alpha=0.3)
        plt.xlabel("s") 
        plt.ylabel("s dot")