# Overview

Our project revolves around PDDLs. In this document, we demonstrate how we represent PDDLs and our implemented solution

## State
A state is represented by a Python dictionary. The key should be a string representing the fluent, and its corresponding value should be an Integer – $1$ or $0$ representing **True** or **False**.

### Example
Below is an example of a state of a person who is not hungry.

In [4]:
state = {'person': 1, 'hungry': 0}

## Action class
The action class represents an action in a PDDL. An action object takes four arguments
1. Preconditions: the preconditions for the action in the form of a dictionary
2. Effects: the applied effects of the action
3. Name: the name of the action
4. Cost: the cost for taking the action (used for A* algorithm) 

In [5]:
class Action:
    pre = {} #preconditions
    eff={} #effects
    cost=0
    name = ""
    def __init__(self, p, e, c, n):
        self.pre = p
        self.eff = e
        self.name = n
        self.cost = c
        
    def formatFluents(self, d):
        string = ""
        for k in d.keys():
            if(d[k]==0):
                string += "\n- " + k
            else:
                string += "\n+ " + k
        return string

    def __str__(self):
        return f'Name: {self.name} \nCost: {self.cost} \nPreconditions: {self.formatFluents(self.pre)} \nEffects: {self.formatFluents(self.eff)}'

### Example
Below is an example of an action being created

In [6]:
a = Action(p={"person":1, "hungry":0, "hasCake":1},e={"hasCake":0, "person":0},c=2,n="EatCake B")

## Core functions

### Below are core functions used for the Forward Search (A*) algorithm

In [7]:
# Get only positive fluents
def getPositiveFluents(state):
    return {key:val for key, val in state.items() if val == 1}


# Get only negative fluents
def getNegativeFluents(state):
    return {key:val for key, val in state.items() if val == 0}


# Check if two states match (match positive fluents only)
def statesMatch(s1,s2):
    s1_pos = getPositiveFluents(s1).copy()
    s2_pos = getPositiveFluents(s2).copy()
    return s1_pos == s2_pos


# Checks if state is a subset of the target – logic rule for checking goal state and applicable actions
def stateSubset(state, target):
    positives = getPositiveFluents(state).copy()
    targetPos = getPositiveFluents(target).copy()
    targetNeg = getNegativeFluents(target).copy()
    subsetPos = targetPos.items() <= positives.items()
    subsetNeg = any(p in positives for p in targetNeg) == False
    return subsetPos & subsetNeg


# Checks if a state fulfills the requirements for a goal state
def atGoal(state, goal):
    remainders = getPositiveFluents(state).copy().items() - getPositiveFluents(goal).copy().items()
    worldCompliment = remainders - getNegativeFluents(goal).copy().items() == set()
    return stateSubset(state, goal) & worldCompliment


# Checks if an action is applicable for a given state
def isApplicable(state, action):
    return stateSubset(state, action.pre)
    

# For a state and all possible actions, return the [applicable actions] for the given state
def applicableActions(state, actions):
    arr = []
    for a in actions:
        if isApplicable(state, a):
            arr.append(a)
    return arr


# Given a state and an action, applies the action and returns the following state if all preconditions are met
def applyAction(state, action):    
    if isApplicable(state, action) == False:
        raise Exception(f'Action {action.name} cannot be applied to state {str(state)}')
    newState = state.copy()
    effects = action.eff.copy()
    for k in effects.keys():
        newState[k] = effects[k]
        if effects[k] == 0:
            newState.pop(k)
    return newState

### Below are the core fuctions for the Backwards Search Algorithm

In [8]:

