In [1]:
import simglucose
import gymnasium as gym
from collections import namedtuple, deque
import random

In [2]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward', 'h_0', 'c_0', 'h_0_next', 'c_0_next'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [3]:

from gymnasium.wrappers import FlattenObservation


def paper_reward_function(BG_last_hour):
    G = BG_last_hour[-1]
    if G >= 70 and G <= 180:
        return 0.5
    if G > 180 and G <= 200:
        return -0.9
    if G > 200 and G <= 250:
        return -1.2
    if G > 250 and G <= 350:
        return -1.5
    if G > 30 and G < 70:
        return -1.8
    else:
        return -2

gym.envs.register(
    id="simglucose-basal",
    entry_point="simglucose.envs:T1DSimEnvBolus",
    kwargs={
        "patient_name": ["adult#001"],
        "reward_fun": paper_reward_function,
        "history_length": 1,
        "enable_meal": True,
    },
)




In [4]:

def create_env():

    env = gym.make("simglucose-basal")

    env = FlattenObservation(env)

    return env



env = create_env()


print(env.action_space)
print(env.observation_space)

# random action
env.reset()
for _ in range(100):
    env.step(env.action_space.sample())  # take a random action


Discrete(7)
Box(0.0, 10000.0, (3,), float32)


### Train the network?

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from collections import deque

In [13]:
class QNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(QNetwork, self).__init__()
        # Define your neural network architecture
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, x):
        return self.fc(x)

In [88]:
class DQNAgent:
    def __init__(self, input_size, output_size, gamma=0.99, learning_rate=0.001, buffer_size=10000, batch_size=32):
        self.gamma = gamma
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Q-networks
        self.q_network = QNetwork(input_size, output_size).to(self.device)
        self.target_q_network = QNetwork(input_size, output_size).to(self.device)
        self.target_q_network.load_state_dict(self.q_network.state_dict())

        # Optimizer
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Experience replay buffer
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.replay_buffer = deque(maxlen=buffer_size)

    def select_action(self, state):
        alpha_index = env.action_space.sample()
        return alpha_index

    def store_transition(self, state, action, reward, next_state, done):
        transition = (state, action, reward, next_state, done)
        self.replay_buffer.append(transition)

    def sample_batch(self):
        batch = random.sample(self.replay_buffer, min(len(self.replay_buffer), self.batch_size))
        states, actions, rewards, next_states, dones = zip(*batch)
        
        # Convert to tensors and handle dimensions
        states = torch.stack(states)
        actions = torch.LongTensor(actions).unsqueeze(1)  
        rewards = torch.FloatTensor(rewards).unsqueeze(1)  
        next_states = torch.stack(next_states)
        dones = torch.FloatTensor(dones).unsqueeze(1) 

        return states, actions, rewards, next_states, dones

    def update_q_network(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.sample_batch()

        # Compute Q-values
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Compute target Q-values using the target network
        target_q_values = rewards + (1 - dones) * self.gamma * self.target_q_network(next_states).max(1)[0].detach()

        # Compute the Huber loss
        loss = F.smooth_l1_loss(q_values, target_q_values.unsqueeze(1))

        # Update the Q-network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_q_network(self):
        # Update the target network by copying the Q-network parameters
        self.target_q_network.load_state_dict(self.q_network.state_dict())

    def train_step(self, state, action, reward, next_state, done):
        # Store the transition in the replay buffer
        self.store_transition(state, action, reward, next_state, done)

        # Update the Q-network
        self.update_q_network()

        # Update the target Q-network periodically
        if len(self.replay_buffer) % 100 == 0:
            self.update_target_q_network()

In [89]:
def paper_reward_function(BG_last_hour):
    G = BG_last_hour[-1]
    if G >= 70 and G <= 180:
        return 0.5
    if G > 180 and G <= 200:
        return -0.9
    if G > 200 and G <= 250:
        return -1.2
    if G > 250 and G <= 350:
        return -1.5
    if G > 30 and G < 70:
        return -1.8
    else:
        return -2

gym.envs.register(
    id="simglucose-basal",
    entry_point="simglucose.envs:T1DSimEnvBolus",
    kwargs={
        "patient_name": ["adult#001"],
        "reward_fun": paper_reward_function,
        "history_length": 1,
        "enable_meal": True,
    },
)

def create_env():

    env = gym.make("simglucose-basal")

    env = FlattenObservation(env)

    return env



env = create_env()

  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


In [90]:
# DDQ Agent setup
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
print('state_size', state_size)
print('action_size', action_size)
agent = DQNAgent(input_size=state_size, output_size=action_size)

state_size 3
action_size 7


In [98]:
num_episodes = 100

for episode in range(num_episodes):
    state = env.reset() 
    total_reward = 0 

    while True:
        action = agent.select_action(state)

        # Take the selected action in the environment
        next_state, reward, done, _, _ = env.step(action)    

        # Store the transition and perform a training step
        # PROBLEM WITH TORCH TENSOR CONVERSIONS
        agent.train_step(state_obs, action, reward, next_state, done)
        state = next_state

        # Accumulate the total reward
        total_reward += reward

        if done:
            break  # End the episode if the environment signals that it's done

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")




ValueError: expected sequence of length 3 at dim 1 (got 4)

In [97]:
state_obs

tensor([154.8131,   0.0000,   0.0000])

In [None]:
state[0]

tensor([145.4606,   0.0000,   0.0000])