In [1]:
import os
import re
import random
from typing import List, Dict, Any, Optional
from collections import defaultdict
from time import time
from glob import glob
import numpy as np
import gym
from textworld import EnvInfos
import textworld.gym

In [2]:
# Collect the compiled game files (*.ulx) found directly in GAMES_PATH.
# os.listdir() returns entries in arbitrary order, and later cells pick games
# by position (gamefiles[0], gamefiles[5]); sorting makes those picks
# reproducible across machines and runs.
GAMES_PATH = "./sample_games/"
gamefiles = sorted(
    os.path.join(GAMES_PATH, file)
    for file in os.listdir(GAMES_PATH)
    if file.endswith('.ulx')
)

In [3]:
def play(agent, path, max_step=50, nb_episodes=10, verbose=True):
    """Run `agent` for several episodes and optionally print summary statistics.

    Args:
        agent: Object exposing a `select_additional_infos` attribute and an
            `act(obs, score, done, infos)` method.
        path: Path to a single game file, or to a directory whose `*.ulx`
            files are all registered.
        max_step: Forwarded to `register_games` as `max_episode_steps`.
        nb_episodes: Number of episodes to play.
        verbose: When true, print a label, one dot per finished episode and a
            final summary line.

    The environment is closed even if an episode raises (for example when a
    long training run is interrupted from the keyboard).
    """
    request_infos = agent.select_additional_infos
    request_infos.max_score = True  # Needed to normalize the scores.

    is_dir = os.path.isdir(path)
    gamefiles = glob(os.path.join(path, "*.ulx")) if is_dir else [path]

    env_id = textworld.gym.register_games(gamefiles,
                                          request_infos=request_infos,
                                          max_episode_steps=max_step)
    env = gym.make(env_id)  # Create a Gym environment to play the text game.
    if verbose:
        label = os.path.dirname(path) if is_dir else os.path.basename(path)
        print(label, end="")

    # Per-episode statistics: number of moves, raw score, normalized score.
    avg_moves, avg_scores, avg_norm_scores = [], [], []
    max_score = None  # Max score reported by the most recent episode.
    try:
        for no_episode in range(nb_episodes):
            obs, infos = env.reset()  # Start new episode.

            score = 0
            done = False
            nb_moves = 0
            while not done:
                command = agent.act(obs, score, done, infos)
                obs, score, done, infos = env.step(command)
                nb_moves += 1

            # Let the agent know the game is done.
            agent.act(obs, score, done, infos)

            if verbose:
                print(".", end="")
            max_score = infos["max_score"]
            avg_moves.append(nb_moves)
            avg_scores.append(score)
            avg_norm_scores.append(score / max_score)
    finally:
        # Release the environment even when an episode fails or is interrupted.
        env.close()

    if not verbose:
        return

    if not avg_moves:
        # Nothing was played, so there are no statistics (or max score) to report.
        print("  \tno episodes played.")
        return

    msg = "  \tavg. steps: {:5.1f}; avg. score: {:4.1f} / {}."
    if is_dir:
        # Scores are normalized per episode, so report them out of 1.
        print(msg.format(np.mean(avg_moves), np.mean(avg_norm_scores), 1))
    else:
        print(msg.format(np.mean(avg_moves), np.mean(avg_scores), max_score))

