# Simple Continuous 1D Reinforcement Learning

In [1]:
import torch
from torch.autograd import Variable
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
%matplotlib inline

In [2]:
def epsilonGreedyDiscrete(epsilon, Q, state):
    '''
    Pick a move for state using an epsilon-greedy policy.

    epsilon is probability of taking a random move; otherwise the valid move
    with the largest Q value is taken. State/move pairs missing from Q count
    as 0, and ties go to the earliest move in the valid-move list.

    Q is a dict-like table keyed by smt(state, move).
    Relies on the helpers validMoves(state) and smt(state, move) being
    defined at module level.
    '''
    # Imported locally: the notebook's import cell does not bring in `random`.
    import random

    vM = validMoves(state)
    if epsilon > random.uniform(0, 1):
        # take random action
        return random.choice(vM)
    # take greedy action
    Qs = np.array([Q.get(smt(state, a), 0) for a in vM])
    return vM[np.argmax(Qs)]

In [3]:
class Agent(ABC):
    '''
    Abstract class representing a reinforcement learning agent

    Concrete agents must implement train, Q, act, randomAction and
    greedyAction; the class cannot be instantiated until all five exist.
    '''
    def __init__(self):
        # Q is intentionally NOT assigned here: an instance attribute named Q
        # would take precedence over the Q method supplied by a subclass.
        pass

    @abstractmethod
    def train(self, X, Qt, nIterations):
        '''
        Train the agent based on the data provided
        Input consists of [s,a,r] and output data consists of [q]
        nIterations is the number of training iterations to run
        '''
        pass

    @abstractmethod
    def Q(self, state, action):
        '''Return the agent's value estimate for taking action in state'''
        pass

    @abstractmethod
    def act(self, state):
        '''Return the action the agent takes in state'''
        pass

    @abstractmethod
    def randomAction(self, state):
        '''Return a randomly chosen action for state'''
        pass

    @abstractmethod
    def greedyAction(self, state):
        '''Return the action the agent currently values most in state'''
        pass


In [4]:
class Environment(ABC):
    '''Abstract class representing a reinforcement learning environment'''
    def __init__(self, initialState):
        # Kept so that run() has a state to start every epoch from
        self.state = initialState

    @abstractmethod
    def reinforce(self, state):
        '''
        Reinforcement function for any valid state
        Should be deterministic and not stateful
        '''
        return 0

    @abstractmethod
    def validActions(self, state):
        '''
        Returns iterable of valid discrete actions from any valid state
        Should be deterministic and not stateful
        '''
        return []

    @abstractmethod
    def step(self, state, action):
        '''
        Compute the state that results from taking action in state
        Should be deterministic and **NOT** stateful
        step does NOT update the state of the Environment
        '''
        return self.state

    def run(self, agent, nEpochs, nSteps):
        '''
        Let the agent loose in the environment for nEpochs epochs of size nSteps steps
        Should use a subclass of Agent designed for the same problem
            as the subclass for Environment
        The agent must provide act(state), Q(state, action) and a
            learningRate attribute
        Every epoch starts from self.state; self.state itself is never changed
        Will generate (nEpochs * nSteps) samples

        Returns (inputs, qs): inputs[i] is [state, action] and qs[i] is the
            matching target Q value
        '''
        inputs = []
        qs = []
        for _ in range(nEpochs):
            # step() is not stateful, so the trajectory is tracked locally
            state = self.state
            for _ in range(nSteps):
                action = agent.act(state)
                newState = self.step(state, action)
                inputs.append([state, action])
                # current estimate moved toward the reinforcement of the new
                # state by a fraction learningRate of the difference
                q = agent.Q(state, action)
                qs.append(q + agent.learningRate * (self.reinforce(newState) - q))
                state = newState
        return inputs, qs
        

In [5]:
def standardize(x, axis=1):
    '''
    Shift and scale x to zero mean and unit standard deviation along axis.

    axis defaults to 1, i.e. each row of a 2-D array is standardized on its
    own; pass axis=0 to standardize each column instead.
    Slices with no spread (standard deviation 0) come back as all zeros
    rather than NaN.
    Returns an array with the same shape as x.
    '''
    x = np.asarray(x)
    # keepdims leaves the reduced axis in place with length 1 so the
    # statistics broadcast back against x
    mean = np.mean(x, axis=axis, keepdims=True)
    std = np.std(x, axis=axis, keepdims=True)
    # a constant slice has (x - mean) == 0 everywhere; dividing by 1 keeps it 0
    std = np.where(std == 0, 1, std)
    return (x - mean) / std

