<a href="https://colab.research.google.com/github/jonathanjander/Best-README-Template/blob/master/textworld.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Textworld example

## notes
- [official pytorch example](https://colab.research.google.com/github/microsoft/TextWorld/blob/main/notebooks/Building%20a%20simple%20agent.ipynb#scrollTo=ChrM9GGGlrtf)

In [1]:
!pip install textworld
#!pip install gym
#!pip install gym==0.21

  Building wheel for jericho (setup.py) ... [?25l[?25hdone
  Created wheel for jericho: filename=jericho-3.1.2-py3-none-any.whl size=325097 sha256=44944d71ba796eebe4835670c53ed21560a982b5dac6e7886b5f1f1210a4c55b
  Stored in directory: /root/.cache/pip/wheels/6b/1d/a7/91e11767b583fe77fae27d292e724d0dc8cd4335dab886adfe
Successfully built textworld jericho
Installing collected packages: mementos, tatsu, hashids, jericho, textworld
Successfully installed hashids-1.3.1 jericho-3.1.2 mementos-1.3.1 tatsu-5.8.3 textworld-1.6.1


In [2]:
import numpy as np
from glob import glob
import os
import re
from typing import List, Mapping, Any, Optional
from collections import defaultdict, Counter
import numpy as np
import textworld
import textworld.gym
from textworld import EnvInfos
from time import time


import matplotlib.pyplot as plt
import random

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.text import Tokenizer

## init

In [3]:
#!tw-make custom --world-size 2 --quest-length 4 --nb-objects 10 --output tw_games/game.ulx -f -v --seed 123
!tw-make tw-simple --rewards dense  --goal detailed --seed 42 --output games/tw-rewardsDense_goalDetailed.z8 -v -f

Global seed: 42
Game generated: /content/games/tw-rewardsDense_goalDetailed.z8

Objective:
I hope you're ready to go into rooms and interact with objects, because you've just entered TextWorld! Here is how to play! First stop, open the chest drawer. After that, recover the old key from the chest drawer within the bedroom. Then, check that the wooden door is unlocked with the old key. And then, open the wooden door inside the bedroom. Then, go to the east. And then, look and see that the screen door in the kitchen is ajar. After that, make an effort to venture east. And then, attempt to move south. With that accomplished, lift the bell pepper from the floor of the garden. After that, move north. Okay, and then, attempt to take a trip west. Following that, rest the bell pepper on the stove. And once you've done that, you win!

Walkthrough:
open chest drawer > take old key from chest drawer > unlock wooden door with old key > open wooden door > go east > open screen door > go east > go so

In [None]:
request_infos = textworld.EnvInfos(
    admissible_commands=True,  # All commands relevant to the current state.
    entities=True,             # List of all interactable entities found in the game.
    facts=True,  # All the facts that are currently true about the world.
    intermediate_reward=True,
    max_score = True,
    inventory=True,
    description=True,
    command_templates = True
)

In [None]:
# Requesting additional information should be done when registering the game.
#env_id = textworld.gym.register_game('tw_games/game.ulx', request_infos)
env_id = textworld.gym.register_game("./games/tw-rewardsDense_goalDetailed.z8", request_infos)
env = textworld.gym.make(env_id)

obs, infos = env.reset()
print("Entities: {}\n".format(infos["entities"]))
print("Admissible commands:\n  {}".format("\n  ".join(infos["admissible_commands"])))
#print("command_templates:\n  {}".format("\n  ".join(infos["command_templates"])))

Entities: ['wooden door', 'screen door', 'chest drawer', 'antique trunk', 'refrigerator', 'toilet', 'bath', 'lettuce', 'bell pepper', 'apple', 'shovel', 'king-size bed', 'counter', 'set of chairs', 'stove', 'kitchen island', 'sink', 'couch', 'low table', 'tv', 'bbq', 'patio table', 'tomato plant', 'half of a bag of chips', 'milk', 'old key', 'soap bar', 'toothbrush', 'remote', 'note', 'north', 'south', 'east', 'west']

Admissible commands:
  examine antique trunk
  examine chest drawer
  examine king-size bed
  examine wooden door
  inventory
  look
  open antique trunk
  open chest drawer


In [None]:
action_space = len(infos["admissible_commands"])
state_space = 20 # number of rooms times number of items 2*10 (dont think this is correct)

### playing the game

In [None]:
try:
    done = False
    obs, _ = env.reset()
    print(obs)

    print(infos["admissible_commands"])
    nb_moves = 0
    while not done:
        command = input("> ")
        obs, score, done, infos = env.step(command)
        print(obs)
        print('Score',score)
        print(infos["admissible_commands"])
        nb_moves += 1

except KeyboardInterrupt:
    pass  # Press the stop button in the toolbar to quit the game.

print("Played {} steps, scoring {} points.".format(nb_moves, score))

# trying to implement the example code to tensorflow (my implementation)
- using a GRU
- using embedding

### main code

In [None]:
# building a random baseline
class RandomAgent(textworld.gym.Agent):
    """ Agent that randomly selects a command from the admissible ones. """
    def __init__(self, seed=1234):
        self.seed = seed
        self.rng = np.random.RandomState(self.seed)

    @property
    def infos_to_request(self) -> textworld.EnvInfos:
        return textworld.EnvInfos(admissible_commands=True)

    def act(self, obs: str, score: int, done: bool, infos: Mapping[str, Any]) -> str:
        return self.rng.choice(infos["admissible_commands"])

In [4]:
# play function
def play(agent, path, max_step=100, nb_episodes=10, verbose=True):
    # torch.manual_seed(20211021)  # For reproducibility when using action sampling.

    infos_to_request = agent.infos_to_request
    infos_to_request.max_score = True  # Needed to normalize the scores.

    gamefiles = [path]
    if os.path.isdir(path):
        gamefiles = glob(os.path.join(path, "*.z8"))

    env_id = textworld.gym.register_games(gamefiles,
                                          request_infos=infos_to_request,
                                          max_episode_steps=max_step)
    env = textworld.gym.make(env_id)  # Create a Gym environment to play the text game.
    if verbose:
        if os.path.isdir(path):
            print(os.path.dirname(path), end="")
        else:
            print(os.path.basename(path), end="")

    # Collect some statistics: nb_steps, final reward.
    avg_moves, avg_scores, avg_norm_scores = [], [], []
    for no_episode in range(nb_episodes):
        obs, infos = env.reset()  # Start new episode.

        score = 0
        done = False
        nb_moves = 0
        while not done:
            command = agent.act(obs, score, done, infos, no_episode) # CHANGE FOR TF
            #command = agent.act(obs, done, infos)
            obs, score, done, infos = env.step(command)
            nb_moves += 1

        agent.train_model()
        agent.act(obs, score, done, infos, no_episode)  # Let the agent know the game is done. CHANGE FOR TF
        # agent.act(obs, done, infos)  # Let the agent know the game is done. CHANGE FOR TF


        if verbose:
            print(".", end="")
        avg_moves.append(nb_moves)
        avg_scores.append(score)
        avg_norm_scores.append(score / infos["max_score"])

    env.close()
    if verbose:
        if os.path.isdir(path):
            msg = "  \tavg. steps: {:5.1f}; avg. normalized score: {:4.1f} / {}."
            print(msg.format(np.mean(avg_moves), np.mean(avg_norm_scores), 1))
        else:
            msg = "  \tavg. steps: {:5.1f}; avg. score: {:4.1f} / {}."
            print(msg.format(np.mean(avg_moves), np.mean(avg_scores), infos["max_score"]))

In [None]:
#agent = RandomAgent()
#play(agent, 'tw_games/game.ulx')

In [5]:
# TENSORFLOW
# TODO INCREASE PERFORMANCE
class CommandScorer(models.Model):
    def __init__(self, input_size, hidden_size):
        super(CommandScorer, self).__init__()
        self.embedding = layers.Embedding(input_size, hidden_size)
        self.encoder_gru = layers.GRU(hidden_size, return_sequences=True, return_state=True)
        self.cmd_encoder_gru = layers.GRU(hidden_size, return_state=True)
        self.state_gru = layers.GRU(hidden_size, return_state=True)
        self.critic = layers.Dense(1) # Critic for state value estimation
        self.att_cmd = layers.Dense(1) # Attention mechanism for commands

        self.hidden_size  = hidden_size

        # defining states
        # Initialize hidden states for GRU layers
        self.encoder_state = tf.Variable(tf.zeros([1, hidden_size]), trainable=False)
        self.cmd_encoder_state = tf.Variable(tf.zeros([1, hidden_size]), trainable=False)
        self.state_gru_state = tf.Variable(tf.zeros([1, hidden_size]), trainable=False)

    def call(self, obs, commands):
        # Process observation
        embedded_obs = self.embedding(obs)

        #_, encoder_hidden = self.encoder_gru(embedded_obs) # stateless
        encoder_output, encoder_hidden = self.encoder_gru(embedded_obs, initial_state=self.encoder_state) # stateful
        self.encoder_state.assign(encoder_hidden) # stateful

        # Expand dimensions of encoder_hidden to fit GRU input requirements
        # for critic state value prediction
        encoder_hidden_expanded = tf.expand_dims(encoder_hidden, axis=1)

        #state_output, _ = self.state_gru(encoder_hidden_expanded) # stateless
        # Process state GRU with state handling
        state_output, new_state_gru_state = self.state_gru(encoder_hidden_expanded, initial_state=self.state_gru_state) # stateful
        self.state_gru_state.assign(new_state_gru_state) # stateful

        #critic prediction
        value = self.critic(state_output)

        # Process commands
        cmds_embedding = self.embedding(commands)  # Shape: (num_commands, cmd_length, hidden_size)
        cmd_length = cmds_embedding.shape[1]

        # Reshape for batch processing
        cmds_embedding_reshaped = tf.reshape(cmds_embedding, (-1, cmd_length, self.embedding.output_dim))
        batch_size = tf.shape(cmds_embedding_reshaped)[0]

        # Process each command as a separate sequence (batch)
        _, cmds_encoding_last_states = self.cmd_encoder_gru(cmds_embedding_reshaped, initial_state=tf.zeros((batch_size, self.encoder_gru.units))) # stateless


        # _, cmds_encoding_last_states = self.cmd_encoder_gru(cmds_embedding_reshaped, initial_state=self.cmd_encoder_state) # stateful
        #self.state_gru_state.assign(cmds_encoding_last_states) # stateful

        # Reshape to get separate encodings for each command
        cmds_encoding_last_states = tf.reshape(cmds_encoding_last_states, (1, -1, self.encoder_gru.units))

        # Prepare state representation
        state_hidden_repeated = tf.repeat(encoder_hidden_expanded, repeats=tf.shape(commands)[0], axis=1)

        # Concatenate state and command encodings
        cmd_selector_input = tf.concat([state_hidden_repeated, cmds_encoding_last_states], axis=-1)

        #print('state_hidden_repeated', state_hidden_repeated)
        # Compute scores and select action
        scores = self.att_cmd(cmd_selector_input)
        scores = tf.squeeze(scores, axis=-1)

        #print('scores', scores)

        # Calculate probabilities and sample an action
        probs = tf.nn.softmax(scores, axis=1)
        index = tf.random.categorical(tf.math.log(probs), num_samples=1)

        #print(scores)

        return scores, index, value

    def reset_hidden(self, batch_size):
        # Reset hidden states
        self.encoder_state.assign(tf.zeros([batch_size, self.hidden_size]))
        self.cmd_encoder_state.assign(tf.zeros([batch_size, self.hidden_size]))
        self.state_gru_state.assign(tf.zeros([batch_size, self.hidden_size]))


In [6]:
# TENSORFLOW

class NeuralAgent:
    # ... (Initialization and utility functions remain largely the same)
    """ Simple Neural Agent for playing TextWorld games. """
    MAX_VOCAB_SIZE = 1000
    UPDATE_FREQUENCY = 10
    LOG_FREQUENCY = 1000
    GAMMA = 0.9
    LR = 0.00003

    def __init__(self) -> None:
        self.tokenizer = Tokenizer(num_words=self.MAX_VOCAB_SIZE, oov_token="<UNK>")
        self._initialized = False
        self._epsiode_has_started = False
        self.id2word = ["<PAD>", "<UNK>"]
        self.word2id = {w: i for i, w in enumerate(self.id2word)}
        self.model = CommandScorer(input_size=self.MAX_VOCAB_SIZE, hidden_size=128)
        #self.optimizer = optim.Adam(self.model.parameters(), 0.00003) # CHANGE FOR TF
        self.optimizer = tf.optimizers.Adam(learning_rate=self.LR)
        self.mode = "test"


    def train(self): # CHANGE FOR TF
        self.mode = "train"
        self.stats = {"max": defaultdict(list), "mean": defaultdict(list)}
        self.transitions = []
        self.model.reset_hidden(1)
        self.last_score = 0
        self.no_train_step = 0

    def test(self): # CHANGE FOR TF
        self.mode = "test"
        #self.model.reset_hidden(1)

    def train_model(self):
      return

    @property
    def infos_to_request(self) -> EnvInfos: # WORKING
        return EnvInfos(description=True, inventory=True, admissible_commands=True,
                        won=True, lost=True)

    def _get_word_id(self, word): # WORKING

        #print('GET WORD ID METHOD')
        if word not in self.word2id:
            if len(self.word2id) >= self.MAX_VOCAB_SIZE:
                return self.word2id["<UNK>"]

            self.id2word.append(word)
            self.word2id[word] = len(self.word2id)

        return self.word2id[word]

    def _tokenize(self, text): # WORKING

        #print('TOKENIZE METHOD')
        # Simple tokenizer: strip out all non-alphabetic characters.
        text = re.sub("[^a-zA-Z0-9\- ]", " ", text)
        word_ids = list(map(self._get_word_id, text.split()))
        return word_ids

    def _process(self, texts, tokenizer): # WORKING

        #print('PROCESS METHOD')
        #tokenized_texts = tokenizer.texts_to_sequences(texts)
        texts = list(map(self._tokenize, texts))
        max_len = max(len(l) for l in texts)
        padded = np.ones((len(texts), max_len)) * self.word2id["<PAD>"]

        for i, text in enumerate(texts):
            padded[i, :len(text)] = text
        #padded_texts = tf.keras.preprocessing.sequence.pad_sequences(tokenized_texts, maxlen=max_len)

        # Convert the NumPy array to a TensorFlow tensor
        padded_tensor = tf.convert_to_tensor(padded, dtype=tf.int32)

        # Transpose the tensor to switch from Batch x Sequence to Sequence x Batch
        #padded_tensor = tf.transpose(padded_tensor, perm=[1, 0])

        return padded

    def _compute_advantage(self, last_values): # TF
      returns, advantages = [], []
      R = last_values
      for t in reversed(range(len(self.transitions))):
          rewards, _, _, values = self.transitions[t]
          R = rewards + self.GAMMA * R
          adv = R - values
          returns.append(R)
          advantages.append(adv)

      return returns[::-1], advantages[::-1]


    def _debug_train(self, input_tens, command_tens, infos, tape):
      debug_target = tf.random.normal(shape=[1, len(infos["admissible_commands"])])
      #with tf.GradientTape() as tape:

      # Forward pass: Get model's output for the dummy input
      debug_output, _, _ = self.model(input_tens, command_tens)
      #print('output',debug_output)

      # Simplified loss: MSE between model's output and the arbitrary target
      debug_loss = tf.reduce_mean(tf.square(debug_output - debug_target))


      # Compute gradients
      debug_gradients = tape.gradient(debug_loss, self.model.trainable_variables)

      # Check gradients
      print("Debug Loss:", debug_loss.numpy())
      #print("Debug Gradients:", len(debug_gradients))

      # Apply gradients if they are valid (not None)

      # Print debugging information
      #print("Debug Loss:", debug_loss.numpy())


      # Apply only non-None gradients
      gradients_to_apply = [(grad, var) for grad, var in zip(debug_gradients, self.model.trainable_variables) if grad is not None]

      if gradients_to_apply:
        self.optimizer.apply_gradients(gradients_to_apply)
        #print("Some valid gradients applied.")
      else:
        print("No valid gradients. Check model computations.")


    def _train_loop(self, values, tape):

      returns, advantages = self._compute_advantage(values)

      #with tf.GradientTape() as tape:
      loss = 0
      # actor critic policy gradient using advantage
      # using advantage rather than the raw reward reduces variance
      for transition, ret, adv in zip(self.transitions, returns, advantages):
        #print('trans: ',self.transitions)
        reward, indexes_, outputs_, values_ = transition
        # indexes_ is the index of the action
        # output_ is the logits
        # values_ is the predicted value

        #print('reward', type(reward))
        #print('indexes_', type(indexes_))
        #print('outputs_', type(outputs_))
        #print('values_', type(values_))

        # Calculate policy loss
        probs = tf.nn.softmax(outputs_)
        log_probs = tf.math.log(probs)
        log_action_probs = tf.reduce_sum(tf.one_hot(indexes_, outputs_.shape[-1]) * log_probs)

        # Calculate policy loss
        # this is the ACTOR part
        policy_loss = -log_action_probs * adv

        # Calculate value loss
        # value loss is the CRITIC part of my code
        value_loss = 0.5 * tf.square(ret - values_)

        # Calculate entropy (for exploration)
        # entropy encourages exploration by discouraging the policy to become too deterministic
        entropy = -tf.reduce_sum(probs * log_probs)

        # Accumulate losses
        loss += policy_loss + value_loss - 0.1 * entropy

        # Append metrics to stats
        self.stats["mean"]["reward"].append(reward)
        self.stats["mean"]["policy"].append(policy_loss.numpy())
        self.stats["mean"]["value"].append(value_loss.numpy())
        self.stats["mean"]["entropy"].append(entropy.numpy())

        # For confidence, you need to calculate the exponent of the negative log probability
        # of the selected action. This represents the probability of the selected action.
        #action_probability = tf.exp(-log_action_probs)
        action_probability = tf.exp(log_action_probs)
        self.stats["mean"]["confidence"].append(action_probability.numpy())


      gradients = tape.gradient(loss, self.model.trainable_variables)

      #for var, grad in zip(self.model.trainable_variables, gradients):
      #  if grad is None:
      #    print(f"Variable with None gradient: {var.name}")
      #  if grad is not None:
      #    print(f"Variable with a gradient: {var.name}")
      #print()

      # Apply only non-None gradients
      gradients_to_apply = [(grad, var) for grad, var in zip(gradients, self.model.trainable_variables) if grad is not None]
      self.optimizer.apply_gradients(gradients_to_apply)
      #print('here6')
      # Clear transitions and reset hidden state after each episode
      self.transitions.clear()

      if self.no_train_step % self.LOG_FREQUENCY == 0:
        # Log training information
        msg = "{:6d}. ".format(self.no_train_step)
        msg += "  ".join("{}: {: 3.3f}".format(k, np.mean(v)) for k, v in self.stats["mean"].items())
        msg += "  " + "  ".join("{}: {:2d}".format(k, np.max(v)) for k, v in self.stats["max"].items())
        msg += "  vocab: {:3d}".format(len(self.id2word))
        print(msg)
        self.stats = {"max": defaultdict(list), "mean": defaultdict(list)}





    def get_gradient(self, obs, score, done, infos):
      # Get model's output - scores for each command and value estimation
      # Build agent's observation: feedback + look + inventory.
      input_ = "{}\n{}\n{}".format(obs, infos["description"], infos["inventory"])

      input_tensor = self._process([input_], self.tokenizer)

      commands_tensor = self._process(infos["admissible_commands"], self.tokenizer)

      with tf.GradientTape() as tape:
        outputs, indexes, value = self.model(input_tensor, commands_tensor)

        returns, advantages = self._compute_advantage(value)
        # Training logic
        self.no_train_step += 1

        if self.transitions:
            # Calculate reward
            reward = score - self.last_score
            self.last_score = score
            if infos["won"]:
                reward += 100
            if infos["lost"]:
                reward -= 100
            # Update the last transition with the calculated reward
            self.transitions[-1][0] = reward

        if self.no_train_step % self.UPDATE_FREQUENCY == 0:
          loss = 0
          # actor critic policy gradient using advantage
          # using advantage rather than the raw reward reduces variance
          for transition, ret, adv in zip(self.transitions, returns, advantages):
            #print('trans: ',self.transitions)
            reward, indexes_, outputs_, values_ = transition
            # Calculate policy loss
            probs = tf.nn.softmax(outputs_)
            log_probs = tf.math.log(probs)
            log_action_probs = tf.reduce_sum(tf.one_hot(indexes_, outputs_.shape[-1]) * log_probs)

            # Calculate policy loss
            # this is the ACTOR part
            policy_loss = -log_action_probs * adv

            # Calculate value loss
            # value loss is the CRITIC part of my code
            value_loss = 0.5 * tf.square(ret - values_)

            # Calculate entropy (for exploration)
            # entropy encourages exploration by discouraging the policy to become too deterministic
            entropy = -tf.reduce_sum(probs * log_probs)

            # Accumulate losses
            loss += policy_loss + value_loss - 0.1 * entropy

            # Append metrics to stats
            self.stats["mean"]["reward"].append(reward)
            self.stats["mean"]["policy"].append(policy_loss.numpy())
            self.stats["mean"]["value"].append(value_loss.numpy())
            self.stats["mean"]["entropy"].append(entropy.numpy())

            # For confidence, you need to calculate the exponent of the negative log probability
            # of the selected action. This represents the probability of the selected action.
            action_probability = tf.exp(-log_action_probs)
            self.stats["mean"]["confidence"].append(action_probability.numpy())

            # Clear transitions and reset hidden state after each episode
            self.transitions.clear()
          else:
            # Keep information about transitions for Truncated Backpropagation Through Time.
            # Store transition for training
            self.transitions.append([None, indexes, outputs, value]) # why?
            #print('trans: ',self.transitions)

      gradients = tape.gradient(loss, self.model.trainable_variables)

      #for var, grad in zip(self.model.trainable_variables, gradients):
      #  if grad is None:
      #    print(f"Variable with None gradient: {var.name}")
      #  if grad is not None:
      #    print(f"Variable with a gradient: {var.name}")
      #print()

    def act(self, obs, score, done, infos, no_episode):
        #print('ACT METHOD')
        # Convert observation and commands to model input format

        # Build agent's observation: feedback + look + inventory.
        input_ = "{}\n{}\n{}".format(obs, infos["description"], infos["inventory"])

        input_tensor = self._process([input_], self.tokenizer)

        commands_tensor = self._process(infos["admissible_commands"], self.tokenizer)



        # Get model's output - scores for each command and value estimation
        with tf.GradientTape() as tape:
          outputs, indexes, value = self.model(input_tensor, commands_tensor)

          #tape.watch(outputs)
          #tape.watch(indexes)
          #tape.watch(value)

          chosen_action_index = tf.squeeze(indexes).numpy()

          action = infos["admissible_commands"][chosen_action_index]

          #print(action)
          # test
          if self.mode == "test":
              if done:
                  #print('DONE')
                  self.model.reset_hidden(1)
              return action

          # Training logic
          self.no_train_step += 1

          if self.transitions:
              # Calculate reward
              reward = score - self.last_score
              self.last_score = score
              if infos["won"]:
                  reward += 100
              if infos["lost"]:
                  reward -= 100
              # Update the last transition with the calculated reward
              self.transitions[-1][0] = reward

          #self.transitions.append([indexes, outputs, value]) # why?

          self.stats["max"]["score"].append(score)

          # Perform training at specified frequency

          if self.no_train_step % self.UPDATE_FREQUENCY == 0:

              #print("IF",self.no_train_step)
              self._train_loop(value, tape)
              #self._debug_train(input_tensor, commands_tensor, infos)

          else:
            #print(self.no_train_step)
            # Keep information about transitions for Truncated Backpropagation Through Time.
            # Store transition for training
            self.transitions.append([None, indexes, outputs, value]) # why?
            #print('trans: ',self.transitions)

          if done:
              self.last_score = 0
              #self.model.reset_hidden(1)

          return action


In [7]:
nagent = NeuralAgent()

In [None]:
nagent.train()
play(nagent, 'games/tw-rewardsDense_goalDetailed.z8', max_step=100, nb_episodes=250)

tw-rewardsDense_goalDetailed.z8.........  1000. reward:  0.039  policy:  0.475  value:  0.089  entropy:  2.330  confidence:  0.100  score:  7  vocab: 320
..........  2000. reward:  0.036  policy:  1.276  value:  0.275  entropy:  2.326  confidence:  0.099  score:  6  vocab: 321
..........  3000. reward:  0.041  policy:  11.334  value:  15.313  entropy:  2.325  confidence:  0.100  score:  6  vocab: 326
..........  4000. reward:  0.047  policy:  14.794  value:  22.787  entropy:  2.391  confidence:  0.093  score:  5  vocab: 331
..........  5000. reward:  0.056  policy:  15.551  value:  24.978  entropy:  2.394  confidence:  0.093  score:  9  vocab: 341
..........  6000. reward:  0.048  policy:  15.793  value:  26.341  entropy:  2.371  confidence:  0.096  score:  5  vocab: 341
..........  7000. reward:  0.044  policy:  16.337  value:  27.898  entropy:  2.384  confidence:  0.095  score:  6  vocab: 341
..........  8000. reward:  0.031  policy:  16.054  value:  28.937  entropy:  2.295  confiden

In [None]:
nagent.test()
play(nagent, 'games/tw-rewardsDense_goalDetailed.z8', max_step=100)

In [8]:
# You can skip this if you already downloaded the data in the prequisite section.

from time import time
agent = NeuralAgent()

print("Training")
agent.train()  # Tell the agent it should update its parameters.
starttime = time()
play(agent, "./games/tw-rewardsDense_goalDetailed.z8", nb_episodes=250, verbose=True)  # Dense rewards game.

print("Trained in {:.2f} secs".format(time() - starttime))

# Save the trained agent.
import os
os.makedirs('checkpoints', exist_ok=True)
agent.save(agent, 'checkpoints/agent_trained_on_single_game.pt')

Training
tw-rewardsDense_goalDetailed.z8.....

KeyboardInterrupt: ignored

In [None]:
# We report the score and steps averaged over 10 playthroughs.
agent = tf.keras.models.load_model('checkpoints/agent_trained_on_single_game.pt')

agent.test()
play(agent, "./games/tw-rewardsDense_goalDetailed.z8")  # Dense rewards game.

In [None]:
rangent = RandomAgent()
play(rangent, 'games/tw-rewardsDense_goalDetailed.z8', max_step=50, nb_episodes=50)

### neuralagent2 (not runnable)

In [None]:
# TENSORFLOW

class NeuralAgent2:
    # ... (Initialization and utility functions remain largely the same)
    """ Simple Neural Agent for playing TextWorld games. """
    MAX_VOCAB_SIZE = 1000
    UPDATE_FREQUENCY = 10
    LOG_FREQUENCY = 1000
    GAMMA = 0.9
    LR = 0.00003
    BATCH_SIZE=64

    def __init__(self) -> None:
        self.tokenizer = Tokenizer(num_words=self.MAX_VOCAB_SIZE, oov_token="<UNK>")
        self._initialized = False
        self._epsiode_has_started = False
        self.id2word = ["<PAD>", "<UNK>"]
        self.word2id = {w: i for i, w in enumerate(self.id2word)}

        self.model = CommandScorer(input_size=self.MAX_VOCAB_SIZE, hidden_size=128)
        #self.optimizer = optim.Adam(self.model.parameters(), 0.00003) # CHANGE FOR TF
        self.optimizer = tf.optimizers.Adam(learning_rate=self.LR)

        self.last_score = 0 # TODO REMOVE LATER
        self.transitions = []
        self.mode = "test"



    def train(self): # CHANGE FOR TF
        self.mode = "train"
        self.stats = {"max": defaultdict(list), "mean": defaultdict(list)}
        self.transitions = []
        #self.model.reset_hidden(1)
        self.last_score = 0
        self.no_train_step = 0

    def test(self): # CHANGE FOR TF
        self.mode = "test"
        #self.model.reset_hidden(1)


    @property
    def infos_to_request(self) -> EnvInfos: # WORKING
        return EnvInfos(description=True, inventory=True, admissible_commands=True,
                        won=True, lost=True)

    def _get_word_id(self, word): # WORKING

        #print('GET WORD ID METHOD')
        if word not in self.word2id:
            if len(self.word2id) >= self.MAX_VOCAB_SIZE:
                return self.word2id["<UNK>"]

            self.id2word.append(word)
            self.word2id[word] = len(self.word2id)

        return self.word2id[word]

    def _tokenize(self, text): # WORKING

        #print('TOKENIZE METHOD')
        # Simple tokenizer: strip out all non-alphabetic characters.
        text = re.sub("[^a-zA-Z0-9\- ]", " ", text)
        word_ids = list(map(self._get_word_id, text.split()))
        return word_ids

    def _process(self, texts, tokenizer): # WORKING

        #print('PROCESS METHOD')
        #tokenized_texts = tokenizer.texts_to_sequences(texts)
        texts = list(map(self._tokenize, texts))
        max_len = max(len(l) for l in texts)
        padded = np.ones((len(texts), max_len)) * self.word2id["<PAD>"]

        for i, text in enumerate(texts):
            padded[i, :len(text)] = text
        #padded_texts = tf.keras.preprocessing.sequence.pad_sequences(tokenized_texts, maxlen=max_len)

        # Convert the NumPy array to a TensorFlow tensor
        padded_tensor = tf.convert_to_tensor(padded, dtype=tf.int32)

        # Transpose the tensor to switch from Batch x Sequence to Sequence x Batch
        # padded_tensor = tf.transpose(padded_tensor, perm=[1, 0])

        return padded

    def _compute_advantage(self, last_values): # TF
      returns, advantages = [], []
      R = last_values
      for t in reversed(range(len(self.transitions))):
          rewards, _, _, values = self.transitions[t]
          R = rewards + self.GAMMA * R
          adv = R - values
          returns.append(R)
          advantages.append(adv)

      return returns[::-1], advantages[::-1]

    def train_model(self):

      # Check if there are enough transitions to form a batch
      if len(self.transitions) < self.BATCH_SIZE:
          return

      # Sample a batch of transitions
      batch = random.sample(self.transitions, self.BATCH_SIZE)

      # Separate the data into states, actions, rewards, etc.
      states, commands, rewards, actions, outputs, values = zip(*batch)

      #for state in states:
        #print(state.shape)

      # Convert lists to tensors
      #states = tf.convert_to_tensor(np.array(states), dtype=tf.float32)
      #commands = tf.convert_to_tensor(np.array(commands), dtype=tf.float32)
      rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)
      actions = tf.convert_to_tensor(actions, dtype=tf.int32)
      values = tf.convert_to_tensor(values, dtype=tf.float32)


      with tf.GradientTape() as tape:
          predictions, _, predicted_values = self.model(states, commands)
          loss = self.compute_loss(predictions, rewards, actions, predicted_values)
          # Compute the loss
          # This step will depend on your specific RL algorithm.
          # For example, in a value-based method, you might compute the TD error,
          # while in a policy gradient method, you'd compute the policy gradient loss.
          # Here's a placeholder for loss computation:
          #loss = self.compute_loss(rewards, actions, outputs, values)


      # Compute gradients and apply them
      gradients = tape.gradient(loss, self.model.trainable_variables)
      self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

      # Optionally, you can clear transitions after training
      self.transitions.clear()

    def compute_loss(self, predictions, rewards, actions, predicted_values):
        # Define how you compute your loss here
        # This will depend on your specific reinforcement learning algorithm
        # For example, it could involve calculating the TD error for Q-learning
        # or the policy gradient loss for policy-based methods
        return tf.reduce_mean(tf.square(predicted_values - rewards))  # Placeholder


    def act(self, obs, score, done, infos, no_episode):
        # Convert observation and commands to model input format
        input_ = "{}\n{}\n{}".format(obs, infos["description"], infos["inventory"])
        input_tensor = self._process([input_], self.tokenizer)
        commands_tensor = self._process(infos["admissible_commands"], self.tokenizer)

        # Forward pass through the model to get scores for each command
        outputs, indexes, value = self.model(input_tensor, commands_tensor)

        # Select action based on the model's output
        chosen_action_index = tf.squeeze(indexes).numpy()
        action = infos["admissible_commands"][chosen_action_index]

        # Calculate immediate reward and store transition
        reward = score - self.last_score
        self.last_score = score if not done else 0

        self.transitions.append((input_tensor, commands_tensor, reward, indexes, outputs, value))

        return action

    def act_old(self, obs, score, done, infos, no_episode):
        #print('ACT METHOD')
        # Convert observation and commands to model input format

        # Build agent's observation: feedback + look + inventory.
        input_ = "{}\n{}\n{}".format(obs, infos["description"], infos["inventory"])

        input_tensor = self._process([input_], self.tokenizer)
        commands_tensor = self._process(infos["admissible_commands"], self.tokenizer)
        # Get model's output - scores for each command and value estimation
        outputs, indexes, value = self.model(input_tensor, commands_tensor)
        chosen_action_index = tf.squeeze(indexes).numpy()


        action = infos["admissible_commands"][chosen_action_index]

        #print(action)
        # test
        if self.mode == "test":
            if done:
                print('DONE')
                #self.model.reset_hidden(1)
            return action

        # Training logic
        self.no_train_step += 1

        if self.transitions:
            # Calculate reward
            reward = score - self.last_score
            self.last_score = score
            if infos["won"]:
                reward += 100
            if infos["lost"]:
                reward -= 100
            # Update the last transition with the calculated reward
            self.transitions[-1][0] = reward



        #self.transitions.append([indexes, outputs, value]) # why?

        self.stats["max"]["score"].append(score)

        # Perform training at specified frequency

        if self.no_train_step % self.UPDATE_FREQUENCY == 0:

            #print("IF",self.no_train_step)
            self._train_loop(value)
            #self._debug_train(input_tensor, commands_tensor, infos)

        else:
          #print(self.no_train_step)
          # Keep information about transitions for Truncated Backpropagation Through Time.
          # Store transition for training
          self.transitions.append([None, indexes, outputs, value]) # why?
          #print('trans: ',self.transitions)

        if done:
            self.last_score = 0
            #self.model.reset_hidden(1)

        return action


In [None]:
nagent = NeuralAgent2()
#nagent.train_model()

In [None]:
#nagent = NeuralAgent2()
#nagent.train()
play(nagent, 'games/tw-rewardsDense_goalDetailed.z8', max_step=50, nb_episodes=2)

### gpt-4 attempt using function approximation

In [None]:
class EPSdecay:
    def __init__(self,min_epsilon=0.01,max_epsilon=1.0,decay_rate=0.01):
        self.min_epsilon = min_epsilon
        self.max_epsilon = max_epsilon
        self.decay_rate = decay_rate
    def __call__(self,episode)->float:
        return self.min_epsilon + (self.max_epsilon - self.min_epsilon)*np.exp(-self.decay_rate*episode)

In [None]:
def build_vocabulary(descriptions):
    word_counts = Counter(word for desc in descriptions for word in desc.split())
    vocabulary = {word: i for i, word in enumerate(word_counts.keys())}
    return vocabulary

# Example descriptions from your text-based game
descriptions = [
    "You see a key and a door",
    "You are in a dark room",
    # ... more descriptions from your game
]

vocabulary = build_vocabulary(descriptions)

def preprocess_state(description, vocabulary):
    state_vector = np.zeros(len(vocabulary))
    for word in description.split():
        if word in vocabulary:
            state_vector[vocabulary[word]] += 1
    return state_vector

# Example usage
state_description = "You are in a room with a key"
state_vector = preprocess_state(state_description, vocabulary)


In [None]:
def build_model(input_size, output_size):
    model = models.Sequential([
        layers.Dense(128, activation='relu', input_shape=(input_size,)),
        layers.Dense(64, activation='relu'),
        layers.Dense(output_size)
    ])
    return model


In [None]:
def train_step(model, optimizer, state, action, reward, next_state, done):
    # Predict Q-values for the current state
    with tf.GradientTape() as tape:
        q_values = model(state)
        q_action = tf.reduce_sum(tf.one_hot(action, action_space) * q_values, axis=1)

        # Predict the Q-values for next state
        q_values_next = model(next_state)
        q_next = tf.reduce_max(q_values_next, axis=1)
        #bellman equation
        q_target = reward + (1 - done) * discount_factor * q_next

        # Calculate loss
        loss = tf.reduce_mean(tf.square(q_target - q_action))

    # Backpropagate the error
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))