In [71]:
class CustomAgent():
    """Greedy bag-of-words agent for TextWorld games.

    Every vocabulary word owns one Q-value. A command is scored by the average
    Q-value of its tokens. In "train" mode the agent plays the best-scoring
    admissible command and adds the observed reward to the Q-value of every
    token of the command it played last. In "test" mode (the default) it plays
    a uniformly random admissible command and learns nothing.

    Invariant: `id2word` and `qvalues` always have the same length, and a
    word id is an index into both.
    """

    WIN_BONUS = 100     # Extra reward added when infos["has_won"] is true.
    LOSS_PENALTY = 100  # Reward subtracted when infos["has_lost"] is true.
    LOG_EVERY = 10      # Print a debug trace every this many training steps.

    def __init__(self, vocab_path: str = "./vocab.txt") -> None:
        """Load the vocabulary and create a zero Q-value for every entry.

        Arguments:
            vocab_path: Text file holding one vocabulary word per line.
        """
        self._initialized = False
        self._epsiode_has_started = False
        self.mode = "test"

        # Read in the vocabulary and map every word to its line index.
        with open(vocab_path) as f:
            self.word_vocab = f.read().split("\n")
        self.word2id = {}
        self.id2word = []
        for i, w in enumerate(self.word_vocab):
            self.word2id[w] = i
            self.id2word.append(w)

        self.qvalues = [0] * len(self.id2word)

        # Training state. Created here so that act() also works when `mode`
        # is switched to "train" without going through train().
        self.transitions = []
        self.last_score = 0
        self.no_train_step = 0

    def train(self) -> None:
        """Switch to training mode and reset the per-run training state.

        The learned Q-values are kept.
        """
        self.mode = "train"
        self.transitions = []
        self.last_score = 0
        self.no_train_step = 0

    def test(self) -> None:
        """Switch to test mode (random actions, no learning)."""
        self.mode = "test"

    @property
    def select_additional_infos(self) -> "EnvInfos":
        """
        Returns what additional information should be made available at each game step.

        Requested information will be included within the `infos` dictionary
        passed to `CustomAgent.act()`. To request specific information, create a
        :py:class:`textworld.EnvInfos <textworld.envs.wrappers.filter.EnvInfos>`
        and set the appropriate attributes to `True`. The possible choices are:

        * `description`: text description of the current room, i.e. output of the `look` command;
        * `inventory`: text listing of the player's inventory, i.e. output of the `inventory` command;
        * `max_score`: maximum reachable score of the game;
        * `objective`: objective of the game described in text;
        * `entities`: names of all entities in the game;
        * `verbs`: verbs understood by the game;
        * `command_templates`: templates for commands understood by the game;
        * `admissible_commands`: all commands relevant to the current state;

        In addition to the standard information, game specific information
        can be requested by appending corresponding strings to the `extras`
        attribute. For this competition, the possible extras are:

        * `'recipe'`: description of the cookbook;
        * `'walkthrough'`: one possible solution to the game (not guaranteed to be optimal);

        Example:
            Here is an example of how to request information and retrieve it.

            >>> from textworld import EnvInfos
            >>> request_infos = EnvInfos(description=True, inventory=True, extras=["recipe"])
            ...
            >>> env = gym.make(env_id)
            >>> ob, infos = env.reset()
            >>> print(infos["description"])
            >>> print(infos["inventory"])
            >>> print(infos["extra.recipe"])

            Handicap is defined as follows
                max_score, has_won, has_lost,               # Handicap 0
                description, inventory, verbs, objective,   # Handicap 1
                command_templates,                          # Handicap 2
                entities,                                   # Handicap 3
                extras=["recipe"],                          # Handicap 4
                admissible_commands,                        # Handicap 5
        """
        return EnvInfos(description=True, inventory=True, max_score=True,
                        admissible_commands=True, has_won=True, has_lost=True)

    def _init(self) -> None:
        """ Initialize the agent (run once, before the first episode). """
        self._initialized = True

    def _get_word_id(self, word):
        """Return the id of `word`, registering the word first if it is unknown.

        A new word gets the next free index of `id2word`/`qvalues`. Using the
        list length (rather than the size of `word2id`) keeps the id valid even
        when the vocabulary file contains duplicate lines, in which case
        `word2id` has fewer entries than `id2word`.
        """
        if word not in self.word2id:
            self.word2id[word] = len(self.id2word)
            self.id2word.append(word)
            self.qvalues.append(0)
        return self.word2id[word]

    def _get_word_by_id(self, id):
        """Return the word stored under `id`, or None if the id does not exist."""
        if 0 <= id < len(self.id2word):
            return self.id2word[id]
        return None

    def _get_qvalue(self, id):
        """Return the Q-value stored under `id`, or None if the id does not exist."""
        if 0 <= id < len(self.qvalues):
            return self.qvalues[id]
        return None

    def _tokenize(self, text):
        """Convert `text` into a list of word ids.

        Every character that is not a letter, a digit, a hyphen or a space is
        replaced by a space before splitting on whitespace. Unknown words are
        added to the vocabulary as a side effect.
        """
        text = re.sub(r"[^a-zA-Z0-9\- ]", " ", text)
        return [self._get_word_id(token) for token in text.split()]

    def _start_episode(self, obs: str, infos: Dict[str, Any]) -> None:
        """
        Prepare the agent for the upcoming episode.
        Arguments:
            obs: Initial feedback of the game.
            infos: Additional information about the game.
        """
        if not self._initialized:
            self._init()

        self._epsiode_has_started = True

    def _end_episode(self, obs: str, scores: int, infos: Dict[str, Any]) -> None:
        """
        Tell the agent the episode has terminated.
        Arguments:
            obs: Previous command's feedback.
            scores: The score obtained so far.
            infos: Additional information about the game.
        """
        self._epsiode_has_started = False

    def update_qvalues(self, command: List[int], reward):
        """Add `reward` to the Q-value of every token id in `command`.

        A token id that occurs several times in `command` is updated once per
        occurrence.
        """
        for id in command:
            self.qvalues[id] += reward

    def calc_avg_qvalue(self, command) -> float:
        """Return the mean Q-value of the token ids in `command`.

        An empty command (for example one made only of stripped characters)
        scores 0.0 instead of raising ZeroDivisionError.
        """
        if not command:
            return 0.0
        total = 0
        for id in command:
            total += self._get_qvalue(id)
        return total / len(command)

    def choose_best_cmd(self, commands) -> int:
        """Return the index of the best-scoring command; ties are broken at random.

        Arguments:
            commands: Tokenized commands (lists of word ids).

        Raises:
            ValueError: If `commands` is empty.
        """
        if not commands:
            raise ValueError("choose_best_cmd() needs at least one command")
        avg_qvalues = [self.calc_avg_qvalue(cmd) for cmd in commands]
        max_value = max(avg_qvalues)
        max_commands = [i for i, value in enumerate(avg_qvalues)
                        if value == max_value]
        return random.choice(max_commands)

    def act(self, obs: str, score: int, done: bool,
            infos: Dict[str, Any]) -> Optional[str]:
        """
        Acts upon the current observation.

        Arguments:
            obs: Previous command's feedback.
            score: The score obtained so far.
            done: Whether the game is finished.
            infos: Additional information about the game. Must contain
                "admissible_commands"; in training mode "has_won" and
                "has_lost" are read as well.

        Returns:
            The text command to perform, or None when `done` is true (the
            caller ignores the value returned for a finished game).

        Notes:
            In training mode the call made with `done=True` still credits the
            last played command with the final reward before the per-episode
            state is reset.
        """
        if not self._epsiode_has_started:
            self._start_episode(obs, infos)

        if self.mode == "test":
            # NOTE(review): test mode plays random commands and never uses the
            # learned Q-values; confirm this baseline behaviour is intended.
            if done:
                self._end_episode(obs, score, infos)
                return None
            return random.choice(infos["admissible_commands"])

        self.no_train_step += 1
        reward = 0

        if self.transitions:
            # Credit the previously played command with the score change it
            # caused, plus the win/loss bonus.
            reward = score - self.last_score
            self.last_score = score
            if infos["has_won"]:
                reward += self.WIN_BONUS
            if infos["has_lost"]:
                reward -= self.LOSS_PENALTY
            self.update_qvalues(self.transitions[-1], reward)

        if done:
            # Terminal call: nothing left to choose, just reset episode state.
            self.last_score = 0
            self.transitions = []
            self._end_episode(obs, score, infos)
            return None

        admissible = infos["admissible_commands"]
        cmds = [self._tokenize(command) for command in admissible]
        best_index = self.choose_best_cmd(cmds)
        action = admissible[best_index]
        self.transitions.append(cmds[best_index])

        if self.no_train_step % self.LOG_EVERY == 0:
            # Periodic debug trace: step counter, last reward, chosen command.
            print(self.no_train_step)
            print(reward)
            print("".join(" " + self._get_word_by_id(id)
                          for id in cmds[best_index]))

        return action

