In [1]:
import networkx as nx
import sys
from ruamel.yaml import YAML
import numpy as np
from abc import ABC, abstractmethod
from collections import namedtuple
from copy import deepcopy
from functools import reduce
from typing import Callable, Any
from itertools import accumulate

In [2]:
from systemflow.node import *

In [3]:
class CollectImage(Mutate):
    def __init__(self):
        fields = []
        properties = []
        parameters = ["resolution x", "resolution y", "bit depth"]
        super().__init__(fields, properties, parameters)

    def transform(self, component: Component, message: Message) -> tuple[dict, dict, dict]:
        #access the required fields/properties/parameters
        n_px_x = component.parameters["resolution x"]
        n_px_y = component.parameters["resolution y"]
        resolution = (n_px_x,
                      n_px_y,
                      component.parameters["bit depth"],)
        n_bytes = np.prod(resolution) / 8.0
        sample_rate = component.parameters["sample rate"]
        time = 0.0
        sensor_power = component.parameters["pixel energy"] * n_px_x * n_px_y

        #create the new fields in the message

        msg_fields = {"image data": n_bytes,
                  "time": time}
        msg_props = {"resolution": resolution,
                     "sample rate": sample_rate,}

        #create the new properties in the host
        host_props = {"sensor power": sensor_power,}

        return msg_fields, msg_props, host_props
       

In [4]:
ci = CollectImage()

In [5]:
ci_host = Component("Image sensor",
                    [CollectImage(),],
                    {"resolution x": 6000,
                     "resolution y": 4000,
                     "bit depth": 8,
                     "sample rate": 1000,
                     "pixel energy": 1e-6,},
                     {})

In [6]:
msg0 = Message({}, {})

In [7]:
ci(ci_host, msg0)

(Message(fields={'image data': np.float64(24000000.0), 'time': 0.0}, properties={'resolution': (6000, 4000, 8), 'sample rate': 1000}),
 {'sensor power': 24.0})

In [8]:
class CollectTemperature(Mutate):
    def __init__(self):
        fields = []
        properties = []
        parameters = ["bit depth", "sample rate", "sensor power"]
        super().__init__(fields, properties, parameters)

    def transform(self, component: Component, message: Message) -> tuple[dict, dict, dict]:
        #access the required fields/properties/parameters
        n_bytes = component.parameters["bit depth"] / 8.0
        sample_rate = component.parameters["sample rate"]
        time = 0.0
        sensor_power = component.parameters["sensor power"]

        #create the new fields in the message
        msg_fields = {"image data": n_bytes,
                  "time": time}
        msg_props = {"sample rate": sample_rate,}

        #create the new properties in the host
        host_props = {"sensor power": sensor_power,}

        return msg_fields, msg_props, host_props
       

In [9]:
ct = CollectTemperature()

In [10]:
ct_host = Component("Thermocouple",
                    [CollectTemperature(),],
                    {"bit depth": 8,
                     "sample rate": 1200,
                     "sensor power": 1e-3,},
                     {})

In [11]:
ct.host_parameters

['bit depth', 'sample rate', 'sensor power']

In [12]:
sa = set(["a", "b", "c"])

In [13]:
sb = set(["b", "f", "g"])

In [14]:
sa.intersection(sb)

{'b'}

In [15]:
ct(ct_host, msg0)

(Message(fields={'image data': 1.0, 'time': 0.0}, properties={'sample rate': 1200}),
 {'sensor power': 0.001})

In [16]:
dm = OverwriteMerge()

In [17]:
list(sa)

['b', 'c', 'a']

In [18]:
msg0

Message(fields={}, properties={})

In [19]:
dm([msg0, msg0])

Message(fields={}, properties={})

In [20]:
msg1 = Message({"spiciness": 1000}, {"seasoning": "hot sauce"})
msg2 = Message({"spiciness": 2}, {"seasoning": "black pepper", "dish": "pasta"}) 

In [21]:
set(msg2.fields.keys()).union(set(msg2.properties.keys()))

{'dish', 'seasoning', 'spiciness'}

In [22]:
dm([msg1, msg2])

No merge provided for spiciness, taking first value
No merge provided for seasoning, taking first value


Message(fields={'spiciness': 1000}, properties={'dish': 'pasta', 'seasoning': 'hot sauce'})

In [23]:
class MaxSpicyMerge(Merge):
    def __init__(self):
        super().__init__({"spiciness": np.max,}, {"seasoning": lambda x: x[1]})
    


In [24]:
msm = MaxSpicyMerge()

In [25]:
msm([msg1, msg2])

