# In [3]:
import gym
from gym import spaces
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
from collections import deque
import random

# In [7]:
class DQNAgent:
    """Deep Q-learning agent with epsilon-greedy exploration and replay.

    The agent stores transitions in a bounded replay buffer and trains a
    small fully connected Keras network that maps a state to one Q-value
    per action.
    """

    def __init__(self, state_size, action_size):
        """Set hyper-parameters and build the Q-network.

        Args:
            state_size: Number of features in one state vector; used as the
                network's input dimension.
            action_size: Number of discrete actions; used as the network's
                output dimension.
        """
        self.state_size = state_size
        self.action_size = action_size
        # Replay buffer; the oldest transitions are dropped past 2000 items.
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # discount applied to the next state's best Q-value
        self.epsilon = 1.0  # probability of picking a random action
        self.epsilon_min = 0.01  # decay stops once epsilon is at/below this
        self.epsilon_decay = 0.995  # multiplicative decay applied per replay
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Return a compiled network: two 24-unit ReLU layers, linear output."""
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        # `learning_rate` is the supported keyword; the old `lr` alias is
        # deprecated and rejected by recent Keras releases.
        model.compile(loss='mse',
                      optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` using an epsilon-greedy policy.

        Args:
            state: Batch holding a single state whose last dimension is
                ``state_size``; only the first row of the prediction is used.

        Returns:
            A random action index with probability ``epsilon``, otherwise
            the index of the largest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        # verbose=0 keeps Keras from printing a progress bar on every step.
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """Train on a random minibatch from memory, then decay epsilon.

        All sampled transitions are stacked so the network is queried twice
        and fitted once per call, instead of once per transition.

        Args:
            batch_size: Number of transitions to sample without replacement.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        minibatch = random.sample(self.memory, batch_size)
        if minibatch:
            states, actions, rewards, next_states, dones = zip(*minibatch)
            states = np.vstack(states)
            next_states = np.vstack(next_states)
            actions = np.asarray(actions, dtype=np.intp)
            rewards = np.asarray(rewards, dtype=float)
            dones = np.asarray(dones, dtype=bool)

            # Bellman target: terminal transitions keep the raw reward,
            # others add the discounted best Q-value of the next state.
            next_q = self.model.predict(next_states, verbose=0)
            targets = np.where(
                dones,
                rewards,
                rewards + self.gamma * np.amax(next_q, axis=1),
            )

            # Only the Q-value of the action actually taken is changed, so
            # the loss is zero for every other output.
            target_f = np.array(self.model.predict(states, verbose=0))
            target_f[np.arange(len(minibatch)), actions] = targets
            self.model.fit(states, target_f, batch_size=len(minibatch),
                           epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load network weights from the path ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the path ``name``."""
        self.model.save_weights(name)

def train_agent(env, episodes, batch_size=32, max_steps=500):
    """Train a fresh ``DQNAgent`` on ``env`` and return it.

    The environment is driven through the classic Gym interface:
    ``reset()`` returns an observation and ``step(action)`` returns a
    ``(observation, reward, done, info)`` 4-tuple.

    Args:
        env: Environment exposing ``observation_space.shape``,
            ``action_space.n``, ``reset`` and ``step``.
        episodes: Number of episodes to run.
        batch_size: Replay minibatch size. Replay starts once the agent's
            memory holds more than this many transitions.
        max_steps: Upper bound on steps per episode.

    Returns:
        The trained ``DQNAgent``.
    """
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    for episode in range(episodes):
        # Observations are reshaped into a batch of one for the network.
        state = np.reshape(env.reset(), [1, state_size])
        for step in range(max_steps):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            if done:
                # The reported score is the index of the final step.
                print("episode: {}/{}, score: {}, e: {:.2}"
                      .format(episode, episodes, step, agent.epsilon))
                break
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
    return agent



class TestPrioritizationEnv(gym.Env):
    """Gym environment that steps through a fixed sequence of test cases.

    One step consumes one entry of ``test_cases``. The reward for a step is
    that entry multiplied by the chosen action index, and the episode ends
    once every entry has been consumed. The observation returned by
    ``reset`` and ``step`` is always an all-zero vector with one element
    per test case.
    """

    metadata = {'render.modes': ['human']}

    def __init__(self, test_cases):
        """Store ``test_cases`` and size both spaces by its length."""
        super().__init__()
        case_count = len(test_cases)
        self.test_cases = test_cases
        self.num_test_cases = case_count
        self.observation_space = spaces.MultiBinary(case_count)
        self.action_space = spaces.Discrete(case_count)
        # Episode cursor and termination flag; both are rewound by reset().
        self.current_test_case = 0
        self.done = False

    def reset(self):
        """Rewind to the first test case and return a zero observation."""
        self.done = False
        self.current_test_case = 0
        return np.zeros(self.num_test_cases)

    def step(self, action):
        """Consume the current test case and advance the cursor.

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is a
            zero vector and ``info`` is an empty dict.
        """
        assert self.action_space.contains(action), "Invalid action"
        position = self.current_test_case
        reward = self.test_cases[position] * action
        self.current_test_case = position + 1
        # The flag is only ever raised here; reset() is what clears it.
        if self.current_test_case == self.num_test_cases:
            self.done = True
        observation = np.zeros(self.num_test_cases)
        return observation, reward, self.done, {}

    def render(self, mode='human'):
        """No-op; nothing is rendered."""
        pass

    def close(self):
        """No-op; there are no resources to release."""
        pass


# Create the demo environment. No training call appears in the lines shown
# here; presumably train_agent(env, ...) is invoked further down -- TODO confirm.
# Each entry is one test case's value: TestPrioritizationEnv.step multiplies
# the current entry by the chosen action index to produce the reward.
test_cases = [0, 1, 1, 0, 1, 0, 0, 1, 0, 1]
env = TestPrioritizationEnv(test_cases)