In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

device(type='cpu')

In [3]:
class MachineEnv:
  def __init__(self):
    self.state = np.zeros(3)
    self.action = np.array([0, 1, 2, 3])
    self.max_time = 100.
    self.maintenance_cost = 10.
    self.breakdown_cost = 100.
    self.profit = 20.

  def reset(self):
    self.state = np.zeros(3)
    return self.state

  def failure_prob(self):
    return 1 - np.exp(-0.3*(self.state[0] + self.state[1]) - 0.1*self.state[2]/self.max_time)

  def step(self, action):
    if action == 0:
      if np.random.rand() < self.failure_prob():
        reward = -self.breakdown_cost
        self.state[0] = 0
        self.state[1] = 0
      else:
        reward = self.profit
        self.state[0] += np.random.rand()*0.1
        self.state[1] += np.random.rand()*0.05

    elif action == 1:
      reward = -self.maintenance_cost
      self.state[0] = 0.

    elif action == 2:
      reward = -self.maintenance_cost
      self.state[1] = 0.

    elif action == 3:
      reward = -2*self.maintenance_cost
      self.state[0] = 0.
      self.state[1] = 0.

    self.state[0] = min(1, self.state[0])
    self.state[1] = min(1, self.state[1])
    self.state[2] += 1.

    if self.state[2] == self.max_time:
      return self.state, reward, True
    else:
      return self.state, reward, False

In [4]:
class DQN(nn.Module):
  def __init__(self, state_dim, action_dim, hidden_dim_1 = 64, hidden_dim_2 = 64):
    super().__init__()
    self.net1 = nn.Linear(state_dim, hidden_dim_1)
    self.net2 = nn.Linear(hidden_dim_1, hidden_dim_2)
    self.net3 = nn.Linear(hidden_dim_2, action_dim)
    self.relu = nn.ReLU()

  def forward(self, s):
    x = self.relu(self.net1(s))
    x = self.relu(self.net2(x))
    return self.net3(x)

In [5]:
class ReplayBuffer:
  def __init__(self, state_dim, action_dim, max_size = 10000):
    self.max_size = max_size
    self.count = 0
    self.size = 0
    self.s = np.zeros(shape = (self.max_size, state_dim))
    self.a = np.zeros(shape = (self.max_size, 1))
    self.r = np.zeros(shape = (self.max_size, 1))
    self.s_next = np.zeros(shape = (self.max_size, state_dim))
    self.done = np.zeros(shape = (self.max_size, 1))

  def store(self, s, a, r, s_next, done):
    self.s[self.count] = s
    self.a[self.count] = a
    self.r[self.count] = r
    self.s_next[self.count] = s_next
    self.done[self.count] = done

    self.count = (self.count+1) % self.max_size
    self.size = min(self.size+1, self.max_size)

  def sample(self, batch_size):
    idx = np.random.choice(self.size, size = batch_size)
    batch_s = torch.tensor(self.s[idx], dtype=torch.float32).to(device)
    batch_a = torch.tensor(self.a[idx], dtype=torch.long).to(device)
    batch_r = torch.tensor(self.r[idx], dtype=torch.float32).to(device)
    batch_s_next = torch.tensor(self.s_next[idx], dtype=torch.float32).to(device)
    batch_done = torch.tensor(self.done[idx], dtype=torch.float32).to(device)
    return batch_s, batch_a, batch_r, batch_s_next, batch_done

In [6]:
class DQN_Learning:
  def __init__(self, state_dim, action_dim, alpha, gamma, tau, K):
    self.state_dim = state_dim
    self.action_dim = action_dim

    self.q = DQN(state_dim, action_dim).to(device)
    self.q_target = DQN(state_dim, action_dim).to(device)
    self.optimizer = optim.Adam(self.q.parameters(), lr = alpha)
    self.loss_fn = nn.MSELoss()

    self.memory = ReplayBuffer(state_dim, action_dim)

    self.t_step = 0
    self.K = K
    self.alpha = alpha
    self.gamma = gamma
    self.tau = tau

  def step(self, s, a, r, s_next, done):
    self.memory.store(s, a, r, s_next, done)

    self.t_step = (self.t_step + 1) % self.K
    if self.t_step == 0:
      if self.memory.size >= 64:
        batch_sample = self.memory.sample(64)
        self.learn(batch_sample)

  def choose_action(self, state, epsilon):
    state_tensor = torch.tensor(state, dtype = torch.float32).reshape(1, -1).to(device)

    self.q.eval()
    with torch.no_grad():
      action_values = self.q(state_tensor)
    self.q.train()

    if np.random.rand() > epsilon:
        return action_values.argmax(dim=1).item()
    else:
        return np.random.randint(self.action_dim)

  def learn(self, batch_sample):
    s, a, r, s_next, done = batch_sample

    q_targets_next = self.q_target(s_next).detach().max(dim=1)[0].reshape(-1, 1)
    q_targets = r + (self.gamma*q_targets_next*(1-done))
    q_local = self.q(s).gather(dim=1, index=a)

    loss = self.loss_fn(q_local, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    self.soft_update(self.q, self.q_target)

  def soft_update(self, q, q_target):
    for target_param, local_param in zip(q_target.parameters(), q.parameters()):
      target_param.data = self.tau*local_param.data + (1.0 - self.tau)*target_param.data

In [79]:
env = MachineEnv()

num_episode = 300
max_steps = 100
epsilon_start = 0.4
epsilon_end = 0.1
epsilon_decay_rate = 0.9
gamma = 1
K = 64
tau = 0.005
lr = 0.001
freq = 50

DQN_learner = DQN_Learning(len(env.state), len(env.action), lr, gamma, tau, K)
def DQN_policy(num_episodes,freq,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps):
  total_reward = 0
  training_rewards = 0
  for episode in range(num_episodes):
    state = env.reset()
    episode_reward = 0
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay_rate ** episode))
    for step in range(max_steps):
      action = DQN_learner.choose_action(state, epsilon)
      next_state, reward, done = env.step(action)
      DQN_learner.step(state, action, reward, next_state, done)
      state = next_state
      episode_reward += reward
      if done:
        break
    total_reward += episode_reward
    training_rewards += episode_reward
    if (episode + 1) % freq == 0:
      print(f"Episode {episode + 1}: Average rewards for the last {freq} episodes: {training_rewards / freq:.2f}")
      training_rewards = 0
  avg_reward = total_reward/num_episodes
  return avg_reward
