In [4]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

# Intrinsic Curiosity Module (ICM)
class StateEncoder(nn.Module):
    def __init__(self, state_dim, encoding_dim):
        super(StateEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, encoding_dim),
        )

    def forward(self, state):
        return self.encoder(state)

class InverseModel(nn.Module):
    def __init__(self, encoding_dim, action_dim):
        super(InverseModel, self).__init__()
        self.inverse_model = nn.Sequential(
            nn.Linear(encoding_dim * 2, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        )

    def forward(self, state_encoding, next_state_encoding):
        input_tensor = torch.cat([state_encoding, next_state_encoding], dim=1)
        return self.inverse_model(input_tensor)

class ForwardModel(nn.Module):
    def __init__(self, encoding_dim, action_dim, stack_size=4):
        super(ForwardModel, self).__init__()
        self.stack_size = stack_size
        self.forward_model = nn.Sequential(
            nn.Linear(encoding_dim * stack_size + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, encoding_dim),
        )

    def forward(self, state_encoding_stack, action):
        input_tensor = torch.cat([state_encoding_stack.view(-1, self.stack_size * state_encoding_stack.shape[-1]), action], dim=1)
        return self.forward_model(input_tensor)

class ICM(nn.Module):
    def __init__(self, state_dim, action_dim, encoding_dim, stack_size=4):
        super(ICM, self).__init__()
        self.state_encoder = StateEncoder(state_dim, encoding_dim)
        self.inverse_model = InverseModel(encoding_dim, action_dim)
        self.forward_model = ForwardModel(encoding_dim, action_dim, stack_size)

    def forward(self, state, next_state, action):
        state_encoding = self.state_encoder(state)
        next_state_encoding = self.state_encoder(next_state)

        inverse_loss = nn.MSELoss()(self.inverse_model(state_encoding, next_state_encoding), action)

        state_encoding_stack = torch.cat([state_encoding.unsqueeze(1)] * self.forward_model.stack_size, dim=1)
        forward_loss = nn.MSELoss()(self.forward_model(state_encoding_stack, action), next_state_encoding)

        return inverse_loss, forward_loss

# Deep Q-Network (DQN)
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Replay Buffer
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer = deque(maxlen=buffer_size)

    def append(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done

# Train Agent
def train_agent(env, episodes, batch_size, buffer_size, gamma, epsilon, epsilon_decay):
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    encoding_dim = 64
    stack_size = 4

    dqn = DQN(state_dim, action_dim).to(device)
    icm = ICM(state_dim, action_dim, encoding_dim, stack_size).to(device)
    dqn_optimizer = optim.Adam(dqn.parameters())
    icm_optimizer = optim.Adam(icm.parameters())
    replay_buffer = ReplayBuffer(buffer_size)

    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        done = False

        while not done:
            # Choose action
            if np.random.rand() < epsilon:
                action = env.action_space.sample()
            else:
                state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                q_values = dqn(state_tensor)
                action = torch.argmax(q_values).item()

            # Take action and observe next state
            next_state, reward, done, _ = env.step(action)

            # Compute intrinsic reward
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            next_state_tensor = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0)
            action_tensor = torch.tensor([action], dtype=torch.float32, device=device)

            inverse_loss, forward_loss = icm(state_tensor, next_state_tensor, action_tensor)
            intrinsic_reward = forward_loss.item()

            # Add experience to replay buffer
            replay_buffer.append(state, action, reward + intrinsic_reward, next_state, done)

            # Sample from replay buffer and update networks
            if len(replay_buffer.buffer) >= batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

                states_tensor = torch.tensor(states, dtype=torch.float32, device=device)
                actions_tensor = torch.tensor(actions, dtype=torch.long, device=device).unsqueeze(1)
                rewards_tensor = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
                next_states_tensor = torch.tensor(next_states, dtype=torch.float32, device=device)
                dones_tensor = torch.tensor(dones, dtype=torch.float32, device=device).unsqueeze(1)

                # Update DQN
                q_values = dqn(states_tensor).gather(1, actions_tensor)
                next_q_values = dqn(next_states_tensor).max(1)[0].detach()
                expected_q_values = rewards_tensor + gamma * next_q_values * (1 - dones_tensor)
                dqn_loss = nn.MSELoss()(q_values, expected_q_values)

                dqn_optimizer.zero_grad()
                dqn_loss.backward()
                dqn_optimizer.step()

                # Update ICM
                icm_optimizer.zero_grad()
                inverse_loss, forward_loss = icm(states_tensor, next_states_tensor, actions_tensor)
                icm_loss = inverse_loss + forward_loss
                icm_loss.backward()
                icm_optimizer.step()

            state = next_state
            episode_reward += reward

        epsilon *= epsilon_decay
        print(f"Episode {episode + 1}, Reward: {episode_reward}, Epsilon: {epsilon}")

    env.close()

if __name__ == "__main__":
    env = gym.make("Hopper-v2")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    episodes = 1000
    batch_size = 64
    buffer_size = 100000
    gamma = 0.99
    epsilon = 1.0
    epsilon_decay = 0.995

    train_agent(env, episodes, batch_size, buffer_size, gamma, epsilon, epsilon_decay)


You appear to be missing MuJoCo.  We expected to find the file here: C:\Users\Vimarsh\.mujoco\mujoco210

This package only provides python bindings, the library must be installed separately.

Please follow the instructions on the README to install MuJoCo

    https://github.com/openai/mujoco-py#install-mujoco

Which can be downloaded from the website

    https://www.roboti.us/index.html



Exception: 
You appear to be missing MuJoCo.  We expected to find the file here: C:\Users\Vimarsh\.mujoco\mujoco210

This package only provides python bindings, the library must be installed separately.

Please follow the instructions on the README to install MuJoCo

    https://github.com/openai/mujoco-py#install-mujoco

Which can be downloaded from the website

    https://www.roboti.us/index.html


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 1. State Encoder
class StateEncoder(nn.Module):
    def __init__(self, state_dim, encoding_dim):
        super(StateEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, encoding_dim),
        )

    def forward(self, state):
        return self.encoder(state)

