# Entity Flow Examples




## Objects Definitions

In [27]:
# Entity Flow Models.

import simpy
import numpy as np
from typing import Callable
import matplotlib.pyplot as plt
import pandas as pd


class Entity():
    """Abstract object that flows throw the system"""
    def __init__(self, id, create_time=None):
        self.id = id
        self.created_time = create_time
        self.deleted_time = None




class Queu():
    """Stores entities with specific get and put policy."""
    def __init__(self, env:simpy.Environment,name, capacity=float('inf')):
        self.env = env
        self.name = name
        self.store = simpy.Store(env,capacity=capacity)

    def put(self, entity:Entity):         
        yield self.store.put(entity)
        print(f"{self.env.now} - entity{entity.id} put in {self.name}")


    def get(self):
        e = yield self.store.get()
        print(f"{self.env.now} - entity{e.id} get from {self.name}")
        return e



class Source():
    """Creates entities with an interval arrival tiem"""

    def __init__(self, env:simpy.Environment, name:str, interarrival_time:Callable, max_quantity=float("inf")):
        self.env = env
        self.name = name
        self.interarrival_time = interarrival_time
        self.max_quantity = max_quantity
        self.total_created = 0

        self._output = None

        self.action = env.process(self.run())

    @property
    def output(self):
        return self._output

    @output.setter
    def output(self, output):
        if callable(getattr(output, "put", None)):
            self._output = output
        else:
            raise Exception("Output must have a put() method")

    def create_entity(self):
        e = Entity(id=self.total_created,create_time=self.env.now)
        return e

    def run(self):
        while True:
            yield self.env.timeout(self.interarrival_time)
            e = self.create_entity()
            print(f"{self.env.now} - entity{e.id} generated in {self.name}")
            self.total_created += 1
            try:
                yield self.env.process(self._output.put(e))
            except:
                Exception("An output must be assigned")

    
        

class Sink():
    """Delete entities and records it."""
    def __init__(self, env:simpy.Environment, name:str):
        self.env = env
        self.name = name
        self.sincked_entities = Queu(env, name=f"{name}_queu")
        self.total_sinked = 0
        
    def put(self, entity:Entity):
        print(f"{self.env.now} - entity{entity.id} sinked in {self.name}")
        yield self.env.process(self.sincked_entities.put(entity))
        entity.deleted_time = self.env.now
        self.total_sinked += 1
        


class Server():
    """Delay entities for a given time"""
    def __init__(self, env:simpy.Environment, name:str, server_time=10):
        self.env = env
        self.name = name
        self.server_time = server_time
        self.queu = Queu(env, name=f"{name}_queu")

        self.total_processed = 0
        self._output = None

        self.action = env.process(self.run())

    def run(self):
        while True:
            e = yield self.env.process(self.queu.get())
            yield self.env.timeout(self.server_time)
            print(f"{self.env.now} - entity{e.id} processed in {self.name}")
            try:
                yield self.env.process(self._output.put(e))
                self.total_processed += 1
            except:
                Exception("An output must be assigned")

    def put(self, entity):
        yield self.env.process(self.queu.put(entity))
        # self.queu.put(entity)

    @property
    def output(self):
        return self._output

    @output.setter
    def output(self, output):
        if callable(getattr(output, "put", None)):
            self._output = output
        else:
            raise Exception("Output must have a put() method")
        


class Brancher():
    """Split entities in out branches by a condition"""
    def __init__(self, env:simpy.Environment,name:str, probs=[0.3,0.7]):
        self.env = env
        self.name = name
        self.probs = probs
        self.ranges = [sum(probs[0:n+1]) for n in range(len(probs))]
        self._output = [None for i in range(len(probs))]

    @property
    def output(self):
        return self._output

    def set_output(self, idx, output):
        if callable(getattr(output, "put", None)):
            self._output[idx] = output
        else:
            raise Exception("Output must have a put() method")


    def put(self, entity):
        rand = np.random.random()
        for i in range(len(self.probs)):
            if rand <= self.ranges[i]:
                print(f"{self.env.now} - entity{entity.id} assigned to output {i} in {self.name}")
                yield self.env.process(self._output[i].put(entity))



