In [1]:
# Mypy; for the `|` operator purpose
# Remove this __future__ import once the oldest supported Python is 3.10
from __future__ import annotations

In [2]:
from typing import List, Callable, Tuple
from enum import Enum
from copy import copy
from copy import deepcopy

In [3]:
def fuzzy_not(o):
    if o is None:
        return o
    return not o
    
def fuzzy_or(o1, o2):
    if o1 is None or o2 is None:
        return None
    return o1 or o2
    
def fuzzy_and(o1, o2):
    if o1 is False or o2 is False:
        return False
    if o1 is None or o2 is None:
        return None
    return True
    
def fuzzy_eq(o1, o2):
    if o1 is None or o2 is None:
        return None
    return o1 == o2
    

In [4]:
class Fluent:
    def __init__(self, **fluents):
        for name, value in fluents.items():
            assert (isinstance(name, str))
            self.name = name
            self.value = value
            break  # only the first value is processed

    # for if statements
    def __bool__(self):
        return self.value

    # for ==, != comparisons
    def __eq__(self, other):
        if isinstance(other, bool) or other is None:
            return fuzzy_eq(self.value, other)
        elif isinstance(other, Fluent):
            return fuzzy_and(fuzzy_eq(self.value, other.value), self.name == other.name)
        else:
            return False

    def __repr__(self):
        return f"{self.name}={self.value}"


class DomainDescription:

    def __init__(self):

        self.fluents: Dict[str, Fluent] = dict()
        self._causes: Dict[str, List[Tuple[List[Fluent], List[Fluent]]]] = dict()
        self.actions: Dict[str, Action] = dict()
        self.impossibles: Dict[str, List[List[Fluent]]] = dict()
    
    def state(self):
        return [(f.name, f.value) for f in self.fluents.values()]
        
    def __repr__(self):
        fluents = "\nFLUENTS\n" + "\n".join(map(str, self.fluents.values()))
        #         causes =  "\n\nCAUSES\n"+"\n".join(map(str, self._causes.keys()))
        return fluents

    def _check_if_known(self, fluents: List[Fluent], default_value = None):
        if isinstance(fluents, Fluent):
            fluents = [fluents]
        for fluent in fluents:
            if fluent.name not in self.fluents:
                assumed_fluent = copy(fluent)
                assumed_fluent.value = default_value
                self.fluents[fluent.name] = assumed_fluent
                
    def _set(self, fluents: Fluent):
        if isinstance(fluents, Fluent):
            fluents = [fluents]
        for fluent in fluents:
            self.fluents[fluent.name] = fluent

    def _check(self, conditions: List[Fluent]) -> bool | None:
        """Checks a single list of fluent requirements"""
        if conditions is None:
            return True
        
        conditions_met = []
        for f in conditions:
            if f.name not in self.fluents:
                return False # maybe should be None
            
            conditions_met.append(f == self.fluents[f.name])
            
            
        if False in conditions_met:
            return False
        if None in conditions_met:
            return None
        return True
    
    def _possible(self, action: str) -> bool | None:
        """Checks a list of lists of fluent requirements"""
        if action not in self.impossibles:
            return True

        conditions_met = [self._check(conditions) for conditions in self.impossibles[action]]
            
        if True in conditions_met:
            return False
        if None in conditions_met:
            return None
        return True
        
    def _do(self, fluents: List[Fluent], conditions_met, possible):
        """
        Sets fluents to values specified in fluents variable if action possible and conditions_met
        if condi
        """
        # at this stage possible is either None or True
        
        if conditions_met is False:
            return
        
        diff: List[Fluents] = []
        for fluent in fluents:
            # check the possible changes in fluents
            if (self.fluents[fluent.name] == fluent) is not True: # so if any change happens
                diff.append(fluent)
                
        if conditions_met is None or possible is None:
            diff = [copy(f) for f in diff]
            for fluent in diff:
                fluent.value = None
        self._set(diff)
            
    def do_action(self, action_name):
        possible = self._possible(action_name)
        # print(f"{action_name=} {possible=}")
        if possible is False:
            return
        
        for to_set, conditions in self._causes[action_name]:
            self._do(to_set, self._check(conditions), possible)
            
    def _add_action(self, action_name: str, fluents: List[Fluent], conditions: List[Fluent]):
        if action_name not in self._causes:
            self._causes[action_name] = []
            setattr(self.__class__, action_name, lambda x: x.do_action(action_name))

        self._causes[action_name].append((fluents, conditions))
        

    def initially(self, **kwargs):
        for key, value in kwargs.items():
            self.fluents[key] = Fluent(**{key: value})

    def impossible(self, action: str, conditions: List[Fluent]):
        if isinstance(conditions, Fluent):
            conditions = [conditions]
        if action not in self.impossibles:
            self.impossibles[action] = []

        self.impossibles[action].append(conditions)

    def causes(self, action: str, fluents: List[Fluent], conditions: List[Fluent] = None):
        if isinstance(fluents, Fluent):
            fluents = [fluents]
        if isinstance(conditions, Fluent):
            conditions = [conditions]

        self._check_if_known(fluents)
        self._add_action(action, fluents, conditions)

    def releases(self, action: str,  fluents: List[Fluent]):
        if isinstance(fluents, Fluent):
            fluents = [fluents]

        for fluent in fluents:
            fluent.value = None

        self._check_if_known(fluents)
        self._add_action(action, fluents, None)

