In [1]:
import gym
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import TensorDataset, DataLoader
from collections import deque

In [2]:
class ShowerEnv(Env):
    def __init__(self) -> None:
        super().__init__()
        # Actions we can take: down or stay or up
        self.action_space = Discrete(3)
        # Temperature array 0 - 100
        self.observation_space = Box(low=np.array([0]), high=np.array([100]))
        # Set start temp
        self.state = 38 + random.randint(-3, 3)
        # Set shower length
        self.shower_length = 60
    
    def step(self, action):
        # Apply action
        self.state += action - 1
        # Reduce shower length by 1 second
        self.shower_length -= 1
        
        # Calculate reward
        if self.state >= 37 and self.state <= 39:
            reward = 1
        else:
            reward = - 1
            
        # Check if shower is done
        if self.shower_length <= 0:
            done = True
        else:
            done = False
            
        # Apply temperature noise
        self.state += random.randint(-1, 1)
        # Set placeholder for info
        info = {}
        
        return self.state, reward, done, info
    
    def render(self):
        pass
    
    def reset(self):
        self.state = 38 + random.randint(-3, 3)
        self.shower_length = 60
        return self.state

In [3]:
env = ShowerEnv()

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


In [4]:
env.action_space.sample(), env.observation_space.sample()

(2, array([39.940598], dtype=float32))

In [5]:
env.reset()

40

In [19]:
episodes = 10
for eps in range(1, episodes + 1):
    state = env.reset()
    done = False
    score = 0
    
    while not done:
        # env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score += reward
    
    print('Episode:{} Score:{}'.format(eps, score))

Episode:1 Score:-42
Episode:2 Score:-42
Episode:3 Score:-24
Episode:4 Score:-36
Episode:5 Score:-30
Episode:6 Score:-58
Episode:7 Score:-46
Episode:8 Score:-54
Episode:9 Score:-60
Episode:10 Score:-60


## DL with Keras

In [52]:
states = env.observation_space.shape
actions = env.action_space.n

In [26]:
def build_model(states, actions):
    model = Sequential()
    model.add(Dense(24, activation='relu', input_shape=states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

In [53]:
def build_model(input_size, output_size):
    model = nn.Sequential(
        nn.Linear(input_size, 64),
        nn.ReLU(),
        nn.Linear(64, 64),
        nn.ReLU(),
        nn.Linear(64, output_size)
    )
    return model

In [54]:
model = build_model(1, actions)

In [49]:
model.eval()

Sequential(
  (0): Linear(in_features=1, out_features=64, bias=True)
  (1): ReLU()
  (2): Linear(in_features=64, out_features=64, bias=True)
  (3): ReLU()
  (4): Linear(in_features=64, out_features=3, bias=True)
)

## Build agent with Keras

In [57]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import TensorDataset, DataLoader
from collections import deque

class DQNAgent:
    def __init__(self, model, target_model, memory_size=50000, batch_size=32, gamma=0.99):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.target_model = target_model.to(self.device)
        self.target_model.load_state_dict(self.model.state_dict())
        self.target_model.eval()
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-3)
        self.loss_fn = nn.SmoothL1Loss()
        self.memory = deque(maxlen=memory_size)
        self.batch_size = batch_size
        self.gamma = gamma

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def replay(self):
        if len(self.memory) < self.batch_size:
            return
        batch = np.random.choice(len(self.memory), self.batch_size, replace=False)
        states, actions, rewards, next_states, dones = zip(*[self.memory[i] for i in batch])

        states = torch.tensor(states, dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.long).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).to(self.device)

        q_values = self.model(states).gather(1, actions.unsqueeze(1))
        next_q_values = self.target_model(next_states).max(dim=1)[0].detach()
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = self.loss_fn(q_values, target_q_values.unsqueeze(1))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def act(self, state, epsilon=0):
        if np.random.rand() <= epsilon:
            return np.random.randint(self.model.output_size)
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
        q_values = self.model(state)
        return torch.argmax(q_values).item()

# Build your models
target_model = build_model(1, actions)

# Build your DQNAgent
dqn_agent = DQNAgent(model, target_model)

# Training loop
for step in range(50000):
    action = dqn_agent.act(state)
    next_state, reward, done, _ = env.step(action)
    dqn_agent.remember(state, action, reward, next_state, done)
    dqn_agent.replay()
    if step % 100 == 0:
        dqn_agent.update_target_model()

# Testing loop
scores = []
for _ in range(100):
    state = env.reset()
    total_reward = 0
    done = False
    while not done:
        action = dqn_agent.act(state, epsilon=0)
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        state = next_state
    scores.append(total_reward)
print(np.mean(scores))


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x32 and 1x64)

In [56]:
target_model

Sequential(
  (0): Linear(in_features=1, out_features=64, bias=True)
  (1): ReLU()
  (2): Linear(in_features=64, out_features=64, bias=True)
  (3): ReLU()
  (4): Linear(in_features=64, out_features=3, bias=True)
)