In [27]:
import random

class Game:
    """One-dimensional chase game: move the player left/right toward a target.

    The state is a two-element list ``[player_position, target_position]``.
    An episode ends when the player reaches the target or when the move
    budget (``MAX_MOVEMENT`` moves) is used up.
    """

    # Actions
    # 0 = player moves left
    # 1 = player moves right
    num_actions = 2
    
    # State
    # 0 = player position
    # 1 = target position
    # Class-level placeholder only; __init__ replaces it with a per-instance list.
    state = {
        0 : None,
        1 : None
    }
    
    MAX_MOVEMENT = 5
    # Remaining moves. The first `-=` in execute_action creates the
    # per-instance counter, starting from this class-level default.
    current_movement = 5
    
    def __init__(self) -> None:
        """Pick a random player position in [0, 5] and a distinct target nearby."""
        player = random.randint(0, 5)
        target = player
        # Re-draw until the target differs from the player (offset in [-4, 4]).
        while target == player:
            target = player + random.randint(-4,4)
        self.state = [player, target]
    
    def print_state(self):
        """Print the target and player positions."""
        print(f"Target Position : {self.state[1]}")
        print(f"Player Position : {self.state[0]}")
    
    def get_state(self):
        """Return a fresh ``[player, target]`` list (safe for the caller to keep)."""
        return [self.state[0], self.state[1]]
    
    def execute_action(self, action_num):
        """Apply one move and return ``(next_state, rewards, game_end)``.

        Args:
            action_num: 0 moves the player left; any other value moves right.

        Returns:
            A tuple of the new state (fresh list), the reward for this move,
            and 1 if the episode is over, else 0.
        """
        # Snapshot the state BEFORE mutating it. Binding the same list object
        # here would make "before" and "after" identical, so the reward could
        # never be positive.
        state_before = list(self.state)
        if action_num == 0:
            self.state[0] -= 1
        else:
            self.state[0] += 1
            
        rewards = self.__calculate_rewards__(self.state, state_before)
        
        self.current_movement -= 1
        
        return self.get_state(), rewards, self.__check_game_end__()
    
    def __calculate_rewards__(self, current_state, previous_state):
        """Reward moving closer to the target, penalise anything else.

        The magnitude is the number of moves already spent in this episode
        (``MAX_MOVEMENT - current_movement``), so it is 0 on the first move
        and grows with every later move.
        """
        # Squared distance between player and target, before and after the move.
        delta_current = (current_state[0] - current_state[1]) ** 2
        delta_before = (previous_state[0] - previous_state[1]) ** 2
        
        moves_spent = self.MAX_MOVEMENT - self.current_movement
        if delta_current < delta_before:
            return 1 * moves_spent
        else:
            return -1 * moves_spent
    
    def __check_game_end__(self):
        """Return 1 when the move budget is exhausted or the target is reached, else 0."""
        if self.current_movement == 0:
            return 1
        
        if self.state[0] == self.state[1]:
            return 1
        else:
            return 0

In [28]:
# Each replay sample is a list: [state, action, rewards, next_state, is_done]
# is_done marks whether next_state is terminal (truthy) or non-terminal (falsy)

import random
import tensorflow as tf
import numpy as np

class ReplayMemory:
    """Container for experience-replay samples and its sizing parameters.

    Attributes:
        main_memory: list of stored samples, oldest first.
        max_reply: maximum number of samples to keep (the attribute name is
            kept as-is because other code reads ``max_reply``).
        num_batch: number of samples drawn per mini-batch.
    """
    main_memory: list
    max_reply = 0
    num_batch = 0
    def __init__(self, max_replay: int, mini_batch_num: int):
        # Per-instance buffer: a class-level list would be shared by every
        # ReplayMemory (and therefore every agent) in the process.
        self.main_memory = []
        self.max_reply = max_replay
        self.num_batch = mini_batch_num