Message(fields={'spiciness': np.int64(1000)}, properties={'dish': 'pasta', 'seasoning': 'black pepper'})

In [None]:
class RateMerge(Merge):
    def __init__(self):
        super().__init__()

    def __call__(self, messages: list[Message]) -> Message:
        
        

In [22]:
image_data.properties["time"]

NameError: name 'image_data' is not defined

In [23]:
temp_data = Message({"temperature": 1 ,},
                    {"sample rate": 1e3 ,
                     "time": 0.0 })

In [24]:
merge_messages(image_data, temp_data)

Message(fields={'image data': 24000000, 'temperature': 1}, properties={'sensor': (4000, 6000, 8), 'sample rate': 1000.0, 'time': 0.0})

In [25]:
dm = DefaultMerge()

In [27]:
exp_msg, exp_props = dm([image_data, temp_data])

In [28]:
exp_msg.properties["sensor"]

(4000, 6000, 8)

In [29]:
exp_msg.properties

{'sensor': (4000, 6000, 8), 'sample rate': 1000.0, 'time': 0.0}

In [30]:
exp_msg.fields

{'image data': 24000000, 'temperature': 1}

In [31]:
class Convolve(Mutate):
    def __init__(self):
        fields = ["image data"]
        properties = ["sensor"]
        parameters = ["kernel x", "kernel y", "filters"]
        super().__init__(fields, properties, parameters)

    def transform(self, component: Component, message: Message):
        #access the required fields/properties/parameters
        res = message.properties["sensor"]
        kernel_x = component.parameters["kernel x"]
        kernel_y = component.parameters["kernel y"]
        filters = component.parameters["filters"]

        #calculate the number of ops required for the kernel
        kernel_ops = kernel_x * kernel_y * filters
        steps_x = (res[0] - kernel_x) // kernel_x
        steps_y = (res[1] - kernel_y) // kernel_y
        kernel_repeats = steps_x * steps_y

        #calculate the number of ops required for the kernel
        transform_operations = kernel_ops * kernel_repeats

        properties = {}
        properties["transform operations"] = transform_operations
        fields = {}
        fields["features"] = np.prod((steps_x, steps_y, filters))

        return fields, properties
       

In [32]:
convolve_host = Component("Convolution", [Convolve,], {"kernel x": 3, "kernel y": 3, "filters": 8}, {})

In [33]:
cnv = Convolve()

In [34]:
message, props = cnv(convolve_host, exp_msg)

In [35]:
message

Message(fields={'image data': 24000000, 'temperature': 1, 'features': np.int64(21301344)}, properties={'sensor': (4000, 6000, 8), 'sample rate': 1000.0, 'time': 0.0})

In [36]:
props

{'transform operations': 191712096}

In [None]:
convolve_host = Component("Convolution", [Convolve,], {"kernel x": 3, "kernel y": 3, "filters": 8}, {})

In [152]:
from systemflow.classifier import *

In [153]:
gc = GaussianClassifier(3.0)

In [171]:
gc.solve_reduction([900, 100], 0.9)

In [172]:
gc.error_matrix

array([[0.98055646, 0.17499203],
       [0.01944354, 0.82500797]])

In [180]:

class GaussianClassify(Mutate):
    def __init__(self):
        fields = []
        properties = ["sample rate"]
        parameters = ["skill", "variance", "reduction"]
        super().__init__(fields, properties, parameters)

    def transform(self, component: Component, message: Message):
        #access the required fields/properties/parameters
        sample_rate = message.properties["sample rate"]
        skill = component.parameters["skill"]
        variance = component.parameters["variance"]
        reduction = component.parameters["reduction"]

        #calculate the error statistics
        falses = sample_rate * reduction
        trues = sample_rate - falses
        inputs = np.array([falses, trues])
        
        gc = GaussianClassifier(skill, varscale=variance)
        output = gc(inputs, reduction)

        fields = {"contingency": output}
        properties = {"error matrix": gc.error_matrix}

        return fields, properties

In [181]:
1000 * 0.1

100.0

In [182]:
gcf = GaussianClassify()

In [183]:
classify_host = Component(["Classify"], [], {"skill": 3, "variance": 1, "reduction": 0.9}, {})

In [184]:
msg2, props2 = gcf(classify_host, message)

In [185]:
msg2.properties["sample rate"]

1000.0

In [186]:
props2

{'error matrix': array([[0.98055646, 0.17499203],
        [0.01944354, 0.82500797]])}

In [187]:
get_rejected(msg2.fields["contingency"])

array([882,  17])

In [188]:
get_passed(msg2.fields["contingency"])

array([17, 82])

