In [42]:
from typing import Any, List, Generator, Union, Optional
import re




In [43]:

class Expr:
    """A symbolic expression: an operator (``op``) applied to argument Exprs (``args``).

    A bare symbol or constant is an Expr with no arguments, e.g. ``Expr('P')``
    or ``Expr(0)``.  Python operators are overloaded so that combining Exprs
    builds larger Exprs instead of computing values (``A & ~B`` yields
    ``Expr('&', A, Expr('~', B))``).  Note that the comparison operators
    ``<, <=, >, >=`` therefore return Exprs, not booleans; only ``==`` / ``!=``
    perform a real structural comparison.
    """

    def __init__(self, op: Union[str, int], *args: Any) -> None:
        """Store the operator and coerce every argument into an Expr.

        Args:
            op: Operator or symbol name (a string), or a numeric constant.
            *args: Sub-expressions; each may be an Expr, a number, or a string
                that ``create_expression`` can parse.
        """
        self.op = op
        self.args = list(map(Expr.create_expression, args))  # Coerce args to Exprs

    @staticmethod
    def _is_symbol(op: Any) -> bool:
        """Return True if ``op`` is a string starting with a letter.

        Same rule as ``Logic.is_symbol``; kept local so that ``repr`` and
        ``__call__`` work without the later-defined ``Logic`` class.
        """
        return isinstance(op, str) and op[:1].isalpha()

    @staticmethod
    def create_expression(s: Union[str, int, "Expr"]) -> "Expr":
        """Turn ``s`` into an Expr.

        * An Expr is returned unchanged.
        * A number becomes a constant Expr with that number as ``op``.
        * A purely numeric string becomes a constant Expr with that string as ``op``.
        * Any other string is parsed as a formula.  The arrows ``==>``, ``<==``,
          ``<=>`` and ``=/=`` are rewritten to the Python operators ``>>``,
          ``<<``, ``%`` and ``^``, every identifier/number token is wrapped in
          ``Expr("...")``, and the result is evaluated so that the overloaded
          operators build the expression tree.

        Raises:
            AttributeError: if ``s`` is neither an Expr, a number nor a string.
            SyntaxError: if the string is not a well-formed formula.
        """
        if isinstance(s, Expr):
            return s
        if isinstance(s, (int, float)):
            # Numbers have no str methods; wrap them directly as constants.
            return Expr(s)
        if s.isnumeric():
            return Expr(s)
        s = s.replace("==>", ">>").replace("<==", "<<").replace("<=>", "%").replace("=/=", "^")
        s = re.sub(r'([a-zA-Z0-9_.]+)', r'Expr("\1")', s)
        # NOTE(review): eval() executes the rewritten text as Python; only the
        # name Expr is supplied explicitly, but builtins remain reachable.
        # Do not feed untrusted strings to this function.
        return eval(s, {"Expr": Expr})

    def __call__(self, *args):
        """Self must be a symbol with no args, such as Expr('F').  Create a new
        Expr with 'F' as op and the args as arguments."""
        assert Expr._is_symbol(self.op) and not self.args
        return Expr(self.op, *args)

    def __repr__(self):
        "Show something like 'P' or 'P(x, y)', or '~P' or '(P | Q | R)'"
        if not self.args:                 # Constant or proposition with arity 0
            return str(self.op)
        elif Expr._is_symbol(self.op):    # Functional or propositional operator
            return '%s(%s)' % (self.op, ', '.join(map(repr, self.args)))
        elif len(self.args) == 1:         # Prefix operator
            return str(self.op) + repr(self.args[0])
        else:                             # Infix operator
            return '(%s)' % (' ' + str(self.op) + ' ').join(map(repr, self.args))

    def __eq__(self, other):
        """x and y are equal iff their ops and args are equal."""
        return (other is self) or (isinstance(other, Expr)
            and self.op == other.op and self.args == other.args)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        "Need a hash method so Exprs can live in dicts."
        return hash(self.op) ^ hash(tuple(self.args))

    # Operator overloads: each one builds a new Expr node rather than
    # computing a value.  `%` stands for the biconditional '<=>'.
    def __lt__(self, other):     return Expr('<',  self, other)
    def __le__(self, other):     return Expr('<=', self, other)
    def __ge__(self, other):     return Expr('>=', self, other)
    def __gt__(self, other):     return Expr('>',  self, other)
    def __add__(self, other):    return Expr('+',  self, other)
    def __sub__(self, other):    return Expr('-',  self, other)
    def __and__(self, other):    return Expr('&',  self, other)
    def __div__(self, other):    return Expr('/',  self, other)
    def __truediv__(self, other):return Expr('/',  self, other)
    def __invert__(self):        return Expr('~',  self)
    def __lshift__(self, other): return Expr('<<', self, other)
    def __rshift__(self, other): return Expr('>>', self, other)
    def __mul__(self, other):    return Expr('*',  self, other)
    def __neg__(self):           return Expr('-',  self)
    def __or__(self, other):     return Expr('|',  self, other)
    def __pow__(self, other):    return Expr('**', self, other)
    def __xor__(self, other):    return Expr('^',  self, other)
    def __mod__(self, other):    return Expr('<=>',  self, other)



