In [3]:
import gym
from gym import spaces
import numpy as np
import torch
from torchvision import datasets, transforms

class CIFAR10Env(gym.Env):
    def __init__(self, subset='train'):
        super(CIFAR10Env, self).__init__()

        # Load CIFAR-10 dataset
        self.transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
        self.dataset = datasets.CIFAR10(root='./data', train=(subset == 'train'), download=True, transform=self.transform)
        self.loader = torch.utils.data.DataLoader(self.dataset, batch_size=1, shuffle=True, num_workers=2)

        # Define action and observation spaces
        self.action_space = spaces.Discrete(10)  # 10 classes in CIFAR-10
        self.observation_space = spaces.Box(low=0, high=1, shape=(3, 32, 32), dtype=np.float32)

        # Initialize state
        self.current_index = 0

    def reset(self):
        self.current_index = 0
        return self._get_observation()

    def step(self, action):
        # Take action and return next state, reward, done, and info
        obs = self._get_observation()
        reward = self._calculate_reward(action)
        done = self.current_index == len(self.dataset) - 1
        info = {}

        self.current_index += 1

        return obs, reward, done, info

    def _get_observation(self):
        # Get the current observation (image)
        image, _ = next(iter(self.loader))
        return image.squeeze(0).numpy()

    def _calculate_reward(self, action):
        # Placeholder reward function (you may want to customize this based on your task)
        true_label = self.dataset[self.current_index][1]
        temp_reward = 1.0 if action == true_label else 0.0
        
        return float(temp_reward)

# Example of using the CIFAR10Env
env = CIFAR10Env(subset='train')

# Reset the environment
obs = env.reset()

# Sample random actions for 10 steps
for _ in range(10):
    state=env._get_observation()
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    print(f"Action: {action}, Reward: {reward}, Done: {done}")


Files already downloaded and verified
Action: 5, Reward: 0.0, Done: False
Action: 4, Reward: 0.0, Done: False
Action: 6, Reward: 0.0, Done: False
Action: 2, Reward: 0.0, Done: False
Action: 0, Reward: 0.0, Done: False
Action: 7, Reward: 0.0, Done: False


KeyboardInterrupt: 

In [5]:
env.observation_space.shape[0]

3

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import ptan
import numpy as np

class DQN(nn.Module):
    def __init__(self, input_channels, n_actions):
        super(DQN, self).__init__()

        self.conv_layers = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        # Calculate the size of the output after convolutional layers
        self.fc_input_size = self._calculate_conv_output_size(input_channels, 16, 2) * \
                             self._calculate_conv_output_size(16, 32, 2) * \
                             self._calculate_conv_output_size(32, 64, 2) * \
                             self._calculate_conv_output_size(64, 128, 2)

        self.fc_layers = nn.Sequential(
            # nn.Linear(2048, 2048),  # Update input size based on flattened output size
            # nn.ReLU(),
            nn.Linear(2048, 4096),
            nn.ReLU(),
            nn.Linear(4096, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )

    def forward(self, x):
        
        x = self.conv_layers(x)
        
        # Print the shape of the output after convolutional layers
        #print("Conv Output Shape:", x.shape)
        
        x = x.view(x.size(0), -1)  # Flatten the output for the fully connected layers
        
        # Print the shape of the flattened output
        #print("Flattened Output Shape:", x.shape)
        
        x = self.fc_layers(x)
        
        # Print the shape of the output after fully connected layers
        #print("FC Output Shape:", x.shape)
        
        return x

    def _calculate_conv_output_size(self, in_channels, out_channels, stride):
        # Function to calculate the size of the output after a convolutional layer
        dummy_input = torch.zeros(1, in_channels, 32, 32)
        dummy_output = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)(dummy_input)
        return dummy_output.size(2)








In [7]:
def unpack_batch(batch):
    states, actions, rewards, dones, next_states = [], [], [], [], []
    for experience in batch:
        states.append(experience.state)
        actions.append(experience.action)
        rewards.append(experience.reward)
        dones.append(experience[-1])  # Accessing done flag directly
        next_states.append(experience.last_state)

    states_v = torch.tensor(states, dtype=torch.float32)
    actions_v = torch.tensor(actions)
    rewards_v = torch.tensor(rewards, dtype=torch.float32)
    
    # Convert 'dones' to a NumPy array before creating the PyTorch tensor
    dones_mask = torch.tensor(np.array(dones), dtype=torch.bool)
    
    next_states_v = torch.tensor(next_states, dtype=torch.float32)

    return states_v, actions_v, rewards_v, dones_mask, next_states_v


In [21]:
net = DQN(env.observation_space.shape[0], env.action_space.n)
net

DQN(
  (conv_layers): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=2048, out_features=4096, bias=True)
    (1): ReLU()
    (2): Linear(in_features=4096, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=10, bias=True)
  )
)

In [22]:

# Create CartPole environment
env = CIFAR10Env(subset="train")

# Set random seeds for reproducibility
#env.seed(42)
torch.manual_seed(42)
np.random.seed(42)

# Neural network and optimizer
net = DQN(env.observation_space.shape[0], env.action_space.n)
target_net = ptan.agent.TargetNet(net)
selector = ptan.actions.ArgmaxActionSelector()
epsilon_greedy = ptan.actions.EpsilonGreedyActionSelector(epsilon=0.1, selector=selector)
agent = ptan.agent.DQNAgent(net, epsilon_greedy, preprocessor=ptan.agent.float32_preprocessor)

