In [1]:
from nltk import PCFG, ProbabilisticProduction as pp, nonterminals, Nonterminal, Production, ProbabilisticMixIn
import sys
import random
import copy
import itertools

In [2]:
class PSDG(PCFG):
    def __init__(self, start, productions, calculate_leftcorners=True):
        PCFG.__init__(self, start, productions, calculate_leftcorners)
    
    def update_prods(self,state):
        for i in self._productions:
            i.update_prob(state)

In [3]:
class PSDProduction(Production,ProbabilisticMixIn):
    """
    Edited version of nltk's ProbablisticProduction class
    See http://www.nltk.org/_modules/nltk/grammar.html#ProbabilisticProduction
    
    This was edited to contain a precondition function and to be able to update
    the probability of production given a new state. The probability passed into
    the constructor represents the probability of the production if the passed
    state satisfies the preconditions.
    

    :see also: ``Production``
    """

    def __init__(self, lhs, rhs,current_s,precond, t_prob = 1.0):
        self.precond = precond
        self.t_prob = t_prob
        if self.precond(current_s):
            p = t_prob
        else:
            p = 1 - t_prob
            
        ProbabilisticMixIn.__init__(self, prob = p)
        Production.__init__(self, lhs, rhs)

    def __str__(self):
        return super().__str__() + (
            " [1.0]" if (self.prob() == 1.0) else " [%g]" % self.prob()
        )

    def __eq__(self, other):
        return (
            type(self) == type(other)
            and self._lhs == other._lhs
            and self._rhs == other._rhs
            and self.prob() == other.prob()
        )

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash((self._lhs, self._rhs, self.prob()))
    
    def update_prob(self,state):
        if self.precond(state):
            p = self.t_prob
        else:
            p = 1 - self.t_prob
        self.set_prob(p)

In [4]:
class PSDG_Domain():
    def __init__(self,methods,actions):
        self.methods = []
        for i in methods:
            if i["task"]:
                if i["task"][0] == '!':
                    raise ValueError("Compound Task name should not start with !")
            else:
                raise ValueError("Compound Task name should have atleast one character")
            if not callable(i["preconditions"]):
                raise ValueError("Preconditions must be callable object")
            if (not isinstance(i["subtasks"], list)) or (not all(isinstance(e,str) for e in i["subtasks"])):
                raise ValueError("Subtasks must be a list of strings")
            if not 't_prob' in i:
                i['t_prob'] = 1
            self.methods.append(i)
        self.actions = []
        for i in actions:
            if not callable(i):
                raise ValueError("Action must be callable object")
            self.actions.append(i)
        
    def initialize_planning(self,i_state):
        self.i_state = i_state
        self.c_state = copy.deepcopy(i_state)
        prods = []
        for i in self.methods:
            subtasks = []
            for j in i['subtasks']:
                if j[0] == '!':
                    subtasks.append(j)
                else:
                    subtasks.append(Nonterminal(j))
            prods.append(PSDProduction(Nonterminal(i['task']),subtasks,i_state,i['preconditions'],i['t_prob']))
        self.psdg = PSDG(Nonterminal('P'),prods)
            
    def _sample_from_grammar(self,grammar, actions, start=None, depth=None):
        """
        This is based off of the generate function in nltk for CFG. This uses ancestral
        sampling to generate one possible plan/sentence rather than all possible 
        plans/sentences. 
        see http://www.nltk.org/_modules/nltk/parse/generate.html#generate

        :param grammar: The Grammar used to generate sentences.
        :param start: The Nonterminal from which to start generate sentences.
        :param depth: The maximal depth of the generated tree.
        :param n: The maximum number of sentences to return.
        :return: An iterator of lists of terminal tokens.
        """
        if not start:
            start = grammar.start()
        if depth is None:
            depth = sys.maxsize
        
        self.states = [copy.deepcopy(self.c_state)]
        
        iter = self._sample_all(grammar, actions,[start], depth)

        return iter

    def _sample_all(self,grammar, actions, items, depth):
        if items:
            try:
                for frag1 in self._sample_one(grammar, actions, items[0], depth):
                    for frag2 in self._sample_all(grammar, actions,items[1:], depth):
                        yield frag1 + frag2
            except RuntimeError as _error:
                if _error.message == "maximum recursion depth exceeded":
                    # Helpful error message while still showing the recursion stack.
                    raise RuntimeError(
                        "The grammar has rule(s) that yield infinite recursion!!"
                    )
                else:
                    raise
        else:
            yield []


    def _sample_one(self,grammar, actions, item, depth):
        if depth > 0:
            if isinstance(item, Nonterminal):
                prod = random.choices([x for x in grammar.productions(lhs=item)],[x.prob() for x in grammar.productions(lhs=item)])[0]
                for frag in self._sample_all(grammar, actions, prod.rhs(), depth - 1):
                    yield frag
            else:
                for i in actions:
                    if i.__name__ == item[1:]:
                        self.c_state = i(self.c_state)
                grammar.update_prods(self.c_state)
                self.states.append(copy.deepcopy(self.c_state))
                yield [item]
                
    def sample_plans(self,n_samples = 1, output_options = None, start = None, depth = None):
        state_seqs = []
        plans = []
        if start is not None:
            start = Nonterminal(start)
        for i in range(n_samples):
            plans.append(list(self._sample_from_grammar(self.psdg, self.actions, start, depth))[0])
            state_seqs.append(self.states)
            self.c_state = copy.deepcopy(self.i_state)
        
        if output_options == 'a':
            return plans
        if output_options == 's':
            return state_seqs
        if output_options == 'as':
            return tuple(zip(plans, state_seqs))
        
        plan_traces = []
        for i,j in enumerate(state_seqs):
            trace = [None]*(len(j)+len(plans[i]))
            trace[::2] = j
            trace[1::2] = plans[i]
            plan_traces.append(trace)
        return plan_traces
        
        
        
    