In [44]:
from collections import deque

class Logic:
    """Propositional-logic utilities over Expr trees: symbol classification,
    truth-table entailment, model evaluation and conversion to CNF."""

    TRUE, FALSE, ZERO, ONE, TWO = map(Expr, ["TRUE", "FALSE", 0, 1, 2])
    A, B, C, D, E, F, G, P, Q, x, y, z = map(Expr, "ABCDEFGPQxyz")
    # Neutral element returned by associate() when an operator has no operands.
    _op_identity = {"&": TRUE, "|": FALSE, "+": ZERO, "*": ONE}

    def is_symbol(self, s: str) -> bool:
        """Returns True if s is a string that starts with an alphabetic character."""
        return isinstance(s, str) and s[:1].isalpha()

    def is_var_symbol(self, s: str) -> bool:
        """Returns True if s is a symbol whose first character is lowercase."""
        return self.is_symbol(s) and s[0].islower()

    def is_prop_symbol(self, s: str) -> bool:
        """Returns True if s is a symbol starting with an uppercase letter,
        other than the constants "TRUE" and "FALSE"."""
        return self.is_symbol(s) and s[0].isupper() and s != "TRUE" and s != "FALSE"

    def variables(self, s: "Expr") -> set:
        """Returns the set of variables in the expression.

        A variable is an argument-less Expr whose op is a lowercase symbol.
        The check is made on the node's ``op`` string: passing the Expr itself
        to ``is_var_symbol`` never matches, because that predicate only
        accepts strings.
        """
        result = set()

        def walk(node):
            if not isinstance(node, Expr):
                return
            if not node.args and self.is_var_symbol(node.op):
                result.add(node)
            else:
                for arg in node.args:
                    walk(arg)

        walk(s)
        return result

    def tt_entails(self, kb: "Expr", alpha: "Expr") -> bool:
        """Does kb entail the sentence alpha? Use truth tables. For propositional
        kb's and sentences.

        Raises:
            AssertionError: if alpha contains variables (lowercase symbols).
        """
        assert not self.variables(alpha)
        return self.tt_check_all(kb, alpha, self.prop_symbols(kb & alpha), {})

    def tt_check_all(
        self, kb: "Expr", alpha: "Expr", symbols: deque, model: dict
    ) -> bool:
        """Auxiliary routine to implement tt_entails.

        Enumerates every True/False assignment of ``symbols`` on top of
        ``model`` and returns True iff alpha holds in every assignment in
        which kb evaluates to true.

        Args:
            kb: Knowledge-base sentence.
            alpha: Query sentence.
            symbols: Proposition symbols still to be assigned (any iterable;
                it is copied, not consumed).
            model: Partial assignment; left exactly as it was on return.
        """
        # Snapshot the symbols once.  Popping from one shared deque would let
        # the True-branch of a symbol drain the symbols that its False-branch
        # still needs to enumerate.
        return self._tt_check(kb, alpha, tuple(symbols), 0, model)

    def _tt_check(
        self, kb: "Expr", alpha: "Expr", symbols: tuple, index: int, model: dict
    ) -> bool:
        """Recursive worker of tt_check_all: assign symbols[index:] in both ways."""
        if index == len(symbols):
            if self.evaluate(kb, model):
                result = self.evaluate(alpha, model)
                assert result in (True, False)
                return result
            # kb is not true in this model, so the model cannot refute entailment.
            return True

        symbol = symbols[index]
        # Remember a prior binding by membership, so that an existing False
        # value is restored rather than deleted.
        had_value = symbol in model
        previous = model.get(symbol)
        try:
            for value in (True, False):
                model[symbol] = value
                if not self._tt_check(kb, alpha, symbols, index + 1, model):
                    return False  # one counter-model is enough
            return True
        finally:
            if had_value:
                model[symbol] = previous
            else:
                del model[symbol]

    def prop_symbols(self, x: "Expr") -> deque:
        """Return a deque of the distinct propositional symbols in x, in order
        of first appearance.  Non-Expr input yields an empty deque."""
        found = {}  # dict used as an insertion-ordered set

        def walk(node):
            if not isinstance(node, Expr):
                return
            if self.is_prop_symbol(node.op):
                found.setdefault(node, None)
            else:
                for arg in node.args:
                    walk(arg)

        walk(x)
        return deque(found)

    def evaluate(self, exp: "Expr", model: Optional[dict] = None) -> Optional[bool]:
        """Return True if the propositional logic expression is true in the model,
        and False if it is false. If the model does not specify the value for
        every proposition, this may return None to indicate 'not obvious';
        this may happen even when the expression is tautological.

        Args:
            exp: Expression to evaluate.
            model: Mapping from proposition Exprs to booleans; defaults to empty.

        Raises:
            ValueError: if the expression uses an operator that is not one of
                ~, |, &, >>, <<, <=>, ^ or has the wrong number of operands.
        """
        if model is None:
            model = {}
        op, args = exp.op, exp.args

        # Constants
        if exp in (self.TRUE, self.FALSE):
            return exp == self.TRUE

        # Proposition symbols: look the value up (None when unassigned)
        if self.is_prop_symbol(op):
            return model.get(exp)

        if op == "~":  # negation
            p = self.evaluate(args[0], model)
            return None if p is None else not p
        elif op == "|":  # disjunction: any True wins, otherwise unknown beats False
            result = False
            for arg in args:
                p = self.evaluate(arg, model)
                if p is True:
                    return True
                if p is None:
                    result = None
            return result
        elif op == "&":  # conjunction: any False wins, otherwise unknown beats True
            result = True
            for arg in args:
                p = self.evaluate(arg, model)
                if p is False:
                    return False
                if p is None:
                    result = None
            return result

        # Everything left must be a binary operator.
        if len(args) != 2:
            raise ValueError("illegal operator in logic expression" + str(exp))
        p, q = args

        if op == ">>":  # implication
            return self.evaluate(~p | q, model)
        elif op == "<<":  # reverse implication
            return self.evaluate(p | ~q, model)

        pt = self.evaluate(p, model)
        qt = self.evaluate(q, model)

        # Only a genuinely unknown operand makes the result unknown; a False
        # operand is a perfectly good truth value for <=> and ^.
        if pt is None or qt is None:
            return None

        if op == "<=>":  # biconditional
            return pt == qt
        elif op == "^":  # exclusive or
            return pt != qt
        else:
            raise ValueError("illegal operator in logic expression" + str(exp))

    def to_cnf(self, s: "Expr") -> "Expr":
        """Converts a propositional logical sentence s (Expr or formula string)
        to conjunctive normal form."""
        if isinstance(s, str):
            s = Expr.create_expression(s)
        s = self.eliminate_implications(s)
        s = self.demorgans_law(s)
        return self.distribute_and_over_or(s)

    def eliminate_implications(self, s: "Expr") -> "Expr":
        """Change >>, <<, <=> and ^ into &, |, and ~.

        Raises:
            AssertionError: if an operator other than those and &, |, ~ is found.
        """
        if not s.args or self.is_symbol(s.op):
            return s
        args = list(map(self.eliminate_implications, s.args))
        a, b = args[0], args[-1]
        if s.op == ">>":
            return b | ~a
        elif s.op == "<<":
            return a | ~b
        elif s.op == "<=>":
            return (a | ~b) & (b | ~a)
        elif s.op == "^":
            assert len(args) == 2
            return (a & ~b) | (~a & b)
        else:
            assert s.op in ("&", "|", "~")
            return Expr(s.op, *args)

    def demorgans_law(self, s: "Expr") -> "Expr":
        """Rewrite sentence s by moving negation sign inward."""
        if s.op == "~":
            def negate(b):
                return self.demorgans_law(~b)

            a = s.args[0]
            if a.op == "~":  # ~~A  ->  A
                return self.demorgans_law(a.args[0])
            if a.op == "&":  # ~(A & B)  ->  ~A | ~B
                return self.associate("|", map(negate, a.args))
            if a.op == "|":  # ~(A | B)  ->  ~A & ~B
                return self.associate("&", map(negate, a.args))
            return s
        elif self.is_symbol(s.op) or not s.args:
            return s
        else:
            return Expr(s.op, *map(self.demorgans_law, s.args))

    def distribute_and_over_or(self, s: "Expr") -> "Expr":
        """Given a sentence s consisting of conjunctions and disjunctions of literals,
        return an equivalent sentence in CNF."""
        if s.op == "|":
            s = self.associate("|", s.args)
            if s.op != "|":
                return self.distribute_and_over_or(s)
            if len(s.args) == 0:
                return self.FALSE
            if len(s.args) == 1:
                return self.distribute_and_over_or(s.args[0])
            # Pick the first conjunction among the disjuncts and distribute
            # the remaining disjuncts over each of its conjuncts.
            conj = next((d for d in s.args if d.op == "&"), None)
            if conj is None:
                return s
            others = [a for a in s.args if a is not conj]
            rest = self.associate("|", others)
            return self.associate(
                "&", [self.distribute_and_over_or(c | rest) for c in conj.args]
            )
        elif s.op == "&":
            return self.associate("&", map(self.distribute_and_over_or, s.args))
        else:
            return s

    def associate(self, op: str, args: List["Expr"]) -> "Expr":
        """Given an associative op, return an expression with the same meaning as
        Expr(op, *args), but flattened.

        With no operands the operator's identity element is returned; with a
        single operand that operand itself is returned.
        """
        args = self.split_by_op(op, args)
        if len(args) == 0:
            return self._op_identity[op]
        elif len(args) == 1:
            return args[0]
        else:
            return Expr(op, *args)

    def conjuncts(self, s: "Expr") -> List["Expr"]:
        """Return a list of the conjuncts in the sentence s."""
        return self.split_by_op("&", [s])

    def disjuncts(self, s: "Expr") -> List["Expr"]:
        """Return a list of the disjuncts in the sentence s."""
        return self.split_by_op("|", [s])

    def split_by_op(self, op: str, args: List["Expr"]) -> List["Expr"]:
        """Given an associative op, return a flattened list result such that
        Expr(op, *result) means the same as Expr(op, *args).

        Example: for op = "&" and args = [A, B, Expr("|", C, D), Expr("&", E, F)]
        result = [A, B, Expr("|", C, D), E, F]
        """
        result = []

        def collect(subargs):
            for arg in subargs:
                if arg.op == op:
                    collect(arg.args)
                else:
                    result.append(arg)

        collect(args)
        return result


    
    