class Batcher():
    """Waits for a quantity of an entity and ouputs a new batched entity"""
    def __init__(self, env:simpy.Environment, name:str, size=3):
        self.env = env
        self.name = name
        self.size = size
        self.queu = Queu(env, name=f"{name}_queu")
        self.batched = Queu(env, name=f"{name}_queu")
        self._output = None
        self.n_bached = 0
        self.action = env.process(self.run())

    @property
    def output(self):
        return self._output

    @output.setter
    def output(self, output):
        if callable(getattr(output, "put", None)):
            self._output = output
        else:
            raise Exception("Output must have a put() method")

    def run(self):
        while True:
            batch = []
            while len(batch)<self.size:
                e = yield self.env.process(self.queu.get())
                print(f"{self.env.now} - entity{e.id} added to batch in {self.name}")
                batch.append(e)

            be = Entity(id=f'b{self.n_bached}')
            self.n_bached += 1
            print(f"{self.env.now} - batch entity {be.id} created in {self.name}")  
            try:
                yield self.env.process(self._output.put(be))
            except:
                Exception("An output must be assigned")


    def put(self, entity):
        yield self.env.process(self.queu.put(entity))



class EntityTyped(Entity):
    def __init__(self, id, create_time=None, type='A'):
        super().__init__(id, create_time)
        self.type=type

class SourceTyped(Source):
    def __init__(self, env:simpy.Environment, name:str, interarrival_time:Callable, max_quantity=float("inf"), type="A"):
        super().__init__(env,name,interarrival_time,max_quantity)
        self.type = type
    
    def create_entity(self):
        e = EntityTyped(id=self.total_created,create_time=self.env.now,type=self.type)
        return e


class AttributeBrancher():

    """Split entities in out branches by an entity attribute"""
    def __init__(self, env:simpy.Environment,name:str, attr = "type", values=['A','B']):
        self.env = env
        self.name = name
        self.attr = attr
        self.values = values
        self._output = [None for i in range(len(values))]

    @property
    def output(self):
        return self._output

    def set_output(self, idx, output):
        if callable(getattr(output, "put", None)):
            self._output[idx] = output
        else:
            raise Exception("Output must have a put() method")

    def put(self, entity):
        for i in range(len(self.values)):
            if getattr(entity,self.attr) == self.values[i]:
                print(f"{self.env.now} - entity{entity.id} {self.attr}={self.values[i]} assigned to output {i} in {self.name}")
                yield self.env.process(self._output[i].put(entity))

## 01-One Server System

![image](../figures/entity_flow_1.png)

In [16]:
# one server
env = simpy.Environment()

source = Source(env=env, name="source", interarrival_time=5)
server = Server(env=env, name='server', server_time=10)
sink = Sink(env=env, name='sink')

# build conections
source.output = server
server.output = sink
env.run(until=100)

5 - entity0 generated in source
5 - entity0 put in server_queu
5 - entity0 get from server_queu
10 - entity1 generated in source
10 - entity1 put in server_queu
15 - entity0 processed in server
15 - entity0 sinked in sink
15 - entity2 generated in source
15 - entity0 put in sink_queu
15 - entity2 put in server_queu
15 - entity1 get from server_queu
20 - entity3 generated in source
20 - entity3 put in server_queu
25 - entity1 processed in server
25 - entity1 sinked in sink
25 - entity4 generated in source
25 - entity1 put in sink_queu
25 - entity4 put in server_queu
25 - entity2 get from server_queu
30 - entity5 generated in source
30 - entity5 put in server_queu
35 - entity2 processed in server
35 - entity2 sinked in sink
35 - entity6 generated in source
35 - entity2 put in sink_queu
35 - entity6 put in server_queu
35 - entity3 get from server_queu
40 - entity7 generated in source
40 - entity7 put in server_queu
45 - entity3 processed in server
45 - entity3 sinked in sink
45 - entity8 

## 02-Two sequential server System

![image](../figures/entity_flow_2.png)

In [None]:
# two servers
env = simpy.Environment()

source = Source(env=env, name="source", interarrival_time=5)
server1 = Server(env=env, name='server1', server_time=10)
server2 = Server(env=env, name='server2', server_time=10)
sink = Sink(env=env, name='sink')

# build conections
source.output = server1
server1.output = server2
server2.output = sink
env.run(until=100)

In [None]:
print(server1.total_processed)
print(server2.total_processed)
print(sink.total_sinked)

## 03-Branched System

![image](../figures/entity_flow_3.png)

In [None]:
# three servers with a branch.

env = simpy.Environment()

source = Source(env=env, name="source", interarrival_time=5)

server1 = Server(env=env, name='server1', server_time=10)
server2 = Server(env=env, name='server2', server_time=5)
server3 = Server(env=env, name='server3', server_time=5)

brancher = Brancher(env=env, name='brancher', probs=[0.3, 0.7])

sink1 = Sink(env=env, name='sink1')
sink2 = Sink(env=env, name='sink2')

# build conections
source.output = server1
server1.output = brancher
brancher.set_output(0, server2)
brancher.set_output(1, server3)
server2.output = sink1
server3.output = sink2

env.run(until=100)

