In [1]:
def read_file(path: str):
    def parse_data(line: str) -> list[(int, int)]:
        data = list(map(int, line.split()))
        coordinates = []
        for i in range(data[0]):
            coordinates.append((data[i*2 + 1]-1, data[(i+1)*2]-1))
        return coordinates
    
    with open(path, 'r') as f:
        board_size = tuple(map(int, f.readline().split()))
        walls = parse_data(f.readline())
        boxes = parse_data(f.readline())
        storages = parse_data(f.readline())
        player = tuple(map(lambda x: int(x)-1, f.readline().split()))
        
    return board_size, walls, boxes, storages, player

def visualize(data):
    board_size, walls, boxes, storages, player = data
    board = [[' '] * board_size[1] for _ in range(board_size[0])]
    for r, c in walls:
        board[r][c] = '#'
    for r, c in boxes:
        board[r][c] = '$'
    for r, c in storages:
        board[r][c] = '.'
    board[player[0]][player[1]] = '@'
    
    for r in board:
        print(''.join(r))
    
data = read_file('sokoban01.txt')
visualize(data)

########
#. #   #
#  $   #
#   # ##
## # $.#
#   $  #
#  .# @#
########


In [2]:
import collections
import random
import numpy as np
from copy import deepcopy

class Reward:
    STEP = -0.1
    BOX_ON_STORAGE = 1.0
    BOX_OFF_STORAGE = -1.0
    ALL_ON_STORAGE = 10.0
    DEADLOCK = -10.0
    MOVE_TO_WALL = -1.0

class Obj:
    EMPTY = 0
    STORAGE = 1
    PLAYER = 2
    PLAYER_ON_STORAGE = 3
    WALL = 4
    BOX = 5
    BOX_ON_STORAGE = 6

class QLearning:
    def __init__(self, board_size, walls, boxes, storages, player, max_step, eps=1.0):
        self.Q = collections.defaultdict(lambda:np.zeros(4))
        self.eps = eps
        self.num_step = 0
        self.max_step = max_step
        self.board_size = board_size
        self.walls = frozenset(walls)
        self.storages = frozenset(storages)
        self.state = (frozenset(boxes), player)
        self.init_state = (frozenset(boxes), player)
        self.board = [[0] * self.board_size[1] for _ in range(self.board_size[0])]
        for box in boxes:
            self.board[box[0]][box[1]] += Obj.BOX
        self.board[player[0]][player[1]] += Obj.PLAYER
        for wall in walls:
            self.board[wall[0]][wall[1]] += Obj.WALL
        for storage in storages:
            self.board[storage[0]][storage[1]] += Obj.STORAGE
        
        self.move = {
            0: (0, -1), # Left
            1: (-1, 0), # Up
            2: (0, 1),   # Right
            3: (1, 0),  # Down
        }
        
        self.direction = {
            0: "L",
            1: "U",
            2: "R",
            3: "D"
        }
        
    def isDeadlock(self):
        for box in self.state[0]:
            if (self.board[box[0]][box[1]] != Obj.BOX_ON_STORAGE and
                (self.board[box[0]+1][box[1]] >= Obj.WALL and 
                self.board[box[0]][box[1]+1] >= Obj.WALL or
                self.board[box[0]][box[1]+1] >= Obj.WALL and 
                self.board[box[0]-1][box[1]] >= Obj.WALL or
                self.board[box[0]-1][box[1]] >= Obj.WALL and 
                self.board[box[0]][box[1]-1] >= Obj.WALL or
                self.board[box[0]][box[1]-1] >= Obj.WALL and 
                self.board[box[0]+1][box[1]] >= Obj.WALL)):
                return True
        return False
    
    def getNextState(self, action):
        self.num_step += 1
        if self.isDeadlock() or self.isFinished() or self.num_step == self.max_step:
            self.num_step = 0
            self.updateState(self.init_state)
            
        reward = Reward.STEP
        ahead = np.sum([self.state[1], self.move[action]], axis=0)
        next_state = self.state
        
        if self.board[ahead[0]][ahead[1]] <= Obj.STORAGE:
            # Ahead is empty or storage. Move the player.
            next_state = (self.state[0], tuple(ahead))
        elif self.board[ahead[0]][ahead[1]] >= Obj.BOX:
            # Box or Box on storage. Check whether pushable.
            double_ahead = np.sum([ahead, self.move[action]], axis=0)
            
            if self.board[double_ahead[0]][double_ahead[1]] <= Obj.STORAGE:
                # Double ahead is empty or empty storage. Push the box.

                # Add reward for the box.
                if self.board[ahead[0]][ahead[1]] == Obj.BOX_ON_STORAGE:
                    reward += Reward.BOX_OFF_STORAGE
                if self.board[double_ahead[0]][double_ahead[1]] == Obj.STORAGE:
                    reward += Reward.BOX_ON_STORAGE
                    if len(self.state[0] - self.storages) == 1:
                        reward += Reward.ALL_ON_STORAGE
                        
                elif (self.board[double_ahead[0]+1][double_ahead[1]] == Obj.WALL and 
                    self.board[double_ahead[0]][double_ahead[1]+1] == Obj.WALL or
                    self.board[double_ahead[0]][double_ahead[1]+1] == Obj.WALL and 
                    self.board[double_ahead[0]-1][double_ahead[1]] == Obj.WALL or
                    self.board[double_ahead[0]-1][double_ahead[1]] == Obj.WALL and 
                    self.board[double_ahead[0]][double_ahead[1]-1] == Obj.WALL or
                    self.board[double_ahead[0]][double_ahead[1]-1] == Obj.WALL and 
                    self.board[double_ahead[0]+1][double_ahead[1]] == Obj.WALL):
                    reward += Reward.DEADLOCK
                
                # Move the player.
                next_player = tuple(ahead)
                # Update box list
                boxes = list(self.state[0])
                boxes.remove(tuple(ahead))
                boxes.append(tuple(double_ahead))
                next_boxes = frozenset(boxes)
                
                next_state = (next_boxes, next_player)
            else:
                # Can't move because of the wall
                reward += Reward.MOVE_TO_WALL
        else:
            # Can't move because of the wall
            reward += Reward.MOVE_TO_WALL
        
        return next_state, reward
    
    def updateState(self, next_state):
        boxes, player = self.state
        for box in boxes:
            self.board[box[0]][box[1]] -= Obj.BOX
        self.board[player[0]][player[1]] -= Obj.PLAYER
        
        self.state = next_state
        boxes, player = self.state
        for box in boxes:
            self.board[box[0]][box[1]] += Obj.BOX
        self.board[player[0]][player[1]] += Obj.PLAYER

    def getAction(self):
        if np.random.random_sample() < self.eps:
            # Random move
            action = np.random.randint(0, 4)
        else:
            # Pick the best action
            action = np.argmax(self.Q[self.state])
        return action
    
    def isFinished(self):
        if self.state[0] == self.storages:
            self.eps *= 0.9
            return True
        else:
            return False
    
    def visualize(self):
        symbols = {
            0: ' ',
            1: '.',
            2: '#',
            3: '@',
            4: '@',
            5: '$',
            6: '$'
        }
        for r in self.board:
            print(''.join(map(lambda x:symbols[x], r)))
        print()

