In [2]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class DelaySampleToMatchEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    
    def __init__(self, n_stimuli=3, delay_length=1):
        super(DelaySampleToMatchEnv, self).__init__()
        
        # Number of different stimuli (excluding the delay)
        self.n_stimuli = n_stimuli
        
        # Length of delay between stimuli
        self.delay_length = delay_length
        
        # Action space and observation space
        # Stimuli are represented as integers from 0 to n_stimuli (0 is the delay)
        self.action_space = spaces.Discrete(n_stimuli + 1)
        self.observation_space = spaces.Discrete(n_stimuli + 1)
        
        # Initialize the sequence and current step
        self.sequence = []
        self.current_step = 0
        
    def _generate_sequence(self):
        # Randomly select n stimuli (1 to n_stimuli, excluding 0 which is the delay)
        stimuli = np.random.choice(range(1, self.n_stimuli + 1), self.n_stimuli, replace=False)
        # Create the sequence: stimulus1-delay-stimulus2-delay-...-stimulusN-stimulus1
        sequence = []
        for stimulus in stimuli:
            sequence.append(stimulus)
            sequence.extend([0] * self.delay_length)  # 0 represents the delay
        sequence.append(stimuli[0])
        return sequence
        
    def reset(self):
        # Generate a new sequence for the episode
        self.sequence = self._generate_sequence()
        self.current_step = 0
        # Return the first stimulus
        return self.sequence[self.current_step]
    
    def step(self, action):
        self.current_step += 1
        
        # Check if the episode is done
        done = self.current_step >= len(self.sequence)
        
        # Get the next observation
        if not done:
            observation = self.sequence[self.current_step]
        else:
            observation = None
        
        # In this environment, we don't have a reward structure as it's a sequence task
        reward = 0
        
        return observation, reward, done, {}
    
    def render(self, mode='human'):
        if self.current_step < len(self.sequence):
            print(f"Step {self.current_step}: Stimulus {self.sequence[self.current_step]}")
        else:
            print("Episode finished.")

# Example usage
env = DelaySampleToMatchEnv(n_stimuli=3, delay_length=1)
observation = env.reset()
done = False

while not done:
    print("Current Observation:", observation)
    action = env.action_space.sample()  # Random action (not used in this environment)
    observation, reward, done, info = env.step(action)
    env.render()

env.close()


Current Observation: 1
Step 1: Stimulus 0
Current Observation: 0
Step 2: Stimulus 3
Current Observation: 3
Step 3: Stimulus 0
Current Observation: 0
Step 4: Stimulus 2
Current Observation: 2
Step 5: Stimulus 0
Current Observation: 0
Step 6: Stimulus 1
Current Observation: 1
Episode finished.


In [3]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class DelaySampleToMatchEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    
    def __init__(self, n_stimuli=3, delay_length=1):
        super(DelaySampleToMatchEnv, self).__init__()
        
        # Number of different stimuli (excluding the delay)
        self.n_stimuli = n_stimuli
        
        # Length of delay between stimuli
        self.delay_length = delay_length
        
        # Action space and observation space
        # Stimuli are represented as integers from 0 to n_stimuli (0 is the delay)
        self.action_space = spaces.Discrete(n_stimuli + 1)
        self.observation_space = spaces.Discrete(n_stimuli + 1)
        
        # Initialize the sequence and current step
        self.sequence = []
        self.current_step = 0
        self.first_stimulus = None
        
    def _generate_sequence(self):
        # Randomly select n stimuli (1 to n_stimuli, excluding 0 which is the delay)
        stimuli = np.random.choice(range(1, self.n_stimuli + 1), self.n_stimuli, replace=False)
        self.first_stimulus = stimuli[0]
        # Create the sequence: stimulus1-delay-stimulus2-delay-...-stimulusN-stimulus1
        sequence = []
        for stimulus in stimuli:
            sequence.append(stimulus)
            sequence.extend([0] * self.delay_length)  # 0 represents the delay
        sequence.append(stimuli[0])
        return sequence
        
    def reset(self):
        # Generate a new sequence for the episode
        self.sequence = self._generate_sequence()
        self.current_step = 0
        # Return the first stimulus
        return self.sequence[self.current_step]
    
    def step(self, action):
        reward = 0
        done = False
        
        # If the action is non-zero and the current stimulus is a delay, give a reward of -1
        if action != 0 and self.sequence[self.current_step] == 0:
            reward = -1
        
        # If the current step is the last one, check if the action matches the first stimulus
        if self.current_step == len(self.sequence) - 1:
            done = True
            if action == self.first_stimulus:
                reward = 1
            else:
                reward = -1
        
        # Advance to the next step
        self.current_step += 1
        
        # Check if the episode is done
        done = self.current_step >= len(self.sequence)
        
        # Get the next observation
        if not done:
            observation = self.sequence[self.current_step]
        else:
            observation = None
        
        return observation, reward, done, {}
    
    def render(self, mode='human'):
        if self.current_step < len(self.sequence):
            print(f"Step {self.current_step}: Stimulus {self.sequence[self.current_step]}")
        else:
            print("Episode finished.")

