# Parameters

In [None]:
# Variables to change based upon the specifics of the test
TEST_EPISODES = 1 # Number of tests to run for each model
TEST_STEPS = 10000 # Total timesteps to run for each model
USING_CUSTOM_ENV = True # Are we using a custom environment?
DIRECTORY_PATH = "/content/drive/MyDrive/packages/minerl_saved_models" # Directory we have the models saved in
SAVE_LOCATION = "/content/drive/MyDrive/packages/minerl_test_outputs" # Directory we are saving videos to
FORCE_STOP = False # Force stops after one test (for code testing purposes)

# Installations

In [None]:
import sys
from google.colab import drive
# Allow Colab to access Google Drive; the package, model and output paths
# used throughout this notebook live under /content/drive
drive.mount('/content/drive')

In [None]:
# When True, use the MineRL copy stored on Google Drive; the install cell
# below then does an editable install of it instead of installing from pip
LOCAL_MINERL = True

if LOCAL_MINERL:
  # chmod 555 = read + execute for everyone, applied recursively (-R)
  !chmod 555 -R "/content/drive/MyDrive/packages/minerl"
  sys.path.append("/content/drive/MyDrive/packages/minerl")
  !chmod 555 -R "/content/drive/MyDrive/packages/MixinGradle-dcfaf61"
  sys.path.append("/content/drive/MyDrive/packages/MixinGradle-dcfaf61")


In [None]:
%%capture
!sudo add-apt-repository -y ppa:openjdk-r/ppa
!sudo apt-get purge openjdk-*
!sudo apt-get install openjdk-8-jdk
!sudo apt-get install xvfb
!sudo apt-get install xserver-xephyr
!sudo apt install tigervnc-standalone-server
!sudo apt-get install -y python3-opengl
!sudo apt-get install ffmpeg
!pip3 install gym==0.13.1
if LOCAL_MINERL:
  !pip3 install -e /content/drive/MyDrive/packages/minerl
else:
  !pip3 install minerl==0.4.4 --verbose
!pip3 install pyvirtualdisplay
!pip3 install -U colabgymrender
!sudo apt-get install xvfb
!pip3 install opencv-python
!pip3 install imageio==2.4.1

# Custom Environment Setup

In [None]:
from minerl.herobraine.env_specs.simple_embodiment import SimpleEmbodimentEnvSpec
from minerl.herobraine.hero.handler import Handler
from typing import List
import random

import minerl.herobraine.hero.handlers as handlers
from minerl.herobraine.hero.mc import ALL_ITEMS


"""
The intent of this env_spec is to create a survival environment for our agent to be evaluated in.
This environment allows us to tailor the observation and action spaces to our agent's and UI's needs.
"""

# Item-name sentinels passed to the handlers below as their `_default` / `_other` values
NONE = 'none'
OTHER = 'other'

# Milliseconds per environment step; multiplied by max_episode_steps to build the server time limit
MS_PER_STEP = 50

ML4MC_SURVIVAL_LENGTH = 1 * 60 * 60 * 20  # 1 hour * 60 minutes * 60 seconds * 20 ticks/steps per second

