# **Tests**

In [1]:
import zipfile
import os

zip_path = "/content/tests.zip"
extract_path = "/content/tests"

os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print(f"Files extracted to: {extract_path}")


Files extracted to: /content/tests


# **First trial**

In [None]:
from environment import *
from typing import List, Tuple
import time

class MyAgent(BlocksWorldAgent):

    def __init__(self, name: str, target_state: BlocksWorld):
        super(MyAgent, self).__init__(name=name)

        self.target_state = target_state

        """
        The agent's belief about the world state. Initially, the agent has no belief about the world state.
        """
        self.belief: BlocksWorld = None

        """
        The agent's current desire. It is expressed as a list of blocks for which the agent wants to make a plan to bring to their corresponding
        configuration in the target state.
        The list can contain a single block or a sequence of blocks that represent: (i) a stack of blocks, (ii) a row of blocks (e.g. going level by level).
        """
        self.current_desire : List[Block] = None

        """
        The current intention is the agent plan (sequence of actions) that the agent is executing to achieve the current desire.
        """
        self.current_intention: List[BlocksWorldAction] = []


    def response(self, perception: BlocksWorldPerception) -> BlocksWorldAction:
        ## if the perceived state contains the target state, the agent has achieved its goal
        if perception.current_world.contains_world(self.target_state):
            return AgentCompleted()

        ## revise the agents beliefs based on the perceived state
        self.revise_beliefs(perception.current_world, perception.previous_action_succeeded, perception)

        ## Single minded agent intention execution: if the agent still has actions left in the current intention, and the intention
        ## is still applicable to the perceived state, the agent continues executing the intention
        if len(self.current_intention) > 0 and self._can_apply_action(self.current_intention[0], perception.current_world, perception.holding_block):
            return self.current_intention.pop(0)
        else:
            ## the agent has to set a new current desire and plan to achieve it
            self.current_desire, self.current_intention = self.plan()

        ## If there is an action in the current intention, pop it and return it
        if len(self.current_intention) > 0:
            return self.current_intention.pop(0)
        else:
            ## If there is no action in the current intention, return a NoAction
            return NoAction()



    def _can_apply_action(self, act: BlocksWorldAction, world: BlocksWorld, holding_block: str) -> bool:
        """
        Check if the action can be applied to the current world state.
        """
        ## create a clone of the world
        sim_world = world.clone()

        ## apply the action to the clone, surrpressing any exceptions
        try:
            ## locking can be performed at any time, so check if the action is a lock actio
            if act.get_type() == "lock":
                ## try to lock the block
                sim_world.lock(act.get_argument())
            else:
                if holding_block is None:
                    if act.get_type() == "putdown" or act.get_type() == "stack":
                        ## If we are not holding anything, we cannot putdown or stack a block
                        return False

                    if act.get_type() == "pickup":
                        ## try to pickup the block
                        sim_world.pickup(act.get_argument())
                    elif act.get_type() == "unstack":
                        ## try to unstack the block
                        sim_world.unstack(act.get_first_arg(), act.get_second_arg())
                else:
                    ## we are holding a block, so we can only putdown or stack
                    if act.get_type() == "pickup" or act.get_type() == "unstack":
                        ## If we are holding a block, we cannot pickup or unstack
                        return False

                    if act.get_type() == "putdown":
                        ## If we want to putdown the block we have to check if it's the same block we are holding
                        if act.get_argument() != holding_block:
                            return False

                    if act.get_type() == "stack":
                        ## If we want to stack the block we have to check if it's the same block we are holding
                        if act.get_first_arg() != holding_block:
                            return False
                        ## try to stack the block
                        sim_world.stack(act.get_first_arg(), act.get_second_arg())
        except Exception as e:
            return False

        return True


    def revise_beliefs(self, perceived_world_state: BlocksWorld, previous_action_succeeded: bool, perception: BlocksWorldPerception):
        print("\nUpdating beliefs...")

        # Clone the perceived world state to update the agent's belief
        self.belief = perceived_world_state.clone()

        # Update the block currently held by the agent
        self.held_block = perception.holding_block
        print(f"Block held by agent: {self.held_block}" if self.held_block else "Agent is not holding any block.")

        # Identify all blocks currently present in the environment
        self.active_blocks = set()
        for stack in self.belief.get_stacks():
            self.active_blocks.update(stack.get_blocks())

        print(f"Active blocks in the world: {self.active_blocks}")

        # Determine which blocks are required to achieve the target state
        required_blocks = set(self.target_state.get_all_blocks())

        # Identify blocks that are missing from the environment
        self.stash_blocks = required_blocks - self.active_blocks

        # If the agent is holding a block, it should not be considered in the stash
        if self.held_block and self.held_block in self.stash_blocks:
            self.stash_blocks.remove(self.held_block)

        print(f"Blocks in stash: {self.stash_blocks}" if self.stash_blocks else "Stash is empty.")

        # Identify missing blocks
        self.missing_blocks = required_blocks - self.active_blocks
        if self.held_block and self.held_block in self.stash_blocks:
            self.stash_blocks.remove(self.held_block)

        print(f"Missing blocks: {self.missing_blocks}")

        # Display the current stack structure as perceived by the agent
        print("Current stack structure in agent's perception:")
        for stack in self.belief.get_stacks():
            print(f"Stack: {stack.get_blocks()}")

        # Identify the desired target stack and the base block
        self.desired_stack = self.target_state.get_stacks()[0].get_blocks()
        self.base_block = self.desired_stack[0] if self.desired_stack else None

        print(f"Target stack: {self.desired_stack}")
        print(f"Base block: {self.base_block}")

        # Check if any blocks required for the target state are missing
        for block in self.desired_stack:
            if not self.belief.exists(block):
                print(f"! Block {block} has disappeared! Resetting intentions.")
                self.current_intention = []
                return

        # If the last action was unsuccessful, reset the agent's intentions
        if not previous_action_succeeded:
            print("! Unexpected failure detected! Resetting intentions.")
            self.current_intention = []
        else:
            print("Last action was successful.")

    def plan(self) -> Tuple[List[Block], List[BlocksWorldAction]]:
        print("Agent is making a plan...")

        actions = []

        # If the agent is currently holding a block, it must put it down before picking another one
        if self.held_block:
            print(f"Agent is already holding {self.held_block}, it must be placed down before picking another block.")

            # Check if the held block should be placed on another block
            for i in range(len(self.desired_stack) - 1):
                block = self.desired_stack[i + 1]
                below_block = self.desired_stack[i]

                if block == self.held_block:
                    # Ensure the below block exists and is not in the stash
                    if not self.belief.exists(below_block) or below_block in self.stash_blocks:
                        print(f"! Cannot place {block} on {below_block}, because it is missing or in stash. Waiting...")
                        return [], [NoAction()]

                    # Verify that the below block is still at the top of its stack
                    try:
                        stack = self.belief.get_stack(below_block)
                        if stack.get_top_block() != below_block:
                            print(f"! Cannot place {block} on {below_block}, because {below_block} is no longer the topmost block. Recalculating...")
                            return [], [NoAction()]
                    except ValueError:
                        print(f"! Cannot place {block} on {below_block}, as {below_block} is no longer in a stack. Waiting...")
                        return [], [NoAction()]

                    # Place the block on the below block
                    print(f"Placing {block} on {below_block}.")
                    return [], [Stack(block, below_block)]

            # If the block cannot be placed on another, put it down on the table
            print(f"Placing {self.held_block} on the table.")
            return [], [PutDown(self.held_block)]

        # Ensure the base block of the desired stack exists before proceeding
        if not self.belief.exists(self.base_block) or self.base_block in self.stash_blocks:
            print(f"Base block {self.base_block} is missing or in stash! Waiting for it to reappear...")
            return [], [NoAction()]

        # If the base block is on the table, attempt to lock it
        if self.belief.is_on_table(self.base_block):
            try:
                base_stack = self.belief.get_stack(self.base_block)
                if not base_stack.is_locked(self.base_block):
                    print(f"Locking target base block: {self.base_block}")
                    actions.append(Lock(self.base_block))
            except ValueError:
                print(f"Base block {self.base_block} is not in a valid stack. Waiting for it to reappear...")
                return [], [NoAction()]

        # Determine the next required block for the current step
        next_required_block = None
        for i in range(len(self.desired_stack) - 1):
            block = self.desired_stack[i + 1]
            below_block = self.desired_stack[i]

            # Only select the first missing block that we can actually place now
            if not self.belief.exists(block) and self.belief.exists(below_block):
                next_required_block = block
                break

        if next_required_block:
            print(f"Next required block for movement: {next_required_block}")

            # Only wait if the missing block is required for the next immediate move
            if next_required_block in self.stash_blocks:
                print(f"Block {next_required_block} is in stash and its base is ready. Waiting for it to reappear...")
                return [], [NoAction()]

            print(f"Block {next_required_block} is missing and not in stash. Waiting...")
            return [], [NoAction()]

        # Move all blocks to the table before reconstructing the target stack
        for block in self.active_blocks:
            if not self.belief.exists(block):
                continue

            try:
                stack = self.belief.get_stack(block)
            except ValueError:
                continue

            # Skip locked blocks
            if stack.is_locked(block):
                continue

            # If the block is not on the table, unstack it and place it down
            if not self.belief.is_on_table(block):
                if stack.get_top_block() == block:
                    below_block = stack.get_below(block)
                    actions.append(Unstack(block, below_block))
                    actions.append(PutDown(block))

        if actions:
            print("Placing all blocks on the table before constructing the final stack.")
            return [], actions

        print("All blocks are on the table! Starting construction.")

        # Construct the target stack from the bottom up
        for i in range(len(self.desired_stack) - 1):
            block = self.desired_stack[i + 1]
            below_block = self.desired_stack[i]

            # Ensure that the required blocks exist and are not in the stash
            if not self.belief.exists(block) or block in self.stash_blocks:
                print(f"! Block {block} is missing or in stash. Waiting...")
                return [], [NoAction()]

            if not self.belief.exists(below_block) or below_block in self.stash_blocks:
                print(f"! Block {below_block} is missing or in stash. Waiting...")
                return [], [NoAction()]

            try:
                stack = self.belief.get_stack(block)
            except ValueError:
                print(f"! Block {block} is no longer in a stack. Waiting...")
                return [], [NoAction()]

            # If the block is already on the below block, lock it in place
            if stack.get_below(block) == below_block:
                if not stack.is_locked(block):
                    print(f"Block {block} is already on {below_block}, locking it.")
                    actions.append(Lock(block))
                continue

            # Move the block to the correct position
            print(f"Moving block {block} onto {below_block}.")
            actions.append(PickUp(block))
            actions.append(Stack(block, below_block))
            actions.append(Lock(block))

        # Check if any blocks are missing from the world before completing the task
        missing_from_world = set(self.target_state.get_all_blocks()) - set(self.belief.get_all_blocks())

        if missing_from_world:
            print(f"Cannot complete! Missing blocks: {missing_from_world}. Waiting for them to reappear...")
            return [], [NoAction()]

        # If no actions are needed, the agent has completed its task
        if not actions:
            print("All blocks are present and correctly placed! Agent completes the task.")
            return [], [AgentCompleted()]

        return [], actions


    def status_string(self):
        # TODO: return information about the agent's current state and current plan.
        return str(self) + ": Hai agentule, pune si tu blocurile alea."