In [5]:
#actions
def go_to_school(state):
    state['went-to-school'] = 1
    return state

def go_to_work(state):
    state['went-to-work'] = 1
    return state

def do_chores(state):
    state['did-chores'] = 1
    return state

def do_homework(state):
    state['did-homework'] = 1
    state['have-homework'] = 0
    return state

def stay_for_tutoring(state):
    state['stayed-for-tutoring'] = 1
    return state

def go_running(state):
    state['ran'] = 1
    return state

def play_videogames(state):
    state['played-videogames'] = 1
    return state

def go_to_store(state):
    state['went-to-store'] = 1
    state['need-groceries'] = 0
    return state

def watch_movie(state):
    state['watched-movie'] = 1
    state['found-movies'] = 0
    return state

actions = [go_to_school,go_to_work,do_chores,do_homework,stay_for_tutoring,go_running,play_videogames,
          go_to_store,watch_movie]

In [6]:
#preconditions

def default(state):
    return True

def work_raining_homework(state):
    if state['raining']:
        if state['have-homework']:
            if state['work-today']:
                return True
    return False

def raining_homework(state):
    if state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                return True
    return False

def work_homework(state):
    if not state['raining']:
        if state['have-homework']:
            if state['work-today']:
                return True
    return False

def homework(state):
    if not state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                return True
    return False

def work_raining(state):
    if state['raining']:
        if not state['have-homework']:
            if state['work-today']:
                return True
    return False

def raining(state):
    if state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                return True
    return False

def work(state):
    if not state['raining']:
        if not state['have-homework']:
            if state['work-today']:
                return True
    return False

def no_work(state):
    if not state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                return True
    return False