In [72]:
# Sanity-check the vocabulary lookups on a short command.
agent = CustomAgent()
wordids = agent._tokenize("cook a meal")
print(wordids)

# Round-trip the id that was actually assigned to "cook" instead of
# hard-coding a number that is only valid for one particular vocab.txt.
cook_id = agent._get_word_id("cook")
print(cook_id)
print(agent._get_word_by_id(cook_id))
assert wordids[0] == cook_id
assert agent._get_word_by_id(cook_id) == "cook"

[4664, 785, 11449]
4664
cook


In [73]:
# Train a fresh agent: five calls to play() on the same game file, each using
# play()'s default episode count and step limit.
agent = CustomAgent()
agent.train()  # Switch to training mode so that act() updates the Q-values.
for i in range(5):
    play(agent, gamefiles[5])

tw-cooking-recipe1+take1-11Oeig8bSVdGSp78.ulx10
0
 examine red hot pepper
20
0
 drop red hot pepper
.30
0
 examine counter
40
0
 examine counter
50
0
 examine counter
60
0
 examine counter
70
0
 examine counter
.80
0
 examine counter
90
0
 examine counter
100
0
 examine counter
110
0
 examine counter
120
0
 examine counter
.130
0
 take knife from counter
140
0
 examine counter
150
0
 examine counter