In [6]:
class DiscreteQNetAgent(Agent):
    '''
    Concrete subclass of Agent
    RL Agent with discrete action space
    Q function is a PyTorch Neural Network
    '''
    def __init__(self, nInputs, nHidden, nOutputs, QnetClass, validActionsF, reinforcementF,
                 useCuda=True, learningRate=1e-4, epsilon=0.1):
        '''
        nInputs, nHidden, nOutputs are forwarded to QnetClass to build the network
        validActionsF(state) returns the actions available in state
        reinforcementF is stored as self.reinforce
        useCuda requests the GPU; it is ignored when no CUDA device is available
        learningRate is the SGD step size and is exposed as self.learningRate
            (Environment.run reads that attribute from its agent)
        epsilon is the probability that act() picks a random action
        '''
        self.Qnet = QnetClass(nInputs, nHidden, nOutputs)
        # Only honour the request when a device exists, so the later tensor
        # moves cannot fail on a CPU-only machine
        self.useCuda = bool(useCuda) and torch.cuda.is_available()
        if self.useCuda:
            self.Qnet = self.Qnet.cuda()
        self.learningRate = learningRate
        self.epsilon = epsilon
        self.criterion = torch.nn.MSELoss(reduction='sum')
        self.optimizer = torch.optim.SGD(self.Qnet.parameters(), lr=learningRate)
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, 30)
        self.lossTrace = []
        self.validActions = validActionsF
        self.reinforce = reinforcementF

    def Q(self, state, action):
        '''
        Return the network's Q estimate for (state, action) as a Python float
        state and action are flattened and concatenated into one input row
        Assumes the network yields a single value per row (nOutputs == 1)
        NOTE(review): the input is not standardized here although train()
            standardizes its X - confirm whether that is intended
        '''
        pieces = [torch.as_tensor(part, dtype=torch.float32).reshape(-1)
                  for part in (state, action)]
        x = torch.cat(pieces).unsqueeze(0)
        if self.useCuda:
            x = x.cuda()
        # Evaluation only: no gradient bookkeeping needed
        with torch.no_grad():
            q = self.Qnet(x)
        return q.item()

    def train(self, X, Qt, nIterations):
        '''
        Trains the agent's Q function on the provided epochs of data.
        X = input samples (one s/a/r sample per row)
        Qt = output samples (one q value per row)
        X and Qt should have the same number of rows (X.shape[0] == Qt.shape[0])
        The per-iteration losses of this call are appended to self.lossTrace
        '''
        # standardize x values
        # NOTE(review): standardize works along axis 1 (within each row);
        # confirm that matches the one-sample-per-row layout
        X = standardize(X)

        # X and Qt are likely not Tensors yet
        x = torch.as_tensor(X, dtype=torch.float32)
        # One target row per sample, so the loss compares like-shaped tensors
        # even when Qt is a flat list of q values
        y = torch.as_tensor(Qt, dtype=torch.float32).reshape(x.shape[0], -1)

        if self.useCuda:
            x = x.cuda()
            y = y.cuda()

        lossTrace = []
        for _ in range(nIterations):
            # Forward pass: Compute predicted y by passing x to the model
            y_pred = self.Qnet(x)

            # Compute loss
            loss = self.criterion(y_pred, y)
            lossTrace.append(loss.item())

            # Zero gradients, perform a backward pass, and update the weights.
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            # The schedule advances after the weight update it applies to
            self.scheduler.step()
        self.lossTrace.append(lossTrace)

    def act(self, state):
        '''
        Epsilon-greedy choice: a random valid action with probability
        self.epsilon, otherwise the greedy action
        '''
        import random
        if self.epsilon > random.uniform(0, 1):
            return self.randomAction(state)
        return self.greedyAction(state)

    def randomAction(self, state):
        '''Return one of the valid actions for state, chosen uniformly'''
        import random
        return random.choice(list(self.validActions(state)))

    def greedyAction(self, state):
        '''
        Return the valid action with the largest Q value in state
        Ties go to the earliest action returned by validActions
        '''
        actions = list(self.validActions(state))
        qValues = np.array([self.Q(state, action) for action in actions])
        # argmax gives a position in the action list, not the action itself
        return actions[int(np.argmax(qValues))]

