# In [1]:
from CartAndPole.QLearningTabular.Agent import Agent
from CartAndPole.QLearningTabular.LinearFunctionApprxPolicy import LinearFunctionApprxPolicy as Policy
from Core.Utility.EpsilonGreedyStrategy import EpsilonGreedyStrategy
from CartAndPole.QLearningTabular.EnvironmentManager import EnvironmentManager
from IPython.display import clear_output, display
from itertools import count
from Core.Utility.Helper import Helper
from Core.Utility.DecayingLearningRate import DecayingLearningRate
from Core.Utility.UCB import UCB 
from Core.Utility.ReplayMemory import ReplayMemory
import numpy as np
from collections import namedtuple


# In [None]:


# A single transition record. Fields are positional, in the order:
# currentState, action, nextState, reward.
Experience = namedtuple(
    'Experience',
    ['currentState', 'action', 'nextState', 'reward'],
)

def extract_experience(experiences):
    """Split a batch of experiences into one NumPy array per field.

    Args:
        experiences: Non-empty iterable of 4-item records ordered as
            (currentState, action, nextState, reward), such as
            ``Experience`` tuples.

    Returns:
        Tuple ``(currentStates, actions, nextStates, rewards)``. The states
        are stacked row-wise with ``np.vstack`` (one row per experience);
        actions and rewards are converted with ``np.array``.

    Raises:
        ValueError: If ``experiences`` is empty or the records do not have
            exactly four items.
    """
    # Transpose the list of records into one tuple per field.
    currentStates, actions, nextStates, rewards = zip(*experiences)

    # Return the stacked batch. The previous version returned the undefined
    # name `currentState`, which resolved to whatever global of that name
    # existed (or raised NameError).
    return (
        np.vstack(currentStates),
        np.array(actions),
        np.vstack(nextStates),
        np.array(rewards),
    )


# Arguments passed, in this order, to EpsilonGreedyStrategy.
epsilon_start, epsilon_end, epsilon_decay = 1, 0.01, 0.001

# Arguments passed, in this order, to DecayingLearningRate.
initial_lr, decay_rate, min_lr = 0.01, 0.000002, 0.001

# `capacity` is passed to ReplayMemory; `batchSize` is the number of
# experiences sampled from it for each training call.
capacity, batchSize = 5000, 256


# Build the objects used by the training loop below.
agent = Agent()
# The state space is used to initialise the policy and to build stateDict.
stateSpace = agent.initializeState()
epsilonGreedy = EpsilonGreedyStrategy(epsilon_start, epsilon_end, epsilon_decay)
# NOTE(review): decayingLearningRate is created here but not referenced in the
# visible training loop — confirm whether it is used elsewhere.
decayingLearningRate = DecayingLearningRate(initial_lr, decay_rate, min_lr)
environment = EnvironmentManager('CartPole-v1')
policy = Policy() 
# The policy is initialised from the state space and the number of actions
# reported by the environment.
policy.initializePolicy(stateSpace, environment.actionSpace.n)
# ucb = UCB(environment.actionSpace.n, len(stateSpace), .1)
replayMemory = ReplayMemory(capacity)


# Training loop: run episodes, store every transition in replay memory and
# train the policy on a sampled mini-batch once enough transitions exist.
numberOfEpisodes = 100000
score = [0]        # per-episode totals of the raw environment reward
scoreCounter = 0   # running total for the episode in progress
stateDict = dict(zip(stateSpace, range(len(stateSpace))))
for episode in range(numberOfEpisodes):
    for step in count():
        currentState = agent.getState(environment.observation)
        action = epsilonGreedy.chooseAction(currentState, policy, episode)
        reward, terminated = environment.step(action)
        nextState = agent.getState(environment.observation)

        # Penalise the terminating transition. This shaped reward is what the
        # policy learns from, so it (not the raw reward) goes into the replay
        # memory; previously it was computed and then discarded.
        trainingReward = -10 if terminated else reward
        replayMemory.push(Experience(
            currentState, action, nextState, trainingReward
        ))

        if replayMemory.canProvideSample(batchSize):
            experienceSample = replayMemory.sample(batchSize)
            experiences = extract_experience(experienceSample)
            policy.train(*experiences, step)

        # The reported score keeps using the unshaped environment reward.
        scoreCounter += reward

        if terminated:
            score.append(scoreCounter)
            scoreCounter = 0
            environment.reset()

            # Refresh the progress display every 200 episodes.
            if episode % 200 == 0:
                clear_output(wait=True)
                display(f"episode {episode}")
                display(int(Helper.getMovingAverage3(300, score)))

            break

environment.close()