#Params:
#state: current state from where I want  to konw which actions can be undone
#actions: list with all actions to be evaluated
#fluents: list with all fluents
def removableActions(state,actions,fluents):
    ''' 
    I take the current state and remove the positive effects of the action(A) 
    and also give back the negative effects of it
    Does this new state fullfil action A preconditions?
    '''
    removable = []
    for a in actions:
        flag = 0
        state_pos = [k for k in state.keys()  if state[k]==1]
        state_neg = [k for k in fluents if k not in state_pos]
        eff_pos =[k for k in a.eff.keys() if a.eff[k]==1]
        eff_neg =[k for k in a.eff.keys() if a.eff[k]==0]
        req_neg = [k for k in a.pre.keys() if a.pre[k]==0]
        req_pos = [k for k in a.pre.keys() if a.pre[k]==1]
        for i in eff_pos:
            if i in state_neg:
                flag = 1
        for i in eff_neg:
            if i in state_pos:
                flag = 1
        undone = [e for e in state_pos if e not in eff_pos] #remove the positive effects of A
        undone = undone + eff_neg #add the negative effects of A
        if flag ==0 and containedAinB(req_pos,undone) and not intersectAwithB(req_neg,undone):
        #check if the new state satisfies A preconditions and the positive effect of A are in current state
            removable.append(a)
    return removable

#Params:
#state: current state from where I want to undo the action
#action: the action to be undone
#fluents: list with all fluents
def undoAction(state,action,fluents):
    '''
    We create a state as a result of giving back all negative effects of the action
    and removing all the positive effects of it
    '''
    state_pos = [k for k in state.keys()  if state[k]==1]
    eff_pos =[k for k in action.eff.keys() if action.eff[k]==1]
    eff_neg =[k for k in action.eff.keys() if action.eff[k]==0]
    
    undone = [e for e in state_pos if e not in eff_pos] #remove the positive effects of Action
    undone = undone + eff_neg   #give backthe negative effects of Action
    
    st={}
    for fluent in undone:
        st[fluent]=1
    return st


def containedAinB(a,b):
    '''
    Checks if set A is contained in set B
    '''
    for i in a:
        if i not in b:
            return False
    return True

def intersectAwithB(a,b):
    '''
    Checks if any element of set A belongs to set B
    '''
    for i in a:
        if i in b:
            return True
    return False

def getFluentsFromActions(actions):
    '''
    Gets all the different fluents from the set of all actions
    '''
    #Get all fluents:
    fluents = []
    for a in actions:
        for b  in a.pre.keys():
            fluents.append(b)
        for b  in a.eff.keys():
            fluents.append(b)
    fluents = list(dict.fromkeys(fluents))
    # print(fluents)
    return fluents



## A*

Below is our implementation of the A* algorithm. First, we design a custom Node class for the algorithm to use. Second, we define the heuristic that we use in the travel PDDL. Finally, for A*, the function takes a start state, a goal state, and all actions for the PDDL as input. The output is the list of actions (the path) which the A* algorithm finds. 

In [9]:
# Node class for frontier
class Node:
    
    def __init__(self, state, cost, path):
        self.state = state
        self.cost = cost
        self.path = path

    def __str__(self):
        pathNames = [p.name for p in self.path]
        pathStr = '[' + ', '.join(pathNames) + ']'
        return f'State: {self.state}, Cost: {self.cost}, Path: {pathStr}'

In [10]:
def travelHeuristic(nextState):
    '''
    It is similar to a Pattern Database Heuristic. Based on some pattern (in our case the states)
    We define the heuristic based on how far from the goal the person is
    at home, at the terminal or at the actual destination.
    '''
    atTerminal = False
    atHome = False
    dic = nextState.copy()
    key, value = 'atTerminal',1
    if key in dic and value == dic[key]:
        atTerminal = True
    key, value = 'atHome',1
    if key in dic and value == dic[key]:
        atHome = True

    if atHome == False and atTerminal == False:
        return 1
    if atHome == True and atTerminal == False:
        return 4
    if atHome == False and atTerminal == True:
        return 2
    return 10 # This should never happen

In [11]:
def aStar(start, goal, actions):

    # Init start node, frontier and visited list
    init = Node(start, 0, [])
    frontier = [init]
    visited = [init]

    while True:
        
        # Get first node from frontier
        head = frontier.pop(0)

        state = head.state.copy()

        # When A* has found a path, return it
        if (atGoal(state, goal)):
            print(f'A* found a path with cost: {head.cost}')
            return head.path

        applicActions = applicableActions(state, actions)
        
        for action in applicActions:
            nextState = applyAction(state, action).copy()
            newPath = head.path + [action]
            cost = travelHeuristic(nextState) + head.cost + action.cost
            newNode = Node(nextState, cost, newPath)

            # Only apply action if we haven't visited the state before
            vis = False
            for i in range(len(visited)):
                if visited[i].state == nextState:
                    vis = True
                    if cost < visited[i].cost:
                        visited[i] = newNode
                        frontier.append(newNode)
            if vis == False:
                visited.append(newNode)
                frontier.append(newNode)

        # Sort frontier based on costs
        frontier = sorted(frontier, key=lambda x:x.cost)
        