In [45]:


class KnowledgeBase:
    """A propositional knowledge base that stores sentences as CNF clauses and
    answers queries by truth-table entailment."""

    def __init__(self):
        # Every stored item is one conjunct of a CNF-converted sentence.
        self.clauses: List["Expr"] = []

    def ask(self, query: Any) -> Union[dict, bool]:
        """Return True if the KB entails the query, otherwise False.

        ``ask_generator`` signals entailment by yielding an (empty)
        substitution, so the mere fact that something was yielded is the
        answer; the yielded dict's length must not be used, because ``{}`` has
        length 0 even when the query is entailed.
        """
        for _substitution in self.ask_generator(query):
            return True
        return False

    def tell(self, sentence: "Expr") -> None:
        """Convert the sentence (Expr or formula string) to CNF and add its
        conjuncts to the KB."""
        logic = Logic()
        self.clauses.extend(logic.conjuncts(logic.to_cnf(sentence)))

    def ask_generator(self, query: "Expr") -> Generator[dict, None, None]:
        """Yields an empty substitution if the KB implies the query."""
        if Logic().tt_entails(Expr("&", *self.clauses), query):
            yield {}

    def retract(self, sentence: "Expr") -> None:
        """Remove from the KB the first stored occurrence of each CNF conjunct
        of the sentence; conjuncts that are not stored are ignored."""
        logic = Logic()
        for clause in logic.conjuncts(logic.to_cnf(sentence)):
            if clause in self.clauses:
                self.clauses.remove(clause)


