In [None]:
# Import dependencies
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import cv2
import os
import random
import time
import math
import pickle
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.optimizers import Adam


In [None]:
class Sokoban():
    """Minimal Sokoban environment loaded from a plain-text level file.

    Level symbols understood by ``load_level``:

    * ``'#'`` wall
    * ``'@'`` box
    * ``'.'`` goal
    * ``'*'`` box standing on a goal
    * ``'+'`` player start, standing on a goal
    * ``' '`` empty floor

    All positions are ``(x, y)`` tuples where ``x`` is the column within a
    line of the level file and ``y`` is the line index.
    """

    # (dx, dy) displacement for every accepted action symbol.
    _MOVES = {'u': (0, -1), 'd': (0, 1), 'l': (-1, 0), 'r': (1, 0)}

    # Smallest viewport drawn by render(); the renderer used to be fixed at
    # 10x10, so boards that fit inside that area are drawn exactly as before.
    _MIN_RENDER_SIZE = 10

    def __init__(self, level):
        """Load the level file at path ``level`` and build the initial state."""
        self.level = level
        self.player = None
        self.walls = []
        self.boxes = []
        self.goals = []
        self.load_level(level)
        self.state = self.get_state()
        self.action_space = ['u', 'd', 'l', 'r']
        self.action_size = len(self.action_space)
        self.observation_space = self.state.shape

    def load_level(self, level):
        """Parse the level file at path ``level`` into walls/boxes/goals/player.

        Raises:
            ValueError: on an unknown symbol, or when the level defines no
                player start.
        """
        with open(level) as f:
            for y, raw_line in enumerate(f):
                # Line terminators are not level symbols; without stripping
                # them every multi-line level is rejected as invalid.
                line = raw_line.rstrip('\r\n')
                for x, char in enumerate(line):
                    if char == '#':
                        self.walls.append((x, y))
                    elif char == '@':
                        self.boxes.append((x, y))
                    elif char == '+':
                        # Player on a goal, matching what render() prints for
                        # '+'.  The player's square must not also hold a box.
                        self.goals.append((x, y))
                        self.player = (x, y)
                    elif char == '.':
                        self.goals.append((x, y))
                    elif char == '*':
                        self.goals.append((x, y))
                        self.boxes.append((x, y))
                    elif char == ' ':
                        pass
                    else:
                        raise ValueError('Invalid character %s at %s' % (char, (x, y)))
        if self.player is None:
            # Fail here with a clear message instead of later with a
            # TypeError when get_state() indexes the missing player.
            raise ValueError('Level %s has no player start' % (level,))

    def get_state(self):
        """Return the board as a float32 array of shape ``(n_entities, 3)``.

        One row per wall, then one per box, then a final row for the player.
        Columns are ``x / 10``, ``y / 10`` and a type code
        (1 = wall, 2 = box, 3 = player).
        """
        rows = [(x / 10, y / 10, 1) for x, y in self.walls]
        rows.extend((x / 10, y / 10, 2) for x, y in self.boxes)
        rows.append((self.player[0] / 10, self.player[1] / 10, 3))
        return np.array(rows, dtype=np.float32)

    def step(self, action):
        """Apply one move and return ``(reward, done)``.

        ``action`` is one of ``'u'``, ``'d'``, ``'l'``, ``'r'``.  A move into a
        wall, or a push of a box into a wall or another box, leaves the board
        unchanged and returns ``(-1, False)``.

        Raises:
            ValueError: if ``action`` is not a known action symbol.
        """
        try:
            dx, dy = self._MOVES[action]
        except (KeyError, TypeError):
            raise ValueError('Invalid action %s' % action) from None

        target = (self.player[0] + dx, self.player[1] + dy)
        if target in self.walls:
            return -1, False
        if target in self.boxes:
            beyond = (target[0] + dx, target[1] + dy)
            if beyond in self.walls or beyond in self.boxes:
                return -1, False
            # Overwrite in place so each box keeps its row in get_state();
            # remove()+append() would shuffle the rows after every push.
            self.boxes[self.boxes.index(target)] = beyond
        self.player = target
        # Keep the cached snapshot in sync with the board.
        self.state = self.get_state()
        return self.get_reward(), self.is_done()

    def get_reward(self):
        """Return the number of boxes currently standing on a goal."""
        return sum(1 for box in self.boxes if box in self.goals)

    def is_done(self):
        """Return True when every box stands on a goal."""
        return all(box in self.goals for box in self.boxes)

    def reset(self):
        """Reload the level from disk and return the initial state array."""
        self.__init__(self.level)
        return self.get_state()

    def _glyph(self, cell):
        """Return the single character render() draws for ``cell``."""
        if cell in self.walls:
            return '#'
        if cell in self.boxes:
            return '*' if cell in self.goals else '@'
        if cell == self.player:
            # NOTE(review): off a goal the player is drawn as '.', which is
            # indistinguishable from an empty goal.  load_level() defines no
            # symbol for "player on plain floor", so the glyph is left as it
            # was; pick one and teach load_level() to parse it.
            return '+' if cell in self.goals else '.'
        if cell in self.goals:
            return '.'
        return ' '

    def render(self):
        """Print the board to stdout, followed by one blank line.

        The drawn area covers every wall, box, goal and the player, and is
        never smaller than 10x10.
        """
        cells = list(self.walls) + list(self.boxes) + list(self.goals)
        if self.player is not None:
            cells.append(self.player)
        width = max([self._MIN_RENDER_SIZE] + [x + 1 for x, _ in cells])
        height = max([self._MIN_RENDER_SIZE] + [y + 1 for _, y in cells])
        for y in range(height):
            print(''.join(self._glyph((x, y)) for x in range(width)))
        print()