In [None]:
print("Server 1: ",server1.total_processed)
print("Server 2: ",server2.total_processed)
print("Server 3: ",server3.total_processed)
print("Sink 1: ", sink1.total_sinked)
print("Sink 2: ",sink2.total_sinked)

## 04-Branched and joined system

![image](../figures/entity_flow_4.png)

In [None]:
# four servers with a branch and a final server.

env = simpy.Environment()

source = Source(env=env, name="source", interarrival_time=5)

server1 = Server(env=env, name='server1', server_time=10)
server2 = Server(env=env, name='server2', server_time=5)
server3 = Server(env=env, name='server3', server_time=5)
server4 = Server(env=env, name='server4', server_time=10)

brancher = Brancher(env=env, name='brancher', probs=[0.3, 0.7])

sink = Sink(env=env, name='sink1')

# build conections
source.output = server1
server1.output = brancher
brancher.set_output(0, server2)
brancher.set_output(1, server3)
server2.output = server4
server3.output = server4
server4.output = sink
env.run(until=100)

In [None]:
print("Server 1: ",server1.total_processed)
print("Server 2: ",server2.total_processed)
print("Server 3: ",server3.total_processed)
print("Server 4: ",server4.total_processed)
print("Sink: ", sink.total_sinked)

## 05-Batched entity

![image](../figures/entity_flow_5.png)

In [14]:
# batcher
env = simpy.Environment()

source = Source(env=env, name="source", interarrival_time=5)
batcher = Batcher(env=env, name='batcher',size=3)
server = Server(env=env, name='server2', server_time=10)
sink = Sink(env=env, name='sink')

# build conections
source.output = batcher
batcher.output = server
server.output = sink
env.run(until=100)

5 - entity0 generated in source
5 - entity0 put in batcher_queu
5 - entity0 get from batcher_queu
5 - entity0 added to batch in batcher
10 - entity1 generated in source
10 - entity1 put in batcher_queu
10 - entity1 get from batcher_queu
10 - entity1 added to batch in batcher
15 - entity2 generated in source
15 - entity2 put in batcher_queu
15 - entity2 get from batcher_queu
15 - entity2 added to batch in batcher
15 - batch entity b0 created in batcher
15 - entityb0 put in server2_queu
15 - entityb0 get from server2_queu
20 - entity3 generated in source
20 - entity3 put in batcher_queu
20 - entity3 get from batcher_queu
20 - entity3 added to batch in batcher
25 - entityb0 processed in server2
25 - entityb0 sinked in sink
25 - entity4 generated in source
25 - entityb0 put in sink_queu
25 - entity4 put in batcher_queu
25 - entity4 get from batcher_queu
25 - entity4 added to batch in batcher
30 - entity5 generated in source
30 - entity5 put in batcher_queu
30 - entity5 get from batcher_que

## 05-Type Branched entity

![image](../figures/entity_flow_6.png)

In [32]:

env = simpy.Environment()

source1 = SourceTyped(env=env, name="source", interarrival_time=5, type="A")
source2 = SourceTyped(env=env, name="source", interarrival_time=10, type="B")

brancher = AttributeBrancher(env=env,name='brancher', attr='type',values=['A','B'])


server1 = Server(env=env, name='server1', server_time=10)
server2 = Server(env=env, name='server2', server_time=5)
server3 = Server(env=env, name='server3', server_time=5)

sink = Sink(env=env, name='sink')

# build conections
source1.output = server1
source2.output = server1
server1.output = brancher
brancher.set_output(0, server2)
brancher.set_output(1, server3)
server2.output = sink
server3.output = sink
env.run(until=100)

5 - entity0 generated in source
5 - entity0 put in server1_queu
5 - entity0 get from server1_queu
10 - entity0 generated in source
10 - entity1 generated in source
10 - entity0 put in server1_queu
10 - entity1 put in server1_queu
15 - entity0 processed in server1
15 - entity0 type=A assigned to output 0 in brancher
15 - entity2 generated in source
15 - entity0 put in server2_queu
15 - entity2 put in server1_queu
15 - entity0 get from server2_queu
15 - entity0 get from server1_queu
20 - entity1 generated in source
20 - entity0 processed in server2
20 - entity0 sinked in sink
20 - entity3 generated in source
20 - entity1 put in server1_queu
20 - entity0 put in sink_queu
20 - entity3 put in server1_queu
25 - entity0 processed in server1
25 - entity0 type=B assigned to output 1 in brancher
25 - entity4 generated in source
25 - entity0 put in server3_queu
25 - entity4 put in server1_queu
25 - entity0 get from server3_queu
25 - entity1 get from server1_queu
30 - entity2 generated in source
3