class DeepQAgent:
    """Deep Q-learning agent with experience replay and a separate target network.

    ``eval_model`` is the online network that is trained and used to pick
    actions; ``target_model`` is a periodically synchronised copy used to
    compute bootstrap targets.
    """
    replay: "ReplayMemory" = None
    num_actions: int = None
    eval_model = None
    target_model = None
    gamma:float = None
    epsilon:float = None
    epsilon_min: float = None
    epsilon_decay: float = None
    
    # Counter for updating the target model weights
    learn_counter: int = 0
    update_weight_on: int = 0
    
    def __init__(self, num_actions: int, max_replay: int, mini_batch_num: int, 
                 weight_update: int, epsilon: float, epsilon_min: float, 
                 epsilon_decay:float, gamma:float):
        """Create the replay buffer and both networks.

        Args:
            num_actions: number of discrete actions (size of the output layer).
            max_replay: maximum number of samples kept in the replay buffer.
            mini_batch_num: number of samples drawn per learning step.
            weight_update: sync the target network every this many learn() calls.
            epsilon: initial exploration probability.
            epsilon_min: lower bound for epsilon.
            epsilon_decay: multiplicative decay applied to epsilon per learn() call.
            gamma: discount factor.

        Raises:
            ValueError: if ``weight_update`` is smaller than 1 (it is used as
                a modulus in learn()).
        """
        if weight_update < 1:
            raise ValueError("weight_update must be >= 1")
        self.replay = ReplayMemory(max_replay, mini_batch_num)
        # Set before create_model() so the output layer can be sized from it.
        self.num_actions = num_actions
        self.eval_model, self.target_model = self.create_model()
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.update_weight_on = weight_update
        
    def create_model(self):
        """Build and return ``(eval_model, target_model)`` as two distinct networks.

        Returning the same object twice would make the target network track
        the online network on every gradient step, defeating its purpose.
        Both networks start from identical weights.
        """
        eval_model = self._build_network()
        target_model = self._build_network()
        target_model.set_weights(eval_model.get_weights())
        return eval_model, target_model

    def _build_network(self):
        """Build one compiled Q-network: state vector in, one Q-value per action out."""
        # Falls back to 2 outputs if num_actions has not been set yet.
        n_outputs = self.num_actions if self.num_actions else 2
        # The input must match the state shape and the output the action count.
        # assumes states are 2-element vectors — TODO confirm against the game.
        model = tf.keras.models.Sequential([
            tf.keras.layers.Dense(256, input_dim=2, activation='relu'),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dense(n_outputs, activation='linear'),
        ])
        
        model.compile(optimizer = 'adam',
                      loss = 'mean_squared_error',
                      metrics= ['mse']
                      )
        
        return model
        
    def store_memory(self, state, action, rewards, next_state, is_done):
        """Append one sample, evicting the oldest one when the buffer is full."""
        memory = self.replay.main_memory
        # `>=` (not `==`) so a buffer that somehow overshot still gets trimmed.
        if memory and len(memory) >= self.replay.max_reply:
            memory.pop(0)
        memory.append([state, action, rewards, next_state, is_done])
        
    def pick_action(self, state):
        """Epsilon-greedy action selection.

        With probability ``epsilon`` a uniformly random action is returned
        (exploration); otherwise the action with the highest predicted
        Q-value (exploitation).
        """
        if random.random() < self.epsilon:
            return random.randint(0, self.num_actions - 1)

        # Batch of one state; an explicit array avoids Keras treating a
        # nested Python list as multiple model inputs.
        q_values = self.eval_model.predict(np.array([state]), verbose=0)[0]
        return int(np.argmax(q_values))
    
    def learn(self):
        """Run one training step on a random mini-batch from the replay buffer.

        Does nothing until the buffer holds at least ``num_batch`` samples.
        Afterwards the target network is synced every ``update_weight_on``
        calls and epsilon is decayed (never below ``epsilon_min``).
        """
        if len(self.replay.main_memory) < self.replay.num_batch:
            return
        
        samples = self.__sample_mini_batch__()
        states = np.array([sample[0] for sample in samples])
        next_states = np.array([sample[3] for sample in samples])

        # Start from the online network's current estimates for the SAMPLED
        # states, so only the entry of the action actually taken changes.
        q_targets = np.array(self.eval_model.predict(states, verbose=0),
                             dtype=np.float32)
        next_q = self.target_model.predict(next_states, verbose=0)

        for i, sample in enumerate(samples):
            action, reward, is_done = sample[1], sample[2], sample[4]
            if is_done:
                # Terminal next state: no future value to bootstrap from.
                q_targets[i][action] = reward
            else:
                # Non-terminal: bootstrap from the best action in next_state.
                q_targets[i][action] = reward + self.gamma * np.max(next_q[i])
                
        self.eval_model.fit(states, q_targets, verbose=1, epochs=10)
        if self.learn_counter % self.update_weight_on == 0:
            self.__update_target_models__()
        
        # Post learn: count the step and decay exploration.
        self.learn_counter += 1
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            
    def __sample_mini_batch__(self):
        """Draw ``num_batch`` distinct samples uniformly from the replay buffer."""
        return random.sample(self.replay.main_memory, self.replay.num_batch)

    def __update_target_models__(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.eval_model.get_weights())

