In [None]:
### Install libraries ###

!pip install git+https://github.com/HumanCompatibleAI/overcooked_ai.git

In [None]:
### Imports ###

from overcooked_ai_py.mdp.overcooked_mdp import OvercookedGridworld
from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv
from overcooked_ai_py.agents.agent import NNPolicy, AgentFromPolicy, AgentPair
from overcooked_ai_py.agents.benchmarking import AgentEvaluator
from overcooked_ai_py.visualization.state_visualizer import StateVisualizer
import gym
import numpy as np
import torch
from PIL import Image
import os
from IPython.display import display, Image as IPImage

## Uncomment if you'd like to use your personal Google Drive to store outputs
## from your runs. You can find some hooks to Google Drive commented
## throughout the rest of this code.
# from google.colab import drive

In [None]:
### Environment setup ###

## Swap between the 3 layouts here:
layout = "cramped_room"
# layout = "asymmetric_advantages"
# layout = "forced_coordination"

## Reward shaping is disabled by default (i.e., only the sparse rewards are
## included in the reward returned by the environment).  If you'd like to do
## reward shaping (recommended to make the task much easier to solve), this
## data structure provides access to a built-in reward-shaping mechanism within
## the Overcooked environment.  You can, of course, do your own reward shaping
## in lieu of, or in addition to, using this structure.  The shaped rewards
## provided by this structure are not added to the reward returned by the
## environment; they are reported separately in the "info" dictionary returned
## by env.step() (see the training loop below).
reward_shaping = {
    "PLACEMENT_IN_POT_REW": 3,
    "DISH_PICKUP_REWARD": 3,
    "SOUP_PICKUP_REWARD": 5
}

# Length of Episodes.  Do not modify for your submission!
# Modification will result in a grading penalty!
horizon = 400

# Build the environment.  Do not modify!
# The MDP is created from the selected layout together with the shaping
# parameters above, wrapped in a base environment with the fixed horizon, and
# then exposed through Gym using the base environment's featurize_state_mdp as
# the featurization function.
mdp = OvercookedGridworld.from_layout_name(layout, rew_shaping_params=reward_shaping)
base_env = OvercookedEnv.from_mdp(mdp, horizon=horizon, info_level=0)
env = gym.make("Overcooked-v0", base_env=base_env, featurize_fn=base_env.featurize_state_mdp)

In [None]:
### Train your agent ###

# drive.mount('/content/drive')
# !mkdir -p "/content/drive/My Drive/Colab"

# The code below runs a few episodes with a random agent.  Your learning algorithm
# would go here.

num_episodes = 5

for e in range(num_episodes):
    # Episode termination flag
    done = False

    # The number of soups the agent pair made during the episode
    num_soups_made = 0

    # Per-episode running totals of each agent's shaped reward.  The per-step
    # values (r_shaped_0 / r_shaped_1) are overwritten on every step, so the
    # end-of-episode report must use these accumulators instead.
    total_shaped_0 = 0
    total_shaped_1 = 0

    # Reset the environment at the start of each episode
    obs = env.reset()

    while not done:
        # Obtain observations for each agent
        obs0 = obs["both_agent_obs"][0]
        obs1 = obs["both_agent_obs"][1]

        # Select random actions from the set {North, South, East, West, Stay, Interact}
        # for each agent.
        a0 = env.action_space.sample()
        a1 = env.action_space.sample()

        # Take the selected actions and receive feedback from the environment
        # The returned reward "R" only reflects completed soups.
        obs, R, done, info = env.step([a0, a1])

        # You can find the separate shaping rewards induced by the data
        # structure you defined above in the "info" dictionary.
        ## THE REVERSAL OF THIS ARRAY IS NECESSARY TO ALIGN THE CORRECT REWARD
        ## TO THE CORRECT AGENT (see project documentation)!
        # Note that this shaping reward does *not* include the +20 reward for
        # completed soups (the one returned in "R").
        r_shaped = info["shaped_r_by_agent"]
        if env.agent_idx:
            r_shaped_0 = r_shaped[1]
            r_shaped_1 = r_shaped[0]
        else:
            r_shaped_0 = r_shaped[0]
            r_shaped_1 = r_shaped[1]

        # Accumulate this step's shaped rewards into the episode totals
        total_shaped_0 += r_shaped_0
        total_shaped_1 += r_shaped_1

        # Accumulate the number of soups made
        num_soups_made += int(R / 20) # Each served soup generates 20 reward

    # Display status (shaped rewards are episode totals, matching the soup count)
    print("Ep {0}".format(e + 1), end=" ")
    print("shaped reward for agent 0: {0}".format(total_shaped_0), end=" ")
    print("shaped reward for agent 1: {0}".format(total_shaped_1), end=" ")
    print("number of soups made: {0}".format(num_soups_made))

# The info dictionary returned by the environment contains special status info
# specifically when done == True.  This information may be useful in
# developing, debugging, and analyzing your results.  It may also be a good
# way for you to find a metric that you can use in evaluating collaboration
# between your agents.
print("\nExample end-of-episode info dump:\n", info)

