# Container DAG


References:

- Docker SDK for Python: https://docker-py.readthedocs.io/en/stable/index.html
- Docker CLI reference: https://docs.docker.com/reference/cli/docker/

In [1]:
from abc import ABCMeta, abstractmethod
import json
import random
import subprocess
import sys

from docker import from_env

In [3]:
sys.path.append("../../src")

from dockweed.topological_sort import kahns_algorithm
from dockweed.process import *
from dockweed.container import *

## Open Docker client

In [4]:
# Create the Docker client from the environment's Docker configuration.
# It is used below to list images and to run discovery commands.
client = from_env()

### Get container processes

In [5]:
# Discover processes advertised by local Docker images.
#
# An image advertises a process with a label whose name starts with
# "process." and whose value is the command to run. Running that command
# once is expected to print a JSON object with "description", "inputs" and
# "outputs"; anything else is skipped silently (best-effort discovery).
containers = []
processes = {}
# Loop through images.
# Exclude intermediate layer images (all=False) and dangling images.
for image in client.images.list(all=False, filters={"dangling": False}):

    # Image tag. Untagged images cannot be referenced by name, so skip them.
    tags = image.tags
    if not tags:
        continue
    image_tag = tags[0]

    # Labels with node names and commands.
    # `image.labels` reads the image's own config and yields {} when there
    # are no labels. The legacy attrs['ContainerConfig'] section is
    # deprecated and omitted by newer Docker engines.
    labels = image.labels
    process_cmd = {name: cmd for name, cmd in labels.items() if name.startswith("process.")}
    if not process_cmd:
        continue

    # Processes
    for name, cmd in process_cmd.items():
        # Container should have no entry point, or this will call the entrypoint rather than label command.
        result = client.containers.run(image=image_tag, command=cmd, remove=True, stdout=True)

        # Only the parse is guarded. ValueError covers both malformed JSON
        # (JSONDecodeError) and output that is not valid UTF-8.
        try:
            dic = json.loads(result)
        except ValueError:
            continue

        # Valid JSON that is not an object (e.g. a bare number or list), or
        # an object missing a required key, does not describe a process.
        if not isinstance(dic, dict):
            continue
        if not {"description", "inputs", "outputs"} <= dic.keys():
            continue

        # One NodeContainer per image: consecutive processes of the same
        # image share the container created for the first of them.
        if not containers or containers[-1].image != image_tag:
            container = NodeContainer(image_tag=image_tag)
            containers.append(container)

        processes[name] = ContainerProcess(
            name=name,
            description=dic["description"],
            container=containers[-1].name,
            command=cmd,
            inputs=dic["inputs"],
            outputs=dic["outputs"],
        )

print([container.name for container in containers])
print([process.name for process in processes.values()])

['multiple_divide', 'add_subtract']
['process.c_divide', 'process.c_multiply', 'process.python_add', 'process.python_subtract']


### Start containers

In [6]:
# Start every container collected during discovery.
for container in containers:
    container.start()

9d4ec7a8cac5d310aead4f08218f8509d7d417656a96620bde3a716ff74c3d54
7ce082fd040b1e45f0f37e61ea6bf55c38efe0d2041b06320f2ccb297d170c38


### Add generator processes

In [7]:
# Register two extra processes next to the label-discovered ones, under the
# keys that graph specifications use to refer to them.
processes["random float"] = UniformFloatProcess()
processes["random choice"] = RandomChoice()

### Information about processes

In [8]:
# Print a short summary card (index, name, description, inputs, outputs)
# for every registered process, each card preceded by a 30-dash rule.
rule = "-"*30
for i, process in enumerate(processes.values()):
    print(
        rule,
        f"#{i} {process.name}",
        f"\t{process.description}",
        f"\tinputs = {process.inputs}",
        f"\toutputs = {process.outputs}",
        sep="\n",
    )

------------------------------
#0 process.c_divide
	Division. z = a / b
	inputs = {'a': 1.0, 'b': 1.0}
	outputs = {'z': 1.0}