In [7]:
class Reltan(torch.nn.Module):
    '''
    Feed-forward network: Linear(n, h), then alternating ReLU / Tanh
    activations, then Linear(h, o).

    n = number of inputs, h = number of hidden units, o = number of outputs
    '''
    def __init__(self, n, h, o):
        super().__init__()
        # Every value is non-negative after the first ReLU and Tanh keeps it
        # non-negative, so the later ReLU layers pass their input through
        # unchanged. They stay in place so layer positions (and therefore
        # parameter names such as mods.6.weight) do not move.
        self.mods = torch.nn.ModuleList([
            torch.nn.Linear(n, h),
            torch.nn.ReLU(),
            torch.nn.Tanh(),
            torch.nn.ReLU(),
            torch.nn.Tanh(),
            torch.nn.ReLU(),
            torch.nn.Linear(h, o),
        ])

    def forward(self, x):
        '''Apply the layers in order and return the final output'''
        for module in self.mods:
            x = module(x)
        return x

In [10]:
class OneDimEnv(Environment):
    '''
    Concrete Environment for the one-dimensional problem
    The reinforcement, action set and dynamics below are still placeholders
    '''
    def __init__(self, initialState):
        # step() reads self.state, so the initial state has to be stored
        self.state = initialState

    def reinforce(self, state):
        '''
        Reinforcement function for any valid state
        Should be deterministic and not stateful
        Placeholder: always 0
        '''
        return 0

    def validActions(self, state):
        '''
        Returns iterable of valid discrete actions from any valid state
        Should be deterministic and not stateful
        Placeholder: no actions
        '''
        return []

    def step(self, state, action):
        '''
        Advance the state of the Environment by taking an action
        Should be deterministic and **NOT** stateful
        step does NOT update the state of the Environment
        Placeholder: ignores state and action and returns the stored state
        '''
        # TODO: replace with the real 1D dynamics
        return self.state

```

def trainQ(epoch, learningRate, epsilonDecayFactor, validMovesF, makeMoveF):
    '''
    Learn a tabular Q function by repeatedly playing from the start state
    to the goal.

    epoch = number of start-to-goal repetitions to run
    learningRate = step size of the Q update
    epsilonDecayFactor = epsilon is multiplied by this before every repetition
    validMovesF(state) = returns the moves available in state
    makeMoveF(state, move) = returns the state reached by taking move

    Returns (Q, stepsList): Q is a dict keyed by smt(state, move) and
    stepsList holds the number of steps each repetition needed.
    Relies on the helpers smt(state, move) and isGoal(state, 2) being
    defined at module level.
    '''
    import random

    def chooseMove(epsilon, Q, state):
        # Epsilon-greedy over the moves validMovesF allows; unseen pairs
        # count as 0 and ties go to the earliest move
        moves = validMovesF(state)
        if epsilon > random.uniform(0, 1):
            return random.choice(moves)
        return max(moves, key=lambda move: Q.get(smt(state, move), 0))

    startState = [[1, 2, 3], [], []]
    epsilon = 1
    reinf = -1  # reinforcement received for every step taken
    stepsList = []
    Q = {}
    s = startState
    a = chooseMove(epsilon, Q, s)

    for rep in range(epoch):
        epsilon = epsilon * epsilonDecayFactor
        steps = 0
        while not isGoal(s, 2):
            if steps > 0:
                # Update the previous pair now that its successor is known
                key = smt(sold, aold)
                Q[key] = Q.get(key, 0) + learningRate * (reinf - Q.get(key, 0) + Q.get(smt(s, a), 0))
            sold, aold = s, a
            s = makeMoveF(sold, aold)
            a = chooseMove(epsilon, Q, s)
            steps += 1
        # Nothing is left to gain at the goal; the move that reached it is
        # worth exactly one step
        Q[smt(s, a)] = 0
        if steps > 0:
            Q[smt(sold, aold)] = reinf
        s = startState
        a = chooseMove(epsilon, Q, s)
        stepsList.append(steps)
    return Q, stepsList
 ```