In [1]:
# This class returns causal model objects.

class CausalModel:

    def __init__(self, name, magnitudes, derivatives, controller):
        self.name = name
        self.magnitudes = magnitudes
        self.derivatives = derivatives
        self.controller = controller
        self.influence = []
        self.proportionals = []
        self.constraints = []
        
    def __str__(self):
        return "===========\nName: %s\nMagnitudes: %s\nInfluences: %s\nProportionals: %s\nConstraints: %s\n===========\n" % (self.name, self.magnitudes, self.influence, self.proportionals, self.constraints)
        
    def getName(self):
        return self.name
    
    def getMagnitudes(self):
        return self.magnitudes
    
    def getDerivatives(self):
        return self.derivatives
    
    def getInfluences(self):
        return self.influence
    
    def getProportionals(self):
        return self.proportionals
    
    def getConstraints(self):
        return self.constraints
    
    def getController(self):
        return self.controller
    
    def propagateInfluence(self, magnitude):
        derivative = 0
        affected_q = None
        if len(self.influence) == 1:
            if magnitude != 0:
                affected_q = self.influence[0][0]
                derivative = magnitude * self.influence[0][1]
    
        return (affected_q, derivative)
    
    def propagateProportionals(self, derivative):
        new_derivative = 0
        affected_q = None
        if len(self.proportionals) == 1:
            affected_q = self.proportionals[0][0]
            new_derivative = derivative * self.proportionals[0][1]
        return (affected_q, new_derivative)
    
    def setInfluence(self, other, influence):
        if influence == 'I+':
            other.influence.append((self.name, 1))
        elif influence == 'I-':
            other.influence.append((self.name, -1))
        return
        
    def setProportional(self, other, propto):
        if propto == 'P+':
            self.proportionals.append((other.name, 1))
        elif propto == 'P-':
            self.proportionals.append((other.name, -1))
        return
    
    def setConstraint(self, other, c1, c2):
        self.constraints.append((other.name, [c1, c2]))
        other.constraints.append((self.name, [c2, c1]))
        return
    
def initialize_pset1():
    inflow = CausalModel("Inflow", [0,1],[-1,0,1], controller=True)
    volume = CausalModel("Sink", [0,1,2],[-1,0,1], controller=False)
    outflow = CausalModel("Outflow", [0,1,2],[-1,0,1], controller=False)

    volume.setInfluence(inflow, 'I+')
    volume.setInfluence(outflow, 'I-')
    outflow.setProportional(volume, 'P+')
    outflow.setConstraint(volume, 2, 2)
    outflow.setConstraint(volume, 0, 0)
    
    return inflow, volume, outflow


In [77]:
from copy import deepcopy, copy