class ML4MCSurvival(SimpleEmbodimentEnvSpec):
    """Environment spec for the ML4MC survival / combat evaluation environment.

    Extends ``SimpleEmbodimentEnvSpec`` with inventory, equipped-item, location
    and life-stat observations plus place / equip / craft / smelt actions.
    The world is a flat world with bedrock, air and glowstone cuboids drawn
    around the origin, started at night with mob spawning enabled. The agent
    starts with iron swords and is rewarded for XP gain plus a constant reward.
    """

    def __init__(self, *args, **kwargs):
        """Create the spec, defaulting the name and the episode length.

        :param args: positional arguments forwarded to the parent constructor
        :param kwargs: keyword arguments forwarded to the parent constructor;
            ``name`` and ``max_episode_steps`` are filled in when not supplied
        """
        # Only inject the default name when the caller gave none, by keyword or
        # positionally (assumes `name` is the parent's first positional
        # parameter -- TODO confirm against SimpleEmbodimentEnvSpec).
        if not args:
            kwargs.setdefault('name', 'ML4MCSurvival-v0')
        # setdefault lets a caller override the episode length without a
        # "multiple values for keyword argument" TypeError
        kwargs.setdefault('max_episode_steps', ML4MC_SURVIVAL_LENGTH)

        super().__init__(*args, **kwargs)

    # Allows scripts to observe inventory, equipped item, current location and life stats
    def create_observables(self) -> List[Handler]:
        return super().create_observables() + [
            handlers.FlatInventoryObservation(ALL_ITEMS),
            handlers.EquippedItemObservation(items=[
                'air', 'wooden_axe', 'wooden_pickaxe', 'stone_axe', 'stone_pickaxe', 'iron_axe', 'iron_pickaxe', NONE,
                OTHER
            ], _default='air', _other=OTHER),
            handlers.ObservationFromCurrentLocation(),
            handlers.ObservationFromLifeStats(),
        ]

    # Allows scripts to place blocks, equip items, craft items, and smelt items
    def create_actionables(self):
        return super().create_actionables() + [
            handlers.PlaceBlock([NONE, 'dirt', 'stone', 'cobblestone', 'crafting_table', 'furnace', 'torch'],
                                _other=NONE, _default=NONE),
            handlers.EquipAction([NONE, 'air', 'wooden_axe', 'stone_axe', 'iron_axe', 'stone_sword', 'iron_sword', 'wooden_sword'], _other=NONE, _default=NONE),
            handlers.CraftAction([NONE, 'torch', 'stick', 'planks', 'crafting_table'], _other=NONE, _default=NONE),
            handlers.CraftNearbyAction(
                [NONE, 'wooden_axe', 'wooden_pickaxe', 'stone_axe', 'stone_pickaxe', 'iron_axe', 'iron_pickaxe',
                 'furnace'], _other=NONE, _default=NONE),
            handlers.SmeltItemNearby([NONE, 'iron_ingot', 'coal'], _other=NONE, _default=NONE),
        ]

    # Rewards: a fixed reward per XP gained plus a constant reward.
    # The item-collection and damage rewards are kept below as disabled alternatives.
    def create_rewardables(self) -> List[Handler]:
        return [
            # handlers.RewardForCollectingItems([
            #     dict(type="cobblestone", amount=1, reward=256.0),
            #     dict(type="dirt", amount=1, reward=64.0),
            # ])
            handlers.RewardForXPGain(
                reward_per_xp=100.0,
                reward_type="FIXED"),
            handlers.ConstantReward(constant=1.0)
            # handlers.RewardForTakingDMG(
            #     reward_per_dmg=10.0,
            #     reward_type="FIXED")
        ]


    # Start the agent with iron swords in its inventory; can be modified for testing
    def create_agent_start(self) -> List[Handler]:
        return [
            handlers.SimpleInventoryAgentStart([
                dict(type="iron_sword", quantity=5)
            ])
        ]

    # The agent quits once it possesses 32 diamond ore
    def create_agent_handlers(self) -> List[Handler]:
        return [
            handlers.AgentQuitFromPossessingItem([
                dict(type="diamond_ore", amount=32)]
            )
        ]

    # Use a flat world generator (the biome generator is kept as a disabled alternative)
    def create_server_world_generators(self) -> List[Handler]:
        # return [handlers.BiomeGenerator("extreme_hills")]
        return [
            handlers.FlatWorldGenerator(generatorString="3;7,220*1,5*3,2;3;dungeon")
            # handlers.DrawEntityHandler(
            #     xpos=str(random.randint(0,100)/10),
            #     ypos=str(random.randint(0,100)/10),
            #     zpos=str(random.randint(0,100)/10),
            #     mobname="zombie"
            # )

        ]

    def create_server_quit_producers(self) -> List[Handler]:
        # Set a timeout to end the episode to prevent it from running forever
        return [
            handlers.ServerQuitFromTimeUp(time_limit_ms=self.max_episode_steps * MS_PER_STEP),
            handlers.ServerQuitWhenAnyAgentFinishes()
        ]

    # This method can be used to change other things about the world such as drawing shapes or spawning a village.
    # Here it draws three cuboids, in order: bedrock, then air, then glowstone.
    def create_server_decorators(self) -> List[Handler]:
        return [
            handlers.DrawingDecorator("""
              <DrawCuboid type="bedrock" x1="-15" x2="16" y1="1" y2="50" z1="-15" z2="16" />
              <DrawCuboid type="air" x1="-12" x2="13" y1="4" y2="39" z1="-12" z2="13" />
              <DrawCuboid type="glowstone" x1="-14" x2="15" y1="7" y2="30" z1="-14" z2="15" />
            """)
            # handlers.DrawEntityHandler(
            #     xpos=str(random.randint(0,100)/10),
            #     ypos=str(random.randint(0,100)/10),
            #     zpos=str(random.randint(0,100)/10),
            #     mobname="zombie"
            # )
        ]

    # This method sets the conditions for the world the agent will spawn into
    # We will allow spawning and the passage of time to replicate a realistic Minecraft environment
    def create_server_initial_conditions(self) -> List[Handler]:
        return [
            handlers.TimeInitialCondition(
                start_time=18000, #18000 is night #6000 is day
                allow_passage_of_time=True,
            ),
            handlers.SpawningInitialCondition(
                allow_spawning=True
            )
        ]

    def is_from_folder(self, folder: str) -> bool:
        return folder == 'ml4mc_survival'

    # Don't need docstring as we're not publishing this environment to MineRL's website
    def get_docstring(self):
        return ""

    def determine_success_from_rewards(self, rewards: list) -> bool:
        """Return True when the summed rewards reach ``self.reward_threshold``.

        When no threshold is configured (``None``), every episode counts as a
        success instead of raising ``TypeError`` on the ``>=`` comparison.

        :param rewards: list of per-step rewards for one episode
        """
        threshold = self.reward_threshold
        if threshold is None:
            # NOTE(review): presumably the default when no threshold is passed
            # to the constructor -- confirm against the parent EnvSpec.
            return True
        return sum(rewards) >= threshold