def work_raining_homework_store(state):
    if state['raining']:
        if state['have-homework']:
            if state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def work_raining_homework_no_store(state):
    if state['raining']:
        if state['have-homework']:
            if state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def raining_homework_store(state):
    if state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def raining_homework_no_store(state):
    if state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def work_homework_store(state):
    if not state['raining']:
        if state['have-homework']:
            if state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def work_homework_no_store(state):
    if not state['raining']:
        if state['have-homework']:
            if state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def homework_store(state):
    if not state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def homework_no_store(state):
    if not state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def work_raining_store(state):
    if state['raining']:
        if not state['have-homework']:
            if state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def work_raining_no_store(state):
    if state['raining']:
        if not state['have-homework']:
            if state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def raining_store(state):
    if state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def raining_no_store(state):
    if state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def work_store(state):
    if not state['raining']:
        if not state['have-homework']:
            if state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def work_no_store(state):
    if not state['raining']:
        if not state['have-homework']:
            if state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def no_work_store(state):
    if not state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if state['need-groceries']:
                    return True
    return False

def no_work_no_store(state):
    if not state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if not state['need-groceries']:
                    return True
    return False

def raining_homework_movie(state):
    if state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if state['found-movie']:
                    return True
    return False

def raining_homework_no_movie(state):
    if state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if not state['found-movie']:
                    return True
    return False

def homework_movie(state):
    if not state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if state['found-movie']:
                    return True
    return False

def homework_no_movie(state):
    if not state['raining']:
        if state['have-homework']:
            if not state['work-today']:
                if not state['found-movie']:
                    return True
    return False

def raining_movie(state):
    if state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if state['found-movie']:
                    return True
    return False

def raining_no_movie(state):
    if state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if not state['found-movie']:
                    return True
    return False

def no_work_movie(state):
    if not state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if state['found-movie']:
                    return True
    return False

def no_work_no_movie(state):
    if not state['raining']:
        if not state['have-homework']:
            if not state['work-today']:
                if not state['found-movie']:
                    return True
    return False

def work_raining_friday(state):
    if state['raining']:
        if state['work-today']:
            return True
    return False

def raining_friday(state):
    if state['raining']:
        if not state['work-today']:
            return True
    return False

def work_friday(state):
    if not state['raining']:
        if state['work-today']:
            return True
    return False

def no_work_friday(state):
    if not state['raining']:
        if not state['work-today']:
            return True
    return False

