In [1]:
import simpy
import random
from collections import Counter

import copy

In [2]:
class Item:
    
    def __init__(self, source_id, pipeline, frame_num=None):
        self.source_id = source_id
        self.pipeline = pipeline
        self.frame_num = frame_num
        self._pos = 0
        
    @property
    def work_type(self):
        if self._pos < len(self.pipeline):
            return self.pipeline[self._pos]
        else:
            return None
        
    def next_work(self):
        self._pos += 1
        return self
    
    def __repr__(self):
        return f"<source: {self.source_id}, frame: {self.frame_num}>"

        
class Context:
    
    def __init__(self, env, worker, source_id, work_type, compute_time):
        self.env = env
        self.worker = worker
        self.source_id = source_id
        self.work_type = work_type
        self.compute_time = compute_time
        self.input_buffer = simpy.Store(env)
        self.output_buffer = simpy.Store(env)
        self.process = env.process(self.run())
        
    def put(self, item):
        self.input_buffer.put(item)
        
    def run(self):
        while True:
            try:
                item = yield self.input_buffer.get()
                assert item.work_type == self.work_type and item.source_id == self.source_id
                with self.worker.compute.request() as compute:
                    yield compute
                    yield self.env.timeout(self.compute_time)
                    item = item.next_work()
                    self.worker.output_buffer.put(item)
            except:
                return
            
    def kill(self):
        self.process.interrupt()


class Worker:
    _counter = 0
    _instances = {}
    
    def __init__(self, env, coordinator, ability, compute_capacity):
        self.env = env
        self.coordinator = coordinator
        self.ability = ability
        self.compute = simpy.Resource(env, capacity=compute_capacity)
        self.input_buffer = simpy.Store(env)
        self.output_buffer = simpy.Store(env)
        self.contexts = {}
        self.routing_table = {}
        
        self.id = self._counter
        self._instances[self._counter] = self
        self.__class__._counter += 1
        
        self.process = env.process(self.run())
        
    def run(self):
        inp = self.input_buffer.get()
        out = self.output_buffer.get()
        while True:
            try:
                res = yield inp | out
                if inp in res:
                    item = res[inp]
                    context = self.contexts.get((item.source_id, item.work_type))
                    if context is None:
                        self.add_context(item.source_id, item.work_type)
                        context = self.contexts[(item.source_id, item.work_type)]
                    context.put(item)
                    inp = self.input_buffer.get()
                if out in res:
                    item = res[out]
                    source_id, work_type = item.source_id, item.work_type
                    if work_type is None:
                        self.coordinator.fully_processed_items.append(item)
                    else:
                        receiver_id = self.routing_table.get((source_id, work_type))
                        if receiver_id is None:
                            receiver_id = self.coordinator.request_route(self.id, source_id, work_type)
                            self.routing_table[(source_id, work_type)] = receiver_id
                        self.__class__.send_item(to=receiver_id, item=item)
                    out = self.output_buffer.get()
            except:
                return
                
    def kill(self):
        for context in self.contexts.values():
            context.kill()
        self.process.interrupt()
            
    def put(self, item):
        self.input_buffer.put(item)
            
    def add_context(self, source_id, work_type):
        assert work_type in self.ability
        context = Context(self.env, self, source_id, work_type, self.ability[work_type])
        self.contexts[(source_id, work_type)] = context
        return context
        
    def remove_context(self, source_id, work_type=None):
        work_types = [
            work_type_ for source_id_, work_type_ in self.contexts.keys()
            if source_id_==source_id
        ] if work_type is None else [work_type]
        for work_type in work_types:
            context = self.contexts.pop((source_id, work_type))
            context.process.interrupt()
        
    @classmethod
    def send_item(cls, to, item):
        cls._instances[to].put(item)
        
    @classmethod
    def move_context(cls, to, fr, source_id, work_type):
        src_worker = Worker._instances[fr]
        dst_worker = Worker._instances[to]
        src_context = src_worker.contexts[(source_id, work_type)]
        
        dst_context = dst_worker.add_context(source_id, work_type)
        for item in src_context.input_buffer.items:
            dst_context.put(item)
        src_worker.remove_context(source_id, work_type)
        
        
class Source:
    _counter = 0
    _instances = {}
    
    def __init__(self, env, pipeline, receiver_id=None):
        self.env = env
        self.pipeline = pipeline
        self.receiver_id = receiver_id
        
        self.id = self._counter
        self._instances[self._counter] = self
        self.__class__._counter += 1
        
        self.frame_counter = 0
        
        self.process = env.process(self.run())

    def run(self):
        while True:
            try:
                n_frames = random.randint(1, 30)
                for _ in range(n_frames):
                    if self.receiver_id is not None:
                        Worker.send_item(
                            to=self.receiver_id,
                            item=Item(self.id, copy.copy(self.pipeline), self.frame_counter)
                        )
                        self.frame_counter += 1
                    yield self.env.timeout(1.0)
                idle = 30*random.random()
                yield self.env.timeout(idle)
            except simpy.Interrupt:
                return
            
    def kill(self):
        self.process.interrupt()
        