class TimeDomainDescription(DomainDescription):
    
    def __init__(self):
        super().__init__()
        self.durations: Dict[str, int] = dict()
        self.time = 1
        self.termination_time = float('inf')
        
    def duration(self, action, time):
        self.durations[action] = time
        
    def terminate_time(self, time):
        # TODO make this useful
        self.termination_time = time
        
    def make_time_step(self, action):
        if action not in self.durations:
            # maybe shoudl raise exception
            self.time+=1
            return
        
        self.time+=self.durations[action]        
    
    def _add_action(self, action_name: str, fluents: List[Fluent], conditions: List[Fluent]):
        if action_name not in self._causes:
            self._causes[action_name] = []
            setattr(self.__class__, action_name, lambda x, time: x.do_action(action_name, time)) # added time to action execution

        self._causes[action_name].append((fluents, conditions))
        
    def do_action(self, action_name, time_start) -> bool:
        if time_start<self.time:
            # maybe an exception?
            return False
        
        possible = self._possible(action_name)
        # print(f"{action_name=} {possible=}")
        if possible is False:
            return False
        
        self.make_time_step(action_name)
        for to_set, conditions in self._causes[action_name]:
            self._do(to_set, self._check(conditions), possible)
        return True
            

In [5]:
class Scenario:
    
    def __init__(self, domain: TimeDomainDescription, observations: List[Tuple[Fluent, int]], action_occurances: List[Tuple[str, int]]):
        self.observations = observations
        self.action_occurances = action_occurances
        self.domain = deepcopy(domain)
        
        
    def set_observations_as_true(self):
        pass
    
    def is_consistent(self, verbose = False):
        domain = deepcopy(self.domain)
        for action, time in self.action_occurances:
            if domain.do_action(action, time) is False:
                if verbose:
                    print(f"Action {action} at time {time} breaks consistency")
                return False
        return True

In [6]:
f1 = Fluent(key=False)
f2 = Fluent(loaded = False)
f3 = Fluent(key=True)
f4 = Fluent(key=False)

In [7]:
m = DomainDescription()
m.initially(alive=True, loaded=False)
m.causes("load", [Fluent(loaded= True), Fluent(jammed=False)])
m.releases("load", Fluent(hidden=False))
m.causes("jam", Fluent(jammed=True), conditions=Fluent(loaded=True))
m.causes("shoot", Fluent(alive=False), conditions=[Fluent(loaded=True), Fluent(hidden=False), Fluent(jammed=False)])
m.causes("shoot", [Fluent(loaded=False), Fluent(jammed=False)])

In [8]:
m.state()

[('alive', True), ('loaded', False), ('jammed', None), ('hidden', None)]

In [9]:
m.shoot()
m.state()

[('alive', True), ('loaded', False), ('jammed', False), ('hidden', None)]

In [10]:
m.load()
m.state()

[('alive', True), ('loaded', True), ('jammed', False), ('hidden', None)]

In [11]:
m.shoot()
m.state()

[('alive', None), ('loaded', False), ('jammed', False), ('hidden', None)]

# Examples

## Example 1

### Domain description

In [12]:
m = TimeDomainDescription()
m.initially(alive=True)
m.causes("load", [Fluent(loaded= True), Fluent(jammed=False)])
m.releases("load", Fluent(hidden=False))
m.causes("jam", Fluent(jammed=True), conditions=Fluent(loaded=True))
m.causes("shoot", Fluent(alive=False), conditions=[Fluent(loaded=True), Fluent(hidden=False), Fluent(jammed=False)])
m.causes("shoot", [Fluent(loaded=False), Fluent(jammed=False)])
m.duration("load", 2)
m.duration("jam", 1)
m.duration("shoot", 1)

In [13]:
m.state()

[('alive', True), ('loaded', None), ('jammed', None), ('hidden', None)]

In [14]:
m.do_action("load", 1)
m.state()

[('alive', True), ('loaded', True), ('jammed', False), ('hidden', None)]

In [15]:
m.do_action("load", 2)
m.state()

[('alive', True), ('loaded', True), ('jammed', False), ('hidden', None)]

