From 7b1bb63dc600ab143aa31b2051be6689bd819c53 Mon Sep 17 00:00:00 2001 From: Evangelos Vazaios Date: Tue, 24 Jul 2018 15:14:45 +0100 Subject: [PATCH] Add desmod.pool.Pool for modeling pool of resources Pool offers a similar behavior to simpy.Container with some additional events: * Pool.when_any() when the pool is non-empty * Pool.when_new() when items are inserted in pool * Pool.when_full() when the pool is full Pool is to simpy.Container what Queue is to simpy.Store --- desmod/pool.py | 197 +++++++++++++++++++++++++++++++++++++++++++ desmod/probe.py | 22 +++++ desmod/tracer.py | 5 +- tests/test_pool.py | 138 ++++++++++++++++++++++++++++++ tests/test_tracer.py | 3 + 5 files changed, 363 insertions(+), 2 deletions(-) create mode 100644 desmod/pool.py create mode 100644 tests/test_pool.py diff --git a/desmod/pool.py b/desmod/pool.py new file mode 100644 index 00000000..58f53ec5 --- /dev/null +++ b/desmod/pool.py @@ -0,0 +1,197 @@ +"""Pool class for modeling a container of resources. + +A pool models a container of items or resources. Pool is similar to the :class: +`simpy.resources.Container`, but with additional events when the Container is +empty or full. Users can put or get items in the pool with a certain amount as +a parameter. +""" + +from simpy import Event +from simpy.core import BoundClass + + +class PoolPutEvent(Event): + def __init__(self, pool, amount=1): + super(PoolPutEvent, self).__init__(pool.env) + self.pool = pool + self.amount = amount + self.callbacks.append(pool._trigger_get) + pool._putters.append(self) + pool._trigger_put() + + def cancel(self): + if not self.triggered: + self.pool._putters.remove(self) + self.callbacks = None + + +class PoolGetEvent(Event): + def __init__(self, pool, amount=1): + super(PoolGetEvent, self).__init__(pool.env) + self.pool = pool + self.amount = amount + self.callbacks.append(pool._trigger_put) + pool._getters.append(self) + pool._trigger_get() + + def cancel(self): + if not self.triggered: + self.pool._getters.remove(self) + self.callbacks = None + + +class PoolWhenNewEvent(Event): + def __init__(self, pool): + super(PoolWhenNewEvent, self).__init__(pool.env) + self.pool = pool + pool._new_waiters.append(self) + + def cancel(self): + if not self.triggered: + self.pool._new_waiters.remove(self) + self.callbacks = None + + +class PoolWhenAnyEvent(Event): + def __init__(self, pool): + super(PoolWhenAnyEvent, self).__init__(pool.env) + self.pool = pool + pool._any_waiters.append(self) + pool._trigger_when_any() + + def cancel(self): + if not self.triggered: + self.pool._any_waiters.remove(self) + self.callbacks = None + + +class PoolWhenFullEvent(Event): + def __init__(self, pool): + super(PoolWhenFullEvent, self).__init__(pool.env) + self.pool = pool + pool._full_waiters.append(self) + pool._trigger_when_full() + + def cancel(self): + if not self.triggered: + self.pool._full_waiters.remove(self) + self.callbacks = None + + +class Pool(object): + """Simulation pool of discrete or continuous resources. + + `Pool` is similar to :class:`simpy.resources.Container`. + It provides a simulation-aware container for managing a shared pool of + resources. The resources can be either discrete objects (like apples) or + continuous (like water). + + Resources are added and removed using :meth:`put()` and :meth:`get()`. + + :param env: Simulation environment. + :param capacity: Capacity of the pool; infinite by default. + :param hard_cap: + If specified, the pool overflows when the `capacity` is reached. + :param init_level: Initial level of the pool. + :param name: Optional name to associate with the queue. + + """ + + def __init__(self, env, capacity=float('inf'), hard_cap=False, + init_level=0, name=None): + self.env = env + #: Capacity of the queue (maximum number of items). + self.capacity = capacity + self._hard_cap = hard_cap + self.level = init_level + self.name = name + self._putters = [] + self._getters = [] + self._new_waiters = [] + self._any_waiters = [] + self._full_waiters = [] + self._put_hook = None + self._get_hook = None + BoundClass.bind_early(self) + + @property + def remaining(self): + """Remaining pool capacity.""" + return self.capacity - self.level + + @property + def is_empty(self): + """Indicates whether the pool is empty.""" + return self.level == 0 + + @property + def is_full(self): + """Indicates whether the pool is full.""" + return self.level >= self.capacity + + #: Put amount items in the pool. + put = BoundClass(PoolPutEvent) + + #: Get amount items from the queue. + get = BoundClass(PoolGetEvent) + + #: Return an event triggered when the pool is non-empty. + when_any = BoundClass(PoolWhenAnyEvent) + + #: Return an event triggered when items are put in pool + when_new = BoundClass(PoolWhenNewEvent) + + #: Return an event triggered when the pool becomes full. + when_full = BoundClass(PoolWhenFullEvent) + + def _trigger_put(self, _=None): + if self._putters: + put_ev = self._putters.pop(0) + put_ev.succeed() + self.level += put_ev.amount + if put_ev.amount: + self._trigger_when_new() + self._trigger_when_any() + self._trigger_when_full() + if self._put_hook: + self._put_hook() + if self.level > self.capacity and self._hard_cap: + raise OverflowError() + + def _trigger_get(self, _=None): + if self._getters and self.level: + for get_ev in self._getters: + assert get_ev.amount <= self.capacity, ( + "Amount {} greater than pool's {} capacity {}".format( + get_ev.amount, str(self.name), self.capacity)) + if get_ev.amount <= self.level: + self._getters.remove(get_ev) + self.level -= get_ev.amount + get_ev.succeed(get_ev.amount) + if self._get_hook: + self._get_hook() + else: + break + + def _trigger_when_new(self): + for when_new_ev in self._new_waiters: + when_new_ev.succeed() + del self._new_waiters[:] + + def _trigger_when_any(self): + if self.level: + for when_any_ev in self._any_waiters: + when_any_ev.succeed() + del self._any_waiters[:] + + def _trigger_when_full(self): + if self.level >= self.capacity: + for when_full_ev in self._full_waiters: + when_full_ev.succeed() + del self._full_waiters[:] + + def __str__(self): + return ('Pool: name={0.name}' + ' level={0.level}' + ' capacity={0.capacity}' + ')'.format(self)) diff --git a/desmod/probe.py b/desmod/probe.py index eff4278e..42557505 100644 --- a/desmod/probe.py +++ b/desmod/probe.py @@ -4,6 +4,7 @@ import six from desmod.queue import Queue +from desmod.pool import Pool def attach(scope, target, callbacks, **hints): @@ -23,6 +24,11 @@ def attach(scope, target, callbacks, **hints): _attach_queue_remaining(target, callbacks) else: _attach_queue_size(target, callbacks) + elif isinstance(target, Pool): + if hints.get('trace_remaining', False): + _attach_pool_remaining(target, callbacks) + else: + _attach_pool_level(target, callbacks) else: raise TypeError( 'Cannot probe {} of type {}'.format(scope, type(target))) @@ -124,3 +130,19 @@ def hook(): callback(queue.remaining) queue._put_hook = queue._get_hook = hook + + +def _attach_pool_level(pool, callbacks): + def hook(): + for callback in callbacks: + callback(pool.level) + + pool._put_hook = pool._get_hook = hook + + +def _attach_pool_remaining(pool, callbacks): + def hook(): + for callback in callbacks: + callback(pool.remaining) + + pool._put_hook = pool._get_hook = hook diff --git a/desmod/tracer.py b/desmod/tracer.py index d387fc1c..05c85c92 100644 --- a/desmod/tracer.py +++ b/desmod/tracer.py @@ -12,6 +12,7 @@ from .util import partial_format from .timescale import parse_time, scale_time from .queue import Queue +from .pool import Pool class Tracer(object): @@ -205,7 +206,7 @@ def activate_probe(self, scope, target, **hints): assert self.enabled var_type = hints.get('var_type') if var_type is None: - if isinstance(target, simpy.Container): + if isinstance(target, (simpy.Container, Pool)): if isinstance(target.level, float): var_type = 'real' else: @@ -221,7 +222,7 @@ def activate_probe(self, scope, target, **hints): if k in hints} if 'init' not in kwargs: - if isinstance(target, simpy.Container): + if isinstance(target, (simpy.Container, Pool)): kwargs['init'] = target.level elif isinstance(target, simpy.Resource): kwargs['init'] = len(target.users) if target.users else 'z' diff --git a/tests/test_pool.py b/tests/test_pool.py new file mode 100644 index 00000000..29acbb3c --- /dev/null +++ b/tests/test_pool.py @@ -0,0 +1,138 @@ +from pytest import raises + +from desmod.pool import Pool + + +def test_pool(env): + pool = Pool(env, capacity=2) + + def producer(amount, wait): + yield env.timeout(wait) + yield pool.put(amount) + + def consumer(expected_amount, wait): + yield env.timeout(wait) + msg = yield pool.get(expected_amount) + assert msg == expected_amount + + env.process(producer(1, 0)) + env.process(producer(2, 1)) + env.process(consumer(1, 0)) + env.process(consumer(2, 1)) + env.run() + + +def test_pool_when_full_any(env): + pool = Pool(env, capacity=9) + result = [] + + def producer(env): + yield env.timeout(1) + for i in range(1, 6): + yield pool.put(i) + yield env.timeout(1) + + def consumer(env): + yield env.timeout(5) + for i in range(1, 3): + msg = yield pool.get(i) + assert msg == i + + def full_waiter(env): + yield pool.when_full() + assert env.now == 4 + assert pool.level == 10 + result.append('full') + + def any_waiter(env): + yield pool.when_any() + assert env.now == 1 + result.append('any') + + def new_waiter(env): + yield pool.when_any() + assert env.now == 1 + result.append('new') + + env.process(producer(env)) + env.process(consumer(env)) + env.process(full_waiter(env)) + env.process(any_waiter(env)) + env.process(any_waiter(env)) + env.process(new_waiter(env)) + env.run() + assert pool.level + assert pool.is_full + assert pool.remaining == pool.capacity - pool.level + assert not pool.is_empty + assert 'full' in result + assert 'new' in result + assert result.count('any') == 2 + + +def test_pool_overflow(env): + pool = Pool(env, capacity=5, hard_cap=True) + + def producer(env): + yield env.timeout(1) + for i in range(5): + yield pool.put(i) + yield env.timeout(1) + + env.process(producer(env)) + with raises(OverflowError): + env.run() + + +def test_pool_get_more(env): + pool = Pool(env, capacity=6, name='foo') + + def producer(env): + yield pool.put(1) + yield env.timeout(1) + yield pool.put(1) + + def consumer(env, amount1, amount2): + amount = yield pool.get(amount1) + assert amount == amount1 + amount = yield pool.get(amount2) # should fail + yield amount + + env.process(producer(env)) + env.process(consumer(env, 1, 10)) + with raises(AssertionError, + message="Amount {} greater than pool's {} capacity {}".format( + 10, 'foo', 6)): + env.run() + + +def test_pool_cancel(env): + pool = Pool(env) + + event_cancel = pool.get(2) + event_cancel.cancel() + event_full = pool.when_full() + event_full.cancel() + event_any = pool.when_any() + event_any.cancel() + event_new = pool.when_new() + event_new.cancel() + + env.run() + assert pool.level == 0 + assert not event_cancel.triggered + assert not event_full.triggered + assert not event_any.triggered + assert not event_new.triggered + + +def test_pool_check_str(env): + pool = Pool(env, name='bar', capacity=5) + + def producer(env, amount): + yield env.timeout(1) + yield pool.put(amount) + + env.process(producer(env, 1)) + env.run() + assert str(pool) == "Pool: name=bar level=1 capacity=5)" diff --git a/tests/test_tracer.py b/tests/test_tracer.py index 3a8f0d8b..fe9bbb92 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -6,6 +6,7 @@ from desmod.component import Component from desmod.queue import Queue +from desmod.pool import Pool from desmod.simulation import simulate pytestmark = pytest.mark.usefixtures('cleandir') @@ -50,6 +51,7 @@ def __init__(self, *args, **kwargs): self.container = simpy.Container(self.env) self.resource = simpy.Resource(self.env) self.queue = Queue(self.env) + self.pool = Pool(self.env) self.a = CompA(self) self.b = CompB(self) hints = {} @@ -62,6 +64,7 @@ def __init__(self, *args, **kwargs): self.auto_probe('container', **hints) self.auto_probe('resource', **hints) self.auto_probe('queue', **hints) + self.auto_probe('pool', **hints) self.trace_some = self.get_trace_function( 'something', vcd={'var_type': 'real'}, log={'level': 'INFO'}) self.trace_other = self.get_trace_function(