In [22]:
"""
Bank renege example

Covers:

- Resources: Resource
- Condition events

Scenario:
  A counter with a random service time and customers who renege. Based on the
  program bank08.py from TheBank tutorial of SimPy 2. (KGM)

"""
import random

import simpy


def source(env, number, interval, counter):
    """Source generates customers randomly"""
    for i in range(number):
        c = customer(env, 'Customer%02d' % i, counter, time_in_bank=12.0)
        env.process(c)
        t = random.expovariate(1.0 / interval)
        yield env.timeout(t)


def customer(env, name, counter, time_in_bank):
    """Customer arrives, is served and leaves."""
    arrive = env.now
    print('%7.4f %s: Here I am' % (arrive, name))

    with counter.request() as req:
        patience = random.uniform(MIN_PATIENCE, MAX_PATIENCE)
        # Wait for the counter or abort at the end of our tether
        results = yield req | env.timeout(patience)

        wait = env.now - arrive

        if req in results:
            # We got to the counter
            print('%7.4f %s: Waited %6.3f' % (env.now, name, wait))

            tib = random.expovariate(1.0 / time_in_bank)
            yield env.timeout(tib)
            print('%7.4f %s: Finished' % (env.now, name))

        else:
            # We reneged
            print('%7.4f %s: RENEGED after %6.3f' % (env.now, name, wait))

# Setup and start the simulation
RANDOM_SEED = 42
NEW_CUSTOMERS = 5  # Total number of customers
INTERVAL_CUSTOMERS = 10.0  # Generate new customers roughly every x seconds
MIN_PATIENCE = 1  # Min. customer patience
MAX_PATIENCE = 3  # Max. customer patience


print('Bank renege')
random.seed(RANDOM_SEED)
env = simpy.Environment()

# Start processes and run
counter = simpy.Resource(env, capacity=1)
env.process(source(env, NEW_CUSTOMERS, INTERVAL_CUSTOMERS, counter))
env.run()

Bank renege
 0.0000 Customer00: Here I am
 0.0000 Customer00: Waited  0.000
 3.8595 Customer00: Finished
10.2006 Customer01: Here I am
10.2006 Customer01: Waited  0.000
12.7265 Customer02: Here I am
13.9003 Customer02: RENEGED after  1.174
23.7507 Customer01: Finished
34.9993 Customer03: Here I am
34.9993 Customer03: Waited  0.000
37.9599 Customer03: Finished
40.4798 Customer04: Here I am
40.4798 Customer04: Waited  0.000
43.1401 Customer04: Finished


In [58]:
import simpy


SIM_DURATION = 100


class House(object):
    def __init__(self, env, house_color):
        self.env = env
        self.color = house_color
        self.store = simpy.Store(env)

    def search_house(self):
        yield self.env.timeout(3)
        yield self.store.get()
        

    def buy(self, new_color):
        self.env.process(self.search_house())
        house.color = new_color
        #return = house.color
    
    def sell(self, value):
        return self.store.put(value)


def seller(env, house):
    #while True:
    yield env.timeout(2)
    house.sell(house.color)
    print('At {0} sold {1}.'.format(env.now, house.color))

def buyer(env, house):
    #while True:
        # Get event for message pipe
    yield env.timeout(5)
    house.buy('Red')
    print('At {0} bought {1}.'.format(env.now, house.color))


# Setup and start the simulation
print('House transaction')
env = simpy.Environment()

house = House(env, 'Blue')
env.process(seller(env, house))
env.process(buyer(env, house))

env.run()

House transaction
At 2 sold Blue.
At 5 bought Red.


In [63]:
import random

import simpy


RANDOM_SEED = 42
SIM_TIME = 100


class BroadcastPipe(object):
    """A Broadcast pipe that allows one process to send messages to many.

    This construct is useful when message consumers are running at
    different rates than message generators and provides an event
    buffering to the consuming processes.

    The parameters are used to create a new
    :class:`~simpy.resources.store.Store` instance each time
    :meth:`get_output_conn()` is called.

    """
    def __init__(self, env, capacity=simpy.core.Infinity):
        self.env = env
        self.capacity = capacity
        self.pipes = []

    def put(self, value):
        """Broadcast a *value* to all receivers."""
        if not self.pipes:
            raise RuntimeError('There are no output pipes.')
        events = [store.put(value) for store in self.pipes]
        return self.env.all_of(events)  # Condition event for all "events"

    def get_output_conn(self):
        """Get a new output connection for this broadcast pipe.

        The return value is a :class:`~simpy.resources.store.Store`.

        """
        pipe = simpy.Store(self.env, capacity=self.capacity)
        self.pipes.append(pipe)
        return pipe


