In [3]:
import torch.nn as nn
import torch.nn.functional as F
import gym_pikachu_volleyball
import gym
import time
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import torch
import torch.optim as optim

In [4]:
class PolicyNetwork(nn.Module):
    """Convolutional actor that maps an image to a distribution over 18 actions.

    The trunk is three 3x3, stride-2 convolutions (3 -> 16 -> 32 -> 32 channels),
    each followed by ReLU and 2x2 max pooling. The pooled feature map is
    reshaped into rows of 1120 features, so the input's spatial size must make
    the trunk output flatten to a multiple of 1120 values.
    """

    def __init__(self):
        super().__init__()
        # Convolutional trunk.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=2, padding=1)
        # Actor head: hidden layer followed by one output unit per action.
        self.fc_actor = nn.Linear(in_features=1120, out_features=256)
        self.out_actor = nn.Linear(in_features=256, out_features=18)

    def forward(self, x):
        """Return action probabilities of shape (rows, 18); each row sums to 1."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = F.max_pool2d(F.relu(conv(features)), kernel_size=2)

        # view(-1, 1120) also turns a single feature map into one row.
        flat = features.view(-1, 1120)
        hidden = F.relu(self.fc_actor(flat))
        logits = self.out_actor(hidden)
        return F.softmax(logits, dim=1)

class ValueNetwork(nn.Module):
    """Convolutional critic that maps an image to a single scalar value.

    Uses three 3x3, stride-2 convolutions (3 -> 16 -> 32 -> 32 channels), each
    followed by ReLU and 2x2 max pooling, then a 1120 -> 256 -> 1 linear head.
    The trunk output is reshaped into rows of 1120 features.
    """

    def __init__(self):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, stride=2, padding=1)
        self.conv1 = nn.Conv2d(3, 16, **conv_kwargs)
        self.conv2 = nn.Conv2d(16, 32, **conv_kwargs)
        self.conv3 = nn.Conv2d(32, 32, **conv_kwargs)
        self.fc_critic = nn.Linear(1120, 256)
        self.out_critic = nn.Linear(256, 1)

    def forward(self, x):
        """Return the value estimate with shape (rows, 1); no output activation."""
        stage1 = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2)
        stage2 = F.max_pool2d(F.relu(self.conv2(stage1)), kernel_size=2)
        stage3 = F.max_pool2d(F.relu(self.conv3(stage2)), kernel_size=2)

        flat = stage3.view(-1, 1120)
        hidden = F.relu(self.fc_critic(flat))
        return self.out_critic(hidden)

class ActorCritic(nn.Module):
    """Actor and critic heads on top of one shared convolutional trunk.

    The trunk is three 3x3, stride-2 convolutions (3 -> 16 -> 32 -> 32 channels),
    each followed by ReLU and 2x2 max pooling. Its output is reshaped into rows
    of 1120 features and fed to two independent heads:

    * actor:  1120 -> 256 -> 18, softmax over the 18 outputs
    * critic: 1120 -> 256 -> 1, no output activation
    """

    def __init__(self):
        super().__init__()

        # Shared trunk that shrinks the input image.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=2, padding=1)

        # Hidden layers, one per head.
        self.fc_actor = nn.Linear(in_features=1120, out_features=256)
        self.fc_critic = nn.Linear(in_features=1120, out_features=256)

        # Output layers: 18 action probabilities and one scalar value.
        self.out_actor = nn.Linear(in_features=256, out_features=18)
        self.out_critic = nn.Linear(in_features=256, out_features=1)

    def forward(self, x):
        """Return ``(action_probs, value)`` with shapes (rows, 18) and (rows, 1)."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = F.max_pool2d(F.relu(conv(features)), kernel_size=2)

        # Both heads read the same flattened trunk output.
        flat = features.view(-1, 1120)

        actor_hidden = F.relu(self.fc_actor(flat))
        critic_hidden = F.relu(self.fc_critic(flat))

        action_probs = F.softmax(self.out_actor(actor_hidden), dim=1)
        value = self.out_critic(critic_hidden)
        return action_probs, value

In [5]:
def convert_color(s):
    """Binarise a frame and convert it to a tensor via the module-level ``transform``.

    Every pixel with at least one non-zero channel is painted white, then the
    whole image is inverted, so originally black pixels become white (255) and
    everything else becomes black (0).

    The input array is left untouched: the work is done on a copy, so the
    caller's (environment's) observation buffer is not overwritten.

    Args:
        s: image array indexed as [row, column, channel] with three channels.
            Presumably uint8 RGB as produced by the environment — TODO confirm.

    Returns:
        Whatever ``transform`` returns for the resulting PIL image.
    """
    frame = np.array(s, copy=True)

    # A pixel is "non-black" when any of its three channels differs from 0.
    nonblack = (frame != [0, 0, 0]).any(axis=2)
    frame[nonblack] = [255, 255, 255]

    inverted = 255 - frame
    return transform(Image.fromarray(inverted))