In [7]:
#methods
methods = [
    {
        'task': 'P',
        'preconditions': default,
        'subtasks': ['MONDAY'],
        't_prob': 0.2
    },
    {
        'task': 'P',
        'preconditions': default,
        'subtasks': ['TUESDAY'],
        't_prob': 0.2
    },
    {
        'task': 'P',
        'preconditions': default,
        'subtasks': ['WEDNESDAY'],
        't_prob': 0.2
    },
    {
        'task': 'P',
        'preconditions': default,
        'subtasks': ['THURSDAY'],
        't_prob': 0.2
    },
    {
        'task': 'P',
        'preconditions': default,
        'subtasks': ['FRIDAY'],
        't_prob': 0.2
    },
    {
        'task': 'MONDAY',
        'preconditions': work_raining_homework,
        'subtasks': ['!go_to_school','!go_to_work','!do_chores','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': raining_homework,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!do_chores','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': work_homework,
        'subtasks': ['!go_to_school','!go_to_work','!go_running','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': homework,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_running','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': work_raining,
        'subtasks': ['!go_to_school','!go_to_work','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': raining,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': work,
        'subtasks': ['!go_to_school','!go_to_work','!go_running','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'MONDAY',
        'preconditions': no_work,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_running','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_raining_homework_store,
        'subtasks': ['!go_to_work','!go_to_school','!go_to_store','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_raining_homework_no_store,
        'subtasks': ['!go_to_work','!go_to_school','!do_homework','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': raining_homework_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_to_store','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': raining_homework_no_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!do_homework','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_homework_store,
        'subtasks': ['!go_to_work','!go_to_school','!go_to_store','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_homework_no_store,
        'subtasks': ['!go_to_work','!go_to_school','!do_homework','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': homework_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_to_store','!do_homework'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': homework_no_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!do_homework','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_raining_store,
        'subtasks': ['!go_to_work','!go_to_school','!go_to_store','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_raining_no_store,
        'subtasks': ['!go_to_work','!go_to_school','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': raining_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_to_store','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': raining_no_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_store,
        'subtasks': ['!go_to_work','!go_to_school','!go_to_store','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': work_no_store,
        'subtasks': ['!go_to_work','!go_to_school','!go_running','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': no_work_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_to_store','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'TUESDAY',
        'preconditions': no_work_no_store,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_running','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': work_raining_homework,
        'subtasks': ['!go_to_school','!do_homework','!go_to_work','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': raining_homework,
        'subtasks': ['!go_to_school','!do_homework','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': work_homework,
        'subtasks': ['!go_to_school','!do_homework','!go_to_work','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': homework,
        'subtasks': ['!go_to_school','!do_homework','!go_running','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': work_raining,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_to_work','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': raining,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!play_videogames','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': work,
        'subtasks': ['!go_to_school','!go_to_work','!go_running','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'WEDNESDAY',
        'preconditions': no_work,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!go_running','!do_chores'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': work_raining_homework,
        'subtasks': ['!go_to_work','!go_to_school','!do_homework','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': raining_homework_movie,
        'subtasks': ['!go_to_school','!do_homework','!do_chores','!watch_movie'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': raining_homework_no_movie,
        'subtasks': ['!go_to_school','!do_homework','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': work_homework,
        'subtasks': ['!go_to_work','!go_to_school','!do_homework','!go_running'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': homework_movie,
        'subtasks': ['!go_to_school','!do_homework','!go_running','!watch_movie'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': homework_no_movie,
        'subtasks': ['!go_to_school','!do_homework','!go_running','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': work_raining,
        'subtasks': ['!go_to_work','!go_to_school','!stay_for_tutoring','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': raining_movie,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!play_videogames','!watch_movie'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': raining_no_movie,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': work,
        'subtasks': ['!go_to_work','!go_to_school','!go_running','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': no_work_movie,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!play_videogames','!watch_movie'],
        't_prob': 1
    },
    {
        'task': 'THURSDAY',
        'preconditions': no_work_no_movie,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'FRIDAY',
        'preconditions': work_raining_friday,
        'subtasks': ['!do_chores','!play_videogames','!go_to_work'],
        't_prob': 1
    },
    {
        'task': 'FRIDAY',
        'preconditions': raining_friday,
        'subtasks': ['!go_to_school','!stay_for_tutoring','!do_chores','!play_videogames'],
        't_prob': 1
    },
    {
        'task': 'FRIDAY',
        'preconditions': work_friday,
        'subtasks': ['!play_videogames','!go_running','!go_to_work'],
        't_prob': 1
    },
    {
        'task': 'FRIDAY',
        'preconditions': no_work_friday,
        'subtasks': ['!go_to_school','!go_running','!do_chores','!play_videogames'],
        't_prob': 1
    }
]

In [8]:
simple_schedule = PSDG_Domain(methods,actions)

In [9]:
current_state = {'raining': 1, 'work-today': 1, 'need-groceries': 0,
                'have-homework': 1, 'found-movie': 0, 'went-to-school': 0,
                'went-to-work': 0, 'did-chores': 0, 'did-homework': 0,
                'stayed-for-tutoring': 0, 'ran': 0, 'played-videogames': 0,
                'went-to-store': 0, 'watched-movie': 0}


In [10]:
simple_schedule.initialize_planning(current_state)

In [11]:
plans = simple_schedule.sample_plans(output_options = 'a',start = 'MONDAY')

In [12]:
print(plans)

[['!go_to_school', '!go_to_work', '!do_chores', '!do_homework']]