In [None]:
discount_factor = 0.95
num_episodes = 20
learning_rate = 0.001
#epsilon = 0.5 # use fixed epsilon
epsilon = EPSdecay()

In [None]:
optimizer = tf.optimizers.Adam(learning_rate=learning_rate)
model = build_model(len(state_vector), action_space)

In [None]:
# Collect some statistics: nb_steps, final reward.
avg_moves, avg_scores, avg_norm_scores = [], [], []
for episode in range(num_episodes):
    state, infos = env.reset()
    done = False
    total_rewards = 0
    total_steps = 0
    while not done:
      #print('episode: ', episode)

      state_vector = preprocess_state(state, vocabulary)
      state_vector = np.expand_dims(state_vector, axis=0) # Convert from shape (features,) to (1, features)

      # Select action using epsilon-greedy policy
      if random.uniform(0,1) > epsilon(episode):
        q_values = model.predict(state_vector, verbose=0)[0]
        action_ind = np.argmax(q_values)
        print(q_values)
      else:
        action_ind = random.randrange(len(infos["admissible_commands"]))
        #action = infos["admissible_commands"][action_ind] # random action for now

      action = infos["admissible_commands"][action_ind]
      #print(action)
      next_state, reward, done, infos = env.step(action)
      total_steps += 1
      total_rewards += reward

      # Preprocess states if required
      next_state_vector = preprocess_state(next_state, vocabulary)
      next_state_vector = np.expand_dims(next_state_vector, axis=0) # Convert from shape (features,) to (1, features)

      #print(next_state_vector)
      # Perform training step
      train_step(model, optimizer, state_vector, action_ind, reward, next_state_vector, done)

      state = next_state

      avg_moves.append(total_steps)
      avg_scores.append(reward)
      avg_norm_scores.append(reward / infos["max_score"])
    #print('total steps', total_steps)
    #print('total rewards', total_rewards)
    print('avg moves', np.mean(avg_moves))
    print('avg score', "{:.2f}".format(np.mean(avg_scores)))
    print('avg normalized score', "{:.2f}".format(np.mean(avg_norm_scores)))


avg moves 20.5
avg score 0.03
avg normalized score 0.03
avg moves 23.27777777777778
avg score 0.01
avg normalized score 0.01


KeyboardInterrupt: ignored