In [47]:

class LogicAgent:
    """Grid agent that chooses its next cell by querying a knowledge base.

    Cells are addressed as ``(x, y)`` tuples and referred to in the KB through
    symbols of the form ``<letter><x>_<y>``; this class queries the letters
    ``L``, ``P`` and ``W``.
    """

    def __init__(self):
        self.KB = KnowledgeBase()
        self.Logic = Logic()
        # Offsets of the four orthogonal neighbours.
        self.DELTAS = [(0, 1), (1, 0), (0, -1), (-1, 0)]

    def _entails(self, literal_prefix, col, row):
        """Ask the KB whether it entails the literal ``<prefix><col>_<row>``."""
        return self.KB.ask(Expr.create_expression(f"{literal_prefix}{col}_{row}"))

    def safe_neighbors(self, position):
        """Return the neighbours of ``position`` for which the KB entails either
        the ``L`` symbol, or both the ``~P`` and ``~W`` literals."""
        candidates = self.get_neighbors(position)
        proven_safe = set()

        for cell in candidates:
            col, row = cell
            if col < 0 or row < 0:
                continue

            # All three queries are always issued, in this order.
            located = self._entails("L", col, row)
            if located:
                proven_safe.add(cell)

            pit_free = self._entails("~P", col, row)
            wumpus_free = self._entails("~W", col, row)
            if pit_free and wumpus_free:
                proven_safe.add(cell)

        return proven_safe

    def not_unsafe_neighbors(self, position: tuple):
        """Return the neighbours of ``position`` for which the KB does not entail
        the ``L`` symbol, or entails neither the ``W`` nor the ``P`` symbol."""
        candidates = self.get_neighbors(position)
        acceptable = set()

        for cell in candidates:
            col, row = cell
            if col < 0 or row < 0:
                continue

            # NOTE(review): a cell is accepted as soon as its L symbol is not
            # entailed, whatever the KB says about W/P - confirm this is intended.
            located = self._entails("L", col, row)
            if not located:
                acceptable.add(cell)

            has_wumpus = self._entails("W", col, row)
            has_pit = self._entails("P", col, row)
            if not (has_wumpus or has_pit):
                acceptable.add(cell)

        return acceptable

    def get_neighbors(self, position):
        """Return the orthogonal neighbours of ``position`` whose coordinates are
        both non-negative, in ``DELTAS`` order.  No upper bound is applied."""
        reachable = []
        for dx, dy in self.DELTAS:
            col, row = position[0] + dx, position[1] + dy
            if col >= 0 and row >= 0:
                reachable.append((col, row))
        return reachable

    def unvisited(self, position):
        """
        Use logic to determine which neighbours of ``position`` do not have
        their ``L`` symbol entailed by the KB.
        """
        return {
            (col, row)
            for col, row in self.get_neighbors(position)
            if not (col < 0 or row < 0) and not self._entails("L", col, row)
        }

    def get_next_cell(self, position):
        """Pick the smallest unvisited neighbour, preferring safe cells and
        falling back to not-unsafe ones.

        Raises:
            Exception: if neither set has an unvisited neighbour.
        """
        fresh = self.unvisited(position)

        preferred = self.safe_neighbors(position).intersection(fresh)
        if preferred:
            return min(preferred)

        fallback = self.not_unsafe_neighbors(position).intersection(fresh)
        if fallback:
            return min(fallback)

        raise Exception("Nowhere left to go")