## Backwards Search Algorithm

Below is the implementation of a Recursive Backwards Search algorithm which, in this case, does not base its decision in the cost of the actions but it evaluates posibilities based on the order they were defined.
<p>It takes as parameters:</p>
<list>
    <li>state: Starting point of the algorithm, 
    <li>goal: Ending point of the algorithm 
(In backwards, the initial state is the goal state of the Planning and the Goal would be the initial state), 
    <li>actions:list of all possible actions
    <li>depth: should be initialized with 0 and prevents infinite loops (max_depth inside the method can be changed if the PDDL is expected to have more steps than that), 
    <li>depth: should be initialized with 0 and prevents infinite loops (max_depth inside the method can be changed if the PDDL is <li>fluents: list of all fluents,
    <li>last_action: which is the last action performed (to initialize the call, we need a fake action A0 with no preconditions and no effects to keep the recursiveness.)
</list>

In [12]:
def backwards(state, goal, actions, last_action, depth, fluents):
    '''
    state: Starting point of the algorithm, 
    goal: Ending point of the algorithm 
    (In backwards, the initial state is the goal state of the Planning and the Goal would be the initial state), 
    actions:list of all possible actions
    fluents: list of all fluents
    depth: should be initialized with 0 and prevents infinite loops (max_depth inside the method can be changed if the PDDL is expected to have more steps than that), 
    last_action: which is the last action performed (to initialize the call, we need a fake action A0 with no preconditions and no effects to keep the recursiveness.)
    '''
    max_depth = 15 #to avoid infinite loop
    if state != goal:
        axns = removableActions(state,actions,fluents)
        if depth > max_depth: #avoiding infinite loops
            return -1,[] 
        for a in axns:
            sc = undoAction(state,a,fluents)
            r,path = backwards(sc.copy(),goal,actions,a,depth+1,fluents)
            if r != -1:
                try:
                    path.append(r)
                except:
                    print("")
                return last_action, path
        return -1,[] #dead en
    else:
        path = []
        return last_action,path

### Example

Below is an example of the A* algorithm on our PDDL (See presentation/report)

In [13]:
demoGoal = {'atDestination':1,'haveBags':1}
demoStart = {"atHome":1} 

demoA1 = Action(p={"atHome":1},e={"haveBags":1},c=1,n="Pack")
demoA2 = Action(p={"haveBags":1, "carFueled":0},e={"carFueled":1},c=2,n="FuelCar")
demoA3 = Action(p={"haveBags":1,"carFueled":1,"atDestination":0},e={"carFueled":0,'atDestination':1,"atHome":0},c=5,n="Drive")
demoA4 = Action(p={"haveBags":1, "atHome":1},e={"atTerminal":1,"atHome":0},c=2,n="Go To Terminal")
demoA5 = Action(p={"atTerminal":1,"haveBags":1,"boarded":0},e={"boarded":1},c=1,n="Board")
demoA6 = Action(p={"atTerminal":1, "boarded":0, "checkedIn":0},e={"checkedIn":1},c=1,n="Check In")
demoA7 = Action(p={"checkedIn":0,"boarded":1, "atDestination":0},e={"atDestination":1, "atTerminal":0, "boarded":0},c=3,n="Take Train")
demoA8 = Action(p={"checkedIn":1,"boarded":1, "atDestination":0},e={"atDestination":1,"atTerminal":0,"boarded":0, "checkedIn":0},c=1,n="Take Plane")

demoActions=[demoA1,demoA2,demoA3,demoA4,demoA5,demoA6,demoA7,demoA8]

solution = aStar(demoStart, demoGoal, demoActions)
print(f'Solution: {[a.name for a in solution]}')

A* found a path with cost: 16
Solution: ['Pack', 'Go To Terminal', 'Board', 'Take Train']


This is the same scenario but applying <b>Backwards Search</b> which is not looking for the optimal nor cheapest way. Just a possible way.

In [17]:
fluents = getFluentsFromActions(demoActions) #we need all fluents
a0 = Action(p={},e={},c=0,n="init") #we need a dummy first "last_action"
_,solution = backwards(demoGoal,demoStart,demoActions, a0,0,fluents) #with depth=0 for initial depth
print(f'Solution: {[a.name for a in solution]}')

Solution: ['Pack', 'FuelCar', 'Drive']


**To check that the heuristics work as intended, we add two cases here from the demo. First, we add a cost of 2 to the action "take train". This should change the solution to one of the other paths that reach the goal. It could be either one since both of them have a cost of 17**

In [13]:
demoA7 = Action(p={"checkedIn":0,"boarded":1, "atDestination":0},e={"atDestination":1, "atTerminal":0, "boarded":0},c=5,n="Take Train")

demoActions=[demoA1,demoA2,demoA3,demoA4,demoA5,demoA6,demoA7,demoA8]

solution = aStar(demoStart, demoGoal, demoActions)
print(f'Solution: {[a.name for a in solution]}')

A* found a path with cost: 17
Solution: ['Pack', 'FuelCar', 'Drive']


**Next, we add a cost of 1 to the action drive. This should change the solution to the third goal path.**

In [14]:
demoA3 = Action(p={"haveBags":1,"carFueled":1,"atDestination":0},e={"carFueled":0,'atDestination':1,"atHome":0},c=6,n="Drive")

demoActions=[demoA1,demoA2,demoA3,demoA4,demoA5,demoA6,demoA7,demoA8]

solution = aStar(demoStart, demoGoal, demoActions)
print(f'Solution: {[a.name for a in solution]}')

A* found a path with cost: 17
Solution: ['Pack', 'Go To Terminal', 'Check In', 'Board', 'Take Plane']


## Automated tests
Below are automated tests for the functions and algorithms implemented.

In [61]:
def testStatesMatch():
    
    # Test null state
    s1 = {}
    s2 = {}
    assert(statesMatch(s1,s2))
    
    # Test equal state
    s1 = {'person':1,'hungry':0}
    s2 = {'person':1,'hungry':0}
    assert(statesMatch(s1,s2))
    
    # Test pass by omission
    s1 = {'person':1}
    s2 = {'person':1, 'hungry':0}
    assert(statesMatch(s1,s2))
    
    # Test non-equal fluents state
    s1 = {'person':1,'hungry':0}
    s2 = {'person':0,'hungry':1}
    assert(statesMatch(s1,s2) == False)
    
    # Test different states
    s1 = {'person':1}
    s2 = {'hungry':1}
    assert(statesMatch(s1,s2) == False)
    
    # Test different states
    s1 = {'person':1}
    s2 = {'hungry':0}
    assert(statesMatch(s1,s2) == False)
    
    # Test different states
    s1 = {'person':1}
    s2 = {'person':1, 'hungry':1}
    assert(statesMatch(s1,s2) == False)
    
    print(f'All test cases for function "statesMatch" passed')

    
def testGetPositiveFluents():
    
    s = {'person':0,'hungry':0}
    assert(getPositiveFluents(s) == {})
    
    s = {'person':1,'hungry':0}
    assert(getPositiveFluents(s) == {'person':1})
    
    s = {'person':1, 'hungry':1, 'angry': 1}
    assert(getPositiveFluents(s) == {'person':1, 'hungry':1, 'angry': 1})
    
    s = {'person':1}
    assert(getPositiveFluents(s) == {'person':1})
    
    print(f'All test cases for function "getPositiveFluents" passed')
    
    
def testApplicableActions(actions):
    
    # Test cases from powerpoint examples
    start = {'person':1}
    applActions = applicableActions(start, actions)
    assert(all(a in [a4,a5] for a in applActions))
    assert(any(a in [a1,a2,a3] for a in applActions) == False)
    
    s1 = {'person': 1, 'hasMix': 1}
    applActions = applicableActions(s1, actions)
    assert(all(a in [a3,a4,a5] for a in applActions))
    assert(any(a in [a1,a2] for a in applActions) == False)
    
    s2 = {'person': 1, 'hungry': 1}
    applActions = applicableActions(s2, actions)
    assert(all(a in [a5] for a in applActions))
    assert(any(a in [a1,a2,a3,a4] for a in applActions) == False)
    
    s3 = {'person': 1, 'hasCake': 1}
    applActions = applicableActions(s3, actions)
    assert(all(a in [a1,a4,a5] for a in applActions))
    assert(any(a in [a2,a3] for a in applActions) == False)
    
    s4 = {'person': 1, 'hungry': 1, 'hasMix': 1}
    applActions = applicableActions(s4, actions)
    assert(all(a in [a3,a5] for a in applActions))
    assert(any(a in [a1,a2,a4] for a in applActions) == False)
    
    print(f'All test cases for function "applicableActions" passed')

    
def testApplyAction():
    
    # Test added fluents
    s = {'a':1}
    a = Action(p={'a':1},e={'b':1},c=0,n="")
    assert(applyAction(s, a) == {'a':1, 'b':1})
    a = Action(p={'a':1},e={'b':1, 'c':1},c=0,n="")
    assert(applyAction(s, a) == {'a':1, 'b':1, 'c':1})
    
    # Test omission
    s = {'a':1}
    a = Action(p={'a':1, 'c':0},e={'b':1, 'c':1},c=0,n="")
    assert(applyAction(s, a) == {'a':1, 'b':1, 'c':1})
    
    # Test negative (applyAction should throw exception)
    try:
        s = {'a':1, 'c':1}
        a = Action(p={'a':1, 'c':0},e={'b':1, 'c':1},c=0,n="")
        raise Exception(f'Test negative failed')
    except:
        pass
    
    print(f'All test cases for function "applyAction" passed')


# Tests valid solutions to the cake-PDDL (See slide 63 - PDDL lecture)
def testPDDLsolutions():
    a1 = Action(p={"person":1, "hungry":0, "hasCake":1},e={"hasCake":0, "person":0},c=1,n="EatCake B")
    a2 = Action(p={"person":1, "hungry":1, "hasCake":1},e={"hasCake":0, "eatenCake":1,"hungry":0},c=1,n="EatCake A")
    a3 = Action(p={"person":1,"hasMix":1},e={"hasCake":1,"hasMix":0},c=1,n="MakeCake")
    a4 = Action(p={"person":1,"hungry":0},e={"hungry":1},c=1,n="Wait")
    a5 = Action(p={"person":1},e={"hasMix":1},c=1,n="Go Shopping")

    actions = [a1,a2,a3,a4,a5]

    goal = {'person':1,'hungry':0, "hasCake":1, "eatenCake":1}
    start = {'person':1}

    # Test valid path from start to goal
    s1 = applyAction(start, a5)
    s3 = applyAction(s1, a3)
    s5 = applyAction(s3, a5)
    s9 = applyAction(s5, a4)
    s11 = applyAction(s9, a2)
    g = applyAction(s11, a3)
    assert(atGoal(g, goal))

    # Test another valid path from start to goal
    s2 = applyAction(start, a4)
    s4 = applyAction(s2, a5)
    s7 = applyAction(s4, a3)
    s10 = applyAction(s7, a2)
    s11_v2 = applyAction(s10, a5)

    # sanity check - check that s11 from both versions are identical
    assert(statesMatch(s11, s11_v2))

    g2 = applyAction(s11_v2, a3)
    assert(atGoal(g2, goal))

    print(f'All test cases for function "testPDDLsolutions" passed')

    
# Validate a PDDL solution
def validatePDDL(start, goal, solution):
    s = start
    for a in solution:
        s = applyAction(s, a)
    assert(atGoal(s, goal))
    print(f'The given PDDL solution is valid')


def testAtGoal():
    s = {'atGoal':1}
    goal = {'atGoal': 1}
    assert(atGoal(s,goal))
    
    s = {'atGoal':1, 'happy':0}
    goal = {'atGoal': 1}
    assert(atGoal(s,goal))
    
    s = {'atGoal':0}
    goal = {'atGoal': 1}
    assert(atGoal(s,goal) == False)
    
    s = {'notAtGoal':1}
    goal = {'atGoal': 1}
    assert(atGoal(s,goal) == False)
    
    print(f'All test cases for function "atGoal" passed')
    
def getTestActions():
    a1 = Action(p={"person":1, "hungry":0, "hasCake":1},e={"hasCake":0, "person":0},c=1,n="EatCake B")
    a2 = Action(p={"person":1, "hungry":1, "hasCake":1},e={"hasCake":0, "eatenCake":1,"hungry":0},c=1,n="EatCake A")
    a3 = Action(p={"person":1,"hasMix":1},e={"hasCake":1,"hasMix":0},c=1,n="MakeCake")
    a4 = Action(p={"person":1,"hungry":0},e={"hungry":1},c=1,n="Wait")
    a5 = Action(p={"person":1},e={"hasMix":1},c=1,n="Go Shopping")
    actions = [a1,a2,a3,a4,a5]
    
    return actions,getFluentsFromActions(actions)
                    
def testRemovableActions():
    s = {'person':1, "hasCake":1, "eatenCake":1}
    actions, fluents = getTestActions()
    
    unaxns = removableActions(s,actions,fluents)
    names = [a.name for a in unaxns]
    assert('MakeCake' in names and len(unaxns)==1 )
    
    s={'person':1, "hasMix":1, "hungry":1}
    unaxns = removableActions(s,actions,fluents)
    names = [a.name for a in unaxns]
    assert('Wait' in names and 'Go Shopping' in names and len(unaxns)==2 )
    
    print(f'All test cases for function "removableActions" passed')

def testUndoAction():
    
    actions,fluents = getTestActions()
    
    s = {'person':1, "hasCake":1, "eatenCake":1}
    a = Action(p={"person":1,"hasMix":1},e={"hasCake":1,"hasMix":0},c=1,n="MakeCake")
    u = undoAction(s,a,fluents) 
    assert(u['person']==1 and u['eatenCake']==1 and u['hasMix']==1 )
    
    s = {'person':1,'eatenCake':1,'hungry':1}
    a = Action(p={"person":1,"hungry":0},e={"hungry":1},c=1,n="Wait")
    u = undoAction(s,a,fluents)
    flag = 0
    try:
        if u['hungry']==1 : #state hungry should not exist since it is 0, then i should not be able to access it -> Exception
            flag =1
    except:
        flag = 0
    assert(u['person']==1 and u['eatenCake']==1 and flag==0)
    
    
    print(f'All test cases for function "UndoAction" passed')


### Define the PDDL from the PDDL lecture (Mike has cake example). We use this to run some of the tests.

In [64]:
goal = {'person':1,'hungry':0, "hasCake":1, "eatenCake":1}
start = {'person':1}

a1 = Action(p={"person":1, "hungry":0, "hasCake":1},e={"hasCake":0, "person":0},c=1,n="EatCake B")
a2 = Action(p={"person":1, "hungry":1, "hasCake":1},e={"hasCake":0, "eatenCake":1,"hungry":0},c=1,n="EatCake A")
a3 = Action(p={"person":1,"hasMix":1},e={"hasCake":1,"hasMix":0},c=1,n="MakeCake")
a4 = Action(p={"person":1,"hungry":0},e={"hungry":1},c=1,n="Wait")
a5 = Action(p={"person":1},e={"hasMix":1},c=1,n="Go Shopping")

actions = [a1,a2,a3,a4,a5]

In [66]:
testStatesMatch()
testGetPositiveFluents()
testApplicableActions(actions)
testApplyAction()
testPDDLsolutions()
testAtGoal()
testUndoAction()
testRemovableActions()

All test cases for function "statesMatch" passed
All test cases for function "getPositiveFluents" passed
All test cases for function "applicableActions" passed
All test cases for function "applyAction" passed
All test cases for function "testPDDLsolutions" passed
All test cases for function "atGoal" passed
All test cases for function "UndoAction" passed
All test cases for function "removableActions" passed