class Tester(object):
    STEP_DELAY = 0.5
    TEST_SUITE = "tests/0e-large/"

    EXT = ".txt"
    SI  = "si"
    SF  = "sf"

    DYNAMICS_PROB = .0

    AGENT_NAME = "*A"

    def __init__(self):
        self._environment = None
        self._agents = []

        self._initialize_environment(Tester.TEST_SUITE)
        self._initialize_agents(Tester.TEST_SUITE)



    def _initialize_environment(self, test_suite: str) -> None:
        filename = test_suite + Tester.SI + Tester.EXT

        with open(filename) as input_stream:
            self._environment = DynamicEnvironment(BlocksWorld(input_stream=input_stream))


    def _initialize_agents(self, test_suite: str) -> None:
        filename = test_suite + Tester.SF + Tester.EXT

        agent_states = {}

        with open(filename) as input_stream:
            desires = BlocksWorld(input_stream=input_stream)
            agent = MyAgent(Tester.AGENT_NAME, desires)

            agent_states[agent] = desires
            self._agents.append(agent)

            self._environment.add_agent(agent, desires, None)

            print("Agent %s desires:" % str(agent))
            print(str(desires))


    def make_steps(self):
        print("\n\n================================================= INITIAL STATE:")
        print(str(self._environment))
        print("\n\n=================================================")

        completed = False
        nr_steps = 0

        while not completed:
            completed = self._environment.step()

            time.sleep(Tester.STEP_DELAY)
            print(str(self._environment))

            for ag in self._agents:
                print(ag.status_string())

            nr_steps += 1

            print("\n\n================================================= STEP %i completed." % nr_steps)

        print("\n\n================================================= ALL STEPS COMPLETED")