class WumpusAgent(LogicAgent):
    """Wumpus-world agent: turns percepts into KB sentences and returns an
    ``Executed(...)`` action string for each step."""

    def __init__(self, initial_position=(0, 0)):
        super().__init__()

        # Constants
        self.ORIENTATIONS = ['NORTH', 'EAST', 'SOUTH', 'WEST']
        self.MOVEMENTS = {'NORTH': (0, 1), 'EAST': (1, 0), 'SOUTH': (0, -1), 'WEST': (-1, 0)}

        # Mutable agent state
        self.initial_position = initial_position
        self.position = initial_position
        self.orientation = 'EAST'
        self.has_gold = False
        self.has_arrow = True
        self.t = 0  # Time step, incremented at the start of every act()
        self.is_alive = True
        self.score = 0
        self.wumpus_alive = True

    def make_action_sentence(self, action):
        """Format the action with the agent's current position, orientation and time step."""
        return f"Executed({action}| {self.position}| {self.orientation}| {self.t})"

    def make_clause(self, position, symbol):
        """Build the literal ``<symbol><x>_<y>`` for a cell."""
        return f"{symbol}{position[0]}_{position[1]}"

    def make_neighbor_clause(self, position, symbol, consecuence_symbol, negation=False):
        """Build ``<symbol><x>_<y> <=> ( ... )`` over the neighbours of ``position``.

        The neighbour literals use ``consecuence_symbol`` and are joined with
        ``&`` when ``negation`` is true, otherwise with ``|``.
        """
        connective = "&" if negation else "|"
        col, row = position

        terms = "".join(
            f" {consecuence_symbol}{cell[0]}_{cell[1]} {connective}"
            for cell in self.get_neighbors(position)
        )
        # Drop the trailing connective character before closing the parenthesis.
        return (f"{symbol}{col}_{row} <=> (" + terms)[:-1] + ")"

    def make_percept_knowledge(self, percept, t):
        """Translate a percept into the list of sentences for the current cell.

        Side effect: a 'Scream' percept sets ``wumpus_alive`` to False.
        The ``t`` argument is accepted but not used.
        """
        if 'Scream' in percept:
            self.wumpus_alive = False

        # Facts that always hold for the occupied cell: here, no pit, no wumpus.
        sentences = [self.make_clause(self.position, prefix) for prefix in ('L', '~P', '~W')]

        # Breeze relates to pits, stench to the wumpus; when the percept is
        # absent, the negated rule is emitted as a conjunction.
        for signal, marker, cause in (('Breeze', 'B', 'P'), ('Stench', 'S', 'W')):
            if signal in percept:
                fact = self.make_clause(self.position, marker)
                rule = self.make_neighbor_clause(self.position, marker, cause)
            else:
                fact = self.make_clause(self.position, '~' + marker)
                rule = self.make_neighbor_clause(
                    self.position, '~' + marker, '~' + cause, negation=True
                )
            sentences.append(fact)
            sentences.append(rule)

        if 'Glitter' in percept:
            sentences.append(self.make_clause(self.position, 'G'))

        return sentences

    def go_to(self, goal):
        """Set the agent's position to ``goal`` directly."""
        self.position = goal

    def act(self, percept):
        """Advance one time step and return the action sentence for it.

        Checked in order: bump (step back, 'Return'), glitter without gold
        ('Grab'), carrying gold ('Climb' at the start cell, otherwise jump to
        the start and 'Forward'), and finally tell the KB the percept
        sentences and move 'Forward' to the cell chosen by ``get_next_cell``.
        """
        self.t += 1

        # Built before the early returns because of its Scream side effect.
        knowledge = self.make_percept_knowledge(percept, self.t)

        if 'Bump' in percept:
            # Record the bumped cell, then undo the last move.
            self.KB.tell(f"L{self.position[0]}_{self.position[1]}")
            self.KB.tell(f"N{self.position[0]}_{self.position[1]}")
            step = self.MOVEMENTS[self.orientation]
            self.position = (self.position[0] - step[0], self.position[1] - step[1])
            return self.make_action_sentence('Return')

        if 'Glitter' in percept and not self.has_gold:
            self.has_gold = True
            self.KB.tell(Expr.create_expression(f"G{self.position[0]}_{self.position[1]}"))
            return self.make_action_sentence('Grab')

        if self.has_gold:
            if self.position == self.initial_position:
                return self.make_action_sentence('Climb')
            self.go_to(self.initial_position)
            return self.make_action_sentence('Forward')

        for sentence in knowledge:
            self.KB.tell(sentence)

        target = self.get_next_cell(self.position)

        # Face the direction whose movement vector equals the required step.
        offset = (target[0] - self.position[0], target[1] - self.position[1])
        matching = [name for name, vector in self.MOVEMENTS.items() if vector == offset]
        self.orientation = matching[0]

        self.position = target
        return self.make_action_sentence('Forward')