In [3]:
def solve(input_file, visualize=True):
    alpha = 1.0
    gamma = 1.0
    max_iter = 20000
    eps = 0.5

    board_size, walls, boxes, storages, player = read_file(input_file)
    max_step = board_size[0]*board_size[1]
    solver = QLearning(board_size, walls, boxes, storages, player, max_step, eps)

    from tqdm import tqdm
    for i in tqdm(range(max_iter)):
        action = solver.getAction()
        next_state, reward = solver.getNextState(action)
        solver.Q[solver.state][action] = solver.Q[solver.state][action] + alpha*(reward + gamma*np.amax(solver.Q[next_state]) - solver.Q[solver.state][action])
        solver.updateState(next_state)

    print(f"Finished {max_iter} updates")
    print(f"Total explored # of states: {len(solver.Q)}")
    
    # Inference
    solver.updateState(solver.init_state)
    solver.eps = 0.0
    moves = []
    for i in range(solver.max_step):
        action = solver.getAction()
        moves.append(solver.direction[action])
        next_state, reward = solver.getNextState(action)
        if visualize:
            print(f"best action: {solver.direction[action]}")
            print(solver.Q[solver.state])
            solver.updateState(next_state)
            solver.visualize()
        else:
            solver.updateState(next_state)
        if solver.isFinished():
            break
    print(f"{len(moves)} {''.join(moves)}")

In [4]:
solve('sokoban00.txt')

100%|█████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 30485.18it/s]

Finished 20000 updates
Total explored # of states: 2
best action: D
[ 9.8  9.8  9.8 10.9]
@@@
@ @
@#@
@$@
@@@

1 D





In [5]:
solve('sokoban01.txt')

100%|█████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 30381.18it/s]

Finished 20000 updates
Total explored # of states: 839
best action: L
[ 1.08000000e+01  1.38777878e-16  9.70000000e+00 -1.10000000e+00]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @ $.@
@   $  @
@  .@# @
@@@@@@@@

best action: U
[-1.  10.9 -0.1 -1. ]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @ $.@
@   $# @
@  .@  @
@@@@@@@@

best action: L
[ 1.10000000e+01 -6.00000000e-01  1.38777878e-16  1.38777878e-16]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @ $.@
@  $#  @
@  .@  @
@@@@@@@@

best action: L
[11.1  0.3  0.1 -0.8]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @ $.@
@ $#   @
@  .@  @
@@@@@@@@

best action: R
[-10.1  -0.7  11.2   0.2]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @ $.@
@ $ #  @
@  .@  @
@@@@@@@@

best action: U
[ 0.2 11.3  0.3 10.2]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @#$.@
@ $    @
@  .@  @
@@@@@@@@

best action: R
[10.3 -0.5 11.4  0.4]
@@@@@@@@
@. @   @
@  $   @
@   @ @@
@@ @ #$@
@ $    @
@  .@  @
@@@@@@@@

best action: U
[-0.5 10.5  9.4 -0.5]
@@@@@@@@
@. @   @
@  $   @
@   @#@@
@@ @  $@
@