In [39]:
ntx.DiGraph()

<networkx.classes.digraph.DiGraph at 0x128d81c60>

In [None]:
stop

In [None]:
def flatten_tuple(t):
    for item in t:
        if isinstance(item, tuple):
            yield from flatten_tuple(item)
        else:
            yield item

In [None]:
Message = namedtuple("Message", ["fields", "properties"])

In [None]:
def merge_messages(x: Message, y: Message):
        fields = x.fields | y.fields
        properties = x.properties | y.properties
        new_message = Message(fields, properties)
        return new_message

In [None]:
class Merge(ABC):
    def __init__(self, props_update):
        self.props_update = props_update

    def __call__(self, messages: list[Message]):
        message = reduce(merge_messages, messages)
        new_comp_properties = self.props_update(message)
        return message, new_comp_properties

In [None]:
class DefaultMerge(Merge):
    def __init__(self):
        def dummy_props(message):
            return {}
        self.props_update = dummy_props
        super().__init__(dummy_props)        

In [None]:
class Mutate(ABC):
    def __init__(self, msg_fields: list[str], msg_properties: list[str], parameters: list[str]):
        #fields contain the data necessary for the transform
        self.msg_fields = msg_fields 
        #parameters control the behavior of the transform
        self.msg_properties = msg_properties 
        self.parameters = parameters

    def _field_check(self, message: Message) -> None:
        field_present = [f in message.fields.keys() for f in self.msg_fields]
        assert np.all(field_present), "Input field for transform not found in incoming message"

    def _property_check(self, message: Message) -> None:
        matches = [f in message.properties.keys() for f in self.msg_properties]
        props_chk = np.all(matches)
        if not props_chk:
            missing = str(self.properties.keys()[~matches]) 
            assert props_chk, str("Transform's properties not found in incoming message:", missing)
    
    def _param_check(self, component: 'Component') -> None:
        matches = [f in component.parameters.keys() for f in self.parameters]
        params_chk = np.all(matches)
        if not params_chk:
            missing = str(self.parameters.keys()[~matches]) 
            assert params_chk, str("Transform's control parameters not found in host component:", missing)

    def transform(self, component: 'Component', message: Message) -> tuple[dict, dict, dict]:
        new_msg_fields = {}
        new_msg_properties = {}
        new_host_properties = {}
        
        # USER - calculate new fields/properties for message/component here
        return new_msg_fields, new_msg_properties, new_host_properties
    
    def __call__(self, component: 'Component', message: Message) -> tuple[Message, dict]:
        #check that all incoming messages have the field(s)/parameters necessary for the transform
        self._field_check(message)
        #check that the incoming message has the parameters necessary for the transform
        self._property_check(message)
        #check that the host component has the parameters necessary for the transform
        self._param_check(component)

        #create independent copies of the message and component
        component = deepcopy(component)
        message = deepcopy(message)
        #determine the new fields for the message and properties for message and host
        new_msg_fields, new_msg_props, new_host_props = self.transform(component, message)
        #merge information into a new outgoing message
        new_message = Message(message.fields | new_msg_fields, message.properties | new_msg_props)

        return new_message, new_host_props

In [None]:
class Component(ABC):
    def __init__(self, name: str, mutations: list[Mutate], parameters: dict, properties: dict) -> None:
        super().__init__()
        self.name = name
        self.merge = DefaultMerge()
        assert len(mutations) > 0, "Should have at least one mutation in a component"
        self.mutations = mutations
        self.parameters = parameters
        self.properties = properties

    def blank_message(self) -> Message:
        message = Message({}, {})
        return message

    def __call__(self, exg: 'ExecutionGraph') -> 'Component':
        #find which components send messages here
        predecessors = exg.get_predecessors(self)
        if len(predecessors) > 0:
            #gather their input messages
            input_components = [node(exg) for node in predecessors]
            input_messages = [component.output_msg for component in input_components]
            #merge them
            input_msg = self.merge(input_messages)
        else:
            #otherwise, create a blank message
            input_msg = self.blank_message()
        
        #go through the mutations on this component
        mutations = list(accumulate(self.mutations, lambda x, f: f(x[0]), initial=input_msg))
        #separate the message and property outputs
        properties = [output[1] for output in mutations]
        merged_properties = reduce(lambda x, y: x | y, properties) | self.properties

        #store the new output message in the new component
        new_component = Component(self.name, self.mutations, self.parameters, merged_properties)
        new_component.output_msg = mutations[-1][0]
        
        return new_component