# Path: Sokoban.ipynb
def get_action(state, model, epsilon):
    """Pick an action index with an epsilon-greedy policy.

    Args:
        state: a single observation; any array-like, it is flattened into
            one row before being handed to ``model``.
        model: object exposing ``predict(batch, verbose=0)`` that returns one
            row of action values per input row.
        epsilon: probability of choosing a uniformly random action.

    Returns:
        An integer action index in ``[0, 4)``.
    """
    if np.random.rand() <= epsilon:
        return np.random.randint(0, 4)
    # The environment state is a 2-D (entities, 3) array while the network
    # takes a flat feature vector, so present it as a batch of one row.
    batch = np.asarray(state).reshape(1, -1)
    # verbose=0 suppresses the progress bar predict() prints on every call.
    q_values = model.predict(batch, verbose=0)
    return np.argmax(q_values[0])
    
# Path: Sokoban.ipynb

def train_model(model, target_model, memory, batch_size, gamma):
    """Run one Q-learning update on a random minibatch from ``memory``.

    Args:
        model: online network; must expose ``predict`` and ``fit``.
        target_model: network used to evaluate the next states.
        memory: sequence of ``(state, action, reward, next_state, done)``
            transitions.
        batch_size: number of transitions sampled per update.
        gamma: discount factor applied to the best next-state value.

    Returns:
        None.  Nothing is trained until ``memory`` holds at least
        ``3 * batch_size`` transitions.
    """
    if batch_size <= 0 or len(memory) < batch_size * 3:
        return
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)
    count = len(batch)

    # Each stored state is a 2-D (entities, 3) array; the network takes one
    # flat feature vector per sample.  squeeze() would not flatten them.
    states = np.asarray(states, dtype=np.float32).reshape(count, -1)
    next_states = np.asarray(next_states, dtype=np.float32).reshape(count, -1)
    actions = np.asarray(actions, dtype=np.intp)
    rewards = np.asarray(rewards, dtype=np.float32)
    dones = np.asarray(dones, dtype=bool)

    # Copy so the targets can be written without touching model-owned memory.
    q_values = np.array(model.predict(states, verbose=0))
    next_q_values = target_model.predict(next_states, verbose=0)

    # Terminal transitions use the raw reward; the rest bootstrap from the
    # target network's best next-state value.
    best_next = np.max(next_q_values, axis=1)
    targets = np.where(dones, rewards, rewards + gamma * best_next)
    # Only the taken action's value is replaced; the other outputs keep the
    # network's own prediction and so contribute no error.
    q_values[np.arange(count), actions] = targets
    model.fit(states, q_values, verbose=0)

# Path: Sokoban.ipynb