if __name__ == "__main__":
    tester = Tester()
    tester.make_steps()

Agent A desires:
 [A][H][J]               
 [B][L][K][F][Q][I]      
 [C][P][O][D][M][N][G][E]


 A                                              
 <>                                             

                                               
 [A]   [P]   [Q]                                 
 [C]   [L]   [O]   [F]   [M]   [N]               
 [B]   [H]   [K]   [D]   [I]   [J]   [E]   [G]   
=====                                           
 #0                                             
Stash:  



Agent A at #0 holding none; previous action: successful

Updating beliefs...
Agent is not holding any block.
Active blocks in the world: {L, A, I, G, P, D, B, K, H, C, F, J, N, E, M, Q, O}
Stash is empty.
Missing blocks: set()
Current stack structure in agent's perception:
Stack: [B, C, A]
Stack: [H, L, P]
Stack: [K, O, Q]
Stack: [D, F]
Stack: [I, M]
Stack: [J, N]
Stack: [E]
Stack: [G]
Target stack: [C, B, A]
Base block: C
Last action was successful.
Agent is making a plan...
Placing all blo

# **Intermediate version**

In [6]:
from environment import *
from typing import List, Tuple
import time

class MyAgent(BlocksWorldAgent):

    def __init__(self, name: str, target_state: BlocksWorld):
        super(MyAgent, self).__init__(name=name)

        self.target_state = target_state

        """
        The agent's belief about the world state. Initially, the agent has no belief about the world state.
        """
        self.belief: BlocksWorld = None

        """
        The agent's current desire. It is expressed as a list of blocks for which the agent wants to make a plan to bring to their corresponding
        configuration in the target state.
        The list can contain a single block or a sequence of blocks that represent: (i) a stack of blocks, (ii) a row of blocks (e.g. going level by level).
        """
        self.current_desire : List[Block] = None

        """
        The current intention is the agent plan (sequence of actions) that the agent is executing to achieve the current desire.
        """
        self.current_intention: List[BlocksWorldAction] = []


    def response(self, perception: BlocksWorldPerception) -> BlocksWorldAction:
        ## if the perceived state contains the target state, the agent has achieved its goal
        if perception.current_world.contains_world(self.target_state):
            return AgentCompleted()

        ## revise the agents beliefs based on the perceived state
        self.revise_beliefs(perception.current_world, perception.previous_action_succeeded, perception)

        ## Single minded agent intention execution: if the agent still has actions left in the current intention, and the intention
        ## is still applicable to the perceived state, the agent continues executing the intention
        if len(self.current_intention) > 0 and self._can_apply_action(self.current_intention[0], perception.current_world, perception.holding_block):
            return self.current_intention.pop(0)
        else:
            ## the agent has to set a new current desire and plan to achieve it
            self.current_desire, self.current_intention = self.plan()

        ## If there is an action in the current intention, pop it and return it
        if len(self.current_intention) > 0:
            return self.current_intention.pop(0)
        else:
            ## If there is no action in the current intention, return a NoAction
            return NoAction()



    def _can_apply_action(self, act: BlocksWorldAction, world: BlocksWorld, holding_block: str) -> bool:
        """
        Check if the action can be applied to the current world state.
        """
        ## create a clone of the world
        sim_world = world.clone()

        ## apply the action to the clone, surrpressing any exceptions
        try:
            ## locking can be performed at any time, so check if the action is a lock actio
            if act.get_type() == "lock":
                ## try to lock the block
                sim_world.lock(act.get_argument())
            else:
                if holding_block is None:
                    if act.get_type() == "putdown" or act.get_type() == "stack":
                        ## If we are not holding anything, we cannot putdown or stack a block
                        return False

                    if act.get_type() == "pickup":
                        ## try to pickup the block
                        sim_world.pickup(act.get_argument())
                    elif act.get_type() == "unstack":
                        ## try to unstack the block
                        sim_world.unstack(act.get_first_arg(), act.get_second_arg())
                else:
                    ## we are holding a block, so we can only putdown or stack
                    if act.get_type() == "pickup" or act.get_type() == "unstack":
                        ## If we are holding a block, we cannot pickup or unstack
                        return False

                    if act.get_type() == "putdown":
                        ## If we want to putdown the block we have to check if it's the same block we are holding
                        if act.get_argument() != holding_block:
                            return False

                    if act.get_type() == "stack":
                        ## If we want to stack the block we have to check if it's the same block we are holding
                        if act.get_first_arg() != holding_block:
                            return False
                        ## try to stack the block
                        sim_world.stack(act.get_first_arg(), act.get_second_arg())
        except Exception as e:
            return False

        return True


    def revise_beliefs(self, perceived_world_state: BlocksWorld, previous_action_succeeded: bool, perception: BlocksWorldPerception):
        print("\nUpdating beliefs...")

        self.belief = perceived_world_state.clone()
        self.held_block = perception.holding_block
        print(f"Block held by agent: {self.held_block}" if self.held_block else "Agent is not holding any block.")

        self.active_blocks = set()
        for stack in self.belief.get_stacks():
            self.active_blocks.update(stack.get_blocks())

        print(f"Active blocks in the world: {self.active_blocks}")

        required_blocks = set(self.target_state.get_all_blocks())
        self.stash_blocks = required_blocks - self.active_blocks

        if self.held_block and self.held_block in self.stash_blocks:
            self.stash_blocks.remove(self.held_block)

        print(f"Blocks in stash: {self.stash_blocks}" if self.stash_blocks else "Stash is empty.")

        self.missing_blocks = required_blocks - self.active_blocks
        if self.held_block and self.held_block in self.stash_blocks:
            self.stash_blocks.remove(self.held_block)

        print(f"Missing blocks: {self.missing_blocks}")

        print("Current stack structure in agent's perception:")
        for stack in self.belief.get_stacks():
            print(f"Stack: {stack.get_blocks()}")

        self.desired_stacks = [stack.get_blocks() for stack in self.target_state.get_stacks()]
        self.base_blocks = [stack[0] for stack in self.desired_stacks if stack]

        print(f"Target stacks: {self.desired_stacks}")
        print(f"Base blocks of stacks: {self.base_blocks}")

        print("\nComparing current world state with desired state...")

        current_positions = {}
        target_positions = {}

        def remove_locked_duplicates(stack):
            cleaned_stack = []
            seen_blocks = set()
            for block in stack:
                if block not in seen_blocks:
                    cleaned_stack.append(block)
                    seen_blocks.add(block)
            return cleaned_stack

        for stack in self.belief.get_stacks():
            stack_blocks = remove_locked_duplicates(stack.get_blocks())
            for i, block in enumerate(stack_blocks):
                current_positions[block] = (i, stack_blocks)

        for stack in self.target_state.get_stacks():
            stack_blocks = stack.get_blocks()
            for i, block in enumerate(stack_blocks):
                target_positions[block] = (i, stack_blocks)

        misplaced_blocks = []
        for block in self.active_blocks:
            current_info = current_positions.get(block, None)
            target_info = target_positions.get(block, None)

            if not current_info:
                print(f"Block {block} is missing from the current environment!")
                continue

            if not target_info:
                print(f"Block {block} is not part of the target state!")
                continue

            current_index, current_stack = current_info
            target_index, target_stack = target_info

            if current_stack == target_stack and current_index == target_index:
                print(f"Block {block} is correctly placed.")
            else:
                print(f"Block {block} is misplaced! "
                      f"Current: Stack {current_stack}, Position {current_index} | "
                      f"Target: Stack {target_stack}, Position {target_index}")
                misplaced_blocks.append(block)

        print(f"\nTotal misplaced blocks: {len(misplaced_blocks)} - {misplaced_blocks}")

        self.misplaced_blocks = misplaced_blocks

        if not previous_action_succeeded:
            print("! Unexpected failure detected! Resetting intentions.")
            self.current_intention = []
        else:
            print("Last action was successful.")


    def plan(self) -> Tuple[List[Block], List[BlocksWorldAction]]:
        print("Agent is making a plan...")

        actions = []

        misplaced_blocks = self.misplaced_blocks
        correct_blocks = self.active_blocks - set(misplaced_blocks)

        print(f"Blocks correctly placed: {correct_blocks}")
        print(f"Blocks misplaced: {misplaced_blocks}")

        if not misplaced_blocks:
            print("All blocks are correctly placed! Agent completes the task.")
            return [], [AgentCompleted()]

        if self.held_block:
            print(f"Agent is holding {self.held_block}, deciding where to place it...")

            for stack in self.desired_stacks:
                for i in range(len(stack) - 1):
                    block = stack[i + 1]
                    below_block = stack[i]

                    if block == self.held_block:
                        if not self.belief.exists(below_block) or below_block in self.stash_blocks:
                            print(f"! Cannot place {block} on {below_block}, because it is missing or in stash. Waiting...")
                            return [], [NoAction()]

                        try:
                            stack_data = self.belief.get_stack(below_block)
                            if stack_data.get_top_block() != below_block:
                                print(f"! Cannot place {block} on {below_block}, because {below_block} is not the topmost block.")
                                return [], [NoAction()]
                        except ValueError:
                            print(f"! Cannot place {block} on {below_block}, as {below_block} is no longer in a stack.")
                            return [], [NoAction()]

                        print(f"Placing {block} on {below_block}.")
                        return [], [Stack(block, below_block)]

            print(f"Placing {self.held_block} on the table.")
            return [], [PutDown(self.held_block)]

        for base_block in self.base_blocks:
            if not self.belief.exists(base_block) or base_block in self.stash_blocks:
                print(f"Base block {base_block} is missing or in stash! Waiting for it to reappear...")
                return [], [NoAction()]

            if self.belief.is_on_table(base_block):
                try:
                    base_stack = self.belief.get_stack(base_block)
                    if not base_stack.is_locked(base_block):
                        print(f"Locking target base block: {base_block}")
                        actions.append(Lock(base_block))
                except ValueError:
                    print(f"Base block {base_block} is not in a valid stack. Waiting for it to reappear...")
                    continue

        for block in self.active_blocks:
            if not self.belief.exists(block):
                continue

            try:
                stack = self.belief.get_stack(block)
            except ValueError:
                continue

            if stack.is_locked(block):
                continue

            if not self.belief.is_on_table(block):
                if stack.get_top_block() == block:
                    below_block = stack.get_below(block)
                    actions.append(Unstack(block, below_block))
                    actions.append(PutDown(block))

        if actions:
            print("Placing all blocks on the table before constructing the final stack.")
            return [], actions

        print("All blocks are on the table! Starting construction.")

        for stack in self.desired_stacks:
            for i in range(len(stack) - 1):
                block = stack[i + 1]
                below_block = stack[i]

                if not self.belief.exists(block) or block in self.stash_blocks:
                    print(f"! Block {block} is missing or in stash. Waiting...")
                    return [], [NoAction()]

                if not self.belief.exists(below_block) or below_block in self.stash_blocks:
                    print(f"! Block {below_block} is missing or in stash. Waiting...")
                    return [], [NoAction()]

                try:
                    stack_data = self.belief.get_stack(block)
                except ValueError:
                    print(f"! Block {block} is no longer in a stack. Waiting...")
                    return [], [NoAction()]

                if stack_data.get_below(block) == below_block:
                    if not stack_data.is_locked(block):
                        print(f"Block {block} is already on {below_block}, locking it.")
                        actions.append(Lock(block))
                    continue

                print(f"Moving block {block} onto {below_block}.")
                actions.append(PickUp(block))
                actions.append(Stack(block, below_block))
                actions.append(Lock(block))

        return [], actions

    def status_string(self):
        # TODO: return information about the agent's current state and current plan.
        return str(self) + ": Hai agentule, pune si tu blocurile alea."