# Example usage
env = DelaySampleToMatchEnv(n_stimuli=3, delay_length=1)
observation = env.reset()
done = False

while not done:
    print("Current Observation:", observation)
    action = env.action_space.sample()  # Random action (for demonstration purposes)
    observation, reward, done, info = env.step(action)
    env.render()
    print(f"Action: {action}, Reward: {reward}")

env.close()


Current Observation: 1
Step 1: Stimulus 0
Action: 3, Reward: 0
Current Observation: 0
Step 2: Stimulus 2
Action: 3, Reward: -1
Current Observation: 2
Step 3: Stimulus 0
Action: 3, Reward: 0
Current Observation: 0
Step 4: Stimulus 3
Action: 2, Reward: -1
Current Observation: 3
Step 5: Stimulus 0
Action: 0, Reward: 0
Current Observation: 0
Step 6: Stimulus 1
Action: 1, Reward: -1
Current Observation: 1
Episode finished.
Action: 1, Reward: 1


In [5]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt

# RNN for approximating Q-values
class RNNQNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(RNNQNetwork, self).__init__()
        self.hidden_dim = hidden_dim
        
        self.i2h = nn.Linear(input_dim, hidden_dim)
        self.h2h = nn.Linear(hidden_dim, hidden_dim)
        self.h2o = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden):
        hidden = torch.tanh(self.i2h(x) + self.h2h(hidden))
        output = self.h2o(hidden)
        return output, hidden

    def init_hidden(self, batch_size):
        return torch.zeros(batch_size, self.hidden_dim)


# Deep Q-Learning Agent with RNN
class DQNAgent:
    def __init__(self, env, hidden_dim=64, learning_rate=0.001, discount_factor=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, buffer_size=10000, batch_size=64):
        self.env = env
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.hidden_dim = hidden_dim

        self.memory = deque(maxlen=buffer_size)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_network = RNNQNetwork(env.observation_space.n, hidden_dim, env.action_space.n).to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.criterion = nn.MSELoss()

    def choose_action(self, state, hidden):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values, hidden = self.q_network(state_tensor, hidden)
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample(), hidden  # Explore
        else:
            return torch.argmax(q_values).item(), hidden  # Exploit

    def store_transition(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def learn(self):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Convert lists to numpy arrays and concatenate
        states = np.array(states, dtype=np.float32)
        actions = np.array(actions, dtype=np.int64)
        rewards = np.array(rewards, dtype=np.float32)
        next_states = np.array(next_states, dtype=np.float32)
        dones = np.array(dones, dtype=np.float32)

        # Convert concatenated numpy arrays to PyTorch tensors
        states_tensor = torch.tensor(states).to(self.device)
        actions_tensor = torch.tensor(actions).unsqueeze(-1).to(self.device)
        rewards_tensor = torch.tensor(rewards).unsqueeze(-1).to(self.device)
        next_states_tensor = torch.tensor(next_states).to(self.device)
        dones_tensor = torch.tensor(dones).unsqueeze(-1).to(self.device)

        # Initialize the hidden state for the first batch item
        hidden = self.q_network.init_hidden(self.batch_size).to(self.device)

        current_q_values, _ = self.q_network(states_tensor, hidden)
        current_q_values = current_q_values.gather(1, actions_tensor)
        next_q_values, _ = self.q_network(next_states_tensor, hidden)
        next_q_values = next_q_values.max(1)[0].unsqueeze(-1)
        target_q_values = rewards_tensor + self.discount_factor * next_q_values * (1 - dones_tensor)

        loss = self.criterion(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)



# Function to one-hot encode state
def one_hot_encode(state, state_space):
    one_hot = np.zeros(state_space)
    if isinstance(state, tuple):
        state = state[0]  # Extract the state from the tuple if reset() returns a tuple
    one_hot[int(state)] = 1  # Ensure state is converted to integer
    return one_hot

# Create the environment
#env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False)

# Instantiate the agent
agent = DQNAgent(env)

n_episodes = 2500
win_pct_list = []
scores = []

# Training loop
for i in range(n_episodes):
    state = agent.env.reset()  # Reset the environment
    state = one_hot_encode(state, env.observation_space.n)
    done = False
    score = 0
    while not done:
        hidden = agent.q_network.init_hidden(1).to(agent.device)
        action, hidden = agent.choose_action(state, hidden)  # Choose action based on epsilon-greedy policy
        next_state, reward, done, info = agent.env.step(action)  # Take the action
        next_state = one_hot_encode(next_state, env.observation_space.n)
        agent.store_transition(state, action, reward, next_state, done)
        agent.learn()  # Update Q-network
        state = next_state  # Move to the next state
        score += reward
    scores.append(score)
    if i % 100 == 0:
        avg_score = np.mean(scores[-100:])
        win_pct_list.append(avg_score)
        print('episode', i, 'win pct %.2f' % avg_score, 'epsilon %.2f' % agent.epsilon)

# Plotting the win percentage over episodes
plt.plot(win_pct_list)
plt.xlabel('Episodes (x100)')
plt.ylabel('Win Percentage')
plt.title('Win Percentage over Time')
plt.show()


TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'