def play(model, target_model, memory, epsilon, epsilon_decay, epsilon_min, batch_size, gamma,
         level='levels/level1.txt', max_steps=1000):
    """Play one training episode and return the number of steps taken.

    Every transition is appended to ``memory`` and followed by one
    ``train_model`` update.

    Args:
        model: online network used to choose actions and trained in place.
        target_model: network passed through to ``train_model``.
        memory: replay buffer; must support ``append``.
        epsilon: exploration rate at the start of the episode.  It is decayed
            on a local copy only, so the caller's value is not changed.
        epsilon_decay: factor applied to ``epsilon`` after every step.
        epsilon_min: decay stops once ``epsilon`` is no longer above this.
        batch_size: minibatch size passed to ``train_model``.
        gamma: discount factor passed to ``train_model``.
        level: path of the level file to play.
        max_steps: upper bound on steps per episode, or ``None`` for no bound.
            Without a bound an episode that can no longer be completed
            (e.g. a box pushed into a corner) would never return.

    Returns:
        The number of steps taken in the episode.
    """
    env = Sokoban(level)
    done = False
    steps = 0
    state = env.get_state()
    while not done and (max_steps is None or steps < max_steps):
        steps += 1
        action = get_action(state, model, epsilon)
        reward, done = env.step(env.action_space[action])
        next_state = env.get_state()
        memory.append((state, action, reward, next_state, done))
        state = next_state
        train_model(model, target_model, memory, batch_size, gamma)
        if epsilon > epsilon_min:
            epsilon *= epsilon_decay
    return steps

# Path: Sokoban.ipynb

def test(model, level='levels/level1.txt', max_steps=1000):
    """Run one greedy (epsilon = 0) episode and return the steps taken.

    Args:
        model: network used to choose actions via ``get_action``.
        level: path of the level file to play.
        max_steps: upper bound on steps, or ``None`` for no bound.  The greedy
            policy is deterministic, so a move that leaves the board unchanged
            (walking into a wall) would otherwise repeat forever.

    Returns:
        The number of steps taken; equals ``max_steps`` when the level was
        not solved within the bound.
    """
    env = Sokoban(level)
    done = False
    steps = 0
    state = env.get_state()
    while not done and (max_steps is None or steps < max_steps):
        steps += 1
        action = get_action(state, model, 0)
        _, done = env.step(env.action_space[action])
        state = env.get_state()
    return steps

# Path: Sokoban.ipynb

def _build_q_network(input_dim):
    """Return a compiled MLP mapping a flat state vector to 4 action values."""
    network = Sequential()
    network.add(Dense(128, input_shape=(input_dim,), activation='relu'))
    network.add(Dense(128, activation='relu'))
    network.add(Dense(4, activation='linear'))
    # `learning_rate` is the supported keyword; the old `lr` alias is
    # deprecated and rejected by recent Keras releases.
    network.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return network


def main():
    """Train a DQN agent on level 1 for 1000 episodes, then run one greedy test."""
    memory = deque(maxlen=100000)
    gamma = 0.95
    epsilon = 1.0
    epsilon_decay = 0.995
    epsilon_min = 0.01
    batch_size = 64

    # The network consumes the flattened environment state.
    input_dim = Sokoban('levels/level1.txt').get_state().size
    model = _build_q_network(input_dim)
    target_model = _build_q_network(input_dim)
    target_model.set_weights(model.get_weights())

    for i in range(1000):
        steps = play(model, target_model, memory, epsilon, epsilon_decay, epsilon_min, batch_size, gamma)
        # play() decays only its own local copy of epsilon, so apply the same
        # per-step decay here; otherwise every episode restarts at full
        # exploration and the value logged below never changes.
        epsilon = max(epsilon_min, epsilon * epsilon_decay ** steps)
        print('Episode: %s, Steps: %s, Epsilon: %s' % (i, steps, epsilon))
        if i % 10 == 0:
            # Sync the target network with the online network.
            target_model.set_weights(model.get_weights())
    print('Testing...')
    steps = test(model)
    print('Steps: %s' % steps)

# Path: Sokoban.ipynb

# Run the training loop only when executed as a script, not on import.
if __name__ == '__main__':
    main()
