## 5. WAP in Python to calculate the heuristic value for Blocks World Problem

Heuristic is calculated as:

- For each block that has the correct support structure, give +1 to every block in the support structure. 
- For each block that has a wrong support structure: -1 to every block in the support structure

---

Here, the state for blocks world is being represented as a dictionary. The keys represent the various blocks and the values represent the blocks below each. Implementation of heuristic() function based on support structure:

In [1]:
def heuristic(state, goal_state):
    """
    Calculate heuristic value for any particular blocks world state.
    Here, it is the evaluation of correct support structure through parsing state (dict).
    """
    bottom_block_g = [b for b in goal_state.keys() if goal_state[b] is None].pop()

    h = 0
    for key in state.keys():
        i = 0
        bottom_block = pointer = state[key]
        while pointer is not None:
            i += 1
            bottom_block = pointer
            pointer = state[pointer]  # change pointer to block below
        h += i if bottom_block == bottom_block_g else -i

    return h


goal_state = {
    "A": None,
    "B": "A",
    "C": "B",
    "D": "C",
}  # A on table, B on top of A, C on top of B, D on top of C

test_1 = {"A": "D", "B": None, "C": "B", "D": "C"}
test_2 = {"A": None, "B": None, "C": "B", "D": "C"}
test_3 = {"A": None, "B": None, "C": None, "D": None}

print("Heuristic value (Test 1):", heuristic(test_1, goal_state))
print("Heuristic value (Test 2):", heuristic(test_2, goal_state))
print("Heuristic value (Test 3):", heuristic(test_3, goal_state))

print("\nHeuristic value for Goal State:", heuristic(goal_state, goal_state))

Heuristic value (Test 1): -6
Heuristic value (Test 2): -3
Heuristic value (Test 3): 0

Heuristic value for Goal State: 6


This function can be used as part of previous question. This can be done by integrating a priority queue which makes use of heuristic values calculated for successors. The `heapq` module is used for this purpose. Also, as `heapq` module seems to consider (parent, child) ordered pairs in OPEN queue as separate elements, the node pairs are serialized/deserialized to and from string using `json` module to account for this behavior.



In [2]:
from json import dumps, loads
from heapq import heappush, heappop


class BlocksWorld:
    def __init__(
        self,
        initial_state={"A": "C", "B": None, "C": None},
        goal_state={"A": None, "B": "A", "C": "B"},
    ):
        """
        Initialize open queue with initial state, goal state and closed set for visited nodes.
        In the state (as a dict), the keys represent block names and values represent
        names of blocks below. Value of `None` denotes being above table.
        """
        self.goal_state = goal_state
        self.open = []
        heappush(
            self.open, (-self.heuristic(initial_state), dumps([None, initial_state]))
        )  # pq_item is pair of [parent, child], -ve priority as heapq is min queue
        self.closed = []

    def goal_test(self, state):
        """
        Check to see if blocks are arranged according to goal state.
        """
        for block in state.keys():
            if not state[block] == self.goal_state[block]:
                return False
        return True

    def heuristic(self, state):
        """
        Calculate heuristic value for any particular blocks world state.
        Here, it is the evaluation of correct support structure through parsing state (dict).
        """
        bottom_block_g = [
            b for b in self.goal_state.keys() if self.goal_state[b] is None
        ].pop()

        h = 0
        for key in state.keys():
            i = 0
            bottom_block = pointer = state[key]
            while pointer is not None:
                i += 1
                bottom_block = pointer
                pointer = state[pointer]  # change pointer to block below
            h += i if bottom_block == bottom_block_g else -i

        return h

    def successor(self, state):
        """
        Generate successor states based on current state and production rules.
        """
        free_blocks = [b for b in state.keys() if b not in state.values()]
        table_top_blocks = [b for b in free_blocks if state[b] is None]
        stack_top_blocks = [b for b in free_blocks if state[b] is not None]

        succ = []

        # Move stack top blocks to top of other stacks or to table
        for stb in stack_top_blocks:
            # Move to top of other stacks
            for a_stb in stack_top_blocks:
                if stb == a_stb:
                    continue
                next_state = state.copy()
                next_state.update({stb: a_stb})
                succ.append(next_state)
            # Move to table
            next_state = state.copy()
            next_state.update({stb: None})
            succ.append(next_state)

        # Move table top blocks to top of other free blocks
        for ttb in table_top_blocks:
            # Move to top of other free blocks
            for fb in free_blocks:
                if ttb == fb:
                    continue
                next_state = state.copy()
                next_state.update({ttb: fb})
                succ.append(next_state)

        return succ

    def get_pair_child(self, node_pair_iter):
        """
        Returns list of only children from iterable containing (parent, child) node pairs.
        """
        return [pair[1] for pair in node_pair_iter]

    def heuristic_search(self):
        """
        Heuristic Search using open as priority queue.
        """
        while self.open:
            # Dequeue, add to closed set and check if current node is goal
            node_pair = loads(heappop(self.open)[1])
            self.closed.append(node_pair)
            _, node = node_pair
            if self.goal_test(node):
                return node
            # If current node is not goal, generate successors and add to open queue
            open_node_pairs = [loads(pq_item[1]) for pq_item in self.open]
            extend_list = [
                (
                    -self.heuristic(s),
                    dumps([node, s]),
                )  # -ve priority as heapq is min queue
                for s in self.successor(node)
                if s not in self.get_pair_child(open_node_pairs)
                and s not in self.get_pair_child(self.closed)
            ]
            for pq_item in extend_list:
                heappush(self.open, pq_item)
        # Return None if goal not found
        return None

    def generate_path(self):
        """
        Generate the path from initial state to solution/goal state.
        """
        path = []
        node = self.goal_state
        while node:
            path.append(str(node))
            for parent, child in self.closed:
                if node == child:
                    node = parent
                    break
        path.reverse()
        print("Solution path:", end=" ")
        print(" -> ".join(path))

    def run(self):
        """
        Driver method.
        """
        goal_node = self.heuristic_search()
        if goal_node:
            self.generate_path()
            print(f"Found the goal state {goal_node}")
        else:
            print("Did not find goal state")


bw = BlocksWorld()
bw.run()

Solution path: {'A': 'C', 'B': None, 'C': None} -> {'A': None, 'B': None, 'C': None} -> {'A': None, 'B': 'A', 'C': None} -> {'A': None, 'B': 'A', 'C': 'B'}
Found the goal state {'A': None, 'B': 'A', 'C': 'B'}
