In [45]:
import numpy as np
import json
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
import random
import importlib
import game
importlib.reload(game)

<module 'game' from '/home/paul/Documents/ETH/RLforGames/WearhouseKeeper/SokobanSolver/game.py'>

In [46]:
class ValueNetwork(nn.Module):
    def __init__(self, input_size=5, output_size=1):
        super(ValueNetwork, self).__init__()
        self.linear1 = nn.Linear(input_size, 512)
        self.linear2 = nn.Linear(512, 512)
        self.linear3 = nn.Linear(512, output_size)

    def forward(self, state):
        state = self.linear1(state)
        state = self.linear2(state)
        state = self.linear3(state)
        return state


In [55]:
class BackwardAgent():
    def __init__(self):
        self.value_network = ValueNetwork()
        self.optimizer = optim.SGD(self.value_network.parameters(), lr=0.01)
        self.loss = nn.MSELoss()
        self.gamma = 0.9
        self.alpha = 0.1
        self.discount = 0.98
        self.eps = 0.1
    
    def policy(self, env):
        legal_moves = env.legal_moves()
        if len(legal_moves) == 0: # Agent is stuck and can't move
            return [None, None, None, None, None, None]
        
        value_moves = []
        for char in legal_moves:
            state, reward, done = env.step(char, gamma=self.gamma)
            state_tensor = torch.tensor(state)
            value_moves.append([char, state, state_tensor, self.value_network(state_tensor), reward, done])
        return max(value_moves, key=lambda x:x[3])
        
    def train(self, number_of_episodes, start, end):
        wins=0
        moves = []
        for episode in tqdm(range(number_of_episodes)):
            # create new game instance
            ind = random.randint(start, end)
            env = game.ReverseGame(game.Game(level_id=ind), disable_prints=True)
            if episode % 100 == 0:
                moves.append(env.player_position)
            state, reward, done = env.state(gamma=self.gamma)
            # print(f"state: {state}, reward: {reward}, done: {done}")
            
            # do training on game instance
            while not done:
                # compute value of current state
                state_tensor = torch.tensor(state)
                value = self.value_network(state_tensor)
                
                # calculate next action according to policy
                [action, next_state, next_state_tensor, next_value, next_reward, next_done] = self.policy(env)
                if episode % 100 == 0:
                    moves.append(action)
                # print(f"reward from step: {next_reward}")
                
                if action is None: # agent is stuck, we are in the same state as before and use these to make the TD(0) updates
                    assert(next_value is None and next_done is None and next_reward is None and next_state_tensor is None and next_state is None)
                    next_value = value
                    next_state_tensor = state
                    next_done = 1
                    next_reward = reward
                    
                                    
                # TD(0) update: V(s)←V(s)+α(R+γV(s')−V(s))
                target = value + self.alpha*(next_reward + self.gamma*next_value.detach()*(1-int(next_done)) - value) # using detach here to exclude prediction from computation graph as is standard in TD(0)
                
                # calculate loss
                loss = self.loss(value, target.detach())
                
                # optimize
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # make random move in environment
                if np.random.rand() < self.eps:
                    legal_moves = env.legal_moves()
                    if len(legal_moves) == 0: 
                        break
                    else:
                        action = random.choice(legal_moves)
                        if episode % 100 == 0:
                            moves[-1] = action
                        state, reward, done = env.play(action, gamma=self.gamma)
                else:
                    state = next_state
                    done = next_done
                    reward = next_reward
                    if(action != None): # make the game step if there is a possible move, if there is no possible move, game will end
                        s, r, d = env.play(action, gamma=self.gamma)
                        if not (state == s and done == d and reward == r):
                            env.disable_prints=False
                            env.print_board()
                            print(env.turn)
                            print(f"print form top: {state}, {reward}, {done}")
                            print(f"print form bot: {s}, {r}, {d}")
                        assert(state == s and done == d and reward == r)
                wins+=reward
            
            # update learning rate
            self.alpha = self.alpha * self.discount
        print(f"solved {wins}/{number_of_episodes}")
        file_path = "run.json"
        json_data = json.dumps(moves)
        with open(file_path, "w") as file:
            file.write(json_data)

    

In [57]:
# level 0 works now
torch.manual_seed(40)
random.seed(40)
np.random.seed(40)
backwardagent = BackwardAgent()
backwardagent.train(number_of_episodes=300, start=3, end=3)

100%|██████████| 300/300 [00:19<00:00, 15.06it/s]

solved 0/300