------------------------------
#1 process.c_multiply
	Multiplication. z = a * b
	inputs = {'a': 1.0, 'b': 1.0}
	outputs = {'z': 1.0}
------------------------------
#2 process.python_add
	Addition. z = x + y.
	inputs = {'x': 1.0, 'y': 1.0}
	outputs = {'z': 1.0}
------------------------------
#3 process.python_subtract
	Addition. z = x + y.
	inputs = {'x': 1.0, 'y': 1.0}
	outputs = {'z': 1.0}
------------------------------
#4 random float
	A uniformly distributed random float.
	inputs = {'min': 0.0, 'max': 1.0}
	outputs = {'n': 0.5}
------------------------------
#5 random choice
	A rnadom choice.
	inputs = {'choices': [1, 2, 3, 4]}
	outputs = {'choice': 1}


### Run individual processes

In [9]:
# Run one process on its own, giving every input as a literal value.
processes["process.python_add"].run({'x': 2.34, 'y': 10.78})

{'z': 13.12}

In [10]:
# Run the multiplication process on its own.
# NOTE(review): the recorded result is keyed 'x', while the process summary
# printed earlier declares its output as 'z' - confirm which one is intended.
processes["process.c_multiply"].run({'a': 89.987, 'b': 10.0})

{'x': 899.8699951171875}

In [12]:
# Run the random-choice process ten times with the same candidate list and
# print each result, to eyeball that the picks vary.
for _ in range(10):
    x = processes["random choice"].run({'choices': [3, 4, 8]})
    print(x)

{'choice': 3}
{'choice': 4}
{'choice': 3}
{'choice': 3}
{'choice': 8}
{'choice': 8}
{'choice': 8}
{'choice': 3}
{'choice': 8}
{'choice': 8}


### Run a graph of processes

#### Specify the graph

In [85]:
# Graph specification: two generator nodes feeding a chain of container
# processes.
#
# Each node names the process to run and supplies one value per process
# input. A 2-tuple value ("source node", "output name") is an edge: Graph
# replaces it at run time with that output of the source node. Any other
# value is passed to the process unchanged.
# NOTE: the next cell assigns graph_specification again, so when the cells
# are run top to bottom this definition is replaced.
graph_specification = {
    "node alpha": {
        "process": "random choice",
        "inputs": {'choices': [1.2, 5.4, 6.7]}
    },
    "node beta": {
        "process": "random float",
        "inputs": {"min": -5.0, "max": 5.0}
    },
    "node a": {
        "process": "process.python_add",
        "inputs": {'x': ("node alpha", "choice"), 'y': 16.5}
    },
    "node b": {
        "process": "process.python_add",
        "inputs": {'x': ("node a", "z"), 'y': ("node beta", "n")}
    },
    "node c": {
        "process": "process.c_divide",
        "inputs": {'a': ("node a", "z"), 'b': ("node b", "z")}
    },
}

In [97]:
# Deterministic graph specification: only literal inputs and edges, no
# random generator nodes.
#
# Each node names the process to run and supplies one value per process
# input. A 2-tuple value ("source node", "output name") is an edge that
# Graph resolves at run time; any other value is passed through unchanged.
graph_specification = {
    "node a": {
        "process": "process.python_add",
        "inputs": {'x': 2.1, 'y': 16.5}
    },
    "node b": {
        "process": "process.python_add",
        "inputs": {'x': ("node a", "z"), 'y': 5.4}
    },
    "node c": {
        "process": "process.c_divide",
        "inputs": {'a': ("node a", "z"), 'b': ("node b", "z")}
    },
}

