In [None]:
from aimacode.logic import PropKB
from aimacode.planning import Action
from aimacode.search import (
    Node, Problem,
)
from aimacode.utils import expr
from lp_utils import (
    FluentState, encode_state, decode_state,
)
from my_planning_graph import PlanningGraph

from functools import lru_cache

In [238]:
class AirCargoProblem(Problem):
    def __init__(self, cargos, planes, airports, initial, goal):
        """
        :param cargos: list of str
            cargos in the problem
        :param planes: list of str
            planes in the problem
        :param airports: list of str
            airports in the problem
        :param initial: FluentState object
            positive and negative literal fluents (as expr) describing initial state
        :param goal: list of expr
            literal fluents required for goal test
        """
        self.state_map = initial.pos + initial.neg
        self.initial_state_TF = encode_state(initial, self.state_map)
        Problem.__init__(self, self.initial_state_TF, goal=goal)
        self.cargos = cargos
        self.planes = planes
        self.airports = airports
        self.actions_list = self.get_actions()

    def get_actions(self):
        """
        This method creates concrete actions (no variables) for all actions in the problem
        domain action schema and turns them into complete Action objects as defined in the
        aimacode.planning module. It is computationally expensive to call this method directly;
        however, it is called in the constructor and the results cached in the `actions_list` property.

        Returns:
        ----------
        list<Action>
            list of Action objects
        """

        # concrete actions definition: specific literal action that does not include variables as with the schema
        # for example, the action schema 'Load(c, p, a)' can represent the concrete actions 'Load(C1, P1, SFO)'
        # or 'Load(C2, P2, JFK)'.  The actions for the planning problem must be concrete because the problems in
        # forward search and Planning Graphs must use Propositional Logic

        def load_actions():
            """Create all concrete Load actions and return a list

            :return: list of Action objects
            """
            loads = []
            for c in self.cargos:
                for p in self.planes:
                    for a in self.airports:

                        precond_pos = [expr('At({}, {})'.format(c, a)),
                                       expr('At({}, {})'.format(p, a))]
                        precond_neg = []

                        effect_add = [expr('In({}, {})'.format(c, p))]
                        effect_rm =  [expr('At({}, {})'.format(c, a))]

                        load = Action(expr('Load({}, {}, {})'.format(c, p, a)),
                                      [precond_pos, precond_neg],
                                      [effect_add, effect_rm])

                        loads.append(load)

            return loads

        def unload_actions():
            """Create all concrete Unload actions and return a list

            :return: list of Action objects
            """
            unloads = []
            for c in self.cargos:
                for p in self.planes:
                    for a in self.airports:

                        precond_pos = [expr('In({}, {})'.format(c, p)),
                                       expr('At({}, {})'.format(p, a))]
                        precond_neg = []

                        effect_add = [expr('At({}, {})'.format(c, a))]
                        effect_rm =  [expr('In({}, {})'.format(c, p))]

                        unload = Action(expr('Unload({}, {}, {})'.format(c, p, a)),
                                      [precond_pos, precond_neg],
                                      [effect_add, effect_rm])

                        unloads.append(unload)

            return unloads

        def fly_actions():
            """Create all concrete Fly actions and return a list

            :return: list of Action objects
            """
            flys = []
            for fr in self.airports:
                for to in self.airports:
                    if fr != to:
                        for p in self.planes:
                            precond_pos = [expr("At({}, {})".format(p, fr)),
                                           ]
                            precond_neg = []
                            effect_add = [expr("At({}, {})".format(p, to))]
                            effect_rem = [expr("At({}, {})".format(p, fr))]
                            fly = Action(expr("Fly({}, {}, {})".format(p, fr, to)),
                                         [precond_pos, precond_neg],
                                         [effect_add, effect_rem])
                            flys.append(fly)
            return flys

        return load_actions() + unload_actions() + fly_actions()

    def actions(self, state):
        """ Return the actions that can be executed in the given state.

        :param state: str
            state represented as T/F string of mapped fluents (state variables)
            e.g. 'FTTTFF'
        :return: list of Action objects
        """
        possible_actions = []

        # First, create the fluent state which is mapped to this boolean string representation
        fluent_state = FluentState([], [])
        possible_states = self.state_map
        for i, s in enumerate(possible_states):
            if state[i] == 'T':
                fluent_state.pos.append(s)
            else:
                fluent_state.neg.append(s)
        
        # Now, find all actions that their precondition can be met by the fluent_state
        for action in self.actions_list:
            pos_overlap = [x for x in action.precond_pos if x in fluent_state.pos]
            neg_overlap = [x for x in action.precond_neg if x in fluent_state.neg]
            if pos_overlap == action.precond_pos and neg_overlap == action.precond_neg:
                possible_actions.append(action)

        return possible_actions

    def result(self, state, action):
        """ Return the state that results from executing the given
        action in the given state. The action must be one of
        self.actions(state).

        :param state: state entering node
        :param action: Action applied
        :return: resulting state after action
        """
        new_state = FluentState([], [])
    
        possible_actions = self.actions(state)
        valid_action = [x for x in possible_actions 
                       if x.name == action.name 
                       and x.args == action.args]
        if valid_action:
        
            # First, create the fluent state which is mapped to this boolean string representation
            possible_states = self.state_map
            init_fluent_state = decode_state(state, possible_states)

            init_pos = init_fluent_state.pos
            init_neg = init_fluent_state.neg

            eff_add = action.effect_add
            eff_rm = action.effect_rem

            # Added effects must be:
            #    1. Added to init_pos
            #    2. Removed from init_neg
            # Removed effects must be:
            #    1. Added to init_neg
            #    2. Removed from init_pos

            result_pos = list( set(init_pos + eff_add) - set(eff_rm) )
            result_neg = list( set(init_neg + eff_rm) - set(eff_add) )

            new_state.pos = result_pos
            new_state.neg = result_neg

        return encode_state(new_state, self.state_map)

    def goal_test(self, state):
        """ Test the state to see if goal is reached

        :param state: str representing state
        :return: bool
        """
        kb = PropKB()
        kb.tell(decode_state(state, self.state_map).pos_sentence())
        for clause in self.goal:
            if clause not in kb.clauses:
                return False
        return True

    def h_1(self, node):
        # note that this is not a true heuristic
        h_const = 1
        return h_const

    @lru_cache(maxsize=8192)
    def h_pg_levelsum(self, node):
        """This heuristic uses a planning graph representation of the problem
        state space to estimate the sum of all actions that must be carried
        out from the current state in order to satisfy each individual goal
        condition.
        """
        # requires implemented PlanningGraph class
        pg = PlanningGraph(self, node.state)
        pg_levelsum = pg.h_levelsum()
        return pg_levelsum

    @lru_cache(maxsize=8192)
    def h_ignore_preconditions(self, node):
        """This heuristic estimates the minimum number of actions that must be
        carried out from the current state in order to satisfy all of the goal
        conditions by ignoring the preconditions required for an action to be
        executed.
        """
        # TODO implement (see Russell-Norvig Ed-3 10.2.3  or Russell-Norvig Ed-2 11.2)
        count = 0
        return count