class Tester(object):
    STEP_DELAY = 0.5
    TEST_SUITE = "tests/0e-large/"

    EXT = ".txt"
    SI  = "si"
    SF  = "sf"

    DYNAMICS_PROB = .5

    AGENT_NAME = "*A"

    def __init__(self):
        self._environment = None
        self._agents = []

        self._initialize_environment(Tester.TEST_SUITE)
        self._initialize_agents(Tester.TEST_SUITE)



    def _initialize_environment(self, test_suite: str) -> None:
        filename = test_suite + Tester.SI + Tester.EXT

        with open(filename) as input_stream:
            self._environment = DynamicEnvironment(BlocksWorld(input_stream=input_stream))


    def _initialize_agents(self, test_suite: str) -> None:
        filename = test_suite + Tester.SF + Tester.EXT

        agent_states = {}

        with open(filename) as input_stream:
            desires = BlocksWorld(input_stream=input_stream)
            agent = MyAgent(Tester.AGENT_NAME, desires)

            agent_states[agent] = desires
            self._agents.append(agent)

            self._environment.add_agent(agent, desires, None)

            print("Agent %s desires:" % str(agent))
            print(str(desires))


    def make_steps(self):
        print("\n\n================================================= INITIAL STATE:")
        print(str(self._environment))
        print("\n\n=================================================")

        completed = False
        nr_steps = 0

        while not completed:
            completed = self._environment.step()

            time.sleep(Tester.STEP_DELAY)
            print(str(self._environment))

            for ag in self._agents:
                print(ag.status_string())

            nr_steps += 1

            print("\n\n================================================= STEP %i completed." % nr_steps)

        print("\n\n================================================= ALL STEPS COMPLETED")