def message_generator(name, env, out_pipe):
    """A process which randomly generates messages."""
    while True:
        # wait for next transmission
        yield env.timeout(random.randint(6, 10))

        # messages are time stamped to later check if the consumer was
        # late getting them.  Note, using event.triggered to do this may
        # result in failure due to FIFO nature of simulation yields.
        # (i.e. if at the same env.now, message_generator puts a message
        # in the pipe first and then message_consumer gets from pipe,
        # the event.triggered will be True in the other order it will be
        # False
        msg = (env.now, '%s says hello at %d' % (name, env.now))
        out_pipe.put(msg)


def message_consumer(name, env, in_pipe):
    """A process which consumes messages."""
    while True:
        # Get event for message pipe
        msg = yield in_pipe.get()

        if msg[0] < env.now:
            # if message was already put into pipe, then
            # message_consumer was late getting to it. Depending on what
            # is being modeled this, may, or may not have some
            # significance
            print('LATE Getting Message: at time %d: %s received message: %s' %
                    (env.now, name, msg[1]))

        else:
            # message_consumer is synchronized with message_generator
            print('at time %d: %s received message: %s.' %
                    (env.now, name, msg[1]))

        # Process does some other work, which may result in missing messages
        yield env.timeout(random.randint(4, 8))


# Setup and start the simulation
print('Process communication')
random.seed(RANDOM_SEED)
env = simpy.Environment()

# For one-to-one or many-to-one type pipes, use Store
pipe = simpy.Store(env)
env.process(message_generator('A', env, pipe))
env.process(message_consumer('B', env, pipe))

print('\nOne-to-one pipe communication\n')
env.run(until=SIM_TIME)

# For one-to many use BroadcastPipe
# (Note: could also be used for one-to-one,many-to-one or many-to-many)
env = simpy.Environment()
bc_pipe = BroadcastPipe(env)

env.process(message_generator('Generator A', env, bc_pipe))
env.process(message_consumer('Consumer A', env, bc_pipe.get_output_conn()))
env.process(message_consumer('Consumer B', env, bc_pipe.get_output_conn()))

print('\nOne-to-many pipe communication\n')
env.run(until=SIM_TIME)

Process communication

One-to-one pipe communication

at time 6: B received message: A says hello at 6.
at time 12: B received message: A says hello at 12.
at time 19: B received message: A says hello at 19.
at time 26: B received message: A says hello at 26.
at time 36: B received message: A says hello at 36.
at time 46: B received message: A says hello at 46.
at time 52: B received message: A says hello at 52.
at time 58: B received message: A says hello at 58.
LATE Getting Message: at time 66: B received message: A says hello at 65
at time 75: B received message: A says hello at 75.
at time 85: B received message: A says hello at 85.
at time 95: B received message: A says hello at 95.

One-to-many pipe communication



RuntimeError: There are no output pipes.

In [17]:
import simpy


def user1(machine):
    m = yield machine.get()
    print('User 1 @', env.now, m)
    yield machine.put(m)

    m = yield machine.get(lambda m: m['id'] == 1)
    print('User 1 @', env.now, m)
    yield env.timeout(5)
    yield machine.put(m)

    m = yield machine.get(lambda m: m['health'] > 98)
    print('User 1 @', env.now, m)
    yield machine.put(m)

def user2(machine):
    m = yield machine.get()
    print('User 2 @', env.now, m)
    yield machine.put(m)

    m = yield machine.get(lambda m: m['id'] == 1)
    print('User 2 @', env.now, m)
    yield machine.put(m)

    m = yield machine.get(lambda m: m['health'] > 98)
    print('User 2 @', env.now, m)
    yield machine.put(m)


env = simpy.Environment()
machine = simpy.FilterStore(env, 3)
machine.put({'health': 100,'id': 0})
machine.put({'health': 95, 'id': 1})
machine.put({'health': 97.2, 'id': 2})

env.process(user1(machine))
env.process(user2(machine))


env.run()

print(machine.get_queue)