# A class that creates states and find all new states based on a given state.
class State:
    
    # state is a dict of quantities, with given values.
    # quantities is an array of CausalModel objects that contains the 'rules' for finding new states.
    # Children will be an array of state objects, i.e. the states that were found.
    def __init__(self, quantity, state):
        self.uniqueID = 0
        self.state = state
        self.quantity = quantity
        self.children = []
        self.isInterval = True

    def __str__(self):
        return "Inf%s\nVol%s\nOut%s" % (self.state["Inflow"], self.state["Sink"],self.state["Outflow"])
    
    def getMagnitudes(self):
        return self.state["Inflow"][0], self.state["Sink"][0], self.state["Outflow"][0]
    
    def getDerivatives(self):
        return self.state["Inflow"][1], self.state["Sink"][1], self.state["Outflow"][1]
    
    def getChildren(self):
        children = []
        for child in range(len(self.children)):
            children.append(str(self.children[child]))
        return children    

    def findNewStates(self):
        new_state = deepcopy(self)
        new_states = []
        quantities = self.quantity
        state_magnitudes = self.getMagnitudes()
        state_derivatives = self.getDerivatives()
        
        sum_derivatives = 0
        sum_magnitudes = 0
        
        # Sum of all derivatives
        for i in state_derivatives:
            sum_derivatives += state_derivatives[i]
        
        # Sum of all magnitudes
        for i in state_magnitudes:
            sum_magnitudes += state_magnitudes[i]
        
        # We need to escape equilibria, if all derivatives are zero:
        if sum_derivatives == 0:
            # There is a unique situation where we need to start the process by opening the tap...
            if sum_magnitudes == 0:
                new_state.isInterval = False
                new_state.state["Inflow"][1] = 1
                self.children.append(new_state)
                new_states.append(new_state)
                print(new_state.state)
                return new_states
            # Without having second order derivatives, we already know what states to create
            # if we have a situation where the magnitudes are active but the derivatives aren't!
            else:
                new_state_pos = deepcopy(self)
                new_state_neg = deepcopy(self)
                new_state_pos.isInterval = False
                new_state_neg.isInterval = False
                new_state_pos.state["Inflow"][1] = 1
                new_state_neg.state["Inflow"][1] = -1
                self.children.append(new_state_pos)
                self.children.append(new_state_neg)
                new_states.append(new_state_pos)
                new_states.append(new_state_neg)
                print(new_state.state)
                return new_states
        
        """
        for s in range(len(new_states)):

            if state_derivatives[0] == state_magnitudes[0] and state_magnitudes[0] == 1:
                for i in range(len(state_derivatives)):
                    if state_derivatives[i] != 0:
                        new_magnitude = state_magnitudes[i] + state_derivatives[i]
                        if new_magnitude in quantities[i].magnitudes:
                            new_state.state[quantities[i].name][0] = new_magnitude

            if state_derivatives[0] != state_magnitudes[0]:
                new_state.state["Inflow"][0] = new_state.state["Inflow"][1]
                state_magnitudes = new_state.getMagnitudes()
                
                for q in range(len(quantities)):
                    s_influence = quantities[q].propagateInfluence(state_magnitudes[q])
                    if s_influence[0] != None:
                        print("influence haywire")
                        new_state.state[s_influence[0]][1] = s_influence[1]

                    if len(quantities[q].proportionals) > 0:
                        print("prop haywire")
                        proportional = quantities[q].proportionals
                        prop_derivative = new_state.state[proportional[0][0]][1]
                        s_proportionality = quantities[q].propagateProportionals(prop_derivative)
                        new_state.state[quantities[q].name][1] = s_proportionality[1]
        """
        
        if str(new_state) != str(self):
            self.children.append(new_state)
            new_states.append(new_state)
        
        return new_states
    
    # Main problem: Create a method that finds all possible states given a state.
    # This function can be iterative or recursive.
    def exploreAllStates(self):
        statesProcessed, statesFound = [self], [self]
        
        for i in range(len(statesProcessed)):
            state_being_processed = statesProcessed.pop(0)
            
            new_states_list = state_being_processed.findNewStates()
            
            for j in range(len(new_states_list)):
                statesProcessed.append(new_states_list[i])
            

            # Prune found states
            
            # Store all states found as children of states
            
            # If the found states are not yet in statesFound, add them to states found.
            
            # If the found states are not yet in statesProcessed, add them to states processed.
            
            # If the statesProcessed list is empty, The iteration stops.
            
        return statesFound

In [78]:
from graphviz import Digraph

def main():
    # All states in the form of {"Inflow" : [0,0], "Sink" : [0,0],"Outflow" : [0,0]}
    
    state_list = []
    inf, vol, out = initialize_pset1()
    
    # Create a start state
    start_state = State([inf, vol, out],{"Inflow" : [0,0], "Sink" : [0,0],"Outflow" : [0,0]})
    
    # Find a new state based on the start_state:
    result = start_state.exploreAllStates()
    #new_state = State([inf, vol, out],{"Inflow" : [0,1], "Sink" : [0,0],"Outflow" : [0,0]})
    #suc_state = State([inf, vol, out],{"Inflow" : [1,1], "Sink" : [0,1],"Outflow" : [0,1]})
    
    # Just adding states to the state list, which we will plot:
    state_list.append(start_state)
    #state_list.append(new_state)
    
    graph = generateStateGraph(result)
    #print(start_state)
    
main()

{'Sink': [0, 0], 'Inflow': [0, 1], 'Outflow': [0, 0]}


In [None]:
from graphviz import Digraph

state_formats = {
    'continode' : {
        'fontname' : 'Courier',
        'fontsize' : '7',
        'shape' : 'box',
        'width' : '.3'
    },
    'pointnode' : {
        'fontname' : 'Courier',
        'fontsize' : '7',
        'shape' : 'box',
        'color' : 'lightgrey',
        'width' : '.3'
    },
    'activedge' : {
        'fontname' : 'Courier',
        'fontsize' : '10',
    }
}

def generateStateGraph(stateList):
        
    graph = Digraph(comment='State Graph:')

    for state in range(len(stateList)):
        node_name = str(stateList[state]).replace("-1", "-")
        node_name = node_name.replace("1", "+")
        node_name = node_name.replace("2", "Max")
        if stateList[state].isInterval == True:
            graph.node(str(stateList[state]), node_name, **state_formats['continode'])
        else:
            graph.node(str(stateList[state]), node_name, **state_formats['pointnode'])
        children = stateList[state].getChildren()
        for child in range(len(children)):
            graph.edge(str(stateList[state]), str(children[child]), **state_formats['activedge'])
    
    graph.view()
    return
    