In [None]:
from nbdev import *
%nbdev_default_export functions

Cells will be exported to pct.functions,
unless a different module is specified after an export flag: `%nbdev_export special.module`


In [None]:
#hide
#%load_ext autoreload
#%autoreload 2

In [None]:
import numpy as np
import logging
log = logging.getLogger(__name__)

# Functions

Functions that form the elements of a perceptual control node (system).

In [None]:
%nbdev_export
from abc import ABC, abstractmethod

In [None]:
%nbdev_export
class BaseFunction(ABC):
    "Base class of a PCT function."
    def __init__(self, name):
        self.value = 0
        self.links = []
        self.name = name
        
    @abstractmethod
    def __call__(self, verbose=False):
        if verbose :
            print(f'{self.value}', end= " ")
            
        return self.value
    
    @abstractmethod    
    def summary(self, str):
        print(f'{self.name} {type(self).__name__} ', end = " ")
        if len(str)>0:
            print(f'| {str}', end= " ")
        print(f'| {self.value}', end = " ")
        if len(self.links)>0:
            print(f'| links ', end=" ")
        for link in self.links:
            print(link.get_name(), end= " ")
        print()
        
    def get_name(self):
        return self.name
    
    def set_name(self, name):
        self.name=name

    def set_value(self, value):
        self.value= value
    
    def get_value(self):
        return self.value
    
    def add_link(self, linkfn):
        self.links.append(linkfn)


In [None]:
%nbdev_export
class Variable(BaseFunction):
    "A function that returns a variable value."
    def __init__(self, variable, name="variable"):
        super().__init__(name)
        self.value = variable
    
    def __call__(self, verbose=False):
        return super().__call__(verbose)
    
    def summary(self):
        super().summary("")


In [None]:
%nbdev_export
class Constant(BaseFunction):
    "A function that returns a constant value."
    def __init__(self, constant, name="constant"):
        super().__init__(name)
        self.value = constant
    
    def __call__(self, verbose=False):
        return super().__call__(verbose)
    
    def summary(self):
        super().summary("")


In [None]:
%nbdev_export
class Subtract(BaseFunction):
    "A function that subtracts one value from another."
    def __init__(self, name="subtract"):
        super().__init__(name)
    
    def __call__(self, verbose=False):
        #print("Sub ", self.links[0].get_value(),self.links[1].get_value() )
        self.value = self.links[0].get_value()-self.links[1].get_value()

        return super().__call__(verbose)

    def summary(self):
        super().summary("")


In [None]:
%nbdev_export
class Proportional(BaseFunction):
    "Proportional function."
    def __init__(self, gain, name="proportional"):
        super().__init__(name)
        self.gain = gain
    
    def __call__(self, verbose=False):
        input = self.links[0].get_value()
        self.value = input * self.gain
        return super().__call__(verbose)
    
    def summary(self):
        super().summary(f' gain {self.gain}')


In [None]:
%nbdev_export
class Integration(BaseFunction):
    "Integration function."
    def __init__(self, gain, slow, name="integration"):
        super().__init__(name)
        self.gain = gain
        self.slow = slow
    
    def __call__(self, verbose=False):
        input = self.links[0].get_value()
        self.value = self.value +  ((input * self.gain) - self.value)/self.slow
        
        return super().__call__(verbose)

 
    def summary(self):
        super().summary(f'gain {self.gain} slow {self.slow} ')


In [None]:
def velocity_model(velocity,  force , mass):
    velocity = velocity + force / mass
    return velocity

In [None]:
integrator = Integration(3, 10)
integrator.add_link(Constant(5))
output = integrator()
print(output)

1.5


In [None]:
#hide
integrator.set_value(np.array([0]))
output = integrator()
assert output == [1.5]