# Setup

In [None]:

import os
import numpy as np
import torch as th
from torch import nn
import gym
import minerl
import pandas as pd
from tqdm.notebook import tqdm
from colabgymrender.recorder import Recorder
from pyvirtualdisplay import Display
import logging
logging.disable(logging.ERROR)
from datetime import datetime

In [None]:
#Start the Display for saving videos on Colab
from pyvirtualdisplay import Display
from os import path
# Non-visible virtual display, 400x300
display = Display(visible=False, size=(400, 300))
# The trailing semicolon keeps the notebook from echoing the call's return value
display.start();

In [None]:
class NatureCNN(nn.Module):
    """
    Convolutional network following the DQN Nature paper:
        Mnih, Volodymyr, et al.
        "Human-level control through deep reinforcement learning."
        Nature 518.7540 (2015): 529-533.

    Three convolution + ReLU stages are flattened and passed through a
    two-layer fully connected head.

    :param input_shape: A three-item tuple telling image dimensions in (C, H, W)
    :param output_dim: Dimensionality of the output vector
    """

    def __init__(self, input_shape, output_dim):
        super().__init__()

        # Convolutional trunk; layers are listed in the order they are applied
        conv_stack = [
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_stack)

        # Push one all-zero sample through the trunk to learn the flattened width
        with th.no_grad():
            probe = th.zeros(1, *input_shape)
            flat_features = self.cnn(probe).shape[1]

        # Fully connected head mapping the flattened features to output_dim
        self.linear = nn.Sequential(
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        )

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Run the convolutional trunk and then the linear head on ``observations``."""
        features = self.cnn(observations)
        return self.linear(features)

In [None]:
class ActionShaping(gym.ActionWrapper):
  """Expose a small discrete action set on top of a dict-action environment.

  Each discrete index maps to one pre-built action: the wrapped environment's
  no-op action with a few keys overridden.

  :param env: environment whose ``action_space`` provides ``noop()``
  :param camera_angle: magnitude used for each camera action
  :param always_attack: when True, every action also sets ``attack`` to 1
  """

  def __init__(self, env, camera_angle=10, always_attack=False):
    super().__init__(env)

    self.camera_angle = camera_angle
    self.always_attack = always_attack

    turn = self.camera_angle
    # Position in this list is the discrete action index; it matches the
    # indices produced by dataset_action_batch_to_actions.
    self._actions = [
      [('attack', 1)],                # 0
      [('forward', 1)],               # 1
      [('forward', 1), ('jump', 1)],  # 2
      [('camera', [-turn, 0])],       # 3
      [('camera', [turn, 0])],        # 4
      [('camera', [0, turn])],        # 5
      [('camera', [0, -turn])],       # 6
    ]

    def build(overrides):
      # Start from a fresh no-op and apply the overrides for this action
      shaped = self.env.action_space.noop()
      for key, value in overrides:
        shaped[key] = value
      if self.always_attack:
        shaped['attack'] = 1
      return shaped

    self.actions = [build(overrides) for overrides in self._actions]

    self.action_space = gym.spaces.Discrete(len(self.actions))

  def action(self, action):
    """Translate a discrete action index into its pre-built action."""
    return self.actions[action]

In [None]:
def dataset_action_batch_to_actions(dataset_actions, camera_margin=5):
  """Map a batch of dataset actions onto the discrete ActionShaping indices.

  Priority, highest first: camera movement (first camera component, then the
  second), forward (+ jump), attack. Samples matching none of these get -1.

  :param dataset_actions: mapping with "camera", "attack", "forward" and
      "jump" entries; extra dimensions of size one are tolerated
      (assumes camera holds two components per sample)
  :param camera_margin: a camera component must exceed this magnitude to count
  :return: 1-D integer array with one action index (or -1) per sample
  """
  # reshape instead of squeeze(): squeeze would also drop the batch axis when
  # the batch holds a single sample, breaking the per-sample indexing.
  camera = np.asarray(dataset_actions["camera"]).reshape(-1, 2)
  attack = np.asarray(dataset_actions["attack"]).reshape(-1)
  forward = np.asarray(dataset_actions["forward"]).reshape(-1)
  jump = np.asarray(dataset_actions["jump"]).reshape(-1)

  moving_forward = forward == 1
  # np.select takes the first matching condition, so list order is the priority
  conditions = [
    camera[:, 0] < -camera_margin,
    camera[:, 0] > camera_margin,
    camera[:, 1] > camera_margin,
    camera[:, 1] < -camera_margin,
    moving_forward & (jump == 1),
    moving_forward,
    attack == 1,
  ]
  choices = [3, 4, 5, 6, 2, 1, 0]

  # -1: no reasonable mapping (would be a no-op).
  # The builtin int replaces the removed np.int alias.
  return np.select(conditions, choices, default=-1).astype(int)

In [None]:
def str_to_act(env, actions):
  """Build an action from a whitespace-separated action string.

  Each token is either ``key`` (sets that key to 1) or ``key:value``.
  The ``camera`` value is parsed as a Python literal (e.g. ``camera:[0,10]``);
  every other value is stored as the raw string.

  :param env: environment whose ``action_space`` provides ``noop()``
  :param actions: action string, e.g. ``"forward jump camera:[0,10]"``
  :return: the no-op action with the listed keys overridden
  :raises ValueError, SyntaxError: when the camera value is not a valid literal
  """
  # Function-scope import keeps this cell self-contained.
  import ast

  act = env.action_space.noop()
  for token in actions.split():
    # Split on the first colon only, so a value may itself contain ':'
    key, separator, value = token.partition(':')
    if not separator:
      act[token] = 1
    elif key == 'camera':
      # NOTE: this used to be eval(value). literal_eval accepts the same list
      # and number literals but refuses arbitrary expressions, so a crafted
      # action string can no longer execute code.
      act[key] = ast.literal_eval(value)
    else:
      act[key] = value
  return act

In [None]:
# Register the custom survival spec so the later gym.make('ML4MCSurvival-v0') call can resolve it
if USING_CUSTOM_ENV:
  abs_CUSTOM = ML4MCSurvival()
  abs_CUSTOM.register()

In [None]:
# Provides PPO and the callback / monitor utilities imported in the cells below
!pip3 install stable-baselines3

# Training

In [None]:
from stable_baselines3.common import results_plotter
from stable_baselines3.common import monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class SaveOnBestTrainingRewardCallback(BaseCallback):
  """Save the model whenever the mean training reward reaches a new best.

  Every ``check_freq`` calls, the results under ``log_dir`` are loaded and the
  mean reward of the last 100 episodes is compared with the best seen so far.

  :param check_freq: number of callback calls between two checks
  :param log_dir: directory holding the training results; the best model is
      saved to ``<log_dir>/best_model``
  :param verbose: when greater than 0, print progress messages
  """

  def __init__(self, check_freq: int, log_dir: str, verbose=1):
    super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
    self.check_freq = check_freq
    self.log_dir = log_dir
    self.save_path = os.path.join(log_dir, 'best_model')
    self.best_mean_reward = -np.inf

  # Was misspelled `_init_callack`, so it never overrode the BaseCallback hook
  # and the save directory was never created.
  def _init_callback(self) -> None:
    """Create the save directory if needed."""
    if self.save_path is not None:
      os.makedirs(self.save_path, exist_ok=True)

  def _on_step(self) -> bool:
    """Check the training reward and save the model on a new best.

    :return: always True, so training is never stopped by this callback
    """
    if self.n_calls % self.check_freq != 0:
      return True

    #Retrieve  Training Reward
    x, y = ts2xy(load_results(self.log_dir), 'timesteps')
    if len(x) == 0:
      return True

    #Mean training reward over the last 100 episodes
    mean_reward = np.mean(y[-100:])
    if self.verbose > 0:
      print("Num timesteps: {}".format(self.num_timesteps))
      print("Best mean reward: {:.2f} - Last mean reward per episode: {:.2f}".format(self.best_mean_reward, mean_reward))

    #New best model, save the agent. This check used to sit inside the
    #verbose branch, so nothing was ever saved with verbose=0.
    if mean_reward > self.best_mean_reward:
      self.best_mean_reward = mean_reward
      if self.verbose > 0:
        print("Saving new best model to {}".format(self.save_path))
      self.model.save(self.save_path)
    return True

In [None]:
# abs_STONE = StoneCollection()
# abs_STONE.register() # Register with gym

In [None]:
from stable_baselines3 import PPO

# Build either our registered custom spec or the stock MineRL diamond task
env = gym.make('ML4MCSurvival-v0' if USING_CUSTOM_ENV else 'MineRLObtainDiamond-v0')

In [None]:
from colabgymrender.recorder import Recorder
# Wrap the environment in a Recorder writing to the given Drive directory at fps=60.
# NOTE(review): this path differs from SAVE_LOCATION in the parameters cell -- confirm which one is intended.
env = Recorder(env, "/content/drive/MyDrive/ml4mc_outputs", fps=60)

In [None]:
# NOTE(review): presumably a compatibility dependency required by stable-baselines3 -- confirm it is still needed
!pip install shimmy

In [None]:
#A wrapper for getting the POV of the avatar from the environment, which is needed for stable_baselines
class ExtractPOV(gym.ObservationWrapper):
  """Observation wrapper that keeps only the 'pov' entry of each observation.

  The wrapper's observation space is narrowed to the 'pov' sub-space of the
  wrapped environment.
  """

  def __init__(self, env):
    super().__init__(env)
    pov_space = self.env.observation_space['pov']
    self.observation_space = pov_space

  def observation(self, observation):
    """Return the 'pov' entry of ``observation``."""
    pov_frame = observation['pov']
    return pov_frame

In [None]:
"""
#callback addition
log_dir = "tmp/"
os.makedirs(log_dir, exist_ok=True)
monitored_env = monitor.Monitor(env1, log_dir)
callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)
"""
# Extract the avatar's POV from the environment, which is needed for stable_baselines
obs_wrapped_stone = ExtractPOV(env)
# Action shaping converts the environment's dictionary actions into a discrete set
obs_action_wrapped_stone = ActionShaping(obs_wrapped_stone)
# Reset the wrapped environment
obs = obs_action_wrapped_stone.reset()

# PPO with a CnnPolicy. This was just the model used by tutorials; we'll experiment with the best model later
model = PPO(policy="CnnPolicy", env=obs_action_wrapped_stone, verbose=1)
try:
  # Train for 50000 environment timesteps
  model.learn(total_timesteps=50000)
finally:
  # Always release the recorder, even if training fails or is interrupted,
  # so the recording is still turned into a video on Colab
  env.release()


In [None]:
"""
results_plotter.plot_results([log_dir], 5000, results_plotter.X_TIMESTEPS, "MineRL RL Training")
plt.show()
"""
# Save the trained model into the shared model directory
model.save(f"{DIRECTORY_PATH}/combat.pth")