In [48]:
# Utils

def get_position_string(action):
    """Extract the ``(x, y)`` position from an action sentence.

    The second ``|``-separated field is expected to look like ``(x, y)``;
    it is returned as a tuple of two ints.
    """
    position_field = action.split("|")[1].strip()  # "(x, y)"
    pieces = [piece.strip() for piece in position_field.split(",")]  # ["(x", "y)"]
    x_text = pieces[0][1:]    # drop the leading "("
    y_text = pieces[1][:-1]   # drop the trailing ")"
    return int(x_text), int(y_text)

def get_last_action(action):
    """Return the action name from an action sentence: the text between the
    first ``(`` and the next ``(`` (or the end) of the first ``|``-separated
    field, stripped of whitespace."""
    head = action.split("|")[0]
    return head.split("(")[1].strip()

def define_starting_clauses(world):
    """Build the breeze/stench biconditional rule strings for every grid cell.

    For each cell ``(i, j)`` with ``i, j`` in ``range(world.grid_size)``, four
    strings are produced in this order, using ``world.get_nearby_cells``:
    ``B <=> (P | ...)``, ``~B <=> (~P & ...)``, ``S <=> (W | ...)`` and
    ``~S <=> (~W & ...)``.

    Returns:
        A flat list of rule strings, four per cell.
    """
    # (head symbol, neighbour symbol, connective), in emission order.
    rule_shapes = (("B", "P", "|"), ("~B", "~P", "&"), ("S", "W", "|"), ("~S", "~W", "&"))

    def biconditional(cell, head, body_symbol, connective, nearby):
        prefix = f"{head}{cell[0]}_{cell[1]} <=> ("
        terms = "".join(
            f" {body_symbol}{other[0]}_{other[1]} {connective}" for other in nearby
        )
        # Drop the last character (the trailing connective) and close the group.
        return (prefix + terms)[:-1] + ")"

    rules = []
    for i in range(world.grid_size):
        for j in range(world.grid_size):
            for head, body_symbol, connective in rule_shapes:
                rules.append(
                    biconditional(
                        (i, j), head, body_symbol, connective, world.get_nearby_cells((i, j))
                    )
                )
    return rules
                