if __name__ == "__main__":
    tester = Tester()
    tester.make_steps()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Stack: [E, E, E]
Stack: [N, N, N]
Stack: [O, O, O]
Stack: [C, B, A, C, B, A, C, B, A]
Stack: [F]
Stack: [G, G, G]
Target stacks: [[C, B, A], [P, L, H], [O, K, J], [D, F], [M, Q], [N, I], [G], [E]]
Base blocks of stacks: [C, P, O, D, M, N, G, E]

🔎 Comparing current world state with desired state...
❌ Block O is misplaced! Current: Stack [O], Position 0 | Target: Stack [O, K, J], Position 0
❌ Block F is misplaced! Current: Stack [F], Position 0 | Target: Stack [D, F], Position 1
❌ Block D is misplaced! Current: Stack [D], Position 0 | Target: Stack [D, F], Position 0
✅ Block E is correctly placed.
❌ Block N is misplaced! Current: Stack [N], Position 0 | Target: Stack [N, I], Position 0
✅ Block A is correctly placed.
✅ Block L is correctly placed.
✅ Block P is correctly placed.
✅ Block H is correctly placed.
❌ Block M is misplaced! Current: Stack [M], Position 0 | Target: Stack [M, Q], Position 0
✅ Block G is correctly plac

# **Correct Version**