User 1 @ 0 {'health': 100, 'id': 0}
User 2 @ 0 {'health': 95, 'id': 1}
User 1 @ 0 {'health': 95, 'id': 1}
User 2 @ 5 {'health': 95, 'id': 1}
User 1 @ 5 {'health': 100, 'id': 0}
User 2 @ 5 {'health': 100, 'id': 0}
[]


In [27]:

class Counter:
    usages = 0


def customer(env, counters):
    counter = yield counters.get()
    yield env.timeout(1)
    counter.usages += 1
    yield counters.put(counter)


env = simpy.Environment()

counters = simpy.FilterStore(env, capacity=3)
counters.items = [Counter() for i in range(counters.capacity)]

for i in range(10):
    env.process(customer(env, counters))
env.run()

for counter in counters.items:
    print(counter.usages)
    
p = env.process(customer(env,counters))

3
3
4


In [1]:
"""
Theses test cases demonstrate the API for shared resources.

"""
# Pytest gets the parameters "env" and "log" from the *conftest.py* file
import pytest

import simpy


#
# Tests for Resource
#


def test_resource(env, log):
    """A *resource* is something with a limited numer of slots that need
    to be requested before and released after the usage (e.g., gas pumps
    at a gas station).

    """
    def pem(env, name, resource, log):
        req = resource.request()
        yield req
        assert resource.count == 1

        yield env.timeout(1)
        resource.release(req)

        log.append((name, env.now))

    resource = simpy.Resource(env, capacity=1)
    assert resource.capacity == 1
    assert resource.count == 0
    env.process(pem(env, 'a', resource, log))
    env.process(pem(env, 'b', resource, log))
    env.run()

    assert log == [('a', 1), ('b',  2)]


def test_resource_context_manager(env, log):
    """The event that ``Resource.request()`` returns can be used as
    Context Manager."""
    def pem(env, name, resource, log):
        with resource.request() as request:
            yield request
            yield env.timeout(1)

        log.append((name, env.now))

    resource = simpy.Resource(env, capacity=1)
    env.process(pem(env, 'a', resource, log))
    env.process(pem(env, 'b', resource, log))
    env.run()

    assert log == [('a', 1), ('b',  2)]


def test_resource_slots(env, log):
    def pem(env, name, resource, log):
        with resource.request() as req:
            yield req
            log.append((name, env.now))
            yield env.timeout(1)

    resource = simpy.Resource(env, capacity=3)
    for i in range(9):
        env.process(pem(env, str(i), resource, log))
    env.run()

    assert log == [('0', 0), ('1', 0), ('2', 0), ('3', 1), ('4', 1), ('5', 1),
                   ('6', 2), ('7', 2), ('8', 2)]


def test_resource_continue_after_interrupt(env):
    """A process may be interrupted while waiting for a resource but
    should be able to continue waiting afterwards."""
    def pem(env, res):
        with res.request() as req:
            yield req
            yield env.timeout(1)

    def victim(env, res):
        try:
            evt = res.request()
            yield evt
            pytest.fail('Should not have gotten the resource.')
        except simpy.Interrupt:
            yield evt
            res.release(evt)
            assert env.now == 1

    def interruptor(env, proc):
        proc.interrupt()
        yield env.exit(0)

    res = simpy.Resource(env, 1)
    env.process(pem(env, res))
    proc = env.process(victim(env, res))
    env.process(interruptor(env, proc))
    env.run()


def test_resource_release_after_interrupt(env):
    """A process needs to release a resource, even it it was interrupted
    and does not continue to wait for it."""
    def blocker(env, res):
        with res.request() as req:
            yield req
            yield env.timeout(1)

    def victim(env, res):
        try:
            evt = res.request()
            yield evt
            pytest.fail('Should not have gotten the resource.')
        except simpy.Interrupt:
            # Dont wait for the resource
            res.release(evt)
            assert env.now == 0
            env.exit()

    def interruptor(env, proc):
        proc.interrupt()
        yield env.exit(0)

    res = simpy.Resource(env, 1)
    env.process(blocker(env, res))
    victim_proc = env.process(victim(env, res))
    env.process(interruptor(env, victim_proc))
    env.run()


def test_resource_cm_exception(env, log):
    """Resource with context manager receives an exception."""
    def process(env, resource, log, raise_):
        try:
            with resource.request() as req:
                yield req
                yield env.timeout(1)
                log.append(env.now)
                if raise_:
                    raise ValueError('Foo')
        except ValueError as err:
            assert err.args == ('Foo',)

    resource = simpy.Resource(env, 1)
    env.process(process(env, resource, log, True))
    # The second process is used to check if it was able to access the
    # resource:
    env.process(process(env, resource, log, False))
    env.run()

    assert log == [1, 2]