In [None]:
class Transport(ABC):
    def __init__(self, msg_fields: dict, msg_properties: dict, parameters: dict) -> None:
        self.msg_fields = msg_fields
        self.msg_properties = msg_properties
        self.parameters = parameters
        self.properties = {}

    def _field_check(self, message: Message) -> None:
        field_present = [f in message.fields.keys() for f in self.msg_fields]
        assert np.all(field_present), "Input field for transform not found in incoming message"

    def _property_check(self, message: Message) -> None:
        matches = [f in message.properties.keys() for f in self.msg_properties]
        props_chk = np.all(matches)
        if not props_chk:
            missing = str(self.properties.keys()[~matches]) 
            assert props_chk, str("Transform's properties not found in incoming message:", missing)
    
    def _param_check(self, link: 'Link') -> None:
        matches = [f in link.parameters.keys() for f in self.parameters]
        params_chk = np.all(matches)
        if not params_chk:
            missing = str(self.parameters.keys()[~matches]) 
            assert params_chk, str("Transform's control parameters not found in host component:", missing)

    def transport(self, link: 'Link', message: Message) -> dict:
        parameters = link.parameters
        new_properties = {}
        # USER - calculate new properties for the link here
        return new_properties

    def __call__(self, link: 'Link') -> dict:
        tx_message = link.tx.output_msg
        #check that all incoming messages have the field(s)/parameters necessary for the transform
        self._field_check(tx_message)
        #check that the incoming message has the parameters necessary for the transform
        self._property_check(tx_message)
        #check that the host component has the parameters necessary for the transform
        self._param_check(link)
        
        new_properties = self.transport(link, tx_message)
        return new_properties


In [None]:
class Link(ABC):
    def __init__(self, name: str, tx: Component, rx: Component, transport: Transport, parameters: dict):
        self.name = name
        self.tx = tx
        self.rx = rx
        self.transport = transport
        self.parameters = parameters
        self.properties = {}

    def __call__(self) -> 'Link':
        new_properties = self.transport(self)
        new_link = Link(self.name, self.tx, self.rx, self.transport, self.properties | new_properties)
        return new_link

In [None]:
class ExecutionGraph(ABC):
    """
    Construct a new execution graph given a name, list of nodes (producers and components), and list of edges
    (links between nodes).
    """
    def __init__(self, name: str, nodes: list[Component], links: list[Link], iteration: int = 0):
        self.name = name
        self.nodes = nodes
        self.links = links
        self.iteration = iteration

        nodes = [(n.name, {"ref": n,}) for n in nodes]
        edges = [(l.tx, l.rx, {"ref": l}) for l in links]
        self.graph = self.construct_graph(nodes, edges, 0)
        self.root = self.identify_root()

    def construct_graph(self, nodes: list[Component], edges: list[Link]) -> nx.classes.digraph:
        g = nx.DiGraph()
        g.add_nodes_from(nodes)
        g.add_edges_from(edges)
        g.graph["name"] = self.name
        #check that it's acyclic
        assert nx.is_directed_acyclic_graph(g), "Graph must be a tree (acyclic), check definition"

        return g

    """
    Identify and return the root node of the execution graph
    """
    def identify_root(self,):
        od = list(self.graph.out_degree)
        roots = list(filter(lambda x: x[1] == 0, od))
        assert len(roots) == 1, "More than 1 root identified"
        node = roots[0][0]
        return node
    
    """
    Retrieve the nodes which are the parents of a given component
    """
    def get_predecessors(self, node: Component) -> list[Component]:
        up = list(self.graph.predecessors(node.name))
        components = [u["ref"] for u in up]
        return components
    
    """
    Make a copy of a graph without attributes (names only) - for plotting
    """
    def lean_copy(self):
        graph = self.graph
        lean_copy = nx.DiGraph()
        for n in list(graph.nodes):
            lean_copy.add_node(n)

        for e in list(graph.edges):
            lean_copy.add_edge(*e)

        return lean_copy
    
    def __call__(self) -> 'ExecutionGraph':
        #Get the root processing node and propagate calls recursively up from it
        new_nodes = flatten_tuple(self.root(self))
        #update the link information given the new nodes
        new_links = [link(new_nodes) for link in self.links]
        #create the new execution graph
        exg = ExecutionGraph(self.name, new_nodes, new_links, self.iteration + 1)
        return exg


In [None]:
class System(ABC):
    def __init__(self, name: str, exec_graphs: list[ExecutionGraph], iter: int = 0):
        self.name = name
        self.exec_graphs = exec_graphs

    def __call__(self) -> 'System':
        new_graphs = [graph() for graph in self.exec_graphs]
        new_system = System(self.name, new_graphs, self.iter + 1)
        return new_system