In [None]:
### Install libraries ###

#!pip install git+https://github.com/HumanCompatibleAI/overcooked_ai.git

In [None]:
### Install libraries ###

#!pip install git+https://github.com/HumanCompatibleAI/overcooked_ai.git

In [4]:
### Imports ###

from overcooked_ai_py.mdp.overcooked_mdp import OvercookedGridworld
from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv
from overcooked_ai_py.agents.benchmarking import AgentEvaluator
from overcooked_ai_py.visualization.state_visualizer import StateVisualizer
from overcooked_ai_py.agents.agent import NNPolicy, AgentFromPolicy, AgentPair
import gym
import numpy as np
import torch
from PIL import Image
import os
from IPython.display import display, Image as IPImage

import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# from google.colab import drive

In [5]:
# System set up
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    # Import the notebook helpers under private aliases. Importing the
    # `IPython.display` module as `display` would rebind the `display`
    # function that the import cell already provides, making later
    # `display(...)` calls fail with "module is not callable".
    from IPython.display import clear_output as _clear_output
    from IPython.display import display as _show_figure

plt.ion()


def plot_soup(show_result=False):
    """Plot the number of soups made per episode, plus a 100-episode mean.

    Reads the module-level list ``n_soups`` (one entry per finished episode).

    Args:
        show_result: If False (default) the figure is cleared and redrawn with
            the title 'Training...', and the notebook output is cleared on the
            next draw so the plot updates in place. If True the figure is
            titled 'Result' and left in the output.
    """
    plt.figure(1)
    soups_t = torch.tensor(n_soups, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    # The plotted series is soups per episode, not an episode duration.
    plt.ylabel('Soups made')
    plt.plot(soups_t.numpy())
    # Take 100 episode averages and plot them too
    if len(soups_t) >= 100:
        means = soups_t.unfold(0, 100, 1).mean(1).view(-1)
        # Left-pad with zeros so the mean curve lines up with the episode axis.
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        _show_figure(plt.gcf())
        if not show_result:
            # wait=True defers clearing until the next output arrives.
            _clear_output(wait=True)

In [6]:
class DQN(nn.Module):
    """Fully connected Q-network: n_obs -> 128 -> 128 -> n_actions."""

    def __init__(self, n_obs, n_actions):
        super().__init__()
        hidden_units = 128
        # Layers are created in the same order and under the same attribute
        # names, so seeded initialisation and state_dict keys are unchanged.
        self.layer1 = nn.Linear(n_obs, hidden_units)
        self.layer2 = nn.Linear(hidden_units, hidden_units)
        self.layer3 = nn.Linear(hidden_units, n_actions)

    def forward(self, x):
        """Return the network output for ``x``: one value per action."""
        activations = x
        for hidden_layer in (self.layer1, self.layer2):
            activations = F.relu(hidden_layer(activations))
        # The output layer is linear (no activation).
        return self.layer3(activations)

In [21]:
class DQNAgent:
    """DQN learner with a replay memory and a soft-updated target network.

    ``epsilon`` is only read by :meth:`epsilon_greedy`; nothing in this class
    changes it. ``epsilon_decay`` and ``epsilon_min`` are stored as
    hyperparameters, so any annealing has to be applied by the caller.
    """

    def __init__(self, n_obs, n_actions):
        """Build the policy/target networks and the optimizer.

        Args:
            n_obs: Length of one observation vector (network input size).
            n_actions: Number of discrete actions (network output size).
        """
        self.batch_size = 128
        self.memory = deque([], maxlen=100000)  # oldest transitions are dropped first
        self.epsilon = 0.95
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.learning_rate = 0.0001
        self.tau = 0.005  # soft-update mixing factor for the target network
        self.gamma = 0.99  # discount factor
        self.n_obs = n_obs
        self.n_actions = n_actions
        self.device = "cpu"
        self.policy = DQN(n_obs, n_actions).to(self.device)
        self.target = DQN(n_obs, n_actions).to(self.device)
        # The target network starts as an exact copy of the policy network.
        self.target.load_state_dict(self.policy.state_dict())
        self.target.eval()
        self.optimizer = optim.AdamW(self.policy.parameters(), lr=self.learning_rate)

    def memorize(self, obs, action, reward, next_obs, done):
        """Store one transition in the replay memory as tensors.

        ``done`` is stored as float32 so it can be used arithmetically as a
        terminal mask in :meth:`replay`.
        """
        obs = torch.tensor(obs, device=self.device, dtype=torch.float32)
        action = torch.tensor(action, device=self.device, dtype=torch.long)
        reward = torch.tensor(reward, device=self.device, dtype=torch.float32)
        next_obs = torch.tensor(next_obs, device=self.device, dtype=torch.float32)
        done = torch.tensor(done, device=self.device, dtype=torch.float32)
        self.memory.append((obs, action, reward, next_obs, done))

    def epsilon_greedy(self, obs):
        """Return an action index: random with probability ``epsilon``, else greedy.

        Args:
            obs: A single observation convertible to a float32 tensor.

        Returns:
            int: Index of the chosen action in ``range(n_actions)``.
        """
        if random.random() < self.epsilon:
            return random.randrange(self.n_actions)

        obs = torch.tensor(obs, device=self.device, dtype=torch.float32)
        # Action selection is pure inference; skip building an autograd graph.
        with torch.no_grad():
            q = self.policy(obs)
        return torch.argmax(q).item()

    def replay(self):
        """Run one optimisation step on a random minibatch from memory.

        Does nothing until at least ``batch_size`` transitions are stored.
        After the gradient step the target network is moved towards the policy
        network by a factor ``tau``.
        """
        if len(self.memory) < self.batch_size:
            return

        transitions = random.sample(self.memory, self.batch_size)
        obs, action, reward, next_obs, done = (
            torch.stack(field) for field in zip(*transitions)
        )

        # Q(s, a) for the actions that were actually taken.
        predicted_Q = self.policy(obs).gather(1, action.unsqueeze(1))

        # Bootstrapped target; no gradient flows through the target network.
        with torch.no_grad():
            next_state_values = self.target(next_obs).max(1)[0]
            # (1 - done) zeroes the bootstrap term for terminal transitions.
            target_Q = (next_state_values * self.gamma) * (1 - done) + reward

        # Huber loss (same defaults as nn.SmoothL1Loss()).
        loss = F.smooth_l1_loss(predicted_Q, target_Q.unsqueeze(1))

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(self.policy.parameters(), 100)
        self.optimizer.step()

        # Soft update: target <- tau * policy + (1 - tau) * target
        with torch.no_grad():
            for target_param, policy_param in zip(self.target.parameters(), self.policy.parameters()):
                target_param.copy_(self.tau * policy_param + (1.0 - self.tau) * target_param)


In [22]:
### Environment setup ###
# Seed every RNG this notebook draws from directly (Python `random` for
# exploration and replay sampling, NumPy, and PyTorch for weight
# initialisation) so that a fresh "Restart & Run All" is repeatable as far as
# these generators are concerned.
SEED = 2023
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
device = "cpu"

# Swap between the 5 layouts here:
layout = "cramped_room"
# layout = "asymmetric_advantages"
# layout = "coordination_ring"
# layout = "forced_coordination"
# layout = "counter_circuit_o_1order"

# Reward shaping is disabled by default.  This data structure may be used for
# reward shaping.  You can, of course, do your own reward shaping in lieu of, or
# in addition to, using this structure.
reward_shaping = {
    "PLACEMENT_IN_POT_REW": 3,
    "DISH_PICKUP_REWARD": 3,
    "SOUP_PICKUP_REWARD": 5
}

# Length of Episodes.  Do not modify for your submission!
# Modification will result in a grading penalty!
horizon = 400

# Build the environment.  Do not modify!
mdp = OvercookedGridworld.from_layout_name(layout, rew_shaping_params=reward_shaping)
base_env = OvercookedEnv.from_mdp(mdp, horizon=horizon, info_level=0)
env = gym.make("Overcooked-v0", base_env=base_env,
               featurize_fn=base_env.featurize_state_mdp)

# Probe the environment once to size the networks: the observation length is
# taken from the first agent's feature vector.
n_actions = env.action_space.n
obs = env.reset()
obs0 = obs["both_agent_obs"][0]
obs1 = obs["both_agent_obs"][1]
n_obs = len(obs0)

# One independent learner per agent slot.
agent0 = DQNAgent(n_obs, n_actions)
agent1 = DQNAgent(n_obs, n_actions)

In [None]:
### Train your agent ###

# Independent DQN training: each agent picks epsilon-greedy actions from its
# own observation, stores its own transition and runs one replay step per
# environment step.
n_soups = []
num_episodes = 500

for e in range(num_episodes):
    # Episode termination flag
    done = False

    # The number of soups the agent pair made during the episode
    num_soups_made = 0
    # Non-zero (reward0, reward1) pairs seen this episode, printed for debugging
    infos = []
    # Reset the environment at the start of each episode
    obs = env.reset()

    while not done:
        # Obtain observations for each agent
        obs0 = obs["both_agent_obs"][0]
        obs1 = obs["both_agent_obs"][1]

        a0 = agent0.epsilon_greedy(obs0)
        a1 = agent1.epsilon_greedy(obs1)

        # Take the selected actions and receive feedback from the environment
        # The returned reward "R" only reflects completed soups.  You can find
        # the separate shaping rewards in the "info" variables
        # info["shaped_r_by_agent"][0] and info["shaped_r_by_agent"][1].  Note that
        # this shaping reward does *not* include the +20 reward for completed
        # soups returned in "R".
        s, R, done, info = env.step([a0, a1])

        next_obs0 = s["both_agent_obs"][0]
        next_obs1 = s["both_agent_obs"][1]

        # NOTE(review): the per-agent reward lists are presumably indexed by
        # player, while the observation/action slots follow
        # info['policy_agent_idx'] -- hence the swap. Confirm against the
        # Overcooked gym wrapper.
        if info['policy_agent_idx'] == 0:
            reward0 = info['shaped_r_by_agent'][0] + info['sparse_r_by_agent'][0]
            reward1 = info['shaped_r_by_agent'][1] + info['sparse_r_by_agent'][1]
        else:
            reward1 = info['shaped_r_by_agent'][0] + info['sparse_r_by_agent'][0]
            reward0 = info['shaped_r_by_agent'][1] + info['sparse_r_by_agent'][1]

        if reward0 != 0 or reward1 != 0:
            infos.append([reward0, reward1])

        agent0.memorize(obs0, a0, reward0, next_obs0, done)
        agent1.memorize(obs1, a1, reward1, next_obs1, done)

        agent0.replay()
        agent1.replay()

        # Accumulate the number of soups made
        num_soups_made += int(R / 20) # Each served soup generates 20 reward

        # Advance to the observation returned by the step. Without this the
        # agents would keep acting on (and storing) the episode's initial
        # observation for every step.
        obs = s

    # The while loop only exits once the episode is done.
    n_soups.append(num_soups_made)

    # Anneal exploration once per episode; the agents store epsilon_decay and
    # epsilon_min but never apply them themselves.
    for agent in (agent0, agent1):
        agent.epsilon = max(agent.epsilon_min, agent.epsilon * agent.epsilon_decay)

    # Display status
    print(infos)
    print("Ep {0}".format(e + 1), end=" ")
    print("number of soups made: {0}".format(num_soups_made))

[[0, 3], [0, 5], [3, 0]]
Ep 1 number of soups made: 0
[[3, 0], [3, 0], [0, 3], [0, 3], [0, 5]]
Ep 2 number of soups made: 0
[[3, 0]]
Ep 3 number of soups made: 0
[[3, 0], [5, 0], [0, 3]]
Ep 4 number of soups made: 0
[[3, 0], [5, 0], [0, 3]]
Ep 5 number of soups made: 0
[[3, 0]]
Ep 6 number of soups made: 0
[[3, 0], [0, 3], [0, 5]]
Ep 7 number of soups made: 0
[[3, 0]]
Ep 8 number of soups made: 0
[]
Ep 9 number of soups made: 0
[[0, 3]]
Ep 10 number of soups made: 0
[]
Ep 11 number of soups made: 0
[[3, 0]]
Ep 12 number of soups made: 0
[[0, 3]]
Ep 13 number of soups made: 0
[[3, 0], [0, 3]]
Ep 14 number of soups made: 0
[[3, 0], [5, 0]]
Ep 15 number of soups made: 0
[[3, 0]]
Ep 16 number of soups made: 0
[[3, 0], [3, 0], [0, 5]]
Ep 17 number of soups made: 0
[[3, 0], [0, 3], [5, 0], [0, 3], [0, 3], [3, 0]]
Ep 18 number of soups made: 0
[[0, 3], [3, 0]]
Ep 19 number of soups made: 0
[[3, 0], [3, 0]]
Ep 20 number of soups made: 0
[[0, 3], [0, 3], [0, 5], [3, 0], [3, 0], [0, 3], [0, 5]]


In [None]:
### Evaluate your agent ###

# This is where you would rollout episodes with your trained agent.
# The below code is a particular way to rollout episodes in a format
# compatible with a state visualizer, if you'd like to visualize what your
# agents are doing during episodes.  Visualization is in the next cell.

class StudentPolicy(NNPolicy):
    """Policy that acts greedily on per-agent Q-networks.

    Args:
        q_networks: Optional sequence of Q-value networks indexed by agent
            index. When omitted, the policy falls back to picking a random
            action (the original placeholder behaviour).
    """
    def __init__(self, q_networks=None):
        super(StudentPolicy, self).__init__()
        self.q_networks = q_networks

    def state_policy(self, state, agent_index):
        """
        Generate the policy vector for the given state and agent_index.

        The result is a one-hot vector over the environment's actions: the
        argmax of the agent's Q-network output when Q-networks were supplied,
        otherwise a randomly sampled action.
        """
        action_probs = np.zeros(env.action_space.n)

        if self.q_networks is None:
            # Random deterministic policy
            action_probs[env.action_space.sample()] = 1
            return action_probs

        featurized_state = base_env.featurize_state_mdp(state)
        input_state = torch.FloatTensor(featurized_state[agent_index]).unsqueeze(0)

        # Value-based policy: put all probability mass on the best Q-value.
        with torch.no_grad():
            q_values = self.q_networks[agent_index](input_state)[0]
        action_probs[int(torch.argmax(q_values))] = 1

        return action_probs

    def multi_state_policy(self, states, agent_indices):
        """ Generate a policy for a list of states and agent indices """
        return [self.state_policy(state, agent_index) for state, agent_index in zip(states, agent_indices)]


class StudentAgent(AgentFromPolicy):
    """Create an agent using the policy created by the class above"""
    def __init__(self, policy):
        super(StudentAgent, self).__init__(policy)


# Evaluate the Q-networks learned by the training cell instead of a random
# policy. agent0 / agent1 are the DQN learners and expose the online network
# as `.policy`.
trained_q_networks = (agent0.policy, agent1.policy)

# Instantiate the policies for both agents
policy0 = StudentPolicy(trained_q_networks)
policy1 = StudentPolicy(trained_q_networks)

# Instantiate both agents. They get their own names so the trained learners in
# agent0 / agent1 are not overwritten and the training cell stays re-runnable.
eval_agent0 = StudentAgent(policy0)
eval_agent1 = StudentAgent(policy1)
agent_pair = AgentPair(eval_agent0, eval_agent1)

# Generate an episode
ae = AgentEvaluator.from_layout_name({"layout_name": layout}, {"horizon": horizon})
trajs = ae.evaluate_agent_pair(agent_pair, num_games=1)
print("\nlen(trajs):", len(trajs))

In [None]:
### Agent Visualization ###

##############################################################################
# The function StateVisualizer() below generates images for the state of the
# environment at each time step of the episode.
#
# You have several options for how to use these images:
#
# 1) You can set img_dir to a local directory (or a directory within Google Drive
# if using Colab), and all the images will be saved to that directory for you to browse.
#
# 2) If using a notebook, you can set the argument ipython_display=True to get a
# tool with a slider that lets you scan through all the images directly in the
# notebook.  This option does not require you to store your images.
#
# 3) You can generate a GIF of the episode. This requires you to set
# img_dir.  The code to generate the GIF is commented out below.

# Modify as appropriate
img_dir = None  # image directory for options 1 and 3; None means no directory is passed
ipython_display = True  # option 2: in-notebook slider
gif_path = None  # output path for option 3; only used by the commented-out GIF code

# NOTE(review): the commented-out GIF code builds file names as `img_dir + img`,
# so img_dir presumably needs a trailing path separator -- confirm before use.
# Do not modify -- uncomment for GIF generation
StateVisualizer().display_rendered_trajectory(trajs, img_directory_path=img_dir, ipython_display=ipython_display)
# img_list = [f for f in os.listdir(img_dir) if f.endswith('.png')]
# img_list.sort(key=lambda x: os.path.getmtime(os.path.join(img_dir, x)))
# images = [Image.open(img_dir + img).convert('RGBA') for img in img_list]
# images[0].save(gif_path, save_all=True, append_images=images[1:], optimize=False, duration=250, loop=0)
# with open(gif_path,'rb') as f: display(IPImage(data=f.read(), format='png'))