def test_resource_with_condition(env):
    def process(env, resource):
        with resource.request() as res_event:
            result = yield res_event | env.timeout(1)
            assert res_event in result

    resource = simpy.Resource(env, 1)
    env.process(process(env, resource))
    env.run()


def test_resource_with_priority_queue(env):
    def process(env, delay, resource, priority, res_time):
        yield env.timeout(delay)
        req = resource.request(priority=priority)
        yield req
        assert env.now == res_time
        yield env.timeout(5)
        resource.release(req)

    resource = simpy.PriorityResource(env, capacity=1)
    env.process(process(env, 0, resource, 2, 0))
    env.process(process(env, 2, resource, 3, 10))
    env.process(process(env, 2, resource, 3, 15))  # Test equal priority
    env.process(process(env, 4, resource, 1, 5))
    env.run()


def test_sorted_queue_maxlen(env):
    """Requests must fail if more than *maxlen* requests happen
    concurrently."""
    resource = simpy.PriorityResource(env, capacity=10)
    resource.put_queue.maxlen = 1

    def process(env, resource):
        resource.request(priority=1)
        try:
            resource.request(priority=1)
            pytest.fail('Expected a RuntimeError')
        except RuntimeError as e:
            assert e.args[0] == 'Cannot append event. Queue is full.'
        yield env.timeout(0)

    env.process(process(env, resource))
    env.run()


def test_get_users(env):
    def process(env, resource):
        with resource.request() as req:
            yield req
            yield env.timeout(1)

    resource = simpy.Resource(env, 1)
    procs = [env.process(process(env, resource)) for i in range(3)]
    env.run(until=1)
    assert [evt.proc for evt in resource.users] == procs[0:1]
    assert [evt.proc for evt in resource.queue] == procs[1:]

    env.run(until=2)
    assert [evt.proc for evt in resource.users] == procs[1:2]
    assert [evt.proc for evt in resource.queue] == procs[2:]


#
# Tests for PreemptiveResource
#


def test_preemptive_resource(env, log):
    def process(id, env, res, delay, prio, log):
        yield env.timeout(delay)
        with res.request(priority=prio) as req:
            try:
                yield req
                yield env.timeout(5)
                log.append((env.now, id))
            except simpy.Interrupt as ir:
                log.append((env.now, id, (ir.cause.by, ir.cause.usage_since)))

    res = simpy.PreemptiveResource(env, capacity=2)
    env.process(process(0, env, res, 0, 1, log))
    env.process(process(1, env, res, 0, 1, log))
    p2 = env.process(process(2, env, res, 1, 0, log))
    env.process(process(3, env, res, 2, 2, log))

    env.run()

    assert log == [(1, 1, (p2, 0)), (5, 0), (6, 2), (10, 3)]


def test_preemptive_resource_timeout_0(env):
    def proc_a(env, resource, prio):
        with resource.request(priority=prio) as req:
            try:
                yield req
                yield env.timeout(1)
                pytest.fail('Should have received an interrupt/preemption.')
            except simpy.Interrupt:
                pass
        yield env.event()

    def proc_b(env, resource, prio):
        with resource.request(priority=prio) as req:
            yield req

    resource = simpy.PreemptiveResource(env, 1)
    env.process(proc_a(env, resource, 1))
    env.process(proc_b(env, resource, 0))

    env.run()


def test_mixed_preemption(env, log):
    def process(id, env, res, delay, prio, preempt, log):
        yield env.timeout(delay)
        with res.request(priority=prio, preempt=preempt) as req:
            try:
                yield req
                yield env.timeout(5)
                log.append((env.now, id))
            except simpy.Interrupt as ir:
                log.append((env.now, id, (ir.cause.by, ir.cause.usage_since)))

    res = simpy.PreemptiveResource(env, 2)
    env.process(process(0, env, res, 0, 1, True, log))
    env.process(process(1, env, res, 0, 1, True, log))
    env.process(process(2, env, res, 1, 0, False, log))
    p3 = env.process(process(3, env, res, 1, 0, True, log))
    env.process(process(4, env, res, 2, 2, True, log))

    env.run()

    assert log == [(1, 1, (p3, 0)), (5, 0), (6, 3), (10, 2), (11, 4)]