In [32]:
import matplotlib.pyplot as plt 
import time
%matplotlib qt

class Environment:
    """Training harness: plays Game episodes with a DeepQAgent and plots progress."""
    game: "Game"
    agent: "DeepQAgent"
    
    def __init__(self):
        """Create the agent with this notebook's hyper-parameters."""
        self.agent = DeepQAgent(
            num_actions=2,
            max_replay=10000,
            mini_batch_num=4000,
            weight_update=5,
            epsilon=1,
            epsilon_min=0.001,
            epsilon_decay=0.99,
            gamma=0.95,
        )
        
    def train(self, num_episodes: int, learn_every: int = 200):
        """Play ``num_episodes`` episodes, learning periodically and live-plotting.

        Args:
            num_episodes: number of episodes to play.
            learn_every: call ``agent.learn()`` once every this many episodes.

        Raises:
            ValueError: if ``learn_every`` is smaller than 1.
        """
        if learn_every < 1:
            raise ValueError("learn_every must be >= 1")

        # For drawing purposes
        x_axis = []
        y_axis = []  
        figure, ax = plt.subplots(figsize=(10, 8))
        line1, = ax.plot(x_axis, y_axis)
        WINDOW_RATIO = 0.1
        WINDOW_LIMIT = 0.85
        ax.set_title("Error Margin of Target and Player", fontsize=20)
        ax.set_xlabel("Number of Episodes")
        ax.set_ylabel("Error Margin")
        figure.canvas.draw()
        plt.show(block=False)
              
        for i in range(1, num_episodes+1):
            # Play one full episode, storing every transition.
            self.game = Game()
            game_end = False
            while not game_end:
                state = self.game.get_state()
                action = self.agent.pick_action(state)
                next_state, rewards, game_end = self.game.execute_action(action)
                self.agent.store_memory(state, action, rewards,
                                        next_state, game_end)
            
            # Model learning
            if i % learn_every == 0:
                self.agent.learn()
                
            # Error margin = distance between player and target at episode end.
            x_axis.append(i)
            final_state = self.game.get_state()
            y_axis.append(float(abs(final_state[0] - final_state[1])))
            line1.set_xdata(x_axis)
            line1.set_ydata(self.calculate_window(WINDOW_RATIO, WINDOW_LIMIT, y_axis))
            ax.set_title(f"Error Margin of Target and Player\nEpsilon:{self.agent.epsilon}"
                         , fontsize=20)
            
            ax.relim() 
            ax.autoscale_view(True,True,True) 

            figure.canvas.draw()
            
            # Give the GUI event loop a moment to repaint.
            plt.pause(0.005)
            
    def calculate_window(self, win_ratio, win_limit, series):
        """Return a copy of ``series`` with its leading part smoothed.

        Each of the first ``int(len * win_limit)`` points is replaced by the
        mean of the forward-looking window of ``int(len * win_ratio)`` points
        starting at it; the remaining points are left untouched. Series
        shorter than ``100 + window`` are returned as an unmodified copy.

        Args:
            win_ratio: window length as a fraction of the series length.
            win_limit: fraction of the series (from the start) to smooth.
            series: sequence of numbers; it is not modified.
        """
        copy_series = series.copy()
        series_length = len(copy_series)
        raw_window = int(series_length * win_ratio)
        if series_length < 100 + raw_window:
            return copy_series

        # A zero-length window would average an empty slice (NaN); use at
        # least one point, which leaves the value unchanged.
        window_steps = max(1, raw_window)
        # Never index past the end, even when win_limit > 1.
        steps_boundary = min(series_length, int(series_length * win_limit))
        for i in range(steps_boundary):
            # The slice only reads indices >= i, which are still raw values.
            copy_series[i] = np.average(copy_series[i : i+window_steps])
        
        return copy_series

In [33]:
# Build the training environment; its constructor creates the DeepQAgent.
envir = Environment()

In [34]:
# Train for 10,000 episodes; the error-margin plot is redrawn after every episode.
envir.train(10000)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10

KeyboardInterrupt: 