In [16]:
m.do_action("jam", 3)
m.state()

[('alive', True), ('loaded', True), ('jammed', True), ('hidden', None)]

In [17]:
m.do_action("shoot", 4)
m.state()

[('alive', True), ('loaded', False), ('jammed', False), ('hidden', None)]

### Scenario

In [18]:
m = TimeDomainDescription()
m.initially(alive=True)
m.causes("load", [Fluent(loaded= True), Fluent(jammed=False)])
m.releases("load", Fluent(hidden=False))
m.causes("jam", Fluent(jammed=True), conditions=Fluent(loaded=True))
m.causes("shoot", Fluent(alive=False), conditions=[Fluent(loaded=True), Fluent(hidden=False), Fluent(jammed=False)])
m.causes("shoot", [Fluent(loaded=False), Fluent(jammed=False)])
m.duration("load", 2)
m.duration("jam", 1)
m.duration("shoot", 1)

In [19]:
OBS = ([Fluent(alive=True), Fluent(loaded=False), Fluent(jammed=True), Fluent(hidden=False)], 1)
ACS = (('load', 1), ("jam", 3), ("shoot", 4))
s = Scenario(domain=m, observations=OBS, action_occurances=ACS)

In [20]:
s.is_consistent()

True

## Example 2

In [21]:
m = TimeDomainDescription()
m.initially(alive=True)
m.causes("load", [Fluent(loaded= True), Fluent(jammed=False)])
m.releases("load", Fluent(hidden=False))
m.causes("jam", Fluent(jammed=True), conditions=Fluent(loaded=True))
m.causes("shoot", Fluent(alive=False), conditions=[Fluent(loaded=True), Fluent(hidden=False), Fluent(jammed=False)])
m.causes("shoot", [Fluent(loaded=False), Fluent(jammed=False)])
m.duration("load", 2)
m.duration("jam", 1)
m.duration("shoot", 1)

### Scenario

In [22]:
OBS = ([Fluent(alive=True), Fluent(loaded=False), Fluent(jammed=True), Fluent(hidden=False)], 1)
ACS = (('load', 1), ("jam", 2), ("shoot", 4))
s = Scenario(domain=m, observations=OBS, action_occurances=ACS)

In [23]:
s.is_consistent(verbose=True)

Action jam at time 2 breaks consistency


False

## Example 3

In [24]:
d = TimeDomainDescription()
d.initially(inspired=True, painted=False)
d.causes("paint", [Fluent(painted=True), Fluent(inspired=False)])
d.impossible("paint", conditions=Fluent(inspired=False))
d.causes("pay", Fluent(painted=False))
d.impossible("pay", conditions=Fluent(painted=False))
d.releases("pay", Fluent(inspired=False))
d.duration("paint", 2)
d.duration("pay", 1)

### Scenario

In [25]:
OBS = ([Fluent(inspired=True), Fluent(painted=False)], 1)
ACS = (('paint', 1), ("pay", 3), ("pay", 4), ('paint', 5))
s = Scenario(domain=d, observations=OBS, action_occurances=ACS)

In [26]:
s.is_consistent(verbose=True)

Action pay at time 4 breaks consistency


False

In [27]:
d.paint(2)
d.state()

[('inspired', False), ('painted', True)]

In [28]:

class Statement:
    def __init__(self, fluents: List[Fluent], actions: List[str]):
        self.fluents = fluents
        self.actions = actions
        
def call_method(o, name):
    getattr(o, name)()

class Structure:
    
    def __init__(self, model: DomainDescription):
        self.model = deepcopy(model)

    def is_statement_true(self, statement: Statement):
        m = deepcopy(self.model)
        
        for action in statement.actions:
            m.do_action(action)
            
        return m._check(statement.fluents)

In [29]:
d = DomainDescription()
d.initially(alive=True, loaded=False, hidden=False)
d.causes("load", [Fluent(loaded= True), Fluent(jammed=False)])
d.releases("load", Fluent(hidden=False))
d.causes("jam", Fluent(jammed=True), conditions=Fluent(loaded=True))
d.causes("shoot", Fluent(alive=False), conditions=[Fluent(loaded=True), Fluent(hidden=False), Fluent(jammed=False)])
d.causes("shoot", [Fluent(loaded=False), Fluent(jammed=False)])
d.causes("look", Fluent(hidden=False), conditions=Fluent(alive=True))

In [30]:
statement_1 = Statement([Fluent(alive=False)], actions=["shoot"])
statement_2 = Statement([Fluent(alive=False)], actions=["load", "look", "shoot"])

stru = Structure(model=d)

In [31]:
stru.is_statement_true(statement=statement_1)

False

In [32]:
stru.is_statement_true(statement=statement_2)

True