['B0_0 <=> ( P0_1 | P1_0 )', '~B0_0 <=> ( ~P0_1 & ~P1_0 )', 'S0_0 <=> ( W0_1 | W1_0 )', '~S0_0 <=> ( ~W0_1 & ~W1_0 )', 'B0_1 <=> ( P0_2 | P1_1 | P0_0 )', '~B0_1 <=> ( ~P0_2 & ~P1_1 & ~P0_0 )', 'S0_1 <=> ( W0_2 | W1_1 | W0_0 )', '~S0_1 <=> ( ~W0_2 & ~W1_1 & ~W0_0 )', 'B0_2 <=> ( P1_2 | P0_1 )', '~B0_2 <=> ( ~P1_2 & ~P0_1 )', 'S0_2 <=> ( W1_2 | W0_1 )', '~S0_2 <=> ( ~W1_2 & ~W0_1 )', 'B1_0 <=> ( P1_1 | P2_0 | P0_0 )', '~B1_0 <=> ( ~P1_1 & ~P2_0 & ~P0_0 )', 'S1_0 <=> ( W1_1 | W2_0 | W0_0 )', '~S1_0 <=> ( ~W1_1 & ~W2_0 & ~W0_0 )', 'B1_1 <=> ( P1_2 | P2_1 | P1_0 | P0_1 )', '~B1_1 <=> ( ~P1_2 & ~P2_1 & ~P1_0 & ~P0_1 )', 'S1_1 <=> ( W1_2 | W2_1 | W1_0 | W0_1 )', '~S1_1 <=> ( ~W1_2 & ~W2_1 & ~W1_0 & ~W0_1 )', 'B1_2 <=> ( P2_2 | P1_1 | P0_2 )', '~B1_2 <=> ( ~P2_2 & ~P1_1 & ~P0_2 )', 'S1_2 <=> ( W2_2 | W1_1 | W0_2 )', '~S1_2 <=> ( ~W2_2 & ~W1_1 & ~W0_2 )', 'B2_0 <=> ( P2_1 | P1_0 )', '~B2_0 <=> ( ~P2_1 & ~P1_0 )', 'S2_0 <=> ( W2_1 | W1_0 )', '~S2_0 <=> ( ~W2_1 & ~W1_0 )', 'B2_1 <=> ( P2_2 | P2_0

KeyboardInterrupt: 