In [7]:
%%writefile my.py
from environment import *
from typing import List, Tuple
import time

class MyAgent(BlocksWorldAgent):

    def __init__(self, name: str, target_state: BlocksWorld):
        super(MyAgent, self).__init__(name=name)

        self.target_state = target_state

        """
        The agent's belief about the world state. Initially, the agent has no belief about the world state.
        """
        self.belief: BlocksWorld = None

        """
        The agent's current desire. It is expressed as a list of blocks for which the agent wants to make a plan to bring to their corresponding
        configuration in the target state.
        The list can contain a single block or a sequence of blocks that represent: (i) a stack of blocks, (ii) a row of blocks (e.g. going level by level).
        """
        self.current_desire : List[Block] = None

        """
        The current intention is the agent plan (sequence of actions) that the agent is executing to achieve the current desire.
        """
        self.current_intention: List[BlocksWorldAction] = []


    def response(self, perception: BlocksWorldPerception) -> BlocksWorldAction:
        ## if the perceived state contains the target state, the agent has achieved its goal
        if perception.current_world.contains_world(self.target_state):
            return AgentCompleted()

        ## revise the agents beliefs based on the perceived state
        self.revise_beliefs(perception.current_world, perception.previous_action_succeeded, perception)

        ## Single minded agent intention execution: if the agent still has actions left in the current intention, and the intention
        ## is still applicable to the perceived state, the agent continues executing the intention
        if len(self.current_intention) > 0 and self._can_apply_action(self.current_intention[0], perception.current_world, perception.holding_block):
            return self.current_intention.pop(0)
        else:
            ## the agent has to set a new current desire and plan to achieve it
            self.current_desire, self.current_intention = self.plan()

        ## If there is an action in the current intention, pop it and return it
        if len(self.current_intention) > 0:
            return self.current_intention.pop(0)
        else:
            ## If there is no action in the current intention, return a NoAction
            return NoAction()



    def _can_apply_action(self, act: BlocksWorldAction, world: BlocksWorld, holding_block: str) -> bool:
        """
        Check if the action can be applied to the current world state.
        """
        ## create a clone of the world
        sim_world = world.clone()

        ## apply the action to the clone, surrpressing any exceptions
        try:
            ## locking can be performed at any time, so check if the action is a lock actio
            if act.get_type() == "lock":
                ## try to lock the block
                sim_world.lock(act.get_argument())
            else:
                if holding_block is None:
                    if act.get_type() == "putdown" or act.get_type() == "stack":
                        ## If we are not holding anything, we cannot putdown or stack a block
                        return False

                    if act.get_type() == "pickup":
                        ## try to pickup the block
                        sim_world.pickup(act.get_argument())
                    elif act.get_type() == "unstack":
                        ## try to unstack the block
                        sim_world.unstack(act.get_first_arg(), act.get_second_arg())
                else:
                    ## we are holding a block, so we can only putdown or stack
                    if act.get_type() == "pickup" or act.get_type() == "unstack":
                        ## If we are holding a block, we cannot pickup or unstack
                        return False

                    if act.get_type() == "putdown":
                        ## If we want to putdown the block we have to check if it's the same block we are holding
                        if act.get_argument() != holding_block:
                            return False

                    if act.get_type() == "stack":
                        ## If we want to stack the block we have to check if it's the same block we are holding
                        if act.get_first_arg() != holding_block:
                            return False
                        ## try to stack the block
                        sim_world.stack(act.get_first_arg(), act.get_second_arg())
        except Exception as e:
            return False

        return True


    def revise_beliefs(self, perceived_world_state: BlocksWorld, previous_action_succeeded: bool, perception: BlocksWorldPerception):
        print("\nUpdating beliefs...")

        # Update internal belief and held block from perception
        self.belief = perceived_world_state.clone()
        self.held_block = perception.holding_block
        print(f"Block held by agent: {self.held_block}" if self.held_block else "Agent is not holding any block.")

        # Get all blocks currently present in the perceived world
        self.active_blocks = set()
        for stack in self.belief.get_stacks():
            self.active_blocks.update(stack.get_blocks())

        print(f"Active blocks in the world: {self.active_blocks}")

        # Identify blocks needed for the goal but not currently present
        required_blocks = set(self.target_state.get_all_blocks())
        self.stash_blocks = required_blocks - self.active_blocks

        # Remove held block from stash if already in hand
        if self.held_block and self.held_block in self.stash_blocks:
            self.stash_blocks.remove(self.held_block)

        print(f"Blocks in stash: {self.stash_blocks}" if self.stash_blocks else "Stash is empty.")

        # Identify missing blocks (still needed)
        self.missing_blocks = required_blocks - self.active_blocks
        if self.held_block and self.held_block in self.stash_blocks:
            self.stash_blocks.remove(self.held_block)

        print(f"Missing blocks: {self.missing_blocks}")

        # Display current perceived stack structure
        print("Current stack structure in agent's perception:")
        for stack in self.belief.get_stacks():
            print(f"Stack: {stack.get_blocks()}")

        # Extract desired stack configuration and base blocks
        self.desired_stacks = [stack.get_blocks() for stack in self.target_state.get_stacks()]
        self.base_blocks = [stack[0] for stack in self.desired_stacks if stack]

        print(f"Target stacks: {self.desired_stacks}")
        print(f"Base blocks of stacks: {self.base_blocks}")

        print("\nComparing current world state with desired state...")

        current_positions = {}
        target_positions = {}

        # Helper: remove duplicate blocks in a stack (if any)
        def remove_locked_duplicates(stack):
            cleaned_stack = []
            seen_blocks = set()
            for block in stack:
                if block not in seen_blocks:
                    cleaned_stack.append(block)
                    seen_blocks.add(block)
            return cleaned_stack

        # Build current block positions
        for stack in self.belief.get_stacks():
            stack_blocks = remove_locked_duplicates(stack.get_blocks())
            for i, block in enumerate(stack_blocks):
                current_positions[block] = (i, stack_blocks)

        # Build target block positions
        for stack in self.target_state.get_stacks():
            stack_blocks = stack.get_blocks()
            for i, block in enumerate(stack_blocks):
                target_positions[block] = (i, stack_blocks)

        misplaced_blocks = []

        # Compare block positions and find misplaced ones
        for block in self.active_blocks:
            current_info = current_positions.get(block, None)
            target_info = target_positions.get(block, None)

            if not current_info:
                print(f"Block {block} is missing from the current environment!")
                continue

            if not target_info:
                print(f"Block {block} is not part of the target state!")
                continue

            current_index, current_stack = current_info
            target_index, target_stack = target_info

            if current_stack == target_stack and current_index == target_index:
                print(f"✅ Block {block} is correctly placed.")
            else:
                print(f"❌ Block {block} is misplaced! "
                      f"Current: Stack {current_stack}, Position {current_index} | "
                      f"Target: Stack {target_stack}, Position {target_index}")
                misplaced_blocks.append(block)

        print(f"\nTotal misplaced blocks: {len(misplaced_blocks)} - {misplaced_blocks}")

        self.misplaced_blocks = misplaced_blocks

        # If last action failed unexpectedly, reset plan
        if not previous_action_succeeded:
            print("! Unexpected failure detected! Resetting intentions.")
            self.current_intention = []
        else:
            print("Last action was successful.")


    def plan(self) -> Tuple[List[Block], List[BlocksWorldAction]]:
        print("Agent is making a plan...")

        actions = []
        misplaced_blocks = self.misplaced_blocks
        correct_blocks = self.active_blocks - set(misplaced_blocks)

        print(f"Blocks correctly placed: {correct_blocks}")
        print(f"Blocks misplaced: {misplaced_blocks}")

        if not misplaced_blocks:
            print("All blocks are correctly placed! Agent completes the task.")
            return [], [AgentCompleted()]

        # STEP 1: If the agent is holding a block, try placing it
        if self.held_block:
            print(f"Agent is holding {self.held_block}, deciding where to place it...")

            for stack in self.desired_stacks:
                for i in range(len(stack) - 1):
                    block = stack[i + 1]
                    below_block = stack[i]

                    if block == self.held_block:
                        if not self.belief.exists(below_block) or below_block in self.stash_blocks:
                            print(f"! Cannot place {block} on {below_block}, base missing. Checking other moves...")
                            continue

                        try:
                            stack_data = self.belief.get_stack(below_block)
                            if stack_data.get_top_block() != below_block:
                                print(f"! Cannot place {block} on {below_block}, not topmost. Checking other moves...")
                                continue
                        except ValueError:
                            print(f"! Cannot place {block} on {below_block}, stack error. Checking other moves...")
                            continue

                        print(f"Placing {block} on {below_block}.")
                        return [], [Stack(block, below_block)]

            print(f"Placing {self.held_block} on the table.")
            return [], [PutDown(self.held_block)]

        # STEP 2: Make sure base blocks are in position and locked
        for base_block in self.base_blocks:
            if not self.belief.exists(base_block) or base_block in self.stash_blocks:
                print(f"Base block {base_block} is missing! Checking other possible moves...")
                continue

            if self.belief.is_on_table(base_block):
                try:
                    base_stack = self.belief.get_stack(base_block)
                    if not base_stack.is_locked(base_block):
                        print(f"Locking base block: {base_block}")
                        actions.append(Lock(base_block))
                except ValueError:
                    print(f"Error accessing {base_block}. Skipping lock.")
                    continue

        # STEP 3: Move misplaced blocks to the table
        moved_blocks = False
        for block in self.active_blocks:
            if not self.belief.exists(block):
                continue

            try:
                stack = self.belief.get_stack(block)
            except ValueError:
                continue

            if stack.is_locked(block):
                continue

            if not self.belief.is_on_table(block):
                if stack.get_top_block() == block:
                    below_block = stack.get_below(block)
                    print(f"Moving block {block} to the table.")
                    actions.append(Unstack(block, below_block))
                    actions.append(PutDown(block))
                    moved_blocks = True

        if actions:
            print("Executing useful moves before waiting.")
            return [], actions

        # STEP 4: Build the desired stacks in correct order
        for stack in self.desired_stacks:
            for i in range(len(stack) - 1):
                block = stack[i + 1]
                below_block = stack[i]

                if not self.belief.exists(block) or block in self.stash_blocks:
                    print(f"! Block {block} missing. Checking alternatives before waiting...")
                    continue

                if not self.belief.exists(below_block) or below_block in self.stash_blocks:
                    print(f"! Block {below_block} missing. Checking alternatives before waiting...")
                    continue

                try:
                    stack_data = self.belief.get_stack(block)
                except ValueError:
                    print(f"! Block {block} is no longer in a stack. Checking alternatives...")
                    continue

                if stack_data.get_below(block) == below_block:
                    if not stack_data.is_locked(block):
                        print(f"Block {block} is already in position. Locking it.")
                        actions.append(Lock(block))
                    continue

                # Check if we need to unstack before picking up
                if not self.belief.is_on_table(block):
                    top_block = stack_data.get_top_block()
                    if top_block != block:
                        print(f"Block {block} is not at the top, unstacking {top_block}.")
                        actions.append(Unstack(top_block, stack_data.get_below(top_block)))
                        actions.append(PutDown(top_block))
                        continue

                print(f"Moving block {block} onto {below_block}.")
                actions.append(PickUp(block))
                actions.append(Stack(block, below_block))
                actions.append(Lock(block))

        if actions:
            return [], actions

        print("No more useful moves. Waiting...")
        return [], [NoAction()]


    def status_string(self):
        # TODO: return information about the agent's current state and current plan.
        return str(self) + ": Hai agentule, pune si tu blocurile alea."



