In [1]:
# Change the working directory to the source root so that the project
# packages imported in the following cell can be resolved.
import os

# Remember the directory the kernel started in. Without this anchor, running
# this cell a second time would resolve the relative path against the already
# changed working directory and climb three more levels up.
if "_NOTEBOOK_START_DIR" not in globals():
    _NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.abspath(os.path.join(_NOTEBOOK_START_DIR, '../../../00_src')))

In [2]:
import sys
import numpy as np
from agents.agent_ollama import AgentOllama
from environment.environment import SokobanEnvImpl
import environment.util as env_util
import environment.const as env_const
import environment.visualization as env_vis
from knowledge_graph.knowledge_graph import KnowledgeGraph
%matplotlib inline

In [3]:
# Number of this experiment run; it is embedded in the output file names.
attempt = 5

# Local aliases for the action constants of the environment.
UP = env_const.UP
DOWN = env_const.DOWN
LEFT = env_const.LEFT
RIGHT = env_const.RIGHT

# Local aliases for the tile constants of the environment.
WALL = env_const.WALL
FLOOR = env_const.FLOOR
BOX_TARGET = env_const.BOX_TARGET
BOX_ON_TARGET = env_const.BOX_ON_TARGET
BOX = env_const.BOX
PLAYER = env_const.PLAYER

# Environment limited to 60 steps, and the knowledge graph built on top of it.
env = SokobanEnvImpl(use_default_env=True, max_steps=60)
kg = KnowledgeGraph(env)

In [4]:
# System message: rules of the game, the tile legend, and the answer format.
# The {game_state} and {possible_actions} placeholders match keys of the dict
# returned by create_input() (presumably substituted by the agent on invoke).
system_prompt = (
    "You are an agent who plays the game Sokoban. \n"
    "The target of the PLAYER is to push every BOX on top of a BOX_TARGET, with as few steps as possible. \n"
    "Current game state [0:WALL, 1:FLOOR, 2:BOX_TARGET, 3:BOX_ON_TARGET, 4:BOX, 5:PLAYER]: \n"
    "{game_state} \n"
    "Answer only in one word with one of the following possible actions {possible_actions}."
)

# Human message: hands the shortest known solution to the model.
human_prompt = "The optimal path to solve is {shortest_trajectory}. What is the next action?"

agent_player = AgentOllama(
    "gemma3:4b",
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ],
)

# Log file for this attempt; clear_log_path=True is passed through unchanged.
agent_player.write_log(
    f"../03_resource/09_LLM_KG/output/default_env_{attempt:02}_attempt_trajectory.log",
    clear_log_path=True,
)

In [5]:
# Tile constant -> readable caption.
room_caption_map = {
    WALL: "WALL",
    FLOOR: "FLOOR",
    BOX_TARGET: "BOX_TARGET",
    BOX_ON_TARGET: "BOX_ON_TARGET",
    BOX: "BOX",
    PLAYER: "PLAYER",
}

# Action constant -> readable caption; 0 is the "do nothing" action.
action_caption_map = {
    0: "WAIT",
    UP: "UP",
    DOWN: "DOWN",
    LEFT: "LEFT",
    RIGHT: "RIGHT",
}

# Caption -> action constant, used to parse the agent's answer (no WAIT entry).
caption_action_map = {
    "UP": UP,
    "DOWN": DOWN,
    "LEFT": LEFT,
    "RIGHT": RIGHT,
}

def create_input():
    """Build the template variables for the next agent prompt.

    Returns:
        dict with the keys
        "game_state": ``env.room_state``, passed through unchanged,
        "shortest_trajectory": single-line string of the action captions for
            the result of ``env_util.breadth_first_search(env)``,
        "possible_actions": array of the action captions for
            ``kg.get_possible_actions()``.
    """
    # otypes is given explicitly: without it np.vectorize infers the output
    # type by calling the function on the first element and therefore raises
    # a ValueError for size-0 input (e.g. an empty path or no possible action).
    to_caption = np.vectorize(action_caption_map.get, otypes=[str])

    # max_line_width=sys.maxsize keeps the whole trajectory on a single line.
    shortest_trajectory = np.array2string(
        to_caption(env_util.breadth_first_search(env)),
        max_line_width=sys.maxsize,
    )
    possible_actions = to_caption(kg.get_possible_actions())

    return {"game_state": env.room_state,
            "shortest_trajectory": shortest_trajectory,
            "possible_actions": possible_actions}

def doStep(step:int) -> bool:
    """Apply one action to the environment and refresh the knowledge graph.

    Args:
        step: value handed to ``env.step`` (the action to execute).

    Returns:
        The ``done`` flag, i.e. the third element of the ``env.step`` result.
    """
    # Only the done flag is needed; the other three results are discarded.
    _observation, _reward, finished, _info = env.step(step)
    kg.update()
    return finished

In [6]:
# Play one episode: ask the agent for an action until the environment is done.
done = False
trajectory = []

while not done:
    agent_answer = agent_player.invoke(create_input())
    next_action = 0 # default do nothing
    print("Step {step}: Action ".format(step = len(trajectory)+1), end= "")
    # Every caption found in the answer is printed; the last match wins.
    for caption, action in caption_action_map.items():
        if caption not in agent_answer:
            continue
        separator = "" if next_action == 0 else ", "
        print(separator + caption, end="")
        next_action = action
    print()
    trajectory.append(next_action)
    done = doStep(next_action)

# Reset the environment once the episode has finished.
env.reset()

Step 1: Action RIGHT
Step 2: Action RIGHT
Step 3: Action RIGHT
Step 4: Action RIGHT
Step 5: Action LEFT
Step 6: Action RIGHT
Step 7: Action LEFT
Step 8: Action RIGHT
Step 9: Action LEFT
Step 10: Action RIGHT
Step 11: Action LEFT
Step 12: Action RIGHT
Step 13: Action LEFT
Step 14: Action RIGHT
Step 15: Action LEFT
Step 16: Action RIGHT
Step 17: Action LEFT
Step 18: Action RIGHT
Step 19: Action LEFT
Step 20: Action RIGHT
Step 21: Action LEFT
Step 22: Action RIGHT
Step 23: Action LEFT
Step 24: Action RIGHT
Step 25: Action LEFT
Step 26: Action LEFT
Step 27: Action LEFT
Step 28: Action RIGHT
Step 29: Action RIGHT
Step 30: Action RIGHT
Step 31: Action LEFT
Step 32: Action RIGHT
Step 33: Action LEFT
Step 34: Action RIGHT
Step 35: Action LEFT
Step 36: Action RIGHT
Step 37: Action LEFT
Step 38: Action RIGHT
Step 39: Action LEFT
Step 40: Action RIGHT
Step 41: Action LEFT
Step 42: Action LEFT
Step 43: Action RIGHT
Step 44: Action RIGHT
Step 45: Action LEFT
Step 46: Action RIGHT
Step 47: Action LE

In [7]:
# Drop the "do nothing" entries (0) so that only real moves are animated.
trajectory_of_moves = [move for move in trajectory if move != 0]

# Render the moves and save the animation as a GIF for this attempt.
env_vis.animate(
    env=env,
    path=trajectory_of_moves,
    save_ani=f"../03_resource/09_LLM_KG/output/default_env_{attempt:02}_attempt_trajectory.gif",
    dpi=300,
)