In [56]:
import gym
import numpy as np
import keras
import random
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D
import cv2

random.seed(7)

In [52]:
import gym
env = gym.make('Breakout-v0')
env.reset()


def preprocess(observation):
    observation = cv2.cvtColor(cv2.resize(observation, (84, 84)), cv2.COLOR_BGR2GRAY)
    ret, observation = cv2.threshold(observation,1,255,cv2.THRESH_BINARY)
    return np.reshape(observation,(84,84,1))




Deep ConvNet class

In [53]:
class DQN():
    def __init__(self, inputWidth=84, inputHeight=84, zSize=4):
        self.model = Sequential()
        model = self.model
        data_format=None
        model.add(Conv2D(32, (8, 8), strides=(4, 4), activation='relu', 
                        input_shape=(inputWidth, inputHeight, 4)))
        model.add(Conv2D(64, (4, 4), strides=(2, 2), activation='relu'))
        model.add(Conv2D(64, (3, 3), activation='relu'))
        model.add(Flatten())
        model.add(Dense(512, activation='relu'))
        model.add(Dense(4, activation='softmax'))
        
        model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adam(),
              metrics=['accuracy'])
    #passed X and Y buffer to training
    def train(self, x_train, y_train):
        model.fit(x_train, y_train,
          epochs=1,
          batch_size=32, shuffle=True)
        
    def getModel(self):
        return self.model


In [54]:
class replay_Memory():
    def __init__(self, maxSize=2000):
        self.maxSize = maxSize
        self.memory = np.array([])
        
    def addUnit(self, unit):
        if len(self.memory) + 1 > self.maxSize: 
            newList = np.random.choice(self.memory, len(self.memory)-1)
            self.memory = np.append(newList, unit)
        else:
            self.memory = np.append(self.memory, unit)
            
    def getMemory(self):
        return self.memory
    
    
    def getBatch(self, batch_length):
        memlen = len(self.memory)
        if batch_length > memlen:
            return np.random.choice(self.memory, memlen)
        else:
            return np.random.choice(self.memory, length)
            
        

<img src="files/pseudocode.png">


In [100]:
#class that is representing single transition in replay memory
class Transition():
    
    def __init__(self, observation0, action, reward, observation1, isTerminal):
        self.isTerminal = isTerminal
        self.obs0 = observation0
        self.action = action
        self.reward = reward
        self.obs1 = observation1
    
    def getObs(self, index):
        if index == 0:
            return self.obs0
        elif index == 1:
            return self.obs1
        else:
            raise Excetion('Error', 'Invalid index of observation given in transition getter')

    def isTerminal():
        return self.isTerminal
    
    def getReward(self):
        return self.reward
    
    def getAction(self):
        return self.action
    
        

def get_actions(st, model):
    if len(st) != 4:
        raise Exception('Error', 'stateProcessingError!')
    st = np.dstack((st[0], st[1], st[2], st[3]))
    return model.predict(np.array([st]))

    

        
    
memory = replay_Memory()

#initialize dqn
q = DQN()

#initialize target dqn
q_target_model = keras.models.clone_model(q.getModel())
q_target_model.set_weights(q.getModel().get_weights())


#what percentage of actions are random (improves exploartion rate of the algorithm)


#constants
EPSILON_EXPLORATION = 0.01
TARGET_UPDATE_TICK = 0
TARGET_UDPATE_STEP = 4
DISCOUNT_CONSTANT = 0.99
REPEAT_ACTION_N = 4


for episode in range (1, 10):
    
    state_t = env.reset()
    totalReward = 0
    tick = 0
    
    done = False
    lastAction = 0
    
    recentStates = deque([], maxlen=4)
    
    while not done:
        #repeat 3 same actions on same frame
        if tick < REPEAT_ACTION_N:
            observation, reward, done, info = env.step(lastAction)
            recentStates.append(preprocess(observation))
            tick += 1
        else:
            randval = random.uniform(0, 1)

            #perform random action
            if randval <= EPSILON_EXPLORATION:
                lastAction = env.action_space.sample()
                observation, reward, done, info = env.step(lastAction)   
                recentStates.append(preprocess(observation))
            #perform DQN action
            else:
                
                #get optimal action
                optimal_action = np.argmax(get_actions(recentStates, q.getModel())[0])
                print(get_actions(recentStates, q.getModel()))
                #execute optimal action
                observation, reward, done, info = env.step(optimal_action)
                
                obs_t0 = recentStates.copy()
                recentStates.append(preprocess(observation))
                obs_t1 = recentStates.copy()
                
                #save transition in replay memory
                transition = Transition(obs_t0, optimal_action, reward, obs_t1, isTerminal=done)
                
                memory.addUnit(transition)
                
                #batch = [{ob1, an, r, ob2}...]
                batch = memory.getBatch(32)
                
                #get X for training
                X = []
                for sample in batch:
                    X.append(sample.getObs(0))
                X = np.array(X)
                print(X.shape)
                print()

                
                #get Y for training
                Y = []
                #get the correct Y values
                for sample in batch:
                    
                    action = sample.getAction()
                    prediction = get_actions(sample.getObs(0), q.getModel())
                    
                    # set target as r
                    if sample.isTerminal:
                        prediction[action] = sample.getReward()
                        
                    #set target as rt + discount_factor*max(Q(st + 1, a_))    
                    else:
                        max_actionVal = np.max( get_actions(sample.getObs(1), q_target_model) )
                        q_value = sample.getReward() + DISCOUNT_CONSTANT * max_actionVal
                        
                        print('prediction', prediction)
                        print('q_value')
                        prediction[0][action] = q_value
                        
                    Y.append(prediction)
                Y = np.array(Y)
                    
                #perform gradient step update
                q.getModel().fit(x = np.swapaxes(np.squeeze(X, axis=4),1, 3), y = Y, batch_size=len(batch))
                
                #every TARGET_UDPATE_STEP step copy weights to target network
                if TARGET_UPDATE_TICK >= TARGET_UDPATE_STEP:
                    TARGET_UPDATE_TICK = 0
                    q_target_model.set_weights(q.getModel().get_weights())
                    
                TARGET_UPDATE_TICK += 1
            tick = 1
        env.render()
                
    env.close()


<keras.models.Sequential object at 0x1380340b8>
<keras.models.Sequential object at 0x1380340b8>
[[  4.47580295e-19   1.00000000e+00   6.11539651e-31   4.70457007e-15]]
1
(1, 4, 84, 84, 1)

prediction [[  4.47580295e-19   1.00000000e+00   6.11539651e-31   4.70457007e-15]]
q_value


ValueError: Error when checking target: expected dense_82 to have 2 dimensions, but got array with shape (1, 1, 4)