class Coordinator:
    
    def __init__(self, env):
        self.env = env
        self.workers = []
        self.sources = []
        self.routing_table = {}
        self.fully_processed_items = []
        
        self.process = env.process(self.monitor(5.))
        
    def monitor(self, timeout):
        while True:
            yield self.env.timeout(timeout)
            print("----------")
            print(f"Now: {self.env.now}")
            print(f"Total queue size: {self.total_queue_size}")
            print(f"Total processed: {self.total_processed}")
        
    def get_able_worker_id(self, work_type):
        able_worker_ids = [
            worker.id for worker in self.workers
            if work_type in worker.ability
        ]
        num_jobs = Counter([path["dst"] for path in self.routing_table.values()])
        num_jobs.update([source.receiver_id for source in self.sources])
        worker_id, _ = min(
            [(worker_id, num_jobs[worker_id]) for worker_id in able_worker_ids],
            key=lambda pair: pair[1]
        )
        return worker_id
        
    def add_source(self, pipeline):
        receiver_id = self.get_able_worker_id(pipeline[0])
        source = Source(self.env, pipeline, receiver_id)
        self.sources.append(source)
        
    def remove_source(self, source_id):
        source = Source._instances[source_id]
        source.kill()
        self.sources.remove(source)
        for worker in self.workers:
            worker.remove_context(source.id)
        for work_type in source.pipeline:
            if (source.id, work_type) in self.routing_table:
                self.routing_table.pop((source.id, work_type))
        
    def add_worker(self, ability, compute_capacity):
        worker = Worker(self.env, self, ability, compute_capacity)
        self.workers.append(worker)
        
    def remove_worker(self, worker_id):
        worker = Worker._instances[worker_id]
        worker.kill()
        self.workers.remove(worker)
        keys = [key for key, path in self.routing_table.items() if worker_id in path.values()]
        for key in keys:
            self.routing_table.pop(key)
        for source in self.sources:
            if source.receiver_id == worker_id:
                source.receiver_id = self.get_able_worker_id(source.pipeline[0])
        
    def request_route(self, worker_id, source_id, work_type):
        path = self.routing_table.get((source_id, work_type))
        if path:
            assert path["src"] == worker_id
            return path["dst"]
        else:
            receiver_id = self.get_able_worker_id(work_type)
            path = {"src": worker_id, "dst": receiver_id}
            self.routing_table[(source_id, work_type)] = path
            return receiver_id

    @property
    def processing_chains(self):
        chains = []
        for source in self.sources:
            chain = [f's{source.id}']
            worker_id = source.receiver_id
            chain.append(f'w{worker_id}')
            for work_type in source.pipeline[1:]:
                if (source.id, work_type) in self.routing_table:
                    worker_id = self.routing_table[(source.id, work_type)]['dst']
                    chain.append(f'w{worker_id}')
                else:
                    choin.append('*')
            chain = ' -> '.join(chain)
            chains.append(chain)
        return chains
    
    @property
    def total_queue_size(self):
        queue_size = 0
        for worker in self.workers:
            for context in worker.contexts.values():
                queue_size += len(context.input_buffer.items)
        return queue_size
    
    @property
    def total_processed(self):
        return len(self.fully_processed_items)

In [3]:
env = simpy.Environment()
coordinator = Coordinator(env)
coordinator.add_worker({'preprocess': 0.5}, compute_capacity=1)
coordinator.add_worker({'preprocess': 0.5}, compute_capacity=1)
coordinator.add_worker({'preprocess': 0.5}, compute_capacity=1)
coordinator.add_worker({'process': 1}, compute_capacity=3)
coordinator.add_worker({'process': 1}, compute_capacity=3)
coordinator.add_worker({'postprocess': 0.5}, compute_capacity=1)
coordinator.add_worker({'postprocess': 0.5}, compute_capacity=1)


coordinator.add_source(('preprocess', 'process'))
coordinator.add_source(('preprocess', 'process'))
env.run(until=20)
coordinator.add_source(('preprocess', 'process', 'postprocess'))
coordinator.add_source(('preprocess', 'process', 'postprocess'))
env.run(until=40)
coordinator.add_source(('preprocess', 'postprocess'))
coordinator.add_source(('preprocess', 'postprocess'))
coordinator.add_source(('preprocess', 'postprocess'))
coordinator.add_source(('preprocess', 'postprocess'))
env.run(until=60)
coordinator.remove_source(0)
print("----------")
print("Source 0 removed")
env.run(until=80)
print("----------")
print("Routing:")
print(*coordinator.processing_chains, sep='\n')
coordinator.remove_worker(2)
coordinator.remove_worker(3)
print("----------")
print("Workers 2, 3 removed.")
env.run(until=100)
print("----------")
print("Routing:")
print(*coordinator.processing_chains, sep='\n')
coordinator.remove_worker(1)
print("----------")
print("Worker 1 removed.")
env.run(until=140)
print("----------")
print("Routing:")
print(*coordinator.processing_chains, sep='\n')

----------
Now: 5.0
Total queue size: 0
Total processed: 8
----------
Now: 10.0
Total queue size: 0
Total processed: 18
----------
Now: 15.0
Total queue size: 0
Total processed: 28
----------
Now: 20.0
Total queue size: 0
Total processed: 36
----------
Now: 25.0
Total queue size: 0
Total processed: 47
----------
Now: 30.0
Total queue size: 1
Total processed: 61
----------
Now: 35.0
Total queue size: 1
Total processed: 70
----------
Now: 40.0
Total queue size: 1
Total processed: 76
----------
Now: 45.0
Total queue size: 3
Total processed: 98
----------
Now: 50.0
Total queue size: 5
Total processed: 124
----------
Now: 55.0
Total queue size: 4
Total processed: 146
----------
Source 0 removed
----------
Now: 60.0
Total queue size: 5
Total processed: 172
----------
Now: 65.0
Total queue size: 3
Total processed: 192
----------
Now: 70.0
Total queue size: 0
Total processed: 209
----------
Now: 75.0
Total queue size: 0
Total processed: 219
----------
Routing:
s1 -> w1 -> w4
s2 -> w2 -> w3 -> 