160
0
 examine counter
170
0
 examine counter
180
0
 examine counter
.190
0
 examine counter
200
0
 examine counter
210
0
 examine counter
220
0
 examine counter
230
0
 examine counter
.240
0
 examine counter
250
0
 examine counter
260
0
 examine counter
270
0
 examine counter
280
0
 examine counter
.290
0
 examine counter
300
0
 examine counter
310
0
 examine counter
320
0
 examine counter
330
0
 examine counter
.340
0
 examine counter
350
0
 examine counter
360
0
 examine counter
370
0
 examine counter
380
0
 examine counter
.390
0
 examine counter
400
0
 examine counter
410
0
 examine co

KeyboardInterrupt: 

In [23]:
# List every vocabulary word whose Q-value is no longer zero.
for i, qvalue in enumerate(agent.qvalues):
    if qvalue != 0:
        print(agent._get_word_by_id(i))

In [7]:
# Time a 500-episode training run on the first game file.
print("Training")
agent.train()  # Put the agent into training mode.
starttime = time()
play(agent, gamefiles[0], nb_episodes=500, verbose=False)
elapsed_secs = time() - starttime
print("Trained in {:.2f} secs".format(elapsed_secs))

Training


KeyboardInterrupt: 

In [8]:
def play_render(agent, gamefile):
    """Play one episode of `gamefile` with `agent`, rendering every step.

    Arguments:
        agent: Object exposing `select_additional_infos` and
            `act(obs, score, done, infos)`. The agent passed in is the one
            that plays; it is not replaced by a fresh instance.
        gamefile: Path of the game file to register and play.
    """
    requested_infos = agent.select_additional_infos
    env_id = textworld.gym.register_games([gamefile], requested_infos)

    env = gym.make(env_id)
    try:
        obs, infos = env.reset()

        env.render()  # Print the initial observation.

        score = 0
        done = False
        while not done:
            command = agent.act(obs, score, done, infos)
            # Rebind `obs` so the next act() call sees the latest feedback
            # rather than the initial observation.
            obs, score, done, infos = env.step(command)
            env.render()
    finally:
        # Release the environment even if the episode is interrupted.
        env.close()

In [10]:
# Render one game with a freshly constructed agent. A new CustomAgent starts in
# "test" mode, where act() returns a random admissible command.
play_render(CustomAgent(), gamefiles[5])




                    ________  ________  __    __  ________
                   |        \|        \|  \  |  \|        \
                    \$$$$$$$$| $$$$$$$$| $$  | $$ \$$$$$$$$
                      | $$   | $$__     \$$\/  $$   | $$
                      | $$   | $$  \     >$$  $$    | $$
                      | $$   | $$$$$    /  $$$$\    | $$
                      | $$   | $$_____ |  $$ \$$\   | $$
                      | $$   | $$     \| $$  | $$   | $$
                       \$$    \$$$$$$$$ \$$   \$$    \$$
              __       __   ______   _______   __        _______
             |  \  _  |  \ /      \ |       \ |  \      |       \
             | $$ / \ | $$|  $$$$$$\| $$$$$$$\| $$      | $$$$$$$\
             | $$/  $\| $$| $$  | $$| $$__| $$| $$      | $$  | $$
             | $$  $$$\ $$| $$  | $$| $$    $$| $$      | $$  | $$
             | $$ $$\$$\$$| $$  | $$| $$$$$$$\| $$      | $$  | $$
             | $$$$  \$$$$| $$__/ $$| $$  | $$| $$_____ | $$__/ $$
          