In [None]:
### All of the remaining code in this notebook is solely for using the
### built-in Overcooked state visualizer on a trained agent, so that you can see
### a graphical rendering of what your agents are doing. It is not
### necessary to use this.

# The code below is a particular way to roll out episodes in a format
# compatible with the built-in state visualizer.

class StudentPolicy(NNPolicy):
    """Policy that maps an Overcooked state to an action-probability vector."""

    def __init__(self):
        super().__init__()

    def state_policy(self, state, agent_index):
        """
        Return the policy vector for the given state and agent_index.

        If you're using a neural network-based solution, the specifics depend
        on the algorithm you are using.  Two commented examples are included
        below: the first for a policy gradient algorithm, where the networks
        output a policy directly, and the second for a value-based algorithm,
        where the policy must be derived from the Q value outputs of the
        networks.  The uncommented code is a placeholder that generates a
        random policy.
        """
        # Featurize the full state, pick out this agent's entry, and add a
        # leading dimension.  `input_state` is only consumed by the commented
        # examples below; the random placeholder does not read it.
        per_agent_features = base_env.featurize_state_mdp(state)
        own_features = per_agent_features[agent_index]
        input_state = torch.FloatTensor(own_features).unsqueeze(0)

        # Example for policy NNs named "PNN0" and "PNN1"
        # with torch.no_grad():
        #   if agent_index == 0:
        #       action_probs = PNN0(input_state)[0].numpy()
        #   else:
        #       action_probs = PNN1(input_state)[0].numpy()

        # Example for Q value NNs named "QNN0" and "QNN1"
        # action_probs = np.zeros(env.action_space.n)
        # with torch.no_grad():
        #   if agent_index == 0:
        #       action_probs[np.argmax(QNN0(input_state)[0].numpy())] = 1
        #   else:
        #       action_probs[np.argmax(QNN1(input_state)[0].numpy())] = 1

        # Random deterministic policy: all probability mass on one sampled action
        action_probs = np.zeros(env.action_space.n)
        chosen_action = env.action_space.sample()
        action_probs[chosen_action] = 1

        return action_probs

    def multi_state_policy(self, states, agent_indices):
        """Return a list with one policy vector per (state, agent index) pair."""
        policies = []
        for state, agent_index in zip(states, agent_indices):
            policies.append(self.state_policy(state, agent_index))
        return policies


class StudentAgent(AgentFromPolicy):
    """AgentFromPolicy subclass that forwards the given policy to the parent constructor."""

    def __init__(self, policy):
        super().__init__(policy)


# One separate policy instance per agent
policy0, policy1 = StudentPolicy(), StudentPolicy()

# Wrap each policy in an agent, then pair the two agents
agent0, agent1 = StudentAgent(policy0), StudentAgent(policy1)
agent_pair = AgentPair(agent0, agent1)

# Roll out one game on the selected layout with the configured horizon
ae = AgentEvaluator.from_layout_name(
    {"layout_name": layout},
    {"horizon": horizon},
)
trajs = ae.evaluate_agent_pair(agent_pair, num_games=1)
print("\nlen(trajs):", len(trajs))

In [None]:
##############################################################################
# The StateVisualizer().display_rendered_trajectory() call below generates
# images for the state of the environment at each time step of the episode.
#
# You have several options for how to use these images:
#
# 1) You can set img_dir to a local directory (or a directory within Google Drive
# if using Colab), and all the images will be saved to that directory for you to browse.
#
# 2) If using a notebook, you can set the argument ipython_display=True to get a
# tool with a slider that lets you scan through all the images directly in the
# notebook.  This option does not require you to store your images.
#
# 3) You can generate a GIF of the episode. This requires you to set
# img_dir.  The code to generate the GIF is commented out below

# Output locations; adjust as needed.  The trailing comments show the
# equivalent Google Drive hooks.
img_dir = "imgs/"  # "/content/drive/My Drive/Colab/" + "imgs_" + layout + "/"
ipython_display = True
gif_path = "imgs/imgs.gif"  # "/content/drive/My Drive/Colab/" + layout + ".gif"

# Render the rolled-out trajectory using the settings above
StateVisualizer().display_rendered_trajectory(
    trajs,
    img_directory_path=img_dir,
    ipython_display=ipython_display,
)

## Uncomment for GIF to be generated and stored in 'gif_path'. Requires 'img_dir'
## to point to a directory of saved images.
# img_list = [f for f in os.listdir(img_dir) if f.endswith('.png')]
# img_list = sorted(img_list, key=lambda x: int(x.split('.')[0]))
# images = [Image.open(img_dir + img).convert('RGBA') for img in img_list]
# images[0].save(gif_path, save_all=True, append_images=images[1:], optimize=False, duration=250, loop=0)
# with open(gif_path, 'rb') as f: display(IPImage(data=f.read(), format='png'))