In [6]:
class Agent:
    """Actor-critic agent trained once per episode on discounted Monte Carlo returns.

    The agent owns a separate policy network and value network that share one
    Adam optimizer. During an episode the caller alternates ``select_action``
    and ``record``; ``optimize`` then performs one gradient step over the
    recorded trajectory and ``initial`` clears the buffers.

    Attributes:
        device: CUDA when available, otherwise CPU.
        LR: learning rate passed to Adam.
        GAMMA: discount factor used for the returns.
        log_probs: log-probability of each recorded action (keeps the graph).
        values: value-network output for each recorded state (keeps the graph).
        rewards: reward for each recorded step.
    """

    def __init__(self, LR=1e-3, GAMMA=0.99):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.LR = LR
        self.GAMMA = GAMMA
        self.policy_net = PolicyNetwork().to(self.device)
        self.value_net = ValueNetwork().to(self.device)
        self.optimizer = optim.Adam(
            list(self.policy_net.parameters()) + list(self.value_net.parameters()), lr=LR
        )
        self.log_probs = []
        self.values = []
        self.rewards = []

    def select_action(self, state):
        """Sample an action index from the policy's distribution for ``state``.

        Args:
            state: tensor accepted by the policy network (a single observation).

        Returns:
            The sampled action index, drawn with ``np.random.choice``.
        """
        # Sampling needs no gradients; ``record`` recomputes the log-probability.
        with torch.no_grad():
            probs = self.policy_net(state.to(self.device))[0]

        # float32 softmax output can miss summing to 1 by more than
        # np.random.choice tolerates, so renormalise in float64 first.
        probs = probs.cpu().numpy().astype(np.float64)
        probs /= probs.sum()
        return np.random.choice(len(probs), p=probs)

    def record(self, state, reward, action):
        """Store the log-probability, reward and value estimate for one step.

        Args:
            state: the observation the action was chosen from.
            reward: reward received for the step.
            action: index of the action that was taken.
        """
        state = state.to(self.device)
        log_prob = torch.log(self.policy_net(state)[0][action])
        self.log_probs.append(log_prob)
        self.rewards.append(reward)
        self.values.append(self.value_net(state))

    def optimize(self):
        """Run one actor-critic update over the recorded trajectory.

        Does nothing when no step has been recorded. The buffers are not
        cleared here; call ``initial`` before recording the next episode.
        """
        if not self.rewards:
            return

        # Discounted return for every step, accumulated from the last step back.
        discounted_rewards = []
        cumulative_reward = 0.0
        for reward in reversed(self.rewards):
            cumulative_reward = reward + self.GAMMA * cumulative_reward
            discounted_rewards.append(cumulative_reward)
        discounted_rewards.reverse()

        # Flatten to 1-D so values, returns and log-probs align element-wise.
        # Subtracting a (T, 1) tensor from a (T,) tensor would broadcast to (T, T).
        values = torch.cat([value.reshape(-1) for value in self.values])
        log_probs = torch.stack(self.log_probs).reshape(-1)
        returns = torch.tensor(discounted_rewards, dtype=values.dtype, device=values.device)

        # The advantage only weights the policy gradient; detaching keeps the
        # policy loss from back-propagating into the value network.
        advantage = (returns - values).detach()

        value_loss = nn.MSELoss()(values, returns)
        policy_loss = -(log_probs * advantage).mean()

        self.optimizer.zero_grad()
        # The two losses touch disjoint parameters, so one backward pass suffices.
        (value_loss + policy_loss).backward()
        self.optimizer.step()

    def initial(self):
        """Clear the per-episode trajectory buffers."""
        self.log_probs = []
        self.values = []
        self.rewards = []


In [None]:
# Image -> tensor conversion used by convert_color.
transform = transforms.ToTensor()
env = gym.make("PikachuVolleyball-v0", render_mode=None)
option = {'is_player1_serve': True, 'is_player2_serve': True}

a2c_agent_1 = Agent()
a2c_agent_2 = Agent()
num_episodes = 2000
scores = []  # total reward of every finished episode
for i in range(num_episodes):
    state = convert_color(env.reset(options=option))

    # Start each episode with empty trajectory buffers.
    a2c_agent_1.initial()
    a2c_agent_2.initial()

    done = False
    episode_reward = 0

    while not done:
        # Both agents act on the same preprocessed frame.
        state_tensor = state.float().to(a2c_agent_1.device)
        action_1 = a2c_agent_1.select_action(state_tensor)
        action_2 = a2c_agent_2.select_action(state_tensor)

        next_state, reward, done, _ = env.step([action_1, action_2])

        # NOTE(review): the environment's reward is discarded and replaced by
        # +1 per step, so both agents are rewarded for episode length —
        # confirm this is intended.
        reward = 1
        episode_reward += reward

        a2c_agent_1.record(state_tensor, reward, action_1)
        a2c_agent_2.record(state_tensor, reward, action_2)

        state = convert_color(next_state)

    # The loop only exits when the episode is done: update both agents once.
    scores.append(episode_reward)
    a2c_agent_1.optimize()
    a2c_agent_2.optimize()

    print(f"Episode {i}, Reward: {episode_reward}")


Episode 0, Reward: 38
Episode 1, Reward: 39
Episode 2, Reward: 38
Episode 3, Reward: 38
Episode 4, Reward: 22
Episode 5, Reward: 58
Episode 6, Reward: 38
Episode 7, Reward: 19
Episode 8, Reward: 111
Episode 9, Reward: 32
Episode 10, Reward: 22
Episode 11, Reward: 32
Episode 12, Reward: 19
Episode 13, Reward: 38
Episode 14, Reward: 57
Episode 15, Reward: 19
Episode 16, Reward: 39
Episode 17, Reward: 19
Episode 18, Reward: 109
Episode 19, Reward: 39
Episode 20, Reward: 61
Episode 21, Reward: 32
Episode 22, Reward: 58
Episode 23, Reward: 61
Episode 24, Reward: 58
Episode 25, Reward: 22
Episode 26, Reward: 32