def air_cargo_p1():

    cargos = ['C1', 'C2']
    planes = ['P1', 'P2']
    airports = ['JFK', 'SFO']

    pos = [expr('At(C1, SFO)'),
           expr('At(C2, JFK)'),
           expr('At(P1, SFO)'),
           expr('At(P2, JFK)'),
           ]

    neg = [expr('At(C2, SFO)'),
           expr('In(C2, P1)'),
           expr('In(C2, P2)'),
           expr('At(C1, JFK)'),
           expr('In(C1, P1)'),
           expr('In(C1, P2)'),
           expr('At(P1, JFK)'),
           expr('At(P2, SFO)'),
           ]

    init = FluentState(pos, neg)

    goal = [expr('At(C1, JFK)'),
            expr('At(C2, SFO)'),
            ]

    return AirCargoProblem(cargos, planes, airports, init, goal)


def air_cargo_p2():

    cargos = ['C1', 'C2', 'C3']
    planes = ['P1', 'P2', 'P3']
    aiports = ['SFO', 'JFK', 'ATL']

    pos = [expr('At(P1, SFO)'),
           expr('At(P2, JFK)'),
           expr('At(P3, ATL)'),
           expr('At(C1, SFO)'),
           expr('At(C2, JFK)'),
           expr('At(C3, ATL)'),
           ]

    neg = [expr('In(C1, P1)'),
           expr('In(C1, P2)'),
           expr('In(C1, P3)'),
           expr('At(C1, JFK)'),
           expr('At(C1, ATL)'),
           expr('In(C2, P1)'),
           expr('In(C2, P2)'),
           expr('In(C2, P3)'),
           expr('At(C2, SFO)'),
           expr('At(C2, ATL)'),
           expr('In(C3, P1)'),
           expr('In(C3, P2)'),
           expr('In(C3, P3)'),
           expr('At(C3, JFK)'),
           expr('At(C3, SFO)'),
           expr('At(P1, JFK)'),
           expr('AT(P1, ATL)'),
           expr('At(P2, SFO)'),
           expr('AT(P2, ATL)'),
           expr('At(P3, JFK)'),
           expr('AT(P3, SFO)'),
           ]

    init = FluentState(pos, neg)

    goal = [expr('At(C1, JFK)'),
            expr('At(C2, SFO)'),
            expr('At(C3, SFO)'),
            ]

    return AirCargoProblem(cargos,planes, aiports, init, goal)