class Tester(object):
    STEP_DELAY = 0.5
    TEST_SUITE = "tests/0e-large/"

    EXT = ".txt"
    SI  = "si"
    SF  = "sf"

    DYNAMICS_PROB = .5

    AGENT_NAME = "*A"

    def __init__(self):
        self._environment = None
        self._agents = []

        self._initialize_environment(Tester.TEST_SUITE)
        self._initialize_agents(Tester.TEST_SUITE)



    def _initialize_environment(self, test_suite: str) -> None:
        filename = test_suite + Tester.SI + Tester.EXT

        with open(filename) as input_stream:
            self._environment = DynamicEnvironment(BlocksWorld(input_stream=input_stream))


    def _initialize_agents(self, test_suite: str) -> None:
        filename = test_suite + Tester.SF + Tester.EXT

        agent_states = {}

        with open(filename) as input_stream:
            desires = BlocksWorld(input_stream=input_stream)
            agent = MyAgent(Tester.AGENT_NAME, desires)

            agent_states[agent] = desires
            self._agents.append(agent)

            self._environment.add_agent(agent, desires, None)

            print("Agent %s desires:" % str(agent))
            print(str(desires))


    def make_steps(self):
        print("\n\n================================================= INITIAL STATE:")
        print(str(self._environment))
        print("\n\n=================================================")

        completed = False
        nr_steps = 0

        while not completed:
            completed = self._environment.step()

            time.sleep(Tester.STEP_DELAY)
            print(str(self._environment))

            for ag in self._agents:
                print(ag.status_string())

            nr_steps += 1

            print("\n\n================================================= STEP %i completed." % nr_steps)

        print("\n\n================================================= ALL STEPS COMPLETED")



if __name__ == "__main__":
    tester = Tester()
    tester.make_steps()

Writing my.py