# Experience source
exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=0.99, steps_count=1)

# Experience buffer
buffer = ptan.experience.ExperienceReplayBuffer(exp_source, buffer_size=1000)

# Loss function and optimizer
loss_func = nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=1e-3)


Files already downloaded and verified


In [10]:
exp_source.agent

<ptan.agent.DQNAgent at 0x1c04e586110>

In [40]:
#states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(s)
print(s)#states_v, actions_v, rewards_v, dones_mask, next_states_v)

None


In [61]:

if buffer.sample(1) is not None:
    sample=buffer.populate(1)
    # Check if the sample is None
    if sample is not None:
        states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(sample)

        # Other training steps...
    else:
        # Handle the case where the sample is None (e.g., buffer is not ready)
        print("Sample is None, buffer might not be ready.")

Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
Sample is None, buffer might not be ready.


In [42]:
sample
states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(sample)

TypeError: 'NoneType' object is not iterable

In [11]:
agent

<ptan.agent.DQNAgent at 0x1c04e586110>

In [3]:

best_reward = float('-inf')  # Initialize with negative infinity or other appropriate value
best_model_path = "best_model.pth"  # Define the path where you want to save the best model
checkpoint_interval = 5
current_reward = 0.0
# Training loop
for step in range(1000):  # You may want to adjust the number of steps
    buffer.populate(1)

    # Get batch from the buffer
    batch = buffer.sample(1)
    states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(batch)

    # Zero gradients
    optimizer.zero_grad()

    # Forward pass
    q_values = net(states_v)

    # Get Q-values for taken actions
        # Get target Q-values
    target_q_values = target_net.target_model(next_states_v).max(1)[0]

    # Use view(-1) to ensure dones_mask is 1-dimensional
    dones_mask = dones_mask.view(-1)

    # Ensure target_q_values is also 1-dimensional
    target_q_values = target_q_values.view(-1)
    print(target_q_values)
    # # Update Q-values based on dones_mask
    # target_q_values[dones_mask] = 0.0

    # Detach target_q_values
    target_q_values = target_q_values.detach()

    # Calculate TD error
    expected_q_values = rewards_v + target_q_values * 0.99
    loss = loss_func(q_values, expected_q_values)

    # Backward pass
    loss.backward()

    # Optimize
    optimizer.step()


    if step % 10 == 0:
        target_net.sync()
    print(f"\n----------------------------Loss {loss.item()}------------\n")
    if step % checkpoint_interval == 0:
            # Check the performance and save the model if it's the best so far
            if current_reward > best_reward:
                best_reward = current_reward
                torch.save(net.state_dict(), best_model_path)
# After training, you can use the trained model for inference.
# For example, you can run the trained agent in the environment:


NameError: name 'buffer' is not defined

In [19]:
state = env.reset()
while True:
    #env.render()
    state =env._get_observation() # or numpy image
    # Wrap the state in a batch-like structure
    state_batch = torch.tensor([state], dtype=torch.float32)
    action= agent(state_batch)
    print(action)
    obs, reward, done, info= env.step(action)  # Extract the action value from the tensor
    if done:
        break

env.close()


Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
(array([7], dtype=int64), [None])
Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
(array([3], dtype=int64), [None])
Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
(array([3], dtype=int64), [None])
Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
(array([3], dtype=int64), [None])
Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
(array([7], dtype=int64), [None])
Conv Output Shape: torch.Size([1, 128, 4, 4])
Flattened Output Shape: torch.Size([1, 2048])
FC Output Shape: torch.Size([1, 10])
(array([3], dtype=int64), [None])
Conv Output Shape: tor

KeyboardInterrupt: 

In [27]:
class DynamicPenaltyTracker(ptan.ignite.Trackable):
    def __init__(self, initial_penalty=0.1, update_interval=100, update_factor=0.9):
        self.penalty = initial_penalty
        self.update_interval = update_interval
        self.update_factor = update_factor
        self.steps = 0

    def frame(self, value):
        self.steps += 1
        if self.steps % self.update_interval == 0:
            self.penalty *= self.update_factor
        return self.penalty

AttributeError: module 'ptan' has no attribute 'ignite'

In [64]:
# Create a DynamicPenaltyTracker
penalty_tracker = DynamicPenaltyTracker(initial_penalty=0.1, update_interval=100, update_factor=0.9)

# Training loop
for step in range(1000):
    buffer.populate(1)

    # Get batch from the buffer
    batch = buffer.sample(1)
    states_v, actions_v, rewards_v, dones_mask, next_states_v = unpack_batch(batch)

    # Forward pass
    q_values = net(states_v)

    # Get target Q-values
    target_q_values = target_net.target_model(next_states_v).max(1)[0]
    print(target_q_values)
    target_q_values[dones_mask] = 0.0
    target_q_values = target_q_values.detach()

    # Calculate TD error
    expected_q_values = rewards_v + target_q_values * 0.99
    loss = loss_func(q_values, expected_q_values)

    # Get the current penalty factor from the tracker
    penalty_factor = penalty_tracker.frame(step)

    # Apply penalty to the loss
    loss += penalty_factor * (some_dynamic_value - some_target_value)

    # Zero gradients
    optimizer.zero_grad()

    # Backward pass
    loss.backward()

    # Optimize
    optimizer.step()


NameError: name 'DynamicPenaltyTracker' is not defined