In [None]:
%nbdev_export
class PCTNode():
    "A single PCT controller."
    def __init__(self, perception, name="pctnode", history=False):
        self.links_built = False
        if history:
            self.history = PCTNodeData()
        self.name=name 
        self.perceptionCollection = [perception]
        reference = Constant(1)
        self.referenceCollection = [reference]
        comparator = Subtract()
        self.comparatorCollection = [comparator]
        self.outputCollection = [Proportional(10)]
    
    def __call__(self, verbose=False):
        if not self.links_built:
            self.build_links()
            
        for referenceFunction in self.referenceCollection:
            referenceFunction(verbose)               

        for perceptionFunction in self.perceptionCollection:
            perceptionFunction(verbose)
                    
        for comparatorFunction in self.comparatorCollection:
            comparatorFunction(verbose)

        for outputFunction in self.outputCollection:
            outputFunction(verbose)
            
        self.output = self.outputCollection[-1].get_value()
        
        if verbose:
            print()
            
        if not self.history == None:
            self.history.add_data(self)
            
        return self.output
    
    def build_links(self):
        if len(self.referenceCollection)>0:
            link = self.referenceCollection[0]
            for i in range (1, len(self.referenceCollection)):
                self.referenceCollection[i].add_link(link)               
                link = self.referenceCollection[i]

        if len(self.perceptionCollection)>0:
            link = self.perceptionCollection[0]
            for i in range (1, len(self.perceptionCollection)):
                self.perceptionCollection[i].add_link(link)               
                link = self.perceptionCollection[i]

        self.comparatorCollection[0].add_link(self.referenceCollection[-1])
        self.comparatorCollection[0].add_link(self.perceptionCollection[-1])

        if len(self.comparatorCollection)>1:
            link = self.comparatorCollection[1]
            for i in range (1, len(self.comparatorCollection)):
                self.comparatorCollection[i].add_link(link)               
                link = self.comparatorCollection[i]

        self.outputCollection[0].add_link(self.comparatorCollection[-1])

        if len(self.outputCollection)>0:
            link = self.outputCollection[0]
            for i in range (1, len(self.outputCollection)):
                self.outputCollection[i].add_link(link)               
                link = self.outputCollection[i]

        self.links_built = True

    def run(self, steps=None, verbose=False):
        for i in range(steps):
            out = self(verbose)
        return out
    
    def set_output(self, value):
        self.outputCollection[-1].set_value(value)
        
    def get_output_function(self):
        return self.outputCollection[-1]
    
    
    def set_function_name(self, collection, position, name):
        if collection == "refcoll":
            self.referenceCollection[position].set_name(name)
    
    def summary(self):
        if not self.links_built:
            self.build_links()

        print(self.name, type(self).__name__)
        print("----------------------------")
        print("REF:", end=" ")
        for referenceFunction in self.referenceCollection:
            referenceFunction.summary()   
        
        print("PER:", end=" ")
        for perceptionFunction in self.perceptionCollection:
            perceptionFunction.summary()
        
        print("COM:", end=" ")
        for comparatorFunction in self.comparatorCollection:
            comparatorFunction.summary()
        
        print("OUT:", end=" ")
        for outputFunction in self.outputCollection:
            outputFunction.summary()
        
        print("----------------------------")


In [None]:
%nbdev_export
class PCTNodeData():
    "Data collected for a PCTNode"
    def __init__(self, name="pctnodedata"):
        self.data = {
            "refcoll":{}, 
            "percoll":{},
            "comcoll":{}, 
            "outcoll":{}}
        
    
    def add_data(self, node):
        ctr = 0 
        
        self.add_collection( node.referenceCollection, "refcoll")
        self.add_collection( node.perceptionCollection, "percoll")
        self.add_collection( node.comparatorCollection, "comcoll")
        self.add_collection( node.outputCollection, "outcoll")

    def add_collection(self, collection, collname):
        for func in collection:            
            if self.data[collname].get(func.get_name()) == None:
                dlist=[]
                cdict={func.get_name():dlist}
                self.data[collname]=cdict
            else:
                dlist = self.data[collname][func.get_name()]
                
            dlist.append(func.get_value())


In [None]:
mass = 50
force = 0
vel = Variable(0, name="velocity")
node = PCTNode(vel, history=True)

In [None]:
node.summary()

pctnode PCTNode
----------------------------
REF: constant Constant  | 1 
PER: velocity Variable  | 0 
COM: subtract Subtract  | 0 | links  constant velocity 
OUT: proportional Proportional  |  gain 10 | 0 | links  subtract 
----------------------------


In [None]:
output = node(verbose=False)
force = output
vel.set_value(velocity_model(vel.get_value(), force, mass))
print(force)
assert output == 10

10


In [None]:
velocity_input = Variable(0, name="velocity")
pctnode = PCTNode(velocity_input, history=True)
pctnode.set_function_name("refcoll", -1, "reference")

for i in range(40):
    force = pctnode(verbose=False)
    vel = velocity_model(velocity_input.get_value(), force, mass)
    velocity_input.set_value(vel)

In [None]:
#print(pctnode.history.data['outcoll']['proportional'])

In [None]:
import plotly.graph_objects as go
fig = go.Figure(layout_title_text="Velocity Goal")
fig.add_trace(go.Scatter(y=pctnode.history.data['refcoll']['reference'], name="ref"))
fig.add_trace(go.Scatter(y=pctnode.history.data['percoll']['velocity'], name="perc"))

In [None]:
#hide
from nbdev import *
notebook2script()