In [102]:
class Graph:
    """Directed acyclic graph of processes connected through their inputs.

    Every node runs one process. A node input is either a literal value or
    an edge, written as a 2-tuple ``(source_node, output_name)``, which is
    replaced at run time by that output of the source node.

    Attributes set by ``__init__``: ``specification`` (validated private
    copy), ``processes`` (node name to process), ``topology`` (node name to
    list of source nodes), ``node_order`` (execution order), and
    ``free_inputs`` / ``optimize_on`` (``None`` until
    ``set_optimisation_parameters`` is called).
    """

    def __init__(self, specification: dict, processes: list):
        """Validate the specification and derive the execution order.

        Args:
            specification: Maps node name to a dict with a ``"process"`` key
                (a process name) and an ``"inputs"`` key (input name to
                literal value or edge tuple).
            processes: Available process objects. Each is accessed through
                ``name``, ``inputs``, ``outputs`` and ``run``.

        Raises:
            KeyError: If a node has no process, names an unknown process,
                has no inputs, has inputs whose names differ from the
                process inputs, or has an edge to an undeclared node. All
                problems are reported together, one per line.
            ValueError: If the graph is cyclic.
        """
        # Index the processes by name once. setdefault keeps the first
        # process when a name is duplicated (first match wins).
        by_name = {}
        for process in processes:
            by_name.setdefault(process.name, process)

        # Check specification and processes, collecting every problem so
        # the caller sees them all at once.
        errors = []
        self.specification = {}
        self.processes = {}
        for node, spec in specification.items():

            # Process.
            if "process" not in spec:
                errors.append(f"Node {node}: No process is specified.")
                continue
            process_name = spec["process"]
            if process_name not in by_name:
                errors.append(f"Node {node}: There is no process named {process_name}.")
                continue
            self.processes[node] = by_name[process_name]

            # Inputs.
            if "inputs" not in spec:
                errors.append(f"Node {node}: No inputs are specified.")
                continue
            if set(self.processes[node].inputs) != set(spec["inputs"]):
                errors.append(f"Node {node}: The specified inputs do not match the process inputs.")
                continue

            # Keep a private copy: optimize_func overwrites input values and
            # must not modify the dictionary the caller passed in.
            self.specification[node] = {**spec, "inputs": dict(spec["inputs"])}

        # Every edge must point at a declared node, otherwise run() could
        # only fail later with a missing output.
        for node, spec in self.specification.items():
            for input_name, value in spec["inputs"].items():
                if self._is_edge(value) and value[0] not in specification:
                    errors.append(
                        f"Node {node}: Input {input_name} refers to unknown node {value[0]}."
                    )

        if errors:
            raise KeyError("\n".join(errors))

        # Topology (edges): for each node, the nodes it takes inputs from.
        self.topology = {
            node: [value[0] for value in spec["inputs"].values() if self._is_edge(value)]
            for node, spec in self.specification.items()
        }

        # Topological order. An empty result for a non-empty graph is
        # treated as "cyclic"; an empty graph is simply empty.
        # NOTE(review): assumes kahns_algorithm(..., incoming=True) returns
        # the nodes with every source before its consumers - confirm.
        order = kahns_algorithm(self.topology, incoming=True)
        if self.topology and not order:
            raise ValueError("The graph is cyclic!")
        self.node_order = order or []

        # Optimization.
        self.free_inputs = None
        self.optimize_on = None

    @staticmethod
    def _is_edge(value) -> bool:
        """Return True when an input value is an edge ``(node, output)``."""
        return isinstance(value, tuple) and len(value) == 2

    def run(self):
        """Execute every node in topological order.

        Returns:
            A pair ``(inputs, outputs)`` of dicts keyed by node name:
            ``inputs`` holds the resolved inputs each process was called
            with, ``outputs`` holds what each process returned.

        Raises:
            KeyError: If an edge refers to an output that the source node
                did not produce.
        """
        inputs = {}
        outputs = {}
        # Follow the computed order, not the order the specification was
        # written in, so a node may be declared before its sources.
        for node in self.node_order:
            declared = self.specification[node]["inputs"]

            # Substitute edge inputs with outputs from prior nodes.
            resolved = dict(declared)
            for input_name, value in declared.items():
                if not self._is_edge(value):
                    continue
                source_node, source_variable = value
                try:
                    resolved[input_name] = outputs[source_node][source_variable]
                except KeyError as err:
                    raise KeyError(
                        f"Node {node}: Input {input_name} needs output "
                        f"{source_variable} of node {source_node}, which is not available."
                    ) from err

            # Execute, storing the outputs.
            inputs[node] = resolved
            outputs[node] = self.processes[node].run(resolved)

        return inputs, outputs

    def set_optimisation_parameters(self, free_inputs: list, optimise_on: tuple):
        """Choose the inputs to vary and the output to optimise.

        Args:
            free_inputs: ``(node, input_name)`` pairs naming literal inputs
                that ``optimize_func`` may overwrite.
            optimise_on: ``(node, output_name)`` pair naming the output that
                ``optimize_func`` returns.

        Raises:
            KeyError: If a node, input or output does not exist, or a free
                input is an edge. Nothing is stored in that case.
        """
        free_inputs = list(free_inputs)

        errors = []
        for node, variable in free_inputs:
            if node not in self.specification:
                errors.append(f"Node {node}: There is no such node.")
            elif variable not in self.specification[node]["inputs"]:
                errors.append(f"Node {node}: There is no input called {variable}.")
            elif self._is_edge(self.specification[node]["inputs"][variable]):
                errors.append(f"Node {node}: Input {variable} is an edge.")

        node, variable = optimise_on
        if node not in self.processes:
            errors.append(f"Node {node}: There is no such node.")
        elif variable not in self.processes[node].outputs:
            errors.append(f"Node {node}: There is no output called {variable}.")

        if errors:
            raise KeyError("\n".join(errors))

        # Store only after validation so a rejected call leaves the previous
        # settings untouched.
        self.free_inputs = free_inputs
        self.optimize_on = optimise_on

    def optimize_func(self, *vargs):
        """Objective function: set the free inputs, run the graph, return the target.

        Args:
            *vargs: One value per free input, in the order given to
                ``set_optimisation_parameters``. Extra values are ignored.

        Returns:
            The value of the ``optimize_on`` output after running the graph.

        Raises:
            ValueError: If the optimisation parameters have not been set, or
                fewer values than free inputs are supplied.
        """
        if self.free_inputs is None or self.optimize_on is None:
            raise ValueError("The optimisation parameters have not been set.")
        if len(vargs) < len(self.free_inputs):
            raise ValueError("There are not enough variables to optimize.")

        for (node, variable), value in zip(self.free_inputs, vargs):
            self.specification[node]["inputs"][variable] = value

        _, outputs = self.run()
        node, variable = self.optimize_on
        return outputs[node][variable]
        
        