#
# Tests for Container
#


def test_container(env, log):
    """A *container* is a resource (of optinally limited capacity) where
    you can put in our take out a discrete or continuous amount of
    things (e.g., a box of lump sugar or a can of milk).  The *put* and
    *get* operations block if the buffer is to full or to empty. If they
    return, the process nows that the *put* or *get* operation was
    successfull.

    """
    def putter(env, buf, log):
        yield env.timeout(1)
        while True:
            yield buf.put(2)
            log.append(('p', env.now))
            yield env.timeout(1)

    def getter(env, buf, log):
        yield buf.get(1)
        log.append(('g', env.now))

        yield env.timeout(1)
        yield buf.get(1)
        log.append(('g', env.now))

    buf = simpy.Container(env, init=0, capacity=2)
    env.process(putter(env, buf, log))
    env.process(getter(env, buf, log))
    env.run(until=5)

    assert log == [('p', 1), ('g', 1), ('g', 2), ('p', 2)]


def test_container_get_queued(env):
    def proc(env, wait, container, what):
        yield env.timeout(wait)
        with getattr(container, what)(1) as req:
            yield req

    container = simpy.Container(env, 1)
    p0 = env.process(proc(env, 0, container, 'get'))
    env.process(proc(env, 1, container, 'put'))
    env.process(proc(env, 1, container, 'put'))
    p3 = env.process(proc(env, 1, container, 'put'))

    env.run(until=1)
    assert [ev.proc for ev in container.put_queue] == []
    assert [ev.proc for ev in container.get_queue] == [p0]

    env.run(until=2)
    assert [ev.proc for ev in container.put_queue] == [p3]
    assert [ev.proc for ev in container.get_queue] == []


def test_initial_container_capacity(env):
    container = simpy.Container(env)
    assert container.capacity == float('inf')


@pytest.mark.parametrize(('error', 'args'), [
    (None, [2, 1]),  # normal case
    (None, [1, 1]),  # init == capacity should be valid
    (None, [1, 0]),  # init == 0 should be valid
    (ValueError, [1, 2]),  # init > capcity
    (ValueError, [0]),  # capacity == 0
    (ValueError, [-1]),  # capacity < 0
    (ValueError, [1, -1]),  # init < 0
])
def test_container_init_capacity(env, error, args):
    args.insert(0, env)
    if error:
        pytest.raises(error, simpy.Container, *args)
    else:
        simpy.Container(*args)


#
# Tests fore Store
#


def test_store(env):
    """A store models the production and consumption of concrete python
    objects (in contrast to containers, where you only now if the *put*
    or *get* operations were successfull but don't get concrete
    objects).

    """
    def putter(env, store, item):
        yield store.put(item)

    def getter(env, store, orig_item):
        item = yield store.get()
        assert item is orig_item

    store = simpy.Store(env, capacity=2)
    item = object()

    # NOTE: Does the start order matter? Need to test this.
    env.process(putter(env, store, item))
    env.process(getter(env, store, item))
    env.run()


@pytest.mark.parametrize('Store', [
    simpy.Store,
    simpy.FilterStore,
])
def test_initial_store_capacity(env, Store):
    store = Store(env)
    assert store.capacity == float('inf')


def test_store_capacity(env):
    simpy.Store(env, 1)
    pytest.raises(ValueError, simpy.Store, env, 0)
    pytest.raises(ValueError, simpy.Store, env, -1)


def test_filter_store(env):
    def pem(env):
        store = simpy.FilterStore(env, capacity=2)

        get_event = store.get(lambda item: item == 'b')
        yield store.put('a')
        assert not get_event.triggered
        yield store.put('b')
        assert get_event.triggered

    env.process(pem(env))
    env.run()

In [8]:
env = simpy.Environment()

test_filter_store(env)
store

NameError: name 'store' is not defined

In [4]:
>>> from simpy.util import start_delayed
>>>
>>> def sub(env):
...     yield env.timeout(1)
...     return 23
...
>>> def parent(env):
...     start = env.now
...     sub_proc = yield start_delayed(env, sub(env), delay=3)
...     #assert env.now - start == 3
...
...     ret = yield sub_proc
...     return ret

env = simpy.Environment()
>>> env.run(env.process(parent(env)))

23