# 2. Inverse Model
class InverseModel(nn.Module):
    def __init__(self, encoding_dim, action_dim):
        super(InverseModel, self).__init__()
        self.inverse_model = nn.Sequential(
            nn.Linear(encoding_dim * 2, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        )

    def forward(self, state_encoding, next_state_encoding):
        input_tensor = torch.cat([state_encoding, next_state_encoding], dim=1)
        return self.inverse_model(input_tensor)

# 3. Forward Model
class ForwardModel(nn.Module):
    def __init__(self, encoding_dim, action_dim, stack_size=4):
        super(ForwardModel, self).__init__()
        self.stack_size = stack_size
        self.forward_model = nn.Sequential(
            nn.Linear(encoding_dim * stack_size + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, encoding_dim),
        )

    def forward(self, state_encoding_stack, action):
        input_tensor = torch.cat([state_encoding_stack.view(-1, self.stack_size * state_encoding_stack.shape[-1]), action], dim=1)
        return self.forward_model(input_tensor)

# 4. Intrinsic Curiosity Module (ICM)
class ICM(nn.Module):
    def __init__(self, state_dim, action_dim, encoding_dim, stack_size=4):
        super(ICM, self).__init__()
        self.state_encoder = StateEncoder(state_dim, encoding_dim)
        self.inverse_model = InverseModel(encoding_dim, action_dim)
        self.forward_model = ForwardModel(encoding_dim, action_dim, stack_size)

    def forward(self, state, next_state, action):
        state_encoding = self.state_encoder(state)
        next_state_encoding = self.state_encoder(next_state)

        inverse_loss = nn.MSELoss()(self.inverse_model(state_encoding, next_state_encoding), action)

        state_encoding_stack = torch.cat([state_encoding.unsqueeze(1)] * self.forward_model.stack_size, dim=1)
        forward_loss = nn.MSELoss()(self.forward_model(state_encoding_stack, action), next_state_encoding)

        return inverse_loss, forward_loss

# Example usage
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

state_dim = ... # Dimension of the state space
action_dim = ... # Dimension of the action space
encoding_dim = 64 # Encoding dimension for state representations
stack_size = 4 # Number of previous frames to stack

# Load your data
states, next_states, actions = ... # Load your data

dataset = TensorDataset(states, next_states, actions)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

icm = ICM(state_dim, action_dim, encoding_dim, stack_size).to(device)
optimizer = optim.Adam(icm.parameters(), lr=0.001)

for epoch in range(num_epochs):
    for states, next_states, actions in dataloader:
        states = states.to(device)
        next_states = next_states.to(device)
        actions = actions.to(device)

        optimizer.zero_grad()

        inverse_loss, forward_loss = icm(states, next_states, actions)
        loss = inverse_loss + forward_loss

        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}, Inverse Loss: {inverse_loss.item()}, Forward Loss: {forward_loss.item()}")