# Build the graph from the current graph_specification and every known
# process; construction validates the specification and raises on problems.
graph = Graph(graph_specification, list(processes.values()))

In [99]:
# Execute the whole graph, then show the resolved inputs each node was
# called with and the outputs each node returned.
inputs, outputs = graph.run()
print(inputs)
print("="*20)
print(outputs)

{'node a': {'x': 2.1, 'y': 16.5}, 'node b': {'x': 18.6, 'y': 5.4}, 'node c': {'a': 18.6, 'b': 24.0}}
{'node a': {'z': 18.6}, 'node b': {'z': 24.0}, 'node c': {'x': 0.7750000357627869}}


In [105]:
# Mark two literal inputs as free variables and pick the output to optimise.
# NOTE(review): 'z' is the output that process.c_divide declares, but the
# recorded run above shows node c returning its result under 'x' - confirm
# which key the container really emits before optimising on it.
graph.set_optimisation_parameters(
    free_inputs = [("node a", "x"), ("node b", "y")],
    optimise_on = ("node c", "z")
)

### Stop containers

In [16]:
# Stop every container that was started for this session.
for container in containers:
    container.stop()

multiple_divide
add_subtract


In [None]:
# echo '{"x": 11.98769, "y": 186.78}' | docker exec -i some-node-a python run.py


## Close client

In [42]:
# Close the Docker client opened at the top of the notebook.
client.close()