def air_cargo_p3():

    cargos = ['C1', 'C2', 'C3', 'C4']
    planes = ['P1', 'P2']
    aiports = ['SFO', 'JFK', 'ATL', 'ORD']

    pos = [expr('At(P1, SFO)'),
           expr('At(P2, JFK)'),
           expr('At(C1, SFO)'),
           expr('At(C2, JFK)'),
           expr('At(C3, ATL)'),
           expr('At(C4, ORD)')
           ]

    neg = [expr('In(C1, P1)'),
           expr('In(C1, P2)'),
           expr('At(C1, JFK)'),
           expr('At(C1, ATL)'),
           expr('At(C1, ORD)'),
           expr('In(C2, P1)'),
           expr('In(C2, P2)'),
           expr('At(C2, SFO)'),
           expr('At(C2, ATL)'),
           expr('At(C2, ORD)'),
           expr('In(C3, P1)'),
           expr('In(C3, P2)'),
           expr('At(C3, JFK)'),
           expr('At(C3, SFO)'),
           expr('At(C3, ORD)'),
           expr('In(C4, P1)'),
           expr('In(C4, P2)'),
           expr('At(C4, JFK)'),
           expr('At(C4, SFO)'),
           expr('At(C4, ATL)'),
           expr('At(P1, JFK)'),
           expr('AT(P1, ATL)'),
           expr('AT(P1, ORD)'),
           expr('At(P2, SFO)'),
           expr('AT(P2, ATL)'),
           expr('AT(P2, ORD)')
           ]

    init = FluentState(pos, neg)

    goal = [expr('At(C1, JFK)'),
            expr('At(C2, SFO)'),
            expr('At(C3, JFK)'),
            expr('At(C4, SFO)'),
            ]

    return AirCargoProblem(cargos,planes, aiports, init, goal)

In [239]:
p1 = air_cargo_p1()

action = Action(
    expr('Load(C1, P1, SFO)'),
    [[expr('At(C1, SFO)'), expr('At(P1, SFO)')], []],
    [[expr('In(C1, P1)')], [expr('At(C1, SFO)')]]
    )

In [240]:
a = p1.result(p1.initial, action)

[In(C1, P1), At(P2, JFK), At(P1, SFO), At(C2, JFK)] [At(P1, JFK), In(C2, P1), In(C1, P2), At(C1, SFO), At(C2, SFO), In(C2, P2), At(C1, JFK), At(P2, SFO)] FTTTFFFFTFFF


In [230]:
a

'FFFFFFFFTFFF'

In [231]:
    def test_AC_result(self):
        fs = decode_state(self.p1.result(self.p1.initial, self.act1), self.p1.state_map)
        self.assertTrue(expr('In(C1, P1)') in fs.pos)
        self.assertTrue(expr('At(C1, SFO)') in fs.neg)

In [219]:
import os
import sys
from aimacode.planning import Action
from aimacode.utils import expr
from aimacode.search import Node
import unittest
from lp_utils import decode_state
from my_air_cargo_problems import (
    air_cargo_p1, air_cargo_p2, air_cargo_p3,
)

class TestAirCargoProb1(unittest.TestCase):

    def setUp(self):
        self.p1 = air_cargo_p1()

    def test_ACP1_num_fluents(self):
        self.assertEqual(len(self.p1.initial), 12)

    def test_ACP1_num_requirements(self):
        self.assertEqual(len(self.p1.goal),2)


class TestAirCargoProb2(unittest.TestCase):

    def setUp(self):
        self.p2 = air_cargo_p2()

    def test_ACP2_num_fluents(self):
        self.assertEqual(len(self.p2.initial), 27)

    def test_ACP2_num_requirements(self):
        self.assertEqual(len(self.p2.goal),3)


class TestAirCargoProb3(unittest.TestCase):

    def setUp(self):
        self.p3 = air_cargo_p3()

    def test_ACP3_num_fluents(self):
        self.assertEqual(len(self.p3.initial), 32)

    def test_ACP3_num_requirements(self):
        self.assertEqual(len(self.p3.goal),4)


class TestAirCargoMethods(unittest.TestCase):

    def setUp(self):
        self.p1 = air_cargo_p1()
        self.act1 = Action(
            expr('Load(C1, P1, SFO)'),
            [[expr('At(C1, SFO)'), expr('At(P1, SFO)')], []],
            [[expr('In(C1, P1)')], [expr('At(C1, SFO)')]]
        )

    def test_AC_get_actions(self):
        # to see a list of the actions, uncomment below
        # print("\nactions for problem")
        # for action in self.p1.actions_list:
        #     print("{}{}".format(action.name, action.args))
        self.assertEqual(len(self.p1.actions_list), 20)

    def test_AC_actions(self):
        # to see list of possible actions, uncomment below
        # print("\npossible actions:")
        # for action in self.p1.actions(self.p1.initial):
        #     print("{}{}".format(action.name, action.args))
        self.assertEqual(len(self.p1.actions(self.p1.initial)), 4)

    def test_AC_result(self):
        fs = decode_state(self.p1.result(self.p1.initial, self.act1), self.p1.state_map)
        self.assertTrue(expr('In(C1, P1)') in fs.pos)
        self.assertTrue(expr('At(C1, SFO)') in fs.neg)

    def test_h_ignore_preconditions(self):
        n = Node(self.p1.initial)
        self.assertEqual(self.p1.h_ignore_preconditions(n),2)

#if __name__ == '__main__':
#    unittest.main()


In [220]:
tests = TestAirCargoMethods()

In [221]:
tests.setUp()

In [222]:
tests.test_AC_result()

AssertionError: False is not true

In [102]:
unittest.TestCase

unittest.case.TestCase