DQN_policy(num_episode,freq,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps)

Episode 50: Average rewards for the last 50 episodes: 406.20
Episode 100: Average rewards for the last 50 episodes: 339.80
Episode 150: Average rewards for the last 50 episodes: 450.20
Episode 200: Average rewards for the last 50 episodes: 410.60
Episode 250: Average rewards for the last 50 episodes: 436.20
Episode 300: Average rewards for the last 50 episodes: 363.60


401.1

In [80]:
env = MachineEnv()
PM = np.zeros(shape=(10, 10))
def heuristic1(PM,num_episodes,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps):
  for i, ii in enumerate(range(1, 11)):
    for j, jj in enumerate(range(1, 11)):
      training_rewards = 0
      for episode in range(num_episodes):
        state = env.reset()
        epsilon = max(epsilon_end, epsilon_start * (epsilon_decay_rate ** episode))
        episode_reward = 0
        for step in range(max_steps):
          if (step % ii == 0 and step % jj == 0):
            action = 3
          elif step % ii == 0:
            action = 1
          elif step % jj == 0:
            action = 2
          else:
            action = DQN_learner.choose_action(state, epsilon)
          next_state, reward, done = env.step(action)
          DQN_learner.step(state, action, reward, next_state, done)
          state = next_state
          episode_reward += reward
          if done:
            break
        training_rewards += episode_reward
      PM[i][j] = training_rewards / num_episodes
  best_i, best_j = np.unravel_index(np.argmax(PM), PM.shape)
  best_reward = PM[best_i][best_j]
  return best_reward, best_i+1, best_j+1
best_reward, best_i, best_j = heuristic1(PM,100,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps)
print(f" reward: {best_reward}, part1: {best_i}, part2: {best_j}")

 reward: 178.4, part1: 3, part2: 6


In [81]:
env = MachineEnv()
CM = np.zeros(shape=(10, 10))
max_steps = 100
freq = 100
epsilon_start = 0.3
epsilon_end = 0.1
epsilon_decay_rate = 0.9
def heuristic2(CM,num_episodes,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps):
  for i, ii in enumerate(range(5, 51, 5)):
    for j, jj in enumerate(range(5, 51, 5)):
      training_rewards = 0
      for episode in range(num_episodes):
        state = env.reset()
        epsilon = max(epsilon_end, epsilon_start * (epsilon_decay_rate ** episode))
        episode_reward = 0
        for step in range(max_steps):
          if (state[0] > (ii/100) and state[1] > (jj/100)):
            action = 3
          elif state[0] > (ii/100):
            action = 1
          elif state[1] > (jj/100):
            action = 2
          else:
            action = DQN_learner.choose_action(state, epsilon)
          next_state, reward, done = env.step(action)
          DQN_learner.step(state, action, reward, next_state, done)
          state = next_state
          episode_reward += reward
          if done:
              break
        training_rewards += episode_reward
      CM[i-1][j-1] = training_rewards / num_episodes
  Best_i, Best_j = np.unravel_index(np.argmax(CM), CM.shape)
  best_reward = CM[Best_i][Best_j]
  return best_reward, (Best_i+1)*0.05, (Best_j+1)*0.05
best_reward2, best_i2, best_j2 = heuristic2(CM,100,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps)
print(f" reward: {best_reward2}, part1: {best_i2}, part2: {best_j2}")

 reward: -629.7, part1: 0.30000000000000004, part2: 0.35000000000000003


In [82]:
DQN_reward = DQN_policy(100,freq,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps)

heuristic1_reward, best_i, best_j =  heuristic1(PM,100,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps)

heuristic2_reward, best_i2, best_j2 = heuristic2(CM,100,epsilon_start,epsilon_end,epsilon_decay_rate,max_steps)

print(f"DQN reward: {DQN_reward}")
print(f"heuristic 1 reward: {heuristic1_reward}")
print(f"heuristic 2 reward: {heuristic2_reward}")

Episode 100: Average rewards for the last 100 episodes: -1021.00
DQN reward: -1021.0
heuristic 1 reward: -223.1
heuristic 2 reward: -521.6
